/* subr_taskqueue.c revision 133305 */
13229Spst/*- 23229Spst * Copyright (c) 2000 Doug Rabson 33229Spst * All rights reserved. 43229Spst * 53229Spst * Redistribution and use in source and binary forms, with or without 63229Spst * modification, are permitted provided that the following conditions 73229Spst * are met: 83229Spst * 1. Redistributions of source code must retain the above copyright 93229Spst * notice, this list of conditions and the following disclaimer. 103229Spst * 2. Redistributions in binary form must reproduce the above copyright 113229Spst * notice, this list of conditions and the following disclaimer in the 123229Spst * documentation and/or other materials provided with the distribution. 133229Spst * 143229Spst * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 153229Spst * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 163229Spst * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 173229Spst * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 183229Spst * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 193229Spst * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 203229Spst * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 2118471Swosch * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 2250476Speter * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 2318471Swosch * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 243229Spst * SUCH DAMAGE. 
253229Spst */ 263229Spst 273229Spst#include <sys/cdefs.h> 283229Spst__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 133305 2004-08-08 02:37:22Z jmg $"); 293229Spst 303229Spst#include <sys/param.h> 313229Spst#include <sys/systm.h> 323229Spst#include <sys/bus.h> 333229Spst#include <sys/interrupt.h> 343229Spst#include <sys/kernel.h> 353229Spst#include <sys/kthread.h> 363229Spst#include <sys/lock.h> 373229Spst#include <sys/malloc.h> 383229Spst#include <sys/mutex.h> 393229Spst#include <sys/taskqueue.h> 403229Spst#include <sys/unistd.h> 413229Spst 423229Spststatic MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues"); 433229Spststatic void *taskqueue_giant_ih; 443229Spststatic void *taskqueue_ih; 453229Spststatic STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues; 463229Spststatic struct mtx taskqueue_queues_mutex; 473229Spst 483229Spststruct taskqueue { 493229Spst STAILQ_ENTRY(taskqueue) tq_link; 503229Spst STAILQ_HEAD(, task) tq_queue; 513229Spst const char *tq_name; 523229Spst taskqueue_enqueue_fn tq_enqueue; 533229Spst void *tq_context; 543229Spst struct mtx tq_mutex; 553229Spst}; 563229Spst 573229Spststatic void init_taskqueue_list(void *data); 583229Spst 593229Spststatic void 603229Spstinit_taskqueue_list(void *data __unused) 613229Spst{ 623229Spst 633229Spst mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF); 643229Spst STAILQ_INIT(&taskqueue_queues); 653229Spst} 663229SpstSYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list, 673229Spst NULL); 683229Spst 693229Spststruct taskqueue * 703229Spsttaskqueue_create(const char *name, int mflags, 713229Spst taskqueue_enqueue_fn enqueue, void *context) 723229Spst{ 733229Spst struct taskqueue *queue; 743229Spst 753229Spst queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO); 763229Spst if (!queue) 773229Spst return 0; 783229Spst 793229Spst STAILQ_INIT(&queue->tq_queue); 803229Spst queue->tq_name = name; 813229Spst queue->tq_enqueue = enqueue; 823229Spst 
queue->tq_context = context; 833229Spst mtx_init(&queue->tq_mutex, "taskqueue", NULL, MTX_DEF); 843229Spst 853229Spst mtx_lock(&taskqueue_queues_mutex); 863229Spst STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link); 873229Spst mtx_unlock(&taskqueue_queues_mutex); 883229Spst 893229Spst return queue; 903229Spst} 913229Spst 923229Spstvoid 933229Spsttaskqueue_free(struct taskqueue *queue) 943229Spst{ 953229Spst 963229Spst mtx_lock(&taskqueue_queues_mutex); 973229Spst STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link); 983229Spst mtx_unlock(&taskqueue_queues_mutex); 993229Spst 1003229Spst mtx_lock(&queue->tq_mutex); 1013229Spst taskqueue_run(queue); 1023229Spst mtx_destroy(&queue->tq_mutex); 1033229Spst free(queue, M_TASKQUEUE); 1043229Spst} 1053229Spst 1063229Spst/* 1073229Spst * Returns with the taskqueue locked. 1083229Spst */ 1093229Spststruct taskqueue * 1103229Spsttaskqueue_find(const char *name) 1113229Spst{ 1123229Spst struct taskqueue *queue; 1133229Spst 1143229Spst mtx_lock(&taskqueue_queues_mutex); 1153229Spst STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) { 1163229Spst if (strcmp(queue->tq_name, name) == 0) { 1173229Spst mtx_lock(&queue->tq_mutex); 1183229Spst mtx_unlock(&taskqueue_queues_mutex); 1193229Spst return queue; 1203229Spst } 1213229Spst } 1223229Spst mtx_unlock(&taskqueue_queues_mutex); 1233229Spst return NULL; 1243229Spst} 1253229Spst 1263229Spstint 1273229Spsttaskqueue_enqueue(struct taskqueue *queue, struct task *task) 1283229Spst{ 1293229Spst struct task *ins; 1303229Spst struct task *prev; 1313229Spst 1323229Spst mtx_lock(&queue->tq_mutex); 1333229Spst 1343229Spst /* 1353229Spst * Count multiple enqueues. 1363229Spst */ 1373229Spst if (task->ta_pending) { 1383229Spst task->ta_pending++; 1393229Spst mtx_unlock(&queue->tq_mutex); 1403229Spst return 0; 1413229Spst } 1423229Spst 1433229Spst /* 1443229Spst * Optimise the case when all tasks have the same priority. 
1453229Spst */ 1463229Spst prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 1473229Spst if (!prev || prev->ta_priority >= task->ta_priority) { 1483229Spst STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 1493229Spst } else { 1503229Spst prev = 0; 1513229Spst for (ins = STAILQ_FIRST(&queue->tq_queue); ins; 1523229Spst prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 1533229Spst if (ins->ta_priority < task->ta_priority) 1543229Spst break; 1553229Spst 1563229Spst if (prev) 1573229Spst STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 1583229Spst else 1593229Spst STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 1603229Spst } 1613229Spst 1623229Spst task->ta_pending = 1; 1633229Spst if (queue->tq_enqueue) 1643229Spst queue->tq_enqueue(queue->tq_context); 1653229Spst 1663229Spst mtx_unlock(&queue->tq_mutex); 1673229Spst 1683229Spst return 0; 1693229Spst} 1703229Spst 1713229Spstvoid 1723229Spsttaskqueue_run(struct taskqueue *queue) 1733229Spst{ 1743229Spst struct task *task; 1753229Spst int owned, pending; 1763229Spst 1773229Spst owned = mtx_owned(&queue->tq_mutex); 1783229Spst if (!owned) 1793229Spst mtx_lock(&queue->tq_mutex); 1803229Spst while (STAILQ_FIRST(&queue->tq_queue)) { 1813229Spst /* 1823229Spst * Carefully remove the first task from the queue and 1833229Spst * zero its pending count. 1843229Spst */ 1853229Spst task = STAILQ_FIRST(&queue->tq_queue); 1863229Spst STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 1873229Spst pending = task->ta_pending; 1883229Spst task->ta_pending = 0; 1893229Spst mtx_unlock(&queue->tq_mutex); 1903229Spst 1913229Spst task->ta_func(task->ta_context, pending); 1923229Spst 1933229Spst mtx_lock(&queue->tq_mutex); 1943229Spst } 1953229Spst 1963229Spst /* 1973229Spst * For compatibility, unlock on return if the queue was not locked 1983229Spst * on entry, although this opens a race window. 
1993229Spst */ 2003229Spst if (!owned) 2013229Spst mtx_unlock(&queue->tq_mutex); 2023229Spst} 2033229Spst 2043229Spststatic void 2053229Spsttaskqueue_swi_enqueue(void *context) 2063229Spst{ 2073229Spst swi_sched(taskqueue_ih, 0); 2083229Spst} 2093229Spst 2103229Spststatic void 2113229Spsttaskqueue_swi_run(void *dummy) 2123229Spst{ 2133229Spst taskqueue_run(taskqueue_swi); 2143229Spst} 2153229Spst 2163229Spststatic void 2173229Spsttaskqueue_swi_giant_enqueue(void *context) 2183229Spst{ 2193229Spst swi_sched(taskqueue_giant_ih, 0); 2203229Spst} 2213229Spst 2223229Spststatic void 2233229Spsttaskqueue_swi_giant_run(void *dummy) 2243229Spst{ 2253229Spst taskqueue_run(taskqueue_swi_giant); 2263229Spst} 2273229Spst 2283229Spstvoid 2293229Spsttaskqueue_thread_loop(void *arg) 2303229Spst{ 23197417Salfred struct taskqueue **tqp, *tq; 2323229Spst 2333229Spst tqp = arg; 23497417Salfred tq = *tqp; 2353229Spst mtx_lock(&tq->tq_mutex); 23697417Salfred for (;;) { 2373229Spst taskqueue_run(tq); 23897417Salfred msleep(tq, &tq->tq_mutex, PWAIT, "-", 0); 2393229Spst } 24097417Salfred} 2413229Spst 24297417Salfredvoid 2433229Spsttaskqueue_thread_enqueue(void *context) 24497417Salfred{ 2453229Spst struct taskqueue **tqp, *tq; 24697417Salfred 2473229Spst tqp = context; 24897417Salfred tq = *tqp; 2493229Spst 25097417Salfred mtx_assert(&tq->tq_mutex, MA_OWNED); 2513229Spst wakeup(tq); 25297417Salfred} 2533229Spst 25497417SalfredTASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0, 2553229Spst swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ, 25697417Salfred INTR_MPSAFE, &taskqueue_ih)); 2573229Spst 25897417SalfredTASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0, 2593229Spst swi_add(NULL, "Giant task queue", taskqueue_swi_giant_run, 26097417Salfred NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih)); 2613229Spst 26297417SalfredTASKQUEUE_DEFINE_THREAD(thread); 2633229Spst 26497417Salfredint 2653229Spsttaskqueue_enqueue_fast(struct taskqueue *queue, struct task *task) 26697417Salfred{ 
2673229Spst struct task *ins; 26897417Salfred struct task *prev; 2693229Spst 27097417Salfred mtx_lock_spin(&queue->tq_mutex); 2713229Spst 27297417Salfred /* 2733229Spst * Count multiple enqueues. 27497417Salfred */ 2753229Spst if (task->ta_pending) { 27697417Salfred task->ta_pending++; 2773229Spst mtx_unlock_spin(&queue->tq_mutex); 27897417Salfred return 0; 2793229Spst } 2803229Spst 2813229Spst /* 2823229Spst * Optimise the case when all tasks have the same priority. 2833229Spst */ 2843229Spst prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 2853229Spst if (!prev || prev->ta_priority >= task->ta_priority) { 2863229Spst STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 2873229Spst } else { 2883229Spst prev = 0; 2893229Spst for (ins = STAILQ_FIRST(&queue->tq_queue); ins; 2903229Spst prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 2913229Spst if (ins->ta_priority < task->ta_priority) 2923229Spst break; 2933229Spst 2943229Spst if (prev) 2953229Spst STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 2963229Spst else 2973229Spst STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 2983229Spst } 2993229Spst 3003229Spst task->ta_pending = 1; 3013229Spst if (queue->tq_enqueue) 3023229Spst queue->tq_enqueue(queue->tq_context); 3033229Spst 3043229Spst mtx_unlock_spin(&queue->tq_mutex); 3053229Spst 3063229Spst return 0; 3073229Spst} 3083229Spst 3093229Spststatic void 3103229Spsttaskqueue_run_fast(struct taskqueue *queue) 3113229Spst{ 3123229Spst struct task *task; 3133229Spst int pending; 3143229Spst 3153229Spst mtx_lock_spin(&queue->tq_mutex); 3163229Spst while (STAILQ_FIRST(&queue->tq_queue)) { 3173229Spst /* 3183229Spst * Carefully remove the first task from the queue and 3193229Spst * zero its pending count. 
3203229Spst */ 3213229Spst task = STAILQ_FIRST(&queue->tq_queue); 3223229Spst STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 3233229Spst pending = task->ta_pending; 3243229Spst task->ta_pending = 0; 3253229Spst mtx_unlock_spin(&queue->tq_mutex); 3263229Spst 3273229Spst task->ta_func(task->ta_context, pending); 3283229Spst 3293229Spst mtx_lock_spin(&queue->tq_mutex); 3303229Spst } 3313229Spst mtx_unlock_spin(&queue->tq_mutex); 3323229Spst} 3333229Spst 3343229Spststruct taskqueue *taskqueue_fast; 3353229Spststatic void *taskqueue_fast_ih; 3363229Spst 3373229Spststatic void 3383229Spsttaskqueue_fast_schedule(void *context) 3393229Spst{ 3403229Spst swi_sched(taskqueue_fast_ih, 0); 3413229Spst} 3423229Spst 3433229Spststatic void 3443229Spsttaskqueue_fast_run(void *dummy) 3453229Spst{ 3463229Spst taskqueue_run_fast(taskqueue_fast); 3473229Spst} 3483229Spst 3493229Spststatic void 3503229Spsttaskqueue_define_fast(void *arg) 3513229Spst{ 3523229Spst 3533229Spst taskqueue_fast = malloc(sizeof(struct taskqueue), M_TASKQUEUE, 3543229Spst M_NOWAIT | M_ZERO); 3553229Spst if (!taskqueue_fast) { 3563229Spst printf("%s: Unable to allocate fast task queue!\n", __func__); 3573229Spst return; 3583229Spst } 3593229Spst 3603229Spst STAILQ_INIT(&taskqueue_fast->tq_queue); 3613229Spst taskqueue_fast->tq_name = "fast"; 3623229Spst taskqueue_fast->tq_enqueue = taskqueue_fast_schedule; 3633229Spst mtx_init(&taskqueue_fast->tq_mutex, "taskqueue_fast", NULL, MTX_SPIN); 3643229Spst 3653229Spst mtx_lock(&taskqueue_queues_mutex); 3663229Spst STAILQ_INSERT_TAIL(&taskqueue_queues, taskqueue_fast, tq_link); 3673229Spst mtx_unlock(&taskqueue_queues_mutex); 3683229Spst 3693229Spst swi_add(NULL, "Fast task queue", taskqueue_fast_run, 3703229Spst NULL, SWI_TQ_FAST, 0, &taskqueue_fast_ih); 3713229Spst} 3723229SpstSYSINIT(taskqueue_fast, SI_SUB_CONFIGURE, SI_ORDER_SECOND, 3733229Spst taskqueue_define_fast, NULL); 3743229Spst