subr_taskqueue.c revision 133305
1/*- 2 * Copyright (c) 2000 Doug Rabson 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 */ 26 27#include <sys/cdefs.h> 28__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 133305 2004-08-08 02:37:22Z jmg $"); 29 30#include <sys/param.h> 31#include <sys/systm.h> 32#include <sys/bus.h> 33#include <sys/interrupt.h> 34#include <sys/kernel.h> 35#include <sys/kthread.h> 36#include <sys/lock.h> 37#include <sys/malloc.h> 38#include <sys/mutex.h> 39#include <sys/taskqueue.h> 40#include <sys/unistd.h> 41 42static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues"); 43static void *taskqueue_giant_ih; 44static void *taskqueue_ih; 45static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues; 46static struct mtx taskqueue_queues_mutex; 47 48struct taskqueue { 49 STAILQ_ENTRY(taskqueue) tq_link; 50 STAILQ_HEAD(, task) tq_queue; 51 const char *tq_name; 52 taskqueue_enqueue_fn tq_enqueue; 53 void *tq_context; 54 struct mtx tq_mutex; 55}; 56 57static void init_taskqueue_list(void *data); 58 59static void 60init_taskqueue_list(void *data __unused) 61{ 62 63 mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF); 64 STAILQ_INIT(&taskqueue_queues); 65} 66SYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list, 67 NULL); 68 69struct taskqueue * 70taskqueue_create(const char *name, int mflags, 71 taskqueue_enqueue_fn enqueue, void *context) 72{ 73 struct taskqueue *queue; 74 75 queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO); 76 if (!queue) 77 return 0; 78 79 STAILQ_INIT(&queue->tq_queue); 80 queue->tq_name = name; 81 queue->tq_enqueue = enqueue; 82 queue->tq_context = context; 83 mtx_init(&queue->tq_mutex, "taskqueue", NULL, MTX_DEF); 84 85 mtx_lock(&taskqueue_queues_mutex); 86 STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link); 87 mtx_unlock(&taskqueue_queues_mutex); 88 89 return queue; 90} 91 92void 93taskqueue_free(struct taskqueue *queue) 94{ 95 96 mtx_lock(&taskqueue_queues_mutex); 97 STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link); 98 mtx_unlock(&taskqueue_queues_mutex); 99 100 mtx_lock(&queue->tq_mutex); 101 taskqueue_run(queue); 102 mtx_destroy(&queue->tq_mutex); 103 free(queue, M_TASKQUEUE); 104} 105 106/* 107 * Returns with the taskqueue locked. 108 */ 109struct taskqueue * 110taskqueue_find(const char *name) 111{ 112 struct taskqueue *queue; 113 114 mtx_lock(&taskqueue_queues_mutex); 115 STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) { 116 if (strcmp(queue->tq_name, name) == 0) { 117 mtx_lock(&queue->tq_mutex); 118 mtx_unlock(&taskqueue_queues_mutex); 119 return queue; 120 } 121 } 122 mtx_unlock(&taskqueue_queues_mutex); 123 return NULL; 124} 125 126int 127taskqueue_enqueue(struct taskqueue *queue, struct task *task) 128{ 129 struct task *ins; 130 struct task *prev; 131 132 mtx_lock(&queue->tq_mutex); 133 134 /* 135 * Count multiple enqueues. 136 */ 137 if (task->ta_pending) { 138 task->ta_pending++; 139 mtx_unlock(&queue->tq_mutex); 140 return 0; 141 } 142 143 /* 144 * Optimise the case when all tasks have the same priority. 145 */ 146 prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 147 if (!prev || prev->ta_priority >= task->ta_priority) { 148 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 149 } else { 150 prev = 0; 151 for (ins = STAILQ_FIRST(&queue->tq_queue); ins; 152 prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 153 if (ins->ta_priority < task->ta_priority) 154 break; 155 156 if (prev) 157 STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 158 else 159 STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 160 } 161 162 task->ta_pending = 1; 163 if (queue->tq_enqueue) 164 queue->tq_enqueue(queue->tq_context); 165 166 mtx_unlock(&queue->tq_mutex); 167 168 return 0; 169} 170 171void 172taskqueue_run(struct taskqueue *queue) 173{ 174 struct task *task; 175 int owned, pending; 176 177 owned = mtx_owned(&queue->tq_mutex); 178 if (!owned) 179 mtx_lock(&queue->tq_mutex); 180 while (STAILQ_FIRST(&queue->tq_queue)) { 181 /* 182 * Carefully remove the first task from the queue and 183 * zero its pending count. 184 */ 185 task = STAILQ_FIRST(&queue->tq_queue); 186 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 187 pending = task->ta_pending; 188 task->ta_pending = 0; 189 mtx_unlock(&queue->tq_mutex); 190 191 task->ta_func(task->ta_context, pending); 192 193 mtx_lock(&queue->tq_mutex); 194 } 195 196 /* 197 * For compatibility, unlock on return if the queue was not locked 198 * on entry, although this opens a race window. 199 */ 200 if (!owned) 201 mtx_unlock(&queue->tq_mutex); 202} 203 204static void 205taskqueue_swi_enqueue(void *context) 206{ 207 swi_sched(taskqueue_ih, 0); 208} 209 210static void 211taskqueue_swi_run(void *dummy) 212{ 213 taskqueue_run(taskqueue_swi); 214} 215 216static void 217taskqueue_swi_giant_enqueue(void *context) 218{ 219 swi_sched(taskqueue_giant_ih, 0); 220} 221 222static void 223taskqueue_swi_giant_run(void *dummy) 224{ 225 taskqueue_run(taskqueue_swi_giant); 226} 227 228void 229taskqueue_thread_loop(void *arg) 230{ 231 struct taskqueue **tqp, *tq; 232 233 tqp = arg; 234 tq = *tqp; 235 mtx_lock(&tq->tq_mutex); 236 for (;;) { 237 taskqueue_run(tq); 238 msleep(tq, &tq->tq_mutex, PWAIT, "-", 0); 239 } 240} 241 242void 243taskqueue_thread_enqueue(void *context) 244{ 245 struct taskqueue **tqp, *tq; 246 247 tqp = context; 248 tq = *tqp; 249 250 mtx_assert(&tq->tq_mutex, MA_OWNED); 251 wakeup(tq); 252} 253 254TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0, 255 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ, 256 INTR_MPSAFE, &taskqueue_ih)); 257 258TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0, 259 swi_add(NULL, "Giant task queue", taskqueue_swi_giant_run, 260 NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih)); 261 262TASKQUEUE_DEFINE_THREAD(thread); 263 264int 265taskqueue_enqueue_fast(struct taskqueue *queue, struct task *task) 266{ 267 struct task *ins; 268 struct task *prev; 269 270 mtx_lock_spin(&queue->tq_mutex); 271 272 /* 273 * Count multiple enqueues. 274 */ 275 if (task->ta_pending) { 276 task->ta_pending++; 277 mtx_unlock_spin(&queue->tq_mutex); 278 return 0; 279 } 280 281 /* 282 * Optimise the case when all tasks have the same priority. 283 */ 284 prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 285 if (!prev || prev->ta_priority >= task->ta_priority) { 286 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 287 } else { 288 prev = 0; 289 for (ins = STAILQ_FIRST(&queue->tq_queue); ins; 290 prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 291 if (ins->ta_priority < task->ta_priority) 292 break; 293 294 if (prev) 295 STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 296 else 297 STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 298 } 299 300 task->ta_pending = 1; 301 if (queue->tq_enqueue) 302 queue->tq_enqueue(queue->tq_context); 303 304 mtx_unlock_spin(&queue->tq_mutex); 305 306 return 0; 307} 308 309static void 310taskqueue_run_fast(struct taskqueue *queue) 311{ 312 struct task *task; 313 int pending; 314 315 mtx_lock_spin(&queue->tq_mutex); 316 while (STAILQ_FIRST(&queue->tq_queue)) { 317 /* 318 * Carefully remove the first task from the queue and 319 * zero its pending count. 320 */ 321 task = STAILQ_FIRST(&queue->tq_queue); 322 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 323 pending = task->ta_pending; 324 task->ta_pending = 0; 325 mtx_unlock_spin(&queue->tq_mutex); 326 327 task->ta_func(task->ta_context, pending); 328 329 mtx_lock_spin(&queue->tq_mutex); 330 } 331 mtx_unlock_spin(&queue->tq_mutex); 332} 333 334struct taskqueue *taskqueue_fast; 335static void *taskqueue_fast_ih; 336 337static void 338taskqueue_fast_schedule(void *context) 339{ 340 swi_sched(taskqueue_fast_ih, 0); 341} 342 343static void 344taskqueue_fast_run(void *dummy) 345{ 346 taskqueue_run_fast(taskqueue_fast); 347} 348 349static void 350taskqueue_define_fast(void *arg) 351{ 352 353 taskqueue_fast = malloc(sizeof(struct taskqueue), M_TASKQUEUE, 354 M_NOWAIT | M_ZERO); 355 if (!taskqueue_fast) { 356 printf("%s: Unable to allocate fast task queue!\n", __func__); 357 return; 358 } 359 360 STAILQ_INIT(&taskqueue_fast->tq_queue); 361 taskqueue_fast->tq_name = "fast"; 362 taskqueue_fast->tq_enqueue = taskqueue_fast_schedule; 363 mtx_init(&taskqueue_fast->tq_mutex, "taskqueue_fast", NULL, MTX_SPIN); 364 365 mtx_lock(&taskqueue_queues_mutex); 366 STAILQ_INSERT_TAIL(&taskqueue_queues, taskqueue_fast, tq_link); 367 mtx_unlock(&taskqueue_queues_mutex); 368 369 swi_add(NULL, "Fast task queue", taskqueue_fast_run, 370 NULL, SWI_TQ_FAST, 0, &taskqueue_fast_ih); 371} 372SYSINIT(taskqueue_fast, SI_SUB_CONFIGURE, SI_ORDER_SECOND, 373 taskqueue_define_fast, NULL); 374