1/*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/
2
3/*-
4 * SPDX-License-Identifier: BSD-3-Clause
5 *
6 * Copyright (c) 2009, Sun Microsystems, Inc.
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are met:
11 * - Redistributions of source code must retain the above copyright notice,
12 *   this list of conditions and the following disclaimer.
13 * - Redistributions in binary form must reproduce the above copyright notice,
14 *   this list of conditions and the following disclaimer in the documentation
15 *   and/or other materials provided with the distribution.
16 * - Neither the name of Sun Microsystems, Inc. nor the names of its
17 *   contributors may be used to endorse or promote products derived
18 *   from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 * POSSIBILITY OF SUCH DAMAGE.
31 */
32
33#if defined(LIBC_SCCS) && !defined(lint)
34static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
35static char *sccsid = "@(#)svc.c	2.4 88/08/11 4.0 RPCSRC";
36#endif
37#include <sys/cdefs.h>
38__FBSDID("$FreeBSD$");
39
40/*
41 * svc.c, Server-side remote procedure call interface.
42 *
 * There are two sets of procedures here.  The xprt routines manage
 * transport handles.  The svc routines manage the list of registered
 * service callouts.
46 *
47 * Copyright (C) 1984, Sun Microsystems, Inc.
48 */
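
/*
 * A minimal usage sketch of this interface (illustrative only; MYPROG,
 * MYVERS, myprog_dispatch and the svc_vc_create() transport back end are
 * assumptions, not defined in this file):
 *
 *	SVCPOOL *pool = svcpool_create("myprog", NULL);
 *	SVCXPRT *xprt = svc_vc_create(pool, so, 0, 0);
 *	svc_reg(xprt, MYPROG, MYVERS, myprog_dispatch, NULL);
 *	svc_run(pool);			 (returns after svc_exit(pool))
 *	svcpool_destroy(pool);
 *
 * Passing NULL for the netconfig skips registration with the local
 * rpcbind service.
 */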
49
50#include <sys/param.h>
51#include <sys/lock.h>
52#include <sys/kernel.h>
53#include <sys/kthread.h>
54#include <sys/malloc.h>
55#include <sys/mbuf.h>
56#include <sys/mutex.h>
57#include <sys/proc.h>
58#include <sys/queue.h>
59#include <sys/socketvar.h>
60#include <sys/systm.h>
61#include <sys/smp.h>
62#include <sys/sx.h>
63#include <sys/ucred.h>
64
65#include <rpc/rpc.h>
66#include <rpc/rpcb_clnt.h>
67#include <rpc/replay.h>
68
69#include <rpc/rpc_com.h>
70
71#define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
72#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
73
74static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
75    char *);
76static void svc_new_thread(SVCGROUP *grp);
77static void xprt_unregister_locked(SVCXPRT *xprt);
78static void svc_change_space_used(SVCPOOL *pool, long delta);
79static bool_t svc_request_space_available(SVCPOOL *pool);
80static void svcpool_cleanup(SVCPOOL *pool);
81
82/* ***************  SVCXPRT related stuff **************** */
83
84static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
85static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
86static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
87
88SVCPOOL*
89svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
90{
91	SVCPOOL *pool;
92	SVCGROUP *grp;
93	int g;
94
95	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
96
97	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
98	pool->sp_name = name;
99	pool->sp_state = SVCPOOL_INIT;
100	pool->sp_proc = NULL;
101	TAILQ_INIT(&pool->sp_callouts);
102	TAILQ_INIT(&pool->sp_lcallouts);
103	pool->sp_minthreads = 1;
104	pool->sp_maxthreads = 1;
105	pool->sp_groupcount = 1;
106	for (g = 0; g < SVC_MAXGROUPS; g++) {
107		grp = &pool->sp_groups[g];
108		mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
109		grp->sg_pool = pool;
110		grp->sg_state = SVCPOOL_ACTIVE;
111		TAILQ_INIT(&grp->sg_xlist);
112		TAILQ_INIT(&grp->sg_active);
113		LIST_INIT(&grp->sg_idlethreads);
114		grp->sg_minthreads = 1;
115		grp->sg_maxthreads = 1;
116	}
117
118	/*
119	 * Don't use more than a quarter of mbuf clusters.  Nota bene:
120	 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
121	 * on LP64 architectures, so cast to u_long to avoid undefined
122	 * behavior.  (ILP32 architectures cannot have nmbclusters
123	 * large enough to overflow for other reasons.)
124	 */
125	pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
126	pool->sp_space_low = (pool->sp_space_high / 3) * 2;
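	/*
	 * Worked example (illustrative numbers only): with nmbclusters =
	 * 1048576 and MCLBYTES = 2048 the cluster pool spans 2 GiB, so
	 * sp_space_high becomes 512 MiB and sp_space_low two thirds of
	 * that, about 341 MiB.  Request intake throttles once usage
	 * crosses the high mark and resumes below the low mark (see
	 * svc_change_space_used()).
	 */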
127
128	sysctl_ctx_init(&pool->sp_sysctl);
129	if (sysctl_base) {
130		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
131		    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
132		    pool, 0, svcpool_minthread_sysctl, "I",
133		    "Minimal number of threads");
134		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
135		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
136		    pool, 0, svcpool_maxthread_sysctl, "I",
137		    "Maximal number of threads");
138		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
139		    "threads", CTLTYPE_INT | CTLFLAG_RD,
140		    pool, 0, svcpool_threads_sysctl, "I",
141		    "Current number of threads");
142		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
143		    "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
144		    "Number of thread groups");
145
146		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
147		    "request_space_used", CTLFLAG_RD,
148		    &pool->sp_space_used,
149		    "Space in parsed but not handled requests.");
150
151		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
152		    "request_space_used_highest", CTLFLAG_RD,
153		    &pool->sp_space_used_highest,
154		    "Highest space used since reboot.");
155
156		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
157		    "request_space_high", CTLFLAG_RW,
158		    &pool->sp_space_high,
159		    "Maximum space in parsed but not handled requests.");
160
161		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
162		    "request_space_low", CTLFLAG_RW,
163		    &pool->sp_space_low,
164		    "Low water mark for request space.");
165
166		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
167		    "request_space_throttled", CTLFLAG_RD,
168		    &pool->sp_space_throttled, 0,
169		    "Whether nfs requests are currently throttled");
170
171		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
172		    "request_space_throttle_count", CTLFLAG_RD,
173		    &pool->sp_space_throttle_count, 0,
174		    "Count of times throttling based on request space has occurred");
175	}
176
177	return pool;
178}
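
/*
 * The sysctl nodes above are created under whatever tree the caller passes
 * as sysctl_base.  For instance, the NFS server hangs its pool off
 * vfs.nfsd, so (assuming that consumer) the knobs can be inspected and
 * tuned from userland roughly like this:
 *
 *	sysctl vfs.nfsd.threads
 *	sysctl vfs.nfsd.minthreads=8 vfs.nfsd.maxthreads=64
 *	sysctl vfs.nfsd.request_space_used_highest
 */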
179
180/*
181 * Code common to svcpool_destroy() and svcpool_close(), which cleans up
182 * the pool data structures.
183 */
184static void
185svcpool_cleanup(SVCPOOL *pool)
186{
187	SVCGROUP *grp;
188	SVCXPRT *xprt, *nxprt;
189	struct svc_callout *s;
190	struct svc_loss_callout *sl;
191	struct svcxprt_list cleanup;
192	int g;
193
194	TAILQ_INIT(&cleanup);
195
196	for (g = 0; g < SVC_MAXGROUPS; g++) {
197		grp = &pool->sp_groups[g];
198		mtx_lock(&grp->sg_lock);
199		while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
200			xprt_unregister_locked(xprt);
201			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
202		}
203		mtx_unlock(&grp->sg_lock);
204	}
205	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
206		if (xprt->xp_socket != NULL)
207			soshutdown(xprt->xp_socket, SHUT_WR);
208		SVC_RELEASE(xprt);
209	}
210
211	mtx_lock(&pool->sp_lock);
212	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
213		mtx_unlock(&pool->sp_lock);
214		svc_unreg(pool, s->sc_prog, s->sc_vers);
215		mtx_lock(&pool->sp_lock);
216	}
217	while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
218		mtx_unlock(&pool->sp_lock);
219		svc_loss_unreg(pool, sl->slc_dispatch);
220		mtx_lock(&pool->sp_lock);
221	}
222	mtx_unlock(&pool->sp_lock);
223}
224
225void
226svcpool_destroy(SVCPOOL *pool)
227{
228	SVCGROUP *grp;
229	int g;
230
231	svcpool_cleanup(pool);
232
233	for (g = 0; g < SVC_MAXGROUPS; g++) {
234		grp = &pool->sp_groups[g];
235		mtx_destroy(&grp->sg_lock);
236	}
237	mtx_destroy(&pool->sp_lock);
238
239	if (pool->sp_rcache)
240		replay_freecache(pool->sp_rcache);
241
242	sysctl_ctx_free(&pool->sp_sysctl);
243	free(pool, M_RPC);
244}
245
246/*
247 * Similar to svcpool_destroy(), except that it does not destroy the actual
248 * data structures.  As such, "pool" may be used again.
249 */
250void
251svcpool_close(SVCPOOL *pool)
252{
253	SVCGROUP *grp;
254	int g;
255
256	svcpool_cleanup(pool);
257
258	/* Now, initialize the pool's state for a fresh svc_run() call. */
259	mtx_lock(&pool->sp_lock);
260	pool->sp_state = SVCPOOL_INIT;
261	mtx_unlock(&pool->sp_lock);
262	for (g = 0; g < SVC_MAXGROUPS; g++) {
263		grp = &pool->sp_groups[g];
264		mtx_lock(&grp->sg_lock);
265		grp->sg_state = SVCPOOL_ACTIVE;
266		mtx_unlock(&grp->sg_lock);
267	}
268}
269
270/*
271 * Sysctl handler to get the present thread count on a pool
272 */
273static int
274svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
275{
276	SVCPOOL *pool;
277	int threads, error, g;
278
279	pool = oidp->oid_arg1;
280	threads = 0;
281	mtx_lock(&pool->sp_lock);
282	for (g = 0; g < pool->sp_groupcount; g++)
283		threads += pool->sp_groups[g].sg_threadcount;
284	mtx_unlock(&pool->sp_lock);
285	error = sysctl_handle_int(oidp, &threads, 0, req);
286	return (error);
287}
288
289/*
290 * Sysctl handler to set the minimum thread count on a pool
291 */
292static int
293svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
294{
295	SVCPOOL *pool;
296	int newminthreads, error, g;
297
298	pool = oidp->oid_arg1;
299	newminthreads = pool->sp_minthreads;
300	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
301	if (error == 0 && newminthreads != pool->sp_minthreads) {
302		if (newminthreads > pool->sp_maxthreads)
303			return (EINVAL);
304		mtx_lock(&pool->sp_lock);
305		pool->sp_minthreads = newminthreads;
306		for (g = 0; g < pool->sp_groupcount; g++) {
307			pool->sp_groups[g].sg_minthreads = max(1,
308			    pool->sp_minthreads / pool->sp_groupcount);
309		}
310		mtx_unlock(&pool->sp_lock);
311	}
312	return (error);
313}
314
315/*
316 * Sysctl handler to set the maximum thread count on a pool
317 */
318static int
319svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
320{
321	SVCPOOL *pool;
322	int newmaxthreads, error, g;
323
324	pool = oidp->oid_arg1;
325	newmaxthreads = pool->sp_maxthreads;
326	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
327	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
328		if (newmaxthreads < pool->sp_minthreads)
329			return (EINVAL);
330		mtx_lock(&pool->sp_lock);
331		pool->sp_maxthreads = newmaxthreads;
332		for (g = 0; g < pool->sp_groupcount; g++) {
333			pool->sp_groups[g].sg_maxthreads = max(1,
334			    pool->sp_maxthreads / pool->sp_groupcount);
335		}
336		mtx_unlock(&pool->sp_lock);
337	}
338	return (error);
339}
340
341/*
342 * Activate a transport handle.
343 */
344void
345xprt_register(SVCXPRT *xprt)
346{
347	SVCPOOL *pool = xprt->xp_pool;
348	SVCGROUP *grp;
349	int g;
350
351	SVC_ACQUIRE(xprt);
352	g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
353	xprt->xp_group = grp = &pool->sp_groups[g];
354	mtx_lock(&grp->sg_lock);
355	xprt->xp_registered = TRUE;
356	xprt->xp_active = FALSE;
357	TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
358	mtx_unlock(&grp->sg_lock);
359}
360
/*
 * De-activate a transport handle.  Note: the locked version doesn't
 * release the transport; the caller must do that after dropping the
 * group lock.
 */
366static void
367xprt_unregister_locked(SVCXPRT *xprt)
368{
369	SVCGROUP *grp = xprt->xp_group;
370
371	mtx_assert(&grp->sg_lock, MA_OWNED);
372	KASSERT(xprt->xp_registered == TRUE,
373	    ("xprt_unregister_locked: not registered"));
374	xprt_inactive_locked(xprt);
375	TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
376	xprt->xp_registered = FALSE;
377}
378
379void
380xprt_unregister(SVCXPRT *xprt)
381{
382	SVCGROUP *grp = xprt->xp_group;
383
384	mtx_lock(&grp->sg_lock);
385	if (xprt->xp_registered == FALSE) {
386		/* Already unregistered by another thread */
387		mtx_unlock(&grp->sg_lock);
388		return;
389	}
390	xprt_unregister_locked(xprt);
391	mtx_unlock(&grp->sg_lock);
392
393	if (xprt->xp_socket != NULL)
394		soshutdown(xprt->xp_socket, SHUT_WR);
395	SVC_RELEASE(xprt);
396}
397
398/*
399 * Attempt to assign a service thread to this transport.
400 */
401static int
402xprt_assignthread(SVCXPRT *xprt)
403{
404	SVCGROUP *grp = xprt->xp_group;
405	SVCTHREAD *st;
406
407	mtx_assert(&grp->sg_lock, MA_OWNED);
408	st = LIST_FIRST(&grp->sg_idlethreads);
409	if (st) {
410		LIST_REMOVE(st, st_ilink);
411		SVC_ACQUIRE(xprt);
412		xprt->xp_thread = st;
413		st->st_xprt = xprt;
414		cv_signal(&st->st_cond);
415		return (TRUE);
416	} else {
417		/*
418		 * See if we can create a new thread. The
419		 * actual thread creation happens in
420		 * svc_run_internal because our locking state
421		 * is poorly defined (we are typically called
422		 * from a socket upcall). Don't create more
423		 * than one thread per second.
424		 */
425		if (grp->sg_state == SVCPOOL_ACTIVE
426		    && grp->sg_lastcreatetime < time_uptime
427		    && grp->sg_threadcount < grp->sg_maxthreads) {
428			grp->sg_state = SVCPOOL_THREADWANTED;
429		}
430	}
431	return (FALSE);
432}
433
434void
435xprt_active(SVCXPRT *xprt)
436{
437	SVCGROUP *grp = xprt->xp_group;
438
439	mtx_lock(&grp->sg_lock);
440
441	if (!xprt->xp_registered) {
442		/*
443		 * Race with xprt_unregister - we lose.
444		 */
445		mtx_unlock(&grp->sg_lock);
446		return;
447	}
448
449	if (!xprt->xp_active) {
450		xprt->xp_active = TRUE;
451		if (xprt->xp_thread == NULL) {
452			if (!svc_request_space_available(xprt->xp_pool) ||
453			    !xprt_assignthread(xprt))
454				TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
455				    xp_alink);
456		}
457	}
458
459	mtx_unlock(&grp->sg_lock);
460}
461
462void
463xprt_inactive_locked(SVCXPRT *xprt)
464{
465	SVCGROUP *grp = xprt->xp_group;
466
467	mtx_assert(&grp->sg_lock, MA_OWNED);
468	if (xprt->xp_active) {
469		if (xprt->xp_thread == NULL)
470			TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
471		xprt->xp_active = FALSE;
472	}
473}
474
475void
476xprt_inactive(SVCXPRT *xprt)
477{
478	SVCGROUP *grp = xprt->xp_group;
479
480	mtx_lock(&grp->sg_lock);
481	xprt_inactive_locked(xprt);
482	mtx_unlock(&grp->sg_lock);
483}
484
/*
 * Variant of xprt_inactive() for use only when the transport is known
 * to be assigned to a thread, for example from within receive handlers.
 */
489void
490xprt_inactive_self(SVCXPRT *xprt)
491{
492
493	KASSERT(xprt->xp_thread != NULL,
494	    ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
495	xprt->xp_active = FALSE;
496}
497
/*
 * Add a service program to the callout list.
 * The dispatch routine will be called when an RPC request for this
 * program number comes in.
 */
503bool_t
504svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
505    void (*dispatch)(struct svc_req *, SVCXPRT *),
506    const struct netconfig *nconf)
507{
508	SVCPOOL *pool = xprt->xp_pool;
509	struct svc_callout *s;
510	char *netid = NULL;
511	int flag = 0;
512
/* VARIABLES PROTECTED BY pool->sp_lock: s, sp_callouts */
514
515	if (xprt->xp_netid) {
516		netid = strdup(xprt->xp_netid, M_RPC);
517		flag = 1;
518	} else if (nconf && nconf->nc_netid) {
519		netid = strdup(nconf->nc_netid, M_RPC);
520		flag = 1;
521	} /* must have been created with svc_raw_create */
522	if ((netid == NULL) && (flag == 1)) {
523		return (FALSE);
524	}
525
526	mtx_lock(&pool->sp_lock);
527	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
528		if (netid)
529			free(netid, M_RPC);
530		if (s->sc_dispatch == dispatch)
			goto rpcb_it; /* the caller is registering another xprt */
532		mtx_unlock(&pool->sp_lock);
533		return (FALSE);
534	}
535	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
536	if (s == NULL) {
537		if (netid)
538			free(netid, M_RPC);
539		mtx_unlock(&pool->sp_lock);
540		return (FALSE);
541	}
542
543	s->sc_prog = prog;
544	s->sc_vers = vers;
545	s->sc_dispatch = dispatch;
546	s->sc_netid = netid;
547	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
548
549	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
550		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
551
552rpcb_it:
553	mtx_unlock(&pool->sp_lock);
554	/* now register the information with the local binder service */
555	if (nconf) {
556		bool_t dummy;
557		struct netconfig tnc;
558		struct netbuf nb;
559		tnc = *nconf;
560		nb.buf = &xprt->xp_ltaddr;
561		nb.len = xprt->xp_ltaddr.ss_len;
562		dummy = rpcb_set(prog, vers, &tnc, &nb);
563		return (dummy);
564	}
565	return (TRUE);
566}
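
/*
 * A dispatch routine registered through svc_reg() owns the request it is
 * handed and must release it with svc_freereq().  A minimal sketch of one
 * (MYPROC, the xdr_my_* routines and do_work() are assumptions, not part
 * of this file):
 *
 *	static void
 *	myprog_dispatch(struct svc_req *rqstp, SVCXPRT *xprt)
 *	{
 *		my_args args;
 *		my_result res;
 *
 *		memset(&args, 0, sizeof(args));
 *		if (rqstp->rq_proc != MYPROC)
 *			svcerr_noproc(rqstp);
 *		else if (!svc_getargs(rqstp, (xdrproc_t)xdr_my_args, &args))
 *			svcerr_decode(rqstp);
 *		else {
 *			do_work(&args, &res);
 *			svc_sendreply(rqstp, (xdrproc_t)xdr_my_result, &res);
 *			svc_freeargs(rqstp, (xdrproc_t)xdr_my_args, &args);
 *		}
 *		svc_freereq(rqstp);	 (the dispatch routine owns rqstp)
 *	}
 */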
567
568/*
569 * Remove a service program from the callout list.
570 */
571void
572svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
573{
574	struct svc_callout *s;
575
576	/* unregister the information anyway */
577	(void) rpcb_unset(prog, vers, NULL);
578	mtx_lock(&pool->sp_lock);
579	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
580		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
581		if (s->sc_netid)
582			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
583		mem_free(s, sizeof (struct svc_callout));
584	}
585	mtx_unlock(&pool->sp_lock);
586}
587
/*
 * Add a service connection loss program to the callout list.
 * The dispatch routine will be called when a transport in this pool dies.
 */
592bool_t
593svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
594{
595	SVCPOOL *pool = xprt->xp_pool;
596	struct svc_loss_callout *s;
597
598	mtx_lock(&pool->sp_lock);
599	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
600		if (s->slc_dispatch == dispatch)
601			break;
602	}
603	if (s != NULL) {
604		mtx_unlock(&pool->sp_lock);
605		return (TRUE);
606	}
607	s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
608	if (s == NULL) {
609		mtx_unlock(&pool->sp_lock);
610		return (FALSE);
611	}
612	s->slc_dispatch = dispatch;
613	TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
614	mtx_unlock(&pool->sp_lock);
615	return (TRUE);
616}
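
/*
 * A connection-loss callout is just a function taking the dying transport;
 * a consumer might use it to drop per-connection state (for example, an
 * NFSv4.1 server could release backchannel resources here).  A trivial,
 * hypothetical sketch:
 *
 *	static void
 *	myprog_loss(SVCXPRT *xprt)
 *	{
 *		(release any state keyed on this transport)
 *	}
 *
 *	svc_loss_reg(xprt, myprog_loss);
 */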
617
618/*
619 * Remove a service connection loss program from the callout list.
620 */
621void
622svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
623{
624	struct svc_loss_callout *s;
625
626	mtx_lock(&pool->sp_lock);
627	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
628		if (s->slc_dispatch == dispatch) {
629			TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
630			free(s, M_RPC);
631			break;
632		}
633	}
634	mtx_unlock(&pool->sp_lock);
635}
636
637/* ********************** CALLOUT list related stuff ************* */
638
639/*
640 * Search the callout list for a program number, return the callout
641 * struct.
642 */
643static struct svc_callout *
644svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
645{
646	struct svc_callout *s;
647
648	mtx_assert(&pool->sp_lock, MA_OWNED);
649	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
650		if (s->sc_prog == prog && s->sc_vers == vers
651		    && (netid == NULL || s->sc_netid == NULL ||
652			strcmp(netid, s->sc_netid) == 0))
653			break;
654	}
655
656	return (s);
657}
658
659/* ******************* REPLY GENERATION ROUTINES  ************ */
660
661static bool_t
662svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
663    struct mbuf *body)
664{
665	SVCXPRT *xprt = rqstp->rq_xprt;
666	bool_t ok;
667
668	if (rqstp->rq_args) {
669		m_freem(rqstp->rq_args);
670		rqstp->rq_args = NULL;
671	}
672
673	if (xprt->xp_pool->sp_rcache)
674		replay_setreply(xprt->xp_pool->sp_rcache,
675		    rply, svc_getrpccaller(rqstp), body);
676
677	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
678		return (FALSE);
679
680	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
681	if (rqstp->rq_addr) {
682		free(rqstp->rq_addr, M_SONAME);
683		rqstp->rq_addr = NULL;
684	}
685
686	return (ok);
687}
688
/*
 * Send a reply to an RPC request.
 */
692bool_t
693svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
694{
695	struct rpc_msg rply;
696	struct mbuf *m;
697	XDR xdrs;
698	bool_t ok;
699
700	rply.rm_xid = rqstp->rq_xid;
701	rply.rm_direction = REPLY;
702	rply.rm_reply.rp_stat = MSG_ACCEPTED;
703	rply.acpted_rply.ar_verf = rqstp->rq_verf;
704	rply.acpted_rply.ar_stat = SUCCESS;
705	rply.acpted_rply.ar_results.where = NULL;
706	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
707
708	m = m_getcl(M_WAITOK, MT_DATA, 0);
709	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
710	ok = xdr_results(&xdrs, xdr_location);
711	XDR_DESTROY(&xdrs);
712
713	if (ok) {
714		return (svc_sendreply_common(rqstp, &rply, m));
715	} else {
716		m_freem(m);
717		return (FALSE);
718	}
719}
720
721bool_t
722svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
723{
724	struct rpc_msg rply;
725
726	rply.rm_xid = rqstp->rq_xid;
727	rply.rm_direction = REPLY;
728	rply.rm_reply.rp_stat = MSG_ACCEPTED;
729	rply.acpted_rply.ar_verf = rqstp->rq_verf;
730	rply.acpted_rply.ar_stat = SUCCESS;
731	rply.acpted_rply.ar_results.where = NULL;
732	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
733
734	return (svc_sendreply_common(rqstp, &rply, m));
735}
736
737/*
738 * No procedure error reply
739 */
740void
741svcerr_noproc(struct svc_req *rqstp)
742{
743	SVCXPRT *xprt = rqstp->rq_xprt;
744	struct rpc_msg rply;
745
746	rply.rm_xid = rqstp->rq_xid;
747	rply.rm_direction = REPLY;
748	rply.rm_reply.rp_stat = MSG_ACCEPTED;
749	rply.acpted_rply.ar_verf = rqstp->rq_verf;
750	rply.acpted_rply.ar_stat = PROC_UNAVAIL;
751
752	if (xprt->xp_pool->sp_rcache)
753		replay_setreply(xprt->xp_pool->sp_rcache,
754		    &rply, svc_getrpccaller(rqstp), NULL);
755
756	svc_sendreply_common(rqstp, &rply, NULL);
757}
758
759/*
760 * Can't decode args error reply
761 */
762void
763svcerr_decode(struct svc_req *rqstp)
764{
765	SVCXPRT *xprt = rqstp->rq_xprt;
766	struct rpc_msg rply;
767
768	rply.rm_xid = rqstp->rq_xid;
769	rply.rm_direction = REPLY;
770	rply.rm_reply.rp_stat = MSG_ACCEPTED;
771	rply.acpted_rply.ar_verf = rqstp->rq_verf;
772	rply.acpted_rply.ar_stat = GARBAGE_ARGS;
773
774	if (xprt->xp_pool->sp_rcache)
775		replay_setreply(xprt->xp_pool->sp_rcache,
776		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
777
778	svc_sendreply_common(rqstp, &rply, NULL);
779}
780
781/*
782 * Some system error
783 */
784void
785svcerr_systemerr(struct svc_req *rqstp)
786{
787	SVCXPRT *xprt = rqstp->rq_xprt;
788	struct rpc_msg rply;
789
790	rply.rm_xid = rqstp->rq_xid;
791	rply.rm_direction = REPLY;
792	rply.rm_reply.rp_stat = MSG_ACCEPTED;
793	rply.acpted_rply.ar_verf = rqstp->rq_verf;
794	rply.acpted_rply.ar_stat = SYSTEM_ERR;
795
796	if (xprt->xp_pool->sp_rcache)
797		replay_setreply(xprt->xp_pool->sp_rcache,
798		    &rply, svc_getrpccaller(rqstp), NULL);
799
800	svc_sendreply_common(rqstp, &rply, NULL);
801}
802
803/*
804 * Authentication error reply
805 */
806void
807svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
808{
809	SVCXPRT *xprt = rqstp->rq_xprt;
810	struct rpc_msg rply;
811
812	rply.rm_xid = rqstp->rq_xid;
813	rply.rm_direction = REPLY;
814	rply.rm_reply.rp_stat = MSG_DENIED;
815	rply.rjcted_rply.rj_stat = AUTH_ERROR;
816	rply.rjcted_rply.rj_why = why;
817
818	if (xprt->xp_pool->sp_rcache)
819		replay_setreply(xprt->xp_pool->sp_rcache,
820		    &rply, svc_getrpccaller(rqstp), NULL);
821
822	svc_sendreply_common(rqstp, &rply, NULL);
823}
824
825/*
826 * Auth too weak error reply
827 */
828void
829svcerr_weakauth(struct svc_req *rqstp)
830{
831
832	svcerr_auth(rqstp, AUTH_TOOWEAK);
833}
834
835/*
836 * Program unavailable error reply
837 */
838void
839svcerr_noprog(struct svc_req *rqstp)
840{
841	SVCXPRT *xprt = rqstp->rq_xprt;
842	struct rpc_msg rply;
843
844	rply.rm_xid = rqstp->rq_xid;
845	rply.rm_direction = REPLY;
846	rply.rm_reply.rp_stat = MSG_ACCEPTED;
847	rply.acpted_rply.ar_verf = rqstp->rq_verf;
848	rply.acpted_rply.ar_stat = PROG_UNAVAIL;
849
850	if (xprt->xp_pool->sp_rcache)
851		replay_setreply(xprt->xp_pool->sp_rcache,
852		    &rply, svc_getrpccaller(rqstp), NULL);
853
854	svc_sendreply_common(rqstp, &rply, NULL);
855}
856
857/*
858 * Program version mismatch error reply
859 */
860void
861svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
862{
863	SVCXPRT *xprt = rqstp->rq_xprt;
864	struct rpc_msg rply;
865
866	rply.rm_xid = rqstp->rq_xid;
867	rply.rm_direction = REPLY;
868	rply.rm_reply.rp_stat = MSG_ACCEPTED;
869	rply.acpted_rply.ar_verf = rqstp->rq_verf;
870	rply.acpted_rply.ar_stat = PROG_MISMATCH;
871	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
872	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
873
874	if (xprt->xp_pool->sp_rcache)
875		replay_setreply(xprt->xp_pool->sp_rcache,
876		    &rply, svc_getrpccaller(rqstp), NULL);
877
878	svc_sendreply_common(rqstp, &rply, NULL);
879}
880
881/*
882 * Allocate a new server transport structure. All fields are
883 * initialized to zero and xp_p3 is initialized to point at an
884 * extension structure to hold various flags and authentication
885 * parameters.
886 */
887SVCXPRT *
888svc_xprt_alloc(void)
889{
890	SVCXPRT *xprt;
891	SVCXPRT_EXT *ext;
892
893	xprt = mem_alloc(sizeof(SVCXPRT));
894	ext = mem_alloc(sizeof(SVCXPRT_EXT));
895	xprt->xp_p3 = ext;
896	refcount_init(&xprt->xp_refs, 1);
897
898	return (xprt);
899}
900
901/*
902 * Free a server transport structure.
903 */
904void
905svc_xprt_free(SVCXPRT *xprt)
906{
907
908	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
909	mem_free(xprt, sizeof(SVCXPRT));
910}
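
/*
 * Transport back ends pair these helpers with xprt_register() and
 * xprt_unregister().  A rough outline of what a back end does when a new
 * connection arrives (simplified, not verbatim from any back end):
 *
 *	SVCXPRT *xprt = svc_xprt_alloc();
 *	sx_init(&xprt->xp_lock, "xprt->xp_lock");
 *	xprt->xp_pool = pool;
 *	xprt->xp_socket = so;
 *	xprt->xp_p1 = private_data;
 *	xprt_register(xprt);		 (makes it visible to the pool)
 */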
911
912/* ******************* SERVER INPUT STUFF ******************* */
913
/*
 * Read RPC requests from a transport and queue them to be
 * executed.  We handle authentication and replay cache replies here.
 * Actually dispatching the RPC is deferred until svc_executereq().
 */
919static enum xprt_stat
920svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
921{
922	SVCPOOL *pool = xprt->xp_pool;
923	struct svc_req *r;
924	struct rpc_msg msg;
925	struct mbuf *args;
926	struct svc_loss_callout *s;
927	enum xprt_stat stat;
928
	/* Now receive messages from the transport (supports batched calls). */
930	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
931
932	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
933	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
934	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
935	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
936		enum auth_stat why;
937
938		/*
939		 * Handle replays and authenticate before queuing the
940		 * request to be executed.
941		 */
942		SVC_ACQUIRE(xprt);
943		r->rq_xprt = xprt;
944		if (pool->sp_rcache) {
945			struct rpc_msg repmsg;
946			struct mbuf *repbody;
947			enum replay_state rs;
948			rs = replay_find(pool->sp_rcache, &msg,
949			    svc_getrpccaller(r), &repmsg, &repbody);
950			switch (rs) {
951			case RS_NEW:
952				break;
953			case RS_DONE:
954				SVC_REPLY(xprt, &repmsg, r->rq_addr,
955				    repbody, &r->rq_reply_seq);
956				if (r->rq_addr) {
957					free(r->rq_addr, M_SONAME);
958					r->rq_addr = NULL;
959				}
960				m_freem(args);
961				goto call_done;
962
963			default:
964				m_freem(args);
965				goto call_done;
966			}
967		}
968
969		r->rq_xid = msg.rm_xid;
970		r->rq_prog = msg.rm_call.cb_prog;
971		r->rq_vers = msg.rm_call.cb_vers;
972		r->rq_proc = msg.rm_call.cb_proc;
973		r->rq_size = sizeof(*r) + m_length(args, NULL);
974		r->rq_args = args;
975		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
976			/*
977			 * RPCSEC_GSS uses this return code
978			 * for requests that form part of its
979			 * context establishment protocol and
980			 * should not be dispatched to the
981			 * application.
982			 */
983			if (why != RPCSEC_GSS_NODISPATCH)
984				svcerr_auth(r, why);
985			goto call_done;
986		}
987
988		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
989			svcerr_decode(r);
990			goto call_done;
991		}
992
993		/*
994		 * Everything checks out, return request to caller.
995		 */
996		*rqstp_ret = r;
997		r = NULL;
998	}
999call_done:
1000	if (r) {
1001		svc_freereq(r);
1002		r = NULL;
1003	}
1004	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
1005		TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
1006			(*s->slc_dispatch)(xprt);
1007		xprt_unregister(xprt);
1008	}
1009
1010	return (stat);
1011}
1012
1013static void
1014svc_executereq(struct svc_req *rqstp)
1015{
1016	SVCXPRT *xprt = rqstp->rq_xprt;
1017	SVCPOOL *pool = xprt->xp_pool;
1018	int prog_found;
1019	rpcvers_t low_vers;
1020	rpcvers_t high_vers;
1021	struct svc_callout *s;
1022
	/* Now match the message with a registered service. */
1024	prog_found = FALSE;
1025	low_vers = (rpcvers_t) -1L;
1026	high_vers = (rpcvers_t) 0L;
1027	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
1028		if (s->sc_prog == rqstp->rq_prog) {
1029			if (s->sc_vers == rqstp->rq_vers) {
				/*
				 * We hand ownership of rqstp to the
				 * dispatch routine; it must call
				 * svc_freereq().
				 */
1035				(*s->sc_dispatch)(rqstp, xprt);
1036				return;
1037			}  /* found correct version */
1038			prog_found = TRUE;
1039			if (s->sc_vers < low_vers)
1040				low_vers = s->sc_vers;
1041			if (s->sc_vers > high_vers)
1042				high_vers = s->sc_vers;
1043		}   /* found correct program */
1044	}
1045
	/*
	 * If we got here, the program or version is not served.
	 */
1050	if (prog_found)
1051		svcerr_progvers(rqstp, low_vers, high_vers);
1052	else
1053		svcerr_noprog(rqstp);
1054
1055	svc_freereq(rqstp);
1056}
1057
1058static void
1059svc_checkidle(SVCGROUP *grp)
1060{
1061	SVCXPRT *xprt, *nxprt;
1062	time_t timo;
1063	struct svcxprt_list cleanup;
1064
1065	TAILQ_INIT(&cleanup);
1066	TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1067		/*
1068		 * Only some transports have idle timers. Don't time
1069		 * something out which is just waking up.
1070		 */
1071		if (!xprt->xp_idletimeout || xprt->xp_thread)
1072			continue;
1073
1074		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1075		if (time_uptime > timo) {
1076			xprt_unregister_locked(xprt);
1077			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1078		}
1079	}
1080
1081	mtx_unlock(&grp->sg_lock);
1082	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1083		soshutdown(xprt->xp_socket, SHUT_WR);
1084		SVC_RELEASE(xprt);
1085	}
1086	mtx_lock(&grp->sg_lock);
1087}
1088
1089static void
1090svc_assign_waiting_sockets(SVCPOOL *pool)
1091{
1092	SVCGROUP *grp;
1093	SVCXPRT *xprt;
1094	int g;
1095
1096	for (g = 0; g < pool->sp_groupcount; g++) {
1097		grp = &pool->sp_groups[g];
1098		mtx_lock(&grp->sg_lock);
1099		while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1100			if (xprt_assignthread(xprt))
1101				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1102			else
1103				break;
1104		}
1105		mtx_unlock(&grp->sg_lock);
1106	}
1107}
1108
1109static void
1110svc_change_space_used(SVCPOOL *pool, long delta)
1111{
1112	unsigned long value;
1113
1114	value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
1115	if (delta > 0) {
1116		if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1117			pool->sp_space_throttled = TRUE;
1118			pool->sp_space_throttle_count++;
1119		}
1120		if (value > pool->sp_space_used_highest)
1121			pool->sp_space_used_highest = value;
1122	} else {
1123		if (value < pool->sp_space_low && pool->sp_space_throttled) {
1124			pool->sp_space_throttled = FALSE;
1125			svc_assign_waiting_sockets(pool);
1126		}
1127	}
1128}
1129
1130static bool_t
1131svc_request_space_available(SVCPOOL *pool)
1132{
1133
1134	if (pool->sp_space_throttled)
1135		return (FALSE);
1136	return (TRUE);
1137}
1138
1139static void
1140svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1141{
1142	SVCPOOL *pool = grp->sg_pool;
1143	SVCTHREAD *st, *stpref;
1144	SVCXPRT *xprt;
1145	enum xprt_stat stat;
1146	struct svc_req *rqstp;
1147	struct proc *p;
1148	long sz;
1149	int error;
1150
1151	st = mem_alloc(sizeof(*st));
1152	mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1153	st->st_pool = pool;
1154	st->st_xprt = NULL;
1155	STAILQ_INIT(&st->st_reqs);
1156	cv_init(&st->st_cond, "rpcsvc");
1157
1158	mtx_lock(&grp->sg_lock);
1159
1160	/*
1161	 * If we are a new thread which was spawned to cope with
1162	 * increased load, set the state back to SVCPOOL_ACTIVE.
1163	 */
1164	if (grp->sg_state == SVCPOOL_THREADSTARTING)
1165		grp->sg_state = SVCPOOL_ACTIVE;
1166
1167	while (grp->sg_state != SVCPOOL_CLOSING) {
1168		/*
1169		 * Create new thread if requested.
1170		 */
1171		if (grp->sg_state == SVCPOOL_THREADWANTED) {
1172			grp->sg_state = SVCPOOL_THREADSTARTING;
1173			grp->sg_lastcreatetime = time_uptime;
1174			mtx_unlock(&grp->sg_lock);
1175			svc_new_thread(grp);
1176			mtx_lock(&grp->sg_lock);
1177			continue;
1178		}
1179
1180		/*
1181		 * Check for idle transports once per second.
1182		 */
1183		if (time_uptime > grp->sg_lastidlecheck) {
1184			grp->sg_lastidlecheck = time_uptime;
1185			svc_checkidle(grp);
1186		}
1187
1188		xprt = st->st_xprt;
1189		if (!xprt) {
1190			/*
1191			 * Enforce maxthreads count.
1192			 */
1193			if (!ismaster && grp->sg_threadcount >
1194			    grp->sg_maxthreads)
1195				break;
1196
1197			/*
1198			 * Before sleeping, see if we can find an
1199			 * active transport which isn't being serviced
1200			 * by a thread.
1201			 */
1202			if (svc_request_space_available(pool) &&
1203			    (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1204				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1205				SVC_ACQUIRE(xprt);
1206				xprt->xp_thread = st;
1207				st->st_xprt = xprt;
1208				continue;
1209			}
1210
1211			LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1212			if (ismaster || (!ismaster &&
1213			    grp->sg_threadcount > grp->sg_minthreads))
1214				error = cv_timedwait_sig(&st->st_cond,
1215				    &grp->sg_lock, 5 * hz);
1216			else
1217				error = cv_wait_sig(&st->st_cond,
1218				    &grp->sg_lock);
1219			if (st->st_xprt == NULL)
1220				LIST_REMOVE(st, st_ilink);
1221
1222			/*
1223			 * Reduce worker thread count when idle.
1224			 */
1225			if (error == EWOULDBLOCK) {
1226				if (!ismaster
1227				    && (grp->sg_threadcount
1228					> grp->sg_minthreads)
1229					&& !st->st_xprt)
1230					break;
1231			} else if (error != 0) {
1232				KASSERT(error == EINTR || error == ERESTART,
1233				    ("non-signal error %d", error));
1234				mtx_unlock(&grp->sg_lock);
1235				p = curproc;
1236				PROC_LOCK(p);
1237				if (P_SHOULDSTOP(p) ||
1238				    (p->p_flag & P_TOTAL_STOP) != 0) {
1239					thread_suspend_check(0);
1240					PROC_UNLOCK(p);
1241					mtx_lock(&grp->sg_lock);
1242				} else {
1243					PROC_UNLOCK(p);
1244					svc_exit(pool);
1245					mtx_lock(&grp->sg_lock);
1246					break;
1247				}
1248			}
1249			continue;
1250		}
1251		mtx_unlock(&grp->sg_lock);
1252
1253		/*
1254		 * Drain the transport socket and queue up any RPCs.
1255		 */
1256		xprt->xp_lastactive = time_uptime;
1257		do {
1258			if (!svc_request_space_available(pool))
1259				break;
1260			rqstp = NULL;
1261			stat = svc_getreq(xprt, &rqstp);
1262			if (rqstp) {
1263				svc_change_space_used(pool, rqstp->rq_size);
1264				/*
1265				 * See if the application has a preference
1266				 * for some other thread.
1267				 */
1268				if (pool->sp_assign) {
1269					stpref = pool->sp_assign(st, rqstp);
1270					rqstp->rq_thread = stpref;
1271					STAILQ_INSERT_TAIL(&stpref->st_reqs,
1272					    rqstp, rq_link);
1273					mtx_unlock(&stpref->st_lock);
1274					if (stpref != st)
1275						rqstp = NULL;
1276				} else {
1277					rqstp->rq_thread = st;
1278					STAILQ_INSERT_TAIL(&st->st_reqs,
1279					    rqstp, rq_link);
1280				}
1281			}
1282		} while (rqstp == NULL && stat == XPRT_MOREREQS
1283		    && grp->sg_state != SVCPOOL_CLOSING);
1284
1285		/*
1286		 * Move this transport to the end of the active list to
1287		 * ensure fairness when multiple transports are active.
1288		 * If this was the last queued request, svc_getreq will end
1289		 * up calling xprt_inactive to remove from the active list.
1290		 */
1291		mtx_lock(&grp->sg_lock);
1292		xprt->xp_thread = NULL;
1293		st->st_xprt = NULL;
1294		if (xprt->xp_active) {
1295			if (!svc_request_space_available(pool) ||
1296			    !xprt_assignthread(xprt))
1297				TAILQ_INSERT_TAIL(&grp->sg_active,
1298				    xprt, xp_alink);
1299		}
1300		mtx_unlock(&grp->sg_lock);
1301		SVC_RELEASE(xprt);
1302
1303		/*
1304		 * Execute what we have queued.
1305		 */
1306		mtx_lock(&st->st_lock);
1307		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1308			STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1309			mtx_unlock(&st->st_lock);
1310			sz = (long)rqstp->rq_size;
1311			svc_executereq(rqstp);
1312			svc_change_space_used(pool, -sz);
1313			mtx_lock(&st->st_lock);
1314		}
1315		mtx_unlock(&st->st_lock);
1316		mtx_lock(&grp->sg_lock);
1317	}
1318
1319	if (st->st_xprt) {
1320		xprt = st->st_xprt;
1321		st->st_xprt = NULL;
1322		SVC_RELEASE(xprt);
1323	}
1324	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1325	mtx_destroy(&st->st_lock);
1326	cv_destroy(&st->st_cond);
1327	mem_free(st, sizeof(*st));
1328
1329	grp->sg_threadcount--;
1330	if (!ismaster)
1331		wakeup(grp);
1332	mtx_unlock(&grp->sg_lock);
1333}
1334
1335static void
1336svc_thread_start(void *arg)
1337{
1338
1339	svc_run_internal((SVCGROUP *) arg, FALSE);
1340	kthread_exit();
1341}
1342
1343static void
1344svc_new_thread(SVCGROUP *grp)
1345{
1346	SVCPOOL *pool = grp->sg_pool;
1347	struct thread *td;
1348
1349	mtx_lock(&grp->sg_lock);
1350	grp->sg_threadcount++;
1351	mtx_unlock(&grp->sg_lock);
1352	kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1353	    "%s: service", pool->sp_name);
1354}
1355
1356void
1357svc_run(SVCPOOL *pool)
1358{
1359	int g, i;
1360	struct proc *p;
1361	struct thread *td;
1362	SVCGROUP *grp;
1363
1364	p = curproc;
1365	td = curthread;
1366	snprintf(td->td_name, sizeof(td->td_name),
1367	    "%s: master", pool->sp_name);
1368	pool->sp_state = SVCPOOL_ACTIVE;
1369	pool->sp_proc = p;
1370
1371	/* Choose group count based on number of threads and CPUs. */
1372	pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1373	    min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
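	/*
	 * Worked example (illustrative numbers only): with
	 * sp_maxthreads = 256 and mp_ncpus = 32, min(256 / 2, 32) = 32
	 * and 32 / 6 = 5, so five groups are used; the result is always
	 * clamped to at least one group and at most SVC_MAXGROUPS.
	 */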
1374	for (g = 0; g < pool->sp_groupcount; g++) {
1375		grp = &pool->sp_groups[g];
1376		grp->sg_minthreads = max(1,
1377		    pool->sp_minthreads / pool->sp_groupcount);
1378		grp->sg_maxthreads = max(1,
1379		    pool->sp_maxthreads / pool->sp_groupcount);
1380		grp->sg_lastcreatetime = time_uptime;
1381	}
1382
	/* Start the worker threads. */
1384	pool->sp_groups[0].sg_threadcount++;
1385	for (g = 0; g < pool->sp_groupcount; g++) {
1386		grp = &pool->sp_groups[g];
1387		for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1388			svc_new_thread(grp);
1389	}
1390	svc_run_internal(&pool->sp_groups[0], TRUE);
1391
	/* Wait for the threads to stop. */
1393	for (g = 0; g < pool->sp_groupcount; g++) {
1394		grp = &pool->sp_groups[g];
1395		mtx_lock(&grp->sg_lock);
1396		while (grp->sg_threadcount > 0)
1397			msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1398		mtx_unlock(&grp->sg_lock);
1399	}
1400}
1401
1402void
1403svc_exit(SVCPOOL *pool)
1404{
1405	SVCGROUP *grp;
1406	SVCTHREAD *st;
1407	int g;
1408
1409	pool->sp_state = SVCPOOL_CLOSING;
1410	for (g = 0; g < pool->sp_groupcount; g++) {
1411		grp = &pool->sp_groups[g];
1412		mtx_lock(&grp->sg_lock);
1413		if (grp->sg_state != SVCPOOL_CLOSING) {
1414			grp->sg_state = SVCPOOL_CLOSING;
1415			LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1416				cv_signal(&st->st_cond);
1417		}
1418		mtx_unlock(&grp->sg_lock);
1419	}
1420}
1421
1422bool_t
1423svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1424{
1425	struct mbuf *m;
1426	XDR xdrs;
1427	bool_t stat;
1428
1429	m = rqstp->rq_args;
1430	rqstp->rq_args = NULL;
1431
1432	xdrmbuf_create(&xdrs, m, XDR_DECODE);
1433	stat = xargs(&xdrs, args);
1434	XDR_DESTROY(&xdrs);
1435
1436	return (stat);
1437}
1438
1439bool_t
1440svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1441{
1442	XDR xdrs;
1443
1444	if (rqstp->rq_addr) {
1445		free(rqstp->rq_addr, M_SONAME);
1446		rqstp->rq_addr = NULL;
1447	}
1448
1449	xdrs.x_op = XDR_FREE;
1450	return (xargs(&xdrs, args));
1451}
1452
1453void
1454svc_freereq(struct svc_req *rqstp)
1455{
1456	SVCTHREAD *st;
1457	SVCPOOL *pool;
1458
1459	st = rqstp->rq_thread;
1460	if (st) {
1461		pool = st->st_pool;
1462		if (pool->sp_done)
1463			pool->sp_done(st, rqstp);
1464	}
1465
1466	if (rqstp->rq_auth.svc_ah_ops)
1467		SVCAUTH_RELEASE(&rqstp->rq_auth);
1468
1469	if (rqstp->rq_xprt) {
1470		SVC_RELEASE(rqstp->rq_xprt);
1471	}
1472
1473	if (rqstp->rq_addr)
1474		free(rqstp->rq_addr, M_SONAME);
1475
1476	if (rqstp->rq_args)
1477		m_freem(rqstp->rq_args);
1478
1479	free(rqstp, M_RPC);
1480}
1481