1/*++ 2/* NAME 3/* qmgr_deliver 3 4/* SUMMARY 5/* deliver one per-site queue entry to that site 6/* SYNOPSIS 7/* #include "qmgr.h" 8/* 9/* int qmgr_deliver_concurrency; 10/* 11/* int qmgr_deliver(transport, fp) 12/* QMGR_TRANSPORT *transport; 13/* VSTREAM *fp; 14/* DESCRIPTION 15/* This module implements the client side of the `queue manager 16/* to delivery agent' protocol. The queue manager uses 17/* asynchronous I/O so that it can drive multiple delivery 18/* agents in parallel. Depending on the outcome of a delivery 19/* attempt, the status of messages, queues and transports is 20/* updated. 21/* 22/* qmgr_deliver_concurrency is a global counter that says how 23/* many delivery processes are in use. This can be used, for 24/* example, to control the size of the `active' message queue. 25/* 26/* qmgr_deliver() executes when a delivery process announces its 27/* availability for the named transport. It arranges for delivery 28/* of a suitable queue entry. The \fIfp\fR argument specifies a 29/* stream that is connected to a delivery process, or a null 30/* pointer if the transport accepts no connection. Upon completion 31/* of delivery (successful or not), the stream is closed, so that the 32/* delivery process is released. 33/* DIAGNOSTICS 34/* LICENSE 35/* .ad 36/* .fi 37/* The Secure Mailer license must be distributed with this software. 38/* AUTHOR(S) 39/* Wietse Venema 40/* IBM T.J. Watson Research 41/* P.O. Box 704 42/* Yorktown Heights, NY 10598, USA 43/* 44/* Preemptive scheduler enhancements: 45/* Patrik Rak 46/* Modra 6 47/* 155 00, Prague, Czech Republic 48/*--*/ 49 50/* System library. */ 51 52#include <sys_defs.h> 53#include <time.h> 54#include <string.h> 55 56/* Utility library. */ 57 58#include <msg.h> 59#include <vstring.h> 60#include <vstream.h> 61#include <vstring_vstream.h> 62#include <events.h> 63#include <iostuff.h> 64#include <stringops.h> 65#include <mymalloc.h> 66 67/* Global library. */ 68 69#include <mail_queue.h> 70#include <mail_proto.h> 71#include <recipient_list.h> 72#include <mail_params.h> 73#include <deliver_request.h> 74#include <verp_sender.h> 75#include <dsn_util.h> 76#include <dsn_buf.h> 77#include <dsb_scan.h> 78#include <rcpt_print.h> 79 80/* Application-specific. */ 81 82#include "qmgr.h" 83 84int qmgr_deliver_concurrency; 85 86 /* 87 * Message delivery status codes. 88 */ 89#define DELIVER_STAT_OK 0 /* all recipients delivered */ 90#define DELIVER_STAT_DEFER 1 /* try some recipients later */ 91#define DELIVER_STAT_CRASH 2 /* mailer internal problem */ 92 93/* qmgr_deliver_initial_reply - retrieve initial delivery process response */ 94 95static int qmgr_deliver_initial_reply(VSTREAM *stream) 96{ 97 int stat; 98 99 if (peekfd(vstream_fileno(stream)) < 0) { 100 msg_warn("%s: premature disconnect", VSTREAM_PATH(stream)); 101 return (DELIVER_STAT_CRASH); 102 } else if (attr_scan(stream, ATTR_FLAG_STRICT, 103 ATTR_TYPE_INT, MAIL_ATTR_STATUS, &stat, 104 ATTR_TYPE_END) != 1) { 105 msg_warn("%s: malformed response", VSTREAM_PATH(stream)); 106 return (DELIVER_STAT_CRASH); 107 } else { 108 return (stat ? DELIVER_STAT_DEFER : 0); 109 } 110} 111 112/* qmgr_deliver_final_reply - retrieve final delivery process response */ 113 114static int qmgr_deliver_final_reply(VSTREAM *stream, DSN_BUF *dsb) 115{ 116 int stat; 117 118 if (peekfd(vstream_fileno(stream)) < 0) { 119 msg_warn("%s: premature disconnect", VSTREAM_PATH(stream)); 120 return (DELIVER_STAT_CRASH); 121 } else if (attr_scan(stream, ATTR_FLAG_STRICT, 122 ATTR_TYPE_FUNC, dsb_scan, (void *) dsb, 123 ATTR_TYPE_INT, MAIL_ATTR_STATUS, &stat, 124 ATTR_TYPE_END) != 2) { 125 msg_warn("%s: malformed response", VSTREAM_PATH(stream)); 126 return (DELIVER_STAT_CRASH); 127 } else { 128 return (stat ? DELIVER_STAT_DEFER : 0); 129 } 130} 131 132/* qmgr_deliver_send_request - send delivery request to delivery process */ 133 134static int qmgr_deliver_send_request(QMGR_ENTRY *entry, VSTREAM *stream) 135{ 136 RECIPIENT_LIST list = entry->rcpt_list; 137 RECIPIENT *recipient; 138 QMGR_MESSAGE *message = entry->message; 139 VSTRING *sender_buf = 0; 140 MSG_STATS stats; 141 char *sender; 142 int flags; 143 144 /* 145 * If variable envelope return path is requested, change prefix+@origin 146 * into prefix+user=domain@origin. Note that with VERP there is only one 147 * recipient per delivery. 148 */ 149 if (message->verp_delims == 0) { 150 sender = message->sender; 151 } else { 152 sender_buf = vstring_alloc(100); 153 verp_sender(sender_buf, message->verp_delims, 154 message->sender, list.info); 155 sender = vstring_str(sender_buf); 156 } 157 158 flags = message->tflags 159 | entry->queue->dflags 160 | (message->inspect_xport ? DEL_REQ_FLAG_BOUNCE : DEL_REQ_FLAG_DEFLT); 161 (void) QMGR_MSG_STATS(&stats, message); 162 attr_print(stream, ATTR_FLAG_NONE, 163 ATTR_TYPE_INT, MAIL_ATTR_FLAGS, flags, 164 ATTR_TYPE_STR, MAIL_ATTR_QUEUE, message->queue_name, 165 ATTR_TYPE_STR, MAIL_ATTR_QUEUEID, message->queue_id, 166 ATTR_TYPE_LONG, MAIL_ATTR_OFFSET, message->data_offset, 167 ATTR_TYPE_LONG, MAIL_ATTR_SIZE, message->cont_length, 168 ATTR_TYPE_STR, MAIL_ATTR_NEXTHOP, entry->queue->nexthop, 169 ATTR_TYPE_STR, MAIL_ATTR_ENCODING, message->encoding, 170 ATTR_TYPE_STR, MAIL_ATTR_SENDER, sender, 171 ATTR_TYPE_STR, MAIL_ATTR_DSN_ENVID, message->dsn_envid, 172 ATTR_TYPE_INT, MAIL_ATTR_DSN_RET, message->dsn_ret, 173 ATTR_TYPE_FUNC, msg_stats_print, (void *) &stats, 174 /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */ 175 ATTR_TYPE_STR, MAIL_ATTR_LOG_CLIENT_NAME, message->client_name, 176 ATTR_TYPE_STR, MAIL_ATTR_LOG_CLIENT_ADDR, message->client_addr, 177 ATTR_TYPE_STR, MAIL_ATTR_LOG_CLIENT_PORT, message->client_port, 178 ATTR_TYPE_STR, MAIL_ATTR_LOG_PROTO_NAME, message->client_proto, 179 ATTR_TYPE_STR, MAIL_ATTR_LOG_HELO_NAME, message->client_helo, 180 /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */ 181 ATTR_TYPE_STR, MAIL_ATTR_SASL_METHOD, message->sasl_method, 182 ATTR_TYPE_STR, MAIL_ATTR_SASL_USERNAME, message->sasl_username, 183 ATTR_TYPE_STR, MAIL_ATTR_SASL_SENDER, message->sasl_sender, 184 /* XXX Ditto if we want to pass TLS certificate info. */ 185 ATTR_TYPE_STR, MAIL_ATTR_LOG_IDENT, message->log_ident, 186 ATTR_TYPE_STR, MAIL_ATTR_RWR_CONTEXT, message->rewrite_context, 187 ATTR_TYPE_INT, MAIL_ATTR_RCPT_COUNT, list.len, 188 ATTR_TYPE_END); 189 if (sender_buf != 0) 190 vstring_free(sender_buf); 191 for (recipient = list.info; recipient < list.info + list.len; recipient++) 192 attr_print(stream, ATTR_FLAG_NONE, 193 ATTR_TYPE_FUNC, rcpt_print, (void *) recipient, 194 ATTR_TYPE_END); 195 if (vstream_fflush(stream) != 0) { 196 msg_warn("write to process (%s): %m", entry->queue->transport->name); 197 return (-1); 198 } else { 199 if (msg_verbose) 200 msg_info("qmgr_deliver: site `%s'", entry->queue->name); 201 return (0); 202 } 203} 204 205/* qmgr_deliver_abort - transport response watchdog */ 206 207static void qmgr_deliver_abort(int unused_event, char *context) 208{ 209 QMGR_ENTRY *entry = (QMGR_ENTRY *) context; 210 QMGR_QUEUE *queue = entry->queue; 211 QMGR_TRANSPORT *transport = queue->transport; 212 QMGR_MESSAGE *message = entry->message; 213 214 msg_fatal("%s: timeout receiving delivery status from transport: %s", 215 message->queue_id, transport->name); 216} 217 218/* qmgr_deliver_update - process delivery status report */ 219 220static void qmgr_deliver_update(int unused_event, char *context) 221{ 222 QMGR_ENTRY *entry = (QMGR_ENTRY *) context; 223 QMGR_QUEUE *queue = entry->queue; 224 QMGR_TRANSPORT *transport = queue->transport; 225 QMGR_MESSAGE *message = entry->message; 226 static DSN_BUF *dsb; 227 int status; 228 229 /* 230 * Release the delivery agent from a "hot" queue entry. 231 */ 232#define QMGR_DELIVER_RELEASE_AGENT(entry) do { \ 233 event_disable_readwrite(vstream_fileno(entry->stream)); \ 234 (void) vstream_fclose(entry->stream); \ 235 entry->stream = 0; \ 236 qmgr_deliver_concurrency--; \ 237 } while (0) 238 239 if (dsb == 0) 240 dsb = dsb_create(); 241 242 /* 243 * The message transport has responded. Stop the watchdog timer. 244 */ 245 event_cancel_timer(qmgr_deliver_abort, context); 246 247 /* 248 * Retrieve the delivery agent status report. The numerical status code 249 * indicates if delivery should be tried again. The reason text is sent 250 * only when a site should be avoided for a while, so that the queue 251 * manager can log why it does not even try to schedule delivery to the 252 * affected recipients. 253 */ 254 status = qmgr_deliver_final_reply(entry->stream, dsb); 255 256 /* 257 * The mail delivery process failed for some reason (although delivery 258 * may have been successful). Back off with this transport type for a 259 * while. Dispose of queue entries for this transport that await 260 * selection (the todo lists). Stay away from queue entries that have 261 * been selected (the busy lists), or we would have dangling pointers. 262 * The queue itself won't go away before we dispose of the current queue 263 * entry. 264 */ 265 if (status == DELIVER_STAT_CRASH) { 266 message->flags |= DELIVER_STAT_DEFER; 267#if 0 268 whatsup = concatenate("unknown ", transport->name, 269 " mail transport error", (char *) 0); 270 qmgr_transport_throttle(transport, 271 DSN_SIMPLE(&dsb->dsn, "4.3.0", whatsup)); 272 myfree(whatsup); 273#else 274 qmgr_transport_throttle(transport, 275 DSN_SIMPLE(&dsb->dsn, "4.3.0", 276 "unknown mail transport error")); 277#endif 278 msg_warn("transport %s failure -- see a previous warning/fatal/panic logfile record for the problem description", 279 transport->name); 280 281 /* 282 * Assume the worst and write a defer logfile record for each 283 * recipient. This omission was already present in the first queue 284 * manager implementation of 199703, and was fixed 200511. 285 * 286 * To avoid the synchronous qmgr_defer_recipient() operation for each 287 * recipient of this queue entry, release the delivery process and 288 * move the entry back to the todo queue. Let qmgr_defer_transport() 289 * log the recipient asynchronously if possible, and get out of here. 290 * Note: if asynchronous logging is not possible, 291 * qmgr_defer_transport() eventually invokes qmgr_entry_done() and 292 * the entry becomes a dangling pointer. 293 */ 294 QMGR_DELIVER_RELEASE_AGENT(entry); 295 qmgr_entry_unselect(entry); 296 qmgr_defer_transport(transport, &dsb->dsn); 297 return; 298 } 299 300 /* 301 * This message must be tried again. 302 * 303 * If we have a problem talking to this site, back off with this site for a 304 * while; dispose of queue entries for this site that await selection 305 * (the todo list); stay away from queue entries that have been selected 306 * (the busy list), or we would have dangling pointers. The queue itself 307 * won't go away before we dispose of the current queue entry. 308 * 309 * XXX Caution: DSN_COPY() will panic on empty status or reason. 310 */ 311#define SUSPENDED "delivery temporarily suspended: " 312 313 if (status == DELIVER_STAT_DEFER) { 314 message->flags |= DELIVER_STAT_DEFER; 315 if (VSTRING_LEN(dsb->status)) { 316 /* Sanitize the DSN status/reason from the delivery agent. */ 317 if (!dsn_valid(vstring_str(dsb->status))) 318 vstring_strcpy(dsb->status, "4.0.0"); 319 if (VSTRING_LEN(dsb->reason) == 0) 320 vstring_strcpy(dsb->reason, "unknown error"); 321 vstring_prepend(dsb->reason, SUSPENDED, sizeof(SUSPENDED) - 1); 322 if (QMGR_QUEUE_READY(queue)) { 323 qmgr_queue_throttle(queue, DSN_FROM_DSN_BUF(dsb)); 324 if (QMGR_QUEUE_THROTTLED(queue)) 325 qmgr_defer_todo(queue, &dsb->dsn); 326 } 327 } 328 } 329 330 /* 331 * No problems detected. Mark the transport and queue as alive. The queue 332 * itself won't go away before we dispose of the current queue entry. 333 */ 334 if (status != DELIVER_STAT_CRASH && VSTRING_LEN(dsb->reason) == 0) { 335 qmgr_transport_unthrottle(transport); 336 qmgr_queue_unthrottle(queue); 337 } 338 339 /* 340 * Release the delivery process, and give some other queue entry a chance 341 * to be delivered. When all recipients for a message have been tried, 342 * decide what to do next with this message: defer, bounce, delete. 343 */ 344 QMGR_DELIVER_RELEASE_AGENT(entry); 345 qmgr_entry_done(entry, QMGR_QUEUE_BUSY); 346} 347 348/* qmgr_deliver - deliver one per-site queue entry */ 349 350void qmgr_deliver(QMGR_TRANSPORT *transport, VSTREAM *stream) 351{ 352 QMGR_ENTRY *entry; 353 DSN dsn; 354 355 /* 356 * Find out if this delivery process is really available. Once elected, 357 * the delivery process is supposed to express its happiness. If there is 358 * a problem, wipe the pending deliveries for this transport. This 359 * routine runs in response to an external event, so it does not run 360 * while some other queue manipulation is happening. 361 */ 362 if (stream == 0 || qmgr_deliver_initial_reply(stream) != 0) { 363#if 0 364 whatsup = concatenate(transport->name, 365 " mail transport unavailable", (char *) 0); 366 qmgr_transport_throttle(transport, 367 DSN_SIMPLE(&dsn, "4.3.0", whatsup)); 368 myfree(whatsup); 369#else 370 qmgr_transport_throttle(transport, 371 DSN_SIMPLE(&dsn, "4.3.0", 372 "mail transport unavailable")); 373#endif 374 qmgr_defer_transport(transport, &dsn); 375 if (stream) 376 (void) vstream_fclose(stream); 377 return; 378 } 379 380 /* 381 * Find a suitable queue entry. Things may have changed since this 382 * transport was allocated. If no suitable entry is found, 383 * unceremoniously disconnect from the delivery process. The delivery 384 * agent request reading routine is prepared for the queue manager to 385 * change its mind for no apparent reason. 386 */ 387 if ((entry = qmgr_job_entry_select(transport)) == 0) { 388 (void) vstream_fclose(stream); 389 return; 390 } 391 392 /* 393 * Send the queue file info and recipient info to the delivery process. 394 * If there is a problem, wipe the pending deliveries for this transport. 395 * This routine runs in response to an external event, so it does not run 396 * while some other queue manipulation is happening. 397 */ 398 if (qmgr_deliver_send_request(entry, stream) < 0) { 399 qmgr_entry_unselect(entry); 400#if 0 401 whatsup = concatenate(transport->name, 402 " mail transport unavailable", (char *) 0); 403 qmgr_transport_throttle(transport, 404 DSN_SIMPLE(&dsn, "4.3.0", whatsup)); 405 myfree(whatsup); 406#else 407 qmgr_transport_throttle(transport, 408 DSN_SIMPLE(&dsn, "4.3.0", 409 "mail transport unavailable")); 410#endif 411 qmgr_defer_transport(transport, &dsn); 412 /* warning: entry may be a dangling pointer here */ 413 (void) vstream_fclose(stream); 414 return; 415 } 416 417 /* 418 * If we get this far, go wait for the delivery status report. 419 */ 420 qmgr_deliver_concurrency++; 421 entry->stream = stream; 422 event_enable_read(vstream_fileno(stream), 423 qmgr_deliver_update, (char *) entry); 424 425 /* 426 * Guard against broken systems. 427 */ 428 event_request_timer(qmgr_deliver_abort, (char *) entry, var_daemon_timeout); 429} 430