/*++
/* NAME
/*	qmgr_job 3
/* SUMMARY
/*	per-transport jobs
/* SYNOPSIS
/*	#include "qmgr.h"
/*
/*	QMGR_JOB *qmgr_job_obtain(message, transport)
/*	QMGR_MESSAGE *message;
/*	QMGR_TRANSPORT *transport;
/*
/*	void qmgr_job_free(job)
/*	QMGR_JOB *job;
/*
/*	void qmgr_job_move_limits(job)
/*	QMGR_JOB *job;
/*
/*	QMGR_ENTRY *qmgr_job_entry_select(transport)
/*	QMGR_TRANSPORT *transport;
/*
/*	void	qmgr_job_blocker_update(queue)
/*	QMGR_QUEUE *queue;
/* DESCRIPTION
/*	These routines add/delete/manipulate per-transport jobs.
/*	Each job corresponds to a specific transport and message.
/*	Each job has a peer list containing all pending delivery
/*	requests for that message.
/*
/*	qmgr_job_obtain() finds an existing job for the named message
/*	and transport combination. A new empty job is created if no
/*	existing one can be found. In either case, the job is prepared
/*	for the assignment of (more) message recipients.
/*
/*	qmgr_job_free() disposes of a per-transport job after all
/*	its entries have been taken care of. It is an error to dispose
/*	of a job that is still in use.
/*
/*	qmgr_job_entry_select() attempts to find the next entry suitable
/*	for delivery. The job preempting algorithm is also exercised.
/*	If necessary, an attempt to read more recipients into core is made.
/*	This can result in the creation of more job, queue and entry
/*	structures.
/*
/*	qmgr_job_blocker_update() updates the status of blocked
/*	jobs after a decrease in the queue's concurrency level,
/*	after the queue is throttled, or after the queue is resumed
/*	from suspension.
/*
/*	qmgr_job_move_limits() takes care of the proper distribution of
/*	the per-transport recipient limit among the per-transport jobs.
/*	It should be called whenever a job's recipient slot becomes
/*	available.
/* DIAGNOSTICS
/*	Panic: consistency check failure.
/* LICENSE
/* .ad
/* .fi
/*	The Secure Mailer license must be distributed with this software.
/* AUTHOR(S)
/*	Patrik Rak
/*	patrik@raxoft.cz
/*--*/

/* System library. */

#include <sys_defs.h>

/* Utility library. */

#include <msg.h>
#include <htable.h>
#include <mymalloc.h>
#include <sane_time.h>

/* Application-specific. */

#include "qmgr.h"

/* Forward declarations */

static void qmgr_job_pop(QMGR_JOB *);

/* Helper macros */

#define HAS_ENTRIES(job) ((job)->selected_entries < (job)->read_entries)

/*
 * The MIN_ENTRIES macro may underestimate a lot, but we can't use
 * message->rcpt_unread because we don't know yet whether all those unread
 * recipients go to our transport.
 */

#define MIN_ENTRIES(job) ((job)->read_entries)
#define MAX_ENTRIES(job) ((job)->read_entries + (job)->message->rcpt_unread)

#define RESET_CANDIDATE_CACHE(transport) ((transport)->candidate_cache_current = 0)

#define IS_BLOCKER(job,transport) ((job)->blocker_tag == (transport)->blocker_tag)

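/*
 * Illustrative example (added commentary, not part of the original file):
 * for a job that has read 10 recipient entries, selected 4 of them, and
 * whose message still has 90 unread recipients in the queue file,
 * HAS_ENTRIES() is true (4 < 10), MIN_ENTRIES() is 10 (only the entries
 * known to belong to this transport), and MAX_ENTRIES() is 100 (assuming,
 * pessimistically, that every unread recipient would be assigned to this
 * transport).
 */
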
/* qmgr_job_create - create and initialize message job structure */

static QMGR_JOB *qmgr_job_create(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{
    QMGR_JOB *job;

    job = (QMGR_JOB *) mymalloc(sizeof(QMGR_JOB));
    job->message = message;
    QMGR_LIST_APPEND(message->job_list, job, message_peers);
    htable_enter(transport->job_byname, message->queue_id, (char *) job);
    job->transport = transport;
    QMGR_LIST_INIT(job->transport_peers);
    QMGR_LIST_INIT(job->time_peers);
    job->stack_parent = 0;
    QMGR_LIST_INIT(job->stack_children);
    QMGR_LIST_INIT(job->stack_siblings);
    job->stack_level = -1;
    job->blocker_tag = 0;
    job->peer_byname = htable_create(0);
    QMGR_LIST_INIT(job->peer_list);
    job->slots_used = 0;
    job->slots_available = 0;
    job->selected_entries = 0;
    job->read_entries = 0;
    job->rcpt_count = 0;
    job->rcpt_limit = 0;
    return (job);
}

/* qmgr_job_link - append the job to the job lists based on the time it was queued */

static void qmgr_job_link(QMGR_JOB *job)
{
    QMGR_TRANSPORT *transport = job->transport;
    QMGR_MESSAGE *message = job->message;
    QMGR_JOB *prev, *next, *list_prev, *list_next, *unread, *current;
    int     delay;

    /*
     * Sanity checks.
     */
    if (job->stack_level >= 0)
	msg_panic("qmgr_job_link: already on the job lists (%d)", job->stack_level);

    /*
     * Traverse the time list and the scheduler list from the end and stop
     * when we find a job older than the one being linked.
     *
     * During the traversals, keep track of whether we come across either the
     * current job or the first unread job on the job list. If we do, these
     * pointers will be adjusted below as required.
     *
     * Although both lists are exactly the same when only jobs on stack
     * level zero are considered, it's easier to traverse them separately.
     * Otherwise it's impossible to keep track of the current job pointer
     * effectively.
     *
     * This may look inefficient, but under normal operation it is expected
     * that the loops will stop right away, resulting in normal list appends
     * below. However, this code is necessary for reviving retired jobs and
     * for jobs which are created long after the first chunk of recipients
     * was read in-core (either of these can happen only for multi-transport
     * messages).
     *
     * XXX Note that we test stack_parent rather than stack_level below. This
     * subtle difference allows us to enqueue the job in correct time order
     * with respect to orphaned children even after their original parent on
     * level zero is gone. Consequently, the early loop stop in candidate
     * selection works reliably, too. These are the reasons why we care to
     * bother with children adoption at all.
     */
    current = transport->job_current;
    for (next = 0, prev = transport->job_list.prev; prev;
	 next = prev, prev = prev->transport_peers.prev) {
	if (prev->stack_parent == 0) {
	    delay = message->queued_time - prev->message->queued_time;
	    if (delay >= 0)
		break;
	}
	if (current == prev)
	    current = 0;
    }
    list_prev = prev;
    list_next = next;

    unread = transport->job_next_unread;
    for (next = 0, prev = transport->job_bytime.prev; prev;
	 next = prev, prev = prev->time_peers.prev) {
	delay = message->queued_time - prev->message->queued_time;
	if (delay >= 0)
	    break;
	if (unread == prev)
	    unread = 0;
    }

    /*
     * Link the job into the proper place on the job lists and mark it so we
     * know it has been linked.
     */
    job->stack_level = 0;
    QMGR_LIST_LINK(transport->job_list, list_prev, job, list_next, transport_peers);
    QMGR_LIST_LINK(transport->job_bytime, prev, job, next, time_peers);

    /*
     * Update the current job pointer if necessary.
     */
    if (current == 0)
	transport->job_current = job;

    /*
     * Update the pointer to the first unread job on the job list and steal
     * the unused recipient slots from the old one.
     */
    if (unread == 0) {
	unread = transport->job_next_unread;
	transport->job_next_unread = job;
	if (unread != 0)
	    qmgr_job_move_limits(unread);
    }

    /*
     * Get as many recipient slots as possible. The excess will be returned
     * to the transport pool as soon as the exact amount required is known
     * (which is usually after all recipients have been read in core).
     */
    if (transport->rcpt_unused > 0) {
	job->rcpt_limit += transport->rcpt_unused;
	message->rcpt_limit += transport->rcpt_unused;
	transport->rcpt_unused = 0;
    }
}
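
/*
 * Illustrative example (added commentary): with three messages queued at
 * times t=10, t=20 and t=30, the job_bytime list is always ordered
 * 10, 20, 30. The job_list (scheduler list) starts out the same, but
 * preemption can reorder it, e.g. to 10, 30, 20 when the t=30 job preempts
 * the t=20 job. This is why qmgr_job_link() compares queued_time only
 * against stack-level-zero jobs (stack_parent == 0) when inserting.
 */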

/* qmgr_job_find - lookup job associated with named message and transport */

static QMGR_JOB *qmgr_job_find(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{

    /*
     * Instead of traversing the message job list, we use a single
     * per-transport hash table. This is better (at least with respect to
     * memory usage) than having a single hash table (usually almost empty)
     * for each message.
     */
    return ((QMGR_JOB *) htable_find(transport->job_byname, message->queue_id));
}

/* qmgr_job_obtain - find/create the appropriate job and make it ready for new recipients */

QMGR_JOB *qmgr_job_obtain(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{
    QMGR_JOB *job;

    /*
     * Try finding an existing job, reviving it if it was already retired.
     * Otherwise create a new job for this transport/message combination. In
     * either case, the job ends up linked on the job lists.
     */
    if ((job = qmgr_job_find(message, transport)) == 0)
	job = qmgr_job_create(message, transport);
    if (job->stack_level < 0)
	qmgr_job_link(job);

    /*
     * Reset the candidate cache because of the new expected recipients. Make
     * sure the job is not marked as a blocker for the same reason. Note that
     * this can result in a non-blocker followed by more blockers.
     * Consequently, we can't just update the current job pointer, we have to
     * reset it. Fortunately qmgr_job_entry_select() will easily deal with
     * this and will look up the real current job for us.
     */
    RESET_CANDIDATE_CACHE(transport);
    if (IS_BLOCKER(job, transport)) {
	job->blocker_tag = 0;
	transport->job_current = transport->job_list.next;
    }
    return (job);
}
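
/*
 * A minimal sketch of the intended call sequence (added for illustration;
 * the real callers live elsewhere in the queue manager, and the helper
 * names below are hypothetical): a job is obtained whenever recipients are
 * assigned to a transport, and freed only after all its entries are done.
 */
#if 0
    QMGR_JOB *job;

    job = qmgr_job_obtain(message, transport);	/* find or create, then link */
    assign_recipients_to_job(job);		/* hypothetical helper */
    /* ... deliveries proceed via qmgr_job_entry_select() ... */
    if (job->rcpt_count == 0)			/* all entries taken care of */
	qmgr_job_free(job);
#endif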

/* qmgr_job_move_limits - move unused recipient slots to the next unread job */

void    qmgr_job_move_limits(QMGR_JOB *job)
{
    QMGR_TRANSPORT *transport = job->transport;
    QMGR_MESSAGE *message = job->message;
    QMGR_JOB *next = transport->job_next_unread;
    int     rcpt_unused, msg_rcpt_unused;

    /*
     * Find the next unread job on the job list if necessary. Cache it for
     * later. This makes the amortized efficiency of this routine O(1) per
     * job. Note that we use the time list, whose ordering doesn't change
     * over time.
     */
    if (job == next) {
	for (next = next->time_peers.next; next; next = next->time_peers.next)
	    if (next->message->rcpt_offset != 0)
		break;
	transport->job_next_unread = next;
    }

    /*
     * Calculate the number of available unused slots.
     */
    rcpt_unused = job->rcpt_limit - job->rcpt_count;
    msg_rcpt_unused = message->rcpt_limit - message->rcpt_count;
    if (msg_rcpt_unused < rcpt_unused)
	rcpt_unused = msg_rcpt_unused;

    /*
     * Transfer the unused recipient slots back to the transport pool and to
     * the next not-fully-read job. The job's and message's limits are
     * adjusted accordingly. Note that the transport pool can be negative if
     * we used some of the rcpt_per_stack slots.
     */
    if (rcpt_unused > 0) {
	job->rcpt_limit -= rcpt_unused;
	message->rcpt_limit -= rcpt_unused;
	transport->rcpt_unused += rcpt_unused;
	if (next != 0 && (rcpt_unused = transport->rcpt_unused) > 0) {
	    next->rcpt_limit += rcpt_unused;
	    next->message->rcpt_limit += rcpt_unused;
	    transport->rcpt_unused = 0;
	}
    }
}
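
/*
 * Worked example (added commentary, with made-up numbers): suppose a job
 * has rcpt_limit = 500 and rcpt_count = 200, while its message has
 * rcpt_limit = 600 and rcpt_count = 250. Then rcpt_unused = 300 and
 * msg_rcpt_unused = 350, so 300 slots are taken from both the job and its
 * message. If the transport pool was at -20 (slots borrowed from
 * rcpt_per_stack), it briefly becomes 280, and all 280 slots are then
 * handed to the next not-fully-read job, leaving the pool at 0.
 */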

/* qmgr_job_parent_gone - take care of orphaned stack children */

static void qmgr_job_parent_gone(QMGR_JOB *job, QMGR_JOB *parent)
{
    QMGR_JOB *child;

    while ((child = job->stack_children.next) != 0) {
	QMGR_LIST_UNLINK(job->stack_children, QMGR_JOB *, child, stack_siblings);
	if (parent != 0)
	    QMGR_LIST_APPEND(parent->stack_children, child, stack_siblings);
	child->stack_parent = parent;
    }
}

/* qmgr_job_unlink - unlink the job from the job lists */

static void qmgr_job_unlink(QMGR_JOB *job)
{
    const char *myname = "qmgr_job_unlink";
    QMGR_TRANSPORT *transport = job->transport;

    /*
     * Sanity checks.
     */
    if (job->stack_level != 0)
	msg_panic("%s: non-zero stack level (%d)", myname, job->stack_level);
    if (job->stack_parent != 0)
	msg_panic("%s: parent present", myname);
    if (job->stack_siblings.next != 0)
	msg_panic("%s: siblings present", myname);

    /*
     * Make sure that the children of a job on stack level zero are informed
     * that their parent is gone, too.
     */
    qmgr_job_parent_gone(job, 0);

    /*
     * Update the current job pointer if necessary.
     */
    if (transport->job_current == job)
	transport->job_current = job->transport_peers.next;

    /*
     * Invalidate the candidate selection cache if necessary.
     */
    if (job == transport->candidate_cache
	|| job == transport->candidate_cache_current)
	RESET_CANDIDATE_CACHE(transport);

    /*
     * Remove the job from the job lists and mark it as unlinked.
     */
    QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers);
    QMGR_LIST_UNLINK(transport->job_bytime, QMGR_JOB *, job, time_peers);
    job->stack_level = -1;
}

/* qmgr_job_retire - remove the job from the job lists while waiting for recipients to deliver */

static void qmgr_job_retire(QMGR_JOB *job)
{
    if (msg_verbose)
	msg_info("qmgr_job_retire: %s", job->message->queue_id);

    /*
     * Pop the job from the job stack if necessary.
     */
    if (job->stack_level > 0)
	qmgr_job_pop(job);

    /*
     * Make sure this job is not cached as the next unread job for this
     * transport. qmgr_entry_done() will make sure that the slots donated
     * by this job are moved back to the transport pool as soon as possible.
     */
    qmgr_job_move_limits(job);

    /*
     * Remove the job from the job lists. Note that it remains on the message
     * job list, though, and that it can be revived by using
     * qmgr_job_obtain(). Also note that the available slot counter is left
     * intact.
     */
    qmgr_job_unlink(job);
}

/* qmgr_job_free - release the job structure */

void    qmgr_job_free(QMGR_JOB *job)
{
    const char *myname = "qmgr_job_free";
    QMGR_MESSAGE *message = job->message;
    QMGR_TRANSPORT *transport = job->transport;

    if (msg_verbose)
	msg_info("%s: %s %s", myname, message->queue_id, transport->name);

    /*
     * Sanity checks.
     */
    if (job->rcpt_count)
	msg_panic("%s: non-zero recipient count (%d)", myname, job->rcpt_count);

    /*
     * Pop the job from the job stack if necessary.
     */
    if (job->stack_level > 0)
	qmgr_job_pop(job);

    /*
     * Return any remaining recipient slots back to the recipient slots pool.
     */
    qmgr_job_move_limits(job);
    if (job->rcpt_limit)
	msg_panic("%s: recipient slots leak (%d)", myname, job->rcpt_limit);

    /*
     * Unlink and discard the structure. Check if the job is still linked on
     * the job lists or if it was already retired before unlinking it.
     */
    if (job->stack_level >= 0)
	qmgr_job_unlink(job);
    QMGR_LIST_UNLINK(message->job_list, QMGR_JOB *, job, message_peers);
    htable_delete(transport->job_byname, message->queue_id, (void (*) (char *)) 0);
    htable_free(job->peer_byname, (void (*) (char *)) 0);
    myfree((char *) job);
}

/* qmgr_job_count_slots - maintain the delivery slot counters */

static void qmgr_job_count_slots(QMGR_JOB *job)
{

    /*
     * Count the number of delivery slots used during the delivery of the
     * selected job. Also count the number of delivery slots available for
     * its preemption.
     *
     * Despite its trivial look, this is one of the key parts of the theory
     * behind this preempting scheduler.
     */
    job->slots_available++;
    job->slots_used++;

    /*
     * If the selected job is not the original current job, reset the
     * candidate cache because the change above has slightly increased the
     * chance of this job becoming a candidate next time.
     *
     * Don't assume that a change of the current job this turn will render
     * the candidate cache invalid next turn - it can happen that next turn
     * the original current job is selected again, in which case the cache
     * would be considered valid.
     */
    if (job != job->transport->candidate_cache_current)
	RESET_CANDIDATE_CACHE(job->transport);
}

/* qmgr_job_candidate - find best job candidate for preempting given job */

static QMGR_JOB *qmgr_job_candidate(QMGR_JOB *current)
{
    QMGR_TRANSPORT *transport = current->transport;
    QMGR_JOB *job, *best_job = 0;
    double  score, best_score = 0.0;
    int     max_slots, max_needed_entries, max_total_entries;
    int     delay;
    time_t  now = sane_time();

    /*
     * Fetch the result directly from the cache if the cache is still valid.
     *
     * Note that we cache negative results too, so the cache must be
     * invalidated by resetting the cached current job pointer, not the
     * candidate pointer itself.
     *
     * In case the cache is valid and contains no candidate, we can ignore
     * the time change, as it affects only which candidate is the best, not
     * whether one exists. However, this feature requires that we no longer
     * relax the cache resetting rules, depending on the automatic cache
     * timeout.
     */
    if (transport->candidate_cache_current == current
	&& (transport->candidate_cache_time == now
	    || transport->candidate_cache == 0))
	return (transport->candidate_cache);

    /*
     * Estimate the minimum amount of delivery slots that can ever be
     * accumulated for the given job. All jobs that won't fit into these
     * slots are excluded from the candidate selection.
     */
    max_slots = (MIN_ENTRIES(current) - current->selected_entries
		 + current->slots_available) / transport->slot_cost;

    /*
     * Select the candidate with the best time_since_queued/total_recipients
     * score. In addition to jobs which don't meet the max_slots limit, skip
     * also jobs which don't have any selectable entries at the moment.
     *
     * Instead of traversing the whole job list, we traverse it just from the
     * current job forward. This has several advantages. First, we skip some
     * of the blocker jobs and the current job itself right away. But the
     * really important advantage is that we are sure that we don't consider
     * any jobs that are already stack children of the current job. Thanks to
     * this we can easily include all encountered jobs which are leaf
     * children of some of the preempting stacks as valid candidates. All we
     * need to do is to make sure we do not include any of the stack parents.
     * And, because the leaf children are not ordered by the time since
     * queued, we have to exclude them from the early loop end test.
     *
     * However, don't bother searching if we can't find anything suitable
     * anyway.
     */
    if (max_slots > 0) {
	for (job = current->transport_peers.next; job; job = job->transport_peers.next) {
	    if (job->stack_children.next != 0 || IS_BLOCKER(job, transport))
		continue;
	    max_total_entries = MAX_ENTRIES(job);
	    max_needed_entries = max_total_entries - job->selected_entries;
	    delay = now - job->message->queued_time + 1;
	    if (max_needed_entries > 0 && max_needed_entries <= max_slots) {
		score = (double) delay / max_total_entries;
		if (score > best_score) {
		    best_score = score;
		    best_job = job;
		}
	    }

	    /*
	     * Stop early if the best score is as good as it can get.
	     */
	    if (delay <= best_score && job->stack_level == 0)
		break;
	}
    }

    /*
     * Cache the result for later use.
     */
    transport->candidate_cache = best_job;
    transport->candidate_cache_current = current;
    transport->candidate_cache_time = now;

    return (best_job);
}
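
/*
 * Worked example (added commentary, with made-up numbers): assume
 * slot_cost = 5 and a current job with MIN_ENTRIES = 1000, 100 entries
 * already selected and 100 slots available; then max_slots =
 * (1000 - 100 + 100) / 5 = 200. A candidate queued 100 seconds ago with
 * 1000 total (unselected) entries scores 101/1000 = 0.101 but needs more
 * entries than max_slots allows, so it is excluded. A candidate queued 10
 * seconds ago with 20 total entries scores 11/20 = 0.55 and fits easily,
 * so the small, relatively old job wins.
 */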

/* qmgr_job_preempt - preempt large message with smaller one */

static QMGR_JOB *qmgr_job_preempt(QMGR_JOB *current)
{
    const char *myname = "qmgr_job_preempt";
    QMGR_TRANSPORT *transport = current->transport;
    QMGR_JOB *job, *prev;
    int     expected_slots;
    int     rcpt_slots;

    /*
     * Suppress preempting completely if the current job is not big enough to
     * accumulate even the minimal number of slots required.
     *
     * Also, don't look for a better job candidate if there are no available
     * slots yet (the count can go negative due to the slot loans below).
     */
    if (current->slots_available <= 0
      || MAX_ENTRIES(current) < transport->min_slots * transport->slot_cost)
	return (current);

    /*
     * Find the best candidate for preempting the current job.
     *
     * Note that the function also takes care that the candidate fits within
     * the number of delivery slots which the current job is still able to
     * accumulate.
     */
    if ((job = qmgr_job_candidate(current)) == 0)
	return (current);

    /*
     * Sanity checks.
     */
    if (job == current)
	msg_panic("%s: attempt to preempt itself", myname);
    if (job->stack_children.next != 0)
	msg_panic("%s: already on the job stack (%d)", myname, job->stack_level);
    if (job->stack_level < 0)
	msg_panic("%s: not on the job list (%d)", myname, job->stack_level);

    /*
     * Check if enough available delivery slots have been accumulated to
     * preempt the current job.
     *
     * The slot loaning scheme improves the average message response time. Note
     * that the loan only allows the preemption to happen earlier, though. It
     * doesn't affect how many slots have to be "paid" - in either case the
     * full number of slots required has to be accumulated later before the
     * current job can be preempted again.
     */
    expected_slots = MAX_ENTRIES(job) - job->selected_entries;
    if (current->slots_available / transport->slot_cost + transport->slot_loan
	< expected_slots * transport->slot_loan_factor / 100.0)
	return (current);
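
    /*
     * Worked example (added commentary, with illustrative parameter values,
     * not necessarily the defaults): with slot_cost = 5, slot_loan = 3,
     * slot_loan_factor = 75 and a candidate expecting 40 more entries, the
     * test above reads slots_available / 5 + 3 < 40 * 0.75. The preemption
     * therefore goes ahead once slots_available / 5 + 3 >= 30, that is,
     * once at least 135 delivery slots have been accumulated.
     */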

    /*
     * Preempt the current job.
     *
     * This involves placing the selected candidate in front of the current
     * job on the job list and updating the stack parent/child/sibling
     * pointers appropriately. But first we need to make sure that the
     * candidate is taken from its previous job stack which it might be on
     * top of.
     */
    if (job->stack_level > 0)
	qmgr_job_pop(job);
    QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers);
    prev = current->transport_peers.prev;
    QMGR_LIST_LINK(transport->job_list, prev, job, current, transport_peers);
    job->stack_parent = current;
    QMGR_LIST_APPEND(current->stack_children, job, stack_siblings);
    job->stack_level = current->stack_level + 1;

    /*
     * Update the current job pointer and explicitly reset the candidate
     * cache.
     */
    transport->job_current = job;
    RESET_CANDIDATE_CACHE(transport);

    /*
     * Since a single job can be preempted by several jobs at the same
     * time, we have to adjust the available slot count now to prevent using
     * the same slots multiple times. To do that we subtract the number of
     * slots the preempting job will supposedly use. This number will be
     * corrected later when that job is popped from the stack to reflect the
     * number of slots really used.
     *
     * As long as we don't need to keep track of how many slots were really
     * used, we can (ab)use the slots_used counter for counting the
     * difference between the real and expected amounts instead of the
     * absolute amount.
     */
    current->slots_available -= expected_slots * transport->slot_cost;
    job->slots_used = -expected_slots;

    /*
     * Add part of the extra recipient slots reserved for preempting jobs to
     * the new current job if necessary.
     *
     * Note that transport->rcpt_unused is within <-rcpt_per_stack,0> in such
     * case.
     */
    if (job->message->rcpt_offset != 0) {
	rcpt_slots = (transport->rcpt_per_stack + transport->rcpt_unused + 1) / 2;
	job->rcpt_limit += rcpt_slots;
	job->message->rcpt_limit += rcpt_slots;
	transport->rcpt_unused -= rcpt_slots;
    }
    if (msg_verbose)
	msg_info("%s: %s by %s, level %d", myname, current->message->queue_id,
		 job->message->queue_id, job->stack_level);

    return (job);
}
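
/*
 * Illustrative picture (added commentary): if job B preempts job A, and
 * job C then preempts B, the scheduler list reads C, B, A while the
 * preemption stack is A (level 0) <- B (level 1) <- C (level 2), with
 * stack_parent pointers pointing toward A and stack_children lists going
 * the other way. Deliveries drain C first; when C is done and popped,
 * B resumes, and finally A.
 */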

/* qmgr_job_pop - remove the job from its job preemption stack */

static void qmgr_job_pop(QMGR_JOB *job)
{
    const char *myname = "qmgr_job_pop";
    QMGR_TRANSPORT *transport = job->transport;
    QMGR_JOB *parent;

    if (msg_verbose)
	msg_info("%s: %s", myname, job->message->queue_id);

    /*
     * Sanity checks.
     */
    if (job->stack_level <= 0)
	msg_panic("%s: not on the job stack (%d)", myname, job->stack_level);

    /*
     * Adjust the number of delivery slots available for preempting the
     * job's parent. Note that the -= actually adds back any unused slots,
     * as we have already subtracted the expected amount of slots from both
     * counters when we did the preemption.
     *
     * Note that we intentionally do not adjust slots_used of the parent. Doing
     * so would decrease the maximum per-message inflation factor if the
     * preemption appeared near the end of parent delivery.
     *
     * For the same reason we do not adjust the parent's slots_available if
     * the parent is not the original parent that was preempted by this job
     * (i.e., the original parent job has already completed).
     *
     * This is another key part of the theory behind this preempting scheduler.
     */
    if ((parent = job->stack_parent) != 0
	&& job->stack_level == parent->stack_level + 1)
	parent->slots_available -= job->slots_used * transport->slot_cost;

    /*
     * Remove the job from its parent's children list.
     */
    if (parent != 0) {
	QMGR_LIST_UNLINK(parent->stack_children, QMGR_JOB *, job, stack_siblings);
	job->stack_parent = 0;
    }

    /*
     * If there is a parent, let it adopt all those orphaned children.
     * Otherwise at least notify the children that their parent is gone.
     */
    qmgr_job_parent_gone(job, parent);

    /*
     * Put the job back to stack level zero.
     */
    job->stack_level = 0;

    /*
     * Explicitly reset the candidate cache. It's not worth trying to skip
     * this under some complicated conditions - in most cases the popped job
     * is the current job, so we would have to reset it anyway.
     */
    RESET_CANDIDATE_CACHE(transport);

    /*
     * Here we leave the remaining work involving the proper placement on the
     * job list to the caller. The most important reason for this is that it
     * allows us not to look up where exactly to place the job.
     *
     * The caller is also made responsible for invalidating the current job
     * cache if necessary.
     */
#if 0
    QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers);
    QMGR_LIST_LINK(transport->job_list, some_prev, job, some_next, transport_peers);

    if (transport->job_current == job)
	transport->job_current = job->transport_peers.next;
#endif
}
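
/*
 * Worked example (added commentary, with made-up numbers): suppose job B
 * preempted job A with expected_slots = 40 and slot_cost = 5. At
 * preemption time, A->slots_available dropped by 200 and B->slots_used
 * was set to -40. If B then delivers only 30 entries, B->slots_used is
 * -10 when B is popped, and the -= above adds 10 * 5 = 50 unused slots
 * back to A->slots_available.
 */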

/* qmgr_job_peer_select - select next peer suitable for delivery */

static QMGR_PEER *qmgr_job_peer_select(QMGR_JOB *job)
{
    QMGR_PEER *peer;
    QMGR_MESSAGE *message = job->message;

    /*
     * Try reading in more recipients. We do that as soon as possible
     * (almost, see below), to make sure there is enough new blood pouring
     * in. Otherwise a single recipient for a slow destination might starve
     * the entire message delivery, leaving lots of fast-destination
     * recipients sitting idle in the queue file.
     *
     * Ideally we would like to read in recipients whenever there is space,
     * but to prevent excessive I/O, we read them only when enough time has
     * passed or when we can read enough of them at once.
     *
     * Note that even if we read the recipients a few at a time, the message
     * loading code tries to put them into existing recipient entries
     * whenever possible, so the per-destination recipient grouping is not
     * grossly affected.
     *
     * XXX Workaround for logic mismatch. The message->refcount test needs
     * explanation. If the refcount is zero, it means that qmgr_active_done()
     * is being completed asynchronously. In such a case, we can't read in
     * more recipients as bad things would happen after qmgr_active_done()
     * continues processing. Note that this results in the given job being
     * stalled for some time, but fortunately this particular situation is so
     * rare that it is not critical. Still, we seek a better solution.
     */
    if (message->rcpt_offset != 0
	&& message->refcount > 0
	&& (message->rcpt_limit - message->rcpt_count >= job->transport->refill_limit
	    || (message->rcpt_limit > message->rcpt_count
    && sane_time() - message->refill_time >= job->transport->refill_delay)))
	qmgr_message_realloc(message);

    /*
     * Get the next suitable peer, if there is any.
     */
    if (HAS_ENTRIES(job) && (peer = qmgr_peer_select(job)) != 0)
	return (peer);

    /*
     * There is no suitable peer in-core, so try reading in more recipients
     * if possible. This is our last chance to get a suitable peer before
     * giving up on this job for now.
     *
     * XXX For message->refcount, see above.
     */
    if (message->rcpt_offset != 0
	&& message->refcount > 0
	&& message->rcpt_limit > message->rcpt_count) {
	qmgr_message_realloc(message);
	if (HAS_ENTRIES(job))
	    return (qmgr_peer_select(job));
    }
    return (0);
}
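
/*
 * Worked example (added commentary, with illustrative parameter values):
 * with refill_limit = 100 and refill_delay = 5, more recipients are read
 * in either when at least 100 recipient slots are free
 * (rcpt_limit - rcpt_count >= 100), or when at least one slot is free and
 * 5 or more seconds have passed since the last refill.
 */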

/* qmgr_job_entry_select - select next entry suitable for delivery */

QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *transport)
{
    QMGR_JOB *job, *next;
    QMGR_PEER *peer;
    QMGR_ENTRY *entry;

    /*
     * Get the current job if there is one.
     */
    if ((job = transport->job_current) == 0)
	return (0);

    /*
     * Exercise the preempting algorithm if enabled.
     *
     * A slot_cost of 1 causes the algorithm to degenerate and is therefore
     * disabled too.
     */
    if (transport->slot_cost >= 2)
	job = qmgr_job_preempt(job);

    /*
     * Select the next entry suitable for delivery. In case the current job
     * can't provide one because of the per-destination concurrency limits,
     * we mark it as a "blocker" job and continue with the next job on the
     * job list.
     *
     * Note that the loop also takes care of getting the "stall" jobs (jobs
     * with no entries currently available) out of the way if necessary.
     * Stall jobs can appear in case of multi-transport messages whose
     * recipients don't fit in-core at once. Some jobs created by such
     * messages may have only a few recipients and would stay on the job
     * list until all other jobs of that message are delivered, blocking
     * precious recipient slots available to this transport. Or it can
     * happen that the job has some more entries but suddenly they all get
     * deferred. Whatever the reason, we retire such jobs below if we happen
     * to come across some.
     */
    for ( /* empty */ ; job; job = next) {
	next = job->transport_peers.next;

	/*
	 * Don't bother if the job is known to have no available entries
	 * because of the per-destination concurrency limits.
	 */
	if (IS_BLOCKER(job, transport))
	    continue;

	if ((peer = qmgr_job_peer_select(job)) != 0) {

	    /*
	     * We have found a suitable peer. Select one of its entries and
	     * adjust the delivery slot counters.
	     */
	    entry = qmgr_entry_select(peer);
	    qmgr_job_count_slots(job);

	    /*
	     * Remember the current job for the next time so we don't have to
	     * crawl over all those blockers again. They will be reconsidered
	     * when the concurrency limit permits.
	     */
	    transport->job_current = job;

	    /*
	     * In case we selected the very last job entry, remove the job
	     * from the job lists right now.
	     *
	     * This action uses the assumption that once the job entry has been
	     * selected, it can be unselected only before the message itself
	     * is deferred. Thus a job with all entries selected can't
	     * re-appear with more entries available for selection again
	     * (without reading in more entries from the queue file, which in
	     * turn invokes qmgr_job_obtain() which re-links the job back on
	     * the lists if necessary).
	     *
	     * Note that qmgr_job_move_limits() transfers the recipient slots
	     * correctly even if the job is unlinked from the job list, thanks
	     * to the job_next_unread caching.
	     */
	    if (!HAS_ENTRIES(job) && job->message->rcpt_offset == 0)
		qmgr_job_retire(job);

	    /*
	     * Finally. Hand back the fruit of our tedious effort.
	     */
	    return (entry);
	} else if (HAS_ENTRIES(job)) {

	    /*
	     * The job can't be selected due to the concurrency limits. Mark
	     * it together with its queues so we know they are blocking the
	     * job list and they get the appropriate treatment. In
	     * particular, all blockers will be reconsidered when one of the
	     * problematic queues accepts more deliveries. And the job itself
	     * will be reconsidered if it is assigned some more entries.
	     */
	    job->blocker_tag = transport->blocker_tag;
	    for (peer = job->peer_list.next; peer; peer = peer->peers.next)
		if (peer->entry_list.next != 0)
		    peer->queue->blocker_tag = transport->blocker_tag;
	} else {

	    /*
	     * The job is "stalled". Retire it until it either gets freed or
	     * gets more entries later.
	     */
	    qmgr_job_retire(job);
	}
    }

    /*
     * We have not found any entry we could use for delivery. Well, things
     * must have changed since this transport was selected for asynchronous
     * allocation. Never mind. Clear the current job pointer and reluctantly
     * report back that we have failed in our task.
     */
    transport->job_current = 0;
    return (0);
}
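
/*
 * A minimal sketch of the intended use (added for illustration; the real
 * delivery scheduling code lives elsewhere in the queue manager, and the
 * helper name below is hypothetical): whenever a delivery agent becomes
 * available for this transport, one entry is pulled from the job scheduler
 * and handed over for delivery.
 */
#if 0
    QMGR_ENTRY *entry;

    if ((entry = qmgr_job_entry_select(transport)) != 0)
	deliver_entry(entry);		/* hypothetical helper */
    /* else nothing is deliverable right now; try again on the next event */
#endif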

/* qmgr_job_blocker_update - update "blocked job" status */

void    qmgr_job_blocker_update(QMGR_QUEUE *queue)
{
    QMGR_TRANSPORT *transport = queue->transport;

    /*
     * If the queue was blocking some of the jobs on the job list, check if
     * the concurrency limit has been lifted. If there are still some pending
     * deliveries, give it a try and unmark all transport blockers at once.
     * qmgr_job_entry_select() will do the rest. In either case make sure
     * the queue is not marked as a blocker anymore, with extra handling of
     * queues which were declared dead.
     *
     * Note that changing the blocker status also affects the candidate cache.
     * Most of the cases would be automatically recognized by the current job
     * change, but we play safe and reset the cache explicitly below.
     *
     * Keeping the transport blocker tag odd is an easy way to make sure the
     * tag never matches jobs that are not explicitly marked as blockers.
     */
    if (queue->blocker_tag == transport->blocker_tag) {
	if (queue->window > queue->busy_refcount && queue->todo.next != 0) {
	    transport->blocker_tag += 2;
	    transport->job_current = transport->job_list.next;
	    transport->candidate_cache_current = 0;
	}
	if (queue->window > queue->busy_refcount || QMGR_QUEUE_THROTTLED(queue))
	    queue->blocker_tag = 0;
    }
}
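/*
 * Illustrative example (added commentary): jobs are created with
 * blocker_tag = 0, while the transport's blocker tag is kept odd (per the
 * comment above) and is bumped by 2 each time the blockers are unmarked
 * (1, 3, 5, ...). A job is therefore a blocker only after an explicit
 * job->blocker_tag = transport->blocker_tag assignment, and all blockers
 * are "unmarked" at once simply by advancing the transport tag.
 */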