/* subr_taskqueue.c revision 126027 */
1219019Sgabor/*- 2219019Sgabor * Copyright (c) 2000 Doug Rabson 3219019Sgabor * All rights reserved. 4219019Sgabor * 5219019Sgabor * Redistribution and use in source and binary forms, with or without 6219019Sgabor * modification, are permitted provided that the following conditions 7219019Sgabor * are met: 8219019Sgabor * 1. Redistributions of source code must retain the above copyright 9219019Sgabor * notice, this list of conditions and the following disclaimer. 10219019Sgabor * 2. Redistributions in binary form must reproduce the above copyright 11219019Sgabor * notice, this list of conditions and the following disclaimer in the 12219019Sgabor * documentation and/or other materials provided with the distribution. 13219019Sgabor * 14219019Sgabor * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15219019Sgabor * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16219019Sgabor * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17219019Sgabor * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18219019Sgabor * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19219019Sgabor * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20219019Sgabor * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21219019Sgabor * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22219019Sgabor * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23219019Sgabor * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24219019Sgabor * SUCH DAMAGE. 
25219019Sgabor */ 26219019Sgabor 27219019Sgabor#include <sys/cdefs.h> 28219019Sgabor__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 126027 2004-02-19 22:03:52Z jhb $"); 29219019Sgabor 30219019Sgabor#include <sys/param.h> 31219019Sgabor#include <sys/systm.h> 32219019Sgabor#include <sys/bus.h> 33219019Sgabor#include <sys/interrupt.h> 34219019Sgabor#include <sys/kernel.h> 35219019Sgabor#include <sys/kthread.h> 36219019Sgabor#include <sys/lock.h> 37219019Sgabor#include <sys/malloc.h> 38219019Sgabor#include <sys/mutex.h> 39219019Sgabor#include <sys/taskqueue.h> 40219019Sgabor#include <sys/unistd.h> 41219019Sgabor 42219019Sgaborstatic MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues"); 43219019Sgaborstatic void *taskqueue_giant_ih; 44219019Sgaborstatic void *taskqueue_ih; 45219019Sgaborstatic STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues; 46219019Sgaborstatic struct mtx taskqueue_queues_mutex; 47219019Sgaborstatic struct proc *taskqueue_thread_proc; 48219019Sgabor 49219019Sgaborstruct taskqueue { 50219019Sgabor STAILQ_ENTRY(taskqueue) tq_link; 51219019Sgabor STAILQ_HEAD(, task) tq_queue; 52219019Sgabor const char *tq_name; 53219019Sgabor taskqueue_enqueue_fn tq_enqueue; 54219019Sgabor void *tq_context; 55219019Sgabor int tq_draining; 56219019Sgabor struct mtx tq_mutex; 57219019Sgabor}; 58219019Sgabor 59219019Sgaborstatic void init_taskqueue_list(void *data); 60219019Sgabor 61219019Sgaborstatic void 62219019Sgaborinit_taskqueue_list(void *data __unused) 63219019Sgabor{ 64219019Sgabor 65219019Sgabor mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF); 66219019Sgabor STAILQ_INIT(&taskqueue_queues); 67219019Sgabor} 68219019SgaborSYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list, 69219019Sgabor NULL); 70219019Sgabor 71219019Sgaborstruct taskqueue * 72219019Sgabortaskqueue_create(const char *name, int mflags, 73219019Sgabor taskqueue_enqueue_fn enqueue, void *context) 74219019Sgabor{ 75219019Sgabor struct taskqueue 
*queue; 76219019Sgabor 77219019Sgabor queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO); 78219019Sgabor if (!queue) 79219019Sgabor return 0; 80219019Sgabor 81219019Sgabor STAILQ_INIT(&queue->tq_queue); 82219019Sgabor queue->tq_name = name; 83219019Sgabor queue->tq_enqueue = enqueue; 84219019Sgabor queue->tq_context = context; 85219019Sgabor queue->tq_draining = 0; 86219019Sgabor mtx_init(&queue->tq_mutex, "taskqueue", NULL, MTX_DEF); 87219019Sgabor 88219019Sgabor mtx_lock(&taskqueue_queues_mutex); 89219019Sgabor STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link); 90219019Sgabor mtx_unlock(&taskqueue_queues_mutex); 91219019Sgabor 92219019Sgabor return queue; 93219019Sgabor} 94219019Sgabor 95219019Sgaborvoid 96219019Sgabortaskqueue_free(struct taskqueue *queue) 97219019Sgabor{ 98219019Sgabor 99219019Sgabor mtx_lock(&queue->tq_mutex); 100219019Sgabor KASSERT(queue->tq_draining == 0, ("free'ing a draining taskqueue")); 101219019Sgabor queue->tq_draining = 1; 102219019Sgabor mtx_unlock(&queue->tq_mutex); 103219019Sgabor 104219019Sgabor taskqueue_run(queue); 105219019Sgabor 106219019Sgabor mtx_lock(&taskqueue_queues_mutex); 107219019Sgabor STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link); 108219019Sgabor mtx_unlock(&taskqueue_queues_mutex); 109219019Sgabor 110219019Sgabor mtx_destroy(&queue->tq_mutex); 111219019Sgabor free(queue, M_TASKQUEUE); 112219019Sgabor} 113219019Sgabor 114219019Sgabor/* 115219019Sgabor * Returns with the taskqueue locked. 
116219019Sgabor */ 117219019Sgaborstruct taskqueue * 118219019Sgabortaskqueue_find(const char *name) 119219019Sgabor{ 120219019Sgabor struct taskqueue *queue; 121219019Sgabor 122219019Sgabor mtx_lock(&taskqueue_queues_mutex); 123219019Sgabor STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) { 124219019Sgabor mtx_lock(&queue->tq_mutex); 125219019Sgabor if (strcmp(queue->tq_name, name) == 0) { 126219019Sgabor mtx_unlock(&taskqueue_queues_mutex); 127219019Sgabor return queue; 128219019Sgabor } 129219019Sgabor mtx_unlock(&queue->tq_mutex); 130219019Sgabor } 131219019Sgabor mtx_unlock(&taskqueue_queues_mutex); 132219019Sgabor return NULL; 133219019Sgabor} 134219019Sgabor 135219019Sgaborint 136219019Sgabortaskqueue_enqueue(struct taskqueue *queue, struct task *task) 137219019Sgabor{ 138219019Sgabor struct task *ins; 139219019Sgabor struct task *prev; 140219019Sgabor 141219019Sgabor mtx_lock(&queue->tq_mutex); 142219019Sgabor 143219019Sgabor /* 144219019Sgabor * Don't allow new tasks on a queue which is being freed. 145219019Sgabor */ 146219019Sgabor if (queue->tq_draining) { 147219019Sgabor mtx_unlock(&queue->tq_mutex); 148219019Sgabor return EPIPE; 149219019Sgabor } 150219019Sgabor 151219019Sgabor /* 152219019Sgabor * Count multiple enqueues. 153219019Sgabor */ 154219019Sgabor if (task->ta_pending) { 155219019Sgabor task->ta_pending++; 156219019Sgabor mtx_unlock(&queue->tq_mutex); 157219019Sgabor return 0; 158219019Sgabor } 159219019Sgabor 160219019Sgabor /* 161219019Sgabor * Optimise the case when all tasks have the same priority. 
162219019Sgabor */ 163219019Sgabor prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 164219019Sgabor if (!prev || prev->ta_priority >= task->ta_priority) { 165219019Sgabor STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 166219019Sgabor } else { 167219019Sgabor prev = 0; 168219019Sgabor for (ins = STAILQ_FIRST(&queue->tq_queue); ins; 169219019Sgabor prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 170219019Sgabor if (ins->ta_priority < task->ta_priority) 171219019Sgabor break; 172219019Sgabor 173219019Sgabor if (prev) 174219019Sgabor STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 175219019Sgabor else 176219019Sgabor STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 177219019Sgabor } 178219019Sgabor 179219019Sgabor task->ta_pending = 1; 180219019Sgabor if (queue->tq_enqueue) 181219019Sgabor queue->tq_enqueue(queue->tq_context); 182219019Sgabor 183219019Sgabor mtx_unlock(&queue->tq_mutex); 184219019Sgabor 185219019Sgabor return 0; 186219019Sgabor} 187219019Sgabor 188219019Sgaborvoid 189219019Sgabortaskqueue_run(struct taskqueue *queue) 190219019Sgabor{ 191219019Sgabor struct task *task; 192219019Sgabor int pending; 193219019Sgabor 194219019Sgabor mtx_lock(&queue->tq_mutex); 195219019Sgabor while (STAILQ_FIRST(&queue->tq_queue)) { 196219019Sgabor /* 197219019Sgabor * Carefully remove the first task from the queue and 198219019Sgabor * zero its pending count. 
199219019Sgabor */ 200219019Sgabor task = STAILQ_FIRST(&queue->tq_queue); 201219019Sgabor STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 202219019Sgabor pending = task->ta_pending; 203219019Sgabor task->ta_pending = 0; 204219019Sgabor mtx_unlock(&queue->tq_mutex); 205219019Sgabor 206219019Sgabor task->ta_func(task->ta_context, pending); 207219019Sgabor 208219019Sgabor mtx_lock(&queue->tq_mutex); 209219019Sgabor } 210219019Sgabor mtx_unlock(&queue->tq_mutex); 211219019Sgabor} 212219019Sgabor 213219019Sgaborstatic void 214219019Sgabortaskqueue_swi_enqueue(void *context) 215219019Sgabor{ 216219019Sgabor swi_sched(taskqueue_ih, 0); 217219019Sgabor} 218219019Sgabor 219219019Sgaborstatic void 220219019Sgabortaskqueue_swi_run(void *dummy) 221219019Sgabor{ 222219019Sgabor taskqueue_run(taskqueue_swi); 223219019Sgabor} 224219019Sgabor 225219019Sgaborstatic void 226219019Sgabortaskqueue_swi_giant_enqueue(void *context) 227219019Sgabor{ 228219019Sgabor swi_sched(taskqueue_giant_ih, 0); 229219019Sgabor} 230219019Sgabor 231219019Sgaborstatic void 232219019Sgabortaskqueue_swi_giant_run(void *dummy) 233219019Sgabor{ 234219019Sgabor taskqueue_run(taskqueue_swi_giant); 235219019Sgabor} 236219019Sgabor 237219019Sgaborstatic void 238219019Sgabortaskqueue_thread_loop(void *arg) 239219019Sgabor{ 240219019Sgabor for (;;) { 241219019Sgabor mtx_lock(&taskqueue_thread->tq_mutex); 242219019Sgabor while (STAILQ_EMPTY(&taskqueue_thread->tq_queue)) 243219019Sgabor msleep(taskqueue_thread, &taskqueue_thread->tq_mutex, 244219019Sgabor PWAIT, "-", 0); 245219019Sgabor mtx_unlock(&taskqueue_thread->tq_mutex); 246219019Sgabor taskqueue_run(taskqueue_thread); 247219019Sgabor } 248219019Sgabor} 249219019Sgabor 250219019Sgaborstatic void 251219019Sgabortaskqueue_thread_enqueue(void *context) 252219019Sgabor{ 253219019Sgabor mtx_assert(&taskqueue_thread->tq_mutex, MA_OWNED); 254219019Sgabor wakeup(taskqueue_thread); 255219019Sgabor} 256219019Sgabor 257219019SgaborTASKQUEUE_DEFINE(swi, 
taskqueue_swi_enqueue, 0, 258219019Sgabor swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ, 259219019Sgabor INTR_MPSAFE, &taskqueue_ih)); 260219019Sgabor 261219019SgaborTASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0, 262219019Sgabor swi_add(NULL, "Giant task queue", taskqueue_swi_giant_run, 263219019Sgabor NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih)); 264219019Sgabor 265219019SgaborTASKQUEUE_DEFINE(thread, taskqueue_thread_enqueue, 0, 266219019Sgabor kthread_create(taskqueue_thread_loop, NULL, 267219019Sgabor &taskqueue_thread_proc, 0, 0, "taskqueue")); 268219019Sgabor 269219019Sgaborint 270219019Sgabortaskqueue_enqueue_fast(struct taskqueue *queue, struct task *task) 271219019Sgabor{ 272219019Sgabor struct task *ins; 273219019Sgabor struct task *prev; 274219019Sgabor 275219019Sgabor mtx_lock_spin(&queue->tq_mutex); 276219019Sgabor 277219019Sgabor /* 278219019Sgabor * Don't allow new tasks on a queue which is being freed. 279219019Sgabor */ 280219019Sgabor if (queue->tq_draining) { 281219019Sgabor mtx_unlock_spin(&queue->tq_mutex); 282219019Sgabor return EPIPE; 283219019Sgabor } 284219019Sgabor 285219019Sgabor /* 286219019Sgabor * Count multiple enqueues. 287219019Sgabor */ 288219019Sgabor if (task->ta_pending) { 289219019Sgabor task->ta_pending++; 290219019Sgabor mtx_unlock_spin(&queue->tq_mutex); 291219019Sgabor return 0; 292219019Sgabor } 293219019Sgabor 294219019Sgabor /* 295219019Sgabor * Optimise the case when all tasks have the same priority. 
296219019Sgabor */ 297219019Sgabor prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 298219019Sgabor if (!prev || prev->ta_priority >= task->ta_priority) { 299219019Sgabor STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 300219019Sgabor } else { 301219019Sgabor prev = 0; 302219019Sgabor for (ins = STAILQ_FIRST(&queue->tq_queue); ins; 303219019Sgabor prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 304219019Sgabor if (ins->ta_priority < task->ta_priority) 305219019Sgabor break; 306219019Sgabor 307219019Sgabor if (prev) 308219019Sgabor STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 309219019Sgabor else 310219019Sgabor STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 311219019Sgabor } 312219019Sgabor 313219019Sgabor task->ta_pending = 1; 314219019Sgabor if (queue->tq_enqueue) 315219019Sgabor queue->tq_enqueue(queue->tq_context); 316219019Sgabor 317219019Sgabor mtx_unlock_spin(&queue->tq_mutex); 318219019Sgabor 319219019Sgabor return 0; 320219019Sgabor} 321219019Sgabor 322219019Sgaborstatic void 323219019Sgabortaskqueue_run_fast(struct taskqueue *queue) 324219019Sgabor{ 325219019Sgabor struct task *task; 326219019Sgabor int pending; 327219019Sgabor 328219019Sgabor mtx_lock_spin(&queue->tq_mutex); 329219019Sgabor while (STAILQ_FIRST(&queue->tq_queue)) { 330219019Sgabor /* 331219019Sgabor * Carefully remove the first task from the queue and 332219019Sgabor * zero its pending count. 
333219019Sgabor */ 334219019Sgabor task = STAILQ_FIRST(&queue->tq_queue); 335219019Sgabor STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 336219019Sgabor pending = task->ta_pending; 337219019Sgabor task->ta_pending = 0; 338219019Sgabor mtx_unlock_spin(&queue->tq_mutex); 339219019Sgabor 340219019Sgabor task->ta_func(task->ta_context, pending); 341219019Sgabor 342219019Sgabor mtx_lock_spin(&queue->tq_mutex); 343219019Sgabor } 344219019Sgabor mtx_unlock_spin(&queue->tq_mutex); 345219019Sgabor} 346219019Sgabor 347219019Sgaborstruct taskqueue *taskqueue_fast; 348219019Sgaborstatic void *taskqueue_fast_ih; 349219019Sgabor 350219019Sgaborstatic void 351219019Sgabortaskqueue_fast_schedule(void *context) 352219019Sgabor{ 353219019Sgabor swi_sched(taskqueue_fast_ih, 0); 354219019Sgabor} 355219019Sgabor 356219019Sgaborstatic void 357219019Sgabortaskqueue_fast_run(void *dummy) 358219019Sgabor{ 359219019Sgabor taskqueue_run_fast(taskqueue_fast); 360219019Sgabor} 361219019Sgabor 362219019Sgaborstatic void 363219019Sgabortaskqueue_define_fast(void *arg) 364219019Sgabor{ 365219019Sgabor taskqueue_fast = malloc(sizeof(struct taskqueue), 366219019Sgabor M_TASKQUEUE, M_NOWAIT | M_ZERO); 367219019Sgabor if (!taskqueue_fast) { 368219019Sgabor printf("%s: Unable to allocate fast task queue!\n", __func__); 369219019Sgabor return; 370219019Sgabor } 371219019Sgabor 372219019Sgabor STAILQ_INIT(&taskqueue_fast->tq_queue); 373219019Sgabor taskqueue_fast->tq_name = "fast"; 374219019Sgabor taskqueue_fast->tq_enqueue = taskqueue_fast_schedule; 375219019Sgabor mtx_init(&taskqueue_fast->tq_mutex, "taskqueue_fast", NULL, MTX_SPIN); 376219019Sgabor 377219019Sgabor mtx_lock(&taskqueue_queues_mutex); 378219019Sgabor STAILQ_INSERT_TAIL(&taskqueue_queues, taskqueue_fast, tq_link); 379219019Sgabor mtx_unlock(&taskqueue_queues_mutex); 380219019Sgabor 381219019Sgabor swi_add(NULL, "Fast task queue", taskqueue_fast_run, 382219019Sgabor NULL, SWI_TQ_FAST, 0, &taskqueue_fast_ih); 383219019Sgabor} 
384219019SgaborSYSINIT(taskqueue_fast, SI_SUB_CONFIGURE, SI_ORDER_SECOND, 385219019Sgabor taskqueue_define_fast, NULL); 386