1243730Srwatson/*-
2243730Srwatson * Copyright (c) 2012 The FreeBSD Foundation
3243730Srwatson * All rights reserved.
4243730Srwatson *
5243730Srwatson * This software was developed by Pawel Jakub Dawidek under sponsorship from
6243730Srwatson * the FreeBSD Foundation.
7243730Srwatson *
8243730Srwatson * Redistribution and use in source and binary forms, with or without
9243730Srwatson * modification, are permitted provided that the following conditions
10243730Srwatson * are met:
11243730Srwatson * 1. Redistributions of source code must retain the above copyright
12243730Srwatson *    notice, this list of conditions and the following disclaimer.
13243730Srwatson * 2. Redistributions in binary form must reproduce the above copyright
14243730Srwatson *    notice, this list of conditions and the following disclaimer in the
15243730Srwatson *    documentation and/or other materials provided with the distribution.
16243730Srwatson *
17243730Srwatson * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
18243730Srwatson * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19243730Srwatson * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20243730Srwatson * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
21243730Srwatson * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22243730Srwatson * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23243730Srwatson * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24243730Srwatson * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25243730Srwatson * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26243730Srwatson * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27243730Srwatson * SUCH DAMAGE.
28243730Srwatson */
29243730Srwatson
30243734Srwatson#include <config/config.h>
31243730Srwatson
32243730Srwatson#include <sys/param.h>
33243730Srwatson#if defined(HAVE_SYS_ENDIAN_H) && defined(HAVE_BSWAP)
34243730Srwatson#include <sys/endian.h>
35243730Srwatson#else /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
36243730Srwatson#ifdef HAVE_MACHINE_ENDIAN_H
37243730Srwatson#include <machine/endian.h>
38243730Srwatson#else /* !HAVE_MACHINE_ENDIAN_H */
39243730Srwatson#ifdef HAVE_ENDIAN_H
40243730Srwatson#include <endian.h>
41243730Srwatson#else /* !HAVE_ENDIAN_H */
42243730Srwatson#error "No supported endian.h"
43243730Srwatson#endif /* !HAVE_ENDIAN_H */
44243730Srwatson#endif /* !HAVE_MACHINE_ENDIAN_H */
45243730Srwatson#include <compat/endian.h>
46243730Srwatson#endif /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
47243730Srwatson#include <sys/queue.h>
48243730Srwatson#include <sys/stat.h>
49243730Srwatson#include <sys/wait.h>
50243730Srwatson
51243730Srwatson#include <stdio.h>
52243730Srwatson#include <stdlib.h>
53243730Srwatson#include <unistd.h>
54243730Srwatson
55243730Srwatson#include <ctype.h>
56243730Srwatson#include <dirent.h>
57243730Srwatson#include <err.h>
58243730Srwatson#include <errno.h>
59243730Srwatson#include <fcntl.h>
60243730Srwatson#ifdef HAVE_LIBUTIL_H
61243730Srwatson#include <libutil.h>
62243730Srwatson#endif
63243730Srwatson#include <signal.h>
64243730Srwatson#include <string.h>
65243730Srwatson#include <strings.h>
66243730Srwatson
67243730Srwatson#include <openssl/hmac.h>
68243730Srwatson
69243730Srwatson#ifndef HAVE_SIGTIMEDWAIT
70243730Srwatson#include "sigtimedwait.h"
71243730Srwatson#endif
72243730Srwatson
73243730Srwatson#include "auditdistd.h"
74243734Srwatson#include "pjdlog.h"
75243730Srwatson#include "proto.h"
76243730Srwatson#include "sandbox.h"
77243730Srwatson#include "subr.h"
78243730Srwatson#include "synch.h"
79243730Srwatson#include "trail.h"
80243730Srwatson
81243730Srwatsonstatic struct adist_config *adcfg;
82243730Srwatsonstatic struct adist_host *adhost;
83243730Srwatson
84243730Srwatsonstatic pthread_rwlock_t adist_remote_lock;
85243730Srwatsonstatic pthread_mutex_t adist_remote_mtx;
86243730Srwatsonstatic pthread_cond_t adist_remote_cond;
87243730Srwatsonstatic struct trail *adist_trail;
88243730Srwatson
89243730Srwatsonstatic TAILQ_HEAD(, adreq) adist_free_list;
90243730Srwatsonstatic pthread_mutex_t adist_free_list_lock;
91243730Srwatsonstatic pthread_cond_t adist_free_list_cond;
92243730Srwatsonstatic TAILQ_HEAD(, adreq) adist_send_list;
93243730Srwatsonstatic pthread_mutex_t adist_send_list_lock;
94243730Srwatsonstatic pthread_cond_t adist_send_list_cond;
95243730Srwatsonstatic TAILQ_HEAD(, adreq) adist_recv_list;
96243730Srwatsonstatic pthread_mutex_t adist_recv_list_lock;
97243730Srwatsonstatic pthread_cond_t adist_recv_list_cond;
98243730Srwatson
99243730Srwatsonstatic void
100243730Srwatsoninit_environment(void)
101243730Srwatson{
102243730Srwatson	struct adreq *adreq;
103243730Srwatson	unsigned int ii;
104243730Srwatson
105243730Srwatson	rw_init(&adist_remote_lock);
106243730Srwatson	mtx_init(&adist_remote_mtx);
107243730Srwatson	cv_init(&adist_remote_cond);
108243730Srwatson	TAILQ_INIT(&adist_free_list);
109243730Srwatson	mtx_init(&adist_free_list_lock);
110243730Srwatson	cv_init(&adist_free_list_cond);
111243730Srwatson	TAILQ_INIT(&adist_send_list);
112243730Srwatson	mtx_init(&adist_send_list_lock);
113243730Srwatson	cv_init(&adist_send_list_cond);
114243730Srwatson	TAILQ_INIT(&adist_recv_list);
115243730Srwatson	mtx_init(&adist_recv_list_lock);
116243730Srwatson	cv_init(&adist_recv_list_cond);
117243730Srwatson
118243730Srwatson	for (ii = 0; ii < ADIST_QUEUE_SIZE; ii++) {
119243730Srwatson		adreq = malloc(sizeof(*adreq) + ADIST_BUF_SIZE);
120243730Srwatson		if (adreq == NULL) {
121243730Srwatson			pjdlog_exitx(EX_TEMPFAIL,
122243730Srwatson			    "Unable to allocate %zu bytes of memory for adreq object.",
123243730Srwatson			    sizeof(*adreq) + ADIST_BUF_SIZE);
124243730Srwatson		}
125243730Srwatson		adreq->adr_byteorder = ADIST_BYTEORDER;
126243730Srwatson		adreq->adr_cmd = ADIST_CMD_UNDEFINED;
127243730Srwatson		adreq->adr_seq = 0;
128243730Srwatson		adreq->adr_datasize = 0;
129243730Srwatson		TAILQ_INSERT_TAIL(&adist_free_list, adreq, adr_next);
130243730Srwatson	}
131243730Srwatson}
132243730Srwatson
133243730Srwatsonstatic int
134243730Srwatsonsender_connect(void)
135243730Srwatson{
136243730Srwatson	unsigned char rnd[32], hash[32], resp[32];
137243730Srwatson	struct proto_conn *conn;
138243730Srwatson	char welcome[8];
139243730Srwatson	int16_t val;
140243730Srwatson
141243730Srwatson	val = 1;
142243730Srwatson	if (proto_send(adhost->adh_conn, &val, sizeof(val)) < 0) {
143243730Srwatson		pjdlog_exit(EX_TEMPFAIL,
144243730Srwatson		    "Unable to send connection request to parent");
145243730Srwatson	}
146243730Srwatson	if (proto_recv(adhost->adh_conn, &val, sizeof(val)) < 0) {
147243730Srwatson		pjdlog_exit(EX_TEMPFAIL,
148243730Srwatson		    "Unable to receive reply to connection request from parent");
149243730Srwatson	}
150243730Srwatson	if (val != 0) {
151243730Srwatson		errno = val;
152243730Srwatson		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
153243730Srwatson		    adhost->adh_remoteaddr);
154243730Srwatson		return (-1);
155243730Srwatson	}
156243730Srwatson	if (proto_connection_recv(adhost->adh_conn, true, &conn) < 0) {
157243730Srwatson		pjdlog_exit(EX_TEMPFAIL,
158243730Srwatson		    "Unable to receive connection from parent");
159243730Srwatson	}
160243730Srwatson	if (proto_connect_wait(conn, adcfg->adc_timeout) < 0) {
161243730Srwatson		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
162243730Srwatson		    adhost->adh_remoteaddr);
163243730Srwatson		proto_close(conn);
164243730Srwatson		return (-1);
165243730Srwatson	}
166243730Srwatson	pjdlog_debug(1, "Connected to %s.", adhost->adh_remoteaddr);
167243730Srwatson	/* Error in setting timeout is not critical, but why should it fail? */
168243730Srwatson	if (proto_timeout(conn, adcfg->adc_timeout) < 0)
169243730Srwatson		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
170243730Srwatson	else
171243730Srwatson		pjdlog_debug(1, "Timeout set to %d.", adcfg->adc_timeout);
172243730Srwatson
173243730Srwatson	/* Exchange welcome message, which includes version number. */
174243730Srwatson	(void)snprintf(welcome, sizeof(welcome), "ADIST%02d", ADIST_VERSION);
175243730Srwatson	if (proto_send(conn, welcome, sizeof(welcome)) < 0) {
176243730Srwatson		pjdlog_errno(LOG_WARNING,
177243730Srwatson		    "Unable to send welcome message to %s",
178243730Srwatson		    adhost->adh_remoteaddr);
179243730Srwatson		proto_close(conn);
180243730Srwatson		return (-1);
181243730Srwatson	}
182243730Srwatson	pjdlog_debug(1, "Welcome message sent (%s).", welcome);
183243730Srwatson	bzero(welcome, sizeof(welcome));
184243730Srwatson	if (proto_recv(conn, welcome, sizeof(welcome)) < 0) {
185243730Srwatson		pjdlog_errno(LOG_WARNING,
186243730Srwatson		    "Unable to receive welcome message from %s",
187243730Srwatson		    adhost->adh_remoteaddr);
188243730Srwatson		proto_close(conn);
189243730Srwatson		return (-1);
190243730Srwatson	}
191243730Srwatson	if (strncmp(welcome, "ADIST", 5) != 0 || !isdigit(welcome[5]) ||
192243730Srwatson	    !isdigit(welcome[6]) || welcome[7] != '\0') {
193243730Srwatson		pjdlog_warning("Invalid welcome message from %s.",
194243730Srwatson		    adhost->adh_remoteaddr);
195243730Srwatson		proto_close(conn);
196243730Srwatson		return (-1);
197243730Srwatson	}
198243730Srwatson	pjdlog_debug(1, "Welcome message received (%s).", welcome);
199243730Srwatson	/*
200243730Srwatson	 * Receiver can only reply with version number lower or equal to
201243730Srwatson	 * the one we sent.
202243730Srwatson	 */
203243730Srwatson	adhost->adh_version = atoi(welcome + 5);
204243730Srwatson	if (adhost->adh_version > ADIST_VERSION) {
205243730Srwatson		pjdlog_warning("Invalid version number from %s (%d received, up to %d supported).",
206243730Srwatson		    adhost->adh_remoteaddr, adhost->adh_version, ADIST_VERSION);
207243730Srwatson		proto_close(conn);
208243730Srwatson		return (-1);
209243730Srwatson	}
210243730Srwatson
211243730Srwatson	pjdlog_debug(1, "Version %d negotiated with %s.", adhost->adh_version,
212243730Srwatson	    adhost->adh_remoteaddr);
213243730Srwatson
214243730Srwatson	if (proto_send(conn, adcfg->adc_name, sizeof(adcfg->adc_name)) == -1) {
215243730Srwatson		pjdlog_errno(LOG_WARNING, "Unable to send name to %s",
216243730Srwatson		    adhost->adh_remoteaddr);
217243730Srwatson		proto_close(conn);
218243730Srwatson		return (-1);
219243730Srwatson	}
220243730Srwatson	pjdlog_debug(1, "Name (%s) sent.", adcfg->adc_name);
221243730Srwatson
222243730Srwatson	if (proto_recv(conn, rnd, sizeof(rnd)) == -1) {
223243730Srwatson		pjdlog_errno(LOG_WARNING, "Unable to receive challenge from %s",
224243730Srwatson		    adhost->adh_remoteaddr);
225243730Srwatson		proto_close(conn);
226243730Srwatson		return (-1);
227243730Srwatson	}
228243730Srwatson	pjdlog_debug(1, "Challenge received.");
229243730Srwatson
230243730Srwatson	if (HMAC(EVP_sha256(), adhost->adh_password,
231243730Srwatson	    (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
232243730Srwatson	    NULL) == NULL) {
233243730Srwatson		pjdlog_warning("Unable to generate response.");
234243730Srwatson		proto_close(conn);
235243730Srwatson		return (-1);
236243730Srwatson	}
237243730Srwatson	pjdlog_debug(1, "Response generated.");
238243730Srwatson
239243730Srwatson	if (proto_send(conn, hash, sizeof(hash)) == -1) {
240243730Srwatson		pjdlog_errno(LOG_WARNING, "Unable to send response to %s",
241243730Srwatson		    adhost->adh_remoteaddr);
242243730Srwatson		proto_close(conn);
243243730Srwatson		return (-1);
244243730Srwatson	}
245243730Srwatson	pjdlog_debug(1, "Response sent.");
246243730Srwatson
247243730Srwatson	if (adist_random(rnd, sizeof(rnd)) == -1) {
248243730Srwatson		pjdlog_warning("Unable to generate challenge.");
249243730Srwatson		proto_close(conn);
250243730Srwatson		return (-1);
251243730Srwatson	}
252243730Srwatson	pjdlog_debug(1, "Challenge generated.");
253243730Srwatson
254243730Srwatson	if (proto_send(conn, rnd, sizeof(rnd)) == -1) {
255243730Srwatson		pjdlog_errno(LOG_WARNING, "Unable to send challenge to %s",
256243730Srwatson		    adhost->adh_remoteaddr);
257243730Srwatson		proto_close(conn);
258243730Srwatson		return (-1);
259243730Srwatson	}
260243730Srwatson	pjdlog_debug(1, "Challenge sent.");
261243730Srwatson
262243730Srwatson	if (proto_recv(conn, resp, sizeof(resp)) == -1) {
263243730Srwatson		pjdlog_errno(LOG_WARNING, "Unable to receive response from %s",
264243730Srwatson		    adhost->adh_remoteaddr);
265243730Srwatson		proto_close(conn);
266243730Srwatson		return (-1);
267243730Srwatson	}
268243730Srwatson	pjdlog_debug(1, "Response received.");
269243730Srwatson
270243730Srwatson	if (HMAC(EVP_sha256(), adhost->adh_password,
271243730Srwatson	    (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
272243730Srwatson	    NULL) == NULL) {
273243730Srwatson		pjdlog_warning("Unable to generate hash.");
274243730Srwatson		proto_close(conn);
275243730Srwatson		return (-1);
276243730Srwatson	}
277243730Srwatson	pjdlog_debug(1, "Hash generated.");
278243730Srwatson
279243730Srwatson	if (memcmp(resp, hash, sizeof(hash)) != 0) {
280243730Srwatson		pjdlog_warning("Invalid response from %s (wrong password?).",
281243730Srwatson		    adhost->adh_remoteaddr);
282243730Srwatson		proto_close(conn);
283243730Srwatson		return (-1);
284243730Srwatson	}
285243730Srwatson	pjdlog_info("Receiver authenticated.");
286243730Srwatson
287243730Srwatson	if (proto_recv(conn, &adhost->adh_trail_offset,
288243730Srwatson	    sizeof(adhost->adh_trail_offset)) == -1) {
289243730Srwatson		pjdlog_errno(LOG_WARNING,
290243730Srwatson		    "Unable to receive size of the most recent trail file from %s",
291243730Srwatson		    adhost->adh_remoteaddr);
292243730Srwatson		proto_close(conn);
293243730Srwatson		return (-1);
294243730Srwatson	}
295243730Srwatson	adhost->adh_trail_offset = le64toh(adhost->adh_trail_offset);
296243730Srwatson	if (proto_recv(conn, &adhost->adh_trail_name,
297243730Srwatson	    sizeof(adhost->adh_trail_name)) == -1) {
298243730Srwatson		pjdlog_errno(LOG_WARNING,
299243730Srwatson		    "Unable to receive name of the most recent trail file from %s",
300243730Srwatson		    adhost->adh_remoteaddr);
301243730Srwatson		proto_close(conn);
302243730Srwatson		return (-1);
303243730Srwatson	}
304243730Srwatson	pjdlog_debug(1, "Trail name (%s) and offset (%ju) received.",
305243730Srwatson	    adhost->adh_trail_name, (uintmax_t)adhost->adh_trail_offset);
306243730Srwatson
307243730Srwatson	rw_wlock(&adist_remote_lock);
308243730Srwatson	mtx_lock(&adist_remote_mtx);
309243730Srwatson	PJDLOG_ASSERT(adhost->adh_remote == NULL);
310243730Srwatson	PJDLOG_ASSERT(conn != NULL);
311243730Srwatson	adhost->adh_remote = conn;
312243730Srwatson	mtx_unlock(&adist_remote_mtx);
313243730Srwatson	rw_unlock(&adist_remote_lock);
314243730Srwatson	cv_signal(&adist_remote_cond);
315243730Srwatson
316243730Srwatson	return (0);
317243730Srwatson}
318243730Srwatson
319243730Srwatsonstatic void
320243730Srwatsonsender_disconnect(void)
321243730Srwatson{
322243730Srwatson
323243730Srwatson	rw_wlock(&adist_remote_lock);
324243730Srwatson	/*
325243730Srwatson	 * Check for a race between dropping rlock and acquiring wlock -
326243730Srwatson	 * another thread can close connection in-between.
327243730Srwatson	 */
328243730Srwatson	if (adhost->adh_remote == NULL) {
329243730Srwatson		rw_unlock(&adist_remote_lock);
330243730Srwatson		return;
331243730Srwatson	}
332243730Srwatson	pjdlog_debug(2, "Closing connection to %s.", adhost->adh_remoteaddr);
333243730Srwatson	proto_close(adhost->adh_remote);
334243730Srwatson	mtx_lock(&adist_remote_mtx);
335243730Srwatson	adhost->adh_remote = NULL;
336243730Srwatson	adhost->adh_reset = true;
337243730Srwatson	adhost->adh_trail_name[0] = '\0';
338243730Srwatson	adhost->adh_trail_offset = 0;
339243730Srwatson	mtx_unlock(&adist_remote_mtx);
340243730Srwatson	rw_unlock(&adist_remote_lock);
341243730Srwatson
342243730Srwatson	pjdlog_warning("Disconnected from %s.", adhost->adh_remoteaddr);
343243730Srwatson
344243730Srwatson	/* Move all in-flight requests back onto free list. */
345243730Srwatson	mtx_lock(&adist_free_list_lock);
346243730Srwatson	mtx_lock(&adist_send_list_lock);
347243730Srwatson	TAILQ_CONCAT(&adist_free_list, &adist_send_list, adr_next);
348243730Srwatson	mtx_unlock(&adist_send_list_lock);
349243730Srwatson	mtx_lock(&adist_recv_list_lock);
350243730Srwatson	TAILQ_CONCAT(&adist_free_list, &adist_recv_list, adr_next);
351243730Srwatson	mtx_unlock(&adist_recv_list_lock);
352243730Srwatson	mtx_unlock(&adist_free_list_lock);
353243730Srwatson}
354243730Srwatson
355243730Srwatsonstatic void
356243730Srwatsonadreq_fill(struct adreq *adreq, uint8_t cmd, const unsigned char *data,
357243730Srwatson    size_t size)
358243730Srwatson{
359243730Srwatson	static uint64_t seq = 1;
360243730Srwatson
361243730Srwatson	PJDLOG_ASSERT(size <= ADIST_BUF_SIZE);
362243730Srwatson
363243730Srwatson	switch (cmd) {
364243730Srwatson	case ADIST_CMD_OPEN:
365243730Srwatson	case ADIST_CMD_CLOSE:
366243730Srwatson		PJDLOG_ASSERT(data != NULL && size == 0);
367243730Srwatson		size = strlen(data) + 1;
368243730Srwatson		break;
369243730Srwatson	case ADIST_CMD_APPEND:
370243730Srwatson		PJDLOG_ASSERT(data != NULL && size > 0);
371243730Srwatson		break;
372243730Srwatson	case ADIST_CMD_KEEPALIVE:
373243730Srwatson	case ADIST_CMD_ERROR:
374243730Srwatson		PJDLOG_ASSERT(data == NULL && size == 0);
375243730Srwatson		break;
376243730Srwatson	default:
377243730Srwatson		PJDLOG_ABORT("Invalid command (%hhu).", cmd);
378243730Srwatson	}
379243730Srwatson
380243730Srwatson	adreq->adr_cmd = cmd;
381243730Srwatson	adreq->adr_seq = seq++;
382243730Srwatson	adreq->adr_datasize = size;
383243730Srwatson	/* Don't copy if data is already in out buffer. */
384243730Srwatson	if (data != NULL && data != adreq->adr_data)
385243730Srwatson		bcopy(data, adreq->adr_data, size);
386243730Srwatson}
387243730Srwatson
388243730Srwatsonstatic bool
389243730Srwatsonread_thread_wait(void)
390243730Srwatson{
391243730Srwatson	bool newfile = false;
392243730Srwatson
393243730Srwatson	mtx_lock(&adist_remote_mtx);
394243730Srwatson	if (adhost->adh_reset) {
395247442Spjdreset:
396243730Srwatson		adhost->adh_reset = false;
397243730Srwatson		if (trail_filefd(adist_trail) != -1)
398243730Srwatson			trail_close(adist_trail);
399243730Srwatson		trail_reset(adist_trail);
400243730Srwatson		while (adhost->adh_remote == NULL)
401243730Srwatson			cv_wait(&adist_remote_cond, &adist_remote_mtx);
402243730Srwatson		trail_start(adist_trail, adhost->adh_trail_name,
403243730Srwatson		    adhost->adh_trail_offset);
404243730Srwatson		newfile = true;
405243730Srwatson	}
406243730Srwatson	mtx_unlock(&adist_remote_mtx);
407243730Srwatson	while (trail_filefd(adist_trail) == -1) {
408243730Srwatson		newfile = true;
409243730Srwatson		wait_for_dir();
410247442Spjd		/*
411247442Spjd		 * We may have been disconnected and reconnected in the
412247442Spjd		 * meantime, check if reset is set.
413247442Spjd		 */
414247442Spjd		mtx_lock(&adist_remote_mtx);
415247442Spjd		if (adhost->adh_reset)
416247442Spjd			goto reset;
417247442Spjd		mtx_unlock(&adist_remote_mtx);
418243730Srwatson		if (trail_filefd(adist_trail) == -1)
419243730Srwatson			trail_next(adist_trail);
420243730Srwatson	}
421243730Srwatson	if (newfile) {
422243730Srwatson		pjdlog_debug(1, "Trail file \"%s/%s\" opened.",
423243730Srwatson		    adhost->adh_directory,
424243730Srwatson		    trail_filename(adist_trail));
425243730Srwatson		(void)wait_for_file_init(trail_filefd(adist_trail));
426243730Srwatson	}
427243730Srwatson	return (newfile);
428243730Srwatson}
429243730Srwatson
430243730Srwatsonstatic void *
431243730Srwatsonread_thread(void *arg __unused)
432243730Srwatson{
433243730Srwatson	struct adreq *adreq;
434243730Srwatson	ssize_t done;
435243730Srwatson	bool newfile;
436243730Srwatson
437243730Srwatson	pjdlog_debug(1, "%s started.", __func__);
438243730Srwatson
439243730Srwatson	for (;;) {
440243730Srwatson		newfile = read_thread_wait();
441243730Srwatson		QUEUE_TAKE(adreq, &adist_free_list, 0);
442243730Srwatson		if (newfile) {
443243730Srwatson			adreq_fill(adreq, ADIST_CMD_OPEN,
444243730Srwatson			    trail_filename(adist_trail), 0);
445243730Srwatson			newfile = false;
446243730Srwatson			goto move;
447243730Srwatson		}
448243730Srwatson
449243730Srwatson		done = read(trail_filefd(adist_trail), adreq->adr_data,
450243730Srwatson		    ADIST_BUF_SIZE);
451243730Srwatson		if (done == -1) {
452243730Srwatson			off_t offset;
453243730Srwatson			int error;
454243730Srwatson
455243730Srwatson			error = errno;
456243730Srwatson			offset = lseek(trail_filefd(adist_trail), 0, SEEK_CUR);
457243730Srwatson			errno = error;
458243730Srwatson			pjdlog_errno(LOG_ERR,
459243730Srwatson			    "Error while reading \"%s/%s\" at offset %jd",
460243730Srwatson			    adhost->adh_directory, trail_filename(adist_trail),
461243730Srwatson			    offset);
462243730Srwatson			trail_close(adist_trail);
463243730Srwatson			adreq_fill(adreq, ADIST_CMD_ERROR, NULL, 0);
464243730Srwatson			goto move;
465243730Srwatson		} else if (done == 0) {
466243730Srwatson			/* End of file. */
467243730Srwatson			pjdlog_debug(3, "End of \"%s/%s\".",
468243730Srwatson			    adhost->adh_directory, trail_filename(adist_trail));
469243730Srwatson			if (!trail_switch(adist_trail)) {
470243730Srwatson				/* More audit records can arrive. */
471243730Srwatson				mtx_lock(&adist_free_list_lock);
472243730Srwatson				TAILQ_INSERT_TAIL(&adist_free_list, adreq,
473243730Srwatson				    adr_next);
474243730Srwatson				mtx_unlock(&adist_free_list_lock);
475243730Srwatson				wait_for_file();
476243730Srwatson				continue;
477243730Srwatson			}
478243730Srwatson			adreq_fill(adreq, ADIST_CMD_CLOSE,
479243730Srwatson			    trail_filename(adist_trail), 0);
480243730Srwatson			trail_close(adist_trail);
481243730Srwatson			goto move;
482243730Srwatson		}
483243730Srwatson
484243730Srwatson		adreq_fill(adreq, ADIST_CMD_APPEND, adreq->adr_data, done);
485243730Srwatsonmove:
486243730Srwatson		pjdlog_debug(3,
487243730Srwatson		    "read thread: Moving request %p to the send queue (%hhu).",
488243730Srwatson		    adreq, adreq->adr_cmd);
489243730Srwatson		QUEUE_INSERT(adreq, &adist_send_list);
490243730Srwatson	}
491243730Srwatson	/* NOTREACHED */
492243730Srwatson	return (NULL);
493243730Srwatson}
494243730Srwatson
495243730Srwatsonstatic void
496243730Srwatsonkeepalive_send(void)
497243730Srwatson{
498243730Srwatson	struct adreq *adreq;
499243730Srwatson
500243730Srwatson	rw_rlock(&adist_remote_lock);
501243730Srwatson	if (adhost->adh_remote == NULL) {
502243730Srwatson		rw_unlock(&adist_remote_lock);
503243730Srwatson		return;
504243730Srwatson	}
505243730Srwatson	rw_unlock(&adist_remote_lock);
506243730Srwatson
507243730Srwatson	mtx_lock(&adist_free_list_lock);
508243730Srwatson	adreq = TAILQ_FIRST(&adist_free_list);
509243730Srwatson	if (adreq != NULL)
510243730Srwatson		TAILQ_REMOVE(&adist_free_list, adreq, adr_next);
511243730Srwatson	mtx_unlock(&adist_free_list_lock);
512243730Srwatson	if (adreq == NULL)
513243730Srwatson		return;
514243730Srwatson
515243730Srwatson	adreq_fill(adreq, ADIST_CMD_KEEPALIVE, NULL, 0);
516243730Srwatson
517243730Srwatson	QUEUE_INSERT(adreq, &adist_send_list);
518243730Srwatson
519243730Srwatson	pjdlog_debug(3, "keepalive_send: Request sent.");
520243730Srwatson}
521243730Srwatson
522243730Srwatson/*
523243730Srwatson * Thread sends request to secondary node.
524243730Srwatson */
525243730Srwatsonstatic void *
526243730Srwatsonsend_thread(void *arg __unused)
527243730Srwatson{
528243730Srwatson	time_t lastcheck, now;
529243730Srwatson	struct adreq *adreq;
530243730Srwatson
531243730Srwatson	pjdlog_debug(1, "%s started.", __func__);
532243730Srwatson
533243730Srwatson	lastcheck = time(NULL);
534243730Srwatson
535243730Srwatson	for (;;) {
536243730Srwatson		pjdlog_debug(3, "send thread: Taking request.");
537243730Srwatson		for (;;) {
538243730Srwatson			QUEUE_TAKE(adreq, &adist_send_list, ADIST_KEEPALIVE);
539243730Srwatson			if (adreq != NULL)
540243730Srwatson				break;
541243730Srwatson			now = time(NULL);
542243730Srwatson			if (lastcheck + ADIST_KEEPALIVE <= now) {
543243730Srwatson				keepalive_send();
544243730Srwatson				lastcheck = now;
545243730Srwatson			}
546243730Srwatson		}
547243730Srwatson		PJDLOG_ASSERT(adreq != NULL);
548243730Srwatson		pjdlog_debug(3, "send thread: (%p) Got request %hhu.", adreq,
549243730Srwatson		    adreq->adr_cmd);
550243730Srwatson		/*
551243730Srwatson		 * Protect connection from disappearing.
552243730Srwatson		 */
553243730Srwatson		rw_rlock(&adist_remote_lock);
554243730Srwatson		/*
555243730Srwatson		 * Move the request to the recv queue first to avoid race
556243730Srwatson		 * where the recv thread receives the reply before we move
557243730Srwatson		 * the request to the recv queue.
558243730Srwatson		 */
559243730Srwatson		QUEUE_INSERT(adreq, &adist_recv_list);
560243730Srwatson		if (adhost->adh_remote == NULL ||
561243730Srwatson		    proto_send(adhost->adh_remote, &adreq->adr_packet,
562243730Srwatson		    ADPKT_SIZE(adreq)) == -1) {
563243730Srwatson			rw_unlock(&adist_remote_lock);
564243730Srwatson			pjdlog_debug(1,
565243730Srwatson			    "send thread: (%p) Unable to send request.", adreq);
566243730Srwatson			if (adhost->adh_remote != NULL)
567243730Srwatson				sender_disconnect();
568243730Srwatson			continue;
569243730Srwatson		} else {
570243730Srwatson			pjdlog_debug(3, "Request %p sent successfully.", adreq);
571243730Srwatson			adreq_log(LOG_DEBUG, 2, -1, adreq,
572243730Srwatson			    "send: (%p) Request sent: ", adreq);
573243730Srwatson			rw_unlock(&adist_remote_lock);
574243730Srwatson		}
575243730Srwatson	}
576243730Srwatson	/* NOTREACHED */
577243730Srwatson	return (NULL);
578243730Srwatson}
579243730Srwatson
580243730Srwatsonstatic void
581243730Srwatsonadrep_decode_header(struct adrep *adrep)
582243730Srwatson{
583243730Srwatson
584243730Srwatson	/* Byte-swap only is the receiver is using different byte order. */
585243730Srwatson	if (adrep->adrp_byteorder != ADIST_BYTEORDER) {
586243730Srwatson		adrep->adrp_byteorder = ADIST_BYTEORDER;
587243730Srwatson		adrep->adrp_seq = bswap64(adrep->adrp_seq);
588243730Srwatson		adrep->adrp_error = bswap16(adrep->adrp_error);
589243730Srwatson	}
590243730Srwatson}
591243730Srwatson
592243730Srwatson/*
593243730Srwatson * Thread receives answer from secondary node and passes it to ggate_send
594243730Srwatson * thread.
595243730Srwatson */
596243730Srwatsonstatic void *
597243730Srwatsonrecv_thread(void *arg __unused)
598243730Srwatson{
599243730Srwatson	struct adrep adrep;
600243730Srwatson	struct adreq *adreq;
601243730Srwatson
602243730Srwatson	pjdlog_debug(1, "%s started.", __func__);
603243730Srwatson
604243730Srwatson	for (;;) {
605243730Srwatson		/* Wait until there is anything to receive. */
606243730Srwatson		QUEUE_WAIT(&adist_recv_list);
607243730Srwatson		pjdlog_debug(3, "recv thread: Got something.");
608243730Srwatson		rw_rlock(&adist_remote_lock);
609243730Srwatson		if (adhost->adh_remote == NULL) {
610243730Srwatson			/*
611243730Srwatson			 * Connection is dead.
612243730Srwatson			 * XXX: We shouldn't be here.
613243730Srwatson			 */
614243730Srwatson			rw_unlock(&adist_remote_lock);
615243730Srwatson			continue;
616243730Srwatson		}
617243730Srwatson		if (proto_recv(adhost->adh_remote, &adrep,
618243730Srwatson		    sizeof(adrep)) == -1) {
619243730Srwatson			rw_unlock(&adist_remote_lock);
620243730Srwatson			pjdlog_errno(LOG_ERR, "Unable to receive reply");
621243730Srwatson			sender_disconnect();
622243730Srwatson			continue;
623243730Srwatson		}
624243730Srwatson		rw_unlock(&adist_remote_lock);
625243730Srwatson		adrep_decode_header(&adrep);
626243730Srwatson		/*
627243730Srwatson		 * Find the request that was just confirmed.
628243730Srwatson		 */
629243730Srwatson		mtx_lock(&adist_recv_list_lock);
630243730Srwatson		TAILQ_FOREACH(adreq, &adist_recv_list, adr_next) {
631243730Srwatson			if (adreq->adr_seq == adrep.adrp_seq) {
632243730Srwatson				TAILQ_REMOVE(&adist_recv_list, adreq,
633243730Srwatson				    adr_next);
634243730Srwatson				break;
635243730Srwatson			}
636243730Srwatson		}
637243730Srwatson		if (adreq == NULL) {
638243730Srwatson			/*
639243730Srwatson			 * If we disconnected in the meantime, just continue.
640243730Srwatson			 * On disconnect sender_disconnect() clears the queue,
641243730Srwatson			 * we can use that.
642243730Srwatson			 */
643243730Srwatson			if (TAILQ_EMPTY(&adist_recv_list)) {
644243730Srwatson				rw_unlock(&adist_remote_lock);
645243730Srwatson				continue;
646243730Srwatson			}
647243730Srwatson			mtx_unlock(&adist_recv_list_lock);
648243730Srwatson			pjdlog_error("Found no request matching received 'seq' field (%ju).",
649243730Srwatson			    (uintmax_t)adrep.adrp_seq);
650243730Srwatson			sender_disconnect();
651243730Srwatson			continue;
652243730Srwatson		}
653243730Srwatson		mtx_unlock(&adist_recv_list_lock);
654243730Srwatson		adreq_log(LOG_DEBUG, 2, -1, adreq,
655243730Srwatson		    "recv thread: (%p) Request confirmed: ", adreq);
656243730Srwatson		pjdlog_debug(3, "recv thread: (%p) Got request %hhu.", adreq,
657243730Srwatson		    adreq->adr_cmd);
658243730Srwatson		if (adrep.adrp_error != 0) {
659243730Srwatson			pjdlog_error("Receiver returned error (%s), disconnecting.",
660243730Srwatson			    adist_errstr((int)adrep.adrp_error));
661243730Srwatson			sender_disconnect();
662243730Srwatson			continue;
663243730Srwatson		}
664243730Srwatson		if (adreq->adr_cmd == ADIST_CMD_CLOSE)
665243730Srwatson			trail_unlink(adist_trail, adreq->adr_data);
666243730Srwatson		pjdlog_debug(3, "Request received successfully.");
667243730Srwatson		QUEUE_INSERT(adreq, &adist_free_list);
668243730Srwatson	}
669243730Srwatson	/* NOTREACHED */
670243730Srwatson	return (NULL);
671243730Srwatson}
672243730Srwatson
673243730Srwatsonstatic void
674243730Srwatsonguard_check_connection(void)
675243730Srwatson{
676243730Srwatson
677243730Srwatson	PJDLOG_ASSERT(adhost->adh_role == ADIST_ROLE_SENDER);
678243730Srwatson
679243730Srwatson	rw_rlock(&adist_remote_lock);
680243730Srwatson	if (adhost->adh_remote != NULL) {
681243730Srwatson		rw_unlock(&adist_remote_lock);
682243730Srwatson		pjdlog_debug(3, "remote_guard: Connection to %s is ok.",
683243730Srwatson		    adhost->adh_remoteaddr);
684243730Srwatson		return;
685243730Srwatson	}
686243730Srwatson
687243730Srwatson	/*
688243730Srwatson	 * Upgrade the lock. It doesn't have to be atomic as no other thread
689243730Srwatson	 * can change connection status from disconnected to connected.
690243730Srwatson	 */
691243730Srwatson	rw_unlock(&adist_remote_lock);
692243730Srwatson	pjdlog_debug(1, "remote_guard: Reconnecting to %s.",
693243730Srwatson	    adhost->adh_remoteaddr);
694243730Srwatson	if (sender_connect() == 0) {
695243730Srwatson		pjdlog_info("Successfully reconnected to %s.",
696243730Srwatson		    adhost->adh_remoteaddr);
697243730Srwatson	} else {
698243730Srwatson		pjdlog_debug(1, "remote_guard: Reconnect to %s failed.",
699243730Srwatson		    adhost->adh_remoteaddr);
700243730Srwatson	}
701243730Srwatson}
702243730Srwatson
703243730Srwatson/*
704243730Srwatson * Thread guards remote connections and reconnects when needed, handles
705243730Srwatson * signals, etc.
706243730Srwatson */
707243730Srwatsonstatic void *
708243730Srwatsonguard_thread(void *arg __unused)
709243730Srwatson{
710243730Srwatson	struct timespec timeout;
711243730Srwatson	time_t lastcheck, now;
712243730Srwatson	sigset_t mask;
713243730Srwatson	int signo;
714243730Srwatson
715243730Srwatson	lastcheck = time(NULL);
716243730Srwatson
717243730Srwatson	PJDLOG_VERIFY(sigemptyset(&mask) == 0);
718243730Srwatson	PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
719243730Srwatson	PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);
720243730Srwatson
721243730Srwatson	timeout.tv_sec = ADIST_KEEPALIVE;
722243730Srwatson	timeout.tv_nsec = 0;
723243730Srwatson	signo = -1;
724243730Srwatson
725243730Srwatson	for (;;) {
726243730Srwatson		switch (signo) {
727243730Srwatson		case SIGINT:
728243730Srwatson		case SIGTERM:
729243730Srwatson			sigexit_received = true;
730243730Srwatson			pjdlog_exitx(EX_OK,
731243730Srwatson			    "Termination signal received, exiting.");
732243730Srwatson			break;
733243730Srwatson		default:
734243730Srwatson			break;
735243730Srwatson		}
736243730Srwatson
737243730Srwatson		pjdlog_debug(3, "remote_guard: Checking connections.");
738243730Srwatson		now = time(NULL);
739243730Srwatson		if (lastcheck + ADIST_KEEPALIVE <= now) {
740243730Srwatson			guard_check_connection();
741243730Srwatson			lastcheck = now;
742243730Srwatson		}
743243730Srwatson		signo = sigtimedwait(&mask, NULL, &timeout);
744243730Srwatson	}
745243730Srwatson	/* NOTREACHED */
746243730Srwatson	return (NULL);
747243730Srwatson}
748243730Srwatson
749243730Srwatsonvoid
750243730Srwatsonadist_sender(struct adist_config *config, struct adist_host *adh)
751243730Srwatson{
752243730Srwatson	pthread_t td;
753243730Srwatson	pid_t pid;
754243730Srwatson	int error, mode, debuglevel;
755243730Srwatson
756243730Srwatson	/*
757243730Srwatson	 * Create communication channel for sending connection requests from
758243730Srwatson	 * child to parent.
759243730Srwatson	 */
760243730Srwatson	if (proto_connect(NULL, "socketpair://", -1, &adh->adh_conn) == -1) {
761243730Srwatson		pjdlog_errno(LOG_ERR,
762243730Srwatson		    "Unable to create connection sockets between child and parent");
763243730Srwatson		return;
764243730Srwatson	}
765243730Srwatson
766243730Srwatson	pid = fork();
767243730Srwatson	if (pid == -1) {
768243730Srwatson		pjdlog_errno(LOG_ERR, "Unable to fork");
769243730Srwatson		proto_close(adh->adh_conn);
770243730Srwatson		adh->adh_conn = NULL;
771243730Srwatson		return;
772243730Srwatson	}
773243730Srwatson
774243730Srwatson	if (pid > 0) {
775243730Srwatson		/* This is parent. */
776243730Srwatson		adh->adh_worker_pid = pid;
777243730Srwatson		/* Declare that we are receiver. */
778243730Srwatson		proto_recv(adh->adh_conn, NULL, 0);
779243730Srwatson		return;
780243730Srwatson	}
781243730Srwatson
782243730Srwatson	adcfg = config;
783243730Srwatson	adhost = adh;
784243730Srwatson
785243730Srwatson	mode = pjdlog_mode_get();
786243730Srwatson	debuglevel = pjdlog_debug_get();
787243730Srwatson
788243730Srwatson	/* Declare that we are sender. */
789243730Srwatson	proto_send(adhost->adh_conn, NULL, 0);
790243730Srwatson
791243730Srwatson	descriptors_cleanup(adhost);
792243730Srwatson
793243730Srwatson#ifdef TODO
794243730Srwatson	descriptors_assert(adhost, mode);
795243730Srwatson#endif
796243730Srwatson
797243730Srwatson	pjdlog_init(mode);
798243730Srwatson	pjdlog_debug_set(debuglevel);
799243730Srwatson	pjdlog_prefix_set("[%s] (%s) ", adhost->adh_name,
800243730Srwatson	    role2str(adhost->adh_role));
801243730Srwatson#ifdef HAVE_SETPROCTITLE
802243730Srwatson	setproctitle("[%s] (%s) ", adhost->adh_name,
803243730Srwatson	    role2str(adhost->adh_role));
804243730Srwatson#endif
805243730Srwatson
806243730Srwatson	/*
807243730Srwatson	 * The sender process should be able to remove entries from its
808243730Srwatson	 * trail directory, but it should not be able to write to the
809243730Srwatson	 * trail files, only read from them.
810243730Srwatson	 */
811243730Srwatson	adist_trail = trail_new(adhost->adh_directory, false);
812243730Srwatson	if (adist_trail == NULL)
813243730Srwatson		exit(EX_OSFILE);
814243730Srwatson
815243730Srwatson	if (sandbox(ADIST_USER, true, "auditdistd: %s (%s)",
816243730Srwatson	    role2str(adhost->adh_role), adhost->adh_name) != 0) {
817243730Srwatson		exit(EX_CONFIG);
818243730Srwatson	}
819243730Srwatson	pjdlog_info("Privileges successfully dropped.");
820243730Srwatson
821243730Srwatson	/*
822243730Srwatson	 * We can ignore wait_for_dir_init() failures. It will fall back to
823243730Srwatson	 * using sleep(3).
824243730Srwatson	 */
825243730Srwatson	(void)wait_for_dir_init(trail_dirfd(adist_trail));
826243730Srwatson
827243730Srwatson	init_environment();
828243730Srwatson	if (sender_connect() == 0) {
829243730Srwatson		pjdlog_info("Successfully connected to %s.",
830243730Srwatson		    adhost->adh_remoteaddr);
831243730Srwatson	}
832243730Srwatson	adhost->adh_reset = true;
833243730Srwatson
834243730Srwatson	/*
835243730Srwatson	 * Create the guard thread first, so we can handle signals from the
836243730Srwatson	 * very begining.
837243730Srwatson	 */
838243730Srwatson	error = pthread_create(&td, NULL, guard_thread, NULL);
839243730Srwatson	PJDLOG_ASSERT(error == 0);
840243730Srwatson	error = pthread_create(&td, NULL, send_thread, NULL);
841243730Srwatson	PJDLOG_ASSERT(error == 0);
842243730Srwatson	error = pthread_create(&td, NULL, recv_thread, NULL);
843243730Srwatson	PJDLOG_ASSERT(error == 0);
844243730Srwatson	(void)read_thread(NULL);
845243730Srwatson}
846