1/* $NetBSD: svc_vc.c,v 1.7 2000/08/03 00:01:53 fvdl Exp $ */ 2 3/*- 4 * Copyright (c) 2009, Sun Microsystems, Inc. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * - Redistributions of source code must retain the above copyright notice, 10 * this list of conditions and the following disclaimer. 11 * - Redistributions in binary form must reproduce the above copyright notice, 12 * this list of conditions and the following disclaimer in the documentation 13 * and/or other materials provided with the distribution. 14 * - Neither the name of Sun Microsystems, Inc. nor the names of its 15 * contributors may be used to endorse or promote products derived 16 * from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * POSSIBILITY OF SUCH DAMAGE. 29 */ 30 31#if defined(LIBC_SCCS) && !defined(lint) 32static char *sccsid2 = "@(#)svc_tcp.c 1.21 87/08/11 Copyr 1984 Sun Micro"; 33static char *sccsid = "@(#)svc_tcp.c 2.2 88/08/01 4.0 RPCSRC"; 34#endif 35#include <sys/cdefs.h> 36__FBSDID("$FreeBSD$"); 37 38/* 39 * svc_vc.c, Server side for Connection Oriented based RPC. 
 *
 * Actually implements two flavors of transporter -
 * a tcp rendezvouser (a listener and connection establisher)
 * and a record/tcp stream.
 */

#include <sys/param.h>
#include <sys/limits.h>
#include <sys/lock.h>
#include <sys/kernel.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/protosw.h>
#include <sys/queue.h>
#include <sys/socket.h>
#include <sys/socketvar.h>
#include <sys/sx.h>
#include <sys/systm.h>
#include <sys/uio.h>

#include <net/vnet.h>

#include <netinet/tcp.h>

#include <rpc/rpc.h>

#include <rpc/krpc.h>
#include <rpc/rpc_com.h>

#include <security/mac/mac_framework.h>

/* Forward declarations for the three transport method tables below. */
static bool_t svc_vc_rendezvous_recv(SVCXPRT *, struct rpc_msg *,
    struct sockaddr **, struct mbuf **);
static enum xprt_stat svc_vc_rendezvous_stat(SVCXPRT *);
static void svc_vc_rendezvous_destroy(SVCXPRT *);
static bool_t svc_vc_null(void);
static void svc_vc_destroy(SVCXPRT *);
static enum xprt_stat svc_vc_stat(SVCXPRT *);
static bool_t svc_vc_ack(SVCXPRT *, uint32_t *);
static bool_t svc_vc_recv(SVCXPRT *, struct rpc_msg *,
    struct sockaddr **, struct mbuf **);
static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *,
    struct sockaddr *, struct mbuf *, uint32_t *seq);
static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in);
static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq,
    void *in);
static void svc_vc_backchannel_destroy(SVCXPRT *);
static enum xprt_stat svc_vc_backchannel_stat(SVCXPRT *);
static bool_t svc_vc_backchannel_recv(SVCXPRT *, struct rpc_msg *,
    struct sockaddr **, struct mbuf **);
static bool_t svc_vc_backchannel_reply(SVCXPRT *, struct rpc_msg *,
    struct sockaddr *, struct mbuf *, uint32_t *);
static bool_t svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq,
    void *in);
static SVCXPRT *svc_vc_create_conn(SVCPOOL *pool, struct socket *so,
    struct sockaddr *raddr);
static int svc_vc_accept(struct socket *head, struct socket **sop);
static int svc_vc_soupcall(struct socket *so, void *arg, int waitflag);

/*
 * Method table for the rendezvous (listening) transport.  A rendezvous
 * transport never carries RPC replies, so xp_reply is svc_vc_null()
 * (always FALSE), cast to the expected function-pointer type.
 */
static struct xp_ops svc_vc_rendezvous_ops = {
	.xp_recv = svc_vc_rendezvous_recv,
	.xp_stat = svc_vc_rendezvous_stat,
	.xp_reply = (bool_t (*)(SVCXPRT *, struct rpc_msg *,
		struct sockaddr *, struct mbuf *, uint32_t *))svc_vc_null,
	.xp_destroy = svc_vc_rendezvous_destroy,
	.xp_control = svc_vc_rendezvous_control
};

/* Method table for a connected record/tcp stream transport. */
static struct xp_ops svc_vc_ops = {
	.xp_recv = svc_vc_recv,
	.xp_stat = svc_vc_stat,
	.xp_ack = svc_vc_ack,
	.xp_reply = svc_vc_reply,
	.xp_destroy = svc_vc_destroy,
	.xp_control = svc_vc_control
};

/*
 * Method table for a backchannel transport riding on a clnt_vc
 * client socket (no socket of its own; see svc_vc_create_backchannel).
 */
static struct xp_ops svc_vc_backchannel_ops = {
	.xp_recv = svc_vc_backchannel_recv,
	.xp_stat = svc_vc_backchannel_stat,
	.xp_reply = svc_vc_backchannel_reply,
	.xp_destroy = svc_vc_backchannel_destroy,
	.xp_control = svc_vc_backchannel_control
};

/*
 * Usage:
 *	xprt = svc_vc_create(sock, send_buf_size, recv_buf_size);
 *
 * Creates, registers, and returns a (rpc) tcp based transporter.
 * Once *xprt is initialized, it is registered as a transporter
 * see (svc.h, xprt_register).  This routine returns
 * a NULL if a problem occurred.
 *
 * The filedescriptor passed in is expected to refer to a bound, but
 * not yet connected socket.
 *
 * Since streams do buffered io similar to stdio, the caller can specify
 * how big the send and receive buffers are via the second and third parms;
 * 0 => use the system default.
142 */ 143SVCXPRT * 144svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize, 145 size_t recvsize) 146{ 147 SVCXPRT *xprt; 148 struct sockaddr* sa; 149 int error; 150 151 SOCK_LOCK(so); 152 if (so->so_state & (SS_ISCONNECTED|SS_ISDISCONNECTED)) { 153 SOCK_UNLOCK(so); 154 error = so->so_proto->pr_usrreqs->pru_peeraddr(so, &sa); 155 if (error) 156 return (NULL); 157 xprt = svc_vc_create_conn(pool, so, sa); 158 free(sa, M_SONAME); 159 return (xprt); 160 } 161 SOCK_UNLOCK(so); 162 163 xprt = svc_xprt_alloc(); 164 sx_init(&xprt->xp_lock, "xprt->xp_lock"); 165 xprt->xp_pool = pool; 166 xprt->xp_socket = so; 167 xprt->xp_p1 = NULL; 168 xprt->xp_p2 = NULL; 169 xprt->xp_ops = &svc_vc_rendezvous_ops; 170 171 error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa); 172 if (error) { 173 goto cleanup_svc_vc_create; 174 } 175 176 memcpy(&xprt->xp_ltaddr, sa, sa->sa_len); 177 free(sa, M_SONAME); 178 179 xprt_register(xprt); 180 181 solisten(so, SOMAXCONN, curthread); 182 183 SOCKBUF_LOCK(&so->so_rcv); 184 xprt->xp_upcallset = 1; 185 soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt); 186 SOCKBUF_UNLOCK(&so->so_rcv); 187 188 return (xprt); 189cleanup_svc_vc_create: 190 if (xprt) { 191 sx_destroy(&xprt->xp_lock); 192 svc_xprt_free(xprt); 193 } 194 return (NULL); 195} 196 197/* 198 * Create a new transport for a socket optained via soaccept(). 
 */
SVCXPRT *
svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr)
{
	SVCXPRT *xprt = NULL;
	struct cf_conn *cd = NULL;
	struct sockaddr* sa = NULL;
	struct sockopt opt;
	int one = 1;
	int error;

	/* Keep idle connections alive so dead peers are detected. */
	bzero(&opt, sizeof(struct sockopt));
	opt.sopt_dir = SOPT_SET;
	opt.sopt_level = SOL_SOCKET;
	opt.sopt_name = SO_KEEPALIVE;
	opt.sopt_val = &one;
	opt.sopt_valsize = sizeof(one);
	error = sosetopt(so, &opt);
	if (error) {
		return (NULL);
	}

	/* Disable Nagle for TCP: RPC replies are latency sensitive. */
	if (so->so_proto->pr_protocol == IPPROTO_TCP) {
		bzero(&opt, sizeof(struct sockopt));
		opt.sopt_dir = SOPT_SET;
		opt.sopt_level = IPPROTO_TCP;
		opt.sopt_name = TCP_NODELAY;
		opt.sopt_val = &one;
		opt.sopt_valsize = sizeof(one);
		error = sosetopt(so, &opt);
		if (error) {
			return (NULL);
		}
	}

	/* Per-connection record-parsing state, hung off xp_p1. */
	cd = mem_alloc(sizeof(*cd));
	cd->strm_stat = XPRT_IDLE;

	xprt = svc_xprt_alloc();
	sx_init(&xprt->xp_lock, "xprt->xp_lock");
	xprt->xp_pool = pool;
	xprt->xp_socket = so;
	xprt->xp_p1 = cd;
	xprt->xp_p2 = NULL;
	xprt->xp_ops = &svc_vc_ops;

	/*
	 * See http://www.connectathon.org/talks96/nfstcp.pdf - client
	 * has a 5 minute timer, server has a 6 minute timer.
	 */
	xprt->xp_idletimeout = 6 * 60;

	memcpy(&xprt->xp_rtaddr, raddr, raddr->sa_len);

	error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa);
	if (error)
		goto cleanup_svc_vc_create;

	memcpy(&xprt->xp_ltaddr, sa, sa->sa_len);
	free(sa, M_SONAME);

	xprt_register(xprt);

	/* Arm the receive upcall so incoming data activates us. */
	SOCKBUF_LOCK(&so->so_rcv);
	xprt->xp_upcallset = 1;
	soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt);
	SOCKBUF_UNLOCK(&so->so_rcv);

	/*
	 * Throw the transport into the active list in case it already
	 * has some data buffered.
	 */
	sx_xlock(&xprt->xp_lock);
	xprt_active(xprt);
	sx_xunlock(&xprt->xp_lock);

	return (xprt);
cleanup_svc_vc_create:
	if (xprt) {
		sx_destroy(&xprt->xp_lock);
		svc_xprt_free(xprt);
	}
	if (cd)
		mem_free(cd, sizeof(*cd));
	return (NULL);
}

/*
 * Create a new transport for a backchannel on a clnt_vc socket.
 * The backchannel has no socket of its own (xp_socket is NULL);
 * requests arrive via the client transport and are queued on cd->mreq.
 */
SVCXPRT *
svc_vc_create_backchannel(SVCPOOL *pool)
{
	SVCXPRT *xprt = NULL;
	struct cf_conn *cd = NULL;

	cd = mem_alloc(sizeof(*cd));
	cd->strm_stat = XPRT_IDLE;

	xprt = svc_xprt_alloc();
	sx_init(&xprt->xp_lock, "xprt->xp_lock");
	xprt->xp_pool = pool;
	xprt->xp_socket = NULL;
	xprt->xp_p1 = cd;
	xprt->xp_p2 = NULL;
	xprt->xp_ops = &svc_vc_backchannel_ops;
	return (xprt);
}

/*
 * This does all of the accept except the final call to soaccept. The
 * caller will call soaccept after dropping its locks (soaccept may
 * call malloc).
 *
 * Returns 0 with *sop set on success, EINVAL if head is not a
 * listening socket, or EWOULDBLOCK if the completed-connection queue
 * is empty.
 */
int
svc_vc_accept(struct socket *head, struct socket **sop)
{
	int error = 0;
	struct socket *so;

	if ((head->so_options & SO_ACCEPTCONN) == 0) {
		error = EINVAL;
		goto done;
	}
#ifdef MAC
	error = mac_socket_check_accept(curthread->td_ucred, head);
	if (error != 0)
		goto done;
#endif
	ACCEPT_LOCK();
	if (TAILQ_EMPTY(&head->so_comp)) {
		ACCEPT_UNLOCK();
		error = EWOULDBLOCK;
		goto done;
	}
	so = TAILQ_FIRST(&head->so_comp);
	KASSERT(!(so->so_qstate & SQ_INCOMP), ("svc_vc_accept: so SQ_INCOMP"));
	KASSERT(so->so_qstate & SQ_COMP, ("svc_vc_accept: so not SQ_COMP"));

	/*
	 * Before changing the flags on the socket, we have to bump the
	 * reference count.  Otherwise, if the protocol calls sofree(),
	 * the socket will be released due to a zero refcount.
	 * XXX might not need soref() since this is simpler than kern_accept.
	 */
	SOCK_LOCK(so);			/* soref() and so_state update */
	soref(so);			/* file descriptor reference */

	/* Detach the connection from the listen queue. */
	TAILQ_REMOVE(&head->so_comp, so, so_list);
	head->so_qlen--;
	so->so_state |= (head->so_state & SS_NBIO);
	so->so_qstate &= ~SQ_COMP;
	so->so_head = NULL;

	SOCK_UNLOCK(so);
	ACCEPT_UNLOCK();

	*sop = so;

	/* connection has been removed from the listen queue */
	KNOTE_UNLOCKED(&head->so_rcv.sb_sel.si_note, 0);
done:
	return (error);
}

/*
 * xp_recv method for the rendezvous transport: accept one pending
 * connection and wrap it in a new stream transport.  Always returns
 * FALSE since a rendezvous transport never yields an RPC message.
 */
/*ARGSUSED*/
static bool_t
svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg,
    struct sockaddr **addrp, struct mbuf **mp)
{
	struct socket *so = NULL;
	struct sockaddr *sa = NULL;
	int error;
	SVCXPRT *new_xprt;

	/*
	 * The socket upcall calls xprt_active() which will eventually
	 * cause the server to call us here. We attempt to accept a
	 * connection from the socket and turn it into a new
	 * transport. If the accept fails, we have drained all pending
	 * connections so we call xprt_inactive().
	 */
	sx_xlock(&xprt->xp_lock);

	error = svc_vc_accept(xprt->xp_socket, &so);

	if (error == EWOULDBLOCK) {
		/*
		 * We must re-test for new connections after taking
		 * the lock to protect us in the case where a new
		 * connection arrives after our call to accept fails
		 * with EWOULDBLOCK.
		 */
		ACCEPT_LOCK();
		if (TAILQ_EMPTY(&xprt->xp_socket->so_comp))
			xprt_inactive_self(xprt);
		ACCEPT_UNLOCK();
		sx_xunlock(&xprt->xp_lock);
		return (FALSE);
	}

	if (error) {
		/* Hard error: tear down the upcall and go inactive. */
		SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
		if (xprt->xp_upcallset) {
			xprt->xp_upcallset = 0;
			soupcall_clear(xprt->xp_socket, SO_RCV);
		}
		SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
		xprt_inactive_self(xprt);
		sx_xunlock(&xprt->xp_lock);
		return (FALSE);
	}

	sx_xunlock(&xprt->xp_lock);

	sa = 0;
	error = soaccept(so, &sa);

	if (error) {
		/*
		 * XXX not sure if I need to call sofree or soclose here.
		 */
		if (sa)
			free(sa, M_SONAME);
		return (FALSE);
	}

	/*
	 * svc_vc_create_conn will call xprt_register - we don't need
	 * to do anything with the new connection except dereference it.
	 */
	new_xprt = svc_vc_create_conn(xprt->xp_pool, so, sa);
	if (!new_xprt) {
		soclose(so);
	} else {
		SVC_RELEASE(new_xprt);
	}

	free(sa, M_SONAME);

	return (FALSE); /* there is never an rpc msg to be processed */
}

/* A rendezvous transport is always idle between accepts. */
/*ARGSUSED*/
static enum xprt_stat
svc_vc_rendezvous_stat(SVCXPRT *xprt)
{

	return (XPRT_IDLE);
}

/*
 * Shared teardown for rendezvous and stream transports: clear the
 * receive upcall, close the socket and free the transport structure.
 *
 * NOTE(review): so_rcv is dereferenced before the NULL check on
 * xp_socket below; both callers only pass transports that own a
 * socket, but the check ordering is inconsistent — verify.
 */
static void
svc_vc_destroy_common(SVCXPRT *xprt)
{
	SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
	if (xprt->xp_upcallset) {
		xprt->xp_upcallset = 0;
		soupcall_clear(xprt->xp_socket, SO_RCV);
	}
	SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);

	if (xprt->xp_socket)
		(void)soclose(xprt->xp_socket);

	if (xprt->xp_netid)
		(void) mem_free(xprt->xp_netid, strlen(xprt->xp_netid) + 1);
	svc_xprt_free(xprt);
}

/* xp_destroy method for the rendezvous transport. */
static void
svc_vc_rendezvous_destroy(SVCXPRT *xprt)
{

	svc_vc_destroy_common(xprt);
}

/*
 * xp_destroy method for a stream transport: common teardown plus
 * freeing any partially parsed (mreq) and pending (mpending) mbufs.
 */
static void
svc_vc_destroy(SVCXPRT *xprt)
{
	struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1;

	svc_vc_destroy_common(xprt);

	if (cd->mreq)
		m_freem(cd->mreq);
	if (cd->mpending)
		m_freem(cd->mpending);
	mem_free(cd, sizeof(*cd));
}

/*
 * xp_destroy method for a backchannel transport.  There is no socket
 * to close; free any queued requests (chained via m_nextpkt).
 */
static void
svc_vc_backchannel_destroy(SVCXPRT *xprt)
{
	struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1;
	struct mbuf *m, *m2;

	svc_xprt_free(xprt);
	m = cd->mreq;
	while (m != NULL) {
		m2 = m;
		m = m->m_nextpkt;
		m_freem(m2);
	}
	mem_free(cd, sizeof(*cd));
}

/* No transport-specific control operations are supported. */
/*ARGSUSED*/
static bool_t
svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in)
{
	return (FALSE);
}

static bool_t
svc_vc_rendezvous_control(SVCXPRT *xprt, const u_int rq, void *in)
{

	return (FALSE);
}

static bool_t
svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq, void *in)
{

	return (FALSE);
}

/*
 * xp_stat method for a stream transport: report DIED after a fatal
 * receive error, MOREREQS when a complete record is parsed or the
 * socket has readable data, otherwise IDLE.
 */
static enum xprt_stat
svc_vc_stat(SVCXPRT *xprt)
{
	struct cf_conn *cd;

	cd = (struct cf_conn *)(xprt->xp_p1);

	if (cd->strm_stat == XPRT_DIED)
		return (XPRT_DIED);

	if (cd->mreq != NULL && cd->resid == 0 && cd->eor)
		return (XPRT_MOREREQS);

	if (soreadable(xprt->xp_socket))
		return (XPRT_MOREREQS);

	return (XPRT_IDLE);
}

/*
 * xp_ack method: report how many reply bytes the peer has received,
 * i.e. the count of bytes handed to the socket (xp_snt_cnt) minus
 * what still sits unsent in the send buffer.
 */
static bool_t
svc_vc_ack(SVCXPRT *xprt, uint32_t *ack)
{

	*ack = atomic_load_acq_32(&xprt->xp_snt_cnt);
	*ack -= xprt->xp_socket->so_snd.sb_cc;
	return (TRUE);
}

/* xp_stat method for the backchannel: MOREREQS while requests are queued. */
static enum xprt_stat
svc_vc_backchannel_stat(SVCXPRT *xprt)
{
	struct cf_conn *cd;

	cd = (struct cf_conn *)(xprt->xp_p1);

	if (cd->mreq != NULL)
		return (XPRT_MOREREQS);

	return (XPRT_IDLE);
}

/*
 * If we have an mbuf chain in cd->mpending, try to parse a record from it,
 * leaving the result in cd->mreq. If we don't have a complete record, leave
 * the partial result in cd->mreq and try to read more from the socket.
571 */ 572static int 573svc_vc_process_pending(SVCXPRT *xprt) 574{ 575 struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1; 576 struct socket *so = xprt->xp_socket; 577 struct mbuf *m; 578 579 /* 580 * If cd->resid is non-zero, we have part of the 581 * record already, otherwise we are expecting a record 582 * marker. 583 */ 584 if (!cd->resid && cd->mpending) { 585 /* 586 * See if there is enough data buffered to 587 * make up a record marker. Make sure we can 588 * handle the case where the record marker is 589 * split across more than one mbuf. 590 */ 591 size_t n = 0; 592 uint32_t header; 593 594 m = cd->mpending; 595 while (n < sizeof(uint32_t) && m) { 596 n += m->m_len; 597 m = m->m_next; 598 } 599 if (n < sizeof(uint32_t)) { 600 so->so_rcv.sb_lowat = sizeof(uint32_t) - n; 601 return (FALSE); 602 } 603 m_copydata(cd->mpending, 0, sizeof(header), 604 (char *)&header); 605 header = ntohl(header); 606 cd->eor = (header & 0x80000000) != 0; 607 cd->resid = header & 0x7fffffff; 608 m_adj(cd->mpending, sizeof(uint32_t)); 609 } 610 611 /* 612 * Start pulling off mbufs from cd->mpending 613 * until we either have a complete record or 614 * we run out of data. We use m_split to pull 615 * data - it will pull as much as possible and 616 * split the last mbuf if necessary. 617 */ 618 while (cd->mpending && cd->resid) { 619 m = cd->mpending; 620 if (cd->mpending->m_next 621 || cd->mpending->m_len > cd->resid) 622 cd->mpending = m_split(cd->mpending, 623 cd->resid, M_WAITOK); 624 else 625 cd->mpending = NULL; 626 if (cd->mreq) 627 m_last(cd->mreq)->m_next = m; 628 else 629 cd->mreq = m; 630 while (m) { 631 cd->resid -= m->m_len; 632 m = m->m_next; 633 } 634 } 635 636 /* 637 * Block receive upcalls if we have more data pending, 638 * otherwise report our need. 
639 */ 640 if (cd->mpending) 641 so->so_rcv.sb_lowat = INT_MAX; 642 else 643 so->so_rcv.sb_lowat = 644 imax(1, imin(cd->resid, so->so_rcv.sb_hiwat / 2)); 645 return (TRUE); 646} 647 648static bool_t 649svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg, 650 struct sockaddr **addrp, struct mbuf **mp) 651{ 652 struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1; 653 struct uio uio; 654 struct mbuf *m; 655 struct socket* so = xprt->xp_socket; 656 XDR xdrs; 657 int error, rcvflag; 658 659 /* 660 * Serialise access to the socket and our own record parsing 661 * state. 662 */ 663 sx_xlock(&xprt->xp_lock); 664 665 for (;;) { 666 /* If we have no request ready, check pending queue. */ 667 while (cd->mpending && 668 (cd->mreq == NULL || cd->resid != 0 || !cd->eor)) { 669 if (!svc_vc_process_pending(xprt)) 670 break; 671 } 672 673 /* Process and return complete request in cd->mreq. */ 674 if (cd->mreq != NULL && cd->resid == 0 && cd->eor) { 675 676 xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE); 677 cd->mreq = NULL; 678 679 /* Check for next request in a pending queue. */ 680 svc_vc_process_pending(xprt); 681 if (cd->mreq == NULL || cd->resid != 0) { 682 SOCKBUF_LOCK(&so->so_rcv); 683 if (!soreadable(so)) 684 xprt_inactive_self(xprt); 685 SOCKBUF_UNLOCK(&so->so_rcv); 686 } 687 688 sx_xunlock(&xprt->xp_lock); 689 690 if (! xdr_callmsg(&xdrs, msg)) { 691 XDR_DESTROY(&xdrs); 692 return (FALSE); 693 } 694 695 *addrp = NULL; 696 *mp = xdrmbuf_getall(&xdrs); 697 XDR_DESTROY(&xdrs); 698 699 return (TRUE); 700 } 701 702 /* 703 * The socket upcall calls xprt_active() which will eventually 704 * cause the server to call us here. We attempt to 705 * read as much as possible from the socket and put 706 * the result in cd->mpending. If the read fails, 707 * we have drained both cd->mpending and the socket so 708 * we can call xprt_inactive(). 
709 */ 710 uio.uio_resid = 1000000000; 711 uio.uio_td = curthread; 712 m = NULL; 713 rcvflag = MSG_DONTWAIT; 714 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag); 715 716 if (error == EWOULDBLOCK) { 717 /* 718 * We must re-test for readability after 719 * taking the lock to protect us in the case 720 * where a new packet arrives on the socket 721 * after our call to soreceive fails with 722 * EWOULDBLOCK. 723 */ 724 SOCKBUF_LOCK(&so->so_rcv); 725 if (!soreadable(so)) 726 xprt_inactive_self(xprt); 727 SOCKBUF_UNLOCK(&so->so_rcv); 728 sx_xunlock(&xprt->xp_lock); 729 return (FALSE); 730 } 731 732 if (error) { 733 SOCKBUF_LOCK(&so->so_rcv); 734 if (xprt->xp_upcallset) { 735 xprt->xp_upcallset = 0; 736 soupcall_clear(so, SO_RCV); 737 } 738 SOCKBUF_UNLOCK(&so->so_rcv); 739 xprt_inactive_self(xprt); 740 cd->strm_stat = XPRT_DIED; 741 sx_xunlock(&xprt->xp_lock); 742 return (FALSE); 743 } 744 745 if (!m) { 746 /* 747 * EOF - the other end has closed the socket. 748 */ 749 xprt_inactive_self(xprt); 750 cd->strm_stat = XPRT_DIED; 751 sx_xunlock(&xprt->xp_lock); 752 return (FALSE); 753 } 754 755 if (cd->mpending) 756 m_last(cd->mpending)->m_next = m; 757 else 758 cd->mpending = m; 759 } 760} 761 762static bool_t 763svc_vc_backchannel_recv(SVCXPRT *xprt, struct rpc_msg *msg, 764 struct sockaddr **addrp, struct mbuf **mp) 765{ 766 struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1; 767 struct ct_data *ct; 768 struct mbuf *m; 769 XDR xdrs; 770 771 sx_xlock(&xprt->xp_lock); 772 ct = (struct ct_data *)xprt->xp_p2; 773 if (ct == NULL) { 774 sx_xunlock(&xprt->xp_lock); 775 return (FALSE); 776 } 777 mtx_lock(&ct->ct_lock); 778 m = cd->mreq; 779 if (m == NULL) { 780 xprt_inactive_self(xprt); 781 mtx_unlock(&ct->ct_lock); 782 sx_xunlock(&xprt->xp_lock); 783 return (FALSE); 784 } 785 cd->mreq = m->m_nextpkt; 786 mtx_unlock(&ct->ct_lock); 787 sx_xunlock(&xprt->xp_lock); 788 789 xdrmbuf_create(&xdrs, m, XDR_DECODE); 790 if (! 
xdr_callmsg(&xdrs, msg)) { 791 XDR_DESTROY(&xdrs); 792 return (FALSE); 793 } 794 *addrp = NULL; 795 *mp = xdrmbuf_getall(&xdrs); 796 XDR_DESTROY(&xdrs); 797 return (TRUE); 798} 799 800static bool_t 801svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg, 802 struct sockaddr *addr, struct mbuf *m, uint32_t *seq) 803{ 804 XDR xdrs; 805 struct mbuf *mrep; 806 bool_t stat = TRUE; 807 int error, len; 808 809 /* 810 * Leave space for record mark. 811 */ 812 MGETHDR(mrep, M_WAIT, MT_DATA); 813 mrep->m_len = 0; 814 mrep->m_data += sizeof(uint32_t); 815 816 xdrmbuf_create(&xdrs, mrep, XDR_ENCODE); 817 818 if (msg->rm_reply.rp_stat == MSG_ACCEPTED && 819 msg->rm_reply.rp_acpt.ar_stat == SUCCESS) { 820 if (!xdr_replymsg(&xdrs, msg)) 821 stat = FALSE; 822 else 823 xdrmbuf_append(&xdrs, m); 824 } else { 825 stat = xdr_replymsg(&xdrs, msg); 826 } 827 828 if (stat) { 829 m_fixhdr(mrep); 830 831 /* 832 * Prepend a record marker containing the reply length. 833 */ 834 M_PREPEND(mrep, sizeof(uint32_t), M_WAIT); 835 len = mrep->m_pkthdr.len; 836 *mtod(mrep, uint32_t *) = 837 htonl(0x80000000 | (len - sizeof(uint32_t))); 838 atomic_add_acq_32(&xprt->xp_snd_cnt, len); 839 error = sosend(xprt->xp_socket, NULL, NULL, mrep, NULL, 840 0, curthread); 841 if (!error) { 842 atomic_add_rel_32(&xprt->xp_snt_cnt, len); 843 if (seq) 844 *seq = xprt->xp_snd_cnt; 845 stat = TRUE; 846 } else 847 atomic_subtract_32(&xprt->xp_snd_cnt, len); 848 } else { 849 m_freem(mrep); 850 } 851 852 XDR_DESTROY(&xdrs); 853 xprt->xp_p2 = NULL; 854 855 return (stat); 856} 857 858static bool_t 859svc_vc_backchannel_reply(SVCXPRT *xprt, struct rpc_msg *msg, 860 struct sockaddr *addr, struct mbuf *m, uint32_t *seq) 861{ 862 struct ct_data *ct; 863 XDR xdrs; 864 struct mbuf *mrep; 865 bool_t stat = TRUE; 866 int error; 867 868 /* 869 * Leave space for record mark. 
870 */ 871 MGETHDR(mrep, M_WAITOK, MT_DATA); 872 mrep->m_len = 0; 873 mrep->m_data += sizeof(uint32_t); 874 875 xdrmbuf_create(&xdrs, mrep, XDR_ENCODE); 876 877 if (msg->rm_reply.rp_stat == MSG_ACCEPTED && 878 msg->rm_reply.rp_acpt.ar_stat == SUCCESS) { 879 if (!xdr_replymsg(&xdrs, msg)) 880 stat = FALSE; 881 else 882 xdrmbuf_append(&xdrs, m); 883 } else { 884 stat = xdr_replymsg(&xdrs, msg); 885 } 886 887 if (stat) { 888 m_fixhdr(mrep); 889 890 /* 891 * Prepend a record marker containing the reply length. 892 */ 893 M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK); 894 *mtod(mrep, uint32_t *) = 895 htonl(0x80000000 | (mrep->m_pkthdr.len 896 - sizeof(uint32_t))); 897 sx_xlock(&xprt->xp_lock); 898 ct = (struct ct_data *)xprt->xp_p2; 899 if (ct != NULL) 900 error = sosend(ct->ct_socket, NULL, NULL, mrep, NULL, 901 0, curthread); 902 else 903 error = EPIPE; 904 sx_xunlock(&xprt->xp_lock); 905 if (!error) { 906 stat = TRUE; 907 } 908 } else { 909 m_freem(mrep); 910 } 911 912 XDR_DESTROY(&xdrs); 913 914 return (stat); 915} 916 917static bool_t 918svc_vc_null() 919{ 920 921 return (FALSE); 922} 923 924static int 925svc_vc_soupcall(struct socket *so, void *arg, int waitflag) 926{ 927 SVCXPRT *xprt = (SVCXPRT *) arg; 928 929 if (soreadable(xprt->xp_socket)) 930 xprt_active(xprt); 931 return (SU_OK); 932} 933 934#if 0 935/* 936 * Get the effective UID of the sending process. Used by rpcbind, keyserv 937 * and rpc.yppasswdd on AF_LOCAL. 938 */ 939int 940__rpc_get_local_uid(SVCXPRT *transp, uid_t *uid) { 941 int sock, ret; 942 gid_t egid; 943 uid_t euid; 944 struct sockaddr *sa; 945 946 sock = transp->xp_fd; 947 sa = (struct sockaddr *)transp->xp_rtaddr; 948 if (sa->sa_family == AF_LOCAL) { 949 ret = getpeereid(sock, &euid, &egid); 950 if (ret == 0) 951 *uid = euid; 952 return (ret); 953 } else 954 return (-1); 955} 956#endif 957