bounce.c revision 1.54
1/*	$OpenBSD: bounce.c,v 1.54 2013/01/26 09:37:23 gilles Exp $	*/
2
3/*
4 * Copyright (c) 2009 Gilles Chehade <gilles@poolp.org>
5 * Copyright (c) 2009 Jacek Masiulaniec <jacekm@dobremiasto.net>
6 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
7 *
8 * Permission to use, copy, modify, and distribute this software for any
9 * purpose with or without fee is hereby granted, provided that the above
10 * copyright notice and this permission notice appear in all copies.
11 *
12 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
13 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
14 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
15 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
16 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
17 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
18 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
19 */
20
21#include <sys/types.h>
22#include <sys/queue.h>
23#include <sys/tree.h>
24#include <sys/param.h>
25#include <sys/socket.h>
26
27#include <err.h>
28#include <errno.h>
29#include <event.h>
30#include <imsg.h>
31#include <inttypes.h>
32#include <pwd.h>
33#include <signal.h>
34#include <stdio.h>
35#include <stdlib.h>
36#include <string.h>
37#include <time.h>
38#include <unistd.h>
39
40#include "smtpd.h"
41#include "log.h"
42
43#define BOUNCE_MAXRUN	2
44#define BOUNCE_HIWAT	65535
45
/*
 * Session states.  A state names the step that bounce_next() performs the
 * next time it is called for the session.
 */
enum {
	BOUNCE_EHLO,		/* send EHLO */
	BOUNCE_MAIL,		/* pick a pending message, send MAIL FROM */
	BOUNCE_RCPT,		/* send RCPT TO */
	BOUNCE_DATA,		/* send DATA */
	BOUNCE_DATA_NOTICE,	/* queue the notice headers and text */
	BOUNCE_DATA_MESSAGE,	/* queue the copy of the original message */
	BOUNCE_DATA_END,	/* final "." sent; move on to next message */
	BOUNCE_QUIT,		/* send QUIT */
	BOUNCE_CLOSE,		/* QUIT sent; session is freed on next reply */
};
57
/* One failed envelope reported within a bounce message. */
struct bounce_envelope {
	/* linkage in bounce_message.envelopes */
	TAILQ_ENTRY(bounce_envelope)	 entry;
	/* id of the bounce envelope, as passed to bounce_add() */
	uint64_t			 id;
	/* heap-allocated "user@domain: error\n" line for the notice */
	char				*report;
};
63
64struct bounce_message {
65	SPLAY_ENTRY(bounce_message)	 sp_entry;
66	TAILQ_ENTRY(bounce_message)	 entry;
67	uint32_t			 msgid;
68	struct delivery_bounce		 bounce;
69	char				*to;
70	time_t				 timeout;
71	TAILQ_HEAD(, bounce_envelope)	 envelopes;
72};
73
74struct bounce_session {
75	struct bounce_message		*msg;
76	FILE				*msgfp;
77	int				 state;
78	struct iobuf			 iobuf;
79	struct io			 io;
80};
81
/* Splay tree of bounce messages, ordered by bounce_message_cmp(). */
SPLAY_HEAD(bounce_message_tree, bounce_message);
static int bounce_message_cmp(const struct bounce_message *,
    const struct bounce_message *);
SPLAY_PROTOTYPE(bounce_message_tree, bounce_message, sp_entry,
    bounce_message_cmp);
87
/* Forward declarations of the file-local helpers. */
static void bounce_drain(void);
static void bounce_send(struct bounce_session *, const char *, ...);
static int  bounce_next_message(struct bounce_session *);
static int  bounce_next(struct bounce_session *);
static void bounce_delivery(struct bounce_message *, int, const char *);
static void bounce_status(struct bounce_session *, const char *, ...);
static void bounce_io(struct io *, int);
static void bounce_timeout(int, short, void *);
static void bounce_free(struct bounce_session *);
97
98static struct tree			wait_fd;
99static struct bounce_message_tree	messages;
100static TAILQ_HEAD(, bounce_message)	pending;
101
102static int				nmessage = 0;
103static int				running = 0;
104static struct event			ev_timer;
105
106static void
107bounce_init(void)
108{
109	static int	init = 0;
110
111	if (init == 0) {
112		TAILQ_INIT(&pending);
113		SPLAY_INIT(&messages);
114		tree_init(&wait_fd);
115		evtimer_set(&ev_timer, bounce_timeout, NULL);
116		init = 1;
117	}
118}
119
120void
121bounce_add(uint64_t evpid)
122{
123	char			 buf[MAX_LINE_SIZE], *line;
124	struct envelope		 evp;
125	struct bounce_message	 key, *msg;
126	struct bounce_envelope	*be;
127
128	bounce_init();
129
130	if (queue_envelope_load(evpid, &evp) == 0) {
131		m_create(p_scheduler, IMSG_DELIVERY_PERMFAIL, 0, 0, -1, 9);
132		m_add_evpid(p_scheduler, evpid);
133		m_close(p_scheduler);
134		return;
135	}
136
137	if (evp.type != D_BOUNCE)
138		errx(1, "bounce: evp:%016" PRIx64 " is not of type D_BOUNCE!",
139		    evp.id);
140
141	key.msgid = evpid_to_msgid(evpid);
142	key.bounce = evp.agent.bounce;
143	msg = SPLAY_FIND(bounce_message_tree, &messages, &key);
144	if (msg == NULL) {
145		msg = xcalloc(1, sizeof(*msg), "bounce_add");
146		msg->msgid = key.msgid;
147		msg->bounce = key.bounce;
148		SPLAY_INSERT(bounce_message_tree, &messages, msg);
149
150		TAILQ_INIT(&msg->envelopes);
151
152		snprintf(buf, sizeof(buf), "%s@%s", evp.sender.user,
153		    evp.sender.domain);
154		msg->to = xstrdup(buf, "bounce_add");
155		nmessage += 1;
156		log_debug("debug: bounce: new message %08" PRIx32,
157		    msg->msgid);
158		stat_increment("bounce.message", 1);
159	} else
160		TAILQ_REMOVE(&pending, msg, entry);
161
162	line = evp.errorline;
163	if (strlen(line) > 4 && (*line == '1' || *line == '6'))
164		line += 4;
165	snprintf(buf, sizeof(buf), "%s@%s: %s\n", evp.dest.user,
166	    evp.dest.domain, line);
167
168	be = xmalloc(sizeof *be, "bounce_add");
169	be->id = evpid;
170	be->report = xstrdup(buf, "bounce_add");
171	TAILQ_INSERT_TAIL(&msg->envelopes, be, entry);
172	log_debug("debug: bounce: adding report %16"PRIx64": %s", be->id,
173	    be->report);
174
175	msg->timeout = time(NULL) + 1;
176	TAILQ_INSERT_TAIL(&pending, msg, entry);
177
178	stat_increment("bounce.envelope", 1);
179	bounce_drain();
180}
181
182void
183bounce_fd(int fd)
184{
185	struct bounce_session	*s;
186
187	log_debug("debug: bounce: got enqueue socket %i", fd);
188
189	if (fd == -1) {
190		running -= 1;
191		bounce_drain();
192		return;
193	}
194
195	s = xcalloc(1, sizeof(*s), "bounce_fd");
196	s->state = BOUNCE_EHLO;
197	iobuf_xinit(&s->iobuf, 0, 0, "bounce_run");
198	io_init(&s->io, fd, s, bounce_io, &s->iobuf);
199	io_set_timeout(&s->io, 30000);
200	io_set_read(&s->io);
201
202	log_debug("debug: bounce: new session %p", s);
203	stat_increment("bounce.session", 1);
204}
205
/*
 * Callback of ev_timer, which bounce_drain() arms when the next pending
 * message is not due yet.  The event arguments are not used.
 */
static void
bounce_timeout(int fd, short ev, void *arg)
{
	(void)fd;
	(void)ev;
	(void)arg;

	log_debug("debug: bounce: timeout");
	bounce_drain();
}
213
214static void
215bounce_drain()
216{
217	struct bounce_message	*msg;
218	struct timeval		 tv;
219	time_t			 t;
220
221	log_debug("debug: bounce: drain: nmessage=%i running=%i",
222	    nmessage, running);
223
224	while (1) {
225		if (running >= BOUNCE_MAXRUN) {
226			log_debug("debug: bounce: max session reached");
227			return;
228		}
229
230		if (nmessage == 0) {
231			log_debug("debug: bounce: no more messages");
232			return;
233		}
234
235		if (running >= nmessage) {
236			log_debug("debug: bounce: enough sessions running");
237			return;
238		}
239
240		if ((msg = TAILQ_FIRST(&pending)) == NULL) {
241			log_debug("debug: bounce: no more pending messages");
242			return;
243		}
244
245		t = time(NULL);
246		if (msg->timeout > t) {
247			log_debug("debug: bounce: next message not ready yet");
248			if (!evtimer_pending(&ev_timer, NULL)) {
249				log_debug("debug: bounce: setting timer");
250				tv.tv_sec = msg->timeout - t;
251				tv.tv_usec = 0;
252				evtimer_add(&ev_timer, &tv);
253			}
254			return;
255		}
256
257		log_debug("debug: bounce: requesting new enqueue socket...");
258		m_compose(p_smtp, IMSG_SMTP_ENQUEUE_FD, 0, 0, -1, NULL, 0);
259
260		running += 1;
261	}
262}
263
264static void
265bounce_send(struct bounce_session *s, const char *fmt, ...)
266{
267	va_list	 ap;
268	char	*p;
269	int	 len;
270
271	va_start(ap, fmt);
272	if ((len = vasprintf(&p, fmt, ap)) == -1)
273		fatal("bounce: vasprintf");
274	va_end(ap);
275
276	log_trace(TRACE_BOUNCE, "bounce: %p: >>> %s", s, p);
277
278	iobuf_xfqueue(&s->iobuf, "bounce_send", "%s\n", p);
279
280	free(p);
281}
282
/*
 * Render a duration given in seconds as a rough human-readable string,
 * using the largest of seconds, minutes, hours or days that fits and
 * truncating the remainder (e.g. 3599 -> "59 minutes").
 *
 * Returns a pointer to a static buffer that is overwritten by the next
 * call.
 */
static const char *
bounce_duration(long long int d)
{
	static char	 buf[32];
	const char	*unit;

	if (d < 60) {
		unit = "second";
	} else if (d < 3600) {
		d /= 60;
		unit = "minute";
	} else if (d < 3600 * 24) {
		d /= 3600;
		unit = "hour";
	} else {
		d /= 3600 * 24;
		unit = "day";
	}

	snprintf(buf, sizeof buf, "%lli %s%s", d, unit, (d == 1) ? "" : "s");
	return (buf);
}
303
/* Greeting placed at the top of every notification body. */
#define NOTICE_INTRO							    \
	"    Hi!\n\n"							    \
	"    This is the MAILER-DAEMON, please DO NOT REPLY to this e-mail.\n"

/* Lead-in to the recipient list of an error notification. */
const char *notice_error =
    "    An error has occurred while attempting to deliver a message for\n"
    "    the following list of recipients:\n\n";

/* Lead-in to the recipient list of a warning; %s is the delay so far. */
const char *notice_warning =
    "    A message is delayed for more than %s for the following\n"
    "    list of recipients:\n\n";

/* Trailer of a warning; %s is how long the message stays in the queue. */
const char *notice_warning2 =
    "    Please note that this is only a temporary failure report.\n"
    "    The message is kept in the queue for up to %s.\n"
    "    You DO NOT NEED to re-send the message to these recipients.\n\n";
320
321static int
322bounce_next_message(struct bounce_session *s)
323{
324	struct bounce_message	*msg;
325	char			 buf[MAX_LINE_SIZE];
326	int			 fd;
327
328    again:
329
330	msg = TAILQ_FIRST(&pending);
331	if (msg == NULL || msg->timeout > time(NULL))
332		return (0);
333
334	TAILQ_REMOVE(&pending, msg, entry);
335	SPLAY_REMOVE(bounce_message_tree, &messages, msg);
336
337	if ((fd = queue_message_fd_r(msg->msgid)) == -1) {
338		bounce_delivery(msg, IMSG_DELIVERY_TEMPFAIL,
339		    "Could not open message fd");
340		goto again;
341	}
342
343	if ((s->msgfp = fdopen(fd, "r")) == NULL) {
344		snprintf(buf, sizeof(buf), "fdopen: %s", strerror(errno));
345		log_warn("warn: bounce: fdopen");
346		close(fd);
347		bounce_delivery(msg, IMSG_DELIVERY_TEMPFAIL, buf);
348		goto again;
349	}
350
351	s->msg = msg;
352	return (1);
353}
354
355static int
356bounce_next(struct bounce_session *s)
357{
358	struct bounce_envelope	*evp;
359	char			*line;
360	size_t			 len, n;
361
362	switch (s->state) {
363	case BOUNCE_EHLO:
364		bounce_send(s, "EHLO %s", env->sc_hostname);
365		s->state = BOUNCE_MAIL;
366		break;
367
368	case BOUNCE_MAIL:
369	case BOUNCE_DATA_END:
370		log_debug("debug: bounce: %p: getting next message...", s);
371		if (bounce_next_message(s) == 0) {
372			log_debug("debug: bounce: %p: no more messages", s);
373			bounce_send(s, "QUIT");
374			s->state = BOUNCE_CLOSE;
375 			break;
376		}
377		log_debug("debug: bounce: %p: found message %08"PRIx32,
378		    s, s->msg->msgid);
379		bounce_send(s, "MAIL FROM: <>");
380		s->state = BOUNCE_RCPT;
381		break;
382
383	case BOUNCE_RCPT:
384		bounce_send(s, "RCPT TO: <%s>", s->msg->to);
385		s->state = BOUNCE_DATA;
386		break;
387
388	case BOUNCE_DATA:
389		bounce_send(s, "DATA");
390		s->state = BOUNCE_DATA_NOTICE;
391		break;
392
393	case BOUNCE_DATA_NOTICE:
394		/* Construct an appropriate notice. */
395
396		iobuf_xfqueue(&s->iobuf, "bounce_next: HEADER",
397		    "Subject: Delivery status notification: %s\n"
398		    "From: Mailer Daemon <MAILER-DAEMON@%s>\n"
399		    "To: %s\n"
400		    "Date: %s\n"
401		    "\n"
402		    NOTICE_INTRO
403		    "\n",
404		    (s->msg->bounce.type == B_ERROR) ? "error" : "warning",
405		    env->sc_hostname,
406		    s->msg->to,
407		    time_to_text(time(NULL)));
408
409		if (s->msg->bounce.type == B_ERROR)
410			iobuf_xfqueue(&s->iobuf, "bounce_next: BODY",
411			    notice_error);
412		else
413			iobuf_xfqueue(&s->iobuf, "bounce_next: BODY",
414			    notice_warning,
415			    bounce_duration(s->msg->bounce.delay));
416
417		TAILQ_FOREACH(evp, &s->msg->envelopes, entry) {
418			iobuf_xfqueue(&s->iobuf,
419			    "bounce_next: DATA_NOTICE",
420			    "%s", evp->report);
421		}
422		iobuf_xfqueue(&s->iobuf, "bounce_next: DATA_NOTICE", "\n");
423
424		if (s->msg->bounce.type == B_WARNING)
425			iobuf_xfqueue(&s->iobuf, "bounce_next: BODY",
426			    notice_warning2,
427			    bounce_duration(s->msg->bounce.expire));
428
429		iobuf_xfqueue(&s->iobuf, "bounce_next: DATA_NOTICE",
430		    "    Below is a copy of the original message:\n"
431		    "\n");
432
433		log_trace(TRACE_BOUNCE, "bounce: %p: >>> [... %zu bytes ...]",
434		    s, iobuf_queued(&s->iobuf));
435
436		s->state = BOUNCE_DATA_MESSAGE;
437		break;
438
439	case BOUNCE_DATA_MESSAGE:
440
441		n = iobuf_queued(&s->iobuf);
442
443		while (iobuf_len(&s->iobuf) < BOUNCE_HIWAT) {
444			line = fgetln(s->msgfp, &len);
445			if (line == NULL)
446				break;
447			line[len - 1] = '\0';
448			iobuf_xfqueue(&s->iobuf,
449			    "bounce_next: DATA_MESSAGE", "%s%s\n",
450			    (len == 2 && line[0] == '.') ? "." : "", line);
451		}
452
453		if (ferror(s->msgfp)) {
454			fclose(s->msgfp);
455			s->msgfp = NULL;
456			bounce_delivery(s->msg, IMSG_DELIVERY_TEMPFAIL,
457			    "Error reading message");
458			s->msg = NULL;
459			return (-1);
460		}
461
462		log_trace(TRACE_BOUNCE, "bounce: %p: >>> [... %zu bytes ...]",
463		    s, iobuf_queued(&s->iobuf) - n);
464
465		if (feof(s->msgfp)) {
466			fclose(s->msgfp);
467			s->msgfp = NULL;
468			bounce_send(s, ".");
469			s->state = BOUNCE_DATA_END;
470		}
471		break;
472
473	case BOUNCE_QUIT:
474		bounce_send(s, "QUIT");
475		s->state = BOUNCE_CLOSE;
476		break;
477
478	default:
479		fatalx("bounce: bad state");
480	}
481
482	return (0);
483}
484
485
486static void
487bounce_delivery(struct bounce_message *msg, int delivery, const char *status)
488{
489	struct bounce_envelope	*be;
490	struct envelope		 evp;
491	size_t			 n;
492
493	log_debug("debug: bounce: status %s for message %08"PRIx32": %s",
494	    imsg_to_str(delivery),
495	    msg->msgid,
496	    status);
497
498	n = 0;
499	while ((be = TAILQ_FIRST(&msg->envelopes))) {
500		if (delivery == IMSG_DELIVERY_TEMPFAIL) {
501			if (queue_envelope_load(be->id, &evp) == 0) {
502				fatalx("could not reload envelope!");
503			}
504			evp.retry++;
505			evp.lasttry = msg->timeout;
506			envelope_set_errormsg(&evp, "%s", status);
507			queue_envelope_update(&evp);
508			m_create(p_scheduler, delivery, 0, 0, -1, MSZ_EVP);
509			m_add_envelope(p_scheduler, &evp);
510			m_close(p_scheduler);
511		} else {
512			m_create(p_scheduler, delivery, 0, 0, -1,
513			    sizeof(be->id) + 1);
514			m_add_evpid(p_scheduler, be->id);
515			m_close(p_scheduler);
516			queue_envelope_delete(be->id);
517		}
518		TAILQ_REMOVE(&msg->envelopes, be, entry);
519		free(be);
520		n += 1;
521	}
522
523	nmessage -= 1;
524	stat_decrement("bounce.message", 1);
525	stat_decrement("bounce.envelope", n);
526	free(msg);
527}
528
529static void
530bounce_status(struct bounce_session *s, const char *fmt, ...)
531{
532	va_list		 ap;
533	char		*status;
534	int		 len, delivery;
535
536	/* Ignore if there is no message */
537	if (s->msg == NULL)
538		return;
539
540	va_start(ap, fmt);
541	if ((len = vasprintf(&status, fmt, ap)) == -1)
542		fatal("bounce: vasprintf");
543	va_end(ap);
544
545	if (*status == '2')
546		delivery = IMSG_DELIVERY_OK;
547	else if (*status == '5' || *status == '6')
548		delivery = IMSG_DELIVERY_PERMFAIL;
549	else
550		delivery = IMSG_DELIVERY_TEMPFAIL;
551
552	bounce_delivery(s->msg, delivery, status);
553	s->msg = NULL;
554	if (s->msgfp)
555		fclose(s->msgfp);
556
557	free(status);
558}
559
560static void
561bounce_free(struct bounce_session *s)
562{
563	log_debug("debug: bounce: %p: deleting session", s);
564
565	iobuf_clear(&s->iobuf);
566	io_clear(&s->io);
567	free(s);
568
569	running -= 1;
570	stat_decrement("bounce.session", 1);
571	bounce_drain();
572}
573
574static void
575bounce_io(struct io *io, int evt)
576{
577	struct bounce_session	*s = io->arg;
578	const char		*error;
579	char			*line, *msg;
580	int			 cont;
581	size_t			 len;
582
583	log_trace(TRACE_IO, "bounce: %p: %s %s", s, io_strevent(evt),
584	    io_strio(io));
585
586	switch (evt) {
587	case IO_DATAIN:
588	    nextline:
589		line = iobuf_getline(&s->iobuf, &len);
590		if (line == NULL && iobuf_len(&s->iobuf) >= SMTP_LINE_MAX) {
591			bounce_status(s, "Input too long");
592			bounce_free(s);
593			return;
594		}
595
596		if (line == NULL) {
597			iobuf_normalize(&s->iobuf);
598			break;
599		}
600
601		log_trace(TRACE_BOUNCE, "bounce: %p: <<< %s", s, line);
602
603		if ((error = parse_smtp_response(line, len, &msg, &cont))) {
604			bounce_status(s, "Bad response: %s", error);
605			bounce_free(s);
606			return;
607		}
608		if (cont)
609			goto nextline;
610
611		if (s->state == BOUNCE_CLOSE) {
612			bounce_free(s);
613			return;
614		}
615
616		if (line[0] != '2' && line[0] != '3') {		/* fail */
617			bounce_status(s, "%s", line);
618			s->state = BOUNCE_QUIT;
619		} else if (s->state == BOUNCE_DATA_END) {	/* accepted */
620			bounce_status(s, "%s", line);
621		}
622
623		if (bounce_next(s) == -1) {
624			bounce_free(s);
625			return;
626		}
627
628		io_set_write(io);
629		break;
630
631	case IO_LOWAT:
632		if (s->state == BOUNCE_DATA_MESSAGE)
633			if (bounce_next(s) == -1) {
634				bounce_free(s);
635				return;
636			}
637		if (iobuf_queued(&s->iobuf) == 0)
638			io_set_read(io);
639		break;
640
641	default:
642		bounce_status(s, "442 i/o error %i", evt);
643		bounce_free(s);
644		break;
645	}
646}
647
648static int
649bounce_message_cmp(const struct bounce_message *a,
650    const struct bounce_message *b)
651{
652	if (a->msgid < b->msgid)
653		return (-1);
654	if (a->msgid > b->msgid)
655		return (1);
656	return memcmp(&a->bounce, &b->bounce, sizeof (a->bounce));
657}
658
659SPLAY_GENERATE(bounce_message_tree, bounce_message, sp_entry,
660    bounce_message_cmp);
661