1/*-
2 * Copyright (c) 2012 The FreeBSD Foundation
3 * All rights reserved.
4 *
5 * This software was developed by Pawel Jakub Dawidek under sponsorship from
6 * the FreeBSD Foundation.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 * 1. Redistributions of source code must retain the above copyright
12 *    notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 *    notice, this list of conditions and the following disclaimer in the
15 *    documentation and/or other materials provided with the distribution.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27 * SUCH DAMAGE.
28 */
29
30#include <config/config.h>
31
32#include <sys/param.h>
33#if defined(HAVE_SYS_ENDIAN_H) && defined(HAVE_BSWAP)
34#include <sys/endian.h>
35#else /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
36#ifdef HAVE_MACHINE_ENDIAN_H
37#include <machine/endian.h>
38#else /* !HAVE_MACHINE_ENDIAN_H */
39#ifdef HAVE_ENDIAN_H
40#include <endian.h>
41#else /* !HAVE_ENDIAN_H */
42#error "No supported endian.h"
43#endif /* !HAVE_ENDIAN_H */
44#endif /* !HAVE_MACHINE_ENDIAN_H */
45#include <compat/endian.h>
46#endif /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
47#include <sys/queue.h>
48#include <sys/stat.h>
49#include <sys/wait.h>
50
51#include <stdio.h>
52#include <stdlib.h>
53#include <unistd.h>
54
55#include <ctype.h>
56#include <dirent.h>
57#include <err.h>
58#include <errno.h>
59#include <fcntl.h>
60#ifdef HAVE_LIBUTIL_H
61#include <libutil.h>
62#endif
63#include <signal.h>
64#include <string.h>
65#include <strings.h>
66
67#include <openssl/hmac.h>
68
69#ifndef HAVE_SIGTIMEDWAIT
70#include "sigtimedwait.h"
71#endif
72
73#include "auditdistd.h"
74#include "pjdlog.h"
75#include "proto.h"
76#include "sandbox.h"
77#include "subr.h"
78#include "synch.h"
79#include "trail.h"
80
/*
 * Configuration and host entry this sender process works on; both are set
 * once in adist_sender() before any thread is created.
 */
static struct adist_config *adcfg;
static struct adist_host *adhost;

/*
 * Synchronization around adhost->adh_remote:
 * - adist_remote_lock is taken shared by code that inspects or uses the
 *   connection and exclusively by code that installs or closes it;
 * - adist_remote_mtx together with adist_remote_cond lets the read thread
 *   sleep until a connection has been installed.
 */
static pthread_rwlock_t adist_remote_lock;
static pthread_mutex_t adist_remote_mtx;
static pthread_cond_t adist_remote_cond;
/* Trail object created with trail_new() for adhost->adh_directory. */
static struct trail *adist_trail;

/* Requests that are currently unused and may be filled by a producer. */
static TAILQ_HEAD(, adreq) adist_free_list;
static pthread_mutex_t adist_free_list_lock;
static pthread_cond_t adist_free_list_cond;
/* Requests that were filled and are waiting for the send thread. */
static TAILQ_HEAD(, adreq) adist_send_list;
static pthread_mutex_t adist_send_list_lock;
static pthread_cond_t adist_send_list_cond;
/* Requests handed to the connection and waiting for the receiver's reply. */
static TAILQ_HEAD(, adreq) adist_recv_list;
static pthread_mutex_t adist_recv_list_lock;
static pthread_cond_t adist_recv_list_cond;
98
99static void
100init_environment(void)
101{
102	struct adreq *adreq;
103	unsigned int ii;
104
105	rw_init(&adist_remote_lock);
106	mtx_init(&adist_remote_mtx);
107	cv_init(&adist_remote_cond);
108	TAILQ_INIT(&adist_free_list);
109	mtx_init(&adist_free_list_lock);
110	cv_init(&adist_free_list_cond);
111	TAILQ_INIT(&adist_send_list);
112	mtx_init(&adist_send_list_lock);
113	cv_init(&adist_send_list_cond);
114	TAILQ_INIT(&adist_recv_list);
115	mtx_init(&adist_recv_list_lock);
116	cv_init(&adist_recv_list_cond);
117
118	for (ii = 0; ii < ADIST_QUEUE_SIZE; ii++) {
119		adreq = malloc(sizeof(*adreq) + ADIST_BUF_SIZE);
120		if (adreq == NULL) {
121			pjdlog_exitx(EX_TEMPFAIL,
122			    "Unable to allocate %zu bytes of memory for adreq object.",
123			    sizeof(*adreq) + ADIST_BUF_SIZE);
124		}
125		adreq->adr_byteorder = ADIST_BYTEORDER;
126		adreq->adr_cmd = ADIST_CMD_UNDEFINED;
127		adreq->adr_seq = 0;
128		adreq->adr_datasize = 0;
129		TAILQ_INSERT_TAIL(&adist_free_list, adreq, adr_next);
130	}
131}
132
133static int
134sender_connect(void)
135{
136	unsigned char rnd[32], hash[32], resp[32];
137	struct proto_conn *conn;
138	char welcome[8];
139	int16_t val;
140
141	val = 1;
142	if (proto_send(adhost->adh_conn, &val, sizeof(val)) < 0) {
143		pjdlog_exit(EX_TEMPFAIL,
144		    "Unable to send connection request to parent");
145	}
146	if (proto_recv(adhost->adh_conn, &val, sizeof(val)) < 0) {
147		pjdlog_exit(EX_TEMPFAIL,
148		    "Unable to receive reply to connection request from parent");
149	}
150	if (val != 0) {
151		errno = val;
152		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
153		    adhost->adh_remoteaddr);
154		return (-1);
155	}
156	if (proto_connection_recv(adhost->adh_conn, true, &conn) < 0) {
157		pjdlog_exit(EX_TEMPFAIL,
158		    "Unable to receive connection from parent");
159	}
160	if (proto_connect_wait(conn, adcfg->adc_timeout) < 0) {
161		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
162		    adhost->adh_remoteaddr);
163		proto_close(conn);
164		return (-1);
165	}
166	pjdlog_debug(1, "Connected to %s.", adhost->adh_remoteaddr);
167	/* Error in setting timeout is not critical, but why should it fail? */
168	if (proto_timeout(conn, adcfg->adc_timeout) < 0)
169		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
170	else
171		pjdlog_debug(1, "Timeout set to %d.", adcfg->adc_timeout);
172
173	/* Exchange welcome message, which includes version number. */
174	(void)snprintf(welcome, sizeof(welcome), "ADIST%02d", ADIST_VERSION);
175	if (proto_send(conn, welcome, sizeof(welcome)) < 0) {
176		pjdlog_errno(LOG_WARNING,
177		    "Unable to send welcome message to %s",
178		    adhost->adh_remoteaddr);
179		proto_close(conn);
180		return (-1);
181	}
182	pjdlog_debug(1, "Welcome message sent (%s).", welcome);
183	bzero(welcome, sizeof(welcome));
184	if (proto_recv(conn, welcome, sizeof(welcome)) < 0) {
185		pjdlog_errno(LOG_WARNING,
186		    "Unable to receive welcome message from %s",
187		    adhost->adh_remoteaddr);
188		proto_close(conn);
189		return (-1);
190	}
191	if (strncmp(welcome, "ADIST", 5) != 0 || !isdigit(welcome[5]) ||
192	    !isdigit(welcome[6]) || welcome[7] != '\0') {
193		pjdlog_warning("Invalid welcome message from %s.",
194		    adhost->adh_remoteaddr);
195		proto_close(conn);
196		return (-1);
197	}
198	pjdlog_debug(1, "Welcome message received (%s).", welcome);
199	/*
200	 * Receiver can only reply with version number lower or equal to
201	 * the one we sent.
202	 */
203	adhost->adh_version = atoi(welcome + 5);
204	if (adhost->adh_version > ADIST_VERSION) {
205		pjdlog_warning("Invalid version number from %s (%d received, up to %d supported).",
206		    adhost->adh_remoteaddr, adhost->adh_version, ADIST_VERSION);
207		proto_close(conn);
208		return (-1);
209	}
210
211	pjdlog_debug(1, "Version %d negotiated with %s.", adhost->adh_version,
212	    adhost->adh_remoteaddr);
213
214	if (proto_send(conn, adcfg->adc_name, sizeof(adcfg->adc_name)) == -1) {
215		pjdlog_errno(LOG_WARNING, "Unable to send name to %s",
216		    adhost->adh_remoteaddr);
217		proto_close(conn);
218		return (-1);
219	}
220	pjdlog_debug(1, "Name (%s) sent.", adcfg->adc_name);
221
222	if (proto_recv(conn, rnd, sizeof(rnd)) == -1) {
223		pjdlog_errno(LOG_WARNING, "Unable to receive challenge from %s",
224		    adhost->adh_remoteaddr);
225		proto_close(conn);
226		return (-1);
227	}
228	pjdlog_debug(1, "Challenge received.");
229
230	if (HMAC(EVP_sha256(), adhost->adh_password,
231	    (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
232	    NULL) == NULL) {
233		pjdlog_warning("Unable to generate response.");
234		proto_close(conn);
235		return (-1);
236	}
237	pjdlog_debug(1, "Response generated.");
238
239	if (proto_send(conn, hash, sizeof(hash)) == -1) {
240		pjdlog_errno(LOG_WARNING, "Unable to send response to %s",
241		    adhost->adh_remoteaddr);
242		proto_close(conn);
243		return (-1);
244	}
245	pjdlog_debug(1, "Response sent.");
246
247	if (adist_random(rnd, sizeof(rnd)) == -1) {
248		pjdlog_warning("Unable to generate challenge.");
249		proto_close(conn);
250		return (-1);
251	}
252	pjdlog_debug(1, "Challenge generated.");
253
254	if (proto_send(conn, rnd, sizeof(rnd)) == -1) {
255		pjdlog_errno(LOG_WARNING, "Unable to send challenge to %s",
256		    adhost->adh_remoteaddr);
257		proto_close(conn);
258		return (-1);
259	}
260	pjdlog_debug(1, "Challenge sent.");
261
262	if (proto_recv(conn, resp, sizeof(resp)) == -1) {
263		pjdlog_errno(LOG_WARNING, "Unable to receive response from %s",
264		    adhost->adh_remoteaddr);
265		proto_close(conn);
266		return (-1);
267	}
268	pjdlog_debug(1, "Response received.");
269
270	if (HMAC(EVP_sha256(), adhost->adh_password,
271	    (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
272	    NULL) == NULL) {
273		pjdlog_warning("Unable to generate hash.");
274		proto_close(conn);
275		return (-1);
276	}
277	pjdlog_debug(1, "Hash generated.");
278
279	if (memcmp(resp, hash, sizeof(hash)) != 0) {
280		pjdlog_warning("Invalid response from %s (wrong password?).",
281		    adhost->adh_remoteaddr);
282		proto_close(conn);
283		return (-1);
284	}
285	pjdlog_info("Receiver authenticated.");
286
287	if (proto_recv(conn, &adhost->adh_trail_offset,
288	    sizeof(adhost->adh_trail_offset)) == -1) {
289		pjdlog_errno(LOG_WARNING,
290		    "Unable to receive size of the most recent trail file from %s",
291		    adhost->adh_remoteaddr);
292		proto_close(conn);
293		return (-1);
294	}
295	adhost->adh_trail_offset = le64toh(adhost->adh_trail_offset);
296	if (proto_recv(conn, &adhost->adh_trail_name,
297	    sizeof(adhost->adh_trail_name)) == -1) {
298		pjdlog_errno(LOG_WARNING,
299		    "Unable to receive name of the most recent trail file from %s",
300		    adhost->adh_remoteaddr);
301		proto_close(conn);
302		return (-1);
303	}
304	pjdlog_debug(1, "Trail name (%s) and offset (%ju) received.",
305	    adhost->adh_trail_name, (uintmax_t)adhost->adh_trail_offset);
306
307	rw_wlock(&adist_remote_lock);
308	mtx_lock(&adist_remote_mtx);
309	PJDLOG_ASSERT(adhost->adh_remote == NULL);
310	PJDLOG_ASSERT(conn != NULL);
311	adhost->adh_remote = conn;
312	mtx_unlock(&adist_remote_mtx);
313	rw_unlock(&adist_remote_lock);
314	cv_signal(&adist_remote_cond);
315
316	return (0);
317}
318
/*
 * Close the connection to the receiver, if there is one, and recycle all
 * queued requests.
 *
 * With adist_remote_lock held exclusively the connection is closed and,
 * under adist_remote_mtx, adh_remote is cleared, adh_reset is set and the
 * remembered trail name/offset are wiped.  Afterwards everything on the send
 * and recv lists is moved to the free list.  Calling this while already
 * disconnected is a no-op.
 */
static void
sender_disconnect(void)
{

	rw_wlock(&adist_remote_lock);
	/*
	 * Check for a race between dropping rlock and acquiring wlock -
	 * another thread can close connection in-between.
	 */
	if (adhost->adh_remote == NULL) {
		rw_unlock(&adist_remote_lock);
		return;
	}
	pjdlog_debug(2, "Closing connection to %s.", adhost->adh_remoteaddr);
	proto_close(adhost->adh_remote);
	/* adh_reset and the trail position are read under this mutex. */
	mtx_lock(&adist_remote_mtx);
	adhost->adh_remote = NULL;
	adhost->adh_reset = true;
	adhost->adh_trail_name[0] = '\0';
	adhost->adh_trail_offset = 0;
	mtx_unlock(&adist_remote_mtx);
	rw_unlock(&adist_remote_lock);

	pjdlog_warning("Disconnected from %s.", adhost->adh_remoteaddr);

	/* Move all in-flight requests back onto free list. */
	QUEUE_CONCAT2(&adist_free_list, &adist_send_list, &adist_recv_list);
}
347
348static void
349adreq_fill(struct adreq *adreq, uint8_t cmd, const unsigned char *data,
350    size_t size)
351{
352	static uint64_t seq = 1;
353
354	PJDLOG_ASSERT(size <= ADIST_BUF_SIZE);
355
356	switch (cmd) {
357	case ADIST_CMD_OPEN:
358	case ADIST_CMD_CLOSE:
359		PJDLOG_ASSERT(data != NULL && size == 0);
360		size = strlen(data) + 1;
361		break;
362	case ADIST_CMD_APPEND:
363		PJDLOG_ASSERT(data != NULL && size > 0);
364		break;
365	case ADIST_CMD_KEEPALIVE:
366	case ADIST_CMD_ERROR:
367		PJDLOG_ASSERT(data == NULL && size == 0);
368		break;
369	default:
370		PJDLOG_ABORT("Invalid command (%hhu).", cmd);
371	}
372
373	adreq->adr_cmd = cmd;
374	adreq->adr_seq = seq++;
375	adreq->adr_datasize = size;
376	/* Don't copy if data is already in out buffer. */
377	if (data != NULL && data != adreq->adr_data)
378		bcopy(data, adreq->adr_data, size);
379}
380
/*
 * Block until there is an open trail file to read from.
 *
 * If a reset was requested (adh_reset), the current trail file is closed,
 * the trail is reset, and the function waits for a connection before
 * restarting the trail at the name/offset stored in adhost.  While no trail
 * file is open it waits for directory activity and advances to the next
 * trail file.
 *
 * Returns true when a trail file was (re)opened during this call, false when
 * the previously open file is still the one to read.
 */
static bool
read_thread_wait(void)
{
	bool newfile = false;

	mtx_lock(&adist_remote_mtx);
	if (adhost->adh_reset) {
		/*
		 * Also entered via goto from the loop below, with
		 * adist_remote_mtx already held.
		 */
reset:
		adhost->adh_reset = false;
		if (trail_filefd(adist_trail) != -1)
			trail_close(adist_trail);
		trail_reset(adist_trail);
		/* Sleep until sender_connect() installs a connection. */
		while (adhost->adh_remote == NULL)
			cv_wait(&adist_remote_cond, &adist_remote_mtx);
		trail_start(adist_trail, adhost->adh_trail_name,
		    adhost->adh_trail_offset);
		newfile = true;
	}
	mtx_unlock(&adist_remote_mtx);
	/* Loop for as long as there is no open trail file. */
	while (trail_filefd(adist_trail) == -1) {
		newfile = true;
		wait_for_dir();
		/*
		 * We may have been disconnected and reconnected in the
		 * meantime, check if reset is set.
		 */
		mtx_lock(&adist_remote_mtx);
		if (adhost->adh_reset)
			goto reset;
		mtx_unlock(&adist_remote_mtx);
		if (trail_filefd(adist_trail) == -1)
			trail_next(adist_trail);
	}
	if (newfile) {
		pjdlog_debug(1, "Trail file \"%s/%s\" opened.",
		    adhost->adh_directory,
		    trail_filename(adist_trail));
		/* The result is deliberately ignored here. */
		(void)wait_for_file_init(trail_filefd(adist_trail));
	}
	return (newfile);
}
422
423static void *
424read_thread(void *arg __unused)
425{
426	struct adreq *adreq;
427	ssize_t done;
428	bool newfile;
429
430	pjdlog_debug(1, "%s started.", __func__);
431
432	for (;;) {
433		newfile = read_thread_wait();
434		QUEUE_TAKE(adreq, &adist_free_list, 0);
435		if (newfile) {
436			adreq_fill(adreq, ADIST_CMD_OPEN,
437			    trail_filename(adist_trail), 0);
438			newfile = false;
439			goto move;
440		}
441
442		done = read(trail_filefd(adist_trail), adreq->adr_data,
443		    ADIST_BUF_SIZE);
444		if (done == -1) {
445			off_t offset;
446			int error;
447
448			error = errno;
449			offset = lseek(trail_filefd(adist_trail), 0, SEEK_CUR);
450			errno = error;
451			pjdlog_errno(LOG_ERR,
452			    "Error while reading \"%s/%s\" at offset %jd",
453			    adhost->adh_directory, trail_filename(adist_trail),
454			    offset);
455			trail_close(adist_trail);
456			adreq_fill(adreq, ADIST_CMD_ERROR, NULL, 0);
457			goto move;
458		} else if (done == 0) {
459			/* End of file. */
460			pjdlog_debug(3, "End of \"%s/%s\".",
461			    adhost->adh_directory, trail_filename(adist_trail));
462			if (!trail_switch(adist_trail)) {
463				/* More audit records can arrive. */
464				mtx_lock(&adist_free_list_lock);
465				TAILQ_INSERT_TAIL(&adist_free_list, adreq,
466				    adr_next);
467				mtx_unlock(&adist_free_list_lock);
468				wait_for_file();
469				continue;
470			}
471			adreq_fill(adreq, ADIST_CMD_CLOSE,
472			    trail_filename(adist_trail), 0);
473			trail_close(adist_trail);
474			goto move;
475		}
476
477		adreq_fill(adreq, ADIST_CMD_APPEND, adreq->adr_data, done);
478move:
479		pjdlog_debug(3,
480		    "read thread: Moving request %p to the send queue (%hhu).",
481		    adreq, adreq->adr_cmd);
482		QUEUE_INSERT(adreq, &adist_send_list);
483	}
484	/* NOTREACHED */
485	return (NULL);
486}
487
488static void
489keepalive_send(void)
490{
491	struct adreq *adreq;
492
493	rw_rlock(&adist_remote_lock);
494	if (adhost->adh_remote == NULL) {
495		rw_unlock(&adist_remote_lock);
496		return;
497	}
498	rw_unlock(&adist_remote_lock);
499
500	mtx_lock(&adist_free_list_lock);
501	adreq = TAILQ_FIRST(&adist_free_list);
502	if (adreq != NULL)
503		TAILQ_REMOVE(&adist_free_list, adreq, adr_next);
504	mtx_unlock(&adist_free_list_lock);
505	if (adreq == NULL)
506		return;
507
508	adreq_fill(adreq, ADIST_CMD_KEEPALIVE, NULL, 0);
509
510	QUEUE_INSERT(adreq, &adist_send_list);
511
512	pjdlog_debug(3, "keepalive_send: Request sent.");
513}
514
/*
 * Consumer loop: takes requests from the send list and transmits them over
 * the remote connection.  Never returns.
 *
 * While the send list stays empty, a keepalive is queued once at least
 * ADIST_KEEPALIVE seconds have passed since the previous one.  Each request
 * is placed on the recv list before it is sent; if there is no connection
 * or sending fails, the connection is torn down via sender_disconnect().
 */
static void *
send_thread(void *arg __unused)
{
	time_t lastcheck, now;
	struct adreq *adreq;

	pjdlog_debug(1, "%s started.", __func__);

	lastcheck = time(NULL);

	for (;;) {
		pjdlog_debug(3, "send thread: Taking request.");
		for (;;) {
			/* adreq is NULL when nothing arrived in time. */
			QUEUE_TAKE(adreq, &adist_send_list, ADIST_KEEPALIVE);
			if (adreq != NULL)
				break;
			now = time(NULL);
			if (lastcheck + ADIST_KEEPALIVE <= now) {
				keepalive_send();
				lastcheck = now;
			}
		}
		PJDLOG_ASSERT(adreq != NULL);
		pjdlog_debug(3, "send thread: (%p) Got request %hhu.", adreq,
		    adreq->adr_cmd);
		/*
		 * Protect connection from disappearing.
		 */
		rw_rlock(&adist_remote_lock);
		/*
		 * Move the request to the recv queue first to avoid race
		 * where the recv thread receives the reply before we move
		 * the request to the recv queue.
		 */
		QUEUE_INSERT(adreq, &adist_recv_list);
		if (adhost->adh_remote == NULL ||
		    proto_send(adhost->adh_remote, &adreq->adr_packet,
		    ADPKT_SIZE(adreq)) == -1) {
			rw_unlock(&adist_remote_lock);
			pjdlog_debug(1,
			    "send thread: (%p) Unable to send request.", adreq);
			/*
			 * Unlocked peek only; sender_disconnect() re-checks
			 * adh_remote under the write lock.
			 */
			if (adhost->adh_remote != NULL)
				sender_disconnect();
			continue;
		} else {
			pjdlog_debug(3, "Request %p sent successfully.", adreq);
			adreq_log(LOG_DEBUG, 2, -1, adreq,
			    "send: (%p) Request sent: ", adreq);
			rw_unlock(&adist_remote_lock);
		}
	}
	/* NOTREACHED */
	return (NULL);
}
569
570static void
571adrep_decode_header(struct adrep *adrep)
572{
573
574	/* Byte-swap only if the receiver is using different byte order. */
575	if (adrep->adrp_byteorder != ADIST_BYTEORDER) {
576		adrep->adrp_byteorder = ADIST_BYTEORDER;
577		adrep->adrp_seq = bswap64(adrep->adrp_seq);
578		adrep->adrp_error = bswap16(adrep->adrp_error);
579	}
580}
581
582static void *
583recv_thread(void *arg __unused)
584{
585	struct adrep adrep;
586	struct adreq *adreq;
587
588	pjdlog_debug(1, "%s started.", __func__);
589
590	for (;;) {
591		/* Wait until there is anything to receive. */
592		QUEUE_WAIT(&adist_recv_list);
593		pjdlog_debug(3, "recv thread: Got something.");
594		rw_rlock(&adist_remote_lock);
595		if (adhost->adh_remote == NULL) {
596			/*
597			 * Connection is dead.
598			 * There is a short race in sender_disconnect() between
599			 * setting adh_remote to NULL and removing entries from
600			 * the recv list, which can result in us being here.
601			 * To avoid just spinning, wait for 0.1s.
602			 */
603			rw_unlock(&adist_remote_lock);
604			usleep(100000);
605			continue;
606		}
607		if (proto_recv(adhost->adh_remote, &adrep,
608		    sizeof(adrep)) == -1) {
609			rw_unlock(&adist_remote_lock);
610			pjdlog_errno(LOG_ERR, "Unable to receive reply");
611			sender_disconnect();
612			continue;
613		}
614		rw_unlock(&adist_remote_lock);
615		adrep_decode_header(&adrep);
616		/*
617		 * Find the request that was just confirmed.
618		 */
619		mtx_lock(&adist_recv_list_lock);
620		TAILQ_FOREACH(adreq, &adist_recv_list, adr_next) {
621			if (adreq->adr_seq == adrep.adrp_seq) {
622				TAILQ_REMOVE(&adist_recv_list, adreq,
623				    adr_next);
624				break;
625			}
626		}
627		if (adreq == NULL) {
628			/*
629			 * If we disconnected in the meantime, just continue.
630			 * On disconnect sender_disconnect() clears the queue,
631			 * we can use that.
632			 */
633			if (TAILQ_EMPTY(&adist_recv_list)) {
634				mtx_unlock(&adist_recv_list_lock);
635				continue;
636			}
637			mtx_unlock(&adist_recv_list_lock);
638			pjdlog_error("Found no request matching received 'seq' field (%ju).",
639			    (uintmax_t)adrep.adrp_seq);
640			sender_disconnect();
641			continue;
642		}
643		mtx_unlock(&adist_recv_list_lock);
644		adreq_log(LOG_DEBUG, 2, -1, adreq,
645		    "recv thread: (%p) Request confirmed: ", adreq);
646		pjdlog_debug(3, "recv thread: (%p) Got request %hhu.", adreq,
647		    adreq->adr_cmd);
648		if (adrep.adrp_error != 0) {
649			pjdlog_error("Receiver returned error (%s), disconnecting.",
650			    adist_errstr((int)adrep.adrp_error));
651			sender_disconnect();
652			continue;
653		}
654		if (adreq->adr_cmd == ADIST_CMD_CLOSE)
655			trail_unlink(adist_trail, adreq->adr_data);
656		pjdlog_debug(3, "Request received successfully.");
657		QUEUE_INSERT(adreq, &adist_free_list);
658	}
659	/* NOTREACHED */
660	return (NULL);
661}
662
663static void
664guard_check_connection(void)
665{
666
667	PJDLOG_ASSERT(adhost->adh_role == ADIST_ROLE_SENDER);
668
669	rw_rlock(&adist_remote_lock);
670	if (adhost->adh_remote != NULL) {
671		rw_unlock(&adist_remote_lock);
672		pjdlog_debug(3, "remote_guard: Connection to %s is ok.",
673		    adhost->adh_remoteaddr);
674		return;
675	}
676
677	/*
678	 * Upgrade the lock. It doesn't have to be atomic as no other thread
679	 * can change connection status from disconnected to connected.
680	 */
681	rw_unlock(&adist_remote_lock);
682	pjdlog_debug(1, "remote_guard: Reconnecting to %s.",
683	    adhost->adh_remoteaddr);
684	if (sender_connect() == 0) {
685		pjdlog_info("Successfully reconnected to %s.",
686		    adhost->adh_remoteaddr);
687	} else {
688		pjdlog_debug(1, "remote_guard: Reconnect to %s failed.",
689		    adhost->adh_remoteaddr);
690	}
691}
692
693/*
694 * Thread guards remote connections and reconnects when needed, handles
695 * signals, etc.
696 */
697static void *
698guard_thread(void *arg __unused)
699{
700	struct timespec timeout;
701	time_t lastcheck, now;
702	sigset_t mask;
703	int signo;
704
705	lastcheck = time(NULL);
706
707	PJDLOG_VERIFY(sigemptyset(&mask) == 0);
708	PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
709	PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);
710
711	timeout.tv_sec = ADIST_KEEPALIVE;
712	timeout.tv_nsec = 0;
713	signo = -1;
714
715	for (;;) {
716		switch (signo) {
717		case SIGINT:
718		case SIGTERM:
719			sigexit_received = true;
720			pjdlog_exitx(EX_OK,
721			    "Termination signal received, exiting.");
722			break;
723		default:
724			break;
725		}
726
727		pjdlog_debug(3, "remote_guard: Checking connections.");
728		now = time(NULL);
729		if (lastcheck + ADIST_KEEPALIVE <= now) {
730			guard_check_connection();
731			lastcheck = now;
732		}
733		signo = sigtimedwait(&mask, NULL, &timeout);
734	}
735	/* NOTREACHED */
736	return (NULL);
737}
738
739void
740adist_sender(struct adist_config *config, struct adist_host *adh)
741{
742	pthread_t td;
743	pid_t pid;
744	int error, mode, debuglevel;
745
746	/*
747	 * Create communication channel for sending connection requests from
748	 * child to parent.
749	 */
750	if (proto_connect(NULL, "socketpair://", -1, &adh->adh_conn) == -1) {
751		pjdlog_errno(LOG_ERR,
752		    "Unable to create connection sockets between child and parent");
753		return;
754	}
755
756	pid = fork();
757	if (pid == -1) {
758		pjdlog_errno(LOG_ERR, "Unable to fork");
759		proto_close(adh->adh_conn);
760		adh->adh_conn = NULL;
761		return;
762	}
763
764	if (pid > 0) {
765		/* This is parent. */
766		adh->adh_worker_pid = pid;
767		/* Declare that we are receiver. */
768		proto_recv(adh->adh_conn, NULL, 0);
769		return;
770	}
771
772	adcfg = config;
773	adhost = adh;
774
775	mode = pjdlog_mode_get();
776	debuglevel = pjdlog_debug_get();
777
778	/* Declare that we are sender. */
779	proto_send(adhost->adh_conn, NULL, 0);
780
781	descriptors_cleanup(adhost);
782
783#ifdef TODO
784	descriptors_assert(adhost, mode);
785#endif
786
787	pjdlog_init(mode);
788	pjdlog_debug_set(debuglevel);
789	pjdlog_prefix_set("[%s] (%s) ", adhost->adh_name,
790	    role2str(adhost->adh_role));
791#ifdef HAVE_SETPROCTITLE
792	setproctitle("[%s] (%s) ", adhost->adh_name,
793	    role2str(adhost->adh_role));
794#endif
795
796	/*
797	 * The sender process should be able to remove entries from its
798	 * trail directory, but it should not be able to write to the
799	 * trail files, only read from them.
800	 */
801	adist_trail = trail_new(adhost->adh_directory, false);
802	if (adist_trail == NULL)
803		exit(EX_OSFILE);
804
805	if (sandbox(ADIST_USER, true, "auditdistd: %s (%s)",
806	    role2str(adhost->adh_role), adhost->adh_name) != 0) {
807		exit(EX_CONFIG);
808	}
809	pjdlog_info("Privileges successfully dropped.");
810
811	/*
812	 * We can ignore wait_for_dir_init() failures. It will fall back to
813	 * using sleep(3).
814	 */
815	(void)wait_for_dir_init(trail_dirfd(adist_trail));
816
817	init_environment();
818	if (sender_connect() == 0) {
819		pjdlog_info("Successfully connected to %s.",
820		    adhost->adh_remoteaddr);
821	}
822	adhost->adh_reset = true;
823
824	/*
825	 * Create the guard thread first, so we can handle signals from the
826	 * very begining.
827	 */
828	error = pthread_create(&td, NULL, guard_thread, NULL);
829	PJDLOG_ASSERT(error == 0);
830	error = pthread_create(&td, NULL, send_thread, NULL);
831	PJDLOG_ASSERT(error == 0);
832	error = pthread_create(&td, NULL, recv_thread, NULL);
833	PJDLOG_ASSERT(error == 0);
834	(void)read_thread(NULL);
835}
836