subr_taskqueue.c revision 126027
/*-
 * Copyright (c) 2000 Doug Rabson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 126027 2004-02-19 22:03:52Z jhb $");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/bus.h>
#include <sys/interrupt.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/taskqueue.h>
#include <sys/unistd.h>

static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
static void	*taskqueue_giant_ih;
static void	*taskqueue_ih;
static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues;
static struct mtx taskqueue_queues_mutex;
static struct proc *taskqueue_thread_proc;

struct taskqueue {
	STAILQ_ENTRY(taskqueue)	tq_link;
	STAILQ_HEAD(, task)	tq_queue;
	const char		*tq_name;
	taskqueue_enqueue_fn	tq_enqueue;
	void			*tq_context;
	int			tq_draining;
	struct mtx		tq_mutex;
};

static void	init_taskqueue_list(void *data);

static void
init_taskqueue_list(void *data __unused)
{

	mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF);
	STAILQ_INIT(&taskqueue_queues);
}
SYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list,
    NULL);

struct taskqueue *
taskqueue_create(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context)
{
	struct taskqueue *queue;

	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
	if (!queue)
		return 0;

	STAILQ_INIT(&queue->tq_queue);
	queue->tq_name = name;
	queue->tq_enqueue = enqueue;
	queue->tq_context = context;
	queue->tq_draining = 0;
	mtx_init(&queue->tq_mutex, "taskqueue", NULL, MTX_DEF);

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link);
	mtx_unlock(&taskqueue_queues_mutex);

	return queue;
}
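
/*
 * Usage sketch (illustrative only; my_poke(), my_handler() and the softc
 * fields are hypothetical names, not part of this file): a consumer creates
 * a private queue with an enqueue hook that wakes whatever context will call
 * taskqueue_run(), initializes a task and queues it, and frees the queue
 * when finished.
 *
 *	sc->my_tq = taskqueue_create("my_tq", M_NOWAIT, my_poke, sc);
 *	TASK_INIT(&sc->my_task, 0, my_handler, sc);
 *	taskqueue_enqueue(sc->my_tq, &sc->my_task);
 *	...
 *	taskqueue_free(sc->my_tq);	(any tasks still queued are run first)
 */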

void
taskqueue_free(struct taskqueue *queue)
{

	mtx_lock(&queue->tq_mutex);
	KASSERT(queue->tq_draining == 0, ("freeing a draining taskqueue"));
	queue->tq_draining = 1;
	mtx_unlock(&queue->tq_mutex);

	taskqueue_run(queue);

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link);
	mtx_unlock(&taskqueue_queues_mutex);

	mtx_destroy(&queue->tq_mutex);
	free(queue, M_TASKQUEUE);
}

/*
 * Look up a taskqueue by name.  Returns the matching queue with its
 * tq_mutex held (the caller is responsible for dropping it), or NULL if
 * no queue of that name exists.
 */
struct taskqueue *
taskqueue_find(const char *name)
{
	struct taskqueue *queue;

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) {
		mtx_lock(&queue->tq_mutex);
		if (strcmp(queue->tq_name, name) == 0) {
			mtx_unlock(&taskqueue_queues_mutex);
			return queue;
		}
		mtx_unlock(&queue->tq_mutex);
	}
	mtx_unlock(&taskqueue_queues_mutex);
	return NULL;
}

int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
	struct task *ins;
	struct task *prev;

	mtx_lock(&queue->tq_mutex);

	/*
	 * Don't allow new tasks on a queue which is being freed.
	 */
	if (queue->tq_draining) {
		mtx_unlock(&queue->tq_mutex);
		return EPIPE;
	}

	/*
	 * Count multiple enqueues.
	 */
	if (task->ta_pending) {
		task->ta_pending++;
		mtx_unlock(&queue->tq_mutex);
		return 0;
	}

	/*
	 * Optimise the case when all tasks have the same priority.
	 */
	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
	if (!prev || prev->ta_priority >= task->ta_priority) {
		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
	} else {
		prev = 0;
		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
			if (ins->ta_priority < task->ta_priority)
				break;

		if (prev)
			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
		else
			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
	}

	task->ta_pending = 1;
	if (queue->tq_enqueue)
		queue->tq_enqueue(queue->tq_context);

	mtx_unlock(&queue->tq_mutex);

	return 0;
}
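
/*
 * Note that taskqueue_enqueue() coalesces repeated enqueues of a task that
 * is already pending by bumping ta_pending rather than linking the task a
 * second time; taskqueue_run() later hands the accumulated count to the
 * handler.  The queue itself is kept sorted by ta_priority, highest first,
 * so a task initialized with a larger priority value runs ahead of
 * priority-0 tasks queued around it.
 */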

void
taskqueue_run(struct taskqueue *queue)
{
	struct task *task;
	int pending;

	mtx_lock(&queue->tq_mutex);
	while (STAILQ_FIRST(&queue->tq_queue)) {
		/*
		 * Carefully remove the first task from the queue and
		 * zero its pending count.
		 */
		task = STAILQ_FIRST(&queue->tq_queue);
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		pending = task->ta_pending;
		task->ta_pending = 0;
		mtx_unlock(&queue->tq_mutex);

		task->ta_func(task->ta_context, pending);

		mtx_lock(&queue->tq_mutex);
	}
	mtx_unlock(&queue->tq_mutex);
}
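
/*
 * taskqueue_run() drops tq_mutex around each handler invocation, so a
 * handler may itself enqueue further tasks (including its own task) on the
 * same queue; the loop simply keeps draining until the queue is found empty
 * while the lock is held.
 */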

static void
taskqueue_swi_enqueue(void *context)
{
	swi_sched(taskqueue_ih, 0);
}

static void
taskqueue_swi_run(void *dummy)
{
	taskqueue_run(taskqueue_swi);
}

static void
taskqueue_swi_giant_enqueue(void *context)
{
	swi_sched(taskqueue_giant_ih, 0);
}

static void
taskqueue_swi_giant_run(void *dummy)
{
	taskqueue_run(taskqueue_swi_giant);
}

static void
taskqueue_thread_loop(void *arg)
{
	for (;;) {
		mtx_lock(&taskqueue_thread->tq_mutex);
		while (STAILQ_EMPTY(&taskqueue_thread->tq_queue))
			msleep(taskqueue_thread, &taskqueue_thread->tq_mutex,
			    PWAIT, "-", 0);
		mtx_unlock(&taskqueue_thread->tq_mutex);
		taskqueue_run(taskqueue_thread);
	}
}

static void
taskqueue_thread_enqueue(void *context)
{
	mtx_assert(&taskqueue_thread->tq_mutex, MA_OWNED);
	wakeup(taskqueue_thread);
}

TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0,
		 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
		     INTR_MPSAFE, &taskqueue_ih));

TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0,
		 swi_add(NULL, "Giant task queue", taskqueue_swi_giant_run,
		     NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));

TASKQUEUE_DEFINE(thread, taskqueue_thread_enqueue, 0,
		 kthread_create(taskqueue_thread_loop, NULL,
		 &taskqueue_thread_proc, 0, 0, "taskqueue"));
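
/*
 * The three standard queues defined above differ only in execution context:
 * taskqueue_swi runs its tasks from an MPSAFE software interrupt handler,
 * taskqueue_swi_giant from a Giant-locked software interrupt handler, and
 * taskqueue_thread from the dedicated "taskqueue" kernel thread, which is
 * the usual choice for handlers that may need to sleep.  Illustrative use
 * (my_task, my_handler and arg are hypothetical names):
 *
 *	TASK_INIT(&my_task, 0, my_handler, arg);
 *	taskqueue_enqueue(taskqueue_thread, &my_task);
 */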

int
taskqueue_enqueue_fast(struct taskqueue *queue, struct task *task)
{
	struct task *ins;
	struct task *prev;

	mtx_lock_spin(&queue->tq_mutex);

	/*
	 * Don't allow new tasks on a queue which is being freed.
	 */
	if (queue->tq_draining) {
		mtx_unlock_spin(&queue->tq_mutex);
		return EPIPE;
	}

	/*
	 * Count multiple enqueues.
	 */
	if (task->ta_pending) {
		task->ta_pending++;
		mtx_unlock_spin(&queue->tq_mutex);
		return 0;
	}

	/*
	 * Optimise the case when all tasks have the same priority.
	 */
	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
	if (!prev || prev->ta_priority >= task->ta_priority) {
		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
	} else {
		prev = 0;
		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
			if (ins->ta_priority < task->ta_priority)
				break;

		if (prev)
			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
		else
			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
	}

	task->ta_pending = 1;
	if (queue->tq_enqueue)
		queue->tq_enqueue(queue->tq_context);

	mtx_unlock_spin(&queue->tq_mutex);

	return 0;
}
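
/*
 * taskqueue_enqueue_fast() mirrors taskqueue_enqueue() above but uses the
 * spin-mutex lock operations, since the "fast" queue defined below protects
 * itself with an MTX_SPIN mutex.  It should only be used with such queues,
 * and handlers run from them must not sleep.
 */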

static void
taskqueue_run_fast(struct taskqueue *queue)
{
	struct task *task;
	int pending;

	mtx_lock_spin(&queue->tq_mutex);
	while (STAILQ_FIRST(&queue->tq_queue)) {
		/*
		 * Carefully remove the first task from the queue and
		 * zero its pending count.
		 */
		task = STAILQ_FIRST(&queue->tq_queue);
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		pending = task->ta_pending;
		task->ta_pending = 0;
		mtx_unlock_spin(&queue->tq_mutex);

		task->ta_func(task->ta_context, pending);

		mtx_lock_spin(&queue->tq_mutex);
	}
	mtx_unlock_spin(&queue->tq_mutex);
}

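/*
 * The "fast" taskqueue is assembled by hand rather than with
 * TASKQUEUE_DEFINE() because it needs a spin mutex and the *_fast variants
 * above.  Once the SYSINIT at the end of this file has run, consumers would
 * queue work to it with taskqueue_enqueue_fast(taskqueue_fast, &task).
 */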
struct taskqueue *taskqueue_fast;
static void	*taskqueue_fast_ih;

static void
taskqueue_fast_schedule(void *context)
{
	swi_sched(taskqueue_fast_ih, 0);
}

static void
taskqueue_fast_run(void *dummy)
{
	taskqueue_run_fast(taskqueue_fast);
}

static void
taskqueue_define_fast(void *arg)
{
	taskqueue_fast = malloc(sizeof(struct taskqueue),
		M_TASKQUEUE, M_NOWAIT | M_ZERO);
	if (!taskqueue_fast) {
		printf("%s: Unable to allocate fast task queue!\n", __func__);
		return;
	}

	STAILQ_INIT(&taskqueue_fast->tq_queue);
	taskqueue_fast->tq_name = "fast";
	taskqueue_fast->tq_enqueue = taskqueue_fast_schedule;
	mtx_init(&taskqueue_fast->tq_mutex, "taskqueue_fast", NULL, MTX_SPIN);

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_INSERT_TAIL(&taskqueue_queues, taskqueue_fast, tq_link);
	mtx_unlock(&taskqueue_queues_mutex);

	swi_add(NULL, "Fast task queue", taskqueue_fast_run,
		NULL, SWI_TQ_FAST, 0, &taskqueue_fast_ih);
}
SYSINIT(taskqueue_fast, SI_SUB_CONFIGURE, SI_ORDER_SECOND,
	taskqueue_define_fast, NULL);