subr_taskqueue.c revision 136131
/*-
 * Copyright (c) 2000 Doug Rabson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 136131 2004-10-05 04:16:01Z imp $");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/bus.h>
#include <sys/interrupt.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/taskqueue.h>
#include <sys/unistd.h>

static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
static void	*taskqueue_giant_ih;
static void	*taskqueue_ih;
static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues;
static struct mtx taskqueue_queues_mutex;

struct taskqueue {
	STAILQ_ENTRY(taskqueue)	tq_link;	/* entry in global queue list */
	STAILQ_HEAD(, task)	tq_queue;	/* pending tasks, priority order */
	const char		*tq_name;	/* name used by taskqueue_find() */
	taskqueue_enqueue_fn	tq_enqueue;	/* notifies consumer of new work */
	void			*tq_context;	/* argument for tq_enqueue */
	struct mtx		tq_mutex;	/* protects the queue contents */
};

static void	init_taskqueue_list(void *data);

static void
init_taskqueue_list(void *data __unused)
{

	mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF);
	STAILQ_INIT(&taskqueue_queues);
}
SYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list,
    NULL);

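/*
 * Create a taskqueue.  The queue is linked onto the global list so that
 * taskqueue_find() can look it up by name; the enqueue callback is invoked
 * (with the supplied context) whenever a task is added, to tell the
 * consumer that there is work to do.
 */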
struct taskqueue *
taskqueue_create(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context)
{
	struct taskqueue *queue;

	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
	if (!queue)
		return 0;

	STAILQ_INIT(&queue->tq_queue);
	queue->tq_name = name;
	queue->tq_enqueue = enqueue;
	queue->tq_context = context;
	mtx_init(&queue->tq_mutex, "taskqueue", NULL, MTX_DEF);

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link);
	mtx_unlock(&taskqueue_queues_mutex);

	return queue;
}

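/*
 * Free a taskqueue.  The queue is removed from the global list and any
 * tasks still pending are run before the mutex is destroyed and the
 * memory released.
 */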
void
taskqueue_free(struct taskqueue *queue)
{

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link);
	mtx_unlock(&taskqueue_queues_mutex);

	mtx_lock(&queue->tq_mutex);
	taskqueue_run(queue);
	mtx_destroy(&queue->tq_mutex);
	free(queue, M_TASKQUEUE);
}

/*
 * Look up a taskqueue by name.  Returns with the taskqueue locked on
 * success, or NULL if no queue with the given name exists.
 */
struct taskqueue *
taskqueue_find(const char *name)
{
	struct taskqueue *queue;

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) {
		if (strcmp(queue->tq_name, name) == 0) {
			mtx_lock(&queue->tq_mutex);
			mtx_unlock(&taskqueue_queues_mutex);
			return queue;
		}
	}
	mtx_unlock(&taskqueue_queues_mutex);
	return NULL;
}

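/*
 * Queue a task for execution.  If the task is already queued, only its
 * pending count is incremented; otherwise it is inserted in priority
 * order (highest ta_priority first) and the queue's enqueue callback is
 * invoked to notify the consumer.
 */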
int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
	struct task *ins;
	struct task *prev;

	mtx_lock(&queue->tq_mutex);

	/*
	 * Count multiple enqueues.
	 */
	if (task->ta_pending) {
		task->ta_pending++;
		mtx_unlock(&queue->tq_mutex);
		return 0;
	}

	/*
	 * Optimise the case when all tasks have the same priority.
	 */
	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
	if (!prev || prev->ta_priority >= task->ta_priority) {
		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
	} else {
		prev = 0;
		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
			if (ins->ta_priority < task->ta_priority)
				break;

		if (prev)
			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
		else
			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
	}

	task->ta_pending = 1;
	if (queue->tq_enqueue)
		queue->tq_enqueue(queue->tq_context);

	mtx_unlock(&queue->tq_mutex);

	return 0;
}

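/*
 * Run all tasks currently on the queue.  Each task is removed and its
 * handler is called with the accumulated pending count; the queue mutex
 * is dropped around the call so that handlers may queue further work.
 * TAF_PENDING is set while a handler runs so that taskqueue_drain() can
 * wait for it to complete.
 */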
void
taskqueue_run(struct taskqueue *queue)
{
	struct task *task;
	int owned, pending;

	owned = mtx_owned(&queue->tq_mutex);
	if (!owned)
		mtx_lock(&queue->tq_mutex);
	while (STAILQ_FIRST(&queue->tq_queue)) {
		/*
		 * Carefully remove the first task from the queue and
		 * zero its pending count.
		 */
		task = STAILQ_FIRST(&queue->tq_queue);
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		pending = task->ta_pending;
		task->ta_pending = 0;
		task->ta_flags |= TAF_PENDING;
		mtx_unlock(&queue->tq_mutex);

		task->ta_func(task->ta_context, pending);

		mtx_lock(&queue->tq_mutex);
		task->ta_flags &= ~TAF_PENDING;
		wakeup(task);
	}

	/*
	 * For compatibility, unlock on return if the queue was not locked
	 * on entry, although this opens a race window.
	 */
	if (!owned)
		mtx_unlock(&queue->tq_mutex);
}

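/*
 * Wait for a task to finish.  Sleep until the task is neither queued
 * (ta_pending) nor running (TAF_PENDING); taskqueue_run() issues a
 * wakeup() on the task once its handler has returned.
 */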
void
taskqueue_drain(struct taskqueue *queue, struct task *task)
{
	WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, "taskqueue_drain");
	mtx_lock(&queue->tq_mutex);
	while (task->ta_pending != 0 || (task->ta_flags & TAF_PENDING)) {
		msleep(task, &queue->tq_mutex, PWAIT, "-", 0);
	}
	mtx_unlock(&queue->tq_mutex);
}

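/*
 * Enqueue and run callbacks for the two software-interrupt-driven system
 * queues: taskqueue_swi runs its tasks from an MPSAFE SWI handler, while
 * taskqueue_swi_giant runs its tasks with Giant held.
 */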
static void
taskqueue_swi_enqueue(void *context)
{
	swi_sched(taskqueue_ih, 0);
}

static void
taskqueue_swi_run(void *dummy)
{
	taskqueue_run(taskqueue_swi);
}

static void
taskqueue_swi_giant_enqueue(void *context)
{
	swi_sched(taskqueue_giant_ih, 0);
}

static void
taskqueue_swi_giant_run(void *dummy)
{
	taskqueue_run(taskqueue_swi_giant);
}

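/*
 * Service loop for a taskqueue run from a dedicated kernel thread: run
 * any queued tasks, then sleep on the queue until
 * taskqueue_thread_enqueue() signals more work.
 */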
void
taskqueue_thread_loop(void *arg)
{
	struct taskqueue **tqp, *tq;

	tqp = arg;
	tq = *tqp;
	mtx_lock(&tq->tq_mutex);
	for (;;) {
		taskqueue_run(tq);
		msleep(tq, &tq->tq_mutex, PWAIT, "-", 0);
	}
}

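/*
 * Enqueue callback for thread-serviced queues: wake the thread sleeping
 * in taskqueue_thread_loop().  Called with the queue mutex held.
 */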
void
taskqueue_thread_enqueue(void *context)
{
	struct taskqueue **tqp, *tq;

	tqp = context;
	tq = *tqp;

	mtx_assert(&tq->tq_mutex, MA_OWNED);
	wakeup(tq);
}

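/*
 * Instantiate the standard system queues: taskqueue_swi and
 * taskqueue_swi_giant are driven by software interrupts, taskqueue_thread
 * by a dedicated kernel thread.
 */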
TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0,
		 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
		     INTR_MPSAFE, &taskqueue_ih));

TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0,
		 swi_add(NULL, "Giant task queue", taskqueue_swi_giant_run,
		     NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));

TASKQUEUE_DEFINE_THREAD(thread);
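
/*
 * Illustrative sketch (not part of the original file): a typical consumer
 * declares a task, initialises it with TASK_INIT() and hands it to one of
 * the system queues; the handler and softc names below are hypothetical.
 *
 *	static void
 *	foo_task_handler(void *context, int pending)
 *	{
 *		struct foo_softc *sc = context;
 *		... do the deferred work ...
 *	}
 *
 *	TASK_INIT(&sc->foo_task, 0, foo_task_handler, sc);
 *	taskqueue_enqueue(taskqueue_thread, &sc->foo_task);
 *	...
 *	taskqueue_drain(taskqueue_thread, &sc->foo_task);
 */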

/*
 * Spin-mutex version of taskqueue_enqueue(), for queues (such as
 * taskqueue_fast) whose tq_mutex is a spin mutex and which may therefore
 * be fed from fast interrupt handlers.
 */
int
taskqueue_enqueue_fast(struct taskqueue *queue, struct task *task)
{
	struct task *ins;
	struct task *prev;

	mtx_lock_spin(&queue->tq_mutex);

	/*
	 * Count multiple enqueues.
	 */
	if (task->ta_pending) {
		task->ta_pending++;
		mtx_unlock_spin(&queue->tq_mutex);
		return 0;
	}

	/*
	 * Optimise the case when all tasks have the same priority.
	 */
	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
	if (!prev || prev->ta_priority >= task->ta_priority) {
		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
	} else {
		prev = 0;
		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
			if (ins->ta_priority < task->ta_priority)
				break;

		if (prev)
			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
		else
			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
	}

	task->ta_pending = 1;
	if (queue->tq_enqueue)
		queue->tq_enqueue(queue->tq_context);

	mtx_unlock_spin(&queue->tq_mutex);

	return 0;
}

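/*
 * Spin-mutex version of taskqueue_run().  Unlike taskqueue_run() it always
 * acquires the queue lock itself and does not maintain TAF_PENDING, so
 * taskqueue_drain() cannot be used with fast queues.
 */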
static void
taskqueue_run_fast(struct taskqueue *queue)
{
	struct task *task;
	int pending;

	mtx_lock_spin(&queue->tq_mutex);
	while (STAILQ_FIRST(&queue->tq_queue)) {
		/*
		 * Carefully remove the first task from the queue and
		 * zero its pending count.
		 */
		task = STAILQ_FIRST(&queue->tq_queue);
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		pending = task->ta_pending;
		task->ta_pending = 0;
		mtx_unlock_spin(&queue->tq_mutex);

		task->ta_func(task->ta_context, pending);

		mtx_lock_spin(&queue->tq_mutex);
	}
	mtx_unlock_spin(&queue->tq_mutex);
}

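/*
 * The "fast" system queue.  It is set up by hand rather than through
 * taskqueue_create() because its mutex must be a spin mutex, allowing
 * tasks to be enqueued from fast interrupt handlers.
 */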
struct taskqueue *taskqueue_fast;
static void	*taskqueue_fast_ih;

static void
taskqueue_fast_schedule(void *context)
{
	swi_sched(taskqueue_fast_ih, 0);
}

static void
taskqueue_fast_run(void *dummy)
{
	taskqueue_run_fast(taskqueue_fast);
}

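/*
 * Allocate and initialise taskqueue_fast at boot time and register the
 * SWI_TQ_FAST software interrupt handler that services it.
 */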
static void
taskqueue_define_fast(void *arg)
{

	taskqueue_fast = malloc(sizeof(struct taskqueue), M_TASKQUEUE,
	    M_NOWAIT | M_ZERO);
	if (!taskqueue_fast) {
		printf("%s: Unable to allocate fast task queue!\n", __func__);
		return;
	}

	STAILQ_INIT(&taskqueue_fast->tq_queue);
	taskqueue_fast->tq_name = "fast";
	taskqueue_fast->tq_enqueue = taskqueue_fast_schedule;
	mtx_init(&taskqueue_fast->tq_mutex, "taskqueue_fast", NULL, MTX_SPIN);

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_INSERT_TAIL(&taskqueue_queues, taskqueue_fast, tq_link);
	mtx_unlock(&taskqueue_queues_mutex);

	swi_add(NULL, "Fast task queue", taskqueue_fast_run,
		NULL, SWI_TQ_FAST, 0, &taskqueue_fast_ih);
}
SYSINIT(taskqueue_fast, SI_SUB_CONFIGURE, SI_ORDER_SECOND,
    taskqueue_define_fast, NULL);
388