/* subr_taskqueue.c, FreeBSD revision 151656 */
/*-
 * Copyright (c) 2000 Doug Rabson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
26254721Semaste
27254721Semaste#include <sys/cdefs.h>
28254721Semaste__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 151656 2005-10-25 19:29:02Z jhb $");
29254721Semaste
30254721Semaste#include <sys/param.h>
31254721Semaste#include <sys/systm.h>
32254721Semaste#include <sys/bus.h>
33254721Semaste#include <sys/interrupt.h>
34254721Semaste#include <sys/kernel.h>
35254721Semaste#include <sys/kthread.h>
36254721Semaste#include <sys/lock.h>
37254721Semaste#include <sys/malloc.h>
38254721Semaste#include <sys/mutex.h>
39254721Semaste#include <sys/proc.h>
40254721Semaste#include <sys/taskqueue.h>
41254721Semaste#include <sys/unistd.h>
42254721Semaste
static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
static void	*taskqueue_giant_ih;	/* swi cookie for the Giant swi queue */
static void	*taskqueue_ih;		/* swi cookie for the MPSAFE swi queue */
static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues;	/* all queues */
static struct mtx taskqueue_queues_mutex;	/* protects taskqueue_queues */
48254721Semaste
/*
 * A task queue: a priority-ordered list of pending tasks, protected by
 * tq_mutex.  Every queue is also linked onto the global taskqueue_queues
 * list so it can be located by name via taskqueue_find().
 */
struct taskqueue {
	STAILQ_ENTRY(taskqueue)	tq_link;	/* entry on global queue list */
	STAILQ_HEAD(, task)	tq_queue;	/* pending tasks, priority order */
	const char		*tq_name;	/* name; pointer is not copied */
	taskqueue_enqueue_fn	tq_enqueue;	/* hook fired after queueing a task */
	void			*tq_context;	/* argument passed to tq_enqueue */
	struct task		*tq_running;	/* task currently executing, if any */
	struct mtx		tq_mutex;	/* protects queue contents */
	struct proc		**tq_pproc;	/* service thread proc, or NULL */
};
59254721Semaste
60254721Semastestatic void	init_taskqueue_list(void *data);
61254721Semaste
62254721Semastestatic void
63254721Semasteinit_taskqueue_list(void *data __unused)
64254721Semaste{
65254721Semaste
66254721Semaste	mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF);
67254721Semaste	STAILQ_INIT(&taskqueue_queues);
68254721Semaste}
69254721SemasteSYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list,
70254721Semaste    NULL);
71254721Semaste
72254721Semastestruct taskqueue *
73254721Semastetaskqueue_create(const char *name, int mflags,
74254721Semaste		 taskqueue_enqueue_fn enqueue, void *context,
75254721Semaste		 struct proc **pp)
76254721Semaste{
77254721Semaste	struct taskqueue *queue;
78254721Semaste
79254721Semaste	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
80254721Semaste	if (!queue)
81254721Semaste		return 0;
82254721Semaste
83254721Semaste	STAILQ_INIT(&queue->tq_queue);
84254721Semaste	queue->tq_name = name;
85254721Semaste	queue->tq_enqueue = enqueue;
86269024Semaste	queue->tq_context = context;
87269024Semaste	queue->tq_pproc = pp;
88269024Semaste	mtx_init(&queue->tq_mutex, "taskqueue", NULL, MTX_DEF);
89269024Semaste
90254721Semaste	mtx_lock(&taskqueue_queues_mutex);
91254721Semaste	STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link);
92254721Semaste	mtx_unlock(&taskqueue_queues_mutex);
93254721Semaste
94254721Semaste	return queue;
95254721Semaste}
96254721Semaste
/*
 * Signal a taskqueue thread to terminate.
 *
 * Called with tq->tq_mutex held; temporarily drops it so the service
 * thread can run to completion.  Clearing *pp is what makes the loop
 * condition in taskqueue_thread_loop() exit.
 */
static void
taskqueue_terminate(struct proc **pp, struct taskqueue *tq)
{
	struct proc *p;

	p = *pp;
	*pp = NULL;	/* request termination: thread loop tests *tq_pproc */
	if (p) {
		wakeup_one(tq);		   /* rouse the sleeping thread */
		PROC_LOCK(p);		   /* NB: insure we don't miss wakeup */
		mtx_unlock(&tq->tq_mutex); /* let taskqueue thread run */
		msleep(p, &p->p_mtx, PWAIT, "taskqueue_destroy", 0);
		PROC_UNLOCK(p);
		mtx_lock(&tq->tq_mutex);
	}
}
116254721Semaste
/*
 * Destroy a queue made by taskqueue_create(): unlink it from the global
 * list, flush any still-pending tasks, shut down the service thread (if
 * one exists), and release the memory.  taskqueue_run() and
 * taskqueue_terminate() are both entered with tq_mutex held.
 */
void
taskqueue_free(struct taskqueue *queue)
{

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link);
	mtx_unlock(&taskqueue_queues_mutex);

	mtx_lock(&queue->tq_mutex);
	taskqueue_run(queue);		/* drain any remaining tasks */
	taskqueue_terminate(queue->tq_pproc, queue);
	mtx_destroy(&queue->tq_mutex);
	free(queue, M_TASKQUEUE);
}
131254721Semaste
132254721Semaste/*
133254721Semaste * Returns with the taskqueue locked.
134254721Semaste */
135254721Semastestruct taskqueue *
136254721Semastetaskqueue_find(const char *name)
137254721Semaste{
138254721Semaste	struct taskqueue *queue;
139254721Semaste
140254721Semaste	mtx_lock(&taskqueue_queues_mutex);
141254721Semaste	STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) {
142254721Semaste		if (strcmp(queue->tq_name, name) == 0) {
143254721Semaste			mtx_lock(&queue->tq_mutex);
144254721Semaste			mtx_unlock(&taskqueue_queues_mutex);
145254721Semaste			return queue;
146254721Semaste		}
147254721Semaste	}
148254721Semaste	mtx_unlock(&taskqueue_queues_mutex);
149254721Semaste	return NULL;
150254721Semaste}
151254721Semaste
152254721Semasteint
153254721Semastetaskqueue_enqueue(struct taskqueue *queue, struct task *task)
154254721Semaste{
155254721Semaste	struct task *ins;
156254721Semaste	struct task *prev;
157254721Semaste
158254721Semaste	mtx_lock(&queue->tq_mutex);
159254721Semaste
160254721Semaste	/*
161254721Semaste	 * Count multiple enqueues.
162254721Semaste	 */
163254721Semaste	if (task->ta_pending) {
164254721Semaste		task->ta_pending++;
165254721Semaste		mtx_unlock(&queue->tq_mutex);
166254721Semaste		return 0;
167254721Semaste	}
168254721Semaste
169254721Semaste	/*
170254721Semaste	 * Optimise the case when all tasks have the same priority.
171254721Semaste	 */
172254721Semaste	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
173254721Semaste	if (!prev || prev->ta_priority >= task->ta_priority) {
174254721Semaste		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
175254721Semaste	} else {
176254721Semaste		prev = 0;
177254721Semaste		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
178254721Semaste		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
179254721Semaste			if (ins->ta_priority < task->ta_priority)
180254721Semaste				break;
181254721Semaste
182254721Semaste		if (prev)
183254721Semaste			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
184254721Semaste		else
185254721Semaste			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
186254721Semaste	}
187254721Semaste
188254721Semaste	task->ta_pending = 1;
189254721Semaste	queue->tq_enqueue(queue->tq_context);
190254721Semaste
191254721Semaste	mtx_unlock(&queue->tq_mutex);
192254721Semaste
193254721Semaste	return 0;
194254721Semaste}
195254721Semaste
/*
 * Execute every task currently queued, in priority order.
 *
 * May be entered with tq_mutex held or not; the entry lock state is
 * restored on return.  The mutex is dropped around each task callback,
 * so tasks may sleep and new tasks may be enqueued concurrently.
 * tq_running and the wakeup(task) after each callback support
 * taskqueue_drain().
 */
void
taskqueue_run(struct taskqueue *queue)
{
	struct task *task;
	int owned, pending;

	owned = mtx_owned(&queue->tq_mutex);
	if (!owned)
		mtx_lock(&queue->tq_mutex);
	while (STAILQ_FIRST(&queue->tq_queue)) {
		/*
		 * Carefully remove the first task from the queue and
		 * zero its pending count.
		 */
		task = STAILQ_FIRST(&queue->tq_queue);
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		pending = task->ta_pending;
		task->ta_pending = 0;	/* task may be re-enqueued from here on */
		queue->tq_running = task;	/* seen by taskqueue_drain() */
		mtx_unlock(&queue->tq_mutex);

		task->ta_func(task->ta_context, pending);

		mtx_lock(&queue->tq_mutex);
		queue->tq_running = NULL;
		wakeup(task);	/* unblock any taskqueue_drain() sleepers */
	}

	/*
	 * For compatibility, unlock on return if the queue was not locked
	 * on entry, although this opens a race window.
	 */
	if (!owned)
		mtx_unlock(&queue->tq_mutex);
}
231254721Semaste
232254721Semastevoid
233254721Semastetaskqueue_drain(struct taskqueue *queue, struct task *task)
234254721Semaste{
235254721Semaste	WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, "taskqueue_drain");
236254721Semaste
237254721Semaste	mtx_lock(&queue->tq_mutex);
238254721Semaste	while (task->ta_pending != 0 || task == queue->tq_running)
239254721Semaste		msleep(task, &queue->tq_mutex, PWAIT, "-", 0);
240254721Semaste	mtx_unlock(&queue->tq_mutex);
241254721Semaste}
242254721Semaste
/* Enqueue hook for taskqueue_swi: schedule its software interrupt. */
static void
taskqueue_swi_enqueue(void *context)
{
	swi_sched(taskqueue_ih, 0);
}
248254721Semaste
/* Software interrupt handler: run all tasks queued on taskqueue_swi. */
static void
taskqueue_swi_run(void *dummy)
{
	taskqueue_run(taskqueue_swi);
}
254254721Semaste
/* Enqueue hook for taskqueue_swi_giant: schedule its software interrupt. */
static void
taskqueue_swi_giant_enqueue(void *context)
{
	swi_sched(taskqueue_giant_ih, 0);
}
260254721Semaste
/* Software interrupt handler: run all tasks queued on taskqueue_swi_giant. */
static void
taskqueue_swi_giant_run(void *dummy)
{
	taskqueue_run(taskqueue_swi_giant);
}
266254721Semaste
/*
 * Main loop for a taskqueue service thread (see TASKQUEUE_DEFINE_THREAD).
 * arg points at the variable holding the queue pointer.  Repeatedly runs
 * pending tasks and then sleeps on the queue until
 * taskqueue_thread_enqueue() posts a wakeup.  Exits once *tq_pproc has
 * been cleared by taskqueue_terminate().
 */
void
taskqueue_thread_loop(void *arg)
{
	struct taskqueue **tqp, *tq;

	tqp = arg;
	tq = *tqp;
	mtx_lock(&tq->tq_mutex);
	do {
		taskqueue_run(tq);
		msleep(tq, &tq->tq_mutex, PWAIT, "-", 0);	/* wait for work */
	} while (*tq->tq_pproc != NULL);

	/* rendezvous with thread that asked us to terminate */
	wakeup_one(tq);
	mtx_unlock(&tq->tq_mutex);
	kthread_exit(0);
}
285254721Semaste
/*
 * Enqueue hook for thread-serviced queues: wake the service thread.
 * context points at the variable holding the queue pointer.  Runs with
 * tq_mutex held (asserted), called from taskqueue_enqueue().
 */
void
taskqueue_thread_enqueue(void *context)
{
	struct taskqueue **tqp, *tq;

	tqp = context;
	tq = *tqp;

	mtx_assert(&tq->tq_mutex, MA_OWNED);
	wakeup_one(tq);
}
297254721Semaste
/* taskqueue_swi: serviced by an INTR_MPSAFE software interrupt. */
TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0,
		 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
		     INTR_MPSAFE, &taskqueue_ih));

/* taskqueue_swi_giant: serviced by a swi registered without INTR_MPSAFE. */
TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0,
		 swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
		     NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));

/* taskqueue_thread: serviced by a dedicated kernel thread. */
TASKQUEUE_DEFINE_THREAD(thread);
307254721Semaste
308254721Semasteint
309254721Semastetaskqueue_enqueue_fast(struct taskqueue *queue, struct task *task)
310254721Semaste{
311254721Semaste	struct task *ins;
312254721Semaste	struct task *prev;
313254721Semaste
314254721Semaste	mtx_lock_spin(&queue->tq_mutex);
315254721Semaste
316254721Semaste	/*
317254721Semaste	 * Count multiple enqueues.
318254721Semaste	 */
319254721Semaste	if (task->ta_pending) {
320254721Semaste		task->ta_pending++;
321254721Semaste		mtx_unlock_spin(&queue->tq_mutex);
322254721Semaste		return 0;
323254721Semaste	}
324254721Semaste
325254721Semaste	/*
326254721Semaste	 * Optimise the case when all tasks have the same priority.
327254721Semaste	 */
328254721Semaste	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
329254721Semaste	if (!prev || prev->ta_priority >= task->ta_priority) {
330254721Semaste		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
331254721Semaste	} else {
332254721Semaste		prev = 0;
333254721Semaste		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
334254721Semaste		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
335254721Semaste			if (ins->ta_priority < task->ta_priority)
336254721Semaste				break;
337254721Semaste
338254721Semaste		if (prev)
339254721Semaste			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
340254721Semaste		else
341254721Semaste			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
342254721Semaste	}
343254721Semaste
344254721Semaste	task->ta_pending = 1;
345254721Semaste	queue->tq_enqueue(queue->tq_context);
346254721Semaste
347254721Semaste	mtx_unlock_spin(&queue->tq_mutex);
348254721Semaste
349254721Semaste	return 0;
350254721Semaste}
351254721Semaste
352254721Semastestatic void
353254721Semastetaskqueue_run_fast(struct taskqueue *queue)
354254721Semaste{
355254721Semaste	struct task *task;
356254721Semaste	int pending;
357254721Semaste
358254721Semaste	mtx_lock_spin(&queue->tq_mutex);
359254721Semaste	while (STAILQ_FIRST(&queue->tq_queue)) {
360254721Semaste		/*
361254721Semaste		 * Carefully remove the first task from the queue and
362254721Semaste		 * zero its pending count.
363254721Semaste		 */
364254721Semaste		task = STAILQ_FIRST(&queue->tq_queue);
365254721Semaste		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
366254721Semaste		pending = task->ta_pending;
367254721Semaste		task->ta_pending = 0;
368254721Semaste		mtx_unlock_spin(&queue->tq_mutex);
369254721Semaste
370254721Semaste		task->ta_func(task->ta_context, pending);
371254721Semaste
372254721Semaste		mtx_lock_spin(&queue->tq_mutex);
373254721Semaste	}
374254721Semaste	mtx_unlock_spin(&queue->tq_mutex);
375254721Semaste}
376254721Semaste
/* The spin-lock "fast" queue, built by taskqueue_define_fast() at boot. */
struct taskqueue *taskqueue_fast;
static void	*taskqueue_fast_ih;	/* swi cookie for the fast queue */
379254721Semaste
/* Enqueue hook for taskqueue_fast: schedule its software interrupt. */
static void
taskqueue_fast_schedule(void *context)
{
	swi_sched(taskqueue_fast_ih, 0);
}
385254721Semaste
/* Software interrupt handler: run all tasks queued on taskqueue_fast. */
static void
taskqueue_fast_run(void *dummy)
{
	taskqueue_run_fast(taskqueue_fast);
}
391254721Semaste
/*
 * Boot-time construction of taskqueue_fast.  Built by hand rather than
 * via taskqueue_create(), presumably because its mutex must be MTX_SPIN
 * -- TODO confirm.  tq_context and tq_pproc are left NULL by M_ZERO;
 * taskqueue_fast_schedule() ignores its argument, and this queue has no
 * service thread.
 */
static void
taskqueue_define_fast(void *arg)
{

	taskqueue_fast = malloc(sizeof(struct taskqueue), M_TASKQUEUE,
	    M_NOWAIT | M_ZERO);
	if (!taskqueue_fast) {
		printf("%s: Unable to allocate fast task queue!\n", __func__);
		return;
	}

	STAILQ_INIT(&taskqueue_fast->tq_queue);
	taskqueue_fast->tq_name = "fast";
	taskqueue_fast->tq_enqueue = taskqueue_fast_schedule;
	mtx_init(&taskqueue_fast->tq_mutex, "taskqueue_fast", NULL, MTX_SPIN);

	/* Register on the global list so taskqueue_find("fast") works. */
	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_INSERT_TAIL(&taskqueue_queues, taskqueue_fast, tq_link);
	mtx_unlock(&taskqueue_queues_mutex);

	swi_add(NULL, "Fast taskq", taskqueue_fast_run,
		NULL, SWI_TQ_FAST, 0, &taskqueue_fast_ih);
}
SYSINIT(taskqueue_fast, SI_SUB_CONFIGURE, SI_ORDER_SECOND,
    taskqueue_define_fast, NULL);
417254721Semaste