sender.c revision 247442
1/*-
2 * Copyright (c) 2012 The FreeBSD Foundation
3 * All rights reserved.
4 *
5 * This software was developed by Pawel Jakub Dawidek under sponsorship from
6 * the FreeBSD Foundation.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 * 1. Redistributions of source code must retain the above copyright
12 *    notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 *    notice, this list of conditions and the following disclaimer in the
15 *    documentation and/or other materials provided with the distribution.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27 * SUCH DAMAGE.
28 *
29 * $P4: //depot/projects/trustedbsd/openbsm/bin/auditdistd/sender.c#3 $
30 */
31
32#include <config/config.h>
33
34#include <sys/param.h>
35#if defined(HAVE_SYS_ENDIAN_H) && defined(HAVE_BSWAP)
36#include <sys/endian.h>
37#else /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
38#ifdef HAVE_MACHINE_ENDIAN_H
39#include <machine/endian.h>
40#else /* !HAVE_MACHINE_ENDIAN_H */
41#ifdef HAVE_ENDIAN_H
42#include <endian.h>
43#else /* !HAVE_ENDIAN_H */
44#error "No supported endian.h"
45#endif /* !HAVE_ENDIAN_H */
46#endif /* !HAVE_MACHINE_ENDIAN_H */
47#include <compat/endian.h>
48#endif /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
49#include <sys/queue.h>
50#include <sys/stat.h>
51#include <sys/wait.h>
52
53#include <stdio.h>
54#include <stdlib.h>
55#include <unistd.h>
56
57#include <ctype.h>
58#include <dirent.h>
59#include <err.h>
60#include <errno.h>
61#include <fcntl.h>
62#ifdef HAVE_LIBUTIL_H
63#include <libutil.h>
64#endif
65#include <signal.h>
66#include <string.h>
67#include <strings.h>
68
69#include <openssl/hmac.h>
70
71#ifndef HAVE_SIGTIMEDWAIT
72#include "sigtimedwait.h"
73#endif
74
75#include "auditdistd.h"
76#include "pjdlog.h"
77#include "proto.h"
78#include "sandbox.h"
79#include "subr.h"
80#include "synch.h"
81#include "trail.h"
82
/* Configuration and host handed to adist_sender(); set once in the child. */
static struct adist_config *adcfg;
static struct adist_host *adhost;

/*
 * adist_remote_lock is taken around uses of adhost->adh_remote (read-locked
 * while the connection is in use, write-locked while it is being replaced
 * or closed). adist_remote_mtx/adist_remote_cond let the read thread sleep
 * until a connection is established.
 */
static pthread_rwlock_t adist_remote_lock;
static pthread_mutex_t adist_remote_mtx;
static pthread_cond_t adist_remote_cond;
/* Trail handle created by trail_new() in adist_sender(). */
static struct trail *adist_trail;

/* Requests that are currently unused. */
static TAILQ_HEAD(, adreq) adist_free_list;
static pthread_mutex_t adist_free_list_lock;
static pthread_cond_t adist_free_list_cond;
/* Requests filled in and waiting for the send thread. */
static TAILQ_HEAD(, adreq) adist_send_list;
static pthread_mutex_t adist_send_list_lock;
static pthread_cond_t adist_send_list_cond;
/* Requests handed to the send thread and awaiting a reply. */
static TAILQ_HEAD(, adreq) adist_recv_list;
static pthread_mutex_t adist_recv_list_lock;
static pthread_cond_t adist_recv_list_cond;
100
101static void
102init_environment(void)
103{
104	struct adreq *adreq;
105	unsigned int ii;
106
107	rw_init(&adist_remote_lock);
108	mtx_init(&adist_remote_mtx);
109	cv_init(&adist_remote_cond);
110	TAILQ_INIT(&adist_free_list);
111	mtx_init(&adist_free_list_lock);
112	cv_init(&adist_free_list_cond);
113	TAILQ_INIT(&adist_send_list);
114	mtx_init(&adist_send_list_lock);
115	cv_init(&adist_send_list_cond);
116	TAILQ_INIT(&adist_recv_list);
117	mtx_init(&adist_recv_list_lock);
118	cv_init(&adist_recv_list_cond);
119
120	for (ii = 0; ii < ADIST_QUEUE_SIZE; ii++) {
121		adreq = malloc(sizeof(*adreq) + ADIST_BUF_SIZE);
122		if (adreq == NULL) {
123			pjdlog_exitx(EX_TEMPFAIL,
124			    "Unable to allocate %zu bytes of memory for adreq object.",
125			    sizeof(*adreq) + ADIST_BUF_SIZE);
126		}
127		adreq->adr_byteorder = ADIST_BYTEORDER;
128		adreq->adr_cmd = ADIST_CMD_UNDEFINED;
129		adreq->adr_seq = 0;
130		adreq->adr_datasize = 0;
131		TAILQ_INSERT_TAIL(&adist_free_list, adreq, adr_next);
132	}
133}
134
135static int
136sender_connect(void)
137{
138	unsigned char rnd[32], hash[32], resp[32];
139	struct proto_conn *conn;
140	char welcome[8];
141	int16_t val;
142
143	val = 1;
144	if (proto_send(adhost->adh_conn, &val, sizeof(val)) < 0) {
145		pjdlog_exit(EX_TEMPFAIL,
146		    "Unable to send connection request to parent");
147	}
148	if (proto_recv(adhost->adh_conn, &val, sizeof(val)) < 0) {
149		pjdlog_exit(EX_TEMPFAIL,
150		    "Unable to receive reply to connection request from parent");
151	}
152	if (val != 0) {
153		errno = val;
154		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
155		    adhost->adh_remoteaddr);
156		return (-1);
157	}
158	if (proto_connection_recv(adhost->adh_conn, true, &conn) < 0) {
159		pjdlog_exit(EX_TEMPFAIL,
160		    "Unable to receive connection from parent");
161	}
162	if (proto_connect_wait(conn, adcfg->adc_timeout) < 0) {
163		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
164		    adhost->adh_remoteaddr);
165		proto_close(conn);
166		return (-1);
167	}
168	pjdlog_debug(1, "Connected to %s.", adhost->adh_remoteaddr);
169	/* Error in setting timeout is not critical, but why should it fail? */
170	if (proto_timeout(conn, adcfg->adc_timeout) < 0)
171		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
172	else
173		pjdlog_debug(1, "Timeout set to %d.", adcfg->adc_timeout);
174
175	/* Exchange welcome message, which includes version number. */
176	(void)snprintf(welcome, sizeof(welcome), "ADIST%02d", ADIST_VERSION);
177	if (proto_send(conn, welcome, sizeof(welcome)) < 0) {
178		pjdlog_errno(LOG_WARNING,
179		    "Unable to send welcome message to %s",
180		    adhost->adh_remoteaddr);
181		proto_close(conn);
182		return (-1);
183	}
184	pjdlog_debug(1, "Welcome message sent (%s).", welcome);
185	bzero(welcome, sizeof(welcome));
186	if (proto_recv(conn, welcome, sizeof(welcome)) < 0) {
187		pjdlog_errno(LOG_WARNING,
188		    "Unable to receive welcome message from %s",
189		    adhost->adh_remoteaddr);
190		proto_close(conn);
191		return (-1);
192	}
193	if (strncmp(welcome, "ADIST", 5) != 0 || !isdigit(welcome[5]) ||
194	    !isdigit(welcome[6]) || welcome[7] != '\0') {
195		pjdlog_warning("Invalid welcome message from %s.",
196		    adhost->adh_remoteaddr);
197		proto_close(conn);
198		return (-1);
199	}
200	pjdlog_debug(1, "Welcome message received (%s).", welcome);
201	/*
202	 * Receiver can only reply with version number lower or equal to
203	 * the one we sent.
204	 */
205	adhost->adh_version = atoi(welcome + 5);
206	if (adhost->adh_version > ADIST_VERSION) {
207		pjdlog_warning("Invalid version number from %s (%d received, up to %d supported).",
208		    adhost->adh_remoteaddr, adhost->adh_version, ADIST_VERSION);
209		proto_close(conn);
210		return (-1);
211	}
212
213	pjdlog_debug(1, "Version %d negotiated with %s.", adhost->adh_version,
214	    adhost->adh_remoteaddr);
215
216	if (proto_send(conn, adcfg->adc_name, sizeof(adcfg->adc_name)) == -1) {
217		pjdlog_errno(LOG_WARNING, "Unable to send name to %s",
218		    adhost->adh_remoteaddr);
219		proto_close(conn);
220		return (-1);
221	}
222	pjdlog_debug(1, "Name (%s) sent.", adcfg->adc_name);
223
224	if (proto_recv(conn, rnd, sizeof(rnd)) == -1) {
225		pjdlog_errno(LOG_WARNING, "Unable to receive challenge from %s",
226		    adhost->adh_remoteaddr);
227		proto_close(conn);
228		return (-1);
229	}
230	pjdlog_debug(1, "Challenge received.");
231
232	if (HMAC(EVP_sha256(), adhost->adh_password,
233	    (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
234	    NULL) == NULL) {
235		pjdlog_warning("Unable to generate response.");
236		proto_close(conn);
237		return (-1);
238	}
239	pjdlog_debug(1, "Response generated.");
240
241	if (proto_send(conn, hash, sizeof(hash)) == -1) {
242		pjdlog_errno(LOG_WARNING, "Unable to send response to %s",
243		    adhost->adh_remoteaddr);
244		proto_close(conn);
245		return (-1);
246	}
247	pjdlog_debug(1, "Response sent.");
248
249	if (adist_random(rnd, sizeof(rnd)) == -1) {
250		pjdlog_warning("Unable to generate challenge.");
251		proto_close(conn);
252		return (-1);
253	}
254	pjdlog_debug(1, "Challenge generated.");
255
256	if (proto_send(conn, rnd, sizeof(rnd)) == -1) {
257		pjdlog_errno(LOG_WARNING, "Unable to send challenge to %s",
258		    adhost->adh_remoteaddr);
259		proto_close(conn);
260		return (-1);
261	}
262	pjdlog_debug(1, "Challenge sent.");
263
264	if (proto_recv(conn, resp, sizeof(resp)) == -1) {
265		pjdlog_errno(LOG_WARNING, "Unable to receive response from %s",
266		    adhost->adh_remoteaddr);
267		proto_close(conn);
268		return (-1);
269	}
270	pjdlog_debug(1, "Response received.");
271
272	if (HMAC(EVP_sha256(), adhost->adh_password,
273	    (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
274	    NULL) == NULL) {
275		pjdlog_warning("Unable to generate hash.");
276		proto_close(conn);
277		return (-1);
278	}
279	pjdlog_debug(1, "Hash generated.");
280
281	if (memcmp(resp, hash, sizeof(hash)) != 0) {
282		pjdlog_warning("Invalid response from %s (wrong password?).",
283		    adhost->adh_remoteaddr);
284		proto_close(conn);
285		return (-1);
286	}
287	pjdlog_info("Receiver authenticated.");
288
289	if (proto_recv(conn, &adhost->adh_trail_offset,
290	    sizeof(adhost->adh_trail_offset)) == -1) {
291		pjdlog_errno(LOG_WARNING,
292		    "Unable to receive size of the most recent trail file from %s",
293		    adhost->adh_remoteaddr);
294		proto_close(conn);
295		return (-1);
296	}
297	adhost->adh_trail_offset = le64toh(adhost->adh_trail_offset);
298	if (proto_recv(conn, &adhost->adh_trail_name,
299	    sizeof(adhost->adh_trail_name)) == -1) {
300		pjdlog_errno(LOG_WARNING,
301		    "Unable to receive name of the most recent trail file from %s",
302		    adhost->adh_remoteaddr);
303		proto_close(conn);
304		return (-1);
305	}
306	pjdlog_debug(1, "Trail name (%s) and offset (%ju) received.",
307	    adhost->adh_trail_name, (uintmax_t)adhost->adh_trail_offset);
308
309	rw_wlock(&adist_remote_lock);
310	mtx_lock(&adist_remote_mtx);
311	PJDLOG_ASSERT(adhost->adh_remote == NULL);
312	PJDLOG_ASSERT(conn != NULL);
313	adhost->adh_remote = conn;
314	mtx_unlock(&adist_remote_mtx);
315	rw_unlock(&adist_remote_lock);
316	cv_signal(&adist_remote_cond);
317
318	return (0);
319}
320
321static void
322sender_disconnect(void)
323{
324
325	rw_wlock(&adist_remote_lock);
326	/*
327	 * Check for a race between dropping rlock and acquiring wlock -
328	 * another thread can close connection in-between.
329	 */
330	if (adhost->adh_remote == NULL) {
331		rw_unlock(&adist_remote_lock);
332		return;
333	}
334	pjdlog_debug(2, "Closing connection to %s.", adhost->adh_remoteaddr);
335	proto_close(adhost->adh_remote);
336	mtx_lock(&adist_remote_mtx);
337	adhost->adh_remote = NULL;
338	adhost->adh_reset = true;
339	adhost->adh_trail_name[0] = '\0';
340	adhost->adh_trail_offset = 0;
341	mtx_unlock(&adist_remote_mtx);
342	rw_unlock(&adist_remote_lock);
343
344	pjdlog_warning("Disconnected from %s.", adhost->adh_remoteaddr);
345
346	/* Move all in-flight requests back onto free list. */
347	mtx_lock(&adist_free_list_lock);
348	mtx_lock(&adist_send_list_lock);
349	TAILQ_CONCAT(&adist_free_list, &adist_send_list, adr_next);
350	mtx_unlock(&adist_send_list_lock);
351	mtx_lock(&adist_recv_list_lock);
352	TAILQ_CONCAT(&adist_free_list, &adist_recv_list, adr_next);
353	mtx_unlock(&adist_recv_list_lock);
354	mtx_unlock(&adist_free_list_lock);
355}
356
357static void
358adreq_fill(struct adreq *adreq, uint8_t cmd, const unsigned char *data,
359    size_t size)
360{
361	static uint64_t seq = 1;
362
363	PJDLOG_ASSERT(size <= ADIST_BUF_SIZE);
364
365	switch (cmd) {
366	case ADIST_CMD_OPEN:
367	case ADIST_CMD_CLOSE:
368		PJDLOG_ASSERT(data != NULL && size == 0);
369		size = strlen(data) + 1;
370		break;
371	case ADIST_CMD_APPEND:
372		PJDLOG_ASSERT(data != NULL && size > 0);
373		break;
374	case ADIST_CMD_KEEPALIVE:
375	case ADIST_CMD_ERROR:
376		PJDLOG_ASSERT(data == NULL && size == 0);
377		break;
378	default:
379		PJDLOG_ABORT("Invalid command (%hhu).", cmd);
380	}
381
382	adreq->adr_cmd = cmd;
383	adreq->adr_seq = seq++;
384	adreq->adr_datasize = size;
385	/* Don't copy if data is already in out buffer. */
386	if (data != NULL && data != adreq->adr_data)
387		bcopy(data, adreq->adr_data, size);
388}
389
390static bool
391read_thread_wait(void)
392{
393	bool newfile = false;
394
395	mtx_lock(&adist_remote_mtx);
396	if (adhost->adh_reset) {
397reset:
398		adhost->adh_reset = false;
399		if (trail_filefd(adist_trail) != -1)
400			trail_close(adist_trail);
401		trail_reset(adist_trail);
402		while (adhost->adh_remote == NULL)
403			cv_wait(&adist_remote_cond, &adist_remote_mtx);
404		trail_start(adist_trail, adhost->adh_trail_name,
405		    adhost->adh_trail_offset);
406		newfile = true;
407	}
408	mtx_unlock(&adist_remote_mtx);
409	while (trail_filefd(adist_trail) == -1) {
410		newfile = true;
411		wait_for_dir();
412		/*
413		 * We may have been disconnected and reconnected in the
414		 * meantime, check if reset is set.
415		 */
416		mtx_lock(&adist_remote_mtx);
417		if (adhost->adh_reset)
418			goto reset;
419		mtx_unlock(&adist_remote_mtx);
420		if (trail_filefd(adist_trail) == -1)
421			trail_next(adist_trail);
422	}
423	if (newfile) {
424		pjdlog_debug(1, "Trail file \"%s/%s\" opened.",
425		    adhost->adh_directory,
426		    trail_filename(adist_trail));
427		(void)wait_for_file_init(trail_filefd(adist_trail));
428	}
429	return (newfile);
430}
431
432static void *
433read_thread(void *arg __unused)
434{
435	struct adreq *adreq;
436	ssize_t done;
437	bool newfile;
438
439	pjdlog_debug(1, "%s started.", __func__);
440
441	for (;;) {
442		newfile = read_thread_wait();
443		QUEUE_TAKE(adreq, &adist_free_list, 0);
444		if (newfile) {
445			adreq_fill(adreq, ADIST_CMD_OPEN,
446			    trail_filename(adist_trail), 0);
447			newfile = false;
448			goto move;
449		}
450
451		done = read(trail_filefd(adist_trail), adreq->adr_data,
452		    ADIST_BUF_SIZE);
453		if (done == -1) {
454			off_t offset;
455			int error;
456
457			error = errno;
458			offset = lseek(trail_filefd(adist_trail), 0, SEEK_CUR);
459			errno = error;
460			pjdlog_errno(LOG_ERR,
461			    "Error while reading \"%s/%s\" at offset %jd",
462			    adhost->adh_directory, trail_filename(adist_trail),
463			    offset);
464			trail_close(adist_trail);
465			adreq_fill(adreq, ADIST_CMD_ERROR, NULL, 0);
466			goto move;
467		} else if (done == 0) {
468			/* End of file. */
469			pjdlog_debug(3, "End of \"%s/%s\".",
470			    adhost->adh_directory, trail_filename(adist_trail));
471			if (!trail_switch(adist_trail)) {
472				/* More audit records can arrive. */
473				mtx_lock(&adist_free_list_lock);
474				TAILQ_INSERT_TAIL(&adist_free_list, adreq,
475				    adr_next);
476				mtx_unlock(&adist_free_list_lock);
477				wait_for_file();
478				continue;
479			}
480			adreq_fill(adreq, ADIST_CMD_CLOSE,
481			    trail_filename(adist_trail), 0);
482			trail_close(adist_trail);
483			goto move;
484		}
485
486		adreq_fill(adreq, ADIST_CMD_APPEND, adreq->adr_data, done);
487move:
488		pjdlog_debug(3,
489		    "read thread: Moving request %p to the send queue (%hhu).",
490		    adreq, adreq->adr_cmd);
491		QUEUE_INSERT(adreq, &adist_send_list);
492	}
493	/* NOTREACHED */
494	return (NULL);
495}
496
497static void
498keepalive_send(void)
499{
500	struct adreq *adreq;
501
502	rw_rlock(&adist_remote_lock);
503	if (adhost->adh_remote == NULL) {
504		rw_unlock(&adist_remote_lock);
505		return;
506	}
507	rw_unlock(&adist_remote_lock);
508
509	mtx_lock(&adist_free_list_lock);
510	adreq = TAILQ_FIRST(&adist_free_list);
511	if (adreq != NULL)
512		TAILQ_REMOVE(&adist_free_list, adreq, adr_next);
513	mtx_unlock(&adist_free_list_lock);
514	if (adreq == NULL)
515		return;
516
517	adreq_fill(adreq, ADIST_CMD_KEEPALIVE, NULL, 0);
518
519	QUEUE_INSERT(adreq, &adist_send_list);
520
521	pjdlog_debug(3, "keepalive_send: Request sent.");
522}
523
524/*
525 * Thread sends request to secondary node.
526 */
527static void *
528send_thread(void *arg __unused)
529{
530	time_t lastcheck, now;
531	struct adreq *adreq;
532
533	pjdlog_debug(1, "%s started.", __func__);
534
535	lastcheck = time(NULL);
536
537	for (;;) {
538		pjdlog_debug(3, "send thread: Taking request.");
539		for (;;) {
540			QUEUE_TAKE(adreq, &adist_send_list, ADIST_KEEPALIVE);
541			if (adreq != NULL)
542				break;
543			now = time(NULL);
544			if (lastcheck + ADIST_KEEPALIVE <= now) {
545				keepalive_send();
546				lastcheck = now;
547			}
548		}
549		PJDLOG_ASSERT(adreq != NULL);
550		pjdlog_debug(3, "send thread: (%p) Got request %hhu.", adreq,
551		    adreq->adr_cmd);
552		/*
553		 * Protect connection from disappearing.
554		 */
555		rw_rlock(&adist_remote_lock);
556		/*
557		 * Move the request to the recv queue first to avoid race
558		 * where the recv thread receives the reply before we move
559		 * the request to the recv queue.
560		 */
561		QUEUE_INSERT(adreq, &adist_recv_list);
562		if (adhost->adh_remote == NULL ||
563		    proto_send(adhost->adh_remote, &adreq->adr_packet,
564		    ADPKT_SIZE(adreq)) == -1) {
565			rw_unlock(&adist_remote_lock);
566			pjdlog_debug(1,
567			    "send thread: (%p) Unable to send request.", adreq);
568			if (adhost->adh_remote != NULL)
569				sender_disconnect();
570			continue;
571		} else {
572			pjdlog_debug(3, "Request %p sent successfully.", adreq);
573			adreq_log(LOG_DEBUG, 2, -1, adreq,
574			    "send: (%p) Request sent: ", adreq);
575			rw_unlock(&adist_remote_lock);
576		}
577	}
578	/* NOTREACHED */
579	return (NULL);
580}
581
582static void
583adrep_decode_header(struct adrep *adrep)
584{
585
586	/* Byte-swap only is the receiver is using different byte order. */
587	if (adrep->adrp_byteorder != ADIST_BYTEORDER) {
588		adrep->adrp_byteorder = ADIST_BYTEORDER;
589		adrep->adrp_seq = bswap64(adrep->adrp_seq);
590		adrep->adrp_error = bswap16(adrep->adrp_error);
591	}
592}
593
594/*
595 * Thread receives answer from secondary node and passes it to ggate_send
596 * thread.
597 */
598static void *
599recv_thread(void *arg __unused)
600{
601	struct adrep adrep;
602	struct adreq *adreq;
603
604	pjdlog_debug(1, "%s started.", __func__);
605
606	for (;;) {
607		/* Wait until there is anything to receive. */
608		QUEUE_WAIT(&adist_recv_list);
609		pjdlog_debug(3, "recv thread: Got something.");
610		rw_rlock(&adist_remote_lock);
611		if (adhost->adh_remote == NULL) {
612			/*
613			 * Connection is dead.
614			 * XXX: We shouldn't be here.
615			 */
616			rw_unlock(&adist_remote_lock);
617			continue;
618		}
619		if (proto_recv(adhost->adh_remote, &adrep,
620		    sizeof(adrep)) == -1) {
621			rw_unlock(&adist_remote_lock);
622			pjdlog_errno(LOG_ERR, "Unable to receive reply");
623			sender_disconnect();
624			continue;
625		}
626		rw_unlock(&adist_remote_lock);
627		adrep_decode_header(&adrep);
628		/*
629		 * Find the request that was just confirmed.
630		 */
631		mtx_lock(&adist_recv_list_lock);
632		TAILQ_FOREACH(adreq, &adist_recv_list, adr_next) {
633			if (adreq->adr_seq == adrep.adrp_seq) {
634				TAILQ_REMOVE(&adist_recv_list, adreq,
635				    adr_next);
636				break;
637			}
638		}
639		if (adreq == NULL) {
640			/*
641			 * If we disconnected in the meantime, just continue.
642			 * On disconnect sender_disconnect() clears the queue,
643			 * we can use that.
644			 */
645			if (TAILQ_EMPTY(&adist_recv_list)) {
646				rw_unlock(&adist_remote_lock);
647				continue;
648			}
649			mtx_unlock(&adist_recv_list_lock);
650			pjdlog_error("Found no request matching received 'seq' field (%ju).",
651			    (uintmax_t)adrep.adrp_seq);
652			sender_disconnect();
653			continue;
654		}
655		mtx_unlock(&adist_recv_list_lock);
656		adreq_log(LOG_DEBUG, 2, -1, adreq,
657		    "recv thread: (%p) Request confirmed: ", adreq);
658		pjdlog_debug(3, "recv thread: (%p) Got request %hhu.", adreq,
659		    adreq->adr_cmd);
660		if (adrep.adrp_error != 0) {
661			pjdlog_error("Receiver returned error (%s), disconnecting.",
662			    adist_errstr((int)adrep.adrp_error));
663			sender_disconnect();
664			continue;
665		}
666		if (adreq->adr_cmd == ADIST_CMD_CLOSE)
667			trail_unlink(adist_trail, adreq->adr_data);
668		pjdlog_debug(3, "Request received successfully.");
669		QUEUE_INSERT(adreq, &adist_free_list);
670	}
671	/* NOTREACHED */
672	return (NULL);
673}
674
675static void
676guard_check_connection(void)
677{
678
679	PJDLOG_ASSERT(adhost->adh_role == ADIST_ROLE_SENDER);
680
681	rw_rlock(&adist_remote_lock);
682	if (adhost->adh_remote != NULL) {
683		rw_unlock(&adist_remote_lock);
684		pjdlog_debug(3, "remote_guard: Connection to %s is ok.",
685		    adhost->adh_remoteaddr);
686		return;
687	}
688
689	/*
690	 * Upgrade the lock. It doesn't have to be atomic as no other thread
691	 * can change connection status from disconnected to connected.
692	 */
693	rw_unlock(&adist_remote_lock);
694	pjdlog_debug(1, "remote_guard: Reconnecting to %s.",
695	    adhost->adh_remoteaddr);
696	if (sender_connect() == 0) {
697		pjdlog_info("Successfully reconnected to %s.",
698		    adhost->adh_remoteaddr);
699	} else {
700		pjdlog_debug(1, "remote_guard: Reconnect to %s failed.",
701		    adhost->adh_remoteaddr);
702	}
703}
704
705/*
706 * Thread guards remote connections and reconnects when needed, handles
707 * signals, etc.
708 */
709static void *
710guard_thread(void *arg __unused)
711{
712	struct timespec timeout;
713	time_t lastcheck, now;
714	sigset_t mask;
715	int signo;
716
717	lastcheck = time(NULL);
718
719	PJDLOG_VERIFY(sigemptyset(&mask) == 0);
720	PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
721	PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);
722
723	timeout.tv_sec = ADIST_KEEPALIVE;
724	timeout.tv_nsec = 0;
725	signo = -1;
726
727	for (;;) {
728		switch (signo) {
729		case SIGINT:
730		case SIGTERM:
731			sigexit_received = true;
732			pjdlog_exitx(EX_OK,
733			    "Termination signal received, exiting.");
734			break;
735		default:
736			break;
737		}
738
739		pjdlog_debug(3, "remote_guard: Checking connections.");
740		now = time(NULL);
741		if (lastcheck + ADIST_KEEPALIVE <= now) {
742			guard_check_connection();
743			lastcheck = now;
744		}
745		signo = sigtimedwait(&mask, NULL, &timeout);
746	}
747	/* NOTREACHED */
748	return (NULL);
749}
750
751void
752adist_sender(struct adist_config *config, struct adist_host *adh)
753{
754	pthread_t td;
755	pid_t pid;
756	int error, mode, debuglevel;
757
758	/*
759	 * Create communication channel for sending connection requests from
760	 * child to parent.
761	 */
762	if (proto_connect(NULL, "socketpair://", -1, &adh->adh_conn) == -1) {
763		pjdlog_errno(LOG_ERR,
764		    "Unable to create connection sockets between child and parent");
765		return;
766	}
767
768	pid = fork();
769	if (pid == -1) {
770		pjdlog_errno(LOG_ERR, "Unable to fork");
771		proto_close(adh->adh_conn);
772		adh->adh_conn = NULL;
773		return;
774	}
775
776	if (pid > 0) {
777		/* This is parent. */
778		adh->adh_worker_pid = pid;
779		/* Declare that we are receiver. */
780		proto_recv(adh->adh_conn, NULL, 0);
781		return;
782	}
783
784	adcfg = config;
785	adhost = adh;
786
787	mode = pjdlog_mode_get();
788	debuglevel = pjdlog_debug_get();
789
790	/* Declare that we are sender. */
791	proto_send(adhost->adh_conn, NULL, 0);
792
793	descriptors_cleanup(adhost);
794
795#ifdef TODO
796	descriptors_assert(adhost, mode);
797#endif
798
799	pjdlog_init(mode);
800	pjdlog_debug_set(debuglevel);
801	pjdlog_prefix_set("[%s] (%s) ", adhost->adh_name,
802	    role2str(adhost->adh_role));
803#ifdef HAVE_SETPROCTITLE
804	setproctitle("[%s] (%s) ", adhost->adh_name,
805	    role2str(adhost->adh_role));
806#endif
807
808	/*
809	 * The sender process should be able to remove entries from its
810	 * trail directory, but it should not be able to write to the
811	 * trail files, only read from them.
812	 */
813	adist_trail = trail_new(adhost->adh_directory, false);
814	if (adist_trail == NULL)
815		exit(EX_OSFILE);
816
817	if (sandbox(ADIST_USER, true, "auditdistd: %s (%s)",
818	    role2str(adhost->adh_role), adhost->adh_name) != 0) {
819		exit(EX_CONFIG);
820	}
821	pjdlog_info("Privileges successfully dropped.");
822
823	/*
824	 * We can ignore wait_for_dir_init() failures. It will fall back to
825	 * using sleep(3).
826	 */
827	(void)wait_for_dir_init(trail_dirfd(adist_trail));
828
829	init_environment();
830	if (sender_connect() == 0) {
831		pjdlog_info("Successfully connected to %s.",
832		    adhost->adh_remoteaddr);
833	}
834	adhost->adh_reset = true;
835
836	/*
837	 * Create the guard thread first, so we can handle signals from the
838	 * very begining.
839	 */
840	error = pthread_create(&td, NULL, guard_thread, NULL);
841	PJDLOG_ASSERT(error == 0);
842	error = pthread_create(&td, NULL, send_thread, NULL);
843	PJDLOG_ASSERT(error == 0);
844	error = pthread_create(&td, NULL, recv_thread, NULL);
845	PJDLOG_ASSERT(error == 0);
846	(void)read_thread(NULL);
847}
848