/*-
 * Copyright (c) 2000 Doug Rabson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: stable/10/sys/kern/subr_taskqueue.c 315268 2017-03-14 16:00:33Z hselasky $");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/bus.h>
#include <sys/interrupt.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/limits.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/sched.h>
#include <sys/taskqueue.h>
#include <sys/unistd.h>
#include <machine/stdarg.h>

static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
static void	*taskqueue_giant_ih;
static void	*taskqueue_ih;
static void	 taskqueue_fast_enqueue(void *);
static void	 taskqueue_swi_enqueue(void *);
static void	 taskqueue_swi_giant_enqueue(void *);

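/*
 * Each context executing tasks from a queue (a taskqueue thread or a
 * software interrupt) records the task it is currently running in one of
 * these structures, linked on the queue's tq_active list, so that drain
 * and cancel operations can detect in-progress tasks.
 */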
struct taskqueue_busy {
	struct task	*tb_running;
	TAILQ_ENTRY(taskqueue_busy) tb_link;
};

struct taskqueue {
	STAILQ_HEAD(, task)	tq_queue;
	taskqueue_enqueue_fn	tq_enqueue;
	void			*tq_context;
	TAILQ_HEAD(, taskqueue_busy) tq_active;
	struct mtx		tq_mutex;
	struct thread		**tq_threads;
	int			tq_tcount;
	int			tq_spin;
	int			tq_flags;
	int			tq_callouts;
	taskqueue_callback_fn	tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
	void			*tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
};

#define	TQ_FLAGS_ACTIVE		(1 << 0)
#define	TQ_FLAGS_BLOCKED	(1 << 1)
#define	TQ_FLAGS_UNLOCKED_ENQUEUE	(1 << 2)

#define	DT_CALLOUT_ARMED	(1 << 0)
#define	DT_DRAIN_IN_PROGRESS	(1 << 1)

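/*
 * A queue created with MTX_SPIN uses a spin mutex and must be locked and
 * slept on with the spin variants; these wrappers select the right
 * primitive based on tq_spin.
 */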
#define	TQ_LOCK(tq)							\
	do {								\
		if ((tq)->tq_spin)					\
			mtx_lock_spin(&(tq)->tq_mutex);			\
		else							\
			mtx_lock(&(tq)->tq_mutex);			\
	} while (0)
#define	TQ_ASSERT_LOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_OWNED)

#define	TQ_UNLOCK(tq)							\
	do {								\
		if ((tq)->tq_spin)					\
			mtx_unlock_spin(&(tq)->tq_mutex);		\
		else							\
			mtx_unlock(&(tq)->tq_mutex);			\
	} while (0)
#define	TQ_ASSERT_UNLOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)

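/*
 * Initialize a timeout task: the embedded task itself plus a callout that
 * runs taskqueue_timeout_func() with tq_mutex held.  CALLOUT_RETURNUNLOCKED
 * because the locked enqueue path releases the mutex itself.
 */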
void
_timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
    int priority, task_fn_t func, void *context)
{

	TASK_INIT(&timeout_task->t, priority, func, context);
	callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
	    CALLOUT_RETURNUNLOCKED);
	timeout_task->q = queue;
	timeout_task->f = 0;
}

static __inline int
TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
    int t)
{
	if (tq->tq_spin)
		return (msleep_spin(p, m, wm, t));
	return (msleep(p, m, pri, wm, t));
}

static struct taskqueue *
_taskqueue_create(const char *name __unused, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context,
		 int mtxflags, const char *mtxname)
{
	struct taskqueue *queue;

	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
	if (!queue)
		return NULL;

	STAILQ_INIT(&queue->tq_queue);
	TAILQ_INIT(&queue->tq_active);
	queue->tq_enqueue = enqueue;
	queue->tq_context = context;
	queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
	queue->tq_flags |= TQ_FLAGS_ACTIVE;
	if (enqueue == taskqueue_fast_enqueue ||
	    enqueue == taskqueue_swi_enqueue ||
	    enqueue == taskqueue_swi_giant_enqueue ||
	    enqueue == taskqueue_thread_enqueue)
		queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
	mtx_init(&queue->tq_mutex, mtxname, NULL, mtxflags);

	return queue;
}

struct taskqueue *
taskqueue_create(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context)
{
	return _taskqueue_create(name, mflags, enqueue, context,
			MTX_DEF, "taskqueue");
}

void
taskqueue_set_callback(struct taskqueue *queue,
    enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback,
    void *context)
{

	KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) &&
	    (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)),
	    ("Callback type %d not valid, must be %d-%d", cb_type,
	    TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX));
	KASSERT((queue->tq_callbacks[cb_type] == NULL),
	    ("Re-initialization of taskqueue callback?"));

	queue->tq_callbacks[cb_type] = callback;
	queue->tq_cb_contexts[cb_type] = context;
}

/*
 * Signal a taskqueue thread to terminate.
 */
static void
taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
{

	while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
		wakeup(tq);
		TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
	}
}

void
taskqueue_free(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
	taskqueue_terminate(queue->tq_threads, queue);
	KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
	KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
	mtx_destroy(&queue->tq_mutex);
	free(queue->tq_threads, M_TASKQUEUE);
	free(queue, M_TASKQUEUE);
}

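/*
 * Queue a task, keeping the queue sorted by descending task priority.
 * Called with tq_mutex held; always returns with the lock released.  For
 * queues whose enqueue hook does not need the lock
 * (TQ_FLAGS_UNLOCKED_ENQUEUE) the lock is dropped before calling the hook;
 * otherwise it is held across the hook and dropped afterwards.
 */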
static int
taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
{
	struct task *ins;
	struct task *prev;

	/*
	 * Count multiple enqueues.
	 */
	if (task->ta_pending) {
		if (task->ta_pending < USHRT_MAX)
			task->ta_pending++;
		TQ_UNLOCK(queue);
		return (0);
	}

	/*
	 * Optimise the case when all tasks have the same priority.
	 */
	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
	if (!prev || prev->ta_priority >= task->ta_priority) {
		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
	} else {
		prev = NULL;
		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
			if (ins->ta_priority < task->ta_priority)
				break;

		if (prev)
			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
		else
			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
	}

	task->ta_pending = 1;
	if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0)
		TQ_UNLOCK(queue);
	if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
		queue->tq_enqueue(queue->tq_context);
	if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
		TQ_UNLOCK(queue);

	/* Return with lock released. */
	return (0);
}

int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
	int res;

	TQ_LOCK(queue);
	res = taskqueue_enqueue_locked(queue, task);
	/* The lock is released inside. */

	return (res);
}

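/*
 * Callout handler for timeout tasks.  Runs with tq_mutex held (the callout
 * was initialized with callout_init_mtx()) and hands the task to
 * taskqueue_enqueue_locked(), which releases the lock.
 */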
static void
taskqueue_timeout_func(void *arg)
{
	struct taskqueue *queue;
	struct timeout_task *timeout_task;

	timeout_task = arg;
	queue = timeout_task->q;
	KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
	timeout_task->f &= ~DT_CALLOUT_ARMED;
	queue->tq_callouts--;
	taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
	/* The lock is released inside. */
}

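/*
 * Schedule a timeout task to be enqueued after 'ticks' clock ticks; zero
 * means enqueue immediately and a negative value is treated as its
 * absolute value.  Returns -1 if a drain is in progress and the request
 * was ignored, otherwise the prior pending count (counting an already
 * armed callout).
 */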
int
taskqueue_enqueue_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task, int ticks)
{
	int res;

	TQ_LOCK(queue);
	KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
	    ("Migrated queue"));
	KASSERT(!queue->tq_spin, ("Timeout for spin-queue"));
	timeout_task->q = queue;
	res = timeout_task->t.ta_pending;
	if (timeout_task->f & DT_DRAIN_IN_PROGRESS) {
		/* Do nothing */
		TQ_UNLOCK(queue);
		res = -1;
	} else if (ticks == 0) {
		taskqueue_enqueue_locked(queue, &timeout_task->t);
		/* The lock is released inside. */
	} else {
		if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
			res++;
		} else {
			queue->tq_callouts++;
			timeout_task->f |= DT_CALLOUT_ARMED;
			if (ticks < 0)
				ticks = -ticks; /* Ignore overflow. */
		}
		if (ticks > 0) {
			callout_reset(&timeout_task->c, ticks,
			    taskqueue_timeout_func, timeout_task);
		}
		TQ_UNLOCK(queue);
	}
	return (res);
}

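/*
 * Wait until no context is executing a task from this queue.  Sleeps on
 * tq_active; taskqueue_run_locked() does the matching wakeup when the
 * active list becomes empty.
 */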
static void
taskqueue_drain_running(struct taskqueue *queue)
{

	while (!TAILQ_EMPTY(&queue->tq_active))
		TQ_SLEEP(queue, &queue->tq_active, &queue->tq_mutex,
		    PWAIT, "-", 0);
}

void
taskqueue_block(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags |= TQ_FLAGS_BLOCKED;
	TQ_UNLOCK(queue);
}

void
taskqueue_unblock(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
	if (!STAILQ_EMPTY(&queue->tq_queue))
		queue->tq_enqueue(queue->tq_context);
	TQ_UNLOCK(queue);
}

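/*
 * Run all pending tasks.  Called with tq_mutex held; the lock is dropped
 * around each ta_func invocation so tasks may sleep or enqueue further
 * work, and is reacquired before returning.
 */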
static void
taskqueue_run_locked(struct taskqueue *queue)
{
	struct taskqueue_busy tb;
	struct task *task;
	int pending;

	TQ_ASSERT_LOCKED(queue);
	tb.tb_running = NULL;
	TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);

	while (STAILQ_FIRST(&queue->tq_queue)) {
		/*
		 * Carefully remove the first task from the queue and
		 * zero its pending count.
		 */
		task = STAILQ_FIRST(&queue->tq_queue);
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		pending = task->ta_pending;
		task->ta_pending = 0;
		tb.tb_running = task;
		TQ_UNLOCK(queue);

		task->ta_func(task->ta_context, pending);

		TQ_LOCK(queue);
		tb.tb_running = NULL;
		wakeup(task);
	}
	TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
	if (TAILQ_EMPTY(&queue->tq_active))
		wakeup(&queue->tq_active);
}

void
taskqueue_run(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	taskqueue_run_locked(queue);
	TQ_UNLOCK(queue);
}

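/*
 * Return non-zero if some context on this queue is currently executing
 * 'task'.  Scans the tq_active list; tq_mutex must be held.
 */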
static int
task_is_running(struct taskqueue *queue, struct task *task)
{
	struct taskqueue_busy *tb;

	TQ_ASSERT_LOCKED(queue);
	TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
		if (tb->tb_running == task)
			return (1);
	}
	return (0);
}

/*
 * Only use this function in single threaded contexts. It returns
 * non-zero if the given task is either pending or running. Else the
 * task is idle and can be queued again or freed.
 */
int
taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task)
{
	int retval;

	TQ_LOCK(queue);
	retval = task->ta_pending > 0 || task_is_running(queue, task);
	TQ_UNLOCK(queue);

	return (retval);
}

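/*
 * Remove 'task' from the queue if it is still pending, reporting the
 * previous pending count through 'pendp'.  Returns EBUSY if the task is
 * currently running; cancellation does not wait for it to finish.
 */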
static int
taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
    u_int *pendp)
{

	if (task->ta_pending > 0)
		STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
	if (pendp != NULL)
		*pendp = task->ta_pending;
	task->ta_pending = 0;
	return (task_is_running(queue, task) ? EBUSY : 0);
}

int
taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
{
	int error;

	TQ_LOCK(queue);
	error = taskqueue_cancel_locked(queue, task, pendp);
	TQ_UNLOCK(queue);

	return (error);
}

int
taskqueue_cancel_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task, u_int *pendp)
{
	u_int pending, pending1;
	int error;

	TQ_LOCK(queue);
	pending = !!callout_stop(&timeout_task->c);
	error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
	if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
		timeout_task->f &= ~DT_CALLOUT_ARMED;
		queue->tq_callouts--;
	}
	TQ_UNLOCK(queue);

	if (pendp != NULL)
		*pendp = pending + pending1;
	return (error);
}

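/*
 * Block until 'task' is neither pending on the queue nor being executed
 * by any of its contexts.  May sleep, so callers of non-spin queues must
 * be in a sleepable context.
 */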
void
taskqueue_drain(struct taskqueue *queue, struct task *task)
{

	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

	TQ_LOCK(queue);
	while (task->ta_pending != 0 || task_is_running(queue, task))
		TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0);
	TQ_UNLOCK(queue);
}

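/*
 * Drain the whole queue: wait until the task that was last in the queue
 * when the drain began has been processed, then wait for all running
 * tasks to finish.
 */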
void
taskqueue_drain_all(struct taskqueue *queue)
{
	struct task *task;

	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

	TQ_LOCK(queue);
	task = STAILQ_LAST(&queue->tq_queue, task, ta_link);
	while (task != NULL && task->ta_pending != 0) {
		struct task *oldtask;
		TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0);
		/*
		 * While we were asleep the last entry may have been freed,
		 * so we need to check whether it is even still on the queue.
		 * Not perfect, but better than referencing bad memory.  The
		 * first guess is the current end of the queue, but if a new
		 * item has been added we must take the expensive path and
		 * walk the list.  Better fix in 11.
		 */
		oldtask = task;
		if (oldtask !=
		    (task = STAILQ_LAST(&queue->tq_queue, task, ta_link))) {
			STAILQ_FOREACH(task, &queue->tq_queue, ta_link) {
				if (task == oldtask)
					break;
			}
		}
	}
	taskqueue_drain_running(queue);
	KASSERT(STAILQ_EMPTY(&queue->tq_queue),
	    ("taskqueue queue is not empty after draining"));
	TQ_UNLOCK(queue);
}

void
taskqueue_drain_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task)
{

	/*
	 * Set flag to prevent timer from re-starting during drain:
	 */
	TQ_LOCK(queue);
	KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0,
	    ("Drain already in progress"));
	timeout_task->f |= DT_DRAIN_IN_PROGRESS;
	TQ_UNLOCK(queue);

	callout_drain(&timeout_task->c);
	taskqueue_drain(queue, &timeout_task->t);

	/*
	 * Clear flag to allow timer to re-start:
	 */
	TQ_LOCK(queue);
	timeout_task->f &= ~DT_DRAIN_IN_PROGRESS;
	TQ_UNLOCK(queue);
}

static void
taskqueue_swi_enqueue(void *context)
{
	swi_sched(taskqueue_ih, 0);
}

static void
taskqueue_swi_run(void *dummy)
{
	taskqueue_run(taskqueue_swi);
}

static void
taskqueue_swi_giant_enqueue(void *context)
{
	swi_sched(taskqueue_giant_ih, 0);
}

static void
taskqueue_swi_giant_run(void *dummy)
{
	taskqueue_run(taskqueue_swi_giant);
}

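/*
 * Create 'count' kernel threads at priority 'pri' to service the queue.
 * The printf-style 'name' names the threads; with more than one thread an
 * "_%d" suffix distinguishes them.  Threads are created stopped and only
 * released to the scheduler once all of them exist.
 */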
int
taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
			const char *name, ...)
{
	va_list ap;
	struct thread *td;
	struct taskqueue *tq;
	int i, error;
	char ktname[MAXCOMLEN + 1];

	if (count <= 0)
		return (EINVAL);

	tq = *tqp;

	va_start(ap, name);
	vsnprintf(ktname, sizeof(ktname), name, ap);
	va_end(ap);

	tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
	    M_NOWAIT | M_ZERO);
	if (tq->tq_threads == NULL) {
		printf("%s: no memory for %s threads\n", __func__, ktname);
		return (ENOMEM);
	}

	for (i = 0; i < count; i++) {
		if (count == 1)
			error = kthread_add(taskqueue_thread_loop, tqp, NULL,
			    &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
		else
			error = kthread_add(taskqueue_thread_loop, tqp, NULL,
			    &tq->tq_threads[i], RFSTOPPED, 0,
			    "%s_%d", ktname, i);
		if (error) {
			/* should be ok to continue, taskqueue_free will dtrt */
			printf("%s: kthread_add(%s): error %d\n", __func__,
			    ktname, error);
			tq->tq_threads[i] = NULL;		/* paranoid */
		} else
			tq->tq_tcount++;
	}
	for (i = 0; i < count; i++) {
		if (tq->tq_threads[i] == NULL)
			continue;
		td = tq->tq_threads[i];
		thread_lock(td);
		sched_prio(td, pri);
		sched_add(td, SRQ_BORING);
		thread_unlock(td);
	}

	return (0);
}

static inline void
taskqueue_run_callback(struct taskqueue *tq,
    enum taskqueue_callback_type cb_type)
{
	taskqueue_callback_fn tq_callback;

	TQ_ASSERT_UNLOCKED(tq);
	tq_callback = tq->tq_callbacks[cb_type];
	if (tq_callback != NULL)
		tq_callback(tq->tq_cb_contexts[cb_type]);
}

void
taskqueue_thread_loop(void *arg)
{
	struct taskqueue **tqp, *tq;

	tqp = arg;
	tq = *tqp;
	taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
	TQ_LOCK(tq);
	while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
		taskqueue_run_locked(tq);
		/*
		 * Because taskqueue_run_locked() can drop tq_mutex, we need
		 * to check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
		 * meantime, which means we missed a wakeup.
		 */
		if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
			break;
		TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
	}
	taskqueue_run_locked(tq);

	/*
	 * This thread is on its way out, so just drop the lock temporarily
	 * in order to call the shutdown callback.  This allows the callback
	 * to look at the taskqueue, even just before it dies.
	 */
	TQ_UNLOCK(tq);
	taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
	TQ_LOCK(tq);

	/* rendezvous with thread that asked us to terminate */
	tq->tq_tcount--;
	wakeup_one(tq->tq_threads);
	TQ_UNLOCK(tq);
	kthread_exit();
}

void
taskqueue_thread_enqueue(void *context)
{
	struct taskqueue **tqp, *tq;

	tqp = context;
	tq = *tqp;

	wakeup_one(tq);
}

TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
		 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
		     INTR_MPSAFE, &taskqueue_ih));

TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
		 swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
		     NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));

TASKQUEUE_DEFINE_THREAD(thread);

struct taskqueue *
taskqueue_create_fast(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context)
{
	return _taskqueue_create(name, mflags, enqueue, context,
			MTX_SPIN, "fast_taskqueue");
}

/* NB: for backwards compatibility */
int
taskqueue_enqueue_fast(struct taskqueue *queue, struct task *task)
{
	return taskqueue_enqueue(queue, task);
}

static void	*taskqueue_fast_ih;

static void
taskqueue_fast_enqueue(void *context)
{
	swi_sched(taskqueue_fast_ih, 0);
}

static void
taskqueue_fast_run(void *dummy)
{
	taskqueue_run(taskqueue_fast);
}

TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
	swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL,
	SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));

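/*
 * Return non-zero if 'td' is one of this queue's service threads.  The
 * tq_threads array may contain NULL holes left by failed kthread_add()
 * calls, hence the separate live-thread counter 'j'.
 */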
int
taskqueue_member(struct taskqueue *queue, struct thread *td)
{
	int i, j, ret = 0;

	for (i = 0, j = 0; ; i++) {
		if (queue->tq_threads[i] == NULL)
			continue;
		if (queue->tq_threads[i] == td) {
			ret = 1;
			break;
		}
		if (++j >= queue->tq_tcount)
			break;
	}
	return (ret);
}