1/*++ 2/* NAME 3/* qmgr_transport 3 4/* SUMMARY 5/* per-transport data structures 6/* SYNOPSIS 7/* #include "qmgr.h" 8/* 9/* QMGR_TRANSPORT *qmgr_transport_create(name) 10/* const char *name; 11/* 12/* QMGR_TRANSPORT *qmgr_transport_find(name) 13/* const char *name; 14/* 15/* QMGR_TRANSPORT *qmgr_transport_select() 16/* 17/* void qmgr_transport_alloc(transport, notify) 18/* QMGR_TRANSPORT *transport; 19/* void (*notify)(QMGR_TRANSPORT *transport, VSTREAM *fp); 20/* 21/* void qmgr_transport_throttle(transport, dsn) 22/* QMGR_TRANSPORT *transport; 23/* DSN *dsn; 24/* 25/* void qmgr_transport_unthrottle(transport) 26/* QMGR_TRANSPORT *transport; 27/* DESCRIPTION 28/* This module organizes the world by message transport type. 29/* Each transport can have zero or more destination queues 30/* associated with it. 31/* 32/* qmgr_transport_create() instantiates a data structure for the 33/* named transport type. 34/* 35/* qmgr_transport_find() looks up an existing message transport 36/* data structure. 37/* 38/* qmgr_transport_select() attempts to find a transport that 39/* has messages pending delivery. This routine implements 40/* round-robin search among transports. 41/* 42/* qmgr_transport_alloc() allocates a delivery process for the 43/* specified transport type. Allocation is performed asynchronously. 44/* When a process becomes available, the application callback routine 45/* is invoked with as arguments the transport and a stream that 46/* is connected to a delivery process. It is an error to call 47/* qmgr_transport_alloc() while delivery process allocation for 48/* the same transport is in progress. 49/* 50/* qmgr_transport_throttle blocks further allocation of delivery 51/* processes for the named transport. Attempts to throttle a 52/* throttled transport are ignored. 53/* 54/* qmgr_transport_unthrottle() undoes qmgr_transport_throttle(). 55/* Attempts to unthrottle a non-throttled transport are ignored. 56/* DIAGNOSTICS 57/* Panic: consistency check failure. Fatal: out of memory. 58/* LICENSE 59/* .ad 60/* .fi 61/* The Secure Mailer license must be distributed with this software. 62/* AUTHOR(S) 63/* Wietse Venema 64/* IBM T.J. Watson Research 65/* P.O. Box 704 66/* Yorktown Heights, NY 10598, USA 67/*--*/ 68 69/* System library. */ 70 71#include <sys_defs.h> 72#include <unistd.h> 73 74#include <sys/time.h> /* FD_SETSIZE */ 75#include <sys/types.h> /* FD_SETSIZE */ 76#include <unistd.h> /* FD_SETSIZE */ 77 78#ifdef USE_SYS_SELECT_H 79#include <sys/select.h> /* FD_SETSIZE */ 80#endif 81 82/* Utility library. */ 83 84#include <msg.h> 85#include <htable.h> 86#include <events.h> 87#include <mymalloc.h> 88#include <vstream.h> 89#include <iostuff.h> 90 91/* Global library. */ 92 93#include <mail_proto.h> 94#include <recipient_list.h> 95#include <mail_conf.h> 96#include <mail_params.h> 97 98/* Application-specific. */ 99 100#include "qmgr.h" 101 102HTABLE *qmgr_transport_byname; /* transport by name */ 103QMGR_TRANSPORT_LIST qmgr_transport_list;/* transports, round robin */ 104 105 /* 106 * A local structure to remember a delivery process allocation request. 107 */ 108typedef struct QMGR_TRANSPORT_ALLOC QMGR_TRANSPORT_ALLOC; 109 110struct QMGR_TRANSPORT_ALLOC { 111 QMGR_TRANSPORT *transport; /* transport context */ 112 VSTREAM *stream; /* delivery service stream */ 113 QMGR_TRANSPORT_ALLOC_NOTIFY notify; /* application call-back routine */ 114}; 115 116 /* 117 * Connections to delivery agents are managed asynchronously. Each delivery 118 * agent connection goes through multiple wait states: 119 * 120 * - With Linux/Solaris and old queue manager implementations only, wait for 121 * the server to invoke accept(). 122 * 123 * - Wait for the delivery agent's announcement that it is ready to receive a 124 * delivery request. 125 * 126 * - Wait for the delivery request completion status. 127 * 128 * Older queue manager implementations had only one pending delivery agent 129 * connection per transport. With low-latency destinations, the output rates 130 * were reduced on Linux/Solaris systems that had the extra wait state. 131 * 132 * To maximize delivery agent output rates with low-latency destinations, the 133 * following changes were made to the queue manager by the end of the 2.4 134 * development cycle: 135 * 136 * - The Linux/Solaris accept() wait state was eliminated. 137 * 138 * - A pipeline was implemented for pending delivery agent connections. The 139 * number of pending delivery agent connections was increased from one to 140 * two: the number of before-delivery wait states, plus one extra pipeline 141 * slot to prevent the pipeline from stalling easily. Increasing the 142 * pipeline much further actually hurt performance. 143 * 144 * - To reduce queue manager disk competition with delivery agents, the queue 145 * scanning algorithm was modified to import only one message per interrupt. 146 * The incoming and deferred queue scans now happen on alternate interrupts. 147 * 148 * Simplistically reasoned, a non-zero (incoming + active) queue length is 149 * equivalent to a time shift for mail deliveries; this is undesirable when 150 * delivery agents are not fully utilized. 151 * 152 * On the other hand a non-empty active queue is what allows us to do clever 153 * things such as queue file prefetch, concurrency windows, and connection 154 * caching; the idea is that such "thinking time" is affordable only after 155 * the output channels are maxed out. 156 */ 157#ifndef QMGR_TRANSPORT_MAX_PEND 158#define QMGR_TRANSPORT_MAX_PEND 2 159#endif 160 161/* qmgr_transport_unthrottle_wrapper - in case (char *) != (struct *) */ 162 163static void qmgr_transport_unthrottle_wrapper(int unused_event, char *context) 164{ 165 qmgr_transport_unthrottle((QMGR_TRANSPORT *) context); 166} 167 168/* qmgr_transport_unthrottle - open the throttle */ 169 170void qmgr_transport_unthrottle(QMGR_TRANSPORT *transport) 171{ 172 const char *myname = "qmgr_transport_unthrottle"; 173 174 /* 175 * This routine runs after expiration of the timer set by 176 * qmgr_transport_throttle(), or whenever a delivery transport has been 177 * used without malfunction. In either case, we enable delivery again if 178 * the transport was blocked, otherwise the request is ignored. 179 */ 180 if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0) { 181 if (msg_verbose) 182 msg_info("%s: transport %s", myname, transport->name); 183 transport->flags &= ~QMGR_TRANSPORT_STAT_DEAD; 184 if (transport->dsn == 0) 185 msg_panic("%s: transport %s: null reason", 186 myname, transport->name); 187 dsn_free(transport->dsn); 188 transport->dsn = 0; 189 event_cancel_timer(qmgr_transport_unthrottle_wrapper, 190 (char *) transport); 191 } 192} 193 194/* qmgr_transport_throttle - disable delivery process allocation */ 195 196void qmgr_transport_throttle(QMGR_TRANSPORT *transport, DSN *dsn) 197{ 198 const char *myname = "qmgr_transport_throttle"; 199 200 /* 201 * We are unable to connect to a deliver process for this type of message 202 * transport. Instead of hosing the system by retrying in a tight loop, 203 * back off and disable this transport type for a while. 204 */ 205 if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) == 0) { 206 if (msg_verbose) 207 msg_info("%s: transport %s: status: %s reason: %s", 208 myname, transport->name, dsn->status, dsn->reason); 209 transport->flags |= QMGR_TRANSPORT_STAT_DEAD; 210 if (transport->dsn) 211 msg_panic("%s: transport %s: spurious reason: %s", 212 myname, transport->name, transport->dsn->reason); 213 transport->dsn = DSN_COPY(dsn); 214 event_request_timer(qmgr_transport_unthrottle_wrapper, 215 (char *) transport, var_transport_retry_time); 216 } 217} 218 219/* qmgr_transport_abort - transport connect watchdog */ 220 221static void qmgr_transport_abort(int unused_event, char *context) 222{ 223 QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context; 224 225 msg_fatal("timeout connecting to transport: %s", alloc->transport->name); 226} 227 228/* qmgr_transport_event - delivery process availability notice */ 229 230static void qmgr_transport_event(int unused_event, char *context) 231{ 232 QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context; 233 234 /* 235 * This routine notifies the application when the request given to 236 * qmgr_transport_alloc() completes. 237 */ 238 if (msg_verbose) 239 msg_info("transport_event: %s", alloc->transport->name); 240 241 /* 242 * Connection request completed. Stop the watchdog timer. 243 */ 244 event_cancel_timer(qmgr_transport_abort, context); 245 246 /* 247 * Disable further read events that end up calling this function, and 248 * free up this pending connection pipeline slot. 249 */ 250 if (alloc->stream) { 251 event_disable_readwrite(vstream_fileno(alloc->stream)); 252 non_blocking(vstream_fileno(alloc->stream), BLOCKING); 253 } 254 alloc->transport->pending -= 1; 255 256 /* 257 * Notify the requestor. 258 */ 259 alloc->notify(alloc->transport, alloc->stream); 260 myfree((char *) alloc); 261} 262 263/* qmgr_transport_select - select transport for allocation */ 264 265QMGR_TRANSPORT *qmgr_transport_select(void) 266{ 267 QMGR_TRANSPORT *xport; 268 QMGR_QUEUE *queue; 269 int need; 270 271 /* 272 * If we find a suitable transport, rotate the list of transports to 273 * effectuate round-robin selection. See similar selection code in 274 * qmgr_queue_select(). 275 * 276 * This function is called repeatedly until all transports have maxed out 277 * the number of pending delivery agent connections, until all delivery 278 * agent concurrency windows are maxed out, or until we run out of "todo" 279 * queue entries. 280 */ 281#define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y)) 282 283 for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) { 284 if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0 285 || xport->pending >= QMGR_TRANSPORT_MAX_PEND) 286 continue; 287 need = xport->pending + 1; 288 for (queue = xport->queue_list.next; queue; queue = queue->peers.next) { 289 if (QMGR_QUEUE_READY(queue) == 0) 290 continue; 291 if ((need -= MIN5af51743e4eef(queue->window - queue->busy_refcount, 292 queue->todo_refcount)) <= 0) { 293 QMGR_LIST_ROTATE(qmgr_transport_list, xport); 294 if (msg_verbose) 295 msg_info("qmgr_transport_select: %s", xport->name); 296 return (xport); 297 } 298 } 299 } 300 return (0); 301} 302 303/* qmgr_transport_alloc - allocate delivery process */ 304 305void qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOTIFY notify) 306{ 307 QMGR_TRANSPORT_ALLOC *alloc; 308 309 /* 310 * Sanity checks. 311 */ 312 if (transport->flags & QMGR_TRANSPORT_STAT_DEAD) 313 msg_panic("qmgr_transport: dead transport: %s", transport->name); 314 if (transport->pending >= QMGR_TRANSPORT_MAX_PEND) 315 msg_panic("qmgr_transport: excess allocation: %s", transport->name); 316 317 /* 318 * Connect to the well-known port for this delivery service, and wake up 319 * when a process announces its availability. Allow only a limited number 320 * of delivery process allocation attempts for this transport. In case of 321 * problems, back off. Do not hose the system when it is in trouble 322 * already. 323 * 324 * Use non-blocking connect(), so that Linux won't block the queue manager 325 * until the delivery agent calls accept(). 326 * 327 * When the connection to delivery agent cannot be completed, notify the 328 * event handler so that it can throttle the transport and defer the todo 329 * queues, just like it does when communication fails *after* connection 330 * completion. 331 * 332 * Before Postfix 2.4, the event handler was not invoked after connect() 333 * error, and mail was not deferred. Because of this, mail would be stuck 334 * in the active queue after triggering a "connection refused" condition. 335 */ 336 alloc = (QMGR_TRANSPORT_ALLOC *) mymalloc(sizeof(*alloc)); 337 alloc->transport = transport; 338 alloc->notify = notify; 339 transport->pending += 1; 340 if ((alloc->stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name, 341 NON_BLOCKING)) == 0) { 342 msg_warn("connect to transport %s/%s: %m", 343 MAIL_CLASS_PRIVATE, transport->name); 344 event_request_timer(qmgr_transport_event, (char *) alloc, 0); 345 return; 346 } 347#if (EVENTS_STYLE != EVENTS_STYLE_SELECT) && defined(VSTREAM_CTL_DUPFD) 348#ifndef THRESHOLD_FD_WORKAROUND 349#define THRESHOLD_FD_WORKAROUND 128 350#endif 351 vstream_control(alloc->stream, 352 VSTREAM_CTL_DUPFD, THRESHOLD_FD_WORKAROUND, 353 VSTREAM_CTL_END); 354#endif 355 event_enable_read(vstream_fileno(alloc->stream), qmgr_transport_event, 356 (char *) alloc); 357 358 /* 359 * Guard against broken systems. 360 */ 361 event_request_timer(qmgr_transport_abort, (char *) alloc, 362 var_daemon_timeout); 363} 364 365/* qmgr_transport_create - create transport instance */ 366 367QMGR_TRANSPORT *qmgr_transport_create(const char *name) 368{ 369 QMGR_TRANSPORT *transport; 370 371 if (htable_find(qmgr_transport_byname, name) != 0) 372 msg_panic("qmgr_transport_create: transport exists: %s", name); 373 transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT)); 374 transport->flags = 0; 375 transport->pending = 0; 376 transport->name = mystrdup(name); 377 378 /* 379 * Use global configuration settings or transport-specific settings. 380 */ 381 transport->dest_concurrency_limit = 382 get_mail_conf_int2(name, _DEST_CON_LIMIT, 383 var_dest_con_limit, 0, 0); 384 transport->recipient_limit = 385 get_mail_conf_int2(name, _DEST_RCPT_LIMIT, 386 var_dest_rcpt_limit, 0, 0); 387 transport->init_dest_concurrency = 388 get_mail_conf_int2(name, _INIT_DEST_CON, 389 var_init_dest_concurrency, 1, 0); 390 transport->rate_delay = get_mail_conf_time2(name, _DEST_RATE_DELAY, 391 var_dest_rate_delay, 392 's', 0, 0); 393 394 if (transport->rate_delay > 0) 395 transport->dest_concurrency_limit = 1; 396 if (transport->dest_concurrency_limit != 0 397 && transport->dest_concurrency_limit < transport->init_dest_concurrency) 398 transport->init_dest_concurrency = transport->dest_concurrency_limit; 399 400 transport->queue_byname = htable_create(0); 401 QMGR_LIST_INIT(transport->queue_list); 402 transport->dsn = 0; 403 qmgr_feedback_init(&transport->pos_feedback, name, _CONC_POS_FDBACK, 404 VAR_CONC_POS_FDBACK, var_conc_pos_feedback); 405 qmgr_feedback_init(&transport->neg_feedback, name, _CONC_NEG_FDBACK, 406 VAR_CONC_NEG_FDBACK, var_conc_neg_feedback); 407 transport->fail_cohort_limit = 408 get_mail_conf_int2(name, _CONC_COHORT_LIM, 409 var_conc_cohort_limit, 0, 0); 410 if (qmgr_transport_byname == 0) 411 qmgr_transport_byname = htable_create(10); 412 htable_enter(qmgr_transport_byname, name, (char *) transport); 413 QMGR_LIST_APPEND(qmgr_transport_list, transport); 414 if (msg_verbose) 415 msg_info("qmgr_transport_create: %s concurrency %d recipients %d", 416 transport->name, transport->dest_concurrency_limit, 417 transport->recipient_limit); 418 return (transport); 419} 420 421/* qmgr_transport_find - find transport instance */ 422 423QMGR_TRANSPORT *qmgr_transport_find(const char *name) 424{ 425 return ((QMGR_TRANSPORT *) htable_find(qmgr_transport_byname, name)); 426} 427