subr_taskqueue.c revision 145729
/*-
 * Copyright (c) 2000 Doug Rabson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
261553Srgrimes
271553Srgrimes#include <sys/cdefs.h>
281553Srgrimes__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 145729 2005-05-01 00:38:11Z sam $");
291553Srgrimes
301553Srgrimes#include <sys/param.h>
3129451Scharnier#include <sys/systm.h>
321553Srgrimes#include <sys/bus.h>
3329451Scharnier#include <sys/interrupt.h>
3429451Scharnier#include <sys/kernel.h>
3550479Speter#include <sys/kthread.h>
361553Srgrimes#include <sys/lock.h>
371553Srgrimes#include <sys/malloc.h>
381553Srgrimes#include <sys/mutex.h>
391553Srgrimes#include <sys/proc.h>
401553Srgrimes#include <sys/taskqueue.h>
411553Srgrimes#include <sys/unistd.h>
421553Srgrimes
431553Srgrimesstatic MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
4429451Scharnierstatic void	*taskqueue_giant_ih;
4529451Scharnierstatic void	*taskqueue_ih;
461553Srgrimesstatic STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues;
4720458Sjoergstatic struct mtx taskqueue_queues_mutex;
4869004Simp
4916073Sphkstruct taskqueue {
501553Srgrimes	STAILQ_ENTRY(taskqueue)	tq_link;
5130638Speter	STAILQ_HEAD(, task)	tq_queue;
521553Srgrimes	const char		*tq_name;
531553Srgrimes	taskqueue_enqueue_fn	tq_enqueue;
5461640Speter	void			*tq_context;
551553Srgrimes	struct task		*tq_running;
561553Srgrimes	struct mtx		tq_mutex;
571553Srgrimes	struct proc		**tq_pproc;
581553Srgrimes};
591553Srgrimes
601553Srgrimesstatic void	init_taskqueue_list(void *data);
6161640Speter
621553Srgrimesstatic void
631553Srgrimesinit_taskqueue_list(void *data __unused)
641553Srgrimes{
651553Srgrimes
661553Srgrimes	mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF);
671553Srgrimes	STAILQ_INIT(&taskqueue_queues);
6861640Speter}
6961640SpeterSYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list,
7061640Speter    NULL);
7169135Speter
7261640Speterstruct taskqueue *
7361640Spetertaskqueue_create(const char *name, int mflags,
7472684Speter		 taskqueue_enqueue_fn enqueue, void *context,
7561640Speter		 struct proc **pp)
7629451Scharnier{
771553Srgrimes	struct taskqueue *queue;
781553Srgrimes
791553Srgrimes	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
8045744Speter	if (!queue)
8161640Speter		return 0;
821553Srgrimes
8361640Speter	STAILQ_INIT(&queue->tq_queue);
841553Srgrimes	queue->tq_name = name;
85110895Sru	queue->tq_enqueue = enqueue;
861553Srgrimes	queue->tq_context = context;
871553Srgrimes	queue->tq_pproc = pp;
881553Srgrimes	mtx_init(&queue->tq_mutex, "taskqueue", NULL, MTX_DEF);
891553Srgrimes
901553Srgrimes	mtx_lock(&taskqueue_queues_mutex);
911553Srgrimes	STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link);
921553Srgrimes	mtx_unlock(&taskqueue_queues_mutex);
931553Srgrimes
941553Srgrimes	return queue;
9545744Speter}
9661640Speter
971553Srgrimes/*
9861640Speter * Signal a taskqueue thread to terminate.
991553Srgrimes */
100159362Sdelphijstatic void
101205880Srutaskqueue_terminate(struct proc **pp, struct taskqueue *tq)
102205880Sru{
103110895Sru	struct proc *p;
1041553Srgrimes
1051553Srgrimes	p = *pp;
1061553Srgrimes	*pp = NULL;
1071553Srgrimes	if (p) {
108207260Simp		wakeup_one(tq);
1091553Srgrimes		PROC_LOCK(p);		   /* NB: insure we don't miss wakeup */
110207260Simp		mtx_unlock(&tq->tq_mutex); /* let taskqueue thread run */
111207260Simp		msleep(p, &p->p_mtx, PWAIT, "taskqueue_destroy", 0);
1121553Srgrimes		PROC_UNLOCK(p);
113207260Simp		mtx_lock(&tq->tq_mutex);
1141553Srgrimes	}
1151553Srgrimes}
11655614Speter
1171553Srgrimesvoid
11855614Spetertaskqueue_free(struct taskqueue *queue)
11955614Speter{
12055614Speter
12155614Speter	mtx_lock(&taskqueue_queues_mutex);
12229451Scharnier	STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link);
12329451Scharnier	mtx_unlock(&taskqueue_queues_mutex);
124207260Simp
125207260Simp	mtx_lock(&queue->tq_mutex);
12612772Speter	taskqueue_run(queue);
127207260Simp	taskqueue_terminate(queue->tq_pproc, queue);
128207260Simp	mtx_destroy(&queue->tq_mutex);
129207260Simp	free(queue, M_TASKQUEUE);
130207260Simp}
131207260Simp
132207260Simp/*
133207260Simp * Returns with the taskqueue locked.
134207260Simp */
135207260Simpstruct taskqueue *
136207260Simptaskqueue_find(const char *name)
137207260Simp{
138207260Simp	struct taskqueue *queue;
13999923Sbde
14099923Sbde	mtx_lock(&taskqueue_queues_mutex);
14199923Sbde	STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) {
142113951Sdes		if (strcmp(queue->tq_name, name) == 0) {
143218544Simp			mtx_lock(&queue->tq_mutex);
144218544Simp			mtx_unlock(&taskqueue_queues_mutex);
145185186Sthompsa			return queue;
146185186Sthompsa		}
147185186Sthompsa	}
148185186Sthompsa	mtx_unlock(&taskqueue_queues_mutex);
149185186Sthompsa	return NULL;
150185186Sthompsa}
1511553Srgrimes
1521553Srgrimesint
15399923Sbdetaskqueue_enqueue(struct taskqueue *queue, struct task *task)
15420395Sbde{
15552653Smarcel	struct task *ins;
15652653Smarcel	struct task *prev;
1571553Srgrimes
1581553Srgrimes	mtx_lock(&queue->tq_mutex);
1591553Srgrimes
1601553Srgrimes	/*
1611553Srgrimes	 * Count multiple enqueues.
1625325Sgibbs	 */
1635325Sgibbs	if (task->ta_pending) {
1645325Sgibbs		task->ta_pending++;
1651553Srgrimes		mtx_unlock(&queue->tq_mutex);
16669135Speter		return 0;
16769135Speter	}
1681553Srgrimes
1691553Srgrimes	/*
1706803Sgibbs	 * Optimise the case when all tasks have the same priority.
1716803Sgibbs	 */
172207260Simp	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
173207260Simp	if (!prev || prev->ta_priority >= task->ta_priority) {
174207260Simp		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
1751553Srgrimes	} else {
1761553Srgrimes		prev = 0;
1771553Srgrimes		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
1781553Srgrimes		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
1791553Srgrimes			if (ins->ta_priority < task->ta_priority)
1801553Srgrimes				break;
18113400Speter
182153888Sru		if (prev)
18334619Seivind			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
184153888Sru		else
185153888Sru			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
186153888Sru	}
187153888Sru
188153888Sru	task->ta_pending = 1;
189153888Sru	queue->tq_enqueue(queue->tq_context);
190163640Simp
191153888Sru	mtx_unlock(&queue->tq_mutex);
192153888Sru
193163638Simp	return 0;
194153888Sru}
19561640Speter
19661640Spetervoid
19761640Spetertaskqueue_run(struct taskqueue *queue)
19883594Speter{
19983594Speter	struct task *task;
20083594Speter	int owned, pending;
20165091Speter
20261640Speter	owned = mtx_owned(&queue->tq_mutex);
203163638Simp	if (!owned)
204163638Simp		mtx_lock(&queue->tq_mutex);
205163638Simp	while (STAILQ_FIRST(&queue->tq_queue)) {
206163638Simp		/*
20761640Speter		 * Carefully remove the first task from the queue and
20861640Speter		 * zero its pending count.
20961640Speter		 */
21061640Speter		task = STAILQ_FIRST(&queue->tq_queue);
21161640Speter		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
21261640Speter		pending = task->ta_pending;
21361640Speter		task->ta_pending = 0;
21461640Speter		queue->tq_running = task;
21561640Speter		mtx_unlock(&queue->tq_mutex);
21661640Speter
21761640Speter		task->ta_func(task->ta_context, pending);
21861640Speter
21961640Speter		mtx_lock(&queue->tq_mutex);
22061640Speter		queue->tq_running = NULL;
22161640Speter		wakeup(task);
22261640Speter	}
22361640Speter
22461640Speter	/*
22561640Speter	 * For compatibility, unlock on return if the queue was not locked
22661640Speter	 * on entry, although this opens a race window.
22761640Speter	 */
22861640Speter	if (!owned)
22961640Speter		mtx_unlock(&queue->tq_mutex);
23061640Speter}
23161640Speter
23261640Spetervoid
23361652Spetertaskqueue_drain(struct taskqueue *queue, struct task *task)
23461640Speter{
23561640Speter	WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, "taskqueue_drain");
236163640Simp
23761640Speter	mtx_lock(&queue->tq_mutex);
23861640Speter	while (task->ta_pending != 0 || task == queue->tq_running)
23961640Speter		msleep(task, &queue->tq_mutex, PWAIT, "-", 0);
24061640Speter	mtx_unlock(&queue->tq_mutex);
241153888Sru}
24282393Speter
243153888Srustatic void
244153888Srutaskqueue_swi_enqueue(void *context)
245153888Sru{
246153888Sru	swi_sched(taskqueue_ih, 0);
247153888Sru}
248153888Sru
249153888Srustatic void
250153888Srutaskqueue_swi_run(void *dummy)
251153888Sru{
252153888Sru	taskqueue_run(taskqueue_swi);
25382393Speter}
25482393Speter
25582393Speterstatic void
25682393Spetertaskqueue_swi_giant_enqueue(void *context)
25782393Speter{
25882393Speter	swi_sched(taskqueue_giant_ih, 0);
25982393Speter}
26082393Speter
26182393Speterstatic void
26282393Spetertaskqueue_swi_giant_run(void *dummy)
26383594Speter{
26483594Speter	taskqueue_run(taskqueue_swi_giant);
26583594Speter}
26682393Speter
26782393Spetervoid
26882393Spetertaskqueue_thread_loop(void *arg)
26982393Speter{
27082393Speter	struct taskqueue **tqp, *tq;
27182393Speter
27282393Speter	tqp = arg;
27382393Speter	tq = *tqp;
27482393Speter	mtx_lock(&tq->tq_mutex);
27582393Speter	do {
27682393Speter		taskqueue_run(tq);
27782393Speter		msleep(tq, &tq->tq_mutex, PWAIT, "-", 0);
27882393Speter	} while (*tq->tq_pproc != NULL);
27982393Speter
28082393Speter	/* rendezvous with thread that asked us to terminate */
28182393Speter	wakeup_one(tq);
28282393Speter	mtx_unlock(&tq->tq_mutex);
28382393Speter	kthread_exit(0);
28482393Speter}
28582393Speter
28682393Spetervoid
28782393Spetertaskqueue_thread_enqueue(void *context)
28882393Speter{
28982393Speter	struct taskqueue **tqp, *tq;
29082393Speter
29182393Speter	tqp = context;
29282393Speter	tq = *tqp;
29382393Speter
29482393Speter	mtx_assert(&tq->tq_mutex, MA_OWNED);
29582393Speter	wakeup_one(tq);
29682393Speter}
29782393Speter
29882393SpeterTASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0,
29982393Speter		 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
30082393Speter		     INTR_MPSAFE, &taskqueue_ih));
30182393Speter
30282393SpeterTASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0,
30382393Speter		 swi_add(NULL, "Giant task queue", taskqueue_swi_giant_run,
3041553Srgrimes		     NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));
3051553Srgrimes
306129119ScognetTASKQUEUE_DEFINE_THREAD(thread);
307129073Scognet
3081553Srgrimesint
309162936Srutaskqueue_enqueue_fast(struct taskqueue *queue, struct task *task)
3101553Srgrimes{
311152811Sru	struct task *ins;
31261640Speter	struct task *prev;
31361640Speter
314152811Sru	mtx_lock_spin(&queue->tq_mutex);
315153889Sru
316134542Speter	/*
3171553Srgrimes	 * Count multiple enqueues.
3181553Srgrimes	 */
31929451Scharnier	if (task->ta_pending) {
32029451Scharnier		task->ta_pending++;
3211553Srgrimes		mtx_unlock_spin(&queue->tq_mutex);
3221553Srgrimes		return 0;
323162936Sru	}
324134542Speter
325152862Sru	/*
3268857Srgrimes	 * Optimise the case when all tasks have the same priority.
3276803Sgibbs	 */
32854490Speter	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
3291553Srgrimes	if (!prev || prev->ta_priority >= task->ta_priority) {
3301553Srgrimes		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
3311553Srgrimes	} else {
3321553Srgrimes		prev = 0;
3331553Srgrimes		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
334129073Scognet		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
3351553Srgrimes			if (ins->ta_priority < task->ta_priority)
3361553Srgrimes				break;
33752098Speter
3381566Srgrimes		if (prev)
33952098Speter			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
34052098Speter		else
3411566Srgrimes			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
3421566Srgrimes	}
343162936Sru
344162936Sru	task->ta_pending = 1;
345162936Sru	queue->tq_enqueue(queue->tq_context);
346210144Simp
347210144Simp	mtx_unlock_spin(&queue->tq_mutex);
348162936Sru
349162936Sru	return 0;
350162936Sru}
351162936Sru
352162936Srustatic void
353162936Srutaskqueue_run_fast(struct taskqueue *queue)
354162936Sru{
355162936Sru	struct task *task;
3561553Srgrimes	int pending;
3571553Srgrimes
3581553Srgrimes	mtx_lock_spin(&queue->tq_mutex);
359210144Simp	while (STAILQ_FIRST(&queue->tq_queue)) {
3601553Srgrimes		/*
3611553Srgrimes		 * Carefully remove the first task from the queue and
362152811Sru		 * zero its pending count.
363152862Sru		 */
364152862Sru		task = STAILQ_FIRST(&queue->tq_queue);
3651553Srgrimes		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
36673199Speter		pending = task->ta_pending;
3674571Sgibbs		task->ta_pending = 0;
3686803Sgibbs		mtx_unlock_spin(&queue->tq_mutex);
36972684Speter
37030796Sjoerg		task->ta_func(task->ta_context, pending);
3714571Sgibbs
3724571Sgibbs		mtx_lock_spin(&queue->tq_mutex);
3735325Sgibbs	}
37491002Speter	mtx_unlock_spin(&queue->tq_mutex);
3751553Srgrimes}
37661523Speter
/* The "fast" queue; allocated by taskqueue_define_fast(), NULL until then. */
struct taskqueue *taskqueue_fast;

/* Software-interrupt cookie for the fast queue, used with swi_sched(). */
static void	*taskqueue_fast_ih;
37930796Sjoerg
38030796Sjoergstatic void
38130796Sjoergtaskqueue_fast_schedule(void *context)
38230796Sjoerg{
38361523Speter	swi_sched(taskqueue_fast_ih, 0);
38430796Sjoerg}
38561523Speter
386210144Simpstatic void
387214654Sobrientaskqueue_fast_run(void *dummy)
388214654Sobrien{
3891553Srgrimes	taskqueue_run_fast(taskqueue_fast);
3901553Srgrimes}
3911553Srgrimes
3921553Srgrimesstatic void
393113397Sphktaskqueue_define_fast(void *arg)
394152862Sru{
395152862Sru
396152862Sru	taskqueue_fast = malloc(sizeof(struct taskqueue), M_TASKQUEUE,
397152862Sru	    M_NOWAIT | M_ZERO);
398113397Sphk	if (!taskqueue_fast) {
399152862Sru		printf("%s: Unable to allocate fast task queue!\n", __func__);
400152862Sru		return;
401210144Simp	}
402152862Sru
403152862Sru	STAILQ_INIT(&taskqueue_fast->tq_queue);
404152862Sru	taskqueue_fast->tq_name = "fast";
405152862Sru	taskqueue_fast->tq_enqueue = taskqueue_fast_schedule;
406152862Sru	mtx_init(&taskqueue_fast->tq_mutex, "taskqueue_fast", NULL, MTX_SPIN);
407152862Sru
408152862Sru	mtx_lock(&taskqueue_queues_mutex);
409152862Sru	STAILQ_INSERT_TAIL(&taskqueue_queues, taskqueue_fast, tq_link);
4104571Sgibbs	mtx_unlock(&taskqueue_queues_mutex);
4114571Sgibbs
4124571Sgibbs	swi_add(NULL, "Fast task queue", taskqueue_fast_run,
4134571Sgibbs		NULL, SWI_TQ_FAST, 0, &taskqueue_fast_ih);
4144571Sgibbs}
41573199SpeterSYSINIT(taskqueue_fast, SI_SUB_CONFIGURE, SI_ORDER_SECOND,
416210144Simp    taskqueue_define_fast, NULL);
417210144Simp