/*-
 * Copyright (c) 2000 Doug Rabson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 126027 2004-02-19 22:03:52Z jhb $");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/bus.h>
#include <sys/interrupt.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/taskqueue.h>
#include <sys/unistd.h>

static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
static void	*taskqueue_giant_ih;
static void	*taskqueue_ih;
static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues;
static struct mtx taskqueue_queues_mutex;
static struct proc *taskqueue_thread_proc;

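/*
 * A taskqueue is a named list of pending tasks plus the hook used to
 * notify its consumer:
 *
 *   tq_link      linkage on the global taskqueue_queues list
 *   tq_queue     tasks waiting to run, ordered by descending ta_priority
 *   tq_name      name supplied at creation time (used by taskqueue_find())
 *   tq_enqueue   callback invoked whenever a new task is queued
 *   tq_context   opaque argument passed to tq_enqueue
 *   tq_draining  set while the queue is being torn down in taskqueue_free()
 *   tq_mutex     protects the task list and the fields above
 */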
struct taskqueue {
	STAILQ_ENTRY(taskqueue)	tq_link;
	STAILQ_HEAD(, task)	tq_queue;
	const char		*tq_name;
	taskqueue_enqueue_fn	tq_enqueue;
	void			*tq_context;
	int			tq_draining;
	struct mtx		tq_mutex;
};

static void	init_taskqueue_list(void *data);

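/*
 * Set up the global list of taskqueues and the mutex that protects it.
 * This runs from the SYSINIT below at SI_SUB_INTRINSIC, before the queues
 * defined later in this file (and by drivers) are created and registered.
 */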
static void
init_taskqueue_list(void *data __unused)
{

	mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF);
	STAILQ_INIT(&taskqueue_queues);
}
SYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list,
    NULL);

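/*
 * Create a new taskqueue.  The queue is allocated with the supplied
 * malloc(9) flags, remembers the caller's enqueue hook and context, and is
 * linked onto the global list so taskqueue_find() can locate it by name.
 * Returns NULL if the allocation fails.
 */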
struct taskqueue *
taskqueue_create(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context)
{
	struct taskqueue *queue;

	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
	if (!queue)
		return 0;

	STAILQ_INIT(&queue->tq_queue);
	queue->tq_name = name;
	queue->tq_enqueue = enqueue;
	queue->tq_context = context;
	queue->tq_draining = 0;
	mtx_init(&queue->tq_mutex, "taskqueue", NULL, MTX_DEF);

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link);
	mtx_unlock(&taskqueue_queues_mutex);

	return queue;
}

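/*
 * Tear down a taskqueue.  The queue is marked as draining so that no new
 * tasks may be enqueued, any tasks still pending are run to completion,
 * and the queue is then unlinked from the global list and freed.
 */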
void
taskqueue_free(struct taskqueue *queue)
{

	mtx_lock(&queue->tq_mutex);
	KASSERT(queue->tq_draining == 0, ("free'ing a draining taskqueue"));
	queue->tq_draining = 1;
	mtx_unlock(&queue->tq_mutex);

	taskqueue_run(queue);

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link);
	mtx_unlock(&taskqueue_queues_mutex);

	mtx_destroy(&queue->tq_mutex);
	free(queue, M_TASKQUEUE);
}

/*
 * Look up a taskqueue by name.  On success the queue is returned with its
 * tq_mutex held; the caller is responsible for dropping it.  Returns NULL
 * if no queue with that name is registered.
 */
struct taskqueue *
taskqueue_find(const char *name)
{
	struct taskqueue *queue;

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) {
		mtx_lock(&queue->tq_mutex);
		if (strcmp(queue->tq_name, name) == 0) {
			mtx_unlock(&taskqueue_queues_mutex);
			return queue;
		}
		mtx_unlock(&queue->tq_mutex);
	}
	mtx_unlock(&taskqueue_queues_mutex);
	return NULL;
}

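/*
 * Queue a task for execution.  If the task is already pending, only its
 * ta_pending count is bumped; otherwise it is inserted in priority order
 * (higher ta_priority values run first, equal priorities run FIFO) and the
 * queue's enqueue hook is called to kick the consumer.  Returns EPIPE if
 * the queue is being freed.
 *
 * Illustrative use from a driver (hypothetical handler and softc names,
 * shown only as a sketch):
 *
 *	TASK_INIT(&sc->xx_task, 0, xx_task_handler, sc);
 *	taskqueue_enqueue(taskqueue_swi, &sc->xx_task);
 */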
int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
	struct task *ins;
	struct task *prev;

	mtx_lock(&queue->tq_mutex);

	/*
	 * Don't allow new tasks on a queue which is being freed.
	 */
	if (queue->tq_draining) {
		mtx_unlock(&queue->tq_mutex);
		return EPIPE;
	}

	/*
	 * Count multiple enqueues.
	 */
	if (task->ta_pending) {
		task->ta_pending++;
		mtx_unlock(&queue->tq_mutex);
		return 0;
	}

	/*
	 * Optimise the case when all tasks have the same priority.
	 */
	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
	if (!prev || prev->ta_priority >= task->ta_priority) {
		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
	} else {
		prev = 0;
		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
			if (ins->ta_priority < task->ta_priority)
				break;

		if (prev)
			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
		else
			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
	}

	task->ta_pending = 1;
	if (queue->tq_enqueue)
		queue->tq_enqueue(queue->tq_context);

	mtx_unlock(&queue->tq_mutex);

	return 0;
}

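/*
 * Run every task currently on the queue.  Each task is removed and its
 * pending count reset before the handler is called, and tq_mutex is
 * dropped across the call so that handlers may enqueue further work.
 */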
void
taskqueue_run(struct taskqueue *queue)
{
	struct task *task;
	int pending;

	mtx_lock(&queue->tq_mutex);
	while (STAILQ_FIRST(&queue->tq_queue)) {
		/*
		 * Carefully remove the first task from the queue and
		 * zero its pending count.
		 */
		task = STAILQ_FIRST(&queue->tq_queue);
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		pending = task->ta_pending;
		task->ta_pending = 0;
		mtx_unlock(&queue->tq_mutex);

		task->ta_func(task->ta_context, pending);

		mtx_lock(&queue->tq_mutex);
	}
	mtx_unlock(&queue->tq_mutex);
}

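/*
 * Enqueue and run hooks for the software-interrupt driven system queues
 * defined below: the enqueue hooks schedule the matching SWI handler,
 * which in turn drains the queue via taskqueue_run().
 */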
static void
taskqueue_swi_enqueue(void *context)
{
	swi_sched(taskqueue_ih, 0);
}

static void
taskqueue_swi_run(void *dummy)
{
	taskqueue_run(taskqueue_swi);
}

static void
taskqueue_swi_giant_enqueue(void *context)
{
	swi_sched(taskqueue_giant_ih, 0);
}

static void
taskqueue_swi_giant_run(void *dummy)
{
	taskqueue_run(taskqueue_swi_giant);
}

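/*
 * Main loop of the kernel thread backing the "thread" taskqueue: sleep on
 * the queue until a task is enqueued, then drain the queue and go back to
 * sleep.  taskqueue_thread_enqueue() issues the matching wakeup and is
 * always called with tq_mutex held, so wakeups cannot be lost.
 */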
static void
taskqueue_thread_loop(void *arg)
{
	for (;;) {
		mtx_lock(&taskqueue_thread->tq_mutex);
		while (STAILQ_EMPTY(&taskqueue_thread->tq_queue))
			msleep(taskqueue_thread, &taskqueue_thread->tq_mutex,
			    PWAIT, "-", 0);
		mtx_unlock(&taskqueue_thread->tq_mutex);
		taskqueue_run(taskqueue_thread);
	}
}

static void
taskqueue_thread_enqueue(void *context)
{
	mtx_assert(&taskqueue_thread->tq_mutex, MA_OWNED);
	wakeup(taskqueue_thread);
}

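/*
 * The three standard system queues:
 *
 *   taskqueue_swi        run from an MP-safe software interrupt (no Giant)
 *   taskqueue_swi_giant  run from a software interrupt that holds Giant
 *   taskqueue_thread     run from a dedicated kernel thread, so its tasks
 *                        may sleep
 */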
TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0,
		 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
		     INTR_MPSAFE, &taskqueue_ih));

TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0,
		 swi_add(NULL, "Giant task queue", taskqueue_swi_giant_run,
		     NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));

TASKQUEUE_DEFINE(thread, taskqueue_thread_enqueue, 0,
		 kthread_create(taskqueue_thread_loop, NULL,
		 &taskqueue_thread_proc, 0, 0, "taskqueue"));

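/*
 * Spin-lock variant of taskqueue_enqueue() for the "fast" queue below,
 * whose tq_mutex is an MTX_SPIN mutex.  The queueing logic is otherwise
 * identical; only the lock operations differ.
 */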
int
taskqueue_enqueue_fast(struct taskqueue *queue, struct task *task)
{
	struct task *ins;
	struct task *prev;

	mtx_lock_spin(&queue->tq_mutex);

	/*
	 * Don't allow new tasks on a queue which is being freed.
	 */
	if (queue->tq_draining) {
		mtx_unlock_spin(&queue->tq_mutex);
		return EPIPE;
	}

	/*
	 * Count multiple enqueues.
	 */
	if (task->ta_pending) {
		task->ta_pending++;
		mtx_unlock_spin(&queue->tq_mutex);
		return 0;
	}

	/*
	 * Optimise the case when all tasks have the same priority.
	 */
	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
	if (!prev || prev->ta_priority >= task->ta_priority) {
		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
	} else {
		prev = 0;
		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
			if (ins->ta_priority < task->ta_priority)
				break;

		if (prev)
			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
		else
			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
	}

	task->ta_pending = 1;
	if (queue->tq_enqueue)
		queue->tq_enqueue(queue->tq_context);

	mtx_unlock_spin(&queue->tq_mutex);

	return 0;
}

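/*
 * Spin-lock counterpart of taskqueue_run(), used by the software interrupt
 * handler that services the "fast" queue.
 */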
static void
taskqueue_run_fast(struct taskqueue *queue)
{
	struct task *task;
	int pending;

	mtx_lock_spin(&queue->tq_mutex);
	while (STAILQ_FIRST(&queue->tq_queue)) {
		/*
		 * Carefully remove the first task from the queue and
		 * zero its pending count.
		 */
		task = STAILQ_FIRST(&queue->tq_queue);
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		pending = task->ta_pending;
		task->ta_pending = 0;
		mtx_unlock_spin(&queue->tq_mutex);

		task->ta_func(task->ta_context, pending);

		mtx_lock_spin(&queue->tq_mutex);
	}
	mtx_unlock_spin(&queue->tq_mutex);
}

struct taskqueue *taskqueue_fast;
static void	*taskqueue_fast_ih;

static void
taskqueue_fast_schedule(void *context)
{
	swi_sched(taskqueue_fast_ih, 0);
}

static void
taskqueue_fast_run(void *dummy)
{
	taskqueue_run_fast(taskqueue_fast);
}

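/*
 * Build the "fast" taskqueue by hand rather than with TASKQUEUE_DEFINE():
 * it needs an MTX_SPIN mutex, which taskqueue_create() does not provide,
 * so that taskqueue_enqueue_fast() can be used from contexts where a
 * default (sleep) mutex may not be acquired.
 */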
static void
taskqueue_define_fast(void *arg)
{
	taskqueue_fast = malloc(sizeof(struct taskqueue),
		M_TASKQUEUE, M_NOWAIT | M_ZERO);
	if (!taskqueue_fast) {
		printf("%s: Unable to allocate fast task queue!\n", __func__);
		return;
	}

	STAILQ_INIT(&taskqueue_fast->tq_queue);
	taskqueue_fast->tq_name = "fast";
	taskqueue_fast->tq_enqueue = taskqueue_fast_schedule;
	mtx_init(&taskqueue_fast->tq_mutex, "taskqueue_fast", NULL, MTX_SPIN);

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_INSERT_TAIL(&taskqueue_queues, taskqueue_fast, tq_link);
	mtx_unlock(&taskqueue_queues_mutex);

	swi_add(NULL, "Fast task queue", taskqueue_fast_run,
		NULL, SWI_TQ_FAST, 0, &taskqueue_fast_ih);
}
SYSINIT(taskqueue_fast, SI_SUB_CONFIGURE, SI_ORDER_SECOND,
	taskqueue_define_fast, NULL);
