dtstream.c (366095) | dtstream.c (368129) |
---|---|
1/* 2 * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP 3 * 4 * Copyright (c) 2020, NLnet Labs. All rights reserved. 5 * 6 * This software is open source. 7 * 8 * Redistribution and use in source and binary forms, with or without --- 54 unchanged lines hidden (view full) --- 63/** number of messages to process in one output callback */ 64#define DTIO_MESSAGES_PER_CALLBACK 100 65/** the msec to wait for reconnect (if not immediate, the first attempt) */ 66#define DTIO_RECONNECT_TIMEOUT_MIN 10 67/** the msec to wait for reconnect max after backoff */ 68#define DTIO_RECONNECT_TIMEOUT_MAX 1000 69/** the msec to wait for reconnect slow, to stop busy spinning on reconnect */ 70#define DTIO_RECONNECT_TIMEOUT_SLOW 1000 | 1/* 2 * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP 3 * 4 * Copyright (c) 2020, NLnet Labs. All rights reserved. 5 * 6 * This software is open source. 7 * 8 * Redistribution and use in source and binary forms, with or without --- 54 unchanged lines hidden (view full) --- 63/** number of messages to process in one output callback */ 64#define DTIO_MESSAGES_PER_CALLBACK 100 65/** the msec to wait for reconnect (if not immediate, the first attempt) */ 66#define DTIO_RECONNECT_TIMEOUT_MIN 10 67/** the msec to wait for reconnect max after backoff */ 68#define DTIO_RECONNECT_TIMEOUT_MAX 1000 69/** the msec to wait for reconnect slow, to stop busy spinning on reconnect */ 70#define DTIO_RECONNECT_TIMEOUT_SLOW 1000 |
71/** number of messages before wakeup of thread */ 72#define DTIO_MSG_FOR_WAKEUP 32 |
|
71 72/** maximum length of received frame */ 73#define DTIO_RECV_FRAME_MAX_LEN 1000 74 75struct stop_flush_info; 76/** DTIO command channel commands */ 77enum { 78 /** DTIO command channel stop */ --- 15 unchanged lines hidden (view full) --- 94#ifdef HAVE_SSL 95/** enable briefly waiting for a read event, for SSL negotiation */ 96static int dtio_enable_brief_read(struct dt_io_thread* dtio); 97/** enable briefly waiting for a write event, for SSL negotiation */ 98static int dtio_enable_brief_write(struct dt_io_thread* dtio); 99#endif 100 101struct dt_msg_queue* | 73 74/** maximum length of received frame */ 75#define DTIO_RECV_FRAME_MAX_LEN 1000 76 77struct stop_flush_info; 78/** DTIO command channel commands */ 79enum { 80 /** DTIO command channel stop */ --- 15 unchanged lines hidden (view full) --- 96#ifdef HAVE_SSL 97/** enable briefly waiting for a read event, for SSL negotiation */ 98static int dtio_enable_brief_read(struct dt_io_thread* dtio); 99/** enable briefly waiting for a write event, for SSL negotiation */ 100static int dtio_enable_brief_write(struct dt_io_thread* dtio); 101#endif 102 103struct dt_msg_queue* |
102dt_msg_queue_create(void) | 104dt_msg_queue_create(struct comm_base* base) |
103{ 104 struct dt_msg_queue* mq = calloc(1, sizeof(*mq)); 105 if(!mq) return NULL; 106 mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker, 107 about 1 M should contain 64K messages with some overhead, 108 or a whole bunch smaller ones */ | 105{ 106 struct dt_msg_queue* mq = calloc(1, sizeof(*mq)); 107 if(!mq) return NULL; 108 mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker, 109 about 1 M should contain 64K messages with some overhead, 110 or a whole bunch smaller ones */ |
111 mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq); 112 if(!mq->wakeup_timer) { 113 free(mq); 114 return NULL; 115 } |
|
109 lock_basic_init(&mq->lock); 110 lock_protect(&mq->lock, mq, sizeof(*mq)); 111 return mq; 112} 113 114/** clear the message list, caller must hold the lock */ 115static void 116dt_msg_queue_clear(struct dt_msg_queue* mq) 117{ 118 struct dt_msg_entry* e = mq->first, *next=NULL; 119 while(e) { 120 next = e->next; 121 free(e->buf); 122 free(e); 123 e = next; 124 } 125 mq->first = NULL; 126 mq->last = NULL; 127 mq->cursize = 0; | 116 lock_basic_init(&mq->lock); 117 lock_protect(&mq->lock, mq, sizeof(*mq)); 118 return mq; 119} 120 121/** clear the message list, caller must hold the lock */ 122static void 123dt_msg_queue_clear(struct dt_msg_queue* mq) 124{ 125 struct dt_msg_entry* e = mq->first, *next=NULL; 126 while(e) { 127 next = e->next; 128 free(e->buf); 129 free(e); 130 e = next; 131 } 132 mq->first = NULL; 133 mq->last = NULL; 134 mq->cursize = 0; |
135 mq->msgcount = 0; |
|
128} 129 130void 131dt_msg_queue_delete(struct dt_msg_queue* mq) 132{ 133 if(!mq) return; 134 lock_basic_destroy(&mq->lock); 135 dt_msg_queue_clear(mq); | 136} 137 138void 139dt_msg_queue_delete(struct dt_msg_queue* mq) 140{ 141 if(!mq) return; 142 lock_basic_destroy(&mq->lock); 143 dt_msg_queue_clear(mq); |
144 comm_timer_delete(mq->wakeup_timer); |
|
136 free(mq); 137} 138 139/** make the dtio wake up by sending a wakeup command */ 140static void dtio_wakeup(struct dt_io_thread* dtio) 141{ 142 uint8_t cmd = DTIO_COMMAND_WAKEUP; 143 if(!dtio) return; 144 if(!dtio->started) return; 145 146 while(1) { 147 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); 148 if(r == -1) { 149#ifndef USE_WINSOCK 150 if(errno == EINTR || errno == EAGAIN) 151 continue; | 145 free(mq); 146} 147 148/** make the dtio wake up by sending a wakeup command */ 149static void dtio_wakeup(struct dt_io_thread* dtio) 150{ 151 uint8_t cmd = DTIO_COMMAND_WAKEUP; 152 if(!dtio) return; 153 if(!dtio->started) return; 154 155 while(1) { 156 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); 157 if(r == -1) { 158#ifndef USE_WINSOCK 159 if(errno == EINTR || errno == EAGAIN) 160 continue; |
152 log_err("dnstap io wakeup: write: %s", strerror(errno)); | |
153#else 154 if(WSAGetLastError() == WSAEINPROGRESS) 155 continue; 156 if(WSAGetLastError() == WSAEWOULDBLOCK) 157 continue; | 161#else 162 if(WSAGetLastError() == WSAEINPROGRESS) 163 continue; 164 if(WSAGetLastError() == WSAEWOULDBLOCK) 165 continue; |
158 log_err("dnstap io stop: write: %s", 159 wsa_strerror(WSAGetLastError())); | |
160#endif | 166#endif |
167 log_err("dnstap io wakeup: write: %s", 168 sock_strerror(errno)); |
|
161 break; 162 } 163 break; 164 } 165} 166 167void | 169 break; 170 } 171 break; 172 } 173} 174 175void |
176mq_wakeup_cb(void* arg) 177{ 178 struct dt_msg_queue* mq = (struct dt_msg_queue*)arg; 179 /* even if the dtio is already active, because perhaps much 180 * traffic suddenly, we leave the timer running to save on 181 * managing it, the once a second timer is less work then 182 * starting and stopping the timer frequently */ 183 lock_basic_lock(&mq->dtio->wakeup_timer_lock); 184 mq->dtio->wakeup_timer_enabled = 0; 185 lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 186 dtio_wakeup(mq->dtio); 187} 188 189/** start timer to wakeup dtio because there is content in the queue */ 190static void 191dt_msg_queue_start_timer(struct dt_msg_queue* mq) 192{ 193 struct timeval tv; 194 /* Start a timer to process messages to be logged. 195 * If we woke up the dtio thread for every message, the wakeup 196 * messages take up too much processing power. If the queue 197 * fills up the wakeup happens immediately. The timer wakes it up 198 * if there are infrequent messages to log. */ 199 200 /* we cannot start a timer in dtio thread, because it is a different 201 * thread and its event base is in use by the other thread, it would 202 * give race conditions if we tried to modify its event base, 203 * and locks would wait until it woke up, and this is what we do. */ 204 205 /* do not start the timer if a timer already exists, perhaps 206 * in another worker. So this variable is protected by a lock in 207 * dtio */ 208 lock_basic_lock(&mq->dtio->wakeup_timer_lock); 209 if(mq->dtio->wakeup_timer_enabled) { 210 lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 211 return; 212 } 213 mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */ 214 lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 215 216 /* start the timer, in mq, in the event base of our worker */ 217 tv.tv_sec = 1; 218 tv.tv_usec = 0; 219 comm_timer_set(mq->wakeup_timer, &tv); 220} 221 222void |
|
168dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) 169{ | 223dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) 224{ |
170 int wakeup = 0; | 225 int wakeupnow = 0, wakeupstarttimer = 0; |
171 struct dt_msg_entry* entry; 172 173 /* check conditions */ 174 if(!buf) return; 175 if(len == 0) { 176 /* it is not possible to log entries with zero length, 177 * because the framestream protocol does not carry it. 178 * However the protobuf serialization does not create zero --- 14 unchanged lines hidden (view full) --- 193 return; 194 } 195 entry->next = NULL; 196 entry->buf = buf; 197 entry->len = len; 198 199 /* aqcuire lock */ 200 lock_basic_lock(&mq->lock); | 226 struct dt_msg_entry* entry; 227 228 /* check conditions */ 229 if(!buf) return; 230 if(len == 0) { 231 /* it is not possible to log entries with zero length, 232 * because the framestream protocol does not carry it. 233 * However the protobuf serialization does not create zero --- 14 unchanged lines hidden (view full) --- 248 return; 249 } 250 entry->next = NULL; 251 entry->buf = buf; 252 entry->len = len; 253 254 /* aqcuire lock */ 255 lock_basic_lock(&mq->lock); |
201 /* list was empty, wakeup dtio */ | 256 /* if list was empty, start timer for (eventual) wakeup */ |
202 if(mq->first == NULL) | 257 if(mq->first == NULL) |
203 wakeup = 1; | 258 wakeupstarttimer = 1; 259 /* if list contains more than wakeupnum elements, wakeup now, 260 * or if list is (going to be) almost full */ 261 if(mq->msgcount == DTIO_MSG_FOR_WAKEUP || 262 (mq->cursize < mq->maxsize * 9 / 10 && 263 mq->cursize+len >= mq->maxsize * 9 / 10)) 264 wakeupnow = 1; |
204 /* see if it is going to fit */ 205 if(mq->cursize + len > mq->maxsize) { 206 /* buffer full, or congested. */ 207 /* drop */ 208 lock_basic_unlock(&mq->lock); 209 free(buf); 210 free(entry); 211 return; 212 } 213 mq->cursize += len; | 265 /* see if it is going to fit */ 266 if(mq->cursize + len > mq->maxsize) { 267 /* buffer full, or congested. */ 268 /* drop */ 269 lock_basic_unlock(&mq->lock); 270 free(buf); 271 free(entry); 272 return; 273 } 274 mq->cursize += len; |
275 mq->msgcount ++; |
|
214 /* append to list */ 215 if(mq->last) { 216 mq->last->next = entry; 217 } else { 218 mq->first = entry; 219 } 220 mq->last = entry; 221 /* release lock */ 222 lock_basic_unlock(&mq->lock); 223 | 276 /* append to list */ 277 if(mq->last) { 278 mq->last->next = entry; 279 } else { 280 mq->first = entry; 281 } 282 mq->last = entry; 283 /* release lock */ 284 lock_basic_unlock(&mq->lock); 285 |
224 if(wakeup) | 286 if(wakeupnow) { |
225 dtio_wakeup(mq->dtio); | 287 dtio_wakeup(mq->dtio); |
288 } else if(wakeupstarttimer) { 289 dt_msg_queue_start_timer(mq); 290 } |
|
226} 227 228struct dt_io_thread* dt_io_thread_create(void) 229{ 230 struct dt_io_thread* dtio = calloc(1, sizeof(*dtio)); | 291} 292 293struct dt_io_thread* dt_io_thread_create(void) 294{ 295 struct dt_io_thread* dtio = calloc(1, sizeof(*dtio)); |
296 lock_basic_init(&dtio->wakeup_timer_lock); 297 lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled, 298 sizeof(dtio->wakeup_timer_enabled)); |
|
231 return dtio; 232} 233 234void dt_io_thread_delete(struct dt_io_thread* dtio) 235{ 236 struct dt_io_list_item* item, *nextitem; 237 if(!dtio) return; | 299 return dtio; 300} 301 302void dt_io_thread_delete(struct dt_io_thread* dtio) 303{ 304 struct dt_io_list_item* item, *nextitem; 305 if(!dtio) return; |
306 lock_basic_destroy(&dtio->wakeup_timer_lock); |
|
238 item=dtio->io_list; 239 while(item) { 240 nextitem = item->next; 241 free(item); 242 item = nextitem; 243 } 244 free(dtio->socket_path); 245 free(dtio->ip_str); --- 28 unchanged lines hidden (view full) --- 274 if(dtio->upstream_is_unix) { 275 if(!cfg->dnstap_socket_path || 276 cfg->dnstap_socket_path[0]==0) { 277 log_err("dnstap setup: no dnstap-socket-path for " 278 "socket connect"); 279 return 0; 280 } 281 free(dtio->socket_path); | 307 item=dtio->io_list; 308 while(item) { 309 nextitem = item->next; 310 free(item); 311 item = nextitem; 312 } 313 free(dtio->socket_path); 314 free(dtio->ip_str); --- 28 unchanged lines hidden (view full) --- 343 if(dtio->upstream_is_unix) { 344 if(!cfg->dnstap_socket_path || 345 cfg->dnstap_socket_path[0]==0) { 346 log_err("dnstap setup: no dnstap-socket-path for " 347 "socket connect"); 348 return 0; 349 } 350 free(dtio->socket_path); |
282 dtio->socket_path = strdup(cfg->dnstap_socket_path); | 351 dtio->socket_path = fname_after_chroot(cfg->dnstap_socket_path, 352 cfg, 1); |
283 if(!dtio->socket_path) { 284 log_err("dnstap setup: malloc failure"); 285 return 0; 286 } 287 } 288 289 if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { 290 if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) { --- 120 unchanged lines hidden (view full) --- 411 size_t* len) 412{ 413 lock_basic_lock(&mq->lock); 414 if(mq->first) { 415 struct dt_msg_entry* entry = mq->first; 416 mq->first = entry->next; 417 if(!entry->next) mq->last = NULL; 418 mq->cursize -= entry->len; | 353 if(!dtio->socket_path) { 354 log_err("dnstap setup: malloc failure"); 355 return 0; 356 } 357 } 358 359 if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { 360 if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) { --- 120 unchanged lines hidden (view full) --- 481 size_t* len) 482{ 483 lock_basic_lock(&mq->lock); 484 if(mq->first) { 485 struct dt_msg_entry* entry = mq->first; 486 mq->first = entry->next; 487 if(!entry->next) mq->last = NULL; 488 mq->cursize -= entry->len; |
489 mq->msgcount --; |
|
419 lock_basic_unlock(&mq->lock); 420 421 *buf = entry->buf; 422 *len = entry->len; 423 free(entry); 424 return 1; 425 } 426 lock_basic_unlock(&mq->lock); --- 155 unchanged lines hidden (view full) --- 582 ub_event_del(dtio->event); 583 dtio->event_added = 0; 584 dtio->event_added_is_write = 0; 585} 586 587/** close dtio socket and set it to -1 */ 588static void dtio_close_fd(struct dt_io_thread* dtio) 589{ | 490 lock_basic_unlock(&mq->lock); 491 492 *buf = entry->buf; 493 *len = entry->len; 494 free(entry); 495 return 1; 496 } 497 lock_basic_unlock(&mq->lock); --- 155 unchanged lines hidden (view full) --- 653 ub_event_del(dtio->event); 654 dtio->event_added = 0; 655 dtio->event_added_is_write = 0; 656} 657 658/** close dtio socket and set it to -1 */ 659static void dtio_close_fd(struct dt_io_thread* dtio) 660{ |
590#ifndef USE_WINSOCK 591 close(dtio->fd); 592#else 593 closesocket(dtio->fd); 594#endif | 661 sock_close(dtio->fd); |
595 dtio->fd = -1; 596} 597 598/** close and stop the output file descriptor event */ 599static void dtio_close_output(struct dt_io_thread* dtio) 600{ 601 if(!dtio->event) 602 return; --- 51 unchanged lines hidden (view full) --- 654 dtio->stop_flush_event:dtio->event), UB_EV_WRITE); 655 return 0; /* try again later */ 656 } 657#endif 658 if(error != 0) { 659 char* to = dtio->socket_path; 660 if(!to) to = dtio->ip_str; 661 if(!to) to = ""; | 662 dtio->fd = -1; 663} 664 665/** close and stop the output file descriptor event */ 666static void dtio_close_output(struct dt_io_thread* dtio) 667{ 668 if(!dtio->event) 669 return; --- 51 unchanged lines hidden (view full) --- 721 dtio->stop_flush_event:dtio->event), UB_EV_WRITE); 722 return 0; /* try again later */ 723 } 724#endif 725 if(error != 0) { 726 char* to = dtio->socket_path; 727 if(!to) to = dtio->ip_str; 728 if(!to) to = ""; |
662#ifndef USE_WINSOCK | |
663 log_err("dnstap io: failed to connect to \"%s\": %s", | 729 log_err("dnstap io: failed to connect to \"%s\": %s", |
664 to, strerror(error)); 665#else 666 log_err("dnstap io: failed to connect to \"%s\": %s", 667 to, wsa_strerror(error)); 668#endif | 730 to, sock_strerror(error)); |
669 return -1; /* error, close it */ 670 } 671 672 if(dtio->ip_str) 673 verbose(VERB_DETAIL, "dnstap io: connected to %s", 674 dtio->ip_str); 675 else if(dtio->socket_path) 676 verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"", --- 60 unchanged lines hidden (view full) --- 737 if(dtio->ssl) 738 return dtio_write_ssl(dtio, buf, len); 739#endif 740 ret = send(dtio->fd, (void*)buf, len, 0); 741 if(ret == -1) { 742#ifndef USE_WINSOCK 743 if(errno == EINTR || errno == EAGAIN) 744 return 0; | 731 return -1; /* error, close it */ 732 } 733 734 if(dtio->ip_str) 735 verbose(VERB_DETAIL, "dnstap io: connected to %s", 736 dtio->ip_str); 737 else if(dtio->socket_path) 738 verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"", --- 60 unchanged lines hidden (view full) --- 799 if(dtio->ssl) 800 return dtio_write_ssl(dtio, buf, len); 801#endif 802 ret = send(dtio->fd, (void*)buf, len, 0); 803 if(ret == -1) { 804#ifndef USE_WINSOCK 805 if(errno == EINTR || errno == EAGAIN) 806 return 0; |
745 log_err("dnstap io: failed send: %s", strerror(errno)); | |
746#else 747 if(WSAGetLastError() == WSAEINPROGRESS) 748 return 0; 749 if(WSAGetLastError() == WSAEWOULDBLOCK) { 750 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 751 dtio->stop_flush_event:dtio->event), 752 UB_EV_WRITE); 753 return 0; 754 } | 807#else 808 if(WSAGetLastError() == WSAEINPROGRESS) 809 return 0; 810 if(WSAGetLastError() == WSAEWOULDBLOCK) { 811 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 812 dtio->stop_flush_event:dtio->event), 813 UB_EV_WRITE); 814 return 0; 815 } |
755 log_err("dnstap io: failed send: %s", 756 wsa_strerror(WSAGetLastError())); | |
757#endif | 816#endif |
817 log_err("dnstap io: failed send: %s", sock_strerror(errno)); |
|
758 return -1; 759 } 760 return ret; 761} 762 763#ifdef HAVE_WRITEV 764/** write with writev, len and message, in one write, if possible. 765 * return true if message is done, false if incomplete */ --- 7 unchanged lines hidden (view full) --- 773 iov[1].iov_base = dtio->cur_msg; 774 iov[1].iov_len = dtio->cur_msg_len; 775 log_assert(iov[0].iov_len > 0); 776 r = writev(dtio->fd, iov, 2); 777 if(r == -1) { 778#ifndef USE_WINSOCK 779 if(errno == EINTR || errno == EAGAIN) 780 return 0; | 818 return -1; 819 } 820 return ret; 821} 822 823#ifdef HAVE_WRITEV 824/** write with writev, len and message, in one write, if possible. 825 * return true if message is done, false if incomplete */ --- 7 unchanged lines hidden (view full) --- 833 iov[1].iov_base = dtio->cur_msg; 834 iov[1].iov_len = dtio->cur_msg_len; 835 log_assert(iov[0].iov_len > 0); 836 r = writev(dtio->fd, iov, 2); 837 if(r == -1) { 838#ifndef USE_WINSOCK 839 if(errno == EINTR || errno == EAGAIN) 840 return 0; |
781 log_err("dnstap io: failed writev: %s", strerror(errno)); | |
782#else 783 if(WSAGetLastError() == WSAEINPROGRESS) 784 return 0; 785 if(WSAGetLastError() == WSAEWOULDBLOCK) { 786 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 787 dtio->stop_flush_event:dtio->event), 788 UB_EV_WRITE); 789 return 0; 790 } | 841#else 842 if(WSAGetLastError() == WSAEINPROGRESS) 843 return 0; 844 if(WSAGetLastError() == WSAEWOULDBLOCK) { 845 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 846 dtio->stop_flush_event:dtio->event), 847 UB_EV_WRITE); 848 return 0; 849 } |
791 log_err("dnstap io: failed writev: %s", 792 wsa_strerror(WSAGetLastError())); | |
793#endif | 850#endif |
851 log_err("dnstap io: failed writev: %s", sock_strerror(errno)); |
|
794 /* close the channel */ 795 dtio_del_output_event(dtio); 796 dtio_close_output(dtio); 797 return 0; 798 } 799 /* written r bytes */ 800 dtio->cur_msg_len_done += r; 801 if(dtio->cur_msg_len_done < 4) --- 308 unchanged lines hidden (view full) --- 1110 memcmp(dtio->read_frame.buf + read_frame_done + 1111 + 8, DNSTAP_CONTENT_TYPE, len) == 0) { 1112 if(!dtio_control_start_send(dtio)) { 1113 verbose(VERB_OPS, "dnstap io: out of " 1114 "memory while sending START frame"); 1115 goto close_connection; 1116 } 1117 dtio->accept_frame_received = 1; | 852 /* close the channel */ 853 dtio_del_output_event(dtio); 854 dtio_close_output(dtio); 855 return 0; 856 } 857 /* written r bytes */ 858 dtio->cur_msg_len_done += r; 859 if(dtio->cur_msg_len_done < 4) --- 308 unchanged lines hidden (view full) --- 1168 memcmp(dtio->read_frame.buf + read_frame_done + 1169 + 8, DNSTAP_CONTENT_TYPE, len) == 0) { 1170 if(!dtio_control_start_send(dtio)) { 1171 verbose(VERB_OPS, "dnstap io: out of " 1172 "memory while sending START frame"); 1173 goto close_connection; 1174 } 1175 dtio->accept_frame_received = 1; |
1176 if(!dtio_add_output_event_write(dtio)) 1177 goto close_connection; |
|
1118 return 1; 1119 } else { 1120 /* unknow content type */ 1121 verbose(VERB_ALGO, "dnstap: ACCEPT frame " 1122 "contains unknown content type, " 1123 "closing connection"); 1124 goto close_connection; 1125 } --- 351 unchanged lines hidden (view full) --- 1477 ssize_t r; 1478 if(dtio->want_to_exit) 1479 return; 1480 r = read(fd, &cmd, sizeof(cmd)); 1481 if(r == -1) { 1482#ifndef USE_WINSOCK 1483 if(errno == EINTR || errno == EAGAIN) 1484 return; /* ignore this */ | 1178 return 1; 1179 } else { 1180 /* unknow content type */ 1181 verbose(VERB_ALGO, "dnstap: ACCEPT frame " 1182 "contains unknown content type, " 1183 "closing connection"); 1184 goto close_connection; 1185 } --- 351 unchanged lines hidden (view full) --- 1537 ssize_t r; 1538 if(dtio->want_to_exit) 1539 return; 1540 r = read(fd, &cmd, sizeof(cmd)); 1541 if(r == -1) { 1542#ifndef USE_WINSOCK 1543 if(errno == EINTR || errno == EAGAIN) 1544 return; /* ignore this */ |
1485 log_err("dnstap io: failed to read: %s", strerror(errno)); | |
1486#else 1487 if(WSAGetLastError() == WSAEINPROGRESS) 1488 return; 1489 if(WSAGetLastError() == WSAEWOULDBLOCK) 1490 return; | 1545#else 1546 if(WSAGetLastError() == WSAEINPROGRESS) 1547 return; 1548 if(WSAGetLastError() == WSAEWOULDBLOCK) 1549 return; |
1491 log_err("dnstap io: failed to read: %s", 1492 wsa_strerror(WSAGetLastError())); | |
1493#endif | 1550#endif |
1551 log_err("dnstap io: failed to read: %s", sock_strerror(errno)); |
|
1494 /* and then fall through to quit the thread */ 1495 } else if(r == 0) { 1496 verbose(VERB_ALGO, "dnstap io: cmd channel closed"); 1497 } else if(r == 1 && cmd == DTIO_COMMAND_STOP) { 1498 verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit"); 1499 } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) { 1500 verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup"); 1501 --- 345 unchanged lines hidden (view full) --- 1847 1848/** open the output file descriptor for af_local */ 1849static int dtio_open_output_local(struct dt_io_thread* dtio) 1850{ 1851#ifdef HAVE_SYS_UN_H 1852 struct sockaddr_un s; 1853 dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0); 1854 if(dtio->fd == -1) { | 1552 /* and then fall through to quit the thread */ 1553 } else if(r == 0) { 1554 verbose(VERB_ALGO, "dnstap io: cmd channel closed"); 1555 } else if(r == 1 && cmd == DTIO_COMMAND_STOP) { 1556 verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit"); 1557 } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) { 1558 verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup"); 1559 --- 345 unchanged lines hidden (view full) --- 1905 1906/** open the output file descriptor for af_local */ 1907static int dtio_open_output_local(struct dt_io_thread* dtio) 1908{ 1909#ifdef HAVE_SYS_UN_H 1910 struct sockaddr_un s; 1911 dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0); 1912 if(dtio->fd == -1) { |
1855#ifndef USE_WINSOCK | |
1856 log_err("dnstap io: failed to create socket: %s", | 1913 log_err("dnstap io: failed to create socket: %s", |
1857 strerror(errno)); 1858#else 1859 log_err("dnstap io: failed to create socket: %s", 1860 wsa_strerror(WSAGetLastError())); 1861#endif | 1914 sock_strerror(errno)); |
1862 return 0; 1863 } 1864 memset(&s, 0, sizeof(s)); 1865#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN 1866 /* this member exists on BSDs, not Linux */ 1867 s.sun_len = (unsigned)sizeof(s); 1868#endif 1869 s.sun_family = AF_LOCAL; 1870 /* length is 92-108, 104 on FreeBSD */ 1871 (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path)); 1872 fd_set_nonblock(dtio->fd); 1873 if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s)) 1874 == -1) { 1875 char* to = dtio->socket_path; | 1915 return 0; 1916 } 1917 memset(&s, 0, sizeof(s)); 1918#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN 1919 /* this member exists on BSDs, not Linux */ 1920 s.sun_len = (unsigned)sizeof(s); 1921#endif 1922 s.sun_family = AF_LOCAL; 1923 /* length is 92-108, 104 on FreeBSD */ 1924 (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path)); 1925 fd_set_nonblock(dtio->fd); 1926 if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s)) 1927 == -1) { 1928 char* to = dtio->socket_path; |
1876#ifndef USE_WINSOCK | 1929 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 1930 verbosity < 4) { 1931 dtio_close_fd(dtio); 1932 return 0; /* no log retries on low verbosity */ 1933 } |
1877 log_err("dnstap io: failed to connect to \"%s\": %s", | 1934 log_err("dnstap io: failed to connect to \"%s\": %s", |
1878 to, strerror(errno)); 1879#else 1880 log_err("dnstap io: failed to connect to \"%s\": %s", 1881 to, wsa_strerror(WSAGetLastError())); 1882#endif | 1935 to, sock_strerror(errno)); |
1883 dtio_close_fd(dtio); 1884 return 0; 1885 } 1886 return 1; 1887#else 1888 log_err("cannot create af_local socket"); 1889 return 0; 1890#endif /* HAVE_SYS_UN_H */ --- 8 unchanged lines hidden (view full) --- 1899 addrlen = (socklen_t)sizeof(addr); 1900 1901 if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) { 1902 log_err("could not parse IP '%s'", dtio->ip_str); 1903 return 0; 1904 } 1905 dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0); 1906 if(dtio->fd == -1) { | 1936 dtio_close_fd(dtio); 1937 return 0; 1938 } 1939 return 1; 1940#else 1941 log_err("cannot create af_local socket"); 1942 return 0; 1943#endif /* HAVE_SYS_UN_H */ --- 8 unchanged lines hidden (view full) --- 1952 addrlen = (socklen_t)sizeof(addr); 1953 1954 if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) { 1955 log_err("could not parse IP '%s'", dtio->ip_str); 1956 return 0; 1957 } 1958 dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0); 1959 if(dtio->fd == -1) { |
1907#ifndef USE_WINSOCK 1908 log_err("can't create socket: %s", strerror(errno)); 1909#else 1910 log_err("can't create socket: %s", 1911 wsa_strerror(WSAGetLastError())); 1912#endif | 1960 log_err("can't create socket: %s", sock_strerror(errno)); |
1913 return 0; 1914 } 1915 fd_set_nonblock(dtio->fd); 1916 if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) { 1917 if(errno == EINPROGRESS) 1918 return 1; /* wait until connect done*/ | 1961 return 0; 1962 } 1963 fd_set_nonblock(dtio->fd); 1964 if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) { 1965 if(errno == EINPROGRESS) 1966 return 1; /* wait until connect done*/ |
1967 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 1968 verbosity < 4) { 1969 dtio_close_fd(dtio); 1970 return 0; /* no log retries on low verbosity */ 1971 } |
|
1919#ifndef USE_WINSOCK 1920 if(tcp_connect_errno_needs_log( 1921 (struct sockaddr *)&addr, addrlen)) { 1922 log_err("dnstap io: failed to connect to %s: %s", 1923 dtio->ip_str, strerror(errno)); 1924 } 1925#else 1926 if(WSAGetLastError() == WSAEINPROGRESS || --- 165 unchanged lines hidden (view full) --- 2092 2093#ifndef THREADS_DISABLED 2094 while(1) { 2095 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); 2096 if(r == -1) { 2097#ifndef USE_WINSOCK 2098 if(errno == EINTR || errno == EAGAIN) 2099 continue; | 1972#ifndef USE_WINSOCK 1973 if(tcp_connect_errno_needs_log( 1974 (struct sockaddr *)&addr, addrlen)) { 1975 log_err("dnstap io: failed to connect to %s: %s", 1976 dtio->ip_str, strerror(errno)); 1977 } 1978#else 1979 if(WSAGetLastError() == WSAEINPROGRESS || --- 165 unchanged lines hidden (view full) --- 2145 2146#ifndef THREADS_DISABLED 2147 while(1) { 2148 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); 2149 if(r == -1) { 2150#ifndef USE_WINSOCK 2151 if(errno == EINTR || errno == EAGAIN) 2152 continue; |
2100 log_err("dnstap io stop: write: %s", strerror(errno)); | |
2101#else 2102 if(WSAGetLastError() == WSAEINPROGRESS) 2103 continue; 2104 if(WSAGetLastError() == WSAEWOULDBLOCK) 2105 continue; | 2153#else 2154 if(WSAGetLastError() == WSAEINPROGRESS) 2155 continue; 2156 if(WSAGetLastError() == WSAEWOULDBLOCK) 2157 continue; |
2106 log_err("dnstap io stop: write: %s", 2107 wsa_strerror(WSAGetLastError())); | |
2108#endif | 2158#endif |
2159 log_err("dnstap io stop: write: %s", 2160 sock_strerror(errno)); |
|
2109 break; 2110 } 2111 break; 2112 } 2113 dtio->started = 0; 2114#endif /* THREADS_DISABLED */ 2115 2116#ifndef USE_WINSOCK --- 12 unchanged lines hidden --- | 2161 break; 2162 } 2163 break; 2164 } 2165 dtio->started = 0; 2166#endif /* THREADS_DISABLED */ 2167 2168#ifndef USE_WINSOCK --- 12 unchanged lines hidden --- |