1/*- 2 * Copyright (c) 2000 Doug Rabson 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 */ 26 27#include <sys/cdefs.h> 28__FBSDID("$FreeBSD: stable/11/sys/kern/subr_taskqueue.c 354406 2019-11-06 18:15:20Z mav $"); 29 30#include <sys/param.h> 31#include <sys/systm.h> 32#include <sys/bus.h> 33#include <sys/cpuset.h> 34#include <sys/interrupt.h> 35#include <sys/kernel.h> 36#include <sys/kthread.h> 37#include <sys/libkern.h> 38#include <sys/limits.h> 39#include <sys/lock.h> 40#include <sys/malloc.h> 41#include <sys/mutex.h> 42#include <sys/proc.h> 43#include <sys/sched.h> 44#include <sys/smp.h> 45#include <sys/taskqueue.h> 46#include <sys/unistd.h> 47#include <machine/stdarg.h> 48 49static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues"); 50static void *taskqueue_giant_ih; 51static void *taskqueue_ih; 52static void taskqueue_fast_enqueue(void *); 53static void taskqueue_swi_enqueue(void *); 54static void taskqueue_swi_giant_enqueue(void *); 55 56struct taskqueue_busy { 57 struct task *tb_running; 58 u_int tb_seq; 59 LIST_ENTRY(taskqueue_busy) tb_link; 60}; 61 62struct taskqueue { 63 STAILQ_HEAD(, task) tq_queue; 64 LIST_HEAD(, taskqueue_busy) tq_active; 65 struct task *tq_hint; 66 u_int tq_seq; 67 int tq_callouts; 68 struct mtx_padalign tq_mutex; 69 taskqueue_enqueue_fn tq_enqueue; 70 void *tq_context; 71 char *tq_name; 72 struct thread **tq_threads; 73 int tq_tcount; 74 int tq_spin; 75 int tq_flags; 76 taskqueue_callback_fn tq_callbacks[TASKQUEUE_NUM_CALLBACKS]; 77 void *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS]; 78}; 79 80#define TQ_FLAGS_ACTIVE (1 << 0) 81#define TQ_FLAGS_BLOCKED (1 << 1) 82#define TQ_FLAGS_UNLOCKED_ENQUEUE (1 << 2) 83 84#define DT_CALLOUT_ARMED (1 << 0) 85#define DT_DRAIN_IN_PROGRESS (1 << 1) 86 87#define TQ_LOCK(tq) \ 88 do { \ 89 if ((tq)->tq_spin) \ 90 mtx_lock_spin(&(tq)->tq_mutex); \ 91 else \ 92 mtx_lock(&(tq)->tq_mutex); \ 93 } while (0) 94#define TQ_ASSERT_LOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_OWNED) 95 96#define TQ_UNLOCK(tq) \ 97 do { \ 98 if ((tq)->tq_spin) \ 99 mtx_unlock_spin(&(tq)->tq_mutex); \ 100 else \ 101 mtx_unlock(&(tq)->tq_mutex); \ 102 } while (0) 103#define TQ_ASSERT_UNLOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED) 104 105void 106_timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task, 107 int priority, task_fn_t func, void *context) 108{ 109 110 TASK_INIT(&timeout_task->t, priority, func, context); 111 callout_init_mtx(&timeout_task->c, &queue->tq_mutex, 112 CALLOUT_RETURNUNLOCKED); 113 timeout_task->q = queue; 114 timeout_task->f = 0; 115} 116 117static __inline int 118TQ_SLEEP(struct taskqueue *tq, void *p, const char *wm) 119{ 120 if (tq->tq_spin) 121 return (msleep_spin(p, (struct mtx *)&tq->tq_mutex, wm, 0)); 122 return (msleep(p, &tq->tq_mutex, 0, wm, 0)); 123} 124 125static struct taskqueue * 126_taskqueue_create(const char *name, int mflags, 127 taskqueue_enqueue_fn enqueue, void *context, 128 int mtxflags, const char *mtxname __unused) 129{ 130 struct taskqueue *queue; 131 char *tq_name; 132 133 tq_name = malloc(TASKQUEUE_NAMELEN, M_TASKQUEUE, mflags | M_ZERO); 134 if (tq_name == NULL) 135 return (NULL); 136 137 queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO); 138 if (queue == NULL) { 139 free(tq_name, M_TASKQUEUE); 140 return (NULL); 141 } 142 143 snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue"); 144 145 STAILQ_INIT(&queue->tq_queue); 146 LIST_INIT(&queue->tq_active); 147 queue->tq_enqueue = enqueue; 148 queue->tq_context = context; 149 queue->tq_name = tq_name; 150 queue->tq_spin = (mtxflags & MTX_SPIN) != 0; 151 queue->tq_flags |= TQ_FLAGS_ACTIVE; 152 if (enqueue == taskqueue_fast_enqueue || 153 enqueue == taskqueue_swi_enqueue || 154 enqueue == taskqueue_swi_giant_enqueue || 155 enqueue == taskqueue_thread_enqueue) 156 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE; 157 mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags); 158 159 return (queue); 160} 161 162struct taskqueue * 163taskqueue_create(const char *name, int mflags, 164 taskqueue_enqueue_fn enqueue, void *context) 165{ 166 167 return _taskqueue_create(name, mflags, enqueue, context, 168 MTX_DEF, name); 169} 170 171void 172taskqueue_set_callback(struct taskqueue *queue, 173 enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback, 174 void *context) 175{ 176 177 KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) && 178 (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)), 179 ("Callback type %d not valid, must be %d-%d", cb_type, 180 TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX)); 181 KASSERT((queue->tq_callbacks[cb_type] == NULL), 182 ("Re-initialization of taskqueue callback?")); 183 184 queue->tq_callbacks[cb_type] = callback; 185 queue->tq_cb_contexts[cb_type] = context; 186} 187 188/* 189 * Signal a taskqueue thread to terminate. 190 */ 191static void 192taskqueue_terminate(struct thread **pp, struct taskqueue *tq) 193{ 194 195 while (tq->tq_tcount > 0 || tq->tq_callouts > 0) { 196 wakeup(tq); 197 TQ_SLEEP(tq, pp, "tq_destroy"); 198 } 199} 200 201void 202taskqueue_free(struct taskqueue *queue) 203{ 204 205 TQ_LOCK(queue); 206 queue->tq_flags &= ~TQ_FLAGS_ACTIVE; 207 taskqueue_terminate(queue->tq_threads, queue); 208 KASSERT(LIST_EMPTY(&queue->tq_active), ("Tasks still running?")); 209 KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks")); 210 mtx_destroy(&queue->tq_mutex); 211 free(queue->tq_threads, M_TASKQUEUE); 212 free(queue->tq_name, M_TASKQUEUE); 213 free(queue, M_TASKQUEUE); 214} 215 216static int 217taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task) 218{ 219 struct task *ins; 220 struct task *prev; 221 222 KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func")); 223 /* 224 * Count multiple enqueues. 225 */ 226 if (task->ta_pending) { 227 if (task->ta_pending < USHRT_MAX) 228 task->ta_pending++; 229 TQ_UNLOCK(queue); 230 return (0); 231 } 232 233 /* 234 * Optimise cases when all tasks use small set of priorities. 235 * In case of only one priority we always insert at the end. 236 * In case of two tq_hint typically gives the insertion point. 237 * In case of more then two tq_hint should halve the search. 238 */ 239 prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 240 if (!prev || prev->ta_priority >= task->ta_priority) { 241 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 242 } else { 243 prev = queue->tq_hint; 244 if (prev && prev->ta_priority >= task->ta_priority) { 245 ins = STAILQ_NEXT(prev, ta_link); 246 } else { 247 prev = NULL; 248 ins = STAILQ_FIRST(&queue->tq_queue); 249 } 250 for (; ins; prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 251 if (ins->ta_priority < task->ta_priority) 252 break; 253 254 if (prev) { 255 STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 256 queue->tq_hint = task; 257 } else 258 STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 259 } 260 261 task->ta_pending = 1; 262 if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0) 263 TQ_UNLOCK(queue); 264 if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) 265 queue->tq_enqueue(queue->tq_context); 266 if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0) 267 TQ_UNLOCK(queue); 268 269 /* Return with lock released. */ 270 return (0); 271} 272 273int 274taskqueue_enqueue(struct taskqueue *queue, struct task *task) 275{ 276 int res; 277 278 TQ_LOCK(queue); 279 res = taskqueue_enqueue_locked(queue, task); 280 /* The lock is released inside. */ 281 282 return (res); 283} 284 285static void 286taskqueue_timeout_func(void *arg) 287{ 288 struct taskqueue *queue; 289 struct timeout_task *timeout_task; 290 291 timeout_task = arg; 292 queue = timeout_task->q; 293 KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout")); 294 timeout_task->f &= ~DT_CALLOUT_ARMED; 295 queue->tq_callouts--; 296 taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t); 297 /* The lock is released inside. */ 298} 299 300int 301taskqueue_enqueue_timeout_sbt(struct taskqueue *queue, 302 struct timeout_task *timeout_task, sbintime_t sbt, sbintime_t pr, int flags) 303{ 304 int res; 305 306 TQ_LOCK(queue); 307 KASSERT(timeout_task->q == NULL || timeout_task->q == queue, 308 ("Migrated queue")); 309 KASSERT(!queue->tq_spin, ("Timeout for spin-queue")); 310 timeout_task->q = queue; 311 res = timeout_task->t.ta_pending; 312 if (timeout_task->f & DT_DRAIN_IN_PROGRESS) { 313 /* Do nothing */ 314 TQ_UNLOCK(queue); 315 res = -1; 316 } else if (sbt == 0) { 317 taskqueue_enqueue_locked(queue, &timeout_task->t); 318 /* The lock is released inside. */ 319 } else { 320 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) { 321 res++; 322 } else { 323 queue->tq_callouts++; 324 timeout_task->f |= DT_CALLOUT_ARMED; 325 if (sbt < 0) 326 sbt = -sbt; /* Ignore overflow. */ 327 } 328 if (sbt > 0) { 329 callout_reset_sbt(&timeout_task->c, sbt, pr, 330 taskqueue_timeout_func, timeout_task, flags); 331 } 332 TQ_UNLOCK(queue); 333 } 334 return (res); 335} 336 337int 338taskqueue_enqueue_timeout(struct taskqueue *queue, 339 struct timeout_task *ttask, int ticks) 340{ 341 342 return (taskqueue_enqueue_timeout_sbt(queue, ttask, ticks * tick_sbt, 343 0, 0)); 344} 345 346static void 347taskqueue_task_nop_fn(void *context, int pending) 348{ 349} 350 351/* 352 * Block until all currently queued tasks in this taskqueue 353 * have begun execution. Tasks queued during execution of 354 * this function are ignored. 355 */ 356static int 357taskqueue_drain_tq_queue(struct taskqueue *queue) 358{ 359 struct task t_barrier; 360 361 if (STAILQ_EMPTY(&queue->tq_queue)) 362 return (0); 363 364 /* 365 * Enqueue our barrier after all current tasks, but with 366 * the highest priority so that newly queued tasks cannot 367 * pass it. Because of the high priority, we can not use 368 * taskqueue_enqueue_locked directly (which drops the lock 369 * anyway) so just insert it at tail while we have the 370 * queue lock. 371 */ 372 TASK_INIT(&t_barrier, USHRT_MAX, taskqueue_task_nop_fn, &t_barrier); 373 STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link); 374 queue->tq_hint = &t_barrier; 375 t_barrier.ta_pending = 1; 376 377 /* 378 * Once the barrier has executed, all previously queued tasks 379 * have completed or are currently executing. 380 */ 381 while (t_barrier.ta_pending != 0) 382 TQ_SLEEP(queue, &t_barrier, "tq_qdrain"); 383 return (1); 384} 385 386/* 387 * Block until all currently executing tasks for this taskqueue 388 * complete. Tasks that begin execution during the execution 389 * of this function are ignored. 390 */ 391static int 392taskqueue_drain_tq_active(struct taskqueue *queue) 393{ 394 struct taskqueue_busy *tb; 395 u_int seq; 396 397 if (LIST_EMPTY(&queue->tq_active)) 398 return (0); 399 400 /* Block taskq_terminate().*/ 401 queue->tq_callouts++; 402 403 /* Wait for any active task with sequence from the past. */ 404 seq = queue->tq_seq; 405restart: 406 LIST_FOREACH(tb, &queue->tq_active, tb_link) { 407 if ((int)(tb->tb_seq - seq) <= 0) { 408 TQ_SLEEP(queue, tb->tb_running, "tq_adrain"); 409 goto restart; 410 } 411 } 412 413 /* Release taskqueue_terminate(). */ 414 queue->tq_callouts--; 415 if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) 416 wakeup_one(queue->tq_threads); 417 return (1); 418} 419 420void 421taskqueue_block(struct taskqueue *queue) 422{ 423 424 TQ_LOCK(queue); 425 queue->tq_flags |= TQ_FLAGS_BLOCKED; 426 TQ_UNLOCK(queue); 427} 428 429void 430taskqueue_unblock(struct taskqueue *queue) 431{ 432 433 TQ_LOCK(queue); 434 queue->tq_flags &= ~TQ_FLAGS_BLOCKED; 435 if (!STAILQ_EMPTY(&queue->tq_queue)) 436 queue->tq_enqueue(queue->tq_context); 437 TQ_UNLOCK(queue); 438} 439 440static void 441taskqueue_run_locked(struct taskqueue *queue) 442{ 443 struct taskqueue_busy tb; 444 struct task *task; 445 int pending; 446 447 KASSERT(queue != NULL, ("tq is NULL")); 448 TQ_ASSERT_LOCKED(queue); 449 tb.tb_running = NULL; 450 LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link); 451 452 while ((task = STAILQ_FIRST(&queue->tq_queue)) != NULL) { 453 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 454 if (queue->tq_hint == task) 455 queue->tq_hint = NULL; 456 pending = task->ta_pending; 457 task->ta_pending = 0; 458 tb.tb_running = task; 459 tb.tb_seq = ++queue->tq_seq; 460 TQ_UNLOCK(queue); 461 462 KASSERT(task->ta_func != NULL, ("task->ta_func is NULL")); 463 task->ta_func(task->ta_context, pending); 464 465 TQ_LOCK(queue); 466 wakeup(task); 467 } 468 LIST_REMOVE(&tb, tb_link); 469} 470 471void 472taskqueue_run(struct taskqueue *queue) 473{ 474 475 TQ_LOCK(queue); 476 taskqueue_run_locked(queue); 477 TQ_UNLOCK(queue); 478} 479 480static int 481task_is_running(struct taskqueue *queue, struct task *task) 482{ 483 struct taskqueue_busy *tb; 484 485 TQ_ASSERT_LOCKED(queue); 486 LIST_FOREACH(tb, &queue->tq_active, tb_link) { 487 if (tb->tb_running == task) 488 return (1); 489 } 490 return (0); 491} 492 493/* 494 * Only use this function in single threaded contexts. It returns 495 * non-zero if the given task is either pending or running. Else the 496 * task is idle and can be queued again or freed. 497 */ 498int 499taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task) 500{ 501 int retval; 502 503 TQ_LOCK(queue); 504 retval = task->ta_pending > 0 || task_is_running(queue, task); 505 TQ_UNLOCK(queue); 506 507 return (retval); 508} 509 510static int 511taskqueue_cancel_locked(struct taskqueue *queue, struct task *task, 512 u_int *pendp) 513{ 514 515 if (task->ta_pending > 0) { 516 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link); 517 if (queue->tq_hint == task) 518 queue->tq_hint = NULL; 519 } 520 if (pendp != NULL) 521 *pendp = task->ta_pending; 522 task->ta_pending = 0; 523 return (task_is_running(queue, task) ? EBUSY : 0); 524} 525 526int 527taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp) 528{ 529 int error; 530 531 TQ_LOCK(queue); 532 error = taskqueue_cancel_locked(queue, task, pendp); 533 TQ_UNLOCK(queue); 534 535 return (error); 536} 537 538int 539taskqueue_cancel_timeout(struct taskqueue *queue, 540 struct timeout_task *timeout_task, u_int *pendp) 541{ 542 u_int pending, pending1; 543 int error; 544 545 TQ_LOCK(queue); 546 pending = !!(callout_stop(&timeout_task->c) > 0); 547 error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1); 548 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) { 549 timeout_task->f &= ~DT_CALLOUT_ARMED; 550 queue->tq_callouts--; 551 } 552 TQ_UNLOCK(queue); 553 554 if (pendp != NULL) 555 *pendp = pending + pending1; 556 return (error); 557} 558 559void 560taskqueue_drain(struct taskqueue *queue, struct task *task) 561{ 562 563 if (!queue->tq_spin) 564 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); 565 566 TQ_LOCK(queue); 567 while (task->ta_pending != 0 || task_is_running(queue, task)) 568 TQ_SLEEP(queue, task, "tq_drain"); 569 TQ_UNLOCK(queue); 570} 571 572void 573taskqueue_drain_all(struct taskqueue *queue) 574{ 575 576 if (!queue->tq_spin) 577 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); 578 579 TQ_LOCK(queue); 580 (void)taskqueue_drain_tq_queue(queue); 581 (void)taskqueue_drain_tq_active(queue); 582 TQ_UNLOCK(queue); 583} 584 585void 586taskqueue_drain_timeout(struct taskqueue *queue, 587 struct timeout_task *timeout_task) 588{ 589 590 /* 591 * Set flag to prevent timer from re-starting during drain: 592 */ 593 TQ_LOCK(queue); 594 KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0, 595 ("Drain already in progress")); 596 timeout_task->f |= DT_DRAIN_IN_PROGRESS; 597 TQ_UNLOCK(queue); 598 599 callout_drain(&timeout_task->c); 600 taskqueue_drain(queue, &timeout_task->t); 601 602 /* 603 * Clear flag to allow timer to re-start: 604 */ 605 TQ_LOCK(queue); 606 timeout_task->f &= ~DT_DRAIN_IN_PROGRESS; 607 TQ_UNLOCK(queue); 608} 609 610void 611taskqueue_quiesce(struct taskqueue *queue) 612{ 613 int ret; 614 615 TQ_LOCK(queue); 616 do { 617 ret = taskqueue_drain_tq_queue(queue); 618 if (ret == 0) 619 ret = taskqueue_drain_tq_active(queue); 620 } while (ret != 0); 621 TQ_UNLOCK(queue); 622} 623 624static void 625taskqueue_swi_enqueue(void *context) 626{ 627 swi_sched(taskqueue_ih, 0); 628} 629 630static void 631taskqueue_swi_run(void *dummy) 632{ 633 taskqueue_run(taskqueue_swi); 634} 635 636static void 637taskqueue_swi_giant_enqueue(void *context) 638{ 639 swi_sched(taskqueue_giant_ih, 0); 640} 641 642static void 643taskqueue_swi_giant_run(void *dummy) 644{ 645 taskqueue_run(taskqueue_swi_giant); 646} 647 648static int 649_taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, 650 cpuset_t *mask, const char *name, va_list ap) 651{ 652 char ktname[MAXCOMLEN + 1]; 653 struct thread *td; 654 struct taskqueue *tq; 655 int i, error; 656 657 if (count <= 0) 658 return (EINVAL); 659 660 vsnprintf(ktname, sizeof(ktname), name, ap); 661 tq = *tqp; 662 663 tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE, 664 M_NOWAIT | M_ZERO); 665 if (tq->tq_threads == NULL) { 666 printf("%s: no memory for %s threads\n", __func__, ktname); 667 return (ENOMEM); 668 } 669 670 for (i = 0; i < count; i++) { 671 if (count == 1) 672 error = kthread_add(taskqueue_thread_loop, tqp, NULL, 673 &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname); 674 else 675 error = kthread_add(taskqueue_thread_loop, tqp, NULL, 676 &tq->tq_threads[i], RFSTOPPED, 0, 677 "%s_%d", ktname, i); 678 if (error) { 679 /* should be ok to continue, taskqueue_free will dtrt */ 680 printf("%s: kthread_add(%s): error %d", __func__, 681 ktname, error); 682 tq->tq_threads[i] = NULL; /* paranoid */ 683 } else 684 tq->tq_tcount++; 685 } 686 if (tq->tq_tcount == 0) { 687 free(tq->tq_threads, M_TASKQUEUE); 688 tq->tq_threads = NULL; 689 return (ENOMEM); 690 } 691 for (i = 0; i < count; i++) { 692 if (tq->tq_threads[i] == NULL) 693 continue; 694 td = tq->tq_threads[i]; 695 if (mask) { 696 error = cpuset_setthread(td->td_tid, mask); 697 /* 698 * Failing to pin is rarely an actual fatal error; 699 * it'll just affect performance. 700 */ 701 if (error) 702 printf("%s: curthread=%llu: can't pin; " 703 "error=%d\n", 704 __func__, 705 (unsigned long long) td->td_tid, 706 error); 707 } 708 thread_lock(td); 709 sched_prio(td, pri); 710 sched_add(td, SRQ_BORING); 711 thread_unlock(td); 712 } 713 714 return (0); 715} 716 717int 718taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, 719 const char *name, ...) 720{ 721 va_list ap; 722 int error; 723 724 va_start(ap, name); 725 error = _taskqueue_start_threads(tqp, count, pri, NULL, name, ap); 726 va_end(ap); 727 return (error); 728} 729 730int 731taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count, int pri, 732 cpuset_t *mask, const char *name, ...) 733{ 734 va_list ap; 735 int error; 736 737 va_start(ap, name); 738 error = _taskqueue_start_threads(tqp, count, pri, mask, name, ap); 739 va_end(ap); 740 return (error); 741} 742 743static inline void 744taskqueue_run_callback(struct taskqueue *tq, 745 enum taskqueue_callback_type cb_type) 746{ 747 taskqueue_callback_fn tq_callback; 748 749 TQ_ASSERT_UNLOCKED(tq); 750 tq_callback = tq->tq_callbacks[cb_type]; 751 if (tq_callback != NULL) 752 tq_callback(tq->tq_cb_contexts[cb_type]); 753} 754 755void 756taskqueue_thread_loop(void *arg) 757{ 758 struct taskqueue **tqp, *tq; 759 760 tqp = arg; 761 tq = *tqp; 762 taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT); 763 TQ_LOCK(tq); 764 while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) { 765 /* XXX ? */ 766 taskqueue_run_locked(tq); 767 /* 768 * Because taskqueue_run() can drop tq_mutex, we need to 769 * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the 770 * meantime, which means we missed a wakeup. 771 */ 772 if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0) 773 break; 774 TQ_SLEEP(tq, tq, "-"); 775 } 776 taskqueue_run_locked(tq); 777 /* 778 * This thread is on its way out, so just drop the lock temporarily 779 * in order to call the shutdown callback. This allows the callback 780 * to look at the taskqueue, even just before it dies. 781 */ 782 TQ_UNLOCK(tq); 783 taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN); 784 TQ_LOCK(tq); 785 786 /* rendezvous with thread that asked us to terminate */ 787 tq->tq_tcount--; 788 wakeup_one(tq->tq_threads); 789 TQ_UNLOCK(tq); 790 kthread_exit(); 791} 792 793void 794taskqueue_thread_enqueue(void *context) 795{ 796 struct taskqueue **tqp, *tq; 797 798 tqp = context; 799 tq = *tqp; 800 wakeup_any(tq); 801} 802 803TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL, 804 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ, 805 INTR_MPSAFE, &taskqueue_ih)); 806 807TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL, 808 swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run, 809 NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih)); 810 811TASKQUEUE_DEFINE_THREAD(thread); 812 813struct taskqueue * 814taskqueue_create_fast(const char *name, int mflags, 815 taskqueue_enqueue_fn enqueue, void *context) 816{ 817 return _taskqueue_create(name, mflags, enqueue, context, 818 MTX_SPIN, "fast_taskqueue"); 819} 820 821static void *taskqueue_fast_ih; 822 823static void 824taskqueue_fast_enqueue(void *context) 825{ 826 swi_sched(taskqueue_fast_ih, 0); 827} 828 829static void 830taskqueue_fast_run(void *dummy) 831{ 832 taskqueue_run(taskqueue_fast); 833} 834 835TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL, 836 swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL, 837 SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih)); 838 839int 840taskqueue_member(struct taskqueue *queue, struct thread *td) 841{ 842 int i, j, ret = 0; 843 844 for (i = 0, j = 0; ; i++) { 845 if (queue->tq_threads[i] == NULL) 846 continue; 847 if (queue->tq_threads[i] == td) { 848 ret = 1; 849 break; 850 } 851 if (++j >= queue->tq_tcount) 852 break; 853 } 854 return (ret); 855} 856