1321936Shselasky/*
2321936Shselasky * Copyright (c) 2004-2006 Voltaire, Inc. All rights reserved.
3321936Shselasky * Copyright (c) 2002-2005 Mellanox Technologies LTD. All rights reserved.
4321936Shselasky * Copyright (c) 1996-2003 Intel Corporation. All rights reserved.
5321936Shselasky *
6321936Shselasky * This software is available to you under a choice of one of two
7321936Shselasky * licenses.  You may choose to be licensed under the terms of the GNU
8321936Shselasky * General Public License (GPL) Version 2, available from the file
9321936Shselasky * COPYING in the main directory of this source tree, or the
10321936Shselasky * OpenIB.org BSD license below:
11321936Shselasky *
12321936Shselasky *     Redistribution and use in source and binary forms, with or
13321936Shselasky *     without modification, are permitted provided that the following
14321936Shselasky *     conditions are met:
15321936Shselasky *
16321936Shselasky *      - Redistributions of source code must retain the above
17321936Shselasky *        copyright notice, this list of conditions and the following
18321936Shselasky *        disclaimer.
19321936Shselasky *
20321936Shselasky *      - Redistributions in binary form must reproduce the above
21321936Shselasky *        copyright notice, this list of conditions and the following
22321936Shselasky *        disclaimer in the documentation and/or other materials
23321936Shselasky *        provided with the distribution.
24321936Shselasky *
25321936Shselasky * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
26321936Shselasky * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
27321936Shselasky * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
28321936Shselasky * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
29321936Shselasky * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
30321936Shselasky * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
31321936Shselasky * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
32321936Shselasky * SOFTWARE.
33321936Shselasky *
34321936Shselasky */
35321936Shselasky
36321936Shselasky/*
37321936Shselasky * Abstract:
38321936Shselasky *    Implementation of Dispatcher abstraction.
39321936Shselasky *
40321936Shselasky */
41321936Shselasky
42321936Shselasky#if HAVE_CONFIG_H
43321936Shselasky#  include <config.h>
44321936Shselasky#endif				/* HAVE_CONFIG_H */
45321936Shselasky
46321936Shselasky#include <stdlib.h>
47321936Shselasky#include <complib/cl_dispatcher.h>
48321936Shselasky#include <complib/cl_thread.h>
49321936Shselasky#include <complib/cl_timer.h>
50321936Shselasky
51321936Shselasky/* give some guidance when we build our cl_pool of messages */
52321936Shselasky#define CL_DISP_INITIAL_MSG_COUNT   256
53321936Shselasky#define CL_DISP_MSG_GROW_SIZE       64
54321936Shselasky
55321936Shselasky/* give some guidance when we build our cl_pool of registration elements */
56321936Shselasky#define CL_DISP_INITIAL_REG_COUNT   16
57321936Shselasky#define CL_DISP_REG_GROW_SIZE       16
58321936Shselasky
59321936Shselasky/********************************************************************
60321936Shselasky   __cl_disp_worker
61321936Shselasky
62321936Shselasky   Description:
63321936Shselasky   This function takes messages off the FIFO and calls Processmsg()
64321936Shselasky   This function executes as passive level.
65321936Shselasky
66321936Shselasky   Inputs:
67321936Shselasky   p_disp - Pointer to Dispatcher object
68321936Shselasky
69321936Shselasky   Outputs:
70321936Shselasky   None
71321936Shselasky
72321936Shselasky   Returns:
73321936Shselasky   None
74321936Shselasky********************************************************************/
75321936Shselaskyvoid __cl_disp_worker(IN void *context)
76321936Shselasky{
77321936Shselasky	cl_disp_msg_t *p_msg;
78321936Shselasky	cl_dispatcher_t *p_disp = (cl_dispatcher_t *) context;
79321936Shselasky
80321936Shselasky	cl_spinlock_acquire(&p_disp->lock);
81321936Shselasky
82321936Shselasky	/* Process the FIFO until we drain it dry. */
83321936Shselasky	while (cl_qlist_count(&p_disp->msg_fifo)) {
84321936Shselasky		/* Pop the message at the head from the FIFO. */
85321936Shselasky		p_msg =
86321936Shselasky		    (cl_disp_msg_t *) cl_qlist_remove_head(&p_disp->msg_fifo);
87321936Shselasky
88321936Shselasky		/* we track the tim ethe last message spent in the queue */
89321936Shselasky		p_disp->last_msg_queue_time_us =
90321936Shselasky		    cl_get_time_stamp() - p_msg->in_time;
91321936Shselasky
92321936Shselasky		/*
93321936Shselasky		 * Release the spinlock while the message is processed.
94321936Shselasky		 * The user's callback may reenter the dispatcher
95321936Shselasky		 * and cause the lock to be reaquired.
96321936Shselasky		 */
97321936Shselasky		cl_spinlock_release(&p_disp->lock);
98321936Shselasky		p_msg->p_dest_reg->pfn_rcv_callback((void *)p_msg->p_dest_reg->
99321936Shselasky						    context,
100321936Shselasky						    (void *)p_msg->p_data);
101321936Shselasky
102321936Shselasky		cl_atomic_dec(&p_msg->p_dest_reg->ref_cnt);
103321936Shselasky
104321936Shselasky		/* The client has seen the data.  Notify the sender as appropriate. */
105321936Shselasky		if (p_msg->pfn_xmt_callback) {
106321936Shselasky			p_msg->pfn_xmt_callback((void *)p_msg->context,
107321936Shselasky						(void *)p_msg->p_data);
108321936Shselasky			cl_atomic_dec(&p_msg->p_src_reg->ref_cnt);
109321936Shselasky		}
110321936Shselasky
111321936Shselasky		/* Grab the lock for the next iteration through the list. */
112321936Shselasky		cl_spinlock_acquire(&p_disp->lock);
113321936Shselasky
114321936Shselasky		/* Return this message to the pool. */
115321936Shselasky		cl_qpool_put(&p_disp->msg_pool, (cl_pool_item_t *) p_msg);
116321936Shselasky	}
117321936Shselasky
118321936Shselasky	cl_spinlock_release(&p_disp->lock);
119321936Shselasky}
120321936Shselasky
121321936Shselaskyvoid cl_disp_construct(IN cl_dispatcher_t * const p_disp)
122321936Shselasky{
123321936Shselasky	CL_ASSERT(p_disp);
124321936Shselasky
125321936Shselasky	cl_qlist_init(&p_disp->reg_list);
126321936Shselasky	cl_ptr_vector_construct(&p_disp->reg_vec);
127321936Shselasky	cl_qlist_init(&p_disp->msg_fifo);
128321936Shselasky	cl_spinlock_construct(&p_disp->lock);
129321936Shselasky	cl_qpool_construct(&p_disp->msg_pool);
130321936Shselasky}
131321936Shselasky
132321936Shselaskyvoid cl_disp_shutdown(IN cl_dispatcher_t * const p_disp)
133321936Shselasky{
134321936Shselasky	CL_ASSERT(p_disp);
135321936Shselasky
136321936Shselasky	/* Stop the thread pool. */
137321936Shselasky	cl_thread_pool_destroy(&p_disp->worker_threads);
138321936Shselasky
139321936Shselasky	/* Process all outstanding callbacks. */
140321936Shselasky	__cl_disp_worker(p_disp);
141321936Shselasky
142321936Shselasky	/* Free all registration info. */
143321936Shselasky	while (!cl_is_qlist_empty(&p_disp->reg_list))
144321936Shselasky		free(cl_qlist_remove_head(&p_disp->reg_list));
145321936Shselasky}
146321936Shselasky
147321936Shselaskyvoid cl_disp_destroy(IN cl_dispatcher_t * const p_disp)
148321936Shselasky{
149321936Shselasky	CL_ASSERT(p_disp);
150321936Shselasky
151321936Shselasky	cl_spinlock_destroy(&p_disp->lock);
152321936Shselasky	/* Destroy the message pool */
153321936Shselasky	cl_qpool_destroy(&p_disp->msg_pool);
154321936Shselasky	/* Destroy the pointer vector of registrants. */
155321936Shselasky	cl_ptr_vector_destroy(&p_disp->reg_vec);
156321936Shselasky}
157321936Shselasky
158321936Shselaskycl_status_t cl_disp_init(IN cl_dispatcher_t * const p_disp,
159321936Shselasky			 IN const uint32_t thread_count,
160321936Shselasky			 IN const char *const name)
161321936Shselasky{
162321936Shselasky	cl_status_t status;
163321936Shselasky
164321936Shselasky	CL_ASSERT(p_disp);
165321936Shselasky
166321936Shselasky	cl_disp_construct(p_disp);
167321936Shselasky
168321936Shselasky	status = cl_spinlock_init(&p_disp->lock);
169321936Shselasky	if (status != CL_SUCCESS) {
170321936Shselasky		cl_disp_destroy(p_disp);
171321936Shselasky		return (status);
172321936Shselasky	}
173321936Shselasky
174321936Shselasky	/* Specify no upper limit to the number of messages in the pool */
175321936Shselasky	status = cl_qpool_init(&p_disp->msg_pool, CL_DISP_INITIAL_MSG_COUNT,
176321936Shselasky			       0, CL_DISP_MSG_GROW_SIZE, sizeof(cl_disp_msg_t),
177321936Shselasky			       NULL, NULL, NULL);
178321936Shselasky	if (status != CL_SUCCESS) {
179321936Shselasky		cl_disp_destroy(p_disp);
180321936Shselasky		return (status);
181321936Shselasky	}
182321936Shselasky
183321936Shselasky	status = cl_ptr_vector_init(&p_disp->reg_vec, CL_DISP_INITIAL_REG_COUNT,
184321936Shselasky				    CL_DISP_REG_GROW_SIZE);
185321936Shselasky	if (status != CL_SUCCESS) {
186321936Shselasky		cl_disp_destroy(p_disp);
187321936Shselasky		return (status);
188321936Shselasky	}
189321936Shselasky
190321936Shselasky	status = cl_thread_pool_init(&p_disp->worker_threads, thread_count,
191321936Shselasky				     __cl_disp_worker, p_disp, name);
192321936Shselasky	if (status != CL_SUCCESS)
193321936Shselasky		cl_disp_destroy(p_disp);
194321936Shselasky
195321936Shselasky	return (status);
196321936Shselasky}
197321936Shselasky
198321936Shselaskycl_disp_reg_handle_t cl_disp_register(IN cl_dispatcher_t * const p_disp,
199321936Shselasky				      IN const cl_disp_msgid_t msg_id,
200321936Shselasky				      IN cl_pfn_msgrcv_cb_t pfn_callback
201321936Shselasky				      OPTIONAL,
202321936Shselasky				      IN const void *const context OPTIONAL)
203321936Shselasky{
204321936Shselasky	cl_disp_reg_info_t *p_reg;
205321936Shselasky	cl_status_t status;
206321936Shselasky
207321936Shselasky	CL_ASSERT(p_disp);
208321936Shselasky
209321936Shselasky	/* Check that the requested registrant ID is available. */
210321936Shselasky	cl_spinlock_acquire(&p_disp->lock);
211321936Shselasky	if ((msg_id != CL_DISP_MSGID_NONE) &&
212321936Shselasky	    (msg_id < cl_ptr_vector_get_size(&p_disp->reg_vec)) &&
213321936Shselasky	    (cl_ptr_vector_get(&p_disp->reg_vec, msg_id))) {
214321936Shselasky		cl_spinlock_release(&p_disp->lock);
215321936Shselasky		return (NULL);
216321936Shselasky	}
217321936Shselasky
218321936Shselasky	/* Get a registration info from the pool. */
219321936Shselasky	p_reg = (cl_disp_reg_info_t *) malloc(sizeof(cl_disp_reg_info_t));
220321936Shselasky	if (!p_reg) {
221321936Shselasky		cl_spinlock_release(&p_disp->lock);
222321936Shselasky		return (NULL);
223321936Shselasky	} else {
224321936Shselasky		memset(p_reg, 0, sizeof(cl_disp_reg_info_t));
225321936Shselasky	}
226321936Shselasky
227321936Shselasky	p_reg->p_disp = p_disp;
228321936Shselasky	p_reg->ref_cnt = 0;
229321936Shselasky	p_reg->pfn_rcv_callback = pfn_callback;
230321936Shselasky	p_reg->context = context;
231321936Shselasky	p_reg->msg_id = msg_id;
232321936Shselasky
233321936Shselasky	/* Insert the registration in the list. */
234321936Shselasky	cl_qlist_insert_tail(&p_disp->reg_list, (cl_list_item_t *) p_reg);
235321936Shselasky
236321936Shselasky	/* Set the array entry to the registrant. */
237321936Shselasky	/* The ptr_vector grow automatically as necessary. */
238321936Shselasky	if (msg_id != CL_DISP_MSGID_NONE) {
239321936Shselasky		status = cl_ptr_vector_set(&p_disp->reg_vec, msg_id, p_reg);
240321936Shselasky		if (status != CL_SUCCESS) {
241321936Shselasky			free(p_reg);
242321936Shselasky			cl_spinlock_release(&p_disp->lock);
243321936Shselasky			return (NULL);
244321936Shselasky		}
245321936Shselasky	}
246321936Shselasky
247321936Shselasky	cl_spinlock_release(&p_disp->lock);
248321936Shselasky
249321936Shselasky	return (p_reg);
250321936Shselasky}
251321936Shselasky
252321936Shselaskyvoid cl_disp_unregister(IN const cl_disp_reg_handle_t handle)
253321936Shselasky{
254321936Shselasky	cl_disp_reg_info_t *p_reg;
255321936Shselasky	cl_dispatcher_t *p_disp;
256321936Shselasky
257321936Shselasky	if (handle == CL_DISP_INVALID_HANDLE)
258321936Shselasky		return;
259321936Shselasky
260321936Shselasky	p_reg = (cl_disp_reg_info_t *) handle;
261321936Shselasky	p_disp = p_reg->p_disp;
262321936Shselasky	CL_ASSERT(p_disp);
263321936Shselasky
264321936Shselasky	cl_spinlock_acquire(&p_disp->lock);
265321936Shselasky	/*
266321936Shselasky	 * Clear the registrant vector entry.  This will cause any further
267321936Shselasky	 * post calls to fail.
268321936Shselasky	 */
269321936Shselasky	if (p_reg->msg_id != CL_DISP_MSGID_NONE) {
270321936Shselasky		CL_ASSERT(p_reg->msg_id <
271321936Shselasky			  cl_ptr_vector_get_size(&p_disp->reg_vec));
272321936Shselasky		cl_ptr_vector_set(&p_disp->reg_vec, p_reg->msg_id, NULL);
273321936Shselasky	}
274321936Shselasky	cl_spinlock_release(&p_disp->lock);
275321936Shselasky
276321936Shselasky	while (p_reg->ref_cnt > 0)
277321936Shselasky		cl_thread_suspend(1);
278321936Shselasky
279321936Shselasky	cl_spinlock_acquire(&p_disp->lock);
280321936Shselasky	/* Remove the registrant from the list. */
281321936Shselasky	cl_qlist_remove_item(&p_disp->reg_list, (cl_list_item_t *) p_reg);
282321936Shselasky	/* Return the registration info to the pool */
283321936Shselasky	free(p_reg);
284321936Shselasky
285321936Shselasky	cl_spinlock_release(&p_disp->lock);
286321936Shselasky}
287321936Shselasky
288321936Shselaskycl_status_t cl_disp_post(IN const cl_disp_reg_handle_t handle,
289321936Shselasky			 IN const cl_disp_msgid_t msg_id,
290321936Shselasky			 IN const void *const p_data,
291321936Shselasky			 IN cl_pfn_msgdone_cb_t pfn_callback OPTIONAL,
292321936Shselasky			 IN const void *const context OPTIONAL)
293321936Shselasky{
294321936Shselasky	cl_disp_reg_info_t *p_src_reg = (cl_disp_reg_info_t *) handle;
295321936Shselasky	cl_disp_reg_info_t *p_dest_reg;
296321936Shselasky	cl_dispatcher_t *p_disp;
297321936Shselasky	cl_disp_msg_t *p_msg;
298321936Shselasky
299321936Shselasky	p_disp = handle->p_disp;
300321936Shselasky	CL_ASSERT(p_disp);
301321936Shselasky	CL_ASSERT(msg_id != CL_DISP_MSGID_NONE);
302321936Shselasky
303321936Shselasky	cl_spinlock_acquire(&p_disp->lock);
304321936Shselasky	/* Check that the recipient exists. */
305321936Shselasky	if (cl_ptr_vector_get_size(&p_disp->reg_vec) <= msg_id) {
306321936Shselasky		cl_spinlock_release(&p_disp->lock);
307321936Shselasky		return (CL_NOT_FOUND);
308321936Shselasky	}
309321936Shselasky
310321936Shselasky	p_dest_reg = cl_ptr_vector_get(&p_disp->reg_vec, msg_id);
311321936Shselasky	if (!p_dest_reg) {
312321936Shselasky		cl_spinlock_release(&p_disp->lock);
313321936Shselasky		return (CL_NOT_FOUND);
314321936Shselasky	}
315321936Shselasky
316321936Shselasky	/* Get a free message from the pool. */
317321936Shselasky	p_msg = (cl_disp_msg_t *) cl_qpool_get(&p_disp->msg_pool);
318321936Shselasky	if (!p_msg) {
319321936Shselasky		cl_spinlock_release(&p_disp->lock);
320321936Shselasky		return (CL_INSUFFICIENT_MEMORY);
321321936Shselasky	}
322321936Shselasky
323321936Shselasky	/* Initialize the message */
324321936Shselasky	p_msg->p_src_reg = p_src_reg;
325321936Shselasky	p_msg->p_dest_reg = p_dest_reg;
326321936Shselasky	p_msg->p_data = p_data;
327321936Shselasky	p_msg->pfn_xmt_callback = pfn_callback;
328321936Shselasky	p_msg->context = context;
329321936Shselasky	p_msg->in_time = cl_get_time_stamp();
330321936Shselasky
331321936Shselasky	/*
332321936Shselasky	 * Increment the sender's reference count if they request a completion
333321936Shselasky	 * notification.
334321936Shselasky	 */
335321936Shselasky	if (pfn_callback)
336321936Shselasky		cl_atomic_inc(&p_src_reg->ref_cnt);
337321936Shselasky
338321936Shselasky	/* Increment the recipient's reference count. */
339321936Shselasky	cl_atomic_inc(&p_dest_reg->ref_cnt);
340321936Shselasky
341321936Shselasky	/* Queue the message in the FIFO. */
342321936Shselasky	cl_qlist_insert_tail(&p_disp->msg_fifo, (cl_list_item_t *) p_msg);
343321936Shselasky	cl_spinlock_release(&p_disp->lock);
344321936Shselasky
345321936Shselasky	/* Signal the thread pool that there is work to be done. */
346321936Shselasky	cl_thread_pool_signal(&p_disp->worker_threads);
347321936Shselasky	return (CL_SUCCESS);
348321936Shselasky}
349321936Shselasky
350321936Shselaskyvoid cl_disp_get_queue_status(IN const cl_disp_reg_handle_t handle,
351321936Shselasky			      OUT uint32_t * p_num_queued_msgs,
352321936Shselasky			      OUT uint64_t * p_last_msg_queue_time_ms)
353321936Shselasky{
354321936Shselasky	cl_dispatcher_t *p_disp = ((cl_disp_reg_info_t *) handle)->p_disp;
355321936Shselasky
356321936Shselasky	cl_spinlock_acquire(&p_disp->lock);
357321936Shselasky
358321936Shselasky	if (p_last_msg_queue_time_ms)
359321936Shselasky		*p_last_msg_queue_time_ms =
360321936Shselasky		    p_disp->last_msg_queue_time_us / 1000;
361321936Shselasky
362321936Shselasky	if (p_num_queued_msgs)
363321936Shselasky		*p_num_queued_msgs = cl_qlist_count(&p_disp->msg_fifo);
364321936Shselasky
365321936Shselasky	cl_spinlock_release(&p_disp->lock);
366321936Shselasky}
367