1/*++
2/* NAME
3/*	qmgr_queue 3
4/* SUMMARY
5/*	per-destination queues
6/* SYNOPSIS
7/*	#include "qmgr.h"
8/*
9/*	int	qmgr_queue_count;
10/*
11/*	QMGR_QUEUE *qmgr_queue_create(transport, name, nexthop)
12/*	QMGR_TRANSPORT *transport;
13/*	const char *name;
14/*	const char *nexthop;
15/*
16/*	void	qmgr_queue_done(queue)
17/*	QMGR_QUEUE *queue;
18/*
19/*	QMGR_QUEUE *qmgr_queue_find(transport, name)
20/*	QMGR_TRANSPORT *transport;
21/*	const char *name;
22/*
23/*	void	qmgr_queue_throttle(queue, dsn)
24/*	QMGR_QUEUE *queue;
25/*	DSN	*dsn;
26/*
27/*	void	qmgr_queue_unthrottle(queue)
28/*	QMGR_QUEUE *queue;
29/*
30/*	void	qmgr_queue_suspend(queue, delay)
31/*	QMGR_QUEUE *queue;
32/*	int	delay;
33/* DESCRIPTION
34/*	These routines add/delete/manipulate per-destination queues.
35/*	Each queue corresponds to a specific transport and destination.
36/*	Each queue has a `todo' list of delivery requests for that
37/*	destination, and a `busy' list of delivery requests in progress.
38/*
39/*	qmgr_queue_count is a global counter for the total number
40/*	of in-core queue structures.
41/*
42/*	qmgr_queue_create() creates an empty named queue for the named
43/*	transport and destination. The queue is given an initial
44/*	concurrency limit as specified with the
45/*	\fIinitial_destination_concurrency\fR configuration parameter,
46/*	provided that it does not exceed the transport-specific
47/*	concurrency limit.
48/*
49/*	qmgr_queue_done() disposes of a per-destination queue after all
50/*	its entries have been taken care of. It is an error to dispose
51/*	of a dead queue.
52/*
53/*	qmgr_queue_find() looks up the named queue for the named
54/*	transport. A null result means that the queue was not found.
55/*
56/*	qmgr_queue_throttle() handles a delivery error, and decrements the
57/*	concurrency limit for the destination, with a lower bound of 1.
58/*	When the cohort failure bound is reached, qmgr_queue_throttle()
59/*	sets the concurrency limit to zero and starts a timer
60/*	to re-enable delivery to the destination after a configurable delay.
61/*
62/*	qmgr_queue_unthrottle() undoes qmgr_queue_throttle()'s effects.
63/*	The concurrency limit for the destination is incremented,
64/*	provided that it does not exceed the destination concurrency
65/*	limit specified for the transport. This routine implements
66/*	"slow open" mode, and eliminates the "thundering herd" problem.
67/*
68/*	qmgr_queue_suspend() suspends delivery for this destination
69/*	briefly. This function invalidates any scheduling decisions
70/*	that are based on the present queue's concurrency window.
71/*	To compensate for work skipped by qmgr_entry_done(), the
72/*	status of blocker jobs is re-evaluated after the queue is
73/*	resumed.
74/* DIAGNOSTICS
75/*	Panic: consistency check failure.
76/* LICENSE
77/* .ad
78/* .fi
79/*	The Secure Mailer license must be distributed with this software.
80/* AUTHOR(S)
81/*	Wietse Venema
82/*	IBM T.J. Watson Research
83/*	P.O. Box 704
84/*	Yorktown Heights, NY 10598, USA
85/*
86/*	Pre-emptive scheduler enhancements:
87/*	Patrik Rak
88/*	Modra 6
89/*	155 00, Prague, Czech Republic
90/*
91/*	Concurrency scheduler enhancements with:
92/*	Victor Duchovni
93/*	Morgan Stanley
94/*--*/
95
96/* System library. */
97
98#include <sys_defs.h>
99#include <time.h>
100
101/* Utility library. */
102
103#include <msg.h>
104#include <mymalloc.h>
105#include <events.h>
106#include <htable.h>
107
108/* Global library. */
109
110#include <mail_params.h>
111#include <recipient_list.h>
112#include <mail_proto.h>			/* QMGR_LOG_WINDOW */
113
114/* Application-specific. */
115
116#include "qmgr.h"
117
/* Global counter: total number of in-core per-destination queue structures. */
int     qmgr_queue_count;

/*
 * True when this queue belongs to the built-in "retry" or "error" pseudo
 * transport. Feedback logging for those transports would only be noise.
 */
#define QMGR_ERROR_OR_RETRY_QUEUE(queue) \
	(strcmp(queue->transport->name, MAIL_SERVICE_RETRY) == 0 \
	    || strcmp(queue->transport->name, MAIL_SERVICE_ERROR) == 0)

/*
 * Concurrency feedback debug logging, enabled with the
 * var_conc_feedback_debug configuration setting.
 *
 * CAUTION: these macros reference the variables `myname' (and, for
 * QMGR_LOG_FEEDBACK, also `queue') from the caller's scope by name.
 * Use them only in functions where those locals are defined.
 */
#define QMGR_LOG_FEEDBACK(feedback) \
	if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
	    msg_info("%s: feedback %g", myname, feedback);

#define QMGR_LOG_WINDOW(queue) \
	if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
	    msg_info("%s: queue %s: limit %d window %d success %g failure %g fail_cohorts %g", \
		    myname, queue->name, queue->transport->dest_concurrency_limit, \
		    queue->window, queue->success, queue->failure, queue->fail_cohorts);
133
134/* qmgr_queue_resume - resume delivery to destination */
135
136static void qmgr_queue_resume(int event, char *context)
137{
138    QMGR_QUEUE *queue = (QMGR_QUEUE *) context;
139    const char *myname = "qmgr_queue_resume";
140
141    /*
142     * Sanity checks.
143     */
144    if (!QMGR_QUEUE_SUSPENDED(queue))
145	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
146
147    /*
148     * We can't simply force delivery on this queue: the transport's pending
149     * count may already be maxed out, and there may be other constraints
150     * that definitely should be none of our business. The best we can do is
151     * to play by the same rules as everyone else: let qmgr_active_drain()
152     * and round-robin selection take care of message selection.
153     */
154    queue->window = 1;
155
156    /*
157     * Every event handler that leaves a queue in the "ready" state should
158     * remove the queue when it is empty.
159     *
160     * XXX Do not omit the redundant test below. It is here to simplify code
161     * consistency checks. The check is trivially eliminated by the compiler
162     * optimizer. There is no need to sacrifice code clarity for the sake of
163     * performance.
164     *
165     * XXX Do not expose the blocker job logic here. Rate-limited queues are not
166     * a performance-critical feature. Here, too, there is no need to sacrifice
167     * code clarity for the sake of performance.
168     */
169    if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0)
170	qmgr_queue_done(queue);
171    else
172	qmgr_job_blocker_update(queue);
173}
174
175/* qmgr_queue_suspend - briefly suspend a destination */
176
177void    qmgr_queue_suspend(QMGR_QUEUE *queue, int delay)
178{
179    const char *myname = "qmgr_queue_suspend";
180
181    /*
182     * Sanity checks.
183     */
184    if (!QMGR_QUEUE_READY(queue))
185	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
186    if (queue->busy_refcount > 0)
187	msg_panic("%s: queue is busy", myname);
188
189    /*
190     * Set the queue status to "suspended". No-one is supposed to remove a
191     * queue in suspended state.
192     */
193    queue->window = QMGR_QUEUE_STAT_SUSPENDED;
194    event_request_timer(qmgr_queue_resume, (char *) queue, delay);
195}
196
197/* qmgr_queue_unthrottle_wrapper - in case (char *) != (struct *) */
198
199static void qmgr_queue_unthrottle_wrapper(int unused_event, char *context)
200{
201    QMGR_QUEUE *queue = (QMGR_QUEUE *) context;
202
203    /*
204     * This routine runs when a wakeup timer goes off; it does not run in the
205     * context of some queue manipulation. Therefore, it is safe to discard
206     * this in-core queue when it is empty and when this site is not dead.
207     */
208    qmgr_queue_unthrottle(queue);
209    if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0)
210	qmgr_queue_done(queue);
211}
212
/* qmgr_queue_unthrottle - give this destination another chance */

void    qmgr_queue_unthrottle(QMGR_QUEUE *queue)
{
    const char *myname = "qmgr_queue_unthrottle";
    QMGR_TRANSPORT *transport = queue->transport;
    double  feedback;

    if (msg_verbose)
	msg_info("%s: queue %s", myname, queue->name);

    /*
     * Sanity checks. The queue must be either "ready" or "throttled";
     * any other state is a consistency error.
     */
    if (!QMGR_QUEUE_READY(queue) && !QMGR_QUEUE_THROTTLED(queue))
	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));

    /*
     * Don't restart the negative feedback hysteresis cycle with every
     * positive feedback. Restart it only when we make a positive concurrency
     * adjustment (i.e. at the end of a positive feedback hysteresis cycle).
     * Otherwise negative feedback would be too aggressive: negative feedback
     * takes effect immediately at the start of its hysteresis cycle.
     */
    queue->fail_cohorts = 0;

    /*
     * Special case when this site was dead.
     */
    if (QMGR_QUEUE_THROTTLED(queue)) {
	/* Cancel the pending wakeup so it cannot fire for a live queue. */
	event_cancel_timer(qmgr_queue_unthrottle_wrapper, (char *) queue);
	/* A throttled queue must carry the delivery status that killed it. */
	if (queue->dsn == 0)
	    msg_panic("%s: queue %s: window 0 status 0", myname, queue->name);
	dsn_free(queue->dsn);
	queue->dsn = 0;
	/* Back from the almost grave, best concurrency is anyone's guess. */
	if (queue->busy_refcount > 0)
	    queue->window = queue->busy_refcount;
	else
	    queue->window = transport->init_dest_concurrency;
	queue->success = queue->failure = 0;
	QMGR_LOG_WINDOW(queue);
	return;
    }

    /*
     * Increase the destination's concurrency limit until we reach the
     * transport's concurrency limit. Allow for a margin the size of the
     * initial destination concurrency, so that we're not too gentle.
     *
     * Why is the concurrency increment based on preferred concurrency and not
     * on the number of outstanding delivery requests? The latter fluctuates
     * wildly when deliveries complete in bursts (artificial benchmark
     * measurements), and does not account for cached connections.
     *
     * Keep the window within reasonable distance from actual concurrency
     * otherwise negative feedback will be ineffective. This expression
     * assumes that busy_refcount changes gradually. This is invalid when
     * deliveries complete in bursts (artificial benchmark measurements).
     */
    /* A dest_concurrency_limit of zero means: no per-destination limit. */
    if (transport->dest_concurrency_limit == 0
	|| transport->dest_concurrency_limit > queue->window)
	if (queue->window < queue->busy_refcount + transport->init_dest_concurrency) {
	    feedback = QMGR_FEEDBACK_VAL(transport->pos_feedback, queue->window);
	    QMGR_LOG_FEEDBACK(feedback);
	    queue->success += feedback;
	    /* Prepare for overshoot (feedback > hysteresis, rounding error). */
	    while (queue->success + feedback / 2 >= transport->pos_feedback.hysteresis) {
		/*
		 * Completed positive hysteresis cycle: widen the window,
		 * carry over the excess, and restart the negative cycle.
		 */
		queue->window += transport->pos_feedback.hysteresis;
		queue->success -= transport->pos_feedback.hysteresis;
		queue->failure = 0;
	    }
	    /* Prepare for overshoot. */
	    if (transport->dest_concurrency_limit > 0
		&& queue->window > transport->dest_concurrency_limit)
		queue->window = transport->dest_concurrency_limit;
	}
    QMGR_LOG_WINDOW(queue);
}
292
/* qmgr_queue_throttle - handle destination delivery failure */

void    qmgr_queue_throttle(QMGR_QUEUE *queue, DSN *dsn)
{
    const char *myname = "qmgr_queue_throttle";
    QMGR_TRANSPORT *transport = queue->transport;
    double  feedback;

    /*
     * Sanity checks. Only a "ready" queue may be throttled, and it must
     * not already carry a saved delivery status.
     */
    if (!QMGR_QUEUE_READY(queue))
	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
    if (queue->dsn)
	msg_panic("%s: queue %s: spurious reason %s",
		  myname, queue->name, queue->dsn->reason);
    if (msg_verbose)
	msg_info("%s: queue %s: %s %s",
		 myname, queue->name, dsn->status, dsn->reason);

    /*
     * Don't restart the positive feedback hysteresis cycle with every
     * negative feedback. Restart it only when we make a negative concurrency
     * adjustment (i.e. at the start of a negative feedback hysteresis
     * cycle). Otherwise positive feedback would be too weak (positive
     * feedback does not take effect until the end of its hysteresis cycle).
     */

    /*
     * This queue is declared dead after a configurable number of
     * pseudo-cohort failures.
     *
     * Each failure contributes 1/window to the pseudo-cohort failure count.
     * Once the limit is reached, the window is set to the "throttled"
     * marker; after that the QMGR_QUEUE_READY() tests below no longer
     * match, so the window-decrease logic is skipped for a dead queue.
     */
    if (QMGR_QUEUE_READY(queue)) {
	queue->fail_cohorts += 1.0 / queue->window;
	if (transport->fail_cohort_limit > 0
	    && queue->fail_cohorts >= transport->fail_cohort_limit)
	    queue->window = QMGR_QUEUE_STAT_THROTTLED;
    }

    /*
     * Decrease the destination's concurrency limit until we reach 1. Base
     * adjustments on the concurrency limit itself, instead of using the
     * actual concurrency. The latter fluctuates wildly when deliveries
     * complete in bursts (artificial benchmark measurements).
     *
     * Even after reaching 1, we maintain the negative hysteresis cycle so that
     * negative feedback can cancel out positive feedback.
     */
    if (QMGR_QUEUE_READY(queue)) {
	feedback = QMGR_FEEDBACK_VAL(transport->neg_feedback, queue->window);
	QMGR_LOG_FEEDBACK(feedback);
	queue->failure -= feedback;
	/* Prepare for overshoot (feedback > hysteresis, rounding error). */
	while (queue->failure - feedback / 2 < 0) {
	    /*
	     * Completed negative hysteresis cycle: shrink the window and
	     * restart the positive cycle (see block comment above).
	     */
	    queue->window -= transport->neg_feedback.hysteresis;
	    queue->success = 0;
	    queue->failure += transport->neg_feedback.hysteresis;
	}
	/* Prepare for overshoot. */
	if (queue->window < 1)
	    queue->window = 1;
    }

    /*
     * Special case for a site that just was declared dead: save the
     * delivery status for later reporting, and schedule a wakeup that
     * gives this destination another chance after the minimal backoff
     * time.
     */
    if (QMGR_QUEUE_THROTTLED(queue)) {
	queue->dsn = DSN_COPY(dsn);
	event_request_timer(qmgr_queue_unthrottle_wrapper,
			    (char *) queue, var_min_backoff_time);
	queue->dflags = 0;
    }
    QMGR_LOG_WINDOW(queue);
}
367
368/* qmgr_queue_done - delete in-core queue for site */
369
370void    qmgr_queue_done(QMGR_QUEUE *queue)
371{
372    const char *myname = "qmgr_queue_done";
373    QMGR_TRANSPORT *transport = queue->transport;
374
375    /*
376     * Sanity checks. It is an error to delete an in-core queue with pending
377     * messages or timers.
378     */
379    if (queue->busy_refcount != 0 || queue->todo_refcount != 0)
380	msg_panic("%s: refcount: %d", myname,
381		  queue->busy_refcount + queue->todo_refcount);
382    if (queue->todo.next || queue->busy.next)
383	msg_panic("%s: queue not empty: %s", myname, queue->name);
384    if (!QMGR_QUEUE_READY(queue))
385	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
386    if (queue->dsn)
387	msg_panic("%s: queue %s: spurious reason %s",
388		  myname, queue->name, queue->dsn->reason);
389
390    /*
391     * Clean up this in-core queue.
392     */
393    QMGR_LIST_UNLINK(transport->queue_list, QMGR_QUEUE *, queue, peers);
394    htable_delete(transport->queue_byname, queue->name, (void (*) (char *)) 0);
395    myfree(queue->name);
396    myfree(queue->nexthop);
397    qmgr_queue_count--;
398    myfree((char *) queue);
399}
400
401/* qmgr_queue_create - create in-core queue for site */
402
403QMGR_QUEUE *qmgr_queue_create(QMGR_TRANSPORT *transport, const char *name,
404			              const char *nexthop)
405{
406    QMGR_QUEUE *queue;
407
408    /*
409     * If possible, choose an initial concurrency of > 1 so that one bad
410     * message or one bad network won't slow us down unnecessarily.
411     */
412
413    queue = (QMGR_QUEUE *) mymalloc(sizeof(QMGR_QUEUE));
414    qmgr_queue_count++;
415    queue->dflags = 0;
416    queue->last_done = 0;
417    queue->name = mystrdup(name);
418    queue->nexthop = mystrdup(nexthop);
419    queue->todo_refcount = 0;
420    queue->busy_refcount = 0;
421    queue->transport = transport;
422    queue->window = transport->init_dest_concurrency;
423    queue->success = queue->failure = queue->fail_cohorts = 0;
424    QMGR_LIST_INIT(queue->todo);
425    QMGR_LIST_INIT(queue->busy);
426    queue->dsn = 0;
427    queue->clog_time_to_warn = 0;
428    queue->blocker_tag = 0;
429    QMGR_LIST_APPEND(transport->queue_list, queue, peers);
430    htable_enter(transport->queue_byname, name, (char *) queue);
431    return (queue);
432}
433
434/* qmgr_queue_find - find in-core named queue */
435
436QMGR_QUEUE *qmgr_queue_find(QMGR_TRANSPORT *transport, const char *name)
437{
438    return ((QMGR_QUEUE *) htable_find(transport->queue_byname, name));
439}
440