/* svc.c revision 261065 */
/*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/

/*-
 * Copyright (c) 2009, Sun Microsystems, Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 * - Redistributions of source code must retain the above copyright notice,
 *   this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 * - Neither the name of Sun Microsystems, Inc. nor the names of its
 *   contributors may be used to endorse or promote products derived
 *   from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
29 */ 30 31#if defined(LIBC_SCCS) && !defined(lint) 32static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro"; 33static char *sccsid = "@(#)svc.c 2.4 88/08/11 4.0 RPCSRC"; 34#endif 35#include <sys/cdefs.h> 36__FBSDID("$FreeBSD: stable/9/sys/rpc/svc.c 261065 2014-01-23 00:44:45Z mav $"); 37 38/* 39 * svc.c, Server-side remote procedure call interface. 40 * 41 * There are two sets of procedures here. The xprt routines are 42 * for handling transport handles. The svc routines handle the 43 * list of service routines. 44 * 45 * Copyright (C) 1984, Sun Microsystems, Inc. 46 */ 47 48#include <sys/param.h> 49#include <sys/lock.h> 50#include <sys/kernel.h> 51#include <sys/kthread.h> 52#include <sys/malloc.h> 53#include <sys/mbuf.h> 54#include <sys/mutex.h> 55#include <sys/proc.h> 56#include <sys/queue.h> 57#include <sys/socketvar.h> 58#include <sys/systm.h> 59#include <sys/ucred.h> 60 61#include <rpc/rpc.h> 62#include <rpc/rpcb_clnt.h> 63#include <rpc/replay.h> 64 65#include <rpc/rpc_com.h> 66 67#define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */ 68#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET) 69 70static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t, 71 char *); 72static void svc_new_thread(SVCPOOL *pool); 73static void xprt_unregister_locked(SVCXPRT *xprt); 74 75/* *************** SVCXPRT related stuff **************** */ 76 77static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS); 78static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS); 79 80SVCPOOL* 81svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base) 82{ 83 SVCPOOL *pool; 84 85 pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO); 86 87 mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF); 88 pool->sp_name = name; 89 pool->sp_state = SVCPOOL_INIT; 90 pool->sp_proc = NULL; 91 TAILQ_INIT(&pool->sp_xlist); 92 TAILQ_INIT(&pool->sp_active); 93 TAILQ_INIT(&pool->sp_callouts); 94 LIST_INIT(&pool->sp_threads); 95 
LIST_INIT(&pool->sp_idlethreads); 96 pool->sp_minthreads = 1; 97 pool->sp_maxthreads = 1; 98 pool->sp_threadcount = 0; 99 100 /* 101 * Don't use more than a quarter of mbuf clusters or more than 102 * 45Mb buffering requests. 103 */ 104 pool->sp_space_high = nmbclusters * MCLBYTES / 4; 105 if (pool->sp_space_high > 45 << 20) 106 pool->sp_space_high = 45 << 20; 107 pool->sp_space_low = 2 * pool->sp_space_high / 3; 108 109 sysctl_ctx_init(&pool->sp_sysctl); 110 if (sysctl_base) { 111 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 112 "minthreads", CTLTYPE_INT | CTLFLAG_RW, 113 pool, 0, svcpool_minthread_sysctl, "I", ""); 114 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 115 "maxthreads", CTLTYPE_INT | CTLFLAG_RW, 116 pool, 0, svcpool_maxthread_sysctl, "I", ""); 117 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 118 "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, ""); 119 120 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 121 "request_space_used", CTLFLAG_RD, 122 &pool->sp_space_used, 0, 123 "Space in parsed but not handled requests."); 124 125 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 126 "request_space_used_highest", CTLFLAG_RD, 127 &pool->sp_space_used_highest, 0, 128 "Highest space used since reboot."); 129 130 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 131 "request_space_high", CTLFLAG_RW, 132 &pool->sp_space_high, 0, 133 "Maximum space in parsed but not handled requests."); 134 135 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 136 "request_space_low", CTLFLAG_RW, 137 &pool->sp_space_low, 0, 138 "Low water mark for request space."); 139 140 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 141 "request_space_throttled", CTLFLAG_RD, 142 &pool->sp_space_throttled, 0, 143 "Whether nfs requests are currently throttled"); 144 145 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 146 "request_space_throttle_count", CTLFLAG_RD, 147 &pool->sp_space_throttle_count, 0, 148 "Count 
of times throttling based on request space has occurred"); 149 } 150 151 return pool; 152} 153 154void 155svcpool_destroy(SVCPOOL *pool) 156{ 157 SVCXPRT *xprt, *nxprt; 158 struct svc_callout *s; 159 struct svcxprt_list cleanup; 160 161 TAILQ_INIT(&cleanup); 162 mtx_lock(&pool->sp_lock); 163 164 while (TAILQ_FIRST(&pool->sp_xlist)) { 165 xprt = TAILQ_FIRST(&pool->sp_xlist); 166 xprt_unregister_locked(xprt); 167 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 168 } 169 170 while (TAILQ_FIRST(&pool->sp_callouts)) { 171 s = TAILQ_FIRST(&pool->sp_callouts); 172 mtx_unlock(&pool->sp_lock); 173 svc_unreg(pool, s->sc_prog, s->sc_vers); 174 mtx_lock(&pool->sp_lock); 175 } 176 mtx_unlock(&pool->sp_lock); 177 178 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 179 SVC_RELEASE(xprt); 180 } 181 182 mtx_destroy(&pool->sp_lock); 183 184 if (pool->sp_rcache) 185 replay_freecache(pool->sp_rcache); 186 187 sysctl_ctx_free(&pool->sp_sysctl); 188 free(pool, M_RPC); 189} 190 191static bool_t 192svcpool_active(SVCPOOL *pool) 193{ 194 enum svcpool_state state = pool->sp_state; 195 196 if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING) 197 return (FALSE); 198 return (TRUE); 199} 200 201/* 202 * Sysctl handler to set the minimum thread count on a pool 203 */ 204static int 205svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS) 206{ 207 SVCPOOL *pool; 208 int newminthreads, error, n; 209 210 pool = oidp->oid_arg1; 211 newminthreads = pool->sp_minthreads; 212 error = sysctl_handle_int(oidp, &newminthreads, 0, req); 213 if (error == 0 && newminthreads != pool->sp_minthreads) { 214 if (newminthreads > pool->sp_maxthreads) 215 return (EINVAL); 216 mtx_lock(&pool->sp_lock); 217 if (newminthreads > pool->sp_minthreads 218 && svcpool_active(pool)) { 219 /* 220 * If the pool is running and we are 221 * increasing, create some more threads now. 
222 */ 223 n = newminthreads - pool->sp_threadcount; 224 if (n > 0) { 225 mtx_unlock(&pool->sp_lock); 226 while (n--) 227 svc_new_thread(pool); 228 mtx_lock(&pool->sp_lock); 229 } 230 } 231 pool->sp_minthreads = newminthreads; 232 mtx_unlock(&pool->sp_lock); 233 } 234 return (error); 235} 236 237/* 238 * Sysctl handler to set the maximum thread count on a pool 239 */ 240static int 241svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS) 242{ 243 SVCPOOL *pool; 244 SVCTHREAD *st; 245 int newmaxthreads, error; 246 247 pool = oidp->oid_arg1; 248 newmaxthreads = pool->sp_maxthreads; 249 error = sysctl_handle_int(oidp, &newmaxthreads, 0, req); 250 if (error == 0 && newmaxthreads != pool->sp_maxthreads) { 251 if (newmaxthreads < pool->sp_minthreads) 252 return (EINVAL); 253 mtx_lock(&pool->sp_lock); 254 if (newmaxthreads < pool->sp_maxthreads 255 && svcpool_active(pool)) { 256 /* 257 * If the pool is running and we are 258 * decreasing, wake up some idle threads to 259 * encourage them to exit. 260 */ 261 LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) 262 cv_signal(&st->st_cond); 263 } 264 pool->sp_maxthreads = newmaxthreads; 265 mtx_unlock(&pool->sp_lock); 266 } 267 return (error); 268} 269 270/* 271 * Activate a transport handle. 272 */ 273void 274xprt_register(SVCXPRT *xprt) 275{ 276 SVCPOOL *pool = xprt->xp_pool; 277 278 SVC_ACQUIRE(xprt); 279 mtx_lock(&pool->sp_lock); 280 xprt->xp_registered = TRUE; 281 xprt->xp_active = FALSE; 282 TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link); 283 mtx_unlock(&pool->sp_lock); 284} 285 286/* 287 * De-activate a transport handle. Note: the locked version doesn't 288 * release the transport - caller must do that after dropping the pool 289 * lock. 
290 */ 291static void 292xprt_unregister_locked(SVCXPRT *xprt) 293{ 294 SVCPOOL *pool = xprt->xp_pool; 295 296 mtx_assert(&pool->sp_lock, MA_OWNED); 297 KASSERT(xprt->xp_registered == TRUE, 298 ("xprt_unregister_locked: not registered")); 299 xprt_inactive_locked(xprt); 300 TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link); 301 xprt->xp_registered = FALSE; 302} 303 304void 305xprt_unregister(SVCXPRT *xprt) 306{ 307 SVCPOOL *pool = xprt->xp_pool; 308 309 mtx_lock(&pool->sp_lock); 310 if (xprt->xp_registered == FALSE) { 311 /* Already unregistered by another thread */ 312 mtx_unlock(&pool->sp_lock); 313 return; 314 } 315 xprt_unregister_locked(xprt); 316 mtx_unlock(&pool->sp_lock); 317 318 SVC_RELEASE(xprt); 319} 320 321/* 322 * Attempt to assign a service thread to this transport. 323 */ 324static int 325xprt_assignthread(SVCXPRT *xprt) 326{ 327 SVCPOOL *pool = xprt->xp_pool; 328 SVCTHREAD *st; 329 330 mtx_assert(&pool->sp_lock, MA_OWNED); 331 st = LIST_FIRST(&pool->sp_idlethreads); 332 if (st) { 333 LIST_REMOVE(st, st_ilink); 334 st->st_idle = FALSE; 335 SVC_ACQUIRE(xprt); 336 xprt->xp_thread = st; 337 st->st_xprt = xprt; 338 cv_signal(&st->st_cond); 339 return (TRUE); 340 } else { 341 /* 342 * See if we can create a new thread. The 343 * actual thread creation happens in 344 * svc_run_internal because our locking state 345 * is poorly defined (we are typically called 346 * from a socket upcall). Don't create more 347 * than one thread per second. 348 */ 349 if (pool->sp_state == SVCPOOL_ACTIVE 350 && pool->sp_lastcreatetime < time_uptime 351 && pool->sp_threadcount < pool->sp_maxthreads) { 352 pool->sp_state = SVCPOOL_THREADWANTED; 353 } 354 } 355 return (FALSE); 356} 357 358void 359xprt_active(SVCXPRT *xprt) 360{ 361 SVCPOOL *pool = xprt->xp_pool; 362 363 mtx_lock(&pool->sp_lock); 364 365 if (!xprt->xp_registered) { 366 /* 367 * Race with xprt_unregister - we lose. 
368 */ 369 mtx_unlock(&pool->sp_lock); 370 return; 371 } 372 373 if (!xprt->xp_active) { 374 xprt->xp_active = TRUE; 375 if (xprt->xp_thread == NULL) { 376 if (!xprt_assignthread(xprt)) 377 TAILQ_INSERT_TAIL(&pool->sp_active, xprt, 378 xp_alink); 379 } 380 } 381 382 mtx_unlock(&pool->sp_lock); 383} 384 385void 386xprt_inactive_locked(SVCXPRT *xprt) 387{ 388 SVCPOOL *pool = xprt->xp_pool; 389 390 mtx_assert(&pool->sp_lock, MA_OWNED); 391 if (xprt->xp_active) { 392 if (xprt->xp_thread == NULL) 393 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink); 394 xprt->xp_active = FALSE; 395 } 396} 397 398void 399xprt_inactive(SVCXPRT *xprt) 400{ 401 SVCPOOL *pool = xprt->xp_pool; 402 403 mtx_lock(&pool->sp_lock); 404 xprt_inactive_locked(xprt); 405 mtx_unlock(&pool->sp_lock); 406} 407 408/* 409 * Variant of xprt_inactive() for use only when sure that port is 410 * assigned to thread. For example, withing receive handlers. 411 */ 412void 413xprt_inactive_self(SVCXPRT *xprt) 414{ 415 416 KASSERT(xprt->xp_thread != NULL, 417 ("xprt_inactive_self(%p) with NULL xp_thread", xprt)); 418 xprt->xp_active = FALSE; 419} 420 421/* 422 * Add a service program to the callout list. 423 * The dispatch routine will be called when a rpc request for this 424 * program number comes in. 
425 */ 426bool_t 427svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers, 428 void (*dispatch)(struct svc_req *, SVCXPRT *), 429 const struct netconfig *nconf) 430{ 431 SVCPOOL *pool = xprt->xp_pool; 432 struct svc_callout *s; 433 char *netid = NULL; 434 int flag = 0; 435 436/* VARIABLES PROTECTED BY svc_lock: s, svc_head */ 437 438 if (xprt->xp_netid) { 439 netid = strdup(xprt->xp_netid, M_RPC); 440 flag = 1; 441 } else if (nconf && nconf->nc_netid) { 442 netid = strdup(nconf->nc_netid, M_RPC); 443 flag = 1; 444 } /* must have been created with svc_raw_create */ 445 if ((netid == NULL) && (flag == 1)) { 446 return (FALSE); 447 } 448 449 mtx_lock(&pool->sp_lock); 450 if ((s = svc_find(pool, prog, vers, netid)) != NULL) { 451 if (netid) 452 free(netid, M_RPC); 453 if (s->sc_dispatch == dispatch) 454 goto rpcb_it; /* he is registering another xptr */ 455 mtx_unlock(&pool->sp_lock); 456 return (FALSE); 457 } 458 s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT); 459 if (s == NULL) { 460 if (netid) 461 free(netid, M_RPC); 462 mtx_unlock(&pool->sp_lock); 463 return (FALSE); 464 } 465 466 s->sc_prog = prog; 467 s->sc_vers = vers; 468 s->sc_dispatch = dispatch; 469 s->sc_netid = netid; 470 TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link); 471 472 if ((xprt->xp_netid == NULL) && (flag == 1) && netid) 473 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC); 474 475rpcb_it: 476 mtx_unlock(&pool->sp_lock); 477 /* now register the information with the local binder service */ 478 if (nconf) { 479 bool_t dummy; 480 struct netconfig tnc; 481 struct netbuf nb; 482 tnc = *nconf; 483 nb.buf = &xprt->xp_ltaddr; 484 nb.len = xprt->xp_ltaddr.ss_len; 485 dummy = rpcb_set(prog, vers, &tnc, &nb); 486 return (dummy); 487 } 488 return (TRUE); 489} 490 491/* 492 * Remove a service program from the callout list. 
493 */ 494void 495svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers) 496{ 497 struct svc_callout *s; 498 499 /* unregister the information anyway */ 500 (void) rpcb_unset(prog, vers, NULL); 501 mtx_lock(&pool->sp_lock); 502 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) { 503 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link); 504 if (s->sc_netid) 505 mem_free(s->sc_netid, sizeof (s->sc_netid) + 1); 506 mem_free(s, sizeof (struct svc_callout)); 507 } 508 mtx_unlock(&pool->sp_lock); 509} 510 511/* ********************** CALLOUT list related stuff ************* */ 512 513/* 514 * Search the callout list for a program number, return the callout 515 * struct. 516 */ 517static struct svc_callout * 518svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid) 519{ 520 struct svc_callout *s; 521 522 mtx_assert(&pool->sp_lock, MA_OWNED); 523 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 524 if (s->sc_prog == prog && s->sc_vers == vers 525 && (netid == NULL || s->sc_netid == NULL || 526 strcmp(netid, s->sc_netid) == 0)) 527 break; 528 } 529 530 return (s); 531} 532 533/* ******************* REPLY GENERATION ROUTINES ************ */ 534 535static bool_t 536svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply, 537 struct mbuf *body) 538{ 539 SVCXPRT *xprt = rqstp->rq_xprt; 540 bool_t ok; 541 542 if (rqstp->rq_args) { 543 m_freem(rqstp->rq_args); 544 rqstp->rq_args = NULL; 545 } 546 547 if (xprt->xp_pool->sp_rcache) 548 replay_setreply(xprt->xp_pool->sp_rcache, 549 rply, svc_getrpccaller(rqstp), body); 550 551 if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body)) 552 return (FALSE); 553 554 ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body); 555 if (rqstp->rq_addr) { 556 free(rqstp->rq_addr, M_SONAME); 557 rqstp->rq_addr = NULL; 558 } 559 560 return (ok); 561} 562 563/* 564 * Send a reply to an rpc request 565 */ 566bool_t 567svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location) 568{ 569 struct rpc_msg rply; 570 struct 
mbuf *m; 571 XDR xdrs; 572 bool_t ok; 573 574 rply.rm_xid = rqstp->rq_xid; 575 rply.rm_direction = REPLY; 576 rply.rm_reply.rp_stat = MSG_ACCEPTED; 577 rply.acpted_rply.ar_verf = rqstp->rq_verf; 578 rply.acpted_rply.ar_stat = SUCCESS; 579 rply.acpted_rply.ar_results.where = NULL; 580 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 581 582 MGET(m, M_WAIT, MT_DATA); 583 MCLGET(m, M_WAIT); 584 m->m_len = 0; 585 xdrmbuf_create(&xdrs, m, XDR_ENCODE); 586 ok = xdr_results(&xdrs, xdr_location); 587 XDR_DESTROY(&xdrs); 588 589 if (ok) { 590 return (svc_sendreply_common(rqstp, &rply, m)); 591 } else { 592 m_freem(m); 593 return (FALSE); 594 } 595} 596 597bool_t 598svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m) 599{ 600 struct rpc_msg rply; 601 602 rply.rm_xid = rqstp->rq_xid; 603 rply.rm_direction = REPLY; 604 rply.rm_reply.rp_stat = MSG_ACCEPTED; 605 rply.acpted_rply.ar_verf = rqstp->rq_verf; 606 rply.acpted_rply.ar_stat = SUCCESS; 607 rply.acpted_rply.ar_results.where = NULL; 608 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 609 610 return (svc_sendreply_common(rqstp, &rply, m)); 611} 612 613/* 614 * No procedure error reply 615 */ 616void 617svcerr_noproc(struct svc_req *rqstp) 618{ 619 SVCXPRT *xprt = rqstp->rq_xprt; 620 struct rpc_msg rply; 621 622 rply.rm_xid = rqstp->rq_xid; 623 rply.rm_direction = REPLY; 624 rply.rm_reply.rp_stat = MSG_ACCEPTED; 625 rply.acpted_rply.ar_verf = rqstp->rq_verf; 626 rply.acpted_rply.ar_stat = PROC_UNAVAIL; 627 628 if (xprt->xp_pool->sp_rcache) 629 replay_setreply(xprt->xp_pool->sp_rcache, 630 &rply, svc_getrpccaller(rqstp), NULL); 631 632 svc_sendreply_common(rqstp, &rply, NULL); 633} 634 635/* 636 * Can't decode args error reply 637 */ 638void 639svcerr_decode(struct svc_req *rqstp) 640{ 641 SVCXPRT *xprt = rqstp->rq_xprt; 642 struct rpc_msg rply; 643 644 rply.rm_xid = rqstp->rq_xid; 645 rply.rm_direction = REPLY; 646 rply.rm_reply.rp_stat = MSG_ACCEPTED; 647 rply.acpted_rply.ar_verf = rqstp->rq_verf; 648 
rply.acpted_rply.ar_stat = GARBAGE_ARGS; 649 650 if (xprt->xp_pool->sp_rcache) 651 replay_setreply(xprt->xp_pool->sp_rcache, 652 &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL); 653 654 svc_sendreply_common(rqstp, &rply, NULL); 655} 656 657/* 658 * Some system error 659 */ 660void 661svcerr_systemerr(struct svc_req *rqstp) 662{ 663 SVCXPRT *xprt = rqstp->rq_xprt; 664 struct rpc_msg rply; 665 666 rply.rm_xid = rqstp->rq_xid; 667 rply.rm_direction = REPLY; 668 rply.rm_reply.rp_stat = MSG_ACCEPTED; 669 rply.acpted_rply.ar_verf = rqstp->rq_verf; 670 rply.acpted_rply.ar_stat = SYSTEM_ERR; 671 672 if (xprt->xp_pool->sp_rcache) 673 replay_setreply(xprt->xp_pool->sp_rcache, 674 &rply, svc_getrpccaller(rqstp), NULL); 675 676 svc_sendreply_common(rqstp, &rply, NULL); 677} 678 679/* 680 * Authentication error reply 681 */ 682void 683svcerr_auth(struct svc_req *rqstp, enum auth_stat why) 684{ 685 SVCXPRT *xprt = rqstp->rq_xprt; 686 struct rpc_msg rply; 687 688 rply.rm_xid = rqstp->rq_xid; 689 rply.rm_direction = REPLY; 690 rply.rm_reply.rp_stat = MSG_DENIED; 691 rply.rjcted_rply.rj_stat = AUTH_ERROR; 692 rply.rjcted_rply.rj_why = why; 693 694 if (xprt->xp_pool->sp_rcache) 695 replay_setreply(xprt->xp_pool->sp_rcache, 696 &rply, svc_getrpccaller(rqstp), NULL); 697 698 svc_sendreply_common(rqstp, &rply, NULL); 699} 700 701/* 702 * Auth too weak error reply 703 */ 704void 705svcerr_weakauth(struct svc_req *rqstp) 706{ 707 708 svcerr_auth(rqstp, AUTH_TOOWEAK); 709} 710 711/* 712 * Program unavailable error reply 713 */ 714void 715svcerr_noprog(struct svc_req *rqstp) 716{ 717 SVCXPRT *xprt = rqstp->rq_xprt; 718 struct rpc_msg rply; 719 720 rply.rm_xid = rqstp->rq_xid; 721 rply.rm_direction = REPLY; 722 rply.rm_reply.rp_stat = MSG_ACCEPTED; 723 rply.acpted_rply.ar_verf = rqstp->rq_verf; 724 rply.acpted_rply.ar_stat = PROG_UNAVAIL; 725 726 if (xprt->xp_pool->sp_rcache) 727 replay_setreply(xprt->xp_pool->sp_rcache, 728 &rply, svc_getrpccaller(rqstp), NULL); 729 730 
svc_sendreply_common(rqstp, &rply, NULL); 731} 732 733/* 734 * Program version mismatch error reply 735 */ 736void 737svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers) 738{ 739 SVCXPRT *xprt = rqstp->rq_xprt; 740 struct rpc_msg rply; 741 742 rply.rm_xid = rqstp->rq_xid; 743 rply.rm_direction = REPLY; 744 rply.rm_reply.rp_stat = MSG_ACCEPTED; 745 rply.acpted_rply.ar_verf = rqstp->rq_verf; 746 rply.acpted_rply.ar_stat = PROG_MISMATCH; 747 rply.acpted_rply.ar_vers.low = (uint32_t)low_vers; 748 rply.acpted_rply.ar_vers.high = (uint32_t)high_vers; 749 750 if (xprt->xp_pool->sp_rcache) 751 replay_setreply(xprt->xp_pool->sp_rcache, 752 &rply, svc_getrpccaller(rqstp), NULL); 753 754 svc_sendreply_common(rqstp, &rply, NULL); 755} 756 757/* 758 * Allocate a new server transport structure. All fields are 759 * initialized to zero and xp_p3 is initialized to point at an 760 * extension structure to hold various flags and authentication 761 * parameters. 762 */ 763SVCXPRT * 764svc_xprt_alloc() 765{ 766 SVCXPRT *xprt; 767 SVCXPRT_EXT *ext; 768 769 xprt = mem_alloc(sizeof(SVCXPRT)); 770 memset(xprt, 0, sizeof(SVCXPRT)); 771 ext = mem_alloc(sizeof(SVCXPRT_EXT)); 772 memset(ext, 0, sizeof(SVCXPRT_EXT)); 773 xprt->xp_p3 = ext; 774 refcount_init(&xprt->xp_refs, 1); 775 776 return (xprt); 777} 778 779/* 780 * Free a server transport structure. 781 */ 782void 783svc_xprt_free(xprt) 784 SVCXPRT *xprt; 785{ 786 787 mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT)); 788 mem_free(xprt, sizeof(SVCXPRT)); 789} 790 791/* ******************* SERVER INPUT STUFF ******************* */ 792 793/* 794 * Read RPC requests from a transport and queue them to be 795 * executed. We handle authentication and replay cache replies here. 796 * Actually dispatching the RPC is deferred till svc_executereq. 
797 */ 798static enum xprt_stat 799svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret) 800{ 801 SVCPOOL *pool = xprt->xp_pool; 802 struct svc_req *r; 803 struct rpc_msg msg; 804 struct mbuf *args; 805 enum xprt_stat stat; 806 807 /* now receive msgs from xprtprt (support batch calls) */ 808 r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO); 809 810 msg.rm_call.cb_cred.oa_base = r->rq_credarea; 811 msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES]; 812 r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES]; 813 if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) { 814 enum auth_stat why; 815 816 /* 817 * Handle replays and authenticate before queuing the 818 * request to be executed. 819 */ 820 SVC_ACQUIRE(xprt); 821 r->rq_xprt = xprt; 822 if (pool->sp_rcache) { 823 struct rpc_msg repmsg; 824 struct mbuf *repbody; 825 enum replay_state rs; 826 rs = replay_find(pool->sp_rcache, &msg, 827 svc_getrpccaller(r), &repmsg, &repbody); 828 switch (rs) { 829 case RS_NEW: 830 break; 831 case RS_DONE: 832 SVC_REPLY(xprt, &repmsg, r->rq_addr, 833 repbody); 834 if (r->rq_addr) { 835 free(r->rq_addr, M_SONAME); 836 r->rq_addr = NULL; 837 } 838 m_freem(args); 839 goto call_done; 840 841 default: 842 m_freem(args); 843 goto call_done; 844 } 845 } 846 847 r->rq_xid = msg.rm_xid; 848 r->rq_prog = msg.rm_call.cb_prog; 849 r->rq_vers = msg.rm_call.cb_vers; 850 r->rq_proc = msg.rm_call.cb_proc; 851 r->rq_size = sizeof(*r) + m_length(args, NULL); 852 r->rq_args = args; 853 if ((why = _authenticate(r, &msg)) != AUTH_OK) { 854 /* 855 * RPCSEC_GSS uses this return code 856 * for requests that form part of its 857 * context establishment protocol and 858 * should not be dispatched to the 859 * application. 860 */ 861 if (why != RPCSEC_GSS_NODISPATCH) 862 svcerr_auth(r, why); 863 goto call_done; 864 } 865 866 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) { 867 svcerr_decode(r); 868 goto call_done; 869 } 870 871 /* 872 * Everything checks out, return request to caller. 
873 */ 874 *rqstp_ret = r; 875 r = NULL; 876 } 877call_done: 878 if (r) { 879 svc_freereq(r); 880 r = NULL; 881 } 882 if ((stat = SVC_STAT(xprt)) == XPRT_DIED) { 883 xprt_unregister(xprt); 884 } 885 886 return (stat); 887} 888 889static void 890svc_executereq(struct svc_req *rqstp) 891{ 892 SVCXPRT *xprt = rqstp->rq_xprt; 893 SVCPOOL *pool = xprt->xp_pool; 894 int prog_found; 895 rpcvers_t low_vers; 896 rpcvers_t high_vers; 897 struct svc_callout *s; 898 899 /* now match message with a registered service*/ 900 prog_found = FALSE; 901 low_vers = (rpcvers_t) -1L; 902 high_vers = (rpcvers_t) 0L; 903 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 904 if (s->sc_prog == rqstp->rq_prog) { 905 if (s->sc_vers == rqstp->rq_vers) { 906 /* 907 * We hand ownership of r to the 908 * dispatch method - they must call 909 * svc_freereq. 910 */ 911 (*s->sc_dispatch)(rqstp, xprt); 912 return; 913 } /* found correct version */ 914 prog_found = TRUE; 915 if (s->sc_vers < low_vers) 916 low_vers = s->sc_vers; 917 if (s->sc_vers > high_vers) 918 high_vers = s->sc_vers; 919 } /* found correct program */ 920 } 921 922 /* 923 * if we got here, the program or version 924 * is not served ... 925 */ 926 if (prog_found) 927 svcerr_progvers(rqstp, low_vers, high_vers); 928 else 929 svcerr_noprog(rqstp); 930 931 svc_freereq(rqstp); 932} 933 934static void 935svc_checkidle(SVCPOOL *pool) 936{ 937 SVCXPRT *xprt, *nxprt; 938 time_t timo; 939 struct svcxprt_list cleanup; 940 941 TAILQ_INIT(&cleanup); 942 TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) { 943 /* 944 * Only some transports have idle timers. Don't time 945 * something out which is just waking up. 
946 */ 947 if (!xprt->xp_idletimeout || xprt->xp_thread) 948 continue; 949 950 timo = xprt->xp_lastactive + xprt->xp_idletimeout; 951 if (time_uptime > timo) { 952 xprt_unregister_locked(xprt); 953 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 954 } 955 } 956 957 mtx_unlock(&pool->sp_lock); 958 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 959 SVC_RELEASE(xprt); 960 } 961 mtx_lock(&pool->sp_lock); 962 963} 964 965static void 966svc_assign_waiting_sockets(SVCPOOL *pool) 967{ 968 SVCXPRT *xprt; 969 970 while ((xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) { 971 if (xprt_assignthread(xprt)) 972 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink); 973 else 974 break; 975 } 976} 977 978static bool_t 979svc_request_space_available(SVCPOOL *pool) 980{ 981 982 mtx_assert(&pool->sp_lock, MA_OWNED); 983 984 if (pool->sp_space_throttled) { 985 /* 986 * Below the low-water yet? If so, assign any waiting sockets. 987 */ 988 if (pool->sp_space_used < pool->sp_space_low) { 989 pool->sp_space_throttled = FALSE; 990 svc_assign_waiting_sockets(pool); 991 return TRUE; 992 } 993 994 return FALSE; 995 } else { 996 if (pool->sp_space_used 997 >= pool->sp_space_high) { 998 pool->sp_space_throttled = TRUE; 999 pool->sp_space_throttle_count++; 1000 return FALSE; 1001 } 1002 1003 return TRUE; 1004 } 1005} 1006 1007static void 1008svc_run_internal(SVCPOOL *pool, bool_t ismaster) 1009{ 1010 SVCTHREAD *st, *stpref; 1011 SVCXPRT *xprt; 1012 enum xprt_stat stat; 1013 struct svc_req *rqstp; 1014 int error; 1015 1016 st = mem_alloc(sizeof(*st)); 1017 st->st_xprt = NULL; 1018 STAILQ_INIT(&st->st_reqs); 1019 cv_init(&st->st_cond, "rpcsvc"); 1020 1021 mtx_lock(&pool->sp_lock); 1022 LIST_INSERT_HEAD(&pool->sp_threads, st, st_link); 1023 1024 /* 1025 * If we are a new thread which was spawned to cope with 1026 * increased load, set the state back to SVCPOOL_ACTIVE. 
1027 */ 1028 if (pool->sp_state == SVCPOOL_THREADSTARTING) 1029 pool->sp_state = SVCPOOL_ACTIVE; 1030 1031 while (pool->sp_state != SVCPOOL_CLOSING) { 1032 /* 1033 * Create new thread if requested. 1034 */ 1035 if (pool->sp_state == SVCPOOL_THREADWANTED) { 1036 pool->sp_state = SVCPOOL_THREADSTARTING; 1037 pool->sp_lastcreatetime = time_uptime; 1038 mtx_unlock(&pool->sp_lock); 1039 svc_new_thread(pool); 1040 mtx_lock(&pool->sp_lock); 1041 continue; 1042 } 1043 1044 /* 1045 * Check for idle transports once per second. 1046 */ 1047 if (time_uptime > pool->sp_lastidlecheck) { 1048 pool->sp_lastidlecheck = time_uptime; 1049 svc_checkidle(pool); 1050 } 1051 1052 xprt = st->st_xprt; 1053 if (!xprt && STAILQ_EMPTY(&st->st_reqs)) { 1054 /* 1055 * Enforce maxthreads count. 1056 */ 1057 if (pool->sp_threadcount > pool->sp_maxthreads) 1058 break; 1059 1060 /* 1061 * Before sleeping, see if we can find an 1062 * active transport which isn't being serviced 1063 * by a thread. 1064 */ 1065 if (svc_request_space_available(pool) && 1066 (xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) { 1067 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink); 1068 SVC_ACQUIRE(xprt); 1069 xprt->xp_thread = st; 1070 st->st_xprt = xprt; 1071 continue; 1072 } 1073 1074 LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink); 1075 st->st_idle = TRUE; 1076 if (ismaster || (!ismaster && 1077 pool->sp_threadcount > pool->sp_minthreads)) 1078 error = cv_timedwait_sig(&st->st_cond, 1079 &pool->sp_lock, 5 * hz); 1080 else 1081 error = cv_wait_sig(&st->st_cond, 1082 &pool->sp_lock); 1083 if (st->st_idle) { 1084 LIST_REMOVE(st, st_ilink); 1085 st->st_idle = FALSE; 1086 } 1087 1088 /* 1089 * Reduce worker thread count when idle. 
1090 */ 1091 if (error == EWOULDBLOCK) { 1092 if (!ismaster 1093 && (pool->sp_threadcount 1094 > pool->sp_minthreads) 1095 && !st->st_xprt 1096 && STAILQ_EMPTY(&st->st_reqs)) 1097 break; 1098 } else if (error) { 1099 mtx_unlock(&pool->sp_lock); 1100 svc_exit(pool); 1101 mtx_lock(&pool->sp_lock); 1102 break; 1103 } 1104 continue; 1105 } 1106 1107 if (xprt) { 1108 /* 1109 * Drain the transport socket and queue up any 1110 * RPCs. 1111 */ 1112 xprt->xp_lastactive = time_uptime; 1113 stat = XPRT_IDLE; 1114 do { 1115 if (!svc_request_space_available(pool)) 1116 break; 1117 rqstp = NULL; 1118 mtx_unlock(&pool->sp_lock); 1119 stat = svc_getreq(xprt, &rqstp); 1120 mtx_lock(&pool->sp_lock); 1121 if (rqstp) { 1122 /* 1123 * See if the application has 1124 * a preference for some other 1125 * thread. 1126 */ 1127 stpref = st; 1128 if (pool->sp_assign) 1129 stpref = pool->sp_assign(st, 1130 rqstp); 1131 1132 pool->sp_space_used += 1133 rqstp->rq_size; 1134 if (pool->sp_space_used 1135 > pool->sp_space_used_highest) 1136 pool->sp_space_used_highest = 1137 pool->sp_space_used; 1138 rqstp->rq_thread = stpref; 1139 STAILQ_INSERT_TAIL(&stpref->st_reqs, 1140 rqstp, rq_link); 1141 stpref->st_reqcount++; 1142 1143 /* 1144 * If we assigned the request 1145 * to another thread, make 1146 * sure its awake and continue 1147 * reading from the 1148 * socket. Otherwise, try to 1149 * find some other thread to 1150 * read from the socket and 1151 * execute the request 1152 * immediately. 1153 */ 1154 if (stpref == st) 1155 break; 1156 if (stpref->st_idle) { 1157 LIST_REMOVE(stpref, st_ilink); 1158 stpref->st_idle = FALSE; 1159 cv_signal(&stpref->st_cond); 1160 } 1161 } 1162 } while (stat == XPRT_MOREREQS 1163 && pool->sp_state != SVCPOOL_CLOSING); 1164 1165 /* 1166 * Move this transport to the end of the 1167 * active list to ensure fairness when 1168 * multiple transports are active. 
If this was 1169 * the last queued request, svc_getreq will 1170 * end up calling xprt_inactive to remove from 1171 * the active list. 1172 */ 1173 xprt->xp_thread = NULL; 1174 st->st_xprt = NULL; 1175 if (xprt->xp_active) { 1176 if (!xprt_assignthread(xprt)) 1177 TAILQ_INSERT_TAIL(&pool->sp_active, 1178 xprt, xp_alink); 1179 } 1180 mtx_unlock(&pool->sp_lock); 1181 SVC_RELEASE(xprt); 1182 mtx_lock(&pool->sp_lock); 1183 } 1184 1185 /* 1186 * Execute what we have queued. 1187 */ 1188 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) { 1189 size_t sz = rqstp->rq_size; 1190 mtx_unlock(&pool->sp_lock); 1191 svc_executereq(rqstp); 1192 mtx_lock(&pool->sp_lock); 1193 pool->sp_space_used -= sz; 1194 } 1195 } 1196 1197 if (st->st_xprt) { 1198 xprt = st->st_xprt; 1199 st->st_xprt = NULL; 1200 SVC_RELEASE(xprt); 1201 } 1202 1203 KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit")); 1204 LIST_REMOVE(st, st_link); 1205 pool->sp_threadcount--; 1206 1207 mtx_unlock(&pool->sp_lock); 1208 1209 cv_destroy(&st->st_cond); 1210 mem_free(st, sizeof(*st)); 1211 1212 if (!ismaster) 1213 wakeup(pool); 1214} 1215 1216static void 1217svc_thread_start(void *arg) 1218{ 1219 1220 svc_run_internal((SVCPOOL *) arg, FALSE); 1221 kthread_exit(); 1222} 1223 1224static void 1225svc_new_thread(SVCPOOL *pool) 1226{ 1227 struct thread *td; 1228 1229 pool->sp_threadcount++; 1230 kthread_add(svc_thread_start, pool, 1231 pool->sp_proc, &td, 0, 0, 1232 "%s: service", pool->sp_name); 1233} 1234 1235void 1236svc_run(SVCPOOL *pool) 1237{ 1238 int i; 1239 struct proc *p; 1240 struct thread *td; 1241 1242 p = curproc; 1243 td = curthread; 1244 snprintf(td->td_name, sizeof(td->td_name), 1245 "%s: master", pool->sp_name); 1246 pool->sp_state = SVCPOOL_ACTIVE; 1247 pool->sp_proc = p; 1248 pool->sp_lastcreatetime = time_uptime; 1249 pool->sp_threadcount = 1; 1250 1251 for (i = 1; i < pool->sp_minthreads; i++) { 1252 svc_new_thread(pool); 1253 } 1254 1255 svc_run_internal(pool, TRUE); 1256 1257 
mtx_lock(&pool->sp_lock); 1258 while (pool->sp_threadcount > 0) 1259 msleep(pool, &pool->sp_lock, 0, "svcexit", 0); 1260 mtx_unlock(&pool->sp_lock); 1261} 1262 1263void 1264svc_exit(SVCPOOL *pool) 1265{ 1266 SVCTHREAD *st; 1267 1268 mtx_lock(&pool->sp_lock); 1269 1270 if (pool->sp_state != SVCPOOL_CLOSING) { 1271 pool->sp_state = SVCPOOL_CLOSING; 1272 LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) 1273 cv_signal(&st->st_cond); 1274 } 1275 1276 mtx_unlock(&pool->sp_lock); 1277} 1278 1279bool_t 1280svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1281{ 1282 struct mbuf *m; 1283 XDR xdrs; 1284 bool_t stat; 1285 1286 m = rqstp->rq_args; 1287 rqstp->rq_args = NULL; 1288 1289 xdrmbuf_create(&xdrs, m, XDR_DECODE); 1290 stat = xargs(&xdrs, args); 1291 XDR_DESTROY(&xdrs); 1292 1293 return (stat); 1294} 1295 1296bool_t 1297svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1298{ 1299 XDR xdrs; 1300 1301 if (rqstp->rq_addr) { 1302 free(rqstp->rq_addr, M_SONAME); 1303 rqstp->rq_addr = NULL; 1304 } 1305 1306 xdrs.x_op = XDR_FREE; 1307 return (xargs(&xdrs, args)); 1308} 1309 1310void 1311svc_freereq(struct svc_req *rqstp) 1312{ 1313 SVCTHREAD *st; 1314 SVCXPRT *xprt; 1315 SVCPOOL *pool; 1316 1317 st = rqstp->rq_thread; 1318 xprt = rqstp->rq_xprt; 1319 if (xprt) 1320 pool = xprt->xp_pool; 1321 else 1322 pool = NULL; 1323 if (st) { 1324 mtx_lock(&pool->sp_lock); 1325 KASSERT(rqstp == STAILQ_FIRST(&st->st_reqs), 1326 ("Freeing request out of order")); 1327 STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link); 1328 st->st_reqcount--; 1329 if (pool->sp_done) 1330 pool->sp_done(st, rqstp); 1331 mtx_unlock(&pool->sp_lock); 1332 } 1333 1334 if (rqstp->rq_auth.svc_ah_ops) 1335 SVCAUTH_RELEASE(&rqstp->rq_auth); 1336 1337 if (rqstp->rq_xprt) { 1338 SVC_RELEASE(rqstp->rq_xprt); 1339 } 1340 1341 if (rqstp->rq_addr) 1342 free(rqstp->rq_addr, M_SONAME); 1343 1344 if (rqstp->rq_args) 1345 m_freem(rqstp->rq_args); 1346 1347 free(rqstp, M_RPC); 1348} 1349