/*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/

/*-
 * Copyright (c) 2009, Sun Microsystems, Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 * - Redistributions of source code must retain the above copyright notice,
 *   this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 * - Neither the name of Sun Microsystems, Inc. nor the names of its
 *   contributors may be used to endorse or promote products derived
 *   from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#if defined(LIBC_SCCS) && !defined(lint)
static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
static char *sccsid = "@(#)svc.c	2.4 88/08/11 4.0 RPCSRC";
#endif
#include <sys/cdefs.h>
__FBSDID("$FreeBSD: stable/9/sys/rpc/svc.c 261057 2014-01-23 00:28:17Z mav $");

/*
 * svc.c, Server-side remote procedure call interface.
 *
 * There are two sets of procedures here.  The xprt routines are
 * for handling transport handles.  The svc routines handle the
 * list of service routines.
 *
 * Copyright (C) 1984, Sun Microsystems, Inc.
 */

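/*
 * Illustrative usage sketch (not part of this file): a kernel RPC
 * service built on this interface typically creates a pool, registers
 * a transport and a dispatch routine, and then donates the calling
 * thread to the pool.  MYPROG, MYVERS and my_dispatch() below are
 * hypothetical; the transport comes from a transport implementation
 * such as svc_vc_create() in svc_vc.c:
 *
 *	SVCPOOL *pool = svcpool_create("mysvc", NULL);
 *	SVCXPRT *xprt = svc_vc_create(pool, so, 0, 0);
 *
 *	svc_reg(xprt, MYPROG, MYVERS, my_dispatch, nconf);
 *	svc_run(pool);			(returns only after svc_exit(pool))
 *	svcpool_destroy(pool);
 */
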
#include <sys/param.h>
#include <sys/lock.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/queue.h>
#include <sys/socketvar.h>
#include <sys/systm.h>
#include <sys/ucred.h>

#include <rpc/rpc.h>
#include <rpc/rpcb_clnt.h>
#include <rpc/replay.h>

#include <rpc/rpc_com.h>

#define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)

static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
    char *);
static void svc_new_thread(SVCPOOL *pool);
static void xprt_unregister_locked(SVCXPRT *xprt);

/* ***************  SVCXPRT related stuff **************** */

static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);

SVCPOOL*
svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
{
	SVCPOOL *pool;

	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);

	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
	pool->sp_name = name;
	pool->sp_state = SVCPOOL_INIT;
	pool->sp_proc = NULL;
	TAILQ_INIT(&pool->sp_xlist);
	TAILQ_INIT(&pool->sp_active);
	TAILQ_INIT(&pool->sp_callouts);
	LIST_INIT(&pool->sp_threads);
	LIST_INIT(&pool->sp_idlethreads);
	pool->sp_minthreads = 1;
	pool->sp_maxthreads = 1;
	pool->sp_threadcount = 0;

	/*
	 * Don't use more than a quarter of mbuf clusters or more than
	 * 45MB of space for buffering requests.
	 */
	pool->sp_space_high = nmbclusters * MCLBYTES / 4;
	if (pool->sp_space_high > 45 << 20)
		pool->sp_space_high = 45 << 20;
	pool->sp_space_low = 2 * pool->sp_space_high / 3;

	sysctl_ctx_init(&pool->sp_sysctl);
	if (sysctl_base) {
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
		    pool, 0, svcpool_minthread_sysctl, "I", "");
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
		    pool, 0, svcpool_maxthread_sysctl, "I", "");
		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, "");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used", CTLFLAG_RD,
		    &pool->sp_space_used, 0,
		    "Space in parsed but not handled requests.");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used_highest", CTLFLAG_RD,
		    &pool->sp_space_used_highest, 0,
		    "Highest space used since reboot.");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_high", CTLFLAG_RW,
		    &pool->sp_space_high, 0,
		    "Maximum space in parsed but not handled requests.");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_low", CTLFLAG_RW,
		    &pool->sp_space_low, 0,
		    "Low water mark for request space.");

		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttled", CTLFLAG_RD,
		    &pool->sp_space_throttled, 0,
		    "Whether NFS requests are currently throttled");

		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttle_count", CTLFLAG_RD,
		    &pool->sp_space_throttle_count, 0,
		    "Count of times throttling based on request space has occurred");
	}

	return pool;
}
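
/*
 * Worked example of the limits chosen above (the figures depend on the
 * machine's nmbclusters setting and are illustrative only): with
 * nmbclusters = 65536 and MCLBYTES = 2048 the cluster pool is 128MB, a
 * quarter of which is 32MB, so sp_space_high = 32MB and sp_space_low is
 * two thirds of that, about 21MB.  On machines where the quarter would
 * exceed 45MB the cap applies instead, giving sp_space_high = 45MB and
 * sp_space_low = 30MB.
 */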

void
svcpool_destroy(SVCPOOL *pool)
{
	SVCXPRT *xprt, *nxprt;
	struct svc_callout *s;
	struct svcxprt_list cleanup;

	TAILQ_INIT(&cleanup);
	mtx_lock(&pool->sp_lock);

	while (TAILQ_FIRST(&pool->sp_xlist)) {
		xprt = TAILQ_FIRST(&pool->sp_xlist);
		xprt_unregister_locked(xprt);
		TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
	}

	while (TAILQ_FIRST(&pool->sp_callouts)) {
		s = TAILQ_FIRST(&pool->sp_callouts);
		mtx_unlock(&pool->sp_lock);
		svc_unreg(pool, s->sc_prog, s->sc_vers);
		mtx_lock(&pool->sp_lock);
	}
	mtx_unlock(&pool->sp_lock);

	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		SVC_RELEASE(xprt);
	}

	mtx_destroy(&pool->sp_lock);

	if (pool->sp_rcache)
		replay_freecache(pool->sp_rcache);

	sysctl_ctx_free(&pool->sp_sysctl);
	free(pool, M_RPC);
}

static bool_t
svcpool_active(SVCPOOL *pool)
{
	enum svcpool_state state = pool->sp_state;

	if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING)
		return (FALSE);
	return (TRUE);
}

/*
 * Sysctl handler to set the minimum thread count on a pool
 */
static int
svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int newminthreads, error, n;

	pool = oidp->oid_arg1;
	newminthreads = pool->sp_minthreads;
	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
	if (error == 0 && newminthreads != pool->sp_minthreads) {
		if (newminthreads > pool->sp_maxthreads)
			return (EINVAL);
		mtx_lock(&pool->sp_lock);
		if (newminthreads > pool->sp_minthreads
		    && svcpool_active(pool)) {
			/*
			 * If the pool is running and we are
			 * increasing, create some more threads now.
			 */
			n = newminthreads - pool->sp_threadcount;
			if (n > 0) {
				mtx_unlock(&pool->sp_lock);
				while (n--)
					svc_new_thread(pool);
				mtx_lock(&pool->sp_lock);
			}
		}
		pool->sp_minthreads = newminthreads;
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}

/*
 * Sysctl handler to set the maximum thread count on a pool
 */
static int
svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	SVCTHREAD *st;
	int newmaxthreads, error;

	pool = oidp->oid_arg1;
	newmaxthreads = pool->sp_maxthreads;
	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
		if (newmaxthreads < pool->sp_minthreads)
			return (EINVAL);
		mtx_lock(&pool->sp_lock);
		if (newmaxthreads < pool->sp_maxthreads
		    && svcpool_active(pool)) {
			/*
			 * If the pool is running and we are
			 * decreasing, wake up some idle threads to
			 * encourage them to exit.
			 */
			LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
				cv_signal(&st->st_cond);
		}
		pool->sp_maxthreads = newmaxthreads;
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}

/*
 * Activate a transport handle.
 */
void
xprt_register(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	SVC_ACQUIRE(xprt);
	mtx_lock(&pool->sp_lock);
	xprt->xp_registered = TRUE;
	xprt->xp_active = FALSE;
	TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link);
	mtx_unlock(&pool->sp_lock);
}

/*
 * De-activate a transport handle. Note: the locked version doesn't
 * release the transport - caller must do that after dropping the pool
 * lock.
 */
static void
xprt_unregister_locked(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	KASSERT(xprt->xp_registered == TRUE,
	    ("xprt_unregister_locked: not registered"));
	if (xprt->xp_active) {
		TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
		xprt->xp_active = FALSE;
	}
	TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link);
	xprt->xp_registered = FALSE;
}
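
/*
 * Caller protocol sketch for the locked variant above: transports taken
 * off the pool with xprt_unregister_locked() must only be released once
 * the pool lock has been dropped, typically by collecting them on a
 * private list first, as svcpool_destroy() and svc_checkidle() do:
 *
 *	mtx_lock(&pool->sp_lock);
 *	xprt_unregister_locked(xprt);
 *	TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
 *	mtx_unlock(&pool->sp_lock);
 *	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt)
 *		SVC_RELEASE(xprt);
 */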

void
xprt_unregister(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);
	if (xprt->xp_registered == FALSE) {
		/* Already unregistered by another thread */
		mtx_unlock(&pool->sp_lock);
		return;
	}
	xprt_unregister_locked(xprt);
	mtx_unlock(&pool->sp_lock);

	SVC_RELEASE(xprt);
}

static void
xprt_assignthread(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;
	SVCTHREAD *st;

	/*
	 * Attempt to assign a service thread to this
	 * transport.
	 */
	LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) {
		if (st->st_xprt == NULL && STAILQ_EMPTY(&st->st_reqs))
			break;
	}
	if (st) {
		SVC_ACQUIRE(xprt);
		xprt->xp_thread = st;
		st->st_xprt = xprt;
		cv_signal(&st->st_cond);
	} else {
		/*
		 * See if we can create a new thread. The
		 * actual thread creation happens in
		 * svc_run_internal because our locking state
		 * is poorly defined (we are typically called
		 * from a socket upcall). Don't create more
		 * than one thread per second.
		 */
		if (pool->sp_state == SVCPOOL_ACTIVE
		    && pool->sp_lastcreatetime < time_uptime
		    && pool->sp_threadcount < pool->sp_maxthreads) {
			pool->sp_state = SVCPOOL_THREADWANTED;
		}
	}
}

void
xprt_active(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);

	if (!xprt->xp_registered) {
		/*
		 * Race with xprt_unregister - we lose.
		 */
		mtx_unlock(&pool->sp_lock);
		return;
	}

	if (!xprt->xp_active) {
		TAILQ_INSERT_TAIL(&pool->sp_active, xprt, xp_alink);
		xprt->xp_active = TRUE;
		xprt_assignthread(xprt);
	}

	mtx_unlock(&pool->sp_lock);
}
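
/*
 * Illustrative sketch only (the real upcalls live in the transport
 * implementations such as svc_vc.c): a transport's socket receive
 * upcall marks the handle active, which either wakes an idle service
 * thread or asks svc_run_internal() to create a new one:
 *
 *	static int
 *	example_soupcall(struct socket *so, void *arg, int waitflag)
 *	{
 *		SVCXPRT *xprt = arg;
 *
 *		xprt_active(xprt);
 *		return (SU_OK);
 *	}
 */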

void
xprt_inactive_locked(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	if (xprt->xp_active) {
		TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
		xprt->xp_active = FALSE;
	}
}

void
xprt_inactive(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);
	xprt_inactive_locked(xprt);
	mtx_unlock(&pool->sp_lock);
}

/*
 * Add a service program to the callout list.
 * The dispatch routine will be called when an RPC request for this
 * program number comes in.
 */
bool_t
svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
    void (*dispatch)(struct svc_req *, SVCXPRT *),
    const struct netconfig *nconf)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_callout *s;
	char *netid = NULL;
	int flag = 0;

/* VARIABLES PROTECTED BY svc_lock: s, svc_head */

	if (xprt->xp_netid) {
		netid = strdup(xprt->xp_netid, M_RPC);
		flag = 1;
	} else if (nconf && nconf->nc_netid) {
		netid = strdup(nconf->nc_netid, M_RPC);
		flag = 1;
	} /* must have been created with svc_raw_create */
	if ((netid == NULL) && (flag == 1)) {
		return (FALSE);
	}

	mtx_lock(&pool->sp_lock);
	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
		if (netid)
			free(netid, M_RPC);
		if (s->sc_dispatch == dispatch)
			goto rpcb_it; /* he is registering another xprt */
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}
	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
	if (s == NULL) {
		if (netid)
			free(netid, M_RPC);
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}

	s->sc_prog = prog;
	s->sc_vers = vers;
	s->sc_dispatch = dispatch;
	s->sc_netid = netid;
	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);

	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);

rpcb_it:
	mtx_unlock(&pool->sp_lock);
	/* now register the information with the local binder service */
	if (nconf) {
		bool_t dummy;
		struct netconfig tnc;
		struct netbuf nb;
		tnc = *nconf;
		nb.buf = &xprt->xp_ltaddr;
		nb.len = xprt->xp_ltaddr.ss_len;
		dummy = rpcb_set(prog, vers, &tnc, &nb);
		return (dummy);
	}
	return (TRUE);
}
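
/*
 * Usage sketch (MYPROG, MYVERS and my_dispatch() are hypothetical): a
 * service registers each program/version it serves on a transport and
 * removes the registration again with svc_unreg() on shutdown.  Passing
 * a NULL netconfig skips the rpcbind registration:
 *
 *	if (!svc_reg(xprt, MYPROG, MYVERS, my_dispatch, NULL))
 *		printf("svc_reg failed\n");
 *	...
 *	svc_unreg(pool, MYPROG, MYVERS);
 */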

/*
 * Remove a service program from the callout list.
 */
void
svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
{
	struct svc_callout *s;

	/* unregister the information anyway */
	(void) rpcb_unset(prog, vers, NULL);
	mtx_lock(&pool->sp_lock);
	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
		if (s->sc_netid)
			mem_free(s->sc_netid, strlen(s->sc_netid) + 1);
		mem_free(s, sizeof (struct svc_callout));
	}
	mtx_unlock(&pool->sp_lock);
}

/* ********************** CALLOUT list related stuff ************* */

/*
 * Search the callout list for a program number, return the callout
 * struct.
 */
static struct svc_callout *
svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
{
	struct svc_callout *s;

	mtx_assert(&pool->sp_lock, MA_OWNED);
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == prog && s->sc_vers == vers
		    && (netid == NULL || s->sc_netid == NULL ||
			strcmp(netid, s->sc_netid) == 0))
			break;
	}

	return (s);
}

/* ******************* REPLY GENERATION ROUTINES  ************ */

static bool_t
svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
    struct mbuf *body)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	bool_t ok;

	if (rqstp->rq_args) {
		m_freem(rqstp->rq_args);
		rqstp->rq_args = NULL;
	}

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    rply, svc_getrpccaller(rqstp), body);

	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
		return (FALSE);

	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body);
	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	return (ok);
}

/*
 * Send a reply to an rpc request
 */
bool_t
svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
{
	struct rpc_msg rply;
	struct mbuf *m;
	XDR xdrs;
	bool_t ok;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	MGET(m, M_WAIT, MT_DATA);
	MCLGET(m, M_WAIT);
	m->m_len = 0;
	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
	ok = xdr_results(&xdrs, xdr_location);
	XDR_DESTROY(&xdrs);

	if (ok) {
		return (svc_sendreply_common(rqstp, &rply, m));
	} else {
		m_freem(m);
		return (FALSE);
	}
}
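
/*
 * A minimal dispatch routine of the kind passed to svc_reg(), showing
 * the expected argument/reply/free sequence.  The procedure number,
 * argument and result types and the handler are hypothetical; the NFS
 * and NLM dispatchers are real examples.  Note that ownership of rqstp
 * passes to the dispatcher, which must finish with svc_freereq():
 *
 *	static void
 *	my_dispatch(struct svc_req *rqstp, SVCXPRT *xprt)
 *	{
 *		my_args args;
 *		my_res res;
 *
 *		switch (rqstp->rq_proc) {
 *		case NULLPROC:
 *			svc_sendreply(rqstp, (xdrproc_t) xdr_void, NULL);
 *			break;
 *		case MYPROC_DOIT:
 *			memset(&args, 0, sizeof(args));
 *			if (!svc_getargs(rqstp, (xdrproc_t) xdr_my_args,
 *			    &args)) {
 *				svcerr_decode(rqstp);
 *				break;
 *			}
 *			my_handler(&args, &res);
 *			svc_sendreply(rqstp, (xdrproc_t) xdr_my_res, &res);
 *			svc_freeargs(rqstp, (xdrproc_t) xdr_my_args, &args);
 *			break;
 *		default:
 *			svcerr_noproc(rqstp);
 *			break;
 *		}
 *		svc_freereq(rqstp);
 *	}
 */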

bool_t
svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
{
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	return (svc_sendreply_common(rqstp, &rply, m));
}

/*
 * No procedure error reply
 */
void
svcerr_noproc(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROC_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Can't decode args error reply
 */
void
svcerr_decode(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = GARBAGE_ARGS;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Some system error
 */
void
svcerr_systemerr(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SYSTEM_ERR;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Authentication error reply
 */
void
svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_DENIED;
	rply.rjcted_rply.rj_stat = AUTH_ERROR;
	rply.rjcted_rply.rj_why = why;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Auth too weak error reply
 */
void
svcerr_weakauth(struct svc_req *rqstp)
{

	svcerr_auth(rqstp, AUTH_TOOWEAK);
}

/*
 * Program unavailable error reply
 */
void
svcerr_noprog(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Program version mismatch error reply
 */
void
svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_MISMATCH;
	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Allocate a new server transport structure. All fields are
 * initialized to zero and xp_p3 is initialized to point at an
 * extension structure to hold various flags and authentication
 * parameters.
 */
SVCXPRT *
svc_xprt_alloc(void)
{
	SVCXPRT *xprt;
	SVCXPRT_EXT *ext;

	xprt = mem_alloc(sizeof(SVCXPRT));
	memset(xprt, 0, sizeof(SVCXPRT));
	ext = mem_alloc(sizeof(SVCXPRT_EXT));
	memset(ext, 0, sizeof(SVCXPRT_EXT));
	xprt->xp_p3 = ext;
	refcount_init(&xprt->xp_refs, 1);

	return (xprt);
}
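
/*
 * Sketch of how a transport implementation uses the allocator above
 * (loosely modelled on svc_vc_create(); the exact fields and ops
 * structure filled in are the transport's own business):
 *
 *	xprt = svc_xprt_alloc();
 *	sx_init(&xprt->xp_lock, "xprt->xp_lock");
 *	xprt->xp_pool = pool;
 *	xprt->xp_socket = so;
 *	xprt->xp_ops = &svc_vc_ops;
 *	xprt_register(xprt);
 */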

/*
 * Free a server transport structure.
 */
void
svc_xprt_free(SVCXPRT *xprt)
{

	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
	mem_free(xprt, sizeof(SVCXPRT));
}

/* ******************* SERVER INPUT STUFF ******************* */

/*
 * Read RPC requests from a transport and queue them to be
 * executed. We handle authentication and replay cache replies here.
 * Actually dispatching the RPC is deferred till svc_executereq.
 */
static enum xprt_stat
svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_req *r;
	struct rpc_msg msg;
	struct mbuf *args;
	enum xprt_stat stat;

	/* now receive msgs from the transport (support batch calls) */
	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);

	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
		enum auth_stat why;

		/*
		 * Handle replays and authenticate before queuing the
		 * request to be executed.
		 */
		SVC_ACQUIRE(xprt);
		r->rq_xprt = xprt;
		if (pool->sp_rcache) {
			struct rpc_msg repmsg;
			struct mbuf *repbody;
			enum replay_state rs;
			rs = replay_find(pool->sp_rcache, &msg,
			    svc_getrpccaller(r), &repmsg, &repbody);
			switch (rs) {
			case RS_NEW:
				break;
			case RS_DONE:
				SVC_REPLY(xprt, &repmsg, r->rq_addr,
				    repbody);
				if (r->rq_addr) {
					free(r->rq_addr, M_SONAME);
					r->rq_addr = NULL;
				}
				m_freem(args);
				goto call_done;

			default:
				m_freem(args);
				goto call_done;
			}
		}

		r->rq_xid = msg.rm_xid;
		r->rq_prog = msg.rm_call.cb_prog;
		r->rq_vers = msg.rm_call.cb_vers;
		r->rq_proc = msg.rm_call.cb_proc;
		r->rq_size = sizeof(*r) + m_length(args, NULL);
		r->rq_args = args;
		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
			/*
			 * RPCSEC_GSS uses this return code
			 * for requests that form part of its
			 * context establishment protocol and
			 * should not be dispatched to the
			 * application.
			 */
			if (why != RPCSEC_GSS_NODISPATCH)
				svcerr_auth(r, why);
			goto call_done;
		}

		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
			svcerr_decode(r);
			goto call_done;
		}

		/*
		 * Everything checks out, return request to caller.
		 */
		*rqstp_ret = r;
		r = NULL;
	}
call_done:
	if (r) {
		svc_freereq(r);
		r = NULL;
	}
	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
		xprt_unregister(xprt);
	}

	return (stat);
}

static void
svc_executereq(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	SVCPOOL *pool = xprt->xp_pool;
	int prog_found;
	rpcvers_t low_vers;
	rpcvers_t high_vers;
	struct svc_callout *s;

	/* now match message with a registered service */
	prog_found = FALSE;
	low_vers = (rpcvers_t) -1L;
	high_vers = (rpcvers_t) 0L;
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == rqstp->rq_prog) {
			if (s->sc_vers == rqstp->rq_vers) {
				/*
				 * We hand ownership of rqstp to the
				 * dispatch method - it must call
				 * svc_freereq.
				 */
				(*s->sc_dispatch)(rqstp, xprt);
				return;
			}  /* found correct version */
			prog_found = TRUE;
			if (s->sc_vers < low_vers)
				low_vers = s->sc_vers;
			if (s->sc_vers > high_vers)
				high_vers = s->sc_vers;
		}   /* found correct program */
	}

	/*
	 * If we got here, the program or version
	 * is not served.
	 */
	if (prog_found)
		svcerr_progvers(rqstp, low_vers, high_vers);
	else
		svcerr_noprog(rqstp);

	svc_freereq(rqstp);
}

static void
svc_checkidle(SVCPOOL *pool)
{
	SVCXPRT *xprt, *nxprt;
	time_t timo;
	struct svcxprt_list cleanup;

	TAILQ_INIT(&cleanup);
	TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) {
		/*
		 * Only some transports have idle timers. Don't time
		 * something out which is just waking up.
		 */
		if (!xprt->xp_idletimeout || xprt->xp_thread)
			continue;

		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
		if (time_uptime > timo) {
			xprt_unregister_locked(xprt);
			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
		}
	}

	mtx_unlock(&pool->sp_lock);
	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		SVC_RELEASE(xprt);
	}
	mtx_lock(&pool->sp_lock);
}

static void
svc_assign_waiting_sockets(SVCPOOL *pool)
{
	SVCXPRT *xprt;

	TAILQ_FOREACH(xprt, &pool->sp_active, xp_alink) {
		if (!xprt->xp_thread) {
			xprt_assignthread(xprt);
		}
	}
}

static bool_t
svc_request_space_available(SVCPOOL *pool)
{

	mtx_assert(&pool->sp_lock, MA_OWNED);

	if (pool->sp_space_throttled) {
		/*
		 * Below the low-water mark yet? If so, assign any waiting sockets.
		 */
		if (pool->sp_space_used < pool->sp_space_low) {
			pool->sp_space_throttled = FALSE;
			svc_assign_waiting_sockets(pool);
			return TRUE;
		}

		return FALSE;
	} else {
		if (pool->sp_space_used
		    >= pool->sp_space_high) {
			pool->sp_space_throttled = TRUE;
			pool->sp_space_throttle_count++;
			return FALSE;
		}

		return TRUE;
	}
}

static void
svc_run_internal(SVCPOOL *pool, bool_t ismaster)
{
	SVCTHREAD *st, *stpref;
	SVCXPRT *xprt;
	enum xprt_stat stat;
	struct svc_req *rqstp;
	int error;

	st = mem_alloc(sizeof(*st));
	st->st_xprt = NULL;
	STAILQ_INIT(&st->st_reqs);
	cv_init(&st->st_cond, "rpcsvc");

	mtx_lock(&pool->sp_lock);
	LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);

	/*
	 * If we are a new thread which was spawned to cope with
	 * increased load, set the state back to SVCPOOL_ACTIVE.
	 */
	if (pool->sp_state == SVCPOOL_THREADSTARTING)
		pool->sp_state = SVCPOOL_ACTIVE;

	while (pool->sp_state != SVCPOOL_CLOSING) {
		/*
		 * Create new thread if requested.
		 */
		if (pool->sp_state == SVCPOOL_THREADWANTED) {
			pool->sp_state = SVCPOOL_THREADSTARTING;
			pool->sp_lastcreatetime = time_uptime;
			mtx_unlock(&pool->sp_lock);
			svc_new_thread(pool);
			mtx_lock(&pool->sp_lock);
			continue;
		}

		/*
		 * Check for idle transports once per second.
		 */
		if (time_uptime > pool->sp_lastidlecheck) {
			pool->sp_lastidlecheck = time_uptime;
			svc_checkidle(pool);
		}

		xprt = st->st_xprt;
		if (!xprt && STAILQ_EMPTY(&st->st_reqs)) {
			/*
			 * Enforce maxthreads count.
			 */
			if (pool->sp_threadcount > pool->sp_maxthreads)
				break;

			/*
			 * Before sleeping, see if we can find an
			 * active transport which isn't being serviced
			 * by a thread.
			 */
			if (svc_request_space_available(pool)) {
				TAILQ_FOREACH(xprt, &pool->sp_active,
				    xp_alink) {
					if (!xprt->xp_thread) {
						SVC_ACQUIRE(xprt);
						xprt->xp_thread = st;
						st->st_xprt = xprt;
						break;
					}
				}
			}
			if (st->st_xprt)
				continue;

			LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink);
			if (ismaster || (!ismaster &&
			    pool->sp_threadcount > pool->sp_minthreads))
				error = cv_timedwait_sig(&st->st_cond,
				    &pool->sp_lock, 5 * hz);
			else
				error = cv_wait_sig(&st->st_cond,
				    &pool->sp_lock);
			LIST_REMOVE(st, st_ilink);

			/*
			 * Reduce worker thread count when idle.
			 */
			if (error == EWOULDBLOCK) {
				if (!ismaster
				    && (pool->sp_threadcount
					> pool->sp_minthreads)
					&& !st->st_xprt
					&& STAILQ_EMPTY(&st->st_reqs))
					break;
			} else if (error) {
				mtx_unlock(&pool->sp_lock);
				svc_exit(pool);
				mtx_lock(&pool->sp_lock);
				break;
			}
			continue;
		}

		if (xprt) {
			/*
			 * Drain the transport socket and queue up any
			 * RPCs.
			 */
			xprt->xp_lastactive = time_uptime;
			stat = XPRT_IDLE;
			do {
				if (!svc_request_space_available(pool))
					break;
				rqstp = NULL;
				mtx_unlock(&pool->sp_lock);
				stat = svc_getreq(xprt, &rqstp);
				mtx_lock(&pool->sp_lock);
				if (rqstp) {
					/*
					 * See if the application has
					 * a preference for some other
					 * thread.
					 */
					stpref = st;
					if (pool->sp_assign)
						stpref = pool->sp_assign(st,
						    rqstp);

					pool->sp_space_used +=
						rqstp->rq_size;
					if (pool->sp_space_used
					    > pool->sp_space_used_highest)
						pool->sp_space_used_highest =
							pool->sp_space_used;
					rqstp->rq_thread = stpref;
					STAILQ_INSERT_TAIL(&stpref->st_reqs,
					    rqstp, rq_link);
					stpref->st_reqcount++;

					/*
					 * If we assigned the request
					 * to another thread, make
					 * sure it's awake and continue
					 * reading from the
					 * socket. Otherwise, try to
					 * find some other thread to
					 * read from the socket and
					 * execute the request
					 * immediately.
					 */
					if (stpref != st) {
						cv_signal(&stpref->st_cond);
						continue;
					} else {
						break;
					}
				}
			} while (stat == XPRT_MOREREQS
			    && pool->sp_state != SVCPOOL_CLOSING);

			/*
			 * Move this transport to the end of the
			 * active list to ensure fairness when
			 * multiple transports are active. If this was
			 * the last queued request, svc_getreq will
			 * end up calling xprt_inactive to remove from
			 * the active list.
			 */
			xprt->xp_thread = NULL;
			st->st_xprt = NULL;
			if (xprt->xp_active) {
				xprt_assignthread(xprt);
				TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
				TAILQ_INSERT_TAIL(&pool->sp_active, xprt,
				    xp_alink);
			}
			mtx_unlock(&pool->sp_lock);
			SVC_RELEASE(xprt);
			mtx_lock(&pool->sp_lock);
		}

		/*
		 * Execute what we have queued.
		 */
		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
			size_t sz = rqstp->rq_size;
			mtx_unlock(&pool->sp_lock);
			svc_executereq(rqstp);
			mtx_lock(&pool->sp_lock);
			pool->sp_space_used -= sz;
		}
	}

	if (st->st_xprt) {
		xprt = st->st_xprt;
		st->st_xprt = NULL;
		SVC_RELEASE(xprt);
	}

	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
	LIST_REMOVE(st, st_link);
	pool->sp_threadcount--;

	mtx_unlock(&pool->sp_lock);

	cv_destroy(&st->st_cond);
	mem_free(st, sizeof(*st));

	if (!ismaster)
		wakeup(pool);
}

static void
svc_thread_start(void *arg)
{

	svc_run_internal((SVCPOOL *) arg, FALSE);
	kthread_exit();
}

static void
svc_new_thread(SVCPOOL *pool)
{
	struct thread *td;

	pool->sp_threadcount++;
	kthread_add(svc_thread_start, pool,
	    pool->sp_proc, &td, 0, 0,
	    "%s: service", pool->sp_name);
}

void
svc_run(SVCPOOL *pool)
{
	int i;
	struct proc *p;
	struct thread *td;

	p = curproc;
	td = curthread;
	snprintf(td->td_name, sizeof(td->td_name),
	    "%s: master", pool->sp_name);
	pool->sp_state = SVCPOOL_ACTIVE;
	pool->sp_proc = p;
	pool->sp_lastcreatetime = time_uptime;
	pool->sp_threadcount = 1;

	for (i = 1; i < pool->sp_minthreads; i++) {
		svc_new_thread(pool);
	}

	svc_run_internal(pool, TRUE);

	mtx_lock(&pool->sp_lock);
	while (pool->sp_threadcount > 0)
		msleep(pool, &pool->sp_lock, 0, "svcexit", 0);
	mtx_unlock(&pool->sp_lock);
}
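
/*
 * Shutdown sketch: some other context (a signal to the master thread or
 * a module unload path) calls svc_exit() to move the pool to
 * SVCPOOL_CLOSING; svc_run() then waits for the worker threads to drain
 * before returning, after which the caller can unregister its programs
 * and destroy the pool (MYPROG and MYVERS are hypothetical):
 *
 *	svc_exit(pool);
 *	... svc_run() returns in the master thread ...
 *	svc_unreg(pool, MYPROG, MYVERS);
 *	svcpool_destroy(pool);
 */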

void
svc_exit(SVCPOOL *pool)
{
	SVCTHREAD *st;

	mtx_lock(&pool->sp_lock);

	if (pool->sp_state != SVCPOOL_CLOSING) {
		pool->sp_state = SVCPOOL_CLOSING;
		LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
			cv_signal(&st->st_cond);
	}

	mtx_unlock(&pool->sp_lock);
}

bool_t
svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
	struct mbuf *m;
	XDR xdrs;
	bool_t stat;

	m = rqstp->rq_args;
	rqstp->rq_args = NULL;

	xdrmbuf_create(&xdrs, m, XDR_DECODE);
	stat = xargs(&xdrs, args);
	XDR_DESTROY(&xdrs);

	return (stat);
}

bool_t
svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
	XDR xdrs;

	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	xdrs.x_op = XDR_FREE;
	return (xargs(&xdrs, args));
}

void
svc_freereq(struct svc_req *rqstp)
{
	SVCTHREAD *st;
	SVCXPRT *xprt;
	SVCPOOL *pool;

	st = rqstp->rq_thread;
	xprt = rqstp->rq_xprt;
	if (xprt)
		pool = xprt->xp_pool;
	else
		pool = NULL;
	if (st) {
		mtx_lock(&pool->sp_lock);
		KASSERT(rqstp == STAILQ_FIRST(&st->st_reqs),
		    ("Freeing request out of order"));
		STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
		st->st_reqcount--;
		if (pool->sp_done)
			pool->sp_done(st, rqstp);
		mtx_unlock(&pool->sp_lock);
	}

	if (rqstp->rq_auth.svc_ah_ops)
		SVCAUTH_RELEASE(&rqstp->rq_auth);

	if (rqstp->rq_xprt) {
		SVC_RELEASE(rqstp->rq_xprt);
	}

	if (rqstp->rq_addr)
		free(rqstp->rq_addr, M_SONAME);

	if (rqstp->rq_args)
		m_freem(rqstp->rq_args);

	free(rqstp, M_RPC);
}