subr_taskqueue.c revision 136131
161033Sdfr/*-
261033Sdfr * Copyright (c) 2000 Doug Rabson
361033Sdfr * All rights reserved.
461033Sdfr *
561033Sdfr * Redistribution and use in source and binary forms, with or without
661033Sdfr * modification, are permitted provided that the following conditions
761033Sdfr * are met:
861033Sdfr * 1. Redistributions of source code must retain the above copyright
961033Sdfr *    notice, this list of conditions and the following disclaimer.
1061033Sdfr * 2. Redistributions in binary form must reproduce the above copyright
1161033Sdfr *    notice, this list of conditions and the following disclaimer in the
1261033Sdfr *    documentation and/or other materials provided with the distribution.
1361033Sdfr *
1461033Sdfr * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
1561033Sdfr * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
1661033Sdfr * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
1761033Sdfr * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
1861033Sdfr * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
1961033Sdfr * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
2061033Sdfr * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
2161033Sdfr * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
2261033Sdfr * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
2361033Sdfr * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
2461033Sdfr * SUCH DAMAGE.
2561033Sdfr */
2661033Sdfr
27116182Sobrien#include <sys/cdefs.h>
28116182Sobrien__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 136131 2004-10-05 04:16:01Z imp $");
29116182Sobrien
3061033Sdfr#include <sys/param.h>
3185521Sjhb#include <sys/systm.h>
3265822Sjhb#include <sys/bus.h>
3385560Sjhb#include <sys/interrupt.h>
3461033Sdfr#include <sys/kernel.h>
35123614Sjhb#include <sys/kthread.h>
3685521Sjhb#include <sys/lock.h>
3761033Sdfr#include <sys/malloc.h>
3885521Sjhb#include <sys/mutex.h>
3985521Sjhb#include <sys/taskqueue.h>
40119708Sken#include <sys/unistd.h>
4161033Sdfr
static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");

/* swi(9) handler cookies for the Giant-locked and MP-safe system queues. */
static void	*taskqueue_giant_ih;
static void	*taskqueue_ih;

/* Global list of all taskqueues, protected by taskqueue_queues_mutex. */
static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues;
static struct mtx taskqueue_queues_mutex;
4767551Sjhb
struct taskqueue {
	STAILQ_ENTRY(taskqueue)	tq_link;	/* entry on taskqueue_queues */
	STAILQ_HEAD(, task)	tq_queue;	/* pending tasks, priority order */
	const char		*tq_name;	/* name; referenced, not copied */
	taskqueue_enqueue_fn	tq_enqueue;	/* hook run when a task is queued */
	void			*tq_context;	/* argument passed to tq_enqueue */
	struct mtx		tq_mutex;	/* protects tq_queue and task state */
};
5661033Sdfr
5785521Sjhbstatic void	init_taskqueue_list(void *data);
5885521Sjhb
5985521Sjhbstatic void
6085521Sjhbinit_taskqueue_list(void *data __unused)
6185521Sjhb{
6285521Sjhb
6393818Sjhb	mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF);
6485521Sjhb	STAILQ_INIT(&taskqueue_queues);
6585521Sjhb}
6685521SjhbSYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list,
6785521Sjhb    NULL);
6885521Sjhb
6961033Sdfrstruct taskqueue *
7061033Sdfrtaskqueue_create(const char *name, int mflags,
7161033Sdfr		 taskqueue_enqueue_fn enqueue, void *context)
7261033Sdfr{
7361033Sdfr	struct taskqueue *queue;
7461033Sdfr
7585521Sjhb	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
7661033Sdfr	if (!queue)
7761033Sdfr		return 0;
7885521Sjhb
7961033Sdfr	STAILQ_INIT(&queue->tq_queue);
8061033Sdfr	queue->tq_name = name;
8161033Sdfr	queue->tq_enqueue = enqueue;
8261033Sdfr	queue->tq_context = context;
8393818Sjhb	mtx_init(&queue->tq_mutex, "taskqueue", NULL, MTX_DEF);
8461033Sdfr
8585521Sjhb	mtx_lock(&taskqueue_queues_mutex);
8661033Sdfr	STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link);
8785521Sjhb	mtx_unlock(&taskqueue_queues_mutex);
8861033Sdfr
8961033Sdfr	return queue;
9061033Sdfr}
9161033Sdfr
/*
 * Tear down a taskqueue: unlink it from the global list, run any tasks
 * still queued so they are not silently dropped, then destroy the mutex
 * and free the memory.  The caller must guarantee no further enqueues
 * can occur once this is called.
 */
void
taskqueue_free(struct taskqueue *queue)
{

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link);
	mtx_unlock(&taskqueue_queues_mutex);

	/* Run with tq_mutex held; taskqueue_run() keeps it held on return. */
	mtx_lock(&queue->tq_mutex);
	taskqueue_run(queue);
	mtx_destroy(&queue->tq_mutex);
	free(queue, M_TASKQUEUE);
}
10561033Sdfr
10685521Sjhb/*
10785521Sjhb * Returns with the taskqueue locked.
10885521Sjhb */
10961033Sdfrstruct taskqueue *
11061033Sdfrtaskqueue_find(const char *name)
11161033Sdfr{
11261033Sdfr	struct taskqueue *queue;
11361033Sdfr
11485521Sjhb	mtx_lock(&taskqueue_queues_mutex);
11585521Sjhb	STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) {
116123614Sjhb		if (strcmp(queue->tq_name, name) == 0) {
117131246Sjhb			mtx_lock(&queue->tq_mutex);
11885521Sjhb			mtx_unlock(&taskqueue_queues_mutex);
11961033Sdfr			return queue;
12061033Sdfr		}
12185521Sjhb	}
12285521Sjhb	mtx_unlock(&taskqueue_queues_mutex);
123123614Sjhb	return NULL;
12461033Sdfr}
12561033Sdfr
12661033Sdfrint
12761033Sdfrtaskqueue_enqueue(struct taskqueue *queue, struct task *task)
12861033Sdfr{
12961033Sdfr	struct task *ins;
13061033Sdfr	struct task *prev;
13161033Sdfr
13285560Sjhb	mtx_lock(&queue->tq_mutex);
13385560Sjhb
13461033Sdfr	/*
13561033Sdfr	 * Count multiple enqueues.
13661033Sdfr	 */
13761033Sdfr	if (task->ta_pending) {
13861033Sdfr		task->ta_pending++;
13985521Sjhb		mtx_unlock(&queue->tq_mutex);
14061033Sdfr		return 0;
14161033Sdfr	}
14261033Sdfr
14361033Sdfr	/*
14461033Sdfr	 * Optimise the case when all tasks have the same priority.
14561033Sdfr	 */
14664199Shsu	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
14761033Sdfr	if (!prev || prev->ta_priority >= task->ta_priority) {
14861033Sdfr		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
14961033Sdfr	} else {
15061033Sdfr		prev = 0;
15161033Sdfr		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
15261033Sdfr		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
15361033Sdfr			if (ins->ta_priority < task->ta_priority)
15461033Sdfr				break;
15561033Sdfr
15661033Sdfr		if (prev)
15761033Sdfr			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
15861033Sdfr		else
15961033Sdfr			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
16061033Sdfr	}
16161033Sdfr
16261033Sdfr	task->ta_pending = 1;
16361033Sdfr	if (queue->tq_enqueue)
16461033Sdfr		queue->tq_enqueue(queue->tq_context);
16585560Sjhb
16685521Sjhb	mtx_unlock(&queue->tq_mutex);
16785560Sjhb
16861033Sdfr	return 0;
16961033Sdfr}
17061033Sdfr
/*
 * Execute every task currently on the queue.  May be entered with the
 * queue mutex held or unheld; the entry lock state is restored on exit.
 * The mutex is dropped around each handler call so handlers may sleep or
 * enqueue further work.
 */
void
taskqueue_run(struct taskqueue *queue)
{
	struct task *task;
	int owned, pending;

	owned = mtx_owned(&queue->tq_mutex);
	if (!owned)
		mtx_lock(&queue->tq_mutex);
	while (STAILQ_FIRST(&queue->tq_queue)) {
		/*
		 * Carefully remove the first task from the queue and
		 * zero its pending count.
		 */
		task = STAILQ_FIRST(&queue->tq_queue);
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		pending = task->ta_pending;
		task->ta_pending = 0;
		/* TAF_PENDING marks the task as running for taskqueue_drain(). */
		task->ta_flags |= TAF_PENDING;
		mtx_unlock(&queue->tq_mutex);

		task->ta_func(task->ta_context, pending);

		mtx_lock(&queue->tq_mutex);
		task->ta_flags &= ~TAF_PENDING;
		/* Wake anyone sleeping in taskqueue_drain() on this task. */
		wakeup(task);
	}

	/*
	 * For compatibility, unlock on return if the queue was not locked
	 * on entry, although this opens a race window.
	 */
	if (!owned)
		mtx_unlock(&queue->tq_mutex);
}
20661033Sdfr
207136131Simpvoid
208136131Simptaskqueue_drain(struct taskqueue *queue, struct task *task)
209136131Simp{
210136131Simp	WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, "taskqueue_drain");
211136131Simp	mtx_lock(&queue->tq_mutex);
212136131Simp	while (task->ta_pending != 0 || (task->ta_flags & TAF_PENDING)) {
213136131Simp		msleep(task, &queue->tq_mutex, PWAIT, "-", 0);
214136131Simp	}
215136131Simp	mtx_unlock(&queue->tq_mutex);
216136131Simp}
217136131Simp
/*
 * Enqueue hook for the MP-safe swi taskqueue: schedule its software
 * interrupt handler.
 */
static void
taskqueue_swi_enqueue(void *context)
{
	swi_sched(taskqueue_ih, 0);
}
22361033Sdfr
/* swi handler body: run everything queued on taskqueue_swi. */
static void
taskqueue_swi_run(void *dummy)
{
	taskqueue_run(taskqueue_swi);
}
22961033Sdfr
/*
 * Enqueue hook for the Giant-locked swi taskqueue: schedule its software
 * interrupt handler.
 */
static void
taskqueue_swi_giant_enqueue(void *context)
{
	swi_sched(taskqueue_giant_ih, 0);
}
235111528Sscottl
/* swi handler body: run everything queued on taskqueue_swi_giant. */
static void
taskqueue_swi_giant_run(void *dummy)
{
	taskqueue_run(taskqueue_swi_giant);
}
241111528Sscottl
/*
 * Main loop for a dedicated taskqueue service thread.  'arg' points at
 * the struct taskqueue * variable for the queue (as wired up by
 * TASKQUEUE_DEFINE_THREAD).  Loops forever: run all pending tasks, then
 * sleep until taskqueue_thread_enqueue() wakes the queue.  Never returns.
 */
void
taskqueue_thread_loop(void *arg)
{
	struct taskqueue **tqp, *tq;

	tqp = arg;
	tq = *tqp;
	mtx_lock(&tq->tq_mutex);
	for (;;) {
		/* taskqueue_run() keeps tq_mutex held, so msleep is safe. */
		taskqueue_run(tq);
		msleep(tq, &tq->tq_mutex, PWAIT, "-", 0);
	}
}
255119708Sken
256133305Sjmgvoid
257119708Skentaskqueue_thread_enqueue(void *context)
258119708Sken{
259133305Sjmg	struct taskqueue **tqp, *tq;
260131246Sjhb
261133305Sjmg	tqp = context;
262133305Sjmg	tq = *tqp;
263133305Sjmg
264133305Sjmg	mtx_assert(&tq->tq_mutex, MA_OWNED);
265133305Sjmg	wakeup(tq);
266119708Sken}
267119708Sken
/*
 * Instantiate the standard system queues: an MP-safe swi-driven queue,
 * a Giant-locked swi-driven queue, and a kernel-thread-backed queue.
 */
TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0,
		 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
		     INTR_MPSAFE, &taskqueue_ih));

TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0,
		 swi_add(NULL, "Giant task queue", taskqueue_swi_giant_run,
		     NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));

TASKQUEUE_DEFINE_THREAD(thread);
277119789Ssam
278119789Ssamint
279119789Ssamtaskqueue_enqueue_fast(struct taskqueue *queue, struct task *task)
280119789Ssam{
281119789Ssam	struct task *ins;
282119789Ssam	struct task *prev;
283119789Ssam
284119789Ssam	mtx_lock_spin(&queue->tq_mutex);
285119789Ssam
286119789Ssam	/*
287119789Ssam	 * Count multiple enqueues.
288119789Ssam	 */
289119789Ssam	if (task->ta_pending) {
290119789Ssam		task->ta_pending++;
291119789Ssam		mtx_unlock_spin(&queue->tq_mutex);
292119789Ssam		return 0;
293119789Ssam	}
294119789Ssam
295119789Ssam	/*
296119789Ssam	 * Optimise the case when all tasks have the same priority.
297119789Ssam	 */
298119789Ssam	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
299119789Ssam	if (!prev || prev->ta_priority >= task->ta_priority) {
300119789Ssam		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
301119789Ssam	} else {
302119789Ssam		prev = 0;
303119789Ssam		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
304119789Ssam		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
305119789Ssam			if (ins->ta_priority < task->ta_priority)
306119789Ssam				break;
307119789Ssam
308119789Ssam		if (prev)
309119789Ssam			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
310119789Ssam		else
311119789Ssam			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
312119789Ssam	}
313119789Ssam
314119789Ssam	task->ta_pending = 1;
315119789Ssam	if (queue->tq_enqueue)
316119789Ssam		queue->tq_enqueue(queue->tq_context);
317119789Ssam
318119789Ssam	mtx_unlock_spin(&queue->tq_mutex);
319119789Ssam
320119789Ssam	return 0;
321119789Ssam}
322119789Ssam
/*
 * Spin-mutex variant of taskqueue_run() for the fast taskqueue.  The
 * spin lock is dropped around each handler invocation.
 *
 * NOTE(review): unlike taskqueue_run(), this never sets TAF_PENDING, so
 * taskqueue_drain() cannot observe a fast task while it is executing —
 * confirm drain is not expected to work on fast queues.
 */
static void
taskqueue_run_fast(struct taskqueue *queue)
{
	struct task *task;
	int pending;

	mtx_lock_spin(&queue->tq_mutex);
	while (STAILQ_FIRST(&queue->tq_queue)) {
		/*
		 * Carefully remove the first task from the queue and
		 * zero its pending count.
		 */
		task = STAILQ_FIRST(&queue->tq_queue);
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		pending = task->ta_pending;
		task->ta_pending = 0;
		mtx_unlock_spin(&queue->tq_mutex);

		task->ta_func(task->ta_context, pending);

		mtx_lock_spin(&queue->tq_mutex);
	}
	mtx_unlock_spin(&queue->tq_mutex);
}
347119789Ssam
/* The global "fast" taskqueue and the swi handler cookie that drives it. */
struct taskqueue *taskqueue_fast;
static void	*taskqueue_fast_ih;

/* Enqueue hook for the fast taskqueue: schedule its software interrupt. */
static void
taskqueue_fast_schedule(void *context)
{
	swi_sched(taskqueue_fast_ih, 0);
}
356119789Ssam
/* swi handler body: run everything queued on taskqueue_fast. */
static void
taskqueue_fast_run(void *dummy)
{
	taskqueue_run_fast(taskqueue_fast);
}
362119789Ssam
363119789Ssamstatic void
364119789Ssamtaskqueue_define_fast(void *arg)
365119789Ssam{
366131246Sjhb
367131246Sjhb	taskqueue_fast = malloc(sizeof(struct taskqueue), M_TASKQUEUE,
368131246Sjhb	    M_NOWAIT | M_ZERO);
369119789Ssam	if (!taskqueue_fast) {
370119789Ssam		printf("%s: Unable to allocate fast task queue!\n", __func__);
371119789Ssam		return;
372119789Ssam	}
373119789Ssam
374119789Ssam	STAILQ_INIT(&taskqueue_fast->tq_queue);
375119789Ssam	taskqueue_fast->tq_name = "fast";
376119789Ssam	taskqueue_fast->tq_enqueue = taskqueue_fast_schedule;
377119812Ssam	mtx_init(&taskqueue_fast->tq_mutex, "taskqueue_fast", NULL, MTX_SPIN);
378119789Ssam
379119789Ssam	mtx_lock(&taskqueue_queues_mutex);
380119789Ssam	STAILQ_INSERT_TAIL(&taskqueue_queues, taskqueue_fast, tq_link);
381119789Ssam	mtx_unlock(&taskqueue_queues_mutex);
382119789Ssam
383119789Ssam	swi_add(NULL, "Fast task queue", taskqueue_fast_run,
384119789Ssam		NULL, SWI_TQ_FAST, 0, &taskqueue_fast_ih);
385119789Ssam}
386119789SsamSYSINIT(taskqueue_fast, SI_SUB_CONFIGURE, SI_ORDER_SECOND,
387131246Sjhb    taskqueue_define_fast, NULL);
388