/*	$NetBSD: qmgr_active.c,v 1.3 2020/03/18 19:05:19 christos Exp $	*/

/*++
/* NAME
/*	qmgr_active 3
/* SUMMARY
/*	active queue management
/* SYNOPSIS
/*	#include "qmgr.h"
/*
/*	int	qmgr_active_feed(scan_info, queue_id)
/*	QMGR_SCAN *scan_info;
/*	const char *queue_id;
/*
/*	void	qmgr_active_drain()
/*
/*	void	qmgr_active_done(message)
/*	QMGR_MESSAGE *message;
/* DESCRIPTION
/*	These functions maintain the active message queue: the set
/*	of messages that the queue manager is actually working on.
/*	The active queue is limited in size. Messages are drained
/*	from the active queue by allocating a delivery process and
/*	by delivering mail via that process. Messages leak into the
/*	active queue only when the active queue is small enough.
/*	Damaged message files are saved to the "corrupt" directory.
/*
/*	qmgr_active_feed() inserts the named message file into
/*	the active queue. Message files with the wrong name or
/*	with other wrong properties are skipped but not removed.
/*	The result is 1 when the file was moved into the active
/*	queue and an in-core message was created; it is 0 when the
/*	file was skipped, disappeared, was saved to the "corrupt"
/*	queue, or was locked and moved back to the incoming queue.
/*	The following queue flags are recognized, other flags being
/*	ignored:
/* .IP QMGR_SCAN_ALL
/*	Examine all queue files. Normally, queue files with time
/*	stamps in the future are skipped until they "cool down";
/*	both incoming and deferred queue files can have future
/*	time stamps.
/* .PP
/*	qmgr_active_drain() allocates one delivery process for
/*	each transport with pending mail (each transport returned
/*	by qmgr_transport_select()).
/*	Process allocation is asynchronous. Once the delivery
/*	process is available, an attempt is made to deliver
/*	a message via it. Message delivery is asynchronous, too.
/*
/*	qmgr_active_done() deals with a message after delivery
/*	has been tried for all in-core recipients. If the message
/*	was bounced, a bounce message is sent to the sender, or
/*	to the Errors-To: address if one was specified.
/*	If there are more on-file recipients, a new batch of
/*	in-core recipients is read from the queue file.
Otherwise, 49/* if a delivery agent marked the queue file as corrupt, 50/* the queue file is moved to the "corrupt" queue (surprise); 51/* if at least one delivery failed, the message is moved 52/* to the deferred queue. The time stamps of a deferred queue 53/* file are set to the nearest wakeup time of its recipient 54/* sites (if delivery failed due to a problem with a next-hop 55/* host), are set into the future by the amount of time the 56/* message was queued (per-message exponential backoff), or are set 57/* into the future by a minimal backoff time, whichever is more. 58/* The minimal_backoff_time parameter specifies the minimal 59/* amount of time between delivery attempts; maximal_backoff_time 60/* specifies an upper limit. 61/* DIAGNOSTICS 62/* Fatal: queue file access failures, out of memory. 63/* Panic: interface violations, internal consistency errors. 64/* Warnings: corrupt message file. A corrupt message is saved 65/* to the "corrupt" queue for further inspection. 66/* LICENSE 67/* .ad 68/* .fi 69/* The Secure Mailer license must be distributed with this software. 70/* AUTHOR(S) 71/* Wietse Venema 72/* IBM T.J. Watson Research 73/* P.O. Box 704 74/* Yorktown Heights, NY 10598, USA 75/* 76/* Wietse Venema 77/* Google, Inc. 78/* 111 8th Avenue 79/* New York, NY 10011, USA 80/*--*/ 81 82/* System library. */ 83 84#include <sys_defs.h> 85#include <sys/stat.h> 86#include <dirent.h> 87#include <stdlib.h> 88#include <unistd.h> 89#include <string.h> 90#include <utime.h> 91#include <errno.h> 92 93#ifndef S_IRWXU /* What? no POSIX system? */ 94#define S_IRWXU 0700 95#endif 96 97/* Utility library. */ 98 99#include <msg.h> 100#include <events.h> 101#include <mymalloc.h> 102#include <vstream.h> 103#include <warn_stat.h> 104 105/* Global library. 
*/ 106 107#include <mail_params.h> 108#include <mail_open_ok.h> 109#include <mail_queue.h> 110#include <recipient_list.h> 111#include <bounce.h> 112#include <defer.h> 113#include <trace.h> 114#include <abounce.h> 115#include <rec_type.h> 116#include <qmgr_user.h> 117#include <info_log_addr_form.h> 118 119/* Application-specific. */ 120 121#include "qmgr.h" 122 123 /* 124 * A bunch of call-back routines. 125 */ 126static void qmgr_active_done_2_bounce_flush(int, void *); 127static void qmgr_active_done_2_generic(QMGR_MESSAGE *); 128static void qmgr_active_done_25_trace_flush(int, void *); 129static void qmgr_active_done_25_generic(QMGR_MESSAGE *); 130static void qmgr_active_done_3_defer_flush(int, void *); 131static void qmgr_active_done_3_defer_warn(int, void *); 132static void qmgr_active_done_3_generic(QMGR_MESSAGE *); 133 134/* qmgr_active_corrupt - move corrupted file out of the way */ 135 136static void qmgr_active_corrupt(const char *queue_id) 137{ 138 const char *myname = "qmgr_active_corrupt"; 139 140 if (mail_queue_rename(queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT)) { 141 if (errno != ENOENT) 142 msg_fatal("%s: save corrupt file queue %s id %s: %m", 143 myname, MAIL_QUEUE_ACTIVE, queue_id); 144 } else { 145 msg_warn("saving corrupt file \"%s\" from queue \"%s\" to queue \"%s\"", 146 queue_id, MAIL_QUEUE_ACTIVE, MAIL_QUEUE_CORRUPT); 147 } 148} 149 150/* qmgr_active_defer - defer queue file */ 151 152static void qmgr_active_defer(const char *queue_name, const char *queue_id, 153 const char *dest_queue, int delay) 154{ 155 const char *myname = "qmgr_active_defer"; 156 const char *path; 157 struct utimbuf tbuf; 158 159 if (msg_verbose) 160 msg_info("wakeup %s after %ld secs", queue_id, (long) delay); 161 162 tbuf.actime = tbuf.modtime = event_time() + delay; 163 path = mail_queue_path((VSTRING *) 0, queue_name, queue_id); 164 if (utime(path, &tbuf) < 0 && errno != ENOENT) 165 msg_fatal("%s: update %s time stamps: %m", myname, path); 166 if 
(mail_queue_rename(queue_id, queue_name, dest_queue)) { 167 if (errno != ENOENT) 168 msg_fatal("%s: rename %s from %s to %s: %m", myname, 169 queue_id, queue_name, dest_queue); 170 msg_warn("%s: rename %s from %s to %s: %m", myname, 171 queue_id, queue_name, dest_queue); 172 } else if (msg_verbose) { 173 msg_info("%s: defer %s", myname, queue_id); 174 } 175} 176 177/* qmgr_active_feed - feed one message into active queue */ 178 179int qmgr_active_feed(QMGR_SCAN *scan_info, const char *queue_id) 180{ 181 const char *myname = "qmgr_active_feed"; 182 QMGR_MESSAGE *message; 183 struct stat st; 184 const char *path; 185 186 if (strcmp(scan_info->queue, MAIL_QUEUE_ACTIVE) == 0) 187 msg_panic("%s: bad queue %s", myname, scan_info->queue); 188 if (msg_verbose) 189 msg_info("%s: queue %s", myname, scan_info->queue); 190 191 /* 192 * Make sure this is something we are willing to open. 193 */ 194 if (mail_open_ok(scan_info->queue, queue_id, &st, &path) == MAIL_OPEN_NO) 195 return (0); 196 197 if (msg_verbose) 198 msg_info("%s: %s", myname, path); 199 200 /* 201 * Skip files that have time stamps into the future. They need to cool 202 * down. Incoming and deferred files can have future time stamps. 203 */ 204 if ((scan_info->flags & QMGR_SCAN_ALL) == 0 205 && st.st_mtime > time((time_t *) 0) + 1) { 206 if (msg_verbose) 207 msg_info("%s: skip %s (%ld seconds)", myname, queue_id, 208 (long) (st.st_mtime - event_time())); 209 return (0); 210 } 211 212 /* 213 * Move the message to the active queue. File access errors are fatal. 214 */ 215 if (mail_queue_rename(queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE)) { 216 if (errno != ENOENT) 217 msg_fatal("%s: %s: rename from %s to %s: %m", myname, 218 queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE); 219 msg_warn("%s: %s: rename from %s to %s: %m", myname, 220 queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE); 221 return (0); 222 } 223 224 /* 225 * Extract envelope information: sender and recipients. 
At this point, 226 * mail addresses have been processed by the cleanup service so they 227 * should be in canonical form. Generate requests to deliver this 228 * message. 229 * 230 * Throwing away queue files seems bad, especially when they made it this 231 * far into the mail system. Therefore we save bad files to a separate 232 * directory for further inspection. 233 * 234 * After queue manager restart it is possible that a queue file is still 235 * being delivered. In that case (the file is locked), defer delivery by 236 * a minimal amount of time. 237 */ 238#define QMGR_FLUSH_AFTER (QMGR_FLUSH_EACH | QMGR_FLUSH_DFXP) 239#define MAYBE_FLUSH_AFTER(mode) \ 240 (((mode) & MAIL_QUEUE_STAT_UNTHROTTLE) ? QMGR_FLUSH_AFTER : 0) 241#define MAYBE_FORCE_EXPIRE(mode) \ 242 (((mode) & MAIL_QUEUE_STAT_EXPIRE) ? QMGR_FORCE_EXPIRE : 0) 243#define MAYBE_UPDATE_MODE(mode) \ 244 (((mode) & MAIL_QUEUE_STAT_UNTHROTTLE) ? \ 245 (mode) & ~MAIL_QUEUE_STAT_UNTHROTTLE : 0) 246 247 if ((message = qmgr_message_alloc(MAIL_QUEUE_ACTIVE, queue_id, 248 scan_info->flags 249 | MAYBE_FLUSH_AFTER(st.st_mode) 250 | MAYBE_FORCE_EXPIRE(st.st_mode), 251 MAYBE_UPDATE_MODE(st.st_mode))) == 0) { 252 qmgr_active_corrupt(queue_id); 253 return (0); 254 } else if (message == QMGR_MESSAGE_LOCKED) { 255 qmgr_active_defer(MAIL_QUEUE_ACTIVE, queue_id, MAIL_QUEUE_INCOMING, 60); 256 return (0); 257 } else { 258 259 /* 260 * Special case if all recipients were already delivered. Send any 261 * bounces and clean up. 
262 */ 263 if (message->refcount == 0) 264 qmgr_active_done(message); 265 return (1); 266 } 267} 268 269/* qmgr_active_done - dispose of message after recipients have been tried */ 270 271void qmgr_active_done(QMGR_MESSAGE *message) 272{ 273 const char *myname = "qmgr_active_done"; 274 struct stat st; 275 276 if (msg_verbose) 277 msg_info("%s: %s", myname, message->queue_id); 278 279 /* 280 * During a previous iteration, an attempt to bounce this message may 281 * have failed, so there may still be a bounce log lying around. XXX By 282 * groping around in the bounce queue, we're trespassing on the bounce 283 * service's territory. But doing so is more robust than depending on the 284 * bounce daemon to do the lookup for us, and for us to do the deleting 285 * after we have received a successful status from the bounce service. 286 * The bounce queue directory blocks are most likely in memory anyway. If 287 * these lookups become a performance problem we will have to build an 288 * in-core cache into the bounce daemon. 289 * 290 * Don't bounce when the bounce log is empty. The bounce process obviously 291 * failed, and the delivery agent will have requested that the message be 292 * deferred. 293 * 294 * Bounces are sent asynchronously to avoid stalling while the cleanup 295 * daemon waits for the qmgr to accept the "new mail" trigger. 296 * 297 * See also code in cleanup_bounce.c. 
298 */ 299 if (stat(mail_queue_path((VSTRING *) 0, MAIL_QUEUE_BOUNCE, message->queue_id), &st) == 0) { 300 if (st.st_size == 0) { 301 if (mail_queue_remove(MAIL_QUEUE_BOUNCE, message->queue_id)) 302 msg_fatal("remove %s %s: %m", 303 MAIL_QUEUE_BOUNCE, message->queue_id); 304 } else { 305 if (msg_verbose) 306 msg_info("%s: bounce %s", myname, message->queue_id); 307 if (message->verp_delims == 0 || var_verp_bounce_off) 308 abounce_flush(BOUNCE_FLAG_KEEP, 309 message->queue_name, 310 message->queue_id, 311 message->encoding, 312 message->smtputf8, 313 message->sender, 314 message->dsn_envid, 315 message->dsn_ret, 316 qmgr_active_done_2_bounce_flush, 317 (void *) message); 318 else 319 abounce_flush_verp(BOUNCE_FLAG_KEEP, 320 message->queue_name, 321 message->queue_id, 322 message->encoding, 323 message->smtputf8, 324 message->sender, 325 message->dsn_envid, 326 message->dsn_ret, 327 message->verp_delims, 328 qmgr_active_done_2_bounce_flush, 329 (void *) message); 330 return; 331 } 332 } 333 334 /* 335 * Asynchronous processing does not reach this point. 336 */ 337 qmgr_active_done_2_generic(message); 338} 339 340/* qmgr_active_done_2_bounce_flush - process abounce_flush() status */ 341 342static void qmgr_active_done_2_bounce_flush(int status, void *context) 343{ 344 QMGR_MESSAGE *message = (QMGR_MESSAGE *) context; 345 346 /* 347 * Process abounce_flush() status and continue processing. 348 */ 349 message->flags |= status; 350 qmgr_active_done_2_generic(message); 351} 352 353/* qmgr_active_done_2_generic - continue processing */ 354 355static void qmgr_active_done_2_generic(QMGR_MESSAGE *message) 356{ 357 const char *path; 358 struct stat st; 359 360 /* 361 * A delivery agent marks a queue file as corrupt by changing its 362 * attributes, and by pretending that delivery was deferred. 
363 */ 364 if (message->flags 365 && mail_open_ok(MAIL_QUEUE_ACTIVE, message->queue_id, &st, &path) == MAIL_OPEN_NO) { 366 qmgr_active_corrupt(message->queue_id); 367 qmgr_message_free(message); 368 return; 369 } 370 371 /* 372 * If we did not read all recipients from this file, go read some more, 373 * but remember whether some recipients have to be tried again. 374 * 375 * Throwing away queue files seems bad, especially when they made it this 376 * far into the mail system. Therefore we save bad files to a separate 377 * directory for further inspection by a human being. 378 */ 379 if (message->rcpt_offset > 0) { 380 if (qmgr_message_realloc(message) == 0) { 381 qmgr_active_corrupt(message->queue_id); 382 qmgr_message_free(message); 383 } else { 384 if (message->refcount == 0) 385 qmgr_active_done(message); /* recurse for consistency */ 386 } 387 return; 388 } 389 390 /* 391 * XXX With multi-recipient mail, some recipients may have NOTIFY=SUCCESS 392 * and others not. Depending on what subset of recipients are delivered, 393 * a trace file may or may not be created. Even when the last partial 394 * delivery attempt had no NOTIFY=SUCCESS recipients, a trace file may 395 * still exist from a previous partial delivery attempt. So as long as 396 * any recipient has NOTIFY=SUCCESS we have to always look for the trace 397 * file and be prepared for the file not to exist. 398 * 399 * See also comments in bounce/bounce_notify_util.c. 400 */ 401 if ((message->tflags & (DEL_REQ_FLAG_USR_VRFY | DEL_REQ_FLAG_RECORD 402 | DEL_REQ_FLAG_REC_DLY_SENT)) 403 || (message->rflags & QMGR_READ_FLAG_NOTIFY_SUCCESS)) { 404 atrace_flush(message->tflags, 405 message->queue_name, 406 message->queue_id, 407 message->encoding, 408 message->smtputf8, 409 message->sender, 410 message->dsn_envid, 411 message->dsn_ret, 412 qmgr_active_done_25_trace_flush, 413 (void *) message); 414 return; 415 } 416 417 /* 418 * Asynchronous processing does not reach this point. 
419 */ 420 qmgr_active_done_25_generic(message); 421} 422 423/* qmgr_active_done_25_trace_flush - continue after atrace_flush() completion */ 424 425static void qmgr_active_done_25_trace_flush(int status, void *context) 426{ 427 QMGR_MESSAGE *message = (QMGR_MESSAGE *) context; 428 429 /* 430 * Process atrace_flush() status and continue processing. 431 */ 432 if (status == 0 && message->tflags_offset) 433 qmgr_message_kill_record(message, message->tflags_offset); 434 message->flags |= status; 435 qmgr_active_done_25_generic(message); 436} 437 438/* qmgr_active_done_25_generic - continue processing */ 439 440static void qmgr_active_done_25_generic(QMGR_MESSAGE *message) 441{ 442 const char *myname = "qmgr_active_done_25_generic"; 443 const char *expire_status = 0; 444 445 /* 446 * If we get to this point we have tried all recipients for this message. 447 * If the message is too old, try to bounce it. 448 * 449 * Bounces are sent asynchronously to avoid stalling while the cleanup 450 * daemon waits for the qmgr to accept the "new mail" trigger. 451 */ 452 if (message->flags) { 453 if ((message->qflags & QMGR_FORCE_EXPIRE) != 0) { 454 expire_status = "force-expired"; 455 } else if (event_time() >= message->create_time + 456 (*message->sender ? 
var_max_queue_time : var_dsn_queue_time)) { 457 expire_status = "expired"; 458 } else { 459 expire_status = 0; 460 } 461 if (expire_status != 0) { 462 msg_info("%s: from=<%s>, status=%s, returned to sender", 463 message->queue_id, info_log_addr_form_sender(message->sender), 464 expire_status); 465 if (message->verp_delims == 0 || var_verp_bounce_off) 466 adefer_flush(BOUNCE_FLAG_KEEP, 467 message->queue_name, 468 message->queue_id, 469 message->encoding, 470 message->smtputf8, 471 message->sender, 472 message->dsn_envid, 473 message->dsn_ret, 474 qmgr_active_done_3_defer_flush, 475 (void *) message); 476 else 477 adefer_flush_verp(BOUNCE_FLAG_KEEP, 478 message->queue_name, 479 message->queue_id, 480 message->encoding, 481 message->smtputf8, 482 message->sender, 483 message->dsn_envid, 484 message->dsn_ret, 485 message->verp_delims, 486 qmgr_active_done_3_defer_flush, 487 (void *) message); 488 return; 489 } else if (message->warn_time > 0 490 && event_time() >= message->warn_time - 1) { 491 if (msg_verbose) 492 msg_info("%s: sending defer warning for %s", myname, message->queue_id); 493 adefer_warn(BOUNCE_FLAG_KEEP, 494 message->queue_name, 495 message->queue_id, 496 message->encoding, 497 message->smtputf8, 498 message->sender, 499 message->dsn_envid, 500 message->dsn_ret, 501 qmgr_active_done_3_defer_warn, 502 (void *) message); 503 return; 504 } 505 } 506 507 /* 508 * Asynchronous processing does not reach this point. 509 */ 510 qmgr_active_done_3_generic(message); 511} 512 513/* qmgr_active_done_3_defer_warn - continue after adefer_warn() completion */ 514 515static void qmgr_active_done_3_defer_warn(int status, void *context) 516{ 517 QMGR_MESSAGE *message = (QMGR_MESSAGE *) context; 518 519 /* 520 * Process adefer_warn() completion status and continue processing. 
521 */ 522 if (status == 0) 523 qmgr_message_update_warn(message); 524 qmgr_active_done_3_generic(message); 525} 526 527/* qmgr_active_done_3_defer_flush - continue after adefer_flush() completion */ 528 529static void qmgr_active_done_3_defer_flush(int status, void *context) 530{ 531 QMGR_MESSAGE *message = (QMGR_MESSAGE *) context; 532 533 /* 534 * Process adefer_flush() status and continue processing. 535 */ 536 message->flags = status; 537 qmgr_active_done_3_generic(message); 538} 539 540/* qmgr_active_done_3_generic - continue processing */ 541 542static void qmgr_active_done_3_generic(QMGR_MESSAGE *message) 543{ 544 const char *myname = "qmgr_active_done_3_generic"; 545 int delay; 546 547 /* 548 * Some recipients need to be tried again. Move the queue file time 549 * stamps into the future by the amount of time that the message is 550 * delayed, and move the message to the deferred queue. Impose minimal 551 * and maximal backoff times. 552 * 553 * Since we look at actual time in queue, not time since last delivery 554 * attempt, backoff times will be distributed. However, we can still see 555 * spikes in delivery activity because the interval between deferred 556 * queue scans is finite. 557 */ 558 if (message->flags) { 559 if (message->create_time > 0) { 560 delay = event_time() - message->create_time; 561 if (delay > var_max_backoff_time) 562 delay = var_max_backoff_time; 563 if (delay < var_min_backoff_time) 564 delay = var_min_backoff_time; 565 } else { 566 delay = var_min_backoff_time; 567 } 568 qmgr_active_defer(message->queue_name, message->queue_id, 569 MAIL_QUEUE_DEFERRED, delay); 570 } 571 572 /* 573 * All recipients done. Remove the queue file. 
574 */ 575 else { 576 if (mail_queue_remove(message->queue_name, message->queue_id)) { 577 if (errno != ENOENT) 578 msg_fatal("%s: remove %s from %s: %m", myname, 579 message->queue_id, message->queue_name); 580 msg_warn("%s: remove %s from %s: %m", myname, 581 message->queue_id, message->queue_name); 582 } else { 583 /* Same format as logged by postsuper. */ 584 msg_info("%s: removed", message->queue_id); 585 } 586 } 587 588 /* 589 * Finally, delete the in-core message structure. 590 */ 591 qmgr_message_free(message); 592} 593 594/* qmgr_active_drain - drain active queue by allocating a delivery process */ 595 596void qmgr_active_drain(void) 597{ 598 QMGR_TRANSPORT *transport; 599 600 /* 601 * Allocate one delivery process for every transport with pending mail. 602 * The process allocation completes asynchronously. 603 */ 604 while ((transport = qmgr_transport_select()) != 0) { 605 if (msg_verbose) 606 msg_info("qmgr_active_drain: allocate %s", transport->name); 607 qmgr_transport_alloc(transport, qmgr_deliver); 608 } 609} 610