Deleted Added
full compact
dtstream.h (366095) dtstream.h (368129)
1/*
2 * dnstap/dtstream.h - Frame Streams thread for unbound DNSTAP
3 *
4 * Copyright (c) 2020, NLnet Labs. All rights reserved.
5 *
6 * This software is open source.
7 *
8 * Redistribution and use in source and binary forms, with or without

--- 35 unchanged lines hidden (view full) ---

44#ifndef DTSTREAM_H
45#define DTSTREAM_H
46
47#include "util/locks.h"
48struct dt_msg_entry;
49struct dt_io_list_item;
50struct dt_io_thread;
51struct config_file;
1/*
2 * dnstap/dtstream.h - Frame Streams thread for unbound DNSTAP
3 *
4 * Copyright (c) 2020, NLnet Labs. All rights reserved.
5 *
6 * This software is open source.
7 *
8 * Redistribution and use in source and binary forms, with or without

--- 35 unchanged lines hidden (view full) ---

44#ifndef DTSTREAM_H
45#define DTSTREAM_H
46
47#include "util/locks.h"
48struct dt_msg_entry;
49struct dt_io_list_item;
50struct dt_io_thread;
51struct config_file;
52struct comm_base;
52
53/**
54 * A message buffer with dnstap messages queued up. It is per-worker.
55 * It has locks to synchronize. If the buffer is full, a new message
56 * cannot be added and is discarded. A thread reads the messages and sends
57 * them.
58 */
59struct dt_msg_queue {
60 /** lock of the buffer structure. Hold this lock to add or remove
61 * entries to the buffer. Release it so that other threads can also
62 * put messages to log, or a message can be taken out to send away
63 * by the writer thread.
64 */
65 lock_basic_type lock;
66 /** the maximum size of the buffer, in bytes */
67 size_t maxsize;
68 /** current size of the buffer, in bytes. data bytes of messages.
69 * If a new message make it more than maxsize, the buffer is full */
70 size_t cursize;
53
54/**
55 * A message buffer with dnstap messages queued up. It is per-worker.
56 * It has locks to synchronize. If the buffer is full, a new message
57 * cannot be added and is discarded. A thread reads the messages and sends
58 * them.
59 */
60struct dt_msg_queue {
61 /** lock of the buffer structure. Hold this lock to add or remove
62 * entries to the buffer. Release it so that other threads can also
63 * put messages to log, or a message can be taken out to send away
64 * by the writer thread.
65 */
66 lock_basic_type lock;
67 /** the maximum size of the buffer, in bytes */
68 size_t maxsize;
69 /** current size of the buffer, in bytes. data bytes of messages.
70 * If a new message make it more than maxsize, the buffer is full */
71 size_t cursize;
72 /** number of messages in the queue */
73 int msgcount;
71 /** list of messages. The messages are added to the back and taken
72 * out from the front. */
73 struct dt_msg_entry* first, *last;
74 /** reference to the io thread to wakeup */
75 struct dt_io_thread* dtio;
74 /** list of messages. The messages are added to the back and taken
75 * out from the front. */
76 struct dt_msg_entry* first, *last;
77 /** reference to the io thread to wakeup */
78 struct dt_io_thread* dtio;
79 /** the wakeup timer for dtio, on worker event base */
80 struct comm_timer* wakeup_timer;
76};
77
78/**
79 * An entry in the dt_msg_queue. contains one DNSTAP message.
80 * It is malloced.
81 */
82struct dt_msg_entry {
83 /** next in the list. */

--- 77 unchanged lines hidden (view full) ---

161 /** length of the current message */
162 size_t cur_msg_len;
163 /** number of bytes written for the current message */
164 size_t cur_msg_done;
165 /** number of bytes of the length that have been written,
166 * for the current message length that precedes the frame */
167 size_t cur_msg_len_done;
168
81};
82
83/**
84 * An entry in the dt_msg_queue. contains one DNSTAP message.
85 * It is malloced.
86 */
87struct dt_msg_entry {
88 /** next in the list. */

--- 77 unchanged lines hidden (view full) ---

166 /** length of the current message */
167 size_t cur_msg_len;
168 /** number of bytes written for the current message */
169 size_t cur_msg_done;
170 /** number of bytes of the length that have been written,
171 * for the current message length that precedes the frame */
172 size_t cur_msg_len_done;
173
174 /** lock on wakeup_timer_enabled */
175 lock_basic_type wakeup_timer_lock;
176 /** if wakeup timer is enabled in some thread */
177 int wakeup_timer_enabled;
169 /** command pipe that stops the pipe if closed. Used to quit
170 * the program. [0] is read, [1] is written to. */
171 int commandpipe[2];
172 /** the event to listen to the commandpipe */
173 void* command_event;
174 /** the io thread wants to exit */
175 int want_to_exit;
176

--- 51 unchanged lines hidden (view full) ---

228 /** next in the list of buffers to inspect */
229 struct dt_io_list_item* next;
230 /** buffer of this worker */
231 struct dt_msg_queue* queue;
232};
233
234/**
235 * Create new (empty) worker message queue. Limit set to default on max.
178 /** command pipe that stops the pipe if closed. Used to quit
179 * the program. [0] is read, [1] is written to. */
180 int commandpipe[2];
181 /** the event to listen to the commandpipe */
182 void* command_event;
183 /** the io thread wants to exit */
184 int want_to_exit;
185

--- 51 unchanged lines hidden (view full) ---

237 /** next in the list of buffers to inspect */
238 struct dt_io_list_item* next;
239 /** buffer of this worker */
240 struct dt_msg_queue* queue;
241};
242
243/**
244 * Create new (empty) worker message queue. Limit set to default on max.
245 * @param base: event base for wakeup timer.
236 * @return NULL on malloc failure or a new queue (not locked).
237 */
246 * @return NULL on malloc failure or a new queue (not locked).
247 */
238struct dt_msg_queue* dt_msg_queue_create(void);
248struct dt_msg_queue* dt_msg_queue_create(struct comm_base* base);
239
240/**
241 * Delete a worker message queue. It has to be unlinked from access,
242 * so it can be deleted without lock worries. The queue is emptied (deleted).
243 * @param mq: message queue.
244 */
245void dt_msg_queue_delete(struct dt_msg_queue* mq);
246

--- 6 unchanged lines hidden (view full) ---

253 * The buffer must have been malloced by caller. It is linked in
254 * the queue, and is free()d after use. If the routine fails
255 * the buffer is freed as well (and nothing happens, the item
256 * could not be logged).
257 * @param len: length of buffer.
258 */
259void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len);
260
249
250/**
251 * Delete a worker message queue. It has to be unlinked from access,
252 * so it can be deleted without lock worries. The queue is emptied (deleted).
253 * @param mq: message queue.
254 */
255void dt_msg_queue_delete(struct dt_msg_queue* mq);
256

--- 6 unchanged lines hidden (view full) ---

263 * The buffer must have been malloced by caller. It is linked in
264 * the queue, and is free()d after use. If the routine fails
265 * the buffer is freed as well (and nothing happens, the item
266 * could not be logged).
267 * @param len: length of buffer.
268 */
269void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len);
270
271/** timer callback to wakeup dtio thread to process messages */
272void mq_wakeup_cb(void* arg);
273
261/**
262 * Create IO thread.
263 * @return new io thread object. not yet started. or NULL malloc failure.
264 */
265struct dt_io_thread* dt_io_thread_create(void);
266
267/**
268 * Delete the IO thread structure.

--- 73 unchanged lines hidden ---
274/**
275 * Create IO thread.
276 * @return new io thread object. not yet started. or NULL malloc failure.
277 */
278struct dt_io_thread* dt_io_thread_create(void);
279
280/**
281 * Delete the IO thread structure.

--- 73 unchanged lines hidden ---