/*
 * Copyright 2016 Jakub Klama <jceel@FreeBSD.org>
 * All rights reserved
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted providing that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <errno.h>
#include <stdlib.h>
#include <pthread.h>
#if defined(__FreeBSD__)
#include <pthread_np.h>
#endif
#include <sys/queue.h>
#include "lib9p.h"
#include "threadpool.h"

static void l9p_threadpool_rflush(struct l9p_threadpool *tp,
    struct l9p_request *req);

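/*
 * The single responder thread: it pulls completed requests off the
 * reply queue, queues up any Tflush requests that were waiting on
 * them, and sends the responses back to the client.
 */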
static void *
l9p_responder(void *arg)
{
	struct l9p_threadpool *tp;
	struct l9p_worker *worker = arg;
	struct l9p_request *req;

	tp = worker->ltw_tp;
	for (;;) {
		/* get next reply to send */
		pthread_mutex_lock(&tp->ltp_mtx);
		while (STAILQ_EMPTY(&tp->ltp_replyq) && !worker->ltw_exiting)
			pthread_cond_wait(&tp->ltp_reply_cv, &tp->ltp_mtx);
		if (worker->ltw_exiting) {
			pthread_mutex_unlock(&tp->ltp_mtx);
			break;
		}

		/* off reply queue */
		req = STAILQ_FIRST(&tp->ltp_replyq);
		STAILQ_REMOVE_HEAD(&tp->ltp_replyq, lr_worklink);

		/* request is now in final glide path, can't be Tflush-ed */
		req->lr_workstate = L9P_WS_REPLYING;

		/* any flushers waiting for this request can go now */
		if (req->lr_flushstate != L9P_FLUSH_NONE)
			l9p_threadpool_rflush(tp, req);

		pthread_mutex_unlock(&tp->ltp_mtx);

		/* send response */
		l9p_respond(req, false, true);
	}
	return (NULL);
}

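/*
 * A worker thread: it pulls requests off the work queue, dispatches
 * each one, and then hands the completed request to the responder
 * via the reply queue.
 */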
static void *
l9p_worker(void *arg)
{
	struct l9p_threadpool *tp;
	struct l9p_worker *worker = arg;
	struct l9p_request *req;

	tp = worker->ltw_tp;
	pthread_mutex_lock(&tp->ltp_mtx);
	for (;;) {
		while (STAILQ_EMPTY(&tp->ltp_workq) && !worker->ltw_exiting)
			pthread_cond_wait(&tp->ltp_work_cv, &tp->ltp_mtx);
		if (worker->ltw_exiting)
			break;

		/* off work queue; now work-in-progress, by us */
		req = STAILQ_FIRST(&tp->ltp_workq);
		STAILQ_REMOVE_HEAD(&tp->ltp_workq, lr_worklink);
		req->lr_workstate = L9P_WS_INPROGRESS;
		req->lr_worker = worker;
		pthread_mutex_unlock(&tp->ltp_mtx);

		/* actually try the request */
		req->lr_error = l9p_dispatch_request(req);

		/* move to responder queue, updating work-state */
		pthread_mutex_lock(&tp->ltp_mtx);
		req->lr_workstate = L9P_WS_RESPQUEUED;
		req->lr_worker = NULL;
		STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);

		/* signal the responder */
		pthread_cond_signal(&tp->ltp_reply_cv);
	}
	pthread_mutex_unlock(&tp->ltp_mtx);
	return (NULL);
}

/*
 * Just before finally replying to a request that got touched by
 * a Tflush request, we enqueue its flushers (requests of type
 * Tflush, which are now on the flushee's lr_flushq) onto the
 * response queue.
 */
static void
l9p_threadpool_rflush(struct l9p_threadpool *tp, struct l9p_request *req)
{
	struct l9p_request *flusher;

	/*
	 * https://swtch.com/plan9port/man/man9/flush.html says:
	 *
	 * "Should multiple Tflushes be received for a pending
	 * request, they must be answered in order.  A Rflush for
	 * any of the multiple Tflushes implies an answer for all
	 * previous ones.  Therefore, should a server receive a
	 * request and then multiple flushes for that request, it
	 * need respond only to the last flush."  This means
	 * we could march through the queue of flushers here,
	 * marking all but the last one as "to be dropped" rather
	 * than "to be replied-to".
	 *
	 * However, we'll leave that for later, if ever -- it
	 * should be harmless to respond to each, in order.
	 */
	STAILQ_FOREACH(flusher, &req->lr_flushq, lr_flushlink) {
		flusher->lr_workstate = L9P_WS_RESPQUEUED;
#ifdef notdef
		/* drop all but the last flusher, per the rule quoted above */
		if (STAILQ_NEXT(flusher, lr_flushlink) != NULL) {
			flusher->lr_flushstate = L9P_FLUSH_NOT_RUN;
			/* or, flusher->lr_drop = true ? */
		}
#endif
		STAILQ_INSERT_TAIL(&tp->ltp_replyq, flusher, lr_worklink);
	}
}

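/*
 * Initialize the thread pool: one responder thread plus <size>
 * worker threads.  A rough usage sketch (error handling and the
 * rest of the connection setup are elided; the pool is assumed to
 * be the lc_tp embedded in the connection, as l9p_threadpool_tflush()
 * below expects):
 *
 *	struct l9p_connection *conn = ...;
 *
 *	if (l9p_threadpool_init(&conn->lc_tp, 4) != 0)
 *		... give up ...
 *	... incoming requests are then handed to l9p_threadpool_run() ...
 *	l9p_threadpool_shutdown(&conn->lc_tp);
 */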
int
l9p_threadpool_init(struct l9p_threadpool *tp, int size)
{
	struct l9p_worker *worker;
#if defined(__FreeBSD__)
	char threadname[16];
#endif
	int error;
	int i, nworkers, nresponders;

	if (size <= 0)
		return (EINVAL);
	error = pthread_mutex_init(&tp->ltp_mtx, NULL);
	if (error)
		return (error);
	error = pthread_cond_init(&tp->ltp_work_cv, NULL);
	if (error)
		goto fail_work_cv;
	error = pthread_cond_init(&tp->ltp_reply_cv, NULL);
	if (error)
		goto fail_reply_cv;

	STAILQ_INIT(&tp->ltp_workq);
	STAILQ_INIT(&tp->ltp_replyq);
	LIST_INIT(&tp->ltp_workers);

	nresponders = 0;
	nworkers = 0;
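	/*
	 * Thread 0 is the (single) responder; threads 1 through
	 * <size> are workers, so a pool of size N runs N+1 threads.
	 */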
	for (i = 0; i <= size; i++) {
		worker = calloc(1, sizeof(struct l9p_worker));
		if (worker == NULL) {
			error = ENOMEM;
			break;
		}
		worker->ltw_tp = tp;
		worker->ltw_responder = i == 0;
		error = pthread_create(&worker->ltw_thread, NULL,
		    worker->ltw_responder ? l9p_responder : l9p_worker,
		    (void *)worker);
		if (error) {
			free(worker);
			break;
		}
		if (worker->ltw_responder)
			nresponders++;
		else
			nworkers++;

#if defined(__FreeBSD__)
		if (worker->ltw_responder) {
			pthread_set_name_np(worker->ltw_thread, "9p-responder");
		} else {
			snprintf(threadname, sizeof(threadname),
			    "9p-worker:%d", i - 1);
			pthread_set_name_np(worker->ltw_thread, threadname);
		}
#endif

		LIST_INSERT_HEAD(&tp->ltp_workers, worker, ltw_link);
	}
	if (nresponders == 0 || nworkers == 0) {
		/* need the one responder, and at least one worker */
		l9p_threadpool_shutdown(tp);
		return (error);
	}
	return (0);

	/*
	 * We could avoid these labels by having multiple destroy
	 * paths (one for each error case), or by having booleans
	 * for which variables were initialized.  Neither is very
	 * appealing...
	 */
fail_reply_cv:
	pthread_cond_destroy(&tp->ltp_work_cv);
fail_work_cv:
	pthread_mutex_destroy(&tp->ltp_mtx);

	return (error);
}

/*
 * Run a request, usually by queueing it.
 */
void
l9p_threadpool_run(struct l9p_threadpool *tp, struct l9p_request *req)
{

	/*
	 * Flush requests must be handled specially, since they
	 * can cancel / kill off regular requests.  (But we can
	 * run them through the regular dispatch mechanism.)
	 */
	if (req->lr_req.hdr.type == L9P_TFLUSH) {
		/* not on a work queue yet so we can touch state */
		req->lr_workstate = L9P_WS_IMMEDIATE;
		(void) l9p_dispatch_request(req);
	} else {
		pthread_mutex_lock(&tp->ltp_mtx);
		req->lr_workstate = L9P_WS_NOTSTARTED;
		STAILQ_INSERT_TAIL(&tp->ltp_workq, req, lr_worklink);
		pthread_cond_signal(&tp->ltp_work_cv);
		pthread_mutex_unlock(&tp->ltp_mtx);
	}
}

/*
 * Run a Tflush request.  This is called via l9p_dispatch_request()
 * (which has some debug code in it), but not from a worker thread.
 */
int
l9p_threadpool_tflush(struct l9p_request *req)
{
	struct l9p_connection *conn;
	struct l9p_threadpool *tp;
	struct l9p_request *flushee;
	uint16_t oldtag;
	enum l9p_flushstate nstate;

	/*
	 * Find what we're supposed to flush (the flushee, as it were).
	 */
	req->lr_error = 0;	/* Tflush always succeeds */
	conn = req->lr_conn;
	tp = &conn->lc_tp;
	oldtag = req->lr_req.tflush.oldtag;
	ht_wrlock(&conn->lc_requests);
	flushee = ht_find_locked(&conn->lc_requests, oldtag);
	if (flushee == NULL) {
		/*
		 * Nothing to flush!  The old request must have
		 * been done and gone already.  Just queue this
		 * Tflush for a success reply.
		 */
		ht_unlock(&conn->lc_requests);
		pthread_mutex_lock(&tp->ltp_mtx);
		goto done;
	}

	/*
	 * Found the original request.  We'll need to inspect its
	 * work-state to figure out what to do.
	 */
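	/*
	 * Lock order: take the pool mutex before dropping the
	 * table lock, so that the flushee's work-state cannot
	 * change between the lookup above and the inspection
	 * below.
	 */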
	pthread_mutex_lock(&tp->ltp_mtx);
	ht_unlock(&conn->lc_requests);

	switch (flushee->lr_workstate) {

	case L9P_WS_NOTSTARTED:
		/*
		 * Flushee is on work queue, but not yet being
		 * handled by a worker.
		 *
		 * The documentation -- see
		 * http://ericvh.github.io/9p-rfc/rfc9p2000.html
		 * https://swtch.com/plan9port/man/man9/flush.html
		 * -- says that "the server should answer the
		 * flush message immediately".  However, Linux
		 * sends flush requests for operations that
		 * must finish, such as Tclunk, and it's not
		 * possible to *answer* the flush request until
		 * it has been handled (if necessary) or aborted
		 * (if allowed).
		 *
		 * We therefore now just mark the original request
		 * and let the request-handler do whatever is
		 * appropriate.  NOTE: we could have a table of
		 * "requests that can be aborted without being
		 * run" vs "requests that must be run to be
		 * aborted", but for now that seems like an
		 * unnecessary complication.
		 */
		nstate = L9P_FLUSH_REQUESTED_PRE_START;
		break;

	case L9P_WS_IMMEDIATE:
		/*
		 * This state only applies to Tflush requests, and
		 * flushing a Tflush is illegal.  But we'll do nothing
		 * special here, which will make us act like a flush
		 * request for the flushee that arrived too late to
		 * do anything about the flushee.
		 */
		nstate = L9P_FLUSH_REQUESTED_POST_START;
		break;

	case L9P_WS_INPROGRESS:
		/*
		 * Worker thread flushee->lr_worker is working on it.
		 * Kick it to get it out of blocking system calls.
		 * (This requires that it carefully set up some
		 * signal handlers, may be FreeBSD-dependent, and
		 * probably cannot be handled this way on macOS.)
		 */
#ifdef notyet
		pthread_kill(...);
#endif
		nstate = L9P_FLUSH_REQUESTED_POST_START;
		break;

	case L9P_WS_RESPQUEUED:
		/*
		 * The flushee is already in the response queue.
		 * We'll just mark it as having had some flush
		 * action applied.
		 */
		nstate = L9P_FLUSH_TOOLATE;
		break;

	case L9P_WS_REPLYING:
		/*
		 * Although we found the flushee, it's too late to
		 * make us depend on it: it's already heading out
		 * the door as a reply.
		 *
		 * We don't want to do anything to the flushee.
		 * Instead, we want to work the same way as if
		 * we had never found the tag.
		 */
		goto done;
	}

	/*
	 * Now add us to the list of Tflush-es that are waiting
	 * for the flushee (creating the list if needed, i.e., if
	 * this is the first Tflush for the flushee).  We (req)
	 * will get queued for reply later, when the responder
	 * processes the flushee and calls l9p_threadpool_rflush().
	 */
	if (flushee->lr_flushstate == L9P_FLUSH_NONE)
		STAILQ_INIT(&flushee->lr_flushq);
	flushee->lr_flushstate = nstate;
	STAILQ_INSERT_TAIL(&flushee->lr_flushq, req, lr_flushlink);

	pthread_mutex_unlock(&tp->ltp_mtx);

	return (0);

done:
	/*
	 * This immediate op is ready to be replied-to now, so just
	 * stick it onto the reply queue.
	 */
	req->lr_workstate = L9P_WS_RESPQUEUED;
	STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);
	pthread_mutex_unlock(&tp->ltp_mtx);
	pthread_cond_signal(&tp->ltp_reply_cv);
	return (0);
}

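/*
 * Shut the pool down: wake every thread, wait for each one to exit,
 * and then tear down the pool's mutex and condition variables.
 */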
int
l9p_threadpool_shutdown(struct l9p_threadpool *tp)
{
	struct l9p_worker *worker, *tmp;

	LIST_FOREACH_SAFE(worker, &tp->ltp_workers, ltw_link, tmp) {
		pthread_mutex_lock(&tp->ltp_mtx);
		worker->ltw_exiting = true;
		if (worker->ltw_responder)
			pthread_cond_signal(&tp->ltp_reply_cv);
		else
			pthread_cond_broadcast(&tp->ltp_work_cv);
		pthread_mutex_unlock(&tp->ltp_mtx);
		pthread_join(worker->ltw_thread, NULL);
		LIST_REMOVE(worker, ltw_link);
		free(worker);
	}
	pthread_cond_destroy(&tp->ltp_reply_cv);
	pthread_cond_destroy(&tp->ltp_work_cv);
	pthread_mutex_destroy(&tp->ltp_mtx);

	return (0);
}
