Deleted Added
full compact
subr_taskqueue.c (301208) subr_taskqueue.c (302372)
1/*-
2 * Copyright (c) 2000 Doug Rabson
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 */
26
27#include <sys/cdefs.h>
1/*-
2 * Copyright (c) 2000 Doug Rabson
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 */
26
27#include <sys/cdefs.h>
28__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 301208 2016-06-02 15:52:34Z mjg $");
28__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 302372 2016-07-06 14:09:49Z nwhitehorn $");
29
30#include <sys/param.h>
31#include <sys/systm.h>
32#include <sys/bus.h>
33#include <sys/cpuset.h>
34#include <sys/interrupt.h>
35#include <sys/kernel.h>
36#include <sys/kthread.h>
37#include <sys/libkern.h>
38#include <sys/limits.h>
39#include <sys/lock.h>
40#include <sys/malloc.h>
41#include <sys/mutex.h>
42#include <sys/proc.h>
43#include <sys/sched.h>
44#include <sys/smp.h>
45#include <sys/taskqueue.h>
46#include <sys/unistd.h>
47#include <machine/stdarg.h>
48
49static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
50static void *taskqueue_giant_ih;
51static void *taskqueue_ih;
52static void taskqueue_fast_enqueue(void *);
53static void taskqueue_swi_enqueue(void *);
54static void taskqueue_swi_giant_enqueue(void *);
55
56struct taskqueue_busy {
57 struct task *tb_running;
58 TAILQ_ENTRY(taskqueue_busy) tb_link;
59};
60
61struct task * const TB_DRAIN_WAITER = (struct task *)0x1;
62
63struct taskqueue {
64 STAILQ_HEAD(, task) tq_queue;
65 taskqueue_enqueue_fn tq_enqueue;
66 void *tq_context;
67 char *tq_name;
68 TAILQ_HEAD(, taskqueue_busy) tq_active;
69 struct mtx tq_mutex;
70 struct thread **tq_threads;
71 int tq_tcount;
72 int tq_spin;
73 int tq_flags;
74 int tq_callouts;
75 taskqueue_callback_fn tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
76 void *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
77};
78
79#define TQ_FLAGS_ACTIVE (1 << 0)
80#define TQ_FLAGS_BLOCKED (1 << 1)
81#define TQ_FLAGS_UNLOCKED_ENQUEUE (1 << 2)
82
83#define DT_CALLOUT_ARMED (1 << 0)
84
85#define TQ_LOCK(tq) \
86 do { \
87 if ((tq)->tq_spin) \
88 mtx_lock_spin(&(tq)->tq_mutex); \
89 else \
90 mtx_lock(&(tq)->tq_mutex); \
91 } while (0)
92#define TQ_ASSERT_LOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_OWNED)
93
94#define TQ_UNLOCK(tq) \
95 do { \
96 if ((tq)->tq_spin) \
97 mtx_unlock_spin(&(tq)->tq_mutex); \
98 else \
99 mtx_unlock(&(tq)->tq_mutex); \
100 } while (0)
101#define TQ_ASSERT_UNLOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
102
103void
104_timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
105 int priority, task_fn_t func, void *context)
106{
107
108 TASK_INIT(&timeout_task->t, priority, func, context);
109 callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
110 CALLOUT_RETURNUNLOCKED);
111 timeout_task->q = queue;
112 timeout_task->f = 0;
113}
114
115static __inline int
116TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
117 int t)
118{
119 if (tq->tq_spin)
120 return (msleep_spin(p, m, wm, t));
121 return (msleep(p, m, pri, wm, t));
122}
123
124static struct taskqueue *
125_taskqueue_create(const char *name, int mflags,
126 taskqueue_enqueue_fn enqueue, void *context,
127 int mtxflags, const char *mtxname __unused)
128{
129 struct taskqueue *queue;
130 char *tq_name;
131
132 tq_name = malloc(TASKQUEUE_NAMELEN, M_TASKQUEUE, mflags | M_ZERO);
133 if (tq_name == NULL)
134 return (NULL);
135
136 queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
137 if (queue == NULL) {
138 free(tq_name, M_TASKQUEUE);
139 return (NULL);
140 }
141
142 snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");
143
144 STAILQ_INIT(&queue->tq_queue);
145 TAILQ_INIT(&queue->tq_active);
146 queue->tq_enqueue = enqueue;
147 queue->tq_context = context;
148 queue->tq_name = tq_name;
149 queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
150 queue->tq_flags |= TQ_FLAGS_ACTIVE;
151 if (enqueue == taskqueue_fast_enqueue ||
152 enqueue == taskqueue_swi_enqueue ||
153 enqueue == taskqueue_swi_giant_enqueue ||
154 enqueue == taskqueue_thread_enqueue)
155 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
156 mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);
157
158 return (queue);
159}
160
161struct taskqueue *
162taskqueue_create(const char *name, int mflags,
163 taskqueue_enqueue_fn enqueue, void *context)
164{
165
166 return _taskqueue_create(name, mflags, enqueue, context,
167 MTX_DEF, name);
168}
169
170void
171taskqueue_set_callback(struct taskqueue *queue,
172 enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback,
173 void *context)
174{
175
176 KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) &&
177 (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)),
178 ("Callback type %d not valid, must be %d-%d", cb_type,
179 TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX));
180 KASSERT((queue->tq_callbacks[cb_type] == NULL),
181 ("Re-initialization of taskqueue callback?"));
182
183 queue->tq_callbacks[cb_type] = callback;
184 queue->tq_cb_contexts[cb_type] = context;
185}
186
187/*
188 * Signal a taskqueue thread to terminate.
189 */
190static void
191taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
192{
193
194 while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
195 wakeup(tq);
196 TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
197 }
198}
199
200void
201taskqueue_free(struct taskqueue *queue)
202{
203
204 TQ_LOCK(queue);
205 queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
206 taskqueue_terminate(queue->tq_threads, queue);
207 KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
208 KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
209 mtx_destroy(&queue->tq_mutex);
210 free(queue->tq_threads, M_TASKQUEUE);
211 free(queue->tq_name, M_TASKQUEUE);
212 free(queue, M_TASKQUEUE);
213}
214
215static int
216taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
217{
218 struct task *ins;
219 struct task *prev;
220
221 KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func"));
222 /*
223 * Count multiple enqueues.
224 */
225 if (task->ta_pending) {
226 if (task->ta_pending < USHRT_MAX)
227 task->ta_pending++;
228 TQ_UNLOCK(queue);
229 return (0);
230 }
231
232 /*
233 * Optimise the case when all tasks have the same priority.
234 */
235 prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
236 if (!prev || prev->ta_priority >= task->ta_priority) {
237 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
238 } else {
239 prev = NULL;
240 for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
241 prev = ins, ins = STAILQ_NEXT(ins, ta_link))
242 if (ins->ta_priority < task->ta_priority)
243 break;
244
245 if (prev)
246 STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
247 else
248 STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
249 }
250
251 task->ta_pending = 1;
252 if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0)
253 TQ_UNLOCK(queue);
254 if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
255 queue->tq_enqueue(queue->tq_context);
256 if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
257 TQ_UNLOCK(queue);
258
259 /* Return with lock released. */
260 return (0);
261}
262
263int
264grouptaskqueue_enqueue(struct taskqueue *queue, struct task *task)
265{
266 TQ_LOCK(queue);
267 if (task->ta_pending) {
268 TQ_UNLOCK(queue);
269 return (0);
270 }
271 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
272 task->ta_pending = 1;
273 TQ_UNLOCK(queue);
274 if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
275 queue->tq_enqueue(queue->tq_context);
276 return (0);
277}
278
279int
280taskqueue_enqueue(struct taskqueue *queue, struct task *task)
281{
282 int res;
283
284 TQ_LOCK(queue);
285 res = taskqueue_enqueue_locked(queue, task);
286 /* The lock is released inside. */
287
288 return (res);
289}
290
291static void
292taskqueue_timeout_func(void *arg)
293{
294 struct taskqueue *queue;
295 struct timeout_task *timeout_task;
296
297 timeout_task = arg;
298 queue = timeout_task->q;
299 KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
300 timeout_task->f &= ~DT_CALLOUT_ARMED;
301 queue->tq_callouts--;
302 taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
303 /* The lock is released inside. */
304}
305
306int
307taskqueue_enqueue_timeout(struct taskqueue *queue,
308 struct timeout_task *timeout_task, int ticks)
309{
310 int res;
311
312 TQ_LOCK(queue);
313 KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
314 ("Migrated queue"));
315 KASSERT(!queue->tq_spin, ("Timeout for spin-queue"));
316 timeout_task->q = queue;
317 res = timeout_task->t.ta_pending;
318 if (ticks == 0) {
319 taskqueue_enqueue_locked(queue, &timeout_task->t);
320 /* The lock is released inside. */
321 } else {
322 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
323 res++;
324 } else {
325 queue->tq_callouts++;
326 timeout_task->f |= DT_CALLOUT_ARMED;
327 if (ticks < 0)
328 ticks = -ticks; /* Ignore overflow. */
329 }
330 if (ticks > 0) {
331 callout_reset(&timeout_task->c, ticks,
332 taskqueue_timeout_func, timeout_task);
333 }
334 TQ_UNLOCK(queue);
335 }
336 return (res);
337}
338
339static void
340taskqueue_task_nop_fn(void *context, int pending)
341{
342}
343
344/*
345 * Block until all currently queued tasks in this taskqueue
346 * have begun execution. Tasks queued during execution of
347 * this function are ignored.
348 */
349static void
350taskqueue_drain_tq_queue(struct taskqueue *queue)
351{
352 struct task t_barrier;
353
354 if (STAILQ_EMPTY(&queue->tq_queue))
355 return;
356
357 /*
358 * Enqueue our barrier after all current tasks, but with
359 * the highest priority so that newly queued tasks cannot
360 * pass it. Because of the high priority, we can not use
361 * taskqueue_enqueue_locked directly (which drops the lock
362 * anyway) so just insert it at tail while we have the
363 * queue lock.
364 */
365 TASK_INIT(&t_barrier, USHRT_MAX, taskqueue_task_nop_fn, &t_barrier);
366 STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
367 t_barrier.ta_pending = 1;
368
369 /*
370 * Once the barrier has executed, all previously queued tasks
371 * have completed or are currently executing.
372 */
373 while (t_barrier.ta_pending != 0)
374 TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0);
375}
376
377/*
378 * Block until all currently executing tasks for this taskqueue
379 * complete. Tasks that begin execution during the execution
380 * of this function are ignored.
381 */
382static void
383taskqueue_drain_tq_active(struct taskqueue *queue)
384{
385 struct taskqueue_busy tb_marker, *tb_first;
386
387 if (TAILQ_EMPTY(&queue->tq_active))
388 return;
389
390 /* Block taskq_terminate().*/
391 queue->tq_callouts++;
392
393 /*
394 * Wait for all currently executing taskqueue threads
395 * to go idle.
396 */
397 tb_marker.tb_running = TB_DRAIN_WAITER;
398 TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link);
399 while (TAILQ_FIRST(&queue->tq_active) != &tb_marker)
400 TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0);
401 TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link);
402
403 /*
404 * Wakeup any other drain waiter that happened to queue up
405 * without any intervening active thread.
406 */
407 tb_first = TAILQ_FIRST(&queue->tq_active);
408 if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER)
409 wakeup(tb_first);
410
411 /* Release taskqueue_terminate(). */
412 queue->tq_callouts--;
413 if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
414 wakeup_one(queue->tq_threads);
415}
416
417void
418taskqueue_block(struct taskqueue *queue)
419{
420
421 TQ_LOCK(queue);
422 queue->tq_flags |= TQ_FLAGS_BLOCKED;
423 TQ_UNLOCK(queue);
424}
425
426void
427taskqueue_unblock(struct taskqueue *queue)
428{
429
430 TQ_LOCK(queue);
431 queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
432 if (!STAILQ_EMPTY(&queue->tq_queue))
433 queue->tq_enqueue(queue->tq_context);
434 TQ_UNLOCK(queue);
435}
436
437static void
438taskqueue_run_locked(struct taskqueue *queue)
439{
440 struct taskqueue_busy tb;
441 struct taskqueue_busy *tb_first;
442 struct task *task;
443 int pending;
444
445 KASSERT(queue != NULL, ("tq is NULL"));
446 TQ_ASSERT_LOCKED(queue);
447 tb.tb_running = NULL;
448
449 while (STAILQ_FIRST(&queue->tq_queue)) {
450 TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);
451
452 /*
453 * Carefully remove the first task from the queue and
454 * zero its pending count.
455 */
456 task = STAILQ_FIRST(&queue->tq_queue);
457 KASSERT(task != NULL, ("task is NULL"));
458 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
459 pending = task->ta_pending;
460 task->ta_pending = 0;
461 tb.tb_running = task;
462 TQ_UNLOCK(queue);
463
464 KASSERT(task->ta_func != NULL, ("task->ta_func is NULL"));
465 task->ta_func(task->ta_context, pending);
466
467 TQ_LOCK(queue);
468 tb.tb_running = NULL;
469 wakeup(task);
470
471 TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
472 tb_first = TAILQ_FIRST(&queue->tq_active);
473 if (tb_first != NULL &&
474 tb_first->tb_running == TB_DRAIN_WAITER)
475 wakeup(tb_first);
476 }
477}
478
479void
480taskqueue_run(struct taskqueue *queue)
481{
482
483 TQ_LOCK(queue);
484 taskqueue_run_locked(queue);
485 TQ_UNLOCK(queue);
486}
487
488static int
489task_is_running(struct taskqueue *queue, struct task *task)
490{
491 struct taskqueue_busy *tb;
492
493 TQ_ASSERT_LOCKED(queue);
494 TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
495 if (tb->tb_running == task)
496 return (1);
497 }
498 return (0);
499}
500
501static int
502taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
503 u_int *pendp)
504{
505
506 if (task->ta_pending > 0)
507 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
508 if (pendp != NULL)
509 *pendp = task->ta_pending;
510 task->ta_pending = 0;
511 return (task_is_running(queue, task) ? EBUSY : 0);
512}
513
514int
515taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
516{
517 int error;
518
519 TQ_LOCK(queue);
520 error = taskqueue_cancel_locked(queue, task, pendp);
521 TQ_UNLOCK(queue);
522
523 return (error);
524}
525
526int
527taskqueue_cancel_timeout(struct taskqueue *queue,
528 struct timeout_task *timeout_task, u_int *pendp)
529{
530 u_int pending, pending1;
531 int error;
532
533 TQ_LOCK(queue);
534 pending = !!(callout_stop(&timeout_task->c) > 0);
535 error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
536 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
537 timeout_task->f &= ~DT_CALLOUT_ARMED;
538 queue->tq_callouts--;
539 }
540 TQ_UNLOCK(queue);
541
542 if (pendp != NULL)
543 *pendp = pending + pending1;
544 return (error);
545}
546
547void
548taskqueue_drain(struct taskqueue *queue, struct task *task)
549{
550
551 if (!queue->tq_spin)
552 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
553
554 TQ_LOCK(queue);
555 while (task->ta_pending != 0 || task_is_running(queue, task))
556 TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0);
557 TQ_UNLOCK(queue);
558}
559
560void
561taskqueue_drain_all(struct taskqueue *queue)
562{
563
564 if (!queue->tq_spin)
565 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
566
567 TQ_LOCK(queue);
568 taskqueue_drain_tq_queue(queue);
569 taskqueue_drain_tq_active(queue);
570 TQ_UNLOCK(queue);
571}
572
573void
574taskqueue_drain_timeout(struct taskqueue *queue,
575 struct timeout_task *timeout_task)
576{
577
578 callout_drain(&timeout_task->c);
579 taskqueue_drain(queue, &timeout_task->t);
580}
581
582static void
583taskqueue_swi_enqueue(void *context)
584{
585 swi_sched(taskqueue_ih, 0);
586}
587
588static void
589taskqueue_swi_run(void *dummy)
590{
591 taskqueue_run(taskqueue_swi);
592}
593
594static void
595taskqueue_swi_giant_enqueue(void *context)
596{
597 swi_sched(taskqueue_giant_ih, 0);
598}
599
600static void
601taskqueue_swi_giant_run(void *dummy)
602{
603 taskqueue_run(taskqueue_swi_giant);
604}
605
606static int
607_taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
608 cpuset_t *mask, const char *name, va_list ap)
609{
610 char ktname[MAXCOMLEN + 1];
611 struct thread *td;
612 struct taskqueue *tq;
613 int i, error;
614
615 if (count <= 0)
616 return (EINVAL);
617
618 vsnprintf(ktname, sizeof(ktname), name, ap);
619 tq = *tqp;
620
621 tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
622 M_NOWAIT | M_ZERO);
623 if (tq->tq_threads == NULL) {
624 printf("%s: no memory for %s threads\n", __func__, ktname);
625 return (ENOMEM);
626 }
627
628 for (i = 0; i < count; i++) {
629 if (count == 1)
630 error = kthread_add(taskqueue_thread_loop, tqp, NULL,
631 &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
632 else
633 error = kthread_add(taskqueue_thread_loop, tqp, NULL,
634 &tq->tq_threads[i], RFSTOPPED, 0,
635 "%s_%d", ktname, i);
636 if (error) {
637 /* should be ok to continue, taskqueue_free will dtrt */
638 printf("%s: kthread_add(%s): error %d", __func__,
639 ktname, error);
640 tq->tq_threads[i] = NULL; /* paranoid */
641 } else
642 tq->tq_tcount++;
643 }
644 for (i = 0; i < count; i++) {
645 if (tq->tq_threads[i] == NULL)
646 continue;
647 td = tq->tq_threads[i];
648 if (mask) {
649 error = cpuset_setthread(td->td_tid, mask);
650 /*
651 * Failing to pin is rarely an actual fatal error;
652 * it'll just affect performance.
653 */
654 if (error)
655 printf("%s: curthread=%llu: can't pin; "
656 "error=%d\n",
657 __func__,
658 (unsigned long long) td->td_tid,
659 error);
660 }
661 thread_lock(td);
662 sched_prio(td, pri);
663 sched_add(td, SRQ_BORING);
664 thread_unlock(td);
665 }
666
667 return (0);
668}
669
670int
671taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
672 const char *name, ...)
673{
674 va_list ap;
675 int error;
676
677 va_start(ap, name);
678 error = _taskqueue_start_threads(tqp, count, pri, NULL, name, ap);
679 va_end(ap);
680 return (error);
681}
682
683int
684taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count, int pri,
685 cpuset_t *mask, const char *name, ...)
686{
687 va_list ap;
688 int error;
689
690 va_start(ap, name);
691 error = _taskqueue_start_threads(tqp, count, pri, mask, name, ap);
692 va_end(ap);
693 return (error);
694}
695
696static inline void
697taskqueue_run_callback(struct taskqueue *tq,
698 enum taskqueue_callback_type cb_type)
699{
700 taskqueue_callback_fn tq_callback;
701
702 TQ_ASSERT_UNLOCKED(tq);
703 tq_callback = tq->tq_callbacks[cb_type];
704 if (tq_callback != NULL)
705 tq_callback(tq->tq_cb_contexts[cb_type]);
706}
707
708void
709taskqueue_thread_loop(void *arg)
710{
711 struct taskqueue **tqp, *tq;
712
713 tqp = arg;
714 tq = *tqp;
715 taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
716 TQ_LOCK(tq);
717 while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
718 /* XXX ? */
719 taskqueue_run_locked(tq);
720 /*
721 * Because taskqueue_run() can drop tq_mutex, we need to
722 * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
723 * meantime, which means we missed a wakeup.
724 */
725 if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
726 break;
727 TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
728 }
729 taskqueue_run_locked(tq);
730 /*
731 * This thread is on its way out, so just drop the lock temporarily
732 * in order to call the shutdown callback. This allows the callback
733 * to look at the taskqueue, even just before it dies.
734 */
735 TQ_UNLOCK(tq);
736 taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
737 TQ_LOCK(tq);
738
739 /* rendezvous with thread that asked us to terminate */
740 tq->tq_tcount--;
741 wakeup_one(tq->tq_threads);
742 TQ_UNLOCK(tq);
743 kthread_exit();
744}
745
746void
747taskqueue_thread_enqueue(void *context)
748{
749 struct taskqueue **tqp, *tq;
750
751 tqp = context;
752 tq = *tqp;
753 wakeup_one(tq);
754}
755
756TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
757 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
758 INTR_MPSAFE, &taskqueue_ih));
759
760TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
761 swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
762 NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));
763
764TASKQUEUE_DEFINE_THREAD(thread);
765
766struct taskqueue *
767taskqueue_create_fast(const char *name, int mflags,
768 taskqueue_enqueue_fn enqueue, void *context)
769{
770 return _taskqueue_create(name, mflags, enqueue, context,
771 MTX_SPIN, "fast_taskqueue");
772}
773
774static void *taskqueue_fast_ih;
775
776static void
777taskqueue_fast_enqueue(void *context)
778{
779 swi_sched(taskqueue_fast_ih, 0);
780}
781
782static void
783taskqueue_fast_run(void *dummy)
784{
785 taskqueue_run(taskqueue_fast);
786}
787
788TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
789 swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL,
790 SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));
791
792int
793taskqueue_member(struct taskqueue *queue, struct thread *td)
794{
795 int i, j, ret = 0;
796
797 for (i = 0, j = 0; ; i++) {
798 if (queue->tq_threads[i] == NULL)
799 continue;
800 if (queue->tq_threads[i] == td) {
801 ret = 1;
802 break;
803 }
804 if (++j >= queue->tq_tcount)
805 break;
806 }
807 return (ret);
808}
809
810struct taskqgroup_cpu {
811 LIST_HEAD(, grouptask) tgc_tasks;
812 struct taskqueue *tgc_taskq;
813 int tgc_cnt;
814 int tgc_cpu;
815};
816
817struct taskqgroup {
818 struct taskqgroup_cpu tqg_queue[MAXCPU];
819 struct mtx tqg_lock;
820 char * tqg_name;
821 int tqg_adjusting;
822 int tqg_stride;
823 int tqg_cnt;
824};
825
826struct taskq_bind_task {
827 struct task bt_task;
828 int bt_cpuid;
829};
830
831static void
832taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx)
833{
834 struct taskqgroup_cpu *qcpu;
29
30#include <sys/param.h>
31#include <sys/systm.h>
32#include <sys/bus.h>
33#include <sys/cpuset.h>
34#include <sys/interrupt.h>
35#include <sys/kernel.h>
36#include <sys/kthread.h>
37#include <sys/libkern.h>
38#include <sys/limits.h>
39#include <sys/lock.h>
40#include <sys/malloc.h>
41#include <sys/mutex.h>
42#include <sys/proc.h>
43#include <sys/sched.h>
44#include <sys/smp.h>
45#include <sys/taskqueue.h>
46#include <sys/unistd.h>
47#include <machine/stdarg.h>
48
49static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
50static void *taskqueue_giant_ih;
51static void *taskqueue_ih;
52static void taskqueue_fast_enqueue(void *);
53static void taskqueue_swi_enqueue(void *);
54static void taskqueue_swi_giant_enqueue(void *);
55
56struct taskqueue_busy {
57 struct task *tb_running;
58 TAILQ_ENTRY(taskqueue_busy) tb_link;
59};
60
61struct task * const TB_DRAIN_WAITER = (struct task *)0x1;
62
63struct taskqueue {
64 STAILQ_HEAD(, task) tq_queue;
65 taskqueue_enqueue_fn tq_enqueue;
66 void *tq_context;
67 char *tq_name;
68 TAILQ_HEAD(, taskqueue_busy) tq_active;
69 struct mtx tq_mutex;
70 struct thread **tq_threads;
71 int tq_tcount;
72 int tq_spin;
73 int tq_flags;
74 int tq_callouts;
75 taskqueue_callback_fn tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
76 void *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
77};
78
79#define TQ_FLAGS_ACTIVE (1 << 0)
80#define TQ_FLAGS_BLOCKED (1 << 1)
81#define TQ_FLAGS_UNLOCKED_ENQUEUE (1 << 2)
82
83#define DT_CALLOUT_ARMED (1 << 0)
84
85#define TQ_LOCK(tq) \
86 do { \
87 if ((tq)->tq_spin) \
88 mtx_lock_spin(&(tq)->tq_mutex); \
89 else \
90 mtx_lock(&(tq)->tq_mutex); \
91 } while (0)
92#define TQ_ASSERT_LOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_OWNED)
93
94#define TQ_UNLOCK(tq) \
95 do { \
96 if ((tq)->tq_spin) \
97 mtx_unlock_spin(&(tq)->tq_mutex); \
98 else \
99 mtx_unlock(&(tq)->tq_mutex); \
100 } while (0)
101#define TQ_ASSERT_UNLOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
102
103void
104_timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
105 int priority, task_fn_t func, void *context)
106{
107
108 TASK_INIT(&timeout_task->t, priority, func, context);
109 callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
110 CALLOUT_RETURNUNLOCKED);
111 timeout_task->q = queue;
112 timeout_task->f = 0;
113}
114
115static __inline int
116TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
117 int t)
118{
119 if (tq->tq_spin)
120 return (msleep_spin(p, m, wm, t));
121 return (msleep(p, m, pri, wm, t));
122}
123
124static struct taskqueue *
125_taskqueue_create(const char *name, int mflags,
126 taskqueue_enqueue_fn enqueue, void *context,
127 int mtxflags, const char *mtxname __unused)
128{
129 struct taskqueue *queue;
130 char *tq_name;
131
132 tq_name = malloc(TASKQUEUE_NAMELEN, M_TASKQUEUE, mflags | M_ZERO);
133 if (tq_name == NULL)
134 return (NULL);
135
136 queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
137 if (queue == NULL) {
138 free(tq_name, M_TASKQUEUE);
139 return (NULL);
140 }
141
142 snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");
143
144 STAILQ_INIT(&queue->tq_queue);
145 TAILQ_INIT(&queue->tq_active);
146 queue->tq_enqueue = enqueue;
147 queue->tq_context = context;
148 queue->tq_name = tq_name;
149 queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
150 queue->tq_flags |= TQ_FLAGS_ACTIVE;
151 if (enqueue == taskqueue_fast_enqueue ||
152 enqueue == taskqueue_swi_enqueue ||
153 enqueue == taskqueue_swi_giant_enqueue ||
154 enqueue == taskqueue_thread_enqueue)
155 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
156 mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);
157
158 return (queue);
159}
160
161struct taskqueue *
162taskqueue_create(const char *name, int mflags,
163 taskqueue_enqueue_fn enqueue, void *context)
164{
165
166 return _taskqueue_create(name, mflags, enqueue, context,
167 MTX_DEF, name);
168}
169
170void
171taskqueue_set_callback(struct taskqueue *queue,
172 enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback,
173 void *context)
174{
175
176 KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) &&
177 (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)),
178 ("Callback type %d not valid, must be %d-%d", cb_type,
179 TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX));
180 KASSERT((queue->tq_callbacks[cb_type] == NULL),
181 ("Re-initialization of taskqueue callback?"));
182
183 queue->tq_callbacks[cb_type] = callback;
184 queue->tq_cb_contexts[cb_type] = context;
185}
186
187/*
188 * Signal a taskqueue thread to terminate.
189 */
190static void
191taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
192{
193
194 while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
195 wakeup(tq);
196 TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
197 }
198}
199
200void
201taskqueue_free(struct taskqueue *queue)
202{
203
204 TQ_LOCK(queue);
205 queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
206 taskqueue_terminate(queue->tq_threads, queue);
207 KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
208 KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
209 mtx_destroy(&queue->tq_mutex);
210 free(queue->tq_threads, M_TASKQUEUE);
211 free(queue->tq_name, M_TASKQUEUE);
212 free(queue, M_TASKQUEUE);
213}
214
215static int
216taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
217{
218 struct task *ins;
219 struct task *prev;
220
221 KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func"));
222 /*
223 * Count multiple enqueues.
224 */
225 if (task->ta_pending) {
226 if (task->ta_pending < USHRT_MAX)
227 task->ta_pending++;
228 TQ_UNLOCK(queue);
229 return (0);
230 }
231
232 /*
233 * Optimise the case when all tasks have the same priority.
234 */
235 prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
236 if (!prev || prev->ta_priority >= task->ta_priority) {
237 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
238 } else {
239 prev = NULL;
240 for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
241 prev = ins, ins = STAILQ_NEXT(ins, ta_link))
242 if (ins->ta_priority < task->ta_priority)
243 break;
244
245 if (prev)
246 STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
247 else
248 STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
249 }
250
251 task->ta_pending = 1;
252 if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0)
253 TQ_UNLOCK(queue);
254 if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
255 queue->tq_enqueue(queue->tq_context);
256 if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
257 TQ_UNLOCK(queue);
258
259 /* Return with lock released. */
260 return (0);
261}
262
263int
264grouptaskqueue_enqueue(struct taskqueue *queue, struct task *task)
265{
266 TQ_LOCK(queue);
267 if (task->ta_pending) {
268 TQ_UNLOCK(queue);
269 return (0);
270 }
271 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
272 task->ta_pending = 1;
273 TQ_UNLOCK(queue);
274 if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
275 queue->tq_enqueue(queue->tq_context);
276 return (0);
277}
278
279int
280taskqueue_enqueue(struct taskqueue *queue, struct task *task)
281{
282 int res;
283
284 TQ_LOCK(queue);
285 res = taskqueue_enqueue_locked(queue, task);
286 /* The lock is released inside. */
287
288 return (res);
289}
290
291static void
292taskqueue_timeout_func(void *arg)
293{
294 struct taskqueue *queue;
295 struct timeout_task *timeout_task;
296
297 timeout_task = arg;
298 queue = timeout_task->q;
299 KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
300 timeout_task->f &= ~DT_CALLOUT_ARMED;
301 queue->tq_callouts--;
302 taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
303 /* The lock is released inside. */
304}
305
306int
307taskqueue_enqueue_timeout(struct taskqueue *queue,
308 struct timeout_task *timeout_task, int ticks)
309{
310 int res;
311
312 TQ_LOCK(queue);
313 KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
314 ("Migrated queue"));
315 KASSERT(!queue->tq_spin, ("Timeout for spin-queue"));
316 timeout_task->q = queue;
317 res = timeout_task->t.ta_pending;
318 if (ticks == 0) {
319 taskqueue_enqueue_locked(queue, &timeout_task->t);
320 /* The lock is released inside. */
321 } else {
322 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
323 res++;
324 } else {
325 queue->tq_callouts++;
326 timeout_task->f |= DT_CALLOUT_ARMED;
327 if (ticks < 0)
328 ticks = -ticks; /* Ignore overflow. */
329 }
330 if (ticks > 0) {
331 callout_reset(&timeout_task->c, ticks,
332 taskqueue_timeout_func, timeout_task);
333 }
334 TQ_UNLOCK(queue);
335 }
336 return (res);
337}
338
339static void
340taskqueue_task_nop_fn(void *context, int pending)
341{
342}
343
344/*
345 * Block until all currently queued tasks in this taskqueue
346 * have begun execution. Tasks queued during execution of
347 * this function are ignored.
348 */
349static void
350taskqueue_drain_tq_queue(struct taskqueue *queue)
351{
352 struct task t_barrier;
353
354 if (STAILQ_EMPTY(&queue->tq_queue))
355 return;
356
357 /*
358 * Enqueue our barrier after all current tasks, but with
359 * the highest priority so that newly queued tasks cannot
360 * pass it. Because of the high priority, we can not use
361 * taskqueue_enqueue_locked directly (which drops the lock
362 * anyway) so just insert it at tail while we have the
363 * queue lock.
364 */
365 TASK_INIT(&t_barrier, USHRT_MAX, taskqueue_task_nop_fn, &t_barrier);
366 STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
367 t_barrier.ta_pending = 1;
368
369 /*
370 * Once the barrier has executed, all previously queued tasks
371 * have completed or are currently executing.
372 */
373 while (t_barrier.ta_pending != 0)
374 TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0);
375}
376
377/*
378 * Block until all currently executing tasks for this taskqueue
379 * complete. Tasks that begin execution during the execution
380 * of this function are ignored.
381 */
382static void
383taskqueue_drain_tq_active(struct taskqueue *queue)
384{
385 struct taskqueue_busy tb_marker, *tb_first;
386
387 if (TAILQ_EMPTY(&queue->tq_active))
388 return;
389
390 /* Block taskq_terminate().*/
391 queue->tq_callouts++;
392
393 /*
394 * Wait for all currently executing taskqueue threads
395 * to go idle.
396 */
397 tb_marker.tb_running = TB_DRAIN_WAITER;
398 TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link);
399 while (TAILQ_FIRST(&queue->tq_active) != &tb_marker)
400 TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0);
401 TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link);
402
403 /*
404 * Wakeup any other drain waiter that happened to queue up
405 * without any intervening active thread.
406 */
407 tb_first = TAILQ_FIRST(&queue->tq_active);
408 if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER)
409 wakeup(tb_first);
410
411 /* Release taskqueue_terminate(). */
412 queue->tq_callouts--;
413 if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
414 wakeup_one(queue->tq_threads);
415}
416
417void
418taskqueue_block(struct taskqueue *queue)
419{
420
421 TQ_LOCK(queue);
422 queue->tq_flags |= TQ_FLAGS_BLOCKED;
423 TQ_UNLOCK(queue);
424}
425
426void
427taskqueue_unblock(struct taskqueue *queue)
428{
429
430 TQ_LOCK(queue);
431 queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
432 if (!STAILQ_EMPTY(&queue->tq_queue))
433 queue->tq_enqueue(queue->tq_context);
434 TQ_UNLOCK(queue);
435}
436
437static void
438taskqueue_run_locked(struct taskqueue *queue)
439{
440 struct taskqueue_busy tb;
441 struct taskqueue_busy *tb_first;
442 struct task *task;
443 int pending;
444
445 KASSERT(queue != NULL, ("tq is NULL"));
446 TQ_ASSERT_LOCKED(queue);
447 tb.tb_running = NULL;
448
449 while (STAILQ_FIRST(&queue->tq_queue)) {
450 TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);
451
452 /*
453 * Carefully remove the first task from the queue and
454 * zero its pending count.
455 */
456 task = STAILQ_FIRST(&queue->tq_queue);
457 KASSERT(task != NULL, ("task is NULL"));
458 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
459 pending = task->ta_pending;
460 task->ta_pending = 0;
461 tb.tb_running = task;
462 TQ_UNLOCK(queue);
463
464 KASSERT(task->ta_func != NULL, ("task->ta_func is NULL"));
465 task->ta_func(task->ta_context, pending);
466
467 TQ_LOCK(queue);
468 tb.tb_running = NULL;
469 wakeup(task);
470
471 TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
472 tb_first = TAILQ_FIRST(&queue->tq_active);
473 if (tb_first != NULL &&
474 tb_first->tb_running == TB_DRAIN_WAITER)
475 wakeup(tb_first);
476 }
477}
478
479void
480taskqueue_run(struct taskqueue *queue)
481{
482
483 TQ_LOCK(queue);
484 taskqueue_run_locked(queue);
485 TQ_UNLOCK(queue);
486}
487
488static int
489task_is_running(struct taskqueue *queue, struct task *task)
490{
491 struct taskqueue_busy *tb;
492
493 TQ_ASSERT_LOCKED(queue);
494 TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
495 if (tb->tb_running == task)
496 return (1);
497 }
498 return (0);
499}
500
501static int
502taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
503 u_int *pendp)
504{
505
506 if (task->ta_pending > 0)
507 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
508 if (pendp != NULL)
509 *pendp = task->ta_pending;
510 task->ta_pending = 0;
511 return (task_is_running(queue, task) ? EBUSY : 0);
512}
513
514int
515taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
516{
517 int error;
518
519 TQ_LOCK(queue);
520 error = taskqueue_cancel_locked(queue, task, pendp);
521 TQ_UNLOCK(queue);
522
523 return (error);
524}
525
526int
527taskqueue_cancel_timeout(struct taskqueue *queue,
528 struct timeout_task *timeout_task, u_int *pendp)
529{
530 u_int pending, pending1;
531 int error;
532
533 TQ_LOCK(queue);
534 pending = !!(callout_stop(&timeout_task->c) > 0);
535 error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
536 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
537 timeout_task->f &= ~DT_CALLOUT_ARMED;
538 queue->tq_callouts--;
539 }
540 TQ_UNLOCK(queue);
541
542 if (pendp != NULL)
543 *pendp = pending + pending1;
544 return (error);
545}
546
547void
548taskqueue_drain(struct taskqueue *queue, struct task *task)
549{
550
551 if (!queue->tq_spin)
552 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
553
554 TQ_LOCK(queue);
555 while (task->ta_pending != 0 || task_is_running(queue, task))
556 TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0);
557 TQ_UNLOCK(queue);
558}
559
560void
561taskqueue_drain_all(struct taskqueue *queue)
562{
563
564 if (!queue->tq_spin)
565 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
566
567 TQ_LOCK(queue);
568 taskqueue_drain_tq_queue(queue);
569 taskqueue_drain_tq_active(queue);
570 TQ_UNLOCK(queue);
571}
572
573void
574taskqueue_drain_timeout(struct taskqueue *queue,
575 struct timeout_task *timeout_task)
576{
577
578 callout_drain(&timeout_task->c);
579 taskqueue_drain(queue, &timeout_task->t);
580}
581
582static void
583taskqueue_swi_enqueue(void *context)
584{
585 swi_sched(taskqueue_ih, 0);
586}
587
588static void
589taskqueue_swi_run(void *dummy)
590{
591 taskqueue_run(taskqueue_swi);
592}
593
594static void
595taskqueue_swi_giant_enqueue(void *context)
596{
597 swi_sched(taskqueue_giant_ih, 0);
598}
599
600static void
601taskqueue_swi_giant_run(void *dummy)
602{
603 taskqueue_run(taskqueue_swi_giant);
604}
605
606static int
607_taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
608 cpuset_t *mask, const char *name, va_list ap)
609{
610 char ktname[MAXCOMLEN + 1];
611 struct thread *td;
612 struct taskqueue *tq;
613 int i, error;
614
615 if (count <= 0)
616 return (EINVAL);
617
618 vsnprintf(ktname, sizeof(ktname), name, ap);
619 tq = *tqp;
620
621 tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
622 M_NOWAIT | M_ZERO);
623 if (tq->tq_threads == NULL) {
624 printf("%s: no memory for %s threads\n", __func__, ktname);
625 return (ENOMEM);
626 }
627
628 for (i = 0; i < count; i++) {
629 if (count == 1)
630 error = kthread_add(taskqueue_thread_loop, tqp, NULL,
631 &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
632 else
633 error = kthread_add(taskqueue_thread_loop, tqp, NULL,
634 &tq->tq_threads[i], RFSTOPPED, 0,
635 "%s_%d", ktname, i);
636 if (error) {
637 /* should be ok to continue, taskqueue_free will dtrt */
638 printf("%s: kthread_add(%s): error %d", __func__,
639 ktname, error);
640 tq->tq_threads[i] = NULL; /* paranoid */
641 } else
642 tq->tq_tcount++;
643 }
644 for (i = 0; i < count; i++) {
645 if (tq->tq_threads[i] == NULL)
646 continue;
647 td = tq->tq_threads[i];
648 if (mask) {
649 error = cpuset_setthread(td->td_tid, mask);
650 /*
651 * Failing to pin is rarely an actual fatal error;
652 * it'll just affect performance.
653 */
654 if (error)
655 printf("%s: curthread=%llu: can't pin; "
656 "error=%d\n",
657 __func__,
658 (unsigned long long) td->td_tid,
659 error);
660 }
661 thread_lock(td);
662 sched_prio(td, pri);
663 sched_add(td, SRQ_BORING);
664 thread_unlock(td);
665 }
666
667 return (0);
668}
669
670int
671taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
672 const char *name, ...)
673{
674 va_list ap;
675 int error;
676
677 va_start(ap, name);
678 error = _taskqueue_start_threads(tqp, count, pri, NULL, name, ap);
679 va_end(ap);
680 return (error);
681}
682
683int
684taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count, int pri,
685 cpuset_t *mask, const char *name, ...)
686{
687 va_list ap;
688 int error;
689
690 va_start(ap, name);
691 error = _taskqueue_start_threads(tqp, count, pri, mask, name, ap);
692 va_end(ap);
693 return (error);
694}
695
696static inline void
697taskqueue_run_callback(struct taskqueue *tq,
698 enum taskqueue_callback_type cb_type)
699{
700 taskqueue_callback_fn tq_callback;
701
702 TQ_ASSERT_UNLOCKED(tq);
703 tq_callback = tq->tq_callbacks[cb_type];
704 if (tq_callback != NULL)
705 tq_callback(tq->tq_cb_contexts[cb_type]);
706}
707
708void
709taskqueue_thread_loop(void *arg)
710{
711 struct taskqueue **tqp, *tq;
712
713 tqp = arg;
714 tq = *tqp;
715 taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
716 TQ_LOCK(tq);
717 while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
718 /* XXX ? */
719 taskqueue_run_locked(tq);
720 /*
721 * Because taskqueue_run() can drop tq_mutex, we need to
722 * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
723 * meantime, which means we missed a wakeup.
724 */
725 if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
726 break;
727 TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
728 }
729 taskqueue_run_locked(tq);
730 /*
731 * This thread is on its way out, so just drop the lock temporarily
732 * in order to call the shutdown callback. This allows the callback
733 * to look at the taskqueue, even just before it dies.
734 */
735 TQ_UNLOCK(tq);
736 taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
737 TQ_LOCK(tq);
738
739 /* rendezvous with thread that asked us to terminate */
740 tq->tq_tcount--;
741 wakeup_one(tq->tq_threads);
742 TQ_UNLOCK(tq);
743 kthread_exit();
744}
745
746void
747taskqueue_thread_enqueue(void *context)
748{
749 struct taskqueue **tqp, *tq;
750
751 tqp = context;
752 tq = *tqp;
753 wakeup_one(tq);
754}
755
756TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
757 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
758 INTR_MPSAFE, &taskqueue_ih));
759
760TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
761 swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
762 NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));
763
764TASKQUEUE_DEFINE_THREAD(thread);
765
766struct taskqueue *
767taskqueue_create_fast(const char *name, int mflags,
768 taskqueue_enqueue_fn enqueue, void *context)
769{
770 return _taskqueue_create(name, mflags, enqueue, context,
771 MTX_SPIN, "fast_taskqueue");
772}
773
774static void *taskqueue_fast_ih;
775
776static void
777taskqueue_fast_enqueue(void *context)
778{
779 swi_sched(taskqueue_fast_ih, 0);
780}
781
782static void
783taskqueue_fast_run(void *dummy)
784{
785 taskqueue_run(taskqueue_fast);
786}
787
788TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
789 swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL,
790 SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));
791
792int
793taskqueue_member(struct taskqueue *queue, struct thread *td)
794{
795 int i, j, ret = 0;
796
797 for (i = 0, j = 0; ; i++) {
798 if (queue->tq_threads[i] == NULL)
799 continue;
800 if (queue->tq_threads[i] == td) {
801 ret = 1;
802 break;
803 }
804 if (++j >= queue->tq_tcount)
805 break;
806 }
807 return (ret);
808}
809
810struct taskqgroup_cpu {
811 LIST_HEAD(, grouptask) tgc_tasks;
812 struct taskqueue *tgc_taskq;
813 int tgc_cnt;
814 int tgc_cpu;
815};
816
817struct taskqgroup {
818 struct taskqgroup_cpu tqg_queue[MAXCPU];
819 struct mtx tqg_lock;
820 char * tqg_name;
821 int tqg_adjusting;
822 int tqg_stride;
823 int tqg_cnt;
824};
825
826struct taskq_bind_task {
827 struct task bt_task;
828 int bt_cpuid;
829};
830
831static void
832taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx)
833{
834 struct taskqgroup_cpu *qcpu;
835 int i, j;
835
836 qcpu = &qgroup->tqg_queue[idx];
837 LIST_INIT(&qcpu->tgc_tasks);
838 qcpu->tgc_taskq = taskqueue_create_fast(NULL, M_WAITOK,
839 taskqueue_thread_enqueue, &qcpu->tgc_taskq);
840 taskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT,
841 "%s_%d", qgroup->tqg_name, idx);
836
837 qcpu = &qgroup->tqg_queue[idx];
838 LIST_INIT(&qcpu->tgc_tasks);
839 qcpu->tgc_taskq = taskqueue_create_fast(NULL, M_WAITOK,
840 taskqueue_thread_enqueue, &qcpu->tgc_taskq);
841 taskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT,
842 "%s_%d", qgroup->tqg_name, idx);
842 qcpu->tgc_cpu = idx * qgroup->tqg_stride;
843
844 for (i = CPU_FIRST(), j = 0; j < idx * qgroup->tqg_stride;
845 j++, i = CPU_NEXT(i)) {
846 /*
847 * Wait: evaluate the idx * qgroup->tqg_stride'th CPU,
848 * potentially wrapping the actual count
849 */
850 }
851 qcpu->tgc_cpu = i;
843}
844
845static void
846taskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx)
847{
848
849 taskqueue_free(qgroup->tqg_queue[idx].tgc_taskq);
850}
851
852/*
853 * Find the taskq with least # of tasks that doesn't currently have any
854 * other queues from the uniq identifier.
855 */
856static int
857taskqgroup_find(struct taskqgroup *qgroup, void *uniq)
858{
859 struct grouptask *n;
860 int i, idx, mincnt;
861 int strict;
862
863 mtx_assert(&qgroup->tqg_lock, MA_OWNED);
864 if (qgroup->tqg_cnt == 0)
865 return (0);
866 idx = -1;
867 mincnt = INT_MAX;
868 /*
869 * Two passes; First scan for a queue with the least tasks that
870 * does not already service this uniq id. If that fails simply find
871 * the queue with the least total tasks;
872 */
873 for (strict = 1; mincnt == INT_MAX; strict = 0) {
874 for (i = 0; i < qgroup->tqg_cnt; i++) {
875 if (qgroup->tqg_queue[i].tgc_cnt > mincnt)
876 continue;
877 if (strict) {
878 LIST_FOREACH(n,
879 &qgroup->tqg_queue[i].tgc_tasks, gt_list)
880 if (n->gt_uniq == uniq)
881 break;
882 if (n != NULL)
883 continue;
884 }
885 mincnt = qgroup->tqg_queue[i].tgc_cnt;
886 idx = i;
887 }
888 }
889 if (idx == -1)
890 panic("taskqgroup_find: Failed to pick a qid.");
891
892 return (idx);
893}
894
895void
896taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask,
897 void *uniq, int irq, char *name)
898{
899 cpuset_t mask;
900 int qid;
901
902 gtask->gt_uniq = uniq;
903 gtask->gt_name = name;
904 gtask->gt_irq = irq;
905 gtask->gt_cpu = -1;
906 mtx_lock(&qgroup->tqg_lock);
907 qid = taskqgroup_find(qgroup, uniq);
908 qgroup->tqg_queue[qid].tgc_cnt++;
909 LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
910 gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
911 if (irq != -1 && smp_started) {
912 CPU_ZERO(&mask);
913 CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
914 mtx_unlock(&qgroup->tqg_lock);
915 intr_setaffinity(irq, &mask);
916 } else
917 mtx_unlock(&qgroup->tqg_lock);
918}
919
920int
921taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask,
922 void *uniq, int cpu, int irq, char *name)
923{
924 cpuset_t mask;
925 int i, qid;
926
927 qid = -1;
928 gtask->gt_uniq = uniq;
929 gtask->gt_name = name;
930 gtask->gt_irq = irq;
931 gtask->gt_cpu = cpu;
932 mtx_lock(&qgroup->tqg_lock);
933 if (smp_started) {
934 for (i = 0; i < qgroup->tqg_cnt; i++)
935 if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
936 qid = i;
937 break;
938 }
939 if (qid == -1) {
940 mtx_unlock(&qgroup->tqg_lock);
941 return (EINVAL);
942 }
943 } else
944 qid = 0;
945 qgroup->tqg_queue[qid].tgc_cnt++;
946 LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
947 gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
948 if (irq != -1 && smp_started) {
949 CPU_ZERO(&mask);
950 CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
951 mtx_unlock(&qgroup->tqg_lock);
952 intr_setaffinity(irq, &mask);
953 } else
954 mtx_unlock(&qgroup->tqg_lock);
955 return (0);
956}
957
958void
959taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask)
960{
961 int i;
962
963 mtx_lock(&qgroup->tqg_lock);
964 for (i = 0; i < qgroup->tqg_cnt; i++)
965 if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue)
966 break;
967 if (i == qgroup->tqg_cnt)
968 panic("taskqgroup_detach: task not in group\n");
969 qgroup->tqg_queue[i].tgc_cnt--;
970 LIST_REMOVE(gtask, gt_list);
971 mtx_unlock(&qgroup->tqg_lock);
972 gtask->gt_taskqueue = NULL;
973}
974
975static void
976taskqgroup_binder(void *ctx, int pending)
977{
978 struct taskq_bind_task *task = (struct taskq_bind_task *)ctx;
979 cpuset_t mask;
980 int error;
981
982 CPU_ZERO(&mask);
983 CPU_SET(task->bt_cpuid, &mask);
984 error = cpuset_setthread(curthread->td_tid, &mask);
985 thread_lock(curthread);
986 sched_bind(curthread, task->bt_cpuid);
987 thread_unlock(curthread);
988
989 if (error)
990 printf("taskqgroup_binder: setaffinity failed: %d\n",
991 error);
992 free(task, M_DEVBUF);
993}
994
995static void
996taskqgroup_bind(struct taskqgroup *qgroup)
997{
998 struct taskq_bind_task *task;
999 int i;
1000
1001 /*
1002 * Bind taskqueue threads to specific CPUs, if they have been assigned
1003 * one.
1004 */
1005 for (i = 0; i < qgroup->tqg_cnt; i++) {
1006 task = malloc(sizeof (*task), M_DEVBUF, M_NOWAIT);
1007 TASK_INIT(&task->bt_task, 0, taskqgroup_binder, task);
1008 task->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu;
1009 taskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq,
1010 &task->bt_task);
1011 }
1012}
1013
1014static int
1015_taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
1016{
1017 LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL);
1018 cpuset_t mask;
1019 struct grouptask *gtask;
852}
853
854static void
855taskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx)
856{
857
858 taskqueue_free(qgroup->tqg_queue[idx].tgc_taskq);
859}
860
861/*
862 * Find the taskq with least # of tasks that doesn't currently have any
863 * other queues from the uniq identifier.
864 */
865static int
866taskqgroup_find(struct taskqgroup *qgroup, void *uniq)
867{
868 struct grouptask *n;
869 int i, idx, mincnt;
870 int strict;
871
872 mtx_assert(&qgroup->tqg_lock, MA_OWNED);
873 if (qgroup->tqg_cnt == 0)
874 return (0);
875 idx = -1;
876 mincnt = INT_MAX;
877 /*
878 * Two passes; First scan for a queue with the least tasks that
879 * does not already service this uniq id. If that fails simply find
880 * the queue with the least total tasks;
881 */
882 for (strict = 1; mincnt == INT_MAX; strict = 0) {
883 for (i = 0; i < qgroup->tqg_cnt; i++) {
884 if (qgroup->tqg_queue[i].tgc_cnt > mincnt)
885 continue;
886 if (strict) {
887 LIST_FOREACH(n,
888 &qgroup->tqg_queue[i].tgc_tasks, gt_list)
889 if (n->gt_uniq == uniq)
890 break;
891 if (n != NULL)
892 continue;
893 }
894 mincnt = qgroup->tqg_queue[i].tgc_cnt;
895 idx = i;
896 }
897 }
898 if (idx == -1)
899 panic("taskqgroup_find: Failed to pick a qid.");
900
901 return (idx);
902}
903
904void
905taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask,
906 void *uniq, int irq, char *name)
907{
908 cpuset_t mask;
909 int qid;
910
911 gtask->gt_uniq = uniq;
912 gtask->gt_name = name;
913 gtask->gt_irq = irq;
914 gtask->gt_cpu = -1;
915 mtx_lock(&qgroup->tqg_lock);
916 qid = taskqgroup_find(qgroup, uniq);
917 qgroup->tqg_queue[qid].tgc_cnt++;
918 LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
919 gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
920 if (irq != -1 && smp_started) {
921 CPU_ZERO(&mask);
922 CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
923 mtx_unlock(&qgroup->tqg_lock);
924 intr_setaffinity(irq, &mask);
925 } else
926 mtx_unlock(&qgroup->tqg_lock);
927}
928
929int
930taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask,
931 void *uniq, int cpu, int irq, char *name)
932{
933 cpuset_t mask;
934 int i, qid;
935
936 qid = -1;
937 gtask->gt_uniq = uniq;
938 gtask->gt_name = name;
939 gtask->gt_irq = irq;
940 gtask->gt_cpu = cpu;
941 mtx_lock(&qgroup->tqg_lock);
942 if (smp_started) {
943 for (i = 0; i < qgroup->tqg_cnt; i++)
944 if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
945 qid = i;
946 break;
947 }
948 if (qid == -1) {
949 mtx_unlock(&qgroup->tqg_lock);
950 return (EINVAL);
951 }
952 } else
953 qid = 0;
954 qgroup->tqg_queue[qid].tgc_cnt++;
955 LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
956 gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
957 if (irq != -1 && smp_started) {
958 CPU_ZERO(&mask);
959 CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
960 mtx_unlock(&qgroup->tqg_lock);
961 intr_setaffinity(irq, &mask);
962 } else
963 mtx_unlock(&qgroup->tqg_lock);
964 return (0);
965}
966
967void
968taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask)
969{
970 int i;
971
972 mtx_lock(&qgroup->tqg_lock);
973 for (i = 0; i < qgroup->tqg_cnt; i++)
974 if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue)
975 break;
976 if (i == qgroup->tqg_cnt)
977 panic("taskqgroup_detach: task not in group\n");
978 qgroup->tqg_queue[i].tgc_cnt--;
979 LIST_REMOVE(gtask, gt_list);
980 mtx_unlock(&qgroup->tqg_lock);
981 gtask->gt_taskqueue = NULL;
982}
983
984static void
985taskqgroup_binder(void *ctx, int pending)
986{
987 struct taskq_bind_task *task = (struct taskq_bind_task *)ctx;
988 cpuset_t mask;
989 int error;
990
991 CPU_ZERO(&mask);
992 CPU_SET(task->bt_cpuid, &mask);
993 error = cpuset_setthread(curthread->td_tid, &mask);
994 thread_lock(curthread);
995 sched_bind(curthread, task->bt_cpuid);
996 thread_unlock(curthread);
997
998 if (error)
999 printf("taskqgroup_binder: setaffinity failed: %d\n",
1000 error);
1001 free(task, M_DEVBUF);
1002}
1003
1004static void
1005taskqgroup_bind(struct taskqgroup *qgroup)
1006{
1007 struct taskq_bind_task *task;
1008 int i;
1009
1010 /*
1011 * Bind taskqueue threads to specific CPUs, if they have been assigned
1012 * one.
1013 */
1014 for (i = 0; i < qgroup->tqg_cnt; i++) {
1015 task = malloc(sizeof (*task), M_DEVBUF, M_NOWAIT);
1016 TASK_INIT(&task->bt_task, 0, taskqgroup_binder, task);
1017 task->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu;
1018 taskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq,
1019 &task->bt_task);
1020 }
1021}
1022
1023static int
1024_taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
1025{
1026 LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL);
1027 cpuset_t mask;
1028 struct grouptask *gtask;
1020 int i, old_cnt, qid;
1029 int i, k, old_cnt, qid, cpu;
1021
1022 mtx_assert(&qgroup->tqg_lock, MA_OWNED);
1023
1024 if (cnt < 1 || cnt * stride > mp_ncpus || !smp_started) {
1030
1031 mtx_assert(&qgroup->tqg_lock, MA_OWNED);
1032
1033 if (cnt < 1 || cnt * stride > mp_ncpus || !smp_started) {
1025 printf("taskqgroup_adjust failed cnt: %d stride: %d mp_ncpus: %d smp_started: %d\n",
1026 cnt, stride, mp_ncpus, smp_started);
1034 printf("taskqgroup_adjust failed cnt: %d stride: %d "
1035 "mp_ncpus: %d smp_started: %d\n", cnt, stride, mp_ncpus,
1036 smp_started);
1027 return (EINVAL);
1028 }
1029 if (qgroup->tqg_adjusting) {
1030 printf("taskqgroup_adjust failed: adjusting\n");
1031 return (EBUSY);
1032 }
1033 qgroup->tqg_adjusting = 1;
1034 old_cnt = qgroup->tqg_cnt;
1035 mtx_unlock(&qgroup->tqg_lock);
1036 /*
1037 * Set up queue for tasks added before boot.
1038 */
1039 if (old_cnt == 0) {
1040 LIST_SWAP(&gtask_head, &qgroup->tqg_queue[0].tgc_tasks,
1041 grouptask, gt_list);
1042 qgroup->tqg_queue[0].tgc_cnt = 0;
1043 }
1044
1045 /*
1046 * If new taskq threads have been added.
1047 */
1048 for (i = old_cnt; i < cnt; i++)
1049 taskqgroup_cpu_create(qgroup, i);
1050 mtx_lock(&qgroup->tqg_lock);
1051 qgroup->tqg_cnt = cnt;
1052 qgroup->tqg_stride = stride;
1053
1054 /*
1055 * Adjust drivers to use new taskqs.
1056 */
1057 for (i = 0; i < old_cnt; i++) {
1058 while ((gtask = LIST_FIRST(&qgroup->tqg_queue[i].tgc_tasks))) {
1059 LIST_REMOVE(gtask, gt_list);
1060 qgroup->tqg_queue[i].tgc_cnt--;
1061 LIST_INSERT_HEAD(&gtask_head, gtask, gt_list);
1062 }
1063 }
1064
1065 while ((gtask = LIST_FIRST(&gtask_head))) {
1066 LIST_REMOVE(gtask, gt_list);
1067 if (gtask->gt_cpu == -1)
1068 qid = taskqgroup_find(qgroup, gtask->gt_uniq);
1069 else {
1070 for (i = 0; i < qgroup->tqg_cnt; i++)
1071 if (qgroup->tqg_queue[i].tgc_cpu == gtask->gt_cpu) {
1072 qid = i;
1073 break;
1074 }
1075 }
1076 qgroup->tqg_queue[qid].tgc_cnt++;
1077 LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask,
1078 gt_list);
1079 gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
1080 }
1081 /*
1082 * Set new CPU and IRQ affinity
1083 */
1037 return (EINVAL);
1038 }
1039 if (qgroup->tqg_adjusting) {
1040 printf("taskqgroup_adjust failed: adjusting\n");
1041 return (EBUSY);
1042 }
1043 qgroup->tqg_adjusting = 1;
1044 old_cnt = qgroup->tqg_cnt;
1045 mtx_unlock(&qgroup->tqg_lock);
1046 /*
1047 * Set up queue for tasks added before boot.
1048 */
1049 if (old_cnt == 0) {
1050 LIST_SWAP(&gtask_head, &qgroup->tqg_queue[0].tgc_tasks,
1051 grouptask, gt_list);
1052 qgroup->tqg_queue[0].tgc_cnt = 0;
1053 }
1054
1055 /*
1056 * If new taskq threads have been added.
1057 */
1058 for (i = old_cnt; i < cnt; i++)
1059 taskqgroup_cpu_create(qgroup, i);
1060 mtx_lock(&qgroup->tqg_lock);
1061 qgroup->tqg_cnt = cnt;
1062 qgroup->tqg_stride = stride;
1063
1064 /*
1065 * Adjust drivers to use new taskqs.
1066 */
1067 for (i = 0; i < old_cnt; i++) {
1068 while ((gtask = LIST_FIRST(&qgroup->tqg_queue[i].tgc_tasks))) {
1069 LIST_REMOVE(gtask, gt_list);
1070 qgroup->tqg_queue[i].tgc_cnt--;
1071 LIST_INSERT_HEAD(&gtask_head, gtask, gt_list);
1072 }
1073 }
1074
1075 while ((gtask = LIST_FIRST(&gtask_head))) {
1076 LIST_REMOVE(gtask, gt_list);
1077 if (gtask->gt_cpu == -1)
1078 qid = taskqgroup_find(qgroup, gtask->gt_uniq);
1079 else {
1080 for (i = 0; i < qgroup->tqg_cnt; i++)
1081 if (qgroup->tqg_queue[i].tgc_cpu == gtask->gt_cpu) {
1082 qid = i;
1083 break;
1084 }
1085 }
1086 qgroup->tqg_queue[qid].tgc_cnt++;
1087 LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask,
1088 gt_list);
1089 gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
1090 }
1091 /*
1092 * Set new CPU and IRQ affinity
1093 */
1094 cpu = CPU_FIRST();
1084 for (i = 0; i < cnt; i++) {
1095 for (i = 0; i < cnt; i++) {
1085 qgroup->tqg_queue[i].tgc_cpu = i * qgroup->tqg_stride;
1096 qgroup->tqg_queue[i].tgc_cpu = cpu;
1097 for (k = 0; k < qgroup->tqg_stride; k++)
1098 cpu = CPU_NEXT(cpu);
1086 CPU_ZERO(&mask);
1087 CPU_SET(qgroup->tqg_queue[i].tgc_cpu, &mask);
1088 LIST_FOREACH(gtask, &qgroup->tqg_queue[i].tgc_tasks, gt_list) {
1089 if (gtask->gt_irq == -1)
1090 continue;
1091 intr_setaffinity(gtask->gt_irq, &mask);
1092 }
1093 }
1094 mtx_unlock(&qgroup->tqg_lock);
1095
1096 /*
1097 * If taskq thread count has been reduced.
1098 */
1099 for (i = cnt; i < old_cnt; i++)
1100 taskqgroup_cpu_remove(qgroup, i);
1101
1102 mtx_lock(&qgroup->tqg_lock);
1103 qgroup->tqg_adjusting = 0;
1104
1105 taskqgroup_bind(qgroup);
1106
1107 return (0);
1108}
1109
1110int
1111taskqgroup_adjust(struct taskqgroup *qgroup, int cpu, int stride)
1112{
1113 int error;
1114
1115 mtx_lock(&qgroup->tqg_lock);
1116 error = _taskqgroup_adjust(qgroup, cpu, stride);
1117 mtx_unlock(&qgroup->tqg_lock);
1118
1119 return (error);
1120}
1121
1122struct taskqgroup *
1123taskqgroup_create(char *name)
1124{
1125 struct taskqgroup *qgroup;
1126
1127 qgroup = malloc(sizeof(*qgroup), M_TASKQUEUE, M_WAITOK | M_ZERO);
1128 mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF);
1129 qgroup->tqg_name = name;
1130 LIST_INIT(&qgroup->tqg_queue[0].tgc_tasks);
1131
1132 return (qgroup);
1133}
1134
1135void
1136taskqgroup_destroy(struct taskqgroup *qgroup)
1137{
1138
1139}
1099 CPU_ZERO(&mask);
1100 CPU_SET(qgroup->tqg_queue[i].tgc_cpu, &mask);
1101 LIST_FOREACH(gtask, &qgroup->tqg_queue[i].tgc_tasks, gt_list) {
1102 if (gtask->gt_irq == -1)
1103 continue;
1104 intr_setaffinity(gtask->gt_irq, &mask);
1105 }
1106 }
1107 mtx_unlock(&qgroup->tqg_lock);
1108
1109 /*
1110 * If taskq thread count has been reduced.
1111 */
1112 for (i = cnt; i < old_cnt; i++)
1113 taskqgroup_cpu_remove(qgroup, i);
1114
1115 mtx_lock(&qgroup->tqg_lock);
1116 qgroup->tqg_adjusting = 0;
1117
1118 taskqgroup_bind(qgroup);
1119
1120 return (0);
1121}
1122
1123int
1124taskqgroup_adjust(struct taskqgroup *qgroup, int cpu, int stride)
1125{
1126 int error;
1127
1128 mtx_lock(&qgroup->tqg_lock);
1129 error = _taskqgroup_adjust(qgroup, cpu, stride);
1130 mtx_unlock(&qgroup->tqg_lock);
1131
1132 return (error);
1133}
1134
1135struct taskqgroup *
1136taskqgroup_create(char *name)
1137{
1138 struct taskqgroup *qgroup;
1139
1140 qgroup = malloc(sizeof(*qgroup), M_TASKQUEUE, M_WAITOK | M_ZERO);
1141 mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF);
1142 qgroup->tqg_name = name;
1143 LIST_INIT(&qgroup->tqg_queue[0].tgc_tasks);
1144
1145 return (qgroup);
1146}
1147
1148void
1149taskqgroup_destroy(struct taskqgroup *qgroup)
1150{
1151
1152}