1/* $NetBSD: task.c,v 1.19 2024/02/21 22:52:29 christos Exp $ */ 2 3/* 4 * Copyright (C) Internet Systems Consortium, Inc. ("ISC") 5 * 6 * SPDX-License-Identifier: MPL-2.0 7 * 8 * This Source Code Form is subject to the terms of the Mozilla Public 9 * License, v. 2.0. If a copy of the MPL was not distributed with this 10 * file, you can obtain one at https://mozilla.org/MPL/2.0/. 11 * 12 * See the COPYRIGHT file distributed with this work for additional 13 * information regarding copyright ownership. 14 */ 15 16/*! \file */ 17 18/* 19 * XXXRTH Need to document the states a task can be in, and the rules 20 * for changing states. 21 */ 22 23#include <stdbool.h> 24#include <unistd.h> 25 26#include <isc/app.h> 27#include <isc/atomic.h> 28#include <isc/condition.h> 29#include <isc/event.h> 30#include <isc/log.h> 31#include <isc/magic.h> 32#include <isc/mem.h> 33#include <isc/once.h> 34#include <isc/print.h> 35#include <isc/random.h> 36#include <isc/refcount.h> 37#include <isc/string.h> 38#include <isc/task.h> 39#include <isc/thread.h> 40#include <isc/time.h> 41#include <isc/util.h> 42 43#ifdef HAVE_LIBXML2 44#include <libxml/xmlwriter.h> 45#define ISC_XMLCHAR (const xmlChar *) 46#endif /* HAVE_LIBXML2 */ 47 48#ifdef HAVE_JSON_C 49#include <json_object.h> 50#endif /* HAVE_JSON_C */ 51 52#include "task_p.h" 53 54/* 55 * Task manager is built around 'as little locking as possible' concept. 56 * Each thread has his own queue of tasks to be run, if a task is in running 57 * state it will stay on the runner it's currently on, if a task is in idle 58 * state it can be woken up on a specific runner with isc_task_sendto - that 59 * helps with data locality on CPU. 60 * 61 * To make load even some tasks (from task pools) are bound to specific 62 * queues using isc_task_create_bound. This way load balancing between 63 * CPUs/queues happens on the higher layer. 
 */

/*
 * Debug tracing helpers.  When ISC_TASK_TRACE is defined they print a
 * message to stderr tagged with the current thread id (isc_tid_v);
 * otherwise they expand to nothing.
 *
 * XTRACE(m) refers to a variable named 'task' in the enclosing scope,
 * XTTRACE(t, m) takes the task explicitly, and XTHREADTRACE(m) prints
 * only the thread id.
 */
#ifdef ISC_TASK_TRACE
#define XTRACE(m) \
	fprintf(stderr, "task %p thread %zu: %s\n", task, isc_tid_v, (m))
#define XTTRACE(t, m) \
	fprintf(stderr, "task %p thread %zu: %s\n", (t), isc_tid_v, (m))
#define XTHREADTRACE(m) fprintf(stderr, "thread %zu: %s\n", isc_tid_v, (m))
#else /* ifdef ISC_TASK_TRACE */
#define XTRACE(m)
#define XTTRACE(t, m)
#define XTHREADTRACE(m)
#endif /* ifdef ISC_TASK_TRACE */

/***
 *** Types.
 ***/

/*
 * Life-cycle state of a task.
 */
typedef enum {
	task_state_idle,    /* not doing anything, events queue empty */
	task_state_ready,   /* waiting in worker's queue */
	task_state_running, /* actively processing events */
	task_state_done	    /* shutting down, no events or references */
} task_state_t;

#if defined(HAVE_LIBXML2) || defined(HAVE_JSON_C)
/*
 * Printable names for task_state_t, indexed by the enum value; the
 * order must match the enum above.  Only needed by the statistics
 * renderers.
 */
static const char *statenames[] = {
	"idle",
	"ready",
	"running",
	"done",
};
#endif /* if defined(HAVE_LIBXML2) || defined(HAVE_JSON_C) */

#define TASK_MAGIC    ISC_MAGIC('T', 'A', 'S', 'K')
#define VALID_TASK(t) ISC_MAGIC_VALID(t, TASK_MAGIC)

/*
 * A task: a queue of events plus the bookkeeping needed to run them.
 */
struct isc_task {
	/* Not locked. */
	unsigned int magic;	/* TASK_MAGIC while the task is valid */
	isc_taskmgr_t *manager; /* owning manager (the task holds a ref) */
	isc_mutex_t lock;
	/* Locked by task lock. */
	int threadid;		    /* queue to run on; -1 means "not chosen" */
	task_state_t state;
	isc_refcount_t references;  /* external attachments */
	isc_refcount_t running;	    /* outstanding enqueues on a run queue */
	isc_eventlist_t events;	    /* pending events, FIFO */
	isc_eventlist_t on_shutdown; /* events to post when shutdown starts */
	unsigned int nevents;	    /* number of entries on 'events' */
	unsigned int quantum;	    /* max events dispatched per run */
	isc_stdtime_t now;	    /* seconds at the start of the last run */
	isc_time_t tnow;	    /* time at the start of the last run */
	char name[16];
	void *tag;
	bool bound;		    /* true if pinned to 'threadid' */
	/* Protected by atomics */
	atomic_bool shuttingdown;
	atomic_bool privileged;
	/* Locked by task manager lock. */
	LINK(isc_task_t) link; /* membership in manager->tasks */
};

/* Atomic (acquire) reads of the two flag fields above. */
#define TASK_SHUTTINGDOWN(t) (atomic_load_acquire(&(t)->shuttingdown))
#define TASK_PRIVILEGED(t)   (atomic_load_acquire(&(t)->privileged))

#define TASK_MANAGER_MAGIC ISC_MAGIC('T', 'S', 'K', 'M')
#define VALID_MANAGER(m)   ISC_MAGIC_VALID(m, TASK_MANAGER_MAGIC)

/*
 * The task manager: owns the list of all tasks and the network manager
 * reference used to schedule them.
 */
struct isc_taskmgr {
	/* Not locked. */
	unsigned int magic; /* TASK_MANAGER_MAGIC while valid */
	isc_refcount_t references;
	isc_mem_t *mctx;
	isc_mutex_t lock;
	atomic_uint_fast32_t tasks_count; /* number of entries on 'tasks' */
	isc_nm_t *netmgr;

	/* Locked by task manager lock. */
	unsigned int default_quantum; /* used when a task asks for quantum 0 */
	LIST(isc_task_t) tasks;
	atomic_uint_fast32_t mode;  /* isc_taskmgrmode_t, accessed atomically */
	atomic_bool exclusive_req;  /* set while exclusive mode is held */
	bool exiting;		    /* set once by isc__taskmgr_shutdown() */
	isc_task_t *excl;	    /* the task allowed to go exclusive */
};

/* Quantum used when the manager is created with default_quantum == 0. */
#define DEFAULT_DEFAULT_QUANTUM 25

/*%
 * The following are intended for internal use (indicated by "isc__"
 * prefix) but are not declared as static, allowing direct access from
 * unit tests etc.
 *
 * NOTE(review): the three prototypes below do not actually carry the
 * "isc__" prefix; presumably they are also declared in a public
 * header -- confirm.
 */

bool
isc_task_purgeevent(isc_task_t *task, isc_event_t *event);
void
isc_taskmgr_setexcltask(isc_taskmgr_t *mgr, isc_task_t *task);
isc_result_t
isc_taskmgr_excltask(isc_taskmgr_t *mgr, isc_task_t **taskp);

/***
 *** Tasks.
168 ***/ 169 170static void 171task_finished(isc_task_t *task) { 172 isc_taskmgr_t *manager = task->manager; 173 isc_mem_t *mctx = manager->mctx; 174 REQUIRE(EMPTY(task->events)); 175 REQUIRE(task->nevents == 0); 176 REQUIRE(EMPTY(task->on_shutdown)); 177 REQUIRE(task->state == task_state_done); 178 179 XTRACE("task_finished"); 180 181 isc_refcount_destroy(&task->running); 182 isc_refcount_destroy(&task->references); 183 184 LOCK(&manager->lock); 185 UNLINK(manager->tasks, task, link); 186 atomic_fetch_sub(&manager->tasks_count, 1); 187 UNLOCK(&manager->lock); 188 189 isc_mutex_destroy(&task->lock); 190 task->magic = 0; 191 isc_mem_put(mctx, task, sizeof(*task)); 192 193 isc_taskmgr_detach(&manager); 194} 195 196isc_result_t 197isc_task_create(isc_taskmgr_t *manager, unsigned int quantum, 198 isc_task_t **taskp) { 199 return (isc_task_create_bound(manager, quantum, taskp, -1)); 200} 201 202isc_result_t 203isc_task_create_bound(isc_taskmgr_t *manager, unsigned int quantum, 204 isc_task_t **taskp, int threadid) { 205 isc_task_t *task = NULL; 206 bool exiting; 207 208 REQUIRE(VALID_MANAGER(manager)); 209 REQUIRE(taskp != NULL && *taskp == NULL); 210 211 XTRACE("isc_task_create"); 212 213 task = isc_mem_get(manager->mctx, sizeof(*task)); 214 *task = (isc_task_t){ 0 }; 215 216 isc_taskmgr_attach(manager, &task->manager); 217 218 if (threadid == -1) { 219 /* 220 * Task is not pinned to a queue, it's threadid will be 221 * chosen when first task will be sent to it - either 222 * randomly or specified by isc_task_sendto. 223 */ 224 task->bound = false; 225 task->threadid = -1; 226 } else { 227 /* 228 * Task is pinned to a queue, it'll always be run 229 * by a specific thread. 
230 */ 231 task->bound = true; 232 task->threadid = threadid; 233 } 234 235 isc_mutex_init(&task->lock); 236 task->state = task_state_idle; 237 238 isc_refcount_init(&task->references, 1); 239 isc_refcount_init(&task->running, 0); 240 INIT_LIST(task->events); 241 INIT_LIST(task->on_shutdown); 242 task->nevents = 0; 243 task->quantum = (quantum > 0) ? quantum : manager->default_quantum; 244 atomic_init(&task->shuttingdown, false); 245 atomic_init(&task->privileged, false); 246 task->now = 0; 247 isc_time_settoepoch(&task->tnow); 248 memset(task->name, 0, sizeof(task->name)); 249 task->tag = NULL; 250 INIT_LINK(task, link); 251 task->magic = TASK_MAGIC; 252 253 LOCK(&manager->lock); 254 exiting = manager->exiting; 255 if (!exiting) { 256 APPEND(manager->tasks, task, link); 257 atomic_fetch_add(&manager->tasks_count, 1); 258 } 259 UNLOCK(&manager->lock); 260 261 if (exiting) { 262 isc_refcount_destroy(&task->running); 263 isc_refcount_decrement(&task->references); 264 isc_refcount_destroy(&task->references); 265 isc_mutex_destroy(&task->lock); 266 isc_taskmgr_detach(&task->manager); 267 isc_mem_put(manager->mctx, task, sizeof(*task)); 268 return (ISC_R_SHUTTINGDOWN); 269 } 270 271 *taskp = task; 272 273 return (ISC_R_SUCCESS); 274} 275 276void 277isc_task_attach(isc_task_t *source, isc_task_t **targetp) { 278 /* 279 * Attach *targetp to source. 280 */ 281 282 REQUIRE(VALID_TASK(source)); 283 REQUIRE(targetp != NULL && *targetp == NULL); 284 285 XTTRACE(source, "isc_task_attach"); 286 287 isc_refcount_increment(&source->references); 288 289 *targetp = source; 290} 291 292static bool 293task_shutdown(isc_task_t *task) { 294 bool was_idle = false; 295 isc_event_t *event, *prev; 296 297 /* 298 * Caller must be holding the task's lock. 
299 */ 300 301 XTRACE("task_shutdown"); 302 303 if (atomic_compare_exchange_strong(&task->shuttingdown, 304 &(bool){ false }, true)) 305 { 306 XTRACE("shutting down"); 307 if (task->state == task_state_idle) { 308 INSIST(EMPTY(task->events)); 309 task->state = task_state_ready; 310 was_idle = true; 311 } 312 INSIST(task->state == task_state_ready || 313 task->state == task_state_running); 314 315 /* 316 * Note that we post shutdown events LIFO. 317 */ 318 for (event = TAIL(task->on_shutdown); event != NULL; 319 event = prev) 320 { 321 prev = PREV(event, ev_link); 322 DEQUEUE(task->on_shutdown, event, ev_link); 323 ENQUEUE(task->events, event, ev_link); 324 task->nevents++; 325 } 326 } 327 328 return (was_idle); 329} 330 331/* 332 * Moves a task onto the appropriate run queue. 333 * 334 * Caller must NOT hold queue lock. 335 */ 336static void 337task_ready(isc_task_t *task) { 338 isc_taskmgr_t *manager = task->manager; 339 REQUIRE(VALID_MANAGER(manager)); 340 341 XTRACE("task_ready"); 342 343 isc_refcount_increment0(&task->running); 344 LOCK(&task->lock); 345 isc_nm_task_enqueue(manager->netmgr, task, task->threadid); 346 UNLOCK(&task->lock); 347} 348 349void 350isc_task_ready(isc_task_t *task) { 351 task_ready(task); 352} 353 354static bool 355task_detach(isc_task_t *task) { 356 /* 357 * Caller must be holding the task lock. 358 */ 359 360 XTRACE("detach"); 361 362 if (isc_refcount_decrement(&task->references) == 1 && 363 task->state == task_state_idle) 364 { 365 INSIST(EMPTY(task->events)); 366 /* 367 * There are no references to this task, and no 368 * pending events. We could try to optimize and 369 * either initiate shutdown or clean up the task, 370 * depending on its state, but it's easier to just 371 * make the task ready and allow run() or the event 372 * loop to deal with shutting down and termination. 
373 */ 374 task->state = task_state_ready; 375 return (true); 376 } 377 378 return (false); 379} 380 381void 382isc_task_detach(isc_task_t **taskp) { 383 isc_task_t *task; 384 bool was_idle; 385 386 /* 387 * Detach *taskp from its task. 388 */ 389 390 REQUIRE(taskp != NULL); 391 task = *taskp; 392 REQUIRE(VALID_TASK(task)); 393 394 XTRACE("isc_task_detach"); 395 396 LOCK(&task->lock); 397 was_idle = task_detach(task); 398 UNLOCK(&task->lock); 399 400 if (was_idle) { 401 task_ready(task); 402 } 403 404 *taskp = NULL; 405} 406 407static bool 408task_send(isc_task_t *task, isc_event_t **eventp, int c) { 409 bool was_idle = false; 410 isc_event_t *event; 411 412 /* 413 * Caller must be holding the task lock. 414 */ 415 416 REQUIRE(eventp != NULL); 417 event = *eventp; 418 *eventp = NULL; 419 REQUIRE(event != NULL); 420 REQUIRE(event->ev_type > 0); 421 REQUIRE(task->state != task_state_done); 422 REQUIRE(!ISC_LINK_LINKED(event, ev_ratelink)); 423 424 XTRACE("task_send"); 425 426 if (task->bound) { 427 c = task->threadid; 428 } else if (c < 0) { 429 c = -1; 430 } 431 432 if (task->state == task_state_idle) { 433 was_idle = true; 434 task->threadid = c; 435 INSIST(EMPTY(task->events)); 436 task->state = task_state_ready; 437 } 438 INSIST(task->state == task_state_ready || 439 task->state == task_state_running); 440 ENQUEUE(task->events, event, ev_link); 441 task->nevents++; 442 443 return (was_idle); 444} 445 446void 447isc_task_send(isc_task_t *task, isc_event_t **eventp) { 448 isc_task_sendto(task, eventp, -1); 449} 450 451void 452isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp) { 453 isc_task_sendtoanddetach(taskp, eventp, -1); 454} 455 456void 457isc_task_sendto(isc_task_t *task, isc_event_t **eventp, int c) { 458 bool was_idle; 459 460 /* 461 * Send '*event' to 'task'. 462 */ 463 464 REQUIRE(VALID_TASK(task)); 465 XTRACE("isc_task_send"); 466 467 /* 468 * We're trying hard to hold locks for as short a time as possible. 
469 * We're also trying to hold as few locks as possible. This is why 470 * some processing is deferred until after the lock is released. 471 */ 472 LOCK(&task->lock); 473 was_idle = task_send(task, eventp, c); 474 UNLOCK(&task->lock); 475 476 if (was_idle) { 477 /* 478 * We need to add this task to the ready queue. 479 * 480 * We've waited until now to do it because making a task 481 * ready requires locking the manager. If we tried to do 482 * this while holding the task lock, we could deadlock. 483 * 484 * We've changed the state to ready, so no one else will 485 * be trying to add this task to the ready queue. The 486 * only way to leave the ready state is by executing the 487 * task. It thus doesn't matter if events are added, 488 * removed, or a shutdown is started in the interval 489 * between the time we released the task lock, and the time 490 * we add the task to the ready queue. 491 */ 492 task_ready(task); 493 } 494} 495 496void 497isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c) { 498 bool idle1, idle2; 499 isc_task_t *task; 500 501 /* 502 * Send '*event' to '*taskp' and then detach '*taskp' from its 503 * task. 504 */ 505 506 REQUIRE(taskp != NULL); 507 task = *taskp; 508 REQUIRE(VALID_TASK(task)); 509 XTRACE("isc_task_sendanddetach"); 510 511 LOCK(&task->lock); 512 idle1 = task_send(task, eventp, c); 513 idle2 = task_detach(task); 514 UNLOCK(&task->lock); 515 516 /* 517 * If idle1, then idle2 shouldn't be true as well since we're holding 518 * the task lock, and thus the task cannot switch from ready back to 519 * idle. 
520 */ 521 INSIST(!(idle1 && idle2)); 522 523 if (idle1 || idle2) { 524 task_ready(task); 525 } 526 527 *taskp = NULL; 528} 529 530#define PURGE_OK(event) (((event)->ev_attributes & ISC_EVENTATTR_NOPURGE) == 0) 531 532static unsigned int 533dequeue_events(isc_task_t *task, void *sender, isc_eventtype_t first, 534 isc_eventtype_t last, void *tag, isc_eventlist_t *events, 535 bool purging) { 536 isc_event_t *event, *next_event; 537 unsigned int count = 0; 538 539 REQUIRE(VALID_TASK(task)); 540 REQUIRE(last >= first); 541 542 XTRACE("dequeue_events"); 543 544 /* 545 * Events matching 'sender', whose type is >= first and <= last, and 546 * whose tag is 'tag' will be dequeued. If 'purging', matching events 547 * which are marked as unpurgable will not be dequeued. 548 * 549 * sender == NULL means "any sender", and tag == NULL means "any tag". 550 */ 551 552 LOCK(&task->lock); 553 554 for (event = HEAD(task->events); event != NULL; event = next_event) { 555 next_event = NEXT(event, ev_link); 556 if (event->ev_type >= first && event->ev_type <= last && 557 (sender == NULL || event->ev_sender == sender) && 558 (tag == NULL || event->ev_tag == tag) && 559 (!purging || PURGE_OK(event))) 560 { 561 DEQUEUE(task->events, event, ev_link); 562 task->nevents--; 563 ENQUEUE(*events, event, ev_link); 564 count++; 565 } 566 } 567 568 UNLOCK(&task->lock); 569 570 return (count); 571} 572 573unsigned int 574isc_task_purgerange(isc_task_t *task, void *sender, isc_eventtype_t first, 575 isc_eventtype_t last, void *tag) { 576 unsigned int count; 577 isc_eventlist_t events; 578 isc_event_t *event, *next_event; 579 REQUIRE(VALID_TASK(task)); 580 581 /* 582 * Purge events from a task's event queue. 
583 */ 584 585 XTRACE("isc_task_purgerange"); 586 587 ISC_LIST_INIT(events); 588 589 count = dequeue_events(task, sender, first, last, tag, &events, true); 590 591 for (event = HEAD(events); event != NULL; event = next_event) { 592 next_event = NEXT(event, ev_link); 593 ISC_LIST_UNLINK(events, event, ev_link); 594 isc_event_free(&event); 595 } 596 597 /* 598 * Note that purging never changes the state of the task. 599 */ 600 601 return (count); 602} 603 604unsigned int 605isc_task_purge(isc_task_t *task, void *sender, isc_eventtype_t type, 606 void *tag) { 607 /* 608 * Purge events from a task's event queue. 609 */ 610 REQUIRE(VALID_TASK(task)); 611 612 XTRACE("isc_task_purge"); 613 614 return (isc_task_purgerange(task, sender, type, type, tag)); 615} 616 617bool 618isc_task_purgeevent(isc_task_t *task, isc_event_t *event) { 619 bool found = false; 620 621 /* 622 * Purge 'event' from a task's event queue. 623 */ 624 625 REQUIRE(VALID_TASK(task)); 626 627 /* 628 * If 'event' is on the task's event queue, it will be purged, 629 * unless it is marked as unpurgeable. 'event' does not have to be 630 * on the task's event queue; in fact, it can even be an invalid 631 * pointer. Purging only occurs if the event is actually on the task's 632 * event queue. 633 * 634 * Purging never changes the state of the task. 635 */ 636 637 LOCK(&task->lock); 638 if (ISC_LINK_LINKED(event, ev_link)) { 639 DEQUEUE(task->events, event, ev_link); 640 task->nevents--; 641 found = true; 642 } 643 UNLOCK(&task->lock); 644 645 if (!found) { 646 return (false); 647 } 648 649 isc_event_free(&event); 650 651 return (true); 652} 653 654unsigned int 655isc_task_unsend(isc_task_t *task, void *sender, isc_eventtype_t type, void *tag, 656 isc_eventlist_t *events) { 657 /* 658 * Remove events from a task's event queue. 
659 */ 660 661 XTRACE("isc_task_unsend"); 662 663 return (dequeue_events(task, sender, type, type, tag, events, false)); 664} 665 666isc_result_t 667isc_task_onshutdown(isc_task_t *task, isc_taskaction_t action, void *arg) { 668 bool disallowed = false; 669 isc_result_t result = ISC_R_SUCCESS; 670 isc_event_t *event; 671 672 /* 673 * Send a shutdown event with action 'action' and argument 'arg' when 674 * 'task' is shutdown. 675 */ 676 677 REQUIRE(VALID_TASK(task)); 678 REQUIRE(action != NULL); 679 680 event = isc_event_allocate(task->manager->mctx, NULL, 681 ISC_TASKEVENT_SHUTDOWN, action, arg, 682 sizeof(*event)); 683 684 if (TASK_SHUTTINGDOWN(task)) { 685 disallowed = true; 686 result = ISC_R_SHUTTINGDOWN; 687 } else { 688 LOCK(&task->lock); 689 ENQUEUE(task->on_shutdown, event, ev_link); 690 UNLOCK(&task->lock); 691 } 692 693 if (disallowed) { 694 isc_mem_put(task->manager->mctx, event, sizeof(*event)); 695 } 696 697 return (result); 698} 699 700void 701isc_task_shutdown(isc_task_t *task) { 702 bool was_idle; 703 704 /* 705 * Shutdown 'task'. 706 */ 707 708 REQUIRE(VALID_TASK(task)); 709 710 LOCK(&task->lock); 711 was_idle = task_shutdown(task); 712 UNLOCK(&task->lock); 713 714 if (was_idle) { 715 task_ready(task); 716 } 717} 718 719void 720isc_task_destroy(isc_task_t **taskp) { 721 /* 722 * Destroy '*taskp'. 723 */ 724 725 REQUIRE(taskp != NULL); 726 727 isc_task_shutdown(*taskp); 728 isc_task_detach(taskp); 729} 730 731void 732isc_task_setname(isc_task_t *task, const char *name, void *tag) { 733 /* 734 * Name 'task'. 
735 */ 736 737 REQUIRE(VALID_TASK(task)); 738 739 LOCK(&task->lock); 740 strlcpy(task->name, name, sizeof(task->name)); 741 task->tag = tag; 742 UNLOCK(&task->lock); 743} 744 745const char * 746isc_task_getname(isc_task_t *task) { 747 REQUIRE(VALID_TASK(task)); 748 749 return (task->name); 750} 751 752void * 753isc_task_gettag(isc_task_t *task) { 754 REQUIRE(VALID_TASK(task)); 755 756 return (task->tag); 757} 758 759isc_nm_t * 760isc_task_getnetmgr(isc_task_t *task) { 761 REQUIRE(VALID_TASK(task)); 762 763 return (task->manager->netmgr); 764} 765 766void 767isc_task_setquantum(isc_task_t *task, unsigned int quantum) { 768 REQUIRE(VALID_TASK(task)); 769 770 LOCK(&task->lock); 771 task->quantum = (quantum > 0) ? quantum 772 : task->manager->default_quantum; 773 UNLOCK(&task->lock); 774} 775 776/*** 777 *** Task Manager. 778 ***/ 779 780static isc_result_t 781task_run(isc_task_t *task) { 782 unsigned int dispatch_count = 0; 783 bool finished = false; 784 isc_event_t *event = NULL; 785 isc_result_t result = ISC_R_SUCCESS; 786 uint32_t quantum; 787 788 REQUIRE(VALID_TASK(task)); 789 790 LOCK(&task->lock); 791 quantum = task->quantum; 792 793 if (task->state != task_state_ready) { 794 goto done; 795 } 796 797 INSIST(task->state == task_state_ready); 798 task->state = task_state_running; 799 XTRACE("running"); 800 XTRACE(task->name); 801 TIME_NOW(&task->tnow); 802 task->now = isc_time_seconds(&task->tnow); 803 804 while (true) { 805 if (!EMPTY(task->events)) { 806 event = HEAD(task->events); 807 DEQUEUE(task->events, event, ev_link); 808 task->nevents--; 809 810 /* 811 * Execute the event action. 
812 */ 813 XTRACE("execute action"); 814 XTRACE(task->name); 815 if (event->ev_action != NULL) { 816 UNLOCK(&task->lock); 817 (event->ev_action)(task, event); 818 LOCK(&task->lock); 819 } 820 XTRACE("execution complete"); 821 dispatch_count++; 822 } 823 824 if (isc_refcount_current(&task->references) == 0 && 825 EMPTY(task->events) && !TASK_SHUTTINGDOWN(task)) 826 { 827 /* 828 * There are no references and no pending events for 829 * this task, which means it will not become runnable 830 * again via an external action (such as sending an 831 * event or detaching). 832 * 833 * We initiate shutdown to prevent it from becoming a 834 * zombie. 835 * 836 * We do this here instead of in the "if 837 * EMPTY(task->events)" block below because: 838 * 839 * If we post no shutdown events, we want the task 840 * to finish. 841 * 842 * If we did post shutdown events, will still want 843 * the task's quantum to be applied. 844 */ 845 INSIST(!task_shutdown(task)); 846 } 847 848 if (EMPTY(task->events)) { 849 /* 850 * Nothing else to do for this task right now. 851 */ 852 XTRACE("empty"); 853 if (isc_refcount_current(&task->references) == 0 && 854 TASK_SHUTTINGDOWN(task)) 855 { 856 /* 857 * The task is done. 858 */ 859 XTRACE("done"); 860 task->state = task_state_done; 861 } else if (task->state == task_state_running) { 862 XTRACE("idling"); 863 task->state = task_state_idle; 864 } 865 break; 866 } else if (dispatch_count >= quantum) { 867 /* 868 * Our quantum has expired, but there is more work to be 869 * done. We'll requeue it to the ready queue later. 870 * 871 * We don't check quantum until dispatching at least one 872 * event, so the minimum quantum is one. 
873 */ 874 XTRACE("quantum"); 875 task->state = task_state_ready; 876 result = ISC_R_QUOTA; 877 break; 878 } 879 } 880 881done: 882 if (isc_refcount_decrement(&task->running) == 1 && 883 task->state == task_state_done) 884 { 885 finished = true; 886 } 887 UNLOCK(&task->lock); 888 889 if (finished) { 890 task_finished(task); 891 } 892 893 return (result); 894} 895 896isc_result_t 897isc_task_run(isc_task_t *task) { 898 return (task_run(task)); 899} 900 901static void 902manager_free(isc_taskmgr_t *manager) { 903 isc_refcount_destroy(&manager->references); 904 isc_nm_detach(&manager->netmgr); 905 906 isc_mutex_destroy(&manager->lock); 907 manager->magic = 0; 908 isc_mem_putanddetach(&manager->mctx, manager, sizeof(*manager)); 909} 910 911void 912isc_taskmgr_attach(isc_taskmgr_t *source, isc_taskmgr_t **targetp) { 913 REQUIRE(VALID_MANAGER(source)); 914 REQUIRE(targetp != NULL && *targetp == NULL); 915 916 isc_refcount_increment(&source->references); 917 918 *targetp = source; 919} 920 921void 922isc_taskmgr_detach(isc_taskmgr_t **managerp) { 923 REQUIRE(managerp != NULL); 924 REQUIRE(VALID_MANAGER(*managerp)); 925 926 isc_taskmgr_t *manager = *managerp; 927 *managerp = NULL; 928 929 if (isc_refcount_decrement(&manager->references) == 1) { 930 manager_free(manager); 931 } 932} 933 934isc_result_t 935isc__taskmgr_create(isc_mem_t *mctx, unsigned int default_quantum, isc_nm_t *nm, 936 isc_taskmgr_t **managerp) { 937 isc_taskmgr_t *manager; 938 939 /* 940 * Create a new task manager. 
941 */ 942 943 REQUIRE(managerp != NULL && *managerp == NULL); 944 REQUIRE(nm != NULL); 945 946 manager = isc_mem_get(mctx, sizeof(*manager)); 947 *manager = (isc_taskmgr_t){ .magic = TASK_MANAGER_MAGIC }; 948 949 isc_mutex_init(&manager->lock); 950 951 if (default_quantum == 0) { 952 default_quantum = DEFAULT_DEFAULT_QUANTUM; 953 } 954 manager->default_quantum = default_quantum; 955 956 if (nm != NULL) { 957 isc_nm_attach(nm, &manager->netmgr); 958 } 959 960 INIT_LIST(manager->tasks); 961 atomic_init(&manager->mode, isc_taskmgrmode_normal); 962 atomic_init(&manager->exclusive_req, false); 963 atomic_init(&manager->tasks_count, 0); 964 965 isc_mem_attach(mctx, &manager->mctx); 966 967 isc_refcount_init(&manager->references, 1); 968 969 *managerp = manager; 970 971 return (ISC_R_SUCCESS); 972} 973 974void 975isc__taskmgr_shutdown(isc_taskmgr_t *manager) { 976 isc_task_t *task; 977 978 REQUIRE(VALID_MANAGER(manager)); 979 980 XTHREADTRACE("isc_taskmgr_shutdown"); 981 /* 982 * Only one non-worker thread may ever call this routine. 983 * If a worker thread wants to initiate shutdown of the 984 * task manager, it should ask some non-worker thread to call 985 * isc_taskmgr_destroy(), e.g. by signalling a condition variable 986 * that the startup thread is sleeping on. 987 */ 988 989 /* 990 * Unlike elsewhere, we're going to hold this lock a long time. 991 * We need to do so, because otherwise the list of tasks could 992 * change while we were traversing it. 993 * 994 * This is also the only function where we will hold both the 995 * task manager lock and a task lock at the same time. 996 */ 997 LOCK(&manager->lock); 998 if (manager->excl != NULL) { 999 isc_task_detach((isc_task_t **)&manager->excl); 1000 } 1001 1002 /* 1003 * Make sure we only get called once. 1004 */ 1005 INSIST(manager->exiting == false); 1006 manager->exiting = true; 1007 1008 /* 1009 * Post shutdown event(s) to every task (if they haven't already been 1010 * posted). 
1011 */ 1012 for (task = HEAD(manager->tasks); task != NULL; task = NEXT(task, link)) 1013 { 1014 bool was_idle; 1015 1016 LOCK(&task->lock); 1017 was_idle = task_shutdown(task); 1018 UNLOCK(&task->lock); 1019 1020 if (was_idle) { 1021 task_ready(task); 1022 } 1023 } 1024 1025 UNLOCK(&manager->lock); 1026} 1027 1028void 1029isc__taskmgr_destroy(isc_taskmgr_t **managerp) { 1030 REQUIRE(managerp != NULL && VALID_MANAGER(*managerp)); 1031 XTHREADTRACE("isc_taskmgr_destroy"); 1032 1033#ifdef ISC_TASK_TRACE 1034 int counter = 0; 1035 while (isc_refcount_current(&(*managerp)->references) > 1 && 1036 counter++ < 1000) 1037 { 1038 usleep(10 * 1000); 1039 } 1040 INSIST(counter < 1000); 1041#else 1042 while (isc_refcount_current(&(*managerp)->references) > 1) { 1043 usleep(10 * 1000); 1044 } 1045#endif 1046 1047 isc_taskmgr_detach(managerp); 1048} 1049 1050void 1051isc_taskmgr_setexcltask(isc_taskmgr_t *mgr, isc_task_t *task) { 1052 REQUIRE(VALID_MANAGER(mgr)); 1053 REQUIRE(VALID_TASK(task)); 1054 1055 LOCK(&task->lock); 1056 REQUIRE(task->threadid == 0); 1057 UNLOCK(&task->lock); 1058 1059 LOCK(&mgr->lock); 1060 if (mgr->excl != NULL) { 1061 isc_task_detach(&mgr->excl); 1062 } 1063 isc_task_attach(task, &mgr->excl); 1064 UNLOCK(&mgr->lock); 1065} 1066 1067isc_result_t 1068isc_taskmgr_excltask(isc_taskmgr_t *mgr, isc_task_t **taskp) { 1069 isc_result_t result; 1070 1071 REQUIRE(VALID_MANAGER(mgr)); 1072 REQUIRE(taskp != NULL && *taskp == NULL); 1073 1074 LOCK(&mgr->lock); 1075 if (mgr->excl != NULL) { 1076 isc_task_attach(mgr->excl, taskp); 1077 result = ISC_R_SUCCESS; 1078 } else if (mgr->exiting) { 1079 result = ISC_R_SHUTTINGDOWN; 1080 } else { 1081 result = ISC_R_NOTFOUND; 1082 } 1083 UNLOCK(&mgr->lock); 1084 1085 return (result); 1086} 1087 1088isc_result_t 1089isc_task_beginexclusive(isc_task_t *task) { 1090 isc_taskmgr_t *manager; 1091 1092 REQUIRE(VALID_TASK(task)); 1093 1094 manager = task->manager; 1095 1096 REQUIRE(task->state == task_state_running); 1097 1098 
LOCK(&manager->lock); 1099 REQUIRE(task == manager->excl || 1100 (manager->exiting && manager->excl == NULL)); 1101 UNLOCK(&manager->lock); 1102 1103 if (!atomic_compare_exchange_strong(&manager->exclusive_req, 1104 &(bool){ false }, true)) 1105 { 1106 return (ISC_R_LOCKBUSY); 1107 } 1108 1109 if (isc_log_wouldlog(isc_lctx, ISC_LOG_DEBUG(1))) { 1110 isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, 1111 ISC_LOGMODULE_OTHER, ISC_LOG_DEBUG(1), 1112 "exclusive task mode: %s", "starting"); 1113 } 1114 1115 isc_nm_pause(manager->netmgr); 1116 1117 if (isc_log_wouldlog(isc_lctx, ISC_LOG_DEBUG(1))) { 1118 isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, 1119 ISC_LOGMODULE_OTHER, ISC_LOG_DEBUG(1), 1120 "exclusive task mode: %s", "started"); 1121 } 1122 1123 return (ISC_R_SUCCESS); 1124} 1125 1126void 1127isc_task_endexclusive(isc_task_t *task) { 1128 isc_taskmgr_t *manager = NULL; 1129 1130 REQUIRE(VALID_TASK(task)); 1131 REQUIRE(task->state == task_state_running); 1132 1133 manager = task->manager; 1134 1135 if (isc_log_wouldlog(isc_lctx, ISC_LOG_DEBUG(1))) { 1136 isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, 1137 ISC_LOGMODULE_OTHER, ISC_LOG_DEBUG(1), 1138 "exclusive task mode: %s", "ending"); 1139 } 1140 1141 isc_nm_resume(manager->netmgr); 1142 1143 if (isc_log_wouldlog(isc_lctx, ISC_LOG_DEBUG(1))) { 1144 isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, 1145 ISC_LOGMODULE_OTHER, ISC_LOG_DEBUG(1), 1146 "exclusive task mode: %s", "ended"); 1147 } 1148 1149 atomic_compare_exchange_enforced(&manager->exclusive_req, 1150 &(bool){ true }, false); 1151} 1152 1153void 1154isc_taskmgr_setmode(isc_taskmgr_t *manager, isc_taskmgrmode_t mode) { 1155 atomic_store(&manager->mode, mode); 1156} 1157 1158isc_taskmgrmode_t 1159isc_taskmgr_mode(isc_taskmgr_t *manager) { 1160 return (atomic_load(&manager->mode)); 1161} 1162 1163void 1164isc_task_setprivilege(isc_task_t *task, bool priv) { 1165 REQUIRE(VALID_TASK(task)); 1166 1167 atomic_store_release(&task->privileged, priv); 1168} 1169 
1170bool 1171isc_task_getprivilege(isc_task_t *task) { 1172 REQUIRE(VALID_TASK(task)); 1173 1174 return (TASK_PRIVILEGED(task)); 1175} 1176 1177bool 1178isc_task_privileged(isc_task_t *task) { 1179 REQUIRE(VALID_TASK(task)); 1180 1181 return (isc_taskmgr_mode(task->manager) && TASK_PRIVILEGED(task)); 1182} 1183 1184bool 1185isc_task_exiting(isc_task_t *task) { 1186 REQUIRE(VALID_TASK(task)); 1187 1188 return (TASK_SHUTTINGDOWN(task)); 1189} 1190 1191#ifdef HAVE_LIBXML2 1192#define TRY0(a) \ 1193 do { \ 1194 xmlrc = (a); \ 1195 if (xmlrc < 0) \ 1196 goto error; \ 1197 } while (0) 1198int 1199isc_taskmgr_renderxml(isc_taskmgr_t *mgr, void *writer0) { 1200 isc_task_t *task = NULL; 1201 int xmlrc; 1202 xmlTextWriterPtr writer = (xmlTextWriterPtr)writer0; 1203 1204 LOCK(&mgr->lock); 1205 1206 /* 1207 * Write out the thread-model, and some details about each depending 1208 * on which type is enabled. 1209 */ 1210 TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "thread-model")); 1211 TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "type")); 1212 TRY0(xmlTextWriterWriteString(writer, ISC_XMLCHAR "threaded")); 1213 TRY0(xmlTextWriterEndElement(writer)); /* type */ 1214 1215 TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "default-quantum")); 1216 TRY0(xmlTextWriterWriteFormatString(writer, "%d", 1217 mgr->default_quantum)); 1218 TRY0(xmlTextWriterEndElement(writer)); /* default-quantum */ 1219 1220 TRY0(xmlTextWriterEndElement(writer)); /* thread-model */ 1221 1222 TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks")); 1223 task = ISC_LIST_HEAD(mgr->tasks); 1224 while (task != NULL) { 1225 LOCK(&task->lock); 1226 TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "task")); 1227 1228 if (task->name[0] != 0) { 1229 TRY0(xmlTextWriterStartElement(writer, 1230 ISC_XMLCHAR "name")); 1231 TRY0(xmlTextWriterWriteFormatString(writer, "%s", 1232 task->name)); 1233 TRY0(xmlTextWriterEndElement(writer)); /* name */ 1234 } 1235 1236 
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "reference" 1237 "s")); 1238 TRY0(xmlTextWriterWriteFormatString( 1239 writer, "%" PRIuFAST32, 1240 isc_refcount_current(&task->references))); 1241 TRY0(xmlTextWriterEndElement(writer)); /* references */ 1242 1243 TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "id")); 1244 TRY0(xmlTextWriterWriteFormatString(writer, "%p", task)); 1245 TRY0(xmlTextWriterEndElement(writer)); /* id */ 1246 1247 TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "state")); 1248 TRY0(xmlTextWriterWriteFormatString(writer, "%s", 1249 statenames[task->state])); 1250 TRY0(xmlTextWriterEndElement(writer)); /* state */ 1251 1252 TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "quantum")); 1253 TRY0(xmlTextWriterWriteFormatString(writer, "%d", 1254 task->quantum)); 1255 TRY0(xmlTextWriterEndElement(writer)); /* quantum */ 1256 1257 TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "events")); 1258 TRY0(xmlTextWriterWriteFormatString(writer, "%d", 1259 task->nevents)); 1260 TRY0(xmlTextWriterEndElement(writer)); /* events */ 1261 1262 TRY0(xmlTextWriterEndElement(writer)); 1263 1264 UNLOCK(&task->lock); 1265 task = ISC_LIST_NEXT(task, link); 1266 } 1267 TRY0(xmlTextWriterEndElement(writer)); /* tasks */ 1268 1269error: 1270 if (task != NULL) { 1271 UNLOCK(&task->lock); 1272 } 1273 UNLOCK(&mgr->lock); 1274 1275 return (xmlrc); 1276} 1277#endif /* HAVE_LIBXML2 */ 1278 1279#ifdef HAVE_JSON_C 1280#define CHECKMEM(m) \ 1281 do { \ 1282 if (m == NULL) { \ 1283 result = ISC_R_NOMEMORY; \ 1284 goto error; \ 1285 } \ 1286 } while (0) 1287 1288isc_result_t 1289isc_taskmgr_renderjson(isc_taskmgr_t *mgr, void *tasks0) { 1290 isc_result_t result = ISC_R_SUCCESS; 1291 isc_task_t *task = NULL; 1292 json_object *obj = NULL, *array = NULL, *taskobj = NULL; 1293 json_object *tasks = (json_object *)tasks0; 1294 1295 LOCK(&mgr->lock); 1296 1297 /* 1298 * Write out the thread-model, and some details about each depending 1299 * on which type is enabled. 
1300 */ 1301 obj = json_object_new_string("threaded"); 1302 CHECKMEM(obj); 1303 json_object_object_add(tasks, "thread-model", obj); 1304 1305 obj = json_object_new_int(mgr->default_quantum); 1306 CHECKMEM(obj); 1307 json_object_object_add(tasks, "default-quantum", obj); 1308 1309 array = json_object_new_array(); 1310 CHECKMEM(array); 1311 1312 for (task = ISC_LIST_HEAD(mgr->tasks); task != NULL; 1313 task = ISC_LIST_NEXT(task, link)) 1314 { 1315 char buf[255]; 1316 1317 LOCK(&task->lock); 1318 1319 taskobj = json_object_new_object(); 1320 CHECKMEM(taskobj); 1321 json_object_array_add(array, taskobj); 1322 1323 snprintf(buf, sizeof(buf), "%p", task); 1324 obj = json_object_new_string(buf); 1325 CHECKMEM(obj); 1326 json_object_object_add(taskobj, "id", obj); 1327 1328 if (task->name[0] != 0) { 1329 obj = json_object_new_string(task->name); 1330 CHECKMEM(obj); 1331 json_object_object_add(taskobj, "name", obj); 1332 } 1333 1334 obj = json_object_new_int( 1335 isc_refcount_current(&task->references)); 1336 CHECKMEM(obj); 1337 json_object_object_add(taskobj, "references", obj); 1338 1339 obj = json_object_new_string(statenames[task->state]); 1340 CHECKMEM(obj); 1341 json_object_object_add(taskobj, "state", obj); 1342 1343 obj = json_object_new_int(task->quantum); 1344 CHECKMEM(obj); 1345 json_object_object_add(taskobj, "quantum", obj); 1346 1347 obj = json_object_new_int(task->nevents); 1348 CHECKMEM(obj); 1349 json_object_object_add(taskobj, "events", obj); 1350 1351 UNLOCK(&task->lock); 1352 } 1353 1354 json_object_object_add(tasks, "tasks", array); 1355 array = NULL; 1356 result = ISC_R_SUCCESS; 1357 1358error: 1359 if (array != NULL) { 1360 json_object_put(array); 1361 } 1362 1363 if (task != NULL) { 1364 UNLOCK(&task->lock); 1365 } 1366 UNLOCK(&mgr->lock); 1367 1368 return (result); 1369} 1370#endif /* ifdef HAVE_JSON_C */ 1371