1/* 2 * Copyright (C) 2004-2008 Internet Systems Consortium, Inc. ("ISC") 3 * Copyright (C) 1998-2003 Internet Software Consortium. 4 * 5 * Permission to use, copy, modify, and/or distribute this software for any 6 * purpose with or without fee is hereby granted, provided that the above 7 * copyright notice and this permission notice appear in all copies. 8 * 9 * THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH 10 * REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY 11 * AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT, 12 * INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM 13 * LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE 14 * OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR 15 * PERFORMANCE OF THIS SOFTWARE. 16 */ 17 18/* $Id: task.c,v 1.107 2008/03/27 23:46:57 tbox Exp $ */ 19 20/*! \file 21 * \author Principal Author: Bob Halley 22 */ 23 24/* 25 * XXXRTH Need to document the states a task can be in, and the rules 26 * for changing states. 27 */ 28 29#include <config.h> 30 31#include <isc/condition.h> 32#include <isc/event.h> 33#include <isc/magic.h> 34#include <isc/mem.h> 35#include <isc/msgs.h> 36#include <isc/platform.h> 37#include <isc/string.h> 38#include <isc/task.h> 39#include <isc/thread.h> 40#include <isc/util.h> 41#include <isc/xml.h> 42 43#ifndef ISC_PLATFORM_USETHREADS 44#include "task_p.h" 45#endif /* ISC_PLATFORM_USETHREADS */ 46 47#ifdef ISC_TASK_TRACE 48#define XTRACE(m) fprintf(stderr, "task %p thread %lu: %s\n", \ 49 task, isc_thread_self(), (m)) 50#define XTTRACE(t, m) fprintf(stderr, "task %p thread %lu: %s\n", \ 51 (t), isc_thread_self(), (m)) 52#define XTHREADTRACE(m) fprintf(stderr, "thread %lu: %s\n", \ 53 isc_thread_self(), (m)) 54#else 55#define XTRACE(m) 56#define XTTRACE(t, m) 57#define XTHREADTRACE(m) 58#endif 59 60/*** 61 *** Types. 62 ***/ 63 64typedef enum { 65 task_state_idle, task_state_ready, task_state_running, 66 task_state_done 67} task_state_t; 68 69#ifdef HAVE_LIBXML2 70static const char *statenames[] = { 71 "idle", "ready", "running", "done", 72}; 73#endif 74 75#define TASK_MAGIC ISC_MAGIC('T', 'A', 'S', 'K') 76#define VALID_TASK(t) ISC_MAGIC_VALID(t, TASK_MAGIC) 77 78struct isc_task { 79 /* Not locked. */ 80 unsigned int magic; 81 isc_taskmgr_t * manager; 82 isc_mutex_t lock; 83 /* Locked by task lock. */ 84 task_state_t state; 85 unsigned int references; 86 isc_eventlist_t events; 87 isc_eventlist_t on_shutdown; 88 unsigned int quantum; 89 unsigned int flags; 90 isc_stdtime_t now; 91 char name[16]; 92 void * tag; 93 /* Locked by task manager lock. */ 94 LINK(isc_task_t) link; 95 LINK(isc_task_t) ready_link; 96}; 97 98#define TASK_F_SHUTTINGDOWN 0x01 99 100#define TASK_SHUTTINGDOWN(t) (((t)->flags & TASK_F_SHUTTINGDOWN) \ 101 != 0) 102 103#define TASK_MANAGER_MAGIC ISC_MAGIC('T', 'S', 'K', 'M') 104#define VALID_MANAGER(m) ISC_MAGIC_VALID(m, TASK_MANAGER_MAGIC) 105 106struct isc_taskmgr { 107 /* Not locked. */ 108 unsigned int magic; 109 isc_mem_t * mctx; 110 isc_mutex_t lock; 111#ifdef ISC_PLATFORM_USETHREADS 112 unsigned int workers; 113 isc_thread_t * threads; 114#endif /* ISC_PLATFORM_USETHREADS */ 115 /* Locked by task manager lock. */ 116 unsigned int default_quantum; 117 LIST(isc_task_t) tasks; 118 isc_tasklist_t ready_tasks; 119#ifdef ISC_PLATFORM_USETHREADS 120 isc_condition_t work_available; 121 isc_condition_t exclusive_granted; 122#endif /* ISC_PLATFORM_USETHREADS */ 123 unsigned int tasks_running; 124 isc_boolean_t exclusive_requested; 125 isc_boolean_t exiting; 126#ifndef ISC_PLATFORM_USETHREADS 127 unsigned int refs; 128#endif /* ISC_PLATFORM_USETHREADS */ 129}; 130 131#define DEFAULT_TASKMGR_QUANTUM 10 132#define DEFAULT_DEFAULT_QUANTUM 5 133#define FINISHED(m) ((m)->exiting && EMPTY((m)->tasks)) 134 135#ifndef ISC_PLATFORM_USETHREADS 136static isc_taskmgr_t *taskmgr = NULL; 137#endif /* ISC_PLATFORM_USETHREADS */ 138 139/*** 140 *** Tasks. 141 ***/ 142 143static void 144task_finished(isc_task_t *task) { 145 isc_taskmgr_t *manager = task->manager; 146 147 REQUIRE(EMPTY(task->events)); 148 REQUIRE(EMPTY(task->on_shutdown)); 149 REQUIRE(task->references == 0); 150 REQUIRE(task->state == task_state_done); 151 152 XTRACE("task_finished"); 153 154 LOCK(&manager->lock); 155 UNLINK(manager->tasks, task, link); 156#ifdef ISC_PLATFORM_USETHREADS 157 if (FINISHED(manager)) { 158 /* 159 * All tasks have completed and the 160 * task manager is exiting. Wake up 161 * any idle worker threads so they 162 * can exit. 163 */ 164 BROADCAST(&manager->work_available); 165 } 166#endif /* ISC_PLATFORM_USETHREADS */ 167 UNLOCK(&manager->lock); 168 169 DESTROYLOCK(&task->lock); 170 task->magic = 0; 171 isc_mem_put(manager->mctx, task, sizeof(*task)); 172} 173 174isc_result_t 175isc_task_create(isc_taskmgr_t *manager, unsigned int quantum, 176 isc_task_t **taskp) 177{ 178 isc_task_t *task; 179 isc_boolean_t exiting; 180 isc_result_t result; 181 182 REQUIRE(VALID_MANAGER(manager)); 183 REQUIRE(taskp != NULL && *taskp == NULL); 184 185 task = isc_mem_get(manager->mctx, sizeof(*task)); 186 if (task == NULL) 187 return (ISC_R_NOMEMORY); 188 XTRACE("isc_task_create"); 189 task->manager = manager; 190 result = isc_mutex_init(&task->lock); 191 if (result != ISC_R_SUCCESS) { 192 isc_mem_put(manager->mctx, task, sizeof(*task)); 193 return (result); 194 } 195 task->state = task_state_idle; 196 task->references = 1; 197 INIT_LIST(task->events); 198 INIT_LIST(task->on_shutdown); 199 task->quantum = quantum; 200 task->flags = 0; 201 task->now = 0; 202 memset(task->name, 0, sizeof(task->name)); 203 task->tag = NULL; 204 INIT_LINK(task, link); 205 INIT_LINK(task, ready_link); 206 207 exiting = ISC_FALSE; 208 LOCK(&manager->lock); 209 if (!manager->exiting) { 210 if (task->quantum == 0) 211 task->quantum = manager->default_quantum; 212 APPEND(manager->tasks, task, link); 213 } else 214 exiting = ISC_TRUE; 215 UNLOCK(&manager->lock); 216 217 if (exiting) { 218 DESTROYLOCK(&task->lock); 219 isc_mem_put(manager->mctx, task, sizeof(*task)); 220 return (ISC_R_SHUTTINGDOWN); 221 } 222 223 task->magic = TASK_MAGIC; 224 *taskp = task; 225 226 return (ISC_R_SUCCESS); 227} 228 229void 230isc_task_attach(isc_task_t *source, isc_task_t **targetp) { 231 232 /* 233 * Attach *targetp to source. 234 */ 235 236 REQUIRE(VALID_TASK(source)); 237 REQUIRE(targetp != NULL && *targetp == NULL); 238 239 XTTRACE(source, "isc_task_attach"); 240 241 LOCK(&source->lock); 242 source->references++; 243 UNLOCK(&source->lock); 244 245 *targetp = source; 246} 247 248static inline isc_boolean_t 249task_shutdown(isc_task_t *task) { 250 isc_boolean_t was_idle = ISC_FALSE; 251 isc_event_t *event, *prev; 252 253 /* 254 * Caller must be holding the task's lock. 255 */ 256 257 XTRACE("task_shutdown"); 258 259 if (! TASK_SHUTTINGDOWN(task)) { 260 XTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, 261 ISC_MSG_SHUTTINGDOWN, "shutting down")); 262 task->flags |= TASK_F_SHUTTINGDOWN; 263 if (task->state == task_state_idle) { 264 INSIST(EMPTY(task->events)); 265 task->state = task_state_ready; 266 was_idle = ISC_TRUE; 267 } 268 INSIST(task->state == task_state_ready || 269 task->state == task_state_running); 270 /* 271 * Note that we post shutdown events LIFO. 272 */ 273 for (event = TAIL(task->on_shutdown); 274 event != NULL; 275 event = prev) { 276 prev = PREV(event, ev_link); 277 DEQUEUE(task->on_shutdown, event, ev_link); 278 ENQUEUE(task->events, event, ev_link); 279 } 280 } 281 282 return (was_idle); 283} 284 285static inline void 286task_ready(isc_task_t *task) { 287 isc_taskmgr_t *manager = task->manager; 288 289 REQUIRE(VALID_MANAGER(manager)); 290 REQUIRE(task->state == task_state_ready); 291 292 XTRACE("task_ready"); 293 294 LOCK(&manager->lock); 295 296 ENQUEUE(manager->ready_tasks, task, ready_link); 297#ifdef ISC_PLATFORM_USETHREADS 298 SIGNAL(&manager->work_available); 299#endif /* ISC_PLATFORM_USETHREADS */ 300 301 UNLOCK(&manager->lock); 302} 303 304static inline isc_boolean_t 305task_detach(isc_task_t *task) { 306 307 /* 308 * Caller must be holding the task lock. 309 */ 310 311 REQUIRE(task->references > 0); 312 313 XTRACE("detach"); 314 315 task->references--; 316 if (task->references == 0 && task->state == task_state_idle) { 317 INSIST(EMPTY(task->events)); 318 /* 319 * There are no references to this task, and no 320 * pending events. We could try to optimize and 321 * either initiate shutdown or clean up the task, 322 * depending on its state, but it's easier to just 323 * make the task ready and allow run() or the event 324 * loop to deal with shutting down and termination. 325 */ 326 task->state = task_state_ready; 327 return (ISC_TRUE); 328 } 329 330 return (ISC_FALSE); 331} 332 333void 334isc_task_detach(isc_task_t **taskp) { 335 isc_task_t *task; 336 isc_boolean_t was_idle; 337 338 /* 339 * Detach *taskp from its task. 340 */ 341 342 REQUIRE(taskp != NULL); 343 task = *taskp; 344 REQUIRE(VALID_TASK(task)); 345 346 XTRACE("isc_task_detach"); 347 348 LOCK(&task->lock); 349 was_idle = task_detach(task); 350 UNLOCK(&task->lock); 351 352 if (was_idle) 353 task_ready(task); 354 355 *taskp = NULL; 356} 357 358static inline isc_boolean_t 359task_send(isc_task_t *task, isc_event_t **eventp) { 360 isc_boolean_t was_idle = ISC_FALSE; 361 isc_event_t *event; 362 363 /* 364 * Caller must be holding the task lock. 365 */ 366 367 REQUIRE(eventp != NULL); 368 event = *eventp; 369 REQUIRE(event != NULL); 370 REQUIRE(event->ev_type > 0); 371 REQUIRE(task->state != task_state_done); 372 373 XTRACE("task_send"); 374 375 if (task->state == task_state_idle) { 376 was_idle = ISC_TRUE; 377 INSIST(EMPTY(task->events)); 378 task->state = task_state_ready; 379 } 380 INSIST(task->state == task_state_ready || 381 task->state == task_state_running); 382 ENQUEUE(task->events, event, ev_link); 383 *eventp = NULL; 384 385 return (was_idle); 386} 387 388void 389isc_task_send(isc_task_t *task, isc_event_t **eventp) { 390 isc_boolean_t was_idle; 391 392 /* 393 * Send '*event' to 'task'. 394 */ 395 396 REQUIRE(VALID_TASK(task)); 397 398 XTRACE("isc_task_send"); 399 400 /* 401 * We're trying hard to hold locks for as short a time as possible. 402 * We're also trying to hold as few locks as possible. This is why 403 * some processing is deferred until after the lock is released. 404 */ 405 LOCK(&task->lock); 406 was_idle = task_send(task, eventp); 407 UNLOCK(&task->lock); 408 409 if (was_idle) { 410 /* 411 * We need to add this task to the ready queue. 412 * 413 * We've waited until now to do it because making a task 414 * ready requires locking the manager. If we tried to do 415 * this while holding the task lock, we could deadlock. 416 * 417 * We've changed the state to ready, so no one else will 418 * be trying to add this task to the ready queue. The 419 * only way to leave the ready state is by executing the 420 * task. It thus doesn't matter if events are added, 421 * removed, or a shutdown is started in the interval 422 * between the time we released the task lock, and the time 423 * we add the task to the ready queue. 424 */ 425 task_ready(task); 426 } 427} 428 429void 430isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp) { 431 isc_boolean_t idle1, idle2; 432 isc_task_t *task; 433 434 /* 435 * Send '*event' to '*taskp' and then detach '*taskp' from its 436 * task. 437 */ 438 439 REQUIRE(taskp != NULL); 440 task = *taskp; 441 REQUIRE(VALID_TASK(task)); 442 443 XTRACE("isc_task_sendanddetach"); 444 445 LOCK(&task->lock); 446 idle1 = task_send(task, eventp); 447 idle2 = task_detach(task); 448 UNLOCK(&task->lock); 449 450 /* 451 * If idle1, then idle2 shouldn't be true as well since we're holding 452 * the task lock, and thus the task cannot switch from ready back to 453 * idle. 454 */ 455 INSIST(!(idle1 && idle2)); 456 457 if (idle1 || idle2) 458 task_ready(task); 459 460 *taskp = NULL; 461} 462 463#define PURGE_OK(event) (((event)->ev_attributes & ISC_EVENTATTR_NOPURGE) == 0) 464 465static unsigned int 466dequeue_events(isc_task_t *task, void *sender, isc_eventtype_t first, 467 isc_eventtype_t last, void *tag, 468 isc_eventlist_t *events, isc_boolean_t purging) 469{ 470 isc_event_t *event, *next_event; 471 unsigned int count = 0; 472 473 REQUIRE(VALID_TASK(task)); 474 REQUIRE(last >= first); 475 476 XTRACE("dequeue_events"); 477 478 /* 479 * Events matching 'sender', whose type is >= first and <= last, and 480 * whose tag is 'tag' will be dequeued. If 'purging', matching events 481 * which are marked as unpurgable will not be dequeued. 482 * 483 * sender == NULL means "any sender", and tag == NULL means "any tag". 484 */ 485 486 LOCK(&task->lock); 487 488 for (event = HEAD(task->events); event != NULL; event = next_event) { 489 next_event = NEXT(event, ev_link); 490 if (event->ev_type >= first && event->ev_type <= last && 491 (sender == NULL || event->ev_sender == sender) && 492 (tag == NULL || event->ev_tag == tag) && 493 (!purging || PURGE_OK(event))) { 494 DEQUEUE(task->events, event, ev_link); 495 ENQUEUE(*events, event, ev_link); 496 count++; 497 } 498 } 499 500 UNLOCK(&task->lock); 501 502 return (count); 503} 504 505unsigned int 506isc_task_purgerange(isc_task_t *task, void *sender, isc_eventtype_t first, 507 isc_eventtype_t last, void *tag) 508{ 509 unsigned int count; 510 isc_eventlist_t events; 511 isc_event_t *event, *next_event; 512 513 /* 514 * Purge events from a task's event queue. 515 */ 516 517 XTRACE("isc_task_purgerange"); 518 519 ISC_LIST_INIT(events); 520 521 count = dequeue_events(task, sender, first, last, tag, &events, 522 ISC_TRUE); 523 524 for (event = HEAD(events); event != NULL; event = next_event) { 525 next_event = NEXT(event, ev_link); 526 isc_event_free(&event); 527 } 528 529 /* 530 * Note that purging never changes the state of the task. 531 */ 532 533 return (count); 534} 535 536unsigned int 537isc_task_purge(isc_task_t *task, void *sender, isc_eventtype_t type, 538 void *tag) 539{ 540 /* 541 * Purge events from a task's event queue. 542 */ 543 544 XTRACE("isc_task_purge"); 545 546 return (isc_task_purgerange(task, sender, type, type, tag)); 547} 548 549isc_boolean_t 550isc_task_purgeevent(isc_task_t *task, isc_event_t *event) { 551 isc_event_t *curr_event, *next_event; 552 553 /* 554 * Purge 'event' from a task's event queue. 555 * 556 * XXXRTH: WARNING: This method may be removed before beta. 557 */ 558 559 REQUIRE(VALID_TASK(task)); 560 561 /* 562 * If 'event' is on the task's event queue, it will be purged, 563 * unless it is marked as unpurgeable. 'event' does not have to be 564 * on the task's event queue; in fact, it can even be an invalid 565 * pointer. Purging only occurs if the event is actually on the task's 566 * event queue. 567 * 568 * Purging never changes the state of the task. 569 */ 570 571 LOCK(&task->lock); 572 for (curr_event = HEAD(task->events); 573 curr_event != NULL; 574 curr_event = next_event) { 575 next_event = NEXT(curr_event, ev_link); 576 if (curr_event == event && PURGE_OK(event)) { 577 DEQUEUE(task->events, curr_event, ev_link); 578 break; 579 } 580 } 581 UNLOCK(&task->lock); 582 583 if (curr_event == NULL) 584 return (ISC_FALSE); 585 586 isc_event_free(&curr_event); 587 588 return (ISC_TRUE); 589} 590 591unsigned int 592isc_task_unsendrange(isc_task_t *task, void *sender, isc_eventtype_t first, 593 isc_eventtype_t last, void *tag, 594 isc_eventlist_t *events) 595{ 596 /* 597 * Remove events from a task's event queue. 598 */ 599 600 XTRACE("isc_task_unsendrange"); 601 602 return (dequeue_events(task, sender, first, last, tag, events, 603 ISC_FALSE)); 604} 605 606unsigned int 607isc_task_unsend(isc_task_t *task, void *sender, isc_eventtype_t type, 608 void *tag, isc_eventlist_t *events) 609{ 610 /* 611 * Remove events from a task's event queue. 612 */ 613 614 XTRACE("isc_task_unsend"); 615 616 return (dequeue_events(task, sender, type, type, tag, events, 617 ISC_FALSE)); 618} 619 620isc_result_t 621isc_task_onshutdown(isc_task_t *task, isc_taskaction_t action, const void *arg) 622{ 623 isc_boolean_t disallowed = ISC_FALSE; 624 isc_result_t result = ISC_R_SUCCESS; 625 isc_event_t *event; 626 627 /* 628 * Send a shutdown event with action 'action' and argument 'arg' when 629 * 'task' is shutdown. 630 */ 631 632 REQUIRE(VALID_TASK(task)); 633 REQUIRE(action != NULL); 634 635 event = isc_event_allocate(task->manager->mctx, 636 NULL, 637 ISC_TASKEVENT_SHUTDOWN, 638 action, 639 arg, 640 sizeof(*event)); 641 if (event == NULL) 642 return (ISC_R_NOMEMORY); 643 644 LOCK(&task->lock); 645 if (TASK_SHUTTINGDOWN(task)) { 646 disallowed = ISC_TRUE; 647 result = ISC_R_SHUTTINGDOWN; 648 } else 649 ENQUEUE(task->on_shutdown, event, ev_link); 650 UNLOCK(&task->lock); 651 652 if (disallowed) 653 isc_mem_put(task->manager->mctx, event, sizeof(*event)); 654 655 return (result); 656} 657 658void 659isc_task_shutdown(isc_task_t *task) { 660 isc_boolean_t was_idle; 661 662 /* 663 * Shutdown 'task'. 664 */ 665 666 REQUIRE(VALID_TASK(task)); 667 668 LOCK(&task->lock); 669 was_idle = task_shutdown(task); 670 UNLOCK(&task->lock); 671 672 if (was_idle) 673 task_ready(task); 674} 675 676void 677isc_task_destroy(isc_task_t **taskp) { 678 679 /* 680 * Destroy '*taskp'. 681 */ 682 683 REQUIRE(taskp != NULL); 684 685 isc_task_shutdown(*taskp); 686 isc_task_detach(taskp); 687} 688 689void 690isc_task_setname(isc_task_t *task, const char *name, void *tag) { 691 692 /* 693 * Name 'task'. 694 */ 695 696 REQUIRE(VALID_TASK(task)); 697 698 LOCK(&task->lock); 699 memset(task->name, 0, sizeof(task->name)); 700 strncpy(task->name, name, sizeof(task->name) - 1); 701 task->tag = tag; 702 UNLOCK(&task->lock); 703} 704 705const char * 706isc_task_getname(isc_task_t *task) { 707 return (task->name); 708} 709 710void * 711isc_task_gettag(isc_task_t *task) { 712 return (task->tag); 713} 714 715void 716isc_task_getcurrenttime(isc_task_t *task, isc_stdtime_t *t) { 717 REQUIRE(VALID_TASK(task)); 718 REQUIRE(t != NULL); 719 720 LOCK(&task->lock); 721 722 *t = task->now; 723 724 UNLOCK(&task->lock); 725} 726 727/*** 728 *** Task Manager. 729 ***/ 730static void 731dispatch(isc_taskmgr_t *manager) { 732 isc_task_t *task; 733#ifndef ISC_PLATFORM_USETHREADS 734 unsigned int total_dispatch_count = 0; 735 isc_tasklist_t ready_tasks; 736#endif /* ISC_PLATFORM_USETHREADS */ 737 738 REQUIRE(VALID_MANAGER(manager)); 739 740 /* 741 * Again we're trying to hold the lock for as short a time as possible 742 * and to do as little locking and unlocking as possible. 743 * 744 * In both while loops, the appropriate lock must be held before the 745 * while body starts. Code which acquired the lock at the top of 746 * the loop would be more readable, but would result in a lot of 747 * extra locking. Compare: 748 * 749 * Straightforward: 750 * 751 * LOCK(); 752 * ... 753 * UNLOCK(); 754 * while (expression) { 755 * LOCK(); 756 * ... 757 * UNLOCK(); 758 * 759 * Unlocked part here... 760 * 761 * LOCK(); 762 * ... 763 * UNLOCK(); 764 * } 765 * 766 * Note how if the loop continues we unlock and then immediately lock. 767 * For N iterations of the loop, this code does 2N+1 locks and 2N+1 768 * unlocks. Also note that the lock is not held when the while 769 * condition is tested, which may or may not be important, depending 770 * on the expression. 771 * 772 * As written: 773 * 774 * LOCK(); 775 * while (expression) { 776 * ... 777 * UNLOCK(); 778 * 779 * Unlocked part here... 780 * 781 * LOCK(); 782 * ... 783 * } 784 * UNLOCK(); 785 * 786 * For N iterations of the loop, this code does N+1 locks and N+1 787 * unlocks. The while expression is always protected by the lock. 788 */ 789 790#ifndef ISC_PLATFORM_USETHREADS 791 ISC_LIST_INIT(ready_tasks); 792#endif 793 LOCK(&manager->lock); 794 while (!FINISHED(manager)) { 795#ifdef ISC_PLATFORM_USETHREADS 796 /* 797 * For reasons similar to those given in the comment in 798 * isc_task_send() above, it is safe for us to dequeue 799 * the task while only holding the manager lock, and then 800 * change the task to running state while only holding the 801 * task lock. 802 */ 803 while ((EMPTY(manager->ready_tasks) || 804 manager->exclusive_requested) && 805 !FINISHED(manager)) 806 { 807 XTHREADTRACE(isc_msgcat_get(isc_msgcat, 808 ISC_MSGSET_GENERAL, 809 ISC_MSG_WAIT, "wait")); 810 WAIT(&manager->work_available, &manager->lock); 811 XTHREADTRACE(isc_msgcat_get(isc_msgcat, 812 ISC_MSGSET_TASK, 813 ISC_MSG_AWAKE, "awake")); 814 } 815#else /* ISC_PLATFORM_USETHREADS */ 816 if (total_dispatch_count >= DEFAULT_TASKMGR_QUANTUM || 817 EMPTY(manager->ready_tasks)) 818 break; 819#endif /* ISC_PLATFORM_USETHREADS */ 820 XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK, 821 ISC_MSG_WORKING, "working")); 822 823 task = HEAD(manager->ready_tasks); 824 if (task != NULL) { 825 unsigned int dispatch_count = 0; 826 isc_boolean_t done = ISC_FALSE; 827 isc_boolean_t requeue = ISC_FALSE; 828 isc_boolean_t finished = ISC_FALSE; 829 isc_event_t *event; 830 831 INSIST(VALID_TASK(task)); 832 833 /* 834 * Note we only unlock the manager lock if we actually 835 * have a task to do. We must reacquire the manager 836 * lock before exiting the 'if (task != NULL)' block. 837 */ 838 DEQUEUE(manager->ready_tasks, task, ready_link); 839 manager->tasks_running++; 840 UNLOCK(&manager->lock); 841 842 LOCK(&task->lock); 843 INSIST(task->state == task_state_ready); 844 task->state = task_state_running; 845 XTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, 846 ISC_MSG_RUNNING, "running")); 847 isc_stdtime_get(&task->now); 848 do { 849 if (!EMPTY(task->events)) { 850 event = HEAD(task->events); 851 DEQUEUE(task->events, event, ev_link); 852 853 /* 854 * Execute the event action. 855 */ 856 XTRACE(isc_msgcat_get(isc_msgcat, 857 ISC_MSGSET_TASK, 858 ISC_MSG_EXECUTE, 859 "execute action")); 860 if (event->ev_action != NULL) { 861 UNLOCK(&task->lock); 862 (event->ev_action)(task,event); 863 LOCK(&task->lock); 864 } 865 dispatch_count++; 866#ifndef ISC_PLATFORM_USETHREADS 867 total_dispatch_count++; 868#endif /* ISC_PLATFORM_USETHREADS */ 869 } 870 871 if (task->references == 0 && 872 EMPTY(task->events) && 873 !TASK_SHUTTINGDOWN(task)) { 874 isc_boolean_t was_idle; 875 876 /* 877 * There are no references and no 878 * pending events for this task, 879 * which means it will not become 880 * runnable again via an external 881 * action (such as sending an event 882 * or detaching). 883 * 884 * We initiate shutdown to prevent 885 * it from becoming a zombie. 886 * 887 * We do this here instead of in 888 * the "if EMPTY(task->events)" block 889 * below because: 890 * 891 * If we post no shutdown events, 892 * we want the task to finish. 893 * 894 * If we did post shutdown events, 895 * will still want the task's 896 * quantum to be applied. 897 */ 898 was_idle = task_shutdown(task); 899 INSIST(!was_idle); 900 } 901 902 if (EMPTY(task->events)) { 903 /* 904 * Nothing else to do for this task 905 * right now. 906 */ 907 XTRACE(isc_msgcat_get(isc_msgcat, 908 ISC_MSGSET_TASK, 909 ISC_MSG_EMPTY, 910 "empty")); 911 if (task->references == 0 && 912 TASK_SHUTTINGDOWN(task)) { 913 /* 914 * The task is done. 915 */ 916 XTRACE(isc_msgcat_get( 917 isc_msgcat, 918 ISC_MSGSET_TASK, 919 ISC_MSG_DONE, 920 "done")); 921 finished = ISC_TRUE; 922 task->state = task_state_done; 923 } else 924 task->state = task_state_idle; 925 done = ISC_TRUE; 926 } else if (dispatch_count >= task->quantum) { 927 /* 928 * Our quantum has expired, but 929 * there is more work to be done. 930 * We'll requeue it to the ready 931 * queue later. 932 * 933 * We don't check quantum until 934 * dispatching at least one event, 935 * so the minimum quantum is one. 936 */ 937 XTRACE(isc_msgcat_get(isc_msgcat, 938 ISC_MSGSET_TASK, 939 ISC_MSG_QUANTUM, 940 "quantum")); 941 task->state = task_state_ready; 942 requeue = ISC_TRUE; 943 done = ISC_TRUE; 944 } 945 } while (!done); 946 UNLOCK(&task->lock); 947 948 if (finished) 949 task_finished(task); 950 951 LOCK(&manager->lock); 952 manager->tasks_running--; 953#ifdef ISC_PLATFORM_USETHREADS 954 if (manager->exclusive_requested && 955 manager->tasks_running == 1) { 956 SIGNAL(&manager->exclusive_granted); 957 } 958#endif /* ISC_PLATFORM_USETHREADS */ 959 if (requeue) { 960 /* 961 * We know we're awake, so we don't have 962 * to wakeup any sleeping threads if the 963 * ready queue is empty before we requeue. 964 * 965 * A possible optimization if the queue is 966 * empty is to 'goto' the 'if (task != NULL)' 967 * block, avoiding the ENQUEUE of the task 968 * and the subsequent immediate DEQUEUE 969 * (since it is the only executable task). 970 * We don't do this because then we'd be 971 * skipping the exit_requested check. The 972 * cost of ENQUEUE is low anyway, especially 973 * when you consider that we'd have to do 974 * an extra EMPTY check to see if we could 975 * do the optimization. If the ready queue 976 * were usually nonempty, the 'optimization' 977 * might even hurt rather than help. 978 */ 979#ifdef ISC_PLATFORM_USETHREADS 980 ENQUEUE(manager->ready_tasks, task, 981 ready_link); 982#else 983 ENQUEUE(ready_tasks, task, ready_link); 984#endif 985 } 986 } 987 } 988#ifndef ISC_PLATFORM_USETHREADS 989 ISC_LIST_APPENDLIST(manager->ready_tasks, ready_tasks, ready_link); 990#endif 991 UNLOCK(&manager->lock); 992} 993 994#ifdef ISC_PLATFORM_USETHREADS 995static isc_threadresult_t 996#ifdef _WIN32 997WINAPI 998#endif 999run(void *uap) { 1000 isc_taskmgr_t *manager = uap; 1001 1002 XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, 1003 ISC_MSG_STARTING, "starting")); 1004 1005 dispatch(manager); 1006 1007 XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, 1008 ISC_MSG_EXITING, "exiting")); 1009 1010 return ((isc_threadresult_t)0); 1011} 1012#endif /* ISC_PLATFORM_USETHREADS */ 1013 1014static void 1015manager_free(isc_taskmgr_t *manager) { 1016 isc_mem_t *mctx; 1017 1018#ifdef ISC_PLATFORM_USETHREADS 1019 (void)isc_condition_destroy(&manager->exclusive_granted); 1020 (void)isc_condition_destroy(&manager->work_available); 1021 isc_mem_free(manager->mctx, manager->threads); 1022#endif /* ISC_PLATFORM_USETHREADS */ 1023 DESTROYLOCK(&manager->lock); 1024 manager->magic = 0; 1025 mctx = manager->mctx; 1026 isc_mem_put(mctx, manager, sizeof(*manager)); 1027 isc_mem_detach(&mctx); 1028} 1029 1030isc_result_t 1031isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, 1032 unsigned int default_quantum, isc_taskmgr_t **managerp) 1033{ 1034 isc_result_t result; 1035 unsigned int i, started = 0; 1036 isc_taskmgr_t *manager; 1037 1038 /* 1039 * Create a new task manager. 1040 */ 1041 1042 REQUIRE(workers > 0); 1043 REQUIRE(managerp != NULL && *managerp == NULL); 1044 1045#ifndef ISC_PLATFORM_USETHREADS 1046 UNUSED(i); 1047 UNUSED(started); 1048 UNUSED(workers); 1049 1050 if (taskmgr != NULL) { 1051 taskmgr->refs++; 1052 *managerp = taskmgr; 1053 return (ISC_R_SUCCESS); 1054 } 1055#endif /* ISC_PLATFORM_USETHREADS */ 1056 1057 manager = isc_mem_get(mctx, sizeof(*manager)); 1058 if (manager == NULL) 1059 return (ISC_R_NOMEMORY); 1060 manager->magic = TASK_MANAGER_MAGIC; 1061 manager->mctx = NULL; 1062 result = isc_mutex_init(&manager->lock); 1063 if (result != ISC_R_SUCCESS) 1064 goto cleanup_mgr; 1065 1066#ifdef ISC_PLATFORM_USETHREADS 1067 manager->workers = 0; 1068 manager->threads = isc_mem_allocate(mctx, 1069 workers * sizeof(isc_thread_t)); 1070 if (manager->threads == NULL) { 1071 result = ISC_R_NOMEMORY; 1072 goto cleanup_lock; 1073 } 1074 if (isc_condition_init(&manager->work_available) != ISC_R_SUCCESS) { 1075 UNEXPECTED_ERROR(__FILE__, __LINE__, 1076 "isc_condition_init() %s", 1077 isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, 1078 ISC_MSG_FAILED, "failed")); 1079 result = ISC_R_UNEXPECTED; 1080 goto cleanup_threads; 1081 } 1082 if (isc_condition_init(&manager->exclusive_granted) != ISC_R_SUCCESS) { 1083 UNEXPECTED_ERROR(__FILE__, __LINE__, 1084 "isc_condition_init() %s", 1085 isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, 1086 ISC_MSG_FAILED, "failed")); 1087 result = ISC_R_UNEXPECTED; 1088 goto cleanup_workavailable; 1089 } 1090#endif /* ISC_PLATFORM_USETHREADS */ 1091 if (default_quantum == 0) 1092 default_quantum = DEFAULT_DEFAULT_QUANTUM; 1093 manager->default_quantum = default_quantum; 1094 INIT_LIST(manager->tasks); 1095 INIT_LIST(manager->ready_tasks); 1096 manager->tasks_running = 0; 1097 manager->exclusive_requested = ISC_FALSE; 1098 manager->exiting = ISC_FALSE; 1099 1100 isc_mem_attach(mctx, &manager->mctx); 1101 1102#ifdef ISC_PLATFORM_USETHREADS 1103 LOCK(&manager->lock); 1104 /* 1105 * Start workers. 1106 */ 1107 for (i = 0; i < workers; i++) { 1108 if (isc_thread_create(run, manager, 1109 &manager->threads[manager->workers]) == 1110 ISC_R_SUCCESS) { 1111 manager->workers++; 1112 started++; 1113 } 1114 } 1115 UNLOCK(&manager->lock); 1116 1117 if (started == 0) { 1118 manager_free(manager); 1119 return (ISC_R_NOTHREADS); 1120 } 1121 isc_thread_setconcurrency(workers); 1122#else /* ISC_PLATFORM_USETHREADS */ 1123 manager->refs = 1; 1124 taskmgr = manager; 1125#endif /* ISC_PLATFORM_USETHREADS */ 1126 1127 *managerp = manager; 1128 1129 return (ISC_R_SUCCESS); 1130 1131#ifdef ISC_PLATFORM_USETHREADS 1132 cleanup_workavailable: 1133 (void)isc_condition_destroy(&manager->work_available); 1134 cleanup_threads: 1135 isc_mem_free(mctx, manager->threads); 1136 cleanup_lock: 1137 DESTROYLOCK(&manager->lock); 1138#endif 1139 cleanup_mgr: 1140 isc_mem_put(mctx, manager, sizeof(*manager)); 1141 return (result); 1142} 1143 1144void 1145isc_taskmgr_destroy(isc_taskmgr_t **managerp) { 1146 isc_taskmgr_t *manager; 1147 isc_task_t *task; 1148 unsigned int i; 1149 1150 /* 1151 * Destroy '*managerp'. 1152 */ 1153 1154 REQUIRE(managerp != NULL); 1155 manager = *managerp; 1156 REQUIRE(VALID_MANAGER(manager)); 1157 1158#ifndef ISC_PLATFORM_USETHREADS 1159 UNUSED(i); 1160 1161 if (manager->refs > 1) { 1162 manager->refs--; 1163 *managerp = NULL; 1164 return; 1165 } 1166#endif /* ISC_PLATFORM_USETHREADS */ 1167 1168 XTHREADTRACE("isc_taskmgr_destroy"); 1169 /* 1170 * Only one non-worker thread may ever call this routine. 1171 * If a worker thread wants to initiate shutdown of the 1172 * task manager, it should ask some non-worker thread to call 1173 * isc_taskmgr_destroy(), e.g. by signalling a condition variable 1174 * that the startup thread is sleeping on. 1175 */ 1176 1177 /* 1178 * Unlike elsewhere, we're going to hold this lock a long time. 1179 * We need to do so, because otherwise the list of tasks could 1180 * change while we were traversing it. 1181 * 1182 * This is also the only function where we will hold both the 1183 * task manager lock and a task lock at the same time. 1184 */ 1185 1186 LOCK(&manager->lock); 1187 1188 /* 1189 * Make sure we only get called once. 1190 */ 1191 INSIST(!manager->exiting); 1192 manager->exiting = ISC_TRUE; 1193 1194 /* 1195 * Post shutdown event(s) to every task (if they haven't already been 1196 * posted). 1197 */ 1198 for (task = HEAD(manager->tasks); 1199 task != NULL; 1200 task = NEXT(task, link)) { 1201 LOCK(&task->lock); 1202 if (task_shutdown(task)) 1203 ENQUEUE(manager->ready_tasks, task, ready_link); 1204 UNLOCK(&task->lock); 1205 } 1206#ifdef ISC_PLATFORM_USETHREADS 1207 /* 1208 * Wake up any sleeping workers. This ensures we get work done if 1209 * there's work left to do, and if there are already no tasks left 1210 * it will cause the workers to see manager->exiting. 1211 */ 1212 BROADCAST(&manager->work_available); 1213 UNLOCK(&manager->lock); 1214 1215 /* 1216 * Wait for all the worker threads to exit. 1217 */ 1218 for (i = 0; i < manager->workers; i++) 1219 (void)isc_thread_join(manager->threads[i], NULL); 1220#else /* ISC_PLATFORM_USETHREADS */ 1221 /* 1222 * Dispatch the shutdown events. 1223 */ 1224 UNLOCK(&manager->lock); 1225 while (isc__taskmgr_ready()) 1226 (void)isc__taskmgr_dispatch(); 1227 if (!ISC_LIST_EMPTY(manager->tasks)) 1228 isc_mem_printallactive(stderr); 1229 INSIST(ISC_LIST_EMPTY(manager->tasks)); 1230#endif /* ISC_PLATFORM_USETHREADS */ 1231 1232 manager_free(manager); 1233 1234 *managerp = NULL; 1235} 1236 1237#ifndef ISC_PLATFORM_USETHREADS 1238isc_boolean_t 1239isc__taskmgr_ready(void) { 1240 if (taskmgr == NULL) 1241 return (ISC_FALSE); 1242 return (ISC_TF(!ISC_LIST_EMPTY(taskmgr->ready_tasks))); 1243} 1244 1245isc_result_t 1246isc__taskmgr_dispatch(void) { 1247 isc_taskmgr_t *manager = taskmgr; 1248 1249 if (taskmgr == NULL) 1250 return (ISC_R_NOTFOUND); 1251 1252 dispatch(manager); 1253 1254 return (ISC_R_SUCCESS); 1255} 1256 1257#endif /* ISC_PLATFORM_USETHREADS */ 1258 1259isc_result_t 1260isc_task_beginexclusive(isc_task_t *task) { 1261#ifdef ISC_PLATFORM_USETHREADS 1262 isc_taskmgr_t *manager = task->manager; 1263 REQUIRE(task->state == task_state_running); 1264 LOCK(&manager->lock); 1265 if (manager->exclusive_requested) { 1266 UNLOCK(&manager->lock); 1267 return (ISC_R_LOCKBUSY); 1268 } 1269 manager->exclusive_requested = ISC_TRUE; 1270 while (manager->tasks_running > 1) { 1271 WAIT(&manager->exclusive_granted, &manager->lock); 1272 } 1273 UNLOCK(&manager->lock); 1274#else 1275 UNUSED(task); 1276#endif 1277 return (ISC_R_SUCCESS); 1278} 1279 1280void 1281isc_task_endexclusive(isc_task_t *task) { 1282#ifdef ISC_PLATFORM_USETHREADS 1283 isc_taskmgr_t *manager = task->manager; 1284 REQUIRE(task->state == task_state_running); 1285 LOCK(&manager->lock); 1286 REQUIRE(manager->exclusive_requested); 1287 manager->exclusive_requested = ISC_FALSE; 1288 BROADCAST(&manager->work_available); 1289 UNLOCK(&manager->lock); 1290#else 1291 UNUSED(task); 1292#endif 1293} 1294 1295#ifdef HAVE_LIBXML2 1296 1297void 1298isc_taskmgr_renderxml(isc_taskmgr_t *mgr, xmlTextWriterPtr writer) 1299{ 1300 isc_task_t *task; 1301 1302 LOCK(&mgr->lock); 1303 1304 /* 1305 * Write out the thread-model, and some details about each depending 1306 * on which type is enabled. 1307 */ 1308 xmlTextWriterStartElement(writer, ISC_XMLCHAR "thread-model"); 1309#ifdef ISC_PLATFORM_USETHREADS 1310 xmlTextWriterStartElement(writer, ISC_XMLCHAR "type"); 1311 xmlTextWriterWriteString(writer, ISC_XMLCHAR "threaded"); 1312 xmlTextWriterEndElement(writer); /* type */ 1313 1314 xmlTextWriterStartElement(writer, ISC_XMLCHAR "worker-threads"); 1315 xmlTextWriterWriteFormatString(writer, "%d", mgr->workers); 1316 xmlTextWriterEndElement(writer); /* worker-threads */ 1317#else /* ISC_PLATFORM_USETHREADS */ 1318 xmlTextWriterStartElement(writer, ISC_XMLCHAR "type"); 1319 xmlTextWriterWriteString(writer, ISC_XMLCHAR "non-threaded"); 1320 xmlTextWriterEndElement(writer); /* type */ 1321 1322 xmlTextWriterStartElement(writer, ISC_XMLCHAR "references"); 1323 xmlTextWriterWriteFormatString(writer, "%d", mgr->refs); 1324 xmlTextWriterEndElement(writer); /* references */ 1325#endif /* ISC_PLATFORM_USETHREADS */ 1326 1327 xmlTextWriterStartElement(writer, ISC_XMLCHAR "default-quantum"); 1328 xmlTextWriterWriteFormatString(writer, "%d", mgr->default_quantum); 1329 xmlTextWriterEndElement(writer); /* default-quantum */ 1330 1331 xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-running"); 1332 xmlTextWriterWriteFormatString(writer, "%d", mgr->tasks_running); 1333 xmlTextWriterEndElement(writer); /* tasks-running */ 1334 1335 xmlTextWriterEndElement(writer); /* thread-model */ 1336 1337 xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks"); 1338 task = ISC_LIST_HEAD(mgr->tasks); 1339 while (task != NULL) { 1340 LOCK(&task->lock); 1341 xmlTextWriterStartElement(writer, ISC_XMLCHAR "task"); 1342 1343 if (task->name[0] != 0) { 1344 xmlTextWriterStartElement(writer, ISC_XMLCHAR "name"); 1345 xmlTextWriterWriteFormatString(writer, "%s", 1346 task->name); 1347 xmlTextWriterEndElement(writer); /* name */ 1348 } 1349 1350 xmlTextWriterStartElement(writer, ISC_XMLCHAR "references"); 1351 xmlTextWriterWriteFormatString(writer, "%d", task->references); 1352 xmlTextWriterEndElement(writer); /* references */ 1353 1354 xmlTextWriterStartElement(writer, ISC_XMLCHAR "id"); 1355 xmlTextWriterWriteFormatString(writer, "%p", task); 1356 xmlTextWriterEndElement(writer); /* id */ 1357 1358 xmlTextWriterStartElement(writer, ISC_XMLCHAR "state"); 1359 xmlTextWriterWriteFormatString(writer, "%s", 1360 statenames[task->state]); 1361 xmlTextWriterEndElement(writer); /* state */ 1362 1363 xmlTextWriterStartElement(writer, ISC_XMLCHAR "quantum"); 1364 xmlTextWriterWriteFormatString(writer, "%d", task->quantum); 1365 xmlTextWriterEndElement(writer); /* quantum */ 1366 1367 xmlTextWriterEndElement(writer); 1368 1369 UNLOCK(&task->lock); 1370 task = ISC_LIST_NEXT(task, link); 1371 } 1372 xmlTextWriterEndElement(writer); /* tasks */ 1373 1374 UNLOCK(&mgr->lock); 1375} 1376#endif /* HAVE_LIBXML2 */ 1377