1/* $NetBSD$ */ 2 3/*++ 4/* NAME 5/* qmgr_job 3 6/* SUMMARY 7/* per-transport jobs 8/* SYNOPSIS 9/* #include "qmgr.h" 10/* 11/* QMGR_JOB *qmgr_job_obtain(message, transport) 12/* QMGR_MESSAGE *message; 13/* QMGR_TRANSPORT *transport; 14/* 15/* void qmgr_job_free(job) 16/* QMGR_JOB *job; 17/* 18/* void qmgr_job_move_limits(job) 19/* QMGR_JOB *job; 20/* 21/* QMGR_ENTRY *qmgr_job_entry_select(transport) 22/* QMGR_TRANSPORT *transport; 23/* 24/* void qmgr_job_blocker_update(queue) 25/* QMGR_QUEUE *queue; 26/* DESCRIPTION 27/* These routines add/delete/manipulate per-transport jobs. 28/* Each job corresponds to a specific transport and message. 29/* Each job has a peer list containing all pending delivery 30/* requests for that message. 31/* 32/* qmgr_job_obtain() finds an existing job for named message and 33/* transport combination. New empty job is created if no existing can 34/* be found. In either case, the job is prepared for assignment of 35/* (more) message recipients. 36/* 37/* qmgr_job_free() disposes of a per-transport job after all 38/* its entries have been taken care of. It is an error to dispose 39/* of a job that is still in use. 40/* 41/* qmgr_job_entry_select() attempts to find the next entry suitable 42/* for delivery. The job preempting algorithm is also exercised. 43/* If necessary, an attempt to read more recipients into core is made. 44/* This can result in creation of more job, queue and entry structures. 45/* 46/* qmgr_job_blocker_update() updates the status of blocked 47/* jobs after a decrease in the queue's concurrency level, 48/* after the queue is throttled, or after the queue is resumed 49/* from suspension. 50/* 51/* qmgr_job_move_limits() takes care of proper distribution of the 52/* per-transport recipients limit among the per-transport jobs. 53/* Should be called whenever a job's recipient slot becomes available. 54/* DIAGNOSTICS 55/* Panic: consistency check failure. 56/* LICENSE 57/* .ad 58/* .fi 59/* The Secure Mailer license must be distributed with this software. 60/* AUTHOR(S) 61/* Patrik Rak 62/* patrik@raxoft.cz 63/*--*/ 64 65/* System library. */ 66 67#include <sys_defs.h> 68 69/* Utility library. */ 70 71#include <msg.h> 72#include <htable.h> 73#include <mymalloc.h> 74#include <sane_time.h> 75 76/* Application-specific. */ 77 78#include "qmgr.h" 79 80/* Forward declarations */ 81 82static void qmgr_job_pop(QMGR_JOB *); 83 84/* Helper macros */ 85 86#define HAS_ENTRIES(job) ((job)->selected_entries < (job)->read_entries) 87 88/* 89 * The MIN_ENTRIES macro may underestimate a lot but we can't use message->rcpt_unread 90 * because we don't know if all those unread recipients go to our transport yet. 91 */ 92 93#define MIN_ENTRIES(job) ((job)->read_entries) 94#define MAX_ENTRIES(job) ((job)->read_entries + (job)->message->rcpt_unread) 95 96#define RESET_CANDIDATE_CACHE(transport) ((transport)->candidate_cache_current = 0) 97 98#define IS_BLOCKER(job,transport) ((job)->blocker_tag == (transport)->blocker_tag) 99 100/* qmgr_job_create - create and initialize message job structure */ 101 102static QMGR_JOB *qmgr_job_create(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport) 103{ 104 QMGR_JOB *job; 105 106 job = (QMGR_JOB *) mymalloc(sizeof(QMGR_JOB)); 107 job->message = message; 108 QMGR_LIST_APPEND(message->job_list, job, message_peers); 109 htable_enter(transport->job_byname, message->queue_id, (char *) job); 110 job->transport = transport; 111 QMGR_LIST_INIT(job->transport_peers); 112 QMGR_LIST_INIT(job->time_peers); 113 job->stack_parent = 0; 114 QMGR_LIST_INIT(job->stack_children); 115 QMGR_LIST_INIT(job->stack_siblings); 116 job->stack_level = -1; 117 job->blocker_tag = 0; 118 job->peer_byname = htable_create(0); 119 QMGR_LIST_INIT(job->peer_list); 120 job->slots_used = 0; 121 job->slots_available = 0; 122 job->selected_entries = 0; 123 job->read_entries = 0; 124 job->rcpt_count = 0; 125 job->rcpt_limit = 0; 126 return (job); 127} 128 129/* qmgr_job_link - append the job to the job lists based on the time it was queued */ 130 131static void qmgr_job_link(QMGR_JOB *job) 132{ 133 QMGR_TRANSPORT *transport = job->transport; 134 QMGR_MESSAGE *message = job->message; 135 QMGR_JOB *prev, 136 *next, 137 *list_prev, 138 *list_next, 139 *unread, 140 *current; 141 int delay; 142 143 /* 144 * Sanity checks. 145 */ 146 if (job->stack_level >= 0) 147 msg_panic("qmgr_job_link: already on the job lists (%d)", job->stack_level); 148 149 /* 150 * Traverse the time list and the scheduler list from the end and stop 151 * when we found job older than the one being linked. 152 * 153 * During the traversals keep track if we have come across either the 154 * current job or the first unread job on the job list. If this is the 155 * case, these pointers will be adjusted below as required. 156 * 157 * Although both lists are exactly the same when only jobs on the stack 158 * level zero are considered, it's easier to traverse them separately. 159 * Otherwise it's impossible to keep track of the current job pointer 160 * effectively. 161 * 162 * This may look inefficient but under normal operation it is expected that 163 * the loops will stop right away, resulting in normal list appends 164 * below. However, this code is necessary for reviving retired jobs and 165 * for jobs which are created long after the first chunk of recipients 166 * was read in-core (either of these can happen only for multi-transport 167 * messages). 168 */ 169 current = transport->job_current; 170 for (next = 0, prev = transport->job_list.prev; prev; 171 next = prev, prev = prev->transport_peers.prev) { 172 if (prev->stack_parent == 0) { 173 delay = message->queued_time - prev->message->queued_time; 174 if (delay >= 0) 175 break; 176 } 177 if (current == prev) 178 current = 0; 179 } 180 list_prev = prev; 181 list_next = next; 182 183 unread = transport->job_next_unread; 184 for (next = 0, prev = transport->job_bytime.prev; prev; 185 next = prev, prev = prev->time_peers.prev) { 186 delay = message->queued_time - prev->message->queued_time; 187 if (delay >= 0) 188 break; 189 if (unread == prev) 190 unread = 0; 191 } 192 193 /* 194 * Link the job into the proper place on the job lists and mark it so we 195 * know it has been linked. 196 */ 197 job->stack_level = 0; 198 QMGR_LIST_LINK(transport->job_list, list_prev, job, list_next, transport_peers); 199 QMGR_LIST_LINK(transport->job_bytime, prev, job, next, time_peers); 200 201 /* 202 * Update the current job pointer if necessary. 203 */ 204 if (current == 0) 205 transport->job_current = job; 206 207 /* 208 * Update the pointer to the first unread job on the job list and steal 209 * the unused recipient slots from the old one. 210 */ 211 if (unread == 0) { 212 unread = transport->job_next_unread; 213 transport->job_next_unread = job; 214 if (unread != 0) 215 qmgr_job_move_limits(unread); 216 } 217 218 /* 219 * Get as much recipient slots as possible. The excess will be returned 220 * to the transport pool as soon as the exact amount required is known 221 * (which is usually after all recipients have been read in core). 222 */ 223 if (transport->rcpt_unused > 0) { 224 job->rcpt_limit += transport->rcpt_unused; 225 message->rcpt_limit += transport->rcpt_unused; 226 transport->rcpt_unused = 0; 227 } 228} 229 230/* qmgr_job_find - lookup job associated with named message and transport */ 231 232static QMGR_JOB *qmgr_job_find(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport) 233{ 234 235 /* 236 * Instead of traversing the message job list, we use single per 237 * transport hash table. This is better (at least with respect to memory 238 * usage) than having single hash table (usually almost empty) for each 239 * message. 240 */ 241 return ((QMGR_JOB *) htable_find(transport->job_byname, message->queue_id)); 242} 243 244/* qmgr_job_obtain - find/create the appropriate job and make it ready for new recipients */ 245 246QMGR_JOB *qmgr_job_obtain(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport) 247{ 248 QMGR_JOB *job; 249 250 /* 251 * Try finding an existing job, reviving it if it was already retired. 252 * Create a new job for this transport/message combination otherwise. In 253 * either case, the job ends linked on the job lists. 254 */ 255 if ((job = qmgr_job_find(message, transport)) == 0) 256 job = qmgr_job_create(message, transport); 257 if (job->stack_level < 0) 258 qmgr_job_link(job); 259 260 /* 261 * Reset the candidate cache because of the new expected recipients. Make 262 * sure the job is not marked as a blocker for the same reason. Note that 263 * this can result in having a non-blocker followed by more blockers. 264 * Consequently, we can't just update the current job pointer, we have to 265 * reset it. Fortunately qmgr_job_entry_select() will easily deal with 266 * this and will lookup the real current job for us. 267 */ 268 RESET_CANDIDATE_CACHE(transport); 269 if (IS_BLOCKER(job, transport)) { 270 job->blocker_tag = 0; 271 transport->job_current = transport->job_list.next; 272 } 273 return (job); 274} 275 276/* qmgr_job_move_limits - move unused recipient slots to the next unread job */ 277 278void qmgr_job_move_limits(QMGR_JOB *job) 279{ 280 QMGR_TRANSPORT *transport = job->transport; 281 QMGR_MESSAGE *message = job->message; 282 QMGR_JOB *next = transport->job_next_unread; 283 int rcpt_unused, 284 msg_rcpt_unused; 285 286 /* 287 * Find next unread job on the job list if necessary. Cache it for later. 288 * This makes the amortized efficiency of this routine O(1) per job. Note 289 * that we use the time list whose ordering doesn't change over time. 290 */ 291 if (job == next) { 292 for (next = next->time_peers.next; next; next = next->time_peers.next) 293 if (next->message->rcpt_offset != 0) 294 break; 295 transport->job_next_unread = next; 296 } 297 298 /* 299 * Calculate the number of available unused slots. 300 */ 301 rcpt_unused = job->rcpt_limit - job->rcpt_count; 302 msg_rcpt_unused = message->rcpt_limit - message->rcpt_count; 303 if (msg_rcpt_unused < rcpt_unused) 304 rcpt_unused = msg_rcpt_unused; 305 306 /* 307 * Transfer the unused recipient slots back to the transport pool and to 308 * the next not-fully-read job. Job's message limits are adjusted 309 * accordingly. Note that the transport pool can be negative if we used 310 * some of the rcpt_per_stack slots. 311 */ 312 if (rcpt_unused > 0) { 313 job->rcpt_limit -= rcpt_unused; 314 message->rcpt_limit -= rcpt_unused; 315 transport->rcpt_unused += rcpt_unused; 316 if (next != 0 && (rcpt_unused = transport->rcpt_unused) > 0) { 317 next->rcpt_limit += rcpt_unused; 318 next->message->rcpt_limit += rcpt_unused; 319 transport->rcpt_unused = 0; 320 } 321 } 322} 323 324/* qmgr_job_parent_gone - take care of orphaned stack children */ 325 326static void qmgr_job_parent_gone(QMGR_JOB *job, QMGR_JOB *parent) 327{ 328 QMGR_JOB *child; 329 330 while ((child = job->stack_children.next) != 0) { 331 QMGR_LIST_UNLINK(job->stack_children, QMGR_JOB *, child, stack_siblings); 332 if (parent != 0) 333 QMGR_LIST_APPEND(parent->stack_children, child, stack_siblings); 334 child->stack_parent = parent; 335 } 336} 337 338/* qmgr_job_unlink - unlink the job from the job lists */ 339 340static void qmgr_job_unlink(QMGR_JOB *job) 341{ 342 const char *myname = "qmgr_job_unlink"; 343 QMGR_TRANSPORT *transport = job->transport; 344 345 /* 346 * Sanity checks. 347 */ 348 if (job->stack_level != 0) 349 msg_panic("%s: non-zero stack level (%d)", myname, job->stack_level); 350 if (job->stack_parent != 0) 351 msg_panic("%s: parent present", myname); 352 if (job->stack_siblings.next != 0) 353 msg_panic("%s: siblings present", myname); 354 355 /* 356 * Make sure that children of job on zero stack level are informed that 357 * their parent is gone too. 358 */ 359 qmgr_job_parent_gone(job, 0); 360 361 /* 362 * Update the current job pointer if necessary. 363 */ 364 if (transport->job_current == job) 365 transport->job_current = job->transport_peers.next; 366 367 /* 368 * Invalidate the candidate selection cache if necessary. 369 */ 370 if (job == transport->candidate_cache 371 || job == transport->candidate_cache_current) 372 RESET_CANDIDATE_CACHE(transport); 373 374 /* 375 * Remove the job from the job lists and mark it as unlinked. 376 */ 377 QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers); 378 QMGR_LIST_UNLINK(transport->job_bytime, QMGR_JOB *, job, time_peers); 379 job->stack_level = -1; 380} 381 382/* qmgr_job_retire - remove the job from the job lists while waiting for recipients to deliver */ 383 384static void qmgr_job_retire(QMGR_JOB *job) 385{ 386 if (msg_verbose) 387 msg_info("qmgr_job_retire: %s", job->message->queue_id); 388 389 /* 390 * Pop the job from the job stack if necessary. 391 */ 392 if (job->stack_level > 0) 393 qmgr_job_pop(job); 394 395 /* 396 * Make sure this job is not cached as the next unread job for this 397 * transport. The qmgr_entry_done() will make sure that the slots donated 398 * by this job are moved back to the transport pool as soon as possible. 399 */ 400 qmgr_job_move_limits(job); 401 402 /* 403 * Remove the job from the job lists. Note that it remains on the message 404 * job list, though, and that it can be revived by using 405 * qmgr_job_obtain(). Also note that the available slot counter is left 406 * intact. 407 */ 408 qmgr_job_unlink(job); 409} 410 411/* qmgr_job_free - release the job structure */ 412 413void qmgr_job_free(QMGR_JOB *job) 414{ 415 const char *myname = "qmgr_job_free"; 416 QMGR_MESSAGE *message = job->message; 417 QMGR_TRANSPORT *transport = job->transport; 418 419 if (msg_verbose) 420 msg_info("%s: %s %s", myname, message->queue_id, transport->name); 421 422 /* 423 * Sanity checks. 424 */ 425 if (job->rcpt_count) 426 msg_panic("%s: non-zero recipient count (%d)", myname, job->rcpt_count); 427 428 /* 429 * Pop the job from the job stack if necessary. 430 */ 431 if (job->stack_level > 0) 432 qmgr_job_pop(job); 433 434 /* 435 * Return any remaining recipient slots back to the recipient slots pool. 436 */ 437 qmgr_job_move_limits(job); 438 if (job->rcpt_limit) 439 msg_panic("%s: recipient slots leak (%d)", myname, job->rcpt_limit); 440 441 /* 442 * Unlink and discard the structure. Check if the job is still linked on 443 * the job lists or if it was already retired before unlinking it. 444 */ 445 if (job->stack_level >= 0) 446 qmgr_job_unlink(job); 447 QMGR_LIST_UNLINK(message->job_list, QMGR_JOB *, job, message_peers); 448 htable_delete(transport->job_byname, message->queue_id, (void (*) (char *)) 0); 449 htable_free(job->peer_byname, (void (*) (char *)) 0); 450 myfree((char *) job); 451} 452 453/* qmgr_job_count_slots - maintain the delivery slot counters */ 454 455static void qmgr_job_count_slots(QMGR_JOB *job) 456{ 457 458 /* 459 * Count the number of delivery slots used during the delivery of the 460 * selected job. Also count the number of delivery slots available for 461 * its preemption. 462 * 463 * Despite its trivial look, this is one of the key parts of the theory 464 * behind this preempting scheduler. 465 */ 466 job->slots_available++; 467 job->slots_used++; 468 469 /* 470 * If the selected job is not the original current job, reset the 471 * candidate cache because the change above have slightly increased the 472 * chance of this job becoming a candidate next time. 473 * 474 * Don't expect that the change of the current jobs this turn will render 475 * the candidate cache invalid the next turn - it can happen that the 476 * next turn the original current job will be selected again and the 477 * cache would be considered valid in such case. 478 */ 479 if (job != job->transport->candidate_cache_current) 480 RESET_CANDIDATE_CACHE(job->transport); 481} 482 483/* qmgr_job_candidate - find best job candidate for preempting given job */ 484 485static QMGR_JOB *qmgr_job_candidate(QMGR_JOB *current) 486{ 487 QMGR_TRANSPORT *transport = current->transport; 488 QMGR_JOB *job, 489 *best_job = 0; 490 double score, 491 best_score = 0.0; 492 int max_slots, 493 max_needed_entries, 494 max_total_entries; 495 int delay; 496 time_t now = sane_time(); 497 498 /* 499 * Fetch the result directly from the cache if the cache is still valid. 500 * 501 * Note that we cache negative results too, so the cache must be invalidated 502 * by resetting the cached current job pointer, not the candidate pointer 503 * itself. 504 * 505 * In case the cache is valid and contains no candidate, we can ignore the 506 * time change, as it affects only which candidate is the best, not if 507 * one exists. However, this feature requires that we no longer relax the 508 * cache resetting rules, depending on the automatic cache timeout. 509 */ 510 if (transport->candidate_cache_current == current 511 && (transport->candidate_cache_time == now 512 || transport->candidate_cache == 0)) 513 return (transport->candidate_cache); 514 515 /* 516 * Estimate the minimum amount of delivery slots that can ever be 517 * accumulated for the given job. All jobs that won't fit into these 518 * slots are excluded from the candidate selection. 519 */ 520 max_slots = (MIN_ENTRIES(current) - current->selected_entries 521 + current->slots_available) / transport->slot_cost; 522 523 /* 524 * Select the candidate with best time_since_queued/total_recipients 525 * score. In addition to jobs which don't meet the max_slots limit, skip 526 * also jobs which don't have any selectable entries at the moment. 527 * 528 * Instead of traversing the whole job list we traverse it just from the 529 * current job forward. This has several advantages. First, we skip some 530 * of the blocker jobs and the current job itself right away. But the 531 * really important advantage is that we are sure that we don't consider 532 * any jobs that are already stack children of the current job. Thanks to 533 * this we can easily include all encountered jobs which are leaf 534 * children of some of the preempting stacks as valid candidates. All we 535 * need to do is to make sure we do not include any of the stack parents. 536 * And, because the leaf children are not ordered by the time since 537 * queued, we have to exclude them from the early loop end test. 538 * 539 * However, don't bother searching if we can't find anything suitable 540 * anyway. 541 */ 542 if (max_slots > 0) { 543 for (job = current->transport_peers.next; job; job = job->transport_peers.next) { 544 if (job->stack_children.next != 0 || IS_BLOCKER(job, transport)) 545 continue; 546 max_total_entries = MAX_ENTRIES(job); 547 max_needed_entries = max_total_entries - job->selected_entries; 548 delay = now - job->message->queued_time + 1; 549 if (max_needed_entries > 0 && max_needed_entries <= max_slots) { 550 score = (double) delay / max_total_entries; 551 if (score > best_score) { 552 best_score = score; 553 best_job = job; 554 } 555 } 556 557 /* 558 * Stop early if the best score is as good as it can get. 559 */ 560 if (delay <= best_score && job->stack_level == 0) 561 break; 562 } 563 } 564 565 /* 566 * Cache the result for later use. 567 */ 568 transport->candidate_cache = best_job; 569 transport->candidate_cache_current = current; 570 transport->candidate_cache_time = now; 571 572 return (best_job); 573} 574 575/* qmgr_job_preempt - preempt large message with smaller one */ 576 577static QMGR_JOB *qmgr_job_preempt(QMGR_JOB *current) 578{ 579 const char *myname = "qmgr_job_preempt"; 580 QMGR_TRANSPORT *transport = current->transport; 581 QMGR_JOB *job, 582 *prev; 583 int expected_slots; 584 int rcpt_slots; 585 586 /* 587 * Suppress preempting completely if the current job is not big enough to 588 * accumulate even the minimal number of slots required. 589 * 590 * Also, don't look for better job candidate if there are no available slots 591 * yet (the count can get negative due to the slot loans below). 592 */ 593 if (current->slots_available <= 0 594 || MAX_ENTRIES(current) < transport->min_slots * transport->slot_cost) 595 return (current); 596 597 /* 598 * Find best candidate for preempting the current job. 599 * 600 * Note that the function also takes care that the candidate fits within the 601 * number of delivery slots which the current job is still able to 602 * accumulate. 603 */ 604 if ((job = qmgr_job_candidate(current)) == 0) 605 return (current); 606 607 /* 608 * Sanity checks. 609 */ 610 if (job == current) 611 msg_panic("%s: attempt to preempt itself", myname); 612 if (job->stack_children.next != 0) 613 msg_panic("%s: already on the job stack (%d)", myname, job->stack_level); 614 if (job->stack_level < 0) 615 msg_panic("%s: not on the job list (%d)", myname, job->stack_level); 616 617 /* 618 * Check if there is enough available delivery slots accumulated to 619 * preempt the current job. 620 * 621 * The slot loaning scheme improves the average message response time. Note 622 * that the loan only allows the preemption happen earlier, though. It 623 * doesn't affect how many slots have to be "paid" - in either case the 624 * full number of slots required has to be accumulated later before the 625 * current job can be preempted again. 626 */ 627 expected_slots = MAX_ENTRIES(job) - job->selected_entries; 628 if (current->slots_available / transport->slot_cost + transport->slot_loan 629 < expected_slots * transport->slot_loan_factor / 100.0) 630 return (current); 631 632 /* 633 * Preempt the current job. 634 * 635 * This involves placing the selected candidate in front of the current job 636 * on the job list and updating the stack parent/child/sibling pointers 637 * appropriately. But first we need to make sure that the candidate is 638 * taken from its previous job stack which it might be top of. 639 */ 640 if (job->stack_level > 0) 641 qmgr_job_pop(job); 642 QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers); 643 prev = current->transport_peers.prev; 644 QMGR_LIST_LINK(transport->job_list, prev, job, current, transport_peers); 645 job->stack_parent = current; 646 QMGR_LIST_APPEND(current->stack_children, job, stack_siblings); 647 job->stack_level = current->stack_level + 1; 648 649 /* 650 * Update the current job pointer and explicitly reset the candidate 651 * cache. 652 */ 653 transport->job_current = job; 654 RESET_CANDIDATE_CACHE(transport); 655 656 /* 657 * Since the single job can be preempted by several jobs at the same 658 * time, we have to adjust the available slot count now to prevent using 659 * the same slots multiple times. To do that we subtract the number of 660 * slots the preempting job will supposedly use. This number will be 661 * corrected later when that job is popped from the stack to reflect the 662 * number of slots really used. 663 * 664 * As long as we don't need to keep track of how many slots were really 665 * used, we can (ab)use the slots_used counter for counting the 666 * difference between the real and expected amounts instead of the 667 * absolute amount. 668 */ 669 current->slots_available -= expected_slots * transport->slot_cost; 670 job->slots_used = -expected_slots; 671 672 /* 673 * Add part of extra recipient slots reserved for preempting jobs to the 674 * new current job if necessary. 675 * 676 * Note that transport->rcpt_unused is within <-rcpt_per_stack,0> in such 677 * case. 678 */ 679 if (job->message->rcpt_offset != 0) { 680 rcpt_slots = (transport->rcpt_per_stack + transport->rcpt_unused + 1) / 2; 681 job->rcpt_limit += rcpt_slots; 682 job->message->rcpt_limit += rcpt_slots; 683 transport->rcpt_unused -= rcpt_slots; 684 } 685 if (msg_verbose) 686 msg_info("%s: %s by %s, level %d", myname, current->message->queue_id, 687 job->message->queue_id, job->stack_level); 688 689 return (job); 690} 691 692/* qmgr_job_pop - remove the job from its job preemption stack */ 693 694static void qmgr_job_pop(QMGR_JOB *job) 695{ 696 const char *myname = "qmgr_job_pop"; 697 QMGR_TRANSPORT *transport = job->transport; 698 QMGR_JOB *parent; 699 700 if (msg_verbose) 701 msg_info("%s: %s", myname, job->message->queue_id); 702 703 /* 704 * Sanity checks. 705 */ 706 if (job->stack_level <= 0) 707 msg_panic("%s: not on the job stack (%d)", myname, job->stack_level); 708 709 /* 710 * Adjust the number of delivery slots available to preempt job's parent. 711 * 712 * Note that we intentionally do not adjust slots_used of the parent. Doing 713 * so would decrease the maximum per message inflation factor if the 714 * preemption appeared near the end of parent delivery. 715 * 716 * For the same reason we do not adjust parent's slots_available if the 717 * parent is not the original parent that was preempted by this job 718 * (i.e., the original parent job has already completed). 719 * 720 * This is another key part of the theory behind this preempting scheduler. 721 */ 722 if ((parent = job->stack_parent) != 0 723 && job->stack_level == parent->stack_level + 1) 724 parent->slots_available -= job->slots_used * transport->slot_cost; 725 726 /* 727 * Remove the job from its parent's children list. 728 */ 729 if (parent != 0) { 730 QMGR_LIST_UNLINK(parent->stack_children, QMGR_JOB *, job, stack_siblings); 731 job->stack_parent = 0; 732 } 733 734 /* 735 * If there is a parent, let it adopt all those orphaned children. 736 * Otherwise at least notify the children that their parent is gone. 737 */ 738 qmgr_job_parent_gone(job, parent); 739 740 /* 741 * Put the job back to stack level zero. 742 */ 743 job->stack_level = 0; 744 745 /* 746 * Explicitly reset the candidate cache. It's not worth trying to skip 747 * this under some complicated conditions - in most cases the popped job 748 * is the current job so we would have to reset it anyway. 749 */ 750 RESET_CANDIDATE_CACHE(transport); 751 752 /* 753 * Here we leave the remaining work involving the proper placement on the 754 * job list to the caller. The most important reason for this is that it 755 * allows us not to look up where exactly to place the job. 756 * 757 * The caller is also made responsible for invalidating the current job 758 * cache if necessary. 759 */ 760#if 0 761 QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers); 762 QMGR_LIST_LINK(transport->job_list, some_prev, job, some_next, transport_peers); 763 764 if (transport->job_current == job) 765 transport->job_current = job->transport_peers.next; 766#endif 767} 768 769/* qmgr_job_peer_select - select next peer suitable for delivery */ 770 771static QMGR_PEER *qmgr_job_peer_select(QMGR_JOB *job) 772{ 773 QMGR_PEER *peer; 774 QMGR_MESSAGE *message = job->message; 775 776 /* 777 * Try reading in more recipients. We do that as soon as possible 778 * (almost, see below), to make sure there is enough new blood pouring 779 * in. Otherwise single recipient for slow destination might starve the 780 * entire message delivery, leaving lot of fast destination recipients 781 * sitting idle in the queue file. 782 * 783 * Ideally we would like to read in recipients whenever there is a 784 * space, but to prevent excessive I/O, we read them only when enough 785 * time has passed or we can read enough of them at once. 786 * 787 * Note that even if we read the recipients few at a time, the message 788 * loading code tries to put them to existing recipient entries whenever 789 * possible, so the per-destination recipient grouping is not grossly 790 * affected. 791 * 792 * XXX Workaround for logic mismatch. The message->refcount test needs 793 * explanation. If the refcount is zero, it means that qmgr_active_done() 794 * is being completed asynchronously. In such case, we can't read in 795 * more recipients as bad things would happen after qmgr_active_done() 796 * continues processing. Note that this results in the given job being 797 * stalled for some time, but fortunately this particular situation is so 798 * rare that it is not critical. Still we seek for better solution. 799 */ 800 if (message->rcpt_offset != 0 801 && message->refcount > 0 802 && (message->rcpt_limit - message->rcpt_count >= job->transport->refill_limit 803 || (message->rcpt_limit > message->rcpt_count 804 && sane_time() - message->refill_time >= job->transport->refill_delay))) 805 qmgr_message_realloc(message); 806 807 /* 808 * Get the next suitable peer, if there is any. 809 */ 810 if (HAS_ENTRIES(job) && (peer = qmgr_peer_select(job)) != 0) 811 return (peer); 812 813 /* 814 * There is no suitable peer in-core, so try reading in more recipients if possible. 815 * This is our last chance to get suitable peer before giving up on this job for now. 816 * 817 * XXX For message->refcount, see above. 818 */ 819 if (message->rcpt_offset != 0 820 && message->refcount > 0 821 && message->rcpt_limit > message->rcpt_count) { 822 qmgr_message_realloc(message); 823 if (HAS_ENTRIES(job)) 824 return (qmgr_peer_select(job)); 825 } 826 return (0); 827} 828 829/* qmgr_job_entry_select - select next entry suitable for delivery */ 830 831QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *transport) 832{ 833 QMGR_JOB *job, 834 *next; 835 QMGR_PEER *peer; 836 QMGR_ENTRY *entry; 837 838 /* 839 * Get the current job if there is one. 840 */ 841 if ((job = transport->job_current) == 0) 842 return (0); 843 844 /* 845 * Exercise the preempting algorithm if enabled. 846 * 847 * The slot_cost equal to 1 causes the algorithm to degenerate and is 848 * therefore disabled too. 849 */ 850 if (transport->slot_cost >= 2) 851 job = qmgr_job_preempt(job); 852 853 /* 854 * Select next entry suitable for delivery. In case the current job can't 855 * provide one because of the per-destination concurrency limits, we mark 856 * it as a "blocker" job and continue with the next job on the job list. 857 * 858 * Note that the loop also takes care of getting the "stall" jobs (job with 859 * no entries currently available) out of the way if necessary. Stall 860 * jobs can appear in case of multi-transport messages whose recipients 861 * don't fit in-core at once. Some jobs created by such message may have 862 * only few recipients and would stay on the job list until all other 863 * jobs of that message are delivered, blocking precious recipient slots 864 * available to this transport. Or it can happen that the job has some 865 * more entries but suddenly they all get deferred. Whatever the reason, 866 * we retire such jobs below if we happen to come across some. 867 */ 868 for ( /* empty */ ; job; job = next) { 869 next = job->transport_peers.next; 870 871 /* 872 * Don't bother if the job is known to have no available entries 873 * because of the per-destination concurrency limits. 874 */ 875 if (IS_BLOCKER(job, transport)) 876 continue; 877 878 if ((peer = qmgr_job_peer_select(job)) != 0) { 879 880 /* 881 * We have found a suitable peer. Select one of its entries and 882 * adjust the delivery slot counters. 883 */ 884 entry = qmgr_entry_select(peer); 885 qmgr_job_count_slots(job); 886 887 /* 888 * Remember the current job for the next time so we don't have to 889 * crawl over all those blockers again. They will be reconsidered 890 * when the concurrency limit permits. 891 */ 892 transport->job_current = job; 893 894 /* 895 * In case we selected the very last job entry, remove the job 896 * from the job lists right now. 897 * 898 * This action uses the assumption that once the job entry has been 899 * selected, it can be unselected only before the message ifself 900 * is deferred. Thus the job with all entries selected can't 901 * re-appear with more entries available for selection again 902 * (without reading in more entries from the queue file, which in 903 * turn invokes qmgr_job_obtain() which re-links the job back on 904 * the lists if necessary). 905 * 906 * Note that qmgr_job_move_limits() transfers the recipients slots 907 * correctly even if the job is unlinked from the job list thanks 908 * to the job_next_unread caching. 909 */ 910 if (!HAS_ENTRIES(job) && job->message->rcpt_offset == 0) 911 qmgr_job_retire(job); 912 913 /* 914 * Finally. Hand back the fruit of our tedious effort. 915 */ 916 return (entry); 917 } else if (HAS_ENTRIES(job)) { 918 919 /* 920 * The job can't be selected due the concurrency limits. Mark it 921 * together with its queues so we know they are blocking the job 922 * list and they get the appropriate treatment. In particular, 923 * all blockers will be reconsidered when one of the problematic 924 * queues will accept more deliveries. And the job itself will be 925 * reconsidered if it is assigned some more entries. 926 */ 927 job->blocker_tag = transport->blocker_tag; 928 for (peer = job->peer_list.next; peer; peer = peer->peers.next) 929 if (peer->entry_list.next != 0) 930 peer->queue->blocker_tag = transport->blocker_tag; 931 } else { 932 933 /* 934 * The job is "stalled". Retire it until it either gets freed or 935 * gets more entries later. 936 */ 937 qmgr_job_retire(job); 938 } 939 } 940 941 /* 942 * We have not found any entry we could use for delivery. Well, things 943 * must have changed since this transport was selected for asynchronous 944 * allocation. Never mind. Clear the current job pointer and reluctantly 945 * report back that we have failed in our task. 946 */ 947 transport->job_current = 0; 948 return (0); 949} 950 951/* qmgr_job_blocker_update - update "blocked job" status */ 952 953void qmgr_job_blocker_update(QMGR_QUEUE *queue) 954{ 955 QMGR_TRANSPORT *transport = queue->transport; 956 957 /* 958 * If the queue was blocking some of the jobs on the job list, check if 959 * the concurrency limit has lifted. If there are still some pending 960 * deliveries, give it a try and unmark all transport blockers at once. 961 * The qmgr_job_entry_select() will do the rest. In either case make sure 962 * the queue is not marked as a blocker anymore, with extra handling of 963 * queues which were declared dead. 964 * 965 * Note that changing the blocker status also affects the candidate cache. 966 * Most of the cases would be automatically recognized by the current job 967 * change, but we play safe and reset the cache explicitly below. 968 * 969 * Keeping the transport blocker tag odd is an easy way to make sure the tag 970 * never matches jobs that are not explicitly marked as blockers. 971 */ 972 if (queue->blocker_tag == transport->blocker_tag) { 973 if (queue->window > queue->busy_refcount && queue->todo.next != 0) { 974 transport->blocker_tag += 2; 975 transport->job_current = transport->job_list.next; 976 transport->candidate_cache_current = 0; 977 } 978 if (queue->window > queue->busy_refcount || QMGR_QUEUE_THROTTLED(queue)) 979 queue->blocker_tag = 0; 980 } 981} 982 983