subr_taskqueue.c revision 145729
11553Srgrimes/*- 21553Srgrimes * Copyright (c) 2000 Doug Rabson 31553Srgrimes * All rights reserved. 41553Srgrimes * 51553Srgrimes * Redistribution and use in source and binary forms, with or without 61553Srgrimes * modification, are permitted provided that the following conditions 71553Srgrimes * are met: 81553Srgrimes * 1. Redistributions of source code must retain the above copyright 91553Srgrimes * notice, this list of conditions and the following disclaimer. 101553Srgrimes * 2. Redistributions in binary form must reproduce the above copyright 111553Srgrimes * notice, this list of conditions and the following disclaimer in the 121553Srgrimes * documentation and/or other materials provided with the distribution. 131553Srgrimes * 141553Srgrimes * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 151553Srgrimes * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 161553Srgrimes * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 171553Srgrimes * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 181553Srgrimes * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 191553Srgrimes * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 201553Srgrimes * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 211553Srgrimes * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 221553Srgrimes * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 231553Srgrimes * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 241553Srgrimes * SUCH DAMAGE. 251553Srgrimes */ 261553Srgrimes 271553Srgrimes#include <sys/cdefs.h> 281553Srgrimes__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 145729 2005-05-01 00:38:11Z sam $"); 291553Srgrimes 301553Srgrimes#include <sys/param.h> 3129451Scharnier#include <sys/systm.h> 321553Srgrimes#include <sys/bus.h> 3329451Scharnier#include <sys/interrupt.h> 3429451Scharnier#include <sys/kernel.h> 3550479Speter#include <sys/kthread.h> 361553Srgrimes#include <sys/lock.h> 371553Srgrimes#include <sys/malloc.h> 381553Srgrimes#include <sys/mutex.h> 391553Srgrimes#include <sys/proc.h> 401553Srgrimes#include <sys/taskqueue.h> 411553Srgrimes#include <sys/unistd.h> 421553Srgrimes 431553Srgrimesstatic MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues"); 4429451Scharnierstatic void *taskqueue_giant_ih; 4529451Scharnierstatic void *taskqueue_ih; 461553Srgrimesstatic STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues; 4720458Sjoergstatic struct mtx taskqueue_queues_mutex; 4869004Simp 4916073Sphkstruct taskqueue { 501553Srgrimes STAILQ_ENTRY(taskqueue) tq_link; 5130638Speter STAILQ_HEAD(, task) tq_queue; 521553Srgrimes const char *tq_name; 531553Srgrimes taskqueue_enqueue_fn tq_enqueue; 5461640Speter void *tq_context; 551553Srgrimes struct task *tq_running; 561553Srgrimes struct mtx tq_mutex; 571553Srgrimes struct proc **tq_pproc; 581553Srgrimes}; 591553Srgrimes 601553Srgrimesstatic void init_taskqueue_list(void *data); 6161640Speter 621553Srgrimesstatic void 631553Srgrimesinit_taskqueue_list(void *data __unused) 641553Srgrimes{ 651553Srgrimes 661553Srgrimes mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF); 671553Srgrimes STAILQ_INIT(&taskqueue_queues); 6861640Speter} 6961640SpeterSYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list, 7061640Speter NULL); 7169135Speter 7261640Speterstruct taskqueue * 7361640Spetertaskqueue_create(const char *name, int mflags, 7472684Speter taskqueue_enqueue_fn enqueue, void *context, 7561640Speter struct proc **pp) 7629451Scharnier{ 771553Srgrimes struct taskqueue *queue; 781553Srgrimes 791553Srgrimes queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO); 8045744Speter if (!queue) 8161640Speter return 0; 821553Srgrimes 8361640Speter STAILQ_INIT(&queue->tq_queue); 841553Srgrimes queue->tq_name = name; 85110895Sru queue->tq_enqueue = enqueue; 861553Srgrimes queue->tq_context = context; 871553Srgrimes queue->tq_pproc = pp; 881553Srgrimes mtx_init(&queue->tq_mutex, "taskqueue", NULL, MTX_DEF); 891553Srgrimes 901553Srgrimes mtx_lock(&taskqueue_queues_mutex); 911553Srgrimes STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link); 921553Srgrimes mtx_unlock(&taskqueue_queues_mutex); 931553Srgrimes 941553Srgrimes return queue; 9545744Speter} 9661640Speter 971553Srgrimes/* 9861640Speter * Signal a taskqueue thread to terminate. 991553Srgrimes */ 100159362Sdelphijstatic void 101205880Srutaskqueue_terminate(struct proc **pp, struct taskqueue *tq) 102205880Sru{ 103110895Sru struct proc *p; 1041553Srgrimes 1051553Srgrimes p = *pp; 1061553Srgrimes *pp = NULL; 1071553Srgrimes if (p) { 108207260Simp wakeup_one(tq); 1091553Srgrimes PROC_LOCK(p); /* NB: insure we don't miss wakeup */ 110207260Simp mtx_unlock(&tq->tq_mutex); /* let taskqueue thread run */ 111207260Simp msleep(p, &p->p_mtx, PWAIT, "taskqueue_destroy", 0); 1121553Srgrimes PROC_UNLOCK(p); 113207260Simp mtx_lock(&tq->tq_mutex); 1141553Srgrimes } 1151553Srgrimes} 11655614Speter 1171553Srgrimesvoid 11855614Spetertaskqueue_free(struct taskqueue *queue) 11955614Speter{ 12055614Speter 12155614Speter mtx_lock(&taskqueue_queues_mutex); 12229451Scharnier STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link); 12329451Scharnier mtx_unlock(&taskqueue_queues_mutex); 124207260Simp 125207260Simp mtx_lock(&queue->tq_mutex); 12612772Speter taskqueue_run(queue); 127207260Simp taskqueue_terminate(queue->tq_pproc, queue); 128207260Simp mtx_destroy(&queue->tq_mutex); 129207260Simp free(queue, M_TASKQUEUE); 130207260Simp} 131207260Simp 132207260Simp/* 133207260Simp * Returns with the taskqueue locked. 134207260Simp */ 135207260Simpstruct taskqueue * 136207260Simptaskqueue_find(const char *name) 137207260Simp{ 138207260Simp struct taskqueue *queue; 13999923Sbde 14099923Sbde mtx_lock(&taskqueue_queues_mutex); 14199923Sbde STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) { 142113951Sdes if (strcmp(queue->tq_name, name) == 0) { 143218544Simp mtx_lock(&queue->tq_mutex); 144218544Simp mtx_unlock(&taskqueue_queues_mutex); 145185186Sthompsa return queue; 146185186Sthompsa } 147185186Sthompsa } 148185186Sthompsa mtx_unlock(&taskqueue_queues_mutex); 149185186Sthompsa return NULL; 150185186Sthompsa} 1511553Srgrimes 1521553Srgrimesint 15399923Sbdetaskqueue_enqueue(struct taskqueue *queue, struct task *task) 15420395Sbde{ 15552653Smarcel struct task *ins; 15652653Smarcel struct task *prev; 1571553Srgrimes 1581553Srgrimes mtx_lock(&queue->tq_mutex); 1591553Srgrimes 1601553Srgrimes /* 1611553Srgrimes * Count multiple enqueues. 1625325Sgibbs */ 1635325Sgibbs if (task->ta_pending) { 1645325Sgibbs task->ta_pending++; 1651553Srgrimes mtx_unlock(&queue->tq_mutex); 16669135Speter return 0; 16769135Speter } 1681553Srgrimes 1691553Srgrimes /* 1706803Sgibbs * Optimise the case when all tasks have the same priority. 1716803Sgibbs */ 172207260Simp prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 173207260Simp if (!prev || prev->ta_priority >= task->ta_priority) { 174207260Simp STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 1751553Srgrimes } else { 1761553Srgrimes prev = 0; 1771553Srgrimes for (ins = STAILQ_FIRST(&queue->tq_queue); ins; 1781553Srgrimes prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 1791553Srgrimes if (ins->ta_priority < task->ta_priority) 1801553Srgrimes break; 18113400Speter 182153888Sru if (prev) 18334619Seivind STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 184153888Sru else 185153888Sru STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 186153888Sru } 187153888Sru 188153888Sru task->ta_pending = 1; 189153888Sru queue->tq_enqueue(queue->tq_context); 190163640Simp 191153888Sru mtx_unlock(&queue->tq_mutex); 192153888Sru 193163638Simp return 0; 194153888Sru} 19561640Speter 19661640Spetervoid 19761640Spetertaskqueue_run(struct taskqueue *queue) 19883594Speter{ 19983594Speter struct task *task; 20083594Speter int owned, pending; 20165091Speter 20261640Speter owned = mtx_owned(&queue->tq_mutex); 203163638Simp if (!owned) 204163638Simp mtx_lock(&queue->tq_mutex); 205163638Simp while (STAILQ_FIRST(&queue->tq_queue)) { 206163638Simp /* 20761640Speter * Carefully remove the first task from the queue and 20861640Speter * zero its pending count. 20961640Speter */ 21061640Speter task = STAILQ_FIRST(&queue->tq_queue); 21161640Speter STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 21261640Speter pending = task->ta_pending; 21361640Speter task->ta_pending = 0; 21461640Speter queue->tq_running = task; 21561640Speter mtx_unlock(&queue->tq_mutex); 21661640Speter 21761640Speter task->ta_func(task->ta_context, pending); 21861640Speter 21961640Speter mtx_lock(&queue->tq_mutex); 22061640Speter queue->tq_running = NULL; 22161640Speter wakeup(task); 22261640Speter } 22361640Speter 22461640Speter /* 22561640Speter * For compatibility, unlock on return if the queue was not locked 22661640Speter * on entry, although this opens a race window. 22761640Speter */ 22861640Speter if (!owned) 22961640Speter mtx_unlock(&queue->tq_mutex); 23061640Speter} 23161640Speter 23261640Spetervoid 23361652Spetertaskqueue_drain(struct taskqueue *queue, struct task *task) 23461640Speter{ 23561640Speter WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, "taskqueue_drain"); 236163640Simp 23761640Speter mtx_lock(&queue->tq_mutex); 23861640Speter while (task->ta_pending != 0 || task == queue->tq_running) 23961640Speter msleep(task, &queue->tq_mutex, PWAIT, "-", 0); 24061640Speter mtx_unlock(&queue->tq_mutex); 241153888Sru} 24282393Speter 243153888Srustatic void 244153888Srutaskqueue_swi_enqueue(void *context) 245153888Sru{ 246153888Sru swi_sched(taskqueue_ih, 0); 247153888Sru} 248153888Sru 249153888Srustatic void 250153888Srutaskqueue_swi_run(void *dummy) 251153888Sru{ 252153888Sru taskqueue_run(taskqueue_swi); 25382393Speter} 25482393Speter 25582393Speterstatic void 25682393Spetertaskqueue_swi_giant_enqueue(void *context) 25782393Speter{ 25882393Speter swi_sched(taskqueue_giant_ih, 0); 25982393Speter} 26082393Speter 26182393Speterstatic void 26282393Spetertaskqueue_swi_giant_run(void *dummy) 26383594Speter{ 26483594Speter taskqueue_run(taskqueue_swi_giant); 26583594Speter} 26682393Speter 26782393Spetervoid 26882393Spetertaskqueue_thread_loop(void *arg) 26982393Speter{ 27082393Speter struct taskqueue **tqp, *tq; 27182393Speter 27282393Speter tqp = arg; 27382393Speter tq = *tqp; 27482393Speter mtx_lock(&tq->tq_mutex); 27582393Speter do { 27682393Speter taskqueue_run(tq); 27782393Speter msleep(tq, &tq->tq_mutex, PWAIT, "-", 0); 27882393Speter } while (*tq->tq_pproc != NULL); 27982393Speter 28082393Speter /* rendezvous with thread that asked us to terminate */ 28182393Speter wakeup_one(tq); 28282393Speter mtx_unlock(&tq->tq_mutex); 28382393Speter kthread_exit(0); 28482393Speter} 28582393Speter 28682393Spetervoid 28782393Spetertaskqueue_thread_enqueue(void *context) 28882393Speter{ 28982393Speter struct taskqueue **tqp, *tq; 29082393Speter 29182393Speter tqp = context; 29282393Speter tq = *tqp; 29382393Speter 29482393Speter mtx_assert(&tq->tq_mutex, MA_OWNED); 29582393Speter wakeup_one(tq); 29682393Speter} 29782393Speter 29882393SpeterTASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0, 29982393Speter swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ, 30082393Speter INTR_MPSAFE, &taskqueue_ih)); 30182393Speter 30282393SpeterTASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0, 30382393Speter swi_add(NULL, "Giant task queue", taskqueue_swi_giant_run, 3041553Srgrimes NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih)); 3051553Srgrimes 306129119ScognetTASKQUEUE_DEFINE_THREAD(thread); 307129073Scognet 3081553Srgrimesint 309162936Srutaskqueue_enqueue_fast(struct taskqueue *queue, struct task *task) 3101553Srgrimes{ 311152811Sru struct task *ins; 31261640Speter struct task *prev; 31361640Speter 314152811Sru mtx_lock_spin(&queue->tq_mutex); 315153889Sru 316134542Speter /* 3171553Srgrimes * Count multiple enqueues. 3181553Srgrimes */ 31929451Scharnier if (task->ta_pending) { 32029451Scharnier task->ta_pending++; 3211553Srgrimes mtx_unlock_spin(&queue->tq_mutex); 3221553Srgrimes return 0; 323162936Sru } 324134542Speter 325152862Sru /* 3268857Srgrimes * Optimise the case when all tasks have the same priority. 3276803Sgibbs */ 32854490Speter prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 3291553Srgrimes if (!prev || prev->ta_priority >= task->ta_priority) { 3301553Srgrimes STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 3311553Srgrimes } else { 3321553Srgrimes prev = 0; 3331553Srgrimes for (ins = STAILQ_FIRST(&queue->tq_queue); ins; 334129073Scognet prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 3351553Srgrimes if (ins->ta_priority < task->ta_priority) 3361553Srgrimes break; 33752098Speter 3381566Srgrimes if (prev) 33952098Speter STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 34052098Speter else 3411566Srgrimes STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 3421566Srgrimes } 343162936Sru 344162936Sru task->ta_pending = 1; 345162936Sru queue->tq_enqueue(queue->tq_context); 346210144Simp 347210144Simp mtx_unlock_spin(&queue->tq_mutex); 348162936Sru 349162936Sru return 0; 350162936Sru} 351162936Sru 352162936Srustatic void 353162936Srutaskqueue_run_fast(struct taskqueue *queue) 354162936Sru{ 355162936Sru struct task *task; 3561553Srgrimes int pending; 3571553Srgrimes 3581553Srgrimes mtx_lock_spin(&queue->tq_mutex); 359210144Simp while (STAILQ_FIRST(&queue->tq_queue)) { 3601553Srgrimes /* 3611553Srgrimes * Carefully remove the first task from the queue and 362152811Sru * zero its pending count. 363152862Sru */ 364152862Sru task = STAILQ_FIRST(&queue->tq_queue); 3651553Srgrimes STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 36673199Speter pending = task->ta_pending; 3674571Sgibbs task->ta_pending = 0; 3686803Sgibbs mtx_unlock_spin(&queue->tq_mutex); 36972684Speter 37030796Sjoerg task->ta_func(task->ta_context, pending); 3714571Sgibbs 3724571Sgibbs mtx_lock_spin(&queue->tq_mutex); 3735325Sgibbs } 37491002Speter mtx_unlock_spin(&queue->tq_mutex); 3751553Srgrimes} 37661523Speter 3771553Srgrimesstruct taskqueue *taskqueue_fast; 37830796Sjoergstatic void *taskqueue_fast_ih; 37930796Sjoerg 38030796Sjoergstatic void 38130796Sjoergtaskqueue_fast_schedule(void *context) 38230796Sjoerg{ 38361523Speter swi_sched(taskqueue_fast_ih, 0); 38430796Sjoerg} 38561523Speter 386210144Simpstatic void 387214654Sobrientaskqueue_fast_run(void *dummy) 388214654Sobrien{ 3891553Srgrimes taskqueue_run_fast(taskqueue_fast); 3901553Srgrimes} 3911553Srgrimes 3921553Srgrimesstatic void 393113397Sphktaskqueue_define_fast(void *arg) 394152862Sru{ 395152862Sru 396152862Sru taskqueue_fast = malloc(sizeof(struct taskqueue), M_TASKQUEUE, 397152862Sru M_NOWAIT | M_ZERO); 398113397Sphk if (!taskqueue_fast) { 399152862Sru printf("%s: Unable to allocate fast task queue!\n", __func__); 400152862Sru return; 401210144Simp } 402152862Sru 403152862Sru STAILQ_INIT(&taskqueue_fast->tq_queue); 404152862Sru taskqueue_fast->tq_name = "fast"; 405152862Sru taskqueue_fast->tq_enqueue = taskqueue_fast_schedule; 406152862Sru mtx_init(&taskqueue_fast->tq_mutex, "taskqueue_fast", NULL, MTX_SPIN); 407152862Sru 408152862Sru mtx_lock(&taskqueue_queues_mutex); 409152862Sru STAILQ_INSERT_TAIL(&taskqueue_queues, taskqueue_fast, tq_link); 4104571Sgibbs mtx_unlock(&taskqueue_queues_mutex); 4114571Sgibbs 4124571Sgibbs swi_add(NULL, "Fast task queue", taskqueue_fast_run, 4134571Sgibbs NULL, SWI_TQ_FAST, 0, &taskqueue_fast_ih); 4144571Sgibbs} 41573199SpeterSYSINIT(taskqueue_fast, SI_SUB_CONFIGURE, SI_ORDER_SECOND, 416210144Simp taskqueue_define_fast, NULL); 417210144Simp