clnt_vc.c revision 193272
122652Smpp/*	$NetBSD: clnt_vc.c,v 1.4 2000/07/14 08:40:42 fvdl Exp $	*/
222652Smpp
322652Smpp/*
422652Smpp * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
522652Smpp * unrestricted use provided that this legend is included on all tape
622652Smpp * media and as a part of the software program in whole or part.  Users
722652Smpp * may copy or modify Sun RPC without charge, but are not authorized
822652Smpp * to license or distribute it to anyone else except as part of a product or
922652Smpp * program developed by the user.
1022652Smpp *
1122652Smpp * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
1222652Smpp * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
1322652Smpp * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
1422652Smpp *
1522652Smpp * Sun RPC is provided with no support and without any obligation on the
1622652Smpp * part of Sun Microsystems, Inc. to assist in its use, correction,
1722652Smpp * modification or enhancement.
1822652Smpp *
1922652Smpp * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
2022652Smpp * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
2122652Smpp * OR ANY PART THEREOF.
2222652Smpp *
2322652Smpp * In no event will Sun Microsystems, Inc. be liable for any lost revenue
2422652Smpp * or profits or other special, indirect and consequential damages, even if
2522652Smpp * Sun has been advised of the possibility of such damages.
2622652Smpp *
2722652Smpp * Sun Microsystems, Inc.
2855547Sphantom * 2550 Garcia Avenue
2950476Speter * Mountain View, California  94043
3048795Snik */
31226119Sdes
3222652Smpp#if defined(LIBC_SCCS) && !defined(lint)
3322652Smppstatic char *sccsid2 = "@(#)clnt_tcp.c 1.37 87/10/05 Copyr 1984 Sun Micro";
3422652Smppstatic char *sccsid = "@(#)clnt_tcp.c	2.2 88/08/01 4.0 RPCSRC";
3522652Smppstatic char sccsid3[] = "@(#)clnt_vc.c 1.19 89/03/16 Copyr 1988 Sun Micro";
3622652Smpp#endif
37226119Sdes#include <sys/cdefs.h>
38226119Sdes__FBSDID("$FreeBSD: head/sys/rpc/clnt_vc.c 193272 2009-06-01 21:17:03Z jhb $");
39226119Sdes
/*
 * clnt_vc.c (formerly clnt_tcp.c), Implements a TCP/IP based, client side RPC.
 *
 * Copyright (C) 1984, Sun Microsystems, Inc.
 *
 * TCP based RPC supports 'batched calls'.
 * A sequence of calls may be batched-up in a send buffer.  The rpc call
 * returns immediately to the client even though the call was not necessarily
 * sent.  The batching occurs if the results' xdr routine is NULL (0) AND
 * the rpc timeout value is zero (see clnt.h, rpc).
 *
 * Clients should NOT casually batch calls that in fact return results; that is,
 * the server side should be aware that a call is batched and not produce any
 * return message.  Batched calls that produce many result messages can
 * deadlock (netlock) the client and the server....
 *
 * Now go hang yourself.
 */
5822652Smpp
5922652Smpp#include <sys/param.h>
6022652Smpp#include <sys/systm.h>
6122652Smpp#include <sys/lock.h>
6222652Smpp#include <sys/malloc.h>
6322652Smpp#include <sys/mbuf.h>
6422652Smpp#include <sys/mutex.h>
6522652Smpp#include <sys/pcpu.h>
6657731Ssheldonh#include <sys/proc.h>
6757731Ssheldonh#include <sys/protosw.h>
6822652Smpp#include <sys/socket.h>
6922652Smpp#include <sys/socketvar.h>
7022652Smpp#include <sys/syslog.h>
7122652Smpp#include <sys/time.h>
7222652Smpp#include <sys/uio.h>
7322652Smpp#include <netinet/tcp.h>
7457731Ssheldonh
7557731Ssheldonh#include <rpc/rpc.h>
7622652Smpp#include <rpc/rpc_com.h>
7722652Smpp
7822652Smpp#define MCALL_MSG_SIZE 24
7922652Smpp
8022652Smppstruct cmessage {
8122652Smpp        struct cmsghdr cmsg;
8281251Sru        struct cmsgcred cmcred;
8381251Sru};
8422652Smpp
8522652Smppstatic enum clnt_stat clnt_vc_call(CLIENT *, struct rpc_callextra *,
8622652Smpp    rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
8722652Smppstatic void clnt_vc_geterr(CLIENT *, struct rpc_err *);
8881251Srustatic bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *);
8922652Smppstatic void clnt_vc_abort(CLIENT *);
9081251Srustatic bool_t clnt_vc_control(CLIENT *, u_int, void *);
9122652Smppstatic void clnt_vc_close(CLIENT *);
9222652Smppstatic void clnt_vc_destroy(CLIENT *);
9322652Smppstatic bool_t time_not_ok(struct timeval *);
9422652Smppstatic int clnt_vc_soupcall(struct socket *so, void *arg, int waitflag);
9522652Smpp
9622652Smppstatic struct clnt_ops clnt_vc_ops = {
9722652Smpp	.cl_call =	clnt_vc_call,
9822652Smpp	.cl_abort =	clnt_vc_abort,
9922652Smpp	.cl_geterr =	clnt_vc_geterr,
10022652Smpp	.cl_freeres =	clnt_vc_freeres,
10122652Smpp	.cl_close =	clnt_vc_close,
10222652Smpp	.cl_destroy =	clnt_vc_destroy,
10322652Smpp	.cl_control =	clnt_vc_control
10422652Smpp};
10522652Smpp
10622652Smpp/*
10722652Smpp * A pending RPC request which awaits a reply. Requests which have
10822652Smpp * received their reply will have cr_xid set to zero and cr_mrep to
10922652Smpp * the mbuf chain of the reply.
11022652Smpp */
11122652Smppstruct ct_request {
11222652Smpp	TAILQ_ENTRY(ct_request) cr_link;
11322652Smpp	uint32_t		cr_xid;		/* XID of request */
11422652Smpp	struct mbuf		*cr_mrep;	/* reply received by upcall */
11522652Smpp	int			cr_error;	/* any error from upcall */
11622652Smpp	char			cr_verf[MAX_AUTH_BYTES]; /* reply verf */
11757731Ssheldonh};
11857731Ssheldonh
11922652SmppTAILQ_HEAD(ct_request_list, ct_request);
12022652Smpp
12122652Smppstruct ct_data {
12222652Smpp	struct mtx	ct_lock;
12322652Smpp	int		ct_threads;	/* number of threads in clnt_vc_call */
12422652Smpp	bool_t		ct_closing;	/* TRUE if we are closing */
12522652Smpp	bool_t		ct_closed;	/* TRUE if we are closed */
12622652Smpp	struct socket	*ct_socket;	/* connection socket */
12722652Smpp	bool_t		ct_closeit;	/* close it on destroy */
12822652Smpp	struct timeval	ct_wait;	/* wait interval in milliseconds */
12922652Smpp	struct sockaddr_storage	ct_addr; /* remote addr */
13022652Smpp	struct rpc_err	ct_error;
13122652Smpp	uint32_t	ct_xid;
13222652Smpp	char		ct_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
13322652Smpp	size_t		ct_mpos;	/* pos after marshal */
13422652Smpp	const char	*ct_waitchan;
13522652Smpp	int		ct_waitflag;
13622652Smpp	struct mbuf	*ct_record;	/* current reply record */
13757731Ssheldonh	size_t		ct_record_resid; /* how much left of reply to read */
13857731Ssheldonh	bool_t		ct_record_eor;	 /* true if reading last fragment */
13922652Smpp	struct ct_request_list ct_pending;
14022652Smpp};
141226119Sdes
/*
 * Message fragments kept at file scope.  None of them is referenced by
 * the code visible in this file.
 */
static const char clnt_vc_errstr[]   = "%s : %s";
static const char clnt_vc_str[]      = "clnt_vc_create";
static const char clnt_read_vc_str[] = "read_vc";
static const char __no_mem_str[]     = "out of memory";
146226119Sdes
147226119Sdes/*
14822652Smpp * Create a client handle for a connection.
14922652Smpp * Default options are set, which the user can change using clnt_control()'s.
150137840Sjkoshy * The rpc/vc package does buffering similar to stdio, so the client
15122652Smpp * must pick send and receive buffer sizes, 0 => use the default.
152 * NB: fd is copied into a private area.
153 * NB: The rpch->cl_auth is set null authentication. Caller may wish to
154 * set this something more useful.
155 *
156 * fd should be an open socket
157 */
158CLIENT *
159clnt_vc_create(
160	struct socket *so,		/* open file descriptor */
161	struct sockaddr *raddr,		/* servers address */
162	const rpcprog_t prog,		/* program number */
163	const rpcvers_t vers,		/* version number */
164	size_t sendsz,			/* buffer recv size */
165	size_t recvsz)			/* buffer send size */
166{
167	CLIENT *cl;			/* client handle */
168	struct ct_data *ct = NULL;	/* client handle */
169	struct timeval now;
170	struct rpc_msg call_msg;
171	static uint32_t disrupt;
172	struct __rpc_sockinfo si;
173	XDR xdrs;
174	int error, interrupted, one = 1;
175	struct sockopt sopt;
176
177	if (disrupt == 0)
178		disrupt = (uint32_t)(long)raddr;
179
180	cl = (CLIENT *)mem_alloc(sizeof (*cl));
181	ct = (struct ct_data *)mem_alloc(sizeof (*ct));
182
183	mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF);
184	ct->ct_threads = 0;
185	ct->ct_closing = FALSE;
186	ct->ct_closed = FALSE;
187
188	if ((so->so_state & (SS_ISCONNECTED|SS_ISCONFIRMING)) == 0) {
189		error = soconnect(so, raddr, curthread);
190		SOCK_LOCK(so);
191		interrupted = 0;
192		while ((so->so_state & SS_ISCONNECTING)
193		    && so->so_error == 0) {
194			error = msleep(&so->so_timeo, SOCK_MTX(so),
195			    PSOCK | PCATCH, "connec", 0);
196			if (error) {
197				if (error == EINTR || error == ERESTART)
198					interrupted = 1;
199				break;
200			}
201		}
202		if (error == 0) {
203			error = so->so_error;
204			so->so_error = 0;
205		}
206		SOCK_UNLOCK(so);
207		if (error) {
208			if (!interrupted)
209				so->so_state &= ~SS_ISCONNECTING;
210			rpc_createerr.cf_stat = RPC_SYSTEMERROR;
211			rpc_createerr.cf_error.re_errno = error;
212			goto err;
213		}
214	}
215
216	if (!__rpc_socket2sockinfo(so, &si))
217		goto err;
218
219	if (so->so_proto->pr_flags & PR_CONNREQUIRED) {
220		bzero(&sopt, sizeof(sopt));
221		sopt.sopt_dir = SOPT_SET;
222		sopt.sopt_level = SOL_SOCKET;
223		sopt.sopt_name = SO_KEEPALIVE;
224		sopt.sopt_val = &one;
225		sopt.sopt_valsize = sizeof(one);
226		sosetopt(so, &sopt);
227	}
228
229	if (so->so_proto->pr_protocol == IPPROTO_TCP) {
230		bzero(&sopt, sizeof(sopt));
231		sopt.sopt_dir = SOPT_SET;
232		sopt.sopt_level = IPPROTO_TCP;
233		sopt.sopt_name = TCP_NODELAY;
234		sopt.sopt_val = &one;
235		sopt.sopt_valsize = sizeof(one);
236		sosetopt(so, &sopt);
237	}
238
239	ct->ct_closeit = FALSE;
240
241	/*
242	 * Set up private data struct
243	 */
244	ct->ct_socket = so;
245	ct->ct_wait.tv_sec = -1;
246	ct->ct_wait.tv_usec = -1;
247	memcpy(&ct->ct_addr, raddr, raddr->sa_len);
248
249	/*
250	 * Initialize call message
251	 */
252	getmicrotime(&now);
253	ct->ct_xid = ((uint32_t)++disrupt) ^ __RPC_GETXID(&now);
254	call_msg.rm_xid = ct->ct_xid;
255	call_msg.rm_direction = CALL;
256	call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
257	call_msg.rm_call.cb_prog = (uint32_t)prog;
258	call_msg.rm_call.cb_vers = (uint32_t)vers;
259
260	/*
261	 * pre-serialize the static part of the call msg and stash it away
262	 */
263	xdrmem_create(&xdrs, ct->ct_mcallc, MCALL_MSG_SIZE,
264	    XDR_ENCODE);
265	if (! xdr_callhdr(&xdrs, &call_msg)) {
266		if (ct->ct_closeit) {
267			soclose(ct->ct_socket);
268		}
269		goto err;
270	}
271	ct->ct_mpos = XDR_GETPOS(&xdrs);
272	XDR_DESTROY(&xdrs);
273	ct->ct_waitchan = "rpcrecv";
274	ct->ct_waitflag = 0;
275
276	/*
277	 * Create a client handle which uses xdrrec for serialization
278	 * and authnone for authentication.
279	 */
280	cl->cl_refs = 1;
281	cl->cl_ops = &clnt_vc_ops;
282	cl->cl_private = ct;
283	cl->cl_auth = authnone_create();
284	sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
285	recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
286	soreserve(ct->ct_socket, sendsz, recvsz);
287
288	SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
289	soupcall_set(ct->ct_socket, SO_RCV, clnt_vc_soupcall, ct);
290	SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
291
292	ct->ct_record = NULL;
293	ct->ct_record_resid = 0;
294	TAILQ_INIT(&ct->ct_pending);
295	return (cl);
296
297err:
298	if (cl) {
299		if (ct) {
300			mtx_destroy(&ct->ct_lock);
301			mem_free(ct, sizeof (struct ct_data));
302		}
303		if (cl)
304			mem_free(cl, sizeof (CLIENT));
305	}
306	return ((CLIENT *)NULL);
307}
308
309static enum clnt_stat
310clnt_vc_call(
311	CLIENT		*cl,		/* client handle */
312	struct rpc_callextra *ext,	/* call metadata */
313	rpcproc_t	proc,		/* procedure number */
314	struct mbuf	*args,		/* pointer to args */
315	struct mbuf	**resultsp,	/* pointer to results */
316	struct timeval	utimeout)
317{
318	struct ct_data *ct = (struct ct_data *) cl->cl_private;
319	AUTH *auth;
320	struct rpc_err *errp;
321	enum clnt_stat stat;
322	XDR xdrs;
323	struct rpc_msg reply_msg;
324	bool_t ok;
325	int nrefreshes = 2;		/* number of times to refresh cred */
326	struct timeval timeout;
327	uint32_t xid;
328	struct mbuf *mreq = NULL, *results;
329	struct ct_request *cr;
330	int error;
331
332	cr = malloc(sizeof(struct ct_request), M_RPC, M_WAITOK);
333
334	mtx_lock(&ct->ct_lock);
335
336	if (ct->ct_closing || ct->ct_closed) {
337		mtx_unlock(&ct->ct_lock);
338		free(cr, M_RPC);
339		return (RPC_CANTSEND);
340	}
341	ct->ct_threads++;
342
343	if (ext) {
344		auth = ext->rc_auth;
345		errp = &ext->rc_err;
346	} else {
347		auth = cl->cl_auth;
348		errp = &ct->ct_error;
349	}
350
351	cr->cr_mrep = NULL;
352	cr->cr_error = 0;
353
354	if (ct->ct_wait.tv_usec == -1) {
355		timeout = utimeout;	/* use supplied timeout */
356	} else {
357		timeout = ct->ct_wait;	/* use default timeout */
358	}
359
360call_again:
361	mtx_assert(&ct->ct_lock, MA_OWNED);
362
363	ct->ct_xid++;
364	xid = ct->ct_xid;
365
366	mtx_unlock(&ct->ct_lock);
367
368	/*
369	 * Leave space to pre-pend the record mark.
370	 */
371	MGETHDR(mreq, M_WAIT, MT_DATA);
372	mreq->m_data += sizeof(uint32_t);
373	KASSERT(ct->ct_mpos + sizeof(uint32_t) <= MHLEN,
374	    ("RPC header too big"));
375	bcopy(ct->ct_mcallc, mreq->m_data, ct->ct_mpos);
376	mreq->m_len = ct->ct_mpos;
377
378	/*
379	 * The XID is the first thing in the request.
380	 */
381	*mtod(mreq, uint32_t *) = htonl(xid);
382
383	xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
384
385	errp->re_status = stat = RPC_SUCCESS;
386
387	if ((! XDR_PUTINT32(&xdrs, &proc)) ||
388	    (! AUTH_MARSHALL(auth, xid, &xdrs,
389		m_copym(args, 0, M_COPYALL, M_WAITOK)))) {
390		errp->re_status = stat = RPC_CANTENCODEARGS;
391		mtx_lock(&ct->ct_lock);
392		goto out;
393	}
394	mreq->m_pkthdr.len = m_length(mreq, NULL);
395
396	/*
397	 * Prepend a record marker containing the packet length.
398	 */
399	M_PREPEND(mreq, sizeof(uint32_t), M_WAIT);
400	*mtod(mreq, uint32_t *) =
401		htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t)));
402
403	cr->cr_xid = xid;
404	mtx_lock(&ct->ct_lock);
405	TAILQ_INSERT_TAIL(&ct->ct_pending, cr, cr_link);
406	mtx_unlock(&ct->ct_lock);
407
408	/*
409	 * sosend consumes mreq.
410	 */
411	error = sosend(ct->ct_socket, NULL, NULL, mreq, NULL, 0, curthread);
412	mreq = NULL;
413	if (error == EMSGSIZE) {
414		SOCKBUF_LOCK(&ct->ct_socket->so_snd);
415		sbwait(&ct->ct_socket->so_snd);
416		SOCKBUF_UNLOCK(&ct->ct_socket->so_snd);
417		AUTH_VALIDATE(auth, xid, NULL, NULL);
418		mtx_lock(&ct->ct_lock);
419		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
420		goto call_again;
421	}
422
423	reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL;
424	reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf;
425	reply_msg.acpted_rply.ar_verf.oa_length = 0;
426	reply_msg.acpted_rply.ar_results.where = NULL;
427	reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
428
429	mtx_lock(&ct->ct_lock);
430	if (error) {
431		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
432		errp->re_errno = error;
433		errp->re_status = stat = RPC_CANTSEND;
434		goto out;
435	}
436
437	/*
438	 * Check to see if we got an upcall while waiting for the
439	 * lock. In both these cases, the request has been removed
440	 * from ct->ct_pending.
441	 */
442	if (cr->cr_error) {
443		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
444		errp->re_errno = cr->cr_error;
445		errp->re_status = stat = RPC_CANTRECV;
446		goto out;
447	}
448	if (cr->cr_mrep) {
449		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
450		goto got_reply;
451	}
452
453	/*
454	 * Hack to provide rpc-based message passing
455	 */
456	if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
457		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
458		errp->re_status = stat = RPC_TIMEDOUT;
459		goto out;
460	}
461
462	error = msleep(cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan,
463	    tvtohz(&timeout));
464
465	TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
466
467	if (error) {
468		/*
469		 * The sleep returned an error so our request is still
470		 * on the list. Turn the error code into an
471		 * appropriate client status.
472		 */
473		errp->re_errno = error;
474		switch (error) {
475		case EINTR:
476			stat = RPC_INTR;
477			break;
478		case EWOULDBLOCK:
479			stat = RPC_TIMEDOUT;
480			break;
481		default:
482			stat = RPC_CANTRECV;
483		}
484		errp->re_status = stat;
485		goto out;
486	} else {
487		/*
488		 * We were woken up by the upcall.  If the
489		 * upcall had a receive error, report that,
490		 * otherwise we have a reply.
491		 */
492		if (cr->cr_error) {
493			errp->re_errno = cr->cr_error;
494			errp->re_status = stat = RPC_CANTRECV;
495			goto out;
496		}
497	}
498
499got_reply:
500	/*
501	 * Now decode and validate the response. We need to drop the
502	 * lock since xdr_replymsg may end up sleeping in malloc.
503	 */
504	mtx_unlock(&ct->ct_lock);
505
506	if (ext && ext->rc_feedback)
507		ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);
508
509	xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
510	ok = xdr_replymsg(&xdrs, &reply_msg);
511	cr->cr_mrep = NULL;
512
513	if (ok) {
514		if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
515		    (reply_msg.acpted_rply.ar_stat == SUCCESS))
516			errp->re_status = stat = RPC_SUCCESS;
517		else
518			stat = _seterr_reply(&reply_msg, errp);
519
520		if (stat == RPC_SUCCESS) {
521			results = xdrmbuf_getall(&xdrs);
522			if (!AUTH_VALIDATE(auth, xid,
523				&reply_msg.acpted_rply.ar_verf,
524				&results)) {
525				errp->re_status = stat = RPC_AUTHERROR;
526				errp->re_why = AUTH_INVALIDRESP;
527			} else {
528				KASSERT(results,
529				    ("auth validated but no result"));
530				*resultsp = results;
531			}
532		}		/* end successful completion */
533		/*
534		 * If unsuccesful AND error is an authentication error
535		 * then refresh credentials and try again, else break
536		 */
537		else if (stat == RPC_AUTHERROR)
538			/* maybe our credentials need to be refreshed ... */
539			if (nrefreshes > 0 &&
540			    AUTH_REFRESH(auth, &reply_msg)) {
541				nrefreshes--;
542				XDR_DESTROY(&xdrs);
543				mtx_lock(&ct->ct_lock);
544				goto call_again;
545			}
546		/* end of unsuccessful completion */
547	}	/* end of valid reply message */
548	else {
549		errp->re_status = stat = RPC_CANTDECODERES;
550	}
551	XDR_DESTROY(&xdrs);
552	mtx_lock(&ct->ct_lock);
553out:
554	mtx_assert(&ct->ct_lock, MA_OWNED);
555
556	KASSERT(stat != RPC_SUCCESS || *resultsp,
557	    ("RPC_SUCCESS without reply"));
558
559	if (mreq)
560		m_freem(mreq);
561	if (cr->cr_mrep)
562		m_freem(cr->cr_mrep);
563
564	ct->ct_threads--;
565	if (ct->ct_closing)
566		wakeup(ct);
567
568	mtx_unlock(&ct->ct_lock);
569
570	if (auth && stat != RPC_SUCCESS)
571		AUTH_VALIDATE(auth, xid, NULL, NULL);
572
573	free(cr, M_RPC);
574
575	return (stat);
576}
577
578static void
579clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
580{
581	struct ct_data *ct = (struct ct_data *) cl->cl_private;
582
583	*errp = ct->ct_error;
584}
585
586static bool_t
587clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
588{
589	XDR xdrs;
590	bool_t dummy;
591
592	xdrs.x_op = XDR_FREE;
593	dummy = (*xdr_res)(&xdrs, res_ptr);
594
595	return (dummy);
596}
597
598/*ARGSUSED*/
599static void
600clnt_vc_abort(CLIENT *cl)
601{
602}
603
604static bool_t
605clnt_vc_control(CLIENT *cl, u_int request, void *info)
606{
607	struct ct_data *ct = (struct ct_data *)cl->cl_private;
608	void *infop = info;
609
610	mtx_lock(&ct->ct_lock);
611
612	switch (request) {
613	case CLSET_FD_CLOSE:
614		ct->ct_closeit = TRUE;
615		mtx_unlock(&ct->ct_lock);
616		return (TRUE);
617	case CLSET_FD_NCLOSE:
618		ct->ct_closeit = FALSE;
619		mtx_unlock(&ct->ct_lock);
620		return (TRUE);
621	default:
622		break;
623	}
624
625	/* for other requests which use info */
626	if (info == NULL) {
627		mtx_unlock(&ct->ct_lock);
628		return (FALSE);
629	}
630	switch (request) {
631	case CLSET_TIMEOUT:
632		if (time_not_ok((struct timeval *)info)) {
633			mtx_unlock(&ct->ct_lock);
634			return (FALSE);
635		}
636		ct->ct_wait = *(struct timeval *)infop;
637		break;
638	case CLGET_TIMEOUT:
639		*(struct timeval *)infop = ct->ct_wait;
640		break;
641	case CLGET_SERVER_ADDR:
642		(void) memcpy(info, &ct->ct_addr, (size_t)ct->ct_addr.ss_len);
643		break;
644	case CLGET_SVC_ADDR:
645		/*
646		 * Slightly different semantics to userland - we use
647		 * sockaddr instead of netbuf.
648		 */
649		memcpy(info, &ct->ct_addr, ct->ct_addr.ss_len);
650		break;
651	case CLSET_SVC_ADDR:		/* set to new address */
652		mtx_unlock(&ct->ct_lock);
653		return (FALSE);
654	case CLGET_XID:
655		*(uint32_t *)info = ct->ct_xid;
656		break;
657	case CLSET_XID:
658		/* This will set the xid of the NEXT call */
659		/* decrement by 1 as clnt_vc_call() increments once */
660		ct->ct_xid = *(uint32_t *)info - 1;
661		break;
662	case CLGET_VERS:
663		/*
664		 * This RELIES on the information that, in the call body,
665		 * the version number field is the fifth field from the
666		 * begining of the RPC header. MUST be changed if the
667		 * call_struct is changed
668		 */
669		*(uint32_t *)info =
670		    ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
671		    4 * BYTES_PER_XDR_UNIT));
672		break;
673
674	case CLSET_VERS:
675		*(uint32_t *)(void *)(ct->ct_mcallc +
676		    4 * BYTES_PER_XDR_UNIT) =
677		    htonl(*(uint32_t *)info);
678		break;
679
680	case CLGET_PROG:
681		/*
682		 * This RELIES on the information that, in the call body,
683		 * the program number field is the fourth field from the
684		 * begining of the RPC header. MUST be changed if the
685		 * call_struct is changed
686		 */
687		*(uint32_t *)info =
688		    ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
689		    3 * BYTES_PER_XDR_UNIT));
690		break;
691
692	case CLSET_PROG:
693		*(uint32_t *)(void *)(ct->ct_mcallc +
694		    3 * BYTES_PER_XDR_UNIT) =
695		    htonl(*(uint32_t *)info);
696		break;
697
698	case CLSET_WAITCHAN:
699		ct->ct_waitchan = (const char *)info;
700		break;
701
702	case CLGET_WAITCHAN:
703		*(const char **) info = ct->ct_waitchan;
704		break;
705
706	case CLSET_INTERRUPTIBLE:
707		if (*(int *) info)
708			ct->ct_waitflag = PCATCH;
709		else
710			ct->ct_waitflag = 0;
711		break;
712
713	case CLGET_INTERRUPTIBLE:
714		if (ct->ct_waitflag)
715			*(int *) info = TRUE;
716		else
717			*(int *) info = FALSE;
718		break;
719
720	default:
721		mtx_unlock(&ct->ct_lock);
722		return (FALSE);
723	}
724
725	mtx_unlock(&ct->ct_lock);
726	return (TRUE);
727}
728
729static void
730clnt_vc_close(CLIENT *cl)
731{
732	struct ct_data *ct = (struct ct_data *) cl->cl_private;
733	struct ct_request *cr;
734
735	mtx_lock(&ct->ct_lock);
736
737	if (ct->ct_closed) {
738		mtx_unlock(&ct->ct_lock);
739		return;
740	}
741
742	if (ct->ct_closing) {
743		while (ct->ct_closing)
744			msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
745		KASSERT(ct->ct_closed, ("client should be closed"));
746		mtx_unlock(&ct->ct_lock);
747		return;
748	}
749
750	if (ct->ct_socket) {
751		ct->ct_closing = TRUE;
752		mtx_unlock(&ct->ct_lock);
753
754		SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
755		soupcall_clear(ct->ct_socket, SO_RCV);
756		SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
757
758		/*
759		 * Abort any pending requests and wait until everyone
760		 * has finished with clnt_vc_call.
761		 */
762		mtx_lock(&ct->ct_lock);
763		TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
764			cr->cr_xid = 0;
765			cr->cr_error = ESHUTDOWN;
766			wakeup(cr);
767		}
768
769		while (ct->ct_threads)
770			msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
771	}
772
773	ct->ct_closing = FALSE;
774	ct->ct_closed = TRUE;
775	mtx_unlock(&ct->ct_lock);
776	wakeup(ct);
777}
778
779static void
780clnt_vc_destroy(CLIENT *cl)
781{
782	struct ct_data *ct = (struct ct_data *) cl->cl_private;
783	struct socket *so = NULL;
784
785	clnt_vc_close(cl);
786
787	mtx_lock(&ct->ct_lock);
788
789	if (ct->ct_socket) {
790		if (ct->ct_closeit) {
791			so = ct->ct_socket;
792		}
793	}
794
795	mtx_unlock(&ct->ct_lock);
796
797	mtx_destroy(&ct->ct_lock);
798	if (so) {
799		soshutdown(so, SHUT_WR);
800		soclose(so);
801	}
802	mem_free(ct, sizeof(struct ct_data));
803	mem_free(cl, sizeof(CLIENT));
804}
805
806/*
807 * Make sure that the time is not garbage.   -1 value is disallowed.
808 * Note this is different from time_not_ok in clnt_dg.c
809 */
810static bool_t
811time_not_ok(struct timeval *t)
812{
813	return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
814		t->tv_usec <= -1 || t->tv_usec > 1000000);
815}
816
817int
818clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
819{
820	struct ct_data *ct = (struct ct_data *) arg;
821	struct uio uio;
822	struct mbuf *m;
823	struct ct_request *cr;
824	int error, rcvflag, foundreq;
825	uint32_t xid, header;
826	bool_t do_read;
827
828	uio.uio_td = curthread;
829	do {
830		/*
831		 * If ct_record_resid is zero, we are waiting for a
832		 * record mark.
833		 */
834		if (ct->ct_record_resid == 0) {
835
836			/*
837			 * Make sure there is either a whole record
838			 * mark in the buffer or there is some other
839			 * error condition
840			 */
841			do_read = FALSE;
842			if (so->so_rcv.sb_cc >= sizeof(uint32_t)
843			    || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
844			    || so->so_error)
845				do_read = TRUE;
846
847			if (!do_read)
848				return (SU_OK);
849
850			SOCKBUF_UNLOCK(&so->so_rcv);
851			uio.uio_resid = sizeof(uint32_t);
852			m = NULL;
853			rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
854			error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
855			SOCKBUF_LOCK(&so->so_rcv);
856
857			if (error == EWOULDBLOCK)
858				break;
859
860			/*
861			 * If there was an error, wake up all pending
862			 * requests.
863			 */
864			if (error || uio.uio_resid > 0) {
865			wakeup_all:
866				mtx_lock(&ct->ct_lock);
867				if (!error) {
868					/*
869					 * We must have got EOF trying
870					 * to read from the stream.
871					 */
872					error = ECONNRESET;
873				}
874				ct->ct_error.re_status = RPC_CANTRECV;
875				ct->ct_error.re_errno = error;
876				TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
877					cr->cr_error = error;
878					wakeup(cr);
879				}
880				mtx_unlock(&ct->ct_lock);
881				break;
882			}
883			bcopy(mtod(m, uint32_t *), &header, sizeof(uint32_t));
884			header = ntohl(header);
885			ct->ct_record = NULL;
886			ct->ct_record_resid = header & 0x7fffffff;
887			ct->ct_record_eor = ((header & 0x80000000) != 0);
888			m_freem(m);
889		} else {
890			/*
891			 * Wait until the socket has the whole record
892			 * buffered.
893			 */
894			do_read = FALSE;
895			if (so->so_rcv.sb_cc >= ct->ct_record_resid
896			    || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
897			    || so->so_error)
898				do_read = TRUE;
899
900			if (!do_read)
901				return (SU_OK);
902
903			/*
904			 * We have the record mark. Read as much as
905			 * the socket has buffered up to the end of
906			 * this record.
907			 */
908			SOCKBUF_UNLOCK(&so->so_rcv);
909			uio.uio_resid = ct->ct_record_resid;
910			m = NULL;
911			rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
912			error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
913			SOCKBUF_LOCK(&so->so_rcv);
914
915			if (error == EWOULDBLOCK)
916				break;
917
918			if (error || uio.uio_resid == ct->ct_record_resid)
919				goto wakeup_all;
920
921			/*
922			 * If we have part of the record already,
923			 * chain this bit onto the end.
924			 */
925			if (ct->ct_record)
926				m_last(ct->ct_record)->m_next = m;
927			else
928				ct->ct_record = m;
929
930			ct->ct_record_resid = uio.uio_resid;
931
932			/*
933			 * If we have the entire record, see if we can
934			 * match it to a request.
935			 */
936			if (ct->ct_record_resid == 0
937			    && ct->ct_record_eor) {
938				/*
939				 * The XID is in the first uint32_t of
940				 * the reply.
941				 */
942				if (ct->ct_record->m_len < sizeof(xid))
943					ct->ct_record =
944						m_pullup(ct->ct_record,
945						    sizeof(xid));
946				if (!ct->ct_record)
947					break;
948				bcopy(mtod(ct->ct_record, uint32_t *),
949				    &xid, sizeof(uint32_t));
950				xid = ntohl(xid);
951
952				mtx_lock(&ct->ct_lock);
953				foundreq = 0;
954				TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
955					if (cr->cr_xid == xid) {
956						/*
957						 * This one
958						 * matches. We leave
959						 * the reply mbuf in
960						 * cr->cr_mrep. Set
961						 * the XID to zero so
962						 * that we will ignore
963						 * any duplicaed
964						 * replies.
965						 */
966						cr->cr_xid = 0;
967						cr->cr_mrep = ct->ct_record;
968						cr->cr_error = 0;
969						foundreq = 1;
970						wakeup(cr);
971						break;
972					}
973				}
974				mtx_unlock(&ct->ct_lock);
975
976				if (!foundreq)
977					m_freem(ct->ct_record);
978				ct->ct_record = NULL;
979			}
980		}
981	} while (m);
982	return (SU_OK);
983}
984