clnt_vc.c revision 193272
122652Smpp/* $NetBSD: clnt_vc.c,v 1.4 2000/07/14 08:40:42 fvdl Exp $ */ 222652Smpp 322652Smpp/* 422652Smpp * Sun RPC is a product of Sun Microsystems, Inc. and is provided for 522652Smpp * unrestricted use provided that this legend is included on all tape 622652Smpp * media and as a part of the software program in whole or part. Users 722652Smpp * may copy or modify Sun RPC without charge, but are not authorized 822652Smpp * to license or distribute it to anyone else except as part of a product or 922652Smpp * program developed by the user. 1022652Smpp * 1122652Smpp * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE 1222652Smpp * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR 1322652Smpp * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE. 1422652Smpp * 1522652Smpp * Sun RPC is provided with no support and without any obligation on the 1622652Smpp * part of Sun Microsystems, Inc. to assist in its use, correction, 1722652Smpp * modification or enhancement. 1822652Smpp * 1922652Smpp * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE 2022652Smpp * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC 2122652Smpp * OR ANY PART THEREOF. 2222652Smpp * 2322652Smpp * In no event will Sun Microsystems, Inc. be liable for any lost revenue 2422652Smpp * or profits or other special, indirect and consequential damages, even if 2522652Smpp * Sun has been advised of the possibility of such damages. 2622652Smpp * 2722652Smpp * Sun Microsystems, Inc. 2855547Sphantom * 2550 Garcia Avenue 2950476Speter * Mountain View, California 94043 3048795Snik */ 31226119Sdes 3222652Smpp#if defined(LIBC_SCCS) && !defined(lint) 3322652Smppstatic char *sccsid2 = "@(#)clnt_tcp.c 1.37 87/10/05 Copyr 1984 Sun Micro"; 3422652Smppstatic char *sccsid = "@(#)clnt_tcp.c 2.2 88/08/01 4.0 RPCSRC"; 3522652Smppstatic char sccsid3[] = "@(#)clnt_vc.c 1.19 89/03/16 Copyr 1988 Sun Micro"; 3622652Smpp#endif 37226119Sdes#include <sys/cdefs.h> 38226119Sdes__FBSDID("$FreeBSD: head/sys/rpc/clnt_vc.c 193272 2009-06-01 21:17:03Z jhb $"); 39226119Sdes 4022652Smpp/* 4184306Sru * clnt_tcp.c, Implements a TCP/IP based, client side RPC. 4284306Sru * 4322652Smpp * Copyright (C) 1984, Sun Microsystems, Inc. 4424817Sbde * 4522652Smpp * TCP based RPC supports 'batched calls'. 4624817Sbde * A sequence of calls may be batched-up in a send buffer. The rpc call 4722652Smpp * return immediately to the client even though the call was not necessarily 4824817Sbde * sent. The batching occurs if the results' xdr routine is NULL (0) AND 49226119Sdes * the rpc timeout value is zero (see clnt.h, rpc). 50226119Sdes * 5122652Smpp * Clients should NOT casually batch calls that in fact return results; that is, 52226119Sdes * the server side should be aware that a call is batched and not produce any 5357731Ssheldonh * return message. Batched calls that produce many result messages can 5422652Smpp * deadlock (netlock) the client and the server.... 5522652Smpp * 5622652Smpp * Now go hang yourself. 5722652Smpp */ 5822652Smpp 5922652Smpp#include <sys/param.h> 6022652Smpp#include <sys/systm.h> 6122652Smpp#include <sys/lock.h> 6222652Smpp#include <sys/malloc.h> 6322652Smpp#include <sys/mbuf.h> 6422652Smpp#include <sys/mutex.h> 6522652Smpp#include <sys/pcpu.h> 6657731Ssheldonh#include <sys/proc.h> 6757731Ssheldonh#include <sys/protosw.h> 6822652Smpp#include <sys/socket.h> 6922652Smpp#include <sys/socketvar.h> 7022652Smpp#include <sys/syslog.h> 7122652Smpp#include <sys/time.h> 7222652Smpp#include <sys/uio.h> 7322652Smpp#include <netinet/tcp.h> 7457731Ssheldonh 7557731Ssheldonh#include <rpc/rpc.h> 7622652Smpp#include <rpc/rpc_com.h> 7722652Smpp 7822652Smpp#define MCALL_MSG_SIZE 24 7922652Smpp 8022652Smppstruct cmessage { 8122652Smpp struct cmsghdr cmsg; 8281251Sru struct cmsgcred cmcred; 8381251Sru}; 8422652Smpp 8522652Smppstatic enum clnt_stat clnt_vc_call(CLIENT *, struct rpc_callextra *, 8622652Smpp rpcproc_t, struct mbuf *, struct mbuf **, struct timeval); 8722652Smppstatic void clnt_vc_geterr(CLIENT *, struct rpc_err *); 8881251Srustatic bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *); 8922652Smppstatic void clnt_vc_abort(CLIENT *); 9081251Srustatic bool_t clnt_vc_control(CLIENT *, u_int, void *); 9122652Smppstatic void clnt_vc_close(CLIENT *); 9222652Smppstatic void clnt_vc_destroy(CLIENT *); 9322652Smppstatic bool_t time_not_ok(struct timeval *); 9422652Smppstatic int clnt_vc_soupcall(struct socket *so, void *arg, int waitflag); 9522652Smpp 9622652Smppstatic struct clnt_ops clnt_vc_ops = { 9722652Smpp .cl_call = clnt_vc_call, 9822652Smpp .cl_abort = clnt_vc_abort, 9922652Smpp .cl_geterr = clnt_vc_geterr, 10022652Smpp .cl_freeres = clnt_vc_freeres, 10122652Smpp .cl_close = clnt_vc_close, 10222652Smpp .cl_destroy = clnt_vc_destroy, 10322652Smpp .cl_control = clnt_vc_control 10422652Smpp}; 10522652Smpp 10622652Smpp/* 10722652Smpp * A pending RPC request which awaits a reply. Requests which have 10822652Smpp * received their reply will have cr_xid set to zero and cr_mrep to 10922652Smpp * the mbuf chain of the reply. 11022652Smpp */ 11122652Smppstruct ct_request { 11222652Smpp TAILQ_ENTRY(ct_request) cr_link; 11322652Smpp uint32_t cr_xid; /* XID of request */ 11422652Smpp struct mbuf *cr_mrep; /* reply received by upcall */ 11522652Smpp int cr_error; /* any error from upcall */ 11622652Smpp char cr_verf[MAX_AUTH_BYTES]; /* reply verf */ 11757731Ssheldonh}; 11857731Ssheldonh 11922652SmppTAILQ_HEAD(ct_request_list, ct_request); 12022652Smpp 12122652Smppstruct ct_data { 12222652Smpp struct mtx ct_lock; 12322652Smpp int ct_threads; /* number of threads in clnt_vc_call */ 12422652Smpp bool_t ct_closing; /* TRUE if we are closing */ 12522652Smpp bool_t ct_closed; /* TRUE if we are closed */ 12622652Smpp struct socket *ct_socket; /* connection socket */ 12722652Smpp bool_t ct_closeit; /* close it on destroy */ 12822652Smpp struct timeval ct_wait; /* wait interval in milliseconds */ 12922652Smpp struct sockaddr_storage ct_addr; /* remote addr */ 13022652Smpp struct rpc_err ct_error; 13122652Smpp uint32_t ct_xid; 13222652Smpp char ct_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */ 13322652Smpp size_t ct_mpos; /* pos after marshal */ 13422652Smpp const char *ct_waitchan; 13522652Smpp int ct_waitflag; 13622652Smpp struct mbuf *ct_record; /* current reply record */ 13757731Ssheldonh size_t ct_record_resid; /* how much left of reply to read */ 13857731Ssheldonh bool_t ct_record_eor; /* true if reading last fragment */ 13922652Smpp struct ct_request_list ct_pending; 14022652Smpp}; 141226119Sdes 142226119Sdesstatic const char clnt_vc_errstr[] = "%s : %s"; 143226119Sdesstatic const char clnt_vc_str[] = "clnt_vc_create"; 144226119Sdesstatic const char clnt_read_vc_str[] = "read_vc"; 145226119Sdesstatic const char __no_mem_str[] = "out of memory"; 146226119Sdes 147226119Sdes/* 14822652Smpp * Create a client handle for a connection. 14922652Smpp * Default options are set, which the user can change using clnt_control()'s. 150137840Sjkoshy * The rpc/vc package does buffering similar to stdio, so the client 15122652Smpp * must pick send and receive buffer sizes, 0 => use the default. 152 * NB: fd is copied into a private area. 153 * NB: The rpch->cl_auth is set null authentication. Caller may wish to 154 * set this something more useful. 155 * 156 * fd should be an open socket 157 */ 158CLIENT * 159clnt_vc_create( 160 struct socket *so, /* open file descriptor */ 161 struct sockaddr *raddr, /* servers address */ 162 const rpcprog_t prog, /* program number */ 163 const rpcvers_t vers, /* version number */ 164 size_t sendsz, /* buffer recv size */ 165 size_t recvsz) /* buffer send size */ 166{ 167 CLIENT *cl; /* client handle */ 168 struct ct_data *ct = NULL; /* client handle */ 169 struct timeval now; 170 struct rpc_msg call_msg; 171 static uint32_t disrupt; 172 struct __rpc_sockinfo si; 173 XDR xdrs; 174 int error, interrupted, one = 1; 175 struct sockopt sopt; 176 177 if (disrupt == 0) 178 disrupt = (uint32_t)(long)raddr; 179 180 cl = (CLIENT *)mem_alloc(sizeof (*cl)); 181 ct = (struct ct_data *)mem_alloc(sizeof (*ct)); 182 183 mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF); 184 ct->ct_threads = 0; 185 ct->ct_closing = FALSE; 186 ct->ct_closed = FALSE; 187 188 if ((so->so_state & (SS_ISCONNECTED|SS_ISCONFIRMING)) == 0) { 189 error = soconnect(so, raddr, curthread); 190 SOCK_LOCK(so); 191 interrupted = 0; 192 while ((so->so_state & SS_ISCONNECTING) 193 && so->so_error == 0) { 194 error = msleep(&so->so_timeo, SOCK_MTX(so), 195 PSOCK | PCATCH, "connec", 0); 196 if (error) { 197 if (error == EINTR || error == ERESTART) 198 interrupted = 1; 199 break; 200 } 201 } 202 if (error == 0) { 203 error = so->so_error; 204 so->so_error = 0; 205 } 206 SOCK_UNLOCK(so); 207 if (error) { 208 if (!interrupted) 209 so->so_state &= ~SS_ISCONNECTING; 210 rpc_createerr.cf_stat = RPC_SYSTEMERROR; 211 rpc_createerr.cf_error.re_errno = error; 212 goto err; 213 } 214 } 215 216 if (!__rpc_socket2sockinfo(so, &si)) 217 goto err; 218 219 if (so->so_proto->pr_flags & PR_CONNREQUIRED) { 220 bzero(&sopt, sizeof(sopt)); 221 sopt.sopt_dir = SOPT_SET; 222 sopt.sopt_level = SOL_SOCKET; 223 sopt.sopt_name = SO_KEEPALIVE; 224 sopt.sopt_val = &one; 225 sopt.sopt_valsize = sizeof(one); 226 sosetopt(so, &sopt); 227 } 228 229 if (so->so_proto->pr_protocol == IPPROTO_TCP) { 230 bzero(&sopt, sizeof(sopt)); 231 sopt.sopt_dir = SOPT_SET; 232 sopt.sopt_level = IPPROTO_TCP; 233 sopt.sopt_name = TCP_NODELAY; 234 sopt.sopt_val = &one; 235 sopt.sopt_valsize = sizeof(one); 236 sosetopt(so, &sopt); 237 } 238 239 ct->ct_closeit = FALSE; 240 241 /* 242 * Set up private data struct 243 */ 244 ct->ct_socket = so; 245 ct->ct_wait.tv_sec = -1; 246 ct->ct_wait.tv_usec = -1; 247 memcpy(&ct->ct_addr, raddr, raddr->sa_len); 248 249 /* 250 * Initialize call message 251 */ 252 getmicrotime(&now); 253 ct->ct_xid = ((uint32_t)++disrupt) ^ __RPC_GETXID(&now); 254 call_msg.rm_xid = ct->ct_xid; 255 call_msg.rm_direction = CALL; 256 call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION; 257 call_msg.rm_call.cb_prog = (uint32_t)prog; 258 call_msg.rm_call.cb_vers = (uint32_t)vers; 259 260 /* 261 * pre-serialize the static part of the call msg and stash it away 262 */ 263 xdrmem_create(&xdrs, ct->ct_mcallc, MCALL_MSG_SIZE, 264 XDR_ENCODE); 265 if (! xdr_callhdr(&xdrs, &call_msg)) { 266 if (ct->ct_closeit) { 267 soclose(ct->ct_socket); 268 } 269 goto err; 270 } 271 ct->ct_mpos = XDR_GETPOS(&xdrs); 272 XDR_DESTROY(&xdrs); 273 ct->ct_waitchan = "rpcrecv"; 274 ct->ct_waitflag = 0; 275 276 /* 277 * Create a client handle which uses xdrrec for serialization 278 * and authnone for authentication. 279 */ 280 cl->cl_refs = 1; 281 cl->cl_ops = &clnt_vc_ops; 282 cl->cl_private = ct; 283 cl->cl_auth = authnone_create(); 284 sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz); 285 recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz); 286 soreserve(ct->ct_socket, sendsz, recvsz); 287 288 SOCKBUF_LOCK(&ct->ct_socket->so_rcv); 289 soupcall_set(ct->ct_socket, SO_RCV, clnt_vc_soupcall, ct); 290 SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv); 291 292 ct->ct_record = NULL; 293 ct->ct_record_resid = 0; 294 TAILQ_INIT(&ct->ct_pending); 295 return (cl); 296 297err: 298 if (cl) { 299 if (ct) { 300 mtx_destroy(&ct->ct_lock); 301 mem_free(ct, sizeof (struct ct_data)); 302 } 303 if (cl) 304 mem_free(cl, sizeof (CLIENT)); 305 } 306 return ((CLIENT *)NULL); 307} 308 309static enum clnt_stat 310clnt_vc_call( 311 CLIENT *cl, /* client handle */ 312 struct rpc_callextra *ext, /* call metadata */ 313 rpcproc_t proc, /* procedure number */ 314 struct mbuf *args, /* pointer to args */ 315 struct mbuf **resultsp, /* pointer to results */ 316 struct timeval utimeout) 317{ 318 struct ct_data *ct = (struct ct_data *) cl->cl_private; 319 AUTH *auth; 320 struct rpc_err *errp; 321 enum clnt_stat stat; 322 XDR xdrs; 323 struct rpc_msg reply_msg; 324 bool_t ok; 325 int nrefreshes = 2; /* number of times to refresh cred */ 326 struct timeval timeout; 327 uint32_t xid; 328 struct mbuf *mreq = NULL, *results; 329 struct ct_request *cr; 330 int error; 331 332 cr = malloc(sizeof(struct ct_request), M_RPC, M_WAITOK); 333 334 mtx_lock(&ct->ct_lock); 335 336 if (ct->ct_closing || ct->ct_closed) { 337 mtx_unlock(&ct->ct_lock); 338 free(cr, M_RPC); 339 return (RPC_CANTSEND); 340 } 341 ct->ct_threads++; 342 343 if (ext) { 344 auth = ext->rc_auth; 345 errp = &ext->rc_err; 346 } else { 347 auth = cl->cl_auth; 348 errp = &ct->ct_error; 349 } 350 351 cr->cr_mrep = NULL; 352 cr->cr_error = 0; 353 354 if (ct->ct_wait.tv_usec == -1) { 355 timeout = utimeout; /* use supplied timeout */ 356 } else { 357 timeout = ct->ct_wait; /* use default timeout */ 358 } 359 360call_again: 361 mtx_assert(&ct->ct_lock, MA_OWNED); 362 363 ct->ct_xid++; 364 xid = ct->ct_xid; 365 366 mtx_unlock(&ct->ct_lock); 367 368 /* 369 * Leave space to pre-pend the record mark. 370 */ 371 MGETHDR(mreq, M_WAIT, MT_DATA); 372 mreq->m_data += sizeof(uint32_t); 373 KASSERT(ct->ct_mpos + sizeof(uint32_t) <= MHLEN, 374 ("RPC header too big")); 375 bcopy(ct->ct_mcallc, mreq->m_data, ct->ct_mpos); 376 mreq->m_len = ct->ct_mpos; 377 378 /* 379 * The XID is the first thing in the request. 380 */ 381 *mtod(mreq, uint32_t *) = htonl(xid); 382 383 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE); 384 385 errp->re_status = stat = RPC_SUCCESS; 386 387 if ((! XDR_PUTINT32(&xdrs, &proc)) || 388 (! AUTH_MARSHALL(auth, xid, &xdrs, 389 m_copym(args, 0, M_COPYALL, M_WAITOK)))) { 390 errp->re_status = stat = RPC_CANTENCODEARGS; 391 mtx_lock(&ct->ct_lock); 392 goto out; 393 } 394 mreq->m_pkthdr.len = m_length(mreq, NULL); 395 396 /* 397 * Prepend a record marker containing the packet length. 398 */ 399 M_PREPEND(mreq, sizeof(uint32_t), M_WAIT); 400 *mtod(mreq, uint32_t *) = 401 htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t))); 402 403 cr->cr_xid = xid; 404 mtx_lock(&ct->ct_lock); 405 TAILQ_INSERT_TAIL(&ct->ct_pending, cr, cr_link); 406 mtx_unlock(&ct->ct_lock); 407 408 /* 409 * sosend consumes mreq. 410 */ 411 error = sosend(ct->ct_socket, NULL, NULL, mreq, NULL, 0, curthread); 412 mreq = NULL; 413 if (error == EMSGSIZE) { 414 SOCKBUF_LOCK(&ct->ct_socket->so_snd); 415 sbwait(&ct->ct_socket->so_snd); 416 SOCKBUF_UNLOCK(&ct->ct_socket->so_snd); 417 AUTH_VALIDATE(auth, xid, NULL, NULL); 418 mtx_lock(&ct->ct_lock); 419 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link); 420 goto call_again; 421 } 422 423 reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL; 424 reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf; 425 reply_msg.acpted_rply.ar_verf.oa_length = 0; 426 reply_msg.acpted_rply.ar_results.where = NULL; 427 reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void; 428 429 mtx_lock(&ct->ct_lock); 430 if (error) { 431 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link); 432 errp->re_errno = error; 433 errp->re_status = stat = RPC_CANTSEND; 434 goto out; 435 } 436 437 /* 438 * Check to see if we got an upcall while waiting for the 439 * lock. In both these cases, the request has been removed 440 * from ct->ct_pending. 441 */ 442 if (cr->cr_error) { 443 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link); 444 errp->re_errno = cr->cr_error; 445 errp->re_status = stat = RPC_CANTRECV; 446 goto out; 447 } 448 if (cr->cr_mrep) { 449 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link); 450 goto got_reply; 451 } 452 453 /* 454 * Hack to provide rpc-based message passing 455 */ 456 if (timeout.tv_sec == 0 && timeout.tv_usec == 0) { 457 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link); 458 errp->re_status = stat = RPC_TIMEDOUT; 459 goto out; 460 } 461 462 error = msleep(cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan, 463 tvtohz(&timeout)); 464 465 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link); 466 467 if (error) { 468 /* 469 * The sleep returned an error so our request is still 470 * on the list. Turn the error code into an 471 * appropriate client status. 472 */ 473 errp->re_errno = error; 474 switch (error) { 475 case EINTR: 476 stat = RPC_INTR; 477 break; 478 case EWOULDBLOCK: 479 stat = RPC_TIMEDOUT; 480 break; 481 default: 482 stat = RPC_CANTRECV; 483 } 484 errp->re_status = stat; 485 goto out; 486 } else { 487 /* 488 * We were woken up by the upcall. If the 489 * upcall had a receive error, report that, 490 * otherwise we have a reply. 491 */ 492 if (cr->cr_error) { 493 errp->re_errno = cr->cr_error; 494 errp->re_status = stat = RPC_CANTRECV; 495 goto out; 496 } 497 } 498 499got_reply: 500 /* 501 * Now decode and validate the response. We need to drop the 502 * lock since xdr_replymsg may end up sleeping in malloc. 503 */ 504 mtx_unlock(&ct->ct_lock); 505 506 if (ext && ext->rc_feedback) 507 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg); 508 509 xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE); 510 ok = xdr_replymsg(&xdrs, &reply_msg); 511 cr->cr_mrep = NULL; 512 513 if (ok) { 514 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) && 515 (reply_msg.acpted_rply.ar_stat == SUCCESS)) 516 errp->re_status = stat = RPC_SUCCESS; 517 else 518 stat = _seterr_reply(&reply_msg, errp); 519 520 if (stat == RPC_SUCCESS) { 521 results = xdrmbuf_getall(&xdrs); 522 if (!AUTH_VALIDATE(auth, xid, 523 &reply_msg.acpted_rply.ar_verf, 524 &results)) { 525 errp->re_status = stat = RPC_AUTHERROR; 526 errp->re_why = AUTH_INVALIDRESP; 527 } else { 528 KASSERT(results, 529 ("auth validated but no result")); 530 *resultsp = results; 531 } 532 } /* end successful completion */ 533 /* 534 * If unsuccesful AND error is an authentication error 535 * then refresh credentials and try again, else break 536 */ 537 else if (stat == RPC_AUTHERROR) 538 /* maybe our credentials need to be refreshed ... */ 539 if (nrefreshes > 0 && 540 AUTH_REFRESH(auth, &reply_msg)) { 541 nrefreshes--; 542 XDR_DESTROY(&xdrs); 543 mtx_lock(&ct->ct_lock); 544 goto call_again; 545 } 546 /* end of unsuccessful completion */ 547 } /* end of valid reply message */ 548 else { 549 errp->re_status = stat = RPC_CANTDECODERES; 550 } 551 XDR_DESTROY(&xdrs); 552 mtx_lock(&ct->ct_lock); 553out: 554 mtx_assert(&ct->ct_lock, MA_OWNED); 555 556 KASSERT(stat != RPC_SUCCESS || *resultsp, 557 ("RPC_SUCCESS without reply")); 558 559 if (mreq) 560 m_freem(mreq); 561 if (cr->cr_mrep) 562 m_freem(cr->cr_mrep); 563 564 ct->ct_threads--; 565 if (ct->ct_closing) 566 wakeup(ct); 567 568 mtx_unlock(&ct->ct_lock); 569 570 if (auth && stat != RPC_SUCCESS) 571 AUTH_VALIDATE(auth, xid, NULL, NULL); 572 573 free(cr, M_RPC); 574 575 return (stat); 576} 577 578static void 579clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp) 580{ 581 struct ct_data *ct = (struct ct_data *) cl->cl_private; 582 583 *errp = ct->ct_error; 584} 585 586static bool_t 587clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr) 588{ 589 XDR xdrs; 590 bool_t dummy; 591 592 xdrs.x_op = XDR_FREE; 593 dummy = (*xdr_res)(&xdrs, res_ptr); 594 595 return (dummy); 596} 597 598/*ARGSUSED*/ 599static void 600clnt_vc_abort(CLIENT *cl) 601{ 602} 603 604static bool_t 605clnt_vc_control(CLIENT *cl, u_int request, void *info) 606{ 607 struct ct_data *ct = (struct ct_data *)cl->cl_private; 608 void *infop = info; 609 610 mtx_lock(&ct->ct_lock); 611 612 switch (request) { 613 case CLSET_FD_CLOSE: 614 ct->ct_closeit = TRUE; 615 mtx_unlock(&ct->ct_lock); 616 return (TRUE); 617 case CLSET_FD_NCLOSE: 618 ct->ct_closeit = FALSE; 619 mtx_unlock(&ct->ct_lock); 620 return (TRUE); 621 default: 622 break; 623 } 624 625 /* for other requests which use info */ 626 if (info == NULL) { 627 mtx_unlock(&ct->ct_lock); 628 return (FALSE); 629 } 630 switch (request) { 631 case CLSET_TIMEOUT: 632 if (time_not_ok((struct timeval *)info)) { 633 mtx_unlock(&ct->ct_lock); 634 return (FALSE); 635 } 636 ct->ct_wait = *(struct timeval *)infop; 637 break; 638 case CLGET_TIMEOUT: 639 *(struct timeval *)infop = ct->ct_wait; 640 break; 641 case CLGET_SERVER_ADDR: 642 (void) memcpy(info, &ct->ct_addr, (size_t)ct->ct_addr.ss_len); 643 break; 644 case CLGET_SVC_ADDR: 645 /* 646 * Slightly different semantics to userland - we use 647 * sockaddr instead of netbuf. 648 */ 649 memcpy(info, &ct->ct_addr, ct->ct_addr.ss_len); 650 break; 651 case CLSET_SVC_ADDR: /* set to new address */ 652 mtx_unlock(&ct->ct_lock); 653 return (FALSE); 654 case CLGET_XID: 655 *(uint32_t *)info = ct->ct_xid; 656 break; 657 case CLSET_XID: 658 /* This will set the xid of the NEXT call */ 659 /* decrement by 1 as clnt_vc_call() increments once */ 660 ct->ct_xid = *(uint32_t *)info - 1; 661 break; 662 case CLGET_VERS: 663 /* 664 * This RELIES on the information that, in the call body, 665 * the version number field is the fifth field from the 666 * begining of the RPC header. MUST be changed if the 667 * call_struct is changed 668 */ 669 *(uint32_t *)info = 670 ntohl(*(uint32_t *)(void *)(ct->ct_mcallc + 671 4 * BYTES_PER_XDR_UNIT)); 672 break; 673 674 case CLSET_VERS: 675 *(uint32_t *)(void *)(ct->ct_mcallc + 676 4 * BYTES_PER_XDR_UNIT) = 677 htonl(*(uint32_t *)info); 678 break; 679 680 case CLGET_PROG: 681 /* 682 * This RELIES on the information that, in the call body, 683 * the program number field is the fourth field from the 684 * begining of the RPC header. MUST be changed if the 685 * call_struct is changed 686 */ 687 *(uint32_t *)info = 688 ntohl(*(uint32_t *)(void *)(ct->ct_mcallc + 689 3 * BYTES_PER_XDR_UNIT)); 690 break; 691 692 case CLSET_PROG: 693 *(uint32_t *)(void *)(ct->ct_mcallc + 694 3 * BYTES_PER_XDR_UNIT) = 695 htonl(*(uint32_t *)info); 696 break; 697 698 case CLSET_WAITCHAN: 699 ct->ct_waitchan = (const char *)info; 700 break; 701 702 case CLGET_WAITCHAN: 703 *(const char **) info = ct->ct_waitchan; 704 break; 705 706 case CLSET_INTERRUPTIBLE: 707 if (*(int *) info) 708 ct->ct_waitflag = PCATCH; 709 else 710 ct->ct_waitflag = 0; 711 break; 712 713 case CLGET_INTERRUPTIBLE: 714 if (ct->ct_waitflag) 715 *(int *) info = TRUE; 716 else 717 *(int *) info = FALSE; 718 break; 719 720 default: 721 mtx_unlock(&ct->ct_lock); 722 return (FALSE); 723 } 724 725 mtx_unlock(&ct->ct_lock); 726 return (TRUE); 727} 728 729static void 730clnt_vc_close(CLIENT *cl) 731{ 732 struct ct_data *ct = (struct ct_data *) cl->cl_private; 733 struct ct_request *cr; 734 735 mtx_lock(&ct->ct_lock); 736 737 if (ct->ct_closed) { 738 mtx_unlock(&ct->ct_lock); 739 return; 740 } 741 742 if (ct->ct_closing) { 743 while (ct->ct_closing) 744 msleep(ct, &ct->ct_lock, 0, "rpcclose", 0); 745 KASSERT(ct->ct_closed, ("client should be closed")); 746 mtx_unlock(&ct->ct_lock); 747 return; 748 } 749 750 if (ct->ct_socket) { 751 ct->ct_closing = TRUE; 752 mtx_unlock(&ct->ct_lock); 753 754 SOCKBUF_LOCK(&ct->ct_socket->so_rcv); 755 soupcall_clear(ct->ct_socket, SO_RCV); 756 SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv); 757 758 /* 759 * Abort any pending requests and wait until everyone 760 * has finished with clnt_vc_call. 761 */ 762 mtx_lock(&ct->ct_lock); 763 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) { 764 cr->cr_xid = 0; 765 cr->cr_error = ESHUTDOWN; 766 wakeup(cr); 767 } 768 769 while (ct->ct_threads) 770 msleep(ct, &ct->ct_lock, 0, "rpcclose", 0); 771 } 772 773 ct->ct_closing = FALSE; 774 ct->ct_closed = TRUE; 775 mtx_unlock(&ct->ct_lock); 776 wakeup(ct); 777} 778 779static void 780clnt_vc_destroy(CLIENT *cl) 781{ 782 struct ct_data *ct = (struct ct_data *) cl->cl_private; 783 struct socket *so = NULL; 784 785 clnt_vc_close(cl); 786 787 mtx_lock(&ct->ct_lock); 788 789 if (ct->ct_socket) { 790 if (ct->ct_closeit) { 791 so = ct->ct_socket; 792 } 793 } 794 795 mtx_unlock(&ct->ct_lock); 796 797 mtx_destroy(&ct->ct_lock); 798 if (so) { 799 soshutdown(so, SHUT_WR); 800 soclose(so); 801 } 802 mem_free(ct, sizeof(struct ct_data)); 803 mem_free(cl, sizeof(CLIENT)); 804} 805 806/* 807 * Make sure that the time is not garbage. -1 value is disallowed. 808 * Note this is different from time_not_ok in clnt_dg.c 809 */ 810static bool_t 811time_not_ok(struct timeval *t) 812{ 813 return (t->tv_sec <= -1 || t->tv_sec > 100000000 || 814 t->tv_usec <= -1 || t->tv_usec > 1000000); 815} 816 817int 818clnt_vc_soupcall(struct socket *so, void *arg, int waitflag) 819{ 820 struct ct_data *ct = (struct ct_data *) arg; 821 struct uio uio; 822 struct mbuf *m; 823 struct ct_request *cr; 824 int error, rcvflag, foundreq; 825 uint32_t xid, header; 826 bool_t do_read; 827 828 uio.uio_td = curthread; 829 do { 830 /* 831 * If ct_record_resid is zero, we are waiting for a 832 * record mark. 833 */ 834 if (ct->ct_record_resid == 0) { 835 836 /* 837 * Make sure there is either a whole record 838 * mark in the buffer or there is some other 839 * error condition 840 */ 841 do_read = FALSE; 842 if (so->so_rcv.sb_cc >= sizeof(uint32_t) 843 || (so->so_rcv.sb_state & SBS_CANTRCVMORE) 844 || so->so_error) 845 do_read = TRUE; 846 847 if (!do_read) 848 return (SU_OK); 849 850 SOCKBUF_UNLOCK(&so->so_rcv); 851 uio.uio_resid = sizeof(uint32_t); 852 m = NULL; 853 rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK; 854 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag); 855 SOCKBUF_LOCK(&so->so_rcv); 856 857 if (error == EWOULDBLOCK) 858 break; 859 860 /* 861 * If there was an error, wake up all pending 862 * requests. 863 */ 864 if (error || uio.uio_resid > 0) { 865 wakeup_all: 866 mtx_lock(&ct->ct_lock); 867 if (!error) { 868 /* 869 * We must have got EOF trying 870 * to read from the stream. 871 */ 872 error = ECONNRESET; 873 } 874 ct->ct_error.re_status = RPC_CANTRECV; 875 ct->ct_error.re_errno = error; 876 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) { 877 cr->cr_error = error; 878 wakeup(cr); 879 } 880 mtx_unlock(&ct->ct_lock); 881 break; 882 } 883 bcopy(mtod(m, uint32_t *), &header, sizeof(uint32_t)); 884 header = ntohl(header); 885 ct->ct_record = NULL; 886 ct->ct_record_resid = header & 0x7fffffff; 887 ct->ct_record_eor = ((header & 0x80000000) != 0); 888 m_freem(m); 889 } else { 890 /* 891 * Wait until the socket has the whole record 892 * buffered. 893 */ 894 do_read = FALSE; 895 if (so->so_rcv.sb_cc >= ct->ct_record_resid 896 || (so->so_rcv.sb_state & SBS_CANTRCVMORE) 897 || so->so_error) 898 do_read = TRUE; 899 900 if (!do_read) 901 return (SU_OK); 902 903 /* 904 * We have the record mark. Read as much as 905 * the socket has buffered up to the end of 906 * this record. 907 */ 908 SOCKBUF_UNLOCK(&so->so_rcv); 909 uio.uio_resid = ct->ct_record_resid; 910 m = NULL; 911 rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK; 912 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag); 913 SOCKBUF_LOCK(&so->so_rcv); 914 915 if (error == EWOULDBLOCK) 916 break; 917 918 if (error || uio.uio_resid == ct->ct_record_resid) 919 goto wakeup_all; 920 921 /* 922 * If we have part of the record already, 923 * chain this bit onto the end. 924 */ 925 if (ct->ct_record) 926 m_last(ct->ct_record)->m_next = m; 927 else 928 ct->ct_record = m; 929 930 ct->ct_record_resid = uio.uio_resid; 931 932 /* 933 * If we have the entire record, see if we can 934 * match it to a request. 935 */ 936 if (ct->ct_record_resid == 0 937 && ct->ct_record_eor) { 938 /* 939 * The XID is in the first uint32_t of 940 * the reply. 941 */ 942 if (ct->ct_record->m_len < sizeof(xid)) 943 ct->ct_record = 944 m_pullup(ct->ct_record, 945 sizeof(xid)); 946 if (!ct->ct_record) 947 break; 948 bcopy(mtod(ct->ct_record, uint32_t *), 949 &xid, sizeof(uint32_t)); 950 xid = ntohl(xid); 951 952 mtx_lock(&ct->ct_lock); 953 foundreq = 0; 954 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) { 955 if (cr->cr_xid == xid) { 956 /* 957 * This one 958 * matches. We leave 959 * the reply mbuf in 960 * cr->cr_mrep. Set 961 * the XID to zero so 962 * that we will ignore 963 * any duplicaed 964 * replies. 965 */ 966 cr->cr_xid = 0; 967 cr->cr_mrep = ct->ct_record; 968 cr->cr_error = 0; 969 foundreq = 1; 970 wakeup(cr); 971 break; 972 } 973 } 974 mtx_unlock(&ct->ct_lock); 975 976 if (!foundreq) 977 m_freem(ct->ct_record); 978 ct->ct_record = NULL; 979 } 980 } 981 } while (m); 982 return (SU_OK); 983} 984