/* subr_taskqueue.c revision 122436 */
1/*- 2 * Copyright (c) 2000 Doug Rabson 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 
25 */ 26 27#include <sys/cdefs.h> 28__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 122436 2003-11-10 20:39:44Z alfred $"); 29 30#include <sys/param.h> 31#include <sys/systm.h> 32#include <sys/bus.h> 33#include <sys/interrupt.h> 34#include <sys/kernel.h> 35#include <sys/lock.h> 36#include <sys/malloc.h> 37#include <sys/mutex.h> 38#include <sys/taskqueue.h> 39#include <sys/kthread.h> 40#include <sys/unistd.h> 41 42static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues"); 43 44static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues; 45 46static void *taskqueue_ih; 47static void *taskqueue_giant_ih; 48static struct mtx taskqueue_queues_mutex; 49static struct proc *taskqueue_thread_proc; 50 51struct taskqueue { 52 STAILQ_ENTRY(taskqueue) tq_link; 53 STAILQ_HEAD(, task) tq_queue; 54 const char *tq_name; 55 taskqueue_enqueue_fn tq_enqueue; 56 void *tq_context; 57 int tq_draining; 58 struct mtx tq_mutex; 59}; 60 61static void init_taskqueue_list(void *data); 62 63static void 64init_taskqueue_list(void *data __unused) 65{ 66 67 mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF); 68 STAILQ_INIT(&taskqueue_queues); 69} 70SYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list, 71 NULL); 72 73struct taskqueue * 74taskqueue_create(const char *name, int mflags, 75 taskqueue_enqueue_fn enqueue, void *context) 76{ 77 struct taskqueue *queue; 78 79 queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO); 80 if (!queue) 81 return 0; 82 83 STAILQ_INIT(&queue->tq_queue); 84 queue->tq_name = name; 85 queue->tq_enqueue = enqueue; 86 queue->tq_context = context; 87 queue->tq_draining = 0; 88 mtx_init(&queue->tq_mutex, "taskqueue", NULL, MTX_DEF); 89 90 mtx_lock(&taskqueue_queues_mutex); 91 STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link); 92 mtx_unlock(&taskqueue_queues_mutex); 93 94 return queue; 95} 96 97void 98taskqueue_free(struct taskqueue *queue) 99{ 100 101 mtx_lock(&queue->tq_mutex); 102 
KASSERT(queue->tq_draining == 0, ("free'ing a draining taskqueue")); 103 queue->tq_draining = 1; 104 mtx_unlock(&queue->tq_mutex); 105 106 taskqueue_run(queue); 107 108 mtx_lock(&taskqueue_queues_mutex); 109 STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link); 110 mtx_unlock(&taskqueue_queues_mutex); 111 112 mtx_destroy(&queue->tq_mutex); 113 free(queue, M_TASKQUEUE); 114} 115 116/* 117 * Returns with the taskqueue locked. 118 */ 119struct taskqueue * 120taskqueue_find(const char *name) 121{ 122 struct taskqueue *queue; 123 124 mtx_lock(&taskqueue_queues_mutex); 125 STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) { 126 mtx_lock(&queue->tq_mutex); 127 if (!strcmp(queue->tq_name, name)) { 128 mtx_unlock(&taskqueue_queues_mutex); 129 return queue; 130 } 131 mtx_unlock(&queue->tq_mutex); 132 } 133 mtx_unlock(&taskqueue_queues_mutex); 134 return 0; 135} 136 137int 138taskqueue_enqueue(struct taskqueue *queue, struct task *task) 139{ 140 struct task *ins; 141 struct task *prev; 142 143 mtx_lock(&queue->tq_mutex); 144 145 /* 146 * Don't allow new tasks on a queue which is being freed. 147 */ 148 if (queue->tq_draining) { 149 mtx_unlock(&queue->tq_mutex); 150 return EPIPE; 151 } 152 153 /* 154 * Count multiple enqueues. 155 */ 156 if (task->ta_pending) { 157 task->ta_pending++; 158 mtx_unlock(&queue->tq_mutex); 159 return 0; 160 } 161 162 /* 163 * Optimise the case when all tasks have the same priority. 
164 */ 165 prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 166 if (!prev || prev->ta_priority >= task->ta_priority) { 167 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 168 } else { 169 prev = 0; 170 for (ins = STAILQ_FIRST(&queue->tq_queue); ins; 171 prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 172 if (ins->ta_priority < task->ta_priority) 173 break; 174 175 if (prev) 176 STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 177 else 178 STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 179 } 180 181 task->ta_pending = 1; 182 if (queue->tq_enqueue) 183 queue->tq_enqueue(queue->tq_context); 184 185 mtx_unlock(&queue->tq_mutex); 186 187 return 0; 188} 189 190void 191taskqueue_run(struct taskqueue *queue) 192{ 193 struct task *task; 194 int pending; 195 196 mtx_lock(&queue->tq_mutex); 197 while (STAILQ_FIRST(&queue->tq_queue)) { 198 /* 199 * Carefully remove the first task from the queue and 200 * zero its pending count. 201 */ 202 task = STAILQ_FIRST(&queue->tq_queue); 203 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 204 pending = task->ta_pending; 205 task->ta_pending = 0; 206 mtx_unlock(&queue->tq_mutex); 207 208 task->ta_func(task->ta_context, pending); 209 210 mtx_lock(&queue->tq_mutex); 211 } 212 mtx_unlock(&queue->tq_mutex); 213} 214 215static void 216taskqueue_swi_enqueue(void *context) 217{ 218 swi_sched(taskqueue_ih, 0); 219} 220 221static void 222taskqueue_swi_run(void *dummy) 223{ 224 taskqueue_run(taskqueue_swi); 225} 226 227static void 228taskqueue_swi_giant_enqueue(void *context) 229{ 230 swi_sched(taskqueue_giant_ih, 0); 231} 232 233static void 234taskqueue_swi_giant_run(void *dummy) 235{ 236 taskqueue_run(taskqueue_swi_giant); 237} 238 239static void 240taskqueue_kthread(void *arg) 241{ 242 struct mtx kthread_mutex; 243 244 bzero(&kthread_mutex, sizeof(kthread_mutex)); 245 246 mtx_init(&kthread_mutex, "taskqueue kthread", NULL, MTX_DEF); 247 248 mtx_lock(&kthread_mutex); 249 250 for (;;) { 251 mtx_unlock(&kthread_mutex); 252 
taskqueue_run(taskqueue_thread); 253 mtx_lock(&kthread_mutex); 254 msleep(&taskqueue_thread, &kthread_mutex, PWAIT, "tqthr", 0); 255 } 256} 257 258static void 259taskqueue_thread_enqueue(void *context) 260{ 261 wakeup(&taskqueue_thread); 262} 263 264TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0, 265 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ, 266 INTR_MPSAFE, &taskqueue_ih)); 267 268TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0, 269 swi_add(NULL, "Giant task queue", taskqueue_swi_giant_run, 270 NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih)); 271 272TASKQUEUE_DEFINE(thread, taskqueue_thread_enqueue, 0, 273 kthread_create(taskqueue_kthread, NULL, 274 &taskqueue_thread_proc, 0, 0, "taskqueue")); 275 276int 277taskqueue_enqueue_fast(struct taskqueue *queue, struct task *task) 278{ 279 struct task *ins; 280 struct task *prev; 281 282 mtx_lock_spin(&queue->tq_mutex); 283 284 /* 285 * Don't allow new tasks on a queue which is being freed. 286 */ 287 if (queue->tq_draining) { 288 mtx_unlock_spin(&queue->tq_mutex); 289 return EPIPE; 290 } 291 292 /* 293 * Count multiple enqueues. 294 */ 295 if (task->ta_pending) { 296 task->ta_pending++; 297 mtx_unlock_spin(&queue->tq_mutex); 298 return 0; 299 } 300 301 /* 302 * Optimise the case when all tasks have the same priority. 
303 */ 304 prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 305 if (!prev || prev->ta_priority >= task->ta_priority) { 306 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 307 } else { 308 prev = 0; 309 for (ins = STAILQ_FIRST(&queue->tq_queue); ins; 310 prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 311 if (ins->ta_priority < task->ta_priority) 312 break; 313 314 if (prev) 315 STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 316 else 317 STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 318 } 319 320 task->ta_pending = 1; 321 if (queue->tq_enqueue) 322 queue->tq_enqueue(queue->tq_context); 323 324 mtx_unlock_spin(&queue->tq_mutex); 325 326 return 0; 327} 328 329static void 330taskqueue_run_fast(struct taskqueue *queue) 331{ 332 struct task *task; 333 int pending; 334 335 mtx_lock_spin(&queue->tq_mutex); 336 while (STAILQ_FIRST(&queue->tq_queue)) { 337 /* 338 * Carefully remove the first task from the queue and 339 * zero its pending count. 340 */ 341 task = STAILQ_FIRST(&queue->tq_queue); 342 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 343 pending = task->ta_pending; 344 task->ta_pending = 0; 345 mtx_unlock_spin(&queue->tq_mutex); 346 347 task->ta_func(task->ta_context, pending); 348 349 mtx_lock_spin(&queue->tq_mutex); 350 } 351 mtx_unlock_spin(&queue->tq_mutex); 352} 353 354struct taskqueue *taskqueue_fast; 355static void *taskqueue_fast_ih; 356 357static void 358taskqueue_fast_schedule(void *context) 359{ 360 swi_sched(taskqueue_fast_ih, 0); 361} 362 363static void 364taskqueue_fast_run(void *dummy) 365{ 366 taskqueue_run_fast(taskqueue_fast); 367} 368 369static void 370taskqueue_define_fast(void *arg) 371{ 372 taskqueue_fast = malloc(sizeof(struct taskqueue), 373 M_TASKQUEUE, M_NOWAIT | M_ZERO); 374 if (!taskqueue_fast) { 375 printf("%s: Unable to allocate fast task queue!\n", __func__); 376 return; 377 } 378 379 STAILQ_INIT(&taskqueue_fast->tq_queue); 380 taskqueue_fast->tq_name = "fast"; 381 taskqueue_fast->tq_enqueue = 
taskqueue_fast_schedule; 382 mtx_init(&taskqueue_fast->tq_mutex, "taskqueue_fast", NULL, MTX_SPIN); 383 384 mtx_lock(&taskqueue_queues_mutex); 385 STAILQ_INSERT_TAIL(&taskqueue_queues, taskqueue_fast, tq_link); 386 mtx_unlock(&taskqueue_queues_mutex); 387 388 swi_add(NULL, "Fast task queue", taskqueue_fast_run, 389 NULL, SWI_TQ_FAST, 0, &taskqueue_fast_ih); 390} 391SYSINIT(taskqueue_fast, SI_SUB_CONFIGURE, SI_ORDER_SECOND, 392 taskqueue_define_fast, NULL); 393