Deleted Added
full compact
clnt_dg.c (195245) clnt_dg.c (196503)
1/* $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */
2
3/*
4 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
5 * unrestricted use provided that this legend is included on all tape
6 * media and as a part of the software program in whole or part. Users
7 * may copy or modify Sun RPC without charge, but are not authorized
8 * to license or distribute it to anyone else except as part of a product or
9 * program developed by the user.
10 *
11 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
12 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
13 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
14 *
15 * Sun RPC is provided with no support and without any obligation on the
16 * part of Sun Microsystems, Inc. to assist in its use, correction,
17 * modification or enhancement.
18 *
19 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
20 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
21 * OR ANY PART THEREOF.
22 *
23 * In no event will Sun Microsystems, Inc. be liable for any lost revenue
24 * or profits or other special, indirect and consequential damages, even if
25 * Sun has been advised of the possibility of such damages.
26 *
27 * Sun Microsystems, Inc.
28 * 2550 Garcia Avenue
29 * Mountain View, California 94043
30 */
31/*
32 * Copyright (c) 1986-1991 by Sun Microsystems Inc.
33 */
34
35#if defined(LIBC_SCCS) && !defined(lint)
36#ident "@(#)clnt_dg.c 1.23 94/04/22 SMI"
37static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro";
38#endif
39#include <sys/cdefs.h>
1/* $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */
2
3/*
4 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
5 * unrestricted use provided that this legend is included on all tape
6 * media and as a part of the software program in whole or part. Users
7 * may copy or modify Sun RPC without charge, but are not authorized
8 * to license or distribute it to anyone else except as part of a product or
9 * program developed by the user.
10 *
11 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
12 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
13 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
14 *
15 * Sun RPC is provided with no support and without any obligation on the
16 * part of Sun Microsystems, Inc. to assist in its use, correction,
17 * modification or enhancement.
18 *
19 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
20 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
21 * OR ANY PART THEREOF.
22 *
23 * In no event will Sun Microsystems, Inc. be liable for any lost revenue
24 * or profits or other special, indirect and consequential damages, even if
25 * Sun has been advised of the possibility of such damages.
26 *
27 * Sun Microsystems, Inc.
28 * 2550 Garcia Avenue
29 * Mountain View, California 94043
30 */
31/*
32 * Copyright (c) 1986-1991 by Sun Microsystems Inc.
33 */
34
35#if defined(LIBC_SCCS) && !defined(lint)
36#ident "@(#)clnt_dg.c 1.23 94/04/22 SMI"
37static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro";
38#endif
39#include <sys/cdefs.h>
40__FBSDID("$FreeBSD: head/sys/rpc/clnt_dg.c 195245 2009-07-01 16:38:18Z rmacklem $");
40__FBSDID("$FreeBSD: head/sys/rpc/clnt_dg.c 196503 2009-08-24 10:09:30Z zec $");
41
42/*
43 * Implements a connectionless client side RPC.
44 */
45
46#include <sys/param.h>
47#include <sys/systm.h>
48#include <sys/kernel.h>
49#include <sys/lock.h>
50#include <sys/malloc.h>
51#include <sys/mbuf.h>
52#include <sys/mutex.h>
53#include <sys/pcpu.h>
54#include <sys/proc.h>
55#include <sys/socket.h>
56#include <sys/socketvar.h>
57#include <sys/time.h>
58#include <sys/uio.h>
59
41
42/*
43 * Implements a connectionless client side RPC.
44 */
45
46#include <sys/param.h>
47#include <sys/systm.h>
48#include <sys/kernel.h>
49#include <sys/lock.h>
50#include <sys/malloc.h>
51#include <sys/mbuf.h>
52#include <sys/mutex.h>
53#include <sys/pcpu.h>
54#include <sys/proc.h>
55#include <sys/socket.h>
56#include <sys/socketvar.h>
57#include <sys/time.h>
58#include <sys/uio.h>
59
60#include <net/vnet.h>
61
60#include <rpc/rpc.h>
61#include <rpc/rpc_com.h>
62
63
64#ifdef _FREEFALL_CONFIG
65/*
66 * Disable RPC exponential back-off for FreeBSD.org systems.
67 */
68#define RPC_MAX_BACKOFF 1 /* second */
69#else
70#define RPC_MAX_BACKOFF 30 /* seconds */
71#endif
72
73static bool_t time_not_ok(struct timeval *);
74static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *,
75 rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
76static void clnt_dg_geterr(CLIENT *, struct rpc_err *);
77static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *);
78static void clnt_dg_abort(CLIENT *);
79static bool_t clnt_dg_control(CLIENT *, u_int, void *);
80static void clnt_dg_close(CLIENT *);
81static void clnt_dg_destroy(CLIENT *);
82static int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag);
83
84static struct clnt_ops clnt_dg_ops = {
85 .cl_call = clnt_dg_call,
86 .cl_abort = clnt_dg_abort,
87 .cl_geterr = clnt_dg_geterr,
88 .cl_freeres = clnt_dg_freeres,
89 .cl_close = clnt_dg_close,
90 .cl_destroy = clnt_dg_destroy,
91 .cl_control = clnt_dg_control
92};
93
94static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory";
95
96/*
97 * A pending RPC request which awaits a reply. Requests which have
98 * received their reply will have cr_xid set to zero and cr_mrep to
99 * the mbuf chain of the reply.
100 */
101struct cu_request {
102 TAILQ_ENTRY(cu_request) cr_link;
103 CLIENT *cr_client; /* owner */
104 uint32_t cr_xid; /* XID of request */
105 struct mbuf *cr_mrep; /* reply received by upcall */
106 int cr_error; /* any error from upcall */
107 char cr_verf[MAX_AUTH_BYTES]; /* reply verf */
108};
109
110TAILQ_HEAD(cu_request_list, cu_request);
111
112#define MCALL_MSG_SIZE 24
113
114/*
115 * This structure is pointed to by the socket buffer's sb_upcallarg
116 * member. It is separate from the client private data to facilitate
117 * multiple clients sharing the same socket. The cs_lock mutex is used
118 * to protect all fields of this structure, the socket's receive
119 * buffer SOCKBUF_LOCK is used to ensure that exactly one of these
120 * structures is installed on the socket.
121 */
122struct cu_socket {
123 struct mtx cs_lock;
124 int cs_refs; /* Count of clients */
125 struct cu_request_list cs_pending; /* Requests awaiting replies */
126 int cs_upcallrefs; /* Refcnt of upcalls in prog.*/
127};
128
129static void clnt_dg_upcallsdone(struct socket *, struct cu_socket *);
130
131/*
132 * Private data kept per client handle
133 */
134struct cu_data {
135 int cu_threads; /* # threads in clnt_vc_call */
136 bool_t cu_closing; /* TRUE if we are closing */
137 bool_t cu_closed; /* TRUE if we are closed */
138 struct socket *cu_socket; /* connection socket */
139 bool_t cu_closeit; /* opened by library */
140 struct sockaddr_storage cu_raddr; /* remote address */
141 int cu_rlen;
142 struct timeval cu_wait; /* retransmit interval */
143 struct timeval cu_total; /* total time for the call */
144 struct rpc_err cu_error;
145 uint32_t cu_xid;
146 char cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
147 size_t cu_mcalllen;
148 size_t cu_sendsz; /* send size */
149 size_t cu_recvsz; /* recv size */
150 int cu_async;
151 int cu_connect; /* Use connect(). */
152 int cu_connected; /* Have done connect(). */
153 const char *cu_waitchan;
154 int cu_waitflag;
155 int cu_cwnd; /* congestion window */
156 int cu_sent; /* number of in-flight RPCs */
157 bool_t cu_cwnd_wait;
158};
159
160#define CWNDSCALE 256
161#define MAXCWND (32 * CWNDSCALE)
162
163/*
164 * Connection less client creation returns with client handle parameters.
165 * Default options are set, which the user can change using clnt_control().
166 * fd should be open and bound.
167 * NB: The rpch->cl_auth is initialized to null authentication.
168 * Caller may wish to set this something more useful.
169 *
170 * sendsz and recvsz are the maximum allowable packet sizes that can be
171 * sent and received. Normally they are the same, but they can be
172 * changed to improve the program efficiency and buffer allocation.
173 * If they are 0, use the transport default.
174 *
175 * If svcaddr is NULL, returns NULL.
176 */
177CLIENT *
178clnt_dg_create(
179 struct socket *so,
180 struct sockaddr *svcaddr, /* servers address */
181 rpcprog_t program, /* program number */
182 rpcvers_t version, /* version number */
183 size_t sendsz, /* buffer recv size */
184 size_t recvsz) /* buffer send size */
185{
186 CLIENT *cl = NULL; /* client handle */
187 struct cu_data *cu = NULL; /* private data */
188 struct cu_socket *cs = NULL;
189 struct sockbuf *sb;
190 struct timeval now;
191 struct rpc_msg call_msg;
192 struct __rpc_sockinfo si;
193 XDR xdrs;
194
195 if (svcaddr == NULL) {
196 rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
197 return (NULL);
198 }
199
62#include <rpc/rpc.h>
63#include <rpc/rpc_com.h>
64
65
66#ifdef _FREEFALL_CONFIG
67/*
68 * Disable RPC exponential back-off for FreeBSD.org systems.
69 */
70#define RPC_MAX_BACKOFF 1 /* second */
71#else
72#define RPC_MAX_BACKOFF 30 /* seconds */
73#endif
74
75static bool_t time_not_ok(struct timeval *);
76static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *,
77 rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
78static void clnt_dg_geterr(CLIENT *, struct rpc_err *);
79static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *);
80static void clnt_dg_abort(CLIENT *);
81static bool_t clnt_dg_control(CLIENT *, u_int, void *);
82static void clnt_dg_close(CLIENT *);
83static void clnt_dg_destroy(CLIENT *);
84static int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag);
85
86static struct clnt_ops clnt_dg_ops = {
87 .cl_call = clnt_dg_call,
88 .cl_abort = clnt_dg_abort,
89 .cl_geterr = clnt_dg_geterr,
90 .cl_freeres = clnt_dg_freeres,
91 .cl_close = clnt_dg_close,
92 .cl_destroy = clnt_dg_destroy,
93 .cl_control = clnt_dg_control
94};
95
96static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory";
97
98/*
99 * A pending RPC request which awaits a reply. Requests which have
100 * received their reply will have cr_xid set to zero and cr_mrep to
101 * the mbuf chain of the reply.
102 */
103struct cu_request {
104 TAILQ_ENTRY(cu_request) cr_link;
105 CLIENT *cr_client; /* owner */
106 uint32_t cr_xid; /* XID of request */
107 struct mbuf *cr_mrep; /* reply received by upcall */
108 int cr_error; /* any error from upcall */
109 char cr_verf[MAX_AUTH_BYTES]; /* reply verf */
110};
111
112TAILQ_HEAD(cu_request_list, cu_request);
113
114#define MCALL_MSG_SIZE 24
115
116/*
117 * This structure is pointed to by the socket buffer's sb_upcallarg
118 * member. It is separate from the client private data to facilitate
119 * multiple clients sharing the same socket. The cs_lock mutex is used
120 * to protect all fields of this structure, the socket's receive
121 * buffer SOCKBUF_LOCK is used to ensure that exactly one of these
122 * structures is installed on the socket.
123 */
124struct cu_socket {
125 struct mtx cs_lock;
126 int cs_refs; /* Count of clients */
127 struct cu_request_list cs_pending; /* Requests awaiting replies */
128 int cs_upcallrefs; /* Refcnt of upcalls in prog.*/
129};
130
131static void clnt_dg_upcallsdone(struct socket *, struct cu_socket *);
132
133/*
134 * Private data kept per client handle
135 */
136struct cu_data {
137 int cu_threads; /* # threads in clnt_vc_call */
138 bool_t cu_closing; /* TRUE if we are closing */
139 bool_t cu_closed; /* TRUE if we are closed */
140 struct socket *cu_socket; /* connection socket */
141 bool_t cu_closeit; /* opened by library */
142 struct sockaddr_storage cu_raddr; /* remote address */
143 int cu_rlen;
144 struct timeval cu_wait; /* retransmit interval */
145 struct timeval cu_total; /* total time for the call */
146 struct rpc_err cu_error;
147 uint32_t cu_xid;
148 char cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
149 size_t cu_mcalllen;
150 size_t cu_sendsz; /* send size */
151 size_t cu_recvsz; /* recv size */
152 int cu_async;
153 int cu_connect; /* Use connect(). */
154 int cu_connected; /* Have done connect(). */
155 const char *cu_waitchan;
156 int cu_waitflag;
157 int cu_cwnd; /* congestion window */
158 int cu_sent; /* number of in-flight RPCs */
159 bool_t cu_cwnd_wait;
160};
161
162#define CWNDSCALE 256
163#define MAXCWND (32 * CWNDSCALE)
164
165/*
166 * Connection less client creation returns with client handle parameters.
167 * Default options are set, which the user can change using clnt_control().
168 * fd should be open and bound.
169 * NB: The rpch->cl_auth is initialized to null authentication.
170 * Caller may wish to set this something more useful.
171 *
172 * sendsz and recvsz are the maximum allowable packet sizes that can be
173 * sent and received. Normally they are the same, but they can be
174 * changed to improve the program efficiency and buffer allocation.
175 * If they are 0, use the transport default.
176 *
177 * If svcaddr is NULL, returns NULL.
178 */
179CLIENT *
180clnt_dg_create(
181 struct socket *so,
182 struct sockaddr *svcaddr, /* servers address */
183 rpcprog_t program, /* program number */
184 rpcvers_t version, /* version number */
185 size_t sendsz, /* buffer recv size */
186 size_t recvsz) /* buffer send size */
187{
188 CLIENT *cl = NULL; /* client handle */
189 struct cu_data *cu = NULL; /* private data */
190 struct cu_socket *cs = NULL;
191 struct sockbuf *sb;
192 struct timeval now;
193 struct rpc_msg call_msg;
194 struct __rpc_sockinfo si;
195 XDR xdrs;
196
197 if (svcaddr == NULL) {
198 rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
199 return (NULL);
200 }
201
202 CURVNET_SET(so->so_vnet);
200 if (!__rpc_socket2sockinfo(so, &si)) {
201 rpc_createerr.cf_stat = RPC_TLIERROR;
202 rpc_createerr.cf_error.re_errno = 0;
203 if (!__rpc_socket2sockinfo(so, &si)) {
204 rpc_createerr.cf_stat = RPC_TLIERROR;
205 rpc_createerr.cf_error.re_errno = 0;
206 CURVNET_RESTORE();
203 return (NULL);
204 }
207 return (NULL);
208 }
209 CURVNET_RESTORE();
205
206 /*
207 * Find the receive and the send size
208 */
209 sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
210 recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
211 if ((sendsz == 0) || (recvsz == 0)) {
212 rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */
213 rpc_createerr.cf_error.re_errno = 0;
214 return (NULL);
215 }
216
217 cl = mem_alloc(sizeof (CLIENT));
218
219 /*
220 * Should be multiple of 4 for XDR.
221 */
222 sendsz = ((sendsz + 3) / 4) * 4;
223 recvsz = ((recvsz + 3) / 4) * 4;
224 cu = mem_alloc(sizeof (*cu));
225 cu->cu_threads = 0;
226 cu->cu_closing = FALSE;
227 cu->cu_closed = FALSE;
228 (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len);
229 cu->cu_rlen = svcaddr->sa_len;
230 /* Other values can also be set through clnt_control() */
231 cu->cu_wait.tv_sec = 3; /* heuristically chosen */
232 cu->cu_wait.tv_usec = 0;
233 cu->cu_total.tv_sec = -1;
234 cu->cu_total.tv_usec = -1;
235 cu->cu_sendsz = sendsz;
236 cu->cu_recvsz = recvsz;
237 cu->cu_async = FALSE;
238 cu->cu_connect = FALSE;
239 cu->cu_connected = FALSE;
240 cu->cu_waitchan = "rpcrecv";
241 cu->cu_waitflag = 0;
242 cu->cu_cwnd = MAXCWND / 2;
243 cu->cu_sent = 0;
244 cu->cu_cwnd_wait = FALSE;
245 (void) getmicrotime(&now);
246 cu->cu_xid = __RPC_GETXID(&now);
247 call_msg.rm_xid = cu->cu_xid;
248 call_msg.rm_call.cb_prog = program;
249 call_msg.rm_call.cb_vers = version;
250 xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE);
251 if (! xdr_callhdr(&xdrs, &call_msg)) {
252 rpc_createerr.cf_stat = RPC_CANTENCODEARGS; /* XXX */
253 rpc_createerr.cf_error.re_errno = 0;
254 goto err2;
255 }
256 cu->cu_mcalllen = XDR_GETPOS(&xdrs);;
257
258 /*
259 * By default, closeit is always FALSE. It is users responsibility
260 * to do a close on it, else the user may use clnt_control
261 * to let clnt_destroy do it for him/her.
262 */
263 cu->cu_closeit = FALSE;
264 cu->cu_socket = so;
265 soreserve(so, 256*1024, 256*1024);
266
267 sb = &so->so_rcv;
268 SOCKBUF_LOCK(&so->so_rcv);
269recheck_socket:
270 if (sb->sb_upcall) {
271 if (sb->sb_upcall != clnt_dg_soupcall) {
272 SOCKBUF_UNLOCK(&so->so_rcv);
273 printf("clnt_dg_create(): socket already has an incompatible upcall\n");
274 goto err2;
275 }
276 cs = (struct cu_socket *) sb->sb_upcallarg;
277 mtx_lock(&cs->cs_lock);
278 cs->cs_refs++;
279 mtx_unlock(&cs->cs_lock);
280 } else {
281 /*
282 * We are the first on this socket - allocate the
283 * structure and install it in the socket.
284 */
285 SOCKBUF_UNLOCK(&so->so_rcv);
286 cs = mem_alloc(sizeof(*cs));
287 SOCKBUF_LOCK(&so->so_rcv);
288 if (sb->sb_upcall) {
289 /*
290 * We have lost a race with some other client.
291 */
292 mem_free(cs, sizeof(*cs));
293 goto recheck_socket;
294 }
295 mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF);
296 cs->cs_refs = 1;
297 cs->cs_upcallrefs = 0;
298 TAILQ_INIT(&cs->cs_pending);
299 soupcall_set(so, SO_RCV, clnt_dg_soupcall, cs);
300 }
301 SOCKBUF_UNLOCK(&so->so_rcv);
302
303 cl->cl_refs = 1;
304 cl->cl_ops = &clnt_dg_ops;
305 cl->cl_private = (caddr_t)(void *)cu;
306 cl->cl_auth = authnone_create();
307 cl->cl_tp = NULL;
308 cl->cl_netid = NULL;
309 return (cl);
310err2:
311 if (cl) {
312 mem_free(cl, sizeof (CLIENT));
313 if (cu)
314 mem_free(cu, sizeof (*cu));
315 }
316 return (NULL);
317}
318
319static enum clnt_stat
320clnt_dg_call(
321 CLIENT *cl, /* client handle */
322 struct rpc_callextra *ext, /* call metadata */
323 rpcproc_t proc, /* procedure number */
324 struct mbuf *args, /* pointer to args */
325 struct mbuf **resultsp, /* pointer to results */
326 struct timeval utimeout) /* seconds to wait before giving up */
327{
328 struct cu_data *cu = (struct cu_data *)cl->cl_private;
329 struct cu_socket *cs;
330 struct rpc_timers *rt;
331 AUTH *auth;
332 struct rpc_err *errp;
333 enum clnt_stat stat;
334 XDR xdrs;
335 struct rpc_msg reply_msg;
336 bool_t ok;
337 int retrans; /* number of re-transmits so far */
338 int nrefreshes = 2; /* number of times to refresh cred */
339 struct timeval *tvp;
340 int timeout;
341 int retransmit_time;
342 int next_sendtime, starttime, rtt, time_waited, tv = 0;
343 struct sockaddr *sa;
344 socklen_t salen;
345 uint32_t xid = 0;
346 struct mbuf *mreq = NULL, *results;
347 struct cu_request *cr;
348 int error;
349
350 cs = cu->cu_socket->so_rcv.sb_upcallarg;
351 cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK);
352
353 mtx_lock(&cs->cs_lock);
354
355 if (cu->cu_closing || cu->cu_closed) {
356 mtx_unlock(&cs->cs_lock);
357 free(cr, M_RPC);
358 return (RPC_CANTSEND);
359 }
360 cu->cu_threads++;
361
362 if (ext) {
363 auth = ext->rc_auth;
364 errp = &ext->rc_err;
365 } else {
366 auth = cl->cl_auth;
367 errp = &cu->cu_error;
368 }
369
370 cr->cr_client = cl;
371 cr->cr_mrep = NULL;
372 cr->cr_error = 0;
373
374 if (cu->cu_total.tv_usec == -1) {
375 tvp = &utimeout; /* use supplied timeout */
376 } else {
377 tvp = &cu->cu_total; /* use default timeout */
378 }
379 if (tvp->tv_sec || tvp->tv_usec)
380 timeout = tvtohz(tvp);
381 else
382 timeout = 0;
383
384 if (cu->cu_connect && !cu->cu_connected) {
385 mtx_unlock(&cs->cs_lock);
386 error = soconnect(cu->cu_socket,
387 (struct sockaddr *)&cu->cu_raddr, curthread);
388 mtx_lock(&cs->cs_lock);
389 if (error) {
390 errp->re_errno = error;
391 errp->re_status = stat = RPC_CANTSEND;
392 goto out;
393 }
394 cu->cu_connected = 1;
395 }
396 if (cu->cu_connected) {
397 sa = NULL;
398 salen = 0;
399 } else {
400 sa = (struct sockaddr *)&cu->cu_raddr;
401 salen = cu->cu_rlen;
402 }
403 time_waited = 0;
404 retrans = 0;
405 if (ext && ext->rc_timers) {
406 rt = ext->rc_timers;
407 if (!rt->rt_rtxcur)
408 rt->rt_rtxcur = tvtohz(&cu->cu_wait);
409 retransmit_time = next_sendtime = rt->rt_rtxcur;
410 } else {
411 rt = NULL;
412 retransmit_time = next_sendtime = tvtohz(&cu->cu_wait);
413 }
414
415 starttime = ticks;
416
417call_again:
418 mtx_assert(&cs->cs_lock, MA_OWNED);
419
420 cu->cu_xid++;
421 xid = cu->cu_xid;
422
423send_again:
424 mtx_unlock(&cs->cs_lock);
425
426 MGETHDR(mreq, M_WAIT, MT_DATA);
427 KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big"));
428 bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen);
429 mreq->m_len = cu->cu_mcalllen;
430
431 /*
432 * The XID is the first thing in the request.
433 */
434 *mtod(mreq, uint32_t *) = htonl(xid);
435
436 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
437
438 if (cu->cu_async == TRUE && args == NULL)
439 goto get_reply;
440
441 if ((! XDR_PUTINT32(&xdrs, &proc)) ||
442 (! AUTH_MARSHALL(auth, xid, &xdrs,
443 m_copym(args, 0, M_COPYALL, M_WAITOK)))) {
444 errp->re_status = stat = RPC_CANTENCODEARGS;
445 mtx_lock(&cs->cs_lock);
446 goto out;
447 }
448 mreq->m_pkthdr.len = m_length(mreq, NULL);
449
450 cr->cr_xid = xid;
451 mtx_lock(&cs->cs_lock);
452
453 /*
454 * Try to get a place in the congestion window.
455 */
456 while (cu->cu_sent >= cu->cu_cwnd) {
457 cu->cu_cwnd_wait = TRUE;
458 error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock,
459 cu->cu_waitflag, "rpccwnd", 0);
460 if (error) {
461 errp->re_errno = error;
462 errp->re_status = stat = RPC_CANTSEND;
463 goto out;
464 }
465 }
466 cu->cu_sent += CWNDSCALE;
467
468 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
469 mtx_unlock(&cs->cs_lock);
470
471 /*
472 * sosend consumes mreq.
473 */
474 error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread);
475 mreq = NULL;
476
477 /*
478 * sub-optimal code appears here because we have
479 * some clock time to spare while the packets are in flight.
480 * (We assume that this is actually only executed once.)
481 */
482 reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL;
483 reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf;
484 reply_msg.acpted_rply.ar_verf.oa_length = 0;
485 reply_msg.acpted_rply.ar_results.where = NULL;
486 reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
487
488 mtx_lock(&cs->cs_lock);
489 if (error) {
490 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
491 errp->re_errno = error;
492 errp->re_status = stat = RPC_CANTSEND;
493 cu->cu_sent -= CWNDSCALE;
494 if (cu->cu_cwnd_wait) {
495 cu->cu_cwnd_wait = FALSE;
496 wakeup(&cu->cu_cwnd_wait);
497 }
498 goto out;
499 }
500
501 /*
502 * Check to see if we got an upcall while waiting for the
503 * lock.
504 */
505 if (cr->cr_error) {
506 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
507 errp->re_errno = cr->cr_error;
508 errp->re_status = stat = RPC_CANTRECV;
509 cu->cu_sent -= CWNDSCALE;
510 if (cu->cu_cwnd_wait) {
511 cu->cu_cwnd_wait = FALSE;
512 wakeup(&cu->cu_cwnd_wait);
513 }
514 goto out;
515 }
516 if (cr->cr_mrep) {
517 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
518 cu->cu_sent -= CWNDSCALE;
519 if (cu->cu_cwnd_wait) {
520 cu->cu_cwnd_wait = FALSE;
521 wakeup(&cu->cu_cwnd_wait);
522 }
523 goto got_reply;
524 }
525
526 /*
527 * Hack to provide rpc-based message passing
528 */
529 if (timeout == 0) {
530 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
531 errp->re_status = stat = RPC_TIMEDOUT;
532 cu->cu_sent -= CWNDSCALE;
533 if (cu->cu_cwnd_wait) {
534 cu->cu_cwnd_wait = FALSE;
535 wakeup(&cu->cu_cwnd_wait);
536 }
537 goto out;
538 }
539
540get_reply:
541 for (;;) {
542 /* Decide how long to wait. */
543 if (next_sendtime < timeout)
544 tv = next_sendtime;
545 else
546 tv = timeout;
547 tv -= time_waited;
548
549 if (tv > 0) {
550 if (cu->cu_closing || cu->cu_closed) {
551 error = 0;
552 cr->cr_error = ESHUTDOWN;
553 } else {
554 error = msleep(cr, &cs->cs_lock,
555 cu->cu_waitflag, cu->cu_waitchan, tv);
556 }
557 } else {
558 error = EWOULDBLOCK;
559 }
560
561 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
562 cu->cu_sent -= CWNDSCALE;
563 if (cu->cu_cwnd_wait) {
564 cu->cu_cwnd_wait = FALSE;
565 wakeup(&cu->cu_cwnd_wait);
566 }
567
568 if (!error) {
569 /*
570 * We were woken up by the upcall. If the
571 * upcall had a receive error, report that,
572 * otherwise we have a reply.
573 */
574 if (cr->cr_error) {
575 errp->re_errno = cr->cr_error;
576 errp->re_status = stat = RPC_CANTRECV;
577 goto out;
578 }
579
580 cu->cu_cwnd += (CWNDSCALE * CWNDSCALE
581 + cu->cu_cwnd / 2) / cu->cu_cwnd;
582 if (cu->cu_cwnd > MAXCWND)
583 cu->cu_cwnd = MAXCWND;
584
585 if (rt) {
586 /*
587 * Add one to the time since a tick
588 * count of N means that the actual
589 * time taken was somewhere between N
590 * and N+1.
591 */
592 rtt = ticks - starttime + 1;
593
594 /*
595 * Update our estimate of the round
596 * trip time using roughly the
597 * algorithm described in RFC
598 * 2988. Given an RTT sample R:
599 *
600 * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R|
601 * SRTT = (1-alpha) * SRTT + alpha * R
602 *
603 * where alpha = 0.125 and beta = 0.25.
604 *
605 * The initial retransmit timeout is
606 * SRTT + 4*RTTVAR and doubles on each
607 * retransmision.
608 */
609 if (rt->rt_srtt == 0) {
610 rt->rt_srtt = rtt;
611 rt->rt_deviate = rtt / 2;
612 } else {
613 int32_t error = rtt - rt->rt_srtt;
614 rt->rt_srtt += error / 8;
615 error = abs(error) - rt->rt_deviate;
616 rt->rt_deviate += error / 4;
617 }
618 rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate;
619 }
620
621 break;
622 }
623
624 /*
625 * The sleep returned an error so our request is still
626 * on the list. If we got EWOULDBLOCK, we may want to
627 * re-send the request.
628 */
629 if (error != EWOULDBLOCK) {
630 errp->re_errno = error;
631 if (error == EINTR)
632 errp->re_status = stat = RPC_INTR;
633 else
634 errp->re_status = stat = RPC_CANTRECV;
635 goto out;
636 }
637
638 time_waited = ticks - starttime;
639
640 /* Check for timeout. */
641 if (time_waited > timeout) {
642 errp->re_errno = EWOULDBLOCK;
643 errp->re_status = stat = RPC_TIMEDOUT;
644 goto out;
645 }
646
647 /* Retransmit if necessary. */
648 if (time_waited >= next_sendtime) {
649 cu->cu_cwnd /= 2;
650 if (cu->cu_cwnd < CWNDSCALE)
651 cu->cu_cwnd = CWNDSCALE;
652 if (ext && ext->rc_feedback) {
653 mtx_unlock(&cs->cs_lock);
654 if (retrans == 0)
655 ext->rc_feedback(FEEDBACK_REXMIT1,
656 proc, ext->rc_feedback_arg);
657 else
658 ext->rc_feedback(FEEDBACK_REXMIT2,
659 proc, ext->rc_feedback_arg);
660 mtx_lock(&cs->cs_lock);
661 }
662 if (cu->cu_closing || cu->cu_closed) {
663 errp->re_errno = ESHUTDOWN;
664 errp->re_status = stat = RPC_CANTRECV;
665 goto out;
666 }
667 retrans++;
668 /* update retransmit_time */
669 if (retransmit_time < RPC_MAX_BACKOFF * hz)
670 retransmit_time = 2 * retransmit_time;
671 next_sendtime += retransmit_time;
672 goto send_again;
673 }
674 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
675 }
676
677got_reply:
678 /*
679 * Now decode and validate the response. We need to drop the
680 * lock since xdr_replymsg may end up sleeping in malloc.
681 */
682 mtx_unlock(&cs->cs_lock);
683
684 if (ext && ext->rc_feedback)
685 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);
686
687 xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
688 ok = xdr_replymsg(&xdrs, &reply_msg);
689 cr->cr_mrep = NULL;
690
691 if (ok) {
692 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
693 (reply_msg.acpted_rply.ar_stat == SUCCESS))
694 errp->re_status = stat = RPC_SUCCESS;
695 else
696 stat = _seterr_reply(&reply_msg, &(cu->cu_error));
697
698 if (errp->re_status == RPC_SUCCESS) {
699 results = xdrmbuf_getall(&xdrs);
700 if (! AUTH_VALIDATE(auth, xid,
701 &reply_msg.acpted_rply.ar_verf,
702 &results)) {
703 errp->re_status = stat = RPC_AUTHERROR;
704 errp->re_why = AUTH_INVALIDRESP;
705 if (retrans &&
706 auth->ah_cred.oa_flavor == RPCSEC_GSS) {
707 /*
708 * If we retransmitted, its
709 * possible that we will
710 * receive a reply for one of
711 * the earlier transmissions
712 * (which will use an older
713 * RPCSEC_GSS sequence
714 * number). In this case, just
715 * go back and listen for a
716 * new reply. We could keep a
717 * record of all the seq
718 * numbers we have transmitted
719 * so far so that we could
720 * accept a reply for any of
721 * them here.
722 */
723 XDR_DESTROY(&xdrs);
724 mtx_lock(&cs->cs_lock);
725 TAILQ_INSERT_TAIL(&cs->cs_pending,
726 cr, cr_link);
727 cr->cr_mrep = NULL;
728 goto get_reply;
729 }
730 } else {
731 *resultsp = results;
732 }
733 } /* end successful completion */
734 /*
735 * If unsuccesful AND error is an authentication error
736 * then refresh credentials and try again, else break
737 */
738 else if (stat == RPC_AUTHERROR)
739 /* maybe our credentials need to be refreshed ... */
740 if (nrefreshes > 0 &&
741 AUTH_REFRESH(auth, &reply_msg)) {
742 nrefreshes--;
743 XDR_DESTROY(&xdrs);
744 mtx_lock(&cs->cs_lock);
745 goto call_again;
746 }
747 /* end of unsuccessful completion */
748 } /* end of valid reply message */
749 else {
750 errp->re_status = stat = RPC_CANTDECODERES;
751
752 }
753 XDR_DESTROY(&xdrs);
754 mtx_lock(&cs->cs_lock);
755out:
756 mtx_assert(&cs->cs_lock, MA_OWNED);
757
758 if (mreq)
759 m_freem(mreq);
760 if (cr->cr_mrep)
761 m_freem(cr->cr_mrep);
762
763 cu->cu_threads--;
764 if (cu->cu_closing)
765 wakeup(cu);
766
767 mtx_unlock(&cs->cs_lock);
768
769 if (auth && stat != RPC_SUCCESS)
770 AUTH_VALIDATE(auth, xid, NULL, NULL);
771
772 free(cr, M_RPC);
773
774 return (stat);
775}
776
777static void
778clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp)
779{
780 struct cu_data *cu = (struct cu_data *)cl->cl_private;
781
782 *errp = cu->cu_error;
783}
784
785static bool_t
786clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
787{
788 XDR xdrs;
789 bool_t dummy;
790
791 xdrs.x_op = XDR_FREE;
792 dummy = (*xdr_res)(&xdrs, res_ptr);
793
794 return (dummy);
795}
796
797/*ARGSUSED*/
798static void
799clnt_dg_abort(CLIENT *h)
800{
801}
802
803static bool_t
804clnt_dg_control(CLIENT *cl, u_int request, void *info)
805{
806 struct cu_data *cu = (struct cu_data *)cl->cl_private;
807 struct cu_socket *cs;
808 struct sockaddr *addr;
809
810 cs = cu->cu_socket->so_rcv.sb_upcallarg;
811 mtx_lock(&cs->cs_lock);
812
813 switch (request) {
814 case CLSET_FD_CLOSE:
815 cu->cu_closeit = TRUE;
816 mtx_unlock(&cs->cs_lock);
817 return (TRUE);
818 case CLSET_FD_NCLOSE:
819 cu->cu_closeit = FALSE;
820 mtx_unlock(&cs->cs_lock);
821 return (TRUE);
822 }
823
824 /* for other requests which use info */
825 if (info == NULL) {
826 mtx_unlock(&cs->cs_lock);
827 return (FALSE);
828 }
829 switch (request) {
830 case CLSET_TIMEOUT:
831 if (time_not_ok((struct timeval *)info)) {
832 mtx_unlock(&cs->cs_lock);
833 return (FALSE);
834 }
835 cu->cu_total = *(struct timeval *)info;
836 break;
837 case CLGET_TIMEOUT:
838 *(struct timeval *)info = cu->cu_total;
839 break;
840 case CLSET_RETRY_TIMEOUT:
841 if (time_not_ok((struct timeval *)info)) {
842 mtx_unlock(&cs->cs_lock);
843 return (FALSE);
844 }
845 cu->cu_wait = *(struct timeval *)info;
846 break;
847 case CLGET_RETRY_TIMEOUT:
848 *(struct timeval *)info = cu->cu_wait;
849 break;
850 case CLGET_SVC_ADDR:
851 /*
852 * Slightly different semantics to userland - we use
853 * sockaddr instead of netbuf.
854 */
855 memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len);
856 break;
857 case CLSET_SVC_ADDR: /* set to new address */
858 addr = (struct sockaddr *)info;
859 (void) memcpy(&cu->cu_raddr, addr, addr->sa_len);
860 break;
861 case CLGET_XID:
862 *(uint32_t *)info = cu->cu_xid;
863 break;
864
865 case CLSET_XID:
866 /* This will set the xid of the NEXT call */
867 /* decrement by 1 as clnt_dg_call() increments once */
868 cu->cu_xid = *(uint32_t *)info - 1;
869 break;
870
871 case CLGET_VERS:
872 /*
873 * This RELIES on the information that, in the call body,
874 * the version number field is the fifth field from the
875 * begining of the RPC header. MUST be changed if the
876 * call_struct is changed
877 */
878 *(uint32_t *)info =
879 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
880 4 * BYTES_PER_XDR_UNIT));
881 break;
882
883 case CLSET_VERS:
884 *(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT)
885 = htonl(*(uint32_t *)info);
886 break;
887
888 case CLGET_PROG:
889 /*
890 * This RELIES on the information that, in the call body,
891 * the program number field is the fourth field from the
892 * begining of the RPC header. MUST be changed if the
893 * call_struct is changed
894 */
895 *(uint32_t *)info =
896 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
897 3 * BYTES_PER_XDR_UNIT));
898 break;
899
900 case CLSET_PROG:
901 *(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT)
902 = htonl(*(uint32_t *)info);
903 break;
904 case CLSET_ASYNC:
905 cu->cu_async = *(int *)info;
906 break;
907 case CLSET_CONNECT:
908 cu->cu_connect = *(int *)info;
909 break;
910 case CLSET_WAITCHAN:
911 cu->cu_waitchan = (const char *)info;
912 break;
913 case CLGET_WAITCHAN:
914 *(const char **) info = cu->cu_waitchan;
915 break;
916 case CLSET_INTERRUPTIBLE:
917 if (*(int *) info)
918 cu->cu_waitflag = PCATCH;
919 else
920 cu->cu_waitflag = 0;
921 break;
922 case CLGET_INTERRUPTIBLE:
923 if (cu->cu_waitflag)
924 *(int *) info = TRUE;
925 else
926 *(int *) info = FALSE;
927 break;
928 default:
929 mtx_unlock(&cs->cs_lock);
930 return (FALSE);
931 }
932 mtx_unlock(&cs->cs_lock);
933 return (TRUE);
934}
935
936static void
937clnt_dg_close(CLIENT *cl)
938{
939 struct cu_data *cu = (struct cu_data *)cl->cl_private;
940 struct cu_socket *cs;
941 struct cu_request *cr;
942
943 cs = cu->cu_socket->so_rcv.sb_upcallarg;
944 mtx_lock(&cs->cs_lock);
945
946 if (cu->cu_closed) {
947 mtx_unlock(&cs->cs_lock);
948 return;
949 }
950
951 if (cu->cu_closing) {
952 while (cu->cu_closing)
953 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
954 KASSERT(cu->cu_closed, ("client should be closed"));
955 mtx_unlock(&cs->cs_lock);
956 return;
957 }
958
959 /*
960 * Abort any pending requests and wait until everyone
961 * has finished with clnt_vc_call.
962 */
963 cu->cu_closing = TRUE;
964 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
965 if (cr->cr_client == cl) {
966 cr->cr_xid = 0;
967 cr->cr_error = ESHUTDOWN;
968 wakeup(cr);
969 }
970 }
971
972 while (cu->cu_threads)
973 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
974
975 cu->cu_closing = FALSE;
976 cu->cu_closed = TRUE;
977
978 mtx_unlock(&cs->cs_lock);
979 wakeup(cu);
980}
981
982static void
983clnt_dg_destroy(CLIENT *cl)
984{
985 struct cu_data *cu = (struct cu_data *)cl->cl_private;
986 struct cu_socket *cs;
987 struct socket *so = NULL;
988 bool_t lastsocketref;
989
990 cs = cu->cu_socket->so_rcv.sb_upcallarg;
991 clnt_dg_close(cl);
992
993 mtx_lock(&cs->cs_lock);
994
995 cs->cs_refs--;
996 if (cs->cs_refs == 0) {
997 mtx_unlock(&cs->cs_lock);
998 SOCKBUF_LOCK(&cu->cu_socket->so_rcv);
999 soupcall_clear(cu->cu_socket, SO_RCV);
1000 clnt_dg_upcallsdone(cu->cu_socket, cs);
1001 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
1002 mtx_destroy(&cs->cs_lock);
1003 mem_free(cs, sizeof(*cs));
1004 lastsocketref = TRUE;
1005 } else {
1006 mtx_unlock(&cs->cs_lock);
1007 lastsocketref = FALSE;
1008 }
1009
1010 if (cu->cu_closeit && lastsocketref) {
1011 so = cu->cu_socket;
1012 cu->cu_socket = NULL;
1013 }
1014
1015 if (so)
1016 soclose(so);
1017
1018 if (cl->cl_netid && cl->cl_netid[0])
1019 mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
1020 if (cl->cl_tp && cl->cl_tp[0])
1021 mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
1022 mem_free(cu, sizeof (*cu));
1023 mem_free(cl, sizeof (CLIENT));
1024}
1025
1026/*
1027 * Make sure that the time is not garbage. -1 value is allowed.
1028 */
1029static bool_t
1030time_not_ok(struct timeval *t)
1031{
1032 return (t->tv_sec < -1 || t->tv_sec > 100000000 ||
1033 t->tv_usec < -1 || t->tv_usec > 1000000);
1034}
1035
1036int
1037clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
1038{
1039 struct cu_socket *cs = (struct cu_socket *) arg;
1040 struct uio uio;
1041 struct mbuf *m;
1042 struct mbuf *control;
1043 struct cu_request *cr;
1044 int error, rcvflag, foundreq;
1045 uint32_t xid;
1046
1047 cs->cs_upcallrefs++;
1048 uio.uio_resid = 1000000000;
1049 uio.uio_td = curthread;
1050 do {
1051 SOCKBUF_UNLOCK(&so->so_rcv);
1052 m = NULL;
1053 control = NULL;
1054 rcvflag = MSG_DONTWAIT;
1055 error = soreceive(so, NULL, &uio, &m, &control, &rcvflag);
1056 if (control)
1057 m_freem(control);
1058 SOCKBUF_LOCK(&so->so_rcv);
1059
1060 if (error == EWOULDBLOCK)
1061 break;
1062
1063 /*
1064 * If there was an error, wake up all pending
1065 * requests.
1066 */
1067 if (error) {
1068 mtx_lock(&cs->cs_lock);
1069 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
1070 cr->cr_xid = 0;
1071 cr->cr_error = error;
1072 wakeup(cr);
1073 }
1074 mtx_unlock(&cs->cs_lock);
1075 break;
1076 }
1077
1078 /*
1079 * The XID is in the first uint32_t of the reply.
1080 */
1081 if (m->m_len < sizeof(xid))
1082 m = m_pullup(m, sizeof(xid));
1083 if (!m)
1084 /*
1085 * Should never happen.
1086 */
1087 continue;
1088
1089 xid = ntohl(*mtod(m, uint32_t *));
1090
1091 /*
1092 * Attempt to match this reply with a pending request.
1093 */
1094 mtx_lock(&cs->cs_lock);
1095 foundreq = 0;
1096 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
1097 if (cr->cr_xid == xid) {
1098 /*
1099 * This one matches. We leave the
1100 * reply mbuf in cr->cr_mrep. Set the
1101 * XID to zero so that we will ignore
1102 * any duplicated replies that arrive
1103 * before clnt_dg_call removes it from
1104 * the queue.
1105 */
1106 cr->cr_xid = 0;
1107 cr->cr_mrep = m;
1108 cr->cr_error = 0;
1109 foundreq = 1;
1110 wakeup(cr);
1111 break;
1112 }
1113 }
1114 mtx_unlock(&cs->cs_lock);
1115
1116 /*
1117 * If we didn't find the matching request, just drop
1118 * it - its probably a repeated reply.
1119 */
1120 if (!foundreq)
1121 m_freem(m);
1122 } while (m);
1123 cs->cs_upcallrefs--;
1124 if (cs->cs_upcallrefs < 0)
1125 panic("rpcdg upcall refcnt");
1126 if (cs->cs_upcallrefs == 0)
1127 wakeup(&cs->cs_upcallrefs);
1128 return (SU_OK);
1129}
1130
1131/*
1132 * Wait for all upcalls in progress to complete.
1133 */
1134static void
1135clnt_dg_upcallsdone(struct socket *so, struct cu_socket *cs)
1136{
1137
1138 SOCKBUF_LOCK_ASSERT(&so->so_rcv);
1139
1140 while (cs->cs_upcallrefs > 0)
1141 (void) msleep(&cs->cs_upcallrefs, SOCKBUF_MTX(&so->so_rcv), 0,
1142 "rpcdgup", 0);
1143}
210
211 /*
212 * Find the receive and the send size
213 */
214 sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
215 recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
216 if ((sendsz == 0) || (recvsz == 0)) {
217 rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */
218 rpc_createerr.cf_error.re_errno = 0;
219 return (NULL);
220 }
221
222 cl = mem_alloc(sizeof (CLIENT));
223
224 /*
225 * Should be multiple of 4 for XDR.
226 */
227 sendsz = ((sendsz + 3) / 4) * 4;
228 recvsz = ((recvsz + 3) / 4) * 4;
229 cu = mem_alloc(sizeof (*cu));
230 cu->cu_threads = 0;
231 cu->cu_closing = FALSE;
232 cu->cu_closed = FALSE;
233 (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len);
234 cu->cu_rlen = svcaddr->sa_len;
235 /* Other values can also be set through clnt_control() */
236 cu->cu_wait.tv_sec = 3; /* heuristically chosen */
237 cu->cu_wait.tv_usec = 0;
238 cu->cu_total.tv_sec = -1;
239 cu->cu_total.tv_usec = -1;
240 cu->cu_sendsz = sendsz;
241 cu->cu_recvsz = recvsz;
242 cu->cu_async = FALSE;
243 cu->cu_connect = FALSE;
244 cu->cu_connected = FALSE;
245 cu->cu_waitchan = "rpcrecv";
246 cu->cu_waitflag = 0;
247 cu->cu_cwnd = MAXCWND / 2;
248 cu->cu_sent = 0;
249 cu->cu_cwnd_wait = FALSE;
250 (void) getmicrotime(&now);
251 cu->cu_xid = __RPC_GETXID(&now);
252 call_msg.rm_xid = cu->cu_xid;
253 call_msg.rm_call.cb_prog = program;
254 call_msg.rm_call.cb_vers = version;
255 xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE);
256 if (! xdr_callhdr(&xdrs, &call_msg)) {
257 rpc_createerr.cf_stat = RPC_CANTENCODEARGS; /* XXX */
258 rpc_createerr.cf_error.re_errno = 0;
259 goto err2;
260 }
261 cu->cu_mcalllen = XDR_GETPOS(&xdrs);;
262
263 /*
264 * By default, closeit is always FALSE. It is users responsibility
265 * to do a close on it, else the user may use clnt_control
266 * to let clnt_destroy do it for him/her.
267 */
268 cu->cu_closeit = FALSE;
269 cu->cu_socket = so;
270 soreserve(so, 256*1024, 256*1024);
271
272 sb = &so->so_rcv;
273 SOCKBUF_LOCK(&so->so_rcv);
274recheck_socket:
275 if (sb->sb_upcall) {
276 if (sb->sb_upcall != clnt_dg_soupcall) {
277 SOCKBUF_UNLOCK(&so->so_rcv);
278 printf("clnt_dg_create(): socket already has an incompatible upcall\n");
279 goto err2;
280 }
281 cs = (struct cu_socket *) sb->sb_upcallarg;
282 mtx_lock(&cs->cs_lock);
283 cs->cs_refs++;
284 mtx_unlock(&cs->cs_lock);
285 } else {
286 /*
287 * We are the first on this socket - allocate the
288 * structure and install it in the socket.
289 */
290 SOCKBUF_UNLOCK(&so->so_rcv);
291 cs = mem_alloc(sizeof(*cs));
292 SOCKBUF_LOCK(&so->so_rcv);
293 if (sb->sb_upcall) {
294 /*
295 * We have lost a race with some other client.
296 */
297 mem_free(cs, sizeof(*cs));
298 goto recheck_socket;
299 }
300 mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF);
301 cs->cs_refs = 1;
302 cs->cs_upcallrefs = 0;
303 TAILQ_INIT(&cs->cs_pending);
304 soupcall_set(so, SO_RCV, clnt_dg_soupcall, cs);
305 }
306 SOCKBUF_UNLOCK(&so->so_rcv);
307
308 cl->cl_refs = 1;
309 cl->cl_ops = &clnt_dg_ops;
310 cl->cl_private = (caddr_t)(void *)cu;
311 cl->cl_auth = authnone_create();
312 cl->cl_tp = NULL;
313 cl->cl_netid = NULL;
314 return (cl);
315err2:
316 if (cl) {
317 mem_free(cl, sizeof (CLIENT));
318 if (cu)
319 mem_free(cu, sizeof (*cu));
320 }
321 return (NULL);
322}
323
324static enum clnt_stat
325clnt_dg_call(
326 CLIENT *cl, /* client handle */
327 struct rpc_callextra *ext, /* call metadata */
328 rpcproc_t proc, /* procedure number */
329 struct mbuf *args, /* pointer to args */
330 struct mbuf **resultsp, /* pointer to results */
331 struct timeval utimeout) /* seconds to wait before giving up */
332{
333 struct cu_data *cu = (struct cu_data *)cl->cl_private;
334 struct cu_socket *cs;
335 struct rpc_timers *rt;
336 AUTH *auth;
337 struct rpc_err *errp;
338 enum clnt_stat stat;
339 XDR xdrs;
340 struct rpc_msg reply_msg;
341 bool_t ok;
342 int retrans; /* number of re-transmits so far */
343 int nrefreshes = 2; /* number of times to refresh cred */
344 struct timeval *tvp;
345 int timeout;
346 int retransmit_time;
347 int next_sendtime, starttime, rtt, time_waited, tv = 0;
348 struct sockaddr *sa;
349 socklen_t salen;
350 uint32_t xid = 0;
351 struct mbuf *mreq = NULL, *results;
352 struct cu_request *cr;
353 int error;
354
355 cs = cu->cu_socket->so_rcv.sb_upcallarg;
356 cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK);
357
358 mtx_lock(&cs->cs_lock);
359
360 if (cu->cu_closing || cu->cu_closed) {
361 mtx_unlock(&cs->cs_lock);
362 free(cr, M_RPC);
363 return (RPC_CANTSEND);
364 }
365 cu->cu_threads++;
366
367 if (ext) {
368 auth = ext->rc_auth;
369 errp = &ext->rc_err;
370 } else {
371 auth = cl->cl_auth;
372 errp = &cu->cu_error;
373 }
374
375 cr->cr_client = cl;
376 cr->cr_mrep = NULL;
377 cr->cr_error = 0;
378
379 if (cu->cu_total.tv_usec == -1) {
380 tvp = &utimeout; /* use supplied timeout */
381 } else {
382 tvp = &cu->cu_total; /* use default timeout */
383 }
384 if (tvp->tv_sec || tvp->tv_usec)
385 timeout = tvtohz(tvp);
386 else
387 timeout = 0;
388
389 if (cu->cu_connect && !cu->cu_connected) {
390 mtx_unlock(&cs->cs_lock);
391 error = soconnect(cu->cu_socket,
392 (struct sockaddr *)&cu->cu_raddr, curthread);
393 mtx_lock(&cs->cs_lock);
394 if (error) {
395 errp->re_errno = error;
396 errp->re_status = stat = RPC_CANTSEND;
397 goto out;
398 }
399 cu->cu_connected = 1;
400 }
401 if (cu->cu_connected) {
402 sa = NULL;
403 salen = 0;
404 } else {
405 sa = (struct sockaddr *)&cu->cu_raddr;
406 salen = cu->cu_rlen;
407 }
408 time_waited = 0;
409 retrans = 0;
410 if (ext && ext->rc_timers) {
411 rt = ext->rc_timers;
412 if (!rt->rt_rtxcur)
413 rt->rt_rtxcur = tvtohz(&cu->cu_wait);
414 retransmit_time = next_sendtime = rt->rt_rtxcur;
415 } else {
416 rt = NULL;
417 retransmit_time = next_sendtime = tvtohz(&cu->cu_wait);
418 }
419
420 starttime = ticks;
421
422call_again:
423 mtx_assert(&cs->cs_lock, MA_OWNED);
424
425 cu->cu_xid++;
426 xid = cu->cu_xid;
427
428send_again:
429 mtx_unlock(&cs->cs_lock);
430
431 MGETHDR(mreq, M_WAIT, MT_DATA);
432 KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big"));
433 bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen);
434 mreq->m_len = cu->cu_mcalllen;
435
436 /*
437 * The XID is the first thing in the request.
438 */
439 *mtod(mreq, uint32_t *) = htonl(xid);
440
441 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
442
443 if (cu->cu_async == TRUE && args == NULL)
444 goto get_reply;
445
446 if ((! XDR_PUTINT32(&xdrs, &proc)) ||
447 (! AUTH_MARSHALL(auth, xid, &xdrs,
448 m_copym(args, 0, M_COPYALL, M_WAITOK)))) {
449 errp->re_status = stat = RPC_CANTENCODEARGS;
450 mtx_lock(&cs->cs_lock);
451 goto out;
452 }
453 mreq->m_pkthdr.len = m_length(mreq, NULL);
454
455 cr->cr_xid = xid;
456 mtx_lock(&cs->cs_lock);
457
458 /*
459 * Try to get a place in the congestion window.
460 */
461 while (cu->cu_sent >= cu->cu_cwnd) {
462 cu->cu_cwnd_wait = TRUE;
463 error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock,
464 cu->cu_waitflag, "rpccwnd", 0);
465 if (error) {
466 errp->re_errno = error;
467 errp->re_status = stat = RPC_CANTSEND;
468 goto out;
469 }
470 }
471 cu->cu_sent += CWNDSCALE;
472
473 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
474 mtx_unlock(&cs->cs_lock);
475
476 /*
477 * sosend consumes mreq.
478 */
479 error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread);
480 mreq = NULL;
481
482 /*
483 * sub-optimal code appears here because we have
484 * some clock time to spare while the packets are in flight.
485 * (We assume that this is actually only executed once.)
486 */
487 reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL;
488 reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf;
489 reply_msg.acpted_rply.ar_verf.oa_length = 0;
490 reply_msg.acpted_rply.ar_results.where = NULL;
491 reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
492
493 mtx_lock(&cs->cs_lock);
494 if (error) {
495 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
496 errp->re_errno = error;
497 errp->re_status = stat = RPC_CANTSEND;
498 cu->cu_sent -= CWNDSCALE;
499 if (cu->cu_cwnd_wait) {
500 cu->cu_cwnd_wait = FALSE;
501 wakeup(&cu->cu_cwnd_wait);
502 }
503 goto out;
504 }
505
506 /*
507 * Check to see if we got an upcall while waiting for the
508 * lock.
509 */
510 if (cr->cr_error) {
511 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
512 errp->re_errno = cr->cr_error;
513 errp->re_status = stat = RPC_CANTRECV;
514 cu->cu_sent -= CWNDSCALE;
515 if (cu->cu_cwnd_wait) {
516 cu->cu_cwnd_wait = FALSE;
517 wakeup(&cu->cu_cwnd_wait);
518 }
519 goto out;
520 }
521 if (cr->cr_mrep) {
522 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
523 cu->cu_sent -= CWNDSCALE;
524 if (cu->cu_cwnd_wait) {
525 cu->cu_cwnd_wait = FALSE;
526 wakeup(&cu->cu_cwnd_wait);
527 }
528 goto got_reply;
529 }
530
531 /*
532 * Hack to provide rpc-based message passing
533 */
534 if (timeout == 0) {
535 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
536 errp->re_status = stat = RPC_TIMEDOUT;
537 cu->cu_sent -= CWNDSCALE;
538 if (cu->cu_cwnd_wait) {
539 cu->cu_cwnd_wait = FALSE;
540 wakeup(&cu->cu_cwnd_wait);
541 }
542 goto out;
543 }
544
545get_reply:
546 for (;;) {
547 /* Decide how long to wait. */
548 if (next_sendtime < timeout)
549 tv = next_sendtime;
550 else
551 tv = timeout;
552 tv -= time_waited;
553
554 if (tv > 0) {
555 if (cu->cu_closing || cu->cu_closed) {
556 error = 0;
557 cr->cr_error = ESHUTDOWN;
558 } else {
559 error = msleep(cr, &cs->cs_lock,
560 cu->cu_waitflag, cu->cu_waitchan, tv);
561 }
562 } else {
563 error = EWOULDBLOCK;
564 }
565
566 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
567 cu->cu_sent -= CWNDSCALE;
568 if (cu->cu_cwnd_wait) {
569 cu->cu_cwnd_wait = FALSE;
570 wakeup(&cu->cu_cwnd_wait);
571 }
572
573 if (!error) {
574 /*
575 * We were woken up by the upcall. If the
576 * upcall had a receive error, report that,
577 * otherwise we have a reply.
578 */
579 if (cr->cr_error) {
580 errp->re_errno = cr->cr_error;
581 errp->re_status = stat = RPC_CANTRECV;
582 goto out;
583 }
584
585 cu->cu_cwnd += (CWNDSCALE * CWNDSCALE
586 + cu->cu_cwnd / 2) / cu->cu_cwnd;
587 if (cu->cu_cwnd > MAXCWND)
588 cu->cu_cwnd = MAXCWND;
589
590 if (rt) {
591 /*
592 * Add one to the time since a tick
593 * count of N means that the actual
594 * time taken was somewhere between N
595 * and N+1.
596 */
597 rtt = ticks - starttime + 1;
598
599 /*
600 * Update our estimate of the round
601 * trip time using roughly the
602 * algorithm described in RFC
603 * 2988. Given an RTT sample R:
604 *
605 * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R|
606 * SRTT = (1-alpha) * SRTT + alpha * R
607 *
608 * where alpha = 0.125 and beta = 0.25.
609 *
610 * The initial retransmit timeout is
611 * SRTT + 4*RTTVAR and doubles on each
612 * retransmision.
613 */
614 if (rt->rt_srtt == 0) {
615 rt->rt_srtt = rtt;
616 rt->rt_deviate = rtt / 2;
617 } else {
618 int32_t error = rtt - rt->rt_srtt;
619 rt->rt_srtt += error / 8;
620 error = abs(error) - rt->rt_deviate;
621 rt->rt_deviate += error / 4;
622 }
623 rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate;
624 }
625
626 break;
627 }
628
629 /*
630 * The sleep returned an error so our request is still
631 * on the list. If we got EWOULDBLOCK, we may want to
632 * re-send the request.
633 */
634 if (error != EWOULDBLOCK) {
635 errp->re_errno = error;
636 if (error == EINTR)
637 errp->re_status = stat = RPC_INTR;
638 else
639 errp->re_status = stat = RPC_CANTRECV;
640 goto out;
641 }
642
643 time_waited = ticks - starttime;
644
645 /* Check for timeout. */
646 if (time_waited > timeout) {
647 errp->re_errno = EWOULDBLOCK;
648 errp->re_status = stat = RPC_TIMEDOUT;
649 goto out;
650 }
651
652 /* Retransmit if necessary. */
653 if (time_waited >= next_sendtime) {
654 cu->cu_cwnd /= 2;
655 if (cu->cu_cwnd < CWNDSCALE)
656 cu->cu_cwnd = CWNDSCALE;
657 if (ext && ext->rc_feedback) {
658 mtx_unlock(&cs->cs_lock);
659 if (retrans == 0)
660 ext->rc_feedback(FEEDBACK_REXMIT1,
661 proc, ext->rc_feedback_arg);
662 else
663 ext->rc_feedback(FEEDBACK_REXMIT2,
664 proc, ext->rc_feedback_arg);
665 mtx_lock(&cs->cs_lock);
666 }
667 if (cu->cu_closing || cu->cu_closed) {
668 errp->re_errno = ESHUTDOWN;
669 errp->re_status = stat = RPC_CANTRECV;
670 goto out;
671 }
672 retrans++;
673 /* update retransmit_time */
674 if (retransmit_time < RPC_MAX_BACKOFF * hz)
675 retransmit_time = 2 * retransmit_time;
676 next_sendtime += retransmit_time;
677 goto send_again;
678 }
679 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
680 }
681
682got_reply:
683 /*
684 * Now decode and validate the response. We need to drop the
685 * lock since xdr_replymsg may end up sleeping in malloc.
686 */
687 mtx_unlock(&cs->cs_lock);
688
689 if (ext && ext->rc_feedback)
690 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);
691
692 xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
693 ok = xdr_replymsg(&xdrs, &reply_msg);
694 cr->cr_mrep = NULL;
695
696 if (ok) {
697 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
698 (reply_msg.acpted_rply.ar_stat == SUCCESS))
699 errp->re_status = stat = RPC_SUCCESS;
700 else
701 stat = _seterr_reply(&reply_msg, &(cu->cu_error));
702
703 if (errp->re_status == RPC_SUCCESS) {
704 results = xdrmbuf_getall(&xdrs);
705 if (! AUTH_VALIDATE(auth, xid,
706 &reply_msg.acpted_rply.ar_verf,
707 &results)) {
708 errp->re_status = stat = RPC_AUTHERROR;
709 errp->re_why = AUTH_INVALIDRESP;
710 if (retrans &&
711 auth->ah_cred.oa_flavor == RPCSEC_GSS) {
712 /*
713 * If we retransmitted, its
714 * possible that we will
715 * receive a reply for one of
716 * the earlier transmissions
717 * (which will use an older
718 * RPCSEC_GSS sequence
719 * number). In this case, just
720 * go back and listen for a
721 * new reply. We could keep a
722 * record of all the seq
723 * numbers we have transmitted
724 * so far so that we could
725 * accept a reply for any of
726 * them here.
727 */
728 XDR_DESTROY(&xdrs);
729 mtx_lock(&cs->cs_lock);
730 TAILQ_INSERT_TAIL(&cs->cs_pending,
731 cr, cr_link);
732 cr->cr_mrep = NULL;
733 goto get_reply;
734 }
735 } else {
736 *resultsp = results;
737 }
738 } /* end successful completion */
739 /*
740 * If unsuccesful AND error is an authentication error
741 * then refresh credentials and try again, else break
742 */
743 else if (stat == RPC_AUTHERROR)
744 /* maybe our credentials need to be refreshed ... */
745 if (nrefreshes > 0 &&
746 AUTH_REFRESH(auth, &reply_msg)) {
747 nrefreshes--;
748 XDR_DESTROY(&xdrs);
749 mtx_lock(&cs->cs_lock);
750 goto call_again;
751 }
752 /* end of unsuccessful completion */
753 } /* end of valid reply message */
754 else {
755 errp->re_status = stat = RPC_CANTDECODERES;
756
757 }
758 XDR_DESTROY(&xdrs);
759 mtx_lock(&cs->cs_lock);
760out:
761 mtx_assert(&cs->cs_lock, MA_OWNED);
762
763 if (mreq)
764 m_freem(mreq);
765 if (cr->cr_mrep)
766 m_freem(cr->cr_mrep);
767
768 cu->cu_threads--;
769 if (cu->cu_closing)
770 wakeup(cu);
771
772 mtx_unlock(&cs->cs_lock);
773
774 if (auth && stat != RPC_SUCCESS)
775 AUTH_VALIDATE(auth, xid, NULL, NULL);
776
777 free(cr, M_RPC);
778
779 return (stat);
780}
781
782static void
783clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp)
784{
785 struct cu_data *cu = (struct cu_data *)cl->cl_private;
786
787 *errp = cu->cu_error;
788}
789
790static bool_t
791clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
792{
793 XDR xdrs;
794 bool_t dummy;
795
796 xdrs.x_op = XDR_FREE;
797 dummy = (*xdr_res)(&xdrs, res_ptr);
798
799 return (dummy);
800}
801
802/*ARGSUSED*/
803static void
804clnt_dg_abort(CLIENT *h)
805{
806}
807
808static bool_t
809clnt_dg_control(CLIENT *cl, u_int request, void *info)
810{
811 struct cu_data *cu = (struct cu_data *)cl->cl_private;
812 struct cu_socket *cs;
813 struct sockaddr *addr;
814
815 cs = cu->cu_socket->so_rcv.sb_upcallarg;
816 mtx_lock(&cs->cs_lock);
817
818 switch (request) {
819 case CLSET_FD_CLOSE:
820 cu->cu_closeit = TRUE;
821 mtx_unlock(&cs->cs_lock);
822 return (TRUE);
823 case CLSET_FD_NCLOSE:
824 cu->cu_closeit = FALSE;
825 mtx_unlock(&cs->cs_lock);
826 return (TRUE);
827 }
828
829 /* for other requests which use info */
830 if (info == NULL) {
831 mtx_unlock(&cs->cs_lock);
832 return (FALSE);
833 }
834 switch (request) {
835 case CLSET_TIMEOUT:
836 if (time_not_ok((struct timeval *)info)) {
837 mtx_unlock(&cs->cs_lock);
838 return (FALSE);
839 }
840 cu->cu_total = *(struct timeval *)info;
841 break;
842 case CLGET_TIMEOUT:
843 *(struct timeval *)info = cu->cu_total;
844 break;
845 case CLSET_RETRY_TIMEOUT:
846 if (time_not_ok((struct timeval *)info)) {
847 mtx_unlock(&cs->cs_lock);
848 return (FALSE);
849 }
850 cu->cu_wait = *(struct timeval *)info;
851 break;
852 case CLGET_RETRY_TIMEOUT:
853 *(struct timeval *)info = cu->cu_wait;
854 break;
855 case CLGET_SVC_ADDR:
856 /*
857 * Slightly different semantics to userland - we use
858 * sockaddr instead of netbuf.
859 */
860 memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len);
861 break;
862 case CLSET_SVC_ADDR: /* set to new address */
863 addr = (struct sockaddr *)info;
864 (void) memcpy(&cu->cu_raddr, addr, addr->sa_len);
865 break;
866 case CLGET_XID:
867 *(uint32_t *)info = cu->cu_xid;
868 break;
869
870 case CLSET_XID:
871 /* This will set the xid of the NEXT call */
872 /* decrement by 1 as clnt_dg_call() increments once */
873 cu->cu_xid = *(uint32_t *)info - 1;
874 break;
875
876 case CLGET_VERS:
877 /*
878 * This RELIES on the information that, in the call body,
879 * the version number field is the fifth field from the
880 * begining of the RPC header. MUST be changed if the
881 * call_struct is changed
882 */
883 *(uint32_t *)info =
884 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
885 4 * BYTES_PER_XDR_UNIT));
886 break;
887
888 case CLSET_VERS:
889 *(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT)
890 = htonl(*(uint32_t *)info);
891 break;
892
893 case CLGET_PROG:
894 /*
895 * This RELIES on the information that, in the call body,
896 * the program number field is the fourth field from the
897 * begining of the RPC header. MUST be changed if the
898 * call_struct is changed
899 */
900 *(uint32_t *)info =
901 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
902 3 * BYTES_PER_XDR_UNIT));
903 break;
904
905 case CLSET_PROG:
906 *(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT)
907 = htonl(*(uint32_t *)info);
908 break;
909 case CLSET_ASYNC:
910 cu->cu_async = *(int *)info;
911 break;
912 case CLSET_CONNECT:
913 cu->cu_connect = *(int *)info;
914 break;
915 case CLSET_WAITCHAN:
916 cu->cu_waitchan = (const char *)info;
917 break;
918 case CLGET_WAITCHAN:
919 *(const char **) info = cu->cu_waitchan;
920 break;
921 case CLSET_INTERRUPTIBLE:
922 if (*(int *) info)
923 cu->cu_waitflag = PCATCH;
924 else
925 cu->cu_waitflag = 0;
926 break;
927 case CLGET_INTERRUPTIBLE:
928 if (cu->cu_waitflag)
929 *(int *) info = TRUE;
930 else
931 *(int *) info = FALSE;
932 break;
933 default:
934 mtx_unlock(&cs->cs_lock);
935 return (FALSE);
936 }
937 mtx_unlock(&cs->cs_lock);
938 return (TRUE);
939}
940
941static void
942clnt_dg_close(CLIENT *cl)
943{
944 struct cu_data *cu = (struct cu_data *)cl->cl_private;
945 struct cu_socket *cs;
946 struct cu_request *cr;
947
948 cs = cu->cu_socket->so_rcv.sb_upcallarg;
949 mtx_lock(&cs->cs_lock);
950
951 if (cu->cu_closed) {
952 mtx_unlock(&cs->cs_lock);
953 return;
954 }
955
956 if (cu->cu_closing) {
957 while (cu->cu_closing)
958 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
959 KASSERT(cu->cu_closed, ("client should be closed"));
960 mtx_unlock(&cs->cs_lock);
961 return;
962 }
963
964 /*
965 * Abort any pending requests and wait until everyone
966 * has finished with clnt_vc_call.
967 */
968 cu->cu_closing = TRUE;
969 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
970 if (cr->cr_client == cl) {
971 cr->cr_xid = 0;
972 cr->cr_error = ESHUTDOWN;
973 wakeup(cr);
974 }
975 }
976
977 while (cu->cu_threads)
978 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
979
980 cu->cu_closing = FALSE;
981 cu->cu_closed = TRUE;
982
983 mtx_unlock(&cs->cs_lock);
984 wakeup(cu);
985}
986
987static void
988clnt_dg_destroy(CLIENT *cl)
989{
990 struct cu_data *cu = (struct cu_data *)cl->cl_private;
991 struct cu_socket *cs;
992 struct socket *so = NULL;
993 bool_t lastsocketref;
994
995 cs = cu->cu_socket->so_rcv.sb_upcallarg;
996 clnt_dg_close(cl);
997
998 mtx_lock(&cs->cs_lock);
999
1000 cs->cs_refs--;
1001 if (cs->cs_refs == 0) {
1002 mtx_unlock(&cs->cs_lock);
1003 SOCKBUF_LOCK(&cu->cu_socket->so_rcv);
1004 soupcall_clear(cu->cu_socket, SO_RCV);
1005 clnt_dg_upcallsdone(cu->cu_socket, cs);
1006 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
1007 mtx_destroy(&cs->cs_lock);
1008 mem_free(cs, sizeof(*cs));
1009 lastsocketref = TRUE;
1010 } else {
1011 mtx_unlock(&cs->cs_lock);
1012 lastsocketref = FALSE;
1013 }
1014
1015 if (cu->cu_closeit && lastsocketref) {
1016 so = cu->cu_socket;
1017 cu->cu_socket = NULL;
1018 }
1019
1020 if (so)
1021 soclose(so);
1022
1023 if (cl->cl_netid && cl->cl_netid[0])
1024 mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
1025 if (cl->cl_tp && cl->cl_tp[0])
1026 mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
1027 mem_free(cu, sizeof (*cu));
1028 mem_free(cl, sizeof (CLIENT));
1029}
1030
1031/*
1032 * Make sure that the time is not garbage. -1 value is allowed.
1033 */
1034static bool_t
1035time_not_ok(struct timeval *t)
1036{
1037 return (t->tv_sec < -1 || t->tv_sec > 100000000 ||
1038 t->tv_usec < -1 || t->tv_usec > 1000000);
1039}
1040
1041int
1042clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
1043{
1044 struct cu_socket *cs = (struct cu_socket *) arg;
1045 struct uio uio;
1046 struct mbuf *m;
1047 struct mbuf *control;
1048 struct cu_request *cr;
1049 int error, rcvflag, foundreq;
1050 uint32_t xid;
1051
1052 cs->cs_upcallrefs++;
1053 uio.uio_resid = 1000000000;
1054 uio.uio_td = curthread;
1055 do {
1056 SOCKBUF_UNLOCK(&so->so_rcv);
1057 m = NULL;
1058 control = NULL;
1059 rcvflag = MSG_DONTWAIT;
1060 error = soreceive(so, NULL, &uio, &m, &control, &rcvflag);
1061 if (control)
1062 m_freem(control);
1063 SOCKBUF_LOCK(&so->so_rcv);
1064
1065 if (error == EWOULDBLOCK)
1066 break;
1067
1068 /*
1069 * If there was an error, wake up all pending
1070 * requests.
1071 */
1072 if (error) {
1073 mtx_lock(&cs->cs_lock);
1074 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
1075 cr->cr_xid = 0;
1076 cr->cr_error = error;
1077 wakeup(cr);
1078 }
1079 mtx_unlock(&cs->cs_lock);
1080 break;
1081 }
1082
1083 /*
1084 * The XID is in the first uint32_t of the reply.
1085 */
1086 if (m->m_len < sizeof(xid))
1087 m = m_pullup(m, sizeof(xid));
1088 if (!m)
1089 /*
1090 * Should never happen.
1091 */
1092 continue;
1093
1094 xid = ntohl(*mtod(m, uint32_t *));
1095
1096 /*
1097 * Attempt to match this reply with a pending request.
1098 */
1099 mtx_lock(&cs->cs_lock);
1100 foundreq = 0;
1101 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
1102 if (cr->cr_xid == xid) {
1103 /*
1104 * This one matches. We leave the
1105 * reply mbuf in cr->cr_mrep. Set the
1106 * XID to zero so that we will ignore
1107 * any duplicated replies that arrive
1108 * before clnt_dg_call removes it from
1109 * the queue.
1110 */
1111 cr->cr_xid = 0;
1112 cr->cr_mrep = m;
1113 cr->cr_error = 0;
1114 foundreq = 1;
1115 wakeup(cr);
1116 break;
1117 }
1118 }
1119 mtx_unlock(&cs->cs_lock);
1120
1121 /*
1122 * If we didn't find the matching request, just drop
1123 * it - its probably a repeated reply.
1124 */
1125 if (!foundreq)
1126 m_freem(m);
1127 } while (m);
1128 cs->cs_upcallrefs--;
1129 if (cs->cs_upcallrefs < 0)
1130 panic("rpcdg upcall refcnt");
1131 if (cs->cs_upcallrefs == 0)
1132 wakeup(&cs->cs_upcallrefs);
1133 return (SU_OK);
1134}
1135
1136/*
1137 * Wait for all upcalls in progress to complete.
1138 */
1139static void
1140clnt_dg_upcallsdone(struct socket *so, struct cu_socket *cs)
1141{
1142
1143 SOCKBUF_LOCK_ASSERT(&so->so_rcv);
1144
1145 while (cs->cs_upcallrefs > 0)
1146 (void) msleep(&cs->cs_upcallrefs, SOCKBUF_MTX(&so->so_rcv), 0,
1147 "rpcdgup", 0);
1148}