1/*++ 2/* NAME 3/* qmgr_message 3 4/* SUMMARY 5/* in-core message structures 6/* SYNOPSIS 7/* #include "qmgr.h" 8/* 9/* int qmgr_message_count; 10/* int qmgr_recipient_count; 11/* 12/* QMGR_MESSAGE *qmgr_message_alloc(class, name, qflags, mode) 13/* const char *class; 14/* const char *name; 15/* int qflags; 16/* mode_t mode; 17/* 18/* QMGR_MESSAGE *qmgr_message_realloc(message) 19/* QMGR_MESSAGE *message; 20/* 21/* void qmgr_message_free(message) 22/* QMGR_MESSAGE *message; 23/* 24/* void qmgr_message_update_warn(message) 25/* QMGR_MESSAGE *message; 26/* 27/* void qmgr_message_kill_record(message, offset) 28/* QMGR_MESSAGE *message; 29/* long offset; 30/* DESCRIPTION 31/* This module performs en-gross operations on queue messages. 32/* 33/* qmgr_message_count is a global counter for the total number 34/* of in-core message structures (i.e. the total size of the 35/* `active' message queue). 36/* 37/* qmgr_recipient_count is a global counter for the total number 38/* of in-core recipient structures (i.e. the sum of all recipients 39/* in all in-core message structures). 40/* 41/* qmgr_message_alloc() creates an in-core message structure 42/* with sender and recipient information taken from the named queue 43/* file. A null result means the queue file could not be read or 44/* that the queue file contained incorrect information. A result 45/* QMGR_MESSAGE_LOCKED means delivery must be deferred. The number 46/* of recipients read from a queue file is limited by the global 47/* var_qmgr_rcpt_limit configuration parameter. When the limit 48/* is reached, the \fIrcpt_offset\fR structure member is set to 49/* the position where the read was terminated. Recipients are 50/* run through the resolver, and are assigned to destination 51/* queues. Recipients that cannot be assigned are deferred or 52/* bounced. Mail that has bounced twice is silently absorbed. 53/* A non-zero mode means change the queue file permissions. 54/* 55/* qmgr_message_realloc() resumes reading recipients from the queue 56/* file, and updates the recipient list and \fIrcpt_offset\fR message 57/* structure members. A null result means that the file could not be 58/* read or that the file contained incorrect information. Recipient 59/* limit imposed this time is based on the position of the message 60/* job(s) on corresponding transport job list(s). It's considered 61/* an error to call this when the recipient slots can't be allocated. 62/* 63/* qmgr_message_free() destroys an in-core message structure and makes 64/* the resources available for reuse. It is an error to destroy 65/* a message structure that is still referenced by queue entry structures. 66/* 67/* qmgr_message_update_warn() takes a closed message, opens it, updates 68/* the warning field, and closes it again. 69/* 70/* qmgr_message_kill_record() takes a closed message, opens it, updates 71/* the record type at the given offset to "killed", and closes the file. 72/* A killed envelope record is ignored. Killed records are not allowed 73/* inside the message content. 74/* DIAGNOSTICS 75/* Warnings: malformed message file. Fatal errors: out of memory. 76/* SEE ALSO 77/* envelope(3) message envelope parser 78/* LICENSE 79/* .ad 80/* .fi 81/* The Secure Mailer license must be distributed with this software. 82/* AUTHOR(S) 83/* Wietse Venema 84/* IBM T.J. Watson Research 85/* P.O. Box 704 86/* Yorktown Heights, NY 10598, USA 87/* 88/* Preemptive scheduler enhancements: 89/* Patrik Rak 90/* Modra 6 91/* 155 00, Prague, Czech Republic 92/*--*/ 93 94/* System library. */ 95 96#include <sys_defs.h> 97#include <sys/stat.h> 98#include <stdlib.h> 99#include <stdio.h> /* sscanf() */ 100#include <fcntl.h> 101#include <errno.h> 102#include <unistd.h> 103#include <string.h> 104#include <ctype.h> 105 106#ifdef STRCASECMP_IN_STRINGS_H 107#include <strings.h> 108#endif 109 110/* Utility library. */ 111 112#include <msg.h> 113#include <mymalloc.h> 114#include <vstring.h> 115#include <vstream.h> 116#include <split_at.h> 117#include <valid_hostname.h> 118#include <argv.h> 119#include <stringops.h> 120#include <myflock.h> 121#include <sane_time.h> 122 123/* Global library. */ 124 125#include <dict.h> 126#include <mail_queue.h> 127#include <mail_params.h> 128#include <canon_addr.h> 129#include <record.h> 130#include <rec_type.h> 131#include <sent.h> 132#include <deliver_completed.h> 133#include <opened.h> 134#include <verp_sender.h> 135#include <mail_proto.h> 136#include <qmgr_user.h> 137#include <split_addr.h> 138#include <dsn_mask.h> 139#include <rec_attr_map.h> 140 141/* Client stubs. */ 142 143#include <rewrite_clnt.h> 144#include <resolve_clnt.h> 145 146/* Application-specific. */ 147 148#include "qmgr.h" 149 150int qmgr_message_count; 151int qmgr_recipient_count; 152 153/* qmgr_message_create - create in-core message structure */ 154 155static QMGR_MESSAGE *qmgr_message_create(const char *queue_name, 156 const char *queue_id, int qflags) 157{ 158 QMGR_MESSAGE *message; 159 160 message = (QMGR_MESSAGE *) mymalloc(sizeof(QMGR_MESSAGE)); 161 qmgr_message_count++; 162 message->flags = 0; 163 message->qflags = qflags; 164 message->tflags = 0; 165 message->tflags_offset = 0; 166 message->rflags = QMGR_READ_FLAG_DEFAULT; 167 message->fp = 0; 168 message->refcount = 0; 169 message->single_rcpt = 0; 170 message->arrival_time.tv_sec = message->arrival_time.tv_usec = 0; 171 message->create_time = 0; 172 GETTIMEOFDAY(&message->active_time); 173 message->queued_time = sane_time(); 174 message->refill_time = 0; 175 message->data_offset = 0; 176 message->queue_id = mystrdup(queue_id); 177 message->queue_name = mystrdup(queue_name); 178 message->encoding = 0; 179 message->sender = 0; 180 message->dsn_envid = 0; 181 message->dsn_ret = 0; 182 message->filter_xport = 0; 183 message->inspect_xport = 0; 184 message->redirect_addr = 0; 185 message->data_size = 0; 186 message->cont_length = 0; 187 message->warn_offset = 0; 188 message->warn_time = 0; 189 message->rcpt_offset = 0; 190 message->verp_delims = 0; 191 message->client_name = 0; 192 message->client_addr = 0; 193 message->client_port = 0; 194 message->client_proto = 0; 195 message->client_helo = 0; 196 message->sasl_method = 0; 197 message->sasl_username = 0; 198 message->sasl_sender = 0; 199 message->log_ident = 0; 200 message->rewrite_context = 0; 201 recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE); 202 message->rcpt_count = 0; 203 message->rcpt_limit = var_qmgr_msg_rcpt_limit; 204 message->rcpt_unread = 0; 205 QMGR_LIST_INIT(message->job_list); 206 return (message); 207} 208 209/* qmgr_message_close - close queue file */ 210 211static void qmgr_message_close(QMGR_MESSAGE *message) 212{ 213 vstream_fclose(message->fp); 214 message->fp = 0; 215} 216 217/* qmgr_message_open - open queue file */ 218 219static int qmgr_message_open(QMGR_MESSAGE *message) 220{ 221 222 /* 223 * Sanity check. 224 */ 225 if (message->fp) 226 msg_panic("%s: queue file is open", message->queue_id); 227 228 /* 229 * Open this queue file. Skip files that we cannot open. Back off when 230 * the system appears to be running out of resources. 231 */ 232 if ((message->fp = mail_queue_open(message->queue_name, 233 message->queue_id, 234 O_RDWR, 0)) == 0) { 235 if (errno != ENOENT) 236 msg_fatal("open %s %s: %m", message->queue_name, message->queue_id); 237 msg_warn("open %s %s: %m", message->queue_name, message->queue_id); 238 return (-1); 239 } 240 return (0); 241} 242 243/* qmgr_message_oldstyle_scan - support for Postfix < 1.0 queue files */ 244 245static void qmgr_message_oldstyle_scan(QMGR_MESSAGE *message) 246{ 247 VSTRING *buf; 248 long orig_offset, extra_offset; 249 int rec_type; 250 char *start; 251 252 /* 253 * Initialize. No early returns or we have a memory leak. 254 */ 255 buf = vstring_alloc(100); 256 if ((orig_offset = vstream_ftell(message->fp)) < 0) 257 msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp)); 258 259 /* 260 * Rewind to the very beginning to make sure we see all records. 261 */ 262 if (vstream_fseek(message->fp, 0, SEEK_SET) < 0) 263 msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); 264 265 /* 266 * Scan through the old style queue file. Count the total number of 267 * recipients and find the data/extra sections offsets. Note that the new 268 * queue files require that data_size equals extra_offset - data_offset, 269 * so we set data_size to this as well and ignore the size record itself 270 * completely. 271 */ 272 message->rcpt_unread = 0; 273 for (;;) { 274 rec_type = rec_get(message->fp, buf, 0); 275 if (rec_type <= 0) 276 /* Report missing end record later. */ 277 break; 278 start = vstring_str(buf); 279 if (msg_verbose > 1) 280 msg_info("old-style scan record %c %s", rec_type, start); 281 if (rec_type == REC_TYPE_END) 282 break; 283 if (rec_type == REC_TYPE_DONE 284 || rec_type == REC_TYPE_RCPT 285 || rec_type == REC_TYPE_DRCP) { 286 message->rcpt_unread++; 287 continue; 288 } 289 if (rec_type == REC_TYPE_MESG) { 290 if (message->data_offset == 0) { 291 if ((message->data_offset = vstream_ftell(message->fp)) < 0) 292 msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp)); 293 if ((extra_offset = atol(start)) <= message->data_offset) 294 msg_fatal("bad extra offset %s file %s", 295 start, VSTREAM_PATH(message->fp)); 296 if (vstream_fseek(message->fp, extra_offset, SEEK_SET) < 0) 297 msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); 298 message->data_size = extra_offset - message->data_offset; 299 } 300 continue; 301 } 302 } 303 304 /* 305 * Clean up. 306 */ 307 if (vstream_fseek(message->fp, orig_offset, SEEK_SET) < 0) 308 msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); 309 vstring_free(buf); 310 311 /* 312 * Sanity checks. Verify that all required information was found, 313 * including the queue file end marker. 314 */ 315 if (message->data_offset == 0 || rec_type != REC_TYPE_END) 316 msg_fatal("%s: envelope records out of order", message->queue_id); 317} 318 319/* qmgr_message_read - read envelope records */ 320 321static int qmgr_message_read(QMGR_MESSAGE *message) 322{ 323 VSTRING *buf; 324 int rec_type; 325 long curr_offset; 326 long save_offset = message->rcpt_offset; /* save a flag */ 327 int save_unread = message->rcpt_unread; /* save a count */ 328 char *start; 329 int recipient_limit; 330 const char *error_text; 331 char *name; 332 char *value; 333 char *orig_rcpt = 0; 334 int count; 335 int dsn_notify = 0; 336 char *dsn_orcpt = 0; 337 int n; 338 int have_log_client_attr = 0; 339 340 /* 341 * Initialize. No early returns or we have a memory leak. 342 */ 343 buf = vstring_alloc(100); 344 345 /* 346 * If we re-open this file, skip over on-file recipient records that we 347 * already looked at, and refill the in-core recipient address list. 348 * 349 * For the first time, the message recipient limit is calculated from the 350 * global recipient limit. This is to avoid reading little recipients 351 * when the active queue is near empty. When the queue becomes full, only 352 * the necessary amount is read in core. Such priming is necessary 353 * because there are no message jobs yet. 354 * 355 * For the next time, the recipient limit is based solely on the message 356 * jobs' positions in the job lists and/or job stacks. 357 */ 358 if (message->rcpt_offset) { 359 if (message->rcpt_list.len) 360 msg_panic("%s: recipient list not empty on recipient reload", 361 message->queue_id); 362 if (vstream_fseek(message->fp, message->rcpt_offset, SEEK_SET) < 0) 363 msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); 364 message->rcpt_offset = 0; 365 recipient_limit = message->rcpt_limit - message->rcpt_count; 366 } else { 367 recipient_limit = var_qmgr_rcpt_limit - qmgr_recipient_count; 368 if (recipient_limit < message->rcpt_limit) 369 recipient_limit = message->rcpt_limit; 370 } 371 /* Keep interrupt latency in check. */ 372 if (recipient_limit > 5000) 373 recipient_limit = 5000; 374 if (recipient_limit <= 0) 375 msg_panic("%s: no recipient slots available", message->queue_id); 376 if (msg_verbose) 377 msg_info("%s: recipient limit %d", message->queue_id, recipient_limit); 378 379 /* 380 * Read envelope records. XXX Rely on the front-end programs to enforce 381 * record size limits. Read up to recipient_limit recipients from the 382 * queue file, to protect against memory exhaustion. Recipient records 383 * may appear before or after the message content, so we keep reading 384 * from the queue file until we have enough recipients (rcpt_offset != 0) 385 * and until we know all the non-recipient information. 386 * 387 * Note that the total recipient count record is accurate only for fresh 388 * queue files. After some of the recipients are marked as done and the 389 * queue file is deferred, it can be used as upper bound estimate only. 390 * Fortunately, this poses no major problem on the scheduling algorithm, 391 * as the only impact is that the already deferred messages are not 392 * chosen by qmgr_job_candidate() as often as they could. 393 * 394 * On the first open, we must examine all non-recipient records. 395 * 396 * Optimization: when we know that recipient records are not mixed with 397 * non-recipient records, as is typical with mailing list mail, then we 398 * can avoid having to examine all the queue file records before we can 399 * start deliveries. This avoids some file system thrashing with huge 400 * mailing lists. 401 */ 402 for (;;) { 403 if ((curr_offset = vstream_ftell(message->fp)) < 0) 404 msg_fatal("vstream_ftell %s: %m", VSTREAM_PATH(message->fp)); 405 if (curr_offset == message->data_offset && curr_offset > 0) { 406 if (vstream_fseek(message->fp, message->data_size, SEEK_CUR) < 0) 407 msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); 408 curr_offset += message->data_size; 409 } 410 rec_type = rec_get_raw(message->fp, buf, 0, REC_FLAG_NONE); 411 start = vstring_str(buf); 412 if (msg_verbose > 1) 413 msg_info("record %c %s", rec_type, start); 414 if (rec_type == REC_TYPE_PTR) { 415 if ((rec_type = rec_goto(message->fp, start)) == REC_TYPE_ERROR) 416 break; 417 /* Need to update curr_offset after pointer jump. */ 418 continue; 419 } 420 if (rec_type <= 0) { 421 msg_warn("%s: message rejected: missing end record", 422 message->queue_id); 423 break; 424 } 425 if (rec_type == REC_TYPE_END) { 426 message->rflags |= QMGR_READ_FLAG_SEEN_ALL_NON_RCPT; 427 break; 428 } 429 430 /* 431 * Map named attributes to pseudo record types, so that we don't have 432 * to pollute the queue file with records that are incompatible with 433 * past Postfix versions. Preferably, people should be able to back 434 * out from an upgrade without losing mail. 435 */ 436 if (rec_type == REC_TYPE_ATTR) { 437 if ((error_text = split_nameval(start, &name, &value)) != 0) { 438 msg_warn("%s: ignoring bad attribute: %s: %.200s", 439 message->queue_id, error_text, start); 440 rec_type = REC_TYPE_ERROR; 441 break; 442 } 443 if ((n = rec_attr_map(name)) != 0) { 444 start = value; 445 rec_type = n; 446 } 447 } 448 449 /* 450 * Process recipient records. 451 */ 452 if (rec_type == REC_TYPE_RCPT) { 453 /* See also below for code setting orig_rcpt etc. */ 454 if (message->rcpt_offset == 0) { 455 message->rcpt_unread--; 456 recipient_list_add(&message->rcpt_list, curr_offset, 457 dsn_orcpt ? dsn_orcpt : "", 458 dsn_notify ? dsn_notify : 0, 459 orig_rcpt ? orig_rcpt : "", start); 460 if (dsn_orcpt) { 461 myfree(dsn_orcpt); 462 dsn_orcpt = 0; 463 } 464 if (orig_rcpt) { 465 myfree(orig_rcpt); 466 orig_rcpt = 0; 467 } 468 if (dsn_notify) 469 dsn_notify = 0; 470 if (message->rcpt_list.len >= recipient_limit) { 471 if ((message->rcpt_offset = vstream_ftell(message->fp)) < 0) 472 msg_fatal("vstream_ftell %s: %m", 473 VSTREAM_PATH(message->fp)); 474 if (message->rflags & QMGR_READ_FLAG_SEEN_ALL_NON_RCPT) 475 /* We already examined all non-recipient records. */ 476 break; 477 if (message->rflags & QMGR_READ_FLAG_MIXED_RCPT_OTHER) 478 /* Examine all remaining non-recipient records. */ 479 continue; 480 /* Optimizations for "pure recipient" record sections. */ 481 if (curr_offset > message->data_offset) { 482 /* We already examined all non-recipient records. */ 483 message->rflags |= QMGR_READ_FLAG_SEEN_ALL_NON_RCPT; 484 break; 485 } 486 /* Examine non-recipient records in extracted segment. */ 487 if (vstream_fseek(message->fp, message->data_offset 488 + message->data_size, SEEK_SET) < 0) 489 msg_fatal("seek file %s: %m", VSTREAM_PATH(message->fp)); 490 continue; 491 } 492 } 493 continue; 494 } 495 if (rec_type == REC_TYPE_DONE || rec_type == REC_TYPE_DRCP) { 496 if (message->rcpt_offset == 0) { 497 message->rcpt_unread--; 498 if (dsn_orcpt) { 499 myfree(dsn_orcpt); 500 dsn_orcpt = 0; 501 } 502 if (orig_rcpt) { 503 myfree(orig_rcpt); 504 orig_rcpt = 0; 505 } 506 if (dsn_notify) 507 dsn_notify = 0; 508 } 509 continue; 510 } 511 if (rec_type == REC_TYPE_DSN_ORCPT) { 512 /* See also above for code clearing dsn_orcpt. */ 513 if (dsn_orcpt != 0) { 514 msg_warn("%s: ignoring out-of-order DSN original recipient address <%.200s>", 515 message->queue_id, dsn_orcpt); 516 myfree(dsn_orcpt); 517 dsn_orcpt = 0; 518 } 519 if (message->rcpt_offset == 0) 520 dsn_orcpt = mystrdup(start); 521 continue; 522 } 523 if (rec_type == REC_TYPE_DSN_NOTIFY) { 524 /* See also above for code clearing dsn_notify. */ 525 if (dsn_notify != 0) { 526 msg_warn("%s: ignoring out-of-order DSN notify flags <%d>", 527 message->queue_id, dsn_notify); 528 dsn_notify = 0; 529 } 530 if (message->rcpt_offset == 0) { 531 if (!alldig(start) || (n = atoi(start)) == 0 || !DSN_NOTIFY_OK(n)) 532 msg_warn("%s: ignoring malformed DSN notify flags <%.200s>", 533 message->queue_id, start); 534 else 535 dsn_notify = n; 536 continue; 537 } 538 } 539 if (rec_type == REC_TYPE_ORCP) { 540 /* See also above for code clearing orig_rcpt. */ 541 if (orig_rcpt != 0) { 542 msg_warn("%s: ignoring out-of-order original recipient <%.200s>", 543 message->queue_id, orig_rcpt); 544 myfree(orig_rcpt); 545 orig_rcpt = 0; 546 } 547 if (message->rcpt_offset == 0) 548 orig_rcpt = mystrdup(start); 549 continue; 550 } 551 552 /* 553 * Process non-recipient records. 554 */ 555 if (message->rflags & QMGR_READ_FLAG_SEEN_ALL_NON_RCPT) 556 /* We already examined all non-recipient records. */ 557 continue; 558 if (rec_type == REC_TYPE_SIZE) { 559 if (message->data_offset == 0) { 560 if ((count = sscanf(start, "%ld %ld %d %d %ld", 561 &message->data_size, &message->data_offset, 562 &message->rcpt_unread, &message->rflags, 563 &message->cont_length)) >= 3) { 564 /* Postfix >= 1.0 (a.k.a. 20010228). */ 565 if (message->data_offset <= 0 || message->data_size <= 0) { 566 msg_warn("%s: invalid size record: %.100s", 567 message->queue_id, start); 568 rec_type = REC_TYPE_ERROR; 569 break; 570 } 571 if (message->rflags & ~QMGR_READ_FLAG_USER) { 572 msg_warn("%s: invalid flags in size record: %.100s", 573 message->queue_id, start); 574 rec_type = REC_TYPE_ERROR; 575 break; 576 } 577 } else if (count == 1) { 578 /* Postfix < 1.0 (a.k.a. 20010228). */ 579 qmgr_message_oldstyle_scan(message); 580 } else { 581 /* Can't happen. */ 582 msg_warn("%s: message rejected: weird size record", 583 message->queue_id); 584 rec_type = REC_TYPE_ERROR; 585 break; 586 } 587 } 588 /* Postfix < 2.4 compatibility. */ 589 if (message->cont_length == 0) { 590 message->cont_length = message->data_size; 591 } else if (message->cont_length < 0) { 592 msg_warn("%s: invalid size record: %.100s", 593 message->queue_id, start); 594 rec_type = REC_TYPE_ERROR; 595 break; 596 } 597 continue; 598 } 599 if (rec_type == REC_TYPE_TIME) { 600 if (message->arrival_time.tv_sec == 0) 601 REC_TYPE_TIME_SCAN(start, message->arrival_time); 602 continue; 603 } 604 if (rec_type == REC_TYPE_CTIME) { 605 if (message->create_time == 0) 606 message->create_time = atol(start); 607 continue; 608 } 609 if (rec_type == REC_TYPE_FILT) { 610 if (message->filter_xport != 0) 611 myfree(message->filter_xport); 612 message->filter_xport = mystrdup(start); 613 continue; 614 } 615 if (rec_type == REC_TYPE_INSP) { 616 if (message->inspect_xport != 0) 617 myfree(message->inspect_xport); 618 message->inspect_xport = mystrdup(start); 619 continue; 620 } 621 if (rec_type == REC_TYPE_RDR) { 622 if (message->redirect_addr != 0) 623 myfree(message->redirect_addr); 624 message->redirect_addr = mystrdup(start); 625 continue; 626 } 627 if (rec_type == REC_TYPE_FROM) { 628 if (message->sender == 0) { 629 message->sender = mystrdup(start); 630 opened(message->queue_id, message->sender, 631 message->cont_length, message->rcpt_unread, 632 "queue %s", message->queue_name); 633 } 634 continue; 635 } 636 if (rec_type == REC_TYPE_DSN_ENVID) { 637 if (message->dsn_envid == 0) 638 message->dsn_envid = mystrdup(start); 639 } 640 if (rec_type == REC_TYPE_DSN_RET) { 641 if (message->dsn_ret == 0) { 642 if (!alldig(start) || (n = atoi(start)) == 0 || !DSN_RET_OK(n)) 643 msg_warn("%s: ignoring malformed DSN RET flags in queue file record:%.100s", 644 message->queue_id, start); 645 else 646 message->dsn_ret = n; 647 } 648 } 649 if (rec_type == REC_TYPE_ATTR) { 650 /* Allow extra segment to override envelope segment info. */ 651 if (strcmp(name, MAIL_ATTR_ENCODING) == 0) { 652 if (message->encoding != 0) 653 myfree(message->encoding); 654 message->encoding = mystrdup(value); 655 } 656 657 /* 658 * Backwards compatibility. Before Postfix 2.3, the logging 659 * attributes were called client_name, etc. Now they are called 660 * log_client_name. etc., and client_name is used for the actual 661 * client information. To support old queue files we accept both 662 * names for the purpose of logging; the new name overrides the 663 * old one. 664 * 665 * XXX Do not use the "legacy" client_name etc. attribute values for 666 * initializing the logging attributes, when this file already 667 * contains the "modern" log_client_name etc. logging attributes. 668 * Otherwise, logging attributes that are not present in the 669 * queue file would be set with information from the real client. 670 */ 671 else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_NAME) == 0) { 672 if (have_log_client_attr == 0 && message->client_name == 0) 673 message->client_name = mystrdup(value); 674 } else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_ADDR) == 0) { 675 if (have_log_client_attr == 0 && message->client_addr == 0) 676 message->client_addr = mystrdup(value); 677 } else if (strcmp(name, MAIL_ATTR_ACT_CLIENT_PORT) == 0) { 678 if (have_log_client_attr == 0 && message->client_port == 0) 679 message->client_port = mystrdup(value); 680 } else if (strcmp(name, MAIL_ATTR_ACT_PROTO_NAME) == 0) { 681 if (have_log_client_attr == 0 && message->client_proto == 0) 682 message->client_proto = mystrdup(value); 683 } else if (strcmp(name, MAIL_ATTR_ACT_HELO_NAME) == 0) { 684 if (have_log_client_attr == 0 && message->client_helo == 0) 685 message->client_helo = mystrdup(value); 686 } 687 /* Original client attributes. */ 688 else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_NAME) == 0) { 689 if (message->client_name != 0) 690 myfree(message->client_name); 691 message->client_name = mystrdup(value); 692 have_log_client_attr = 1; 693 } else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_ADDR) == 0) { 694 if (message->client_addr != 0) 695 myfree(message->client_addr); 696 message->client_addr = mystrdup(value); 697 have_log_client_attr = 1; 698 } else if (strcmp(name, MAIL_ATTR_LOG_CLIENT_PORT) == 0) { 699 if (message->client_port != 0) 700 myfree(message->client_port); 701 message->client_port = mystrdup(value); 702 have_log_client_attr = 1; 703 } else if (strcmp(name, MAIL_ATTR_LOG_PROTO_NAME) == 0) { 704 if (message->client_proto != 0) 705 myfree(message->client_proto); 706 message->client_proto = mystrdup(value); 707 have_log_client_attr = 1; 708 } else if (strcmp(name, MAIL_ATTR_LOG_HELO_NAME) == 0) { 709 if (message->client_helo != 0) 710 myfree(message->client_helo); 711 message->client_helo = mystrdup(value); 712 have_log_client_attr = 1; 713 } else if (strcmp(name, MAIL_ATTR_SASL_METHOD) == 0) { 714 if (message->sasl_method == 0) 715 message->sasl_method = mystrdup(value); 716 else 717 msg_warn("%s: ignoring multiple %s attribute: %s", 718 message->queue_id, MAIL_ATTR_SASL_METHOD, value); 719 } else if (strcmp(name, MAIL_ATTR_SASL_USERNAME) == 0) { 720 if (message->sasl_username == 0) 721 message->sasl_username = mystrdup(value); 722 else 723 msg_warn("%s: ignoring multiple %s attribute: %s", 724 message->queue_id, MAIL_ATTR_SASL_USERNAME, value); 725 } else if (strcmp(name, MAIL_ATTR_SASL_SENDER) == 0) { 726 if (message->sasl_sender == 0) 727 message->sasl_sender = mystrdup(value); 728 else 729 msg_warn("%s: ignoring multiple %s attribute: %s", 730 message->queue_id, MAIL_ATTR_SASL_SENDER, value); 731 } else if (strcmp(name, MAIL_ATTR_LOG_IDENT) == 0) { 732 if (message->log_ident == 0) 733 message->log_ident = mystrdup(value); 734 else 735 msg_warn("%s: ignoring multiple %s attribute: %s", 736 message->queue_id, MAIL_ATTR_LOG_IDENT, value); 737 } else if (strcmp(name, MAIL_ATTR_RWR_CONTEXT) == 0) { 738 if (message->rewrite_context == 0) 739 message->rewrite_context = mystrdup(value); 740 else 741 msg_warn("%s: ignoring multiple %s attribute: %s", 742 message->queue_id, MAIL_ATTR_RWR_CONTEXT, value); 743 } 744 745 /* 746 * Optional tracing flags (verify, sendmail -v, sendmail -bv). 747 * This record is killed after a trace logfile report is sent and 748 * after the logfile is deleted. 749 */ 750 else if (strcmp(name, MAIL_ATTR_TRACE_FLAGS) == 0) { 751 message->tflags = DEL_REQ_TRACE_FLAGS(atoi(value)); 752 if (message->tflags == DEL_REQ_FLAG_RECORD) 753 message->tflags_offset = curr_offset; 754 else 755 message->tflags_offset = 0; 756 } 757 continue; 758 } 759 if (rec_type == REC_TYPE_WARN) { 760 if (message->warn_offset == 0) { 761 message->warn_offset = curr_offset; 762 REC_TYPE_WARN_SCAN(start, message->warn_time); 763 } 764 continue; 765 } 766 if (rec_type == REC_TYPE_VERP) { 767 if (message->verp_delims == 0) { 768 if (message->sender == 0 || message->sender[0] == 0) { 769 msg_warn("%s: ignoring VERP request for null sender", 770 message->queue_id); 771 } else if (verp_delims_verify(start) != 0) { 772 msg_warn("%s: ignoring bad VERP request: \"%.100s\"", 773 message->queue_id, start); 774 } else { 775 if (msg_verbose) 776 msg_info("%s: enabling VERP for sender \"%.100s\"", 777 message->queue_id, message->sender); 778 message->single_rcpt = 1; 779 message->verp_delims = mystrdup(start); 780 } 781 } 782 continue; 783 } 784 } 785 786 /* 787 * Grr. 788 */ 789 if (dsn_orcpt != 0) { 790 if (rec_type > 0) 791 msg_warn("%s: ignoring out-of-order DSN original recipient <%.200s>", 792 message->queue_id, dsn_orcpt); 793 myfree(dsn_orcpt); 794 } 795 if (orig_rcpt != 0) { 796 if (rec_type > 0) 797 msg_warn("%s: ignoring out-of-order original recipient <%.200s>", 798 message->queue_id, orig_rcpt); 799 myfree(orig_rcpt); 800 } 801 802 /* 803 * Remember when we have read the last recipient batch. Note that we do 804 * it here after reading as reading might have used considerable amount 805 * of time. 806 */ 807 message->refill_time = sane_time(); 808 809 /* 810 * Avoid clumsiness elsewhere in the program. When sending data across an 811 * IPC channel, sending an empty string is more convenient than sending a 812 * null pointer. 813 */ 814 if (message->dsn_envid == 0) 815 message->dsn_envid = mystrdup(""); 816 if (message->encoding == 0) 817 message->encoding = mystrdup(MAIL_ATTR_ENC_NONE); 818 if (message->client_name == 0) 819 message->client_name = mystrdup(""); 820 if (message->client_addr == 0) 821 message->client_addr = mystrdup(""); 822 if (message->client_port == 0) 823 message->client_port = mystrdup(""); 824 if (message->client_proto == 0) 825 message->client_proto = mystrdup(""); 826 if (message->client_helo == 0) 827 message->client_helo = mystrdup(""); 828 if (message->sasl_method == 0) 829 message->sasl_method = mystrdup(""); 830 if (message->sasl_username == 0) 831 message->sasl_username = mystrdup(""); 832 if (message->sasl_sender == 0) 833 message->sasl_sender = mystrdup(""); 834 if (message->log_ident == 0) 835 message->log_ident = mystrdup(""); 836 if (message->rewrite_context == 0) 837 message->rewrite_context = mystrdup(MAIL_ATTR_RWR_LOCAL); 838 /* Postfix < 2.3 compatibility. */ 839 if (message->create_time == 0) 840 message->create_time = message->arrival_time.tv_sec; 841 842 /* 843 * Clean up. 844 */ 845 vstring_free(buf); 846 847 /* 848 * Sanity checks. Verify that all required information was found, 849 * including the queue file end marker. 850 */ 851 if (message->rcpt_unread < 0 852 || (message->rcpt_offset == 0 && message->rcpt_unread != 0)) { 853 msg_warn("%s: rcpt count mismatch (%d)", 854 message->queue_id, message->rcpt_unread); 855 message->rcpt_unread = 0; 856 } 857 if (rec_type <= 0) { 858 /* Already logged warning. */ 859 } else if (message->arrival_time.tv_sec == 0) { 860 msg_warn("%s: message rejected: missing arrival time record", 861 message->queue_id); 862 } else if (message->sender == 0) { 863 msg_warn("%s: message rejected: missing sender record", 864 message->queue_id); 865 } else if (message->data_offset == 0) { 866 msg_warn("%s: message rejected: missing size record", 867 message->queue_id); 868 } else { 869 return (0); 870 } 871 message->rcpt_offset = save_offset; /* restore flag */ 872 message->rcpt_unread = save_unread; /* restore count */ 873 recipient_list_free(&message->rcpt_list); 874 recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE); 875 return (-1); 876} 877 878/* qmgr_message_update_warn - update the time of next delay warning */ 879 880void qmgr_message_update_warn(QMGR_MESSAGE *message) 881{ 882 883 /* 884 * XXX eventually this should let us schedule multiple warnings, right 885 * now it just allows for one. 886 */ 887 if (qmgr_message_open(message) 888 || vstream_fseek(message->fp, message->warn_offset, SEEK_SET) < 0 889 || rec_fprintf(message->fp, REC_TYPE_WARN, REC_TYPE_WARN_FORMAT, 890 REC_TYPE_WARN_ARG(0)) < 0 891 || vstream_fflush(message->fp)) 892 msg_fatal("update queue file %s: %m", VSTREAM_PATH(message->fp)); 893 qmgr_message_close(message); 894} 895 896/* qmgr_message_kill_record - mark one message record as killed */ 897 898void qmgr_message_kill_record(QMGR_MESSAGE *message, long offset) 899{ 900 if (offset <= 0) 901 msg_panic("qmgr_message_kill_record: bad offset 0x%lx", offset); 902 if (qmgr_message_open(message) 903 || rec_put_type(message->fp, REC_TYPE_KILL, offset) < 0 904 || vstream_fflush(message->fp)) 905 msg_fatal("update queue file %s: %m", VSTREAM_PATH(message->fp)); 906 qmgr_message_close(message); 907} 908 909/* qmgr_message_sort_compare - compare recipient information */ 910 911static int qmgr_message_sort_compare(const void *p1, const void *p2) 912{ 913 RECIPIENT *rcpt1 = (RECIPIENT *) p1; 914 RECIPIENT *rcpt2 = (RECIPIENT *) p2; 915 QMGR_QUEUE *queue1; 916 QMGR_QUEUE *queue2; 917 char *at1; 918 char *at2; 919 int result; 920 921 /* 922 * Compare most significant to least significant recipient attributes. 923 * The comparison function must be transitive, so NULL values need to be 924 * assigned an ordinal (we set NULL last). 925 */ 926 927 queue1 = rcpt1->u.queue; 928 queue2 = rcpt2->u.queue; 929 if (queue1 != 0 && queue2 == 0) 930 return (-1); 931 if (queue1 == 0 && queue2 != 0) 932 return (1); 933 if (queue1 != 0 && queue2 != 0) { 934 935 /* 936 * Compare message transport. 937 */ 938 if ((result = strcmp(queue1->transport->name, 939 queue2->transport->name)) != 0) 940 return (result); 941 942 /* 943 * Compare queue name (nexthop or recipient@nexthop). 944 */ 945 if ((result = strcmp(queue1->name, queue2->name)) != 0) 946 return (result); 947 } 948 949 /* 950 * Compare recipient domain. 951 */ 952 at1 = strrchr(rcpt1->address, '@'); 953 at2 = strrchr(rcpt2->address, '@'); 954 if (at1 == 0 && at2 != 0) 955 return (1); 956 if (at1 != 0 && at2 == 0) 957 return (-1); 958 if (at1 != 0 && at2 != 0 959 && (result = strcasecmp(at1, at2)) != 0) 960 return (result); 961 962 /* 963 * Compare recipient address. 964 */ 965 return (strcmp(rcpt1->address, rcpt2->address)); 966} 967 968/* qmgr_message_sort - sort message recipient addresses by domain */ 969 970static void qmgr_message_sort(QMGR_MESSAGE *message) 971{ 972 qsort((char *) message->rcpt_list.info, message->rcpt_list.len, 973 sizeof(message->rcpt_list.info[0]), qmgr_message_sort_compare); 974 if (msg_verbose) { 975 RECIPIENT_LIST list = message->rcpt_list; 976 RECIPIENT *rcpt; 977 978 msg_info("start sorted recipient list"); 979 for (rcpt = list.info; rcpt < list.info + list.len; rcpt++) 980 msg_info("qmgr_message_sort: %s", rcpt->address); 981 msg_info("end sorted recipient list"); 982 } 983} 984 985/* qmgr_resolve_one - resolve or skip one recipient */ 986 987static int qmgr_resolve_one(QMGR_MESSAGE *message, RECIPIENT *recipient, 988 const char *addr, RESOLVE_REPLY *reply) 989{ 990#define QMGR_REDIRECT(rp, tp, np) do { \ 991 (rp)->flags = 0; \ 992 vstring_strcpy((rp)->transport, (tp)); \ 993 vstring_strcpy((rp)->nexthop, (np)); \ 994 } while (0) 995 996 if ((message->tflags & DEL_REQ_FLAG_MTA_VRFY) == 0) 997 resolve_clnt_query_from(message->sender, addr, reply); 998 else 999 resolve_clnt_verify_from(message->sender, addr, reply); 1000 if (reply->flags & RESOLVE_FLAG_FAIL) { 1001 QMGR_REDIRECT(reply, MAIL_SERVICE_RETRY, 1002 "4.3.0 address resolver failure"); 1003 return (0); 1004 } else if (reply->flags & RESOLVE_FLAG_ERROR) { 1005 QMGR_REDIRECT(reply, MAIL_SERVICE_ERROR, 1006 "5.1.3 bad address syntax"); 1007 return (0); 1008 } else { 1009 return (0); 1010 } 1011} 1012 1013/* qmgr_message_resolve - resolve recipients */ 1014 1015static void qmgr_message_resolve(QMGR_MESSAGE *message) 1016{ 1017 static ARGV *defer_xport_argv; 1018 RECIPIENT_LIST list = message->rcpt_list; 1019 RECIPIENT *recipient; 1020 QMGR_TRANSPORT *transport = 0; 1021 QMGR_QUEUE *queue = 0; 1022 RESOLVE_REPLY reply; 1023 VSTRING *queue_name; 1024 char *at; 1025 char **cpp; 1026 char *nexthop; 1027 ssize_t len; 1028 int status; 1029 DSN dsn; 1030 MSG_STATS stats; 1031 DSN *saved_dsn; 1032 1033#define STREQ(x,y) (strcmp(x,y) == 0) 1034#define STR vstring_str 1035#define LEN VSTRING_LEN 1036 1037 resolve_clnt_init(&reply); 1038 queue_name = vstring_alloc(1); 1039 for (recipient = list.info; recipient < list.info + list.len; recipient++) { 1040 1041 /* 1042 * Redirect overrides all else. But only once (per entire message). 1043 * For consistency with the remainder of Postfix, rewrite the address 1044 * to canonical form before resolving it. 1045 */ 1046 if (message->redirect_addr) { 1047 if (recipient > list.info) { 1048 recipient->u.queue = 0; 1049 continue; 1050 } 1051 message->rcpt_offset = 0; 1052 message->rcpt_unread = 0; 1053 1054 rewrite_clnt_internal(REWRITE_CANON, message->redirect_addr, 1055 reply.recipient); 1056 RECIPIENT_UPDATE(recipient->address, STR(reply.recipient)); 1057 if (qmgr_resolve_one(message, recipient, 1058 recipient->address, &reply) < 0) 1059 continue; 1060 if (!STREQ(recipient->address, STR(reply.recipient))) 1061 RECIPIENT_UPDATE(recipient->address, STR(reply.recipient)); 1062 } 1063 1064 /* 1065 * Content filtering overrides the address resolver. 1066 * 1067 * XXX Bypass content_filter inspection for user-generated probes 1068 * (sendmail -bv). MTA-generated probes never have the "please filter 1069 * me" bits turned on, but we handle them here anyway for the sake of 1070 * future proofing. 1071 */ 1072#define FILTER_WITHOUT_NEXTHOP(filter, next) \ 1073 (((next) = split_at((filter), ':')) == 0 || *(next) == 0) 1074 1075#define RCPT_WITHOUT_DOMAIN(rcpt, next) \ 1076 ((next = strrchr(rcpt, '@')) == 0 || *++(next) == 0) 1077 1078 else if (message->filter_xport 1079 && (message->tflags & DEL_REQ_TRACE_ONLY_MASK) == 0) { 1080 reply.flags = 0; 1081 vstring_strcpy(reply.transport, message->filter_xport); 1082 if (FILTER_WITHOUT_NEXTHOP(STR(reply.transport), nexthop) 1083 && *(nexthop = var_def_filter_nexthop) == 0 1084 && RCPT_WITHOUT_DOMAIN(recipient->address, nexthop)) 1085 nexthop = var_myhostname; 1086 vstring_strcpy(reply.nexthop, nexthop); 1087 vstring_strcpy(reply.recipient, recipient->address); 1088 } 1089 1090 /* 1091 * Resolve the destination to (transport, nexthop, address). The 1092 * result address may differ from the one specified by the sender. 1093 */ 1094 else { 1095 if (qmgr_resolve_one(message, recipient, 1096 recipient->address, &reply) < 0) 1097 continue; 1098 if (!STREQ(recipient->address, STR(reply.recipient))) 1099 RECIPIENT_UPDATE(recipient->address, STR(reply.recipient)); 1100 } 1101 1102 /* 1103 * Bounce null recipients. This should never happen, but is most 1104 * likely the result of a fault in a different program, so aborting 1105 * the queue manager process does not help. 1106 */ 1107 if (recipient->address[0] == 0) { 1108 QMGR_REDIRECT(&reply, MAIL_SERVICE_ERROR, 1109 "5.1.3 null recipient address"); 1110 } 1111 1112 /* 1113 * Discard mail to the local double bounce address here, so this 1114 * system can run without a local delivery agent. They'd still have 1115 * to configure something for mail directed to the local postmaster, 1116 * though, but that is an RFC requirement anyway. 1117 * 1118 * XXX This lookup should be done in the resolver, and the mail should 1119 * be directed to a general-purpose null delivery agent. 1120 */ 1121 if (reply.flags & RESOLVE_CLASS_LOCAL) { 1122 at = strrchr(STR(reply.recipient), '@'); 1123 len = (at ? (at - STR(reply.recipient)) 1124 : strlen(STR(reply.recipient))); 1125 if (strncasecmp(STR(reply.recipient), var_double_bounce_sender, 1126 len) == 0 1127 && !var_double_bounce_sender[len]) { 1128 status = sent(message->tflags, message->queue_id, 1129 QMGR_MSG_STATS(&stats, message), recipient, 1130 "none", DSN_SIMPLE(&dsn, "2.0.0", 1131 "undeliverable postmaster notification discarded")); 1132 if (status == 0) { 1133 deliver_completed(message->fp, recipient->offset); 1134#if 0 1135 /* It's the default verification probe sender address. */ 1136 msg_warn("%s: undeliverable postmaster notification discarded", 1137 message->queue_id); 1138#endif 1139 } else 1140 message->flags |= status; 1141 continue; 1142 } 1143 } 1144 1145 /* 1146 * Optionally defer deliveries over specific transports, unless the 1147 * restriction is lifted temporarily. 1148 */ 1149 if (*var_defer_xports && (message->qflags & QMGR_FLUSH_DFXP) == 0) { 1150 if (defer_xport_argv == 0) 1151 defer_xport_argv = argv_split(var_defer_xports, " \t\r\n,"); 1152 for (cpp = defer_xport_argv->argv; *cpp; cpp++) 1153 if (strcmp(*cpp, STR(reply.transport)) == 0) 1154 break; 1155 if (*cpp) { 1156 QMGR_REDIRECT(&reply, MAIL_SERVICE_RETRY, 1157 "4.3.2 deferred transport"); 1158 } 1159 } 1160 1161 /* 1162 * Look up or instantiate the proper transport. 1163 */ 1164 if (transport == 0 || !STREQ(transport->name, STR(reply.transport))) { 1165 if ((transport = qmgr_transport_find(STR(reply.transport))) == 0) 1166 transport = qmgr_transport_create(STR(reply.transport)); 1167 queue = 0; 1168 } 1169 1170 /* 1171 * This message is being flushed. If need-be unthrottle the 1172 * transport. 1173 */ 1174 if ((message->qflags & QMGR_FLUSH_EACH) != 0 1175 && QMGR_TRANSPORT_THROTTLED(transport)) 1176 qmgr_transport_unthrottle(transport); 1177 1178 /* 1179 * This transport is dead. Defer delivery to this recipient. 1180 */ 1181 if (QMGR_TRANSPORT_THROTTLED(transport)) { 1182 saved_dsn = transport->dsn; 1183 if ((transport = qmgr_error_transport(MAIL_SERVICE_RETRY)) != 0) { 1184 nexthop = qmgr_error_nexthop(saved_dsn); 1185 vstring_strcpy(reply.nexthop, nexthop); 1186 myfree(nexthop); 1187 queue = 0; 1188 } else { 1189 qmgr_defer_recipient(message, recipient, saved_dsn); 1190 continue; 1191 } 1192 } 1193 1194 /* 1195 * The nexthop destination provides the default name for the 1196 * per-destination queue. When the delivery agent accepts only one 1197 * recipient per delivery, give each recipient its own queue, so that 1198 * deliveries to different recipients of the same message can happen 1199 * in parallel, and so that we can enforce per-recipient concurrency 1200 * limits and prevent one recipient from tying up all the delivery 1201 * agent resources. We use recipient@nexthop as queue name rather 1202 * than the actual recipient domain name, so that one recipient in 1203 * multiple equivalent domains cannot evade the per-recipient 1204 * concurrency limit. Split the address on the recipient delimiter if 1205 * one is defined, so that extended addresses don't get extra 1206 * delivery slots. 1207 * 1208 * Fold the result to lower case so that we don't have multiple queues 1209 * for the same name. 1210 * 1211 * Important! All recipients in a queue must have the same nexthop 1212 * value. It is OK to have multiple queues with the same nexthop 1213 * value, but only when those queues are named after recipients. 1214 * 1215 * The single-recipient code below was written for local(8) like 1216 * delivery agents, and assumes that all domains that deliver to the 1217 * same (transport + nexthop) are aliases for $nexthop. Delivery 1218 * concurrency is changed from per-domain into per-recipient, by 1219 * changing the queue name from nexthop into localpart@nexthop. 1220 * 1221 * XXX This assumption is incorrect when different destinations share 1222 * the same (transport + nexthop). In reality, such transports are 1223 * rarely configured to use single-recipient deliveries. The fix is 1224 * to decouple the per-destination recipient limit from the 1225 * per-destination concurrency. 1226 */ 1227 vstring_strcpy(queue_name, STR(reply.nexthop)); 1228 if (strcmp(transport->name, MAIL_SERVICE_ERROR) != 0 1229 && strcmp(transport->name, MAIL_SERVICE_RETRY) != 0 1230 && transport->recipient_limit == 1) { 1231 /* Copy the recipient localpart. */ 1232 at = strrchr(STR(reply.recipient), '@'); 1233 len = (at ? (at - STR(reply.recipient)) 1234 : strlen(STR(reply.recipient))); 1235 vstring_strncpy(queue_name, STR(reply.recipient), len); 1236 /* Remove the address extension from the recipient localpart. */ 1237 if (*var_rcpt_delim && split_addr(STR(queue_name), *var_rcpt_delim)) 1238 vstring_truncate(queue_name, strlen(STR(queue_name))); 1239 /* Assume the recipient domain is equivalent to nexthop. */ 1240 vstring_sprintf_append(queue_name, "@%s", STR(reply.nexthop)); 1241 } 1242 lowercase(STR(queue_name)); 1243 1244 /* 1245 * This transport is alive. Find or instantiate a queue for this 1246 * recipient. 1247 */ 1248 if (queue == 0 || !STREQ(queue->name, STR(queue_name))) { 1249 if ((queue = qmgr_queue_find(transport, STR(queue_name))) == 0) 1250 queue = qmgr_queue_create(transport, STR(queue_name), 1251 STR(reply.nexthop)); 1252 } 1253 1254 /* 1255 * This message is being flushed. If need-be unthrottle the queue. 1256 */ 1257 if ((message->qflags & QMGR_FLUSH_EACH) != 0 1258 && QMGR_QUEUE_THROTTLED(queue)) 1259 qmgr_queue_unthrottle(queue); 1260 1261 /* 1262 * This queue is dead. Defer delivery to this recipient. 1263 */ 1264 if (QMGR_QUEUE_THROTTLED(queue)) { 1265 saved_dsn = queue->dsn; 1266 if ((queue = qmgr_error_queue(MAIL_SERVICE_RETRY, saved_dsn)) == 0) { 1267 qmgr_defer_recipient(message, recipient, saved_dsn); 1268 continue; 1269 } 1270 } 1271 1272 /* 1273 * This queue is alive. Bind this recipient to this queue instance. 1274 */ 1275 recipient->u.queue = queue; 1276 } 1277 resolve_clnt_free(&reply); 1278 vstring_free(queue_name); 1279} 1280 1281/* qmgr_message_assign - assign recipients to specific delivery requests */ 1282 1283static void qmgr_message_assign(QMGR_MESSAGE *message) 1284{ 1285 RECIPIENT_LIST list = message->rcpt_list; 1286 RECIPIENT *recipient; 1287 QMGR_ENTRY *entry = 0; 1288 QMGR_QUEUE *queue; 1289 QMGR_JOB *job = 0; 1290 QMGR_PEER *peer = 0; 1291 1292 /* 1293 * Try to bundle as many recipients in a delivery request as we can. When 1294 * the recipient resolves to the same site and transport as an existing 1295 * recipient, do not create a new queue entry, just move that recipient 1296 * to the recipient list of the existing queue entry. All this provided 1297 * that we do not exceed the transport-specific limit on the number of 1298 * recipients per transaction. 1299 */ 1300#define LIMIT_OK(limit, count) ((limit) == 0 || ((count) < (limit))) 1301 1302 for (recipient = list.info; recipient < list.info + list.len; recipient++) { 1303 1304 /* 1305 * Skip recipients with a dead transport or destination. 1306 */ 1307 if ((queue = recipient->u.queue) == 0) 1308 continue; 1309 1310 /* 1311 * Lookup or instantiate the message job if necessary. 1312 */ 1313 if (job == 0 || queue->transport != job->transport) { 1314 job = qmgr_job_obtain(message, queue->transport); 1315 peer = 0; 1316 } 1317 1318 /* 1319 * Lookup or instantiate job peer if necessary. 1320 */ 1321 if (peer == 0 || queue != peer->queue) 1322 peer = qmgr_peer_obtain(job, queue); 1323 1324 /* 1325 * Lookup old or instantiate new recipient entry. We try to reuse the 1326 * last existing entry whenever the recipient limit permits. 1327 */ 1328 entry = peer->entry_list.prev; 1329 if (message->single_rcpt || entry == 0 1330 || !LIMIT_OK(queue->transport->recipient_limit, entry->rcpt_list.len)) 1331 entry = qmgr_entry_create(peer, message); 1332 1333 /* 1334 * Add the recipient to the current entry and increase all those 1335 * recipient counters accordingly. 1336 */ 1337 recipient_list_add(&entry->rcpt_list, recipient->offset, 1338 recipient->dsn_orcpt, recipient->dsn_notify, 1339 recipient->orig_addr, recipient->address); 1340 job->rcpt_count++; 1341 message->rcpt_count++; 1342 qmgr_recipient_count++; 1343 } 1344 1345 /* 1346 * Release the message recipient list and reinitialize it for the next 1347 * time. 1348 */ 1349 recipient_list_free(&message->rcpt_list); 1350 recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE); 1351 1352 /* 1353 * Note that even if qmgr_job_obtain() reset the job candidate cache of 1354 * all transports to which we assigned new recipients, this message may 1355 * have other jobs which we didn't touch at all this time. But the number 1356 * of unread recipients affecting the candidate selection might have 1357 * changed considerably, so we must invalidate the caches if it might be 1358 * of some use. 1359 */ 1360 for (job = message->job_list.next; job; job = job->message_peers.next) 1361 if (job->selected_entries < job->read_entries 1362 && job->blocker_tag != job->transport->blocker_tag) 1363 job->transport->candidate_cache_current = 0; 1364} 1365 1366/* qmgr_message_move_limits - recycle unused recipient slots */ 1367 1368static void qmgr_message_move_limits(QMGR_MESSAGE *message) 1369{ 1370 QMGR_JOB *job; 1371 1372 for (job = message->job_list.next; job; job = job->message_peers.next) 1373 qmgr_job_move_limits(job); 1374} 1375 1376/* qmgr_message_free - release memory for in-core message structure */ 1377 1378void qmgr_message_free(QMGR_MESSAGE *message) 1379{ 1380 QMGR_JOB *job; 1381 1382 if (message->refcount != 0) 1383 msg_panic("qmgr_message_free: reference len: %d", message->refcount); 1384 if (message->fp) 1385 msg_panic("qmgr_message_free: queue file is open"); 1386 while ((job = message->job_list.next) != 0) 1387 qmgr_job_free(job); 1388 myfree(message->queue_id); 1389 myfree(message->queue_name); 1390 if (message->dsn_envid) 1391 myfree(message->dsn_envid); 1392 if (message->encoding) 1393 myfree(message->encoding); 1394 if (message->sender) 1395 myfree(message->sender); 1396 if (message->verp_delims) 1397 myfree(message->verp_delims); 1398 if (message->filter_xport) 1399 myfree(message->filter_xport); 1400 if (message->inspect_xport) 1401 myfree(message->inspect_xport); 1402 if (message->redirect_addr) 1403 myfree(message->redirect_addr); 1404 if (message->client_name) 1405 myfree(message->client_name); 1406 if (message->client_addr) 1407 myfree(message->client_addr); 1408 if (message->client_port) 1409 myfree(message->client_port); 1410 if (message->client_proto) 1411 myfree(message->client_proto); 1412 if (message->client_helo) 1413 myfree(message->client_helo); 1414 if (message->sasl_method) 1415 myfree(message->sasl_method); 1416 if (message->sasl_username) 1417 myfree(message->sasl_username); 1418 if (message->sasl_sender) 1419 myfree(message->sasl_sender); 1420 if (message->log_ident) 1421 myfree(message->log_ident); 1422 if (message->rewrite_context) 1423 myfree(message->rewrite_context); 1424 recipient_list_free(&message->rcpt_list); 1425 qmgr_message_count--; 1426 myfree((char *) message); 1427} 1428 1429/* qmgr_message_alloc - create in-core message structure */ 1430 1431QMGR_MESSAGE *qmgr_message_alloc(const char *queue_name, const char *queue_id, 1432 int qflags, mode_t mode) 1433{ 1434 const char *myname = "qmgr_message_alloc"; 1435 QMGR_MESSAGE *message; 1436 1437 if (msg_verbose) 1438 msg_info("%s: %s %s", myname, queue_name, queue_id); 1439 1440 /* 1441 * Create an in-core message structure. 1442 */ 1443 message = qmgr_message_create(queue_name, queue_id, qflags); 1444 1445 /* 1446 * Extract message envelope information: time of arrival, sender address, 1447 * recipient addresses. Skip files with malformed envelope information. 1448 */ 1449#define QMGR_LOCK_MODE (MYFLOCK_OP_EXCLUSIVE | MYFLOCK_OP_NOWAIT) 1450 1451 if (qmgr_message_open(message) < 0) { 1452 qmgr_message_free(message); 1453 return (0); 1454 } 1455 if (myflock(vstream_fileno(message->fp), INTERNAL_LOCK, QMGR_LOCK_MODE) < 0) { 1456 msg_info("%s: skipped, still being delivered", queue_id); 1457 qmgr_message_close(message); 1458 qmgr_message_free(message); 1459 return (QMGR_MESSAGE_LOCKED); 1460 } 1461 if (qmgr_message_read(message) < 0) { 1462 qmgr_message_close(message); 1463 qmgr_message_free(message); 1464 return (0); 1465 } else { 1466 1467 /* 1468 * We have validated the queue file content, so it is safe to modify 1469 * the file properties now. 1470 */ 1471 if (mode != 0 && fchmod(vstream_fileno(message->fp), mode) < 0) 1472 msg_fatal("fchmod %s: %m", VSTREAM_PATH(message->fp)); 1473 1474 /* 1475 * Reset the defer log. This code should not be here, but we must 1476 * reset the defer log *after* acquiring the exclusive lock on the 1477 * queue file and *before* resolving new recipients. Since all those 1478 * operations are encapsulated so nicely by this routine, the defer 1479 * log reset has to be done here as well. 1480 * 1481 * Note: it is safe to remove the defer logfile from a previous queue 1482 * run of this queue file, because the defer log contains information 1483 * about recipients that still exist in this queue file. 1484 */ 1485 if (mail_queue_remove(MAIL_QUEUE_DEFER, queue_id) && errno != ENOENT) 1486 msg_fatal("%s: %s: remove %s %s: %m", myname, 1487 queue_id, MAIL_QUEUE_DEFER, queue_id); 1488 qmgr_message_sort(message); 1489 qmgr_message_resolve(message); 1490 qmgr_message_sort(message); 1491 qmgr_message_assign(message); 1492 qmgr_message_close(message); 1493 if (message->rcpt_offset == 0) 1494 qmgr_message_move_limits(message); 1495 return (message); 1496 } 1497} 1498 1499/* qmgr_message_realloc - refresh in-core message structure */ 1500 1501QMGR_MESSAGE *qmgr_message_realloc(QMGR_MESSAGE *message) 1502{ 1503 const char *myname = "qmgr_message_realloc"; 1504 1505 /* 1506 * Sanity checks. 1507 */ 1508 if (message->rcpt_offset <= 0) 1509 msg_panic("%s: invalid offset: %ld", myname, message->rcpt_offset); 1510 if (msg_verbose) 1511 msg_info("%s: %s %s offset %ld", myname, message->queue_name, 1512 message->queue_id, message->rcpt_offset); 1513 1514 /* 1515 * Extract recipient addresses. Skip files with malformed envelope 1516 * information. 1517 */ 1518 if (qmgr_message_open(message) < 0) 1519 return (0); 1520 if (qmgr_message_read(message) < 0) { 1521 qmgr_message_close(message); 1522 return (0); 1523 } else { 1524 qmgr_message_sort(message); 1525 qmgr_message_resolve(message); 1526 qmgr_message_sort(message); 1527 qmgr_message_assign(message); 1528 qmgr_message_close(message); 1529 if (message->rcpt_offset == 0) 1530 qmgr_message_move_limits(message); 1531 return (message); 1532 } 1533} 1534