/*++
/* NAME
/*	qmgr_entry 3
/* SUMMARY
/*	per-site queue entries
/* SYNOPSIS
/*	#include "qmgr.h"
/*
/*	QMGR_ENTRY *qmgr_entry_create(peer, message)
/*	QMGR_PEER *peer;
/*	QMGR_MESSAGE *message;
/*
/*	void	qmgr_entry_done(entry, which)
/*	QMGR_ENTRY *entry;
/*	int	which;
/*
/*	QMGR_ENTRY *qmgr_entry_select(peer)
/*	QMGR_PEER *peer;
/*
/*	void	qmgr_entry_unselect(entry)
/*	QMGR_ENTRY *entry;
/*
/*	void	qmgr_entry_move_todo(dst, entry)
/*	QMGR_QUEUE *dst;
/*	QMGR_ENTRY *entry;
/* DESCRIPTION
/*	These routines add/delete/manipulate per-site message
/*	delivery requests.
/*
/*	qmgr_entry_create() creates an entry for the named peer and message,
/*	and appends the entry to the peer's list and its queue's todo list.
/*	Filling in and cleaning up the recipients is the responsibility
/*	of the caller.
/*
/*	qmgr_entry_done() discards a per-site queue entry. The
/*	\fIwhich\fR argument is either QMGR_QUEUE_BUSY for an entry
/*	of the site's `busy' list (i.e. queue entries that have been
/*	selected for actual delivery), or QMGR_QUEUE_TODO for an entry
/*	of the site's `todo' list (i.e. queue entries awaiting selection
/*	for actual delivery).
/*
/*	qmgr_entry_done() discards its peer structure when the peer
/*	is no longer referenced.
/*
/*	qmgr_entry_done() triggers cleanup of the per-site queue when
/*	the site has no pending deliveries, and the site is either
/*	alive, or the site is dead and the number of in-core queues
/*	exceeds a limit (twice var_qmgr_rcpt_limit; see qmgr_queue_done()).
/*
/*	qmgr_entry_done() triggers special action when the last in-core
/*	queue entry for a message is done with: either read more
/*	recipients from the queue file, delete the queue file, or move
/*	the queue file to the deferred queue; send bounce reports to the
/*	message originator (see qmgr_active_done()).
/*
/*	qmgr_entry_select() selects the first entry on the named peer's
/*	entry list for actual delivery. The entry is moved from its
/*	per-site queue's `todo' list to that queue's `busy' list: the
/*	list of messages being delivered. The entry is also removed from
/*	its peer list. The result is a null pointer when the peer has no
/*	entries.
/*
/*	qmgr_entry_unselect() takes the named entry off its per-site
/*	queue's `busy' list and moves it back to that queue's `todo'
/*	list. The entry is also prepended to its peer list again.
/*
/*	qmgr_entry_move_todo() moves the specified "todo" queue entry
/*	to the specified "todo" queue. It does so by creating a new
/*	entry on the destination queue, moving the recipients over to
/*	it, and discarding the original entry with qmgr_entry_done().
/* DIAGNOSTICS
/*	Panic: interface violations, internal inconsistencies.
/* LICENSE
/* .ad
/* .fi
/*	The Secure Mailer license must be distributed with this software.
/* AUTHOR(S)
/*	Wietse Venema
/*	IBM T.J. Watson Research
/*	P.O. Box 704
/*	Yorktown Heights, NY 10598, USA
/*
/*	Preemptive scheduler enhancements:
/*	Patrik Rak
/*	Modra 6
/*	155 00, Prague, Czech Republic
/*--*/

/* System library. */

#include <sys_defs.h>
#include <stdlib.h>
#include <time.h>

/* Utility library. */

#include <msg.h>
#include <mymalloc.h>
#include <events.h>
#include <vstream.h>

/* Global library. */

#include <mail_params.h>
#include <deliver_request.h>		/* opportunistic session caching */

/* Application-specific. */

#include "qmgr.h"

/*
 * qmgr_entry_select - select queue entry for delivery
 *
 * Takes the first entry off the peer's entry list and moves it from its
 * queue's todo list to the queue's busy list, updating the queue's
 * todo/busy reference counts and the job's selected_entries counter. As a
 * side effect it may switch the queue's opportunistic session caching
 * flags (queue->dflags) on or off.
 *
 * Returns the selected entry, or a null pointer when the peer has no
 * entries (in which case nothing is changed).
 */
QMGR_ENTRY *qmgr_entry_select(QMGR_PEER *peer)
{
    const char *myname = "qmgr_entry_select";
    QMGR_ENTRY *entry;
    QMGR_QUEUE *queue;

    if ((entry = peer->entry_list.next) != 0) {
        queue = entry->queue;
        QMGR_LIST_UNLINK(queue->todo, QMGR_ENTRY *, entry, queue_peers);
        queue->todo_refcount--;
        QMGR_LIST_APPEND(queue->busy, entry, queue_peers);
        queue->busy_refcount++;
        QMGR_LIST_UNLINK(peer->entry_list, QMGR_ENTRY *, entry, peer_peers);
        peer->job->selected_entries++;

        /*
         * With opportunistic session caching, the delivery agent must not
         * only 1) save a session upon completion, but also 2) reuse a cached
         * session upon the next delivery request. In order to not miss out
         * on 2), we have to make caching sticky or else we get silly
         * behavior when the in-memory queue drains. Specifically, new
         * connections must not be made as long as cached connections exist.
         *
         * Safety: don't enable opportunistic session caching unless the queue
         * manager is able to schedule concurrent or back-to-back deliveries
         * (we need to recognize back-to-back deliveries for transports with
         * concurrency 1).
         *
         * If caching has previously been enabled, but is not now, fetch any
         * existing entries from the cache, but don't add new ones.
         */

        /*
         * CONCURRENT_OR_BACK_TO_BACK_DELIVERY(): busy_refcount already
         * includes the entry selected above, so "> 1" means at least one
         * other delivery for this queue is in progress.
         *
         * BACK_TO_BACK_DELIVERY(): the previous delivery for this queue
         * completed no more than one time_t unit (normally one second) ago.
         * queue->last_done is updated by qmgr_entry_done() when a busy
         * entry is disposed of.
         *
         * NOTE: both macros expand to references to the local variable
         * "queue", and they are not #undef'd, so they remain defined for
         * the rest of this file.
         */
#define CONCURRENT_OR_BACK_TO_BACK_DELIVERY() \
        (queue->busy_refcount > 1 || BACK_TO_BACK_DELIVERY())

#define BACK_TO_BACK_DELIVERY() \
        (queue->last_done + 1 >= event_time())

        /*
         * Turn on session caching after we get up to speed. Don't enable
         * session caching just because we have concurrent deliveries. This
         * prevents unnecessary session caching when we have a burst of mail
         * <= the initial concurrency limit.
         */
        if ((queue->dflags & DEL_REQ_FLAG_CONN_STORE) == 0) {
            if (BACK_TO_BACK_DELIVERY()) {
                if (msg_verbose)
                    msg_info("%s: allowing on-demand session caching for %s",
                             myname, queue->name);
                queue->dflags |= DEL_REQ_FLAG_CONN_MASK;
            }
        }

        /*
         * Turn off session caching when concurrency drops and we're running
         * out of steam. This is what prevents us from turning off session
         * caching too early, and from making new connections while old ones
         * are still cached.
         *
         * Only the STORE bit is cleared here. Any other bits that
         * DEL_REQ_FLAG_CONN_MASK turned on above stay set (presumably the
         * cache-fetch flag, per the "fetch but don't add" rule above; the
         * flag definitions live in deliver_request.h).
         */
        else {
            if (!CONCURRENT_OR_BACK_TO_BACK_DELIVERY()) {
                if (msg_verbose)
                    msg_info("%s: disallowing on-demand session caching for %s",
                             myname, queue->name);
                queue->dflags &= ~DEL_REQ_FLAG_CONN_STORE;
            }
        }
    }
    return (entry);
}

/*
 * qmgr_entry_unselect - unselect queue entry for delivery
 *
 * Undoes the list and counter work of qmgr_entry_select():
 * - moves the entry from its queue's busy list to the tail of that queue's
 *   todo list;
 * - prepends the entry to its peer's entry list;
 * - reverses the todo/busy refcount and selected_entries updates.
 *
 * The queue's session caching flags are left alone.
 */
void qmgr_entry_unselect(QMGR_ENTRY *entry)
{
    QMGR_PEER *peer = entry->peer;
    QMGR_QUEUE *queue = entry->queue;

    /*
     * Move the entry back to the todo lists. In case of the peer list, put
     * it back to the beginning, so the select()/unselect() does not reorder
     * entries. We use this in qmgr_message_assign() to put recipients into
     * existing entries when possible.
     */
    QMGR_LIST_UNLINK(queue->busy, QMGR_ENTRY *, entry, queue_peers);
    queue->busy_refcount--;
    QMGR_LIST_APPEND(queue->todo, entry, queue_peers);
    queue->todo_refcount++;
    QMGR_LIST_PREPEND(peer->entry_list, entry, peer_peers);
    peer->job->selected_entries--;
}

/*
 * qmgr_entry_move_todo - move entry between todo queues
 *
 * Re-homes the recipients of a todo entry on dst_queue. The entry object
 * itself is not relinked. Instead:
 * 1. A new entry is created for the same message on dst_queue, under the
 *    job and peer returned by qmgr_job_obtain() and qmgr_peer_obtain()
 *    for the destination.
 * 2. The recipient lists of the two entries are swapped.
 * 3. The recipient count is transferred from the source job to the
 *    destination job.
 * 4. The original entry, which now holds the new entry's freshly
 *    initialized recipient list, is disposed of with
 *    qmgr_entry_done(..., QMGR_QUEUE_TODO).
 *
 * Panics when the entry is busy (it has an open stream), or when the
 * destination queue or the destination transport is throttled.
 */
void qmgr_entry_move_todo(QMGR_QUEUE *dst_queue, QMGR_ENTRY *entry)
{
    const char *myname = "qmgr_entry_move_todo";
    QMGR_TRANSPORT *dst_transport = dst_queue->transport;
    QMGR_MESSAGE *message = entry->message;
    QMGR_QUEUE *src_queue = entry->queue;
    QMGR_PEER *dst_peer, *src_peer = entry->peer;
    QMGR_JOB *dst_job, *src_job = src_peer->job;
    QMGR_ENTRY *new_entry;
    /* Sampled before the swap below, which replaces entry->rcpt_list. */
    int rcpt_count = entry->rcpt_list.len;

    if (entry->stream != 0)
        msg_panic("%s: queue %s entry is busy", myname, src_queue->name);
    if (QMGR_QUEUE_THROTTLED(dst_queue))
        msg_panic("%s: destination queue %s is throttled", myname, dst_queue->name);
    if (QMGR_TRANSPORT_THROTTLED(dst_transport))
        msg_panic("%s: destination transport %s is throttled",
                  myname, dst_transport->name);

    /*
     * Create new entry, swap the recipients between the two entries,
     * adjusting the job counters accordingly, then dispose of the old entry.
     *
     * Note that qmgr_entry_done() will also take care of adjusting the
     * recipient limits of all the message jobs, so we do not have to do that
     * explicitly for the new job here.
     *
     * XXX This does not enforce the per-entry recipient limit, but that is not
     * a problem as long as qmgr_entry_move_todo() is called only to bounce
     * or defer mail.
     */
    dst_job = qmgr_job_obtain(message, dst_transport);
    dst_peer = qmgr_peer_obtain(dst_job, dst_queue);

    new_entry = qmgr_entry_create(dst_peer, message);

    recipient_list_swap(&entry->rcpt_list, &new_entry->rcpt_list);

    /*
     * The recipients stay in core, so only the per-job counts move.
     * qmgr_entry_done() below subtracts the (swapped-in) list length of the
     * old entry from the job, message and global counts.
     */
    src_job->rcpt_count -= rcpt_count;
    dst_job->rcpt_count += rcpt_count;

    qmgr_entry_done(entry, QMGR_QUEUE_TODO);
}

/*
 * qmgr_entry_done - dispose of queue entry
 *
 * "which" names the queue list the entry is on: QMGR_QUEUE_BUSY or
 * QMGR_QUEUE_TODO. Any other value panics, and so does an entry that still
 * has an open stream.
 *
 * In order, this:
 * 1. unlinks the entry from that list;
 * 2. subtracts its recipients from the job, message and global in-core
 *    recipient counts, and frees the entry;
 * 3. hands job recipient limits back or on via qmgr_job_move_limits();
 * 4. applies rate-limit suspension and the job blocker update;
 * 5. drops the peer and message references.
 *
 * Along the way it may free the peer (qmgr_peer_free()), discard the
 * in-core queue (qmgr_queue_done()) and finish the message
 * (qmgr_active_done()).
 *
 * The statement order matters: the entry is freed early and is not touched
 * afterwards, queue->last_done is updated before the queue may be
 * discarded, and the message reference is dropped last.
 */
void qmgr_entry_done(QMGR_ENTRY *entry, int which)
{
    const char *myname = "qmgr_entry_done";
    QMGR_QUEUE *queue = entry->queue;
    QMGR_MESSAGE *message = entry->message;
    QMGR_PEER *peer = entry->peer;
    QMGR_JOB *sponsor, *job = peer->job;
    QMGR_TRANSPORT *transport = job->transport;

    /*
     * Take this entry off the in-core queue.
     */
    if (entry->stream != 0)
        msg_panic("%s: file is open", myname);
    if (which == QMGR_QUEUE_BUSY) {
        QMGR_LIST_UNLINK(queue->busy, QMGR_ENTRY *, entry, queue_peers);
        queue->busy_refcount--;
    } else if (which == QMGR_QUEUE_TODO) {
        QMGR_LIST_UNLINK(peer->entry_list, QMGR_ENTRY *, entry, peer_peers);

        /*
         * A todo entry leaves without passing through qmgr_entry_select(),
         * so it is counted as selected here. This presumably keeps
         * selected_entries in step with read_entries, which
         * qmgr_entry_create() increments.
         */
        job->selected_entries++;
        QMGR_LIST_UNLINK(queue->todo, QMGR_ENTRY *, entry, queue_peers);
        queue->todo_refcount--;
    } else {
        msg_panic("%s: bad queue spec: %d", myname, which);
    }

    /*
     * Decrease the in-core recipient counts and free the recipient list and
     * the structure itself. From here on "entry" must not be dereferenced.
     */
    job->rcpt_count -= entry->rcpt_list.len;
    message->rcpt_count -= entry->rcpt_list.len;
    qmgr_recipient_count -= entry->rcpt_list.len;
    recipient_list_free(&entry->rcpt_list);
    myfree((char *) entry);

    /*
     * Make sure that the transport of any retired or finishing job that
     * donated recipient slots to this message gets them back first. Then, if
     * possible, pass the remaining unused recipient slots to the next job on
     * the job list.
     */
    for (sponsor = message->job_list.next; sponsor; sponsor = sponsor->message_peers.next) {
        if (sponsor->rcpt_count >= sponsor->rcpt_limit || sponsor == job)
            continue;
        if (sponsor->stack_level < 0 || message->rcpt_offset == 0)
            qmgr_job_move_limits(sponsor);
    }
    if (message->rcpt_offset == 0) {
        qmgr_job_move_limits(job);
    }

    /*
     * We implement a rate-limited queue by emulating a slow delivery
     * channel. We insert the artificial delays with qmgr_queue_suspend().
     *
     * When a queue is suspended, we must postpone any job scheduling decisions
     * until the queue is resumed. Otherwise, we make those decisions now.
     * The job scheduling decisions are made by qmgr_job_blocker_update().
     */
    if (which == QMGR_QUEUE_BUSY && transport->rate_delay > 0) {
        if (queue->window > 1)
            msg_panic("%s: queue %s/%s: window %d > 1 on rate-limited service",
                      myname, transport->name, queue->name, queue->window);
        if (QMGR_QUEUE_THROTTLED(queue))	/* XXX */
            qmgr_queue_unthrottle(queue);
        if (QMGR_QUEUE_READY(queue))
            qmgr_queue_suspend(queue, transport->rate_delay);
    }
    if (!QMGR_QUEUE_SUSPENDED(queue)
        && queue->blocker_tag == transport->blocker_tag)
        qmgr_job_blocker_update(queue);

    /*
     * When there are no more entries for this peer, discard the peer
     * structure.
     */
    peer->refcount--;
    if (peer->refcount == 0)
        qmgr_peer_free(peer);

    /*
     * Maintain back-to-back delivery status (read by the
     * BACK_TO_BACK_DELIVERY() test in qmgr_entry_select()). This must happen
     * before the queue may be discarded below.
     */
    if (which == QMGR_QUEUE_BUSY)
        queue->last_done = event_time();

    /*
     * When the in-core queue for this site is empty and when this site is
     * not dead or suspended, discard the in-core queue. When this site is
     * dead, but the number of in-core queues exceeds some threshold, get rid
     * of this in-core queue anyway, in order to avoid running out of memory.
     */
    if (queue->todo.next == 0 && queue->busy.next == 0) {
        if (QMGR_QUEUE_THROTTLED(queue) && qmgr_queue_count > 2 * var_qmgr_rcpt_limit)
            qmgr_queue_unthrottle(queue);
        if (QMGR_QUEUE_READY(queue))
            qmgr_queue_done(queue);
    }

    /*
     * Update the in-core message reference count. When the in-core message
     * structure has no more references, dispose of the message.
     */
    message->refcount--;
    if (message->refcount == 0)
        qmgr_active_done(message);
}

/*
 * qmgr_entry_create - create queue todo entry
 *
 * Allocates an entry for the given peer and message with an initialized
 * (caller-filled) recipient list. The entry is appended to the peer's
 * entry list and to the tail of the peer's queue's todo list. The message
 * and peer reference counts, the queue's todo_refcount and the job's
 * read_entries are incremented.
 *
 * Panics when the peer's queue is throttled. When helpful warnings are
 * enabled, it may log "clogged destination" advice, at most once per
 * var_qmgr_clog_warn_time per queue.
 */
QMGR_ENTRY *qmgr_entry_create(QMGR_PEER *peer, QMGR_MESSAGE *message)
{
    QMGR_ENTRY *entry;
    QMGR_QUEUE *queue = peer->queue;

    /*
     * Sanity check.
     */
    if (QMGR_QUEUE_THROTTLED(queue))
        msg_panic("qmgr_entry_create: dead queue: %s", queue->name);

    /*
     * Create the delivery request.
     */
    entry = (QMGR_ENTRY *) mymalloc(sizeof(QMGR_ENTRY));
    entry->stream = 0;
    entry->message = message;
    recipient_list_init(&entry->rcpt_list, RCPT_LIST_INIT_QUEUE);
    message->refcount++;
    entry->peer = peer;
    QMGR_LIST_APPEND(peer->entry_list, entry, peer_peers);
    peer->refcount++;
    entry->queue = queue;
    QMGR_LIST_APPEND(queue->todo, entry, queue_peers);
    queue->todo_refcount++;
    peer->job->read_entries++;

    /*
     * Warn if a destination is falling behind while the active queue
     * contains a non-trivial amount of single-recipient email. When a
     * destination takes up more and more space in the active queue, then
     * other mail will not get through and delivery performance will suffer.
     *
     * XXX At this point in the code, the busy reference count is still less
     * than the concurrency limit (otherwise this code would not be invoked
     * in the first place) so we have to make some awkward adjustments
     * below (the "+ 1" in the concurrency limit test).
     *
     * XXX The queue length test below looks at the active queue share of an
     * individual destination. This catches the case where mail for one
     * destination is falling behind because it has to round-robin compete
     * with many other destinations. However, Postfix will also perform
     * poorly when most of the active queue is tied up by a small number of
     * concurrency limited destinations. The queue length test below detects
     * such conditions only indirectly.
     *
     * XXX This code does not detect the case that the active queue is being
     * starved because incoming mail is pounding the disk.
     */
    if (var_helpful_warnings && var_qmgr_clog_warn_time > 0) {
        int queue_length = queue->todo_refcount + queue->busy_refcount;
        time_t now;
        QMGR_TRANSPORT *transport;
        double active_share;

        /* "now" is assigned only when the length test passes. */
        if (queue_length > var_qmgr_active_limit / 5
            && (now = event_time()) >= queue->clog_time_to_warn) {

            /*
             * NOTE(review): assumes qmgr_message_count >= 1 here, since the
             * message being queued is in core. Confirm, because a zero count
             * would make active_share infinite.
             */
            active_share = queue_length / (double) qmgr_message_count;
            msg_warn("mail for %s is using up %d of %d active queue entries",
                     queue->nexthop, queue_length, qmgr_message_count);
            if (active_share < 0.9)
                msg_warn("this may slow down other mail deliveries");
            transport = queue->transport;
            if (transport->dest_concurrency_limit > 0
                && transport->dest_concurrency_limit <= queue->busy_refcount + 1)
                msg_warn("you may need to increase the main.cf %s%s from %d",
                         transport->name, _DEST_CON_LIMIT,
                         transport->dest_concurrency_limit);
            else if (queue->window > var_qmgr_active_limit * active_share)
                msg_warn("you may need to increase the main.cf %s from %d",
                         VAR_QMGR_ACT_LIMIT, var_qmgr_active_limit);
            else if (queue->peers.next != queue->peers.prev)
                msg_warn("you may need a separate master.cf transport for %s",
                         queue->nexthop);
            else {
                msg_warn("you may need to reduce %s connect and helo timeouts",
                         transport->name);
                msg_warn("so that Postfix quickly skips unavailable hosts");
                msg_warn("you may need to increase the main.cf %s and %s",
                         VAR_MIN_BACKOFF_TIME, VAR_MAX_BACKOFF_TIME);
                msg_warn("so that Postfix wastes less time on undeliverable mail");
                msg_warn("you may need to increase the master.cf %s process limit",
                         transport->name);
            }
            msg_warn("please avoid flushing the whole queue when you have");
            msg_warn("lots of deferred mail, that is bad for performance");
            msg_warn("to turn off these warnings specify: %s = 0",
                     VAR_QMGR_CLOG_WARN_TIME);

            /* Rate-limit these warnings per queue. */
            queue->clog_time_to_warn = now + var_qmgr_clog_warn_time;
        }
    }
    return (entry);
}