subr_taskqueue.c revision 133305
/*-
 * Copyright (c) 2000 Doug Rabson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
263229Spst
273229Spst#include <sys/cdefs.h>
283229Spst__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 133305 2004-08-08 02:37:22Z jmg $");
293229Spst
303229Spst#include <sys/param.h>
313229Spst#include <sys/systm.h>
323229Spst#include <sys/bus.h>
333229Spst#include <sys/interrupt.h>
343229Spst#include <sys/kernel.h>
353229Spst#include <sys/kthread.h>
363229Spst#include <sys/lock.h>
373229Spst#include <sys/malloc.h>
383229Spst#include <sys/mutex.h>
393229Spst#include <sys/taskqueue.h>
403229Spst#include <sys/unistd.h>
413229Spst
static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
static void	*taskqueue_giant_ih;	/* swi cookie for the Giant swi queue */
static void	*taskqueue_ih;		/* swi cookie for the MPSAFE swi queue */
/* Global registry of all taskqueues, searched by taskqueue_find(). */
static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues;
static struct mtx taskqueue_queues_mutex;	/* protects taskqueue_queues */
473229Spst
struct taskqueue {
	STAILQ_ENTRY(taskqueue)	tq_link;	/* entry on taskqueue_queues */
	STAILQ_HEAD(, task)	tq_queue;	/* pending tasks, priority order */
	const char		*tq_name;	/* name used by taskqueue_find() */
	taskqueue_enqueue_fn	tq_enqueue;	/* hook called when a task is queued */
	void			*tq_context;	/* argument passed to tq_enqueue */
	struct mtx		tq_mutex;	/* protects tq_queue and task state */
};
563229Spst
static void	init_taskqueue_list(void *data);

/*
 * Initialize the global taskqueue registry and its mutex.  Run once
 * at boot via SYSINIT (SI_SUB_INTRINSIC), before queues are created.
 */
static void
init_taskqueue_list(void *data __unused)
{

	mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF);
	STAILQ_INIT(&taskqueue_queues);
}
SYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list,
    NULL);
683229Spst
693229Spststruct taskqueue *
703229Spsttaskqueue_create(const char *name, int mflags,
713229Spst		 taskqueue_enqueue_fn enqueue, void *context)
723229Spst{
733229Spst	struct taskqueue *queue;
743229Spst
753229Spst	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
763229Spst	if (!queue)
773229Spst		return 0;
783229Spst
793229Spst	STAILQ_INIT(&queue->tq_queue);
803229Spst	queue->tq_name = name;
813229Spst	queue->tq_enqueue = enqueue;
823229Spst	queue->tq_context = context;
833229Spst	mtx_init(&queue->tq_mutex, "taskqueue", NULL, MTX_DEF);
843229Spst
853229Spst	mtx_lock(&taskqueue_queues_mutex);
863229Spst	STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link);
873229Spst	mtx_unlock(&taskqueue_queues_mutex);
883229Spst
893229Spst	return queue;
903229Spst}
913229Spst
void
taskqueue_free(struct taskqueue *queue)
{

	/*
	 * Unhook the queue from the global list first so that
	 * taskqueue_find() can no longer return it.
	 */
	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link);
	mtx_unlock(&taskqueue_queues_mutex);

	/*
	 * Drain any still-pending tasks.  taskqueue_run() is entered
	 * with tq_mutex held, so it leaves the mutex held on return
	 * (it only unlocks when it did the locking itself), letting us
	 * destroy the mutex and free the queue.
	 */
	mtx_lock(&queue->tq_mutex);
	taskqueue_run(queue);
	mtx_destroy(&queue->tq_mutex);
	free(queue, M_TASKQUEUE);
}
1053229Spst
1063229Spst/*
1073229Spst * Returns with the taskqueue locked.
1083229Spst */
1093229Spststruct taskqueue *
1103229Spsttaskqueue_find(const char *name)
1113229Spst{
1123229Spst	struct taskqueue *queue;
1133229Spst
1143229Spst	mtx_lock(&taskqueue_queues_mutex);
1153229Spst	STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) {
1163229Spst		if (strcmp(queue->tq_name, name) == 0) {
1173229Spst			mtx_lock(&queue->tq_mutex);
1183229Spst			mtx_unlock(&taskqueue_queues_mutex);
1193229Spst			return queue;
1203229Spst		}
1213229Spst	}
1223229Spst	mtx_unlock(&taskqueue_queues_mutex);
1233229Spst	return NULL;
1243229Spst}
1253229Spst
1263229Spstint
1273229Spsttaskqueue_enqueue(struct taskqueue *queue, struct task *task)
1283229Spst{
1293229Spst	struct task *ins;
1303229Spst	struct task *prev;
1313229Spst
1323229Spst	mtx_lock(&queue->tq_mutex);
1333229Spst
1343229Spst	/*
1353229Spst	 * Count multiple enqueues.
1363229Spst	 */
1373229Spst	if (task->ta_pending) {
1383229Spst		task->ta_pending++;
1393229Spst		mtx_unlock(&queue->tq_mutex);
1403229Spst		return 0;
1413229Spst	}
1423229Spst
1433229Spst	/*
1443229Spst	 * Optimise the case when all tasks have the same priority.
1453229Spst	 */
1463229Spst	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
1473229Spst	if (!prev || prev->ta_priority >= task->ta_priority) {
1483229Spst		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
1493229Spst	} else {
1503229Spst		prev = 0;
1513229Spst		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
1523229Spst		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
1533229Spst			if (ins->ta_priority < task->ta_priority)
1543229Spst				break;
1553229Spst
1563229Spst		if (prev)
1573229Spst			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
1583229Spst		else
1593229Spst			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
1603229Spst	}
1613229Spst
1623229Spst	task->ta_pending = 1;
1633229Spst	if (queue->tq_enqueue)
1643229Spst		queue->tq_enqueue(queue->tq_context);
1653229Spst
1663229Spst	mtx_unlock(&queue->tq_mutex);
1673229Spst
1683229Spst	return 0;
1693229Spst}
1703229Spst
void
taskqueue_run(struct taskqueue *queue)
{
	struct task *task;
	int owned, pending;

	/*
	 * The caller may already hold tq_mutex (e.g. taskqueue_free());
	 * remember which case this is so we restore the caller's locking
	 * state on return.
	 */
	owned = mtx_owned(&queue->tq_mutex);
	if (!owned)
		mtx_lock(&queue->tq_mutex);
	while (STAILQ_FIRST(&queue->tq_queue)) {
		/*
		 * Carefully remove the first task from the queue and
		 * zero its pending count.
		 */
		task = STAILQ_FIRST(&queue->tq_queue);
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		pending = task->ta_pending;
		task->ta_pending = 0;
		/*
		 * Drop the lock while the handler runs so it may safely
		 * re-enqueue itself or other tasks on this queue.
		 */
		mtx_unlock(&queue->tq_mutex);

		task->ta_func(task->ta_context, pending);

		mtx_lock(&queue->tq_mutex);
	}

	/*
	 * For compatibility, unlock on return if the queue was not locked
	 * on entry, although this opens a race window.
	 */
	if (!owned)
		mtx_unlock(&queue->tq_mutex);
}
2033229Spst
/* Enqueue hook for taskqueue_swi: schedule its software interrupt. */
static void
taskqueue_swi_enqueue(void *context)
{
	swi_sched(taskqueue_ih, 0);
}
2093229Spst
/* swi handler: drain the taskqueue_swi queue. */
static void
taskqueue_swi_run(void *dummy)
{
	taskqueue_run(taskqueue_swi);
}
2153229Spst
/* Enqueue hook for taskqueue_swi_giant: schedule its software interrupt. */
static void
taskqueue_swi_giant_enqueue(void *context)
{
	swi_sched(taskqueue_giant_ih, 0);
}
2213229Spst
/* swi handler: drain the taskqueue_swi_giant queue. */
static void
taskqueue_swi_giant_run(void *dummy)
{
	taskqueue_run(taskqueue_swi_giant);
}
2273229Spst
/*
 * Main loop for a taskqueue service thread.  "arg" points to the
 * taskqueue pointer to service (presumably set up by
 * TASKQUEUE_DEFINE_THREAD — macro body not visible here).  Never
 * returns.
 */
void
taskqueue_thread_loop(void *arg)
{
	struct taskqueue **tqp, *tq;

	tqp = arg;
	tq = *tqp;
	mtx_lock(&tq->tq_mutex);
	for (;;) {
		/*
		 * Run all pending tasks, then sleep until
		 * taskqueue_thread_enqueue() wakes us.  msleep()
		 * releases tq_mutex while asleep and reacquires it
		 * before returning.
		 */
		taskqueue_run(tq);
		msleep(tq, &tq->tq_mutex, PWAIT, "-", 0);
	}
}
2413229Spst
/*
 * Enqueue hook for thread-serviced queues: wake the service thread
 * sleeping in taskqueue_thread_loop().  "context" points to the
 * taskqueue pointer.  Must be called with tq_mutex held (it is, when
 * invoked from taskqueue_enqueue()).
 */
void
taskqueue_thread_enqueue(void *context)
{
	struct taskqueue **tqp, *tq;

	tqp = context;
	tq = *tqp;

	mtx_assert(&tq->tq_mutex, MA_OWNED);
	wakeup(tq);
}
2533229Spst
/* "taskqueue_swi": serviced from an MPSAFE software interrupt handler. */
TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0,
		 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
		     INTR_MPSAFE, &taskqueue_ih));

/*
 * "taskqueue_swi_giant": serviced from a software interrupt handler
 * registered without INTR_MPSAFE, i.e. run under Giant.
 */
TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0,
		 swi_add(NULL, "Giant task queue", taskqueue_swi_giant_run,
		     NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));

/* "taskqueue_thread": serviced by a dedicated kernel thread. */
TASKQUEUE_DEFINE_THREAD(thread);
2633229Spst
26497417Salfredint
2653229Spsttaskqueue_enqueue_fast(struct taskqueue *queue, struct task *task)
26697417Salfred{
2673229Spst	struct task *ins;
26897417Salfred	struct task *prev;
2693229Spst
27097417Salfred	mtx_lock_spin(&queue->tq_mutex);
2713229Spst
27297417Salfred	/*
2733229Spst	 * Count multiple enqueues.
27497417Salfred	 */
2753229Spst	if (task->ta_pending) {
27697417Salfred		task->ta_pending++;
2773229Spst		mtx_unlock_spin(&queue->tq_mutex);
27897417Salfred		return 0;
2793229Spst	}
2803229Spst
2813229Spst	/*
2823229Spst	 * Optimise the case when all tasks have the same priority.
2833229Spst	 */
2843229Spst	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
2853229Spst	if (!prev || prev->ta_priority >= task->ta_priority) {
2863229Spst		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
2873229Spst	} else {
2883229Spst		prev = 0;
2893229Spst		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
2903229Spst		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
2913229Spst			if (ins->ta_priority < task->ta_priority)
2923229Spst				break;
2933229Spst
2943229Spst		if (prev)
2953229Spst			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
2963229Spst		else
2973229Spst			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
2983229Spst	}
2993229Spst
3003229Spst	task->ta_pending = 1;
3013229Spst	if (queue->tq_enqueue)
3023229Spst		queue->tq_enqueue(queue->tq_context);
3033229Spst
3043229Spst	mtx_unlock_spin(&queue->tq_mutex);
3053229Spst
3063229Spst	return 0;
3073229Spst}
3083229Spst
/*
 * Run all pending tasks on a spin-lock protected queue.  Unlike
 * taskqueue_run(), this always does its own locking, so it must not
 * be entered with tq_mutex held.
 */
static void
taskqueue_run_fast(struct taskqueue *queue)
{
	struct task *task;
	int pending;

	mtx_lock_spin(&queue->tq_mutex);
	while (STAILQ_FIRST(&queue->tq_queue)) {
		/*
		 * Carefully remove the first task from the queue and
		 * zero its pending count.
		 */
		task = STAILQ_FIRST(&queue->tq_queue);
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		pending = task->ta_pending;
		task->ta_pending = 0;
		/* Drop the spin lock while the handler runs. */
		mtx_unlock_spin(&queue->tq_mutex);

		task->ta_func(task->ta_context, pending);

		mtx_lock_spin(&queue->tq_mutex);
	}
	mtx_unlock_spin(&queue->tq_mutex);
}
3333229Spst
/* The "fast" queue, created at boot by taskqueue_define_fast(). */
struct taskqueue *taskqueue_fast;
static void	*taskqueue_fast_ih;	/* swi cookie for the fast queue */
3363229Spst
/* Enqueue hook for taskqueue_fast: schedule its software interrupt. */
static void
taskqueue_fast_schedule(void *context)
{
	swi_sched(taskqueue_fast_ih, 0);
}
3423229Spst
/* swi handler: drain the taskqueue_fast queue. */
static void
taskqueue_fast_run(void *dummy)
{
	taskqueue_run_fast(taskqueue_fast);
}
3483229Spst
/*
 * Construct the "fast" taskqueue at boot.  This is a hand-rolled
 * equivalent of taskqueue_create(): it cannot reuse that function
 * because the fast queue's mutex must be MTX_SPIN rather than the
 * MTX_DEF that taskqueue_create() uses.
 */
static void
taskqueue_define_fast(void *arg)
{

	taskqueue_fast = malloc(sizeof(struct taskqueue), M_TASKQUEUE,
	    M_NOWAIT | M_ZERO);
	if (!taskqueue_fast) {
		printf("%s: Unable to allocate fast task queue!\n", __func__);
		return;
	}

	STAILQ_INIT(&taskqueue_fast->tq_queue);
	taskqueue_fast->tq_name = "fast";
	taskqueue_fast->tq_enqueue = taskqueue_fast_schedule;
	/* Spin mutex: this queue is manipulated from swi context. */
	mtx_init(&taskqueue_fast->tq_mutex, "taskqueue_fast", NULL, MTX_SPIN);

	/* Register on the global list so taskqueue_find() can see it. */
	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_INSERT_TAIL(&taskqueue_queues, taskqueue_fast, tq_link);
	mtx_unlock(&taskqueue_queues_mutex);

	swi_add(NULL, "Fast task queue", taskqueue_fast_run,
		NULL, SWI_TQ_FAST, 0, &taskqueue_fast_ih);
}
SYSINIT(taskqueue_fast, SI_SUB_CONFIGURE, SI_ORDER_SECOND,
    taskqueue_define_fast, NULL);
3743229Spst