• Home
  • History
  • Annotate
  • Raw
  • Download
  • only in /freebsd-13-stable/contrib/unbound/dnstap/

Lines Matching defs:dtio

87 static void dtio_open_output(struct dt_io_thread* dtio);
89 static int dtio_add_output_event_write(struct dt_io_thread* dtio);
91 static void dtio_reconnect_enable(struct dt_io_thread* dtio);
95 static int dtio_control_start_send(struct dt_io_thread* dtio);
98 static int dtio_enable_brief_read(struct dt_io_thread* dtio);
100 static int dtio_enable_brief_write(struct dt_io_thread* dtio);
148 /** make the dtio wake up by sending a wakeup command */
149 static void dtio_wakeup(struct dt_io_thread* dtio)
152 if(!dtio) return;
153 if(!dtio->started) return;
156 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
179 /* even if the dtio is already active, because perhaps much
183 lock_basic_lock(&mq->dtio->wakeup_timer_lock);
184 mq->dtio->wakeup_timer_enabled = 0;
185 lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
186 dtio_wakeup(mq->dtio);
189 /** start timer to wakeup dtio because there is content in the queue */
195 * If we woke up the dtio thread for every message, the wakeup
200 /* we cannot start a timer in dtio thread, because it is a different
207 * dtio */
208 lock_basic_lock(&mq->dtio->wakeup_timer_lock);
209 if(mq->dtio->wakeup_timer_enabled) {
210 lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
213 mq->dtio->wakeup_timer_enabled = 1; /* we are going to start one */
214 lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
287 dtio_wakeup(mq->dtio);
295 struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
296 lock_basic_init(&dtio->wakeup_timer_lock);
297 lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled,
298 sizeof(dtio->wakeup_timer_enabled));
299 return dtio;
302 void dt_io_thread_delete(struct dt_io_thread* dtio)
305 if(!dtio) return;
306 lock_basic_destroy(&dtio->wakeup_timer_lock);
307 item=dtio->io_list;
313 free(dtio->socket_path);
314 free(dtio->ip_str);
315 free(dtio->tls_server_name);
316 free(dtio->client_key_file);
317 free(dtio->client_cert_file);
318 if(dtio->ssl_ctx) {
320 SSL_CTX_free(dtio->ssl_ctx);
323 free(dtio);
326 int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
336 dtio->upstream_is_tls = 1;
337 else dtio->upstream_is_tcp = 1;
339 dtio->upstream_is_unix = 1;
341 dtio->is_bidirectional = cfg->dnstap_bidirectional;
343 if(dtio->upstream_is_unix) {
355 free(dtio->socket_path);
356 dtio->socket_path = strdup(nm);
357 if(!dtio->socket_path) {
363 if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
368 free(dtio->ip_str);
369 dtio->ip_str = strdup(cfg->dnstap_ip);
370 if(!dtio->ip_str) {
376 if(dtio->upstream_is_tls) {
380 free(dtio->tls_server_name);
381 dtio->tls_server_name = strdup(
383 if(!dtio->tls_server_name) {
387 if(!check_auth_name_for_ssl(dtio->tls_server_name))
392 dtio->use_client_certs = 1;
393 free(dtio->client_key_file);
394 dtio->client_key_file = strdup(
396 if(!dtio->client_key_file) {
409 free(dtio->client_cert_file);
410 dtio->client_cert_file = strdup(
412 if(!dtio->client_cert_file) {
417 dtio->use_client_certs = 0;
418 dtio->client_key_file = NULL;
419 dtio->client_cert_file = NULL;
423 dtio->ssl_ctx = connect_sslctx_create(
424 dtio->client_key_file,
425 dtio->client_cert_file,
428 dtio->ssl_ctx = connect_sslctx_create(
429 dtio->client_key_file,
430 dtio->client_cert_file,
433 if(!dtio->ssl_ctx) {
437 dtio->tls_use_sni = cfg->tls_use_sni;
443 int dt_io_thread_register_queue(struct dt_io_thread* dtio,
449 mq->dtio = dtio;
452 item->next = dtio->io_list;
453 dtio->io_list = item;
454 dtio->io_list_iter = NULL;
458 void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
462 if(!dtio) return;
463 item = dtio->io_list;
468 else dtio->io_list = item->next;
471 item->queue->dtio = NULL;
474 dtio->io_list_iter = NULL;
506 static int dtio_find_in_queue(struct dt_io_thread* dtio,
512 dtio->cur_msg = buf;
513 dtio->cur_msg_len = len;
514 dtio->cur_msg_done = 0;
515 dtio->cur_msg_len_done = 0;
522 static int dtio_find_msg(struct dt_io_thread* dtio)
526 spot = dtio->io_list_iter;
530 dtio->io_list_iter = spot->next;
531 else if(dtio->io_list)
532 dtio->io_list_iter = dtio->io_list->next;
537 if(dtio_find_in_queue(dtio, item->queue))
542 item = dtio->io_list;
544 if(dtio_find_in_queue(dtio, item->queue))
555 struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
556 dtio->reconnect_is_added = 0;
559 dtio_open_output(dtio);
560 if(dtio->event) {
561 if(!dtio_add_output_event_write(dtio))
567 dtio_reconnect_enable(dtio);
571 static void dtio_reconnect_enable(struct dt_io_thread* dtio)
575 if(dtio->want_to_exit) return;
576 if(dtio->reconnect_is_added)
580 msec = dtio->reconnect_timeout;
582 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
584 dtio->reconnect_timeout = msec*2;
585 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX)
586 dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
595 if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
596 &dtio_reconnect_timeout_cb, dtio, &tv) != 0) {
600 dtio->reconnect_is_added = 1;
603 /** remove dtio reconnect timer */
604 static void dtio_reconnect_del(struct dt_io_thread* dtio)
606 if(!dtio->reconnect_is_added)
608 ub_timer_del(dtio->reconnect_timer);
609 dtio->reconnect_is_added = 0;
614 static void dtio_reconnect_clear(struct dt_io_thread* dtio)
616 dtio->reconnect_timeout = 0;
617 dtio_reconnect_del(dtio);
621 static void dtio_reconnect_slow(struct dt_io_thread* dtio, int msec)
623 dtio_reconnect_del(dtio);
624 dtio->reconnect_timeout = msec;
625 dtio_reconnect_enable(dtio);
628 /** delete the current message in the dtio, and reset counters */
629 static void dtio_cur_msg_free(struct dt_io_thread* dtio)
631 free(dtio->cur_msg);
632 dtio->cur_msg = NULL;
633 dtio->cur_msg_len = 0;
634 dtio->cur_msg_done = 0;
635 dtio->cur_msg_len_done = 0;
653 static void dtio_del_output_event(struct dt_io_thread* dtio)
655 if(!dtio->event_added)
657 ub_event_del(dtio->event);
658 dtio->event_added = 0;
659 dtio->event_added_is_write = 0;
662 /** close dtio socket and set it to -1 */
663 static void dtio_close_fd(struct dt_io_thread* dtio)
665 sock_close(dtio->fd);
666 dtio->fd = -1;
670 static void dtio_close_output(struct dt_io_thread* dtio)
672 if(!dtio->event)
674 ub_event_free(dtio->event);
675 dtio->event = NULL;
676 if(dtio->ssl) {
678 SSL_shutdown(dtio->ssl);
679 SSL_free(dtio->ssl);
680 dtio->ssl = NULL;
683 dtio_close_fd(dtio);
688 if(dtio->cur_msg) {
689 dtio_cur_msg_free(dtio);
692 dtio->ready_frame_sent = 0;
693 dtio->accept_frame_received = 0;
694 dtio_read_frame_free(&dtio->read_frame);
696 dtio_reconnect_enable(dtio);
701 static int dtio_check_nb_connect(struct dt_io_thread* dtio)
705 if(!dtio->check_nb_connect)
707 if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error,
724 ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
725 dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
730 char* to = dtio->socket_path;
731 if(!to) to = dtio->ip_str;
738 if(dtio->ip_str)
740 dtio->ip_str);
741 else if(dtio->socket_path)
743 dtio->socket_path);
744 dtio_reconnect_clear(dtio);
745 dtio->check_nb_connect = 0;
753 static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf,
758 r = SSL_write(dtio->ssl, buf, len);
760 int want = SSL_get_error(dtio->ssl, r);
766 dtio_enable_brief_read(dtio);
796 static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
800 if(dtio->fd == -1)
803 if(dtio->ssl)
804 return dtio_write_ssl(dtio, buf, len);
806 ret = send(dtio->fd, (void*)buf, len, 0);
815 ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
816 dtio->stop_flush_event:dtio->event),
830 static int dtio_write_with_writev(struct dt_io_thread* dtio)
832 uint32_t sendlen = htonl(dtio->cur_msg_len);
835 iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done;
836 iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done;
837 iov[1].iov_base = dtio->cur_msg;
838 iov[1].iov_len = dtio->cur_msg_len;
840 r = writev(dtio->fd, iov, 2);
849 ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
850 dtio->stop_flush_event:dtio->event),
857 dtio_del_output_event(dtio);
858 dtio_close_output(dtio);
862 dtio->cur_msg_len_done += r;
863 if(dtio->cur_msg_len_done < 4)
865 if(dtio->cur_msg_len_done > 4) {
866 dtio->cur_msg_done = dtio->cur_msg_len_done-4;
867 dtio->cur_msg_len_done = 4;
869 if(dtio->cur_msg_done < dtio->cur_msg_len)
877 static int dtio_write_more_of_len(struct dt_io_thread* dtio)
881 if(dtio->cur_msg_len_done >= 4)
884 if(!dtio->ssl) {
886 return dtio_write_with_writev(dtio);
889 sendlen = htonl(dtio->cur_msg_len);
890 r = dtio_write_buf(dtio,
891 ((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
892 sizeof(sendlen)-dtio->cur_msg_len_done);
895 dtio_del_output_event(dtio);
896 dtio_close_output(dtio);
902 dtio->cur_msg_len_done += r;
903 if(dtio->cur_msg_len_done < 4)
910 static int dtio_write_more_of_data(struct dt_io_thread* dtio)
913 if(dtio->cur_msg_done >= dtio->cur_msg_len)
915 r = dtio_write_buf(dtio,
916 ((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
917 dtio->cur_msg_len - dtio->cur_msg_done);
920 dtio_del_output_event(dtio);
921 dtio_close_output(dtio);
927 dtio->cur_msg_done += r;
928 if(dtio->cur_msg_done < dtio->cur_msg_len)
935 static int dtio_write_more(struct dt_io_thread* dtio)
937 if(dtio->cur_msg_len_done < 4) {
938 if(!dtio_write_more_of_len(dtio))
941 if(dtio->cur_msg_done < dtio->cur_msg_len) {
942 if(!dtio_write_more_of_data(dtio))
948 /** Receive bytes from dtio->fd, store in buffer. Returns 0: closed,
950 static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) {
952 r = recv(dtio->fd, (void*)buf, len, 0);
954 char* to = dtio->socket_path;
955 if(!to) to = dtio->ip_str;
965 (dtio->stop_flush_event?
966 dtio->stop_flush_event:dtio->event),
971 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
980 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
992 /** Receive bytes over TLS from dtio->fd, store in buffer. Returns 0: closed,
994 static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len)
998 r = SSL_read(dtio->ssl, buf, len);
1000 int want = SSL_get_error(dtio->ssl, r);
1002 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1012 (void)dtio_enable_brief_write(dtio);
1016 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1038 static int dtio_check_close(struct dt_io_thread* dtio)
1049 if(dtio->fd == -1) return 0;
1053 r = receive_bytes(dtio, (void*)buf, sizeof(buf));
1059 dtio_del_output_event(dtio);
1060 dtio_close_output(dtio);
1066 static int dtio_read_accept_frame(struct dt_io_thread* dtio)
1070 while(dtio->read_frame.frame_len_done < 4) {
1072 if(dtio->ssl) {
1073 r = ssl_read_bytes(dtio,
1074 (uint8_t*)&dtio->read_frame.frame_len+
1075 dtio->read_frame.frame_len_done,
1076 4-dtio->read_frame.frame_len_done);
1079 r = receive_bytes(dtio,
1080 (uint8_t*)&dtio->read_frame.frame_len+
1081 dtio->read_frame.frame_len_done,
1082 4-dtio->read_frame.frame_len_done);
1092 dtio->read_frame.frame_len_done += r;
1093 if(dtio->read_frame.frame_len_done < 4)
1096 if(dtio->read_frame.frame_len == 0) {
1097 dtio->read_frame.frame_len_done = 0;
1098 dtio->read_frame.control_frame = 1;
1101 dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len);
1102 if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) {
1108 dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len);
1109 dtio->read_frame.buf_cap = dtio->read_frame.frame_len;
1110 if(!dtio->read_frame.buf) {
1116 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) {
1118 if(dtio->ssl) {
1119 r = ssl_read_bytes(dtio, dtio->read_frame.buf+
1120 dtio->read_frame.buf_count,
1121 dtio->read_frame.buf_cap-
1122 dtio->read_frame.buf_count);
1125 r = receive_bytes(dtio, dtio->read_frame.buf+
1126 dtio->read_frame.buf_count,
1127 dtio->read_frame.buf_cap-
1128 dtio->read_frame.buf_count);
1138 dtio->read_frame.buf_count += r;
1139 if(dtio->read_frame.buf_count < dtio->read_frame.frame_len)
1145 if(dtio->read_frame.frame_len < 4) {
1149 if(sldns_read_uint32(dtio->read_frame.buf) !=
1153 dtio->ready_frame_sent = 0;
1154 dtio->accept_frame_received = 0;
1155 dtio_read_frame_free(&dtio->read_frame);
1163 while(read_frame_done+8 < dtio->read_frame.frame_len) {
1164 uint32_t type = sldns_read_uint32(dtio->read_frame.buf +
1166 uint32_t len = sldns_read_uint32(dtio->read_frame.buf +
1171 dtio->read_frame.frame_len &&
1172 memcmp(dtio->read_frame.buf + read_frame_done +
1174 if(!dtio_control_start_send(dtio)) {
1179 dtio->accept_frame_received = 1;
1180 if(!dtio_add_output_event_write(dtio))
1197 dtio_del_output_event(dtio);
1198 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1199 dtio_close_output(dtio);
1204 static int dtio_add_output_event_read(struct dt_io_thread* dtio)
1206 if(!dtio->event)
1208 if(dtio->event_added && !dtio->event_added_is_write)
1211 if(dtio->event_added)
1212 ub_event_del(dtio->event);
1213 ub_event_del_bits(dtio->event, UB_EV_WRITE);
1214 if(ub_event_add(dtio->event, NULL) != 0) {
1216 dtio->event_added = 0;
1217 dtio->event_added_is_write = 0;
1219 dtio_close_output(dtio);
1222 dtio->event_added = 1;
1223 dtio->event_added_is_write = 0;
1228 static int dtio_add_output_event_write(struct dt_io_thread* dtio)
1230 if(!dtio->event)
1232 if(dtio->event_added && dtio->event_added_is_write)
1235 if(dtio->event_added)
1236 ub_event_del(dtio->event);
1237 ub_event_add_bits(dtio->event, UB_EV_WRITE);
1238 if(ub_event_add(dtio->event, NULL) != 0) {
1240 dtio->event_added = 0;
1241 dtio->event_added_is_write = 0;
1243 dtio_close_output(dtio);
1246 dtio->event_added = 1;
1247 dtio->event_added_is_write = 1;
1251 /** put the dtio thread to sleep */
1252 static void dtio_sleep(struct dt_io_thread* dtio)
1256 (void)dtio_add_output_event_read(dtio);
1261 static int dtio_enable_brief_read(struct dt_io_thread* dtio)
1263 dtio->ssl_brief_read = 1;
1264 if(dtio->stop_flush_event) {
1265 ub_event_del(dtio->stop_flush_event);
1266 ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE);
1267 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
1273 return dtio_add_output_event_read(dtio);
1279 static int dtio_disable_brief_read(struct dt_io_thread* dtio)
1281 dtio->ssl_brief_read = 0;
1282 if(dtio->stop_flush_event) {
1283 ub_event_del(dtio->stop_flush_event);
1284 ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE);
1285 if(ub_event_add(dtio->stop_flush_event, NULL) != 0) {
1291 return dtio_add_output_event_write(dtio);
1297 static int dtio_enable_brief_write(struct dt_io_thread* dtio)
1299 dtio->ssl_brief_write = 1;
1300 return dtio_add_output_event_write(dtio);
1306 static int dtio_disable_brief_write(struct dt_io_thread* dtio)
1308 dtio->ssl_brief_write = 0;
1309 return dtio_add_output_event_read(dtio);
1315 static int dtio_ssl_check_peer(struct dt_io_thread* dtio)
1317 if((SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) {
1319 if(SSL_get_verify_result(dtio->ssl) == X509_V_OK) {
1320 X509* x = SSL_get_peer_certificate(dtio->ssl);
1324 dtio->ip_str);
1330 if(SSL_get0_peername(dtio->ssl)) {
1333 dtio->ip_str,
1334 SSL_get0_peername(dtio->ssl));
1339 dtio->ip_str);
1345 X509* x = SSL_get_peer_certificate(dtio->ssl);
1353 dtio->ip_str);
1360 dtio->ip_str);
1368 static int dtio_ssl_handshake(struct dt_io_thread* dtio,
1372 if(dtio->ssl_brief_read) {
1375 if(!dtio_disable_brief_read(dtio)) {
1380 if(dtio->ssl_handshake_done)
1384 r = SSL_do_handshake(dtio->ssl);
1386 int want = SSL_get_error(dtio->ssl, r);
1389 if(!dtio_enable_brief_read(dtio)) {
1400 dtio_del_output_event(dtio);
1401 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1402 dtio_close_output(dtio);
1422 dtio_del_output_event(dtio);
1423 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1424 dtio_close_output(dtio);
1432 "from %s", dtio->ip_str);
1436 dtio_del_output_event(dtio);
1437 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1438 dtio_close_output(dtio);
1444 dtio->ssl_handshake_done = 1;
1446 if(!dtio_ssl_check_peer(dtio)) {
1449 dtio_del_output_event(dtio);
1450 dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
1451 dtio_close_output(dtio);
1461 struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1464 if(dtio->check_nb_connect) {
1465 int connect_err = dtio_check_nb_connect(dtio);
1468 dtio_del_output_event(dtio);
1469 dtio_close_output(dtio);
1479 if(dtio->ssl &&
1480 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1481 if(!dtio_ssl_handshake(dtio, NULL))
1486 if((bits&UB_EV_READ || dtio->ssl_brief_write)) {
1487 if(dtio->ssl_brief_write)
1488 (void)dtio_disable_brief_write(dtio);
1489 if(dtio->ready_frame_sent && !dtio->accept_frame_received) {
1490 if(dtio_read_accept_frame(dtio) <= 0)
1492 } else if(!dtio_check_close(dtio))
1505 if(!dtio->cur_msg) {
1506 if(!dtio_find_msg(dtio)) {
1510 dtio_sleep(dtio);
1517 if(dtio->cur_msg_done < dtio->cur_msg_len) {
1518 if(!dtio_write_more(dtio))
1523 dtio_cur_msg_free(dtio);
1528 if(dtio->is_bidirectional && !dtio->ready_frame_sent) {
1529 dtio->ready_frame_sent = 1;
1530 (void)dtio_add_output_event_read(dtio);
1539 struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
1542 if(dtio->want_to_exit)
1564 if(dtio->is_bidirectional && !dtio->accept_frame_received) {
1571 if(!dtio_add_output_event_write(dtio))
1577 dtio->want_to_exit = 1;
1578 if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
1586 static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
1590 dtio->event_base = ub_default_event_base(0, secs, now);
1591 if(!dtio->event_base) {
1598 static void dtio_setup_cmd(struct dt_io_thread* dtio)
1601 fd_set_nonblock(dtio->commandpipe[0]);
1602 cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0],
1603 UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
1607 dtio->command_event = cmdev;
1614 static void dtio_setup_reconnect(struct dt_io_thread* dtio)
1616 dtio_reconnect_clear(dtio);
1617 dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
1618 UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
1619 if(!dtio->reconnect_timer) {
1634 /** the dtio */
1635 struct dt_io_thread* dtio;
1659 struct dt_io_thread* dtio = info->dtio;
1663 r = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) +
1695 struct dt_io_thread* dtio = info->dtio;
1698 if(dtio->check_nb_connect) {
1701 int connect_err = dtio_check_nb_connect(dtio);
1705 dtio_del_output_event(dtio);
1706 dtio_close_output(dtio);
1715 if(dtio->ssl &&
1716 (!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
1717 if(!dtio_ssl_handshake(dtio, info))
1723 if(!dtio_check_close(dtio)) {
1724 if(dtio->fd == -1) {
1733 if(dtio->cur_msg) {
1734 if(dtio->cur_msg_done < dtio->cur_msg_len) {
1735 if(!dtio_write_more(dtio)) {
1736 if(dtio->fd == -1) {
1746 dtio_cur_msg_free(dtio);
1760 static void dtio_control_stop_flush(struct dt_io_thread* dtio)
1770 if(dtio->fd == -1 || dtio->check_nb_connect) {
1775 if(dtio->ssl && !dtio->ssl_handshake_done) {
1782 info.dtio = dtio;
1804 stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
1832 dtio->stop_flush_event = stopev;
1842 dtio->stop_flush_event = NULL;
1851 static void dtio_desetup(struct dt_io_thread* dtio)
1853 dtio_control_stop_flush(dtio);
1854 dtio_del_output_event(dtio);
1855 dtio_close_output(dtio);
1856 ub_event_del(dtio->command_event);
1857 ub_event_free(dtio->command_event);
1859 close(dtio->commandpipe[0]);
1861 _close(dtio->commandpipe[0]);
1863 dtio->commandpipe[0] = -1;
1864 dtio_reconnect_del(dtio);
1865 ub_event_free(dtio->reconnect_timer);
1866 dtio_cur_msg_free(dtio);
1868 ub_event_base_free(dtio->event_base);
1873 static int dtio_control_start_send(struct dt_io_thread* dtio)
1875 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1876 dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE,
1877 &dtio->cur_msg_len);
1878 if(!dtio->cur_msg) {
1886 dtio->cur_msg_done = 0;
1887 dtio->cur_msg_len_done = 4;
1892 static int dtio_control_ready_send(struct dt_io_thread* dtio)
1894 log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
1895 dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE,
1896 &dtio->cur_msg_len);
1897 if(!dtio->cur_msg) {
1905 dtio->cur_msg_done = 0;
1906 dtio->cur_msg_len_done = 4;
1911 static int dtio_open_output_local(struct dt_io_thread* dtio)
1915 dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
1916 if(dtio->fd == -1) {
1928 (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path));
1929 fd_set_nonblock(dtio->fd);
1930 if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s))
1932 char* to = dtio->socket_path;
1933 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1935 dtio_close_fd(dtio);
1940 dtio_close_fd(dtio);
1951 static int dtio_open_output_tcp(struct dt_io_thread* dtio)
1958 if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen)) {
1959 log_err("could not parse IP '%s'", dtio->ip_str);
1962 dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
1963 if(dtio->fd == -1) {
1967 fd_set_nonblock(dtio->fd);
1968 if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
1971 if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
1973 dtio_close_fd(dtio);
1980 dtio->ip_str, strerror(errno));
1989 dtio->ip_str, wsa_strerror(WSAGetLastError()));
1992 dtio_close_fd(dtio);
1999 static int dtio_setup_ssl(struct dt_io_thread* dtio)
2001 dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
2002 if(!dtio->ssl) return 0;
2003 dtio->ssl_handshake_done = 0;
2004 dtio->ssl_brief_read = 0;
2006 if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name,
2007 dtio->tls_use_sni)) {
2014 static void dtio_open_output(struct dt_io_thread* dtio)
2017 if(dtio->upstream_is_unix) {
2018 if(!dtio_open_output_local(dtio)) {
2019 dtio_reconnect_enable(dtio);
2022 } else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
2023 if(!dtio_open_output_tcp(dtio)) {
2024 dtio_reconnect_enable(dtio);
2027 if(dtio->upstream_is_tls) {
2028 if(!dtio_setup_ssl(dtio)) {
2029 dtio_close_fd(dtio);
2030 dtio_reconnect_enable(dtio);
2035 dtio->check_nb_connect = 1;
2039 ev = ub_event_new(dtio->event_base, dtio->fd,
2041 dtio);
2044 if(dtio->ssl) {
2046 SSL_free(dtio->ssl);
2047 dtio->ssl = NULL;
2050 dtio_close_fd(dtio);
2051 dtio_reconnect_enable(dtio);
2054 dtio->event = ev;
2057 if((!dtio->is_bidirectional && !dtio_control_start_send(dtio)) ||
2058 (dtio->is_bidirectional && !dtio_control_ready_send(dtio)) ) {
2060 ub_event_free(dtio->event);
2061 dtio->event = NULL;
2062 if(dtio->ssl) {
2064 SSL_free(dtio->ssl);
2065 dtio->ssl = NULL;
2068 dtio_close_fd(dtio);
2069 dtio_reconnect_enable(dtio);
2075 static void dtio_setup_on_base(struct dt_io_thread* dtio)
2077 dtio_setup_cmd(dtio);
2078 dtio_setup_reconnect(dtio);
2079 dtio_open_output(dtio);
2080 if(!dtio_add_output_event_write(dtio))
2088 struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
2091 log_thread_set(&dtio->threadnum);
2095 dtio_setup_base(dtio, &secs, &now);
2096 dtio_setup_on_base(dtio);
2099 if(ub_event_base_dispatch(dtio->event_base) < 0) {
2106 dtio_desetup(dtio);
2111 int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
2116 if(pipe(dtio->commandpipe) == -1) {
2121 if(_pipe(dtio->commandpipe, 4096, _O_BINARY) == -1) {
2129 dtio->threadnum = numworkers+1;
2130 dtio->started = 1;
2132 ub_thread_create(&dtio->tid, dnstap_io, dtio);
2135 dtio->event_base = event_base_nothr;
2136 dtio_setup_on_base(dtio);
2141 void dt_io_thread_stop(struct dt_io_thread* dtio)
2146 if(!dtio) return;
2147 if(!dtio->started) return;
2152 ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
2169 dtio->started = 0;
2173 close(dtio->commandpipe[1]);
2175 _close(dtio->commandpipe[1]);
2177 dtio->commandpipe[1] = -1;
2179 ub_thread_join(dtio->tid);
2181 dtio->want_to_exit = 1;
2182 dtio_desetup(dtio);