subr_taskqueue.c revision 154205
/*-
 * Copyright (c) 2000 Doug Rabson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: head/sys/kern/subr_taskqueue.c 154205 2006-01-11 00:37:13Z scottl $");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/bus.h>
#include <sys/interrupt.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/taskqueue.h>
#include <sys/unistd.h>

static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
static void     *taskqueue_giant_ih;
static void     *taskqueue_ih;
static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues;
static struct mtx taskqueue_queues_mutex;

struct taskqueue {
        STAILQ_ENTRY(taskqueue) tq_link;
        STAILQ_HEAD(, task)     tq_queue;
        const char              *tq_name;
        taskqueue_enqueue_fn    tq_enqueue;
        void                    *tq_context;
        struct task             *tq_running;
        struct mtx              tq_mutex;
        struct proc             **tq_pproc;
        int                     tq_spin;
};

static __inline void
TQ_LOCK(struct taskqueue *tq)
{
        if (tq->tq_spin)
                mtx_lock_spin(&tq->tq_mutex);
        else
                mtx_lock(&tq->tq_mutex);
}

static __inline void
TQ_UNLOCK(struct taskqueue *tq)
{
        if (tq->tq_spin)
                mtx_unlock_spin(&tq->tq_mutex);
        else
                mtx_unlock(&tq->tq_mutex);
}

static void     init_taskqueue_list(void *data);

static __inline int
TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
    int t)
{
        if (tq->tq_spin)
                return (msleep_spin(p, m, wm, t));
        return (msleep(p, m, pri, wm, t));
}

static void
init_taskqueue_list(void *data __unused)
{

        mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF);
        STAILQ_INIT(&taskqueue_queues);
}
SYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list,
    NULL);

static struct taskqueue *
_taskqueue_create(const char *name, int mflags,
                 taskqueue_enqueue_fn enqueue, void *context,
                 struct proc **pp,
                 int mtxflags, const char *mtxname)
{
        struct taskqueue *queue;

        queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
        if (!queue)
                return 0;

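        /*
         * The queue was allocated with M_ZERO, so every field starts
         * out cleared; only the non-zero members are filled in below.
         */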
        STAILQ_INIT(&queue->tq_queue);
        queue->tq_name = name;
        queue->tq_enqueue = enqueue;
        queue->tq_context = context;
        queue->tq_pproc = pp;
        queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
        mtx_init(&queue->tq_mutex, mtxname, NULL, mtxflags);

        mtx_lock(&taskqueue_queues_mutex);
        STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link);
        mtx_unlock(&taskqueue_queues_mutex);

        return queue;
}

struct taskqueue *
taskqueue_create(const char *name, int mflags,
                 taskqueue_enqueue_fn enqueue, void *context,
                 struct proc **pp)
{
        return _taskqueue_create(name, mflags, enqueue, context, pp,
                        MTX_DEF, "taskqueue");
}

/*
 * Signal a taskqueue thread to terminate.
 */
static void
taskqueue_terminate(struct proc **pp, struct taskqueue *tq)
{
        struct proc *p;

        p = *pp;
        *pp = NULL;
        if (p) {
                wakeup_one(tq);
                TQ_SLEEP(tq, p, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
        }
}

void
taskqueue_free(struct taskqueue *queue)
{

        mtx_lock(&taskqueue_queues_mutex);
        STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link);
        mtx_unlock(&taskqueue_queues_mutex);

        TQ_LOCK(queue);
        taskqueue_run(queue);
        taskqueue_terminate(queue->tq_pproc, queue);
        mtx_destroy(&queue->tq_mutex);
        free(queue, M_TASKQUEUE);
}

/*
 * Returns with the taskqueue locked.
 */
struct taskqueue *
taskqueue_find(const char *name)
{
        struct taskqueue *queue;

        mtx_lock(&taskqueue_queues_mutex);
        STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) {
                if (strcmp(queue->tq_name, name) == 0) {
                        TQ_LOCK(queue);
                        mtx_unlock(&taskqueue_queues_mutex);
                        return queue;
                }
        }
        mtx_unlock(&taskqueue_queues_mutex);
        return NULL;
}

int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
        struct task *ins;
        struct task *prev;

        TQ_LOCK(queue);

        /*
         * Count multiple enqueues.
         */
        if (task->ta_pending) {
                task->ta_pending++;
                TQ_UNLOCK(queue);
                return 0;
        }

        /*
         * Optimise the case when all tasks have the same priority.
         */
        prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
        if (!prev || prev->ta_priority >= task->ta_priority) {
                STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
        } else {
                prev = 0;
                for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
                     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
                        if (ins->ta_priority < task->ta_priority)
                                break;

                if (prev)
                        STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
                else
                        STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
        }

        task->ta_pending = 1;
        queue->tq_enqueue(queue->tq_context);

        TQ_UNLOCK(queue);

        return 0;
}

void
taskqueue_run(struct taskqueue *queue)
{
        struct task *task;
        int owned, pending;

        owned = mtx_owned(&queue->tq_mutex);
        if (!owned)
                TQ_LOCK(queue);
        while (STAILQ_FIRST(&queue->tq_queue)) {
                /*
                 * Carefully remove the first task from the queue and
                 * zero its pending count.
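                 * The saved count is handed to ta_func below, so the
                 * handler can tell how many enqueues were coalesced
                 * into this single invocation.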
                 */
                task = STAILQ_FIRST(&queue->tq_queue);
                STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
                pending = task->ta_pending;
                task->ta_pending = 0;
                queue->tq_running = task;
                TQ_UNLOCK(queue);

                task->ta_func(task->ta_context, pending);

                TQ_LOCK(queue);
                queue->tq_running = NULL;
                wakeup(task);
        }

        /*
         * For compatibility, unlock on return if the queue was not locked
         * on entry, although this opens a race window.
         */
        if (!owned)
                TQ_UNLOCK(queue);
}

void
taskqueue_drain(struct taskqueue *queue, struct task *task)
{
        if (queue->tq_spin) {           /* XXX */
                mtx_lock_spin(&queue->tq_mutex);
                while (task->ta_pending != 0 || task == queue->tq_running)
                        msleep_spin(task, &queue->tq_mutex, "-", 0);
                mtx_unlock_spin(&queue->tq_mutex);
        } else {
                WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

                mtx_lock(&queue->tq_mutex);
                while (task->ta_pending != 0 || task == queue->tq_running)
                        msleep(task, &queue->tq_mutex, PWAIT, "-", 0);
                mtx_unlock(&queue->tq_mutex);
        }
}

static void
taskqueue_swi_enqueue(void *context)
{
        swi_sched(taskqueue_ih, 0);
}

static void
taskqueue_swi_run(void *dummy)
{
        taskqueue_run(taskqueue_swi);
}

static void
taskqueue_swi_giant_enqueue(void *context)
{
        swi_sched(taskqueue_giant_ih, 0);
}

static void
taskqueue_swi_giant_run(void *dummy)
{
        taskqueue_run(taskqueue_swi_giant);
}

void
taskqueue_thread_loop(void *arg)
{
        struct taskqueue **tqp, *tq;

        tqp = arg;
        tq = *tqp;
        TQ_LOCK(tq);
        do {
                taskqueue_run(tq);
                TQ_SLEEP(tq, tq, &tq->tq_mutex, curthread->td_priority, "-", 0);
        } while (*tq->tq_pproc != NULL);

        /* rendezvous with thread that asked us to terminate */
        wakeup_one(tq);
        TQ_UNLOCK(tq);
        kthread_exit(0);
}

void
taskqueue_thread_enqueue(void *context)
{
        struct taskqueue **tqp, *tq;

        tqp = context;
        tq = *tqp;

        mtx_assert(&tq->tq_mutex, MA_OWNED);
        wakeup_one(tq);
}

TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0,
                 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
                     INTR_MPSAFE, &taskqueue_ih));

TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0,
                 swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
                     NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));

TASKQUEUE_DEFINE_THREAD(thread);

struct taskqueue *
taskqueue_create_fast(const char *name, int mflags,
                 taskqueue_enqueue_fn enqueue, void *context,
                 struct proc **pp)
{
        return _taskqueue_create(name, mflags, enqueue, context, pp,
                        MTX_SPIN, "fast_taskqueue");
}

/* NB: for backwards compatibility */
int
taskqueue_enqueue_fast(struct taskqueue *queue, struct task *task)
{
        return taskqueue_enqueue(queue, task);
}

static void     *taskqueue_fast_ih;

static void
taskqueue_fast_enqueue(void *context)
{
        swi_sched(taskqueue_fast_ih, 0);
}

static void
taskqueue_fast_run(void *dummy)
{
        taskqueue_run(taskqueue_fast);
}

TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, 0,
        swi_add(NULL, "Fast task queue", taskqueue_fast_run, NULL,
        SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));
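
/*
 * Usage sketch (illustrative only, not part of this revision): a typical
 * consumer initializes a task once and enqueues it from its interrupt
 * path.  The "my_softc" structure and "my_task_fn" handler are
 * hypothetical names used only for this example; taskqueue_thread is the
 * queue defined by TASKQUEUE_DEFINE_THREAD(thread) above.
 *
 *      static void
 *      my_task_fn(void *context, int pending)
 *      {
 *              struct my_softc *sc = context;
 *
 *              (handle the work; "pending" is the number of
 *              taskqueue_enqueue() calls coalesced into this run)
 *      }
 *
 *      TASK_INIT(&sc->my_task, 0, my_task_fn, sc);
 *      taskqueue_enqueue(taskqueue_thread, &sc->my_task);
 *
 * Before freeing sc, the consumer waits for any queued or running
 * instance of the task to finish:
 *
 *      taskqueue_drain(taskqueue_thread, &sc->my_task);
 */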