1/* 2 * Copyright (c) 2004-2006 Voltaire, Inc. All rights reserved. 3 * Copyright (c) 2002-2005 Mellanox Technologies LTD. All rights reserved. 4 * Copyright (c) 1996-2003 Intel Corporation. All rights reserved. 5 * 6 * This software is available to you under a choice of one of two 7 * licenses. You may choose to be licensed under the terms of the GNU 8 * General Public License (GPL) Version 2, available from the file 9 * COPYING in the main directory of this source tree, or the 10 * OpenIB.org BSD license below: 11 * 12 * Redistribution and use in source and binary forms, with or 13 * without modification, are permitted provided that the following 14 * conditions are met: 15 * 16 * - Redistributions of source code must retain the above 17 * copyright notice, this list of conditions and the following 18 * disclaimer. 19 * 20 * - Redistributions in binary form must reproduce the above 21 * copyright notice, this list of conditions and the following 22 * disclaimer in the documentation and/or other materials 23 * provided with the distribution. 24 * 25 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 26 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 27 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 28 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 29 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 30 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 31 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 32 * SOFTWARE. 33 * 34 */ 35 36/* 37 * Abstract: 38 * Implementation of Dispatcher abstraction. 39 * 40 */ 41 42#if HAVE_CONFIG_H 43# include <config.h> 44#endif /* HAVE_CONFIG_H */ 45 46#include <stdlib.h> 47#include <complib/cl_dispatcher.h> 48#include <complib/cl_thread.h> 49#include <complib/cl_timer.h> 50 51/* give some guidance when we build our cl_pool of messages */ 52#define CL_DISP_INITIAL_MSG_COUNT 256 53#define CL_DISP_MSG_GROW_SIZE 64 54 55/* give some guidance when we build our cl_pool of registration elements */ 56#define CL_DISP_INITIAL_REG_COUNT 16 57#define CL_DISP_REG_GROW_SIZE 16 58 59/******************************************************************** 60 __cl_disp_worker 61 62 Description: 63 This function takes messages off the FIFO and calls Processmsg() 64 This function executes as passive level. 65 66 Inputs: 67 p_disp - Pointer to Dispatcher object 68 69 Outputs: 70 None 71 72 Returns: 73 None 74********************************************************************/ 75void __cl_disp_worker(IN void *context) 76{ 77 cl_disp_msg_t *p_msg; 78 cl_dispatcher_t *p_disp = (cl_dispatcher_t *) context; 79 80 cl_spinlock_acquire(&p_disp->lock); 81 82 /* Process the FIFO until we drain it dry. */ 83 while (cl_qlist_count(&p_disp->msg_fifo)) { 84 /* Pop the message at the head from the FIFO. */ 85 p_msg = 86 (cl_disp_msg_t *) cl_qlist_remove_head(&p_disp->msg_fifo); 87 88 /* we track the tim ethe last message spent in the queue */ 89 p_disp->last_msg_queue_time_us = 90 cl_get_time_stamp() - p_msg->in_time; 91 92 /* 93 * Release the spinlock while the message is processed. 94 * The user's callback may reenter the dispatcher 95 * and cause the lock to be reaquired. 96 */ 97 cl_spinlock_release(&p_disp->lock); 98 p_msg->p_dest_reg->pfn_rcv_callback((void *)p_msg->p_dest_reg-> 99 context, 100 (void *)p_msg->p_data); 101 102 cl_atomic_dec(&p_msg->p_dest_reg->ref_cnt); 103 104 /* The client has seen the data. Notify the sender as appropriate. */ 105 if (p_msg->pfn_xmt_callback) { 106 p_msg->pfn_xmt_callback((void *)p_msg->context, 107 (void *)p_msg->p_data); 108 cl_atomic_dec(&p_msg->p_src_reg->ref_cnt); 109 } 110 111 /* Grab the lock for the next iteration through the list. */ 112 cl_spinlock_acquire(&p_disp->lock); 113 114 /* Return this message to the pool. */ 115 cl_qpool_put(&p_disp->msg_pool, (cl_pool_item_t *) p_msg); 116 } 117 118 cl_spinlock_release(&p_disp->lock); 119} 120 121/******************************************************************** 122 ********************************************************************/ 123void cl_disp_construct(IN cl_dispatcher_t * const p_disp) 124{ 125 CL_ASSERT(p_disp); 126 127 cl_qlist_init(&p_disp->reg_list); 128 cl_ptr_vector_construct(&p_disp->reg_vec); 129 cl_qlist_init(&p_disp->msg_fifo); 130 cl_spinlock_construct(&p_disp->lock); 131 cl_qpool_construct(&p_disp->msg_pool); 132} 133 134/******************************************************************** 135 ********************************************************************/ 136void cl_disp_shutdown(IN cl_dispatcher_t * const p_disp) 137{ 138 CL_ASSERT(p_disp); 139 140 /* Stop the thread pool. */ 141 cl_thread_pool_destroy(&p_disp->worker_threads); 142 143 /* Process all outstanding callbacks. */ 144 __cl_disp_worker(p_disp); 145 146 /* Free all registration info. */ 147 while (!cl_is_qlist_empty(&p_disp->reg_list)) 148 free(cl_qlist_remove_head(&p_disp->reg_list)); 149} 150 151/******************************************************************** 152 ********************************************************************/ 153void cl_disp_destroy(IN cl_dispatcher_t * const p_disp) 154{ 155 CL_ASSERT(p_disp); 156 157 cl_spinlock_destroy(&p_disp->lock); 158 /* Destroy the message pool */ 159 cl_qpool_destroy(&p_disp->msg_pool); 160 /* Destroy the pointer vector of registrants. */ 161 cl_ptr_vector_destroy(&p_disp->reg_vec); 162} 163 164/******************************************************************** 165 ********************************************************************/ 166cl_status_t 167cl_disp_init(IN cl_dispatcher_t * const p_disp, 168 IN const uint32_t thread_count, IN const char *const name) 169{ 170 cl_status_t status; 171 172 CL_ASSERT(p_disp); 173 174 cl_disp_construct(p_disp); 175 176 status = cl_spinlock_init(&p_disp->lock); 177 if (status != CL_SUCCESS) { 178 cl_disp_destroy(p_disp); 179 return (status); 180 } 181 182 /* Specify no upper limit to the number of messages in the pool */ 183 status = cl_qpool_init(&p_disp->msg_pool, CL_DISP_INITIAL_MSG_COUNT, 184 0, CL_DISP_MSG_GROW_SIZE, sizeof(cl_disp_msg_t), 185 NULL, NULL, NULL); 186 if (status != CL_SUCCESS) { 187 cl_disp_destroy(p_disp); 188 return (status); 189 } 190 191 status = cl_ptr_vector_init(&p_disp->reg_vec, CL_DISP_INITIAL_REG_COUNT, 192 CL_DISP_REG_GROW_SIZE); 193 if (status != CL_SUCCESS) { 194 cl_disp_destroy(p_disp); 195 return (status); 196 } 197 198 status = cl_thread_pool_init(&p_disp->worker_threads, thread_count, 199 __cl_disp_worker, p_disp, name); 200 if (status != CL_SUCCESS) 201 cl_disp_destroy(p_disp); 202 203 return (status); 204} 205 206/******************************************************************** 207 ********************************************************************/ 208cl_disp_reg_handle_t 209cl_disp_register(IN cl_dispatcher_t * const p_disp, 210 IN const cl_disp_msgid_t msg_id, 211 IN cl_pfn_msgrcv_cb_t pfn_callback OPTIONAL, 212 IN const void *const context OPTIONAL) 213{ 214 cl_disp_reg_info_t *p_reg; 215 cl_status_t status; 216 217 CL_ASSERT(p_disp); 218 219 /* Check that the requested registrant ID is available. */ 220 cl_spinlock_acquire(&p_disp->lock); 221 if ((msg_id != CL_DISP_MSGID_NONE) && 222 (msg_id < cl_ptr_vector_get_size(&p_disp->reg_vec)) && 223 (cl_ptr_vector_get(&p_disp->reg_vec, msg_id))) { 224 cl_spinlock_release(&p_disp->lock); 225 return (NULL); 226 } 227 228 /* Get a registration info from the pool. */ 229 p_reg = (cl_disp_reg_info_t *) malloc(sizeof(cl_disp_reg_info_t)); 230 if (!p_reg) { 231 cl_spinlock_release(&p_disp->lock); 232 return (NULL); 233 } else { 234 memset(p_reg, 0, sizeof(cl_disp_reg_info_t)); 235 } 236 237 p_reg->p_disp = p_disp; 238 p_reg->ref_cnt = 0; 239 p_reg->pfn_rcv_callback = pfn_callback; 240 p_reg->context = context; 241 p_reg->msg_id = msg_id; 242 243 /* Insert the registration in the list. */ 244 cl_qlist_insert_tail(&p_disp->reg_list, (cl_list_item_t *) p_reg); 245 246 /* Set the array entry to the registrant. */ 247 /* The ptr_vector grow automatically as necessary. */ 248 if (msg_id != CL_DISP_MSGID_NONE) { 249 status = cl_ptr_vector_set(&p_disp->reg_vec, msg_id, p_reg); 250 if (status != CL_SUCCESS) { 251 free(p_reg); 252 cl_spinlock_release(&p_disp->lock); 253 return (NULL); 254 } 255 } 256 257 cl_spinlock_release(&p_disp->lock); 258 259 return (p_reg); 260} 261 262/******************************************************************** 263 ********************************************************************/ 264void cl_disp_unregister(IN const cl_disp_reg_handle_t handle) 265{ 266 cl_disp_reg_info_t *p_reg; 267 cl_dispatcher_t *p_disp; 268 269 if (handle == CL_DISP_INVALID_HANDLE) 270 return; 271 272 p_reg = (cl_disp_reg_info_t *) handle; 273 p_disp = p_reg->p_disp; 274 CL_ASSERT(p_disp); 275 276 cl_spinlock_acquire(&p_disp->lock); 277 /* 278 * Clear the registrant vector entry. This will cause any further 279 * post calls to fail. 280 */ 281 if (p_reg->msg_id != CL_DISP_MSGID_NONE) { 282 CL_ASSERT(p_reg->msg_id < 283 cl_ptr_vector_get_size(&p_disp->reg_vec)); 284 cl_ptr_vector_set(&p_disp->reg_vec, p_reg->msg_id, NULL); 285 } 286 cl_spinlock_release(&p_disp->lock); 287 288 while (p_reg->ref_cnt > 0) 289 cl_thread_suspend(1); 290 291 cl_spinlock_acquire(&p_disp->lock); 292 /* Remove the registrant from the list. */ 293 cl_qlist_remove_item(&p_disp->reg_list, (cl_list_item_t *) p_reg); 294 /* Return the registration info to the pool */ 295 free(p_reg); 296 297 cl_spinlock_release(&p_disp->lock); 298} 299 300/******************************************************************** 301 ********************************************************************/ 302cl_status_t 303cl_disp_post(IN const cl_disp_reg_handle_t handle, 304 IN const cl_disp_msgid_t msg_id, 305 IN const void *const p_data, 306 IN cl_pfn_msgdone_cb_t pfn_callback OPTIONAL, 307 IN const void *const context OPTIONAL) 308{ 309 cl_disp_reg_info_t *p_src_reg = (cl_disp_reg_info_t *) handle; 310 cl_disp_reg_info_t *p_dest_reg; 311 cl_dispatcher_t *p_disp; 312 cl_disp_msg_t *p_msg; 313 314 p_disp = handle->p_disp; 315 CL_ASSERT(p_disp); 316 CL_ASSERT(msg_id != CL_DISP_MSGID_NONE); 317 318 cl_spinlock_acquire(&p_disp->lock); 319 /* Check that the recipient exists. */ 320 p_dest_reg = cl_ptr_vector_get(&p_disp->reg_vec, msg_id); 321 if (!p_dest_reg) { 322 cl_spinlock_release(&p_disp->lock); 323 return (CL_NOT_FOUND); 324 } 325 326 /* Get a free message from the pool. */ 327 p_msg = (cl_disp_msg_t *) cl_qpool_get(&p_disp->msg_pool); 328 if (!p_msg) { 329 cl_spinlock_release(&p_disp->lock); 330 return (CL_INSUFFICIENT_MEMORY); 331 } 332 333 /* Initialize the message */ 334 p_msg->p_src_reg = p_src_reg; 335 p_msg->p_dest_reg = p_dest_reg; 336 p_msg->p_data = p_data; 337 p_msg->pfn_xmt_callback = pfn_callback; 338 p_msg->context = context; 339 p_msg->in_time = cl_get_time_stamp(); 340 341 /* 342 * Increment the sender's reference count if they request a completion 343 * notification. 344 */ 345 if (pfn_callback) 346 cl_atomic_inc(&p_src_reg->ref_cnt); 347 348 /* Increment the recipient's reference count. */ 349 cl_atomic_inc(&p_dest_reg->ref_cnt); 350 351 /* Queue the message in the FIFO. */ 352 cl_qlist_insert_tail(&p_disp->msg_fifo, (cl_list_item_t *) p_msg); 353 cl_spinlock_release(&p_disp->lock); 354 355 /* Signal the thread pool that there is work to be done. */ 356 cl_thread_pool_signal(&p_disp->worker_threads); 357 return (CL_SUCCESS); 358} 359 360void 361cl_disp_get_queue_status(IN const cl_disp_reg_handle_t handle, 362 OUT uint32_t * p_num_queued_msgs, 363 OUT uint64_t * p_last_msg_queue_time_ms) 364{ 365 cl_dispatcher_t *p_disp = ((cl_disp_reg_info_t *) handle)->p_disp; 366 367 cl_spinlock_acquire(&p_disp->lock); 368 369 if (p_last_msg_queue_time_ms) 370 *p_last_msg_queue_time_ms = 371 p_disp->last_msg_queue_time_us / 1000; 372 373 if (p_num_queued_msgs) 374 *p_num_queued_msgs = cl_qlist_count(&p_disp->msg_fifo); 375 376 cl_spinlock_release(&p_disp->lock); 377} 378