subr_taskqueue.c revision 170307
1/*-
2 * Copyright (c) 2000 Doug Rabson
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 */
26
27#include <sys/cdefs.h>
28__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 170307 2007-06-05 00:00:57Z jeff $");
29
30#include <sys/param.h>
31#include <sys/systm.h>
32#include <sys/bus.h>
33#include <sys/interrupt.h>
34#include <sys/kernel.h>
35#include <sys/kthread.h>
36#include <sys/lock.h>
37#include <sys/malloc.h>
38#include <sys/mutex.h>
39#include <sys/proc.h>
40#include <sys/sched.h>
41#include <sys/taskqueue.h>
42#include <sys/unistd.h>
43#include <machine/stdarg.h>
44
45static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
46static void	*taskqueue_giant_ih;
47static void	*taskqueue_ih;
48static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues;
49static struct mtx taskqueue_queues_mutex;
50
51struct taskqueue {
52	STAILQ_ENTRY(taskqueue)	tq_link;
53	STAILQ_HEAD(, task)	tq_queue;
54	const char		*tq_name;
55	taskqueue_enqueue_fn	tq_enqueue;
56	void			*tq_context;
57	struct task		*tq_running;
58	struct mtx		tq_mutex;
59	struct proc		**tq_pproc;
60	int			tq_pcount;
61	int			tq_spin;
62	int			tq_flags;
63};
64
65#define	TQ_FLAGS_ACTIVE		(1 << 0)
66
67static __inline void
68TQ_LOCK(struct taskqueue *tq)
69{
70	if (tq->tq_spin)
71		mtx_lock_spin(&tq->tq_mutex);
72	else
73		mtx_lock(&tq->tq_mutex);
74}
75
76static __inline void
77TQ_UNLOCK(struct taskqueue *tq)
78{
79	if (tq->tq_spin)
80		mtx_unlock_spin(&tq->tq_mutex);
81	else
82		mtx_unlock(&tq->tq_mutex);
83}
84
85static void	init_taskqueue_list(void *data);
86
87static __inline int
88TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
89    int t)
90{
91	if (tq->tq_spin)
92		return (msleep_spin(p, m, wm, t));
93	return (msleep(p, m, pri, wm, t));
94}
95
96static void
97init_taskqueue_list(void *data __unused)
98{
99
100	mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF);
101	STAILQ_INIT(&taskqueue_queues);
102}
103SYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list,
104    NULL);
105
106static struct taskqueue *
107_taskqueue_create(const char *name, int mflags,
108		 taskqueue_enqueue_fn enqueue, void *context,
109		 int mtxflags, const char *mtxname)
110{
111	struct taskqueue *queue;
112
113	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
114	if (!queue)
115		return 0;
116
117	STAILQ_INIT(&queue->tq_queue);
118	queue->tq_name = name;
119	queue->tq_enqueue = enqueue;
120	queue->tq_context = context;
121	queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
122	queue->tq_flags |= TQ_FLAGS_ACTIVE;
123	mtx_init(&queue->tq_mutex, mtxname, NULL, mtxflags);
124
125	mtx_lock(&taskqueue_queues_mutex);
126	STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link);
127	mtx_unlock(&taskqueue_queues_mutex);
128
129	return queue;
130}
131
132struct taskqueue *
133taskqueue_create(const char *name, int mflags,
134		 taskqueue_enqueue_fn enqueue, void *context)
135{
136	return _taskqueue_create(name, mflags, enqueue, context,
137			MTX_DEF, "taskqueue");
138}
139
140/*
141 * Signal a taskqueue thread to terminate.
142 */
143static void
144taskqueue_terminate(struct proc **pp, struct taskqueue *tq)
145{
146
147	while (tq->tq_pcount > 0) {
148		wakeup(tq);
149		TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
150	}
151}
152
153void
154taskqueue_free(struct taskqueue *queue)
155{
156
157	mtx_lock(&taskqueue_queues_mutex);
158	STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link);
159	mtx_unlock(&taskqueue_queues_mutex);
160
161	TQ_LOCK(queue);
162	queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
163	taskqueue_run(queue);
164	taskqueue_terminate(queue->tq_pproc, queue);
165	mtx_destroy(&queue->tq_mutex);
166	free(queue->tq_pproc, M_TASKQUEUE);
167	free(queue, M_TASKQUEUE);
168}
169
170/*
171 * Returns with the taskqueue locked.
172 */
173struct taskqueue *
174taskqueue_find(const char *name)
175{
176	struct taskqueue *queue;
177
178	mtx_lock(&taskqueue_queues_mutex);
179	STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) {
180		if (strcmp(queue->tq_name, name) == 0) {
181			TQ_LOCK(queue);
182			mtx_unlock(&taskqueue_queues_mutex);
183			return queue;
184		}
185	}
186	mtx_unlock(&taskqueue_queues_mutex);
187	return NULL;
188}
189
190int
191taskqueue_enqueue(struct taskqueue *queue, struct task *task)
192{
193	struct task *ins;
194	struct task *prev;
195
196	TQ_LOCK(queue);
197
198	/*
199	 * Count multiple enqueues.
200	 */
201	if (task->ta_pending) {
202		task->ta_pending++;
203		TQ_UNLOCK(queue);
204		return 0;
205	}
206
207	/*
208	 * Optimise the case when all tasks have the same priority.
209	 */
210	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
211	if (!prev || prev->ta_priority >= task->ta_priority) {
212		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
213	} else {
214		prev = 0;
215		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
216		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
217			if (ins->ta_priority < task->ta_priority)
218				break;
219
220		if (prev)
221			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
222		else
223			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
224	}
225
226	task->ta_pending = 1;
227	queue->tq_enqueue(queue->tq_context);
228
229	TQ_UNLOCK(queue);
230
231	return 0;
232}
233
234void
235taskqueue_run(struct taskqueue *queue)
236{
237	struct task *task;
238	int owned, pending;
239
240	owned = mtx_owned(&queue->tq_mutex);
241	if (!owned)
242		TQ_LOCK(queue);
243	while (STAILQ_FIRST(&queue->tq_queue)) {
244		/*
245		 * Carefully remove the first task from the queue and
246		 * zero its pending count.
247		 */
248		task = STAILQ_FIRST(&queue->tq_queue);
249		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
250		pending = task->ta_pending;
251		task->ta_pending = 0;
252		queue->tq_running = task;
253		TQ_UNLOCK(queue);
254
255		task->ta_func(task->ta_context, pending);
256
257		TQ_LOCK(queue);
258		queue->tq_running = NULL;
259		wakeup(task);
260	}
261
262	/*
263	 * For compatibility, unlock on return if the queue was not locked
264	 * on entry, although this opens a race window.
265	 */
266	if (!owned)
267		TQ_UNLOCK(queue);
268}
269
270void
271taskqueue_drain(struct taskqueue *queue, struct task *task)
272{
273	if (queue->tq_spin) {		/* XXX */
274		mtx_lock_spin(&queue->tq_mutex);
275		while (task->ta_pending != 0 || task == queue->tq_running)
276			msleep_spin(task, &queue->tq_mutex, "-", 0);
277		mtx_unlock_spin(&queue->tq_mutex);
278	} else {
279		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
280
281		mtx_lock(&queue->tq_mutex);
282		while (task->ta_pending != 0 || task == queue->tq_running)
283			msleep(task, &queue->tq_mutex, PWAIT, "-", 0);
284		mtx_unlock(&queue->tq_mutex);
285	}
286}
287
288static void
289taskqueue_swi_enqueue(void *context)
290{
291	swi_sched(taskqueue_ih, 0);
292}
293
294static void
295taskqueue_swi_run(void *dummy)
296{
297	taskqueue_run(taskqueue_swi);
298}
299
300static void
301taskqueue_swi_giant_enqueue(void *context)
302{
303	swi_sched(taskqueue_giant_ih, 0);
304}
305
306static void
307taskqueue_swi_giant_run(void *dummy)
308{
309	taskqueue_run(taskqueue_swi_giant);
310}
311
312int
313taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
314			const char *name, ...)
315{
316	va_list ap;
317	struct taskqueue *tq;
318	struct thread *td;
319	char ktname[MAXCOMLEN];
320	int i, error;
321
322	if (count <= 0)
323		return (EINVAL);
324	tq = *tqp;
325
326	va_start(ap, name);
327	vsnprintf(ktname, MAXCOMLEN, name, ap);
328	va_end(ap);
329
330	tq->tq_pproc = malloc(sizeof(struct proc *) * count, M_TASKQUEUE,
331	    M_NOWAIT | M_ZERO);
332	if (tq->tq_pproc == NULL) {
333		printf("%s: no memory for %s threads\n", __func__, ktname);
334		return (ENOMEM);
335	}
336
337	for (i = 0; i < count; i++) {
338		if (count == 1)
339			error = kthread_create(taskqueue_thread_loop, tqp,
340			    &tq->tq_pproc[i], RFSTOPPED, 0, ktname);
341		else
342			error = kthread_create(taskqueue_thread_loop, tqp,
343			    &tq->tq_pproc[i], RFSTOPPED, 0, "%s_%d", ktname, i);
344		if (error) {
345			/* should be ok to continue, taskqueue_free will dtrt */
346			printf("%s: kthread_create(%s): error %d",
347				__func__, ktname, error);
348			tq->tq_pproc[i] = NULL;		/* paranoid */
349		} else
350			tq->tq_pcount++;
351	}
352	for (i = 0; i < count; i++) {
353		if (tq->tq_pproc[i] == NULL)
354			continue;
355		td = FIRST_THREAD_IN_PROC(tq->tq_pproc[i]);
356		thread_lock(td);
357		sched_prio(td, pri);
358		sched_add(td, SRQ_BORING);
359		thread_unlock(td);
360	}
361
362	return (0);
363}
364
365void
366taskqueue_thread_loop(void *arg)
367{
368	struct taskqueue **tqp, *tq;
369
370	tqp = arg;
371	tq = *tqp;
372	TQ_LOCK(tq);
373	do {
374		taskqueue_run(tq);
375		TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
376	} while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0);
377
378	/* rendezvous with thread that asked us to terminate */
379	tq->tq_pcount--;
380	wakeup_one(tq->tq_pproc);
381	TQ_UNLOCK(tq);
382	kthread_exit(0);
383}
384
385void
386taskqueue_thread_enqueue(void *context)
387{
388	struct taskqueue **tqp, *tq;
389
390	tqp = context;
391	tq = *tqp;
392
393	mtx_assert(&tq->tq_mutex, MA_OWNED);
394	wakeup_one(tq);
395}
396
397TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0,
398		 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
399		     INTR_MPSAFE, &taskqueue_ih));
400
401TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0,
402		 swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
403		     NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));
404
405TASKQUEUE_DEFINE_THREAD(thread);
406
407struct taskqueue *
408taskqueue_create_fast(const char *name, int mflags,
409		 taskqueue_enqueue_fn enqueue, void *context)
410{
411	return _taskqueue_create(name, mflags, enqueue, context,
412			MTX_SPIN, "fast_taskqueue");
413}
414
415/* NB: for backwards compatibility */
416int
417taskqueue_enqueue_fast(struct taskqueue *queue, struct task *task)
418{
419	return taskqueue_enqueue(queue, task);
420}
421
422static void	*taskqueue_fast_ih;
423
424static void
425taskqueue_fast_enqueue(void *context)
426{
427	swi_sched(taskqueue_fast_ih, 0);
428}
429
430static void
431taskqueue_fast_run(void *dummy)
432{
433	taskqueue_run(taskqueue_fast);
434}
435
436TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, 0,
437	swi_add(NULL, "Fast task queue", taskqueue_fast_run, NULL,
438	SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));
439