/* subr_taskqueue.c revision 154333 */
/*-
 * Copyright (c) 2000 Doug Rabson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
26
27#include <sys/cdefs.h>
28__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 154333 2006-01-14 01:55:24Z scottl $");
29
30#include <sys/param.h>
31#include <sys/systm.h>
32#include <sys/bus.h>
33#include <sys/interrupt.h>
34#include <sys/kernel.h>
35#include <sys/kthread.h>
36#include <sys/lock.h>
37#include <sys/malloc.h>
38#include <sys/mutex.h>
39#include <sys/proc.h>
40#include <sys/sched.h>
41#include <sys/taskqueue.h>
42#include <sys/unistd.h>
43#include <machine/stdarg.h>
44
45static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
46static void	*taskqueue_giant_ih;
47static void	*taskqueue_ih;
48static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues;
49static struct mtx taskqueue_queues_mutex;
50
51struct taskqueue {
52	STAILQ_ENTRY(taskqueue)	tq_link;
53	STAILQ_HEAD(, task)	tq_queue;
54	const char		*tq_name;
55	taskqueue_enqueue_fn	tq_enqueue;
56	void			*tq_context;
57	struct task		*tq_running;
58	struct mtx		tq_mutex;
59	struct proc		**tq_pproc;
60	int			tq_pcount;
61	int			tq_spin;
62	int			tq_flags;
63};
64
65#define	TQ_FLAGS_ACTIVE		(1 << 0)
66
67static __inline void
68TQ_LOCK(struct taskqueue *tq)
69{
70	if (tq->tq_spin)
71		mtx_lock_spin(&tq->tq_mutex);
72	else
73		mtx_lock(&tq->tq_mutex);
74}
75
76static __inline void
77TQ_UNLOCK(struct taskqueue *tq)
78{
79	if (tq->tq_spin)
80		mtx_unlock_spin(&tq->tq_mutex);
81	else
82		mtx_unlock(&tq->tq_mutex);
83}
84
85static void	init_taskqueue_list(void *data);
86
87static __inline int
88TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
89    int t)
90{
91	if (tq->tq_spin)
92		return (msleep_spin(p, m, wm, t));
93	return (msleep(p, m, pri, wm, t));
94}
95
96static void
97init_taskqueue_list(void *data __unused)
98{
99
100	mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF);
101	STAILQ_INIT(&taskqueue_queues);
102}
103SYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list,
104    NULL);
105
106static struct taskqueue *
107_taskqueue_create(const char *name, int mflags,
108		 taskqueue_enqueue_fn enqueue, void *context,
109		 int mtxflags, const char *mtxname)
110{
111	struct taskqueue *queue;
112
113	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
114	if (!queue)
115		return 0;
116
117	STAILQ_INIT(&queue->tq_queue);
118	queue->tq_name = name;
119	queue->tq_enqueue = enqueue;
120	queue->tq_context = context;
121	queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
122	queue->tq_flags |= TQ_FLAGS_ACTIVE;
123	mtx_init(&queue->tq_mutex, mtxname, NULL, mtxflags);
124
125	mtx_lock(&taskqueue_queues_mutex);
126	STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link);
127	mtx_unlock(&taskqueue_queues_mutex);
128
129	return queue;
130}
131
132struct taskqueue *
133taskqueue_create(const char *name, int mflags,
134		 taskqueue_enqueue_fn enqueue, void *context)
135{
136	return _taskqueue_create(name, mflags, enqueue, context,
137			MTX_DEF, "taskqueue");
138}
139
140/*
141 * Signal a taskqueue thread to terminate.
142 */
143static void
144taskqueue_terminate(struct proc **pp, struct taskqueue *tq)
145{
146
147	while (tq->tq_pcount > 0) {
148		wakeup(tq);
149		TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
150	}
151}
152
153void
154taskqueue_free(struct taskqueue *queue)
155{
156
157	mtx_lock(&taskqueue_queues_mutex);
158	STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link);
159	mtx_unlock(&taskqueue_queues_mutex);
160
161	TQ_LOCK(queue);
162	queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
163	taskqueue_run(queue);
164	taskqueue_terminate(queue->tq_pproc, queue);
165	mtx_destroy(&queue->tq_mutex);
166	free(queue->tq_pproc, M_TASKQUEUE);
167	free(queue, M_TASKQUEUE);
168}
169
170/*
171 * Returns with the taskqueue locked.
172 */
173struct taskqueue *
174taskqueue_find(const char *name)
175{
176	struct taskqueue *queue;
177
178	mtx_lock(&taskqueue_queues_mutex);
179	STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) {
180		if (strcmp(queue->tq_name, name) == 0) {
181			TQ_LOCK(queue);
182			mtx_unlock(&taskqueue_queues_mutex);
183			return queue;
184		}
185	}
186	mtx_unlock(&taskqueue_queues_mutex);
187	return NULL;
188}
189
190int
191taskqueue_enqueue(struct taskqueue *queue, struct task *task)
192{
193	struct task *ins;
194	struct task *prev;
195
196	TQ_LOCK(queue);
197
198	/*
199	 * Count multiple enqueues.
200	 */
201	if (task->ta_pending) {
202		task->ta_pending++;
203		TQ_UNLOCK(queue);
204		return 0;
205	}
206
207	/*
208	 * Optimise the case when all tasks have the same priority.
209	 */
210	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
211	if (!prev || prev->ta_priority >= task->ta_priority) {
212		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
213	} else {
214		prev = 0;
215		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
216		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
217			if (ins->ta_priority < task->ta_priority)
218				break;
219
220		if (prev)
221			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
222		else
223			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
224	}
225
226	task->ta_pending = 1;
227	queue->tq_enqueue(queue->tq_context);
228
229	TQ_UNLOCK(queue);
230
231	return 0;
232}
233
234void
235taskqueue_run(struct taskqueue *queue)
236{
237	struct task *task;
238	int owned, pending;
239
240	owned = mtx_owned(&queue->tq_mutex);
241	if (!owned)
242		TQ_LOCK(queue);
243	while (STAILQ_FIRST(&queue->tq_queue)) {
244		/*
245		 * Carefully remove the first task from the queue and
246		 * zero its pending count.
247		 */
248		task = STAILQ_FIRST(&queue->tq_queue);
249		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
250		pending = task->ta_pending;
251		task->ta_pending = 0;
252		queue->tq_running = task;
253		TQ_UNLOCK(queue);
254
255		task->ta_func(task->ta_context, pending);
256
257		TQ_LOCK(queue);
258		queue->tq_running = NULL;
259		wakeup(task);
260	}
261
262	/*
263	 * For compatibility, unlock on return if the queue was not locked
264	 * on entry, although this opens a race window.
265	 */
266	if (!owned)
267		TQ_UNLOCK(queue);
268}
269
270void
271taskqueue_drain(struct taskqueue *queue, struct task *task)
272{
273	if (queue->tq_spin) {		/* XXX */
274		mtx_lock_spin(&queue->tq_mutex);
275		while (task->ta_pending != 0 || task == queue->tq_running)
276			msleep_spin(task, &queue->tq_mutex, "-", 0);
277		mtx_unlock_spin(&queue->tq_mutex);
278	} else {
279		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
280
281		mtx_lock(&queue->tq_mutex);
282		while (task->ta_pending != 0 || task == queue->tq_running)
283			msleep(task, &queue->tq_mutex, PWAIT, "-", 0);
284		mtx_unlock(&queue->tq_mutex);
285	}
286}
287
288static void
289taskqueue_swi_enqueue(void *context)
290{
291	swi_sched(taskqueue_ih, 0);
292}
293
294static void
295taskqueue_swi_run(void *dummy)
296{
297	taskqueue_run(taskqueue_swi);
298}
299
300static void
301taskqueue_swi_giant_enqueue(void *context)
302{
303	swi_sched(taskqueue_giant_ih, 0);
304}
305
306static void
307taskqueue_swi_giant_run(void *dummy)
308{
309	taskqueue_run(taskqueue_swi_giant);
310}
311
312int
313taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
314			const char *name, ...)
315{
316	va_list ap;
317	struct taskqueue *tq;
318	char ktname[MAXCOMLEN];
319	int i;
320
321	if (count <= 0)
322		return (EINVAL);
323	tq = *tqp;
324
325	if ((tq->tq_pproc = malloc(sizeof(struct proc *) * count, M_TASKQUEUE,
326	    M_NOWAIT | M_ZERO)) == NULL)
327		return (ENOMEM);
328
329	va_start(ap, name);
330	vsnprintf(ktname, MAXCOMLEN, name, ap);
331	va_end(ap);
332
333	for (i = 0; i < count; i++) {
334		if (count == 1)
335			kthread_create(taskqueue_thread_loop, tqp,
336			    &tq->tq_pproc[i], 0, 0, ktname);
337		else
338			kthread_create(taskqueue_thread_loop, tqp,
339			    &tq->tq_pproc[i], 0, 0, "%s_%d", ktname, i);
340		mtx_lock_spin(&sched_lock);
341		sched_prio(FIRST_THREAD_IN_PROC(tq->tq_pproc[i]), pri);
342		mtx_unlock_spin(&sched_lock);
343		tq->tq_pcount++;
344	}
345
346	return (0);
347}
348
349void
350taskqueue_thread_loop(void *arg)
351{
352	struct taskqueue **tqp, *tq;
353
354	tqp = arg;
355	tq = *tqp;
356	TQ_LOCK(tq);
357	do {
358		taskqueue_run(tq);
359		TQ_SLEEP(tq, tq, &tq->tq_mutex, curthread->td_priority, "-", 0);
360	} while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0);
361
362	/* rendezvous with thread that asked us to terminate */
363	tq->tq_pcount--;
364	wakeup_one(tq->tq_pproc);
365	TQ_UNLOCK(tq);
366	kthread_exit(0);
367}
368
369void
370taskqueue_thread_enqueue(void *context)
371{
372	struct taskqueue **tqp, *tq;
373
374	tqp = context;
375	tq = *tqp;
376
377	mtx_assert(&tq->tq_mutex, MA_OWNED);
378	wakeup_one(tq);
379}
380
381TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0,
382		 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
383		     INTR_MPSAFE, &taskqueue_ih));
384
385TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0,
386		 swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
387		     NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));
388
389TASKQUEUE_DEFINE_THREAD(thread);
390
391struct taskqueue *
392taskqueue_create_fast(const char *name, int mflags,
393		 taskqueue_enqueue_fn enqueue, void *context)
394{
395	return _taskqueue_create(name, mflags, enqueue, context,
396			MTX_SPIN, "fast_taskqueue");
397}
398
/*
 * NB: for backwards compatibility.  taskqueue_enqueue() handles both
 * mutex types itself, so this simply forwards to it.
 */
int
taskqueue_enqueue_fast(struct taskqueue *queue, struct task *task)
{
	int error;

	error = taskqueue_enqueue(queue, task);
	return (error);
}
405
406static void	*taskqueue_fast_ih;
407
408static void
409taskqueue_fast_enqueue(void *context)
410{
411	swi_sched(taskqueue_fast_ih, 0);
412}
413
414static void
415taskqueue_fast_run(void *dummy)
416{
417	taskqueue_run(taskqueue_fast);
418}
419
420TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, 0,
421	swi_add(NULL, "Fast task queue", taskqueue_fast_run, NULL,
422	SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));
423