subr_taskqueue.c revision 119789
161033Sdfr/*- 261033Sdfr * Copyright (c) 2000 Doug Rabson 361033Sdfr * All rights reserved. 461033Sdfr * 561033Sdfr * Redistribution and use in source and binary forms, with or without 661033Sdfr * modification, are permitted provided that the following conditions 761033Sdfr * are met: 861033Sdfr * 1. Redistributions of source code must retain the above copyright 961033Sdfr * notice, this list of conditions and the following disclaimer. 1061033Sdfr * 2. Redistributions in binary form must reproduce the above copyright 1161033Sdfr * notice, this list of conditions and the following disclaimer in the 1261033Sdfr * documentation and/or other materials provided with the distribution. 1361033Sdfr * 1461033Sdfr * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 1561033Sdfr * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 1661033Sdfr * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 1761033Sdfr * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 1861033Sdfr * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 1961033Sdfr * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 2061033Sdfr * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 2161033Sdfr * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 2261033Sdfr * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 2361033Sdfr * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 2461033Sdfr * SUCH DAMAGE. 2561033Sdfr */ 2661033Sdfr 27116182Sobrien#include <sys/cdefs.h> 28116182Sobrien__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 119789 2003-09-05 23:09:22Z sam $"); 29116182Sobrien 3061033Sdfr#include <sys/param.h> 3185521Sjhb#include <sys/systm.h> 3265822Sjhb#include <sys/bus.h> 3385560Sjhb#include <sys/interrupt.h> 3461033Sdfr#include <sys/kernel.h> 3585521Sjhb#include <sys/lock.h> 3661033Sdfr#include <sys/malloc.h> 3785521Sjhb#include <sys/mutex.h> 3885521Sjhb#include <sys/taskqueue.h> 39119708Sken#include <sys/kthread.h> 40119708Sken#include <sys/unistd.h> 4161033Sdfr 4269774Sphkstatic MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues"); 4361033Sdfr 4461033Sdfrstatic STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues; 4561033Sdfr 4672238Sjhbstatic void *taskqueue_ih; 47111528Sscottlstatic void *taskqueue_giant_ih; 4885521Sjhbstatic struct mtx taskqueue_queues_mutex; 49119708Skenstatic struct proc *taskqueue_thread_proc; 5067551Sjhb 5161033Sdfrstruct taskqueue { 5261033Sdfr STAILQ_ENTRY(taskqueue) tq_link; 5361033Sdfr STAILQ_HEAD(, task) tq_queue; 5461033Sdfr const char *tq_name; 5561033Sdfr taskqueue_enqueue_fn tq_enqueue; 5661033Sdfr void *tq_context; 5761033Sdfr int tq_draining; 5885521Sjhb struct mtx tq_mutex; 5961033Sdfr}; 6061033Sdfr 6185521Sjhbstatic void init_taskqueue_list(void *data); 6285521Sjhb 6385521Sjhbstatic void 6485521Sjhbinit_taskqueue_list(void *data __unused) 6585521Sjhb{ 6685521Sjhb 6793818Sjhb mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF); 6885521Sjhb STAILQ_INIT(&taskqueue_queues); 6985521Sjhb} 7085521SjhbSYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list, 7185521Sjhb NULL); 7285521Sjhb 7361033Sdfrstruct taskqueue * 7461033Sdfrtaskqueue_create(const char *name, int mflags, 7561033Sdfr taskqueue_enqueue_fn enqueue, void *context) 7661033Sdfr{ 7761033Sdfr struct taskqueue *queue; 7861033Sdfr 7985521Sjhb queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO); 8061033Sdfr if (!queue) 8161033Sdfr return 0; 8285521Sjhb 8361033Sdfr STAILQ_INIT(&queue->tq_queue); 8461033Sdfr queue->tq_name = name; 8561033Sdfr queue->tq_enqueue = enqueue; 8661033Sdfr queue->tq_context = context; 8761033Sdfr queue->tq_draining = 0; 8893818Sjhb mtx_init(&queue->tq_mutex, "taskqueue", NULL, MTX_DEF); 8961033Sdfr 9085521Sjhb mtx_lock(&taskqueue_queues_mutex); 9161033Sdfr STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link); 9285521Sjhb mtx_unlock(&taskqueue_queues_mutex); 9361033Sdfr 9461033Sdfr return queue; 9561033Sdfr} 9661033Sdfr 9761033Sdfrvoid 9861033Sdfrtaskqueue_free(struct taskqueue *queue) 9961033Sdfr{ 10085521Sjhb 10185521Sjhb mtx_lock(&queue->tq_mutex); 102101153Sjhb KASSERT(queue->tq_draining == 0, ("free'ing a draining taskqueue")); 10361033Sdfr queue->tq_draining = 1; 10485521Sjhb mtx_unlock(&queue->tq_mutex); 10561033Sdfr 10661033Sdfr taskqueue_run(queue); 10761033Sdfr 10885521Sjhb mtx_lock(&taskqueue_queues_mutex); 10961033Sdfr STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link); 11085521Sjhb mtx_unlock(&taskqueue_queues_mutex); 11161033Sdfr 11285521Sjhb mtx_destroy(&queue->tq_mutex); 11361033Sdfr free(queue, M_TASKQUEUE); 11461033Sdfr} 11561033Sdfr 11685521Sjhb/* 11785521Sjhb * Returns with the taskqueue locked. 11885521Sjhb */ 11961033Sdfrstruct taskqueue * 12061033Sdfrtaskqueue_find(const char *name) 12161033Sdfr{ 12261033Sdfr struct taskqueue *queue; 12361033Sdfr 12485521Sjhb mtx_lock(&taskqueue_queues_mutex); 12585521Sjhb STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) { 12685521Sjhb mtx_lock(&queue->tq_mutex); 12761033Sdfr if (!strcmp(queue->tq_name, name)) { 12885521Sjhb mtx_unlock(&taskqueue_queues_mutex); 12961033Sdfr return queue; 13061033Sdfr } 13185521Sjhb mtx_unlock(&queue->tq_mutex); 13285521Sjhb } 13385521Sjhb mtx_unlock(&taskqueue_queues_mutex); 13461033Sdfr return 0; 13561033Sdfr} 13661033Sdfr 13761033Sdfrint 13861033Sdfrtaskqueue_enqueue(struct taskqueue *queue, struct task *task) 13961033Sdfr{ 14061033Sdfr struct task *ins; 14161033Sdfr struct task *prev; 14261033Sdfr 14385560Sjhb mtx_lock(&queue->tq_mutex); 14485560Sjhb 14561033Sdfr /* 14661033Sdfr * Don't allow new tasks on a queue which is being freed. 14761033Sdfr */ 14861033Sdfr if (queue->tq_draining) { 14985521Sjhb mtx_unlock(&queue->tq_mutex); 15061033Sdfr return EPIPE; 15161033Sdfr } 15261033Sdfr 15361033Sdfr /* 15461033Sdfr * Count multiple enqueues. 15561033Sdfr */ 15661033Sdfr if (task->ta_pending) { 15761033Sdfr task->ta_pending++; 15885521Sjhb mtx_unlock(&queue->tq_mutex); 15961033Sdfr return 0; 16061033Sdfr } 16161033Sdfr 16261033Sdfr /* 16361033Sdfr * Optimise the case when all tasks have the same priority. 16461033Sdfr */ 16564199Shsu prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 16661033Sdfr if (!prev || prev->ta_priority >= task->ta_priority) { 16761033Sdfr STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 16861033Sdfr } else { 16961033Sdfr prev = 0; 17061033Sdfr for (ins = STAILQ_FIRST(&queue->tq_queue); ins; 17161033Sdfr prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 17261033Sdfr if (ins->ta_priority < task->ta_priority) 17361033Sdfr break; 17461033Sdfr 17561033Sdfr if (prev) 17661033Sdfr STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 17761033Sdfr else 17861033Sdfr STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 17961033Sdfr } 18061033Sdfr 18161033Sdfr task->ta_pending = 1; 18261033Sdfr if (queue->tq_enqueue) 18361033Sdfr queue->tq_enqueue(queue->tq_context); 18485560Sjhb 18585521Sjhb mtx_unlock(&queue->tq_mutex); 18685560Sjhb 18761033Sdfr return 0; 18861033Sdfr} 18961033Sdfr 19061033Sdfrvoid 19161033Sdfrtaskqueue_run(struct taskqueue *queue) 19261033Sdfr{ 19361033Sdfr struct task *task; 19461033Sdfr int pending; 19561033Sdfr 19685521Sjhb mtx_lock(&queue->tq_mutex); 19761033Sdfr while (STAILQ_FIRST(&queue->tq_queue)) { 19861033Sdfr /* 19961033Sdfr * Carefully remove the first task from the queue and 20061033Sdfr * zero its pending count. 20161033Sdfr */ 20261033Sdfr task = STAILQ_FIRST(&queue->tq_queue); 20361033Sdfr STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 20461033Sdfr pending = task->ta_pending; 20561033Sdfr task->ta_pending = 0; 20685560Sjhb mtx_unlock(&queue->tq_mutex); 20761033Sdfr 20885560Sjhb task->ta_func(task->ta_context, pending); 20961033Sdfr 21085521Sjhb mtx_lock(&queue->tq_mutex); 21161033Sdfr } 21285521Sjhb mtx_unlock(&queue->tq_mutex); 21361033Sdfr} 21461033Sdfr 21561033Sdfrstatic void 21661033Sdfrtaskqueue_swi_enqueue(void *context) 21761033Sdfr{ 21888900Sjhb swi_sched(taskqueue_ih, 0); 21961033Sdfr} 22061033Sdfr 22161033Sdfrstatic void 22267551Sjhbtaskqueue_swi_run(void *dummy) 22361033Sdfr{ 22461033Sdfr taskqueue_run(taskqueue_swi); 22561033Sdfr} 22661033Sdfr 227111528Sscottlstatic void 228111528Sscottltaskqueue_swi_giant_enqueue(void *context) 229111528Sscottl{ 230111528Sscottl swi_sched(taskqueue_giant_ih, 0); 231111528Sscottl} 232111528Sscottl 233111528Sscottlstatic void 234111528Sscottltaskqueue_swi_giant_run(void *dummy) 235111528Sscottl{ 236111528Sscottl taskqueue_run(taskqueue_swi_giant); 237111528Sscottl} 238111528Sscottl 239119708Skenstatic void 240119708Skentaskqueue_kthread(void *arg) 241119708Sken{ 242119708Sken struct mtx kthread_mutex; 243119708Sken 244119708Sken bzero(&kthread_mutex, sizeof(kthread_mutex)); 245119708Sken 246119708Sken mtx_init(&kthread_mutex, "taskqueue kthread", NULL, MTX_DEF); 247119708Sken 248119708Sken mtx_lock(&kthread_mutex); 249119708Sken 250119708Sken for (;;) { 251119708Sken mtx_unlock(&kthread_mutex); 252119708Sken taskqueue_run(taskqueue_thread); 253119708Sken mtx_lock(&kthread_mutex); 254119708Sken msleep(&taskqueue_thread, &kthread_mutex, PWAIT, "tqthr", 0); 255119708Sken } 256119708Sken} 257119708Sken 258119708Skenstatic void 259119708Skentaskqueue_thread_enqueue(void *context) 260119708Sken{ 261119708Sken wakeup(&taskqueue_thread); 262119708Sken} 263119708Sken 26461033SdfrTASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0, 265111528Sscottl swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ, 266111528Sscottl INTR_MPSAFE, &taskqueue_ih)); 267111528Sscottl 268111528SscottlTASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0, 269111528Sscottl swi_add(NULL, "Giant task queue", taskqueue_swi_giant_run, 270111528Sscottl NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih)); 271119708Sken 272119708SkenTASKQUEUE_DEFINE(thread, taskqueue_thread_enqueue, 0, 273119708Sken kthread_create(taskqueue_kthread, NULL, 274119708Sken &taskqueue_thread_proc, RFNOWAIT, 0, "taskqueue")); 275119789Ssam 276119789Ssamint 277119789Ssamtaskqueue_enqueue_fast(struct taskqueue *queue, struct task *task) 278119789Ssam{ 279119789Ssam struct task *ins; 280119789Ssam struct task *prev; 281119789Ssam 282119789Ssam mtx_lock_spin(&queue->tq_mutex); 283119789Ssam 284119789Ssam /* 285119789Ssam * Don't allow new tasks on a queue which is being freed. 286119789Ssam */ 287119789Ssam if (queue->tq_draining) { 288119789Ssam mtx_unlock_spin(&queue->tq_mutex); 289119789Ssam return EPIPE; 290119789Ssam } 291119789Ssam 292119789Ssam /* 293119789Ssam * Count multiple enqueues. 294119789Ssam */ 295119789Ssam if (task->ta_pending) { 296119789Ssam task->ta_pending++; 297119789Ssam mtx_unlock_spin(&queue->tq_mutex); 298119789Ssam return 0; 299119789Ssam } 300119789Ssam 301119789Ssam /* 302119789Ssam * Optimise the case when all tasks have the same priority. 303119789Ssam */ 304119789Ssam prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 305119789Ssam if (!prev || prev->ta_priority >= task->ta_priority) { 306119789Ssam STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 307119789Ssam } else { 308119789Ssam prev = 0; 309119789Ssam for (ins = STAILQ_FIRST(&queue->tq_queue); ins; 310119789Ssam prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 311119789Ssam if (ins->ta_priority < task->ta_priority) 312119789Ssam break; 313119789Ssam 314119789Ssam if (prev) 315119789Ssam STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 316119789Ssam else 317119789Ssam STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 318119789Ssam } 319119789Ssam 320119789Ssam task->ta_pending = 1; 321119789Ssam if (queue->tq_enqueue) 322119789Ssam queue->tq_enqueue(queue->tq_context); 323119789Ssam 324119789Ssam mtx_unlock_spin(&queue->tq_mutex); 325119789Ssam 326119789Ssam return 0; 327119789Ssam} 328119789Ssam 329119789Ssamstatic void 330119789Ssamtaskqueue_run_fast(struct taskqueue *queue) 331119789Ssam{ 332119789Ssam struct task *task; 333119789Ssam int pending; 334119789Ssam 335119789Ssam mtx_lock_spin(&queue->tq_mutex); 336119789Ssam while (STAILQ_FIRST(&queue->tq_queue)) { 337119789Ssam /* 338119789Ssam * Carefully remove the first task from the queue and 339119789Ssam * zero its pending count. 340119789Ssam */ 341119789Ssam task = STAILQ_FIRST(&queue->tq_queue); 342119789Ssam STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 343119789Ssam pending = task->ta_pending; 344119789Ssam task->ta_pending = 0; 345119789Ssam mtx_unlock_spin(&queue->tq_mutex); 346119789Ssam 347119789Ssam task->ta_func(task->ta_context, pending); 348119789Ssam 349119789Ssam mtx_lock_spin(&queue->tq_mutex); 350119789Ssam } 351119789Ssam mtx_unlock_spin(&queue->tq_mutex); 352119789Ssam} 353119789Ssam 354119789Ssamstruct taskqueue *taskqueue_fast; 355119789Ssamstatic void *taskqueue_fast_ih; 356119789Ssam 357119789Ssamstatic void 358119789Ssamtaskqueue_fast_schedule(void *context) 359119789Ssam{ 360119789Ssam swi_sched(taskqueue_fast_ih, 0); 361119789Ssam} 362119789Ssam 363119789Ssamstatic void 364119789Ssamtaskqueue_fast_run(void *dummy) 365119789Ssam{ 366119789Ssam taskqueue_run_fast(taskqueue_fast); 367119789Ssam} 368119789Ssam 369119789Ssamstatic void 370119789Ssamtaskqueue_define_fast(void *arg) 371119789Ssam{ 372119789Ssam taskqueue_fast = malloc(sizeof(struct taskqueue), 373119789Ssam M_TASKQUEUE, M_NOWAIT | M_ZERO); 374119789Ssam if (!taskqueue_fast) { 375119789Ssam printf("%s: Unable to allocate fast task queue!\n", __func__); 376119789Ssam return; 377119789Ssam } 378119789Ssam 379119789Ssam STAILQ_INIT(&taskqueue_fast->tq_queue); 380119789Ssam taskqueue_fast->tq_name = "fast"; 381119789Ssam taskqueue_fast->tq_enqueue = taskqueue_fast_schedule; 382119789Ssam mtx_init(&taskqueue_fast->tq_mutex, "taskqueue", NULL, MTX_SPIN); 383119789Ssam 384119789Ssam mtx_lock(&taskqueue_queues_mutex); 385119789Ssam STAILQ_INSERT_TAIL(&taskqueue_queues, taskqueue_fast, tq_link); 386119789Ssam mtx_unlock(&taskqueue_queues_mutex); 387119789Ssam 388119789Ssam swi_add(NULL, "Fast task queue", taskqueue_fast_run, 389119789Ssam NULL, SWI_TQ_FAST, 0, &taskqueue_fast_ih); 390119789Ssam} 391119789SsamSYSINIT(taskqueue_fast, SI_SUB_CONFIGURE, SI_ORDER_SECOND, 392119789Ssam taskqueue_define_fast, NULL); 393