1/*++
2/* NAME
3/*	qmgr_transport 3
4/* SUMMARY
5/*	per-transport data structures
6/* SYNOPSIS
7/*	#include "qmgr.h"
8/*
9/*	QMGR_TRANSPORT *qmgr_transport_create(name)
10/*	const char *name;
11/*
12/*	QMGR_TRANSPORT *qmgr_transport_find(name)
13/*	const char *name;
14/*
15/*	QMGR_TRANSPORT *qmgr_transport_select()
16/*
17/*	void	qmgr_transport_alloc(transport, notify)
18/*	QMGR_TRANSPORT *transport;
19/*	void	(*notify)(QMGR_TRANSPORT *transport, VSTREAM *fp);
20/*
21/*	void	qmgr_transport_throttle(transport, dsn)
22/*	QMGR_TRANSPORT *transport;
23/*	DSN	*dsn;
24/*
25/*	void	qmgr_transport_unthrottle(transport)
26/*	QMGR_TRANSPORT *transport;
27/* DESCRIPTION
28/*	This module organizes the world by message transport type.
29/*	Each transport can have zero or more destination queues
30/*	associated with it.
31/*
32/*	qmgr_transport_create() instantiates a data structure for the
33/*	named transport type.
34/*
35/*	qmgr_transport_find() looks up an existing message transport
36/*	data structure.
37/*
38/*	qmgr_transport_select() attempts to find a transport that
39/*	has messages pending delivery.  This routine implements
40/*	round-robin search among transports.
41/*
42/*	qmgr_transport_alloc() allocates a delivery process for the
43/*	specified transport type. Allocation is performed asynchronously.
44/*	When a process becomes available, the application callback routine
45/*	is invoked with as arguments the transport and a stream that
/*	is connected to a delivery process. It is an error to call
/*	qmgr_transport_alloc() for a throttled transport, or while the
/*	maximum number of pending delivery process allocations for
/*	the same transport is already in progress.
49/*
/*	qmgr_transport_throttle() blocks further allocation of delivery
51/*	processes for the named transport. Attempts to throttle a
52/*	throttled transport are ignored.
53/*
54/*	qmgr_transport_unthrottle() undoes qmgr_transport_throttle().
55/*	Attempts to unthrottle a non-throttled transport are ignored.
56/* DIAGNOSTICS
57/*	Panic: consistency check failure. Fatal: out of memory.
58/* LICENSE
59/* .ad
60/* .fi
61/*	The Secure Mailer license must be distributed with this software.
62/* AUTHOR(S)
63/*	Wietse Venema
64/*	IBM T.J. Watson Research
65/*	P.O. Box 704
66/*	Yorktown Heights, NY 10598, USA
67/*--*/
68
69/* System library. */
70
71#include <sys_defs.h>
72#include <unistd.h>
73
74#include <sys/time.h>			/* FD_SETSIZE */
75#include <sys/types.h>			/* FD_SETSIZE */
76#include <unistd.h>			/* FD_SETSIZE */
77
78#ifdef USE_SYS_SELECT_H
79#include <sys/select.h>			/* FD_SETSIZE */
80#endif
81
82/* Utility library. */
83
84#include <msg.h>
85#include <htable.h>
86#include <events.h>
87#include <mymalloc.h>
88#include <vstream.h>
89#include <iostuff.h>
90
91/* Global library. */
92
93#include <mail_proto.h>
94#include <recipient_list.h>
95#include <mail_conf.h>
96#include <mail_params.h>
97
98/* Application-specific. */
99
100#include "qmgr.h"
101
102HTABLE *qmgr_transport_byname;		/* transport by name */
103QMGR_TRANSPORT_LIST qmgr_transport_list;/* transports, round robin */
104
105 /*
106  * A local structure to remember a delivery process allocation request.
107  */
108typedef struct QMGR_TRANSPORT_ALLOC QMGR_TRANSPORT_ALLOC;
109
110struct QMGR_TRANSPORT_ALLOC {
111    QMGR_TRANSPORT *transport;		/* transport context */
112    VSTREAM *stream;			/* delivery service stream */
113    QMGR_TRANSPORT_ALLOC_NOTIFY notify;	/* application call-back routine */
114};
115
116 /*
117  * Connections to delivery agents are managed asynchronously. Each delivery
118  * agent connection goes through multiple wait states:
119  *
120  * - With Linux/Solaris and old queue manager implementations only, wait for
121  * the server to invoke accept().
122  *
123  * - Wait for the delivery agent's announcement that it is ready to receive a
124  * delivery request.
125  *
126  * - Wait for the delivery request completion status.
127  *
128  * Older queue manager implementations had only one pending delivery agent
129  * connection per transport. With low-latency destinations, the output rates
130  * were reduced on Linux/Solaris systems that had the extra wait state.
131  *
132  * To maximize delivery agent output rates with low-latency destinations, the
133  * following changes were made to the queue manager by the end of the 2.4
134  * development cycle:
135  *
136  * - The Linux/Solaris accept() wait state was eliminated.
137  *
138  * - A pipeline was implemented for pending delivery agent connections. The
139  * number of pending delivery agent connections was increased from one to
140  * two: the number of before-delivery wait states, plus one extra pipeline
141  * slot to prevent the pipeline from stalling easily. Increasing the
142  * pipeline much further actually hurt performance.
143  *
144  * - To reduce queue manager disk competition with delivery agents, the queue
145  * scanning algorithm was modified to import only one message per interrupt.
146  * The incoming and deferred queue scans now happen on alternate interrupts.
147  *
148  * Simplistically reasoned, a non-zero (incoming + active) queue length is
149  * equivalent to a time shift for mail deliveries; this is undesirable when
150  * delivery agents are not fully utilized.
151  *
152  * On the other hand a non-empty active queue is what allows us to do clever
153  * things such as queue file prefetch, concurrency windows, and connection
154  * caching; the idea is that such "thinking time" is affordable only after
155  * the output channels are maxed out.
156  */
157#ifndef QMGR_TRANSPORT_MAX_PEND
158#define QMGR_TRANSPORT_MAX_PEND	2
159#endif
160
161/* qmgr_transport_unthrottle_wrapper - in case (char *) != (struct *) */
162
163static void qmgr_transport_unthrottle_wrapper(int unused_event, char *context)
164{
165    qmgr_transport_unthrottle((QMGR_TRANSPORT *) context);
166}
167
168/* qmgr_transport_unthrottle - open the throttle */
169
170void    qmgr_transport_unthrottle(QMGR_TRANSPORT *transport)
171{
172    const char *myname = "qmgr_transport_unthrottle";
173
174    /*
175     * This routine runs after expiration of the timer set by
176     * qmgr_transport_throttle(), or whenever a delivery transport has been
177     * used without malfunction. In either case, we enable delivery again if
178     * the transport was blocked, otherwise the request is ignored.
179     */
180    if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0) {
181	if (msg_verbose)
182	    msg_info("%s: transport %s", myname, transport->name);
183	transport->flags &= ~QMGR_TRANSPORT_STAT_DEAD;
184	if (transport->dsn == 0)
185	    msg_panic("%s: transport %s: null reason",
186		      myname, transport->name);
187	dsn_free(transport->dsn);
188	transport->dsn = 0;
189	event_cancel_timer(qmgr_transport_unthrottle_wrapper,
190			   (char *) transport);
191    }
192}
193
194/* qmgr_transport_throttle - disable delivery process allocation */
195
196void    qmgr_transport_throttle(QMGR_TRANSPORT *transport, DSN *dsn)
197{
198    const char *myname = "qmgr_transport_throttle";
199
200    /*
201     * We are unable to connect to a deliver process for this type of message
202     * transport. Instead of hosing the system by retrying in a tight loop,
203     * back off and disable this transport type for a while.
204     */
205    if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) == 0) {
206	if (msg_verbose)
207	    msg_info("%s: transport %s: status: %s reason: %s",
208		     myname, transport->name, dsn->status, dsn->reason);
209	transport->flags |= QMGR_TRANSPORT_STAT_DEAD;
210	if (transport->dsn)
211	    msg_panic("%s: transport %s: spurious reason: %s",
212		      myname, transport->name, transport->dsn->reason);
213	transport->dsn = DSN_COPY(dsn);
214	event_request_timer(qmgr_transport_unthrottle_wrapper,
215			    (char *) transport, var_transport_retry_time);
216    }
217}
218
219/* qmgr_transport_abort - transport connect watchdog */
220
221static void qmgr_transport_abort(int unused_event, char *context)
222{
223    QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;
224
225    msg_fatal("timeout connecting to transport: %s", alloc->transport->name);
226}
227
228/* qmgr_transport_event - delivery process availability notice */
229
230static void qmgr_transport_event(int unused_event, char *context)
231{
232    QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;
233
234    /*
235     * This routine notifies the application when the request given to
236     * qmgr_transport_alloc() completes.
237     */
238    if (msg_verbose)
239	msg_info("transport_event: %s", alloc->transport->name);
240
241    /*
242     * Connection request completed. Stop the watchdog timer.
243     */
244    event_cancel_timer(qmgr_transport_abort, context);
245
246    /*
247     * Disable further read events that end up calling this function, and
248     * free up this pending connection pipeline slot.
249     */
250    if (alloc->stream) {
251	event_disable_readwrite(vstream_fileno(alloc->stream));
252	non_blocking(vstream_fileno(alloc->stream), BLOCKING);
253    }
254    alloc->transport->pending -= 1;
255
256    /*
257     * Notify the requestor.
258     */
259    alloc->notify(alloc->transport, alloc->stream);
260    myfree((char *) alloc);
261}
262
263/* qmgr_transport_select - select transport for allocation */
264
265QMGR_TRANSPORT *qmgr_transport_select(void)
266{
267    QMGR_TRANSPORT *xport;
268    QMGR_QUEUE *queue;
269    int     need;
270
271    /*
272     * If we find a suitable transport, rotate the list of transports to
273     * effectuate round-robin selection. See similar selection code in
274     * qmgr_queue_select().
275     *
276     * This function is called repeatedly until all transports have maxed out
277     * the number of pending delivery agent connections, until all delivery
278     * agent concurrency windows are maxed out, or until we run out of "todo"
279     * queue entries.
280     */
281#define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y))
282
283    for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) {
284	if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0
285	    || xport->pending >= QMGR_TRANSPORT_MAX_PEND)
286	    continue;
287	need = xport->pending + 1;
288	for (queue = xport->queue_list.next; queue; queue = queue->peers.next) {
289	    if (QMGR_QUEUE_READY(queue) == 0)
290		continue;
291	    if ((need -= MIN5af51743e4eef(queue->window - queue->busy_refcount,
292					  queue->todo_refcount)) <= 0) {
293		QMGR_LIST_ROTATE(qmgr_transport_list, xport);
294		if (msg_verbose)
295		    msg_info("qmgr_transport_select: %s", xport->name);
296		return (xport);
297	    }
298	}
299    }
300    return (0);
301}
302
303/* qmgr_transport_alloc - allocate delivery process */
304
305void    qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOTIFY notify)
306{
307    QMGR_TRANSPORT_ALLOC *alloc;
308
309    /*
310     * Sanity checks.
311     */
312    if (transport->flags & QMGR_TRANSPORT_STAT_DEAD)
313	msg_panic("qmgr_transport: dead transport: %s", transport->name);
314    if (transport->pending >= QMGR_TRANSPORT_MAX_PEND)
315	msg_panic("qmgr_transport: excess allocation: %s", transport->name);
316
317    /*
318     * Connect to the well-known port for this delivery service, and wake up
319     * when a process announces its availability. Allow only a limited number
320     * of delivery process allocation attempts for this transport. In case of
321     * problems, back off. Do not hose the system when it is in trouble
322     * already.
323     *
324     * Use non-blocking connect(), so that Linux won't block the queue manager
325     * until the delivery agent calls accept().
326     *
327     * When the connection to delivery agent cannot be completed, notify the
328     * event handler so that it can throttle the transport and defer the todo
329     * queues, just like it does when communication fails *after* connection
330     * completion.
331     *
332     * Before Postfix 2.4, the event handler was not invoked after connect()
333     * error, and mail was not deferred. Because of this, mail would be stuck
334     * in the active queue after triggering a "connection refused" condition.
335     */
336    alloc = (QMGR_TRANSPORT_ALLOC *) mymalloc(sizeof(*alloc));
337    alloc->transport = transport;
338    alloc->notify = notify;
339    transport->pending += 1;
340    if ((alloc->stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name,
341				      NON_BLOCKING)) == 0) {
342	msg_warn("connect to transport %s/%s: %m",
343		 MAIL_CLASS_PRIVATE, transport->name);
344	event_request_timer(qmgr_transport_event, (char *) alloc, 0);
345	return;
346    }
347#if (EVENTS_STYLE != EVENTS_STYLE_SELECT) && defined(VSTREAM_CTL_DUPFD)
348#ifndef THRESHOLD_FD_WORKAROUND
349#define THRESHOLD_FD_WORKAROUND 128
350#endif
351    vstream_control(alloc->stream,
352		    VSTREAM_CTL_DUPFD, THRESHOLD_FD_WORKAROUND,
353		    VSTREAM_CTL_END);
354#endif
355    event_enable_read(vstream_fileno(alloc->stream), qmgr_transport_event,
356		      (char *) alloc);
357
358    /*
359     * Guard against broken systems.
360     */
361    event_request_timer(qmgr_transport_abort, (char *) alloc,
362			var_daemon_timeout);
363}
364
365/* qmgr_transport_create - create transport instance */
366
367QMGR_TRANSPORT *qmgr_transport_create(const char *name)
368{
369    QMGR_TRANSPORT *transport;
370
371    if (htable_find(qmgr_transport_byname, name) != 0)
372	msg_panic("qmgr_transport_create: transport exists: %s", name);
373    transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT));
374    transport->flags = 0;
375    transport->pending = 0;
376    transport->name = mystrdup(name);
377
378    /*
379     * Use global configuration settings or transport-specific settings.
380     */
381    transport->dest_concurrency_limit =
382	get_mail_conf_int2(name, _DEST_CON_LIMIT,
383			   var_dest_con_limit, 0, 0);
384    transport->recipient_limit =
385	get_mail_conf_int2(name, _DEST_RCPT_LIMIT,
386			   var_dest_rcpt_limit, 0, 0);
387    transport->init_dest_concurrency =
388	get_mail_conf_int2(name, _INIT_DEST_CON,
389			   var_init_dest_concurrency, 1, 0);
390    transport->rate_delay = get_mail_conf_time2(name, _DEST_RATE_DELAY,
391						var_dest_rate_delay,
392						's', 0, 0);
393
394    if (transport->rate_delay > 0)
395	transport->dest_concurrency_limit = 1;
396    if (transport->dest_concurrency_limit != 0
397    && transport->dest_concurrency_limit < transport->init_dest_concurrency)
398	transport->init_dest_concurrency = transport->dest_concurrency_limit;
399
400    transport->queue_byname = htable_create(0);
401    QMGR_LIST_INIT(transport->queue_list);
402    transport->dsn = 0;
403    qmgr_feedback_init(&transport->pos_feedback, name, _CONC_POS_FDBACK,
404		       VAR_CONC_POS_FDBACK, var_conc_pos_feedback);
405    qmgr_feedback_init(&transport->neg_feedback, name, _CONC_NEG_FDBACK,
406		       VAR_CONC_NEG_FDBACK, var_conc_neg_feedback);
407    transport->fail_cohort_limit =
408	get_mail_conf_int2(name, _CONC_COHORT_LIM,
409			   var_conc_cohort_limit, 0, 0);
410    if (qmgr_transport_byname == 0)
411	qmgr_transport_byname = htable_create(10);
412    htable_enter(qmgr_transport_byname, name, (char *) transport);
413    QMGR_LIST_APPEND(qmgr_transport_list, transport);
414    if (msg_verbose)
415	msg_info("qmgr_transport_create: %s concurrency %d recipients %d",
416		 transport->name, transport->dest_concurrency_limit,
417		 transport->recipient_limit);
418    return (transport);
419}
420
421/* qmgr_transport_find - find transport instance */
422
423QMGR_TRANSPORT *qmgr_transport_find(const char *name)
424{
425    return ((QMGR_TRANSPORT *) htable_find(qmgr_transport_byname, name));
426}
427