svc.c revision 261066
/*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/

/*-
 * Copyright (c) 2009, Sun Microsystems, Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 * - Redistributions of source code must retain the above copyright notice,
 *   this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 * - Neither the name of Sun Microsystems, Inc. nor the names of its
 *   contributors may be used to endorse or promote products derived
 *   from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#if defined(LIBC_SCCS) && !defined(lint)
static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
static char *sccsid = "@(#)svc.c	2.4 88/08/11 4.0 RPCSRC";
#endif
#include <sys/cdefs.h>
__FBSDID("$FreeBSD: stable/9/sys/rpc/svc.c 261066 2014-01-23 00:45:20Z mav $");

/*
 * svc.c, Server-side remote procedure call interface.
 *
 * There are two sets of procedures here.  The xprt routines are
 * for handling transport handles.  The svc routines handle the
 * list of service routines.
 *
 * Copyright (C) 1984, Sun Microsystems, Inc.
 */

#include <sys/param.h>
#include <sys/lock.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/queue.h>
#include <sys/socketvar.h>
#include <sys/systm.h>
#include <sys/ucred.h>

#include <rpc/rpc.h>
#include <rpc/rpcb_clnt.h>
#include <rpc/replay.h>

#include <rpc/rpc_com.h>

#define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)

static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
    char *);
static void svc_new_thread(SVCPOOL *pool);
static void xprt_unregister_locked(SVCXPRT *xprt);
static void svc_change_space_used(SVCPOOL *pool, int delta);
static bool_t svc_request_space_available(SVCPOOL *pool);

/* ***************  SVCXPRT related stuff **************** */

static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);

SVCPOOL*
svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
{
	SVCPOOL *pool;

	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);

	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
	pool->sp_name = name;
	pool->sp_state = SVCPOOL_INIT;
	pool->sp_proc = NULL;
	TAILQ_INIT(&pool->sp_xlist);
	TAILQ_INIT(&pool->sp_active);
	TAILQ_INIT(&pool->sp_callouts);
	LIST_INIT(&pool->sp_threads);
	LIST_INIT(&pool->sp_idlethreads);
	pool->sp_minthreads = 1;
	pool->sp_maxthreads = 1;
	pool->sp_threadcount = 0;

	/*
	 * Don't use more than a quarter of mbuf clusters, or more than
	 * 45MB, for buffering requests.
	 */
	pool->sp_space_high = nmbclusters * MCLBYTES / 4;
	if (pool->sp_space_high > 45 << 20)
		pool->sp_space_high = 45 << 20;
	pool->sp_space_low = 2 * pool->sp_space_high / 3;

	sysctl_ctx_init(&pool->sp_sysctl);
	if (sysctl_base) {
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
		    pool, 0, svcpool_minthread_sysctl, "I", "");
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
		    pool, 0, svcpool_maxthread_sysctl, "I", "");
		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, "");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used", CTLFLAG_RD,
		    &pool->sp_space_used, 0,
		    "Space in parsed but not handled requests.");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used_highest", CTLFLAG_RD,
		    &pool->sp_space_used_highest, 0,
		    "Highest space used since reboot.");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_high", CTLFLAG_RW,
		    &pool->sp_space_high, 0,
		    "Maximum space in parsed but not handled requests.");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_low", CTLFLAG_RW,
		    &pool->sp_space_low, 0,
		    "Low water mark for request space.");

		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttled", CTLFLAG_RD,
		    &pool->sp_space_throttled, 0,
		    "Whether nfs requests are currently throttled");

		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttle_count", CTLFLAG_RD,
		    &pool->sp_space_throttle_count, 0,
		    "Count of times throttling based on request space has occurred");
	}

	return pool;
}
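
/*
 * A minimal pool lifecycle, as a sketch.  MYPROG, MYVERS and
 * mydispatch are illustrative names, and the transport would normally
 * be created by one of the transport back ends (e.g. svc_vc_create()
 * for TCP) before being registered:
 *
 *	SVCPOOL *pool = svcpool_create("myrpc", NULL);
 *	... create xprt for this pool and a netconfig nconf ...
 *	svc_reg(xprt, MYPROG, MYVERS, mydispatch, nconf);
 *	svc_run(pool);			// returns after svc_exit(pool)
 *	svcpool_destroy(pool);
 */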

void
svcpool_destroy(SVCPOOL *pool)
{
	SVCXPRT *xprt, *nxprt;
	struct svc_callout *s;
	struct svcxprt_list cleanup;

	TAILQ_INIT(&cleanup);
	mtx_lock(&pool->sp_lock);

	while (TAILQ_FIRST(&pool->sp_xlist)) {
		xprt = TAILQ_FIRST(&pool->sp_xlist);
		xprt_unregister_locked(xprt);
		TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
	}

	while (TAILQ_FIRST(&pool->sp_callouts)) {
		s = TAILQ_FIRST(&pool->sp_callouts);
		mtx_unlock(&pool->sp_lock);
		svc_unreg(pool, s->sc_prog, s->sc_vers);
		mtx_lock(&pool->sp_lock);
	}
	mtx_unlock(&pool->sp_lock);

	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		SVC_RELEASE(xprt);
	}

	mtx_destroy(&pool->sp_lock);

	if (pool->sp_rcache)
		replay_freecache(pool->sp_rcache);

	sysctl_ctx_free(&pool->sp_sysctl);
	free(pool, M_RPC);
}

static bool_t
svcpool_active(SVCPOOL *pool)
{
	enum svcpool_state state = pool->sp_state;

	if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING)
		return (FALSE);
	return (TRUE);
}

/*
 * Sysctl handler to set the minimum thread count on a pool
 */
static int
svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int newminthreads, error, n;

	pool = oidp->oid_arg1;
	newminthreads = pool->sp_minthreads;
	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
	if (error == 0 && newminthreads != pool->sp_minthreads) {
		if (newminthreads > pool->sp_maxthreads)
			return (EINVAL);
		mtx_lock(&pool->sp_lock);
		if (newminthreads > pool->sp_minthreads
		    && svcpool_active(pool)) {
			/*
			 * If the pool is running and we are
			 * increasing, create some more threads now.
			 */
			n = newminthreads - pool->sp_threadcount;
			if (n > 0) {
				mtx_unlock(&pool->sp_lock);
				while (n--)
					svc_new_thread(pool);
				mtx_lock(&pool->sp_lock);
			}
		}
		pool->sp_minthreads = newminthreads;
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}

/*
 * Sysctl handler to set the maximum thread count on a pool
 */
static int
svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	SVCTHREAD *st;
	int newmaxthreads, error;

	pool = oidp->oid_arg1;
	newmaxthreads = pool->sp_maxthreads;
	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
		if (newmaxthreads < pool->sp_minthreads)
			return (EINVAL);
		mtx_lock(&pool->sp_lock);
		if (newmaxthreads < pool->sp_maxthreads
		    && svcpool_active(pool)) {
			/*
			 * If the pool is running and we are
			 * decreasing, wake up some idle threads to
			 * encourage them to exit.
			 */
			LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
				cv_signal(&st->st_cond);
		}
		pool->sp_maxthreads = newmaxthreads;
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}

/*
 * Activate a transport handle.
 */
void
xprt_register(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	SVC_ACQUIRE(xprt);
	mtx_lock(&pool->sp_lock);
	xprt->xp_registered = TRUE;
	xprt->xp_active = FALSE;
	TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link);
	mtx_unlock(&pool->sp_lock);
}

/*
 * De-activate a transport handle. Note: the locked version doesn't
 * release the transport - caller must do that after dropping the pool
 * lock.
 */
static void
xprt_unregister_locked(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	mtx_assert(&pool->sp_lock, MA_OWNED);
	KASSERT(xprt->xp_registered == TRUE,
	    ("xprt_unregister_locked: not registered"));
	xprt_inactive_locked(xprt);
	TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link);
	xprt->xp_registered = FALSE;
}

void
xprt_unregister(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);
	if (xprt->xp_registered == FALSE) {
		/* Already unregistered by another thread */
		mtx_unlock(&pool->sp_lock);
		return;
	}
	xprt_unregister_locked(xprt);
	mtx_unlock(&pool->sp_lock);

	SVC_RELEASE(xprt);
}

/*
 * Attempt to assign a service thread to this transport.
 */
static int
xprt_assignthread(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;
	SVCTHREAD *st;

	mtx_assert(&pool->sp_lock, MA_OWNED);
	st = LIST_FIRST(&pool->sp_idlethreads);
	if (st) {
		LIST_REMOVE(st, st_ilink);
		st->st_idle = FALSE;
		SVC_ACQUIRE(xprt);
		xprt->xp_thread = st;
		st->st_xprt = xprt;
		cv_signal(&st->st_cond);
		return (TRUE);
	} else {
		/*
		 * See if we can create a new thread. The
		 * actual thread creation happens in
		 * svc_run_internal because our locking state
		 * is poorly defined (we are typically called
		 * from a socket upcall). Don't create more
		 * than one thread per second.
		 */
		if (pool->sp_state == SVCPOOL_ACTIVE
		    && pool->sp_lastcreatetime < time_uptime
		    && pool->sp_threadcount < pool->sp_maxthreads) {
			pool->sp_state = SVCPOOL_THREADWANTED;
		}
	}
	return (FALSE);
}

void
xprt_active(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);

	if (!xprt->xp_registered) {
		/*
		 * Race with xprt_unregister - we lose.
		 */
		mtx_unlock(&pool->sp_lock);
		return;
	}

	if (!xprt->xp_active) {
		xprt->xp_active = TRUE;
		if (xprt->xp_thread == NULL) {
			if (!svc_request_space_available(pool) ||
			    !xprt_assignthread(xprt))
				TAILQ_INSERT_TAIL(&pool->sp_active, xprt,
				    xp_alink);
		}
	}

	mtx_unlock(&pool->sp_lock);
}

void
xprt_inactive_locked(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	mtx_assert(&pool->sp_lock, MA_OWNED);
	if (xprt->xp_active) {
		if (xprt->xp_thread == NULL)
			TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
		xprt->xp_active = FALSE;
	}
}

void
xprt_inactive(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);
	xprt_inactive_locked(xprt);
	mtx_unlock(&pool->sp_lock);
}

/*
 * Variant of xprt_inactive() for use only when the caller is sure that
 * the transport is assigned to a thread, for example within receive
 * handlers.
 */
void
xprt_inactive_self(SVCXPRT *xprt)
{

	KASSERT(xprt->xp_thread != NULL,
	    ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
	xprt->xp_active = FALSE;
}
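
/*
 * To summarize the transport activity protocol above: xp_active is
 * set while a transport may have work pending.  An active transport
 * is either assigned to a service thread (xp_thread != NULL) or
 * queued on the pool's sp_active list waiting for one;
 * xprt_assignthread() moves it from the queue to an idle thread, and
 * the xprt_inactive*() routines clear the flag and dequeue it once
 * the pending work has been consumed.
 */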

/*
 * Add a service program to the callout list.
 * The dispatch routine will be called when an RPC request for this
 * program number comes in.
 */
bool_t
svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
    void (*dispatch)(struct svc_req *, SVCXPRT *),
    const struct netconfig *nconf)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_callout *s;
	char *netid = NULL;
	int flag = 0;

/* VARIABLES PROTECTED BY svc_lock: s, svc_head */

	if (xprt->xp_netid) {
		netid = strdup(xprt->xp_netid, M_RPC);
		flag = 1;
	} else if (nconf && nconf->nc_netid) {
		netid = strdup(nconf->nc_netid, M_RPC);
		flag = 1;
	} /* must have been created with svc_raw_create */
	if ((netid == NULL) && (flag == 1)) {
		return (FALSE);
	}

	mtx_lock(&pool->sp_lock);
	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
		if (netid)
			free(netid, M_RPC);
		if (s->sc_dispatch == dispatch)
			goto rpcb_it; /* the caller is registering another xprt */
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}
	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
	if (s == NULL) {
		if (netid)
			free(netid, M_RPC);
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}

	s->sc_prog = prog;
	s->sc_vers = vers;
	s->sc_dispatch = dispatch;
	s->sc_netid = netid;
	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);

	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);

rpcb_it:
	mtx_unlock(&pool->sp_lock);
	/* now register the information with the local binder service */
	if (nconf) {
		bool_t dummy;
		struct netconfig tnc;
		struct netbuf nb;
		tnc = *nconf;
		nb.buf = &xprt->xp_ltaddr;
		nb.len = xprt->xp_ltaddr.ss_len;
		dummy = rpcb_set(prog, vers, &tnc, &nb);
		return (dummy);
	}
	return (TRUE);
}
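
/*
 * Note that the dispatch routine registered here is handed ownership
 * of each request it is given (see svc_executereq() below) and must
 * eventually release it with svc_freereq().
 */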

/*
 * Remove a service program from the callout list.
 */
void
svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
{
	struct svc_callout *s;

	/* unregister the information anyway */
	(void) rpcb_unset(prog, vers, NULL);
	mtx_lock(&pool->sp_lock);
	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
		if (s->sc_netid)
			mem_free(s->sc_netid, strlen(s->sc_netid) + 1);
		mem_free(s, sizeof (struct svc_callout));
	}
	mtx_unlock(&pool->sp_lock);
}

/* ********************** CALLOUT list related stuff ************* */

/*
 * Search the callout list for a program number, return the callout
 * struct.
 */
static struct svc_callout *
svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
{
	struct svc_callout *s;

	mtx_assert(&pool->sp_lock, MA_OWNED);
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == prog && s->sc_vers == vers
		    && (netid == NULL || s->sc_netid == NULL ||
			strcmp(netid, s->sc_netid) == 0))
			break;
	}

	return (s);
}

/* ******************* REPLY GENERATION ROUTINES  ************ */

static bool_t
svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
    struct mbuf *body)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	bool_t ok;

	if (rqstp->rq_args) {
		m_freem(rqstp->rq_args);
		rqstp->rq_args = NULL;
	}

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    rply, svc_getrpccaller(rqstp), body);

	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
		return (FALSE);

	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body);
	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	return (ok);
}

/*
 * Send a reply to an rpc request
 */
bool_t
svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void *xdr_location)
{
	struct rpc_msg rply;
	struct mbuf *m;
	XDR xdrs;
	bool_t ok;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	MGET(m, M_WAIT, MT_DATA);
	MCLGET(m, M_WAIT);
	m->m_len = 0;
	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
	ok = xdr_results(&xdrs, xdr_location);
	XDR_DESTROY(&xdrs);

	if (ok) {
		return (svc_sendreply_common(rqstp, &rply, m));
	} else {
		m_freem(m);
		return (FALSE);
	}
}

bool_t
svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
{
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	return (svc_sendreply_common(rqstp, &rply, m));
}
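
/*
 * A minimal dispatch routine, as a sketch.  MYPROC_DOIT, do_it() and
 * the XDR routines xdr_myargs()/xdr_myres() are illustrative names;
 * the svc_* and svcerr_* calls are the ones defined in this file:
 *
 *	static void
 *	mydispatch(struct svc_req *rqstp, SVCXPRT *xprt)
 *	{
 *		struct myargs args;
 *		struct myres res;
 *
 *		switch (rqstp->rq_proc) {
 *		case NULLPROC:
 *			svc_sendreply(rqstp, (xdrproc_t) xdr_void, NULL);
 *			break;
 *		case MYPROC_DOIT:
 *			memset(&args, 0, sizeof(args));
 *			if (!svc_getargs(rqstp, (xdrproc_t) xdr_myargs,
 *			    &args)) {
 *				svcerr_decode(rqstp);
 *				break;
 *			}
 *			do_it(&args, &res);
 *			svc_sendreply(rqstp, (xdrproc_t) xdr_myres, &res);
 *			svc_freeargs(rqstp, (xdrproc_t) xdr_myargs, &args);
 *			break;
 *		default:
 *			svcerr_noproc(rqstp);
 *			break;
 *		}
 *		svc_freereq(rqstp);
 *	}
 */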

/*
 * No procedure error reply
 */
void
svcerr_noproc(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROC_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Can't decode args error reply
 */
void
svcerr_decode(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = GARBAGE_ARGS;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Some system error
 */
void
svcerr_systemerr(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SYSTEM_ERR;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Authentication error reply
 */
void
svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_DENIED;
	rply.rjcted_rply.rj_stat = AUTH_ERROR;
	rply.rjcted_rply.rj_why = why;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Auth too weak error reply
 */
void
svcerr_weakauth(struct svc_req *rqstp)
{

	svcerr_auth(rqstp, AUTH_TOOWEAK);
}

/*
 * Program unavailable error reply
 */
void
svcerr_noprog(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Program version mismatch error reply
 */
void
svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_MISMATCH;
	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Allocate a new server transport structure. All fields are
 * initialized to zero and xp_p3 is initialized to point at an
 * extension structure to hold various flags and authentication
 * parameters.
 */
SVCXPRT *
svc_xprt_alloc(void)
{
	SVCXPRT *xprt;
	SVCXPRT_EXT *ext;

	xprt = mem_alloc(sizeof(SVCXPRT));
	memset(xprt, 0, sizeof(SVCXPRT));
	ext = mem_alloc(sizeof(SVCXPRT_EXT));
	memset(ext, 0, sizeof(SVCXPRT_EXT));
	xprt->xp_p3 = ext;
	refcount_init(&xprt->xp_refs, 1);

	return (xprt);
}

/*
 * Free a server transport structure.
 */
void
svc_xprt_free(SVCXPRT *xprt)
{

	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
	mem_free(xprt, sizeof(SVCXPRT));
}
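
/*
 * Transports allocated here start with a single reference.  The
 * SVC_ACQUIRE()/SVC_RELEASE() macros (see svc.h) manage xp_refs, and
 * the transport is destroyed when the final reference is released.
 */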

/* ******************* SERVER INPUT STUFF ******************* */

/*
 * Read RPC requests from a transport and queue them to be
 * executed. We handle authentication and replay cache replies here.
 * Actually dispatching the RPC is deferred till svc_executereq.
 */
static enum xprt_stat
svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_req *r;
	struct rpc_msg msg;
	struct mbuf *args;
	enum xprt_stat stat;

	/* now receive msgs from xprt (support batch calls) */
	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);

	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
		enum auth_stat why;

		/*
		 * Handle replays and authenticate before queuing the
		 * request to be executed.
		 */
		SVC_ACQUIRE(xprt);
		r->rq_xprt = xprt;
		if (pool->sp_rcache) {
			struct rpc_msg repmsg;
			struct mbuf *repbody;
			enum replay_state rs;
			rs = replay_find(pool->sp_rcache, &msg,
			    svc_getrpccaller(r), &repmsg, &repbody);
			switch (rs) {
			case RS_NEW:
				break;
			case RS_DONE:
				SVC_REPLY(xprt, &repmsg, r->rq_addr,
				    repbody);
				if (r->rq_addr) {
					free(r->rq_addr, M_SONAME);
					r->rq_addr = NULL;
				}
				m_freem(args);
				goto call_done;

			default:
				m_freem(args);
				goto call_done;
			}
		}

		r->rq_xid = msg.rm_xid;
		r->rq_prog = msg.rm_call.cb_prog;
		r->rq_vers = msg.rm_call.cb_vers;
		r->rq_proc = msg.rm_call.cb_proc;
		r->rq_size = sizeof(*r) + m_length(args, NULL);
		r->rq_args = args;
		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
			/*
			 * RPCSEC_GSS uses this return code
			 * for requests that form part of its
			 * context establishment protocol and
			 * should not be dispatched to the
			 * application.
			 */
			if (why != RPCSEC_GSS_NODISPATCH)
				svcerr_auth(r, why);
			goto call_done;
		}

		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
			svcerr_decode(r);
			goto call_done;
		}

		/*
		 * Everything checks out, return request to caller.
		 */
		*rqstp_ret = r;
		r = NULL;
	}
call_done:
	if (r) {
		svc_freereq(r);
		r = NULL;
	}
	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
		xprt_unregister(xprt);
	}

	return (stat);
}

static void
svc_executereq(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	SVCPOOL *pool = xprt->xp_pool;
	int prog_found;
	rpcvers_t low_vers;
	rpcvers_t high_vers;
	struct svc_callout *s;

	/* now match message with a registered service */
	prog_found = FALSE;
	low_vers = (rpcvers_t) -1L;
	high_vers = (rpcvers_t) 0L;
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == rqstp->rq_prog) {
			if (s->sc_vers == rqstp->rq_vers) {
				/*
				 * We hand ownership of rqstp to the
				 * dispatch method - it must call
				 * svc_freereq.
				 */
				(*s->sc_dispatch)(rqstp, xprt);
				return;
			}  /* found correct version */
			prog_found = TRUE;
			if (s->sc_vers < low_vers)
				low_vers = s->sc_vers;
			if (s->sc_vers > high_vers)
				high_vers = s->sc_vers;
		}   /* found correct program */
	}

	/*
	 * if we got here, the program or version
	 * is not served ...
	 */
	if (prog_found)
		svcerr_progvers(rqstp, low_vers, high_vers);
	else
		svcerr_noprog(rqstp);

	svc_freereq(rqstp);
}

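/*
 * Time out transports which have been idle for longer than their
 * xp_idletimeout.  Called with the pool lock held; the lock is
 * dropped while the timed-out transports are released.
 */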
static void
svc_checkidle(SVCPOOL *pool)
{
	SVCXPRT *xprt, *nxprt;
	time_t timo;
	struct svcxprt_list cleanup;

	TAILQ_INIT(&cleanup);
	TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) {
		/*
		 * Only some transports have idle timers. Don't time
		 * something out which is just waking up.
		 */
		if (!xprt->xp_idletimeout || xprt->xp_thread)
			continue;

		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
		if (time_uptime > timo) {
			xprt_unregister_locked(xprt);
			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
		}
	}

	mtx_unlock(&pool->sp_lock);
	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		SVC_RELEASE(xprt);
	}
	mtx_lock(&pool->sp_lock);
}

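/*
 * Hand as many queued active transports as possible to idle service
 * threads, stopping at the first transport for which no thread is
 * available.  Takes the pool lock itself; called when request-space
 * throttling is lifted.
 */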
static void
svc_assign_waiting_sockets(SVCPOOL *pool)
{
	SVCXPRT *xprt;

	mtx_lock(&pool->sp_lock);
	while ((xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) {
		if (xprt_assignthread(xprt))
			TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
		else
			break;
	}
	mtx_unlock(&pool->sp_lock);
}

static void
svc_change_space_used(SVCPOOL *pool, int delta)
{
	unsigned int value;

	value = atomic_fetchadd_int(&pool->sp_space_used, delta) + delta;
	if (delta > 0) {
		if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
			pool->sp_space_throttled = TRUE;
			pool->sp_space_throttle_count++;
		}
		if (value > pool->sp_space_used_highest)
			pool->sp_space_used_highest = value;
	} else {
		if (value < pool->sp_space_low && pool->sp_space_throttled) {
			pool->sp_space_throttled = FALSE;
			svc_assign_waiting_sockets(pool);
		}
	}
}
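
/*
 * The two water marks above give simple hysteresis.  For example,
 * with the defaults from svcpool_create() on a large machine,
 * sp_space_high is capped at 45MB and sp_space_low at two thirds of
 * that (30MB): the pool throttles once 45MB of requests have been
 * parsed but not yet executed, and resumes accepting work only after
 * usage falls back below 30MB.
 */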

static bool_t
svc_request_space_available(SVCPOOL *pool)
{

	if (pool->sp_space_throttled)
		return (FALSE);
	return (TRUE);
}

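/*
 * Body of a service thread.  Each thread alternates between draining
 * RPCs from the transport currently assigned to it and executing the
 * requests it has queued; threads with nothing to do sleep on their
 * st_cond, and surplus workers exit once the pool holds more than
 * sp_minthreads of them.
 */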
static void
svc_run_internal(SVCPOOL *pool, bool_t ismaster)
{
	struct svc_reqlist reqs;
	SVCTHREAD *st, *stpref;
	SVCXPRT *xprt;
	enum xprt_stat stat;
	struct svc_req *rqstp;
	size_t sz;
	int error;

	st = mem_alloc(sizeof(*st));
	st->st_pool = pool;
	st->st_xprt = NULL;
	STAILQ_INIT(&st->st_reqs);
	cv_init(&st->st_cond, "rpcsvc");
	STAILQ_INIT(&reqs);

	mtx_lock(&pool->sp_lock);
	LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);

	/*
	 * If we are a new thread which was spawned to cope with
	 * increased load, set the state back to SVCPOOL_ACTIVE.
	 */
	if (pool->sp_state == SVCPOOL_THREADSTARTING)
		pool->sp_state = SVCPOOL_ACTIVE;

	while (pool->sp_state != SVCPOOL_CLOSING) {
		/*
		 * Create new thread if requested.
		 */
		if (pool->sp_state == SVCPOOL_THREADWANTED) {
			pool->sp_state = SVCPOOL_THREADSTARTING;
			pool->sp_lastcreatetime = time_uptime;
			mtx_unlock(&pool->sp_lock);
			svc_new_thread(pool);
			mtx_lock(&pool->sp_lock);
			continue;
		}

		/*
		 * Check for idle transports once per second.
		 */
		if (time_uptime > pool->sp_lastidlecheck) {
			pool->sp_lastidlecheck = time_uptime;
			svc_checkidle(pool);
		}

		xprt = st->st_xprt;
		if (!xprt && STAILQ_EMPTY(&st->st_reqs)) {
			/*
			 * Enforce maxthreads count.
			 */
			if (pool->sp_threadcount > pool->sp_maxthreads)
				break;

			/*
			 * Before sleeping, see if we can find an
			 * active transport which isn't being serviced
			 * by a thread.
			 */
			if (svc_request_space_available(pool) &&
			    (xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) {
				TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
				SVC_ACQUIRE(xprt);
				xprt->xp_thread = st;
				st->st_xprt = xprt;
				continue;
			}

			LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink);
			st->st_idle = TRUE;
			if (ismaster || (!ismaster &&
			    pool->sp_threadcount > pool->sp_minthreads))
				error = cv_timedwait_sig(&st->st_cond,
				    &pool->sp_lock, 5 * hz);
			else
				error = cv_wait_sig(&st->st_cond,
				    &pool->sp_lock);
			if (st->st_idle) {
				LIST_REMOVE(st, st_ilink);
				st->st_idle = FALSE;
			}

			/*
			 * Reduce worker thread count when idle.
			 */
			if (error == EWOULDBLOCK) {
				if (!ismaster
				    && (pool->sp_threadcount
					> pool->sp_minthreads)
					&& !st->st_xprt
					&& STAILQ_EMPTY(&st->st_reqs))
					break;
			} else if (error) {
				mtx_unlock(&pool->sp_lock);
				svc_exit(pool);
				mtx_lock(&pool->sp_lock);
				break;
			}
			continue;
		}

		if (xprt) {
			/*
			 * Drain the transport socket and queue up any
			 * RPCs.
			 */
			xprt->xp_lastactive = time_uptime;
			do {
				mtx_unlock(&pool->sp_lock);
				if (!svc_request_space_available(pool))
					break;
				rqstp = NULL;
				stat = svc_getreq(xprt, &rqstp);
				if (rqstp) {
					svc_change_space_used(pool, rqstp->rq_size);
					/*
					 * See if the application has
					 * a preference for some other
					 * thread.
					 */
					stpref = st;
					if (pool->sp_assign)
						stpref = pool->sp_assign(st,
						    rqstp);
					else
						mtx_lock(&pool->sp_lock);

					rqstp->rq_thread = stpref;
					STAILQ_INSERT_TAIL(&stpref->st_reqs,
					    rqstp, rq_link);

					/*
					 * If we assigned the request
					 * to another thread, make
					 * sure it's awake and continue
					 * reading from the
					 * socket. Otherwise, try to
					 * find some other thread to
					 * read from the socket and
					 * execute the request
					 * immediately.
					 */
					if (stpref == st)
						break;
					if (stpref->st_idle) {
						LIST_REMOVE(stpref, st_ilink);
						stpref->st_idle = FALSE;
						cv_signal(&stpref->st_cond);
					}
				} else
					mtx_lock(&pool->sp_lock);
			} while (stat == XPRT_MOREREQS
			    && pool->sp_state != SVCPOOL_CLOSING);

			/*
			 * Move this transport to the end of the
			 * active list to ensure fairness when
			 * multiple transports are active. If this was
			 * the last queued request, svc_getreq will
			 * end up calling xprt_inactive to remove from
			 * the active list.
			 */
			xprt->xp_thread = NULL;
			st->st_xprt = NULL;
			if (xprt->xp_active) {
				if (!svc_request_space_available(pool) ||
				    !xprt_assignthread(xprt))
					TAILQ_INSERT_TAIL(&pool->sp_active,
					    xprt, xp_alink);
			}
			STAILQ_CONCAT(&reqs, &st->st_reqs);
			mtx_unlock(&pool->sp_lock);
			SVC_RELEASE(xprt);
		} else {
			STAILQ_CONCAT(&reqs, &st->st_reqs);
			mtx_unlock(&pool->sp_lock);
		}

		/*
		 * Execute what we have queued.
		 */
		sz = 0;
		while ((rqstp = STAILQ_FIRST(&reqs)) != NULL) {
			STAILQ_REMOVE_HEAD(&reqs, rq_link);
			sz += rqstp->rq_size;
			svc_executereq(rqstp);
		}
		svc_change_space_used(pool, -sz);
		mtx_lock(&pool->sp_lock);
	}

	if (st->st_xprt) {
		xprt = st->st_xprt;
		st->st_xprt = NULL;
		SVC_RELEASE(xprt);
	}

	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
	LIST_REMOVE(st, st_link);
	pool->sp_threadcount--;

	mtx_unlock(&pool->sp_lock);

	cv_destroy(&st->st_cond);
	mem_free(st, sizeof(*st));

	if (!ismaster)
		wakeup(pool);
}

static void
svc_thread_start(void *arg)
{

	svc_run_internal((SVCPOOL *) arg, FALSE);
	kthread_exit();
}

static void
svc_new_thread(SVCPOOL *pool)
{
	struct thread *td;

	pool->sp_threadcount++;
	kthread_add(svc_thread_start, pool,
	    pool->sp_proc, &td, 0, 0,
	    "%s: service", pool->sp_name);
}

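/*
 * Main entry point for a service pool.  The calling thread becomes
 * the pool's master service thread; additional worker threads are
 * created on demand up to sp_maxthreads.  This returns only after
 * svc_exit() has been called and every worker thread has exited.
 */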
void
svc_run(SVCPOOL *pool)
{
	int i;
	struct proc *p;
	struct thread *td;

	p = curproc;
	td = curthread;
	snprintf(td->td_name, sizeof(td->td_name),
	    "%s: master", pool->sp_name);
	pool->sp_state = SVCPOOL_ACTIVE;
	pool->sp_proc = p;
	pool->sp_lastcreatetime = time_uptime;
	pool->sp_threadcount = 1;

	for (i = 1; i < pool->sp_minthreads; i++) {
		svc_new_thread(pool);
	}

	svc_run_internal(pool, TRUE);

	mtx_lock(&pool->sp_lock);
	while (pool->sp_threadcount > 0)
		msleep(pool, &pool->sp_lock, 0, "svcexit", 0);
	mtx_unlock(&pool->sp_lock);
}

void
svc_exit(SVCPOOL *pool)
{
	SVCTHREAD *st;

	mtx_lock(&pool->sp_lock);

	if (pool->sp_state != SVCPOOL_CLOSING) {
		pool->sp_state = SVCPOOL_CLOSING;
		LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
			cv_signal(&st->st_cond);
	}

	mtx_unlock(&pool->sp_lock);
}

bool_t
svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
	struct mbuf *m;
	XDR xdrs;
	bool_t stat;

	m = rqstp->rq_args;
	rqstp->rq_args = NULL;

	xdrmbuf_create(&xdrs, m, XDR_DECODE);
	stat = xargs(&xdrs, args);
	XDR_DESTROY(&xdrs);

	return (stat);
}

bool_t
svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
	XDR xdrs;

	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	xdrs.x_op = XDR_FREE;
	return (xargs(&xdrs, args));
}

void
svc_freereq(struct svc_req *rqstp)
{
	SVCTHREAD *st;
	SVCPOOL *pool;

	st = rqstp->rq_thread;
	if (st) {
		pool = st->st_pool;
		if (pool->sp_done)
			pool->sp_done(st, rqstp);
	}

	if (rqstp->rq_auth.svc_ah_ops)
		SVCAUTH_RELEASE(&rqstp->rq_auth);

	if (rqstp->rq_xprt) {
		SVC_RELEASE(rqstp->rq_xprt);
	}

	if (rqstp->rq_addr)
		free(rqstp->rq_addr, M_SONAME);

	if (rqstp->rq_args)
		m_freem(rqstp->rq_args);

	free(rqstp, M_RPC);
}