1/*
2 * Copyright (c) 2004-2006 Voltaire, Inc. All rights reserved.
3 * Copyright (c) 2002-2005 Mellanox Technologies LTD. All rights reserved.
4 * Copyright (c) 1996-2003 Intel Corporation. All rights reserved.
5 *
6 * This software is available to you under a choice of one of two
7 * licenses.  You may choose to be licensed under the terms of the GNU
8 * General Public License (GPL) Version 2, available from the file
9 * COPYING in the main directory of this source tree, or the
10 * OpenIB.org BSD license below:
11 *
12 *     Redistribution and use in source and binary forms, with or
13 *     without modification, are permitted provided that the following
14 *     conditions are met:
15 *
16 *      - Redistributions of source code must retain the above
17 *        copyright notice, this list of conditions and the following
18 *        disclaimer.
19 *
20 *      - Redistributions in binary form must reproduce the above
21 *        copyright notice, this list of conditions and the following
22 *        disclaimer in the documentation and/or other materials
23 *        provided with the distribution.
24 *
25 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
26 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
27 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
28 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
29 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
30 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
31 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
32 * SOFTWARE.
33 *
34 */
35
36/*
37 * Abstract:
38 *    Implementation of Dispatcher abstraction.
39 *
40 */
41
42#if HAVE_CONFIG_H
43#  include <config.h>
44#endif				/* HAVE_CONFIG_H */
45
46#include <stdlib.h>
47#include <complib/cl_dispatcher.h>
48#include <complib/cl_thread.h>
49#include <complib/cl_timer.h>
50
51/* give some guidance when we build our cl_pool of messages */
52#define CL_DISP_INITIAL_MSG_COUNT   256
53#define CL_DISP_MSG_GROW_SIZE       64
54
55/* give some guidance when we build our cl_pool of registration elements */
56#define CL_DISP_INITIAL_REG_COUNT   16
57#define CL_DISP_REG_GROW_SIZE       16
58
59/********************************************************************
60   __cl_disp_worker
61
62   Description:
63   This function takes messages off the FIFO and calls Processmsg()
64   This function executes as passive level.
65
66   Inputs:
67   p_disp - Pointer to Dispatcher object
68
69   Outputs:
70   None
71
72   Returns:
73   None
74********************************************************************/
75void __cl_disp_worker(IN void *context)
76{
77	cl_disp_msg_t *p_msg;
78	cl_dispatcher_t *p_disp = (cl_dispatcher_t *) context;
79
80	cl_spinlock_acquire(&p_disp->lock);
81
82	/* Process the FIFO until we drain it dry. */
83	while (cl_qlist_count(&p_disp->msg_fifo)) {
84		/* Pop the message at the head from the FIFO. */
85		p_msg =
86		    (cl_disp_msg_t *) cl_qlist_remove_head(&p_disp->msg_fifo);
87
88		/* we track the tim ethe last message spent in the queue */
89		p_disp->last_msg_queue_time_us =
90		    cl_get_time_stamp() - p_msg->in_time;
91
92		/*
93		 * Release the spinlock while the message is processed.
94		 * The user's callback may reenter the dispatcher
95		 * and cause the lock to be reaquired.
96		 */
97		cl_spinlock_release(&p_disp->lock);
98		p_msg->p_dest_reg->pfn_rcv_callback((void *)p_msg->p_dest_reg->
99						    context,
100						    (void *)p_msg->p_data);
101
102		cl_atomic_dec(&p_msg->p_dest_reg->ref_cnt);
103
104		/* The client has seen the data.  Notify the sender as appropriate. */
105		if (p_msg->pfn_xmt_callback) {
106			p_msg->pfn_xmt_callback((void *)p_msg->context,
107						(void *)p_msg->p_data);
108			cl_atomic_dec(&p_msg->p_src_reg->ref_cnt);
109		}
110
111		/* Grab the lock for the next iteration through the list. */
112		cl_spinlock_acquire(&p_disp->lock);
113
114		/* Return this message to the pool. */
115		cl_qpool_put(&p_disp->msg_pool, (cl_pool_item_t *) p_msg);
116	}
117
118	cl_spinlock_release(&p_disp->lock);
119}
120
121/********************************************************************
122 ********************************************************************/
123void cl_disp_construct(IN cl_dispatcher_t * const p_disp)
124{
125	CL_ASSERT(p_disp);
126
127	cl_qlist_init(&p_disp->reg_list);
128	cl_ptr_vector_construct(&p_disp->reg_vec);
129	cl_qlist_init(&p_disp->msg_fifo);
130	cl_spinlock_construct(&p_disp->lock);
131	cl_qpool_construct(&p_disp->msg_pool);
132}
133
134/********************************************************************
135 ********************************************************************/
136void cl_disp_shutdown(IN cl_dispatcher_t * const p_disp)
137{
138	CL_ASSERT(p_disp);
139
140	/* Stop the thread pool. */
141	cl_thread_pool_destroy(&p_disp->worker_threads);
142
143	/* Process all outstanding callbacks. */
144	__cl_disp_worker(p_disp);
145
146	/* Free all registration info. */
147	while (!cl_is_qlist_empty(&p_disp->reg_list))
148		free(cl_qlist_remove_head(&p_disp->reg_list));
149}
150
151/********************************************************************
152 ********************************************************************/
153void cl_disp_destroy(IN cl_dispatcher_t * const p_disp)
154{
155	CL_ASSERT(p_disp);
156
157	cl_spinlock_destroy(&p_disp->lock);
158	/* Destroy the message pool */
159	cl_qpool_destroy(&p_disp->msg_pool);
160	/* Destroy the pointer vector of registrants. */
161	cl_ptr_vector_destroy(&p_disp->reg_vec);
162}
163
164/********************************************************************
165 ********************************************************************/
166cl_status_t
167cl_disp_init(IN cl_dispatcher_t * const p_disp,
168	     IN const uint32_t thread_count, IN const char *const name)
169{
170	cl_status_t status;
171
172	CL_ASSERT(p_disp);
173
174	cl_disp_construct(p_disp);
175
176	status = cl_spinlock_init(&p_disp->lock);
177	if (status != CL_SUCCESS) {
178		cl_disp_destroy(p_disp);
179		return (status);
180	}
181
182	/* Specify no upper limit to the number of messages in the pool */
183	status = cl_qpool_init(&p_disp->msg_pool, CL_DISP_INITIAL_MSG_COUNT,
184			       0, CL_DISP_MSG_GROW_SIZE, sizeof(cl_disp_msg_t),
185			       NULL, NULL, NULL);
186	if (status != CL_SUCCESS) {
187		cl_disp_destroy(p_disp);
188		return (status);
189	}
190
191	status = cl_ptr_vector_init(&p_disp->reg_vec, CL_DISP_INITIAL_REG_COUNT,
192				    CL_DISP_REG_GROW_SIZE);
193	if (status != CL_SUCCESS) {
194		cl_disp_destroy(p_disp);
195		return (status);
196	}
197
198	status = cl_thread_pool_init(&p_disp->worker_threads, thread_count,
199				     __cl_disp_worker, p_disp, name);
200	if (status != CL_SUCCESS)
201		cl_disp_destroy(p_disp);
202
203	return (status);
204}
205
206/********************************************************************
207 ********************************************************************/
208cl_disp_reg_handle_t
209cl_disp_register(IN cl_dispatcher_t * const p_disp,
210		 IN const cl_disp_msgid_t msg_id,
211		 IN cl_pfn_msgrcv_cb_t pfn_callback OPTIONAL,
212		 IN const void *const context OPTIONAL)
213{
214	cl_disp_reg_info_t *p_reg;
215	cl_status_t status;
216
217	CL_ASSERT(p_disp);
218
219	/* Check that the requested registrant ID is available. */
220	cl_spinlock_acquire(&p_disp->lock);
221	if ((msg_id != CL_DISP_MSGID_NONE) &&
222	    (msg_id < cl_ptr_vector_get_size(&p_disp->reg_vec)) &&
223	    (cl_ptr_vector_get(&p_disp->reg_vec, msg_id))) {
224		cl_spinlock_release(&p_disp->lock);
225		return (NULL);
226	}
227
228	/* Get a registration info from the pool. */
229	p_reg = (cl_disp_reg_info_t *) malloc(sizeof(cl_disp_reg_info_t));
230	if (!p_reg) {
231		cl_spinlock_release(&p_disp->lock);
232		return (NULL);
233	} else {
234		memset(p_reg, 0, sizeof(cl_disp_reg_info_t));
235	}
236
237	p_reg->p_disp = p_disp;
238	p_reg->ref_cnt = 0;
239	p_reg->pfn_rcv_callback = pfn_callback;
240	p_reg->context = context;
241	p_reg->msg_id = msg_id;
242
243	/* Insert the registration in the list. */
244	cl_qlist_insert_tail(&p_disp->reg_list, (cl_list_item_t *) p_reg);
245
246	/* Set the array entry to the registrant. */
247	/* The ptr_vector grow automatically as necessary. */
248	if (msg_id != CL_DISP_MSGID_NONE) {
249		status = cl_ptr_vector_set(&p_disp->reg_vec, msg_id, p_reg);
250		if (status != CL_SUCCESS) {
251			free(p_reg);
252			cl_spinlock_release(&p_disp->lock);
253			return (NULL);
254		}
255	}
256
257	cl_spinlock_release(&p_disp->lock);
258
259	return (p_reg);
260}
261
262/********************************************************************
263 ********************************************************************/
264void cl_disp_unregister(IN const cl_disp_reg_handle_t handle)
265{
266	cl_disp_reg_info_t *p_reg;
267	cl_dispatcher_t *p_disp;
268
269	if (handle == CL_DISP_INVALID_HANDLE)
270		return;
271
272	p_reg = (cl_disp_reg_info_t *) handle;
273	p_disp = p_reg->p_disp;
274	CL_ASSERT(p_disp);
275
276	cl_spinlock_acquire(&p_disp->lock);
277	/*
278	 * Clear the registrant vector entry.  This will cause any further
279	 * post calls to fail.
280	 */
281	if (p_reg->msg_id != CL_DISP_MSGID_NONE) {
282		CL_ASSERT(p_reg->msg_id <
283			  cl_ptr_vector_get_size(&p_disp->reg_vec));
284		cl_ptr_vector_set(&p_disp->reg_vec, p_reg->msg_id, NULL);
285	}
286	cl_spinlock_release(&p_disp->lock);
287
288	while (p_reg->ref_cnt > 0)
289		cl_thread_suspend(1);
290
291	cl_spinlock_acquire(&p_disp->lock);
292	/* Remove the registrant from the list. */
293	cl_qlist_remove_item(&p_disp->reg_list, (cl_list_item_t *) p_reg);
294	/* Return the registration info to the pool */
295	free(p_reg);
296
297	cl_spinlock_release(&p_disp->lock);
298}
299
300/********************************************************************
301 ********************************************************************/
302cl_status_t
303cl_disp_post(IN const cl_disp_reg_handle_t handle,
304	     IN const cl_disp_msgid_t msg_id,
305	     IN const void *const p_data,
306	     IN cl_pfn_msgdone_cb_t pfn_callback OPTIONAL,
307	     IN const void *const context OPTIONAL)
308{
309	cl_disp_reg_info_t *p_src_reg = (cl_disp_reg_info_t *) handle;
310	cl_disp_reg_info_t *p_dest_reg;
311	cl_dispatcher_t *p_disp;
312	cl_disp_msg_t *p_msg;
313
314	p_disp = handle->p_disp;
315	CL_ASSERT(p_disp);
316	CL_ASSERT(msg_id != CL_DISP_MSGID_NONE);
317
318	cl_spinlock_acquire(&p_disp->lock);
319	/* Check that the recipient exists. */
320	p_dest_reg = cl_ptr_vector_get(&p_disp->reg_vec, msg_id);
321	if (!p_dest_reg) {
322		cl_spinlock_release(&p_disp->lock);
323		return (CL_NOT_FOUND);
324	}
325
326	/* Get a free message from the pool. */
327	p_msg = (cl_disp_msg_t *) cl_qpool_get(&p_disp->msg_pool);
328	if (!p_msg) {
329		cl_spinlock_release(&p_disp->lock);
330		return (CL_INSUFFICIENT_MEMORY);
331	}
332
333	/* Initialize the message */
334	p_msg->p_src_reg = p_src_reg;
335	p_msg->p_dest_reg = p_dest_reg;
336	p_msg->p_data = p_data;
337	p_msg->pfn_xmt_callback = pfn_callback;
338	p_msg->context = context;
339	p_msg->in_time = cl_get_time_stamp();
340
341	/*
342	 * Increment the sender's reference count if they request a completion
343	 * notification.
344	 */
345	if (pfn_callback)
346		cl_atomic_inc(&p_src_reg->ref_cnt);
347
348	/* Increment the recipient's reference count. */
349	cl_atomic_inc(&p_dest_reg->ref_cnt);
350
351	/* Queue the message in the FIFO. */
352	cl_qlist_insert_tail(&p_disp->msg_fifo, (cl_list_item_t *) p_msg);
353	cl_spinlock_release(&p_disp->lock);
354
355	/* Signal the thread pool that there is work to be done. */
356	cl_thread_pool_signal(&p_disp->worker_threads);
357	return (CL_SUCCESS);
358}
359
360void
361cl_disp_get_queue_status(IN const cl_disp_reg_handle_t handle,
362			 OUT uint32_t * p_num_queued_msgs,
363			 OUT uint64_t * p_last_msg_queue_time_ms)
364{
365	cl_dispatcher_t *p_disp = ((cl_disp_reg_info_t *) handle)->p_disp;
366
367	cl_spinlock_acquire(&p_disp->lock);
368
369	if (p_last_msg_queue_time_ms)
370		*p_last_msg_queue_time_ms =
371		    p_disp->last_msg_queue_time_us / 1000;
372
373	if (p_num_queued_msgs)
374		*p_num_queued_msgs = cl_qlist_count(&p_disp->msg_fifo);
375
376	cl_spinlock_release(&p_disp->lock);
377}
378