1/*	$NetBSD$	*/
2
3/*++
4/* NAME
5/*	qmgr 3h
6/* SUMMARY
7/*	queue manager data structures
8/* SYNOPSIS
9/*	#include "qmgr.h"
10/* DESCRIPTION
11/* .nf
12
13 /*
14  * System library.
15  */
16#include <sys/time.h>
17#include <time.h>
18
19 /*
20  * Utility library.
21  */
22#include <vstream.h>
23#include <scan_dir.h>
24
25 /*
26  * Global library.
27  */
28#include <recipient_list.h>
29#include <dsn.h>
30
31 /*
32  * The queue manager is built around lots of mutually-referring structures.
33  * These typedefs save some typing.
34  */
typedef struct QMGR_TRANSPORT QMGR_TRANSPORT;	/* delivery transport */
typedef struct QMGR_QUEUE QMGR_QUEUE;	/* per next-hop queue */
typedef struct QMGR_ENTRY QMGR_ENTRY;	/* per-recipient-batch entry */
typedef struct QMGR_MESSAGE QMGR_MESSAGE;	/* in-core message */
typedef struct QMGR_JOB QMGR_JOB;	/* per message+transport scheduling */
typedef struct QMGR_PEER QMGR_PEER;	/* per job+queue linkage */
typedef struct QMGR_TRANSPORT_LIST QMGR_TRANSPORT_LIST;	/* list head/tail */
typedef struct QMGR_QUEUE_LIST QMGR_QUEUE_LIST;	/* list head/tail */
typedef struct QMGR_ENTRY_LIST QMGR_ENTRY_LIST;	/* list head/tail */
typedef struct QMGR_JOB_LIST QMGR_JOB_LIST;	/* list head/tail */
typedef struct QMGR_PEER_LIST QMGR_PEER_LIST;	/* list head/tail */
typedef struct QMGR_SCAN QMGR_SCAN;	/* queue scan context */
typedef struct QMGR_FEEDBACK QMGR_FEEDBACK;	/* concurrency feedback */
48
49 /*
50  * Hairy macros to update doubly-linked lists.
51  */
 /*
  * Rotate the list in place: afterwards the list begins with the successor
  * of "object" and ends with "object" itself. The first two statements
  * temporarily join the last element to the first, so the rotation also
  * works when "object" is the final element. "object" must be a member of
  * the list; arguments are evaluated more than once, so they must be free
  * of side effects.
  */
#define QMGR_LIST_ROTATE(head, object, peers) { \
    head.next->peers.prev = head.prev; \
    head.prev->peers.next = head.next; \
    head.next = object->peers.next; \
    head.next->peers.prev = 0; \
    head.prev = object; \
    object->peers.next = 0; \
}
60
 /*
  * Remove "object" from the doubly-linked list, repairing either the
  * neighbors' links or the list head/tail when the object was first or
  * last, and reset the object's own link pointers to null. "type" is the
  * object's pointer type; the _next/_prev temporaries latch the neighbors
  * before any links change, and their underscore names reduce the chance
  * of colliding with a macro argument.
  */
#define QMGR_LIST_UNLINK(head, type, object, peers) { \
    type   _next = object->peers.next; \
    type   _prev = object->peers.prev; \
    if (_prev) _prev->peers.next = _next; \
	else head.next = _next; \
    if (_next) _next->peers.prev = _prev; \
	else head.prev = _prev; \
    object->peers.next = object->peers.prev = 0; \
}
70
 /*
  * Insert "object" between "pred" and "succ". Either neighbor may be null
  * (0) to insert at the front or back of the list, in which case the list
  * head or tail is updated instead of a neighbor's link. Arguments are
  * evaluated more than once.
  */
#define QMGR_LIST_LINK(head, pred, object, succ, peers) { \
    object->peers.prev = pred; \
    object->peers.next = succ; \
    if (pred) pred->peers.next = object; \
    else head.next = object; \
    if (succ) succ->peers.prev = object; \
    else head.prev = object; \
}
79
80#define QMGR_LIST_PREPEND(head, object, peers) { \
81    object->peers.next = head.next; \
82    object->peers.prev = 0; \
83    if (head.next) { \
84	head.next->peers.prev = object; \
85    } else { \
86	head.prev = object; \
87    } \
88    head.next = object; \
89}
90
91#define QMGR_LIST_APPEND(head, object, peers) { \
92    object->peers.prev = head.prev; \
93    object->peers.next = 0; \
94    if (head.prev) { \
95	head.prev->peers.next = object; \
96    } else { \
97	head.next = object; \
98    } \
99    head.prev = object; \
100}
101
102#define QMGR_LIST_INIT(head) { \
103    head.prev = 0; \
104    head.next = 0; \
105}
106
107 /*
108  * Transports are looked up by name (when we have resolved a message), or
109  * round-robin wise (when we want to distribute resources fairly).
110  */
struct QMGR_TRANSPORT_LIST {
    QMGR_TRANSPORT *next;		/* first element (null when empty) */
    QMGR_TRANSPORT *prev;		/* last element (null when empty) */
};

extern struct HTABLE *qmgr_transport_byname;	/* transport by name */
extern QMGR_TRANSPORT_LIST qmgr_transport_list;	/* transports, round robin */
118
119 /*
120  * Delivery agents provide feedback, as hints that Postfix should expend
121  * more or fewer resources on a specific destination domain. The main.cf
122  * file specifies how feedback affects delivery concurrency: add/subtract a
123  * constant, a ratio of constants, or a constant divided by the delivery
124  * concurrency; and it specifies how much feedback must accumulate between
125  * concurrency updates.
126  */
 /*
  * Pre-parsed concurrency feedback control; see qmgr_feedback_init() and
  * the QMGR_FEEDBACK_VAL() macro below for how the fields are used.
  */
struct QMGR_FEEDBACK {
    int     hysteresis;			/* to pass, need to be this tall */
    double  base;			/* pre-computed from main.cf */
    int     index;			/* none, window, sqrt(window); one of
					 * the QMGR_FEEDBACK_IDX_* codes */
};
132
#define QMGR_FEEDBACK_IDX_NONE		0	/* no window dependence */
#define QMGR_FEEDBACK_IDX_WIN		1	/* 1/window dependence */
#if 0						/* sqrt mode is compiled out */
#define QMGR_FEEDBACK_IDX_SQRT_WIN	2	/* 1/sqrt(window) dependence */
#endif

#ifdef QMGR_FEEDBACK_IDX_SQRT_WIN
#include <math.h>				/* sqrt() */
#endif

 /*
  * Pre-compute a QMGR_FEEDBACK from configuration strings; see
  * qmgr_feedback.c for the parameter semantics.
  */
extern void qmgr_feedback_init(QMGR_FEEDBACK *, const char *, const char *, const char *, const char *);

 /*
  * Feedback amount for one delivery, as a function of the current
  * concurrency window: constant, 1/window, or (only when the sqrt mode
  * above is enabled) 1/sqrt(window).
  */
#ifndef QMGR_FEEDBACK_IDX_SQRT_WIN
#define QMGR_FEEDBACK_VAL(fb, win) \
    ((fb).index == QMGR_FEEDBACK_IDX_NONE ? (fb).base : (fb).base / (win))
#else
#define QMGR_FEEDBACK_VAL(fb, win) \
    ((fb).index == QMGR_FEEDBACK_IDX_NONE ? (fb).base : \
    (fb).index == QMGR_FEEDBACK_IDX_WIN ? (fb).base / (win) : \
    (fb).base / sqrt(win))
#endif
154
155 /*
156  * Each transport (local, smtp-out, bounce) can have one queue per next hop
157  * name. Queues are looked up by next hop name (when we have resolved a
158  * message destination), or round-robin wise (when we want to deliver
159  * messages fairly).
160  */
struct QMGR_QUEUE_LIST {
    QMGR_QUEUE *next;			/* first element (null when empty) */
    QMGR_QUEUE *prev;			/* last element (null when empty) */
};

struct QMGR_JOB_LIST {
    QMGR_JOB *next;			/* first element (null when empty) */
    QMGR_JOB *prev;			/* last element (null when empty) */
};
170
 /*
  * Per-transport state: configured concurrency/recipient limits, the
  * round-robin list and by-name table of next-hop queues, the preemptive
  * scheduler's job lists and caches, and throttling state (dsn, feedback
  * controls).
  */
struct QMGR_TRANSPORT {
    int     flags;			/* blocked, etc. */
    int     pending;			/* incomplete DA connections */
    char   *name;			/* transport name */
    int     dest_concurrency_limit;	/* concurrency per domain */
    int     init_dest_concurrency;	/* init. per-domain concurrency */
    int     recipient_limit;		/* recipients per transaction */
    int     rcpt_per_stack;		/* extra slots reserved for jobs put
					 * on the job stack */
    int     rcpt_unused;		/* available in-core recipient slots */
    int     refill_limit;		/* recipient batch size for message
					 * refill */
    int     refill_delay;		/* delay before message refill */
    int     slot_cost;			/* cost of new preemption slot (# of
					 * selected entries) */
    int     slot_loan;			/* preemption boost offset and */
    int     slot_loan_factor;		/* factor, see qmgr_job_preempt() */
    int     min_slots;			/* when preemption can take effect at
					 * all */
    struct HTABLE *queue_byname;	/* queues indexed by domain */
    QMGR_QUEUE_LIST queue_list;		/* queues, round robin order */
    struct HTABLE *job_byname;		/* jobs indexed by queue id */
    QMGR_JOB_LIST job_list;		/* list of message jobs (1 per
					 * message) ordered by scheduler */
    QMGR_JOB_LIST job_bytime;		/* jobs ordered by time since queued */
    QMGR_JOB *job_current;		/* keeps track of the current job */
    QMGR_JOB *job_next_unread;		/* next job with unread recipients */
    QMGR_JOB *candidate_cache;		/* cached result from
					 * qmgr_job_candidate() */
    QMGR_JOB *candidate_cache_current;	/* current job tied to the candidate */
    time_t  candidate_cache_time;	/* when candidate_cache was last
					 * updated */
    int     blocker_tag;		/* for marking blocker jobs */
    QMGR_TRANSPORT_LIST peers;		/* linkage */
    DSN    *dsn;			/* why unavailable */
    QMGR_FEEDBACK pos_feedback;		/* positive feedback control */
    QMGR_FEEDBACK neg_feedback;		/* negative feedback control */
    int     fail_cohort_limit;		/* flow shutdown control */
    int     rate_delay;			/* suspend per delivery */
};
211
#define QMGR_TRANSPORT_STAT_DEAD	(1<<1)	/* unavailable; tested by
						 * QMGR_TRANSPORT_THROTTLED() */

 /*
  * Transport primitives: create/find by name, round-robin selection,
  * delivery agent allocation (the notify callback receives the agent
  * stream), and throttle/unthrottle with a DSN recording why.
  */
typedef void (*QMGR_TRANSPORT_ALLOC_NOTIFY) (QMGR_TRANSPORT *, VSTREAM *);
extern QMGR_TRANSPORT *qmgr_transport_select(void);
extern void qmgr_transport_alloc(QMGR_TRANSPORT *, QMGR_TRANSPORT_ALLOC_NOTIFY);
extern void qmgr_transport_throttle(QMGR_TRANSPORT *, DSN *);
extern void qmgr_transport_unthrottle(QMGR_TRANSPORT *);
extern QMGR_TRANSPORT *qmgr_transport_create(const char *);
extern QMGR_TRANSPORT *qmgr_transport_find(const char *);

#define QMGR_TRANSPORT_THROTTLED(t)	((t)->flags & QMGR_TRANSPORT_STAT_DEAD)
223
224 /*
225  * Each next hop (e.g., a domain name) has its own queue of pending message
226  * transactions. The "todo" queue contains messages that are to be delivered
227  * to this next hop. When a message is elected for transmission, it is moved
228  * from the "todo" queue to the "busy" queue. Messages are taken from the
229  * "todo" queue in round-robin order.
230  */
struct QMGR_ENTRY_LIST {
    QMGR_ENTRY *next;			/* first element (null when empty) */
    QMGR_ENTRY *prev;			/* last element (null when empty) */
};
235
 /*
  * Per next-hop queue state. Note that the "window" field does double
  * duty: values > 0 are the concurrency window of the slow-open
  * algorithm, while 0 and negative values encode the exclusive
  * QMGR_QUEUE_STAT_* states defined further below.
  */
struct QMGR_QUEUE {
    int     dflags;			/* delivery request options */
    time_t  last_done;			/* last delivery completion */
    char   *name;			/* domain name or address */
    char   *nexthop;			/* domain name */
    int     todo_refcount;		/* queue entries (todo list) */
    int     busy_refcount;		/* queue entries (busy list) */
    int     window;			/* slow open algorithm; <= 0 encodes
					 * the QMGR_QUEUE_STAT_* states */
    double  success;			/* accumulated positive feedback */
    double  failure;			/* accumulated negative feedback */
    double  fail_cohorts;		/* pseudo-cohort failure count */
    QMGR_TRANSPORT *transport;		/* transport linkage */
    QMGR_ENTRY_LIST todo;		/* todo queue entries */
    QMGR_ENTRY_LIST busy;		/* messages on the wire */
    QMGR_QUEUE_LIST peers;		/* neighbor queues */
    DSN    *dsn;			/* why unavailable */
    time_t  clog_time_to_warn;		/* time of last warning */
    int     blocker_tag;		/* tagged if blocks job list */
};
255
#define	QMGR_QUEUE_TODO	1		/* waiting for service */
#define QMGR_QUEUE_BUSY	2		/* recipients on the wire */

 /*
  * Queue primitives: create/find a queue within a transport, throttle
  * with a DSN recording why, unthrottle, and suspend (the int argument is
  * presumably a delay -- NOTE(review): confirm units in qmgr_queue.c).
  */
extern int qmgr_queue_count;

extern QMGR_QUEUE *qmgr_queue_create(QMGR_TRANSPORT *, const char *, const char *);
extern void qmgr_queue_done(QMGR_QUEUE *);
extern void qmgr_queue_throttle(QMGR_QUEUE *, DSN *);
extern void qmgr_queue_unthrottle(QMGR_QUEUE *);
extern QMGR_QUEUE *qmgr_queue_find(QMGR_TRANSPORT *, const char *);
extern void qmgr_queue_suspend(QMGR_QUEUE *, int);

 /*
  * Exclusive queue states. Originally there were only two: "throttled" and
  * "not throttled". It was natural to encode these in the queue window size.
  * After 10 years it's not practical to rip out all the working code and
  * change representations, so we just clean up the names a little.
  *
  * Note: only the "ready" state can reach every state (including itself);
  * non-ready states can reach only the "ready" state. Other transitions are
  * forbidden, because they would result in dangling event handlers.
  */
#define QMGR_QUEUE_STAT_THROTTLED	0	/* back-off timer */
#define QMGR_QUEUE_STAT_SUSPENDED	-1	/* voluntary delay timer */
#define QMGR_QUEUE_STAT_SAVED		-2	/* delayed cleanup timer */
#define QMGR_QUEUE_STAT_BAD		-3	/* can't happen */

#define QMGR_QUEUE_READY(q)	((q)->window > 0)
#define QMGR_QUEUE_THROTTLED(q)	((q)->window == QMGR_QUEUE_STAT_THROTTLED)
#define QMGR_QUEUE_SUSPENDED(q)	((q)->window == QMGR_QUEUE_STAT_SUSPENDED)
#define QMGR_QUEUE_SAVED(q)	((q)->window == QMGR_QUEUE_STAT_SAVED)
#define QMGR_QUEUE_BAD(q)	((q)->window <= QMGR_QUEUE_STAT_BAD)

#define QMGR_QUEUE_STATUS(q) ( \
	    QMGR_QUEUE_READY(q) ? "ready" : \
	    QMGR_QUEUE_THROTTLED(q) ? "throttled" : \
	    QMGR_QUEUE_SUSPENDED(q) ? "suspended" : \
	    QMGR_QUEUE_SAVED(q) ? "saved" : \
	    "invalid queue status" \
	)
296
297 /*
298  * Structure of one next-hop queue entry. In order to save some copying
299  * effort we allow multiple recipients per transaction.
300  */
 /*
  * One delivery transaction: a batch of recipients of one message headed
  * for one next-hop queue, linked both on that queue's entry lists and on
  * the owning job's per-peer entry list.
  */
struct QMGR_ENTRY {
    VSTREAM *stream;			/* delivery process */
    QMGR_MESSAGE *message;		/* message info */
    RECIPIENT_LIST rcpt_list;		/* as many as it takes */
    QMGR_QUEUE *queue;			/* parent linkage */
    QMGR_PEER *peer;			/* parent linkage */
    QMGR_ENTRY_LIST queue_peers;	/* per queue neighbor entries */
    QMGR_ENTRY_LIST peer_peers;		/* per peer neighbor entries */
};
310
 /*
  * Entry primitives: create/select/unselect an entry, move an entry back
  * to another queue's todo list, and retire a finished entry (the int
  * argument of qmgr_entry_done() is presumably the QMGR_QUEUE_TODO/BUSY
  * list the entry is on -- NOTE(review): verify in qmgr_entry.c).
  */
extern QMGR_ENTRY *qmgr_entry_select(QMGR_PEER *);
extern void qmgr_entry_unselect(QMGR_ENTRY *);
extern void qmgr_entry_move_todo(QMGR_QUEUE *, QMGR_ENTRY *);
extern void qmgr_entry_done(QMGR_ENTRY *, int);
extern QMGR_ENTRY *qmgr_entry_create(QMGR_PEER *, QMGR_MESSAGE *);
316
317 /*
318  * All common in-core information about a message is kept here. When all
319  * recipients have been tried the message file is linked to the "deferred"
320  * queue (some hosts not reachable), to the "bounce" queue (some recipients
321  * were rejected), and is then removed from the "active" queue.
322  */
 /*
  * In-core message state: envelope, client and SASL attributes cached
  * from the queue file, plus bookkeeping for the sliding window of
  * in-core recipients (rcpt_list/rcpt_offset/rcpt_unread) and the
  * per-transport jobs that deliver this message.
  */
struct QMGR_MESSAGE {
    int     flags;			/* delivery problems */
    int     qflags;			/* queuing flags */
    int     tflags;			/* tracing flags */
    long    tflags_offset;		/* offset for killing */
    int     rflags;			/* queue file read flags */
    VSTREAM *fp;			/* open queue file or null */
    int     refcount;			/* queue entries */
    int     single_rcpt;		/* send one rcpt at a time */
    struct timeval arrival_time;	/* start of receive transaction */
    time_t  create_time;		/* queue file create time */
    struct timeval active_time;		/* time of entry into active queue */
    time_t  queued_time;		/* sanitized time when moved to the
					 * active queue */
    time_t  refill_time;		/* sanitized time of last message
					 * refill */
    long    warn_offset;		/* warning bounce flag offset */
    time_t  warn_time;			/* time next warning to be sent */
    long    data_offset;		/* data seek offset */
    char   *queue_name;			/* queue name */
    char   *queue_id;			/* queue file */
    char   *encoding;			/* content encoding */
    char   *sender;			/* complete address */
    char   *dsn_envid;			/* DSN envelope ID */
    int     dsn_ret;			/* DSN headers/full */
    char   *verp_delims;		/* VERP delimiters */
    char   *filter_xport;		/* filtering transport */
    char   *inspect_xport;		/* inspecting transport */
    char   *redirect_addr;		/* info@spammer.tld */
    long    data_size;			/* data segment size */
    long    cont_length;		/* message content length */
    long    rcpt_offset;		/* more recipients here */
    char   *client_name;		/* client hostname */
    char   *client_addr;		/* client address */
    char   *client_port;		/* client port */
    char   *client_proto;		/* client protocol */
    char   *client_helo;		/* helo parameter */
    char   *sasl_method;		/* SASL method */
    char   *sasl_username;		/* SASL user name */
    char   *sasl_sender;		/* SASL sender */
    char   *log_ident;			/* up-stream queue ID */
    char   *rewrite_context;		/* address qualification */
    RECIPIENT_LIST rcpt_list;		/* complete addresses */
    int     rcpt_count;			/* used recipient slots */
    int     rcpt_limit;			/* maximum read in-core */
    int     rcpt_unread;		/* # of recipients left in queue file */
    QMGR_JOB_LIST job_list;		/* jobs delivering this message (1
					 * per transport) */
};
372
373 /*
374  * Flags 0-15 are reserved for qmgr_user.h.
375  */
#define QMGR_READ_FLAG_SEEN_ALL_NON_RCPT	(1<<16)

 /*
  * Distinguished non-null sentinel; never a valid QMGR_MESSAGE pointer.
  */
#define QMGR_MESSAGE_LOCKED	((QMGR_MESSAGE *) 1)

extern int qmgr_message_count;
extern int qmgr_recipient_count;

 /*
  * Message primitives: allocate/reallocate/free the in-core message, and
  * update or kill queue file records at saved offsets.
  */
extern void qmgr_message_free(QMGR_MESSAGE *);
extern void qmgr_message_update_warn(QMGR_MESSAGE *);
extern void qmgr_message_kill_record(QMGR_MESSAGE *, long);
extern QMGR_MESSAGE *qmgr_message_alloc(const char *, const char *, int, mode_t);
extern QMGR_MESSAGE *qmgr_message_realloc(QMGR_MESSAGE *);

 /*
  * Fill in a MSG_STATS structure from the message's arrival and
  * active-queue-entry time stamps.
  */
#define QMGR_MSG_STATS(stats, message) \
    MSG_STATS_INIT2(stats, \
                    incoming_arrival, message->arrival_time, \
                    active_arrival, message->active_time)
393
394 /*
395  * Sometimes it's required to access the transport queues and entries on per
396  * message basis. That's what the QMGR_JOB structure is for - it groups all
397  * per message information within each transport using a list of QMGR_PEER
398  * structures. These structures in turn correspond with per message
399  * QMGR_QUEUE structure and list all per message QMGR_ENTRY structures.
400  */
struct QMGR_PEER_LIST {
    QMGR_PEER *next;			/* first element (null when empty) */
    QMGR_PEER *prev;			/* last element (null when empty) */
};
405
 /*
  * One job groups one message's deliveries via one transport. Jobs are
  * linked per message, per transport (in scheduler order and in
  * time-since-queued order), and the stack_* members implement the
  * preemption nesting used by the scheduler.
  */
struct QMGR_JOB {
    QMGR_MESSAGE *message;		/* message delivered by this job */
    QMGR_TRANSPORT *transport;		/* transport this job belongs to */
    QMGR_JOB_LIST message_peers;	/* per message neighbor linkage */
    QMGR_JOB_LIST transport_peers;	/* per transport neighbor linkage */
    QMGR_JOB_LIST time_peers;		/* by time neighbor linkage */
    QMGR_JOB *stack_parent;		/* stack parent */
    QMGR_JOB_LIST stack_children;	/* all stack children */
    QMGR_JOB_LIST stack_siblings;	/* stack children linkage */
    int     stack_level;		/* job stack nesting level (-1 means
					 * it's not on the lists at all) */
    int     blocker_tag;		/* tagged if blocks the job list */
    struct HTABLE *peer_byname;		/* message job peers, indexed by
					 * domain */
    QMGR_PEER_LIST peer_list;		/* list of message job peers */
    int     slots_used;			/* slots used during preemption */
    int     slots_available;		/* slots available for preemption (in
					 * multiples of slot_cost) */
    int     selected_entries;		/* # of entries selected for delivery
					 * so far */
    int     read_entries;		/* # of entries read in-core so far */
    int     rcpt_count;			/* used recipient slots */
    int     rcpt_limit;			/* available recipient slots */
};
430
 /*
  * Per-job view of one next-hop queue: the subset of the job's todo
  * entries bound for that queue, plus linkage among the job's peers.
  */
struct QMGR_PEER {
    QMGR_JOB *job;			/* job handling this peer */
    QMGR_QUEUE *queue;			/* queue corresponding with this peer */
    int     refcount;			/* peer entries */
    QMGR_ENTRY_LIST entry_list;		/* todo message entries queued for
					 * this peer */
    QMGR_PEER_LIST peers;		/* neighbor linkage */
};
439
 /*
  * Job and peer primitives: per-transport entry selection for the
  * scheduler, blocker-tag maintenance when a queue unclogs, job
  * obtain/free, recipient-slot limit redistribution, and peer
  * create/find/obtain/free.
  */
extern QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *);
extern QMGR_PEER *qmgr_peer_select(QMGR_JOB *);
extern void qmgr_job_blocker_update(QMGR_QUEUE *);

extern QMGR_JOB *qmgr_job_obtain(QMGR_MESSAGE *, QMGR_TRANSPORT *);
extern void qmgr_job_free(QMGR_JOB *);
extern void qmgr_job_move_limits(QMGR_JOB *);

extern QMGR_PEER *qmgr_peer_create(QMGR_JOB *, QMGR_QUEUE *);
extern QMGR_PEER *qmgr_peer_find(QMGR_JOB *, QMGR_QUEUE *);
extern QMGR_PEER *qmgr_peer_obtain(QMGR_JOB *, QMGR_QUEUE *);
extern void qmgr_peer_free(QMGR_PEER *);
452
453 /*
454  * qmgr_defer.c
455  */
456extern void qmgr_defer_transport(QMGR_TRANSPORT *, DSN *);
457extern void qmgr_defer_todo(QMGR_QUEUE *, DSN *);
458extern void qmgr_defer_recipient(QMGR_MESSAGE *, RECIPIENT *, DSN *);
459
460 /*
461  * qmgr_bounce.c
462  */
463extern void qmgr_bounce_recipient(QMGR_MESSAGE *, RECIPIENT *, DSN *);
464
465 /*
466  * qmgr_deliver.c
467  */
468extern int qmgr_deliver_concurrency;
469extern void qmgr_deliver(QMGR_TRANSPORT *, VSTREAM *);
470
471 /*
472  * qmgr_active.c
473  */
474extern int qmgr_active_feed(QMGR_SCAN *, const char *);
475extern void qmgr_active_drain(void);
476extern void qmgr_active_done(QMGR_MESSAGE *);
477
478 /*
479  * qmgr_move.c
480  */
481extern void qmgr_move(const char *, const char *, time_t);
482
483 /*
484  * qmgr_enable.c
485  */
486extern void qmgr_enable_all(void);
487extern void qmgr_enable_transport(QMGR_TRANSPORT *);
488extern void qmgr_enable_queue(QMGR_QUEUE *);
489
490 /*
491  * Queue scan context.
492  */
struct QMGR_SCAN {
    char   *queue;			/* queue name */
    int     flags;			/* private, this run (QMGR_SCAN_* and
					 * QMGR_FLUSH_* bits) */
    int     nflags;			/* private, next run */
    struct SCAN_DIR *handle;		/* scan */
};
499
500 /*
501  * Flags that control queue scans or destination selection. These are
502  * similar to the QMGR_REQ_XXX request codes.
503  */
#define QMGR_SCAN_START	(1<<0)		/* start now/restart when done */
#define QMGR_SCAN_ALL	(1<<1)		/* all queue file time stamps */
#define QMGR_FLUSH_ONCE	(1<<2)		/* unthrottle once */
#define QMGR_FLUSH_DFXP	(1<<3)		/* override defer_transports */
#define QMGR_FLUSH_EACH	(1<<4)		/* unthrottle per message */

 /*
  * qmgr_scan.c - queue directory scanning. qmgr_scan_next() presumably
  * yields the next queue file name, null at the end of a scan --
  * NOTE(review): verify in qmgr_scan.c.
  */
extern QMGR_SCAN *qmgr_scan_create(const char *);
extern void qmgr_scan_request(QMGR_SCAN *, int);
extern char *qmgr_scan_next(QMGR_SCAN *);

 /*
  * qmgr_error.c - error transport/queue helpers; semantics inferred from
  * names (NOTE(review): confirm in qmgr_error.c).
  */
extern QMGR_TRANSPORT *qmgr_error_transport(const char *);
extern QMGR_QUEUE *qmgr_error_queue(const char *, DSN *);
extern char *qmgr_error_nexthop(DSN *);
523
524/* LICENSE
525/* .ad
526/* .fi
527/*	The Secure Mailer license must be distributed with this software.
528/* AUTHOR(S)
529/*	Wietse Venema
530/*	IBM T.J. Watson Research
531/*	P.O. Box 704
532/*	Yorktown Heights, NY 10598, USA
533/*
534/*	Preemptive scheduler enhancements:
535/*	Patrik Rak
536/*	Modra 6
537/*	155 00, Prague, Czech Republic
538/*--*/
539