Deleted Added
full compact
dtstream.c (366095) dtstream.c (368129)
1/*
2 * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
3 *
4 * Copyright (c) 2020, NLnet Labs. All rights reserved.
5 *
6 * This software is open source.
7 *
8 * Redistribution and use in source and binary forms, with or without

--- 54 unchanged lines hidden (view full) ---

63/** number of messages to process in one output callback */
64#define DTIO_MESSAGES_PER_CALLBACK 100
65/** the msec to wait for reconnect (if not immediate, the first attempt) */
66#define DTIO_RECONNECT_TIMEOUT_MIN 10
67/** the msec to wait for reconnect max after backoff */
68#define DTIO_RECONNECT_TIMEOUT_MAX 1000
69/** the msec to wait for reconnect slow, to stop busy spinning on reconnect */
70#define DTIO_RECONNECT_TIMEOUT_SLOW 1000
1/*
2 * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP
3 *
4 * Copyright (c) 2020, NLnet Labs. All rights reserved.
5 *
6 * This software is open source.
7 *
8 * Redistribution and use in source and binary forms, with or without

--- 54 unchanged lines hidden (view full) ---

63/** number of messages to process in one output callback */
64#define DTIO_MESSAGES_PER_CALLBACK 100
65/** the msec to wait for reconnect (if not immediate, the first attempt) */
66#define DTIO_RECONNECT_TIMEOUT_MIN 10
67/** the msec to wait for reconnect max after backoff */
68#define DTIO_RECONNECT_TIMEOUT_MAX 1000
69/** the msec to wait for reconnect slow, to stop busy spinning on reconnect */
70#define DTIO_RECONNECT_TIMEOUT_SLOW 1000
71/** number of messages before wakeup of thread */
72#define DTIO_MSG_FOR_WAKEUP 32
71
72/** maximum length of received frame */
73#define DTIO_RECV_FRAME_MAX_LEN 1000
74
75struct stop_flush_info;
76/** DTIO command channel commands */
77enum {
78 /** DTIO command channel stop */

--- 15 unchanged lines hidden (view full) ---

94#ifdef HAVE_SSL
95/** enable briefly waiting for a read event, for SSL negotiation */
96static int dtio_enable_brief_read(struct dt_io_thread* dtio);
97/** enable briefly waiting for a write event, for SSL negotiation */
98static int dtio_enable_brief_write(struct dt_io_thread* dtio);
99#endif
100
101struct dt_msg_queue*
73
74/** maximum length of received frame */
75#define DTIO_RECV_FRAME_MAX_LEN 1000
76
77struct stop_flush_info;
78/** DTIO command channel commands */
79enum {
80 /** DTIO command channel stop */

--- 15 unchanged lines hidden (view full) ---

96#ifdef HAVE_SSL
97/** enable briefly waiting for a read event, for SSL negotiation */
98static int dtio_enable_brief_read(struct dt_io_thread* dtio);
99/** enable briefly waiting for a write event, for SSL negotiation */
100static int dtio_enable_brief_write(struct dt_io_thread* dtio);
101#endif
102
103struct dt_msg_queue*
102dt_msg_queue_create(void)
104dt_msg_queue_create(struct comm_base* base)
103{
104 struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
105 if(!mq) return NULL;
106 mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
107 about 1 M should contain 64K messages with some overhead,
108 or a whole bunch smaller ones */
105{
106 struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
107 if(!mq) return NULL;
108 mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
109 about 1 M should contain 64K messages with some overhead,
110 or a whole bunch smaller ones */
111 mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq);
112 if(!mq->wakeup_timer) {
113 free(mq);
114 return NULL;
115 }
109 lock_basic_init(&mq->lock);
110 lock_protect(&mq->lock, mq, sizeof(*mq));
111 return mq;
112}
113
114/** clear the message list, caller must hold the lock */
115static void
116dt_msg_queue_clear(struct dt_msg_queue* mq)
117{
118 struct dt_msg_entry* e = mq->first, *next=NULL;
119 while(e) {
120 next = e->next;
121 free(e->buf);
122 free(e);
123 e = next;
124 }
125 mq->first = NULL;
126 mq->last = NULL;
127 mq->cursize = 0;
116 lock_basic_init(&mq->lock);
117 lock_protect(&mq->lock, mq, sizeof(*mq));
118 return mq;
119}
120
121/** clear the message list, caller must hold the lock */
122static void
123dt_msg_queue_clear(struct dt_msg_queue* mq)
124{
125 struct dt_msg_entry* e = mq->first, *next=NULL;
126 while(e) {
127 next = e->next;
128 free(e->buf);
129 free(e);
130 e = next;
131 }
132 mq->first = NULL;
133 mq->last = NULL;
134 mq->cursize = 0;
135 mq->msgcount = 0;
128}
129
130void
131dt_msg_queue_delete(struct dt_msg_queue* mq)
132{
133 if(!mq) return;
134 lock_basic_destroy(&mq->lock);
135 dt_msg_queue_clear(mq);
136}
137
138void
139dt_msg_queue_delete(struct dt_msg_queue* mq)
140{
141 if(!mq) return;
142 lock_basic_destroy(&mq->lock);
143 dt_msg_queue_clear(mq);
144 comm_timer_delete(mq->wakeup_timer);
136 free(mq);
137}
138
139/** make the dtio wake up by sending a wakeup command */
140static void dtio_wakeup(struct dt_io_thread* dtio)
141{
142 uint8_t cmd = DTIO_COMMAND_WAKEUP;
143 if(!dtio) return;
144 if(!dtio->started) return;
145
146 while(1) {
147 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
148 if(r == -1) {
149#ifndef USE_WINSOCK
150 if(errno == EINTR || errno == EAGAIN)
151 continue;
145 free(mq);
146}
147
148/** make the dtio wake up by sending a wakeup command */
149static void dtio_wakeup(struct dt_io_thread* dtio)
150{
151 uint8_t cmd = DTIO_COMMAND_WAKEUP;
152 if(!dtio) return;
153 if(!dtio->started) return;
154
155 while(1) {
156 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
157 if(r == -1) {
158#ifndef USE_WINSOCK
159 if(errno == EINTR || errno == EAGAIN)
160 continue;
152 log_err("dnstap io wakeup: write: %s", strerror(errno));
153#else
154 if(WSAGetLastError() == WSAEINPROGRESS)
155 continue;
156 if(WSAGetLastError() == WSAEWOULDBLOCK)
157 continue;
161#else
162 if(WSAGetLastError() == WSAEINPROGRESS)
163 continue;
164 if(WSAGetLastError() == WSAEWOULDBLOCK)
165 continue;
158 log_err("dnstap io stop: write: %s",
159 wsa_strerror(WSAGetLastError()));
160#endif
166#endif
167 log_err("dnstap io wakeup: write: %s",
168 sock_strerror(errno));
161 break;
162 }
163 break;
164 }
165}
166
167void
169 break;
170 }
171 break;
172 }
173}
174
175void
176mq_wakeup_cb(void* arg)
177{
178 struct dt_msg_queue* mq = (struct dt_msg_queue*)arg;
179 /* even if the dtio is already active, because perhaps much
180 * traffic suddenly, we leave the timer running to save on
181 * managing it, the once a second timer is less work then
182 * starting and stopping the timer frequently */
183 lock_basic_lock(&mq->dtio->wakeup_timer_lock);
184 mq->dtio->wakeup_timer_enabled = 0;
185 lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
186 dtio_wakeup(mq->dtio);
187}
188
189/** start timer to wakeup dtio because there is content in the queue */
190static void
191dt_msg_queue_start_timer(struct dt_msg_queue* mq)
192{
193 struct timeval tv;
194 /* Start a timer to process messages to be logged.
195 * If we woke up the dtio thread for every message, the wakeup
196 * messages take up too much processing power. If the queue
197 * fills up the wakeup happens immediately. The timer wakes it up
198 * if there are infrequent messages to log. */
199
200 /* we cannot start a timer in dtio thread, because it is a different
201 * thread and its event base is in use by the other thread, it would
202 * give race conditions if we tried to modify its event base,
203 * and locks would wait until it woke up, and this is what we do. */
204
205 /* do not start the timer if a timer already exists, perhaps
206 * in another worker. So this variable is protected by a lock in
207 * dtio */
208 lock_basic_lock(&mq->dtio->wakeup_timer_lock);
209 if(mq->dtio->wakeup_timer_enabled) {
210 lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
211 return;
212 }
213 mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */
214 lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
215
216 /* start the timer, in mq, in the event base of our worker */
217 tv.tv_sec = 1;
218 tv.tv_usec = 0;
219 comm_timer_set(mq->wakeup_timer, &tv);
220}
221
222void
168dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
169{
223dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
224{
170 int wakeup = 0;
225 int wakeupnow = 0, wakeupstarttimer = 0;
171 struct dt_msg_entry* entry;
172
173 /* check conditions */
174 if(!buf) return;
175 if(len == 0) {
176 /* it is not possible to log entries with zero length,
177 * because the framestream protocol does not carry it.
178 * However the protobuf serialization does not create zero

--- 14 unchanged lines hidden (view full) ---

193 return;
194 }
195 entry->next = NULL;
196 entry->buf = buf;
197 entry->len = len;
198
199 /* aqcuire lock */
200 lock_basic_lock(&mq->lock);
226 struct dt_msg_entry* entry;
227
228 /* check conditions */
229 if(!buf) return;
230 if(len == 0) {
231 /* it is not possible to log entries with zero length,
232 * because the framestream protocol does not carry it.
233 * However the protobuf serialization does not create zero

--- 14 unchanged lines hidden (view full) ---

248 return;
249 }
250 entry->next = NULL;
251 entry->buf = buf;
252 entry->len = len;
253
254 /* aqcuire lock */
255 lock_basic_lock(&mq->lock);
201 /* list was empty, wakeup dtio */
256 /* if list was empty, start timer for (eventual) wakeup */
202 if(mq->first == NULL)
257 if(mq->first == NULL)
203 wakeup = 1;
258 wakeupstarttimer = 1;
259 /* if list contains more than wakeupnum elements, wakeup now,
260 * or if list is (going to be) almost full */
261 if(mq->msgcount == DTIO_MSG_FOR_WAKEUP ||
262 (mq->cursize < mq->maxsize * 9 / 10 &&
263 mq->cursize+len >= mq->maxsize * 9 / 10))
264 wakeupnow = 1;
204 /* see if it is going to fit */
205 if(mq->cursize + len > mq->maxsize) {
206 /* buffer full, or congested. */
207 /* drop */
208 lock_basic_unlock(&mq->lock);
209 free(buf);
210 free(entry);
211 return;
212 }
213 mq->cursize += len;
265 /* see if it is going to fit */
266 if(mq->cursize + len > mq->maxsize) {
267 /* buffer full, or congested. */
268 /* drop */
269 lock_basic_unlock(&mq->lock);
270 free(buf);
271 free(entry);
272 return;
273 }
274 mq->cursize += len;
275 mq->msgcount ++;
214 /* append to list */
215 if(mq->last) {
216 mq->last->next = entry;
217 } else {
218 mq->first = entry;
219 }
220 mq->last = entry;
221 /* release lock */
222 lock_basic_unlock(&mq->lock);
223
276 /* append to list */
277 if(mq->last) {
278 mq->last->next = entry;
279 } else {
280 mq->first = entry;
281 }
282 mq->last = entry;
283 /* release lock */
284 lock_basic_unlock(&mq->lock);
285
224 if(wakeup)
286 if(wakeupnow) {
225 dtio_wakeup(mq->dtio);
287 dtio_wakeup(mq->dtio);
288 } else if(wakeupstarttimer) {
289 dt_msg_queue_start_timer(mq);
290 }
226}
227
228struct dt_io_thread* dt_io_thread_create(void)
229{
230 struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
291}
292
293struct dt_io_thread* dt_io_thread_create(void)
294{
295 struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
296 lock_basic_init(&dtio->wakeup_timer_lock);
297 lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled,
298 sizeof(dtio->wakeup_timer_enabled));
231 return dtio;
232}
233
234void dt_io_thread_delete(struct dt_io_thread* dtio)
235{
236 struct dt_io_list_item* item, *nextitem;
237 if(!dtio) return;
299 return dtio;
300}
301
302void dt_io_thread_delete(struct dt_io_thread* dtio)
303{
304 struct dt_io_list_item* item, *nextitem;
305 if(!dtio) return;
306 lock_basic_destroy(&dtio->wakeup_timer_lock);
238 item=dtio->io_list;
239 while(item) {
240 nextitem = item->next;
241 free(item);
242 item = nextitem;
243 }
244 free(dtio->socket_path);
245 free(dtio->ip_str);

--- 28 unchanged lines hidden (view full) ---

274 if(dtio->upstream_is_unix) {
275 if(!cfg->dnstap_socket_path ||
276 cfg->dnstap_socket_path[0]==0) {
277 log_err("dnstap setup: no dnstap-socket-path for "
278 "socket connect");
279 return 0;
280 }
281 free(dtio->socket_path);
307 item=dtio->io_list;
308 while(item) {
309 nextitem = item->next;
310 free(item);
311 item = nextitem;
312 }
313 free(dtio->socket_path);
314 free(dtio->ip_str);

--- 28 unchanged lines hidden (view full) ---

343 if(dtio->upstream_is_unix) {
344 if(!cfg->dnstap_socket_path ||
345 cfg->dnstap_socket_path[0]==0) {
346 log_err("dnstap setup: no dnstap-socket-path for "
347 "socket connect");
348 return 0;
349 }
350 free(dtio->socket_path);
282 dtio->socket_path = strdup(cfg->dnstap_socket_path);
351 dtio->socket_path = fname_after_chroot(cfg->dnstap_socket_path,
352 cfg, 1);
283 if(!dtio->socket_path) {
284 log_err("dnstap setup: malloc failure");
285 return 0;
286 }
287 }
288
289 if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
290 if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) {

--- 120 unchanged lines hidden (view full) ---

411 size_t* len)
412{
413 lock_basic_lock(&mq->lock);
414 if(mq->first) {
415 struct dt_msg_entry* entry = mq->first;
416 mq->first = entry->next;
417 if(!entry->next) mq->last = NULL;
418 mq->cursize -= entry->len;
353 if(!dtio->socket_path) {
354 log_err("dnstap setup: malloc failure");
355 return 0;
356 }
357 }
358
359 if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
360 if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) {

--- 120 unchanged lines hidden (view full) ---

481 size_t* len)
482{
483 lock_basic_lock(&mq->lock);
484 if(mq->first) {
485 struct dt_msg_entry* entry = mq->first;
486 mq->first = entry->next;
487 if(!entry->next) mq->last = NULL;
488 mq->cursize -= entry->len;
489 mq->msgcount --;
419 lock_basic_unlock(&mq->lock);
420
421 *buf = entry->buf;
422 *len = entry->len;
423 free(entry);
424 return 1;
425 }
426 lock_basic_unlock(&mq->lock);

--- 155 unchanged lines hidden (view full) ---

582 ub_event_del(dtio->event);
583 dtio->event_added = 0;
584 dtio->event_added_is_write = 0;
585}
586
587/** close dtio socket and set it to -1 */
588static void dtio_close_fd(struct dt_io_thread* dtio)
589{
490 lock_basic_unlock(&mq->lock);
491
492 *buf = entry->buf;
493 *len = entry->len;
494 free(entry);
495 return 1;
496 }
497 lock_basic_unlock(&mq->lock);

--- 155 unchanged lines hidden (view full) ---

653 ub_event_del(dtio->event);
654 dtio->event_added = 0;
655 dtio->event_added_is_write = 0;
656}
657
658/** close dtio socket and set it to -1 */
659static void dtio_close_fd(struct dt_io_thread* dtio)
660{
590#ifndef USE_WINSOCK
591 close(dtio->fd);
592#else
593 closesocket(dtio->fd);
594#endif
661 sock_close(dtio->fd);
595 dtio->fd = -1;
596}
597
598/** close and stop the output file descriptor event */
599static void dtio_close_output(struct dt_io_thread* dtio)
600{
601 if(!dtio->event)
602 return;

--- 51 unchanged lines hidden (view full) ---

654 dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
655 return 0; /* try again later */
656 }
657#endif
658 if(error != 0) {
659 char* to = dtio->socket_path;
660 if(!to) to = dtio->ip_str;
661 if(!to) to = "";
662 dtio->fd = -1;
663}
664
665/** close and stop the output file descriptor event */
666static void dtio_close_output(struct dt_io_thread* dtio)
667{
668 if(!dtio->event)
669 return;

--- 51 unchanged lines hidden (view full) ---

721 dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
722 return 0; /* try again later */
723 }
724#endif
725 if(error != 0) {
726 char* to = dtio->socket_path;
727 if(!to) to = dtio->ip_str;
728 if(!to) to = "";
662#ifndef USE_WINSOCK
663 log_err("dnstap io: failed to connect to \"%s\": %s",
729 log_err("dnstap io: failed to connect to \"%s\": %s",
664 to, strerror(error));
665#else
666 log_err("dnstap io: failed to connect to \"%s\": %s",
667 to, wsa_strerror(error));
668#endif
730 to, sock_strerror(error));
669 return -1; /* error, close it */
670 }
671
672 if(dtio->ip_str)
673 verbose(VERB_DETAIL, "dnstap io: connected to %s",
674 dtio->ip_str);
675 else if(dtio->socket_path)
676 verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"",

--- 60 unchanged lines hidden (view full) ---

737 if(dtio->ssl)
738 return dtio_write_ssl(dtio, buf, len);
739#endif
740 ret = send(dtio->fd, (void*)buf, len, 0);
741 if(ret == -1) {
742#ifndef USE_WINSOCK
743 if(errno == EINTR || errno == EAGAIN)
744 return 0;
731 return -1; /* error, close it */
732 }
733
734 if(dtio->ip_str)
735 verbose(VERB_DETAIL, "dnstap io: connected to %s",
736 dtio->ip_str);
737 else if(dtio->socket_path)
738 verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"",

--- 60 unchanged lines hidden (view full) ---

799 if(dtio->ssl)
800 return dtio_write_ssl(dtio, buf, len);
801#endif
802 ret = send(dtio->fd, (void*)buf, len, 0);
803 if(ret == -1) {
804#ifndef USE_WINSOCK
805 if(errno == EINTR || errno == EAGAIN)
806 return 0;
745 log_err("dnstap io: failed send: %s", strerror(errno));
746#else
747 if(WSAGetLastError() == WSAEINPROGRESS)
748 return 0;
749 if(WSAGetLastError() == WSAEWOULDBLOCK) {
750 ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
751 dtio->stop_flush_event:dtio->event),
752 UB_EV_WRITE);
753 return 0;
754 }
807#else
808 if(WSAGetLastError() == WSAEINPROGRESS)
809 return 0;
810 if(WSAGetLastError() == WSAEWOULDBLOCK) {
811 ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
812 dtio->stop_flush_event:dtio->event),
813 UB_EV_WRITE);
814 return 0;
815 }
755 log_err("dnstap io: failed send: %s",
756 wsa_strerror(WSAGetLastError()));
757#endif
816#endif
817 log_err("dnstap io: failed send: %s", sock_strerror(errno));
758 return -1;
759 }
760 return ret;
761}
762
763#ifdef HAVE_WRITEV
764/** write with writev, len and message, in one write, if possible.
765 * return true if message is done, false if incomplete */

--- 7 unchanged lines hidden (view full) ---

773 iov[1].iov_base = dtio->cur_msg;
774 iov[1].iov_len = dtio->cur_msg_len;
775 log_assert(iov[0].iov_len > 0);
776 r = writev(dtio->fd, iov, 2);
777 if(r == -1) {
778#ifndef USE_WINSOCK
779 if(errno == EINTR || errno == EAGAIN)
780 return 0;
818 return -1;
819 }
820 return ret;
821}
822
823#ifdef HAVE_WRITEV
824/** write with writev, len and message, in one write, if possible.
825 * return true if message is done, false if incomplete */

--- 7 unchanged lines hidden (view full) ---

833 iov[1].iov_base = dtio->cur_msg;
834 iov[1].iov_len = dtio->cur_msg_len;
835 log_assert(iov[0].iov_len > 0);
836 r = writev(dtio->fd, iov, 2);
837 if(r == -1) {
838#ifndef USE_WINSOCK
839 if(errno == EINTR || errno == EAGAIN)
840 return 0;
781 log_err("dnstap io: failed writev: %s", strerror(errno));
782#else
783 if(WSAGetLastError() == WSAEINPROGRESS)
784 return 0;
785 if(WSAGetLastError() == WSAEWOULDBLOCK) {
786 ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
787 dtio->stop_flush_event:dtio->event),
788 UB_EV_WRITE);
789 return 0;
790 }
841#else
842 if(WSAGetLastError() == WSAEINPROGRESS)
843 return 0;
844 if(WSAGetLastError() == WSAEWOULDBLOCK) {
845 ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
846 dtio->stop_flush_event:dtio->event),
847 UB_EV_WRITE);
848 return 0;
849 }
791 log_err("dnstap io: failed writev: %s",
792 wsa_strerror(WSAGetLastError()));
793#endif
850#endif
851 log_err("dnstap io: failed writev: %s", sock_strerror(errno));
794 /* close the channel */
795 dtio_del_output_event(dtio);
796 dtio_close_output(dtio);
797 return 0;
798 }
799 /* written r bytes */
800 dtio->cur_msg_len_done += r;
801 if(dtio->cur_msg_len_done < 4)

--- 308 unchanged lines hidden (view full) ---

1110 memcmp(dtio->read_frame.buf + read_frame_done +
1111 + 8, DNSTAP_CONTENT_TYPE, len) == 0) {
1112 if(!dtio_control_start_send(dtio)) {
1113 verbose(VERB_OPS, "dnstap io: out of "
1114 "memory while sending START frame");
1115 goto close_connection;
1116 }
1117 dtio->accept_frame_received = 1;
852 /* close the channel */
853 dtio_del_output_event(dtio);
854 dtio_close_output(dtio);
855 return 0;
856 }
857 /* written r bytes */
858 dtio->cur_msg_len_done += r;
859 if(dtio->cur_msg_len_done < 4)

--- 308 unchanged lines hidden (view full) ---

1168 memcmp(dtio->read_frame.buf + read_frame_done +
1169 + 8, DNSTAP_CONTENT_TYPE, len) == 0) {
1170 if(!dtio_control_start_send(dtio)) {
1171 verbose(VERB_OPS, "dnstap io: out of "
1172 "memory while sending START frame");
1173 goto close_connection;
1174 }
1175 dtio->accept_frame_received = 1;
1176 if(!dtio_add_output_event_write(dtio))
1177 goto close_connection;
1118 return 1;
1119 } else {
1120 /* unknow content type */
1121 verbose(VERB_ALGO, "dnstap: ACCEPT frame "
1122 "contains unknown content type, "
1123 "closing connection");
1124 goto close_connection;
1125 }

--- 351 unchanged lines hidden (view full) ---

1477 ssize_t r;
1478 if(dtio->want_to_exit)
1479 return;
1480 r = read(fd, &cmd, sizeof(cmd));
1481 if(r == -1) {
1482#ifndef USE_WINSOCK
1483 if(errno == EINTR || errno == EAGAIN)
1484 return; /* ignore this */
1178 return 1;
1179 } else {
1180 /* unknow content type */
1181 verbose(VERB_ALGO, "dnstap: ACCEPT frame "
1182 "contains unknown content type, "
1183 "closing connection");
1184 goto close_connection;
1185 }

--- 351 unchanged lines hidden (view full) ---

1537 ssize_t r;
1538 if(dtio->want_to_exit)
1539 return;
1540 r = read(fd, &cmd, sizeof(cmd));
1541 if(r == -1) {
1542#ifndef USE_WINSOCK
1543 if(errno == EINTR || errno == EAGAIN)
1544 return; /* ignore this */
1485 log_err("dnstap io: failed to read: %s", strerror(errno));
1486#else
1487 if(WSAGetLastError() == WSAEINPROGRESS)
1488 return;
1489 if(WSAGetLastError() == WSAEWOULDBLOCK)
1490 return;
1545#else
1546 if(WSAGetLastError() == WSAEINPROGRESS)
1547 return;
1548 if(WSAGetLastError() == WSAEWOULDBLOCK)
1549 return;
1491 log_err("dnstap io: failed to read: %s",
1492 wsa_strerror(WSAGetLastError()));
1493#endif
1550#endif
1551 log_err("dnstap io: failed to read: %s", sock_strerror(errno));
1494 /* and then fall through to quit the thread */
1495 } else if(r == 0) {
1496 verbose(VERB_ALGO, "dnstap io: cmd channel closed");
1497 } else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
1498 verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
1499 } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
1500 verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
1501

--- 345 unchanged lines hidden (view full) ---

1847
1848/** open the output file descriptor for af_local */
1849static int dtio_open_output_local(struct dt_io_thread* dtio)
1850{
1851#ifdef HAVE_SYS_UN_H
1852 struct sockaddr_un s;
1853 dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
1854 if(dtio->fd == -1) {
1552 /* and then fall through to quit the thread */
1553 } else if(r == 0) {
1554 verbose(VERB_ALGO, "dnstap io: cmd channel closed");
1555 } else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
1556 verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
1557 } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
1558 verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
1559

--- 345 unchanged lines hidden (view full) ---

1905
1906/** open the output file descriptor for af_local */
1907static int dtio_open_output_local(struct dt_io_thread* dtio)
1908{
1909#ifdef HAVE_SYS_UN_H
1910 struct sockaddr_un s;
1911 dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
1912 if(dtio->fd == -1) {
1855#ifndef USE_WINSOCK
1856 log_err("dnstap io: failed to create socket: %s",
1913 log_err("dnstap io: failed to create socket: %s",
1857 strerror(errno));
1858#else
1859 log_err("dnstap io: failed to create socket: %s",
1860 wsa_strerror(WSAGetLastError()));
1861#endif
1914 sock_strerror(errno));
1862 return 0;
1863 }
1864 memset(&s, 0, sizeof(s));
1865#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
1866 /* this member exists on BSDs, not Linux */
1867 s.sun_len = (unsigned)sizeof(s);
1868#endif
1869 s.sun_family = AF_LOCAL;
1870 /* length is 92-108, 104 on FreeBSD */
1871 (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path));
1872 fd_set_nonblock(dtio->fd);
1873 if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s))
1874 == -1) {
1875 char* to = dtio->socket_path;
1915 return 0;
1916 }
1917 memset(&s, 0, sizeof(s));
1918#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
1919 /* this member exists on BSDs, not Linux */
1920 s.sun_len = (unsigned)sizeof(s);
1921#endif
1922 s.sun_family = AF_LOCAL;
1923 /* length is 92-108, 104 on FreeBSD */
1924 (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path));
1925 fd_set_nonblock(dtio->fd);
1926 if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s))
1927 == -1) {
1928 char* to = dtio->socket_path;
1876#ifndef USE_WINSOCK
1929 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1930 verbosity < 4) {
1931 dtio_close_fd(dtio);
1932 return 0; /* no log retries on low verbosity */
1933 }
1877 log_err("dnstap io: failed to connect to \"%s\": %s",
1934 log_err("dnstap io: failed to connect to \"%s\": %s",
1878 to, strerror(errno));
1879#else
1880 log_err("dnstap io: failed to connect to \"%s\": %s",
1881 to, wsa_strerror(WSAGetLastError()));
1882#endif
1935 to, sock_strerror(errno));
1883 dtio_close_fd(dtio);
1884 return 0;
1885 }
1886 return 1;
1887#else
1888 log_err("cannot create af_local socket");
1889 return 0;
1890#endif /* HAVE_SYS_UN_H */

--- 8 unchanged lines hidden (view full) ---

1899 addrlen = (socklen_t)sizeof(addr);
1900
1901 if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) {
1902 log_err("could not parse IP '%s'", dtio->ip_str);
1903 return 0;
1904 }
1905 dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
1906 if(dtio->fd == -1) {
1936 dtio_close_fd(dtio);
1937 return 0;
1938 }
1939 return 1;
1940#else
1941 log_err("cannot create af_local socket");
1942 return 0;
1943#endif /* HAVE_SYS_UN_H */

--- 8 unchanged lines hidden (view full) ---

1952 addrlen = (socklen_t)sizeof(addr);
1953
1954 if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) {
1955 log_err("could not parse IP '%s'", dtio->ip_str);
1956 return 0;
1957 }
1958 dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
1959 if(dtio->fd == -1) {
1907#ifndef USE_WINSOCK
1908 log_err("can't create socket: %s", strerror(errno));
1909#else
1910 log_err("can't create socket: %s",
1911 wsa_strerror(WSAGetLastError()));
1912#endif
1960 log_err("can't create socket: %s", sock_strerror(errno));
1913 return 0;
1914 }
1915 fd_set_nonblock(dtio->fd);
1916 if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
1917 if(errno == EINPROGRESS)
1918 return 1; /* wait until connect done*/
1961 return 0;
1962 }
1963 fd_set_nonblock(dtio->fd);
1964 if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
1965 if(errno == EINPROGRESS)
1966 return 1; /* wait until connect done*/
1967 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1968 verbosity < 4) {
1969 dtio_close_fd(dtio);
1970 return 0; /* no log retries on low verbosity */
1971 }
1919#ifndef USE_WINSOCK
1920 if(tcp_connect_errno_needs_log(
1921 (struct sockaddr *)&addr, addrlen)) {
1922 log_err("dnstap io: failed to connect to %s: %s",
1923 dtio->ip_str, strerror(errno));
1924 }
1925#else
1926 if(WSAGetLastError() == WSAEINPROGRESS ||

--- 165 unchanged lines hidden (view full) ---

2092
2093#ifndef THREADS_DISABLED
2094 while(1) {
2095 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
2096 if(r == -1) {
2097#ifndef USE_WINSOCK
2098 if(errno == EINTR || errno == EAGAIN)
2099 continue;
1972#ifndef USE_WINSOCK
1973 if(tcp_connect_errno_needs_log(
1974 (struct sockaddr *)&addr, addrlen)) {
1975 log_err("dnstap io: failed to connect to %s: %s",
1976 dtio->ip_str, strerror(errno));
1977 }
1978#else
1979 if(WSAGetLastError() == WSAEINPROGRESS ||

--- 165 unchanged lines hidden (view full) ---

2145
2146#ifndef THREADS_DISABLED
2147 while(1) {
2148 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
2149 if(r == -1) {
2150#ifndef USE_WINSOCK
2151 if(errno == EINTR || errno == EAGAIN)
2152 continue;
2100 log_err("dnstap io stop: write: %s", strerror(errno));
2101#else
2102 if(WSAGetLastError() == WSAEINPROGRESS)
2103 continue;
2104 if(WSAGetLastError() == WSAEWOULDBLOCK)
2105 continue;
2153#else
2154 if(WSAGetLastError() == WSAEINPROGRESS)
2155 continue;
2156 if(WSAGetLastError() == WSAEWOULDBLOCK)
2157 continue;
2106 log_err("dnstap io stop: write: %s",
2107 wsa_strerror(WSAGetLastError()));
2108#endif
2158#endif
2159 log_err("dnstap io stop: write: %s",
2160 sock_strerror(errno));
2109 break;
2110 }
2111 break;
2112 }
2113 dtio->started = 0;
2114#endif /* THREADS_DISABLED */
2115
2116#ifndef USE_WINSOCK

--- 12 unchanged lines hidden ---
2161 break;
2162 }
2163 break;
2164 }
2165 dtio->started = 0;
2166#endif /* THREADS_DISABLED */
2167
2168#ifndef USE_WINSOCK

--- 12 unchanged lines hidden ---