1219820Sjeff/* 2219820Sjeff * Copyright (c) 2004-2006 Voltaire, Inc. All rights reserved. 3219820Sjeff * Copyright (c) 2002-2005 Mellanox Technologies LTD. All rights reserved. 4219820Sjeff * Copyright (c) 1996-2003 Intel Corporation. All rights reserved. 5219820Sjeff * 6219820Sjeff * This software is available to you under a choice of one of two 7219820Sjeff * licenses. You may choose to be licensed under the terms of the GNU 8219820Sjeff * General Public License (GPL) Version 2, available from the file 9219820Sjeff * COPYING in the main directory of this source tree, or the 10219820Sjeff * OpenIB.org BSD license below: 11219820Sjeff * 12219820Sjeff * Redistribution and use in source and binary forms, with or 13219820Sjeff * without modification, are permitted provided that the following 14219820Sjeff * conditions are met: 15219820Sjeff * 16219820Sjeff * - Redistributions of source code must retain the above 17219820Sjeff * copyright notice, this list of conditions and the following 18219820Sjeff * disclaimer. 19219820Sjeff * 20219820Sjeff * - Redistributions in binary form must reproduce the above 21219820Sjeff * copyright notice, this list of conditions and the following 22219820Sjeff * disclaimer in the documentation and/or other materials 23219820Sjeff * provided with the distribution. 24219820Sjeff * 25219820Sjeff * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 26219820Sjeff * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 27219820Sjeff * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 28219820Sjeff * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 29219820Sjeff * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 30219820Sjeff * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 31219820Sjeff * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 32219820Sjeff * SOFTWARE. 33219820Sjeff * 34219820Sjeff */ 35219820Sjeff 36219820Sjeff/* 37219820Sjeff * Abstract: 38219820Sjeff * Implementation of Dispatcher abstraction. 39219820Sjeff * 40219820Sjeff */ 41219820Sjeff 42219820Sjeff#if HAVE_CONFIG_H 43219820Sjeff# include <config.h> 44219820Sjeff#endif /* HAVE_CONFIG_H */ 45219820Sjeff 46219820Sjeff#include <stdlib.h> 47219820Sjeff#include <complib/cl_dispatcher.h> 48219820Sjeff#include <complib/cl_thread.h> 49219820Sjeff#include <complib/cl_timer.h> 50219820Sjeff 51219820Sjeff/* give some guidance when we build our cl_pool of messages */ 52219820Sjeff#define CL_DISP_INITIAL_MSG_COUNT 256 53219820Sjeff#define CL_DISP_MSG_GROW_SIZE 64 54219820Sjeff 55219820Sjeff/* give some guidance when we build our cl_pool of registration elements */ 56219820Sjeff#define CL_DISP_INITIAL_REG_COUNT 16 57219820Sjeff#define CL_DISP_REG_GROW_SIZE 16 58219820Sjeff 59219820Sjeff/******************************************************************** 60219820Sjeff __cl_disp_worker 61219820Sjeff 62219820Sjeff Description: 63219820Sjeff This function takes messages off the FIFO and calls Processmsg() 64219820Sjeff This function executes as passive level. 65219820Sjeff 66219820Sjeff Inputs: 67219820Sjeff p_disp - Pointer to Dispatcher object 68219820Sjeff 69219820Sjeff Outputs: 70219820Sjeff None 71219820Sjeff 72219820Sjeff Returns: 73219820Sjeff None 74219820Sjeff********************************************************************/ 75219820Sjeffvoid __cl_disp_worker(IN void *context) 76219820Sjeff{ 77219820Sjeff cl_disp_msg_t *p_msg; 78219820Sjeff cl_dispatcher_t *p_disp = (cl_dispatcher_t *) context; 79219820Sjeff 80219820Sjeff cl_spinlock_acquire(&p_disp->lock); 81219820Sjeff 82219820Sjeff /* Process the FIFO until we drain it dry. */ 83219820Sjeff while (cl_qlist_count(&p_disp->msg_fifo)) { 84219820Sjeff /* Pop the message at the head from the FIFO. */ 85219820Sjeff p_msg = 86219820Sjeff (cl_disp_msg_t *) cl_qlist_remove_head(&p_disp->msg_fifo); 87219820Sjeff 88219820Sjeff /* we track the tim ethe last message spent in the queue */ 89219820Sjeff p_disp->last_msg_queue_time_us = 90219820Sjeff cl_get_time_stamp() - p_msg->in_time; 91219820Sjeff 92219820Sjeff /* 93219820Sjeff * Release the spinlock while the message is processed. 94219820Sjeff * The user's callback may reenter the dispatcher 95219820Sjeff * and cause the lock to be reaquired. 96219820Sjeff */ 97219820Sjeff cl_spinlock_release(&p_disp->lock); 98219820Sjeff p_msg->p_dest_reg->pfn_rcv_callback((void *)p_msg->p_dest_reg-> 99219820Sjeff context, 100219820Sjeff (void *)p_msg->p_data); 101219820Sjeff 102219820Sjeff cl_atomic_dec(&p_msg->p_dest_reg->ref_cnt); 103219820Sjeff 104219820Sjeff /* The client has seen the data. Notify the sender as appropriate. */ 105219820Sjeff if (p_msg->pfn_xmt_callback) { 106219820Sjeff p_msg->pfn_xmt_callback((void *)p_msg->context, 107219820Sjeff (void *)p_msg->p_data); 108219820Sjeff cl_atomic_dec(&p_msg->p_src_reg->ref_cnt); 109219820Sjeff } 110219820Sjeff 111219820Sjeff /* Grab the lock for the next iteration through the list. */ 112219820Sjeff cl_spinlock_acquire(&p_disp->lock); 113219820Sjeff 114219820Sjeff /* Return this message to the pool. */ 115219820Sjeff cl_qpool_put(&p_disp->msg_pool, (cl_pool_item_t *) p_msg); 116219820Sjeff } 117219820Sjeff 118219820Sjeff cl_spinlock_release(&p_disp->lock); 119219820Sjeff} 120219820Sjeff 121219820Sjeff/******************************************************************** 122219820Sjeff ********************************************************************/ 123219820Sjeffvoid cl_disp_construct(IN cl_dispatcher_t * const p_disp) 124219820Sjeff{ 125219820Sjeff CL_ASSERT(p_disp); 126219820Sjeff 127219820Sjeff cl_qlist_init(&p_disp->reg_list); 128219820Sjeff cl_ptr_vector_construct(&p_disp->reg_vec); 129219820Sjeff cl_qlist_init(&p_disp->msg_fifo); 130219820Sjeff cl_spinlock_construct(&p_disp->lock); 131219820Sjeff cl_qpool_construct(&p_disp->msg_pool); 132219820Sjeff} 133219820Sjeff 134219820Sjeff/******************************************************************** 135219820Sjeff ********************************************************************/ 136219820Sjeffvoid cl_disp_shutdown(IN cl_dispatcher_t * const p_disp) 137219820Sjeff{ 138219820Sjeff CL_ASSERT(p_disp); 139219820Sjeff 140219820Sjeff /* Stop the thread pool. */ 141219820Sjeff cl_thread_pool_destroy(&p_disp->worker_threads); 142219820Sjeff 143219820Sjeff /* Process all outstanding callbacks. */ 144219820Sjeff __cl_disp_worker(p_disp); 145219820Sjeff 146219820Sjeff /* Free all registration info. */ 147219820Sjeff while (!cl_is_qlist_empty(&p_disp->reg_list)) 148219820Sjeff free(cl_qlist_remove_head(&p_disp->reg_list)); 149219820Sjeff} 150219820Sjeff 151219820Sjeff/******************************************************************** 152219820Sjeff ********************************************************************/ 153219820Sjeffvoid cl_disp_destroy(IN cl_dispatcher_t * const p_disp) 154219820Sjeff{ 155219820Sjeff CL_ASSERT(p_disp); 156219820Sjeff 157219820Sjeff cl_spinlock_destroy(&p_disp->lock); 158219820Sjeff /* Destroy the message pool */ 159219820Sjeff cl_qpool_destroy(&p_disp->msg_pool); 160219820Sjeff /* Destroy the pointer vector of registrants. */ 161219820Sjeff cl_ptr_vector_destroy(&p_disp->reg_vec); 162219820Sjeff} 163219820Sjeff 164219820Sjeff/******************************************************************** 165219820Sjeff ********************************************************************/ 166219820Sjeffcl_status_t 167219820Sjeffcl_disp_init(IN cl_dispatcher_t * const p_disp, 168219820Sjeff IN const uint32_t thread_count, IN const char *const name) 169219820Sjeff{ 170219820Sjeff cl_status_t status; 171219820Sjeff 172219820Sjeff CL_ASSERT(p_disp); 173219820Sjeff 174219820Sjeff cl_disp_construct(p_disp); 175219820Sjeff 176219820Sjeff status = cl_spinlock_init(&p_disp->lock); 177219820Sjeff if (status != CL_SUCCESS) { 178219820Sjeff cl_disp_destroy(p_disp); 179219820Sjeff return (status); 180219820Sjeff } 181219820Sjeff 182219820Sjeff /* Specify no upper limit to the number of messages in the pool */ 183219820Sjeff status = cl_qpool_init(&p_disp->msg_pool, CL_DISP_INITIAL_MSG_COUNT, 184219820Sjeff 0, CL_DISP_MSG_GROW_SIZE, sizeof(cl_disp_msg_t), 185219820Sjeff NULL, NULL, NULL); 186219820Sjeff if (status != CL_SUCCESS) { 187219820Sjeff cl_disp_destroy(p_disp); 188219820Sjeff return (status); 189219820Sjeff } 190219820Sjeff 191219820Sjeff status = cl_ptr_vector_init(&p_disp->reg_vec, CL_DISP_INITIAL_REG_COUNT, 192219820Sjeff CL_DISP_REG_GROW_SIZE); 193219820Sjeff if (status != CL_SUCCESS) { 194219820Sjeff cl_disp_destroy(p_disp); 195219820Sjeff return (status); 196219820Sjeff } 197219820Sjeff 198219820Sjeff status = cl_thread_pool_init(&p_disp->worker_threads, thread_count, 199219820Sjeff __cl_disp_worker, p_disp, name); 200219820Sjeff if (status != CL_SUCCESS) 201219820Sjeff cl_disp_destroy(p_disp); 202219820Sjeff 203219820Sjeff return (status); 204219820Sjeff} 205219820Sjeff 206219820Sjeff/******************************************************************** 207219820Sjeff ********************************************************************/ 208219820Sjeffcl_disp_reg_handle_t 209219820Sjeffcl_disp_register(IN cl_dispatcher_t * const p_disp, 210219820Sjeff IN const cl_disp_msgid_t msg_id, 211219820Sjeff IN cl_pfn_msgrcv_cb_t pfn_callback OPTIONAL, 212219820Sjeff IN const void *const context OPTIONAL) 213219820Sjeff{ 214219820Sjeff cl_disp_reg_info_t *p_reg; 215219820Sjeff cl_status_t status; 216219820Sjeff 217219820Sjeff CL_ASSERT(p_disp); 218219820Sjeff 219219820Sjeff /* Check that the requested registrant ID is available. */ 220219820Sjeff cl_spinlock_acquire(&p_disp->lock); 221219820Sjeff if ((msg_id != CL_DISP_MSGID_NONE) && 222219820Sjeff (msg_id < cl_ptr_vector_get_size(&p_disp->reg_vec)) && 223219820Sjeff (cl_ptr_vector_get(&p_disp->reg_vec, msg_id))) { 224219820Sjeff cl_spinlock_release(&p_disp->lock); 225219820Sjeff return (NULL); 226219820Sjeff } 227219820Sjeff 228219820Sjeff /* Get a registration info from the pool. */ 229219820Sjeff p_reg = (cl_disp_reg_info_t *) malloc(sizeof(cl_disp_reg_info_t)); 230219820Sjeff if (!p_reg) { 231219820Sjeff cl_spinlock_release(&p_disp->lock); 232219820Sjeff return (NULL); 233219820Sjeff } else { 234219820Sjeff memset(p_reg, 0, sizeof(cl_disp_reg_info_t)); 235219820Sjeff } 236219820Sjeff 237219820Sjeff p_reg->p_disp = p_disp; 238219820Sjeff p_reg->ref_cnt = 0; 239219820Sjeff p_reg->pfn_rcv_callback = pfn_callback; 240219820Sjeff p_reg->context = context; 241219820Sjeff p_reg->msg_id = msg_id; 242219820Sjeff 243219820Sjeff /* Insert the registration in the list. */ 244219820Sjeff cl_qlist_insert_tail(&p_disp->reg_list, (cl_list_item_t *) p_reg); 245219820Sjeff 246219820Sjeff /* Set the array entry to the registrant. */ 247219820Sjeff /* The ptr_vector grow automatically as necessary. */ 248219820Sjeff if (msg_id != CL_DISP_MSGID_NONE) { 249219820Sjeff status = cl_ptr_vector_set(&p_disp->reg_vec, msg_id, p_reg); 250219820Sjeff if (status != CL_SUCCESS) { 251219820Sjeff free(p_reg); 252219820Sjeff cl_spinlock_release(&p_disp->lock); 253219820Sjeff return (NULL); 254219820Sjeff } 255219820Sjeff } 256219820Sjeff 257219820Sjeff cl_spinlock_release(&p_disp->lock); 258219820Sjeff 259219820Sjeff return (p_reg); 260219820Sjeff} 261219820Sjeff 262219820Sjeff/******************************************************************** 263219820Sjeff ********************************************************************/ 264219820Sjeffvoid cl_disp_unregister(IN const cl_disp_reg_handle_t handle) 265219820Sjeff{ 266219820Sjeff cl_disp_reg_info_t *p_reg; 267219820Sjeff cl_dispatcher_t *p_disp; 268219820Sjeff 269219820Sjeff if (handle == CL_DISP_INVALID_HANDLE) 270219820Sjeff return; 271219820Sjeff 272219820Sjeff p_reg = (cl_disp_reg_info_t *) handle; 273219820Sjeff p_disp = p_reg->p_disp; 274219820Sjeff CL_ASSERT(p_disp); 275219820Sjeff 276219820Sjeff cl_spinlock_acquire(&p_disp->lock); 277219820Sjeff /* 278219820Sjeff * Clear the registrant vector entry. This will cause any further 279219820Sjeff * post calls to fail. 280219820Sjeff */ 281219820Sjeff if (p_reg->msg_id != CL_DISP_MSGID_NONE) { 282219820Sjeff CL_ASSERT(p_reg->msg_id < 283219820Sjeff cl_ptr_vector_get_size(&p_disp->reg_vec)); 284219820Sjeff cl_ptr_vector_set(&p_disp->reg_vec, p_reg->msg_id, NULL); 285219820Sjeff } 286219820Sjeff cl_spinlock_release(&p_disp->lock); 287219820Sjeff 288219820Sjeff while (p_reg->ref_cnt > 0) 289219820Sjeff cl_thread_suspend(1); 290219820Sjeff 291219820Sjeff cl_spinlock_acquire(&p_disp->lock); 292219820Sjeff /* Remove the registrant from the list. */ 293219820Sjeff cl_qlist_remove_item(&p_disp->reg_list, (cl_list_item_t *) p_reg); 294219820Sjeff /* Return the registration info to the pool */ 295219820Sjeff free(p_reg); 296219820Sjeff 297219820Sjeff cl_spinlock_release(&p_disp->lock); 298219820Sjeff} 299219820Sjeff 300219820Sjeff/******************************************************************** 301219820Sjeff ********************************************************************/ 302219820Sjeffcl_status_t 303219820Sjeffcl_disp_post(IN const cl_disp_reg_handle_t handle, 304219820Sjeff IN const cl_disp_msgid_t msg_id, 305219820Sjeff IN const void *const p_data, 306219820Sjeff IN cl_pfn_msgdone_cb_t pfn_callback OPTIONAL, 307219820Sjeff IN const void *const context OPTIONAL) 308219820Sjeff{ 309219820Sjeff cl_disp_reg_info_t *p_src_reg = (cl_disp_reg_info_t *) handle; 310219820Sjeff cl_disp_reg_info_t *p_dest_reg; 311219820Sjeff cl_dispatcher_t *p_disp; 312219820Sjeff cl_disp_msg_t *p_msg; 313219820Sjeff 314219820Sjeff p_disp = handle->p_disp; 315219820Sjeff CL_ASSERT(p_disp); 316219820Sjeff CL_ASSERT(msg_id != CL_DISP_MSGID_NONE); 317219820Sjeff 318219820Sjeff cl_spinlock_acquire(&p_disp->lock); 319219820Sjeff /* Check that the recipient exists. */ 320219820Sjeff p_dest_reg = cl_ptr_vector_get(&p_disp->reg_vec, msg_id); 321219820Sjeff if (!p_dest_reg) { 322219820Sjeff cl_spinlock_release(&p_disp->lock); 323219820Sjeff return (CL_NOT_FOUND); 324219820Sjeff } 325219820Sjeff 326219820Sjeff /* Get a free message from the pool. */ 327219820Sjeff p_msg = (cl_disp_msg_t *) cl_qpool_get(&p_disp->msg_pool); 328219820Sjeff if (!p_msg) { 329219820Sjeff cl_spinlock_release(&p_disp->lock); 330219820Sjeff return (CL_INSUFFICIENT_MEMORY); 331219820Sjeff } 332219820Sjeff 333219820Sjeff /* Initialize the message */ 334219820Sjeff p_msg->p_src_reg = p_src_reg; 335219820Sjeff p_msg->p_dest_reg = p_dest_reg; 336219820Sjeff p_msg->p_data = p_data; 337219820Sjeff p_msg->pfn_xmt_callback = pfn_callback; 338219820Sjeff p_msg->context = context; 339219820Sjeff p_msg->in_time = cl_get_time_stamp(); 340219820Sjeff 341219820Sjeff /* 342219820Sjeff * Increment the sender's reference count if they request a completion 343219820Sjeff * notification. 344219820Sjeff */ 345219820Sjeff if (pfn_callback) 346219820Sjeff cl_atomic_inc(&p_src_reg->ref_cnt); 347219820Sjeff 348219820Sjeff /* Increment the recipient's reference count. */ 349219820Sjeff cl_atomic_inc(&p_dest_reg->ref_cnt); 350219820Sjeff 351219820Sjeff /* Queue the message in the FIFO. */ 352219820Sjeff cl_qlist_insert_tail(&p_disp->msg_fifo, (cl_list_item_t *) p_msg); 353219820Sjeff cl_spinlock_release(&p_disp->lock); 354219820Sjeff 355219820Sjeff /* Signal the thread pool that there is work to be done. */ 356219820Sjeff cl_thread_pool_signal(&p_disp->worker_threads); 357219820Sjeff return (CL_SUCCESS); 358219820Sjeff} 359219820Sjeff 360219820Sjeffvoid 361219820Sjeffcl_disp_get_queue_status(IN const cl_disp_reg_handle_t handle, 362219820Sjeff OUT uint32_t * p_num_queued_msgs, 363219820Sjeff OUT uint64_t * p_last_msg_queue_time_ms) 364219820Sjeff{ 365219820Sjeff cl_dispatcher_t *p_disp = ((cl_disp_reg_info_t *) handle)->p_disp; 366219820Sjeff 367219820Sjeff cl_spinlock_acquire(&p_disp->lock); 368219820Sjeff 369219820Sjeff if (p_last_msg_queue_time_ms) 370219820Sjeff *p_last_msg_queue_time_ms = 371219820Sjeff p_disp->last_msg_queue_time_us / 1000; 372219820Sjeff 373219820Sjeff if (p_num_queued_msgs) 374219820Sjeff *p_num_queued_msgs = cl_qlist_count(&p_disp->msg_fifo); 375219820Sjeff 376219820Sjeff cl_spinlock_release(&p_disp->lock); 377219820Sjeff} 378