1/*++ 2/* NAME 3/* qmgr 3h 4/* SUMMARY 5/* queue manager data structures 6/* SYNOPSIS 7/* #include "qmgr.h" 8/* DESCRIPTION 9/* .nf 10 11 /* 12 * System library. 13 */ 14#include <sys/time.h> 15#include <time.h> 16 17 /* 18 * Utility library. 19 */ 20#include <vstream.h> 21#include <scan_dir.h> 22 23 /* 24 * Global library. 25 */ 26#include <recipient_list.h> 27#include <dsn.h> 28 29 /* 30 * The queue manager is built around lots of mutually-referring structures. 31 * These typedefs save some typing. 32 */ 33typedef struct QMGR_TRANSPORT QMGR_TRANSPORT; 34typedef struct QMGR_QUEUE QMGR_QUEUE; 35typedef struct QMGR_ENTRY QMGR_ENTRY; 36typedef struct QMGR_MESSAGE QMGR_MESSAGE; 37typedef struct QMGR_TRANSPORT_LIST QMGR_TRANSPORT_LIST; 38typedef struct QMGR_QUEUE_LIST QMGR_QUEUE_LIST; 39typedef struct QMGR_ENTRY_LIST QMGR_ENTRY_LIST; 40typedef struct QMGR_SCAN QMGR_SCAN; 41typedef struct QMGR_FEEDBACK QMGR_FEEDBACK; 42 43 /* 44 * Hairy macros to update doubly-linked lists. 45 */ 46#define QMGR_LIST_ROTATE(head, object) { \ 47 head.next->peers.prev = head.prev; \ 48 head.prev->peers.next = head.next; \ 49 head.next = object->peers.next; \ 50 if (object->peers.next) \ 51 head.next->peers.prev = 0; \ 52 head.prev = object; \ 53 object->peers.next = 0; \ 54} 55 56#define QMGR_LIST_UNLINK(head, type, object) { \ 57 type next = object->peers.next; \ 58 type prev = object->peers.prev; \ 59 if (prev) prev->peers.next = next; \ 60 else head.next = next; \ 61 if (next) next->peers.prev = prev; \ 62 else head.prev = prev; \ 63 object->peers.next = object->peers.prev = 0; \ 64} 65 66#define QMGR_LIST_APPEND(head, object) { \ 67 object->peers.next = head.next; \ 68 object->peers.prev = 0; \ 69 if (head.next) { \ 70 head.next->peers.prev = object; \ 71 } else { \ 72 head.prev = object; \ 73 } \ 74 head.next = object; \ 75} 76 77#define QMGR_LIST_PREPEND(head, object) { \ 78 object->peers.prev = head.prev; \ 79 object->peers.next = 0; \ 80 if (head.prev) { \ 81 head.prev->peers.next = object; \ 82 } else { \ 83 head.next = object; \ 84 } \ 85 head.prev = object; \ 86} 87 88#define QMGR_LIST_INIT(head) { \ 89 head.prev = 0; \ 90 head.next = 0; \ 91} 92 93 /* 94 * Transports are looked up by name (when we have resolved a message), or 95 * round-robin wise (when we want to distribute resources fairly). 96 */ 97struct QMGR_TRANSPORT_LIST { 98 QMGR_TRANSPORT *next; 99 QMGR_TRANSPORT *prev; 100}; 101 102extern struct HTABLE *qmgr_transport_byname; /* transport by name */ 103extern QMGR_TRANSPORT_LIST qmgr_transport_list; /* transports, round robin */ 104 105 /* 106 * Delivery agents provide feedback, as hints that Postfix should expend 107 * more or fewer resources on a specific destination domain. The main.cf 108 * file specifies how feedback affects delivery concurrency: add/subtract a 109 * constant, a ratio of constants, or a constant divided by the delivery 110 * concurrency; and it specifies how much feedback must accumulate between 111 * concurrency updates. 112 */ 113struct QMGR_FEEDBACK { 114 int hysteresis; /* to pass, need to be this tall */ 115 double base; /* pre-computed from main.cf */ 116 int index; /* none, window, sqrt(window) */ 117}; 118 119#define QMGR_FEEDBACK_IDX_NONE 0 /* no window dependence */ 120#define QMGR_FEEDBACK_IDX_WIN 1 /* 1/window dependence */ 121#if 0 122#define QMGR_FEEDBACK_IDX_SQRT_WIN 2 /* 1/sqrt(window) dependence */ 123#endif 124 125#ifdef QMGR_FEEDBACK_IDX_SQRT_WIN 126#include <math.h> 127#endif 128 129extern void qmgr_feedback_init(QMGR_FEEDBACK *, const char *, const char *, const char *, const char *); 130 131#ifndef QMGR_FEEDBACK_IDX_SQRT_WIN 132#define QMGR_FEEDBACK_VAL(fb, win) \ 133 ((fb).index == QMGR_FEEDBACK_IDX_NONE ? (fb).base : (fb).base / (win)) 134#else 135#define QMGR_FEEDBACK_VAL(fb, win) \ 136 ((fb).index == QMGR_FEEDBACK_IDX_NONE ? (fb).base : \ 137 (fb).index == QMGR_FEEDBACK_IDX_WIN ? (fb).base / (win) : \ 138 (fb).base / sqrt(win)) 139#endif 140 141 /* 142 * Each transport (local, smtp-out, bounce) can have one queue per next hop 143 * name. Queues are looked up by next hop name (when we have resolved a 144 * message destination), or round-robin wise (when we want to deliver 145 * messages fairly). 146 */ 147struct QMGR_QUEUE_LIST { 148 QMGR_QUEUE *next; 149 QMGR_QUEUE *prev; 150}; 151 152struct QMGR_TRANSPORT { 153 int flags; /* blocked, etc. */ 154 int pending; /* incomplete DA connections */ 155 char *name; /* transport name */ 156 int dest_concurrency_limit; /* concurrency per domain */ 157 int init_dest_concurrency; /* init. per-domain concurrency */ 158 int recipient_limit; /* recipients per transaction */ 159 struct HTABLE *queue_byname; /* queues indexed by domain */ 160 QMGR_QUEUE_LIST queue_list; /* queues, round robin order */ 161 QMGR_TRANSPORT_LIST peers; /* linkage */ 162 DSN *dsn; /* why unavailable */ 163 QMGR_FEEDBACK pos_feedback; /* positive feedback control */ 164 QMGR_FEEDBACK neg_feedback; /* negative feedback control */ 165 int fail_cohort_limit; /* flow shutdown control */ 166 int rate_delay; /* suspend per delivery */ 167}; 168 169#define QMGR_TRANSPORT_STAT_DEAD (1<<1) 170 171typedef void (*QMGR_TRANSPORT_ALLOC_NOTIFY) (QMGR_TRANSPORT *, VSTREAM *); 172extern QMGR_TRANSPORT *qmgr_transport_select(void); 173extern void qmgr_transport_alloc(QMGR_TRANSPORT *, QMGR_TRANSPORT_ALLOC_NOTIFY); 174extern void qmgr_transport_throttle(QMGR_TRANSPORT *, DSN *); 175extern void qmgr_transport_unthrottle(QMGR_TRANSPORT *); 176extern QMGR_TRANSPORT *qmgr_transport_create(const char *); 177extern QMGR_TRANSPORT *qmgr_transport_find(const char *); 178 179#define QMGR_TRANSPORT_THROTTLED(t) ((t)->flags & QMGR_TRANSPORT_STAT_DEAD) 180 181 /* 182 * Each next hop (e.g., a domain name) has its own queue of pending message 183 * transactions. The "todo" queue contains messages that are to be delivered 184 * to this next hop. When a message is elected for transmission, it is moved 185 * from the "todo" queue to the "busy" queue. Messages are taken from the 186 * "todo" queue in sequence. An initial destination delivery concurrency > 1 187 * ensures that one problematic message will not block all other traffic to 188 * that next hop. 189 */ 190struct QMGR_ENTRY_LIST { 191 QMGR_ENTRY *next; 192 QMGR_ENTRY *prev; 193}; 194 195struct QMGR_QUEUE { 196 int dflags; /* delivery request options */ 197 time_t last_done; /* last delivery completion */ 198 char *name; /* domain name or address */ 199 char *nexthop; /* domain name */ 200 int todo_refcount; /* queue entries (todo list) */ 201 int busy_refcount; /* queue entries (busy list) */ 202 int window; /* slow open algorithm */ 203 double success; /* accumulated positive feedback */ 204 double failure; /* accumulated negative feedback */ 205 double fail_cohorts; /* pseudo-cohort failure count */ 206 QMGR_TRANSPORT *transport; /* transport linkage */ 207 QMGR_ENTRY_LIST todo; /* todo queue entries */ 208 QMGR_ENTRY_LIST busy; /* messages on the wire */ 209 QMGR_QUEUE_LIST peers; /* neighbor queues */ 210 DSN *dsn; /* why unavailable */ 211 time_t clog_time_to_warn; /* time of next warning */ 212}; 213 214#define QMGR_QUEUE_TODO 1 /* waiting for service */ 215#define QMGR_QUEUE_BUSY 2 /* recipients on the wire */ 216 217extern int qmgr_queue_count; 218 219extern QMGR_QUEUE *qmgr_queue_create(QMGR_TRANSPORT *, const char *, const char *); 220extern QMGR_QUEUE *qmgr_queue_select(QMGR_TRANSPORT *); 221extern void qmgr_queue_done(QMGR_QUEUE *); 222extern void qmgr_queue_throttle(QMGR_QUEUE *, DSN *); 223extern void qmgr_queue_unthrottle(QMGR_QUEUE *); 224extern QMGR_QUEUE *qmgr_queue_find(QMGR_TRANSPORT *, const char *); 225extern void qmgr_queue_suspend(QMGR_QUEUE *, int); 226 227 /* 228 * Exclusive queue states. Originally there were only two: "throttled" and 229 * "not throttled". It was natural to encode these in the queue window size. 230 * After 10 years it's not practical to rip out all the working code and 231 * change representations, so we just clean up the names a little. 232 * 233 * Note: only the "ready" state can reach every state (including itself); 234 * non-ready states can reach only the "ready" state. Other transitions are 235 * forbidden, because they would result in dangling event handlers. 236 */ 237#define QMGR_QUEUE_STAT_THROTTLED 0 /* back-off timer */ 238#define QMGR_QUEUE_STAT_SUSPENDED -1 /* voluntary delay timer */ 239#define QMGR_QUEUE_STAT_SAVED -2 /* delayed cleanup timer */ 240#define QMGR_QUEUE_STAT_BAD -3 /* can't happen */ 241 242#define QMGR_QUEUE_READY(q) ((q)->window > 0) 243#define QMGR_QUEUE_THROTTLED(q) ((q)->window == QMGR_QUEUE_STAT_THROTTLED) 244#define QMGR_QUEUE_SUSPENDED(q) ((q)->window == QMGR_QUEUE_STAT_SUSPENDED) 245#define QMGR_QUEUE_SAVED(q) ((q)->window == QMGR_QUEUE_STAT_SAVED) 246#define QMGR_QUEUE_BAD(q) ((q)->window <= QMGR_QUEUE_STAT_BAD) 247 248#define QMGR_QUEUE_STATUS(q) ( \ 249 QMGR_QUEUE_READY(q) ? "ready" : \ 250 QMGR_QUEUE_THROTTLED(q) ? "throttled" : \ 251 QMGR_QUEUE_SUSPENDED(q) ? "suspended" : \ 252 QMGR_QUEUE_SAVED(q) ? "saved" : \ 253 "invalid queue status" \ 254 ) 255 256 /* 257 * Structure of one next-hop queue entry. In order to save some copying 258 * effort we allow multiple recipients per transaction. 259 */ 260struct QMGR_ENTRY { 261 VSTREAM *stream; /* delivery process */ 262 QMGR_MESSAGE *message; /* message info */ 263 RECIPIENT_LIST rcpt_list; /* as many as it takes */ 264 QMGR_QUEUE *queue; /* parent linkage */ 265 QMGR_ENTRY_LIST peers; /* neighbor entries */ 266}; 267 268extern QMGR_ENTRY *qmgr_entry_select(QMGR_QUEUE *); 269extern void qmgr_entry_unselect(QMGR_QUEUE *, QMGR_ENTRY *); 270extern void qmgr_entry_move_todo(QMGR_QUEUE *, QMGR_ENTRY *); 271extern void qmgr_entry_done(QMGR_ENTRY *, int); 272extern QMGR_ENTRY *qmgr_entry_create(QMGR_QUEUE *, QMGR_MESSAGE *); 273 274 /* 275 * All common in-core information about a message is kept here. When all 276 * recipients have been tried the message file is linked to the "deferred" 277 * queue (some hosts not reachable), to the "bounce" queue (some recipients 278 * were rejected), and is then removed from the "active" queue. 279 */ 280struct QMGR_MESSAGE { 281 int flags; /* delivery problems */ 282 int qflags; /* queuing flags */ 283 int tflags; /* tracing flags */ 284 long tflags_offset; /* offset for killing */ 285 int rflags; /* queue file read flags */ 286 VSTREAM *fp; /* open queue file or null */ 287 int refcount; /* queue entries */ 288 int single_rcpt; /* send one rcpt at a time */ 289 struct timeval arrival_time; /* start of receive transaction */ 290 time_t create_time; /* queue file create time */ 291 struct timeval active_time; /* time of entry into active queue */ 292 long warn_offset; /* warning bounce flag offset */ 293 time_t warn_time; /* time next warning to be sent */ 294 long data_offset; /* data seek offset */ 295 char *queue_name; /* queue name */ 296 char *queue_id; /* queue file */ 297 char *encoding; /* content encoding */ 298 char *sender; /* complete address */ 299 char *dsn_envid; /* DSN envelope ID */ 300 int dsn_ret; /* DSN headers/full */ 301 char *verp_delims; /* VERP delimiters */ 302 char *filter_xport; /* filtering transport */ 303 char *inspect_xport; /* inspecting transport */ 304 char *redirect_addr; /* info@spammer.tld */ 305 long data_size; /* data segment size */ 306 long cont_length; /* message content length */ 307 long rcpt_offset; /* more recipients here */ 308 char *client_name; /* client hostname */ 309 char *client_addr; /* client address */ 310 char *client_port; /* client port */ 311 char *client_proto; /* client protocol */ 312 char *client_helo; /* helo parameter */ 313 char *sasl_method; /* SASL method */ 314 char *sasl_username; /* SASL user name */ 315 char *sasl_sender; /* SASL sender */ 316 char *log_ident; /* up-stream queue ID */ 317 char *rewrite_context; /* address qualification */ 318 RECIPIENT_LIST rcpt_list; /* complete addresses */ 319}; 320 321 /* 322 * Flags 0-15 are reserved for qmgr_user.h. 323 */ 324#define QMGR_READ_FLAG_SEEN_ALL_NON_RCPT (1<<16) 325 326#define QMGR_MESSAGE_LOCKED ((QMGR_MESSAGE *) 1) 327 328extern int qmgr_message_count; 329extern int qmgr_recipient_count; 330 331extern void qmgr_message_free(QMGR_MESSAGE *); 332extern void qmgr_message_update_warn(QMGR_MESSAGE *); 333extern void qmgr_message_kill_record(QMGR_MESSAGE *, long); 334extern QMGR_MESSAGE *qmgr_message_alloc(const char *, const char *, int, mode_t); 335extern QMGR_MESSAGE *qmgr_message_realloc(QMGR_MESSAGE *); 336 337#define QMGR_MSG_STATS(stats, message) \ 338 MSG_STATS_INIT2(stats, \ 339 incoming_arrival, message->arrival_time, \ 340 active_arrival, message->active_time) 341 342 /* 343 * qmgr_defer.c 344 */ 345extern void qmgr_defer_transport(QMGR_TRANSPORT *, DSN *); 346extern void qmgr_defer_todo(QMGR_QUEUE *, DSN *); 347extern void qmgr_defer_recipient(QMGR_MESSAGE *, RECIPIENT *, DSN *); 348 349 /* 350 * qmgr_bounce.c 351 */ 352extern void qmgr_bounce_recipient(QMGR_MESSAGE *, RECIPIENT *, DSN *); 353 354 /* 355 * qmgr_deliver.c 356 */ 357extern int qmgr_deliver_concurrency; 358extern void qmgr_deliver(QMGR_TRANSPORT *, VSTREAM *); 359 360 /* 361 * qmgr_active.c 362 */ 363extern int qmgr_active_feed(QMGR_SCAN *, const char *); 364extern void qmgr_active_drain(void); 365extern void qmgr_active_done(QMGR_MESSAGE *); 366 367 /* 368 * qmgr_move.c 369 */ 370extern void qmgr_move(const char *, const char *, time_t); 371 372 /* 373 * qmgr_enable.c 374 */ 375extern void qmgr_enable_all(void); 376extern void qmgr_enable_transport(QMGR_TRANSPORT *); 377extern void qmgr_enable_queue(QMGR_QUEUE *); 378 379 /* 380 * Queue scan context. 381 */ 382struct QMGR_SCAN { 383 char *queue; /* queue name */ 384 int flags; /* private, this run */ 385 int nflags; /* private, next run */ 386 struct SCAN_DIR *handle; /* scan */ 387}; 388 389 /* 390 * Flags that control queue scans or destination selection. These are 391 * similar to the QMGR_REQ_XXX request codes. 392 */ 393#define QMGR_SCAN_START (1<<0) /* start now/restart when done */ 394#define QMGR_SCAN_ALL (1<<1) /* all queue file time stamps */ 395#define QMGR_FLUSH_ONCE (1<<2) /* unthrottle once */ 396#define QMGR_FLUSH_DFXP (1<<3) /* override defer_transports */ 397#define QMGR_FLUSH_EACH (1<<4) /* unthrottle per message */ 398 399 /* 400 * qmgr_scan.c 401 */ 402extern QMGR_SCAN *qmgr_scan_create(const char *); 403extern void qmgr_scan_request(QMGR_SCAN *, int); 404extern char *qmgr_scan_next(QMGR_SCAN *); 405 406 /* 407 * qmgr_error.c 408 */ 409extern QMGR_TRANSPORT *qmgr_error_transport(const char *); 410extern QMGR_QUEUE *qmgr_error_queue(const char *, DSN *); 411extern char *qmgr_error_nexthop(DSN *); 412 413/* LICENSE 414/* .ad 415/* .fi 416/* The Secure Mailer license must be distributed with this software. 417/* AUTHOR(S) 418/* Wietse Venema 419/* IBM T.J. Watson Research 420/* P.O. Box 704 421/* Yorktown Heights, NY 10598, USA 422/*--*/ 423