1321936Shselasky/* 2321936Shselasky * Copyright (c) 2004-2006 Voltaire, Inc. All rights reserved. 3321936Shselasky * Copyright (c) 2002-2005 Mellanox Technologies LTD. All rights reserved. 4321936Shselasky * Copyright (c) 1996-2003 Intel Corporation. All rights reserved. 5321936Shselasky * 6321936Shselasky * This software is available to you under a choice of one of two 7321936Shselasky * licenses. You may choose to be licensed under the terms of the GNU 8321936Shselasky * General Public License (GPL) Version 2, available from the file 9321936Shselasky * COPYING in the main directory of this source tree, or the 10321936Shselasky * OpenIB.org BSD license below: 11321936Shselasky * 12321936Shselasky * Redistribution and use in source and binary forms, with or 13321936Shselasky * without modification, are permitted provided that the following 14321936Shselasky * conditions are met: 15321936Shselasky * 16321936Shselasky * - Redistributions of source code must retain the above 17321936Shselasky * copyright notice, this list of conditions and the following 18321936Shselasky * disclaimer. 19321936Shselasky * 20321936Shselasky * - Redistributions in binary form must reproduce the above 21321936Shselasky * copyright notice, this list of conditions and the following 22321936Shselasky * disclaimer in the documentation and/or other materials 23321936Shselasky * provided with the distribution. 24321936Shselasky * 25321936Shselasky * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 26321936Shselasky * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 27321936Shselasky * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 28321936Shselasky * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 29321936Shselasky * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 30321936Shselasky * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 31321936Shselasky * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 32321936Shselasky * SOFTWARE. 33321936Shselasky * 34321936Shselasky */ 35321936Shselasky 36321936Shselasky/* 37321936Shselasky * Abstract: 38321936Shselasky * Implementation of Dispatcher abstraction. 39321936Shselasky * 40321936Shselasky */ 41321936Shselasky 42321936Shselasky#if HAVE_CONFIG_H 43321936Shselasky# include <config.h> 44321936Shselasky#endif /* HAVE_CONFIG_H */ 45321936Shselasky 46321936Shselasky#include <stdlib.h> 47321936Shselasky#include <complib/cl_dispatcher.h> 48321936Shselasky#include <complib/cl_thread.h> 49321936Shselasky#include <complib/cl_timer.h> 50321936Shselasky 51321936Shselasky/* give some guidance when we build our cl_pool of messages */ 52321936Shselasky#define CL_DISP_INITIAL_MSG_COUNT 256 53321936Shselasky#define CL_DISP_MSG_GROW_SIZE 64 54321936Shselasky 55321936Shselasky/* give some guidance when we build our cl_pool of registration elements */ 56321936Shselasky#define CL_DISP_INITIAL_REG_COUNT 16 57321936Shselasky#define CL_DISP_REG_GROW_SIZE 16 58321936Shselasky 59321936Shselasky/******************************************************************** 60321936Shselasky __cl_disp_worker 61321936Shselasky 62321936Shselasky Description: 63321936Shselasky This function takes messages off the FIFO and calls Processmsg() 64321936Shselasky This function executes as passive level. 65321936Shselasky 66321936Shselasky Inputs: 67321936Shselasky p_disp - Pointer to Dispatcher object 68321936Shselasky 69321936Shselasky Outputs: 70321936Shselasky None 71321936Shselasky 72321936Shselasky Returns: 73321936Shselasky None 74321936Shselasky********************************************************************/ 75321936Shselaskyvoid __cl_disp_worker(IN void *context) 76321936Shselasky{ 77321936Shselasky cl_disp_msg_t *p_msg; 78321936Shselasky cl_dispatcher_t *p_disp = (cl_dispatcher_t *) context; 79321936Shselasky 80321936Shselasky cl_spinlock_acquire(&p_disp->lock); 81321936Shselasky 82321936Shselasky /* Process the FIFO until we drain it dry. */ 83321936Shselasky while (cl_qlist_count(&p_disp->msg_fifo)) { 84321936Shselasky /* Pop the message at the head from the FIFO. */ 85321936Shselasky p_msg = 86321936Shselasky (cl_disp_msg_t *) cl_qlist_remove_head(&p_disp->msg_fifo); 87321936Shselasky 88321936Shselasky /* we track the tim ethe last message spent in the queue */ 89321936Shselasky p_disp->last_msg_queue_time_us = 90321936Shselasky cl_get_time_stamp() - p_msg->in_time; 91321936Shselasky 92321936Shselasky /* 93321936Shselasky * Release the spinlock while the message is processed. 94321936Shselasky * The user's callback may reenter the dispatcher 95321936Shselasky * and cause the lock to be reaquired. 96321936Shselasky */ 97321936Shselasky cl_spinlock_release(&p_disp->lock); 98321936Shselasky p_msg->p_dest_reg->pfn_rcv_callback((void *)p_msg->p_dest_reg-> 99321936Shselasky context, 100321936Shselasky (void *)p_msg->p_data); 101321936Shselasky 102321936Shselasky cl_atomic_dec(&p_msg->p_dest_reg->ref_cnt); 103321936Shselasky 104321936Shselasky /* The client has seen the data. Notify the sender as appropriate. */ 105321936Shselasky if (p_msg->pfn_xmt_callback) { 106321936Shselasky p_msg->pfn_xmt_callback((void *)p_msg->context, 107321936Shselasky (void *)p_msg->p_data); 108321936Shselasky cl_atomic_dec(&p_msg->p_src_reg->ref_cnt); 109321936Shselasky } 110321936Shselasky 111321936Shselasky /* Grab the lock for the next iteration through the list. */ 112321936Shselasky cl_spinlock_acquire(&p_disp->lock); 113321936Shselasky 114321936Shselasky /* Return this message to the pool. */ 115321936Shselasky cl_qpool_put(&p_disp->msg_pool, (cl_pool_item_t *) p_msg); 116321936Shselasky } 117321936Shselasky 118321936Shselasky cl_spinlock_release(&p_disp->lock); 119321936Shselasky} 120321936Shselasky 121321936Shselaskyvoid cl_disp_construct(IN cl_dispatcher_t * const p_disp) 122321936Shselasky{ 123321936Shselasky CL_ASSERT(p_disp); 124321936Shselasky 125321936Shselasky cl_qlist_init(&p_disp->reg_list); 126321936Shselasky cl_ptr_vector_construct(&p_disp->reg_vec); 127321936Shselasky cl_qlist_init(&p_disp->msg_fifo); 128321936Shselasky cl_spinlock_construct(&p_disp->lock); 129321936Shselasky cl_qpool_construct(&p_disp->msg_pool); 130321936Shselasky} 131321936Shselasky 132321936Shselaskyvoid cl_disp_shutdown(IN cl_dispatcher_t * const p_disp) 133321936Shselasky{ 134321936Shselasky CL_ASSERT(p_disp); 135321936Shselasky 136321936Shselasky /* Stop the thread pool. */ 137321936Shselasky cl_thread_pool_destroy(&p_disp->worker_threads); 138321936Shselasky 139321936Shselasky /* Process all outstanding callbacks. */ 140321936Shselasky __cl_disp_worker(p_disp); 141321936Shselasky 142321936Shselasky /* Free all registration info. */ 143321936Shselasky while (!cl_is_qlist_empty(&p_disp->reg_list)) 144321936Shselasky free(cl_qlist_remove_head(&p_disp->reg_list)); 145321936Shselasky} 146321936Shselasky 147321936Shselaskyvoid cl_disp_destroy(IN cl_dispatcher_t * const p_disp) 148321936Shselasky{ 149321936Shselasky CL_ASSERT(p_disp); 150321936Shselasky 151321936Shselasky cl_spinlock_destroy(&p_disp->lock); 152321936Shselasky /* Destroy the message pool */ 153321936Shselasky cl_qpool_destroy(&p_disp->msg_pool); 154321936Shselasky /* Destroy the pointer vector of registrants. */ 155321936Shselasky cl_ptr_vector_destroy(&p_disp->reg_vec); 156321936Shselasky} 157321936Shselasky 158321936Shselaskycl_status_t cl_disp_init(IN cl_dispatcher_t * const p_disp, 159321936Shselasky IN const uint32_t thread_count, 160321936Shselasky IN const char *const name) 161321936Shselasky{ 162321936Shselasky cl_status_t status; 163321936Shselasky 164321936Shselasky CL_ASSERT(p_disp); 165321936Shselasky 166321936Shselasky cl_disp_construct(p_disp); 167321936Shselasky 168321936Shselasky status = cl_spinlock_init(&p_disp->lock); 169321936Shselasky if (status != CL_SUCCESS) { 170321936Shselasky cl_disp_destroy(p_disp); 171321936Shselasky return (status); 172321936Shselasky } 173321936Shselasky 174321936Shselasky /* Specify no upper limit to the number of messages in the pool */ 175321936Shselasky status = cl_qpool_init(&p_disp->msg_pool, CL_DISP_INITIAL_MSG_COUNT, 176321936Shselasky 0, CL_DISP_MSG_GROW_SIZE, sizeof(cl_disp_msg_t), 177321936Shselasky NULL, NULL, NULL); 178321936Shselasky if (status != CL_SUCCESS) { 179321936Shselasky cl_disp_destroy(p_disp); 180321936Shselasky return (status); 181321936Shselasky } 182321936Shselasky 183321936Shselasky status = cl_ptr_vector_init(&p_disp->reg_vec, CL_DISP_INITIAL_REG_COUNT, 184321936Shselasky CL_DISP_REG_GROW_SIZE); 185321936Shselasky if (status != CL_SUCCESS) { 186321936Shselasky cl_disp_destroy(p_disp); 187321936Shselasky return (status); 188321936Shselasky } 189321936Shselasky 190321936Shselasky status = cl_thread_pool_init(&p_disp->worker_threads, thread_count, 191321936Shselasky __cl_disp_worker, p_disp, name); 192321936Shselasky if (status != CL_SUCCESS) 193321936Shselasky cl_disp_destroy(p_disp); 194321936Shselasky 195321936Shselasky return (status); 196321936Shselasky} 197321936Shselasky 198321936Shselaskycl_disp_reg_handle_t cl_disp_register(IN cl_dispatcher_t * const p_disp, 199321936Shselasky IN const cl_disp_msgid_t msg_id, 200321936Shselasky IN cl_pfn_msgrcv_cb_t pfn_callback 201321936Shselasky OPTIONAL, 202321936Shselasky IN const void *const context OPTIONAL) 203321936Shselasky{ 204321936Shselasky cl_disp_reg_info_t *p_reg; 205321936Shselasky cl_status_t status; 206321936Shselasky 207321936Shselasky CL_ASSERT(p_disp); 208321936Shselasky 209321936Shselasky /* Check that the requested registrant ID is available. */ 210321936Shselasky cl_spinlock_acquire(&p_disp->lock); 211321936Shselasky if ((msg_id != CL_DISP_MSGID_NONE) && 212321936Shselasky (msg_id < cl_ptr_vector_get_size(&p_disp->reg_vec)) && 213321936Shselasky (cl_ptr_vector_get(&p_disp->reg_vec, msg_id))) { 214321936Shselasky cl_spinlock_release(&p_disp->lock); 215321936Shselasky return (NULL); 216321936Shselasky } 217321936Shselasky 218321936Shselasky /* Get a registration info from the pool. */ 219321936Shselasky p_reg = (cl_disp_reg_info_t *) malloc(sizeof(cl_disp_reg_info_t)); 220321936Shselasky if (!p_reg) { 221321936Shselasky cl_spinlock_release(&p_disp->lock); 222321936Shselasky return (NULL); 223321936Shselasky } else { 224321936Shselasky memset(p_reg, 0, sizeof(cl_disp_reg_info_t)); 225321936Shselasky } 226321936Shselasky 227321936Shselasky p_reg->p_disp = p_disp; 228321936Shselasky p_reg->ref_cnt = 0; 229321936Shselasky p_reg->pfn_rcv_callback = pfn_callback; 230321936Shselasky p_reg->context = context; 231321936Shselasky p_reg->msg_id = msg_id; 232321936Shselasky 233321936Shselasky /* Insert the registration in the list. */ 234321936Shselasky cl_qlist_insert_tail(&p_disp->reg_list, (cl_list_item_t *) p_reg); 235321936Shselasky 236321936Shselasky /* Set the array entry to the registrant. */ 237321936Shselasky /* The ptr_vector grow automatically as necessary. */ 238321936Shselasky if (msg_id != CL_DISP_MSGID_NONE) { 239321936Shselasky status = cl_ptr_vector_set(&p_disp->reg_vec, msg_id, p_reg); 240321936Shselasky if (status != CL_SUCCESS) { 241321936Shselasky free(p_reg); 242321936Shselasky cl_spinlock_release(&p_disp->lock); 243321936Shselasky return (NULL); 244321936Shselasky } 245321936Shselasky } 246321936Shselasky 247321936Shselasky cl_spinlock_release(&p_disp->lock); 248321936Shselasky 249321936Shselasky return (p_reg); 250321936Shselasky} 251321936Shselasky 252321936Shselaskyvoid cl_disp_unregister(IN const cl_disp_reg_handle_t handle) 253321936Shselasky{ 254321936Shselasky cl_disp_reg_info_t *p_reg; 255321936Shselasky cl_dispatcher_t *p_disp; 256321936Shselasky 257321936Shselasky if (handle == CL_DISP_INVALID_HANDLE) 258321936Shselasky return; 259321936Shselasky 260321936Shselasky p_reg = (cl_disp_reg_info_t *) handle; 261321936Shselasky p_disp = p_reg->p_disp; 262321936Shselasky CL_ASSERT(p_disp); 263321936Shselasky 264321936Shselasky cl_spinlock_acquire(&p_disp->lock); 265321936Shselasky /* 266321936Shselasky * Clear the registrant vector entry. This will cause any further 267321936Shselasky * post calls to fail. 268321936Shselasky */ 269321936Shselasky if (p_reg->msg_id != CL_DISP_MSGID_NONE) { 270321936Shselasky CL_ASSERT(p_reg->msg_id < 271321936Shselasky cl_ptr_vector_get_size(&p_disp->reg_vec)); 272321936Shselasky cl_ptr_vector_set(&p_disp->reg_vec, p_reg->msg_id, NULL); 273321936Shselasky } 274321936Shselasky cl_spinlock_release(&p_disp->lock); 275321936Shselasky 276321936Shselasky while (p_reg->ref_cnt > 0) 277321936Shselasky cl_thread_suspend(1); 278321936Shselasky 279321936Shselasky cl_spinlock_acquire(&p_disp->lock); 280321936Shselasky /* Remove the registrant from the list. */ 281321936Shselasky cl_qlist_remove_item(&p_disp->reg_list, (cl_list_item_t *) p_reg); 282321936Shselasky /* Return the registration info to the pool */ 283321936Shselasky free(p_reg); 284321936Shselasky 285321936Shselasky cl_spinlock_release(&p_disp->lock); 286321936Shselasky} 287321936Shselasky 288321936Shselaskycl_status_t cl_disp_post(IN const cl_disp_reg_handle_t handle, 289321936Shselasky IN const cl_disp_msgid_t msg_id, 290321936Shselasky IN const void *const p_data, 291321936Shselasky IN cl_pfn_msgdone_cb_t pfn_callback OPTIONAL, 292321936Shselasky IN const void *const context OPTIONAL) 293321936Shselasky{ 294321936Shselasky cl_disp_reg_info_t *p_src_reg = (cl_disp_reg_info_t *) handle; 295321936Shselasky cl_disp_reg_info_t *p_dest_reg; 296321936Shselasky cl_dispatcher_t *p_disp; 297321936Shselasky cl_disp_msg_t *p_msg; 298321936Shselasky 299321936Shselasky p_disp = handle->p_disp; 300321936Shselasky CL_ASSERT(p_disp); 301321936Shselasky CL_ASSERT(msg_id != CL_DISP_MSGID_NONE); 302321936Shselasky 303321936Shselasky cl_spinlock_acquire(&p_disp->lock); 304321936Shselasky /* Check that the recipient exists. */ 305321936Shselasky if (cl_ptr_vector_get_size(&p_disp->reg_vec) <= msg_id) { 306321936Shselasky cl_spinlock_release(&p_disp->lock); 307321936Shselasky return (CL_NOT_FOUND); 308321936Shselasky } 309321936Shselasky 310321936Shselasky p_dest_reg = cl_ptr_vector_get(&p_disp->reg_vec, msg_id); 311321936Shselasky if (!p_dest_reg) { 312321936Shselasky cl_spinlock_release(&p_disp->lock); 313321936Shselasky return (CL_NOT_FOUND); 314321936Shselasky } 315321936Shselasky 316321936Shselasky /* Get a free message from the pool. */ 317321936Shselasky p_msg = (cl_disp_msg_t *) cl_qpool_get(&p_disp->msg_pool); 318321936Shselasky if (!p_msg) { 319321936Shselasky cl_spinlock_release(&p_disp->lock); 320321936Shselasky return (CL_INSUFFICIENT_MEMORY); 321321936Shselasky } 322321936Shselasky 323321936Shselasky /* Initialize the message */ 324321936Shselasky p_msg->p_src_reg = p_src_reg; 325321936Shselasky p_msg->p_dest_reg = p_dest_reg; 326321936Shselasky p_msg->p_data = p_data; 327321936Shselasky p_msg->pfn_xmt_callback = pfn_callback; 328321936Shselasky p_msg->context = context; 329321936Shselasky p_msg->in_time = cl_get_time_stamp(); 330321936Shselasky 331321936Shselasky /* 332321936Shselasky * Increment the sender's reference count if they request a completion 333321936Shselasky * notification. 334321936Shselasky */ 335321936Shselasky if (pfn_callback) 336321936Shselasky cl_atomic_inc(&p_src_reg->ref_cnt); 337321936Shselasky 338321936Shselasky /* Increment the recipient's reference count. */ 339321936Shselasky cl_atomic_inc(&p_dest_reg->ref_cnt); 340321936Shselasky 341321936Shselasky /* Queue the message in the FIFO. */ 342321936Shselasky cl_qlist_insert_tail(&p_disp->msg_fifo, (cl_list_item_t *) p_msg); 343321936Shselasky cl_spinlock_release(&p_disp->lock); 344321936Shselasky 345321936Shselasky /* Signal the thread pool that there is work to be done. */ 346321936Shselasky cl_thread_pool_signal(&p_disp->worker_threads); 347321936Shselasky return (CL_SUCCESS); 348321936Shselasky} 349321936Shselasky 350321936Shselaskyvoid cl_disp_get_queue_status(IN const cl_disp_reg_handle_t handle, 351321936Shselasky OUT uint32_t * p_num_queued_msgs, 352321936Shselasky OUT uint64_t * p_last_msg_queue_time_ms) 353321936Shselasky{ 354321936Shselasky cl_dispatcher_t *p_disp = ((cl_disp_reg_info_t *) handle)->p_disp; 355321936Shselasky 356321936Shselasky cl_spinlock_acquire(&p_disp->lock); 357321936Shselasky 358321936Shselasky if (p_last_msg_queue_time_ms) 359321936Shselasky *p_last_msg_queue_time_ms = 360321936Shselasky p_disp->last_msg_queue_time_us / 1000; 361321936Shselasky 362321936Shselasky if (p_num_queued_msgs) 363321936Shselasky *p_num_queued_msgs = cl_qlist_count(&p_disp->msg_fifo); 364321936Shselasky 365321936Shselasky cl_spinlock_release(&p_disp->lock); 366321936Shselasky} 367