dtstream.h (366095) | dtstream.h (368129) |
---|---|
1/* 2 * dnstap/dtstream.h - Frame Streams thread for unbound DNSTAP 3 * 4 * Copyright (c) 2020, NLnet Labs. All rights reserved. 5 * 6 * This software is open source. 7 * 8 * Redistribution and use in source and binary forms, with or without --- 35 unchanged lines hidden (view full) --- 44#ifndef DTSTREAM_H 45#define DTSTREAM_H 46 47#include "util/locks.h" 48struct dt_msg_entry; 49struct dt_io_list_item; 50struct dt_io_thread; 51struct config_file; | 1/* 2 * dnstap/dtstream.h - Frame Streams thread for unbound DNSTAP 3 * 4 * Copyright (c) 2020, NLnet Labs. All rights reserved. 5 * 6 * This software is open source. 7 * 8 * Redistribution and use in source and binary forms, with or without --- 35 unchanged lines hidden (view full) --- 44#ifndef DTSTREAM_H 45#define DTSTREAM_H 46 47#include "util/locks.h" 48struct dt_msg_entry; 49struct dt_io_list_item; 50struct dt_io_thread; 51struct config_file; |
52struct comm_base; |
|
52 53/** 54 * A message buffer with dnstap messages queued up. It is per-worker. 55 * It has locks to synchronize. If the buffer is full, a new message 56 * cannot be added and is discarded. A thread reads the messages and sends 57 * them. 58 */ 59struct dt_msg_queue { 60 /** lock of the buffer structure. Hold this lock to add or remove 61 * entries to the buffer. Release it so that other threads can also 62 * put messages to log, or a message can be taken out to send away 63 * by the writer thread. 64 */ 65 lock_basic_type lock; 66 /** the maximum size of the buffer, in bytes */ 67 size_t maxsize; 68 /** current size of the buffer, in bytes. data bytes of messages. 69 * If a new message make it more than maxsize, the buffer is full */ 70 size_t cursize; | 53 54/** 55 * A message buffer with dnstap messages queued up. It is per-worker. 56 * It has locks to synchronize. If the buffer is full, a new message 57 * cannot be added and is discarded. A thread reads the messages and sends 58 * them. 59 */ 60struct dt_msg_queue { 61 /** lock of the buffer structure. Hold this lock to add or remove 62 * entries to the buffer. Release it so that other threads can also 63 * put messages to log, or a message can be taken out to send away 64 * by the writer thread. 65 */ 66 lock_basic_type lock; 67 /** the maximum size of the buffer, in bytes */ 68 size_t maxsize; 69 /** current size of the buffer, in bytes. data bytes of messages. 70 * If a new message make it more than maxsize, the buffer is full */ 71 size_t cursize; |
72 /** number of messages in the queue */ 73 int msgcount; |
|
71 /** list of messages. The messages are added to the back and taken 72 * out from the front. */ 73 struct dt_msg_entry* first, *last; 74 /** reference to the io thread to wakeup */ 75 struct dt_io_thread* dtio; | 74 /** list of messages. The messages are added to the back and taken 75 * out from the front. */ 76 struct dt_msg_entry* first, *last; 77 /** reference to the io thread to wakeup */ 78 struct dt_io_thread* dtio; |
79 /** the wakeup timer for dtio, on worker event base */ 80 struct comm_timer* wakeup_timer; |
|
76}; 77 78/** 79 * An entry in the dt_msg_queue. contains one DNSTAP message. 80 * It is malloced. 81 */ 82struct dt_msg_entry { 83 /** next in the list. */ --- 77 unchanged lines hidden (view full) --- 161 /** length of the current message */ 162 size_t cur_msg_len; 163 /** number of bytes written for the current message */ 164 size_t cur_msg_done; 165 /** number of bytes of the length that have been written, 166 * for the current message length that precedes the frame */ 167 size_t cur_msg_len_done; 168 | 81}; 82 83/** 84 * An entry in the dt_msg_queue. contains one DNSTAP message. 85 * It is malloced. 86 */ 87struct dt_msg_entry { 88 /** next in the list. */ --- 77 unchanged lines hidden (view full) --- 166 /** length of the current message */ 167 size_t cur_msg_len; 168 /** number of bytes written for the current message */ 169 size_t cur_msg_done; 170 /** number of bytes of the length that have been written, 171 * for the current message length that precedes the frame */ 172 size_t cur_msg_len_done; 173 |
174 /** lock on wakeup_timer_enabled */ 175 lock_basic_type wakeup_timer_lock; 176 /** if wakeup timer is enabled in some thread */ 177 int wakeup_timer_enabled; |
|
169 /** command pipe that stops the pipe if closed. Used to quit 170 * the program. [0] is read, [1] is written to. */ 171 int commandpipe[2]; 172 /** the event to listen to the commandpipe */ 173 void* command_event; 174 /** the io thread wants to exit */ 175 int want_to_exit; 176 --- 51 unchanged lines hidden (view full) --- 228 /** next in the list of buffers to inspect */ 229 struct dt_io_list_item* next; 230 /** buffer of this worker */ 231 struct dt_msg_queue* queue; 232}; 233 234/** 235 * Create new (empty) worker message queue. Limit set to default on max. | 178 /** command pipe that stops the pipe if closed. Used to quit 179 * the program. [0] is read, [1] is written to. */ 180 int commandpipe[2]; 181 /** the event to listen to the commandpipe */ 182 void* command_event; 183 /** the io thread wants to exit */ 184 int want_to_exit; 185 --- 51 unchanged lines hidden (view full) --- 237 /** next in the list of buffers to inspect */ 238 struct dt_io_list_item* next; 239 /** buffer of this worker */ 240 struct dt_msg_queue* queue; 241}; 242 243/** 244 * Create new (empty) worker message queue. Limit set to default on max. |
245 * @param base: event base for wakeup timer. |
|
236 * @return NULL on malloc failure or a new queue (not locked). 237 */ | 246 * @return NULL on malloc failure or a new queue (not locked). 247 */ |
238struct dt_msg_queue* dt_msg_queue_create(void); | 248struct dt_msg_queue* dt_msg_queue_create(struct comm_base* base); |
239 240/** 241 * Delete a worker message queue. It has to be unlinked from access, 242 * so it can be deleted without lock worries. The queue is emptied (deleted). 243 * @param mq: message queue. 244 */ 245void dt_msg_queue_delete(struct dt_msg_queue* mq); 246 --- 6 unchanged lines hidden (view full) --- 253 * The buffer must have been malloced by caller. It is linked in 254 * the queue, and is free()d after use. If the routine fails 255 * the buffer is freed as well (and nothing happens, the item 256 * could not be logged). 257 * @param len: length of buffer. 258 */ 259void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len); 260 | 249 250/** 251 * Delete a worker message queue. It has to be unlinked from access, 252 * so it can be deleted without lock worries. The queue is emptied (deleted). 253 * @param mq: message queue. 254 */ 255void dt_msg_queue_delete(struct dt_msg_queue* mq); 256 --- 6 unchanged lines hidden (view full) --- 263 * The buffer must have been malloced by caller. It is linked in 264 * the queue, and is free()d after use. If the routine fails 265 * the buffer is freed as well (and nothing happens, the item 266 * could not be logged). 267 * @param len: length of buffer. 268 */ 269void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len); 270 |
271/** timer callback to wakeup dtio thread to process messages */ 272void mq_wakeup_cb(void* arg); 273 |
|
261/** 262 * Create IO thread. 263 * @return new io thread object. not yet started. or NULL malloc failure. 264 */ 265struct dt_io_thread* dt_io_thread_create(void); 266 267/** 268 * Delete the IO thread structure. --- 73 unchanged lines hidden --- | 274/** 275 * Create IO thread. 276 * @return new io thread object. not yet started. or NULL malloc failure. 277 */ 278struct dt_io_thread* dt_io_thread_create(void); 279 280/** 281 * Delete the IO thread structure. --- 73 unchanged lines hidden --- |