1/*++
2/* NAME
3/*	qmgr_active 3
4/* SUMMARY
5/*	active queue management
6/* SYNOPSIS
7/*	#include "qmgr.h"
8/*
/*	int	qmgr_active_feed(scan_info, queue_id)
10/*	QMGR_SCAN *scan_info;
11/*	const char *queue_id;
12/*
13/*	void	qmgr_active_drain()
14/*
/*	void	qmgr_active_done(message)
16/*	QMGR_MESSAGE *message;
17/* DESCRIPTION
18/*	These functions maintain the active message queue: the set
19/*	of messages that the queue manager is actually working on.
20/*	The active queue is limited in size. Messages are drained
21/*	from the active queue by allocating a delivery process and
22/*	by delivering mail via that process.  Messages leak into the
23/*	active queue only when the active queue is small enough.
24/*	Damaged message files are saved to the "corrupt" directory.
25/*
26/*	qmgr_active_feed() inserts the named message file into
27/*	the active queue. Message files with the wrong name or
28/*	with other wrong properties are skipped but not removed.
29/*	The following queue flags are recognized, other flags being
30/*	ignored:
31/* .IP QMGR_SCAN_ALL
32/*	Examine all queue files. Normally, deferred queue files with
33/*	future time stamps are ignored, and incoming queue files with
34/*	future time stamps are frowned upon.
35/* .PP
/*	qmgr_active_drain() allocates one delivery process for every
/*	transport that has pending mail.
37/*	Process allocation is asynchronous. Once the delivery
38/*	process is available, an attempt is made to deliver
39/*	a message via it. Message delivery is asynchronous, too.
40/*
41/*	qmgr_active_done() deals with a message after delivery
42/*	has been tried for all in-core recipients. If the message
43/*	was bounced, a bounce message is sent to the sender, or
44/*	to the Errors-To: address if one was specified.
45/*	If there are more on-file recipients, a new batch of
46/*	in-core recipients is read from the queue file. Otherwise,
47/*	if a delivery agent marked the queue file as corrupt,
48/*	the queue file is moved to the "corrupt" queue (surprise);
49/*	if at least one delivery failed, the message is moved
50/*	to the deferred queue. The time stamps of a deferred queue
51/*	file are set to the nearest wakeup time of its recipient
52/*	sites (if delivery failed due to a problem with a next-hop
53/*	host), are set into the future by the amount of time the
54/*	message was queued (per-message exponential backoff), or are set
55/*	into the future by a minimal backoff time, whichever is more.
56/*	The minimal_backoff_time parameter specifies the minimal
57/*	amount of time between delivery attempts; maximal_backoff_time
58/*	specifies an upper limit.
59/* DIAGNOSTICS
60/*	Fatal: queue file access failures, out of memory.
61/*	Panic: interface violations, internal consistency errors.
62/*	Warnings: corrupt message file. A corrupt message is saved
63/*	to the "corrupt" queue for further inspection.
64/* LICENSE
65/* .ad
66/* .fi
67/*	The Secure Mailer license must be distributed with this software.
68/* AUTHOR(S)
69/*	Wietse Venema
70/*	IBM T.J. Watson Research
71/*	P.O. Box 704
72/*	Yorktown Heights, NY 10598, USA
73/*--*/
74
75/* System library. */
76
77#include <sys_defs.h>
78#include <sys/stat.h>
79#include <dirent.h>
80#include <stdlib.h>
81#include <unistd.h>
82#include <string.h>
83#include <utime.h>
84#include <errno.h>
85
86#ifndef S_IRWXU				/* What? no POSIX system? */
87#define S_IRWXU 0700
88#endif
89
90/* Utility library. */
91
92#include <msg.h>
93#include <events.h>
94#include <mymalloc.h>
95#include <vstream.h>
96#include <warn_stat.h>
97
98/* Global library. */
99
100#include <mail_params.h>
101#include <mail_open_ok.h>
102#include <mail_queue.h>
103#include <recipient_list.h>
104#include <bounce.h>
105#include <defer.h>
106#include <trace.h>
107#include <abounce.h>
108#include <rec_type.h>
109#include <qmgr_user.h>
110
111#ifdef __APPLE_OS_X_SERVER__
112#include <dtrace-postfix.h>
113#endif
114
115/* Application-specific. */
116
117#include "qmgr.h"
118
119 /*
120  * A bunch of call-back routines.
121  */
122static void qmgr_active_done_2_bounce_flush(int, char *);
123static void qmgr_active_done_2_generic(QMGR_MESSAGE *);
124static void qmgr_active_done_25_trace_flush(int, char *);
125static void qmgr_active_done_25_generic(QMGR_MESSAGE *);
126static void qmgr_active_done_3_defer_flush(int, char *);
127static void qmgr_active_done_3_defer_warn(int, char *);
128static void qmgr_active_done_3_generic(QMGR_MESSAGE *);
129
130/* qmgr_active_corrupt - move corrupted file out of the way */
131
132static void qmgr_active_corrupt(const char *queue_id)
133{
134    const char *myname = "qmgr_active_corrupt";
135
136    if (mail_queue_rename(queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT)) {
137	if (errno != ENOENT)
138	    msg_fatal("%s: save corrupt file queue %s id %s: %m",
139		      myname, MAIL_QUEUE_ACTIVE, queue_id);
140    } else {
141	msg_warn("saving corrupt file \"%s\" from queue \"%s\" to queue \"%s\"",
142		 queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT);
143    }
144}
145
146/* qmgr_active_defer - defer queue file */
147
148static void qmgr_active_defer(const char *queue_name, const char *queue_id,
149			              const char *dest_queue, int delay)
150{
151    const char *myname = "qmgr_active_defer";
152    const char *path;
153    struct utimbuf tbuf;
154
155    if (msg_verbose)
156	msg_info("wakeup %s after %ld secs", queue_id, (long) delay);
157
158    tbuf.actime = tbuf.modtime = event_time() + delay;
159    path = mail_queue_path((VSTRING *) 0, queue_name, queue_id);
160    if (utime(path, &tbuf) < 0 && errno != ENOENT)
161	msg_fatal("%s: update %s time stamps: %m", myname, path);
162    if (mail_queue_rename(queue_id, queue_name, dest_queue)) {
163	if (errno != ENOENT)
164	    msg_fatal("%s: rename %s from %s to %s: %m", myname,
165		      queue_id, queue_name, dest_queue);
166	msg_warn("%s: rename %s from %s to %s: %m", myname,
167		 queue_id, queue_name, dest_queue);
168    } else if (msg_verbose) {
169	msg_info("%s: defer %s", myname, queue_id);
170    }
171}
172
173/* qmgr_active_feed - feed one message into active queue */
174
175int     qmgr_active_feed(QMGR_SCAN *scan_info, const char *queue_id)
176{
177    const char *myname = "qmgr_active_feed";
178    QMGR_MESSAGE *message;
179    struct stat st;
180    const char *path;
181
182    if (strcmp(scan_info->queue, MAIL_QUEUE_ACTIVE) == 0)
183	msg_panic("%s: bad queue %s", myname, scan_info->queue);
184    if (msg_verbose)
185	msg_info("%s: queue %s", myname, scan_info->queue);
186
187    /*
188     * Make sure this is something we are willing to open.
189     */
190    if (mail_open_ok(scan_info->queue, queue_id, &st, &path) == MAIL_OPEN_NO)
191	return (0);
192
193    if (msg_verbose)
194	msg_info("%s: %s", myname, path);
195
196    /*
197     * Skip files that have time stamps into the future. They need to cool
198     * down. Incoming and deferred files can have future time stamps.
199     */
200    if ((scan_info->flags & QMGR_SCAN_ALL) == 0
201	&& st.st_mtime > time((time_t *) 0) + 1) {
202	if (msg_verbose)
203	    msg_info("%s: skip %s (%ld seconds)", myname, queue_id,
204		     (long) (st.st_mtime - event_time()));
205	return (0);
206    }
207
208    /*
209     * Move the message to the active queue. File access errors are fatal.
210     */
211    if (mail_queue_rename(queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE)) {
212	if (errno != ENOENT)
213	    msg_fatal("%s: %s: rename from %s to %s: %m", myname,
214		      queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE);
215	msg_warn("%s: %s: rename from %s to %s: %m", myname,
216		 queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE);
217	return (0);
218    }
219
220    /*
221     * Extract envelope information: sender and recipients. At this point,
222     * mail addresses have been processed by the cleanup service so they
223     * should be in canonical form. Generate requests to deliver this
224     * message.
225     *
226     * Throwing away queue files seems bad, especially when they made it this
227     * far into the mail system. Therefore we save bad files to a separate
228     * directory for further inspection.
229     *
230     * After queue manager restart it is possible that a queue file is still
231     * being delivered. In that case (the file is locked), defer delivery by
232     * a minimal amount of time.
233     */
234#define QMGR_FLUSH_AFTER	(QMGR_FLUSH_EACH | QMGR_FLUSH_DFXP)
235
236    if ((message = qmgr_message_alloc(MAIL_QUEUE_ACTIVE, queue_id,
237				 (st.st_mode & MAIL_QUEUE_STAT_UNTHROTTLE) ?
238				      scan_info->flags | QMGR_FLUSH_AFTER :
239				      scan_info->flags,
240				 (st.st_mode & MAIL_QUEUE_STAT_UNTHROTTLE) ?
241				  st.st_mode & ~MAIL_QUEUE_STAT_UNTHROTTLE :
242				      0)) == 0) {
243	qmgr_active_corrupt(queue_id);
244	return (0);
245    } else if (message == QMGR_MESSAGE_LOCKED) {
246	qmgr_active_defer(MAIL_QUEUE_ACTIVE, queue_id, MAIL_QUEUE_INCOMING, 60);
247	return (0);
248    } else {
249
250	/*
251	 * Special case if all recipients were already delivered. Send any
252	 * bounces and clean up.
253	 */
254	if (message->refcount == 0)
255	    qmgr_active_done(message);
256	return (1);
257    }
258}
259
260/* qmgr_active_done - dispose of message after recipients have been tried */
261
262void    qmgr_active_done(QMGR_MESSAGE *message)
263{
264    const char *myname = "qmgr_active_done";
265    struct stat st;
266
267    if (msg_verbose)
268	msg_info("%s: %s", myname, message->queue_id);
269
270    /*
271     * During a previous iteration, an attempt to bounce this message may
272     * have failed, so there may still be a bounce log lying around. XXX By
273     * groping around in the bounce queue, we're trespassing on the bounce
274     * service's territory. But doing so is more robust than depending on the
275     * bounce daemon to do the lookup for us, and for us to do the deleting
276     * after we have received a successful status from the bounce service.
277     * The bounce queue directory blocks are most likely in memory anyway. If
278     * these lookups become a performance problem we will have to build an
279     * in-core cache into the bounce daemon.
280     *
281     * Don't bounce when the bounce log is empty. The bounce process obviously
282     * failed, and the delivery agent will have requested that the message be
283     * deferred.
284     *
285     * Bounces are sent asynchronously to avoid stalling while the cleanup
286     * daemon waits for the qmgr to accept the "new mail" trigger.
287     *
288     * See also code in cleanup_bounce.c.
289     */
290    if (stat(mail_queue_path((VSTRING *) 0, MAIL_QUEUE_BOUNCE, message->queue_id), &st) == 0) {
291	if (st.st_size == 0) {
292	    if (mail_queue_remove(MAIL_QUEUE_BOUNCE, message->queue_id))
293		msg_fatal("remove %s %s: %m",
294			  MAIL_QUEUE_BOUNCE, message->queue_id);
295	} else {
296	    if (msg_verbose)
297		msg_info("%s: bounce %s", myname, message->queue_id);
298	    if (message->verp_delims == 0 || var_verp_bounce_off)
299		abounce_flush(BOUNCE_FLAG_KEEP,
300			      message->queue_name,
301			      message->queue_id,
302			      message->encoding,
303			      message->sender,
304			      message->dsn_envid,
305			      message->dsn_ret,
306			      qmgr_active_done_2_bounce_flush,
307			      (char *) message);
308	    else
309		abounce_flush_verp(BOUNCE_FLAG_KEEP,
310				   message->queue_name,
311				   message->queue_id,
312				   message->encoding,
313				   message->sender,
314				   message->dsn_envid,
315				   message->dsn_ret,
316				   message->verp_delims,
317				   qmgr_active_done_2_bounce_flush,
318				   (char *) message);
319	    return;
320	}
321    }
322
323    /*
324     * Asynchronous processing does not reach this point.
325     */
326    qmgr_active_done_2_generic(message);
327}
328
329/* qmgr_active_done_2_bounce_flush - process abounce_flush() status */
330
331static void qmgr_active_done_2_bounce_flush(int status, char *context)
332{
333    QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
334
335    /*
336     * Process abounce_flush() status and continue processing.
337     */
338    message->flags |= status;
339    qmgr_active_done_2_generic(message);
340}
341
342/* qmgr_active_done_2_generic - continue processing */
343
344static void qmgr_active_done_2_generic(QMGR_MESSAGE *message)
345{
346    const char *path;
347    struct stat st;
348
349    /*
350     * A delivery agent marks a queue file as corrupt by changing its
351     * attributes, and by pretending that delivery was deferred.
352     */
353    if (message->flags
354	&& mail_open_ok(MAIL_QUEUE_ACTIVE, message->queue_id, &st, &path) == MAIL_OPEN_NO) {
355	qmgr_active_corrupt(message->queue_id);
356	qmgr_message_free(message);
357	return;
358    }
359
360    /*
361     * If we did not read all recipients from this file, go read some more,
362     * but remember whether some recipients have to be tried again.
363     *
364     * Throwing away queue files seems bad, especially when they made it this
365     * far into the mail system. Therefore we save bad files to a separate
366     * directory for further inspection by a human being.
367     */
368    if (message->rcpt_offset > 0) {
369	if (qmgr_message_realloc(message) == 0) {
370	    qmgr_active_corrupt(message->queue_id);
371	    qmgr_message_free(message);
372	} else {
373	    if (message->refcount == 0)
374		qmgr_active_done(message);	/* recurse for consistency */
375	}
376	return;
377    }
378
379    /*
380     * XXX With multi-recipient mail, some recipients may have NOTIFY=SUCCESS
381     * and others not. Depending on what subset of recipients are delivered,
382     * a trace file may or may not be created. Even when the last partial
383     * delivery attempt had no NOTIFY=SUCCESS recipients, a trace file may
384     * still exist from a previous partial delivery attempt. So as long as
385     * any recipient has NOTIFY=SUCCESS we have to always look for the trace
386     * file and be prepared for the file not to exist.
387     *
388     * See also comments in bounce/bounce_notify_util.c.
389     */
390    if ((message->tflags & (DEL_REQ_FLAG_USR_VRFY | DEL_REQ_FLAG_RECORD))
391	|| (message->rflags & QMGR_READ_FLAG_NOTIFY_SUCCESS)) {
392	atrace_flush(message->tflags,
393		     message->queue_name,
394		     message->queue_id,
395		     message->encoding,
396		     message->sender,
397		     message->dsn_envid,
398		     message->dsn_ret,
399		     qmgr_active_done_25_trace_flush,
400		     (char *) message);
401	return;
402    }
403
404    /*
405     * Asynchronous processing does not reach this point.
406     */
407    qmgr_active_done_25_generic(message);
408}
409
410/* qmgr_active_done_25_trace_flush - continue after atrace_flush() completion */
411
412static void qmgr_active_done_25_trace_flush(int status, char *context)
413{
414    QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
415
416    /*
417     * Process atrace_flush() status and continue processing.
418     */
419    if (status == 0 && message->tflags_offset)
420	qmgr_message_kill_record(message, message->tflags_offset);
421    message->flags |= status;
422    qmgr_active_done_25_generic(message);
423}
424
425/* qmgr_active_done_25_generic - continue processing */
426
427static void qmgr_active_done_25_generic(QMGR_MESSAGE *message)
428{
429    const char *myname = "qmgr_active_done_25_generic";
430
431    /*
432     * If we get to this point we have tried all recipients for this message.
433     * If the message is too old, try to bounce it.
434     *
435     * Bounces are sent asynchronously to avoid stalling while the cleanup
436     * daemon waits for the qmgr to accept the "new mail" trigger.
437     */
438    if (message->flags) {
439	if (event_time() >= message->create_time +
440	    (*message->sender ? var_max_queue_time : var_dsn_queue_time)) {
441	    msg_info("%s: from=<%s>, status=expired, returned to sender",
442		     message->queue_id, message->sender);
443	    if (message->verp_delims == 0 || var_verp_bounce_off)
444		adefer_flush(BOUNCE_FLAG_KEEP,
445			     message->queue_name,
446			     message->queue_id,
447			     message->encoding,
448			     message->sender,
449			     message->dsn_envid,
450			     message->dsn_ret,
451			     qmgr_active_done_3_defer_flush,
452			     (char *) message);
453	    else
454		adefer_flush_verp(BOUNCE_FLAG_KEEP,
455				  message->queue_name,
456				  message->queue_id,
457				  message->encoding,
458				  message->sender,
459				  message->dsn_envid,
460				  message->dsn_ret,
461				  message->verp_delims,
462				  qmgr_active_done_3_defer_flush,
463				  (char *) message);
464	    return;
465	} else if (message->warn_time > 0
466		   && event_time() >= message->warn_time - 1) {
467	    if (msg_verbose)
468		msg_info("%s: sending defer warning for %s", myname, message->queue_id);
469	    adefer_warn(BOUNCE_FLAG_KEEP,
470			message->queue_name,
471			message->queue_id,
472			message->encoding,
473			message->sender,
474			message->dsn_envid,
475			message->dsn_ret,
476			qmgr_active_done_3_defer_warn,
477			(char *) message);
478	    return;
479	}
480    }
481
482    /*
483     * Asynchronous processing does not reach this point.
484     */
485    qmgr_active_done_3_generic(message);
486}
487
488/* qmgr_active_done_3_defer_warn - continue after adefer_warn() completion */
489
490static void qmgr_active_done_3_defer_warn(int status, char *context)
491{
492    QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
493
494    /*
495     * Process adefer_warn() completion status and continue processing.
496     */
497    if (status == 0)
498	qmgr_message_update_warn(message);
499    qmgr_active_done_3_generic(message);
500}
501
502/* qmgr_active_done_3_defer_flush - continue after adefer_flush() completion */
503
504static void qmgr_active_done_3_defer_flush(int status, char *context)
505{
506    QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
507
508    /*
509     * Process adefer_flush() status and continue processing.
510     */
511    message->flags = status;
512    qmgr_active_done_3_generic(message);
513}
514
515/* qmgr_active_done_3_generic - continue processing */
516
517static void qmgr_active_done_3_generic(QMGR_MESSAGE *message)
518{
519    const char *myname = "qmgr_active_done_3_generic";
520    int     delay;
521
522    /*
523     * Some recipients need to be tried again. Move the queue file time
524     * stamps into the future by the amount of time that the message is
525     * delayed, and move the message to the deferred queue. Impose minimal
526     * and maximal backoff times.
527     *
528     * Since we look at actual time in queue, not time since last delivery
529     * attempt, backoff times will be distributed. However, we can still see
530     * spikes in delivery activity because the interval between deferred
531     * queue scans is finite.
532     */
533    if (message->flags) {
534	if (message->create_time > 0) {
535	    delay = event_time() - message->create_time;
536	    if (delay > var_max_backoff_time)
537		delay = var_max_backoff_time;
538	    if (delay < var_min_backoff_time)
539		delay = var_min_backoff_time;
540	} else {
541	    delay = var_min_backoff_time;
542	}
543	qmgr_active_defer(message->queue_name, message->queue_id,
544			  MAIL_QUEUE_DEFERRED, delay);
545    }
546
547    /*
548     * All recipients done. Remove the queue file.
549     */
550    else {
551	if (mail_queue_remove(message->queue_name, message->queue_id)) {
552	    if (errno != ENOENT)
553		msg_fatal("%s: remove %s from %s: %m", myname,
554			  message->queue_id, message->queue_name);
555	    msg_warn("%s: remove %s from %s: %m", myname,
556		     message->queue_id, message->queue_name);
557	} else {
558	    /* Same format as logged by postsuper. */
559	    msg_info("%s: removed", message->queue_id);
560
561#ifdef __APPLE_OS_X_SERVER__
562	    if (POSTFIX_SMTP_DEQUEUE_ENABLED())
563		POSTFIX_SMTP_DEQUEUE(message);
564#endif
565	}
566    }
567
568    /*
569     * Finally, delete the in-core message structure.
570     */
571    qmgr_message_free(message);
572}
573
574/* qmgr_active_drain - drain active queue by allocating a delivery process */
575
576void    qmgr_active_drain(void)
577{
578    QMGR_TRANSPORT *transport;
579
580    /*
581     * Allocate one delivery process for every transport with pending mail.
582     * The process allocation completes asynchronously.
583     */
584    while ((transport = qmgr_transport_select()) != 0) {
585	if (msg_verbose)
586	    msg_info("qmgr_active_drain: allocate %s", transport->name);
587	qmgr_transport_alloc(transport, qmgr_deliver);
588    }
589}
590