/*	$NetBSD: qmgr_queue.c,v 1.2 2017/02/14 01:16:46 christos Exp $	*/

/*++
/* NAME
/*	qmgr_queue 3
/* SUMMARY
/*	per-destination queues
/* SYNOPSIS
/*	#include "qmgr.h"
/*
/*	int	qmgr_queue_count;
/*
/*	QMGR_QUEUE *qmgr_queue_create(transport, name, nexthop)
/*	QMGR_TRANSPORT *transport;
/*	const char *name;
/*	const char *nexthop;
/*
/*	void	qmgr_queue_done(queue)
/*	QMGR_QUEUE *queue;
/*
/*	QMGR_QUEUE *qmgr_queue_find(transport, name)
/*	QMGR_TRANSPORT *transport;
/*	const char *name;
/*
/*	QMGR_QUEUE *qmgr_queue_select(transport)
/*	QMGR_TRANSPORT *transport;
/*
/*	void	qmgr_queue_throttle(queue, dsn)
/*	QMGR_QUEUE *queue;
/*	DSN	*dsn;
/*
/*	void	qmgr_queue_unthrottle(queue)
/*	QMGR_QUEUE *queue;
/*
/*	void	qmgr_queue_suspend(queue, delay)
/*	QMGR_QUEUE *queue;
/*	int	delay;
/* DESCRIPTION
/*	These routines add/delete/manipulate per-destination queues.
/*	Each queue corresponds to a specific transport and destination.
/*	Each queue has a `todo' list of delivery requests for that
/*	destination, and a `busy' list of delivery requests in progress.
/*
/*	qmgr_queue_count is a global counter for the total number
/*	of in-core queue structures.
/*
/*	qmgr_queue_create() creates an empty named queue for the named
/*	transport and destination. The queue is given an initial
/*	concurrency limit as specified with the
/*	\fIinitial_destination_concurrency\fR configuration parameter,
/*	provided that it does not exceed the transport-specific
/*	concurrency limit.
/*
/*	qmgr_queue_done() disposes of a per-destination queue after all
/*	its entries have been taken care of. It is an error to dispose
/*	of a dead queue.
/*
/*	qmgr_queue_find() looks up the named queue for the named
/*	transport. A null result means that the queue was not found.
/*
/*	qmgr_queue_select() uses a round-robin strategy to select
/*	from the named transport one per-destination queue with a
/*	non-empty `todo' list.
/*
/*	qmgr_queue_throttle() handles a delivery error, and decrements the
/*	concurrency limit for the destination, with a lower bound of 1.
/*	When the cohort failure bound is reached, qmgr_queue_throttle()
/*	sets the concurrency limit to zero and starts a timer
/*	to re-enable delivery to the destination after a configurable delay.
/*
/*	qmgr_queue_unthrottle() undoes qmgr_queue_throttle()'s effects.
/*	The concurrency limit for the destination is incremented,
/*	provided that it does not exceed the destination concurrency
/*	limit specified for the transport. This routine implements
/*	"slow open" mode, and eliminates the "thundering herd" problem.
/*
/*	qmgr_queue_suspend() suspends delivery for this destination
/*	briefly.
/* DIAGNOSTICS
/*	Panic: consistency check failure.
/* LICENSE
/* .ad
/* .fi
/*	The Secure Mailer license must be distributed with this software.
/* AUTHOR(S)
/*	Wietse Venema
/*	IBM T.J. Watson Research
/*	P.O. Box 704
/*	Yorktown Heights, NY 10598, USA
/*--*/
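
/*
 * Illustrative only (not part of the original source): a rough sketch of
 * how a queue manager might combine these primitives. The scaffolding
 * around the calls, and the `site' and `nexthop' variables, are
 * hypothetical; the real call sites live elsewhere in the queue manager.
 *
 *	QMGR_QUEUE *queue;
 *
 *	if ((queue = qmgr_queue_find(transport, site)) == 0)
 *	    queue = qmgr_queue_create(transport, site, nexthop);
 *	... append delivery requests to the queue's `todo' list ...
 *
 *	while ((queue = qmgr_queue_select(transport)) != 0) {
 *	    ... hand one `todo' entry to a delivery agent; when the
 *	    ... delivery completes, call qmgr_queue_throttle() or
 *	    ... qmgr_queue_unthrottle(), and qmgr_queue_done() once
 *	    ... the queue is empty ...
 *	}
 */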

/* System library. */

#include <sys_defs.h>
#include <time.h>

/* Utility library. */

#include <msg.h>
#include <mymalloc.h>
#include <events.h>
#include <htable.h>

/* Global library. */

#include <mail_params.h>
#include <recipient_list.h>
#include <mail_proto.h>			/* QMGR_LOG_WINDOW */

/* Application-specific. */

#include "qmgr.h"

int     qmgr_queue_count;

#define QMGR_ERROR_OR_RETRY_QUEUE(queue) \
	(strcmp(queue->transport->name, MAIL_SERVICE_RETRY) == 0 \
	    || strcmp(queue->transport->name, MAIL_SERVICE_ERROR) == 0)

#define QMGR_LOG_FEEDBACK(feedback) \
	if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
	    msg_info("%s: feedback %g", myname, feedback);

#define QMGR_LOG_WINDOW(queue) \
	if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
	    msg_info("%s: queue %s: limit %d window %d success %g failure %g fail_cohorts %g", \
		    myname, queue->name, queue->transport->dest_concurrency_limit, \
		    queue->window, queue->success, queue->failure, queue->fail_cohorts);
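
/*
 * For illustration only: with the format string above, a QMGR_LOG_WINDOW
 * report might look like the following (the queue name and numbers are
 * hypothetical):
 *
 *	qmgr_queue_unthrottle: queue example.com: limit 20 window 6
 *	success 0.25 failure 0 fail_cohorts 0
 */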

/* qmgr_queue_resume - resume delivery to destination */

static void qmgr_queue_resume(int event, void *context)
{
    QMGR_QUEUE *queue = (QMGR_QUEUE *) context;
    const char *myname = "qmgr_queue_resume";

    /*
     * Sanity checks.
     */
    if (!QMGR_QUEUE_SUSPENDED(queue))
	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));

    /*
     * We can't simply force delivery on this queue: the transport's pending
     * count may already be maxed out, and there may be other constraints
     * that definitely should be none of our business. The best we can do is
     * to play by the same rules as everyone else: let qmgr_active_drain()
     * and round-robin selection take care of message selection.
     */
    queue->window = 1;

    /*
     * Every event handler that leaves a queue in the "ready" state should
     * remove the queue when it is empty.
     */
    if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0)
	qmgr_queue_done(queue);
}

/* qmgr_queue_suspend - briefly suspend a destination */

void    qmgr_queue_suspend(QMGR_QUEUE *queue, int delay)
{
    const char *myname = "qmgr_queue_suspend";

    /*
     * Sanity checks.
     */
    if (!QMGR_QUEUE_READY(queue))
	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
    if (queue->busy_refcount > 0)
	msg_panic("%s: queue is busy", myname);

    /*
     * Set the queue status to "suspended". No-one is supposed to remove a
     * queue in suspended state.
     */
    queue->window = QMGR_QUEUE_STAT_SUSPENDED;
    event_request_timer(qmgr_queue_resume, (void *) queue, delay);
}

/* qmgr_queue_unthrottle_wrapper - in case (char *) != (struct *) */

static void qmgr_queue_unthrottle_wrapper(int unused_event, void *context)
{
    QMGR_QUEUE *queue = (QMGR_QUEUE *) context;

    /*
     * This routine runs when a wakeup timer goes off; it does not run in the
     * context of some queue manipulation. Therefore, it is safe to discard
     * this in-core queue when it is empty and when this site is not dead.
     */
    qmgr_queue_unthrottle(queue);
    if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0)
	qmgr_queue_done(queue);
}

/* qmgr_queue_unthrottle - give this destination another chance */

void    qmgr_queue_unthrottle(QMGR_QUEUE *queue)
{
    const char *myname = "qmgr_queue_unthrottle";
    QMGR_TRANSPORT *transport = queue->transport;
    double  feedback;

    if (msg_verbose)
	msg_info("%s: queue %s", myname, queue->name);

    /*
     * Sanity checks.
     */
    if (!QMGR_QUEUE_THROTTLED(queue) && !QMGR_QUEUE_READY(queue))
	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));

    /*
     * Don't restart the negative feedback hysteresis cycle with every
     * positive feedback. Restart it only when we make a positive concurrency
     * adjustment (i.e. at the end of a positive feedback hysteresis cycle).
     * Otherwise negative feedback would be too aggressive: negative feedback
     * takes effect immediately at the start of its hysteresis cycle.
     */
    queue->fail_cohorts = 0;

    /*
     * Special case when this site was dead.
     */
    if (QMGR_QUEUE_THROTTLED(queue)) {
	event_cancel_timer(qmgr_queue_unthrottle_wrapper, (void *) queue);
	if (queue->dsn == 0)
	    msg_panic("%s: queue %s: window 0 status 0", myname, queue->name);
	dsn_free(queue->dsn);
	queue->dsn = 0;
	/* Back from the almost grave, best concurrency is anyone's guess. */
	if (queue->busy_refcount > 0)
	    queue->window = queue->busy_refcount;
	else
	    queue->window = transport->init_dest_concurrency;
	queue->success = queue->failure = 0;
	QMGR_LOG_WINDOW(queue);
	return;
    }

    /*
     * Increase the destination's concurrency limit until we reach the
     * transport's concurrency limit. Allow for a margin the size of the
     * initial destination concurrency, so that we're not too gentle.
     *
     * Why is the concurrency increment based on preferred concurrency and not
     * on the number of outstanding delivery requests? The latter fluctuates
     * wildly when deliveries complete in bursts (artificial benchmark
     * measurements), and does not account for cached connections.
     *
     * Keep the window within reasonable distance from actual concurrency
     * otherwise negative feedback will be ineffective. This expression
     * assumes that busy_refcount changes gradually. This is invalid when
     * deliveries complete in bursts (artificial benchmark measurements).
     */
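    /*
     * Worked example (added for illustration; the numbers are hypothetical):
     * with a positive feedback of 1/window and a hysteresis of 1, each
     * successful delivery adds 1/window to `success', so roughly one full
     * window of successful deliveries (one pseudo-cohort) is needed before
     * the loop below raises the window by one (additive increase). With a
     * feedback of 1, every successful delivery raises the window by one.
     */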
    if (transport->dest_concurrency_limit == 0
	|| transport->dest_concurrency_limit > queue->window)
	if (queue->window < queue->busy_refcount + transport->init_dest_concurrency) {
	    feedback = QMGR_FEEDBACK_VAL(transport->pos_feedback, queue->window);
	    QMGR_LOG_FEEDBACK(feedback);
	    queue->success += feedback;
	    /* Prepare for overshoot (feedback > hysteresis, rounding error). */
	    while (queue->success + feedback / 2 >= transport->pos_feedback.hysteresis) {
		queue->window += transport->pos_feedback.hysteresis;
		queue->success -= transport->pos_feedback.hysteresis;
		queue->failure = 0;
	    }
	    /* Prepare for overshoot. */
	    if (transport->dest_concurrency_limit > 0
		&& queue->window > transport->dest_concurrency_limit)
		queue->window = transport->dest_concurrency_limit;
	}
    QMGR_LOG_WINDOW(queue);
}

/* qmgr_queue_throttle - handle destination delivery failure */

void    qmgr_queue_throttle(QMGR_QUEUE *queue, DSN *dsn)
{
    const char *myname = "qmgr_queue_throttle";
    QMGR_TRANSPORT *transport = queue->transport;
    double  feedback;

    /*
     * Sanity checks.
     */
    if (!QMGR_QUEUE_READY(queue))
	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
    if (queue->dsn)
	msg_panic("%s: queue %s: spurious reason %s",
		  myname, queue->name, queue->dsn->reason);
    if (msg_verbose)
	msg_info("%s: queue %s: %s %s",
		 myname, queue->name, dsn->status, dsn->reason);

    /*
     * Don't restart the positive feedback hysteresis cycle with every
     * negative feedback. Restart it only when we make a negative concurrency
     * adjustment (i.e. at the start of a negative feedback hysteresis
     * cycle). Otherwise positive feedback would be too weak (positive
     * feedback does not take effect until the end of its hysteresis cycle).
     */

    /*
     * This queue is declared dead after a configurable number of
     * pseudo-cohort failures.
     */
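    /*
     * Illustration (hypothetical numbers): with a window of 10, each failed
     * delivery below adds 0.1 to fail_cohorts, so ten failures amount to one
     * failed pseudo-cohort. With a fail_cohort_limit of 1, that single full
     * pseudo-cohort of failures is enough to mark the destination as dead.
     */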
    if (QMGR_QUEUE_READY(queue)) {
	queue->fail_cohorts += 1.0 / queue->window;
	if (transport->fail_cohort_limit > 0
	    && queue->fail_cohorts >= transport->fail_cohort_limit)
	    queue->window = QMGR_QUEUE_STAT_THROTTLED;
    }

    /*
     * Decrease the destination's concurrency limit until we reach 1. Base
     * adjustments on the concurrency limit itself, instead of using the
     * actual concurrency. The latter fluctuates wildly when deliveries
     * complete in bursts (artificial benchmark measurements).
     *
     * Even after reaching 1, we maintain the negative hysteresis cycle so that
     * negative feedback can cancel out positive feedback.
     */
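    /*
     * Worked example (added for illustration; the numbers are hypothetical):
     * with a negative feedback of 1/window and a hysteresis of 1, the very
     * first failure already pushes `failure' below zero, so the loop below
     * lowers the window by one right away (negative feedback acts at the
     * start of its hysteresis cycle); after that, roughly one more full
     * window of failed deliveries is needed for the next decrement.
     */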
    if (QMGR_QUEUE_READY(queue)) {
	feedback = QMGR_FEEDBACK_VAL(transport->neg_feedback, queue->window);
	QMGR_LOG_FEEDBACK(feedback);
	queue->failure -= feedback;
	/* Prepare for overshoot (feedback > hysteresis, rounding error). */
	while (queue->failure - feedback / 2 < 0) {
	    queue->window -= transport->neg_feedback.hysteresis;
	    queue->success = 0;
	    queue->failure += transport->neg_feedback.hysteresis;
	}
	/* Prepare for overshoot. */
	if (queue->window < 1)
	    queue->window = 1;
    }

    /*
     * Special case for a site that just was declared dead.
     */
    if (QMGR_QUEUE_THROTTLED(queue)) {
	queue->dsn = DSN_COPY(dsn);
	event_request_timer(qmgr_queue_unthrottle_wrapper,
			    (void *) queue, var_min_backoff_time);
	queue->dflags = 0;
    }
    QMGR_LOG_WINDOW(queue);
}

/* qmgr_queue_select - select in-core queue for delivery */

QMGR_QUEUE *qmgr_queue_select(QMGR_TRANSPORT *transport)
{
    QMGR_QUEUE *queue;

    /*
     * If we find a suitable site, rotate the list to enforce round-robin
     * selection. See similar selection code in qmgr_transport_select().
     */
    for (queue = transport->queue_list.next; queue; queue = queue->peers.next) {
	if (queue->window > queue->busy_refcount && queue->todo.next != 0) {
	    QMGR_LIST_ROTATE(transport->queue_list, queue);
	    if (msg_verbose)
		msg_info("qmgr_queue_select: %s", queue->name);
	    return (queue);
	}
    }
    return (0);
}

/* qmgr_queue_done - delete in-core queue for site */

void    qmgr_queue_done(QMGR_QUEUE *queue)
{
    const char *myname = "qmgr_queue_done";
    QMGR_TRANSPORT *transport = queue->transport;

    /*
     * Sanity checks. It is an error to delete an in-core queue with pending
     * messages or timers.
     */
    if (queue->busy_refcount != 0 || queue->todo_refcount != 0)
	msg_panic("%s: refcount: %d", myname,
		  queue->busy_refcount + queue->todo_refcount);
    if (queue->todo.next || queue->busy.next)
	msg_panic("%s: queue not empty: %s", myname, queue->name);
    if (!QMGR_QUEUE_READY(queue))
	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
    if (queue->dsn)
	msg_panic("%s: queue %s: spurious reason %s",
		  myname, queue->name, queue->dsn->reason);

    /*
     * Clean up this in-core queue.
     */
    QMGR_LIST_UNLINK(transport->queue_list, QMGR_QUEUE *, queue);
    htable_delete(transport->queue_byname, queue->name, (void (*) (void *)) 0);
    myfree(queue->name);
    myfree(queue->nexthop);
    qmgr_queue_count--;
    myfree((void *) queue);
}

/* qmgr_queue_create - create in-core queue for site */

QMGR_QUEUE *qmgr_queue_create(QMGR_TRANSPORT *transport, const char *name,
			              const char *nexthop)
{
    QMGR_QUEUE *queue;

    /*
     * If possible, choose an initial concurrency of > 1 so that one bad
     * message or one bad network won't slow us down unnecessarily.
     */

    queue = (QMGR_QUEUE *) mymalloc(sizeof(QMGR_QUEUE));
    qmgr_queue_count++;
    queue->dflags = 0;
    queue->last_done = 0;
    queue->name = mystrdup(name);
    queue->nexthop = mystrdup(nexthop);
    queue->todo_refcount = 0;
    queue->busy_refcount = 0;
    queue->transport = transport;
    queue->window = transport->init_dest_concurrency;
    queue->success = queue->failure = queue->fail_cohorts = 0;
    QMGR_LIST_INIT(queue->todo);
    QMGR_LIST_INIT(queue->busy);
    queue->dsn = 0;
    queue->clog_time_to_warn = 0;
    QMGR_LIST_PREPEND(transport->queue_list, queue);
    htable_enter(transport->queue_byname, name, (void *) queue);
    return (queue);
}

/* qmgr_queue_find - find in-core named queue */

QMGR_QUEUE *qmgr_queue_find(QMGR_TRANSPORT *transport, const char *name)
{
    return ((QMGR_QUEUE *) htable_find(transport->queue_byname, name));
}