subr_taskqueue.c revision 122436
1/*-
2 * Copyright (c) 2000 Doug Rabson
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 */
26
27#include <sys/cdefs.h>
28__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 122436 2003-11-10 20:39:44Z alfred $");
29
30#include <sys/param.h>
31#include <sys/systm.h>
32#include <sys/bus.h>
33#include <sys/interrupt.h>
34#include <sys/kernel.h>
35#include <sys/lock.h>
36#include <sys/malloc.h>
37#include <sys/mutex.h>
38#include <sys/taskqueue.h>
39#include <sys/kthread.h>
40#include <sys/unistd.h>
41
42static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
43
44static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues;
45
46static void	*taskqueue_ih;
47static void	*taskqueue_giant_ih;
48static struct mtx taskqueue_queues_mutex;
49static struct proc *taskqueue_thread_proc;
50
51struct taskqueue {
52	STAILQ_ENTRY(taskqueue)	tq_link;
53	STAILQ_HEAD(, task)	tq_queue;
54	const char		*tq_name;
55	taskqueue_enqueue_fn	tq_enqueue;
56	void			*tq_context;
57	int			tq_draining;
58	struct mtx		tq_mutex;
59};
60
61static void	init_taskqueue_list(void *data);
62
63static void
64init_taskqueue_list(void *data __unused)
65{
66
67	mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF);
68	STAILQ_INIT(&taskqueue_queues);
69}
70SYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list,
71    NULL);
72
73struct taskqueue *
74taskqueue_create(const char *name, int mflags,
75		 taskqueue_enqueue_fn enqueue, void *context)
76{
77	struct taskqueue *queue;
78
79	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
80	if (!queue)
81		return 0;
82
83	STAILQ_INIT(&queue->tq_queue);
84	queue->tq_name = name;
85	queue->tq_enqueue = enqueue;
86	queue->tq_context = context;
87	queue->tq_draining = 0;
88	mtx_init(&queue->tq_mutex, "taskqueue", NULL, MTX_DEF);
89
90	mtx_lock(&taskqueue_queues_mutex);
91	STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link);
92	mtx_unlock(&taskqueue_queues_mutex);
93
94	return queue;
95}
96
97void
98taskqueue_free(struct taskqueue *queue)
99{
100
101	mtx_lock(&queue->tq_mutex);
102	KASSERT(queue->tq_draining == 0, ("free'ing a draining taskqueue"));
103	queue->tq_draining = 1;
104	mtx_unlock(&queue->tq_mutex);
105
106	taskqueue_run(queue);
107
108	mtx_lock(&taskqueue_queues_mutex);
109	STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link);
110	mtx_unlock(&taskqueue_queues_mutex);
111
112	mtx_destroy(&queue->tq_mutex);
113	free(queue, M_TASKQUEUE);
114}
115
116/*
117 * Returns with the taskqueue locked.
118 */
119struct taskqueue *
120taskqueue_find(const char *name)
121{
122	struct taskqueue *queue;
123
124	mtx_lock(&taskqueue_queues_mutex);
125	STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) {
126		mtx_lock(&queue->tq_mutex);
127		if (!strcmp(queue->tq_name, name)) {
128			mtx_unlock(&taskqueue_queues_mutex);
129			return queue;
130		}
131		mtx_unlock(&queue->tq_mutex);
132	}
133	mtx_unlock(&taskqueue_queues_mutex);
134	return 0;
135}
136
137int
138taskqueue_enqueue(struct taskqueue *queue, struct task *task)
139{
140	struct task *ins;
141	struct task *prev;
142
143	mtx_lock(&queue->tq_mutex);
144
145	/*
146	 * Don't allow new tasks on a queue which is being freed.
147	 */
148	if (queue->tq_draining) {
149		mtx_unlock(&queue->tq_mutex);
150		return EPIPE;
151	}
152
153	/*
154	 * Count multiple enqueues.
155	 */
156	if (task->ta_pending) {
157		task->ta_pending++;
158		mtx_unlock(&queue->tq_mutex);
159		return 0;
160	}
161
162	/*
163	 * Optimise the case when all tasks have the same priority.
164	 */
165	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
166	if (!prev || prev->ta_priority >= task->ta_priority) {
167		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
168	} else {
169		prev = 0;
170		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
171		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
172			if (ins->ta_priority < task->ta_priority)
173				break;
174
175		if (prev)
176			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
177		else
178			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
179	}
180
181	task->ta_pending = 1;
182	if (queue->tq_enqueue)
183		queue->tq_enqueue(queue->tq_context);
184
185	mtx_unlock(&queue->tq_mutex);
186
187	return 0;
188}
189
190void
191taskqueue_run(struct taskqueue *queue)
192{
193	struct task *task;
194	int pending;
195
196	mtx_lock(&queue->tq_mutex);
197	while (STAILQ_FIRST(&queue->tq_queue)) {
198		/*
199		 * Carefully remove the first task from the queue and
200		 * zero its pending count.
201		 */
202		task = STAILQ_FIRST(&queue->tq_queue);
203		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
204		pending = task->ta_pending;
205		task->ta_pending = 0;
206		mtx_unlock(&queue->tq_mutex);
207
208		task->ta_func(task->ta_context, pending);
209
210		mtx_lock(&queue->tq_mutex);
211	}
212	mtx_unlock(&queue->tq_mutex);
213}
214
215static void
216taskqueue_swi_enqueue(void *context)
217{
218	swi_sched(taskqueue_ih, 0);
219}
220
221static void
222taskqueue_swi_run(void *dummy)
223{
224	taskqueue_run(taskqueue_swi);
225}
226
227static void
228taskqueue_swi_giant_enqueue(void *context)
229{
230	swi_sched(taskqueue_giant_ih, 0);
231}
232
233static void
234taskqueue_swi_giant_run(void *dummy)
235{
236	taskqueue_run(taskqueue_swi_giant);
237}
238
239static void
240taskqueue_kthread(void *arg)
241{
242	struct mtx kthread_mutex;
243
244	bzero(&kthread_mutex, sizeof(kthread_mutex));
245
246	mtx_init(&kthread_mutex, "taskqueue kthread", NULL, MTX_DEF);
247
248	mtx_lock(&kthread_mutex);
249
250	for (;;) {
251		mtx_unlock(&kthread_mutex);
252		taskqueue_run(taskqueue_thread);
253		mtx_lock(&kthread_mutex);
254		msleep(&taskqueue_thread, &kthread_mutex, PWAIT, "tqthr", 0);
255	}
256}
257
258static void
259taskqueue_thread_enqueue(void *context)
260{
261	wakeup(&taskqueue_thread);
262}
263
264TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0,
265		 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
266		     INTR_MPSAFE, &taskqueue_ih));
267
268TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0,
269		 swi_add(NULL, "Giant task queue", taskqueue_swi_giant_run,
270		     NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));
271
272TASKQUEUE_DEFINE(thread, taskqueue_thread_enqueue, 0,
273		 kthread_create(taskqueue_kthread, NULL,
274		 &taskqueue_thread_proc, 0, 0, "taskqueue"));
275
276int
277taskqueue_enqueue_fast(struct taskqueue *queue, struct task *task)
278{
279	struct task *ins;
280	struct task *prev;
281
282	mtx_lock_spin(&queue->tq_mutex);
283
284	/*
285	 * Don't allow new tasks on a queue which is being freed.
286	 */
287	if (queue->tq_draining) {
288		mtx_unlock_spin(&queue->tq_mutex);
289		return EPIPE;
290	}
291
292	/*
293	 * Count multiple enqueues.
294	 */
295	if (task->ta_pending) {
296		task->ta_pending++;
297		mtx_unlock_spin(&queue->tq_mutex);
298		return 0;
299	}
300
301	/*
302	 * Optimise the case when all tasks have the same priority.
303	 */
304	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
305	if (!prev || prev->ta_priority >= task->ta_priority) {
306		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
307	} else {
308		prev = 0;
309		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
310		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
311			if (ins->ta_priority < task->ta_priority)
312				break;
313
314		if (prev)
315			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
316		else
317			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
318	}
319
320	task->ta_pending = 1;
321	if (queue->tq_enqueue)
322		queue->tq_enqueue(queue->tq_context);
323
324	mtx_unlock_spin(&queue->tq_mutex);
325
326	return 0;
327}
328
329static void
330taskqueue_run_fast(struct taskqueue *queue)
331{
332	struct task *task;
333	int pending;
334
335	mtx_lock_spin(&queue->tq_mutex);
336	while (STAILQ_FIRST(&queue->tq_queue)) {
337		/*
338		 * Carefully remove the first task from the queue and
339		 * zero its pending count.
340		 */
341		task = STAILQ_FIRST(&queue->tq_queue);
342		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
343		pending = task->ta_pending;
344		task->ta_pending = 0;
345		mtx_unlock_spin(&queue->tq_mutex);
346
347		task->ta_func(task->ta_context, pending);
348
349		mtx_lock_spin(&queue->tq_mutex);
350	}
351	mtx_unlock_spin(&queue->tq_mutex);
352}
353
354struct taskqueue *taskqueue_fast;
355static void	*taskqueue_fast_ih;
356
357static void
358taskqueue_fast_schedule(void *context)
359{
360	swi_sched(taskqueue_fast_ih, 0);
361}
362
363static void
364taskqueue_fast_run(void *dummy)
365{
366	taskqueue_run_fast(taskqueue_fast);
367}
368
369static void
370taskqueue_define_fast(void *arg)
371{
372	taskqueue_fast = malloc(sizeof(struct taskqueue),
373		M_TASKQUEUE, M_NOWAIT | M_ZERO);
374	if (!taskqueue_fast) {
375		printf("%s: Unable to allocate fast task queue!\n", __func__);
376		return;
377	}
378
379	STAILQ_INIT(&taskqueue_fast->tq_queue);
380	taskqueue_fast->tq_name = "fast";
381	taskqueue_fast->tq_enqueue = taskqueue_fast_schedule;
382	mtx_init(&taskqueue_fast->tq_mutex, "taskqueue_fast", NULL, MTX_SPIN);
383
384	mtx_lock(&taskqueue_queues_mutex);
385	STAILQ_INSERT_TAIL(&taskqueue_queues, taskqueue_fast, tq_link);
386	mtx_unlock(&taskqueue_queues_mutex);
387
388	swi_add(NULL, "Fast task queue", taskqueue_fast_run,
389		NULL, SWI_TQ_FAST, 0, &taskqueue_fast_ih);
390}
391SYSINIT(taskqueue_fast, SI_SUB_CONFIGURE, SI_ORDER_SECOND,
392	taskqueue_define_fast, NULL);
393