/*++
/* NAME
/*	qmgr_transport 3
/* SUMMARY
/*	per-transport data structures
/* SYNOPSIS
/*	#include "qmgr.h"
/*
/*	QMGR_TRANSPORT *qmgr_transport_create(name)
/*	const char *name;
/*
/*	QMGR_TRANSPORT *qmgr_transport_find(name)
/*	const char *name;
/*
/*	QMGR_TRANSPORT *qmgr_transport_select()
/*
/*	void	qmgr_transport_alloc(transport, notify)
/*	QMGR_TRANSPORT *transport;
/*	void	(*notify)(QMGR_TRANSPORT *transport, VSTREAM *fp);
/*
/*	void	qmgr_transport_throttle(transport, dsn)
/*	QMGR_TRANSPORT *transport;
/*	DSN	*dsn;
/*
/*	void	qmgr_transport_unthrottle(transport)
/*	QMGR_TRANSPORT *transport;
/* DESCRIPTION
/*	This module organizes the world by message transport type.
/*	Each transport can have zero or more destination queues
/*	associated with it.
/*
/*	qmgr_transport_create() instantiates a data structure for the
/*	named transport type.
/*
/*	qmgr_transport_find() looks up an existing message transport
/*	data structure.
/*
/*	qmgr_transport_select() attempts to find a transport that
/*	has messages pending delivery.  This routine implements
/*	round-robin search among transports.
/*
/*	qmgr_transport_alloc() allocates a delivery process for the
/*	specified transport type. Allocation is performed asynchronously.
/*	When a process becomes available, the application callback routine
/*	is invoked with the transport and a stream that is connected to
/*	a delivery process as its arguments. It is an error to call
/*	qmgr_transport_alloc() while delivery process allocation for
/*	the same transport is in progress.
/*
/*	qmgr_transport_throttle() blocks further allocation of delivery
/*	processes for the named transport. Attempts to throttle a
/*	throttled transport are ignored.
/*
/*	qmgr_transport_unthrottle() undoes qmgr_transport_throttle().
/*	Attempts to unthrottle a non-throttled transport are ignored.
/* DIAGNOSTICS
/*	Panic: consistency check failure.
/*	Fatal: out of memory.
/* LICENSE
/* .ad
/* .fi
/*	The Secure Mailer license must be distributed with this software.
/* AUTHOR(S)
/*	Wietse Venema
/*	IBM T.J. Watson Research
/*	P.O. Box 704
/*	Yorktown Heights, NY 10598, USA
/*
/*	Preemptive scheduler enhancements:
/*	Patrik Rak
/*	Modra 6
/*	155 00, Prague, Czech Republic
/*--*/

/* System library. */

#include <sys_defs.h>
#include <unistd.h>

#include <sys/time.h>			/* FD_SETSIZE */
#include <sys/types.h>			/* FD_SETSIZE */
#include <unistd.h>			/* FD_SETSIZE */

#ifdef USE_SYS_SELECT_H
#include <sys/select.h>			/* FD_SETSIZE */
#endif

/* Utility library. */

#include <msg.h>
#include <htable.h>
#include <events.h>
#include <mymalloc.h>
#include <vstream.h>
#include <iostuff.h>

/* Global library. */

#include <mail_proto.h>
#include <recipient_list.h>
#include <mail_conf.h>
#include <mail_params.h>

/* Application-specific. */

#include "qmgr.h"

 /*
  * Module-wide state: a name-indexed lookup table that is created lazily by
  * qmgr_transport_create(), and the list that qmgr_transport_select() walks
  * and rotates for round-robin selection.
  */
HTABLE *qmgr_transport_byname;		/* transport by name */
QMGR_TRANSPORT_LIST qmgr_transport_list;/* transports, round robin */

 /*
  * A local structure to remember a delivery process allocation request. One
  * instance is allocated per qmgr_transport_alloc() call and released by
  * qmgr_transport_event() after the requestor has been notified.
  */
typedef struct QMGR_TRANSPORT_ALLOC QMGR_TRANSPORT_ALLOC;

struct QMGR_TRANSPORT_ALLOC {
    QMGR_TRANSPORT *transport;		/* transport this request is for */
    VSTREAM *stream;			/* delivery service stream, null
					 * when the connect attempt failed */
    QMGR_TRANSPORT_ALLOC_NOTIFY notify;	/* application call-back routine,
					 * called as notify(transport,
					 * stream) */
};

 /*
  * Connections to delivery agents are managed asynchronously. Each delivery
  * agent connection goes through multiple wait states:
  *
  * - With Linux/Solaris and old queue manager implementations only, wait for
  * the server to invoke accept().
  *
  * - Wait for the delivery agent's announcement that it is ready to receive a
  * delivery request.
  *
  * - Wait for the delivery request completion status.
  *
  * Older queue manager implementations had only one pending delivery agent
  * connection per transport. With low-latency destinations, the output rates
  * were reduced on Linux/Solaris systems that had the extra wait state.
  *
  * To maximize delivery agent output rates with low-latency destinations, the
  * following changes were made to the queue manager by the end of the 2.4
  * development cycle:
  *
  * - The Linux/Solaris accept() wait state was eliminated.
  *
  * - A pipeline was implemented for pending delivery agent connections. The
  * number of pending delivery agent connections was increased from one to
  * two: the number of before-delivery wait states, plus one extra pipeline
  * slot to prevent the pipeline from stalling easily. Increasing the
  * pipeline much further actually hurt performance.
  *
  * - To reduce queue manager disk competition with delivery agents, the queue
  * scanning algorithm was modified to import only one message per interrupt.
  * The incoming and deferred queue scans now happen on alternate interrupts.
  *
  * Simplistically reasoned, a non-zero (incoming + active) queue length is
  * equivalent to a time shift for mail deliveries; this is undesirable when
  * delivery agents are not fully utilized.
  *
  * On the other hand a non-empty active queue is what allows us to do clever
  * things such as queue file prefetch, concurrency windows, and connection
  * caching; the idea is that such "thinking time" is affordable only after
  * the output channels are maxed out.
161 */ 162#ifndef QMGR_TRANSPORT_MAX_PEND 163#define QMGR_TRANSPORT_MAX_PEND 2 164#endif 165 166/* qmgr_transport_unthrottle_wrapper - in case (char *) != (struct *) */ 167 168static void qmgr_transport_unthrottle_wrapper(int unused_event, char *context) 169{ 170 qmgr_transport_unthrottle((QMGR_TRANSPORT *) context); 171} 172 173/* qmgr_transport_unthrottle - open the throttle */ 174 175void qmgr_transport_unthrottle(QMGR_TRANSPORT *transport) 176{ 177 const char *myname = "qmgr_transport_unthrottle"; 178 179 /* 180 * This routine runs after expiration of the timer set by 181 * qmgr_transport_throttle(), or whenever a delivery transport has been 182 * used without malfunction. In either case, we enable delivery again if 183 * the transport was blocked, otherwise the request is ignored. 184 */ 185 if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0) { 186 if (msg_verbose) 187 msg_info("%s: transport %s", myname, transport->name); 188 transport->flags &= ~QMGR_TRANSPORT_STAT_DEAD; 189 if (transport->dsn == 0) 190 msg_panic("%s: transport %s: null reason", 191 myname, transport->name); 192 dsn_free(transport->dsn); 193 transport->dsn = 0; 194 event_cancel_timer(qmgr_transport_unthrottle_wrapper, 195 (char *) transport); 196 } 197} 198 199/* qmgr_transport_throttle - disable delivery process allocation */ 200 201void qmgr_transport_throttle(QMGR_TRANSPORT *transport, DSN *dsn) 202{ 203 const char *myname = "qmgr_transport_throttle"; 204 205 /* 206 * We are unable to connect to a deliver process for this type of message 207 * transport. Instead of hosing the system by retrying in a tight loop, 208 * back off and disable this transport type for a while. 
209 */ 210 if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) == 0) { 211 if (msg_verbose) 212 msg_info("%s: transport %s: status: %s reason: %s", 213 myname, transport->name, dsn->status, dsn->reason); 214 transport->flags |= QMGR_TRANSPORT_STAT_DEAD; 215 if (transport->dsn) 216 msg_panic("%s: transport %s: spurious reason: %s", 217 myname, transport->name, transport->dsn->reason); 218 transport->dsn = DSN_COPY(dsn); 219 event_request_timer(qmgr_transport_unthrottle_wrapper, 220 (char *) transport, var_transport_retry_time); 221 } 222} 223 224/* qmgr_transport_abort - transport connect watchdog */ 225 226static void qmgr_transport_abort(int unused_event, char *context) 227{ 228 QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context; 229 230 msg_fatal("timeout connecting to transport: %s", alloc->transport->name); 231} 232 233/* qmgr_transport_event - delivery process availability notice */ 234 235static void qmgr_transport_event(int unused_event, char *context) 236{ 237 QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context; 238 239 /* 240 * This routine notifies the application when the request given to 241 * qmgr_transport_alloc() completes. 242 */ 243 if (msg_verbose) 244 msg_info("transport_event: %s", alloc->transport->name); 245 246 /* 247 * Connection request completed. Stop the watchdog timer. 248 */ 249 event_cancel_timer(qmgr_transport_abort, context); 250 251 /* 252 * Disable further read events that end up calling this function, and 253 * free up this pending connection pipeline slot. 254 */ 255 if (alloc->stream) { 256 event_disable_readwrite(vstream_fileno(alloc->stream)); 257 non_blocking(vstream_fileno(alloc->stream), BLOCKING); 258 } 259 alloc->transport->pending -= 1; 260 261 /* 262 * Notify the requestor. 
263 */ 264 alloc->notify(alloc->transport, alloc->stream); 265 myfree((char *) alloc); 266} 267 268/* qmgr_transport_select - select transport for allocation */ 269 270QMGR_TRANSPORT *qmgr_transport_select(void) 271{ 272 QMGR_TRANSPORT *xport; 273 QMGR_QUEUE *queue; 274 int need; 275 276 /* 277 * If we find a suitable transport, rotate the list of transports to 278 * effectuate round-robin selection. See similar selection code in 279 * qmgr_peer_select(). 280 * 281 * This function is called repeatedly until all transports have maxed out 282 * the number of pending delivery agent connections, until all delivery 283 * agent concurrency windows are maxed out, or until we run out of "todo" 284 * queue entries. 285 */ 286#define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y)) 287 288 for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) { 289 if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0 290 || xport->pending >= QMGR_TRANSPORT_MAX_PEND) 291 continue; 292 need = xport->pending + 1; 293 for (queue = xport->queue_list.next; queue; queue = queue->peers.next) { 294 if (QMGR_QUEUE_READY(queue) == 0) 295 continue; 296 if ((need -= MIN5af51743e4eef(queue->window - queue->busy_refcount, 297 queue->todo_refcount)) <= 0) { 298 QMGR_LIST_ROTATE(qmgr_transport_list, xport, peers); 299 if (msg_verbose) 300 msg_info("qmgr_transport_select: %s", xport->name); 301 return (xport); 302 } 303 } 304 } 305 return (0); 306} 307 308/* qmgr_transport_alloc - allocate delivery process */ 309 310void qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOTIFY notify) 311{ 312 QMGR_TRANSPORT_ALLOC *alloc; 313 314 /* 315 * Sanity checks. 
316 */ 317 if (transport->flags & QMGR_TRANSPORT_STAT_DEAD) 318 msg_panic("qmgr_transport: dead transport: %s", transport->name); 319 if (transport->pending >= QMGR_TRANSPORT_MAX_PEND) 320 msg_panic("qmgr_transport: excess allocation: %s", transport->name); 321 322 /* 323 * Connect to the well-known port for this delivery service, and wake up 324 * when a process announces its availability. Allow only a limited number 325 * of delivery process allocation attempts for this transport. In case of 326 * problems, back off. Do not hose the system when it is in trouble 327 * already. 328 * 329 * Use non-blocking connect(), so that Linux won't block the queue manager 330 * until the delivery agent calls accept(). 331 * 332 * When the connection to delivery agent cannot be completed, notify the 333 * event handler so that it can throttle the transport and defer the todo 334 * queues, just like it does when communication fails *after* connection 335 * completion. 336 * 337 * Before Postfix 2.4, the event handler was not invoked after connect() 338 * error, and mail was not deferred. Because of this, mail would be stuck 339 * in the active queue after triggering a "connection refused" condition. 
340 */ 341 alloc = (QMGR_TRANSPORT_ALLOC *) mymalloc(sizeof(*alloc)); 342 alloc->transport = transport; 343 alloc->notify = notify; 344 transport->pending += 1; 345 if ((alloc->stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name, 346 NON_BLOCKING)) == 0) { 347 msg_warn("connect to transport %s/%s: %m", 348 MAIL_CLASS_PRIVATE, transport->name); 349 event_request_timer(qmgr_transport_event, (char *) alloc, 0); 350 return; 351 } 352#if (EVENTS_STYLE != EVENTS_STYLE_SELECT) && defined(VSTREAM_CTL_DUPFD) 353#ifndef THRESHOLD_FD_WORKAROUND 354#define THRESHOLD_FD_WORKAROUND 128 355#endif 356 vstream_control(alloc->stream, 357 VSTREAM_CTL_DUPFD, THRESHOLD_FD_WORKAROUND, 358 VSTREAM_CTL_END); 359#endif 360 event_enable_read(vstream_fileno(alloc->stream), qmgr_transport_event, 361 (char *) alloc); 362 363 /* 364 * Guard against broken systems. 365 */ 366 event_request_timer(qmgr_transport_abort, (char *) alloc, 367 var_daemon_timeout); 368} 369 370/* qmgr_transport_create - create transport instance */ 371 372QMGR_TRANSPORT *qmgr_transport_create(const char *name) 373{ 374 QMGR_TRANSPORT *transport; 375 376 if (htable_find(qmgr_transport_byname, name) != 0) 377 msg_panic("qmgr_transport_create: transport exists: %s", name); 378 transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT)); 379 transport->flags = 0; 380 transport->pending = 0; 381 transport->name = mystrdup(name); 382 383 /* 384 * Use global configuration settings or transport-specific settings. 
385 */ 386 transport->dest_concurrency_limit = 387 get_mail_conf_int2(name, _DEST_CON_LIMIT, 388 var_dest_con_limit, 0, 0); 389 transport->recipient_limit = 390 get_mail_conf_int2(name, _DEST_RCPT_LIMIT, 391 var_dest_rcpt_limit, 0, 0); 392 transport->init_dest_concurrency = 393 get_mail_conf_int2(name, _INIT_DEST_CON, 394 var_init_dest_concurrency, 1, 0); 395 transport->rate_delay = get_mail_conf_time2(name, _DEST_RATE_DELAY, 396 var_dest_rate_delay, 397 's', 0, 0); 398 399 if (transport->rate_delay > 0) 400 transport->dest_concurrency_limit = 1; 401 if (transport->dest_concurrency_limit != 0 402 && transport->dest_concurrency_limit < transport->init_dest_concurrency) 403 transport->init_dest_concurrency = transport->dest_concurrency_limit; 404 405 transport->slot_cost = get_mail_conf_int2(name, _DELIVERY_SLOT_COST, 406 var_delivery_slot_cost, 0, 0); 407 transport->slot_loan = get_mail_conf_int2(name, _DELIVERY_SLOT_LOAN, 408 var_delivery_slot_loan, 0, 0); 409 transport->slot_loan_factor = 410 100 - get_mail_conf_int2(name, _DELIVERY_SLOT_DISCOUNT, 411 var_delivery_slot_discount, 0, 100); 412 transport->min_slots = get_mail_conf_int2(name, _MIN_DELIVERY_SLOTS, 413 var_min_delivery_slots, 0, 0); 414 transport->rcpt_unused = get_mail_conf_int2(name, _XPORT_RCPT_LIMIT, 415 var_xport_rcpt_limit, 0, 0); 416 transport->rcpt_per_stack = get_mail_conf_int2(name, _STACK_RCPT_LIMIT, 417 var_stack_rcpt_limit, 0, 0); 418 transport->refill_limit = get_mail_conf_int2(name, _XPORT_REFILL_LIMIT, 419 var_xport_refill_limit, 1, 0); 420 transport->refill_delay = get_mail_conf_time2(name, _XPORT_REFILL_DELAY, 421 var_xport_refill_delay, 's', 1, 0); 422 423 transport->queue_byname = htable_create(0); 424 QMGR_LIST_INIT(transport->queue_list); 425 transport->job_byname = htable_create(0); 426 QMGR_LIST_INIT(transport->job_list); 427 QMGR_LIST_INIT(transport->job_bytime); 428 transport->job_current = 0; 429 transport->job_next_unread = 0; 430 transport->candidate_cache = 0; 431 
transport->candidate_cache_current = 0; 432 transport->candidate_cache_time = (time_t) 0; 433 transport->blocker_tag = 1; 434 transport->dsn = 0; 435 qmgr_feedback_init(&transport->pos_feedback, name, _CONC_POS_FDBACK, 436 VAR_CONC_POS_FDBACK, var_conc_pos_feedback); 437 qmgr_feedback_init(&transport->neg_feedback, name, _CONC_NEG_FDBACK, 438 VAR_CONC_NEG_FDBACK, var_conc_neg_feedback); 439 transport->fail_cohort_limit = 440 get_mail_conf_int2(name, _CONC_COHORT_LIM, 441 var_conc_cohort_limit, 0, 0); 442 if (qmgr_transport_byname == 0) 443 qmgr_transport_byname = htable_create(10); 444 htable_enter(qmgr_transport_byname, name, (char *) transport); 445 QMGR_LIST_PREPEND(qmgr_transport_list, transport, peers); 446 if (msg_verbose) 447 msg_info("qmgr_transport_create: %s concurrency %d recipients %d", 448 transport->name, transport->dest_concurrency_limit, 449 transport->recipient_limit); 450 return (transport); 451} 452 453/* qmgr_transport_find - find transport instance */ 454 455QMGR_TRANSPORT *qmgr_transport_find(const char *name) 456{ 457 return ((QMGR_TRANSPORT *) htable_find(qmgr_transport_byname, name)); 458} 459