/* subr_taskqueue.c revision 136131 */
161033Sdfr/*- 261033Sdfr * Copyright (c) 2000 Doug Rabson 361033Sdfr * All rights reserved. 461033Sdfr * 561033Sdfr * Redistribution and use in source and binary forms, with or without 661033Sdfr * modification, are permitted provided that the following conditions 761033Sdfr * are met: 861033Sdfr * 1. Redistributions of source code must retain the above copyright 961033Sdfr * notice, this list of conditions and the following disclaimer. 1061033Sdfr * 2. Redistributions in binary form must reproduce the above copyright 1161033Sdfr * notice, this list of conditions and the following disclaimer in the 1261033Sdfr * documentation and/or other materials provided with the distribution. 1361033Sdfr * 1461033Sdfr * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 1561033Sdfr * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 1661033Sdfr * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 1761033Sdfr * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 1861033Sdfr * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 1961033Sdfr * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 2061033Sdfr * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 2161033Sdfr * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 2261033Sdfr * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 2361033Sdfr * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 2461033Sdfr * SUCH DAMAGE. 
2561033Sdfr */ 2661033Sdfr 27116182Sobrien#include <sys/cdefs.h> 28116182Sobrien__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 136131 2004-10-05 04:16:01Z imp $"); 29116182Sobrien 3061033Sdfr#include <sys/param.h> 3185521Sjhb#include <sys/systm.h> 3265822Sjhb#include <sys/bus.h> 3385560Sjhb#include <sys/interrupt.h> 3461033Sdfr#include <sys/kernel.h> 35123614Sjhb#include <sys/kthread.h> 3685521Sjhb#include <sys/lock.h> 3761033Sdfr#include <sys/malloc.h> 3885521Sjhb#include <sys/mutex.h> 3985521Sjhb#include <sys/taskqueue.h> 40119708Sken#include <sys/unistd.h> 4161033Sdfr 4269774Sphkstatic MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues"); 43123614Sjhbstatic void *taskqueue_giant_ih; 44123614Sjhbstatic void *taskqueue_ih; 4561033Sdfrstatic STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues; 4685521Sjhbstatic struct mtx taskqueue_queues_mutex; 4767551Sjhb 4861033Sdfrstruct taskqueue { 4961033Sdfr STAILQ_ENTRY(taskqueue) tq_link; 5061033Sdfr STAILQ_HEAD(, task) tq_queue; 5161033Sdfr const char *tq_name; 5261033Sdfr taskqueue_enqueue_fn tq_enqueue; 5361033Sdfr void *tq_context; 5485521Sjhb struct mtx tq_mutex; 5561033Sdfr}; 5661033Sdfr 5785521Sjhbstatic void init_taskqueue_list(void *data); 5885521Sjhb 5985521Sjhbstatic void 6085521Sjhbinit_taskqueue_list(void *data __unused) 6185521Sjhb{ 6285521Sjhb 6393818Sjhb mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF); 6485521Sjhb STAILQ_INIT(&taskqueue_queues); 6585521Sjhb} 6685521SjhbSYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list, 6785521Sjhb NULL); 6885521Sjhb 6961033Sdfrstruct taskqueue * 7061033Sdfrtaskqueue_create(const char *name, int mflags, 7161033Sdfr taskqueue_enqueue_fn enqueue, void *context) 7261033Sdfr{ 7361033Sdfr struct taskqueue *queue; 7461033Sdfr 7585521Sjhb queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO); 7661033Sdfr if (!queue) 7761033Sdfr return 0; 7885521Sjhb 7961033Sdfr STAILQ_INIT(&queue->tq_queue); 8061033Sdfr 
queue->tq_name = name; 8161033Sdfr queue->tq_enqueue = enqueue; 8261033Sdfr queue->tq_context = context; 8393818Sjhb mtx_init(&queue->tq_mutex, "taskqueue", NULL, MTX_DEF); 8461033Sdfr 8585521Sjhb mtx_lock(&taskqueue_queues_mutex); 8661033Sdfr STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link); 8785521Sjhb mtx_unlock(&taskqueue_queues_mutex); 8861033Sdfr 8961033Sdfr return queue; 9061033Sdfr} 9161033Sdfr 9261033Sdfrvoid 9361033Sdfrtaskqueue_free(struct taskqueue *queue) 9461033Sdfr{ 9585521Sjhb 9685521Sjhb mtx_lock(&taskqueue_queues_mutex); 9761033Sdfr STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link); 9885521Sjhb mtx_unlock(&taskqueue_queues_mutex); 9961033Sdfr 100131246Sjhb mtx_lock(&queue->tq_mutex); 101131246Sjhb taskqueue_run(queue); 10285521Sjhb mtx_destroy(&queue->tq_mutex); 10361033Sdfr free(queue, M_TASKQUEUE); 10461033Sdfr} 10561033Sdfr 10685521Sjhb/* 10785521Sjhb * Returns with the taskqueue locked. 10885521Sjhb */ 10961033Sdfrstruct taskqueue * 11061033Sdfrtaskqueue_find(const char *name) 11161033Sdfr{ 11261033Sdfr struct taskqueue *queue; 11361033Sdfr 11485521Sjhb mtx_lock(&taskqueue_queues_mutex); 11585521Sjhb STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) { 116123614Sjhb if (strcmp(queue->tq_name, name) == 0) { 117131246Sjhb mtx_lock(&queue->tq_mutex); 11885521Sjhb mtx_unlock(&taskqueue_queues_mutex); 11961033Sdfr return queue; 12061033Sdfr } 12185521Sjhb } 12285521Sjhb mtx_unlock(&taskqueue_queues_mutex); 123123614Sjhb return NULL; 12461033Sdfr} 12561033Sdfr 12661033Sdfrint 12761033Sdfrtaskqueue_enqueue(struct taskqueue *queue, struct task *task) 12861033Sdfr{ 12961033Sdfr struct task *ins; 13061033Sdfr struct task *prev; 13161033Sdfr 13285560Sjhb mtx_lock(&queue->tq_mutex); 13385560Sjhb 13461033Sdfr /* 13561033Sdfr * Count multiple enqueues. 
13661033Sdfr */ 13761033Sdfr if (task->ta_pending) { 13861033Sdfr task->ta_pending++; 13985521Sjhb mtx_unlock(&queue->tq_mutex); 14061033Sdfr return 0; 14161033Sdfr } 14261033Sdfr 14361033Sdfr /* 14461033Sdfr * Optimise the case when all tasks have the same priority. 14561033Sdfr */ 14664199Shsu prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 14761033Sdfr if (!prev || prev->ta_priority >= task->ta_priority) { 14861033Sdfr STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 14961033Sdfr } else { 15061033Sdfr prev = 0; 15161033Sdfr for (ins = STAILQ_FIRST(&queue->tq_queue); ins; 15261033Sdfr prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 15361033Sdfr if (ins->ta_priority < task->ta_priority) 15461033Sdfr break; 15561033Sdfr 15661033Sdfr if (prev) 15761033Sdfr STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 15861033Sdfr else 15961033Sdfr STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 16061033Sdfr } 16161033Sdfr 16261033Sdfr task->ta_pending = 1; 16361033Sdfr if (queue->tq_enqueue) 16461033Sdfr queue->tq_enqueue(queue->tq_context); 16585560Sjhb 16685521Sjhb mtx_unlock(&queue->tq_mutex); 16785560Sjhb 16861033Sdfr return 0; 16961033Sdfr} 17061033Sdfr 17161033Sdfrvoid 17261033Sdfrtaskqueue_run(struct taskqueue *queue) 17361033Sdfr{ 17461033Sdfr struct task *task; 175131246Sjhb int owned, pending; 17661033Sdfr 177131246Sjhb owned = mtx_owned(&queue->tq_mutex); 178131246Sjhb if (!owned) 179131246Sjhb mtx_lock(&queue->tq_mutex); 18061033Sdfr while (STAILQ_FIRST(&queue->tq_queue)) { 18161033Sdfr /* 18261033Sdfr * Carefully remove the first task from the queue and 18361033Sdfr * zero its pending count. 
18461033Sdfr */ 18561033Sdfr task = STAILQ_FIRST(&queue->tq_queue); 18661033Sdfr STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 18761033Sdfr pending = task->ta_pending; 18861033Sdfr task->ta_pending = 0; 189136131Simp task->ta_flags |= TAF_PENDING; 19085560Sjhb mtx_unlock(&queue->tq_mutex); 19161033Sdfr 19285560Sjhb task->ta_func(task->ta_context, pending); 19361033Sdfr 19485521Sjhb mtx_lock(&queue->tq_mutex); 195136131Simp task->ta_flags &= ~TAF_PENDING; 196136131Simp wakeup(task); 19761033Sdfr } 198131246Sjhb 199131246Sjhb /* 200131246Sjhb * For compatibility, unlock on return if the queue was not locked 201131246Sjhb * on entry, although this opens a race window. 202131246Sjhb */ 203131246Sjhb if (!owned) 204131246Sjhb mtx_unlock(&queue->tq_mutex); 20561033Sdfr} 20661033Sdfr 207136131Simpvoid 208136131Simptaskqueue_drain(struct taskqueue *queue, struct task *task) 209136131Simp{ 210136131Simp WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, "taskqueue_drain"); 211136131Simp mtx_lock(&queue->tq_mutex); 212136131Simp while (task->ta_pending != 0 || (task->ta_flags & TAF_PENDING)) { 213136131Simp msleep(task, &queue->tq_mutex, PWAIT, "-", 0); 214136131Simp } 215136131Simp mtx_unlock(&queue->tq_mutex); 216136131Simp} 217136131Simp 21861033Sdfrstatic void 21961033Sdfrtaskqueue_swi_enqueue(void *context) 22061033Sdfr{ 22188900Sjhb swi_sched(taskqueue_ih, 0); 22261033Sdfr} 22361033Sdfr 22461033Sdfrstatic void 22567551Sjhbtaskqueue_swi_run(void *dummy) 22661033Sdfr{ 22761033Sdfr taskqueue_run(taskqueue_swi); 22861033Sdfr} 22961033Sdfr 230111528Sscottlstatic void 231111528Sscottltaskqueue_swi_giant_enqueue(void *context) 232111528Sscottl{ 233111528Sscottl swi_sched(taskqueue_giant_ih, 0); 234111528Sscottl} 235111528Sscottl 236111528Sscottlstatic void 237111528Sscottltaskqueue_swi_giant_run(void *dummy) 238111528Sscottl{ 239111528Sscottl taskqueue_run(taskqueue_swi_giant); 240111528Sscottl} 241111528Sscottl 242133305Sjmgvoid 243133305Sjmgtaskqueue_thread_loop(void *arg) 
244119708Sken{ 245133305Sjmg struct taskqueue **tqp, *tq; 246131246Sjhb 247133305Sjmg tqp = arg; 248133305Sjmg tq = *tqp; 249133305Sjmg mtx_lock(&tq->tq_mutex); 250119708Sken for (;;) { 251133305Sjmg taskqueue_run(tq); 252133305Sjmg msleep(tq, &tq->tq_mutex, PWAIT, "-", 0); 253119708Sken } 254119708Sken} 255119708Sken 256133305Sjmgvoid 257119708Skentaskqueue_thread_enqueue(void *context) 258119708Sken{ 259133305Sjmg struct taskqueue **tqp, *tq; 260131246Sjhb 261133305Sjmg tqp = context; 262133305Sjmg tq = *tqp; 263133305Sjmg 264133305Sjmg mtx_assert(&tq->tq_mutex, MA_OWNED); 265133305Sjmg wakeup(tq); 266119708Sken} 267119708Sken 26861033SdfrTASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0, 269111528Sscottl swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ, 270111528Sscottl INTR_MPSAFE, &taskqueue_ih)); 271111528Sscottl 272111528SscottlTASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0, 273111528Sscottl swi_add(NULL, "Giant task queue", taskqueue_swi_giant_run, 274111528Sscottl NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih)); 275119708Sken 276133305SjmgTASKQUEUE_DEFINE_THREAD(thread); 277119789Ssam 278119789Ssamint 279119789Ssamtaskqueue_enqueue_fast(struct taskqueue *queue, struct task *task) 280119789Ssam{ 281119789Ssam struct task *ins; 282119789Ssam struct task *prev; 283119789Ssam 284119789Ssam mtx_lock_spin(&queue->tq_mutex); 285119789Ssam 286119789Ssam /* 287119789Ssam * Count multiple enqueues. 288119789Ssam */ 289119789Ssam if (task->ta_pending) { 290119789Ssam task->ta_pending++; 291119789Ssam mtx_unlock_spin(&queue->tq_mutex); 292119789Ssam return 0; 293119789Ssam } 294119789Ssam 295119789Ssam /* 296119789Ssam * Optimise the case when all tasks have the same priority. 
297119789Ssam */ 298119789Ssam prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 299119789Ssam if (!prev || prev->ta_priority >= task->ta_priority) { 300119789Ssam STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 301119789Ssam } else { 302119789Ssam prev = 0; 303119789Ssam for (ins = STAILQ_FIRST(&queue->tq_queue); ins; 304119789Ssam prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 305119789Ssam if (ins->ta_priority < task->ta_priority) 306119789Ssam break; 307119789Ssam 308119789Ssam if (prev) 309119789Ssam STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 310119789Ssam else 311119789Ssam STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 312119789Ssam } 313119789Ssam 314119789Ssam task->ta_pending = 1; 315119789Ssam if (queue->tq_enqueue) 316119789Ssam queue->tq_enqueue(queue->tq_context); 317119789Ssam 318119789Ssam mtx_unlock_spin(&queue->tq_mutex); 319119789Ssam 320119789Ssam return 0; 321119789Ssam} 322119789Ssam 323119789Ssamstatic void 324119789Ssamtaskqueue_run_fast(struct taskqueue *queue) 325119789Ssam{ 326119789Ssam struct task *task; 327119789Ssam int pending; 328119789Ssam 329119789Ssam mtx_lock_spin(&queue->tq_mutex); 330119789Ssam while (STAILQ_FIRST(&queue->tq_queue)) { 331119789Ssam /* 332119789Ssam * Carefully remove the first task from the queue and 333119789Ssam * zero its pending count. 
334119789Ssam */ 335119789Ssam task = STAILQ_FIRST(&queue->tq_queue); 336119789Ssam STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 337119789Ssam pending = task->ta_pending; 338119789Ssam task->ta_pending = 0; 339119789Ssam mtx_unlock_spin(&queue->tq_mutex); 340119789Ssam 341119789Ssam task->ta_func(task->ta_context, pending); 342119789Ssam 343119789Ssam mtx_lock_spin(&queue->tq_mutex); 344119789Ssam } 345119789Ssam mtx_unlock_spin(&queue->tq_mutex); 346119789Ssam} 347119789Ssam 348119789Ssamstruct taskqueue *taskqueue_fast; 349119789Ssamstatic void *taskqueue_fast_ih; 350119789Ssam 351119789Ssamstatic void 352119789Ssamtaskqueue_fast_schedule(void *context) 353119789Ssam{ 354119789Ssam swi_sched(taskqueue_fast_ih, 0); 355119789Ssam} 356119789Ssam 357119789Ssamstatic void 358119789Ssamtaskqueue_fast_run(void *dummy) 359119789Ssam{ 360119789Ssam taskqueue_run_fast(taskqueue_fast); 361119789Ssam} 362119789Ssam 363119789Ssamstatic void 364119789Ssamtaskqueue_define_fast(void *arg) 365119789Ssam{ 366131246Sjhb 367131246Sjhb taskqueue_fast = malloc(sizeof(struct taskqueue), M_TASKQUEUE, 368131246Sjhb M_NOWAIT | M_ZERO); 369119789Ssam if (!taskqueue_fast) { 370119789Ssam printf("%s: Unable to allocate fast task queue!\n", __func__); 371119789Ssam return; 372119789Ssam } 373119789Ssam 374119789Ssam STAILQ_INIT(&taskqueue_fast->tq_queue); 375119789Ssam taskqueue_fast->tq_name = "fast"; 376119789Ssam taskqueue_fast->tq_enqueue = taskqueue_fast_schedule; 377119812Ssam mtx_init(&taskqueue_fast->tq_mutex, "taskqueue_fast", NULL, MTX_SPIN); 378119789Ssam 379119789Ssam mtx_lock(&taskqueue_queues_mutex); 380119789Ssam STAILQ_INSERT_TAIL(&taskqueue_queues, taskqueue_fast, tq_link); 381119789Ssam mtx_unlock(&taskqueue_queues_mutex); 382119789Ssam 383119789Ssam swi_add(NULL, "Fast task queue", taskqueue_fast_run, 384119789Ssam NULL, SWI_TQ_FAST, 0, &taskqueue_fast_ih); 385119789Ssam} 386119789SsamSYSINIT(taskqueue_fast, SI_SUB_CONFIGURE, SI_ORDER_SECOND, 387131246Sjhb 
taskqueue_define_fast, NULL); 388