1/*++
2/* NAME
3/*	qmgr_entry 3
4/* SUMMARY
5/*	per-site queue entries
6/* SYNOPSIS
7/*	#include "qmgr.h"
8/*
9/*	QMGR_ENTRY *qmgr_entry_create(peer, message)
10/*      QMGR_PEER *peer;
11/*	QMGR_MESSAGE *message;
12/*
13/*	void	qmgr_entry_done(entry, which)
14/*	QMGR_ENTRY *entry;
15/*	int	which;
16/*
/*	QMGR_ENTRY *qmgr_entry_select(peer)
/*	QMGR_PEER *peer;
19/*
/*	void	qmgr_entry_unselect(entry)
/*	QMGR_ENTRY *entry;
23/*
24/*	void	qmgr_entry_move_todo(dst, entry)
25/*	QMGR_QUEUE *dst;
26/*	QMGR_ENTRY *entry;
27/* DESCRIPTION
28/*	These routines add/delete/manipulate per-site message
29/*	delivery requests.
30/*
31/*	qmgr_entry_create() creates an entry for the named peer and message,
32/*      and appends the entry to the peer's list and its queue's todo list.
33/*	Filling in and cleaning up the recipients is the responsibility
34/*	of the caller.
35/*
36/*	qmgr_entry_done() discards a per-site queue entry.  The
37/*	\fIwhich\fR argument is either QMGR_QUEUE_BUSY for an entry
38/*	of the site's `busy' list (i.e. queue entries that have been
39/*	selected for actual delivery), or QMGR_QUEUE_TODO for an entry
40/*	of the site's `todo' list (i.e. queue entries awaiting selection
41/*	for actual delivery).
42/*
43/*	qmgr_entry_done() discards its peer structure when the peer
44/*      is not referenced anymore.
45/*
46/*	qmgr_entry_done() triggers cleanup of the per-site queue when
47/*	the site has no pending deliveries, and the site is either
48/*	alive, or the site is dead and the number of in-core queues
49/*	exceeds a configurable limit (see qmgr_queue_done()).
50/*
51/*	qmgr_entry_done() triggers special action when the last in-core
52/*	queue entry for a message is done with: either read more
53/*	recipients from the queue file, delete the queue file, or move
54/*	the queue file to the deferred queue; send bounce reports to the
55/*	message originator (see qmgr_active_done()).
56/*
/*	qmgr_entry_select() selects the first entry from the named
/*	peer's entry list for actual delivery. The entry is moved
/*	to its queue's `busy' list: the list of messages being
/*	delivered. The entry is also removed from its peer list.
61/*
/*	qmgr_entry_unselect() takes the named entry off its per-site
/*	queue's `busy' list and moves it back to that queue's
/*	`todo' list. The entry is also prepended to its peer list again.
65/*
66/*	qmgr_entry_move_todo() moves the specified "todo" queue entry
67/*	to the specified "todo" queue.
68/* DIAGNOSTICS
69/*	Panic: interface violations, internal inconsistencies.
70/* LICENSE
71/* .ad
72/* .fi
73/*	The Secure Mailer license must be distributed with this software.
74/* AUTHOR(S)
75/*	Wietse Venema
76/*	IBM T.J. Watson Research
77/*	P.O. Box 704
78/*	Yorktown Heights, NY 10598, USA
79/*
80/*	Preemptive scheduler enhancements:
81/*	Patrik Rak
82/*	Modra 6
83/*	155 00, Prague, Czech Republic
84/*--*/
85
86/* System library. */
87
88#include <sys_defs.h>
89#include <stdlib.h>
90#include <time.h>
91
92/* Utility library. */
93
94#include <msg.h>
95#include <mymalloc.h>
96#include <events.h>
97#include <vstream.h>
98
99/* Global library. */
100
101#include <mail_params.h>
102#include <deliver_request.h>		/* opportunistic session caching */
103
104/* Application-specific. */
105
106#include "qmgr.h"
107
/* qmgr_entry_select - select a peer's first todo entry for delivery */

QMGR_ENTRY *qmgr_entry_select(QMGR_PEER *peer)
{
    const char *myname = "qmgr_entry_select";
    QMGR_ENTRY *entry;
    QMGR_QUEUE *queue;

    /*
     * Take the first entry from the peer's entry list, move it from its
     * queue's "todo" list to that queue's "busy" list, and take it off the
     * peer list. The result is a null pointer when the peer has no entries
     * awaiting selection.
     */
    if ((entry = peer->entry_list.next) != 0) {
	queue = entry->queue;
	QMGR_LIST_UNLINK(queue->todo, QMGR_ENTRY *, entry, queue_peers);
	queue->todo_refcount--;
	QMGR_LIST_APPEND(queue->busy, entry, queue_peers);
	queue->busy_refcount++;
	QMGR_LIST_UNLINK(peer->entry_list, QMGR_ENTRY *, entry, peer_peers);
	peer->job->selected_entries++;

	/*
	 * With opportunistic session caching, the delivery agent must not
	 * only 1) save a session upon completion, but also 2) reuse a cached
	 * session upon the next delivery request. In order to not miss out
	 * on 2), we have to make caching sticky or else we get silly
	 * behavior when the in-memory queue drains. Specifically, new
	 * connections must not be made as long as cached connections exist.
	 *
	 * Safety: don't enable opportunistic session caching unless the queue
	 * manager is able to schedule concurrent or back-to-back deliveries
	 * (we need to recognize back-to-back deliveries for transports with
	 * concurrency 1).
	 *
	 * If caching has previously been enabled, but is not now, fetch any
	 * existing entries from the cache, but don't add new ones.
	 *
	 * NOTE: the macros below capture the local variable "queue" by name;
	 * they are only meaningful inside this block.
	 */
#define CONCURRENT_OR_BACK_TO_BACK_DELIVERY() \
	    (queue->busy_refcount > 1 || BACK_TO_BACK_DELIVERY())

#define BACK_TO_BACK_DELIVERY() \
		(queue->last_done + 1 >= event_time())

	/*
	 * Turn on session caching after we get up to speed. Don't enable
	 * session caching just because we have concurrent deliveries. This
	 * prevents unnecessary session caching when we have a burst of mail
	 * <= the initial concurrency limit.
	 */
	if ((queue->dflags & DEL_REQ_FLAG_CONN_STORE) == 0) {
	    if (BACK_TO_BACK_DELIVERY()) {
		if (msg_verbose)
		    msg_info("%s: allowing on-demand session caching for %s",
			     myname, queue->name);
		queue->dflags |= DEL_REQ_FLAG_CONN_MASK;
	    }
	}

	/*
	 * Turn off session caching when concurrency drops and we're running
	 * out of steam. This is what prevents from turning off session
	 * caching too early, and from making new connections while old ones
	 * are still cached.
	 */
	else {
	    if (!CONCURRENT_OR_BACK_TO_BACK_DELIVERY()) {
		if (msg_verbose)
		    msg_info("%s: disallowing on-demand session caching for %s",
			     myname, queue->name);
		queue->dflags &= ~DEL_REQ_FLAG_CONN_STORE;
	    }
	}
    }
    return (entry);
}
179
180/* qmgr_entry_unselect - unselect queue entry for delivery */
181
182void    qmgr_entry_unselect(QMGR_ENTRY *entry)
183{
184    QMGR_PEER *peer = entry->peer;
185    QMGR_QUEUE *queue = entry->queue;
186
187    /*
188     * Move the entry back to the todo lists. In case of the peer list, put
189     * it back to the beginning, so the select()/unselect() does not reorder
190     * entries. We use this in qmgr_message_assign() to put recipients into
191     * existing entries when possible.
192     */
193    QMGR_LIST_UNLINK(queue->busy, QMGR_ENTRY *, entry, queue_peers);
194    queue->busy_refcount--;
195    QMGR_LIST_APPEND(queue->todo, entry, queue_peers);
196    queue->todo_refcount++;
197    QMGR_LIST_PREPEND(peer->entry_list, entry, peer_peers);
198    peer->job->selected_entries--;
199}
200
201/* qmgr_entry_move_todo - move entry between todo queues */
202
203void    qmgr_entry_move_todo(QMGR_QUEUE *dst_queue, QMGR_ENTRY *entry)
204{
205    const char *myname = "qmgr_entry_move_todo";
206    QMGR_TRANSPORT *dst_transport = dst_queue->transport;
207    QMGR_MESSAGE *message = entry->message;
208    QMGR_QUEUE *src_queue = entry->queue;
209    QMGR_PEER *dst_peer, *src_peer = entry->peer;
210    QMGR_JOB *dst_job, *src_job = src_peer->job;
211    QMGR_ENTRY *new_entry;
212    int     rcpt_count = entry->rcpt_list.len;
213
214    if (entry->stream != 0)
215	msg_panic("%s: queue %s entry is busy", myname, src_queue->name);
216    if (QMGR_QUEUE_THROTTLED(dst_queue))
217	msg_panic("%s: destination queue %s is throttled", myname, dst_queue->name);
218    if (QMGR_TRANSPORT_THROTTLED(dst_transport))
219	msg_panic("%s: destination transport %s is throttled",
220		  myname, dst_transport->name);
221
222    /*
223     * Create new entry, swap the recipients between the two entries,
224     * adjusting the job counters accordingly, then dispose of the old entry.
225     *
226     * Note that qmgr_entry_done() will also take care of adjusting the
227     * recipient limits of all the message jobs, so we do not have to do that
228     * explicitly for the new job here.
229     *
230     * XXX This does not enforce the per-entry recipient limit, but that is not
231     * a problem as long as qmgr_entry_move_todo() is called only to bounce
232     * or defer mail.
233     */
234    dst_job = qmgr_job_obtain(message, dst_transport);
235    dst_peer = qmgr_peer_obtain(dst_job, dst_queue);
236
237    new_entry = qmgr_entry_create(dst_peer, message);
238
239    recipient_list_swap(&entry->rcpt_list, &new_entry->rcpt_list);
240
241    src_job->rcpt_count -= rcpt_count;
242    dst_job->rcpt_count += rcpt_count;
243
244    qmgr_entry_done(entry, QMGR_QUEUE_TODO);
245}
246
/* qmgr_entry_done - dispose of queue entry */

void    qmgr_entry_done(QMGR_ENTRY *entry, int which)
{
    const char *myname = "qmgr_entry_done";
    QMGR_QUEUE *queue = entry->queue;
    QMGR_MESSAGE *message = entry->message;
    QMGR_PEER *peer = entry->peer;
    QMGR_JOB *sponsor, *job = peer->job;
    QMGR_TRANSPORT *transport = job->transport;

    /*
     * Take this entry off the in-core queue. The "which" argument names the
     * list that the entry currently lives on: QMGR_QUEUE_BUSY (selected for
     * delivery) or QMGR_QUEUE_TODO (awaiting selection).
     */
    if (entry->stream != 0)
	msg_panic("%s: file is open", myname);
    if (which == QMGR_QUEUE_BUSY) {
	QMGR_LIST_UNLINK(queue->busy, QMGR_ENTRY *, entry, queue_peers);
	queue->busy_refcount--;
    } else if (which == QMGR_QUEUE_TODO) {
	QMGR_LIST_UNLINK(peer->entry_list, QMGR_ENTRY *, entry, peer_peers);
	/* Count the entry as selected so the job's entry bookkeeping
	 * balances even though this entry is never actually delivered. */
	job->selected_entries++;
	QMGR_LIST_UNLINK(queue->todo, QMGR_ENTRY *, entry, queue_peers);
	queue->todo_refcount--;
    } else {
	msg_panic("%s: bad queue spec: %d", myname, which);
    }

    /*
     * Decrease the in-core recipient counts and free the recipient list and
     * the structure itself. The entry must not be referenced after this
     * point.
     */
    job->rcpt_count -= entry->rcpt_list.len;
    message->rcpt_count -= entry->rcpt_list.len;
    qmgr_recipient_count -= entry->rcpt_list.len;
    recipient_list_free(&entry->rcpt_list);
    myfree((char *) entry);

    /*
     * Make sure that the transport of any retired or finishing job that
     * donated recipient slots to this message gets them back first. Then, if
     * possible, pass the remaining unused recipient slots to the next job on
     * the job list.
     *
     * NOTE(review): a negative stack_level appears to mark a retired job,
     * and rcpt_offset == 0 appears to mean that all recipients have been
     * read from the queue file -- confirm against the job/message modules.
     */
    for (sponsor = message->job_list.next; sponsor; sponsor = sponsor->message_peers.next) {
	if (sponsor->rcpt_count >= sponsor->rcpt_limit || sponsor == job)
	    continue;
	if (sponsor->stack_level < 0 || message->rcpt_offset == 0)
	    qmgr_job_move_limits(sponsor);
    }
    if (message->rcpt_offset == 0) {
	qmgr_job_move_limits(job);
    }

    /*
     * We implement a rate-limited queue by emulating a slow delivery
     * channel. We insert the artificial delays with qmgr_queue_suspend().
     *
     * When a queue is suspended, we must postpone any job scheduling decisions
     * until the queue is resumed. Otherwise, we make those decisions now.
     * The job scheduling decisions are made by qmgr_job_blocker_update().
     */
    if (which == QMGR_QUEUE_BUSY && transport->rate_delay > 0) {
	if (queue->window > 1)
	    msg_panic("%s: queue %s/%s: window %d > 1 on rate-limited service",
		      myname, transport->name, queue->name, queue->window);
	if (QMGR_QUEUE_THROTTLED(queue))	/* XXX */
	    qmgr_queue_unthrottle(queue);
	if (QMGR_QUEUE_READY(queue))
	    qmgr_queue_suspend(queue, transport->rate_delay);
    }
    if (!QMGR_QUEUE_SUSPENDED(queue)
	&& queue->blocker_tag == transport->blocker_tag)
	qmgr_job_blocker_update(queue);

    /*
     * When there are no more entries for this peer, discard the peer
     * structure.
     */
    peer->refcount--;
    if (peer->refcount == 0)
	qmgr_peer_free(peer);

    /*
     * Maintain back-to-back delivery status; qmgr_entry_select() uses this
     * timestamp when deciding whether on-demand session caching pays off.
     */
    if (which == QMGR_QUEUE_BUSY)
	queue->last_done = event_time();

    /*
     * When the in-core queue for this site is empty and when this site is
     * not dead or suspended, discard the in-core queue. When this site is
     * dead, but the number of in-core queues exceeds some threshold, get rid
     * of this in-core queue anyway, in order to avoid running out of memory.
     */
    if (queue->todo.next == 0 && queue->busy.next == 0) {
	if (QMGR_QUEUE_THROTTLED(queue) && qmgr_queue_count > 2 * var_qmgr_rcpt_limit)
	    qmgr_queue_unthrottle(queue);
	if (QMGR_QUEUE_READY(queue))
	    qmgr_queue_done(queue);
    }

    /*
     * Update the in-core message reference count. When the in-core message
     * structure has no more references, dispose of the message.
     */
    message->refcount--;
    if (message->refcount == 0)
	qmgr_active_done(message);
}
357
/* qmgr_entry_create - create queue todo entry */

QMGR_ENTRY *qmgr_entry_create(QMGR_PEER *peer, QMGR_MESSAGE *message)
{
    QMGR_ENTRY *entry;
    QMGR_QUEUE *queue = peer->queue;

    /*
     * Sanity check.
     */
    if (QMGR_QUEUE_THROTTLED(queue))
	msg_panic("qmgr_entry_create: dead queue: %s", queue->name);

    /*
     * Create the delivery request. The new entry holds one reference each
     * to the message and to the peer; qmgr_entry_done() releases them. The
     * recipient list starts out empty -- filling it in is the caller's
     * responsibility.
     */
    entry = (QMGR_ENTRY *) mymalloc(sizeof(QMGR_ENTRY));
    entry->stream = 0;
    entry->message = message;
    recipient_list_init(&entry->rcpt_list, RCPT_LIST_INIT_QUEUE);
    message->refcount++;
    entry->peer = peer;
    QMGR_LIST_APPEND(peer->entry_list, entry, peer_peers);
    peer->refcount++;
    entry->queue = queue;
    QMGR_LIST_APPEND(queue->todo, entry, queue_peers);
    queue->todo_refcount++;
    peer->job->read_entries++;

    /*
     * Warn if a destination is falling behind while the active queue
     * contains a non-trivial amount of single-recipient email. When a
     * destination takes up more and more space in the active queue, then
     * other mail will not get through and delivery performance will suffer.
     *
     * XXX At this point in the code, the busy reference count is still less
     * than the concurrency limit (otherwise this code would not be invoked
     * in the first place) so we have to make some awkward adjustments
     * below.
     *
     * XXX The queue length test below looks at the active queue share of an
     * individual destination. This catches the case where mail for one
     * destination is falling behind because it has to round-robin compete
     * with many other destinations. However, Postfix will also perform
     * poorly when most of the active queue is tied up by a small number of
     * concurrency limited destinations. The queue length test below detects
     * such conditions only indirectly.
     *
     * XXX This code does not detect the case that the active queue is being
     * starved because incoming mail is pounding the disk.
     */
    if (var_helpful_warnings && var_qmgr_clog_warn_time > 0) {
	int     queue_length = queue->todo_refcount + queue->busy_refcount;
	time_t  now;
	QMGR_TRANSPORT *transport;
	double  active_share;

	/* Rate-limit the warnings with queue->clog_time_to_warn. */
	if (queue_length > var_qmgr_active_limit / 5
	    && (now = event_time()) >= queue->clog_time_to_warn) {
	    active_share = queue_length / (double) qmgr_message_count;
	    msg_warn("mail for %s is using up %d of %d active queue entries",
		     queue->nexthop, queue_length, qmgr_message_count);
	    if (active_share < 0.9)
		msg_warn("this may slow down other mail deliveries");
	    transport = queue->transport;
	    if (transport->dest_concurrency_limit > 0
	    && transport->dest_concurrency_limit <= queue->busy_refcount + 1)
		msg_warn("you may need to increase the main.cf %s%s from %d",
			 transport->name, _DEST_CON_LIMIT,
			 transport->dest_concurrency_limit);
	    else if (queue->window > var_qmgr_active_limit * active_share)
		msg_warn("you may need to increase the main.cf %s from %d",
			 VAR_QMGR_ACT_LIMIT, var_qmgr_active_limit);
	    else if (queue->peers.next != queue->peers.prev)
		msg_warn("you may need a separate master.cf transport for %s",
			 queue->nexthop);
	    else {
		msg_warn("you may need to reduce %s connect and helo timeouts",
			 transport->name);
		msg_warn("so that Postfix quickly skips unavailable hosts");
		msg_warn("you may need to increase the main.cf %s and %s",
			 VAR_MIN_BACKOFF_TIME, VAR_MAX_BACKOFF_TIME);
		msg_warn("so that Postfix wastes less time on undeliverable mail");
		msg_warn("you may need to increase the master.cf %s process limit",
			 transport->name);
	    }
	    msg_warn("please avoid flushing the whole queue when you have");
	    msg_warn("lots of deferred mail, that is bad for performance");
	    msg_warn("to turn off these warnings specify: %s = 0",
		     VAR_QMGR_CLOG_WARN_TIME);
	    queue->clog_time_to_warn = now + var_qmgr_clog_warn_time;
	}
    }
    return (entry);
}
453