/*-
 * Copyright (c) 2000 Doug Rabson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

/*
 * Generic task queue implementation (see taskqueue(9)): tasks are queued
 * in priority order and later executed either from a software interrupt
 * handler or from one or more dedicated kernel threads.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: stable/11/sys/kern/subr_taskqueue.c 354406 2019-11-06 18:15:20Z mav $");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/bus.h>
#include <sys/cpuset.h>
#include <sys/interrupt.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/libkern.h>
#include <sys/limits.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/sched.h>
#include <sys/smp.h>
#include <sys/taskqueue.h>
#include <sys/unistd.h>
#include <machine/stdarg.h>

static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
static void	*taskqueue_giant_ih;	/* SWI handler cookie (Giant-locked queue) */
static void	*taskqueue_ih;		/* SWI handler cookie (MP-safe queue) */
static void	 taskqueue_fast_enqueue(void *);
static void	 taskqueue_swi_enqueue(void *);
static void	 taskqueue_swi_giant_enqueue(void *);

/*
 * Per-runner record of a task currently being executed.  One of these
 * lives on the runner's stack and is linked into tq_active for the
 * duration of taskqueue_run_locked().
 */
struct taskqueue_busy {
	struct task	*tb_running;	/* task being executed, or NULL */
	u_int		 tb_seq;	/* queue sequence number at dispatch */
	LIST_ENTRY(taskqueue_busy) tb_link;
};

struct taskqueue {
	STAILQ_HEAD(, task)	tq_queue;	/* pending tasks, priority order */
	LIST_HEAD(, taskqueue_busy) tq_active;	/* tasks currently running */
	struct task		*tq_hint;	/* insertion hint for enqueue */
	u_int			tq_seq;		/* dispatch sequence counter */
	int			tq_callouts;	/* armed timeout tasks + drain holds */
	struct mtx_padalign	tq_mutex;	/* protects all of the above */
	taskqueue_enqueue_fn	tq_enqueue;	/* "kick the consumer" callback */
	void			*tq_context;	/* argument for tq_enqueue */
	char			*tq_name;	/* malloc'ed queue name */
	struct thread		**tq_threads;	/* worker threads (thread queues) */
	int			tq_tcount;	/* number of live worker threads */
	int			tq_spin;	/* tq_mutex is a spin mutex */
	int			tq_flags;	/* TQ_FLAGS_* below */
	taskqueue_callback_fn	tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
	void			*tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
};

#define	TQ_FLAGS_ACTIVE		(1 << 0)	/* queue accepts/runs tasks */
#define	TQ_FLAGS_BLOCKED	(1 << 1)	/* suppress tq_enqueue kicks */
#define	TQ_FLAGS_UNLOCKED_ENQUEUE	(1 << 2) /* tq_enqueue called unlocked */

/* Flags in timeout_task->f. */
#define	DT_CALLOUT_ARMED	(1 << 0)	/* callout is scheduled */
#define	DT_DRAIN_IN_PROGRESS	(1 << 1)	/* drain active; refuse enqueue */

/*
 * The queue mutex may be either a regular or a spin mutex depending on
 * how the queue was created; these wrappers pick the right lock call.
 */
#define	TQ_LOCK(tq)							\
	do {								\
		if ((tq)->tq_spin)					\
			mtx_lock_spin(&(tq)->tq_mutex);			\
		else							\
			mtx_lock(&(tq)->tq_mutex);			\
	} while (0)
#define	TQ_ASSERT_LOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_OWNED)

#define	TQ_UNLOCK(tq)							\
	do {								\
		if ((tq)->tq_spin)					\
			mtx_unlock_spin(&(tq)->tq_mutex);		\
		else							\
			mtx_unlock(&(tq)->tq_mutex);			\
	} while (0)
#define	TQ_ASSERT_UNLOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)

/*
 * Initialize a timeout task: set up the embedded task and bind its
 * callout to the queue's mutex so the callout handler runs with the
 * queue locked (and returns with it unlocked, per CALLOUT_RETURNUNLOCKED).
 */
void
_timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
    int priority, task_fn_t func, void *context)
{

	TASK_INIT(&timeout_task->t, priority, func, context);
	callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
	    CALLOUT_RETURNUNLOCKED);
	timeout_task->q = queue;
	timeout_task->f = 0;
}

/* Sleep on channel 'p', releasing whichever flavor of queue mutex is held. */
static __inline int
TQ_SLEEP(struct taskqueue *tq, void *p, const char *wm)
{
	if (tq->tq_spin)
		return (msleep_spin(p, (struct mtx *)&tq->tq_mutex, wm, 0));
	return (msleep(p, &tq->tq_mutex, 0, wm, 0));
}

/*
 * Common constructor for taskqueue_create() and taskqueue_create_fast().
 * Returns NULL on allocation failure (possible when mflags lacks M_WAITOK).
 */
static struct taskqueue *
_taskqueue_create(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context,
		 int mtxflags, const char *mtxname __unused)
{
	struct taskqueue *queue;
	char *tq_name;

	tq_name = malloc(TASKQUEUE_NAMELEN, M_TASKQUEUE, mflags | M_ZERO);
	if (tq_name == NULL)
		return (NULL);

	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
	if (queue == NULL) {
		free(tq_name, M_TASKQUEUE);
		return (NULL);
	}

	snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");

	STAILQ_INIT(&queue->tq_queue);
	LIST_INIT(&queue->tq_active);
	queue->tq_enqueue = enqueue;
	queue->tq_context = context;
	queue->tq_name = tq_name;
	queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
	queue->tq_flags |= TQ_FLAGS_ACTIVE;
	/*
	 * The stock enqueue callbacks only schedule a SWI or wake a thread,
	 * so they can safely be invoked after the queue lock is dropped.
	 */
	if (enqueue == taskqueue_fast_enqueue ||
	    enqueue == taskqueue_swi_enqueue ||
	    enqueue == taskqueue_swi_giant_enqueue ||
	    enqueue == taskqueue_thread_enqueue)
		queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
	mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);

	return (queue);
}

/* Create a taskqueue protected by a regular (sleepable) mutex. */
struct taskqueue *
taskqueue_create(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context)
{

	return _taskqueue_create(name, mflags, enqueue, context,
			MTX_DEF, name);
}

/*
 * Register a callback (init/shutdown) to be run by the queue's worker
 * threads.  May be set at most once per callback type.
 */
void
taskqueue_set_callback(struct taskqueue *queue,
    enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback,
    void *context)
{

	KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) &&
	    (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)),
	    ("Callback type %d not valid, must be %d-%d", cb_type,
	    TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX));
	KASSERT((queue->tq_callbacks[cb_type] == NULL),
	    ("Re-initialization of taskqueue callback?"));

	queue->tq_callbacks[cb_type] = callback;
	queue->tq_cb_contexts[cb_type] = context;
}

/*
 * Signal a taskqueue thread to terminate.
 */
static void
taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
{

	/* Wait until all worker threads have exited and no callouts remain. */
	while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
		wakeup(tq);
		TQ_SLEEP(tq, pp, "tq_destroy");
	}
}

/*
 * Tear down a taskqueue: mark it inactive, wait for its threads and
 * armed callouts to finish, then release all resources.
 */
void
taskqueue_free(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
	taskqueue_terminate(queue->tq_threads, queue);
	KASSERT(LIST_EMPTY(&queue->tq_active), ("Tasks still running?"));
	KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
	mtx_destroy(&queue->tq_mutex);
	free(queue->tq_threads, M_TASKQUEUE);
	free(queue->tq_name, M_TASKQUEUE);
	free(queue, M_TASKQUEUE);
}

/*
 * Enqueue a task.  Called with the queue lock held; ALWAYS returns with
 * the queue lock released (the unlock point depends on whether the
 * queue's enqueue callback may be called without the lock).
 */
static int
taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
{
	struct task *ins;
	struct task *prev;

	KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func"));
	/*
	 * Count multiple enqueues.
	 */
	if (task->ta_pending) {
		/* ta_pending saturates at USHRT_MAX rather than wrapping. */
		if (task->ta_pending < USHRT_MAX)
			task->ta_pending++;
		TQ_UNLOCK(queue);
		return (0);
	}

	/*
	 * Optimise cases when all tasks use small set of priorities.
	 * In case of only one priority we always insert at the end.
	 * In case of two tq_hint typically gives the insertion point.
	 * In case of more then two tq_hint should halve the search.
	 */
	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
	if (!prev || prev->ta_priority >= task->ta_priority) {
		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
	} else {
		prev = queue->tq_hint;
		if (prev && prev->ta_priority >= task->ta_priority) {
			ins = STAILQ_NEXT(prev, ta_link);
		} else {
			prev = NULL;
			ins = STAILQ_FIRST(&queue->tq_queue);
		}
		for (; ins; prev = ins, ins = STAILQ_NEXT(ins, ta_link))
			if (ins->ta_priority < task->ta_priority)
				break;

		if (prev) {
			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
			queue->tq_hint = task;
		} else
			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
	}

	task->ta_pending = 1;
	if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0)
		TQ_UNLOCK(queue);
	if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
		queue->tq_enqueue(queue->tq_context);
	if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
		TQ_UNLOCK(queue);

	/* Return with lock released. */
	return (0);
}

/* Public enqueue entry point; takes and (indirectly) releases the lock. */
int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
	int res;

	TQ_LOCK(queue);
	res = taskqueue_enqueue_locked(queue, task);
	/* The lock is released inside. */

	return (res);
}

/*
 * Callout handler for timeout tasks: runs with the queue mutex held
 * (callout_init_mtx) and hands the task to the queue proper.
 */
static void
taskqueue_timeout_func(void *arg)
{
	struct taskqueue *queue;
	struct timeout_task *timeout_task;

	timeout_task = arg;
	queue = timeout_task->q;
	KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
	timeout_task->f &= ~DT_CALLOUT_ARMED;
	queue->tq_callouts--;
	taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
	/* The lock is released inside. */
}

/*
 * Enqueue a timeout task to run after the given sbintime delay (sbt == 0
 * means enqueue immediately; sbt < 0 means an absolute-style negation,
 * see callout(9)).  Returns the previous pending count, or -1 if a drain
 * is in progress and the enqueue was refused.
 */
int
taskqueue_enqueue_timeout_sbt(struct taskqueue *queue,
    struct timeout_task *timeout_task, sbintime_t sbt, sbintime_t pr, int flags)
{
	int res;

	TQ_LOCK(queue);
	KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
	    ("Migrated queue"));
	KASSERT(!queue->tq_spin, ("Timeout for spin-queue"));
	timeout_task->q = queue;
	res = timeout_task->t.ta_pending;
	if (timeout_task->f & DT_DRAIN_IN_PROGRESS) {
		/* Do nothing */
		TQ_UNLOCK(queue);
		res = -1;
	} else if (sbt == 0) {
		taskqueue_enqueue_locked(queue, &timeout_task->t);
		/* The lock is released inside. */
	} else {
		if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
			res++;
		} else {
			queue->tq_callouts++;
			timeout_task->f |= DT_CALLOUT_ARMED;
			if (sbt < 0)
				sbt = -sbt; /* Ignore overflow. */
		}
		if (sbt > 0) {
			callout_reset_sbt(&timeout_task->c, sbt, pr,
			    taskqueue_timeout_func, timeout_task, flags);
		}
		TQ_UNLOCK(queue);
	}
	return (res);
}

/* Tick-based wrapper around taskqueue_enqueue_timeout_sbt(). */
int
taskqueue_enqueue_timeout(struct taskqueue *queue,
    struct timeout_task *ttask, int ticks)
{

	return (taskqueue_enqueue_timeout_sbt(queue, ttask, ticks * tick_sbt,
	    0, 0));
}

/* No-op task function used as the drain barrier body. */
static void
taskqueue_task_nop_fn(void *context, int pending)
{
}

/*
 * Block until all currently queued tasks in this taskqueue
 * have begun execution.  Tasks queued during execution of
 * this function are ignored.  Returns non-zero if it had to wait.
 */
static int
taskqueue_drain_tq_queue(struct taskqueue *queue)
{
	struct task t_barrier;

	if (STAILQ_EMPTY(&queue->tq_queue))
		return (0);

	/*
	 * Enqueue our barrier after all current tasks, but with
	 * the highest priority so that newly queued tasks cannot
	 * pass it.  Because of the high priority, we can not use
	 * taskqueue_enqueue_locked directly (which drops the lock
	 * anyway) so just insert it at tail while we have the
	 * queue lock.
	 */
	TASK_INIT(&t_barrier, USHRT_MAX, taskqueue_task_nop_fn, &t_barrier);
	STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
	queue->tq_hint = &t_barrier;
	t_barrier.ta_pending = 1;

	/*
	 * Once the barrier has executed, all previously queued tasks
	 * have completed or are currently executing.
	 */
	while (t_barrier.ta_pending != 0)
		TQ_SLEEP(queue, &t_barrier, "tq_qdrain");
	return (1);
}

/*
 * Block until all currently executing tasks for this taskqueue
 * complete.  Tasks that begin execution during the execution
 * of this function are ignored.  Returns non-zero if it had to wait.
 */
static int
taskqueue_drain_tq_active(struct taskqueue *queue)
{
	struct taskqueue_busy *tb;
	u_int seq;

	if (LIST_EMPTY(&queue->tq_active))
		return (0);

	/* Block taskq_terminate().*/
	queue->tq_callouts++;

	/* Wait for any active task with sequence from the past. */
	seq = queue->tq_seq;
restart:
	LIST_FOREACH(tb, &queue->tq_active, tb_link) {
		/* Signed difference handles tq_seq wraparound. */
		if ((int)(tb->tb_seq - seq) <= 0) {
			TQ_SLEEP(queue, tb->tb_running, "tq_adrain");
			goto restart;
		}
	}

	/* Release taskqueue_terminate(). */
	queue->tq_callouts--;
	if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
		wakeup_one(queue->tq_threads);
	return (1);
}

/* Suppress consumer kicks; tasks still queue but are not dispatched. */
void
taskqueue_block(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags |= TQ_FLAGS_BLOCKED;
	TQ_UNLOCK(queue);
}

/* Re-enable dispatch and kick the consumer if work is pending. */
void
taskqueue_unblock(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
	if (!STAILQ_EMPTY(&queue->tq_queue))
		queue->tq_enqueue(queue->tq_context);
	TQ_UNLOCK(queue);
}

/*
 * Run all pending tasks.  Called with the queue lock held; the lock is
 * dropped around each task function invocation and reacquired after.
 * A taskqueue_busy record on our stack advertises what we are running
 * so drain/cancel can find it.
 */
static void
taskqueue_run_locked(struct taskqueue *queue)
{
	struct taskqueue_busy tb;
	struct task *task;
	int pending;

	KASSERT(queue != NULL, ("tq is NULL"));
	TQ_ASSERT_LOCKED(queue);
	tb.tb_running = NULL;
	LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link);

	while ((task = STAILQ_FIRST(&queue->tq_queue)) != NULL) {
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		if (queue->tq_hint == task)
			queue->tq_hint = NULL;
		pending = task->ta_pending;
		task->ta_pending = 0;
		tb.tb_running = task;
		tb.tb_seq = ++queue->tq_seq;
		TQ_UNLOCK(queue);

		KASSERT(task->ta_func != NULL, ("task->ta_func is NULL"));
		task->ta_func(task->ta_context, pending);

		TQ_LOCK(queue);
		/* Wake anyone sleeping in taskqueue_drain() on this task. */
		wakeup(task);
	}
	LIST_REMOVE(&tb, tb_link);
}

/* Public wrapper: run pending tasks with proper locking. */
void
taskqueue_run(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	taskqueue_run_locked(queue);
	TQ_UNLOCK(queue);
}

/* Return non-zero if any runner is currently executing 'task'. */
static int
task_is_running(struct taskqueue *queue, struct task *task)
{
	struct taskqueue_busy *tb;

	TQ_ASSERT_LOCKED(queue);
	LIST_FOREACH(tb, &queue->tq_active, tb_link) {
		if (tb->tb_running == task)
			return (1);
	}
	return (0);
}

/*
 * Only use this function in single threaded contexts. It returns
 * non-zero if the given task is either pending or running. Else the
 * task is idle and can be queued again or freed.
 */
int
taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task)
{
	int retval;

	TQ_LOCK(queue);
	retval = task->ta_pending > 0 || task_is_running(queue, task);
	TQ_UNLOCK(queue);

	return (retval);
}

/*
 * Remove a pending task from the queue (if queued) and report, via the
 * return value, whether it is still running.  Caller holds the lock.
 */
static int
taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
    u_int *pendp)
{

	if (task->ta_pending > 0) {
		STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
		if (queue->tq_hint == task)
			queue->tq_hint = NULL;
	}
	if (pendp != NULL)
		*pendp = task->ta_pending;
	task->ta_pending = 0;
	return (task_is_running(queue, task) ? EBUSY : 0);
}

/*
 * Cancel a pending task.  Returns 0 on success or EBUSY if the task is
 * currently executing; *pendp (if non-NULL) receives the prior pending
 * count.
 */
int
taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
{
	int error;

	TQ_LOCK(queue);
	error = taskqueue_cancel_locked(queue, task, pendp);
	TQ_UNLOCK(queue);

	return (error);
}

/*
 * Cancel a timeout task: stop its callout (if armed) and cancel the
 * embedded task.  *pendp accumulates both the callout-stopped count
 * and the task's pending count.
 */
int
taskqueue_cancel_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task, u_int *pendp)
{
	u_int pending, pending1;
	int error;

	TQ_LOCK(queue);
	pending = !!(callout_stop(&timeout_task->c) > 0);
	error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
	if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
		timeout_task->f &= ~DT_CALLOUT_ARMED;
		queue->tq_callouts--;
	}
	TQ_UNLOCK(queue);

	if (pendp != NULL)
		*pendp = pending + pending1;
	return (error);
}

/*
 * Sleep until the given task is neither pending nor running.  Must not
 * be called from within a task on the same queue (would deadlock).
 */
void
taskqueue_drain(struct taskqueue *queue, struct task *task)
{

	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

	TQ_LOCK(queue);
	while (task->ta_pending != 0 || task_is_running(queue, task))
		TQ_SLEEP(queue, task, "tq_drain");
	TQ_UNLOCK(queue);
}

/*
 * Wait for all tasks queued or running as of the call to begin/finish;
 * tasks queued afterwards are ignored (contrast taskqueue_quiesce()).
 */
void
taskqueue_drain_all(struct taskqueue *queue)
{

	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

	TQ_LOCK(queue);
	(void)taskqueue_drain_tq_queue(queue);
	(void)taskqueue_drain_tq_active(queue);
	TQ_UNLOCK(queue);
}

/*
 * Drain a timeout task: stop its timer from re-arming, drain the
 * callout, then drain the task itself.
 */
void
taskqueue_drain_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task)
{

	/*
	 * Set flag to prevent timer from re-starting during drain:
	 */
	TQ_LOCK(queue);
	KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0,
	    ("Drain already in progress"));
	timeout_task->f |= DT_DRAIN_IN_PROGRESS;
	TQ_UNLOCK(queue);

	callout_drain(&timeout_task->c);
	taskqueue_drain(queue, &timeout_task->t);

	/*
	 * Clear flag to allow timer to re-start:
	 */
	TQ_LOCK(queue);
	timeout_task->f &= ~DT_DRAIN_IN_PROGRESS;
	TQ_UNLOCK(queue);
}

/*
 * Wait until the queue is completely idle: no pending AND no running
 * tasks, including ones queued while we were waiting.  Loops until a
 * full pass over both drain helpers finds nothing to wait for.
 */
void
taskqueue_quiesce(struct taskqueue *queue)
{
	int ret;

	TQ_LOCK(queue);
	do {
		ret = taskqueue_drain_tq_queue(queue);
		if (ret == 0)
			ret = taskqueue_drain_tq_active(queue);
	} while (ret != 0);
	TQ_UNLOCK(queue);
}

/* Enqueue callback for the MP-safe SWI queue: schedule its handler. */
static void
taskqueue_swi_enqueue(void *context)
{
	swi_sched(taskqueue_ih, 0);
}

/* SWI handler body for the MP-safe queue. */
static void
taskqueue_swi_run(void *dummy)
{
	taskqueue_run(taskqueue_swi);
}

/* Enqueue callback for the Giant-locked SWI queue. */
static void
taskqueue_swi_giant_enqueue(void *context)
{
	swi_sched(taskqueue_giant_ih, 0);
}

/* SWI handler body for the Giant-locked queue. */
static void
taskqueue_swi_giant_run(void *dummy)
{
	taskqueue_run(taskqueue_swi_giant);
}

/*
 * Create 'count' worker threads for the queue at priority 'pri',
 * optionally pinned to the given CPU set.  Thread names come from the
 * printf-style 'name'/'ap' pair (suffixed _0, _1, ... when count > 1).
 */
static int
_taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
    cpuset_t *mask, const char *name, va_list ap)
{
	char ktname[MAXCOMLEN + 1];
	struct thread *td;
	struct taskqueue *tq;
	int i, error;

	if (count <= 0)
		return (EINVAL);

	vsnprintf(ktname, sizeof(ktname), name, ap);
	tq = *tqp;

	tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
	    M_NOWAIT | M_ZERO);
	if (tq->tq_threads == NULL) {
		printf("%s: no memory for %s threads\n", __func__, ktname);
		return (ENOMEM);
	}

	for (i = 0; i < count; i++) {
		if (count == 1)
			error = kthread_add(taskqueue_thread_loop, tqp, NULL,
			    &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
		else
			error = kthread_add(taskqueue_thread_loop, tqp, NULL,
			    &tq->tq_threads[i], RFSTOPPED, 0,
			    "%s_%d", ktname, i);
		if (error) {
			/* should be ok to continue, taskqueue_free will dtrt */
			printf("%s: kthread_add(%s): error %d", __func__,
			    ktname, error);
			tq->tq_threads[i] = NULL;		/* paranoid */
		} else
			tq->tq_tcount++;
	}
	if (tq->tq_tcount == 0) {
		free(tq->tq_threads, M_TASKQUEUE);
		tq->tq_threads = NULL;
		return (ENOMEM);
	}
	for (i = 0; i < count; i++) {
		if (tq->tq_threads[i] == NULL)
			continue;
		td = tq->tq_threads[i];
		if (mask) {
			error = cpuset_setthread(td->td_tid, mask);
			/*
			 * Failing to pin is rarely an actual fatal error;
			 * it'll just affect performance.
			 */
			if (error)
				printf("%s: curthread=%llu: can't pin; "
				    "error=%d\n",
				    __func__,
				    (unsigned long long) td->td_tid,
				    error);
		}
		/* Threads were created RFSTOPPED; set priority and start. */
		thread_lock(td);
		sched_prio(td, pri);
		sched_add(td, SRQ_BORING);
		thread_unlock(td);
	}

	return (0);
}

/* Start worker threads with no CPU affinity. */
int
taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
    const char *name, ...)
{
	va_list ap;
	int error;

	va_start(ap, name);
	error = _taskqueue_start_threads(tqp, count, pri, NULL, name, ap);
	va_end(ap);
	return (error);
}

/* Start worker threads pinned to the given CPU set. */
int
taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count, int pri,
    cpuset_t *mask, const char *name, ...)
{
	va_list ap;
	int error;

	va_start(ap, name);
	error = _taskqueue_start_threads(tqp, count, pri, mask, name, ap);
	va_end(ap);
	return (error);
}

/* Invoke the registered callback of the given type, if any (unlocked). */
static inline void
taskqueue_run_callback(struct taskqueue *tq,
    enum taskqueue_callback_type cb_type)
{
	taskqueue_callback_fn tq_callback;

	TQ_ASSERT_UNLOCKED(tq);
	tq_callback = tq->tq_callbacks[cb_type];
	if (tq_callback != NULL)
		tq_callback(tq->tq_cb_contexts[cb_type]);
}

/*
 * Main loop for taskqueue worker threads: run tasks, sleep when idle,
 * and exit cleanly when the queue is marked inactive.
 */
void
taskqueue_thread_loop(void *arg)
{
	struct taskqueue **tqp, *tq;

	tqp = arg;
	tq = *tqp;
	taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
	TQ_LOCK(tq);
	while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
		/* XXX ? */
		taskqueue_run_locked(tq);
		/*
		 * Because taskqueue_run() can drop tq_mutex, we need to
		 * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
		 * meantime, which means we missed a wakeup.
		 */
		if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
			break;
		TQ_SLEEP(tq, tq, "-");
	}
	/* Final pass so no queued tasks are abandoned at shutdown. */
	taskqueue_run_locked(tq);
	/*
	 * This thread is on its way out, so just drop the lock temporarily
	 * in order to call the shutdown callback.  This allows the callback
	 * to look at the taskqueue, even just before it dies.
	 */
	TQ_UNLOCK(tq);
	taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
	TQ_LOCK(tq);

	/* rendezvous with thread that asked us to terminate */
	tq->tq_tcount--;
	wakeup_one(tq->tq_threads);
	TQ_UNLOCK(tq);
	kthread_exit();
}

/* Enqueue callback for thread-backed queues: wake an idle worker. */
void
taskqueue_thread_enqueue(void *context)
{
	struct taskqueue **tqp, *tq;

	tqp = context;
	tq = *tqp;
	wakeup_any(tq);
}

TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
		 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
		     INTR_MPSAFE, &taskqueue_ih));

TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
		 swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
		     NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));

TASKQUEUE_DEFINE_THREAD(thread);

/* Create a "fast" taskqueue protected by a spin mutex. */
struct taskqueue *
taskqueue_create_fast(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context)
{
	return _taskqueue_create(name, mflags, enqueue, context,
			MTX_SPIN, "fast_taskqueue");
}

static void	*taskqueue_fast_ih;

/* Enqueue callback for the fast SWI queue. */
static void
taskqueue_fast_enqueue(void *context)
{
	swi_sched(taskqueue_fast_ih, 0);
}

/* SWI handler body for the fast queue. */
static void
taskqueue_fast_run(void *dummy)
{
	taskqueue_run(taskqueue_fast);
}

TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
	swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL,
	SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));

/*
 * Return non-zero if 'td' is one of this queue's worker threads.
 * Skips NULL slots (failed kthread_add) and stops after tq_tcount
 * live entries have been examined.
 */
int
taskqueue_member(struct taskqueue *queue, struct thread *td)
{
	int i, j, ret = 0;

	for (i = 0, j = 0; ; i++) {
		if (queue->tq_threads[i] == NULL)
			continue;
		if (queue->tq_threads[i] == td) {
			ret = 1;
			break;
		}
		if (++j >= queue->tq_tcount)
			break;
	}
	return (ret);
}