/*-
 * Copyright (c) 2000 Doug Rabson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 123614 2003-12-17 21:13:04Z jhb $");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/bus.h>
#include <sys/interrupt.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/taskqueue.h>
#include <sys/unistd.h>

static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
static void	*taskqueue_giant_ih;
static void	*taskqueue_ih;
static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues;
static struct mtx taskqueue_queues_mutex;
static struct proc *taskqueue_thread_proc;

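/*
 * A taskqueue is a priority-ordered list of pending tasks protected by
 * tq_mutex, plus a callback (tq_enqueue) used to arrange for the queue
 * to be run.
 */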
struct taskqueue {
	STAILQ_ENTRY(taskqueue)	tq_link;
	STAILQ_HEAD(, task)	tq_queue;
	const char		*tq_name;
	taskqueue_enqueue_fn	tq_enqueue;
	void			*tq_context;
	int			tq_draining;
	struct mtx		tq_mutex;
};

static void	init_taskqueue_list(void *data);

static void
init_taskqueue_list(void *data __unused)
{

	mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF);
	STAILQ_INIT(&taskqueue_queues);
}
SYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list,
    NULL);

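/*
 * Create and register a new taskqueue.  The enqueue callback is called
 * with the supplied context whenever a task is queued, and is responsible
 * for arranging for taskqueue_run() to be invoked.  Returns NULL if the
 * allocation fails.
 */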
struct taskqueue *
taskqueue_create(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context)
{
	struct taskqueue *queue;

	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
	if (!queue)
		return 0;

	STAILQ_INIT(&queue->tq_queue);
	queue->tq_name = name;
	queue->tq_enqueue = enqueue;
	queue->tq_context = context;
	queue->tq_draining = 0;
	mtx_init(&queue->tq_mutex, "taskqueue", NULL, MTX_DEF);

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link);
	mtx_unlock(&taskqueue_queues_mutex);

	return queue;
}

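/*
 * Free a taskqueue: mark it as draining so that no new tasks can be
 * enqueued, run any tasks still pending, unlink it from the global list
 * and release its resources.
 */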
void
taskqueue_free(struct taskqueue *queue)
{

	mtx_lock(&queue->tq_mutex);
	KASSERT(queue->tq_draining == 0, ("free'ing a draining taskqueue"));
	queue->tq_draining = 1;
	mtx_unlock(&queue->tq_mutex);

	taskqueue_run(queue);

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link);
	mtx_unlock(&taskqueue_queues_mutex);

	mtx_destroy(&queue->tq_mutex);
	free(queue, M_TASKQUEUE);
}

/*
 * Look up a taskqueue by name.  Returns the queue with its tq_mutex
 * held, or NULL if no queue with that name exists.
 */
struct taskqueue *
taskqueue_find(const char *name)
{
	struct taskqueue *queue;

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) {
		mtx_lock(&queue->tq_mutex);
		if (strcmp(queue->tq_name, name) == 0) {
			mtx_unlock(&taskqueue_queues_mutex);
			return queue;
		}
		mtx_unlock(&queue->tq_mutex);
	}
	mtx_unlock(&taskqueue_queues_mutex);
	return NULL;
}

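/*
 * Enqueue a single task, keeping tq_queue sorted by descending task
 * priority.  If the task is already pending, only its pending count is
 * incremented.  A minimal caller-side sketch (my_task_fn and my_softc
 * are hypothetical names, not part of this file):
 *
 *	static struct task my_task;
 *	TASK_INIT(&my_task, 0, my_task_fn, &my_softc);
 *	taskqueue_enqueue(taskqueue_swi, &my_task);
 */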
int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
	struct task *ins;
	struct task *prev;

	mtx_lock(&queue->tq_mutex);

	/*
	 * Don't allow new tasks on a queue which is being freed.
	 */
	if (queue->tq_draining) {
		mtx_unlock(&queue->tq_mutex);
		return EPIPE;
	}

	/*
	 * Count multiple enqueues.
	 */
	if (task->ta_pending) {
		task->ta_pending++;
		mtx_unlock(&queue->tq_mutex);
		return 0;
	}

	/*
	 * Optimise the case when all tasks have the same priority.
	 */
	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
	if (!prev || prev->ta_priority >= task->ta_priority) {
		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
	} else {
		prev = 0;
		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
			if (ins->ta_priority < task->ta_priority)
				break;

		if (prev)
			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
		else
			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
	}

	task->ta_pending = 1;
	if (queue->tq_enqueue)
		queue->tq_enqueue(queue->tq_context);

	mtx_unlock(&queue->tq_mutex);

	return 0;
}

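/*
 * Run every task currently on the queue.  The queue lock is dropped
 * around each task's handler so that a running task may enqueue
 * further tasks without deadlocking.
 */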
void
taskqueue_run(struct taskqueue *queue)
{
	struct task *task;
	int pending;

	mtx_lock(&queue->tq_mutex);
	while (STAILQ_FIRST(&queue->tq_queue)) {
		/*
		 * Carefully remove the first task from the queue and
		 * zero its pending count.
		 */
		task = STAILQ_FIRST(&queue->tq_queue);
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		pending = task->ta_pending;
		task->ta_pending = 0;
		mtx_unlock(&queue->tq_mutex);

		task->ta_func(task->ta_context, pending);

		mtx_lock(&queue->tq_mutex);
	}
	mtx_unlock(&queue->tq_mutex);
}

static void
taskqueue_swi_enqueue(void *context)
{
	swi_sched(taskqueue_ih, 0);
}

static void
taskqueue_swi_run(void *dummy)
{
	taskqueue_run(taskqueue_swi);
}

static void
taskqueue_swi_giant_enqueue(void *context)
{
	swi_sched(taskqueue_giant_ih, 0);
}

static void
taskqueue_swi_giant_run(void *dummy)
{
	taskqueue_run(taskqueue_swi_giant);
}

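/*
 * Main loop of the taskqueue_thread kernel thread: run any pending
 * tasks, then sleep until taskqueue_thread_enqueue() wakes the thread
 * again.
 */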
static void
taskqueue_kthread(void *arg)
{
	struct mtx kthread_mutex;

	bzero(&kthread_mutex, sizeof(kthread_mutex));

	mtx_init(&kthread_mutex, "taskqueue kthread", NULL, MTX_DEF);

	mtx_lock(&kthread_mutex);

	for (;;) {
		mtx_unlock(&kthread_mutex);
		taskqueue_run(taskqueue_thread);
		mtx_lock(&kthread_mutex);
		msleep(&taskqueue_thread, &kthread_mutex, PWAIT, "tqthr", 0);
	}
}

static void
taskqueue_thread_enqueue(void *context)
{
	wakeup(&taskqueue_thread);
}

TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0,
		 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
		     INTR_MPSAFE, &taskqueue_ih));

TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0,
		 swi_add(NULL, "Giant task queue", taskqueue_swi_giant_run,
		     NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));

TASKQUEUE_DEFINE(thread, taskqueue_thread_enqueue, 0,
		 kthread_create(taskqueue_kthread, NULL,
		 &taskqueue_thread_proc, 0, 0, "taskqueue"));

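/*
 * Spin-mutex variant of taskqueue_enqueue(), for the "fast" taskqueue
 * set up in taskqueue_define_fast() below; otherwise identical in
 * behaviour.
 */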
int
taskqueue_enqueue_fast(struct taskqueue *queue, struct task *task)
{
	struct task *ins;
	struct task *prev;

	mtx_lock_spin(&queue->tq_mutex);

	/*
	 * Don't allow new tasks on a queue which is being freed.
	 */
	if (queue->tq_draining) {
		mtx_unlock_spin(&queue->tq_mutex);
		return EPIPE;
	}

	/*
	 * Count multiple enqueues.
	 */
	if (task->ta_pending) {
		task->ta_pending++;
		mtx_unlock_spin(&queue->tq_mutex);
		return 0;
	}

	/*
	 * Optimise the case when all tasks have the same priority.
	 */
	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
	if (!prev || prev->ta_priority >= task->ta_priority) {
		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
	} else {
		prev = 0;
		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
			if (ins->ta_priority < task->ta_priority)
				break;

		if (prev)
			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
		else
			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
	}

	task->ta_pending = 1;
	if (queue->tq_enqueue)
		queue->tq_enqueue(queue->tq_context);

	mtx_unlock_spin(&queue->tq_mutex);

	return 0;
}

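/*
 * Spin-mutex variant of taskqueue_run().
 */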
static void
taskqueue_run_fast(struct taskqueue *queue)
{
	struct task *task;
	int pending;

	mtx_lock_spin(&queue->tq_mutex);
	while (STAILQ_FIRST(&queue->tq_queue)) {
		/*
		 * Carefully remove the first task from the queue and
		 * zero its pending count.
		 */
		task = STAILQ_FIRST(&queue->tq_queue);
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		pending = task->ta_pending;
		task->ta_pending = 0;
		mtx_unlock_spin(&queue->tq_mutex);

		task->ta_func(task->ta_context, pending);

		mtx_lock_spin(&queue->tq_mutex);
	}
	mtx_unlock_spin(&queue->tq_mutex);
}

struct taskqueue *taskqueue_fast;
static void	*taskqueue_fast_ih;

static void
taskqueue_fast_schedule(void *context)
{
	swi_sched(taskqueue_fast_ih, 0);
}

static void
taskqueue_fast_run(void *dummy)
{
	taskqueue_run_fast(taskqueue_fast);
}

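/*
 * Hand-rolled equivalent of TASKQUEUE_DEFINE() for the "fast"
 * taskqueue: it cannot use taskqueue_create() because its queue is
 * protected by a spin mutex rather than a default (sleep) mutex.
 */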
static void
taskqueue_define_fast(void *arg)
{
	taskqueue_fast = malloc(sizeof(struct taskqueue),
		M_TASKQUEUE, M_NOWAIT | M_ZERO);
	if (!taskqueue_fast) {
		printf("%s: Unable to allocate fast task queue!\n", __func__);
		return;
	}

	STAILQ_INIT(&taskqueue_fast->tq_queue);
	taskqueue_fast->tq_name = "fast";
	taskqueue_fast->tq_enqueue = taskqueue_fast_schedule;
	mtx_init(&taskqueue_fast->tq_mutex, "taskqueue_fast", NULL, MTX_SPIN);

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_INSERT_TAIL(&taskqueue_queues, taskqueue_fast, tq_link);
	mtx_unlock(&taskqueue_queues_mutex);

	swi_add(NULL, "Fast task queue", taskqueue_fast_run,
		NULL, SWI_TQ_FAST, 0, &taskqueue_fast_ih);
}
SYSINIT(taskqueue_fast, SI_SUB_CONFIGURE, SI_ORDER_SECOND,
	taskqueue_define_fast, NULL);