1/*++
2/* NAME
3/*	qmgr_transport 3
4/* SUMMARY
5/*	per-transport data structures
6/* SYNOPSIS
7/*	#include "qmgr.h"
8/*
9/*	QMGR_TRANSPORT *qmgr_transport_create(name)
10/*	const char *name;
11/*
12/*	QMGR_TRANSPORT *qmgr_transport_find(name)
13/*	const char *name;
14/*
15/*	QMGR_TRANSPORT *qmgr_transport_select()
16/*
17/*	void	qmgr_transport_alloc(transport, notify)
18/*	QMGR_TRANSPORT *transport;
19/*	void	(*notify)(QMGR_TRANSPORT *transport, VSTREAM *fp);
20/*
21/*	void	qmgr_transport_throttle(transport, dsn)
22/*	QMGR_TRANSPORT *transport;
23/*	DSN	*dsn;
24/*
25/*	void	qmgr_transport_unthrottle(transport)
26/*	QMGR_TRANSPORT *transport;
27/* DESCRIPTION
28/*	This module organizes the world by message transport type.
29/*	Each transport can have zero or more destination queues
30/*	associated with it.
31/*
32/*	qmgr_transport_create() instantiates a data structure for the
33/*	named transport type.
34/*
35/*	qmgr_transport_find() looks up an existing message transport
36/*	data structure.
37/*
38/*	qmgr_transport_select() attempts to find a transport that
39/*	has messages pending delivery.  This routine implements
40/*	round-robin search among transports.
41/*
42/*	qmgr_transport_alloc() allocates a delivery process for the
43/*	specified transport type. Allocation is performed asynchronously.
44/*	When a process becomes available, the application callback routine
45/*	is invoked with as arguments the transport and a stream that
46/*	is connected to a delivery process. It is an error to call
47/*	qmgr_transport_alloc() while delivery process allocation for
48/*	the same transport is in progress.
49/*
/*	qmgr_transport_throttle() blocks further allocation of delivery
51/*	processes for the named transport. Attempts to throttle a
52/*	throttled transport are ignored.
53/*
54/*	qmgr_transport_unthrottle() undoes qmgr_transport_throttle().
55/*	Attempts to unthrottle a non-throttled transport are ignored.
56/* DIAGNOSTICS
57/*	Panic: consistency check failure. Fatal: out of memory.
58/* LICENSE
59/* .ad
60/* .fi
61/*	The Secure Mailer license must be distributed with this software.
62/* AUTHOR(S)
63/*	Wietse Venema
64/*	IBM T.J. Watson Research
65/*	P.O. Box 704
66/*	Yorktown Heights, NY 10598, USA
67/*
68/*	Preemptive scheduler enhancements:
69/*	Patrik Rak
70/*	Modra 6
71/*	155 00, Prague, Czech Republic
72/*--*/
73
74/* System library. */
75
76#include <sys_defs.h>
77#include <unistd.h>
78
79#include <sys/time.h>			/* FD_SETSIZE */
80#include <sys/types.h>			/* FD_SETSIZE */
81#include <unistd.h>			/* FD_SETSIZE */
82
83#ifdef USE_SYS_SELECT_H
84#include <sys/select.h>			/* FD_SETSIZE */
85#endif
86
87/* Utility library. */
88
89#include <msg.h>
90#include <htable.h>
91#include <events.h>
92#include <mymalloc.h>
93#include <vstream.h>
94#include <iostuff.h>
95
96/* Global library. */
97
98#include <mail_proto.h>
99#include <recipient_list.h>
100#include <mail_conf.h>
101#include <mail_params.h>
102
103/* Application-specific. */
104
105#include "qmgr.h"
106
107HTABLE *qmgr_transport_byname;		/* transport by name */
108QMGR_TRANSPORT_LIST qmgr_transport_list;/* transports, round robin */
109
110 /*
111  * A local structure to remember a delivery process allocation request.
112  */
113typedef struct QMGR_TRANSPORT_ALLOC QMGR_TRANSPORT_ALLOC;
114
115struct QMGR_TRANSPORT_ALLOC {
116    QMGR_TRANSPORT *transport;		/* transport context */
117    VSTREAM *stream;			/* delivery service stream */
118    QMGR_TRANSPORT_ALLOC_NOTIFY notify;	/* application call-back routine */
119};
120
121 /*
122  * Connections to delivery agents are managed asynchronously. Each delivery
123  * agent connection goes through multiple wait states:
124  *
125  * - With Linux/Solaris and old queue manager implementations only, wait for
126  * the server to invoke accept().
127  *
128  * - Wait for the delivery agent's announcement that it is ready to receive a
129  * delivery request.
130  *
131  * - Wait for the delivery request completion status.
132  *
133  * Older queue manager implementations had only one pending delivery agent
134  * connection per transport. With low-latency destinations, the output rates
135  * were reduced on Linux/Solaris systems that had the extra wait state.
136  *
137  * To maximize delivery agent output rates with low-latency destinations, the
138  * following changes were made to the queue manager by the end of the 2.4
139  * development cycle:
140  *
141  * - The Linux/Solaris accept() wait state was eliminated.
142  *
143  * - A pipeline was implemented for pending delivery agent connections. The
144  * number of pending delivery agent connections was increased from one to
145  * two: the number of before-delivery wait states, plus one extra pipeline
146  * slot to prevent the pipeline from stalling easily. Increasing the
147  * pipeline much further actually hurt performance.
148  *
149  * - To reduce queue manager disk competition with delivery agents, the queue
150  * scanning algorithm was modified to import only one message per interrupt.
151  * The incoming and deferred queue scans now happen on alternate interrupts.
152  *
153  * Simplistically reasoned, a non-zero (incoming + active) queue length is
154  * equivalent to a time shift for mail deliveries; this is undesirable when
155  * delivery agents are not fully utilized.
156  *
157  * On the other hand a non-empty active queue is what allows us to do clever
158  * things such as queue file prefetch, concurrency windows, and connection
159  * caching; the idea is that such "thinking time" is affordable only after
160  * the output channels are maxed out.
161  */
#if !defined(QMGR_TRANSPORT_MAX_PEND)
#define QMGR_TRANSPORT_MAX_PEND	2	/* pending connections per transport */
#endif
165
166/* qmgr_transport_unthrottle_wrapper - in case (char *) != (struct *) */
167
168static void qmgr_transport_unthrottle_wrapper(int unused_event, char *context)
169{
170    qmgr_transport_unthrottle((QMGR_TRANSPORT *) context);
171}
172
173/* qmgr_transport_unthrottle - open the throttle */
174
175void    qmgr_transport_unthrottle(QMGR_TRANSPORT *transport)
176{
177    const char *myname = "qmgr_transport_unthrottle";
178
179    /*
180     * This routine runs after expiration of the timer set by
181     * qmgr_transport_throttle(), or whenever a delivery transport has been
182     * used without malfunction. In either case, we enable delivery again if
183     * the transport was blocked, otherwise the request is ignored.
184     */
185    if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0) {
186	if (msg_verbose)
187	    msg_info("%s: transport %s", myname, transport->name);
188	transport->flags &= ~QMGR_TRANSPORT_STAT_DEAD;
189	if (transport->dsn == 0)
190	    msg_panic("%s: transport %s: null reason",
191		      myname, transport->name);
192	dsn_free(transport->dsn);
193	transport->dsn = 0;
194	event_cancel_timer(qmgr_transport_unthrottle_wrapper,
195			   (char *) transport);
196    }
197}
198
199/* qmgr_transport_throttle - disable delivery process allocation */
200
201void    qmgr_transport_throttle(QMGR_TRANSPORT *transport, DSN *dsn)
202{
203    const char *myname = "qmgr_transport_throttle";
204
205    /*
206     * We are unable to connect to a deliver process for this type of message
207     * transport. Instead of hosing the system by retrying in a tight loop,
208     * back off and disable this transport type for a while.
209     */
210    if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) == 0) {
211	if (msg_verbose)
212	    msg_info("%s: transport %s: status: %s reason: %s",
213		     myname, transport->name, dsn->status, dsn->reason);
214	transport->flags |= QMGR_TRANSPORT_STAT_DEAD;
215	if (transport->dsn)
216	    msg_panic("%s: transport %s: spurious reason: %s",
217		      myname, transport->name, transport->dsn->reason);
218	transport->dsn = DSN_COPY(dsn);
219	event_request_timer(qmgr_transport_unthrottle_wrapper,
220			    (char *) transport, var_transport_retry_time);
221    }
222}
223
224/* qmgr_transport_abort - transport connect watchdog */
225
226static void qmgr_transport_abort(int unused_event, char *context)
227{
228    QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;
229
230    msg_fatal("timeout connecting to transport: %s", alloc->transport->name);
231}
232
233/* qmgr_transport_event - delivery process availability notice */
234
235static void qmgr_transport_event(int unused_event, char *context)
236{
237    QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;
238
239    /*
240     * This routine notifies the application when the request given to
241     * qmgr_transport_alloc() completes.
242     */
243    if (msg_verbose)
244	msg_info("transport_event: %s", alloc->transport->name);
245
246    /*
247     * Connection request completed. Stop the watchdog timer.
248     */
249    event_cancel_timer(qmgr_transport_abort, context);
250
251    /*
252     * Disable further read events that end up calling this function, and
253     * free up this pending connection pipeline slot.
254     */
255    if (alloc->stream) {
256	event_disable_readwrite(vstream_fileno(alloc->stream));
257	non_blocking(vstream_fileno(alloc->stream), BLOCKING);
258    }
259    alloc->transport->pending -= 1;
260
261    /*
262     * Notify the requestor.
263     */
264    alloc->notify(alloc->transport, alloc->stream);
265    myfree((char *) alloc);
266}
267
268/* qmgr_transport_select - select transport for allocation */
269
270QMGR_TRANSPORT *qmgr_transport_select(void)
271{
272    QMGR_TRANSPORT *xport;
273    QMGR_QUEUE *queue;
274    int     need;
275
276    /*
277     * If we find a suitable transport, rotate the list of transports to
278     * effectuate round-robin selection. See similar selection code in
279     * qmgr_peer_select().
280     *
281     * This function is called repeatedly until all transports have maxed out
282     * the number of pending delivery agent connections, until all delivery
283     * agent concurrency windows are maxed out, or until we run out of "todo"
284     * queue entries.
285     */
286#define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y))
287
288    for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) {
289	if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0
290	    || xport->pending >= QMGR_TRANSPORT_MAX_PEND)
291	    continue;
292	need = xport->pending + 1;
293	for (queue = xport->queue_list.next; queue; queue = queue->peers.next) {
294	    if (QMGR_QUEUE_READY(queue) == 0)
295		continue;
296	    if ((need -= MIN5af51743e4eef(queue->window - queue->busy_refcount,
297					  queue->todo_refcount)) <= 0) {
298		QMGR_LIST_ROTATE(qmgr_transport_list, xport, peers);
299		if (msg_verbose)
300		    msg_info("qmgr_transport_select: %s", xport->name);
301		return (xport);
302	    }
303	}
304    }
305    return (0);
306}
307
308/* qmgr_transport_alloc - allocate delivery process */
309
310void    qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOTIFY notify)
311{
312    QMGR_TRANSPORT_ALLOC *alloc;
313
314    /*
315     * Sanity checks.
316     */
317    if (transport->flags & QMGR_TRANSPORT_STAT_DEAD)
318	msg_panic("qmgr_transport: dead transport: %s", transport->name);
319    if (transport->pending >= QMGR_TRANSPORT_MAX_PEND)
320	msg_panic("qmgr_transport: excess allocation: %s", transport->name);
321
322    /*
323     * Connect to the well-known port for this delivery service, and wake up
324     * when a process announces its availability. Allow only a limited number
325     * of delivery process allocation attempts for this transport. In case of
326     * problems, back off. Do not hose the system when it is in trouble
327     * already.
328     *
329     * Use non-blocking connect(), so that Linux won't block the queue manager
330     * until the delivery agent calls accept().
331     *
332     * When the connection to delivery agent cannot be completed, notify the
333     * event handler so that it can throttle the transport and defer the todo
334     * queues, just like it does when communication fails *after* connection
335     * completion.
336     *
337     * Before Postfix 2.4, the event handler was not invoked after connect()
338     * error, and mail was not deferred. Because of this, mail would be stuck
339     * in the active queue after triggering a "connection refused" condition.
340     */
341    alloc = (QMGR_TRANSPORT_ALLOC *) mymalloc(sizeof(*alloc));
342    alloc->transport = transport;
343    alloc->notify = notify;
344    transport->pending += 1;
345    if ((alloc->stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name,
346				      NON_BLOCKING)) == 0) {
347	msg_warn("connect to transport %s/%s: %m",
348		 MAIL_CLASS_PRIVATE, transport->name);
349	event_request_timer(qmgr_transport_event, (char *) alloc, 0);
350	return;
351    }
352#if (EVENTS_STYLE != EVENTS_STYLE_SELECT) && defined(VSTREAM_CTL_DUPFD)
353#ifndef THRESHOLD_FD_WORKAROUND
354#define THRESHOLD_FD_WORKAROUND 128
355#endif
356    vstream_control(alloc->stream,
357		    VSTREAM_CTL_DUPFD, THRESHOLD_FD_WORKAROUND,
358		    VSTREAM_CTL_END);
359#endif
360    event_enable_read(vstream_fileno(alloc->stream), qmgr_transport_event,
361		      (char *) alloc);
362
363    /*
364     * Guard against broken systems.
365     */
366    event_request_timer(qmgr_transport_abort, (char *) alloc,
367			var_daemon_timeout);
368}
369
370/* qmgr_transport_create - create transport instance */
371
372QMGR_TRANSPORT *qmgr_transport_create(const char *name)
373{
374    QMGR_TRANSPORT *transport;
375
376    if (htable_find(qmgr_transport_byname, name) != 0)
377	msg_panic("qmgr_transport_create: transport exists: %s", name);
378    transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT));
379    transport->flags = 0;
380    transport->pending = 0;
381    transport->name = mystrdup(name);
382
383    /*
384     * Use global configuration settings or transport-specific settings.
385     */
386    transport->dest_concurrency_limit =
387	get_mail_conf_int2(name, _DEST_CON_LIMIT,
388			   var_dest_con_limit, 0, 0);
389    transport->recipient_limit =
390	get_mail_conf_int2(name, _DEST_RCPT_LIMIT,
391			   var_dest_rcpt_limit, 0, 0);
392    transport->init_dest_concurrency =
393	get_mail_conf_int2(name, _INIT_DEST_CON,
394			   var_init_dest_concurrency, 1, 0);
395    transport->rate_delay = get_mail_conf_time2(name, _DEST_RATE_DELAY,
396						var_dest_rate_delay,
397						's', 0, 0);
398
399    if (transport->rate_delay > 0)
400	transport->dest_concurrency_limit = 1;
401    if (transport->dest_concurrency_limit != 0
402    && transport->dest_concurrency_limit < transport->init_dest_concurrency)
403	transport->init_dest_concurrency = transport->dest_concurrency_limit;
404
405    transport->slot_cost = get_mail_conf_int2(name, _DELIVERY_SLOT_COST,
406					      var_delivery_slot_cost, 0, 0);
407    transport->slot_loan = get_mail_conf_int2(name, _DELIVERY_SLOT_LOAN,
408					      var_delivery_slot_loan, 0, 0);
409    transport->slot_loan_factor =
410	100 - get_mail_conf_int2(name, _DELIVERY_SLOT_DISCOUNT,
411				 var_delivery_slot_discount, 0, 100);
412    transport->min_slots = get_mail_conf_int2(name, _MIN_DELIVERY_SLOTS,
413					      var_min_delivery_slots, 0, 0);
414    transport->rcpt_unused = get_mail_conf_int2(name, _XPORT_RCPT_LIMIT,
415						var_xport_rcpt_limit, 0, 0);
416    transport->rcpt_per_stack = get_mail_conf_int2(name, _STACK_RCPT_LIMIT,
417						var_stack_rcpt_limit, 0, 0);
418    transport->refill_limit = get_mail_conf_int2(name, _XPORT_REFILL_LIMIT,
419					      var_xport_refill_limit, 1, 0);
420    transport->refill_delay = get_mail_conf_time2(name, _XPORT_REFILL_DELAY,
421					 var_xport_refill_delay, 's', 1, 0);
422
423    transport->queue_byname = htable_create(0);
424    QMGR_LIST_INIT(transport->queue_list);
425    transport->job_byname = htable_create(0);
426    QMGR_LIST_INIT(transport->job_list);
427    QMGR_LIST_INIT(transport->job_bytime);
428    transport->job_current = 0;
429    transport->job_next_unread = 0;
430    transport->candidate_cache = 0;
431    transport->candidate_cache_current = 0;
432    transport->candidate_cache_time = (time_t) 0;
433    transport->blocker_tag = 1;
434    transport->dsn = 0;
435    qmgr_feedback_init(&transport->pos_feedback, name, _CONC_POS_FDBACK,
436		       VAR_CONC_POS_FDBACK, var_conc_pos_feedback);
437    qmgr_feedback_init(&transport->neg_feedback, name, _CONC_NEG_FDBACK,
438		       VAR_CONC_NEG_FDBACK, var_conc_neg_feedback);
439    transport->fail_cohort_limit =
440	get_mail_conf_int2(name, _CONC_COHORT_LIM,
441			   var_conc_cohort_limit, 0, 0);
442    if (qmgr_transport_byname == 0)
443	qmgr_transport_byname = htable_create(10);
444    htable_enter(qmgr_transport_byname, name, (char *) transport);
445    QMGR_LIST_PREPEND(qmgr_transport_list, transport, peers);
446    if (msg_verbose)
447	msg_info("qmgr_transport_create: %s concurrency %d recipients %d",
448		 transport->name, transport->dest_concurrency_limit,
449		 transport->recipient_limit);
450    return (transport);
451}
452
453/* qmgr_transport_find - find transport instance */
454
455QMGR_TRANSPORT *qmgr_transport_find(const char *name)
456{
457    return ((QMGR_TRANSPORT *) htable_find(qmgr_transport_byname, name));
458}
459