qmgr_transport.c revision 1.1.1.1
1/*	$NetBSD: qmgr_transport.c,v 1.1.1.1 2009/06/23 10:08:52 tron Exp $	*/
2
3/*++
4/* NAME
5/*	qmgr_transport 3
6/* SUMMARY
7/*	per-transport data structures
8/* SYNOPSIS
9/*	#include "qmgr.h"
10/*
11/*	QMGR_TRANSPORT *qmgr_transport_create(name)
12/*	const char *name;
13/*
14/*	QMGR_TRANSPORT *qmgr_transport_find(name)
15/*	const char *name;
16/*
17/*	QMGR_TRANSPORT *qmgr_transport_select()
18/*
19/*	void	qmgr_transport_alloc(transport, notify)
20/*	QMGR_TRANSPORT *transport;
21/*	void	(*notify)(QMGR_TRANSPORT *transport, VSTREAM *fp);
22/*
23/*	void	qmgr_transport_throttle(transport, dsn)
24/*	QMGR_TRANSPORT *transport;
25/*	DSN	*dsn;
26/*
27/*	void	qmgr_transport_unthrottle(transport)
28/*	QMGR_TRANSPORT *transport;
29/* DESCRIPTION
30/*	This module organizes the world by message transport type.
31/*	Each transport can have zero or more destination queues
32/*	associated with it.
33/*
34/*	qmgr_transport_create() instantiates a data structure for the
35/*	named transport type.
36/*
37/*	qmgr_transport_find() looks up an existing message transport
38/*	data structure.
39/*
40/*	qmgr_transport_select() attempts to find a transport that
41/*	has messages pending delivery.  This routine implements
42/*	round-robin search among transports.
43/*
44/*	qmgr_transport_alloc() allocates a delivery process for the
45/*	specified transport type. Allocation is performed asynchronously.
46/*	When a process becomes available, the application callback routine
47/*	is invoked with as arguments the transport and a stream that
48/*	is connected to a delivery process. It is an error to call
49/*	qmgr_transport_alloc() while delivery process allocation for
50/*	the same transport is in progress.
51/*
52/*	qmgr_transport_throttle() blocks further allocation of delivery
53/*	processes for the named transport. Attempts to throttle a
54/*	throttled transport are ignored.
55/*
56/*	qmgr_transport_unthrottle() undoes qmgr_transport_throttle().
57/*	Attempts to unthrottle a non-throttled transport are ignored.
58/* DIAGNOSTICS
59/*	Panic: consistency check failure. Fatal: out of memory.
60/* LICENSE
61/* .ad
62/* .fi
63/*	The Secure Mailer license must be distributed with this software.
64/* AUTHOR(S)
65/*	Wietse Venema
66/*	IBM T.J. Watson Research
67/*	P.O. Box 704
68/*	Yorktown Heights, NY 10598, USA
69/*
70/*	Preemptive scheduler enhancements:
71/*	Patrik Rak
72/*	Modra 6
73/*	155 00, Prague, Czech Republic
74/*--*/
75
76/* System library. */
77
78#include <sys_defs.h>
79#include <unistd.h>
80
81#include <sys/time.h>			/* FD_SETSIZE */
82#include <sys/types.h>			/* FD_SETSIZE */
83#include <unistd.h>			/* FD_SETSIZE */
84
85#ifdef USE_SYS_SELECT_H
86#include <sys/select.h>			/* FD_SETSIZE */
87#endif
88
89/* Utility library. */
90
91#include <msg.h>
92#include <htable.h>
93#include <events.h>
94#include <mymalloc.h>
95#include <vstream.h>
96#include <iostuff.h>
97
98/* Global library. */
99
100#include <mail_proto.h>
101#include <recipient_list.h>
102#include <mail_conf.h>
103#include <mail_params.h>
104
105/* Application-specific. */
106
107#include "qmgr.h"
108
109HTABLE *qmgr_transport_byname;		/* transport by name */
110QMGR_TRANSPORT_LIST qmgr_transport_list;/* transports, round robin */
111
112 /*
113  * A local structure to remember a delivery process allocation request.
114  */
115typedef struct QMGR_TRANSPORT_ALLOC QMGR_TRANSPORT_ALLOC;
116
117struct QMGR_TRANSPORT_ALLOC {
118    QMGR_TRANSPORT *transport;		/* transport context */
119    VSTREAM *stream;			/* delivery service stream */
120    QMGR_TRANSPORT_ALLOC_NOTIFY notify;	/* application call-back routine */
121};
122
123 /*
124  * Connections to delivery agents are managed asynchronously. Each delivery
125  * agent connection goes through multiple wait states:
126  *
127  * - With Linux/Solaris and old queue manager implementations only, wait for
128  * the server to invoke accept().
129  *
130  * - Wait for the delivery agent's announcement that it is ready to receive a
131  * delivery request.
132  *
133  * - Wait for the delivery request completion status.
134  *
135  * Older queue manager implementations had only one pending delivery agent
136  * connection per transport. With low-latency destinations, the output rates
137  * were reduced on Linux/Solaris systems that had the extra wait state.
138  *
139  * To maximize delivery agent output rates with low-latency destinations, the
140  * following changes were made to the queue manager by the end of the 2.4
141  * development cycle:
142  *
143  * - The Linux/Solaris accept() wait state was eliminated.
144  *
145  * - A pipeline was implemented for pending delivery agent connections. The
146  * number of pending delivery agent connections was increased from one to
147  * two: the number of before-delivery wait states, plus one extra pipeline
148  * slot to prevent the pipeline from stalling easily. Increasing the
149  * pipeline much further actually hurt performance.
150  *
151  * - To reduce queue manager disk competition with delivery agents, the queue
152  * scanning algorithm was modified to import only one message per interrupt.
153  * The incoming and deferred queue scans now happen on alternate interrupts.
154  *
155  * Simplistically reasoned, a non-zero (incoming + active) queue length is
156  * equivalent to a time shift for mail deliveries; this is undesirable when
157  * delivery agents are not fully utilized.
158  *
159  * On the other hand a non-empty active queue is what allows us to do clever
160  * things such as queue file prefetch, concurrency windows, and connection
161  * caching; the idea is that such "thinking time" is affordable only after
162  * the output channels are maxed out.
163  */
/* Upper bound on pending delivery agent connections per transport (overridable at build time). */
#if !defined(QMGR_TRANSPORT_MAX_PEND)
#define QMGR_TRANSPORT_MAX_PEND	2
#endif
167
168/* qmgr_transport_unthrottle_wrapper - in case (char *) != (struct *) */
169
170static void qmgr_transport_unthrottle_wrapper(int unused_event, char *context)
171{
172    qmgr_transport_unthrottle((QMGR_TRANSPORT *) context);
173}
174
175/* qmgr_transport_unthrottle - open the throttle */
176
177void    qmgr_transport_unthrottle(QMGR_TRANSPORT *transport)
178{
179    const char *myname = "qmgr_transport_unthrottle";
180
181    /*
182     * This routine runs after expiration of the timer set by
183     * qmgr_transport_throttle(), or whenever a delivery transport has been
184     * used without malfunction. In either case, we enable delivery again if
185     * the transport was blocked, otherwise the request is ignored.
186     */
187    if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0) {
188	if (msg_verbose)
189	    msg_info("%s: transport %s", myname, transport->name);
190	transport->flags &= ~QMGR_TRANSPORT_STAT_DEAD;
191	if (transport->dsn == 0)
192	    msg_panic("%s: transport %s: null reason",
193		      myname, transport->name);
194	dsn_free(transport->dsn);
195	transport->dsn = 0;
196	event_cancel_timer(qmgr_transport_unthrottle_wrapper,
197			   (char *) transport);
198    }
199}
200
201/* qmgr_transport_throttle - disable delivery process allocation */
202
203void    qmgr_transport_throttle(QMGR_TRANSPORT *transport, DSN *dsn)
204{
205    const char *myname = "qmgr_transport_throttle";
206
207    /*
208     * We are unable to connect to a deliver process for this type of message
209     * transport. Instead of hosing the system by retrying in a tight loop,
210     * back off and disable this transport type for a while.
211     */
212    if ((transport->flags & QMGR_TRANSPORT_STAT_DEAD) == 0) {
213	if (msg_verbose)
214	    msg_info("%s: transport %s: status: %s reason: %s",
215		     myname, transport->name, dsn->status, dsn->reason);
216	transport->flags |= QMGR_TRANSPORT_STAT_DEAD;
217	if (transport->dsn)
218	    msg_panic("%s: transport %s: spurious reason: %s",
219		      myname, transport->name, transport->dsn->reason);
220	transport->dsn = DSN_COPY(dsn);
221	event_request_timer(qmgr_transport_unthrottle_wrapper,
222			    (char *) transport, var_transport_retry_time);
223    }
224}
225
226/* qmgr_transport_abort - transport connect watchdog */
227
228static void qmgr_transport_abort(int unused_event, char *context)
229{
230    QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;
231
232    msg_fatal("timeout connecting to transport: %s", alloc->transport->name);
233}
234
235/* qmgr_transport_event - delivery process availability notice */
236
237static void qmgr_transport_event(int unused_event, char *context)
238{
239    QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;
240
241    /*
242     * This routine notifies the application when the request given to
243     * qmgr_transport_alloc() completes.
244     */
245    if (msg_verbose)
246	msg_info("transport_event: %s", alloc->transport->name);
247
248    /*
249     * Connection request completed. Stop the watchdog timer.
250     */
251    event_cancel_timer(qmgr_transport_abort, context);
252
253    /*
254     * Disable further read events that end up calling this function, and
255     * free up this pending connection pipeline slot.
256     */
257    if (alloc->stream) {
258	event_disable_readwrite(vstream_fileno(alloc->stream));
259	non_blocking(vstream_fileno(alloc->stream), BLOCKING);
260    }
261    alloc->transport->pending -= 1;
262
263    /*
264     * Notify the requestor.
265     */
266    alloc->notify(alloc->transport, alloc->stream);
267    myfree((char *) alloc);
268}
269
270/* qmgr_transport_select - select transport for allocation */
271
272QMGR_TRANSPORT *qmgr_transport_select(void)
273{
274    QMGR_TRANSPORT *xport;
275    QMGR_QUEUE *queue;
276    int     need;
277
278    /*
279     * If we find a suitable transport, rotate the list of transports to
280     * effectuate round-robin selection. See similar selection code in
281     * qmgr_peer_select().
282     *
283     * This function is called repeatedly until all transports have maxed out
284     * the number of pending delivery agent connections, until all delivery
285     * agent concurrency windows are maxed out, or until we run out of "todo"
286     * queue entries.
287     */
288#define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y))
289
290    for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) {
291	if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0
292	    || xport->pending >= QMGR_TRANSPORT_MAX_PEND)
293	    continue;
294	need = xport->pending + 1;
295	for (queue = xport->queue_list.next; queue; queue = queue->peers.next) {
296	    if (QMGR_QUEUE_READY(queue) == 0)
297		continue;
298	    if ((need -= MIN5af51743e4eef(queue->window - queue->busy_refcount,
299					  queue->todo_refcount)) <= 0) {
300		QMGR_LIST_ROTATE(qmgr_transport_list, xport, peers);
301		if (msg_verbose)
302		    msg_info("qmgr_transport_select: %s", xport->name);
303		return (xport);
304	    }
305	}
306    }
307    return (0);
308}
309
310/* qmgr_transport_alloc - allocate delivery process */
311
312void    qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOTIFY notify)
313{
314    QMGR_TRANSPORT_ALLOC *alloc;
315
316    /*
317     * Sanity checks.
318     */
319    if (transport->flags & QMGR_TRANSPORT_STAT_DEAD)
320	msg_panic("qmgr_transport: dead transport: %s", transport->name);
321    if (transport->pending >= QMGR_TRANSPORT_MAX_PEND)
322	msg_panic("qmgr_transport: excess allocation: %s", transport->name);
323
324    /*
325     * Connect to the well-known port for this delivery service, and wake up
326     * when a process announces its availability. Allow only a limited number
327     * of delivery process allocation attempts for this transport. In case of
328     * problems, back off. Do not hose the system when it is in trouble
329     * already.
330     *
331     * Use non-blocking connect(), so that Linux won't block the queue manager
332     * until the delivery agent calls accept().
333     *
334     * When the connection to delivery agent cannot be completed, notify the
335     * event handler so that it can throttle the transport and defer the todo
336     * queues, just like it does when communication fails *after* connection
337     * completion.
338     *
339     * Before Postfix 2.4, the event handler was not invoked after connect()
340     * error, and mail was not deferred. Because of this, mail would be stuck
341     * in the active queue after triggering a "connection refused" condition.
342     */
343    alloc = (QMGR_TRANSPORT_ALLOC *) mymalloc(sizeof(*alloc));
344    alloc->transport = transport;
345    alloc->notify = notify;
346    transport->pending += 1;
347    if ((alloc->stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name,
348				      NON_BLOCKING)) == 0) {
349	msg_warn("connect to transport %s/%s: %m",
350		 MAIL_CLASS_PRIVATE, transport->name);
351	event_request_timer(qmgr_transport_event, (char *) alloc, 0);
352	return;
353    }
354#if (EVENTS_STYLE != EVENTS_STYLE_SELECT) && defined(VSTREAM_CTL_DUPFD)
355#ifndef THRESHOLD_FD_WORKAROUND
356#define THRESHOLD_FD_WORKAROUND 128
357#endif
358    vstream_control(alloc->stream,
359		    VSTREAM_CTL_DUPFD, THRESHOLD_FD_WORKAROUND,
360		    VSTREAM_CTL_END);
361#endif
362    event_enable_read(vstream_fileno(alloc->stream), qmgr_transport_event,
363		      (char *) alloc);
364
365    /*
366     * Guard against broken systems.
367     */
368    event_request_timer(qmgr_transport_abort, (char *) alloc,
369			var_daemon_timeout);
370}
371
372/* qmgr_transport_create - create transport instance */
373
374QMGR_TRANSPORT *qmgr_transport_create(const char *name)
375{
376    QMGR_TRANSPORT *transport;
377
378    if (htable_find(qmgr_transport_byname, name) != 0)
379	msg_panic("qmgr_transport_create: transport exists: %s", name);
380    transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT));
381    transport->flags = 0;
382    transport->pending = 0;
383    transport->name = mystrdup(name);
384
385    /*
386     * Use global configuration settings or transport-specific settings.
387     */
388    transport->dest_concurrency_limit =
389	get_mail_conf_int2(name, _DEST_CON_LIMIT,
390			   var_dest_con_limit, 0, 0);
391    transport->recipient_limit =
392	get_mail_conf_int2(name, _DEST_RCPT_LIMIT,
393			   var_dest_rcpt_limit, 0, 0);
394    transport->init_dest_concurrency =
395	get_mail_conf_int2(name, _INIT_DEST_CON,
396			   var_init_dest_concurrency, 1, 0);
397    transport->rate_delay = get_mail_conf_time2(name, _DEST_RATE_DELAY,
398						var_dest_rate_delay,
399						's', 0, 0);
400
401    if (transport->rate_delay > 0)
402	transport->dest_concurrency_limit = 1;
403    if (transport->dest_concurrency_limit != 0
404    && transport->dest_concurrency_limit < transport->init_dest_concurrency)
405	transport->init_dest_concurrency = transport->dest_concurrency_limit;
406
407    transport->slot_cost = get_mail_conf_int2(name, _DELIVERY_SLOT_COST,
408					      var_delivery_slot_cost, 0, 0);
409    transport->slot_loan = get_mail_conf_int2(name, _DELIVERY_SLOT_LOAN,
410					      var_delivery_slot_loan, 0, 0);
411    transport->slot_loan_factor =
412	100 - get_mail_conf_int2(name, _DELIVERY_SLOT_DISCOUNT,
413				 var_delivery_slot_discount, 0, 100);
414    transport->min_slots = get_mail_conf_int2(name, _MIN_DELIVERY_SLOTS,
415					      var_min_delivery_slots, 0, 0);
416    transport->rcpt_unused = get_mail_conf_int2(name, _XPORT_RCPT_LIMIT,
417						var_xport_rcpt_limit, 0, 0);
418    transport->rcpt_per_stack = get_mail_conf_int2(name, _STACK_RCPT_LIMIT,
419						var_stack_rcpt_limit, 0, 0);
420    transport->refill_limit = get_mail_conf_int2(name, _XPORT_REFILL_LIMIT,
421					      var_xport_refill_limit, 1, 0);
422    transport->refill_delay = get_mail_conf_time2(name, _XPORT_REFILL_DELAY,
423					 var_xport_refill_delay, 's', 1, 0);
424
425    transport->queue_byname = htable_create(0);
426    QMGR_LIST_INIT(transport->queue_list);
427    transport->job_byname = htable_create(0);
428    QMGR_LIST_INIT(transport->job_list);
429    QMGR_LIST_INIT(transport->job_bytime);
430    transport->job_current = 0;
431    transport->job_next_unread = 0;
432    transport->candidate_cache = 0;
433    transport->candidate_cache_current = 0;
434    transport->candidate_cache_time = (time_t) 0;
435    transport->blocker_tag = 1;
436    transport->dsn = 0;
437    qmgr_feedback_init(&transport->pos_feedback, name, _CONC_POS_FDBACK,
438		       VAR_CONC_POS_FDBACK, var_conc_pos_feedback);
439    qmgr_feedback_init(&transport->neg_feedback, name, _CONC_NEG_FDBACK,
440		       VAR_CONC_NEG_FDBACK, var_conc_neg_feedback);
441    transport->fail_cohort_limit =
442	get_mail_conf_int2(name, _CONC_COHORT_LIM,
443			   var_conc_cohort_limit, 0, 0);
444    if (qmgr_transport_byname == 0)
445	qmgr_transport_byname = htable_create(10);
446    htable_enter(qmgr_transport_byname, name, (char *) transport);
447    QMGR_LIST_PREPEND(qmgr_transport_list, transport, peers);
448    if (msg_verbose)
449	msg_info("qmgr_transport_create: %s concurrency %d recipients %d",
450		 transport->name, transport->dest_concurrency_limit,
451		 transport->recipient_limit);
452    return (transport);
453}
454
455/* qmgr_transport_find - find transport instance */
456
457QMGR_TRANSPORT *qmgr_transport_find(const char *name)
458{
459    return ((QMGR_TRANSPORT *) htable_find(qmgr_transport_byname, name));
460}
461