/*	$NetBSD: kern_threadpool.c,v 1.23 2021/01/23 16:33:49 riastradh Exp $	*/

/*-
 * Copyright (c) 2014, 2018 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Taylor R. Campbell and Jason R. Thorpe.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * Thread pools.
 *
 * A thread pool is a collection of worker threads idle or running
 * jobs, together with a dispatcher thread that does not run jobs but
 * can be given jobs to assign to a worker thread.  Scheduling a job in
 * a thread pool does not allocate or even sleep at all, except perhaps
 * on an adaptive lock, unlike kthread_create.  Jobs reuse threads, so
 * they do not incur the expense of creating and destroying kthreads
 * unless there is not much work to be done.
 *
 * A per-CPU thread pool (threadpool_percpu) is a collection of thread
 * pools, one per CPU bound to that CPU.  For each priority level in
 * use, there is one shared unbound thread pool (i.e., pool of threads
 * not bound to any CPU) and one shared per-CPU thread pool.
 *
 * To use the unbound thread pool at priority pri, call
 * threadpool_get(&pool, pri).  When you're done, call
 * threadpool_put(pool, pri).
 *
 * To use the per-CPU thread pools at priority pri, call
 * threadpool_percpu_get(&pool_percpu, pri), and then use the thread
 * pool returned by threadpool_percpu_ref(pool_percpu) for the current
 * CPU, or by threadpool_percpu_ref_remote(pool_percpu, ci) for another
 * CPU.  When you're done, call threadpool_percpu_put(pool_percpu,
 * pri).
 *
 *	+--MACHINE-----------------------------------------------------+
 *	| +--CPU 0---------+ +--CPU 1---------+     +--CPU n---------+ |
 *	| | <dispatcher 0> | | <dispatcher 1> | ... | <dispatcher n> | |
 *	| | <idle 0a>      | | <running 1a>   | ... | <idle na>      | |
 *	| | <running 0b>   | | <running 1b>   | ... | <idle nb>      | |
 *	| | .              | | .              | ... | .              | |
 *	| | .              | | .              | ... | .              | |
 *	| | .              | | .              | ... | .              | |
 *	| +----------------+ +----------------+     +----------------+ |
 *	|            +--unbound-----------+                            |
 *	|            | <dispatcher n+1>   |                            |
 *	|            | <idle (n+1)a>      |                            |
 *	|            | <running (n+1)b>   |                            |
 *	|            +--------------------+                            |
 *	+--------------------------------------------------------------+
 *
 * XXX Why one dispatcher per CPU?  I did that originally to avoid
 * touching remote CPUs' memory when scheduling a job, but that still
 * requires interprocessor synchronization.  Perhaps we could get by
 * with a single dispatcher thread, at the expense of another pointer
 * in struct threadpool_job to identify the CPU on which it must run in
 * order for the dispatcher to schedule it correctly.
 */
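
/*
 * A minimal sketch of the pool lifetime described above (illustrative
 * only: error handling is elided, "pri" is whatever priority the
 * caller needs, and setting up the job itself is sketched next to
 * threadpool_job_init below):
 *
 *	struct threadpool *pool;
 *	int error;
 *
 *	error = threadpool_get(&pool, pri);
 *	if (error)
 *		return error;
 *	...
 *	mutex_enter(job->job_lock);
 *	threadpool_schedule_job(pool, job);
 *	mutex_exit(job->job_lock);
 *	...
 *	threadpool_put(pool, pri);
 *
 * The per-CPU flavour works the same way, except the pool comes from
 * threadpool_percpu_get() and threadpool_percpu_ref(); see the sketch
 * next to those functions below.
 */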

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: kern_threadpool.c,v 1.23 2021/01/23 16:33:49 riastradh Exp $");

#include <sys/types.h>
#include <sys/param.h>
#include <sys/atomic.h>
#include <sys/condvar.h>
#include <sys/cpu.h>
#include <sys/kernel.h>
#include <sys/kmem.h>
#include <sys/kthread.h>
#include <sys/mutex.h>
#include <sys/once.h>
#include <sys/percpu.h>
#include <sys/pool.h>
#include <sys/proc.h>
#include <sys/queue.h>
#include <sys/sdt.h>
#include <sys/sysctl.h>
#include <sys/systm.h>
#include <sys/threadpool.h>

/* Probes */

SDT_PROBE_DEFINE1(sdt, kernel, threadpool, get,
    "pri_t"/*pri*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, get__create,
    "pri_t"/*pri*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, get__race,
    "pri_t"/*pri*/);
SDT_PROBE_DEFINE2(sdt, kernel, threadpool, put,
    "struct threadpool *"/*pool*/, "pri_t"/*pri*/);
SDT_PROBE_DEFINE2(sdt, kernel, threadpool, put__destroy,
    "struct threadpool *"/*pool*/, "pri_t"/*pri*/);

SDT_PROBE_DEFINE1(sdt, kernel, threadpool, percpu__get,
    "pri_t"/*pri*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, percpu__get__create,
    "pri_t"/*pri*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, percpu__get__race,
    "pri_t"/*pri*/);
SDT_PROBE_DEFINE2(sdt, kernel, threadpool, percpu__put,
    "struct threadpool *"/*pool*/, "pri_t"/*pri*/);
SDT_PROBE_DEFINE2(sdt, kernel, threadpool, percpu__put__destroy,
    "struct threadpool *"/*pool*/, "pri_t"/*pri*/);

SDT_PROBE_DEFINE2(sdt, kernel, threadpool, create,
    "struct cpu_info *"/*ci*/, "pri_t"/*pri*/);
SDT_PROBE_DEFINE3(sdt, kernel, threadpool, create__success,
    "struct cpu_info *"/*ci*/, "pri_t"/*pri*/, "struct threadpool *"/*pool*/);
SDT_PROBE_DEFINE3(sdt, kernel, threadpool, create__failure,
    "struct cpu_info *"/*ci*/, "pri_t"/*pri*/, "int"/*error*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, destroy,
    "struct threadpool *"/*pool*/);
SDT_PROBE_DEFINE2(sdt, kernel, threadpool, destroy__wait,
    "struct threadpool *"/*pool*/, "uint64_t"/*refcnt*/);

SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job,
    "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job__running,
    "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job__dispatcher,
    "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
SDT_PROBE_DEFINE3(sdt, kernel, threadpool, schedule__job__thread,
    "struct threadpool *"/*pool*/,
    "struct threadpool_job *"/*job*/,
    "struct lwp *"/*thread*/);

SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__start,
    "struct threadpool *"/*pool*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__dying,
    "struct threadpool *"/*pool*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__spawn,
    "struct threadpool *"/*pool*/);
SDT_PROBE_DEFINE2(sdt, kernel, threadpool, dispatcher__race,
    "struct threadpool *"/*pool*/,
    "struct threadpool_job *"/*job*/);
SDT_PROBE_DEFINE3(sdt, kernel, threadpool, dispatcher__assign,
    "struct threadpool *"/*pool*/,
    "struct threadpool_job *"/*job*/,
    "struct lwp *"/*thread*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__exit,
    "struct threadpool *"/*pool*/);

SDT_PROBE_DEFINE1(sdt, kernel, threadpool, thread__start,
    "struct threadpool *"/*pool*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, thread__dying,
    "struct threadpool *"/*pool*/);
SDT_PROBE_DEFINE2(sdt, kernel, threadpool, thread__job,
    "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, thread__exit,
    "struct threadpool *"/*pool*/);

/* Data structures */

TAILQ_HEAD(job_head, threadpool_job);
TAILQ_HEAD(thread_head, threadpool_thread);

struct threadpool_thread {
	struct lwp			*tpt_lwp;
	char				*tpt_lwp_savedname;
	struct threadpool		*tpt_pool;
	struct threadpool_job		*tpt_job;
	kcondvar_t			tpt_cv;
	TAILQ_ENTRY(threadpool_thread)	tpt_entry;
};

struct threadpool {
	kmutex_t			tp_lock;
	struct threadpool_thread	tp_dispatcher;
	struct job_head			tp_jobs;
	struct thread_head		tp_idle_threads;
	uint64_t			tp_refcnt;
	int				tp_flags;
#define	THREADPOOL_DYING	0x01
	struct cpu_info			*tp_cpu;
	pri_t				tp_pri;
};

static void	threadpool_hold(struct threadpool *);
static void	threadpool_rele(struct threadpool *);

static int	threadpool_percpu_create(struct threadpool_percpu **, pri_t);
static void	threadpool_percpu_destroy(struct threadpool_percpu *);
static void	threadpool_percpu_init(void *, void *, struct cpu_info *);
static void	threadpool_percpu_ok(void *, void *, struct cpu_info *);
static void	threadpool_percpu_fini(void *, void *, struct cpu_info *);

static threadpool_job_fn_t threadpool_job_dead;

static void	threadpool_job_hold(struct threadpool_job *);
static void	threadpool_job_rele(struct threadpool_job *);

static void	threadpool_dispatcher_thread(void *) __dead;
static void	threadpool_thread(void *) __dead;

static pool_cache_t	threadpool_thread_pc __read_mostly;

static kmutex_t		threadpools_lock __cacheline_aligned;

/* Default to 30 second idle timeout for pool threads. */
static int	threadpool_idle_time_ms = 30 * 1000;

struct threadpool_unbound {
	struct threadpool		tpu_pool;

	/* protected by threadpools_lock */
	LIST_ENTRY(threadpool_unbound)	tpu_link;
	uint64_t			tpu_refcnt;
};

static LIST_HEAD(, threadpool_unbound) unbound_threadpools;

static struct threadpool_unbound *
threadpool_lookup_unbound(pri_t pri)
{
	struct threadpool_unbound *tpu;

	LIST_FOREACH(tpu, &unbound_threadpools, tpu_link) {
		if (tpu->tpu_pool.tp_pri == pri)
			return tpu;
	}
	return NULL;
}

static void
threadpool_insert_unbound(struct threadpool_unbound *tpu)
{
	KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == NULL);
	LIST_INSERT_HEAD(&unbound_threadpools, tpu, tpu_link);
}

static void
threadpool_remove_unbound(struct threadpool_unbound *tpu)
{
	KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == tpu);
	LIST_REMOVE(tpu, tpu_link);
}

struct threadpool_percpu {
	percpu_t *			tpp_percpu;
	pri_t				tpp_pri;

	/* protected by threadpools_lock */
	LIST_ENTRY(threadpool_percpu)	tpp_link;
	uint64_t			tpp_refcnt;
};

static LIST_HEAD(, threadpool_percpu) percpu_threadpools;

static struct threadpool_percpu *
threadpool_lookup_percpu(pri_t pri)
{
	struct threadpool_percpu *tpp;

	LIST_FOREACH(tpp, &percpu_threadpools, tpp_link) {
		if (tpp->tpp_pri == pri)
			return tpp;
	}
	return NULL;
}

static void
threadpool_insert_percpu(struct threadpool_percpu *tpp)
{
	KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == NULL);
	LIST_INSERT_HEAD(&percpu_threadpools, tpp, tpp_link);
}

static void
threadpool_remove_percpu(struct threadpool_percpu *tpp)
{
	KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == tpp);
	LIST_REMOVE(tpp, tpp_link);
}

static int
sysctl_kern_threadpool_idle_ms(SYSCTLFN_ARGS)
{
	struct sysctlnode node;
	int val, error;

	node = *rnode;

	val = threadpool_idle_time_ms;
	node.sysctl_data = &val;
	error = sysctl_lookup(SYSCTLFN_CALL(&node));
	if (error == 0 && newp != NULL) {
		/* Disallow negative values and 0 (forever). */
		if (val < 1)
			error = EINVAL;
		else
			threadpool_idle_time_ms = val;
	}

	return error;
}

SYSCTL_SETUP_PROTO(sysctl_threadpool_setup);

SYSCTL_SETUP(sysctl_threadpool_setup,
    "sysctl kern.threadpool subtree setup")
{
	const struct sysctlnode *rnode, *cnode;
	int error __diagused;

	error = sysctl_createv(clog, 0, NULL, &rnode,
	    CTLFLAG_PERMANENT,
	    CTLTYPE_NODE, "threadpool",
	    SYSCTL_DESCR("threadpool subsystem options"),
	    NULL, 0, NULL, 0,
	    CTL_KERN, CTL_CREATE, CTL_EOL);
	KASSERT(error == 0);

	error = sysctl_createv(clog, 0, &rnode, &cnode,
	    CTLFLAG_PERMANENT | CTLFLAG_READWRITE,
	    CTLTYPE_INT, "idle_ms",
	    SYSCTL_DESCR("idle thread timeout in ms"),
	    sysctl_kern_threadpool_idle_ms, 0, NULL, 0,
	    CTL_CREATE, CTL_EOL);
	KASSERT(error == 0);
}
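
/*
 * Illustrative note: the node created above is reachable as
 * kern.threadpool.idle_ms, so the idle timeout can be tuned at run
 * time, e.g. (any value other than the 30000 ms default is just an
 * example):
 *
 *	# sysctl -w kern.threadpool.idle_ms=60000
 */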

void
threadpools_init(void)
{

	threadpool_thread_pc =
	    pool_cache_init(sizeof(struct threadpool_thread), 0, 0, 0,
		"thplthrd", NULL, IPL_NONE, NULL, NULL, NULL);

	LIST_INIT(&unbound_threadpools);
	LIST_INIT(&percpu_threadpools);
	mutex_init(&threadpools_lock, MUTEX_DEFAULT, IPL_NONE);
}

static void
threadnamesuffix(char *buf, size_t buflen, struct cpu_info *ci, int pri)
{

	buf[0] = '\0';
	if (ci)
		snprintf(buf + strlen(buf), buflen - strlen(buf), "/%d",
		    cpu_index(ci));
	if (pri != PRI_NONE)
		snprintf(buf + strlen(buf), buflen - strlen(buf), "@%d", pri);
}

/* Thread pool creation */

static bool
threadpool_pri_is_valid(pri_t pri)
{
	return (pri == PRI_NONE || (pri >= PRI_USER && pri < PRI_COUNT));
}

static int
threadpool_create(struct threadpool *const pool, struct cpu_info *ci,
    pri_t pri)
{
	struct lwp *lwp;
	char suffix[16];
	int ktflags;
	int error;

	KASSERT(threadpool_pri_is_valid(pri));

	SDT_PROBE2(sdt, kernel, threadpool, create, ci, pri);

	mutex_init(&pool->tp_lock, MUTEX_DEFAULT, IPL_VM);
	/* XXX dispatcher */
	TAILQ_INIT(&pool->tp_jobs);
	TAILQ_INIT(&pool->tp_idle_threads);
	pool->tp_refcnt = 1;		/* dispatcher's reference */
	pool->tp_flags = 0;
	pool->tp_cpu = ci;
	pool->tp_pri = pri;

	pool->tp_dispatcher.tpt_lwp = NULL;
	pool->tp_dispatcher.tpt_pool = pool;
	pool->tp_dispatcher.tpt_job = NULL;
	cv_init(&pool->tp_dispatcher.tpt_cv, "pooldisp");

	ktflags = 0;
	ktflags |= KTHREAD_MPSAFE;
	if (pri < PRI_KERNEL)
		ktflags |= KTHREAD_TS;
	threadnamesuffix(suffix, sizeof(suffix), ci, pri);
	error = kthread_create(pri, ktflags, ci, &threadpool_dispatcher_thread,
	    &pool->tp_dispatcher, &lwp, "pooldisp%s", suffix);
	if (error)
		goto fail0;

	mutex_spin_enter(&pool->tp_lock);
	pool->tp_dispatcher.tpt_lwp = lwp;
	cv_broadcast(&pool->tp_dispatcher.tpt_cv);
	mutex_spin_exit(&pool->tp_lock);

	SDT_PROBE3(sdt, kernel, threadpool, create__success, ci, pri, pool);
	return 0;

fail0:	KASSERT(error);
	KASSERT(pool->tp_dispatcher.tpt_job == NULL);
	KASSERT(pool->tp_dispatcher.tpt_pool == pool);
	KASSERT(pool->tp_flags == 0);
	KASSERT(pool->tp_refcnt == 0);
	KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads));
	KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
	KASSERT(!cv_has_waiters(&pool->tp_dispatcher.tpt_cv));
	cv_destroy(&pool->tp_dispatcher.tpt_cv);
	mutex_destroy(&pool->tp_lock);
	SDT_PROBE3(sdt, kernel, threadpool, create__failure, ci, pri, error);
	return error;
}

/* Thread pool destruction */

static void
threadpool_destroy(struct threadpool *pool)
{
	struct threadpool_thread *thread;

	SDT_PROBE1(sdt, kernel, threadpool, destroy, pool);

	/* Mark the pool dying and wait for threads to commit suicide. */
	mutex_spin_enter(&pool->tp_lock);
	KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
	pool->tp_flags |= THREADPOOL_DYING;
	cv_broadcast(&pool->tp_dispatcher.tpt_cv);
	TAILQ_FOREACH(thread, &pool->tp_idle_threads, tpt_entry)
		cv_broadcast(&thread->tpt_cv);
	while (0 < pool->tp_refcnt) {
		SDT_PROBE2(sdt, kernel, threadpool, destroy__wait,
		    pool, pool->tp_refcnt);
		cv_wait(&pool->tp_dispatcher.tpt_cv, &pool->tp_lock);
	}
	mutex_spin_exit(&pool->tp_lock);

	KASSERT(pool->tp_dispatcher.tpt_job == NULL);
	KASSERT(pool->tp_dispatcher.tpt_pool == pool);
	KASSERT(pool->tp_flags == THREADPOOL_DYING);
	KASSERT(pool->tp_refcnt == 0);
	KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads));
	KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
	KASSERT(!cv_has_waiters(&pool->tp_dispatcher.tpt_cv));
	cv_destroy(&pool->tp_dispatcher.tpt_cv);
	mutex_destroy(&pool->tp_lock);
}

static void
threadpool_hold(struct threadpool *pool)
{

	KASSERT(mutex_owned(&pool->tp_lock));
	pool->tp_refcnt++;
	KASSERT(pool->tp_refcnt != 0);
}

static void
threadpool_rele(struct threadpool *pool)
{

	KASSERT(mutex_owned(&pool->tp_lock));
	KASSERT(0 < pool->tp_refcnt);
	if (--pool->tp_refcnt == 0)
		cv_broadcast(&pool->tp_dispatcher.tpt_cv);
}

/* Unbound thread pools */

int
threadpool_get(struct threadpool **poolp, pri_t pri)
{
	struct threadpool_unbound *tpu, *tmp = NULL;
	int error;

	ASSERT_SLEEPABLE();

	SDT_PROBE1(sdt, kernel, threadpool, get, pri);

	if (!threadpool_pri_is_valid(pri))
		return EINVAL;

	mutex_enter(&threadpools_lock);
	tpu = threadpool_lookup_unbound(pri);
	if (tpu == NULL) {
		mutex_exit(&threadpools_lock);
		SDT_PROBE1(sdt, kernel, threadpool, get__create, pri);
		tmp = kmem_zalloc(sizeof(*tmp), KM_SLEEP);
		error = threadpool_create(&tmp->tpu_pool, NULL, pri);
		if (error) {
			kmem_free(tmp, sizeof(*tmp));
			return error;
		}
		mutex_enter(&threadpools_lock);
		tpu = threadpool_lookup_unbound(pri);
		if (tpu == NULL) {
			tpu = tmp;
			tmp = NULL;
			threadpool_insert_unbound(tpu);
		} else {
			SDT_PROBE1(sdt, kernel, threadpool, get__race, pri);
		}
	}
	KASSERT(tpu != NULL);
	tpu->tpu_refcnt++;
	KASSERT(tpu->tpu_refcnt != 0);
	mutex_exit(&threadpools_lock);

	if (tmp != NULL) {
		threadpool_destroy(&tmp->tpu_pool);
		kmem_free(tmp, sizeof(*tmp));
	}
	KASSERT(tpu != NULL);
	*poolp = &tpu->tpu_pool;
	return 0;
}

void
threadpool_put(struct threadpool *pool, pri_t pri)
{
	struct threadpool_unbound *tpu =
	    container_of(pool, struct threadpool_unbound, tpu_pool);

	ASSERT_SLEEPABLE();
	KASSERT(threadpool_pri_is_valid(pri));

	SDT_PROBE2(sdt, kernel, threadpool, put, pool, pri);

	mutex_enter(&threadpools_lock);
	KASSERT(tpu == threadpool_lookup_unbound(pri));
	KASSERT(0 < tpu->tpu_refcnt);
	if (--tpu->tpu_refcnt == 0) {
		SDT_PROBE2(sdt, kernel, threadpool, put__destroy, pool, pri);
		threadpool_remove_unbound(tpu);
	} else {
		tpu = NULL;
	}
	mutex_exit(&threadpools_lock);

	if (tpu) {
		threadpool_destroy(&tpu->tpu_pool);
		kmem_free(tpu, sizeof(*tpu));
	}
}

/* Per-CPU thread pools */

int
threadpool_percpu_get(struct threadpool_percpu **pool_percpup, pri_t pri)
{
	struct threadpool_percpu *pool_percpu, *tmp = NULL;
	int error;

	ASSERT_SLEEPABLE();

	SDT_PROBE1(sdt, kernel, threadpool, percpu__get, pri);

	if (!threadpool_pri_is_valid(pri))
		return EINVAL;

	mutex_enter(&threadpools_lock);
	pool_percpu = threadpool_lookup_percpu(pri);
	if (pool_percpu == NULL) {
		mutex_exit(&threadpools_lock);
		SDT_PROBE1(sdt, kernel, threadpool, percpu__get__create, pri);
		error = threadpool_percpu_create(&tmp, pri);
		if (error)
			return error;
		KASSERT(tmp != NULL);
		mutex_enter(&threadpools_lock);
		pool_percpu = threadpool_lookup_percpu(pri);
		if (pool_percpu == NULL) {
			pool_percpu = tmp;
			tmp = NULL;
			threadpool_insert_percpu(pool_percpu);
		} else {
			SDT_PROBE1(sdt, kernel, threadpool, percpu__get__race,
			    pri);
		}
	}
	KASSERT(pool_percpu != NULL);
	pool_percpu->tpp_refcnt++;
	KASSERT(pool_percpu->tpp_refcnt != 0);
	mutex_exit(&threadpools_lock);

	if (tmp != NULL)
		threadpool_percpu_destroy(tmp);
	KASSERT(pool_percpu != NULL);
	*pool_percpup = pool_percpu;
	return 0;
}

void
threadpool_percpu_put(struct threadpool_percpu *pool_percpu, pri_t pri)
{

	ASSERT_SLEEPABLE();

	KASSERT(threadpool_pri_is_valid(pri));

	SDT_PROBE2(sdt, kernel, threadpool, percpu__put, pool_percpu, pri);

	mutex_enter(&threadpools_lock);
	KASSERT(pool_percpu == threadpool_lookup_percpu(pri));
	KASSERT(0 < pool_percpu->tpp_refcnt);
	if (--pool_percpu->tpp_refcnt == 0) {
		SDT_PROBE2(sdt, kernel, threadpool, percpu__put__destroy,
		    pool_percpu, pri);
		threadpool_remove_percpu(pool_percpu);
	} else {
		pool_percpu = NULL;
	}
	mutex_exit(&threadpools_lock);

	if (pool_percpu)
		threadpool_percpu_destroy(pool_percpu);
}

struct threadpool *
threadpool_percpu_ref(struct threadpool_percpu *pool_percpu)
{
	struct threadpool **poolp, *pool;

	poolp = percpu_getref(pool_percpu->tpp_percpu);
	pool = *poolp;
	percpu_putref(pool_percpu->tpp_percpu);

	return pool;
}

struct threadpool *
threadpool_percpu_ref_remote(struct threadpool_percpu *pool_percpu,
    struct cpu_info *ci)
{
	struct threadpool **poolp, *pool;

	/*
	 * As long as xcalls are blocked -- e.g., by kpreempt_disable
	 * -- the percpu object will not be swapped and destroyed.  We
	 * can't write to it, because the data may have already been
	 * moved to a new buffer, but we can safely read from it.
	 */
	kpreempt_disable();
	poolp = percpu_getptr_remote(pool_percpu->tpp_percpu, ci);
	pool = *poolp;
	kpreempt_enable();

	return pool;
}
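
/*
 * A small sketch of intended per-CPU use (illustrative only; "sc" and
 * its lock and job members are hypothetical caller state, set up as
 * in the job example further below):
 *
 *	struct threadpool *pool;
 *
 *	pool = threadpool_percpu_ref(pool_percpu);		current CPU
 *	pool = threadpool_percpu_ref_remote(pool_percpu, ci);	CPU ci
 *
 *	mutex_enter(&sc->sc_lock);
 *	threadpool_schedule_job(pool, &sc->sc_job);
 *	mutex_exit(&sc->sc_lock);
 */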

static int
threadpool_percpu_create(struct threadpool_percpu **pool_percpup, pri_t pri)
{
	struct threadpool_percpu *pool_percpu;
	bool ok = true;

	pool_percpu = kmem_zalloc(sizeof(*pool_percpu), KM_SLEEP);
	pool_percpu->tpp_pri = pri;
	pool_percpu->tpp_percpu = percpu_create(sizeof(struct threadpool *),
	    threadpool_percpu_init, threadpool_percpu_fini,
	    (void *)(intptr_t)pri);

	/*
	 * Verify that all of the CPUs were initialized.
	 *
	 * XXX What to do if we add CPU hotplug?
	 */
	percpu_foreach(pool_percpu->tpp_percpu, &threadpool_percpu_ok, &ok);
	if (!ok)
		goto fail;

	/* Success!  */
	*pool_percpup = (struct threadpool_percpu *)pool_percpu;
	return 0;

fail:	percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *));
	kmem_free(pool_percpu, sizeof(*pool_percpu));
	return ENOMEM;
}

static void
threadpool_percpu_destroy(struct threadpool_percpu *pool_percpu)
{

	percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *));
	kmem_free(pool_percpu, sizeof(*pool_percpu));
}

static void
threadpool_percpu_init(void *vpoolp, void *vpri, struct cpu_info *ci)
{
	struct threadpool **const poolp = vpoolp;
	pri_t pri = (intptr_t)(void *)vpri;
	int error;

	*poolp = kmem_zalloc(sizeof(**poolp), KM_SLEEP);
	error = threadpool_create(*poolp, ci, pri);
	if (error) {
		KASSERT(error == ENOMEM);
		kmem_free(*poolp, sizeof(**poolp));
		*poolp = NULL;
	}
}

static void
threadpool_percpu_ok(void *vpoolp, void *vokp, struct cpu_info *ci)
{
	struct threadpool **const poolp = vpoolp;
	bool *okp = vokp;

	if (*poolp == NULL)
		atomic_store_relaxed(okp, false);
}

static void
threadpool_percpu_fini(void *vpoolp, void *vprip, struct cpu_info *ci)
{
	struct threadpool **const poolp = vpoolp;

	if (*poolp == NULL)	/* initialization failed */
		return;
	threadpool_destroy(*poolp);
	kmem_free(*poolp, sizeof(**poolp));
}

/* Thread pool jobs */

void __printflike(4,5)
threadpool_job_init(struct threadpool_job *job, threadpool_job_fn_t fn,
    kmutex_t *lock, const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	(void)vsnprintf(job->job_name, sizeof(job->job_name), fmt, ap);
	va_end(ap);

	job->job_lock = lock;
	job->job_thread = NULL;
	job->job_refcnt = 0;
	cv_init(&job->job_cv, job->job_name);
	job->job_fn = fn;
}
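
/*
 * A minimal job lifecycle sketch (illustrative only; "example_softc",
 * its members, and the work itself are hypothetical, and error
 * handling is elided).  The job function runs without the job lock
 * held and must retake it to call threadpool_job_done before
 * returning:
 *
 *	static void
 *	example_job_fn(struct threadpool_job *job)
 *	{
 *		struct example_softc *sc =
 *		    container_of(job, struct example_softc, sc_job);
 *
 *		...do the work...
 *
 *		mutex_enter(&sc->sc_lock);
 *		threadpool_job_done(job);
 *		mutex_exit(&sc->sc_lock);
 *	}
 *
 *	threadpool_job_init(&sc->sc_job, example_job_fn, &sc->sc_lock,
 *	    "example");
 *
 *	mutex_enter(&sc->sc_lock);
 *	threadpool_schedule_job(pool, &sc->sc_job);	run it (a no-op
 *	mutex_exit(&sc->sc_lock);			while still running)
 *
 *	mutex_enter(&sc->sc_lock);
 *	threadpool_cancel_job(pool, &sc->sc_job);	cancel or wait
 *	mutex_exit(&sc->sc_lock);
 *	threadpool_job_destroy(&sc->sc_job);		then tear down
 */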

static void
threadpool_job_dead(struct threadpool_job *job)
{

	panic("threadpool job %p ran after destruction", job);
}

void
threadpool_job_destroy(struct threadpool_job *job)
{

	ASSERT_SLEEPABLE();

	KASSERTMSG((job->job_thread == NULL), "job %p still running", job);

	mutex_enter(job->job_lock);
	while (0 < atomic_load_relaxed(&job->job_refcnt))
		cv_wait(&job->job_cv, job->job_lock);
	mutex_exit(job->job_lock);

	job->job_lock = NULL;
	KASSERT(job->job_thread == NULL);
	KASSERT(job->job_refcnt == 0);
	KASSERT(!cv_has_waiters(&job->job_cv));
	cv_destroy(&job->job_cv);
	job->job_fn = threadpool_job_dead;
	(void)strlcpy(job->job_name, "deadjob", sizeof(job->job_name));
}

static void
threadpool_job_hold(struct threadpool_job *job)
{
	unsigned int refcnt __diagused;

	refcnt = atomic_inc_uint_nv(&job->job_refcnt);
	KASSERT(refcnt != 0);
}

static void
threadpool_job_rele(struct threadpool_job *job)
{
	unsigned int refcnt;

	KASSERT(mutex_owned(job->job_lock));

	refcnt = atomic_dec_uint_nv(&job->job_refcnt);
	KASSERT(refcnt != UINT_MAX);
	if (refcnt == 0)
		cv_broadcast(&job->job_cv);
}

void
threadpool_job_done(struct threadpool_job *job)
{

	KASSERT(mutex_owned(job->job_lock));
	KASSERT(job->job_thread != NULL);
	KASSERT(job->job_thread->tpt_lwp == curlwp);

	/*
	 * We can safely read this field; it's only modified right before
	 * we call the job work function, and we are only preserving it
	 * to use here; no one cares if it contains junk afterward.
	 */
	lwp_lock(curlwp);
	curlwp->l_name = job->job_thread->tpt_lwp_savedname;
	lwp_unlock(curlwp);

	/*
	 * Inline the work of threadpool_job_rele(); the job is already
	 * locked, the most likely scenario (XXXJRT only scenario?) is
	 * that we're dropping the last reference (the one taken in
	 * threadpool_schedule_job()), and we always do the cv_broadcast()
	 * anyway.
	 */
	KASSERT(0 < atomic_load_relaxed(&job->job_refcnt));
	unsigned int refcnt __diagused = atomic_dec_uint_nv(&job->job_refcnt);
	KASSERT(refcnt != UINT_MAX);
	cv_broadcast(&job->job_cv);
	job->job_thread = NULL;
}

void
threadpool_schedule_job(struct threadpool *pool, struct threadpool_job *job)
{

	KASSERT(mutex_owned(job->job_lock));

	SDT_PROBE2(sdt, kernel, threadpool, schedule__job, pool, job);

	/*
	 * If the job's already running, let it keep running.  The job
	 * is guaranteed by the interlock not to end early -- if it had
	 * ended early, threadpool_job_done would have set job_thread
	 * to NULL under the interlock.
	 */
	if (__predict_true(job->job_thread != NULL)) {
		SDT_PROBE2(sdt, kernel, threadpool, schedule__job__running,
		    pool, job);
		return;
	}

	threadpool_job_hold(job);

	/* Otherwise, try to assign a thread to the job. */
	mutex_spin_enter(&pool->tp_lock);
	if (__predict_false(TAILQ_EMPTY(&pool->tp_idle_threads))) {
		/* Nobody's idle.  Give it to the dispatcher. */
		SDT_PROBE2(sdt, kernel, threadpool, schedule__job__dispatcher,
		    pool, job);
		job->job_thread = &pool->tp_dispatcher;
		TAILQ_INSERT_TAIL(&pool->tp_jobs, job, job_entry);
	} else {
		/* Assign it to the first idle thread. */
		job->job_thread = TAILQ_FIRST(&pool->tp_idle_threads);
		SDT_PROBE3(sdt, kernel, threadpool, schedule__job__thread,
		    pool, job, job->job_thread->tpt_lwp);
		TAILQ_REMOVE(&pool->tp_idle_threads, job->job_thread,
		    tpt_entry);
		job->job_thread->tpt_job = job;
	}

	/* Notify whomever we gave it to, dispatcher or idle thread. */
	KASSERT(job->job_thread != NULL);
	cv_broadcast(&job->job_thread->tpt_cv);
	mutex_spin_exit(&pool->tp_lock);
}

bool
threadpool_cancel_job_async(struct threadpool *pool, struct threadpool_job *job)
{

	KASSERT(mutex_owned(job->job_lock));

	/*
	 * XXXJRT This fails (albeit safely) when all of the following
	 * are true:
	 *
	 *	=> "pool" is something other than what the job was
	 *	   scheduled on.  This can legitimately occur if,
	 *	   for example, a job is percpu-scheduled on CPU0
	 *	   and then CPU1 attempts to cancel it without taking
	 *	   a remote pool reference.  (this might happen by
	 *	   "luck of the draw").
	 *
	 *	=> "job" is not yet running, but is assigned to the
	 *	   dispatcher.
	 *
	 * When this happens, this code makes the determination that
	 * the job is already running.  The failure mode is that the
	 * caller is told the job is running, and thus has to wait.
	 * The dispatcher will eventually get to it and the job will
	 * proceed as if it had been already running.
	 */

	if (job->job_thread == NULL) {
		/* Nothing to do.  Guaranteed not running. */
		return true;
	} else if (job->job_thread == &pool->tp_dispatcher) {
		/* Take it off the list to guarantee it won't run. */
		job->job_thread = NULL;
		mutex_spin_enter(&pool->tp_lock);
		TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
		mutex_spin_exit(&pool->tp_lock);
		threadpool_job_rele(job);
		return true;
	} else {
		/* Too late -- already running. */
		return false;
	}
}

void
threadpool_cancel_job(struct threadpool *pool, struct threadpool_job *job)
{

	/*
	 * We may sleep here, but we can't ASSERT_SLEEPABLE() because
	 * the job lock (used to interlock the cv_wait()) may in fact
	 * legitimately be a spin lock, so the assertion would fire
	 * as a false-positive.
	 */

	KASSERT(mutex_owned(job->job_lock));

	if (threadpool_cancel_job_async(pool, job))
		return;

	/* Already running.  Wait for it to complete. */
	while (job->job_thread != NULL)
		cv_wait(&job->job_cv, job->job_lock);
}

/* Thread pool dispatcher thread */

static void __dead
threadpool_dispatcher_thread(void *arg)
{
	struct threadpool_thread *const dispatcher = arg;
	struct threadpool *const pool = dispatcher->tpt_pool;
	struct lwp *lwp = NULL;
	int ktflags;
	char suffix[16];
	int error;

	KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu()));
	KASSERT((pool->tp_cpu == NULL) || (curlwp->l_pflag & LP_BOUND));

	/* Wait until we're initialized. */
	mutex_spin_enter(&pool->tp_lock);
	while (dispatcher->tpt_lwp == NULL)
		cv_wait(&dispatcher->tpt_cv, &pool->tp_lock);

	SDT_PROBE1(sdt, kernel, threadpool, dispatcher__start, pool);

	for (;;) {
		/* Wait until there's a job. */
		while (TAILQ_EMPTY(&pool->tp_jobs)) {
			if (ISSET(pool->tp_flags, THREADPOOL_DYING)) {
				SDT_PROBE1(sdt, kernel, threadpool,
				    dispatcher__dying, pool);
				break;
			}
			cv_wait(&dispatcher->tpt_cv, &pool->tp_lock);
		}
		if (__predict_false(TAILQ_EMPTY(&pool->tp_jobs)))
			break;

		/* If there are no threads, we'll have to try to start one. */
		if (TAILQ_EMPTY(&pool->tp_idle_threads)) {
			SDT_PROBE1(sdt, kernel, threadpool, dispatcher__spawn,
			    pool);
			threadpool_hold(pool);
			mutex_spin_exit(&pool->tp_lock);

			struct threadpool_thread *const thread =
			    pool_cache_get(threadpool_thread_pc, PR_WAITOK);
			thread->tpt_lwp = NULL;
			thread->tpt_pool = pool;
			thread->tpt_job = NULL;
			cv_init(&thread->tpt_cv, "pooljob");

			ktflags = 0;
			ktflags |= KTHREAD_MPSAFE;
			if (pool->tp_pri < PRI_KERNEL)
				ktflags |= KTHREAD_TS;
			threadnamesuffix(suffix, sizeof(suffix), pool->tp_cpu,
			    pool->tp_pri);
			error = kthread_create(pool->tp_pri, ktflags,
			    pool->tp_cpu, &threadpool_thread, thread, &lwp,
			    "poolthread%s", suffix);

			mutex_spin_enter(&pool->tp_lock);
			if (error) {
				pool_cache_put(threadpool_thread_pc, thread);
				threadpool_rele(pool);
				/* XXX What to do to wait for memory? */
				(void)kpause("thrdplcr", false, hz,
				    &pool->tp_lock);
				continue;
			}
			/*
			 * New kthread now owns the reference to the pool
			 * taken above.
			 */
			KASSERT(lwp != NULL);
			TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread,
			    tpt_entry);
			thread->tpt_lwp = lwp;
			lwp = NULL;
			cv_broadcast(&thread->tpt_cv);
			continue;
		}

		/* There are idle threads, so try giving one a job. */
		struct threadpool_job *const job = TAILQ_FIRST(&pool->tp_jobs);

		/*
		 * Take an extra reference on the job temporarily so that
		 * it won't disappear on us while we have both locks dropped.
		 */
		threadpool_job_hold(job);
		mutex_spin_exit(&pool->tp_lock);

		mutex_enter(job->job_lock);
		/* If the job was cancelled, we'll no longer be its thread. */
		if (__predict_true(job->job_thread == dispatcher)) {
			mutex_spin_enter(&pool->tp_lock);
			TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
			if (__predict_false(
				    TAILQ_EMPTY(&pool->tp_idle_threads))) {
				/*
				 * Someone else snagged the thread
				 * first.  We'll have to try again.
				 */
				SDT_PROBE2(sdt, kernel, threadpool,
				    dispatcher__race, pool, job);
				TAILQ_INSERT_HEAD(&pool->tp_jobs, job,
				    job_entry);
			} else {
				/*
				 * Assign the job to the thread and
				 * wake the thread so it starts work.
				 */
				struct threadpool_thread *const thread =
				    TAILQ_FIRST(&pool->tp_idle_threads);

				SDT_PROBE3(sdt, kernel, threadpool,
				    dispatcher__assign, pool, job,
				    thread->tpt_lwp);
				KASSERT(thread->tpt_job == NULL);
				TAILQ_REMOVE(&pool->tp_idle_threads, thread,
				    tpt_entry);
				thread->tpt_job = job;
				job->job_thread = thread;
				cv_broadcast(&thread->tpt_cv);
			}
			mutex_spin_exit(&pool->tp_lock);
		}
		threadpool_job_rele(job);
		mutex_exit(job->job_lock);

		mutex_spin_enter(&pool->tp_lock);
	}
	threadpool_rele(pool);
	mutex_spin_exit(&pool->tp_lock);

	SDT_PROBE1(sdt, kernel, threadpool, dispatcher__exit, pool);

	kthread_exit(0);
}

/* Thread pool thread */

static void __dead
threadpool_thread(void *arg)
{
	struct threadpool_thread *const thread = arg;
	struct threadpool *const pool = thread->tpt_pool;

	KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu()));
	KASSERT((pool->tp_cpu == NULL) || (curlwp->l_pflag & LP_BOUND));

	/* Wait until we're initialized and on the queue. */
	mutex_spin_enter(&pool->tp_lock);
	while (thread->tpt_lwp == NULL)
		cv_wait(&thread->tpt_cv, &pool->tp_lock);

	SDT_PROBE1(sdt, kernel, threadpool, thread__start, pool);

	KASSERT(thread->tpt_lwp == curlwp);
	for (;;) {
		/* Wait until we are assigned a job. */
		while (thread->tpt_job == NULL) {
			if (ISSET(pool->tp_flags, THREADPOOL_DYING)) {
				SDT_PROBE1(sdt, kernel, threadpool,
				    thread__dying, pool);
				break;
			}
			if (cv_timedwait(&thread->tpt_cv, &pool->tp_lock,
				mstohz(threadpool_idle_time_ms)))
				break;
		}
		if (__predict_false(thread->tpt_job == NULL)) {
			TAILQ_REMOVE(&pool->tp_idle_threads, thread,
			    tpt_entry);
			break;
		}

		struct threadpool_job *const job = thread->tpt_job;
		KASSERT(job != NULL);

		/* Set our lwp name to reflect what job we're doing. */
		lwp_lock(curlwp);
		char *const lwp_name __diagused = curlwp->l_name;
		thread->tpt_lwp_savedname = curlwp->l_name;
		curlwp->l_name = job->job_name;
		lwp_unlock(curlwp);

		mutex_spin_exit(&pool->tp_lock);

		SDT_PROBE2(sdt, kernel, threadpool, thread__job, pool, job);

		/* Run the job. */
		(*job->job_fn)(job);

		/* lwp name restored in threadpool_job_done(). */
		KASSERTMSG((curlwp->l_name == lwp_name),
		    "someone forgot to call threadpool_job_done()!");

		/*
		 * We can compare pointers, but we can no longer dereference
		 * job after this because threadpool_job_done() drops the
		 * last reference on the job while the job is locked.
		 */

		mutex_spin_enter(&pool->tp_lock);
		KASSERT(thread->tpt_job == job);
		thread->tpt_job = NULL;
		TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread, tpt_entry);
	}
	threadpool_rele(pool);
	mutex_spin_exit(&pool->tp_lock);

	SDT_PROBE1(sdt, kernel, threadpool, thread__exit, pool);

	KASSERT(!cv_has_waiters(&thread->tpt_cv));
	cv_destroy(&thread->tpt_cv);
	pool_cache_put(threadpool_thread_pc, thread);
	kthread_exit(0);
}