dtstream.h revision 368129
1/*
2 * dnstap/dtstream.h - Frame Streams thread for unbound DNSTAP
3 *
4 * Copyright (c) 2020, NLnet Labs. All rights reserved.
5 *
6 * This software is open source.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 *
12 * Redistributions of source code must retain the above copyright notice,
13 * this list of conditions and the following disclaimer.
14 *
15 * Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
18 *
19 * Neither the name of the NLNET LABS nor the names of its contributors may
20 * be used to endorse or promote products derived from this software without
21 * specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 *
35 */
36
37/**
38 * \file
39 *
40 * An implementation of the Frame Streams data transport protocol for
41 * the Unbound DNSTAP message logging facility.
42 */
43
44#ifndef DTSTREAM_H
45#define DTSTREAM_H
46
47#include "util/locks.h"
48struct dt_msg_entry;
49struct dt_io_list_item;
50struct dt_io_thread;
51struct config_file;
52struct comm_base;
53
54/**
55 * A message buffer with dnstap messages queued up.  It is per-worker.
56 * It has locks to synchronize.  If the buffer is full, a new message
57 * cannot be added and is discarded.  A thread reads the messages and sends
58 * them.
59 */
60struct dt_msg_queue {
61	/** lock of the buffer structure.  Hold this lock to add or remove
62	 * entries to the buffer.  Release it so that other threads can also
63	 * put messages to log, or a message can be taken out to send away
64	 * by the writer thread.
65	 */
66	lock_basic_type lock;
67	/** the maximum size of the buffer, in bytes */
68	size_t maxsize;
69	/** current size of the buffer, in bytes.  data bytes of messages.
70	 * If a new message make it more than maxsize, the buffer is full */
71	size_t cursize;
72	/** number of messages in the queue */
73	int msgcount;
74	/** list of messages.  The messages are added to the back and taken
75	 * out from the front. */
76	struct dt_msg_entry* first, *last;
77	/** reference to the io thread to wakeup */
78	struct dt_io_thread* dtio;
79	/** the wakeup timer for dtio, on worker event base */
80	struct comm_timer* wakeup_timer;
81};
82
83/**
84 * An entry in the dt_msg_queue. contains one DNSTAP message.
85 * It is malloced.
86 */
87struct dt_msg_entry {
88	/** next in the list. */
89	struct dt_msg_entry* next;
90	/** the buffer with the data to send, an encoded DNSTAP message */
91	void* buf;
92	/** the length to send. */
93	size_t len;
94};
95
96/**
97 * Containing buffer and counter for reading DNSTAP frames.
98 */
99struct dt_frame_read_buf {
100	/** Buffer containing frame, except length counter(s). */
101	void* buf;
102	/** Number of bytes written to buffer. */
103	size_t buf_count;
104	/** Capacity of the buffer. */
105	size_t buf_cap;
106
107	/** Frame length field. Will contain the 2nd length field for control
108	 * frames. */
109	uint32_t frame_len;
110	/** Number of bytes that have been written to the frame_length field. */
111	size_t frame_len_done;
112
113	/** Set to 1 if this is a control frame, 0 otherwise (ie data frame). */
114	int control_frame;
115};
116
117/**
118 * IO thread that reads from the queues and writes them.
119 */
120struct dt_io_thread {
121	/** the thread number for the dtio thread,
122	 * must be first to cast thread arg to int* in checklock code. */
123	int threadnum;
124	/** event base, for event handling */
125	void* event_base;
126	/** list of queues that is registered to get written */
127	struct dt_io_list_item* io_list;
128	/** iterator point in the io_list, to pick from them in a
129	 * round-robin fashion, instead of only from the first when busy.
130	 * if NULL it means start at the start of the list. */
131	struct dt_io_list_item* io_list_iter;
132	/** thread id, of the io thread */
133	ub_thread_type tid;
134	/** if the io processing has started */
135	int started;
136	/** ssl context for the io thread, for tls connections. type SSL_CTX* */
137	void* ssl_ctx;
138	/** if SNI will be used for TLS connections. */
139	int tls_use_sni;
140
141	/** file descriptor that the thread writes to */
142	int fd;
143	/** event structure that the thread uses */
144	void* event;
145	/** the event is added */
146	int event_added;
147	/** event added is a write event */
148	int event_added_is_write;
149	/** check for nonblocking connect errors on fd */
150	int check_nb_connect;
151	/** ssl for current connection, type SSL* */
152	void* ssl;
153	/** true if the handshake for SSL is done, 0 if not */
154	int ssl_handshake_done;
155	/** true if briefly the SSL wants a read event, 0 if not.
156	 * This happens during negotiation, we then do not want to write,
157	 * but wait for a read event. */
158	int ssl_brief_read;
159	/** true if SSL_read is waiting for a write event. Set back to 0 after
160	 * single write event is handled. */
161	int ssl_brief_write;
162
163	/** the buffer that currently getting written, or NULL if no
164	 * (partial) message written now */
165	void* cur_msg;
166	/** length of the current message */
167	size_t cur_msg_len;
168	/** number of bytes written for the current message */
169	size_t cur_msg_done;
170	/** number of bytes of the length that have been written,
171	 * for the current message length that precedes the frame */
172	size_t cur_msg_len_done;
173
174	/** lock on wakeup_timer_enabled */
175	lock_basic_type wakeup_timer_lock;
176	/** if wakeup timer is enabled in some thread */
177	int wakeup_timer_enabled;
178	/** command pipe that stops the pipe if closed.  Used to quit
179	 * the program. [0] is read, [1] is written to. */
180	int commandpipe[2];
181	/** the event to listen to the commandpipe */
182	void* command_event;
183	/** the io thread wants to exit */
184	int want_to_exit;
185
186	/** in stop flush, this is nonNULL and references the stop_ev */
187	void* stop_flush_event;
188
189	/** the timer event for connection retries */
190	void* reconnect_timer;
191	/** if the reconnect timer is added to the event base */
192	int reconnect_is_added;
193	/** the current reconnection timeout, it is increased with
194	 * exponential backoff, in msec */
195	int reconnect_timeout;
196
197	/** If the log server is connected to over unix domain sockets,
198	 * eg. a file is named that is created to log onto. */
199	int upstream_is_unix;
200	/** if the log server is connected to over TCP.  The ip address and
201	 * port are used */
202	int upstream_is_tcp;
203	/** if the log server is connected to over TLS.  ip address, port,
204	 * and client certificates can be used for authentication. */
205	int upstream_is_tls;
206
207	/** Perform bidirectional Frame Streams handshake before sending
208	 * messages. */
209	int is_bidirectional;
210	/** Set if the READY control frame has been sent. */
211	int ready_frame_sent;
212	/** Set if valid ACCEPT frame is received. */
213	int accept_frame_received;
214	/** (partially) read frame */
215	struct dt_frame_read_buf read_frame;
216
217	/** the file path for unix socket (or NULL) */
218	char* socket_path;
219	/** the ip address and port number (or NULL) */
220	char* ip_str;
221	/** is the TLS upstream authenticated by name, if nonNULL,
222	 * we use the same cert bundle as used by other TLS streams. */
223	char* tls_server_name;
224	/** are client certificates in use */
225	int use_client_certs;
226	/** client cert files: the .key file */
227	char* client_key_file;
228	/** client cert files: the .pem file */
229	char* client_cert_file;
230};
231
232/**
233 * IO thread list of queues list item
234 * lists a worker queue that should be looked at and sent to the log server.
235 */
236struct dt_io_list_item {
237	/** next in the list of buffers to inspect */
238	struct dt_io_list_item* next;
239	/** buffer of this worker */
240	struct dt_msg_queue* queue;
241};
242
243/**
244 * Create new (empty) worker message queue. Limit set to default on max.
245 * @param base: event base for wakeup timer.
246 * @return NULL on malloc failure or a new queue (not locked).
247 */
248struct dt_msg_queue* dt_msg_queue_create(struct comm_base* base);
249
250/**
251 * Delete a worker message queue.  It has to be unlinked from access,
252 * so it can be deleted without lock worries.  The queue is emptied (deleted).
253 * @param mq: message queue.
254 */
255void dt_msg_queue_delete(struct dt_msg_queue* mq);
256
257/**
258 * Submit a message to the queue.  The queue is locked by the routine,
259 * the message is inserted, and then the queue is unlocked so the
260 * message can be picked up by the writer thread.
261 * @param mq: message queue.
262 * @param buf: buffer with message (dnstap contents).
263 * 	The buffer must have been malloced by caller.  It is linked in
264 * 	the queue, and is free()d after use.  If the routine fails
265 * 	the buffer is freed as well (and nothing happens, the item
266 * 	could not be logged).
267 * @param len: length of buffer.
268 */
269void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len);
270
271/** timer callback to wakeup dtio thread to process messages */
272void mq_wakeup_cb(void* arg);
273
274/**
275 * Create IO thread.
276 * @return new io thread object. not yet started. or NULL malloc failure.
277 */
278struct dt_io_thread* dt_io_thread_create(void);
279
280/**
281 * Delete the IO thread structure.
282 * @param dtio: the io thread that is deleted.  It must not be running.
283 */
284void dt_io_thread_delete(struct dt_io_thread* dtio);
285
286/**
287 * Apply config to the dtio thread
288 * @param dtio: io thread, not yet started.
289 * @param cfg: config file struct.
290 * @return false on malloc failure.
291 */
292int dt_io_thread_apply_cfg(struct dt_io_thread* dtio,
293	struct config_file *cfg);
294
295/**
296 * Register a msg queue to the io thread.  It will be polled to see if
297 * there are messages and those then get removed and sent, when the thread
298 * is running.
299 * @param dtio: the io thread.
300 * @param mq: message queue to register.
301 * @return false on failure (malloc failure).
302 */
303int dt_io_thread_register_queue(struct dt_io_thread* dtio,
304	struct dt_msg_queue* mq);
305
306/**
307 * Unregister queue from io thread.
308 * @param dtio: the io thread.
309 * @param mq: message queue.
310 */
311void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
312        struct dt_msg_queue* mq);
313
314/**
315 * Start the io thread
316 * @param dtio: the io thread.
317 * @param event_base_nothr: the event base to attach the events to, in case
318 * 	we are running without threads.  With threads, this is ignored
319 * 	and a thread is started to process the dnstap log messages.
320 * @param numworkers: number of worker threads.  The dnstap io thread is
321 * 	that number +1 as the threadnumber (in logs).
322 * @return false on failure.
323 */
324int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
325	int numworkers);
326
327/**
328 * Stop the io thread
329 * @param dtio: the io thread.
330 */
331void dt_io_thread_stop(struct dt_io_thread* dtio);
332
333/** callback for the dnstap reconnect, to start reconnecting to output */
334void dtio_reconnect_timeout_cb(int fd, short bits, void* arg);
335
336/** callback for the dnstap events, to write to the output */
337void dtio_output_cb(int fd, short bits, void* arg);
338
339/** callback for the dnstap commandpipe, to stop the dnstap IO */
340void dtio_cmd_cb(int fd, short bits, void* arg);
341
342/** callback for the timer when the thread stops and wants to finish up */
343void dtio_stop_timer_cb(int fd, short bits, void* arg);
344
345/** callback for the output when the thread stops and wants to finish up */
346void dtio_stop_ev_cb(int fd, short bits, void* arg);
347
348/** callback for unbound-dnstap-socket */
349void dtio_tap_callback(int fd, short bits, void* arg);
350
351/** callback for unbound-dnstap-socket */
352void dtio_mainfdcallback(int fd, short bits, void* arg);
353
354#endif /* DTSTREAM_H */
355