/* subr_taskqueue.c — FreeBSD SVN revision 269666 */
/*-
 * Copyright (c) 2000 Doug Rabson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 269666 2014-08-07 14:32:28Z ae $");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/bus.h>
#include <sys/cpuset.h>
#include <sys/interrupt.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/limits.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/sched.h>
#include <sys/taskqueue.h>
#include <sys/unistd.h>
#include <machine/stdarg.h>

static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
static void	*taskqueue_giant_ih;
static void	*taskqueue_ih;
/* Forward declarations so _taskqueue_create() can compare enqueue hooks. */
static void	 taskqueue_fast_enqueue(void *);
static void	 taskqueue_swi_enqueue(void *);
static void	 taskqueue_swi_giant_enqueue(void *);

/*
 * One entry per thread currently executing a task from this queue;
 * linked on tq_active so drain/cancel can see in-flight tasks.
 */
struct taskqueue_busy {
	struct task		*tb_running;	/* task being executed now */
	TAILQ_ENTRY(taskqueue_busy) tb_link;	/* on taskqueue->tq_active */
};

struct taskqueue {
	STAILQ_HEAD(, task)	tq_queue;	/* pending tasks, priority order */
	taskqueue_enqueue_fn	tq_enqueue;	/* hook run when work is added */
	void			*tq_context;	/* arg passed to tq_enqueue */
	TAILQ_HEAD(, taskqueue_busy) tq_active;	/* tasks currently running */
	struct mtx		tq_mutex;	/* protects all fields below */
	struct thread		**tq_threads;	/* worker threads (may be NULL) */
	int			tq_tcount;	/* count of live worker threads */
	int			tq_spin;	/* nonzero: tq_mutex is MTX_SPIN */
	int			tq_flags;	/* TQ_FLAGS_* below */
	int			tq_callouts;	/* armed timeout_task callouts */
	taskqueue_callback_fn	tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
	void			*tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
};

#define	TQ_FLAGS_ACTIVE		(1 << 0)	/* clear during destruction */
#define	TQ_FLAGS_BLOCKED	(1 << 1)	/* suppress tq_enqueue hook */
#define	TQ_FLAGS_UNLOCKED_ENQUEUE	(1 << 2) /* hook called w/o tq_mutex */
78221059Skib#define DT_CALLOUT_ARMED (1 << 0) 79221059Skib 80215021Sjmallett#define TQ_LOCK(tq) \ 81215021Sjmallett do { \ 82215021Sjmallett if ((tq)->tq_spin) \ 83215021Sjmallett mtx_lock_spin(&(tq)->tq_mutex); \ 84215021Sjmallett else \ 85215021Sjmallett mtx_lock(&(tq)->tq_mutex); \ 86215021Sjmallett } while (0) 87248649Swill#define TQ_ASSERT_LOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_OWNED) 88154167Sscottl 89215021Sjmallett#define TQ_UNLOCK(tq) \ 90215021Sjmallett do { \ 91215021Sjmallett if ((tq)->tq_spin) \ 92215021Sjmallett mtx_unlock_spin(&(tq)->tq_mutex); \ 93215021Sjmallett else \ 94215021Sjmallett mtx_unlock(&(tq)->tq_mutex); \ 95215021Sjmallett } while (0) 96248649Swill#define TQ_ASSERT_UNLOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED) 97154167Sscottl 98221059Skibvoid 99221059Skib_timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task, 100221059Skib int priority, task_fn_t func, void *context) 101221059Skib{ 102221059Skib 103221059Skib TASK_INIT(&timeout_task->t, priority, func, context); 104256613Smav callout_init_mtx(&timeout_task->c, &queue->tq_mutex, 105256613Smav CALLOUT_RETURNUNLOCKED); 106221059Skib timeout_task->q = queue; 107221059Skib timeout_task->f = 0; 108221059Skib} 109221059Skib 110154167Sscottlstatic __inline int 111154167SscottlTQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm, 112154167Sscottl int t) 113154167Sscottl{ 114180588Skmacy if (tq->tq_spin) 115154167Sscottl return (msleep_spin(p, m, wm, t)); 116154167Sscottl return (msleep(p, m, pri, wm, t)); 117154167Sscottl} 118154167Sscottl 119154167Sscottlstatic struct taskqueue * 120215750Savg_taskqueue_create(const char *name __unused, int mflags, 121145729Ssam taskqueue_enqueue_fn enqueue, void *context, 122154167Sscottl int mtxflags, const char *mtxname) 12361033Sdfr{ 12461033Sdfr struct taskqueue *queue; 125180588Skmacy 12685521Sjhb queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO); 12761033Sdfr if (!queue) 
128188058Simp return NULL; 129180588Skmacy 13061033Sdfr STAILQ_INIT(&queue->tq_queue); 131213813Smdf TAILQ_INIT(&queue->tq_active); 13261033Sdfr queue->tq_enqueue = enqueue; 13361033Sdfr queue->tq_context = context; 134180588Skmacy queue->tq_spin = (mtxflags & MTX_SPIN) != 0; 135180588Skmacy queue->tq_flags |= TQ_FLAGS_ACTIVE; 136256613Smav if (enqueue == taskqueue_fast_enqueue || 137256613Smav enqueue == taskqueue_swi_enqueue || 138256613Smav enqueue == taskqueue_swi_giant_enqueue || 139256613Smav enqueue == taskqueue_thread_enqueue) 140256613Smav queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE; 141154167Sscottl mtx_init(&queue->tq_mutex, mtxname, NULL, mtxflags); 14261033Sdfr 14361033Sdfr return queue; 14461033Sdfr} 14561033Sdfr 146154167Sscottlstruct taskqueue * 147154167Sscottltaskqueue_create(const char *name, int mflags, 148154333Sscottl taskqueue_enqueue_fn enqueue, void *context) 149154167Sscottl{ 150154333Sscottl return _taskqueue_create(name, mflags, enqueue, context, 151154167Sscottl MTX_DEF, "taskqueue"); 152154167Sscottl} 153154167Sscottl 154248649Swillvoid 155248649Swilltaskqueue_set_callback(struct taskqueue *queue, 156248649Swill enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback, 157248649Swill void *context) 158248649Swill{ 159248649Swill 160248649Swill KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) && 161248649Swill (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)), 162248649Swill ("Callback type %d not valid, must be %d-%d", cb_type, 163248649Swill TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX)); 164248649Swill KASSERT((queue->tq_callbacks[cb_type] == NULL), 165248649Swill ("Re-initialization of taskqueue callback?")); 166248649Swill 167248649Swill queue->tq_callbacks[cb_type] = callback; 168248649Swill queue->tq_cb_contexts[cb_type] = context; 169248649Swill} 170248649Swill 171145729Ssam/* 172145729Ssam * Signal a taskqueue thread to terminate. 
 */
static void
taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
{

	/* Wake workers and wait until all threads and callouts are gone. */
	while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
		wakeup(tq);
		TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
	}
}

/*
 * Tear down a taskqueue: mark it inactive, wait for its threads and
 * armed callouts to drain, then free all resources.
 */
void
taskqueue_free(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
	/* taskqueue_terminate() drops and reacquires tq_mutex while waiting. */
	taskqueue_terminate(queue->tq_threads, queue);
	KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
	KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
	mtx_destroy(&queue->tq_mutex);
	free(queue->tq_threads, M_TASKQUEUE);
	free(queue, M_TASKQUEUE);
}

/*
 * Insert a task in priority order.  Called with tq_mutex held;
 * ALWAYS returns with tq_mutex released.
 */
static int
taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
{
	struct task *ins;
	struct task *prev;

	/*
	 * Count multiple enqueues.
	 */
	if (task->ta_pending) {
		if (task->ta_pending < USHRT_MAX)
			task->ta_pending++;
		TQ_UNLOCK(queue);
		return (0);
	}

	/*
	 * Optimise the case when all tasks have the same priority.
	 */
	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
	if (!prev || prev->ta_priority >= task->ta_priority) {
		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
	} else {
		prev = NULL;
		/* Walk forward to the first lower-priority task. */
		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
			if (ins->ta_priority < task->ta_priority)
				break;

		if (prev)
			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
		else
			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
	}

	task->ta_pending = 1;
	/*
	 * The enqueue hook runs without the lock when the queue allows it
	 * (UNLOCKED_ENQUEUE); otherwise the lock is dropped afterwards.
	 */
	if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0)
		TQ_UNLOCK(queue);
	if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
		queue->tq_enqueue(queue->tq_context);
	if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
		TQ_UNLOCK(queue);

	/* Return with lock released. */
	return (0);
}

/*
 * Queue a task for execution; multiple enqueues before the task runs
 * are coalesced into a single run with an incremented pending count.
 */
int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
	int res;

	TQ_LOCK(queue);
	res = taskqueue_enqueue_locked(queue, task);
	/* The lock is released inside. */

	return (res);
}

/*
 * Callout handler for timeout tasks: disarm and enqueue the task.
 * Runs with tq_mutex held (callout_init_mtx); the enqueue releases it.
 */
static void
taskqueue_timeout_func(void *arg)
{
	struct taskqueue *queue;
	struct timeout_task *timeout_task;

	timeout_task = arg;
	queue = timeout_task->q;
	KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
	timeout_task->f &= ~DT_CALLOUT_ARMED;
	queue->tq_callouts--;
	taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
	/* The lock is released inside. */
}

/*
 * Schedule a timeout task to run after 'ticks' ticks (0 = immediately;
 * negative values are treated as their absolute value).  Returns the
 * pending count prior to this call.  Not valid on spin-mutex queues.
 */
int
taskqueue_enqueue_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task, int ticks)
{
	int res;

	TQ_LOCK(queue);
	KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
	    ("Migrated queue"));
	KASSERT(!queue->tq_spin, ("Timeout for spin-queue"));
	timeout_task->q = queue;
	res = timeout_task->t.ta_pending;
	if (ticks == 0) {
		taskqueue_enqueue_locked(queue, &timeout_task->t);
		/* The lock is released inside. */
	} else {
		if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
			res++;
		} else {
			queue->tq_callouts++;
			timeout_task->f |= DT_CALLOUT_ARMED;
			if (ticks < 0)
				ticks = -ticks;	/* Ignore overflow.
 */
		}
		/* (Re)arm the callout; harmless if already pending. */
		if (ticks > 0) {
			callout_reset(&timeout_task->c, ticks,
			    taskqueue_timeout_func, timeout_task);
		}
		TQ_UNLOCK(queue);
	}
	return (res);
}

/* Wait (with tq_mutex held) until no thread is executing a task. */
static void
taskqueue_drain_running(struct taskqueue *queue)
{

	while (!TAILQ_EMPTY(&queue->tq_active))
		TQ_SLEEP(queue, &queue->tq_active, &queue->tq_mutex,
		    PWAIT, "-", 0);
}

/*
 * Suppress the enqueue hook: tasks still queue up but workers are not
 * notified until taskqueue_unblock().
 */
void
taskqueue_block(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags |= TQ_FLAGS_BLOCKED;
	TQ_UNLOCK(queue);
}

/* Re-enable the enqueue hook and kick it if work accumulated meanwhile. */
void
taskqueue_unblock(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
	if (!STAILQ_EMPTY(&queue->tq_queue))
		queue->tq_enqueue(queue->tq_context);
	TQ_UNLOCK(queue);
}

/*
 * Execute all pending tasks.  Called with tq_mutex held; the lock is
 * dropped around each task callback, so new tasks may arrive while
 * running.  A taskqueue_busy record advertises the in-flight task.
 */
static void
taskqueue_run_locked(struct taskqueue *queue)
{
	struct taskqueue_busy tb;
	struct task *task;
	int pending;

	TQ_ASSERT_LOCKED(queue);
	tb.tb_running = NULL;
	TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);

	while (STAILQ_FIRST(&queue->tq_queue)) {
		/*
		 * Carefully remove the first task from the queue and
		 * zero its pending count.
		 */
		task = STAILQ_FIRST(&queue->tq_queue);
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		pending = task->ta_pending;
		task->ta_pending = 0;
		tb.tb_running = task;
		TQ_UNLOCK(queue);

		/* Run the task callback without the queue lock. */
		task->ta_func(task->ta_context, pending);

		TQ_LOCK(queue);
		tb.tb_running = NULL;
		/* Wake any taskqueue_drain() sleeping on this task. */
		wakeup(task);
	}
	TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
	if (TAILQ_EMPTY(&queue->tq_active))
		wakeup(&queue->tq_active);
}

/* Lock, run all pending tasks, unlock. */
void
taskqueue_run(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	taskqueue_run_locked(queue);
	TQ_UNLOCK(queue);
}

/* Is 'task' currently being executed by any worker?  tq_mutex held. */
static int
task_is_running(struct taskqueue *queue, struct task *task)
{
	struct taskqueue_busy *tb;

	TQ_ASSERT_LOCKED(queue);
	TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
		if (tb->tb_running == task)
			return (1);
	}
	return (0);
}

/*
 * Remove a pending task; report EBUSY if it is executing right now.
 * Stores the prior pending count in *pendp when non-NULL.
 */
static int
taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
    u_int *pendp)
{

	if (task->ta_pending > 0)
		STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
	if (pendp != NULL)
		*pendp = task->ta_pending;
	task->ta_pending = 0;
	return (task_is_running(queue, task) ? EBUSY : 0);
}

int
taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
{
	int error;

	TQ_LOCK(queue);
	error = taskqueue_cancel_locked(queue, task, pendp);
	TQ_UNLOCK(queue);

	return (error);
}

/*
 * Cancel a timeout task: stop its callout (if armed) and remove the
 * task from the queue.  *pendp accumulates both cancellation counts.
 */
int
taskqueue_cancel_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task, u_int *pendp)
{
	u_int pending, pending1;
	int error;

	TQ_LOCK(queue);
	pending = !!callout_stop(&timeout_task->c);
	error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
	if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
		timeout_task->f &= ~DT_CALLOUT_ARMED;
		queue->tq_callouts--;
	}
	TQ_UNLOCK(queue);

	if (pendp != NULL)
		*pendp = pending + pending1;
	return (error);
}

/*
 * Sleep until 'task' is neither pending nor running.  May sleep, so
 * callers of non-spin queues must not hold incompatible locks.
 */
void
taskqueue_drain(struct taskqueue *queue, struct task *task)
{

	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

	TQ_LOCK(queue);
	while (task->ta_pending != 0 || task_is_running(queue, task))
		TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0);
	TQ_UNLOCK(queue);
}

/*
 * Wait for every task queued at the time of the call to complete:
 * first wait for the current last task, then for all running tasks.
 */
void
taskqueue_drain_all(struct taskqueue *queue)
{
	struct task *task;

	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

	TQ_LOCK(queue);
	task = STAILQ_LAST(&queue->tq_queue, task, ta_link);
	if (task != NULL)
		while (task->ta_pending != 0)
			TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0);
	taskqueue_drain_running(queue);
	KASSERT(STAILQ_EMPTY(&queue->tq_queue),
	    ("taskqueue queue is not empty after draining"));
	TQ_UNLOCK(queue);
}

/* Drain a timeout task: wait out its callout, then drain the task. */
void
taskqueue_drain_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task)
{

	callout_drain(&timeout_task->c);
	taskqueue_drain(queue, &timeout_task->t);
}

/* Enqueue hook for the 'swi' queue: schedule its software interrupt. */
static void
taskqueue_swi_enqueue(void *context)
{
	swi_sched(taskqueue_ih, 0);
}

/* Software-interrupt handler for the 'swi' queue. */
static void
taskqueue_swi_run(void *dummy)
{
	taskqueue_run(taskqueue_swi);
}

/* Enqueue hook for the Giant-locked swi queue. */
static void
taskqueue_swi_giant_enqueue(void *context)
{
	swi_sched(taskqueue_giant_ih, 0);
}

/* Software-interrupt handler for the Giant-locked swi queue. */
static void
taskqueue_swi_giant_run(void *dummy)
{
	taskqueue_run(taskqueue_swi_giant);
}

/*
 * Create 'count' worker threads for *tqp at priority 'pri', optionally
 * pinned to the CPUs in 'mask'.  kthread_add() failures are tolerated:
 * the slot stays NULL and taskqueue_free() copes with fewer threads.
 */
static int
_taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
    cpuset_t *mask, const char *ktname)
{
	struct thread *td;
	struct taskqueue *tq;
	int i, error;

	if (count <= 0)
		return (EINVAL);

	tq = *tqp;

	tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
	    M_NOWAIT | M_ZERO);
	if (tq->tq_threads == NULL) {
		printf("%s: no memory for %s threads\n", __func__, ktname);
		return (ENOMEM);
	}

	for (i = 0; i < count; i++) {
		/* Single thread keeps the bare name; multiple get _%d. */
		if (count == 1)
			error = kthread_add(taskqueue_thread_loop, tqp, NULL,
			    &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
		else
			error = kthread_add(taskqueue_thread_loop, tqp, NULL,
			    &tq->tq_threads[i], RFSTOPPED, 0,
			    "%s_%d", ktname, i);
		if (error) {
			/* should be ok to continue, taskqueue_free will dtrt */
			printf("%s: kthread_add(%s): error %d", __func__,
			    ktname, error);
			tq->tq_threads[i] = NULL;		/* paranoid */
		} else
			tq->tq_tcount++;
	}
	/* Second pass: pin (if asked), set priority, and release threads. */
	for (i = 0; i < count; i++) {
		if (tq->tq_threads[i] == NULL)
			continue;
		td = tq->tq_threads[i];
		if (mask) {
			error = cpuset_setthread(td->td_tid, mask);
			/*
			 * Failing to pin is rarely an actual fatal error;
			 * it'll just affect performance.
			 */
			if (error)
				printf("%s: curthread=%llu: can't pin; "
				    "error=%d\n",
				    __func__,
				    (unsigned long long) td->td_tid,
				    error);
		}
		thread_lock(td);
		sched_prio(td, pri);
		sched_add(td, SRQ_BORING);
		thread_unlock(td);
	}

	return (0);
}

/* Public wrapper: start threads with a printf-style name, no pinning. */
int
taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
    const char *name, ...)
{
	char ktname[MAXCOMLEN + 1];
	va_list ap;

	va_start(ap, name);
	vsnprintf(ktname, sizeof(ktname), name, ap);
	va_end(ap);

	return (_taskqueue_start_threads(tqp, count, pri, NULL, ktname));
}

/* As above, but pin the threads to cpu_id (NOCPU = don't pin). */
int
taskqueue_start_threads_pinned(struct taskqueue **tqp, int count, int pri,
    int cpu_id, const char *name, ...)
{
	char ktname[MAXCOMLEN + 1];
	va_list ap;
	cpuset_t mask;

	va_start(ap, name);
	vsnprintf(ktname, sizeof(ktname), name, ap);
	va_end(ap);

	/*
	 * In case someone passes in NOCPU, just fall back to the
	 * default behaviour of "don't pin".
	 */
	if (cpu_id != NOCPU) {
		CPU_ZERO(&mask);
		CPU_SET(cpu_id, &mask);
	}

	return (_taskqueue_start_threads(tqp, count, pri,
	    cpu_id == NOCPU ?
NULL : &mask, ktname)); 599266629Sadrian} 600266629Sadrian 601248649Swillstatic inline void 602248649Swilltaskqueue_run_callback(struct taskqueue *tq, 603248649Swill enum taskqueue_callback_type cb_type) 604248649Swill{ 605248649Swill taskqueue_callback_fn tq_callback; 606248649Swill 607248649Swill TQ_ASSERT_UNLOCKED(tq); 608248649Swill tq_callback = tq->tq_callbacks[cb_type]; 609248649Swill if (tq_callback != NULL) 610248649Swill tq_callback(tq->tq_cb_contexts[cb_type]); 611248649Swill} 612248649Swill 613133305Sjmgvoid 614133305Sjmgtaskqueue_thread_loop(void *arg) 615119708Sken{ 616133305Sjmg struct taskqueue **tqp, *tq; 617131246Sjhb 618133305Sjmg tqp = arg; 619133305Sjmg tq = *tqp; 620248649Swill taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT); 621154167Sscottl TQ_LOCK(tq); 622188548Sthompsa while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) { 623213813Smdf taskqueue_run_locked(tq); 624196293Spjd /* 625196293Spjd * Because taskqueue_run() can drop tq_mutex, we need to 626196293Spjd * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the 627196293Spjd * meantime, which means we missed a wakeup. 628196293Spjd */ 629196293Spjd if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0) 630196293Spjd break; 631157815Sjhb TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0); 632188592Sthompsa } 633213813Smdf taskqueue_run_locked(tq); 634145729Ssam 635248649Swill /* 636248649Swill * This thread is on its way out, so just drop the lock temporarily 637248649Swill * in order to call the shutdown callback. This allows the callback 638248649Swill * to look at the taskqueue, even just before it dies. 
639248649Swill */ 640248649Swill TQ_UNLOCK(tq); 641248649Swill taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN); 642248649Swill TQ_LOCK(tq); 643248649Swill 644145729Ssam /* rendezvous with thread that asked us to terminate */ 645178015Ssam tq->tq_tcount--; 646178015Ssam wakeup_one(tq->tq_threads); 647154167Sscottl TQ_UNLOCK(tq); 648178123Sjhb kthread_exit(); 649119708Sken} 650119708Sken 651133305Sjmgvoid 652119708Skentaskqueue_thread_enqueue(void *context) 653119708Sken{ 654133305Sjmg struct taskqueue **tqp, *tq; 655131246Sjhb 656133305Sjmg tqp = context; 657133305Sjmg tq = *tqp; 658133305Sjmg 659145729Ssam wakeup_one(tq); 660119708Sken} 661119708Sken 662188058SimpTASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL, 663111528Sscottl swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ, 664111528Sscottl INTR_MPSAFE, &taskqueue_ih)); 665111528Sscottl 666188058SimpTASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL, 667151656Sjhb swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run, 668111528Sscottl NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih)); 669119708Sken 670133305SjmgTASKQUEUE_DEFINE_THREAD(thread); 671119789Ssam 672154167Sscottlstruct taskqueue * 673154167Sscottltaskqueue_create_fast(const char *name, int mflags, 674154333Sscottl taskqueue_enqueue_fn enqueue, void *context) 675119789Ssam{ 676154333Sscottl return _taskqueue_create(name, mflags, enqueue, context, 677154167Sscottl MTX_SPIN, "fast_taskqueue"); 678119789Ssam} 679119789Ssam 680154167Sscottl/* NB: for backwards compatibility */ 681154167Sscottlint 682154167Sscottltaskqueue_enqueue_fast(struct taskqueue *queue, struct task *task) 683119789Ssam{ 684154167Sscottl return taskqueue_enqueue(queue, task); 685119789Ssam} 686119789Ssam 687119789Ssamstatic void *taskqueue_fast_ih; 688119789Ssam 689119789Ssamstatic void 690154167Sscottltaskqueue_fast_enqueue(void *context) 691119789Ssam{ 692119789Ssam swi_sched(taskqueue_fast_ih, 0); 693119789Ssam} 694119789Ssam 695119789Ssamstatic void 
696119789Ssamtaskqueue_fast_run(void *dummy) 697119789Ssam{ 698213813Smdf taskqueue_run(taskqueue_fast); 699119789Ssam} 700119789Ssam 701188058SimpTASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL, 702239779Sjhb swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL, 703154167Sscottl SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih)); 704196295Spjd 705196295Spjdint 706196295Spjdtaskqueue_member(struct taskqueue *queue, struct thread *td) 707196295Spjd{ 708196295Spjd int i, j, ret = 0; 709196295Spjd 710196295Spjd for (i = 0, j = 0; ; i++) { 711196295Spjd if (queue->tq_threads[i] == NULL) 712196295Spjd continue; 713196295Spjd if (queue->tq_threads[i] == td) { 714196295Spjd ret = 1; 715196295Spjd break; 716196295Spjd } 717196295Spjd if (++j >= queue->tq_tcount) 718196295Spjd break; 719196295Spjd } 720196295Spjd return (ret); 721196295Spjd} 722