sender.c revision 243730
1/*-
2 * Copyright (c) 2012 The FreeBSD Foundation
3 * All rights reserved.
4 *
5 * This software was developed by Pawel Jakub Dawidek under sponsorship from
6 * the FreeBSD Foundation.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 * 1. Redistributions of source code must retain the above copyright
12 *    notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 *    notice, this list of conditions and the following disclaimer in the
15 *    documentation and/or other materials provided with the distribution.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27 * SUCH DAMAGE.
28 *
29 * $P4: //depot/projects/trustedbsd/openbsm/bin/auditdistd/sender.c#1 $
30 */
31
32#include "config.h"
33
34#include <sys/param.h>
35#if defined(HAVE_SYS_ENDIAN_H) && defined(HAVE_BSWAP)
36#include <sys/endian.h>
37#else /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
38#ifdef HAVE_MACHINE_ENDIAN_H
39#include <machine/endian.h>
40#else /* !HAVE_MACHINE_ENDIAN_H */
41#ifdef HAVE_ENDIAN_H
42#include <endian.h>
43#else /* !HAVE_ENDIAN_H */
44#error "No supported endian.h"
45#endif /* !HAVE_ENDIAN_H */
46#endif /* !HAVE_MACHINE_ENDIAN_H */
47#include <compat/endian.h>
48#endif /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
49#include <sys/queue.h>
50#include <sys/stat.h>
51#include <sys/wait.h>
52
53#include <stdio.h>
54#include <stdlib.h>
55#include <unistd.h>
56
57#include <ctype.h>
58#include <dirent.h>
59#include <err.h>
60#include <errno.h>
61#include <fcntl.h>
62#ifdef HAVE_LIBUTIL_H
63#include <libutil.h>
64#endif
65#include <signal.h>
66#include <string.h>
67#include <strings.h>
68
69#include <openssl/hmac.h>
70
71#ifndef HAVE_SIGTIMEDWAIT
72#include "sigtimedwait.h"
73#endif
74
75#include <pjdlog.h>
76
77#include "auditdistd.h"
78#include "proto.h"
79#include "sandbox.h"
80#include "subr.h"
81#include "synch.h"
82#include "trail.h"
83
84static struct adist_config *adcfg;
85static struct adist_host *adhost;
86
87static pthread_rwlock_t adist_remote_lock;
88static pthread_mutex_t adist_remote_mtx;
89static pthread_cond_t adist_remote_cond;
90static struct trail *adist_trail;
91
92static TAILQ_HEAD(, adreq) adist_free_list;
93static pthread_mutex_t adist_free_list_lock;
94static pthread_cond_t adist_free_list_cond;
95static TAILQ_HEAD(, adreq) adist_send_list;
96static pthread_mutex_t adist_send_list_lock;
97static pthread_cond_t adist_send_list_cond;
98static TAILQ_HEAD(, adreq) adist_recv_list;
99static pthread_mutex_t adist_recv_list_lock;
100static pthread_cond_t adist_recv_list_cond;
101
102static void
103init_environment(void)
104{
105	struct adreq *adreq;
106	unsigned int ii;
107
108	rw_init(&adist_remote_lock);
109	mtx_init(&adist_remote_mtx);
110	cv_init(&adist_remote_cond);
111	TAILQ_INIT(&adist_free_list);
112	mtx_init(&adist_free_list_lock);
113	cv_init(&adist_free_list_cond);
114	TAILQ_INIT(&adist_send_list);
115	mtx_init(&adist_send_list_lock);
116	cv_init(&adist_send_list_cond);
117	TAILQ_INIT(&adist_recv_list);
118	mtx_init(&adist_recv_list_lock);
119	cv_init(&adist_recv_list_cond);
120
121	for (ii = 0; ii < ADIST_QUEUE_SIZE; ii++) {
122		adreq = malloc(sizeof(*adreq) + ADIST_BUF_SIZE);
123		if (adreq == NULL) {
124			pjdlog_exitx(EX_TEMPFAIL,
125			    "Unable to allocate %zu bytes of memory for adreq object.",
126			    sizeof(*adreq) + ADIST_BUF_SIZE);
127		}
128		adreq->adr_byteorder = ADIST_BYTEORDER;
129		adreq->adr_cmd = ADIST_CMD_UNDEFINED;
130		adreq->adr_seq = 0;
131		adreq->adr_datasize = 0;
132		TAILQ_INSERT_TAIL(&adist_free_list, adreq, adr_next);
133	}
134}
135
136static int
137sender_connect(void)
138{
139	unsigned char rnd[32], hash[32], resp[32];
140	struct proto_conn *conn;
141	char welcome[8];
142	int16_t val;
143
144	val = 1;
145	if (proto_send(adhost->adh_conn, &val, sizeof(val)) < 0) {
146		pjdlog_exit(EX_TEMPFAIL,
147		    "Unable to send connection request to parent");
148	}
149	if (proto_recv(adhost->adh_conn, &val, sizeof(val)) < 0) {
150		pjdlog_exit(EX_TEMPFAIL,
151		    "Unable to receive reply to connection request from parent");
152	}
153	if (val != 0) {
154		errno = val;
155		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
156		    adhost->adh_remoteaddr);
157		return (-1);
158	}
159	if (proto_connection_recv(adhost->adh_conn, true, &conn) < 0) {
160		pjdlog_exit(EX_TEMPFAIL,
161		    "Unable to receive connection from parent");
162	}
163	if (proto_connect_wait(conn, adcfg->adc_timeout) < 0) {
164		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
165		    adhost->adh_remoteaddr);
166		proto_close(conn);
167		return (-1);
168	}
169	pjdlog_debug(1, "Connected to %s.", adhost->adh_remoteaddr);
170	/* Error in setting timeout is not critical, but why should it fail? */
171	if (proto_timeout(conn, adcfg->adc_timeout) < 0)
172		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
173	else
174		pjdlog_debug(1, "Timeout set to %d.", adcfg->adc_timeout);
175
176	/* Exchange welcome message, which includes version number. */
177	(void)snprintf(welcome, sizeof(welcome), "ADIST%02d", ADIST_VERSION);
178	if (proto_send(conn, welcome, sizeof(welcome)) < 0) {
179		pjdlog_errno(LOG_WARNING,
180		    "Unable to send welcome message to %s",
181		    adhost->adh_remoteaddr);
182		proto_close(conn);
183		return (-1);
184	}
185	pjdlog_debug(1, "Welcome message sent (%s).", welcome);
186	bzero(welcome, sizeof(welcome));
187	if (proto_recv(conn, welcome, sizeof(welcome)) < 0) {
188		pjdlog_errno(LOG_WARNING,
189		    "Unable to receive welcome message from %s",
190		    adhost->adh_remoteaddr);
191		proto_close(conn);
192		return (-1);
193	}
194	if (strncmp(welcome, "ADIST", 5) != 0 || !isdigit(welcome[5]) ||
195	    !isdigit(welcome[6]) || welcome[7] != '\0') {
196		pjdlog_warning("Invalid welcome message from %s.",
197		    adhost->adh_remoteaddr);
198		proto_close(conn);
199		return (-1);
200	}
201	pjdlog_debug(1, "Welcome message received (%s).", welcome);
202	/*
203	 * Receiver can only reply with version number lower or equal to
204	 * the one we sent.
205	 */
206	adhost->adh_version = atoi(welcome + 5);
207	if (adhost->adh_version > ADIST_VERSION) {
208		pjdlog_warning("Invalid version number from %s (%d received, up to %d supported).",
209		    adhost->adh_remoteaddr, adhost->adh_version, ADIST_VERSION);
210		proto_close(conn);
211		return (-1);
212	}
213
214	pjdlog_debug(1, "Version %d negotiated with %s.", adhost->adh_version,
215	    adhost->adh_remoteaddr);
216
217	if (proto_send(conn, adcfg->adc_name, sizeof(adcfg->adc_name)) == -1) {
218		pjdlog_errno(LOG_WARNING, "Unable to send name to %s",
219		    adhost->adh_remoteaddr);
220		proto_close(conn);
221		return (-1);
222	}
223	pjdlog_debug(1, "Name (%s) sent.", adcfg->adc_name);
224
225	if (proto_recv(conn, rnd, sizeof(rnd)) == -1) {
226		pjdlog_errno(LOG_WARNING, "Unable to receive challenge from %s",
227		    adhost->adh_remoteaddr);
228		proto_close(conn);
229		return (-1);
230	}
231	pjdlog_debug(1, "Challenge received.");
232
233	if (HMAC(EVP_sha256(), adhost->adh_password,
234	    (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
235	    NULL) == NULL) {
236		pjdlog_warning("Unable to generate response.");
237		proto_close(conn);
238		return (-1);
239	}
240	pjdlog_debug(1, "Response generated.");
241
242	if (proto_send(conn, hash, sizeof(hash)) == -1) {
243		pjdlog_errno(LOG_WARNING, "Unable to send response to %s",
244		    adhost->adh_remoteaddr);
245		proto_close(conn);
246		return (-1);
247	}
248	pjdlog_debug(1, "Response sent.");
249
250	if (adist_random(rnd, sizeof(rnd)) == -1) {
251		pjdlog_warning("Unable to generate challenge.");
252		proto_close(conn);
253		return (-1);
254	}
255	pjdlog_debug(1, "Challenge generated.");
256
257	if (proto_send(conn, rnd, sizeof(rnd)) == -1) {
258		pjdlog_errno(LOG_WARNING, "Unable to send challenge to %s",
259		    adhost->adh_remoteaddr);
260		proto_close(conn);
261		return (-1);
262	}
263	pjdlog_debug(1, "Challenge sent.");
264
265	if (proto_recv(conn, resp, sizeof(resp)) == -1) {
266		pjdlog_errno(LOG_WARNING, "Unable to receive response from %s",
267		    adhost->adh_remoteaddr);
268		proto_close(conn);
269		return (-1);
270	}
271	pjdlog_debug(1, "Response received.");
272
273	if (HMAC(EVP_sha256(), adhost->adh_password,
274	    (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
275	    NULL) == NULL) {
276		pjdlog_warning("Unable to generate hash.");
277		proto_close(conn);
278		return (-1);
279	}
280	pjdlog_debug(1, "Hash generated.");
281
282	if (memcmp(resp, hash, sizeof(hash)) != 0) {
283		pjdlog_warning("Invalid response from %s (wrong password?).",
284		    adhost->adh_remoteaddr);
285		proto_close(conn);
286		return (-1);
287	}
288	pjdlog_info("Receiver authenticated.");
289
290	if (proto_recv(conn, &adhost->adh_trail_offset,
291	    sizeof(adhost->adh_trail_offset)) == -1) {
292		pjdlog_errno(LOG_WARNING,
293		    "Unable to receive size of the most recent trail file from %s",
294		    adhost->adh_remoteaddr);
295		proto_close(conn);
296		return (-1);
297	}
298	adhost->adh_trail_offset = le64toh(adhost->adh_trail_offset);
299	if (proto_recv(conn, &adhost->adh_trail_name,
300	    sizeof(adhost->adh_trail_name)) == -1) {
301		pjdlog_errno(LOG_WARNING,
302		    "Unable to receive name of the most recent trail file from %s",
303		    adhost->adh_remoteaddr);
304		proto_close(conn);
305		return (-1);
306	}
307	pjdlog_debug(1, "Trail name (%s) and offset (%ju) received.",
308	    adhost->adh_trail_name, (uintmax_t)adhost->adh_trail_offset);
309
310	rw_wlock(&adist_remote_lock);
311	mtx_lock(&adist_remote_mtx);
312	PJDLOG_ASSERT(adhost->adh_remote == NULL);
313	PJDLOG_ASSERT(conn != NULL);
314	adhost->adh_remote = conn;
315	mtx_unlock(&adist_remote_mtx);
316	rw_unlock(&adist_remote_lock);
317	cv_signal(&adist_remote_cond);
318
319	return (0);
320}
321
322static void
323sender_disconnect(void)
324{
325
326	rw_wlock(&adist_remote_lock);
327	/*
328	 * Check for a race between dropping rlock and acquiring wlock -
329	 * another thread can close connection in-between.
330	 */
331	if (adhost->adh_remote == NULL) {
332		rw_unlock(&adist_remote_lock);
333		return;
334	}
335	pjdlog_debug(2, "Closing connection to %s.", adhost->adh_remoteaddr);
336	proto_close(adhost->adh_remote);
337	mtx_lock(&adist_remote_mtx);
338	adhost->adh_remote = NULL;
339	adhost->adh_reset = true;
340	adhost->adh_trail_name[0] = '\0';
341	adhost->adh_trail_offset = 0;
342	mtx_unlock(&adist_remote_mtx);
343	rw_unlock(&adist_remote_lock);
344
345	pjdlog_warning("Disconnected from %s.", adhost->adh_remoteaddr);
346
347	/* Move all in-flight requests back onto free list. */
348	mtx_lock(&adist_free_list_lock);
349	mtx_lock(&adist_send_list_lock);
350	TAILQ_CONCAT(&adist_free_list, &adist_send_list, adr_next);
351	mtx_unlock(&adist_send_list_lock);
352	mtx_lock(&adist_recv_list_lock);
353	TAILQ_CONCAT(&adist_free_list, &adist_recv_list, adr_next);
354	mtx_unlock(&adist_recv_list_lock);
355	mtx_unlock(&adist_free_list_lock);
356}
357
358static void
359adreq_fill(struct adreq *adreq, uint8_t cmd, const unsigned char *data,
360    size_t size)
361{
362	static uint64_t seq = 1;
363
364	PJDLOG_ASSERT(size <= ADIST_BUF_SIZE);
365
366	switch (cmd) {
367	case ADIST_CMD_OPEN:
368	case ADIST_CMD_CLOSE:
369		PJDLOG_ASSERT(data != NULL && size == 0);
370		size = strlen(data) + 1;
371		break;
372	case ADIST_CMD_APPEND:
373		PJDLOG_ASSERT(data != NULL && size > 0);
374		break;
375	case ADIST_CMD_KEEPALIVE:
376	case ADIST_CMD_ERROR:
377		PJDLOG_ASSERT(data == NULL && size == 0);
378		break;
379	default:
380		PJDLOG_ABORT("Invalid command (%hhu).", cmd);
381	}
382
383	adreq->adr_cmd = cmd;
384	adreq->adr_seq = seq++;
385	adreq->adr_datasize = size;
386	/* Don't copy if data is already in out buffer. */
387	if (data != NULL && data != adreq->adr_data)
388		bcopy(data, adreq->adr_data, size);
389}
390
391static bool
392read_thread_wait(void)
393{
394	bool newfile = false;
395
396	mtx_lock(&adist_remote_mtx);
397	if (adhost->adh_reset) {
398		adhost->adh_reset = false;
399		if (trail_filefd(adist_trail) != -1)
400			trail_close(adist_trail);
401		trail_reset(adist_trail);
402		while (adhost->adh_remote == NULL)
403			cv_wait(&adist_remote_cond, &adist_remote_mtx);
404		trail_start(adist_trail, adhost->adh_trail_name,
405		    adhost->adh_trail_offset);
406		newfile = true;
407	}
408	mtx_unlock(&adist_remote_mtx);
409	while (trail_filefd(adist_trail) == -1) {
410		newfile = true;
411		wait_for_dir();
412		if (trail_filefd(adist_trail) == -1)
413			trail_next(adist_trail);
414	}
415	if (newfile) {
416		pjdlog_debug(1, "Trail file \"%s/%s\" opened.",
417		    adhost->adh_directory,
418		    trail_filename(adist_trail));
419		(void)wait_for_file_init(trail_filefd(adist_trail));
420	}
421	return (newfile);
422}
423
424static void *
425read_thread(void *arg __unused)
426{
427	struct adreq *adreq;
428	ssize_t done;
429	bool newfile;
430
431	pjdlog_debug(1, "%s started.", __func__);
432
433	for (;;) {
434		newfile = read_thread_wait();
435		QUEUE_TAKE(adreq, &adist_free_list, 0);
436		if (newfile) {
437			adreq_fill(adreq, ADIST_CMD_OPEN,
438			    trail_filename(adist_trail), 0);
439			newfile = false;
440			goto move;
441		}
442
443		done = read(trail_filefd(adist_trail), adreq->adr_data,
444		    ADIST_BUF_SIZE);
445		if (done == -1) {
446			off_t offset;
447			int error;
448
449			error = errno;
450			offset = lseek(trail_filefd(adist_trail), 0, SEEK_CUR);
451			errno = error;
452			pjdlog_errno(LOG_ERR,
453			    "Error while reading \"%s/%s\" at offset %jd",
454			    adhost->adh_directory, trail_filename(adist_trail),
455			    offset);
456			trail_close(adist_trail);
457			adreq_fill(adreq, ADIST_CMD_ERROR, NULL, 0);
458			goto move;
459		} else if (done == 0) {
460			/* End of file. */
461			pjdlog_debug(3, "End of \"%s/%s\".",
462			    adhost->adh_directory, trail_filename(adist_trail));
463			if (!trail_switch(adist_trail)) {
464				/* More audit records can arrive. */
465				mtx_lock(&adist_free_list_lock);
466				TAILQ_INSERT_TAIL(&adist_free_list, adreq,
467				    adr_next);
468				mtx_unlock(&adist_free_list_lock);
469				wait_for_file();
470				continue;
471			}
472			adreq_fill(adreq, ADIST_CMD_CLOSE,
473			    trail_filename(adist_trail), 0);
474			trail_close(adist_trail);
475			goto move;
476		}
477
478		adreq_fill(adreq, ADIST_CMD_APPEND, adreq->adr_data, done);
479move:
480		pjdlog_debug(3,
481		    "read thread: Moving request %p to the send queue (%hhu).",
482		    adreq, adreq->adr_cmd);
483		QUEUE_INSERT(adreq, &adist_send_list);
484	}
485	/* NOTREACHED */
486	return (NULL);
487}
488
489static void
490keepalive_send(void)
491{
492	struct adreq *adreq;
493
494	rw_rlock(&adist_remote_lock);
495	if (adhost->adh_remote == NULL) {
496		rw_unlock(&adist_remote_lock);
497		return;
498	}
499	rw_unlock(&adist_remote_lock);
500
501	mtx_lock(&adist_free_list_lock);
502	adreq = TAILQ_FIRST(&adist_free_list);
503	if (adreq != NULL)
504		TAILQ_REMOVE(&adist_free_list, adreq, adr_next);
505	mtx_unlock(&adist_free_list_lock);
506	if (adreq == NULL)
507		return;
508
509	adreq_fill(adreq, ADIST_CMD_KEEPALIVE, NULL, 0);
510
511	QUEUE_INSERT(adreq, &adist_send_list);
512
513	pjdlog_debug(3, "keepalive_send: Request sent.");
514}
515
516/*
517 * Thread sends request to secondary node.
518 */
519static void *
520send_thread(void *arg __unused)
521{
522	time_t lastcheck, now;
523	struct adreq *adreq;
524
525	pjdlog_debug(1, "%s started.", __func__);
526
527	lastcheck = time(NULL);
528
529	for (;;) {
530		pjdlog_debug(3, "send thread: Taking request.");
531		for (;;) {
532			QUEUE_TAKE(adreq, &adist_send_list, ADIST_KEEPALIVE);
533			if (adreq != NULL)
534				break;
535			now = time(NULL);
536			if (lastcheck + ADIST_KEEPALIVE <= now) {
537				keepalive_send();
538				lastcheck = now;
539			}
540		}
541		PJDLOG_ASSERT(adreq != NULL);
542		pjdlog_debug(3, "send thread: (%p) Got request %hhu.", adreq,
543		    adreq->adr_cmd);
544		/*
545		 * Protect connection from disappearing.
546		 */
547		rw_rlock(&adist_remote_lock);
548		/*
549		 * Move the request to the recv queue first to avoid race
550		 * where the recv thread receives the reply before we move
551		 * the request to the recv queue.
552		 */
553		QUEUE_INSERT(adreq, &adist_recv_list);
554		if (adhost->adh_remote == NULL ||
555		    proto_send(adhost->adh_remote, &adreq->adr_packet,
556		    ADPKT_SIZE(adreq)) == -1) {
557			rw_unlock(&adist_remote_lock);
558			pjdlog_debug(1,
559			    "send thread: (%p) Unable to send request.", adreq);
560			if (adhost->adh_remote != NULL)
561				sender_disconnect();
562			continue;
563		} else {
564			pjdlog_debug(3, "Request %p sent successfully.", adreq);
565			adreq_log(LOG_DEBUG, 2, -1, adreq,
566			    "send: (%p) Request sent: ", adreq);
567			rw_unlock(&adist_remote_lock);
568		}
569	}
570	/* NOTREACHED */
571	return (NULL);
572}
573
574static void
575adrep_decode_header(struct adrep *adrep)
576{
577
578	/* Byte-swap only is the receiver is using different byte order. */
579	if (adrep->adrp_byteorder != ADIST_BYTEORDER) {
580		adrep->adrp_byteorder = ADIST_BYTEORDER;
581		adrep->adrp_seq = bswap64(adrep->adrp_seq);
582		adrep->adrp_error = bswap16(adrep->adrp_error);
583	}
584}
585
586/*
587 * Thread receives answer from secondary node and passes it to ggate_send
588 * thread.
589 */
590static void *
591recv_thread(void *arg __unused)
592{
593	struct adrep adrep;
594	struct adreq *adreq;
595
596	pjdlog_debug(1, "%s started.", __func__);
597
598	for (;;) {
599		/* Wait until there is anything to receive. */
600		QUEUE_WAIT(&adist_recv_list);
601		pjdlog_debug(3, "recv thread: Got something.");
602		rw_rlock(&adist_remote_lock);
603		if (adhost->adh_remote == NULL) {
604			/*
605			 * Connection is dead.
606			 * XXX: We shouldn't be here.
607			 */
608			rw_unlock(&adist_remote_lock);
609			continue;
610		}
611		if (proto_recv(adhost->adh_remote, &adrep,
612		    sizeof(adrep)) == -1) {
613			rw_unlock(&adist_remote_lock);
614			pjdlog_errno(LOG_ERR, "Unable to receive reply");
615			sender_disconnect();
616			continue;
617		}
618		rw_unlock(&adist_remote_lock);
619		adrep_decode_header(&adrep);
620		/*
621		 * Find the request that was just confirmed.
622		 */
623		mtx_lock(&adist_recv_list_lock);
624		TAILQ_FOREACH(adreq, &adist_recv_list, adr_next) {
625			if (adreq->adr_seq == adrep.adrp_seq) {
626				TAILQ_REMOVE(&adist_recv_list, adreq,
627				    adr_next);
628				break;
629			}
630		}
631		if (adreq == NULL) {
632			/*
633			 * If we disconnected in the meantime, just continue.
634			 * On disconnect sender_disconnect() clears the queue,
635			 * we can use that.
636			 */
637			if (TAILQ_EMPTY(&adist_recv_list)) {
638				rw_unlock(&adist_remote_lock);
639				continue;
640			}
641			mtx_unlock(&adist_recv_list_lock);
642			pjdlog_error("Found no request matching received 'seq' field (%ju).",
643			    (uintmax_t)adrep.adrp_seq);
644			sender_disconnect();
645			continue;
646		}
647		mtx_unlock(&adist_recv_list_lock);
648		adreq_log(LOG_DEBUG, 2, -1, adreq,
649		    "recv thread: (%p) Request confirmed: ", adreq);
650		pjdlog_debug(3, "recv thread: (%p) Got request %hhu.", adreq,
651		    adreq->adr_cmd);
652		if (adrep.adrp_error != 0) {
653			pjdlog_error("Receiver returned error (%s), disconnecting.",
654			    adist_errstr((int)adrep.adrp_error));
655			sender_disconnect();
656			continue;
657		}
658		if (adreq->adr_cmd == ADIST_CMD_CLOSE)
659			trail_unlink(adist_trail, adreq->adr_data);
660		pjdlog_debug(3, "Request received successfully.");
661		QUEUE_INSERT(adreq, &adist_free_list);
662	}
663	/* NOTREACHED */
664	return (NULL);
665}
666
667static void
668guard_check_connection(void)
669{
670
671	PJDLOG_ASSERT(adhost->adh_role == ADIST_ROLE_SENDER);
672
673	rw_rlock(&adist_remote_lock);
674	if (adhost->adh_remote != NULL) {
675		rw_unlock(&adist_remote_lock);
676		pjdlog_debug(3, "remote_guard: Connection to %s is ok.",
677		    adhost->adh_remoteaddr);
678		return;
679	}
680
681	/*
682	 * Upgrade the lock. It doesn't have to be atomic as no other thread
683	 * can change connection status from disconnected to connected.
684	 */
685	rw_unlock(&adist_remote_lock);
686	pjdlog_debug(1, "remote_guard: Reconnecting to %s.",
687	    adhost->adh_remoteaddr);
688	if (sender_connect() == 0) {
689		pjdlog_info("Successfully reconnected to %s.",
690		    adhost->adh_remoteaddr);
691	} else {
692		pjdlog_debug(1, "remote_guard: Reconnect to %s failed.",
693		    adhost->adh_remoteaddr);
694	}
695}
696
697/*
698 * Thread guards remote connections and reconnects when needed, handles
699 * signals, etc.
700 */
701static void *
702guard_thread(void *arg __unused)
703{
704	struct timespec timeout;
705	time_t lastcheck, now;
706	sigset_t mask;
707	int signo;
708
709	lastcheck = time(NULL);
710
711	PJDLOG_VERIFY(sigemptyset(&mask) == 0);
712	PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
713	PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);
714
715	timeout.tv_sec = ADIST_KEEPALIVE;
716	timeout.tv_nsec = 0;
717	signo = -1;
718
719	for (;;) {
720		switch (signo) {
721		case SIGINT:
722		case SIGTERM:
723			sigexit_received = true;
724			pjdlog_exitx(EX_OK,
725			    "Termination signal received, exiting.");
726			break;
727		default:
728			break;
729		}
730
731		pjdlog_debug(3, "remote_guard: Checking connections.");
732		now = time(NULL);
733		if (lastcheck + ADIST_KEEPALIVE <= now) {
734			guard_check_connection();
735			lastcheck = now;
736		}
737		signo = sigtimedwait(&mask, NULL, &timeout);
738	}
739	/* NOTREACHED */
740	return (NULL);
741}
742
743void
744adist_sender(struct adist_config *config, struct adist_host *adh)
745{
746	pthread_t td;
747	pid_t pid;
748	int error, mode, debuglevel;
749
750	/*
751	 * Create communication channel for sending connection requests from
752	 * child to parent.
753	 */
754	if (proto_connect(NULL, "socketpair://", -1, &adh->adh_conn) == -1) {
755		pjdlog_errno(LOG_ERR,
756		    "Unable to create connection sockets between child and parent");
757		return;
758	}
759
760	pid = fork();
761	if (pid == -1) {
762		pjdlog_errno(LOG_ERR, "Unable to fork");
763		proto_close(adh->adh_conn);
764		adh->adh_conn = NULL;
765		return;
766	}
767
768	if (pid > 0) {
769		/* This is parent. */
770		adh->adh_worker_pid = pid;
771		/* Declare that we are receiver. */
772		proto_recv(adh->adh_conn, NULL, 0);
773		return;
774	}
775
776	adcfg = config;
777	adhost = adh;
778
779	mode = pjdlog_mode_get();
780	debuglevel = pjdlog_debug_get();
781
782	/* Declare that we are sender. */
783	proto_send(adhost->adh_conn, NULL, 0);
784
785	descriptors_cleanup(adhost);
786
787#ifdef TODO
788	descriptors_assert(adhost, mode);
789#endif
790
791	pjdlog_init(mode);
792	pjdlog_debug_set(debuglevel);
793	pjdlog_prefix_set("[%s] (%s) ", adhost->adh_name,
794	    role2str(adhost->adh_role));
795#ifdef HAVE_SETPROCTITLE
796	setproctitle("[%s] (%s) ", adhost->adh_name,
797	    role2str(adhost->adh_role));
798#endif
799
800	/*
801	 * The sender process should be able to remove entries from its
802	 * trail directory, but it should not be able to write to the
803	 * trail files, only read from them.
804	 */
805	adist_trail = trail_new(adhost->adh_directory, false);
806	if (adist_trail == NULL)
807		exit(EX_OSFILE);
808
809	if (sandbox(ADIST_USER, true, "auditdistd: %s (%s)",
810	    role2str(adhost->adh_role), adhost->adh_name) != 0) {
811		exit(EX_CONFIG);
812	}
813	pjdlog_info("Privileges successfully dropped.");
814
815	/*
816	 * We can ignore wait_for_dir_init() failures. It will fall back to
817	 * using sleep(3).
818	 */
819	(void)wait_for_dir_init(trail_dirfd(adist_trail));
820
821	init_environment();
822	if (sender_connect() == 0) {
823		pjdlog_info("Successfully connected to %s.",
824		    adhost->adh_remoteaddr);
825	}
826	adhost->adh_reset = true;
827
828	/*
829	 * Create the guard thread first, so we can handle signals from the
830	 * very begining.
831	 */
832	error = pthread_create(&td, NULL, guard_thread, NULL);
833	PJDLOG_ASSERT(error == 0);
834	error = pthread_create(&td, NULL, send_thread, NULL);
835	PJDLOG_ASSERT(error == 0);
836	error = pthread_create(&td, NULL, recv_thread, NULL);
837	PJDLOG_ASSERT(error == 0);
838	(void)read_thread(NULL);
839}
840