1/*-
2 * Copyright (c) 2012 The FreeBSD Foundation
3 * All rights reserved.
4 *
5 * This software was developed by Pawel Jakub Dawidek under sponsorship from
6 * the FreeBSD Foundation.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 * 1. Redistributions of source code must retain the above copyright
12 *    notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 *    notice, this list of conditions and the following disclaimer in the
15 *    documentation and/or other materials provided with the distribution.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27 * SUCH DAMAGE.
28 *
29 * $P4: //depot/projects/trustedbsd/openbsm/bin/auditdistd/sender.c#3 $
30 */
31
32#include <config/config.h>
33
34#include <sys/param.h>
35#if defined(HAVE_SYS_ENDIAN_H) && defined(HAVE_BSWAP)
36#include <sys/endian.h>
37#else /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
38#ifdef HAVE_MACHINE_ENDIAN_H
39#include <machine/endian.h>
40#else /* !HAVE_MACHINE_ENDIAN_H */
41#ifdef HAVE_ENDIAN_H
42#include <endian.h>
43#else /* !HAVE_ENDIAN_H */
44#error "No supported endian.h"
45#endif /* !HAVE_ENDIAN_H */
46#endif /* !HAVE_MACHINE_ENDIAN_H */
47#include <compat/endian.h>
48#endif /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
49#include <sys/queue.h>
50#include <sys/stat.h>
51#include <sys/wait.h>
52
53#include <stdio.h>
54#include <stdlib.h>
55#include <unistd.h>
56
57#include <ctype.h>
58#include <dirent.h>
59#include <err.h>
60#include <errno.h>
61#include <fcntl.h>
62#ifdef HAVE_LIBUTIL_H
63#include <libutil.h>
64#endif
65#include <signal.h>
66#include <string.h>
67#include <strings.h>
68
69#include <openssl/hmac.h>
70
71#ifndef HAVE_SIGTIMEDWAIT
72#include "sigtimedwait.h"
73#endif
74
75#include "auditdistd.h"
76#include "pjdlog.h"
77#include "proto.h"
78#include "sandbox.h"
79#include "subr.h"
80#include "synch.h"
81#include "trail.h"
82
83static struct adist_config *adcfg;
84static struct adist_host *adhost;
85
86static pthread_rwlock_t adist_remote_lock;
87static pthread_mutex_t adist_remote_mtx;
88static pthread_cond_t adist_remote_cond;
89static struct trail *adist_trail;
90
91static TAILQ_HEAD(, adreq) adist_free_list;
92static pthread_mutex_t adist_free_list_lock;
93static pthread_cond_t adist_free_list_cond;
94static TAILQ_HEAD(, adreq) adist_send_list;
95static pthread_mutex_t adist_send_list_lock;
96static pthread_cond_t adist_send_list_cond;
97static TAILQ_HEAD(, adreq) adist_recv_list;
98static pthread_mutex_t adist_recv_list_lock;
99static pthread_cond_t adist_recv_list_cond;
100
101static void
102init_environment(void)
103{
104	struct adreq *adreq;
105	unsigned int ii;
106
107	rw_init(&adist_remote_lock);
108	mtx_init(&adist_remote_mtx);
109	cv_init(&adist_remote_cond);
110	TAILQ_INIT(&adist_free_list);
111	mtx_init(&adist_free_list_lock);
112	cv_init(&adist_free_list_cond);
113	TAILQ_INIT(&adist_send_list);
114	mtx_init(&adist_send_list_lock);
115	cv_init(&adist_send_list_cond);
116	TAILQ_INIT(&adist_recv_list);
117	mtx_init(&adist_recv_list_lock);
118	cv_init(&adist_recv_list_cond);
119
120	for (ii = 0; ii < ADIST_QUEUE_SIZE; ii++) {
121		adreq = malloc(sizeof(*adreq) + ADIST_BUF_SIZE);
122		if (adreq == NULL) {
123			pjdlog_exitx(EX_TEMPFAIL,
124			    "Unable to allocate %zu bytes of memory for adreq object.",
125			    sizeof(*adreq) + ADIST_BUF_SIZE);
126		}
127		adreq->adr_byteorder = ADIST_BYTEORDER;
128		adreq->adr_cmd = ADIST_CMD_UNDEFINED;
129		adreq->adr_seq = 0;
130		adreq->adr_datasize = 0;
131		TAILQ_INSERT_TAIL(&adist_free_list, adreq, adr_next);
132	}
133}
134
135static int
136sender_connect(void)
137{
138	unsigned char rnd[32], hash[32], resp[32];
139	struct proto_conn *conn;
140	char welcome[8];
141	int16_t val;
142
143	val = 1;
144	if (proto_send(adhost->adh_conn, &val, sizeof(val)) < 0) {
145		pjdlog_exit(EX_TEMPFAIL,
146		    "Unable to send connection request to parent");
147	}
148	if (proto_recv(adhost->adh_conn, &val, sizeof(val)) < 0) {
149		pjdlog_exit(EX_TEMPFAIL,
150		    "Unable to receive reply to connection request from parent");
151	}
152	if (val != 0) {
153		errno = val;
154		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
155		    adhost->adh_remoteaddr);
156		return (-1);
157	}
158	if (proto_connection_recv(adhost->adh_conn, true, &conn) < 0) {
159		pjdlog_exit(EX_TEMPFAIL,
160		    "Unable to receive connection from parent");
161	}
162	if (proto_connect_wait(conn, adcfg->adc_timeout) < 0) {
163		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
164		    adhost->adh_remoteaddr);
165		proto_close(conn);
166		return (-1);
167	}
168	pjdlog_debug(1, "Connected to %s.", adhost->adh_remoteaddr);
169	/* Error in setting timeout is not critical, but why should it fail? */
170	if (proto_timeout(conn, adcfg->adc_timeout) < 0)
171		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
172	else
173		pjdlog_debug(1, "Timeout set to %d.", adcfg->adc_timeout);
174
175	/* Exchange welcome message, which includes version number. */
176	(void)snprintf(welcome, sizeof(welcome), "ADIST%02d", ADIST_VERSION);
177	if (proto_send(conn, welcome, sizeof(welcome)) < 0) {
178		pjdlog_errno(LOG_WARNING,
179		    "Unable to send welcome message to %s",
180		    adhost->adh_remoteaddr);
181		proto_close(conn);
182		return (-1);
183	}
184	pjdlog_debug(1, "Welcome message sent (%s).", welcome);
185	bzero(welcome, sizeof(welcome));
186	if (proto_recv(conn, welcome, sizeof(welcome)) < 0) {
187		pjdlog_errno(LOG_WARNING,
188		    "Unable to receive welcome message from %s",
189		    adhost->adh_remoteaddr);
190		proto_close(conn);
191		return (-1);
192	}
193	if (strncmp(welcome, "ADIST", 5) != 0 || !isdigit(welcome[5]) ||
194	    !isdigit(welcome[6]) || welcome[7] != '\0') {
195		pjdlog_warning("Invalid welcome message from %s.",
196		    adhost->adh_remoteaddr);
197		proto_close(conn);
198		return (-1);
199	}
200	pjdlog_debug(1, "Welcome message received (%s).", welcome);
201	/*
202	 * Receiver can only reply with version number lower or equal to
203	 * the one we sent.
204	 */
205	adhost->adh_version = atoi(welcome + 5);
206	if (adhost->adh_version > ADIST_VERSION) {
207		pjdlog_warning("Invalid version number from %s (%d received, up to %d supported).",
208		    adhost->adh_remoteaddr, adhost->adh_version, ADIST_VERSION);
209		proto_close(conn);
210		return (-1);
211	}
212
213	pjdlog_debug(1, "Version %d negotiated with %s.", adhost->adh_version,
214	    adhost->adh_remoteaddr);
215
216	if (proto_send(conn, adcfg->adc_name, sizeof(adcfg->adc_name)) == -1) {
217		pjdlog_errno(LOG_WARNING, "Unable to send name to %s",
218		    adhost->adh_remoteaddr);
219		proto_close(conn);
220		return (-1);
221	}
222	pjdlog_debug(1, "Name (%s) sent.", adcfg->adc_name);
223
224	if (proto_recv(conn, rnd, sizeof(rnd)) == -1) {
225		pjdlog_errno(LOG_WARNING, "Unable to receive challenge from %s",
226		    adhost->adh_remoteaddr);
227		proto_close(conn);
228		return (-1);
229	}
230	pjdlog_debug(1, "Challenge received.");
231
232	if (HMAC(EVP_sha256(), adhost->adh_password,
233	    (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
234	    NULL) == NULL) {
235		pjdlog_warning("Unable to generate response.");
236		proto_close(conn);
237		return (-1);
238	}
239	pjdlog_debug(1, "Response generated.");
240
241	if (proto_send(conn, hash, sizeof(hash)) == -1) {
242		pjdlog_errno(LOG_WARNING, "Unable to send response to %s",
243		    adhost->adh_remoteaddr);
244		proto_close(conn);
245		return (-1);
246	}
247	pjdlog_debug(1, "Response sent.");
248
249	if (adist_random(rnd, sizeof(rnd)) == -1) {
250		pjdlog_warning("Unable to generate challenge.");
251		proto_close(conn);
252		return (-1);
253	}
254	pjdlog_debug(1, "Challenge generated.");
255
256	if (proto_send(conn, rnd, sizeof(rnd)) == -1) {
257		pjdlog_errno(LOG_WARNING, "Unable to send challenge to %s",
258		    adhost->adh_remoteaddr);
259		proto_close(conn);
260		return (-1);
261	}
262	pjdlog_debug(1, "Challenge sent.");
263
264	if (proto_recv(conn, resp, sizeof(resp)) == -1) {
265		pjdlog_errno(LOG_WARNING, "Unable to receive response from %s",
266		    adhost->adh_remoteaddr);
267		proto_close(conn);
268		return (-1);
269	}
270	pjdlog_debug(1, "Response received.");
271
272	if (HMAC(EVP_sha256(), adhost->adh_password,
273	    (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
274	    NULL) == NULL) {
275		pjdlog_warning("Unable to generate hash.");
276		proto_close(conn);
277		return (-1);
278	}
279	pjdlog_debug(1, "Hash generated.");
280
281	if (memcmp(resp, hash, sizeof(hash)) != 0) {
282		pjdlog_warning("Invalid response from %s (wrong password?).",
283		    adhost->adh_remoteaddr);
284		proto_close(conn);
285		return (-1);
286	}
287	pjdlog_info("Receiver authenticated.");
288
289	if (proto_recv(conn, &adhost->adh_trail_offset,
290	    sizeof(adhost->adh_trail_offset)) == -1) {
291		pjdlog_errno(LOG_WARNING,
292		    "Unable to receive size of the most recent trail file from %s",
293		    adhost->adh_remoteaddr);
294		proto_close(conn);
295		return (-1);
296	}
297	adhost->adh_trail_offset = le64toh(adhost->adh_trail_offset);
298	if (proto_recv(conn, &adhost->adh_trail_name,
299	    sizeof(adhost->adh_trail_name)) == -1) {
300		pjdlog_errno(LOG_WARNING,
301		    "Unable to receive name of the most recent trail file from %s",
302		    adhost->adh_remoteaddr);
303		proto_close(conn);
304		return (-1);
305	}
306	pjdlog_debug(1, "Trail name (%s) and offset (%ju) received.",
307	    adhost->adh_trail_name, (uintmax_t)adhost->adh_trail_offset);
308
309	rw_wlock(&adist_remote_lock);
310	mtx_lock(&adist_remote_mtx);
311	PJDLOG_ASSERT(adhost->adh_remote == NULL);
312	PJDLOG_ASSERT(conn != NULL);
313	adhost->adh_remote = conn;
314	mtx_unlock(&adist_remote_mtx);
315	rw_unlock(&adist_remote_lock);
316	cv_signal(&adist_remote_cond);
317
318	return (0);
319}
320
321static void
322sender_disconnect(void)
323{
324
325	rw_wlock(&adist_remote_lock);
326	/*
327	 * Check for a race between dropping rlock and acquiring wlock -
328	 * another thread can close connection in-between.
329	 */
330	if (adhost->adh_remote == NULL) {
331		rw_unlock(&adist_remote_lock);
332		return;
333	}
334	pjdlog_debug(2, "Closing connection to %s.", adhost->adh_remoteaddr);
335	proto_close(adhost->adh_remote);
336	mtx_lock(&adist_remote_mtx);
337	adhost->adh_remote = NULL;
338	adhost->adh_reset = true;
339	adhost->adh_trail_name[0] = '\0';
340	adhost->adh_trail_offset = 0;
341	mtx_unlock(&adist_remote_mtx);
342	rw_unlock(&adist_remote_lock);
343
344	pjdlog_warning("Disconnected from %s.", adhost->adh_remoteaddr);
345
346	/* Move all in-flight requests back onto free list. */
347	mtx_lock(&adist_free_list_lock);
348	mtx_lock(&adist_send_list_lock);
349	TAILQ_CONCAT(&adist_free_list, &adist_send_list, adr_next);
350	mtx_unlock(&adist_send_list_lock);
351	mtx_lock(&adist_recv_list_lock);
352	TAILQ_CONCAT(&adist_free_list, &adist_recv_list, adr_next);
353	mtx_unlock(&adist_recv_list_lock);
354	mtx_unlock(&adist_free_list_lock);
355}
356
357static void
358adreq_fill(struct adreq *adreq, uint8_t cmd, const unsigned char *data,
359    size_t size)
360{
361	static uint64_t seq = 1;
362
363	PJDLOG_ASSERT(size <= ADIST_BUF_SIZE);
364
365	switch (cmd) {
366	case ADIST_CMD_OPEN:
367	case ADIST_CMD_CLOSE:
368		PJDLOG_ASSERT(data != NULL && size == 0);
369		size = strlen(data) + 1;
370		break;
371	case ADIST_CMD_APPEND:
372		PJDLOG_ASSERT(data != NULL && size > 0);
373		break;
374	case ADIST_CMD_KEEPALIVE:
375	case ADIST_CMD_ERROR:
376		PJDLOG_ASSERT(data == NULL && size == 0);
377		break;
378	default:
379		PJDLOG_ABORT("Invalid command (%hhu).", cmd);
380	}
381
382	adreq->adr_cmd = cmd;
383	adreq->adr_seq = seq++;
384	adreq->adr_datasize = size;
385	/* Don't copy if data is already in out buffer. */
386	if (data != NULL && data != adreq->adr_data)
387		bcopy(data, adreq->adr_data, size);
388}
389
390static bool
391read_thread_wait(void)
392{
393	bool newfile = false;
394
395	mtx_lock(&adist_remote_mtx);
396	if (adhost->adh_reset) {
397		adhost->adh_reset = false;
398		if (trail_filefd(adist_trail) != -1)
399			trail_close(adist_trail);
400		trail_reset(adist_trail);
401		while (adhost->adh_remote == NULL)
402			cv_wait(&adist_remote_cond, &adist_remote_mtx);
403		trail_start(adist_trail, adhost->adh_trail_name,
404		    adhost->adh_trail_offset);
405		newfile = true;
406	}
407	mtx_unlock(&adist_remote_mtx);
408	while (trail_filefd(adist_trail) == -1) {
409		newfile = true;
410		wait_for_dir();
411		if (trail_filefd(adist_trail) == -1)
412			trail_next(adist_trail);
413	}
414	if (newfile) {
415		pjdlog_debug(1, "Trail file \"%s/%s\" opened.",
416		    adhost->adh_directory,
417		    trail_filename(adist_trail));
418		(void)wait_for_file_init(trail_filefd(adist_trail));
419	}
420	return (newfile);
421}
422
423static void *
424read_thread(void *arg __unused)
425{
426	struct adreq *adreq;
427	ssize_t done;
428	bool newfile;
429
430	pjdlog_debug(1, "%s started.", __func__);
431
432	for (;;) {
433		newfile = read_thread_wait();
434		QUEUE_TAKE(adreq, &adist_free_list, 0);
435		if (newfile) {
436			adreq_fill(adreq, ADIST_CMD_OPEN,
437			    trail_filename(adist_trail), 0);
438			newfile = false;
439			goto move;
440		}
441
442		done = read(trail_filefd(adist_trail), adreq->adr_data,
443		    ADIST_BUF_SIZE);
444		if (done == -1) {
445			off_t offset;
446			int error;
447
448			error = errno;
449			offset = lseek(trail_filefd(adist_trail), 0, SEEK_CUR);
450			errno = error;
451			pjdlog_errno(LOG_ERR,
452			    "Error while reading \"%s/%s\" at offset %jd",
453			    adhost->adh_directory, trail_filename(adist_trail),
454			    offset);
455			trail_close(adist_trail);
456			adreq_fill(adreq, ADIST_CMD_ERROR, NULL, 0);
457			goto move;
458		} else if (done == 0) {
459			/* End of file. */
460			pjdlog_debug(3, "End of \"%s/%s\".",
461			    adhost->adh_directory, trail_filename(adist_trail));
462			if (!trail_switch(adist_trail)) {
463				/* More audit records can arrive. */
464				mtx_lock(&adist_free_list_lock);
465				TAILQ_INSERT_TAIL(&adist_free_list, adreq,
466				    adr_next);
467				mtx_unlock(&adist_free_list_lock);
468				wait_for_file();
469				continue;
470			}
471			adreq_fill(adreq, ADIST_CMD_CLOSE,
472			    trail_filename(adist_trail), 0);
473			trail_close(adist_trail);
474			goto move;
475		}
476
477		adreq_fill(adreq, ADIST_CMD_APPEND, adreq->adr_data, done);
478move:
479		pjdlog_debug(3,
480		    "read thread: Moving request %p to the send queue (%hhu).",
481		    adreq, adreq->adr_cmd);
482		QUEUE_INSERT(adreq, &adist_send_list);
483	}
484	/* NOTREACHED */
485	return (NULL);
486}
487
488static void
489keepalive_send(void)
490{
491	struct adreq *adreq;
492
493	rw_rlock(&adist_remote_lock);
494	if (adhost->adh_remote == NULL) {
495		rw_unlock(&adist_remote_lock);
496		return;
497	}
498	rw_unlock(&adist_remote_lock);
499
500	mtx_lock(&adist_free_list_lock);
501	adreq = TAILQ_FIRST(&adist_free_list);
502	if (adreq != NULL)
503		TAILQ_REMOVE(&adist_free_list, adreq, adr_next);
504	mtx_unlock(&adist_free_list_lock);
505	if (adreq == NULL)
506		return;
507
508	adreq_fill(adreq, ADIST_CMD_KEEPALIVE, NULL, 0);
509
510	QUEUE_INSERT(adreq, &adist_send_list);
511
512	pjdlog_debug(3, "keepalive_send: Request sent.");
513}
514
515/*
516 * Thread sends request to secondary node.
517 */
518static void *
519send_thread(void *arg __unused)
520{
521	time_t lastcheck, now;
522	struct adreq *adreq;
523
524	pjdlog_debug(1, "%s started.", __func__);
525
526	lastcheck = time(NULL);
527
528	for (;;) {
529		pjdlog_debug(3, "send thread: Taking request.");
530		for (;;) {
531			QUEUE_TAKE(adreq, &adist_send_list, ADIST_KEEPALIVE);
532			if (adreq != NULL)
533				break;
534			now = time(NULL);
535			if (lastcheck + ADIST_KEEPALIVE <= now) {
536				keepalive_send();
537				lastcheck = now;
538			}
539		}
540		PJDLOG_ASSERT(adreq != NULL);
541		pjdlog_debug(3, "send thread: (%p) Got request %hhu.", adreq,
542		    adreq->adr_cmd);
543		/*
544		 * Protect connection from disappearing.
545		 */
546		rw_rlock(&adist_remote_lock);
547		/*
548		 * Move the request to the recv queue first to avoid race
549		 * where the recv thread receives the reply before we move
550		 * the request to the recv queue.
551		 */
552		QUEUE_INSERT(adreq, &adist_recv_list);
553		if (adhost->adh_remote == NULL ||
554		    proto_send(adhost->adh_remote, &adreq->adr_packet,
555		    ADPKT_SIZE(adreq)) == -1) {
556			rw_unlock(&adist_remote_lock);
557			pjdlog_debug(1,
558			    "send thread: (%p) Unable to send request.", adreq);
559			if (adhost->adh_remote != NULL)
560				sender_disconnect();
561			continue;
562		} else {
563			pjdlog_debug(3, "Request %p sent successfully.", adreq);
564			adreq_log(LOG_DEBUG, 2, -1, adreq,
565			    "send: (%p) Request sent: ", adreq);
566			rw_unlock(&adist_remote_lock);
567		}
568	}
569	/* NOTREACHED */
570	return (NULL);
571}
572
573static void
574adrep_decode_header(struct adrep *adrep)
575{
576
577	/* Byte-swap only is the receiver is using different byte order. */
578	if (adrep->adrp_byteorder != ADIST_BYTEORDER) {
579		adrep->adrp_byteorder = ADIST_BYTEORDER;
580		adrep->adrp_seq = bswap64(adrep->adrp_seq);
581		adrep->adrp_error = bswap16(adrep->adrp_error);
582	}
583}
584
585/*
586 * Thread receives answer from secondary node and passes it to ggate_send
587 * thread.
588 */
589static void *
590recv_thread(void *arg __unused)
591{
592	struct adrep adrep;
593	struct adreq *adreq;
594
595	pjdlog_debug(1, "%s started.", __func__);
596
597	for (;;) {
598		/* Wait until there is anything to receive. */
599		QUEUE_WAIT(&adist_recv_list);
600		pjdlog_debug(3, "recv thread: Got something.");
601		rw_rlock(&adist_remote_lock);
602		if (adhost->adh_remote == NULL) {
603			/*
604			 * Connection is dead.
605			 * XXX: We shouldn't be here.
606			 */
607			rw_unlock(&adist_remote_lock);
608			continue;
609		}
610		if (proto_recv(adhost->adh_remote, &adrep,
611		    sizeof(adrep)) == -1) {
612			rw_unlock(&adist_remote_lock);
613			pjdlog_errno(LOG_ERR, "Unable to receive reply");
614			sender_disconnect();
615			continue;
616		}
617		rw_unlock(&adist_remote_lock);
618		adrep_decode_header(&adrep);
619		/*
620		 * Find the request that was just confirmed.
621		 */
622		mtx_lock(&adist_recv_list_lock);
623		TAILQ_FOREACH(adreq, &adist_recv_list, adr_next) {
624			if (adreq->adr_seq == adrep.adrp_seq) {
625				TAILQ_REMOVE(&adist_recv_list, adreq,
626				    adr_next);
627				break;
628			}
629		}
630		if (adreq == NULL) {
631			/*
632			 * If we disconnected in the meantime, just continue.
633			 * On disconnect sender_disconnect() clears the queue,
634			 * we can use that.
635			 */
636			if (TAILQ_EMPTY(&adist_recv_list)) {
637				rw_unlock(&adist_remote_lock);
638				continue;
639			}
640			mtx_unlock(&adist_recv_list_lock);
641			pjdlog_error("Found no request matching received 'seq' field (%ju).",
642			    (uintmax_t)adrep.adrp_seq);
643			sender_disconnect();
644			continue;
645		}
646		mtx_unlock(&adist_recv_list_lock);
647		adreq_log(LOG_DEBUG, 2, -1, adreq,
648		    "recv thread: (%p) Request confirmed: ", adreq);
649		pjdlog_debug(3, "recv thread: (%p) Got request %hhu.", adreq,
650		    adreq->adr_cmd);
651		if (adrep.adrp_error != 0) {
652			pjdlog_error("Receiver returned error (%s), disconnecting.",
653			    adist_errstr((int)adrep.adrp_error));
654			sender_disconnect();
655			continue;
656		}
657		if (adreq->adr_cmd == ADIST_CMD_CLOSE)
658			trail_unlink(adist_trail, adreq->adr_data);
659		pjdlog_debug(3, "Request received successfully.");
660		QUEUE_INSERT(adreq, &adist_free_list);
661	}
662	/* NOTREACHED */
663	return (NULL);
664}
665
666static void
667guard_check_connection(void)
668{
669
670	PJDLOG_ASSERT(adhost->adh_role == ADIST_ROLE_SENDER);
671
672	rw_rlock(&adist_remote_lock);
673	if (adhost->adh_remote != NULL) {
674		rw_unlock(&adist_remote_lock);
675		pjdlog_debug(3, "remote_guard: Connection to %s is ok.",
676		    adhost->adh_remoteaddr);
677		return;
678	}
679
680	/*
681	 * Upgrade the lock. It doesn't have to be atomic as no other thread
682	 * can change connection status from disconnected to connected.
683	 */
684	rw_unlock(&adist_remote_lock);
685	pjdlog_debug(1, "remote_guard: Reconnecting to %s.",
686	    adhost->adh_remoteaddr);
687	if (sender_connect() == 0) {
688		pjdlog_info("Successfully reconnected to %s.",
689		    adhost->adh_remoteaddr);
690	} else {
691		pjdlog_debug(1, "remote_guard: Reconnect to %s failed.",
692		    adhost->adh_remoteaddr);
693	}
694}
695
696/*
697 * Thread guards remote connections and reconnects when needed, handles
698 * signals, etc.
699 */
700static void *
701guard_thread(void *arg __unused)
702{
703	struct timespec timeout;
704	time_t lastcheck, now;
705	sigset_t mask;
706	int signo;
707
708	lastcheck = time(NULL);
709
710	PJDLOG_VERIFY(sigemptyset(&mask) == 0);
711	PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
712	PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);
713
714	timeout.tv_sec = ADIST_KEEPALIVE;
715	timeout.tv_nsec = 0;
716	signo = -1;
717
718	for (;;) {
719		switch (signo) {
720		case SIGINT:
721		case SIGTERM:
722			sigexit_received = true;
723			pjdlog_exitx(EX_OK,
724			    "Termination signal received, exiting.");
725			break;
726		default:
727			break;
728		}
729
730		pjdlog_debug(3, "remote_guard: Checking connections.");
731		now = time(NULL);
732		if (lastcheck + ADIST_KEEPALIVE <= now) {
733			guard_check_connection();
734			lastcheck = now;
735		}
736		signo = sigtimedwait(&mask, NULL, &timeout);
737	}
738	/* NOTREACHED */
739	return (NULL);
740}
741
742void
743adist_sender(struct adist_config *config, struct adist_host *adh)
744{
745	pthread_t td;
746	pid_t pid;
747	int error, mode, debuglevel;
748
749	/*
750	 * Create communication channel for sending connection requests from
751	 * child to parent.
752	 */
753	if (proto_connect(NULL, "socketpair://", -1, &adh->adh_conn) == -1) {
754		pjdlog_errno(LOG_ERR,
755		    "Unable to create connection sockets between child and parent");
756		return;
757	}
758
759	pid = fork();
760	if (pid == -1) {
761		pjdlog_errno(LOG_ERR, "Unable to fork");
762		proto_close(adh->adh_conn);
763		adh->adh_conn = NULL;
764		return;
765	}
766
767	if (pid > 0) {
768		/* This is parent. */
769		adh->adh_worker_pid = pid;
770		/* Declare that we are receiver. */
771		proto_recv(adh->adh_conn, NULL, 0);
772		return;
773	}
774
775	adcfg = config;
776	adhost = adh;
777
778	mode = pjdlog_mode_get();
779	debuglevel = pjdlog_debug_get();
780
781	/* Declare that we are sender. */
782	proto_send(adhost->adh_conn, NULL, 0);
783
784	descriptors_cleanup(adhost);
785
786#ifdef TODO
787	descriptors_assert(adhost, mode);
788#endif
789
790	pjdlog_init(mode);
791	pjdlog_debug_set(debuglevel);
792	pjdlog_prefix_set("[%s] (%s) ", adhost->adh_name,
793	    role2str(adhost->adh_role));
794#ifdef HAVE_SETPROCTITLE
795	setproctitle("[%s] (%s) ", adhost->adh_name,
796	    role2str(adhost->adh_role));
797#endif
798
799	/*
800	 * The sender process should be able to remove entries from its
801	 * trail directory, but it should not be able to write to the
802	 * trail files, only read from them.
803	 */
804	adist_trail = trail_new(adhost->adh_directory, false);
805	if (adist_trail == NULL)
806		exit(EX_OSFILE);
807
808	if (sandbox(ADIST_USER, true, "auditdistd: %s (%s)",
809	    role2str(adhost->adh_role), adhost->adh_name) != 0) {
810		exit(EX_CONFIG);
811	}
812	pjdlog_info("Privileges successfully dropped.");
813
814	/*
815	 * We can ignore wait_for_dir_init() failures. It will fall back to
816	 * using sleep(3).
817	 */
818	(void)wait_for_dir_init(trail_dirfd(adist_trail));
819
820	init_environment();
821	if (sender_connect() == 0) {
822		pjdlog_info("Successfully connected to %s.",
823		    adhost->adh_remoteaddr);
824	}
825	adhost->adh_reset = true;
826
827	/*
828	 * Create the guard thread first, so we can handle signals from the
829	 * very begining.
830	 */
831	error = pthread_create(&td, NULL, guard_thread, NULL);
832	PJDLOG_ASSERT(error == 0);
833	error = pthread_create(&td, NULL, send_thread, NULL);
834	PJDLOG_ASSERT(error == 0);
835	error = pthread_create(&td, NULL, recv_thread, NULL);
836	PJDLOG_ASSERT(error == 0);
837	(void)read_thread(NULL);
838}
839