/*
 * Copyright (c) 2004-2006 Voltaire, Inc. All rights reserved.
 * Copyright (c) 2002-2005 Mellanox Technologies LTD. All rights reserved.
 * Copyright (c) 1996-2003 Intel Corporation. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
3343412Snewton * 3443412Snewton */ 3543412Snewton 3643412Snewton/* 3743412Snewton * Abstract: 3843412Snewton * Implementation of Dispatcher abstraction. 3943412Snewton * 4043412Snewton */ 4143412Snewton 4243412Snewton#if HAVE_CONFIG_H 4343412Snewton# include <config.h> 4443412Snewton#endif /* HAVE_CONFIG_H */ 4543412Snewton 4643412Snewton#include <stdlib.h> 4743412Snewton#include <complib/cl_dispatcher.h> 4843412Snewton#include <complib/cl_thread.h> 4943412Snewton#include <complib/cl_timer.h> 5043412Snewton 5143412Snewton/* give some guidance when we build our cl_pool of messages */ 5243412Snewton#define CL_DISP_INITIAL_MSG_COUNT 256 5343412Snewton#define CL_DISP_MSG_GROW_SIZE 64 5443412Snewton 5543412Snewton/* give some guidance when we build our cl_pool of registration elements */ 5643412Snewton#define CL_DISP_INITIAL_REG_COUNT 16 5743412Snewton#define CL_DISP_REG_GROW_SIZE 16 5843412Snewton 5943412Snewton/******************************************************************** 6043412Snewton __cl_disp_worker 6143412Snewton 6243412Snewton Description: 6343412Snewton This function takes messages off the FIFO and calls Processmsg() 6443412Snewton This function executes as passive level. 6543412Snewton 6643412Snewton Inputs: 6743412Snewton p_disp - Pointer to Dispatcher object 6843412Snewton 6943412Snewton Outputs: 7043412Snewton None 7143412Snewton 7243412Snewton Returns: 7343412Snewton None 7443412Snewton********************************************************************/ 7543412Snewtonvoid __cl_disp_worker(IN void *context) 7643412Snewton{ 7743412Snewton cl_disp_msg_t *p_msg; 7843412Snewton cl_dispatcher_t *p_disp = (cl_dispatcher_t *) context; 7943412Snewton 8043412Snewton cl_spinlock_acquire(&p_disp->lock); 8143412Snewton 8243412Snewton /* Process the FIFO until we drain it dry. */ 8343412Snewton while (cl_qlist_count(&p_disp->msg_fifo)) { 8443412Snewton /* Pop the message at the head from the FIFO. 
*/ 8543412Snewton p_msg = 8643412Snewton (cl_disp_msg_t *) cl_qlist_remove_head(&p_disp->msg_fifo); 8743412Snewton 88 /* we track the tim ethe last message spent in the queue */ 89 p_disp->last_msg_queue_time_us = 90 cl_get_time_stamp() - p_msg->in_time; 91 92 /* 93 * Release the spinlock while the message is processed. 94 * The user's callback may reenter the dispatcher 95 * and cause the lock to be reaquired. 96 */ 97 cl_spinlock_release(&p_disp->lock); 98 p_msg->p_dest_reg->pfn_rcv_callback((void *)p_msg->p_dest_reg-> 99 context, 100 (void *)p_msg->p_data); 101 102 cl_atomic_dec(&p_msg->p_dest_reg->ref_cnt); 103 104 /* The client has seen the data. Notify the sender as appropriate. */ 105 if (p_msg->pfn_xmt_callback) { 106 p_msg->pfn_xmt_callback((void *)p_msg->context, 107 (void *)p_msg->p_data); 108 cl_atomic_dec(&p_msg->p_src_reg->ref_cnt); 109 } 110 111 /* Grab the lock for the next iteration through the list. */ 112 cl_spinlock_acquire(&p_disp->lock); 113 114 /* Return this message to the pool. */ 115 cl_qpool_put(&p_disp->msg_pool, (cl_pool_item_t *) p_msg); 116 } 117 118 cl_spinlock_release(&p_disp->lock); 119} 120 121void cl_disp_construct(IN cl_dispatcher_t * const p_disp) 122{ 123 CL_ASSERT(p_disp); 124 125 cl_qlist_init(&p_disp->reg_list); 126 cl_ptr_vector_construct(&p_disp->reg_vec); 127 cl_qlist_init(&p_disp->msg_fifo); 128 cl_spinlock_construct(&p_disp->lock); 129 cl_qpool_construct(&p_disp->msg_pool); 130} 131 132void cl_disp_shutdown(IN cl_dispatcher_t * const p_disp) 133{ 134 CL_ASSERT(p_disp); 135 136 /* Stop the thread pool. */ 137 cl_thread_pool_destroy(&p_disp->worker_threads); 138 139 /* Process all outstanding callbacks. */ 140 __cl_disp_worker(p_disp); 141 142 /* Free all registration info. 
*/ 143 while (!cl_is_qlist_empty(&p_disp->reg_list)) 144 free(cl_qlist_remove_head(&p_disp->reg_list)); 145} 146 147void cl_disp_destroy(IN cl_dispatcher_t * const p_disp) 148{ 149 CL_ASSERT(p_disp); 150 151 cl_spinlock_destroy(&p_disp->lock); 152 /* Destroy the message pool */ 153 cl_qpool_destroy(&p_disp->msg_pool); 154 /* Destroy the pointer vector of registrants. */ 155 cl_ptr_vector_destroy(&p_disp->reg_vec); 156} 157 158cl_status_t cl_disp_init(IN cl_dispatcher_t * const p_disp, 159 IN const uint32_t thread_count, 160 IN const char *const name) 161{ 162 cl_status_t status; 163 164 CL_ASSERT(p_disp); 165 166 cl_disp_construct(p_disp); 167 168 status = cl_spinlock_init(&p_disp->lock); 169 if (status != CL_SUCCESS) { 170 cl_disp_destroy(p_disp); 171 return (status); 172 } 173 174 /* Specify no upper limit to the number of messages in the pool */ 175 status = cl_qpool_init(&p_disp->msg_pool, CL_DISP_INITIAL_MSG_COUNT, 176 0, CL_DISP_MSG_GROW_SIZE, sizeof(cl_disp_msg_t), 177 NULL, NULL, NULL); 178 if (status != CL_SUCCESS) { 179 cl_disp_destroy(p_disp); 180 return (status); 181 } 182 183 status = cl_ptr_vector_init(&p_disp->reg_vec, CL_DISP_INITIAL_REG_COUNT, 184 CL_DISP_REG_GROW_SIZE); 185 if (status != CL_SUCCESS) { 186 cl_disp_destroy(p_disp); 187 return (status); 188 } 189 190 status = cl_thread_pool_init(&p_disp->worker_threads, thread_count, 191 __cl_disp_worker, p_disp, name); 192 if (status != CL_SUCCESS) 193 cl_disp_destroy(p_disp); 194 195 return (status); 196} 197 198cl_disp_reg_handle_t cl_disp_register(IN cl_dispatcher_t * const p_disp, 199 IN const cl_disp_msgid_t msg_id, 200 IN cl_pfn_msgrcv_cb_t pfn_callback 201 OPTIONAL, 202 IN const void *const context OPTIONAL) 203{ 204 cl_disp_reg_info_t *p_reg; 205 cl_status_t status; 206 207 CL_ASSERT(p_disp); 208 209 /* Check that the requested registrant ID is available. 
*/ 210 cl_spinlock_acquire(&p_disp->lock); 211 if ((msg_id != CL_DISP_MSGID_NONE) && 212 (msg_id < cl_ptr_vector_get_size(&p_disp->reg_vec)) && 213 (cl_ptr_vector_get(&p_disp->reg_vec, msg_id))) { 214 cl_spinlock_release(&p_disp->lock); 215 return (NULL); 216 } 217 218 /* Get a registration info from the pool. */ 219 p_reg = (cl_disp_reg_info_t *) malloc(sizeof(cl_disp_reg_info_t)); 220 if (!p_reg) { 221 cl_spinlock_release(&p_disp->lock); 222 return (NULL); 223 } else { 224 memset(p_reg, 0, sizeof(cl_disp_reg_info_t)); 225 } 226 227 p_reg->p_disp = p_disp; 228 p_reg->ref_cnt = 0; 229 p_reg->pfn_rcv_callback = pfn_callback; 230 p_reg->context = context; 231 p_reg->msg_id = msg_id; 232 233 /* Insert the registration in the list. */ 234 cl_qlist_insert_tail(&p_disp->reg_list, (cl_list_item_t *) p_reg); 235 236 /* Set the array entry to the registrant. */ 237 /* The ptr_vector grow automatically as necessary. */ 238 if (msg_id != CL_DISP_MSGID_NONE) { 239 status = cl_ptr_vector_set(&p_disp->reg_vec, msg_id, p_reg); 240 if (status != CL_SUCCESS) { 241 free(p_reg); 242 cl_spinlock_release(&p_disp->lock); 243 return (NULL); 244 } 245 } 246 247 cl_spinlock_release(&p_disp->lock); 248 249 return (p_reg); 250} 251 252void cl_disp_unregister(IN const cl_disp_reg_handle_t handle) 253{ 254 cl_disp_reg_info_t *p_reg; 255 cl_dispatcher_t *p_disp; 256 257 if (handle == CL_DISP_INVALID_HANDLE) 258 return; 259 260 p_reg = (cl_disp_reg_info_t *) handle; 261 p_disp = p_reg->p_disp; 262 CL_ASSERT(p_disp); 263 264 cl_spinlock_acquire(&p_disp->lock); 265 /* 266 * Clear the registrant vector entry. This will cause any further 267 * post calls to fail. 
268 */ 269 if (p_reg->msg_id != CL_DISP_MSGID_NONE) { 270 CL_ASSERT(p_reg->msg_id < 271 cl_ptr_vector_get_size(&p_disp->reg_vec)); 272 cl_ptr_vector_set(&p_disp->reg_vec, p_reg->msg_id, NULL); 273 } 274 cl_spinlock_release(&p_disp->lock); 275 276 while (p_reg->ref_cnt > 0) 277 cl_thread_suspend(1); 278 279 cl_spinlock_acquire(&p_disp->lock); 280 /* Remove the registrant from the list. */ 281 cl_qlist_remove_item(&p_disp->reg_list, (cl_list_item_t *) p_reg); 282 /* Return the registration info to the pool */ 283 free(p_reg); 284 285 cl_spinlock_release(&p_disp->lock); 286} 287 288cl_status_t cl_disp_post(IN const cl_disp_reg_handle_t handle, 289 IN const cl_disp_msgid_t msg_id, 290 IN const void *const p_data, 291 IN cl_pfn_msgdone_cb_t pfn_callback OPTIONAL, 292 IN const void *const context OPTIONAL) 293{ 294 cl_disp_reg_info_t *p_src_reg = (cl_disp_reg_info_t *) handle; 295 cl_disp_reg_info_t *p_dest_reg; 296 cl_dispatcher_t *p_disp; 297 cl_disp_msg_t *p_msg; 298 299 p_disp = handle->p_disp; 300 CL_ASSERT(p_disp); 301 CL_ASSERT(msg_id != CL_DISP_MSGID_NONE); 302 303 cl_spinlock_acquire(&p_disp->lock); 304 /* Check that the recipient exists. */ 305 if (cl_ptr_vector_get_size(&p_disp->reg_vec) <= msg_id) { 306 cl_spinlock_release(&p_disp->lock); 307 return (CL_NOT_FOUND); 308 } 309 310 p_dest_reg = cl_ptr_vector_get(&p_disp->reg_vec, msg_id); 311 if (!p_dest_reg) { 312 cl_spinlock_release(&p_disp->lock); 313 return (CL_NOT_FOUND); 314 } 315 316 /* Get a free message from the pool. 
*/ 317 p_msg = (cl_disp_msg_t *) cl_qpool_get(&p_disp->msg_pool); 318 if (!p_msg) { 319 cl_spinlock_release(&p_disp->lock); 320 return (CL_INSUFFICIENT_MEMORY); 321 } 322 323 /* Initialize the message */ 324 p_msg->p_src_reg = p_src_reg; 325 p_msg->p_dest_reg = p_dest_reg; 326 p_msg->p_data = p_data; 327 p_msg->pfn_xmt_callback = pfn_callback; 328 p_msg->context = context; 329 p_msg->in_time = cl_get_time_stamp(); 330 331 /* 332 * Increment the sender's reference count if they request a completion 333 * notification. 334 */ 335 if (pfn_callback) 336 cl_atomic_inc(&p_src_reg->ref_cnt); 337 338 /* Increment the recipient's reference count. */ 339 cl_atomic_inc(&p_dest_reg->ref_cnt); 340 341 /* Queue the message in the FIFO. */ 342 cl_qlist_insert_tail(&p_disp->msg_fifo, (cl_list_item_t *) p_msg); 343 cl_spinlock_release(&p_disp->lock); 344 345 /* Signal the thread pool that there is work to be done. */ 346 cl_thread_pool_signal(&p_disp->worker_threads); 347 return (CL_SUCCESS); 348} 349 350void cl_disp_get_queue_status(IN const cl_disp_reg_handle_t handle, 351 OUT uint32_t * p_num_queued_msgs, 352 OUT uint64_t * p_last_msg_queue_time_ms) 353{ 354 cl_dispatcher_t *p_disp = ((cl_disp_reg_info_t *) handle)->p_disp; 355 356 cl_spinlock_acquire(&p_disp->lock); 357 358 if (p_last_msg_queue_time_ms) 359 *p_last_msg_queue_time_ms = 360 p_disp->last_msg_queue_time_us / 1000; 361 362 if (p_num_queued_msgs) 363 *p_num_queued_msgs = cl_qlist_count(&p_disp->msg_fifo); 364 365 cl_spinlock_release(&p_disp->lock); 366} 367