qmgr.h revision 1.1
1/*	$NetBSD: qmgr.h,v 1.1 2009/06/23 10:08:50 tron Exp $	*/
2
3/*++
4/* NAME
5/*	qmgr 3h
6/* SUMMARY
7/*	queue manager data structures
8/* SYNOPSIS
9/*	#include "qmgr.h"
10/* DESCRIPTION
11/* .nf
12
13 /*
14  * System library.
15  */
16#include <sys/time.h>
17#include <time.h>
18
19 /*
20  * Utility library.
21  */
22#include <vstream.h>
23#include <scan_dir.h>
24
25 /*
26  * Global library.
27  */
28#include <recipient_list.h>
29#include <dsn.h>
30
31 /*
32  * The queue manager is built around lots of mutually-referring structures.
33  * These typedefs save some typing.
34  */
35typedef struct QMGR_TRANSPORT QMGR_TRANSPORT;
36typedef struct QMGR_QUEUE QMGR_QUEUE;
37typedef struct QMGR_ENTRY QMGR_ENTRY;
38typedef struct QMGR_MESSAGE QMGR_MESSAGE;
39typedef struct QMGR_TRANSPORT_LIST QMGR_TRANSPORT_LIST;
40typedef struct QMGR_QUEUE_LIST QMGR_QUEUE_LIST;
41typedef struct QMGR_ENTRY_LIST QMGR_ENTRY_LIST;
42typedef struct QMGR_SCAN QMGR_SCAN;
43typedef struct QMGR_FEEDBACK QMGR_FEEDBACK;
44
45 /*
46  * Hairy macros to update doubly-linked lists.
47  */
48#define QMGR_LIST_ROTATE(head, object) { \
49    head.next->peers.prev = head.prev; \
50    head.prev->peers.next = head.next; \
51    head.next = object->peers.next; \
52    if (object->peers.next) \
53	head.next->peers.prev = 0; \
54    head.prev = object; \
55    object->peers.next = 0; \
56}
57
58#define QMGR_LIST_UNLINK(head, type, object) { \
59    type   next = object->peers.next; \
60    type   prev = object->peers.prev; \
61    if (prev) prev->peers.next = next; \
62    else head.next = next; \
63    if (next) next->peers.prev = prev; \
64    else head.prev = prev; \
65    object->peers.next = object->peers.prev = 0; \
66}
67
68#define QMGR_LIST_APPEND(head, object) { \
69    object->peers.next = head.next; \
70    object->peers.prev = 0; \
71    if (head.next) { \
72	head.next->peers.prev = object; \
73    } else { \
74	head.prev = object; \
75    } \
76    head.next = object; \
77}
78
79#define QMGR_LIST_PREPEND(head, object) { \
80    object->peers.prev = head.prev; \
81    object->peers.next = 0; \
82    if (head.prev) { \
83	head.prev->peers.next = object; \
84    } else { \
85	head.next = object; \
86    } \
87    head.prev = object; \
88}
89
90#define QMGR_LIST_INIT(head) { \
91    head.prev = 0; \
92    head.next = 0; \
93}
94
95 /*
96  * Transports are looked up by name (when we have resolved a message), or
97  * round-robin wise (when we want to distribute resources fairly).
98  */
99struct QMGR_TRANSPORT_LIST {
100    QMGR_TRANSPORT *next;
101    QMGR_TRANSPORT *prev;
102};
103
104extern struct HTABLE *qmgr_transport_byname;	/* transport by name */
105extern QMGR_TRANSPORT_LIST qmgr_transport_list;	/* transports, round robin */
106
107 /*
108  * Delivery agents provide feedback, as hints that Postfix should expend
109  * more or fewer resources on a specific destination domain. The main.cf
110  * file specifies how feedback affects delivery concurrency: add/subtract a
111  * constant, a ratio of constants, or a constant divided by the delivery
112  * concurrency; and it specifies how much feedback must accumulate between
113  * concurrency updates.
114  */
115struct QMGR_FEEDBACK {
116    int     hysteresis;			/* to pass, need to be this tall */
117    double  base;			/* pre-computed from main.cf */
118    int     index;			/* none, window, sqrt(window) */
119};
120
121#define QMGR_FEEDBACK_IDX_NONE		0	/* no window dependence */
122#define QMGR_FEEDBACK_IDX_WIN		1	/* 1/window dependence */
123#if 0
124#define QMGR_FEEDBACK_IDX_SQRT_WIN	2	/* 1/sqrt(window) dependence */
125#endif
126
127#ifdef QMGR_FEEDBACK_IDX_SQRT_WIN
128#include <math.h>
129#endif
130
131extern void qmgr_feedback_init(QMGR_FEEDBACK *, const char *, const char *, const char *, const char *);
132
133#ifndef QMGR_FEEDBACK_IDX_SQRT_WIN
134#define QMGR_FEEDBACK_VAL(fb, win) \
135    ((fb).index == QMGR_FEEDBACK_IDX_NONE ? (fb).base : (fb).base / (win))
136#else
137#define QMGR_FEEDBACK_VAL(fb, win) \
138    ((fb).index == QMGR_FEEDBACK_IDX_NONE ? (fb).base : \
139    (fb).index == QMGR_FEEDBACK_IDX_WIN ? (fb).base / (win) : \
140    (fb).base / sqrt(win))
141#endif
142
143 /*
144  * Each transport (local, smtp-out, bounce) can have one queue per next hop
145  * name. Queues are looked up by next hop name (when we have resolved a
146  * message destination), or round-robin wise (when we want to deliver
147  * messages fairly).
148  */
149struct QMGR_QUEUE_LIST {
150    QMGR_QUEUE *next;
151    QMGR_QUEUE *prev;
152};
153
154struct QMGR_TRANSPORT {
155    int     flags;			/* blocked, etc. */
156    int     pending;			/* incomplete DA connections */
157    char   *name;			/* transport name */
158    int     dest_concurrency_limit;	/* concurrency per domain */
159    int     init_dest_concurrency;	/* init. per-domain concurrency */
160    int     recipient_limit;		/* recipients per transaction */
161    struct HTABLE *queue_byname;	/* queues indexed by domain */
162    QMGR_QUEUE_LIST queue_list;		/* queues, round robin order */
163    QMGR_TRANSPORT_LIST peers;		/* linkage */
164    DSN    *dsn;			/* why unavailable */
165    QMGR_FEEDBACK pos_feedback;		/* positive feedback control */
166    QMGR_FEEDBACK neg_feedback;		/* negative feedback control */
167    int     fail_cohort_limit;		/* flow shutdown control */
168    int     rate_delay;			/* suspend per delivery */
169};
170
171#define QMGR_TRANSPORT_STAT_DEAD	(1<<1)
172
173typedef void (*QMGR_TRANSPORT_ALLOC_NOTIFY) (QMGR_TRANSPORT *, VSTREAM *);
174extern QMGR_TRANSPORT *qmgr_transport_select(void);
175extern void qmgr_transport_alloc(QMGR_TRANSPORT *, QMGR_TRANSPORT_ALLOC_NOTIFY);
176extern void qmgr_transport_throttle(QMGR_TRANSPORT *, DSN *);
177extern void qmgr_transport_unthrottle(QMGR_TRANSPORT *);
178extern QMGR_TRANSPORT *qmgr_transport_create(const char *);
179extern QMGR_TRANSPORT *qmgr_transport_find(const char *);
180
181#define QMGR_TRANSPORT_THROTTLED(t)	((t)->flags & QMGR_TRANSPORT_STAT_DEAD)
182
183 /*
184  * Each next hop (e.g., a domain name) has its own queue of pending message
185  * transactions. The "todo" queue contains messages that are to be delivered
186  * to this next hop. When a message is elected for transmission, it is moved
187  * from the "todo" queue to the "busy" queue. Messages are taken from the
188  * "todo" queue in sequence. An initial destination delivery concurrency > 1
189  * ensures that one problematic message will not block all other traffic to
190  * that next hop.
191  */
192struct QMGR_ENTRY_LIST {
193    QMGR_ENTRY *next;
194    QMGR_ENTRY *prev;
195};
196
197struct QMGR_QUEUE {
198    int     dflags;			/* delivery request options */
199    time_t  last_done;			/* last delivery completion */
200    char   *name;			/* domain name or address */
201    char   *nexthop;			/* domain name */
202    int     todo_refcount;		/* queue entries (todo list) */
203    int     busy_refcount;		/* queue entries (busy list) */
204    int     window;			/* slow open algorithm */
205    double  success;			/* accumulated positive feedback */
206    double  failure;			/* accumulated negative feedback */
207    double  fail_cohorts;		/* pseudo-cohort failure count */
208    QMGR_TRANSPORT *transport;		/* transport linkage */
209    QMGR_ENTRY_LIST todo;		/* todo queue entries */
210    QMGR_ENTRY_LIST busy;		/* messages on the wire */
211    QMGR_QUEUE_LIST peers;		/* neighbor queues */
212    DSN    *dsn;			/* why unavailable */
213    time_t  clog_time_to_warn;		/* time of next warning */
214};
215
216#define	QMGR_QUEUE_TODO	1		/* waiting for service */
217#define QMGR_QUEUE_BUSY	2		/* recipients on the wire */
218
219extern int qmgr_queue_count;
220
221extern QMGR_QUEUE *qmgr_queue_create(QMGR_TRANSPORT *, const char *, const char *);
222extern QMGR_QUEUE *qmgr_queue_select(QMGR_TRANSPORT *);
223extern void qmgr_queue_done(QMGR_QUEUE *);
224extern void qmgr_queue_throttle(QMGR_QUEUE *, DSN *);
225extern void qmgr_queue_unthrottle(QMGR_QUEUE *);
226extern QMGR_QUEUE *qmgr_queue_find(QMGR_TRANSPORT *, const char *);
227extern void qmgr_queue_suspend(QMGR_QUEUE *, int);
228
229 /*
230  * Exclusive queue states. Originally there were only two: "throttled" and
231  * "not throttled". It was natural to encode these in the queue window size.
232  * After 10 years it's not practical to rip out all the working code and
233  * change representations, so we just clean up the names a little.
234  *
235  * Note: only the "ready" state can reach every state (including itself);
236  * non-ready states can reach only the "ready" state. Other transitions are
237  * forbidden, because they would result in dangling event handlers.
238  */
239#define QMGR_QUEUE_STAT_THROTTLED	0	/* back-off timer */
240#define QMGR_QUEUE_STAT_SUSPENDED	-1	/* voluntary delay timer */
241#define QMGR_QUEUE_STAT_SAVED		-2	/* delayed cleanup timer */
242#define QMGR_QUEUE_STAT_BAD		-3	/* can't happen */
243
244#define QMGR_QUEUE_READY(q)	((q)->window > 0)
245#define QMGR_QUEUE_THROTTLED(q)	((q)->window == QMGR_QUEUE_STAT_THROTTLED)
246#define QMGR_QUEUE_SUSPENDED(q)	((q)->window == QMGR_QUEUE_STAT_SUSPENDED)
247#define QMGR_QUEUE_SAVED(q)	((q)->window == QMGR_QUEUE_STAT_SAVED)
248#define QMGR_QUEUE_BAD(q)	((q)->window <= QMGR_QUEUE_STAT_BAD)
249
250#define QMGR_QUEUE_STATUS(q) ( \
251	    QMGR_QUEUE_READY(q) ? "ready" : \
252	    QMGR_QUEUE_THROTTLED(q) ? "throttled" : \
253	    QMGR_QUEUE_SUSPENDED(q) ? "suspended" : \
254	    QMGR_QUEUE_SAVED(q) ? "saved" : \
255	    "invalid queue status" \
256	)
257
258 /*
259  * Structure of one next-hop queue entry. In order to save some copying
260  * effort we allow multiple recipients per transaction.
261  */
262struct QMGR_ENTRY {
263    VSTREAM *stream;			/* delivery process */
264    QMGR_MESSAGE *message;		/* message info */
265    RECIPIENT_LIST rcpt_list;		/* as many as it takes */
266    QMGR_QUEUE *queue;			/* parent linkage */
267    QMGR_ENTRY_LIST peers;		/* neighbor entries */
268};
269
270extern QMGR_ENTRY *qmgr_entry_select(QMGR_QUEUE *);
271extern void qmgr_entry_unselect(QMGR_QUEUE *, QMGR_ENTRY *);
272extern void qmgr_entry_move_todo(QMGR_QUEUE *, QMGR_ENTRY *);
273extern void qmgr_entry_done(QMGR_ENTRY *, int);
274extern QMGR_ENTRY *qmgr_entry_create(QMGR_QUEUE *, QMGR_MESSAGE *);
275
276 /*
277  * All common in-core information about a message is kept here. When all
278  * recipients have been tried the message file is linked to the "deferred"
279  * queue (some hosts not reachable), to the "bounce" queue (some recipients
280  * were rejected), and is then removed from the "active" queue.
281  */
282struct QMGR_MESSAGE {
283    int     flags;			/* delivery problems */
284    int     qflags;			/* queuing flags */
285    int     tflags;			/* tracing flags */
286    long    tflags_offset;		/* offset for killing */
287    int     rflags;			/* queue file read flags */
288    VSTREAM *fp;			/* open queue file or null */
289    int     refcount;			/* queue entries */
290    int     single_rcpt;		/* send one rcpt at a time */
291    struct timeval arrival_time;	/* start of receive transaction */
292    time_t  create_time;		/* queue file create time */
293    struct timeval active_time;		/* time of entry into active queue */
294    long    warn_offset;		/* warning bounce flag offset */
295    time_t  warn_time;			/* time next warning to be sent */
296    long    data_offset;		/* data seek offset */
297    char   *queue_name;			/* queue name */
298    char   *queue_id;			/* queue file */
299    char   *encoding;			/* content encoding */
300    char   *sender;			/* complete address */
301    char   *dsn_envid;			/* DSN envelope ID */
302    int     dsn_ret;			/* DSN headers/full */
303    char   *verp_delims;		/* VERP delimiters */
304    char   *filter_xport;		/* filtering transport */
305    char   *inspect_xport;		/* inspecting transport */
306    char   *redirect_addr;		/* info@spammer.tld */
307    long    data_size;			/* data segment size */
308    long    cont_length;		/* message content length */
309    long    rcpt_offset;		/* more recipients here */
310    char   *client_name;		/* client hostname */
311    char   *client_addr;		/* client address */
312    char   *client_port;		/* client port */
313    char   *client_proto;		/* client protocol */
314    char   *client_helo;		/* helo parameter */
315    char   *sasl_method;		/* SASL method */
316    char   *sasl_username;		/* SASL user name */
317    char   *sasl_sender;		/* SASL sender */
318    char   *rewrite_context;		/* address qualification */
319    RECIPIENT_LIST rcpt_list;		/* complete addresses */
320};
321
322 /*
323  * Flags 0-15 are reserved for qmgr_user.h.
324  */
325#define QMGR_READ_FLAG_SEEN_ALL_NON_RCPT	(1<<16)
326
327#define QMGR_MESSAGE_LOCKED	((QMGR_MESSAGE *) 1)
328
329extern int qmgr_message_count;
330extern int qmgr_recipient_count;
331
332extern void qmgr_message_free(QMGR_MESSAGE *);
333extern void qmgr_message_update_warn(QMGR_MESSAGE *);
334extern void qmgr_message_kill_record(QMGR_MESSAGE *, long);
335extern QMGR_MESSAGE *qmgr_message_alloc(const char *, const char *, int, mode_t);
336extern QMGR_MESSAGE *qmgr_message_realloc(QMGR_MESSAGE *);
337
338#define QMGR_MSG_STATS(stats, message) \
339    MSG_STATS_INIT2(stats, \
340		    incoming_arrival, message->arrival_time, \
341		    active_arrival, message->active_time)
342
343 /*
344  * qmgr_defer.c
345  */
346extern void qmgr_defer_transport(QMGR_TRANSPORT *, DSN *);
347extern void qmgr_defer_todo(QMGR_QUEUE *, DSN *);
348extern void qmgr_defer_recipient(QMGR_MESSAGE *, RECIPIENT *, DSN *);
349
350 /*
351  * qmgr_bounce.c
352  */
353extern void qmgr_bounce_recipient(QMGR_MESSAGE *, RECIPIENT *, DSN *);
354
355 /*
356  * qmgr_deliver.c
357  */
358extern int qmgr_deliver_concurrency;
359extern void qmgr_deliver(QMGR_TRANSPORT *, VSTREAM *);
360
361 /*
362  * qmgr_active.c
363  */
364extern int qmgr_active_feed(QMGR_SCAN *, const char *);
365extern void qmgr_active_drain(void);
366extern void qmgr_active_done(QMGR_MESSAGE *);
367
368 /*
369  * qmgr_move.c
370  */
371extern void qmgr_move(const char *, const char *, time_t);
372
373 /*
374  * qmgr_enable.c
375  */
376extern void qmgr_enable_all(void);
377extern void qmgr_enable_transport(QMGR_TRANSPORT *);
378extern void qmgr_enable_queue(QMGR_QUEUE *);
379
380 /*
381  * Queue scan context.
382  */
383struct QMGR_SCAN {
384    char   *queue;			/* queue name */
385    int     flags;			/* private, this run */
386    int     nflags;			/* private, next run */
387    struct SCAN_DIR *handle;		/* scan */
388};
389
390 /*
391  * Flags that control queue scans or destination selection. These are
392  * similar to the QMGR_REQ_XXX request codes.
393  */
394#define QMGR_SCAN_START	(1<<0)		/* start now/restart when done */
395#define QMGR_SCAN_ALL	(1<<1)		/* all queue file time stamps */
396#define QMGR_FLUSH_ONCE	(1<<2)		/* unthrottle once */
397#define QMGR_FLUSH_DFXP	(1<<3)		/* override defer_transports */
398#define QMGR_FLUSH_EACH	(1<<4)		/* unthrottle per message */
399
400 /*
401  * qmgr_scan.c
402  */
403extern QMGR_SCAN *qmgr_scan_create(const char *);
404extern void qmgr_scan_request(QMGR_SCAN *, int);
405extern char *qmgr_scan_next(QMGR_SCAN *);
406
407 /*
408  * qmgr_error.c
409  */
410extern QMGR_TRANSPORT *qmgr_error_transport(const char *);
411extern QMGR_QUEUE *qmgr_error_queue(const char *, DSN *);
412extern char *qmgr_error_nexthop(DSN *);
413
414/* LICENSE
415/* .ad
416/* .fi
417/*	The Secure Mailer license must be distributed with this software.
418/* AUTHOR(S)
419/*	Wietse Venema
420/*	IBM T.J. Watson Research
421/*	P.O. Box 704
422/*	Yorktown Heights, NY 10598, USA
423/*--*/
424