svc.c revision 261060
1178354Ssam/* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */ 2178354Ssam 3178354Ssam/*- 4178354Ssam * Copyright (c) 2009, Sun Microsystems, Inc. 5178354Ssam * All rights reserved. 6178354Ssam * 7178354Ssam * Redistribution and use in source and binary forms, with or without 8178354Ssam * modification, are permitted provided that the following conditions are met: 9178354Ssam * - Redistributions of source code must retain the above copyright notice, 10178354Ssam * this list of conditions and the following disclaimer. 11178354Ssam * - Redistributions in binary form must reproduce the above copyright notice, 12178354Ssam * this list of conditions and the following disclaimer in the documentation 13178354Ssam * and/or other materials provided with the distribution. 14178354Ssam * - Neither the name of Sun Microsystems, Inc. nor the names of its 15178354Ssam * contributors may be used to endorse or promote products derived 16178354Ssam * from this software without specific prior written permission. 17178354Ssam * 18178354Ssam * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 19178354Ssam * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20178354Ssam * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21178354Ssam * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 22178354Ssam * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23178354Ssam * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24178354Ssam * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25178354Ssam * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26178354Ssam * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27178354Ssam * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28178354Ssam * POSSIBILITY OF SUCH DAMAGE. 29178354Ssam */ 30178354Ssam 31178354Ssam#if defined(LIBC_SCCS) && !defined(lint) 32178354Ssamstatic char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro"; 33178354Ssamstatic char *sccsid = "@(#)svc.c 2.4 88/08/11 4.0 RPCSRC"; 34178354Ssam#endif 35178354Ssam#include <sys/cdefs.h> 36178354Ssam__FBSDID("$FreeBSD: stable/9/sys/rpc/svc.c 261060 2014-01-23 00:41:23Z mav $"); 37178354Ssam 38178354Ssam/* 39178354Ssam * svc.c, Server-side remote procedure call interface. 40178354Ssam * 41178354Ssam * There are two sets of procedures here. The xprt routines are 42178354Ssam * for handling transport handles. The svc routines handle the 43178354Ssam * list of service routines. 44178354Ssam * 45178354Ssam * Copyright (C) 1984, Sun Microsystems, Inc. 46178354Ssam */ 47178354Ssam 48178354Ssam#include <sys/param.h> 49178354Ssam#include <sys/lock.h> 50178354Ssam#include <sys/kernel.h> 51178354Ssam#include <sys/kthread.h> 52178354Ssam#include <sys/malloc.h> 53178354Ssam#include <sys/mbuf.h> 54178354Ssam#include <sys/mutex.h> 55178354Ssam#include <sys/proc.h> 56178354Ssam#include <sys/queue.h> 57178354Ssam#include <sys/socketvar.h> 58178354Ssam#include <sys/systm.h> 59178354Ssam#include <sys/ucred.h> 60190391Ssam 61190391Ssam#include <rpc/rpc.h> 62190391Ssam#include <rpc/rpcb_clnt.h> 63178354Ssam#include <rpc/replay.h> 64178354Ssam 65178354Ssam#include <rpc/rpc_com.h> 66192468Ssam 67178354Ssam#define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */ 68192468Ssam#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET) 69178354Ssam 70178354Ssamstatic struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t, 71178354Ssam char *); 72178354Ssamstatic void svc_new_thread(SVCPOOL *pool); 73178354Ssamstatic void xprt_unregister_locked(SVCXPRT *xprt); 74178354Ssam 75178354Ssam/* *************** SVCXPRT related stuff **************** */ 76178354Ssam 77178354Ssamstatic int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS); 78178354Ssamstatic int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS); 79178354Ssam 80178354SsamSVCPOOL* 81178354Ssamsvcpool_create(const char *name, struct sysctl_oid_list *sysctl_base) 82178354Ssam{ 83178354Ssam SVCPOOL *pool; 84178354Ssam 85178354Ssam pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO); 86178354Ssam 87178354Ssam mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF); 88178354Ssam pool->sp_name = name; 89178354Ssam pool->sp_state = SVCPOOL_INIT; 90178354Ssam pool->sp_proc = NULL; 91178354Ssam TAILQ_INIT(&pool->sp_xlist); 92178354Ssam TAILQ_INIT(&pool->sp_active); 93178354Ssam TAILQ_INIT(&pool->sp_callouts); 94178354Ssam LIST_INIT(&pool->sp_threads); 95178354Ssam LIST_INIT(&pool->sp_idlethreads); 96178354Ssam pool->sp_minthreads = 1; 97178354Ssam pool->sp_maxthreads = 1; 98178354Ssam pool->sp_threadcount = 0; 99178354Ssam 100195379Ssam /* 101195379Ssam * Don't use more than a quarter of mbuf clusters or more than 102195379Ssam * 45Mb buffering requests. 103195379Ssam */ 104195379Ssam pool->sp_space_high = nmbclusters * MCLBYTES / 4; 105195379Ssam if (pool->sp_space_high > 45 << 20) 106195379Ssam pool->sp_space_high = 45 << 20; 107195379Ssam pool->sp_space_low = 2 * pool->sp_space_high / 3; 108195379Ssam 109195379Ssam sysctl_ctx_init(&pool->sp_sysctl); 110195379Ssam if (sysctl_base) { 111195379Ssam SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 112195379Ssam "minthreads", CTLTYPE_INT | CTLFLAG_RW, 113195379Ssam pool, 0, svcpool_minthread_sysctl, "I", ""); 114195379Ssam SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 115195379Ssam "maxthreads", CTLTYPE_INT | CTLFLAG_RW, 116195379Ssam pool, 0, svcpool_maxthread_sysctl, "I", ""); 117195379Ssam SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 118195379Ssam "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, ""); 119195379Ssam 120195379Ssam SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 121195379Ssam "request_space_used", CTLFLAG_RD, 122178354Ssam &pool->sp_space_used, 0, 123178354Ssam "Space in parsed but not handled requests."); 124178354Ssam 125178354Ssam SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 126178354Ssam "request_space_used_highest", CTLFLAG_RD, 127178354Ssam &pool->sp_space_used_highest, 0, 128178354Ssam "Highest space used since reboot."); 129178354Ssam 130178354Ssam SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 131178354Ssam "request_space_high", CTLFLAG_RW, 132178354Ssam &pool->sp_space_high, 0, 133178354Ssam "Maximum space in parsed but not handled requests."); 134178354Ssam 135178354Ssam SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 136178354Ssam "request_space_low", CTLFLAG_RW, 137178354Ssam &pool->sp_space_low, 0, 138178354Ssam "Low water mark for request space."); 139178354Ssam 140178354Ssam SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 141178354Ssam "request_space_throttled", CTLFLAG_RD, 142178354Ssam &pool->sp_space_throttled, 0, 143178354Ssam "Whether nfs requests are currently throttled"); 144178354Ssam 145178354Ssam SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 146178354Ssam "request_space_throttle_count", CTLFLAG_RD, 147178354Ssam &pool->sp_space_throttle_count, 0, 148178354Ssam "Count of times throttling based on request space has occurred"); 149178354Ssam } 150178354Ssam 151178354Ssam return pool; 152178354Ssam} 153178354Ssam 154178354Ssamvoid 155178354Ssamsvcpool_destroy(SVCPOOL *pool) 156178354Ssam{ 157178354Ssam SVCXPRT *xprt, *nxprt; 158178354Ssam struct svc_callout *s; 159178354Ssam struct svcxprt_list cleanup; 160178354Ssam 161178354Ssam TAILQ_INIT(&cleanup); 162178354Ssam mtx_lock(&pool->sp_lock); 163178354Ssam 164178354Ssam while (TAILQ_FIRST(&pool->sp_xlist)) { 165178354Ssam xprt = TAILQ_FIRST(&pool->sp_xlist); 166178354Ssam xprt_unregister_locked(xprt); 167178354Ssam TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 168178354Ssam } 169178354Ssam 170178354Ssam while (TAILQ_FIRST(&pool->sp_callouts)) { 171178354Ssam s = TAILQ_FIRST(&pool->sp_callouts); 172178354Ssam mtx_unlock(&pool->sp_lock); 173178354Ssam svc_unreg(pool, s->sc_prog, s->sc_vers); 174178354Ssam mtx_lock(&pool->sp_lock); 175178354Ssam } 176178354Ssam mtx_unlock(&pool->sp_lock); 177178354Ssam 178178354Ssam TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 179178354Ssam SVC_RELEASE(xprt); 180178354Ssam } 181178354Ssam 182178354Ssam mtx_destroy(&pool->sp_lock); 183178354Ssam 184178354Ssam if (pool->sp_rcache) 185178354Ssam replay_freecache(pool->sp_rcache); 186178354Ssam 187178354Ssam sysctl_ctx_free(&pool->sp_sysctl); 188178354Ssam free(pool, M_RPC); 189178354Ssam} 190178354Ssam 191178354Ssamstatic bool_t 192178354Ssamsvcpool_active(SVCPOOL *pool) 193178354Ssam{ 194178354Ssam enum svcpool_state state = pool->sp_state; 195178354Ssam 196178354Ssam if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING) 197178354Ssam return (FALSE); 198178354Ssam return (TRUE); 199178354Ssam} 200178354Ssam 201178354Ssam/* 202178354Ssam * Sysctl handler to set the minimum thread count on a pool 203178354Ssam */ 204178354Ssamstatic int 205178354Ssamsvcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS) 206178354Ssam{ 207178354Ssam SVCPOOL *pool; 208178354Ssam int newminthreads, error, n; 209178354Ssam 210178354Ssam pool = oidp->oid_arg1; 211178354Ssam newminthreads = pool->sp_minthreads; 212178354Ssam error = sysctl_handle_int(oidp, &newminthreads, 0, req); 213178354Ssam if (error == 0 && newminthreads != pool->sp_minthreads) { 214178354Ssam if (newminthreads > pool->sp_maxthreads) 215178354Ssam return (EINVAL); 216178354Ssam mtx_lock(&pool->sp_lock); 217178354Ssam if (newminthreads > pool->sp_minthreads 218178354Ssam && svcpool_active(pool)) { 219178354Ssam /* 220195379Ssam * If the pool is running and we are 221178354Ssam * increasing, create some more threads now. 222195379Ssam */ 223195379Ssam n = newminthreads - pool->sp_threadcount; 224178354Ssam if (n > 0) { 225178354Ssam mtx_unlock(&pool->sp_lock); 226178354Ssam while (n--) 227178354Ssam svc_new_thread(pool); 228178354Ssam mtx_lock(&pool->sp_lock); 229178354Ssam } 230178354Ssam } 231178354Ssam pool->sp_minthreads = newminthreads; 232178354Ssam mtx_unlock(&pool->sp_lock); 233178354Ssam } 234178354Ssam return (error); 235178354Ssam} 236178354Ssam 237178354Ssam/* 238178354Ssam * Sysctl handler to set the maximum thread count on a pool 239178354Ssam */ 240178354Ssamstatic int 241178354Ssamsvcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS) 242178354Ssam{ 243178354Ssam SVCPOOL *pool; 244178354Ssam SVCTHREAD *st; 245178354Ssam int newmaxthreads, error; 246178354Ssam 247178354Ssam pool = oidp->oid_arg1; 248178354Ssam newmaxthreads = pool->sp_maxthreads; 249178354Ssam error = sysctl_handle_int(oidp, &newmaxthreads, 0, req); 250178354Ssam if (error == 0 && newmaxthreads != pool->sp_maxthreads) { 251178354Ssam if (newmaxthreads < pool->sp_minthreads) 252178354Ssam return (EINVAL); 253178354Ssam mtx_lock(&pool->sp_lock); 254178354Ssam if (newmaxthreads < pool->sp_maxthreads 255178354Ssam && svcpool_active(pool)) { 256190672Ssam /* 257178354Ssam * If the pool is running and we are 258243882Sglebius * decreasing, wake up some idle threads to 259178354Ssam * encourage them to exit. 260178354Ssam */ 261178354Ssam LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) 262178354Ssam cv_signal(&st->st_cond); 263178354Ssam } 264178354Ssam pool->sp_maxthreads = newmaxthreads; 265178354Ssam mtx_unlock(&pool->sp_lock); 266178354Ssam } 267178354Ssam return (error); 268178354Ssam} 269178354Ssam 270178354Ssam/* 271190672Ssam * Activate a transport handle. 272178354Ssam */ 273178354Ssamvoid 274178354Ssamxprt_register(SVCXPRT *xprt) 275178354Ssam{ 276178354Ssam SVCPOOL *pool = xprt->xp_pool; 277178354Ssam 278178354Ssam SVC_ACQUIRE(xprt); 279178354Ssam mtx_lock(&pool->sp_lock); 280178354Ssam xprt->xp_registered = TRUE; 281178354Ssam xprt->xp_active = FALSE; 282178354Ssam TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link); 283191542Ssam mtx_unlock(&pool->sp_lock); 284191542Ssam} 285191542Ssam 286190672Ssam/* 287190672Ssam * De-activate a transport handle. Note: the locked version doesn't 288190672Ssam * release the transport - caller must do that after dropping the pool 289190672Ssam * lock. 290194461Srpaulo */ 291190672Ssamstatic void 292190672Ssamxprt_unregister_locked(SVCXPRT *xprt) 293190672Ssam{ 294190672Ssam SVCPOOL *pool = xprt->xp_pool; 295190672Ssam 296178354Ssam mtx_assert(&pool->sp_lock, MA_OWNED); 297178354Ssam KASSERT(xprt->xp_registered == TRUE, 298254082Sadrian ("xprt_unregister_locked: not registered")); 299178354Ssam xprt_inactive_locked(xprt); 300178354Ssam TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link); 301178354Ssam xprt->xp_registered = FALSE; 302178354Ssam} 303178354Ssam 304178354Ssamvoid 305178354Ssamxprt_unregister(SVCXPRT *xprt) 306178354Ssam{ 307178354Ssam SVCPOOL *pool = xprt->xp_pool; 308178354Ssam 309178354Ssam mtx_lock(&pool->sp_lock); 310178354Ssam if (xprt->xp_registered == FALSE) { 311178354Ssam /* Already unregistered by another thread */ 312178354Ssam mtx_unlock(&pool->sp_lock); 313178354Ssam return; 314178354Ssam } 315178354Ssam xprt_unregister_locked(xprt); 316178354Ssam mtx_unlock(&pool->sp_lock); 317178354Ssam 318178354Ssam SVC_RELEASE(xprt); 319178354Ssam} 320195379Ssam 321195379Ssam/* 322195379Ssam * Attempt to assign a service thread to this transport. 323195379Ssam */ 324195379Ssamstatic int 325195379Ssamxprt_assignthread(SVCXPRT *xprt) 326195379Ssam{ 327195379Ssam SVCPOOL *pool = xprt->xp_pool; 328195379Ssam SVCTHREAD *st; 329195379Ssam 330195379Ssam mtx_assert(&pool->sp_lock, MA_OWNED); 331195379Ssam st = LIST_FIRST(&pool->sp_idlethreads); 332178354Ssam if (st) { 333178354Ssam LIST_REMOVE(st, st_ilink); 334178354Ssam st->st_idle = FALSE; 335178354Ssam SVC_ACQUIRE(xprt); 336178354Ssam xprt->xp_thread = st; 337178354Ssam st->st_xprt = xprt; 338178354Ssam cv_signal(&st->st_cond); 339178354Ssam return (TRUE); 340178354Ssam } else { 341178354Ssam /* 342178354Ssam * See if we can create a new thread. The 343178354Ssam * actual thread creation happens in 344178354Ssam * svc_run_internal because our locking state 345178354Ssam * is poorly defined (we are typically called 346178354Ssam * from a socket upcall). Don't create more 347178354Ssam * than one thread per second. 348178354Ssam */ 349178354Ssam if (pool->sp_state == SVCPOOL_ACTIVE 350178354Ssam && pool->sp_lastcreatetime < time_uptime 351178354Ssam && pool->sp_threadcount < pool->sp_maxthreads) { 352178354Ssam pool->sp_state = SVCPOOL_THREADWANTED; 353178354Ssam } 354178354Ssam } 355178354Ssam return (FALSE); 356178354Ssam} 357178354Ssam 358178354Ssamvoid 359178354Ssamxprt_active(SVCXPRT *xprt) 360178354Ssam{ 361178354Ssam SVCPOOL *pool = xprt->xp_pool; 362178354Ssam 363178354Ssam mtx_lock(&pool->sp_lock); 364178354Ssam 365178354Ssam if (!xprt->xp_registered) { 366178354Ssam /* 367178354Ssam * Race with xprt_unregister - we lose. 368178354Ssam */ 369178354Ssam mtx_unlock(&pool->sp_lock); 370178354Ssam return; 371178354Ssam } 372178354Ssam 373178354Ssam if (!xprt->xp_active) { 374178354Ssam xprt->xp_active = TRUE; 375178354Ssam if (xprt->xp_thread == NULL) { 376178354Ssam if (!xprt_assignthread(xprt)) 377178354Ssam TAILQ_INSERT_TAIL(&pool->sp_active, xprt, 378178354Ssam xp_alink); 379178354Ssam } 380178354Ssam } 381178354Ssam 382178354Ssam mtx_unlock(&pool->sp_lock); 383178354Ssam} 384178354Ssam 385178354Ssamvoid 386178354Ssamxprt_inactive_locked(SVCXPRT *xprt) 387178354Ssam{ 388178354Ssam SVCPOOL *pool = xprt->xp_pool; 389178354Ssam 390178354Ssam mtx_assert(&pool->sp_lock, MA_OWNED); 391178354Ssam if (xprt->xp_active) { 392178354Ssam if (xprt->xp_thread == NULL) 393178354Ssam TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink); 394178354Ssam xprt->xp_active = FALSE; 395178354Ssam } 396178354Ssam} 397178354Ssam 398178354Ssamvoid 399178354Ssamxprt_inactive(SVCXPRT *xprt) 400178354Ssam{ 401178354Ssam SVCPOOL *pool = xprt->xp_pool; 402178354Ssam 403178354Ssam mtx_lock(&pool->sp_lock); 404178354Ssam xprt_inactive_locked(xprt); 405178354Ssam mtx_unlock(&pool->sp_lock); 406192468Ssam} 407178354Ssam 408178354Ssam/* 409178354Ssam * Add a service program to the callout list. 410178354Ssam * The dispatch routine will be called when a rpc request for this 411178354Ssam * program number comes in. 412178354Ssam */ 413178354Ssambool_t 414178354Ssamsvc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers, 415203422Srpaulo void (*dispatch)(struct svc_req *, SVCXPRT *), 416178354Ssam const struct netconfig *nconf) 417178354Ssam{ 418178354Ssam SVCPOOL *pool = xprt->xp_pool; 419183247Ssam struct svc_callout *s; 420178354Ssam char *netid = NULL; 421178354Ssam int flag = 0; 422183247Ssam 423183247Ssam/* VARIABLES PROTECTED BY svc_lock: s, svc_head */ 424183247Ssam 425183247Ssam if (xprt->xp_netid) { 426183247Ssam netid = strdup(xprt->xp_netid, M_RPC); 427178354Ssam flag = 1; 428178354Ssam } else if (nconf && nconf->nc_netid) { 429178354Ssam netid = strdup(nconf->nc_netid, M_RPC); 430178354Ssam flag = 1; 431178354Ssam } /* must have been created with svc_raw_create */ 432178354Ssam if ((netid == NULL) && (flag == 1)) { 433178354Ssam return (FALSE); 434178354Ssam } 435178354Ssam 436178354Ssam mtx_lock(&pool->sp_lock); 437178354Ssam if ((s = svc_find(pool, prog, vers, netid)) != NULL) { 438178354Ssam if (netid) 439178354Ssam free(netid, M_RPC); 440178354Ssam if (s->sc_dispatch == dispatch) 441178354Ssam goto rpcb_it; /* he is registering another xptr */ 442178354Ssam mtx_unlock(&pool->sp_lock); 443178354Ssam return (FALSE); 444178354Ssam } 445178354Ssam s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT); 446178354Ssam if (s == NULL) { 447178354Ssam if (netid) 448178354Ssam free(netid, M_RPC); 449178354Ssam mtx_unlock(&pool->sp_lock); 450178354Ssam return (FALSE); 451178354Ssam } 452178354Ssam 453178354Ssam s->sc_prog = prog; 454178354Ssam s->sc_vers = vers; 455218958Sbschmidt s->sc_dispatch = dispatch; 456218958Sbschmidt s->sc_netid = netid; 457218958Sbschmidt TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link); 458178354Ssam 459178354Ssam if ((xprt->xp_netid == NULL) && (flag == 1) && netid) 460178354Ssam ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC); 461191547Ssam 462191547Ssamrpcb_it: 463178354Ssam mtx_unlock(&pool->sp_lock); 464178354Ssam /* now register the information with the local binder service */ 465178354Ssam if (nconf) { 466178354Ssam bool_t dummy; 467178354Ssam struct netconfig tnc; 468178354Ssam struct netbuf nb; 469178354Ssam tnc = *nconf; 470178354Ssam nb.buf = &xprt->xp_ltaddr; 471178354Ssam nb.len = xprt->xp_ltaddr.ss_len; 472178354Ssam dummy = rpcb_set(prog, vers, &tnc, &nb); 473178354Ssam return (dummy); 474178354Ssam } 475178354Ssam return (TRUE); 476178354Ssam} 477178354Ssam 478178354Ssam/* 479178354Ssam * Remove a service program from the callout list. 480178354Ssam */ 481178354Ssamvoid 482178354Ssamsvc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers) 483178354Ssam{ 484178354Ssam struct svc_callout *s; 485178354Ssam 486178354Ssam /* unregister the information anyway */ 487178354Ssam (void) rpcb_unset(prog, vers, NULL); 488178354Ssam mtx_lock(&pool->sp_lock); 489192468Ssam while ((s = svc_find(pool, prog, vers, NULL)) != NULL) { 490178354Ssam TAILQ_REMOVE(&pool->sp_callouts, s, sc_link); 491178354Ssam if (s->sc_netid) 492178354Ssam mem_free(s->sc_netid, sizeof (s->sc_netid) + 1); 493178354Ssam mem_free(s, sizeof (struct svc_callout)); 494178354Ssam } 495178354Ssam mtx_unlock(&pool->sp_lock); 496221418Sadrian} 497178354Ssam 498178354Ssam/* ********************** CALLOUT list related stuff ************* */ 499178354Ssam 500178354Ssam/* 501178354Ssam * Search the callout list for a program number, return the callout 502178354Ssam * struct. 503178354Ssam */ 504178354Ssamstatic struct svc_callout * 505178354Ssamsvc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid) 506178354Ssam{ 507178354Ssam struct svc_callout *s; 508178354Ssam 509178354Ssam mtx_assert(&pool->sp_lock, MA_OWNED); 510178354Ssam TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 511178354Ssam if (s->sc_prog == prog && s->sc_vers == vers 512178354Ssam && (netid == NULL || s->sc_netid == NULL || 513178354Ssam strcmp(netid, s->sc_netid) == 0)) 514178354Ssam break; 515178354Ssam } 516178354Ssam 517178354Ssam return (s); 518178354Ssam} 519178354Ssam 520178354Ssam/* ******************* REPLY GENERATION ROUTINES ************ */ 521178354Ssam 522178354Ssamstatic bool_t 523178354Ssamsvc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply, 524178354Ssam struct mbuf *body) 525178354Ssam{ 526178354Ssam SVCXPRT *xprt = rqstp->rq_xprt; 527178354Ssam bool_t ok; 528178354Ssam 529178354Ssam if (rqstp->rq_args) { 530178354Ssam m_freem(rqstp->rq_args); 531178354Ssam rqstp->rq_args = NULL; 532178354Ssam } 533178354Ssam 534178354Ssam if (xprt->xp_pool->sp_rcache) 535178354Ssam replay_setreply(xprt->xp_pool->sp_rcache, 536178354Ssam rply, svc_getrpccaller(rqstp), body); 537178354Ssam 538178354Ssam if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body)) 539183247Ssam return (FALSE); 540183247Ssam 541178354Ssam ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body); 542178354Ssam if (rqstp->rq_addr) { 543178354Ssam free(rqstp->rq_addr, M_SONAME); 544183247Ssam rqstp->rq_addr = NULL; 545178354Ssam } 546178354Ssam 547178354Ssam return (ok); 548178354Ssam} 549178354Ssam 550178354Ssam/* 551178354Ssam * Send a reply to an rpc request 552178354Ssam */ 553178354Ssambool_t 554178354Ssamsvc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location) 555178354Ssam{ 556178354Ssam struct rpc_msg rply; 557178354Ssam struct mbuf *m; 558178354Ssam XDR xdrs; 559262007Skevlo bool_t ok; 560178354Ssam 561178354Ssam rply.rm_xid = rqstp->rq_xid; 562178354Ssam rply.rm_direction = REPLY; 563178354Ssam rply.rm_reply.rp_stat = MSG_ACCEPTED; 564178354Ssam rply.acpted_rply.ar_verf = rqstp->rq_verf; 565178354Ssam rply.acpted_rply.ar_stat = SUCCESS; 566178354Ssam rply.acpted_rply.ar_results.where = NULL; 567178354Ssam rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 568178354Ssam 569178354Ssam MGET(m, M_WAIT, MT_DATA); 570178354Ssam MCLGET(m, M_WAIT); 571178354Ssam m->m_len = 0; 572178354Ssam xdrmbuf_create(&xdrs, m, XDR_ENCODE); 573178354Ssam ok = xdr_results(&xdrs, xdr_location); 574178354Ssam XDR_DESTROY(&xdrs); 575178354Ssam 576178354Ssam if (ok) { 577262007Skevlo return (svc_sendreply_common(rqstp, &rply, m)); 578178354Ssam } else { 579178354Ssam m_freem(m); 580178354Ssam return (FALSE); 581178354Ssam } 582178354Ssam} 583178354Ssam 584178354Ssambool_t 585178354Ssamsvc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m) 586178354Ssam{ 587178354Ssam struct rpc_msg rply; 588178354Ssam 589178354Ssam rply.rm_xid = rqstp->rq_xid; 590178354Ssam rply.rm_direction = REPLY; 591178354Ssam rply.rm_reply.rp_stat = MSG_ACCEPTED; 592178354Ssam rply.acpted_rply.ar_verf = rqstp->rq_verf; 593178354Ssam rply.acpted_rply.ar_stat = SUCCESS; 594178354Ssam rply.acpted_rply.ar_results.where = NULL; 595178354Ssam rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 596178354Ssam 597178354Ssam return (svc_sendreply_common(rqstp, &rply, m)); 598178354Ssam} 599178354Ssam 600178354Ssam/* 601178354Ssam * No procedure error reply 602178354Ssam */ 603178354Ssamvoid 604178354Ssamsvcerr_noproc(struct svc_req *rqstp) 605178354Ssam{ 606178354Ssam SVCXPRT *xprt = rqstp->rq_xprt; 607178354Ssam struct rpc_msg rply; 608178354Ssam 609178354Ssam rply.rm_xid = rqstp->rq_xid; 610178354Ssam rply.rm_direction = REPLY; 611178354Ssam rply.rm_reply.rp_stat = MSG_ACCEPTED; 612178354Ssam rply.acpted_rply.ar_verf = rqstp->rq_verf; 613178354Ssam rply.acpted_rply.ar_stat = PROC_UNAVAIL; 614178354Ssam 615178354Ssam if (xprt->xp_pool->sp_rcache) 616178354Ssam replay_setreply(xprt->xp_pool->sp_rcache, 617192468Ssam &rply, svc_getrpccaller(rqstp), NULL); 618192468Ssam 619178354Ssam svc_sendreply_common(rqstp, &rply, NULL); 620178354Ssam} 621178354Ssam 622178354Ssam/* 623178354Ssam * Can't decode args error reply 624178354Ssam */ 625178354Ssamvoid 626178354Ssamsvcerr_decode(struct svc_req *rqstp) 627178354Ssam{ 628178354Ssam SVCXPRT *xprt = rqstp->rq_xprt; 629178354Ssam struct rpc_msg rply; 630178354Ssam 631178354Ssam rply.rm_xid = rqstp->rq_xid; 632178354Ssam rply.rm_direction = REPLY; 633178354Ssam rply.rm_reply.rp_stat = MSG_ACCEPTED; 634178354Ssam rply.acpted_rply.ar_verf = rqstp->rq_verf; 635178354Ssam rply.acpted_rply.ar_stat = GARBAGE_ARGS; 636178354Ssam 637178354Ssam if (xprt->xp_pool->sp_rcache) 638178354Ssam replay_setreply(xprt->xp_pool->sp_rcache, 639178354Ssam &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL); 640178354Ssam 641178354Ssam svc_sendreply_common(rqstp, &rply, NULL); 642178354Ssam} 643178354Ssam 644178354Ssam/* 645178354Ssam * Some system error 646178354Ssam */ 647178354Ssamvoid 648178354Ssamsvcerr_systemerr(struct svc_req *rqstp) 649178354Ssam{ 650178354Ssam SVCXPRT *xprt = rqstp->rq_xprt; 651178354Ssam struct rpc_msg rply; 652178354Ssam 653178354Ssam rply.rm_xid = rqstp->rq_xid; 654178354Ssam rply.rm_direction = REPLY; 655178354Ssam rply.rm_reply.rp_stat = MSG_ACCEPTED; 656178354Ssam rply.acpted_rply.ar_verf = rqstp->rq_verf; 657178354Ssam rply.acpted_rply.ar_stat = SYSTEM_ERR; 658178354Ssam 659178354Ssam if (xprt->xp_pool->sp_rcache) 660178354Ssam replay_setreply(xprt->xp_pool->sp_rcache, 661178354Ssam &rply, svc_getrpccaller(rqstp), NULL); 662178354Ssam 663178354Ssam svc_sendreply_common(rqstp, &rply, NULL); 664178354Ssam} 665178354Ssam 666178354Ssam/* 667178354Ssam * Authentication error reply 668178354Ssam */ 669178354Ssamvoid 670178354Ssamsvcerr_auth(struct svc_req *rqstp, enum auth_stat why) 671178354Ssam{ 672178354Ssam SVCXPRT *xprt = rqstp->rq_xprt; 673178354Ssam struct rpc_msg rply; 674178354Ssam 675178354Ssam rply.rm_xid = rqstp->rq_xid; 676178354Ssam rply.rm_direction = REPLY; 677190391Ssam rply.rm_reply.rp_stat = MSG_DENIED; 678190391Ssam rply.rjcted_rply.rj_stat = AUTH_ERROR; 679190391Ssam rply.rjcted_rply.rj_why = why; 680190391Ssam 681178354Ssam if (xprt->xp_pool->sp_rcache) 682190391Ssam replay_setreply(xprt->xp_pool->sp_rcache, 683178354Ssam &rply, svc_getrpccaller(rqstp), NULL); 684178354Ssam 685178354Ssam svc_sendreply_common(rqstp, &rply, NULL); 686178354Ssam} 687178354Ssam 688178354Ssam/* 689178354Ssam * Auth too weak error reply 690178354Ssam */ 691178354Ssamvoid 692178354Ssamsvcerr_weakauth(struct svc_req *rqstp) 693178354Ssam{ 694178354Ssam 695178354Ssam svcerr_auth(rqstp, AUTH_TOOWEAK); 696178354Ssam} 697178354Ssam 698178354Ssam/* 699178354Ssam * Program unavailable error reply 700178354Ssam */ 701178354Ssamvoid 702178354Ssamsvcerr_noprog(struct svc_req *rqstp) 703178354Ssam{ 704178354Ssam SVCXPRT *xprt = rqstp->rq_xprt; 705178354Ssam struct rpc_msg rply; 706178354Ssam 707178354Ssam rply.rm_xid = rqstp->rq_xid; 708178354Ssam rply.rm_direction = REPLY; 709178354Ssam rply.rm_reply.rp_stat = MSG_ACCEPTED; 710178354Ssam rply.acpted_rply.ar_verf = rqstp->rq_verf; 711262007Skevlo rply.acpted_rply.ar_stat = PROG_UNAVAIL; 712178354Ssam 713178354Ssam if (xprt->xp_pool->sp_rcache) 714178354Ssam replay_setreply(xprt->xp_pool->sp_rcache, 715178354Ssam &rply, svc_getrpccaller(rqstp), NULL); 716178354Ssam 717192468Ssam svc_sendreply_common(rqstp, &rply, NULL); 718192468Ssam} 719178354Ssam 720178354Ssam/* 721178354Ssam * Program version mismatch error reply 722178354Ssam */ 723178354Ssamvoid 724192468Ssamsvcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers) 725178354Ssam{ 726178354Ssam SVCXPRT *xprt = rqstp->rq_xprt; 727178354Ssam struct rpc_msg rply; 728178354Ssam 729178354Ssam rply.rm_xid = rqstp->rq_xid; 730178354Ssam rply.rm_direction = REPLY; 731178354Ssam rply.rm_reply.rp_stat = MSG_ACCEPTED; 732178354Ssam rply.acpted_rply.ar_verf = rqstp->rq_verf; 733178354Ssam rply.acpted_rply.ar_stat = PROG_MISMATCH; 734178354Ssam rply.acpted_rply.ar_vers.low = (uint32_t)low_vers; 735192765Ssam rply.acpted_rply.ar_vers.high = (uint32_t)high_vers; 736192468Ssam 737178354Ssam if (xprt->xp_pool->sp_rcache) 738178354Ssam replay_setreply(xprt->xp_pool->sp_rcache, 739178354Ssam &rply, svc_getrpccaller(rqstp), NULL); 740178354Ssam 741178354Ssam svc_sendreply_common(rqstp, &rply, NULL); 742178354Ssam} 743178354Ssam 744192468Ssam/* 745178354Ssam * Allocate a new server transport structure. All fields are 746178354Ssam * initialized to zero and xp_p3 is initialized to point at an 747178354Ssam * extension structure to hold various flags and authentication 748178354Ssam * parameters. 749178354Ssam */ 750178354SsamSVCXPRT * 751178354Ssamsvc_xprt_alloc() 752178354Ssam{ 753178354Ssam SVCXPRT *xprt; 754178354Ssam SVCXPRT_EXT *ext; 755218927Sbschmidt 756218927Sbschmidt xprt = mem_alloc(sizeof(SVCXPRT)); 757218958Sbschmidt memset(xprt, 0, sizeof(SVCXPRT)); 758218958Sbschmidt ext = mem_alloc(sizeof(SVCXPRT_EXT)); 759218958Sbschmidt memset(ext, 0, sizeof(SVCXPRT_EXT)); 760218927Sbschmidt xprt->xp_p3 = ext; 761218958Sbschmidt refcount_init(&xprt->xp_refs, 1); 762218958Sbschmidt 763218927Sbschmidt return (xprt); 764218958Sbschmidt} 765218958Sbschmidt 766218958Sbschmidt/* 767218958Sbschmidt * Free a server transport structure. 768218927Sbschmidt */ 769218927Sbschmidtvoid 770218927Sbschmidtsvc_xprt_free(xprt) 771218958Sbschmidt SVCXPRT *xprt; 772218958Sbschmidt{ 773218958Sbschmidt 774218927Sbschmidt mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT)); 775218927Sbschmidt mem_free(xprt, sizeof(SVCXPRT)); 776218927Sbschmidt} 777178354Ssam 778218927Sbschmidt/* ******************* SERVER INPUT STUFF ******************* */ 779178354Ssam 780178354Ssam/* 781218927Sbschmidt * Read RPC requests from a transport and queue them to be 782218927Sbschmidt * executed. We handle authentication and replay cache replies here. 783218927Sbschmidt * Actually dispatching the RPC is deferred till svc_executereq. 784218927Sbschmidt */ 785178354Ssamstatic enum xprt_stat 786218927Sbschmidtsvc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret) 787218927Sbschmidt{ 788218927Sbschmidt SVCPOOL *pool = xprt->xp_pool; 789218927Sbschmidt struct svc_req *r; 790178354Ssam struct rpc_msg msg; 791178354Ssam struct mbuf *args; 792218927Sbschmidt enum xprt_stat stat; 793178354Ssam 794178354Ssam /* now receive msgs from xprtprt (support batch calls) */ 795218927Sbschmidt r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO); 796178354Ssam 797178354Ssam msg.rm_call.cb_cred.oa_base = r->rq_credarea; 798178354Ssam msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES]; 799178354Ssam r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES]; 800 if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) { 801 enum auth_stat why; 802 803 /* 804 * Handle replays and authenticate before queuing the 805 * request to be executed. 806 */ 807 SVC_ACQUIRE(xprt); 808 r->rq_xprt = xprt; 809 if (pool->sp_rcache) { 810 struct rpc_msg repmsg; 811 struct mbuf *repbody; 812 enum replay_state rs; 813 rs = replay_find(pool->sp_rcache, &msg, 814 svc_getrpccaller(r), &repmsg, &repbody); 815 switch (rs) { 816 case RS_NEW: 817 break; 818 case RS_DONE: 819 SVC_REPLY(xprt, &repmsg, r->rq_addr, 820 repbody); 821 if (r->rq_addr) { 822 free(r->rq_addr, M_SONAME); 823 r->rq_addr = NULL; 824 } 825 m_freem(args); 826 goto call_done; 827 828 default: 829 m_freem(args); 830 goto call_done; 831 } 832 } 833 834 r->rq_xid = msg.rm_xid; 835 r->rq_prog = msg.rm_call.cb_prog; 836 r->rq_vers = msg.rm_call.cb_vers; 837 r->rq_proc = msg.rm_call.cb_proc; 838 r->rq_size = sizeof(*r) + m_length(args, NULL); 839 r->rq_args = args; 840 if ((why = _authenticate(r, &msg)) != AUTH_OK) { 841 /* 842 * RPCSEC_GSS uses this return code 843 * for requests that form part of its 844 * context establishment protocol and 845 * should not be dispatched to the 846 * application. 847 */ 848 if (why != RPCSEC_GSS_NODISPATCH) 849 svcerr_auth(r, why); 850 goto call_done; 851 } 852 853 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) { 854 svcerr_decode(r); 855 goto call_done; 856 } 857 858 /* 859 * Everything checks out, return request to caller. 860 */ 861 *rqstp_ret = r; 862 r = NULL; 863 } 864call_done: 865 if (r) { 866 svc_freereq(r); 867 r = NULL; 868 } 869 if ((stat = SVC_STAT(xprt)) == XPRT_DIED) { 870 xprt_unregister(xprt); 871 } 872 873 return (stat); 874} 875 876static void 877svc_executereq(struct svc_req *rqstp) 878{ 879 SVCXPRT *xprt = rqstp->rq_xprt; 880 SVCPOOL *pool = xprt->xp_pool; 881 int prog_found; 882 rpcvers_t low_vers; 883 rpcvers_t high_vers; 884 struct svc_callout *s; 885 886 /* now match message with a registered service*/ 887 prog_found = FALSE; 888 low_vers = (rpcvers_t) -1L; 889 high_vers = (rpcvers_t) 0L; 890 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 891 if (s->sc_prog == rqstp->rq_prog) { 892 if (s->sc_vers == rqstp->rq_vers) { 893 /* 894 * We hand ownership of r to the 895 * dispatch method - they must call 896 * svc_freereq. 897 */ 898 (*s->sc_dispatch)(rqstp, xprt); 899 return; 900 } /* found correct version */ 901 prog_found = TRUE; 902 if (s->sc_vers < low_vers) 903 low_vers = s->sc_vers; 904 if (s->sc_vers > high_vers) 905 high_vers = s->sc_vers; 906 } /* found correct program */ 907 } 908 909 /* 910 * if we got here, the program or version 911 * is not served ... 912 */ 913 if (prog_found) 914 svcerr_progvers(rqstp, low_vers, high_vers); 915 else 916 svcerr_noprog(rqstp); 917 918 svc_freereq(rqstp); 919} 920 921static void 922svc_checkidle(SVCPOOL *pool) 923{ 924 SVCXPRT *xprt, *nxprt; 925 time_t timo; 926 struct svcxprt_list cleanup; 927 928 TAILQ_INIT(&cleanup); 929 TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) { 930 /* 931 * Only some transports have idle timers. Don't time 932 * something out which is just waking up. 933 */ 934 if (!xprt->xp_idletimeout || xprt->xp_thread) 935 continue; 936 937 timo = xprt->xp_lastactive + xprt->xp_idletimeout; 938 if (time_uptime > timo) { 939 xprt_unregister_locked(xprt); 940 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 941 } 942 } 943 944 mtx_unlock(&pool->sp_lock); 945 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 946 SVC_RELEASE(xprt); 947 } 948 mtx_lock(&pool->sp_lock); 949 950} 951 952static void 953svc_assign_waiting_sockets(SVCPOOL *pool) 954{ 955 SVCXPRT *xprt; 956 957 while ((xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) { 958 if (xprt_assignthread(xprt)) 959 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink); 960 else 961 break; 962 } 963} 964 965static bool_t 966svc_request_space_available(SVCPOOL *pool) 967{ 968 969 mtx_assert(&pool->sp_lock, MA_OWNED); 970 971 if (pool->sp_space_throttled) { 972 /* 973 * Below the low-water yet? If so, assign any waiting sockets. 974 */ 975 if (pool->sp_space_used < pool->sp_space_low) { 976 pool->sp_space_throttled = FALSE; 977 svc_assign_waiting_sockets(pool); 978 return TRUE; 979 } 980 981 return FALSE; 982 } else { 983 if (pool->sp_space_used 984 >= pool->sp_space_high) { 985 pool->sp_space_throttled = TRUE; 986 pool->sp_space_throttle_count++; 987 return FALSE; 988 } 989 990 return TRUE; 991 } 992} 993 994static void 995svc_run_internal(SVCPOOL *pool, bool_t ismaster) 996{ 997 SVCTHREAD *st, *stpref; 998 SVCXPRT *xprt; 999 enum xprt_stat stat; 1000 struct svc_req *rqstp; 1001 int error; 1002 1003 st = mem_alloc(sizeof(*st)); 1004 st->st_xprt = NULL; 1005 STAILQ_INIT(&st->st_reqs); 1006 cv_init(&st->st_cond, "rpcsvc"); 1007 1008 mtx_lock(&pool->sp_lock); 1009 LIST_INSERT_HEAD(&pool->sp_threads, st, st_link); 1010 1011 /* 1012 * If we are a new thread which was spawned to cope with 1013 * increased load, set the state back to SVCPOOL_ACTIVE. 1014 */ 1015 if (pool->sp_state == SVCPOOL_THREADSTARTING) 1016 pool->sp_state = SVCPOOL_ACTIVE; 1017 1018 while (pool->sp_state != SVCPOOL_CLOSING) { 1019 /* 1020 * Create new thread if requested. 1021 */ 1022 if (pool->sp_state == SVCPOOL_THREADWANTED) { 1023 pool->sp_state = SVCPOOL_THREADSTARTING; 1024 pool->sp_lastcreatetime = time_uptime; 1025 mtx_unlock(&pool->sp_lock); 1026 svc_new_thread(pool); 1027 mtx_lock(&pool->sp_lock); 1028 continue; 1029 } 1030 1031 /* 1032 * Check for idle transports once per second. 1033 */ 1034 if (time_uptime > pool->sp_lastidlecheck) { 1035 pool->sp_lastidlecheck = time_uptime; 1036 svc_checkidle(pool); 1037 } 1038 1039 xprt = st->st_xprt; 1040 if (!xprt && STAILQ_EMPTY(&st->st_reqs)) { 1041 /* 1042 * Enforce maxthreads count. 1043 */ 1044 if (pool->sp_threadcount > pool->sp_maxthreads) 1045 break; 1046 1047 /* 1048 * Before sleeping, see if we can find an 1049 * active transport which isn't being serviced 1050 * by a thread. 1051 */ 1052 if (svc_request_space_available(pool) && 1053 (xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) { 1054 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink); 1055 SVC_ACQUIRE(xprt); 1056 xprt->xp_thread = st; 1057 st->st_xprt = xprt; 1058 continue; 1059 } 1060 1061 LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink); 1062 st->st_idle = TRUE; 1063 if (ismaster || (!ismaster && 1064 pool->sp_threadcount > pool->sp_minthreads)) 1065 error = cv_timedwait_sig(&st->st_cond, 1066 &pool->sp_lock, 5 * hz); 1067 else 1068 error = cv_wait_sig(&st->st_cond, 1069 &pool->sp_lock); 1070 if (st->st_idle) { 1071 LIST_REMOVE(st, st_ilink); 1072 st->st_idle = FALSE; 1073 } 1074 1075 /* 1076 * Reduce worker thread count when idle. 1077 */ 1078 if (error == EWOULDBLOCK) { 1079 if (!ismaster 1080 && (pool->sp_threadcount 1081 > pool->sp_minthreads) 1082 && !st->st_xprt 1083 && STAILQ_EMPTY(&st->st_reqs)) 1084 break; 1085 } else if (error) { 1086 mtx_unlock(&pool->sp_lock); 1087 svc_exit(pool); 1088 mtx_lock(&pool->sp_lock); 1089 break; 1090 } 1091 continue; 1092 } 1093 1094 if (xprt) { 1095 /* 1096 * Drain the transport socket and queue up any 1097 * RPCs. 1098 */ 1099 xprt->xp_lastactive = time_uptime; 1100 stat = XPRT_IDLE; 1101 do { 1102 if (!svc_request_space_available(pool)) 1103 break; 1104 rqstp = NULL; 1105 mtx_unlock(&pool->sp_lock); 1106 stat = svc_getreq(xprt, &rqstp); 1107 mtx_lock(&pool->sp_lock); 1108 if (rqstp) { 1109 /* 1110 * See if the application has 1111 * a preference for some other 1112 * thread. 1113 */ 1114 stpref = st; 1115 if (pool->sp_assign) 1116 stpref = pool->sp_assign(st, 1117 rqstp); 1118 1119 pool->sp_space_used += 1120 rqstp->rq_size; 1121 if (pool->sp_space_used 1122 > pool->sp_space_used_highest) 1123 pool->sp_space_used_highest = 1124 pool->sp_space_used; 1125 rqstp->rq_thread = stpref; 1126 STAILQ_INSERT_TAIL(&stpref->st_reqs, 1127 rqstp, rq_link); 1128 stpref->st_reqcount++; 1129 1130 /* 1131 * If we assigned the request 1132 * to another thread, make 1133 * sure its awake and continue 1134 * reading from the 1135 * socket. Otherwise, try to 1136 * find some other thread to 1137 * read from the socket and 1138 * execute the request 1139 * immediately. 1140 */ 1141 if (stpref == st) 1142 break; 1143 if (stpref->st_idle) { 1144 LIST_REMOVE(stpref, st_ilink); 1145 stpref->st_idle = FALSE; 1146 cv_signal(&stpref->st_cond); 1147 } 1148 } 1149 } while (stat == XPRT_MOREREQS 1150 && pool->sp_state != SVCPOOL_CLOSING); 1151 1152 /* 1153 * Move this transport to the end of the 1154 * active list to ensure fairness when 1155 * multiple transports are active. If this was 1156 * the last queued request, svc_getreq will 1157 * end up calling xprt_inactive to remove from 1158 * the active list. 1159 */ 1160 xprt->xp_thread = NULL; 1161 st->st_xprt = NULL; 1162 if (xprt->xp_active) { 1163 if (!xprt_assignthread(xprt)) 1164 TAILQ_INSERT_TAIL(&pool->sp_active, 1165 xprt, xp_alink); 1166 } 1167 mtx_unlock(&pool->sp_lock); 1168 SVC_RELEASE(xprt); 1169 mtx_lock(&pool->sp_lock); 1170 } 1171 1172 /* 1173 * Execute what we have queued. 1174 */ 1175 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) { 1176 size_t sz = rqstp->rq_size; 1177 mtx_unlock(&pool->sp_lock); 1178 svc_executereq(rqstp); 1179 mtx_lock(&pool->sp_lock); 1180 pool->sp_space_used -= sz; 1181 } 1182 } 1183 1184 if (st->st_xprt) { 1185 xprt = st->st_xprt; 1186 st->st_xprt = NULL; 1187 SVC_RELEASE(xprt); 1188 } 1189 1190 KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit")); 1191 LIST_REMOVE(st, st_link); 1192 pool->sp_threadcount--; 1193 1194 mtx_unlock(&pool->sp_lock); 1195 1196 cv_destroy(&st->st_cond); 1197 mem_free(st, sizeof(*st)); 1198 1199 if (!ismaster) 1200 wakeup(pool); 1201} 1202 1203static void 1204svc_thread_start(void *arg) 1205{ 1206 1207 svc_run_internal((SVCPOOL *) arg, FALSE); 1208 kthread_exit(); 1209} 1210 1211static void 1212svc_new_thread(SVCPOOL *pool) 1213{ 1214 struct thread *td; 1215 1216 pool->sp_threadcount++; 1217 kthread_add(svc_thread_start, pool, 1218 pool->sp_proc, &td, 0, 0, 1219 "%s: service", pool->sp_name); 1220} 1221 1222void 1223svc_run(SVCPOOL *pool) 1224{ 1225 int i; 1226 struct proc *p; 1227 struct thread *td; 1228 1229 p = curproc; 1230 td = curthread; 1231 snprintf(td->td_name, sizeof(td->td_name), 1232 "%s: master", pool->sp_name); 1233 pool->sp_state = SVCPOOL_ACTIVE; 1234 pool->sp_proc = p; 1235 pool->sp_lastcreatetime = time_uptime; 1236 pool->sp_threadcount = 1; 1237 1238 for (i = 1; i < pool->sp_minthreads; i++) { 1239 svc_new_thread(pool); 1240 } 1241 1242 svc_run_internal(pool, TRUE); 1243 1244 mtx_lock(&pool->sp_lock); 1245 while (pool->sp_threadcount > 0) 1246 msleep(pool, &pool->sp_lock, 0, "svcexit", 0); 1247 mtx_unlock(&pool->sp_lock); 1248} 1249 1250void 1251svc_exit(SVCPOOL *pool) 1252{ 1253 SVCTHREAD *st; 1254 1255 mtx_lock(&pool->sp_lock); 1256 1257 if (pool->sp_state != SVCPOOL_CLOSING) { 1258 pool->sp_state = SVCPOOL_CLOSING; 1259 LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) 1260 cv_signal(&st->st_cond); 1261 } 1262 1263 mtx_unlock(&pool->sp_lock); 1264} 1265 1266bool_t 1267svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1268{ 1269 struct mbuf *m; 1270 XDR xdrs; 1271 bool_t stat; 1272 1273 m = rqstp->rq_args; 1274 rqstp->rq_args = NULL; 1275 1276 xdrmbuf_create(&xdrs, m, XDR_DECODE); 1277 stat = xargs(&xdrs, args); 1278 XDR_DESTROY(&xdrs); 1279 1280 return (stat); 1281} 1282 1283bool_t 1284svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1285{ 1286 XDR xdrs; 1287 1288 if (rqstp->rq_addr) { 1289 free(rqstp->rq_addr, M_SONAME); 1290 rqstp->rq_addr = NULL; 1291 } 1292 1293 xdrs.x_op = XDR_FREE; 1294 return (xargs(&xdrs, args)); 1295} 1296 1297void 1298svc_freereq(struct svc_req *rqstp) 1299{ 1300 SVCTHREAD *st; 1301 SVCXPRT *xprt; 1302 SVCPOOL *pool; 1303 1304 st = rqstp->rq_thread; 1305 xprt = rqstp->rq_xprt; 1306 if (xprt) 1307 pool = xprt->xp_pool; 1308 else 1309 pool = NULL; 1310 if (st) { 1311 mtx_lock(&pool->sp_lock); 1312 KASSERT(rqstp == STAILQ_FIRST(&st->st_reqs), 1313 ("Freeing request out of order")); 1314 STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link); 1315 st->st_reqcount--; 1316 if (pool->sp_done) 1317 pool->sp_done(st, rqstp); 1318 mtx_unlock(&pool->sp_lock); 1319 } 1320 1321 if (rqstp->rq_auth.svc_ah_ops) 1322 SVCAUTH_RELEASE(&rqstp->rq_auth); 1323 1324 if (rqstp->rq_xprt) { 1325 SVC_RELEASE(rqstp->rq_xprt); 1326 } 1327 1328 if (rqstp->rq_addr) 1329 free(rqstp->rq_addr, M_SONAME); 1330 1331 if (rqstp->rq_args) 1332 m_freem(rqstp->rq_args); 1333 1334 free(rqstp, M_RPC); 1335} 1336