1/*
2 * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
3 *
4 * Copyright (c) 2020, NLnet Labs. All rights reserved.
5 *
6 * This software is open source.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 *
12 * Redistributions of source code must retain the above copyright notice,
13 * this list of conditions and the following disclaimer.
14 *
15 * Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
18 *
19 * Neither the name of the NLNET LABS nor the names of its contributors may
20 * be used to endorse or promote products derived from this software without
21 * specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 *
35 */
36
37/**
38 * \file
39 *
40 * An implementation of the Frame Streams data transport protocol for
41 * the Unbound DNSTAP message logging facility.
42 */
43
44#include "config.h"
45#include "dnstap/dtstream.h"
46#include "dnstap/dnstap_fstrm.h"
47#include "util/config_file.h"
48#include "util/ub_event.h"
49#include "util/net_help.h"
50#include "services/outside_network.h"
51#include "sldns/sbuffer.h"
52#ifdef HAVE_SYS_UN_H
53#include <sys/un.h>
54#endif
55#include <fcntl.h>
56#ifdef HAVE_OPENSSL_SSL_H
57#include <openssl/ssl.h>
58#endif
59#ifdef HAVE_OPENSSL_ERR_H
60#include <openssl/err.h>
61#endif
62
/** number of messages to process in one output callback */
#define DTIO_MESSAGES_PER_CALLBACK 100
/** the msec to wait for reconnect (if not immediate, the first attempt);
 * dtio_reconnect_enable doubles the wait for every following attempt */
#define DTIO_RECONNECT_TIMEOUT_MIN 10
/** the msec to wait for reconnect max after backoff, the doubling in
 * dtio_reconnect_enable is capped at this value */
#define DTIO_RECONNECT_TIMEOUT_MAX 1000
/** the msec to wait for reconnect slow, to stop busy spinning on reconnect.
 * NOTE(review): presumably passed to dtio_reconnect_slow by code outside
 * this view - confirm */
#define DTIO_RECONNECT_TIMEOUT_SLOW 1000
/** number of messages in a worker queue at which dt_msg_queue_submit
 * wakes up the dtio thread immediately, instead of waiting for the
 * wakeup timer */
#define DTIO_MSG_FOR_WAKEUP 32

/** maximum length of received frame; a longer frame closes the
 * connection */
#define DTIO_RECV_FRAME_MAX_LEN 1000

struct stop_flush_info;
/** DTIO command channel commands, written as a single byte to the
 * command pipe of the dtio thread.
 * NOTE(review): the trailing name makes this declaration define a
 * file-scope object called dtio_channel_command (of an anonymous enum
 * type), not an enum tag. */
enum {
	/** DTIO command channel stop */
	DTIO_COMMAND_STOP = 0,
	/** DTIO command channel wakeup, sent by dtio_wakeup */
	DTIO_COMMAND_WAKEUP = 1
} dtio_channel_command;

/** open the output channel */
static void dtio_open_output(struct dt_io_thread* dtio);
/** add output event for read and write */
static int dtio_add_output_event_write(struct dt_io_thread* dtio);
/** start reconnection attempts, with exponential backoff */
static void dtio_reconnect_enable(struct dt_io_thread* dtio);
/** stop from stop_flush event loop */
static void dtio_stop_flush_exit(struct stop_flush_info* info);
/** setup a start control message */
static int dtio_control_start_send(struct dt_io_thread* dtio);
#ifdef HAVE_SSL
/** enable briefly waiting for a read event, for SSL negotiation */
static int dtio_enable_brief_read(struct dt_io_thread* dtio);
/** enable briefly waiting for a write event, for SSL negotiation */
static int dtio_enable_brief_write(struct dt_io_thread* dtio);
#endif
102
103struct dt_msg_queue*
104dt_msg_queue_create(struct comm_base* base)
105{
106	struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
107	if(!mq) return NULL;
108	mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
109		about 1 M should contain 64K messages with some overhead,
110		or a whole bunch smaller ones */
111	mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq);
112	if(!mq->wakeup_timer) {
113		free(mq);
114		return NULL;
115	}
116	lock_basic_init(&mq->lock);
117	lock_protect(&mq->lock, mq, sizeof(*mq));
118	return mq;
119}
120
121/** clear the message list, caller must hold the lock */
122static void
123dt_msg_queue_clear(struct dt_msg_queue* mq)
124{
125	struct dt_msg_entry* e = mq->first, *next=NULL;
126	while(e) {
127		next = e->next;
128		free(e->buf);
129		free(e);
130		e = next;
131	}
132	mq->first = NULL;
133	mq->last = NULL;
134	mq->cursize = 0;
135	mq->msgcount = 0;
136}
137
138void
139dt_msg_queue_delete(struct dt_msg_queue* mq)
140{
141	if(!mq) return;
142	lock_basic_destroy(&mq->lock);
143	dt_msg_queue_clear(mq);
144	comm_timer_delete(mq->wakeup_timer);
145	free(mq);
146}
147
148/** make the dtio wake up by sending a wakeup command */
149static void dtio_wakeup(struct dt_io_thread* dtio)
150{
151	uint8_t cmd = DTIO_COMMAND_WAKEUP;
152	if(!dtio) return;
153	if(!dtio->started) return;
154
155	while(1) {
156		ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
157		if(r == -1) {
158#ifndef USE_WINSOCK
159			if(errno == EINTR || errno == EAGAIN)
160				continue;
161#else
162			if(WSAGetLastError() == WSAEINPROGRESS)
163				continue;
164			if(WSAGetLastError() == WSAEWOULDBLOCK)
165				continue;
166#endif
167			log_err("dnstap io wakeup: write: %s",
168				sock_strerror(errno));
169			break;
170		}
171		break;
172	}
173}
174
175void
176mq_wakeup_cb(void* arg)
177{
178	struct dt_msg_queue* mq = (struct dt_msg_queue*)arg;
179	/* even if the dtio is already active, because perhaps much
180	 * traffic suddenly, we leave the timer running to save on
181	 * managing it, the once a second timer is less work then
182	 * starting and stopping the timer frequently */
183	lock_basic_lock(&mq->dtio->wakeup_timer_lock);
184	mq->dtio->wakeup_timer_enabled = 0;
185	lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
186	dtio_wakeup(mq->dtio);
187}
188
189/** start timer to wakeup dtio because there is content in the queue */
190static void
191dt_msg_queue_start_timer(struct dt_msg_queue* mq)
192{
193	struct timeval tv;
194	/* Start a timer to process messages to be logged.
195	 * If we woke up the dtio thread for every message, the wakeup
196	 * messages take up too much processing power.  If the queue
197	 * fills up the wakeup happens immediately.  The timer wakes it up
198	 * if there are infrequent messages to log. */
199
200	/* we cannot start a timer in dtio thread, because it is a different
201	 * thread and its event base is in use by the other thread, it would
202	 * give race conditions if we tried to modify its event base,
203	 * and locks would wait until it woke up, and this is what we do. */
204
205	/* do not start the timer if a timer already exists, perhaps
206	 * in another worker.  So this variable is protected by a lock in
207	 * dtio */
208	lock_basic_lock(&mq->dtio->wakeup_timer_lock);
209	if(mq->dtio->wakeup_timer_enabled) {
210		lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
211		return;
212	}
213	mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */
214	lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
215
216	/* start the timer, in mq, in the event base of our worker */
217	tv.tv_sec = 1;
218	tv.tv_usec = 0;
219	comm_timer_set(mq->wakeup_timer, &tv);
220}
221
222void
223dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
224{
225	int wakeupnow = 0, wakeupstarttimer = 0;
226	struct dt_msg_entry* entry;
227
228	/* check conditions */
229	if(!buf) return;
230	if(len == 0) {
231		/* it is not possible to log entries with zero length,
232		 * because the framestream protocol does not carry it.
233		 * However the protobuf serialization does not create zero
234		 * length datagrams for dnstap, so this should not happen. */
235		free(buf);
236		return;
237	}
238	if(!mq) {
239		free(buf);
240		return;
241	}
242
243	/* allocate memory for queue entry */
244	entry = malloc(sizeof(*entry));
245	if(!entry) {
246		log_err("out of memory logging dnstap");
247		free(buf);
248		return;
249	}
250	entry->next = NULL;
251	entry->buf = buf;
252	entry->len = len;
253
254	/* aqcuire lock */
255	lock_basic_lock(&mq->lock);
256	/* if list was empty, start timer for (eventual) wakeup */
257	if(mq->first == NULL)
258		wakeupstarttimer = 1;
259	/* if list contains more than wakeupnum elements, wakeup now,
260	 * or if list is (going to be) almost full */
261	if(mq->msgcount == DTIO_MSG_FOR_WAKEUP ||
262		(mq->cursize < mq->maxsize * 9 / 10 &&
263		mq->cursize+len >= mq->maxsize * 9 / 10))
264		wakeupnow = 1;
265	/* see if it is going to fit */
266	if(mq->cursize + len > mq->maxsize) {
267		/* buffer full, or congested. */
268		/* drop */
269		lock_basic_unlock(&mq->lock);
270		free(buf);
271		free(entry);
272		return;
273	}
274	mq->cursize += len;
275	mq->msgcount ++;
276	/* append to list */
277	if(mq->last) {
278		mq->last->next = entry;
279	} else {
280		mq->first = entry;
281	}
282	mq->last = entry;
283	/* release lock */
284	lock_basic_unlock(&mq->lock);
285
286	if(wakeupnow) {
287		dtio_wakeup(mq->dtio);
288	} else if(wakeupstarttimer) {
289		dt_msg_queue_start_timer(mq);
290	}
291}
292
293struct dt_io_thread* dt_io_thread_create(void)
294{
295	struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
296	lock_basic_init(&dtio->wakeup_timer_lock);
297	lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled,
298		sizeof(dtio->wakeup_timer_enabled));
299	return dtio;
300}
301
302void dt_io_thread_delete(struct dt_io_thread* dtio)
303{
304	struct dt_io_list_item* item, *nextitem;
305	if(!dtio) return;
306	lock_basic_destroy(&dtio->wakeup_timer_lock);
307	item=dtio->io_list;
308	while(item) {
309		nextitem = item->next;
310		free(item);
311		item = nextitem;
312	}
313	free(dtio->socket_path);
314	free(dtio->ip_str);
315	free(dtio->tls_server_name);
316	free(dtio->client_key_file);
317	free(dtio->client_cert_file);
318	if(dtio->ssl_ctx) {
319#ifdef HAVE_SSL
320		SSL_CTX_free(dtio->ssl_ctx);
321#endif
322	}
323	free(dtio);
324}
325
326int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
327{
328	if(!cfg->dnstap) {
329		log_warn("cannot setup dnstap because dnstap-enable is no");
330		return 0;
331	}
332
333	/* what type of connectivity do we have */
334	if(cfg->dnstap_ip && cfg->dnstap_ip[0]) {
335		if(cfg->dnstap_tls)
336			dtio->upstream_is_tls = 1;
337		else	dtio->upstream_is_tcp = 1;
338	} else {
339		dtio->upstream_is_unix = 1;
340	}
341	dtio->is_bidirectional = cfg->dnstap_bidirectional;
342
343	if(dtio->upstream_is_unix) {
344		char* nm;
345		if(!cfg->dnstap_socket_path ||
346			cfg->dnstap_socket_path[0]==0) {
347			log_err("dnstap setup: no dnstap-socket-path for "
348				"socket connect");
349			return 0;
350		}
351		nm = cfg->dnstap_socket_path;
352		if(cfg->chrootdir && cfg->chrootdir[0] && strncmp(nm,
353			cfg->chrootdir, strlen(cfg->chrootdir)) == 0)
354			nm += strlen(cfg->chrootdir);
355		free(dtio->socket_path);
356		dtio->socket_path = strdup(nm);
357		if(!dtio->socket_path) {
358			log_err("dnstap setup: malloc failure");
359			return 0;
360		}
361	}
362
363	if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
364		if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) {
365			log_err("dnstap setup: no dnstap-ip for TCP connect");
366			return 0;
367		}
368		free(dtio->ip_str);
369		dtio->ip_str = strdup(cfg->dnstap_ip);
370		if(!dtio->ip_str) {
371			log_err("dnstap setup: malloc failure");
372			return 0;
373		}
374	}
375
376	if(dtio->upstream_is_tls) {
377#ifdef HAVE_SSL
378		if(cfg->dnstap_tls_server_name &&
379			cfg->dnstap_tls_server_name[0]) {
380			free(dtio->tls_server_name);
381			dtio->tls_server_name = strdup(
382				cfg->dnstap_tls_server_name);
383			if(!dtio->tls_server_name) {
384				log_err("dnstap setup: malloc failure");
385				return 0;
386			}
387			if(!check_auth_name_for_ssl(dtio->tls_server_name))
388				return 0;
389		}
390		if(cfg->dnstap_tls_client_key_file &&
391			cfg->dnstap_tls_client_key_file[0]) {
392			dtio->use_client_certs = 1;
393			free(dtio->client_key_file);
394			dtio->client_key_file = strdup(
395				cfg->dnstap_tls_client_key_file);
396			if(!dtio->client_key_file) {
397				log_err("dnstap setup: malloc failure");
398				return 0;
399			}
400			if(!cfg->dnstap_tls_client_cert_file ||
401				cfg->dnstap_tls_client_cert_file[0]==0) {
402				log_err("dnstap setup: client key "
403					"authentication enabled with "
404					"dnstap-tls-client-key-file, but "
405					"no dnstap-tls-client-cert-file "
406					"is given");
407				return 0;
408			}
409			free(dtio->client_cert_file);
410			dtio->client_cert_file = strdup(
411				cfg->dnstap_tls_client_cert_file);
412			if(!dtio->client_cert_file) {
413				log_err("dnstap setup: malloc failure");
414				return 0;
415			}
416		} else {
417			dtio->use_client_certs = 0;
418			dtio->client_key_file = NULL;
419			dtio->client_cert_file = NULL;
420		}
421
422		if(cfg->dnstap_tls_cert_bundle) {
423			dtio->ssl_ctx = connect_sslctx_create(
424				dtio->client_key_file,
425				dtio->client_cert_file,
426				cfg->dnstap_tls_cert_bundle, 0);
427		} else {
428			dtio->ssl_ctx = connect_sslctx_create(
429				dtio->client_key_file,
430				dtio->client_cert_file,
431				cfg->tls_cert_bundle, cfg->tls_win_cert);
432		}
433		if(!dtio->ssl_ctx) {
434			log_err("could not setup SSL CTX");
435			return 0;
436		}
437		dtio->tls_use_sni = cfg->tls_use_sni;
438#endif /* HAVE_SSL */
439	}
440	return 1;
441}
442
443int dt_io_thread_register_queue(struct dt_io_thread* dtio,
444        struct dt_msg_queue* mq)
445{
446	struct dt_io_list_item* item = malloc(sizeof(*item));
447	if(!item) return 0;
448	lock_basic_lock(&mq->lock);
449	mq->dtio = dtio;
450	lock_basic_unlock(&mq->lock);
451	item->queue = mq;
452	item->next = dtio->io_list;
453	dtio->io_list = item;
454	dtio->io_list_iter = NULL;
455	return 1;
456}
457
458void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
459        struct dt_msg_queue* mq)
460{
461	struct dt_io_list_item* item, *prev=NULL;
462	if(!dtio) return;
463	item = dtio->io_list;
464	while(item) {
465		if(item->queue == mq) {
466			/* found it */
467			if(prev) prev->next = item->next;
468			else dtio->io_list = item->next;
469			/* the queue itself only registered, not deleted */
470			lock_basic_lock(&item->queue->lock);
471			item->queue->dtio = NULL;
472			lock_basic_unlock(&item->queue->lock);
473			free(item);
474			dtio->io_list_iter = NULL;
475			return;
476		}
477		prev = item;
478		item = item->next;
479	}
480}
481
482/** pick a message from the queue, the routine locks and unlocks,
483 * returns true if there is a message */
484static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
485	size_t* len)
486{
487	lock_basic_lock(&mq->lock);
488	if(mq->first) {
489		struct dt_msg_entry* entry = mq->first;
490		mq->first = entry->next;
491		if(!entry->next) mq->last = NULL;
492		mq->cursize -= entry->len;
493		mq->msgcount --;
494		lock_basic_unlock(&mq->lock);
495
496		*buf = entry->buf;
497		*len = entry->len;
498		free(entry);
499		return 1;
500	}
501	lock_basic_unlock(&mq->lock);
502	return 0;
503}
504
505/** find message in queue, false if no message, true if message to send */
506static int dtio_find_in_queue(struct dt_io_thread* dtio,
507	struct dt_msg_queue* mq)
508{
509	void* buf=NULL;
510	size_t len=0;
511	if(dt_msg_queue_pop(mq, &buf, &len)) {
512		dtio->cur_msg = buf;
513		dtio->cur_msg_len = len;
514		dtio->cur_msg_done = 0;
515		dtio->cur_msg_len_done = 0;
516		return 1;
517	}
518	return 0;
519}
520
521/** find a new message to write, search message queues, false if none */
522static int dtio_find_msg(struct dt_io_thread* dtio)
523{
524	struct dt_io_list_item *spot, *item;
525
526	spot = dtio->io_list_iter;
527	/* use the next queue for the next message lookup,
528	 * if we hit the end(NULL) the NULL restarts the iter at start. */
529	if(spot)
530		dtio->io_list_iter = spot->next;
531	else if(dtio->io_list)
532		dtio->io_list_iter = dtio->io_list->next;
533
534	/* scan from spot to end-of-io_list */
535	item = spot;
536	while(item) {
537		if(dtio_find_in_queue(dtio, item->queue))
538			return 1;
539		item = item->next;
540	}
541	/* scan starting at the start-of-list (to wrap around the end) */
542	item = dtio->io_list;
543	while(item) {
544		if(dtio_find_in_queue(dtio, item->queue))
545			return 1;
546		item = item->next;
547	}
548	return 0;
549}
550
551/** callback for the dnstap reconnect, to start reconnecting to output */
552void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
553	short ATTR_UNUSED(bits), void* arg)
554{
555	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
556	dtio->reconnect_is_added = 0;
557	verbose(VERB_ALGO, "dnstap io: reconnect timer");
558
559	dtio_open_output(dtio);
560	if(dtio->event) {
561		if(!dtio_add_output_event_write(dtio))
562			return;
563		/* nothing wrong so far, wait on the output event */
564		return;
565	}
566	/* exponential backoff and retry on timer */
567	dtio_reconnect_enable(dtio);
568}
569
570/** attempt to reconnect to the output, after a timeout */
571static void dtio_reconnect_enable(struct dt_io_thread* dtio)
572{
573	struct timeval tv;
574	int msec;
575	if(dtio->want_to_exit) return;
576	if(dtio->reconnect_is_added)
577		return; /* already done */
578
579	/* exponential backoff, store the value for next timeout */
580	msec = dtio->reconnect_timeout;
581	if(msec == 0) {
582		dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
583	} else {
584		dtio->reconnect_timeout = msec*2;
585		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX)
586			dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
587	}
588	verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
589		msec);
590
591	/* setup wait timer */
592	memset(&tv, 0, sizeof(tv));
593	tv.tv_sec = msec/1000;
594	tv.tv_usec = (msec%1000)*1000;
595	if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
596		&dtio_reconnect_timeout_cb, dtio, &tv) != 0) {
597		log_err("dnstap io: could not reconnect ev timer add");
598		return;
599	}
600	dtio->reconnect_is_added = 1;
601}
602
603/** remove dtio reconnect timer */
604static void dtio_reconnect_del(struct dt_io_thread* dtio)
605{
606	if(!dtio->reconnect_is_added)
607		return;
608	ub_timer_del(dtio->reconnect_timer);
609	dtio->reconnect_is_added = 0;
610}
611
612/** clear the reconnect exponential backoff timer.
613 * We have successfully connected so we can try again with short timeouts. */
614static void dtio_reconnect_clear(struct dt_io_thread* dtio)
615{
616	dtio->reconnect_timeout = 0;
617	dtio_reconnect_del(dtio);
618}
619
620/** reconnect slowly, because we already know we have to wait for a bit */
621static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec)
622{
623	dtio_reconnect_del(dtio);
624	dtio->reconnect_timeout = msec;
625	dtio_reconnect_enable(dtio);
626}
627
628/** delete the current message in the dtio, and reset counters */
629static void dtio_cur_msg_free(struct dt_io_thread* dtio)
630{
631	free(dtio->cur_msg);
632	dtio->cur_msg = NULL;
633	dtio->cur_msg_len = 0;
634	dtio->cur_msg_done = 0;
635	dtio->cur_msg_len_done = 0;
636}
637
638/** delete the buffer and counters used to read frame */
639static void dtio_read_frame_free(struct dt_frame_read_buf* rb)
640{
641	if(rb->buf) {
642		free(rb->buf);
643		rb->buf = NULL;
644	}
645	rb->buf_count = 0;
646	rb->buf_cap = 0;
647	rb->frame_len = 0;
648	rb->frame_len_done = 0;
649	rb->control_frame = 0;
650}
651
652/** del the output file descriptor event for listening */
653static void dtio_del_output_event(struct dt_io_thread* dtio)
654{
655	if(!dtio->event_added)
656		return;
657	ub_event_del(dtio->event);
658	dtio->event_added = 0;
659	dtio->event_added_is_write = 0;
660}
661
662/** close dtio socket and set it to -1 */
663static void dtio_close_fd(struct dt_io_thread* dtio)
664{
665	sock_close(dtio->fd);
666	dtio->fd = -1;
667}
668
669/** close and stop the output file descriptor event */
670static void dtio_close_output(struct dt_io_thread* dtio)
671{
672	if(!dtio->event)
673		return;
674	ub_event_free(dtio->event);
675	dtio->event = NULL;
676	if(dtio->ssl) {
677#ifdef HAVE_SSL
678		SSL_shutdown(dtio->ssl);
679		SSL_free(dtio->ssl);
680		dtio->ssl = NULL;
681#endif
682	}
683	dtio_close_fd(dtio);
684
685	/* if there is a (partial) message, discard it
686	 * we cannot send (the remainder of) it, and a new
687	 * connection needs to start with a control frame. */
688	if(dtio->cur_msg) {
689		dtio_cur_msg_free(dtio);
690	}
691
692	dtio->ready_frame_sent = 0;
693	dtio->accept_frame_received = 0;
694	dtio_read_frame_free(&dtio->read_frame);
695
696	dtio_reconnect_enable(dtio);
697}
698
/** check for pending nonblocking connect errors,
 * returns 1 if it is okay. -1 on error (close it), 0 to try later.
 *
 * Only does work while dtio->check_nb_connect is set; it then reads the
 * connect result with getsockopt(SO_ERROR). On success it logs the
 * destination, resets the reconnect backoff (dtio_reconnect_clear) and
 * clears check_nb_connect. A connect error is logged before -1 is
 * returned. */
static int dtio_check_nb_connect(struct dt_io_thread* dtio)
{
	int error = 0;
	socklen_t len = (socklen_t)sizeof(error);
	if(!dtio->check_nb_connect)
		return 1; /* everything okay */
	if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error,
		&len) < 0) {
		/* getsockopt itself failed, use its error instead */
#ifndef USE_WINSOCK
		error = errno; /* on solaris errno is error */
#else
		error = WSAGetLastError();
#endif
	}
	/* the connect is still in progress */
#ifndef USE_WINSOCK
#if defined(EINPROGRESS) && defined(EWOULDBLOCK)
	if(error == EINPROGRESS || error == EWOULDBLOCK)
		return 0; /* try again later */
#endif
#else
	if(error == WSAEINPROGRESS) {
		return 0; /* try again later */
	} else if(error == WSAEWOULDBLOCK) {
		/* tell the winsock event code the write would block, on the
		 * stop-flush event if that is active, else the output event */
		ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
			dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
		return 0; /* try again later */
	}
#endif
	if(error != 0) {
		/* name the destination: socket path, else IP string */
		char* to = dtio->socket_path;
		if(!to) to = dtio->ip_str;
		if(!to) to = "";
		log_err("dnstap io: failed to connect to \"%s\": %s",
			to, sock_strerror(error));
		return -1; /* error, close it */
	}

	if(dtio->ip_str)
		verbose(VERB_DETAIL, "dnstap io: connected to %s",
			dtio->ip_str);
	else if(dtio->socket_path)
		verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"",
			dtio->socket_path);
	/* connected: next failure starts the backoff from the beginning */
	dtio_reconnect_clear(dtio);
	dtio->check_nb_connect = 0;
	return 1; /* everything okay */
}
748
#ifdef HAVE_SSL
/** Write up to len bytes of buf over the TLS session dtio->ssl.
 * Returns the number of bytes written; 0 when the write has to be
 * retried later (a brief read event is enabled when OpenSSL first needs
 * to read); or -1 when the channel is to be closed. */
static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf,
	size_t len)
{
	int nwritten, sslerr;
	ERR_clear_error();
	nwritten = SSL_write(dtio->ssl, buf, len);
	if(nwritten > 0)
		return nwritten;

	sslerr = SSL_get_error(dtio->ssl, nwritten);
	switch(sslerr) {
	case SSL_ERROR_ZERO_RETURN:
		/* the peer closed the TLS session */
		return -1;
	case SSL_ERROR_WANT_READ:
		/* renegotiation or similar: wait for a brief read event */
		dtio_enable_brief_read(dtio);
		return 0;
	case SSL_ERROR_WANT_WRITE:
		/* socket buffer full, write again later */
		return 0;
	case SSL_ERROR_SYSCALL:
#ifdef EPIPE
		if(errno == EPIPE && verbosity < 2)
			return -1; /* silence 'broken pipe' */
#endif
#ifdef ECONNRESET
		if(errno == ECONNRESET && verbosity < 2)
			return -1; /* silence reset by peer */
#endif
		if(errno != 0) {
			log_err("dnstap io, SSL_write syscall: %s",
				strerror(errno));
		}
		return -1;
	default:
		break;
	}
	log_crypto_err("dnstap io, could not SSL_write");
	return -1;
}
#endif /* HAVE_SSL */
792
793/** write buffer to output.
794 * returns number of bytes written, 0 if nothing happened,
795 * try again later, or -1 if the channel is to be closed. */
796static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
797	size_t len)
798{
799	ssize_t ret;
800	if(dtio->fd == -1)
801		return -1;
802#ifdef HAVE_SSL
803	if(dtio->ssl)
804		return dtio_write_ssl(dtio, buf, len);
805#endif
806	ret = send(dtio->fd, (void*)buf, len, 0);
807	if(ret == -1) {
808#ifndef USE_WINSOCK
809		if(errno == EINTR || errno == EAGAIN)
810			return 0;
811#else
812		if(WSAGetLastError() == WSAEINPROGRESS)
813			return 0;
814		if(WSAGetLastError() == WSAEWOULDBLOCK) {
815			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
816				dtio->stop_flush_event:dtio->event),
817				UB_EV_WRITE);
818			return 0;
819		}
820#endif
821		log_err("dnstap io: failed send: %s", sock_strerror(errno));
822		return -1;
823	}
824	return ret;
825}
826
#ifdef HAVE_WRITEV
/** Send the rest of the 4-byte big-endian length prefix together with
 * the body of dtio->cur_msg in one writev() call, if possible.
 * Assumes no body bytes were sent yet (the prefix is still incomplete);
 * the bytes written beyond the prefix are recorded in cur_msg_done.
 * On a hard error the output channel is closed.
 * Returns 1 when the whole frame is written, 0 otherwise (partial
 * write, try again later, or channel closed). */
static int dtio_write_with_writev(struct dt_io_thread* dtio)
{
	uint32_t framelen = htonl(dtio->cur_msg_len);
	struct iovec parts[2];
	ssize_t nwritten;

	/* part 0: the not yet written part of the length prefix */
	parts[0].iov_base = ((uint8_t*)&framelen)+dtio->cur_msg_len_done;
	parts[0].iov_len = sizeof(framelen)-dtio->cur_msg_len_done;
	/* part 1: the message body */
	parts[1].iov_base = dtio->cur_msg;
	parts[1].iov_len = dtio->cur_msg_len;
	log_assert(parts[0].iov_len > 0);

	nwritten = writev(dtio->fd, parts, 2);
	if(nwritten == -1) {
#ifndef USE_WINSOCK
		if(errno == EINTR || errno == EAGAIN)
			return 0;
#else
		if(WSAGetLastError() == WSAEINPROGRESS)
			return 0;
		if(WSAGetLastError() == WSAEWOULDBLOCK) {
			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
				dtio->stop_flush_event:dtio->event),
				UB_EV_WRITE);
			return 0;
		}
#endif
		log_err("dnstap io: failed writev: %s", sock_strerror(errno));
		/* close the channel */
		dtio_del_output_event(dtio);
		dtio_close_output(dtio);
		return 0;
	}

	dtio->cur_msg_len_done += nwritten;
	if(dtio->cur_msg_len_done < 4)
		return 0;
	if(dtio->cur_msg_len_done > 4) {
		/* the surplus beyond the prefix went into the body */
		dtio->cur_msg_done = dtio->cur_msg_len_done-4;
		dtio->cur_msg_len_done = 4;
	}
	return (dtio->cur_msg_done < dtio->cur_msg_len) ? 0 : 1;
}
#endif /* HAVE_WRITEV */
874
875/** write more of the length, preceding the data frame.
876 * return true if message is done, false if incomplete. */
877static int dtio_write_more_of_len(struct dt_io_thread* dtio)
878{
879	uint32_t sendlen;
880	int r;
881	if(dtio->cur_msg_len_done >= 4)
882		return 1;
883#ifdef HAVE_WRITEV
884	if(!dtio->ssl) {
885		/* we try writev for everything.*/
886		return dtio_write_with_writev(dtio);
887	}
888#endif /* HAVE_WRITEV */
889	sendlen = htonl(dtio->cur_msg_len);
890	r = dtio_write_buf(dtio,
891		((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
892		sizeof(sendlen)-dtio->cur_msg_len_done);
893	if(r == -1) {
894		/* close the channel */
895		dtio_del_output_event(dtio);
896		dtio_close_output(dtio);
897		return 0;
898	} else if(r == 0) {
899		/* try again later */
900		return 0;
901	}
902	dtio->cur_msg_len_done += r;
903	if(dtio->cur_msg_len_done < 4)
904		return 0;
905	return 1;
906}
907
908/** write more of the data frame.
909 * return true if message is done, false if incomplete. */
910static int dtio_write_more_of_data(struct dt_io_thread* dtio)
911{
912	int r;
913	if(dtio->cur_msg_done >= dtio->cur_msg_len)
914		return 1;
915	r = dtio_write_buf(dtio,
916		((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
917		dtio->cur_msg_len - dtio->cur_msg_done);
918	if(r == -1) {
919		/* close the channel */
920		dtio_del_output_event(dtio);
921		dtio_close_output(dtio);
922		return 0;
923	} else if(r == 0) {
924		/* try again later */
925		return 0;
926	}
927	dtio->cur_msg_done += r;
928	if(dtio->cur_msg_done < dtio->cur_msg_len)
929		return 0;
930	return 1;
931}
932
933/** write more of the current messsage. false if incomplete, true if
934 * the message is done */
935static int dtio_write_more(struct dt_io_thread* dtio)
936{
937	if(dtio->cur_msg_len_done < 4) {
938		if(!dtio_write_more_of_len(dtio))
939			return 0;
940	}
941	if(dtio->cur_msg_done < dtio->cur_msg_len) {
942		if(!dtio_write_more_of_data(dtio))
943			return 0;
944	}
945	return 1;
946}
947
948/** Receive bytes from dtio->fd, store in buffer. Returns 0: closed,
949 * -1: continue, >0: number of bytes read into buffer */
950static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) {
951	ssize_t r;
952	r = recv(dtio->fd, (void*)buf, len, 0);
953	if(r == -1) {
954		char* to = dtio->socket_path;
955		if(!to) to = dtio->ip_str;
956		if(!to) to = "";
957#ifndef USE_WINSOCK
958		if(errno == EINTR || errno == EAGAIN)
959			return -1; /* try later */
960#else
961		if(WSAGetLastError() == WSAEINPROGRESS) {
962			return -1; /* try later */
963		} else if(WSAGetLastError() == WSAEWOULDBLOCK) {
964			ub_winsock_tcp_wouldblock(
965				(dtio->stop_flush_event?
966				dtio->stop_flush_event:dtio->event),
967				UB_EV_READ);
968			return -1; /* try later */
969		}
970#endif
971		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
972			verbosity < 4)
973			return 0; /* no log retries on low verbosity */
974		log_err("dnstap io: output closed, recv %s: %s", to,
975			strerror(errno));
976		/* and close below */
977		return 0;
978	}
979	if(r == 0) {
980		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
981			verbosity < 4)
982			return 0; /* no log retries on low verbosity */
983		verbose(VERB_DETAIL, "dnstap io: output closed by the other side");
984		/* and close below */
985		return 0;
986	}
987	/* something was received */
988	return r;
989}
990
#ifdef HAVE_SSL
/** Receive bytes over TLS from dtio->fd, store in buffer. Returns 0: closed,
 * -1: continue, >0: number of bytes read into buffer.
 * When OpenSSL needs to write first, a brief write event is enabled and
 * -1 is returned. */
static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len)
{
	int nread, sslerr;
	ERR_clear_error();
	nread = SSL_read(dtio->ssl, buf, len);
	if(nread > 0)
		return nread;

	sslerr = SSL_get_error(dtio->ssl, nread);
	switch(sslerr) {
	case SSL_ERROR_ZERO_RETURN:
		/* the peer closed the TLS session */
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			verbosity < 4)
			return 0; /* no log retries on low verbosity */
		verbose(VERB_DETAIL, "dnstap io: output closed by the "
			"other side");
		return 0;
	case SSL_ERROR_WANT_READ:
		/* continue later */
		return -1;
	case SSL_ERROR_WANT_WRITE:
		(void)dtio_enable_brief_write(dtio);
		return -1;
	case SSL_ERROR_SYSCALL:
#ifdef ECONNRESET
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			errno == ECONNRESET && verbosity < 4)
			return 0; /* silence reset by peer */
#endif
		if(errno != 0)
			log_err("SSL_read syscall: %s",
				strerror(errno));
		verbose(VERB_DETAIL, "dnstap io: output closed by the "
			"other side");
		return 0;
	default:
		break;
	}
	log_crypto_err("could not SSL_read");
	verbose(VERB_DETAIL, "dnstap io: output closed by the "
			"other side");
	return 0;
}
#endif /* HAVE_SSL */
1035
1036/** check if the output fd has been closed,
1037 * it returns false if the stream is closed. */
1038static int dtio_check_close(struct dt_io_thread* dtio)
1039{
1040	/* we don't want to read any packets, but if there are we can
1041	 * discard the input (ignore it).  Ignore of unknown (control)
1042	 * packets is okay for the framestream protocol.  And also, the
1043	 * read call can return that the stream has been closed by the
1044	 * other side. */
1045	uint8_t buf[1024];
1046	int r = -1;
1047
1048
1049	if(dtio->fd == -1) return 0;
1050
1051	while(r != 0) {
1052		/* not interested in buffer content, overwrite */
1053		r = receive_bytes(dtio, (void*)buf, sizeof(buf));
1054		if(r == -1)
1055			return 1;
1056	}
1057	/* the other end has been closed */
1058	/* close the channel */
1059	dtio_del_output_event(dtio);
1060	dtio_close_output(dtio);
1061	return 0;
1062}
1063
1064/** Read accept frame. Returns -1: continue reading, 0: closed,
1065 * 1: valid accept received. */
1066static int dtio_read_accept_frame(struct dt_io_thread* dtio)
1067{
1068	int r;
1069	size_t read_frame_done;
1070	while(dtio->read_frame.frame_len_done < 4) {
1071#ifdef HAVE_SSL
1072		if(dtio->ssl) {
1073			r = ssl_read_bytes(dtio,
1074				(uint8_t*)&dtio->read_frame.frame_len+
1075				dtio->read_frame.frame_len_done,
1076				4-dtio->read_frame.frame_len_done);
1077		} else {
1078#endif
1079			r = receive_bytes(dtio,
1080				(uint8_t*)&dtio->read_frame.frame_len+
1081				dtio->read_frame.frame_len_done,
1082				4-dtio->read_frame.frame_len_done);
1083#ifdef HAVE_SSL
1084		}
1085#endif
1086		if(r == -1)
1087			return -1; /* continue reading */
1088		if(r == 0) {
1089			 /* connection closed */
1090			goto close_connection;
1091		}
1092		dtio->read_frame.frame_len_done += r;
1093		if(dtio->read_frame.frame_len_done < 4)
1094			return -1; /* continue reading */
1095
1096		if(dtio->read_frame.frame_len == 0) {
1097			dtio->read_frame.frame_len_done = 0;
1098			dtio->read_frame.control_frame = 1;
1099			continue;
1100		}
1101		dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len);
1102		if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) {
1103			verbose(VERB_OPS, "dnstap: received frame exceeds max "
1104				"length of %d bytes, closing connection",
1105				DTIO_RECV_FRAME_MAX_LEN);
1106			goto close_connection;
1107		}
1108		dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len);
1109		dtio->read_frame.buf_cap = dtio->read_frame.frame_len;
1110		if(!dtio->read_frame.buf) {
1111			log_err("dnstap io: out of memory (creating read "
1112				"buffer)");
1113			goto close_connection;
1114		}
1115	}
1116	if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) {
1117#ifdef HAVE_SSL
1118		if(dtio->ssl) {
1119			r = ssl_read_bytes(dtio, dtio->read_frame.buf+
1120				dtio->read_frame.buf_count,
1121				dtio->read_frame.buf_cap-
1122				dtio->read_frame.buf_count);
1123		} else {
1124#endif
1125			r = receive_bytes(dtio, dtio->read_frame.buf+
1126				dtio->read_frame.buf_count,
1127				dtio->read_frame.buf_cap-
1128				dtio->read_frame.buf_count);
1129#ifdef HAVE_SSL
1130		}
1131#endif
1132		if(r == -1)
1133			return -1; /* continue reading */
1134		if(r == 0) {
1135			 /* connection closed */
1136			goto close_connection;
1137		}
1138		dtio->read_frame.buf_count += r;
1139		if(dtio->read_frame.buf_count < dtio->read_frame.frame_len)
1140			return -1; /* continue reading */
1141	}
1142
1143	/* Complete frame received, check if this is a valid ACCEPT control
1144	 * frame. */
1145	if(dtio->read_frame.frame_len < 4) {
1146		verbose(VERB_OPS, "dnstap: invalid data received");
1147		goto close_connection;
1148	}
1149	if(sldns_read_uint32(dtio->read_frame.buf) !=
1150		FSTRM_CONTROL_FRAME_ACCEPT) {
1151		verbose(VERB_ALGO, "dnstap: invalid control type received, "
1152			"ignored");
1153		dtio->ready_frame_sent = 0;
1154		dtio->accept_frame_received = 0;
1155		dtio_read_frame_free(&dtio->read_frame);
1156		return -1;
1157	}
1158	read_frame_done = 4; /* control frame type */
1159
1160	/* Iterate over control fields, ignore unknown types.
1161	 * Need to be able to read at least 8 bytes (control field type +
1162	 * length). */
1163	while(read_frame_done+8 < dtio->read_frame.frame_len) {
1164		uint32_t type = sldns_read_uint32(dtio->read_frame.buf +
1165			read_frame_done);
1166		uint32_t len = sldns_read_uint32(dtio->read_frame.buf +
1167			read_frame_done + 4);
1168		if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) {
1169			if(len == strlen(DNSTAP_CONTENT_TYPE) &&
1170				read_frame_done+8+len <=
1171				dtio->read_frame.frame_len &&
1172				memcmp(dtio->read_frame.buf + read_frame_done +
1173					+ 8, DNSTAP_CONTENT_TYPE, len) == 0) {
1174				if(!dtio_control_start_send(dtio)) {
1175					verbose(VERB_OPS, "dnstap io: out of "
1176					 "memory while sending START frame");
1177					goto close_connection;
1178				}
1179				dtio->accept_frame_received = 1;
1180				if(!dtio_add_output_event_write(dtio))
1181					goto close_connection;
1182				return 1;
1183			} else {
1184				/* unknow content type */
1185				verbose(VERB_ALGO, "dnstap: ACCEPT frame "
1186					"contains unknown content type, "
1187					"closing connection");
1188				goto close_connection;
1189			}
1190		}
1191		/* unknown option, try next */
1192		read_frame_done += 8+len;
1193	}
1194
1195
1196close_connection:
1197	dtio_del_output_event(dtio);
1198	dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1199	dtio_close_output(dtio);
1200	return 0;
1201}
1202
1203/** add the output file descriptor event for listening, read only */
1204static int dtio_add_output_event_read(struct dt_io_thread* dtio)
1205{
1206	if(!dtio->event)
1207		return 0;
1208	if(dtio->event_added && !dtio->event_added_is_write)
1209		return 1;
1210	/* we have to (re-)register the event */
1211	if(dtio->event_added)
1212		ub_event_del(dtio->event);
1213	ub_event_del_bits(dtio->event, UB_EV_WRITE);
1214	if(ub_event_add(dtio->event, NULL) != 0) {
1215		log_err("dnstap io: out of memory (adding event)");
1216		dtio->event_added = 0;
1217		dtio->event_added_is_write = 0;
1218		/* close output and start reattempts to open it */
1219		dtio_close_output(dtio);
1220		return 0;
1221	}
1222	dtio->event_added = 1;
1223	dtio->event_added_is_write = 0;
1224	return 1;
1225}
1226
1227/** add the output file descriptor event for listening, read and write */
1228static int dtio_add_output_event_write(struct dt_io_thread* dtio)
1229{
1230	if(!dtio->event)
1231		return 0;
1232	if(dtio->event_added && dtio->event_added_is_write)
1233		return 1;
1234	/* we have to (re-)register the event */
1235	if(dtio->event_added)
1236		ub_event_del(dtio->event);
1237	ub_event_add_bits(dtio->event, UB_EV_WRITE);
1238	if(ub_event_add(dtio->event, NULL) != 0) {
1239		log_err("dnstap io: out of memory (adding event)");
1240		dtio->event_added = 0;
1241		dtio->event_added_is_write = 0;
1242		/* close output and start reattempts to open it */
1243		dtio_close_output(dtio);
1244		return 0;
1245	}
1246	dtio->event_added = 1;
1247	dtio->event_added_is_write = 1;
1248	return 1;
1249}
1250
/** Put the dtio thread to sleep: with nothing queued for output, stop
 * polling for writability and keep only the read registration.  A
 * registration failure is handled (output closed) by the callee. */
static void dtio_sleep(struct dt_io_thread* dtio)
{
	int registered = dtio_add_output_event_read(dtio);
	(void)registered;
}
1258
#ifdef HAVE_SSL
/** Enable the brief read condition: the SSL layer has to read before it
 * can continue, so poll only for reads.  During stop flush the stop flush
 * event is adjusted, otherwise the regular output event.
 * Returns 0 on failure to register the event. */
static int dtio_enable_brief_read(struct dt_io_thread* dtio)
{
	struct ub_event* flush_ev = dtio->stop_flush_event;
	dtio->ssl_brief_read = 1;
	if(flush_ev == NULL)
		return dtio_add_output_event_read(dtio);

	ub_event_del(flush_ev);
	ub_event_del_bits(flush_ev, UB_EV_WRITE);
	if(ub_event_add(flush_ev, NULL) == 0)
		return 1;
	log_err("dnstap io, stop flush, could not ub_event_add");
	return 0;
}
#endif /* HAVE_SSL */
1276
#ifdef HAVE_SSL
/** Disable the brief read condition and poll for writes again, on the
 * stop flush event when one is active, otherwise on the output event.
 * Returns 0 on failure to register the event. */
static int dtio_disable_brief_read(struct dt_io_thread* dtio)
{
	struct ub_event* flush_ev = dtio->stop_flush_event;
	dtio->ssl_brief_read = 0;
	if(flush_ev == NULL)
		return dtio_add_output_event_write(dtio);

	ub_event_del(flush_ev);
	ub_event_add_bits(flush_ev, UB_EV_WRITE);
	if(ub_event_add(flush_ev, NULL) == 0)
		return 1;
	log_err("dnstap io, stop flush, could not ub_event_add");
	return 0;
}
#endif /* HAVE_SSL */
1294
#ifdef HAVE_SSL
/** Enable the brief write condition: an SSL read reported that it has
 * to write first, so poll the output for writability.
 * Returns the result of the event registration. */
static int dtio_enable_brief_write(struct dt_io_thread* dtio)
{
	int ok;
	dtio->ssl_brief_write = 1;
	ok = dtio_add_output_event_write(dtio);
	return ok;
}
#endif /* HAVE_SSL */
1303
#ifdef HAVE_SSL
/** Disable the brief write condition and return to read-only polling.
 * Returns the result of the event registration. */
static int dtio_disable_brief_write(struct dt_io_thread* dtio)
{
	int ok;
	dtio->ssl_brief_write = 0;
	ok = dtio_add_output_event_read(dtio);
	return ok;
}
#endif /* HAVE_SSL */
1312
#ifdef HAVE_SSL
/** Check peer verification after the SSL handshake.
 * Returns 1 when the connection may be used (verified, or verification
 * was not requested), 0 when it must be closed. */
static int dtio_ssl_check_peer(struct dt_io_thread* dtio)
{
	X509* cert;

	if(!(SSL_get_verify_mode(dtio->ssl) & SSL_VERIFY_PEER)) {
		/* unauthenticated, the verify peer flag was not set
		 * in ssl when the ssl object was created from ssl_ctx */
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection",
			dtio->ip_str);
		return 1;
	}

	if(SSL_get_verify_result(dtio->ssl) != X509_V_OK) {
		/* verification failed; log the offered certificate, if any */
		cert = SSL_get_peer_certificate(dtio->ssl);
		if(cert) {
			log_cert(VERB_ALGO, "dnstap io, peer certificate",
				cert);
			X509_free(cert);
		}
		verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
			"failed: failed to authenticate", dtio->ip_str);
		return 0;
	}

	cert = SSL_get_peer_certificate(dtio->ssl);
	if(cert == NULL) {
		verbose(VERB_ALGO, "dnstap io, %s, SSL "
			"connection failed no certificate", dtio->ip_str);
		return 0;
	}
	log_cert(VERB_ALGO, "dnstap io, peer certificate", cert);
#ifdef HAVE_SSL_GET0_PEERNAME
	if(SSL_get0_peername(dtio->ssl)) {
		verbose(VERB_ALGO, "dnstap io, %s, SSL "
			"connection to %s authenticated", dtio->ip_str,
			SSL_get0_peername(dtio->ssl));
	} else {
#endif
		verbose(VERB_ALGO, "dnstap io, %s, SSL "
			"connection authenticated", dtio->ip_str);
#ifdef HAVE_SSL_GET0_PEERNAME
	}
#endif
	X509_free(cert);
	return 1;
}
#endif /* HAVE_SSL */
1365
#ifdef HAVE_SSL
/** perform ssl handshake, returns 1 if okay, 0 to stop.
 * On a fatal handshake error or a failed peer check the output is closed
 * and a slow reconnect is scheduled.  When info is non-NULL (the call
 * comes from the stop flush), the stop flush loop is told to exit on
 * every path that gives up on the connection. */
static int dtio_ssl_handshake(struct dt_io_thread* dtio,
	struct stop_flush_info* info)
{
	int ret, err_kind;

	/* assume the brief read condition is satisfied; it is set again
	 * below if the handshake needs to read more */
	if(dtio->ssl_brief_read && !dtio_disable_brief_read(dtio)) {
		if(info) dtio_stop_flush_exit(info);
		return 0;
	}
	if(dtio->ssl_handshake_done)
		return 1;

	ERR_clear_error();
	ret = SSL_do_handshake(dtio->ssl);
	if(ret != 1) {
		err_kind = SSL_get_error(dtio->ssl, ret);
		if(err_kind == SSL_ERROR_WANT_READ) {
			/* wait until the connection has data to read */
			if(!dtio_enable_brief_read(dtio) && info)
				dtio_stop_flush_exit(info);
			return 0;
		}
		if(err_kind == SSL_ERROR_WANT_WRITE) {
			/* the handshake wants to write; retry on a later
			 * call */
			return 0;
		}
		if(ret != 0) {
			if(err_kind == SSL_ERROR_SYSCALL) {
				/* SYSCALL and errno==0 means closed
				 * uncleanly */
				int quiet = (errno == 0);
#ifdef EPIPE
				if(errno == EPIPE && verbosity < 2)
					quiet = 1; /* 'broken pipe' */
#endif
#ifdef ECONNRESET
				if(errno == ECONNRESET && verbosity < 2)
					quiet = 1; /* reset by peer */
#endif
				if(!quiet)
					log_err("dnstap io, SSL_handshake "
						"syscall: %s", strerror(errno));
			} else {
				unsigned long code = ERR_get_error();
				if(!squelch_err_ssl_handshake(code)) {
					log_crypto_err_code("dnstap io, ssl "
						"handshake failed", code);
					verbose(VERB_OPS, "dnstap io, ssl "
						"handshake failed from %s",
						dtio->ip_str);
				}
			}
		}
		/* ret == 0: the connection was closed */
		goto close_connection;
	}

	/* check peer verification */
	dtio->ssl_handshake_done = 1;
	if(dtio_ssl_check_peer(dtio))
		return 1;

close_connection:
	if(info) dtio_stop_flush_exit(info);
	dtio_del_output_event(dtio);
	dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
	dtio_close_output(dtio);
	return 0;
}
#endif /* HAVE_SSL */
1457
1458/** callback for the dnstap events, to write to the output */
1459void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1460{
1461	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1462	int i;
1463
1464	if(dtio->check_nb_connect) {
1465		int connect_err = dtio_check_nb_connect(dtio);
1466		if(connect_err == -1) {
1467			/* close the channel */
1468			dtio_del_output_event(dtio);
1469			dtio_close_output(dtio);
1470			return;
1471		} else if(connect_err == 0) {
1472			/* try again later */
1473			return;
1474		}
1475		/* nonblocking connect check passed, continue */
1476	}
1477
1478#ifdef HAVE_SSL
1479	if(dtio->ssl &&
1480		(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1481		if(!dtio_ssl_handshake(dtio, NULL))
1482			return;
1483	}
1484#endif
1485
1486	if((bits&UB_EV_READ || dtio->ssl_brief_write)) {
1487		if(dtio->ssl_brief_write)
1488			(void)dtio_disable_brief_write(dtio);
1489		if(dtio->ready_frame_sent && !dtio->accept_frame_received) {
1490			if(dtio_read_accept_frame(dtio) <= 0)
1491				return;
1492		} else if(!dtio_check_close(dtio))
1493			return;
1494	}
1495
1496	/* loop to process a number of messages.  This improves throughput,
1497	 * because selecting on write-event if not needed for busy messages
1498	 * (dnstap log) generation and if they need to all be written back.
1499	 * The write event is usually not blocked up.  But not forever,
1500	 * because the event loop needs to stay responsive for other events.
1501	 * If there are no (more) messages, or if the output buffers get
1502	 * full, it returns out of the loop. */
1503	for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
1504		/* see if there are messages that need writing */
1505		if(!dtio->cur_msg) {
1506			if(!dtio_find_msg(dtio)) {
1507				if(i == 0) {
1508					/* no messages on the first iteration,
1509					 * the queues are all empty */
1510					dtio_sleep(dtio);
1511				}
1512				return; /* nothing to do */
1513			}
1514		}
1515
1516		/* write it */
1517		if(dtio->cur_msg_done < dtio->cur_msg_len) {
1518			if(!dtio_write_more(dtio))
1519				return;
1520		}
1521
1522		/* done with the current message */
1523		dtio_cur_msg_free(dtio);
1524
1525		/* If this is a bidirectional stream the first message will be
1526		 * the READY control frame. We can only continue writing after
1527		 * receiving an ACCEPT control frame. */
1528		if(dtio->is_bidirectional && !dtio->ready_frame_sent) {
1529			dtio->ready_frame_sent = 1;
1530			(void)dtio_add_output_event_read(dtio);
1531			break;
1532		}
1533	}
1534}
1535
1536/** callback for the dnstap commandpipe, to stop the dnstap IO */
1537void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
1538{
1539	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1540	uint8_t cmd;
1541	ssize_t r;
1542	if(dtio->want_to_exit)
1543		return;
1544	r = read(fd, &cmd, sizeof(cmd));
1545	if(r == -1) {
1546#ifndef USE_WINSOCK
1547		if(errno == EINTR || errno == EAGAIN)
1548			return; /* ignore this */
1549#else
1550		if(WSAGetLastError() == WSAEINPROGRESS)
1551			return;
1552		if(WSAGetLastError() == WSAEWOULDBLOCK)
1553			return;
1554#endif
1555		log_err("dnstap io: failed to read: %s", sock_strerror(errno));
1556		/* and then fall through to quit the thread */
1557	} else if(r == 0) {
1558		verbose(VERB_ALGO, "dnstap io: cmd channel closed");
1559	} else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
1560		verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
1561	} else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
1562		verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
1563
1564		if(dtio->is_bidirectional && !dtio->accept_frame_received) {
1565			verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, "
1566				"waiting for ACCEPT control frame");
1567			return;
1568		}
1569
1570		/* reregister event */
1571		if(!dtio_add_output_event_write(dtio))
1572			return;
1573		return;
1574	} else if(r == 1) {
1575		verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
1576	}
1577	dtio->want_to_exit = 1;
1578	if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
1579		!= 0) {
1580		log_err("dnstap io: could not loopexit");
1581	}
1582}
1583
1584#ifndef THREADS_DISABLED
1585/** setup the event base for the dnstap io thread */
1586static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
1587	struct timeval* now)
1588{
1589	memset(now, 0, sizeof(*now));
1590	dtio->event_base = ub_default_event_base(0, secs, now);
1591	if(!dtio->event_base) {
1592		fatal_exit("dnstap io: could not create event_base");
1593	}
1594}
1595#endif /* THREADS_DISABLED */
1596
1597/** setup the cmd event for dnstap io */
1598static void dtio_setup_cmd(struct dt_io_thread* dtio)
1599{
1600	struct ub_event* cmdev;
1601	fd_set_nonblock(dtio->commandpipe[0]);
1602	cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0],
1603		UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
1604	if(!cmdev) {
1605		fatal_exit("dnstap io: out of memory");
1606	}
1607	dtio->command_event = cmdev;
1608	if(ub_event_add(cmdev, NULL) != 0) {
1609		fatal_exit("dnstap io: out of memory (adding event)");
1610	}
1611}
1612
1613/** setup the reconnect event for dnstap io */
1614static void dtio_setup_reconnect(struct dt_io_thread* dtio)
1615{
1616	dtio_reconnect_clear(dtio);
1617	dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
1618		UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
1619	if(!dtio->reconnect_timer) {
1620		fatal_exit("dnstap io: out of memory");
1621	}
1622}
1623
1624/**
1625 * structure to keep track of information during stop flush
1626 */
1627struct stop_flush_info {
1628	/** the event base during stop flush */
1629	struct ub_event_base* base;
1630	/** did we already want to exit this stop-flush event base */
1631	int want_to_exit_flush;
1632	/** has the timer fired */
1633	int timer_done;
1634	/** the dtio */
1635	struct dt_io_thread* dtio;
1636	/** the stop control frame */
1637	void* stop_frame;
1638	/** length of the stop frame */
1639	size_t stop_frame_len;
1640	/** how much we have done of the stop frame */
1641	size_t stop_frame_done;
1642};
1643
1644/** exit the stop flush base */
1645static void dtio_stop_flush_exit(struct stop_flush_info* info)
1646{
1647	if(info->want_to_exit_flush)
1648		return;
1649	info->want_to_exit_flush = 1;
1650	if(ub_event_base_loopexit(info->base) != 0) {
1651		log_err("dnstap io: could not loopexit");
1652	}
1653}
1654
1655/** send the stop control,
1656 * return true if completed the frame. */
1657static int dtio_control_stop_send(struct stop_flush_info* info)
1658{
1659	struct dt_io_thread* dtio = info->dtio;
1660	int r;
1661	if(info->stop_frame_done >= info->stop_frame_len)
1662		return 1;
1663	r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) +
1664		info->stop_frame_done, info->stop_frame_len -
1665		info->stop_frame_done);
1666	if(r == -1) {
1667		verbose(VERB_ALGO, "dnstap io: stop flush: output closed");
1668		dtio_stop_flush_exit(info);
1669		return 0;
1670	}
1671	if(r == 0) {
1672		/* try again later, or timeout */
1673		return 0;
1674	}
1675	info->stop_frame_done += r;
1676	if(info->stop_frame_done < info->stop_frame_len)
1677		return 0; /* not done yet */
1678	return 1;
1679}
1680
1681void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
1682	void* arg)
1683{
1684	struct stop_flush_info* info = (struct stop_flush_info*)arg;
1685	if(info->want_to_exit_flush)
1686		return;
1687	verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush");
1688	info->timer_done = 1;
1689	dtio_stop_flush_exit(info);
1690}
1691
1692void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1693{
1694	struct stop_flush_info* info = (struct stop_flush_info*)arg;
1695	struct dt_io_thread* dtio = info->dtio;
1696	if(info->want_to_exit_flush)
1697		return;
1698	if(dtio->check_nb_connect) {
1699		/* we don't start the stop_flush if connect still
1700		 * in progress, but the check code is here, just in case */
1701		int connect_err = dtio_check_nb_connect(dtio);
1702		if(connect_err == -1) {
1703			/* close the channel, exit the stop flush */
1704			dtio_stop_flush_exit(info);
1705			dtio_del_output_event(dtio);
1706			dtio_close_output(dtio);
1707			return;
1708		} else if(connect_err == 0) {
1709			/* try again later */
1710			return;
1711		}
1712		/* nonblocking connect check passed, continue */
1713	}
1714#ifdef HAVE_SSL
1715	if(dtio->ssl &&
1716		(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1717		if(!dtio_ssl_handshake(dtio, info))
1718			return;
1719	}
1720#endif
1721
1722	if((bits&UB_EV_READ)) {
1723		if(!dtio_check_close(dtio)) {
1724			if(dtio->fd == -1) {
1725				verbose(VERB_ALGO, "dnstap io: "
1726					"stop flush: output closed");
1727				dtio_stop_flush_exit(info);
1728			}
1729			return;
1730		}
1731	}
1732	/* write remainder of last frame */
1733	if(dtio->cur_msg) {
1734		if(dtio->cur_msg_done < dtio->cur_msg_len) {
1735			if(!dtio_write_more(dtio)) {
1736				if(dtio->fd == -1) {
1737					verbose(VERB_ALGO, "dnstap io: "
1738						"stop flush: output closed");
1739					dtio_stop_flush_exit(info);
1740				}
1741				return;
1742			}
1743		}
1744		verbose(VERB_ALGO, "dnstap io: stop flush completed "
1745			"last frame");
1746		dtio_cur_msg_free(dtio);
1747	}
1748	/* write stop frame */
1749	if(info->stop_frame_done < info->stop_frame_len) {
1750		if(!dtio_control_stop_send(info))
1751			return;
1752		verbose(VERB_ALGO, "dnstap io: stop flush completed "
1753			"stop control frame");
1754	}
1755	/* when last frame and stop frame are sent, exit */
1756	dtio_stop_flush_exit(info);
1757}
1758
1759/** flush at end, last packet and stop control */
1760static void dtio_control_stop_flush(struct dt_io_thread* dtio)
1761{
1762	/* briefly attempt to flush the previous packet to the output,
1763	 * this could be a partial packet, or even the start control frame */
1764	time_t secs = 0;
1765	struct timeval now;
1766	struct stop_flush_info info;
1767	struct timeval tv;
1768	struct ub_event* timer, *stopev;
1769
1770	if(dtio->fd == -1 || dtio->check_nb_connect) {
1771		/* no connection or we have just connected, so nothing is
1772		 * sent yet, so nothing to stop or flush */
1773		return;
1774	}
1775	if(dtio->ssl && !dtio->ssl_handshake_done) {
1776		/* no SSL connection has been established yet */
1777		return;
1778	}
1779
1780	memset(&info, 0, sizeof(info));
1781	memset(&now, 0, sizeof(now));
1782	info.dtio = dtio;
1783	info.base = ub_default_event_base(0, &secs, &now);
1784	if(!info.base) {
1785		log_err("dnstap io: malloc failure");
1786		return;
1787	}
1788	timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT,
1789		&dtio_stop_timer_cb, &info);
1790	if(!timer) {
1791		log_err("dnstap io: malloc failure");
1792		ub_event_base_free(info.base);
1793		return;
1794	}
1795	memset(&tv, 0, sizeof(tv));
1796	tv.tv_sec = 2;
1797	if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info,
1798		&tv) != 0) {
1799		log_err("dnstap io: cannot event_timer_add");
1800		ub_event_free(timer);
1801		ub_event_base_free(info.base);
1802		return;
1803	}
1804	stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
1805		UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info);
1806	if(!stopev) {
1807		log_err("dnstap io: malloc failure");
1808		ub_timer_del(timer);
1809		ub_event_free(timer);
1810		ub_event_base_free(info.base);
1811		return;
1812	}
1813	if(ub_event_add(stopev, NULL) != 0) {
1814		log_err("dnstap io: cannot event_add");
1815		ub_event_free(stopev);
1816		ub_timer_del(timer);
1817		ub_event_free(timer);
1818		ub_event_base_free(info.base);
1819		return;
1820	}
1821	info.stop_frame = fstrm_create_control_frame_stop(
1822		&info.stop_frame_len);
1823	if(!info.stop_frame) {
1824		log_err("dnstap io: malloc failure");
1825		ub_event_del(stopev);
1826		ub_event_free(stopev);
1827		ub_timer_del(timer);
1828		ub_event_free(timer);
1829		ub_event_base_free(info.base);
1830		return;
1831	}
1832	dtio->stop_flush_event = stopev;
1833
1834	/* wait briefly, or until finished */
1835	verbose(VERB_ALGO, "dnstap io: stop flush started");
1836	if(ub_event_base_dispatch(info.base) < 0) {
1837		log_err("dnstap io: dispatch flush failed, errno is %s",
1838			strerror(errno));
1839	}
1840	verbose(VERB_ALGO, "dnstap io: stop flush ended");
1841	free(info.stop_frame);
1842	dtio->stop_flush_event = NULL;
1843	ub_event_del(stopev);
1844	ub_event_free(stopev);
1845	ub_timer_del(timer);
1846	ub_event_free(timer);
1847	ub_event_base_free(info.base);
1848}
1849
/** perform desetup and free stuff when the dnstap io thread exits.
 * Attempts a brief flush (last message and STOP frame) before closing
 * the output, then releases the command pipe event, the reconnect timer,
 * the current message and, when threaded, the event base. */
static void dtio_desetup(struct dt_io_thread* dtio)
{
	/* best-effort flush before the output is closed */
	dtio_control_stop_flush(dtio);
	dtio_del_output_event(dtio);
	dtio_close_output(dtio);
	/* stop listening on the command pipe and close its read end */
	ub_event_del(dtio->command_event);
	ub_event_free(dtio->command_event);
#ifndef USE_WINSOCK
	close(dtio->commandpipe[0]);
#else
	_close(dtio->commandpipe[0]);
#endif
	dtio->commandpipe[0] = -1;
	dtio_reconnect_del(dtio);
	ub_event_free(dtio->reconnect_timer);
	dtio_cur_msg_free(dtio);
#ifndef THREADS_DISABLED
	/* only the threaded build created this base (dtio_setup_base);
	 * with threads disabled it belongs to the caller */
	ub_event_base_free(dtio->event_base);
#endif
}
1871
1872/** setup a start control message */
1873static int dtio_control_start_send(struct dt_io_thread* dtio)
1874{
1875	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1876	dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE,
1877		&dtio->cur_msg_len);
1878	if(!dtio->cur_msg) {
1879		return 0;
1880	}
1881	/* setup to send the control message */
1882	/* set that the buffer needs to be sent, but the length
1883	 * of that buffer is already written, that way the buffer can
1884	 * start with 0 length and then the length of the control frame
1885	 * in it */
1886	dtio->cur_msg_done = 0;
1887	dtio->cur_msg_len_done = 4;
1888	return 1;
1889}
1890
1891/** setup a ready control message */
1892static int dtio_control_ready_send(struct dt_io_thread* dtio)
1893{
1894	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1895	dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE,
1896		&dtio->cur_msg_len);
1897	if(!dtio->cur_msg) {
1898		return 0;
1899	}
1900	/* setup to send the control message */
1901	/* set that the buffer needs to be sent, but the length
1902	 * of that buffer is already written, that way the buffer can
1903	 * start with 0 length and then the length of the control frame
1904	 * in it */
1905	dtio->cur_msg_done = 0;
1906	dtio->cur_msg_len_done = 4;
1907	return 1;
1908}
1909
/** open the output file descriptor for af_local.
 * Returns 1 when the (nonblocking) connect was issued, 0 on failure
 * with no fd left open. */
static int dtio_open_output_local(struct dt_io_thread* dtio)
{
#ifdef HAVE_SYS_UN_H
	struct sockaddr_un s;
	memset(&s, 0, sizeof(s));
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
	/* this member exists on BSDs, not Linux */
	s.sun_len = (unsigned)sizeof(s);
#endif
	s.sun_family = AF_LOCAL;
	/* length is 92-108, 104 on FreeBSD.  A longer path would be
	 * silently truncated and we would connect to a different socket,
	 * so refuse it instead. */
	if(strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path))
		>= sizeof(s.sun_path)) {
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			verbosity < 4)
			return 0; /* no log retries on low verbosity */
		log_err("dnstap io: socket path \"%s\" is too long, the "
			"maximum length is %d", dtio->socket_path,
			(int)sizeof(s.sun_path)-1);
		return 0;
	}

	dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
	if(dtio->fd == -1) {
		log_err("dnstap io: failed to create socket: %s",
			sock_strerror(errno));
		return 0;
	}
	fd_set_nonblock(dtio->fd);
	if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s))
		== -1) {
		char* to = dtio->socket_path;
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			verbosity < 4) {
			dtio_close_fd(dtio);
			return 0; /* no log retries on low verbosity */
		}
		log_err("dnstap io: failed to connect to \"%s\": %s",
			to, sock_strerror(errno));
		dtio_close_fd(dtio);
		return 0;
	}
	return 1;
#else
	log_err("cannot create af_local socket");
	return 0;
#endif /* HAVE_SYS_UN_H */
}
1949
1950/** open the output file descriptor for af_inet and af_inet6 */
1951static int dtio_open_output_tcp(struct dt_io_thread* dtio)
1952{
1953	struct sockaddr_storage addr;
1954	socklen_t addrlen;
1955	memset(&addr, 0, sizeof(addr));
1956	addrlen = (socklen_t)sizeof(addr);
1957
1958	if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) {
1959		log_err("could not parse IP '%s'", dtio->ip_str);
1960		return 0;
1961	}
1962	dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
1963	if(dtio->fd == -1) {
1964		log_err("can't create socket: %s", sock_strerror(errno));
1965		return 0;
1966	}
1967	fd_set_nonblock(dtio->fd);
1968	if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
1969		if(errno == EINPROGRESS)
1970			return 1; /* wait until connect done*/
1971		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1972			verbosity < 4) {
1973			dtio_close_fd(dtio);
1974			return 0; /* no log retries on low verbosity */
1975		}
1976#ifndef USE_WINSOCK
1977		if(tcp_connect_errno_needs_log(
1978			(struct sockaddr *)&addr, addrlen)) {
1979			log_err("dnstap io: failed to connect to %s: %s",
1980				dtio->ip_str, strerror(errno));
1981		}
1982#else
1983		if(WSAGetLastError() == WSAEINPROGRESS ||
1984			WSAGetLastError() == WSAEWOULDBLOCK)
1985			return 1; /* wait until connect done*/
1986		if(tcp_connect_errno_needs_log(
1987			(struct sockaddr *)&addr, addrlen)) {
1988			log_err("dnstap io: failed to connect to %s: %s",
1989				dtio->ip_str, wsa_strerror(WSAGetLastError()));
1990		}
1991#endif
1992		dtio_close_fd(dtio);
1993		return 0;
1994	}
1995	return 1;
1996}
1997
1998/** setup the SSL structure for new connection */
1999static int dtio_setup_ssl(struct dt_io_thread* dtio)
2000{
2001	dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
2002	if(!dtio->ssl) return 0;
2003	dtio->ssl_handshake_done = 0;
2004	dtio->ssl_brief_read = 0;
2005
2006	if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name,
2007		dtio->tls_use_sni)) {
2008		return 0;
2009	}
2010	return 1;
2011}
2012
2013/** open the output file descriptor */
2014static void dtio_open_output(struct dt_io_thread* dtio)
2015{
2016	struct ub_event* ev;
2017	if(dtio->upstream_is_unix) {
2018		if(!dtio_open_output_local(dtio)) {
2019			dtio_reconnect_enable(dtio);
2020			return;
2021		}
2022	} else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
2023		if(!dtio_open_output_tcp(dtio)) {
2024			dtio_reconnect_enable(dtio);
2025			return;
2026		}
2027		if(dtio->upstream_is_tls) {
2028			if(!dtio_setup_ssl(dtio)) {
2029				dtio_close_fd(dtio);
2030				dtio_reconnect_enable(dtio);
2031				return;
2032			}
2033		}
2034	}
2035	dtio->check_nb_connect = 1;
2036
2037	/* the EV_READ is to read ACCEPT control messages, and catch channel
2038	 * close. EV_WRITE is to write packets */
2039	ev = ub_event_new(dtio->event_base, dtio->fd,
2040		UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
2041		dtio);
2042	if(!ev) {
2043		log_err("dnstap io: out of memory");
2044		if(dtio->ssl) {
2045#ifdef HAVE_SSL
2046			SSL_free(dtio->ssl);
2047			dtio->ssl = NULL;
2048#endif
2049		}
2050		dtio_close_fd(dtio);
2051		dtio_reconnect_enable(dtio);
2052		return;
2053	}
2054	dtio->event = ev;
2055
2056	/* setup protocol control message to start */
2057	if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) ||
2058		(dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) {
2059		log_err("dnstap io: out of memory");
2060		ub_event_free(dtio->event);
2061		dtio->event = NULL;
2062		if(dtio->ssl) {
2063#ifdef HAVE_SSL
2064			SSL_free(dtio->ssl);
2065			dtio->ssl = NULL;
2066#endif
2067		}
2068		dtio_close_fd(dtio);
2069		dtio_reconnect_enable(dtio);
2070		return;
2071	}
2072}
2073
/**
 * Perform the setup of the writer on an event base that already exists
 * (dtio->event_base).  Runs, in this order: dtio_setup_cmd(),
 * dtio_setup_reconnect(), dtio_open_output() and finally
 * dtio_add_output_event_write().
 * @param dtio: the io thread.
 */
static void dtio_setup_on_base(struct dt_io_thread* dtio)
{
	dtio_setup_cmd(dtio);
	dtio_setup_reconnect(dtio);
	dtio_open_output(dtio);
	/* the outcome is not acted upon here; this is the last step */
	(void)dtio_add_output_event_write(dtio);
}
2083
2084#ifndef THREADS_DISABLED
2085/** the IO thread function for the DNSTAP IO */
2086static void* dnstap_io(void* arg)
2087{
2088	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
2089	time_t secs = 0;
2090	struct timeval now;
2091	log_thread_set(&dtio->threadnum);
2092
2093	/* setup */
2094	verbose(VERB_ALGO, "start dnstap io thread");
2095	dtio_setup_base(dtio, &secs, &now);
2096	dtio_setup_on_base(dtio);
2097
2098	/* run */
2099	if(ub_event_base_dispatch(dtio->event_base) < 0) {
2100		log_err("dnstap io: dispatch failed, errno is %s",
2101			strerror(errno));
2102	}
2103
2104	/* cleanup */
2105	verbose(VERB_ALGO, "stop dnstap io thread");
2106	dtio_desetup(dtio);
2107	return NULL;
2108}
2109#endif /* THREADS_DISABLED */
2110
2111int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
2112	int numworkers)
2113{
2114	/* set up the thread, can fail */
2115#ifndef USE_WINSOCK
2116	if(pipe(dtio->commandpipe) == -1) {
2117		log_err("failed to create pipe: %s", strerror(errno));
2118		return 0;
2119	}
2120#else
2121	if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
2122		log_err("failed to create _pipe: %s",
2123			wsa_strerror(WSAGetLastError()));
2124		return 0;
2125	}
2126#endif
2127
2128	/* start the thread */
2129	dtio->threadnum = numworkers+1;
2130	dtio->started = 1;
2131#ifndef THREADS_DISABLED
2132	ub_thread_create(&dtio->tid, dnstap_io, dtio);
2133	(void)event_base_nothr;
2134#else
2135	dtio->event_base = event_base_nothr;
2136	dtio_setup_on_base(dtio);
2137#endif
2138	return 1;
2139}
2140
2141void dt_io_thread_stop(struct dt_io_thread* dtio)
2142{
2143#ifndef THREADS_DISABLED
2144	uint8_t cmd = DTIO_COMMAND_STOP;
2145#endif
2146	if(!dtio) return;
2147	if(!dtio->started) return;
2148	verbose(VERB_ALGO, "dnstap io: send stop cmd");
2149
2150#ifndef THREADS_DISABLED
2151	while(1) {
2152		ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
2153		if(r == -1) {
2154#ifndef USE_WINSOCK
2155			if(errno == EINTR || errno == EAGAIN)
2156				continue;
2157#else
2158			if(WSAGetLastError() == WSAEINPROGRESS)
2159				continue;
2160			if(WSAGetLastError() == WSAEWOULDBLOCK)
2161				continue;
2162#endif
2163			log_err("dnstap io stop: write: %s",
2164				sock_strerror(errno));
2165			break;
2166		}
2167		break;
2168	}
2169	dtio->started = 0;
2170#endif /* THREADS_DISABLED */
2171
2172#ifndef USE_WINSOCK
2173	close(dtio->commandpipe[1]);
2174#else
2175	_close(dtio->commandpipe[1]);
2176#endif
2177	dtio->commandpipe[1] = -1;
2178#ifndef THREADS_DISABLED
2179	ub_thread_join(dtio->tid);
2180#else
2181	dtio->want_to_exit = 1;
2182	dtio_desetup(dtio);
2183#endif
2184}
2185