clnt_vc.c revision 177633
1/*	$NetBSD: clnt_vc.c,v 1.4 2000/07/14 08:40:42 fvdl Exp $	*/
2
3/*
4 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
5 * unrestricted use provided that this legend is included on all tape
6 * media and as a part of the software program in whole or part.  Users
7 * may copy or modify Sun RPC without charge, but are not authorized
8 * to license or distribute it to anyone else except as part of a product or
9 * program developed by the user.
10 *
11 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
12 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
13 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
14 *
15 * Sun RPC is provided with no support and without any obligation on the
16 * part of Sun Microsystems, Inc. to assist in its use, correction,
17 * modification or enhancement.
18 *
19 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
20 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
21 * OR ANY PART THEREOF.
22 *
23 * In no event will Sun Microsystems, Inc. be liable for any lost revenue
24 * or profits or other special, indirect and consequential damages, even if
25 * Sun has been advised of the possibility of such damages.
26 *
27 * Sun Microsystems, Inc.
28 * 2550 Garcia Avenue
29 * Mountain View, California  94043
30 */
31
32#if defined(LIBC_SCCS) && !defined(lint)
33static char *sccsid2 = "@(#)clnt_tcp.c 1.37 87/10/05 Copyr 1984 Sun Micro";
34static char *sccsid = "@(#)clnt_tcp.c	2.2 88/08/01 4.0 RPCSRC";
35static char sccsid3[] = "@(#)clnt_vc.c 1.19 89/03/16 Copyr 1988 Sun Micro";
36#endif
37#include <sys/cdefs.h>
38__FBSDID("$FreeBSD: head/sys/rpc/clnt_vc.c 177633 2008-03-26 15:23:12Z dfr $");
39
40/*
41 * clnt_tcp.c, Implements a TCP/IP based, client side RPC.
42 *
43 * Copyright (C) 1984, Sun Microsystems, Inc.
44 *
45 * TCP based RPC supports 'batched calls'.
46 * A sequence of calls may be batched-up in a send buffer.  The rpc call
47 * return immediately to the client even though the call was not necessarily
48 * sent.  The batching occurs if the results' xdr routine is NULL (0) AND
49 * the rpc timeout value is zero (see clnt.h, rpc).
50 *
51 * Clients should NOT casually batch calls that in fact return results; that is,
52 * the server side should be aware that a call is batched and not produce any
53 * return message.  Batched calls that produce many result messages can
54 * deadlock (netlock) the client and the server....
55 *
56 * Now go hang yourself.
57 */
58
59#include <sys/param.h>
60#include <sys/systm.h>
61#include <sys/lock.h>
62#include <sys/malloc.h>
63#include <sys/mbuf.h>
64#include <sys/mutex.h>
65#include <sys/pcpu.h>
66#include <sys/proc.h>
67#include <sys/socket.h>
68#include <sys/socketvar.h>
69#include <sys/syslog.h>
70#include <sys/time.h>
71#include <sys/uio.h>
72
73#include <rpc/rpc.h>
74#include "rpc_com.h"
75
76#define MCALL_MSG_SIZE 24
77
78struct cmessage {
79        struct cmsghdr cmsg;
80        struct cmsgcred cmcred;
81};
82
83static enum clnt_stat clnt_vc_call(CLIENT *, rpcproc_t, xdrproc_t, void *,
84    xdrproc_t, void *, struct timeval);
85static void clnt_vc_geterr(CLIENT *, struct rpc_err *);
86static bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *);
87static void clnt_vc_abort(CLIENT *);
88static bool_t clnt_vc_control(CLIENT *, u_int, void *);
89static void clnt_vc_destroy(CLIENT *);
90static bool_t time_not_ok(struct timeval *);
91static void clnt_vc_soupcall(struct socket *so, void *arg, int waitflag);
92
93static struct clnt_ops clnt_vc_ops = {
94	.cl_call =	clnt_vc_call,
95	.cl_abort =	clnt_vc_abort,
96	.cl_geterr =	clnt_vc_geterr,
97	.cl_freeres =	clnt_vc_freeres,
98	.cl_destroy =	clnt_vc_destroy,
99	.cl_control =	clnt_vc_control
100};
101
102/*
103 * A pending RPC request which awaits a reply.
104 */
105struct ct_request {
106	TAILQ_ENTRY(ct_request) cr_link;
107	uint32_t		cr_xid;		/* XID of request */
108	struct mbuf		*cr_mrep;	/* reply received by upcall */
109	int			cr_error;	/* any error from upcall */
110};
111
112TAILQ_HEAD(ct_request_list, ct_request);
113
114struct ct_data {
115	struct mtx	ct_lock;
116	struct socket	*ct_socket;	/* connection socket */
117	bool_t		ct_closeit;	/* close it on destroy */
118	struct timeval	ct_wait;	/* wait interval in milliseconds */
119	struct sockaddr_storage	ct_addr; /* remote addr */
120	struct rpc_err	ct_error;
121	uint32_t	ct_xid;
122	char		ct_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
123	size_t		ct_mpos;	/* pos after marshal */
124	const char	*ct_waitchan;
125	int		ct_waitflag;
126	struct mbuf	*ct_record;	/* current reply record */
127	size_t		ct_record_resid; /* how much left of reply to read */
128	bool_t		ct_record_eor;	 /* true if reading last fragment */
129	struct ct_request_list ct_pending;
130};
131
/*
 * Diagnostic message strings.  None of them is referenced in the
 * visible part of this file.
 */
static const char	clnt_vc_errstr[]   = "%s : %s";
static const char	clnt_vc_str[]      = "clnt_vc_create";
static const char	clnt_read_vc_str[] = "read_vc";
static const char	__no_mem_str[]     = "out of memory";
136
137/*
138 * Create a client handle for a connection.
139 * Default options are set, which the user can change using clnt_control()'s.
140 * The rpc/vc package does buffering similar to stdio, so the client
141 * must pick send and receive buffer sizes, 0 => use the default.
142 * NB: fd is copied into a private area.
143 * NB: The rpch->cl_auth is set null authentication. Caller may wish to
144 * set this something more useful.
145 *
146 * fd should be an open socket
147 */
148CLIENT *
149clnt_vc_create(
150	struct socket *so,		/* open file descriptor */
151	struct sockaddr *raddr,		/* servers address */
152	const rpcprog_t prog,		/* program number */
153	const rpcvers_t vers,		/* version number */
154	size_t sendsz,			/* buffer recv size */
155	size_t recvsz)			/* buffer send size */
156{
157	CLIENT *cl;			/* client handle */
158	struct ct_data *ct = NULL;	/* client handle */
159	struct timeval now;
160	struct rpc_msg call_msg;
161	static uint32_t disrupt;
162	struct __rpc_sockinfo si;
163	XDR xdrs;
164	int error;
165
166	if (disrupt == 0)
167		disrupt = (uint32_t)(long)raddr;
168
169	cl = (CLIENT *)mem_alloc(sizeof (*cl));
170	ct = (struct ct_data *)mem_alloc(sizeof (*ct));
171
172	mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF);
173
174	if ((so->so_state & (SS_ISCONNECTED|SS_ISCONFIRMING)) == 0) {
175		error = soconnect(so, raddr, curthread);
176		if (error) {
177			rpc_createerr.cf_stat = RPC_SYSTEMERROR;
178			rpc_createerr.cf_error.re_errno = error;
179			goto err;
180		}
181	}
182
183	if (!__rpc_socket2sockinfo(so, &si))
184		goto err;
185
186	ct->ct_closeit = FALSE;
187
188	/*
189	 * Set up private data struct
190	 */
191	ct->ct_socket = so;
192	ct->ct_wait.tv_sec = -1;
193	ct->ct_wait.tv_usec = -1;
194	memcpy(&ct->ct_addr, raddr, raddr->sa_len);
195
196	/*
197	 * Initialize call message
198	 */
199	getmicrotime(&now);
200	ct->ct_xid = ((uint32_t)++disrupt) ^ __RPC_GETXID(&now);
201	call_msg.rm_xid = ct->ct_xid;
202	call_msg.rm_direction = CALL;
203	call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
204	call_msg.rm_call.cb_prog = (uint32_t)prog;
205	call_msg.rm_call.cb_vers = (uint32_t)vers;
206
207	/*
208	 * pre-serialize the static part of the call msg and stash it away
209	 */
210	xdrmem_create(&xdrs, ct->ct_mcallc, MCALL_MSG_SIZE,
211	    XDR_ENCODE);
212	if (! xdr_callhdr(&xdrs, &call_msg)) {
213		if (ct->ct_closeit) {
214			soclose(ct->ct_socket);
215		}
216		goto err;
217	}
218	ct->ct_mpos = XDR_GETPOS(&xdrs);
219	XDR_DESTROY(&xdrs);
220	ct->ct_waitchan = "rpcrecv";
221	ct->ct_waitflag = 0;
222
223	/*
224	 * Create a client handle which uses xdrrec for serialization
225	 * and authnone for authentication.
226	 */
227	cl->cl_ops = &clnt_vc_ops;
228	cl->cl_private = ct;
229	cl->cl_auth = authnone_create();
230	sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
231	recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
232
233	SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
234	ct->ct_socket->so_upcallarg = ct;
235	ct->ct_socket->so_upcall = clnt_vc_soupcall;
236	ct->ct_socket->so_rcv.sb_flags |= SB_UPCALL;
237	SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
238
239	ct->ct_record = NULL;
240	ct->ct_record_resid = 0;
241	TAILQ_INIT(&ct->ct_pending);
242	return (cl);
243
244err:
245	if (cl) {
246		if (ct) {
247			mem_free(ct, sizeof (struct ct_data));
248		}
249		if (cl)
250			mem_free(cl, sizeof (CLIENT));
251	}
252	return ((CLIENT *)NULL);
253}
254
255static enum clnt_stat
256clnt_vc_call(
257	CLIENT *cl,
258	rpcproc_t proc,
259	xdrproc_t xdr_args,
260	void *args_ptr,
261	xdrproc_t xdr_results,
262	void *results_ptr,
263	struct timeval utimeout)
264{
265	struct ct_data *ct = (struct ct_data *) cl->cl_private;
266	XDR xdrs;
267	struct rpc_msg reply_msg;
268	bool_t ok;
269	int nrefreshes = 2;		/* number of times to refresh cred */
270	struct timeval timeout;
271	uint32_t xid;
272	struct mbuf *mreq = NULL;
273	struct ct_request cr;
274	int error;
275
276	mtx_lock(&ct->ct_lock);
277
278	cr.cr_mrep = NULL;
279	cr.cr_error = 0;
280
281	if (ct->ct_wait.tv_usec == -1) {
282		timeout = utimeout;	/* use supplied timeout */
283	} else {
284		timeout = ct->ct_wait;	/* use default timeout */
285	}
286
287call_again:
288	mtx_assert(&ct->ct_lock, MA_OWNED);
289
290	ct->ct_xid++;
291	xid = ct->ct_xid;
292
293	mtx_unlock(&ct->ct_lock);
294
295	/*
296	 * Leave space to pre-pend the record mark.
297	 */
298	MGETHDR(mreq, M_WAIT, MT_DATA);
299	MCLGET(mreq, M_WAIT);
300	mreq->m_len = 0;
301	mreq->m_data += sizeof(uint32_t);
302	m_append(mreq, ct->ct_mpos, ct->ct_mcallc);
303
304	/*
305	 * The XID is the first thing in the request.
306	 */
307	*mtod(mreq, uint32_t *) = htonl(xid);
308
309	xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
310
311	ct->ct_error.re_status = RPC_SUCCESS;
312
313	if ((! XDR_PUTINT32(&xdrs, &proc)) ||
314	    (! AUTH_MARSHALL(cl->cl_auth, &xdrs)) ||
315	    (! (*xdr_args)(&xdrs, args_ptr))) {
316		if (ct->ct_error.re_status == RPC_SUCCESS)
317			ct->ct_error.re_status = RPC_CANTENCODEARGS;
318		m_freem(mreq);
319		return (ct->ct_error.re_status);
320	}
321	m_fixhdr(mreq);
322
323	/*
324	 * Prepend a record marker containing the packet length.
325	 */
326	M_PREPEND(mreq, sizeof(uint32_t), M_WAIT);
327	*mtod(mreq, uint32_t *) =
328		htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t)));
329
330	cr.cr_xid = xid;
331	mtx_lock(&ct->ct_lock);
332	TAILQ_INSERT_TAIL(&ct->ct_pending, &cr, cr_link);
333	mtx_unlock(&ct->ct_lock);
334
335	/*
336	 * sosend consumes mreq.
337	 */
338	error = sosend(ct->ct_socket, NULL, NULL, mreq, NULL, 0, curthread);
339	mreq = NULL;
340
341	reply_msg.acpted_rply.ar_verf = _null_auth;
342	reply_msg.acpted_rply.ar_results.where = results_ptr;
343	reply_msg.acpted_rply.ar_results.proc = xdr_results;
344
345	mtx_lock(&ct->ct_lock);
346
347	if (error) {
348		TAILQ_REMOVE(&ct->ct_pending, &cr, cr_link);
349
350		ct->ct_error.re_errno = error;
351		ct->ct_error.re_status = RPC_CANTSEND;
352		goto out;
353	}
354
355	/*
356	 * Check to see if we got an upcall while waiting for the
357	 * lock. In both these cases, the request has been removed
358	 * from ct->ct_pending.
359	 */
360	if (cr.cr_error) {
361		ct->ct_error.re_errno = cr.cr_error;
362		ct->ct_error.re_status = RPC_CANTRECV;
363		goto out;
364	}
365	if (cr.cr_mrep) {
366		goto got_reply;
367	}
368
369	/*
370	 * Hack to provide rpc-based message passing
371	 */
372	if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
373		if (cr.cr_xid)
374			TAILQ_REMOVE(&ct->ct_pending, &cr, cr_link);
375		ct->ct_error.re_status = RPC_TIMEDOUT;
376		goto out;
377	}
378
379	error = msleep(&cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan,
380	    tvtohz(&timeout));
381
382	if (error) {
383		/*
384		 * The sleep returned an error so our request is still
385		 * on the list. Turn the error code into an
386		 * appropriate client status.
387		 */
388		if (cr.cr_xid)
389			TAILQ_REMOVE(&ct->ct_pending, &cr, cr_link);
390		ct->ct_error.re_errno = error;
391		switch (error) {
392		case EINTR:
393			ct->ct_error.re_status = RPC_INTR;
394			break;
395		case EWOULDBLOCK:
396			ct->ct_error.re_status = RPC_TIMEDOUT;
397			break;
398		default:
399			ct->ct_error.re_status = RPC_CANTRECV;
400		}
401		goto out;
402	} else {
403		/*
404		 * We were woken up by the upcall.  If the
405		 * upcall had a receive error, report that,
406		 * otherwise we have a reply.
407		 */
408		if (cr.cr_error) {
409			ct->ct_error.re_errno = cr.cr_error;
410			ct->ct_error.re_status = RPC_CANTRECV;
411			goto out;
412		}
413	}
414
415got_reply:
416	/*
417	 * Now decode and validate the response. We need to drop the
418	 * lock since xdr_replymsg may end up sleeping in malloc.
419	 */
420	mtx_unlock(&ct->ct_lock);
421
422	xdrmbuf_create(&xdrs, cr.cr_mrep, XDR_DECODE);
423	ok = xdr_replymsg(&xdrs, &reply_msg);
424	XDR_DESTROY(&xdrs);
425	cr.cr_mrep = NULL;
426
427	mtx_lock(&ct->ct_lock);
428
429	if (ok) {
430		if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
431			(reply_msg.acpted_rply.ar_stat == SUCCESS))
432			ct->ct_error.re_status = RPC_SUCCESS;
433		else
434			_seterr_reply(&reply_msg, &(ct->ct_error));
435
436		if (ct->ct_error.re_status == RPC_SUCCESS) {
437			if (! AUTH_VALIDATE(cl->cl_auth,
438					    &reply_msg.acpted_rply.ar_verf)) {
439				ct->ct_error.re_status = RPC_AUTHERROR;
440				ct->ct_error.re_why = AUTH_INVALIDRESP;
441			}
442			if (reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
443				xdrs.x_op = XDR_FREE;
444				(void) xdr_opaque_auth(&xdrs,
445					&(reply_msg.acpted_rply.ar_verf));
446			}
447		}		/* end successful completion */
448		/*
449		 * If unsuccesful AND error is an authentication error
450		 * then refresh credentials and try again, else break
451		 */
452		else if (ct->ct_error.re_status == RPC_AUTHERROR)
453			/* maybe our credentials need to be refreshed ... */
454			if (nrefreshes > 0 &&
455			    AUTH_REFRESH(cl->cl_auth, &reply_msg)) {
456				nrefreshes--;
457				goto call_again;
458			}
459		/* end of unsuccessful completion */
460	}	/* end of valid reply message */
461	else {
462		ct->ct_error.re_status = RPC_CANTDECODERES;
463	}
464out:
465	mtx_assert(&ct->ct_lock, MA_OWNED);
466
467	if (mreq)
468		m_freem(mreq);
469	if (cr.cr_mrep)
470		m_freem(cr.cr_mrep);
471
472	mtx_unlock(&ct->ct_lock);
473	return (ct->ct_error.re_status);
474}
475
476static void
477clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
478{
479	struct ct_data *ct = (struct ct_data *) cl->cl_private;
480
481	*errp = ct->ct_error;
482}
483
484static bool_t
485clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
486{
487	XDR xdrs;
488	bool_t dummy;
489
490	xdrs.x_op = XDR_FREE;
491	dummy = (*xdr_res)(&xdrs, res_ptr);
492
493	return (dummy);
494}
495
496/*ARGSUSED*/
497static void
498clnt_vc_abort(CLIENT *cl)
499{
500}
501
502static bool_t
503clnt_vc_control(CLIENT *cl, u_int request, void *info)
504{
505	struct ct_data *ct = (struct ct_data *)cl->cl_private;
506	void *infop = info;
507
508	mtx_lock(&ct->ct_lock);
509
510	switch (request) {
511	case CLSET_FD_CLOSE:
512		ct->ct_closeit = TRUE;
513		mtx_unlock(&ct->ct_lock);
514		return (TRUE);
515	case CLSET_FD_NCLOSE:
516		ct->ct_closeit = FALSE;
517		mtx_unlock(&ct->ct_lock);
518		return (TRUE);
519	default:
520		break;
521	}
522
523	/* for other requests which use info */
524	if (info == NULL) {
525		mtx_unlock(&ct->ct_lock);
526		return (FALSE);
527	}
528	switch (request) {
529	case CLSET_TIMEOUT:
530		if (time_not_ok((struct timeval *)info)) {
531			mtx_unlock(&ct->ct_lock);
532			return (FALSE);
533		}
534		ct->ct_wait = *(struct timeval *)infop;
535		break;
536	case CLGET_TIMEOUT:
537		*(struct timeval *)infop = ct->ct_wait;
538		break;
539	case CLGET_SERVER_ADDR:
540		(void) memcpy(info, &ct->ct_addr, (size_t)ct->ct_addr.ss_len);
541		break;
542	case CLGET_SVC_ADDR:
543		/*
544		 * Slightly different semantics to userland - we use
545		 * sockaddr instead of netbuf.
546		 */
547		memcpy(info, &ct->ct_addr, ct->ct_addr.ss_len);
548		break;
549	case CLSET_SVC_ADDR:		/* set to new address */
550		mtx_unlock(&ct->ct_lock);
551		return (FALSE);
552	case CLGET_XID:
553		*(uint32_t *)info = ct->ct_xid;
554		break;
555	case CLSET_XID:
556		/* This will set the xid of the NEXT call */
557		/* decrement by 1 as clnt_vc_call() increments once */
558		ct->ct_xid = *(uint32_t *)info - 1;
559		break;
560	case CLGET_VERS:
561		/*
562		 * This RELIES on the information that, in the call body,
563		 * the version number field is the fifth field from the
564		 * begining of the RPC header. MUST be changed if the
565		 * call_struct is changed
566		 */
567		*(uint32_t *)info =
568		    ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
569		    4 * BYTES_PER_XDR_UNIT));
570		break;
571
572	case CLSET_VERS:
573		*(uint32_t *)(void *)(ct->ct_mcallc +
574		    4 * BYTES_PER_XDR_UNIT) =
575		    htonl(*(uint32_t *)info);
576		break;
577
578	case CLGET_PROG:
579		/*
580		 * This RELIES on the information that, in the call body,
581		 * the program number field is the fourth field from the
582		 * begining of the RPC header. MUST be changed if the
583		 * call_struct is changed
584		 */
585		*(uint32_t *)info =
586		    ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
587		    3 * BYTES_PER_XDR_UNIT));
588		break;
589
590	case CLSET_PROG:
591		*(uint32_t *)(void *)(ct->ct_mcallc +
592		    3 * BYTES_PER_XDR_UNIT) =
593		    htonl(*(uint32_t *)info);
594		break;
595
596	case CLSET_WAITCHAN:
597		ct->ct_waitchan = *(const char **)info;
598		break;
599
600	case CLGET_WAITCHAN:
601		*(const char **) info = ct->ct_waitchan;
602		break;
603
604	case CLSET_INTERRUPTIBLE:
605		if (*(int *) info)
606			ct->ct_waitflag = PCATCH;
607		else
608			ct->ct_waitflag = 0;
609		break;
610
611	case CLGET_INTERRUPTIBLE:
612		if (ct->ct_waitflag)
613			*(int *) info = TRUE;
614		else
615			*(int *) info = FALSE;
616		break;
617
618	default:
619		mtx_unlock(&ct->ct_lock);
620		return (FALSE);
621	}
622
623	mtx_unlock(&ct->ct_lock);
624	return (TRUE);
625}
626
627static void
628clnt_vc_destroy(CLIENT *cl)
629{
630	struct ct_data *ct = (struct ct_data *) cl->cl_private;
631	struct socket *so = NULL;
632
633	mtx_lock(&ct->ct_lock);
634
635	if (ct->ct_socket) {
636		SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
637		ct->ct_socket->so_upcallarg = NULL;
638		ct->ct_socket->so_upcall = NULL;
639		ct->ct_socket->so_rcv.sb_flags &= ~SB_UPCALL;
640		SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
641
642		KASSERT(!TAILQ_FIRST(&ct->ct_pending),
643		    ("Destroying RPC client with pending RPC requests"));
644
645		if (ct->ct_closeit) {
646			so = ct->ct_socket;
647		}
648	}
649
650	mtx_unlock(&ct->ct_lock);
651
652	mtx_destroy(&ct->ct_lock);
653	if (so) {
654		soshutdown(so, SHUT_WR);
655		soclose(so);
656	}
657	mem_free(ct, sizeof(struct ct_data));
658	mem_free(cl, sizeof(CLIENT));
659}
660
/*
 * Make sure that the time is not garbage.  Returns non-zero when the
 * timeval must be rejected: negative seconds or microseconds (the
 * value -1 is used in ct_wait to mean "no default timeout"), more than
 * 100000000 seconds, or a microsecond field that is not below one full
 * second.
 */
static bool_t
time_not_ok(struct timeval *t)
{
	return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
		t->tv_usec <= -1 || t->tv_usec >= 1000000);
}
671
672void
673clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
674{
675	struct ct_data *ct = (struct ct_data *) arg;
676	struct uio uio;
677	struct mbuf *m;
678	struct ct_request *cr;
679	int error, rcvflag, foundreq;
680	uint32_t xid, header;
681
682	uio.uio_td = curthread;
683	do {
684		/*
685		 * If ct_record_resid is zero, we are waiting for a
686		 * record mark.
687		 */
688		if (ct->ct_record_resid == 0) {
689			bool_t do_read;
690
691			/*
692			 * Make sure there is either a whole record
693			 * mark in the buffer or there is some other
694			 * error condition
695			 */
696			do_read = FALSE;
697			SOCKBUF_LOCK(&so->so_rcv);
698			if (so->so_rcv.sb_cc >= sizeof(uint32_t)
699			    || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
700			    || so->so_error)
701				do_read = TRUE;
702			SOCKBUF_UNLOCK(&so->so_rcv);
703
704			if (!do_read)
705				return;
706
707			uio.uio_resid = sizeof(uint32_t);
708			m = NULL;
709			rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
710			error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
711
712			if (error == EWOULDBLOCK)
713				break;
714
715			/*
716			 * If there was an error, wake up all pending
717			 * requests.
718			 */
719			if (error || uio.uio_resid > 0) {
720			wakeup_all:
721				mtx_lock(&ct->ct_lock);
722				if (!error) {
723					/*
724					 * We must have got EOF trying
725					 * to read from the stream.
726					 */
727					error = ECONNRESET;
728				}
729				ct->ct_error.re_status = RPC_CANTRECV;
730				ct->ct_error.re_errno = error;
731				TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
732					cr->cr_error = error;
733					wakeup(cr);
734				}
735				TAILQ_INIT(&ct->ct_pending);
736				mtx_unlock(&ct->ct_lock);
737				break;
738			}
739			memcpy(&header, mtod(m, uint32_t *), sizeof(uint32_t));
740			header = ntohl(header);
741			ct->ct_record = NULL;
742			ct->ct_record_resid = header & 0x7fffffff;
743			ct->ct_record_eor = ((header & 0x80000000) != 0);
744			m_freem(m);
745		} else {
746			/*
747			 * We have the record mark. Read as much as
748			 * the socket has buffered up to the end of
749			 * this record.
750			 */
751			uio.uio_resid = ct->ct_record_resid;
752			m = NULL;
753			rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
754			error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
755
756			if (error == EWOULDBLOCK)
757				break;
758
759			if (error || uio.uio_resid == ct->ct_record_resid)
760				goto wakeup_all;
761
762			/*
763			 * If we have part of the record already,
764			 * chain this bit onto the end.
765			 */
766			if (ct->ct_record)
767				m_last(ct->ct_record)->m_next = m;
768			else
769				ct->ct_record = m;
770
771			ct->ct_record_resid = uio.uio_resid;
772
773			/*
774			 * If we have the entire record, see if we can
775			 * match it to a request.
776			 */
777			if (ct->ct_record_resid == 0
778			    && ct->ct_record_eor) {
779				/*
780				 * The XID is in the first uint32_t of
781				 * the reply.
782				 */
783				ct->ct_record =
784					m_pullup(ct->ct_record, sizeof(xid));
785				if (!ct->ct_record)
786					break;
787				memcpy(&xid,
788				    mtod(ct->ct_record, uint32_t *),
789				    sizeof(uint32_t));
790				xid = ntohl(xid);
791
792				mtx_lock(&ct->ct_lock);
793				foundreq = 0;
794				TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
795					if (cr->cr_xid == xid) {
796						/*
797						 * This one
798						 * matches. We snip it
799						 * out of the pending
800						 * list and leave the
801						 * reply mbuf in
802						 * cr->cr_mrep. Set
803						 * the XID to zero so
804						 * that clnt_vc_call
805						 * can know not to
806						 * repeat the
807						 * TAILQ_REMOVE.
808						 */
809						TAILQ_REMOVE(&ct->ct_pending,
810						    cr, cr_link);
811						cr->cr_xid = 0;
812						cr->cr_mrep = ct->ct_record;
813						cr->cr_error = 0;
814						foundreq = 1;
815						wakeup(cr);
816						break;
817					}
818				}
819				mtx_unlock(&ct->ct_lock);
820
821				if (!foundreq)
822					m_freem(ct->ct_record);
823				ct->ct_record = NULL;
824			}
825		}
826	} while (m);
827}
828