subr_taskqueue.c revision 188548
152419Sjulian/*-
252419Sjulian * Copyright (c) 2000 Doug Rabson
352419Sjulian * All rights reserved.
452419Sjulian *
552419Sjulian * Redistribution and use in source and binary forms, with or without
652419Sjulian * modification, are permitted provided that the following conditions
752419Sjulian * are met:
852419Sjulian * 1. Redistributions of source code must retain the above copyright
952419Sjulian *    notice, this list of conditions and the following disclaimer.
1052419Sjulian * 2. Redistributions in binary form must reproduce the above copyright
1152419Sjulian *    notice, this list of conditions and the following disclaimer in the
1252419Sjulian *    documentation and/or other materials provided with the distribution.
1352419Sjulian *
1452419Sjulian * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
1552419Sjulian * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
1652419Sjulian * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
1752419Sjulian * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
1852419Sjulian * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
1952419Sjulian * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
2052419Sjulian * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
2152419Sjulian * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
2252419Sjulian * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
2352419Sjulian * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
2452419Sjulian * SUCH DAMAGE.
2552419Sjulian */
2652419Sjulian
2752419Sjulian#include <sys/cdefs.h>
2852419Sjulian__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 188548 2009-02-13 01:16:51Z thompsa $");
2952419Sjulian
3052419Sjulian#include <sys/param.h>
3152419Sjulian#include <sys/systm.h>
3252419Sjulian#include <sys/bus.h>
3352419Sjulian#include <sys/interrupt.h>
3452419Sjulian#include <sys/kernel.h>
3552419Sjulian#include <sys/kthread.h>
3652419Sjulian#include <sys/lock.h>
3770700Sjulian#include <sys/malloc.h>
3852419Sjulian#include <sys/mutex.h>
3952419Sjulian#include <sys/proc.h>
4052752Sjulian#include <sys/sched.h>
4152419Sjulian#include <sys/taskqueue.h>
4252419Sjulian#include <sys/unistd.h>
4352419Sjulian#include <machine/stdarg.h>
4452419Sjulian
4552419Sjulianstatic MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
4652419Sjulianstatic void	*taskqueue_giant_ih;
4752419Sjulianstatic void	*taskqueue_ih;
4852419Sjulianstatic STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues;
4952419Sjulianstatic struct mtx taskqueue_queues_mutex;
5052419Sjulian
5152419Sjulianstruct taskqueue {
5252419Sjulian	STAILQ_ENTRY(taskqueue)	tq_link;
5352419Sjulian	STAILQ_HEAD(, task)	tq_queue;
5452419Sjulian	const char		*tq_name;
5552419Sjulian	taskqueue_enqueue_fn	tq_enqueue;
5652419Sjulian	void			*tq_context;
5752419Sjulian	struct task		*tq_running;
5852419Sjulian	struct mtx		tq_mutex;
5952843Sphk	struct thread		**tq_threads;
6052419Sjulian	int			tq_tcount;
6152419Sjulian	int			tq_spin;
6252419Sjulian	int			tq_flags;
6352419Sjulian};
6452419Sjulian
6552419Sjulian#define	TQ_FLAGS_ACTIVE		(1 << 0)
6652419Sjulian#define	TQ_FLAGS_BLOCKED	(1 << 1)
6752419Sjulian#define	TQ_FLAGS_PENDING	(1 << 2)
6852419Sjulian
6952419Sjulianstatic __inline void
7052419SjulianTQ_LOCK(struct taskqueue *tq)
7152419Sjulian{
7252419Sjulian	if (tq->tq_spin)
7352419Sjulian		mtx_lock_spin(&tq->tq_mutex);
7452419Sjulian	else
7552419Sjulian		mtx_lock(&tq->tq_mutex);
7652419Sjulian}
7752419Sjulian
7852419Sjulianstatic __inline void
7952419SjulianTQ_UNLOCK(struct taskqueue *tq)
8052419Sjulian{
8152419Sjulian	if (tq->tq_spin)
8252419Sjulian		mtx_unlock_spin(&tq->tq_mutex);
8352419Sjulian	else
8452419Sjulian		mtx_unlock(&tq->tq_mutex);
8552419Sjulian}
8652419Sjulian
8752419Sjulianstatic void	init_taskqueue_list(void *data);
8852419Sjulian
8952419Sjulianstatic __inline int
9052419SjulianTQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
9152419Sjulian    int t)
9252419Sjulian{
9352419Sjulian	if (tq->tq_spin)
9452419Sjulian		return (msleep_spin(p, m, wm, t));
9552419Sjulian	return (msleep(p, m, pri, wm, t));
9652419Sjulian}
9752419Sjulian
9852419Sjulianstatic void
9952419Sjulianinit_taskqueue_list(void *data __unused)
10052419Sjulian{
10152419Sjulian
10252419Sjulian	mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF);
10352419Sjulian	STAILQ_INIT(&taskqueue_queues);
10452419Sjulian}
10552419SjulianSYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list,
10652419Sjulian    NULL);
10752419Sjulian
10852419Sjulianstatic struct taskqueue *
10952419Sjulian_taskqueue_create(const char *name, int mflags,
11052419Sjulian		 taskqueue_enqueue_fn enqueue, void *context,
11152419Sjulian		 int mtxflags, const char *mtxname)
11252419Sjulian{
11352419Sjulian	struct taskqueue *queue;
11452419Sjulian
11552419Sjulian	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
11652419Sjulian	if (!queue)
11752419Sjulian		return NULL;
11852419Sjulian
11952419Sjulian	STAILQ_INIT(&queue->tq_queue);
12052419Sjulian	queue->tq_name = name;
12152419Sjulian	queue->tq_enqueue = enqueue;
12252419Sjulian	queue->tq_context = context;
12352419Sjulian	queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
12452419Sjulian	queue->tq_flags |= TQ_FLAGS_ACTIVE;
12552419Sjulian	mtx_init(&queue->tq_mutex, mtxname, NULL, mtxflags);
12652419Sjulian
12752752Sjulian	mtx_lock(&taskqueue_queues_mutex);
12870700Sjulian	STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link);
12952752Sjulian	mtx_unlock(&taskqueue_queues_mutex);
13052752Sjulian
13152752Sjulian	return queue;
13252419Sjulian}
13352419Sjulian
13470700Sjulianstruct taskqueue *
13552419Sjuliantaskqueue_create(const char *name, int mflags,
13652419Sjulian		 taskqueue_enqueue_fn enqueue, void *context)
13752419Sjulian{
13852419Sjulian	return _taskqueue_create(name, mflags, enqueue, context,
13952419Sjulian			MTX_DEF, "taskqueue");
140129823Sjulian}
141129823Sjulian
142129823Sjulian/*
143129823Sjulian * Signal a taskqueue thread to terminate.
144129823Sjulian */
145129823Sjulianstatic void
146129823Sjuliantaskqueue_terminate(struct thread **pp, struct taskqueue *tq)
14752419Sjulian{
14852419Sjulian
14952419Sjulian	while (tq->tq_tcount > 0) {
15052419Sjulian		wakeup(tq);
15152419Sjulian		TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
15252419Sjulian	}
15352419Sjulian}
15452419Sjulian
15552419Sjulianvoid
15652419Sjuliantaskqueue_free(struct taskqueue *queue)
15752419Sjulian{
15852419Sjulian
15952419Sjulian	mtx_lock(&taskqueue_queues_mutex);
16052419Sjulian	STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link);
16152419Sjulian	mtx_unlock(&taskqueue_queues_mutex);
16252419Sjulian
16352419Sjulian	TQ_LOCK(queue);
16452419Sjulian	queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
16552419Sjulian	taskqueue_run(queue);
16652419Sjulian	taskqueue_terminate(queue->tq_threads, queue);
16752419Sjulian	mtx_destroy(&queue->tq_mutex);
16852419Sjulian	free(queue->tq_threads, M_TASKQUEUE);
16952419Sjulian	free(queue, M_TASKQUEUE);
17052419Sjulian}
17152419Sjulian
17252419Sjulian/*
17352419Sjulian * Returns with the taskqueue locked.
17452419Sjulian */
17552419Sjulianstruct taskqueue *
17652419Sjuliantaskqueue_find(const char *name)
17752419Sjulian{
17852419Sjulian	struct taskqueue *queue;
17952419Sjulian
18052419Sjulian	mtx_lock(&taskqueue_queues_mutex);
18152419Sjulian	STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) {
18252419Sjulian		if (strcmp(queue->tq_name, name) == 0) {
18352419Sjulian			TQ_LOCK(queue);
18452419Sjulian			mtx_unlock(&taskqueue_queues_mutex);
18552419Sjulian			return queue;
18652419Sjulian		}
18752419Sjulian	}
18852419Sjulian	mtx_unlock(&taskqueue_queues_mutex);
18952419Sjulian	return NULL;
19052419Sjulian}
19152419Sjulian
19252419Sjulianint
19352419Sjuliantaskqueue_enqueue(struct taskqueue *queue, struct task *task)
19452419Sjulian{
19552419Sjulian	struct task *ins;
19652419Sjulian	struct task *prev;
19752419Sjulian
19852419Sjulian	TQ_LOCK(queue);
19952419Sjulian
20052419Sjulian	/*
20152419Sjulian	 * Count multiple enqueues.
20252419Sjulian	 */
20352419Sjulian	if (task->ta_pending) {
20452419Sjulian		task->ta_pending++;
20552419Sjulian		TQ_UNLOCK(queue);
20652419Sjulian		return 0;
20752419Sjulian	}
20852419Sjulian
20952419Sjulian	/*
21070700Sjulian	 * Optimise the case when all tasks have the same priority.
21152419Sjulian	 */
21252419Sjulian	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
21352419Sjulian	if (!prev || prev->ta_priority >= task->ta_priority) {
21468876Sdwmalone		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
21552419Sjulian	} else {
21652419Sjulian		prev = NULL;
21752419Sjulian		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
21852419Sjulian		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
21952419Sjulian			if (ins->ta_priority < task->ta_priority)
22070784Sjulian				break;
22170700Sjulian
22252419Sjulian		if (prev)
22352419Sjulian			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
22452419Sjulian		else
22552419Sjulian			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
22652419Sjulian	}
22752419Sjulian
22852419Sjulian	task->ta_pending = 1;
22952419Sjulian	if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
23052419Sjulian		queue->tq_enqueue(queue->tq_context);
23152419Sjulian	else
23252419Sjulian		queue->tq_flags |= TQ_FLAGS_PENDING;
23352419Sjulian
23452419Sjulian	TQ_UNLOCK(queue);
23570784Sjulian
23653648Sarchie	return 0;
23753648Sarchie}
23853648Sarchie
23953648Sarchievoid
24052419Sjuliantaskqueue_block(struct taskqueue *queue)
24152419Sjulian{
24252419Sjulian
24370784Sjulian	TQ_LOCK(queue);
24452419Sjulian	queue->tq_flags |= TQ_FLAGS_BLOCKED;
24552419Sjulian	TQ_UNLOCK(queue);
24652419Sjulian}
24752419Sjulian
24852419Sjulianvoid
24952419Sjuliantaskqueue_unblock(struct taskqueue *queue)
25052419Sjulian{
25152419Sjulian
25252419Sjulian	TQ_LOCK(queue);
25352419Sjulian	queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
25452419Sjulian	if (queue->tq_flags & TQ_FLAGS_PENDING) {
25552419Sjulian		queue->tq_flags &= ~TQ_FLAGS_PENDING;
25652419Sjulian		queue->tq_enqueue(queue->tq_context);
25752419Sjulian	}
25852419Sjulian	TQ_UNLOCK(queue);
25952419Sjulian}
26052419Sjulian
26152419Sjulianvoid
26252419Sjuliantaskqueue_run(struct taskqueue *queue)
26352419Sjulian{
26470784Sjulian	struct task *task;
26552419Sjulian	int owned, pending;
26652419Sjulian
26752419Sjulian	owned = mtx_owned(&queue->tq_mutex);
26852419Sjulian	if (!owned)
26952419Sjulian		TQ_LOCK(queue);
27052419Sjulian	while (STAILQ_FIRST(&queue->tq_queue)) {
27152419Sjulian		/*
27252419Sjulian		 * Carefully remove the first task from the queue and
27352419Sjulian		 * zero its pending count.
27452816Sarchie		 */
27552419Sjulian		task = STAILQ_FIRST(&queue->tq_queue);
27652816Sarchie		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
27752816Sarchie		pending = task->ta_pending;
27852816Sarchie		task->ta_pending = 0;
27952816Sarchie		queue->tq_running = task;
28052419Sjulian		TQ_UNLOCK(queue);
28152419Sjulian
28252419Sjulian		task->ta_func(task->ta_context, pending);
28352419Sjulian
28452419Sjulian		TQ_LOCK(queue);
28552419Sjulian		queue->tq_running = NULL;
28652419Sjulian		wakeup(task);
28752419Sjulian	}
28852419Sjulian
28952419Sjulian	/*
29052419Sjulian	 * For compatibility, unlock on return if the queue was not locked
29152419Sjulian	 * on entry, although this opens a race window.
29252419Sjulian	 */
29352419Sjulian	if (!owned)
29452419Sjulian		TQ_UNLOCK(queue);
29552419Sjulian}
29652419Sjulian
29752419Sjulianvoid
29852419Sjuliantaskqueue_drain(struct taskqueue *queue, struct task *task)
29952419Sjulian{
30052419Sjulian	if (queue->tq_spin) {		/* XXX */
30170784Sjulian		mtx_lock_spin(&queue->tq_mutex);
30252419Sjulian		while (task->ta_pending != 0 || task == queue->tq_running)
30352419Sjulian			msleep_spin(task, &queue->tq_mutex, "-", 0);
30452419Sjulian		mtx_unlock_spin(&queue->tq_mutex);
30552419Sjulian	} else {
30652419Sjulian		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
30752419Sjulian
30852419Sjulian		mtx_lock(&queue->tq_mutex);
30952419Sjulian		while (task->ta_pending != 0 || task == queue->tq_running)
31052419Sjulian			msleep(task, &queue->tq_mutex, PWAIT, "-", 0);
31152419Sjulian		mtx_unlock(&queue->tq_mutex);
31252419Sjulian	}
31352419Sjulian}
31452419Sjulian
31552419Sjulianstatic void
31652419Sjuliantaskqueue_swi_enqueue(void *context)
31752419Sjulian{
31852419Sjulian	swi_sched(taskqueue_ih, 0);
31952419Sjulian}
32052419Sjulian
32152419Sjulianstatic void
32252419Sjuliantaskqueue_swi_run(void *dummy)
32352419Sjulian{
32452419Sjulian	taskqueue_run(taskqueue_swi);
32552419Sjulian}
32652419Sjulian
32752419Sjulianstatic void
32870700Sjuliantaskqueue_swi_giant_enqueue(void *context)
32952419Sjulian{
33070784Sjulian	swi_sched(taskqueue_giant_ih, 0);
33152419Sjulian}
33252419Sjulian
33352419Sjulianstatic void
33452419Sjuliantaskqueue_swi_giant_run(void *dummy)
33552419Sjulian{
33670700Sjulian	taskqueue_run(taskqueue_swi_giant);
33752419Sjulian}
33852419Sjulian
33952419Sjulianint
34052419Sjuliantaskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
34152419Sjulian			const char *name, ...)
34252419Sjulian{
34352419Sjulian	va_list ap;
34452419Sjulian	struct thread *td;
34552419Sjulian	struct taskqueue *tq;
34652419Sjulian	int i, error;
34770784Sjulian	char ktname[MAXCOMLEN];
34852419Sjulian
34970700Sjulian	if (count <= 0)
35052419Sjulian		return (EINVAL);
35170784Sjulian
35252419Sjulian	tq = *tqp;
35352419Sjulian
35452419Sjulian	va_start(ap, name);
35552419Sjulian	vsnprintf(ktname, MAXCOMLEN, name, ap);
35652419Sjulian	va_end(ap);
35752419Sjulian
35852419Sjulian	tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
35952419Sjulian	    M_NOWAIT | M_ZERO);
36052419Sjulian	if (tq->tq_threads == NULL) {
36152419Sjulian		printf("%s: no memory for %s threads\n", __func__, ktname);
36252419Sjulian		return (ENOMEM);
36352419Sjulian	}
364111119Simp
36552419Sjulian	for (i = 0; i < count; i++) {
36652419Sjulian		if (count == 1)
36752419Sjulian			error = kthread_add(taskqueue_thread_loop, tqp, NULL,
36852419Sjulian			    &tq->tq_threads[i], RFSTOPPED, 0, ktname);
36952419Sjulian		else
37052419Sjulian			error = kthread_add(taskqueue_thread_loop, tqp, NULL,
37152419Sjulian			    &tq->tq_threads[i], RFSTOPPED, 0,
37252419Sjulian			    "%s_%d", ktname, i);
37352419Sjulian		if (error) {
37452419Sjulian			/* should be ok to continue, taskqueue_free will dtrt */
37552419Sjulian			printf("%s: kthread_add(%s): error %d", __func__,
37652419Sjulian			    ktname, error);
37752419Sjulian			tq->tq_threads[i] = NULL;		/* paranoid */
37852419Sjulian		} else
37952419Sjulian			tq->tq_tcount++;
38052419Sjulian	}
38152419Sjulian	for (i = 0; i < count; i++) {
38252419Sjulian		if (tq->tq_threads[i] == NULL)
38352419Sjulian			continue;
38452419Sjulian		td = tq->tq_threads[i];
38552419Sjulian		thread_lock(td);
38652419Sjulian		sched_prio(td, pri);
38752419Sjulian		sched_add(td, SRQ_BORING);
38852419Sjulian		thread_unlock(td);
38952419Sjulian	}
39052419Sjulian
39152419Sjulian	return (0);
39252419Sjulian}
39352419Sjulian
39452419Sjulianvoid
39552419Sjuliantaskqueue_thread_loop(void *arg)
39652419Sjulian{
39752419Sjulian	struct taskqueue **tqp, *tq;
39887599Sobrien
39952419Sjulian	tqp = arg;
40052419Sjulian	tq = *tqp;
40152419Sjulian	TQ_LOCK(tq);
40270700Sjulian	while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
40352419Sjulian		taskqueue_run(tq);
40452419Sjulian		TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
40552419Sjulian	};
40670700Sjulian
40770700Sjulian	/* rendezvous with thread that asked us to terminate */
40852419Sjulian	tq->tq_tcount--;
40952419Sjulian	wakeup_one(tq->tq_threads);
41052419Sjulian	TQ_UNLOCK(tq);
41152419Sjulian	kthread_exit();
41252419Sjulian}
41352419Sjulian
41452419Sjulianvoid
41570700Sjuliantaskqueue_thread_enqueue(void *context)
41652419Sjulian{
41770784Sjulian	struct taskqueue **tqp, *tq;
41852419Sjulian
41952419Sjulian	tqp = context;
42052419Sjulian	tq = *tqp;
42152419Sjulian
42252419Sjulian	mtx_assert(&tq->tq_mutex, MA_OWNED);
42370700Sjulian	wakeup_one(tq);
42452419Sjulian}
42570700Sjulian
42652539SjulianTASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
42752419Sjulian		 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
42852419Sjulian		     INTR_MPSAFE, &taskqueue_ih));
42952419Sjulian
43052419SjulianTASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
43152419Sjulian		 swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
43252419Sjulian		     NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));
43352419Sjulian
43452419SjulianTASKQUEUE_DEFINE_THREAD(thread);
43552419Sjulian
43652419Sjulianstruct taskqueue *
43752419Sjuliantaskqueue_create_fast(const char *name, int mflags,
43852419Sjulian		 taskqueue_enqueue_fn enqueue, void *context)
43952419Sjulian{
44052419Sjulian	return _taskqueue_create(name, mflags, enqueue, context,
44152419Sjulian			MTX_SPIN, "fast_taskqueue");
44252419Sjulian}
44352419Sjulian
44452419Sjulian/* NB: for backwards compatibility */
44552419Sjulianint
44652419Sjuliantaskqueue_enqueue_fast(struct taskqueue *queue, struct task *task)
44752419Sjulian{
44852419Sjulian	return taskqueue_enqueue(queue, task);
44952419Sjulian}
45052419Sjulian
45152419Sjulianstatic void	*taskqueue_fast_ih;
45252419Sjulian
45352419Sjulianstatic void
45452419Sjuliantaskqueue_fast_enqueue(void *context)
45552419Sjulian{
45652419Sjulian	swi_sched(taskqueue_fast_ih, 0);
45752419Sjulian}
45852419Sjulian
45952419Sjulianstatic void
46052419Sjuliantaskqueue_fast_run(void *dummy)
46152419Sjulian{
46252419Sjulian	taskqueue_run(taskqueue_fast);
46370700Sjulian}
46452419Sjulian
46552419SjulianTASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
46652419Sjulian	swi_add(NULL, "Fast task queue", taskqueue_fast_run, NULL,
46752419Sjulian	SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));
46852419Sjulian