1/*	$NetBSD: svc_vc.c,v 1.7 2000/08/03 00:01:53 fvdl Exp $	*/
2
3/*-
4 * Copyright (c) 2009, Sun Microsystems, Inc.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 * - Redistributions of source code must retain the above copyright notice,
10 *   this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright notice,
12 *   this list of conditions and the following disclaimer in the documentation
13 *   and/or other materials provided with the distribution.
14 * - Neither the name of Sun Microsystems, Inc. nor the names of its
15 *   contributors may be used to endorse or promote products derived
16 *   from this software without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31#if defined(LIBC_SCCS) && !defined(lint)
32static char *sccsid2 = "@(#)svc_tcp.c 1.21 87/08/11 Copyr 1984 Sun Micro";
33static char *sccsid = "@(#)svc_tcp.c	2.2 88/08/01 4.0 RPCSRC";
34#endif
35#include <sys/cdefs.h>
36__FBSDID("$FreeBSD: releng/10.3/sys/rpc/svc_vc.c 287338 2015-09-01 01:01:35Z delphij $");
37
38/*
39 * svc_vc.c, Server side for Connection Oriented based RPC.
40 *
41 * Actually implements two flavors of transporter -
 * a tcp rendezvouser (a listener and connection establisher)
43 * and a record/tcp stream.
44 */
45
46#include <sys/param.h>
47#include <sys/lock.h>
48#include <sys/kernel.h>
49#include <sys/malloc.h>
50#include <sys/mbuf.h>
51#include <sys/mutex.h>
52#include <sys/proc.h>
53#include <sys/protosw.h>
54#include <sys/queue.h>
55#include <sys/socket.h>
56#include <sys/socketvar.h>
57#include <sys/sx.h>
58#include <sys/systm.h>
59#include <sys/uio.h>
60
61#include <net/vnet.h>
62
63#include <netinet/tcp.h>
64
65#include <rpc/rpc.h>
66
67#include <rpc/krpc.h>
68#include <rpc/rpc_com.h>
69
70#include <security/mac/mac_framework.h>
71
72static bool_t svc_vc_rendezvous_recv(SVCXPRT *, struct rpc_msg *,
73    struct sockaddr **, struct mbuf **);
74static enum xprt_stat svc_vc_rendezvous_stat(SVCXPRT *);
75static void svc_vc_rendezvous_destroy(SVCXPRT *);
76static bool_t svc_vc_null(void);
77static void svc_vc_destroy(SVCXPRT *);
78static enum xprt_stat svc_vc_stat(SVCXPRT *);
79static bool_t svc_vc_ack(SVCXPRT *, uint32_t *);
80static bool_t svc_vc_recv(SVCXPRT *, struct rpc_msg *,
81    struct sockaddr **, struct mbuf **);
82static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *,
83    struct sockaddr *, struct mbuf *, uint32_t *seq);
84static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in);
85static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq,
86    void *in);
87static void svc_vc_backchannel_destroy(SVCXPRT *);
88static enum xprt_stat svc_vc_backchannel_stat(SVCXPRT *);
89static bool_t svc_vc_backchannel_recv(SVCXPRT *, struct rpc_msg *,
90    struct sockaddr **, struct mbuf **);
91static bool_t svc_vc_backchannel_reply(SVCXPRT *, struct rpc_msg *,
92    struct sockaddr *, struct mbuf *, uint32_t *);
93static bool_t svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq,
94    void *in);
95static SVCXPRT *svc_vc_create_conn(SVCPOOL *pool, struct socket *so,
96    struct sockaddr *raddr);
97static int svc_vc_accept(struct socket *head, struct socket **sop);
98static int svc_vc_soupcall(struct socket *so, void *arg, int waitflag);
99
/* Method table for rendezvous (listening) transports. */
static struct xp_ops svc_vc_rendezvous_ops = {
	.xp_recv =	svc_vc_rendezvous_recv,
	.xp_stat =	svc_vc_rendezvous_stat,
	/* A listener never sends replies; cast the no-op stub to fit. */
	.xp_reply =	(bool_t (*)(SVCXPRT *, struct rpc_msg *,
		struct sockaddr *, struct mbuf *, uint32_t *))svc_vc_null,
	.xp_destroy =	svc_vc_rendezvous_destroy,
	.xp_control =	svc_vc_rendezvous_control
};
108
/* Method table for connected stream transports. */
static struct xp_ops svc_vc_ops = {
	.xp_recv =	svc_vc_recv,
	.xp_stat =	svc_vc_stat,
	.xp_ack =	svc_vc_ack,
	.xp_reply =	svc_vc_reply,
	.xp_destroy =	svc_vc_destroy,
	.xp_control =	svc_vc_control
};
117
/* Method table for backchannel transports riding on a clnt_vc socket. */
static struct xp_ops svc_vc_backchannel_ops = {
	.xp_recv =	svc_vc_backchannel_recv,
	.xp_stat =	svc_vc_backchannel_stat,
	.xp_reply =	svc_vc_backchannel_reply,
	.xp_destroy =	svc_vc_backchannel_destroy,
	.xp_control =	svc_vc_backchannel_control
};
125
126/*
127 * Usage:
128 *	xprt = svc_vc_create(sock, send_buf_size, recv_buf_size);
129 *
130 * Creates, registers, and returns a (rpc) tcp based transporter.
131 * Once *xprt is initialized, it is registered as a transporter
132 * see (svc.h, xprt_register).  This routine returns
133 * a NULL if a problem occurred.
134 *
 * The socket passed in is expected to refer to a bound, but
 * not yet connected socket.
137 *
138 * Since streams do buffered io similar to stdio, the caller can specify
139 * how big the send and receive buffers are via the second and third parms;
140 * 0 => use the system default.
141 */
142SVCXPRT *
143svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize,
144    size_t recvsize)
145{
146	SVCXPRT *xprt;
147	struct sockaddr* sa;
148	int error;
149
150	SOCK_LOCK(so);
151	if (so->so_state & (SS_ISCONNECTED|SS_ISDISCONNECTED)) {
152		SOCK_UNLOCK(so);
153		CURVNET_SET(so->so_vnet);
154		error = so->so_proto->pr_usrreqs->pru_peeraddr(so, &sa);
155		CURVNET_RESTORE();
156		if (error)
157			return (NULL);
158		xprt = svc_vc_create_conn(pool, so, sa);
159		free(sa, M_SONAME);
160		return (xprt);
161	}
162	SOCK_UNLOCK(so);
163
164	xprt = svc_xprt_alloc();
165	sx_init(&xprt->xp_lock, "xprt->xp_lock");
166	xprt->xp_pool = pool;
167	xprt->xp_socket = so;
168	xprt->xp_p1 = NULL;
169	xprt->xp_p2 = NULL;
170	xprt->xp_ops = &svc_vc_rendezvous_ops;
171
172	CURVNET_SET(so->so_vnet);
173	error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa);
174	CURVNET_RESTORE();
175	if (error) {
176		goto cleanup_svc_vc_create;
177	}
178
179	memcpy(&xprt->xp_ltaddr, sa, sa->sa_len);
180	free(sa, M_SONAME);
181
182	xprt_register(xprt);
183
184	solisten(so, -1, curthread);
185
186	SOCKBUF_LOCK(&so->so_rcv);
187	xprt->xp_upcallset = 1;
188	soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt);
189	SOCKBUF_UNLOCK(&so->so_rcv);
190
191	return (xprt);
192cleanup_svc_vc_create:
193	if (xprt) {
194		sx_destroy(&xprt->xp_lock);
195		svc_xprt_free(xprt);
196	}
197	return (NULL);
198}
199
200/*
 * Create a new transport for a socket obtained via soaccept().
202 */
SVCXPRT *
svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr)
{
	SVCXPRT *xprt = NULL;
	struct cf_conn *cd = NULL;
	struct sockaddr* sa = NULL;
	struct sockopt opt;
	int one = 1;
	int error;

	/* Enable keepalive so dead clients are eventually detected. */
	bzero(&opt, sizeof(struct sockopt));
	opt.sopt_dir = SOPT_SET;
	opt.sopt_level = SOL_SOCKET;
	opt.sopt_name = SO_KEEPALIVE;
	opt.sopt_val = &one;
	opt.sopt_valsize = sizeof(one);
	error = sosetopt(so, &opt);
	if (error) {
		return (NULL);
	}

	if (so->so_proto->pr_protocol == IPPROTO_TCP) {
		/* Disable Nagle: RPC replies should go out immediately. */
		bzero(&opt, sizeof(struct sockopt));
		opt.sopt_dir = SOPT_SET;
		opt.sopt_level = IPPROTO_TCP;
		opt.sopt_name = TCP_NODELAY;
		opt.sopt_val = &one;
		opt.sopt_valsize = sizeof(one);
		error = sosetopt(so, &opt);
		if (error) {
			return (NULL);
		}
	}

	cd = mem_alloc(sizeof(*cd));
	cd->strm_stat = XPRT_IDLE;

	xprt = svc_xprt_alloc();
	sx_init(&xprt->xp_lock, "xprt->xp_lock");
	xprt->xp_pool = pool;
	xprt->xp_socket = so;
	xprt->xp_p1 = cd;	/* per-connection record-parsing state */
	xprt->xp_p2 = NULL;
	xprt->xp_ops = &svc_vc_ops;

	/*
	 * See http://www.connectathon.org/talks96/nfstcp.pdf - client
	 * has a 5 minute timer, server has a 6 minute timer.
	 */
	xprt->xp_idletimeout = 6 * 60;

	memcpy(&xprt->xp_rtaddr, raddr, raddr->sa_len);

	CURVNET_SET(so->so_vnet);
	error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa);
	CURVNET_RESTORE();
	if (error)
		goto cleanup_svc_vc_create;

	memcpy(&xprt->xp_ltaddr, sa, sa->sa_len);
	free(sa, M_SONAME);

	xprt_register(xprt);

	/* Arm the receive upcall before checking for buffered data. */
	SOCKBUF_LOCK(&so->so_rcv);
	xprt->xp_upcallset = 1;
	soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt);
	SOCKBUF_UNLOCK(&so->so_rcv);

	/*
	 * Throw the transport into the active list in case it already
	 * has some data buffered.
	 */
	sx_xlock(&xprt->xp_lock);
	xprt_active(xprt);
	sx_xunlock(&xprt->xp_lock);

	return (xprt);
cleanup_svc_vc_create:
	if (xprt) {
		sx_destroy(&xprt->xp_lock);
		svc_xprt_free(xprt);
	}
	if (cd)
		mem_free(cd, sizeof(*cd));
	return (NULL);
}
290
291/*
292 * Create a new transport for a backchannel on a clnt_vc socket.
293 */
294SVCXPRT *
295svc_vc_create_backchannel(SVCPOOL *pool)
296{
297	SVCXPRT *xprt = NULL;
298	struct cf_conn *cd = NULL;
299
300	cd = mem_alloc(sizeof(*cd));
301	cd->strm_stat = XPRT_IDLE;
302
303	xprt = svc_xprt_alloc();
304	sx_init(&xprt->xp_lock, "xprt->xp_lock");
305	xprt->xp_pool = pool;
306	xprt->xp_socket = NULL;
307	xprt->xp_p1 = cd;
308	xprt->xp_p2 = NULL;
309	xprt->xp_ops = &svc_vc_backchannel_ops;
310	return (xprt);
311}
312
313/*
314 * This does all of the accept except the final call to soaccept. The
315 * caller will call soaccept after dropping its locks (soaccept may
316 * call malloc).
317 */
int
svc_vc_accept(struct socket *head, struct socket **sop)
{
	int error = 0;
	struct socket *so;

	/* The socket must actually be a listener. */
	if ((head->so_options & SO_ACCEPTCONN) == 0) {
		error = EINVAL;
		goto done;
	}
#ifdef MAC
	/* Give the MAC framework a chance to veto the accept. */
	error = mac_socket_check_accept(curthread->td_ucred, head);
	if (error != 0)
		goto done;
#endif
	ACCEPT_LOCK();
	if (TAILQ_EMPTY(&head->so_comp)) {
		ACCEPT_UNLOCK();
		/* No completed connection yet; caller retries on upcall. */
		error = EWOULDBLOCK;
		goto done;
	}
	so = TAILQ_FIRST(&head->so_comp);
	KASSERT(!(so->so_qstate & SQ_INCOMP), ("svc_vc_accept: so SQ_INCOMP"));
	KASSERT(so->so_qstate & SQ_COMP, ("svc_vc_accept: so not SQ_COMP"));

	/*
	 * Before changing the flags on the socket, we have to bump the
	 * reference count.  Otherwise, if the protocol calls sofree(),
	 * the socket will be released due to a zero refcount.
	 * XXX might not need soref() since this is simpler than kern_accept.
	 */
	SOCK_LOCK(so);			/* soref() and so_state update */
	soref(so);			/* file descriptor reference */

	/* Detach the connection from the listener's completed queue. */
	TAILQ_REMOVE(&head->so_comp, so, so_list);
	head->so_qlen--;
	so->so_state |= (head->so_state & SS_NBIO);
	so->so_qstate &= ~SQ_COMP;
	so->so_head = NULL;

	SOCK_UNLOCK(so);
	ACCEPT_UNLOCK();

	*sop = so;

	/* connection has been removed from the listen queue */
	KNOTE_UNLOCKED(&head->so_rcv.sb_sel.si_note, 0);
done:
	return (error);
}
368
369/*ARGSUSED*/
static bool_t
svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg,
    struct sockaddr **addrp, struct mbuf **mp)
{
	struct socket *so = NULL;
	struct sockaddr *sa = NULL;
	int error;
	SVCXPRT *new_xprt;

	/*
	 * The socket upcall calls xprt_active() which will eventually
	 * cause the server to call us here. We attempt to accept a
	 * connection from the socket and turn it into a new
	 * transport. If the accept fails, we have drained all pending
	 * connections so we call xprt_inactive().
	 */
	sx_xlock(&xprt->xp_lock);

	error = svc_vc_accept(xprt->xp_socket, &so);

	if (error == EWOULDBLOCK) {
		/*
		 * We must re-test for new connections after taking
		 * the lock to protect us in the case where a new
		 * connection arrives after our call to accept fails
		 * with EWOULDBLOCK.
		 */
		ACCEPT_LOCK();
		if (TAILQ_EMPTY(&xprt->xp_socket->so_comp))
			xprt_inactive_self(xprt);
		ACCEPT_UNLOCK();
		sx_xunlock(&xprt->xp_lock);
		return (FALSE);
	}

	if (error) {
		/* Hard accept error: disable upcalls and go inactive. */
		SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
		if (xprt->xp_upcallset) {
			xprt->xp_upcallset = 0;
			soupcall_clear(xprt->xp_socket, SO_RCV);
		}
		SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
		xprt_inactive_self(xprt);
		sx_xunlock(&xprt->xp_lock);
		return (FALSE);
	}

	sx_xunlock(&xprt->xp_lock);

	/* Finish the accept now that our lock is dropped. */
	sa = 0;
	error = soaccept(so, &sa);

	if (error) {
		/*
		 * XXX not sure if I need to call sofree or soclose here.
		 */
		if (sa)
			free(sa, M_SONAME);
		return (FALSE);
	}

	/*
	 * svc_vc_create_conn will call xprt_register - we don't need
	 * to do anything with the new connection except dereference it.
	 */
	new_xprt = svc_vc_create_conn(xprt->xp_pool, so, sa);
	if (!new_xprt) {
		soclose(so);
	} else {
		SVC_RELEASE(new_xprt);
	}

	free(sa, M_SONAME);

	return (FALSE); /* there is never an rpc msg to be processed */
}
446
447/*ARGSUSED*/
static enum xprt_stat
svc_vc_rendezvous_stat(SVCXPRT *xprt)
{

	/* A rendezvous transport never has a request in progress. */
	return (XPRT_IDLE);
}
454
/* Teardown shared by rendezvous and connection transports. */
static void
svc_vc_destroy_common(SVCXPRT *xprt)
{
	/* Remove the receive upcall before closing the socket. */
	SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
	if (xprt->xp_upcallset) {
		xprt->xp_upcallset = 0;
		soupcall_clear(xprt->xp_socket, SO_RCV);
	}
	SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);

	/*
	 * NOTE(review): xp_socket is dereferenced unconditionally above,
	 * so this NULL check looks redundant — both callers pass
	 * transports that own a socket.  Confirm before relying on a
	 * NULL xp_socket ever reaching this function.
	 */
	if (xprt->xp_socket)
		(void)soclose(xprt->xp_socket);

	if (xprt->xp_netid)
		(void) mem_free(xprt->xp_netid, strlen(xprt->xp_netid) + 1);
	svc_xprt_free(xprt);
}
472
static void
svc_vc_rendezvous_destroy(SVCXPRT *xprt)
{

	/* A rendezvous transport has no cf_conn state to release. */
	svc_vc_destroy_common(xprt);
}
479
static void
svc_vc_destroy(SVCXPRT *xprt)
{
	/* Save cd first: svc_vc_destroy_common() frees the SVCXPRT. */
	struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1;

	svc_vc_destroy_common(xprt);

	/* Release any complete or partially parsed record data. */
	if (cd->mreq)
		m_freem(cd->mreq);
	if (cd->mpending)
		m_freem(cd->mpending);
	mem_free(cd, sizeof(*cd));
}
493
static void
svc_vc_backchannel_destroy(SVCXPRT *xprt)
{
	/* Save cd first: svc_xprt_free() releases the SVCXPRT. */
	struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1;
	struct mbuf *m, *m2;

	svc_xprt_free(xprt);
	/* Drain queued backchannel requests, chained via m_nextpkt. */
	m = cd->mreq;
	while (m != NULL) {
		m2 = m;
		m = m->m_nextpkt;
		m_freem(m2);
	}
	mem_free(cd, sizeof(*cd));
}
509
510/*ARGSUSED*/
511static bool_t
512svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in)
513{
514	return (FALSE);
515}
516
517static bool_t
518svc_vc_rendezvous_control(SVCXPRT *xprt, const u_int rq, void *in)
519{
520
521	return (FALSE);
522}
523
524static bool_t
525svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq, void *in)
526{
527
528	return (FALSE);
529}
530
static enum xprt_stat
svc_vc_stat(SVCXPRT *xprt)
{
	struct cf_conn *cd;

	cd = (struct cf_conn *)(xprt->xp_p1);

	if (cd->strm_stat == XPRT_DIED)
		return (XPRT_DIED);

	/* A complete record has been parsed and awaits processing. */
	if (cd->mreq != NULL && cd->resid == 0 && cd->eor)
		return (XPRT_MOREREQS);

	/* Unread socket data may contain further requests. */
	if (soreadable(xprt->xp_socket))
		return (XPRT_MOREREQS);

	return (XPRT_IDLE);
}
549
static bool_t
svc_vc_ack(SVCXPRT *xprt, uint32_t *ack)
{

	/*
	 * Report reply bytes actually sent: bytes counted by
	 * svc_vc_reply() after a successful sosend() (xp_snt_cnt),
	 * minus whatever is still queued in the send buffer.
	 */
	*ack = atomic_load_acq_32(&xprt->xp_snt_cnt);
	*ack -= xprt->xp_socket->so_snd.sb_cc;
	return (TRUE);
}
558
559static enum xprt_stat
560svc_vc_backchannel_stat(SVCXPRT *xprt)
561{
562	struct cf_conn *cd;
563
564	cd = (struct cf_conn *)(xprt->xp_p1);
565
566	if (cd->mreq != NULL)
567		return (XPRT_MOREREQS);
568
569	return (XPRT_IDLE);
570}
571
572/*
573 * If we have an mbuf chain in cd->mpending, try to parse a record from it,
574 * leaving the result in cd->mreq. If we don't have a complete record, leave
575 * the partial result in cd->mreq and try to read more from the socket.
576 */
static int
svc_vc_process_pending(SVCXPRT *xprt)
{
	struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
	struct socket *so = xprt->xp_socket;
	struct mbuf *m;

	/*
	 * If cd->resid is non-zero, we have part of the
	 * record already, otherwise we are expecting a record
	 * marker.
	 */
	if (!cd->resid && cd->mpending) {
		/*
		 * See if there is enough data buffered to
		 * make up a record marker. Make sure we can
		 * handle the case where the record marker is
		 * split across more than one mbuf.
		 */
		size_t n = 0;
		uint32_t header;

		m = cd->mpending;
		while (n < sizeof(uint32_t) && m) {
			n += m->m_len;
			m = m->m_next;
		}
		if (n < sizeof(uint32_t)) {
			/* Ask for at least the rest of the marker. */
			so->so_rcv.sb_lowat = sizeof(uint32_t) - n;
			return (FALSE);
		}
		m_copydata(cd->mpending, 0, sizeof(header),
		    (char *)&header);
		header = ntohl(header);
		/* RPC record marking: high bit = last fragment (eor). */
		cd->eor = (header & 0x80000000) != 0;
		cd->resid = header & 0x7fffffff;
		m_adj(cd->mpending, sizeof(uint32_t));
	}

	/*
	 * Start pulling off mbufs from cd->mpending
	 * until we either have a complete record or
	 * we run out of data. We use m_split to pull
	 * data - it will pull as much as possible and
	 * split the last mbuf if necessary.
	 */
	while (cd->mpending && cd->resid) {
		m = cd->mpending;
		if (cd->mpending->m_next
		    || cd->mpending->m_len > cd->resid)
			cd->mpending = m_split(cd->mpending,
			    cd->resid, M_WAITOK);
		else
			cd->mpending = NULL;
		if (cd->mreq)
			m_last(cd->mreq)->m_next = m;
		else
			cd->mreq = m;
		/* Account for the bytes just moved onto cd->mreq. */
		while (m) {
			cd->resid -= m->m_len;
			m = m->m_next;
		}
	}

	/*
	 * Block receive upcalls if we have more data pending,
	 * otherwise report our need.
	 */
	if (cd->mpending)
		so->so_rcv.sb_lowat = INT_MAX;
	else
		so->so_rcv.sb_lowat =
		    imax(1, imin(cd->resid, so->so_rcv.sb_hiwat / 2));
	return (TRUE);
}
652
/*
 * Receive one RPC call from a connected stream: parse records from
 * cd->mpending, hand backchannel REPLY messages to the client side,
 * and otherwise decode the call header and return its arguments.
 */
static bool_t
svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg,
    struct sockaddr **addrp, struct mbuf **mp)
{
	struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
	struct uio uio;
	struct mbuf *m;
	struct socket* so = xprt->xp_socket;
	XDR xdrs;
	int error, rcvflag;
	uint32_t xid_plus_direction[2];

	/*
	 * Serialise access to the socket and our own record parsing
	 * state.
	 */
	sx_xlock(&xprt->xp_lock);

	for (;;) {
		/* If we have no request ready, check pending queue. */
		while (cd->mpending &&
		    (cd->mreq == NULL || cd->resid != 0 || !cd->eor)) {
			if (!svc_vc_process_pending(xprt))
				break;
		}

		/* Process and return complete request in cd->mreq. */
		if (cd->mreq != NULL && cd->resid == 0 && cd->eor) {

			/*
			 * Now, check for a backchannel reply.
			 * The XID is in the first uint32_t of the reply
			 * and the message direction is the second one.
			 */
			if ((cd->mreq->m_len >= sizeof(xid_plus_direction) ||
			    m_length(cd->mreq, NULL) >=
			    sizeof(xid_plus_direction)) &&
			    xprt->xp_p2 != NULL) {
				m_copydata(cd->mreq, 0,
				    sizeof(xid_plus_direction),
				    (char *)xid_plus_direction);
				xid_plus_direction[0] =
				    ntohl(xid_plus_direction[0]);
				xid_plus_direction[1] =
				    ntohl(xid_plus_direction[1]);
				/* Check message direction. */
				if (xid_plus_direction[1] == REPLY) {
					/* Hand the record to the client. */
					clnt_bck_svccall(xprt->xp_p2,
					    cd->mreq,
					    xid_plus_direction[0]);
					cd->mreq = NULL;
					continue;
				}
			}

			xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE);
			cd->mreq = NULL;

			/* Check for next request in a pending queue. */
			svc_vc_process_pending(xprt);
			if (cd->mreq == NULL || cd->resid != 0) {
				SOCKBUF_LOCK(&so->so_rcv);
				if (!soreadable(so))
					xprt_inactive_self(xprt);
				SOCKBUF_UNLOCK(&so->so_rcv);
			}

			sx_xunlock(&xprt->xp_lock);

			/* Decode the call header; fail if it is garbled. */
			if (! xdr_callmsg(&xdrs, msg)) {
				XDR_DESTROY(&xdrs);
				return (FALSE);
			}

			*addrp = NULL;
			*mp = xdrmbuf_getall(&xdrs);
			XDR_DESTROY(&xdrs);

			return (TRUE);
		}

		/*
		 * The socket upcall calls xprt_active() which will eventually
		 * cause the server to call us here. We attempt to
		 * read as much as possible from the socket and put
		 * the result in cd->mpending. If the read fails,
		 * we have drained both cd->mpending and the socket so
		 * we can call xprt_inactive().
		 */
		uio.uio_resid = 1000000000;
		uio.uio_td = curthread;
		m = NULL;
		rcvflag = MSG_DONTWAIT;
		error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);

		if (error == EWOULDBLOCK) {
			/*
			 * We must re-test for readability after
			 * taking the lock to protect us in the case
			 * where a new packet arrives on the socket
			 * after our call to soreceive fails with
			 * EWOULDBLOCK.
			 */
			SOCKBUF_LOCK(&so->so_rcv);
			if (!soreadable(so))
				xprt_inactive_self(xprt);
			SOCKBUF_UNLOCK(&so->so_rcv);
			sx_xunlock(&xprt->xp_lock);
			return (FALSE);
		}

		if (error) {
			/* Fatal socket error: mark the stream dead. */
			SOCKBUF_LOCK(&so->so_rcv);
			if (xprt->xp_upcallset) {
				xprt->xp_upcallset = 0;
				soupcall_clear(so, SO_RCV);
			}
			SOCKBUF_UNLOCK(&so->so_rcv);
			xprt_inactive_self(xprt);
			cd->strm_stat = XPRT_DIED;
			sx_xunlock(&xprt->xp_lock);
			return (FALSE);
		}

		if (!m) {
			/*
			 * EOF - the other end has closed the socket.
			 */
			xprt_inactive_self(xprt);
			cd->strm_stat = XPRT_DIED;
			sx_xunlock(&xprt->xp_lock);
			return (FALSE);
		}

		/* Append the new data and loop to reparse. */
		if (cd->mpending)
			m_last(cd->mpending)->m_next = m;
		else
			cd->mpending = m;
	}
}
793
/* Dequeue and decode one backchannel request from cd->mreq. */
static bool_t
svc_vc_backchannel_recv(SVCXPRT *xprt, struct rpc_msg *msg,
    struct sockaddr **addrp, struct mbuf **mp)
{
	struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
	struct ct_data *ct;
	struct mbuf *m;
	XDR xdrs;

	sx_xlock(&xprt->xp_lock);
	ct = (struct ct_data *)xprt->xp_p2;
	if (ct == NULL) {
		/* The client side of the connection has gone away. */
		sx_xunlock(&xprt->xp_lock);
		return (FALSE);
	}
	/*
	 * ct_lock guards the cd->mreq queue (presumably filled by the
	 * client-side code; the m_nextpkt chaining matches
	 * svc_vc_backchannel_destroy() — confirm against clnt_vc).
	 */
	mtx_lock(&ct->ct_lock);
	m = cd->mreq;
	if (m == NULL) {
		/* Nothing queued; go idle until more work arrives. */
		xprt_inactive_self(xprt);
		mtx_unlock(&ct->ct_lock);
		sx_xunlock(&xprt->xp_lock);
		return (FALSE);
	}
	/* Unlink the first request from the chain. */
	cd->mreq = m->m_nextpkt;
	mtx_unlock(&ct->ct_lock);
	sx_xunlock(&xprt->xp_lock);

	xdrmbuf_create(&xdrs, m, XDR_DECODE);
	if (! xdr_callmsg(&xdrs, msg)) {
		XDR_DESTROY(&xdrs);
		return (FALSE);
	}
	*addrp = NULL;
	*mp = xdrmbuf_getall(&xdrs);
	XDR_DESTROY(&xdrs);
	return (TRUE);
}
831
/* Encode a reply, prepend the record mark, and send it on the socket. */
static bool_t
svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg,
    struct sockaddr *addr, struct mbuf *m, uint32_t *seq)
{
	XDR xdrs;
	struct mbuf *mrep;
	bool_t stat = TRUE;
	int error, len;

	/*
	 * Leave space for record mark.
	 */
	mrep = m_gethdr(M_WAITOK, MT_DATA);
	mrep->m_data += sizeof(uint32_t);

	xdrmbuf_create(&xdrs, mrep, XDR_ENCODE);

	if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
	    msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
		/* Successful call: encode header, then append results. */
		if (!xdr_replymsg(&xdrs, msg))
			stat = FALSE;
		else
			xdrmbuf_append(&xdrs, m);
	} else {
		/* Error reply: the reply header carries everything. */
		stat = xdr_replymsg(&xdrs, msg);
	}

	if (stat) {
		m_fixhdr(mrep);

		/*
		 * Prepend a record marker containing the reply length.
		 */
		M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK);
		len = mrep->m_pkthdr.len;
		*mtod(mrep, uint32_t *) =
			htonl(0x80000000 | (len - sizeof(uint32_t)));
		/* Count bytes handed to the socket; see svc_vc_ack(). */
		atomic_add_acq_32(&xprt->xp_snd_cnt, len);
		error = sosend(xprt->xp_socket, NULL, NULL, mrep, NULL,
		    0, curthread);
		if (!error) {
			atomic_add_rel_32(&xprt->xp_snt_cnt, len);
			if (seq)
				*seq = xprt->xp_snd_cnt;
			stat = TRUE;
		} else
			/* Send failed: back the count out again. */
			atomic_subtract_32(&xprt->xp_snd_cnt, len);
	} else {
		m_freem(mrep);
	}

	XDR_DESTROY(&xdrs);

	return (stat);
}
887
/* Encode a backchannel reply and send it via the client's socket. */
static bool_t
svc_vc_backchannel_reply(SVCXPRT *xprt, struct rpc_msg *msg,
    struct sockaddr *addr, struct mbuf *m, uint32_t *seq)
{
	struct ct_data *ct;
	XDR xdrs;
	struct mbuf *mrep;
	bool_t stat = TRUE;
	int error;

	/*
	 * Leave space for record mark.
	 */
	mrep = m_gethdr(M_WAITOK, MT_DATA);
	mrep->m_data += sizeof(uint32_t);

	xdrmbuf_create(&xdrs, mrep, XDR_ENCODE);

	if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
	    msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
		/* Successful call: encode header, then append results. */
		if (!xdr_replymsg(&xdrs, msg))
			stat = FALSE;
		else
			xdrmbuf_append(&xdrs, m);
	} else {
		/* Error reply: the reply header carries everything. */
		stat = xdr_replymsg(&xdrs, msg);
	}

	if (stat) {
		m_fixhdr(mrep);

		/*
		 * Prepend a record marker containing the reply length.
		 */
		M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK);
		*mtod(mrep, uint32_t *) =
			htonl(0x80000000 | (mrep->m_pkthdr.len
				- sizeof(uint32_t)));
		/* Send via the owning client connection, if still present. */
		sx_xlock(&xprt->xp_lock);
		ct = (struct ct_data *)xprt->xp_p2;
		if (ct != NULL)
			error = sosend(ct->ct_socket, NULL, NULL, mrep, NULL,
			    0, curthread);
		else
			error = EPIPE;
		sx_xunlock(&xprt->xp_lock);
		if (!error) {
			stat = TRUE;
		}
	} else {
		m_freem(mrep);
	}

	XDR_DESTROY(&xdrs);

	return (stat);
}
945
946static bool_t
947svc_vc_null()
948{
949
950	return (FALSE);
951}
952
953static int
954svc_vc_soupcall(struct socket *so, void *arg, int waitflag)
955{
956	SVCXPRT *xprt = (SVCXPRT *) arg;
957
958	if (soreadable(xprt->xp_socket))
959		xprt_active(xprt);
960	return (SU_OK);
961}
962
#if 0
/*
 * Get the effective UID of the sending process. Used by rpcbind, keyserv
 * and rpc.yppasswdd on AF_LOCAL.
 *
 * NOTE(review): preprocessor-disabled userland code kept for reference.
 * It appears to rely on userland-only fields (xp_fd, getpeereid) —
 * confirm before attempting to enable it in the kernel build.
 */
int
__rpc_get_local_uid(SVCXPRT *transp, uid_t *uid) {
	int sock, ret;
	gid_t egid;
	uid_t euid;
	struct sockaddr *sa;

	sock = transp->xp_fd;
	sa = (struct sockaddr *)transp->xp_rtaddr;
	if (sa->sa_family == AF_LOCAL) {
		ret = getpeereid(sock, &euid, &egid);
		if (ret == 0)
			*uid = euid;
		return (ret);
	} else
		return (-1);
}
#endif
986