1/*
2 * Copyright (c) 2011 Apple Inc.  All rights reserved.
3 *
4 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5 *
6 * This file contains Original Code and/or Modifications of Original Code
7 * as defined in and that are subject to the Apple Public Source License
8 * Version 2.0 (the 'License'). You may not use this file except in
9 * compliance with the License. The rights granted to you under the License
10 * may not be used to create, or enable the creation or redistribution of,
11 * unlawful or unlicensed copies of an Apple operating system, or to
12 * circumvent, violate, or enable the circumvention or violation of, any
13 * terms of an Apple operating system software license agreement.
14 *
15 * Please obtain a copy of the License at
16 * http://www.opensource.apple.com/apsl/ and read it before using this file.
17 *
18 * The Original Code and all software distributed under the License are
19 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23 * Please see the License for the specific language governing rights and
24 * limitations under the License.
25 *
26 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27 */
28#include <stdint.h>
29#include <sys/param.h>
30#include <sys/mount_internal.h>
31#include <sys/malloc.h>
32#include <sys/queue.h>
33
34#include <libkern/libkern.h>
35#include <libkern/OSAtomic.h>
36#include <kern/debug.h>
37#include <kern/thread.h>
38
39#include <nfs/rpcv2.h>
40#include <nfs/nfsproto.h>
41#include <nfs/nfs.h>
42
/*
 * Debug tracing for the up-call code.
 * DPRINT forwards its arguments to printf when NFS_UC_DEBUG is defined
 * and expands to nothing otherwise, so its arguments are not evaluated
 * in non-debug builds.
 */
#ifdef NFS_UC_DEBUG
#define DPRINT(fmt, ...) printf(fmt,## __VA_ARGS__)
#else
#define DPRINT(fmt, ...)
#endif
48
/*
 * Per-socket up-call record.  One is allocated for each nfsrv_sock that
 * uses the proxy (see nfsrv_uc_addsock) and is handed to the socket layer
 * as the up-call argument.  nfsrv_uc_proxy stores the up-call parameters
 * here and links the record onto a queue for nfsrv_uc_thread to consume.
 */
struct nfsrv_uc_arg {
	TAILQ_ENTRY(nfsrv_uc_arg) nua_svcq;	/* linkage on an up-call queue */
	socket_t nua_so;		/* socket passed to the most recent queued up-call */
	struct nfsrv_sock *nua_slp;	/* nfsrv_sock this record belongs to */
	int nua_waitflag;  /* Should always be MBUF_DONTWAIT */
	uint32_t nua_flags;		/* NFS_UC_QUEUED */
	uint32_t nua_qi;		/* index into nfsrv_uc_queue_tbl, from NFS_UC_HASH */
};
/* nua_flags: record is currently linked on its up-call queue */
#define NFS_UC_QUEUED	0x0001

/* Number of entries in nfsrv_uc_queue_tbl */
#define NFS_UC_HASH_SZ 7
/*
 * Map a pointer to a queue index: the value is truncated to 32 bits,
 * shifted right by 3 and reduced modulo nfsrv_uc_thread_count.
 * Must only be evaluated while nfsrv_uc_thread_count is non-zero.
 */
#define NFS_UC_HASH(x) ((((uint32_t)(uintptr_t)(x)) >> 3) % nfsrv_uc_thread_count)
61
/* Tail queue head type for lists of nfsrv_uc_arg records */
TAILQ_HEAD(nfsrv_uc_q, nfsrv_uc_arg);

/*
 * Table of up-call queues.  Each entry has its own mutex and pending list;
 * worker thread handles obtained from kernel_thread_start are recorded in
 * ucq_thd.
 */
static struct nfsrv_uc_queue {
	lck_mtx_t		*ucq_lock;	/* held around all accesses to ucq_queue and ucq_flags */
	struct nfsrv_uc_q	ucq_queue[1];	/* one-element array so the name can be passed directly to the TAILQ macros */
	thread_t		ucq_thd;	/* thread handle, THREAD_NULL when unset */
	uint32_t		ucq_flags;	/* NFS_UC_QUEUE_SLEEPING */
} nfsrv_uc_queue_tbl[NFS_UC_HASH_SZ];
/* ucq_flags: set by the worker thread around its msleep on the queue */
#define NFS_UC_QUEUE_SLEEPING	0x0001
71
/* Lock group used for every mutex allocated in this file */
static lck_grp_t *nfsrv_uc_group;
/* Held while starting/stopping threads and while a thread updates nfsrv_uc_thread_count */
static lck_mtx_t *nfsrv_uc_shutdown_lock;
/* Set to 1 by nfsrv_uc_stop to ask the worker threads to exit; reset to 0 once they are gone */
static volatile int nfsrv_uc_shutdown = 0;
/* Number of running worker threads; also the sleep/wakeup channel for start and stop */
static int32_t nfsrv_uc_thread_count;

extern kern_return_t thread_terminate(thread_t);

#ifdef NFS_UC_Q_DEBUG
/* Debug knobs and counters, only built with NFS_UC_Q_DEBUG */
int nfsrv_uc_use_proxy = 1;		/* when 0, nfsrv_uc_start returns without starting threads */
uint32_t nfsrv_uc_queue_limit;		/* when non-zero, nfsrv_uc_proxy panics once the count exceeds it */
uint32_t nfsrv_uc_queue_max_seen;	/* largest queue count observed by nfsrv_uc_proxy */
volatile uint32_t nfsrv_uc_queue_count;	/* entries currently queued, maintained atomically */
#endif
85
86/*
87 * Thread that dequeues up-calls and runs the nfsrv_rcv routine
88 */
89static void
90nfsrv_uc_thread(void *arg, wait_result_t wr __unused)
91{
92	int qi = (int)(uintptr_t)arg;
93	int error;
94	struct nfsrv_uc_arg *ep = NULL;
95	struct nfsrv_uc_queue *myqueue = &nfsrv_uc_queue_tbl[qi];
96
97	DPRINT("nfsrv_uc_thread %d started\n", qi);
98	while (!nfsrv_uc_shutdown) {
99		lck_mtx_lock(myqueue->ucq_lock);
100
101		while (!nfsrv_uc_shutdown && TAILQ_EMPTY(myqueue->ucq_queue)) {
102			myqueue->ucq_flags |= NFS_UC_QUEUE_SLEEPING;
103			error = msleep(myqueue, myqueue->ucq_lock, PSOCK, "nfsd_upcall_handler", NULL);
104			myqueue->ucq_flags &= ~NFS_UC_QUEUE_SLEEPING;
105			if (error) {
106				printf("nfsrv_uc_thread received error %d\n", error);
107			}
108		}
109		if (nfsrv_uc_shutdown) {
110			lck_mtx_unlock(myqueue->ucq_lock);
111			break;
112		}
113
114
115		ep = TAILQ_FIRST(myqueue->ucq_queue);
116		DPRINT("nfsrv_uc_thread:%d dequeue %p from %p\n", qi, ep, myqueue);
117
118		TAILQ_REMOVE(myqueue->ucq_queue, ep, nua_svcq);
119
120		ep->nua_flags &= ~NFS_UC_QUEUED;
121
122		lck_mtx_unlock(myqueue->ucq_lock);
123
124#ifdef NFS_UC_Q_DEBUG
125		OSDecrementAtomic(&nfsrv_uc_queue_count);
126#endif
127
128		DPRINT("calling nfsrv_rcv for %p\n", (void *)ep->nua_slp);
129		nfsrv_rcv(ep->nua_so, (void *)ep->nua_slp, ep->nua_waitflag);
130	}
131
132	lck_mtx_lock(nfsrv_uc_shutdown_lock);
133	nfsrv_uc_thread_count--;
134	wakeup(&nfsrv_uc_thread_count);
135	lck_mtx_unlock(nfsrv_uc_shutdown_lock);
136
137	thread_terminate(current_thread());
138}
139
140/*
141 * Dequeue a closed nfsrv_sock if needed from the up-call queue.
142 * Call from nfsrv_zapsock
143 */
144void
145nfsrv_uc_dequeue(struct nfsrv_sock *slp)
146{
147	struct nfsrv_uc_arg *ap = slp->ns_ua;
148	struct nfsrv_uc_queue *myqueue = &nfsrv_uc_queue_tbl[ap->nua_qi];
149
150	/*
151	 * We assume that the socket up-calls have been stop and the socket
152	 * is shutting down so no need for acquiring the lock to check that
153	 * the flag is cleared.
154	 */
155	if (ap == NULL || (ap->nua_flags & NFS_UC_QUEUED) == 0)
156		return;
157	/* If we're queued we might race with nfsrv_uc_thread */
158	lck_mtx_lock(myqueue->ucq_lock);
159	if (ap->nua_flags & NFS_UC_QUEUED) {
160		printf("nfsrv_uc_dequeue remove %p\n", ap);
161		TAILQ_REMOVE(myqueue->ucq_queue, ap, nua_svcq);
162		ap->nua_flags &= ~NFS_UC_QUEUED;
163#ifdef NFS_UC_Q_DEBUG
164		OSDecrementAtomic(&nfsrv_uc_queue_count);
165#endif
166	}
167	lck_mtx_unlock(myqueue->ucq_lock);
168}
169
170/*
171 * Allocate and initialize globals for nfsrv_sock up-call support.
172 */
173void
174nfsrv_uc_init(void)
175{
176	int i;
177
178	nfsrv_uc_group = lck_grp_alloc_init("nfs_upcall_locks", LCK_GRP_ATTR_NULL);
179	for (i = 0; i < NFS_UC_HASH_SZ; i++) {
180		TAILQ_INIT(nfsrv_uc_queue_tbl[i].ucq_queue);
181		nfsrv_uc_queue_tbl[i].ucq_lock = lck_mtx_alloc_init(nfsrv_uc_group, LCK_ATTR_NULL);
182		nfsrv_uc_queue_tbl[i].ucq_thd = THREAD_NULL;
183		nfsrv_uc_queue_tbl[i].ucq_flags = 0;
184	}
185	nfsrv_uc_shutdown_lock = lck_mtx_alloc_init(nfsrv_uc_group, LCK_ATTR_NULL);
186}
187
188/*
189 * Start up-call threads to service nfsrv_sock(s)
190 * Called from the first call of nfsrv_uc_addsock
191 */
192static void
193nfsrv_uc_start(void)
194{
195	int32_t i;
196	int error;
197
198#ifdef NFS_UC_Q_DEBUG
199	if (!nfsrv_uc_use_proxy)
200		return;
201#endif
202	DPRINT("nfsrv_uc_start\n");
203
204	/* Wait until previous shutdown finishes */
205	lck_mtx_lock(nfsrv_uc_shutdown_lock);
206	while (nfsrv_uc_shutdown || nfsrv_uc_thread_count > 0)
207		msleep(&nfsrv_uc_thread_count, nfsrv_uc_shutdown_lock, PSOCK, "nfsd_upcall_shutdown_wait", NULL);
208
209	/* Start up-call threads */
210	for (i = 0; i < NFS_UC_HASH_SZ; i++) {
211		error = kernel_thread_start(nfsrv_uc_thread, (void *)(uintptr_t)i, &nfsrv_uc_queue_tbl[nfsrv_uc_thread_count].ucq_thd);
212		if (!error) {
213			nfsrv_uc_thread_count++;
214		} else {
215			printf("nfsd: Could not start nfsrv_uc_thread: %d\n", error);
216		}
217	}
218	if (nfsrv_uc_thread_count == 0) {
219		printf("nfsd: Could not start nfsd proxy up-call service. Falling back\n");
220		goto out;
221	}
222
223out:
224#ifdef NFS_UC_Q_DEBUG
225	nfsrv_uc_queue_count = 0ULL;
226	nfsrv_uc_queue_max_seen = 0ULL;
227#endif
228	lck_mtx_unlock(nfsrv_uc_shutdown_lock);
229}
230
231/*
232 * Stop the up-call threads.
233 * Called from nfsrv_uc_cleanup.
234 */
235static void
236nfsrv_uc_stop(void)
237{
238	int32_t i;
239	int32_t thread_count = nfsrv_uc_thread_count;
240
241	DPRINT("Entering nfsrv_uc_stop\n");
242
243	/* Signal up-call threads to stop */
244	nfsrv_uc_shutdown = 1;
245	for (i = 0; i < thread_count; i++) {
246		lck_mtx_lock(nfsrv_uc_queue_tbl[i].ucq_lock);
247		wakeup(&nfsrv_uc_queue_tbl[i]);
248		lck_mtx_unlock(nfsrv_uc_queue_tbl[i].ucq_lock);
249	}
250
251	/* Wait until they are done shutting down */
252	lck_mtx_lock(nfsrv_uc_shutdown_lock);
253	while (nfsrv_uc_thread_count > 0)
254		msleep(&nfsrv_uc_thread_count, nfsrv_uc_shutdown_lock, PSOCK, "nfsd_upcall_shutdown_stop", NULL);
255
256	/* Deallocate old threads */
257	for (i = 0; i < nfsrv_uc_thread_count; i++) {
258		if (nfsrv_uc_queue_tbl[i].ucq_thd != THREAD_NULL)
259			thread_deallocate(nfsrv_uc_queue_tbl[i].ucq_thd);
260		nfsrv_uc_queue_tbl[i].ucq_thd = THREAD_NULL;
261	}
262
263	/* Enable restarting */
264	nfsrv_uc_shutdown = 0;
265	lck_mtx_unlock(nfsrv_uc_shutdown_lock);
266}
267
268/*
269 * Shutdown up-calls for nfsrv_socks.
270 *	Make sure nothing is queued on the up-call queues
271 *	Shutdown the up-call threads
272 * Called from nfssvc_cleanup.
273 */
274void
275nfsrv_uc_cleanup(void)
276{
277	int i;
278
279	DPRINT("Entering nfsrv_uc_cleanup\n");
280
281	/*
282	 * Every thing should be dequeued at this point or will be as sockets are closed
283	 * but to be safe, we'll make sure.
284	 */
285	for (i = 0; i < NFS_UC_HASH_SZ; i++) {
286		struct nfsrv_uc_queue *queue = &nfsrv_uc_queue_tbl[i];
287
288		lck_mtx_lock(queue->ucq_lock);
289		while (!TAILQ_EMPTY(queue->ucq_queue)) {
290			struct nfsrv_uc_arg *ep = TAILQ_FIRST(queue->ucq_queue);
291			TAILQ_REMOVE(queue->ucq_queue, ep, nua_svcq);
292			ep->nua_flags &= ~NFS_UC_QUEUED;
293		}
294		lck_mtx_unlock(queue->ucq_lock);
295	}
296
297	nfsrv_uc_stop();
298}
299
300/*
301 * This is the nfs up-call routine for server sockets.
302 * We used to set nfsrv_rcv as the up-call routine, but
303 * recently that seems like we are doing to much work for
304 * the interface thread, so we just queue the arguments
305 * that we would have gotten for nfsrv_rcv and let a
306 * worker thread dequeue them and pass them on to nfsrv_rcv.
307 */
308static void
309nfsrv_uc_proxy(socket_t so, void *arg, int waitflag)
310{
311	struct nfsrv_uc_arg *uap = (struct nfsrv_uc_arg *)arg;
312	int qi = uap->nua_qi;
313	struct nfsrv_uc_queue *myqueue = &nfsrv_uc_queue_tbl[qi];
314
315	lck_mtx_lock(myqueue->ucq_lock);
316	DPRINT("nfsrv_uc_proxy called for %p (%p)\n", uap, uap->nua_slp);
317	DPRINT("\tUp-call queued on %d for wakeup of %p\n", qi, myqueue);
318	if (uap->nua_flags & NFS_UC_QUEUED) {
319		lck_mtx_unlock(myqueue->ucq_lock);
320		return;  /* Already queued */
321	}
322
323	uap->nua_so = so;
324	uap->nua_waitflag = waitflag;
325
326	TAILQ_INSERT_TAIL(myqueue->ucq_queue, uap, nua_svcq);
327
328	uap->nua_flags |= NFS_UC_QUEUED;
329	if (myqueue->ucq_flags | NFS_UC_QUEUE_SLEEPING)
330		wakeup(myqueue);
331
332#ifdef NFS_UC_Q_DEBUG
333	{
334		uint32_t count = OSIncrementAtomic(&nfsrv_uc_queue_count);
335
336		/* This is a bit racey but just for debug */
337		if (count > nfsrv_uc_queue_max_seen)
338			nfsrv_uc_queue_max_seen = count;
339
340		if (nfsrv_uc_queue_limit && count > nfsrv_uc_queue_limit) {
341			panic("nfsd up-call queue limit exceeded\n");
342		}
343	}
344#endif
345	lck_mtx_unlock(myqueue->ucq_lock);
346}
347
348
349/*
350 * Set the up-call routine on the socket associated with the passed in
351 * nfsrv_sock.
352 * Assumes nfsd_mutex is held.
353 */
354void
355nfsrv_uc_addsock(struct nfsrv_sock *slp, int start)
356{
357	int on = 1;
358	struct nfsrv_uc_arg *arg;
359
360	if (start && nfsrv_uc_thread_count == 0)
361		nfsrv_uc_start();
362
363	/*
364	 * We don't take a lock since once we're up nfsrv_uc_thread_count does
365	 * not change until shutdown and then we should not be adding sockets to
366	 * generate up-calls.
367	 */
368	if (nfsrv_uc_thread_count) {
369		MALLOC(arg, struct nfsrv_uc_arg *, sizeof (struct nfsrv_uc_arg), M_NFSSVC, M_WAITOK | M_ZERO);
370		if (arg == NULL)
371			goto direct;
372
373		slp->ns_ua = arg;
374		arg->nua_slp = slp;
375		arg->nua_qi = NFS_UC_HASH(slp);
376
377		sock_setupcall(slp->ns_so, nfsrv_uc_proxy, arg);
378	} else {
379direct:
380		slp->ns_ua = NULL;
381		DPRINT("setting nfsrv_rcv up-call\n");
382		sock_setupcall(slp->ns_so, nfsrv_rcv, slp);
383	}
384
385	/* just playin' it safe */
386	sock_setsockopt(slp->ns_so, SOL_SOCKET, SO_UPCALLCLOSEWAIT, &on, sizeof(on));
387
388	return;
389}
390
391