1/*	$NetBSD: qmgr_active.c,v 1.3 2020/03/18 19:05:19 christos Exp $	*/
2
3/*++
4/* NAME
5/*	qmgr_active 3
6/* SUMMARY
7/*	active queue management
8/* SYNOPSIS
9/*	#include "qmgr.h"
10/*
/*	int	qmgr_active_feed(scan_info, queue_id)
12/*	QMGR_SCAN *scan_info;
13/*	const char *queue_id;
14/*
15/*	void	qmgr_active_drain()
16/*
/*	void	qmgr_active_done(message)
18/*	QMGR_MESSAGE *message;
19/* DESCRIPTION
20/*	These functions maintain the active message queue: the set
21/*	of messages that the queue manager is actually working on.
22/*	The active queue is limited in size. Messages are drained
23/*	from the active queue by allocating a delivery process and
24/*	by delivering mail via that process.  Messages leak into the
25/*	active queue only when the active queue is small enough.
26/*	Damaged message files are saved to the "corrupt" directory.
27/*
28/*	qmgr_active_feed() inserts the named message file into
29/*	the active queue. Message files with the wrong name or
30/*	with other wrong properties are skipped but not removed.
31/*	The following queue flags are recognized, other flags being
32/*	ignored:
33/* .IP QMGR_SCAN_ALL
34/*	Examine all queue files. Normally, deferred queue files with
35/*	future time stamps are ignored, and incoming queue files with
36/*	future time stamps are frowned upon.
37/* .PP
/*	qmgr_active_drain() allocates one delivery process for every
/*	transport with pending mail. Process allocation is asynchronous.
/*	Once a delivery process is available, an attempt is made to
/*	deliver a message via it. Message delivery is asynchronous, too.
42/*
43/*	qmgr_active_done() deals with a message after delivery
44/*	has been tried for all in-core recipients. If the message
45/*	was bounced, a bounce message is sent to the sender, or
46/*	to the Errors-To: address if one was specified.
47/*	If there are more on-file recipients, a new batch of
48/*	in-core recipients is read from the queue file. Otherwise,
49/*	if a delivery agent marked the queue file as corrupt,
50/*	the queue file is moved to the "corrupt" queue (surprise);
51/*	if at least one delivery failed, the message is moved
52/*	to the deferred queue. The time stamps of a deferred queue
53/*	file are set to the nearest wakeup time of its recipient
54/*	sites (if delivery failed due to a problem with a next-hop
55/*	host), are set into the future by the amount of time the
56/*	message was queued (per-message exponential backoff), or are set
57/*	into the future by a minimal backoff time, whichever is more.
58/*	The minimal_backoff_time parameter specifies the minimal
59/*	amount of time between delivery attempts; maximal_backoff_time
60/*	specifies an upper limit.
61/* DIAGNOSTICS
62/*	Fatal: queue file access failures, out of memory.
63/*	Panic: interface violations, internal consistency errors.
64/*	Warnings: corrupt message file. A corrupt message is saved
65/*	to the "corrupt" queue for further inspection.
66/* LICENSE
67/* .ad
68/* .fi
69/*	The Secure Mailer license must be distributed with this software.
70/* AUTHOR(S)
71/*	Wietse Venema
72/*	IBM T.J. Watson Research
73/*	P.O. Box 704
74/*	Yorktown Heights, NY 10598, USA
75/*
76/*	Wietse Venema
77/*	Google, Inc.
78/*	111 8th Avenue
79/*	New York, NY 10011, USA
80/*--*/
81
82/* System library. */
83
84#include <sys_defs.h>
85#include <sys/stat.h>
86#include <dirent.h>
87#include <stdlib.h>
88#include <unistd.h>
89#include <string.h>
90#include <utime.h>
91#include <errno.h>
92
93#ifndef S_IRWXU				/* What? no POSIX system? */
94#define S_IRWXU 0700
95#endif
96
97/* Utility library. */
98
99#include <msg.h>
100#include <events.h>
101#include <mymalloc.h>
102#include <vstream.h>
103#include <warn_stat.h>
104
105/* Global library. */
106
107#include <mail_params.h>
108#include <mail_open_ok.h>
109#include <mail_queue.h>
110#include <recipient_list.h>
111#include <bounce.h>
112#include <defer.h>
113#include <trace.h>
114#include <abounce.h>
115#include <rec_type.h>
116#include <qmgr_user.h>
117#include <info_log_addr_form.h>
118
119/* Application-specific. */
120
121#include "qmgr.h"
122
123 /*
124  * A bunch of call-back routines.
125  */
126static void qmgr_active_done_2_bounce_flush(int, void *);
127static void qmgr_active_done_2_generic(QMGR_MESSAGE *);
128static void qmgr_active_done_25_trace_flush(int, void *);
129static void qmgr_active_done_25_generic(QMGR_MESSAGE *);
130static void qmgr_active_done_3_defer_flush(int, void *);
131static void qmgr_active_done_3_defer_warn(int, void *);
132static void qmgr_active_done_3_generic(QMGR_MESSAGE *);
133
134/* qmgr_active_corrupt - move corrupted file out of the way */
135
136static void qmgr_active_corrupt(const char *queue_id)
137{
138    const char *myname = "qmgr_active_corrupt";
139
140    if (mail_queue_rename(queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT)) {
141	if (errno != ENOENT)
142	    msg_fatal("%s: save corrupt file queue %s id %s: %m",
143		      myname, MAIL_QUEUE_ACTIVE, queue_id);
144    } else {
145	msg_warn("saving corrupt file \"%s\" from queue \"%s\" to queue \"%s\"",
146		 queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT);
147    }
148}
149
150/* qmgr_active_defer - defer queue file */
151
152static void qmgr_active_defer(const char *queue_name, const char *queue_id,
153			              const char *dest_queue, int delay)
154{
155    const char *myname = "qmgr_active_defer";
156    const char *path;
157    struct utimbuf tbuf;
158
159    if (msg_verbose)
160	msg_info("wakeup %s after %ld secs", queue_id, (long) delay);
161
162    tbuf.actime = tbuf.modtime = event_time() + delay;
163    path = mail_queue_path((VSTRING *) 0, queue_name, queue_id);
164    if (utime(path, &tbuf) < 0 && errno != ENOENT)
165	msg_fatal("%s: update %s time stamps: %m", myname, path);
166    if (mail_queue_rename(queue_id, queue_name, dest_queue)) {
167	if (errno != ENOENT)
168	    msg_fatal("%s: rename %s from %s to %s: %m", myname,
169		      queue_id, queue_name, dest_queue);
170	msg_warn("%s: rename %s from %s to %s: %m", myname,
171		 queue_id, queue_name, dest_queue);
172    } else if (msg_verbose) {
173	msg_info("%s: defer %s", myname, queue_id);
174    }
175}
176
177/* qmgr_active_feed - feed one message into active queue */
178
179int     qmgr_active_feed(QMGR_SCAN *scan_info, const char *queue_id)
180{
181    const char *myname = "qmgr_active_feed";
182    QMGR_MESSAGE *message;
183    struct stat st;
184    const char *path;
185
186    if (strcmp(scan_info->queue, MAIL_QUEUE_ACTIVE) == 0)
187	msg_panic("%s: bad queue %s", myname, scan_info->queue);
188    if (msg_verbose)
189	msg_info("%s: queue %s", myname, scan_info->queue);
190
191    /*
192     * Make sure this is something we are willing to open.
193     */
194    if (mail_open_ok(scan_info->queue, queue_id, &st, &path) == MAIL_OPEN_NO)
195	return (0);
196
197    if (msg_verbose)
198	msg_info("%s: %s", myname, path);
199
200    /*
201     * Skip files that have time stamps into the future. They need to cool
202     * down. Incoming and deferred files can have future time stamps.
203     */
204    if ((scan_info->flags & QMGR_SCAN_ALL) == 0
205	&& st.st_mtime > time((time_t *) 0) + 1) {
206	if (msg_verbose)
207	    msg_info("%s: skip %s (%ld seconds)", myname, queue_id,
208		     (long) (st.st_mtime - event_time()));
209	return (0);
210    }
211
212    /*
213     * Move the message to the active queue. File access errors are fatal.
214     */
215    if (mail_queue_rename(queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE)) {
216	if (errno != ENOENT)
217	    msg_fatal("%s: %s: rename from %s to %s: %m", myname,
218		      queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE);
219	msg_warn("%s: %s: rename from %s to %s: %m", myname,
220		 queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE);
221	return (0);
222    }
223
224    /*
225     * Extract envelope information: sender and recipients. At this point,
226     * mail addresses have been processed by the cleanup service so they
227     * should be in canonical form. Generate requests to deliver this
228     * message.
229     *
230     * Throwing away queue files seems bad, especially when they made it this
231     * far into the mail system. Therefore we save bad files to a separate
232     * directory for further inspection.
233     *
234     * After queue manager restart it is possible that a queue file is still
235     * being delivered. In that case (the file is locked), defer delivery by
236     * a minimal amount of time.
237     */
238#define QMGR_FLUSH_AFTER	(QMGR_FLUSH_EACH | QMGR_FLUSH_DFXP)
239#define MAYBE_FLUSH_AFTER(mode) \
240	(((mode) & MAIL_QUEUE_STAT_UNTHROTTLE) ? QMGR_FLUSH_AFTER : 0)
241#define MAYBE_FORCE_EXPIRE(mode) \
242	(((mode) & MAIL_QUEUE_STAT_EXPIRE) ? QMGR_FORCE_EXPIRE : 0)
243#define MAYBE_UPDATE_MODE(mode) \
244	(((mode) & MAIL_QUEUE_STAT_UNTHROTTLE) ? \
245	(mode) & ~MAIL_QUEUE_STAT_UNTHROTTLE : 0)
246
247    if ((message = qmgr_message_alloc(MAIL_QUEUE_ACTIVE, queue_id,
248				      scan_info->flags
249				      | MAYBE_FLUSH_AFTER(st.st_mode)
250				      | MAYBE_FORCE_EXPIRE(st.st_mode),
251				      MAYBE_UPDATE_MODE(st.st_mode))) == 0) {
252	qmgr_active_corrupt(queue_id);
253	return (0);
254    } else if (message == QMGR_MESSAGE_LOCKED) {
255	qmgr_active_defer(MAIL_QUEUE_ACTIVE, queue_id, MAIL_QUEUE_INCOMING, 60);
256	return (0);
257    } else {
258
259	/*
260	 * Special case if all recipients were already delivered. Send any
261	 * bounces and clean up.
262	 */
263	if (message->refcount == 0)
264	    qmgr_active_done(message);
265	return (1);
266    }
267}
268
269/* qmgr_active_done - dispose of message after recipients have been tried */
270
271void    qmgr_active_done(QMGR_MESSAGE *message)
272{
273    const char *myname = "qmgr_active_done";
274    struct stat st;
275
276    if (msg_verbose)
277	msg_info("%s: %s", myname, message->queue_id);
278
279    /*
280     * During a previous iteration, an attempt to bounce this message may
281     * have failed, so there may still be a bounce log lying around. XXX By
282     * groping around in the bounce queue, we're trespassing on the bounce
283     * service's territory. But doing so is more robust than depending on the
284     * bounce daemon to do the lookup for us, and for us to do the deleting
285     * after we have received a successful status from the bounce service.
286     * The bounce queue directory blocks are most likely in memory anyway. If
287     * these lookups become a performance problem we will have to build an
288     * in-core cache into the bounce daemon.
289     *
290     * Don't bounce when the bounce log is empty. The bounce process obviously
291     * failed, and the delivery agent will have requested that the message be
292     * deferred.
293     *
294     * Bounces are sent asynchronously to avoid stalling while the cleanup
295     * daemon waits for the qmgr to accept the "new mail" trigger.
296     *
297     * See also code in cleanup_bounce.c.
298     */
299    if (stat(mail_queue_path((VSTRING *) 0, MAIL_QUEUE_BOUNCE, message->queue_id), &st) == 0) {
300	if (st.st_size == 0) {
301	    if (mail_queue_remove(MAIL_QUEUE_BOUNCE, message->queue_id))
302		msg_fatal("remove %s %s: %m",
303			  MAIL_QUEUE_BOUNCE, message->queue_id);
304	} else {
305	    if (msg_verbose)
306		msg_info("%s: bounce %s", myname, message->queue_id);
307	    if (message->verp_delims == 0 || var_verp_bounce_off)
308		abounce_flush(BOUNCE_FLAG_KEEP,
309			      message->queue_name,
310			      message->queue_id,
311			      message->encoding,
312			      message->smtputf8,
313			      message->sender,
314			      message->dsn_envid,
315			      message->dsn_ret,
316			      qmgr_active_done_2_bounce_flush,
317			      (void *) message);
318	    else
319		abounce_flush_verp(BOUNCE_FLAG_KEEP,
320				   message->queue_name,
321				   message->queue_id,
322				   message->encoding,
323				   message->smtputf8,
324				   message->sender,
325				   message->dsn_envid,
326				   message->dsn_ret,
327				   message->verp_delims,
328				   qmgr_active_done_2_bounce_flush,
329				   (void *) message);
330	    return;
331	}
332    }
333
334    /*
335     * Asynchronous processing does not reach this point.
336     */
337    qmgr_active_done_2_generic(message);
338}
339
340/* qmgr_active_done_2_bounce_flush - process abounce_flush() status */
341
342static void qmgr_active_done_2_bounce_flush(int status, void *context)
343{
344    QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
345
346    /*
347     * Process abounce_flush() status and continue processing.
348     */
349    message->flags |= status;
350    qmgr_active_done_2_generic(message);
351}
352
353/* qmgr_active_done_2_generic - continue processing */
354
355static void qmgr_active_done_2_generic(QMGR_MESSAGE *message)
356{
357    const char *path;
358    struct stat st;
359
360    /*
361     * A delivery agent marks a queue file as corrupt by changing its
362     * attributes, and by pretending that delivery was deferred.
363     */
364    if (message->flags
365	&& mail_open_ok(MAIL_QUEUE_ACTIVE, message->queue_id, &st, &path) == MAIL_OPEN_NO) {
366	qmgr_active_corrupt(message->queue_id);
367	qmgr_message_free(message);
368	return;
369    }
370
371    /*
372     * If we did not read all recipients from this file, go read some more,
373     * but remember whether some recipients have to be tried again.
374     *
375     * Throwing away queue files seems bad, especially when they made it this
376     * far into the mail system. Therefore we save bad files to a separate
377     * directory for further inspection by a human being.
378     */
379    if (message->rcpt_offset > 0) {
380	if (qmgr_message_realloc(message) == 0) {
381	    qmgr_active_corrupt(message->queue_id);
382	    qmgr_message_free(message);
383	} else {
384	    if (message->refcount == 0)
385		qmgr_active_done(message);	/* recurse for consistency */
386	}
387	return;
388    }
389
390    /*
391     * XXX With multi-recipient mail, some recipients may have NOTIFY=SUCCESS
392     * and others not. Depending on what subset of recipients are delivered,
393     * a trace file may or may not be created. Even when the last partial
394     * delivery attempt had no NOTIFY=SUCCESS recipients, a trace file may
395     * still exist from a previous partial delivery attempt. So as long as
396     * any recipient has NOTIFY=SUCCESS we have to always look for the trace
397     * file and be prepared for the file not to exist.
398     *
399     * See also comments in bounce/bounce_notify_util.c.
400     */
401    if ((message->tflags & (DEL_REQ_FLAG_USR_VRFY | DEL_REQ_FLAG_RECORD
402			    | DEL_REQ_FLAG_REC_DLY_SENT))
403	|| (message->rflags & QMGR_READ_FLAG_NOTIFY_SUCCESS)) {
404	atrace_flush(message->tflags,
405		     message->queue_name,
406		     message->queue_id,
407		     message->encoding,
408		     message->smtputf8,
409		     message->sender,
410		     message->dsn_envid,
411		     message->dsn_ret,
412		     qmgr_active_done_25_trace_flush,
413		     (void *) message);
414	return;
415    }
416
417    /*
418     * Asynchronous processing does not reach this point.
419     */
420    qmgr_active_done_25_generic(message);
421}
422
423/* qmgr_active_done_25_trace_flush - continue after atrace_flush() completion */
424
425static void qmgr_active_done_25_trace_flush(int status, void *context)
426{
427    QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
428
429    /*
430     * Process atrace_flush() status and continue processing.
431     */
432    if (status == 0 && message->tflags_offset)
433	qmgr_message_kill_record(message, message->tflags_offset);
434    message->flags |= status;
435    qmgr_active_done_25_generic(message);
436}
437
438/* qmgr_active_done_25_generic - continue processing */
439
440static void qmgr_active_done_25_generic(QMGR_MESSAGE *message)
441{
442    const char *myname = "qmgr_active_done_25_generic";
443    const char *expire_status = 0;
444
445    /*
446     * If we get to this point we have tried all recipients for this message.
447     * If the message is too old, try to bounce it.
448     *
449     * Bounces are sent asynchronously to avoid stalling while the cleanup
450     * daemon waits for the qmgr to accept the "new mail" trigger.
451     */
452    if (message->flags) {
453	if ((message->qflags & QMGR_FORCE_EXPIRE) != 0) {
454	    expire_status = "force-expired";
455	} else if (event_time() >= message->create_time +
456	     (*message->sender ? var_max_queue_time : var_dsn_queue_time)) {
457	    expire_status = "expired";
458	} else {
459	    expire_status = 0;
460	}
461	if (expire_status != 0) {
462	    msg_info("%s: from=<%s>, status=%s, returned to sender",
463	      message->queue_id, info_log_addr_form_sender(message->sender),
464		     expire_status);
465	    if (message->verp_delims == 0 || var_verp_bounce_off)
466		adefer_flush(BOUNCE_FLAG_KEEP,
467			     message->queue_name,
468			     message->queue_id,
469			     message->encoding,
470			     message->smtputf8,
471			     message->sender,
472			     message->dsn_envid,
473			     message->dsn_ret,
474			     qmgr_active_done_3_defer_flush,
475			     (void *) message);
476	    else
477		adefer_flush_verp(BOUNCE_FLAG_KEEP,
478				  message->queue_name,
479				  message->queue_id,
480				  message->encoding,
481				  message->smtputf8,
482				  message->sender,
483				  message->dsn_envid,
484				  message->dsn_ret,
485				  message->verp_delims,
486				  qmgr_active_done_3_defer_flush,
487				  (void *) message);
488	    return;
489	} else if (message->warn_time > 0
490		   && event_time() >= message->warn_time - 1) {
491	    if (msg_verbose)
492		msg_info("%s: sending defer warning for %s", myname, message->queue_id);
493	    adefer_warn(BOUNCE_FLAG_KEEP,
494			message->queue_name,
495			message->queue_id,
496			message->encoding,
497			message->smtputf8,
498			message->sender,
499			message->dsn_envid,
500			message->dsn_ret,
501			qmgr_active_done_3_defer_warn,
502			(void *) message);
503	    return;
504	}
505    }
506
507    /*
508     * Asynchronous processing does not reach this point.
509     */
510    qmgr_active_done_3_generic(message);
511}
512
513/* qmgr_active_done_3_defer_warn - continue after adefer_warn() completion */
514
515static void qmgr_active_done_3_defer_warn(int status, void *context)
516{
517    QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
518
519    /*
520     * Process adefer_warn() completion status and continue processing.
521     */
522    if (status == 0)
523	qmgr_message_update_warn(message);
524    qmgr_active_done_3_generic(message);
525}
526
527/* qmgr_active_done_3_defer_flush - continue after adefer_flush() completion */
528
529static void qmgr_active_done_3_defer_flush(int status, void *context)
530{
531    QMGR_MESSAGE *message = (QMGR_MESSAGE *) context;
532
533    /*
534     * Process adefer_flush() status and continue processing.
535     */
536    message->flags = status;
537    qmgr_active_done_3_generic(message);
538}
539
540/* qmgr_active_done_3_generic - continue processing */
541
542static void qmgr_active_done_3_generic(QMGR_MESSAGE *message)
543{
544    const char *myname = "qmgr_active_done_3_generic";
545    int     delay;
546
547    /*
548     * Some recipients need to be tried again. Move the queue file time
549     * stamps into the future by the amount of time that the message is
550     * delayed, and move the message to the deferred queue. Impose minimal
551     * and maximal backoff times.
552     *
553     * Since we look at actual time in queue, not time since last delivery
554     * attempt, backoff times will be distributed. However, we can still see
555     * spikes in delivery activity because the interval between deferred
556     * queue scans is finite.
557     */
558    if (message->flags) {
559	if (message->create_time > 0) {
560	    delay = event_time() - message->create_time;
561	    if (delay > var_max_backoff_time)
562		delay = var_max_backoff_time;
563	    if (delay < var_min_backoff_time)
564		delay = var_min_backoff_time;
565	} else {
566	    delay = var_min_backoff_time;
567	}
568	qmgr_active_defer(message->queue_name, message->queue_id,
569			  MAIL_QUEUE_DEFERRED, delay);
570    }
571
572    /*
573     * All recipients done. Remove the queue file.
574     */
575    else {
576	if (mail_queue_remove(message->queue_name, message->queue_id)) {
577	    if (errno != ENOENT)
578		msg_fatal("%s: remove %s from %s: %m", myname,
579			  message->queue_id, message->queue_name);
580	    msg_warn("%s: remove %s from %s: %m", myname,
581		     message->queue_id, message->queue_name);
582	} else {
583	    /* Same format as logged by postsuper. */
584	    msg_info("%s: removed", message->queue_id);
585	}
586    }
587
588    /*
589     * Finally, delete the in-core message structure.
590     */
591    qmgr_message_free(message);
592}
593
594/* qmgr_active_drain - drain active queue by allocating a delivery process */
595
596void    qmgr_active_drain(void)
597{
598    QMGR_TRANSPORT *transport;
599
600    /*
601     * Allocate one delivery process for every transport with pending mail.
602     * The process allocation completes asynchronously.
603     */
604    while ((transport = qmgr_transport_select()) != 0) {
605	if (msg_verbose)
606	    msg_info("qmgr_active_drain: allocate %s", transport->name);
607	qmgr_transport_alloc(transport, qmgr_deliver);
608    }
609}
610