clnt_vc.c revision 239963
1/*	$NetBSD: clnt_vc.c,v 1.4 2000/07/14 08:40:42 fvdl Exp $	*/
2
3/*
4 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
5 * unrestricted use provided that this legend is included on all tape
6 * media and as a part of the software program in whole or part.  Users
7 * may copy or modify Sun RPC without charge, but are not authorized
8 * to license or distribute it to anyone else except as part of a product or
9 * program developed by the user.
10 *
11 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
12 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
13 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
14 *
15 * Sun RPC is provided with no support and without any obligation on the
16 * part of Sun Microsystems, Inc. to assist in its use, correction,
17 * modification or enhancement.
18 *
19 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
20 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
21 * OR ANY PART THEREOF.
22 *
23 * In no event will Sun Microsystems, Inc. be liable for any lost revenue
24 * or profits or other special, indirect and consequential damages, even if
25 * Sun has been advised of the possibility of such damages.
26 *
27 * Sun Microsystems, Inc.
28 * 2550 Garcia Avenue
29 * Mountain View, California  94043
30 */
31
32#if defined(LIBC_SCCS) && !defined(lint)
33static char *sccsid2 = "@(#)clnt_tcp.c 1.37 87/10/05 Copyr 1984 Sun Micro";
34static char *sccsid = "@(#)clnt_tcp.c	2.2 88/08/01 4.0 RPCSRC";
35static char sccsid3[] = "@(#)clnt_vc.c 1.19 89/03/16 Copyr 1988 Sun Micro";
36#endif
37#include <sys/cdefs.h>
38__FBSDID("$FreeBSD: head/sys/rpc/clnt_vc.c 239963 2012-09-01 02:56:17Z pfg $");
39
40/*
41 * clnt_tcp.c, Implements a TCP/IP based, client side RPC.
42 *
43 * Copyright (C) 1984, Sun Microsystems, Inc.
44 *
45 * TCP based RPC supports 'batched calls'.
 * A sequence of calls may be batched up in a send buffer.  The rpc call
 * returns immediately to the client even though the call was not
 * necessarily sent.  Batching occurs if the results' xdr routine is NULL (0)
 * AND the rpc timeout value is zero (see clnt.h, rpc).
50 *
51 * Clients should NOT casually batch calls that in fact return results; that is,
52 * the server side should be aware that a call is batched and not produce any
53 * return message.  Batched calls that produce many result messages can
54 * deadlock (netlock) the client and the server....
55 *
56 * Now go hang yourself.
57 */
58
59#include <sys/param.h>
60#include <sys/systm.h>
61#include <sys/lock.h>
62#include <sys/malloc.h>
63#include <sys/mbuf.h>
64#include <sys/mutex.h>
65#include <sys/pcpu.h>
66#include <sys/proc.h>
67#include <sys/protosw.h>
68#include <sys/socket.h>
69#include <sys/socketvar.h>
70#include <sys/syslog.h>
71#include <sys/time.h>
72#include <sys/uio.h>
73
74#include <net/vnet.h>
75
76#include <netinet/tcp.h>
77
78#include <rpc/rpc.h>
79#include <rpc/rpc_com.h>
80
81#define MCALL_MSG_SIZE 24
82
83struct cmessage {
84        struct cmsghdr cmsg;
85        struct cmsgcred cmcred;
86};
87
88static enum clnt_stat clnt_vc_call(CLIENT *, struct rpc_callextra *,
89    rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
90static void clnt_vc_geterr(CLIENT *, struct rpc_err *);
91static bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *);
92static void clnt_vc_abort(CLIENT *);
93static bool_t clnt_vc_control(CLIENT *, u_int, void *);
94static void clnt_vc_close(CLIENT *);
95static void clnt_vc_destroy(CLIENT *);
96static bool_t time_not_ok(struct timeval *);
97static int clnt_vc_soupcall(struct socket *so, void *arg, int waitflag);
98
99static struct clnt_ops clnt_vc_ops = {
100	.cl_call =	clnt_vc_call,
101	.cl_abort =	clnt_vc_abort,
102	.cl_geterr =	clnt_vc_geterr,
103	.cl_freeres =	clnt_vc_freeres,
104	.cl_close =	clnt_vc_close,
105	.cl_destroy =	clnt_vc_destroy,
106	.cl_control =	clnt_vc_control
107};
108
109/*
110 * A pending RPC request which awaits a reply. Requests which have
111 * received their reply will have cr_xid set to zero and cr_mrep to
112 * the mbuf chain of the reply.
113 */
114struct ct_request {
115	TAILQ_ENTRY(ct_request) cr_link;
116	uint32_t		cr_xid;		/* XID of request */
117	struct mbuf		*cr_mrep;	/* reply received by upcall */
118	int			cr_error;	/* any error from upcall */
119	char			cr_verf[MAX_AUTH_BYTES]; /* reply verf */
120};
121
122TAILQ_HEAD(ct_request_list, ct_request);
123
124struct ct_data {
125	struct mtx	ct_lock;
126	int		ct_threads;	/* number of threads in clnt_vc_call */
127	bool_t		ct_closing;	/* TRUE if we are closing */
128	bool_t		ct_closed;	/* TRUE if we are closed */
129	struct socket	*ct_socket;	/* connection socket */
130	bool_t		ct_closeit;	/* close it on destroy */
131	struct timeval	ct_wait;	/* wait interval in milliseconds */
132	struct sockaddr_storage	ct_addr; /* remote addr */
133	struct rpc_err	ct_error;
134	uint32_t	ct_xid;
135	char		ct_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
136	size_t		ct_mpos;	/* pos after marshal */
137	const char	*ct_waitchan;
138	int		ct_waitflag;
139	struct mbuf	*ct_record;	/* current reply record */
140	size_t		ct_record_resid; /* how much left of reply to read */
141	bool_t		ct_record_eor;	 /* true if reading last fragment */
142	struct ct_request_list ct_pending;
143	int		ct_upcallrefs;	/* Ref cnt of upcalls in prog. */
144};
145
146static void clnt_vc_upcallsdone(struct ct_data *);
147
/* Diagnostic format and message fragments for this transport. */
static const char	clnt_vc_errstr[] = "%s : %s";
static const char	clnt_vc_str[] = "clnt_vc_create";
static const char	clnt_read_vc_str[] = "read_vc";
static const char	__no_mem_str[] = "out of memory";
152
153/*
154 * Create a client handle for a connection.
155 * Default options are set, which the user can change using clnt_control()'s.
156 * The rpc/vc package does buffering similar to stdio, so the client
157 * must pick send and receive buffer sizes, 0 => use the default.
158 * NB: fd is copied into a private area.
159 * NB: The rpch->cl_auth is set null authentication. Caller may wish to
160 * set this something more useful.
161 *
162 * fd should be an open socket
163 */
164CLIENT *
165clnt_vc_create(
166	struct socket *so,		/* open file descriptor */
167	struct sockaddr *raddr,		/* servers address */
168	const rpcprog_t prog,		/* program number */
169	const rpcvers_t vers,		/* version number */
170	size_t sendsz,			/* buffer recv size */
171	size_t recvsz,			/* buffer send size */
172	int intrflag)			/* interruptible */
173{
174	CLIENT *cl;			/* client handle */
175	struct ct_data *ct = NULL;	/* client handle */
176	struct timeval now;
177	struct rpc_msg call_msg;
178	static uint32_t disrupt;
179	struct __rpc_sockinfo si;
180	XDR xdrs;
181	int error, interrupted, one = 1, sleep_flag;
182	struct sockopt sopt;
183
184	if (disrupt == 0)
185		disrupt = (uint32_t)(long)raddr;
186
187	cl = (CLIENT *)mem_alloc(sizeof (*cl));
188	ct = (struct ct_data *)mem_alloc(sizeof (*ct));
189
190	mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF);
191	ct->ct_threads = 0;
192	ct->ct_closing = FALSE;
193	ct->ct_closed = FALSE;
194	ct->ct_upcallrefs = 0;
195
196	if ((so->so_state & (SS_ISCONNECTED|SS_ISCONFIRMING)) == 0) {
197		error = soconnect(so, raddr, curthread);
198		SOCK_LOCK(so);
199		interrupted = 0;
200		sleep_flag = PSOCK;
201		if (intrflag != 0)
202			sleep_flag |= (PCATCH | PBDRY);
203		while ((so->so_state & SS_ISCONNECTING)
204		    && so->so_error == 0) {
205			error = msleep(&so->so_timeo, SOCK_MTX(so),
206			    sleep_flag, "connec", 0);
207			if (error) {
208				if (error == EINTR || error == ERESTART)
209					interrupted = 1;
210				break;
211			}
212		}
213		if (error == 0) {
214			error = so->so_error;
215			so->so_error = 0;
216		}
217		SOCK_UNLOCK(so);
218		if (error) {
219			if (!interrupted)
220				so->so_state &= ~SS_ISCONNECTING;
221			rpc_createerr.cf_stat = RPC_SYSTEMERROR;
222			rpc_createerr.cf_error.re_errno = error;
223			goto err;
224		}
225	}
226
227	if (!__rpc_socket2sockinfo(so, &si)) {
228		goto err;
229	}
230
231	if (so->so_proto->pr_flags & PR_CONNREQUIRED) {
232		bzero(&sopt, sizeof(sopt));
233		sopt.sopt_dir = SOPT_SET;
234		sopt.sopt_level = SOL_SOCKET;
235		sopt.sopt_name = SO_KEEPALIVE;
236		sopt.sopt_val = &one;
237		sopt.sopt_valsize = sizeof(one);
238		sosetopt(so, &sopt);
239	}
240
241	if (so->so_proto->pr_protocol == IPPROTO_TCP) {
242		bzero(&sopt, sizeof(sopt));
243		sopt.sopt_dir = SOPT_SET;
244		sopt.sopt_level = IPPROTO_TCP;
245		sopt.sopt_name = TCP_NODELAY;
246		sopt.sopt_val = &one;
247		sopt.sopt_valsize = sizeof(one);
248		sosetopt(so, &sopt);
249	}
250
251	ct->ct_closeit = FALSE;
252
253	/*
254	 * Set up private data struct
255	 */
256	ct->ct_socket = so;
257	ct->ct_wait.tv_sec = -1;
258	ct->ct_wait.tv_usec = -1;
259	memcpy(&ct->ct_addr, raddr, raddr->sa_len);
260
261	/*
262	 * Initialize call message
263	 */
264	getmicrotime(&now);
265	ct->ct_xid = ((uint32_t)++disrupt) ^ __RPC_GETXID(&now);
266	call_msg.rm_xid = ct->ct_xid;
267	call_msg.rm_direction = CALL;
268	call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
269	call_msg.rm_call.cb_prog = (uint32_t)prog;
270	call_msg.rm_call.cb_vers = (uint32_t)vers;
271
272	/*
273	 * pre-serialize the static part of the call msg and stash it away
274	 */
275	xdrmem_create(&xdrs, ct->ct_mcallc, MCALL_MSG_SIZE,
276	    XDR_ENCODE);
277	if (! xdr_callhdr(&xdrs, &call_msg)) {
278		if (ct->ct_closeit) {
279			soclose(ct->ct_socket);
280		}
281		goto err;
282	}
283	ct->ct_mpos = XDR_GETPOS(&xdrs);
284	XDR_DESTROY(&xdrs);
285	ct->ct_waitchan = "rpcrecv";
286	ct->ct_waitflag = 0;
287
288	/*
289	 * Create a client handle which uses xdrrec for serialization
290	 * and authnone for authentication.
291	 */
292	sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
293	recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
294	error = soreserve(ct->ct_socket, sendsz, recvsz);
295	if (error != 0) {
296		if (ct->ct_closeit) {
297			soclose(ct->ct_socket);
298		}
299		goto err;
300	}
301	cl->cl_refs = 1;
302	cl->cl_ops = &clnt_vc_ops;
303	cl->cl_private = ct;
304	cl->cl_auth = authnone_create();
305
306	SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
307	soupcall_set(ct->ct_socket, SO_RCV, clnt_vc_soupcall, ct);
308	SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
309
310	ct->ct_record = NULL;
311	ct->ct_record_resid = 0;
312	TAILQ_INIT(&ct->ct_pending);
313	return (cl);
314
315err:
316	if (cl) {
317		if (ct) {
318			mtx_destroy(&ct->ct_lock);
319			mem_free(ct, sizeof (struct ct_data));
320		}
321		if (cl)
322			mem_free(cl, sizeof (CLIENT));
323	}
324	return ((CLIENT *)NULL);
325}
326
327static enum clnt_stat
328clnt_vc_call(
329	CLIENT		*cl,		/* client handle */
330	struct rpc_callextra *ext,	/* call metadata */
331	rpcproc_t	proc,		/* procedure number */
332	struct mbuf	*args,		/* pointer to args */
333	struct mbuf	**resultsp,	/* pointer to results */
334	struct timeval	utimeout)
335{
336	struct ct_data *ct = (struct ct_data *) cl->cl_private;
337	AUTH *auth;
338	struct rpc_err *errp;
339	enum clnt_stat stat;
340	XDR xdrs;
341	struct rpc_msg reply_msg;
342	bool_t ok;
343	int nrefreshes = 2;		/* number of times to refresh cred */
344	struct timeval timeout;
345	uint32_t xid;
346	struct mbuf *mreq = NULL, *results;
347	struct ct_request *cr;
348	int error;
349
350	cr = malloc(sizeof(struct ct_request), M_RPC, M_WAITOK);
351
352	mtx_lock(&ct->ct_lock);
353
354	if (ct->ct_closing || ct->ct_closed) {
355		mtx_unlock(&ct->ct_lock);
356		free(cr, M_RPC);
357		return (RPC_CANTSEND);
358	}
359	ct->ct_threads++;
360
361	if (ext) {
362		auth = ext->rc_auth;
363		errp = &ext->rc_err;
364	} else {
365		auth = cl->cl_auth;
366		errp = &ct->ct_error;
367	}
368
369	cr->cr_mrep = NULL;
370	cr->cr_error = 0;
371
372	if (ct->ct_wait.tv_usec == -1) {
373		timeout = utimeout;	/* use supplied timeout */
374	} else {
375		timeout = ct->ct_wait;	/* use default timeout */
376	}
377
378call_again:
379	mtx_assert(&ct->ct_lock, MA_OWNED);
380
381	ct->ct_xid++;
382	xid = ct->ct_xid;
383
384	mtx_unlock(&ct->ct_lock);
385
386	/*
387	 * Leave space to pre-pend the record mark.
388	 */
389	MGETHDR(mreq, M_WAIT, MT_DATA);
390	mreq->m_data += sizeof(uint32_t);
391	KASSERT(ct->ct_mpos + sizeof(uint32_t) <= MHLEN,
392	    ("RPC header too big"));
393	bcopy(ct->ct_mcallc, mreq->m_data, ct->ct_mpos);
394	mreq->m_len = ct->ct_mpos;
395
396	/*
397	 * The XID is the first thing in the request.
398	 */
399	*mtod(mreq, uint32_t *) = htonl(xid);
400
401	xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
402
403	errp->re_status = stat = RPC_SUCCESS;
404
405	if ((! XDR_PUTINT32(&xdrs, &proc)) ||
406	    (! AUTH_MARSHALL(auth, xid, &xdrs,
407		m_copym(args, 0, M_COPYALL, M_WAITOK)))) {
408		errp->re_status = stat = RPC_CANTENCODEARGS;
409		mtx_lock(&ct->ct_lock);
410		goto out;
411	}
412	mreq->m_pkthdr.len = m_length(mreq, NULL);
413
414	/*
415	 * Prepend a record marker containing the packet length.
416	 */
417	M_PREPEND(mreq, sizeof(uint32_t), M_WAIT);
418	*mtod(mreq, uint32_t *) =
419		htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t)));
420
421	cr->cr_xid = xid;
422	mtx_lock(&ct->ct_lock);
423	/*
424	 * Check to see if the other end has already started to close down
425	 * the connection. The upcall will have set ct_error.re_status
426	 * to RPC_CANTRECV if this is the case.
427	 * If the other end starts to close down the connection after this
428	 * point, it will be detected later when cr_error is checked,
429	 * since the request is in the ct_pending queue.
430	 */
431	if (ct->ct_error.re_status == RPC_CANTRECV) {
432		if (errp != &ct->ct_error) {
433			errp->re_errno = ct->ct_error.re_errno;
434			errp->re_status = RPC_CANTRECV;
435		}
436		stat = RPC_CANTRECV;
437		goto out;
438	}
439	TAILQ_INSERT_TAIL(&ct->ct_pending, cr, cr_link);
440	mtx_unlock(&ct->ct_lock);
441
442	/*
443	 * sosend consumes mreq.
444	 */
445	error = sosend(ct->ct_socket, NULL, NULL, mreq, NULL, 0, curthread);
446	mreq = NULL;
447	if (error == EMSGSIZE) {
448		SOCKBUF_LOCK(&ct->ct_socket->so_snd);
449		sbwait(&ct->ct_socket->so_snd);
450		SOCKBUF_UNLOCK(&ct->ct_socket->so_snd);
451		AUTH_VALIDATE(auth, xid, NULL, NULL);
452		mtx_lock(&ct->ct_lock);
453		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
454		goto call_again;
455	}
456
457	reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL;
458	reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf;
459	reply_msg.acpted_rply.ar_verf.oa_length = 0;
460	reply_msg.acpted_rply.ar_results.where = NULL;
461	reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
462
463	mtx_lock(&ct->ct_lock);
464	if (error) {
465		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
466		errp->re_errno = error;
467		errp->re_status = stat = RPC_CANTSEND;
468		goto out;
469	}
470
471	/*
472	 * Check to see if we got an upcall while waiting for the
473	 * lock. In both these cases, the request has been removed
474	 * from ct->ct_pending.
475	 */
476	if (cr->cr_error) {
477		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
478		errp->re_errno = cr->cr_error;
479		errp->re_status = stat = RPC_CANTRECV;
480		goto out;
481	}
482	if (cr->cr_mrep) {
483		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
484		goto got_reply;
485	}
486
487	/*
488	 * Hack to provide rpc-based message passing
489	 */
490	if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
491		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
492		errp->re_status = stat = RPC_TIMEDOUT;
493		goto out;
494	}
495
496	error = msleep(cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan,
497	    tvtohz(&timeout));
498
499	TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
500
501	if (error) {
502		/*
503		 * The sleep returned an error so our request is still
504		 * on the list. Turn the error code into an
505		 * appropriate client status.
506		 */
507		errp->re_errno = error;
508		switch (error) {
509		case EINTR:
510		case ERESTART:
511			stat = RPC_INTR;
512			break;
513		case EWOULDBLOCK:
514			stat = RPC_TIMEDOUT;
515			break;
516		default:
517			stat = RPC_CANTRECV;
518		}
519		errp->re_status = stat;
520		goto out;
521	} else {
522		/*
523		 * We were woken up by the upcall.  If the
524		 * upcall had a receive error, report that,
525		 * otherwise we have a reply.
526		 */
527		if (cr->cr_error) {
528			errp->re_errno = cr->cr_error;
529			errp->re_status = stat = RPC_CANTRECV;
530			goto out;
531		}
532	}
533
534got_reply:
535	/*
536	 * Now decode and validate the response. We need to drop the
537	 * lock since xdr_replymsg may end up sleeping in malloc.
538	 */
539	mtx_unlock(&ct->ct_lock);
540
541	if (ext && ext->rc_feedback)
542		ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);
543
544	xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
545	ok = xdr_replymsg(&xdrs, &reply_msg);
546	cr->cr_mrep = NULL;
547
548	if (ok) {
549		if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
550		    (reply_msg.acpted_rply.ar_stat == SUCCESS))
551			errp->re_status = stat = RPC_SUCCESS;
552		else
553			stat = _seterr_reply(&reply_msg, errp);
554
555		if (stat == RPC_SUCCESS) {
556			results = xdrmbuf_getall(&xdrs);
557			if (!AUTH_VALIDATE(auth, xid,
558				&reply_msg.acpted_rply.ar_verf,
559				&results)) {
560				errp->re_status = stat = RPC_AUTHERROR;
561				errp->re_why = AUTH_INVALIDRESP;
562			} else {
563				KASSERT(results,
564				    ("auth validated but no result"));
565				*resultsp = results;
566			}
567		}		/* end successful completion */
568		/*
569		 * If unsuccesful AND error is an authentication error
570		 * then refresh credentials and try again, else break
571		 */
572		else if (stat == RPC_AUTHERROR)
573			/* maybe our credentials need to be refreshed ... */
574			if (nrefreshes > 0 &&
575			    AUTH_REFRESH(auth, &reply_msg)) {
576				nrefreshes--;
577				XDR_DESTROY(&xdrs);
578				mtx_lock(&ct->ct_lock);
579				goto call_again;
580			}
581		/* end of unsuccessful completion */
582	}	/* end of valid reply message */
583	else {
584		errp->re_status = stat = RPC_CANTDECODERES;
585	}
586	XDR_DESTROY(&xdrs);
587	mtx_lock(&ct->ct_lock);
588out:
589	mtx_assert(&ct->ct_lock, MA_OWNED);
590
591	KASSERT(stat != RPC_SUCCESS || *resultsp,
592	    ("RPC_SUCCESS without reply"));
593
594	if (mreq)
595		m_freem(mreq);
596	if (cr->cr_mrep)
597		m_freem(cr->cr_mrep);
598
599	ct->ct_threads--;
600	if (ct->ct_closing)
601		wakeup(ct);
602
603	mtx_unlock(&ct->ct_lock);
604
605	if (auth && stat != RPC_SUCCESS)
606		AUTH_VALIDATE(auth, xid, NULL, NULL);
607
608	free(cr, M_RPC);
609
610	return (stat);
611}
612
613static void
614clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
615{
616	struct ct_data *ct = (struct ct_data *) cl->cl_private;
617
618	*errp = ct->ct_error;
619}
620
621static bool_t
622clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
623{
624	XDR xdrs;
625	bool_t dummy;
626
627	xdrs.x_op = XDR_FREE;
628	dummy = (*xdr_res)(&xdrs, res_ptr);
629
630	return (dummy);
631}
632
633/*ARGSUSED*/
634static void
635clnt_vc_abort(CLIENT *cl)
636{
637}
638
639static bool_t
640clnt_vc_control(CLIENT *cl, u_int request, void *info)
641{
642	struct ct_data *ct = (struct ct_data *)cl->cl_private;
643	void *infop = info;
644
645	mtx_lock(&ct->ct_lock);
646
647	switch (request) {
648	case CLSET_FD_CLOSE:
649		ct->ct_closeit = TRUE;
650		mtx_unlock(&ct->ct_lock);
651		return (TRUE);
652	case CLSET_FD_NCLOSE:
653		ct->ct_closeit = FALSE;
654		mtx_unlock(&ct->ct_lock);
655		return (TRUE);
656	default:
657		break;
658	}
659
660	/* for other requests which use info */
661	if (info == NULL) {
662		mtx_unlock(&ct->ct_lock);
663		return (FALSE);
664	}
665	switch (request) {
666	case CLSET_TIMEOUT:
667		if (time_not_ok((struct timeval *)info)) {
668			mtx_unlock(&ct->ct_lock);
669			return (FALSE);
670		}
671		ct->ct_wait = *(struct timeval *)infop;
672		break;
673	case CLGET_TIMEOUT:
674		*(struct timeval *)infop = ct->ct_wait;
675		break;
676	case CLGET_SERVER_ADDR:
677		(void) memcpy(info, &ct->ct_addr, (size_t)ct->ct_addr.ss_len);
678		break;
679	case CLGET_SVC_ADDR:
680		/*
681		 * Slightly different semantics to userland - we use
682		 * sockaddr instead of netbuf.
683		 */
684		memcpy(info, &ct->ct_addr, ct->ct_addr.ss_len);
685		break;
686	case CLSET_SVC_ADDR:		/* set to new address */
687		mtx_unlock(&ct->ct_lock);
688		return (FALSE);
689	case CLGET_XID:
690		*(uint32_t *)info = ct->ct_xid;
691		break;
692	case CLSET_XID:
693		/* This will set the xid of the NEXT call */
694		/* decrement by 1 as clnt_vc_call() increments once */
695		ct->ct_xid = *(uint32_t *)info - 1;
696		break;
697	case CLGET_VERS:
698		/*
699		 * This RELIES on the information that, in the call body,
700		 * the version number field is the fifth field from the
701		 * begining of the RPC header. MUST be changed if the
702		 * call_struct is changed
703		 */
704		*(uint32_t *)info =
705		    ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
706		    4 * BYTES_PER_XDR_UNIT));
707		break;
708
709	case CLSET_VERS:
710		*(uint32_t *)(void *)(ct->ct_mcallc +
711		    4 * BYTES_PER_XDR_UNIT) =
712		    htonl(*(uint32_t *)info);
713		break;
714
715	case CLGET_PROG:
716		/*
717		 * This RELIES on the information that, in the call body,
718		 * the program number field is the fourth field from the
719		 * begining of the RPC header. MUST be changed if the
720		 * call_struct is changed
721		 */
722		*(uint32_t *)info =
723		    ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
724		    3 * BYTES_PER_XDR_UNIT));
725		break;
726
727	case CLSET_PROG:
728		*(uint32_t *)(void *)(ct->ct_mcallc +
729		    3 * BYTES_PER_XDR_UNIT) =
730		    htonl(*(uint32_t *)info);
731		break;
732
733	case CLSET_WAITCHAN:
734		ct->ct_waitchan = (const char *)info;
735		break;
736
737	case CLGET_WAITCHAN:
738		*(const char **) info = ct->ct_waitchan;
739		break;
740
741	case CLSET_INTERRUPTIBLE:
742		if (*(int *) info)
743			ct->ct_waitflag = PCATCH | PBDRY;
744		else
745			ct->ct_waitflag = 0;
746		break;
747
748	case CLGET_INTERRUPTIBLE:
749		if (ct->ct_waitflag)
750			*(int *) info = TRUE;
751		else
752			*(int *) info = FALSE;
753		break;
754
755	default:
756		mtx_unlock(&ct->ct_lock);
757		return (FALSE);
758	}
759
760	mtx_unlock(&ct->ct_lock);
761	return (TRUE);
762}
763
764static void
765clnt_vc_close(CLIENT *cl)
766{
767	struct ct_data *ct = (struct ct_data *) cl->cl_private;
768	struct ct_request *cr;
769
770	mtx_lock(&ct->ct_lock);
771
772	if (ct->ct_closed) {
773		mtx_unlock(&ct->ct_lock);
774		return;
775	}
776
777	if (ct->ct_closing) {
778		while (ct->ct_closing)
779			msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
780		KASSERT(ct->ct_closed, ("client should be closed"));
781		mtx_unlock(&ct->ct_lock);
782		return;
783	}
784
785	if (ct->ct_socket) {
786		ct->ct_closing = TRUE;
787		mtx_unlock(&ct->ct_lock);
788
789		SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
790		soupcall_clear(ct->ct_socket, SO_RCV);
791		clnt_vc_upcallsdone(ct);
792		SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
793
794		/*
795		 * Abort any pending requests and wait until everyone
796		 * has finished with clnt_vc_call.
797		 */
798		mtx_lock(&ct->ct_lock);
799		TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
800			cr->cr_xid = 0;
801			cr->cr_error = ESHUTDOWN;
802			wakeup(cr);
803		}
804
805		while (ct->ct_threads)
806			msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
807	}
808
809	ct->ct_closing = FALSE;
810	ct->ct_closed = TRUE;
811	mtx_unlock(&ct->ct_lock);
812	wakeup(ct);
813}
814
815static void
816clnt_vc_destroy(CLIENT *cl)
817{
818	struct ct_data *ct = (struct ct_data *) cl->cl_private;
819	struct socket *so = NULL;
820
821	clnt_vc_close(cl);
822
823	mtx_lock(&ct->ct_lock);
824
825	if (ct->ct_socket) {
826		if (ct->ct_closeit) {
827			so = ct->ct_socket;
828		}
829	}
830
831	mtx_unlock(&ct->ct_lock);
832
833	mtx_destroy(&ct->ct_lock);
834	if (so) {
835		soshutdown(so, SHUT_WR);
836		soclose(so);
837	}
838	mem_free(ct, sizeof(struct ct_data));
839	if (cl->cl_netid && cl->cl_netid[0])
840		mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
841	if (cl->cl_tp && cl->cl_tp[0])
842		mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
843	mem_free(cl, sizeof(CLIENT));
844}
845
846/*
847 * Make sure that the time is not garbage.   -1 value is disallowed.
848 * Note this is different from time_not_ok in clnt_dg.c
849 */
850static bool_t
851time_not_ok(struct timeval *t)
852{
853	return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
854		t->tv_usec <= -1 || t->tv_usec > 1000000);
855}
856
857int
858clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
859{
860	struct ct_data *ct = (struct ct_data *) arg;
861	struct uio uio;
862	struct mbuf *m;
863	struct ct_request *cr;
864	int error, rcvflag, foundreq;
865	uint32_t xid, header;
866	bool_t do_read;
867
868	ct->ct_upcallrefs++;
869	uio.uio_td = curthread;
870	do {
871		/*
872		 * If ct_record_resid is zero, we are waiting for a
873		 * record mark.
874		 */
875		if (ct->ct_record_resid == 0) {
876
877			/*
878			 * Make sure there is either a whole record
879			 * mark in the buffer or there is some other
880			 * error condition
881			 */
882			do_read = FALSE;
883			if (so->so_rcv.sb_cc >= sizeof(uint32_t)
884			    || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
885			    || so->so_error)
886				do_read = TRUE;
887
888			if (!do_read)
889				break;
890
891			SOCKBUF_UNLOCK(&so->so_rcv);
892			uio.uio_resid = sizeof(uint32_t);
893			m = NULL;
894			rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
895			error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
896			SOCKBUF_LOCK(&so->so_rcv);
897
898			if (error == EWOULDBLOCK)
899				break;
900
901			/*
902			 * If there was an error, wake up all pending
903			 * requests.
904			 */
905			if (error || uio.uio_resid > 0) {
906			wakeup_all:
907				mtx_lock(&ct->ct_lock);
908				if (!error) {
909					/*
910					 * We must have got EOF trying
911					 * to read from the stream.
912					 */
913					error = ECONNRESET;
914				}
915				ct->ct_error.re_status = RPC_CANTRECV;
916				ct->ct_error.re_errno = error;
917				TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
918					cr->cr_error = error;
919					wakeup(cr);
920				}
921				mtx_unlock(&ct->ct_lock);
922				break;
923			}
924			m_copydata(m, 0, sizeof(uint32_t), (char *)&header);
925			header = ntohl(header);
926			ct->ct_record = NULL;
927			ct->ct_record_resid = header & 0x7fffffff;
928			ct->ct_record_eor = ((header & 0x80000000) != 0);
929			m_freem(m);
930		} else {
931			/*
932			 * Wait until the socket has the whole record
933			 * buffered.
934			 */
935			do_read = FALSE;
936			if (so->so_rcv.sb_cc >= ct->ct_record_resid
937			    || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
938			    || so->so_error)
939				do_read = TRUE;
940
941			if (!do_read)
942				break;
943
944			/*
945			 * We have the record mark. Read as much as
946			 * the socket has buffered up to the end of
947			 * this record.
948			 */
949			SOCKBUF_UNLOCK(&so->so_rcv);
950			uio.uio_resid = ct->ct_record_resid;
951			m = NULL;
952			rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
953			error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
954			SOCKBUF_LOCK(&so->so_rcv);
955
956			if (error == EWOULDBLOCK)
957				break;
958
959			if (error || uio.uio_resid == ct->ct_record_resid)
960				goto wakeup_all;
961
962			/*
963			 * If we have part of the record already,
964			 * chain this bit onto the end.
965			 */
966			if (ct->ct_record)
967				m_last(ct->ct_record)->m_next = m;
968			else
969				ct->ct_record = m;
970
971			ct->ct_record_resid = uio.uio_resid;
972
973			/*
974			 * If we have the entire record, see if we can
975			 * match it to a request.
976			 */
977			if (ct->ct_record_resid == 0
978			    && ct->ct_record_eor) {
979				/*
980				 * The XID is in the first uint32_t of
981				 * the reply.
982				 */
983				if (ct->ct_record->m_len < sizeof(xid) &&
984				    m_length(ct->ct_record, NULL) <
985				    sizeof(xid)) {
986					m_freem(ct->ct_record);
987					break;
988				}
989				m_copydata(ct->ct_record, 0, sizeof(xid),
990				    (char *)&xid);
991				xid = ntohl(xid);
992
993				mtx_lock(&ct->ct_lock);
994				foundreq = 0;
995				TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
996					if (cr->cr_xid == xid) {
997						/*
998						 * This one
999						 * matches. We leave
1000						 * the reply mbuf in
1001						 * cr->cr_mrep. Set
1002						 * the XID to zero so
1003						 * that we will ignore
1004						 * any duplicaed
1005						 * replies.
1006						 */
1007						cr->cr_xid = 0;
1008						cr->cr_mrep = ct->ct_record;
1009						cr->cr_error = 0;
1010						foundreq = 1;
1011						wakeup(cr);
1012						break;
1013					}
1014				}
1015				mtx_unlock(&ct->ct_lock);
1016
1017				if (!foundreq)
1018					m_freem(ct->ct_record);
1019				ct->ct_record = NULL;
1020			}
1021		}
1022	} while (m);
1023	ct->ct_upcallrefs--;
1024	if (ct->ct_upcallrefs < 0)
1025		panic("rpcvc upcall refcnt");
1026	if (ct->ct_upcallrefs == 0)
1027		wakeup(&ct->ct_upcallrefs);
1028	return (SU_OK);
1029}
1030
1031/*
1032 * Wait for all upcalls in progress to complete.
1033 */
1034static void
1035clnt_vc_upcallsdone(struct ct_data *ct)
1036{
1037
1038	SOCKBUF_LOCK_ASSERT(&ct->ct_socket->so_rcv);
1039
1040	while (ct->ct_upcallrefs > 0)
1041		(void) msleep(&ct->ct_upcallrefs,
1042		    SOCKBUF_MTX(&ct->ct_socket->so_rcv), 0, "rpcvcup", 0);
1043}
1044