tcp_fusion.c revision 11042:2d6e217af1b4
1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21/*
22 * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23 * Use is subject to license terms.
24 */
25
26#include <sys/types.h>
27#include <sys/stream.h>
28#include <sys/strsun.h>
29#include <sys/strsubr.h>
30#include <sys/debug.h>
31#include <sys/sdt.h>
32#include <sys/cmn_err.h>
33#include <sys/tihdr.h>
34
35#include <inet/common.h>
36#include <inet/optcom.h>
37#include <inet/ip.h>
38#include <inet/ip_if.h>
39#include <inet/ip_impl.h>
40#include <inet/tcp.h>
41#include <inet/tcp_impl.h>
42#include <inet/ipsec_impl.h>
43#include <inet/ipclassifier.h>
44#include <inet/ipp_common.h>
45#include <inet/ip_if.h>
46
47/*
48 * This file implements TCP fusion - a protocol-less data path for TCP
49 * loopback connections.  The fusion of two local TCP endpoints occurs
50 * at connection establishment time.  Various conditions (see details
51 * in tcp_fuse()) need to be met for fusion to be successful.  If it
52 * fails, we fall back to the regular TCP data path; if it succeeds,
53 * both endpoints proceed to use tcp_fuse_output() as the transmit path.
54 * tcp_fuse_output() enqueues application data directly onto the peer's
55 * receive queue; no protocol processing is involved.
56 *
57 * Synchronization is handled by squeue and the mutex tcp_non_sq_lock.
58 * One of the requirements for fusion to succeed is that both endpoints
59 * need to be using the same squeue.  This ensures that neither side
60 * can disappear while the other side is still sending data. Flow
61 * control information is manipulated outside the squeue, so the
62 * tcp_non_sq_lock must be held when touching tcp_flow_stopped.
63 */
64
65/*
 * Global on/off switch for TCP fusion; it is initialized to B_TRUE.
 * Setting this to false means we disable fusion altogether and
 * loopback connections would go through the protocol paths.
 *
 * NOTE(review): nothing in the visible part of this file reads this
 * variable; it is presumably consulted by the code that decides whether
 * to call tcp_fuse() -- confirm against the callers.
 */
69boolean_t do_tcp_fusion = B_TRUE;
70
71/*
72 * This routine gets called by the eager tcp upon changing state from
73 * SYN_RCVD to ESTABLISHED.  It fuses a direct path between itself
74 * and the active connect tcp such that the regular tcp processings
75 * may be bypassed under allowable circumstances.  Because the fusion
76 * requires both endpoints to be in the same squeue, it does not work
77 * for simultaneous active connects because there is no easy way to
78 * switch from one squeue to another once the connection is created.
79 * This is different from the eager tcp case where we assign it the
80 * same squeue as the one given to the active connect tcp during open.
81 */
82void
83tcp_fuse(tcp_t *tcp, uchar_t *iphdr, tcpha_t *tcpha)
84{
85	conn_t		*peer_connp, *connp = tcp->tcp_connp;
86	tcp_t		*peer_tcp;
87	tcp_stack_t	*tcps = tcp->tcp_tcps;
88	netstack_t	*ns;
89	ip_stack_t	*ipst = tcps->tcps_netstack->netstack_ip;
90
91	ASSERT(!tcp->tcp_fused);
92	ASSERT(tcp->tcp_loopback);
93	ASSERT(tcp->tcp_loopback_peer == NULL);
94	/*
95	 * We need to inherit conn_rcvbuf of the listener tcp,
96	 * but we can't really use tcp_listener since we get here after
97	 * sending up T_CONN_IND and tcp_tli_accept() may be called
98	 * independently, at which point tcp_listener is cleared;
99	 * this is why we use tcp_saved_listener. The listener itself
100	 * is guaranteed to be around until tcp_accept_finish() is called
101	 * on this eager -- this won't happen until we're done since we're
102	 * inside the eager's perimeter now.
103	 */
104	ASSERT(tcp->tcp_saved_listener != NULL);
105	/*
106	 * Lookup peer endpoint; search for the remote endpoint having
107	 * the reversed address-port quadruplet in ESTABLISHED state,
108	 * which is guaranteed to be unique in the system.  Zone check
109	 * is applied accordingly for loopback address, but not for
110	 * local address since we want fusion to happen across Zones.
111	 */
112	if (connp->conn_ipversion == IPV4_VERSION) {
113		peer_connp = ipcl_conn_tcp_lookup_reversed_ipv4(connp,
114		    (ipha_t *)iphdr, tcpha, ipst);
115	} else {
116		peer_connp = ipcl_conn_tcp_lookup_reversed_ipv6(connp,
117		    (ip6_t *)iphdr, tcpha, ipst);
118	}
119
120	/*
121	 * We can only proceed if peer exists, resides in the same squeue
122	 * as our conn and is not raw-socket. We also restrict fusion to
123	 * endpoints of the same type (STREAMS or non-STREAMS). The squeue
124	 * assignment of this eager tcp was done earlier at the time of SYN
125	 * processing in ip_fanout_tcp{_v6}.  Note that similar squeues by
126	 * itself doesn't guarantee a safe condition to fuse, hence we perform
127	 * additional tests below.
128	 */
129	ASSERT(peer_connp == NULL || peer_connp != connp);
130	if (peer_connp == NULL || peer_connp->conn_sqp != connp->conn_sqp ||
131	    !IPCL_IS_TCP(peer_connp) ||
132	    IPCL_IS_NONSTR(connp) != IPCL_IS_NONSTR(peer_connp)) {
133		if (peer_connp != NULL) {
134			TCP_STAT(tcps, tcp_fusion_unqualified);
135			CONN_DEC_REF(peer_connp);
136		}
137		return;
138	}
139	peer_tcp = peer_connp->conn_tcp;	/* active connect tcp */
140
141	ASSERT(peer_tcp != NULL && peer_tcp != tcp && !peer_tcp->tcp_fused);
142	ASSERT(peer_tcp->tcp_loopback_peer == NULL);
143	ASSERT(peer_connp->conn_sqp == connp->conn_sqp);
144
145	/*
146	 * Due to IRE changes the peer and us might not agree on tcp_loopback.
147	 * We bail in that case.
148	 */
149	if (!peer_tcp->tcp_loopback) {
150		TCP_STAT(tcps, tcp_fusion_unqualified);
151		CONN_DEC_REF(peer_connp);
152		return;
153	}
154	/*
155	 * Fuse the endpoints; we perform further checks against both
156	 * tcp endpoints to ensure that a fusion is allowed to happen.
157	 * In particular we bail out if kernel SSL exists.
158	 */
159	ns = tcps->tcps_netstack;
160	ipst = ns->netstack_ip;
161
162	if (!tcp->tcp_unfusable && !peer_tcp->tcp_unfusable &&
163	    (tcp->tcp_kssl_ent == NULL) && (tcp->tcp_xmit_head == NULL) &&
164	    (peer_tcp->tcp_xmit_head == NULL)) {
165		mblk_t *mp;
166		queue_t *peer_rq = peer_connp->conn_rq;
167
168		ASSERT(!TCP_IS_DETACHED(peer_tcp));
169		ASSERT(tcp->tcp_fused_sigurg_mp == NULL);
170		ASSERT(peer_tcp->tcp_fused_sigurg_mp == NULL);
171		ASSERT(tcp->tcp_kssl_ctx == NULL);
172
173		/*
174		 * We need to drain data on both endpoints during unfuse.
175		 * If we need to send up SIGURG at the time of draining,
176		 * we want to be sure that an mblk is readily available.
177		 * This is why we pre-allocate the M_PCSIG mblks for both
178		 * endpoints which will only be used during/after unfuse.
179		 * The mblk might already exist if we are doing a re-fuse.
180		 */
181		if (!IPCL_IS_NONSTR(tcp->tcp_connp)) {
182			ASSERT(!IPCL_IS_NONSTR(peer_tcp->tcp_connp));
183
184			if (tcp->tcp_fused_sigurg_mp == NULL) {
185				if ((mp = allocb(1, BPRI_HI)) == NULL)
186					goto failed;
187				tcp->tcp_fused_sigurg_mp = mp;
188			}
189
190			if (peer_tcp->tcp_fused_sigurg_mp == NULL) {
191				if ((mp = allocb(1, BPRI_HI)) == NULL)
192					goto failed;
193				peer_tcp->tcp_fused_sigurg_mp = mp;
194			}
195
196			if ((mp = allocb(sizeof (struct stroptions),
197			    BPRI_HI)) == NULL)
198				goto failed;
199		}
200
201		/* Fuse both endpoints */
202		peer_tcp->tcp_loopback_peer = tcp;
203		tcp->tcp_loopback_peer = peer_tcp;
204		peer_tcp->tcp_fused = tcp->tcp_fused = B_TRUE;
205
206		/*
207		 * We never use regular tcp paths in fusion and should
208		 * therefore clear tcp_unsent on both endpoints.  Having
209		 * them set to non-zero values means asking for trouble
210		 * especially after unfuse, where we may end up sending
211		 * through regular tcp paths which expect xmit_list and
212		 * friends to be correctly setup.
213		 */
214		peer_tcp->tcp_unsent = tcp->tcp_unsent = 0;
215
216		tcp_timers_stop(tcp);
217		tcp_timers_stop(peer_tcp);
218
219		/*
220		 * Set receive buffer and max packet size for the
221		 * active open tcp.
222		 * eager's values will be set in tcp_accept_finish.
223		 */
224		(void) tcp_rwnd_set(peer_tcp, peer_tcp->tcp_connp->conn_rcvbuf);
225
226		/*
227		 * Set the write offset value to zero since we won't
228		 * be needing any room for TCP/IP headers.
229		 */
230		if (!IPCL_IS_NONSTR(peer_tcp->tcp_connp)) {
231			struct stroptions *stropt;
232
233			DB_TYPE(mp) = M_SETOPTS;
234			mp->b_wptr += sizeof (*stropt);
235
236			stropt = (struct stroptions *)mp->b_rptr;
237			stropt->so_flags = SO_WROFF;
238			stropt->so_wroff = 0;
239
240			/* Send the options up */
241			putnext(peer_rq, mp);
242		} else {
243			struct sock_proto_props sopp;
244
245			/* The peer is a non-STREAMS end point */
246			ASSERT(IPCL_IS_TCP(peer_connp));
247
248			sopp.sopp_flags = SOCKOPT_WROFF;
249			sopp.sopp_wroff = 0;
250			(*peer_connp->conn_upcalls->su_set_proto_props)
251			    (peer_connp->conn_upper_handle, &sopp);
252		}
253	} else {
254		TCP_STAT(tcps, tcp_fusion_unqualified);
255	}
256	CONN_DEC_REF(peer_connp);
257	return;
258
259failed:
260	if (tcp->tcp_fused_sigurg_mp != NULL) {
261		freeb(tcp->tcp_fused_sigurg_mp);
262		tcp->tcp_fused_sigurg_mp = NULL;
263	}
264	if (peer_tcp->tcp_fused_sigurg_mp != NULL) {
265		freeb(peer_tcp->tcp_fused_sigurg_mp);
266		peer_tcp->tcp_fused_sigurg_mp = NULL;
267	}
268	CONN_DEC_REF(peer_connp);
269}
270
271/*
272 * Unfuse a previously-fused pair of tcp loopback endpoints.
273 */
274void
275tcp_unfuse(tcp_t *tcp)
276{
277	tcp_t *peer_tcp = tcp->tcp_loopback_peer;
278	tcp_stack_t *tcps = tcp->tcp_tcps;
279
280	ASSERT(tcp->tcp_fused && peer_tcp != NULL);
281	ASSERT(peer_tcp->tcp_fused && peer_tcp->tcp_loopback_peer == tcp);
282	ASSERT(tcp->tcp_connp->conn_sqp == peer_tcp->tcp_connp->conn_sqp);
283	ASSERT(tcp->tcp_unsent == 0 && peer_tcp->tcp_unsent == 0);
284
285	/*
286	 * Cancel any pending push timers.
287	 */
288	if (tcp->tcp_push_tid != 0) {
289		(void) TCP_TIMER_CANCEL(tcp, tcp->tcp_push_tid);
290		tcp->tcp_push_tid = 0;
291	}
292	if (peer_tcp->tcp_push_tid != 0) {
293		(void) TCP_TIMER_CANCEL(peer_tcp, peer_tcp->tcp_push_tid);
294		peer_tcp->tcp_push_tid = 0;
295	}
296
297	/*
298	 * Drain any pending data; Note that in case of a detached tcp, the
299	 * draining will happen later after the tcp is unfused.  For non-
300	 * urgent data, this can be handled by the regular tcp_rcv_drain().
301	 * If we have urgent data sitting in the receive list, we will
302	 * need to send up a SIGURG signal first before draining the data.
303	 * All of these will be handled by the code in tcp_fuse_rcv_drain()
304	 * when called from tcp_rcv_drain().
305	 */
306	if (!TCP_IS_DETACHED(tcp)) {
307		(void) tcp_fuse_rcv_drain(tcp->tcp_connp->conn_rq, tcp,
308		    &tcp->tcp_fused_sigurg_mp);
309	}
310	if (!TCP_IS_DETACHED(peer_tcp)) {
311		(void) tcp_fuse_rcv_drain(peer_tcp->tcp_connp->conn_rq,
312		    peer_tcp,  &peer_tcp->tcp_fused_sigurg_mp);
313	}
314
315	/* Lift up any flow-control conditions */
316	mutex_enter(&tcp->tcp_non_sq_lock);
317	if (tcp->tcp_flow_stopped) {
318		tcp_clrqfull(tcp);
319		TCP_STAT(tcps, tcp_fusion_backenabled);
320	}
321	mutex_exit(&tcp->tcp_non_sq_lock);
322
323	mutex_enter(&peer_tcp->tcp_non_sq_lock);
324	if (peer_tcp->tcp_flow_stopped) {
325		tcp_clrqfull(peer_tcp);
326		TCP_STAT(tcps, tcp_fusion_backenabled);
327	}
328	mutex_exit(&peer_tcp->tcp_non_sq_lock);
329
330	/*
331	 * Update tha_seq and tha_ack in the header template
332	 */
333	tcp->tcp_tcpha->tha_seq = htonl(tcp->tcp_snxt);
334	tcp->tcp_tcpha->tha_ack = htonl(tcp->tcp_rnxt);
335	peer_tcp->tcp_tcpha->tha_seq = htonl(peer_tcp->tcp_snxt);
336	peer_tcp->tcp_tcpha->tha_ack = htonl(peer_tcp->tcp_rnxt);
337
338	/* Unfuse the endpoints */
339	peer_tcp->tcp_fused = tcp->tcp_fused = B_FALSE;
340	peer_tcp->tcp_loopback_peer = tcp->tcp_loopback_peer = NULL;
341}
342
343/*
344 * Fusion output routine used to handle urgent data sent by STREAMS based
345 * endpoints. This routine is called by tcp_fuse_output() for handling
346 * non-M_DATA mblks.
347 */
348void
349tcp_fuse_output_urg(tcp_t *tcp, mblk_t *mp)
350{
351	mblk_t *mp1;
352	struct T_exdata_ind *tei;
353	tcp_t *peer_tcp = tcp->tcp_loopback_peer;
354	mblk_t *head, *prev_head = NULL;
355	tcp_stack_t	*tcps = tcp->tcp_tcps;
356
357	ASSERT(tcp->tcp_fused);
358	ASSERT(peer_tcp != NULL && peer_tcp->tcp_loopback_peer == tcp);
359	ASSERT(!IPCL_IS_NONSTR(tcp->tcp_connp));
360	ASSERT(DB_TYPE(mp) == M_PROTO || DB_TYPE(mp) == M_PCPROTO);
361	ASSERT(mp->b_cont != NULL && DB_TYPE(mp->b_cont) == M_DATA);
362	ASSERT(MBLKL(mp) >= sizeof (*tei) && MBLKL(mp->b_cont) > 0);
363
364	/*
365	 * Urgent data arrives in the form of T_EXDATA_REQ from above.
366	 * Each occurence denotes a new urgent pointer.  For each new
367	 * urgent pointer we signal (SIGURG) the receiving app to indicate
368	 * that it needs to go into urgent mode.  This is similar to the
369	 * urgent data handling in the regular tcp.  We don't need to keep
370	 * track of where the urgent pointer is, because each T_EXDATA_REQ
371	 * "advances" the urgent pointer for us.
372	 *
373	 * The actual urgent data carried by T_EXDATA_REQ is then prepended
374	 * by a T_EXDATA_IND before being enqueued behind any existing data
375	 * destined for the receiving app.  There is only a single urgent
376	 * pointer (out-of-band mark) for a given tcp.  If the new urgent
377	 * data arrives before the receiving app reads some existing urgent
378	 * data, the previous marker is lost.  This behavior is emulated
379	 * accordingly below, by removing any existing T_EXDATA_IND messages
380	 * and essentially converting old urgent data into non-urgent.
381	 */
382	ASSERT(tcp->tcp_valid_bits & TCP_URG_VALID);
383	/* Let sender get out of urgent mode */
384	tcp->tcp_valid_bits &= ~TCP_URG_VALID;
385
386	/*
387	 * This flag indicates that a signal needs to be sent up.
388	 * This flag will only get cleared once SIGURG is delivered and
389	 * is not affected by the tcp_fused flag -- delivery will still
390	 * happen even after an endpoint is unfused, to handle the case
391	 * where the sending endpoint immediately closes/unfuses after
392	 * sending urgent data and the accept is not yet finished.
393	 */
394	peer_tcp->tcp_fused_sigurg = B_TRUE;
395
396	/* Reuse T_EXDATA_REQ mblk for T_EXDATA_IND */
397	DB_TYPE(mp) = M_PROTO;
398	tei = (struct T_exdata_ind *)mp->b_rptr;
399	tei->PRIM_type = T_EXDATA_IND;
400	tei->MORE_flag = 0;
401	mp->b_wptr = (uchar_t *)&tei[1];
402
403	TCP_STAT(tcps, tcp_fusion_urg);
404	BUMP_MIB(&tcps->tcps_mib, tcpOutUrg);
405
406	head = peer_tcp->tcp_rcv_list;
407	while (head != NULL) {
408		/*
409		 * Remove existing T_EXDATA_IND, keep the data which follows
410		 * it and relink our list.  Note that we don't modify the
411		 * tcp_rcv_last_tail since it never points to T_EXDATA_IND.
412		 */
413		if (DB_TYPE(head) != M_DATA) {
414			mp1 = head;
415
416			ASSERT(DB_TYPE(mp1->b_cont) == M_DATA);
417			head = mp1->b_cont;
418			mp1->b_cont = NULL;
419			head->b_next = mp1->b_next;
420			mp1->b_next = NULL;
421			if (prev_head != NULL)
422				prev_head->b_next = head;
423			if (peer_tcp->tcp_rcv_list == mp1)
424				peer_tcp->tcp_rcv_list = head;
425			if (peer_tcp->tcp_rcv_last_head == mp1)
426				peer_tcp->tcp_rcv_last_head = head;
427			freeb(mp1);
428		}
429		prev_head = head;
430		head = head->b_next;
431	}
432}
433
434/*
435 * Fusion output routine, called by tcp_output() and tcp_wput_proto().
436 * If we are modifying any member that can be changed outside the squeue,
437 * like tcp_flow_stopped, we need to take tcp_non_sq_lock.
438 */
439boolean_t
440tcp_fuse_output(tcp_t *tcp, mblk_t *mp, uint32_t send_size)
441{
442	conn_t		*connp = tcp->tcp_connp;
443	tcp_t		*peer_tcp = tcp->tcp_loopback_peer;
444	conn_t		*peer_connp = peer_tcp->tcp_connp;
445	boolean_t	flow_stopped, peer_data_queued = B_FALSE;
446	boolean_t	urgent = (DB_TYPE(mp) != M_DATA);
447	boolean_t	push = B_TRUE;
448	mblk_t		*mp1 = mp;
449	uint_t		ip_hdr_len;
450	uint32_t	recv_size = send_size;
451	tcp_stack_t	*tcps = tcp->tcp_tcps;
452	netstack_t	*ns = tcps->tcps_netstack;
453	ip_stack_t	*ipst = ns->netstack_ip;
454	ipsec_stack_t	*ipss = ns->netstack_ipsec;
455	iaflags_t	ixaflags = connp->conn_ixa->ixa_flags;
456	boolean_t	do_ipsec, hooks_out, hooks_in, ipobs_enabled;
457
458	ASSERT(tcp->tcp_fused);
459	ASSERT(peer_tcp != NULL && peer_tcp->tcp_loopback_peer == tcp);
460	ASSERT(connp->conn_sqp == peer_connp->conn_sqp);
461	ASSERT(DB_TYPE(mp) == M_DATA || DB_TYPE(mp) == M_PROTO ||
462	    DB_TYPE(mp) == M_PCPROTO);
463
464	if (send_size == 0) {
465		freemsg(mp);
466		return (B_TRUE);
467	}
468
469	/*
470	 * Handle urgent data; we either send up SIGURG to the peer now
471	 * or do it later when we drain, in case the peer is detached
472	 * or if we're short of memory for M_PCSIG mblk.
473	 */
474	if (urgent) {
475		tcp_fuse_output_urg(tcp, mp);
476
477		mp1 = mp->b_cont;
478	}
479
480	/*
481	 * Check that we are still using an IRE_LOCAL or IRE_LOOPBACK before
482	 * further processes.
483	 */
484	if (!ip_output_verify_local(connp->conn_ixa))
485		goto unfuse;
486
487	/*
488	 * Build IP and TCP header in case we have something that needs the
489	 * headers. Those cases are:
490	 * 1. IPsec
491	 * 2. IPobs
492	 * 3. FW_HOOKS
493	 *
494	 * If tcp_xmit_mp() fails to dupb() the message, unfuse the connection
495	 * and back to regular path.
496	 */
497	if (ixaflags & IXAF_IS_IPV4) {
498		do_ipsec = (ixaflags & IXAF_IPSEC_SECURE) ||
499		    CONN_INBOUND_POLICY_PRESENT(peer_connp, ipss);
500
501		hooks_out = HOOKS4_INTERESTED_LOOPBACK_OUT(ipst);
502		hooks_in = HOOKS4_INTERESTED_LOOPBACK_IN(ipst);
503		ipobs_enabled = (ipst->ips_ip4_observe.he_interested != 0);
504	} else {
505		do_ipsec = (ixaflags & IXAF_IPSEC_SECURE) ||
506		    CONN_INBOUND_POLICY_PRESENT_V6(peer_connp, ipss);
507
508		hooks_out = HOOKS6_INTERESTED_LOOPBACK_OUT(ipst);
509		hooks_in = HOOKS6_INTERESTED_LOOPBACK_IN(ipst);
510		ipobs_enabled = (ipst->ips_ip6_observe.he_interested != 0);
511	}
512
513	/* We do logical 'or' for efficiency */
514	if (ipobs_enabled | do_ipsec | hooks_in | hooks_out) {
515		if ((mp1 = tcp_xmit_mp(tcp, mp1, tcp->tcp_mss, NULL, NULL,
516		    tcp->tcp_snxt, B_TRUE, NULL, B_FALSE)) == NULL)
517			/* If tcp_xmit_mp fails, use regular path */
518			goto unfuse;
519
520		/*
521		 * Leave all IP relevant processes to ip_output_process_local(),
522		 * which handles IPsec, IPobs, and FW_HOOKS.
523		 */
524		mp1 = ip_output_process_local(mp1, connp->conn_ixa, hooks_out,
525		    hooks_in, do_ipsec ? peer_connp : NULL);
526
527		/* If the message is dropped for any reason. */
528		if (mp1 == NULL)
529			goto unfuse;
530
531		/*
532		 * Data length might have been changed by FW_HOOKS.
533		 * We assume that the first mblk contains the TCP/IP headers.
534		 */
535		if (hooks_in || hooks_out) {
536			tcpha_t *tcpha;
537
538			ip_hdr_len = (ixaflags & IXAF_IS_IPV4) ?
539			    IPH_HDR_LENGTH((ipha_t *)mp1->b_rptr) :
540			    ip_hdr_length_v6(mp1, (ip6_t *)mp1->b_rptr);
541
542			tcpha = (tcpha_t *)&mp1->b_rptr[ip_hdr_len];
543			ASSERT((uchar_t *)tcpha + sizeof (tcpha_t) <=
544			    mp1->b_wptr);
545			recv_size += htonl(tcpha->tha_seq) - tcp->tcp_snxt;
546
547		}
548
549		/*
550		 * The message duplicated by tcp_xmit_mp is freed.
551		 * Note: the original message passed in remains unchanged.
552		 */
553		freemsg(mp1);
554	}
555
556	/*
557	 * Enqueue data into the peer's receive list; we may or may not
558	 * drain the contents depending on the conditions below.
559	 *
560	 * For non-STREAMS sockets we normally queue data directly in the
561	 * socket by calling the su_recv upcall. However, if the peer is
562	 * detached we use tcp_rcv_enqueue() instead. Queued data will be
563	 * drained when the accept completes (in tcp_accept_finish()).
564	 */
565	if (IPCL_IS_NONSTR(peer_connp) &&
566	    !TCP_IS_DETACHED(peer_tcp)) {
567		int error;
568		int flags = 0;
569
570		if ((tcp->tcp_valid_bits & TCP_URG_VALID) &&
571		    (tcp->tcp_urg == tcp->tcp_snxt)) {
572			flags = MSG_OOB;
573			(*peer_connp->conn_upcalls->su_signal_oob)
574			    (peer_connp->conn_upper_handle, 0);
575			tcp->tcp_valid_bits &= ~TCP_URG_VALID;
576		}
577		if ((*peer_connp->conn_upcalls->su_recv)(
578		    peer_connp->conn_upper_handle, mp, recv_size,
579		    flags, &error, &push) < 0) {
580			ASSERT(error != EOPNOTSUPP);
581			peer_data_queued = B_TRUE;
582		}
583	} else {
584		if (IPCL_IS_NONSTR(peer_connp) &&
585		    (tcp->tcp_valid_bits & TCP_URG_VALID) &&
586		    (tcp->tcp_urg == tcp->tcp_snxt)) {
587			/*
588			 * Can not deal with urgent pointers
589			 * that arrive before the connection has been
590			 * accept()ed.
591			 */
592			tcp->tcp_valid_bits &= ~TCP_URG_VALID;
593			freemsg(mp);
594			return (B_TRUE);
595		}
596
597		tcp_rcv_enqueue(peer_tcp, mp, recv_size,
598		    tcp->tcp_connp->conn_cred);
599
600		/* In case it wrapped around and also to keep it constant */
601		peer_tcp->tcp_rwnd += recv_size;
602	}
603
604	/*
605	 * Exercise flow-control when needed; we will get back-enabled
606	 * in either tcp_accept_finish(), tcp_unfuse(), or when data is
607	 * consumed. If peer endpoint is detached, we emulate streams flow
608	 * control by checking the peer's queue size and high water mark;
609	 * otherwise we simply use canputnext() to decide if we need to stop
610	 * our flow.
611	 *
612	 * Since we are accessing our tcp_flow_stopped and might modify it,
613	 * we need to take tcp->tcp_non_sq_lock.
614	 */
615	mutex_enter(&tcp->tcp_non_sq_lock);
616	flow_stopped = tcp->tcp_flow_stopped;
617	if ((TCP_IS_DETACHED(peer_tcp) &&
618	    (peer_tcp->tcp_rcv_cnt >= peer_connp->conn_rcvbuf)) ||
619	    (!TCP_IS_DETACHED(peer_tcp) &&
620	    !IPCL_IS_NONSTR(peer_connp) && !canputnext(peer_connp->conn_rq))) {
621		peer_data_queued = B_TRUE;
622	}
623
624	if (!flow_stopped && (peer_data_queued ||
625	    (TCP_UNSENT_BYTES(tcp) >= connp->conn_sndbuf))) {
626		tcp_setqfull(tcp);
627		flow_stopped = B_TRUE;
628		TCP_STAT(tcps, tcp_fusion_flowctl);
629		DTRACE_PROBE3(tcp__fuse__output__flowctl, tcp_t *, tcp,
630		    uint_t, send_size, uint_t, peer_tcp->tcp_rcv_cnt);
631	} else if (flow_stopped && !peer_data_queued &&
632	    (TCP_UNSENT_BYTES(tcp) <= connp->conn_sndlowat)) {
633		tcp_clrqfull(tcp);
634		TCP_STAT(tcps, tcp_fusion_backenabled);
635		flow_stopped = B_FALSE;
636	}
637	mutex_exit(&tcp->tcp_non_sq_lock);
638
639	ipst->ips_loopback_packets++;
640	tcp->tcp_last_sent_len = send_size;
641
642	/* Need to adjust the following SNMP MIB-related variables */
643	tcp->tcp_snxt += send_size;
644	tcp->tcp_suna = tcp->tcp_snxt;
645	peer_tcp->tcp_rnxt += recv_size;
646	peer_tcp->tcp_rack = peer_tcp->tcp_rnxt;
647
648	BUMP_MIB(&tcps->tcps_mib, tcpOutDataSegs);
649	UPDATE_MIB(&tcps->tcps_mib, tcpOutDataBytes, send_size);
650
651	BUMP_MIB(&tcps->tcps_mib, tcpInSegs);
652	BUMP_MIB(&tcps->tcps_mib, tcpInDataInorderSegs);
653	UPDATE_MIB(&tcps->tcps_mib, tcpInDataInorderBytes, send_size);
654
655	BUMP_LOCAL(tcp->tcp_obsegs);
656	BUMP_LOCAL(peer_tcp->tcp_ibsegs);
657
658	DTRACE_PROBE2(tcp__fuse__output, tcp_t *, tcp, uint_t, send_size);
659
660	if (!IPCL_IS_NONSTR(peer_tcp->tcp_connp) &&
661	    !TCP_IS_DETACHED(peer_tcp)) {
662		/*
663		 * Drain the peer's receive queue it has urgent data or if
664		 * we're not flow-controlled.
665		 */
666		if (urgent || !flow_stopped) {
667			ASSERT(peer_tcp->tcp_rcv_list != NULL);
668			/*
669			 * For TLI-based streams, a thread in tcp_accept_swap()
670			 * can race with us.  That thread will ensure that the
671			 * correct peer_connp->conn_rq is globally visible
672			 * before peer_tcp->tcp_detached is visible as clear,
673			 * but we must also ensure that the load of conn_rq
674			 * cannot be reordered to be before the tcp_detached
675			 * check.
676			 */
677			membar_consumer();
678			(void) tcp_fuse_rcv_drain(peer_connp->conn_rq, peer_tcp,
679			    NULL);
680		}
681	}
682	return (B_TRUE);
683unfuse:
684	tcp_unfuse(tcp);
685	return (B_FALSE);
686}
687
688/*
689 * This routine gets called to deliver data upstream on a fused or
690 * previously fused tcp loopback endpoint; the latter happens only
691 * when there is a pending SIGURG signal plus urgent data that can't
692 * be sent upstream in the past.
693 */
694boolean_t
695tcp_fuse_rcv_drain(queue_t *q, tcp_t *tcp, mblk_t **sigurg_mpp)
696{
697	mblk_t *mp;
698	conn_t	*connp = tcp->tcp_connp;
699
700#ifdef DEBUG
701	uint_t cnt = 0;
702#endif
703	tcp_stack_t	*tcps = tcp->tcp_tcps;
704	tcp_t		*peer_tcp = tcp->tcp_loopback_peer;
705
706	ASSERT(tcp->tcp_loopback);
707	ASSERT(tcp->tcp_fused || tcp->tcp_fused_sigurg);
708	ASSERT(!tcp->tcp_fused || tcp->tcp_loopback_peer != NULL);
709	ASSERT(IPCL_IS_NONSTR(connp) || sigurg_mpp != NULL || tcp->tcp_fused);
710
711	/* No need for the push timer now, in case it was scheduled */
712	if (tcp->tcp_push_tid != 0) {
713		(void) TCP_TIMER_CANCEL(tcp, tcp->tcp_push_tid);
714		tcp->tcp_push_tid = 0;
715	}
716	/*
717	 * If there's urgent data sitting in receive list and we didn't
718	 * get a chance to send up a SIGURG signal, make sure we send
719	 * it first before draining in order to ensure that SIOCATMARK
720	 * works properly.
721	 */
722	if (tcp->tcp_fused_sigurg) {
723		ASSERT(!IPCL_IS_NONSTR(tcp->tcp_connp));
724
725		tcp->tcp_fused_sigurg = B_FALSE;
726		/*
727		 * sigurg_mpp is normally NULL, i.e. when we're still
728		 * fused and didn't get here because of tcp_unfuse().
729		 * In this case try hard to allocate the M_PCSIG mblk.
730		 */
731		if (sigurg_mpp == NULL &&
732		    (mp = allocb(1, BPRI_HI)) == NULL &&
733		    (mp = allocb_tryhard(1)) == NULL) {
734			/* Alloc failed; try again next time */
735			tcp->tcp_push_tid = TCP_TIMER(tcp,
736			    tcp_push_timer,
737			    MSEC_TO_TICK(
738			    tcps->tcps_push_timer_interval));
739			return (B_TRUE);
740		} else if (sigurg_mpp != NULL) {
741			/*
742			 * Use the supplied M_PCSIG mblk; it means we're
743			 * either unfused or in the process of unfusing,
744			 * and the drain must happen now.
745			 */
746			mp = *sigurg_mpp;
747			*sigurg_mpp = NULL;
748		}
749		ASSERT(mp != NULL);
750
751		/* Send up the signal */
752		DB_TYPE(mp) = M_PCSIG;
753		*mp->b_wptr++ = (uchar_t)SIGURG;
754		putnext(q, mp);
755
756		/*
757		 * Let the regular tcp_rcv_drain() path handle
758		 * draining the data if we're no longer fused.
759		 */
760		if (!tcp->tcp_fused)
761			return (B_FALSE);
762	}
763
764	/* Drain the data */
765	while ((mp = tcp->tcp_rcv_list) != NULL) {
766		tcp->tcp_rcv_list = mp->b_next;
767		mp->b_next = NULL;
768#ifdef DEBUG
769		cnt += msgdsize(mp);
770#endif
771		ASSERT(!IPCL_IS_NONSTR(connp));
772		putnext(q, mp);
773		TCP_STAT(tcps, tcp_fusion_putnext);
774	}
775
776#ifdef DEBUG
777	ASSERT(cnt == tcp->tcp_rcv_cnt);
778#endif
779	tcp->tcp_rcv_last_head = NULL;
780	tcp->tcp_rcv_last_tail = NULL;
781	tcp->tcp_rcv_cnt = 0;
782	tcp->tcp_rwnd = tcp->tcp_connp->conn_rcvbuf;
783
784	mutex_enter(&peer_tcp->tcp_non_sq_lock);
785	if (peer_tcp->tcp_flow_stopped && (TCP_UNSENT_BYTES(peer_tcp) <=
786	    peer_tcp->tcp_connp->conn_sndlowat)) {
787		tcp_clrqfull(peer_tcp);
788		TCP_STAT(tcps, tcp_fusion_backenabled);
789	}
790	mutex_exit(&peer_tcp->tcp_non_sq_lock);
791
792	return (B_TRUE);
793}
794
795/*
796 * Calculate the size of receive buffer for a fused tcp endpoint.
797 */
798size_t
799tcp_fuse_set_rcv_hiwat(tcp_t *tcp, size_t rwnd)
800{
801	tcp_stack_t	*tcps = tcp->tcp_tcps;
802
803	ASSERT(tcp->tcp_fused);
804
805	/* Ensure that value is within the maximum upper bound */
806	if (rwnd > tcps->tcps_max_buf)
807		rwnd = tcps->tcps_max_buf;
808	/*
809	 * Round up to system page size in case SO_RCVBUF is modified
810	 * after SO_SNDBUF; the latter is also similarly rounded up.
811	 */
812	rwnd = P2ROUNDUP_TYPED(rwnd, PAGESIZE, size_t);
813
814	/*
815	 * Record high water mark, this is used for flow-control
816	 * purposes in tcp_fuse_output().
817	 */
818	tcp->tcp_connp->conn_rcvbuf = rwnd;
819	tcp->tcp_rwnd = rwnd;
820	return (rwnd);
821}
822
823/*
824 * Calculate the maximum outstanding unread data block for a fused tcp endpoint.
825 */
826int
827tcp_fuse_maxpsz(tcp_t *tcp)
828{
829	tcp_t *peer_tcp = tcp->tcp_loopback_peer;
830	conn_t *connp = tcp->tcp_connp;
831	uint_t sndbuf = connp->conn_sndbuf;
832	uint_t maxpsz = sndbuf;
833
834	ASSERT(tcp->tcp_fused);
835	ASSERT(peer_tcp != NULL);
836	ASSERT(peer_tcp->tcp_connp->conn_rcvbuf != 0);
837	/*
838	 * In the fused loopback case, we want the stream head to split
839	 * up larger writes into smaller chunks for a more accurate flow-
840	 * control accounting.  Our maxpsz is half of the sender's send
841	 * buffer or the receiver's receive buffer, whichever is smaller.
842	 * We round up the buffer to system page size due to the lack of
843	 * TCP MSS concept in Fusion.
844	 */
845	if (maxpsz > peer_tcp->tcp_connp->conn_rcvbuf)
846		maxpsz = peer_tcp->tcp_connp->conn_rcvbuf;
847	maxpsz = P2ROUNDUP_TYPED(maxpsz, PAGESIZE, uint_t) >> 1;
848
849	return (maxpsz);
850}
851
852/*
853 * Called to release flow control.
854 */
855void
856tcp_fuse_backenable(tcp_t *tcp)
857{
858	tcp_t *peer_tcp = tcp->tcp_loopback_peer;
859
860	ASSERT(tcp->tcp_fused);
861	ASSERT(peer_tcp != NULL && peer_tcp->tcp_fused);
862	ASSERT(peer_tcp->tcp_loopback_peer == tcp);
863	ASSERT(!TCP_IS_DETACHED(tcp));
864	ASSERT(tcp->tcp_connp->conn_sqp ==
865	    peer_tcp->tcp_connp->conn_sqp);
866
867	if (tcp->tcp_rcv_list != NULL)
868		(void) tcp_fuse_rcv_drain(tcp->tcp_connp->conn_rq, tcp, NULL);
869
870	mutex_enter(&peer_tcp->tcp_non_sq_lock);
871	if (peer_tcp->tcp_flow_stopped &&
872	    (TCP_UNSENT_BYTES(peer_tcp) <=
873	    peer_tcp->tcp_connp->conn_sndlowat)) {
874		tcp_clrqfull(peer_tcp);
875	}
876	mutex_exit(&peer_tcp->tcp_non_sq_lock);
877
878	TCP_STAT(tcp->tcp_tcps, tcp_fusion_backenabled);
879}
880