/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright (c) 2006, 2010, Oracle and/or its affiliates. All rights reserved.
 */

#include <sys/types.h>
#include <sys/errno.h>
#include <sys/sysmacros.h>
#include <sys/param.h>
#include <sys/machsystm.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
#include <sys/kmem.h>
#include <sys/strsun.h>
#include <sys/callb.h>
#include <sys/sdt.h>
#include <sys/mach_descrip.h>
#include <sys/mdeg.h>
#include <net/if.h>
#include <sys/vsw.h>
#include <sys/vio_mailbox.h>
#include <sys/vio_common.h>
#include <sys/vnet_common.h>
#include <sys/vnet_mailbox.h>
#include <sys/vio_util.h>

/*
 * This file contains the implementation of the RxDringData transfer mode of
 * the VIO Protocol in vsw. The functions in this file are invoked from
 * vsw_ldc.c after RxDringData mode is negotiated with the peer during the
 * attribute phase of the handshake. This file contains the functions that
 * set up the transmit and receive descriptor rings and associated resources
 * in RxDringData mode. It also contains the transmit and receive data
 * processing functions that are invoked in RxDringData mode. The data
 * processing routines in this file have the suffix '_shm' to indicate the
 * shared memory mechanism used in RxDringData mode.
 */
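
/*
 * Overview of descriptor state (dstate) transitions in RxDringData mode, as
 * implemented in this file:
 *
 * - The receiver exports the ring with all descriptors in VIO_DESC_FREE
 *   state (vsw_setup_rx_dring()).
 * - The transmitter marks every descriptor VIO_DESC_DONE when it maps the
 *   ring (vsw_map_tx_dring()), and moves a descriptor to VIO_DESC_READY
 *   after copying a frame into its buffer (vsw_dringsend_shm()).
 * - The receiver processes a READY descriptor and marks it DONE again
 *   (vsw_receive_packet()), making it available for another transmit.
 */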

/* Functions exported to vsw_ldc.c */
vio_dring_reg_msg_t *vsw_create_rx_dring_info(vsw_ldc_t *);
void vsw_destroy_rx_dring(vsw_ldc_t *ldcp);
dring_info_t *vsw_map_tx_dring(vsw_ldc_t *ldcp, void *pkt);
void vsw_unmap_tx_dring(vsw_ldc_t *ldcp);
int vsw_dringsend_shm(vsw_ldc_t *, mblk_t *);
void vsw_ldc_rcv_worker(void *arg);
void vsw_stop_rcv_thread(vsw_ldc_t *ldcp);
void vsw_process_dringdata_shm(void *, void *);

/* Internal functions */
static dring_info_t *vsw_create_rx_dring(vsw_ldc_t *);
static int vsw_setup_rx_dring(vsw_ldc_t *ldcp, dring_info_t *dp);
static void vsw_process_dringdata_info_shm(vsw_ldc_t *ldcp,
	vio_dring_msg_t *msg);
static void vsw_process_dringdata_ack_shm(vsw_ldc_t *ldcp,
	vio_dring_msg_t *msg);
static void vsw_ldc_rcv_shm(vsw_ldc_t *ldcp);
static int vsw_receive_packet(vsw_ldc_t *ldcp, mblk_t **bp);
static int vsw_send_msg_shm(vsw_ldc_t *ldcp, void *msgp, int size,
    boolean_t handle_reset);

/* Functions imported from vsw_ldc.c */
extern void vsw_process_pkt(void *);
extern void vsw_destroy_rxpools(void *);
extern dring_info_t *vsw_map_dring_cmn(vsw_ldc_t *ldcp,
    vio_dring_reg_msg_t *dring_pkt);
extern void vsw_process_conn_evt(vsw_ldc_t *, uint16_t);
extern mblk_t *vsw_vlan_frame_pretag(void *arg, int type, mblk_t *mp);

/* Tunables */
extern int vsw_wretries;
extern int vsw_recv_delay;
extern int vsw_recv_retries;
extern uint32_t vsw_chain_len;
extern uint32_t vsw_num_descriptors;
extern uint32_t vsw_nrbufs_factor;

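/*
 * Switch the chain of frames accumulated so far (bp through bpt, count
 * frames) into the vswitch, then reset the chain pointers and the count for
 * the next batch. The total_count argument is not used by the macro body.
 */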
#define	VSW_SWITCH_FRAMES(vswp, ldcp, bp, bpt, count, total_count)	\
{									\
	DTRACE_PROBE2(vsw_rx_pkts, vsw_ldc_t *, (ldcp), int, (count));	\
	(vswp)->vsw_switch_frame((vswp), (bp), VSW_VNETPORT,		\
	    (ldcp)->ldc_port, NULL);					\
	(bp) = (bpt) = NULL;						\
	(count) = 0;							\
}

vio_dring_reg_msg_t *
vsw_create_rx_dring_info(vsw_ldc_t *ldcp)
{
	vio_dring_reg_msg_t	*mp;
	vio_dring_reg_ext_msg_t	*emsg;
	dring_info_t		*dp;
	uint8_t			*buf;
	vsw_t			*vswp = ldcp->ldc_vswp;

	D1(vswp, "%s enter\n", __func__);

	/*
	 * If we can't create a dring, obviously no point sending
	 * a message.
	 */
	if ((dp = vsw_create_rx_dring(ldcp)) == NULL)
		return (NULL);

	mp = kmem_zalloc(VNET_DRING_REG_EXT_MSG_SIZE(dp->data_ncookies),
	    KM_SLEEP);

	mp->tag.vio_msgtype = VIO_TYPE_CTRL;
	mp->tag.vio_subtype = VIO_SUBTYPE_INFO;
	mp->tag.vio_subtype_env = VIO_DRING_REG;
	mp->tag.vio_sid = ldcp->local_session;

	/* payload */
	mp->num_descriptors = dp->num_descriptors;
	mp->descriptor_size = dp->descriptor_size;
	mp->options = dp->options;
	mp->ncookies = dp->dring_ncookies;
	bcopy(&dp->dring_cookie[0], &mp->cookie[0],
	    sizeof (ldc_mem_cookie_t));

	mp->dring_ident = 0;

	buf = (uint8_t *)mp->cookie;

	/* skip over dring cookies */
	ASSERT(mp->ncookies == 1);
	buf += (mp->ncookies * sizeof (ldc_mem_cookie_t));

	emsg = (vio_dring_reg_ext_msg_t *)buf;

	/* copy data_ncookies in the msg */
	emsg->data_ncookies = dp->data_ncookies;

	/* copy data area size in the msg */
	emsg->data_area_size = dp->data_sz;

	/* copy data area cookies in the msg */
	bcopy(dp->data_cookie, (ldc_mem_cookie_t *)emsg->data_cookie,
	    sizeof (ldc_mem_cookie_t) * dp->data_ncookies);

	D1(vswp, "%s exit\n", __func__);

	return (mp);
}

/*
 * Allocate receive resources for the channel. The resources consist of a
 * receive descriptor ring and an associated receive buffer area.
 */
static dring_info_t *
vsw_create_rx_dring(vsw_ldc_t *ldcp)
{
	vsw_t			*vswp = ldcp->ldc_vswp;
	ldc_mem_info_t		minfo;
	dring_info_t		*dp;

	dp = (dring_info_t *)kmem_zalloc(sizeof (dring_info_t), KM_SLEEP);
	mutex_init(&dp->dlock, NULL, MUTEX_DRIVER, NULL);
	ldcp->lane_out.dringp = dp;

	/* Create the receive descriptor ring */
	if ((ldc_mem_dring_create(vsw_num_descriptors,
	    sizeof (vnet_rx_dringdata_desc_t), &dp->dring_handle)) != 0) {
		DERR(vswp, "vsw_create_rx_dring(%lld): ldc dring create "
		    "failed", ldcp->ldc_id);
		goto fail;
	}

	ASSERT(dp->dring_handle != NULL);

	/* Get the addr of descriptor ring */
	if ((ldc_mem_dring_info(dp->dring_handle, &minfo)) != 0) {
		DERR(vswp, "vsw_create_rx_dring(%lld): dring info failed\n",
		    ldcp->ldc_id);
		goto fail;
	} else {
		ASSERT(minfo.vaddr != 0);
		dp->pub_addr = minfo.vaddr;
	}

	dp->num_descriptors = vsw_num_descriptors;
	dp->descriptor_size = sizeof (vnet_rx_dringdata_desc_t);
	dp->options = VIO_RX_DRING_DATA;
	dp->dring_ncookies = 1;	/* guaranteed by ldc */
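	/*
	 * Note: VSW_RXDRING_NRBUFS (see vsw.h) scales the number of rx
	 * buffers by the vsw_nrbufs_factor tunable, so that there are more
	 * buffers than descriptors; see the comment in vsw_setup_rx_dring()
	 * for why this is required.
	 */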
	dp->num_bufs = VSW_RXDRING_NRBUFS;

	/*
	 * Allocate a table that maps descriptor to its associated buffer;
	 * used while receiving to validate that the peer has not changed the
	 * buffer offset provided in the descriptor.
	 */
	dp->rxdp_to_vmp = kmem_zalloc(dp->num_descriptors * sizeof (uintptr_t),
	    KM_SLEEP);

	/* Setup the descriptor ring */
	if (vsw_setup_rx_dring(ldcp, dp)) {
		DERR(vswp, "%s: unable to setup ring", __func__);
		goto fail;
	}

	/*
	 * The descriptors and the associated buffers are all ready;
	 * now bind descriptor ring to the channel.
	 */
	if ((ldc_mem_dring_bind(ldcp->ldc_handle, dp->dring_handle,
	    LDC_DIRECT_MAP | LDC_SHADOW_MAP, LDC_MEM_RW,
	    &dp->dring_cookie[0], &dp->dring_ncookies)) != 0) {
		DERR(vswp, "vsw_create_rx_dring: unable to bind to channel "
		    "%lld", ldcp->ldc_id);
		goto fail;
	}

	/* haven't used any descriptors yet */
	dp->end_idx = 0;
	dp->last_ack_recv = -1;
	dp->next_rxi = 0;
	return (dp);

fail:
	vsw_destroy_rx_dring(ldcp);
	return (NULL);
}

/*
 * Setup the descriptors in the rx dring.
 * Returns 0 on success, 1 on failure.
 */
static int
vsw_setup_rx_dring(vsw_ldc_t *ldcp, dring_info_t *dp)
{
	int				i, j;
	int				rv;
	size_t				data_sz;
	vio_mblk_t			*vmp;
	vio_mblk_t			**rxdp_to_vmp;
	vnet_rx_dringdata_desc_t	*rxdp;
	vnet_rx_dringdata_desc_t	*pub_addr;
	vsw_t				*vswp = ldcp->ldc_vswp;
	uint32_t			ncookies = 0;
	static char			*name = "vsw_setup_rx_dring";
	void				*data_addr = NULL;

	/*
	 * Allocate a single large buffer that serves as the rx buffer area.
	 * We allocate an ldc memory handle and export the buffer area as
	 * shared memory. We send the ldc memcookie for this buffer space to
	 * the peer, as part of the dring registration phase during handshake.
	 * We manage this buffer area as individual buffers of max_frame_size
	 * and provide specific buffer offsets in each descriptor to the peer.
	 * Note that the factor used to compute the # of buffers (above) must
	 * be > 1 to ensure that there are more buffers than the # of
	 * descriptors. This is needed because, while the shared memory
	 * buffers are sent up our stack during receive, the sender needs
	 * additional buffers that can be used for further transmits. This
	 * also means there is no one-to-one correspondence between the
	 * descriptor index and buffer offset. The sender has to read the
	 * buffer offset in the descriptor and use the specified offset to
	 * copy the tx data into the shared buffer. We (receiver) manage the
	 * individual buffers and their state (see VIO_MBLK_STATEs in
	 * vio_util.h).
	 */
	data_sz = RXDRING_DBLK_SZ(vswp->max_frame_size);

	dp->desc_data_sz = data_sz;
	dp->data_sz = (dp->num_bufs * data_sz);
	data_addr = kmem_zalloc(dp->data_sz, KM_SLEEP);
	dp->data_addr = data_addr;

	D2(vswp, "%s: allocated %lld bytes at 0x%llx\n", name,
	    dp->data_sz, dp->data_addr);

	/* Allocate a ldc memhandle for the entire rx data area */
	rv = ldc_mem_alloc_handle(ldcp->ldc_handle, &dp->data_handle);
	if (rv != 0) {
		DERR(vswp, "%s: alloc mem handle failed", name);
		goto fail;
	}

	/* Allocate memory for the data cookies */
	dp->data_cookie = kmem_zalloc(VNET_DATA_AREA_COOKIES *
	    sizeof (ldc_mem_cookie_t), KM_SLEEP);

	/*
	 * Bind ldc memhandle to the corresponding rx data area.
	 */
	rv = ldc_mem_bind_handle(dp->data_handle, (caddr_t)data_addr,
	    dp->data_sz, LDC_DIRECT_MAP, LDC_MEM_W,
	    dp->data_cookie, &ncookies);
	if (rv != 0) {
		DERR(vswp, "%s(%lld): ldc_mem_bind_handle failed "
		    "(rv %d)", name, ldcp->ldc_id, rv);
		goto fail;
	}
	if ((ncookies == 0) || (ncookies > VNET_DATA_AREA_COOKIES)) {
		goto fail;
	}
	dp->data_ncookies = ncookies;

	for (j = 1; j < ncookies; j++) {
		rv = ldc_mem_nextcookie(dp->data_handle,
		    &(dp->data_cookie[j]));
		if (rv != 0) {
			DERR(vswp, "%s: ldc_mem_nextcookie "
			    "failed rv (%d)", name, rv);
			goto fail;
		}
	}

	/*
	 * Successful in binding the handle to rx data area. Now setup mblks
	 * around each data buffer and setup the descriptors to point to these
	 * rx data buffers. We associate each descriptor with a buffer
	 * by specifying the buffer offset in the descriptor. When the peer
	 * needs to transmit data, this offset is read by the peer to determine
	 * the buffer in the mapped buffer area where the data to be
	 * transmitted should be copied, for a specific descriptor.
	 */
	rv = vio_create_mblks(dp->num_bufs, data_sz, (uint8_t *)data_addr,
	    &dp->rx_vmp);
	if (rv != 0) {
		goto fail;
	}

	pub_addr = dp->pub_addr;
	rxdp_to_vmp = dp->rxdp_to_vmp;
	for (i = 0; i < dp->num_descriptors; i++) {
		rxdp = &pub_addr[i];
		/* allocate an mblk around this data buffer */
		vmp = vio_allocb(dp->rx_vmp);
		ASSERT(vmp != NULL);
		rxdp->data_buf_offset = VIO_MBLK_DATA_OFF(vmp) + VNET_IPALIGN;
		rxdp->dstate = VIO_DESC_FREE;
		rxdp_to_vmp[i] = vmp;
	}

	return (0);

fail:
	/* return failure; caller will cleanup */
	return (1);
}

/*
 * Free receive resources for the channel.
 */
void
vsw_destroy_rx_dring(vsw_ldc_t *ldcp)
{
	vsw_t		*vswp = ldcp->ldc_vswp;
	lane_t		*lp = &ldcp->lane_out;
	dring_info_t	*dp;

	dp = lp->dringp;
	if (dp == NULL) {
		return;
	}

	mutex_enter(&dp->dlock);

	if (dp->rx_vmp != NULL) {
		vio_clobber_pool(dp->rx_vmp);
		/*
		 * If we can't destroy the rx pool for this channel, dispatch a
		 * task to retry and clean up those rx pools. Note that we
		 * don't need to wait for the task to complete. If the vsw
		 * device itself gets detached (vsw_detach()), it will wait for
		 * the task to complete implicitly in ddi_taskq_destroy().
		 */
		if (vio_destroy_mblks(dp->rx_vmp) != 0)  {
			(void) ddi_taskq_dispatch(vswp->rxp_taskq,
			    vsw_destroy_rxpools, dp->rx_vmp, DDI_SLEEP);
		}
	}

	/* Free rx data area cookies */
	if (dp->data_cookie != NULL) {
		kmem_free(dp->data_cookie, VNET_DATA_AREA_COOKIES *
		    sizeof (ldc_mem_cookie_t));
		dp->data_cookie = NULL;
	}

	/* Unbind rx data area memhandle */
	if (dp->data_ncookies != 0) {
		(void) ldc_mem_unbind_handle(dp->data_handle);
		dp->data_ncookies = 0;
	}

	/* Free rx data area memhandle */
	if (dp->data_handle) {
		(void) ldc_mem_free_handle(dp->data_handle);
		dp->data_handle = 0;
	}

	/* Now free the rx data area itself */
	if (dp->data_addr != NULL) {
		kmem_free(dp->data_addr, dp->data_sz);
	}

	/* Finally, free the receive descriptor ring */
	if (dp->dring_handle != NULL) {
		(void) ldc_mem_dring_unbind(dp->dring_handle);
		(void) ldc_mem_dring_destroy(dp->dring_handle);
	}

	if (dp->rxdp_to_vmp != NULL) {
		kmem_free(dp->rxdp_to_vmp,
		    dp->num_descriptors * sizeof (uintptr_t));
		dp->rxdp_to_vmp = NULL;
	}

	mutex_exit(&dp->dlock);
	mutex_destroy(&dp->dlock);
	kmem_free(dp, sizeof (dring_info_t));
	lp->dringp = NULL;
}

/*
 * Map the receive descriptor ring exported by the peer, as our transmit
 * descriptor ring.
 */
dring_info_t *
vsw_map_tx_dring(vsw_ldc_t *ldcp, void *pkt)
{
	int				i;
	int				rv;
	dring_info_t			*dp;
	vnet_rx_dringdata_desc_t	*txdp;
	on_trap_data_t			otd;
	vio_dring_reg_msg_t		*dring_pkt = pkt;

	dp = vsw_map_dring_cmn(ldcp, dring_pkt);
	if (dp == NULL) {
		return (NULL);
	}

	/* RxDringData mode specific initializations */
	mutex_init(&dp->txlock, NULL, MUTEX_DRIVER, NULL);
	mutex_init(&dp->restart_lock, NULL, MUTEX_DRIVER, NULL);
	dp->next_txi = dp->restart_peer_txi = 0;
	dp->restart_reqd = B_TRUE;
	ldcp->dringdata_msgid = 0;
	ldcp->lane_in.dringp = dp;

	/*
	 * Mark the descriptor state as 'done'. This is implementation
	 * specific and not required by the protocol. In our implementation,
	 * we only need the descriptor to be in 'done' state to be used by
	 * the transmit function, and the peer is not aware of it. As the
	 * protocol requires that during initial registration the exporting
	 * end point mark the dstate as 'free', we change it to 'done' here.
	 * After this, the dstate in our implementation will keep moving
	 * between 'ready', set by our transmit function, and 'done', set by
	 * the peer (per protocol) after receiving data.
	 *
	 * Setup on_trap() protection before accessing the dring shared
	 * memory area.
	 */
	rv = LDC_ON_TRAP(&otd);
	if (rv != 0) {
		/*
		 * A data access fault occurred down the code path below
		 * while accessing the descriptors. Return failure.
		 */
		goto fail;
	}

	txdp = (vnet_rx_dringdata_desc_t *)dp->pub_addr;
	for (i = 0; i < dp->num_descriptors; i++) {
		txdp[i].dstate = VIO_DESC_DONE;
	}

	(void) LDC_NO_TRAP();

	return (dp);

fail:
	if (dp->dring_handle != NULL) {
		(void) ldc_mem_dring_unmap(dp->dring_handle);
	}
	kmem_free(dp, sizeof (*dp));
	return (NULL);
}

/*
 * Unmap the transmit descriptor ring.
 */
void
vsw_unmap_tx_dring(vsw_ldc_t *ldcp)
{
	lane_t		*lp = &ldcp->lane_in;
	dring_info_t	*dp;

	if ((dp = lp->dringp) == NULL) {
		return;
	}

	/* Unmap tx data area and free data handle */
	if (dp->data_handle != NULL) {
		(void) ldc_mem_unmap(dp->data_handle);
		(void) ldc_mem_free_handle(dp->data_handle);
		dp->data_handle = NULL;
	}

	/* Free tx data area cookies */
	if (dp->data_cookie != NULL) {
		kmem_free(dp->data_cookie, dp->data_ncookies *
		    sizeof (ldc_mem_cookie_t));
		dp->data_cookie = NULL;
		dp->data_ncookies = 0;
	}

	/* Unmap peer's dring */
	if (dp->dring_handle != NULL) {
		(void) ldc_mem_dring_unmap(dp->dring_handle);
		dp->dring_handle = NULL;
	}

	mutex_destroy(&dp->txlock);
	mutex_destroy(&dp->restart_lock);
	kmem_free(dp, sizeof (dring_info_t));
	lp->dringp = NULL;
}

/*
 * A per LDC worker thread to process the rx dring and receive packets. This
 * thread is woken up by the LDC interrupt handler when a dring data info
 * message is received.
 */
void
vsw_ldc_rcv_worker(void *arg)
{
	callb_cpr_t	cprinfo;
	vsw_ldc_t	*ldcp = (vsw_ldc_t *)arg;
	vsw_t		*vswp = ldcp->ldc_vswp;

	D1(vswp, "%s(%lld):enter\n", __func__, ldcp->ldc_id);
	CALLB_CPR_INIT(&cprinfo, &ldcp->rcv_thr_lock, callb_generic_cpr,
	    "vsw_rcv_thread");
	mutex_enter(&ldcp->rcv_thr_lock);
	while (!(ldcp->rcv_thr_flags & VSW_WTHR_STOP)) {

		CALLB_CPR_SAFE_BEGIN(&cprinfo);
		/*
		 * Wait until the data is received or a stop
		 * request is received.
		 */
		while (!(ldcp->rcv_thr_flags &
		    (VSW_WTHR_DATARCVD | VSW_WTHR_STOP))) {
			cv_wait(&ldcp->rcv_thr_cv, &ldcp->rcv_thr_lock);
		}
		CALLB_CPR_SAFE_END(&cprinfo, &ldcp->rcv_thr_lock)

		/*
		 * First process the stop request.
		 */
		if (ldcp->rcv_thr_flags & VSW_WTHR_STOP) {
			D2(vswp, "%s(%lld):Rx thread stopped\n",
			    __func__, ldcp->ldc_id);
			break;
		}
		ldcp->rcv_thr_flags &= ~VSW_WTHR_DATARCVD;
		mutex_exit(&ldcp->rcv_thr_lock);
		D1(vswp, "%s(%lld):calling vsw_ldc_rcv_shm\n",
		    __func__, ldcp->ldc_id);
		vsw_ldc_rcv_shm(ldcp);
		mutex_enter(&ldcp->rcv_thr_lock);
	}

	/*
	 * Update the run status and wakeup the thread that
	 * has sent the stop request.
	 */
	ldcp->rcv_thr_flags &= ~VSW_WTHR_STOP;
	ldcp->rcv_thread = NULL;
	CALLB_CPR_EXIT(&cprinfo);
	D1(vswp, "%s(%lld):exit\n", __func__, ldcp->ldc_id);
	thread_exit();
}

/*
 * Process the rx descriptor ring in the context of receive worker
 * thread and switch the received packets to their destinations.
 */
static void
vsw_ldc_rcv_shm(vsw_ldc_t *ldcp)
{
	int		rv;
	uint32_t	end_ix;
	vio_dring_msg_t msg;
	vio_dring_msg_t	*msgp = &msg;
	int		count = 0;
	int		total_count = 0;
	uint32_t	retries = 0;
	mblk_t		*bp = NULL;
	mblk_t		*bpt = NULL;
	mblk_t		*mp = NULL;
	vsw_t		*vswp = ldcp->ldc_vswp;
	lane_t		*lp = &ldcp->lane_out;
	dring_info_t	*dp = lp->dringp;

	do {
again:
		rv = vsw_receive_packet(ldcp, &mp);
		if (rv != 0) {
			if (rv == EIO) {
				/* Invalid descriptor contents; get next */
				continue;
			}
			if (rv != EAGAIN) {
				break;
			}

			/* Descriptor not ready for processing */
			if (retries == vsw_recv_retries) {
				DTRACE_PROBE1(vsw_noready_rxds,
				    vsw_ldc_t *, ldcp);
				break;
			}

			/* Switch packets received so far before retrying */
			if (bp != NULL) {
				VSW_SWITCH_FRAMES(vswp, ldcp, bp, bpt, count,
				    total_count);
			}
			retries++;
			drv_usecwait(vsw_recv_delay);
			goto again;
		}
		retries = 0;

		/* Build a chain of received packets */
		if (bp == NULL) {
			/* first pkt */
			bp = mp;
			bpt = bp;
			bpt->b_next = NULL;
		} else {
			mp->b_next = NULL;
			bpt->b_next = mp;
			bpt = mp;
		}

		total_count++;
		count++;

		/*
		 * If we have gathered vsw_chain_len (tunable)
		 * # of packets in the chain, switch them.
		 */
		if (count == vsw_chain_len) {
			VSW_SWITCH_FRAMES(vswp, ldcp, bp, bpt, count,
			    total_count);
		}

		/*
		 * Stop further processing if we processed the entire dring
		 * once; otherwise continue.
		 */
	} while (total_count < dp->num_bufs);

	DTRACE_PROBE2(vsw_rx_total_count, vsw_ldc_t *, ldcp,
	    int, (total_count));
	if (bp != NULL) {
		VSW_SWITCH_FRAMES(vswp, ldcp, bp, bpt, count,
		    total_count);
	}

	/* Send stopped signal to peer (sender) */
	end_ix = lp->dringp->next_rxi;
	DECR_RXI(dp, end_ix);
	msgp->tag.vio_msgtype = VIO_TYPE_DATA;
	msgp->tag.vio_subtype = VIO_SUBTYPE_ACK;
	msgp->tag.vio_subtype_env = VIO_DRING_DATA;
	msgp->dring_ident = ldcp->lane_in.dringp->ident;
	msgp->tag.vio_sid = ldcp->local_session;
	msgp->dring_process_state = VIO_DP_STOPPED;
	msgp->start_idx = VNET_START_IDX_UNSPEC;
	msgp->end_idx = end_ix;

	(void) vsw_send_msg_shm(ldcp, (void *)msgp,
	    sizeof (vio_dring_msg_t), B_TRUE);

	ldcp->ldc_stats.dring_data_acks_sent++;
	ldcp->ldc_stats.dring_stopped_acks_sent++;
}

/*
 * Process the next index in the rx dring and receive the associated packet.
 *
 * Returns:
 *	bp:	Success: The received packet.
 *		Failure: NULL
 *	retval:
 *		Success: 0
 *		Failure: EAGAIN: Descriptor not ready
 *			 EIO:    Descriptor contents invalid.
 */
static int
vsw_receive_packet(vsw_ldc_t *ldcp, mblk_t **bp)
{
	uint32_t			rxi;
	vio_mblk_t			*vmp;
	vio_mblk_t			*new_vmp;
	struct ether_header		*ehp;
	vnet_rx_dringdata_desc_t	*rxdp;
	int				err = 0;
	uint_t				nbytes = 0;
	mblk_t				*mp = NULL;
	mblk_t				*dmp = NULL;
	vgen_stats_t			*statsp = &ldcp->ldc_stats;
	dring_info_t			*dp = ldcp->lane_out.dringp;
	vnet_rx_dringdata_desc_t	*pub_addr = dp->pub_addr;

	rxi = dp->next_rxi;
	rxdp = &(pub_addr[rxi]);
	vmp = dp->rxdp_to_vmp[rxi];

	if (rxdp->dstate != VIO_DESC_READY) {
		/*
		 * Descriptor is not ready.
		 */
		return (EAGAIN);
	}

	/*
	 * Ensure load ordering of dstate and nbytes.
	 */
	MEMBAR_CONSUMER();

	if ((rxdp->nbytes < ETHERMIN) ||
	    (rxdp->nbytes > ldcp->lane_in.mtu) ||
	    (rxdp->data_buf_offset !=
	    (VIO_MBLK_DATA_OFF(vmp) + VNET_IPALIGN))) {
		/*
		 * Descriptor contents invalid.
		 */
		statsp->ierrors++;
		rxdp->dstate = VIO_DESC_DONE;
		err = EIO;
		goto done;
	}

	/*
	 * Now allocate a new buffer for this descriptor before sending up the
	 * buffer being processed. If that fails, stop processing; as we are
	 * out of receive buffers.
	 */
	new_vmp = vio_allocb(dp->rx_vmp);

	/*
	 * Process the current buffer being received.
	 */
	nbytes = rxdp->nbytes;
	mp = vmp->mp;

	if (new_vmp == NULL) {
		/*
		 * We failed to get a new mapped buffer that is needed to
		 * refill the descriptor. In that case, leave the current
		 * buffer bound to the descriptor; allocate an mblk dynamically
		 * and copy the contents of the buffer to the mblk. Then send
		 * up this mblk. This way the sender has the same buffer as
		 * before that can be used to send new data.
		 */
		statsp->norcvbuf++;
		dmp = allocb(nbytes + VNET_IPALIGN, BPRI_MED);
		if (dmp == NULL) {
			/* Out of mblks; drop this packet */
			statsp->ierrors++;
			rxdp->dstate = VIO_DESC_DONE;
			mp = NULL;
			err = EIO;
			goto done;
		}
		bcopy(mp->b_rptr + VNET_IPALIGN,
		    dmp->b_rptr + VNET_IPALIGN, nbytes);
		mp = dmp;
	} else {
		/* Mark the status of the current rbuf */
		vmp->state = VIO_MBLK_HAS_DATA;

		/* Set the offset of the new buffer in the descriptor */
		rxdp->data_buf_offset =
		    VIO_MBLK_DATA_OFF(new_vmp) + VNET_IPALIGN;
		dp->rxdp_to_vmp[rxi] = new_vmp;
	}
	mp->b_rptr += VNET_IPALIGN;
	mp->b_wptr = mp->b_rptr + nbytes;

	/*
	 * Ensure store ordering of data_buf_offset and dstate; so that the
	 * peer sees the right data_buf_offset after it checks that the dstate
	 * is DONE.
	 */
	MEMBAR_PRODUCER();

	/* Now mark the descriptor 'done' */
	rxdp->dstate = VIO_DESC_DONE;

	/* Update stats */
	statsp->ipackets++;
	statsp->rbytes += rxdp->nbytes;
	ehp = (struct ether_header *)mp->b_rptr;
	if (IS_BROADCAST(ehp))
		statsp->brdcstrcv++;
	else if (IS_MULTICAST(ehp))
		statsp->multircv++;
done:
	/* Update the next index to be processed */
	INCR_RXI(dp, rxi);

	/* Save the new recv index */
	dp->next_rxi = rxi;

	/* Return the packet received */
	*bp = mp;
	return (err);
}

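/*
 * Stop the receive worker thread: set the stop flag, signal the thread and
 * wait for it to exit.
 */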
void
vsw_stop_rcv_thread(vsw_ldc_t *ldcp)
{
	kt_did_t	tid = 0;
	vsw_t		*vswp = ldcp->ldc_vswp;

	D1(vswp, "%s(%lld):enter\n", __func__, ldcp->ldc_id);
	/*
	 * Send a stop request by setting the stop flag and
	 * wait until the rcv process thread stops.
	 */
	mutex_enter(&ldcp->rcv_thr_lock);
	if (ldcp->rcv_thread != NULL) {
		tid = ldcp->rcv_thread->t_did;
		ldcp->rcv_thr_flags |= VSW_WTHR_STOP;
		cv_signal(&ldcp->rcv_thr_cv);
	}
	mutex_exit(&ldcp->rcv_thr_lock);

	if (tid != 0) {
		thread_join(tid);
	}
	D1(vswp, "%s(%lld):exit\n", __func__, ldcp->ldc_id);
}

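/*
 * Transmit the given packet over the channel in RxDringData mode: reserve
 * the next transmit descriptor, copy the packet into the shared memory
 * buffer that the descriptor points to, mark the descriptor ready, and send
 * a dring data message if the peer needs to be woken up.
 */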
int
vsw_dringsend_shm(vsw_ldc_t *ldcp, mblk_t *mp)
{
	uint32_t			next_txi;
	uint32_t			txi;
	vnet_rx_dringdata_desc_t	*txdp;
	struct ether_header		*ehp;
	size_t				mblksz;
	caddr_t				dst;
	mblk_t				*bp;
	size_t				size;
	on_trap_data_t			otd;
	uint32_t			buf_offset;
	vnet_rx_dringdata_desc_t	*pub_addr;
	vio_dring_msg_t			msg;
	vio_dring_msg_t			*msgp = &msg;
	int				rv = 0;
	boolean_t			resched_peer = B_FALSE;
	boolean_t			is_bcast = B_FALSE;
	boolean_t			is_mcast = B_FALSE;
	vgen_stats_t			*statsp = &ldcp->ldc_stats;
	lane_t				*lane_in = &ldcp->lane_in;
	lane_t				*lane_out = &ldcp->lane_out;
	dring_info_t			*dp = lane_in->dringp;
	vsw_t				*vswp = ldcp->ldc_vswp;

	if ((!(lane_in->lstate & VSW_LANE_ACTIVE)) ||
	    (ldcp->ldc_status != LDC_UP) || (ldcp->ldc_handle == NULL)) {
		DWARN(vswp, "%s(%lld) status(%d) lstate(0x%llx), dropping "
		    "packet\n", __func__, ldcp->ldc_id, ldcp->ldc_status,
		    lane_in->lstate);
		statsp->oerrors++;
		return (LDC_TX_FAILURE);
	}

	if (dp == NULL) {
		DERR(vswp, "%s(%lld): no dring for outbound lane on"
		    " channel %d", __func__, ldcp->ldc_id, ldcp->ldc_id);
		statsp->oerrors++;
		return (LDC_TX_FAILURE);
	}
	pub_addr = dp->pub_addr;

	size = msgsize(mp);

	/*
	 * Note: In RxDringData mode, lane_in is associated with transmit and
	 * lane_out is associated with receive. However, we still keep the
	 * negotiated mtu in lane_out (our exported attributes).
	 */
	if (size > (size_t)lane_out->mtu) {
		DERR(vswp, "%s(%lld) invalid size (%ld)\n", __func__,
		    ldcp->ldc_id, size);
		statsp->oerrors++;
		return (LDC_TX_FAILURE);
	}

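	/*
	 * Clamp the size up to ETHERMIN; the peer's receive path treats a
	 * descriptor with nbytes smaller than ETHERMIN as invalid (see the
	 * checks in vsw_receive_packet() above).
	 */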
	if (size < ETHERMIN)
		size = ETHERMIN;

	ehp = (struct ether_header *)mp->b_rptr;
	is_bcast = IS_BROADCAST(ehp);
	is_mcast = IS_MULTICAST(ehp);

	/*
	 * Setup on_trap() protection before accessing shared memory areas
	 * (descriptor and data buffer). Note that we enable this protection a
	 * little early and turn it off slightly later, rather than keeping it
	 * enabled strictly at the points in code below where the descriptor
	 * and data buffer are accessed. This is done for performance reasons:
	 * (a) to avoid calling the trap protection code while holding mutex.
	 * (b) to avoid multiple on/off steps for descriptor and data accesses.
	 */
	rv = LDC_ON_TRAP(&otd);
	if (rv != 0) {
		/*
		 * A data access fault occurred down the code path below while
		 * accessing either the descriptor or the data buffer. Release
		 * any locks that we might have acquired in the code below and
		 * return failure.
		 */
		DERR(vswp, "%s(%lld) data access fault occurred\n",
		    __func__, ldcp->ldc_id);
		statsp->oerrors++;
		if (mutex_owned(&dp->txlock)) {
			mutex_exit(&dp->txlock);
		}
		if (mutex_owned(&dp->restart_lock)) {
			mutex_exit(&dp->restart_lock);
		}
		goto dringsend_shm_exit;
	}

	/*
	 * Allocate a descriptor
	 */
	mutex_enter(&dp->txlock);
	txi = next_txi = dp->next_txi;
	INCR_TXI(dp, next_txi);
	txdp = &(pub_addr[txi]);
	if (txdp->dstate != VIO_DESC_DONE) { /* out of descriptors */
		statsp->tx_no_desc++;
		mutex_exit(&dp->txlock);
		(void) LDC_NO_TRAP();
		return (LDC_TX_NORESOURCES);
	} else {
		txdp->dstate = VIO_DESC_INITIALIZING;
	}

	/* Update descriptor ring index */
	dp->next_txi = next_txi;
	mutex_exit(&dp->txlock);

	/* Ensure load ordering of dstate (above) and data_buf_offset. */
	MEMBAR_CONSUMER();

	/* Get the offset of the buffer to be used */
	buf_offset = txdp->data_buf_offset;

	/* Access the buffer using the offset */
	dst = (caddr_t)dp->data_addr + buf_offset;

	/* Copy data into mapped transmit buffer */
	for (bp = mp; bp != NULL; bp = bp->b_cont) {
		mblksz = MBLKL(bp);
		bcopy(bp->b_rptr, dst, mblksz);
		dst += mblksz;
	}

	/* Set the size of data in the descriptor */
	txdp->nbytes = size;

	/*
	 * Ensure store ordering of nbytes and dstate (below); so that the peer
	 * sees the right nbytes value after it checks that the dstate is READY.
	 */
	MEMBAR_PRODUCER();

	mutex_enter(&dp->restart_lock);

	ASSERT(txdp->dstate == VIO_DESC_INITIALIZING);

	/* Mark the descriptor ready */
	txdp->dstate = VIO_DESC_READY;

	/*
	 * Check whether the peer needs to be woken up: restart_reqd is set
	 * when the peer has stopped processing (see the stopped ACK handling
	 * in vsw_process_dringdata_ack_shm()) and also at initial mapping
	 * time. If the descriptor just made ready is the one the peer would
	 * examine next (restart_peer_txi), send it a dring data message
	 * below.
	 */
	if (dp->restart_reqd == B_TRUE && dp->restart_peer_txi == txi) {
		dp->restart_reqd = B_FALSE;
		resched_peer = B_TRUE;
	}

	/* Update tx stats */
	statsp->opackets++;
	statsp->obytes += size;
	if (is_bcast)
		statsp->brdcstxmt++;
	else if (is_mcast)
		statsp->multixmt++;

	mutex_exit(&dp->restart_lock);

	/*
	 * We are done accessing shared memory; clear trap protection.
	 */
	(void) LDC_NO_TRAP();

	/*
	 * Need to wake up the peer ?
	 */
	if (resched_peer == B_TRUE) {
		msgp->tag.vio_msgtype = VIO_TYPE_DATA;
		msgp->tag.vio_subtype = VIO_SUBTYPE_INFO;
		msgp->tag.vio_subtype_env = VIO_DRING_DATA;
		msgp->tag.vio_sid = ldcp->local_session;
		msgp->dring_ident = lane_out->dringp->ident;
		msgp->start_idx = txi;
		msgp->end_idx = -1;

		rv = vsw_send_msg_shm(ldcp, (void *)msgp, sizeof (*msgp),
		    B_FALSE);
		if (rv != 0) {
			/* error: drop the packet */
			DERR(vswp, "%s(%lld) failed sending dringdata msg\n",
			    __func__, ldcp->ldc_id);
			mutex_enter(&dp->restart_lock);
			statsp->oerrors++;
			dp->restart_reqd = B_TRUE;
			mutex_exit(&dp->restart_lock);
		}
		statsp->dring_data_msgs_sent++;
	}

dringsend_shm_exit:
	if (rv == ECONNRESET || rv == EACCES) {
		vsw_process_conn_evt(ldcp, VSW_CONN_RESET);
	}
	return (LDC_TX_SUCCESS);
}

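/*
 * Process a dring data message (info/ack/nack) received from the peer in
 * RxDringData mode; invoked from the message processing path in vsw_ldc.c.
 */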
void
vsw_process_dringdata_shm(void *arg, void *dpkt)
{
	vsw_ldc_t		*ldcp = arg;
	vsw_t			*vswp = ldcp->ldc_vswp;
	vio_dring_msg_t		*dring_pkt = dpkt;

	switch (dring_pkt->tag.vio_subtype) {
	case VIO_SUBTYPE_INFO:
		D2(vswp, "%s(%lld): VIO_SUBTYPE_INFO", __func__, ldcp->ldc_id);
		vsw_process_dringdata_info_shm(ldcp, dring_pkt);
		break;

	case VIO_SUBTYPE_ACK:
		D2(vswp, "%s(%lld): VIO_SUBTYPE_ACK", __func__, ldcp->ldc_id);
		vsw_process_dringdata_ack_shm(ldcp, dring_pkt);
		break;

	case VIO_SUBTYPE_NACK:
		DWARN(vswp, "%s(%lld): VIO_SUBTYPE_NACK",
		    __func__, ldcp->ldc_id);
		/*
		 * Something is badly wrong if we are getting NACKs
		 * for our data pkts. So reset the channel.
		 */
		vsw_process_conn_evt(ldcp, VSW_CONN_RESTART);
		break;

	default:
		DERR(vswp, "%s(%lld): Unknown vio_subtype %x\n", __func__,
		    ldcp->ldc_id, dring_pkt->tag.vio_subtype);
	}
}

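/*
 * Process a dring data INFO message: the peer has queued new data in our rx
 * dring, so wake up the receive worker thread to process the ring.
 */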
static void
vsw_process_dringdata_info_shm(vsw_ldc_t *ldcp, vio_dring_msg_t *msg)
{
	dring_info_t	*dp = ldcp->lane_in.dringp;
	vsw_t		*vswp = ldcp->ldc_vswp;
	vgen_stats_t	*statsp = &ldcp->ldc_stats;

	if (dp->ident != msg->dring_ident) {
		/* drop the message */
		DERR(vswp, "%s(%lld): Invalid dring ident 0x%llx",
		    __func__, ldcp->ldc_id, msg->dring_ident);
		return;
	}

	statsp->dring_data_msgs_rcvd++;

	/*
	 * Wake up the rcv worker thread to process the rx dring.
	 */
	ASSERT(MUTEX_HELD(&ldcp->ldc_cblock));
	mutex_exit(&ldcp->ldc_cblock);
	mutex_enter(&ldcp->rcv_thr_lock);
	if (!(ldcp->rcv_thr_flags & VSW_WTHR_DATARCVD)) {
		ldcp->rcv_thr_flags |= VSW_WTHR_DATARCVD;
		cv_signal(&ldcp->rcv_thr_cv);
	}
	mutex_exit(&ldcp->rcv_thr_lock);
	mutex_enter(&ldcp->ldc_cblock);
}

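/*
 * Process a dring data ACK from the peer (the receiver). For a stopped ACK,
 * check whether more transmit descriptors have become ready since the peer
 * stopped; if so, send a dring data message to restart it, otherwise note
 * that a restart is required when the next descriptor is made ready in the
 * transmit routine.
 */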
static void
vsw_process_dringdata_ack_shm(vsw_ldc_t *ldcp, vio_dring_msg_t *msg)
{
	dring_info_t			*dp;
	uint32_t			start;
	int32_t				end;
	int				rv;
	on_trap_data_t			otd;
	uint32_t			txi;
	vnet_rx_dringdata_desc_t	*txdp;
	vnet_rx_dringdata_desc_t	*pub_addr;
	boolean_t			ready_txd = B_FALSE;
	vsw_t				*vswp = ldcp->ldc_vswp;
	vgen_stats_t			*statsp = &ldcp->ldc_stats;

	dp = ldcp->lane_in.dringp;
	start = msg->start_idx;
	end = msg->end_idx;
	pub_addr = dp->pub_addr;

	/*
	 * In RxDringData mode (v1.6), start index of -1 can be used by the
	 * peer to indicate that it is unspecified. However, the end index
	 * must be set correctly indicating the last descriptor index
	 * processed.
	 */
	if (((start != VNET_START_IDX_UNSPEC) && !(CHECK_TXI(dp, start))) ||
	    !(CHECK_TXI(dp, end))) {
		/* drop the message if invalid index */
		DWARN(vswp, "%s(%lld): Invalid Tx ack start(%d) or end(%d)\n",
		    __func__, ldcp->ldc_id, start, end);
		return;
	}

	/* Validate dring_ident */
	if (msg->dring_ident != ldcp->lane_out.dringp->ident) {
		/* invalid dring_ident, drop the msg */
		DWARN(vswp, "%s(%lld): Invalid dring ident 0x%x\n",
		    __func__, ldcp->ldc_id, msg->dring_ident);
		return;
	}
	statsp->dring_data_acks_rcvd++;

	if (msg->dring_process_state != VIO_DP_STOPPED) {
		/*
		 * Receiver continued processing
		 * dring after sending us the ack.
		 */
		return;
	}

	statsp->dring_stopped_acks_rcvd++;

	/*
	 * Setup on_trap() protection before accessing the dring shared
	 * memory area.
	 */
	rv = LDC_ON_TRAP(&otd);
	if (rv != 0) {
		/*
		 * A data access fault occurred down the code path below while
		 * accessing the descriptors. Release any locks that we might
		 * have acquired in the code below and return failure.
		 */
		if (mutex_owned(&dp->restart_lock)) {
			mutex_exit(&dp->restart_lock);
		}
		return;
	}

	/*
	 * Determine if there are any pending tx descriptors ready to be
	 * processed by the receiver (peer) and if so, send a message to the
	 * peer to restart receiving.
	 */
	mutex_enter(&dp->restart_lock);

	ready_txd = B_FALSE;
	txi = end;
	INCR_TXI(dp, txi);
	txdp = &pub_addr[txi];
	if (txdp->dstate == VIO_DESC_READY) {
		ready_txd = B_TRUE;
	}

	/*
	 * We are done accessing shared memory; clear trap protection.
	 */
	(void) LDC_NO_TRAP();

	if (ready_txd == B_FALSE) {
		/*
		 * No ready tx descriptors. Set the flag to send a message to
		 * the peer when tx descriptors are ready in transmit routine.
		 */
		dp->restart_reqd = B_TRUE;
		dp->restart_peer_txi = txi;
		mutex_exit(&dp->restart_lock);
		return;
	}

	/*
	 * We have some tx descriptors ready to be processed by the receiver.
	 * Send a dring data message to the peer to restart processing.
	 */
	dp->restart_reqd = B_FALSE;
	mutex_exit(&dp->restart_lock);

	msg->tag.vio_msgtype = VIO_TYPE_DATA;
	msg->tag.vio_subtype = VIO_SUBTYPE_INFO;
	msg->tag.vio_subtype_env = VIO_DRING_DATA;
	msg->tag.vio_sid = ldcp->local_session;
	msg->dring_ident = ldcp->lane_out.dringp->ident;
	msg->start_idx = txi;
	msg->end_idx = -1;
	rv = vsw_send_msg_shm(ldcp, (void *)msg,
	    sizeof (vio_dring_msg_t), B_FALSE);
	statsp->dring_data_msgs_sent++;
	if (rv != 0) {
		mutex_enter(&dp->restart_lock);
		dp->restart_reqd = B_TRUE;
		mutex_exit(&dp->restart_lock);
	}

	if (rv == ECONNRESET) {
		vsw_process_conn_evt(ldcp, VSW_CONN_RESET);
	}
}

/*
 * Send dring data msgs (info/ack/nack) over LDC.
 */
int
vsw_send_msg_shm(vsw_ldc_t *ldcp, void *msgp, int size, boolean_t handle_reset)
{
	int			rv;
	int			retries = vsw_wretries;
	size_t			msglen = size;
	vsw_t			*vswp = ldcp->ldc_vswp;
	vio_dring_msg_t		*dmsg = (vio_dring_msg_t *)msgp;

	D1(vswp, "vsw_send_msg_shm (%lld) enter : sending %d bytes",
	    ldcp->ldc_id, size);

	dmsg->seq_num = atomic_inc_32_nv(&ldcp->dringdata_msgid);

	do {
		msglen = size;
		rv = ldc_write(ldcp->ldc_handle, (caddr_t)msgp, &msglen);
	} while (rv == EWOULDBLOCK && --retries > 0);

	if ((rv != 0) || (msglen != size)) {
		DERR(vswp, "vsw_send_msg_shm:ldc_write failed: "
		    "chan(%lld) rv(%d) size (%d) msglen(%d)\n",
		    ldcp->ldc_id, rv, size, msglen);
		ldcp->ldc_stats.oerrors++;
	}

	/*
	 * If channel has been reset we either handle it here or
	 * simply report back that it has been reset and let caller
	 * decide what to do.
	 */
	if (rv == ECONNRESET) {
		DWARN(vswp, "%s (%lld) channel reset", __func__, ldcp->ldc_id);

		if (handle_reset) {
			vsw_process_conn_evt(ldcp, VSW_CONN_RESET);
		}
	}

	return (rv);
}