clnt_vc.c revision 240880
123934Sgibbs/*	$NetBSD: clnt_vc.c,v 1.4 2000/07/14 08:40:42 fvdl Exp $	*/
223934Sgibbs
323934Sgibbs/*
423934Sgibbs * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
5102668Sgibbs * unrestricted use provided that this legend is included on all tape
623934Sgibbs * media and as a part of the software program in whole or part.  Users
723934Sgibbs * may copy or modify Sun RPC without charge, but are not authorized
823934Sgibbs * to license or distribute it to anyone else except as part of a product or
923934Sgibbs * program developed by the user.
1023934Sgibbs *
1123934Sgibbs * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
1226997Sgibbs * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
1354211Sgibbs * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
1495376Sgibbs *
1595376Sgibbs * Sun RPC is provided with no support and without any obligation on the
1695376Sgibbs * part of Sun Microsystems, Inc. to assist in its use, correction,
1795376Sgibbs * modification or enhancement.
1895376Sgibbs *
1995376Sgibbs * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
2095376Sgibbs * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
2195376Sgibbs * OR ANY PART THEREOF.
2223934Sgibbs *
2363457Sgibbs * In no event will Sun Microsystems, Inc. be liable for any lost revenue
2495376Sgibbs * or profits or other special, indirect and consequential damages, even if
2595376Sgibbs * Sun has been advised of the possibility of such damages.
2663457Sgibbs *
2795376Sgibbs * Sun Microsystems, Inc.
2895376Sgibbs * 2550 Garcia Avenue
2995376Sgibbs * Mountain View, California  94043
3095376Sgibbs */
3195376Sgibbs
3295376Sgibbs#if defined(LIBC_SCCS) && !defined(lint)
3323934Sgibbsstatic char *sccsid2 = "@(#)clnt_tcp.c 1.37 87/10/05 Copyr 1984 Sun Micro";
3423934Sgibbsstatic char *sccsid = "@(#)clnt_tcp.c	2.2 88/08/01 4.0 RPCSRC";
3595376Sgibbsstatic char sccsid3[] = "@(#)clnt_vc.c 1.19 89/03/16 Copyr 1988 Sun Micro";
3695376Sgibbs#endif
3795376Sgibbs#include <sys/cdefs.h>
3895376Sgibbs__FBSDID("$FreeBSD: head/sys/rpc/clnt_vc.c 240880 2012-09-24 03:14:17Z pfg $");
3923934Sgibbs
40102668Sgibbs/*
4165943Sgibbs * clnt_tcp.c, Implements a TCP/IP based, client side RPC.
4250477Speter *
4323934Sgibbs * Copyright (C) 1984, Sun Microsystems, Inc.
4423934Sgibbs *
4565943Sgibbs * TCP based RPC supports 'batched calls'.
4665943Sgibbs * A sequence of calls may be batched-up in a send buffer.  The rpc call
4765943Sgibbs * returns immediately to the client even though the call was not necessarily
4823934Sgibbs * sent.  The batching occurs if the results' xdr routine is NULL (0) AND
4965943Sgibbs * the rpc timeout value is zero (see clnt.h, rpc).
5023934Sgibbs *
5123934Sgibbs * Clients should NOT casually batch calls that in fact return results; that is,
5223934Sgibbs * the server side should be aware that a call is batched and not produce any
5323934Sgibbs * return message.  Batched calls that produce many result messages can
5423934Sgibbs * deadlock (netlock) the client and the server....
5523934Sgibbs *
5623934Sgibbs * Now go hang yourself.
5723934Sgibbs */
5823934Sgibbs
5923934Sgibbs#include <sys/param.h>
6023934Sgibbs#include <sys/systm.h>
6123934Sgibbs#include <sys/lock.h>
6260938Sjake#include <sys/malloc.h>
6323934Sgibbs#include <sys/mbuf.h>
6423934Sgibbs#include <sys/mutex.h>
6523934Sgibbs#include <sys/pcpu.h>
6623934Sgibbs#include <sys/proc.h>
6723934Sgibbs#include <sys/protosw.h>
6823934Sgibbs#include <sys/socket.h>
6923934Sgibbs#include <sys/socketvar.h>
7023934Sgibbs#include <sys/syslog.h>
7160938Sjake#include <sys/time.h>
7223934Sgibbs#include <sys/uio.h>
7323934Sgibbs
7466270Sgibbs#include <net/vnet.h>
7539220Sgibbs
7639220Sgibbs#include <netinet/tcp.h>
7723934Sgibbs
7823934Sgibbs#include <rpc/rpc.h>
79102668Sgibbs#include <rpc/rpc_com.h>
8023934Sgibbs
8123934Sgibbs#define MCALL_MSG_SIZE 24
82102668Sgibbs
8395376Sgibbsstruct cmessage {
8479873Sgibbs        struct cmsghdr cmsg;
8595376Sgibbs        struct cmsgcred cmcred;
8695376Sgibbs};
8795376Sgibbs
8823934Sgibbsstatic enum clnt_stat clnt_vc_call(CLIENT *, struct rpc_callextra *,
8939220Sgibbs    rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
9039220Sgibbsstatic void clnt_vc_geterr(CLIENT *, struct rpc_err *);
9195376Sgibbsstatic bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *);
9239220Sgibbsstatic void clnt_vc_abort(CLIENT *);
9366270Sgibbsstatic bool_t clnt_vc_control(CLIENT *, u_int, void *);
9439220Sgibbsstatic void clnt_vc_close(CLIENT *);
9539220Sgibbsstatic void clnt_vc_destroy(CLIENT *);
96static bool_t time_not_ok(struct timeval *);
97static int clnt_vc_soupcall(struct socket *so, void *arg, int waitflag);
98
99static struct clnt_ops clnt_vc_ops = {
100	.cl_call =	clnt_vc_call,
101	.cl_abort =	clnt_vc_abort,
102	.cl_geterr =	clnt_vc_geterr,
103	.cl_freeres =	clnt_vc_freeres,
104	.cl_close =	clnt_vc_close,
105	.cl_destroy =	clnt_vc_destroy,
106	.cl_control =	clnt_vc_control
107};
108
109/*
110 * A pending RPC request which awaits a reply. Requests which have
111 * received their reply will have cr_xid set to zero and cr_mrep to
112 * the mbuf chain of the reply.
113 */
114struct ct_request {
115	TAILQ_ENTRY(ct_request) cr_link;
116	uint32_t		cr_xid;		/* XID of request */
117	struct mbuf		*cr_mrep;	/* reply received by upcall */
118	int			cr_error;	/* any error from upcall */
119	char			cr_verf[MAX_AUTH_BYTES]; /* reply verf */
120};
121
122TAILQ_HEAD(ct_request_list, ct_request);
123
124struct ct_data {
125	struct mtx	ct_lock;
126	int		ct_threads;	/* number of threads in clnt_vc_call */
127	bool_t		ct_closing;	/* TRUE if we are closing */
128	bool_t		ct_closed;	/* TRUE if we are closed */
129	struct socket	*ct_socket;	/* connection socket */
130	bool_t		ct_closeit;	/* close it on destroy */
131	struct timeval	ct_wait;	/* wait interval in milliseconds */
132	struct sockaddr_storage	ct_addr; /* remote addr */
133	struct rpc_err	ct_error;
134	uint32_t	ct_xid;
135	char		ct_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
136	size_t		ct_mpos;	/* pos after marshal */
137	const char	*ct_waitchan;
138	int		ct_waitflag;
139	struct mbuf	*ct_record;	/* current reply record */
140	size_t		ct_record_resid; /* how much left of reply to read */
141	bool_t		ct_record_eor;	 /* true if reading last fragment */
142	struct ct_request_list ct_pending;
143	int		ct_upcallrefs;	/* Ref cnt of upcalls in prog. */
144};
145
146static void clnt_vc_upcallsdone(struct ct_data *);
147
148static const char clnt_vc_errstr[] = "%s : %s";
149static const char clnt_vc_str[] = "clnt_vc_create";
150static const char clnt_read_vc_str[] = "read_vc";
151static const char __no_mem_str[] = "out of memory";
152
153/*
154 * Create a client handle for a connection.
155 * Default options are set, which the user can change using clnt_control()'s.
156 * The rpc/vc package does buffering similar to stdio, so the client
157 * must pick send and receive buffer sizes, 0 => use the default.
158 * NB: fd is copied into a private area.
159 * NB: The rpch->cl_auth is set null authentication. Caller may wish to
160 * set this something more useful.
161 *
162 * fd should be an open socket
163 */
164CLIENT *
165clnt_vc_create(
166	struct socket *so,		/* open file descriptor */
167	struct sockaddr *raddr,		/* servers address */
168	const rpcprog_t prog,		/* program number */
169	const rpcvers_t vers,		/* version number */
170	size_t sendsz,			/* buffer recv size */
171	size_t recvsz,			/* buffer send size */
172	int intrflag)			/* interruptible */
173{
174	CLIENT *cl;			/* client handle */
175	struct ct_data *ct = NULL;	/* client handle */
176	struct timeval now;
177	struct rpc_msg call_msg;
178	static uint32_t disrupt;
179	struct __rpc_sockinfo si;
180	XDR xdrs;
181	int error, interrupted, one = 1, sleep_flag;
182	struct sockopt sopt;
183
184	if (disrupt == 0)
185		disrupt = (uint32_t)(long)raddr;
186
187	cl = (CLIENT *)mem_alloc(sizeof (*cl));
188	ct = (struct ct_data *)mem_alloc(sizeof (*ct));
189
190	mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF);
191	ct->ct_threads = 0;
192	ct->ct_closing = FALSE;
193	ct->ct_closed = FALSE;
194	ct->ct_upcallrefs = 0;
195
196	if ((so->so_state & (SS_ISCONNECTED|SS_ISCONFIRMING)) == 0) {
197		error = soconnect(so, raddr, curthread);
198		SOCK_LOCK(so);
199		interrupted = 0;
200		sleep_flag = PSOCK;
201		if (intrflag != 0)
202			sleep_flag |= (PCATCH | PBDRY);
203		while ((so->so_state & SS_ISCONNECTING)
204		    && so->so_error == 0) {
205			error = msleep(&so->so_timeo, SOCK_MTX(so),
206			    sleep_flag, "connec", 0);
207			if (error) {
208				if (error == EINTR || error == ERESTART)
209					interrupted = 1;
210				break;
211			}
212		}
213		if (error == 0) {
214			error = so->so_error;
215			so->so_error = 0;
216		}
217		SOCK_UNLOCK(so);
218		if (error) {
219			if (!interrupted)
220				so->so_state &= ~SS_ISCONNECTING;
221			rpc_createerr.cf_stat = RPC_SYSTEMERROR;
222			rpc_createerr.cf_error.re_errno = error;
223			goto err;
224		}
225	}
226
227	if (!__rpc_socket2sockinfo(so, &si)) {
228		goto err;
229	}
230
231	if (so->so_proto->pr_flags & PR_CONNREQUIRED) {
232		bzero(&sopt, sizeof(sopt));
233		sopt.sopt_dir = SOPT_SET;
234		sopt.sopt_level = SOL_SOCKET;
235		sopt.sopt_name = SO_KEEPALIVE;
236		sopt.sopt_val = &one;
237		sopt.sopt_valsize = sizeof(one);
238		sosetopt(so, &sopt);
239	}
240
241	if (so->so_proto->pr_protocol == IPPROTO_TCP) {
242		bzero(&sopt, sizeof(sopt));
243		sopt.sopt_dir = SOPT_SET;
244		sopt.sopt_level = IPPROTO_TCP;
245		sopt.sopt_name = TCP_NODELAY;
246		sopt.sopt_val = &one;
247		sopt.sopt_valsize = sizeof(one);
248		sosetopt(so, &sopt);
249	}
250
251	ct->ct_closeit = FALSE;
252
253	/*
254	 * Set up private data struct
255	 */
256	ct->ct_socket = so;
257	ct->ct_wait.tv_sec = -1;
258	ct->ct_wait.tv_usec = -1;
259	memcpy(&ct->ct_addr, raddr, raddr->sa_len);
260
261	/*
262	 * Initialize call message
263	 */
264	getmicrotime(&now);
265	ct->ct_xid = ((uint32_t)++disrupt) ^ __RPC_GETXID(&now);
266	call_msg.rm_xid = ct->ct_xid;
267	call_msg.rm_direction = CALL;
268	call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
269	call_msg.rm_call.cb_prog = (uint32_t)prog;
270	call_msg.rm_call.cb_vers = (uint32_t)vers;
271
272	/*
273	 * pre-serialize the static part of the call msg and stash it away
274	 */
275	xdrmem_create(&xdrs, ct->ct_mcallc, MCALL_MSG_SIZE,
276	    XDR_ENCODE);
277	if (! xdr_callhdr(&xdrs, &call_msg)) {
278		if (ct->ct_closeit) {
279			soclose(ct->ct_socket);
280		}
281		goto err;
282	}
283	ct->ct_mpos = XDR_GETPOS(&xdrs);
284	XDR_DESTROY(&xdrs);
285	ct->ct_waitchan = "rpcrecv";
286	ct->ct_waitflag = 0;
287
288	/*
289	 * Create a client handle which uses xdrrec for serialization
290	 * and authnone for authentication.
291	 */
292	sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
293	recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
294	error = soreserve(ct->ct_socket, sendsz, recvsz);
295	if (error != 0) {
296		if (ct->ct_closeit) {
297			soclose(ct->ct_socket);
298		}
299		goto err;
300	}
301	cl->cl_refs = 1;
302	cl->cl_ops = &clnt_vc_ops;
303	cl->cl_private = ct;
304	cl->cl_auth = authnone_create();
305
306	SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
307	soupcall_set(ct->ct_socket, SO_RCV, clnt_vc_soupcall, ct);
308	SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
309
310	ct->ct_record = NULL;
311	ct->ct_record_resid = 0;
312	TAILQ_INIT(&ct->ct_pending);
313	return (cl);
314
315err:
316	if (cl) {
317		if (ct) {
318			mtx_destroy(&ct->ct_lock);
319			mem_free(ct, sizeof (struct ct_data));
320		}
321		if (cl)
322			mem_free(cl, sizeof (CLIENT));
323	}
324	return ((CLIENT *)NULL);
325}
326
327static enum clnt_stat
328clnt_vc_call(
329	CLIENT		*cl,		/* client handle */
330	struct rpc_callextra *ext,	/* call metadata */
331	rpcproc_t	proc,		/* procedure number */
332	struct mbuf	*args,		/* pointer to args */
333	struct mbuf	**resultsp,	/* pointer to results */
334	struct timeval	utimeout)
335{
336	struct ct_data *ct = (struct ct_data *) cl->cl_private;
337	AUTH *auth;
338	struct rpc_err *errp;
339	enum clnt_stat stat;
340	XDR xdrs;
341	struct rpc_msg reply_msg;
342	bool_t ok;
343	int nrefreshes = 2;		/* number of times to refresh cred */
344	struct timeval timeout;
345	uint32_t xid;
346	struct mbuf *mreq = NULL, *results;
347	struct ct_request *cr;
348	int error;
349
350	cr = malloc(sizeof(struct ct_request), M_RPC, M_WAITOK);
351
352	mtx_lock(&ct->ct_lock);
353
354	if (ct->ct_closing || ct->ct_closed) {
355		mtx_unlock(&ct->ct_lock);
356		free(cr, M_RPC);
357		return (RPC_CANTSEND);
358	}
359	ct->ct_threads++;
360
361	if (ext) {
362		auth = ext->rc_auth;
363		errp = &ext->rc_err;
364	} else {
365		auth = cl->cl_auth;
366		errp = &ct->ct_error;
367	}
368
369	cr->cr_mrep = NULL;
370	cr->cr_error = 0;
371
372	if (ct->ct_wait.tv_usec == -1) {
373		timeout = utimeout;	/* use supplied timeout */
374	} else {
375		timeout = ct->ct_wait;	/* use default timeout */
376	}
377
378call_again:
379	mtx_assert(&ct->ct_lock, MA_OWNED);
380
381	ct->ct_xid++;
382	xid = ct->ct_xid;
383
384	mtx_unlock(&ct->ct_lock);
385
386	/*
387	 * Leave space to pre-pend the record mark.
388	 */
389	MGETHDR(mreq, M_WAIT, MT_DATA);
390	mreq->m_data += sizeof(uint32_t);
391	KASSERT(ct->ct_mpos + sizeof(uint32_t) <= MHLEN,
392	    ("RPC header too big"));
393	bcopy(ct->ct_mcallc, mreq->m_data, ct->ct_mpos);
394	mreq->m_len = ct->ct_mpos;
395
396	/*
397	 * The XID is the first thing in the request.
398	 */
399	*mtod(mreq, uint32_t *) = htonl(xid);
400
401	xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
402
403	errp->re_status = stat = RPC_SUCCESS;
404
405	if ((! XDR_PUTINT32(&xdrs, &proc)) ||
406	    (! AUTH_MARSHALL(auth, xid, &xdrs,
407		m_copym(args, 0, M_COPYALL, M_WAITOK)))) {
408		errp->re_status = stat = RPC_CANTENCODEARGS;
409		mtx_lock(&ct->ct_lock);
410		goto out;
411	}
412	mreq->m_pkthdr.len = m_length(mreq, NULL);
413
414	/*
415	 * Prepend a record marker containing the packet length.
416	 */
417	M_PREPEND(mreq, sizeof(uint32_t), M_WAIT);
418	*mtod(mreq, uint32_t *) =
419		htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t)));
420
421	cr->cr_xid = xid;
422	mtx_lock(&ct->ct_lock);
423	/*
424	 * Check to see if the other end has already started to close down
425	 * the connection. The upcall will have set ct_error.re_status
426	 * to RPC_CANTRECV if this is the case.
427	 * If the other end starts to close down the connection after this
428	 * point, it will be detected later when cr_error is checked,
429	 * since the request is in the ct_pending queue.
430	 */
431	if (ct->ct_error.re_status == RPC_CANTRECV) {
432		if (errp != &ct->ct_error) {
433			errp->re_errno = ct->ct_error.re_errno;
434			errp->re_status = RPC_CANTRECV;
435		}
436		stat = RPC_CANTRECV;
437		goto out;
438	}
439	TAILQ_INSERT_TAIL(&ct->ct_pending, cr, cr_link);
440	mtx_unlock(&ct->ct_lock);
441
442	/*
443	 * sosend consumes mreq.
444	 */
445	error = sosend(ct->ct_socket, NULL, NULL, mreq, NULL, 0, curthread);
446	mreq = NULL;
447	if (error == EMSGSIZE) {
448		SOCKBUF_LOCK(&ct->ct_socket->so_snd);
449		sbwait(&ct->ct_socket->so_snd);
450		SOCKBUF_UNLOCK(&ct->ct_socket->so_snd);
451		AUTH_VALIDATE(auth, xid, NULL, NULL);
452		mtx_lock(&ct->ct_lock);
453		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
454		goto call_again;
455	}
456
457	reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL;
458	reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf;
459	reply_msg.acpted_rply.ar_verf.oa_length = 0;
460	reply_msg.acpted_rply.ar_results.where = NULL;
461	reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
462
463	mtx_lock(&ct->ct_lock);
464	if (error) {
465		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
466		errp->re_errno = error;
467		errp->re_status = stat = RPC_CANTSEND;
468		goto out;
469	}
470
471	/*
472	 * Check to see if we got an upcall while waiting for the
473	 * lock. In both these cases, the request has been removed
474	 * from ct->ct_pending.
475	 */
476	if (cr->cr_error) {
477		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
478		errp->re_errno = cr->cr_error;
479		errp->re_status = stat = RPC_CANTRECV;
480		goto out;
481	}
482	if (cr->cr_mrep) {
483		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
484		goto got_reply;
485	}
486
487	/*
488	 * Hack to provide rpc-based message passing
489	 */
490	if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
491		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
492		errp->re_status = stat = RPC_TIMEDOUT;
493		goto out;
494	}
495
496	error = msleep(cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan,
497	    tvtohz(&timeout));
498
499	TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
500
501	if (error) {
502		/*
503		 * The sleep returned an error so our request is still
504		 * on the list. Turn the error code into an
505		 * appropriate client status.
506		 */
507		errp->re_errno = error;
508		switch (error) {
509		case EINTR:
510		case ERESTART:
511			stat = RPC_INTR;
512			break;
513		case EWOULDBLOCK:
514			stat = RPC_TIMEDOUT;
515			break;
516		default:
517			stat = RPC_CANTRECV;
518		}
519		errp->re_status = stat;
520		goto out;
521	} else {
522		/*
523		 * We were woken up by the upcall.  If the
524		 * upcall had a receive error, report that,
525		 * otherwise we have a reply.
526		 */
527		if (cr->cr_error) {
528			errp->re_errno = cr->cr_error;
529			errp->re_status = stat = RPC_CANTRECV;
530			goto out;
531		}
532	}
533
534got_reply:
535	/*
536	 * Now decode and validate the response. We need to drop the
537	 * lock since xdr_replymsg may end up sleeping in malloc.
538	 */
539	mtx_unlock(&ct->ct_lock);
540
541	if (ext && ext->rc_feedback)
542		ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);
543
544	xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
545	ok = xdr_replymsg(&xdrs, &reply_msg);
546	cr->cr_mrep = NULL;
547
548	if (ok) {
549		if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
550		    (reply_msg.acpted_rply.ar_stat == SUCCESS))
551			errp->re_status = stat = RPC_SUCCESS;
552		else
553			stat = _seterr_reply(&reply_msg, errp);
554
555		if (stat == RPC_SUCCESS) {
556			results = xdrmbuf_getall(&xdrs);
557			if (!AUTH_VALIDATE(auth, xid,
558				&reply_msg.acpted_rply.ar_verf,
559				&results)) {
560				errp->re_status = stat = RPC_AUTHERROR;
561				errp->re_why = AUTH_INVALIDRESP;
562			} else {
563				KASSERT(results,
564				    ("auth validated but no result"));
565				*resultsp = results;
566			}
567		}		/* end successful completion */
568		/*
569		 * If unsuccesful AND error is an authentication error
570		 * then refresh credentials and try again, else break
571		 */
572		else if (stat == RPC_AUTHERROR)
573			/* maybe our credentials need to be refreshed ... */
574			if (nrefreshes > 0 &&
575			    AUTH_REFRESH(auth, &reply_msg)) {
576				nrefreshes--;
577				XDR_DESTROY(&xdrs);
578				mtx_lock(&ct->ct_lock);
579				goto call_again;
580			}
581		/* end of unsuccessful completion */
582	}	/* end of valid reply message */
583	else {
584		errp->re_status = stat = RPC_CANTDECODERES;
585	}
586	XDR_DESTROY(&xdrs);
587	mtx_lock(&ct->ct_lock);
588out:
589	mtx_assert(&ct->ct_lock, MA_OWNED);
590
591	KASSERT(stat != RPC_SUCCESS || *resultsp,
592	    ("RPC_SUCCESS without reply"));
593
594	if (mreq)
595		m_freem(mreq);
596	if (cr->cr_mrep)
597		m_freem(cr->cr_mrep);
598
599	ct->ct_threads--;
600	if (ct->ct_closing)
601		wakeup(ct);
602
603	mtx_unlock(&ct->ct_lock);
604
605	if (auth && stat != RPC_SUCCESS)
606		AUTH_VALIDATE(auth, xid, NULL, NULL);
607
608	free(cr, M_RPC);
609
610	return (stat);
611}
612
613static void
614clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
615{
616	struct ct_data *ct = (struct ct_data *) cl->cl_private;
617
618	*errp = ct->ct_error;
619}
620
621static bool_t
622clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
623{
624	XDR xdrs;
625	bool_t dummy;
626
627	xdrs.x_op = XDR_FREE;
628	dummy = (*xdr_res)(&xdrs, res_ptr);
629
630	return (dummy);
631}
632
633/*ARGSUSED*/
634static void
635clnt_vc_abort(CLIENT *cl)
636{
637}
638
639static bool_t
640clnt_vc_control(CLIENT *cl, u_int request, void *info)
641{
642	struct ct_data *ct = (struct ct_data *)cl->cl_private;
643	void *infop = info;
644
645	mtx_lock(&ct->ct_lock);
646
647	switch (request) {
648	case CLSET_FD_CLOSE:
649		ct->ct_closeit = TRUE;
650		mtx_unlock(&ct->ct_lock);
651		return (TRUE);
652	case CLSET_FD_NCLOSE:
653		ct->ct_closeit = FALSE;
654		mtx_unlock(&ct->ct_lock);
655		return (TRUE);
656	default:
657		break;
658	}
659
660	/* for other requests which use info */
661	if (info == NULL) {
662		mtx_unlock(&ct->ct_lock);
663		return (FALSE);
664	}
665	switch (request) {
666	case CLSET_TIMEOUT:
667		if (time_not_ok((struct timeval *)info)) {
668			mtx_unlock(&ct->ct_lock);
669			return (FALSE);
670		}
671		ct->ct_wait = *(struct timeval *)infop;
672		break;
673	case CLGET_TIMEOUT:
674		*(struct timeval *)infop = ct->ct_wait;
675		break;
676	case CLGET_SERVER_ADDR:
677		(void) memcpy(info, &ct->ct_addr, (size_t)ct->ct_addr.ss_len);
678		break;
679	case CLGET_SVC_ADDR:
680		/*
681		 * Slightly different semantics to userland - we use
682		 * sockaddr instead of netbuf.
683		 */
684		memcpy(info, &ct->ct_addr, ct->ct_addr.ss_len);
685		break;
686	case CLSET_SVC_ADDR:		/* set to new address */
687		mtx_unlock(&ct->ct_lock);
688		return (FALSE);
689	case CLGET_XID:
690		*(uint32_t *)info = ct->ct_xid;
691		break;
692	case CLSET_XID:
693		/* This will set the xid of the NEXT call */
694		/* decrement by 1 as clnt_vc_call() increments once */
695		ct->ct_xid = *(uint32_t *)info - 1;
696		break;
697	case CLGET_VERS:
698		/*
699		 * This RELIES on the information that, in the call body,
700		 * the version number field is the fifth field from the
701		 * begining of the RPC header. MUST be changed if the
702		 * call_struct is changed
703		 */
704		*(uint32_t *)info =
705		    ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
706		    4 * BYTES_PER_XDR_UNIT));
707		break;
708
709	case CLSET_VERS:
710		*(uint32_t *)(void *)(ct->ct_mcallc +
711		    4 * BYTES_PER_XDR_UNIT) =
712		    htonl(*(uint32_t *)info);
713		break;
714
715	case CLGET_PROG:
716		/*
717		 * This RELIES on the information that, in the call body,
718		 * the program number field is the fourth field from the
719		 * begining of the RPC header. MUST be changed if the
720		 * call_struct is changed
721		 */
722		*(uint32_t *)info =
723		    ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
724		    3 * BYTES_PER_XDR_UNIT));
725		break;
726
727	case CLSET_PROG:
728		*(uint32_t *)(void *)(ct->ct_mcallc +
729		    3 * BYTES_PER_XDR_UNIT) =
730		    htonl(*(uint32_t *)info);
731		break;
732
733	case CLSET_WAITCHAN:
734		ct->ct_waitchan = (const char *)info;
735		break;
736
737	case CLGET_WAITCHAN:
738		*(const char **) info = ct->ct_waitchan;
739		break;
740
741	case CLSET_INTERRUPTIBLE:
742		if (*(int *) info)
743			ct->ct_waitflag = PCATCH | PBDRY;
744		else
745			ct->ct_waitflag = 0;
746		break;
747
748	case CLGET_INTERRUPTIBLE:
749		if (ct->ct_waitflag)
750			*(int *) info = TRUE;
751		else
752			*(int *) info = FALSE;
753		break;
754
755	default:
756		mtx_unlock(&ct->ct_lock);
757		return (FALSE);
758	}
759
760	mtx_unlock(&ct->ct_lock);
761	return (TRUE);
762}
763
764static void
765clnt_vc_close(CLIENT *cl)
766{
767	struct ct_data *ct = (struct ct_data *) cl->cl_private;
768	struct ct_request *cr;
769
770	mtx_lock(&ct->ct_lock);
771
772	if (ct->ct_closed) {
773		mtx_unlock(&ct->ct_lock);
774		return;
775	}
776
777	if (ct->ct_closing) {
778		while (ct->ct_closing)
779			msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
780		KASSERT(ct->ct_closed, ("client should be closed"));
781		mtx_unlock(&ct->ct_lock);
782		return;
783	}
784
785	if (ct->ct_socket) {
786		ct->ct_closing = TRUE;
787		mtx_unlock(&ct->ct_lock);
788
789		SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
790		soupcall_clear(ct->ct_socket, SO_RCV);
791		clnt_vc_upcallsdone(ct);
792		SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
793
794		/*
795		 * Abort any pending requests and wait until everyone
796		 * has finished with clnt_vc_call.
797		 */
798		mtx_lock(&ct->ct_lock);
799		TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
800			cr->cr_xid = 0;
801			cr->cr_error = ESHUTDOWN;
802			wakeup(cr);
803		}
804
805		while (ct->ct_threads)
806			msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
807	}
808
809	ct->ct_closing = FALSE;
810	ct->ct_closed = TRUE;
811	mtx_unlock(&ct->ct_lock);
812	wakeup(ct);
813}
814
815static void
816clnt_vc_destroy(CLIENT *cl)
817{
818	struct ct_data *ct = (struct ct_data *) cl->cl_private;
819	struct socket *so = NULL;
820
821	clnt_vc_close(cl);
822
823	mtx_lock(&ct->ct_lock);
824
825	if (ct->ct_socket) {
826		if (ct->ct_closeit) {
827			so = ct->ct_socket;
828		}
829	}
830
831	mtx_unlock(&ct->ct_lock);
832
833	mtx_destroy(&ct->ct_lock);
834	if (so) {
835		soshutdown(so, SHUT_WR);
836		soclose(so);
837	}
838	mem_free(ct, sizeof(struct ct_data));
839	mem_free(cl, sizeof(CLIENT));
840}
841
842/*
843 * Make sure that the time is not garbage.   -1 value is disallowed.
844 * Note this is different from time_not_ok in clnt_dg.c
845 */
846static bool_t
847time_not_ok(struct timeval *t)
848{
849	return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
850		t->tv_usec <= -1 || t->tv_usec > 1000000);
851}
852
853int
854clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
855{
856	struct ct_data *ct = (struct ct_data *) arg;
857	struct uio uio;
858	struct mbuf *m;
859	struct ct_request *cr;
860	int error, rcvflag, foundreq;
861	uint32_t xid, header;
862	bool_t do_read;
863
864	ct->ct_upcallrefs++;
865	uio.uio_td = curthread;
866	do {
867		/*
868		 * If ct_record_resid is zero, we are waiting for a
869		 * record mark.
870		 */
871		if (ct->ct_record_resid == 0) {
872
873			/*
874			 * Make sure there is either a whole record
875			 * mark in the buffer or there is some other
876			 * error condition
877			 */
878			do_read = FALSE;
879			if (so->so_rcv.sb_cc >= sizeof(uint32_t)
880			    || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
881			    || so->so_error)
882				do_read = TRUE;
883
884			if (!do_read)
885				break;
886
887			SOCKBUF_UNLOCK(&so->so_rcv);
888			uio.uio_resid = sizeof(uint32_t);
889			m = NULL;
890			rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
891			error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
892			SOCKBUF_LOCK(&so->so_rcv);
893
894			if (error == EWOULDBLOCK)
895				break;
896
897			/*
898			 * If there was an error, wake up all pending
899			 * requests.
900			 */
901			if (error || uio.uio_resid > 0) {
902			wakeup_all:
903				mtx_lock(&ct->ct_lock);
904				if (!error) {
905					/*
906					 * We must have got EOF trying
907					 * to read from the stream.
908					 */
909					error = ECONNRESET;
910				}
911				ct->ct_error.re_status = RPC_CANTRECV;
912				ct->ct_error.re_errno = error;
913				TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
914					cr->cr_error = error;
915					wakeup(cr);
916				}
917				mtx_unlock(&ct->ct_lock);
918				break;
919			}
920			m_copydata(m, 0, sizeof(uint32_t), (char *)&header);
921			header = ntohl(header);
922			ct->ct_record = NULL;
923			ct->ct_record_resid = header & 0x7fffffff;
924			ct->ct_record_eor = ((header & 0x80000000) != 0);
925			m_freem(m);
926		} else {
927			/*
928			 * Wait until the socket has the whole record
929			 * buffered.
930			 */
931			do_read = FALSE;
932			if (so->so_rcv.sb_cc >= ct->ct_record_resid
933			    || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
934			    || so->so_error)
935				do_read = TRUE;
936
937			if (!do_read)
938				break;
939
940			/*
941			 * We have the record mark. Read as much as
942			 * the socket has buffered up to the end of
943			 * this record.
944			 */
945			SOCKBUF_UNLOCK(&so->so_rcv);
946			uio.uio_resid = ct->ct_record_resid;
947			m = NULL;
948			rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
949			error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
950			SOCKBUF_LOCK(&so->so_rcv);
951
952			if (error == EWOULDBLOCK)
953				break;
954
955			if (error || uio.uio_resid == ct->ct_record_resid)
956				goto wakeup_all;
957
958			/*
959			 * If we have part of the record already,
960			 * chain this bit onto the end.
961			 */
962			if (ct->ct_record)
963				m_last(ct->ct_record)->m_next = m;
964			else
965				ct->ct_record = m;
966
967			ct->ct_record_resid = uio.uio_resid;
968
969			/*
970			 * If we have the entire record, see if we can
971			 * match it to a request.
972			 */
973			if (ct->ct_record_resid == 0
974			    && ct->ct_record_eor) {
975				/*
976				 * The XID is in the first uint32_t of
977				 * the reply.
978				 */
979				if (ct->ct_record->m_len < sizeof(xid) &&
980				    m_length(ct->ct_record, NULL) <
981				    sizeof(xid)) {
982					m_freem(ct->ct_record);
983					break;
984				}
985				m_copydata(ct->ct_record, 0, sizeof(xid),
986				    (char *)&xid);
987				xid = ntohl(xid);
988
989				mtx_lock(&ct->ct_lock);
990				foundreq = 0;
991				TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
992					if (cr->cr_xid == xid) {
993						/*
994						 * This one
995						 * matches. We leave
996						 * the reply mbuf in
997						 * cr->cr_mrep. Set
998						 * the XID to zero so
999						 * that we will ignore
1000						 * any duplicaed
1001						 * replies.
1002						 */
1003						cr->cr_xid = 0;
1004						cr->cr_mrep = ct->ct_record;
1005						cr->cr_error = 0;
1006						foundreq = 1;
1007						wakeup(cr);
1008						break;
1009					}
1010				}
1011				mtx_unlock(&ct->ct_lock);
1012
1013				if (!foundreq)
1014					m_freem(ct->ct_record);
1015				ct->ct_record = NULL;
1016			}
1017		}
1018	} while (m);
1019	ct->ct_upcallrefs--;
1020	if (ct->ct_upcallrefs < 0)
1021		panic("rpcvc upcall refcnt");
1022	if (ct->ct_upcallrefs == 0)
1023		wakeup(&ct->ct_upcallrefs);
1024	return (SU_OK);
1025}
1026
1027/*
1028 * Wait for all upcalls in progress to complete.
1029 */
1030static void
1031clnt_vc_upcallsdone(struct ct_data *ct)
1032{
1033
1034	SOCKBUF_LOCK_ASSERT(&ct->ct_socket->so_rcv);
1035
1036	while (ct->ct_upcallrefs > 0)
1037		(void) msleep(&ct->ct_upcallrefs,
1038		    SOCKBUF_MTX(&ct->ct_socket->so_rcv), 0, "rpcvcup", 0);
1039}
1040