161033Sdfr/*- 261033Sdfr * Copyright (c) 2000 Doug Rabson 361033Sdfr * All rights reserved. 461033Sdfr * 561033Sdfr * Redistribution and use in source and binary forms, with or without 661033Sdfr * modification, are permitted provided that the following conditions 761033Sdfr * are met: 861033Sdfr * 1. Redistributions of source code must retain the above copyright 961033Sdfr * notice, this list of conditions and the following disclaimer. 1061033Sdfr * 2. Redistributions in binary form must reproduce the above copyright 1161033Sdfr * notice, this list of conditions and the following disclaimer in the 1261033Sdfr * documentation and/or other materials provided with the distribution. 1361033Sdfr * 1461033Sdfr * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 1561033Sdfr * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 1661033Sdfr * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 1761033Sdfr * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 1861033Sdfr * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 1961033Sdfr * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 2061033Sdfr * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 2161033Sdfr * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 2261033Sdfr * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 2361033Sdfr * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 2461033Sdfr * SUCH DAMAGE. 2561033Sdfr */ 2661033Sdfr 27116182Sobrien#include <sys/cdefs.h> 28116182Sobrien__FBSDID("$FreeBSD: stable/10/sys/kern/subr_taskqueue.c 315268 2017-03-14 16:00:33Z hselasky $"); 29116182Sobrien 3061033Sdfr#include <sys/param.h> 3185521Sjhb#include <sys/systm.h> 3265822Sjhb#include <sys/bus.h> 3385560Sjhb#include <sys/interrupt.h> 3461033Sdfr#include <sys/kernel.h> 35123614Sjhb#include <sys/kthread.h> 36225570Sadrian#include <sys/limits.h> 3785521Sjhb#include <sys/lock.h> 3861033Sdfr#include <sys/malloc.h> 3985521Sjhb#include <sys/mutex.h> 40145729Ssam#include <sys/proc.h> 41154333Sscottl#include <sys/sched.h> 4285521Sjhb#include <sys/taskqueue.h> 43119708Sken#include <sys/unistd.h> 44154333Sscottl#include <machine/stdarg.h> 4561033Sdfr 4669774Sphkstatic MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues"); 47123614Sjhbstatic void *taskqueue_giant_ih; 48123614Sjhbstatic void *taskqueue_ih; 49297066Smavstatic void taskqueue_fast_enqueue(void *); 50297066Smavstatic void taskqueue_swi_enqueue(void *); 51297066Smavstatic void taskqueue_swi_giant_enqueue(void *); 5267551Sjhb 53213813Smdfstruct taskqueue_busy { 54213813Smdf struct task *tb_running; 55213813Smdf TAILQ_ENTRY(taskqueue_busy) tb_link; 56213813Smdf}; 57213813Smdf 5861033Sdfrstruct taskqueue { 5961033Sdfr STAILQ_HEAD(, task) tq_queue; 6061033Sdfr taskqueue_enqueue_fn tq_enqueue; 6161033Sdfr void *tq_context; 62213813Smdf TAILQ_HEAD(, taskqueue_busy) tq_active; 6385521Sjhb struct mtx tq_mutex; 64178015Ssam struct thread **tq_threads; 65178015Ssam int tq_tcount; 66180588Skmacy int tq_spin; 67154333Sscottl int tq_flags; 68221059Skib int tq_callouts; 69248649Swill taskqueue_callback_fn tq_callbacks[TASKQUEUE_NUM_CALLBACKS]; 70248649Swill void *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS]; 7161033Sdfr}; 7261033Sdfr 73154333Sscottl#define TQ_FLAGS_ACTIVE (1 << 0) 74177621Sscottl#define TQ_FLAGS_BLOCKED (1 << 1) 75297066Smav#define TQ_FLAGS_UNLOCKED_ENQUEUE (1 << 2) 76154333Sscottl 77221059Skib#define DT_CALLOUT_ARMED (1 << 0) 78306947Shselasky#define DT_DRAIN_IN_PROGRESS (1 << 1) 79221059Skib 80215021Sjmallett#define TQ_LOCK(tq) \ 81215021Sjmallett do { \ 82215021Sjmallett if ((tq)->tq_spin) \ 83215021Sjmallett mtx_lock_spin(&(tq)->tq_mutex); \ 84215021Sjmallett else \ 85215021Sjmallett mtx_lock(&(tq)->tq_mutex); \ 86215021Sjmallett } while (0) 87248649Swill#define TQ_ASSERT_LOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_OWNED) 88154167Sscottl 89215021Sjmallett#define TQ_UNLOCK(tq) \ 90215021Sjmallett do { \ 91215021Sjmallett if ((tq)->tq_spin) \ 92215021Sjmallett mtx_unlock_spin(&(tq)->tq_mutex); \ 93215021Sjmallett else \ 94215021Sjmallett mtx_unlock(&(tq)->tq_mutex); \ 95215021Sjmallett } while (0) 96248649Swill#define TQ_ASSERT_UNLOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED) 97154167Sscottl 98221059Skibvoid 99221059Skib_timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task, 100221059Skib int priority, task_fn_t func, void *context) 101221059Skib{ 102221059Skib 103221059Skib TASK_INIT(&timeout_task->t, priority, func, context); 104297066Smav callout_init_mtx(&timeout_task->c, &queue->tq_mutex, 105297066Smav CALLOUT_RETURNUNLOCKED); 106221059Skib timeout_task->q = queue; 107221059Skib timeout_task->f = 0; 108221059Skib} 109221059Skib 110154167Sscottlstatic __inline int 111154167SscottlTQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm, 112154167Sscottl int t) 113154167Sscottl{ 114180588Skmacy if (tq->tq_spin) 115154167Sscottl return (msleep_spin(p, m, wm, t)); 116154167Sscottl return (msleep(p, m, pri, wm, t)); 117154167Sscottl} 118154167Sscottl 119154167Sscottlstatic struct taskqueue * 120215750Savg_taskqueue_create(const char *name __unused, int mflags, 121145729Ssam taskqueue_enqueue_fn enqueue, void *context, 122154167Sscottl int mtxflags, const char *mtxname) 12361033Sdfr{ 12461033Sdfr struct taskqueue *queue; 125180588Skmacy 12685521Sjhb queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO); 12761033Sdfr if (!queue) 128188058Simp return NULL; 129180588Skmacy 13061033Sdfr STAILQ_INIT(&queue->tq_queue); 131213813Smdf TAILQ_INIT(&queue->tq_active); 13261033Sdfr queue->tq_enqueue = enqueue; 13361033Sdfr queue->tq_context = context; 134180588Skmacy queue->tq_spin = (mtxflags & MTX_SPIN) != 0; 135180588Skmacy queue->tq_flags |= TQ_FLAGS_ACTIVE; 136297066Smav if (enqueue == taskqueue_fast_enqueue || 137297066Smav enqueue == taskqueue_swi_enqueue || 138297066Smav enqueue == taskqueue_swi_giant_enqueue || 139297066Smav enqueue == taskqueue_thread_enqueue) 140297066Smav queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE; 141154167Sscottl mtx_init(&queue->tq_mutex, mtxname, NULL, mtxflags); 14261033Sdfr 14361033Sdfr return queue; 14461033Sdfr} 14561033Sdfr 146154167Sscottlstruct taskqueue * 147154167Sscottltaskqueue_create(const char *name, int mflags, 148154333Sscottl taskqueue_enqueue_fn enqueue, void *context) 149154167Sscottl{ 150154333Sscottl return _taskqueue_create(name, mflags, enqueue, context, 151154167Sscottl MTX_DEF, "taskqueue"); 152154167Sscottl} 153154167Sscottl 154248649Swillvoid 155248649Swilltaskqueue_set_callback(struct taskqueue *queue, 156248649Swill enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback, 157248649Swill void *context) 158248649Swill{ 159248649Swill 160248649Swill KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) && 161248649Swill (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)), 162248649Swill ("Callback type %d not valid, must be %d-%d", cb_type, 163248649Swill TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX)); 164248649Swill KASSERT((queue->tq_callbacks[cb_type] == NULL), 165248649Swill ("Re-initialization of taskqueue callback?")); 166248649Swill 167248649Swill queue->tq_callbacks[cb_type] = callback; 168248649Swill queue->tq_cb_contexts[cb_type] = context; 169248649Swill} 170248649Swill 171145729Ssam/* 172145729Ssam * Signal a taskqueue thread to terminate. 173145729Ssam */ 174145729Ssamstatic void 175178015Ssamtaskqueue_terminate(struct thread **pp, struct taskqueue *tq) 176145729Ssam{ 177145729Ssam 178221059Skib while (tq->tq_tcount > 0 || tq->tq_callouts > 0) { 179154333Sscottl wakeup(tq); 180154333Sscottl TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0); 181145729Ssam } 182145729Ssam} 183145729Ssam 18461033Sdfrvoid 18561033Sdfrtaskqueue_free(struct taskqueue *queue) 18661033Sdfr{ 18785521Sjhb 188154167Sscottl TQ_LOCK(queue); 189154333Sscottl queue->tq_flags &= ~TQ_FLAGS_ACTIVE; 190178015Ssam taskqueue_terminate(queue->tq_threads, queue); 191213813Smdf KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?")); 192221059Skib KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks")); 19385521Sjhb mtx_destroy(&queue->tq_mutex); 194178015Ssam free(queue->tq_threads, M_TASKQUEUE); 19561033Sdfr free(queue, M_TASKQUEUE); 19661033Sdfr} 19761033Sdfr 198221059Skibstatic int 199221059Skibtaskqueue_enqueue_locked(struct taskqueue *queue, struct task *task) 20061033Sdfr{ 20161033Sdfr struct task *ins; 20261033Sdfr struct task *prev; 20361033Sdfr 20461033Sdfr /* 20561033Sdfr * Count multiple enqueues. 20661033Sdfr */ 207180588Skmacy if (task->ta_pending) { 208225570Sadrian if (task->ta_pending < USHRT_MAX) 209225570Sadrian task->ta_pending++; 210297066Smav TQ_UNLOCK(queue); 211221059Skib return (0); 21261033Sdfr } 21361033Sdfr 21461033Sdfr /* 21561033Sdfr * Optimise the case when all tasks have the same priority. 21661033Sdfr */ 21764199Shsu prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 21861033Sdfr if (!prev || prev->ta_priority >= task->ta_priority) { 21961033Sdfr STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 22061033Sdfr } else { 221188058Simp prev = NULL; 22261033Sdfr for (ins = STAILQ_FIRST(&queue->tq_queue); ins; 22361033Sdfr prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 22461033Sdfr if (ins->ta_priority < task->ta_priority) 22561033Sdfr break; 22661033Sdfr 22761033Sdfr if (prev) 22861033Sdfr STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 22961033Sdfr else 23061033Sdfr STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 23161033Sdfr } 23261033Sdfr 23361033Sdfr task->ta_pending = 1; 234297066Smav if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0) 235297066Smav TQ_UNLOCK(queue); 236180588Skmacy if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) 237177621Sscottl queue->tq_enqueue(queue->tq_context); 238297066Smav if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0) 239297066Smav TQ_UNLOCK(queue); 24085560Sjhb 241297066Smav /* Return with lock released. */ 242221059Skib return (0); 243221059Skib} 244221059Skibint 245221059Skibtaskqueue_enqueue(struct taskqueue *queue, struct task *task) 246221059Skib{ 247221059Skib int res; 248221059Skib 249221059Skib TQ_LOCK(queue); 250221059Skib res = taskqueue_enqueue_locked(queue, task); 251297066Smav /* The lock is released inside. */ 25285560Sjhb 253221059Skib return (res); 25461033Sdfr} 25561033Sdfr 256221059Skibstatic void 257221059Skibtaskqueue_timeout_func(void *arg) 258221059Skib{ 259221059Skib struct taskqueue *queue; 260221059Skib struct timeout_task *timeout_task; 261221059Skib 262221059Skib timeout_task = arg; 263221059Skib queue = timeout_task->q; 264221059Skib KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout")); 265221059Skib timeout_task->f &= ~DT_CALLOUT_ARMED; 266221059Skib queue->tq_callouts--; 267221059Skib taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t); 268297066Smav /* The lock is released inside. */ 269221059Skib} 270221059Skib 271221059Skibint 272221059Skibtaskqueue_enqueue_timeout(struct taskqueue *queue, 273221059Skib struct timeout_task *timeout_task, int ticks) 274221059Skib{ 275221059Skib int res; 276221059Skib 277221059Skib TQ_LOCK(queue); 278221059Skib KASSERT(timeout_task->q == NULL || timeout_task->q == queue, 279221059Skib ("Migrated queue")); 280221059Skib KASSERT(!queue->tq_spin, ("Timeout for spin-queue")); 281221059Skib timeout_task->q = queue; 282221059Skib res = timeout_task->t.ta_pending; 283306947Shselasky if (timeout_task->f & DT_DRAIN_IN_PROGRESS) { 284306947Shselasky /* Do nothing */ 285306947Shselasky TQ_UNLOCK(queue); 286306947Shselasky res = -1; 287306947Shselasky } else if (ticks == 0) { 288221059Skib taskqueue_enqueue_locked(queue, &timeout_task->t); 289297066Smav /* The lock is released inside. */ 290221059Skib } else { 291221059Skib if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) { 292221059Skib res++; 293221059Skib } else { 294221059Skib queue->tq_callouts++; 295221059Skib timeout_task->f |= DT_CALLOUT_ARMED; 296243341Skib if (ticks < 0) 297243341Skib ticks = -ticks; /* Ignore overflow. */ 298221059Skib } 299243341Skib if (ticks > 0) { 300243341Skib callout_reset(&timeout_task->c, ticks, 301243341Skib taskqueue_timeout_func, timeout_task); 302243341Skib } 303297066Smav TQ_UNLOCK(queue); 304221059Skib } 305221059Skib return (res); 306221059Skib} 307221059Skib 308262065Savgstatic void 309262065Savgtaskqueue_drain_running(struct taskqueue *queue) 310262065Savg{ 311262065Savg 312262065Savg while (!TAILQ_EMPTY(&queue->tq_active)) 313262065Savg TQ_SLEEP(queue, &queue->tq_active, &queue->tq_mutex, 314262065Savg PWAIT, "-", 0); 315262065Savg} 316262065Savg 31761033Sdfrvoid 318177621Sscottltaskqueue_block(struct taskqueue *queue) 319177621Sscottl{ 320177621Sscottl 321177621Sscottl TQ_LOCK(queue); 322177621Sscottl queue->tq_flags |= TQ_FLAGS_BLOCKED; 323177621Sscottl TQ_UNLOCK(queue); 324177621Sscottl} 325177621Sscottl 326177621Sscottlvoid 327177621Sscottltaskqueue_unblock(struct taskqueue *queue) 328177621Sscottl{ 329177621Sscottl 330177621Sscottl TQ_LOCK(queue); 331177621Sscottl queue->tq_flags &= ~TQ_FLAGS_BLOCKED; 332297064Smav if (!STAILQ_EMPTY(&queue->tq_queue)) 333177621Sscottl queue->tq_enqueue(queue->tq_context); 334177621Sscottl TQ_UNLOCK(queue); 335177621Sscottl} 336177621Sscottl 337213813Smdfstatic void 338213813Smdftaskqueue_run_locked(struct taskqueue *queue) 33961033Sdfr{ 340213813Smdf struct taskqueue_busy tb; 341210380Smdf struct task *task; 342210377Smdf int pending; 34361033Sdfr 344248649Swill TQ_ASSERT_LOCKED(queue); 345213813Smdf tb.tb_running = NULL; 346213813Smdf TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link); 347213813Smdf 34861033Sdfr while (STAILQ_FIRST(&queue->tq_queue)) { 34961033Sdfr /* 35061033Sdfr * Carefully remove the first task from the queue and 35161033Sdfr * zero its pending count. 35261033Sdfr */ 35361033Sdfr task = STAILQ_FIRST(&queue->tq_queue); 35461033Sdfr STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 35561033Sdfr pending = task->ta_pending; 35661033Sdfr task->ta_pending = 0; 357213813Smdf tb.tb_running = task; 358154167Sscottl TQ_UNLOCK(queue); 35961033Sdfr 36085560Sjhb task->ta_func(task->ta_context, pending); 36161033Sdfr 362154167Sscottl TQ_LOCK(queue); 363213813Smdf tb.tb_running = NULL; 364208715Szml wakeup(task); 36561033Sdfr } 366213813Smdf TAILQ_REMOVE(&queue->tq_active, &tb, tb_link); 367262065Savg if (TAILQ_EMPTY(&queue->tq_active)) 368262065Savg wakeup(&queue->tq_active); 36961033Sdfr} 37061033Sdfr 371136131Simpvoid 372213813Smdftaskqueue_run(struct taskqueue *queue) 373213813Smdf{ 374213813Smdf 375213813Smdf TQ_LOCK(queue); 376213813Smdf taskqueue_run_locked(queue); 377213813Smdf TQ_UNLOCK(queue); 378213813Smdf} 379213813Smdf 380213813Smdfstatic int 381213813Smdftask_is_running(struct taskqueue *queue, struct task *task) 382213813Smdf{ 383213813Smdf struct taskqueue_busy *tb; 384213813Smdf 385248649Swill TQ_ASSERT_LOCKED(queue); 386213813Smdf TAILQ_FOREACH(tb, &queue->tq_active, tb_link) { 387213813Smdf if (tb->tb_running == task) 388213813Smdf return (1); 389213813Smdf } 390213813Smdf return (0); 391213813Smdf} 392213813Smdf 393315268Shselasky/* 394315268Shselasky * Only use this function in single threaded contexts. It returns 395315268Shselasky * non-zero if the given task is either pending or running. Else the 396315268Shselasky * task is idle and can be queued again or freed. 397315268Shselasky */ 398315268Shselaskyint 399315268Shselaskytaskqueue_poll_is_busy(struct taskqueue *queue, struct task *task) 400315268Shselasky{ 401315268Shselasky int retval; 402315268Shselasky 403315268Shselasky TQ_LOCK(queue); 404315268Shselasky retval = task->ta_pending > 0 || task_is_running(queue, task); 405315268Shselasky TQ_UNLOCK(queue); 406315268Shselasky 407315268Shselasky return (retval); 408315268Shselasky} 409315268Shselasky 410221059Skibstatic int 411221059Skibtaskqueue_cancel_locked(struct taskqueue *queue, struct task *task, 412221059Skib u_int *pendp) 413221059Skib{ 414221059Skib 415221059Skib if (task->ta_pending > 0) 416221059Skib STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link); 417221059Skib if (pendp != NULL) 418221059Skib *pendp = task->ta_pending; 419221059Skib task->ta_pending = 0; 420221059Skib return (task_is_running(queue, task) ? EBUSY : 0); 421221059Skib} 422221059Skib 423215011Smdfint 424215011Smdftaskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp) 425215011Smdf{ 426215011Smdf int error; 427215011Smdf 428215011Smdf TQ_LOCK(queue); 429221059Skib error = taskqueue_cancel_locked(queue, task, pendp); 430215011Smdf TQ_UNLOCK(queue); 431215011Smdf 432221059Skib return (error); 433221059Skib} 434221059Skib 435221059Skibint 436221059Skibtaskqueue_cancel_timeout(struct taskqueue *queue, 437221059Skib struct timeout_task *timeout_task, u_int *pendp) 438221059Skib{ 439221059Skib u_int pending, pending1; 440221059Skib int error; 441221059Skib 442221059Skib TQ_LOCK(queue); 443221059Skib pending = !!callout_stop(&timeout_task->c); 444221059Skib error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1); 445221059Skib if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) { 446221059Skib timeout_task->f &= ~DT_CALLOUT_ARMED; 447221059Skib queue->tq_callouts--; 448221059Skib } 449221059Skib TQ_UNLOCK(queue); 450221059Skib 451215011Smdf if (pendp != NULL) 452221059Skib *pendp = pending + pending1; 453215011Smdf return (error); 454215011Smdf} 455215011Smdf 456213813Smdfvoid 457136131Simptaskqueue_drain(struct taskqueue *queue, struct task *task) 458136131Simp{ 459211284Spjd 460211284Spjd if (!queue->tq_spin) 461154167Sscottl WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); 462145729Ssam 463211284Spjd TQ_LOCK(queue); 464213813Smdf while (task->ta_pending != 0 || task_is_running(queue, task)) 465211284Spjd TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0); 466211284Spjd TQ_UNLOCK(queue); 467136131Simp} 468136131Simp 469221059Skibvoid 470262065Savgtaskqueue_drain_all(struct taskqueue *queue) 471262065Savg{ 472262065Savg struct task *task; 473262065Savg 474262065Savg if (!queue->tq_spin) 475262065Savg WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); 476262065Savg 477262065Savg TQ_LOCK(queue); 478262065Savg task = STAILQ_LAST(&queue->tq_queue, task, ta_link); 479306935Sjulian while (task != NULL && task->ta_pending != 0) { 480306935Sjulian struct task *oldtask; 481306935Sjulian TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0); 482306935Sjulian /* 483306935Sjulian * While we were asleeep the last entry may have been freed. 484306935Sjulian * We need to check if it's still even in the queue. 485306935Sjulian * Not perfect, but it's better than referencing bad memory. 486306935Sjulian * first guess is the current 'end of queue' but if a new 487306935Sjulian * item has been added we need to take the expensive path 488306935Sjulian * Better fix in 11. 489306935Sjulian */ 490306935Sjulian oldtask = task; 491306935Sjulian if (oldtask != 492306935Sjulian (task = STAILQ_LAST(&queue->tq_queue, task, ta_link))) { 493306935Sjulian STAILQ_FOREACH(task, &queue->tq_queue, ta_link) { 494306935Sjulian if (task == oldtask) 495306935Sjulian break; 496306935Sjulian } 497306935Sjulian } 498306935Sjulian } 499262065Savg taskqueue_drain_running(queue); 500262065Savg KASSERT(STAILQ_EMPTY(&queue->tq_queue), 501262065Savg ("taskqueue queue is not empty after draining")); 502262065Savg TQ_UNLOCK(queue); 503262065Savg} 504262065Savg 505262065Savgvoid 506221059Skibtaskqueue_drain_timeout(struct taskqueue *queue, 507221059Skib struct timeout_task *timeout_task) 508221059Skib{ 509221059Skib 510306947Shselasky /* 511306947Shselasky * Set flag to prevent timer from re-starting during drain: 512306947Shselasky */ 513306947Shselasky TQ_LOCK(queue); 514306947Shselasky KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0, 515306947Shselasky ("Drain already in progress")); 516306947Shselasky timeout_task->f |= DT_DRAIN_IN_PROGRESS; 517306947Shselasky TQ_UNLOCK(queue); 518306947Shselasky 519221059Skib callout_drain(&timeout_task->c); 520221059Skib taskqueue_drain(queue, &timeout_task->t); 521306947Shselasky 522306947Shselasky /* 523306947Shselasky * Clear flag to allow timer to re-start: 524306947Shselasky */ 525306947Shselasky TQ_LOCK(queue); 526306947Shselasky timeout_task->f &= ~DT_DRAIN_IN_PROGRESS; 527306947Shselasky TQ_UNLOCK(queue); 528221059Skib} 529221059Skib 53061033Sdfrstatic void 53161033Sdfrtaskqueue_swi_enqueue(void *context) 53261033Sdfr{ 53388900Sjhb swi_sched(taskqueue_ih, 0); 53461033Sdfr} 53561033Sdfr 53661033Sdfrstatic void 53767551Sjhbtaskqueue_swi_run(void *dummy) 53861033Sdfr{ 539213813Smdf taskqueue_run(taskqueue_swi); 54061033Sdfr} 54161033Sdfr 542111528Sscottlstatic void 543111528Sscottltaskqueue_swi_giant_enqueue(void *context) 544111528Sscottl{ 545111528Sscottl swi_sched(taskqueue_giant_ih, 0); 546111528Sscottl} 547111528Sscottl 548111528Sscottlstatic void 549111528Sscottltaskqueue_swi_giant_run(void *dummy) 550111528Sscottl{ 551213813Smdf taskqueue_run(taskqueue_swi_giant); 552111528Sscottl} 553111528Sscottl 554154333Sscottlint 555154333Sscottltaskqueue_start_threads(struct taskqueue **tqp, int count, int pri, 556154333Sscottl const char *name, ...) 557154333Sscottl{ 558154333Sscottl va_list ap; 559178015Ssam struct thread *td; 560154333Sscottl struct taskqueue *tq; 561178015Ssam int i, error; 562198411Sjhb char ktname[MAXCOMLEN + 1]; 563154333Sscottl 564154333Sscottl if (count <= 0) 565154333Sscottl return (EINVAL); 566178015Ssam 567154333Sscottl tq = *tqp; 568154333Sscottl 569154333Sscottl va_start(ap, name); 570198411Sjhb vsnprintf(ktname, sizeof(ktname), name, ap); 571154333Sscottl va_end(ap); 572154333Sscottl 573178015Ssam tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE, 574157314Ssam M_NOWAIT | M_ZERO); 575178015Ssam if (tq->tq_threads == NULL) { 576157314Ssam printf("%s: no memory for %s threads\n", __func__, ktname); 577157314Ssam return (ENOMEM); 578157314Ssam } 579157314Ssam 580154333Sscottl for (i = 0; i < count; i++) { 581154333Sscottl if (count == 1) 582178015Ssam error = kthread_add(taskqueue_thread_loop, tqp, NULL, 583209062Savg &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname); 584154333Sscottl else 585178015Ssam error = kthread_add(taskqueue_thread_loop, tqp, NULL, 586178015Ssam &tq->tq_threads[i], RFSTOPPED, 0, 587178015Ssam "%s_%d", ktname, i); 588158904Ssam if (error) { 589157314Ssam /* should be ok to continue, taskqueue_free will dtrt */ 590178015Ssam printf("%s: kthread_add(%s): error %d", __func__, 591178015Ssam ktname, error); 592178015Ssam tq->tq_threads[i] = NULL; /* paranoid */ 593158904Ssam } else 594178015Ssam tq->tq_tcount++; 595154333Sscottl } 596158904Ssam for (i = 0; i < count; i++) { 597178015Ssam if (tq->tq_threads[i] == NULL) 598158904Ssam continue; 599178015Ssam td = tq->tq_threads[i]; 600170307Sjeff thread_lock(td); 601158904Ssam sched_prio(td, pri); 602166188Sjeff sched_add(td, SRQ_BORING); 603170307Sjeff thread_unlock(td); 604158904Ssam } 605154333Sscottl 606154333Sscottl return (0); 607154333Sscottl} 608154333Sscottl 609248649Swillstatic inline void 610248649Swilltaskqueue_run_callback(struct taskqueue *tq, 611248649Swill enum taskqueue_callback_type cb_type) 612248649Swill{ 613248649Swill taskqueue_callback_fn tq_callback; 614248649Swill 615248649Swill TQ_ASSERT_UNLOCKED(tq); 616248649Swill tq_callback = tq->tq_callbacks[cb_type]; 617248649Swill if (tq_callback != NULL) 618248649Swill tq_callback(tq->tq_cb_contexts[cb_type]); 619248649Swill} 620248649Swill 621133305Sjmgvoid 622133305Sjmgtaskqueue_thread_loop(void *arg) 623119708Sken{ 624133305Sjmg struct taskqueue **tqp, *tq; 625131246Sjhb 626133305Sjmg tqp = arg; 627133305Sjmg tq = *tqp; 628248649Swill taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT); 629154167Sscottl TQ_LOCK(tq); 630188548Sthompsa while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) { 631213813Smdf taskqueue_run_locked(tq); 632196293Spjd /* 633196293Spjd * Because taskqueue_run() can drop tq_mutex, we need to 634196293Spjd * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the 635196293Spjd * meantime, which means we missed a wakeup. 636196293Spjd */ 637196293Spjd if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0) 638196293Spjd break; 639157815Sjhb TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0); 640188592Sthompsa } 641213813Smdf taskqueue_run_locked(tq); 642145729Ssam 643248649Swill /* 644248649Swill * This thread is on its way out, so just drop the lock temporarily 645248649Swill * in order to call the shutdown callback. This allows the callback 646248649Swill * to look at the taskqueue, even just before it dies. 647248649Swill */ 648248649Swill TQ_UNLOCK(tq); 649248649Swill taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN); 650248649Swill TQ_LOCK(tq); 651248649Swill 652145729Ssam /* rendezvous with thread that asked us to terminate */ 653178015Ssam tq->tq_tcount--; 654178015Ssam wakeup_one(tq->tq_threads); 655154167Sscottl TQ_UNLOCK(tq); 656178123Sjhb kthread_exit(); 657119708Sken} 658119708Sken 659133305Sjmgvoid 660119708Skentaskqueue_thread_enqueue(void *context) 661119708Sken{ 662133305Sjmg struct taskqueue **tqp, *tq; 663131246Sjhb 664133305Sjmg tqp = context; 665133305Sjmg tq = *tqp; 666133305Sjmg 667145729Ssam wakeup_one(tq); 668119708Sken} 669119708Sken 670188058SimpTASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL, 671111528Sscottl swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ, 672111528Sscottl INTR_MPSAFE, &taskqueue_ih)); 673111528Sscottl 674188058SimpTASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL, 675151656Sjhb swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run, 676111528Sscottl NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih)); 677119708Sken 678133305SjmgTASKQUEUE_DEFINE_THREAD(thread); 679119789Ssam 680154167Sscottlstruct taskqueue * 681154167Sscottltaskqueue_create_fast(const char *name, int mflags, 682154333Sscottl taskqueue_enqueue_fn enqueue, void *context) 683119789Ssam{ 684154333Sscottl return _taskqueue_create(name, mflags, enqueue, context, 685154167Sscottl MTX_SPIN, "fast_taskqueue"); 686119789Ssam} 687119789Ssam 688154167Sscottl/* NB: for backwards compatibility */ 689154167Sscottlint 690154167Sscottltaskqueue_enqueue_fast(struct taskqueue *queue, struct task *task) 691119789Ssam{ 692154167Sscottl return taskqueue_enqueue(queue, task); 693119789Ssam} 694119789Ssam 695119789Ssamstatic void *taskqueue_fast_ih; 696119789Ssam 697119789Ssamstatic void 698154167Sscottltaskqueue_fast_enqueue(void *context) 699119789Ssam{ 700119789Ssam swi_sched(taskqueue_fast_ih, 0); 701119789Ssam} 702119789Ssam 703119789Ssamstatic void 704119789Ssamtaskqueue_fast_run(void *dummy) 705119789Ssam{ 706213813Smdf taskqueue_run(taskqueue_fast); 707119789Ssam} 708119789Ssam 709188058SimpTASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL, 710239779Sjhb swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL, 711154167Sscottl SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih)); 712196295Spjd 713196295Spjdint 714196295Spjdtaskqueue_member(struct taskqueue *queue, struct thread *td) 715196295Spjd{ 716196295Spjd int i, j, ret = 0; 717196295Spjd 718196295Spjd for (i = 0, j = 0; ; i++) { 719196295Spjd if (queue->tq_threads[i] == NULL) 720196295Spjd continue; 721196295Spjd if (queue->tq_threads[i] == td) { 722196295Spjd ret = 1; 723196295Spjd break; 724196295Spjd } 725196295Spjd if (++j >= queue->tq_tcount) 726196295Spjd break; 727196295Spjd } 728196295Spjd return (ret); 729196295Spjd} 730