1/*++
2/* NAME
3/*	qmgr_message 3
4/* SUMMARY
5/*	in-core message structures
6/* SYNOPSIS
7/*	#include "qmgr.h"
8/*
9/*	int	qmgr_message_count;
10/*	int	qmgr_recipient_count;
11/*
12/*	QMGR_MESSAGE *qmgr_message_alloc(class, name, qflags, mode)
13/*	const char *class;
14/*	const char *name;
15/*	int	qflags;
16/*	mode_t	mode;
17/*
18/*	QMGR_MESSAGE *qmgr_message_realloc(message)
19/*	QMGR_MESSAGE *message;
20/*
21/*	void	qmgr_message_free(message)
22/*	QMGR_MESSAGE *message;
23/*
24/*	void	qmgr_message_update_warn(message)
25/*	QMGR_MESSAGE *message;
26/*
27/*	void	qmgr_message_kill_record(message, offset)
28/*	QMGR_MESSAGE *message;
29/*	long	offset;
30/* DESCRIPTION
/*	This module performs wholesale (bulk) operations on queue messages.
32/*
33/*	qmgr_message_count is a global counter for the total number
34/*	of in-core message structures (i.e. the total size of the
35/*	`active' message queue).
36/*
37/*	qmgr_recipient_count is a global counter for the total number
38/*	of in-core recipient structures (i.e. the sum of all recipients
39/*	in all in-core message structures).
40/*
41/*	qmgr_message_alloc() creates an in-core message structure
42/*	with sender and recipient information taken from the named queue
43/*	file. A null result means the queue file could not be read or
44/*	that the queue file contained incorrect information. A result
45/*	QMGR_MESSAGE_LOCKED means delivery must be deferred. The number
46/*	of recipients read from a queue file is limited by the global
47/*	var_qmgr_rcpt_limit configuration parameter. When the limit
48/*	is reached, the \fIrcpt_offset\fR structure member is set to
49/*	the position where the read was terminated. Recipients are
50/*	run through the resolver, and are assigned to destination
51/*	queues. Recipients that cannot be assigned are deferred or
52/*	bounced. Mail that has bounced twice is silently absorbed.
53/*	A non-zero mode means change the queue file permissions.
54/*
55/*	qmgr_message_realloc() resumes reading recipients from the queue
56/*	file, and updates the recipient list and \fIrcpt_offset\fR message
57/*	structure members. A null result means that the file could not be
58/*	read or that the file contained incorrect information. Recipient
59/*	limit imposed this time is based on the position of the message
60/*	job(s) on corresponding transport job list(s). It's considered
61/*	an error to call this when the recipient slots can't be allocated.
62/*
63/*	qmgr_message_free() destroys an in-core message structure and makes
64/*	the resources available for reuse. It is an error to destroy
65/*	a message structure that is still referenced by queue entry structures.
66/*
67/*	qmgr_message_update_warn() takes a closed message, opens it, updates
68/*	the warning field, and closes it again.
69/*
70/*	qmgr_message_kill_record() takes a closed message, opens it, updates
71/*	the record type at the given offset to "killed", and closes the file.
72/*	A killed envelope record is ignored. Killed records are not allowed
73/*	inside the message content.
74/* DIAGNOSTICS
75/*	Warnings: malformed message file. Fatal errors: out of memory.
76/* SEE ALSO
77/*	envelope(3) message envelope parser
78/* LICENSE
79/* .ad
80/* .fi
81/*	The Secure Mailer license must be distributed with this software.
82/* AUTHOR(S)
83/*	Wietse Venema
84/*	IBM T.J. Watson Research
85/*	P.O. Box 704
86/*	Yorktown Heights, NY 10598, USA
87/*
88/*	Preemptive scheduler enhancements:
89/*	Patrik Rak
90/*	Modra 6
91/*	155 00, Prague, Czech Republic
92/*--*/
93
94/* System library. */
95
96#include <sys_defs.h>
97#include <sys/stat.h>
98#include <stdlib.h>
99#include <stdio.h>			/* sscanf() */
100#include <fcntl.h>
101#include <errno.h>
102#include <unistd.h>
103#include <string.h>
104#include <ctype.h>
105
106#ifdef STRCASECMP_IN_STRINGS_H
107#include <strings.h>
108#endif
109
110/* Utility library. */
111
112#include <msg.h>
113#include <mymalloc.h>
114#include <vstring.h>
115#include <vstream.h>
116#include <split_at.h>
117#include <valid_hostname.h>
118#include <argv.h>
119#include <stringops.h>
120#include <myflock.h>
121#include <sane_time.h>
122
123/* Global library. */
124
125#include <dict.h>
126#include <mail_queue.h>
127#include <mail_params.h>
128#include <canon_addr.h>
129#include <record.h>
130#include <rec_type.h>
131#include <sent.h>
132#include <deliver_completed.h>
133#include <opened.h>
134#include <verp_sender.h>
135#include <mail_proto.h>
136#include <qmgr_user.h>
137#include <split_addr.h>
138#include <dsn_mask.h>
139#include <rec_attr_map.h>
140
141/* Client stubs. */
142
143#include <rewrite_clnt.h>
144#include <resolve_clnt.h>
145
146/* Application-specific. */
147
148#include "qmgr.h"
149
/*
 * Total number of in-core message structures. Incremented each time
 * qmgr_message_create() allocates a message.
 */
int     qmgr_message_count;

/*
 * Total number of in-core recipient structures, summed over all in-core
 * messages. qmgr_message_read() subtracts it from var_qmgr_rcpt_limit when
 * it computes the recipient limit for the first read of a queue file.
 */
int     qmgr_recipient_count;
152
153/* qmgr_message_create - create in-core message structure */
154
155static QMGR_MESSAGE *qmgr_message_create(const char *queue_name,
156				           const char *queue_id, int qflags)
157{
158    QMGR_MESSAGE *message;
159
160    message = (QMGR_MESSAGE *) mymalloc(sizeof(QMGR_MESSAGE));
161    qmgr_message_count++;
162    message->flags = 0;
163    message->qflags = qflags;
164    message->tflags = 0;
165    message->tflags_offset = 0;
166    message->rflags = QMGR_READ_FLAG_DEFAULT;
167    message->fp = 0;
168    message->refcount = 0;
169    message->single_rcpt = 0;
170    message->arrival_time.tv_sec = message->arrival_time.tv_usec = 0;
171    message->create_time = 0;
172    GETTIMEOFDAY(&message->active_time);
173    message->queued_time = sane_time();
174    message->refill_time = 0;
175    message->data_offset = 0;
176    message->queue_id = mystrdup(queue_id);
177    message->queue_name = mystrdup(queue_name);
178    message->encoding = 0;
179    message->sender = 0;
180    message->dsn_envid = 0;
181    message->dsn_ret = 0;
182    message->filter_xport = 0;
183    message->inspect_xport = 0;
184    message->redirect_addr = 0;
185    message->data_size = 0;
186    message->cont_length = 0;
187    message->warn_offset = 0;
188    message->warn_time = 0;
189    message->rcpt_offset = 0;
190    message->verp_delims = 0;
191    message->client_name = 0;
192    message->client_addr = 0;
193    message->client_port = 0;
194    message->client_proto = 0;
195    message->client_helo = 0;
196    message->sasl_method = 0;
197    message->sasl_username = 0;
198    message->sasl_sender = 0;
199    message->log_ident = 0;
200    message->rewrite_context = 0;
201    recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE);
202    message->rcpt_count = 0;
203    message->rcpt_limit = var_qmgr_msg_rcpt_limit;
204    message->rcpt_unread = 0;
205    QMGR_LIST_INIT(message->job_list);
206    return (message);
207}
208
209/* qmgr_message_close - close queue file */
210
211static void qmgr_message_close(QMGR_MESSAGE *message)
212{
213    vstream_fclose(message->fp);
214    message->fp = 0;
215}
216
217/* qmgr_message_open - open queue file */
218
219static int qmgr_message_open(QMGR_MESSAGE *message)
220{
221
222    /*
223     * Sanity check.
224     */
225    if (message->fp)
226	msg_panic("%s: queue file is open", message->queue_id);
227
228    /*
229     * Open this queue file. Skip files that we cannot open. Back off when
230     * the system appears to be running out of resources.
231     */
232    if ((message->fp = mail_queue_open(message->queue_name,
233				       message->queue_id,
234				       O_RDWR, 0)) == 0) {
235	if (errno != ENOENT)
236	    msg_fatal("open %s %s: %m", message->queue_name, message->queue_id);
237	msg_warn("open %s %s: %m", message->queue_name, message->queue_id);
238	return (-1);
239    }
240    return (0);
241}
242
243/* qmgr_message_oldstyle_scan - support for Postfix < 1.0 queue files */
244
245static void qmgr_message_oldstyle_scan(QMGR_MESSAGE *message)
246{
247    VSTRING *buf;
248    long    orig_offset, extra_offset;
249    int     rec_type;
250    char   *start;
251
252    /*
253     * Initialize. No early returns or we have a memory leak.
254     */
255    buf = vstring_alloc(100);
256    if ((orig_offset = vstream_ftell(message->fp)) < 0)
257	msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp));
258
259    /*
260     * Rewind to the very beginning to make sure we see all records.
261     */
262    if (vstream_fseek(message->fp, 0, SEEK_SET) < 0)
263	msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp));
264
265    /*
266     * Scan through the old style queue file. Count the total number of
267     * recipients and find the data/extra sections offsets. Note that the new
268     * queue files require that data_size equals extra_offset - data_offset,
269     * so we set data_size to this as well and ignore the size record itself
270     * completely.
271     */
272    message->rcpt_unread = 0;
273    for (;;) {
274	rec_type = rec_get(message->fp, buf, 0);
275	if (rec_type <= 0)
276	    /* Report missing end record later. */
277	    break;
278	start = vstring_str(buf);
279	if (msg_verbose > 1)
280	    msg_info("old-style scan record %c %s", rec_type, start);
281	if (rec_type == REC_TYPE_END)
282	    break;
283	if (rec_type == REC_TYPE_DONE
284	    || rec_type == REC_TYPE_RCPT
285	    || rec_type == REC_TYPE_DRCP) {
286	    message->rcpt_unread++;
287	    continue;
288	}
289	if (rec_type == REC_TYPE_MESG) {
290	    if (message->data_offset == 0) {
291		if ((message->data_offset = vstream_ftell(message->fp)) < 0)
292		    msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp));
293		if ((extra_offset = atol(start)) <= message->data_offset)
294		    msg_fatal("bad extra offset %s file %s",
295			      start, VSTREAM_PATH(message->fp));
296		if (vstream_fseek(message->fp, extra_offset, SEEK_SET) < 0)
297		    msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp));
298		message->data_size = extra_offset - message->data_offset;
299	    }
300	    continue;
301	}
302    }
303
304    /*
305     * Clean up.
306     */
307    if (vstream_fseek(message->fp, orig_offset, SEEK_SET) < 0)
308	msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp));
309    vstring_free(buf);
310
311    /*
312     * Sanity checks. Verify that all required information was found,
313     * including the queue file end marker.
314     */
315    if (message->data_offset == 0 || rec_type != REC_TYPE_END)
316	msg_fatal("%s: envelope records out of order", message->queue_id);
317}
318
/*
 * qmgr_message_read - read envelope records
 *
 * Reads records from the already-open queue file message->fp and stores the
 * information in the in-core message. Recipient records are appended to
 * message->rcpt_list until recipient_limit entries are present; at that
 * point message->rcpt_offset records where a later call must resume. When
 * message->rcpt_offset is non-zero on entry, reading resumes at that file
 * position and the recipient list must be empty.
 *
 * Returns 0 when the loop ended normally and the arrival time, sender and
 * size records were all seen. Otherwise a warning is logged, rcpt_offset
 * and rcpt_unread are restored to their values on entry, the recipient list
 * is emptied, and the result is -1. String members that were never set by
 * the queue file are replaced with defaults in either case.
 */

static int qmgr_message_read(QMGR_MESSAGE *message)
{
    VSTRING *buf;
    int     rec_type;
    long    curr_offset;
    long    save_offset = message->rcpt_offset;	/* save a flag */
    int     save_unread = message->rcpt_unread;	/* save a count */
    char   *start;
    int     recipient_limit;
    const char *error_text;
    char   *name;
    char   *value;
    char   *orig_rcpt = 0;
    int     count;
    int     dsn_notify = 0;
    char   *dsn_orcpt = 0;
    int     n;
    int     have_log_client_attr = 0;

    /*
     * Initialize. No early returns or we have a memory leak.
     */
    buf = vstring_alloc(100);

    /*
     * If we re-open this file, skip over on-file recipient records that we
     * already looked at, and refill the in-core recipient address list.
     *
     * For the first time, the message recipient limit is calculated from the
     * global recipient limit. This is to avoid reading too few recipients
     * when the active queue is near empty. When the queue becomes full, only
     * the necessary amount is read in core. Such priming is necessary
     * because there are no message jobs yet.
     *
     * For the next time, the recipient limit is based solely on the message
     * jobs' positions in the job lists and/or job stacks.
     */
    if (message->rcpt_offset) {
	if (message->rcpt_list.len)
	    msg_panic("%s: recipient list not empty on recipient reload",
		      message->queue_id);
	if (vstream_fseek(message->fp, message->rcpt_offset, SEEK_SET) < 0)
	    msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp));
	/* Zero means "still collecting recipients" in the loop below. */
	message->rcpt_offset = 0;
	recipient_limit = message->rcpt_limit - message->rcpt_count;
    } else {
	recipient_limit = var_qmgr_rcpt_limit - qmgr_recipient_count;
	/* Never go below the per-message limit on the first read. */
	if (recipient_limit < message->rcpt_limit)
	    recipient_limit = message->rcpt_limit;
    }
    /* Keep interrupt latency in check. */
    if (recipient_limit > 5000)
	recipient_limit = 5000;
    if (recipient_limit <= 0)
	msg_panic("%s: no recipient slots available", message->queue_id);
    if (msg_verbose)
	msg_info("%s: recipient limit %d", message->queue_id, recipient_limit);

    /*
     * Read envelope records. XXX Rely on the front-end programs to enforce
     * record size limits. Read up to recipient_limit recipients from the
     * queue file, to protect against memory exhaustion. Recipient records
     * may appear before or after the message content, so we keep reading
     * from the queue file until we have enough recipients (rcpt_offset != 0)
     * and until we know all the non-recipient information.
     *
     * Note that the total recipient count record is accurate only for fresh
     * queue files. After some of the recipients are marked as done and the
     * queue file is deferred, it can be used as upper bound estimate only.
     * Fortunately, this poses no major problem on the scheduling algorithm,
     * as the only impact is that the already deferred messages are not
     * chosen by qmgr_job_candidate() as often as they could.
     *
     * On the first open, we must examine all non-recipient records.
     *
     * Optimization: when we know that recipient records are not mixed with
     * non-recipient records, as is typical with mailing list mail, then we
     * can avoid having to examine all the queue file records before we can
     * start deliveries. This avoids some file system thrashing with huge
     * mailing lists.
     */
    for (;;) {
	if ((curr_offset = vstream_ftell(message->fp)) < 0)
	    msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp));
	/* At the start of the message content: skip over data_size bytes. */
	if (curr_offset == message->data_offset && curr_offset > 0) {
	    if (vstream_fseek(message->fp, message->data_size, SEEK_CUR) < 0)
		msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp));
	    curr_offset += message->data_size;
	}
	rec_type = rec_get_raw(message->fp, buf, 0, REC_FLAG_NONE);
	start = vstring_str(buf);
	if (msg_verbose > 1)
	    msg_info("record %c %s", rec_type, start);
	if (rec_type == REC_TYPE_PTR) {
	    if ((rec_type = rec_goto(message->fp, start)) == REC_TYPE_ERROR)
		break;
	    /* Need to update curr_offset after pointer jump. */
	    continue;
	}
	if (rec_type <= 0) {
	    msg_warn("%s: message rejected: missing end record",
		     message->queue_id);
	    break;
	}
	if (rec_type == REC_TYPE_END) {
	    message->rflags |= QMGR_READ_FLAG_SEEN_ALL_NON_RCPT;
	    break;
	}

	/*
	 * Map named attributes to pseudo record types, so that we don't have
	 * to pollute the queue file with records that are incompatible with
	 * past Postfix versions. Preferably, people should be able to back
	 * out from an upgrade without losing mail.
	 *
	 * When rec_attr_map() returns a non-zero type, the attribute is handled
	 * below as a record of that type with the attribute value as its
	 * payload. Otherwise rec_type stays REC_TYPE_ATTR, and name and value
	 * are used by the attribute code further down.
	 */
	if (rec_type == REC_TYPE_ATTR) {
	    if ((error_text = split_nameval(start, &name, &value)) != 0) {
		msg_warn("%s: ignoring bad attribute: %s: %.200s",
			 message->queue_id, error_text, start);
		rec_type = REC_TYPE_ERROR;
		break;
	    }
	    if ((n = rec_attr_map(name)) != 0) {
		start = value;
		rec_type = n;
	    }
	}

	/*
	 * Process recipient records. Recipient records and the per-recipient
	 * records that precede them are acted upon only while rcpt_offset is
	 * zero, that is, while we are still collecting recipients.
	 */
	if (rec_type == REC_TYPE_RCPT) {
	    /* See also below for code setting orig_rcpt etc. */
	    if (message->rcpt_offset == 0) {
		message->rcpt_unread--;
		recipient_list_add(&message->rcpt_list, curr_offset,
				   dsn_orcpt ? dsn_orcpt : "",
				   dsn_notify ? dsn_notify : 0,
				   orig_rcpt ? orig_rcpt : "", start);
		/* The pending per-recipient attributes are now used up. */
		if (dsn_orcpt) {
		    myfree(dsn_orcpt);
		    dsn_orcpt = 0;
		}
		if (orig_rcpt) {
		    myfree(orig_rcpt);
		    orig_rcpt = 0;
		}
		if (dsn_notify)
		    dsn_notify = 0;
		if (message->rcpt_list.len >= recipient_limit) {
		    /* Remember where the next recipient refill must start. */
		    if ((message->rcpt_offset = vstream_ftell(message->fp)) < 0)
			msg_fatal("vstream_ftell %s: %m",
				  VSTREAM_PATH(message->fp));
		    if (message->rflags & QMGR_READ_FLAG_SEEN_ALL_NON_RCPT)
			/* We already examined all non-recipient records. */
			break;
		    if (message->rflags & QMGR_READ_FLAG_MIXED_RCPT_OTHER)
			/* Examine all remaining non-recipient records. */
			continue;
		    /* Optimizations for "pure recipient" record sections. */
		    if (curr_offset > message->data_offset) {
			/* We already examined all non-recipient records. */
			message->rflags |= QMGR_READ_FLAG_SEEN_ALL_NON_RCPT;
			break;
		    }
		    /* Examine non-recipient records in extracted segment. */
		    if (vstream_fseek(message->fp, message->data_offset
				      + message->data_size, SEEK_SET) < 0)
			msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp));
		    continue;
		}
	    }
	    continue;
	}
	/* A finished recipient: count it and drop its pending attributes. */
	if (rec_type == REC_TYPE_DONE || rec_type == REC_TYPE_DRCP) {
	    if (message->rcpt_offset == 0) {
		message->rcpt_unread--;
		if (dsn_orcpt) {
		    myfree(dsn_orcpt);
		    dsn_orcpt = 0;
		}
		if (orig_rcpt) {
		    myfree(orig_rcpt);
		    orig_rcpt = 0;
		}
		if (dsn_notify)
		    dsn_notify = 0;
	    }
	    continue;
	}
	if (rec_type == REC_TYPE_DSN_ORCPT) {
	    /* See also above for code clearing dsn_orcpt. */
	    if (dsn_orcpt != 0) {
		msg_warn("%s: ignoring out-of-order DSN original recipient address <%.200s>",
			 message->queue_id, dsn_orcpt);
		myfree(dsn_orcpt);
		dsn_orcpt = 0;
	    }
	    if (message->rcpt_offset == 0)
		dsn_orcpt = mystrdup(start);
	    continue;
	}
	if (rec_type == REC_TYPE_DSN_NOTIFY) {
	    /* See also above for code clearing dsn_notify. */
	    if (dsn_notify != 0) {
		msg_warn("%s: ignoring out-of-order DSN notify flags <%d>",
			 message->queue_id, dsn_notify);
		dsn_notify = 0;
	    }
	    if (message->rcpt_offset == 0) {
		if (!alldig(start) || (n = atoi(start)) == 0 || !DSN_NOTIFY_OK(n))
		    msg_warn("%s: ignoring malformed DSN notify flags <%.200s>",
			     message->queue_id, start);
		else
		    dsn_notify = n;
		continue;
	    }
	    /*
	     * NOTE(review): the continue above is inside the rcpt_offset == 0
	     * branch, so with rcpt_offset != 0 control falls through to the
	     * tests below. Presumably no later test matches this record type;
	     * confirm before relying on it.
	     */
	}
	if (rec_type == REC_TYPE_ORCP) {
	    /* See also above for code clearing orig_rcpt. */
	    if (orig_rcpt != 0) {
		msg_warn("%s: ignoring out-of-order original recipient <%.200s>",
			 message->queue_id, orig_rcpt);
		myfree(orig_rcpt);
		orig_rcpt = 0;
	    }
	    if (message->rcpt_offset == 0)
		orig_rcpt = mystrdup(start);
	    continue;
	}

	/*
	 * Process non-recipient records.
	 */
	if (message->rflags & QMGR_READ_FLAG_SEEN_ALL_NON_RCPT)
	    /* We already examined all non-recipient records. */
	    continue;
	if (rec_type == REC_TYPE_SIZE) {
	    /* Only the first size record is parsed. */
	    if (message->data_offset == 0) {
		if ((count = sscanf(start, "%ld %ld %d %d %ld",
				 &message->data_size, &message->data_offset,
				    &message->rcpt_unread, &message->rflags,
				    &message->cont_length)) >= 3) {
		    /* Postfix >= 1.0 (a.k.a. 20010228). */
		    if (message->data_offset <= 0 || message->data_size <= 0) {
			msg_warn("%s: invalid size record: %.100s",
				 message->queue_id, start);
			rec_type = REC_TYPE_ERROR;
			break;
		    }
		    if (message->rflags & ~QMGR_READ_FLAG_USER) {
			msg_warn("%s: invalid flags in size record: %.100s",
				 message->queue_id, start);
			rec_type = REC_TYPE_ERROR;
			break;
		    }
		} else if (count == 1) {
		    /* Postfix < 1.0 (a.k.a. 20010228). */
		    qmgr_message_oldstyle_scan(message);
		} else {
		    /* Can't happen. */
		    msg_warn("%s: message rejected: weird size record",
			     message->queue_id);
		    rec_type = REC_TYPE_ERROR;
		    break;
		}
	    }
	    /* Postfix < 2.4 compatibility. */
	    if (message->cont_length == 0) {
		message->cont_length = message->data_size;
	    } else if (message->cont_length < 0) {
		msg_warn("%s: invalid size record: %.100s",
			 message->queue_id, start);
		rec_type = REC_TYPE_ERROR;
		break;
	    }
	    continue;
	}
	/* For the time records below, the first one seen wins. */
	if (rec_type == REC_TYPE_TIME) {
	    if (message->arrival_time.tv_sec == 0)
		REC_TYPE_TIME_SCAN(start, message->arrival_time);
	    continue;
	}
	if (rec_type == REC_TYPE_CTIME) {
	    if (message->create_time == 0)
		message->create_time = atol(start);
	    continue;
	}
	/* For the next three record types, the last one seen wins. */
	if (rec_type == REC_TYPE_FILT) {
	    if (message->filter_xport != 0)
		myfree(message->filter_xport);
	    message->filter_xport = mystrdup(start);
	    continue;
	}
	if (rec_type == REC_TYPE_INSP) {
	    if (message->inspect_xport != 0)
		myfree(message->inspect_xport);
	    message->inspect_xport = mystrdup(start);
	    continue;
	}
	if (rec_type == REC_TYPE_RDR) {
	    if (message->redirect_addr != 0)
		myfree(message->redirect_addr);
	    message->redirect_addr = mystrdup(start);
	    continue;
	}
	if (rec_type == REC_TYPE_FROM) {
	    /* Only the first sender record is used; it is also logged. */
	    if (message->sender == 0) {
		message->sender = mystrdup(start);
		opened(message->queue_id, message->sender,
		       message->cont_length, message->rcpt_unread,
		       "queue %s", message->queue_name);
	    }
	    continue;
	}
	/*
	 * The next two blocks have no continue statement, so control falls
	 * through to the tests that follow them.
	 */
	if (rec_type == REC_TYPE_DSN_ENVID) {
	    if (message->dsn_envid == 0)
		message->dsn_envid = mystrdup(start);
	}
	if (rec_type == REC_TYPE_DSN_RET) {
	    if (message->dsn_ret == 0) {
		if (!alldig(start) || (n = atoi(start)) == 0 || !DSN_RET_OK(n))
		    msg_warn("%s: ignoring malformed DSN RET flags in queue file record:%.100s",
			     message->queue_id, start);
		else
		    message->dsn_ret = n;
	    }
	}
	/* Only attributes that rec_attr_map() did not translate get here. */
	if (rec_type == REC_TYPE_ATTR) {
	    /* Allow extra segment to override envelope segment info. */
	    if (strcmp(name, MAIL_ATTR_ENCODING) == 0) {
		if (message->encoding != 0)
		    myfree(message->encoding);
		message->encoding = mystrdup(value);
	    }

	    /*
	     * Backwards compatibility. Before Postfix 2.3, the logging
	     * attributes were called client_name, etc. Now they are called
	     * log_client_name. etc., and client_name is used for the actual
	     * client information. To support old queue files we accept both
	     * names for the purpose of logging; the new name overrides the
	     * old one.
	     *
	     * XXX Do not use the "legacy" client_name etc. attribute values for
	     * initializing the logging attributes, when this file already
	     * contains the "modern" log_client_name etc. logging attributes.
	     * Otherwise, logging attributes that are not present in the
	     * queue file would be set with information from the real client.
	     */
	    else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_NAME) == 0) {
		if (have_log_client_attr == 0 && message->client_name == 0)
		    message->client_name = mystrdup(value);
	    } else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_ADDR) == 0) {
		if (have_log_client_attr == 0 && message->client_addr == 0)
		    message->client_addr = mystrdup(value);
	    } else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_PORT) == 0) {
		if (have_log_client_attr == 0 && message->client_port == 0)
		    message->client_port = mystrdup(value);
	    } else if (strcmp(name, MAIL_ATTR_ACT_PROTO_NAME) == 0) {
		if (have_log_client_attr == 0 && message->client_proto == 0)
		    message->client_proto = mystrdup(value);
	    } else if (strcmp(name, MAIL_ATTR_ACT_HELO_NAME) == 0) {
		if (have_log_client_attr == 0 && message->client_helo == 0)
		    message->client_helo = mystrdup(value);
	    }
	    /* Original client attributes. */
	    else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_NAME) == 0) {
		if (message->client_name != 0)
		    myfree(message->client_name);
		message->client_name = mystrdup(value);
		have_log_client_attr = 1;
	    } else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_ADDR) == 0) {
		if (message->client_addr != 0)
		    myfree(message->client_addr);
		message->client_addr = mystrdup(value);
		have_log_client_attr = 1;
	    } else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_PORT) == 0) {
		if (message->client_port != 0)
		    myfree(message->client_port);
		message->client_port = mystrdup(value);
		have_log_client_attr = 1;
	    } else if (strcmp(name, MAIL_ATTR_LOG_PROTO_NAME) == 0) {
		if (message->client_proto != 0)
		    myfree(message->client_proto);
		message->client_proto = mystrdup(value);
		have_log_client_attr = 1;
	    } else if (strcmp(name, MAIL_ATTR_LOG_HELO_NAME) == 0) {
		if (message->client_helo != 0)
		    myfree(message->client_helo);
		message->client_helo = mystrdup(value);
		have_log_client_attr = 1;
	    } else if (strcmp(name, MAIL_ATTR_SASL_METHOD) == 0) {
		if (message->sasl_method == 0)
		    message->sasl_method = mystrdup(value);
		else
		    msg_warn("%s: ignoring multiple %s attribute: %s",
			   message->queue_id, MAIL_ATTR_SASL_METHOD, value);
	    } else if (strcmp(name, MAIL_ATTR_SASL_USERNAME) == 0) {
		if (message->sasl_username == 0)
		    message->sasl_username = mystrdup(value);
		else
		    msg_warn("%s: ignoring multiple %s attribute: %s",
			 message->queue_id, MAIL_ATTR_SASL_USERNAME, value);
	    } else if (strcmp(name, MAIL_ATTR_SASL_SENDER) == 0) {
		if (message->sasl_sender == 0)
		    message->sasl_sender = mystrdup(value);
		else
		    msg_warn("%s: ignoring multiple %s attribute: %s",
			   message->queue_id, MAIL_ATTR_SASL_SENDER, value);
	    } else if (strcmp(name, MAIL_ATTR_LOG_IDENT) == 0) {
		if (message->log_ident == 0)
		    message->log_ident = mystrdup(value);
		else
		    msg_warn("%s: ignoring multiple %s attribute: %s",
			     message->queue_id, MAIL_ATTR_LOG_IDENT, value);
	    } else if (strcmp(name, MAIL_ATTR_RWR_CONTEXT) == 0) {
		if (message->rewrite_context == 0)
		    message->rewrite_context = mystrdup(value);
		else
		    msg_warn("%s: ignoring multiple %s attribute: %s",
			   message->queue_id, MAIL_ATTR_RWR_CONTEXT, value);
	    }

	    /*
	     * Optional tracing flags (verify, sendmail -v, sendmail -bv).
	     * This record is killed after a trace logfile report is sent and
	     * after the logfile is deleted.
	     */
	    else if (strcmp(name, MAIL_ATTR_TRACE_FLAGS) == 0) {
		message->tflags = DEL_REQ_TRACE_FLAGS(atoi(value));
		/* Remember the record position only for "record" requests. */
		if (message->tflags == DEL_REQ_FLAG_RECORD)
		    message->tflags_offset = curr_offset;
		else
		    message->tflags_offset = 0;
	    }
	    continue;
	}
	if (rec_type == REC_TYPE_WARN) {
	    /* Remember where the first warning record lives in the file. */
	    if (message->warn_offset == 0) {
		message->warn_offset = curr_offset;
		REC_TYPE_WARN_SCAN(start, message->warn_time);
	    }
	    continue;
	}
	if (rec_type == REC_TYPE_VERP) {
	    if (message->verp_delims == 0) {
		if (message->sender == 0 || message->sender[0] == 0) {
		    msg_warn("%s: ignoring VERP request for null sender",
			     message->queue_id);
		} else if (verp_delims_verify(start) != 0) {
		    msg_warn("%s: ignoring bad VERP request: \"%.100s\"",
			     message->queue_id, start);
		} else {
		    if (msg_verbose)
			msg_info("%s: enabling VERP for sender \"%.100s\"",
				 message->queue_id, message->sender);
		    message->single_rcpt = 1;
		    message->verp_delims = mystrdup(start);
		}
	    }
	    continue;
	}
    }

    /*
     * Grr. Release per-recipient attributes that were still pending when
     * the loop ended, i.e. that were never followed by a recipient record.
     * The warning is logged only when rec_type > 0.
     */
    if (dsn_orcpt != 0) {
	if (rec_type > 0)
	    msg_warn("%s: ignoring out-of-order DSN original recipient <%.200s>",
		     message->queue_id, dsn_orcpt);
	myfree(dsn_orcpt);
    }
    if (orig_rcpt != 0) {
	if (rec_type > 0)
	    msg_warn("%s: ignoring out-of-order original recipient <%.200s>",
		     message->queue_id, orig_rcpt);
	myfree(orig_rcpt);
    }

    /*
     * Remember when we have read the last recipient batch. Note that we do
     * it here after reading as reading might have used considerable amount
     * of time.
     */
    message->refill_time = sane_time();

    /*
     * Avoid clumsiness elsewhere in the program. When sending data across an
     * IPC channel, sending an empty string is more convenient than sending a
     * null pointer.
     */
    if (message->dsn_envid == 0)
	message->dsn_envid = mystrdup("");
    if (message->encoding == 0)
	message->encoding = mystrdup(MAIL_ATTR_ENC_NONE);
    if (message->client_name == 0)
	message->client_name = mystrdup("");
    if (message->client_addr == 0)
	message->client_addr = mystrdup("");
    if (message->client_port == 0)
	message->client_port = mystrdup("");
    if (message->client_proto == 0)
	message->client_proto = mystrdup("");
    if (message->client_helo == 0)
	message->client_helo = mystrdup("");
    if (message->sasl_method == 0)
	message->sasl_method = mystrdup("");
    if (message->sasl_username == 0)
	message->sasl_username = mystrdup("");
    if (message->sasl_sender == 0)
	message->sasl_sender = mystrdup("");
    if (message->log_ident == 0)
	message->log_ident = mystrdup("");
    if (message->rewrite_context == 0)
	message->rewrite_context = mystrdup(MAIL_ATTR_RWR_LOCAL);
    /* Postfix < 2.3 compatibility. */
    if (message->create_time == 0)
	message->create_time = message->arrival_time.tv_sec;

    /*
     * Clean up.
     */
    vstring_free(buf);

    /*
     * Sanity checks. Verify that all required information was found,
     * including the queue file end marker.
     *
     * A recipient count mismatch is logged and repaired but is not by itself
     * a reason to reject the message.
     */
    if (message->rcpt_unread < 0
	|| (message->rcpt_offset == 0 && message->rcpt_unread != 0)) {
	msg_warn("%s: rcpt count mismatch (%d)",
		 message->queue_id, message->rcpt_unread);
	message->rcpt_unread = 0;
    }
    /*
     * NOTE(review): the first test presumably also catches the
     * REC_TYPE_ERROR value assigned in the loop above; that requires
     * REC_TYPE_ERROR <= 0, which is not visible here -- confirm.
     */
    if (rec_type <= 0) {
	/* Already logged warning. */
    } else if (message->arrival_time.tv_sec == 0) {
	msg_warn("%s: message rejected: missing arrival time record",
		 message->queue_id);
    } else if (message->sender == 0) {
	msg_warn("%s: message rejected: missing sender record",
		 message->queue_id);
    } else if (message->data_offset == 0) {
	msg_warn("%s: message rejected: missing size record",
		 message->queue_id);
    } else {
	return (0);
    }
    /* Failure: undo the recipient state changes made by this call. */
    message->rcpt_offset = save_offset;		/* restore flag */
    message->rcpt_unread = save_unread;		/* restore count */
    recipient_list_free(&message->rcpt_list);
    recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE);
    return (-1);
}
877
878/* qmgr_message_update_warn - update the time of next delay warning */
879
880void    qmgr_message_update_warn(QMGR_MESSAGE *message)
881{
882
883    /*
884     * XXX eventually this should let us schedule multiple warnings, right
885     * now it just allows for one.
886     */
887    if (qmgr_message_open(message)
888	|| vstream_fseek(message->fp, message->warn_offset, SEEK_SET) < 0
889	|| rec_fprintf(message->fp, REC_TYPE_WARN, REC_TYPE_WARN_FORMAT,
890		       REC_TYPE_WARN_ARG(0)) < 0
891	|| vstream_fflush(message->fp))
892	msg_fatal("update queue file %s: %m", VSTREAM_PATH(message->fp));
893    qmgr_message_close(message);
894}
895
896/* qmgr_message_kill_record - mark one message record as killed */
897
898void    qmgr_message_kill_record(QMGR_MESSAGE *message, long offset)
899{
900    if (offset <= 0)
901	msg_panic("qmgr_message_kill_record: bad offset 0x%lx", offset);
902    if (qmgr_message_open(message)
903	|| rec_put_type(message->fp, REC_TYPE_KILL, offset) < 0
904	|| vstream_fflush(message->fp))
905	msg_fatal("update queue file %s: %m", VSTREAM_PATH(message->fp));
906    qmgr_message_close(message);
907}
908
909/* qmgr_message_sort_compare - compare recipient information */
910
911static int qmgr_message_sort_compare(const void *p1, const void *p2)
912{
913    RECIPIENT *rcpt1 = (RECIPIENT *) p1;
914    RECIPIENT *rcpt2 = (RECIPIENT *) p2;
915    QMGR_QUEUE *queue1;
916    QMGR_QUEUE *queue2;
917    char   *at1;
918    char   *at2;
919    int     result;
920
921    /*
922     * Compare most significant to least significant recipient attributes.
923     * The comparison function must be transitive, so NULL values need to be
924     * assigned an ordinal (we set NULL last).
925     */
926
927    queue1 = rcpt1->u.queue;
928    queue2 = rcpt2->u.queue;
929    if (queue1 != 0 && queue2 == 0)
930	return (-1);
931    if (queue1 == 0 && queue2 != 0)
932	return (1);
933    if (queue1 != 0 && queue2 != 0) {
934
935	/*
936	 * Compare message transport.
937	 */
938	if ((result = strcmp(queue1->transport->name,
939			     queue2->transport->name)) != 0)
940	    return (result);
941
942	/*
943	 * Compare queue name (nexthop or recipient@nexthop).
944	 */
945	if ((result = strcmp(queue1->name, queue2->name)) != 0)
946	    return (result);
947    }
948
949    /*
950     * Compare recipient domain.
951     */
952    at1 = strrchr(rcpt1->address, '@');
953    at2 = strrchr(rcpt2->address, '@');
954    if (at1 == 0 && at2 != 0)
955	return (1);
956    if (at1 != 0 && at2 == 0)
957	return (-1);
958    if (at1 != 0 && at2 != 0
959	&& (result = strcasecmp(at1, at2)) != 0)
960	return (result);
961
962    /*
963     * Compare recipient address.
964     */
965    return (strcmp(rcpt1->address, rcpt2->address));
966}
967
968/* qmgr_message_sort - sort message recipient addresses by domain */
969
970static void qmgr_message_sort(QMGR_MESSAGE *message)
971{
972    qsort((char *) message->rcpt_list.info, message->rcpt_list.len,
973	  sizeof(message->rcpt_list.info[0]), qmgr_message_sort_compare);
974    if (msg_verbose) {
975	RECIPIENT_LIST list = message->rcpt_list;
976	RECIPIENT *rcpt;
977
978	msg_info("start sorted recipient list");
979	for (rcpt = list.info; rcpt < list.info + list.len; rcpt++)
980	    msg_info("qmgr_message_sort: %s", rcpt->address);
981	msg_info("end sorted recipient list");
982    }
983}
984
985/* qmgr_resolve_one - resolve or skip one recipient */
986
987static int qmgr_resolve_one(QMGR_MESSAGE *message, RECIPIENT *recipient,
988			            const char *addr, RESOLVE_REPLY *reply)
989{
990#define QMGR_REDIRECT(rp, tp, np) do { \
991	(rp)->flags = 0; \
992	vstring_strcpy((rp)->transport, (tp)); \
993	vstring_strcpy((rp)->nexthop, (np)); \
994    } while (0)
995
996    if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) == 0)
997	resolve_clnt_query_from(message->sender, addr, reply);
998    else
999	resolve_clnt_verify_from(message->sender, addr, reply);
1000    if (reply->flags & RESOLVE_FLAG_FAIL) {
1001	QMGR_REDIRECT(reply, MAIL_SERVICE_RETRY,
1002		      "4.3.0 address resolver failure");
1003	return (0);
1004    } else if (reply->flags & RESOLVE_FLAG_ERROR) {
1005	QMGR_REDIRECT(reply, MAIL_SERVICE_ERROR,
1006		      "5.1.3 bad address syntax");
1007	return (0);
1008    } else {
1009	return (0);
1010    }
1011}
1012
1013/* qmgr_message_resolve - resolve recipients */
1014
1015static void qmgr_message_resolve(QMGR_MESSAGE *message)
1016{
1017    static ARGV *defer_xport_argv;
1018    RECIPIENT_LIST list = message->rcpt_list;
1019    RECIPIENT *recipient;
1020    QMGR_TRANSPORT *transport = 0;
1021    QMGR_QUEUE *queue = 0;
1022    RESOLVE_REPLY reply;
1023    VSTRING *queue_name;
1024    char   *at;
1025    char  **cpp;
1026    char   *nexthop;
1027    ssize_t len;
1028    int     status;
1029    DSN     dsn;
1030    MSG_STATS stats;
1031    DSN    *saved_dsn;
1032
1033#define STREQ(x,y)	(strcmp(x,y) == 0)
1034#define STR		vstring_str
1035#define LEN		VSTRING_LEN
1036
1037    resolve_clnt_init(&reply);
1038    queue_name = vstring_alloc(1);
1039    for (recipient = list.info; recipient < list.info + list.len; recipient++) {
1040
1041	/*
1042	 * Redirect overrides all else. But only once (per entire message).
1043	 * For consistency with the remainder of Postfix, rewrite the address
1044	 * to canonical form before resolving it.
1045	 */
1046	if (message->redirect_addr) {
1047	    if (recipient > list.info) {
1048		recipient->u.queue = 0;
1049		continue;
1050	    }
1051	    message->rcpt_offset = 0;
1052	    message->rcpt_unread = 0;
1053
1054	    rewrite_clnt_internal(REWRITE_CANON, message->redirect_addr,
1055				  reply.recipient);
1056	    RECIPIENT_UPDATE(recipient->address, STR(reply.recipient));
1057	    if (qmgr_resolve_one(message, recipient,
1058				 recipient->address, &reply) < 0)
1059		continue;
1060	    if (!STREQ(recipient->address, STR(reply.recipient)))
1061		RECIPIENT_UPDATE(recipient->address, STR(reply.recipient));
1062	}
1063
1064	/*
1065	 * Content filtering overrides the address resolver.
1066	 *
1067	 * XXX Bypass content_filter inspection for user-generated probes
1068	 * (sendmail -bv). MTA-generated probes never have the "please filter
1069	 * me" bits turned on, but we handle them here anyway for the sake of
1070	 * future proofing.
1071	 */
1072#define FILTER_WITHOUT_NEXTHOP(filter, next) \
1073	(((next) = split_at((filter), ':')) == 0 || *(next) == 0)
1074
1075#define RCPT_WITHOUT_DOMAIN(rcpt, next) \
1076	((next = strrchr(rcpt, '@')) == 0 || *++(next) == 0)
1077
1078	else if (message->filter_xport
1079		 && (message->tflags & DEL_REQ_TRACE_ONLY_MASK) == 0) {
1080	    reply.flags = 0;
1081	    vstring_strcpy(reply.transport, message->filter_xport);
1082	    if (FILTER_WITHOUT_NEXTHOP(STR(reply.transport), nexthop)
1083		&& *(nexthop = var_def_filter_nexthop) == 0
1084		&& RCPT_WITHOUT_DOMAIN(recipient->address, nexthop))
1085		nexthop = var_myhostname;
1086	    vstring_strcpy(reply.nexthop, nexthop);
1087	    vstring_strcpy(reply.recipient, recipient->address);
1088	}
1089
1090	/*
1091	 * Resolve the destination to (transport, nexthop, address). The
1092	 * result address may differ from the one specified by the sender.
1093	 */
1094	else {
1095	    if (qmgr_resolve_one(message, recipient,
1096				 recipient->address, &reply) < 0)
1097		continue;
1098	    if (!STREQ(recipient->address, STR(reply.recipient)))
1099		RECIPIENT_UPDATE(recipient->address, STR(reply.recipient));
1100	}
1101
1102	/*
1103	 * Bounce null recipients. This should never happen, but is most
1104	 * likely the result of a fault in a different program, so aborting
1105	 * the queue manager process does not help.
1106	 */
1107	if (recipient->address[0] == 0) {
1108	    QMGR_REDIRECT(&reply, MAIL_SERVICE_ERROR,
1109			  "5.1.3 null recipient address");
1110	}
1111
1112	/*
1113	 * Discard mail to the local double bounce address here, so this
1114	 * system can run without a local delivery agent. They'd still have
1115	 * to configure something for mail directed to the local postmaster,
1116	 * though, but that is an RFC requirement anyway.
1117	 *
1118	 * XXX This lookup should be done in the resolver, and the mail should
1119	 * be directed to a general-purpose null delivery agent.
1120	 */
1121	if (reply.flags & RESOLVE_CLASS_LOCAL) {
1122	    at = strrchr(STR(reply.recipient), '@');
1123	    len = (at ? (at - STR(reply.recipient))
1124		   : strlen(STR(reply.recipient)));
1125	    if (strncasecmp(STR(reply.recipient), var_double_bounce_sender,
1126			    len) == 0
1127		&& !var_double_bounce_sender[len]) {
1128		status = sent(message->tflags, message->queue_id,
1129			      QMGR_MSG_STATS(&stats, message), recipient,
1130			      "none", DSN_SIMPLE(&dsn, "2.0.0",
1131			"undeliverable postmaster notification discarded"));
1132		if (status == 0) {
1133		    deliver_completed(message->fp, recipient->offset);
1134#if 0
1135		    /* It's the default verification probe sender address. */
1136		    msg_warn("%s: undeliverable postmaster notification discarded",
1137			     message->queue_id);
1138#endif
1139		} else
1140		    message->flags |= status;
1141		continue;
1142	    }
1143	}
1144
1145	/*
1146	 * Optionally defer deliveries over specific transports, unless the
1147	 * restriction is lifted temporarily.
1148	 */
1149	if (*var_defer_xports && (message->qflags & QMGR_FLUSH_DFXP) == 0) {
1150	    if (defer_xport_argv == 0)
1151		defer_xport_argv = argv_split(var_defer_xports, " \t\r\n,");
1152	    for (cpp = defer_xport_argv->argv; *cpp; cpp++)
1153		if (strcmp(*cpp, STR(reply.transport)) == 0)
1154		    break;
1155	    if (*cpp) {
1156		QMGR_REDIRECT(&reply, MAIL_SERVICE_RETRY,
1157			      "4.3.2 deferred transport");
1158	    }
1159	}
1160
1161	/*
1162	 * Look up or instantiate the proper transport.
1163	 */
1164	if (transport == 0 || !STREQ(transport->name, STR(reply.transport))) {
1165	    if ((transport = qmgr_transport_find(STR(reply.transport))) == 0)
1166		transport = qmgr_transport_create(STR(reply.transport));
1167	    queue = 0;
1168	}
1169
1170	/*
1171	 * This message is being flushed. If need-be unthrottle the
1172	 * transport.
1173	 */
1174	if ((message->qflags & QMGR_FLUSH_EACH) != 0
1175	    && QMGR_TRANSPORT_THROTTLED(transport))
1176	    qmgr_transport_unthrottle(transport);
1177
1178	/*
1179	 * This transport is dead. Defer delivery to this recipient.
1180	 */
1181	if (QMGR_TRANSPORT_THROTTLED(transport)) {
1182	    saved_dsn = transport->dsn;
1183	    if ((transport = qmgr_error_transport(MAIL_SERVICE_RETRY)) != 0) {
1184		nexthop = qmgr_error_nexthop(saved_dsn);
1185		vstring_strcpy(reply.nexthop, nexthop);
1186		myfree(nexthop);
1187		queue = 0;
1188	    } else {
1189		qmgr_defer_recipient(message, recipient, saved_dsn);
1190		continue;
1191	    }
1192	}
1193
1194	/*
1195	 * The nexthop destination provides the default name for the
1196	 * per-destination queue. When the delivery agent accepts only one
1197	 * recipient per delivery, give each recipient its own queue, so that
1198	 * deliveries to different recipients of the same message can happen
1199	 * in parallel, and so that we can enforce per-recipient concurrency
1200	 * limits and prevent one recipient from tying up all the delivery
1201	 * agent resources. We use recipient@nexthop as queue name rather
1202	 * than the actual recipient domain name, so that one recipient in
1203	 * multiple equivalent domains cannot evade the per-recipient
1204	 * concurrency limit. Split the address on the recipient delimiter if
1205	 * one is defined, so that extended addresses don't get extra
1206	 * delivery slots.
1207	 *
1208	 * Fold the result to lower case so that we don't have multiple queues
1209	 * for the same name.
1210	 *
1211	 * Important! All recipients in a queue must have the same nexthop
1212	 * value. It is OK to have multiple queues with the same nexthop
1213	 * value, but only when those queues are named after recipients.
1214	 *
1215	 * The single-recipient code below was written for local(8) like
1216	 * delivery agents, and assumes that all domains that deliver to the
1217	 * same (transport + nexthop) are aliases for $nexthop. Delivery
1218	 * concurrency is changed from per-domain into per-recipient, by
1219	 * changing the queue name from nexthop into localpart@nexthop.
1220	 *
1221	 * XXX This assumption is incorrect when different destinations share
1222	 * the same (transport + nexthop). In reality, such transports are
1223	 * rarely configured to use single-recipient deliveries. The fix is
1224	 * to decouple the per-destination recipient limit from the
1225	 * per-destination concurrency.
1226	 */
1227	vstring_strcpy(queue_name, STR(reply.nexthop));
1228	if (strcmp(transport->name, MAIL_SERVICE_ERROR) != 0
1229	    && strcmp(transport->name, MAIL_SERVICE_RETRY) != 0
1230	    && transport->recipient_limit == 1) {
1231	    /* Copy the recipient localpart. */
1232	    at = strrchr(STR(reply.recipient), '@');
1233	    len = (at ? (at - STR(reply.recipient))
1234		   : strlen(STR(reply.recipient)));
1235	    vstring_strncpy(queue_name, STR(reply.recipient), len);
1236	    /* Remove the address extension from the recipient localpart. */
1237	    if (*var_rcpt_delim && split_addr(STR(queue_name), *var_rcpt_delim))
1238		vstring_truncate(queue_name, strlen(STR(queue_name)));
1239	    /* Assume the recipient domain is equivalent to nexthop. */
1240	    vstring_sprintf_append(queue_name, "@%s", STR(reply.nexthop));
1241	}
1242	lowercase(STR(queue_name));
1243
1244	/*
1245	 * This transport is alive. Find or instantiate a queue for this
1246	 * recipient.
1247	 */
1248	if (queue == 0 || !STREQ(queue->name, STR(queue_name))) {
1249	    if ((queue = qmgr_queue_find(transport, STR(queue_name))) == 0)
1250		queue = qmgr_queue_create(transport, STR(queue_name),
1251					  STR(reply.nexthop));
1252	}
1253
1254	/*
1255	 * This message is being flushed. If need-be unthrottle the queue.
1256	 */
1257	if ((message->qflags & QMGR_FLUSH_EACH) != 0
1258	    && QMGR_QUEUE_THROTTLED(queue))
1259	    qmgr_queue_unthrottle(queue);
1260
1261	/*
1262	 * This queue is dead. Defer delivery to this recipient.
1263	 */
1264	if (QMGR_QUEUE_THROTTLED(queue)) {
1265	    saved_dsn = queue->dsn;
1266	    if ((queue = qmgr_error_queue(MAIL_SERVICE_RETRY, saved_dsn)) == 0) {
1267		qmgr_defer_recipient(message, recipient, saved_dsn);
1268		continue;
1269	    }
1270	}
1271
1272	/*
1273	 * This queue is alive. Bind this recipient to this queue instance.
1274	 */
1275	recipient->u.queue = queue;
1276    }
1277    resolve_clnt_free(&reply);
1278    vstring_free(queue_name);
1279}
1280
1281/* qmgr_message_assign - assign recipients to specific delivery requests */
1282
1283static void qmgr_message_assign(QMGR_MESSAGE *message)
1284{
1285    RECIPIENT_LIST list = message->rcpt_list;
1286    RECIPIENT *recipient;
1287    QMGR_ENTRY *entry = 0;
1288    QMGR_QUEUE *queue;
1289    QMGR_JOB *job = 0;
1290    QMGR_PEER *peer = 0;
1291
1292    /*
1293     * Try to bundle as many recipients in a delivery request as we can. When
1294     * the recipient resolves to the same site and transport as an existing
1295     * recipient, do not create a new queue entry, just move that recipient
1296     * to the recipient list of the existing queue entry. All this provided
1297     * that we do not exceed the transport-specific limit on the number of
1298     * recipients per transaction.
1299     */
1300#define LIMIT_OK(limit, count) ((limit) == 0 || ((count) < (limit)))
1301
1302    for (recipient = list.info; recipient < list.info + list.len; recipient++) {
1303
1304	/*
1305	 * Skip recipients with a dead transport or destination.
1306	 */
1307	if ((queue = recipient->u.queue) == 0)
1308	    continue;
1309
1310	/*
1311	 * Lookup or instantiate the message job if necessary.
1312	 */
1313	if (job == 0 || queue->transport != job->transport) {
1314	    job = qmgr_job_obtain(message, queue->transport);
1315	    peer = 0;
1316	}
1317
1318	/*
1319	 * Lookup or instantiate job peer if necessary.
1320	 */
1321	if (peer == 0 || queue != peer->queue)
1322	    peer = qmgr_peer_obtain(job, queue);
1323
1324	/*
1325	 * Lookup old or instantiate new recipient entry. We try to reuse the
1326	 * last existing entry whenever the recipient limit permits.
1327	 */
1328	entry = peer->entry_list.prev;
1329	if (message->single_rcpt || entry == 0
1330	    || !LIMIT_OK(queue->transport->recipient_limit, entry->rcpt_list.len))
1331	    entry = qmgr_entry_create(peer, message);
1332
1333	/*
1334	 * Add the recipient to the current entry and increase all those
1335	 * recipient counters accordingly.
1336	 */
1337	recipient_list_add(&entry->rcpt_list, recipient->offset,
1338			   recipient->dsn_orcpt, recipient->dsn_notify,
1339			   recipient->orig_addr, recipient->address);
1340	job->rcpt_count++;
1341	message->rcpt_count++;
1342	qmgr_recipient_count++;
1343    }
1344
1345    /*
1346     * Release the message recipient list and reinitialize it for the next
1347     * time.
1348     */
1349    recipient_list_free(&message->rcpt_list);
1350    recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE);
1351
1352    /*
1353     * Note that even if qmgr_job_obtain() reset the job candidate cache of
1354     * all transports to which we assigned new recipients, this message may
1355     * have other jobs which we didn't touch at all this time. But the number
1356     * of unread recipients affecting the candidate selection might have
1357     * changed considerably, so we must invalidate the caches if it might be
1358     * of some use.
1359     */
1360    for (job = message->job_list.next; job; job = job->message_peers.next)
1361	if (job->selected_entries < job->read_entries
1362	    && job->blocker_tag != job->transport->blocker_tag)
1363	    job->transport->candidate_cache_current = 0;
1364}
1365
1366/* qmgr_message_move_limits - recycle unused recipient slots */
1367
1368static void qmgr_message_move_limits(QMGR_MESSAGE *message)
1369{
1370    QMGR_JOB *job;
1371
1372    for (job = message->job_list.next; job; job = job->message_peers.next)
1373	qmgr_job_move_limits(job);
1374}
1375
1376/* qmgr_message_free - release memory for in-core message structure */
1377
1378void    qmgr_message_free(QMGR_MESSAGE *message)
1379{
1380    QMGR_JOB *job;
1381
1382    if (message->refcount != 0)
1383	msg_panic("qmgr_message_free: reference len: %d", message->refcount);
1384    if (message->fp)
1385	msg_panic("qmgr_message_free: queue file is open");
1386    while ((job = message->job_list.next) != 0)
1387	qmgr_job_free(job);
1388    myfree(message->queue_id);
1389    myfree(message->queue_name);
1390    if (message->dsn_envid)
1391	myfree(message->dsn_envid);
1392    if (message->encoding)
1393	myfree(message->encoding);
1394    if (message->sender)
1395	myfree(message->sender);
1396    if (message->verp_delims)
1397	myfree(message->verp_delims);
1398    if (message->filter_xport)
1399	myfree(message->filter_xport);
1400    if (message->inspect_xport)
1401	myfree(message->inspect_xport);
1402    if (message->redirect_addr)
1403	myfree(message->redirect_addr);
1404    if (message->client_name)
1405	myfree(message->client_name);
1406    if (message->client_addr)
1407	myfree(message->client_addr);
1408    if (message->client_port)
1409	myfree(message->client_port);
1410    if (message->client_proto)
1411	myfree(message->client_proto);
1412    if (message->client_helo)
1413	myfree(message->client_helo);
1414    if (message->sasl_method)
1415	myfree(message->sasl_method);
1416    if (message->sasl_username)
1417	myfree(message->sasl_username);
1418    if (message->sasl_sender)
1419	myfree(message->sasl_sender);
1420    if (message->log_ident)
1421	myfree(message->log_ident);
1422    if (message->rewrite_context)
1423	myfree(message->rewrite_context);
1424    recipient_list_free(&message->rcpt_list);
1425    qmgr_message_count--;
1426    myfree((char *) message);
1427}
1428
1429/* qmgr_message_alloc - create in-core message structure */
1430
1431QMGR_MESSAGE *qmgr_message_alloc(const char *queue_name, const char *queue_id,
1432				         int qflags, mode_t mode)
1433{
1434    const char *myname = "qmgr_message_alloc";
1435    QMGR_MESSAGE *message;
1436
1437    if (msg_verbose)
1438	msg_info("%s: %s %s", myname, queue_name, queue_id);
1439
1440    /*
1441     * Create an in-core message structure.
1442     */
1443    message = qmgr_message_create(queue_name, queue_id, qflags);
1444
1445    /*
1446     * Extract message envelope information: time of arrival, sender address,
1447     * recipient addresses. Skip files with malformed envelope information.
1448     */
1449#define QMGR_LOCK_MODE (MYFLOCK_OP_EXCLUSIVE | MYFLOCK_OP_NOWAIT)
1450
1451    if (qmgr_message_open(message) < 0) {
1452	qmgr_message_free(message);
1453	return (0);
1454    }
1455    if (myflock(vstream_fileno(message->fp), INTERNAL_LOCK, QMGR_LOCK_MODE) < 0) {
1456	msg_info("%s: skipped, still being delivered", queue_id);
1457	qmgr_message_close(message);
1458	qmgr_message_free(message);
1459	return (QMGR_MESSAGE_LOCKED);
1460    }
1461    if (qmgr_message_read(message) < 0) {
1462	qmgr_message_close(message);
1463	qmgr_message_free(message);
1464	return (0);
1465    } else {
1466
1467	/*
1468	 * We have validated the queue file content, so it is safe to modify
1469	 * the file properties now.
1470	 */
1471	if (mode != 0 && fchmod(vstream_fileno(message->fp), mode) < 0)
1472	    msg_fatal("fchmod %s: %m", VSTREAM_PATH(message->fp));
1473
1474	/*
1475	 * Reset the defer log. This code should not be here, but we must
1476	 * reset the defer log *after* acquiring the exclusive lock on the
1477	 * queue file and *before* resolving new recipients. Since all those
1478	 * operations are encapsulated so nicely by this routine, the defer
1479	 * log reset has to be done here as well.
1480	 *
1481	 * Note: it is safe to remove the defer logfile from a previous queue
1482	 * run of this queue file, because the defer log contains information
1483	 * about recipients that still exist in this queue file.
1484	 */
1485	if (mail_queue_remove(MAIL_QUEUE_DEFER, queue_id) && errno != ENOENT)
1486	    msg_fatal("%s: %s: remove %s %s: %m", myname,
1487		      queue_id, MAIL_QUEUE_DEFER, queue_id);
1488	qmgr_message_sort(message);
1489	qmgr_message_resolve(message);
1490	qmgr_message_sort(message);
1491	qmgr_message_assign(message);
1492	qmgr_message_close(message);
1493	if (message->rcpt_offset == 0)
1494	    qmgr_message_move_limits(message);
1495	return (message);
1496    }
1497}
1498
1499/* qmgr_message_realloc - refresh in-core message structure */
1500
1501QMGR_MESSAGE *qmgr_message_realloc(QMGR_MESSAGE *message)
1502{
1503    const char *myname = "qmgr_message_realloc";
1504
1505    /*
1506     * Sanity checks.
1507     */
1508    if (message->rcpt_offset <= 0)
1509	msg_panic("%s: invalid offset: %ld", myname, message->rcpt_offset);
1510    if (msg_verbose)
1511	msg_info("%s: %s %s offset %ld", myname, message->queue_name,
1512		 message->queue_id, message->rcpt_offset);
1513
1514    /*
1515     * Extract recipient addresses. Skip files with malformed envelope
1516     * information.
1517     */
1518    if (qmgr_message_open(message) < 0)
1519	return (0);
1520    if (qmgr_message_read(message) < 0) {
1521	qmgr_message_close(message);
1522	return (0);
1523    } else {
1524	qmgr_message_sort(message);
1525	qmgr_message_resolve(message);
1526	qmgr_message_sort(message);
1527	qmgr_message_assign(message);
1528	qmgr_message_close(message);
1529	if (message->rcpt_offset == 0)
1530	    qmgr_message_move_limits(message);
1531	return (message);
1532    }
1533}
1534