/*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/

/*
 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
 * unrestricted use provided that this legend is included on all tape
 * media and as a part of the software program in whole or part.  Users
 * may copy or modify Sun RPC without charge, but are not authorized
 * to license or distribute it to anyone else except as part of a product or
 * program developed by the user.
 *
 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
 *
 * Sun RPC is provided with no support and without any obligation on the
 * part of Sun Microsystems, Inc. to assist in its use, correction,
 * modification or enhancement.
 *
 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
 * OR ANY PART THEREOF.
 *
 * In no event will Sun Microsystems, Inc. be liable for any lost revenue
 * or profits or other special, indirect and consequential damages, even if
 * Sun has been advised of the possibility of such damages.
 *
 * Sun Microsystems, Inc.
 * 2550 Garcia Avenue
 * Mountain View, California  94043
 */

#if defined(LIBC_SCCS) && !defined(lint)
static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
static char *sccsid = "@(#)svc.c	2.4 88/08/11 4.0 RPCSRC";
#endif
#include <sys/cdefs.h>
__FBSDID("$FreeBSD: head/sys/rpc/svc.c 184588 2008-11-03 10:38:00Z dfr $");

/*
 * svc.c, Server-side remote procedure call interface.
 *
 * There are two sets of procedures here.  The xprt routines are
 * for handling transport handles.  The svc routines handle the
 * list of service routines.
 *
 * Copyright (C) 1984, Sun Microsystems, Inc.
 */

#include <sys/param.h>
#include <sys/lock.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/queue.h>
#include <sys/socketvar.h>
#include <sys/systm.h>
#include <sys/ucred.h>

#include <rpc/rpc.h>
#include <rpc/rpcb_clnt.h>
#include <rpc/replay.h>

#include <rpc/rpc_com.h>

#define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)

static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
    char *);
static void svc_new_thread(SVCPOOL *pool);
static void xprt_unregister_locked(SVCXPRT *xprt);

/* ***************  SVCXPRT related stuff **************** */

static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);

SVCPOOL*
svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
{
	SVCPOOL *pool;

	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);

	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
	pool->sp_name = name;
	pool->sp_state = SVCPOOL_INIT;
	pool->sp_proc = NULL;
	TAILQ_INIT(&pool->sp_xlist);
	TAILQ_INIT(&pool->sp_active);
	TAILQ_INIT(&pool->sp_callouts);
	LIST_INIT(&pool->sp_threads);
	LIST_INIT(&pool->sp_idlethreads);
	pool->sp_minthreads = 1;
	pool->sp_maxthreads = 1;
	pool->sp_threadcount = 0;

	/*
	 * Don't use more than a quarter of mbuf clusters or more than
	 * 45Mb buffering requests.
	 */
	pool->sp_space_high = nmbclusters * MCLBYTES / 4;
	if (pool->sp_space_high > 45 << 20)
		pool->sp_space_high = 45 << 20;
	pool->sp_space_low = 2 * pool->sp_space_high / 3;

	sysctl_ctx_init(&pool->sp_sysctl);
	if (sysctl_base) {
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
		    pool, 0, svcpool_minthread_sysctl, "I", "");
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
		    pool, 0, svcpool_maxthread_sysctl, "I", "");
		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, "");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used", CTLFLAG_RD,
		    &pool->sp_space_used, 0,
		    "Space in parsed but not handled requests.");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used_highest", CTLFLAG_RD,
		    &pool->sp_space_used_highest, 0,
		    "Highest space used since reboot.");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_high", CTLFLAG_RW,
		    &pool->sp_space_high, 0,
		    "Maximum space in parsed but not handled requests.");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_low", CTLFLAG_RW,
		    &pool->sp_space_low, 0,
		    "Low water mark for request space.");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttled", CTLFLAG_RD,
		    &pool->sp_space_throttled, 0,
		    "Whether nfs requests are currently throttled");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttle_count", CTLFLAG_RD,
		    &pool->sp_space_throttle_count, 0,
		    "Count of times throttling based on request space has occurred");
	}

	return (pool);
}
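
/*
 * Usage sketch (illustrative only, not part of this file): a kernel
 * RPC service typically creates one pool at initialization time and
 * hangs the pool's tunables off an existing sysctl tree.  The "myrpc"
 * names and the sysctl parent below are hypothetical.
 *
 *	static SVCPOOL *myrpc_pool;
 *
 *	myrpc_pool = svcpool_create("myrpc",
 *	    SYSCTL_STATIC_CHILDREN(_kern_myrpc));
 *	myrpc_pool->sp_minthreads = 4;
 *	myrpc_pool->sp_maxthreads = 64;
 */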

void
svcpool_destroy(SVCPOOL *pool)
{
	SVCXPRT *xprt, *nxprt;
	struct svc_callout *s;
	struct svcxprt_list cleanup;

	TAILQ_INIT(&cleanup);
	mtx_lock(&pool->sp_lock);

	while (TAILQ_FIRST(&pool->sp_xlist)) {
		xprt = TAILQ_FIRST(&pool->sp_xlist);
		xprt_unregister_locked(xprt);
		TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
	}

	while (TAILQ_FIRST(&pool->sp_callouts)) {
		s = TAILQ_FIRST(&pool->sp_callouts);
		mtx_unlock(&pool->sp_lock);
		svc_unreg(pool, s->sc_prog, s->sc_vers);
		mtx_lock(&pool->sp_lock);
	}

	mtx_destroy(&pool->sp_lock);

	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		SVC_RELEASE(xprt);
	}

	if (pool->sp_rcache)
		replay_freecache(pool->sp_rcache);

	sysctl_ctx_free(&pool->sp_sysctl);
	free(pool, M_RPC);
}

static bool_t
svcpool_active(SVCPOOL *pool)
{
	enum svcpool_state state = pool->sp_state;

	if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING)
		return (FALSE);
	return (TRUE);
}

/*
 * Sysctl handler to set the minimum thread count on a pool
 */
static int
svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int newminthreads, error, n;

	pool = oidp->oid_arg1;
	newminthreads = pool->sp_minthreads;
	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
	if (error == 0 && newminthreads != pool->sp_minthreads) {
		if (newminthreads > pool->sp_maxthreads)
			return (EINVAL);
		mtx_lock(&pool->sp_lock);
		if (newminthreads > pool->sp_minthreads
		    && svcpool_active(pool)) {
			/*
			 * If the pool is running and we are
			 * increasing, create some more threads now.
			 */
			n = newminthreads - pool->sp_threadcount;
			if (n > 0) {
				mtx_unlock(&pool->sp_lock);
				while (n--)
					svc_new_thread(pool);
				mtx_lock(&pool->sp_lock);
			}
		}
		pool->sp_minthreads = newminthreads;
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}

/*
 * Sysctl handler to set the maximum thread count on a pool
 */
static int
svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	SVCTHREAD *st;
	int newmaxthreads, error;

	pool = oidp->oid_arg1;
	newmaxthreads = pool->sp_maxthreads;
	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
		if (newmaxthreads < pool->sp_minthreads)
			return (EINVAL);
		mtx_lock(&pool->sp_lock);
		if (newmaxthreads < pool->sp_maxthreads
		    && svcpool_active(pool)) {
			/*
			 * If the pool is running and we are
			 * decreasing, wake up some idle threads to
			 * encourage them to exit.
			 */
			LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
				cv_signal(&st->st_cond);
		}
		pool->sp_maxthreads = newmaxthreads;
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}

/*
 * Activate a transport handle.
 */
void
xprt_register(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);
	xprt->xp_registered = TRUE;
	xprt->xp_active = FALSE;
	TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link);
	mtx_unlock(&pool->sp_lock);
}

/*
 * De-activate a transport handle. Note: the locked version doesn't
 * release the transport - caller must do that after dropping the pool
 * lock.
 */
static void
xprt_unregister_locked(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	if (xprt->xp_active) {
		TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
		xprt->xp_active = FALSE;
	}
	TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link);
	xprt->xp_registered = FALSE;
}

void
xprt_unregister(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);
	xprt_unregister_locked(xprt);
	mtx_unlock(&pool->sp_lock);

	SVC_RELEASE(xprt);
}

static void
xprt_assignthread(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;
	SVCTHREAD *st;

	/*
	 * Attempt to assign a service thread to this
	 * transport.
	 */
	LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) {
		if (st->st_xprt == NULL && STAILQ_EMPTY(&st->st_reqs))
			break;
	}
	if (st) {
		SVC_ACQUIRE(xprt);
		xprt->xp_thread = st;
		st->st_xprt = xprt;
		cv_signal(&st->st_cond);
	} else {
		/*
		 * See if we can create a new thread. The
		 * actual thread creation happens in
		 * svc_run_internal because our locking state
		 * is poorly defined (we are typically called
		 * from a socket upcall). Don't create more
		 * than one thread per second.
		 */
		if (pool->sp_state == SVCPOOL_ACTIVE
		    && pool->sp_lastcreatetime < time_uptime
		    && pool->sp_threadcount < pool->sp_maxthreads) {
			pool->sp_state = SVCPOOL_THREADWANTED;
		}
	}
}

void
xprt_active(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	if (!xprt->xp_registered) {
		/*
		 * Race with xprt_unregister - we lose.
		 */
		return;
	}

	mtx_lock(&pool->sp_lock);

	if (!xprt->xp_active) {
		TAILQ_INSERT_TAIL(&pool->sp_active, xprt, xp_alink);
		xprt->xp_active = TRUE;
		xprt_assignthread(xprt);
	}

	mtx_unlock(&pool->sp_lock);
}
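
/*
 * Illustrative note (a sketch; the upcall signature shown is assumed
 * for this vintage of the socket code): xprt_active is normally called
 * from a transport's socket upcall when new data or a new connection
 * arrives, e.g.
 *
 *	static void
 *	myxprt_soupcall(struct socket *so, void *arg, int waitflag)
 *	{
 *		SVCXPRT *xprt = arg;
 *
 *		xprt_active(xprt);
 *	}
 */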

void
xprt_inactive_locked(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	if (xprt->xp_active) {
		TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
		xprt->xp_active = FALSE;
	}
}

void
xprt_inactive(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);
	xprt_inactive_locked(xprt);
	mtx_unlock(&pool->sp_lock);
}

/*
 * Add a service program to the callout list.
 * The dispatch routine will be called when an RPC request for this
 * program number comes in.
 */
bool_t
svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
    void (*dispatch)(struct svc_req *, SVCXPRT *),
    const struct netconfig *nconf)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_callout *s;
	char *netid = NULL;
	int flag = 0;

/* VARIABLES PROTECTED BY svc_lock: s, svc_head */

	if (xprt->xp_netid) {
		netid = strdup(xprt->xp_netid, M_RPC);
		flag = 1;
	} else if (nconf && nconf->nc_netid) {
		netid = strdup(nconf->nc_netid, M_RPC);
		flag = 1;
	} /* must have been created with svc_raw_create */
	if ((netid == NULL) && (flag == 1)) {
		return (FALSE);
	}

	mtx_lock(&pool->sp_lock);
	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
		if (netid)
			free(netid, M_RPC);
		if (s->sc_dispatch == dispatch)
			goto rpcb_it; /* caller is registering another xprt */
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}
	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
	if (s == NULL) {
		if (netid)
			free(netid, M_RPC);
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}

	s->sc_prog = prog;
	s->sc_vers = vers;
	s->sc_dispatch = dispatch;
	s->sc_netid = netid;
	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);

	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
		xprt->xp_netid = strdup(netid, M_RPC);

rpcb_it:
	mtx_unlock(&pool->sp_lock);
	/* now register the information with the local binder service */
	if (nconf) {
		bool_t dummy;
		struct netconfig tnc;
		struct netbuf nb;
		tnc = *nconf;
		nb.buf = &xprt->xp_ltaddr;
		nb.len = xprt->xp_ltaddr.ss_len;
		dummy = rpcb_set(prog, vers, &tnc, &nb);
		return (dummy);
	}
	return (TRUE);
}
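
/*
 * Usage sketch (illustrative; the myrpc_* names are hypothetical and
 * the transport handle is assumed to come from one of the transport
 * creation routines, e.g. those in svc_vc.c or svc_dg.c):
 *
 *	static void myrpc_dispatch(struct svc_req *rqstp, SVCXPRT *xprt);
 *
 *	if (!svc_reg(xprt, MYRPC_PROG, MYRPC_VERS, myrpc_dispatch, nconf))
 *		printf("myrpc: can't register service\n");
 */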

/*
 * Remove a service program from the callout list.
 */
void
svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
{
	struct svc_callout *s;

	/* unregister the information anyway */
	(void) rpcb_unset(prog, vers, NULL);
	mtx_lock(&pool->sp_lock);
	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
		if (s->sc_netid)
			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
		mem_free(s, sizeof (struct svc_callout));
	}
	mtx_unlock(&pool->sp_lock);
}

/* ********************** CALLOUT list related stuff ************* */

/*
 * Search the callout list for a program number, return the callout
 * struct.
 */
static struct svc_callout *
svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
{
	struct svc_callout *s;

	mtx_assert(&pool->sp_lock, MA_OWNED);
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == prog && s->sc_vers == vers
		    && (netid == NULL || s->sc_netid == NULL ||
			strcmp(netid, s->sc_netid) == 0))
			break;
	}

	return (s);
}

/* ******************* REPLY GENERATION ROUTINES  ************ */

static bool_t
svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
    struct mbuf *body)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	bool_t ok;

	if (rqstp->rq_args) {
		m_freem(rqstp->rq_args);
		rqstp->rq_args = NULL;
	}

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    rply, svc_getrpccaller(rqstp), body);

	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
		return (FALSE);

	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body);
	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	return (ok);
}

/*
 * Send a reply to an RPC request
 */
bool_t
svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void *xdr_location)
{
	struct rpc_msg rply;
	struct mbuf *m;
	XDR xdrs;
	bool_t ok;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	MGET(m, M_WAIT, MT_DATA);
	MCLGET(m, M_WAIT);
	m->m_len = 0;
	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
	ok = xdr_results(&xdrs, xdr_location);
	XDR_DESTROY(&xdrs);

	if (ok) {
		return (svc_sendreply_common(rqstp, &rply, m));
	} else {
		m_freem(m);
		return (FALSE);
	}
}
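
/*
 * Usage sketch (illustrative; all myrpc_* names are hypothetical): a
 * dispatch routine decodes its arguments, performs the call, replies
 * with the XDR routine for its result type, and then releases the
 * request.  Ownership of the request passes to the dispatcher, so it
 * must call svc_freereq when done.
 *
 *	static void
 *	myrpc_dispatch(struct svc_req *rqstp, SVCXPRT *xprt)
 *	{
 *		struct myrpc_args args;
 *		struct myrpc_res res;
 *
 *		memset(&args, 0, sizeof(args));
 *		if (!svc_getargs(rqstp, (xdrproc_t) xdr_myrpc_args, &args)) {
 *			svcerr_decode(rqstp);
 *			svc_freereq(rqstp);
 *			return;
 *		}
 *		myrpc_service(&args, &res);
 *		if (!svc_sendreply(rqstp, (xdrproc_t) xdr_myrpc_res, &res))
 *			svcerr_systemerr(rqstp);
 *		svc_freeargs(rqstp, (xdrproc_t) xdr_myrpc_args, &args);
 *		svc_freereq(rqstp);
 *	}
 */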

bool_t
svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
{
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	return (svc_sendreply_common(rqstp, &rply, m));
}

/*
 * No procedure error reply
 */
void
svcerr_noproc(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROC_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Can't decode args error reply
 */
void
svcerr_decode(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = GARBAGE_ARGS;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Some system error
 */
void
svcerr_systemerr(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SYSTEM_ERR;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Authentication error reply
 */
void
svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_DENIED;
	rply.rjcted_rply.rj_stat = AUTH_ERROR;
	rply.rjcted_rply.rj_why = why;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Auth too weak error reply
 */
void
svcerr_weakauth(struct svc_req *rqstp)
{

	svcerr_auth(rqstp, AUTH_TOOWEAK);
}

/*
 * Program unavailable error reply
 */
void
svcerr_noprog(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Program version mismatch error reply
 */
void
svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_MISMATCH;
	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Allocate a new server transport structure. All fields are
 * initialized to zero and xp_p3 is initialized to point at an
 * extension structure to hold various flags and authentication
 * parameters.
 */
SVCXPRT *
svc_xprt_alloc(void)
{
	SVCXPRT *xprt;
	SVCXPRT_EXT *ext;

	xprt = mem_alloc(sizeof(SVCXPRT));
	memset(xprt, 0, sizeof(SVCXPRT));
	ext = mem_alloc(sizeof(SVCXPRT_EXT));
	memset(ext, 0, sizeof(SVCXPRT_EXT));
	xprt->xp_p3 = ext;
	refcount_init(&xprt->xp_refs, 1);

	return (xprt);
}

/*
 * Free a server transport structure.
 */
void
svc_xprt_free(SVCXPRT *xprt)
{

	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
	mem_free(xprt, sizeof(SVCXPRT));
}
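
/*
 * Illustrative sketch (assumed, patterned on the transports in
 * svc_vc.c and svc_dg.c; the ops vector name is hypothetical): a
 * transport implementation allocates the handle, fills in its method
 * table and per-transport state, and registers it with the pool:
 *
 *	SVCXPRT *xprt;
 *
 *	xprt = svc_xprt_alloc();
 *	xprt->xp_pool = pool;
 *	xprt->xp_ops = &myxprt_ops;
 *	xprt->xp_socket = so;
 *	xprt_register(xprt);
 */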

/* ******************* SERVER INPUT STUFF ******************* */

/*
 * Read RPC requests from a transport and queue them to be
 * executed. We handle authentication and replay cache replies here.
 * Actually dispatching the RPC is deferred until svc_executereq.
 */
static enum xprt_stat
svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_req *r;
	struct rpc_msg msg;
	struct mbuf *args;
	enum xprt_stat stat;

	/* now receive msgs from xprt (support batch calls) */
	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);

	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
		enum auth_stat why;

		/*
		 * Handle replays and authenticate before queuing the
		 * request to be executed.
		 */
		SVC_ACQUIRE(xprt);
		r->rq_xprt = xprt;
		if (pool->sp_rcache) {
			struct rpc_msg repmsg;
			struct mbuf *repbody;
			enum replay_state rs;
			rs = replay_find(pool->sp_rcache, &msg,
			    svc_getrpccaller(r), &repmsg, &repbody);
			switch (rs) {
			case RS_NEW:
				break;
			case RS_DONE:
				SVC_REPLY(xprt, &repmsg, r->rq_addr,
				    repbody);
				if (r->rq_addr) {
					free(r->rq_addr, M_SONAME);
					r->rq_addr = NULL;
				}
				goto call_done;

			default:
				goto call_done;
			}
		}

		r->rq_xid = msg.rm_xid;
		r->rq_prog = msg.rm_call.cb_prog;
		r->rq_vers = msg.rm_call.cb_vers;
		r->rq_proc = msg.rm_call.cb_proc;
		r->rq_size = sizeof(*r) + m_length(args, NULL);
		r->rq_args = args;
		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
			/*
			 * RPCSEC_GSS uses this return code
			 * for requests that form part of its
			 * context establishment protocol and
			 * should not be dispatched to the
			 * application.
			 */
			if (why != RPCSEC_GSS_NODISPATCH)
				svcerr_auth(r, why);
			goto call_done;
		}

		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
			svcerr_decode(r);
			goto call_done;
		}

		/*
		 * Everything checks out, return request to caller.
		 */
		*rqstp_ret = r;
		r = NULL;
	}
call_done:
	if (r) {
		svc_freereq(r);
		r = NULL;
	}
	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
		xprt_unregister(xprt);
	}

	return (stat);
}

static void
svc_executereq(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	SVCPOOL *pool = xprt->xp_pool;
	int prog_found;
	rpcvers_t low_vers;
	rpcvers_t high_vers;
	struct svc_callout *s;

	/* now match message with a registered service */
	prog_found = FALSE;
	low_vers = (rpcvers_t) -1L;
	high_vers = (rpcvers_t) 0L;
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == rqstp->rq_prog) {
			if (s->sc_vers == rqstp->rq_vers) {
				/*
				 * We hand ownership of the request to
				 * the dispatch method - it must call
				 * svc_freereq.
				 */
				(*s->sc_dispatch)(rqstp, xprt);
				return;
			}  /* found correct version */
			prog_found = TRUE;
			if (s->sc_vers < low_vers)
				low_vers = s->sc_vers;
			if (s->sc_vers > high_vers)
				high_vers = s->sc_vers;
		}   /* found correct program */
	}

	/*
	 * If we got here, the program or version
	 * is not served ...
	 */
	if (prog_found)
		svcerr_progvers(rqstp, low_vers, high_vers);
	else
		svcerr_noprog(rqstp);

	svc_freereq(rqstp);
}

static void
svc_checkidle(SVCPOOL *pool)
{
	SVCXPRT *xprt, *nxprt;
	time_t timo;
	struct svcxprt_list cleanup;

	TAILQ_INIT(&cleanup);
	TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) {
		/*
		 * Only some transports have idle timers. Don't time
		 * something out which is just waking up.
		 */
		if (!xprt->xp_idletimeout || xprt->xp_thread)
			continue;

		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
		if (time_uptime > timo) {
			xprt_unregister_locked(xprt);
			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
		}
	}

	mtx_unlock(&pool->sp_lock);
	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		SVC_RELEASE(xprt);
	}
	mtx_lock(&pool->sp_lock);
}

static void
svc_assign_waiting_sockets(SVCPOOL *pool)
{
	SVCXPRT *xprt;

	TAILQ_FOREACH(xprt, &pool->sp_active, xp_alink) {
		if (!xprt->xp_thread) {
			xprt_assignthread(xprt);
		}
	}
}

static bool_t
svc_request_space_available(SVCPOOL *pool)
{

	mtx_assert(&pool->sp_lock, MA_OWNED);

	if (pool->sp_space_throttled) {
		/*
		 * Below the low-water mark yet? If so, assign any
		 * waiting sockets.
		 */
		if (pool->sp_space_used < pool->sp_space_low) {
			pool->sp_space_throttled = FALSE;
			svc_assign_waiting_sockets(pool);
			return TRUE;
		}

		return FALSE;
	} else {
		if (pool->sp_space_used
		    >= pool->sp_space_high) {
			pool->sp_space_throttled = TRUE;
			pool->sp_space_throttle_count++;
			return FALSE;
		}

		return TRUE;
	}
}

static void
svc_run_internal(SVCPOOL *pool, bool_t ismaster)
{
	SVCTHREAD *st, *stpref;
	SVCXPRT *xprt;
	enum xprt_stat stat;
	struct svc_req *rqstp;
	int error;

	st = mem_alloc(sizeof(*st));
	st->st_xprt = NULL;
	STAILQ_INIT(&st->st_reqs);
	cv_init(&st->st_cond, "rpcsvc");

	mtx_lock(&pool->sp_lock);
	LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);

	/*
	 * If we are a new thread which was spawned to cope with
	 * increased load, set the state back to SVCPOOL_ACTIVE.
	 */
	if (pool->sp_state == SVCPOOL_THREADSTARTING)
		pool->sp_state = SVCPOOL_ACTIVE;

	while (pool->sp_state != SVCPOOL_CLOSING) {
		/*
		 * Check for idle transports once per second.
		 */
		if (time_uptime > pool->sp_lastidlecheck) {
			pool->sp_lastidlecheck = time_uptime;
			svc_checkidle(pool);
		}

		xprt = st->st_xprt;
		if (!xprt && STAILQ_EMPTY(&st->st_reqs)) {
			/*
			 * Enforce maxthreads count.
			 */
			if (pool->sp_threadcount > pool->sp_maxthreads)
				break;

			/*
			 * Before sleeping, see if we can find an
			 * active transport which isn't being serviced
			 * by a thread.
			 */
			if (svc_request_space_available(pool)) {
				TAILQ_FOREACH(xprt, &pool->sp_active,
				    xp_alink) {
					if (!xprt->xp_thread) {
						SVC_ACQUIRE(xprt);
						xprt->xp_thread = st;
						st->st_xprt = xprt;
						break;
					}
				}
			}
			if (st->st_xprt)
				continue;

			LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink);
			error = cv_timedwait_sig(&st->st_cond, &pool->sp_lock,
				5 * hz);
			LIST_REMOVE(st, st_ilink);

			/*
			 * Reduce worker thread count when idle.
			 */
			if (error == EWOULDBLOCK) {
				if (!ismaster
				    && (pool->sp_threadcount
					> pool->sp_minthreads)
					&& !st->st_xprt
					&& STAILQ_EMPTY(&st->st_reqs))
					break;
			}
			if (error == EWOULDBLOCK)
				continue;
			if (error) {
				if (pool->sp_state != SVCPOOL_CLOSING) {
					mtx_unlock(&pool->sp_lock);
					svc_exit(pool);
					mtx_lock(&pool->sp_lock);
				}
				break;
			}

			if (pool->sp_state == SVCPOOL_THREADWANTED) {
				pool->sp_state = SVCPOOL_THREADSTARTING;
				pool->sp_lastcreatetime = time_uptime;
				mtx_unlock(&pool->sp_lock);
				svc_new_thread(pool);
				mtx_lock(&pool->sp_lock);
			}
			continue;
		}

		if (xprt) {
			/*
			 * Drain the transport socket and queue up any
			 * RPCs.
			 */
			xprt->xp_lastactive = time_uptime;
			stat = XPRT_IDLE;
			do {
				if (!svc_request_space_available(pool))
					break;
				rqstp = NULL;
				mtx_unlock(&pool->sp_lock);
				stat = svc_getreq(xprt, &rqstp);
				mtx_lock(&pool->sp_lock);
				if (rqstp) {
					/*
					 * See if the application has
					 * a preference for some other
					 * thread.
					 */
					stpref = st;
					if (pool->sp_assign)
						stpref = pool->sp_assign(st,
						    rqstp);

					pool->sp_space_used +=
						rqstp->rq_size;
					if (pool->sp_space_used
					    > pool->sp_space_used_highest)
						pool->sp_space_used_highest =
							pool->sp_space_used;
					rqstp->rq_thread = stpref;
					STAILQ_INSERT_TAIL(&stpref->st_reqs,
					    rqstp, rq_link);
					stpref->st_reqcount++;

					/*
					 * If we assigned the request
					 * to another thread, make
					 * sure it's awake and continue
					 * reading from the
					 * socket. Otherwise, try to
					 * find some other thread to
					 * read from the socket and
					 * execute the request
					 * immediately.
					 */
					if (stpref != st) {
						cv_signal(&stpref->st_cond);
						continue;
					} else {
						break;
					}
				}
			} while (stat == XPRT_MOREREQS
			    && pool->sp_state != SVCPOOL_CLOSING);

			/*
			 * Move this transport to the end of the
			 * active list to ensure fairness when
			 * multiple transports are active. If this was
			 * the last queued request, svc_getreq will
			 * end up calling xprt_inactive to remove from
			 * the active list.
			 */
			xprt->xp_thread = NULL;
			st->st_xprt = NULL;
			if (xprt->xp_active) {
				xprt_assignthread(xprt);
				TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
				TAILQ_INSERT_TAIL(&pool->sp_active, xprt,
				    xp_alink);
			}
			mtx_unlock(&pool->sp_lock);
			SVC_RELEASE(xprt);
			mtx_lock(&pool->sp_lock);
		}

		/*
		 * Execute what we have queued.
		 */
		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
			size_t sz = rqstp->rq_size;
			mtx_unlock(&pool->sp_lock);
			svc_executereq(rqstp);
			mtx_lock(&pool->sp_lock);
			pool->sp_space_used -= sz;
		}
	}

	if (st->st_xprt) {
		xprt = st->st_xprt;
		st->st_xprt = NULL;
		SVC_RELEASE(xprt);
	}

	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
	LIST_REMOVE(st, st_link);
	pool->sp_threadcount--;

	mtx_unlock(&pool->sp_lock);

	cv_destroy(&st->st_cond);
	mem_free(st, sizeof(*st));

	if (!ismaster)
		wakeup(pool);
}

static void
svc_thread_start(void *arg)
{

	svc_run_internal((SVCPOOL *) arg, FALSE);
	kthread_exit();
}

static void
svc_new_thread(SVCPOOL *pool)
{
	struct thread *td;

	pool->sp_threadcount++;
	kthread_add(svc_thread_start, pool,
	    pool->sp_proc, &td, 0, 0,
	    "%s: service", pool->sp_name);
}

void
svc_run(SVCPOOL *pool)
{
	int i;
	struct proc *p;
	struct thread *td;

	p = curproc;
	td = curthread;
	snprintf(td->td_name, sizeof(td->td_name),
	    "%s: master", pool->sp_name);
	pool->sp_state = SVCPOOL_ACTIVE;
	pool->sp_proc = p;
	pool->sp_lastcreatetime = time_uptime;
	pool->sp_threadcount = 1;

	for (i = 1; i < pool->sp_minthreads; i++) {
		svc_new_thread(pool);
	}

	svc_run_internal(pool, TRUE);

	mtx_lock(&pool->sp_lock);
	while (pool->sp_threadcount > 0)
		msleep(pool, &pool->sp_lock, 0, "svcexit", 0);
	mtx_unlock(&pool->sp_lock);
}
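
/*
 * Usage sketch (illustrative; myrpc_pool is hypothetical): once the
 * pool is created and transports are registered, the service's main
 * kernel thread calls svc_run, which only returns after svc_exit has
 * been called, e.g. from a shutdown path:
 *
 *	svc_run(myrpc_pool);		(blocks until svc_exit)
 *	svcpool_destroy(myrpc_pool);
 */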

void
svc_exit(SVCPOOL *pool)
{
	SVCTHREAD *st;

	mtx_lock(&pool->sp_lock);

	pool->sp_state = SVCPOOL_CLOSING;
	LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
		cv_signal(&st->st_cond);

	mtx_unlock(&pool->sp_lock);
}

bool_t
svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
	struct mbuf *m;
	XDR xdrs;
	bool_t stat;

	m = rqstp->rq_args;
	rqstp->rq_args = NULL;

	xdrmbuf_create(&xdrs, m, XDR_DECODE);
	stat = xargs(&xdrs, args);
	XDR_DESTROY(&xdrs);

	return (stat);
}
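
/*
 * Pairing note (sketch; xdr_myrpc_args is a hypothetical XDR routine):
 * every successful svc_getargs should be matched by an svc_freeargs
 * call with the same XDR routine so that any memory the decode
 * allocated is released:
 *
 *	if (svc_getargs(rqstp, (xdrproc_t) xdr_myrpc_args, &args)) {
 *		...
 *		svc_freeargs(rqstp, (xdrproc_t) xdr_myrpc_args, &args);
 *	}
 */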

bool_t
svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
	XDR xdrs;

	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	xdrs.x_op = XDR_FREE;
	return (xargs(&xdrs, args));
}

void
svc_freereq(struct svc_req *rqstp)
{
	SVCTHREAD *st;
	SVCXPRT *xprt;
	SVCPOOL *pool;

	st = rqstp->rq_thread;
	xprt = rqstp->rq_xprt;
	if (xprt)
		pool = xprt->xp_pool;
	else
		pool = NULL;
	if (st) {
		mtx_lock(&pool->sp_lock);
		KASSERT(rqstp == STAILQ_FIRST(&st->st_reqs),
		    ("Freeing request out of order"));
		STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
		st->st_reqcount--;
		if (pool->sp_done)
			pool->sp_done(st, rqstp);
		mtx_unlock(&pool->sp_lock);
	}

	if (rqstp->rq_auth.svc_ah_ops)
		SVCAUTH_RELEASE(&rqstp->rq_auth);

	if (rqstp->rq_xprt) {
		SVC_RELEASE(rqstp->rq_xprt);
	}

	if (rqstp->rq_addr)
		free(rqstp->rq_addr, M_SONAME);

	if (rqstp->rq_args)
		m_freem(rqstp->rq_args);

	free(rqstp, M_RPC);
}