/*++
/* NAME
/*	qmgr_queue 3
/* SUMMARY
/*	per-destination queues
/* SYNOPSIS
/*	#include "qmgr.h"
/*
/*	int	qmgr_queue_count;
/*
/*	QMGR_QUEUE *qmgr_queue_create(transport, name, nexthop)
/*	QMGR_TRANSPORT *transport;
/*	const char *name;
/*	const char *nexthop;
/*
/*	void	qmgr_queue_done(queue)
/*	QMGR_QUEUE *queue;
/*
/*	QMGR_QUEUE *qmgr_queue_find(transport, name)
/*	QMGR_TRANSPORT *transport;
/*	const char *name;
/*
/*	void	qmgr_queue_throttle(queue, dsn)
/*	QMGR_QUEUE *queue;
/*	DSN	*dsn;
/*
/*	void	qmgr_queue_unthrottle(queue)
/*	QMGR_QUEUE *queue;
/*
/*	void	qmgr_queue_suspend(queue, delay)
/*	QMGR_QUEUE *queue;
/*	int	delay;
/* DESCRIPTION
/*	These routines add/delete/manipulate per-destination queues.
/*	Each queue corresponds to a specific transport and destination.
/*	Each queue has a `todo' list of delivery requests for that
/*	destination, and a `busy' list of delivery requests in progress.
/*
/*	qmgr_queue_count is a global counter for the total number
/*	of in-core queue structures.
/*
/*	qmgr_queue_create() creates an empty named queue for the named
/*	transport and destination. The queue is given an initial
/*	concurrency limit as specified with the
/*	\fIinitial_destination_concurrency\fR configuration parameter,
/*	provided that it does not exceed the transport-specific
/*	concurrency limit.
/*
/*	qmgr_queue_done() disposes of a per-destination queue after all
/*	its entries have been taken care of. It is an error to dispose
/*	of a queue that still has entries or references, or of a queue
/*	that is not in the "ready" state (for example, a dead, i.e.
/*	throttled, queue).
/*
/*	qmgr_queue_find() looks up the named queue for the named
/*	transport. A null result means that the queue was not found.
/*
/*	qmgr_queue_throttle() handles a delivery error, and decrements the
/*	concurrency limit for the destination, with a lower bound of 1.
58/* When the cohort failure bound is reached, qmgr_queue_throttle() 59/* sets the concurrency limit to zero and starts a timer 60/* to re-enable delivery to the destination after a configurable delay. 61/* 62/* qmgr_queue_unthrottle() undoes qmgr_queue_throttle()'s effects. 63/* The concurrency limit for the destination is incremented, 64/* provided that it does not exceed the destination concurrency 65/* limit specified for the transport. This routine implements 66/* "slow open" mode, and eliminates the "thundering herd" problem. 67/* 68/* qmgr_queue_suspend() suspends delivery for this destination 69/* briefly. This function invalidates any scheduling decisions 70/* that are based on the present queue's concurrency window. 71/* To compensate for work skipped by qmgr_entry_done(), the 72/* status of blocker jobs is re-evaluated after the queue is 73/* resumed. 74/* DIAGNOSTICS 75/* Panic: consistency check failure. 76/* LICENSE 77/* .ad 78/* .fi 79/* The Secure Mailer license must be distributed with this software. 80/* AUTHOR(S) 81/* Wietse Venema 82/* IBM T.J. Watson Research 83/* P.O. Box 704 84/* Yorktown Heights, NY 10598, USA 85/* 86/* Pre-emptive scheduler enhancements: 87/* Patrik Rak 88/* Modra 6 89/* 155 00, Prague, Czech Republic 90/* 91/* Concurrency scheduler enhancements with: 92/* Victor Duchovni 93/* Morgan Stanley 94/*--*/ 95 96/* System library. */ 97 98#include <sys_defs.h> 99#include <time.h> 100 101/* Utility library. */ 102 103#include <msg.h> 104#include <mymalloc.h> 105#include <events.h> 106#include <htable.h> 107 108/* Global library. */ 109 110#include <mail_params.h> 111#include <recipient_list.h> 112#include <mail_proto.h> /* QMGR_LOG_WINDOW */ 113 114/* Application-specific. 
*/ 115 116#include "qmgr.h" 117 118int qmgr_queue_count; 119 120#define QMGR_ERROR_OR_RETRY_QUEUE(queue) \ 121 (strcmp(queue->transport->name, MAIL_SERVICE_RETRY) == 0 \ 122 || strcmp(queue->transport->name, MAIL_SERVICE_ERROR) == 0) 123 124#define QMGR_LOG_FEEDBACK(feedback) \ 125 if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \ 126 msg_info("%s: feedback %g", myname, feedback); 127 128#define QMGR_LOG_WINDOW(queue) \ 129 if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \ 130 msg_info("%s: queue %s: limit %d window %d success %g failure %g fail_cohorts %g", \ 131 myname, queue->name, queue->transport->dest_concurrency_limit, \ 132 queue->window, queue->success, queue->failure, queue->fail_cohorts); 133 134/* qmgr_queue_resume - resume delivery to destination */ 135 136static void qmgr_queue_resume(int event, char *context) 137{ 138 QMGR_QUEUE *queue = (QMGR_QUEUE *) context; 139 const char *myname = "qmgr_queue_resume"; 140 141 /* 142 * Sanity checks. 143 */ 144 if (!QMGR_QUEUE_SUSPENDED(queue)) 145 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); 146 147 /* 148 * We can't simply force delivery on this queue: the transport's pending 149 * count may already be maxed out, and there may be other constraints 150 * that definitely should be none of our business. The best we can do is 151 * to play by the same rules as everyone else: let qmgr_active_drain() 152 * and round-robin selection take care of message selection. 153 */ 154 queue->window = 1; 155 156 /* 157 * Every event handler that leaves a queue in the "ready" state should 158 * remove the queue when it is empty. 159 * 160 * XXX Do not omit the redundant test below. It is here to simplify code 161 * consistency checks. The check is trivially eliminated by the compiler 162 * optimizer. There is no need to sacrifice code clarity for the sake of 163 * performance. 164 * 165 * XXX Do not expose the blocker job logic here. 
Rate-limited queues are not 166 * a performance-critical feature. Here, too, there is no need to sacrifice 167 * code clarity for the sake of performance. 168 */ 169 if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0) 170 qmgr_queue_done(queue); 171 else 172 qmgr_job_blocker_update(queue); 173} 174 175/* qmgr_queue_suspend - briefly suspend a destination */ 176 177void qmgr_queue_suspend(QMGR_QUEUE *queue, int delay) 178{ 179 const char *myname = "qmgr_queue_suspend"; 180 181 /* 182 * Sanity checks. 183 */ 184 if (!QMGR_QUEUE_READY(queue)) 185 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); 186 if (queue->busy_refcount > 0) 187 msg_panic("%s: queue is busy", myname); 188 189 /* 190 * Set the queue status to "suspended". No-one is supposed to remove a 191 * queue in suspended state. 192 */ 193 queue->window = QMGR_QUEUE_STAT_SUSPENDED; 194 event_request_timer(qmgr_queue_resume, (char *) queue, delay); 195} 196 197/* qmgr_queue_unthrottle_wrapper - in case (char *) != (struct *) */ 198 199static void qmgr_queue_unthrottle_wrapper(int unused_event, char *context) 200{ 201 QMGR_QUEUE *queue = (QMGR_QUEUE *) context; 202 203 /* 204 * This routine runs when a wakeup timer goes off; it does not run in the 205 * context of some queue manipulation. Therefore, it is safe to discard 206 * this in-core queue when it is empty and when this site is not dead. 207 */ 208 qmgr_queue_unthrottle(queue); 209 if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0) 210 qmgr_queue_done(queue); 211} 212 213/* qmgr_queue_unthrottle - give this destination another chance */ 214 215void qmgr_queue_unthrottle(QMGR_QUEUE *queue) 216{ 217 const char *myname = "qmgr_queue_unthrottle"; 218 QMGR_TRANSPORT *transport = queue->transport; 219 double feedback; 220 221 if (msg_verbose) 222 msg_info("%s: queue %s", myname, queue->name); 223 224 /* 225 * Sanity checks. 
226 */ 227 if (!QMGR_QUEUE_READY(queue) && !QMGR_QUEUE_THROTTLED(queue)) 228 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); 229 230 /* 231 * Don't restart the negative feedback hysteresis cycle with every 232 * positive feedback. Restart it only when we make a positive concurrency 233 * adjustment (i.e. at the end of a positive feedback hysteresis cycle). 234 * Otherwise negative feedback would be too aggressive: negative feedback 235 * takes effect immediately at the start of its hysteresis cycle. 236 */ 237 queue->fail_cohorts = 0; 238 239 /* 240 * Special case when this site was dead. 241 */ 242 if (QMGR_QUEUE_THROTTLED(queue)) { 243 event_cancel_timer(qmgr_queue_unthrottle_wrapper, (char *) queue); 244 if (queue->dsn == 0) 245 msg_panic("%s: queue %s: window 0 status 0", myname, queue->name); 246 dsn_free(queue->dsn); 247 queue->dsn = 0; 248 /* Back from the almost grave, best concurrency is anyone's guess. */ 249 if (queue->busy_refcount > 0) 250 queue->window = queue->busy_refcount; 251 else 252 queue->window = transport->init_dest_concurrency; 253 queue->success = queue->failure = 0; 254 QMGR_LOG_WINDOW(queue); 255 return; 256 } 257 258 /* 259 * Increase the destination's concurrency limit until we reach the 260 * transport's concurrency limit. Allow for a margin the size of the 261 * initial destination concurrency, so that we're not too gentle. 262 * 263 * Why is the concurrency increment based on preferred concurrency and not 264 * on the number of outstanding delivery requests? The latter fluctuates 265 * wildly when deliveries complete in bursts (artificial benchmark 266 * measurements), and does not account for cached connections. 267 * 268 * Keep the window within reasonable distance from actual concurrency 269 * otherwise negative feedback will be ineffective. This expression 270 * assumes that busy_refcount changes gradually. This is invalid when 271 * deliveries complete in bursts (artificial benchmark measurements). 
272 */ 273 if (transport->dest_concurrency_limit == 0 274 || transport->dest_concurrency_limit > queue->window) 275 if (queue->window < queue->busy_refcount + transport->init_dest_concurrency) { 276 feedback = QMGR_FEEDBACK_VAL(transport->pos_feedback, queue->window); 277 QMGR_LOG_FEEDBACK(feedback); 278 queue->success += feedback; 279 /* Prepare for overshoot (feedback > hysteresis, rounding error). */ 280 while (queue->success + feedback / 2 >= transport->pos_feedback.hysteresis) { 281 queue->window += transport->pos_feedback.hysteresis; 282 queue->success -= transport->pos_feedback.hysteresis; 283 queue->failure = 0; 284 } 285 /* Prepare for overshoot. */ 286 if (transport->dest_concurrency_limit > 0 287 && queue->window > transport->dest_concurrency_limit) 288 queue->window = transport->dest_concurrency_limit; 289 } 290 QMGR_LOG_WINDOW(queue); 291} 292 293/* qmgr_queue_throttle - handle destination delivery failure */ 294 295void qmgr_queue_throttle(QMGR_QUEUE *queue, DSN *dsn) 296{ 297 const char *myname = "qmgr_queue_throttle"; 298 QMGR_TRANSPORT *transport = queue->transport; 299 double feedback; 300 301 /* 302 * Sanity checks. 303 */ 304 if (!QMGR_QUEUE_READY(queue)) 305 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); 306 if (queue->dsn) 307 msg_panic("%s: queue %s: spurious reason %s", 308 myname, queue->name, queue->dsn->reason); 309 if (msg_verbose) 310 msg_info("%s: queue %s: %s %s", 311 myname, queue->name, dsn->status, dsn->reason); 312 313 /* 314 * Don't restart the positive feedback hysteresis cycle with every 315 * negative feedback. Restart it only when we make a negative concurrency 316 * adjustment (i.e. at the start of a negative feedback hysteresis 317 * cycle). Otherwise positive feedback would be too weak (positive 318 * feedback does not take effect until the end of its hysteresis cycle). 319 */ 320 321 /* 322 * This queue is declared dead after a configurable number of 323 * pseudo-cohort failures. 
324 */ 325 if (QMGR_QUEUE_READY(queue)) { 326 queue->fail_cohorts += 1.0 / queue->window; 327 if (transport->fail_cohort_limit > 0 328 && queue->fail_cohorts >= transport->fail_cohort_limit) 329 queue->window = QMGR_QUEUE_STAT_THROTTLED; 330 } 331 332 /* 333 * Decrease the destination's concurrency limit until we reach 1. Base 334 * adjustments on the concurrency limit itself, instead of using the 335 * actual concurrency. The latter fluctuates wildly when deliveries 336 * complete in bursts (artificial benchmark measurements). 337 * 338 * Even after reaching 1, we maintain the negative hysteresis cycle so that 339 * negative feedback can cancel out positive feedback. 340 */ 341 if (QMGR_QUEUE_READY(queue)) { 342 feedback = QMGR_FEEDBACK_VAL(transport->neg_feedback, queue->window); 343 QMGR_LOG_FEEDBACK(feedback); 344 queue->failure -= feedback; 345 /* Prepare for overshoot (feedback > hysteresis, rounding error). */ 346 while (queue->failure - feedback / 2 < 0) { 347 queue->window -= transport->neg_feedback.hysteresis; 348 queue->success = 0; 349 queue->failure += transport->neg_feedback.hysteresis; 350 } 351 /* Prepare for overshoot. */ 352 if (queue->window < 1) 353 queue->window = 1; 354 } 355 356 /* 357 * Special case for a site that just was declared dead. 358 */ 359 if (QMGR_QUEUE_THROTTLED(queue)) { 360 queue->dsn = DSN_COPY(dsn); 361 event_request_timer(qmgr_queue_unthrottle_wrapper, 362 (char *) queue, var_min_backoff_time); 363 queue->dflags = 0; 364 } 365 QMGR_LOG_WINDOW(queue); 366} 367 368/* qmgr_queue_done - delete in-core queue for site */ 369 370void qmgr_queue_done(QMGR_QUEUE *queue) 371{ 372 const char *myname = "qmgr_queue_done"; 373 QMGR_TRANSPORT *transport = queue->transport; 374 375 /* 376 * Sanity checks. It is an error to delete an in-core queue with pending 377 * messages or timers. 
378 */ 379 if (queue->busy_refcount != 0 || queue->todo_refcount != 0) 380 msg_panic("%s: refcount: %d", myname, 381 queue->busy_refcount + queue->todo_refcount); 382 if (queue->todo.next || queue->busy.next) 383 msg_panic("%s: queue not empty: %s", myname, queue->name); 384 if (!QMGR_QUEUE_READY(queue)) 385 msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue)); 386 if (queue->dsn) 387 msg_panic("%s: queue %s: spurious reason %s", 388 myname, queue->name, queue->dsn->reason); 389 390 /* 391 * Clean up this in-core queue. 392 */ 393 QMGR_LIST_UNLINK(transport->queue_list, QMGR_QUEUE *, queue, peers); 394 htable_delete(transport->queue_byname, queue->name, (void (*) (char *)) 0); 395 myfree(queue->name); 396 myfree(queue->nexthop); 397 qmgr_queue_count--; 398 myfree((char *) queue); 399} 400 401/* qmgr_queue_create - create in-core queue for site */ 402 403QMGR_QUEUE *qmgr_queue_create(QMGR_TRANSPORT *transport, const char *name, 404 const char *nexthop) 405{ 406 QMGR_QUEUE *queue; 407 408 /* 409 * If possible, choose an initial concurrency of > 1 so that one bad 410 * message or one bad network won't slow us down unnecessarily. 
411 */ 412 413 queue = (QMGR_QUEUE *) mymalloc(sizeof(QMGR_QUEUE)); 414 qmgr_queue_count++; 415 queue->dflags = 0; 416 queue->last_done = 0; 417 queue->name = mystrdup(name); 418 queue->nexthop = mystrdup(nexthop); 419 queue->todo_refcount = 0; 420 queue->busy_refcount = 0; 421 queue->transport = transport; 422 queue->window = transport->init_dest_concurrency; 423 queue->success = queue->failure = queue->fail_cohorts = 0; 424 QMGR_LIST_INIT(queue->todo); 425 QMGR_LIST_INIT(queue->busy); 426 queue->dsn = 0; 427 queue->clog_time_to_warn = 0; 428 queue->blocker_tag = 0; 429 QMGR_LIST_APPEND(transport->queue_list, queue, peers); 430 htable_enter(transport->queue_byname, name, (char *) queue); 431 return (queue); 432} 433 434/* qmgr_queue_find - find in-core named queue */ 435 436QMGR_QUEUE *qmgr_queue_find(QMGR_TRANSPORT *transport, const char *name) 437{ 438 return ((QMGR_QUEUE *) htable_find(transport->queue_byname, name)); 439} 440