subr_taskqueue.c revision 306935
161033Sdfr/*- 261033Sdfr * Copyright (c) 2000 Doug Rabson 361033Sdfr * All rights reserved. 461033Sdfr * 561033Sdfr * Redistribution and use in source and binary forms, with or without 661033Sdfr * modification, are permitted provided that the following conditions 761033Sdfr * are met: 861033Sdfr * 1. Redistributions of source code must retain the above copyright 961033Sdfr * notice, this list of conditions and the following disclaimer. 1061033Sdfr * 2. Redistributions in binary form must reproduce the above copyright 1161033Sdfr * notice, this list of conditions and the following disclaimer in the 1261033Sdfr * documentation and/or other materials provided with the distribution. 1361033Sdfr * 1461033Sdfr * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 1561033Sdfr * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 1661033Sdfr * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 1761033Sdfr * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 1861033Sdfr * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 1961033Sdfr * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 2061033Sdfr * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 2161033Sdfr * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 2261033Sdfr * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 2361033Sdfr * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 2461033Sdfr * SUCH DAMAGE. 2561033Sdfr */ 2661033Sdfr 27116182Sobrien#include <sys/cdefs.h> 28116182Sobrien__FBSDID("$FreeBSD: stable/10/sys/kern/subr_taskqueue.c 306935 2016-10-10 04:57:33Z julian $"); 29116182Sobrien 3061033Sdfr#include <sys/param.h> 3185521Sjhb#include <sys/systm.h> 3265822Sjhb#include <sys/bus.h> 3385560Sjhb#include <sys/interrupt.h> 3461033Sdfr#include <sys/kernel.h> 35123614Sjhb#include <sys/kthread.h> 36225570Sadrian#include <sys/limits.h> 3785521Sjhb#include <sys/lock.h> 3861033Sdfr#include <sys/malloc.h> 3985521Sjhb#include <sys/mutex.h> 40145729Ssam#include <sys/proc.h> 41154333Sscottl#include <sys/sched.h> 4285521Sjhb#include <sys/taskqueue.h> 43119708Sken#include <sys/unistd.h> 44154333Sscottl#include <machine/stdarg.h> 4561033Sdfr 4669774Sphkstatic MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues"); 47123614Sjhbstatic void *taskqueue_giant_ih; 48123614Sjhbstatic void *taskqueue_ih; 49297066Smavstatic void taskqueue_fast_enqueue(void *); 50297066Smavstatic void taskqueue_swi_enqueue(void *); 51297066Smavstatic void taskqueue_swi_giant_enqueue(void *); 5267551Sjhb 53213813Smdfstruct taskqueue_busy { 54213813Smdf struct task *tb_running; 55213813Smdf TAILQ_ENTRY(taskqueue_busy) tb_link; 56213813Smdf}; 57213813Smdf 5861033Sdfrstruct taskqueue { 5961033Sdfr STAILQ_HEAD(, task) tq_queue; 6061033Sdfr taskqueue_enqueue_fn tq_enqueue; 6161033Sdfr void *tq_context; 62213813Smdf TAILQ_HEAD(, taskqueue_busy) tq_active; 6385521Sjhb struct mtx tq_mutex; 64178015Ssam struct thread **tq_threads; 65178015Ssam int tq_tcount; 66180588Skmacy int tq_spin; 67154333Sscottl int tq_flags; 68221059Skib int tq_callouts; 69248649Swill taskqueue_callback_fn tq_callbacks[TASKQUEUE_NUM_CALLBACKS]; 70248649Swill void *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS]; 7161033Sdfr}; 7261033Sdfr 73154333Sscottl#define TQ_FLAGS_ACTIVE (1 << 0) 74177621Sscottl#define TQ_FLAGS_BLOCKED (1 << 1) 75297066Smav#define TQ_FLAGS_UNLOCKED_ENQUEUE (1 << 2) 76154333Sscottl 77221059Skib#define DT_CALLOUT_ARMED (1 << 0) 78221059Skib 79215021Sjmallett#define TQ_LOCK(tq) \ 80215021Sjmallett do { \ 81215021Sjmallett if ((tq)->tq_spin) \ 82215021Sjmallett mtx_lock_spin(&(tq)->tq_mutex); \ 83215021Sjmallett else \ 84215021Sjmallett mtx_lock(&(tq)->tq_mutex); \ 85215021Sjmallett } while (0) 86248649Swill#define TQ_ASSERT_LOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_OWNED) 87154167Sscottl 88215021Sjmallett#define TQ_UNLOCK(tq) \ 89215021Sjmallett do { \ 90215021Sjmallett if ((tq)->tq_spin) \ 91215021Sjmallett mtx_unlock_spin(&(tq)->tq_mutex); \ 92215021Sjmallett else \ 93215021Sjmallett mtx_unlock(&(tq)->tq_mutex); \ 94215021Sjmallett } while (0) 95248649Swill#define TQ_ASSERT_UNLOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED) 96154167Sscottl 97221059Skibvoid 98221059Skib_timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task, 99221059Skib int priority, task_fn_t func, void *context) 100221059Skib{ 101221059Skib 102221059Skib TASK_INIT(&timeout_task->t, priority, func, context); 103297066Smav callout_init_mtx(&timeout_task->c, &queue->tq_mutex, 104297066Smav CALLOUT_RETURNUNLOCKED); 105221059Skib timeout_task->q = queue; 106221059Skib timeout_task->f = 0; 107221059Skib} 108221059Skib 109154167Sscottlstatic __inline int 110154167SscottlTQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm, 111154167Sscottl int t) 112154167Sscottl{ 113180588Skmacy if (tq->tq_spin) 114154167Sscottl return (msleep_spin(p, m, wm, t)); 115154167Sscottl return (msleep(p, m, pri, wm, t)); 116154167Sscottl} 117154167Sscottl 118154167Sscottlstatic struct taskqueue * 119215750Savg_taskqueue_create(const char *name __unused, int mflags, 120145729Ssam taskqueue_enqueue_fn enqueue, void *context, 121154167Sscottl int mtxflags, const char *mtxname) 12261033Sdfr{ 12361033Sdfr struct taskqueue *queue; 124180588Skmacy 12585521Sjhb queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO); 12661033Sdfr if (!queue) 127188058Simp return NULL; 128180588Skmacy 12961033Sdfr STAILQ_INIT(&queue->tq_queue); 130213813Smdf TAILQ_INIT(&queue->tq_active); 13161033Sdfr queue->tq_enqueue = enqueue; 13261033Sdfr queue->tq_context = context; 133180588Skmacy queue->tq_spin = (mtxflags & MTX_SPIN) != 0; 134180588Skmacy queue->tq_flags |= TQ_FLAGS_ACTIVE; 135297066Smav if (enqueue == taskqueue_fast_enqueue || 136297066Smav enqueue == taskqueue_swi_enqueue || 137297066Smav enqueue == taskqueue_swi_giant_enqueue || 138297066Smav enqueue == taskqueue_thread_enqueue) 139297066Smav queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE; 140154167Sscottl mtx_init(&queue->tq_mutex, mtxname, NULL, mtxflags); 14161033Sdfr 14261033Sdfr return queue; 14361033Sdfr} 14461033Sdfr 145154167Sscottlstruct taskqueue * 146154167Sscottltaskqueue_create(const char *name, int mflags, 147154333Sscottl taskqueue_enqueue_fn enqueue, void *context) 148154167Sscottl{ 149154333Sscottl return _taskqueue_create(name, mflags, enqueue, context, 150154167Sscottl MTX_DEF, "taskqueue"); 151154167Sscottl} 152154167Sscottl 153248649Swillvoid 154248649Swilltaskqueue_set_callback(struct taskqueue *queue, 155248649Swill enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback, 156248649Swill void *context) 157248649Swill{ 158248649Swill 159248649Swill KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) && 160248649Swill (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)), 161248649Swill ("Callback type %d not valid, must be %d-%d", cb_type, 162248649Swill TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX)); 163248649Swill KASSERT((queue->tq_callbacks[cb_type] == NULL), 164248649Swill ("Re-initialization of taskqueue callback?")); 165248649Swill 166248649Swill queue->tq_callbacks[cb_type] = callback; 167248649Swill queue->tq_cb_contexts[cb_type] = context; 168248649Swill} 169248649Swill 170145729Ssam/* 171145729Ssam * Signal a taskqueue thread to terminate. 172145729Ssam */ 173145729Ssamstatic void 174178015Ssamtaskqueue_terminate(struct thread **pp, struct taskqueue *tq) 175145729Ssam{ 176145729Ssam 177221059Skib while (tq->tq_tcount > 0 || tq->tq_callouts > 0) { 178154333Sscottl wakeup(tq); 179154333Sscottl TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0); 180145729Ssam } 181145729Ssam} 182145729Ssam 18361033Sdfrvoid 18461033Sdfrtaskqueue_free(struct taskqueue *queue) 18561033Sdfr{ 18685521Sjhb 187154167Sscottl TQ_LOCK(queue); 188154333Sscottl queue->tq_flags &= ~TQ_FLAGS_ACTIVE; 189178015Ssam taskqueue_terminate(queue->tq_threads, queue); 190213813Smdf KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?")); 191221059Skib KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks")); 19285521Sjhb mtx_destroy(&queue->tq_mutex); 193178015Ssam free(queue->tq_threads, M_TASKQUEUE); 19461033Sdfr free(queue, M_TASKQUEUE); 19561033Sdfr} 19661033Sdfr 197221059Skibstatic int 198221059Skibtaskqueue_enqueue_locked(struct taskqueue *queue, struct task *task) 19961033Sdfr{ 20061033Sdfr struct task *ins; 20161033Sdfr struct task *prev; 20261033Sdfr 20361033Sdfr /* 20461033Sdfr * Count multiple enqueues. 20561033Sdfr */ 206180588Skmacy if (task->ta_pending) { 207225570Sadrian if (task->ta_pending < USHRT_MAX) 208225570Sadrian task->ta_pending++; 209297066Smav TQ_UNLOCK(queue); 210221059Skib return (0); 21161033Sdfr } 21261033Sdfr 21361033Sdfr /* 21461033Sdfr * Optimise the case when all tasks have the same priority. 21561033Sdfr */ 21664199Shsu prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 21761033Sdfr if (!prev || prev->ta_priority >= task->ta_priority) { 21861033Sdfr STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 21961033Sdfr } else { 220188058Simp prev = NULL; 22161033Sdfr for (ins = STAILQ_FIRST(&queue->tq_queue); ins; 22261033Sdfr prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 22361033Sdfr if (ins->ta_priority < task->ta_priority) 22461033Sdfr break; 22561033Sdfr 22661033Sdfr if (prev) 22761033Sdfr STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 22861033Sdfr else 22961033Sdfr STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 23061033Sdfr } 23161033Sdfr 23261033Sdfr task->ta_pending = 1; 233297066Smav if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0) 234297066Smav TQ_UNLOCK(queue); 235180588Skmacy if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) 236177621Sscottl queue->tq_enqueue(queue->tq_context); 237297066Smav if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0) 238297066Smav TQ_UNLOCK(queue); 23985560Sjhb 240297066Smav /* Return with lock released. */ 241221059Skib return (0); 242221059Skib} 243221059Skibint 244221059Skibtaskqueue_enqueue(struct taskqueue *queue, struct task *task) 245221059Skib{ 246221059Skib int res; 247221059Skib 248221059Skib TQ_LOCK(queue); 249221059Skib res = taskqueue_enqueue_locked(queue, task); 250297066Smav /* The lock is released inside. */ 25185560Sjhb 252221059Skib return (res); 25361033Sdfr} 25461033Sdfr 255221059Skibstatic void 256221059Skibtaskqueue_timeout_func(void *arg) 257221059Skib{ 258221059Skib struct taskqueue *queue; 259221059Skib struct timeout_task *timeout_task; 260221059Skib 261221059Skib timeout_task = arg; 262221059Skib queue = timeout_task->q; 263221059Skib KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout")); 264221059Skib timeout_task->f &= ~DT_CALLOUT_ARMED; 265221059Skib queue->tq_callouts--; 266221059Skib taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t); 267297066Smav /* The lock is released inside. */ 268221059Skib} 269221059Skib 270221059Skibint 271221059Skibtaskqueue_enqueue_timeout(struct taskqueue *queue, 272221059Skib struct timeout_task *timeout_task, int ticks) 273221059Skib{ 274221059Skib int res; 275221059Skib 276221059Skib TQ_LOCK(queue); 277221059Skib KASSERT(timeout_task->q == NULL || timeout_task->q == queue, 278221059Skib ("Migrated queue")); 279221059Skib KASSERT(!queue->tq_spin, ("Timeout for spin-queue")); 280221059Skib timeout_task->q = queue; 281221059Skib res = timeout_task->t.ta_pending; 282221059Skib if (ticks == 0) { 283221059Skib taskqueue_enqueue_locked(queue, &timeout_task->t); 284297066Smav /* The lock is released inside. */ 285221059Skib } else { 286221059Skib if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) { 287221059Skib res++; 288221059Skib } else { 289221059Skib queue->tq_callouts++; 290221059Skib timeout_task->f |= DT_CALLOUT_ARMED; 291243341Skib if (ticks < 0) 292243341Skib ticks = -ticks; /* Ignore overflow. */ 293221059Skib } 294243341Skib if (ticks > 0) { 295243341Skib callout_reset(&timeout_task->c, ticks, 296243341Skib taskqueue_timeout_func, timeout_task); 297243341Skib } 298297066Smav TQ_UNLOCK(queue); 299221059Skib } 300221059Skib return (res); 301221059Skib} 302221059Skib 303262065Savgstatic void 304262065Savgtaskqueue_drain_running(struct taskqueue *queue) 305262065Savg{ 306262065Savg 307262065Savg while (!TAILQ_EMPTY(&queue->tq_active)) 308262065Savg TQ_SLEEP(queue, &queue->tq_active, &queue->tq_mutex, 309262065Savg PWAIT, "-", 0); 310262065Savg} 311262065Savg 31261033Sdfrvoid 313177621Sscottltaskqueue_block(struct taskqueue *queue) 314177621Sscottl{ 315177621Sscottl 316177621Sscottl TQ_LOCK(queue); 317177621Sscottl queue->tq_flags |= TQ_FLAGS_BLOCKED; 318177621Sscottl TQ_UNLOCK(queue); 319177621Sscottl} 320177621Sscottl 321177621Sscottlvoid 322177621Sscottltaskqueue_unblock(struct taskqueue *queue) 323177621Sscottl{ 324177621Sscottl 325177621Sscottl TQ_LOCK(queue); 326177621Sscottl queue->tq_flags &= ~TQ_FLAGS_BLOCKED; 327297064Smav if (!STAILQ_EMPTY(&queue->tq_queue)) 328177621Sscottl queue->tq_enqueue(queue->tq_context); 329177621Sscottl TQ_UNLOCK(queue); 330177621Sscottl} 331177621Sscottl 332213813Smdfstatic void 333213813Smdftaskqueue_run_locked(struct taskqueue *queue) 33461033Sdfr{ 335213813Smdf struct taskqueue_busy tb; 336210380Smdf struct task *task; 337210377Smdf int pending; 33861033Sdfr 339248649Swill TQ_ASSERT_LOCKED(queue); 340213813Smdf tb.tb_running = NULL; 341213813Smdf TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link); 342213813Smdf 34361033Sdfr while (STAILQ_FIRST(&queue->tq_queue)) { 34461033Sdfr /* 34561033Sdfr * Carefully remove the first task from the queue and 34661033Sdfr * zero its pending count. 34761033Sdfr */ 34861033Sdfr task = STAILQ_FIRST(&queue->tq_queue); 34961033Sdfr STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 35061033Sdfr pending = task->ta_pending; 35161033Sdfr task->ta_pending = 0; 352213813Smdf tb.tb_running = task; 353154167Sscottl TQ_UNLOCK(queue); 35461033Sdfr 35585560Sjhb task->ta_func(task->ta_context, pending); 35661033Sdfr 357154167Sscottl TQ_LOCK(queue); 358213813Smdf tb.tb_running = NULL; 359208715Szml wakeup(task); 36061033Sdfr } 361213813Smdf TAILQ_REMOVE(&queue->tq_active, &tb, tb_link); 362262065Savg if (TAILQ_EMPTY(&queue->tq_active)) 363262065Savg wakeup(&queue->tq_active); 36461033Sdfr} 36561033Sdfr 366136131Simpvoid 367213813Smdftaskqueue_run(struct taskqueue *queue) 368213813Smdf{ 369213813Smdf 370213813Smdf TQ_LOCK(queue); 371213813Smdf taskqueue_run_locked(queue); 372213813Smdf TQ_UNLOCK(queue); 373213813Smdf} 374213813Smdf 375213813Smdfstatic int 376213813Smdftask_is_running(struct taskqueue *queue, struct task *task) 377213813Smdf{ 378213813Smdf struct taskqueue_busy *tb; 379213813Smdf 380248649Swill TQ_ASSERT_LOCKED(queue); 381213813Smdf TAILQ_FOREACH(tb, &queue->tq_active, tb_link) { 382213813Smdf if (tb->tb_running == task) 383213813Smdf return (1); 384213813Smdf } 385213813Smdf return (0); 386213813Smdf} 387213813Smdf 388221059Skibstatic int 389221059Skibtaskqueue_cancel_locked(struct taskqueue *queue, struct task *task, 390221059Skib u_int *pendp) 391221059Skib{ 392221059Skib 393221059Skib if (task->ta_pending > 0) 394221059Skib STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link); 395221059Skib if (pendp != NULL) 396221059Skib *pendp = task->ta_pending; 397221059Skib task->ta_pending = 0; 398221059Skib return (task_is_running(queue, task) ? EBUSY : 0); 399221059Skib} 400221059Skib 401215011Smdfint 402215011Smdftaskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp) 403215011Smdf{ 404215011Smdf int error; 405215011Smdf 406215011Smdf TQ_LOCK(queue); 407221059Skib error = taskqueue_cancel_locked(queue, task, pendp); 408215011Smdf TQ_UNLOCK(queue); 409215011Smdf 410221059Skib return (error); 411221059Skib} 412221059Skib 413221059Skibint 414221059Skibtaskqueue_cancel_timeout(struct taskqueue *queue, 415221059Skib struct timeout_task *timeout_task, u_int *pendp) 416221059Skib{ 417221059Skib u_int pending, pending1; 418221059Skib int error; 419221059Skib 420221059Skib TQ_LOCK(queue); 421221059Skib pending = !!callout_stop(&timeout_task->c); 422221059Skib error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1); 423221059Skib if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) { 424221059Skib timeout_task->f &= ~DT_CALLOUT_ARMED; 425221059Skib queue->tq_callouts--; 426221059Skib } 427221059Skib TQ_UNLOCK(queue); 428221059Skib 429215011Smdf if (pendp != NULL) 430221059Skib *pendp = pending + pending1; 431215011Smdf return (error); 432215011Smdf} 433215011Smdf 434213813Smdfvoid 435136131Simptaskqueue_drain(struct taskqueue *queue, struct task *task) 436136131Simp{ 437211284Spjd 438211284Spjd if (!queue->tq_spin) 439154167Sscottl WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); 440145729Ssam 441211284Spjd TQ_LOCK(queue); 442213813Smdf while (task->ta_pending != 0 || task_is_running(queue, task)) 443211284Spjd TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0); 444211284Spjd TQ_UNLOCK(queue); 445136131Simp} 446136131Simp 447221059Skibvoid 448262065Savgtaskqueue_drain_all(struct taskqueue *queue) 449262065Savg{ 450262065Savg struct task *task; 451262065Savg 452262065Savg if (!queue->tq_spin) 453262065Savg WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); 454262065Savg 455262065Savg TQ_LOCK(queue); 456262065Savg task = STAILQ_LAST(&queue->tq_queue, task, ta_link); 457306935Sjulian while (task != NULL && task->ta_pending != 0) { 458306935Sjulian struct task *oldtask; 459306935Sjulian TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0); 460306935Sjulian /* 461306935Sjulian * While we were asleeep the last entry may have been freed. 462306935Sjulian * We need to check if it's still even in the queue. 463306935Sjulian * Not perfect, but it's better than referencing bad memory. 464306935Sjulian * first guess is the current 'end of queue' but if a new 465306935Sjulian * item has been added we need to take the expensive path 466306935Sjulian * Better fix in 11. 467306935Sjulian */ 468306935Sjulian oldtask = task; 469306935Sjulian if (oldtask != 470306935Sjulian (task = STAILQ_LAST(&queue->tq_queue, task, ta_link))) { 471306935Sjulian STAILQ_FOREACH(task, &queue->tq_queue, ta_link) { 472306935Sjulian if (task == oldtask) 473306935Sjulian break; 474306935Sjulian } 475306935Sjulian } 476306935Sjulian } 477262065Savg taskqueue_drain_running(queue); 478262065Savg KASSERT(STAILQ_EMPTY(&queue->tq_queue), 479262065Savg ("taskqueue queue is not empty after draining")); 480262065Savg TQ_UNLOCK(queue); 481262065Savg} 482262065Savg 483262065Savgvoid 484221059Skibtaskqueue_drain_timeout(struct taskqueue *queue, 485221059Skib struct timeout_task *timeout_task) 486221059Skib{ 487221059Skib 488221059Skib callout_drain(&timeout_task->c); 489221059Skib taskqueue_drain(queue, &timeout_task->t); 490221059Skib} 491221059Skib 49261033Sdfrstatic void 49361033Sdfrtaskqueue_swi_enqueue(void *context) 49461033Sdfr{ 49588900Sjhb swi_sched(taskqueue_ih, 0); 49661033Sdfr} 49761033Sdfr 49861033Sdfrstatic void 49967551Sjhbtaskqueue_swi_run(void *dummy) 50061033Sdfr{ 501213813Smdf taskqueue_run(taskqueue_swi); 50261033Sdfr} 50361033Sdfr 504111528Sscottlstatic void 505111528Sscottltaskqueue_swi_giant_enqueue(void *context) 506111528Sscottl{ 507111528Sscottl swi_sched(taskqueue_giant_ih, 0); 508111528Sscottl} 509111528Sscottl 510111528Sscottlstatic void 511111528Sscottltaskqueue_swi_giant_run(void *dummy) 512111528Sscottl{ 513213813Smdf taskqueue_run(taskqueue_swi_giant); 514111528Sscottl} 515111528Sscottl 516154333Sscottlint 517154333Sscottltaskqueue_start_threads(struct taskqueue **tqp, int count, int pri, 518154333Sscottl const char *name, ...) 519154333Sscottl{ 520154333Sscottl va_list ap; 521178015Ssam struct thread *td; 522154333Sscottl struct taskqueue *tq; 523178015Ssam int i, error; 524198411Sjhb char ktname[MAXCOMLEN + 1]; 525154333Sscottl 526154333Sscottl if (count <= 0) 527154333Sscottl return (EINVAL); 528178015Ssam 529154333Sscottl tq = *tqp; 530154333Sscottl 531154333Sscottl va_start(ap, name); 532198411Sjhb vsnprintf(ktname, sizeof(ktname), name, ap); 533154333Sscottl va_end(ap); 534154333Sscottl 535178015Ssam tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE, 536157314Ssam M_NOWAIT | M_ZERO); 537178015Ssam if (tq->tq_threads == NULL) { 538157314Ssam printf("%s: no memory for %s threads\n", __func__, ktname); 539157314Ssam return (ENOMEM); 540157314Ssam } 541157314Ssam 542154333Sscottl for (i = 0; i < count; i++) { 543154333Sscottl if (count == 1) 544178015Ssam error = kthread_add(taskqueue_thread_loop, tqp, NULL, 545209062Savg &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname); 546154333Sscottl else 547178015Ssam error = kthread_add(taskqueue_thread_loop, tqp, NULL, 548178015Ssam &tq->tq_threads[i], RFSTOPPED, 0, 549178015Ssam "%s_%d", ktname, i); 550158904Ssam if (error) { 551157314Ssam /* should be ok to continue, taskqueue_free will dtrt */ 552178015Ssam printf("%s: kthread_add(%s): error %d", __func__, 553178015Ssam ktname, error); 554178015Ssam tq->tq_threads[i] = NULL; /* paranoid */ 555158904Ssam } else 556178015Ssam tq->tq_tcount++; 557154333Sscottl } 558158904Ssam for (i = 0; i < count; i++) { 559178015Ssam if (tq->tq_threads[i] == NULL) 560158904Ssam continue; 561178015Ssam td = tq->tq_threads[i]; 562170307Sjeff thread_lock(td); 563158904Ssam sched_prio(td, pri); 564166188Sjeff sched_add(td, SRQ_BORING); 565170307Sjeff thread_unlock(td); 566158904Ssam } 567154333Sscottl 568154333Sscottl return (0); 569154333Sscottl} 570154333Sscottl 571248649Swillstatic inline void 572248649Swilltaskqueue_run_callback(struct taskqueue *tq, 573248649Swill enum taskqueue_callback_type cb_type) 574248649Swill{ 575248649Swill taskqueue_callback_fn tq_callback; 576248649Swill 577248649Swill TQ_ASSERT_UNLOCKED(tq); 578248649Swill tq_callback = tq->tq_callbacks[cb_type]; 579248649Swill if (tq_callback != NULL) 580248649Swill tq_callback(tq->tq_cb_contexts[cb_type]); 581248649Swill} 582248649Swill 583133305Sjmgvoid 584133305Sjmgtaskqueue_thread_loop(void *arg) 585119708Sken{ 586133305Sjmg struct taskqueue **tqp, *tq; 587131246Sjhb 588133305Sjmg tqp = arg; 589133305Sjmg tq = *tqp; 590248649Swill taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT); 591154167Sscottl TQ_LOCK(tq); 592188548Sthompsa while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) { 593213813Smdf taskqueue_run_locked(tq); 594196293Spjd /* 595196293Spjd * Because taskqueue_run() can drop tq_mutex, we need to 596196293Spjd * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the 597196293Spjd * meantime, which means we missed a wakeup. 598196293Spjd */ 599196293Spjd if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0) 600196293Spjd break; 601157815Sjhb TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0); 602188592Sthompsa } 603213813Smdf taskqueue_run_locked(tq); 604145729Ssam 605248649Swill /* 606248649Swill * This thread is on its way out, so just drop the lock temporarily 607248649Swill * in order to call the shutdown callback. This allows the callback 608248649Swill * to look at the taskqueue, even just before it dies. 609248649Swill */ 610248649Swill TQ_UNLOCK(tq); 611248649Swill taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN); 612248649Swill TQ_LOCK(tq); 613248649Swill 614145729Ssam /* rendezvous with thread that asked us to terminate */ 615178015Ssam tq->tq_tcount--; 616178015Ssam wakeup_one(tq->tq_threads); 617154167Sscottl TQ_UNLOCK(tq); 618178123Sjhb kthread_exit(); 619119708Sken} 620119708Sken 621133305Sjmgvoid 622119708Skentaskqueue_thread_enqueue(void *context) 623119708Sken{ 624133305Sjmg struct taskqueue **tqp, *tq; 625131246Sjhb 626133305Sjmg tqp = context; 627133305Sjmg tq = *tqp; 628133305Sjmg 629145729Ssam wakeup_one(tq); 630119708Sken} 631119708Sken 632188058SimpTASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL, 633111528Sscottl swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ, 634111528Sscottl INTR_MPSAFE, &taskqueue_ih)); 635111528Sscottl 636188058SimpTASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL, 637151656Sjhb swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run, 638111528Sscottl NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih)); 639119708Sken 640133305SjmgTASKQUEUE_DEFINE_THREAD(thread); 641119789Ssam 642154167Sscottlstruct taskqueue * 643154167Sscottltaskqueue_create_fast(const char *name, int mflags, 644154333Sscottl taskqueue_enqueue_fn enqueue, void *context) 645119789Ssam{ 646154333Sscottl return _taskqueue_create(name, mflags, enqueue, context, 647154167Sscottl MTX_SPIN, "fast_taskqueue"); 648119789Ssam} 649119789Ssam 650154167Sscottl/* NB: for backwards compatibility */ 651154167Sscottlint 652154167Sscottltaskqueue_enqueue_fast(struct taskqueue *queue, struct task *task) 653119789Ssam{ 654154167Sscottl return taskqueue_enqueue(queue, task); 655119789Ssam} 656119789Ssam 657119789Ssamstatic void *taskqueue_fast_ih; 658119789Ssam 659119789Ssamstatic void 660154167Sscottltaskqueue_fast_enqueue(void *context) 661119789Ssam{ 662119789Ssam swi_sched(taskqueue_fast_ih, 0); 663119789Ssam} 664119789Ssam 665119789Ssamstatic void 666119789Ssamtaskqueue_fast_run(void *dummy) 667119789Ssam{ 668213813Smdf taskqueue_run(taskqueue_fast); 669119789Ssam} 670119789Ssam 671188058SimpTASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL, 672239779Sjhb swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL, 673154167Sscottl SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih)); 674196295Spjd 675196295Spjdint 676196295Spjdtaskqueue_member(struct taskqueue *queue, struct thread *td) 677196295Spjd{ 678196295Spjd int i, j, ret = 0; 679196295Spjd 680196295Spjd for (i = 0, j = 0; ; i++) { 681196295Spjd if (queue->tq_threads[i] == NULL) 682196295Spjd continue; 683196295Spjd if (queue->tq_threads[i] == td) { 684196295Spjd ret = 1; 685196295Spjd break; 686196295Spjd } 687196295Spjd if (++j >= queue->tq_tcount) 688196295Spjd break; 689196295Spjd } 690196295Spjd return (ret); 691196295Spjd} 692