svc.c revision 261057
1/* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */ 2 3/*- 4 * Copyright (c) 2009, Sun Microsystems, Inc. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * - Redistributions of source code must retain the above copyright notice, 10 * this list of conditions and the following disclaimer. 11 * - Redistributions in binary form must reproduce the above copyright notice, 12 * this list of conditions and the following disclaimer in the documentation 13 * and/or other materials provided with the distribution. 14 * - Neither the name of Sun Microsystems, Inc. nor the names of its 15 * contributors may be used to endorse or promote products derived 16 * from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * POSSIBILITY OF SUCH DAMAGE. 29 */ 30 31#if defined(LIBC_SCCS) && !defined(lint) 32static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro"; 33static char *sccsid = "@(#)svc.c 2.4 88/08/11 4.0 RPCSRC"; 34#endif 35#include <sys/cdefs.h> 36__FBSDID("$FreeBSD: stable/9/sys/rpc/svc.c 261057 2014-01-23 00:28:17Z mav $"); 37 38/* 39 * svc.c, Server-side remote procedure call interface. 40 * 41 * There are two sets of procedures here. The xprt routines are 42 * for handling transport handles. The svc routines handle the 43 * list of service routines. 44 * 45 * Copyright (C) 1984, Sun Microsystems, Inc. 46 */ 47 48#include <sys/param.h> 49#include <sys/lock.h> 50#include <sys/kernel.h> 51#include <sys/kthread.h> 52#include <sys/malloc.h> 53#include <sys/mbuf.h> 54#include <sys/mutex.h> 55#include <sys/proc.h> 56#include <sys/queue.h> 57#include <sys/socketvar.h> 58#include <sys/systm.h> 59#include <sys/ucred.h> 60 61#include <rpc/rpc.h> 62#include <rpc/rpcb_clnt.h> 63#include <rpc/replay.h> 64 65#include <rpc/rpc_com.h> 66 67#define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */ 68#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET) 69 70static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t, 71 char *); 72static void svc_new_thread(SVCPOOL *pool); 73static void xprt_unregister_locked(SVCXPRT *xprt); 74 75/* *************** SVCXPRT related stuff **************** */ 76 77static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS); 78static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS); 79 80SVCPOOL* 81svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base) 82{ 83 SVCPOOL *pool; 84 85 pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO); 86 87 mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF); 88 pool->sp_name = name; 89 pool->sp_state = SVCPOOL_INIT; 90 pool->sp_proc = NULL; 91 TAILQ_INIT(&pool->sp_xlist); 92 TAILQ_INIT(&pool->sp_active); 93 TAILQ_INIT(&pool->sp_callouts); 94 LIST_INIT(&pool->sp_threads); 95 LIST_INIT(&pool->sp_idlethreads); 96 pool->sp_minthreads = 1; 97 pool->sp_maxthreads = 1; 98 pool->sp_threadcount = 0; 99 100 /* 101 * Don't use more than a quarter of mbuf clusters or more than 102 * 45Mb buffering requests. 103 */ 104 pool->sp_space_high = nmbclusters * MCLBYTES / 4; 105 if (pool->sp_space_high > 45 << 20) 106 pool->sp_space_high = 45 << 20; 107 pool->sp_space_low = 2 * pool->sp_space_high / 3; 108 109 sysctl_ctx_init(&pool->sp_sysctl); 110 if (sysctl_base) { 111 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 112 "minthreads", CTLTYPE_INT | CTLFLAG_RW, 113 pool, 0, svcpool_minthread_sysctl, "I", ""); 114 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 115 "maxthreads", CTLTYPE_INT | CTLFLAG_RW, 116 pool, 0, svcpool_maxthread_sysctl, "I", ""); 117 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 118 "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, ""); 119 120 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 121 "request_space_used", CTLFLAG_RD, 122 &pool->sp_space_used, 0, 123 "Space in parsed but not handled requests."); 124 125 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 126 "request_space_used_highest", CTLFLAG_RD, 127 &pool->sp_space_used_highest, 0, 128 "Highest space used since reboot."); 129 130 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 131 "request_space_high", CTLFLAG_RW, 132 &pool->sp_space_high, 0, 133 "Maximum space in parsed but not handled requests."); 134 135 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 136 "request_space_low", CTLFLAG_RW, 137 &pool->sp_space_low, 0, 138 "Low water mark for request space."); 139 140 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 141 "request_space_throttled", CTLFLAG_RD, 142 &pool->sp_space_throttled, 0, 143 "Whether nfs requests are currently throttled"); 144 145 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 146 "request_space_throttle_count", CTLFLAG_RD, 147 &pool->sp_space_throttle_count, 0, 148 "Count of times throttling based on request space has occurred"); 149 } 150 151 return pool; 152} 153 154void 155svcpool_destroy(SVCPOOL *pool) 156{ 157 SVCXPRT *xprt, *nxprt; 158 struct svc_callout *s; 159 struct svcxprt_list cleanup; 160 161 TAILQ_INIT(&cleanup); 162 mtx_lock(&pool->sp_lock); 163 164 while (TAILQ_FIRST(&pool->sp_xlist)) { 165 xprt = TAILQ_FIRST(&pool->sp_xlist); 166 xprt_unregister_locked(xprt); 167 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 168 } 169 170 while (TAILQ_FIRST(&pool->sp_callouts)) { 171 s = TAILQ_FIRST(&pool->sp_callouts); 172 mtx_unlock(&pool->sp_lock); 173 svc_unreg(pool, s->sc_prog, s->sc_vers); 174 mtx_lock(&pool->sp_lock); 175 } 176 mtx_unlock(&pool->sp_lock); 177 178 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 179 SVC_RELEASE(xprt); 180 } 181 182 mtx_destroy(&pool->sp_lock); 183 184 if (pool->sp_rcache) 185 replay_freecache(pool->sp_rcache); 186 187 sysctl_ctx_free(&pool->sp_sysctl); 188 free(pool, M_RPC); 189} 190 191static bool_t 192svcpool_active(SVCPOOL *pool) 193{ 194 enum svcpool_state state = pool->sp_state; 195 196 if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING) 197 return (FALSE); 198 return (TRUE); 199} 200 201/* 202 * Sysctl handler to set the minimum thread count on a pool 203 */ 204static int 205svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS) 206{ 207 SVCPOOL *pool; 208 int newminthreads, error, n; 209 210 pool = oidp->oid_arg1; 211 newminthreads = pool->sp_minthreads; 212 error = sysctl_handle_int(oidp, &newminthreads, 0, req); 213 if (error == 0 && newminthreads != pool->sp_minthreads) { 214 if (newminthreads > pool->sp_maxthreads) 215 return (EINVAL); 216 mtx_lock(&pool->sp_lock); 217 if (newminthreads > pool->sp_minthreads 218 && svcpool_active(pool)) { 219 /* 220 * If the pool is running and we are 221 * increasing, create some more threads now. 222 */ 223 n = newminthreads - pool->sp_threadcount; 224 if (n > 0) { 225 mtx_unlock(&pool->sp_lock); 226 while (n--) 227 svc_new_thread(pool); 228 mtx_lock(&pool->sp_lock); 229 } 230 } 231 pool->sp_minthreads = newminthreads; 232 mtx_unlock(&pool->sp_lock); 233 } 234 return (error); 235} 236 237/* 238 * Sysctl handler to set the maximum thread count on a pool 239 */ 240static int 241svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS) 242{ 243 SVCPOOL *pool; 244 SVCTHREAD *st; 245 int newmaxthreads, error; 246 247 pool = oidp->oid_arg1; 248 newmaxthreads = pool->sp_maxthreads; 249 error = sysctl_handle_int(oidp, &newmaxthreads, 0, req); 250 if (error == 0 && newmaxthreads != pool->sp_maxthreads) { 251 if (newmaxthreads < pool->sp_minthreads) 252 return (EINVAL); 253 mtx_lock(&pool->sp_lock); 254 if (newmaxthreads < pool->sp_maxthreads 255 && svcpool_active(pool)) { 256 /* 257 * If the pool is running and we are 258 * decreasing, wake up some idle threads to 259 * encourage them to exit. 260 */ 261 LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) 262 cv_signal(&st->st_cond); 263 } 264 pool->sp_maxthreads = newmaxthreads; 265 mtx_unlock(&pool->sp_lock); 266 } 267 return (error); 268} 269 270/* 271 * Activate a transport handle. 272 */ 273void 274xprt_register(SVCXPRT *xprt) 275{ 276 SVCPOOL *pool = xprt->xp_pool; 277 278 SVC_ACQUIRE(xprt); 279 mtx_lock(&pool->sp_lock); 280 xprt->xp_registered = TRUE; 281 xprt->xp_active = FALSE; 282 TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link); 283 mtx_unlock(&pool->sp_lock); 284} 285 286/* 287 * De-activate a transport handle. Note: the locked version doesn't 288 * release the transport - caller must do that after dropping the pool 289 * lock. 290 */ 291static void 292xprt_unregister_locked(SVCXPRT *xprt) 293{ 294 SVCPOOL *pool = xprt->xp_pool; 295 296 KASSERT(xprt->xp_registered == TRUE, 297 ("xprt_unregister_locked: not registered")); 298 if (xprt->xp_active) { 299 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink); 300 xprt->xp_active = FALSE; 301 } 302 TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link); 303 xprt->xp_registered = FALSE; 304} 305 306void 307xprt_unregister(SVCXPRT *xprt) 308{ 309 SVCPOOL *pool = xprt->xp_pool; 310 311 mtx_lock(&pool->sp_lock); 312 if (xprt->xp_registered == FALSE) { 313 /* Already unregistered by another thread */ 314 mtx_unlock(&pool->sp_lock); 315 return; 316 } 317 xprt_unregister_locked(xprt); 318 mtx_unlock(&pool->sp_lock); 319 320 SVC_RELEASE(xprt); 321} 322 323static void 324xprt_assignthread(SVCXPRT *xprt) 325{ 326 SVCPOOL *pool = xprt->xp_pool; 327 SVCTHREAD *st; 328 329 /* 330 * Attempt to assign a service thread to this 331 * transport. 332 */ 333 LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) { 334 if (st->st_xprt == NULL && STAILQ_EMPTY(&st->st_reqs)) 335 break; 336 } 337 if (st) { 338 SVC_ACQUIRE(xprt); 339 xprt->xp_thread = st; 340 st->st_xprt = xprt; 341 cv_signal(&st->st_cond); 342 } else { 343 /* 344 * See if we can create a new thread. The 345 * actual thread creation happens in 346 * svc_run_internal because our locking state 347 * is poorly defined (we are typically called 348 * from a socket upcall). Don't create more 349 * than one thread per second. 350 */ 351 if (pool->sp_state == SVCPOOL_ACTIVE 352 && pool->sp_lastcreatetime < time_uptime 353 && pool->sp_threadcount < pool->sp_maxthreads) { 354 pool->sp_state = SVCPOOL_THREADWANTED; 355 } 356 } 357} 358 359void 360xprt_active(SVCXPRT *xprt) 361{ 362 SVCPOOL *pool = xprt->xp_pool; 363 364 mtx_lock(&pool->sp_lock); 365 366 if (!xprt->xp_registered) { 367 /* 368 * Race with xprt_unregister - we lose. 369 */ 370 mtx_unlock(&pool->sp_lock); 371 return; 372 } 373 374 if (!xprt->xp_active) { 375 TAILQ_INSERT_TAIL(&pool->sp_active, xprt, xp_alink); 376 xprt->xp_active = TRUE; 377 xprt_assignthread(xprt); 378 } 379 380 mtx_unlock(&pool->sp_lock); 381} 382 383void 384xprt_inactive_locked(SVCXPRT *xprt) 385{ 386 SVCPOOL *pool = xprt->xp_pool; 387 388 if (xprt->xp_active) { 389 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink); 390 xprt->xp_active = FALSE; 391 } 392} 393 394void 395xprt_inactive(SVCXPRT *xprt) 396{ 397 SVCPOOL *pool = xprt->xp_pool; 398 399 mtx_lock(&pool->sp_lock); 400 xprt_inactive_locked(xprt); 401 mtx_unlock(&pool->sp_lock); 402} 403 404/* 405 * Add a service program to the callout list. 406 * The dispatch routine will be called when a rpc request for this 407 * program number comes in. 408 */ 409bool_t 410svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers, 411 void (*dispatch)(struct svc_req *, SVCXPRT *), 412 const struct netconfig *nconf) 413{ 414 SVCPOOL *pool = xprt->xp_pool; 415 struct svc_callout *s; 416 char *netid = NULL; 417 int flag = 0; 418 419/* VARIABLES PROTECTED BY svc_lock: s, svc_head */ 420 421 if (xprt->xp_netid) { 422 netid = strdup(xprt->xp_netid, M_RPC); 423 flag = 1; 424 } else if (nconf && nconf->nc_netid) { 425 netid = strdup(nconf->nc_netid, M_RPC); 426 flag = 1; 427 } /* must have been created with svc_raw_create */ 428 if ((netid == NULL) && (flag == 1)) { 429 return (FALSE); 430 } 431 432 mtx_lock(&pool->sp_lock); 433 if ((s = svc_find(pool, prog, vers, netid)) != NULL) { 434 if (netid) 435 free(netid, M_RPC); 436 if (s->sc_dispatch == dispatch) 437 goto rpcb_it; /* he is registering another xptr */ 438 mtx_unlock(&pool->sp_lock); 439 return (FALSE); 440 } 441 s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT); 442 if (s == NULL) { 443 if (netid) 444 free(netid, M_RPC); 445 mtx_unlock(&pool->sp_lock); 446 return (FALSE); 447 } 448 449 s->sc_prog = prog; 450 s->sc_vers = vers; 451 s->sc_dispatch = dispatch; 452 s->sc_netid = netid; 453 TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link); 454 455 if ((xprt->xp_netid == NULL) && (flag == 1) && netid) 456 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC); 457 458rpcb_it: 459 mtx_unlock(&pool->sp_lock); 460 /* now register the information with the local binder service */ 461 if (nconf) { 462 bool_t dummy; 463 struct netconfig tnc; 464 struct netbuf nb; 465 tnc = *nconf; 466 nb.buf = &xprt->xp_ltaddr; 467 nb.len = xprt->xp_ltaddr.ss_len; 468 dummy = rpcb_set(prog, vers, &tnc, &nb); 469 return (dummy); 470 } 471 return (TRUE); 472} 473 474/* 475 * Remove a service program from the callout list. 476 */ 477void 478svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers) 479{ 480 struct svc_callout *s; 481 482 /* unregister the information anyway */ 483 (void) rpcb_unset(prog, vers, NULL); 484 mtx_lock(&pool->sp_lock); 485 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) { 486 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link); 487 if (s->sc_netid) 488 mem_free(s->sc_netid, sizeof (s->sc_netid) + 1); 489 mem_free(s, sizeof (struct svc_callout)); 490 } 491 mtx_unlock(&pool->sp_lock); 492} 493 494/* ********************** CALLOUT list related stuff ************* */ 495 496/* 497 * Search the callout list for a program number, return the callout 498 * struct. 499 */ 500static struct svc_callout * 501svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid) 502{ 503 struct svc_callout *s; 504 505 mtx_assert(&pool->sp_lock, MA_OWNED); 506 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 507 if (s->sc_prog == prog && s->sc_vers == vers 508 && (netid == NULL || s->sc_netid == NULL || 509 strcmp(netid, s->sc_netid) == 0)) 510 break; 511 } 512 513 return (s); 514} 515 516/* ******************* REPLY GENERATION ROUTINES ************ */ 517 518static bool_t 519svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply, 520 struct mbuf *body) 521{ 522 SVCXPRT *xprt = rqstp->rq_xprt; 523 bool_t ok; 524 525 if (rqstp->rq_args) { 526 m_freem(rqstp->rq_args); 527 rqstp->rq_args = NULL; 528 } 529 530 if (xprt->xp_pool->sp_rcache) 531 replay_setreply(xprt->xp_pool->sp_rcache, 532 rply, svc_getrpccaller(rqstp), body); 533 534 if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body)) 535 return (FALSE); 536 537 ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body); 538 if (rqstp->rq_addr) { 539 free(rqstp->rq_addr, M_SONAME); 540 rqstp->rq_addr = NULL; 541 } 542 543 return (ok); 544} 545 546/* 547 * Send a reply to an rpc request 548 */ 549bool_t 550svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location) 551{ 552 struct rpc_msg rply; 553 struct mbuf *m; 554 XDR xdrs; 555 bool_t ok; 556 557 rply.rm_xid = rqstp->rq_xid; 558 rply.rm_direction = REPLY; 559 rply.rm_reply.rp_stat = MSG_ACCEPTED; 560 rply.acpted_rply.ar_verf = rqstp->rq_verf; 561 rply.acpted_rply.ar_stat = SUCCESS; 562 rply.acpted_rply.ar_results.where = NULL; 563 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 564 565 MGET(m, M_WAIT, MT_DATA); 566 MCLGET(m, M_WAIT); 567 m->m_len = 0; 568 xdrmbuf_create(&xdrs, m, XDR_ENCODE); 569 ok = xdr_results(&xdrs, xdr_location); 570 XDR_DESTROY(&xdrs); 571 572 if (ok) { 573 return (svc_sendreply_common(rqstp, &rply, m)); 574 } else { 575 m_freem(m); 576 return (FALSE); 577 } 578} 579 580bool_t 581svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m) 582{ 583 struct rpc_msg rply; 584 585 rply.rm_xid = rqstp->rq_xid; 586 rply.rm_direction = REPLY; 587 rply.rm_reply.rp_stat = MSG_ACCEPTED; 588 rply.acpted_rply.ar_verf = rqstp->rq_verf; 589 rply.acpted_rply.ar_stat = SUCCESS; 590 rply.acpted_rply.ar_results.where = NULL; 591 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 592 593 return (svc_sendreply_common(rqstp, &rply, m)); 594} 595 596/* 597 * No procedure error reply 598 */ 599void 600svcerr_noproc(struct svc_req *rqstp) 601{ 602 SVCXPRT *xprt = rqstp->rq_xprt; 603 struct rpc_msg rply; 604 605 rply.rm_xid = rqstp->rq_xid; 606 rply.rm_direction = REPLY; 607 rply.rm_reply.rp_stat = MSG_ACCEPTED; 608 rply.acpted_rply.ar_verf = rqstp->rq_verf; 609 rply.acpted_rply.ar_stat = PROC_UNAVAIL; 610 611 if (xprt->xp_pool->sp_rcache) 612 replay_setreply(xprt->xp_pool->sp_rcache, 613 &rply, svc_getrpccaller(rqstp), NULL); 614 615 svc_sendreply_common(rqstp, &rply, NULL); 616} 617 618/* 619 * Can't decode args error reply 620 */ 621void 622svcerr_decode(struct svc_req *rqstp) 623{ 624 SVCXPRT *xprt = rqstp->rq_xprt; 625 struct rpc_msg rply; 626 627 rply.rm_xid = rqstp->rq_xid; 628 rply.rm_direction = REPLY; 629 rply.rm_reply.rp_stat = MSG_ACCEPTED; 630 rply.acpted_rply.ar_verf = rqstp->rq_verf; 631 rply.acpted_rply.ar_stat = GARBAGE_ARGS; 632 633 if (xprt->xp_pool->sp_rcache) 634 replay_setreply(xprt->xp_pool->sp_rcache, 635 &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL); 636 637 svc_sendreply_common(rqstp, &rply, NULL); 638} 639 640/* 641 * Some system error 642 */ 643void 644svcerr_systemerr(struct svc_req *rqstp) 645{ 646 SVCXPRT *xprt = rqstp->rq_xprt; 647 struct rpc_msg rply; 648 649 rply.rm_xid = rqstp->rq_xid; 650 rply.rm_direction = REPLY; 651 rply.rm_reply.rp_stat = MSG_ACCEPTED; 652 rply.acpted_rply.ar_verf = rqstp->rq_verf; 653 rply.acpted_rply.ar_stat = SYSTEM_ERR; 654 655 if (xprt->xp_pool->sp_rcache) 656 replay_setreply(xprt->xp_pool->sp_rcache, 657 &rply, svc_getrpccaller(rqstp), NULL); 658 659 svc_sendreply_common(rqstp, &rply, NULL); 660} 661 662/* 663 * Authentication error reply 664 */ 665void 666svcerr_auth(struct svc_req *rqstp, enum auth_stat why) 667{ 668 SVCXPRT *xprt = rqstp->rq_xprt; 669 struct rpc_msg rply; 670 671 rply.rm_xid = rqstp->rq_xid; 672 rply.rm_direction = REPLY; 673 rply.rm_reply.rp_stat = MSG_DENIED; 674 rply.rjcted_rply.rj_stat = AUTH_ERROR; 675 rply.rjcted_rply.rj_why = why; 676 677 if (xprt->xp_pool->sp_rcache) 678 replay_setreply(xprt->xp_pool->sp_rcache, 679 &rply, svc_getrpccaller(rqstp), NULL); 680 681 svc_sendreply_common(rqstp, &rply, NULL); 682} 683 684/* 685 * Auth too weak error reply 686 */ 687void 688svcerr_weakauth(struct svc_req *rqstp) 689{ 690 691 svcerr_auth(rqstp, AUTH_TOOWEAK); 692} 693 694/* 695 * Program unavailable error reply 696 */ 697void 698svcerr_noprog(struct svc_req *rqstp) 699{ 700 SVCXPRT *xprt = rqstp->rq_xprt; 701 struct rpc_msg rply; 702 703 rply.rm_xid = rqstp->rq_xid; 704 rply.rm_direction = REPLY; 705 rply.rm_reply.rp_stat = MSG_ACCEPTED; 706 rply.acpted_rply.ar_verf = rqstp->rq_verf; 707 rply.acpted_rply.ar_stat = PROG_UNAVAIL; 708 709 if (xprt->xp_pool->sp_rcache) 710 replay_setreply(xprt->xp_pool->sp_rcache, 711 &rply, svc_getrpccaller(rqstp), NULL); 712 713 svc_sendreply_common(rqstp, &rply, NULL); 714} 715 716/* 717 * Program version mismatch error reply 718 */ 719void 720svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers) 721{ 722 SVCXPRT *xprt = rqstp->rq_xprt; 723 struct rpc_msg rply; 724 725 rply.rm_xid = rqstp->rq_xid; 726 rply.rm_direction = REPLY; 727 rply.rm_reply.rp_stat = MSG_ACCEPTED; 728 rply.acpted_rply.ar_verf = rqstp->rq_verf; 729 rply.acpted_rply.ar_stat = PROG_MISMATCH; 730 rply.acpted_rply.ar_vers.low = (uint32_t)low_vers; 731 rply.acpted_rply.ar_vers.high = (uint32_t)high_vers; 732 733 if (xprt->xp_pool->sp_rcache) 734 replay_setreply(xprt->xp_pool->sp_rcache, 735 &rply, svc_getrpccaller(rqstp), NULL); 736 737 svc_sendreply_common(rqstp, &rply, NULL); 738} 739 740/* 741 * Allocate a new server transport structure. All fields are 742 * initialized to zero and xp_p3 is initialized to point at an 743 * extension structure to hold various flags and authentication 744 * parameters. 745 */ 746SVCXPRT * 747svc_xprt_alloc() 748{ 749 SVCXPRT *xprt; 750 SVCXPRT_EXT *ext; 751 752 xprt = mem_alloc(sizeof(SVCXPRT)); 753 memset(xprt, 0, sizeof(SVCXPRT)); 754 ext = mem_alloc(sizeof(SVCXPRT_EXT)); 755 memset(ext, 0, sizeof(SVCXPRT_EXT)); 756 xprt->xp_p3 = ext; 757 refcount_init(&xprt->xp_refs, 1); 758 759 return (xprt); 760} 761 762/* 763 * Free a server transport structure. 764 */ 765void 766svc_xprt_free(xprt) 767 SVCXPRT *xprt; 768{ 769 770 mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT)); 771 mem_free(xprt, sizeof(SVCXPRT)); 772} 773 774/* ******************* SERVER INPUT STUFF ******************* */ 775 776/* 777 * Read RPC requests from a transport and queue them to be 778 * executed. We handle authentication and replay cache replies here. 779 * Actually dispatching the RPC is deferred till svc_executereq. 780 */ 781static enum xprt_stat 782svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret) 783{ 784 SVCPOOL *pool = xprt->xp_pool; 785 struct svc_req *r; 786 struct rpc_msg msg; 787 struct mbuf *args; 788 enum xprt_stat stat; 789 790 /* now receive msgs from xprtprt (support batch calls) */ 791 r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO); 792 793 msg.rm_call.cb_cred.oa_base = r->rq_credarea; 794 msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES]; 795 r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES]; 796 if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) { 797 enum auth_stat why; 798 799 /* 800 * Handle replays and authenticate before queuing the 801 * request to be executed. 802 */ 803 SVC_ACQUIRE(xprt); 804 r->rq_xprt = xprt; 805 if (pool->sp_rcache) { 806 struct rpc_msg repmsg; 807 struct mbuf *repbody; 808 enum replay_state rs; 809 rs = replay_find(pool->sp_rcache, &msg, 810 svc_getrpccaller(r), &repmsg, &repbody); 811 switch (rs) { 812 case RS_NEW: 813 break; 814 case RS_DONE: 815 SVC_REPLY(xprt, &repmsg, r->rq_addr, 816 repbody); 817 if (r->rq_addr) { 818 free(r->rq_addr, M_SONAME); 819 r->rq_addr = NULL; 820 } 821 m_freem(args); 822 goto call_done; 823 824 default: 825 m_freem(args); 826 goto call_done; 827 } 828 } 829 830 r->rq_xid = msg.rm_xid; 831 r->rq_prog = msg.rm_call.cb_prog; 832 r->rq_vers = msg.rm_call.cb_vers; 833 r->rq_proc = msg.rm_call.cb_proc; 834 r->rq_size = sizeof(*r) + m_length(args, NULL); 835 r->rq_args = args; 836 if ((why = _authenticate(r, &msg)) != AUTH_OK) { 837 /* 838 * RPCSEC_GSS uses this return code 839 * for requests that form part of its 840 * context establishment protocol and 841 * should not be dispatched to the 842 * application. 843 */ 844 if (why != RPCSEC_GSS_NODISPATCH) 845 svcerr_auth(r, why); 846 goto call_done; 847 } 848 849 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) { 850 svcerr_decode(r); 851 goto call_done; 852 } 853 854 /* 855 * Everything checks out, return request to caller. 856 */ 857 *rqstp_ret = r; 858 r = NULL; 859 } 860call_done: 861 if (r) { 862 svc_freereq(r); 863 r = NULL; 864 } 865 if ((stat = SVC_STAT(xprt)) == XPRT_DIED) { 866 xprt_unregister(xprt); 867 } 868 869 return (stat); 870} 871 872static void 873svc_executereq(struct svc_req *rqstp) 874{ 875 SVCXPRT *xprt = rqstp->rq_xprt; 876 SVCPOOL *pool = xprt->xp_pool; 877 int prog_found; 878 rpcvers_t low_vers; 879 rpcvers_t high_vers; 880 struct svc_callout *s; 881 882 /* now match message with a registered service*/ 883 prog_found = FALSE; 884 low_vers = (rpcvers_t) -1L; 885 high_vers = (rpcvers_t) 0L; 886 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 887 if (s->sc_prog == rqstp->rq_prog) { 888 if (s->sc_vers == rqstp->rq_vers) { 889 /* 890 * We hand ownership of r to the 891 * dispatch method - they must call 892 * svc_freereq. 893 */ 894 (*s->sc_dispatch)(rqstp, xprt); 895 return; 896 } /* found correct version */ 897 prog_found = TRUE; 898 if (s->sc_vers < low_vers) 899 low_vers = s->sc_vers; 900 if (s->sc_vers > high_vers) 901 high_vers = s->sc_vers; 902 } /* found correct program */ 903 } 904 905 /* 906 * if we got here, the program or version 907 * is not served ... 908 */ 909 if (prog_found) 910 svcerr_progvers(rqstp, low_vers, high_vers); 911 else 912 svcerr_noprog(rqstp); 913 914 svc_freereq(rqstp); 915} 916 917static void 918svc_checkidle(SVCPOOL *pool) 919{ 920 SVCXPRT *xprt, *nxprt; 921 time_t timo; 922 struct svcxprt_list cleanup; 923 924 TAILQ_INIT(&cleanup); 925 TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) { 926 /* 927 * Only some transports have idle timers. Don't time 928 * something out which is just waking up. 929 */ 930 if (!xprt->xp_idletimeout || xprt->xp_thread) 931 continue; 932 933 timo = xprt->xp_lastactive + xprt->xp_idletimeout; 934 if (time_uptime > timo) { 935 xprt_unregister_locked(xprt); 936 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 937 } 938 } 939 940 mtx_unlock(&pool->sp_lock); 941 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 942 SVC_RELEASE(xprt); 943 } 944 mtx_lock(&pool->sp_lock); 945 946} 947 948static void 949svc_assign_waiting_sockets(SVCPOOL *pool) 950{ 951 SVCXPRT *xprt; 952 953 TAILQ_FOREACH(xprt, &pool->sp_active, xp_alink) { 954 if (!xprt->xp_thread) { 955 xprt_assignthread(xprt); 956 } 957 } 958} 959 960static bool_t 961svc_request_space_available(SVCPOOL *pool) 962{ 963 964 mtx_assert(&pool->sp_lock, MA_OWNED); 965 966 if (pool->sp_space_throttled) { 967 /* 968 * Below the low-water yet? If so, assign any waiting sockets. 969 */ 970 if (pool->sp_space_used < pool->sp_space_low) { 971 pool->sp_space_throttled = FALSE; 972 svc_assign_waiting_sockets(pool); 973 return TRUE; 974 } 975 976 return FALSE; 977 } else { 978 if (pool->sp_space_used 979 >= pool->sp_space_high) { 980 pool->sp_space_throttled = TRUE; 981 pool->sp_space_throttle_count++; 982 return FALSE; 983 } 984 985 return TRUE; 986 } 987} 988 989static void 990svc_run_internal(SVCPOOL *pool, bool_t ismaster) 991{ 992 SVCTHREAD *st, *stpref; 993 SVCXPRT *xprt; 994 enum xprt_stat stat; 995 struct svc_req *rqstp; 996 int error; 997 998 st = mem_alloc(sizeof(*st)); 999 st->st_xprt = NULL; 1000 STAILQ_INIT(&st->st_reqs); 1001 cv_init(&st->st_cond, "rpcsvc"); 1002 1003 mtx_lock(&pool->sp_lock); 1004 LIST_INSERT_HEAD(&pool->sp_threads, st, st_link); 1005 1006 /* 1007 * If we are a new thread which was spawned to cope with 1008 * increased load, set the state back to SVCPOOL_ACTIVE. 1009 */ 1010 if (pool->sp_state == SVCPOOL_THREADSTARTING) 1011 pool->sp_state = SVCPOOL_ACTIVE; 1012 1013 while (pool->sp_state != SVCPOOL_CLOSING) { 1014 /* 1015 * Create new thread if requested. 1016 */ 1017 if (pool->sp_state == SVCPOOL_THREADWANTED) { 1018 pool->sp_state = SVCPOOL_THREADSTARTING; 1019 pool->sp_lastcreatetime = time_uptime; 1020 mtx_unlock(&pool->sp_lock); 1021 svc_new_thread(pool); 1022 mtx_lock(&pool->sp_lock); 1023 continue; 1024 } 1025 1026 /* 1027 * Check for idle transports once per second. 1028 */ 1029 if (time_uptime > pool->sp_lastidlecheck) { 1030 pool->sp_lastidlecheck = time_uptime; 1031 svc_checkidle(pool); 1032 } 1033 1034 xprt = st->st_xprt; 1035 if (!xprt && STAILQ_EMPTY(&st->st_reqs)) { 1036 /* 1037 * Enforce maxthreads count. 1038 */ 1039 if (pool->sp_threadcount > pool->sp_maxthreads) 1040 break; 1041 1042 /* 1043 * Before sleeping, see if we can find an 1044 * active transport which isn't being serviced 1045 * by a thread. 1046 */ 1047 if (svc_request_space_available(pool)) { 1048 TAILQ_FOREACH(xprt, &pool->sp_active, 1049 xp_alink) { 1050 if (!xprt->xp_thread) { 1051 SVC_ACQUIRE(xprt); 1052 xprt->xp_thread = st; 1053 st->st_xprt = xprt; 1054 break; 1055 } 1056 } 1057 } 1058 if (st->st_xprt) 1059 continue; 1060 1061 LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink); 1062 if (ismaster || (!ismaster && 1063 pool->sp_threadcount > pool->sp_minthreads)) 1064 error = cv_timedwait_sig(&st->st_cond, 1065 &pool->sp_lock, 5 * hz); 1066 else 1067 error = cv_wait_sig(&st->st_cond, 1068 &pool->sp_lock); 1069 LIST_REMOVE(st, st_ilink); 1070 1071 /* 1072 * Reduce worker thread count when idle. 1073 */ 1074 if (error == EWOULDBLOCK) { 1075 if (!ismaster 1076 && (pool->sp_threadcount 1077 > pool->sp_minthreads) 1078 && !st->st_xprt 1079 && STAILQ_EMPTY(&st->st_reqs)) 1080 break; 1081 } else if (error) { 1082 mtx_unlock(&pool->sp_lock); 1083 svc_exit(pool); 1084 mtx_lock(&pool->sp_lock); 1085 break; 1086 } 1087 continue; 1088 } 1089 1090 if (xprt) { 1091 /* 1092 * Drain the transport socket and queue up any 1093 * RPCs. 1094 */ 1095 xprt->xp_lastactive = time_uptime; 1096 stat = XPRT_IDLE; 1097 do { 1098 if (!svc_request_space_available(pool)) 1099 break; 1100 rqstp = NULL; 1101 mtx_unlock(&pool->sp_lock); 1102 stat = svc_getreq(xprt, &rqstp); 1103 mtx_lock(&pool->sp_lock); 1104 if (rqstp) { 1105 /* 1106 * See if the application has 1107 * a preference for some other 1108 * thread. 1109 */ 1110 stpref = st; 1111 if (pool->sp_assign) 1112 stpref = pool->sp_assign(st, 1113 rqstp); 1114 1115 pool->sp_space_used += 1116 rqstp->rq_size; 1117 if (pool->sp_space_used 1118 > pool->sp_space_used_highest) 1119 pool->sp_space_used_highest = 1120 pool->sp_space_used; 1121 rqstp->rq_thread = stpref; 1122 STAILQ_INSERT_TAIL(&stpref->st_reqs, 1123 rqstp, rq_link); 1124 stpref->st_reqcount++; 1125 1126 /* 1127 * If we assigned the request 1128 * to another thread, make 1129 * sure its awake and continue 1130 * reading from the 1131 * socket. Otherwise, try to 1132 * find some other thread to 1133 * read from the socket and 1134 * execute the request 1135 * immediately. 1136 */ 1137 if (stpref != st) { 1138 cv_signal(&stpref->st_cond); 1139 continue; 1140 } else { 1141 break; 1142 } 1143 } 1144 } while (stat == XPRT_MOREREQS 1145 && pool->sp_state != SVCPOOL_CLOSING); 1146 1147 /* 1148 * Move this transport to the end of the 1149 * active list to ensure fairness when 1150 * multiple transports are active. If this was 1151 * the last queued request, svc_getreq will 1152 * end up calling xprt_inactive to remove from 1153 * the active list. 1154 */ 1155 xprt->xp_thread = NULL; 1156 st->st_xprt = NULL; 1157 if (xprt->xp_active) { 1158 xprt_assignthread(xprt); 1159 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink); 1160 TAILQ_INSERT_TAIL(&pool->sp_active, xprt, 1161 xp_alink); 1162 } 1163 mtx_unlock(&pool->sp_lock); 1164 SVC_RELEASE(xprt); 1165 mtx_lock(&pool->sp_lock); 1166 } 1167 1168 /* 1169 * Execute what we have queued. 1170 */ 1171 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) { 1172 size_t sz = rqstp->rq_size; 1173 mtx_unlock(&pool->sp_lock); 1174 svc_executereq(rqstp); 1175 mtx_lock(&pool->sp_lock); 1176 pool->sp_space_used -= sz; 1177 } 1178 } 1179 1180 if (st->st_xprt) { 1181 xprt = st->st_xprt; 1182 st->st_xprt = NULL; 1183 SVC_RELEASE(xprt); 1184 } 1185 1186 KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit")); 1187 LIST_REMOVE(st, st_link); 1188 pool->sp_threadcount--; 1189 1190 mtx_unlock(&pool->sp_lock); 1191 1192 cv_destroy(&st->st_cond); 1193 mem_free(st, sizeof(*st)); 1194 1195 if (!ismaster) 1196 wakeup(pool); 1197} 1198 1199static void 1200svc_thread_start(void *arg) 1201{ 1202 1203 svc_run_internal((SVCPOOL *) arg, FALSE); 1204 kthread_exit(); 1205} 1206 1207static void 1208svc_new_thread(SVCPOOL *pool) 1209{ 1210 struct thread *td; 1211 1212 pool->sp_threadcount++; 1213 kthread_add(svc_thread_start, pool, 1214 pool->sp_proc, &td, 0, 0, 1215 "%s: service", pool->sp_name); 1216} 1217 1218void 1219svc_run(SVCPOOL *pool) 1220{ 1221 int i; 1222 struct proc *p; 1223 struct thread *td; 1224 1225 p = curproc; 1226 td = curthread; 1227 snprintf(td->td_name, sizeof(td->td_name), 1228 "%s: master", pool->sp_name); 1229 pool->sp_state = SVCPOOL_ACTIVE; 1230 pool->sp_proc = p; 1231 pool->sp_lastcreatetime = time_uptime; 1232 pool->sp_threadcount = 1; 1233 1234 for (i = 1; i < pool->sp_minthreads; i++) { 1235 svc_new_thread(pool); 1236 } 1237 1238 svc_run_internal(pool, TRUE); 1239 1240 mtx_lock(&pool->sp_lock); 1241 while (pool->sp_threadcount > 0) 1242 msleep(pool, &pool->sp_lock, 0, "svcexit", 0); 1243 mtx_unlock(&pool->sp_lock); 1244} 1245 1246void 1247svc_exit(SVCPOOL *pool) 1248{ 1249 SVCTHREAD *st; 1250 1251 mtx_lock(&pool->sp_lock); 1252 1253 if (pool->sp_state != SVCPOOL_CLOSING) { 1254 pool->sp_state = SVCPOOL_CLOSING; 1255 LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) 1256 cv_signal(&st->st_cond); 1257 } 1258 1259 mtx_unlock(&pool->sp_lock); 1260} 1261 1262bool_t 1263svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1264{ 1265 struct mbuf *m; 1266 XDR xdrs; 1267 bool_t stat; 1268 1269 m = rqstp->rq_args; 1270 rqstp->rq_args = NULL; 1271 1272 xdrmbuf_create(&xdrs, m, XDR_DECODE); 1273 stat = xargs(&xdrs, args); 1274 XDR_DESTROY(&xdrs); 1275 1276 return (stat); 1277} 1278 1279bool_t 1280svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1281{ 1282 XDR xdrs; 1283 1284 if (rqstp->rq_addr) { 1285 free(rqstp->rq_addr, M_SONAME); 1286 rqstp->rq_addr = NULL; 1287 } 1288 1289 xdrs.x_op = XDR_FREE; 1290 return (xargs(&xdrs, args)); 1291} 1292 1293void 1294svc_freereq(struct svc_req *rqstp) 1295{ 1296 SVCTHREAD *st; 1297 SVCXPRT *xprt; 1298 SVCPOOL *pool; 1299 1300 st = rqstp->rq_thread; 1301 xprt = rqstp->rq_xprt; 1302 if (xprt) 1303 pool = xprt->xp_pool; 1304 else 1305 pool = NULL; 1306 if (st) { 1307 mtx_lock(&pool->sp_lock); 1308 KASSERT(rqstp == STAILQ_FIRST(&st->st_reqs), 1309 ("Freeing request out of order")); 1310 STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link); 1311 st->st_reqcount--; 1312 if (pool->sp_done) 1313 pool->sp_done(st, rqstp); 1314 mtx_unlock(&pool->sp_lock); 1315 } 1316 1317 if (rqstp->rq_auth.svc_ah_ops) 1318 SVCAUTH_RELEASE(&rqstp->rq_auth); 1319 1320 if (rqstp->rq_xprt) { 1321 SVC_RELEASE(rqstp->rq_xprt); 1322 } 1323 1324 if (rqstp->rq_addr) 1325 free(rqstp->rq_addr, M_SONAME); 1326 1327 if (rqstp->rq_args) 1328 m_freem(rqstp->rq_args); 1329 1330 free(rqstp, M_RPC); 1331} 1332