clnt_vc.c revision 180025
155714Skris/*	$NetBSD: clnt_vc.c,v 1.4 2000/07/14 08:40:42 fvdl Exp $	*/
255714Skris
355714Skris/*
455714Skris * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
555714Skris * unrestricted use provided that this legend is included on all tape
655714Skris * media and as a part of the software program in whole or part.  Users
755714Skris * may copy or modify Sun RPC without charge, but are not authorized
855714Skris * to license or distribute it to anyone else except as part of a product or
955714Skris * program developed by the user.
1055714Skris *
1155714Skris * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
1255714Skris * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
1355714Skris * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
1455714Skris *
1555714Skris * Sun RPC is provided with no support and without any obligation on the
1655714Skris * part of Sun Microsystems, Inc. to assist in its use, correction,
1755714Skris * modification or enhancement.
1855714Skris *
1955714Skris * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
2055714Skris * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
2155714Skris * OR ANY PART THEREOF.
2255714Skris *
2355714Skris * In no event will Sun Microsystems, Inc. be liable for any lost revenue
2455714Skris * or profits or other special, indirect and consequential damages, even if
2555714Skris * Sun has been advised of the possibility of such damages.
2655714Skris *
2755714Skris * Sun Microsystems, Inc.
2855714Skris * 2550 Garcia Avenue
2955714Skris * Mountain View, California  94043
3055714Skris */
3155714Skris
3255714Skris#if defined(LIBC_SCCS) && !defined(lint)
3355714Skrisstatic char *sccsid2 = "@(#)clnt_tcp.c 1.37 87/10/05 Copyr 1984 Sun Micro";
3455714Skrisstatic char *sccsid = "@(#)clnt_tcp.c	2.2 88/08/01 4.0 RPCSRC";
3555714Skrisstatic char sccsid3[] = "@(#)clnt_vc.c 1.19 89/03/16 Copyr 1988 Sun Micro";
3655714Skris#endif
3755714Skris#include <sys/cdefs.h>
3855714Skris__FBSDID("$FreeBSD: head/sys/rpc/clnt_vc.c 180025 2008-06-26 10:21:54Z dfr $");
3955714Skris
4055714Skris/*
4155714Skris * clnt_tcp.c, Implements a TCP/IP based, client side RPC.
4255714Skris *
4355714Skris * Copyright (C) 1984, Sun Microsystems, Inc.
4455714Skris *
4555714Skris * TCP based RPC supports 'batched calls'.
4655714Skris * A sequence of calls may be batched-up in a send buffer.  The rpc call
4755714Skris * return immediately to the client even though the call was not necessarily
4855714Skris * sent.  The batching occurs if the results' xdr routine is NULL (0) AND
4955714Skris * the rpc timeout value is zero (see clnt.h, rpc).
5055714Skris *
5155714Skris * Clients should NOT casually batch calls that in fact return results; that is,
5255714Skris * the server side should be aware that a call is batched and not produce any
5355714Skris * return message.  Batched calls that produce many result messages can
5455714Skris * deadlock (netlock) the client and the server....
5555714Skris *
5655714Skris * Now go hang yourself.
5755714Skris */
5855714Skris
5955714Skris#include <sys/param.h>
6055714Skris#include <sys/systm.h>
6155714Skris#include <sys/lock.h>
6255714Skris#include <sys/malloc.h>
6355714Skris#include <sys/mbuf.h>
6455714Skris#include <sys/mutex.h>
65111147Snectar#include <sys/pcpu.h>
6655714Skris#include <sys/proc.h>
6755714Skris#include <sys/socket.h>
6855714Skris#include <sys/socketvar.h>
6955714Skris#include <sys/syslog.h>
7055714Skris#include <sys/time.h>
7155714Skris#include <sys/uio.h>
7255714Skris
7355714Skris#include <rpc/rpc.h>
7455714Skris#include <rpc/rpc_com.h>
7555714Skris
7655714Skris#define MCALL_MSG_SIZE 24
7755714Skris
7855714Skrisstruct cmessage {
7955714Skris        struct cmsghdr cmsg;
8055714Skris        struct cmsgcred cmcred;
8155714Skris};
8255714Skris
8355714Skrisstatic enum clnt_stat clnt_vc_call(CLIENT *, struct rpc_callextra *,
8455714Skris    rpcproc_t, xdrproc_t, void *, xdrproc_t, void *, struct timeval);
8555714Skrisstatic void clnt_vc_geterr(CLIENT *, struct rpc_err *);
8655714Skrisstatic bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *);
8755714Skrisstatic void clnt_vc_abort(CLIENT *);
8855714Skrisstatic bool_t clnt_vc_control(CLIENT *, u_int, void *);
8955714Skrisstatic void clnt_vc_destroy(CLIENT *);
9055714Skrisstatic bool_t time_not_ok(struct timeval *);
9155714Skrisstatic void clnt_vc_soupcall(struct socket *so, void *arg, int waitflag);
9255714Skris
9355714Skrisstatic struct clnt_ops clnt_vc_ops = {
9455714Skris	.cl_call =	clnt_vc_call,
9555714Skris	.cl_abort =	clnt_vc_abort,
9655714Skris	.cl_geterr =	clnt_vc_geterr,
9755714Skris	.cl_freeres =	clnt_vc_freeres,
9855714Skris	.cl_destroy =	clnt_vc_destroy,
9955714Skris	.cl_control =	clnt_vc_control
10055714Skris};
10155714Skris
10255714Skris/*
10355714Skris * A pending RPC request which awaits a reply. Requests which have
10455714Skris * received their reply will have cr_xid set to zero and cr_mrep to
10555714Skris * the mbuf chain of the reply.
10655714Skris */
10755714Skrisstruct ct_request {
10855714Skris	TAILQ_ENTRY(ct_request) cr_link;
10955714Skris	uint32_t		cr_xid;		/* XID of request */
11055714Skris	struct mbuf		*cr_mrep;	/* reply received by upcall */
11155714Skris	int			cr_error;	/* any error from upcall */
11255714Skris};
11355714Skris
11455714SkrisTAILQ_HEAD(ct_request_list, ct_request);
11555714Skris
11655714Skrisstruct ct_data {
11755714Skris	struct mtx	ct_lock;
11855714Skris	int		ct_threads;	/* number of threads in clnt_vc_call */
11955714Skris	bool_t		ct_closing;	/* TRUE if we are destroying client */
120111147Snectar	struct socket	*ct_socket;	/* connection socket */
12155714Skris	bool_t		ct_closeit;	/* close it on destroy */
12255714Skris	struct timeval	ct_wait;	/* wait interval in milliseconds */
12355714Skris	struct sockaddr_storage	ct_addr; /* remote addr */
12455714Skris	struct rpc_err	ct_error;
12555714Skris	uint32_t	ct_xid;
12655714Skris	char		ct_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
12755714Skris	size_t		ct_mpos;	/* pos after marshal */
12855714Skris	const char	*ct_waitchan;
12955714Skris	int		ct_waitflag;
13055714Skris	struct mbuf	*ct_record;	/* current reply record */
13155714Skris	size_t		ct_record_resid; /* how much left of reply to read */
13255714Skris	bool_t		ct_record_eor;	 /* true if reading last fragment */
13355714Skris	struct ct_request_list ct_pending;
13455714Skris};
135
136static const char clnt_vc_errstr[] = "%s : %s";
137static const char clnt_vc_str[] = "clnt_vc_create";
138static const char clnt_read_vc_str[] = "read_vc";
139static const char __no_mem_str[] = "out of memory";
140
141/*
142 * Create a client handle for a connection.
143 * Default options are set, which the user can change using clnt_control()'s.
144 * The rpc/vc package does buffering similar to stdio, so the client
145 * must pick send and receive buffer sizes, 0 => use the default.
146 * NB: fd is copied into a private area.
147 * NB: The rpch->cl_auth is set null authentication. Caller may wish to
148 * set this something more useful.
149 *
150 * fd should be an open socket
151 */
152CLIENT *
153clnt_vc_create(
154	struct socket *so,		/* open file descriptor */
155	struct sockaddr *raddr,		/* servers address */
156	const rpcprog_t prog,		/* program number */
157	const rpcvers_t vers,		/* version number */
158	size_t sendsz,			/* buffer recv size */
159	size_t recvsz)			/* buffer send size */
160{
161	CLIENT *cl;			/* client handle */
162	struct ct_data *ct = NULL;	/* client handle */
163	struct timeval now;
164	struct rpc_msg call_msg;
165	static uint32_t disrupt;
166	struct __rpc_sockinfo si;
167	XDR xdrs;
168	int error, interrupted;
169
170	if (disrupt == 0)
171		disrupt = (uint32_t)(long)raddr;
172
173	cl = (CLIENT *)mem_alloc(sizeof (*cl));
174	ct = (struct ct_data *)mem_alloc(sizeof (*ct));
175
176	mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF);
177	ct->ct_threads = 0;
178	ct->ct_closing = FALSE;
179
180	if ((so->so_state & (SS_ISCONNECTED|SS_ISCONFIRMING)) == 0) {
181		error = soconnect(so, raddr, curthread);
182		SOCK_LOCK(so);
183		interrupted = 0;
184		while ((so->so_state & SS_ISCONNECTING)
185		    && so->so_error == 0) {
186			error = msleep(&so->so_timeo, SOCK_MTX(so),
187			    PSOCK | PCATCH, "connec", 0);
188			if (error) {
189				if (error == EINTR || error == ERESTART)
190					interrupted = 1;
191				break;
192			}
193		}
194		if (error == 0) {
195			error = so->so_error;
196			so->so_error = 0;
197		}
198		SOCK_UNLOCK(so);
199		if (error) {
200			if (!interrupted)
201				so->so_state &= ~SS_ISCONNECTING;
202			rpc_createerr.cf_stat = RPC_SYSTEMERROR;
203			rpc_createerr.cf_error.re_errno = error;
204			goto err;
205		}
206	}
207
208	if (!__rpc_socket2sockinfo(so, &si))
209		goto err;
210
211	ct->ct_closeit = FALSE;
212
213	/*
214	 * Set up private data struct
215	 */
216	ct->ct_socket = so;
217	ct->ct_wait.tv_sec = -1;
218	ct->ct_wait.tv_usec = -1;
219	memcpy(&ct->ct_addr, raddr, raddr->sa_len);
220
221	/*
222	 * Initialize call message
223	 */
224	getmicrotime(&now);
225	ct->ct_xid = ((uint32_t)++disrupt) ^ __RPC_GETXID(&now);
226	call_msg.rm_xid = ct->ct_xid;
227	call_msg.rm_direction = CALL;
228	call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
229	call_msg.rm_call.cb_prog = (uint32_t)prog;
230	call_msg.rm_call.cb_vers = (uint32_t)vers;
231
232	/*
233	 * pre-serialize the static part of the call msg and stash it away
234	 */
235	xdrmem_create(&xdrs, ct->ct_mcallc, MCALL_MSG_SIZE,
236	    XDR_ENCODE);
237	if (! xdr_callhdr(&xdrs, &call_msg)) {
238		if (ct->ct_closeit) {
239			soclose(ct->ct_socket);
240		}
241		goto err;
242	}
243	ct->ct_mpos = XDR_GETPOS(&xdrs);
244	XDR_DESTROY(&xdrs);
245	ct->ct_waitchan = "rpcrecv";
246	ct->ct_waitflag = 0;
247
248	/*
249	 * Create a client handle which uses xdrrec for serialization
250	 * and authnone for authentication.
251	 */
252	cl->cl_refs = 1;
253	cl->cl_ops = &clnt_vc_ops;
254	cl->cl_private = ct;
255	cl->cl_auth = authnone_create();
256	sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
257	recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
258
259	SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
260	ct->ct_socket->so_upcallarg = ct;
261	ct->ct_socket->so_upcall = clnt_vc_soupcall;
262	ct->ct_socket->so_rcv.sb_flags |= SB_UPCALL;
263	SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
264
265	ct->ct_record = NULL;
266	ct->ct_record_resid = 0;
267	TAILQ_INIT(&ct->ct_pending);
268	return (cl);
269
270err:
271	if (cl) {
272		if (ct) {
273			mem_free(ct, sizeof (struct ct_data));
274		}
275		if (cl)
276			mem_free(cl, sizeof (CLIENT));
277	}
278	return ((CLIENT *)NULL);
279}
280
281static enum clnt_stat
282clnt_vc_call(
283	CLIENT *cl,
284	struct rpc_callextra *ext,
285	rpcproc_t proc,
286	xdrproc_t xdr_args,
287	void *args_ptr,
288	xdrproc_t xdr_results,
289	void *results_ptr,
290	struct timeval utimeout)
291{
292	struct ct_data *ct = (struct ct_data *) cl->cl_private;
293	AUTH *auth;
294	XDR xdrs;
295	struct rpc_msg reply_msg;
296	bool_t ok;
297	int nrefreshes = 2;		/* number of times to refresh cred */
298	struct timeval timeout;
299	uint32_t xid;
300	struct mbuf *mreq = NULL;
301	struct ct_request *cr;
302	int error;
303
304	cr = malloc(sizeof(struct ct_request), M_RPC, M_WAITOK);
305
306	mtx_lock(&ct->ct_lock);
307
308	if (ct->ct_closing) {
309		mtx_unlock(&ct->ct_lock);
310		free(cr, M_RPC);
311		return (RPC_CANTSEND);
312	}
313	ct->ct_threads++;
314
315	if (ext)
316		auth = ext->rc_auth;
317	else
318		auth = cl->cl_auth;
319
320	cr->cr_mrep = NULL;
321	cr->cr_error = 0;
322
323	if (ct->ct_wait.tv_usec == -1) {
324		timeout = utimeout;	/* use supplied timeout */
325	} else {
326		timeout = ct->ct_wait;	/* use default timeout */
327	}
328
329call_again:
330	mtx_assert(&ct->ct_lock, MA_OWNED);
331
332	ct->ct_xid++;
333	xid = ct->ct_xid;
334
335	mtx_unlock(&ct->ct_lock);
336
337	/*
338	 * Leave space to pre-pend the record mark.
339	 */
340	MGETHDR(mreq, M_WAIT, MT_DATA);
341	MCLGET(mreq, M_WAIT);
342	mreq->m_len = 0;
343	mreq->m_data += sizeof(uint32_t);
344	m_append(mreq, ct->ct_mpos, ct->ct_mcallc);
345
346	/*
347	 * The XID is the first thing in the request.
348	 */
349	*mtod(mreq, uint32_t *) = htonl(xid);
350
351	xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
352
353	ct->ct_error.re_status = RPC_SUCCESS;
354
355	if ((! XDR_PUTINT32(&xdrs, &proc)) ||
356	    (! AUTH_MARSHALL(auth, &xdrs)) ||
357	    (! (*xdr_args)(&xdrs, args_ptr))) {
358		if (ct->ct_error.re_status == RPC_SUCCESS)
359			ct->ct_error.re_status = RPC_CANTENCODEARGS;
360		mtx_lock(&ct->ct_lock);
361		goto out;
362	}
363	m_fixhdr(mreq);
364
365	/*
366	 * Prepend a record marker containing the packet length.
367	 */
368	M_PREPEND(mreq, sizeof(uint32_t), M_WAIT);
369	*mtod(mreq, uint32_t *) =
370		htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t)));
371
372	cr->cr_xid = xid;
373	mtx_lock(&ct->ct_lock);
374	TAILQ_INSERT_TAIL(&ct->ct_pending, cr, cr_link);
375	mtx_unlock(&ct->ct_lock);
376
377	/*
378	 * sosend consumes mreq.
379	 */
380	error = sosend(ct->ct_socket, NULL, NULL, mreq, NULL, 0, curthread);
381	mreq = NULL;
382
383	reply_msg.acpted_rply.ar_verf = _null_auth;
384	reply_msg.acpted_rply.ar_results.where = results_ptr;
385	reply_msg.acpted_rply.ar_results.proc = xdr_results;
386
387	mtx_lock(&ct->ct_lock);
388	if (error) {
389		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
390		ct->ct_error.re_errno = error;
391		ct->ct_error.re_status = RPC_CANTSEND;
392		goto out;
393	}
394
395	/*
396	 * Check to see if we got an upcall while waiting for the
397	 * lock. In both these cases, the request has been removed
398	 * from ct->ct_pending.
399	 */
400	if (cr->cr_error) {
401		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
402		ct->ct_error.re_errno = cr->cr_error;
403		ct->ct_error.re_status = RPC_CANTRECV;
404		goto out;
405	}
406	if (cr->cr_mrep) {
407		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
408		goto got_reply;
409	}
410
411	/*
412	 * Hack to provide rpc-based message passing
413	 */
414	if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
415		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
416		ct->ct_error.re_status = RPC_TIMEDOUT;
417		goto out;
418	}
419
420	error = msleep(cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan,
421	    tvtohz(&timeout));
422
423	TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
424
425	if (error) {
426		/*
427		 * The sleep returned an error so our request is still
428		 * on the list. Turn the error code into an
429		 * appropriate client status.
430		 */
431		ct->ct_error.re_errno = error;
432		switch (error) {
433		case EINTR:
434			ct->ct_error.re_status = RPC_INTR;
435			break;
436		case EWOULDBLOCK:
437			ct->ct_error.re_status = RPC_TIMEDOUT;
438			break;
439		default:
440			ct->ct_error.re_status = RPC_CANTRECV;
441		}
442		goto out;
443	} else {
444		/*
445		 * We were woken up by the upcall.  If the
446		 * upcall had a receive error, report that,
447		 * otherwise we have a reply.
448		 */
449		if (cr->cr_error) {
450			ct->ct_error.re_errno = cr->cr_error;
451			ct->ct_error.re_status = RPC_CANTRECV;
452			goto out;
453		}
454	}
455
456got_reply:
457	/*
458	 * Now decode and validate the response. We need to drop the
459	 * lock since xdr_replymsg may end up sleeping in malloc.
460	 */
461	mtx_unlock(&ct->ct_lock);
462
463	xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
464	ok = xdr_replymsg(&xdrs, &reply_msg);
465	XDR_DESTROY(&xdrs);
466	cr->cr_mrep = NULL;
467
468	mtx_lock(&ct->ct_lock);
469
470	if (ok) {
471		if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
472			(reply_msg.acpted_rply.ar_stat == SUCCESS))
473			ct->ct_error.re_status = RPC_SUCCESS;
474		else
475			_seterr_reply(&reply_msg, &(ct->ct_error));
476
477		if (ct->ct_error.re_status == RPC_SUCCESS) {
478			if (! AUTH_VALIDATE(cl->cl_auth,
479					    &reply_msg.acpted_rply.ar_verf)) {
480				ct->ct_error.re_status = RPC_AUTHERROR;
481				ct->ct_error.re_why = AUTH_INVALIDRESP;
482			}
483			if (reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
484				xdrs.x_op = XDR_FREE;
485				(void) xdr_opaque_auth(&xdrs,
486					&(reply_msg.acpted_rply.ar_verf));
487			}
488		}		/* end successful completion */
489		/*
490		 * If unsuccesful AND error is an authentication error
491		 * then refresh credentials and try again, else break
492		 */
493		else if (ct->ct_error.re_status == RPC_AUTHERROR)
494			/* maybe our credentials need to be refreshed ... */
495			if (nrefreshes > 0 &&
496			    AUTH_REFRESH(cl->cl_auth, &reply_msg)) {
497				nrefreshes--;
498				goto call_again;
499			}
500		/* end of unsuccessful completion */
501	}	/* end of valid reply message */
502	else {
503		ct->ct_error.re_status = RPC_CANTDECODERES;
504	}
505out:
506	mtx_assert(&ct->ct_lock, MA_OWNED);
507
508	if (mreq)
509		m_freem(mreq);
510	if (cr->cr_mrep)
511		m_freem(cr->cr_mrep);
512
513	ct->ct_threads--;
514	if (ct->ct_closing)
515		wakeup(ct);
516
517	mtx_unlock(&ct->ct_lock);
518
519	free(cr, M_RPC);
520
521	return (ct->ct_error.re_status);
522}
523
524static void
525clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
526{
527	struct ct_data *ct = (struct ct_data *) cl->cl_private;
528
529	*errp = ct->ct_error;
530}
531
532static bool_t
533clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
534{
535	XDR xdrs;
536	bool_t dummy;
537
538	xdrs.x_op = XDR_FREE;
539	dummy = (*xdr_res)(&xdrs, res_ptr);
540
541	return (dummy);
542}
543
544/*ARGSUSED*/
545static void
546clnt_vc_abort(CLIENT *cl)
547{
548}
549
550static bool_t
551clnt_vc_control(CLIENT *cl, u_int request, void *info)
552{
553	struct ct_data *ct = (struct ct_data *)cl->cl_private;
554	void *infop = info;
555
556	mtx_lock(&ct->ct_lock);
557
558	switch (request) {
559	case CLSET_FD_CLOSE:
560		ct->ct_closeit = TRUE;
561		mtx_unlock(&ct->ct_lock);
562		return (TRUE);
563	case CLSET_FD_NCLOSE:
564		ct->ct_closeit = FALSE;
565		mtx_unlock(&ct->ct_lock);
566		return (TRUE);
567	default:
568		break;
569	}
570
571	/* for other requests which use info */
572	if (info == NULL) {
573		mtx_unlock(&ct->ct_lock);
574		return (FALSE);
575	}
576	switch (request) {
577	case CLSET_TIMEOUT:
578		if (time_not_ok((struct timeval *)info)) {
579			mtx_unlock(&ct->ct_lock);
580			return (FALSE);
581		}
582		ct->ct_wait = *(struct timeval *)infop;
583		break;
584	case CLGET_TIMEOUT:
585		*(struct timeval *)infop = ct->ct_wait;
586		break;
587	case CLGET_SERVER_ADDR:
588		(void) memcpy(info, &ct->ct_addr, (size_t)ct->ct_addr.ss_len);
589		break;
590	case CLGET_SVC_ADDR:
591		/*
592		 * Slightly different semantics to userland - we use
593		 * sockaddr instead of netbuf.
594		 */
595		memcpy(info, &ct->ct_addr, ct->ct_addr.ss_len);
596		break;
597	case CLSET_SVC_ADDR:		/* set to new address */
598		mtx_unlock(&ct->ct_lock);
599		return (FALSE);
600	case CLGET_XID:
601		*(uint32_t *)info = ct->ct_xid;
602		break;
603	case CLSET_XID:
604		/* This will set the xid of the NEXT call */
605		/* decrement by 1 as clnt_vc_call() increments once */
606		ct->ct_xid = *(uint32_t *)info - 1;
607		break;
608	case CLGET_VERS:
609		/*
610		 * This RELIES on the information that, in the call body,
611		 * the version number field is the fifth field from the
612		 * begining of the RPC header. MUST be changed if the
613		 * call_struct is changed
614		 */
615		*(uint32_t *)info =
616		    ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
617		    4 * BYTES_PER_XDR_UNIT));
618		break;
619
620	case CLSET_VERS:
621		*(uint32_t *)(void *)(ct->ct_mcallc +
622		    4 * BYTES_PER_XDR_UNIT) =
623		    htonl(*(uint32_t *)info);
624		break;
625
626	case CLGET_PROG:
627		/*
628		 * This RELIES on the information that, in the call body,
629		 * the program number field is the fourth field from the
630		 * begining of the RPC header. MUST be changed if the
631		 * call_struct is changed
632		 */
633		*(uint32_t *)info =
634		    ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
635		    3 * BYTES_PER_XDR_UNIT));
636		break;
637
638	case CLSET_PROG:
639		*(uint32_t *)(void *)(ct->ct_mcallc +
640		    3 * BYTES_PER_XDR_UNIT) =
641		    htonl(*(uint32_t *)info);
642		break;
643
644	case CLSET_WAITCHAN:
645		ct->ct_waitchan = *(const char **)info;
646		break;
647
648	case CLGET_WAITCHAN:
649		*(const char **) info = ct->ct_waitchan;
650		break;
651
652	case CLSET_INTERRUPTIBLE:
653		if (*(int *) info)
654			ct->ct_waitflag = PCATCH;
655		else
656			ct->ct_waitflag = 0;
657		break;
658
659	case CLGET_INTERRUPTIBLE:
660		if (ct->ct_waitflag)
661			*(int *) info = TRUE;
662		else
663			*(int *) info = FALSE;
664		break;
665
666	default:
667		mtx_unlock(&ct->ct_lock);
668		return (FALSE);
669	}
670
671	mtx_unlock(&ct->ct_lock);
672	return (TRUE);
673}
674
675static void
676clnt_vc_destroy(CLIENT *cl)
677{
678	struct ct_data *ct = (struct ct_data *) cl->cl_private;
679	struct ct_request *cr;
680	struct socket *so = NULL;
681
682	mtx_lock(&ct->ct_lock);
683
684	if (ct->ct_socket) {
685		SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
686		ct->ct_socket->so_upcallarg = NULL;
687		ct->ct_socket->so_upcall = NULL;
688		ct->ct_socket->so_rcv.sb_flags &= ~SB_UPCALL;
689		SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
690
691		/*
692		 * Abort any pending requests and wait until everyone
693		 * has finished with clnt_vc_call.
694		 */
695		ct->ct_closing = TRUE;
696		TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
697			cr->cr_xid = 0;
698			cr->cr_error = ESHUTDOWN;
699			wakeup(cr);
700		}
701
702		while (ct->ct_threads)
703			msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
704
705		if (ct->ct_closeit) {
706			so = ct->ct_socket;
707		}
708	}
709
710	mtx_unlock(&ct->ct_lock);
711
712	mtx_destroy(&ct->ct_lock);
713	if (so) {
714		soshutdown(so, SHUT_WR);
715		soclose(so);
716	}
717	mem_free(ct, sizeof(struct ct_data));
718	mem_free(cl, sizeof(CLIENT));
719}
720
721/*
722 * Make sure that the time is not garbage.   -1 value is disallowed.
723 * Note this is different from time_not_ok in clnt_dg.c
724 */
725static bool_t
726time_not_ok(struct timeval *t)
727{
728	return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
729		t->tv_usec <= -1 || t->tv_usec > 1000000);
730}
731
732void
733clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
734{
735	struct ct_data *ct = (struct ct_data *) arg;
736	struct uio uio;
737	struct mbuf *m;
738	struct ct_request *cr;
739	int error, rcvflag, foundreq;
740	uint32_t xid, header;
741
742	uio.uio_td = curthread;
743	do {
744		/*
745		 * If ct_record_resid is zero, we are waiting for a
746		 * record mark.
747		 */
748		if (ct->ct_record_resid == 0) {
749			bool_t do_read;
750
751			/*
752			 * Make sure there is either a whole record
753			 * mark in the buffer or there is some other
754			 * error condition
755			 */
756			do_read = FALSE;
757			SOCKBUF_LOCK(&so->so_rcv);
758			if (so->so_rcv.sb_cc >= sizeof(uint32_t)
759			    || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
760			    || so->so_error)
761				do_read = TRUE;
762			SOCKBUF_UNLOCK(&so->so_rcv);
763
764			if (!do_read)
765				return;
766
767			uio.uio_resid = sizeof(uint32_t);
768			m = NULL;
769			rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
770			error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
771
772			if (error == EWOULDBLOCK)
773				break;
774
775			/*
776			 * If there was an error, wake up all pending
777			 * requests.
778			 */
779			if (error || uio.uio_resid > 0) {
780			wakeup_all:
781				mtx_lock(&ct->ct_lock);
782				if (!error) {
783					/*
784					 * We must have got EOF trying
785					 * to read from the stream.
786					 */
787					error = ECONNRESET;
788				}
789				ct->ct_error.re_status = RPC_CANTRECV;
790				ct->ct_error.re_errno = error;
791				TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
792					cr->cr_error = error;
793					wakeup(cr);
794				}
795				mtx_unlock(&ct->ct_lock);
796				break;
797			}
798			memcpy(&header, mtod(m, uint32_t *), sizeof(uint32_t));
799			header = ntohl(header);
800			ct->ct_record = NULL;
801			ct->ct_record_resid = header & 0x7fffffff;
802			ct->ct_record_eor = ((header & 0x80000000) != 0);
803			m_freem(m);
804		} else {
805			/*
806			 * We have the record mark. Read as much as
807			 * the socket has buffered up to the end of
808			 * this record.
809			 */
810			uio.uio_resid = ct->ct_record_resid;
811			m = NULL;
812			rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
813			error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
814
815			if (error == EWOULDBLOCK)
816				break;
817
818			if (error || uio.uio_resid == ct->ct_record_resid)
819				goto wakeup_all;
820
821			/*
822			 * If we have part of the record already,
823			 * chain this bit onto the end.
824			 */
825			if (ct->ct_record)
826				m_last(ct->ct_record)->m_next = m;
827			else
828				ct->ct_record = m;
829
830			ct->ct_record_resid = uio.uio_resid;
831
832			/*
833			 * If we have the entire record, see if we can
834			 * match it to a request.
835			 */
836			if (ct->ct_record_resid == 0
837			    && ct->ct_record_eor) {
838				/*
839				 * The XID is in the first uint32_t of
840				 * the reply.
841				 */
842				ct->ct_record =
843					m_pullup(ct->ct_record, sizeof(xid));
844				if (!ct->ct_record)
845					break;
846				memcpy(&xid,
847				    mtod(ct->ct_record, uint32_t *),
848				    sizeof(uint32_t));
849				xid = ntohl(xid);
850
851				mtx_lock(&ct->ct_lock);
852				foundreq = 0;
853				TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
854					if (cr->cr_xid == xid) {
855						/*
856						 * This one
857						 * matches. We leave
858						 * the reply mbuf in
859						 * cr->cr_mrep. Set
860						 * the XID to zero so
861						 * that we will ignore
862						 * any duplicaed
863						 * replies.
864						 */
865						cr->cr_xid = 0;
866						cr->cr_mrep = ct->ct_record;
867						cr->cr_error = 0;
868						foundreq = 1;
869						wakeup(cr);
870						break;
871					}
872				}
873				mtx_unlock(&ct->ct_lock);
874
875				if (!foundreq)
876					m_freem(ct->ct_record);
877				ct->ct_record = NULL;
878			}
879		}
880	} while (m);
881}
882