cl_dispatcher.c revision 321936
1139743Simp/*
243412Snewton * Copyright (c) 2004-2006 Voltaire, Inc. All rights reserved.
343412Snewton * Copyright (c) 2002-2005 Mellanox Technologies LTD. All rights reserved.
443412Snewton * Copyright (c) 1996-2003 Intel Corporation. All rights reserved.
543412Snewton *
643412Snewton * This software is available to you under a choice of one of two
743412Snewton * licenses.  You may choose to be licensed under the terms of the GNU
843412Snewton * General Public License (GPL) Version 2, available from the file
943412Snewton * COPYING in the main directory of this source tree, or the
1043412Snewton * OpenIB.org BSD license below:
1143412Snewton *
1243412Snewton *     Redistribution and use in source and binary forms, with or
1343412Snewton *     without modification, are permitted provided that the following
1443412Snewton *     conditions are met:
1543412Snewton *
1643412Snewton *      - Redistributions of source code must retain the above
1743412Snewton *        copyright notice, this list of conditions and the following
1843412Snewton *        disclaimer.
1943412Snewton *
2043412Snewton *      - Redistributions in binary form must reproduce the above
2143412Snewton *        copyright notice, this list of conditions and the following
2243412Snewton *        disclaimer in the documentation and/or other materials
2343412Snewton *        provided with the distribution.
2443412Snewton *
2543412Snewton * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
2643412Snewton * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
2749267Snewton * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
2850477Speter * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
2943412Snewton * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
3043412Snewton * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
3143412Snewton * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
3243412Snewton * SOFTWARE.
3343412Snewton *
3443412Snewton */
3543412Snewton
3643412Snewton/*
3743412Snewton * Abstract:
3843412Snewton *    Implementation of Dispatcher abstraction.
3943412Snewton *
4043412Snewton */
4143412Snewton
4243412Snewton#if HAVE_CONFIG_H
4343412Snewton#  include <config.h>
4443412Snewton#endif				/* HAVE_CONFIG_H */
4543412Snewton
4643412Snewton#include <stdlib.h>
4743412Snewton#include <complib/cl_dispatcher.h>
4843412Snewton#include <complib/cl_thread.h>
4943412Snewton#include <complib/cl_timer.h>
5043412Snewton
5143412Snewton/* give some guidance when we build our cl_pool of messages */
5243412Snewton#define CL_DISP_INITIAL_MSG_COUNT   256
5343412Snewton#define CL_DISP_MSG_GROW_SIZE       64
5443412Snewton
5543412Snewton/* give some guidance when we build our cl_pool of registration elements */
5643412Snewton#define CL_DISP_INITIAL_REG_COUNT   16
5743412Snewton#define CL_DISP_REG_GROW_SIZE       16
5843412Snewton
5943412Snewton/********************************************************************
6043412Snewton   __cl_disp_worker
6143412Snewton
6243412Snewton   Description:
6343412Snewton   This function takes messages off the FIFO and calls Processmsg()
6443412Snewton   This function executes as passive level.
6543412Snewton
6643412Snewton   Inputs:
6743412Snewton   p_disp - Pointer to Dispatcher object
6843412Snewton
6943412Snewton   Outputs:
7043412Snewton   None
7143412Snewton
7243412Snewton   Returns:
7343412Snewton   None
7443412Snewton********************************************************************/
7543412Snewtonvoid __cl_disp_worker(IN void *context)
7643412Snewton{
7743412Snewton	cl_disp_msg_t *p_msg;
7843412Snewton	cl_dispatcher_t *p_disp = (cl_dispatcher_t *) context;
7943412Snewton
8043412Snewton	cl_spinlock_acquire(&p_disp->lock);
8143412Snewton
8243412Snewton	/* Process the FIFO until we drain it dry. */
8343412Snewton	while (cl_qlist_count(&p_disp->msg_fifo)) {
8443412Snewton		/* Pop the message at the head from the FIFO. */
8543412Snewton		p_msg =
8643412Snewton		    (cl_disp_msg_t *) cl_qlist_remove_head(&p_disp->msg_fifo);
8743412Snewton
88		/* we track the tim ethe last message spent in the queue */
89		p_disp->last_msg_queue_time_us =
90		    cl_get_time_stamp() - p_msg->in_time;
91
92		/*
93		 * Release the spinlock while the message is processed.
94		 * The user's callback may reenter the dispatcher
95		 * and cause the lock to be reaquired.
96		 */
97		cl_spinlock_release(&p_disp->lock);
98		p_msg->p_dest_reg->pfn_rcv_callback((void *)p_msg->p_dest_reg->
99						    context,
100						    (void *)p_msg->p_data);
101
102		cl_atomic_dec(&p_msg->p_dest_reg->ref_cnt);
103
104		/* The client has seen the data.  Notify the sender as appropriate. */
105		if (p_msg->pfn_xmt_callback) {
106			p_msg->pfn_xmt_callback((void *)p_msg->context,
107						(void *)p_msg->p_data);
108			cl_atomic_dec(&p_msg->p_src_reg->ref_cnt);
109		}
110
111		/* Grab the lock for the next iteration through the list. */
112		cl_spinlock_acquire(&p_disp->lock);
113
114		/* Return this message to the pool. */
115		cl_qpool_put(&p_disp->msg_pool, (cl_pool_item_t *) p_msg);
116	}
117
118	cl_spinlock_release(&p_disp->lock);
119}
120
121void cl_disp_construct(IN cl_dispatcher_t * const p_disp)
122{
123	CL_ASSERT(p_disp);
124
125	cl_qlist_init(&p_disp->reg_list);
126	cl_ptr_vector_construct(&p_disp->reg_vec);
127	cl_qlist_init(&p_disp->msg_fifo);
128	cl_spinlock_construct(&p_disp->lock);
129	cl_qpool_construct(&p_disp->msg_pool);
130}
131
132void cl_disp_shutdown(IN cl_dispatcher_t * const p_disp)
133{
134	CL_ASSERT(p_disp);
135
136	/* Stop the thread pool. */
137	cl_thread_pool_destroy(&p_disp->worker_threads);
138
139	/* Process all outstanding callbacks. */
140	__cl_disp_worker(p_disp);
141
142	/* Free all registration info. */
143	while (!cl_is_qlist_empty(&p_disp->reg_list))
144		free(cl_qlist_remove_head(&p_disp->reg_list));
145}
146
147void cl_disp_destroy(IN cl_dispatcher_t * const p_disp)
148{
149	CL_ASSERT(p_disp);
150
151	cl_spinlock_destroy(&p_disp->lock);
152	/* Destroy the message pool */
153	cl_qpool_destroy(&p_disp->msg_pool);
154	/* Destroy the pointer vector of registrants. */
155	cl_ptr_vector_destroy(&p_disp->reg_vec);
156}
157
158cl_status_t cl_disp_init(IN cl_dispatcher_t * const p_disp,
159			 IN const uint32_t thread_count,
160			 IN const char *const name)
161{
162	cl_status_t status;
163
164	CL_ASSERT(p_disp);
165
166	cl_disp_construct(p_disp);
167
168	status = cl_spinlock_init(&p_disp->lock);
169	if (status != CL_SUCCESS) {
170		cl_disp_destroy(p_disp);
171		return (status);
172	}
173
174	/* Specify no upper limit to the number of messages in the pool */
175	status = cl_qpool_init(&p_disp->msg_pool, CL_DISP_INITIAL_MSG_COUNT,
176			       0, CL_DISP_MSG_GROW_SIZE, sizeof(cl_disp_msg_t),
177			       NULL, NULL, NULL);
178	if (status != CL_SUCCESS) {
179		cl_disp_destroy(p_disp);
180		return (status);
181	}
182
183	status = cl_ptr_vector_init(&p_disp->reg_vec, CL_DISP_INITIAL_REG_COUNT,
184				    CL_DISP_REG_GROW_SIZE);
185	if (status != CL_SUCCESS) {
186		cl_disp_destroy(p_disp);
187		return (status);
188	}
189
190	status = cl_thread_pool_init(&p_disp->worker_threads, thread_count,
191				     __cl_disp_worker, p_disp, name);
192	if (status != CL_SUCCESS)
193		cl_disp_destroy(p_disp);
194
195	return (status);
196}
197
198cl_disp_reg_handle_t cl_disp_register(IN cl_dispatcher_t * const p_disp,
199				      IN const cl_disp_msgid_t msg_id,
200				      IN cl_pfn_msgrcv_cb_t pfn_callback
201				      OPTIONAL,
202				      IN const void *const context OPTIONAL)
203{
204	cl_disp_reg_info_t *p_reg;
205	cl_status_t status;
206
207	CL_ASSERT(p_disp);
208
209	/* Check that the requested registrant ID is available. */
210	cl_spinlock_acquire(&p_disp->lock);
211	if ((msg_id != CL_DISP_MSGID_NONE) &&
212	    (msg_id < cl_ptr_vector_get_size(&p_disp->reg_vec)) &&
213	    (cl_ptr_vector_get(&p_disp->reg_vec, msg_id))) {
214		cl_spinlock_release(&p_disp->lock);
215		return (NULL);
216	}
217
218	/* Get a registration info from the pool. */
219	p_reg = (cl_disp_reg_info_t *) malloc(sizeof(cl_disp_reg_info_t));
220	if (!p_reg) {
221		cl_spinlock_release(&p_disp->lock);
222		return (NULL);
223	} else {
224		memset(p_reg, 0, sizeof(cl_disp_reg_info_t));
225	}
226
227	p_reg->p_disp = p_disp;
228	p_reg->ref_cnt = 0;
229	p_reg->pfn_rcv_callback = pfn_callback;
230	p_reg->context = context;
231	p_reg->msg_id = msg_id;
232
233	/* Insert the registration in the list. */
234	cl_qlist_insert_tail(&p_disp->reg_list, (cl_list_item_t *) p_reg);
235
236	/* Set the array entry to the registrant. */
237	/* The ptr_vector grow automatically as necessary. */
238	if (msg_id != CL_DISP_MSGID_NONE) {
239		status = cl_ptr_vector_set(&p_disp->reg_vec, msg_id, p_reg);
240		if (status != CL_SUCCESS) {
241			free(p_reg);
242			cl_spinlock_release(&p_disp->lock);
243			return (NULL);
244		}
245	}
246
247	cl_spinlock_release(&p_disp->lock);
248
249	return (p_reg);
250}
251
252void cl_disp_unregister(IN const cl_disp_reg_handle_t handle)
253{
254	cl_disp_reg_info_t *p_reg;
255	cl_dispatcher_t *p_disp;
256
257	if (handle == CL_DISP_INVALID_HANDLE)
258		return;
259
260	p_reg = (cl_disp_reg_info_t *) handle;
261	p_disp = p_reg->p_disp;
262	CL_ASSERT(p_disp);
263
264	cl_spinlock_acquire(&p_disp->lock);
265	/*
266	 * Clear the registrant vector entry.  This will cause any further
267	 * post calls to fail.
268	 */
269	if (p_reg->msg_id != CL_DISP_MSGID_NONE) {
270		CL_ASSERT(p_reg->msg_id <
271			  cl_ptr_vector_get_size(&p_disp->reg_vec));
272		cl_ptr_vector_set(&p_disp->reg_vec, p_reg->msg_id, NULL);
273	}
274	cl_spinlock_release(&p_disp->lock);
275
276	while (p_reg->ref_cnt > 0)
277		cl_thread_suspend(1);
278
279	cl_spinlock_acquire(&p_disp->lock);
280	/* Remove the registrant from the list. */
281	cl_qlist_remove_item(&p_disp->reg_list, (cl_list_item_t *) p_reg);
282	/* Return the registration info to the pool */
283	free(p_reg);
284
285	cl_spinlock_release(&p_disp->lock);
286}
287
288cl_status_t cl_disp_post(IN const cl_disp_reg_handle_t handle,
289			 IN const cl_disp_msgid_t msg_id,
290			 IN const void *const p_data,
291			 IN cl_pfn_msgdone_cb_t pfn_callback OPTIONAL,
292			 IN const void *const context OPTIONAL)
293{
294	cl_disp_reg_info_t *p_src_reg = (cl_disp_reg_info_t *) handle;
295	cl_disp_reg_info_t *p_dest_reg;
296	cl_dispatcher_t *p_disp;
297	cl_disp_msg_t *p_msg;
298
299	p_disp = handle->p_disp;
300	CL_ASSERT(p_disp);
301	CL_ASSERT(msg_id != CL_DISP_MSGID_NONE);
302
303	cl_spinlock_acquire(&p_disp->lock);
304	/* Check that the recipient exists. */
305	if (cl_ptr_vector_get_size(&p_disp->reg_vec) <= msg_id) {
306		cl_spinlock_release(&p_disp->lock);
307		return (CL_NOT_FOUND);
308	}
309
310	p_dest_reg = cl_ptr_vector_get(&p_disp->reg_vec, msg_id);
311	if (!p_dest_reg) {
312		cl_spinlock_release(&p_disp->lock);
313		return (CL_NOT_FOUND);
314	}
315
316	/* Get a free message from the pool. */
317	p_msg = (cl_disp_msg_t *) cl_qpool_get(&p_disp->msg_pool);
318	if (!p_msg) {
319		cl_spinlock_release(&p_disp->lock);
320		return (CL_INSUFFICIENT_MEMORY);
321	}
322
323	/* Initialize the message */
324	p_msg->p_src_reg = p_src_reg;
325	p_msg->p_dest_reg = p_dest_reg;
326	p_msg->p_data = p_data;
327	p_msg->pfn_xmt_callback = pfn_callback;
328	p_msg->context = context;
329	p_msg->in_time = cl_get_time_stamp();
330
331	/*
332	 * Increment the sender's reference count if they request a completion
333	 * notification.
334	 */
335	if (pfn_callback)
336		cl_atomic_inc(&p_src_reg->ref_cnt);
337
338	/* Increment the recipient's reference count. */
339	cl_atomic_inc(&p_dest_reg->ref_cnt);
340
341	/* Queue the message in the FIFO. */
342	cl_qlist_insert_tail(&p_disp->msg_fifo, (cl_list_item_t *) p_msg);
343	cl_spinlock_release(&p_disp->lock);
344
345	/* Signal the thread pool that there is work to be done. */
346	cl_thread_pool_signal(&p_disp->worker_threads);
347	return (CL_SUCCESS);
348}
349
350void cl_disp_get_queue_status(IN const cl_disp_reg_handle_t handle,
351			      OUT uint32_t * p_num_queued_msgs,
352			      OUT uint64_t * p_last_msg_queue_time_ms)
353{
354	cl_dispatcher_t *p_disp = ((cl_disp_reg_info_t *) handle)->p_disp;
355
356	cl_spinlock_acquire(&p_disp->lock);
357
358	if (p_last_msg_queue_time_ms)
359		*p_last_msg_queue_time_ms =
360		    p_disp->last_msg_queue_time_us / 1000;
361
362	if (p_num_queued_msgs)
363		*p_num_queued_msgs = cl_qlist_count(&p_disp->msg_fifo);
364
365	cl_spinlock_release(&p_disp->lock);
366}
367