1/*++ 2/* NAME 3/* qmgr_active 3 4/* SUMMARY 5/* active queue management 6/* SYNOPSIS 7/* #include "qmgr.h" 8/* 9/* void qmgr_active_feed(scan_info, queue_id) 10/* QMGR_SCAN *scan_info; 11/* const char *queue_id; 12/* 13/* void qmgr_active_drain() 14/* 15/* int qmgr_active_done(message) 16/* QMGR_MESSAGE *message; 17/* DESCRIPTION 18/* These functions maintain the active message queue: the set 19/* of messages that the queue manager is actually working on. 20/* The active queue is limited in size. Messages are drained 21/* from the active queue by allocating a delivery process and 22/* by delivering mail via that process. Messages leak into the 23/* active queue only when the active queue is small enough. 24/* Damaged message files are saved to the "corrupt" directory. 25/* 26/* qmgr_active_feed() inserts the named message file into 27/* the active queue. Message files with the wrong name or 28/* with other wrong properties are skipped but not removed. 29/* The following queue flags are recognized, other flags being 30/* ignored: 31/* .IP QMGR_SCAN_ALL 32/* Examine all queue files. Normally, deferred queue files with 33/* future time stamps are ignored, and incoming queue files with 34/* future time stamps are frowned upon. 35/* .PP 36/* qmgr_active_drain() allocates one delivery process. 37/* Process allocation is asynchronous. Once the delivery 38/* process is available, an attempt is made to deliver 39/* a message via it. Message delivery is asynchronous, too. 40/* 41/* qmgr_active_done() deals with a message after delivery 42/* has been tried for all in-core recipients. If the message 43/* was bounced, a bounce message is sent to the sender, or 44/* to the Errors-To: address if one was specified. 45/* If there are more on-file recipients, a new batch of 46/* in-core recipients is read from the queue file. Otherwise, 47/* if a delivery agent marked the queue file as corrupt, 48/* the queue file is moved to the "corrupt" queue (surprise); 49/* if at least one delivery failed, the message is moved 50/* to the deferred queue. The time stamps of a deferred queue 51/* file are set to the nearest wakeup time of its recipient 52/* sites (if delivery failed due to a problem with a next-hop 53/* host), are set into the future by the amount of time the 54/* message was queued (per-message exponential backoff), or are set 55/* into the future by a minimal backoff time, whichever is more. 56/* The minimal_backoff_time parameter specifies the minimal 57/* amount of time between delivery attempts; maximal_backoff_time 58/* specifies an upper limit. 59/* DIAGNOSTICS 60/* Fatal: queue file access failures, out of memory. 61/* Panic: interface violations, internal consistency errors. 62/* Warnings: corrupt message file. A corrupt message is saved 63/* to the "corrupt" queue for further inspection. 64/* LICENSE 65/* .ad 66/* .fi 67/* The Secure Mailer license must be distributed with this software. 68/* AUTHOR(S) 69/* Wietse Venema 70/* IBM T.J. Watson Research 71/* P.O. Box 704 72/* Yorktown Heights, NY 10598, USA 73/*--*/ 74 75/* System library. */ 76 77#include <sys_defs.h> 78#include <sys/stat.h> 79#include <dirent.h> 80#include <stdlib.h> 81#include <unistd.h> 82#include <string.h> 83#include <utime.h> 84#include <errno.h> 85 86#ifndef S_IRWXU /* What? no POSIX system? */ 87#define S_IRWXU 0700 88#endif 89 90/* Utility library. */ 91 92#include <msg.h> 93#include <events.h> 94#include <mymalloc.h> 95#include <vstream.h> 96#include <warn_stat.h> 97 98/* Global library. */ 99 100#include <mail_params.h> 101#include <mail_open_ok.h> 102#include <mail_queue.h> 103#include <recipient_list.h> 104#include <bounce.h> 105#include <defer.h> 106#include <trace.h> 107#include <abounce.h> 108#include <rec_type.h> 109#include <qmgr_user.h> 110 111#ifdef __APPLE_OS_X_SERVER__ 112#include <dtrace-postfix.h> 113#endif 114 115/* Application-specific. */ 116 117#include "qmgr.h" 118 119 /* 120 * A bunch of call-back routines. 121 */ 122static void qmgr_active_done_2_bounce_flush(int, char *); 123static void qmgr_active_done_2_generic(QMGR_MESSAGE *); 124static void qmgr_active_done_25_trace_flush(int, char *); 125static void qmgr_active_done_25_generic(QMGR_MESSAGE *); 126static void qmgr_active_done_3_defer_flush(int, char *); 127static void qmgr_active_done_3_defer_warn(int, char *); 128static void qmgr_active_done_3_generic(QMGR_MESSAGE *); 129 130/* qmgr_active_corrupt - move corrupted file out of the way */ 131 132static void qmgr_active_corrupt(const char *queue_id) 133{ 134 const char *myname = "qmgr_active_corrupt"; 135 136 if (mail_queue_rename(queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT)) { 137 if (errno != ENOENT) 138 msg_fatal("%s: save corrupt file queue %s id %s: %m", 139 myname, MAIL_QUEUE_ACTIVE, queue_id); 140 } else { 141 msg_warn("saving corrupt file \"%s\" from queue \"%s\" to queue \"%s\"", 142 queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT); 143 } 144} 145 146/* qmgr_active_defer - defer queue file */ 147 148static void qmgr_active_defer(const char *queue_name, const char *queue_id, 149 const char *dest_queue, int delay) 150{ 151 const char *myname = "qmgr_active_defer"; 152 const char *path; 153 struct utimbuf tbuf; 154 155 if (msg_verbose) 156 msg_info("wakeup %s after %ld secs", queue_id, (long) delay); 157 158 tbuf.actime = tbuf.modtime = event_time() + delay; 159 path = mail_queue_path((VSTRING *) 0, queue_name, queue_id); 160 if (utime(path, &tbuf) < 0 && errno != ENOENT) 161 msg_fatal("%s: update %s time stamps: %m", myname, path); 162 if (mail_queue_rename(queue_id, queue_name, dest_queue)) { 163 if (errno != ENOENT) 164 msg_fatal("%s: rename %s from %s to %s: %m", myname, 165 queue_id, queue_name, dest_queue); 166 msg_warn("%s: rename %s from %s to %s: %m", myname, 167 queue_id, queue_name, dest_queue); 168 } else if (msg_verbose) { 169 msg_info("%s: defer %s", myname, queue_id); 170 } 171} 172 173/* qmgr_active_feed - feed one message into active queue */ 174 175int qmgr_active_feed(QMGR_SCAN *scan_info, const char *queue_id) 176{ 177 const char *myname = "qmgr_active_feed"; 178 QMGR_MESSAGE *message; 179 struct stat st; 180 const char *path; 181 182 if (strcmp(scan_info->queue, MAIL_QUEUE_ACTIVE) == 0) 183 msg_panic("%s: bad queue %s", myname, scan_info->queue); 184 if (msg_verbose) 185 msg_info("%s: queue %s", myname, scan_info->queue); 186 187 /* 188 * Make sure this is something we are willing to open. 189 */ 190 if (mail_open_ok(scan_info->queue, queue_id, &st, &path) == MAIL_OPEN_NO) 191 return (0); 192 193 if (msg_verbose) 194 msg_info("%s: %s", myname, path); 195 196 /* 197 * Skip files that have time stamps into the future. They need to cool 198 * down. Incoming and deferred files can have future time stamps. 199 */ 200 if ((scan_info->flags & QMGR_SCAN_ALL) == 0 201 && st.st_mtime > time((time_t *) 0) + 1) { 202 if (msg_verbose) 203 msg_info("%s: skip %s (%ld seconds)", myname, queue_id, 204 (long) (st.st_mtime - event_time())); 205 return (0); 206 } 207 208 /* 209 * Move the message to the active queue. File access errors are fatal. 210 */ 211 if (mail_queue_rename(queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE)) { 212 if (errno != ENOENT) 213 msg_fatal("%s: %s: rename from %s to %s: %m", myname, 214 queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE); 215 msg_warn("%s: %s: rename from %s to %s: %m", myname, 216 queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE); 217 return (0); 218 } 219 220 /* 221 * Extract envelope information: sender and recipients. At this point, 222 * mail addresses have been processed by the cleanup service so they 223 * should be in canonical form. Generate requests to deliver this 224 * message. 225 * 226 * Throwing away queue files seems bad, especially when they made it this 227 * far into the mail system. Therefore we save bad files to a separate 228 * directory for further inspection. 229 * 230 * After queue manager restart it is possible that a queue file is still 231 * being delivered. In that case (the file is locked), defer delivery by 232 * a minimal amount of time. 233 */ 234#define QMGR_FLUSH_AFTER (QMGR_FLUSH_EACH | QMGR_FLUSH_DFXP) 235 236 if ((message = qmgr_message_alloc(MAIL_QUEUE_ACTIVE, queue_id, 237 (st.st_mode & MAIL_QUEUE_STAT_UNTHROTTLE) ? 238 scan_info->flags | QMGR_FLUSH_AFTER : 239 scan_info->flags, 240 (st.st_mode & MAIL_QUEUE_STAT_UNTHROTTLE) ? 241 st.st_mode & ~MAIL_QUEUE_STAT_UNTHROTTLE : 242 0)) == 0) { 243 qmgr_active_corrupt(queue_id); 244 return (0); 245 } else if (message == QMGR_MESSAGE_LOCKED) { 246 qmgr_active_defer(MAIL_QUEUE_ACTIVE, queue_id, MAIL_QUEUE_INCOMING, 60); 247 return (0); 248 } else { 249 250 /* 251 * Special case if all recipients were already delivered. Send any 252 * bounces and clean up. 253 */ 254 if (message->refcount == 0) 255 qmgr_active_done(message); 256 return (1); 257 } 258} 259 260/* qmgr_active_done - dispose of message after recipients have been tried */ 261 262void qmgr_active_done(QMGR_MESSAGE *message) 263{ 264 const char *myname = "qmgr_active_done"; 265 struct stat st; 266 267 if (msg_verbose) 268 msg_info("%s: %s", myname, message->queue_id); 269 270 /* 271 * During a previous iteration, an attempt to bounce this message may 272 * have failed, so there may still be a bounce log lying around. XXX By 273 * groping around in the bounce queue, we're trespassing on the bounce 274 * service's territory. But doing so is more robust than depending on the 275 * bounce daemon to do the lookup for us, and for us to do the deleting 276 * after we have received a successful status from the bounce service. 277 * The bounce queue directory blocks are most likely in memory anyway. If 278 * these lookups become a performance problem we will have to build an 279 * in-core cache into the bounce daemon. 280 * 281 * Don't bounce when the bounce log is empty. The bounce process obviously 282 * failed, and the delivery agent will have requested that the message be 283 * deferred. 284 * 285 * Bounces are sent asynchronously to avoid stalling while the cleanup 286 * daemon waits for the qmgr to accept the "new mail" trigger. 287 * 288 * See also code in cleanup_bounce.c. 289 */ 290 if (stat(mail_queue_path((VSTRING *) 0, MAIL_QUEUE_BOUNCE, message->queue_id), &st) == 0) { 291 if (st.st_size == 0) { 292 if (mail_queue_remove(MAIL_QUEUE_BOUNCE, message->queue_id)) 293 msg_fatal("remove %s %s: %m", 294 MAIL_QUEUE_BOUNCE, message->queue_id); 295 } else { 296 if (msg_verbose) 297 msg_info("%s: bounce %s", myname, message->queue_id); 298 if (message->verp_delims == 0 || var_verp_bounce_off) 299 abounce_flush(BOUNCE_FLAG_KEEP, 300 message->queue_name, 301 message->queue_id, 302 message->encoding, 303 message->sender, 304 message->dsn_envid, 305 message->dsn_ret, 306 qmgr_active_done_2_bounce_flush, 307 (char *) message); 308 else 309 abounce_flush_verp(BOUNCE_FLAG_KEEP, 310 message->queue_name, 311 message->queue_id, 312 message->encoding, 313 message->sender, 314 message->dsn_envid, 315 message->dsn_ret, 316 message->verp_delims, 317 qmgr_active_done_2_bounce_flush, 318 (char *) message); 319 return; 320 } 321 } 322 323 /* 324 * Asynchronous processing does not reach this point. 325 */ 326 qmgr_active_done_2_generic(message); 327} 328 329/* qmgr_active_done_2_bounce_flush - process abounce_flush() status */ 330 331static void qmgr_active_done_2_bounce_flush(int status, char *context) 332{ 333 QMGR_MESSAGE *message = (QMGR_MESSAGE *) context; 334 335 /* 336 * Process abounce_flush() status and continue processing. 337 */ 338 message->flags |= status; 339 qmgr_active_done_2_generic(message); 340} 341 342/* qmgr_active_done_2_generic - continue processing */ 343 344static void qmgr_active_done_2_generic(QMGR_MESSAGE *message) 345{ 346 const char *path; 347 struct stat st; 348 349 /* 350 * A delivery agent marks a queue file as corrupt by changing its 351 * attributes, and by pretending that delivery was deferred. 352 */ 353 if (message->flags 354 && mail_open_ok(MAIL_QUEUE_ACTIVE, message->queue_id, &st, &path) == MAIL_OPEN_NO) { 355 qmgr_active_corrupt(message->queue_id); 356 qmgr_message_free(message); 357 return; 358 } 359 360 /* 361 * If we did not read all recipients from this file, go read some more, 362 * but remember whether some recipients have to be tried again. 363 * 364 * Throwing away queue files seems bad, especially when they made it this 365 * far into the mail system. Therefore we save bad files to a separate 366 * directory for further inspection by a human being. 367 */ 368 if (message->rcpt_offset > 0) { 369 if (qmgr_message_realloc(message) == 0) { 370 qmgr_active_corrupt(message->queue_id); 371 qmgr_message_free(message); 372 } else { 373 if (message->refcount == 0) 374 qmgr_active_done(message); /* recurse for consistency */ 375 } 376 return; 377 } 378 379 /* 380 * XXX With multi-recipient mail, some recipients may have NOTIFY=SUCCESS 381 * and others not. Depending on what subset of recipients are delivered, 382 * a trace file may or may not be created. Even when the last partial 383 * delivery attempt had no NOTIFY=SUCCESS recipients, a trace file may 384 * still exist from a previous partial delivery attempt. So as long as 385 * any recipient has NOTIFY=SUCCESS we have to always look for the trace 386 * file and be prepared for the file not to exist. 387 * 388 * See also comments in bounce/bounce_notify_util.c. 389 */ 390 if ((message->tflags & (DEL_REQ_FLAG_USR_VRFY | DEL_REQ_FLAG_RECORD)) 391 || (message->rflags & QMGR_READ_FLAG_NOTIFY_SUCCESS)) { 392 atrace_flush(message->tflags, 393 message->queue_name, 394 message->queue_id, 395 message->encoding, 396 message->sender, 397 message->dsn_envid, 398 message->dsn_ret, 399 qmgr_active_done_25_trace_flush, 400 (char *) message); 401 return; 402 } 403 404 /* 405 * Asynchronous processing does not reach this point. 406 */ 407 qmgr_active_done_25_generic(message); 408} 409 410/* qmgr_active_done_25_trace_flush - continue after atrace_flush() completion */ 411 412static void qmgr_active_done_25_trace_flush(int status, char *context) 413{ 414 QMGR_MESSAGE *message = (QMGR_MESSAGE *) context; 415 416 /* 417 * Process atrace_flush() status and continue processing. 418 */ 419 if (status == 0 && message->tflags_offset) 420 qmgr_message_kill_record(message, message->tflags_offset); 421 message->flags |= status; 422 qmgr_active_done_25_generic(message); 423} 424 425/* qmgr_active_done_25_generic - continue processing */ 426 427static void qmgr_active_done_25_generic(QMGR_MESSAGE *message) 428{ 429 const char *myname = "qmgr_active_done_25_generic"; 430 431 /* 432 * If we get to this point we have tried all recipients for this message. 433 * If the message is too old, try to bounce it. 434 * 435 * Bounces are sent asynchronously to avoid stalling while the cleanup 436 * daemon waits for the qmgr to accept the "new mail" trigger. 437 */ 438 if (message->flags) { 439 if (event_time() >= message->create_time + 440 (*message->sender ? var_max_queue_time : var_dsn_queue_time)) { 441 msg_info("%s: from=<%s>, status=expired, returned to sender", 442 message->queue_id, message->sender); 443 if (message->verp_delims == 0 || var_verp_bounce_off) 444 adefer_flush(BOUNCE_FLAG_KEEP, 445 message->queue_name, 446 message->queue_id, 447 message->encoding, 448 message->sender, 449 message->dsn_envid, 450 message->dsn_ret, 451 qmgr_active_done_3_defer_flush, 452 (char *) message); 453 else 454 adefer_flush_verp(BOUNCE_FLAG_KEEP, 455 message->queue_name, 456 message->queue_id, 457 message->encoding, 458 message->sender, 459 message->dsn_envid, 460 message->dsn_ret, 461 message->verp_delims, 462 qmgr_active_done_3_defer_flush, 463 (char *) message); 464 return; 465 } else if (message->warn_time > 0 466 && event_time() >= message->warn_time - 1) { 467 if (msg_verbose) 468 msg_info("%s: sending defer warning for %s", myname, message->queue_id); 469 adefer_warn(BOUNCE_FLAG_KEEP, 470 message->queue_name, 471 message->queue_id, 472 message->encoding, 473 message->sender, 474 message->dsn_envid, 475 message->dsn_ret, 476 qmgr_active_done_3_defer_warn, 477 (char *) message); 478 return; 479 } 480 } 481 482 /* 483 * Asynchronous processing does not reach this point. 484 */ 485 qmgr_active_done_3_generic(message); 486} 487 488/* qmgr_active_done_3_defer_warn - continue after adefer_warn() completion */ 489 490static void qmgr_active_done_3_defer_warn(int status, char *context) 491{ 492 QMGR_MESSAGE *message = (QMGR_MESSAGE *) context; 493 494 /* 495 * Process adefer_warn() completion status and continue processing. 496 */ 497 if (status == 0) 498 qmgr_message_update_warn(message); 499 qmgr_active_done_3_generic(message); 500} 501 502/* qmgr_active_done_3_defer_flush - continue after adefer_flush() completion */ 503 504static void qmgr_active_done_3_defer_flush(int status, char *context) 505{ 506 QMGR_MESSAGE *message = (QMGR_MESSAGE *) context; 507 508 /* 509 * Process adefer_flush() status and continue processing. 510 */ 511 message->flags = status; 512 qmgr_active_done_3_generic(message); 513} 514 515/* qmgr_active_done_3_generic - continue processing */ 516 517static void qmgr_active_done_3_generic(QMGR_MESSAGE *message) 518{ 519 const char *myname = "qmgr_active_done_3_generic"; 520 int delay; 521 522 /* 523 * Some recipients need to be tried again. Move the queue file time 524 * stamps into the future by the amount of time that the message is 525 * delayed, and move the message to the deferred queue. Impose minimal 526 * and maximal backoff times. 527 * 528 * Since we look at actual time in queue, not time since last delivery 529 * attempt, backoff times will be distributed. However, we can still see 530 * spikes in delivery activity because the interval between deferred 531 * queue scans is finite. 532 */ 533 if (message->flags) { 534 if (message->create_time > 0) { 535 delay = event_time() - message->create_time; 536 if (delay > var_max_backoff_time) 537 delay = var_max_backoff_time; 538 if (delay < var_min_backoff_time) 539 delay = var_min_backoff_time; 540 } else { 541 delay = var_min_backoff_time; 542 } 543 qmgr_active_defer(message->queue_name, message->queue_id, 544 MAIL_QUEUE_DEFERRED, delay); 545 } 546 547 /* 548 * All recipients done. Remove the queue file. 549 */ 550 else { 551 if (mail_queue_remove(message->queue_name, message->queue_id)) { 552 if (errno != ENOENT) 553 msg_fatal("%s: remove %s from %s: %m", myname, 554 message->queue_id, message->queue_name); 555 msg_warn("%s: remove %s from %s: %m", myname, 556 message->queue_id, message->queue_name); 557 } else { 558 /* Same format as logged by postsuper. */ 559 msg_info("%s: removed", message->queue_id); 560 561#ifdef __APPLE_OS_X_SERVER__ 562 if (POSTFIX_SMTP_DEQUEUE_ENABLED()) 563 POSTFIX_SMTP_DEQUEUE(message); 564#endif 565 } 566 } 567 568 /* 569 * Finally, delete the in-core message structure. 570 */ 571 qmgr_message_free(message); 572} 573 574/* qmgr_active_drain - drain active queue by allocating a delivery process */ 575 576void qmgr_active_drain(void) 577{ 578 QMGR_TRANSPORT *transport; 579 580 /* 581 * Allocate one delivery process for every transport with pending mail. 582 * The process allocation completes asynchronously. 583 */ 584 while ((transport = qmgr_transport_select()) != 0) { 585 if (msg_verbose) 586 msg_info("qmgr_active_drain: allocate %s", transport->name); 587 qmgr_transport_alloc(transport, qmgr_deliver); 588 } 589} 590