/*++
/* NAME
/*	qmgr_job 3
/* SUMMARY
/*	per-transport jobs
/* SYNOPSIS
/*	#include "qmgr.h"
/*
/*	QMGR_JOB *qmgr_job_obtain(message, transport)
/*	QMGR_MESSAGE *message;
/*	QMGR_TRANSPORT *transport;
/*
/*	void qmgr_job_free(job)
/*	QMGR_JOB *job;
/*
/*	void qmgr_job_move_limits(job)
/*	QMGR_JOB *job;
/*
/*	QMGR_ENTRY *qmgr_job_entry_select(transport)
/*	QMGR_TRANSPORT *transport;
/*
/*	void qmgr_job_blocker_update(queue)
/*	QMGR_QUEUE *queue;
/* DESCRIPTION
/*	These routines add/delete/manipulate per-transport jobs.
/*	Each job corresponds to a specific transport and message.
/*	Each job has a peer list containing all pending delivery
/*	requests for that message.
/*
/*	qmgr_job_obtain() finds an existing job for the named message
/*	and transport combination. A new empty job is created if none
/*	exists yet. In either case, the job is prepared for the
/*	assignment of (more) message recipients.
/*
/*	qmgr_job_free() disposes of a per-transport job after all
/*	its entries have been taken care of. It is an error to dispose
/*	of a job that is still in use.
/*
/*	qmgr_job_entry_select() attempts to find the next entry suitable
/*	for delivery. The job preempting algorithm is also exercised.
/*	If necessary, an attempt to read more recipients into core is made.
/*	This can result in the creation of more job, queue and entry
/*	structures.
/*
/*	qmgr_job_blocker_update() updates the status of blocked
/*	jobs after a decrease in the queue's concurrency level,
/*	after the queue is throttled, or after the queue is resumed
/*	from suspension.
/*
/*	qmgr_job_move_limits() takes care of the proper distribution of
/*	the per-transport recipient limit among the per-transport jobs.
/*	It should be called whenever a job's recipient slot becomes
/*	available.
/* DIAGNOSTICS
/*	Panic: consistency check failure.
/* LICENSE
/* .ad
/* .fi
/*	The Secure Mailer license must be distributed with this software.
/* AUTHOR(S)
/*	Patrik Rak
/*	patrik@raxoft.cz
/*--*/

/* System library. */

#include <sys_defs.h>

/* Utility library. */

#include <msg.h>
#include <htable.h>
#include <mymalloc.h>
#include <sane_time.h>

/* Application-specific. */

#include "qmgr.h"

/* Forward declarations */

static void qmgr_job_pop(QMGR_JOB *);

/* Helper macros */

#define HAS_ENTRIES(job) ((job)->selected_entries < (job)->read_entries)

/*
 * The MIN_ENTRIES macro may underestimate by a lot, but we can't use
 * message->rcpt_unread because we don't know yet whether all those unread
 * recipients go to our transport.
 */
#define MIN_ENTRIES(job) ((job)->read_entries)
#define MAX_ENTRIES(job) ((job)->read_entries + (job)->message->rcpt_unread)

#define RESET_CANDIDATE_CACHE(transport) ((transport)->candidate_cache_current = 0)

#define IS_BLOCKER(job,transport) ((job)->blocker_tag == (transport)->blocker_tag)
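/*
 * Illustrative sketch (not part of the original file): how the entry-count
 * bounds above behave. For a job that has read 10 entries while its message
 * still has 90 unread recipients, MIN_ENTRIES() is 10 and MAX_ENTRIES() is
 * 100; the unread recipients may or may not end up on this transport, which
 * is why the lower bound ignores them. The structures below are minimal
 * hypothetical stand-ins for QMGR_JOB and QMGR_MESSAGE.
 */
#if 0
#include <stdio.h>

struct msg_stub { int rcpt_unread; };
struct job_stub { int read_entries; struct msg_stub *message; };

int main(void)
{
    struct msg_stub m = { 90 };			/* recipients still in queue file */
    struct job_stub j = { 10, &m };		/* entries already read in-core */

    printf("min=%d max=%d\n", j.read_entries,
	   j.read_entries + j.message->rcpt_unread);	/* min=10 max=100 */
    return (0);
}
#endif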
/* qmgr_job_create - create and initialize message job structure */

static QMGR_JOB *qmgr_job_create(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{
    QMGR_JOB *job;

    job = (QMGR_JOB *) mymalloc(sizeof(QMGR_JOB));
    job->message = message;
    QMGR_LIST_APPEND(message->job_list, job, message_peers);
    htable_enter(transport->job_byname, message->queue_id, (char *) job);
    job->transport = transport;
    QMGR_LIST_INIT(job->transport_peers);
    QMGR_LIST_INIT(job->time_peers);
    job->stack_parent = 0;
    QMGR_LIST_INIT(job->stack_children);
    QMGR_LIST_INIT(job->stack_siblings);
    job->stack_level = -1;
    job->blocker_tag = 0;
    job->peer_byname = htable_create(0);
    QMGR_LIST_INIT(job->peer_list);
    job->slots_used = 0;
    job->slots_available = 0;
    job->selected_entries = 0;
    job->read_entries = 0;
    job->rcpt_count = 0;
    job->rcpt_limit = 0;
    return (job);
}
/* qmgr_job_link - append the job to the job lists based on the time it was queued */

static void qmgr_job_link(QMGR_JOB *job)
{
    QMGR_TRANSPORT *transport = job->transport;
    QMGR_MESSAGE *message = job->message;
    QMGR_JOB *prev, *next, *list_prev, *list_next, *unread, *current;
    int     delay;

    /*
     * Sanity checks.
     */
    if (job->stack_level >= 0)
	msg_panic("qmgr_job_link: already on the job lists (%d)", job->stack_level);

    /*
     * Traverse the time list and the scheduler list from the end and stop
     * when we find a job older than the one being linked.
     *
     * During the traversals, keep track of whether we come across either the
     * current job or the first unread job on the job list. If this is the
     * case, these pointers will be adjusted below as required.
     *
     * Although both lists are exactly the same when only jobs on stack
     * level zero are considered, it's easier to traverse them separately.
     * Otherwise it's impossible to keep track of the current job pointer
     * effectively.
     *
     * This may look inefficient, but under normal operation it is expected
     * that the loops will stop right away, resulting in normal list appends
     * below. However, this code is necessary for reviving retired jobs and
     * for jobs which are created long after the first chunk of recipients
     * was read in-core (either of these can happen only for multi-transport
     * messages).
     *
     * XXX Note that we test stack_parent rather than stack_level below. This
     * subtle difference allows us to enqueue the job in correct time order
     * with respect to orphaned children even after their original parent on
     * level zero is gone. Consequently, the early loop stop in candidate
     * selection works reliably, too. These are the reasons why we bother
     * with child adoption at all.
     */
    current = transport->job_current;
    for (next = 0, prev = transport->job_list.prev; prev;
	 next = prev, prev = prev->transport_peers.prev) {
	if (prev->stack_parent == 0) {
	    delay = message->queued_time - prev->message->queued_time;
	    if (delay >= 0)
		break;
	}
	if (current == prev)
	    current = 0;
    }
    list_prev = prev;
    list_next = next;

    unread = transport->job_next_unread;
    for (next = 0, prev = transport->job_bytime.prev; prev;
	 next = prev, prev = prev->time_peers.prev) {
	delay = message->queued_time - prev->message->queued_time;
	if (delay >= 0)
	    break;
	if (unread == prev)
	    unread = 0;
    }

    /*
     * Link the job into the proper place on the job lists and mark it so we
     * know it has been linked.
     */
    job->stack_level = 0;
    QMGR_LIST_LINK(transport->job_list, list_prev, job, list_next, transport_peers);
    QMGR_LIST_LINK(transport->job_bytime, prev, job, next, time_peers);

    /*
     * Update the current job pointer if necessary.
     */
    if (current == 0)
	transport->job_current = job;

    /*
     * Update the pointer to the first unread job on the job list and steal
     * the unused recipient slots from the old one.
     */
    if (unread == 0) {
	unread = transport->job_next_unread;
	transport->job_next_unread = job;
	if (unread != 0)
	    qmgr_job_move_limits(unread);
    }

    /*
     * Get as many recipient slots as possible. The excess will be returned
     * to the transport pool as soon as the exact amount required is known
     * (which is usually after all recipients have been read in core).
     */
    if (transport->rcpt_unused > 0) {
	job->rcpt_limit += transport->rcpt_unused;
	message->rcpt_limit += transport->rcpt_unused;
	transport->rcpt_unused = 0;
    }
}
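/*
 * Illustrative sketch (not part of the original file): the backward scans
 * above are ordered inserts into doubly-linked lists kept sorted by
 * queued_time. The standalone toy below mirrors the technique on a list of
 * integers; all names are hypothetical.
 */
#if 0
#include <stdio.h>

struct node { int queued_time; struct node *prev, *next; };
struct list { struct node *head, *tail; };

/* insert_bytime - walk backwards from the tail, stop at first older node */
static void insert_bytime(struct list *list, struct node *item)
{
    struct node *next = 0, *prev;

    for (prev = list->tail; prev; next = prev, prev = prev->prev)
	if (item->queued_time >= prev->queued_time)	/* found older node */
	    break;
    item->prev = prev;
    item->next = next;
    if (prev)
	prev->next = item;
    else
	list->head = item;
    if (next)
	next->prev = item;
    else
	list->tail = item;			/* new node is the youngest */
}

int main(void)
{
    struct list l = { 0, 0 };
    struct node a = { 10, 0, 0 }, b = { 30, 0, 0 }, c = { 20, 0, 0 };
    struct node *n;

    insert_bytime(&l, &a);
    insert_bytime(&l, &b);
    insert_bytime(&l, &c);			/* lands between a and b */
    for (n = l.head; n; n = n->next)
	printf("%d\n", n->queued_time);		/* 10 20 30 */
    return (0);
}
#endif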
/* qmgr_job_find - lookup job associated with named message and transport */

static QMGR_JOB *qmgr_job_find(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{

    /*
     * Instead of traversing the message job list, we use a single
     * per-transport hash table. This is better (at least with respect to
     * memory usage) than having a separate, usually almost empty, hash
     * table for each message.
     */
    return ((QMGR_JOB *) htable_find(transport->job_byname, message->queue_id));
}

/* qmgr_job_obtain - find/create the appropriate job and make it ready for new recipients */

QMGR_JOB *qmgr_job_obtain(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{
    QMGR_JOB *job;

    /*
     * Try finding an existing job, reviving it if it was already retired.
     * Create a new job for this transport/message combination otherwise. In
     * either case, the job ends up linked on the job lists.
     */
    if ((job = qmgr_job_find(message, transport)) == 0)
	job = qmgr_job_create(message, transport);
    if (job->stack_level < 0)
	qmgr_job_link(job);

    /*
     * Reset the candidate cache because of the new expected recipients. Make
     * sure the job is not marked as a blocker for the same reason. Note that
     * this can result in a non-blocker followed by more blockers.
     * Consequently, we can't just update the current job pointer, we have to
     * reset it. Fortunately qmgr_job_entry_select() will easily deal with
     * this and will look up the real current job for us.
     */
    RESET_CANDIDATE_CACHE(transport);
    if (IS_BLOCKER(job, transport)) {
	job->blocker_tag = 0;
	transport->job_current = transport->job_list.next;
    }
    return (job);
}

/* qmgr_job_move_limits - move unused recipient slots to the next unread job */

void    qmgr_job_move_limits(QMGR_JOB *job)
{
    QMGR_TRANSPORT *transport = job->transport;
    QMGR_MESSAGE *message = job->message;
    QMGR_JOB *next = transport->job_next_unread;
    int     rcpt_unused, msg_rcpt_unused;

    /*
     * Find the next unread job on the job list if necessary. Cache it for
     * later. This makes the amortized efficiency of this routine O(1) per
     * job. Note that we use the time list, whose ordering doesn't change
     * over time.
     */
    if (job == next) {
	for (next = next->time_peers.next; next; next = next->time_peers.next)
	    if (next->message->rcpt_offset != 0)
		break;
	transport->job_next_unread = next;
    }

    /*
     * Calculate the number of available unused slots.
     */
    rcpt_unused = job->rcpt_limit - job->rcpt_count;
    msg_rcpt_unused = message->rcpt_limit - message->rcpt_count;
    if (msg_rcpt_unused < rcpt_unused)
	rcpt_unused = msg_rcpt_unused;

    /*
     * Transfer the unused recipient slots back to the transport pool and to
     * the next not-fully-read job. The job's message limits are adjusted
     * accordingly. Note that the transport pool can be negative if we used
     * some of the rcpt_per_stack slots.
     */
    if (rcpt_unused > 0) {
	job->rcpt_limit -= rcpt_unused;
	message->rcpt_limit -= rcpt_unused;
	transport->rcpt_unused += rcpt_unused;
	if (next != 0 && (rcpt_unused = transport->rcpt_unused) > 0) {
	    next->rcpt_limit += rcpt_unused;
	    next->message->rcpt_limit += rcpt_unused;
	    transport->rcpt_unused = 0;
	}
    }
}
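/*
 * Illustrative sketch (not part of the original file): the slot transfer
 * above in numbers. Suppose a job was granted rcpt_limit = 100 but only
 * rcpt_count = 60 recipients materialized, while its message has 30 unused
 * slots of its own; min(40, 30) = 30 slots flow back to the transport pool
 * and from there to the next unread job, if any. The plain int variables
 * below are hypothetical stand-ins for the counters involved.
 */
#if 0
#include <stdio.h>

int main(void)
{
    int job_rcpt_limit = 100, job_rcpt_count = 60;
    int msg_rcpt_limit = 90, msg_rcpt_count = 60;
    int transport_rcpt_unused = 0;

    /* Number of slots this job can give up: bounded by both counters. */
    int unused = job_rcpt_limit - job_rcpt_count;	/* 40 */
    int msg_unused = msg_rcpt_limit - msg_rcpt_count;	/* 30 */

    if (msg_unused < unused)
	unused = msg_unused;
    if (unused > 0) {
	job_rcpt_limit -= unused;
	msg_rcpt_limit -= unused;
	transport_rcpt_unused += unused;
    }
    printf("returned %d slots to the pool\n", unused);	/* 30 */
    return (0);
}
#endif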
/* qmgr_job_parent_gone - take care of orphaned stack children */

static void qmgr_job_parent_gone(QMGR_JOB *job, QMGR_JOB *parent)
{
    QMGR_JOB *child;

    while ((child = job->stack_children.next) != 0) {
	QMGR_LIST_UNLINK(job->stack_children, QMGR_JOB *, child, stack_siblings);
	if (parent != 0)
	    QMGR_LIST_APPEND(parent->stack_children, child, stack_siblings);
	child->stack_parent = parent;
    }
}

/* qmgr_job_unlink - unlink the job from the job lists */

static void qmgr_job_unlink(QMGR_JOB *job)
{
    const char *myname = "qmgr_job_unlink";
    QMGR_TRANSPORT *transport = job->transport;

    /*
     * Sanity checks.
     */
    if (job->stack_level != 0)
	msg_panic("%s: non-zero stack level (%d)", myname, job->stack_level);
    if (job->stack_parent != 0)
	msg_panic("%s: parent present", myname);
    if (job->stack_siblings.next != 0)
	msg_panic("%s: siblings present", myname);

    /*
     * Make sure that the children of a job on stack level zero are informed
     * that their parent is gone, too.
     */
    qmgr_job_parent_gone(job, 0);

    /*
     * Update the current job pointer if necessary.
     */
    if (transport->job_current == job)
	transport->job_current = job->transport_peers.next;

    /*
     * Invalidate the candidate selection cache if necessary.
     */
    if (job == transport->candidate_cache
	|| job == transport->candidate_cache_current)
	RESET_CANDIDATE_CACHE(transport);

    /*
     * Remove the job from the job lists and mark it as unlinked.
     */
    QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers);
    QMGR_LIST_UNLINK(transport->job_bytime, QMGR_JOB *, job, time_peers);
    job->stack_level = -1;
}

/* qmgr_job_retire - remove the job from the job lists while waiting for recipients to deliver */

static void qmgr_job_retire(QMGR_JOB *job)
{
    if (msg_verbose)
	msg_info("qmgr_job_retire: %s", job->message->queue_id);

    /*
     * Pop the job from the job stack if necessary.
     */
    if (job->stack_level > 0)
	qmgr_job_pop(job);

    /*
     * Make sure this job is not cached as the next unread job for this
     * transport. qmgr_entry_done() will make sure that the slots donated by
     * this job are moved back to the transport pool as soon as possible.
     */
    qmgr_job_move_limits(job);

    /*
     * Remove the job from the job lists. Note that it remains on the message
     * job list, though, and that it can be revived by using
     * qmgr_job_obtain(). Also note that the available slot counter is left
     * intact.
     */
    qmgr_job_unlink(job);
}

/* qmgr_job_free - release the job structure */

void    qmgr_job_free(QMGR_JOB *job)
{
    const char *myname = "qmgr_job_free";
    QMGR_MESSAGE *message = job->message;
    QMGR_TRANSPORT *transport = job->transport;

    if (msg_verbose)
	msg_info("%s: %s %s", myname, message->queue_id, transport->name);

    /*
     * Sanity checks.
     */
    if (job->rcpt_count)
	msg_panic("%s: non-zero recipient count (%d)", myname, job->rcpt_count);

    /*
     * Pop the job from the job stack if necessary.
     */
    if (job->stack_level > 0)
	qmgr_job_pop(job);

    /*
     * Return any remaining recipient slots back to the recipient slots pool.
     */
    qmgr_job_move_limits(job);
    if (job->rcpt_limit)
	msg_panic("%s: recipient slots leak (%d)", myname, job->rcpt_limit);

    /*
     * Unlink and discard the structure. Check whether the job is still
     * linked on the job lists or whether it was already retired before
     * unlinking it.
     */
    if (job->stack_level >= 0)
	qmgr_job_unlink(job);
    QMGR_LIST_UNLINK(message->job_list, QMGR_JOB *, job, message_peers);
    htable_delete(transport->job_byname, message->queue_id, (void (*) (char *)) 0);
    htable_free(job->peer_byname, (void (*) (char *)) 0);
    myfree((char *) job);
}

/* qmgr_job_count_slots - maintain the delivery slot counters */

static void qmgr_job_count_slots(QMGR_JOB *job)
{

    /*
     * Count the number of delivery slots used during the delivery of the
     * selected job. Also count the number of delivery slots available for
     * its preemption.
     *
     * Despite its trivial look, this is one of the key parts of the theory
     * behind this preempting scheduler.
     */
    job->slots_available++;
    job->slots_used++;

    /*
     * If the selected job is not the original current job, reset the
     * candidate cache because the change above has slightly increased the
     * chance of this job becoming a candidate next time.
     *
     * Don't expect that a change of the current job in this turn will
     * render the candidate cache invalid in the next turn - the original
     * current job may be selected again next turn, in which case the cache
     * would be considered valid.
     */
    if (job != job->transport->candidate_cache_current)
	RESET_CANDIDATE_CACHE(job->transport);
}
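/*
 * Illustrative sketch (not part of the original file): the two counters
 * above in action. Every delivery selected from a job earns it one slot;
 * with a hypothetical slot_cost of 5, a candidate needing 4 entries can
 * preempt once 4 * 5 = 20 slots have accumulated (ignoring the slot loan,
 * shown later). All numbers and names below are made up.
 */
#if 0
#include <stdio.h>

int main(void)
{
    int slots_available = 0, slots_used = 0;
    int slot_cost = 5, candidate_entries = 4;
    int deliveries;

    for (deliveries = 1; deliveries <= 30; deliveries++) {
	slots_available++;		/* credit toward preemption */
	slots_used++;			/* total work done by this job */
	if (slots_available / slot_cost >= candidate_entries) {
	    printf("preemption possible after %d deliveries\n", deliveries);
	    break;			/* 20 deliveries = 4 * 5 slots */
	}
    }
    return (0);
}
#endif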
/* qmgr_job_candidate - find best job candidate for preempting given job */

static QMGR_JOB *qmgr_job_candidate(QMGR_JOB *current)
{
    QMGR_TRANSPORT *transport = current->transport;
    QMGR_JOB *job, *best_job = 0;
    double  score, best_score = 0.0;
    int     max_slots, max_needed_entries, max_total_entries;
    int     delay;
    time_t  now = sane_time();

    /*
     * Fetch the result directly from the cache if the cache is still valid.
     *
     * Note that we cache negative results too, so the cache must be
     * invalidated by resetting the cached current job pointer, not the
     * candidate pointer itself.
     *
     * In case the cache is valid and contains no candidate, we can ignore
     * the time change, as it affects only which candidate is the best, not
     * whether one exists. However, this feature requires that we no longer
     * relax the cache resetting rules, depending on the automatic cache
     * timeout.
     */
    if (transport->candidate_cache_current == current
	&& (transport->candidate_cache_time == now
	    || transport->candidate_cache == 0))
	return (transport->candidate_cache);

    /*
     * Estimate the minimum number of delivery slots that can ever be
     * accumulated for the given job. All jobs that won't fit into these
     * slots are excluded from the candidate selection.
     */
    max_slots = (MIN_ENTRIES(current) - current->selected_entries
		 + current->slots_available) / transport->slot_cost;

    /*
     * Select the candidate with the best time_since_queued/total_recipients
     * score. In addition to jobs which don't meet the max_slots limit, also
     * skip jobs which don't have any selectable entries at the moment.
     *
     * Instead of traversing the whole job list, we traverse it just from
     * the current job forward. This has several advantages. First, we skip
     * some of the blocker jobs and the current job itself right away. But
     * the really important advantage is that we are sure that we don't
     * consider any jobs that are already stack children of the current job.
     * Thanks to this we can easily include all encountered jobs which are
     * leaf children of some of the preempting stacks as valid candidates.
     * All we need to do is to make sure we do not include any of the stack
     * parents. And, because the leaf children are not ordered by the time
     * since queued, we have to exclude them from the early loop end test.
     *
     * However, don't bother searching if we can't find anything suitable
     * anyway.
     */
    if (max_slots > 0) {
	for (job = current->transport_peers.next; job; job = job->transport_peers.next) {
	    if (job->stack_children.next != 0 || IS_BLOCKER(job, transport))
		continue;
	    max_total_entries = MAX_ENTRIES(job);
	    max_needed_entries = max_total_entries - job->selected_entries;
	    delay = now - job->message->queued_time + 1;
	    if (max_needed_entries > 0 && max_needed_entries <= max_slots) {
		score = (double) delay / max_total_entries;
		if (score > best_score) {
		    best_score = score;
		    best_job = job;
		}
	    }

	    /*
	     * Stop early if the best score is as good as it can get.
	     */
	    if (delay <= best_score && job->stack_level == 0)
		break;
	}
    }

    /*
     * Cache the result for later use.
     */
    transport->candidate_cache = best_job;
    transport->candidate_cache_current = current;
    transport->candidate_cache_time = now;

    return (best_job);
}
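/*
 * Illustrative sketch (not part of the original file): the candidate score
 * above prefers old, small jobs. A message queued 120 seconds ago with 3
 * recipients scores (120 + 1) / 3 ~ 40, while one queued 600 seconds ago
 * with 100 recipients scores (600 + 1) / 100 ~ 6, so the small job gets to
 * preempt first. All values below are made up for the demonstration.
 */
#if 0
#include <stdio.h>

/* score - same formula as above: time since queued over total recipients */
static double score(int age, int total_entries)
{
    return ((double) (age + 1) / total_entries);
}

int main(void)
{
    printf("small old job:   %.2f\n", score(120, 3));	/* 40.33 */
    printf("large older job: %.2f\n", score(600, 100));	/* 6.01 */
    return (0);
}
#endif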
/* qmgr_job_preempt - preempt large message with smaller one */

static QMGR_JOB *qmgr_job_preempt(QMGR_JOB *current)
{
    const char *myname = "qmgr_job_preempt";
    QMGR_TRANSPORT *transport = current->transport;
    QMGR_JOB *job, *prev;
    int     expected_slots;
    int     rcpt_slots;

    /*
     * Suppress preempting completely if the current job is not big enough
     * to accumulate even the minimal number of slots required.
     *
     * Also, don't look for a better job candidate if there are no available
     * slots yet (the count can get negative due to the slot loans below).
     */
    if (current->slots_available <= 0
      || MAX_ENTRIES(current) < transport->min_slots * transport->slot_cost)
	return (current);

    /*
     * Find the best candidate for preempting the current job.
     *
     * Note that the function also takes care that the candidate fits within
     * the number of delivery slots which the current job is still able to
     * accumulate.
     */
    if ((job = qmgr_job_candidate(current)) == 0)
	return (current);

    /*
     * Sanity checks.
     */
    if (job == current)
	msg_panic("%s: attempt to preempt itself", myname);
    if (job->stack_children.next != 0)
	msg_panic("%s: already on the job stack (%d)", myname, job->stack_level);
    if (job->stack_level < 0)
	msg_panic("%s: not on the job list (%d)", myname, job->stack_level);

    /*
     * Check whether enough delivery slots have accumulated to preempt the
     * current job.
     *
     * The slot loaning scheme improves the average message response time.
     * Note that the loan only allows the preemption to happen earlier,
     * though. It doesn't affect how many slots have to be "paid" - in
     * either case the full number of slots required has to be accumulated
     * later before the current job can be preempted again.
     */
    expected_slots = MAX_ENTRIES(job) - job->selected_entries;
    if (current->slots_available / transport->slot_cost + transport->slot_loan
	< expected_slots * transport->slot_loan_factor / 100.0)
	return (current);

    /*
     * Preempt the current job.
     *
     * This involves placing the selected candidate in front of the current
     * job on the job list and updating the stack parent/child/sibling
     * pointers appropriately. But first we need to make sure that the
     * candidate is taken from its previous job stack which it might be on
     * top of.
     */
    if (job->stack_level > 0)
	qmgr_job_pop(job);
    QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers);
    prev = current->transport_peers.prev;
    QMGR_LIST_LINK(transport->job_list, prev, job, current, transport_peers);
    job->stack_parent = current;
    QMGR_LIST_APPEND(current->stack_children, job, stack_siblings);
    job->stack_level = current->stack_level + 1;

    /*
     * Update the current job pointer and explicitly reset the candidate
     * cache.
     */
    transport->job_current = job;
    RESET_CANDIDATE_CACHE(transport);

    /*
     * Since a single job can be preempted by several jobs at the same time,
     * we have to adjust the available slot count now to prevent using the
     * same slots multiple times. To do that we subtract the number of slots
     * the preempting job will supposedly use. This number will be corrected
     * later when that job is popped from the stack, to reflect the number
     * of slots really used.
     *
     * As long as we don't need to keep track of how many slots were really
     * used, we can (ab)use the slots_used counter for counting the
     * difference between the real and expected amounts instead of the
     * absolute amount.
     */
    current->slots_available -= expected_slots * transport->slot_cost;
    job->slots_used = -expected_slots;

    /*
     * Add part of the extra recipient slots reserved for preempting jobs to
     * the new current job if necessary.
     *
     * Note that transport->rcpt_unused is within <-rcpt_per_stack, 0> in
     * such a case.
     */
    if (job->message->rcpt_offset != 0) {
	rcpt_slots = (transport->rcpt_per_stack + transport->rcpt_unused + 1) / 2;
	job->rcpt_limit += rcpt_slots;
	job->message->rcpt_limit += rcpt_slots;
	transport->rcpt_unused -= rcpt_slots;
    }
    if (msg_verbose)
	msg_info("%s: %s by %s, level %d", myname, current->message->queue_id,
		 job->message->queue_id, job->stack_level);

    return (job);
}
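/*
 * Illustrative sketch (not part of the original file): the loan test above
 * in numbers. With hypothetical settings slot_cost = 5, slot_loan = 2 and
 * slot_loan_factor = 75 (percent), a candidate expected to need 8 entries
 * may preempt as soon as slots_available / 5 + 2 >= 8 * 0.75, i.e. after 20
 * accumulated slots instead of the full 8 * 5 = 40.
 */
#if 0
#include <stdio.h>

int main(void)
{
    int slot_cost = 5, slot_loan = 2;
    double slot_loan_factor = 75;		/* percent */
    int expected_slots = 8;
    int slots_available;

    for (slots_available = 0; slots_available <= 100; slots_available++)
	if (slots_available / slot_cost + slot_loan
	    >= expected_slots * slot_loan_factor / 100.0) {
	    printf("preempt at %d slots\n", slots_available);	/* 20 */
	    break;
	}
    return (0);
}
#endif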
/* qmgr_job_pop - remove the job from its job preemption stack */

static void qmgr_job_pop(QMGR_JOB *job)
{
    const char *myname = "qmgr_job_pop";
    QMGR_TRANSPORT *transport = job->transport;
    QMGR_JOB *parent;

    if (msg_verbose)
	msg_info("%s: %s", myname, job->message->queue_id);

    /*
     * Sanity checks.
     */
    if (job->stack_level <= 0)
	msg_panic("%s: not on the job stack (%d)", myname, job->stack_level);

    /*
     * Adjust the number of delivery slots available to preempt the job's
     * parent. Note that the -= actually adds back any unused slots, as we
     * have already subtracted the expected amount of slots from both
     * counters when we did the preemption.
     *
     * Note that we intentionally do not adjust slots_used of the parent.
     * Doing so would decrease the maximum per-message inflation factor if
     * the preemption appeared near the end of parent delivery.
     *
     * For the same reason we do not adjust the parent's slots_available if
     * the parent is not the original parent that was preempted by this job
     * (i.e., the original parent job has already completed).
     *
     * This is another key part of the theory behind this preempting
     * scheduler.
     */
    if ((parent = job->stack_parent) != 0
	&& job->stack_level == parent->stack_level + 1)
	parent->slots_available -= job->slots_used * transport->slot_cost;

    /*
     * Remove the job from its parent's children list.
     */
    if (parent != 0) {
	QMGR_LIST_UNLINK(parent->stack_children, QMGR_JOB *, job, stack_siblings);
	job->stack_parent = 0;
    }

    /*
     * If there is a parent, let it adopt all those orphaned children.
     * Otherwise at least notify the children that their parent is gone.
     */
    qmgr_job_parent_gone(job, parent);

    /*
     * Put the job back to stack level zero.
     */
    job->stack_level = 0;

    /*
     * Explicitly reset the candidate cache. It's not worth trying to skip
     * this under some complicated conditions - in most cases the popped job
     * is the current job, so we would have to reset it anyway.
     */
    RESET_CANDIDATE_CACHE(transport);

    /*
     * Here we leave the remaining work involving the proper placement on
     * the job list to the caller. The most important reason for this is
     * that it allows us not to look up where exactly to place the job.
     *
     * The caller is also made responsible for invalidating the current job
     * cache if necessary.
     */
#if 0
    QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers);
    QMGR_LIST_LINK(transport->job_list, some_prev, job, some_next, transport_peers);

    if (transport->job_current == job)
	transport->job_current = job->transport_peers.next;
#endif
}
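/*
 * Illustrative sketch (not part of the original file): the pop-time
 * correction above in numbers. If a preempting job was expected to need 8
 * entries (so the parent prepaid 8 * slot_cost = 40 slots), but only 6 were
 * delivered, slots_used ends at -8 + 6 = -2 and the -= above hands
 * -(-2) * 5 = 10 slots back to the parent. All values are made up.
 */
#if 0
#include <stdio.h>

int main(void)
{
    int slot_cost = 5, expected_slots = 8, delivered = 6;
    int parent_slots_available = 100;
    int job_slots_used;

    parent_slots_available -= expected_slots * slot_cost;	/* prepay: 60 */
    job_slots_used = -expected_slots;				/* -8 */
    job_slots_used += delivered;				/* -2 */

    /* Pop: the -= adds back the unused part of the prepayment. */
    parent_slots_available -= job_slots_used * slot_cost;	/* 60 + 10 */
    printf("parent slots after pop: %d\n", parent_slots_available);	/* 70 */
    return (0);
}
#endif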
/* qmgr_job_peer_select - select next peer suitable for delivery */

static QMGR_PEER *qmgr_job_peer_select(QMGR_JOB *job)
{
    QMGR_PEER *peer;
    QMGR_MESSAGE *message = job->message;

    /*
     * Try reading in more recipients. We do that as soon as possible
     * (almost, see below), to make sure there is enough new blood pouring
     * in. Otherwise a single recipient for a slow destination might starve
     * the entire message delivery, leaving a lot of fast-destination
     * recipients sitting idle in the queue file.
     *
     * Ideally we would like to read in recipients whenever there is space,
     * but to prevent excessive I/O, we read them only when enough time has
     * passed or when we can read enough of them at once.
     *
     * Note that even if we read the recipients a few at a time, the message
     * loading code tries to put them into existing recipient entries
     * whenever possible, so the per-destination recipient grouping is not
     * grossly affected.
     *
     * XXX Workaround for logic mismatch. The message->refcount test needs
     * explanation. If the refcount is zero, it means that qmgr_active_done()
     * is being completed asynchronously. In such a case, we can't read in
     * more recipients as bad things would happen after qmgr_active_done()
     * continues processing. Note that this results in the given job being
     * stalled for some time, but fortunately this particular situation is
     * so rare that it is not critical. Still, we seek a better solution.
     */
    if (message->rcpt_offset != 0
	&& message->refcount > 0
	&& (message->rcpt_limit - message->rcpt_count >= job->transport->refill_limit
	    || (message->rcpt_limit > message->rcpt_count
		&& sane_time() - message->refill_time >= job->transport->refill_delay)))
	qmgr_message_realloc(message);

    /*
     * Get the next suitable peer, if there is any.
     */
    if (HAS_ENTRIES(job) && (peer = qmgr_peer_select(job)) != 0)
	return (peer);

    /*
     * There is no suitable peer in-core, so try reading in more recipients
     * if possible. This is our last chance to get a suitable peer before
     * giving up on this job for now.
     *
     * XXX For message->refcount, see above.
     */
    if (message->rcpt_offset != 0
	&& message->refcount > 0
	&& message->rcpt_limit > message->rcpt_count) {
	qmgr_message_realloc(message);
	if (HAS_ENTRIES(job))
	    return (qmgr_peer_select(job));
    }
    return (0);
}
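/*
 * Illustrative sketch (not part of the original file): the refill test
 * above as a standalone predicate. With hypothetical settings
 * refill_limit = 100 slots and refill_delay = 10 seconds, a message is
 * refilled either when at least 100 recipient slots are free (a batch big
 * enough to amortize the I/O) or when any slot is free and 10 seconds have
 * passed since the last refill.
 */
#if 0
#include <stdio.h>

static int want_refill(int free_slots, long since_refill,
		       int refill_limit, int refill_delay)
{
    return (free_slots >= refill_limit		/* big batch: read now */
	    || (free_slots > 0
		&& since_refill >= refill_delay));	/* rate-limited trickle */
}

int main(void)
{
    printf("%d\n", want_refill(120, 0, 100, 10));	/* 1: enough slots */
    printf("%d\n", want_refill(5, 3, 100, 10));		/* 0: too soon */
    printf("%d\n", want_refill(5, 12, 100, 10));	/* 1: enough time */
    return (0);
}
#endif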
/* qmgr_job_entry_select - select next entry suitable for delivery */

QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *transport)
{
    QMGR_JOB *job, *next;
    QMGR_PEER *peer;
    QMGR_ENTRY *entry;

    /*
     * Get the current job if there is one.
     */
    if ((job = transport->job_current) == 0)
	return (0);

    /*
     * Exercise the preempting algorithm if enabled.
     *
     * A slot_cost of 1 causes the algorithm to degenerate and is therefore
     * disabled too.
     */
    if (transport->slot_cost >= 2)
	job = qmgr_job_preempt(job);

    /*
     * Select the next entry suitable for delivery. In case the current job
     * can't provide one because of the per-destination concurrency limits,
     * we mark it as a "blocker" job and continue with the next job on the
     * job list.
     *
     * Note that the loop also takes care of getting the "stall" jobs (jobs
     * with no entries currently available) out of the way if necessary.
     * Stall jobs can appear in case of multi-transport messages whose
     * recipients don't fit in-core at once. Some jobs created by such a
     * message may have only a few recipients and would stay on the job list
     * until all other jobs of that message are delivered, blocking precious
     * recipient slots available to this transport. Or it can happen that
     * the job has some more entries but suddenly they all get deferred.
     * Whatever the reason, we retire such jobs below if we happen to come
     * across one.
     */
    for ( /* empty */ ; job; job = next) {
	next = job->transport_peers.next;

	/*
	 * Don't bother if the job is known to have no available entries
	 * because of the per-destination concurrency limits.
	 */
	if (IS_BLOCKER(job, transport))
	    continue;

	if ((peer = qmgr_job_peer_select(job)) != 0) {

	    /*
	     * We have found a suitable peer. Select one of its entries and
	     * adjust the delivery slot counters.
	     */
	    entry = qmgr_entry_select(peer);
	    qmgr_job_count_slots(job);

	    /*
	     * Remember the current job for the next time so we don't have
	     * to crawl over all those blockers again. They will be
	     * reconsidered when the concurrency limit permits.
	     */
	    transport->job_current = job;

	    /*
	     * In case we selected the very last job entry, remove the job
	     * from the job lists right now.
	     *
	     * This action uses the assumption that once the job entry has
	     * been selected, it can be unselected only before the message
	     * itself is deferred. Thus a job with all entries selected
	     * can't re-appear with more entries available for selection
	     * again (without reading in more entries from the queue file,
	     * which in turn invokes qmgr_job_obtain(), which re-links the
	     * job back on the lists if necessary).
	     *
	     * Note that qmgr_job_move_limits() transfers the recipient
	     * slots correctly even if the job is unlinked from the job
	     * list, thanks to the job_next_unread caching.
	     */
	    if (!HAS_ENTRIES(job) && job->message->rcpt_offset == 0)
		qmgr_job_retire(job);

	    /*
	     * Finally. Hand back the fruit of our tedious effort.
	     */
	    return (entry);
	} else if (HAS_ENTRIES(job)) {

	    /*
	     * The job can't be selected due to the concurrency limits. Mark
	     * it together with its queues so we know they are blocking the
	     * job list and they get the appropriate treatment. In
	     * particular, all blockers will be reconsidered when one of the
	     * problematic queues accepts more deliveries. And the job
	     * itself will be reconsidered if it is assigned some more
	     * entries.
	     */
	    job->blocker_tag = transport->blocker_tag;
	    for (peer = job->peer_list.next; peer; peer = peer->peers.next)
		if (peer->entry_list.next != 0)
		    peer->queue->blocker_tag = transport->blocker_tag;
	} else {

	    /*
	     * The job is "stalled". Retire it until it either gets freed or
	     * gets more entries later.
	     */
	    qmgr_job_retire(job);
	}
    }

    /*
     * We have not found any entry we could use for delivery. Well, things
     * must have changed since this transport was selected for asynchronous
     * allocation. Never mind. Clear the current job pointer and reluctantly
     * report back that we have failed in our task.
     */
    transport->job_current = 0;
    return (0);
}
/* qmgr_job_blocker_update - update "blocked job" status */

void    qmgr_job_blocker_update(QMGR_QUEUE *queue)
{
    QMGR_TRANSPORT *transport = queue->transport;

    /*
     * If the queue was blocking some of the jobs on the job list, check
     * whether the concurrency limit has been lifted. If there are still
     * some pending deliveries, give it a try and unmark all transport
     * blockers at once. qmgr_job_entry_select() will do the rest. In either
     * case make sure the queue is not marked as a blocker anymore, with
     * extra handling of queues which were declared dead.
     *
     * Note that changing the blocker status also affects the candidate
     * cache. Most of the cases would be automatically recognized by the
     * current job change, but we play it safe and reset the cache
     * explicitly below.
     *
     * Keeping the transport blocker tag odd is an easy way to make sure the
     * tag never matches jobs that are not explicitly marked as blockers.
     */
    if (queue->blocker_tag == transport->blocker_tag) {
	if (queue->window > queue->busy_refcount && queue->todo.next != 0) {
	    transport->blocker_tag += 2;
	    transport->job_current = transport->job_list.next;
	    transport->candidate_cache_current = 0;
	}
	if (queue->window > queue->busy_refcount || QMGR_QUEUE_THROTTLED(queue))
	    queue->blocker_tag = 0;
    }
}
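/*
 * Illustrative sketch (not part of the original file): why the odd blocker
 * tag works. Jobs and queues start with blocker_tag = 0 (even) and are
 * marked by copying the transport tag; bumping the transport tag by 2 from
 * an odd starting value unmarks all blockers at once, and an unmarked
 * (zero) tag can never equal an odd transport tag. The starting value
 * below is hypothetical.
 */
#if 0
#include <stdio.h>

int main(void)
{
    int transport_tag = 1;		/* odd initial value, stays odd */
    int job_tag = 0;			/* never marked as a blocker */
    int blocked_job_tag;

    blocked_job_tag = transport_tag;	/* mark one job as a blocker */
    transport_tag += 2;			/* unmark all blockers at once */

    printf("%d\n", job_tag == transport_tag);		/* 0: never matches */
    printf("%d\n", blocked_job_tag == transport_tag);	/* 0: unmarked now */
    return (0);
}
#endif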