1219820Sjeff/*
2219820Sjeff * Copyright (c) 2004-2006 Voltaire, Inc. All rights reserved.
3219820Sjeff * Copyright (c) 2002-2005 Mellanox Technologies LTD. All rights reserved.
4219820Sjeff * Copyright (c) 1996-2003 Intel Corporation. All rights reserved.
5219820Sjeff *
6219820Sjeff * This software is available to you under a choice of one of two
7219820Sjeff * licenses.  You may choose to be licensed under the terms of the GNU
8219820Sjeff * General Public License (GPL) Version 2, available from the file
9219820Sjeff * COPYING in the main directory of this source tree, or the
10219820Sjeff * OpenIB.org BSD license below:
11219820Sjeff *
12219820Sjeff *     Redistribution and use in source and binary forms, with or
13219820Sjeff *     without modification, are permitted provided that the following
14219820Sjeff *     conditions are met:
15219820Sjeff *
16219820Sjeff *      - Redistributions of source code must retain the above
17219820Sjeff *        copyright notice, this list of conditions and the following
18219820Sjeff *        disclaimer.
19219820Sjeff *
20219820Sjeff *      - Redistributions in binary form must reproduce the above
21219820Sjeff *        copyright notice, this list of conditions and the following
22219820Sjeff *        disclaimer in the documentation and/or other materials
23219820Sjeff *        provided with the distribution.
24219820Sjeff *
25219820Sjeff * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
26219820Sjeff * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
27219820Sjeff * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
28219820Sjeff * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
29219820Sjeff * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
30219820Sjeff * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
31219820Sjeff * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
32219820Sjeff * SOFTWARE.
33219820Sjeff *
34219820Sjeff */
35219820Sjeff
36219820Sjeff/*
37219820Sjeff * Abstract:
38219820Sjeff *    Implementation of Dispatcher abstraction.
39219820Sjeff *
40219820Sjeff */
41219820Sjeff
42219820Sjeff#if HAVE_CONFIG_H
43219820Sjeff#  include <config.h>
44219820Sjeff#endif				/* HAVE_CONFIG_H */
45219820Sjeff
46219820Sjeff#include <stdlib.h>
47219820Sjeff#include <complib/cl_dispatcher.h>
48219820Sjeff#include <complib/cl_thread.h>
49219820Sjeff#include <complib/cl_timer.h>
50219820Sjeff
/* Initial element count and grow size of the dispatcher's message pool. */
52219820Sjeff#define CL_DISP_INITIAL_MSG_COUNT   256
53219820Sjeff#define CL_DISP_MSG_GROW_SIZE       64
54219820Sjeff
/* Initial size and grow size of the dispatcher's registrant pointer vector. */
56219820Sjeff#define CL_DISP_INITIAL_REG_COUNT   16
57219820Sjeff#define CL_DISP_REG_GROW_SIZE       16
58219820Sjeff
59219820Sjeff/********************************************************************
60219820Sjeff   __cl_disp_worker
61219820Sjeff
62219820Sjeff   Description:
63219820Sjeff   This function takes messages off the FIFO and calls Processmsg()
64219820Sjeff   This function executes as passive level.
65219820Sjeff
66219820Sjeff   Inputs:
67219820Sjeff   p_disp - Pointer to Dispatcher object
68219820Sjeff
69219820Sjeff   Outputs:
70219820Sjeff   None
71219820Sjeff
72219820Sjeff   Returns:
73219820Sjeff   None
74219820Sjeff********************************************************************/
75219820Sjeffvoid __cl_disp_worker(IN void *context)
76219820Sjeff{
77219820Sjeff	cl_disp_msg_t *p_msg;
78219820Sjeff	cl_dispatcher_t *p_disp = (cl_dispatcher_t *) context;
79219820Sjeff
80219820Sjeff	cl_spinlock_acquire(&p_disp->lock);
81219820Sjeff
82219820Sjeff	/* Process the FIFO until we drain it dry. */
83219820Sjeff	while (cl_qlist_count(&p_disp->msg_fifo)) {
84219820Sjeff		/* Pop the message at the head from the FIFO. */
85219820Sjeff		p_msg =
86219820Sjeff		    (cl_disp_msg_t *) cl_qlist_remove_head(&p_disp->msg_fifo);
87219820Sjeff
88219820Sjeff		/* we track the tim ethe last message spent in the queue */
89219820Sjeff		p_disp->last_msg_queue_time_us =
90219820Sjeff		    cl_get_time_stamp() - p_msg->in_time;
91219820Sjeff
92219820Sjeff		/*
93219820Sjeff		 * Release the spinlock while the message is processed.
94219820Sjeff		 * The user's callback may reenter the dispatcher
95219820Sjeff		 * and cause the lock to be reaquired.
96219820Sjeff		 */
97219820Sjeff		cl_spinlock_release(&p_disp->lock);
98219820Sjeff		p_msg->p_dest_reg->pfn_rcv_callback((void *)p_msg->p_dest_reg->
99219820Sjeff						    context,
100219820Sjeff						    (void *)p_msg->p_data);
101219820Sjeff
102219820Sjeff		cl_atomic_dec(&p_msg->p_dest_reg->ref_cnt);
103219820Sjeff
104219820Sjeff		/* The client has seen the data.  Notify the sender as appropriate. */
105219820Sjeff		if (p_msg->pfn_xmt_callback) {
106219820Sjeff			p_msg->pfn_xmt_callback((void *)p_msg->context,
107219820Sjeff						(void *)p_msg->p_data);
108219820Sjeff			cl_atomic_dec(&p_msg->p_src_reg->ref_cnt);
109219820Sjeff		}
110219820Sjeff
111219820Sjeff		/* Grab the lock for the next iteration through the list. */
112219820Sjeff		cl_spinlock_acquire(&p_disp->lock);
113219820Sjeff
114219820Sjeff		/* Return this message to the pool. */
115219820Sjeff		cl_qpool_put(&p_disp->msg_pool, (cl_pool_item_t *) p_msg);
116219820Sjeff	}
117219820Sjeff
118219820Sjeff	cl_spinlock_release(&p_disp->lock);
119219820Sjeff}
120219820Sjeff
121219820Sjeff/********************************************************************
122219820Sjeff ********************************************************************/
123219820Sjeffvoid cl_disp_construct(IN cl_dispatcher_t * const p_disp)
124219820Sjeff{
125219820Sjeff	CL_ASSERT(p_disp);
126219820Sjeff
127219820Sjeff	cl_qlist_init(&p_disp->reg_list);
128219820Sjeff	cl_ptr_vector_construct(&p_disp->reg_vec);
129219820Sjeff	cl_qlist_init(&p_disp->msg_fifo);
130219820Sjeff	cl_spinlock_construct(&p_disp->lock);
131219820Sjeff	cl_qpool_construct(&p_disp->msg_pool);
132219820Sjeff}
133219820Sjeff
134219820Sjeff/********************************************************************
135219820Sjeff ********************************************************************/
136219820Sjeffvoid cl_disp_shutdown(IN cl_dispatcher_t * const p_disp)
137219820Sjeff{
138219820Sjeff	CL_ASSERT(p_disp);
139219820Sjeff
140219820Sjeff	/* Stop the thread pool. */
141219820Sjeff	cl_thread_pool_destroy(&p_disp->worker_threads);
142219820Sjeff
143219820Sjeff	/* Process all outstanding callbacks. */
144219820Sjeff	__cl_disp_worker(p_disp);
145219820Sjeff
146219820Sjeff	/* Free all registration info. */
147219820Sjeff	while (!cl_is_qlist_empty(&p_disp->reg_list))
148219820Sjeff		free(cl_qlist_remove_head(&p_disp->reg_list));
149219820Sjeff}
150219820Sjeff
151219820Sjeff/********************************************************************
152219820Sjeff ********************************************************************/
153219820Sjeffvoid cl_disp_destroy(IN cl_dispatcher_t * const p_disp)
154219820Sjeff{
155219820Sjeff	CL_ASSERT(p_disp);
156219820Sjeff
157219820Sjeff	cl_spinlock_destroy(&p_disp->lock);
158219820Sjeff	/* Destroy the message pool */
159219820Sjeff	cl_qpool_destroy(&p_disp->msg_pool);
160219820Sjeff	/* Destroy the pointer vector of registrants. */
161219820Sjeff	cl_ptr_vector_destroy(&p_disp->reg_vec);
162219820Sjeff}
163219820Sjeff
164219820Sjeff/********************************************************************
165219820Sjeff ********************************************************************/
166219820Sjeffcl_status_t
167219820Sjeffcl_disp_init(IN cl_dispatcher_t * const p_disp,
168219820Sjeff	     IN const uint32_t thread_count, IN const char *const name)
169219820Sjeff{
170219820Sjeff	cl_status_t status;
171219820Sjeff
172219820Sjeff	CL_ASSERT(p_disp);
173219820Sjeff
174219820Sjeff	cl_disp_construct(p_disp);
175219820Sjeff
176219820Sjeff	status = cl_spinlock_init(&p_disp->lock);
177219820Sjeff	if (status != CL_SUCCESS) {
178219820Sjeff		cl_disp_destroy(p_disp);
179219820Sjeff		return (status);
180219820Sjeff	}
181219820Sjeff
182219820Sjeff	/* Specify no upper limit to the number of messages in the pool */
183219820Sjeff	status = cl_qpool_init(&p_disp->msg_pool, CL_DISP_INITIAL_MSG_COUNT,
184219820Sjeff			       0, CL_DISP_MSG_GROW_SIZE, sizeof(cl_disp_msg_t),
185219820Sjeff			       NULL, NULL, NULL);
186219820Sjeff	if (status != CL_SUCCESS) {
187219820Sjeff		cl_disp_destroy(p_disp);
188219820Sjeff		return (status);
189219820Sjeff	}
190219820Sjeff
191219820Sjeff	status = cl_ptr_vector_init(&p_disp->reg_vec, CL_DISP_INITIAL_REG_COUNT,
192219820Sjeff				    CL_DISP_REG_GROW_SIZE);
193219820Sjeff	if (status != CL_SUCCESS) {
194219820Sjeff		cl_disp_destroy(p_disp);
195219820Sjeff		return (status);
196219820Sjeff	}
197219820Sjeff
198219820Sjeff	status = cl_thread_pool_init(&p_disp->worker_threads, thread_count,
199219820Sjeff				     __cl_disp_worker, p_disp, name);
200219820Sjeff	if (status != CL_SUCCESS)
201219820Sjeff		cl_disp_destroy(p_disp);
202219820Sjeff
203219820Sjeff	return (status);
204219820Sjeff}
205219820Sjeff
206219820Sjeff/********************************************************************
207219820Sjeff ********************************************************************/
208219820Sjeffcl_disp_reg_handle_t
209219820Sjeffcl_disp_register(IN cl_dispatcher_t * const p_disp,
210219820Sjeff		 IN const cl_disp_msgid_t msg_id,
211219820Sjeff		 IN cl_pfn_msgrcv_cb_t pfn_callback OPTIONAL,
212219820Sjeff		 IN const void *const context OPTIONAL)
213219820Sjeff{
214219820Sjeff	cl_disp_reg_info_t *p_reg;
215219820Sjeff	cl_status_t status;
216219820Sjeff
217219820Sjeff	CL_ASSERT(p_disp);
218219820Sjeff
219219820Sjeff	/* Check that the requested registrant ID is available. */
220219820Sjeff	cl_spinlock_acquire(&p_disp->lock);
221219820Sjeff	if ((msg_id != CL_DISP_MSGID_NONE) &&
222219820Sjeff	    (msg_id < cl_ptr_vector_get_size(&p_disp->reg_vec)) &&
223219820Sjeff	    (cl_ptr_vector_get(&p_disp->reg_vec, msg_id))) {
224219820Sjeff		cl_spinlock_release(&p_disp->lock);
225219820Sjeff		return (NULL);
226219820Sjeff	}
227219820Sjeff
228219820Sjeff	/* Get a registration info from the pool. */
229219820Sjeff	p_reg = (cl_disp_reg_info_t *) malloc(sizeof(cl_disp_reg_info_t));
230219820Sjeff	if (!p_reg) {
231219820Sjeff		cl_spinlock_release(&p_disp->lock);
232219820Sjeff		return (NULL);
233219820Sjeff	} else {
234219820Sjeff		memset(p_reg, 0, sizeof(cl_disp_reg_info_t));
235219820Sjeff	}
236219820Sjeff
237219820Sjeff	p_reg->p_disp = p_disp;
238219820Sjeff	p_reg->ref_cnt = 0;
239219820Sjeff	p_reg->pfn_rcv_callback = pfn_callback;
240219820Sjeff	p_reg->context = context;
241219820Sjeff	p_reg->msg_id = msg_id;
242219820Sjeff
243219820Sjeff	/* Insert the registration in the list. */
244219820Sjeff	cl_qlist_insert_tail(&p_disp->reg_list, (cl_list_item_t *) p_reg);
245219820Sjeff
246219820Sjeff	/* Set the array entry to the registrant. */
247219820Sjeff	/* The ptr_vector grow automatically as necessary. */
248219820Sjeff	if (msg_id != CL_DISP_MSGID_NONE) {
249219820Sjeff		status = cl_ptr_vector_set(&p_disp->reg_vec, msg_id, p_reg);
250219820Sjeff		if (status != CL_SUCCESS) {
251219820Sjeff			free(p_reg);
252219820Sjeff			cl_spinlock_release(&p_disp->lock);
253219820Sjeff			return (NULL);
254219820Sjeff		}
255219820Sjeff	}
256219820Sjeff
257219820Sjeff	cl_spinlock_release(&p_disp->lock);
258219820Sjeff
259219820Sjeff	return (p_reg);
260219820Sjeff}
261219820Sjeff
262219820Sjeff/********************************************************************
263219820Sjeff ********************************************************************/
264219820Sjeffvoid cl_disp_unregister(IN const cl_disp_reg_handle_t handle)
265219820Sjeff{
266219820Sjeff	cl_disp_reg_info_t *p_reg;
267219820Sjeff	cl_dispatcher_t *p_disp;
268219820Sjeff
269219820Sjeff	if (handle == CL_DISP_INVALID_HANDLE)
270219820Sjeff		return;
271219820Sjeff
272219820Sjeff	p_reg = (cl_disp_reg_info_t *) handle;
273219820Sjeff	p_disp = p_reg->p_disp;
274219820Sjeff	CL_ASSERT(p_disp);
275219820Sjeff
276219820Sjeff	cl_spinlock_acquire(&p_disp->lock);
277219820Sjeff	/*
278219820Sjeff	 * Clear the registrant vector entry.  This will cause any further
279219820Sjeff	 * post calls to fail.
280219820Sjeff	 */
281219820Sjeff	if (p_reg->msg_id != CL_DISP_MSGID_NONE) {
282219820Sjeff		CL_ASSERT(p_reg->msg_id <
283219820Sjeff			  cl_ptr_vector_get_size(&p_disp->reg_vec));
284219820Sjeff		cl_ptr_vector_set(&p_disp->reg_vec, p_reg->msg_id, NULL);
285219820Sjeff	}
286219820Sjeff	cl_spinlock_release(&p_disp->lock);
287219820Sjeff
288219820Sjeff	while (p_reg->ref_cnt > 0)
289219820Sjeff		cl_thread_suspend(1);
290219820Sjeff
291219820Sjeff	cl_spinlock_acquire(&p_disp->lock);
292219820Sjeff	/* Remove the registrant from the list. */
293219820Sjeff	cl_qlist_remove_item(&p_disp->reg_list, (cl_list_item_t *) p_reg);
294219820Sjeff	/* Return the registration info to the pool */
295219820Sjeff	free(p_reg);
296219820Sjeff
297219820Sjeff	cl_spinlock_release(&p_disp->lock);
298219820Sjeff}
299219820Sjeff
300219820Sjeff/********************************************************************
301219820Sjeff ********************************************************************/
302219820Sjeffcl_status_t
303219820Sjeffcl_disp_post(IN const cl_disp_reg_handle_t handle,
304219820Sjeff	     IN const cl_disp_msgid_t msg_id,
305219820Sjeff	     IN const void *const p_data,
306219820Sjeff	     IN cl_pfn_msgdone_cb_t pfn_callback OPTIONAL,
307219820Sjeff	     IN const void *const context OPTIONAL)
308219820Sjeff{
309219820Sjeff	cl_disp_reg_info_t *p_src_reg = (cl_disp_reg_info_t *) handle;
310219820Sjeff	cl_disp_reg_info_t *p_dest_reg;
311219820Sjeff	cl_dispatcher_t *p_disp;
312219820Sjeff	cl_disp_msg_t *p_msg;
313219820Sjeff
314219820Sjeff	p_disp = handle->p_disp;
315219820Sjeff	CL_ASSERT(p_disp);
316219820Sjeff	CL_ASSERT(msg_id != CL_DISP_MSGID_NONE);
317219820Sjeff
318219820Sjeff	cl_spinlock_acquire(&p_disp->lock);
319219820Sjeff	/* Check that the recipient exists. */
320219820Sjeff	p_dest_reg = cl_ptr_vector_get(&p_disp->reg_vec, msg_id);
321219820Sjeff	if (!p_dest_reg) {
322219820Sjeff		cl_spinlock_release(&p_disp->lock);
323219820Sjeff		return (CL_NOT_FOUND);
324219820Sjeff	}
325219820Sjeff
326219820Sjeff	/* Get a free message from the pool. */
327219820Sjeff	p_msg = (cl_disp_msg_t *) cl_qpool_get(&p_disp->msg_pool);
328219820Sjeff	if (!p_msg) {
329219820Sjeff		cl_spinlock_release(&p_disp->lock);
330219820Sjeff		return (CL_INSUFFICIENT_MEMORY);
331219820Sjeff	}
332219820Sjeff
333219820Sjeff	/* Initialize the message */
334219820Sjeff	p_msg->p_src_reg = p_src_reg;
335219820Sjeff	p_msg->p_dest_reg = p_dest_reg;
336219820Sjeff	p_msg->p_data = p_data;
337219820Sjeff	p_msg->pfn_xmt_callback = pfn_callback;
338219820Sjeff	p_msg->context = context;
339219820Sjeff	p_msg->in_time = cl_get_time_stamp();
340219820Sjeff
341219820Sjeff	/*
342219820Sjeff	 * Increment the sender's reference count if they request a completion
343219820Sjeff	 * notification.
344219820Sjeff	 */
345219820Sjeff	if (pfn_callback)
346219820Sjeff		cl_atomic_inc(&p_src_reg->ref_cnt);
347219820Sjeff
348219820Sjeff	/* Increment the recipient's reference count. */
349219820Sjeff	cl_atomic_inc(&p_dest_reg->ref_cnt);
350219820Sjeff
351219820Sjeff	/* Queue the message in the FIFO. */
352219820Sjeff	cl_qlist_insert_tail(&p_disp->msg_fifo, (cl_list_item_t *) p_msg);
353219820Sjeff	cl_spinlock_release(&p_disp->lock);
354219820Sjeff
355219820Sjeff	/* Signal the thread pool that there is work to be done. */
356219820Sjeff	cl_thread_pool_signal(&p_disp->worker_threads);
357219820Sjeff	return (CL_SUCCESS);
358219820Sjeff}
359219820Sjeff
360219820Sjeffvoid
361219820Sjeffcl_disp_get_queue_status(IN const cl_disp_reg_handle_t handle,
362219820Sjeff			 OUT uint32_t * p_num_queued_msgs,
363219820Sjeff			 OUT uint64_t * p_last_msg_queue_time_ms)
364219820Sjeff{
365219820Sjeff	cl_dispatcher_t *p_disp = ((cl_disp_reg_info_t *) handle)->p_disp;
366219820Sjeff
367219820Sjeff	cl_spinlock_acquire(&p_disp->lock);
368219820Sjeff
369219820Sjeff	if (p_last_msg_queue_time_ms)
370219820Sjeff		*p_last_msg_queue_time_ms =
371219820Sjeff		    p_disp->last_msg_queue_time_us / 1000;
372219820Sjeff
373219820Sjeff	if (p_num_queued_msgs)
374219820Sjeff		*p_num_queued_msgs = cl_qlist_count(&p_disp->msg_fifo);
375219820Sjeff
376219820Sjeff	cl_spinlock_release(&p_disp->lock);
377219820Sjeff}
378