subr_taskqueue.c revision 151656
1254721Semaste/*- 2254721Semaste * Copyright (c) 2000 Doug Rabson 3254721Semaste * All rights reserved. 4254721Semaste * 5254721Semaste * Redistribution and use in source and binary forms, with or without 6254721Semaste * modification, are permitted provided that the following conditions 7254721Semaste * are met: 8254721Semaste * 1. Redistributions of source code must retain the above copyright 9254721Semaste * notice, this list of conditions and the following disclaimer. 10254721Semaste * 2. Redistributions in binary form must reproduce the above copyright 11254721Semaste * notice, this list of conditions and the following disclaimer in the 12254721Semaste * documentation and/or other materials provided with the distribution. 13254721Semaste * 14254721Semaste * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15254721Semaste * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16254721Semaste * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17254721Semaste * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18254721Semaste * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19254721Semaste * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20254721Semaste * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21254721Semaste * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22254721Semaste * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23254721Semaste * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24254721Semaste * SUCH DAMAGE. 25254721Semaste */ 26254721Semaste 27254721Semaste#include <sys/cdefs.h> 28254721Semaste__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 151656 2005-10-25 19:29:02Z jhb $"); 29254721Semaste 30254721Semaste#include <sys/param.h> 31254721Semaste#include <sys/systm.h> 32254721Semaste#include <sys/bus.h> 33254721Semaste#include <sys/interrupt.h> 34254721Semaste#include <sys/kernel.h> 35254721Semaste#include <sys/kthread.h> 36254721Semaste#include <sys/lock.h> 37254721Semaste#include <sys/malloc.h> 38254721Semaste#include <sys/mutex.h> 39254721Semaste#include <sys/proc.h> 40254721Semaste#include <sys/taskqueue.h> 41254721Semaste#include <sys/unistd.h> 42254721Semaste 43254721Semastestatic MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues"); 44254721Semastestatic void *taskqueue_giant_ih; 45254721Semastestatic void *taskqueue_ih; 46254721Semastestatic STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues; 47254721Semastestatic struct mtx taskqueue_queues_mutex; 48254721Semaste 49254721Semastestruct taskqueue { 50254721Semaste STAILQ_ENTRY(taskqueue) tq_link; 51254721Semaste STAILQ_HEAD(, task) tq_queue; 52254721Semaste const char *tq_name; 53254721Semaste taskqueue_enqueue_fn tq_enqueue; 54254721Semaste void *tq_context; 55254721Semaste struct task *tq_running; 56254721Semaste struct mtx tq_mutex; 57254721Semaste struct proc **tq_pproc; 58254721Semaste}; 59254721Semaste 60254721Semastestatic void init_taskqueue_list(void *data); 61254721Semaste 62254721Semastestatic void 63254721Semasteinit_taskqueue_list(void *data __unused) 64254721Semaste{ 65254721Semaste 66254721Semaste mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF); 67254721Semaste STAILQ_INIT(&taskqueue_queues); 68254721Semaste} 69254721SemasteSYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list, 70254721Semaste NULL); 71254721Semaste 72254721Semastestruct taskqueue * 73254721Semastetaskqueue_create(const char *name, int mflags, 74254721Semaste taskqueue_enqueue_fn enqueue, void *context, 75254721Semaste struct proc **pp) 76254721Semaste{ 77254721Semaste struct taskqueue *queue; 78254721Semaste 79254721Semaste queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO); 80254721Semaste if (!queue) 81254721Semaste return 0; 82254721Semaste 83254721Semaste STAILQ_INIT(&queue->tq_queue); 84254721Semaste queue->tq_name = name; 85254721Semaste queue->tq_enqueue = enqueue; 86269024Semaste queue->tq_context = context; 87269024Semaste queue->tq_pproc = pp; 88269024Semaste mtx_init(&queue->tq_mutex, "taskqueue", NULL, MTX_DEF); 89269024Semaste 90254721Semaste mtx_lock(&taskqueue_queues_mutex); 91254721Semaste STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link); 92254721Semaste mtx_unlock(&taskqueue_queues_mutex); 93254721Semaste 94254721Semaste return queue; 95254721Semaste} 96254721Semaste 97254721Semaste/* 98254721Semaste * Signal a taskqueue thread to terminate. 99254721Semaste */ 100254721Semastestatic void 101254721Semastetaskqueue_terminate(struct proc **pp, struct taskqueue *tq) 102254721Semaste{ 103254721Semaste struct proc *p; 104254721Semaste 105263363Semaste p = *pp; 106263363Semaste *pp = NULL; 107254721Semaste if (p) { 108254721Semaste wakeup_one(tq); 109254721Semaste PROC_LOCK(p); /* NB: insure we don't miss wakeup */ 110254721Semaste mtx_unlock(&tq->tq_mutex); /* let taskqueue thread run */ 111254721Semaste msleep(p, &p->p_mtx, PWAIT, "taskqueue_destroy", 0); 112254721Semaste PROC_UNLOCK(p); 113254721Semaste mtx_lock(&tq->tq_mutex); 114254721Semaste } 115254721Semaste} 116254721Semaste 117254721Semastevoid 118254721Semastetaskqueue_free(struct taskqueue *queue) 119254721Semaste{ 120254721Semaste 121254721Semaste mtx_lock(&taskqueue_queues_mutex); 122269024Semaste STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link); 123254721Semaste mtx_unlock(&taskqueue_queues_mutex); 124254721Semaste 125269024Semaste mtx_lock(&queue->tq_mutex); 126269024Semaste taskqueue_run(queue); 127254721Semaste taskqueue_terminate(queue->tq_pproc, queue); 128254721Semaste mtx_destroy(&queue->tq_mutex); 129254721Semaste free(queue, M_TASKQUEUE); 130254721Semaste} 131254721Semaste 132254721Semaste/* 133254721Semaste * Returns with the taskqueue locked. 134254721Semaste */ 135254721Semastestruct taskqueue * 136254721Semastetaskqueue_find(const char *name) 137254721Semaste{ 138254721Semaste struct taskqueue *queue; 139254721Semaste 140254721Semaste mtx_lock(&taskqueue_queues_mutex); 141254721Semaste STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) { 142254721Semaste if (strcmp(queue->tq_name, name) == 0) { 143254721Semaste mtx_lock(&queue->tq_mutex); 144254721Semaste mtx_unlock(&taskqueue_queues_mutex); 145254721Semaste return queue; 146254721Semaste } 147254721Semaste } 148254721Semaste mtx_unlock(&taskqueue_queues_mutex); 149254721Semaste return NULL; 150254721Semaste} 151254721Semaste 152254721Semasteint 153254721Semastetaskqueue_enqueue(struct taskqueue *queue, struct task *task) 154254721Semaste{ 155254721Semaste struct task *ins; 156254721Semaste struct task *prev; 157254721Semaste 158254721Semaste mtx_lock(&queue->tq_mutex); 159254721Semaste 160254721Semaste /* 161254721Semaste * Count multiple enqueues. 162254721Semaste */ 163254721Semaste if (task->ta_pending) { 164254721Semaste task->ta_pending++; 165254721Semaste mtx_unlock(&queue->tq_mutex); 166254721Semaste return 0; 167254721Semaste } 168254721Semaste 169254721Semaste /* 170254721Semaste * Optimise the case when all tasks have the same priority. 171254721Semaste */ 172254721Semaste prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 173254721Semaste if (!prev || prev->ta_priority >= task->ta_priority) { 174254721Semaste STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 175254721Semaste } else { 176254721Semaste prev = 0; 177254721Semaste for (ins = STAILQ_FIRST(&queue->tq_queue); ins; 178254721Semaste prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 179254721Semaste if (ins->ta_priority < task->ta_priority) 180254721Semaste break; 181254721Semaste 182254721Semaste if (prev) 183254721Semaste STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 184254721Semaste else 185254721Semaste STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 186254721Semaste } 187254721Semaste 188254721Semaste task->ta_pending = 1; 189254721Semaste queue->tq_enqueue(queue->tq_context); 190254721Semaste 191254721Semaste mtx_unlock(&queue->tq_mutex); 192254721Semaste 193254721Semaste return 0; 194254721Semaste} 195254721Semaste 196254721Semastevoid 197254721Semastetaskqueue_run(struct taskqueue *queue) 198254721Semaste{ 199254721Semaste struct task *task; 200254721Semaste int owned, pending; 201254721Semaste 202254721Semaste owned = mtx_owned(&queue->tq_mutex); 203254721Semaste if (!owned) 204254721Semaste mtx_lock(&queue->tq_mutex); 205254721Semaste while (STAILQ_FIRST(&queue->tq_queue)) { 206254721Semaste /* 207254721Semaste * Carefully remove the first task from the queue and 208254721Semaste * zero its pending count. 209254721Semaste */ 210254721Semaste task = STAILQ_FIRST(&queue->tq_queue); 211254721Semaste STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 212254721Semaste pending = task->ta_pending; 213254721Semaste task->ta_pending = 0; 214254721Semaste queue->tq_running = task; 215254721Semaste mtx_unlock(&queue->tq_mutex); 216254721Semaste 217254721Semaste task->ta_func(task->ta_context, pending); 218254721Semaste 219254721Semaste mtx_lock(&queue->tq_mutex); 220254721Semaste queue->tq_running = NULL; 221254721Semaste wakeup(task); 222254721Semaste } 223254721Semaste 224254721Semaste /* 225254721Semaste * For compatibility, unlock on return if the queue was not locked 226254721Semaste * on entry, although this opens a race window. 227254721Semaste */ 228254721Semaste if (!owned) 229254721Semaste mtx_unlock(&queue->tq_mutex); 230254721Semaste} 231254721Semaste 232254721Semastevoid 233254721Semastetaskqueue_drain(struct taskqueue *queue, struct task *task) 234254721Semaste{ 235254721Semaste WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, "taskqueue_drain"); 236254721Semaste 237254721Semaste mtx_lock(&queue->tq_mutex); 238254721Semaste while (task->ta_pending != 0 || task == queue->tq_running) 239254721Semaste msleep(task, &queue->tq_mutex, PWAIT, "-", 0); 240254721Semaste mtx_unlock(&queue->tq_mutex); 241254721Semaste} 242254721Semaste 243254721Semastestatic void 244254721Semastetaskqueue_swi_enqueue(void *context) 245254721Semaste{ 246254721Semaste swi_sched(taskqueue_ih, 0); 247254721Semaste} 248254721Semaste 249254721Semastestatic void 250254721Semastetaskqueue_swi_run(void *dummy) 251254721Semaste{ 252254721Semaste taskqueue_run(taskqueue_swi); 253254721Semaste} 254254721Semaste 255254721Semastestatic void 256254721Semastetaskqueue_swi_giant_enqueue(void *context) 257254721Semaste{ 258254721Semaste swi_sched(taskqueue_giant_ih, 0); 259254721Semaste} 260254721Semaste 261254721Semastestatic void 262254721Semastetaskqueue_swi_giant_run(void *dummy) 263254721Semaste{ 264254721Semaste taskqueue_run(taskqueue_swi_giant); 265254721Semaste} 266254721Semaste 267254721Semastevoid 268254721Semastetaskqueue_thread_loop(void *arg) 269254721Semaste{ 270254721Semaste struct taskqueue **tqp, *tq; 271254721Semaste 272254721Semaste tqp = arg; 273254721Semaste tq = *tqp; 274254721Semaste mtx_lock(&tq->tq_mutex); 275254721Semaste do { 276254721Semaste taskqueue_run(tq); 277254721Semaste msleep(tq, &tq->tq_mutex, PWAIT, "-", 0); 278254721Semaste } while (*tq->tq_pproc != NULL); 279254721Semaste 280254721Semaste /* rendezvous with thread that asked us to terminate */ 281254721Semaste wakeup_one(tq); 282254721Semaste mtx_unlock(&tq->tq_mutex); 283254721Semaste kthread_exit(0); 284254721Semaste} 285254721Semaste 286254721Semastevoid 287254721Semastetaskqueue_thread_enqueue(void *context) 288254721Semaste{ 289254721Semaste struct taskqueue **tqp, *tq; 290254721Semaste 291254721Semaste tqp = context; 292254721Semaste tq = *tqp; 293254721Semaste 294254721Semaste mtx_assert(&tq->tq_mutex, MA_OWNED); 295254721Semaste wakeup_one(tq); 296254721Semaste} 297254721Semaste 298254721SemasteTASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0, 299254721Semaste swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ, 300254721Semaste INTR_MPSAFE, &taskqueue_ih)); 301254721Semaste 302254721SemasteTASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0, 303254721Semaste swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run, 304254721Semaste NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih)); 305254721Semaste 306254721SemasteTASKQUEUE_DEFINE_THREAD(thread); 307254721Semaste 308254721Semasteint 309254721Semastetaskqueue_enqueue_fast(struct taskqueue *queue, struct task *task) 310254721Semaste{ 311254721Semaste struct task *ins; 312254721Semaste struct task *prev; 313254721Semaste 314254721Semaste mtx_lock_spin(&queue->tq_mutex); 315254721Semaste 316254721Semaste /* 317254721Semaste * Count multiple enqueues. 318254721Semaste */ 319254721Semaste if (task->ta_pending) { 320254721Semaste task->ta_pending++; 321254721Semaste mtx_unlock_spin(&queue->tq_mutex); 322254721Semaste return 0; 323254721Semaste } 324254721Semaste 325254721Semaste /* 326254721Semaste * Optimise the case when all tasks have the same priority. 327254721Semaste */ 328254721Semaste prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 329254721Semaste if (!prev || prev->ta_priority >= task->ta_priority) { 330254721Semaste STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 331254721Semaste } else { 332254721Semaste prev = 0; 333254721Semaste for (ins = STAILQ_FIRST(&queue->tq_queue); ins; 334254721Semaste prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 335254721Semaste if (ins->ta_priority < task->ta_priority) 336254721Semaste break; 337254721Semaste 338254721Semaste if (prev) 339254721Semaste STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 340254721Semaste else 341254721Semaste STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 342254721Semaste } 343254721Semaste 344254721Semaste task->ta_pending = 1; 345254721Semaste queue->tq_enqueue(queue->tq_context); 346254721Semaste 347254721Semaste mtx_unlock_spin(&queue->tq_mutex); 348254721Semaste 349254721Semaste return 0; 350254721Semaste} 351254721Semaste 352254721Semastestatic void 353254721Semastetaskqueue_run_fast(struct taskqueue *queue) 354254721Semaste{ 355254721Semaste struct task *task; 356254721Semaste int pending; 357254721Semaste 358254721Semaste mtx_lock_spin(&queue->tq_mutex); 359254721Semaste while (STAILQ_FIRST(&queue->tq_queue)) { 360254721Semaste /* 361254721Semaste * Carefully remove the first task from the queue and 362254721Semaste * zero its pending count. 363254721Semaste */ 364254721Semaste task = STAILQ_FIRST(&queue->tq_queue); 365254721Semaste STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 366254721Semaste pending = task->ta_pending; 367254721Semaste task->ta_pending = 0; 368254721Semaste mtx_unlock_spin(&queue->tq_mutex); 369254721Semaste 370254721Semaste task->ta_func(task->ta_context, pending); 371254721Semaste 372254721Semaste mtx_lock_spin(&queue->tq_mutex); 373254721Semaste } 374254721Semaste mtx_unlock_spin(&queue->tq_mutex); 375254721Semaste} 376254721Semaste 377254721Semastestruct taskqueue *taskqueue_fast; 378254721Semastestatic void *taskqueue_fast_ih; 379254721Semaste 380254721Semastestatic void 381254721Semastetaskqueue_fast_schedule(void *context) 382254721Semaste{ 383254721Semaste swi_sched(taskqueue_fast_ih, 0); 384254721Semaste} 385254721Semaste 386254721Semastestatic void 387254721Semastetaskqueue_fast_run(void *dummy) 388254721Semaste{ 389254721Semaste taskqueue_run_fast(taskqueue_fast); 390254721Semaste} 391254721Semaste 392254721Semastestatic void 393254721Semastetaskqueue_define_fast(void *arg) 394254721Semaste{ 395254721Semaste 396254721Semaste taskqueue_fast = malloc(sizeof(struct taskqueue), M_TASKQUEUE, 397254721Semaste M_NOWAIT | M_ZERO); 398254721Semaste if (!taskqueue_fast) { 399254721Semaste printf("%s: Unable to allocate fast task queue!\n", __func__); 400254721Semaste return; 401254721Semaste } 402254721Semaste 403254721Semaste STAILQ_INIT(&taskqueue_fast->tq_queue); 404254721Semaste taskqueue_fast->tq_name = "fast"; 405254721Semaste taskqueue_fast->tq_enqueue = taskqueue_fast_schedule; 406254721Semaste mtx_init(&taskqueue_fast->tq_mutex, "taskqueue_fast", NULL, MTX_SPIN); 407254721Semaste 408254721Semaste mtx_lock(&taskqueue_queues_mutex); 409254721Semaste STAILQ_INSERT_TAIL(&taskqueue_queues, taskqueue_fast, tq_link); 410254721Semaste mtx_unlock(&taskqueue_queues_mutex); 411254721Semaste 412254721Semaste swi_add(NULL, "Fast taskq", taskqueue_fast_run, 413254721Semaste NULL, SWI_TQ_FAST, 0, &taskqueue_fast_ih); 414254721Semaste} 415254721SemasteSYSINIT(taskqueue_fast, SI_SUB_CONFIGURE, SI_ORDER_SECOND, 416254721Semaste taskqueue_define_fast, NULL); 417254721Semaste