1/* 2 * dnstap/dtstream.c - Frame Streams thread for unbound DNSTAP 3 * 4 * Copyright (c) 2020, NLnet Labs. All rights reserved. 5 * 6 * This software is open source. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * Redistributions of source code must retain the above copyright notice, 13 * this list of conditions and the following disclaimer. 14 * 15 * Redistributions in binary form must reproduce the above copyright notice, 16 * this list of conditions and the following disclaimer in the documentation 17 * and/or other materials provided with the distribution. 18 * 19 * Neither the name of the NLNET LABS nor the names of its contributors may 20 * be used to endorse or promote products derived from this software without 21 * specific prior written permission. 22 * 23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 34 * 35 */ 36 37/** 38 * \file 39 * 40 * An implementation of the Frame Streams data transport protocol for 41 * the Unbound DNSTAP message logging facility. 42 */ 43 44#include "config.h" 45#include "dnstap/dtstream.h" 46#include "dnstap/dnstap_fstrm.h" 47#include "util/config_file.h" 48#include "util/ub_event.h" 49#include "util/net_help.h" 50#include "services/outside_network.h" 51#include "sldns/sbuffer.h" 52#ifdef HAVE_SYS_UN_H 53#include <sys/un.h> 54#endif 55#include <fcntl.h> 56#ifdef HAVE_OPENSSL_SSL_H 57#include <openssl/ssl.h> 58#endif 59#ifdef HAVE_OPENSSL_ERR_H 60#include <openssl/err.h> 61#endif 62 63/** number of messages to process in one output callback */ 64#define DTIO_MESSAGES_PER_CALLBACK 100 65/** the msec to wait for reconnect (if not immediate, the first attempt) */ 66#define DTIO_RECONNECT_TIMEOUT_MIN 10 67/** the msec to wait for reconnect max after backoff */ 68#define DTIO_RECONNECT_TIMEOUT_MAX 1000 69/** the msec to wait for reconnect slow, to stop busy spinning on reconnect */ 70#define DTIO_RECONNECT_TIMEOUT_SLOW 1000 71/** number of messages before wakeup of thread */ 72#define DTIO_MSG_FOR_WAKEUP 32 73 74/** maximum length of received frame */ 75#define DTIO_RECV_FRAME_MAX_LEN 1000 76 77struct stop_flush_info; 78/** DTIO command channel commands */ 79enum { 80 /** DTIO command channel stop */ 81 DTIO_COMMAND_STOP = 0, 82 /** DTIO command channel wakeup */ 83 DTIO_COMMAND_WAKEUP = 1 84} dtio_channel_command; 85 86/** open the output channel */ 87static void dtio_open_output(struct dt_io_thread* dtio); 88/** add output event for read and write */ 89static int dtio_add_output_event_write(struct dt_io_thread* dtio); 90/** start reconnection attempts */ 91static void dtio_reconnect_enable(struct dt_io_thread* dtio); 92/** stop from stop_flush event loop */ 93static void dtio_stop_flush_exit(struct stop_flush_info* info); 94/** setup a start control message */ 95static int dtio_control_start_send(struct dt_io_thread* dtio); 96#ifdef HAVE_SSL 97/** enable briefly waiting for a read event, for SSL negotiation */ 98static int dtio_enable_brief_read(struct dt_io_thread* dtio); 99/** enable briefly waiting for a write event, for SSL negotiation */ 100static int dtio_enable_brief_write(struct dt_io_thread* dtio); 101#endif 102 103struct dt_msg_queue* 104dt_msg_queue_create(struct comm_base* base) 105{ 106 struct dt_msg_queue* mq = calloc(1, sizeof(*mq)); 107 if(!mq) return NULL; 108 mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker, 109 about 1 M should contain 64K messages with some overhead, 110 or a whole bunch smaller ones */ 111 mq->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, mq); 112 if(!mq->wakeup_timer) { 113 free(mq); 114 return NULL; 115 } 116 lock_basic_init(&mq->lock); 117 lock_protect(&mq->lock, mq, sizeof(*mq)); 118 return mq; 119} 120 121/** clear the message list, caller must hold the lock */ 122static void 123dt_msg_queue_clear(struct dt_msg_queue* mq) 124{ 125 struct dt_msg_entry* e = mq->first, *next=NULL; 126 while(e) { 127 next = e->next; 128 free(e->buf); 129 free(e); 130 e = next; 131 } 132 mq->first = NULL; 133 mq->last = NULL; 134 mq->cursize = 0; 135 mq->msgcount = 0; 136} 137 138void 139dt_msg_queue_delete(struct dt_msg_queue* mq) 140{ 141 if(!mq) return; 142 lock_basic_destroy(&mq->lock); 143 dt_msg_queue_clear(mq); 144 comm_timer_delete(mq->wakeup_timer); 145 free(mq); 146} 147 148/** make the dtio wake up by sending a wakeup command */ 149static void dtio_wakeup(struct dt_io_thread* dtio) 150{ 151 uint8_t cmd = DTIO_COMMAND_WAKEUP; 152 if(!dtio) return; 153 if(!dtio->started) return; 154 155 while(1) { 156 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); 157 if(r == -1) { 158#ifndef USE_WINSOCK 159 if(errno == EINTR || errno == EAGAIN) 160 continue; 161#else 162 if(WSAGetLastError() == WSAEINPROGRESS) 163 continue; 164 if(WSAGetLastError() == WSAEWOULDBLOCK) 165 continue; 166#endif 167 log_err("dnstap io wakeup: write: %s", 168 sock_strerror(errno)); 169 break; 170 } 171 break; 172 } 173} 174 175void 176mq_wakeup_cb(void* arg) 177{ 178 struct dt_msg_queue* mq = (struct dt_msg_queue*)arg; 179 /* even if the dtio is already active, because perhaps much 180 * traffic suddenly, we leave the timer running to save on 181 * managing it, the once a second timer is less work then 182 * starting and stopping the timer frequently */ 183 lock_basic_lock(&mq->dtio->wakeup_timer_lock); 184 mq->dtio->wakeup_timer_enabled = 0; 185 lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 186 dtio_wakeup(mq->dtio); 187} 188 189/** start timer to wakeup dtio because there is content in the queue */ 190static void 191dt_msg_queue_start_timer(struct dt_msg_queue* mq, int wakeupnow) 192{ 193 struct timeval tv = {0}; 194 /* Start a timer to process messages to be logged. 195 * If we woke up the dtio thread for every message, the wakeup 196 * messages take up too much processing power. If the queue 197 * fills up the wakeup happens immediately. The timer wakes it up 198 * if there are infrequent messages to log. */ 199 200 /* we cannot start a timer in dtio thread, because it is a different 201 * thread and its event base is in use by the other thread, it would 202 * give race conditions if we tried to modify its event base, 203 * and locks would wait until it woke up, and this is what we do. */ 204 205 /* do not start the timer if a timer already exists, perhaps 206 * in another worker. So this variable is protected by a lock in 207 * dtio. */ 208 209 /* If we need to wakeupnow, 0 the timer to force the callback. */ 210 lock_basic_lock(&mq->dtio->wakeup_timer_lock); 211 if(mq->dtio->wakeup_timer_enabled) { 212 if(wakeupnow) { 213 comm_timer_set(mq->wakeup_timer, &tv); 214 } 215 lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 216 return; 217 } 218 mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */ 219 220 /* start the timer, in mq, in the event base of our worker */ 221 if(!wakeupnow) { 222 tv.tv_sec = 1; 223 tv.tv_usec = 0; 224 } 225 comm_timer_set(mq->wakeup_timer, &tv); 226 lock_basic_unlock(&mq->dtio->wakeup_timer_lock); 227} 228 229void 230dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) 231{ 232 int wakeupnow = 0, wakeupstarttimer = 0; 233 struct dt_msg_entry* entry; 234 235 /* check conditions */ 236 if(!buf) return; 237 if(len == 0) { 238 /* it is not possible to log entries with zero length, 239 * because the framestream protocol does not carry it. 240 * However the protobuf serialization does not create zero 241 * length datagrams for dnstap, so this should not happen. */ 242 free(buf); 243 return; 244 } 245 if(!mq) { 246 free(buf); 247 return; 248 } 249 250 /* allocate memory for queue entry */ 251 entry = malloc(sizeof(*entry)); 252 if(!entry) { 253 log_err("out of memory logging dnstap"); 254 free(buf); 255 return; 256 } 257 entry->next = NULL; 258 entry->buf = buf; 259 entry->len = len; 260 261 /* acquire lock */ 262 lock_basic_lock(&mq->lock); 263 /* if list was empty, start timer for (eventual) wakeup */ 264 if(mq->first == NULL) 265 wakeupstarttimer = 1; 266 /* if list contains more than wakeupnum elements, wakeup now, 267 * or if list is (going to be) almost full */ 268 if(mq->msgcount == DTIO_MSG_FOR_WAKEUP || 269 (mq->cursize < mq->maxsize * 9 / 10 && 270 mq->cursize+len >= mq->maxsize * 9 / 10)) 271 wakeupnow = 1; 272 /* see if it is going to fit */ 273 if(mq->cursize + len > mq->maxsize) { 274 /* buffer full, or congested. */ 275 /* drop */ 276 lock_basic_unlock(&mq->lock); 277 free(buf); 278 free(entry); 279 return; 280 } 281 mq->cursize += len; 282 mq->msgcount ++; 283 /* append to list */ 284 if(mq->last) { 285 mq->last->next = entry; 286 } else { 287 mq->first = entry; 288 } 289 mq->last = entry; 290 /* release lock */ 291 lock_basic_unlock(&mq->lock); 292 293 if(wakeupnow || wakeupstarttimer) { 294 dt_msg_queue_start_timer(mq, wakeupnow); 295 } 296} 297 298struct dt_io_thread* dt_io_thread_create(void) 299{ 300 struct dt_io_thread* dtio = calloc(1, sizeof(*dtio)); 301 lock_basic_init(&dtio->wakeup_timer_lock); 302 lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled, 303 sizeof(dtio->wakeup_timer_enabled)); 304 return dtio; 305} 306 307void dt_io_thread_delete(struct dt_io_thread* dtio) 308{ 309 struct dt_io_list_item* item, *nextitem; 310 if(!dtio) return; 311 lock_basic_destroy(&dtio->wakeup_timer_lock); 312 item=dtio->io_list; 313 while(item) { 314 nextitem = item->next; 315 free(item); 316 item = nextitem; 317 } 318 free(dtio->socket_path); 319 free(dtio->ip_str); 320 free(dtio->tls_server_name); 321 free(dtio->client_key_file); 322 free(dtio->client_cert_file); 323 if(dtio->ssl_ctx) { 324#ifdef HAVE_SSL 325 SSL_CTX_free(dtio->ssl_ctx); 326#endif 327 } 328 free(dtio); 329} 330 331int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg) 332{ 333 if(!cfg->dnstap) { 334 log_warn("cannot setup dnstap because dnstap-enable is no"); 335 return 0; 336 } 337 338 /* what type of connectivity do we have */ 339 if(cfg->dnstap_ip && cfg->dnstap_ip[0]) { 340 if(cfg->dnstap_tls) 341 dtio->upstream_is_tls = 1; 342 else dtio->upstream_is_tcp = 1; 343 } else { 344 dtio->upstream_is_unix = 1; 345 } 346 dtio->is_bidirectional = cfg->dnstap_bidirectional; 347 348 if(dtio->upstream_is_unix) { 349 char* nm; 350 if(!cfg->dnstap_socket_path || 351 cfg->dnstap_socket_path[0]==0) { 352 log_err("dnstap setup: no dnstap-socket-path for " 353 "socket connect"); 354 return 0; 355 } 356 nm = cfg->dnstap_socket_path; 357 if(cfg->chrootdir && cfg->chrootdir[0] && strncmp(nm, 358 cfg->chrootdir, strlen(cfg->chrootdir)) == 0) 359 nm += strlen(cfg->chrootdir); 360 free(dtio->socket_path); 361 dtio->socket_path = strdup(nm); 362 if(!dtio->socket_path) { 363 log_err("dnstap setup: malloc failure"); 364 return 0; 365 } 366 } 367 368 if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { 369 if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) { 370 log_err("dnstap setup: no dnstap-ip for TCP connect"); 371 return 0; 372 } 373 free(dtio->ip_str); 374 dtio->ip_str = strdup(cfg->dnstap_ip); 375 if(!dtio->ip_str) { 376 log_err("dnstap setup: malloc failure"); 377 return 0; 378 } 379 } 380 381 if(dtio->upstream_is_tls) { 382#ifdef HAVE_SSL 383 if(cfg->dnstap_tls_server_name && 384 cfg->dnstap_tls_server_name[0]) { 385 free(dtio->tls_server_name); 386 dtio->tls_server_name = strdup( 387 cfg->dnstap_tls_server_name); 388 if(!dtio->tls_server_name) { 389 log_err("dnstap setup: malloc failure"); 390 return 0; 391 } 392 if(!check_auth_name_for_ssl(dtio->tls_server_name)) 393 return 0; 394 } 395 if(cfg->dnstap_tls_client_key_file && 396 cfg->dnstap_tls_client_key_file[0]) { 397 dtio->use_client_certs = 1; 398 free(dtio->client_key_file); 399 dtio->client_key_file = strdup( 400 cfg->dnstap_tls_client_key_file); 401 if(!dtio->client_key_file) { 402 log_err("dnstap setup: malloc failure"); 403 return 0; 404 } 405 if(!cfg->dnstap_tls_client_cert_file || 406 cfg->dnstap_tls_client_cert_file[0]==0) { 407 log_err("dnstap setup: client key " 408 "authentication enabled with " 409 "dnstap-tls-client-key-file, but " 410 "no dnstap-tls-client-cert-file " 411 "is given"); 412 return 0; 413 } 414 free(dtio->client_cert_file); 415 dtio->client_cert_file = strdup( 416 cfg->dnstap_tls_client_cert_file); 417 if(!dtio->client_cert_file) { 418 log_err("dnstap setup: malloc failure"); 419 return 0; 420 } 421 } else { 422 dtio->use_client_certs = 0; 423 dtio->client_key_file = NULL; 424 dtio->client_cert_file = NULL; 425 } 426 427 if(cfg->dnstap_tls_cert_bundle) { 428 dtio->ssl_ctx = connect_sslctx_create( 429 dtio->client_key_file, 430 dtio->client_cert_file, 431 cfg->dnstap_tls_cert_bundle, 0); 432 } else { 433 dtio->ssl_ctx = connect_sslctx_create( 434 dtio->client_key_file, 435 dtio->client_cert_file, 436 cfg->tls_cert_bundle, cfg->tls_win_cert); 437 } 438 if(!dtio->ssl_ctx) { 439 log_err("could not setup SSL CTX"); 440 return 0; 441 } 442 dtio->tls_use_sni = cfg->tls_use_sni; 443#endif /* HAVE_SSL */ 444 } 445 return 1; 446} 447 448int dt_io_thread_register_queue(struct dt_io_thread* dtio, 449 struct dt_msg_queue* mq) 450{ 451 struct dt_io_list_item* item = malloc(sizeof(*item)); 452 if(!item) return 0; 453 lock_basic_lock(&mq->lock); 454 mq->dtio = dtio; 455 lock_basic_unlock(&mq->lock); 456 item->queue = mq; 457 item->next = dtio->io_list; 458 dtio->io_list = item; 459 dtio->io_list_iter = NULL; 460 return 1; 461} 462 463void dt_io_thread_unregister_queue(struct dt_io_thread* dtio, 464 struct dt_msg_queue* mq) 465{ 466 struct dt_io_list_item* item, *prev=NULL; 467 if(!dtio) return; 468 item = dtio->io_list; 469 while(item) { 470 if(item->queue == mq) { 471 /* found it */ 472 if(prev) prev->next = item->next; 473 else dtio->io_list = item->next; 474 /* the queue itself only registered, not deleted */ 475 lock_basic_lock(&item->queue->lock); 476 item->queue->dtio = NULL; 477 lock_basic_unlock(&item->queue->lock); 478 free(item); 479 dtio->io_list_iter = NULL; 480 return; 481 } 482 prev = item; 483 item = item->next; 484 } 485} 486 487/** pick a message from the queue, the routine locks and unlocks, 488 * returns true if there is a message */ 489static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf, 490 size_t* len) 491{ 492 lock_basic_lock(&mq->lock); 493 if(mq->first) { 494 struct dt_msg_entry* entry = mq->first; 495 mq->first = entry->next; 496 if(!entry->next) mq->last = NULL; 497 mq->cursize -= entry->len; 498 mq->msgcount --; 499 lock_basic_unlock(&mq->lock); 500 501 *buf = entry->buf; 502 *len = entry->len; 503 free(entry); 504 return 1; 505 } 506 lock_basic_unlock(&mq->lock); 507 return 0; 508} 509 510/** find message in queue, false if no message, true if message to send */ 511static int dtio_find_in_queue(struct dt_io_thread* dtio, 512 struct dt_msg_queue* mq) 513{ 514 void* buf=NULL; 515 size_t len=0; 516 if(dt_msg_queue_pop(mq, &buf, &len)) { 517 dtio->cur_msg = buf; 518 dtio->cur_msg_len = len; 519 dtio->cur_msg_done = 0; 520 dtio->cur_msg_len_done = 0; 521 return 1; 522 } 523 return 0; 524} 525 526/** find a new message to write, search message queues, false if none */ 527static int dtio_find_msg(struct dt_io_thread* dtio) 528{ 529 struct dt_io_list_item *spot, *item; 530 531 spot = dtio->io_list_iter; 532 /* use the next queue for the next message lookup, 533 * if we hit the end(NULL) the NULL restarts the iter at start. */ 534 if(spot) 535 dtio->io_list_iter = spot->next; 536 else if(dtio->io_list) 537 dtio->io_list_iter = dtio->io_list->next; 538 539 /* scan from spot to end-of-io_list */ 540 item = spot; 541 while(item) { 542 if(dtio_find_in_queue(dtio, item->queue)) 543 return 1; 544 item = item->next; 545 } 546 /* scan starting at the start-of-list (to wrap around the end) */ 547 item = dtio->io_list; 548 while(item) { 549 if(dtio_find_in_queue(dtio, item->queue)) 550 return 1; 551 item = item->next; 552 } 553 return 0; 554} 555 556/** callback for the dnstap reconnect, to start reconnecting to output */ 557void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd), 558 short ATTR_UNUSED(bits), void* arg) 559{ 560 struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 561 dtio->reconnect_is_added = 0; 562 verbose(VERB_ALGO, "dnstap io: reconnect timer"); 563 564 dtio_open_output(dtio); 565 if(dtio->event) { 566 if(!dtio_add_output_event_write(dtio)) 567 return; 568 /* nothing wrong so far, wait on the output event */ 569 return; 570 } 571 /* exponential backoff and retry on timer */ 572 dtio_reconnect_enable(dtio); 573} 574 575/** attempt to reconnect to the output, after a timeout */ 576static void dtio_reconnect_enable(struct dt_io_thread* dtio) 577{ 578 struct timeval tv; 579 int msec; 580 if(dtio->want_to_exit) return; 581 if(dtio->reconnect_is_added) 582 return; /* already done */ 583 584 /* exponential backoff, store the value for next timeout */ 585 msec = dtio->reconnect_timeout; 586 if(msec == 0) { 587 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN; 588 } else { 589 dtio->reconnect_timeout = msec*2; 590 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX) 591 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX; 592 } 593 verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec", 594 msec); 595 596 /* setup wait timer */ 597 memset(&tv, 0, sizeof(tv)); 598 tv.tv_sec = msec/1000; 599 tv.tv_usec = (msec%1000)*1000; 600 if(ub_timer_add(dtio->reconnect_timer, dtio->event_base, 601 &dtio_reconnect_timeout_cb, dtio, &tv) != 0) { 602 log_err("dnstap io: could not reconnect ev timer add"); 603 return; 604 } 605 dtio->reconnect_is_added = 1; 606} 607 608/** remove dtio reconnect timer */ 609static void dtio_reconnect_del(struct dt_io_thread* dtio) 610{ 611 if(!dtio->reconnect_is_added) 612 return; 613 ub_timer_del(dtio->reconnect_timer); 614 dtio->reconnect_is_added = 0; 615} 616 617/** clear the reconnect exponential backoff timer. 618 * We have successfully connected so we can try again with short timeouts. */ 619static void dtio_reconnect_clear(struct dt_io_thread* dtio) 620{ 621 dtio->reconnect_timeout = 0; 622 dtio_reconnect_del(dtio); 623} 624 625/** reconnect slowly, because we already know we have to wait for a bit */ 626static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec) 627{ 628 dtio_reconnect_del(dtio); 629 dtio->reconnect_timeout = msec; 630 dtio_reconnect_enable(dtio); 631} 632 633/** delete the current message in the dtio, and reset counters */ 634static void dtio_cur_msg_free(struct dt_io_thread* dtio) 635{ 636 free(dtio->cur_msg); 637 dtio->cur_msg = NULL; 638 dtio->cur_msg_len = 0; 639 dtio->cur_msg_done = 0; 640 dtio->cur_msg_len_done = 0; 641} 642 643/** delete the buffer and counters used to read frame */ 644static void dtio_read_frame_free(struct dt_frame_read_buf* rb) 645{ 646 if(rb->buf) { 647 free(rb->buf); 648 rb->buf = NULL; 649 } 650 rb->buf_count = 0; 651 rb->buf_cap = 0; 652 rb->frame_len = 0; 653 rb->frame_len_done = 0; 654 rb->control_frame = 0; 655} 656 657/** del the output file descriptor event for listening */ 658static void dtio_del_output_event(struct dt_io_thread* dtio) 659{ 660 if(!dtio->event_added) 661 return; 662 ub_event_del(dtio->event); 663 dtio->event_added = 0; 664 dtio->event_added_is_write = 0; 665} 666 667/** close dtio socket and set it to -1 */ 668static void dtio_close_fd(struct dt_io_thread* dtio) 669{ 670 sock_close(dtio->fd); 671 dtio->fd = -1; 672} 673 674/** close and stop the output file descriptor event */ 675static void dtio_close_output(struct dt_io_thread* dtio) 676{ 677 if(!dtio->event) 678 return; 679 ub_event_free(dtio->event); 680 dtio->event = NULL; 681 if(dtio->ssl) { 682#ifdef HAVE_SSL 683 SSL_shutdown(dtio->ssl); 684 SSL_free(dtio->ssl); 685 dtio->ssl = NULL; 686#endif 687 } 688 dtio_close_fd(dtio); 689 690 /* if there is a (partial) message, discard it 691 * we cannot send (the remainder of) it, and a new 692 * connection needs to start with a control frame. */ 693 if(dtio->cur_msg) { 694 dtio_cur_msg_free(dtio); 695 } 696 697 dtio->ready_frame_sent = 0; 698 dtio->accept_frame_received = 0; 699 dtio_read_frame_free(&dtio->read_frame); 700 701 dtio_reconnect_enable(dtio); 702} 703 704/** check for pending nonblocking connect errors, 705 * returns 1 if it is okay. -1 on error (close it), 0 to try later */ 706static int dtio_check_nb_connect(struct dt_io_thread* dtio) 707{ 708 int error = 0; 709 socklen_t len = (socklen_t)sizeof(error); 710 if(!dtio->check_nb_connect) 711 return 1; /* everything okay */ 712 if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error, 713 &len) < 0) { 714#ifndef USE_WINSOCK 715 error = errno; /* on solaris errno is error */ 716#else 717 error = WSAGetLastError(); 718#endif 719 } 720#ifndef USE_WINSOCK 721#if defined(EINPROGRESS) && defined(EWOULDBLOCK) 722 if(error == EINPROGRESS || error == EWOULDBLOCK) 723 return 0; /* try again later */ 724#endif 725#else 726 if(error == WSAEINPROGRESS) { 727 return 0; /* try again later */ 728 } else if(error == WSAEWOULDBLOCK) { 729 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 730 dtio->stop_flush_event:dtio->event), UB_EV_WRITE); 731 return 0; /* try again later */ 732 } 733#endif 734 if(error != 0) { 735 char* to = dtio->socket_path; 736 if(!to) to = dtio->ip_str; 737 if(!to) to = ""; 738 log_err("dnstap io: failed to connect to \"%s\": %s", 739 to, sock_strerror(error)); 740 return -1; /* error, close it */ 741 } 742 743 if(dtio->ip_str) 744 verbose(VERB_DETAIL, "dnstap io: connected to %s", 745 dtio->ip_str); 746 else if(dtio->socket_path) 747 verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"", 748 dtio->socket_path); 749 dtio_reconnect_clear(dtio); 750 dtio->check_nb_connect = 0; 751 return 1; /* everything okay */ 752} 753 754#ifdef HAVE_SSL 755/** write to ssl output 756 * returns number of bytes written, 0 if nothing happened, 757 * try again later, or -1 if the channel is to be closed. */ 758static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf, 759 size_t len) 760{ 761 int r; 762 ERR_clear_error(); 763 r = SSL_write(dtio->ssl, buf, len); 764 if(r <= 0) { 765 int want = SSL_get_error(dtio->ssl, r); 766 if(want == SSL_ERROR_ZERO_RETURN) { 767 /* closed */ 768 return -1; 769 } else if(want == SSL_ERROR_WANT_READ) { 770 /* we want a brief read event */ 771 dtio_enable_brief_read(dtio); 772 return 0; 773 } else if(want == SSL_ERROR_WANT_WRITE) { 774 /* write again later */ 775 return 0; 776 } else if(want == SSL_ERROR_SYSCALL) { 777#ifdef EPIPE 778 if(errno == EPIPE && verbosity < 2) 779 return -1; /* silence 'broken pipe' */ 780#endif 781#ifdef ECONNRESET 782 if(errno == ECONNRESET && verbosity < 2) 783 return -1; /* silence reset by peer */ 784#endif 785 if(errno != 0) { 786 log_err("dnstap io, SSL_write syscall: %s", 787 strerror(errno)); 788 } 789 return -1; 790 } 791 log_crypto_err_io("dnstap io, could not SSL_write", want); 792 return -1; 793 } 794 return r; 795} 796#endif /* HAVE_SSL */ 797 798/** write buffer to output. 799 * returns number of bytes written, 0 if nothing happened, 800 * try again later, or -1 if the channel is to be closed. */ 801static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf, 802 size_t len) 803{ 804 ssize_t ret; 805 if(dtio->fd == -1) 806 return -1; 807#ifdef HAVE_SSL 808 if(dtio->ssl) 809 return dtio_write_ssl(dtio, buf, len); 810#endif 811 ret = send(dtio->fd, (void*)buf, len, 0); 812 if(ret == -1) { 813#ifndef USE_WINSOCK 814 if(errno == EINTR || errno == EAGAIN) 815 return 0; 816#else 817 if(WSAGetLastError() == WSAEINPROGRESS) 818 return 0; 819 if(WSAGetLastError() == WSAEWOULDBLOCK) { 820 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 821 dtio->stop_flush_event:dtio->event), 822 UB_EV_WRITE); 823 return 0; 824 } 825#endif 826 log_err("dnstap io: failed send: %s", sock_strerror(errno)); 827 return -1; 828 } 829 return ret; 830} 831 832#ifdef HAVE_WRITEV 833/** write with writev, len and message, in one write, if possible. 834 * return true if message is done, false if incomplete */ 835static int dtio_write_with_writev(struct dt_io_thread* dtio) 836{ 837 uint32_t sendlen = htonl(dtio->cur_msg_len); 838 struct iovec iov[2]; 839 ssize_t r; 840 iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done; 841 iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done; 842 iov[1].iov_base = dtio->cur_msg; 843 iov[1].iov_len = dtio->cur_msg_len; 844 log_assert(iov[0].iov_len > 0); 845 r = writev(dtio->fd, iov, 2); 846 if(r == -1) { 847#ifndef USE_WINSOCK 848 if(errno == EINTR || errno == EAGAIN) 849 return 0; 850#else 851 if(WSAGetLastError() == WSAEINPROGRESS) 852 return 0; 853 if(WSAGetLastError() == WSAEWOULDBLOCK) { 854 ub_winsock_tcp_wouldblock((dtio->stop_flush_event? 855 dtio->stop_flush_event:dtio->event), 856 UB_EV_WRITE); 857 return 0; 858 } 859#endif 860 log_err("dnstap io: failed writev: %s", sock_strerror(errno)); 861 /* close the channel */ 862 dtio_del_output_event(dtio); 863 dtio_close_output(dtio); 864 return 0; 865 } 866 /* written r bytes */ 867 dtio->cur_msg_len_done += r; 868 if(dtio->cur_msg_len_done < 4) 869 return 0; 870 if(dtio->cur_msg_len_done > 4) { 871 dtio->cur_msg_done = dtio->cur_msg_len_done-4; 872 dtio->cur_msg_len_done = 4; 873 } 874 if(dtio->cur_msg_done < dtio->cur_msg_len) 875 return 0; 876 return 1; 877} 878#endif /* HAVE_WRITEV */ 879 880/** write more of the length, preceding the data frame. 881 * return true if message is done, false if incomplete. */ 882static int dtio_write_more_of_len(struct dt_io_thread* dtio) 883{ 884 uint32_t sendlen; 885 int r; 886 if(dtio->cur_msg_len_done >= 4) 887 return 1; 888#ifdef HAVE_WRITEV 889 if(!dtio->ssl) { 890 /* we try writev for everything.*/ 891 return dtio_write_with_writev(dtio); 892 } 893#endif /* HAVE_WRITEV */ 894 sendlen = htonl(dtio->cur_msg_len); 895 r = dtio_write_buf(dtio, 896 ((uint8_t*)&sendlen)+dtio->cur_msg_len_done, 897 sizeof(sendlen)-dtio->cur_msg_len_done); 898 if(r == -1) { 899 /* close the channel */ 900 dtio_del_output_event(dtio); 901 dtio_close_output(dtio); 902 return 0; 903 } else if(r == 0) { 904 /* try again later */ 905 return 0; 906 } 907 dtio->cur_msg_len_done += r; 908 if(dtio->cur_msg_len_done < 4) 909 return 0; 910 return 1; 911} 912 913/** write more of the data frame. 914 * return true if message is done, false if incomplete. */ 915static int dtio_write_more_of_data(struct dt_io_thread* dtio) 916{ 917 int r; 918 if(dtio->cur_msg_done >= dtio->cur_msg_len) 919 return 1; 920 r = dtio_write_buf(dtio, 921 ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done, 922 dtio->cur_msg_len - dtio->cur_msg_done); 923 if(r == -1) { 924 /* close the channel */ 925 dtio_del_output_event(dtio); 926 dtio_close_output(dtio); 927 return 0; 928 } else if(r == 0) { 929 /* try again later */ 930 return 0; 931 } 932 dtio->cur_msg_done += r; 933 if(dtio->cur_msg_done < dtio->cur_msg_len) 934 return 0; 935 return 1; 936} 937 938/** write more of the current message. false if incomplete, true if 939 * the message is done */ 940static int dtio_write_more(struct dt_io_thread* dtio) 941{ 942 if(dtio->cur_msg_len_done < 4) { 943 if(!dtio_write_more_of_len(dtio)) 944 return 0; 945 } 946 if(dtio->cur_msg_done < dtio->cur_msg_len) { 947 if(!dtio_write_more_of_data(dtio)) 948 return 0; 949 } 950 return 1; 951} 952 953/** Receive bytes from dtio->fd, store in buffer. Returns 0: closed, 954 * -1: continue, >0: number of bytes read into buffer */ 955static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) { 956 ssize_t r; 957 r = recv(dtio->fd, (void*)buf, len, MSG_DONTWAIT); 958 if(r == -1) { 959 char* to = dtio->socket_path; 960 if(!to) to = dtio->ip_str; 961 if(!to) to = ""; 962#ifndef USE_WINSOCK 963 if(errno == EINTR || errno == EAGAIN) 964 return -1; /* try later */ 965#else 966 if(WSAGetLastError() == WSAEINPROGRESS) { 967 return -1; /* try later */ 968 } else if(WSAGetLastError() == WSAEWOULDBLOCK) { 969 ub_winsock_tcp_wouldblock( 970 (dtio->stop_flush_event? 971 dtio->stop_flush_event:dtio->event), 972 UB_EV_READ); 973 return -1; /* try later */ 974 } 975#endif 976 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 977 verbosity < 4) 978 return 0; /* no log retries on low verbosity */ 979 log_err("dnstap io: output closed, recv %s: %s", to, 980 strerror(errno)); 981 /* and close below */ 982 return 0; 983 } 984 if(r == 0) { 985 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 986 verbosity < 4) 987 return 0; /* no log retries on low verbosity */ 988 verbose(VERB_DETAIL, "dnstap io: output closed by the other side"); 989 /* and close below */ 990 return 0; 991 } 992 /* something was received */ 993 return r; 994} 995 996#ifdef HAVE_SSL 997/** Receive bytes over TLS from dtio->fd, store in buffer. Returns 0: closed, 998 * -1: continue, >0: number of bytes read into buffer */ 999static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len) 1000{ 1001 int r; 1002 ERR_clear_error(); 1003 r = SSL_read(dtio->ssl, buf, len); 1004 if(r <= 0) { 1005 int want = SSL_get_error(dtio->ssl, r); 1006 if(want == SSL_ERROR_ZERO_RETURN) { 1007 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 1008 verbosity < 4) 1009 return 0; /* no log retries on low verbosity */ 1010 verbose(VERB_DETAIL, "dnstap io: output closed by the " 1011 "other side"); 1012 return 0; 1013 } else if(want == SSL_ERROR_WANT_READ) { 1014 /* continue later */ 1015 return -1; 1016 } else if(want == SSL_ERROR_WANT_WRITE) { 1017 (void)dtio_enable_brief_write(dtio); 1018 return -1; 1019 } else if(want == SSL_ERROR_SYSCALL) { 1020#ifdef ECONNRESET 1021 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 1022 errno == ECONNRESET && verbosity < 4) 1023 return 0; /* silence reset by peer */ 1024#endif 1025 if(errno != 0) 1026 log_err("SSL_read syscall: %s", 1027 strerror(errno)); 1028 verbose(VERB_DETAIL, "dnstap io: output closed by the " 1029 "other side"); 1030 return 0; 1031 } 1032 log_crypto_err_io("could not SSL_read", want); 1033 verbose(VERB_DETAIL, "dnstap io: output closed by the " 1034 "other side"); 1035 return 0; 1036 } 1037 return r; 1038} 1039#endif /* HAVE_SSL */ 1040 1041/** check if the output fd has been closed, 1042 * it returns false if the stream is closed. */ 1043static int dtio_check_close(struct dt_io_thread* dtio) 1044{ 1045 /* we don't want to read any packets, but if there are we can 1046 * discard the input (ignore it). Ignore of unknown (control) 1047 * packets is okay for the framestream protocol. And also, the 1048 * read call can return that the stream has been closed by the 1049 * other side. */ 1050 uint8_t buf[1024]; 1051 int r = -1; 1052 1053 1054 if(dtio->fd == -1) return 0; 1055 1056 while(r != 0) { 1057 /* not interested in buffer content, overwrite */ 1058 r = receive_bytes(dtio, (void*)buf, sizeof(buf)); 1059 if(r == -1) 1060 return 1; 1061 } 1062 /* the other end has been closed */ 1063 /* close the channel */ 1064 dtio_del_output_event(dtio); 1065 dtio_close_output(dtio); 1066 return 0; 1067} 1068 1069/** Read accept frame. Returns -1: continue reading, 0: closed, 1070 * 1: valid accept received. */ 1071static int dtio_read_accept_frame(struct dt_io_thread* dtio) 1072{ 1073 int r; 1074 size_t read_frame_done; 1075 while(dtio->read_frame.frame_len_done < 4) { 1076#ifdef HAVE_SSL 1077 if(dtio->ssl) { 1078 r = ssl_read_bytes(dtio, 1079 (uint8_t*)&dtio->read_frame.frame_len+ 1080 dtio->read_frame.frame_len_done, 1081 4-dtio->read_frame.frame_len_done); 1082 } else { 1083#endif 1084 r = receive_bytes(dtio, 1085 (uint8_t*)&dtio->read_frame.frame_len+ 1086 dtio->read_frame.frame_len_done, 1087 4-dtio->read_frame.frame_len_done); 1088#ifdef HAVE_SSL 1089 } 1090#endif 1091 if(r == -1) 1092 return -1; /* continue reading */ 1093 if(r == 0) { 1094 /* connection closed */ 1095 goto close_connection; 1096 } 1097 dtio->read_frame.frame_len_done += r; 1098 if(dtio->read_frame.frame_len_done < 4) 1099 return -1; /* continue reading */ 1100 1101 if(dtio->read_frame.frame_len == 0) { 1102 dtio->read_frame.frame_len_done = 0; 1103 dtio->read_frame.control_frame = 1; 1104 continue; 1105 } 1106 dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len); 1107 if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) { 1108 verbose(VERB_OPS, "dnstap: received frame exceeds max " 1109 "length of %d bytes, closing connection", 1110 DTIO_RECV_FRAME_MAX_LEN); 1111 goto close_connection; 1112 } 1113 dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len); 1114 dtio->read_frame.buf_cap = dtio->read_frame.frame_len; 1115 if(!dtio->read_frame.buf) { 1116 log_err("dnstap io: out of memory (creating read " 1117 "buffer)"); 1118 goto close_connection; 1119 } 1120 } 1121 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) { 1122#ifdef HAVE_SSL 1123 if(dtio->ssl) { 1124 r = ssl_read_bytes(dtio, dtio->read_frame.buf+ 1125 dtio->read_frame.buf_count, 1126 dtio->read_frame.buf_cap- 1127 dtio->read_frame.buf_count); 1128 } else { 1129#endif 1130 r = receive_bytes(dtio, dtio->read_frame.buf+ 1131 dtio->read_frame.buf_count, 1132 dtio->read_frame.buf_cap- 1133 dtio->read_frame.buf_count); 1134#ifdef HAVE_SSL 1135 } 1136#endif 1137 if(r == -1) 1138 return -1; /* continue reading */ 1139 if(r == 0) { 1140 /* connection closed */ 1141 goto close_connection; 1142 } 1143 dtio->read_frame.buf_count += r; 1144 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) 1145 return -1; /* continue reading */ 1146 } 1147 1148 /* Complete frame received, check if this is a valid ACCEPT control 1149 * frame. */ 1150 if(dtio->read_frame.frame_len < 4) { 1151 verbose(VERB_OPS, "dnstap: invalid data received"); 1152 goto close_connection; 1153 } 1154 if(sldns_read_uint32(dtio->read_frame.buf) != 1155 FSTRM_CONTROL_FRAME_ACCEPT) { 1156 verbose(VERB_ALGO, "dnstap: invalid control type received, " 1157 "ignored"); 1158 dtio->ready_frame_sent = 0; 1159 dtio->accept_frame_received = 0; 1160 dtio_read_frame_free(&dtio->read_frame); 1161 return -1; 1162 } 1163 read_frame_done = 4; /* control frame type */ 1164 1165 /* Iterate over control fields, ignore unknown types. 1166 * Need to be able to read at least 8 bytes (control field type + 1167 * length). */ 1168 while(read_frame_done+8 < dtio->read_frame.frame_len) { 1169 uint32_t type = sldns_read_uint32(dtio->read_frame.buf + 1170 read_frame_done); 1171 uint32_t len = sldns_read_uint32(dtio->read_frame.buf + 1172 read_frame_done + 4); 1173 if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) { 1174 if(len == strlen(DNSTAP_CONTENT_TYPE) && 1175 read_frame_done+8+len <= 1176 dtio->read_frame.frame_len && 1177 memcmp(dtio->read_frame.buf + read_frame_done + 1178 + 8, DNSTAP_CONTENT_TYPE, len) == 0) { 1179 if(!dtio_control_start_send(dtio)) { 1180 verbose(VERB_OPS, "dnstap io: out of " 1181 "memory while sending START frame"); 1182 goto close_connection; 1183 } 1184 dtio->accept_frame_received = 1; 1185 if(!dtio_add_output_event_write(dtio)) 1186 goto close_connection; 1187 return 1; 1188 } else { 1189 /* unknown content type */ 1190 verbose(VERB_ALGO, "dnstap: ACCEPT frame " 1191 "contains unknown content type, " 1192 "closing connection"); 1193 goto close_connection; 1194 } 1195 } 1196 /* unknown option, try next */ 1197 read_frame_done += 8+len; 1198 } 1199 1200 1201close_connection: 1202 dtio_del_output_event(dtio); 1203 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1204 dtio_close_output(dtio); 1205 return 0; 1206} 1207 1208/** add the output file descriptor event for listening, read only */ 1209static int dtio_add_output_event_read(struct dt_io_thread* dtio) 1210{ 1211 if(!dtio->event) 1212 return 0; 1213 if(dtio->event_added && !dtio->event_added_is_write) 1214 return 1; 1215 /* we have to (re-)register the event */ 1216 if(dtio->event_added) 1217 ub_event_del(dtio->event); 1218 ub_event_del_bits(dtio->event, UB_EV_WRITE); 1219 if(ub_event_add(dtio->event, NULL) != 0) { 1220 log_err("dnstap io: out of memory (adding event)"); 1221 dtio->event_added = 0; 1222 dtio->event_added_is_write = 0; 1223 /* close output and start reattempts to open it */ 1224 dtio_close_output(dtio); 1225 return 0; 1226 } 1227 dtio->event_added = 1; 1228 dtio->event_added_is_write = 0; 1229 return 1; 1230} 1231 1232/** add the output file descriptor event for listening, read and write */ 1233static int dtio_add_output_event_write(struct dt_io_thread* dtio) 1234{ 1235 if(!dtio->event) 1236 return 0; 1237 if(dtio->event_added && dtio->event_added_is_write) 1238 return 1; 1239 /* we have to (re-)register the event */ 1240 if(dtio->event_added) 1241 ub_event_del(dtio->event); 1242 ub_event_add_bits(dtio->event, UB_EV_WRITE); 1243 if(ub_event_add(dtio->event, NULL) != 0) { 1244 log_err("dnstap io: out of memory (adding event)"); 1245 dtio->event_added = 0; 1246 dtio->event_added_is_write = 0; 1247 /* close output and start reattempts to open it */ 1248 dtio_close_output(dtio); 1249 return 0; 1250 } 1251 dtio->event_added = 1; 1252 dtio->event_added_is_write = 1; 1253 return 1; 1254} 1255 1256/** put the dtio thread to sleep */ 1257static void dtio_sleep(struct dt_io_thread* dtio) 1258{ 1259 /* unregister the event polling for write, because there is 1260 * nothing to be written */ 1261 (void)dtio_add_output_event_read(dtio); 1262} 1263 1264#ifdef HAVE_SSL 1265/** enable the brief read condition */ 1266static int dtio_enable_brief_read(struct dt_io_thread* dtio) 1267{ 1268 dtio->ssl_brief_read = 1; 1269 if(dtio->stop_flush_event) { 1270 ub_event_del(dtio->stop_flush_event); 1271 ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE); 1272 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) { 1273 log_err("dnstap io, stop flush, could not ub_event_add"); 1274 return 0; 1275 } 1276 return 1; 1277 } 1278 return dtio_add_output_event_read(dtio); 1279} 1280#endif /* HAVE_SSL */ 1281 1282#ifdef HAVE_SSL 1283/** disable the brief read condition */ 1284static int dtio_disable_brief_read(struct dt_io_thread* dtio) 1285{ 1286 dtio->ssl_brief_read = 0; 1287 if(dtio->stop_flush_event) { 1288 ub_event_del(dtio->stop_flush_event); 1289 ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE); 1290 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) { 1291 log_err("dnstap io, stop flush, could not ub_event_add"); 1292 return 0; 1293 } 1294 return 1; 1295 } 1296 return dtio_add_output_event_write(dtio); 1297} 1298#endif /* HAVE_SSL */ 1299 1300#ifdef HAVE_SSL 1301/** enable the brief write condition */ 1302static int dtio_enable_brief_write(struct dt_io_thread* dtio) 1303{ 1304 dtio->ssl_brief_write = 1; 1305 return dtio_add_output_event_write(dtio); 1306} 1307#endif /* HAVE_SSL */ 1308 1309#ifdef HAVE_SSL 1310/** disable the brief write condition */ 1311static int dtio_disable_brief_write(struct dt_io_thread* dtio) 1312{ 1313 dtio->ssl_brief_write = 0; 1314 return dtio_add_output_event_read(dtio); 1315} 1316#endif /* HAVE_SSL */ 1317 1318#ifdef HAVE_SSL 1319/** check peer verification after ssl handshake connection, false if closed*/ 1320static int dtio_ssl_check_peer(struct dt_io_thread* dtio) 1321{ 1322 if((SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) { 1323 /* verification */ 1324 if(SSL_get_verify_result(dtio->ssl) == X509_V_OK) { 1325 X509* x = SSL_get_peer_certificate(dtio->ssl); 1326 if(!x) { 1327 verbose(VERB_ALGO, "dnstap io, %s, SSL " 1328 "connection failed no certificate", 1329 dtio->ip_str); 1330 return 0; 1331 } 1332 log_cert(VERB_ALGO, "dnstap io, peer certificate", 1333 x); 1334#ifdef HAVE_SSL_GET0_PEERNAME 1335 if(SSL_get0_peername(dtio->ssl)) { 1336 verbose(VERB_ALGO, "dnstap io, %s, SSL " 1337 "connection to %s authenticated", 1338 dtio->ip_str, 1339 SSL_get0_peername(dtio->ssl)); 1340 } else { 1341#endif 1342 verbose(VERB_ALGO, "dnstap io, %s, SSL " 1343 "connection authenticated", 1344 dtio->ip_str); 1345#ifdef HAVE_SSL_GET0_PEERNAME 1346 } 1347#endif 1348 X509_free(x); 1349 } else { 1350 X509* x = SSL_get_peer_certificate(dtio->ssl); 1351 if(x) { 1352 log_cert(VERB_ALGO, "dnstap io, peer " 1353 "certificate", x); 1354 X509_free(x); 1355 } 1356 verbose(VERB_ALGO, "dnstap io, %s, SSL connection " 1357 "failed: failed to authenticate", 1358 dtio->ip_str); 1359 return 0; 1360 } 1361 } else { 1362 /* unauthenticated, the verify peer flag was not set 1363 * in ssl when the ssl object was created from ssl_ctx */ 1364 verbose(VERB_ALGO, "dnstap io, %s, SSL connection", 1365 dtio->ip_str); 1366 } 1367 return 1; 1368} 1369#endif /* HAVE_SSL */ 1370 1371#ifdef HAVE_SSL 1372/** perform ssl handshake, returns 1 if okay, 0 to stop */ 1373static int dtio_ssl_handshake(struct dt_io_thread* dtio, 1374 struct stop_flush_info* info) 1375{ 1376 int r; 1377 if(dtio->ssl_brief_read) { 1378 /* assume the brief read condition is satisfied, 1379 * if we need more or again, we can set it again */ 1380 if(!dtio_disable_brief_read(dtio)) { 1381 if(info) dtio_stop_flush_exit(info); 1382 return 0; 1383 } 1384 } 1385 if(dtio->ssl_handshake_done) 1386 return 1; 1387 1388 ERR_clear_error(); 1389 r = SSL_do_handshake(dtio->ssl); 1390 if(r != 1) { 1391 int want = SSL_get_error(dtio->ssl, r); 1392 if(want == SSL_ERROR_WANT_READ) { 1393 /* we want to read on the connection */ 1394 if(!dtio_enable_brief_read(dtio)) { 1395 if(info) dtio_stop_flush_exit(info); 1396 return 0; 1397 } 1398 return 0; 1399 } else if(want == SSL_ERROR_WANT_WRITE) { 1400 /* we want to write on the connection */ 1401 return 0; 1402 } else if(r == 0) { 1403 /* closed */ 1404 if(info) dtio_stop_flush_exit(info); 1405 dtio_del_output_event(dtio); 1406 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1407 dtio_close_output(dtio); 1408 return 0; 1409 } else if(want == SSL_ERROR_SYSCALL) { 1410 /* SYSCALL and errno==0 means closed uncleanly */ 1411 int silent = 0; 1412#ifdef EPIPE 1413 if(errno == EPIPE && verbosity < 2) 1414 silent = 1; /* silence 'broken pipe' */ 1415#endif 1416#ifdef ECONNRESET 1417 if(errno == ECONNRESET && verbosity < 2) 1418 silent = 1; /* silence reset by peer */ 1419#endif 1420 if(errno == 0) 1421 silent = 1; 1422 if(!silent) 1423 log_err("dnstap io, SSL_handshake syscall: %s", 1424 strerror(errno)); 1425 /* closed */ 1426 if(info) dtio_stop_flush_exit(info); 1427 dtio_del_output_event(dtio); 1428 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1429 dtio_close_output(dtio); 1430 return 0; 1431 } else { 1432 unsigned long err = ERR_get_error(); 1433 if(!squelch_err_ssl_handshake(err)) { 1434 log_crypto_err_io_code("dnstap io, ssl handshake failed", 1435 want, err); 1436 verbose(VERB_OPS, "dnstap io, ssl handshake failed " 1437 "from %s", dtio->ip_str); 1438 } 1439 /* closed */ 1440 if(info) dtio_stop_flush_exit(info); 1441 dtio_del_output_event(dtio); 1442 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1443 dtio_close_output(dtio); 1444 return 0; 1445 } 1446 1447 } 1448 /* check peer verification */ 1449 dtio->ssl_handshake_done = 1; 1450 1451 if(!dtio_ssl_check_peer(dtio)) { 1452 /* closed */ 1453 if(info) dtio_stop_flush_exit(info); 1454 dtio_del_output_event(dtio); 1455 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW); 1456 dtio_close_output(dtio); 1457 return 0; 1458 } 1459 return 1; 1460} 1461#endif /* HAVE_SSL */ 1462 1463/** callback for the dnstap events, to write to the output */ 1464void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg) 1465{ 1466 struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 1467 int i; 1468 1469 if(dtio->check_nb_connect) { 1470 int connect_err = dtio_check_nb_connect(dtio); 1471 if(connect_err == -1) { 1472 /* close the channel */ 1473 dtio_del_output_event(dtio); 1474 dtio_close_output(dtio); 1475 return; 1476 } else if(connect_err == 0) { 1477 /* try again later */ 1478 return; 1479 } 1480 /* nonblocking connect check passed, continue */ 1481 } 1482 1483#ifdef HAVE_SSL 1484 if(dtio->ssl && 1485 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) { 1486 if(!dtio_ssl_handshake(dtio, NULL)) 1487 return; 1488 } 1489#endif 1490 1491 if((bits&UB_EV_READ || dtio->ssl_brief_write)) { 1492 if(dtio->ssl_brief_write) 1493 (void)dtio_disable_brief_write(dtio); 1494 if(dtio->ready_frame_sent && !dtio->accept_frame_received) { 1495 if(dtio_read_accept_frame(dtio) <= 0) 1496 return; 1497 } else if(!dtio_check_close(dtio)) 1498 return; 1499 } 1500 1501 /* loop to process a number of messages. This improves throughput, 1502 * because selecting on write-event if not needed for busy messages 1503 * (dnstap log) generation and if they need to all be written back. 1504 * The write event is usually not blocked up. But not forever, 1505 * because the event loop needs to stay responsive for other events. 1506 * If there are no (more) messages, or if the output buffers get 1507 * full, it returns out of the loop. */ 1508 for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) { 1509 /* see if there are messages that need writing */ 1510 if(!dtio->cur_msg) { 1511 if(!dtio_find_msg(dtio)) { 1512 if(i == 0) { 1513 /* no messages on the first iteration, 1514 * the queues are all empty */ 1515 dtio_sleep(dtio); 1516 } 1517 return; /* nothing to do */ 1518 } 1519 } 1520 1521 /* write it */ 1522 if(dtio->cur_msg_done < dtio->cur_msg_len) { 1523 if(!dtio_write_more(dtio)) 1524 return; 1525 } 1526 1527 /* done with the current message */ 1528 dtio_cur_msg_free(dtio); 1529 1530 /* If this is a bidirectional stream the first message will be 1531 * the READY control frame. We can only continue writing after 1532 * receiving an ACCEPT control frame. */ 1533 if(dtio->is_bidirectional && !dtio->ready_frame_sent) { 1534 dtio->ready_frame_sent = 1; 1535 (void)dtio_add_output_event_read(dtio); 1536 break; 1537 } 1538 } 1539} 1540 1541/** callback for the dnstap commandpipe, to stop the dnstap IO */ 1542void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg) 1543{ 1544 struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 1545 uint8_t cmd; 1546 ssize_t r; 1547 if(dtio->want_to_exit) 1548 return; 1549 r = read(fd, &cmd, sizeof(cmd)); 1550 if(r == -1) { 1551#ifndef USE_WINSOCK 1552 if(errno == EINTR || errno == EAGAIN) 1553 return; /* ignore this */ 1554#else 1555 if(WSAGetLastError() == WSAEINPROGRESS) 1556 return; 1557 if(WSAGetLastError() == WSAEWOULDBLOCK) 1558 return; 1559#endif 1560 log_err("dnstap io: failed to read: %s", sock_strerror(errno)); 1561 /* and then fall through to quit the thread */ 1562 } else if(r == 0) { 1563 verbose(VERB_ALGO, "dnstap io: cmd channel closed"); 1564 } else if(r == 1 && cmd == DTIO_COMMAND_STOP) { 1565 verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit"); 1566 } else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) { 1567 verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup"); 1568 1569 if(dtio->is_bidirectional && !dtio->accept_frame_received) { 1570 verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, " 1571 "waiting for ACCEPT control frame"); 1572 return; 1573 } 1574 1575 /* reregister event */ 1576 if(!dtio_add_output_event_write(dtio)) 1577 return; 1578 return; 1579 } else if(r == 1) { 1580 verbose(VERB_ALGO, "dnstap io: cmd channel unknown command"); 1581 } 1582 dtio->want_to_exit = 1; 1583 if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base) 1584 != 0) { 1585 log_err("dnstap io: could not loopexit"); 1586 } 1587} 1588 1589#ifndef THREADS_DISABLED 1590/** setup the event base for the dnstap io thread */ 1591static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs, 1592 struct timeval* now) 1593{ 1594 memset(now, 0, sizeof(*now)); 1595 dtio->event_base = ub_default_event_base(0, secs, now); 1596 if(!dtio->event_base) { 1597 fatal_exit("dnstap io: could not create event_base"); 1598 } 1599} 1600#endif /* THREADS_DISABLED */ 1601 1602/** setup the cmd event for dnstap io */ 1603static void dtio_setup_cmd(struct dt_io_thread* dtio) 1604{ 1605 struct ub_event* cmdev; 1606 fd_set_nonblock(dtio->commandpipe[0]); 1607 cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0], 1608 UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio); 1609 if(!cmdev) { 1610 fatal_exit("dnstap io: out of memory"); 1611 } 1612 dtio->command_event = cmdev; 1613 if(ub_event_add(cmdev, NULL) != 0) { 1614 fatal_exit("dnstap io: out of memory (adding event)"); 1615 } 1616} 1617 1618/** setup the reconnect event for dnstap io */ 1619static void dtio_setup_reconnect(struct dt_io_thread* dtio) 1620{ 1621 dtio_reconnect_clear(dtio); 1622 dtio->reconnect_timer = ub_event_new(dtio->event_base, -1, 1623 UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio); 1624 if(!dtio->reconnect_timer) { 1625 fatal_exit("dnstap io: out of memory"); 1626 } 1627} 1628 1629/** 1630 * structure to keep track of information during stop flush 1631 */ 1632struct stop_flush_info { 1633 /** the event base during stop flush */ 1634 struct ub_event_base* base; 1635 /** did we already want to exit this stop-flush event base */ 1636 int want_to_exit_flush; 1637 /** has the timer fired */ 1638 int timer_done; 1639 /** the dtio */ 1640 struct dt_io_thread* dtio; 1641 /** the stop control frame */ 1642 void* stop_frame; 1643 /** length of the stop frame */ 1644 size_t stop_frame_len; 1645 /** how much we have done of the stop frame */ 1646 size_t stop_frame_done; 1647}; 1648 1649/** exit the stop flush base */ 1650static void dtio_stop_flush_exit(struct stop_flush_info* info) 1651{ 1652 if(info->want_to_exit_flush) 1653 return; 1654 info->want_to_exit_flush = 1; 1655 if(ub_event_base_loopexit(info->base) != 0) { 1656 log_err("dnstap io: could not loopexit"); 1657 } 1658} 1659 1660/** send the stop control, 1661 * return true if completed the frame. */ 1662static int dtio_control_stop_send(struct stop_flush_info* info) 1663{ 1664 struct dt_io_thread* dtio = info->dtio; 1665 int r; 1666 if(info->stop_frame_done >= info->stop_frame_len) 1667 return 1; 1668 r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) + 1669 info->stop_frame_done, info->stop_frame_len - 1670 info->stop_frame_done); 1671 if(r == -1) { 1672 verbose(VERB_ALGO, "dnstap io: stop flush: output closed"); 1673 dtio_stop_flush_exit(info); 1674 return 0; 1675 } 1676 if(r == 0) { 1677 /* try again later, or timeout */ 1678 return 0; 1679 } 1680 info->stop_frame_done += r; 1681 if(info->stop_frame_done < info->stop_frame_len) 1682 return 0; /* not done yet */ 1683 return 1; 1684} 1685 1686void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits), 1687 void* arg) 1688{ 1689 struct stop_flush_info* info = (struct stop_flush_info*)arg; 1690 if(info->want_to_exit_flush) 1691 return; 1692 verbose(VERB_ALGO, "dnstap io: stop flush timer expired, stop flush"); 1693 info->timer_done = 1; 1694 dtio_stop_flush_exit(info); 1695} 1696 1697void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg) 1698{ 1699 struct stop_flush_info* info = (struct stop_flush_info*)arg; 1700 struct dt_io_thread* dtio = info->dtio; 1701 if(info->want_to_exit_flush) 1702 return; 1703 if(dtio->check_nb_connect) { 1704 /* we don't start the stop_flush if connect still 1705 * in progress, but the check code is here, just in case */ 1706 int connect_err = dtio_check_nb_connect(dtio); 1707 if(connect_err == -1) { 1708 /* close the channel, exit the stop flush */ 1709 dtio_stop_flush_exit(info); 1710 dtio_del_output_event(dtio); 1711 dtio_close_output(dtio); 1712 return; 1713 } else if(connect_err == 0) { 1714 /* try again later */ 1715 return; 1716 } 1717 /* nonblocking connect check passed, continue */ 1718 } 1719#ifdef HAVE_SSL 1720 if(dtio->ssl && 1721 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) { 1722 if(!dtio_ssl_handshake(dtio, info)) 1723 return; 1724 } 1725#endif 1726 1727 if((bits&UB_EV_READ)) { 1728 if(!dtio_check_close(dtio)) { 1729 if(dtio->fd == -1) { 1730 verbose(VERB_ALGO, "dnstap io: " 1731 "stop flush: output closed"); 1732 dtio_stop_flush_exit(info); 1733 } 1734 return; 1735 } 1736 } 1737 /* write remainder of last frame */ 1738 if(dtio->cur_msg) { 1739 if(dtio->cur_msg_done < dtio->cur_msg_len) { 1740 if(!dtio_write_more(dtio)) { 1741 if(dtio->fd == -1) { 1742 verbose(VERB_ALGO, "dnstap io: " 1743 "stop flush: output closed"); 1744 dtio_stop_flush_exit(info); 1745 } 1746 return; 1747 } 1748 } 1749 verbose(VERB_ALGO, "dnstap io: stop flush completed " 1750 "last frame"); 1751 dtio_cur_msg_free(dtio); 1752 } 1753 /* write stop frame */ 1754 if(info->stop_frame_done < info->stop_frame_len) { 1755 if(!dtio_control_stop_send(info)) 1756 return; 1757 verbose(VERB_ALGO, "dnstap io: stop flush completed " 1758 "stop control frame"); 1759 } 1760 /* when last frame and stop frame are sent, exit */ 1761 dtio_stop_flush_exit(info); 1762} 1763 1764/** flush at end, last packet and stop control */ 1765static void dtio_control_stop_flush(struct dt_io_thread* dtio) 1766{ 1767 /* briefly attempt to flush the previous packet to the output, 1768 * this could be a partial packet, or even the start control frame */ 1769 time_t secs = 0; 1770 struct timeval now; 1771 struct stop_flush_info info; 1772 struct timeval tv; 1773 struct ub_event* timer, *stopev; 1774 1775 if(dtio->fd == -1 || dtio->check_nb_connect) { 1776 /* no connection or we have just connected, so nothing is 1777 * sent yet, so nothing to stop or flush */ 1778 return; 1779 } 1780 if(dtio->ssl && !dtio->ssl_handshake_done) { 1781 /* no SSL connection has been established yet */ 1782 return; 1783 } 1784 1785 memset(&info, 0, sizeof(info)); 1786 memset(&now, 0, sizeof(now)); 1787 info.dtio = dtio; 1788 info.base = ub_default_event_base(0, &secs, &now); 1789 if(!info.base) { 1790 log_err("dnstap io: malloc failure"); 1791 return; 1792 } 1793 timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT, 1794 &dtio_stop_timer_cb, &info); 1795 if(!timer) { 1796 log_err("dnstap io: malloc failure"); 1797 ub_event_base_free(info.base); 1798 return; 1799 } 1800 memset(&tv, 0, sizeof(tv)); 1801 tv.tv_sec = 2; 1802 if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info, 1803 &tv) != 0) { 1804 log_err("dnstap io: cannot event_timer_add"); 1805 ub_event_free(timer); 1806 ub_event_base_free(info.base); 1807 return; 1808 } 1809 stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ | 1810 UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info); 1811 if(!stopev) { 1812 log_err("dnstap io: malloc failure"); 1813 ub_timer_del(timer); 1814 ub_event_free(timer); 1815 ub_event_base_free(info.base); 1816 return; 1817 } 1818 if(ub_event_add(stopev, NULL) != 0) { 1819 log_err("dnstap io: cannot event_add"); 1820 ub_event_free(stopev); 1821 ub_timer_del(timer); 1822 ub_event_free(timer); 1823 ub_event_base_free(info.base); 1824 return; 1825 } 1826 info.stop_frame = fstrm_create_control_frame_stop( 1827 &info.stop_frame_len); 1828 if(!info.stop_frame) { 1829 log_err("dnstap io: malloc failure"); 1830 ub_event_del(stopev); 1831 ub_event_free(stopev); 1832 ub_timer_del(timer); 1833 ub_event_free(timer); 1834 ub_event_base_free(info.base); 1835 return; 1836 } 1837 dtio->stop_flush_event = stopev; 1838 1839 /* wait briefly, or until finished */ 1840 verbose(VERB_ALGO, "dnstap io: stop flush started"); 1841 if(ub_event_base_dispatch(info.base) < 0) { 1842 log_err("dnstap io: dispatch flush failed, errno is %s", 1843 strerror(errno)); 1844 } 1845 verbose(VERB_ALGO, "dnstap io: stop flush ended"); 1846 free(info.stop_frame); 1847 dtio->stop_flush_event = NULL; 1848 ub_event_del(stopev); 1849 ub_event_free(stopev); 1850 ub_timer_del(timer); 1851 ub_event_free(timer); 1852 ub_event_base_free(info.base); 1853} 1854 1855/** perform desetup and free stuff when the dnstap io thread exits */ 1856static void dtio_desetup(struct dt_io_thread* dtio) 1857{ 1858 dtio_control_stop_flush(dtio); 1859 dtio_del_output_event(dtio); 1860 dtio_close_output(dtio); 1861 ub_event_del(dtio->command_event); 1862 ub_event_free(dtio->command_event); 1863#ifndef USE_WINSOCK 1864 close(dtio->commandpipe[0]); 1865#else 1866 _close(dtio->commandpipe[0]); 1867#endif 1868 dtio->commandpipe[0] = -1; 1869 dtio_reconnect_del(dtio); 1870 ub_event_free(dtio->reconnect_timer); 1871 dtio_cur_msg_free(dtio); 1872#ifndef THREADS_DISABLED 1873 ub_event_base_free(dtio->event_base); 1874#endif 1875} 1876 1877/** setup a start control message */ 1878static int dtio_control_start_send(struct dt_io_thread* dtio) 1879{ 1880 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0); 1881 dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE, 1882 &dtio->cur_msg_len); 1883 if(!dtio->cur_msg) { 1884 return 0; 1885 } 1886 /* setup to send the control message */ 1887 /* set that the buffer needs to be sent, but the length 1888 * of that buffer is already written, that way the buffer can 1889 * start with 0 length and then the length of the control frame 1890 * in it */ 1891 dtio->cur_msg_done = 0; 1892 dtio->cur_msg_len_done = 4; 1893 return 1; 1894} 1895 1896/** setup a ready control message */ 1897static int dtio_control_ready_send(struct dt_io_thread* dtio) 1898{ 1899 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0); 1900 dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE, 1901 &dtio->cur_msg_len); 1902 if(!dtio->cur_msg) { 1903 return 0; 1904 } 1905 /* setup to send the control message */ 1906 /* set that the buffer needs to be sent, but the length 1907 * of that buffer is already written, that way the buffer can 1908 * start with 0 length and then the length of the control frame 1909 * in it */ 1910 dtio->cur_msg_done = 0; 1911 dtio->cur_msg_len_done = 4; 1912 return 1; 1913} 1914 1915/** open the output file descriptor for af_local */ 1916static int dtio_open_output_local(struct dt_io_thread* dtio) 1917{ 1918#ifdef HAVE_SYS_UN_H 1919 struct sockaddr_un s; 1920 dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0); 1921 if(dtio->fd == -1) { 1922 log_err("dnstap io: failed to create socket: %s", 1923 sock_strerror(errno)); 1924 return 0; 1925 } 1926 memset(&s, 0, sizeof(s)); 1927#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN 1928 /* this member exists on BSDs, not Linux */ 1929 s.sun_len = (unsigned)sizeof(s); 1930#endif 1931 s.sun_family = AF_LOCAL; 1932 /* length is 92-108, 104 on FreeBSD */ 1933 (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path)); 1934 fd_set_nonblock(dtio->fd); 1935 if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s)) 1936 == -1) { 1937 char* to = dtio->socket_path; 1938 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 1939 verbosity < 4) { 1940 dtio_close_fd(dtio); 1941 return 0; /* no log retries on low verbosity */ 1942 } 1943 log_err("dnstap io: failed to connect to \"%s\": %s", 1944 to, sock_strerror(errno)); 1945 dtio_close_fd(dtio); 1946 return 0; 1947 } 1948 return 1; 1949#else 1950 log_err("cannot create af_local socket"); 1951 return 0; 1952#endif /* HAVE_SYS_UN_H */ 1953} 1954 1955/** open the output file descriptor for af_inet and af_inet6 */ 1956static int dtio_open_output_tcp(struct dt_io_thread* dtio) 1957{ 1958 struct sockaddr_storage addr; 1959 socklen_t addrlen; 1960 memset(&addr, 0, sizeof(addr)); 1961 addrlen = (socklen_t)sizeof(addr); 1962 1963 if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen, UNBOUND_DNS_PORT)) { 1964 log_err("could not parse IP '%s'", dtio->ip_str); 1965 return 0; 1966 } 1967 dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0); 1968 if(dtio->fd == -1) { 1969 log_err("can't create socket: %s", sock_strerror(errno)); 1970 return 0; 1971 } 1972 fd_set_nonblock(dtio->fd); 1973 if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) { 1974 if(errno == EINPROGRESS) 1975 return 1; /* wait until connect done*/ 1976 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN && 1977 verbosity < 4) { 1978 dtio_close_fd(dtio); 1979 return 0; /* no log retries on low verbosity */ 1980 } 1981#ifndef USE_WINSOCK 1982 if(tcp_connect_errno_needs_log( 1983 (struct sockaddr *)&addr, addrlen)) { 1984 log_err("dnstap io: failed to connect to %s: %s", 1985 dtio->ip_str, strerror(errno)); 1986 } 1987#else 1988 if(WSAGetLastError() == WSAEINPROGRESS || 1989 WSAGetLastError() == WSAEWOULDBLOCK) 1990 return 1; /* wait until connect done*/ 1991 if(tcp_connect_errno_needs_log( 1992 (struct sockaddr *)&addr, addrlen)) { 1993 log_err("dnstap io: failed to connect to %s: %s", 1994 dtio->ip_str, wsa_strerror(WSAGetLastError())); 1995 } 1996#endif 1997 dtio_close_fd(dtio); 1998 return 0; 1999 } 2000 return 1; 2001} 2002 2003/** setup the SSL structure for new connection */ 2004static int dtio_setup_ssl(struct dt_io_thread* dtio) 2005{ 2006 dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd); 2007 if(!dtio->ssl) return 0; 2008 dtio->ssl_handshake_done = 0; 2009 dtio->ssl_brief_read = 0; 2010 2011 if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name, 2012 dtio->tls_use_sni)) { 2013 return 0; 2014 } 2015 return 1; 2016} 2017 2018/** open the output file descriptor */ 2019static void dtio_open_output(struct dt_io_thread* dtio) 2020{ 2021 struct ub_event* ev; 2022 if(dtio->upstream_is_unix) { 2023 if(!dtio_open_output_local(dtio)) { 2024 dtio_reconnect_enable(dtio); 2025 return; 2026 } 2027 } else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) { 2028 if(!dtio_open_output_tcp(dtio)) { 2029 dtio_reconnect_enable(dtio); 2030 return; 2031 } 2032 if(dtio->upstream_is_tls) { 2033 if(!dtio_setup_ssl(dtio)) { 2034 dtio_close_fd(dtio); 2035 dtio_reconnect_enable(dtio); 2036 return; 2037 } 2038 } 2039 } 2040 dtio->check_nb_connect = 1; 2041 2042 /* the EV_READ is to read ACCEPT control messages, and catch channel 2043 * close. EV_WRITE is to write packets */ 2044 ev = ub_event_new(dtio->event_base, dtio->fd, 2045 UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb, 2046 dtio); 2047 if(!ev) { 2048 log_err("dnstap io: out of memory"); 2049 if(dtio->ssl) { 2050#ifdef HAVE_SSL 2051 SSL_free(dtio->ssl); 2052 dtio->ssl = NULL; 2053#endif 2054 } 2055 dtio_close_fd(dtio); 2056 dtio_reconnect_enable(dtio); 2057 return; 2058 } 2059 dtio->event = ev; 2060 2061 /* setup protocol control message to start */ 2062 if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) || 2063 (dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) { 2064 log_err("dnstap io: out of memory"); 2065 ub_event_free(dtio->event); 2066 dtio->event = NULL; 2067 if(dtio->ssl) { 2068#ifdef HAVE_SSL 2069 SSL_free(dtio->ssl); 2070 dtio->ssl = NULL; 2071#endif 2072 } 2073 dtio_close_fd(dtio); 2074 dtio_reconnect_enable(dtio); 2075 return; 2076 } 2077} 2078 2079/** perform the setup of the writer thread on the established event_base */ 2080static void dtio_setup_on_base(struct dt_io_thread* dtio) 2081{ 2082 dtio_setup_cmd(dtio); 2083 dtio_setup_reconnect(dtio); 2084 dtio_open_output(dtio); 2085 if(!dtio_add_output_event_write(dtio)) 2086 return; 2087} 2088 2089#ifndef THREADS_DISABLED 2090/** the IO thread function for the DNSTAP IO */ 2091static void* dnstap_io(void* arg) 2092{ 2093 struct dt_io_thread* dtio = (struct dt_io_thread*)arg; 2094 time_t secs = 0; 2095 struct timeval now; 2096 log_thread_set(&dtio->threadnum); 2097 2098 /* setup */ 2099 verbose(VERB_ALGO, "start dnstap io thread"); 2100 dtio_setup_base(dtio, &secs, &now); 2101 dtio_setup_on_base(dtio); 2102 2103 /* run */ 2104 if(ub_event_base_dispatch(dtio->event_base) < 0) { 2105 log_err("dnstap io: dispatch failed, errno is %s", 2106 strerror(errno)); 2107 } 2108 2109 /* cleanup */ 2110 verbose(VERB_ALGO, "stop dnstap io thread"); 2111 dtio_desetup(dtio); 2112 return NULL; 2113} 2114#endif /* THREADS_DISABLED */ 2115 2116int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr, 2117 int numworkers) 2118{ 2119 /* set up the thread, can fail */ 2120#ifndef USE_WINSOCK 2121 if(pipe(dtio->commandpipe) == -1) { 2122 log_err("failed to create pipe: %s", strerror(errno)); 2123 return 0; 2124 } 2125#else 2126 if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) { 2127 log_err("failed to create _pipe: %s", 2128 wsa_strerror(WSAGetLastError())); 2129 return 0; 2130 } 2131#endif 2132 2133 /* start the thread */ 2134 dtio->threadnum = numworkers+1; 2135 dtio->started = 1; 2136#ifndef THREADS_DISABLED 2137 ub_thread_create(&dtio->tid, dnstap_io, dtio); 2138 (void)event_base_nothr; 2139#else 2140 dtio->event_base = event_base_nothr; 2141 dtio_setup_on_base(dtio); 2142#endif 2143 return 1; 2144} 2145 2146void dt_io_thread_stop(struct dt_io_thread* dtio) 2147{ 2148#ifndef THREADS_DISABLED 2149 uint8_t cmd = DTIO_COMMAND_STOP; 2150#endif 2151 if(!dtio) return; 2152 if(!dtio->started) return; 2153 verbose(VERB_ALGO, "dnstap io: send stop cmd"); 2154 2155#ifndef THREADS_DISABLED 2156 while(1) { 2157 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); 2158 if(r == -1) { 2159#ifndef USE_WINSOCK 2160 if(errno == EINTR || errno == EAGAIN) 2161 continue; 2162#else 2163 if(WSAGetLastError() == WSAEINPROGRESS) 2164 continue; 2165 if(WSAGetLastError() == WSAEWOULDBLOCK) 2166 continue; 2167#endif 2168 log_err("dnstap io stop: write: %s", 2169 sock_strerror(errno)); 2170 break; 2171 } 2172 break; 2173 } 2174 dtio->started = 0; 2175#endif /* THREADS_DISABLED */ 2176 2177#ifndef USE_WINSOCK 2178 close(dtio->commandpipe[1]); 2179#else 2180 _close(dtio->commandpipe[1]); 2181#endif 2182 dtio->commandpipe[1] = -1; 2183#ifndef THREADS_DISABLED 2184 ub_thread_join(dtio->tid); 2185#else 2186 dtio->want_to_exit = 1; 2187 dtio_desetup(dtio); 2188#endif 2189} 2190