/*	$NetBSD$	*/

/*++
/* NAME
/*	qmgr 3h
/* SUMMARY
/*	queue manager data structures
/* SYNOPSIS
/*	#include "qmgr.h"
/* DESCRIPTION
/* .nf

 /*
  * System library.
  */
#include <sys/time.h>
#include <time.h>

 /*
  * Utility library.
  */
#include <vstream.h>
#include <scan_dir.h>

 /*
  * Global library.
  */
#include <recipient_list.h>
#include <dsn.h>

 /*
  * The queue manager is built around lots of mutually-referring structures.
  * These typedefs save some typing.
  */
typedef struct QMGR_TRANSPORT QMGR_TRANSPORT;
typedef struct QMGR_QUEUE QMGR_QUEUE;
typedef struct QMGR_ENTRY QMGR_ENTRY;
typedef struct QMGR_MESSAGE QMGR_MESSAGE;
typedef struct QMGR_JOB QMGR_JOB;
typedef struct QMGR_PEER QMGR_PEER;
typedef struct QMGR_TRANSPORT_LIST QMGR_TRANSPORT_LIST;
typedef struct QMGR_QUEUE_LIST QMGR_QUEUE_LIST;
typedef struct QMGR_ENTRY_LIST QMGR_ENTRY_LIST;
typedef struct QMGR_JOB_LIST QMGR_JOB_LIST;
typedef struct QMGR_PEER_LIST QMGR_PEER_LIST;
typedef struct QMGR_SCAN QMGR_SCAN;
typedef struct QMGR_FEEDBACK QMGR_FEEDBACK;

 /*
  * Hairy macros to update doubly-linked lists. Each list is anchored by a
  * "head" structure whose "next" member points at the first element and
  * whose "prev" member points at the last; null pointers terminate both
  * directions. NOTE(review): these are classic multiple-evaluation macros -
  * never pass arguments with side effects.
  */

 /*
  * Rotate the list: the elements that follow OBJECT move to the front (in
  * their original order) and OBJECT becomes the last element. When OBJECT
  * already is the last element the list is left unchanged.
  */
#define QMGR_LIST_ROTATE(head, object, peers) { \
    head.next->peers.prev = head.prev; \
    head.prev->peers.next = head.next; \
    head.next = object->peers.next; \
    head.next->peers.prev = 0; \
    head.prev = object; \
    object->peers.next = 0; \
}

 /*
  * Remove OBJECT from the list. Works for any position (head, middle or
  * tail); OBJECT's own link pointers are reset to null afterwards.
  */
#define QMGR_LIST_UNLINK(head, type, object, peers) { \
    type _next = object->peers.next; \
    type _prev = object->peers.prev; \
    if (_prev) _prev->peers.next = _next; \
    else head.next = _next; \
    if (_next) _next->peers.prev = _prev; \
    else head.prev = _prev; \
    object->peers.next = object->peers.prev = 0; \
}

 /*
  * Insert OBJECT between PRED and SUCC. A null PRED means "insert at the
  * front", a null SUCC means "insert at the back".
  */
#define QMGR_LIST_LINK(head, pred, object, succ, peers) { \
    object->peers.prev = pred; \
    object->peers.next = succ; \
    if (pred) pred->peers.next = object; \
    else head.next = object; \
    if (succ) succ->peers.prev = object; \
    else head.prev = object; \
}

 /*
  * Insert OBJECT at the front of the list.
  */
#define QMGR_LIST_PREPEND(head, object, peers) { \
    object->peers.next = head.next; \
    object->peers.prev = 0; \
    if (head.next) { \
	head.next->peers.prev = object; \
    } else { \
	head.prev = object; \
    } \
    head.next = object; \
}

 /*
  * Insert OBJECT at the end of the list.
  */
#define QMGR_LIST_APPEND(head, object, peers) { \
    object->peers.prev = head.prev; \
    object->peers.next = 0; \
    if (head.prev) { \
	head.prev->peers.next = object; \
    } else { \
	head.next = object; \
    } \
    head.prev = object; \
}

 /*
  * Reset the list to the empty state.
  */
#define QMGR_LIST_INIT(head) { \
    head.prev = 0; \
    head.next = 0; \
}

 /*
  * Transports are looked up by name (when we have resolved a message), or
  * round-robin wise (when we want to distribute resources fairly).
  */
struct QMGR_TRANSPORT_LIST {
    QMGR_TRANSPORT *next;
    QMGR_TRANSPORT *prev;
};

extern struct HTABLE *qmgr_transport_byname;	/* transport by name */
extern QMGR_TRANSPORT_LIST qmgr_transport_list;	/* transports, round robin */

 /*
  * Delivery agents provide feedback, as hints that Postfix should expend
  * more or fewer resources on a specific destination domain. The main.cf
  * file specifies how feedback affects delivery concurrency: add/subtract a
  * constant, a ratio of constants, or a constant divided by the delivery
  * concurrency; and it specifies how much feedback must accumulate between
  * concurrency updates.
  */
struct QMGR_FEEDBACK {
    int     hysteresis;			/* to pass, need to be this tall */
    double  base;			/* pre-computed from main.cf */
    int     index;			/* none, window, sqrt(window) */
};

#define QMGR_FEEDBACK_IDX_NONE	0	/* no window dependence */
#define QMGR_FEEDBACK_IDX_WIN	1	/* 1/window dependence */
#if 0
#define QMGR_FEEDBACK_IDX_SQRT_WIN 2	/* 1/sqrt(window) dependence */
#endif

#ifdef QMGR_FEEDBACK_IDX_SQRT_WIN
#include <math.h>			/* sqrt() for the disabled variant */
#endif

extern void qmgr_feedback_init(QMGR_FEEDBACK *, const char *, const char *, const char *, const char *);

 /*
  * Compute the concurrency feedback amount for window size WIN, according
  * to the window dependence selected in (fb).index. Only the first two
  * index values are compiled in unless QMGR_FEEDBACK_IDX_SQRT_WIN is
  * enabled above.
  */
#ifndef QMGR_FEEDBACK_IDX_SQRT_WIN
#define QMGR_FEEDBACK_VAL(fb, win) \
    ((fb).index == QMGR_FEEDBACK_IDX_NONE ? (fb).base : (fb).base / (win))
#else
#define QMGR_FEEDBACK_VAL(fb, win) \
    ((fb).index == QMGR_FEEDBACK_IDX_NONE ? (fb).base : \
     (fb).index == QMGR_FEEDBACK_IDX_WIN ? (fb).base / (win) : \
     (fb).base / sqrt(win))
#endif

 /*
  * Each transport (local, smtp-out, bounce) can have one queue per next hop
  * name. Queues are looked up by next hop name (when we have resolved a
  * message destination), or round-robin wise (when we want to deliver
  * messages fairly).
  */
struct QMGR_QUEUE_LIST {
    QMGR_QUEUE *next;
    QMGR_QUEUE *prev;
};

struct QMGR_JOB_LIST {
    QMGR_JOB *next;
    QMGR_JOB *prev;
};

struct QMGR_TRANSPORT {
    int     flags;			/* blocked, etc. */
    int     pending;			/* incomplete DA connections */
    char   *name;			/* transport name */
    int     dest_concurrency_limit;	/* concurrency per domain */
    int     init_dest_concurrency;	/* init. per-domain concurrency */
    int     recipient_limit;		/* recipients per transaction */
    int     rcpt_per_stack;		/* extra slots reserved for jobs put
					 * on the job stack */
    int     rcpt_unused;		/* available in-core recipient slots */
    int     refill_limit;		/* recipient batch size for message
					 * refill */
    int     refill_delay;		/* delay before message refill */
    int     slot_cost;			/* cost of new preemption slot (# of
					 * selected entries) */
    int     slot_loan;			/* preemption boost offset and */
    int     slot_loan_factor;		/* factor, see qmgr_job_preempt() */
    int     min_slots;			/* when preemption can take effect at
					 * all */
    struct HTABLE *queue_byname;	/* queues indexed by domain */
    QMGR_QUEUE_LIST queue_list;		/* queues, round robin order */
    struct HTABLE *job_byname;		/* jobs indexed by queue id */
    QMGR_JOB_LIST job_list;		/* list of message jobs (1 per
					 * message) ordered by scheduler */
    QMGR_JOB_LIST job_bytime;		/* jobs ordered by time since queued */
    QMGR_JOB *job_current;		/* keeps track of the current job */
    QMGR_JOB *job_next_unread;		/* next job with unread recipients */
    QMGR_JOB *candidate_cache;		/* cached result from
					 * qmgr_job_candidate() */
    QMGR_JOB *candidate_cache_current;	/* current job tied to the candidate */
    time_t  candidate_cache_time;	/* when candidate_cache was last
					 * updated */
    int     blocker_tag;		/* for marking blocker jobs */
    QMGR_TRANSPORT_LIST peers;		/* linkage */
    DSN    *dsn;			/* why unavailable */
    QMGR_FEEDBACK pos_feedback;		/* positive feedback control */
    QMGR_FEEDBACK neg_feedback;		/* negative feedback control */
    int     fail_cohort_limit;		/* flow shutdown control */
    int     rate_delay;			/* suspend per delivery */
};

#define QMGR_TRANSPORT_STAT_DEAD	(1<<1)

typedef void (*QMGR_TRANSPORT_ALLOC_NOTIFY) (QMGR_TRANSPORT *, VSTREAM *);
extern QMGR_TRANSPORT *qmgr_transport_select(void);
extern void qmgr_transport_alloc(QMGR_TRANSPORT *, QMGR_TRANSPORT_ALLOC_NOTIFY);
extern void qmgr_transport_throttle(QMGR_TRANSPORT *, DSN *);
extern void qmgr_transport_unthrottle(QMGR_TRANSPORT *);
extern QMGR_TRANSPORT *qmgr_transport_create(const char *);
extern QMGR_TRANSPORT *qmgr_transport_find(const char *);

#define QMGR_TRANSPORT_THROTTLED(t)	((t)->flags & QMGR_TRANSPORT_STAT_DEAD)

 /*
  * Each next hop (e.g., a domain name) has its own queue of pending message
  * transactions. The "todo" queue contains messages that are to be delivered
  * to this next hop. When a message is elected for transmission, it is moved
  * from the "todo" queue to the "busy" queue. Messages are taken from the
  * "todo" queue in round-robin order.
  */
struct QMGR_ENTRY_LIST {
    QMGR_ENTRY *next;
    QMGR_ENTRY *prev;
};

struct QMGR_QUEUE {
    int     dflags;			/* delivery request options */
    time_t  last_done;			/* last delivery completion */
    char   *name;			/* domain name or address */
    char   *nexthop;			/* domain name */
    int     todo_refcount;		/* queue entries (todo list) */
    int     busy_refcount;		/* queue entries (busy list) */
    int     window;			/* slow open algorithm; also encodes
					 * the queue state, see
					 * QMGR_QUEUE_STAT_XXX below */
    double  success;			/* accumulated positive feedback */
    double  failure;			/* accumulated negative feedback */
    double  fail_cohorts;		/* pseudo-cohort failure count */
    QMGR_TRANSPORT *transport;		/* transport linkage */
    QMGR_ENTRY_LIST todo;		/* todo queue entries */
    QMGR_ENTRY_LIST busy;		/* messages on the wire */
    QMGR_QUEUE_LIST peers;		/* neighbor queues */
    DSN    *dsn;			/* why unavailable */
    time_t  clog_time_to_warn;		/* time of last warning */
    int     blocker_tag;		/* tagged if blocks job list */
};

#define QMGR_QUEUE_TODO	1		/* waiting for service */
#define QMGR_QUEUE_BUSY	2		/* recipients on the wire */

extern int qmgr_queue_count;

extern QMGR_QUEUE *qmgr_queue_create(QMGR_TRANSPORT *, const char *, const char *);
extern void qmgr_queue_done(QMGR_QUEUE *);
extern void qmgr_queue_throttle(QMGR_QUEUE *, DSN *);
extern void qmgr_queue_unthrottle(QMGR_QUEUE *);
extern QMGR_QUEUE *qmgr_queue_find(QMGR_TRANSPORT *, const char *);
extern void qmgr_queue_suspend(QMGR_QUEUE *, int);

 /*
  * Exclusive queue states. Originally there were only two: "throttled" and
  * "not throttled". It was natural to encode these in the queue window size.
  * After 10 years it's not practical to rip out all the working code and
  * change representations, so we just clean up the names a little.
  *
  * Note: only the "ready" state can reach every state (including itself);
  * non-ready states can reach only the "ready" state. Other transitions are
  * forbidden, because they would result in dangling event handlers.
  */
#define QMGR_QUEUE_STAT_THROTTLED	0	/* back-off timer */
#define QMGR_QUEUE_STAT_SUSPENDED	-1	/* voluntary delay timer */
#define QMGR_QUEUE_STAT_SAVED		-2	/* delayed cleanup timer */
#define QMGR_QUEUE_STAT_BAD		-3	/* can't happen */

#define QMGR_QUEUE_READY(q)	((q)->window > 0)
#define QMGR_QUEUE_THROTTLED(q)	((q)->window == QMGR_QUEUE_STAT_THROTTLED)
#define QMGR_QUEUE_SUSPENDED(q)	((q)->window == QMGR_QUEUE_STAT_SUSPENDED)
#define QMGR_QUEUE_SAVED(q)	((q)->window == QMGR_QUEUE_STAT_SAVED)
#define QMGR_QUEUE_BAD(q)	((q)->window <= QMGR_QUEUE_STAT_BAD)

 /*
  * Human-readable name for the current queue state, for logging.
  */
#define QMGR_QUEUE_STATUS(q) ( \
	QMGR_QUEUE_READY(q) ? "ready" : \
	QMGR_QUEUE_THROTTLED(q) ? "throttled" : \
	QMGR_QUEUE_SUSPENDED(q) ? "suspended" : \
	QMGR_QUEUE_SAVED(q) ? "saved" : \
	"invalid queue status" \
    )

 /*
  * Structure of one next-hop queue entry. In order to save some copying
  * effort we allow multiple recipients per transaction.
  */
struct QMGR_ENTRY {
    VSTREAM *stream;			/* delivery process */
    QMGR_MESSAGE *message;		/* message info */
    RECIPIENT_LIST rcpt_list;		/* as many as it takes */
    QMGR_QUEUE *queue;			/* parent linkage */
    QMGR_PEER *peer;			/* parent linkage */
    QMGR_ENTRY_LIST queue_peers;	/* per queue neighbor entries */
    QMGR_ENTRY_LIST peer_peers;		/* per peer neighbor entries */
};

extern QMGR_ENTRY *qmgr_entry_select(QMGR_PEER *);
extern void qmgr_entry_unselect(QMGR_ENTRY *);
extern void qmgr_entry_move_todo(QMGR_QUEUE *, QMGR_ENTRY *);
extern void qmgr_entry_done(QMGR_ENTRY *, int);
extern QMGR_ENTRY *qmgr_entry_create(QMGR_PEER *, QMGR_MESSAGE *);

 /*
  * All common in-core information about a message is kept here.
  * When all
  * recipients have been tried the message file is linked to the "deferred"
  * queue (some hosts not reachable), to the "bounce" queue (some recipients
  * were rejected), and is then removed from the "active" queue.
  */
struct QMGR_MESSAGE {
    int     flags;			/* delivery problems */
    int     qflags;			/* queuing flags */
    int     tflags;			/* tracing flags */
    long    tflags_offset;		/* offset for killing */
    int     rflags;			/* queue file read flags */
    VSTREAM *fp;			/* open queue file or null */
    int     refcount;			/* queue entries */
    int     single_rcpt;		/* send one rcpt at a time */
    struct timeval arrival_time;	/* start of receive transaction */
    time_t  create_time;		/* queue file create time */
    struct timeval active_time;		/* time of entry into active queue */
    time_t  queued_time;		/* sanitized time when moved to the
					 * active queue */
    time_t  refill_time;		/* sanitized time of last message
					 * refill */
    long    warn_offset;		/* warning bounce flag offset */
    time_t  warn_time;			/* time next warning to be sent */
    long    data_offset;		/* data seek offset */
    char   *queue_name;			/* queue name */
    char   *queue_id;			/* queue file */
    char   *encoding;			/* content encoding */
    char   *sender;			/* complete address */
    char   *dsn_envid;			/* DSN envelope ID */
    int     dsn_ret;			/* DSN headers/full */
    char   *verp_delims;		/* VERP delimiters */
    char   *filter_xport;		/* filtering transport */
    char   *inspect_xport;		/* inspecting transport */
    char   *redirect_addr;		/* info@spammer.tld */
    long    data_size;			/* data segment size */
    long    cont_length;		/* message content length */
    long    rcpt_offset;		/* more recipients here */
    char   *client_name;		/* client hostname */
    char   *client_addr;		/* client address */
    char   *client_port;		/* client port */
    char   *client_proto;		/* client protocol */
    char   *client_helo;		/* helo parameter */
    char   *sasl_method;		/* SASL method */
    char   *sasl_username;		/* SASL user name */
    char   *sasl_sender;		/* SASL sender */
    char   *log_ident;			/* up-stream queue ID */
    char   *rewrite_context;		/* address qualification */
    RECIPIENT_LIST rcpt_list;		/* complete addresses */
    int     rcpt_count;			/* used recipient slots */
    int     rcpt_limit;			/* maximum read in-core */
    int     rcpt_unread;		/* # of recipients left in queue file */
    QMGR_JOB_LIST job_list;		/* jobs delivering this message (1
					 * per transport) */
};

 /*
  * Flags 0-15 are reserved for qmgr_user.h.
  */
#define QMGR_READ_FLAG_SEEN_ALL_NON_RCPT	(1<<16)

 /*
  * Sentinel "message" value; this is not a dereferenceable pointer.
  */
#define QMGR_MESSAGE_LOCKED	((QMGR_MESSAGE *) 1)

extern int qmgr_message_count;
extern int qmgr_recipient_count;

extern void qmgr_message_free(QMGR_MESSAGE *);
extern void qmgr_message_update_warn(QMGR_MESSAGE *);
extern void qmgr_message_kill_record(QMGR_MESSAGE *, long);
extern QMGR_MESSAGE *qmgr_message_alloc(const char *, const char *, int, mode_t);
extern QMGR_MESSAGE *qmgr_message_realloc(QMGR_MESSAGE *);

 /*
  * Initialize a MSG_STATS structure from the message's arrival and
  * active-queue entry times.
  */
#define QMGR_MSG_STATS(stats, message) \
    MSG_STATS_INIT2(stats, \
		    incoming_arrival, message->arrival_time, \
		    active_arrival, message->active_time)

 /*
  * Sometimes it's required to access the transport queues and entries on per
  * message basis. That's what the QMGR_JOB structure is for - it groups all
  * per message information within each transport using a list of QMGR_PEER
  * structures. These structures in turn correspond with per message
  * QMGR_QUEUE structure and list all per message QMGR_ENTRY structures.
  */
struct QMGR_PEER_LIST {
    QMGR_PEER *next;
    QMGR_PEER *prev;
};

struct QMGR_JOB {
    QMGR_MESSAGE *message;		/* message delivered by this job */
    QMGR_TRANSPORT *transport;		/* transport this job belongs to */
    QMGR_JOB_LIST message_peers;	/* per message neighbor linkage */
    QMGR_JOB_LIST transport_peers;	/* per transport neighbor linkage */
    QMGR_JOB_LIST time_peers;		/* by time neighbor linkage */
    QMGR_JOB *stack_parent;		/* stack parent */
    QMGR_JOB_LIST stack_children;	/* all stack children */
    QMGR_JOB_LIST stack_siblings;	/* stack children linkage */
    int     stack_level;		/* job stack nesting level (-1 means
					 * it's not on the lists at all) */
    int     blocker_tag;		/* tagged if blocks the job list */
    struct HTABLE *peer_byname;		/* message job peers, indexed by
					 * domain */
    QMGR_PEER_LIST peer_list;		/* list of message job peers */
    int     slots_used;			/* slots used during preemption */
    int     slots_available;		/* slots available for preemption (in
					 * multiples of slot_cost) */
    int     selected_entries;		/* # of entries selected for delivery
					 * so far */
    int     read_entries;		/* # of entries read in-core so far */
    int     rcpt_count;			/* used recipient slots */
    int     rcpt_limit;			/* available recipient slots */
};

struct QMGR_PEER {
    QMGR_JOB *job;			/* job handling this peer */
    QMGR_QUEUE *queue;			/* queue corresponding with this peer */
    int     refcount;			/* peer entries */
    QMGR_ENTRY_LIST entry_list;		/* todo message entries queued for
					 * this peer */
    QMGR_PEER_LIST peers;		/* neighbor linkage */
};

extern QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *);
extern QMGR_PEER *qmgr_peer_select(QMGR_JOB *);
extern void qmgr_job_blocker_update(QMGR_QUEUE *);

extern QMGR_JOB *qmgr_job_obtain(QMGR_MESSAGE *, QMGR_TRANSPORT *);
extern void qmgr_job_free(QMGR_JOB *);
extern void qmgr_job_move_limits(QMGR_JOB *);

extern QMGR_PEER *qmgr_peer_create(QMGR_JOB *, QMGR_QUEUE *);
extern QMGR_PEER *qmgr_peer_find(QMGR_JOB *, QMGR_QUEUE *);
extern QMGR_PEER *qmgr_peer_obtain(QMGR_JOB *, QMGR_QUEUE *);
extern void qmgr_peer_free(QMGR_PEER *);

 /*
  * qmgr_defer.c
  */
extern void qmgr_defer_transport(QMGR_TRANSPORT *, DSN *);
extern void qmgr_defer_todo(QMGR_QUEUE *, DSN *);
extern void qmgr_defer_recipient(QMGR_MESSAGE *, RECIPIENT *, DSN *);

 /*
  * qmgr_bounce.c
  */
extern void qmgr_bounce_recipient(QMGR_MESSAGE *, RECIPIENT *, DSN *);

 /*
  * qmgr_deliver.c
  */
extern int qmgr_deliver_concurrency;
extern void qmgr_deliver(QMGR_TRANSPORT *, VSTREAM *);

 /*
  * qmgr_active.c
  */
extern int qmgr_active_feed(QMGR_SCAN *, const char *);
extern void qmgr_active_drain(void);
extern void qmgr_active_done(QMGR_MESSAGE *);

 /*
  * qmgr_move.c
  */
extern void qmgr_move(const char *, const char *, time_t);

 /*
  * qmgr_enable.c
  */
extern void qmgr_enable_all(void);
extern void qmgr_enable_transport(QMGR_TRANSPORT *);
extern void qmgr_enable_queue(QMGR_QUEUE *);

 /*
  * Queue scan context.
  */
struct QMGR_SCAN {
    char   *queue;			/* queue name */
    int     flags;			/* private, this run */
    int     nflags;			/* private, next run */
    struct SCAN_DIR *handle;		/* scan */
};

 /*
  * Flags that control queue scans or destination selection. These are
  * similar to the QMGR_REQ_XXX request codes.
  */
#define QMGR_SCAN_START	(1<<0)		/* start now/restart when done */
#define QMGR_SCAN_ALL	(1<<1)		/* all queue file time stamps */
#define QMGR_FLUSH_ONCE	(1<<2)		/* unthrottle once */
#define QMGR_FLUSH_DFXP	(1<<3)		/* override defer_transports */
#define QMGR_FLUSH_EACH	(1<<4)		/* unthrottle per message */

 /*
  * qmgr_scan.c
  */
extern QMGR_SCAN *qmgr_scan_create(const char *);
extern void qmgr_scan_request(QMGR_SCAN *, int);
extern char *qmgr_scan_next(QMGR_SCAN *);

 /*
  * qmgr_error.c
  */
extern QMGR_TRANSPORT *qmgr_error_transport(const char *);
extern QMGR_QUEUE *qmgr_error_queue(const char *, DSN *);
extern char *qmgr_error_nexthop(DSN *);

/* LICENSE
/* .ad
/* .fi
/*	The Secure Mailer license must be distributed with this software.
/* AUTHOR(S)
/*	Wietse Venema
/*	IBM T.J. Watson Research
/*	P.O. Box 704
/*	Yorktown Heights, NY 10598, USA
/*
/*	Preemptive scheduler enhancements:
/*	Patrik Rak
/*	Modra 6
/*	155 00, Prague, Czech Republic
/*--*/