/*-
 * Copyright (c) 2012 The FreeBSD Foundation
 * All rights reserved.
 *
 * This software was developed by Pawel Jakub Dawidek under sponsorship from
 * the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
2974385Sphk
3074385Sphk#include <config/config.h>
3144437Sache
3274385Sphk#include <sys/param.h>
3374385Sphk#if defined(HAVE_SYS_ENDIAN_H) && defined(HAVE_BSWAP)
34143334Scperciva#include <sys/endian.h>
35143334Scperciva#else /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
36143334Scperciva#ifdef HAVE_MACHINE_ENDIAN_H
37143334Scperciva#include <machine/endian.h>
38292782Sallanjude#else /* !HAVE_MACHINE_ENDIAN_H */
39292782Sallanjude#ifdef HAVE_ENDIAN_H
40292782Sallanjude#include <endian.h>
41292782Sallanjude#else /* !HAVE_ENDIAN_H */
42220496Smarkm#error "No supported endian.h"
43220496Smarkm#endif /* !HAVE_ENDIAN_H */
44220496Smarkm#endif /* !HAVE_MACHINE_ENDIAN_H */
45220496Smarkm#include <compat/endian.h>
46300903Sallanjude#endif /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
47300903Sallanjude#include <sys/queue.h>
48300903Sallanjude#include <sys/stat.h>
49300903Sallanjude#include <sys/wait.h>
5044290Swollman
5144301Swollman#include <stdio.h>
52143334Scperciva#include <stdlib.h>
53292782Sallanjude#include <unistd.h>
54300903Sallanjude
55220496Smarkm#include <ctype.h>
56282774Sthomas#include <dirent.h>
57282774Sthomas#include <err.h>
58282774Sthomas#include <errno.h>
59282774Sthomas#include <fcntl.h>
60282774Sthomas#ifdef HAVE_LIBUTIL_H
61282774Sthomas#include <libutil.h>
62285417Srodrigc#endif
63285417Srodrigc#include <signal.h>
64263218Sjmg#include <string.h>
651802Sphk#include <strings.h>
6644290Swollman
6744301Swollman#include <openssl/hmac.h>
68218723Sdim
6944290Swollman#ifndef HAVE_SIGTIMEDWAIT
7044301Swollman#include "sigtimedwait.h"
7144301Swollman#endif
72218723Sdim
7344301Swollman#include "auditdistd.h"
74218723Sdim#include "pjdlog.h"
75218723Sdim#include "proto.h"
76218723Sdim#include "sandbox.h"
7744290Swollman#include "subr.h"
781802Sphk#include "synch.h"
7944290Swollman#include "trail.h"
8044290Swollman
811802Sphkstatic struct adist_config *adcfg;
821802Sphkstatic struct adist_host *adhost;
8344290Swollman
8444290Swollmanstatic pthread_rwlock_t adist_remote_lock;
851802Sphkstatic pthread_mutex_t adist_remote_mtx;
8644290Swollmanstatic pthread_cond_t adist_remote_cond;
8744290Swollmanstatic struct trail *adist_trail;
8844290Swollman
8944290Swollmanstatic TAILQ_HEAD(, adreq) adist_free_list;
9044290Swollmanstatic pthread_mutex_t adist_free_list_lock;
9144290Swollmanstatic pthread_cond_t adist_free_list_cond;
9244290Swollmanstatic TAILQ_HEAD(, adreq) adist_send_list;
9344290Swollmanstatic pthread_mutex_t adist_send_list_lock;
9444290Swollmanstatic pthread_cond_t adist_send_list_cond;
9544290Swollmanstatic TAILQ_HEAD(, adreq) adist_recv_list;
96143334Scpercivastatic pthread_mutex_t adist_recv_list_lock;
97143334Scpercivastatic pthread_cond_t adist_recv_list_cond;
98143334Scperciva
99143334Scpercivastatic void
100143334Scpercivainit_environment(void)
101143334Scperciva{
102292782Sallanjude	struct adreq *adreq;
103292782Sallanjude	unsigned int ii;
104292782Sallanjude
105292782Sallanjude	rw_init(&adist_remote_lock);
106292782Sallanjude	mtx_init(&adist_remote_mtx);
107292782Sallanjude	cv_init(&adist_remote_cond);
108220496Smarkm	TAILQ_INIT(&adist_free_list);
109220496Smarkm	mtx_init(&adist_free_list_lock);
110220496Smarkm	cv_init(&adist_free_list_cond);
111220496Smarkm	TAILQ_INIT(&adist_send_list);
112220496Smarkm	mtx_init(&adist_send_list_lock);
113220496Smarkm	cv_init(&adist_send_list_cond);
114300903Sallanjude	TAILQ_INIT(&adist_recv_list);
115300903Sallanjude	mtx_init(&adist_recv_list_lock);
116300903Sallanjude	cv_init(&adist_recv_list_cond);
117300903Sallanjude
118300903Sallanjude	for (ii = 0; ii < ADIST_QUEUE_SIZE; ii++) {
119300903Sallanjude		adreq = malloc(sizeof(*adreq) + ADIST_BUF_SIZE);
120300903Sallanjude		if (adreq == NULL) {
12144301Swollman			pjdlog_exitx(EX_TEMPFAIL,
12244301Swollman			    "Unable to allocate %zu bytes of memory for adreq object.",
12344301Swollman			    sizeof(*adreq) + ADIST_BUF_SIZE);
12444301Swollman		}
12544301Swollman		adreq->adr_byteorder = ADIST_BYTEORDER;
12644301Swollman		adreq->adr_cmd = ADIST_CMD_UNDEFINED;
12794367Sru		adreq->adr_seq = 0;
12894367Sru		adreq->adr_datasize = 0;
12994367Sru		TAILQ_INSERT_TAIL(&adist_free_list, adreq, adr_next);
13094367Sru	}
13194367Sru}
1321802Sphk
1331802Sphkstatic int
13439063Simpsender_connect(void)
13539063Simp{
13639063Simp	unsigned char rnd[32], hash[32], resp[32];
13739063Simp	struct proto_conn *conn;
13839063Simp	char welcome[8];
13939063Simp	int16_t val;
140220496Smarkm
141220496Smarkm	val = 1;
142220496Smarkm	if (proto_send(adhost->adh_conn, &val, sizeof(val)) < 0) {
143220496Smarkm		pjdlog_exit(EX_TEMPFAIL,
1441802Sphk		    "Unable to send connection request to parent");
1451802Sphk	}
14639063Simp	if (proto_recv(adhost->adh_conn, &val, sizeof(val)) < 0) {
14739063Simp		pjdlog_exit(EX_TEMPFAIL,
14839063Simp		    "Unable to receive reply to connection request from parent");
14939063Simp	}
15039063Simp	if (val != 0) {
15139063Simp		errno = val;
15239063Simp		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
15339063Simp		    adhost->adh_remoteaddr);
1541802Sphk		return (-1);
15544290Swollman	}
156220496Smarkm	if (proto_connection_recv(adhost->adh_conn, true, &conn) < 0) {
157220496Smarkm		pjdlog_exit(EX_TEMPFAIL,
158220496Smarkm		    "Unable to receive connection from parent");
159220496Smarkm	}
160220496Smarkm	if (proto_connect_wait(conn, adcfg->adc_timeout) < 0) {
161220496Smarkm		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
162220496Smarkm		    adhost->adh_remoteaddr);
163220496Smarkm		proto_close(conn);
164220496Smarkm		return (-1);
165220496Smarkm	}
166220496Smarkm	pjdlog_debug(1, "Connected to %s.", adhost->adh_remoteaddr);
16744290Swollman	/* Error in setting timeout is not critical, but why should it fail? */
16844290Swollman	if (proto_timeout(conn, adcfg->adc_timeout) < 0)
169220496Smarkm		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
170220496Smarkm	else
171220496Smarkm		pjdlog_debug(1, "Timeout set to %d.", adcfg->adc_timeout);
172220496Smarkm
173220496Smarkm	/* Exchange welcome message, which includes version number. */
174220496Smarkm	(void)snprintf(welcome, sizeof(welcome), "ADIST%02d", ADIST_VERSION);
175220496Smarkm	if (proto_send(conn, welcome, sizeof(welcome)) < 0) {
176220496Smarkm		pjdlog_errno(LOG_WARNING,
177220496Smarkm		    "Unable to send welcome message to %s",
178220496Smarkm		    adhost->adh_remoteaddr);
179220496Smarkm		proto_close(conn);
18044290Swollman		return (-1);
181143334Scperciva	}
182143334Scperciva	pjdlog_debug(1, "Welcome message sent (%s).", welcome);
183143334Scperciva	bzero(welcome, sizeof(welcome));
184220496Smarkm	if (proto_recv(conn, welcome, sizeof(welcome)) < 0) {
185220496Smarkm		pjdlog_errno(LOG_WARNING,
186220496Smarkm		    "Unable to receive welcome message from %s",
187220496Smarkm		    adhost->adh_remoteaddr);
188220496Smarkm		proto_close(conn);
189220496Smarkm		return (-1);
190220496Smarkm	}
191220496Smarkm	if (strncmp(welcome, "ADIST", 5) != 0 || !isdigit(welcome[5]) ||
192220496Smarkm	    !isdigit(welcome[6]) || welcome[7] != '\0') {
193220496Smarkm		pjdlog_warning("Invalid welcome message from %s.",
194143334Scperciva		    adhost->adh_remoteaddr);
195292782Sallanjude		proto_close(conn);
196292782Sallanjude		return (-1);
197292782Sallanjude	}
198292782Sallanjude	pjdlog_debug(1, "Welcome message received (%s).", welcome);
199292782Sallanjude	/*
200292782Sallanjude	 * Receiver can only reply with version number lower or equal to
201292782Sallanjude	 * the one we sent.
202292782Sallanjude	 */
203292782Sallanjude	adhost->adh_version = atoi(welcome + 5);
204292782Sallanjude	if (adhost->adh_version > ADIST_VERSION) {
205292782Sallanjude		pjdlog_warning("Invalid version number from %s (%d received, up to %d supported).",
206292782Sallanjude		    adhost->adh_remoteaddr, adhost->adh_version, ADIST_VERSION);
207292782Sallanjude		proto_close(conn);
208292782Sallanjude		return (-1);
209292782Sallanjude	}
210220496Smarkm
211220496Smarkm	pjdlog_debug(1, "Version %d negotiated with %s.", adhost->adh_version,
212220496Smarkm	    adhost->adh_remoteaddr);
213220496Smarkm
214220496Smarkm	if (proto_send(conn, adcfg->adc_name, sizeof(adcfg->adc_name)) == -1) {
215220496Smarkm		pjdlog_errno(LOG_WARNING, "Unable to send name to %s",
216220496Smarkm		    adhost->adh_remoteaddr);
217220496Smarkm		proto_close(conn);
218220496Smarkm		return (-1);
219220496Smarkm	}
220220496Smarkm	pjdlog_debug(1, "Name (%s) sent.", adcfg->adc_name);
221220496Smarkm
222220496Smarkm	if (proto_recv(conn, rnd, sizeof(rnd)) == -1) {
223220496Smarkm		pjdlog_errno(LOG_WARNING, "Unable to receive challenge from %s",
224220496Smarkm		    adhost->adh_remoteaddr);
225300903Sallanjude		proto_close(conn);
226300903Sallanjude		return (-1);
227300903Sallanjude	}
228300903Sallanjude	pjdlog_debug(1, "Challenge received.");
229300903Sallanjude
230300903Sallanjude	if (HMAC(EVP_sha256(), adhost->adh_password,
231300903Sallanjude	    (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
232300903Sallanjude	    NULL) == NULL) {
233300903Sallanjude		pjdlog_warning("Unable to generate response.");
234300903Sallanjude		proto_close(conn);
235300903Sallanjude		return (-1);
236300903Sallanjude	}
237300903Sallanjude	pjdlog_debug(1, "Response generated.");
238300903Sallanjude
239300903Sallanjude	if (proto_send(conn, hash, sizeof(hash)) == -1) {
24044301Swollman		pjdlog_errno(LOG_WARNING, "Unable to send response to %s",
241220496Smarkm		    adhost->adh_remoteaddr);
242220496Smarkm		proto_close(conn);
243220496Smarkm		return (-1);
244220496Smarkm	}
245220496Smarkm	pjdlog_debug(1, "Response sent.");
246220496Smarkm
247220496Smarkm	if (adist_random(rnd, sizeof(rnd)) == -1) {
248220496Smarkm		pjdlog_warning("Unable to generate challenge.");
249220496Smarkm		proto_close(conn);
250220496Smarkm		return (-1);
251220496Smarkm	}
25244301Swollman	pjdlog_debug(1, "Challenge generated.");
253292782Sallanjude
254300903Sallanjude	if (proto_send(conn, rnd, sizeof(rnd)) == -1) {
2552368Sbde		pjdlog_errno(LOG_WARNING, "Unable to send challenge to %s",
2562368Sbde		    adhost->adh_remoteaddr);
257185568Sphk		proto_close(conn);
2581964Sjkh		return (-1);
2592368Sbde	}
260185568Sphk	pjdlog_debug(1, "Challenge sent.");
2611964Sjkh
2622368Sbde	if (proto_recv(conn, resp, sizeof(resp)) == -1) {
2631802Sphk		pjdlog_errno(LOG_WARNING, "Unable to receive response from %s",
264185568Sphk		    adhost->adh_remoteaddr);
26544301Swollman		proto_close(conn);
26644301Swollman		return (-1);
26744301Swollman	}
268185568Sphk	pjdlog_debug(1, "Response received.");
26944290Swollman
27044290Swollman	if (HMAC(EVP_sha256(), adhost->adh_password,
271185568Sphk	    (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
27244290Swollman	    NULL) == NULL) {
27344290Swollman		pjdlog_warning("Unable to generate hash.");
274185568Sphk		proto_close(conn);
275143334Scperciva		return (-1);
276143334Scperciva	}
277292782Sallanjude	pjdlog_debug(1, "Hash generated.");
278292782Sallanjude
279292782Sallanjude	if (memcmp(resp, hash, sizeof(hash)) != 0) {
280220496Smarkm		pjdlog_warning("Invalid response from %s (wrong password?).",
281220496Smarkm		    adhost->adh_remoteaddr);
282220496Smarkm		proto_close(conn);
283300903Sallanjude		return (-1);
284300903Sallanjude	}
285300903Sallanjude	pjdlog_info("Receiver authenticated.");
28644290Swollman
2871802Sphk	if (proto_recv(conn, &adhost->adh_trail_offset,
2881802Sphk	    sizeof(adhost->adh_trail_offset)) == -1) {
289		pjdlog_errno(LOG_WARNING,
290		    "Unable to receive size of the most recent trail file from %s",
291		    adhost->adh_remoteaddr);
292		proto_close(conn);
293		return (-1);
294	}
295	adhost->adh_trail_offset = le64toh(adhost->adh_trail_offset);
296	if (proto_recv(conn, &adhost->adh_trail_name,
297	    sizeof(adhost->adh_trail_name)) == -1) {
298		pjdlog_errno(LOG_WARNING,
299		    "Unable to receive name of the most recent trail file from %s",
300		    adhost->adh_remoteaddr);
301		proto_close(conn);
302		return (-1);
303	}
304	pjdlog_debug(1, "Trail name (%s) and offset (%ju) received.",
305	    adhost->adh_trail_name, (uintmax_t)adhost->adh_trail_offset);
306
307	rw_wlock(&adist_remote_lock);
308	mtx_lock(&adist_remote_mtx);
309	PJDLOG_ASSERT(adhost->adh_remote == NULL);
310	PJDLOG_ASSERT(conn != NULL);
311	adhost->adh_remote = conn;
312	mtx_unlock(&adist_remote_mtx);
313	rw_unlock(&adist_remote_lock);
314	cv_signal(&adist_remote_cond);
315
316	return (0);
317}
318
319static void
320sender_disconnect(void)
321{
322
323	rw_wlock(&adist_remote_lock);
324	/*
325	 * Check for a race between dropping rlock and acquiring wlock -
326	 * another thread can close connection in-between.
327	 */
328	if (adhost->adh_remote == NULL) {
329		rw_unlock(&adist_remote_lock);
330		return;
331	}
332	pjdlog_debug(2, "Closing connection to %s.", adhost->adh_remoteaddr);
333	proto_close(adhost->adh_remote);
334	mtx_lock(&adist_remote_mtx);
335	adhost->adh_remote = NULL;
336	adhost->adh_reset = true;
337	adhost->adh_trail_name[0] = '\0';
338	adhost->adh_trail_offset = 0;
339	mtx_unlock(&adist_remote_mtx);
340	rw_unlock(&adist_remote_lock);
341
342	pjdlog_warning("Disconnected from %s.", adhost->adh_remoteaddr);
343
344	/* Move all in-flight requests back onto free list. */
345	mtx_lock(&adist_free_list_lock);
346	mtx_lock(&adist_send_list_lock);
347	TAILQ_CONCAT(&adist_free_list, &adist_send_list, adr_next);
348	mtx_unlock(&adist_send_list_lock);
349	mtx_lock(&adist_recv_list_lock);
350	TAILQ_CONCAT(&adist_free_list, &adist_recv_list, adr_next);
351	mtx_unlock(&adist_recv_list_lock);
352	mtx_unlock(&adist_free_list_lock);
353}
354
355static void
356adreq_fill(struct adreq *adreq, uint8_t cmd, const unsigned char *data,
357    size_t size)
358{
359	static uint64_t seq = 1;
360
361	PJDLOG_ASSERT(size <= ADIST_BUF_SIZE);
362
363	switch (cmd) {
364	case ADIST_CMD_OPEN:
365	case ADIST_CMD_CLOSE:
366		PJDLOG_ASSERT(data != NULL && size == 0);
367		size = strlen(data) + 1;
368		break;
369	case ADIST_CMD_APPEND:
370		PJDLOG_ASSERT(data != NULL && size > 0);
371		break;
372	case ADIST_CMD_KEEPALIVE:
373	case ADIST_CMD_ERROR:
374		PJDLOG_ASSERT(data == NULL && size == 0);
375		break;
376	default:
377		PJDLOG_ABORT("Invalid command (%hhu).", cmd);
378	}
379
380	adreq->adr_cmd = cmd;
381	adreq->adr_seq = seq++;
382	adreq->adr_datasize = size;
383	/* Don't copy if data is already in out buffer. */
384	if (data != NULL && data != adreq->adr_data)
385		bcopy(data, adreq->adr_data, size);
386}
387
388static bool
389read_thread_wait(void)
390{
391	bool newfile = false;
392
393	mtx_lock(&adist_remote_mtx);
394	if (adhost->adh_reset) {
395reset:
396		adhost->adh_reset = false;
397		if (trail_filefd(adist_trail) != -1)
398			trail_close(adist_trail);
399		trail_reset(adist_trail);
400		while (adhost->adh_remote == NULL)
401			cv_wait(&adist_remote_cond, &adist_remote_mtx);
402		trail_start(adist_trail, adhost->adh_trail_name,
403		    adhost->adh_trail_offset);
404		newfile = true;
405	}
406	mtx_unlock(&adist_remote_mtx);
407	while (trail_filefd(adist_trail) == -1) {
408		newfile = true;
409		wait_for_dir();
410		/*
411		 * We may have been disconnected and reconnected in the
412		 * meantime, check if reset is set.
413		 */
414		mtx_lock(&adist_remote_mtx);
415		if (adhost->adh_reset)
416			goto reset;
417		mtx_unlock(&adist_remote_mtx);
418		if (trail_filefd(adist_trail) == -1)
419			trail_next(adist_trail);
420	}
421	if (newfile) {
422		pjdlog_debug(1, "Trail file \"%s/%s\" opened.",
423		    adhost->adh_directory,
424		    trail_filename(adist_trail));
425		(void)wait_for_file_init(trail_filefd(adist_trail));
426	}
427	return (newfile);
428}
429
430static void *
431read_thread(void *arg __unused)
432{
433	struct adreq *adreq;
434	ssize_t done;
435	bool newfile;
436
437	pjdlog_debug(1, "%s started.", __func__);
438
439	for (;;) {
440		newfile = read_thread_wait();
441		QUEUE_TAKE(adreq, &adist_free_list, 0);
442		if (newfile) {
443			adreq_fill(adreq, ADIST_CMD_OPEN,
444			    trail_filename(adist_trail), 0);
445			newfile = false;
446			goto move;
447		}
448
449		done = read(trail_filefd(adist_trail), adreq->adr_data,
450		    ADIST_BUF_SIZE);
451		if (done == -1) {
452			off_t offset;
453			int error;
454
455			error = errno;
456			offset = lseek(trail_filefd(adist_trail), 0, SEEK_CUR);
457			errno = error;
458			pjdlog_errno(LOG_ERR,
459			    "Error while reading \"%s/%s\" at offset %jd",
460			    adhost->adh_directory, trail_filename(adist_trail),
461			    offset);
462			trail_close(adist_trail);
463			adreq_fill(adreq, ADIST_CMD_ERROR, NULL, 0);
464			goto move;
465		} else if (done == 0) {
466			/* End of file. */
467			pjdlog_debug(3, "End of \"%s/%s\".",
468			    adhost->adh_directory, trail_filename(adist_trail));
469			if (!trail_switch(adist_trail)) {
470				/* More audit records can arrive. */
471				mtx_lock(&adist_free_list_lock);
472				TAILQ_INSERT_TAIL(&adist_free_list, adreq,
473				    adr_next);
474				mtx_unlock(&adist_free_list_lock);
475				wait_for_file();
476				continue;
477			}
478			adreq_fill(adreq, ADIST_CMD_CLOSE,
479			    trail_filename(adist_trail), 0);
480			trail_close(adist_trail);
481			goto move;
482		}
483
484		adreq_fill(adreq, ADIST_CMD_APPEND, adreq->adr_data, done);
485move:
486		pjdlog_debug(3,
487		    "read thread: Moving request %p to the send queue (%hhu).",
488		    adreq, adreq->adr_cmd);
489		QUEUE_INSERT(adreq, &adist_send_list);
490	}
491	/* NOTREACHED */
492	return (NULL);
493}
494
495static void
496keepalive_send(void)
497{
498	struct adreq *adreq;
499
500	rw_rlock(&adist_remote_lock);
501	if (adhost->adh_remote == NULL) {
502		rw_unlock(&adist_remote_lock);
503		return;
504	}
505	rw_unlock(&adist_remote_lock);
506
507	mtx_lock(&adist_free_list_lock);
508	adreq = TAILQ_FIRST(&adist_free_list);
509	if (adreq != NULL)
510		TAILQ_REMOVE(&adist_free_list, adreq, adr_next);
511	mtx_unlock(&adist_free_list_lock);
512	if (adreq == NULL)
513		return;
514
515	adreq_fill(adreq, ADIST_CMD_KEEPALIVE, NULL, 0);
516
517	QUEUE_INSERT(adreq, &adist_send_list);
518
519	pjdlog_debug(3, "keepalive_send: Request sent.");
520}
521
522/*
523 * Thread sends request to secondary node.
524 */
525static void *
526send_thread(void *arg __unused)
527{
528	time_t lastcheck, now;
529	struct adreq *adreq;
530
531	pjdlog_debug(1, "%s started.", __func__);
532
533	lastcheck = time(NULL);
534
535	for (;;) {
536		pjdlog_debug(3, "send thread: Taking request.");
537		for (;;) {
538			QUEUE_TAKE(adreq, &adist_send_list, ADIST_KEEPALIVE);
539			if (adreq != NULL)
540				break;
541			now = time(NULL);
542			if (lastcheck + ADIST_KEEPALIVE <= now) {
543				keepalive_send();
544				lastcheck = now;
545			}
546		}
547		PJDLOG_ASSERT(adreq != NULL);
548		pjdlog_debug(3, "send thread: (%p) Got request %hhu.", adreq,
549		    adreq->adr_cmd);
550		/*
551		 * Protect connection from disappearing.
552		 */
553		rw_rlock(&adist_remote_lock);
554		/*
555		 * Move the request to the recv queue first to avoid race
556		 * where the recv thread receives the reply before we move
557		 * the request to the recv queue.
558		 */
559		QUEUE_INSERT(adreq, &adist_recv_list);
560		if (adhost->adh_remote == NULL ||
561		    proto_send(adhost->adh_remote, &adreq->adr_packet,
562		    ADPKT_SIZE(adreq)) == -1) {
563			rw_unlock(&adist_remote_lock);
564			pjdlog_debug(1,
565			    "send thread: (%p) Unable to send request.", adreq);
566			if (adhost->adh_remote != NULL)
567				sender_disconnect();
568			continue;
569		} else {
570			pjdlog_debug(3, "Request %p sent successfully.", adreq);
571			adreq_log(LOG_DEBUG, 2, -1, adreq,
572			    "send: (%p) Request sent: ", adreq);
573			rw_unlock(&adist_remote_lock);
574		}
575	}
576	/* NOTREACHED */
577	return (NULL);
578}
579
580static void
581adrep_decode_header(struct adrep *adrep)
582{
583
584	/* Byte-swap only is the receiver is using different byte order. */
585	if (adrep->adrp_byteorder != ADIST_BYTEORDER) {
586		adrep->adrp_byteorder = ADIST_BYTEORDER;
587		adrep->adrp_seq = bswap64(adrep->adrp_seq);
588		adrep->adrp_error = bswap16(adrep->adrp_error);
589	}
590}
591
592/*
593 * Thread receives answer from secondary node and passes it to ggate_send
594 * thread.
595 */
596static void *
597recv_thread(void *arg __unused)
598{
599	struct adrep adrep;
600	struct adreq *adreq;
601
602	pjdlog_debug(1, "%s started.", __func__);
603
604	for (;;) {
605		/* Wait until there is anything to receive. */
606		QUEUE_WAIT(&adist_recv_list);
607		pjdlog_debug(3, "recv thread: Got something.");
608		rw_rlock(&adist_remote_lock);
609		if (adhost->adh_remote == NULL) {
610			/*
611			 * Connection is dead.
612			 * XXX: We shouldn't be here.
613			 */
614			rw_unlock(&adist_remote_lock);
615			continue;
616		}
617		if (proto_recv(adhost->adh_remote, &adrep,
618		    sizeof(adrep)) == -1) {
619			rw_unlock(&adist_remote_lock);
620			pjdlog_errno(LOG_ERR, "Unable to receive reply");
621			sender_disconnect();
622			continue;
623		}
624		rw_unlock(&adist_remote_lock);
625		adrep_decode_header(&adrep);
626		/*
627		 * Find the request that was just confirmed.
628		 */
629		mtx_lock(&adist_recv_list_lock);
630		TAILQ_FOREACH(adreq, &adist_recv_list, adr_next) {
631			if (adreq->adr_seq == adrep.adrp_seq) {
632				TAILQ_REMOVE(&adist_recv_list, adreq,
633				    adr_next);
634				break;
635			}
636		}
637		if (adreq == NULL) {
638			/*
639			 * If we disconnected in the meantime, just continue.
640			 * On disconnect sender_disconnect() clears the queue,
641			 * we can use that.
642			 */
643			if (TAILQ_EMPTY(&adist_recv_list)) {
644				rw_unlock(&adist_remote_lock);
645				continue;
646			}
647			mtx_unlock(&adist_recv_list_lock);
648			pjdlog_error("Found no request matching received 'seq' field (%ju).",
649			    (uintmax_t)adrep.adrp_seq);
650			sender_disconnect();
651			continue;
652		}
653		mtx_unlock(&adist_recv_list_lock);
654		adreq_log(LOG_DEBUG, 2, -1, adreq,
655		    "recv thread: (%p) Request confirmed: ", adreq);
656		pjdlog_debug(3, "recv thread: (%p) Got request %hhu.", adreq,
657		    adreq->adr_cmd);
658		if (adrep.adrp_error != 0) {
659			pjdlog_error("Receiver returned error (%s), disconnecting.",
660			    adist_errstr((int)adrep.adrp_error));
661			sender_disconnect();
662			continue;
663		}
664		if (adreq->adr_cmd == ADIST_CMD_CLOSE)
665			trail_unlink(adist_trail, adreq->adr_data);
666		pjdlog_debug(3, "Request received successfully.");
667		QUEUE_INSERT(adreq, &adist_free_list);
668	}
669	/* NOTREACHED */
670	return (NULL);
671}
672
673static void
674guard_check_connection(void)
675{
676
677	PJDLOG_ASSERT(adhost->adh_role == ADIST_ROLE_SENDER);
678
679	rw_rlock(&adist_remote_lock);
680	if (adhost->adh_remote != NULL) {
681		rw_unlock(&adist_remote_lock);
682		pjdlog_debug(3, "remote_guard: Connection to %s is ok.",
683		    adhost->adh_remoteaddr);
684		return;
685	}
686
687	/*
688	 * Upgrade the lock. It doesn't have to be atomic as no other thread
689	 * can change connection status from disconnected to connected.
690	 */
691	rw_unlock(&adist_remote_lock);
692	pjdlog_debug(1, "remote_guard: Reconnecting to %s.",
693	    adhost->adh_remoteaddr);
694	if (sender_connect() == 0) {
695		pjdlog_info("Successfully reconnected to %s.",
696		    adhost->adh_remoteaddr);
697	} else {
698		pjdlog_debug(1, "remote_guard: Reconnect to %s failed.",
699		    adhost->adh_remoteaddr);
700	}
701}
702
703/*
704 * Thread guards remote connections and reconnects when needed, handles
705 * signals, etc.
706 */
707static void *
708guard_thread(void *arg __unused)
709{
710	struct timespec timeout;
711	time_t lastcheck, now;
712	sigset_t mask;
713	int signo;
714
715	lastcheck = time(NULL);
716
717	PJDLOG_VERIFY(sigemptyset(&mask) == 0);
718	PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
719	PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);
720
721	timeout.tv_sec = ADIST_KEEPALIVE;
722	timeout.tv_nsec = 0;
723	signo = -1;
724
725	for (;;) {
726		switch (signo) {
727		case SIGINT:
728		case SIGTERM:
729			sigexit_received = true;
730			pjdlog_exitx(EX_OK,
731			    "Termination signal received, exiting.");
732			break;
733		default:
734			break;
735		}
736
737		pjdlog_debug(3, "remote_guard: Checking connections.");
738		now = time(NULL);
739		if (lastcheck + ADIST_KEEPALIVE <= now) {
740			guard_check_connection();
741			lastcheck = now;
742		}
743		signo = sigtimedwait(&mask, NULL, &timeout);
744	}
745	/* NOTREACHED */
746	return (NULL);
747}
748
749void
750adist_sender(struct adist_config *config, struct adist_host *adh)
751{
752	pthread_t td;
753	pid_t pid;
754	int error, mode, debuglevel;
755
756	/*
757	 * Create communication channel for sending connection requests from
758	 * child to parent.
759	 */
760	if (proto_connect(NULL, "socketpair://", -1, &adh->adh_conn) == -1) {
761		pjdlog_errno(LOG_ERR,
762		    "Unable to create connection sockets between child and parent");
763		return;
764	}
765
766	pid = fork();
767	if (pid == -1) {
768		pjdlog_errno(LOG_ERR, "Unable to fork");
769		proto_close(adh->adh_conn);
770		adh->adh_conn = NULL;
771		return;
772	}
773
774	if (pid > 0) {
775		/* This is parent. */
776		adh->adh_worker_pid = pid;
777		/* Declare that we are receiver. */
778		proto_recv(adh->adh_conn, NULL, 0);
779		return;
780	}
781
782	adcfg = config;
783	adhost = adh;
784
785	mode = pjdlog_mode_get();
786	debuglevel = pjdlog_debug_get();
787
788	/* Declare that we are sender. */
789	proto_send(adhost->adh_conn, NULL, 0);
790
791	descriptors_cleanup(adhost);
792
793#ifdef TODO
794	descriptors_assert(adhost, mode);
795#endif
796
797	pjdlog_init(mode);
798	pjdlog_debug_set(debuglevel);
799	pjdlog_prefix_set("[%s] (%s) ", adhost->adh_name,
800	    role2str(adhost->adh_role));
801#ifdef HAVE_SETPROCTITLE
802	setproctitle("[%s] (%s) ", adhost->adh_name,
803	    role2str(adhost->adh_role));
804#endif
805
806	/*
807	 * The sender process should be able to remove entries from its
808	 * trail directory, but it should not be able to write to the
809	 * trail files, only read from them.
810	 */
811	adist_trail = trail_new(adhost->adh_directory, false);
812	if (adist_trail == NULL)
813		exit(EX_OSFILE);
814
815	if (sandbox(ADIST_USER, true, "auditdistd: %s (%s)",
816	    role2str(adhost->adh_role), adhost->adh_name) != 0) {
817		exit(EX_CONFIG);
818	}
819	pjdlog_info("Privileges successfully dropped.");
820
821	/*
822	 * We can ignore wait_for_dir_init() failures. It will fall back to
823	 * using sleep(3).
824	 */
825	(void)wait_for_dir_init(trail_dirfd(adist_trail));
826
827	init_environment();
828	if (sender_connect() == 0) {
829		pjdlog_info("Successfully connected to %s.",
830		    adhost->adh_remoteaddr);
831	}
832	adhost->adh_reset = true;
833
834	/*
835	 * Create the guard thread first, so we can handle signals from the
836	 * very begining.
837	 */
838	error = pthread_create(&td, NULL, guard_thread, NULL);
839	PJDLOG_ASSERT(error == 0);
840	error = pthread_create(&td, NULL, send_thread, NULL);
841	PJDLOG_ASSERT(error == 0);
842	error = pthread_create(&td, NULL, recv_thread, NULL);
843	PJDLOG_ASSERT(error == 0);
844	(void)read_thread(NULL);
845}
846