dapl_evd_util.c revision 9517:b4839b0aa7a4
1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21
22/*
23 * Copyright (c) 2002-2003, Network Appliance, Inc. All rights reserved.
24 */
25
26/*
27 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
28 * Use is subject to license terms.
29 */
30
31/*
32 *
33 * MODULE: dapl_evd_util.c
34 *
35 * PURPOSE: Manage EVD Info structure
36 *
37 * $Id: dapl_evd_util.c,v 1.41 2003/08/20 13:18:36 sjs2 Exp $
38 */
39
40#include <sys/time.h>
41#include <strings.h>
42#include "dapl_evd_util.h"
43#include "dapl_ia_util.h"
44#include "dapl_cno_util.h"
45#include "dapl_ring_buffer_util.h"
46#include "dapl_adapter_util.h"
47#include "dapl_tavor_ibtf_impl.h"
48#include "dapl_cookie.h"
49#include "dapl.h"
50
51
52#ifdef	DAPL_DBG	/* For debugging.  */
53static void
54dapli_evd_eh_print_cqe(
55	IN  ib_work_completion_t	cqe);
56#endif
57
58static DAT_BOOLEAN
59dapli_evd_cqe_to_event(
60    IN DAPL_EVD			*evd_ptr,
61    IN ib_work_completion_t	*cqe_ptr,
62    IN DAT_BOOLEAN		process_premature_events,
63    OUT DAT_EVENT		*event_ptr);
64
65static DAT_RETURN
66dapli_evd_event_alloc(
67	IN  DAPL_EVD		*evd_ptr,
68	IN  DAPL_CNO		*cno_ptr,
69	IN  DAT_COUNT		qlen);
70
71
72/*
73 * dapls_evd_internal_create
74 *
75 * Actually create the EVD.  This is called after all parameter checking
76 * has been performed in dapl_evd_create.  It is also called from dapl_ia_open
77 * to create the default async evd.
78 *
79 * Input:
80 * 	ia_ptr
81 *	cno_ptr
82 *	qlen
83 *	evd_flags
84 *
85 * Output:
86 * 	evd_ptr_ptr
87 *
88 * Returns:
89 * 	none
90 *
91 */
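/*
 * A minimal usage sketch (illustrative only; the queue length and flag
 * values below are placeholders and error handling is elided):
 *
 *	DAPL_EVD	*evd_ptr;
 *	DAT_RETURN	dat_status;
 *
 *	dat_status = dapls_evd_internal_create(ia_ptr, NULL, 8,
 *	    DAT_EVD_ASYNC_FLAG, &evd_ptr);
 *	if (dat_status != DAT_SUCCESS)
 *		return (dat_status);
 */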
92
93DAT_RETURN
94dapls_evd_internal_create(
95    DAPL_IA		*ia_ptr,
96    DAPL_CNO		*cno_ptr,
97    DAT_COUNT		min_qlen,
98    DAT_EVD_FLAGS	evd_flags,
99    DAPL_EVD		**evd_ptr_ptr)
100{
101	DAPL_EVD	*evd_ptr;
102	DAT_COUNT	cq_len;
103	DAT_RETURN	dat_status;
104
105	dat_status	= DAT_SUCCESS;
106	*evd_ptr_ptr	= NULL;
107	cq_len		= min_qlen;
108
109	evd_ptr = dapls_evd_alloc(ia_ptr,
110	    cno_ptr,
111	    evd_flags,
112	    min_qlen);
113	if (!evd_ptr) {
114		dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,
115		    DAT_RESOURCE_MEMORY);
116		goto bail;
117	}
118
119	/*
120	 * If we are dealing with event streams besides a CQ event stream,
121	 * be conservative and set producer side locking.  Otherwise, no.
122	 */
123	evd_ptr->evd_producer_locking_needed =
124	    ((evd_flags & ~ (DAT_EVD_DTO_FLAG|DAT_EVD_RMR_BIND_FLAG)) != 0);
125
126	/* Before we setup any callbacks, transition state to OPEN.  */
127	evd_ptr->evd_state = DAPL_EVD_STATE_OPEN;
128
129	/*
130	 * we need to call cq_alloc even for connection/cr/async evds
131	 * since all the allocation happens there.
132	 */
133	dat_status = dapls_ib_cq_alloc(ia_ptr,
134	    evd_ptr, cno_ptr, &cq_len);
135	if (dat_status != DAT_SUCCESS) {
136		goto bail;
137	}
138
139	dat_status = dapls_ib_setup_async_callback(
140	    ia_ptr,
141	    DAPL_ASYNC_CQ_COMPLETION,
142	    (unsigned int *) evd_ptr->ib_cq_handle,
143	    (ib_async_handler_t)dapl_evd_dto_callback,
144	    evd_ptr);
145	if (dat_status != DAT_SUCCESS) {
146		goto bail;
147	}
148	/*
149	 * cq_notify is not required here since we go and poll the cq
150	 * anyway when evd_wait is called.
151	 * dat_status = dapls_set_cq_notify(ia_ptr, evd_ptr);
152	 */
153
154	/*
155	 * We now have an accurate count of events, so allocate them into
156	 * the EVD
157	 */
158	dat_status = dapli_evd_event_alloc(evd_ptr, cno_ptr, cq_len);
159	if (dat_status != DAT_SUCCESS) {
160		goto bail;
161	}
162
163	/* We're assuming success in the following.   */
164	dapl_os_assert(dat_status == DAT_SUCCESS);
165	dapl_ia_link_evd(ia_ptr, evd_ptr);
166	*evd_ptr_ptr = evd_ptr;
167
168bail:
169	if (dat_status != DAT_SUCCESS) {
170		if (evd_ptr) {
171			(void) dapls_evd_dealloc(evd_ptr);
172		}
173	}
174
175	return (dat_status);
176}
177
178/*
179 * dapls_evd_alloc
180 *
181 * alloc and initialize an EVD struct
182 *
183 * Input:
184 * 	ia
185 *
186 * Output:
187 * 	evd_ptr
188 *
189 * Returns:
190 * 	none
191 *
192 */
193DAPL_EVD *
194dapls_evd_alloc(
195    IN DAPL_IA		*ia_ptr,
196    IN DAPL_CNO		*cno_ptr,
197    IN DAT_EVD_FLAGS	evd_flags,
198    IN DAT_COUNT	qlen) /* ARGSUSED */
199{
200	DAPL_EVD	*evd_ptr;
201
202	evd_ptr    = NULL;
203
204	/* Allocate EVD */
205	evd_ptr = (DAPL_EVD *)dapl_os_alloc(sizeof (DAPL_EVD));
206	if (!evd_ptr) {
207		goto bail;
208	}
209
210	/* zero the structure */
211	(void) dapl_os_memzero(evd_ptr, sizeof (DAPL_EVD));
212
213	/*
214	 * initialize the header
215	 */
216	evd_ptr->header.provider		= ia_ptr->header.provider;
217	evd_ptr->header.magic			= DAPL_MAGIC_EVD;
218	evd_ptr->header.handle_type		= DAT_HANDLE_TYPE_EVD;
219	evd_ptr->header.owner_ia		= ia_ptr;
220	evd_ptr->header.user_context.as_64	= 0;
221	evd_ptr->header.user_context.as_ptr	= NULL;
222	dapl_llist_init_entry(&evd_ptr->header.ia_list_entry);
223	dapl_os_lock_init(&evd_ptr->header.lock);
224
225	/*
226	 * Initialize the body
227	 */
228	evd_ptr->evd_state	= DAPL_EVD_STATE_INITIAL;
229	evd_ptr->evd_flags	= evd_flags;
230	evd_ptr->evd_enabled	= DAT_TRUE;
231	evd_ptr->evd_waitable	= DAT_TRUE;
232	evd_ptr->evd_producer_locking_needed = 1; /* Conservative value.  */
233	evd_ptr->ib_cq_handle	= IB_INVALID_HANDLE;
234	evd_ptr->evd_ref_count	= 0;
235	evd_ptr->catastrophic_overflow = DAT_FALSE;
236	evd_ptr->qlen		= qlen;
237
238	dapl_llist_init_entry(&evd_ptr->cno_list_entry);
239	evd_ptr->completion_type = DAPL_EVD_STATE_THRESHOLD;
240	(void) dapl_os_wait_object_init(&evd_ptr->wait_object);
241
242bail:
243	return (evd_ptr);
244}
245
246
247/*
248 * dapli_evd_event_alloc
249 *
250 * alloc events into an EVD.
251 *
252 * Input:
253 * 	evd_ptr
254 *	qlen
255 *
256 * Output:
257 * 	NONE
258 *
259 * Returns:
260 * 	DAT_SUCCESS
261 *	ERROR
262 *
263 */
264DAT_RETURN
265dapli_evd_event_alloc(
266    IN DAPL_EVD		*evd_ptr,
267    IN  DAPL_CNO	*cno_ptr,
268    IN DAT_COUNT	qlen)
269{
270	DAT_EVENT	*event_ptr;
271	DAT_COUNT	i;
272	DAT_RETURN	dat_status;
273
274	dat_status = DAT_SUCCESS;
275	event_ptr  = NULL;
276
277	/* Allocate EVENTs */
278	event_ptr = (DAT_EVENT *) dapl_os_alloc(qlen * sizeof (DAT_EVENT));
279	if (!event_ptr) {
280		goto bail;
281	}
282	evd_ptr->events = event_ptr;
283	evd_ptr->qlen = qlen;
284
285	/* allocate free event queue */
286	dat_status = dapls_rbuf_alloc(&evd_ptr->free_event_queue, qlen);
287	if (dat_status != DAT_SUCCESS) {
288		goto bail;
289	}
290
291	/* allocate pending event queue */
292	dat_status = dapls_rbuf_alloc(&evd_ptr->pending_event_queue, qlen);
293	if (dat_status != DAT_SUCCESS) {
294		goto bail;
295	}
296
297	/* add events to free event queue */
298	for (i = 0; i < qlen; i++) {
299		dat_status = dapls_rbuf_add(&evd_ptr->free_event_queue,
300		    (void *)event_ptr);
301		dapl_os_assert(dat_status == DAT_SUCCESS);
302		event_ptr++;
303	}
304	evd_ptr->cq_notified = DAT_FALSE;
305	evd_ptr->cq_notified_when = 0;
306	evd_ptr->cno_active_count = 0;
307	if (cno_ptr != NULL) {
308		dapl_os_lock(&cno_ptr->header.lock);
309		dapl_llist_add_head(&cno_ptr->evd_list_head,
310		    &evd_ptr->cno_list_entry, evd_ptr);
311		/* Take a reference count on the CNO */
312		dapl_os_atomic_inc(&cno_ptr->cno_ref_count);
313		dapl_os_unlock(&cno_ptr->header.lock);
314	}
315	evd_ptr->cno_ptr = cno_ptr;
316	evd_ptr->threshold = 0;
317
318bail:
319	return (dat_status);
320}
321
322
323/*
324 * dapls_evd_dealloc
325 *
326 * Free the passed in EVD structure. If an error occurs, this function
327 * will clean up all of the internal data structures and report the
328 * error.
329 *
330 * Input:
331 * 	evd_ptr
332 *
333 * Output:
334 * 	none
335 *
336 * Returns:
337 * 	status
338 *
339 */
340DAT_RETURN
341dapls_evd_dealloc(
342    IN DAPL_EVD		*evd_ptr)
343{
344	DAT_RETURN	dat_status;
345	DAPL_IA	*ia_ptr;
346
347	dat_status = DAT_SUCCESS;
348
349	dapl_os_assert(evd_ptr->header.magic == DAPL_MAGIC_EVD);
350	dapl_os_assert(evd_ptr->evd_ref_count == 0);
351
352	/*
353	 * Destroy the CQ first, to keep any more callbacks from coming
354	 * up from it.
355	 */
356	if (evd_ptr->ib_cq_handle != IB_INVALID_HANDLE) {
357		ia_ptr = evd_ptr->header.owner_ia;
358
359		dat_status = dapls_ib_cq_free(ia_ptr, evd_ptr);
360		if (dat_status != DAT_SUCCESS) {
361			goto bail;
362		}
363	}
364
365	/*
366	 * We should now be safe to invalidate the EVD; reset the
367	 * magic to prevent reuse.
368	 */
369	evd_ptr->header.magic = DAPL_MAGIC_INVALID;
370
371	/* Release reference on the CNO if it exists */
372	if (evd_ptr->cno_ptr != NULL) {
373		dapl_os_lock(&evd_ptr->cno_ptr->header.lock);
374		(void) dapl_llist_remove_entry(&evd_ptr->cno_ptr->evd_list_head,
375		    &evd_ptr->cno_list_entry);
376		dapl_os_atomic_dec(&evd_ptr->cno_ptr->cno_ref_count);
377		dapl_os_unlock(&evd_ptr->cno_ptr->header.lock);
378	}
379
380	/*
381	 * If the ring buffer allocation failed, then the dapls_rbuf_destroy
382	 * function will detect that the ring buffer's internal data (e.g. the
383	 * base pointer) is invalid and will handle the situation appropriately.
384	 */
385	dapls_rbuf_destroy(&evd_ptr->free_event_queue);
386	dapls_rbuf_destroy(&evd_ptr->pending_event_queue);
387
388	if (evd_ptr->events) {
389		dapl_os_free(evd_ptr->events,
390		    evd_ptr->qlen * sizeof (DAT_EVENT));
391	}
392
393	(void) dapl_os_wait_object_destroy(&evd_ptr->wait_object);
394	dapl_os_free(evd_ptr, sizeof (DAPL_EVD));
395
396bail:
397	return (dat_status);
398}
399
400
401/*
402 * dapli_evd_eh_print_cqe
403 *
404 * Input:
405 *	cqe
406 *
407 * Output:
408 *	none
409 *
410 * Prints out a CQE for debug purposes
411 *
412 */
413
414#ifdef	DAPL_DBG	/* For debugging.  */
415void
416dapli_evd_eh_print_cqe(
417    IN 	ib_work_completion_t	cqe)
418{
419	static char *optable[] = {
420		"",
421		"OP_SEND",
422		"OP_RDMA_READ",
423		"OP_RDMA_WRITE",
424		"OP_COMP_AND_SWAP",
425		"OP_FETCH_AND_ADD",
426		"OP_BIND_MW",
427		"OP_RECEIVE",
428		"OP_RECEIVE_RDMAWI",
429		0
430	};
431	DAPL_COOKIE		*dto_cookie;
432
433	dto_cookie = (DAPL_COOKIE *) (uintptr_t)DAPL_GET_CQE_WRID(&cqe);
434
435	dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
436	    "\t >>>>>>>>>>>>>>>>>>>>>>><<<<<<<<<<<<<<<<<<<\n");
437	dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
438	    "\t dapl_evd_dto_callback : CQE \n");
439	dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
440	    "\t\t work_req_id 0x%llx\n", DAPL_GET_CQE_WRID(&cqe));
441	dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
442	    "\t\t op_type: %s\n", optable[DAPL_GET_CQE_OPTYPE(&cqe)]);
443	if ((DAPL_GET_CQE_OPTYPE(&cqe) == OP_SEND) ||
444	    (DAPL_GET_CQE_OPTYPE(&cqe) == OP_RDMA_WRITE)) {
445		dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
446		    "\t\t bytes_num %d\n", dto_cookie->val.dto.size);
447	} else {
448		dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
449		    "\t\t bytes_num %d\n", DAPL_GET_CQE_BYTESNUM(&cqe));
450	}
451	dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
452	    "\t\t status %d\n", DAPL_GET_CQE_STATUS(&cqe));
453	dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
454	    "\t >>>>>>>>>>>>>>>>>>>>>>><<<<<<<<<<<<<<<<<<<\n");
455}
456#endif
457
458/*
459 * Event posting code follows.
460 */
461
462/*
463 * These next two functions (dapli_evd_get_event and dapli_evd_post_event)
464 * are a pair.  They are always called together, from one of the functions
465 * at the end of this file (dapl_evd_post_*_event).
466 *
467 * Note that if producer side locking is enabled, the first one takes the
468 * EVD lock and the second releases it.
469 */
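/*
 * A minimal sketch of that pattern (it mirrors the dapls_evd_post_*_event
 * helpers below; the event number used here is just an example):
 *
 *	event_ptr = dapli_evd_get_event(evd_ptr);    (may take the EVD lock)
 *	if (event_ptr == NULL) {
 *		... post an overflow event instead ...
 *	} else {
 *		event_ptr->event_number = DAT_SOFTWARE_EVENT;
 *		... fill in event_ptr->event_data ...
 *		dapli_evd_post_event(evd_ptr, event_ptr); (releases the lock)
 *	}
 */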
470
471/*
472 * dapli_evd_get_event
473 *
474 * Get an event struct from the evd.  The caller should fill in the event
475 * and call dapl_evd_post_event.
476 *
477 * If there are no events available, an overflow event is generated to the
478 * async EVD handler.
479 *
480 * If this EVD required producer locking, a successful return implies
481 * that the lock is held.
482 *
483 * Input:
484 * 	evd_ptr
485 *
486 * Output:
487 *	event
488 *
489 */
490
491static DAT_EVENT *
492dapli_evd_get_event(
493    DAPL_EVD *evd_ptr)
494{
495	DAT_EVENT	*event;
496
497	if (evd_ptr->evd_producer_locking_needed) {
498		dapl_os_lock(&evd_ptr->header.lock);
499	}
500
501	event = (DAT_EVENT *)dapls_rbuf_remove(&evd_ptr->free_event_queue);
502
503	/* Release the lock if it was taken and the call failed.  */
504	if (!event && evd_ptr->evd_producer_locking_needed) {
505		dapl_os_unlock(&evd_ptr->header.lock);
506	}
507
508	return (event);
509}
510
511/*
512 * dapli_evd_post_event
513 *
514 * Post the <event> to the evd.  If possible, invoke the evd's CNO.
515 * Otherwise post the event on the pending queue.
516 *
517 * If producer side locking is required, the EVD lock must be held upon
518 * entry to this function.
519 *
520 * Input:
521 * 	evd_ptr
522 * 	event
523 *
524 * Output:
525 *	none
526 *
527 */
528
529static void
530dapli_evd_post_event(
531    IN	DAPL_EVD	*evd_ptr,
532    IN	const DAT_EVENT	*event_ptr)
533{
534	DAT_RETURN	dat_status;
535	DAPL_CNO 	*cno_to_trigger = NULL;
536
537	dapl_dbg_log(DAPL_DBG_TYPE_EVD,
538	    "dapli_evd_post_event: Called with event # %x\n",
539	    event_ptr->event_number);
540
541	dat_status = dapls_rbuf_add(&evd_ptr->pending_event_queue,
542	    (void *)event_ptr);
543	dapl_os_assert(dat_status == DAT_SUCCESS);
544
545	dapl_os_assert(evd_ptr->evd_state == DAPL_EVD_STATE_WAITED ||
546	    evd_ptr->evd_state == DAPL_EVD_STATE_OPEN);
547
548	if (evd_ptr->evd_state == DAPL_EVD_STATE_OPEN) {
549		/* No waiter.  Arrange to trigger a CNO if it exists.  */
550
551		if (evd_ptr->evd_enabled) {
552			cno_to_trigger = evd_ptr->cno_ptr;
553		}
554		if (evd_ptr->evd_producer_locking_needed) {
555			dapl_os_unlock(&evd_ptr->header.lock);
556		}
557	} else {
558		/*
559		 * This routine gets called
560		 *  - In the context of the waiting thread when CQ, CM or ASYNC
561		 *    events need to be put on to the EVD ring buffer.
562		 *  - Due to a post of a software event.
563		 *
564		 * In the first case the waiting thread is pulling the events
565		 * from various streams into the evd so there is no need to
566		 * wake any thread. In the second case if the evd is in waited
567		 * state then we need to wakeup the waiting thread.
568		 */
569		if (event_ptr->event_number == DAT_SOFTWARE_EVENT) {
570			/*
571			 * We're in DAPL_EVD_STATE_WAITED.  Take the lock if
572			 * we don't have it, recheck, and signal.
573			 */
574
575			if (!evd_ptr->evd_producer_locking_needed) {
576				dapl_os_lock(&evd_ptr->header.lock);
577			}
578
579			if (evd_ptr->evd_state == DAPL_EVD_STATE_WAITED) {
580				dapl_os_unlock(&evd_ptr->header.lock);
581				(void) dapls_ib_event_wakeup(evd_ptr);
582			} else {
583				dapl_os_unlock(&evd_ptr->header.lock);
584			}
585		} else {
586			if (evd_ptr->evd_producer_locking_needed) {
587				dapl_os_unlock(&evd_ptr->header.lock);
588			}
589		}
590	}
591
592	if (cno_to_trigger != NULL) {
593		dapl_cno_trigger(cno_to_trigger, evd_ptr);
594	}
595}
596
597/*
598 * dapli_evd_post_event_nosignal
599 *
600 * Post the <event> to the evd.  Do not do any wakeup processing.
601 * This function should only be called if it is known that there are
602 * no waiters on this EVD that would be appropriate to wake up.  An example
603 * of such a situation is during internal dat_evd_wait() processing.
604 *
605 * If producer side locking is required, the EVD lock must be held upon
606 * entry to this function.
607 *
608 * Input:
609 * 	evd_ptr
610 * 	event
611 *
612 * Output:
613 *	none
614 *
615 */
616
617static void
618dapli_evd_post_event_nosignal(
619    IN	DAPL_EVD	*evd_ptr,
620    IN	const DAT_EVENT	*event_ptr)
621{
622	DAT_RETURN	dat_status;
623
624	dapl_dbg_log(DAPL_DBG_TYPE_EVD,
625	    "dapli_evd_post_event: Called with event # %x\n",
626	    event_ptr->event_number);
627
628	dat_status = dapls_rbuf_add(&evd_ptr->pending_event_queue,
629	    (void *)event_ptr);
630	dapl_os_assert(dat_status == DAT_SUCCESS);
631
632	dapl_os_assert(evd_ptr->evd_state == DAPL_EVD_STATE_WAITED ||
633	    evd_ptr->evd_state == DAPL_EVD_STATE_OPEN);
634
635	if (evd_ptr->evd_producer_locking_needed) {
636		dapl_os_unlock(&evd_ptr->header.lock);
637	}
638}
639
640/*
641 * dapli_evd_format_overflow_event
642 *
643 * format an overflow event for posting
644 *
645 * Input:
646 * 	evd_ptr
647 * 	event_ptr
648 *
649 * Output:
650 *	none
651 *
652 */
653static void
654dapli_evd_format_overflow_event(
655	IN  DAPL_EVD  *evd_ptr,
656	OUT DAT_EVENT *event_ptr)
657{
658	DAPL_IA *ia_ptr;
659
660	ia_ptr = evd_ptr->header.owner_ia;
661
662	event_ptr->evd_handle   = (DAT_EVD_HANDLE)evd_ptr;
663	event_ptr->event_number = DAT_ASYNC_ERROR_EVD_OVERFLOW;
664	event_ptr->event_data.asynch_error_event_data.dat_handle =
665	    (DAT_HANDLE)ia_ptr;
666}
667
668/*
669 * dapli_evd_post_overflow_event
670 *
671 * post an overflow event
672 *
673 * Input:
674 * 	async_evd_ptr
675 * 	evd_ptr
676 *
677 * Output:
678 *	none
679 *
680 */
681static void
682dapli_evd_post_overflow_event(
683    IN  DAPL_EVD  *async_evd_ptr,
684    IN  DAPL_EVD  *overflow_evd_ptr)
685{
686	DAT_EVENT *overflow_event;
687
688	/*
689	 * The overflow_evd_ptr might be the same as the evd.
690	 * In that case we've got a catastrophic overflow.
691	 */
692	if (async_evd_ptr == overflow_evd_ptr) {
693		async_evd_ptr->catastrophic_overflow = DAT_TRUE;
694		async_evd_ptr->evd_state = DAPL_EVD_STATE_DEAD;
695		return;
696	}
697
698	overflow_event = dapli_evd_get_event(overflow_evd_ptr);
699	if (!overflow_event) {
700		/* this is not good */
701		overflow_evd_ptr->catastrophic_overflow = DAT_TRUE;
702		overflow_evd_ptr->evd_state = DAPL_EVD_STATE_DEAD;
703		return;
704	}
705	dapli_evd_format_overflow_event(overflow_evd_ptr, overflow_event);
706	dapli_evd_post_event(overflow_evd_ptr, overflow_event);
707}
708
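/*
 * dapli_evd_get_and_init_event
 *
 * Get a free event from the EVD and initialize its evd_handle and
 * event_number fields.  If no event is available, post an overflow
 * event to the IA's async error EVD and return NULL.
 *
 * Input:
 *	evd_ptr
 *	event_number
 *
 * Output:
 *	none
 *
 * Returns:
 *	event pointer, or NULL on overflow
 *
 */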
709static DAT_EVENT *
710dapli_evd_get_and_init_event(
711    IN DAPL_EVD				*evd_ptr,
712    IN DAT_EVENT_NUMBER			event_number)
713{
714	DAT_EVENT 		*event_ptr;
715
716	event_ptr = dapli_evd_get_event(evd_ptr);
717	if (NULL == event_ptr) {
718		dapli_evd_post_overflow_event(
719		    evd_ptr->header.owner_ia->async_error_evd, evd_ptr);
720	} else {
721		event_ptr->evd_handle = (DAT_EVD_HANDLE) evd_ptr;
722		event_ptr->event_number = event_number;
723	}
724
725	return (event_ptr);
726}
727
728DAT_RETURN
729dapls_evd_post_cr_arrival_event(
730    IN DAPL_EVD				*evd_ptr,
731    IN DAT_EVENT_NUMBER			event_number,
732    IN DAT_SP_HANDLE			sp_handle,
733    DAT_IA_ADDRESS_PTR			ia_address_ptr,
734    DAT_CONN_QUAL			conn_qual,
735    DAT_CR_HANDLE			cr_handle)
736{
737	DAT_EVENT 		*event_ptr;
738	event_ptr = dapli_evd_get_and_init_event(evd_ptr, event_number);
739	/*
740	 * Note: the event lock may be held on successful return,
741	 * to be released by dapli_evd_post_event(), if producer side locking
742	 * is needed.
743	 */
744
745	if (!event_ptr) {
746		return (DAT_INSUFFICIENT_RESOURCES | DAT_RESOURCE_MEMORY);
747	}
748
749	event_ptr->event_data.cr_arrival_event_data.sp_handle = sp_handle;
750	event_ptr->event_data.cr_arrival_event_data.local_ia_address_ptr
751	    = ia_address_ptr;
752	event_ptr->event_data.cr_arrival_event_data.conn_qual = conn_qual;
753	event_ptr->event_data.cr_arrival_event_data.cr_handle = cr_handle;
754
755	dapli_evd_post_event(evd_ptr, event_ptr);
756	return (DAT_SUCCESS);
757}
758
759
760DAT_RETURN
761dapls_evd_post_connection_event(
762    IN DAPL_EVD				*evd_ptr,
763    IN DAT_EVENT_NUMBER			event_number,
764    IN DAT_EP_HANDLE			ep_handle,
765    IN DAT_COUNT			private_data_size,
766    IN DAT_PVOID			private_data)
767{
768	DAT_EVENT 		*event_ptr;
769	event_ptr = dapli_evd_get_and_init_event(evd_ptr, event_number);
770	/*
771	 * Note: the event lock may be held on successful return,
772	 * to be released by dapli_evd_post_event(), if producer side locking
773	 * is needed.
774	 */
775
776	if (!event_ptr) {
777		return (DAT_INSUFFICIENT_RESOURCES | DAT_RESOURCE_MEMORY);
778	}
779
780	event_ptr->event_data.connect_event_data.ep_handle = ep_handle;
781	event_ptr->event_data.connect_event_data.private_data_size
782	    = private_data_size;
783	event_ptr->event_data.connect_event_data.private_data = private_data;
784
785	dapli_evd_post_event(evd_ptr, event_ptr);
786	return (DAT_SUCCESS);
787}
788
789
790DAT_RETURN
791dapls_evd_post_async_error_event(
792    IN DAPL_EVD				*evd_ptr,
793    IN DAT_EVENT_NUMBER			event_number,
794    IN DAT_IA_HANDLE			ia_handle)
795{
796	DAT_EVENT 		*event_ptr;
797	event_ptr = dapli_evd_get_and_init_event(evd_ptr, event_number);
798	/*
799	 * Note: the event lock may be held on successful return,
800	 * to be released by dapli_evd_post_event(), if producer side locking
801	 * is needed.
802	 */
803
804	if (!event_ptr) {
805		return (DAT_INSUFFICIENT_RESOURCES | DAT_RESOURCE_MEMORY);
806	}
807
808	event_ptr->event_data.asynch_error_event_data.dat_handle = ia_handle;
809
810	dapli_evd_post_event(evd_ptr, event_ptr);
811	return (DAT_SUCCESS);
812}
813
814
815DAT_RETURN
816dapls_evd_post_software_event(
817    IN DAPL_EVD				*evd_ptr,
818    IN DAT_EVENT_NUMBER			event_number,
819    IN DAT_PVOID			pointer)
820{
821	DAT_EVENT 		*event_ptr;
822	event_ptr = dapli_evd_get_and_init_event(evd_ptr, event_number);
823	/*
824	 * Note: the event lock may be held on successful return,
825	 * to be released by dapli_evd_post_event(), if producer side locking
826	 * is needed.
827	 */
828
829	if (!event_ptr) {
830		return (DAT_QUEUE_FULL);
831	}
832
833	event_ptr->event_data.software_event_data.pointer = pointer;
834
835	dapli_evd_post_event(evd_ptr, event_ptr);
836	return (DAT_SUCCESS);
837}
838
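/*
 * dapls_evd_post_premature_events
 *
 * Flush any premature (pre-connection-established) receive completions
 * stored for this EP (or its SRQ) onto the EP's recv EVD.
 *
 * Input:
 *	ep_ptr
 *
 * Output:
 *	none
 *
 * Returns:
 *	none
 *
 */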
839void
840dapls_evd_post_premature_events(IN DAPL_EP *ep_ptr)
841{
842	DAPL_EVD		*evd_ptr;
843	DAT_EVENT		*event;
844	ib_work_completion_t	*cqe;
845	uint32_t		qpn;
846	int			prm_idx;
847	int			nevents;
848	int			i;
849
850	dapls_ib_poll_premature_events(ep_ptr, &cqe, &nevents);
851	/* premature events are always recv events */
852	evd_ptr = ep_ptr->param.recv_evd_handle;
853	qpn = ep_ptr->qpn;
854
855	i = 0;
856	prm_idx = 0;
857	while (i < nevents) {
858		/*
859		 * If srq_attached, premature events cannot exceed max_recv_dtos
860		 */
861		dapl_os_assert(!ep_ptr->srq_attached ||
862		    (prm_idx <= ((DAPL_SRQ *)ep_ptr->param.srq_handle)->
863		    param.max_recv_dtos));
864
865		/*
866		 * The SRQ premature event list could potentially have
867		 * holes (ie. free entries in the middle) or premature
868		 * events for other QPs. These need to be skipped.
869		 */
870		if (ep_ptr->srq_attached &&
871		    (!DAPL_CQE_IS_VALID(&cqe[prm_idx]) ||
872		    (DAPL_GET_CQE_QPN(&cqe[prm_idx]) != qpn))) {
873			prm_idx++;
874			continue;
875		}
876
877		dapl_dbg_log(DAPL_DBG_TYPE_DTO_COMP_ERR,
878		    " Premature DTO processing\n");
879
880#ifdef	DAPL_DBG	/* For debugging.  */
881		dapli_evd_eh_print_cqe(cqe[i]);
882#endif
883		/*
884		 * Can use DAT_DTO_COMPLETION_EVENT because
885		 * dapli_evd_cqe_to_event will overwrite.
886		 */
887		event = dapli_evd_get_and_init_event(evd_ptr,
888		    DAT_DTO_COMPLETION_EVENT);
889		if (event == NULL) {
890			/* We've already attempted the overflow post, return */
891			return;
892		}
893		(void) dapli_evd_cqe_to_event(evd_ptr, &cqe[i], DAT_TRUE,
894		    event);
895		dapli_evd_post_event_nosignal(evd_ptr, event);
896		/*
897		 * For SRQ attached QPs recycle the premature event
898		 */
899		if (ep_ptr->srq_attached) {
900			dapls_ib_free_premature_events(ep_ptr, prm_idx);
901			prm_idx++;
902		}
903		i++;
904	}
905}
906
907/*
908 * dapli_evd_cqe_to_event
909 *
910 * Convert a CQE into an event structure.
911 *
912 * Input:
913 *	evd_ptr
914 * 	cqe_ptr
915 *
916 * Output:
917 * 	event_ptr
918 *
919 * Returns:
920 * 	none
921 *
922 */
923static DAT_BOOLEAN
924dapli_evd_cqe_to_event(
925    IN DAPL_EVD			*evd_ptr,
926    IN ib_work_completion_t	*cqe_ptr,
927    IN DAT_BOOLEAN		process_premature_events,
928    OUT DAT_EVENT		*event_ptr)
929{
930	DAPL_EP			*ep_ptr;
931	DAPL_SRQ		*srq_ptr;
932	DAPL_COOKIE		*cookie;
933	DAT_EP_STATE		ep_state;
934	ib_qp_handle_t		qp;
935	ib_uint32_t		ib_status;
936	ib_uint32_t		ibtype;
937	int			srq_enabled;
938	int			dto_error = 0;
939
940
941	/*
942	 * All that can be relied on if the status is bad is the status
943	 * and WRID.
944	 */
945	ib_status = DAPL_GET_CQE_STATUS(cqe_ptr);
946
947	cookie = (DAPL_COOKIE *)((uintptr_t)DAPL_GET_CQE_WRID(cqe_ptr));
948	dapl_os_assert((NULL != cookie));
949
950	if (cookie->queue_type == DAPL_COOKIE_QUEUE_EP) {
951		srq_enabled = 0;
952		ep_ptr = cookie->queue.ep;
953	} else {
954		srq_enabled = 1;
955		srq_ptr = cookie->queue.srq;
956		dapl_os_assert(NULL != srq_ptr);
957		dapl_os_assert(srq_ptr->header.magic == DAPL_MAGIC_SRQ);
958		ib_status = DAPL_GET_CQE_STATUS(cqe_ptr);
959		ep_ptr = dapls_ib_srq_lookup_ep(srq_ptr, cqe_ptr);
960	}
961
962	dapl_os_assert((NULL != ep_ptr));
963	dapl_os_assert((ep_ptr->header.magic == DAPL_MAGIC_EP) ||
964	    (ep_ptr->header.magic == DAPL_MAGIC_EP_EXIT));
965
966	event_ptr->evd_handle = (DAT_EVD_HANDLE) evd_ptr;
967
968	/*
969	 * Check if the DTO completion arrived before CONNECTION_ESTABLISHED
970	 * event -
971	 *
972	 * Send DTOs can occur only if the ep state is CONNECTED/DISCONNECTED,
973	 * therefore they cannot complete before the connection established
974	 * event.  A receive DTO, however, can potentially complete before the
975	 * connection established event has been delivered to the client.  In
976	 * that case, if the ep state is ACTIVE_CONNECTION_PENDING (active side)
977	 * or COMPLETION_PENDING (passive side), the event is put in a special
978	 * event queue in the qp_handle.
979	 *
980	 */
981	if (!process_premature_events &&
982	    (cookie->type == DAPL_COOKIE_TYPE_DTO) &&
983	    (ib_status == IB_COMP_ST_SUCCESS)) {
984		ep_state = ep_ptr->param.ep_state;
985		qp = ep_ptr->qp_handle;
986		if ((ep_state == DAT_EP_STATE_ACTIVE_CONNECTION_PENDING) ||
987		    (ep_state == DAT_EP_STATE_COMPLETION_PENDING) ||
988		    (qp->qp_num_premature_events > 0)) {
989			/*
990			 * not yet ready to put the event in the evd ring
991			 * buffer
992			 */
993			dapls_ib_store_premature_events(qp, cqe_ptr);
994			return (DAT_FALSE);
995		}
996	}
997
998	switch (cookie->type) {
999	case DAPL_COOKIE_TYPE_DTO:
1000	{
1001		DAPL_COOKIE_BUFFER	*buffer;
1002
1003		if (DAPL_DTO_TYPE_RECV == cookie->val.dto.type) {
1004			if (srq_enabled) {
1005				dapl_os_atomic_dec(&srq_ptr->recv_count);
1006				buffer = &srq_ptr->recv_buffer;
1007			} else {
1008				dapl_os_atomic_dec(&ep_ptr->recv_count);
1009				buffer = &ep_ptr->recv_buffer;
1010			}
1011		} else {
1012			dapl_os_atomic_dec(&ep_ptr->req_count);
1013			buffer = &ep_ptr->req_buffer;
1014		}
1015
1016		event_ptr->event_number = DAT_DTO_COMPLETION_EVENT;
1017		event_ptr->event_data.dto_completion_event_data.ep_handle =
1018		    ep_ptr;
1019		event_ptr->event_data.dto_completion_event_data.user_cookie =
1020		    cookie->val.dto.cookie;
1021
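		/* Map the IB completion status onto a DAT DTO status */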
1022		switch (ib_status) {
1023		case IB_COMP_ST_SUCCESS:
1024		{
1025			ibtype = DAPL_GET_CQE_OPTYPE(cqe_ptr);
1026
1027			event_ptr->event_data.dto_completion_event_data.status =
1028			    DAT_DTO_SUCCESS;
1029			dapl_os_assert((ibtype == OP_SEND &&
1030			    cookie->val.dto.type == DAPL_DTO_TYPE_SEND) ||
1031			    (ibtype == OP_RECEIVE &&
1032			    cookie->val.dto.type == DAPL_DTO_TYPE_RECV) ||
1033			    (ibtype == OP_RDMA_WRITE &&
1034			    cookie->val.dto.type ==
1035			    DAPL_DTO_TYPE_RDMA_WRITE) ||
1036			    (ibtype == OP_RDMA_READ &&
1037			    cookie->val.dto.type ==
1038			    DAPL_DTO_TYPE_RDMA_READ));
1039			break;
1040		}
1041		case IB_COMP_ST_LOCAL_LEN_ERR:
1042		{
1043			event_ptr->event_data.dto_completion_event_data.status =
1044			    DAT_DTO_ERR_LOCAL_LENGTH;
1045			break;
1046		}
1047		case IB_COMP_ST_LOCAL_PROTECT_ERR:
1048		{
1049			event_ptr->event_data.dto_completion_event_data.status =
1050			    DAT_DTO_ERR_LOCAL_PROTECTION;
1051			break;
1052		}
1053		case IB_COMP_ST_WR_FLUSHED_ERR:
1054		{
1055			event_ptr->event_data.dto_completion_event_data.status =
1056			    DAT_DTO_ERR_FLUSHED;
1057			break;
1058		}
1059		case IB_COMP_ST_BAD_RESPONSE_ERR:
1060		{
1061			event_ptr->event_data.dto_completion_event_data.status =
1062			    DAT_DTO_ERR_BAD_RESPONSE;
1063			break;
1064		}
1065		case IB_COMP_ST_REM_REQ_ERR:
1066		case IB_COMP_ST_REM_OP_ERR:
1067		{
1068			event_ptr->event_data.dto_completion_event_data.status =
1069			    DAT_DTO_ERR_REMOTE_RESPONDER;
1070			break;
1071		}
1072		case IB_COMP_ST_REM_ACC_ERR:
1073		{
1074			event_ptr->event_data.dto_completion_event_data.status =
1075			    DAT_DTO_ERR_REMOTE_ACCESS;
1076			break;
1077		}
1078		/*
1079		 * Unsupported RD errors
1080		 * case IB_COMP_ST_EE_STATE_ERR:
1081		 * case IB_COMP_ST_EE_CTX_NO_ERR:
1082		 */
1083		case IB_COMP_ST_TRANSP_COUNTER:
1084		{
1085			event_ptr->event_data.dto_completion_event_data.status =
1086			    DAT_DTO_ERR_TRANSPORT;
1087			break;
1088		}
1089		case IB_COMP_ST_RNR_COUNTER:
1090		{
1091			event_ptr->event_data.dto_completion_event_data.status =
1092			    DAT_DTO_ERR_RECEIVER_NOT_READY;
1093			break;
1094		}
1095		case IB_COMP_ST_MW_BIND_ERR:
1096		{
1097			event_ptr->event_data.dto_completion_event_data.status =
1098			    DAT_RMR_OPERATION_FAILED;
1099			break;
1100		}
1101		case IB_COMP_ST_LOCAL_OP_ERR:
1102		{
1103			event_ptr->event_data.dto_completion_event_data.status =
1104			    DAT_DTO_ERR_LOCAL_EP;
1105			break;
1106		}
1107		default:
1108		{
1109			dapl_dbg_log(DAPL_DBG_TYPE_DTO_COMP_ERR,
1110			    " DTO completion ERROR: %d: op %#x\n",
1111			    DAPL_GET_CQE_STATUS(cqe_ptr),
1112			    DAPL_GET_CQE_OPTYPE(cqe_ptr));
1113			event_ptr->event_data.dto_completion_event_data.status =
1114			    DAT_DTO_FAILURE;
1115			break;
1116		}
1117		}
1118
1119		/* Most error DTO ops result in disconnecting the EP */
1120		if ((event_ptr->event_data.dto_completion_event_data.status !=
1121		    DAT_DTO_SUCCESS) &&
1122		    (event_ptr->event_data.dto_completion_event_data.status !=
1123		    DAT_RMR_OPERATION_FAILED)) {
1124			dto_error = 1;
1125			dapl_dbg_log(DAPL_DBG_TYPE_DTO_COMP_ERR,
1126			    " DTO completion ERROR: %d: op %#x\n",
1127			    DAPL_GET_CQE_STATUS(cqe_ptr),
1128			    DAPL_GET_CQE_OPTYPE(cqe_ptr));
1129		}
1130
1131		if (cookie->val.dto.type == DAPL_DTO_TYPE_SEND ||
1132		    cookie->val.dto.type == DAPL_DTO_TYPE_RDMA_WRITE) {
1133			/* Get size from DTO; CQE value may be off.  */
1134			event_ptr->event_data.dto_completion_event_data.
1135			    transfered_length = cookie->val.dto.size;
1136		} else {
1137			event_ptr->event_data.dto_completion_event_data.
1138			    transfered_length = DAPL_GET_CQE_BYTESNUM(cqe_ptr);
1139		}
1140
1141		dapls_cookie_dealloc(buffer, cookie);
1142		break;
1143	}
1144
1145	case DAPL_COOKIE_TYPE_RMR:
1146	{
1147		dapl_os_atomic_dec(&ep_ptr->req_count);
1148
1149		event_ptr->event_number = DAT_RMR_BIND_COMPLETION_EVENT;
1150
1151		event_ptr->event_data.rmr_completion_event_data.rmr_handle =
1152		    cookie->val.rmr.rmr;
1153		event_ptr->event_data.rmr_completion_event_data.user_cookie =
1154		    cookie->val.rmr.cookie;
1155		if (ib_status == IB_COMP_ST_SUCCESS) {
1156			ibtype = DAPL_GET_CQE_OPTYPE(cqe_ptr);
1157
1158			event_ptr->event_data.rmr_completion_event_data.status =
1159			    DAT_RMR_BIND_SUCCESS;
1160			dapl_os_assert(ibtype == OP_BIND_MW);
1161		} else {
1162			event_ptr->event_data.rmr_completion_event_data.status =
1163			    DAT_RMR_BIND_FAILURE;
1164			dto_error = 1;
1165		}
1166
1167		dapls_cookie_dealloc(&ep_ptr->req_buffer, cookie);
1168		break;
1169	}
1170	default:
1171	{
1172		dapl_os_assert(!"Invalid Operation type");
1173		break;
1174	}
1175	}
1176
1177	/*
1178	 * A DTO failure will cause the connection to be broken
1179	 */
1180	if ((dto_error) && (ep_ptr->param.ep_state == DAT_EP_STATE_CONNECTED)) {
1181		ep_ptr->param.ep_state = DAT_EP_STATE_DISCONNECTED;
1182		/*
1183		 * Disconnect at the IB level.
1184		 */
1185		dapls_ib_disconnect_clean(ep_ptr, DAT_TRUE, IB_CME_CONNECTED);
1186	}
1187	/* convert a premature recv to an error flush on disconnect */
1188	if (process_premature_events && (ep_ptr->param.ep_state ==
1189	    DAT_EP_STATE_DISCONNECTED) && (ib_status == IB_COMP_ST_SUCCESS)) {
1190		dapl_os_assert(ibtype == OP_RECEIVE &&
1191		    cookie->val.dto.type == DAPL_DTO_TYPE_RECV);
1192		event_ptr->event_data.dto_completion_event_data.status =
1193		    DAT_DTO_ERR_FLUSHED;
1194	}
1195	return (DAT_TRUE);
1196}
1197
1198/*
1199 * dapls_evd_copy_cq
1200 *
1201 * Copy all entries on a CQ associated with the EVD onto that EVD
1202 * Up to caller to handle races, if any.  Note that no EVD waiters will
1203 * be awoken by this copy.
1204 *
1205 * Input:
1206 *	evd_ptr
1207 *
1208 * Output:
1209 * 	nevents
1210 *
1211 * Returns:
1212 * 	none
1213 *
1214 */
1215void
1216dapls_evd_copy_cq(
1217	DAPL_EVD	*evd_ptr,
1218	int		*nevents)
1219{
1220	ib_work_completion_t	cqe[MAX_CQES_PER_POLL];
1221	DAT_RETURN		dat_status;
1222	ib_cq_handle_t		cq_handle;
1223	DAT_EVENT		*event;
1224	uint_t			num_cqes_polled = 0;
1225	int			cqe_events;
1226	int			i;
1227
1228	cq_handle = evd_ptr->ib_cq_handle;
1229
1230	*nevents = 0;
1231
1232	if (cq_handle == IB_INVALID_HANDLE) {
1233		/* Nothing to do if no CQ.  */
1234		return;
1235	}
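	/* Poll up to MAX_CQES_PER_POLL completions off the CQ in one call */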
1236	dat_status = DAPL_POLL(evd_ptr)(cq_handle,
1237	    cqe, MAX_CQES_PER_POLL, &num_cqes_polled);
1238
1239	if (dat_status == DAT_SUCCESS) {
1240		dapl_dbg_log(DAPL_DBG_TYPE_EVD, "dapls_evd_copy_cq: %u\n",
1241		    num_cqes_polled);
1242		cqe_events = 0;
1243		for (i = 0; i < num_cqes_polled; i++) {
1244#ifdef	DAPL_DBG	/* For debugging.  */
1245			dapli_evd_eh_print_cqe(cqe[i]);
1246#endif
1247
1248			/*
1249			 * Can use DAT_DTO_COMPLETION_EVENT because
1250			 * dapli_evd_cqe_to_event will overwrite.
1251			 */
1252
1253			event = dapli_evd_get_and_init_event(
1254			    evd_ptr, DAT_DTO_COMPLETION_EVENT);
1255			if (event == NULL) {
1256			/*
1257			 * We've already attempted the overflow post; return.
1258			 */
1259				return;
1260			}
1261			if (dapli_evd_cqe_to_event(evd_ptr, &cqe[i], DAT_FALSE,
1262			    event)) {
1263				dapli_evd_post_event_nosignal(evd_ptr, event);
1264				cqe_events++;
1265			} else {
1266				dapl_dbg_log(DAPL_DBG_TYPE_EVD,
1267				    "dapls_evd_copy_cq: premature event\n");
1268				/*
1269				 * We've deferred processing the CQE, so add
1270				 * the event_ptr back to free queue
1271				 */
1272				dat_status = dapls_rbuf_add(&evd_ptr->
1273				    free_event_queue, (void *)event);
1274				dapl_os_assert(dat_status == DAT_SUCCESS);
1275				if (evd_ptr->evd_producer_locking_needed) {
1276					dapl_os_unlock(&evd_ptr->header.lock);
1277				}
1278			}
1279		}
1280		*nevents = cqe_events;
1281	} else if (DAT_GET_TYPE(dat_status) != DAT_QUEUE_EMPTY) {
1282		dapl_dbg_log(DAPL_DBG_TYPE_ERR,
1283		    "dapls_evd_copy_cq: dapls_ib_completion_poll "
1284		    "returned 0x%x\n", dat_status);
1285		dapl_os_assert(!"Bad return from dapls_ib_completion_poll");
1286	}
1287}
1288
1289/*
1290 * dapls_evd_copy_events
1291 *
1292 * Copy all events associated with the EVD onto that EVD
1293 *
1294 * Input:
1295 *	evd_ptr
1296 *	timeout
1297 *
1298 * Output:
1299 * 	return status
1300 *
1301 * Returns:
1302 * 	none
1303 *
1304 */
1305DAT_RETURN
1306dapls_evd_copy_events(
1307    DAPL_EVD 	*evd_ptr,
1308    DAT_TIMEOUT timeout)
1309{
1310	dapl_ib_event_t	evp_arr[NUM_EVENTS_PER_POLL];
1311	dapl_ib_event_t	*evpp_start;
1312	dapl_ib_event_t	*evpp;
1313	DAPL_IA		*ia_ptr;
1314	DAT_RETURN	dat_status;
1315	int		waited;
1316	uint64_t	curr_time;
1317	uint64_t	final_time;
1318	uint64_t	time_left;
1319	int		events_needed = 0;
1320	int		nevents = 0;
1321	int		num_cqe = 0;
1322	int		num_ke = 0; /* kernel events - CM or ASYNC events */
1323	int		i;
1324
1325	/* rbuf count is zero on entry */
1326
1327	if (evd_ptr->evd_flags & (DAT_EVD_CONNECTION_FLAG |
1328	    DAT_EVD_CR_FLAG | DAT_EVD_ASYNC_FLAG)) {
1329		if (evd_ptr->threshold <= NUM_EVENTS_PER_POLL) {
1330			evpp = evp_arr;
1331		} else {
1332			/* need to allocate on the heap */
1333			evpp = (dapl_ib_event_t *)dapl_os_alloc(
1334			    evd_ptr->threshold * sizeof (dapl_ib_event_t));
1335			if (evpp == NULL) {
1336				return (DAT_INSUFFICIENT_RESOURCES);
1337			}
1338		}
1339		evpp_start = evpp;
1340		/* for evd_dequeue, check for ke before returning Q_EMPTY */
1341		if (evd_ptr->threshold == 0 && timeout == 0)
1342			evd_ptr->threshold = 1;
1343	} else {
1344		evpp = NULL;
1345		evpp_start = NULL;
1346	}
1347	ia_ptr = evd_ptr->header.owner_ia;
1348	waited = 0;
1349	dat_status = DAT_SUCCESS;
1350
1351	/* calculate various time wait elements */
1352	if (timeout == 0) {
1353		final_time = 0;
1354		time_left = 0;
1355	} else if (timeout == DAT_TIMEOUT_INFINITE) {
1356		/*
1357		 * The real value of DAT_TIMEOUT_INFINITE is fairly small
1358		 * ~71 mins, to prevent premature timeouts map it to
1359		 * 1 year.  NOTE: 64-bit integers are needed here
1360		 * because 32 bits is not enough.  Other types,
1361		 * such as clock_t are not 64-bit, so are not
1362		 * sufficient for this.  Similarly, hrtime_t is
1363		 * defined as a "nanosecond counter", which does not
1364		 * match our need for time in microseconds, so we
1365		 * just use the more general uint64_t here.
1366		 */
1367#define	DAPL_ONE_YEAR_IN_USEC	((365 * 24 * 3600) * 1000000LL)
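		/* gethrtime() is in nanoseconds; time_left is kept in usec */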
1368		curr_time = gethrtime();
1369		time_left = DAPL_ONE_YEAR_IN_USEC;
1370		final_time = curr_time + DAPL_ONE_YEAR_IN_USEC * 1000;
1371	} else {
1372		/*
1373		 * Maximum time by which the routine needs to return.
1374		 * DAT_TIMEOUT_INFINITE is defined as ~0 but it is of type int,
1375		 * so mask the MSB to avoid overflow.
1376		 */
1377		curr_time = gethrtime();
1378		final_time = curr_time + (uint64_t)(timeout&0x7fffffff)*1000;
1379		time_left = (final_time - curr_time)/1000;
1380	}
1381
1382	do {
1383		/*
1384		 * If this evd has a CQ event stream check the CQs first
1385		 */
1386		if (evd_ptr->evd_flags & (DAT_EVD_DTO_FLAG |
1387		    DAT_EVD_RMR_BIND_FLAG)) {
1388			/*
1389			 * Poll CQ for events, update the total number of CQEs
1390			 * so far
1391			 */
1392			nevents = 0;
1393			dapls_evd_copy_cq(evd_ptr, &nevents);
1394			num_cqe += nevents;
1395			dapl_dbg_log(DAPL_DBG_TYPE_EVD,
1396			    "dapls_evd_copy_event: copy_cq num_cqe(%d)\n",
1397			    num_cqe);
1398		}
1399
1400		/*
1401		 * We use the dapls_rbuf_count since it includes
1402		 *  - CQ events pulled by dapls_evd_copy_cq
1403		 *  - events added by dat_evd_post_se()
1404		 */
1405		events_needed = evd_ptr->threshold - num_ke -
1406		    dapls_rbuf_count(&evd_ptr->pending_event_queue);
1407
1408		/*
1409		 * check for pending events
1410		 * note: threshold=0 implies dapl_evd_dequeue
1411		 */
1412		if (events_needed < 0) {
1413			/* There are more than sufficient events */
1414			break;
1415		} else if (events_needed == 0) {
1416			/* report queue empty on dat_evd_dequeue */
1417			/* non CQ events are expected to be polled */
1418			/* by dat_evd_wait */
1419			if (evd_ptr->threshold == 0)
1420				dat_status =  DAT_ERROR(DAT_QUEUE_EMPTY, 0);
1421			/*
1422			 * when threshold > 0, we have sufficient events
1423			 */
1424			break;
1425		} else {
1426			/*
1427			 * Reaching here implies dat_evd_wait; return on any
1428			 * DTO completion, since threshold > 1 is taken as a
1429			 * hint only.
1430			 */
1431			if (num_cqe)
1432				break;
1433		}
1434
1435		/* check whether we've already waited */
1436		if (waited > 0) {
1437			dapl_dbg_log(DAPL_DBG_TYPE_EVD,
1438			    "dapls_evd_copy_event: waited[%d]\n", waited);
1439			if (dat_status != DAT_SUCCESS)
1440				break;
1441			curr_time = gethrtime();
1442			/* exit on time expired */
1443			if (curr_time >= final_time)
1444				break;
1445			time_left = (final_time - curr_time)/1000;
1446		}
1447
1448		/* check for DTO type evd's */
1449		if (evd_ptr->evd_flags & (DAT_EVD_DTO_FLAG |
1450		    DAT_EVD_RMR_BIND_FLAG)) {
1451			if (events_needed == 1) {
1452				/*
1453				 * Need only one event so enable cq
1454				 * notification
1455				 */
1456				/*
1457				 * XXX: Things need to be modified here to
1458				 * implement the NOTIFICATION suppression
1459				 * correctly - relies on THRESHOLD flag
1460				 * and UNSIGNALLED flag to be stored
1461				 * in the evd.
1462				 */
1463				dat_status = dapls_set_cq_notify(ia_ptr,
1464				    evd_ptr);
1465				if (dat_status != DAT_SUCCESS) {
1466					dapl_dbg_log(DAPL_DBG_TYPE_EVD,
1467					    "dapls_evd_copy_event:"
1468					    " set_cq_notify(%d)\n", dat_status);
1469					return (dat_status);
1470				}
1471			} else if (events_needed > 1) {
1472				/*
1473				 * We need multiple events so lets enable CQ for
1474				 * notification on N events.
1475				 * dat_status = dapls_set_cqN_notify(ia_ptr,
1476				 * evd_ptr, (uint32_t)events_needed);
1477				 */
1478				dat_status = dapls_set_cq_notify(ia_ptr,
1479				    evd_ptr);
1480				if (dat_status != DAT_SUCCESS) {
1481					dapl_dbg_log(DAPL_DBG_TYPE_EVD,
1482					    "dapls_evd_copy_event:"
1483					    " set_cqN_notify:%d\n", dat_status);
1484					return (dat_status);
1485				}
1486			}
1487
1488			/*
1489			 * Per the Tavor PRM, if completions occur after the
1490			 * CQ has been polled and before it is armed, the CQ
1491			 * handler is fired immediately upon arming.  Hence the
1492			 * PRM recommends that a re-poll of the CQ can be
1493			 * skipped as an optimization.
1494			 */
1495		}
1496
1497		nevents = 0;
1498
1499		/*
1500		 * A non-NULL evpp_start denotes that one of
1501		 * DAT_EVD_CONNECTION_FLAG, DAT_EVD_CR_FLAG or DAT_EVD_ASYNC_FLAG
1502		 * is set, and thus events from the kernel need to be checked.
1503		 */
1504		if (evpp_start) {
1505			/*
1506			 * Even if dat_status is not DAT_SUCCESS, num_events
1507			 * Even if dat_status is not DAT_SUCCESS, nevents
1508			 */
1509			dat_status = dapls_ib_event_poll(evd_ptr, time_left,
1510			    (evd_ptr->threshold - (num_cqe + num_ke)), evpp,
1511			    &nevents);
1512			dapl_dbg_log(DAPL_DBG_TYPE_EVD,
1513			    "dapls_evd_copy_event: poll returned 0x%x(%d)\n",
1514			    dat_status, nevents);
1515
1516			num_ke += nevents;
1517			evpp += nevents;
1518		} else {
1519			/* perform a timewait */
1520			dat_status = dapls_ib_event_poll(evd_ptr, time_left,
1521			    0, NULL, &nevents);
1522			dapl_dbg_log(DAPL_DBG_TYPE_EVD,
1523			    "dapls_evd_copy_event: poll(cq_notification) "
1524			    "returned 0x%x\n", dat_status);
1525			if (DAT_GET_TYPE(dat_status) == DAT_INTERRUPTED_CALL)
1526				return (dat_status);
1527		}
1528
1529		waited++;
1530	} while (dapls_rbuf_count(&evd_ptr->pending_event_queue) + num_ke <
1531	    evd_ptr->threshold);
1532
1533	/* process the cm events now */
1534	for (i = 0; i < num_ke; i++) {
1535		switch (evpp_start[i].ibe_ev_family) {
1536		case DAPL_CR_EVENTS: /* PASSIVE side events */
1537		case DAPL_PASSIVE_CONNECTION_EVENTS:
1538			dapl_dbg_log(DAPL_DBG_TYPE_EVD,
1539			    "dapls_evd_copy_event: Passive side Event %d\n",
1540			    evpp_start[i].ibe_ce.ibce_event);
1541			dapls_cr_callback((ib_cm_handle_t)
1542			    evpp_start[i].ibe_ce.ibce_psep_cookie,
1543			    evpp_start[i].ibe_ce.ibce_event,
1544			    evpp_start[i].ibe_ce.ibce_priv_data_ptr, (void *)
1545			    (uintptr_t)evpp_start[i].ibe_ce.ibce_cookie);
1546			break;
1547		case DAPL_ACTIVE_CONNECTION_EVENTS: /* ACTIVE side events */
1548			dapl_dbg_log(DAPL_DBG_TYPE_EVD,
1549			    "dapls_evd_copy_event: Active Conn Event %d\n",
1550			    evpp_start[i].ibe_ce.ibce_event);
1551			dapl_evd_connection_callback((ib_cm_handle_t)
1552			    IB_INVALID_HANDLE,
1553			    evpp_start[i].ibe_ce.ibce_event,
1554			    evpp_start[i].ibe_ce.ibce_priv_data_ptr, (void *)
1555			    (uintptr_t)evpp_start[i].ibe_ce.ibce_cookie);
1556			break;
1557		case DAPL_ASYNC_EVENTS:
1558			dapl_dbg_log(DAPL_DBG_TYPE_EVD,
1559			    "dapls_evd_copy_event: Async Event %d\n",
1560			    evpp_start[i].ibe_async.ibae_type);
1561			dapls_ib_async_callback(evd_ptr,
1562			    ia_ptr->hca_ptr->ib_hca_handle,
1563			    &(evpp_start[i].ibe_async), ia_ptr);
1564			break;
1565		default:
1566			dapl_dbg_log(DAPL_DBG_TYPE_ERR,
1567			    "dapls_evd_copy_event: dapls_ib_event_poll %d "
1568			    "returned 0x%x\n", i, evpp_start[i].ibe_ev_family);
1569			dapl_os_assert(!"Bad return from dapls_ib_event_poll");
1570			break;
1571		}
1572	}
1573
1574	return (dat_status);
1575}
1576
1577/*
1578 * dapls_evd_cq_poll_to_event
1579 *
1580 * Attempt to dequeue a single CQE from a CQ and turn it into
1581 * an event.
1582 *
1583 * Input:
1584 *	evd_ptr
1585 *
1586 * Output:
1587 * 	event
1588 *
1589 * Returns:
1590 * 	Status of operation
1591 *
1592 */
1593DAT_RETURN
1594dapls_evd_cq_poll_to_event(
1595    IN DAPL_EVD 	*evd_ptr,
1596    OUT DAT_EVENT	*event)
1597{
1598	DAT_RETURN		dat_status;
1599	ib_work_completion_t	cur_cqe;
1600
1601	/* skip one layer of do-nothing function */
1602	dat_status = DAPL_POLL1(evd_ptr)(evd_ptr->ib_cq_handle, &cur_cqe);
1603
1604	if (dat_status == DAT_SUCCESS) {
1605#ifdef	DAPL_DBG	/* For debugging.  */
1606		dapli_evd_eh_print_cqe(cur_cqe);
1607#endif
1608		(void) dapli_evd_cqe_to_event(evd_ptr, &cur_cqe, DAT_FALSE,
1609		    event);
1610	}
1611
1612	return (dat_status);
1613}
1614
1615/*
1616 * Local variables:
1617 *  c-indent-level: 4
1618 *  c-basic-offset: 4
1619 *  tab-width: 8
1620 * End:
1621 */
1622