1/*
2 * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
3 *
4 * Copyright (c) 2020, NLnet Labs. All rights reserved.
5 *
6 * This software is open source.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 *
12 * Redistributions of source code must retain the above copyright notice,
13 * this list of conditions and the following disclaimer.
14 *
15 * Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
18 *
19 * Neither the name of the NLNET LABS nor the names of its contributors may
20 * be used to endorse or promote products derived from this software without
21 * specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 *
35 */
36
37/**
38 * \file
39 *
40 * An implementation of the Frame Streams data transport protocol for
41 * the Unbound DNSTAP message logging facility.
42 */
43
44#include "config.h"
45#include "dnstap/dtstream.h"
46#include "dnstap/dnstap_fstrm.h"
47#include "util/config_file.h"
48#include "util/ub_event.h"
49#include "util/net_help.h"
50#include "services/outside_network.h"
51#include "sldns/sbuffer.h"
52#ifdef HAVE_SYS_UN_H
53#include <sys/un.h>
54#endif
55#include <fcntl.h>
56#ifdef HAVE_OPENSSL_SSL_H
57#include <openssl/ssl.h>
58#endif
59#ifdef HAVE_OPENSSL_ERR_H
60#include <openssl/err.h>
61#endif
62
/** number of messages to process in one output callback */
#define DTIO_MESSAGES_PER_CALLBACK 100
/** the msec to wait for reconnect (if not immediate, the first attempt) */
#define DTIO_RECONNECT_TIMEOUT_MIN 10
/** the msec to wait for reconnect max after backoff */
#define DTIO_RECONNECT_TIMEOUT_MAX 1000
/** the msec to wait for reconnect slow, to stop busy spinning on reconnect */
#define DTIO_RECONNECT_TIMEOUT_SLOW 1000
/** number of queued messages at which the dtio thread is woken up
 * immediately, instead of waiting for the wakeup timer */
#define DTIO_MSG_FOR_WAKEUP 32

/** maximum length of received frame */
#define DTIO_RECV_FRAME_MAX_LEN 1000

struct stop_flush_info;
/** DTIO command channel commands; a command is written as a single
 * byte into the command pipe (see dtio_wakeup).
 * This is a tagged enum type.  The former form
 * `enum {...} dtio_channel_command;` defined a global variable with
 * external linkage, not a type. */
enum dtio_channel_command {
	/** DTIO command channel stop */
	DTIO_COMMAND_STOP = 0,
	/** DTIO command channel wakeup */
	DTIO_COMMAND_WAKEUP = 1
};

/** open the output channel */
static void dtio_open_output(struct dt_io_thread* dtio);
/** add output event for read and write */
static int dtio_add_output_event_write(struct dt_io_thread* dtio);
/** start reconnection attempts */
static void dtio_reconnect_enable(struct dt_io_thread* dtio);
/** stop from stop_flush event loop */
static void dtio_stop_flush_exit(struct stop_flush_info* info);
/** setup a start control message */
static int dtio_control_start_send(struct dt_io_thread* dtio);
#ifdef HAVE_SSL
/** enable briefly waiting for a read event, for SSL negotiation */
static int dtio_enable_brief_read(struct dt_io_thread* dtio);
/** enable briefly waiting for a write event, for SSL negotiation */
static int dtio_enable_brief_write(struct dt_io_thread* dtio);
#endif
102
103struct dt_msg_queue*
104dt_msg_queue_create(struct comm_base* base)
105{
106	struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
107	if(!mq) return NULL;
108	mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
109		about 1 M should contain 64K messages with some overhead,
110		or a whole bunch smaller ones */
111	mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq);
112	if(!mq->wakeup_timer) {
113		free(mq);
114		return NULL;
115	}
116	lock_basic_init(&mq->lock);
117	lock_protect(&mq->lock, mq, sizeof(*mq));
118	return mq;
119}
120
121/** clear the message list, caller must hold the lock */
122static void
123dt_msg_queue_clear(struct dt_msg_queue* mq)
124{
125	struct dt_msg_entry* e = mq->first, *next=NULL;
126	while(e) {
127		next = e->next;
128		free(e->buf);
129		free(e);
130		e = next;
131	}
132	mq->first = NULL;
133	mq->last = NULL;
134	mq->cursize = 0;
135	mq->msgcount = 0;
136}
137
138void
139dt_msg_queue_delete(struct dt_msg_queue* mq)
140{
141	if(!mq) return;
142	lock_basic_destroy(&mq->lock);
143	dt_msg_queue_clear(mq);
144	comm_timer_delete(mq->wakeup_timer);
145	free(mq);
146}
147
148/** make the dtio wake up by sending a wakeup command */
149static void dtio_wakeup(struct dt_io_thread* dtio)
150{
151	uint8_t cmd = DTIO_COMMAND_WAKEUP;
152	if(!dtio) return;
153	if(!dtio->started) return;
154
155	while(1) {
156		ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
157		if(r == -1) {
158#ifndef USE_WINSOCK
159			if(errno == EINTR || errno == EAGAIN)
160				continue;
161#else
162			if(WSAGetLastError() == WSAEINPROGRESS)
163				continue;
164			if(WSAGetLastError() == WSAEWOULDBLOCK)
165				continue;
166#endif
167			log_err("dnstap io wakeup: write: %s",
168				sock_strerror(errno));
169			break;
170		}
171		break;
172	}
173}
174
175void
176mq_wakeup_cb(void* arg)
177{
178	struct dt_msg_queue* mq = (struct dt_msg_queue*)arg;
179	/* even if the dtio is already active, because perhaps much
180	 * traffic suddenly, we leave the timer running to save on
181	 * managing it, the once a second timer is less work then
182	 * starting and stopping the timer frequently */
183	lock_basic_lock(&mq->dtio->wakeup_timer_lock);
184	mq->dtio->wakeup_timer_enabled = 0;
185	lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
186	dtio_wakeup(mq->dtio);
187}
188
189/** start timer to wakeup dtio because there is content in the queue */
190static void
191dt_msg_queue_start_timer(struct dt_msg_queue* mq, int wakeupnow)
192{
193	struct timeval tv = {0};
194	/* Start a timer to process messages to be logged.
195	 * If we woke up the dtio thread for every message, the wakeup
196	 * messages take up too much processing power.  If the queue
197	 * fills up the wakeup happens immediately.  The timer wakes it up
198	 * if there are infrequent messages to log. */
199
200	/* we cannot start a timer in dtio thread, because it is a different
201	 * thread and its event base is in use by the other thread, it would
202	 * give race conditions if we tried to modify its event base,
203	 * and locks would wait until it woke up, and this is what we do. */
204
205	/* do not start the timer if a timer already exists, perhaps
206	 * in another worker.  So this variable is protected by a lock in
207	 * dtio. */
208
209	/* If we need to wakeupnow, 0 the timer to force the callback. */
210	lock_basic_lock(&mq->dtio->wakeup_timer_lock);
211	if(mq->dtio->wakeup_timer_enabled) {
212		if(wakeupnow) {
213			comm_timer_set(mq->wakeup_timer, &tv);
214		}
215		lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
216		return;
217	}
218	mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */
219
220	/* start the timer, in mq, in the event base of our worker */
221	if(!wakeupnow) {
222		tv.tv_sec = 1;
223		tv.tv_usec = 0;
224	}
225	comm_timer_set(mq->wakeup_timer, &tv);
226	lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
227}
228
229void
230dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
231{
232	int wakeupnow = 0, wakeupstarttimer = 0;
233	struct dt_msg_entry* entry;
234
235	/* check conditions */
236	if(!buf) return;
237	if(len == 0) {
238		/* it is not possible to log entries with zero length,
239		 * because the framestream protocol does not carry it.
240		 * However the protobuf serialization does not create zero
241		 * length datagrams for dnstap, so this should not happen. */
242		free(buf);
243		return;
244	}
245	if(!mq) {
246		free(buf);
247		return;
248	}
249
250	/* allocate memory for queue entry */
251	entry = malloc(sizeof(*entry));
252	if(!entry) {
253		log_err("out of memory logging dnstap");
254		free(buf);
255		return;
256	}
257	entry->next = NULL;
258	entry->buf = buf;
259	entry->len = len;
260
261	/* acquire lock */
262	lock_basic_lock(&mq->lock);
263	/* if list was empty, start timer for (eventual) wakeup */
264	if(mq->first == NULL)
265		wakeupstarttimer = 1;
266	/* if list contains more than wakeupnum elements, wakeup now,
267	 * or if list is (going to be) almost full */
268	if(mq->msgcount == DTIO_MSG_FOR_WAKEUP ||
269		(mq->cursize < mq->maxsize * 9 / 10 &&
270		mq->cursize+len >= mq->maxsize * 9 / 10))
271		wakeupnow = 1;
272	/* see if it is going to fit */
273	if(mq->cursize + len > mq->maxsize) {
274		/* buffer full, or congested. */
275		/* drop */
276		lock_basic_unlock(&mq->lock);
277		free(buf);
278		free(entry);
279		return;
280	}
281	mq->cursize += len;
282	mq->msgcount ++;
283	/* append to list */
284	if(mq->last) {
285		mq->last->next = entry;
286	} else {
287		mq->first = entry;
288	}
289	mq->last = entry;
290	/* release lock */
291	lock_basic_unlock(&mq->lock);
292
293	if(wakeupnow || wakeupstarttimer) {
294		dt_msg_queue_start_timer(mq, wakeupnow);
295	}
296}
297
298struct dt_io_thread* dt_io_thread_create(void)
299{
300	struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
301	lock_basic_init(&dtio->wakeup_timer_lock);
302	lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled,
303		sizeof(dtio->wakeup_timer_enabled));
304	return dtio;
305}
306
307void dt_io_thread_delete(struct dt_io_thread* dtio)
308{
309	struct dt_io_list_item* item, *nextitem;
310	if(!dtio) return;
311	lock_basic_destroy(&dtio->wakeup_timer_lock);
312	item=dtio->io_list;
313	while(item) {
314		nextitem = item->next;
315		free(item);
316		item = nextitem;
317	}
318	free(dtio->socket_path);
319	free(dtio->ip_str);
320	free(dtio->tls_server_name);
321	free(dtio->client_key_file);
322	free(dtio->client_cert_file);
323	if(dtio->ssl_ctx) {
324#ifdef HAVE_SSL
325		SSL_CTX_free(dtio->ssl_ctx);
326#endif
327	}
328	free(dtio);
329}
330
331int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
332{
333	if(!cfg->dnstap) {
334		log_warn("cannot setup dnstap because dnstap-enable is no");
335		return 0;
336	}
337
338	/* what type of connectivity do we have */
339	if(cfg->dnstap_ip && cfg->dnstap_ip[0]) {
340		if(cfg->dnstap_tls)
341			dtio->upstream_is_tls = 1;
342		else	dtio->upstream_is_tcp = 1;
343	} else {
344		dtio->upstream_is_unix = 1;
345	}
346	dtio->is_bidirectional = cfg->dnstap_bidirectional;
347
348	if(dtio->upstream_is_unix) {
349		char* nm;
350		if(!cfg->dnstap_socket_path ||
351			cfg->dnstap_socket_path[0]==0) {
352			log_err("dnstap setup: no dnstap-socket-path for "
353				"socket connect");
354			return 0;
355		}
356		nm = cfg->dnstap_socket_path;
357		if(cfg->chrootdir && cfg->chrootdir[0] && strncmp(nm,
358			cfg->chrootdir, strlen(cfg->chrootdir)) == 0)
359			nm += strlen(cfg->chrootdir);
360		free(dtio->socket_path);
361		dtio->socket_path = strdup(nm);
362		if(!dtio->socket_path) {
363			log_err("dnstap setup: malloc failure");
364			return 0;
365		}
366	}
367
368	if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
369		if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) {
370			log_err("dnstap setup: no dnstap-ip for TCP connect");
371			return 0;
372		}
373		free(dtio->ip_str);
374		dtio->ip_str = strdup(cfg->dnstap_ip);
375		if(!dtio->ip_str) {
376			log_err("dnstap setup: malloc failure");
377			return 0;
378		}
379	}
380
381	if(dtio->upstream_is_tls) {
382#ifdef HAVE_SSL
383		if(cfg->dnstap_tls_server_name &&
384			cfg->dnstap_tls_server_name[0]) {
385			free(dtio->tls_server_name);
386			dtio->tls_server_name = strdup(
387				cfg->dnstap_tls_server_name);
388			if(!dtio->tls_server_name) {
389				log_err("dnstap setup: malloc failure");
390				return 0;
391			}
392			if(!check_auth_name_for_ssl(dtio->tls_server_name))
393				return 0;
394		}
395		if(cfg->dnstap_tls_client_key_file &&
396			cfg->dnstap_tls_client_key_file[0]) {
397			dtio->use_client_certs = 1;
398			free(dtio->client_key_file);
399			dtio->client_key_file = strdup(
400				cfg->dnstap_tls_client_key_file);
401			if(!dtio->client_key_file) {
402				log_err("dnstap setup: malloc failure");
403				return 0;
404			}
405			if(!cfg->dnstap_tls_client_cert_file ||
406				cfg->dnstap_tls_client_cert_file[0]==0) {
407				log_err("dnstap setup: client key "
408					"authentication enabled with "
409					"dnstap-tls-client-key-file, but "
410					"no dnstap-tls-client-cert-file "
411					"is given");
412				return 0;
413			}
414			free(dtio->client_cert_file);
415			dtio->client_cert_file = strdup(
416				cfg->dnstap_tls_client_cert_file);
417			if(!dtio->client_cert_file) {
418				log_err("dnstap setup: malloc failure");
419				return 0;
420			}
421		} else {
422			dtio->use_client_certs = 0;
423			dtio->client_key_file = NULL;
424			dtio->client_cert_file = NULL;
425		}
426
427		if(cfg->dnstap_tls_cert_bundle) {
428			dtio->ssl_ctx = connect_sslctx_create(
429				dtio->client_key_file,
430				dtio->client_cert_file,
431				cfg->dnstap_tls_cert_bundle, 0);
432		} else {
433			dtio->ssl_ctx = connect_sslctx_create(
434				dtio->client_key_file,
435				dtio->client_cert_file,
436				cfg->tls_cert_bundle, cfg->tls_win_cert);
437		}
438		if(!dtio->ssl_ctx) {
439			log_err("could not setup SSL CTX");
440			return 0;
441		}
442		dtio->tls_use_sni = cfg->tls_use_sni;
443#endif /* HAVE_SSL */
444	}
445	return 1;
446}
447
448int dt_io_thread_register_queue(struct dt_io_thread* dtio,
449        struct dt_msg_queue* mq)
450{
451	struct dt_io_list_item* item = malloc(sizeof(*item));
452	if(!item) return 0;
453	lock_basic_lock(&mq->lock);
454	mq->dtio = dtio;
455	lock_basic_unlock(&mq->lock);
456	item->queue = mq;
457	item->next = dtio->io_list;
458	dtio->io_list = item;
459	dtio->io_list_iter = NULL;
460	return 1;
461}
462
463void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
464        struct dt_msg_queue* mq)
465{
466	struct dt_io_list_item* item, *prev=NULL;
467	if(!dtio) return;
468	item = dtio->io_list;
469	while(item) {
470		if(item->queue == mq) {
471			/* found it */
472			if(prev) prev->next = item->next;
473			else dtio->io_list = item->next;
474			/* the queue itself only registered, not deleted */
475			lock_basic_lock(&item->queue->lock);
476			item->queue->dtio = NULL;
477			lock_basic_unlock(&item->queue->lock);
478			free(item);
479			dtio->io_list_iter = NULL;
480			return;
481		}
482		prev = item;
483		item = item->next;
484	}
485}
486
487/** pick a message from the queue, the routine locks and unlocks,
488 * returns true if there is a message */
489static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
490	size_t* len)
491{
492	lock_basic_lock(&mq->lock);
493	if(mq->first) {
494		struct dt_msg_entry* entry = mq->first;
495		mq->first = entry->next;
496		if(!entry->next) mq->last = NULL;
497		mq->cursize -= entry->len;
498		mq->msgcount --;
499		lock_basic_unlock(&mq->lock);
500
501		*buf = entry->buf;
502		*len = entry->len;
503		free(entry);
504		return 1;
505	}
506	lock_basic_unlock(&mq->lock);
507	return 0;
508}
509
510/** find message in queue, false if no message, true if message to send */
511static int dtio_find_in_queue(struct dt_io_thread* dtio,
512	struct dt_msg_queue* mq)
513{
514	void* buf=NULL;
515	size_t len=0;
516	if(dt_msg_queue_pop(mq, &buf, &len)) {
517		dtio->cur_msg = buf;
518		dtio->cur_msg_len = len;
519		dtio->cur_msg_done = 0;
520		dtio->cur_msg_len_done = 0;
521		return 1;
522	}
523	return 0;
524}
525
526/** find a new message to write, search message queues, false if none */
527static int dtio_find_msg(struct dt_io_thread* dtio)
528{
529	struct dt_io_list_item *spot, *item;
530
531	spot = dtio->io_list_iter;
532	/* use the next queue for the next message lookup,
533	 * if we hit the end(NULL) the NULL restarts the iter at start. */
534	if(spot)
535		dtio->io_list_iter = spot->next;
536	else if(dtio->io_list)
537		dtio->io_list_iter = dtio->io_list->next;
538
539	/* scan from spot to end-of-io_list */
540	item = spot;
541	while(item) {
542		if(dtio_find_in_queue(dtio, item->queue))
543			return 1;
544		item = item->next;
545	}
546	/* scan starting at the start-of-list (to wrap around the end) */
547	item = dtio->io_list;
548	while(item) {
549		if(dtio_find_in_queue(dtio, item->queue))
550			return 1;
551		item = item->next;
552	}
553	return 0;
554}
555
556/** callback for the dnstap reconnect, to start reconnecting to output */
557void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
558	short ATTR_UNUSED(bits), void* arg)
559{
560	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
561	dtio->reconnect_is_added = 0;
562	verbose(VERB_ALGO, "dnstap io: reconnect timer");
563
564	dtio_open_output(dtio);
565	if(dtio->event) {
566		if(!dtio_add_output_event_write(dtio))
567			return;
568		/* nothing wrong so far, wait on the output event */
569		return;
570	}
571	/* exponential backoff and retry on timer */
572	dtio_reconnect_enable(dtio);
573}
574
575/** attempt to reconnect to the output, after a timeout */
576static void dtio_reconnect_enable(struct dt_io_thread* dtio)
577{
578	struct timeval tv;
579	int msec;
580	if(dtio->want_to_exit) return;
581	if(dtio->reconnect_is_added)
582		return; /* already done */
583
584	/* exponential backoff, store the value for next timeout */
585	msec = dtio->reconnect_timeout;
586	if(msec == 0) {
587		dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
588	} else {
589		dtio->reconnect_timeout = msec*2;
590		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX)
591			dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
592	}
593	verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
594		msec);
595
596	/* setup wait timer */
597	memset(&tv, 0, sizeof(tv));
598	tv.tv_sec = msec/1000;
599	tv.tv_usec = (msec%1000)*1000;
600	if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
601		&dtio_reconnect_timeout_cb, dtio, &tv) != 0) {
602		log_err("dnstap io: could not reconnect ev timer add");
603		return;
604	}
605	dtio->reconnect_is_added = 1;
606}
607
608/** remove dtio reconnect timer */
609static void dtio_reconnect_del(struct dt_io_thread* dtio)
610{
611	if(!dtio->reconnect_is_added)
612		return;
613	ub_timer_del(dtio->reconnect_timer);
614	dtio->reconnect_is_added = 0;
615}
616
617/** clear the reconnect exponential backoff timer.
618 * We have successfully connected so we can try again with short timeouts. */
619static void dtio_reconnect_clear(struct dt_io_thread* dtio)
620{
621	dtio->reconnect_timeout = 0;
622	dtio_reconnect_del(dtio);
623}
624
625/** reconnect slowly, because we already know we have to wait for a bit */
626static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec)
627{
628	dtio_reconnect_del(dtio);
629	dtio->reconnect_timeout = msec;
630	dtio_reconnect_enable(dtio);
631}
632
633/** delete the current message in the dtio, and reset counters */
634static void dtio_cur_msg_free(struct dt_io_thread* dtio)
635{
636	free(dtio->cur_msg);
637	dtio->cur_msg = NULL;
638	dtio->cur_msg_len = 0;
639	dtio->cur_msg_done = 0;
640	dtio->cur_msg_len_done = 0;
641}
642
643/** delete the buffer and counters used to read frame */
644static void dtio_read_frame_free(struct dt_frame_read_buf* rb)
645{
646	if(rb->buf) {
647		free(rb->buf);
648		rb->buf = NULL;
649	}
650	rb->buf_count = 0;
651	rb->buf_cap = 0;
652	rb->frame_len = 0;
653	rb->frame_len_done = 0;
654	rb->control_frame = 0;
655}
656
657/** del the output file descriptor event for listening */
658static void dtio_del_output_event(struct dt_io_thread* dtio)
659{
660	if(!dtio->event_added)
661		return;
662	ub_event_del(dtio->event);
663	dtio->event_added = 0;
664	dtio->event_added_is_write = 0;
665}
666
667/** close dtio socket and set it to -1 */
668static void dtio_close_fd(struct dt_io_thread* dtio)
669{
670	sock_close(dtio->fd);
671	dtio->fd = -1;
672}
673
674/** close and stop the output file descriptor event */
675static void dtio_close_output(struct dt_io_thread* dtio)
676{
677	if(!dtio->event)
678		return;
679	ub_event_free(dtio->event);
680	dtio->event = NULL;
681	if(dtio->ssl) {
682#ifdef HAVE_SSL
683		SSL_shutdown(dtio->ssl);
684		SSL_free(dtio->ssl);
685		dtio->ssl = NULL;
686#endif
687	}
688	dtio_close_fd(dtio);
689
690	/* if there is a (partial) message, discard it
691	 * we cannot send (the remainder of) it, and a new
692	 * connection needs to start with a control frame. */
693	if(dtio->cur_msg) {
694		dtio_cur_msg_free(dtio);
695	}
696
697	dtio->ready_frame_sent = 0;
698	dtio->accept_frame_received = 0;
699	dtio_read_frame_free(&dtio->read_frame);
700
701	dtio_reconnect_enable(dtio);
702}
703
704/** check for pending nonblocking connect errors,
705 * returns 1 if it is okay. -1 on error (close it), 0 to try later */
706static int dtio_check_nb_connect(struct dt_io_thread* dtio)
707{
708	int error = 0;
709	socklen_t len = (socklen_t)sizeof(error);
710	if(!dtio->check_nb_connect)
711		return 1; /* everything okay */
712	if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error,
713		&len) < 0) {
714#ifndef USE_WINSOCK
715		error = errno; /* on solaris errno is error */
716#else
717		error = WSAGetLastError();
718#endif
719	}
720#ifndef USE_WINSOCK
721#if defined(EINPROGRESS) && defined(EWOULDBLOCK)
722	if(error == EINPROGRESS || error == EWOULDBLOCK)
723		return 0; /* try again later */
724#endif
725#else
726	if(error == WSAEINPROGRESS) {
727		return 0; /* try again later */
728	} else if(error == WSAEWOULDBLOCK) {
729		ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
730			dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
731		return 0; /* try again later */
732	}
733#endif
734	if(error != 0) {
735		char* to = dtio->socket_path;
736		if(!to) to = dtio->ip_str;
737		if(!to) to = "";
738		log_err("dnstap io: failed to connect to \"%s\": %s",
739			to, sock_strerror(error));
740		return -1; /* error, close it */
741	}
742
743	if(dtio->ip_str)
744		verbose(VERB_DETAIL, "dnstap io: connected to %s",
745			dtio->ip_str);
746	else if(dtio->socket_path)
747		verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"",
748			dtio->socket_path);
749	dtio_reconnect_clear(dtio);
750	dtio->check_nb_connect = 0;
751	return 1; /* everything okay */
752}
753
#ifdef HAVE_SSL
/**
 * Write buf over the TLS session.
 * @return number of bytes written, 0 if nothing happened (try again
 *	later; on SSL_ERROR_WANT_READ a brief read event is enabled),
 *	or -1 if the channel is to be closed.
 */
static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf,
	size_t len)
{
	int written, want;

	ERR_clear_error();
	written = SSL_write(dtio->ssl, buf, len);
	if(written > 0)
		return written;

	want = SSL_get_error(dtio->ssl, written);
	switch(want) {
	case SSL_ERROR_ZERO_RETURN:
		return -1; /* closed */
	case SSL_ERROR_WANT_READ:
		(void)dtio_enable_brief_read(dtio);
		return 0;
	case SSL_ERROR_WANT_WRITE:
		return 0; /* write again later */
	case SSL_ERROR_SYSCALL:
#ifdef EPIPE
		if(errno == EPIPE && verbosity < 2)
			return -1; /* silence 'broken pipe' */
#endif
#ifdef ECONNRESET
		if(errno == ECONNRESET && verbosity < 2)
			return -1; /* silence reset by peer */
#endif
		if(errno != 0) {
			log_err("dnstap io, SSL_write syscall: %s",
				strerror(errno));
		}
		return -1;
	default:
		break;
	}
	log_crypto_err_io("dnstap io, could not SSL_write", want);
	return -1;
}
#endif /* HAVE_SSL */
797
798/** write buffer to output.
799 * returns number of bytes written, 0 if nothing happened,
800 * try again later, or -1 if the channel is to be closed. */
801static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
802	size_t len)
803{
804	ssize_t ret;
805	if(dtio->fd == -1)
806		return -1;
807#ifdef HAVE_SSL
808	if(dtio->ssl)
809		return dtio_write_ssl(dtio, buf, len);
810#endif
811	ret = send(dtio->fd, (void*)buf, len, 0);
812	if(ret == -1) {
813#ifndef USE_WINSOCK
814		if(errno == EINTR || errno == EAGAIN)
815			return 0;
816#else
817		if(WSAGetLastError() == WSAEINPROGRESS)
818			return 0;
819		if(WSAGetLastError() == WSAEWOULDBLOCK) {
820			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
821				dtio->stop_flush_event:dtio->event),
822				UB_EV_WRITE);
823			return 0;
824		}
825#endif
826		log_err("dnstap io: failed send: %s", sock_strerror(errno));
827		return -1;
828	}
829	return ret;
830}
831
#ifdef HAVE_WRITEV
/**
 * Write the (rest of the) 4-byte length prefix and the message data in
 * a single writev call, if possible.  Only used while the length is
 * incomplete, so no data bytes have been sent yet.  On a fatal error
 * the channel is closed.
 * @return true if the message is done, false if incomplete.
 */
static int dtio_write_with_writev(struct dt_io_thread* dtio)
{
	uint32_t sendlen = htonl(dtio->cur_msg_len);
	struct iovec iov[2];
	ssize_t r;
	int err;
	iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done;
	iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done;
	iov[1].iov_base = dtio->cur_msg;
	iov[1].iov_len = dtio->cur_msg_len;
	log_assert(iov[0].iov_len > 0);
	r = writev(dtio->fd, iov, 2);
	if(r == -1) {
		/* take the error code from where this platform stores it */
#ifndef USE_WINSOCK
		err = errno;
		if(err == EINTR || err == EAGAIN)
			return 0;
#else
		err = WSAGetLastError();
		if(err == WSAEINPROGRESS)
			return 0;
		if(err == WSAEWOULDBLOCK) {
			ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
				dtio->stop_flush_event:dtio->event),
				UB_EV_WRITE);
			return 0;
		}
#endif
		log_err("dnstap io: failed writev: %s", sock_strerror(err));
		/* close the channel */
		dtio_del_output_event(dtio);
		dtio_close_output(dtio);
		return 0;
	}
	/* written r bytes, counted from the length prefix onwards */
	dtio->cur_msg_len_done += r;
	if(dtio->cur_msg_len_done < 4)
		return 0;
	if(dtio->cur_msg_len_done > 4) {
		/* the bytes beyond the prefix went to the data part */
		dtio->cur_msg_done = dtio->cur_msg_len_done-4;
		dtio->cur_msg_len_done = 4;
	}
	if(dtio->cur_msg_done < dtio->cur_msg_len)
		return 0;
	return 1;
}
#endif /* HAVE_WRITEV */
879
880/** write more of the length, preceding the data frame.
881 * return true if message is done, false if incomplete. */
882static int dtio_write_more_of_len(struct dt_io_thread* dtio)
883{
884	uint32_t sendlen;
885	int r;
886	if(dtio->cur_msg_len_done >= 4)
887		return 1;
888#ifdef HAVE_WRITEV
889	if(!dtio->ssl) {
890		/* we try writev for everything.*/
891		return dtio_write_with_writev(dtio);
892	}
893#endif /* HAVE_WRITEV */
894	sendlen = htonl(dtio->cur_msg_len);
895	r = dtio_write_buf(dtio,
896		((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
897		sizeof(sendlen)-dtio->cur_msg_len_done);
898	if(r == -1) {
899		/* close the channel */
900		dtio_del_output_event(dtio);
901		dtio_close_output(dtio);
902		return 0;
903	} else if(r == 0) {
904		/* try again later */
905		return 0;
906	}
907	dtio->cur_msg_len_done += r;
908	if(dtio->cur_msg_len_done < 4)
909		return 0;
910	return 1;
911}
912
913/** write more of the data frame.
914 * return true if message is done, false if incomplete. */
915static int dtio_write_more_of_data(struct dt_io_thread* dtio)
916{
917	int r;
918	if(dtio->cur_msg_done >= dtio->cur_msg_len)
919		return 1;
920	r = dtio_write_buf(dtio,
921		((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
922		dtio->cur_msg_len - dtio->cur_msg_done);
923	if(r == -1) {
924		/* close the channel */
925		dtio_del_output_event(dtio);
926		dtio_close_output(dtio);
927		return 0;
928	} else if(r == 0) {
929		/* try again later */
930		return 0;
931	}
932	dtio->cur_msg_done += r;
933	if(dtio->cur_msg_done < dtio->cur_msg_len)
934		return 0;
935	return 1;
936}
937
938/** write more of the current message. false if incomplete, true if
939 * the message is done */
940static int dtio_write_more(struct dt_io_thread* dtio)
941{
942	if(dtio->cur_msg_len_done < 4) {
943		if(!dtio_write_more_of_len(dtio))
944			return 0;
945	}
946	if(dtio->cur_msg_done < dtio->cur_msg_len) {
947		if(!dtio_write_more_of_data(dtio))
948			return 0;
949	}
950	return 1;
951}
952
953/** Receive bytes from dtio->fd, store in buffer. Returns 0: closed,
954 * -1: continue, >0: number of bytes read into buffer */
955static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) {
956	ssize_t r;
957	r = recv(dtio->fd, (void*)buf, len, MSG_DONTWAIT);
958	if(r == -1) {
959		char* to = dtio->socket_path;
960		if(!to) to = dtio->ip_str;
961		if(!to) to = "";
962#ifndef USE_WINSOCK
963		if(errno == EINTR || errno == EAGAIN)
964			return -1; /* try later */
965#else
966		if(WSAGetLastError() == WSAEINPROGRESS) {
967			return -1; /* try later */
968		} else if(WSAGetLastError() == WSAEWOULDBLOCK) {
969			ub_winsock_tcp_wouldblock(
970				(dtio->stop_flush_event?
971				dtio->stop_flush_event:dtio->event),
972				UB_EV_READ);
973			return -1; /* try later */
974		}
975#endif
976		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
977			verbosity < 4)
978			return 0; /* no log retries on low verbosity */
979		log_err("dnstap io: output closed, recv %s: %s", to,
980			strerror(errno));
981		/* and close below */
982		return 0;
983	}
984	if(r == 0) {
985		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
986			verbosity < 4)
987			return 0; /* no log retries on low verbosity */
988		verbose(VERB_DETAIL, "dnstap io: output closed by the other side");
989		/* and close below */
990		return 0;
991	}
992	/* something was received */
993	return r;
994}
995
#ifdef HAVE_SSL
/**
 * Receive bytes over the TLS session, store in buffer.
 * @return 0: closed or error (close the channel), -1: continue later
 *	(on SSL_ERROR_WANT_WRITE a brief write event is enabled),
 *	>0: number of bytes read into buffer.
 */
static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len)
{
	int got, want;

	ERR_clear_error();
	got = SSL_read(dtio->ssl, buf, len);
	if(got > 0)
		return got;

	want = SSL_get_error(dtio->ssl, got);
	switch(want) {
	case SSL_ERROR_WANT_READ:
		return -1; /* continue later */
	case SSL_ERROR_WANT_WRITE:
		(void)dtio_enable_brief_write(dtio);
		return -1;
	case SSL_ERROR_ZERO_RETURN:
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			verbosity < 4)
			return 0; /* no log retries on low verbosity */
		break;
	case SSL_ERROR_SYSCALL:
#ifdef ECONNRESET
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			errno == ECONNRESET && verbosity < 4)
			return 0; /* silence reset by peer */
#endif
		if(errno != 0)
			log_err("SSL_read syscall: %s",
				strerror(errno));
		break;
	default:
		log_crypto_err_io("could not SSL_read", want);
		break;
	}
	verbose(VERB_DETAIL, "dnstap io: output closed by the "
		"other side");
	return 0;
}
#endif /* HAVE_SSL */
1040
1041/** check if the output fd has been closed,
1042 * it returns false if the stream is closed. */
1043static int dtio_check_close(struct dt_io_thread* dtio)
1044{
1045	/* we don't want to read any packets, but if there are we can
1046	 * discard the input (ignore it).  Ignore of unknown (control)
1047	 * packets is okay for the framestream protocol.  And also, the
1048	 * read call can return that the stream has been closed by the
1049	 * other side. */
1050	uint8_t buf[1024];
1051	int r = -1;
1052
1053
1054	if(dtio->fd == -1) return 0;
1055
1056	while(r != 0) {
1057		/* not interested in buffer content, overwrite */
1058		r = receive_bytes(dtio, (void*)buf, sizeof(buf));
1059		if(r == -1)
1060			return 1;
1061	}
1062	/* the other end has been closed */
1063	/* close the channel */
1064	dtio_del_output_event(dtio);
1065	dtio_close_output(dtio);
1066	return 0;
1067}
1068
1069/** Read accept frame. Returns -1: continue reading, 0: closed,
1070 * 1: valid accept received. */
1071static int dtio_read_accept_frame(struct dt_io_thread* dtio)
1072{
1073	int r;
1074	size_t read_frame_done;
1075	while(dtio->read_frame.frame_len_done < 4) {
1076#ifdef HAVE_SSL
1077		if(dtio->ssl) {
1078			r = ssl_read_bytes(dtio,
1079				(uint8_t*)&dtio->read_frame.frame_len+
1080				dtio->read_frame.frame_len_done,
1081				4-dtio->read_frame.frame_len_done);
1082		} else {
1083#endif
1084			r = receive_bytes(dtio,
1085				(uint8_t*)&dtio->read_frame.frame_len+
1086				dtio->read_frame.frame_len_done,
1087				4-dtio->read_frame.frame_len_done);
1088#ifdef HAVE_SSL
1089		}
1090#endif
1091		if(r == -1)
1092			return -1; /* continue reading */
1093		if(r == 0) {
1094			 /* connection closed */
1095			goto close_connection;
1096		}
1097		dtio->read_frame.frame_len_done += r;
1098		if(dtio->read_frame.frame_len_done < 4)
1099			return -1; /* continue reading */
1100
1101		if(dtio->read_frame.frame_len == 0) {
1102			dtio->read_frame.frame_len_done = 0;
1103			dtio->read_frame.control_frame = 1;
1104			continue;
1105		}
1106		dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len);
1107		if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) {
1108			verbose(VERB_OPS, "dnstap: received frame exceeds max "
1109				"length of %d bytes, closing connection",
1110				DTIO_RECV_FRAME_MAX_LEN);
1111			goto close_connection;
1112		}
1113		dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len);
1114		dtio->read_frame.buf_cap = dtio->read_frame.frame_len;
1115		if(!dtio->read_frame.buf) {
1116			log_err("dnstap io: out of memory (creating read "
1117				"buffer)");
1118			goto close_connection;
1119		}
1120	}
1121	if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) {
1122#ifdef HAVE_SSL
1123		if(dtio->ssl) {
1124			r = ssl_read_bytes(dtio, dtio->read_frame.buf+
1125				dtio->read_frame.buf_count,
1126				dtio->read_frame.buf_cap-
1127				dtio->read_frame.buf_count);
1128		} else {
1129#endif
1130			r = receive_bytes(dtio, dtio->read_frame.buf+
1131				dtio->read_frame.buf_count,
1132				dtio->read_frame.buf_cap-
1133				dtio->read_frame.buf_count);
1134#ifdef HAVE_SSL
1135		}
1136#endif
1137		if(r == -1)
1138			return -1; /* continue reading */
1139		if(r == 0) {
1140			 /* connection closed */
1141			goto close_connection;
1142		}
1143		dtio->read_frame.buf_count += r;
1144		if(dtio->read_frame.buf_count < dtio->read_frame.frame_len)
1145			return -1; /* continue reading */
1146	}
1147
1148	/* Complete frame received, check if this is a valid ACCEPT control
1149	 * frame. */
1150	if(dtio->read_frame.frame_len < 4) {
1151		verbose(VERB_OPS, "dnstap: invalid data received");
1152		goto close_connection;
1153	}
1154	if(sldns_read_uint32(dtio->read_frame.buf) !=
1155		FSTRM_CONTROL_FRAME_ACCEPT) {
1156		verbose(VERB_ALGO, "dnstap: invalid control type received, "
1157			"ignored");
1158		dtio->ready_frame_sent = 0;
1159		dtio->accept_frame_received = 0;
1160		dtio_read_frame_free(&dtio->read_frame);
1161		return -1;
1162	}
1163	read_frame_done = 4; /* control frame type */
1164
1165	/* Iterate over control fields, ignore unknown types.
1166	 * Need to be able to read at least 8 bytes (control field type +
1167	 * length). */
1168	while(read_frame_done+8 < dtio->read_frame.frame_len) {
1169		uint32_t type = sldns_read_uint32(dtio->read_frame.buf +
1170			read_frame_done);
1171		uint32_t len = sldns_read_uint32(dtio->read_frame.buf +
1172			read_frame_done + 4);
1173		if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) {
1174			if(len == strlen(DNSTAP_CONTENT_TYPE) &&
1175				read_frame_done+8+len <=
1176				dtio->read_frame.frame_len &&
1177				memcmp(dtio->read_frame.buf + read_frame_done +
1178					+ 8, DNSTAP_CONTENT_TYPE, len) == 0) {
1179				if(!dtio_control_start_send(dtio)) {
1180					verbose(VERB_OPS, "dnstap io: out of "
1181					 "memory while sending START frame");
1182					goto close_connection;
1183				}
1184				dtio->accept_frame_received = 1;
1185				if(!dtio_add_output_event_write(dtio))
1186					goto close_connection;
1187				return 1;
1188			} else {
1189				/* unknown content type */
1190				verbose(VERB_ALGO, "dnstap: ACCEPT frame "
1191					"contains unknown content type, "
1192					"closing connection");
1193				goto close_connection;
1194			}
1195		}
1196		/* unknown option, try next */
1197		read_frame_done += 8+len;
1198	}
1199
1200
1201close_connection:
1202	dtio_del_output_event(dtio);
1203	dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1204	dtio_close_output(dtio);
1205	return 0;
1206}
1207
1208/** add the output file descriptor event for listening, read only */
1209static int dtio_add_output_event_read(struct dt_io_thread* dtio)
1210{
1211	if(!dtio->event)
1212		return 0;
1213	if(dtio->event_added && !dtio->event_added_is_write)
1214		return 1;
1215	/* we have to (re-)register the event */
1216	if(dtio->event_added)
1217		ub_event_del(dtio->event);
1218	ub_event_del_bits(dtio->event, UB_EV_WRITE);
1219	if(ub_event_add(dtio->event, NULL) != 0) {
1220		log_err("dnstap io: out of memory (adding event)");
1221		dtio->event_added = 0;
1222		dtio->event_added_is_write = 0;
1223		/* close output and start reattempts to open it */
1224		dtio_close_output(dtio);
1225		return 0;
1226	}
1227	dtio->event_added = 1;
1228	dtio->event_added_is_write = 0;
1229	return 1;
1230}
1231
1232/** add the output file descriptor event for listening, read and write */
1233static int dtio_add_output_event_write(struct dt_io_thread* dtio)
1234{
1235	if(!dtio->event)
1236		return 0;
1237	if(dtio->event_added && dtio->event_added_is_write)
1238		return 1;
1239	/* we have to (re-)register the event */
1240	if(dtio->event_added)
1241		ub_event_del(dtio->event);
1242	ub_event_add_bits(dtio->event, UB_EV_WRITE);
1243	if(ub_event_add(dtio->event, NULL) != 0) {
1244		log_err("dnstap io: out of memory (adding event)");
1245		dtio->event_added = 0;
1246		dtio->event_added_is_write = 0;
1247		/* close output and start reattempts to open it */
1248		dtio_close_output(dtio);
1249		return 0;
1250	}
1251	dtio->event_added = 1;
1252	dtio->event_added_is_write = 1;
1253	return 1;
1254}
1255
/** put the dtio thread to sleep: with nothing queued for writing, stop
 * polling for writability and keep only the read interest */
static void dtio_sleep(struct dt_io_thread* dtio)
{
	/* the result needs no handling here */
	(void)dtio_add_output_event_read(dtio);
}
1263
1264#ifdef HAVE_SSL
1265/** enable the brief read condition */
1266static int dtio_enable_brief_read(struct dt_io_thread* dtio)
1267{
1268	dtio->ssl_brief_read = 1;
1269	if(dtio->stop_flush_event) {
1270		ub_event_del(dtio->stop_flush_event);
1271		ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE);
1272		if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
1273			log_err("dnstap io, stop flush, could not ub_event_add");
1274			return 0;
1275		}
1276		return 1;
1277	}
1278	return dtio_add_output_event_read(dtio);
1279}
1280#endif /* HAVE_SSL */
1281
1282#ifdef HAVE_SSL
1283/** disable the brief read condition */
1284static int dtio_disable_brief_read(struct dt_io_thread* dtio)
1285{
1286	dtio->ssl_brief_read = 0;
1287	if(dtio->stop_flush_event) {
1288		ub_event_del(dtio->stop_flush_event);
1289		ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE);
1290		if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
1291			log_err("dnstap io, stop flush, could not ub_event_add");
1292			return 0;
1293		}
1294		return 1;
1295	}
1296	return dtio_add_output_event_write(dtio);
1297}
1298#endif /* HAVE_SSL */
1299
1300#ifdef HAVE_SSL
1301/** enable the brief write condition */
1302static int dtio_enable_brief_write(struct dt_io_thread* dtio)
1303{
1304	dtio->ssl_brief_write = 1;
1305	return dtio_add_output_event_write(dtio);
1306}
1307#endif /* HAVE_SSL */
1308
1309#ifdef HAVE_SSL
1310/** disable the brief write condition */
1311static int dtio_disable_brief_write(struct dt_io_thread* dtio)
1312{
1313	dtio->ssl_brief_write = 0;
1314	return dtio_add_output_event_read(dtio);
1315}
1316#endif /* HAVE_SSL */
1317
1318#ifdef HAVE_SSL
1319/** check peer verification after ssl handshake connection, false if closed*/
1320static int dtio_ssl_check_peer(struct dt_io_thread* dtio)
1321{
1322	if((SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) {
1323		/* verification */
1324		if(SSL_get_verify_result(dtio->ssl) == X509_V_OK) {
1325			X509* x = SSL_get_peer_certificate(dtio->ssl);
1326			if(!x) {
1327				verbose(VERB_ALGO, "dnstap io, %s, SSL "
1328					"connection failed no certificate",
1329					dtio->ip_str);
1330				return 0;
1331			}
1332			log_cert(VERB_ALGO, "dnstap io, peer certificate",
1333				x);
1334#ifdef HAVE_SSL_GET0_PEERNAME
1335			if(SSL_get0_peername(dtio->ssl)) {
1336				verbose(VERB_ALGO, "dnstap io, %s, SSL "
1337					"connection to %s authenticated",
1338					dtio->ip_str,
1339					SSL_get0_peername(dtio->ssl));
1340			} else {
1341#endif
1342				verbose(VERB_ALGO, "dnstap io, %s, SSL "
1343					"connection authenticated",
1344					dtio->ip_str);
1345#ifdef HAVE_SSL_GET0_PEERNAME
1346			}
1347#endif
1348			X509_free(x);
1349		} else {
1350			X509* x = SSL_get_peer_certificate(dtio->ssl);
1351			if(x) {
1352				log_cert(VERB_ALGO, "dnstap io, peer "
1353					"certificate", x);
1354				X509_free(x);
1355			}
1356			verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
1357				"failed: failed to authenticate",
1358				dtio->ip_str);
1359			return 0;
1360		}
1361	} else {
1362		/* unauthenticated, the verify peer flag was not set
1363		 * in ssl when the ssl object was created from ssl_ctx */
1364		verbose(VERB_ALGO, "dnstap io, %s, SSL connection",
1365			dtio->ip_str);
1366	}
1367	return 1;
1368}
1369#endif /* HAVE_SSL */
1370
1371#ifdef HAVE_SSL
/** perform ssl handshake, returns 1 if okay, 0 to stop.
 * Returns 1 when the handshake is complete (now or on an earlier call)
 * and the peer check passed.  Returns 0 when the caller must stop
 * handling this event: either the handshake needs more I/O
 * (SSL_ERROR_WANT_READ / SSL_ERROR_WANT_WRITE), or the connection was
 * closed and a slow reconnect scheduled.
 * @param dtio: the io thread state, with dtio->ssl set.
 * @param info: the stop-flush state when called from the stop-flush
 *	event loop, or NULL in normal operation; when set, failures also
 *	exit the stop-flush loop. */
static int dtio_ssl_handshake(struct dt_io_thread* dtio,
	struct stop_flush_info* info)
{
	int r;
	if(dtio->ssl_brief_read) {
		/* assume the brief read condition is satisfied,
		 * if we need more or again, we can set it again */
		if(!dtio_disable_brief_read(dtio)) {
			if(info) dtio_stop_flush_exit(info);
			return 0;
		}
	}
	if(dtio->ssl_handshake_done)
		return 1;

	ERR_clear_error();
	r = SSL_do_handshake(dtio->ssl);
	if(r != 1) {
		int want = SSL_get_error(dtio->ssl, r);
		if(want == SSL_ERROR_WANT_READ) {
			/* we want to read on the connection */
			if(!dtio_enable_brief_read(dtio)) {
				if(info) dtio_stop_flush_exit(info);
				return 0;
			}
			return 0;
		} else if(want == SSL_ERROR_WANT_WRITE) {
			/* we want to write on the connection; the event
			 * is left as is, so write polling continues */
			return 0;
		} else if(r == 0) {
			/* closed */
			if(info) dtio_stop_flush_exit(info);
			dtio_del_output_event(dtio);
			dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
			dtio_close_output(dtio);
			return 0;
		} else if(want == SSL_ERROR_SYSCALL) {
			/* SYSCALL and errno==0 means closed uncleanly */
			int silent = 0;
#ifdef EPIPE
			if(errno == EPIPE && verbosity < 2)
				silent = 1; /* silence 'broken pipe' */
#endif
#ifdef ECONNRESET
			if(errno == ECONNRESET && verbosity < 2)
				silent = 1; /* silence reset by peer */
#endif
			if(errno == 0)
				silent = 1;
			if(!silent)
				log_err("dnstap io, SSL_handshake syscall: %s",
					strerror(errno));
			/* closed */
			if(info) dtio_stop_flush_exit(info);
			dtio_del_output_event(dtio);
			dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
			dtio_close_output(dtio);
			return 0;
		} else {
			/* a TLS protocol error; log it unless it is one
			 * of the squelched handshake errors */
			unsigned long err = ERR_get_error();
			if(!squelch_err_ssl_handshake(err)) {
				log_crypto_err_io_code("dnstap io, ssl handshake failed",
					want, err);
				verbose(VERB_OPS, "dnstap io, ssl handshake failed "
					"from %s", dtio->ip_str);
			}
			/* closed */
			if(info) dtio_stop_flush_exit(info);
			dtio_del_output_event(dtio);
			dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
			dtio_close_output(dtio);
			return 0;
		}

	}
	/* check peer verification */
	dtio->ssl_handshake_done = 1;

	if(!dtio_ssl_check_peer(dtio)) {
		/* closed */
		if(info) dtio_stop_flush_exit(info);
		dtio_del_output_event(dtio);
		dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
		dtio_close_output(dtio);
		return 0;
	}
	return 1;
}
1461#endif /* HAVE_SSL */
1462
1463/** callback for the dnstap events, to write to the output */
1464void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
1465{
1466	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1467	int i;
1468
1469	if(dtio->check_nb_connect) {
1470		int connect_err = dtio_check_nb_connect(dtio);
1471		if(connect_err == -1) {
1472			/* close the channel */
1473			dtio_del_output_event(dtio);
1474			dtio_close_output(dtio);
1475			return;
1476		} else if(connect_err == 0) {
1477			/* try again later */
1478			return;
1479		}
1480		/* nonblocking connect check passed, continue */
1481	}
1482
1483#ifdef HAVE_SSL
1484	if(dtio->ssl &&
1485		(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1486		if(!dtio_ssl_handshake(dtio, NULL))
1487			return;
1488	}
1489#endif
1490
1491	if((bits&UB_EV_READ || dtio->ssl_brief_write)) {
1492		if(dtio->ssl_brief_write)
1493			(void)dtio_disable_brief_write(dtio);
1494		if(dtio->ready_frame_sent && !dtio->accept_frame_received) {
1495			if(dtio_read_accept_frame(dtio) <= 0)
1496				return;
1497		} else if(!dtio_check_close(dtio))
1498			return;
1499	}
1500
1501	/* loop to process a number of messages.  This improves throughput,
1502	 * because selecting on write-event if not needed for busy messages
1503	 * (dnstap log) generation and if they need to all be written back.
1504	 * The write event is usually not blocked up.  But not forever,
1505	 * because the event loop needs to stay responsive for other events.
1506	 * If there are no (more) messages, or if the output buffers get
1507	 * full, it returns out of the loop. */
1508	for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
1509		/* see if there are messages that need writing */
1510		if(!dtio->cur_msg) {
1511			if(!dtio_find_msg(dtio)) {
1512				if(i == 0) {
1513					/* no messages on the first iteration,
1514					 * the queues are all empty */
1515					dtio_sleep(dtio);
1516				}
1517				return; /* nothing to do */
1518			}
1519		}
1520
1521		/* write it */
1522		if(dtio->cur_msg_done < dtio->cur_msg_len) {
1523			if(!dtio_write_more(dtio))
1524				return;
1525		}
1526
1527		/* done with the current message */
1528		dtio_cur_msg_free(dtio);
1529
1530		/* If this is a bidirectional stream the first message will be
1531		 * the READY control frame. We can only continue writing after
1532		 * receiving an ACCEPT control frame. */
1533		if(dtio->is_bidirectional && !dtio->ready_frame_sent) {
1534			dtio->ready_frame_sent = 1;
1535			(void)dtio_add_output_event_read(dtio);
1536			break;
1537		}
1538	}
1539}
1540
1541/** callback for the dnstap commandpipe, to stop the dnstap IO */
1542void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
1543{
1544	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1545	uint8_t cmd;
1546	ssize_t r;
1547	if(dtio->want_to_exit)
1548		return;
1549	r = read(fd, &cmd, sizeof(cmd));
1550	if(r == -1) {
1551#ifndef USE_WINSOCK
1552		if(errno == EINTR || errno == EAGAIN)
1553			return; /* ignore this */
1554#else
1555		if(WSAGetLastError() == WSAEINPROGRESS)
1556			return;
1557		if(WSAGetLastError() == WSAEWOULDBLOCK)
1558			return;
1559#endif
1560		log_err("dnstap io: failed to read: %s", sock_strerror(errno));
1561		/* and then fall through to quit the thread */
1562	} else if(r == 0) {
1563		verbose(VERB_ALGO, "dnstap io: cmd channel closed");
1564	} else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
1565		verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
1566	} else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
1567		verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
1568
1569		if(dtio->is_bidirectional && !dtio->accept_frame_received) {
1570			verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, "
1571				"waiting for ACCEPT control frame");
1572			return;
1573		}
1574
1575		/* reregister event */
1576		if(!dtio_add_output_event_write(dtio))
1577			return;
1578		return;
1579	} else if(r == 1) {
1580		verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
1581	}
1582	dtio->want_to_exit = 1;
1583	if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
1584		!= 0) {
1585		log_err("dnstap io: could not loopexit");
1586	}
1587}
1588
1589#ifndef THREADS_DISABLED
1590/** setup the event base for the dnstap io thread */
1591static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
1592	struct timeval* now)
1593{
1594	memset(now, 0, sizeof(*now));
1595	dtio->event_base = ub_default_event_base(0, secs, now);
1596	if(!dtio->event_base) {
1597		fatal_exit("dnstap io: could not create event_base");
1598	}
1599}
1600#endif /* THREADS_DISABLED */
1601
1602/** setup the cmd event for dnstap io */
1603static void dtio_setup_cmd(struct dt_io_thread* dtio)
1604{
1605	struct ub_event* cmdev;
1606	fd_set_nonblock(dtio->commandpipe[0]);
1607	cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0],
1608		UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
1609	if(!cmdev) {
1610		fatal_exit("dnstap io: out of memory");
1611	}
1612	dtio->command_event = cmdev;
1613	if(ub_event_add(cmdev, NULL) != 0) {
1614		fatal_exit("dnstap io: out of memory (adding event)");
1615	}
1616}
1617
1618/** setup the reconnect event for dnstap io */
1619static void dtio_setup_reconnect(struct dt_io_thread* dtio)
1620{
1621	dtio_reconnect_clear(dtio);
1622	dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
1623		UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
1624	if(!dtio->reconnect_timer) {
1625		fatal_exit("dnstap io: out of memory");
1626	}
1627}
1628
1629/**
1630 * structure to keep track of information during stop flush
1631 */
1632struct stop_flush_info {
1633	/** the event base during stop flush */
1634	struct ub_event_base* base;
1635	/** did we already want to exit this stop-flush event base */
1636	int want_to_exit_flush;
1637	/** has the timer fired */
1638	int timer_done;
1639	/** the dtio */
1640	struct dt_io_thread* dtio;
1641	/** the stop control frame */
1642	void* stop_frame;
1643	/** length of the stop frame */
1644	size_t stop_frame_len;
1645	/** how much we have done of the stop frame */
1646	size_t stop_frame_done;
1647};
1648
1649/** exit the stop flush base */
1650static void dtio_stop_flush_exit(struct stop_flush_info* info)
1651{
1652	if(info->want_to_exit_flush)
1653		return;
1654	info->want_to_exit_flush = 1;
1655	if(ub_event_base_loopexit(info->base) != 0) {
1656		log_err("dnstap io: could not loopexit");
1657	}
1658}
1659
1660/** send the stop control,
1661 * return true if completed the frame. */
1662static int dtio_control_stop_send(struct stop_flush_info* info)
1663{
1664	struct dt_io_thread* dtio = info->dtio;
1665	int r;
1666	if(info->stop_frame_done >= info->stop_frame_len)
1667		return 1;
1668	r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) +
1669		info->stop_frame_done, info->stop_frame_len -
1670		info->stop_frame_done);
1671	if(r == -1) {
1672		verbose(VERB_ALGO, "dnstap io: stop flush: output closed");
1673		dtio_stop_flush_exit(info);
1674		return 0;
1675	}
1676	if(r == 0) {
1677		/* try again later, or timeout */
1678		return 0;
1679	}
1680	info->stop_frame_done += r;
1681	if(info->stop_frame_done < info->stop_frame_len)
1682		return 0; /* not done yet */
1683	return 1;
1684}
1685
1686void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
1687	void* arg)
1688{
1689	struct stop_flush_info* info = (struct stop_flush_info*)arg;
1690	if(info->want_to_exit_flush)
1691		return;
1692	verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush");
1693	info->timer_done = 1;
1694	dtio_stop_flush_exit(info);
1695}
1696
/** Event callback on the output fd during the stop flush.
 * It is registered for read and write by dtio_control_stop_flush.  It
 * finishes a pending connect and the TLS handshake if needed, and
 * detects a close by the peer on read.  It then writes the remainder of
 * the current message, followed by the STOP control frame.  The
 * stop-flush loop is exited once both are written, or when the output
 * is closed.
 * @param fd: unused.
 * @param bits: the UB_EV_* bits that fired.
 * @param arg: the struct stop_flush_info. */
void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg)
{
	struct stop_flush_info* info = (struct stop_flush_info*)arg;
	struct dt_io_thread* dtio = info->dtio;
	if(info->want_to_exit_flush)
		return;
	if(dtio->check_nb_connect) {
		/* we don't start the stop_flush if connect still
		 * in progress, but the check code is here, just in case */
		int connect_err = dtio_check_nb_connect(dtio);
		if(connect_err == -1) {
			/* close the channel, exit the stop flush */
			dtio_stop_flush_exit(info);
			dtio_del_output_event(dtio);
			dtio_close_output(dtio);
			return;
		} else if(connect_err == 0) {
			/* try again later */
			return;
		}
		/* nonblocking connect check passed, continue */
	}
#ifdef HAVE_SSL
	if(dtio->ssl &&
		(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
		if(!dtio_ssl_handshake(dtio, info))
			return;
	}
#endif

	if((bits&UB_EV_READ)) {
		/* discard input; detect whether the peer closed */
		if(!dtio_check_close(dtio)) {
			if(dtio->fd == -1) {
				verbose(VERB_ALGO, "dnstap io: "
					"stop flush: output closed");
				dtio_stop_flush_exit(info);
			}
			return;
		}
	}
	/* write remainder of last frame */
	if(dtio->cur_msg) {
		if(dtio->cur_msg_done < dtio->cur_msg_len) {
			if(!dtio_write_more(dtio)) {
				/* fd -1 means the output was closed;
				 * otherwise wait for the next event */
				if(dtio->fd == -1) {
					verbose(VERB_ALGO, "dnstap io: "
						"stop flush: output closed");
					dtio_stop_flush_exit(info);
				}
				return;
			}
		}
		verbose(VERB_ALGO, "dnstap io: stop flush completed "
			"last frame");
		dtio_cur_msg_free(dtio);
	}
	/* write stop frame */
	if(info->stop_frame_done < info->stop_frame_len) {
		if(!dtio_control_stop_send(info))
			return;
		verbose(VERB_ALGO, "dnstap io: stop flush completed "
			"stop control frame");
	}
	/* when last frame and stop frame are sent, exit */
	dtio_stop_flush_exit(info);
}
1763
1764/** flush at end, last packet and stop control */
1765static void dtio_control_stop_flush(struct dt_io_thread* dtio)
1766{
1767	/* briefly attempt to flush the previous packet to the output,
1768	 * this could be a partial packet, or even the start control frame */
1769	time_t secs = 0;
1770	struct timeval now;
1771	struct stop_flush_info info;
1772	struct timeval tv;
1773	struct ub_event* timer, *stopev;
1774
1775	if(dtio->fd == -1 || dtio->check_nb_connect) {
1776		/* no connection or we have just connected, so nothing is
1777		 * sent yet, so nothing to stop or flush */
1778		return;
1779	}
1780	if(dtio->ssl && !dtio->ssl_handshake_done) {
1781		/* no SSL connection has been established yet */
1782		return;
1783	}
1784
1785	memset(&info, 0, sizeof(info));
1786	memset(&now, 0, sizeof(now));
1787	info.dtio = dtio;
1788	info.base = ub_default_event_base(0, &secs, &now);
1789	if(!info.base) {
1790		log_err("dnstap io: malloc failure");
1791		return;
1792	}
1793	timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT,
1794		&dtio_stop_timer_cb, &info);
1795	if(!timer) {
1796		log_err("dnstap io: malloc failure");
1797		ub_event_base_free(info.base);
1798		return;
1799	}
1800	memset(&tv, 0, sizeof(tv));
1801	tv.tv_sec = 2;
1802	if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info,
1803		&tv) != 0) {
1804		log_err("dnstap io: cannot event_timer_add");
1805		ub_event_free(timer);
1806		ub_event_base_free(info.base);
1807		return;
1808	}
1809	stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
1810		UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info);
1811	if(!stopev) {
1812		log_err("dnstap io: malloc failure");
1813		ub_timer_del(timer);
1814		ub_event_free(timer);
1815		ub_event_base_free(info.base);
1816		return;
1817	}
1818	if(ub_event_add(stopev, NULL) != 0) {
1819		log_err("dnstap io: cannot event_add");
1820		ub_event_free(stopev);
1821		ub_timer_del(timer);
1822		ub_event_free(timer);
1823		ub_event_base_free(info.base);
1824		return;
1825	}
1826	info.stop_frame = fstrm_create_control_frame_stop(
1827		&info.stop_frame_len);
1828	if(!info.stop_frame) {
1829		log_err("dnstap io: malloc failure");
1830		ub_event_del(stopev);
1831		ub_event_free(stopev);
1832		ub_timer_del(timer);
1833		ub_event_free(timer);
1834		ub_event_base_free(info.base);
1835		return;
1836	}
1837	dtio->stop_flush_event = stopev;
1838
1839	/* wait briefly, or until finished */
1840	verbose(VERB_ALGO, "dnstap io: stop flush started");
1841	if(ub_event_base_dispatch(info.base) < 0) {
1842		log_err("dnstap io: dispatch flush failed, errno is %s",
1843			strerror(errno));
1844	}
1845	verbose(VERB_ALGO, "dnstap io: stop flush ended");
1846	free(info.stop_frame);
1847	dtio->stop_flush_event = NULL;
1848	ub_event_del(stopev);
1849	ub_event_free(stopev);
1850	ub_timer_del(timer);
1851	ub_event_free(timer);
1852	ub_event_base_free(info.base);
1853}
1854
/** perform desetup and free stuff when the dnstap io thread exits.
 * Teardown order:
 *	1. try to flush the last (partial) frame and send the STOP control
 *	   frame;
 *	2. remove and close the output;
 *	3. remove and free the command event, and close the read end of
 *	   the command pipe;
 *	4. remove and free the reconnect timer;
 *	5. free the current message.
 * The event base is only freed when threads are enabled.  Otherwise it
 * is presumably owned by whoever passed it in (TODO confirm). */
static void dtio_desetup(struct dt_io_thread* dtio)
{
	dtio_control_stop_flush(dtio);
	dtio_del_output_event(dtio);
	dtio_close_output(dtio);
	ub_event_del(dtio->command_event);
	ub_event_free(dtio->command_event);
#ifndef USE_WINSOCK
	close(dtio->commandpipe[0]);
#else
	_close(dtio->commandpipe[0]);
#endif
	dtio->commandpipe[0] = -1;
	dtio_reconnect_del(dtio);
	ub_event_free(dtio->reconnect_timer);
	dtio_cur_msg_free(dtio);
#ifndef THREADS_DISABLED
	ub_event_base_free(dtio->event_base);
#endif
}
1876
1877/** setup a start control message */
1878static int dtio_control_start_send(struct dt_io_thread* dtio)
1879{
1880	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1881	dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE,
1882		&dtio->cur_msg_len);
1883	if(!dtio->cur_msg) {
1884		return 0;
1885	}
1886	/* setup to send the control message */
1887	/* set that the buffer needs to be sent, but the length
1888	 * of that buffer is already written, that way the buffer can
1889	 * start with 0 length and then the length of the control frame
1890	 * in it */
1891	dtio->cur_msg_done = 0;
1892	dtio->cur_msg_len_done = 4;
1893	return 1;
1894}
1895
1896/** setup a ready control message */
1897static int dtio_control_ready_send(struct dt_io_thread* dtio)
1898{
1899	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1900	dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE,
1901		&dtio->cur_msg_len);
1902	if(!dtio->cur_msg) {
1903		return 0;
1904	}
1905	/* setup to send the control message */
1906	/* set that the buffer needs to be sent, but the length
1907	 * of that buffer is already written, that way the buffer can
1908	 * start with 0 length and then the length of the control frame
1909	 * in it */
1910	dtio->cur_msg_done = 0;
1911	dtio->cur_msg_len_done = 4;
1912	return 1;
1913}
1914
/** open the output file descriptor for af_local.
 * Creates a nonblocking AF_LOCAL stream socket and connects it to
 * dtio->socket_path.  Returns 1 when connected.  Returns 0 on failure,
 * with the fd closed; the error is logged unless we are retrying
 * quietly at low verbosity. */
static int dtio_open_output_local(struct dt_io_thread* dtio)
{
#ifdef HAVE_SYS_UN_H
	struct sockaddr_un s;
	dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
	if(dtio->fd == -1) {
		log_err("dnstap io: failed to create socket: %s",
			sock_strerror(errno));
		return 0;
	}
	memset(&s, 0, sizeof(s));
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
	/* this member exists on BSDs, not Linux */
	s.sun_len = (unsigned)sizeof(s);
#endif
	s.sun_family = AF_LOCAL;
	/* length is 92-108, 104 on FreeBSD.  A longer path would be
	 * silently truncated and we would connect to a different socket,
	 * so refuse it. */
	if(strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path))
		>= sizeof(s.sun_path)) {
		if(!(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			verbosity < 4))
			log_err("dnstap io: socket path too long: \"%s\"",
				dtio->socket_path);
		dtio_close_fd(dtio);
		return 0;
	}
	fd_set_nonblock(dtio->fd);
	if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s))
		== -1) {
		char* to = dtio->socket_path;
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			verbosity < 4) {
			dtio_close_fd(dtio);
			return 0; /* no log retries on low verbosity */
		}
		log_err("dnstap io: failed to connect to \"%s\": %s",
			to, sock_strerror(errno));
		dtio_close_fd(dtio);
		return 0;
	}
	return 1;
#else
	log_err("cannot create af_local socket");
	return 0;
#endif /* HAVE_SYS_UN_H */
}
1954
1955/** open the output file descriptor for af_inet and af_inet6 */
1956static int dtio_open_output_tcp(struct dt_io_thread* dtio)
1957{
1958	struct sockaddr_storage addr;
1959	socklen_t addrlen;
1960	memset(&addr, 0, sizeof(addr));
1961	addrlen = (socklen_t)sizeof(addr);
1962
1963	if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen, UNBOUND_DNS_PORT)) {
1964		log_err("could not parse IP '%s'", dtio->ip_str);
1965		return 0;
1966	}
1967	dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
1968	if(dtio->fd == -1) {
1969		log_err("can't create socket: %s", sock_strerror(errno));
1970		return 0;
1971	}
1972	fd_set_nonblock(dtio->fd);
1973	if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
1974		if(errno == EINPROGRESS)
1975			return 1; /* wait until connect done*/
1976		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1977			verbosity < 4) {
1978			dtio_close_fd(dtio);
1979			return 0; /* no log retries on low verbosity */
1980		}
1981#ifndef USE_WINSOCK
1982		if(tcp_connect_errno_needs_log(
1983			(struct sockaddr *)&addr, addrlen)) {
1984			log_err("dnstap io: failed to connect to %s: %s",
1985				dtio->ip_str, strerror(errno));
1986		}
1987#else
1988		if(WSAGetLastError() == WSAEINPROGRESS ||
1989			WSAGetLastError() == WSAEWOULDBLOCK)
1990			return 1; /* wait until connect done*/
1991		if(tcp_connect_errno_needs_log(
1992			(struct sockaddr *)&addr, addrlen)) {
1993			log_err("dnstap io: failed to connect to %s: %s",
1994				dtio->ip_str, wsa_strerror(WSAGetLastError()));
1995		}
1996#endif
1997		dtio_close_fd(dtio);
1998		return 0;
1999	}
2000	return 1;
2001}
2002
2003/** setup the SSL structure for new connection */
2004static int dtio_setup_ssl(struct dt_io_thread* dtio)
2005{
2006	dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
2007	if(!dtio->ssl) return 0;
2008	dtio->ssl_handshake_done = 0;
2009	dtio->ssl_brief_read = 0;
2010
2011	if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name,
2012		dtio->tls_use_sni)) {
2013		return 0;
2014	}
2015	return 1;
2016}
2017
2018/** open the output file descriptor */
2019static void dtio_open_output(struct dt_io_thread* dtio)
2020{
2021	struct ub_event* ev;
2022	if(dtio->upstream_is_unix) {
2023		if(!dtio_open_output_local(dtio)) {
2024			dtio_reconnect_enable(dtio);
2025			return;
2026		}
2027	} else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
2028		if(!dtio_open_output_tcp(dtio)) {
2029			dtio_reconnect_enable(dtio);
2030			return;
2031		}
2032		if(dtio->upstream_is_tls) {
2033			if(!dtio_setup_ssl(dtio)) {
2034				dtio_close_fd(dtio);
2035				dtio_reconnect_enable(dtio);
2036				return;
2037			}
2038		}
2039	}
2040	dtio->check_nb_connect = 1;
2041
2042	/* the EV_READ is to read ACCEPT control messages, and catch channel
2043	 * close. EV_WRITE is to write packets */
2044	ev = ub_event_new(dtio->event_base, dtio->fd,
2045		UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
2046		dtio);
2047	if(!ev) {
2048		log_err("dnstap io: out of memory");
2049		if(dtio->ssl) {
2050#ifdef HAVE_SSL
2051			SSL_free(dtio->ssl);
2052			dtio->ssl = NULL;
2053#endif
2054		}
2055		dtio_close_fd(dtio);
2056		dtio_reconnect_enable(dtio);
2057		return;
2058	}
2059	dtio->event = ev;
2060
2061	/* setup protocol control message to start */
2062	if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) ||
2063		(dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) {
2064		log_err("dnstap io: out of memory");
2065		ub_event_free(dtio->event);
2066		dtio->event = NULL;
2067		if(dtio->ssl) {
2068#ifdef HAVE_SSL
2069			SSL_free(dtio->ssl);
2070			dtio->ssl = NULL;
2071#endif
2072		}
2073		dtio_close_fd(dtio);
2074		dtio_reconnect_enable(dtio);
2075		return;
2076	}
2077}
2078
/**
 * Perform the setup of the writer thread once its event_base exists.
 * The steps run in this order: dtio_setup_cmd, dtio_setup_reconnect,
 * dtio_open_output, and finally dtio_add_output_event_write.
 * @param dtio: the io thread, with dtio->event_base already set.
 */
static void dtio_setup_on_base(struct dt_io_thread* dtio)
{
	dtio_setup_cmd(dtio);
	dtio_setup_reconnect(dtio);
	dtio_open_output(dtio);
	/* last step: its result is not acted on, because there is no
	 * further setup to skip when it fails */
	(void)dtio_add_output_event_write(dtio);
}
2088
2089#ifndef THREADS_DISABLED
2090/** the IO thread function for the DNSTAP IO */
2091static void* dnstap_io(void* arg)
2092{
2093	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
2094	time_t secs = 0;
2095	struct timeval now;
2096	log_thread_set(&dtio->threadnum);
2097
2098	/* setup */
2099	verbose(VERB_ALGO, "start dnstap io thread");
2100	dtio_setup_base(dtio, &secs, &now);
2101	dtio_setup_on_base(dtio);
2102
2103	/* run */
2104	if(ub_event_base_dispatch(dtio->event_base) < 0) {
2105		log_err("dnstap io: dispatch failed, errno is %s",
2106			strerror(errno));
2107	}
2108
2109	/* cleanup */
2110	verbose(VERB_ALGO, "stop dnstap io thread");
2111	dtio_desetup(dtio);
2112	return NULL;
2113}
2114#endif /* THREADS_DISABLED */
2115
2116int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
2117	int numworkers)
2118{
2119	/* set up the thread, can fail */
2120#ifndef USE_WINSOCK
2121	if(pipe(dtio->commandpipe) == -1) {
2122		log_err("failed to create pipe: %s", strerror(errno));
2123		return 0;
2124	}
2125#else
2126	if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
2127		log_err("failed to create _pipe: %s",
2128			wsa_strerror(WSAGetLastError()));
2129		return 0;
2130	}
2131#endif
2132
2133	/* start the thread */
2134	dtio->threadnum = numworkers+1;
2135	dtio->started = 1;
2136#ifndef THREADS_DISABLED
2137	ub_thread_create(&dtio->tid, dnstap_io, dtio);
2138	(void)event_base_nothr;
2139#else
2140	dtio->event_base = event_base_nothr;
2141	dtio_setup_on_base(dtio);
2142#endif
2143	return 1;
2144}
2145
2146void dt_io_thread_stop(struct dt_io_thread* dtio)
2147{
2148#ifndef THREADS_DISABLED
2149	uint8_t cmd = DTIO_COMMAND_STOP;
2150#endif
2151	if(!dtio) return;
2152	if(!dtio->started) return;
2153	verbose(VERB_ALGO, "dnstap io: send stop cmd");
2154
2155#ifndef THREADS_DISABLED
2156	while(1) {
2157		ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
2158		if(r == -1) {
2159#ifndef USE_WINSOCK
2160			if(errno == EINTR || errno == EAGAIN)
2161				continue;
2162#else
2163			if(WSAGetLastError() == WSAEINPROGRESS)
2164				continue;
2165			if(WSAGetLastError() == WSAEWOULDBLOCK)
2166				continue;
2167#endif
2168			log_err("dnstap io stop: write: %s",
2169				sock_strerror(errno));
2170			break;
2171		}
2172		break;
2173	}
2174	dtio->started = 0;
2175#endif /* THREADS_DISABLED */
2176
2177#ifndef USE_WINSOCK
2178	close(dtio->commandpipe[1]);
2179#else
2180	_close(dtio->commandpipe[1]);
2181#endif
2182	dtio->commandpipe[1] = -1;
2183#ifndef THREADS_DISABLED
2184	ub_thread_join(dtio->tid);
2185#else
2186	dtio->want_to_exit = 1;
2187	dtio_desetup(dtio);
2188#endif
2189}
2190