clnt_dg.c (177685) | clnt_dg.c (180025) |
---|---|
1/* $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */ 2 3/* 4 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for 5 * unrestricted use provided that this legend is included on all tape 6 * media and as a part of the software program in whole or part. Users 7 * may copy or modify Sun RPC without charge, but are not authorized 8 * to license or distribute it to anyone else except as part of a product or --- 23 unchanged lines hidden (view full) --- 32 * Copyright (c) 1986-1991 by Sun Microsystems Inc. 33 */ 34 35#if defined(LIBC_SCCS) && !defined(lint) 36#ident "@(#)clnt_dg.c 1.23 94/04/22 SMI" 37static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro"; 38#endif 39#include <sys/cdefs.h> | 1/* $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */ 2 3/* 4 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for 5 * unrestricted use provided that this legend is included on all tape 6 * media and as a part of the software program in whole or part. Users 7 * may copy or modify Sun RPC without charge, but are not authorized 8 * to license or distribute it to anyone else except as part of a product or --- 23 unchanged lines hidden (view full) --- 32 * Copyright (c) 1986-1991 by Sun Microsystems Inc. 33 */ 34 35#if defined(LIBC_SCCS) && !defined(lint) 36#ident "@(#)clnt_dg.c 1.23 94/04/22 SMI" 37static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro"; 38#endif 39#include <sys/cdefs.h> |
40__FBSDID("$FreeBSD: head/sys/rpc/clnt_dg.c 177685 2008-03-28 09:50:32Z dfr $"); | 40__FBSDID("$FreeBSD: head/sys/rpc/clnt_dg.c 180025 2008-06-26 10:21:54Z dfr $"); |
41 42/* 43 * Implements a connectionless client side RPC. 44 */ 45 46#include <sys/param.h> 47#include <sys/systm.h> | 41 42/* 43 * Implements a connectionless client side RPC. 44 */ 45 46#include <sys/param.h> 47#include <sys/systm.h> |
48#include <sys/kernel.h> |
|
48#include <sys/lock.h> 49#include <sys/malloc.h> 50#include <sys/mbuf.h> 51#include <sys/mutex.h> 52#include <sys/pcpu.h> 53#include <sys/proc.h> 54#include <sys/socket.h> 55#include <sys/socketvar.h> --- 9 unchanged lines hidden (view full) --- 65 * Disable RPC exponential back-off for FreeBSD.org systems. 66 */ 67#define RPC_MAX_BACKOFF 1 /* second */ 68#else 69#define RPC_MAX_BACKOFF 30 /* seconds */ 70#endif 71 72static bool_t time_not_ok(struct timeval *); | 49#include <sys/lock.h> 50#include <sys/malloc.h> 51#include <sys/mbuf.h> 52#include <sys/mutex.h> 53#include <sys/pcpu.h> 54#include <sys/proc.h> 55#include <sys/socket.h> 56#include <sys/socketvar.h> --- 9 unchanged lines hidden (view full) --- 66 * Disable RPC exponential back-off for FreeBSD.org systems. 67 */ 68#define RPC_MAX_BACKOFF 1 /* second */ 69#else 70#define RPC_MAX_BACKOFF 30 /* seconds */ 71#endif 72 73static bool_t time_not_ok(struct timeval *); |
73static enum clnt_stat clnt_dg_call(CLIENT *, rpcproc_t, xdrproc_t, void *, 74 xdrproc_t, void *, struct timeval); | 74static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *, 75 rpcproc_t, xdrproc_t, void *, xdrproc_t, void *, struct timeval); |
75static void clnt_dg_geterr(CLIENT *, struct rpc_err *); 76static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *); 77static void clnt_dg_abort(CLIENT *); 78static bool_t clnt_dg_control(CLIENT *, u_int, void *); 79static void clnt_dg_destroy(CLIENT *); 80static void clnt_dg_soupcall(struct socket *so, void *arg, int waitflag); 81 82static struct clnt_ops clnt_dg_ops = { 83 .cl_call = clnt_dg_call, 84 .cl_abort = clnt_dg_abort, 85 .cl_geterr = clnt_dg_geterr, 86 .cl_freeres = clnt_dg_freeres, 87 .cl_destroy = clnt_dg_destroy, 88 .cl_control = clnt_dg_control 89}; 90 91static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory"; 92 93/* | 76static void clnt_dg_geterr(CLIENT *, struct rpc_err *); 77static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *); 78static void clnt_dg_abort(CLIENT *); 79static bool_t clnt_dg_control(CLIENT *, u_int, void *); 80static void clnt_dg_destroy(CLIENT *); 81static void clnt_dg_soupcall(struct socket *so, void *arg, int waitflag); 82 83static struct clnt_ops clnt_dg_ops = { 84 .cl_call = clnt_dg_call, 85 .cl_abort = clnt_dg_abort, 86 .cl_geterr = clnt_dg_geterr, 87 .cl_freeres = clnt_dg_freeres, 88 .cl_destroy = clnt_dg_destroy, 89 .cl_control = clnt_dg_control 90}; 91 92static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory"; 93 94/* |
94 * A pending RPC request which awaits a reply. | 95 * A pending RPC request which awaits a reply. Requests which have 96 * received their reply will have cr_xid set to zero and cr_mrep to 97 * the mbuf chain of the reply. |
95 */ 96struct cu_request { 97 TAILQ_ENTRY(cu_request) cr_link; | 98 */ 99struct cu_request { 100 TAILQ_ENTRY(cu_request) cr_link; |
101 CLIENT *cr_client; /* owner */ |
|
98 uint32_t cr_xid; /* XID of request */ 99 struct mbuf *cr_mrep; /* reply received by upcall */ 100 int cr_error; /* any error from upcall */ 101}; 102 103TAILQ_HEAD(cu_request_list, cu_request); 104 105#define MCALL_MSG_SIZE 24 --- 12 unchanged lines hidden (view full) --- 118 struct cu_request_list cs_pending; /* Requests awaiting replies */ 119 120}; 121 122/* 123 * Private data kept per client handle 124 */ 125struct cu_data { | 102 uint32_t cr_xid; /* XID of request */ 103 struct mbuf *cr_mrep; /* reply received by upcall */ 104 int cr_error; /* any error from upcall */ 105}; 106 107TAILQ_HEAD(cu_request_list, cu_request); 108 109#define MCALL_MSG_SIZE 24 --- 12 unchanged lines hidden (view full) --- 122 struct cu_request_list cs_pending; /* Requests awaiting replies */ 123 124}; 125 126/* 127 * Private data kept per client handle 128 */ 129struct cu_data { |
130 int cu_threads; /* # threads in clnt_vc_call */ 131 bool_t cu_closing; /* TRUE if we are destroying */ |
|
126 struct socket *cu_socket; /* connection socket */ 127 bool_t cu_closeit; /* opened by library */ 128 struct sockaddr_storage cu_raddr; /* remote address */ 129 int cu_rlen; 130 struct timeval cu_wait; /* retransmit interval */ 131 struct timeval cu_total; /* total time for the call */ 132 struct rpc_err cu_error; 133 uint32_t cu_xid; --- 64 unchanged lines hidden (view full) --- 198 cl = mem_alloc(sizeof (CLIENT)); 199 200 /* 201 * Should be multiple of 4 for XDR. 202 */ 203 sendsz = ((sendsz + 3) / 4) * 4; 204 recvsz = ((recvsz + 3) / 4) * 4; 205 cu = mem_alloc(sizeof (*cu)); | 132 struct socket *cu_socket; /* connection socket */ 133 bool_t cu_closeit; /* opened by library */ 134 struct sockaddr_storage cu_raddr; /* remote address */ 135 int cu_rlen; 136 struct timeval cu_wait; /* retransmit interval */ 137 struct timeval cu_total; /* total time for the call */ 138 struct rpc_err cu_error; 139 uint32_t cu_xid; --- 64 unchanged lines hidden (view full) --- 204 cl = mem_alloc(sizeof (CLIENT)); 205 206 /* 207 * Should be multiple of 4 for XDR. 208 */ 209 sendsz = ((sendsz + 3) / 4) * 4; 210 recvsz = ((recvsz + 3) / 4) * 4; 211 cu = mem_alloc(sizeof (*cu)); |
212 cu->cu_threads = 0; 213 cu->cu_closing = FALSE; |
|
206 (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len); 207 cu->cu_rlen = svcaddr->sa_len; 208 /* Other values can also be set through clnt_control() */ | 214 (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len); 215 cu->cu_rlen = svcaddr->sa_len; 216 /* Other values can also be set through clnt_control() */ |
209 cu->cu_wait.tv_sec = 15; /* heuristically chosen */ | 217 cu->cu_wait.tv_sec = 3; /* heuristically chosen */ |
210 cu->cu_wait.tv_usec = 0; 211 cu->cu_total.tv_sec = -1; 212 cu->cu_total.tv_usec = -1; 213 cu->cu_sendsz = sendsz; 214 cu->cu_recvsz = recvsz; 215 cu->cu_async = FALSE; 216 cu->cu_connect = FALSE; 217 cu->cu_connected = FALSE; --- 14 unchanged lines hidden (view full) --- 232 233 /* 234 * By default, closeit is always FALSE. It is users responsibility 235 * to do a close on it, else the user may use clnt_control 236 * to let clnt_destroy do it for him/her. 237 */ 238 cu->cu_closeit = FALSE; 239 cu->cu_socket = so; | 218 cu->cu_wait.tv_usec = 0; 219 cu->cu_total.tv_sec = -1; 220 cu->cu_total.tv_usec = -1; 221 cu->cu_sendsz = sendsz; 222 cu->cu_recvsz = recvsz; 223 cu->cu_async = FALSE; 224 cu->cu_connect = FALSE; 225 cu->cu_connected = FALSE; --- 14 unchanged lines hidden (view full) --- 240 241 /* 242 * By default, closeit is always FALSE. It is users responsibility 243 * to do a close on it, else the user may use clnt_control 244 * to let clnt_destroy do it for him/her. 245 */ 246 cu->cu_closeit = FALSE; 247 cu->cu_socket = so; |
248 soreserve(so, 256*1024, 256*1024); |
|
240 241 SOCKBUF_LOCK(&so->so_rcv); 242recheck_socket: 243 if (so->so_upcall) { 244 if (so->so_upcall != clnt_dg_soupcall) { 245 SOCKBUF_UNLOCK(&so->so_rcv); 246 printf("clnt_dg_create(): socket already has an incompatible upcall\n"); 247 goto err2; --- 21 unchanged lines hidden (view full) --- 269 cs->cs_refs = 1; 270 TAILQ_INIT(&cs->cs_pending); 271 so->so_upcallarg = cs; 272 so->so_upcall = clnt_dg_soupcall; 273 so->so_rcv.sb_flags |= SB_UPCALL; 274 } 275 SOCKBUF_UNLOCK(&so->so_rcv); 276 | 249 250 SOCKBUF_LOCK(&so->so_rcv); 251recheck_socket: 252 if (so->so_upcall) { 253 if (so->so_upcall != clnt_dg_soupcall) { 254 SOCKBUF_UNLOCK(&so->so_rcv); 255 printf("clnt_dg_create(): socket already has an incompatible upcall\n"); 256 goto err2; --- 21 unchanged lines hidden (view full) --- 278 cs->cs_refs = 1; 279 TAILQ_INIT(&cs->cs_pending); 280 so->so_upcallarg = cs; 281 so->so_upcall = clnt_dg_soupcall; 282 so->so_rcv.sb_flags |= SB_UPCALL; 283 } 284 SOCKBUF_UNLOCK(&so->so_rcv); 285 |
286 cl->cl_refs = 1; |
|
277 cl->cl_ops = &clnt_dg_ops; 278 cl->cl_private = (caddr_t)(void *)cu; 279 cl->cl_auth = authnone_create(); 280 cl->cl_tp = NULL; 281 cl->cl_netid = NULL; 282 return (cl); 283err2: 284 if (cl) { 285 mem_free(cl, sizeof (CLIENT)); 286 if (cu) 287 mem_free(cu, sizeof (*cu)); 288 } 289 return (NULL); 290} 291 292static enum clnt_stat 293clnt_dg_call( | 287 cl->cl_ops = &clnt_dg_ops; 288 cl->cl_private = (caddr_t)(void *)cu; 289 cl->cl_auth = authnone_create(); 290 cl->cl_tp = NULL; 291 cl->cl_netid = NULL; 292 return (cl); 293err2: 294 if (cl) { 295 mem_free(cl, sizeof (CLIENT)); 296 if (cu) 297 mem_free(cu, sizeof (*cu)); 298 } 299 return (NULL); 300} 301 302static enum clnt_stat 303clnt_dg_call( |
294 CLIENT *cl, /* client handle */ | 304 CLIENT *cl, /* client handle */ 305 struct rpc_callextra *ext, /* call metadata */ |
295 rpcproc_t proc, /* procedure number */ 296 xdrproc_t xargs, /* xdr routine for args */ 297 void *argsp, /* pointer to args */ 298 xdrproc_t xresults, /* xdr routine for results */ 299 void *resultsp, /* pointer to results */ 300 struct timeval utimeout) /* seconds to wait before giving up */ 301{ 302 struct cu_data *cu = (struct cu_data *)cl->cl_private; 303 struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg; | 306 rpcproc_t proc, /* procedure number */ 307 xdrproc_t xargs, /* xdr routine for args */ 308 void *argsp, /* pointer to args */ 309 xdrproc_t xresults, /* xdr routine for results */ 310 void *resultsp, /* pointer to results */ 311 struct timeval utimeout) /* seconds to wait before giving up */ 312{ 313 struct cu_data *cu = (struct cu_data *)cl->cl_private; 314 struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg; |
315 AUTH *auth; |
|
304 XDR xdrs; 305 struct rpc_msg reply_msg; 306 bool_t ok; | 316 XDR xdrs; 317 struct rpc_msg reply_msg; 318 bool_t ok; |
319 int retrans; /* number of re-transmits so far */ |
|
307 int nrefreshes = 2; /* number of times to refresh cred */ | 320 int nrefreshes = 2; /* number of times to refresh cred */ |
308 struct timeval timeout; 309 struct timeval retransmit_time; 310 struct timeval next_sendtime, starttime, time_waited, tv; | 321 struct timeval *tvp; 322 int timeout; 323 int retransmit_time; 324 int next_sendtime, starttime, time_waited, tv; |
311 struct sockaddr *sa; 312 socklen_t salen; 313 uint32_t xid; 314 struct mbuf *mreq = NULL; | 325 struct sockaddr *sa; 326 socklen_t salen; 327 uint32_t xid; 328 struct mbuf *mreq = NULL; |
315 struct cu_request cr; | 329 struct cu_request *cr; |
316 int error; 317 | 330 int error; 331 |
332 cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK); 333 |
|
318 mtx_lock(&cs->cs_lock); 319 | 334 mtx_lock(&cs->cs_lock); 335 |
320 cr.cr_mrep = NULL; 321 cr.cr_error = 0; | 336 if (cu->cu_closing) { 337 mtx_unlock(&cs->cs_lock); 338 free(cr, M_RPC); 339 return (RPC_CANTSEND); 340 } 341 cu->cu_threads++; |
322 | 342 |
343 if (ext) 344 auth = ext->rc_auth; 345 else 346 auth = cl->cl_auth; 347 348 cr->cr_client = cl; 349 cr->cr_mrep = NULL; 350 cr->cr_error = 0; 351 |
|
323 if (cu->cu_total.tv_usec == -1) { | 352 if (cu->cu_total.tv_usec == -1) { |
324 timeout = utimeout; /* use supplied timeout */ | 353 tvp = &utimeout; /* use supplied timeout */ |
325 } else { | 354 } else { |
326 timeout = cu->cu_total; /* use default timeout */ | 355 tvp = &cu->cu_total; /* use default timeout */ |
327 } | 356 } |
357 if (tvp->tv_sec || tvp->tv_usec) 358 timeout = tvtohz(tvp); 359 else 360 timeout = 0; |
|
328 329 if (cu->cu_connect && !cu->cu_connected) { 330 mtx_unlock(&cs->cs_lock); 331 error = soconnect(cu->cu_socket, 332 (struct sockaddr *)&cu->cu_raddr, curthread); 333 mtx_lock(&cs->cs_lock); 334 if (error) { 335 cu->cu_error.re_errno = error; --- 4 unchanged lines hidden (view full) --- 340 } 341 if (cu->cu_connected) { 342 sa = NULL; 343 salen = 0; 344 } else { 345 sa = (struct sockaddr *)&cu->cu_raddr; 346 salen = cu->cu_rlen; 347 } | 361 362 if (cu->cu_connect && !cu->cu_connected) { 363 mtx_unlock(&cs->cs_lock); 364 error = soconnect(cu->cu_socket, 365 (struct sockaddr *)&cu->cu_raddr, curthread); 366 mtx_lock(&cs->cs_lock); 367 if (error) { 368 cu->cu_error.re_errno = error; --- 4 unchanged lines hidden (view full) --- 373 } 374 if (cu->cu_connected) { 375 sa = NULL; 376 salen = 0; 377 } else { 378 sa = (struct sockaddr *)&cu->cu_raddr; 379 salen = cu->cu_rlen; 380 } |
348 time_waited.tv_sec = 0; 349 time_waited.tv_usec = 0; 350 retransmit_time = next_sendtime = cu->cu_wait; | 381 time_waited = 0; 382 retrans = 0; 383 retransmit_time = next_sendtime = tvtohz(&cu->cu_wait); |
351 | 384 |
352 getmicrotime(&starttime); | 385 starttime = ticks; |
353 354call_again: 355 mtx_assert(&cs->cs_lock, MA_OWNED); 356 357 cu->cu_xid++; 358 xid = cu->cu_xid; 359 360send_again: --- 10 unchanged lines hidden (view full) --- 371 *mtod(mreq, uint32_t *) = htonl(xid); 372 373 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE); 374 375 if (cu->cu_async == TRUE && xargs == NULL) 376 goto get_reply; 377 378 if ((! XDR_PUTINT32(&xdrs, &proc)) || | 386 387call_again: 388 mtx_assert(&cs->cs_lock, MA_OWNED); 389 390 cu->cu_xid++; 391 xid = cu->cu_xid; 392 393send_again: --- 10 unchanged lines hidden (view full) --- 404 *mtod(mreq, uint32_t *) = htonl(xid); 405 406 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE); 407 408 if (cu->cu_async == TRUE && xargs == NULL) 409 goto get_reply; 410 411 if ((! XDR_PUTINT32(&xdrs, &proc)) || |
379 (! AUTH_MARSHALL(cl->cl_auth, &xdrs)) || | 412 (! AUTH_MARSHALL(auth, &xdrs)) || |
380 (! (*xargs)(&xdrs, argsp))) { 381 cu->cu_error.re_status = RPC_CANTENCODEARGS; 382 mtx_lock(&cs->cs_lock); 383 goto out; 384 } 385 m_fixhdr(mreq); 386 | 413 (! (*xargs)(&xdrs, argsp))) { 414 cu->cu_error.re_status = RPC_CANTENCODEARGS; 415 mtx_lock(&cs->cs_lock); 416 goto out; 417 } 418 m_fixhdr(mreq); 419 |
387 cr.cr_xid = xid; | 420 cr->cr_xid = xid; |
388 mtx_lock(&cs->cs_lock); | 421 mtx_lock(&cs->cs_lock); |
389 TAILQ_INSERT_TAIL(&cs->cs_pending, &cr, cr_link); | 422 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); |
390 mtx_unlock(&cs->cs_lock); 391 392 /* 393 * sosend consumes mreq. 394 */ 395 error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread); 396 mreq = NULL; 397 398 /* 399 * sub-optimal code appears here because we have 400 * some clock time to spare while the packets are in flight. 401 * (We assume that this is actually only executed once.) 402 */ 403 reply_msg.acpted_rply.ar_verf = _null_auth; 404 reply_msg.acpted_rply.ar_results.where = resultsp; 405 reply_msg.acpted_rply.ar_results.proc = xresults; 406 407 mtx_lock(&cs->cs_lock); 408 if (error) { | 423 mtx_unlock(&cs->cs_lock); 424 425 /* 426 * sosend consumes mreq. 427 */ 428 error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread); 429 mreq = NULL; 430 431 /* 432 * sub-optimal code appears here because we have 433 * some clock time to spare while the packets are in flight. 434 * (We assume that this is actually only executed once.) 435 */ 436 reply_msg.acpted_rply.ar_verf = _null_auth; 437 reply_msg.acpted_rply.ar_results.where = resultsp; 438 reply_msg.acpted_rply.ar_results.proc = xresults; 439 440 mtx_lock(&cs->cs_lock); 441 if (error) { |
409 TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link); 410 | 442 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); |
411 cu->cu_error.re_errno = error; 412 cu->cu_error.re_status = RPC_CANTSEND; 413 goto out; 414 } 415 416 /* 417 * Check to see if we got an upcall while waiting for the | 443 cu->cu_error.re_errno = error; 444 cu->cu_error.re_status = RPC_CANTSEND; 445 goto out; 446 } 447 448 /* 449 * Check to see if we got an upcall while waiting for the |
418 * lock. In both these cases, the request has been removed 419 * from cs->cs_pending. | 450 * lock. |
420 */ | 451 */ |
421 if (cr.cr_error) { 422 cu->cu_error.re_errno = cr.cr_error; | 452 if (cr->cr_error) { 453 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 454 cu->cu_error.re_errno = cr->cr_error; |
423 cu->cu_error.re_status = RPC_CANTRECV; 424 goto out; 425 } | 455 cu->cu_error.re_status = RPC_CANTRECV; 456 goto out; 457 } |
426 if (cr.cr_mrep) { | 458 if (cr->cr_mrep) { 459 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); |
427 goto got_reply; 428 } 429 430 /* 431 * Hack to provide rpc-based message passing 432 */ | 460 goto got_reply; 461 } 462 463 /* 464 * Hack to provide rpc-based message passing 465 */ |
433 if (timeout.tv_sec == 0 && timeout.tv_usec == 0) { 434 if (cr.cr_xid) 435 TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link); | 466 if (timeout == 0) { 467 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); |
436 cu->cu_error.re_status = RPC_TIMEDOUT; 437 goto out; 438 } 439 440get_reply: 441 for (;;) { 442 /* Decide how long to wait. */ | 468 cu->cu_error.re_status = RPC_TIMEDOUT; 469 goto out; 470 } 471 472get_reply: 473 for (;;) { 474 /* Decide how long to wait. */ |
443 if (timevalcmp(&next_sendtime, &timeout, <)) { | 475 if (next_sendtime < timeout) |
444 tv = next_sendtime; | 476 tv = next_sendtime; |
445 } else { | 477 else |
446 tv = timeout; | 478 tv = timeout; |
479 tv -= time_waited; 480 481 if (tv > 0) { 482 if (cu->cu_closing) 483 error = 0; 484 else 485 error = msleep(cr, &cs->cs_lock, 486 cu->cu_waitflag, cu->cu_waitchan, tv); 487 } else { 488 error = EWOULDBLOCK; |
|
447 } | 489 } |
448 timevalsub(&tv, &time_waited); 449 if (tv.tv_sec < 0 || tv.tv_usec < 0) 450 tv.tv_sec = tv.tv_usec = 0; | |
451 | 490 |
452 error = msleep(&cr, &cs->cs_lock, cu->cu_waitflag, 453 cu->cu_waitchan, tvtohz(&tv)); | 491 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); |
454 455 if (!error) { 456 /* 457 * We were woken up by the upcall. If the 458 * upcall had a receive error, report that, 459 * otherwise we have a reply. 460 */ | 492 493 if (!error) { 494 /* 495 * We were woken up by the upcall. If the 496 * upcall had a receive error, report that, 497 * otherwise we have a reply. 498 */ |
461 if (cr.cr_error) { 462 cu->cu_error.re_errno = cr.cr_error; | 499 if (cr->cr_error) { 500 cu->cu_error.re_errno = cr->cr_error; |
463 cu->cu_error.re_status = RPC_CANTRECV; 464 goto out; 465 } 466 break; 467 } 468 469 /* 470 * The sleep returned an error so our request is still 471 * on the list. If we got EWOULDBLOCK, we may want to 472 * re-send the request. 473 */ 474 if (error != EWOULDBLOCK) { | 501 cu->cu_error.re_status = RPC_CANTRECV; 502 goto out; 503 } 504 break; 505 } 506 507 /* 508 * The sleep returned an error so our request is still 509 * on the list. If we got EWOULDBLOCK, we may want to 510 * re-send the request. 511 */ 512 if (error != EWOULDBLOCK) { |
475 if (cr.cr_xid) 476 TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link); | |
477 cu->cu_error.re_errno = error; 478 if (error == EINTR) 479 cu->cu_error.re_status = RPC_INTR; 480 else 481 cu->cu_error.re_status = RPC_CANTRECV; 482 goto out; 483 } 484 | 513 cu->cu_error.re_errno = error; 514 if (error == EINTR) 515 cu->cu_error.re_status = RPC_INTR; 516 else 517 cu->cu_error.re_status = RPC_CANTRECV; 518 goto out; 519 } 520 |
485 getmicrotime(&tv); 486 time_waited = tv; 487 timevalsub(&time_waited, &starttime); | 521 time_waited = ticks - starttime; |
488 489 /* Check for timeout. */ | 522 523 /* Check for timeout. */ |
490 if (timevalcmp(&time_waited, &timeout, >)) { 491 if (cr.cr_xid) 492 TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link); | 524 if (time_waited > timeout) { |
493 cu->cu_error.re_errno = EWOULDBLOCK; 494 cu->cu_error.re_status = RPC_TIMEDOUT; 495 goto out; 496 } 497 498 /* Retransmit if necessary. */ | 525 cu->cu_error.re_errno = EWOULDBLOCK; 526 cu->cu_error.re_status = RPC_TIMEDOUT; 527 goto out; 528 } 529 530 /* Retransmit if necessary. */ |
499 if (timevalcmp(&time_waited, &next_sendtime, >)) { 500 if (cr.cr_xid) 501 TAILQ_REMOVE(&cs->cs_pending, &cr, cr_link); | 531 if (time_waited >= next_sendtime) { 532 if (ext && ext->rc_feedback) { 533 mtx_unlock(&cs->cs_lock); 534 if (retrans == 0) 535 ext->rc_feedback(FEEDBACK_REXMIT1, 536 proc, ext->rc_feedback_arg); 537 else 538 ext->rc_feedback(FEEDBACK_REXMIT2, 539 proc, ext->rc_feedback_arg); 540 mtx_lock(&cs->cs_lock); 541 } 542 if (cu->cu_closing) { 543 cu->cu_error.re_errno = ESHUTDOWN; 544 cu->cu_error.re_status = RPC_CANTRECV; 545 goto out; 546 } 547 retrans++; |
502 /* update retransmit_time */ | 548 /* update retransmit_time */ |
503 if (retransmit_time.tv_sec < RPC_MAX_BACKOFF) 504 timevaladd(&retransmit_time, &retransmit_time); 505 timevaladd(&next_sendtime, &retransmit_time); | 549 if (retransmit_time < RPC_MAX_BACKOFF * hz) 550 retransmit_time = 2 * retransmit_time; 551 next_sendtime += retransmit_time; |
506 goto send_again; 507 } | 552 goto send_again; 553 } |
554 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); |
|
508 } 509 510got_reply: 511 /* 512 * Now decode and validate the response. We need to drop the 513 * lock since xdr_replymsg may end up sleeping in malloc. 514 */ 515 mtx_unlock(&cs->cs_lock); 516 | 555 } 556 557got_reply: 558 /* 559 * Now decode and validate the response. We need to drop the 560 * lock since xdr_replymsg may end up sleeping in malloc. 561 */ 562 mtx_unlock(&cs->cs_lock); 563 |
517 xdrmbuf_create(&xdrs, cr.cr_mrep, XDR_DECODE); | 564 if (ext && ext->rc_feedback) 565 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg); 566 567 xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE); |
518 ok = xdr_replymsg(&xdrs, &reply_msg); 519 XDR_DESTROY(&xdrs); | 568 ok = xdr_replymsg(&xdrs, &reply_msg); 569 XDR_DESTROY(&xdrs); |
520 cr.cr_mrep = NULL; | 570 cr->cr_mrep = NULL; |
521 522 mtx_lock(&cs->cs_lock); 523 524 if (ok) { 525 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) && 526 (reply_msg.acpted_rply.ar_stat == SUCCESS)) 527 cu->cu_error.re_status = RPC_SUCCESS; 528 else --- 28 unchanged lines hidden (view full) --- 557 cu->cu_error.re_status = RPC_CANTDECODERES; 558 559 } 560out: 561 mtx_assert(&cs->cs_lock, MA_OWNED); 562 563 if (mreq) 564 m_freem(mreq); | 571 572 mtx_lock(&cs->cs_lock); 573 574 if (ok) { 575 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) && 576 (reply_msg.acpted_rply.ar_stat == SUCCESS)) 577 cu->cu_error.re_status = RPC_SUCCESS; 578 else --- 28 unchanged lines hidden (view full) --- 607 cu->cu_error.re_status = RPC_CANTDECODERES; 608 609 } 610out: 611 mtx_assert(&cs->cs_lock, MA_OWNED); 612 613 if (mreq) 614 m_freem(mreq); |
565 if (cr.cr_mrep) 566 m_freem(cr.cr_mrep); | 615 if (cr->cr_mrep) 616 m_freem(cr->cr_mrep); |
567 | 617 |
618 cu->cu_threads--; 619 if (cu->cu_closing) 620 wakeup(cu); 621 |
|
568 mtx_unlock(&cs->cs_lock); | 622 mtx_unlock(&cs->cs_lock); |
623 624 free(cr, M_RPC); 625 |
|
569 return (cu->cu_error.re_status); 570} 571 572static void 573clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp) 574{ 575 struct cu_data *cu = (struct cu_data *)cl->cl_private; 576 --- 150 unchanged lines hidden (view full) --- 727 return (TRUE); 728} 729 730static void 731clnt_dg_destroy(CLIENT *cl) 732{ 733 struct cu_data *cu = (struct cu_data *)cl->cl_private; 734 struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg; | 626 return (cu->cu_error.re_status); 627} 628 629static void 630clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp) 631{ 632 struct cu_data *cu = (struct cu_data *)cl->cl_private; 633 --- 150 unchanged lines hidden (view full) --- 784 return (TRUE); 785} 786 787static void 788clnt_dg_destroy(CLIENT *cl) 789{ 790 struct cu_data *cu = (struct cu_data *)cl->cl_private; 791 struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg; |
792 struct cu_request *cr; |
|
735 struct socket *so = NULL; 736 bool_t lastsocketref; 737 | 793 struct socket *so = NULL; 794 bool_t lastsocketref; 795 |
738 SOCKBUF_LOCK(&cu->cu_socket->so_rcv); 739 | |
740 mtx_lock(&cs->cs_lock); | 796 mtx_lock(&cs->cs_lock); |
797 798 /* 799 * Abort any pending requests and wait until everyone 800 * has finished with clnt_vc_call. 801 */ 802 cu->cu_closing = TRUE; 803 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 804 if (cr->cr_client == cl) { 805 cr->cr_xid = 0; 806 cr->cr_error = ESHUTDOWN; 807 wakeup(cr); 808 } 809 } 810 811 while (cu->cu_threads) 812 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); 813 |
|
741 cs->cs_refs--; 742 if (cs->cs_refs == 0) { | 814 cs->cs_refs--; 815 if (cs->cs_refs == 0) { |
816 mtx_destroy(&cs->cs_lock); 817 SOCKBUF_LOCK(&cu->cu_socket->so_rcv); |
|
743 cu->cu_socket->so_upcallarg = NULL; 744 cu->cu_socket->so_upcall = NULL; 745 cu->cu_socket->so_rcv.sb_flags &= ~SB_UPCALL; | 818 cu->cu_socket->so_upcallarg = NULL; 819 cu->cu_socket->so_upcall = NULL; 820 cu->cu_socket->so_rcv.sb_flags &= ~SB_UPCALL; |
746 mtx_destroy(&cs->cs_lock); | |
747 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); 748 mem_free(cs, sizeof(*cs)); 749 lastsocketref = TRUE; 750 } else { 751 mtx_unlock(&cs->cs_lock); | 821 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); 822 mem_free(cs, sizeof(*cs)); 823 lastsocketref = TRUE; 824 } else { 825 mtx_unlock(&cs->cs_lock); |
752 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); | |
753 lastsocketref = FALSE; 754 } 755 | 826 lastsocketref = FALSE; 827 } 828 |
756 if (cu->cu_closeit) { 757 KASSERT(lastsocketref, ("clnt_dg_destroy(): closing a socket " 758 "shared with other clients")); | 829 if (cu->cu_closeit && lastsocketref) { |
759 so = cu->cu_socket; 760 cu->cu_socket = NULL; 761 } 762 763 if (so) 764 soclose(so); 765 766 if (cl->cl_netid && cl->cl_netid[0]) --- 40 unchanged lines hidden (view full) --- 807 808 /* 809 * If there was an error, wake up all pending 810 * requests. 811 */ 812 if (error) { 813 mtx_lock(&cs->cs_lock); 814 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { | 830 so = cu->cu_socket; 831 cu->cu_socket = NULL; 832 } 833 834 if (so) 835 soclose(so); 836 837 if (cl->cl_netid && cl->cl_netid[0]) --- 40 unchanged lines hidden (view full) --- 878 879 /* 880 * If there was an error, wake up all pending 881 * requests. 882 */ 883 if (error) { 884 mtx_lock(&cs->cs_lock); 885 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { |
886 cr->cr_xid = 0; |
|
815 cr->cr_error = error; 816 wakeup(cr); 817 } | 887 cr->cr_error = error; 888 wakeup(cr); 889 } |
818 TAILQ_INIT(&cs->cs_pending); | |
819 mtx_unlock(&cs->cs_lock); 820 break; 821 } 822 823 /* 824 * The XID is in the first uint32_t of the reply. 825 */ 826 m = m_pullup(m, sizeof(xid)); 827 if (!m) | 890 mtx_unlock(&cs->cs_lock); 891 break; 892 } 893 894 /* 895 * The XID is in the first uint32_t of the reply. 896 */ 897 m = m_pullup(m, sizeof(xid)); 898 if (!m) |
828 break; | 899 /* 900 * Should never happen. 901 */ 902 continue; 903 |
829 xid = ntohl(*mtod(m, uint32_t *)); 830 831 /* 832 * Attempt to match this reply with a pending request. 833 */ 834 mtx_lock(&cs->cs_lock); 835 foundreq = 0; 836 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 837 if (cr->cr_xid == xid) { 838 /* | 904 xid = ntohl(*mtod(m, uint32_t *)); 905 906 /* 907 * Attempt to match this reply with a pending request. 908 */ 909 mtx_lock(&cs->cs_lock); 910 foundreq = 0; 911 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 912 if (cr->cr_xid == xid) { 913 /* |
839 * This one matches. We snip it out of 840 * the pending list and leave the | 914 * This one matches. We leave the |
841 * reply mbuf in cr->cr_mrep. Set the | 915 * reply mbuf in cr->cr_mrep. Set the |
842 * XID to zero so that clnt_dg_call 843 * can know not to repeat the 844 * TAILQ_REMOVE. | 916 * XID to zero so that we will ignore 917 * any duplicated replies that arrive 918 * before clnt_dg_call removes it from 919 * the queue. |
845 */ | 920 */ |
846 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); | |
847 cr->cr_xid = 0; 848 cr->cr_mrep = m; 849 cr->cr_error = 0; 850 foundreq = 1; 851 wakeup(cr); 852 break; 853 } 854 } 855 mtx_unlock(&cs->cs_lock); 856 857 /* 858 * If we didn't find the matching request, just drop 859 * it - its probably a repeated reply. 860 */ 861 if (!foundreq) 862 m_freem(m); 863 } while (m); 864} 865 | 921 cr->cr_xid = 0; 922 cr->cr_mrep = m; 923 cr->cr_error = 0; 924 foundreq = 1; 925 wakeup(cr); 926 break; 927 } 928 } 929 mtx_unlock(&cs->cs_lock); 930 931 /* 932 * If we didn't find the matching request, just drop 933 * it - its probably a repeated reply. 934 */ 935 if (!foundreq) 936 m_freem(m); 937 } while (m); 938} 939 |