1/*++
2/* NAME
3/*	qmgr_deliver 3
4/* SUMMARY
5/*	deliver one per-site queue entry to that site
6/* SYNOPSIS
7/*	#include "qmgr.h"
8/*
9/*	int	qmgr_deliver_concurrency;
10/*
11/*	int	qmgr_deliver(transport, fp)
12/*	QMGR_TRANSPORT *transport;
13/*	VSTREAM	*fp;
14/* DESCRIPTION
15/*	This module implements the client side of the `queue manager
16/*	to delivery agent' protocol. The queue manager uses
17/*	asynchronous I/O so that it can drive multiple delivery
18/*	agents in parallel. Depending on the outcome of a delivery
19/*	attempt, the status of messages, queues and transports is
20/*	updated.
21/*
22/*	qmgr_deliver_concurrency is a global counter that says how
23/*	many delivery processes are in use. This can be used, for
24/*	example, to control the size of the `active' message queue.
25/*
26/*	qmgr_deliver() executes when a delivery process announces its
27/*	availability for the named transport. It arranges for delivery
28/*	of a suitable queue entry.  The \fIfp\fR argument specifies a
29/*	stream that is connected to a delivery process, or a null
30/*	pointer if the transport accepts no connection. Upon completion
31/*	of delivery (successful or not), the stream is closed, so that the
32/*	delivery process is released.
33/* DIAGNOSTICS
34/* LICENSE
35/* .ad
36/* .fi
37/*	The Secure Mailer license must be distributed with this software.
38/* AUTHOR(S)
39/*	Wietse Venema
40/*	IBM T.J. Watson Research
41/*	P.O. Box 704
42/*	Yorktown Heights, NY 10598, USA
43/*
44/*	Preemptive scheduler enhancements:
45/*	Patrik Rak
46/*	Modra 6
47/*	155 00, Prague, Czech Republic
48/*--*/
49
50/* System library. */
51
52#include <sys_defs.h>
53#include <time.h>
54#include <string.h>
55
56/* Utility library. */
57
58#include <msg.h>
59#include <vstring.h>
60#include <vstream.h>
61#include <vstring_vstream.h>
62#include <events.h>
63#include <iostuff.h>
64#include <stringops.h>
65#include <mymalloc.h>
66
67/* Global library. */
68
69#include <mail_queue.h>
70#include <mail_proto.h>
71#include <recipient_list.h>
72#include <mail_params.h>
73#include <deliver_request.h>
74#include <verp_sender.h>
75#include <dsn_util.h>
76#include <dsn_buf.h>
77#include <dsb_scan.h>
78#include <rcpt_print.h>
79
80/* Application-specific. */
81
82#include "qmgr.h"
83
84int     qmgr_deliver_concurrency;
85
86 /*
87  * Message delivery status codes.
88  */
89#define DELIVER_STAT_OK		0	/* all recipients delivered */
90#define DELIVER_STAT_DEFER	1	/* try some recipients later */
91#define DELIVER_STAT_CRASH	2	/* mailer internal problem */
92
93/* qmgr_deliver_initial_reply - retrieve initial delivery process response */
94
95static int qmgr_deliver_initial_reply(VSTREAM *stream)
96{
97    int     stat;
98
99    if (peekfd(vstream_fileno(stream)) < 0) {
100	msg_warn("%s: premature disconnect", VSTREAM_PATH(stream));
101	return (DELIVER_STAT_CRASH);
102    } else if (attr_scan(stream, ATTR_FLAG_STRICT,
103			 ATTR_TYPE_INT, MAIL_ATTR_STATUS, &stat,
104			 ATTR_TYPE_END) != 1) {
105	msg_warn("%s: malformed response", VSTREAM_PATH(stream));
106	return (DELIVER_STAT_CRASH);
107    } else {
108	return (stat ? DELIVER_STAT_DEFER : 0);
109    }
110}
111
112/* qmgr_deliver_final_reply - retrieve final delivery process response */
113
114static int qmgr_deliver_final_reply(VSTREAM *stream, DSN_BUF *dsb)
115{
116    int     stat;
117
118    if (peekfd(vstream_fileno(stream)) < 0) {
119	msg_warn("%s: premature disconnect", VSTREAM_PATH(stream));
120	return (DELIVER_STAT_CRASH);
121    } else if (attr_scan(stream, ATTR_FLAG_STRICT,
122			 ATTR_TYPE_FUNC, dsb_scan, (void *) dsb,
123			 ATTR_TYPE_INT, MAIL_ATTR_STATUS, &stat,
124			 ATTR_TYPE_END) != 2) {
125	msg_warn("%s: malformed response", VSTREAM_PATH(stream));
126	return (DELIVER_STAT_CRASH);
127    } else {
128	return (stat ? DELIVER_STAT_DEFER : 0);
129    }
130}
131
132/* qmgr_deliver_send_request - send delivery request to delivery process */
133
134static int qmgr_deliver_send_request(QMGR_ENTRY *entry, VSTREAM *stream)
135{
136    RECIPIENT_LIST list = entry->rcpt_list;
137    RECIPIENT *recipient;
138    QMGR_MESSAGE *message = entry->message;
139    VSTRING *sender_buf = 0;
140    MSG_STATS stats;
141    char   *sender;
142    int     flags;
143
144    /*
145     * If variable envelope return path is requested, change prefix+@origin
146     * into prefix+user=domain@origin. Note that with VERP there is only one
147     * recipient per delivery.
148     */
149    if (message->verp_delims == 0) {
150	sender = message->sender;
151    } else {
152	sender_buf = vstring_alloc(100);
153	verp_sender(sender_buf, message->verp_delims,
154		    message->sender, list.info);
155	sender = vstring_str(sender_buf);
156    }
157
158    flags = message->tflags
159	| entry->queue->dflags
160	| (message->inspect_xport ? DEL_REQ_FLAG_BOUNCE : DEL_REQ_FLAG_DEFLT);
161    (void) QMGR_MSG_STATS(&stats, message);
162    attr_print(stream, ATTR_FLAG_NONE,
163	       ATTR_TYPE_INT, MAIL_ATTR_FLAGS, flags,
164	       ATTR_TYPE_STR, MAIL_ATTR_QUEUE, message->queue_name,
165	       ATTR_TYPE_STR, MAIL_ATTR_QUEUEID, message->queue_id,
166	       ATTR_TYPE_LONG, MAIL_ATTR_OFFSET, message->data_offset,
167	       ATTR_TYPE_LONG, MAIL_ATTR_SIZE, message->cont_length,
168	       ATTR_TYPE_STR, MAIL_ATTR_NEXTHOP, entry->queue->nexthop,
169	       ATTR_TYPE_STR, MAIL_ATTR_ENCODING, message->encoding,
170	       ATTR_TYPE_STR, MAIL_ATTR_SENDER, sender,
171	       ATTR_TYPE_STR, MAIL_ATTR_DSN_ENVID, message->dsn_envid,
172	       ATTR_TYPE_INT, MAIL_ATTR_DSN_RET, message->dsn_ret,
173	       ATTR_TYPE_FUNC, msg_stats_print, (void *) &stats,
174    /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */
175	     ATTR_TYPE_STR, MAIL_ATTR_LOG_CLIENT_NAME, message->client_name,
176	     ATTR_TYPE_STR, MAIL_ATTR_LOG_CLIENT_ADDR, message->client_addr,
177	     ATTR_TYPE_STR, MAIL_ATTR_LOG_CLIENT_PORT, message->client_port,
178	     ATTR_TYPE_STR, MAIL_ATTR_LOG_PROTO_NAME, message->client_proto,
179	       ATTR_TYPE_STR, MAIL_ATTR_LOG_HELO_NAME, message->client_helo,
180    /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */
181	       ATTR_TYPE_STR, MAIL_ATTR_SASL_METHOD, message->sasl_method,
182	     ATTR_TYPE_STR, MAIL_ATTR_SASL_USERNAME, message->sasl_username,
183	       ATTR_TYPE_STR, MAIL_ATTR_SASL_SENDER, message->sasl_sender,
184    /* XXX Ditto if we want to pass TLS certificate info. */
185	       ATTR_TYPE_STR, MAIL_ATTR_LOG_IDENT, message->log_ident,
186	     ATTR_TYPE_STR, MAIL_ATTR_RWR_CONTEXT, message->rewrite_context,
187	       ATTR_TYPE_INT, MAIL_ATTR_RCPT_COUNT, list.len,
188	       ATTR_TYPE_END);
189    if (sender_buf != 0)
190	vstring_free(sender_buf);
191    for (recipient = list.info; recipient < list.info + list.len; recipient++)
192	attr_print(stream, ATTR_FLAG_NONE,
193		   ATTR_TYPE_FUNC, rcpt_print, (void *) recipient,
194		   ATTR_TYPE_END);
195    if (vstream_fflush(stream) != 0) {
196	msg_warn("write to process (%s): %m", entry->queue->transport->name);
197	return (-1);
198    } else {
199	if (msg_verbose)
200	    msg_info("qmgr_deliver: site `%s'", entry->queue->name);
201	return (0);
202    }
203}
204
205/* qmgr_deliver_abort - transport response watchdog */
206
207static void qmgr_deliver_abort(int unused_event, char *context)
208{
209    QMGR_ENTRY *entry = (QMGR_ENTRY *) context;
210    QMGR_QUEUE *queue = entry->queue;
211    QMGR_TRANSPORT *transport = queue->transport;
212    QMGR_MESSAGE *message = entry->message;
213
214    msg_fatal("%s: timeout receiving delivery status from transport: %s",
215	      message->queue_id, transport->name);
216}
217
218/* qmgr_deliver_update - process delivery status report */
219
220static void qmgr_deliver_update(int unused_event, char *context)
221{
222    QMGR_ENTRY *entry = (QMGR_ENTRY *) context;
223    QMGR_QUEUE *queue = entry->queue;
224    QMGR_TRANSPORT *transport = queue->transport;
225    QMGR_MESSAGE *message = entry->message;
226    static DSN_BUF *dsb;
227    int     status;
228
229    /*
230     * Release the delivery agent from a "hot" queue entry.
231     */
232#define QMGR_DELIVER_RELEASE_AGENT(entry) do { \
233	event_disable_readwrite(vstream_fileno(entry->stream)); \
234	(void) vstream_fclose(entry->stream); \
235	entry->stream = 0; \
236	qmgr_deliver_concurrency--; \
237    } while (0)
238
239    if (dsb == 0)
240	dsb = dsb_create();
241
242    /*
243     * The message transport has responded. Stop the watchdog timer.
244     */
245    event_cancel_timer(qmgr_deliver_abort, context);
246
247    /*
248     * Retrieve the delivery agent status report. The numerical status code
249     * indicates if delivery should be tried again. The reason text is sent
250     * only when a site should be avoided for a while, so that the queue
251     * manager can log why it does not even try to schedule delivery to the
252     * affected recipients.
253     */
254    status = qmgr_deliver_final_reply(entry->stream, dsb);
255
256    /*
257     * The mail delivery process failed for some reason (although delivery
258     * may have been successful). Back off with this transport type for a
259     * while. Dispose of queue entries for this transport that await
260     * selection (the todo lists). Stay away from queue entries that have
261     * been selected (the busy lists), or we would have dangling pointers.
262     * The queue itself won't go away before we dispose of the current queue
263     * entry.
264     */
265    if (status == DELIVER_STAT_CRASH) {
266	message->flags |= DELIVER_STAT_DEFER;
267#if 0
268	whatsup = concatenate("unknown ", transport->name,
269			      " mail transport error", (char *) 0);
270	qmgr_transport_throttle(transport,
271				DSN_SIMPLE(&dsb->dsn, "4.3.0", whatsup));
272	myfree(whatsup);
273#else
274	qmgr_transport_throttle(transport,
275				DSN_SIMPLE(&dsb->dsn, "4.3.0",
276					   "unknown mail transport error"));
277#endif
278	msg_warn("transport %s failure -- see a previous warning/fatal/panic logfile record for the problem description",
279		 transport->name);
280
281	/*
282	 * Assume the worst and write a defer logfile record for each
283	 * recipient. This omission was already present in the first queue
284	 * manager implementation of 199703, and was fixed 200511.
285	 *
286	 * To avoid the synchronous qmgr_defer_recipient() operation for each
287	 * recipient of this queue entry, release the delivery process and
288	 * move the entry back to the todo queue. Let qmgr_defer_transport()
289	 * log the recipient asynchronously if possible, and get out of here.
290	 * Note: if asynchronous logging is not possible,
291	 * qmgr_defer_transport() eventually invokes qmgr_entry_done() and
292	 * the entry becomes a dangling pointer.
293	 */
294	QMGR_DELIVER_RELEASE_AGENT(entry);
295	qmgr_entry_unselect(entry);
296	qmgr_defer_transport(transport, &dsb->dsn);
297	return;
298    }
299
300    /*
301     * This message must be tried again.
302     *
303     * If we have a problem talking to this site, back off with this site for a
304     * while; dispose of queue entries for this site that await selection
305     * (the todo list); stay away from queue entries that have been selected
306     * (the busy list), or we would have dangling pointers. The queue itself
307     * won't go away before we dispose of the current queue entry.
308     *
309     * XXX Caution: DSN_COPY() will panic on empty status or reason.
310     */
311#define SUSPENDED	"delivery temporarily suspended: "
312
313    if (status == DELIVER_STAT_DEFER) {
314	message->flags |= DELIVER_STAT_DEFER;
315	if (VSTRING_LEN(dsb->status)) {
316	    /* Sanitize the DSN status/reason from the delivery agent. */
317	    if (!dsn_valid(vstring_str(dsb->status)))
318		vstring_strcpy(dsb->status, "4.0.0");
319	    if (VSTRING_LEN(dsb->reason) == 0)
320		vstring_strcpy(dsb->reason, "unknown error");
321	    vstring_prepend(dsb->reason, SUSPENDED, sizeof(SUSPENDED) - 1);
322	    if (QMGR_QUEUE_READY(queue)) {
323		qmgr_queue_throttle(queue, DSN_FROM_DSN_BUF(dsb));
324		if (QMGR_QUEUE_THROTTLED(queue))
325		    qmgr_defer_todo(queue, &dsb->dsn);
326	    }
327	}
328    }
329
330    /*
331     * No problems detected. Mark the transport and queue as alive. The queue
332     * itself won't go away before we dispose of the current queue entry.
333     */
334    if (status != DELIVER_STAT_CRASH && VSTRING_LEN(dsb->reason) == 0) {
335	qmgr_transport_unthrottle(transport);
336	qmgr_queue_unthrottle(queue);
337    }
338
339    /*
340     * Release the delivery process, and give some other queue entry a chance
341     * to be delivered. When all recipients for a message have been tried,
342     * decide what to do next with this message: defer, bounce, delete.
343     */
344    QMGR_DELIVER_RELEASE_AGENT(entry);
345    qmgr_entry_done(entry, QMGR_QUEUE_BUSY);
346}
347
348/* qmgr_deliver - deliver one per-site queue entry */
349
350void    qmgr_deliver(QMGR_TRANSPORT *transport, VSTREAM *stream)
351{
352    QMGR_ENTRY *entry;
353    DSN     dsn;
354
355    /*
356     * Find out if this delivery process is really available. Once elected,
357     * the delivery process is supposed to express its happiness. If there is
358     * a problem, wipe the pending deliveries for this transport. This
359     * routine runs in response to an external event, so it does not run
360     * while some other queue manipulation is happening.
361     */
362    if (stream == 0 || qmgr_deliver_initial_reply(stream) != 0) {
363#if 0
364	whatsup = concatenate(transport->name,
365			      " mail transport unavailable", (char *) 0);
366	qmgr_transport_throttle(transport,
367				DSN_SIMPLE(&dsn, "4.3.0", whatsup));
368	myfree(whatsup);
369#else
370	qmgr_transport_throttle(transport,
371				DSN_SIMPLE(&dsn, "4.3.0",
372					   "mail transport unavailable"));
373#endif
374	qmgr_defer_transport(transport, &dsn);
375	if (stream)
376	    (void) vstream_fclose(stream);
377	return;
378    }
379
380    /*
381     * Find a suitable queue entry. Things may have changed since this
382     * transport was allocated. If no suitable entry is found,
383     * unceremoniously disconnect from the delivery process. The delivery
384     * agent request reading routine is prepared for the queue manager to
385     * change its mind for no apparent reason.
386     */
387    if ((entry = qmgr_job_entry_select(transport)) == 0) {
388	(void) vstream_fclose(stream);
389	return;
390    }
391
392    /*
393     * Send the queue file info and recipient info to the delivery process.
394     * If there is a problem, wipe the pending deliveries for this transport.
395     * This routine runs in response to an external event, so it does not run
396     * while some other queue manipulation is happening.
397     */
398    if (qmgr_deliver_send_request(entry, stream) < 0) {
399	qmgr_entry_unselect(entry);
400#if 0
401	whatsup = concatenate(transport->name,
402			      " mail transport unavailable", (char *) 0);
403	qmgr_transport_throttle(transport,
404				DSN_SIMPLE(&dsn, "4.3.0", whatsup));
405	myfree(whatsup);
406#else
407	qmgr_transport_throttle(transport,
408				DSN_SIMPLE(&dsn, "4.3.0",
409					   "mail transport unavailable"));
410#endif
411	qmgr_defer_transport(transport, &dsn);
412	/* warning: entry may be a dangling pointer here */
413	(void) vstream_fclose(stream);
414	return;
415    }
416
417    /*
418     * If we get this far, go wait for the delivery status report.
419     */
420    qmgr_deliver_concurrency++;
421    entry->stream = stream;
422    event_enable_read(vstream_fileno(stream),
423		      qmgr_deliver_update, (char *) entry);
424
425    /*
426     * Guard against broken systems.
427     */
428    event_request_timer(qmgr_deliver_abort, (char *) entry, var_daemon_timeout);
429}
430