1238106Sdes/*- 2238106Sdes * Copyright (c) 2000 Doug Rabson 3238106Sdes * Copyright (c) 2014 Jeff Roberson 4238106Sdes * Copyright (c) 2016 Matthew Macy 5238106Sdes * All rights reserved. 6238106Sdes * 7238106Sdes * Redistribution and use in source and binary forms, with or without 8238106Sdes * modification, are permitted provided that the following conditions 9238106Sdes * are met: 10238106Sdes * 1. Redistributions of source code must retain the above copyright 11238106Sdes * notice, this list of conditions and the following disclaimer. 12238106Sdes * 2. Redistributions in binary form must reproduce the above copyright 13238106Sdes * notice, this list of conditions and the following disclaimer in the 14238106Sdes * documentation and/or other materials provided with the distribution. 15238106Sdes * 16238106Sdes * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 17238106Sdes * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18238106Sdes * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19238106Sdes * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 20238106Sdes * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21238106Sdes * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22238106Sdes * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23238106Sdes * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24269257Sdes * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25269257Sdes * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26269257Sdes * SUCH DAMAGE. 27269257Sdes */ 28269257Sdes 29269257Sdes#include <sys/cdefs.h> 30269257Sdes__FBSDID("$FreeBSD: releng/11.0/sys/kern/subr_gtaskqueue.c 305267 2016-09-02 01:41:57Z nwhitehorn $"); 31269257Sdes 32269257Sdes#include <sys/param.h> 33269257Sdes#include <sys/systm.h> 34238106Sdes#include <sys/bus.h> 35238106Sdes#include <sys/cpuset.h> 36238106Sdes#include <sys/interrupt.h> 37238106Sdes#include <sys/kernel.h> 38238106Sdes#include <sys/kthread.h> 39238106Sdes#include <sys/libkern.h> 40238106Sdes#include <sys/limits.h> 41285206Sdes#include <sys/lock.h> 42285206Sdes#include <sys/malloc.h> 43238106Sdes#include <sys/mutex.h> 44238106Sdes#include <sys/proc.h> 45238106Sdes#include <sys/sched.h> 46238106Sdes#include <sys/smp.h> 47238106Sdes#include <sys/gtaskqueue.h> 48238106Sdes#include <sys/unistd.h> 49285206Sdes#include <machine/stdarg.h> 50285206Sdes 51285206Sdesstatic MALLOC_DEFINE(M_GTASKQUEUE, "taskqueue", "Task Queues"); 52285206Sdesstatic void gtaskqueue_thread_enqueue(void *); 53238106Sdesstatic void gtaskqueue_thread_loop(void *arg); 54238106Sdes 55238106Sdes 56238106Sdesstruct gtaskqueue_busy { 57238106Sdes struct gtask *tb_running; 58238106Sdes TAILQ_ENTRY(gtaskqueue_busy) tb_link; 59238106Sdes}; 60238106Sdes 61238106Sdesstatic struct gtask * const TB_DRAIN_WAITER = (struct gtask *)0x1; 62238106Sdes 63238106Sdesstruct gtaskqueue { 64238106Sdes STAILQ_HEAD(, gtask) tq_queue; 65238106Sdes gtaskqueue_enqueue_fn tq_enqueue; 66238106Sdes void *tq_context; 67238106Sdes char *tq_name; 68238106Sdes TAILQ_HEAD(, gtaskqueue_busy) tq_active; 69238106Sdes struct mtx tq_mutex; 70238106Sdes struct thread **tq_threads; 71238106Sdes int tq_tcount; 72238106Sdes int tq_spin; 73238106Sdes int tq_flags; 74238106Sdes int tq_callouts; 75238106Sdes taskqueue_callback_fn tq_callbacks[TASKQUEUE_NUM_CALLBACKS]; 76238106Sdes void *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS]; 77238106Sdes}; 78238106Sdes 79238106Sdes#define TQ_FLAGS_ACTIVE (1 << 0) 80238106Sdes#define TQ_FLAGS_BLOCKED (1 << 1) 81291767Sdes#define TQ_FLAGS_UNLOCKED_ENQUEUE (1 << 2) 82291767Sdes 83291767Sdes#define DT_CALLOUT_ARMED (1 << 0) 84291767Sdes 85238106Sdes#define TQ_LOCK(tq) \ 86238106Sdes do { \ 87238106Sdes if ((tq)->tq_spin) \ 88238106Sdes mtx_lock_spin(&(tq)->tq_mutex); \ 89285206Sdes else \ 90285206Sdes mtx_lock(&(tq)->tq_mutex); \ 91285206Sdes } while (0) 92238106Sdes#define TQ_ASSERT_LOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_OWNED) 93238106Sdes 94238106Sdes#define TQ_UNLOCK(tq) \ 95238106Sdes do { \ 96238106Sdes if ((tq)->tq_spin) \ 97238106Sdes mtx_unlock_spin(&(tq)->tq_mutex); \ 98238106Sdes else \ 99238106Sdes mtx_unlock(&(tq)->tq_mutex); \ 100238106Sdes } while (0) 101238106Sdes#define TQ_ASSERT_UNLOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED) 102238106Sdes 103238106Sdesstatic __inline int 104238106SdesTQ_SLEEP(struct gtaskqueue *tq, void *p, struct mtx *m, int pri, const char *wm, 105238106Sdes int t) 106238106Sdes{ 107238106Sdes if (tq->tq_spin) 108238106Sdes return (msleep_spin(p, m, wm, t)); 109238106Sdes return (msleep(p, m, pri, wm, t)); 110238106Sdes} 111238106Sdes 112238106Sdesstatic struct gtaskqueue * 113238106Sdes_gtaskqueue_create(const char *name, int mflags, 114238106Sdes taskqueue_enqueue_fn enqueue, void *context, 115238106Sdes int mtxflags, const char *mtxname __unused) 116238106Sdes{ 117238106Sdes struct gtaskqueue *queue; 118238106Sdes char *tq_name; 119238106Sdes 120238106Sdes tq_name = malloc(TASKQUEUE_NAMELEN, M_GTASKQUEUE, mflags | M_ZERO); 121238106Sdes if (!tq_name) 122238106Sdes return (NULL); 123238106Sdes 124238106Sdes snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue"); 125238106Sdes 126238106Sdes queue = malloc(sizeof(struct gtaskqueue), M_GTASKQUEUE, mflags | M_ZERO); 127238106Sdes if (!queue) 128238106Sdes return (NULL); 129238106Sdes 130238106Sdes STAILQ_INIT(&queue->tq_queue); 131238106Sdes TAILQ_INIT(&queue->tq_active); 132238106Sdes queue->tq_enqueue = enqueue; 133238106Sdes queue->tq_context = context; 134238106Sdes queue->tq_name = tq_name; 135238106Sdes queue->tq_spin = (mtxflags & MTX_SPIN) != 0; 136238106Sdes queue->tq_flags |= TQ_FLAGS_ACTIVE; 137238106Sdes if (enqueue == gtaskqueue_thread_enqueue) 138238106Sdes queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE; 139238106Sdes mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags); 140238106Sdes 141285206Sdes return (queue); 142285206Sdes} 143291767Sdes 144291767Sdes 145285206Sdes/* 146285206Sdes * Signal a taskqueue thread to terminate. 147291767Sdes */ 148285206Sdesstatic void 149291767Sdesgtaskqueue_terminate(struct thread **pp, struct gtaskqueue *tq) 150291767Sdes{ 151291767Sdes 152291767Sdes while (tq->tq_tcount > 0 || tq->tq_callouts > 0) { 153291767Sdes wakeup(tq); 154291767Sdes TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0); 155291767Sdes } 156291767Sdes} 157291767Sdes 158291767Sdesstatic void 159291767Sdesgtaskqueue_free(struct gtaskqueue *queue) 160291767Sdes{ 161291767Sdes 162291767Sdes TQ_LOCK(queue); 163291767Sdes queue->tq_flags &= ~TQ_FLAGS_ACTIVE; 164291767Sdes gtaskqueue_terminate(queue->tq_threads, queue); 165291767Sdes KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?")); 166291767Sdes KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks")); 167291767Sdes mtx_destroy(&queue->tq_mutex); 168291767Sdes free(queue->tq_threads, M_GTASKQUEUE); 169291767Sdes free(queue->tq_name, M_GTASKQUEUE); 170291767Sdes free(queue, M_GTASKQUEUE); 171291767Sdes} 172291767Sdes 173291767Sdesint 174291767Sdesgrouptaskqueue_enqueue(struct gtaskqueue *queue, struct gtask *gtask) 175285206Sdes{ 176285206Sdes TQ_LOCK(queue); 177285206Sdes if (gtask->ta_flags & TASK_ENQUEUED) { 178291767Sdes TQ_UNLOCK(queue); 179291767Sdes return (0); 180285206Sdes } 181291767Sdes STAILQ_INSERT_TAIL(&queue->tq_queue, gtask, ta_link); 182285206Sdes gtask->ta_flags |= TASK_ENQUEUED; 183285206Sdes TQ_UNLOCK(queue); 184285206Sdes if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) 185285206Sdes queue->tq_enqueue(queue->tq_context); 186285206Sdes return (0); 187238106Sdes} 188238106Sdes 189238106Sdesstatic void 190238106Sdesgtaskqueue_task_nop_fn(void *context) 191238106Sdes{ 192238106Sdes} 193238106Sdes 194238106Sdes/* 195238106Sdes * Block until all currently queued tasks in this taskqueue 196238106Sdes * have begun execution. Tasks queued during execution of 197238106Sdes * this function are ignored. 198238106Sdes */ 199238106Sdesstatic void 200238106Sdesgtaskqueue_drain_tq_queue(struct gtaskqueue *queue) 201238106Sdes{ 202238106Sdes struct gtask t_barrier; 203238106Sdes 204238106Sdes if (STAILQ_EMPTY(&queue->tq_queue)) 205238106Sdes return; 206238106Sdes 207238106Sdes /* 208238106Sdes * Enqueue our barrier after all current tasks, but with 209238106Sdes * the highest priority so that newly queued tasks cannot 210285206Sdes * pass it. Because of the high priority, we can not use 211294190Sdes * taskqueue_enqueue_locked directly (which drops the lock 212294190Sdes * anyway) so just insert it at tail while we have the 213238106Sdes * queue lock. 214238106Sdes */ 215238106Sdes GTASK_INIT(&t_barrier, 0, USHRT_MAX, gtaskqueue_task_nop_fn, &t_barrier); 216238106Sdes STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link); 217294190Sdes t_barrier.ta_flags |= TASK_ENQUEUED; 218294190Sdes 219285206Sdes /* 220285206Sdes * Once the barrier has executed, all previously queued tasks 221285206Sdes * have completed or are currently executing. 222285206Sdes */ 223285206Sdes while (t_barrier.ta_flags & TASK_ENQUEUED) 224285206Sdes TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0); 225285206Sdes} 226285206Sdes 227285206Sdes/* 228285206Sdes * Block until all currently executing tasks for this taskqueue 229285206Sdes * complete. Tasks that begin execution during the execution 230285206Sdes * of this function are ignored. 231285206Sdes */ 232285206Sdesstatic void 233285206Sdesgtaskqueue_drain_tq_active(struct gtaskqueue *queue) 234291767Sdes{ 235285206Sdes struct gtaskqueue_busy tb_marker, *tb_first; 236285206Sdes 237285206Sdes if (TAILQ_EMPTY(&queue->tq_active)) 238285206Sdes return; 239285206Sdes 240285206Sdes /* Block taskq_terminate().*/ 241238106Sdes queue->tq_callouts++; 242238106Sdes 243238106Sdes /* 244238106Sdes * Wait for all currently executing taskqueue threads 245238106Sdes * to go idle. 246238106Sdes */ 247238106Sdes tb_marker.tb_running = TB_DRAIN_WAITER; 248291767Sdes TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link); 249238106Sdes while (TAILQ_FIRST(&queue->tq_active) != &tb_marker) 250291767Sdes TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0); 251238106Sdes TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link); 252238106Sdes 253238106Sdes /* 254238106Sdes * Wakeup any other drain waiter that happened to queue up 255238106Sdes * without any intervening active thread. 256238106Sdes */ 257238106Sdes tb_first = TAILQ_FIRST(&queue->tq_active); 258238106Sdes if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER) 259238106Sdes wakeup(tb_first); 260238106Sdes 261238106Sdes /* Release taskqueue_terminate(). */ 262238106Sdes queue->tq_callouts--; 263291767Sdes if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) 264291767Sdes wakeup_one(queue->tq_threads); 265291767Sdes} 266291767Sdes 267291767Sdesvoid 268291767Sdesgtaskqueue_block(struct gtaskqueue *queue) 269291767Sdes{ 270291767Sdes 271291767Sdes TQ_LOCK(queue); 272291767Sdes queue->tq_flags |= TQ_FLAGS_BLOCKED; 273291767Sdes TQ_UNLOCK(queue); 274291767Sdes} 275291767Sdes 276291767Sdesvoid 277291767Sdesgtaskqueue_unblock(struct gtaskqueue *queue) 278291767Sdes{ 279291767Sdes 280238106Sdes TQ_LOCK(queue); 281238106Sdes queue->tq_flags &= ~TQ_FLAGS_BLOCKED; 282238106Sdes if (!STAILQ_EMPTY(&queue->tq_queue)) 283238106Sdes queue->tq_enqueue(queue->tq_context); 284238106Sdes TQ_UNLOCK(queue); 285238106Sdes} 286238106Sdes 287238106Sdesstatic void 288238106Sdesgtaskqueue_run_locked(struct gtaskqueue *queue) 289238106Sdes{ 290238106Sdes struct gtaskqueue_busy tb; 291238106Sdes struct gtaskqueue_busy *tb_first; 292238106Sdes struct gtask *gtask; 293238106Sdes 294238106Sdes KASSERT(queue != NULL, ("tq is NULL")); 295238106Sdes TQ_ASSERT_LOCKED(queue); 296238106Sdes tb.tb_running = NULL; 297238106Sdes 298238106Sdes while (STAILQ_FIRST(&queue->tq_queue)) { 299238106Sdes TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link); 300238106Sdes 301238106Sdes /* 302238106Sdes * Carefully remove the first task from the queue and 303238106Sdes * clear its TASK_ENQUEUED flag 304238106Sdes */ 305238106Sdes gtask = STAILQ_FIRST(&queue->tq_queue); 306238106Sdes KASSERT(gtask != NULL, ("task is NULL")); 307238106Sdes STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 308238106Sdes gtask->ta_flags &= ~TASK_ENQUEUED; 309238106Sdes tb.tb_running = gtask; 310238106Sdes TQ_UNLOCK(queue); 311238106Sdes 312238106Sdes KASSERT(gtask->ta_func != NULL, ("task->ta_func is NULL")); 313238106Sdes gtask->ta_func(gtask->ta_context); 314238106Sdes 315238106Sdes TQ_LOCK(queue); 316238106Sdes tb.tb_running = NULL; 317238106Sdes wakeup(gtask); 318238106Sdes 319238106Sdes TAILQ_REMOVE(&queue->tq_active, &tb, tb_link); 320238106Sdes tb_first = TAILQ_FIRST(&queue->tq_active); 321238106Sdes if (tb_first != NULL && 322238106Sdes tb_first->tb_running == TB_DRAIN_WAITER) 323238106Sdes wakeup(tb_first); 324238106Sdes } 325238106Sdes} 326238106Sdes 327238106Sdesstatic int 328238106Sdestask_is_running(struct gtaskqueue *queue, struct gtask *gtask) 329238106Sdes{ 330238106Sdes struct gtaskqueue_busy *tb; 331238106Sdes 332238106Sdes TQ_ASSERT_LOCKED(queue); 333238106Sdes TAILQ_FOREACH(tb, &queue->tq_active, tb_link) { 334285206Sdes if (tb->tb_running == gtask) 335238106Sdes return (1); 336238106Sdes } 337238106Sdes return (0); 338285206Sdes} 339285206Sdes 340238106Sdesstatic int 341238106Sdesgtaskqueue_cancel_locked(struct gtaskqueue *queue, struct gtask *gtask) 342238106Sdes{ 343238106Sdes 344238106Sdes if (gtask->ta_flags & TASK_ENQUEUED) 345238106Sdes STAILQ_REMOVE(&queue->tq_queue, gtask, gtask, ta_link); 346238106Sdes gtask->ta_flags &= ~TASK_ENQUEUED; 347238106Sdes return (task_is_running(queue, gtask) ? EBUSY : 0); 348238106Sdes} 349238106Sdes 350285206Sdesint 351285206Sdesgtaskqueue_cancel(struct gtaskqueue *queue, struct gtask *gtask) 352285206Sdes{ 353285206Sdes int error; 354285206Sdes 355285206Sdes TQ_LOCK(queue); 356285206Sdes error = gtaskqueue_cancel_locked(queue, gtask); 357285206Sdes TQ_UNLOCK(queue); 358285206Sdes 359285206Sdes return (error); 360285206Sdes} 361285206Sdes 362285206Sdesvoid 363285206Sdesgtaskqueue_drain(struct gtaskqueue *queue, struct gtask *gtask) 364285206Sdes{ 365285206Sdes 366285206Sdes if (!queue->tq_spin) 367285206Sdes WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); 368285206Sdes 369285206Sdes TQ_LOCK(queue); 370285206Sdes while ((gtask->ta_flags & TASK_ENQUEUED) || task_is_running(queue, gtask)) 371285206Sdes TQ_SLEEP(queue, gtask, &queue->tq_mutex, PWAIT, "-", 0); 372285206Sdes TQ_UNLOCK(queue); 373238106Sdes} 374285206Sdes 375285206Sdesvoid 376285206Sdesgtaskqueue_drain_all(struct gtaskqueue *queue) 377285206Sdes{ 378238106Sdes 379285206Sdes if (!queue->tq_spin) 380285206Sdes WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); 381238106Sdes 382285206Sdes TQ_LOCK(queue); 383238106Sdes gtaskqueue_drain_tq_queue(queue); 384285206Sdes gtaskqueue_drain_tq_active(queue); 385238106Sdes TQ_UNLOCK(queue); 386238106Sdes} 387285206Sdes 388285206Sdesstatic int 389285206Sdes_gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri, 390285206Sdes cpuset_t *mask, const char *name, va_list ap) 391291767Sdes{ 392291767Sdes char ktname[MAXCOMLEN + 1]; 393285206Sdes struct thread *td; 394238106Sdes struct gtaskqueue *tq; 395238106Sdes int i, error; 396238106Sdes 397238106Sdes if (count <= 0) 398238106Sdes return (EINVAL); 399238106Sdes 400238106Sdes vsnprintf(ktname, sizeof(ktname), name, ap); 401238106Sdes tq = *tqp; 402238106Sdes 403238106Sdes tq->tq_threads = malloc(sizeof(struct thread *) * count, M_GTASKQUEUE, 404238106Sdes M_NOWAIT | M_ZERO); 405238106Sdes if (tq->tq_threads == NULL) { 406238106Sdes printf("%s: no memory for %s threads\n", __func__, ktname); 407238106Sdes return (ENOMEM); 408238106Sdes } 409238106Sdes 410238106Sdes for (i = 0; i < count; i++) { 411238106Sdes if (count == 1) 412238106Sdes error = kthread_add(gtaskqueue_thread_loop, tqp, NULL, 413238106Sdes &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname); 414238106Sdes else 415238106Sdes error = kthread_add(gtaskqueue_thread_loop, tqp, NULL, 416238106Sdes &tq->tq_threads[i], RFSTOPPED, 0, 417238106Sdes "%s_%d", ktname, i); 418238106Sdes if (error) { 419238106Sdes /* should be ok to continue, taskqueue_free will dtrt */ 420238106Sdes printf("%s: kthread_add(%s): error %d", __func__, 421238106Sdes ktname, error); 422238106Sdes tq->tq_threads[i] = NULL; /* paranoid */ 423238106Sdes } else 424238106Sdes tq->tq_tcount++; 425238106Sdes } 426238106Sdes for (i = 0; i < count; i++) { 427238106Sdes if (tq->tq_threads[i] == NULL) 428238106Sdes continue; 429238106Sdes td = tq->tq_threads[i]; 430238106Sdes if (mask) { 431238106Sdes error = cpuset_setthread(td->td_tid, mask); 432285206Sdes /* 433238106Sdes * Failing to pin is rarely an actual fatal error; 434238106Sdes * it'll just affect performance. 435238106Sdes */ 436238106Sdes if (error) 437238106Sdes printf("%s: curthread=%llu: can't pin; " 438238106Sdes "error=%d\n", 439238106Sdes __func__, 440285206Sdes (unsigned long long) td->td_tid, 441238106Sdes error); 442238106Sdes } 443238106Sdes thread_lock(td); 444238106Sdes sched_prio(td, pri); 445285206Sdes sched_add(td, SRQ_BORING); 446238106Sdes thread_unlock(td); 447238106Sdes } 448238106Sdes 449238106Sdes return (0); 450238106Sdes} 451238106Sdes 452238106Sdesstatic int 453238106Sdesgtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri, 454238106Sdes const char *name, ...) 455238106Sdes{ 456238106Sdes va_list ap; 457238106Sdes int error; 458238106Sdes 459238106Sdes va_start(ap, name); 460238106Sdes error = _gtaskqueue_start_threads(tqp, count, pri, NULL, name, ap); 461238106Sdes va_end(ap); 462238106Sdes return (error); 463238106Sdes} 464238106Sdes 465238106Sdesstatic inline void 466238106Sdesgtaskqueue_run_callback(struct gtaskqueue *tq, 467238106Sdes enum taskqueue_callback_type cb_type) 468238106Sdes{ 469238106Sdes taskqueue_callback_fn tq_callback; 470238106Sdes 471238106Sdes TQ_ASSERT_UNLOCKED(tq); 472238106Sdes tq_callback = tq->tq_callbacks[cb_type]; 473238106Sdes if (tq_callback != NULL) 474238106Sdes tq_callback(tq->tq_cb_contexts[cb_type]); 475238106Sdes} 476238106Sdes 477238106Sdesstatic void 478238106Sdesgtaskqueue_thread_loop(void *arg) 479238106Sdes{ 480238106Sdes struct gtaskqueue **tqp, *tq; 481238106Sdes 482238106Sdes tqp = arg; 483238106Sdes tq = *tqp; 484238106Sdes gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT); 485238106Sdes TQ_LOCK(tq); 486238106Sdes while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) { 487238106Sdes /* XXX ? */ 488238106Sdes gtaskqueue_run_locked(tq); 489238106Sdes /* 490238106Sdes * Because taskqueue_run() can drop tq_mutex, we need to 491238106Sdes * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the 492238106Sdes * meantime, which means we missed a wakeup. 493238106Sdes */ 494238106Sdes if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0) 495238106Sdes break; 496238106Sdes TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0); 497238106Sdes } 498238106Sdes gtaskqueue_run_locked(tq); 499238106Sdes /* 500238106Sdes * This thread is on its way out, so just drop the lock temporarily 501238106Sdes * in order to call the shutdown callback. This allows the callback 502238106Sdes * to look at the taskqueue, even just before it dies. 503238106Sdes */ 504238106Sdes TQ_UNLOCK(tq); 505238106Sdes gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN); 506238106Sdes TQ_LOCK(tq); 507238106Sdes 508238106Sdes /* rendezvous with thread that asked us to terminate */ 509238106Sdes tq->tq_tcount--; 510238106Sdes wakeup_one(tq->tq_threads); 511238106Sdes TQ_UNLOCK(tq); 512238106Sdes kthread_exit(); 513238106Sdes} 514238106Sdes 515238106Sdesstatic void 516238106Sdesgtaskqueue_thread_enqueue(void *context) 517238106Sdes{ 518238106Sdes struct gtaskqueue **tqp, *tq; 519238106Sdes 520238106Sdes tqp = context; 521238106Sdes tq = *tqp; 522238106Sdes wakeup_one(tq); 523238106Sdes} 524238106Sdes 525238106Sdes 526238106Sdesstatic struct gtaskqueue * 527238106Sdesgtaskqueue_create_fast(const char *name, int mflags, 528238106Sdes taskqueue_enqueue_fn enqueue, void *context) 529238106Sdes{ 530238106Sdes return _gtaskqueue_create(name, mflags, enqueue, context, 531238106Sdes MTX_SPIN, "fast_taskqueue"); 532238106Sdes} 533238106Sdes 534238106Sdes 535238106Sdesstruct taskqgroup_cpu { 536238106Sdes LIST_HEAD(, grouptask) tgc_tasks; 537238106Sdes struct gtaskqueue *tgc_taskq; 538238106Sdes int tgc_cnt; 539238106Sdes int tgc_cpu; 540238106Sdes}; 541238106Sdes 542238106Sdesstruct taskqgroup { 543238106Sdes struct taskqgroup_cpu tqg_queue[MAXCPU]; 544238106Sdes struct mtx tqg_lock; 545238106Sdes char * tqg_name; 546238106Sdes int tqg_adjusting; 547238106Sdes int tqg_stride; 548238106Sdes int tqg_cnt; 549238106Sdes}; 550238106Sdes 551238106Sdesstruct taskq_bind_task { 552238106Sdes struct gtask bt_task; 553238106Sdes int bt_cpuid; 554238106Sdes}; 555238106Sdes 556238106Sdesstatic void 557238106Sdestaskqgroup_cpu_create(struct taskqgroup *qgroup, int idx) 558238106Sdes{ 559238106Sdes struct taskqgroup_cpu *qcpu; 560238106Sdes 561238106Sdes qcpu = &qgroup->tqg_queue[idx]; 562238106Sdes LIST_INIT(&qcpu->tgc_tasks); 563238106Sdes qcpu->tgc_taskq = gtaskqueue_create_fast(NULL, M_WAITOK, 564238106Sdes taskqueue_thread_enqueue, &qcpu->tgc_taskq); 565238106Sdes gtaskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT, 566238106Sdes "%s_%d", qgroup->tqg_name, idx); 567238106Sdes qcpu->tgc_cpu = idx * qgroup->tqg_stride; 568238106Sdes} 569238106Sdes 570238106Sdesstatic void 571238106Sdestaskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx) 572238106Sdes{ 573238106Sdes 574238106Sdes gtaskqueue_free(qgroup->tqg_queue[idx].tgc_taskq); 575238106Sdes} 576238106Sdes 577238106Sdes/* 578238106Sdes * Find the taskq with least # of tasks that doesn't currently have any 579238106Sdes * other queues from the uniq identifier. 580238106Sdes */ 581238106Sdesstatic int 582238106Sdestaskqgroup_find(struct taskqgroup *qgroup, void *uniq) 583238106Sdes{ 584238106Sdes struct grouptask *n; 585238106Sdes int i, idx, mincnt; 586238106Sdes int strict; 587238106Sdes 588238106Sdes mtx_assert(&qgroup->tqg_lock, MA_OWNED); 589238106Sdes if (qgroup->tqg_cnt == 0) 590238106Sdes return (0); 591238106Sdes idx = -1; 592238106Sdes mincnt = INT_MAX; 593238106Sdes /* 594238106Sdes * Two passes; First scan for a queue with the least tasks that 595238106Sdes * does not already service this uniq id. If that fails simply find 596238106Sdes * the queue with the least total tasks; 597238106Sdes */ 598238106Sdes for (strict = 1; mincnt == INT_MAX; strict = 0) { 599238106Sdes for (i = 0; i < qgroup->tqg_cnt; i++) { 600238106Sdes if (qgroup->tqg_queue[i].tgc_cnt > mincnt) 601238106Sdes continue; 602238106Sdes if (strict) { 603238106Sdes LIST_FOREACH(n, 604238106Sdes &qgroup->tqg_queue[i].tgc_tasks, gt_list) 605238106Sdes if (n->gt_uniq == uniq) 606238106Sdes break; 607238106Sdes if (n != NULL) 608238106Sdes continue; 609238106Sdes } 610238106Sdes mincnt = qgroup->tqg_queue[i].tgc_cnt; 611238106Sdes idx = i; 612238106Sdes } 613238106Sdes } 614238106Sdes if (idx == -1) 615238106Sdes panic("taskqgroup_find: Failed to pick a qid."); 616238106Sdes 617238106Sdes return (idx); 618238106Sdes} 619238106Sdes 620238106Sdesvoid 621238106Sdestaskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask, 622238106Sdes void *uniq, int irq, char *name) 623238106Sdes{ 624238106Sdes cpuset_t mask; 625238106Sdes int qid; 626238106Sdes 627238106Sdes gtask->gt_uniq = uniq; 628238106Sdes gtask->gt_name = name; 629238106Sdes gtask->gt_irq = irq; 630238106Sdes gtask->gt_cpu = -1; 631238106Sdes mtx_lock(&qgroup->tqg_lock); 632238106Sdes qid = taskqgroup_find(qgroup, uniq); 633238106Sdes qgroup->tqg_queue[qid].tgc_cnt++; 634238106Sdes LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list); 635238106Sdes gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; 636238106Sdes if (irq != -1 && smp_started) { 637238106Sdes CPU_ZERO(&mask); 638238106Sdes CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask); 639238106Sdes mtx_unlock(&qgroup->tqg_lock); 640238106Sdes intr_setaffinity(irq, &mask); 641238106Sdes } else 642238106Sdes mtx_unlock(&qgroup->tqg_lock); 643238106Sdes} 644238106Sdes 645238106Sdesint 646238106Sdestaskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask, 647238106Sdes void *uniq, int cpu, int irq, char *name) 648238106Sdes{ 649238106Sdes cpuset_t mask; 650238106Sdes int i, qid; 651238106Sdes 652238106Sdes qid = -1; 653238106Sdes gtask->gt_uniq = uniq; 654238106Sdes gtask->gt_name = name; 655238106Sdes gtask->gt_irq = irq; 656238106Sdes gtask->gt_cpu = cpu; 657238106Sdes mtx_lock(&qgroup->tqg_lock); 658238106Sdes if (smp_started) { 659238106Sdes for (i = 0; i < qgroup->tqg_cnt; i++) 660238106Sdes if (qgroup->tqg_queue[i].tgc_cpu == cpu) { 661238106Sdes qid = i; 662238106Sdes break; 663238106Sdes } 664238106Sdes if (qid == -1) { 665238106Sdes mtx_unlock(&qgroup->tqg_lock); 666238106Sdes return (EINVAL); 667238106Sdes } 668238106Sdes } else 669238106Sdes qid = 0; 670238106Sdes qgroup->tqg_queue[qid].tgc_cnt++; 671238106Sdes LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list); 672238106Sdes gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; 673238106Sdes if (irq != -1 && smp_started) { 674238106Sdes CPU_ZERO(&mask); 675238106Sdes CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask); 676238106Sdes mtx_unlock(&qgroup->tqg_lock); 677238106Sdes intr_setaffinity(irq, &mask); 678238106Sdes } else 679238106Sdes mtx_unlock(&qgroup->tqg_lock); 680238106Sdes return (0); 681285206Sdes} 682238106Sdes 683238106Sdesvoid 684238106Sdestaskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask) 685238106Sdes{ 686238106Sdes int i; 687238106Sdes 688238106Sdes mtx_lock(&qgroup->tqg_lock); 689238106Sdes for (i = 0; i < qgroup->tqg_cnt; i++) 690238106Sdes if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue) 691238106Sdes break; 692238106Sdes if (i == qgroup->tqg_cnt) 693238106Sdes panic("taskqgroup_detach: task not in group\n"); 694238106Sdes qgroup->tqg_queue[i].tgc_cnt--; 695238106Sdes LIST_REMOVE(gtask, gt_list); 696238106Sdes mtx_unlock(&qgroup->tqg_lock); 697238106Sdes gtask->gt_taskqueue = NULL; 698238106Sdes} 699238106Sdes 700238106Sdesstatic void 701238106Sdestaskqgroup_binder(void *ctx) 702238106Sdes{ 703238106Sdes struct taskq_bind_task *gtask = (struct taskq_bind_task *)ctx; 704238106Sdes cpuset_t mask; 705238106Sdes int error; 706238106Sdes 707238106Sdes CPU_ZERO(&mask); 708238106Sdes CPU_SET(gtask->bt_cpuid, &mask); 709238106Sdes error = cpuset_setthread(curthread->td_tid, &mask); 710238106Sdes thread_lock(curthread); 711238106Sdes sched_bind(curthread, gtask->bt_cpuid); 712238106Sdes thread_unlock(curthread); 713238106Sdes 714238106Sdes if (error) 715238106Sdes printf("taskqgroup_binder: setaffinity failed: %d\n", 716238106Sdes error); 717238106Sdes free(gtask, M_DEVBUF); 718238106Sdes} 719238106Sdes 720238106Sdesstatic void 721238106Sdestaskqgroup_bind(struct taskqgroup *qgroup) 722238106Sdes{ 723238106Sdes struct taskq_bind_task *gtask; 724238106Sdes int i; 725238106Sdes 726238106Sdes /* 727238106Sdes * Bind taskqueue threads to specific CPUs, if they have been assigned 728285206Sdes * one. 729285206Sdes */ 730285206Sdes for (i = 0; i < qgroup->tqg_cnt; i++) { 731285206Sdes gtask = malloc(sizeof (*gtask), M_DEVBUF, M_NOWAIT); 732238106Sdes GTASK_INIT(>ask->bt_task, 0, 0, taskqgroup_binder, gtask); 733285206Sdes gtask->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu; 734285206Sdes grouptaskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq, 735285206Sdes >ask->bt_task); 736285206Sdes } 737285206Sdes} 738285206Sdes 739238106Sdesstatic int 740238106Sdes_taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride) 741238106Sdes{ 742238106Sdes LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL); 743238106Sdes cpuset_t mask; 744285206Sdes struct grouptask *gtask; 745285206Sdes int i, k, old_cnt, qid, cpu; 746285206Sdes 747285206Sdes mtx_assert(&qgroup->tqg_lock, MA_OWNED); 748285206Sdes 749285206Sdes if (cnt < 1 || cnt * stride > mp_ncpus || !smp_started) { 750285206Sdes printf("taskqgroup_adjust failed cnt: %d stride: %d mp_ncpus: %d smp_started: %d\n", 751285206Sdes cnt, stride, mp_ncpus, smp_started); 752285206Sdes return (EINVAL); 753285206Sdes } 754238106Sdes if (qgroup->tqg_adjusting) { 755269257Sdes printf("taskqgroup_adjust failed: adjusting\n"); 756269257Sdes return (EBUSY); 757238106Sdes } 758238106Sdes qgroup->tqg_adjusting = 1; 759291767Sdes old_cnt = qgroup->tqg_cnt; 760291767Sdes mtx_unlock(&qgroup->tqg_lock); 761238106Sdes /* 762238106Sdes * Set up queue for tasks added before boot. 763238106Sdes */ 764238106Sdes if (old_cnt == 0) { 765238106Sdes LIST_SWAP(>ask_head, &qgroup->tqg_queue[0].tgc_tasks, 766238106Sdes grouptask, gt_list); 767238106Sdes qgroup->tqg_queue[0].tgc_cnt = 0; 768238106Sdes } 769238106Sdes 770238106Sdes /* 771238106Sdes * If new taskq threads have been added. 772238106Sdes */ 773238106Sdes for (i = old_cnt; i < cnt; i++) 774238106Sdes taskqgroup_cpu_create(qgroup, i); 775238106Sdes mtx_lock(&qgroup->tqg_lock); 776255579Sdes qgroup->tqg_cnt = cnt; 777238106Sdes qgroup->tqg_stride = stride; 778238106Sdes 779238106Sdes /* 780238106Sdes * Adjust drivers to use new taskqs. 781238106Sdes */ 782238106Sdes for (i = 0; i < old_cnt; i++) { 783238106Sdes while ((gtask = LIST_FIRST(&qgroup->tqg_queue[i].tgc_tasks))) { 784238106Sdes LIST_REMOVE(gtask, gt_list); 785285206Sdes qgroup->tqg_queue[i].tgc_cnt--; 786238106Sdes LIST_INSERT_HEAD(>ask_head, gtask, gt_list); 787238106Sdes } 788238106Sdes } 789238106Sdes 790238106Sdes while ((gtask = LIST_FIRST(>ask_head))) { 791238106Sdes LIST_REMOVE(gtask, gt_list); 792238106Sdes if (gtask->gt_cpu == -1) 793238106Sdes qid = taskqgroup_find(qgroup, gtask->gt_uniq); 794238106Sdes else { 795238106Sdes for (i = 0; i < qgroup->tqg_cnt; i++) 796238106Sdes if (qgroup->tqg_queue[i].tgc_cpu == gtask->gt_cpu) { 797238106Sdes qid = i; 798238106Sdes break; 799238106Sdes } 800238106Sdes } 801238106Sdes qgroup->tqg_queue[qid].tgc_cnt++; 802238106Sdes LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, 803238106Sdes gt_list); 804238106Sdes gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; 805238106Sdes } 806238106Sdes /* 807238106Sdes * Set new CPU and IRQ affinity 808238106Sdes */ 809238106Sdes cpu = CPU_FIRST(); 810238106Sdes for (i = 0; i < cnt; i++) { 811238106Sdes qgroup->tqg_queue[i].tgc_cpu = cpu; 812238106Sdes for (k = 0; k < qgroup->tqg_stride; k++) 813238106Sdes cpu = CPU_NEXT(cpu); 814238106Sdes CPU_ZERO(&mask); 815238106Sdes CPU_SET(qgroup->tqg_queue[i].tgc_cpu, &mask); 816238106Sdes LIST_FOREACH(gtask, &qgroup->tqg_queue[i].tgc_tasks, gt_list) { 817238106Sdes if (gtask->gt_irq == -1) 818238106Sdes continue; 819238106Sdes intr_setaffinity(gtask->gt_irq, &mask); 820238106Sdes } 821238106Sdes } 822238106Sdes mtx_unlock(&qgroup->tqg_lock); 823238106Sdes 824238106Sdes /* 825238106Sdes * If taskq thread count has been reduced. 826238106Sdes */ 827238106Sdes for (i = cnt; i < old_cnt; i++) 828238106Sdes taskqgroup_cpu_remove(qgroup, i); 829238106Sdes 830238106Sdes mtx_lock(&qgroup->tqg_lock); 831238106Sdes qgroup->tqg_adjusting = 0; 832238106Sdes 833238106Sdes taskqgroup_bind(qgroup); 834238106Sdes 835238106Sdes return (0); 836238106Sdes} 837238106Sdes 838238106Sdesint 839238106Sdestaskqgroup_adjust(struct taskqgroup *qgroup, int cpu, int stride) 840238106Sdes{ 841269257Sdes int error; 842269257Sdes 843269257Sdes mtx_lock(&qgroup->tqg_lock); 844269257Sdes error = _taskqgroup_adjust(qgroup, cpu, stride); 845269257Sdes mtx_unlock(&qgroup->tqg_lock); 846269257Sdes 847238106Sdes return (error); 848238106Sdes} 849238106Sdes 850238106Sdesstruct taskqgroup * 851238106Sdestaskqgroup_create(char *name) 852238106Sdes{ 853238106Sdes struct taskqgroup *qgroup; 854238106Sdes 855238106Sdes qgroup = malloc(sizeof(*qgroup), M_GTASKQUEUE, M_WAITOK | M_ZERO); 856238106Sdes mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF); 857238106Sdes qgroup->tqg_name = name; 858238106Sdes LIST_INIT(&qgroup->tqg_queue[0].tgc_tasks); 859238106Sdes 860238106Sdes return (qgroup); 861238106Sdes} 862238106Sdes 863238106Sdesvoid 864285206Sdestaskqgroup_destroy(struct taskqgroup *qgroup) 865238106Sdes{ 866238106Sdes 867238106Sdes} 868238106Sdes