/*-
 * Copyright (c) 2000 Doug Rabson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 133305 2004-08-08 02:37:22Z jmg $");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/bus.h>
#include <sys/interrupt.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/taskqueue.h>
#include <sys/unistd.h>

static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
static void	*taskqueue_giant_ih;
static void	*taskqueue_ih;
static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues;
static struct mtx taskqueue_queues_mutex;

struct taskqueue {
	STAILQ_ENTRY(taskqueue)	tq_link;	/* entry on the global queue list */
	STAILQ_HEAD(, task)	tq_queue;	/* pending tasks, priority ordered */
	const char		*tq_name;	/* name used by taskqueue_find() */
	taskqueue_enqueue_fn	tq_enqueue;	/* called when a task is queued */
	void			*tq_context;	/* argument passed to tq_enqueue */
	struct mtx		tq_mutex;	/* protects the task list */
};

static void	init_taskqueue_list(void *data);

static void
init_taskqueue_list(void *data __unused)
{

	mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF);
	STAILQ_INIT(&taskqueue_queues);
}
SYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list,
    NULL);

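/*
 * Allocate and initialise a task queue.  The new queue is linked onto
 * the global list so that taskqueue_find() can locate it by name;
 * returns NULL if the allocation fails.
 */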
struct taskqueue *
taskqueue_create(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context)
{
	struct taskqueue *queue;

	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
	if (!queue)
		return NULL;

	STAILQ_INIT(&queue->tq_queue);
	queue->tq_name = name;
	queue->tq_enqueue = enqueue;
	queue->tq_context = context;
	mtx_init(&queue->tq_mutex, "taskqueue", NULL, MTX_DEF);

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link);
	mtx_unlock(&taskqueue_queues_mutex);

	return queue;
}

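/*
 * Unlink a queue from the global list, run any tasks still pending on
 * it, then destroy its mutex and free its memory.  The caller is
 * responsible for ensuring that no further tasks are enqueued.
 */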
void
taskqueue_free(struct taskqueue *queue)
{

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link);
	mtx_unlock(&taskqueue_queues_mutex);

	mtx_lock(&queue->tq_mutex);
	taskqueue_run(queue);
	mtx_destroy(&queue->tq_mutex);
	free(queue, M_TASKQUEUE);
}

/*
 * Find a taskqueue by name.  On success the queue is returned with its
 * mutex held; NULL is returned if no queue with that name exists.
 */
struct taskqueue *
taskqueue_find(const char *name)
{
	struct taskqueue *queue;

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) {
		if (strcmp(queue->tq_name, name) == 0) {
			mtx_lock(&queue->tq_mutex);
			mtx_unlock(&taskqueue_queues_mutex);
			return queue;
		}
	}
	mtx_unlock(&taskqueue_queues_mutex);
	return NULL;
}

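/*
 * Enqueue a task for deferred execution.  If the task is already
 * pending, only its pending count is bumped; otherwise it is inserted
 * into the queue in priority order (highest ta_priority first) and the
 * queue's enqueue hook is called to notify the consumer.
 *
 * Illustrative usage sketch (hypothetical driver code; "my_softc" and
 * "my_task_fn" are placeholder names):
 *
 *	static void
 *	my_task_fn(void *context, int pending)
 *	{
 *		struct my_softc *sc = context;
 *
 *		...handle the deferred work; "pending" is the number of
 *		enqueues collapsed into this single call...
 *	}
 *
 *	TASK_INIT(&sc->my_task, 0, my_task_fn, sc);
 *	taskqueue_enqueue(taskqueue_swi, &sc->my_task);
 */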
int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
	struct task *ins;
	struct task *prev;

	mtx_lock(&queue->tq_mutex);

	/*
	 * Count multiple enqueues.
	 */
	if (task->ta_pending) {
		task->ta_pending++;
		mtx_unlock(&queue->tq_mutex);
		return 0;
	}

	/*
	 * Optimise the case when all tasks have the same priority.
	 */
	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
	if (!prev || prev->ta_priority >= task->ta_priority) {
		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
	} else {
		prev = NULL;
		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
			if (ins->ta_priority < task->ta_priority)
				break;

		if (prev)
			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
		else
			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
	}

	task->ta_pending = 1;
	if (queue->tq_enqueue)
		queue->tq_enqueue(queue->tq_context);

	mtx_unlock(&queue->tq_mutex);

	return 0;
}

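/*
 * Run every task currently on the queue.  The queue mutex may already
 * be held on entry; it is dropped around each task function call so
 * that tasks may block or re-enqueue work.
 */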
void
taskqueue_run(struct taskqueue *queue)
{
	struct task *task;
	int owned, pending;

	owned = mtx_owned(&queue->tq_mutex);
	if (!owned)
		mtx_lock(&queue->tq_mutex);
	while (STAILQ_FIRST(&queue->tq_queue)) {
		/*
		 * Carefully remove the first task from the queue and
		 * zero its pending count.
		 */
		task = STAILQ_FIRST(&queue->tq_queue);
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		pending = task->ta_pending;
		task->ta_pending = 0;
		mtx_unlock(&queue->tq_mutex);

		task->ta_func(task->ta_context, pending);

		mtx_lock(&queue->tq_mutex);
	}

	/*
	 * For compatibility, unlock on return if the queue was not locked
	 * on entry, although this opens a race window.
	 */
	if (!owned)
		mtx_unlock(&queue->tq_mutex);
}

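/*
 * Enqueue hooks and software-interrupt handlers for the taskqueue_swi
 * and taskqueue_swi_giant queues defined below.
 */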
static void
taskqueue_swi_enqueue(void *context)
{
	swi_sched(taskqueue_ih, 0);
}

static void
taskqueue_swi_run(void *dummy)
{
	taskqueue_run(taskqueue_swi);
}

static void
taskqueue_swi_giant_enqueue(void *context)
{
	swi_sched(taskqueue_giant_ih, 0);
}

static void
taskqueue_swi_giant_run(void *dummy)
{
	taskqueue_run(taskqueue_swi_giant);
}

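/*
 * Service loop for a thread-backed task queue: run all pending tasks,
 * then sleep on the queue until taskqueue_thread_enqueue() issues a
 * wakeup.  This function never returns.
 */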
void
taskqueue_thread_loop(void *arg)
{
	struct taskqueue **tqp, *tq;

	tqp = arg;
	tq = *tqp;
	mtx_lock(&tq->tq_mutex);
	for (;;) {
		taskqueue_run(tq);
		msleep(tq, &tq->tq_mutex, PWAIT, "-", 0);
	}
}

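/*
 * Enqueue hook for thread-backed queues: wake the service thread
 * sleeping in taskqueue_thread_loop().  Called with the queue mutex
 * held.
 */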
void
taskqueue_thread_enqueue(void *context)
{
	struct taskqueue **tqp, *tq;

	tqp = context;
	tq = *tqp;

	mtx_assert(&tq->tq_mutex, MA_OWNED);
	wakeup(tq);
}

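/*
 * The standard system queues: taskqueue_swi runs tasks from an MPSAFE
 * software interrupt, taskqueue_swi_giant runs them with Giant held,
 * and taskqueue_thread runs them from a dedicated kernel thread.
 */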
TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0,
		 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
		     INTR_MPSAFE, &taskqueue_ih));

TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0,
		 swi_add(NULL, "Giant task queue", taskqueue_swi_giant_run,
		     NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));

TASKQUEUE_DEFINE_THREAD(thread);

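/*
 * Variant of taskqueue_enqueue() for queues protected by a spin mutex
 * (the "fast" task queue below); the queueing logic is otherwise
 * identical.
 */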
int
taskqueue_enqueue_fast(struct taskqueue *queue, struct task *task)
{
	struct task *ins;
	struct task *prev;

	mtx_lock_spin(&queue->tq_mutex);

	/*
	 * Count multiple enqueues.
	 */
	if (task->ta_pending) {
		task->ta_pending++;
		mtx_unlock_spin(&queue->tq_mutex);
		return 0;
	}

	/*
	 * Optimise the case when all tasks have the same priority.
	 */
	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
	if (!prev || prev->ta_priority >= task->ta_priority) {
		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
	} else {
		prev = NULL;
		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
			if (ins->ta_priority < task->ta_priority)
				break;

		if (prev)
			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
		else
			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
	}

	task->ta_pending = 1;
	if (queue->tq_enqueue)
		queue->tq_enqueue(queue->tq_context);

	mtx_unlock_spin(&queue->tq_mutex);

	return 0;
}

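/*
 * Spin-mutex variant of taskqueue_run(): drain the fast task queue,
 * dropping the spin mutex around each task function call.
 */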
static void
taskqueue_run_fast(struct taskqueue *queue)
{
	struct task *task;
	int pending;

	mtx_lock_spin(&queue->tq_mutex);
	while (STAILQ_FIRST(&queue->tq_queue)) {
		/*
		 * Carefully remove the first task from the queue and
		 * zero its pending count.
		 */
		task = STAILQ_FIRST(&queue->tq_queue);
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		pending = task->ta_pending;
		task->ta_pending = 0;
		mtx_unlock_spin(&queue->tq_mutex);

		task->ta_func(task->ta_context, pending);

		mtx_lock_spin(&queue->tq_mutex);
	}
	mtx_unlock_spin(&queue->tq_mutex);
}

struct taskqueue *taskqueue_fast;
static void	*taskqueue_fast_ih;

static void
taskqueue_fast_schedule(void *context)
{
	swi_sched(taskqueue_fast_ih, 0);
}

static void
taskqueue_fast_run(void *dummy)
{
	taskqueue_run_fast(taskqueue_fast);
}

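/*
 * Hand-rolled equivalent of TASKQUEUE_DEFINE for the fast task queue;
 * taskqueue_create() cannot be used here because the fast queue's
 * mutex must be initialised as MTX_SPIN rather than MTX_DEF.
 */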
static void
taskqueue_define_fast(void *arg)
{

	taskqueue_fast = malloc(sizeof(struct taskqueue), M_TASKQUEUE,
	    M_NOWAIT | M_ZERO);
	if (!taskqueue_fast) {
		printf("%s: Unable to allocate fast task queue!\n", __func__);
		return;
	}

	STAILQ_INIT(&taskqueue_fast->tq_queue);
	taskqueue_fast->tq_name = "fast";
	taskqueue_fast->tq_enqueue = taskqueue_fast_schedule;
	mtx_init(&taskqueue_fast->tq_mutex, "taskqueue_fast", NULL, MTX_SPIN);

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_INSERT_TAIL(&taskqueue_queues, taskqueue_fast, tq_link);
	mtx_unlock(&taskqueue_queues_mutex);

	swi_add(NULL, "Fast task queue", taskqueue_fast_run,
		NULL, SWI_TQ_FAST, 0, &taskqueue_fast_ih);
}
SYSINIT(taskqueue_fast, SI_SUB_CONFIGURE, SI_ORDER_SECOND,
    taskqueue_define_fast, NULL);