/*	$NetBSD: qmgr_entry.c,v 1.3 2022/10/08 16:12:46 christos Exp $	*/

/*++
/* NAME
/*	qmgr_entry 3
/* SUMMARY
/*	per-site queue entries
/* SYNOPSIS
/*	#include "qmgr.h"
/*
/*	QMGR_ENTRY *qmgr_entry_create(queue, message)
/*	QMGR_QUEUE *queue;
/*	QMGR_MESSAGE *message;
/*
/*	void	qmgr_entry_done(entry, which)
/*	QMGR_ENTRY *entry;
/*	int	which;
/*
/*	QMGR_ENTRY *qmgr_entry_select(queue)
/*	QMGR_QUEUE *queue;
/*
/*	void	qmgr_entry_unselect(queue, entry)
/*	QMGR_QUEUE *queue;
/*	QMGR_ENTRY *entry;
/*
/*	void	qmgr_entry_move_todo(dst, entry)
/*	QMGR_QUEUE *dst;
/*	QMGR_ENTRY *entry;
/* DESCRIPTION
/*	These routines add/delete/manipulate per-site message
/*	delivery requests.
/*
/*	qmgr_entry_create() creates an entry for the named queue and
/*	message, and appends the entry to the queue's todo list.
/*	Filling in and cleaning up the recipients is the responsibility
/*	of the caller.
/*
/*	qmgr_entry_done() discards a per-site queue entry. The
/*	\fIwhich\fR argument is either QMGR_QUEUE_BUSY for an entry
/*	of the site's `busy' list (i.e. queue entries that have been
/*	selected for actual delivery), or QMGR_QUEUE_TODO for an entry
/*	of the site's `todo' list (i.e. queue entries awaiting selection
/*	for actual delivery).
/*
/*	qmgr_entry_done() triggers cleanup of the per-site queue when
/*	the site has no pending deliveries, and the site is either
/*	alive, or the site is dead and the number of in-core queues
/*	exceeds a configurable limit (see qmgr_queue_done()).
/*
/*	qmgr_entry_done() triggers special action when the last in-core
/*	queue entry for a message is done with: either read more
/*	recipients from the queue file, delete the queue file, or move
/*	the queue file to the deferred queue; send bounce reports to the
/*	message originator (see qmgr_active_done()).
55/* 56/* qmgr_entry_select() selects the next entry from the named 57/* per-site queue's `todo' list for actual delivery. The entry is 58/* moved to the queue's `busy' list: the list of messages being 59/* delivered. 60/* 61/* qmgr_entry_unselect() takes the named entry off the named 62/* per-site queue's `busy' list and moves it to the queue's 63/* `todo' list. 64/* 65/* qmgr_entry_move_todo() moves the specified "todo" queue entry 66/* to the specified "todo" queue. 67/* DIAGNOSTICS 68/* Panic: interface violations, internal inconsistencies. 69/* LICENSE 70/* .ad 71/* .fi 72/* The Secure Mailer license must be distributed with this software. 73/* AUTHOR(S) 74/* Wietse Venema 75/* IBM T.J. Watson Research 76/* P.O. Box 704 77/* Yorktown Heights, NY 10598, USA 78/*--*/ 79 80/* System library. */ 81 82#include <sys_defs.h> 83#include <stdlib.h> 84#include <time.h> 85 86/* Utility library. */ 87 88#include <msg.h> 89#include <mymalloc.h> 90#include <events.h> 91#include <vstream.h> 92 93/* Global library. */ 94 95#include <mail_params.h> 96#include <deliver_request.h> /* opportunistic session caching */ 97 98/* Application-specific. */ 99 100#include "qmgr.h" 101 102/* qmgr_entry_select - select queue entry for delivery */ 103 104QMGR_ENTRY *qmgr_entry_select(QMGR_QUEUE *queue) 105{ 106 const char *myname = "qmgr_entry_select"; 107 QMGR_ENTRY *entry; 108 109 if ((entry = queue->todo.prev) != 0) { 110 QMGR_LIST_UNLINK(queue->todo, QMGR_ENTRY *, entry); 111 queue->todo_refcount--; 112 QMGR_LIST_APPEND(queue->busy, entry); 113 queue->busy_refcount++; 114 115 /* 116 * With opportunistic session caching, the delivery agent must not 117 * only 1) save a session upon completion, but also 2) reuse a cached 118 * session upon the next delivery request. In order to not miss out 119 * on 2), we have to make caching sticky or else we get silly 120 * behavior when the in-memory queue drains. 
Specifically, new 121 * connections must not be made as long as cached connections exist. 122 * 123 * Safety: don't enable opportunistic session caching unless the queue 124 * manager is able to schedule concurrent or back-to-back deliveries 125 * (we need to recognize back-to-back deliveries for transports with 126 * concurrency 1). 127 * 128 * If caching has previously been enabled, but is not now, fetch any 129 * existing entries from the cache, but don't add new ones. 130 */ 131#define CONCURRENT_OR_BACK_TO_BACK_DELIVERY() \ 132 (queue->busy_refcount > 1 || BACK_TO_BACK_DELIVERY()) 133 134#define BACK_TO_BACK_DELIVERY() \ 135 (queue->last_done + 1 >= event_time()) 136 137 /* 138 * Turn on session caching after we get up to speed. Don't enable 139 * session caching just because we have concurrent deliveries. This 140 * prevents unnecessary session caching when we have a burst of mail 141 * <= the initial concurrency limit. 142 */ 143 if ((queue->dflags & DEL_REQ_FLAG_CONN_STORE) == 0) { 144 if (BACK_TO_BACK_DELIVERY()) { 145 if (msg_verbose) 146 msg_info("%s: allowing on-demand session caching for %s", 147 myname, queue->name); 148 queue->dflags |= DEL_REQ_FLAG_CONN_MASK; 149 } 150 } 151 152 /* 153 * Turn off session caching when concurrency drops and we're running 154 * out of steam. This is what prevents from turning off session 155 * caching too early, and from making new connections while old ones 156 * are still cached. 
157 */ 158 else { 159 if (!CONCURRENT_OR_BACK_TO_BACK_DELIVERY()) { 160 if (msg_verbose) 161 msg_info("%s: disallowing on-demand session caching for %s", 162 myname, queue->name); 163 queue->dflags &= ~DEL_REQ_FLAG_CONN_STORE; 164 } 165 } 166 } 167 return (entry); 168} 169 170/* qmgr_entry_unselect - unselect queue entry for delivery */ 171 172void qmgr_entry_unselect(QMGR_QUEUE *queue, QMGR_ENTRY *entry) 173{ 174 QMGR_LIST_UNLINK(queue->busy, QMGR_ENTRY *, entry); 175 queue->busy_refcount--; 176 QMGR_LIST_APPEND(queue->todo, entry); 177 queue->todo_refcount++; 178} 179 180/* qmgr_entry_move_todo - move entry between todo queues */ 181 182void qmgr_entry_move_todo(QMGR_QUEUE *dst, QMGR_ENTRY *entry) 183{ 184 const char *myname = "qmgr_entry_move_todo"; 185 QMGR_MESSAGE *message = entry->message; 186 QMGR_QUEUE *src = entry->queue; 187 QMGR_ENTRY *new_entry; 188 189 if (entry->stream != 0) 190 msg_panic("%s: queue %s entry is busy", myname, src->name); 191 if (QMGR_QUEUE_THROTTLED(dst)) 192 msg_panic("%s: destination queue %s is throttled", myname, dst->name); 193 if (QMGR_TRANSPORT_THROTTLED(dst->transport)) 194 msg_panic("%s: destination transport %s is throttled", 195 myname, dst->transport->name); 196 197 /* 198 * Create new entry, swap the recipients between the old and new entries, 199 * then dispose of the old entry. This gives us any end-game actions that 200 * are implemented by qmgr_entry_done(), so we don't have to duplicate 201 * those actions here. 202 * 203 * XXX This does not enforce the per-entry recipient limit, but that is not 204 * a problem as long as qmgr_entry_move_todo() is called only to bounce 205 * or defer mail. 
206 */ 207 new_entry = qmgr_entry_create(dst, message); 208 recipient_list_swap(&entry->rcpt_list, &new_entry->rcpt_list); 209 qmgr_entry_done(entry, QMGR_QUEUE_TODO); 210} 211 212/* qmgr_entry_done - dispose of queue entry */ 213 214void qmgr_entry_done(QMGR_ENTRY *entry, int which) 215{ 216 const char *myname = "qmgr_entry_done"; 217 QMGR_QUEUE *queue = entry->queue; 218 QMGR_MESSAGE *message = entry->message; 219 QMGR_TRANSPORT *transport = queue->transport; 220 221 /* 222 * Take this entry off the in-core queue. 223 */ 224 if (entry->stream != 0) 225 msg_panic("%s: file is open", myname); 226 if (which == QMGR_QUEUE_BUSY) { 227 QMGR_LIST_UNLINK(queue->busy, QMGR_ENTRY *, entry); 228 queue->busy_refcount--; 229 } else if (which == QMGR_QUEUE_TODO) { 230 QMGR_LIST_UNLINK(queue->todo, QMGR_ENTRY *, entry); 231 queue->todo_refcount--; 232 } else { 233 msg_panic("%s: bad queue spec: %d", myname, which); 234 } 235 236 /* 237 * Free the recipient list and decrease the in-core recipient count 238 * accordingly. 239 */ 240 qmgr_recipient_count -= entry->rcpt_list.len; 241 recipient_list_free(&entry->rcpt_list); 242 243 myfree((void *) entry); 244 245 /* 246 * Maintain back-to-back delivery status. 247 */ 248 if (which == QMGR_QUEUE_BUSY) 249 queue->last_done = event_time(); 250 251 /* 252 * Suspend a rate-limited queue, so that mail trickles out. 253 */ 254 if (which == QMGR_QUEUE_BUSY && transport->rate_delay > 0) { 255 if (queue->window > 1) 256 msg_panic("%s: queue %s/%s: window %d > 1 on rate-limited service", 257 myname, transport->name, queue->name, queue->window); 258 if (QMGR_QUEUE_THROTTLED(queue)) /* XXX */ 259 qmgr_queue_unthrottle(queue); 260 if (QMGR_QUEUE_READY(queue)) 261 qmgr_queue_suspend(queue, transport->rate_delay); 262 } 263 264 /* 265 * When the in-core queue for this site is empty and when this site is 266 * not dead, discard the in-core queue. 
When this site is dead, but the 267 * number of in-core queues exceeds some threshold, get rid of this 268 * in-core queue anyway, in order to avoid running out of memory. 269 * 270 * See also: qmgr_entry_move_todo(). 271 */ 272 if (queue->todo.next == 0 && queue->busy.next == 0) { 273 if (QMGR_QUEUE_THROTTLED(queue) && qmgr_queue_count > 2 * var_qmgr_rcpt_limit) 274 qmgr_queue_unthrottle(queue); 275 if (QMGR_QUEUE_READY(queue)) 276 qmgr_queue_done(queue); 277 } 278 279 /* 280 * Update the in-core message reference count. When the in-core message 281 * structure has no more references, dispose of the message. 282 * 283 * When the in-core recipient count falls below a threshold, and this 284 * message has more recipients, read more recipients now. If we read more 285 * recipients as soon as the recipient count falls below the in-core 286 * recipient limit, we do not give other messages a chance until this 287 * message is delivered. That's good for mailing list deliveries, bad for 288 * one-to-one mail. If we wait until the in-core recipient count drops 289 * well below the in-core recipient limit, we give other mail a chance, 290 * but we also allow list deliveries to become interleaved. In the worst 291 * case, people near the start of a mailing list get a burst of postings 292 * today, while people near the end of the list get that same burst of 293 * postings a whole day later. 294 */ 295#define FUDGE(x) ((x) * (var_qmgr_fudge / 100.0)) 296 message->refcount--; 297 if (message->rcpt_offset > 0 298 && qmgr_recipient_count < FUDGE(var_qmgr_rcpt_limit) - 100) 299 qmgr_message_realloc(message); 300 if (message->refcount == 0) 301 qmgr_active_done(message); 302} 303 304/* qmgr_entry_create - create queue todo entry */ 305 306QMGR_ENTRY *qmgr_entry_create(QMGR_QUEUE *queue, QMGR_MESSAGE *message) 307{ 308 QMGR_ENTRY *entry; 309 310 /* 311 * Sanity check. 
312 */ 313 if (QMGR_QUEUE_THROTTLED(queue)) 314 msg_panic("qmgr_entry_create: dead queue: %s", queue->name); 315 316 /* 317 * Create the delivery request. 318 */ 319 entry = (QMGR_ENTRY *) mymalloc(sizeof(QMGR_ENTRY)); 320 entry->stream = 0; 321 entry->message = message; 322 recipient_list_init(&entry->rcpt_list, RCPT_LIST_INIT_QUEUE); 323 message->refcount++; 324 entry->queue = queue; 325 QMGR_LIST_APPEND(queue->todo, entry); 326 queue->todo_refcount++; 327 328 /* 329 * Warn if a destination is falling behind while the active queue 330 * contains a non-trivial amount of single-recipient email. When a 331 * destination takes up more and more space in the active queue, then 332 * other mail will not get through and delivery performance will suffer. 333 * 334 * XXX At this point in the code, the busy reference count is still less 335 * than the concurrency limit (otherwise this code would not be invoked 336 * in the first place) so we have to make some awkward adjustments 337 * below. 338 * 339 * XXX The queue length test below looks at the active queue share of an 340 * individual destination. This catches the case where mail for one 341 * destination is falling behind because it has to round-robin compete 342 * with many other destinations. However, Postfix will also perform 343 * poorly when most of the active queue is tied up by a small number of 344 * concurrency limited destinations. The queue length test below detects 345 * such conditions only indirectly. 346 * 347 * XXX This code does not detect the case that the active queue is being 348 * starved because incoming mail is pounding the disk. 
349 */ 350 if (var_helpful_warnings && var_qmgr_clog_warn_time > 0) { 351 int queue_length = queue->todo_refcount + queue->busy_refcount; 352 time_t now; 353 QMGR_TRANSPORT *transport; 354 double active_share; 355 356 if (queue_length > var_qmgr_active_limit / 5 357 && (now = event_time()) >= queue->clog_time_to_warn) { 358 active_share = queue_length / (double) qmgr_message_count; 359 msg_warn("mail for %s is using up %d of %d active queue entries", 360 queue->nexthop, queue_length, qmgr_message_count); 361 if (active_share < 0.9) 362 msg_warn("this may slow down other mail deliveries"); 363 transport = queue->transport; 364 if (transport->dest_concurrency_limit > 0 365 && transport->dest_concurrency_limit <= queue->busy_refcount + 1) 366 msg_warn("you may need to increase the main.cf %s%s from %d", 367 transport->name, _DEST_CON_LIMIT, 368 transport->dest_concurrency_limit); 369 else if (queue->window > var_qmgr_active_limit * active_share) 370 msg_warn("you may need to increase the main.cf %s from %d", 371 VAR_QMGR_ACT_LIMIT, var_qmgr_active_limit); 372 else if (queue->peers.next != queue->peers.prev) 373 msg_warn("you may need a separate master.cf transport for %s", 374 queue->nexthop); 375 else { 376 msg_warn("you may need to reduce %s connect and helo timeouts", 377 transport->name); 378 msg_warn("so that Postfix quickly skips unavailable hosts"); 379 msg_warn("you may need to increase the main.cf %s and %s", 380 VAR_MIN_BACKOFF_TIME, VAR_MAX_BACKOFF_TIME); 381 msg_warn("so that Postfix wastes less time on undeliverable mail"); 382 msg_warn("you may need to increase the master.cf %s process limit", 383 transport->name); 384 } 385 msg_warn("please avoid flushing the whole queue when you have"); 386 msg_warn("lots of deferred mail, that is bad for performance"); 387 msg_warn("to turn off these warnings specify: %s = 0", 388 VAR_QMGR_CLOG_WARN_TIME); 389 queue->clog_time_to_warn = now + var_qmgr_clog_warn_time; 390 } 391 } 392 return (entry); 393} 394