svc.c revision 297342
1/*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/
2
3/*-
4 * Copyright (c) 2009, Sun Microsystems, Inc.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 * - Redistributions of source code must retain the above copyright notice,
10 *   this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright notice,
12 *   this list of conditions and the following disclaimer in the documentation
13 *   and/or other materials provided with the distribution.
14 * - Neither the name of Sun Microsystems, Inc. nor the names of its
15 *   contributors may be used to endorse or promote products derived
16 *   from this software without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31#if defined(LIBC_SCCS) && !defined(lint)
32static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
33static char *sccsid = "@(#)svc.c	2.4 88/08/11 4.0 RPCSRC";
34#endif
35#include <sys/cdefs.h>
36__FBSDID("$FreeBSD: stable/10/sys/rpc/svc.c 297342 2016-03-28 09:28:01Z mav $");
37
38/*
39 * svc.c, Server-side remote procedure call interface.
40 *
41 * There are two sets of procedures here.  The xprt routines are
42 * for handling transport handles.  The svc routines handle the
43 * list of service routines.
44 *
45 * Copyright (C) 1984, Sun Microsystems, Inc.
46 */
47
48#include <sys/param.h>
49#include <sys/lock.h>
50#include <sys/kernel.h>
51#include <sys/kthread.h>
52#include <sys/malloc.h>
53#include <sys/mbuf.h>
54#include <sys/mutex.h>
55#include <sys/proc.h>
56#include <sys/queue.h>
57#include <sys/socketvar.h>
58#include <sys/systm.h>
59#include <sys/smp.h>
60#include <sys/sx.h>
61#include <sys/ucred.h>
62
63#include <rpc/rpc.h>
64#include <rpc/rpcb_clnt.h>
65#include <rpc/replay.h>
66
67#include <rpc/rpc_com.h>
68
69#define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
70#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
71
72static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
73    char *);
74static void svc_new_thread(SVCGROUP *grp);
75static void xprt_unregister_locked(SVCXPRT *xprt);
76static void svc_change_space_used(SVCPOOL *pool, long delta);
77static bool_t svc_request_space_available(SVCPOOL *pool);
78
79/* ***************  SVCXPRT related stuff **************** */
80
81static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
82static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
83static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
84
85SVCPOOL*
86svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
87{
88	SVCPOOL *pool;
89	SVCGROUP *grp;
90	int g;
91
92	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
93
94	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
95	pool->sp_name = name;
96	pool->sp_state = SVCPOOL_INIT;
97	pool->sp_proc = NULL;
98	TAILQ_INIT(&pool->sp_callouts);
99	TAILQ_INIT(&pool->sp_lcallouts);
100	pool->sp_minthreads = 1;
101	pool->sp_maxthreads = 1;
102	pool->sp_groupcount = 1;
103	for (g = 0; g < SVC_MAXGROUPS; g++) {
104		grp = &pool->sp_groups[g];
105		mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
106		grp->sg_pool = pool;
107		grp->sg_state = SVCPOOL_ACTIVE;
108		TAILQ_INIT(&grp->sg_xlist);
109		TAILQ_INIT(&grp->sg_active);
110		LIST_INIT(&grp->sg_idlethreads);
111		grp->sg_minthreads = 1;
112		grp->sg_maxthreads = 1;
113	}
114
115	/*
116	 * Don't use more than a quarter of mbuf clusters.  Nota bene:
117	 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
118	 * on LP64 architectures, so cast to u_long to avoid undefined
119	 * behavior.  (ILP32 architectures cannot have nmbclusters
120	 * large enough to overflow for other reasons.)
121	 */
122	pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
123	pool->sp_space_low = (pool->sp_space_high / 3) * 2;
124
125	sysctl_ctx_init(&pool->sp_sysctl);
126	if (sysctl_base) {
127		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
128		    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
129		    pool, 0, svcpool_minthread_sysctl, "I",
130		    "Minimal number of threads");
131		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
132		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
133		    pool, 0, svcpool_maxthread_sysctl, "I",
134		    "Maximal number of threads");
135		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
136		    "threads", CTLTYPE_INT | CTLFLAG_RD,
137		    pool, 0, svcpool_threads_sysctl, "I",
138		    "Current number of threads");
139		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
140		    "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
141		    "Number of thread groups");
142
143		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
144		    "request_space_used", CTLFLAG_RD,
145		    &pool->sp_space_used,
146		    "Space in parsed but not handled requests.");
147
148		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
149		    "request_space_used_highest", CTLFLAG_RD,
150		    &pool->sp_space_used_highest,
151		    "Highest space used since reboot.");
152
153		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
154		    "request_space_high", CTLFLAG_RW,
155		    &pool->sp_space_high,
156		    "Maximum space in parsed but not handled requests.");
157
158		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
159		    "request_space_low", CTLFLAG_RW,
160		    &pool->sp_space_low,
161		    "Low water mark for request space.");
162
163		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
164		    "request_space_throttled", CTLFLAG_RD,
165		    &pool->sp_space_throttled, 0,
166		    "Whether nfs requests are currently throttled");
167
168		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
169		    "request_space_throttle_count", CTLFLAG_RD,
170		    &pool->sp_space_throttle_count, 0,
171		    "Count of times throttling based on request space has occurred");
172	}
173
174	return pool;
175}
176
177void
178svcpool_destroy(SVCPOOL *pool)
179{
180	SVCGROUP *grp;
181	SVCXPRT *xprt, *nxprt;
182	struct svc_callout *s;
183	struct svc_loss_callout *sl;
184	struct svcxprt_list cleanup;
185	int g;
186
187	TAILQ_INIT(&cleanup);
188
189	for (g = 0; g < SVC_MAXGROUPS; g++) {
190		grp = &pool->sp_groups[g];
191		mtx_lock(&grp->sg_lock);
192		while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
193			xprt_unregister_locked(xprt);
194			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
195		}
196		mtx_unlock(&grp->sg_lock);
197	}
198	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
199		SVC_RELEASE(xprt);
200	}
201
202	mtx_lock(&pool->sp_lock);
203	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
204		mtx_unlock(&pool->sp_lock);
205		svc_unreg(pool, s->sc_prog, s->sc_vers);
206		mtx_lock(&pool->sp_lock);
207	}
208	while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
209		mtx_unlock(&pool->sp_lock);
210		svc_loss_unreg(pool, sl->slc_dispatch);
211		mtx_lock(&pool->sp_lock);
212	}
213	mtx_unlock(&pool->sp_lock);
214
215	for (g = 0; g < SVC_MAXGROUPS; g++) {
216		grp = &pool->sp_groups[g];
217		mtx_destroy(&grp->sg_lock);
218	}
219	mtx_destroy(&pool->sp_lock);
220
221	if (pool->sp_rcache)
222		replay_freecache(pool->sp_rcache);
223
224	sysctl_ctx_free(&pool->sp_sysctl);
225	free(pool, M_RPC);
226}
227
228/*
229 * Sysctl handler to get the present thread count on a pool
230 */
231static int
232svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
233{
234	SVCPOOL *pool;
235	int threads, error, g;
236
237	pool = oidp->oid_arg1;
238	threads = 0;
239	mtx_lock(&pool->sp_lock);
240	for (g = 0; g < pool->sp_groupcount; g++)
241		threads += pool->sp_groups[g].sg_threadcount;
242	mtx_unlock(&pool->sp_lock);
243	error = sysctl_handle_int(oidp, &threads, 0, req);
244	return (error);
245}
246
247/*
248 * Sysctl handler to set the minimum thread count on a pool
249 */
250static int
251svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
252{
253	SVCPOOL *pool;
254	int newminthreads, error, g;
255
256	pool = oidp->oid_arg1;
257	newminthreads = pool->sp_minthreads;
258	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
259	if (error == 0 && newminthreads != pool->sp_minthreads) {
260		if (newminthreads > pool->sp_maxthreads)
261			return (EINVAL);
262		mtx_lock(&pool->sp_lock);
263		pool->sp_minthreads = newminthreads;
264		for (g = 0; g < pool->sp_groupcount; g++) {
265			pool->sp_groups[g].sg_minthreads = max(1,
266			    pool->sp_minthreads / pool->sp_groupcount);
267		}
268		mtx_unlock(&pool->sp_lock);
269	}
270	return (error);
271}
272
273/*
274 * Sysctl handler to set the maximum thread count on a pool
275 */
276static int
277svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
278{
279	SVCPOOL *pool;
280	int newmaxthreads, error, g;
281
282	pool = oidp->oid_arg1;
283	newmaxthreads = pool->sp_maxthreads;
284	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
285	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
286		if (newmaxthreads < pool->sp_minthreads)
287			return (EINVAL);
288		mtx_lock(&pool->sp_lock);
289		pool->sp_maxthreads = newmaxthreads;
290		for (g = 0; g < pool->sp_groupcount; g++) {
291			pool->sp_groups[g].sg_maxthreads = max(1,
292			    pool->sp_maxthreads / pool->sp_groupcount);
293		}
294		mtx_unlock(&pool->sp_lock);
295	}
296	return (error);
297}
298
299/*
300 * Activate a transport handle.
301 */
302void
303xprt_register(SVCXPRT *xprt)
304{
305	SVCPOOL *pool = xprt->xp_pool;
306	SVCGROUP *grp;
307	int g;
308
309	SVC_ACQUIRE(xprt);
310	g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
311	xprt->xp_group = grp = &pool->sp_groups[g];
312	mtx_lock(&grp->sg_lock);
313	xprt->xp_registered = TRUE;
314	xprt->xp_active = FALSE;
315	TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
316	mtx_unlock(&grp->sg_lock);
317}
318
319/*
320 * De-activate a transport handle. Note: the locked version doesn't
321 * release the transport - caller must do that after dropping the pool
322 * lock.
323 */
324static void
325xprt_unregister_locked(SVCXPRT *xprt)
326{
327	SVCGROUP *grp = xprt->xp_group;
328
329	mtx_assert(&grp->sg_lock, MA_OWNED);
330	KASSERT(xprt->xp_registered == TRUE,
331	    ("xprt_unregister_locked: not registered"));
332	xprt_inactive_locked(xprt);
333	TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
334	xprt->xp_registered = FALSE;
335}
336
337void
338xprt_unregister(SVCXPRT *xprt)
339{
340	SVCGROUP *grp = xprt->xp_group;
341
342	mtx_lock(&grp->sg_lock);
343	if (xprt->xp_registered == FALSE) {
344		/* Already unregistered by another thread */
345		mtx_unlock(&grp->sg_lock);
346		return;
347	}
348	xprt_unregister_locked(xprt);
349	mtx_unlock(&grp->sg_lock);
350
351	SVC_RELEASE(xprt);
352}
353
354/*
355 * Attempt to assign a service thread to this transport.
356 */
357static int
358xprt_assignthread(SVCXPRT *xprt)
359{
360	SVCGROUP *grp = xprt->xp_group;
361	SVCTHREAD *st;
362
363	mtx_assert(&grp->sg_lock, MA_OWNED);
364	st = LIST_FIRST(&grp->sg_idlethreads);
365	if (st) {
366		LIST_REMOVE(st, st_ilink);
367		SVC_ACQUIRE(xprt);
368		xprt->xp_thread = st;
369		st->st_xprt = xprt;
370		cv_signal(&st->st_cond);
371		return (TRUE);
372	} else {
373		/*
374		 * See if we can create a new thread. The
375		 * actual thread creation happens in
376		 * svc_run_internal because our locking state
377		 * is poorly defined (we are typically called
378		 * from a socket upcall). Don't create more
379		 * than one thread per second.
380		 */
381		if (grp->sg_state == SVCPOOL_ACTIVE
382		    && grp->sg_lastcreatetime < time_uptime
383		    && grp->sg_threadcount < grp->sg_maxthreads) {
384			grp->sg_state = SVCPOOL_THREADWANTED;
385		}
386	}
387	return (FALSE);
388}
389
390void
391xprt_active(SVCXPRT *xprt)
392{
393	SVCGROUP *grp = xprt->xp_group;
394
395	mtx_lock(&grp->sg_lock);
396
397	if (!xprt->xp_registered) {
398		/*
399		 * Race with xprt_unregister - we lose.
400		 */
401		mtx_unlock(&grp->sg_lock);
402		return;
403	}
404
405	if (!xprt->xp_active) {
406		xprt->xp_active = TRUE;
407		if (xprt->xp_thread == NULL) {
408			if (!svc_request_space_available(xprt->xp_pool) ||
409			    !xprt_assignthread(xprt))
410				TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
411				    xp_alink);
412		}
413	}
414
415	mtx_unlock(&grp->sg_lock);
416}
417
418void
419xprt_inactive_locked(SVCXPRT *xprt)
420{
421	SVCGROUP *grp = xprt->xp_group;
422
423	mtx_assert(&grp->sg_lock, MA_OWNED);
424	if (xprt->xp_active) {
425		if (xprt->xp_thread == NULL)
426			TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
427		xprt->xp_active = FALSE;
428	}
429}
430
431void
432xprt_inactive(SVCXPRT *xprt)
433{
434	SVCGROUP *grp = xprt->xp_group;
435
436	mtx_lock(&grp->sg_lock);
437	xprt_inactive_locked(xprt);
438	mtx_unlock(&grp->sg_lock);
439}
440
441/*
442 * Variant of xprt_inactive() for use only when sure that port is
443 * assigned to thread. For example, withing receive handlers.
444 */
445void
446xprt_inactive_self(SVCXPRT *xprt)
447{
448
449	KASSERT(xprt->xp_thread != NULL,
450	    ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
451	xprt->xp_active = FALSE;
452}
453
454/*
455 * Add a service program to the callout list.
456 * The dispatch routine will be called when a rpc request for this
457 * program number comes in.
458 */
459bool_t
460svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
461    void (*dispatch)(struct svc_req *, SVCXPRT *),
462    const struct netconfig *nconf)
463{
464	SVCPOOL *pool = xprt->xp_pool;
465	struct svc_callout *s;
466	char *netid = NULL;
467	int flag = 0;
468
469/* VARIABLES PROTECTED BY svc_lock: s, svc_head */
470
471	if (xprt->xp_netid) {
472		netid = strdup(xprt->xp_netid, M_RPC);
473		flag = 1;
474	} else if (nconf && nconf->nc_netid) {
475		netid = strdup(nconf->nc_netid, M_RPC);
476		flag = 1;
477	} /* must have been created with svc_raw_create */
478	if ((netid == NULL) && (flag == 1)) {
479		return (FALSE);
480	}
481
482	mtx_lock(&pool->sp_lock);
483	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
484		if (netid)
485			free(netid, M_RPC);
486		if (s->sc_dispatch == dispatch)
487			goto rpcb_it; /* he is registering another xptr */
488		mtx_unlock(&pool->sp_lock);
489		return (FALSE);
490	}
491	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
492	if (s == NULL) {
493		if (netid)
494			free(netid, M_RPC);
495		mtx_unlock(&pool->sp_lock);
496		return (FALSE);
497	}
498
499	s->sc_prog = prog;
500	s->sc_vers = vers;
501	s->sc_dispatch = dispatch;
502	s->sc_netid = netid;
503	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
504
505	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
506		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
507
508rpcb_it:
509	mtx_unlock(&pool->sp_lock);
510	/* now register the information with the local binder service */
511	if (nconf) {
512		bool_t dummy;
513		struct netconfig tnc;
514		struct netbuf nb;
515		tnc = *nconf;
516		nb.buf = &xprt->xp_ltaddr;
517		nb.len = xprt->xp_ltaddr.ss_len;
518		dummy = rpcb_set(prog, vers, &tnc, &nb);
519		return (dummy);
520	}
521	return (TRUE);
522}
523
524/*
525 * Remove a service program from the callout list.
526 */
527void
528svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
529{
530	struct svc_callout *s;
531
532	/* unregister the information anyway */
533	(void) rpcb_unset(prog, vers, NULL);
534	mtx_lock(&pool->sp_lock);
535	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
536		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
537		if (s->sc_netid)
538			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
539		mem_free(s, sizeof (struct svc_callout));
540	}
541	mtx_unlock(&pool->sp_lock);
542}
543
544/*
545 * Add a service connection loss program to the callout list.
546 * The dispatch routine will be called when some port in ths pool die.
547 */
548bool_t
549svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
550{
551	SVCPOOL *pool = xprt->xp_pool;
552	struct svc_loss_callout *s;
553
554	mtx_lock(&pool->sp_lock);
555	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
556		if (s->slc_dispatch == dispatch)
557			break;
558	}
559	if (s != NULL) {
560		mtx_unlock(&pool->sp_lock);
561		return (TRUE);
562	}
563	s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
564	if (s == NULL) {
565		mtx_unlock(&pool->sp_lock);
566		return (FALSE);
567	}
568	s->slc_dispatch = dispatch;
569	TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
570	mtx_unlock(&pool->sp_lock);
571	return (TRUE);
572}
573
574/*
575 * Remove a service connection loss program from the callout list.
576 */
577void
578svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
579{
580	struct svc_loss_callout *s;
581
582	mtx_lock(&pool->sp_lock);
583	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
584		if (s->slc_dispatch == dispatch) {
585			TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
586			free(s, M_RPC);
587			break;
588		}
589	}
590	mtx_unlock(&pool->sp_lock);
591}
592
593/* ********************** CALLOUT list related stuff ************* */
594
595/*
596 * Search the callout list for a program number, return the callout
597 * struct.
598 */
599static struct svc_callout *
600svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
601{
602	struct svc_callout *s;
603
604	mtx_assert(&pool->sp_lock, MA_OWNED);
605	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
606		if (s->sc_prog == prog && s->sc_vers == vers
607		    && (netid == NULL || s->sc_netid == NULL ||
608			strcmp(netid, s->sc_netid) == 0))
609			break;
610	}
611
612	return (s);
613}
614
615/* ******************* REPLY GENERATION ROUTINES  ************ */
616
617static bool_t
618svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
619    struct mbuf *body)
620{
621	SVCXPRT *xprt = rqstp->rq_xprt;
622	bool_t ok;
623
624	if (rqstp->rq_args) {
625		m_freem(rqstp->rq_args);
626		rqstp->rq_args = NULL;
627	}
628
629	if (xprt->xp_pool->sp_rcache)
630		replay_setreply(xprt->xp_pool->sp_rcache,
631		    rply, svc_getrpccaller(rqstp), body);
632
633	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
634		return (FALSE);
635
636	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
637	if (rqstp->rq_addr) {
638		free(rqstp->rq_addr, M_SONAME);
639		rqstp->rq_addr = NULL;
640	}
641
642	return (ok);
643}
644
645/*
646 * Send a reply to an rpc request
647 */
648bool_t
649svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
650{
651	struct rpc_msg rply;
652	struct mbuf *m;
653	XDR xdrs;
654	bool_t ok;
655
656	rply.rm_xid = rqstp->rq_xid;
657	rply.rm_direction = REPLY;
658	rply.rm_reply.rp_stat = MSG_ACCEPTED;
659	rply.acpted_rply.ar_verf = rqstp->rq_verf;
660	rply.acpted_rply.ar_stat = SUCCESS;
661	rply.acpted_rply.ar_results.where = NULL;
662	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
663
664	m = m_getcl(M_WAITOK, MT_DATA, 0);
665	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
666	ok = xdr_results(&xdrs, xdr_location);
667	XDR_DESTROY(&xdrs);
668
669	if (ok) {
670		return (svc_sendreply_common(rqstp, &rply, m));
671	} else {
672		m_freem(m);
673		return (FALSE);
674	}
675}
676
677bool_t
678svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
679{
680	struct rpc_msg rply;
681
682	rply.rm_xid = rqstp->rq_xid;
683	rply.rm_direction = REPLY;
684	rply.rm_reply.rp_stat = MSG_ACCEPTED;
685	rply.acpted_rply.ar_verf = rqstp->rq_verf;
686	rply.acpted_rply.ar_stat = SUCCESS;
687	rply.acpted_rply.ar_results.where = NULL;
688	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
689
690	return (svc_sendreply_common(rqstp, &rply, m));
691}
692
693/*
694 * No procedure error reply
695 */
696void
697svcerr_noproc(struct svc_req *rqstp)
698{
699	SVCXPRT *xprt = rqstp->rq_xprt;
700	struct rpc_msg rply;
701
702	rply.rm_xid = rqstp->rq_xid;
703	rply.rm_direction = REPLY;
704	rply.rm_reply.rp_stat = MSG_ACCEPTED;
705	rply.acpted_rply.ar_verf = rqstp->rq_verf;
706	rply.acpted_rply.ar_stat = PROC_UNAVAIL;
707
708	if (xprt->xp_pool->sp_rcache)
709		replay_setreply(xprt->xp_pool->sp_rcache,
710		    &rply, svc_getrpccaller(rqstp), NULL);
711
712	svc_sendreply_common(rqstp, &rply, NULL);
713}
714
715/*
716 * Can't decode args error reply
717 */
718void
719svcerr_decode(struct svc_req *rqstp)
720{
721	SVCXPRT *xprt = rqstp->rq_xprt;
722	struct rpc_msg rply;
723
724	rply.rm_xid = rqstp->rq_xid;
725	rply.rm_direction = REPLY;
726	rply.rm_reply.rp_stat = MSG_ACCEPTED;
727	rply.acpted_rply.ar_verf = rqstp->rq_verf;
728	rply.acpted_rply.ar_stat = GARBAGE_ARGS;
729
730	if (xprt->xp_pool->sp_rcache)
731		replay_setreply(xprt->xp_pool->sp_rcache,
732		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
733
734	svc_sendreply_common(rqstp, &rply, NULL);
735}
736
737/*
738 * Some system error
739 */
740void
741svcerr_systemerr(struct svc_req *rqstp)
742{
743	SVCXPRT *xprt = rqstp->rq_xprt;
744	struct rpc_msg rply;
745
746	rply.rm_xid = rqstp->rq_xid;
747	rply.rm_direction = REPLY;
748	rply.rm_reply.rp_stat = MSG_ACCEPTED;
749	rply.acpted_rply.ar_verf = rqstp->rq_verf;
750	rply.acpted_rply.ar_stat = SYSTEM_ERR;
751
752	if (xprt->xp_pool->sp_rcache)
753		replay_setreply(xprt->xp_pool->sp_rcache,
754		    &rply, svc_getrpccaller(rqstp), NULL);
755
756	svc_sendreply_common(rqstp, &rply, NULL);
757}
758
759/*
760 * Authentication error reply
761 */
762void
763svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
764{
765	SVCXPRT *xprt = rqstp->rq_xprt;
766	struct rpc_msg rply;
767
768	rply.rm_xid = rqstp->rq_xid;
769	rply.rm_direction = REPLY;
770	rply.rm_reply.rp_stat = MSG_DENIED;
771	rply.rjcted_rply.rj_stat = AUTH_ERROR;
772	rply.rjcted_rply.rj_why = why;
773
774	if (xprt->xp_pool->sp_rcache)
775		replay_setreply(xprt->xp_pool->sp_rcache,
776		    &rply, svc_getrpccaller(rqstp), NULL);
777
778	svc_sendreply_common(rqstp, &rply, NULL);
779}
780
781/*
782 * Auth too weak error reply
783 */
784void
785svcerr_weakauth(struct svc_req *rqstp)
786{
787
788	svcerr_auth(rqstp, AUTH_TOOWEAK);
789}
790
791/*
792 * Program unavailable error reply
793 */
794void
795svcerr_noprog(struct svc_req *rqstp)
796{
797	SVCXPRT *xprt = rqstp->rq_xprt;
798	struct rpc_msg rply;
799
800	rply.rm_xid = rqstp->rq_xid;
801	rply.rm_direction = REPLY;
802	rply.rm_reply.rp_stat = MSG_ACCEPTED;
803	rply.acpted_rply.ar_verf = rqstp->rq_verf;
804	rply.acpted_rply.ar_stat = PROG_UNAVAIL;
805
806	if (xprt->xp_pool->sp_rcache)
807		replay_setreply(xprt->xp_pool->sp_rcache,
808		    &rply, svc_getrpccaller(rqstp), NULL);
809
810	svc_sendreply_common(rqstp, &rply, NULL);
811}
812
813/*
814 * Program version mismatch error reply
815 */
816void
817svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
818{
819	SVCXPRT *xprt = rqstp->rq_xprt;
820	struct rpc_msg rply;
821
822	rply.rm_xid = rqstp->rq_xid;
823	rply.rm_direction = REPLY;
824	rply.rm_reply.rp_stat = MSG_ACCEPTED;
825	rply.acpted_rply.ar_verf = rqstp->rq_verf;
826	rply.acpted_rply.ar_stat = PROG_MISMATCH;
827	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
828	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
829
830	if (xprt->xp_pool->sp_rcache)
831		replay_setreply(xprt->xp_pool->sp_rcache,
832		    &rply, svc_getrpccaller(rqstp), NULL);
833
834	svc_sendreply_common(rqstp, &rply, NULL);
835}
836
837/*
838 * Allocate a new server transport structure. All fields are
839 * initialized to zero and xp_p3 is initialized to point at an
840 * extension structure to hold various flags and authentication
841 * parameters.
842 */
843SVCXPRT *
844svc_xprt_alloc()
845{
846	SVCXPRT *xprt;
847	SVCXPRT_EXT *ext;
848
849	xprt = mem_alloc(sizeof(SVCXPRT));
850	memset(xprt, 0, sizeof(SVCXPRT));
851	ext = mem_alloc(sizeof(SVCXPRT_EXT));
852	memset(ext, 0, sizeof(SVCXPRT_EXT));
853	xprt->xp_p3 = ext;
854	refcount_init(&xprt->xp_refs, 1);
855
856	return (xprt);
857}
858
859/*
860 * Free a server transport structure.
861 */
862void
863svc_xprt_free(xprt)
864	SVCXPRT *xprt;
865{
866
867	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
868	mem_free(xprt, sizeof(SVCXPRT));
869}
870
871/* ******************* SERVER INPUT STUFF ******************* */
872
873/*
874 * Read RPC requests from a transport and queue them to be
875 * executed. We handle authentication and replay cache replies here.
876 * Actually dispatching the RPC is deferred till svc_executereq.
877 */
878static enum xprt_stat
879svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
880{
881	SVCPOOL *pool = xprt->xp_pool;
882	struct svc_req *r;
883	struct rpc_msg msg;
884	struct mbuf *args;
885	struct svc_loss_callout *s;
886	enum xprt_stat stat;
887
888	/* now receive msgs from xprtprt (support batch calls) */
889	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
890
891	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
892	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
893	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
894	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
895		enum auth_stat why;
896
897		/*
898		 * Handle replays and authenticate before queuing the
899		 * request to be executed.
900		 */
901		SVC_ACQUIRE(xprt);
902		r->rq_xprt = xprt;
903		if (pool->sp_rcache) {
904			struct rpc_msg repmsg;
905			struct mbuf *repbody;
906			enum replay_state rs;
907			rs = replay_find(pool->sp_rcache, &msg,
908			    svc_getrpccaller(r), &repmsg, &repbody);
909			switch (rs) {
910			case RS_NEW:
911				break;
912			case RS_DONE:
913				SVC_REPLY(xprt, &repmsg, r->rq_addr,
914				    repbody, &r->rq_reply_seq);
915				if (r->rq_addr) {
916					free(r->rq_addr, M_SONAME);
917					r->rq_addr = NULL;
918				}
919				m_freem(args);
920				goto call_done;
921
922			default:
923				m_freem(args);
924				goto call_done;
925			}
926		}
927
928		r->rq_xid = msg.rm_xid;
929		r->rq_prog = msg.rm_call.cb_prog;
930		r->rq_vers = msg.rm_call.cb_vers;
931		r->rq_proc = msg.rm_call.cb_proc;
932		r->rq_size = sizeof(*r) + m_length(args, NULL);
933		r->rq_args = args;
934		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
935			/*
936			 * RPCSEC_GSS uses this return code
937			 * for requests that form part of its
938			 * context establishment protocol and
939			 * should not be dispatched to the
940			 * application.
941			 */
942			if (why != RPCSEC_GSS_NODISPATCH)
943				svcerr_auth(r, why);
944			goto call_done;
945		}
946
947		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
948			svcerr_decode(r);
949			goto call_done;
950		}
951
952		/*
953		 * Everything checks out, return request to caller.
954		 */
955		*rqstp_ret = r;
956		r = NULL;
957	}
958call_done:
959	if (r) {
960		svc_freereq(r);
961		r = NULL;
962	}
963	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
964		TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
965			(*s->slc_dispatch)(xprt);
966		xprt_unregister(xprt);
967	}
968
969	return (stat);
970}
971
972static void
973svc_executereq(struct svc_req *rqstp)
974{
975	SVCXPRT *xprt = rqstp->rq_xprt;
976	SVCPOOL *pool = xprt->xp_pool;
977	int prog_found;
978	rpcvers_t low_vers;
979	rpcvers_t high_vers;
980	struct svc_callout *s;
981
982	/* now match message with a registered service*/
983	prog_found = FALSE;
984	low_vers = (rpcvers_t) -1L;
985	high_vers = (rpcvers_t) 0L;
986	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
987		if (s->sc_prog == rqstp->rq_prog) {
988			if (s->sc_vers == rqstp->rq_vers) {
989				/*
990				 * We hand ownership of r to the
991				 * dispatch method - they must call
992				 * svc_freereq.
993				 */
994				(*s->sc_dispatch)(rqstp, xprt);
995				return;
996			}  /* found correct version */
997			prog_found = TRUE;
998			if (s->sc_vers < low_vers)
999				low_vers = s->sc_vers;
1000			if (s->sc_vers > high_vers)
1001				high_vers = s->sc_vers;
1002		}   /* found correct program */
1003	}
1004
1005	/*
1006	 * if we got here, the program or version
1007	 * is not served ...
1008	 */
1009	if (prog_found)
1010		svcerr_progvers(rqstp, low_vers, high_vers);
1011	else
1012		svcerr_noprog(rqstp);
1013
1014	svc_freereq(rqstp);
1015}
1016
1017static void
1018svc_checkidle(SVCGROUP *grp)
1019{
1020	SVCXPRT *xprt, *nxprt;
1021	time_t timo;
1022	struct svcxprt_list cleanup;
1023
1024	TAILQ_INIT(&cleanup);
1025	TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1026		/*
1027		 * Only some transports have idle timers. Don't time
1028		 * something out which is just waking up.
1029		 */
1030		if (!xprt->xp_idletimeout || xprt->xp_thread)
1031			continue;
1032
1033		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1034		if (time_uptime > timo) {
1035			xprt_unregister_locked(xprt);
1036			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1037		}
1038	}
1039
1040	mtx_unlock(&grp->sg_lock);
1041	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1042		SVC_RELEASE(xprt);
1043	}
1044	mtx_lock(&grp->sg_lock);
1045}
1046
1047static void
1048svc_assign_waiting_sockets(SVCPOOL *pool)
1049{
1050	SVCGROUP *grp;
1051	SVCXPRT *xprt;
1052	int g;
1053
1054	for (g = 0; g < pool->sp_groupcount; g++) {
1055		grp = &pool->sp_groups[g];
1056		mtx_lock(&grp->sg_lock);
1057		while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1058			if (xprt_assignthread(xprt))
1059				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1060			else
1061				break;
1062		}
1063		mtx_unlock(&grp->sg_lock);
1064	}
1065}
1066
1067static void
1068svc_change_space_used(SVCPOOL *pool, long delta)
1069{
1070	unsigned long value;
1071
1072	value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
1073	if (delta > 0) {
1074		if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1075			pool->sp_space_throttled = TRUE;
1076			pool->sp_space_throttle_count++;
1077		}
1078		if (value > pool->sp_space_used_highest)
1079			pool->sp_space_used_highest = value;
1080	} else {
1081		if (value < pool->sp_space_low && pool->sp_space_throttled) {
1082			pool->sp_space_throttled = FALSE;
1083			svc_assign_waiting_sockets(pool);
1084		}
1085	}
1086}
1087
1088static bool_t
1089svc_request_space_available(SVCPOOL *pool)
1090{
1091
1092	if (pool->sp_space_throttled)
1093		return (FALSE);
1094	return (TRUE);
1095}
1096
1097static void
1098svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1099{
1100	SVCPOOL *pool = grp->sg_pool;
1101	SVCTHREAD *st, *stpref;
1102	SVCXPRT *xprt;
1103	enum xprt_stat stat;
1104	struct svc_req *rqstp;
1105	struct proc *p;
1106	long sz;
1107	int error;
1108
1109	st = mem_alloc(sizeof(*st));
1110	mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1111	st->st_pool = pool;
1112	st->st_xprt = NULL;
1113	STAILQ_INIT(&st->st_reqs);
1114	cv_init(&st->st_cond, "rpcsvc");
1115
1116	mtx_lock(&grp->sg_lock);
1117
1118	/*
1119	 * If we are a new thread which was spawned to cope with
1120	 * increased load, set the state back to SVCPOOL_ACTIVE.
1121	 */
1122	if (grp->sg_state == SVCPOOL_THREADSTARTING)
1123		grp->sg_state = SVCPOOL_ACTIVE;
1124
1125	while (grp->sg_state != SVCPOOL_CLOSING) {
1126		/*
1127		 * Create new thread if requested.
1128		 */
1129		if (grp->sg_state == SVCPOOL_THREADWANTED) {
1130			grp->sg_state = SVCPOOL_THREADSTARTING;
1131			grp->sg_lastcreatetime = time_uptime;
1132			mtx_unlock(&grp->sg_lock);
1133			svc_new_thread(grp);
1134			mtx_lock(&grp->sg_lock);
1135			continue;
1136		}
1137
1138		/*
1139		 * Check for idle transports once per second.
1140		 */
1141		if (time_uptime > grp->sg_lastidlecheck) {
1142			grp->sg_lastidlecheck = time_uptime;
1143			svc_checkidle(grp);
1144		}
1145
1146		xprt = st->st_xprt;
1147		if (!xprt) {
1148			/*
1149			 * Enforce maxthreads count.
1150			 */
1151			if (grp->sg_threadcount > grp->sg_maxthreads)
1152				break;
1153
1154			/*
1155			 * Before sleeping, see if we can find an
1156			 * active transport which isn't being serviced
1157			 * by a thread.
1158			 */
1159			if (svc_request_space_available(pool) &&
1160			    (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1161				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1162				SVC_ACQUIRE(xprt);
1163				xprt->xp_thread = st;
1164				st->st_xprt = xprt;
1165				continue;
1166			}
1167
1168			LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1169			if (ismaster || (!ismaster &&
1170			    grp->sg_threadcount > grp->sg_minthreads))
1171				error = cv_timedwait_sig(&st->st_cond,
1172				    &grp->sg_lock, 5 * hz);
1173			else
1174				error = cv_wait_sig(&st->st_cond,
1175				    &grp->sg_lock);
1176			if (st->st_xprt == NULL)
1177				LIST_REMOVE(st, st_ilink);
1178
1179			/*
1180			 * Reduce worker thread count when idle.
1181			 */
1182			if (error == EWOULDBLOCK) {
1183				if (!ismaster
1184				    && (grp->sg_threadcount
1185					> grp->sg_minthreads)
1186					&& !st->st_xprt)
1187					break;
1188			} else if (error != 0) {
1189				KASSERT(error == EINTR || error == ERESTART,
1190				    ("non-signal error %d", error));
1191				mtx_unlock(&grp->sg_lock);
1192				p = curproc;
1193				PROC_LOCK(p);
1194				if (P_SHOULDSTOP(p) ||
1195				    (p->p_flag & P_TOTAL_STOP) != 0) {
1196					thread_suspend_check(0);
1197					PROC_UNLOCK(p);
1198					mtx_lock(&grp->sg_lock);
1199				} else {
1200					PROC_UNLOCK(p);
1201					svc_exit(pool);
1202					mtx_lock(&grp->sg_lock);
1203					break;
1204				}
1205			}
1206			continue;
1207		}
1208		mtx_unlock(&grp->sg_lock);
1209
1210		/*
1211		 * Drain the transport socket and queue up any RPCs.
1212		 */
1213		xprt->xp_lastactive = time_uptime;
1214		do {
1215			if (!svc_request_space_available(pool))
1216				break;
1217			rqstp = NULL;
1218			stat = svc_getreq(xprt, &rqstp);
1219			if (rqstp) {
1220				svc_change_space_used(pool, rqstp->rq_size);
1221				/*
1222				 * See if the application has a preference
1223				 * for some other thread.
1224				 */
1225				if (pool->sp_assign) {
1226					stpref = pool->sp_assign(st, rqstp);
1227					rqstp->rq_thread = stpref;
1228					STAILQ_INSERT_TAIL(&stpref->st_reqs,
1229					    rqstp, rq_link);
1230					mtx_unlock(&stpref->st_lock);
1231					if (stpref != st)
1232						rqstp = NULL;
1233				} else {
1234					rqstp->rq_thread = st;
1235					STAILQ_INSERT_TAIL(&st->st_reqs,
1236					    rqstp, rq_link);
1237				}
1238			}
1239		} while (rqstp == NULL && stat == XPRT_MOREREQS
1240		    && grp->sg_state != SVCPOOL_CLOSING);
1241
1242		/*
1243		 * Move this transport to the end of the active list to
1244		 * ensure fairness when multiple transports are active.
1245		 * If this was the last queued request, svc_getreq will end
1246		 * up calling xprt_inactive to remove from the active list.
1247		 */
1248		mtx_lock(&grp->sg_lock);
1249		xprt->xp_thread = NULL;
1250		st->st_xprt = NULL;
1251		if (xprt->xp_active) {
1252			if (!svc_request_space_available(pool) ||
1253			    !xprt_assignthread(xprt))
1254				TAILQ_INSERT_TAIL(&grp->sg_active,
1255				    xprt, xp_alink);
1256		}
1257		mtx_unlock(&grp->sg_lock);
1258		SVC_RELEASE(xprt);
1259
1260		/*
1261		 * Execute what we have queued.
1262		 */
1263		mtx_lock(&st->st_lock);
1264		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1265			STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1266			mtx_unlock(&st->st_lock);
1267			sz = (long)rqstp->rq_size;
1268			svc_executereq(rqstp);
1269			svc_change_space_used(pool, -sz);
1270			mtx_lock(&st->st_lock);
1271		}
1272		mtx_unlock(&st->st_lock);
1273		mtx_lock(&grp->sg_lock);
1274	}
1275
1276	if (st->st_xprt) {
1277		xprt = st->st_xprt;
1278		st->st_xprt = NULL;
1279		SVC_RELEASE(xprt);
1280	}
1281	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1282	mtx_destroy(&st->st_lock);
1283	cv_destroy(&st->st_cond);
1284	mem_free(st, sizeof(*st));
1285
1286	grp->sg_threadcount--;
1287	if (!ismaster)
1288		wakeup(grp);
1289	mtx_unlock(&grp->sg_lock);
1290}
1291
1292static void
1293svc_thread_start(void *arg)
1294{
1295
1296	svc_run_internal((SVCGROUP *) arg, FALSE);
1297	kthread_exit();
1298}
1299
1300static void
1301svc_new_thread(SVCGROUP *grp)
1302{
1303	SVCPOOL *pool = grp->sg_pool;
1304	struct thread *td;
1305
1306	mtx_lock(&grp->sg_lock);
1307	grp->sg_threadcount++;
1308	mtx_unlock(&grp->sg_lock);
1309	kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1310	    "%s: service", pool->sp_name);
1311}
1312
1313void
1314svc_run(SVCPOOL *pool)
1315{
1316	int g, i;
1317	struct proc *p;
1318	struct thread *td;
1319	SVCGROUP *grp;
1320
1321	p = curproc;
1322	td = curthread;
1323	snprintf(td->td_name, sizeof(td->td_name),
1324	    "%s: master", pool->sp_name);
1325	pool->sp_state = SVCPOOL_ACTIVE;
1326	pool->sp_proc = p;
1327
1328	/* Choose group count based on number of threads and CPUs. */
1329	pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1330	    min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
1331	for (g = 0; g < pool->sp_groupcount; g++) {
1332		grp = &pool->sp_groups[g];
1333		grp->sg_minthreads = max(1,
1334		    pool->sp_minthreads / pool->sp_groupcount);
1335		grp->sg_maxthreads = max(1,
1336		    pool->sp_maxthreads / pool->sp_groupcount);
1337		grp->sg_lastcreatetime = time_uptime;
1338	}
1339
1340	/* Starting threads */
1341	pool->sp_groups[0].sg_threadcount++;
1342	for (g = 0; g < pool->sp_groupcount; g++) {
1343		grp = &pool->sp_groups[g];
1344		for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1345			svc_new_thread(grp);
1346	}
1347	svc_run_internal(&pool->sp_groups[0], TRUE);
1348
1349	/* Waiting for threads to stop. */
1350	for (g = 0; g < pool->sp_groupcount; g++) {
1351		grp = &pool->sp_groups[g];
1352		mtx_lock(&grp->sg_lock);
1353		while (grp->sg_threadcount > 0)
1354			msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1355		mtx_unlock(&grp->sg_lock);
1356	}
1357}
1358
1359void
1360svc_exit(SVCPOOL *pool)
1361{
1362	SVCGROUP *grp;
1363	SVCTHREAD *st;
1364	int g;
1365
1366	pool->sp_state = SVCPOOL_CLOSING;
1367	for (g = 0; g < pool->sp_groupcount; g++) {
1368		grp = &pool->sp_groups[g];
1369		mtx_lock(&grp->sg_lock);
1370		if (grp->sg_state != SVCPOOL_CLOSING) {
1371			grp->sg_state = SVCPOOL_CLOSING;
1372			LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1373				cv_signal(&st->st_cond);
1374		}
1375		mtx_unlock(&grp->sg_lock);
1376	}
1377}
1378
1379bool_t
1380svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1381{
1382	struct mbuf *m;
1383	XDR xdrs;
1384	bool_t stat;
1385
1386	m = rqstp->rq_args;
1387	rqstp->rq_args = NULL;
1388
1389	xdrmbuf_create(&xdrs, m, XDR_DECODE);
1390	stat = xargs(&xdrs, args);
1391	XDR_DESTROY(&xdrs);
1392
1393	return (stat);
1394}
1395
1396bool_t
1397svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1398{
1399	XDR xdrs;
1400
1401	if (rqstp->rq_addr) {
1402		free(rqstp->rq_addr, M_SONAME);
1403		rqstp->rq_addr = NULL;
1404	}
1405
1406	xdrs.x_op = XDR_FREE;
1407	return (xargs(&xdrs, args));
1408}
1409
1410void
1411svc_freereq(struct svc_req *rqstp)
1412{
1413	SVCTHREAD *st;
1414	SVCPOOL *pool;
1415
1416	st = rqstp->rq_thread;
1417	if (st) {
1418		pool = st->st_pool;
1419		if (pool->sp_done)
1420			pool->sp_done(st, rqstp);
1421	}
1422
1423	if (rqstp->rq_auth.svc_ah_ops)
1424		SVCAUTH_RELEASE(&rqstp->rq_auth);
1425
1426	if (rqstp->rq_xprt) {
1427		SVC_RELEASE(rqstp->rq_xprt);
1428	}
1429
1430	if (rqstp->rq_addr)
1431		free(rqstp->rq_addr, M_SONAME);
1432
1433	if (rqstp->rq_args)
1434		m_freem(rqstp->rq_args);
1435
1436	free(rqstp, M_RPC);
1437}
1438