1/*++
2/* NAME
3/*	qmgr_active 3
4/* SUMMARY
5/*	active queue management
6/* SYNOPSIS
7/*	#include "qmgr.h"
8/*
/*	int	qmgr_active_feed(scan_info, queue_id)
10/*	QMGR_SCAN *scan_info;
11/*	const char *queue_id;
12/*
13/*	void	qmgr_active_drain()
14/*
/*	void	qmgr_active_done(message)
16/*	QMGR_MESSAGE *message;
17/* DESCRIPTION
18/*	These functions maintain the active message queue: the set
19/*	of messages that the queue manager is actually working on.
20/*	The active queue is limited in size. Messages are drained
21/*	from the active queue by allocating a delivery process and
22/*	by delivering mail via that process.  Messages leak into the
23/*	active queue only when the active queue is small enough.
24/*	Damaged message files are saved to the "corrupt" directory.
25/*
26/*	qmgr_active_feed() inserts the named message file into
27/*	the active queue. Message files with the wrong name or
28/*	with other wrong properties are skipped but not removed.
29/*	The following queue flags are recognized, other flags being
30/*	ignored:
31/* .IP QMGR_SCAN_ALL
32/*	Examine all queue files. Normally, deferred queue files with
33/*	future time stamps are ignored, and incoming queue files with
34/*	future time stamps are frowned upon.
35/* .PP
/*	qmgr_active_drain() allocates one delivery process for each
/*	transport that is selected as having pending mail.
37/*	Process allocation is asynchronous. Once the delivery
38/*	process is available, an attempt is made to deliver
39/*	a message via it. Message delivery is asynchronous, too.
40/*
41/*	qmgr_active_done() deals with a message after delivery
42/*	has been tried for all in-core recipients. If the message
43/*	was bounced, a bounce message is sent to the sender, or
44/*	to the Errors-To: address if one was specified.
45/*	If there are more on-file recipients, a new batch of
46/*	in-core recipients is read from the queue file. Otherwise,
47/*	if a delivery agent marked the queue file as corrupt,
48/*	the queue file is moved to the "corrupt" queue (surprise);
49/*	if at least one delivery failed, the message is moved
50/*	to the deferred queue. The time stamps of a deferred queue
51/*	file are set to the nearest wakeup time of its recipient
52/*	sites (if delivery failed due to a problem with a next-hop
53/*	host), are set into the future by the amount of time the
54/*	message was queued (per-message exponential backoff), or are set
55/*	into the future by a minimal backoff time, whichever is more.
56/*	The minimal_backoff_time parameter specifies the minimal
57/*	amount of time between delivery attempts; maximal_backoff_time
58/*	specifies an upper limit.
59/* DIAGNOSTICS
60/*	Fatal: queue file access failures, out of memory.
61/*	Panic: interface violations, internal consistency errors.
62/*	Warnings: corrupt message file. A corrupt message is saved
63/*	to the "corrupt" queue for further inspection.
64/* LICENSE
65/* .ad
66/* .fi
67/*	The Secure Mailer license must be distributed with this software.
68/* AUTHOR(S)
69/*	Wietse Venema
70/*	IBM T.J. Watson Research
71/*	P.O. Box 704
72/*	Yorktown Heights, NY 10598, USA
73/*--*/
74
75/* System library. */
76
77#include <sys_defs.h>
78#include <sys/stat.h>
79#include <dirent.h>
80#include <stdlib.h>
81#include <unistd.h>
82#include <string.h>
83#include <utime.h>
84#include <errno.h>
85
86#ifndef S_IRWXU				/* What? no POSIX system? */
87#define S_IRWXU 0700
88#endif
89
90/* Utility library. */
91
92#include <msg.h>
93#include <events.h>
94#include <mymalloc.h>
95#include <vstream.h>
96#include <warn_stat.h>
97
98/* Global library. */
99
100#include <mail_params.h>
101#include <mail_open_ok.h>
102#include <mail_queue.h>
103#include <recipient_list.h>
104#include <bounce.h>
105#include <defer.h>
106#include <trace.h>
107#include <abounce.h>
108#include <rec_type.h>
109#include <qmgr_user.h>
110
111/* Application-specific. */
112
113#include "qmgr.h"
114
 /*
  * A bunch of call-back routines.
  * 
  * These are the later stages of qmgr_active_done() processing. The *_flush
  * and *_warn routines are completion call-backs that are handed to the
  * asynchronous abounce/atrace/adefer requests; each one records the request
  * status and then continues with the *_generic routine of the same stage.
  * The *_generic routines are also called directly when a stage needs no
  * asynchronous request.
  */
static void qmgr_active_done_2_bounce_flush(int, char *);
static void qmgr_active_done_2_generic(QMGR_MESSAGE *);
static void qmgr_active_done_25_trace_flush(int, char *);
static void qmgr_active_done_25_generic(QMGR_MESSAGE *);
static void qmgr_active_done_3_defer_flush(int, char *);
static void qmgr_active_done_3_defer_warn(int, char *);
static void qmgr_active_done_3_generic(QMGR_MESSAGE *);
125
126/* qmgr_active_corrupt - move corrupted file out of the way */
127
128static void qmgr_active_corrupt(const char *queue_id)
129{
130    const char *myname = "qmgr_active_corrupt";
131
132    if (mail_queue_rename(queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT)) {
133	if (errno != ENOENT)
134	    msg_fatal("%s: save corrupt file queue %s id %s: %m",
135		      myname, MAIL_QUEUE_ACTIVE, queue_id);
136    } else {
137	msg_warn("saving corrupt file \"%s\" from queue \"%s\" to queue \"%s\"",
138		 queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT);
139    }
140}
141
142/* qmgr_active_defer - defer queue file */
143
144static void qmgr_active_defer(const char *queue_name, const char *queue_id,
145			              const char *dest_queue, int delay)
146{
147    const char *myname = "qmgr_active_defer";
148    const char *path;
149    struct utimbuf tbuf;
150
151    if (msg_verbose)
152	msg_info("wakeup %s after %ld secs", queue_id, (long) delay);
153
154    tbuf.actime = tbuf.modtime = event_time() + delay;
155    path = mail_queue_path((VSTRING *) 0, queue_name, queue_id);
156    if (utime(path, &tbuf) < 0 && errno != ENOENT)
157	msg_fatal("%s: update %s time stamps: %m", myname, path);
158    if (mail_queue_rename(queue_id, queue_name, dest_queue)) {
159	if (errno != ENOENT)
160	    msg_fatal("%s: rename %s from %s to %s: %m", myname,
161		      queue_id, queue_name, dest_queue);
162	msg_warn("%s: rename %s from %s to %s: %m", myname,
163		 queue_id, queue_name, dest_queue);
164    } else if (msg_verbose) {
165	msg_info("%s: defer %s", myname, queue_id);
166    }
167}
168
169/* qmgr_active_feed - feed one message into active queue */
170
171int     qmgr_active_feed(QMGR_SCAN *scan_info, const char *queue_id)
172{
173    const char *myname = "qmgr_active_feed";
174    QMGR_MESSAGE *message;
175    struct stat st;
176    const char *path;
177
178    if (strcmp(scan_info->queue, MAIL_QUEUE_ACTIVE) == 0)
179	msg_panic("%s: bad queue %s", myname, scan_info->queue);
180    if (msg_verbose)
181	msg_info("%s: queue %s", myname, scan_info->queue);
182
183    /*
184     * Make sure this is something we are willing to open.
185     */
186    if (mail_open_ok(scan_info->queue, queue_id, &st, &path) == MAIL_OPEN_NO)
187	return (0);
188
189    if (msg_verbose)
190	msg_info("%s: %s", myname, path);
191
192    /*
193     * Skip files that have time stamps into the future. They need to cool
194     * down. Incoming and deferred files can have future time stamps.
195     */
196    if ((scan_info->flags & QMGR_SCAN_ALL) == 0
197	&& st.st_mtime > time((time_t *) 0) + 1) {
198	if (msg_verbose)
199	    msg_info("%s: skip %s (%ld seconds)", myname, queue_id,
200		     (long) (st.st_mtime - event_time()));
201	return (0);
202    }
203
204    /*
205     * Move the message to the active queue. File access errors are fatal.
206     */
207    if (mail_queue_rename(queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE)) {
208	if (errno != ENOENT)
209	    msg_fatal("%s: %s: rename from %s to %s: %m", myname,
210		      queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE);
211	msg_warn("%s: %s: rename from %s to %s: %m", myname,
212		 queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE);
213	return (0);
214    }
215
216    /*
217     * Extract envelope information: sender and recipients. At this point,
218     * mail addresses have been processed by the cleanup service so they
219     * should be in canonical form. Generate requests to deliver this
220     * message.
221     *
222     * Throwing away queue files seems bad, especially when they made it this
223     * far into the mail system. Therefore we save bad files to a separate
224     * directory for further inspection.
225     *
226     * After queue manager restart it is possible that a queue file is still
227     * being delivered. In that case (the file is locked), defer delivery by
228     * a minimal amount of time.
229     */
230#define QMGR_FLUSH_AFTER	(QMGR_FLUSH_EACH | QMGR_FLUSH_DFXP)
231
232    if ((message = qmgr_message_alloc(MAIL_QUEUE_ACTIVE, queue_id,
233				 (st.st_mode & MAIL_QUEUE_STAT_UNTHROTTLE) ?
234				      scan_info->flags | QMGR_FLUSH_AFTER :
235				      scan_info->flags,
236				 (st.st_mode & MAIL_QUEUE_STAT_UNTHROTTLE) ?
237				  st.st_mode & ~MAIL_QUEUE_STAT_UNTHROTTLE :
238				      0)) == 0) {
239	qmgr_active_corrupt(queue_id);
240	return (0);
241    } else if (message == QMGR_MESSAGE_LOCKED) {
242	qmgr_active_defer(MAIL_QUEUE_ACTIVE, queue_id, MAIL_QUEUE_INCOMING, 60);
243	return (0);
244    } else {
245
246	/*
247	 * Special case if all recipients were already delivered. Send any
248	 * bounces and clean up.
249	 */
250	if (message->refcount == 0)
251	    qmgr_active_done(message);
252	return (1);
253    }
254}
255
256/* qmgr_active_done - dispose of message after recipients have been tried */
257
258void    qmgr_active_done(QMGR_MESSAGE *message)
259{
260    const char *myname = "qmgr_active_done";
261    struct stat st;
262
263    if (msg_verbose)
264	msg_info("%s: %s", myname, message->queue_id);
265
266    /*
267     * During a previous iteration, an attempt to bounce this message may
268     * have failed, so there may still be a bounce log lying around. XXX By
269     * groping around in the bounce queue, we're trespassing on the bounce
270     * service's territory. But doing so is more robust than depending on the
271     * bounce daemon to do the lookup for us, and for us to do the deleting
272     * after we have received a successful status from the bounce service.
273     * The bounce queue directory blocks are most likely in memory anyway. If
274     * these lookups become a performance problem we will have to build an
275     * in-core cache into the bounce daemon.
276     *
277     * Don't bounce when the bounce log is empty. The bounce process obviously
278     * failed, and the delivery agent will have requested that the message be
279     * deferred.
280     *
281     * Bounces are sent asynchronously to avoid stalling while the cleanup
282     * daemon waits for the qmgr to accept the "new mail" trigger.
283     *
284     * See also code in cleanup_bounce.c.
285     */
286    if (stat(mail_queue_path((VSTRING *) 0, MAIL_QUEUE_BOUNCE, message->queue_id), &st) == 0) {
287	if (st.st_size == 0) {
288	    if (mail_queue_remove(MAIL_QUEUE_BOUNCE, message->queue_id))
289		msg_fatal("remove %s %s: %m",
290			  MAIL_QUEUE_BOUNCE, message->queue_id);
291	} else {
292	    if (msg_verbose)
293		msg_info("%s: bounce %s", myname, message->queue_id);
294	    if (message->verp_delims == 0 || var_verp_bounce_off)
295		abounce_flush(BOUNCE_FLAG_KEEP,
296			      message->queue_name,
297			      message->queue_id,
298			      message->encoding,
299			      message->sender,
300			      message->dsn_envid,
301			      message->dsn_ret,
302			      qmgr_active_done_2_bounce_flush,
303			      (char *) message);
304	    else
305		abounce_flush_verp(BOUNCE_FLAG_KEEP,
306				   message->queue_name,
307				   message->queue_id,
308				   message->encoding,
309				   message->sender,
310				   message->dsn_envid,
311				   message->dsn_ret,
312				   message->verp_delims,
313				   qmgr_active_done_2_bounce_flush,
314				   (char *) message);
315	    return;
316	}
317    }
318
319    /*
320     * Asynchronous processing does not reach this point.
321     */
322    qmgr_active_done_2_generic(message);
323}
324
325/* qmgr_active_done_2_bounce_flush - process abounce_flush() status */
326
327static void qmgr_active_done_2_bounce_flush(int status, char *context)
328{
329    QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
330
331    /*
332     * Process abounce_flush() status and continue processing.
333     */
334    message->flags |= status;
335    qmgr_active_done_2_generic(message);
336}
337
338/* qmgr_active_done_2_generic - continue processing */
339
340static void qmgr_active_done_2_generic(QMGR_MESSAGE *message)
341{
342    const char *path;
343    struct stat st;
344
345    /*
346     * A delivery agent marks a queue file as corrupt by changing its
347     * attributes, and by pretending that delivery was deferred.
348     */
349    if (message->flags
350	&& mail_open_ok(MAIL_QUEUE_ACTIVE, message->queue_id, &st, &path) == MAIL_OPEN_NO) {
351	qmgr_active_corrupt(message->queue_id);
352	qmgr_message_free(message);
353	return;
354    }
355
356    /*
357     * If we did not read all recipients from this file, go read some more,
358     * but remember whether some recipients have to be tried again.
359     *
360     * Throwing away queue files seems bad, especially when they made it this
361     * far into the mail system. Therefore we save bad files to a separate
362     * directory for further inspection by a human being.
363     */
364    if (message->rcpt_offset > 0) {
365	if (qmgr_message_realloc(message) == 0) {
366	    qmgr_active_corrupt(message->queue_id);
367	    qmgr_message_free(message);
368	} else {
369	    if (message->refcount == 0)
370		qmgr_active_done(message);	/* recurse for consistency */
371	}
372	return;
373    }
374
375    /*
376     * XXX With multi-recipient mail, some recipients may have NOTIFY=SUCCESS
377     * and others not. Depending on what subset of recipients are delivered,
378     * a trace file may or may not be created. Even when the last partial
379     * delivery attempt had no NOTIFY=SUCCESS recipients, a trace file may
380     * still exist from a previous partial delivery attempt. So as long as
381     * any recipient has NOTIFY=SUCCESS we have to always look for the trace
382     * file and be prepared for the file not to exist.
383     *
384     * See also comments in bounce/bounce_notify_util.c.
385     */
386    if ((message->tflags & (DEL_REQ_FLAG_USR_VRFY | DEL_REQ_FLAG_RECORD))
387	|| (message->rflags & QMGR_READ_FLAG_NOTIFY_SUCCESS)) {
388	atrace_flush(message->tflags,
389		     message->queue_name,
390		     message->queue_id,
391		     message->encoding,
392		     message->sender,
393		     message->dsn_envid,
394		     message->dsn_ret,
395		     qmgr_active_done_25_trace_flush,
396		     (char *) message);
397	return;
398    }
399
400    /*
401     * Asynchronous processing does not reach this point.
402     */
403    qmgr_active_done_25_generic(message);
404}
405
406/* qmgr_active_done_25_trace_flush - continue after atrace_flush() completion */
407
408static void qmgr_active_done_25_trace_flush(int status, char *context)
409{
410    QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
411
412    /*
413     * Process atrace_flush() status and continue processing.
414     */
415    if (status == 0 && message->tflags_offset)
416	qmgr_message_kill_record(message, message->tflags_offset);
417    message->flags |= status;
418    qmgr_active_done_25_generic(message);
419}
420
421/* qmgr_active_done_25_generic - continue processing */
422
423static void qmgr_active_done_25_generic(QMGR_MESSAGE *message)
424{
425    const char *myname = "qmgr_active_done_25_generic";
426
427    /*
428     * If we get to this point we have tried all recipients for this message.
429     * If the message is too old, try to bounce it.
430     *
431     * Bounces are sent asynchronously to avoid stalling while the cleanup
432     * daemon waits for the qmgr to accept the "new mail" trigger.
433     */
434    if (message->flags) {
435	if (event_time() >= message->create_time +
436	    (*message->sender ? var_max_queue_time : var_dsn_queue_time)) {
437	    msg_info("%s: from=<%s>, status=expired, returned to sender",
438		     message->queue_id, message->sender);
439	    if (message->verp_delims == 0 || var_verp_bounce_off)
440		adefer_flush(BOUNCE_FLAG_KEEP,
441			     message->queue_name,
442			     message->queue_id,
443			     message->encoding,
444			     message->sender,
445			     message->dsn_envid,
446			     message->dsn_ret,
447			     qmgr_active_done_3_defer_flush,
448			     (char *) message);
449	    else
450		adefer_flush_verp(BOUNCE_FLAG_KEEP,
451				  message->queue_name,
452				  message->queue_id,
453				  message->encoding,
454				  message->sender,
455				  message->dsn_envid,
456				  message->dsn_ret,
457				  message->verp_delims,
458				  qmgr_active_done_3_defer_flush,
459				  (char *) message);
460	    return;
461	} else if (message->warn_time > 0
462		   && event_time() >= message->warn_time - 1) {
463	    if (msg_verbose)
464		msg_info("%s: sending defer warning for %s", myname, message->queue_id);
465	    adefer_warn(BOUNCE_FLAG_KEEP,
466			message->queue_name,
467			message->queue_id,
468			message->encoding,
469			message->sender,
470			message->dsn_envid,
471			message->dsn_ret,
472			qmgr_active_done_3_defer_warn,
473			(char *) message);
474	    return;
475	}
476    }
477
478    /*
479     * Asynchronous processing does not reach this point.
480     */
481    qmgr_active_done_3_generic(message);
482}
483
484/* qmgr_active_done_3_defer_warn - continue after adefer_warn() completion */
485
486static void qmgr_active_done_3_defer_warn(int status, char *context)
487{
488    QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
489
490    /*
491     * Process adefer_warn() completion status and continue processing.
492     */
493    if (status == 0)
494	qmgr_message_update_warn(message);
495    qmgr_active_done_3_generic(message);
496}
497
498/* qmgr_active_done_3_defer_flush - continue after adefer_flush() completion */
499
500static void qmgr_active_done_3_defer_flush(int status, char *context)
501{
502    QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
503
504    /*
505     * Process adefer_flush() status and continue processing.
506     */
507    message->flags = status;
508    qmgr_active_done_3_generic(message);
509}
510
511/* qmgr_active_done_3_generic - continue processing */
512
513static void qmgr_active_done_3_generic(QMGR_MESSAGE *message)
514{
515    const char *myname = "qmgr_active_done_3_generic";
516    int     delay;
517
518    /*
519     * Some recipients need to be tried again. Move the queue file time
520     * stamps into the future by the amount of time that the message is
521     * delayed, and move the message to the deferred queue. Impose minimal
522     * and maximal backoff times.
523     *
524     * Since we look at actual time in queue, not time since last delivery
525     * attempt, backoff times will be distributed. However, we can still see
526     * spikes in delivery activity because the interval between deferred
527     * queue scans is finite.
528     */
529    if (message->flags) {
530	if (message->create_time > 0) {
531	    delay = event_time() - message->create_time;
532	    if (delay > var_max_backoff_time)
533		delay = var_max_backoff_time;
534	    if (delay < var_min_backoff_time)
535		delay = var_min_backoff_time;
536	} else {
537	    delay = var_min_backoff_time;
538	}
539	qmgr_active_defer(message->queue_name, message->queue_id,
540			  MAIL_QUEUE_DEFERRED, delay);
541    }
542
543    /*
544     * All recipients done. Remove the queue file.
545     */
546    else {
547	if (mail_queue_remove(message->queue_name, message->queue_id)) {
548	    if (errno != ENOENT)
549		msg_fatal("%s: remove %s from %s: %m", myname,
550			  message->queue_id, message->queue_name);
551	    msg_warn("%s: remove %s from %s: %m", myname,
552		     message->queue_id, message->queue_name);
553	} else {
554	    /* Same format as logged by postsuper. */
555	    msg_info("%s: removed", message->queue_id);
556	}
557    }
558
559    /*
560     * Finally, delete the in-core message structure.
561     */
562    qmgr_message_free(message);
563}
564
565/* qmgr_active_drain - drain active queue by allocating a delivery process */
566
567void    qmgr_active_drain(void)
568{
569    QMGR_TRANSPORT *transport;
570
571    /*
572     * Allocate one delivery process for every transport with pending mail.
573     * The process allocation completes asynchronously.
574     */
575    while ((transport = qmgr_transport_select()) != 0) {
576	if (msg_verbose)
577	    msg_info("qmgr_active_drain: allocate %s", transport->name);
578	qmgr_transport_alloc(transport, qmgr_deliver);
579    }
580}
581