Deleted Added
full compact
clnt_vc.c (195703) clnt_vc.c (196503)
1/* $NetBSD: clnt_vc.c,v 1.4 2000/07/14 08:40:42 fvdl Exp $ */
2
3/*
4 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
5 * unrestricted use provided that this legend is included on all tape
6 * media and as a part of the software program in whole or part. Users
7 * may copy or modify Sun RPC without charge, but are not authorized
8 * to license or distribute it to anyone else except as part of a product or
9 * program developed by the user.
10 *
11 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
12 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
13 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
14 *
15 * Sun RPC is provided with no support and without any obligation on the
16 * part of Sun Microsystems, Inc. to assist in its use, correction,
17 * modification or enhancement.
18 *
19 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
20 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
21 * OR ANY PART THEREOF.
22 *
23 * In no event will Sun Microsystems, Inc. be liable for any lost revenue
24 * or profits or other special, indirect and consequential damages, even if
25 * Sun has been advised of the possibility of such damages.
26 *
27 * Sun Microsystems, Inc.
28 * 2550 Garcia Avenue
29 * Mountain View, California 94043
30 */
31
32#if defined(LIBC_SCCS) && !defined(lint)
33static char *sccsid2 = "@(#)clnt_tcp.c 1.37 87/10/05 Copyr 1984 Sun Micro";
34static char *sccsid = "@(#)clnt_tcp.c 2.2 88/08/01 4.0 RPCSRC";
35static char sccsid3[] = "@(#)clnt_vc.c 1.19 89/03/16 Copyr 1988 Sun Micro";
36#endif
37#include <sys/cdefs.h>
1/* $NetBSD: clnt_vc.c,v 1.4 2000/07/14 08:40:42 fvdl Exp $ */
2
3/*
4 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
5 * unrestricted use provided that this legend is included on all tape
6 * media and as a part of the software program in whole or part. Users
7 * may copy or modify Sun RPC without charge, but are not authorized
8 * to license or distribute it to anyone else except as part of a product or
9 * program developed by the user.
10 *
11 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
12 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
13 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
14 *
15 * Sun RPC is provided with no support and without any obligation on the
16 * part of Sun Microsystems, Inc. to assist in its use, correction,
17 * modification or enhancement.
18 *
19 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
20 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
21 * OR ANY PART THEREOF.
22 *
23 * In no event will Sun Microsystems, Inc. be liable for any lost revenue
24 * or profits or other special, indirect and consequential damages, even if
25 * Sun has been advised of the possibility of such damages.
26 *
27 * Sun Microsystems, Inc.
28 * 2550 Garcia Avenue
29 * Mountain View, California 94043
30 */
31
32#if defined(LIBC_SCCS) && !defined(lint)
33static char *sccsid2 = "@(#)clnt_tcp.c 1.37 87/10/05 Copyr 1984 Sun Micro";
34static char *sccsid = "@(#)clnt_tcp.c 2.2 88/08/01 4.0 RPCSRC";
35static char sccsid3[] = "@(#)clnt_vc.c 1.19 89/03/16 Copyr 1988 Sun Micro";
36#endif
37#include <sys/cdefs.h>
38__FBSDID("$FreeBSD: head/sys/rpc/clnt_vc.c 195703 2009-07-14 22:54:29Z kib $");
38__FBSDID("$FreeBSD: head/sys/rpc/clnt_vc.c 196503 2009-08-24 10:09:30Z zec $");
39
40/*
41 * clnt_tcp.c, Implements a TCP/IP based, client side RPC.
42 *
43 * Copyright (C) 1984, Sun Microsystems, Inc.
44 *
45 * TCP based RPC supports 'batched calls'.
46 * A sequence of calls may be batched-up in a send buffer. The rpc call
47 * return immediately to the client even though the call was not necessarily
48 * sent. The batching occurs if the results' xdr routine is NULL (0) AND
49 * the rpc timeout value is zero (see clnt.h, rpc).
50 *
51 * Clients should NOT casually batch calls that in fact return results; that is,
52 * the server side should be aware that a call is batched and not produce any
53 * return message. Batched calls that produce many result messages can
54 * deadlock (netlock) the client and the server....
55 *
56 * Now go hang yourself.
57 */
58
59#include <sys/param.h>
60#include <sys/systm.h>
61#include <sys/lock.h>
62#include <sys/malloc.h>
63#include <sys/mbuf.h>
64#include <sys/mutex.h>
65#include <sys/pcpu.h>
66#include <sys/proc.h>
67#include <sys/protosw.h>
68#include <sys/socket.h>
69#include <sys/socketvar.h>
70#include <sys/syslog.h>
71#include <sys/time.h>
72#include <sys/uio.h>
39
40/*
41 * clnt_tcp.c, Implements a TCP/IP based, client side RPC.
42 *
43 * Copyright (C) 1984, Sun Microsystems, Inc.
44 *
45 * TCP based RPC supports 'batched calls'.
46 * A sequence of calls may be batched-up in a send buffer. The rpc call
47 * return immediately to the client even though the call was not necessarily
48 * sent. The batching occurs if the results' xdr routine is NULL (0) AND
49 * the rpc timeout value is zero (see clnt.h, rpc).
50 *
51 * Clients should NOT casually batch calls that in fact return results; that is,
52 * the server side should be aware that a call is batched and not produce any
53 * return message. Batched calls that produce many result messages can
54 * deadlock (netlock) the client and the server....
55 *
56 * Now go hang yourself.
57 */
58
59#include <sys/param.h>
60#include <sys/systm.h>
61#include <sys/lock.h>
62#include <sys/malloc.h>
63#include <sys/mbuf.h>
64#include <sys/mutex.h>
65#include <sys/pcpu.h>
66#include <sys/proc.h>
67#include <sys/protosw.h>
68#include <sys/socket.h>
69#include <sys/socketvar.h>
70#include <sys/syslog.h>
71#include <sys/time.h>
72#include <sys/uio.h>
73
74#include <net/vnet.h>
75
73#include <netinet/tcp.h>
74
75#include <rpc/rpc.h>
76#include <rpc/rpc_com.h>
77
78#define MCALL_MSG_SIZE 24
79
80struct cmessage {
81 struct cmsghdr cmsg;
82 struct cmsgcred cmcred;
83};
84
85static enum clnt_stat clnt_vc_call(CLIENT *, struct rpc_callextra *,
86 rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
87static void clnt_vc_geterr(CLIENT *, struct rpc_err *);
88static bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *);
89static void clnt_vc_abort(CLIENT *);
90static bool_t clnt_vc_control(CLIENT *, u_int, void *);
91static void clnt_vc_close(CLIENT *);
92static void clnt_vc_destroy(CLIENT *);
93static bool_t time_not_ok(struct timeval *);
94static int clnt_vc_soupcall(struct socket *so, void *arg, int waitflag);
95
96static struct clnt_ops clnt_vc_ops = {
97 .cl_call = clnt_vc_call,
98 .cl_abort = clnt_vc_abort,
99 .cl_geterr = clnt_vc_geterr,
100 .cl_freeres = clnt_vc_freeres,
101 .cl_close = clnt_vc_close,
102 .cl_destroy = clnt_vc_destroy,
103 .cl_control = clnt_vc_control
104};
105
106/*
107 * A pending RPC request which awaits a reply. Requests which have
108 * received their reply will have cr_xid set to zero and cr_mrep to
109 * the mbuf chain of the reply.
110 */
111struct ct_request {
112 TAILQ_ENTRY(ct_request) cr_link;
113 uint32_t cr_xid; /* XID of request */
114 struct mbuf *cr_mrep; /* reply received by upcall */
115 int cr_error; /* any error from upcall */
116 char cr_verf[MAX_AUTH_BYTES]; /* reply verf */
117};
118
119TAILQ_HEAD(ct_request_list, ct_request);
120
121struct ct_data {
122 struct mtx ct_lock;
123 int ct_threads; /* number of threads in clnt_vc_call */
124 bool_t ct_closing; /* TRUE if we are closing */
125 bool_t ct_closed; /* TRUE if we are closed */
126 struct socket *ct_socket; /* connection socket */
127 bool_t ct_closeit; /* close it on destroy */
128 struct timeval ct_wait; /* wait interval in milliseconds */
129 struct sockaddr_storage ct_addr; /* remote addr */
130 struct rpc_err ct_error;
131 uint32_t ct_xid;
132 char ct_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
133 size_t ct_mpos; /* pos after marshal */
134 const char *ct_waitchan;
135 int ct_waitflag;
136 struct mbuf *ct_record; /* current reply record */
137 size_t ct_record_resid; /* how much left of reply to read */
138 bool_t ct_record_eor; /* true if reading last fragment */
139 struct ct_request_list ct_pending;
140 int ct_upcallrefs; /* Ref cnt of upcalls in prog. */
141};
142
143static void clnt_vc_upcallsdone(struct ct_data *);
144
145static const char clnt_vc_errstr[] = "%s : %s";
146static const char clnt_vc_str[] = "clnt_vc_create";
147static const char clnt_read_vc_str[] = "read_vc";
148static const char __no_mem_str[] = "out of memory";
149
150/*
151 * Create a client handle for a connection.
152 * Default options are set, which the user can change using clnt_control()'s.
153 * The rpc/vc package does buffering similar to stdio, so the client
154 * must pick send and receive buffer sizes, 0 => use the default.
155 * NB: fd is copied into a private area.
156 * NB: The rpch->cl_auth is set null authentication. Caller may wish to
157 * set this something more useful.
158 *
159 * fd should be an open socket
160 */
161CLIENT *
162clnt_vc_create(
163 struct socket *so, /* open file descriptor */
164 struct sockaddr *raddr, /* servers address */
165 const rpcprog_t prog, /* program number */
166 const rpcvers_t vers, /* version number */
167 size_t sendsz, /* buffer recv size */
168 size_t recvsz) /* buffer send size */
169{
170 CLIENT *cl; /* client handle */
171 struct ct_data *ct = NULL; /* client handle */
172 struct timeval now;
173 struct rpc_msg call_msg;
174 static uint32_t disrupt;
175 struct __rpc_sockinfo si;
176 XDR xdrs;
177 int error, interrupted, one = 1;
178 struct sockopt sopt;
179
180 if (disrupt == 0)
181 disrupt = (uint32_t)(long)raddr;
182
183 cl = (CLIENT *)mem_alloc(sizeof (*cl));
184 ct = (struct ct_data *)mem_alloc(sizeof (*ct));
185
186 mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF);
187 ct->ct_threads = 0;
188 ct->ct_closing = FALSE;
189 ct->ct_closed = FALSE;
190 ct->ct_upcallrefs = 0;
191
192 if ((so->so_state & (SS_ISCONNECTED|SS_ISCONFIRMING)) == 0) {
193 error = soconnect(so, raddr, curthread);
194 SOCK_LOCK(so);
195 interrupted = 0;
196 while ((so->so_state & SS_ISCONNECTING)
197 && so->so_error == 0) {
198 error = msleep(&so->so_timeo, SOCK_MTX(so),
199 PSOCK | PCATCH | PBDRY, "connec", 0);
200 if (error) {
201 if (error == EINTR || error == ERESTART)
202 interrupted = 1;
203 break;
204 }
205 }
206 if (error == 0) {
207 error = so->so_error;
208 so->so_error = 0;
209 }
210 SOCK_UNLOCK(so);
211 if (error) {
212 if (!interrupted)
213 so->so_state &= ~SS_ISCONNECTING;
214 rpc_createerr.cf_stat = RPC_SYSTEMERROR;
215 rpc_createerr.cf_error.re_errno = error;
216 goto err;
217 }
218 }
219
76#include <netinet/tcp.h>
77
78#include <rpc/rpc.h>
79#include <rpc/rpc_com.h>
80
81#define MCALL_MSG_SIZE 24
82
83struct cmessage {
84 struct cmsghdr cmsg;
85 struct cmsgcred cmcred;
86};
87
88static enum clnt_stat clnt_vc_call(CLIENT *, struct rpc_callextra *,
89 rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
90static void clnt_vc_geterr(CLIENT *, struct rpc_err *);
91static bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *);
92static void clnt_vc_abort(CLIENT *);
93static bool_t clnt_vc_control(CLIENT *, u_int, void *);
94static void clnt_vc_close(CLIENT *);
95static void clnt_vc_destroy(CLIENT *);
96static bool_t time_not_ok(struct timeval *);
97static int clnt_vc_soupcall(struct socket *so, void *arg, int waitflag);
98
99static struct clnt_ops clnt_vc_ops = {
100 .cl_call = clnt_vc_call,
101 .cl_abort = clnt_vc_abort,
102 .cl_geterr = clnt_vc_geterr,
103 .cl_freeres = clnt_vc_freeres,
104 .cl_close = clnt_vc_close,
105 .cl_destroy = clnt_vc_destroy,
106 .cl_control = clnt_vc_control
107};
108
109/*
110 * A pending RPC request which awaits a reply. Requests which have
111 * received their reply will have cr_xid set to zero and cr_mrep to
112 * the mbuf chain of the reply.
113 */
114struct ct_request {
115 TAILQ_ENTRY(ct_request) cr_link;
116 uint32_t cr_xid; /* XID of request */
117 struct mbuf *cr_mrep; /* reply received by upcall */
118 int cr_error; /* any error from upcall */
119 char cr_verf[MAX_AUTH_BYTES]; /* reply verf */
120};
121
122TAILQ_HEAD(ct_request_list, ct_request);
123
124struct ct_data {
125 struct mtx ct_lock;
126 int ct_threads; /* number of threads in clnt_vc_call */
127 bool_t ct_closing; /* TRUE if we are closing */
128 bool_t ct_closed; /* TRUE if we are closed */
129 struct socket *ct_socket; /* connection socket */
130 bool_t ct_closeit; /* close it on destroy */
131 struct timeval ct_wait; /* wait interval in milliseconds */
132 struct sockaddr_storage ct_addr; /* remote addr */
133 struct rpc_err ct_error;
134 uint32_t ct_xid;
135 char ct_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
136 size_t ct_mpos; /* pos after marshal */
137 const char *ct_waitchan;
138 int ct_waitflag;
139 struct mbuf *ct_record; /* current reply record */
140 size_t ct_record_resid; /* how much left of reply to read */
141 bool_t ct_record_eor; /* true if reading last fragment */
142 struct ct_request_list ct_pending;
143 int ct_upcallrefs; /* Ref cnt of upcalls in prog. */
144};
145
146static void clnt_vc_upcallsdone(struct ct_data *);
147
148static const char clnt_vc_errstr[] = "%s : %s";
149static const char clnt_vc_str[] = "clnt_vc_create";
150static const char clnt_read_vc_str[] = "read_vc";
151static const char __no_mem_str[] = "out of memory";
152
153/*
154 * Create a client handle for a connection.
155 * Default options are set, which the user can change using clnt_control()'s.
156 * The rpc/vc package does buffering similar to stdio, so the client
157 * must pick send and receive buffer sizes, 0 => use the default.
158 * NB: fd is copied into a private area.
159 * NB: The rpch->cl_auth is set null authentication. Caller may wish to
160 * set this something more useful.
161 *
162 * fd should be an open socket
163 */
164CLIENT *
165clnt_vc_create(
166 struct socket *so, /* open file descriptor */
167 struct sockaddr *raddr, /* servers address */
168 const rpcprog_t prog, /* program number */
169 const rpcvers_t vers, /* version number */
170 size_t sendsz, /* buffer recv size */
171 size_t recvsz) /* buffer send size */
172{
173 CLIENT *cl; /* client handle */
174 struct ct_data *ct = NULL; /* client handle */
175 struct timeval now;
176 struct rpc_msg call_msg;
177 static uint32_t disrupt;
178 struct __rpc_sockinfo si;
179 XDR xdrs;
180 int error, interrupted, one = 1;
181 struct sockopt sopt;
182
183 if (disrupt == 0)
184 disrupt = (uint32_t)(long)raddr;
185
186 cl = (CLIENT *)mem_alloc(sizeof (*cl));
187 ct = (struct ct_data *)mem_alloc(sizeof (*ct));
188
189 mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF);
190 ct->ct_threads = 0;
191 ct->ct_closing = FALSE;
192 ct->ct_closed = FALSE;
193 ct->ct_upcallrefs = 0;
194
195 if ((so->so_state & (SS_ISCONNECTED|SS_ISCONFIRMING)) == 0) {
196 error = soconnect(so, raddr, curthread);
197 SOCK_LOCK(so);
198 interrupted = 0;
199 while ((so->so_state & SS_ISCONNECTING)
200 && so->so_error == 0) {
201 error = msleep(&so->so_timeo, SOCK_MTX(so),
202 PSOCK | PCATCH | PBDRY, "connec", 0);
203 if (error) {
204 if (error == EINTR || error == ERESTART)
205 interrupted = 1;
206 break;
207 }
208 }
209 if (error == 0) {
210 error = so->so_error;
211 so->so_error = 0;
212 }
213 SOCK_UNLOCK(so);
214 if (error) {
215 if (!interrupted)
216 so->so_state &= ~SS_ISCONNECTING;
217 rpc_createerr.cf_stat = RPC_SYSTEMERROR;
218 rpc_createerr.cf_error.re_errno = error;
219 goto err;
220 }
221 }
222
220 if (!__rpc_socket2sockinfo(so, &si))
223 CURVNET_SET(so->so_vnet);
224 if (!__rpc_socket2sockinfo(so, &si)) {
225 CURVNET_RESTORE();
221 goto err;
226 goto err;
227 }
222
223 if (so->so_proto->pr_flags & PR_CONNREQUIRED) {
224 bzero(&sopt, sizeof(sopt));
225 sopt.sopt_dir = SOPT_SET;
226 sopt.sopt_level = SOL_SOCKET;
227 sopt.sopt_name = SO_KEEPALIVE;
228 sopt.sopt_val = &one;
229 sopt.sopt_valsize = sizeof(one);
230 sosetopt(so, &sopt);
231 }
232
233 if (so->so_proto->pr_protocol == IPPROTO_TCP) {
234 bzero(&sopt, sizeof(sopt));
235 sopt.sopt_dir = SOPT_SET;
236 sopt.sopt_level = IPPROTO_TCP;
237 sopt.sopt_name = TCP_NODELAY;
238 sopt.sopt_val = &one;
239 sopt.sopt_valsize = sizeof(one);
240 sosetopt(so, &sopt);
241 }
228
229 if (so->so_proto->pr_flags & PR_CONNREQUIRED) {
230 bzero(&sopt, sizeof(sopt));
231 sopt.sopt_dir = SOPT_SET;
232 sopt.sopt_level = SOL_SOCKET;
233 sopt.sopt_name = SO_KEEPALIVE;
234 sopt.sopt_val = &one;
235 sopt.sopt_valsize = sizeof(one);
236 sosetopt(so, &sopt);
237 }
238
239 if (so->so_proto->pr_protocol == IPPROTO_TCP) {
240 bzero(&sopt, sizeof(sopt));
241 sopt.sopt_dir = SOPT_SET;
242 sopt.sopt_level = IPPROTO_TCP;
243 sopt.sopt_name = TCP_NODELAY;
244 sopt.sopt_val = &one;
245 sopt.sopt_valsize = sizeof(one);
246 sosetopt(so, &sopt);
247 }
248 CURVNET_RESTORE();
242
243 ct->ct_closeit = FALSE;
244
245 /*
246 * Set up private data struct
247 */
248 ct->ct_socket = so;
249 ct->ct_wait.tv_sec = -1;
250 ct->ct_wait.tv_usec = -1;
251 memcpy(&ct->ct_addr, raddr, raddr->sa_len);
252
253 /*
254 * Initialize call message
255 */
256 getmicrotime(&now);
257 ct->ct_xid = ((uint32_t)++disrupt) ^ __RPC_GETXID(&now);
258 call_msg.rm_xid = ct->ct_xid;
259 call_msg.rm_direction = CALL;
260 call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
261 call_msg.rm_call.cb_prog = (uint32_t)prog;
262 call_msg.rm_call.cb_vers = (uint32_t)vers;
263
264 /*
265 * pre-serialize the static part of the call msg and stash it away
266 */
267 xdrmem_create(&xdrs, ct->ct_mcallc, MCALL_MSG_SIZE,
268 XDR_ENCODE);
269 if (! xdr_callhdr(&xdrs, &call_msg)) {
270 if (ct->ct_closeit) {
271 soclose(ct->ct_socket);
272 }
273 goto err;
274 }
275 ct->ct_mpos = XDR_GETPOS(&xdrs);
276 XDR_DESTROY(&xdrs);
277 ct->ct_waitchan = "rpcrecv";
278 ct->ct_waitflag = 0;
279
280 /*
281 * Create a client handle which uses xdrrec for serialization
282 * and authnone for authentication.
283 */
284 cl->cl_refs = 1;
285 cl->cl_ops = &clnt_vc_ops;
286 cl->cl_private = ct;
287 cl->cl_auth = authnone_create();
288 sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
289 recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
290 soreserve(ct->ct_socket, sendsz, recvsz);
291
292 SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
293 soupcall_set(ct->ct_socket, SO_RCV, clnt_vc_soupcall, ct);
294 SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
295
296 ct->ct_record = NULL;
297 ct->ct_record_resid = 0;
298 TAILQ_INIT(&ct->ct_pending);
299 return (cl);
300
301err:
302 if (cl) {
303 if (ct) {
304 mtx_destroy(&ct->ct_lock);
305 mem_free(ct, sizeof (struct ct_data));
306 }
307 if (cl)
308 mem_free(cl, sizeof (CLIENT));
309 }
310 return ((CLIENT *)NULL);
311}
312
313static enum clnt_stat
314clnt_vc_call(
315 CLIENT *cl, /* client handle */
316 struct rpc_callextra *ext, /* call metadata */
317 rpcproc_t proc, /* procedure number */
318 struct mbuf *args, /* pointer to args */
319 struct mbuf **resultsp, /* pointer to results */
320 struct timeval utimeout)
321{
322 struct ct_data *ct = (struct ct_data *) cl->cl_private;
323 AUTH *auth;
324 struct rpc_err *errp;
325 enum clnt_stat stat;
326 XDR xdrs;
327 struct rpc_msg reply_msg;
328 bool_t ok;
329 int nrefreshes = 2; /* number of times to refresh cred */
330 struct timeval timeout;
331 uint32_t xid;
332 struct mbuf *mreq = NULL, *results;
333 struct ct_request *cr;
334 int error;
335
336 cr = malloc(sizeof(struct ct_request), M_RPC, M_WAITOK);
337
338 mtx_lock(&ct->ct_lock);
339
340 if (ct->ct_closing || ct->ct_closed) {
341 mtx_unlock(&ct->ct_lock);
342 free(cr, M_RPC);
343 return (RPC_CANTSEND);
344 }
345 ct->ct_threads++;
346
347 if (ext) {
348 auth = ext->rc_auth;
349 errp = &ext->rc_err;
350 } else {
351 auth = cl->cl_auth;
352 errp = &ct->ct_error;
353 }
354
355 cr->cr_mrep = NULL;
356 cr->cr_error = 0;
357
358 if (ct->ct_wait.tv_usec == -1) {
359 timeout = utimeout; /* use supplied timeout */
360 } else {
361 timeout = ct->ct_wait; /* use default timeout */
362 }
363
364call_again:
365 mtx_assert(&ct->ct_lock, MA_OWNED);
366
367 ct->ct_xid++;
368 xid = ct->ct_xid;
369
370 mtx_unlock(&ct->ct_lock);
371
372 /*
373 * Leave space to pre-pend the record mark.
374 */
375 MGETHDR(mreq, M_WAIT, MT_DATA);
376 mreq->m_data += sizeof(uint32_t);
377 KASSERT(ct->ct_mpos + sizeof(uint32_t) <= MHLEN,
378 ("RPC header too big"));
379 bcopy(ct->ct_mcallc, mreq->m_data, ct->ct_mpos);
380 mreq->m_len = ct->ct_mpos;
381
382 /*
383 * The XID is the first thing in the request.
384 */
385 *mtod(mreq, uint32_t *) = htonl(xid);
386
387 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
388
389 errp->re_status = stat = RPC_SUCCESS;
390
391 if ((! XDR_PUTINT32(&xdrs, &proc)) ||
392 (! AUTH_MARSHALL(auth, xid, &xdrs,
393 m_copym(args, 0, M_COPYALL, M_WAITOK)))) {
394 errp->re_status = stat = RPC_CANTENCODEARGS;
395 mtx_lock(&ct->ct_lock);
396 goto out;
397 }
398 mreq->m_pkthdr.len = m_length(mreq, NULL);
399
400 /*
401 * Prepend a record marker containing the packet length.
402 */
403 M_PREPEND(mreq, sizeof(uint32_t), M_WAIT);
404 *mtod(mreq, uint32_t *) =
405 htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t)));
406
407 cr->cr_xid = xid;
408 mtx_lock(&ct->ct_lock);
409 TAILQ_INSERT_TAIL(&ct->ct_pending, cr, cr_link);
410 mtx_unlock(&ct->ct_lock);
411
412 /*
413 * sosend consumes mreq.
414 */
415 error = sosend(ct->ct_socket, NULL, NULL, mreq, NULL, 0, curthread);
416 mreq = NULL;
417 if (error == EMSGSIZE) {
418 SOCKBUF_LOCK(&ct->ct_socket->so_snd);
419 sbwait(&ct->ct_socket->so_snd);
420 SOCKBUF_UNLOCK(&ct->ct_socket->so_snd);
421 AUTH_VALIDATE(auth, xid, NULL, NULL);
422 mtx_lock(&ct->ct_lock);
423 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
424 goto call_again;
425 }
426
427 reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL;
428 reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf;
429 reply_msg.acpted_rply.ar_verf.oa_length = 0;
430 reply_msg.acpted_rply.ar_results.where = NULL;
431 reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
432
433 mtx_lock(&ct->ct_lock);
434 if (error) {
435 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
436 errp->re_errno = error;
437 errp->re_status = stat = RPC_CANTSEND;
438 goto out;
439 }
440
441 /*
442 * Check to see if we got an upcall while waiting for the
443 * lock. In both these cases, the request has been removed
444 * from ct->ct_pending.
445 */
446 if (cr->cr_error) {
447 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
448 errp->re_errno = cr->cr_error;
449 errp->re_status = stat = RPC_CANTRECV;
450 goto out;
451 }
452 if (cr->cr_mrep) {
453 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
454 goto got_reply;
455 }
456
457 /*
458 * Hack to provide rpc-based message passing
459 */
460 if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
461 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
462 errp->re_status = stat = RPC_TIMEDOUT;
463 goto out;
464 }
465
466 error = msleep(cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan,
467 tvtohz(&timeout));
468
469 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
470
471 if (error) {
472 /*
473 * The sleep returned an error so our request is still
474 * on the list. Turn the error code into an
475 * appropriate client status.
476 */
477 errp->re_errno = error;
478 switch (error) {
479 case EINTR:
480 case ERESTART:
481 stat = RPC_INTR;
482 break;
483 case EWOULDBLOCK:
484 stat = RPC_TIMEDOUT;
485 break;
486 default:
487 stat = RPC_CANTRECV;
488 }
489 errp->re_status = stat;
490 goto out;
491 } else {
492 /*
493 * We were woken up by the upcall. If the
494 * upcall had a receive error, report that,
495 * otherwise we have a reply.
496 */
497 if (cr->cr_error) {
498 errp->re_errno = cr->cr_error;
499 errp->re_status = stat = RPC_CANTRECV;
500 goto out;
501 }
502 }
503
504got_reply:
505 /*
506 * Now decode and validate the response. We need to drop the
507 * lock since xdr_replymsg may end up sleeping in malloc.
508 */
509 mtx_unlock(&ct->ct_lock);
510
511 if (ext && ext->rc_feedback)
512 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);
513
514 xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
515 ok = xdr_replymsg(&xdrs, &reply_msg);
516 cr->cr_mrep = NULL;
517
518 if (ok) {
519 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
520 (reply_msg.acpted_rply.ar_stat == SUCCESS))
521 errp->re_status = stat = RPC_SUCCESS;
522 else
523 stat = _seterr_reply(&reply_msg, errp);
524
525 if (stat == RPC_SUCCESS) {
526 results = xdrmbuf_getall(&xdrs);
527 if (!AUTH_VALIDATE(auth, xid,
528 &reply_msg.acpted_rply.ar_verf,
529 &results)) {
530 errp->re_status = stat = RPC_AUTHERROR;
531 errp->re_why = AUTH_INVALIDRESP;
532 } else {
533 KASSERT(results,
534 ("auth validated but no result"));
535 *resultsp = results;
536 }
537 } /* end successful completion */
538 /*
539 * If unsuccesful AND error is an authentication error
540 * then refresh credentials and try again, else break
541 */
542 else if (stat == RPC_AUTHERROR)
543 /* maybe our credentials need to be refreshed ... */
544 if (nrefreshes > 0 &&
545 AUTH_REFRESH(auth, &reply_msg)) {
546 nrefreshes--;
547 XDR_DESTROY(&xdrs);
548 mtx_lock(&ct->ct_lock);
549 goto call_again;
550 }
551 /* end of unsuccessful completion */
552 } /* end of valid reply message */
553 else {
554 errp->re_status = stat = RPC_CANTDECODERES;
555 }
556 XDR_DESTROY(&xdrs);
557 mtx_lock(&ct->ct_lock);
558out:
559 mtx_assert(&ct->ct_lock, MA_OWNED);
560
561 KASSERT(stat != RPC_SUCCESS || *resultsp,
562 ("RPC_SUCCESS without reply"));
563
564 if (mreq)
565 m_freem(mreq);
566 if (cr->cr_mrep)
567 m_freem(cr->cr_mrep);
568
569 ct->ct_threads--;
570 if (ct->ct_closing)
571 wakeup(ct);
572
573 mtx_unlock(&ct->ct_lock);
574
575 if (auth && stat != RPC_SUCCESS)
576 AUTH_VALIDATE(auth, xid, NULL, NULL);
577
578 free(cr, M_RPC);
579
580 return (stat);
581}
582
583static void
584clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
585{
586 struct ct_data *ct = (struct ct_data *) cl->cl_private;
587
588 *errp = ct->ct_error;
589}
590
591static bool_t
592clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
593{
594 XDR xdrs;
595 bool_t dummy;
596
597 xdrs.x_op = XDR_FREE;
598 dummy = (*xdr_res)(&xdrs, res_ptr);
599
600 return (dummy);
601}
602
603/*ARGSUSED*/
604static void
605clnt_vc_abort(CLIENT *cl)
606{
607}
608
609static bool_t
610clnt_vc_control(CLIENT *cl, u_int request, void *info)
611{
612 struct ct_data *ct = (struct ct_data *)cl->cl_private;
613 void *infop = info;
614
615 mtx_lock(&ct->ct_lock);
616
617 switch (request) {
618 case CLSET_FD_CLOSE:
619 ct->ct_closeit = TRUE;
620 mtx_unlock(&ct->ct_lock);
621 return (TRUE);
622 case CLSET_FD_NCLOSE:
623 ct->ct_closeit = FALSE;
624 mtx_unlock(&ct->ct_lock);
625 return (TRUE);
626 default:
627 break;
628 }
629
630 /* for other requests which use info */
631 if (info == NULL) {
632 mtx_unlock(&ct->ct_lock);
633 return (FALSE);
634 }
635 switch (request) {
636 case CLSET_TIMEOUT:
637 if (time_not_ok((struct timeval *)info)) {
638 mtx_unlock(&ct->ct_lock);
639 return (FALSE);
640 }
641 ct->ct_wait = *(struct timeval *)infop;
642 break;
643 case CLGET_TIMEOUT:
644 *(struct timeval *)infop = ct->ct_wait;
645 break;
646 case CLGET_SERVER_ADDR:
647 (void) memcpy(info, &ct->ct_addr, (size_t)ct->ct_addr.ss_len);
648 break;
649 case CLGET_SVC_ADDR:
650 /*
651 * Slightly different semantics to userland - we use
652 * sockaddr instead of netbuf.
653 */
654 memcpy(info, &ct->ct_addr, ct->ct_addr.ss_len);
655 break;
656 case CLSET_SVC_ADDR: /* set to new address */
657 mtx_unlock(&ct->ct_lock);
658 return (FALSE);
659 case CLGET_XID:
660 *(uint32_t *)info = ct->ct_xid;
661 break;
662 case CLSET_XID:
663 /* This will set the xid of the NEXT call */
664 /* decrement by 1 as clnt_vc_call() increments once */
665 ct->ct_xid = *(uint32_t *)info - 1;
666 break;
667 case CLGET_VERS:
668 /*
669 * This RELIES on the information that, in the call body,
670 * the version number field is the fifth field from the
671 * begining of the RPC header. MUST be changed if the
672 * call_struct is changed
673 */
674 *(uint32_t *)info =
675 ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
676 4 * BYTES_PER_XDR_UNIT));
677 break;
678
679 case CLSET_VERS:
680 *(uint32_t *)(void *)(ct->ct_mcallc +
681 4 * BYTES_PER_XDR_UNIT) =
682 htonl(*(uint32_t *)info);
683 break;
684
685 case CLGET_PROG:
686 /*
687 * This RELIES on the information that, in the call body,
688 * the program number field is the fourth field from the
689 * begining of the RPC header. MUST be changed if the
690 * call_struct is changed
691 */
692 *(uint32_t *)info =
693 ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
694 3 * BYTES_PER_XDR_UNIT));
695 break;
696
697 case CLSET_PROG:
698 *(uint32_t *)(void *)(ct->ct_mcallc +
699 3 * BYTES_PER_XDR_UNIT) =
700 htonl(*(uint32_t *)info);
701 break;
702
703 case CLSET_WAITCHAN:
704 ct->ct_waitchan = (const char *)info;
705 break;
706
707 case CLGET_WAITCHAN:
708 *(const char **) info = ct->ct_waitchan;
709 break;
710
711 case CLSET_INTERRUPTIBLE:
712 if (*(int *) info)
713 ct->ct_waitflag = PCATCH | PBDRY;
714 else
715 ct->ct_waitflag = 0;
716 break;
717
718 case CLGET_INTERRUPTIBLE:
719 if (ct->ct_waitflag)
720 *(int *) info = TRUE;
721 else
722 *(int *) info = FALSE;
723 break;
724
725 default:
726 mtx_unlock(&ct->ct_lock);
727 return (FALSE);
728 }
729
730 mtx_unlock(&ct->ct_lock);
731 return (TRUE);
732}
733
734static void
735clnt_vc_close(CLIENT *cl)
736{
737 struct ct_data *ct = (struct ct_data *) cl->cl_private;
738 struct ct_request *cr;
739
740 mtx_lock(&ct->ct_lock);
741
742 if (ct->ct_closed) {
743 mtx_unlock(&ct->ct_lock);
744 return;
745 }
746
747 if (ct->ct_closing) {
748 while (ct->ct_closing)
749 msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
750 KASSERT(ct->ct_closed, ("client should be closed"));
751 mtx_unlock(&ct->ct_lock);
752 return;
753 }
754
755 if (ct->ct_socket) {
756 ct->ct_closing = TRUE;
757 mtx_unlock(&ct->ct_lock);
758
759 SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
760 soupcall_clear(ct->ct_socket, SO_RCV);
761 clnt_vc_upcallsdone(ct);
762 SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
763
764 /*
765 * Abort any pending requests and wait until everyone
766 * has finished with clnt_vc_call.
767 */
768 mtx_lock(&ct->ct_lock);
769 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
770 cr->cr_xid = 0;
771 cr->cr_error = ESHUTDOWN;
772 wakeup(cr);
773 }
774
775 while (ct->ct_threads)
776 msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
777 }
778
779 ct->ct_closing = FALSE;
780 ct->ct_closed = TRUE;
781 mtx_unlock(&ct->ct_lock);
782 wakeup(ct);
783}
784
785static void
786clnt_vc_destroy(CLIENT *cl)
787{
788 struct ct_data *ct = (struct ct_data *) cl->cl_private;
789 struct socket *so = NULL;
790
791 clnt_vc_close(cl);
792
793 mtx_lock(&ct->ct_lock);
794
795 if (ct->ct_socket) {
796 if (ct->ct_closeit) {
797 so = ct->ct_socket;
798 }
799 }
800
801 mtx_unlock(&ct->ct_lock);
802
803 mtx_destroy(&ct->ct_lock);
804 if (so) {
805 soshutdown(so, SHUT_WR);
806 soclose(so);
807 }
808 mem_free(ct, sizeof(struct ct_data));
809 mem_free(cl, sizeof(CLIENT));
810}
811
812/*
813 * Make sure that the time is not garbage. -1 value is disallowed.
814 * Note this is different from time_not_ok in clnt_dg.c
815 */
816static bool_t
817time_not_ok(struct timeval *t)
818{
819 return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
820 t->tv_usec <= -1 || t->tv_usec > 1000000);
821}
822
823int
824clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
825{
826 struct ct_data *ct = (struct ct_data *) arg;
827 struct uio uio;
828 struct mbuf *m;
829 struct ct_request *cr;
830 int error, rcvflag, foundreq;
831 uint32_t xid, header;
832 bool_t do_read;
833
834 ct->ct_upcallrefs++;
835 uio.uio_td = curthread;
836 do {
837 /*
838 * If ct_record_resid is zero, we are waiting for a
839 * record mark.
840 */
841 if (ct->ct_record_resid == 0) {
842
843 /*
844 * Make sure there is either a whole record
845 * mark in the buffer or there is some other
846 * error condition
847 */
848 do_read = FALSE;
849 if (so->so_rcv.sb_cc >= sizeof(uint32_t)
850 || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
851 || so->so_error)
852 do_read = TRUE;
853
854 if (!do_read)
855 break;
856
857 SOCKBUF_UNLOCK(&so->so_rcv);
858 uio.uio_resid = sizeof(uint32_t);
859 m = NULL;
860 rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
861 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
862 SOCKBUF_LOCK(&so->so_rcv);
863
864 if (error == EWOULDBLOCK)
865 break;
866
867 /*
868 * If there was an error, wake up all pending
869 * requests.
870 */
871 if (error || uio.uio_resid > 0) {
872 wakeup_all:
873 mtx_lock(&ct->ct_lock);
874 if (!error) {
875 /*
876 * We must have got EOF trying
877 * to read from the stream.
878 */
879 error = ECONNRESET;
880 }
881 ct->ct_error.re_status = RPC_CANTRECV;
882 ct->ct_error.re_errno = error;
883 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
884 cr->cr_error = error;
885 wakeup(cr);
886 }
887 mtx_unlock(&ct->ct_lock);
888 break;
889 }
890 bcopy(mtod(m, uint32_t *), &header, sizeof(uint32_t));
891 header = ntohl(header);
892 ct->ct_record = NULL;
893 ct->ct_record_resid = header & 0x7fffffff;
894 ct->ct_record_eor = ((header & 0x80000000) != 0);
895 m_freem(m);
896 } else {
897 /*
898 * Wait until the socket has the whole record
899 * buffered.
900 */
901 do_read = FALSE;
902 if (so->so_rcv.sb_cc >= ct->ct_record_resid
903 || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
904 || so->so_error)
905 do_read = TRUE;
906
907 if (!do_read)
908 break;
909
910 /*
911 * We have the record mark. Read as much as
912 * the socket has buffered up to the end of
913 * this record.
914 */
915 SOCKBUF_UNLOCK(&so->so_rcv);
916 uio.uio_resid = ct->ct_record_resid;
917 m = NULL;
918 rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
919 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
920 SOCKBUF_LOCK(&so->so_rcv);
921
922 if (error == EWOULDBLOCK)
923 break;
924
925 if (error || uio.uio_resid == ct->ct_record_resid)
926 goto wakeup_all;
927
928 /*
929 * If we have part of the record already,
930 * chain this bit onto the end.
931 */
932 if (ct->ct_record)
933 m_last(ct->ct_record)->m_next = m;
934 else
935 ct->ct_record = m;
936
937 ct->ct_record_resid = uio.uio_resid;
938
939 /*
940 * If we have the entire record, see if we can
941 * match it to a request.
942 */
943 if (ct->ct_record_resid == 0
944 && ct->ct_record_eor) {
945 /*
946 * The XID is in the first uint32_t of
947 * the reply.
948 */
949 if (ct->ct_record->m_len < sizeof(xid))
950 ct->ct_record =
951 m_pullup(ct->ct_record,
952 sizeof(xid));
953 if (!ct->ct_record)
954 break;
955 bcopy(mtod(ct->ct_record, uint32_t *),
956 &xid, sizeof(uint32_t));
957 xid = ntohl(xid);
958
959 mtx_lock(&ct->ct_lock);
960 foundreq = 0;
961 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
962 if (cr->cr_xid == xid) {
963 /*
964 * This one
965 * matches. We leave
966 * the reply mbuf in
967 * cr->cr_mrep. Set
968 * the XID to zero so
969 * that we will ignore
970 * any duplicaed
971 * replies.
972 */
973 cr->cr_xid = 0;
974 cr->cr_mrep = ct->ct_record;
975 cr->cr_error = 0;
976 foundreq = 1;
977 wakeup(cr);
978 break;
979 }
980 }
981 mtx_unlock(&ct->ct_lock);
982
983 if (!foundreq)
984 m_freem(ct->ct_record);
985 ct->ct_record = NULL;
986 }
987 }
988 } while (m);
989 ct->ct_upcallrefs--;
990 if (ct->ct_upcallrefs < 0)
991 panic("rpcvc upcall refcnt");
992 if (ct->ct_upcallrefs == 0)
993 wakeup(&ct->ct_upcallrefs);
994 return (SU_OK);
995}
996
997/*
998 * Wait for all upcalls in progress to complete.
999 */
1000static void
1001clnt_vc_upcallsdone(struct ct_data *ct)
1002{
1003
1004 SOCKBUF_LOCK_ASSERT(&ct->ct_socket->so_rcv);
1005
1006 while (ct->ct_upcallrefs > 0)
1007 (void) msleep(&ct->ct_upcallrefs,
1008 SOCKBUF_MTX(&ct->ct_socket->so_rcv), 0, "rpcvcup", 0);
1009}
249
250 ct->ct_closeit = FALSE;
251
252 /*
253 * Set up private data struct
254 */
255 ct->ct_socket = so;
256 ct->ct_wait.tv_sec = -1;
257 ct->ct_wait.tv_usec = -1;
258 memcpy(&ct->ct_addr, raddr, raddr->sa_len);
259
260 /*
261 * Initialize call message
262 */
263 getmicrotime(&now);
264 ct->ct_xid = ((uint32_t)++disrupt) ^ __RPC_GETXID(&now);
265 call_msg.rm_xid = ct->ct_xid;
266 call_msg.rm_direction = CALL;
267 call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
268 call_msg.rm_call.cb_prog = (uint32_t)prog;
269 call_msg.rm_call.cb_vers = (uint32_t)vers;
270
271 /*
272 * pre-serialize the static part of the call msg and stash it away
273 */
274 xdrmem_create(&xdrs, ct->ct_mcallc, MCALL_MSG_SIZE,
275 XDR_ENCODE);
276 if (! xdr_callhdr(&xdrs, &call_msg)) {
277 if (ct->ct_closeit) {
278 soclose(ct->ct_socket);
279 }
280 goto err;
281 }
282 ct->ct_mpos = XDR_GETPOS(&xdrs);
283 XDR_DESTROY(&xdrs);
284 ct->ct_waitchan = "rpcrecv";
285 ct->ct_waitflag = 0;
286
287 /*
288 * Create a client handle which uses xdrrec for serialization
289 * and authnone for authentication.
290 */
291 cl->cl_refs = 1;
292 cl->cl_ops = &clnt_vc_ops;
293 cl->cl_private = ct;
294 cl->cl_auth = authnone_create();
295 sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
296 recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
297 soreserve(ct->ct_socket, sendsz, recvsz);
298
299 SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
300 soupcall_set(ct->ct_socket, SO_RCV, clnt_vc_soupcall, ct);
301 SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
302
303 ct->ct_record = NULL;
304 ct->ct_record_resid = 0;
305 TAILQ_INIT(&ct->ct_pending);
306 return (cl);
307
308err:
309 if (cl) {
310 if (ct) {
311 mtx_destroy(&ct->ct_lock);
312 mem_free(ct, sizeof (struct ct_data));
313 }
314 if (cl)
315 mem_free(cl, sizeof (CLIENT));
316 }
317 return ((CLIENT *)NULL);
318}
319
320static enum clnt_stat
321clnt_vc_call(
322 CLIENT *cl, /* client handle */
323 struct rpc_callextra *ext, /* call metadata */
324 rpcproc_t proc, /* procedure number */
325 struct mbuf *args, /* pointer to args */
326 struct mbuf **resultsp, /* pointer to results */
327 struct timeval utimeout)
328{
329 struct ct_data *ct = (struct ct_data *) cl->cl_private;
330 AUTH *auth;
331 struct rpc_err *errp;
332 enum clnt_stat stat;
333 XDR xdrs;
334 struct rpc_msg reply_msg;
335 bool_t ok;
336 int nrefreshes = 2; /* number of times to refresh cred */
337 struct timeval timeout;
338 uint32_t xid;
339 struct mbuf *mreq = NULL, *results;
340 struct ct_request *cr;
341 int error;
342
343 cr = malloc(sizeof(struct ct_request), M_RPC, M_WAITOK);
344
345 mtx_lock(&ct->ct_lock);
346
347 if (ct->ct_closing || ct->ct_closed) {
348 mtx_unlock(&ct->ct_lock);
349 free(cr, M_RPC);
350 return (RPC_CANTSEND);
351 }
352 ct->ct_threads++;
353
354 if (ext) {
355 auth = ext->rc_auth;
356 errp = &ext->rc_err;
357 } else {
358 auth = cl->cl_auth;
359 errp = &ct->ct_error;
360 }
361
362 cr->cr_mrep = NULL;
363 cr->cr_error = 0;
364
365 if (ct->ct_wait.tv_usec == -1) {
366 timeout = utimeout; /* use supplied timeout */
367 } else {
368 timeout = ct->ct_wait; /* use default timeout */
369 }
370
371call_again:
372 mtx_assert(&ct->ct_lock, MA_OWNED);
373
374 ct->ct_xid++;
375 xid = ct->ct_xid;
376
377 mtx_unlock(&ct->ct_lock);
378
379 /*
380 * Leave space to pre-pend the record mark.
381 */
382 MGETHDR(mreq, M_WAIT, MT_DATA);
383 mreq->m_data += sizeof(uint32_t);
384 KASSERT(ct->ct_mpos + sizeof(uint32_t) <= MHLEN,
385 ("RPC header too big"));
386 bcopy(ct->ct_mcallc, mreq->m_data, ct->ct_mpos);
387 mreq->m_len = ct->ct_mpos;
388
389 /*
390 * The XID is the first thing in the request.
391 */
392 *mtod(mreq, uint32_t *) = htonl(xid);
393
394 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
395
396 errp->re_status = stat = RPC_SUCCESS;
397
398 if ((! XDR_PUTINT32(&xdrs, &proc)) ||
399 (! AUTH_MARSHALL(auth, xid, &xdrs,
400 m_copym(args, 0, M_COPYALL, M_WAITOK)))) {
401 errp->re_status = stat = RPC_CANTENCODEARGS;
402 mtx_lock(&ct->ct_lock);
403 goto out;
404 }
405 mreq->m_pkthdr.len = m_length(mreq, NULL);
406
407 /*
408 * Prepend a record marker containing the packet length.
409 */
410 M_PREPEND(mreq, sizeof(uint32_t), M_WAIT);
411 *mtod(mreq, uint32_t *) =
412 htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t)));
413
414 cr->cr_xid = xid;
415 mtx_lock(&ct->ct_lock);
416 TAILQ_INSERT_TAIL(&ct->ct_pending, cr, cr_link);
417 mtx_unlock(&ct->ct_lock);
418
419 /*
420 * sosend consumes mreq.
421 */
422 error = sosend(ct->ct_socket, NULL, NULL, mreq, NULL, 0, curthread);
423 mreq = NULL;
424 if (error == EMSGSIZE) {
425 SOCKBUF_LOCK(&ct->ct_socket->so_snd);
426 sbwait(&ct->ct_socket->so_snd);
427 SOCKBUF_UNLOCK(&ct->ct_socket->so_snd);
428 AUTH_VALIDATE(auth, xid, NULL, NULL);
429 mtx_lock(&ct->ct_lock);
430 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
431 goto call_again;
432 }
433
434 reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL;
435 reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf;
436 reply_msg.acpted_rply.ar_verf.oa_length = 0;
437 reply_msg.acpted_rply.ar_results.where = NULL;
438 reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
439
440 mtx_lock(&ct->ct_lock);
441 if (error) {
442 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
443 errp->re_errno = error;
444 errp->re_status = stat = RPC_CANTSEND;
445 goto out;
446 }
447
448 /*
449 * Check to see if we got an upcall while waiting for the
450 * lock. In both these cases, the request has been removed
451 * from ct->ct_pending.
452 */
453 if (cr->cr_error) {
454 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
455 errp->re_errno = cr->cr_error;
456 errp->re_status = stat = RPC_CANTRECV;
457 goto out;
458 }
459 if (cr->cr_mrep) {
460 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
461 goto got_reply;
462 }
463
464 /*
465 * Hack to provide rpc-based message passing
466 */
467 if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
468 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
469 errp->re_status = stat = RPC_TIMEDOUT;
470 goto out;
471 }
472
473 error = msleep(cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan,
474 tvtohz(&timeout));
475
476 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
477
478 if (error) {
479 /*
480 * The sleep returned an error so our request is still
481 * on the list. Turn the error code into an
482 * appropriate client status.
483 */
484 errp->re_errno = error;
485 switch (error) {
486 case EINTR:
487 case ERESTART:
488 stat = RPC_INTR;
489 break;
490 case EWOULDBLOCK:
491 stat = RPC_TIMEDOUT;
492 break;
493 default:
494 stat = RPC_CANTRECV;
495 }
496 errp->re_status = stat;
497 goto out;
498 } else {
499 /*
500 * We were woken up by the upcall. If the
501 * upcall had a receive error, report that,
502 * otherwise we have a reply.
503 */
504 if (cr->cr_error) {
505 errp->re_errno = cr->cr_error;
506 errp->re_status = stat = RPC_CANTRECV;
507 goto out;
508 }
509 }
510
511got_reply:
512 /*
513 * Now decode and validate the response. We need to drop the
514 * lock since xdr_replymsg may end up sleeping in malloc.
515 */
516 mtx_unlock(&ct->ct_lock);
517
518 if (ext && ext->rc_feedback)
519 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);
520
521 xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
522 ok = xdr_replymsg(&xdrs, &reply_msg);
523 cr->cr_mrep = NULL;
524
525 if (ok) {
526 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
527 (reply_msg.acpted_rply.ar_stat == SUCCESS))
528 errp->re_status = stat = RPC_SUCCESS;
529 else
530 stat = _seterr_reply(&reply_msg, errp);
531
532 if (stat == RPC_SUCCESS) {
533 results = xdrmbuf_getall(&xdrs);
534 if (!AUTH_VALIDATE(auth, xid,
535 &reply_msg.acpted_rply.ar_verf,
536 &results)) {
537 errp->re_status = stat = RPC_AUTHERROR;
538 errp->re_why = AUTH_INVALIDRESP;
539 } else {
540 KASSERT(results,
541 ("auth validated but no result"));
542 *resultsp = results;
543 }
544 } /* end successful completion */
545 /*
546 * If unsuccesful AND error is an authentication error
547 * then refresh credentials and try again, else break
548 */
549 else if (stat == RPC_AUTHERROR)
550 /* maybe our credentials need to be refreshed ... */
551 if (nrefreshes > 0 &&
552 AUTH_REFRESH(auth, &reply_msg)) {
553 nrefreshes--;
554 XDR_DESTROY(&xdrs);
555 mtx_lock(&ct->ct_lock);
556 goto call_again;
557 }
558 /* end of unsuccessful completion */
559 } /* end of valid reply message */
560 else {
561 errp->re_status = stat = RPC_CANTDECODERES;
562 }
563 XDR_DESTROY(&xdrs);
564 mtx_lock(&ct->ct_lock);
565out:
566 mtx_assert(&ct->ct_lock, MA_OWNED);
567
568 KASSERT(stat != RPC_SUCCESS || *resultsp,
569 ("RPC_SUCCESS without reply"));
570
571 if (mreq)
572 m_freem(mreq);
573 if (cr->cr_mrep)
574 m_freem(cr->cr_mrep);
575
576 ct->ct_threads--;
577 if (ct->ct_closing)
578 wakeup(ct);
579
580 mtx_unlock(&ct->ct_lock);
581
582 if (auth && stat != RPC_SUCCESS)
583 AUTH_VALIDATE(auth, xid, NULL, NULL);
584
585 free(cr, M_RPC);
586
587 return (stat);
588}
589
590static void
591clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
592{
593 struct ct_data *ct = (struct ct_data *) cl->cl_private;
594
595 *errp = ct->ct_error;
596}
597
598static bool_t
599clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
600{
601 XDR xdrs;
602 bool_t dummy;
603
604 xdrs.x_op = XDR_FREE;
605 dummy = (*xdr_res)(&xdrs, res_ptr);
606
607 return (dummy);
608}
609
610/*ARGSUSED*/
611static void
612clnt_vc_abort(CLIENT *cl)
613{
614}
615
616static bool_t
617clnt_vc_control(CLIENT *cl, u_int request, void *info)
618{
619 struct ct_data *ct = (struct ct_data *)cl->cl_private;
620 void *infop = info;
621
622 mtx_lock(&ct->ct_lock);
623
624 switch (request) {
625 case CLSET_FD_CLOSE:
626 ct->ct_closeit = TRUE;
627 mtx_unlock(&ct->ct_lock);
628 return (TRUE);
629 case CLSET_FD_NCLOSE:
630 ct->ct_closeit = FALSE;
631 mtx_unlock(&ct->ct_lock);
632 return (TRUE);
633 default:
634 break;
635 }
636
637 /* for other requests which use info */
638 if (info == NULL) {
639 mtx_unlock(&ct->ct_lock);
640 return (FALSE);
641 }
642 switch (request) {
643 case CLSET_TIMEOUT:
644 if (time_not_ok((struct timeval *)info)) {
645 mtx_unlock(&ct->ct_lock);
646 return (FALSE);
647 }
648 ct->ct_wait = *(struct timeval *)infop;
649 break;
650 case CLGET_TIMEOUT:
651 *(struct timeval *)infop = ct->ct_wait;
652 break;
653 case CLGET_SERVER_ADDR:
654 (void) memcpy(info, &ct->ct_addr, (size_t)ct->ct_addr.ss_len);
655 break;
656 case CLGET_SVC_ADDR:
657 /*
658 * Slightly different semantics to userland - we use
659 * sockaddr instead of netbuf.
660 */
661 memcpy(info, &ct->ct_addr, ct->ct_addr.ss_len);
662 break;
663 case CLSET_SVC_ADDR: /* set to new address */
664 mtx_unlock(&ct->ct_lock);
665 return (FALSE);
666 case CLGET_XID:
667 *(uint32_t *)info = ct->ct_xid;
668 break;
669 case CLSET_XID:
670 /* This will set the xid of the NEXT call */
671 /* decrement by 1 as clnt_vc_call() increments once */
672 ct->ct_xid = *(uint32_t *)info - 1;
673 break;
674 case CLGET_VERS:
675 /*
676 * This RELIES on the information that, in the call body,
677 * the version number field is the fifth field from the
678 * begining of the RPC header. MUST be changed if the
679 * call_struct is changed
680 */
681 *(uint32_t *)info =
682 ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
683 4 * BYTES_PER_XDR_UNIT));
684 break;
685
686 case CLSET_VERS:
687 *(uint32_t *)(void *)(ct->ct_mcallc +
688 4 * BYTES_PER_XDR_UNIT) =
689 htonl(*(uint32_t *)info);
690 break;
691
692 case CLGET_PROG:
693 /*
694 * This RELIES on the information that, in the call body,
695 * the program number field is the fourth field from the
696 * begining of the RPC header. MUST be changed if the
697 * call_struct is changed
698 */
699 *(uint32_t *)info =
700 ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
701 3 * BYTES_PER_XDR_UNIT));
702 break;
703
704 case CLSET_PROG:
705 *(uint32_t *)(void *)(ct->ct_mcallc +
706 3 * BYTES_PER_XDR_UNIT) =
707 htonl(*(uint32_t *)info);
708 break;
709
710 case CLSET_WAITCHAN:
711 ct->ct_waitchan = (const char *)info;
712 break;
713
714 case CLGET_WAITCHAN:
715 *(const char **) info = ct->ct_waitchan;
716 break;
717
718 case CLSET_INTERRUPTIBLE:
719 if (*(int *) info)
720 ct->ct_waitflag = PCATCH | PBDRY;
721 else
722 ct->ct_waitflag = 0;
723 break;
724
725 case CLGET_INTERRUPTIBLE:
726 if (ct->ct_waitflag)
727 *(int *) info = TRUE;
728 else
729 *(int *) info = FALSE;
730 break;
731
732 default:
733 mtx_unlock(&ct->ct_lock);
734 return (FALSE);
735 }
736
737 mtx_unlock(&ct->ct_lock);
738 return (TRUE);
739}
740
741static void
742clnt_vc_close(CLIENT *cl)
743{
744 struct ct_data *ct = (struct ct_data *) cl->cl_private;
745 struct ct_request *cr;
746
747 mtx_lock(&ct->ct_lock);
748
749 if (ct->ct_closed) {
750 mtx_unlock(&ct->ct_lock);
751 return;
752 }
753
754 if (ct->ct_closing) {
755 while (ct->ct_closing)
756 msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
757 KASSERT(ct->ct_closed, ("client should be closed"));
758 mtx_unlock(&ct->ct_lock);
759 return;
760 }
761
762 if (ct->ct_socket) {
763 ct->ct_closing = TRUE;
764 mtx_unlock(&ct->ct_lock);
765
766 SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
767 soupcall_clear(ct->ct_socket, SO_RCV);
768 clnt_vc_upcallsdone(ct);
769 SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
770
771 /*
772 * Abort any pending requests and wait until everyone
773 * has finished with clnt_vc_call.
774 */
775 mtx_lock(&ct->ct_lock);
776 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
777 cr->cr_xid = 0;
778 cr->cr_error = ESHUTDOWN;
779 wakeup(cr);
780 }
781
782 while (ct->ct_threads)
783 msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
784 }
785
786 ct->ct_closing = FALSE;
787 ct->ct_closed = TRUE;
788 mtx_unlock(&ct->ct_lock);
789 wakeup(ct);
790}
791
792static void
793clnt_vc_destroy(CLIENT *cl)
794{
795 struct ct_data *ct = (struct ct_data *) cl->cl_private;
796 struct socket *so = NULL;
797
798 clnt_vc_close(cl);
799
800 mtx_lock(&ct->ct_lock);
801
802 if (ct->ct_socket) {
803 if (ct->ct_closeit) {
804 so = ct->ct_socket;
805 }
806 }
807
808 mtx_unlock(&ct->ct_lock);
809
810 mtx_destroy(&ct->ct_lock);
811 if (so) {
812 soshutdown(so, SHUT_WR);
813 soclose(so);
814 }
815 mem_free(ct, sizeof(struct ct_data));
816 mem_free(cl, sizeof(CLIENT));
817}
818
819/*
820 * Make sure that the time is not garbage. -1 value is disallowed.
821 * Note this is different from time_not_ok in clnt_dg.c
822 */
823static bool_t
824time_not_ok(struct timeval *t)
825{
826 return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
827 t->tv_usec <= -1 || t->tv_usec > 1000000);
828}
829
830int
831clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
832{
833 struct ct_data *ct = (struct ct_data *) arg;
834 struct uio uio;
835 struct mbuf *m;
836 struct ct_request *cr;
837 int error, rcvflag, foundreq;
838 uint32_t xid, header;
839 bool_t do_read;
840
841 ct->ct_upcallrefs++;
842 uio.uio_td = curthread;
843 do {
844 /*
845 * If ct_record_resid is zero, we are waiting for a
846 * record mark.
847 */
848 if (ct->ct_record_resid == 0) {
849
850 /*
851 * Make sure there is either a whole record
852 * mark in the buffer or there is some other
853 * error condition
854 */
855 do_read = FALSE;
856 if (so->so_rcv.sb_cc >= sizeof(uint32_t)
857 || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
858 || so->so_error)
859 do_read = TRUE;
860
861 if (!do_read)
862 break;
863
864 SOCKBUF_UNLOCK(&so->so_rcv);
865 uio.uio_resid = sizeof(uint32_t);
866 m = NULL;
867 rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
868 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
869 SOCKBUF_LOCK(&so->so_rcv);
870
871 if (error == EWOULDBLOCK)
872 break;
873
874 /*
875 * If there was an error, wake up all pending
876 * requests.
877 */
878 if (error || uio.uio_resid > 0) {
879 wakeup_all:
880 mtx_lock(&ct->ct_lock);
881 if (!error) {
882 /*
883 * We must have got EOF trying
884 * to read from the stream.
885 */
886 error = ECONNRESET;
887 }
888 ct->ct_error.re_status = RPC_CANTRECV;
889 ct->ct_error.re_errno = error;
890 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
891 cr->cr_error = error;
892 wakeup(cr);
893 }
894 mtx_unlock(&ct->ct_lock);
895 break;
896 }
897 bcopy(mtod(m, uint32_t *), &header, sizeof(uint32_t));
898 header = ntohl(header);
899 ct->ct_record = NULL;
900 ct->ct_record_resid = header & 0x7fffffff;
901 ct->ct_record_eor = ((header & 0x80000000) != 0);
902 m_freem(m);
903 } else {
904 /*
905 * Wait until the socket has the whole record
906 * buffered.
907 */
908 do_read = FALSE;
909 if (so->so_rcv.sb_cc >= ct->ct_record_resid
910 || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
911 || so->so_error)
912 do_read = TRUE;
913
914 if (!do_read)
915 break;
916
917 /*
918 * We have the record mark. Read as much as
919 * the socket has buffered up to the end of
920 * this record.
921 */
922 SOCKBUF_UNLOCK(&so->so_rcv);
923 uio.uio_resid = ct->ct_record_resid;
924 m = NULL;
925 rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
926 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
927 SOCKBUF_LOCK(&so->so_rcv);
928
929 if (error == EWOULDBLOCK)
930 break;
931
932 if (error || uio.uio_resid == ct->ct_record_resid)
933 goto wakeup_all;
934
935 /*
936 * If we have part of the record already,
937 * chain this bit onto the end.
938 */
939 if (ct->ct_record)
940 m_last(ct->ct_record)->m_next = m;
941 else
942 ct->ct_record = m;
943
944 ct->ct_record_resid = uio.uio_resid;
945
946 /*
947 * If we have the entire record, see if we can
948 * match it to a request.
949 */
950 if (ct->ct_record_resid == 0
951 && ct->ct_record_eor) {
952 /*
953 * The XID is in the first uint32_t of
954 * the reply.
955 */
956 if (ct->ct_record->m_len < sizeof(xid))
957 ct->ct_record =
958 m_pullup(ct->ct_record,
959 sizeof(xid));
960 if (!ct->ct_record)
961 break;
962 bcopy(mtod(ct->ct_record, uint32_t *),
963 &xid, sizeof(uint32_t));
964 xid = ntohl(xid);
965
966 mtx_lock(&ct->ct_lock);
967 foundreq = 0;
968 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
969 if (cr->cr_xid == xid) {
970 /*
971 * This one
972 * matches. We leave
973 * the reply mbuf in
974 * cr->cr_mrep. Set
975 * the XID to zero so
976 * that we will ignore
977 * any duplicaed
978 * replies.
979 */
980 cr->cr_xid = 0;
981 cr->cr_mrep = ct->ct_record;
982 cr->cr_error = 0;
983 foundreq = 1;
984 wakeup(cr);
985 break;
986 }
987 }
988 mtx_unlock(&ct->ct_lock);
989
990 if (!foundreq)
991 m_freem(ct->ct_record);
992 ct->ct_record = NULL;
993 }
994 }
995 } while (m);
996 ct->ct_upcallrefs--;
997 if (ct->ct_upcallrefs < 0)
998 panic("rpcvc upcall refcnt");
999 if (ct->ct_upcallrefs == 0)
1000 wakeup(&ct->ct_upcallrefs);
1001 return (SU_OK);
1002}
1003
1004/*
1005 * Wait for all upcalls in progress to complete.
1006 */
1007static void
1008clnt_vc_upcallsdone(struct ct_data *ct)
1009{
1010
1011 SOCKBUF_LOCK_ASSERT(&ct->ct_socket->so_rcv);
1012
1013 while (ct->ct_upcallrefs > 0)
1014 (void) msleep(&ct->ct_upcallrefs,
1015 SOCKBUF_MTX(&ct->ct_socket->so_rcv), 0, "rpcvcup", 0);
1016}