subr_taskqueue.c revision 119789
/*-
 * Copyright (c) 2000 Doug Rabson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
2661033Sdfr
27116182Sobrien#include <sys/cdefs.h>
28116182Sobrien__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 119789 2003-09-05 23:09:22Z sam $");
29116182Sobrien
3061033Sdfr#include <sys/param.h>
3185521Sjhb#include <sys/systm.h>
3265822Sjhb#include <sys/bus.h>
3385560Sjhb#include <sys/interrupt.h>
3461033Sdfr#include <sys/kernel.h>
3585521Sjhb#include <sys/lock.h>
3661033Sdfr#include <sys/malloc.h>
3785521Sjhb#include <sys/mutex.h>
3885521Sjhb#include <sys/taskqueue.h>
39119708Sken#include <sys/kthread.h>
40119708Sken#include <sys/unistd.h>
4161033Sdfr
static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");

/* Global list of all taskqueues; protected by taskqueue_queues_mutex. */
static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues;

static void	*taskqueue_ih;		/* swi cookie for the "swi" queue */
static void	*taskqueue_giant_ih;	/* swi cookie for the "swi_giant" queue */
static struct mtx taskqueue_queues_mutex; /* guards taskqueue_queues */
static struct proc *taskqueue_thread_proc; /* kthread serving the "thread" queue */
5067551Sjhb
/*
 * A task queue: a priority-ordered list of pending tasks plus an
 * enqueue hook used to kick the consumer (software interrupt or
 * kernel thread) whenever new work is added.
 */
struct taskqueue {
	STAILQ_ENTRY(taskqueue)	tq_link;	/* entry on taskqueue_queues */
	STAILQ_HEAD(, task)	tq_queue;	/* tasks, highest ta_priority first */
	const char		*tq_name;	/* identifier for taskqueue_find() */
	taskqueue_enqueue_fn	tq_enqueue;	/* called (with tq_mutex held) on enqueue */
	void			*tq_context;	/* opaque argument for tq_enqueue */
	int			tq_draining;	/* set once taskqueue_free() begins */
	struct mtx		tq_mutex;	/* guards tq_queue and tq_draining */
};
6061033Sdfr
static void	init_taskqueue_list(void *data);

/*
 * Initialize the global taskqueue list and its mutex early in boot
 * (SI_SUB_INTRINSIC) so that later SYSINITs may create taskqueues.
 */
static void
init_taskqueue_list(void *data __unused)
{

	mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF);
	STAILQ_INIT(&taskqueue_queues);
}
SYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list,
    NULL);
7285521Sjhb
7361033Sdfrstruct taskqueue *
7461033Sdfrtaskqueue_create(const char *name, int mflags,
7561033Sdfr		 taskqueue_enqueue_fn enqueue, void *context)
7661033Sdfr{
7761033Sdfr	struct taskqueue *queue;
7861033Sdfr
7985521Sjhb	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
8061033Sdfr	if (!queue)
8161033Sdfr		return 0;
8285521Sjhb
8361033Sdfr	STAILQ_INIT(&queue->tq_queue);
8461033Sdfr	queue->tq_name = name;
8561033Sdfr	queue->tq_enqueue = enqueue;
8661033Sdfr	queue->tq_context = context;
8761033Sdfr	queue->tq_draining = 0;
8893818Sjhb	mtx_init(&queue->tq_mutex, "taskqueue", NULL, MTX_DEF);
8961033Sdfr
9085521Sjhb	mtx_lock(&taskqueue_queues_mutex);
9161033Sdfr	STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link);
9285521Sjhb	mtx_unlock(&taskqueue_queues_mutex);
9361033Sdfr
9461033Sdfr	return queue;
9561033Sdfr}
9661033Sdfr
/*
 * Tear down a queue created by taskqueue_create(): mark it draining so
 * taskqueue_enqueue() refuses further work, run any tasks still
 * pending, unlink it from the global list and release its resources.
 */
void
taskqueue_free(struct taskqueue *queue)
{

	mtx_lock(&queue->tq_mutex);
	KASSERT(queue->tq_draining == 0, ("free'ing a draining taskqueue"));
	queue->tq_draining = 1;		/* reject new enqueues from here on */
	mtx_unlock(&queue->tq_mutex);

	/* Flush tasks queued before draining was set. */
	taskqueue_run(queue);

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link);
	mtx_unlock(&taskqueue_queues_mutex);

	mtx_destroy(&queue->tq_mutex);
	free(queue, M_TASKQUEUE);
}
11561033Sdfr
11685521Sjhb/*
11785521Sjhb * Returns with the taskqueue locked.
11885521Sjhb */
11961033Sdfrstruct taskqueue *
12061033Sdfrtaskqueue_find(const char *name)
12161033Sdfr{
12261033Sdfr	struct taskqueue *queue;
12361033Sdfr
12485521Sjhb	mtx_lock(&taskqueue_queues_mutex);
12585521Sjhb	STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) {
12685521Sjhb		mtx_lock(&queue->tq_mutex);
12761033Sdfr		if (!strcmp(queue->tq_name, name)) {
12885521Sjhb			mtx_unlock(&taskqueue_queues_mutex);
12961033Sdfr			return queue;
13061033Sdfr		}
13185521Sjhb		mtx_unlock(&queue->tq_mutex);
13285521Sjhb	}
13385521Sjhb	mtx_unlock(&taskqueue_queues_mutex);
13461033Sdfr	return 0;
13561033Sdfr}
13661033Sdfr
13761033Sdfrint
13861033Sdfrtaskqueue_enqueue(struct taskqueue *queue, struct task *task)
13961033Sdfr{
14061033Sdfr	struct task *ins;
14161033Sdfr	struct task *prev;
14261033Sdfr
14385560Sjhb	mtx_lock(&queue->tq_mutex);
14485560Sjhb
14561033Sdfr	/*
14661033Sdfr	 * Don't allow new tasks on a queue which is being freed.
14761033Sdfr	 */
14861033Sdfr	if (queue->tq_draining) {
14985521Sjhb		mtx_unlock(&queue->tq_mutex);
15061033Sdfr		return EPIPE;
15161033Sdfr	}
15261033Sdfr
15361033Sdfr	/*
15461033Sdfr	 * Count multiple enqueues.
15561033Sdfr	 */
15661033Sdfr	if (task->ta_pending) {
15761033Sdfr		task->ta_pending++;
15885521Sjhb		mtx_unlock(&queue->tq_mutex);
15961033Sdfr		return 0;
16061033Sdfr	}
16161033Sdfr
16261033Sdfr	/*
16361033Sdfr	 * Optimise the case when all tasks have the same priority.
16461033Sdfr	 */
16564199Shsu	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
16661033Sdfr	if (!prev || prev->ta_priority >= task->ta_priority) {
16761033Sdfr		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
16861033Sdfr	} else {
16961033Sdfr		prev = 0;
17061033Sdfr		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
17161033Sdfr		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
17261033Sdfr			if (ins->ta_priority < task->ta_priority)
17361033Sdfr				break;
17461033Sdfr
17561033Sdfr		if (prev)
17661033Sdfr			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
17761033Sdfr		else
17861033Sdfr			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
17961033Sdfr	}
18061033Sdfr
18161033Sdfr	task->ta_pending = 1;
18261033Sdfr	if (queue->tq_enqueue)
18361033Sdfr		queue->tq_enqueue(queue->tq_context);
18485560Sjhb
18585521Sjhb	mtx_unlock(&queue->tq_mutex);
18685560Sjhb
18761033Sdfr	return 0;
18861033Sdfr}
18961033Sdfr
19061033Sdfrvoid
19161033Sdfrtaskqueue_run(struct taskqueue *queue)
19261033Sdfr{
19361033Sdfr	struct task *task;
19461033Sdfr	int pending;
19561033Sdfr
19685521Sjhb	mtx_lock(&queue->tq_mutex);
19761033Sdfr	while (STAILQ_FIRST(&queue->tq_queue)) {
19861033Sdfr		/*
19961033Sdfr		 * Carefully remove the first task from the queue and
20061033Sdfr		 * zero its pending count.
20161033Sdfr		 */
20261033Sdfr		task = STAILQ_FIRST(&queue->tq_queue);
20361033Sdfr		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
20461033Sdfr		pending = task->ta_pending;
20561033Sdfr		task->ta_pending = 0;
20685560Sjhb		mtx_unlock(&queue->tq_mutex);
20761033Sdfr
20885560Sjhb		task->ta_func(task->ta_context, pending);
20961033Sdfr
21085521Sjhb		mtx_lock(&queue->tq_mutex);
21161033Sdfr	}
21285521Sjhb	mtx_unlock(&queue->tq_mutex);
21361033Sdfr}
21461033Sdfr
/*
 * Enqueue hook for the "swi" queue: schedule its software interrupt
 * handler (taskqueue_swi_run).
 */
static void
taskqueue_swi_enqueue(void *context)
{
	swi_sched(taskqueue_ih, 0);
}
22061033Sdfr
22161033Sdfrstatic void
22267551Sjhbtaskqueue_swi_run(void *dummy)
22361033Sdfr{
22461033Sdfr	taskqueue_run(taskqueue_swi);
22561033Sdfr}
22661033Sdfr
/*
 * Enqueue hook for the "swi_giant" queue: schedule its software
 * interrupt handler (taskqueue_swi_giant_run).
 */
static void
taskqueue_swi_giant_enqueue(void *context)
{
	swi_sched(taskqueue_giant_ih, 0);
}
232111528Sscottl
/* Software interrupt handler that drains the "swi_giant" taskqueue. */
static void
taskqueue_swi_giant_run(void *dummy)
{
	taskqueue_run(taskqueue_swi_giant);
}
238111528Sscottl
239119708Skenstatic void
240119708Skentaskqueue_kthread(void *arg)
241119708Sken{
242119708Sken	struct mtx kthread_mutex;
243119708Sken
244119708Sken	bzero(&kthread_mutex, sizeof(kthread_mutex));
245119708Sken
246119708Sken	mtx_init(&kthread_mutex, "taskqueue kthread", NULL, MTX_DEF);
247119708Sken
248119708Sken	mtx_lock(&kthread_mutex);
249119708Sken
250119708Sken	for (;;) {
251119708Sken		mtx_unlock(&kthread_mutex);
252119708Sken		taskqueue_run(taskqueue_thread);
253119708Sken		mtx_lock(&kthread_mutex);
254119708Sken		msleep(&taskqueue_thread, &kthread_mutex, PWAIT, "tqthr", 0);
255119708Sken	}
256119708Sken}
257119708Sken
/*
 * Enqueue hook for the "thread" queue: wake taskqueue_kthread(), which
 * sleeps on &taskqueue_thread.  Called with the queue's tq_mutex held.
 */
static void
taskqueue_thread_enqueue(void *context)
{
	wakeup(&taskqueue_thread);
}
263119708Sken
/* "swi" queue: drained from an MPSAFE software interrupt handler. */
TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0,
		 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
		     INTR_MPSAFE, &taskqueue_ih));

/* "swi_giant" queue: like "swi" but its handler runs under Giant. */
TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0,
		 swi_add(NULL, "Giant task queue", taskqueue_swi_giant_run,
		     NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));

/* "thread" queue: drained by a dedicated kernel thread (may sleep). */
TASKQUEUE_DEFINE(thread, taskqueue_thread_enqueue, 0,
		 kthread_create(taskqueue_kthread, NULL,
		 &taskqueue_thread_proc, RFNOWAIT, 0, "taskqueue"));
275119789Ssam
276119789Ssamint
277119789Ssamtaskqueue_enqueue_fast(struct taskqueue *queue, struct task *task)
278119789Ssam{
279119789Ssam	struct task *ins;
280119789Ssam	struct task *prev;
281119789Ssam
282119789Ssam	mtx_lock_spin(&queue->tq_mutex);
283119789Ssam
284119789Ssam	/*
285119789Ssam	 * Don't allow new tasks on a queue which is being freed.
286119789Ssam	 */
287119789Ssam	if (queue->tq_draining) {
288119789Ssam		mtx_unlock_spin(&queue->tq_mutex);
289119789Ssam		return EPIPE;
290119789Ssam	}
291119789Ssam
292119789Ssam	/*
293119789Ssam	 * Count multiple enqueues.
294119789Ssam	 */
295119789Ssam	if (task->ta_pending) {
296119789Ssam		task->ta_pending++;
297119789Ssam		mtx_unlock_spin(&queue->tq_mutex);
298119789Ssam		return 0;
299119789Ssam	}
300119789Ssam
301119789Ssam	/*
302119789Ssam	 * Optimise the case when all tasks have the same priority.
303119789Ssam	 */
304119789Ssam	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
305119789Ssam	if (!prev || prev->ta_priority >= task->ta_priority) {
306119789Ssam		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
307119789Ssam	} else {
308119789Ssam		prev = 0;
309119789Ssam		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
310119789Ssam		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
311119789Ssam			if (ins->ta_priority < task->ta_priority)
312119789Ssam				break;
313119789Ssam
314119789Ssam		if (prev)
315119789Ssam			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
316119789Ssam		else
317119789Ssam			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
318119789Ssam	}
319119789Ssam
320119789Ssam	task->ta_pending = 1;
321119789Ssam	if (queue->tq_enqueue)
322119789Ssam		queue->tq_enqueue(queue->tq_context);
323119789Ssam
324119789Ssam	mtx_unlock_spin(&queue->tq_mutex);
325119789Ssam
326119789Ssam	return 0;
327119789Ssam}
328119789Ssam
329119789Ssamstatic void
330119789Ssamtaskqueue_run_fast(struct taskqueue *queue)
331119789Ssam{
332119789Ssam	struct task *task;
333119789Ssam	int pending;
334119789Ssam
335119789Ssam	mtx_lock_spin(&queue->tq_mutex);
336119789Ssam	while (STAILQ_FIRST(&queue->tq_queue)) {
337119789Ssam		/*
338119789Ssam		 * Carefully remove the first task from the queue and
339119789Ssam		 * zero its pending count.
340119789Ssam		 */
341119789Ssam		task = STAILQ_FIRST(&queue->tq_queue);
342119789Ssam		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
343119789Ssam		pending = task->ta_pending;
344119789Ssam		task->ta_pending = 0;
345119789Ssam		mtx_unlock_spin(&queue->tq_mutex);
346119789Ssam
347119789Ssam		task->ta_func(task->ta_context, pending);
348119789Ssam
349119789Ssam		mtx_lock_spin(&queue->tq_mutex);
350119789Ssam	}
351119789Ssam	mtx_unlock_spin(&queue->tq_mutex);
352119789Ssam}
353119789Ssam
/* The "fast" taskqueue, built by hand in taskqueue_define_fast(). */
struct taskqueue *taskqueue_fast;
static void	*taskqueue_fast_ih;	/* swi cookie for the fast queue */
356119789Ssam
/*
 * Enqueue hook for the "fast" queue: schedule its software interrupt
 * handler (taskqueue_fast_run).
 */
static void
taskqueue_fast_schedule(void *context)
{
	swi_sched(taskqueue_fast_ih, 0);
}
362119789Ssam
/* Software interrupt handler that drains the "fast" taskqueue. */
static void
taskqueue_fast_run(void *dummy)
{
	taskqueue_run_fast(taskqueue_fast);
}
368119789Ssam
/*
 * Construct the "fast" taskqueue by hand rather than through
 * taskqueue_create(), because it needs a spin mutex (MTX_SPIN) where
 * taskqueue_create() always uses MTX_DEF.  M_ZERO leaves tq_context
 * and tq_draining zeroed.
 */
static void
taskqueue_define_fast(void *arg)
{
	taskqueue_fast = malloc(sizeof(struct taskqueue),
		M_TASKQUEUE, M_NOWAIT | M_ZERO);
	if (!taskqueue_fast) {
		printf("%s: Unable to allocate fast task queue!\n", __func__);
		return;
	}

	STAILQ_INIT(&taskqueue_fast->tq_queue);
	taskqueue_fast->tq_name = "fast";
	taskqueue_fast->tq_enqueue = taskqueue_fast_schedule;
	mtx_init(&taskqueue_fast->tq_mutex, "taskqueue", NULL, MTX_SPIN);

	/* Make the queue visible to taskqueue_find(). */
	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_INSERT_TAIL(&taskqueue_queues, taskqueue_fast, tq_link);
	mtx_unlock(&taskqueue_queues_mutex);

	swi_add(NULL, "Fast task queue", taskqueue_fast_run,
		NULL, SWI_TQ_FAST, 0, &taskqueue_fast_ih);
}
SYSINIT(taskqueue_fast, SI_SUB_CONFIGURE, SI_ORDER_SECOND,
	taskqueue_define_fast, NULL);
393