1/* $NetBSD: task.c,v 1.1 2024/02/18 20:57:50 christos Exp $ */ 2 3/* 4 * Copyright (C) Internet Systems Consortium, Inc. ("ISC") 5 * 6 * SPDX-License-Identifier: MPL-2.0 7 * 8 * This Source Code Form is subject to the terms of the Mozilla Public 9 * License, v. 2.0. If a copy of the MPL was not distributed with this 10 * file, you can obtain one at https://mozilla.org/MPL/2.0/. 11 * 12 * See the COPYRIGHT file distributed with this work for additional 13 * information regarding copyright ownership. 14 */ 15 16/*! \file */ 17 18/* 19 * XXXRTH Need to document the states a task can be in, and the rules 20 * for changing states. 21 */ 22 23#include <stdbool.h> 24#include <unistd.h> 25 26#include <isc/app.h> 27#include <isc/atomic.h> 28#include <isc/condition.h> 29#include <isc/event.h> 30#include <isc/log.h> 31#include <isc/magic.h> 32#include <isc/mem.h> 33#include <isc/once.h> 34#include <isc/platform.h> 35#include <isc/print.h> 36#include <isc/random.h> 37#include <isc/refcount.h> 38#include <isc/string.h> 39#include <isc/task.h> 40#include <isc/thread.h> 41#include <isc/time.h> 42#include <isc/util.h> 43 44#ifdef HAVE_LIBXML2 45#include <libxml/xmlwriter.h> 46#define ISC_XMLCHAR (const xmlChar *) 47#endif /* HAVE_LIBXML2 */ 48 49#ifdef HAVE_JSON_C 50#include <json_object.h> 51#endif /* HAVE_JSON_C */ 52 53#include "task_p.h" 54 55/* 56 * Task manager is built around 'as little locking as possible' concept. 57 * Each thread has his own queue of tasks to be run, if a task is in running 58 * state it will stay on the runner it's currently on, if a task is in idle 59 * state it can be woken up on a specific runner with isc_task_sendto - that 60 * helps with data locality on CPU. 61 * 62 * To make load even some tasks (from task pools) are bound to specific 63 * queues using isc_task_create_bound. This way load balancing between 64 * CPUs/queues happens on the higher layer. 65 */ 66 67#ifdef ISC_TASK_TRACE 68#define XTRACE(m) \ 69 fprintf(stderr, "task %p thread %zu: %s\n", task, isc_tid_v, (m)) 70#define XTTRACE(t, m) \ 71 fprintf(stderr, "task %p thread %zu: %s\n", (t), isc_tid_v, (m)) 72#define XTHREADTRACE(m) fprintf(stderr, "thread %zu: %s\n", isc_tid_v, (m)) 73#else /* ifdef ISC_TASK_TRACE */ 74#define XTRACE(m) 75#define XTTRACE(t, m) 76#define XTHREADTRACE(m) 77#endif /* ifdef ISC_TASK_TRACE */ 78 79/*** 80 *** Types. 81 ***/ 82 83typedef enum { 84 task_state_idle, /* not doing anything, events queue empty */ 85 task_state_ready, /* waiting in worker's queue */ 86 task_state_paused, /* not running, paused */ 87 task_state_pausing, /* running, waiting to be paused */ 88 task_state_running, /* actively processing events */ 89 task_state_done /* shutting down, no events or references */ 90} task_state_t; 91 92#if defined(HAVE_LIBXML2) || defined(HAVE_JSON_C) 93static const char *statenames[] = { 94 "idle", "ready", "paused", "pausing", "running", "done", 95}; 96#endif /* if defined(HAVE_LIBXML2) || defined(HAVE_JSON_C) */ 97 98#define TASK_MAGIC ISC_MAGIC('T', 'A', 'S', 'K') 99#define VALID_TASK(t) ISC_MAGIC_VALID(t, TASK_MAGIC) 100 101struct isc_task { 102 /* Not locked. */ 103 unsigned int magic; 104 isc_taskmgr_t *manager; 105 isc_mutex_t lock; 106 /* Locked by task lock. */ 107 int threadid; 108 task_state_t state; 109 int pause_cnt; 110 isc_refcount_t references; 111 isc_refcount_t running; 112 isc_eventlist_t events; 113 isc_eventlist_t on_shutdown; 114 unsigned int nevents; 115 unsigned int quantum; 116 isc_stdtime_t now; 117 isc_time_t tnow; 118 char name[16]; 119 void *tag; 120 bool bound; 121 /* Protected by atomics */ 122 atomic_bool shuttingdown; 123 atomic_bool privileged; 124 /* Locked by task manager lock. */ 125 LINK(isc_task_t) link; 126}; 127 128#define TASK_SHUTTINGDOWN(t) (atomic_load_acquire(&(t)->shuttingdown)) 129#define TASK_PRIVILEGED(t) (atomic_load_acquire(&(t)->privileged)) 130 131#define TASK_MANAGER_MAGIC ISC_MAGIC('T', 'S', 'K', 'M') 132#define VALID_MANAGER(m) ISC_MAGIC_VALID(m, TASK_MANAGER_MAGIC) 133 134struct isc_taskmgr { 135 /* Not locked. */ 136 unsigned int magic; 137 isc_refcount_t references; 138 isc_mem_t *mctx; 139 isc_mutex_t lock; 140 atomic_uint_fast32_t tasks_count; 141 isc_nm_t *netmgr; 142 143 /* Locked by task manager lock. */ 144 unsigned int default_quantum; 145 LIST(isc_task_t) tasks; 146 atomic_uint_fast32_t mode; 147 atomic_bool exclusive_req; 148 bool exiting; 149 isc_task_t *excl; 150}; 151 152#define DEFAULT_DEFAULT_QUANTUM 25 153 154/*% 155 * The following are intended for internal use (indicated by "isc__" 156 * prefix) but are not declared as static, allowing direct access from 157 * unit tests etc. 158 */ 159 160bool 161isc_task_purgeevent(isc_task_t *task, isc_event_t *event); 162void 163isc_taskmgr_setexcltask(isc_taskmgr_t *mgr, isc_task_t *task); 164isc_result_t 165isc_taskmgr_excltask(isc_taskmgr_t *mgr, isc_task_t **taskp); 166 167/*** 168 *** Tasks. 169 ***/ 170 171static void 172task_finished(isc_task_t *task) { 173 isc_taskmgr_t *manager = task->manager; 174 isc_mem_t *mctx = manager->mctx; 175 REQUIRE(EMPTY(task->events)); 176 REQUIRE(task->nevents == 0); 177 REQUIRE(EMPTY(task->on_shutdown)); 178 REQUIRE(task->state == task_state_done); 179 180 XTRACE("task_finished"); 181 182 isc_refcount_destroy(&task->running); 183 isc_refcount_destroy(&task->references); 184 185 LOCK(&manager->lock); 186 UNLINK(manager->tasks, task, link); 187 atomic_fetch_sub(&manager->tasks_count, 1); 188 UNLOCK(&manager->lock); 189 190 isc_mutex_destroy(&task->lock); 191 task->magic = 0; 192 isc_mem_put(mctx, task, sizeof(*task)); 193 194 isc_taskmgr_detach(&manager); 195} 196 197isc_result_t 198isc_task_create(isc_taskmgr_t *manager, unsigned int quantum, 199 isc_task_t **taskp) { 200 return (isc_task_create_bound(manager, quantum, taskp, -1)); 201} 202 203isc_result_t 204isc_task_create_bound(isc_taskmgr_t *manager, unsigned int quantum, 205 isc_task_t **taskp, int threadid) { 206 isc_task_t *task = NULL; 207 bool exiting; 208 209 REQUIRE(VALID_MANAGER(manager)); 210 REQUIRE(taskp != NULL && *taskp == NULL); 211 212 XTRACE("isc_task_create"); 213 214 task = isc_mem_get(manager->mctx, sizeof(*task)); 215 *task = (isc_task_t){ 0 }; 216 217 isc_taskmgr_attach(manager, &task->manager); 218 219 if (threadid == -1) { 220 /* 221 * Task is not pinned to a queue, it's threadid will be 222 * chosen when first task will be sent to it - either 223 * randomly or specified by isc_task_sendto. 224 */ 225 task->bound = false; 226 task->threadid = -1; 227 } else { 228 /* 229 * Task is pinned to a queue, it'll always be run 230 * by a specific thread. 231 */ 232 task->bound = true; 233 task->threadid = threadid; 234 } 235 236 isc_mutex_init(&task->lock); 237 task->state = task_state_idle; 238 task->pause_cnt = 0; 239 240 isc_refcount_init(&task->references, 1); 241 isc_refcount_init(&task->running, 0); 242 INIT_LIST(task->events); 243 INIT_LIST(task->on_shutdown); 244 task->nevents = 0; 245 task->quantum = (quantum > 0) ? quantum : manager->default_quantum; 246 atomic_init(&task->shuttingdown, false); 247 atomic_init(&task->privileged, false); 248 task->now = 0; 249 isc_time_settoepoch(&task->tnow); 250 memset(task->name, 0, sizeof(task->name)); 251 task->tag = NULL; 252 INIT_LINK(task, link); 253 task->magic = TASK_MAGIC; 254 255 LOCK(&manager->lock); 256 exiting = manager->exiting; 257 if (!exiting) { 258 APPEND(manager->tasks, task, link); 259 atomic_fetch_add(&manager->tasks_count, 1); 260 } 261 UNLOCK(&manager->lock); 262 263 if (exiting) { 264 isc_refcount_destroy(&task->running); 265 isc_refcount_decrement(&task->references); 266 isc_refcount_destroy(&task->references); 267 isc_mutex_destroy(&task->lock); 268 isc_taskmgr_detach(&task->manager); 269 isc_mem_put(manager->mctx, task, sizeof(*task)); 270 return (ISC_R_SHUTTINGDOWN); 271 } 272 273 *taskp = task; 274 275 return (ISC_R_SUCCESS); 276} 277 278void 279isc_task_attach(isc_task_t *source, isc_task_t **targetp) { 280 /* 281 * Attach *targetp to source. 282 */ 283 284 REQUIRE(VALID_TASK(source)); 285 REQUIRE(targetp != NULL && *targetp == NULL); 286 287 XTTRACE(source, "isc_task_attach"); 288 289 isc_refcount_increment(&source->references); 290 291 *targetp = source; 292} 293 294static bool 295task_shutdown(isc_task_t *task) { 296 bool was_idle = false; 297 isc_event_t *event, *prev; 298 299 /* 300 * Caller must be holding the task's lock. 301 */ 302 303 XTRACE("task_shutdown"); 304 305 if (atomic_compare_exchange_strong(&task->shuttingdown, 306 &(bool){ false }, true)) 307 { 308 XTRACE("shutting down"); 309 if (task->state == task_state_idle) { 310 INSIST(EMPTY(task->events)); 311 task->state = task_state_ready; 312 was_idle = true; 313 } 314 INSIST(task->state == task_state_ready || 315 task->state == task_state_paused || 316 task->state == task_state_pausing || 317 task->state == task_state_running); 318 319 /* 320 * Note that we post shutdown events LIFO. 321 */ 322 for (event = TAIL(task->on_shutdown); event != NULL; 323 event = prev) 324 { 325 prev = PREV(event, ev_link); 326 DEQUEUE(task->on_shutdown, event, ev_link); 327 ENQUEUE(task->events, event, ev_link); 328 task->nevents++; 329 } 330 } 331 332 return (was_idle); 333} 334 335/* 336 * Moves a task onto the appropriate run queue. 337 * 338 * Caller must NOT hold queue lock. 339 */ 340static void 341task_ready(isc_task_t *task) { 342 isc_taskmgr_t *manager = task->manager; 343 REQUIRE(VALID_MANAGER(manager)); 344 345 XTRACE("task_ready"); 346 347 isc_refcount_increment0(&task->running); 348 LOCK(&task->lock); 349 isc_nm_task_enqueue(manager->netmgr, task, task->threadid); 350 UNLOCK(&task->lock); 351} 352 353void 354isc_task_ready(isc_task_t *task) { 355 task_ready(task); 356} 357 358static bool 359task_detach(isc_task_t *task) { 360 /* 361 * Caller must be holding the task lock. 362 */ 363 364 XTRACE("detach"); 365 366 if (isc_refcount_decrement(&task->references) == 1 && 367 task->state == task_state_idle) 368 { 369 INSIST(EMPTY(task->events)); 370 /* 371 * There are no references to this task, and no 372 * pending events. We could try to optimize and 373 * either initiate shutdown or clean up the task, 374 * depending on its state, but it's easier to just 375 * make the task ready and allow run() or the event 376 * loop to deal with shutting down and termination. 377 */ 378 task->state = task_state_ready; 379 return (true); 380 } 381 382 return (false); 383} 384 385void 386isc_task_detach(isc_task_t **taskp) { 387 isc_task_t *task; 388 bool was_idle; 389 390 /* 391 * Detach *taskp from its task. 392 */ 393 394 REQUIRE(taskp != NULL); 395 task = *taskp; 396 REQUIRE(VALID_TASK(task)); 397 398 XTRACE("isc_task_detach"); 399 400 LOCK(&task->lock); 401 was_idle = task_detach(task); 402 UNLOCK(&task->lock); 403 404 if (was_idle) { 405 task_ready(task); 406 } 407 408 *taskp = NULL; 409} 410 411static bool 412task_send(isc_task_t *task, isc_event_t **eventp, int c) { 413 bool was_idle = false; 414 isc_event_t *event; 415 416 /* 417 * Caller must be holding the task lock. 418 */ 419 420 REQUIRE(eventp != NULL); 421 event = *eventp; 422 *eventp = NULL; 423 REQUIRE(event != NULL); 424 REQUIRE(event->ev_type > 0); 425 REQUIRE(task->state != task_state_done); 426 REQUIRE(!ISC_LINK_LINKED(event, ev_ratelink)); 427 428 XTRACE("task_send"); 429 430 if (task->bound) { 431 c = task->threadid; 432 } else if (c < 0) { 433 c = -1; 434 } 435 436 if (task->state == task_state_idle) { 437 was_idle = true; 438 task->threadid = c; 439 INSIST(EMPTY(task->events)); 440 task->state = task_state_ready; 441 } 442 INSIST(task->state == task_state_ready || 443 task->state == task_state_running || 444 task->state == task_state_paused || 445 task->state == task_state_pausing); 446 ENQUEUE(task->events, event, ev_link); 447 task->nevents++; 448 449 return (was_idle); 450} 451 452void 453isc_task_send(isc_task_t *task, isc_event_t **eventp) { 454 isc_task_sendto(task, eventp, -1); 455} 456 457void 458isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp) { 459 isc_task_sendtoanddetach(taskp, eventp, -1); 460} 461 462void 463isc_task_sendto(isc_task_t *task, isc_event_t **eventp, int c) { 464 bool was_idle; 465 466 /* 467 * Send '*event' to 'task'. 468 */ 469 470 REQUIRE(VALID_TASK(task)); 471 XTRACE("isc_task_send"); 472 473 /* 474 * We're trying hard to hold locks for as short a time as possible. 475 * We're also trying to hold as few locks as possible. This is why 476 * some processing is deferred until after the lock is released. 477 */ 478 LOCK(&task->lock); 479 was_idle = task_send(task, eventp, c); 480 UNLOCK(&task->lock); 481 482 if (was_idle) { 483 /* 484 * We need to add this task to the ready queue. 485 * 486 * We've waited until now to do it because making a task 487 * ready requires locking the manager. If we tried to do 488 * this while holding the task lock, we could deadlock. 489 * 490 * We've changed the state to ready, so no one else will 491 * be trying to add this task to the ready queue. The 492 * only way to leave the ready state is by executing the 493 * task. It thus doesn't matter if events are added, 494 * removed, or a shutdown is started in the interval 495 * between the time we released the task lock, and the time 496 * we add the task to the ready queue. 497 */ 498 task_ready(task); 499 } 500} 501 502void 503isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c) { 504 bool idle1, idle2; 505 isc_task_t *task; 506 507 /* 508 * Send '*event' to '*taskp' and then detach '*taskp' from its 509 * task. 510 */ 511 512 REQUIRE(taskp != NULL); 513 task = *taskp; 514 REQUIRE(VALID_TASK(task)); 515 XTRACE("isc_task_sendanddetach"); 516 517 LOCK(&task->lock); 518 idle1 = task_send(task, eventp, c); 519 idle2 = task_detach(task); 520 UNLOCK(&task->lock); 521 522 /* 523 * If idle1, then idle2 shouldn't be true as well since we're holding 524 * the task lock, and thus the task cannot switch from ready back to 525 * idle. 526 */ 527 INSIST(!(idle1 && idle2)); 528 529 if (idle1 || idle2) { 530 task_ready(task); 531 } 532 533 *taskp = NULL; 534} 535 536#define PURGE_OK(event) (((event)->ev_attributes & ISC_EVENTATTR_NOPURGE) == 0) 537 538static unsigned int 539dequeue_events(isc_task_t *task, void *sender, isc_eventtype_t first, 540 isc_eventtype_t last, void *tag, isc_eventlist_t *events, 541 bool purging) { 542 isc_event_t *event, *next_event; 543 unsigned int count = 0; 544 545 REQUIRE(VALID_TASK(task)); 546 REQUIRE(last >= first); 547 548 XTRACE("dequeue_events"); 549 550 /* 551 * Events matching 'sender', whose type is >= first and <= last, and 552 * whose tag is 'tag' will be dequeued. If 'purging', matching events 553 * which are marked as unpurgable will not be dequeued. 554 * 555 * sender == NULL means "any sender", and tag == NULL means "any tag". 556 */ 557 558 LOCK(&task->lock); 559 560 for (event = HEAD(task->events); event != NULL; event = next_event) { 561 next_event = NEXT(event, ev_link); 562 if (event->ev_type >= first && event->ev_type <= last && 563 (sender == NULL || event->ev_sender == sender) && 564 (tag == NULL || event->ev_tag == tag) && 565 (!purging || PURGE_OK(event))) 566 { 567 DEQUEUE(task->events, event, ev_link); 568 task->nevents--; 569 ENQUEUE(*events, event, ev_link); 570 count++; 571 } 572 } 573 574 UNLOCK(&task->lock); 575 576 return (count); 577} 578 579unsigned int 580isc_task_purgerange(isc_task_t *task, void *sender, isc_eventtype_t first, 581 isc_eventtype_t last, void *tag) { 582 unsigned int count; 583 isc_eventlist_t events; 584 isc_event_t *event, *next_event; 585 REQUIRE(VALID_TASK(task)); 586 587 /* 588 * Purge events from a task's event queue. 589 */ 590 591 XTRACE("isc_task_purgerange"); 592 593 ISC_LIST_INIT(events); 594 595 count = dequeue_events(task, sender, first, last, tag, &events, true); 596 597 for (event = HEAD(events); event != NULL; event = next_event) { 598 next_event = NEXT(event, ev_link); 599 ISC_LIST_UNLINK(events, event, ev_link); 600 isc_event_free(&event); 601 } 602 603 /* 604 * Note that purging never changes the state of the task. 605 */ 606 607 return (count); 608} 609 610unsigned int 611isc_task_purge(isc_task_t *task, void *sender, isc_eventtype_t type, 612 void *tag) { 613 /* 614 * Purge events from a task's event queue. 615 */ 616 REQUIRE(VALID_TASK(task)); 617 618 XTRACE("isc_task_purge"); 619 620 return (isc_task_purgerange(task, sender, type, type, tag)); 621} 622 623bool 624isc_task_purgeevent(isc_task_t *task, isc_event_t *event) { 625 bool found = false; 626 627 /* 628 * Purge 'event' from a task's event queue. 629 */ 630 631 REQUIRE(VALID_TASK(task)); 632 633 /* 634 * If 'event' is on the task's event queue, it will be purged, 635 * unless it is marked as unpurgeable. 'event' does not have to be 636 * on the task's event queue; in fact, it can even be an invalid 637 * pointer. Purging only occurs if the event is actually on the task's 638 * event queue. 639 * 640 * Purging never changes the state of the task. 641 */ 642 643 LOCK(&task->lock); 644 if (ISC_LINK_LINKED(event, ev_link)) { 645 DEQUEUE(task->events, event, ev_link); 646 task->nevents--; 647 found = true; 648 } 649 UNLOCK(&task->lock); 650 651 if (!found) { 652 return (false); 653 } 654 655 isc_event_free(&event); 656 657 return (true); 658} 659 660unsigned int 661isc_task_unsendrange(isc_task_t *task, void *sender, isc_eventtype_t first, 662 isc_eventtype_t last, void *tag, isc_eventlist_t *events) { 663 /* 664 * Remove events from a task's event queue. 665 */ 666 REQUIRE(VALID_TASK(task)); 667 668 XTRACE("isc_task_unsendrange"); 669 670 return (dequeue_events(task, sender, first, last, tag, events, false)); 671} 672 673unsigned int 674isc_task_unsend(isc_task_t *task, void *sender, isc_eventtype_t type, void *tag, 675 isc_eventlist_t *events) { 676 /* 677 * Remove events from a task's event queue. 678 */ 679 680 XTRACE("isc_task_unsend"); 681 682 return (dequeue_events(task, sender, type, type, tag, events, false)); 683} 684 685isc_result_t 686isc_task_onshutdown(isc_task_t *task, isc_taskaction_t action, void *arg) { 687 bool disallowed = false; 688 isc_result_t result = ISC_R_SUCCESS; 689 isc_event_t *event; 690 691 /* 692 * Send a shutdown event with action 'action' and argument 'arg' when 693 * 'task' is shutdown. 694 */ 695 696 REQUIRE(VALID_TASK(task)); 697 REQUIRE(action != NULL); 698 699 event = isc_event_allocate(task->manager->mctx, NULL, 700 ISC_TASKEVENT_SHUTDOWN, action, arg, 701 sizeof(*event)); 702 703 if (TASK_SHUTTINGDOWN(task)) { 704 disallowed = true; 705 result = ISC_R_SHUTTINGDOWN; 706 } else { 707 LOCK(&task->lock); 708 ENQUEUE(task->on_shutdown, event, ev_link); 709 UNLOCK(&task->lock); 710 } 711 712 if (disallowed) { 713 isc_mem_put(task->manager->mctx, event, sizeof(*event)); 714 } 715 716 return (result); 717} 718 719void 720isc_task_shutdown(isc_task_t *task) { 721 bool was_idle; 722 723 /* 724 * Shutdown 'task'. 725 */ 726 727 REQUIRE(VALID_TASK(task)); 728 729 LOCK(&task->lock); 730 was_idle = task_shutdown(task); 731 UNLOCK(&task->lock); 732 733 if (was_idle) { 734 task_ready(task); 735 } 736} 737 738void 739isc_task_destroy(isc_task_t **taskp) { 740 /* 741 * Destroy '*taskp'. 742 */ 743 744 REQUIRE(taskp != NULL); 745 746 isc_task_shutdown(*taskp); 747 isc_task_detach(taskp); 748} 749 750void 751isc_task_setname(isc_task_t *task, const char *name, void *tag) { 752 /* 753 * Name 'task'. 754 */ 755 756 REQUIRE(VALID_TASK(task)); 757 758 LOCK(&task->lock); 759 strlcpy(task->name, name, sizeof(task->name)); 760 task->tag = tag; 761 UNLOCK(&task->lock); 762} 763 764const char * 765isc_task_getname(isc_task_t *task) { 766 REQUIRE(VALID_TASK(task)); 767 768 return (task->name); 769} 770 771void * 772isc_task_gettag(isc_task_t *task) { 773 REQUIRE(VALID_TASK(task)); 774 775 return (task->tag); 776} 777 778void 779isc_task_getcurrenttime(isc_task_t *task, isc_stdtime_t *t) { 780 REQUIRE(VALID_TASK(task)); 781 REQUIRE(t != NULL); 782 783 LOCK(&task->lock); 784 *t = task->now; 785 UNLOCK(&task->lock); 786} 787 788void 789isc_task_getcurrenttimex(isc_task_t *task, isc_time_t *t) { 790 REQUIRE(VALID_TASK(task)); 791 REQUIRE(t != NULL); 792 793 LOCK(&task->lock); 794 *t = task->tnow; 795 UNLOCK(&task->lock); 796} 797 798isc_nm_t * 799isc_task_getnetmgr(isc_task_t *task) { 800 REQUIRE(VALID_TASK(task)); 801 802 return (task->manager->netmgr); 803} 804 805void 806isc_task_setquantum(isc_task_t *task, unsigned int quantum) { 807 REQUIRE(VALID_TASK(task)); 808 809 LOCK(&task->lock); 810 task->quantum = (quantum > 0) ? quantum 811 : task->manager->default_quantum; 812 UNLOCK(&task->lock); 813} 814 815/*** 816 *** Task Manager. 817 ***/ 818 819static isc_result_t 820task_run(isc_task_t *task) { 821 unsigned int dispatch_count = 0; 822 bool finished = false; 823 isc_event_t *event = NULL; 824 isc_result_t result = ISC_R_SUCCESS; 825 uint32_t quantum; 826 827 REQUIRE(VALID_TASK(task)); 828 829 LOCK(&task->lock); 830 quantum = task->quantum; 831 832 /* 833 * It is possible because that we have a paused task in the queue - it 834 * might have been paused in the meantime and we never hold both queue 835 * and task lock to avoid deadlocks, just bail then. 836 */ 837 if (task->state != task_state_ready) { 838 goto done; 839 } 840 841 INSIST(task->state == task_state_ready); 842 task->state = task_state_running; 843 XTRACE("running"); 844 XTRACE(task->name); 845 TIME_NOW(&task->tnow); 846 task->now = isc_time_seconds(&task->tnow); 847 848 while (true) { 849 if (!EMPTY(task->events)) { 850 event = HEAD(task->events); 851 DEQUEUE(task->events, event, ev_link); 852 task->nevents--; 853 854 /* 855 * Execute the event action. 856 */ 857 XTRACE("execute action"); 858 XTRACE(task->name); 859 if (event->ev_action != NULL) { 860 UNLOCK(&task->lock); 861 (event->ev_action)(task, event); 862 LOCK(&task->lock); 863 } 864 XTRACE("execution complete"); 865 dispatch_count++; 866 } 867 868 if (isc_refcount_current(&task->references) == 0 && 869 EMPTY(task->events) && !TASK_SHUTTINGDOWN(task)) 870 { 871 /* 872 * There are no references and no pending events for 873 * this task, which means it will not become runnable 874 * again via an external action (such as sending an 875 * event or detaching). 876 * 877 * We initiate shutdown to prevent it from becoming a 878 * zombie. 879 * 880 * We do this here instead of in the "if 881 * EMPTY(task->events)" block below because: 882 * 883 * If we post no shutdown events, we want the task 884 * to finish. 885 * 886 * If we did post shutdown events, will still want 887 * the task's quantum to be applied. 888 */ 889 INSIST(!task_shutdown(task)); 890 } 891 892 if (EMPTY(task->events)) { 893 /* 894 * Nothing else to do for this task right now. 895 */ 896 XTRACE("empty"); 897 if (isc_refcount_current(&task->references) == 0 && 898 TASK_SHUTTINGDOWN(task)) 899 { 900 /* 901 * The task is done. 902 */ 903 XTRACE("done"); 904 task->state = task_state_done; 905 } else { 906 if (task->state == task_state_running) { 907 XTRACE("idling"); 908 task->state = task_state_idle; 909 } else if (task->state == task_state_pausing) { 910 XTRACE("pausing"); 911 task->state = task_state_paused; 912 } 913 } 914 break; 915 } else if (task->state == task_state_pausing) { 916 /* 917 * We got a pause request on this task, stop working on 918 * it and switch the state to paused. 919 */ 920 XTRACE("pausing"); 921 task->state = task_state_paused; 922 break; 923 } else if (dispatch_count >= quantum) { 924 /* 925 * Our quantum has expired, but there is more work to be 926 * done. We'll requeue it to the ready queue later. 927 * 928 * We don't check quantum until dispatching at least one 929 * event, so the minimum quantum is one. 930 */ 931 XTRACE("quantum"); 932 task->state = task_state_ready; 933 result = ISC_R_QUOTA; 934 break; 935 } 936 } 937 938done: 939 if (isc_refcount_decrement(&task->running) == 1 && 940 task->state == task_state_done) 941 { 942 finished = true; 943 } 944 UNLOCK(&task->lock); 945 946 if (finished) { 947 task_finished(task); 948 } 949 950 return (result); 951} 952 953isc_result_t 954isc_task_run(isc_task_t *task) { 955 return (task_run(task)); 956} 957 958static void 959manager_free(isc_taskmgr_t *manager) { 960 isc_refcount_destroy(&manager->references); 961 isc_nm_detach(&manager->netmgr); 962 963 isc_mutex_destroy(&manager->lock); 964 manager->magic = 0; 965 isc_mem_putanddetach(&manager->mctx, manager, sizeof(*manager)); 966} 967 968void 969isc_taskmgr_attach(isc_taskmgr_t *source, isc_taskmgr_t **targetp) { 970 REQUIRE(VALID_MANAGER(source)); 971 REQUIRE(targetp != NULL && *targetp == NULL); 972 973 isc_refcount_increment(&source->references); 974 975 *targetp = source; 976} 977 978void 979isc_taskmgr_detach(isc_taskmgr_t **managerp) { 980 REQUIRE(managerp != NULL); 981 REQUIRE(VALID_MANAGER(*managerp)); 982 983 isc_taskmgr_t *manager = *managerp; 984 *managerp = NULL; 985 986 if (isc_refcount_decrement(&manager->references) == 1) { 987 manager_free(manager); 988 } 989} 990 991isc_result_t 992isc__taskmgr_create(isc_mem_t *mctx, unsigned int default_quantum, isc_nm_t *nm, 993 isc_taskmgr_t **managerp) { 994 isc_taskmgr_t *manager; 995 996 /* 997 * Create a new task manager. 998 */ 999 1000 REQUIRE(managerp != NULL && *managerp == NULL); 1001 REQUIRE(nm != NULL); 1002 1003 manager = isc_mem_get(mctx, sizeof(*manager)); 1004 *manager = (isc_taskmgr_t){ .magic = TASK_MANAGER_MAGIC }; 1005 1006 isc_mutex_init(&manager->lock); 1007 1008 if (default_quantum == 0) { 1009 default_quantum = DEFAULT_DEFAULT_QUANTUM; 1010 } 1011 manager->default_quantum = default_quantum; 1012 1013 if (nm != NULL) { 1014 isc_nm_attach(nm, &manager->netmgr); 1015 } 1016 1017 INIT_LIST(manager->tasks); 1018 atomic_init(&manager->mode, isc_taskmgrmode_normal); 1019 atomic_init(&manager->exclusive_req, false); 1020 atomic_init(&manager->tasks_count, 0); 1021 1022 isc_mem_attach(mctx, &manager->mctx); 1023 1024 isc_refcount_init(&manager->references, 1); 1025 1026 *managerp = manager; 1027 1028 return (ISC_R_SUCCESS); 1029} 1030 1031void 1032isc__taskmgr_shutdown(isc_taskmgr_t *manager) { 1033 isc_task_t *task; 1034 1035 REQUIRE(VALID_MANAGER(manager)); 1036 1037 XTHREADTRACE("isc_taskmgr_shutdown"); 1038 /* 1039 * Only one non-worker thread may ever call this routine. 1040 * If a worker thread wants to initiate shutdown of the 1041 * task manager, it should ask some non-worker thread to call 1042 * isc_taskmgr_destroy(), e.g. by signalling a condition variable 1043 * that the startup thread is sleeping on. 1044 */ 1045 1046 /* 1047 * Unlike elsewhere, we're going to hold this lock a long time. 1048 * We need to do so, because otherwise the list of tasks could 1049 * change while we were traversing it. 1050 * 1051 * This is also the only function where we will hold both the 1052 * task manager lock and a task lock at the same time. 1053 */ 1054 LOCK(&manager->lock); 1055 if (manager->excl != NULL) { 1056 isc_task_detach((isc_task_t **)&manager->excl); 1057 } 1058 1059 /* 1060 * Make sure we only get called once. 1061 */ 1062 INSIST(manager->exiting == false); 1063 manager->exiting = true; 1064 1065 /* 1066 * Post shutdown event(s) to every task (if they haven't already been 1067 * posted). 1068 */ 1069 for (task = HEAD(manager->tasks); task != NULL; task = NEXT(task, link)) 1070 { 1071 bool was_idle; 1072 1073 LOCK(&task->lock); 1074 was_idle = task_shutdown(task); 1075 if (was_idle) { 1076 task->threadid = 0; 1077 } 1078 UNLOCK(&task->lock); 1079 1080 if (was_idle) { 1081 task_ready(task); 1082 } 1083 } 1084 1085 UNLOCK(&manager->lock); 1086} 1087 1088void 1089isc__taskmgr_destroy(isc_taskmgr_t **managerp) { 1090 REQUIRE(managerp != NULL && VALID_MANAGER(*managerp)); 1091 XTHREADTRACE("isc_taskmgr_destroy"); 1092 1093#ifdef ISC_TASK_TRACE 1094 int counter = 0; 1095 while (isc_refcount_current(&(*managerp)->references) > 1 && 1096 counter++ < 1000) 1097 { 1098 usleep(10 * 1000); 1099 } 1100 INSIST(counter < 1000); 1101#else 1102 while (isc_refcount_current(&(*managerp)->references) > 1) { 1103 usleep(10 * 1000); 1104 } 1105#endif 1106 1107 isc_taskmgr_detach(managerp); 1108} 1109 1110void 1111isc_taskmgr_setexcltask(isc_taskmgr_t *mgr, isc_task_t *task) { 1112 REQUIRE(VALID_MANAGER(mgr)); 1113 REQUIRE(VALID_TASK(task)); 1114 1115 LOCK(&task->lock); 1116 REQUIRE(task->threadid == 0); 1117 UNLOCK(&task->lock); 1118 1119 LOCK(&mgr->lock); 1120 if (mgr->excl != NULL) { 1121 isc_task_detach(&mgr->excl); 1122 } 1123 isc_task_attach(task, &mgr->excl); 1124 UNLOCK(&mgr->lock); 1125} 1126 1127isc_result_t 1128isc_taskmgr_excltask(isc_taskmgr_t *mgr, isc_task_t **taskp) { 1129 isc_result_t result; 1130 1131 REQUIRE(VALID_MANAGER(mgr)); 1132 REQUIRE(taskp != NULL && *taskp == NULL); 1133 1134 LOCK(&mgr->lock); 1135 if (mgr->excl != NULL) { 1136 isc_task_attach(mgr->excl, taskp); 1137 result = ISC_R_SUCCESS; 1138 } else if (mgr->exiting) { 1139 result = ISC_R_SHUTTINGDOWN; 1140 } else { 1141 result = ISC_R_NOTFOUND; 1142 } 1143 UNLOCK(&mgr->lock); 1144 1145 return (result); 1146} 1147 1148isc_result_t 1149isc_task_beginexclusive(isc_task_t *task) { 1150 isc_taskmgr_t *manager; 1151 1152 REQUIRE(VALID_TASK(task)); 1153 1154 manager = task->manager; 1155 1156 REQUIRE(task->state == task_state_running); 1157 1158 LOCK(&manager->lock); 1159 REQUIRE(task == manager->excl || 1160 (manager->exiting && manager->excl == NULL)); 1161 UNLOCK(&manager->lock); 1162 1163 if (!atomic_compare_exchange_strong(&manager->exclusive_req, 1164 &(bool){ false }, true)) 1165 { 1166 return (ISC_R_LOCKBUSY); 1167 } 1168 1169 if (isc_log_wouldlog(isc_lctx, ISC_LOG_DEBUG(1))) { 1170 isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, 1171 ISC_LOGMODULE_OTHER, ISC_LOG_DEBUG(1), 1172 "exclusive task mode: %s", "starting"); 1173 } 1174 1175 isc_nm_pause(manager->netmgr); 1176 1177 if (isc_log_wouldlog(isc_lctx, ISC_LOG_DEBUG(1))) { 1178 isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, 1179 ISC_LOGMODULE_OTHER, ISC_LOG_DEBUG(1), 1180 "exclusive task mode: %s", "started"); 1181 } 1182 1183 return (ISC_R_SUCCESS); 1184} 1185 1186void 1187isc_task_endexclusive(isc_task_t *task) { 1188 isc_taskmgr_t *manager; 1189 1190 REQUIRE(VALID_TASK(task)); 1191 REQUIRE(task->state == task_state_running); 1192 manager = task->manager; 1193 1194 if (isc_log_wouldlog(isc_lctx, ISC_LOG_DEBUG(1))) { 1195 isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, 1196 ISC_LOGMODULE_OTHER, ISC_LOG_DEBUG(1), 1197 "exclusive task mode: %s", "ending"); 1198 } 1199 1200 isc_nm_resume(manager->netmgr); 1201 1202 if (isc_log_wouldlog(isc_lctx, ISC_LOG_DEBUG(1))) { 1203 isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, 1204 ISC_LOGMODULE_OTHER, ISC_LOG_DEBUG(1), 1205 "exclusive task mode: %s", "ended"); 1206 } 1207 1208 REQUIRE(atomic_compare_exchange_strong(&manager->exclusive_req, 1209 &(bool){ true }, false)); 1210} 1211 1212void 1213isc_task_pause(isc_task_t *task) { 1214 REQUIRE(VALID_TASK(task)); 1215 1216 LOCK(&task->lock); 1217 task->pause_cnt++; 1218 if (task->pause_cnt > 1) { 1219 /* 1220 * Someone already paused this task, just increase 1221 * the number of pausing clients. 1222 */ 1223 UNLOCK(&task->lock); 1224 return; 1225 } 1226 1227 INSIST(task->state == task_state_idle || 1228 task->state == task_state_ready || 1229 task->state == task_state_running); 1230 if (task->state == task_state_running) { 1231 task->state = task_state_pausing; 1232 } else { 1233 task->state = task_state_paused; 1234 } 1235 UNLOCK(&task->lock); 1236} 1237 1238void 1239isc_task_unpause(isc_task_t *task) { 1240 bool was_idle = false; 1241 1242 REQUIRE(VALID_TASK(task)); 1243 1244 LOCK(&task->lock); 1245 task->pause_cnt--; 1246 INSIST(task->pause_cnt >= 0); 1247 if (task->pause_cnt > 0) { 1248 UNLOCK(&task->lock); 1249 return; 1250 } 1251 1252 INSIST(task->state == task_state_paused || 1253 task->state == task_state_pausing); 1254 /* If the task was pausing we can't reschedule it */ 1255 if (task->state == task_state_pausing) { 1256 task->state = task_state_running; 1257 } else { 1258 task->state = task_state_idle; 1259 } 1260 if (task->state == task_state_idle && !EMPTY(task->events)) { 1261 task->state = task_state_ready; 1262 was_idle = true; 1263 } 1264 UNLOCK(&task->lock); 1265 1266 if (was_idle) { 1267 task_ready(task); 1268 } 1269} 1270 1271void 1272isc_taskmgr_setmode(isc_taskmgr_t *manager, isc_taskmgrmode_t mode) { 1273 atomic_store(&manager->mode, mode); 1274} 1275 1276isc_taskmgrmode_t 1277isc_taskmgr_mode(isc_taskmgr_t *manager) { 1278 return (atomic_load(&manager->mode)); 1279} 1280 1281void 1282isc_task_setprivilege(isc_task_t *task, bool priv) { 1283 REQUIRE(VALID_TASK(task)); 1284 1285 atomic_store_release(&task->privileged, priv); 1286} 1287 1288bool 1289isc_task_getprivilege(isc_task_t *task) { 1290 REQUIRE(VALID_TASK(task)); 1291 1292 return (TASK_PRIVILEGED(task)); 1293} 1294 1295bool 1296isc_task_privileged(isc_task_t *task) { 1297 REQUIRE(VALID_TASK(task)); 1298 1299 return (isc_taskmgr_mode(task->manager) && TASK_PRIVILEGED(task)); 1300} 1301 1302bool 1303isc_task_exiting(isc_task_t *task) { 1304 REQUIRE(VALID_TASK(task)); 1305 1306 return (TASK_SHUTTINGDOWN(task)); 1307} 1308 1309#ifdef HAVE_LIBXML2 1310#define TRY0(a) \ 1311 do { \ 1312 xmlrc = (a); \ 1313 if (xmlrc < 0) \ 1314 goto error; \ 1315 } while (0) 1316int 1317isc_taskmgr_renderxml(isc_taskmgr_t *mgr, void *writer0) { 1318 isc_task_t *task = NULL; 1319 int xmlrc; 1320 xmlTextWriterPtr writer = (xmlTextWriterPtr)writer0; 1321 1322 LOCK(&mgr->lock); 1323 1324 /* 1325 * Write out the thread-model, and some details about each depending 1326 * on which type is enabled. 1327 */ 1328 TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "thread-model")); 1329 TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "type")); 1330 TRY0(xmlTextWriterWriteString(writer, ISC_XMLCHAR "threaded")); 1331 TRY0(xmlTextWriterEndElement(writer)); /* type */ 1332 1333 TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "default-quantum")); 1334 TRY0(xmlTextWriterWriteFormatString(writer, "%d", 1335 mgr->default_quantum)); 1336 TRY0(xmlTextWriterEndElement(writer)); /* default-quantum */ 1337 1338 TRY0(xmlTextWriterEndElement(writer)); /* thread-model */ 1339 1340 TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks")); 1341 task = ISC_LIST_HEAD(mgr->tasks); 1342 while (task != NULL) { 1343 LOCK(&task->lock); 1344 TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "task")); 1345 1346 if (task->name[0] != 0) { 1347 TRY0(xmlTextWriterStartElement(writer, 1348 ISC_XMLCHAR "name")); 1349 TRY0(xmlTextWriterWriteFormatString(writer, "%s", 1350 task->name)); 1351 TRY0(xmlTextWriterEndElement(writer)); /* name */ 1352 } 1353 1354 TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "reference" 1355 "s")); 1356 TRY0(xmlTextWriterWriteFormatString( 1357 writer, "%" PRIuFAST32, 1358 isc_refcount_current(&task->references))); 1359 TRY0(xmlTextWriterEndElement(writer)); /* references */ 1360 1361 TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "id")); 1362 TRY0(xmlTextWriterWriteFormatString(writer, "%p", task)); 1363 TRY0(xmlTextWriterEndElement(writer)); /* id */ 1364 1365 TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "state")); 1366 TRY0(xmlTextWriterWriteFormatString(writer, "%s", 1367 statenames[task->state])); 1368 TRY0(xmlTextWriterEndElement(writer)); /* state */ 1369 1370 TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "quantum")); 1371 TRY0(xmlTextWriterWriteFormatString(writer, "%d", 1372 task->quantum)); 1373 TRY0(xmlTextWriterEndElement(writer)); /* quantum */ 1374 1375 TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "events")); 1376 TRY0(xmlTextWriterWriteFormatString(writer, "%d", 1377 task->nevents)); 1378 TRY0(xmlTextWriterEndElement(writer)); /* events */ 1379 1380 TRY0(xmlTextWriterEndElement(writer)); 1381 1382 UNLOCK(&task->lock); 1383 task = ISC_LIST_NEXT(task, link); 1384 } 1385 TRY0(xmlTextWriterEndElement(writer)); /* tasks */ 1386 1387error: 1388 if (task != NULL) { 1389 UNLOCK(&task->lock); 1390 } 1391 UNLOCK(&mgr->lock); 1392 1393 return (xmlrc); 1394} 1395#endif /* HAVE_LIBXML2 */ 1396 1397#ifdef HAVE_JSON_C 1398#define CHECKMEM(m) \ 1399 do { \ 1400 if (m == NULL) { \ 1401 result = ISC_R_NOMEMORY; \ 1402 goto error; \ 1403 } \ 1404 } while (0) 1405 1406isc_result_t 1407isc_taskmgr_renderjson(isc_taskmgr_t *mgr, void *tasks0) { 1408 isc_result_t result = ISC_R_SUCCESS; 1409 isc_task_t *task = NULL; 1410 json_object *obj = NULL, *array = NULL, *taskobj = NULL; 1411 json_object *tasks = (json_object *)tasks0; 1412 1413 LOCK(&mgr->lock); 1414 1415 /* 1416 * Write out the thread-model, and some details about each depending 1417 * on which type is enabled. 1418 */ 1419 obj = json_object_new_string("threaded"); 1420 CHECKMEM(obj); 1421 json_object_object_add(tasks, "thread-model", obj); 1422 1423 obj = json_object_new_int(mgr->default_quantum); 1424 CHECKMEM(obj); 1425 json_object_object_add(tasks, "default-quantum", obj); 1426 1427 array = json_object_new_array(); 1428 CHECKMEM(array); 1429 1430 for (task = ISC_LIST_HEAD(mgr->tasks); task != NULL; 1431 task = ISC_LIST_NEXT(task, link)) 1432 { 1433 char buf[255]; 1434 1435 LOCK(&task->lock); 1436 1437 taskobj = json_object_new_object(); 1438 CHECKMEM(taskobj); 1439 json_object_array_add(array, taskobj); 1440 1441 snprintf(buf, sizeof(buf), "%p", task); 1442 obj = json_object_new_string(buf); 1443 CHECKMEM(obj); 1444 json_object_object_add(taskobj, "id", obj); 1445 1446 if (task->name[0] != 0) { 1447 obj = json_object_new_string(task->name); 1448 CHECKMEM(obj); 1449 json_object_object_add(taskobj, "name", obj); 1450 } 1451 1452 obj = json_object_new_int( 1453 isc_refcount_current(&task->references)); 1454 CHECKMEM(obj); 1455 json_object_object_add(taskobj, "references", obj); 1456 1457 obj = json_object_new_string(statenames[task->state]); 1458 CHECKMEM(obj); 1459 json_object_object_add(taskobj, "state", obj); 1460 1461 obj = json_object_new_int(task->quantum); 1462 CHECKMEM(obj); 1463 json_object_object_add(taskobj, "quantum", obj); 1464 1465 obj = json_object_new_int(task->nevents); 1466 CHECKMEM(obj); 1467 json_object_object_add(taskobj, "events", obj); 1468 1469 UNLOCK(&task->lock); 1470 } 1471 1472 json_object_object_add(tasks, "tasks", array); 1473 array = NULL; 1474 result = ISC_R_SUCCESS; 1475 1476error: 1477 if (array != NULL) { 1478 json_object_put(array); 1479 } 1480 1481 if (task != NULL) { 1482 UNLOCK(&task->lock); 1483 } 1484 UNLOCK(&mgr->lock); 1485 1486 return (result); 1487} 1488#endif /* ifdef HAVE_JSON_C */ 1489