1243730Srwatson/*- 2243730Srwatson * Copyright (c) 2012 The FreeBSD Foundation 3243730Srwatson * All rights reserved. 4243730Srwatson * 5243730Srwatson * This software was developed by Pawel Jakub Dawidek under sponsorship from 6243730Srwatson * the FreeBSD Foundation. 7243730Srwatson * 8243730Srwatson * Redistribution and use in source and binary forms, with or without 9243730Srwatson * modification, are permitted provided that the following conditions 10243730Srwatson * are met: 11243730Srwatson * 1. Redistributions of source code must retain the above copyright 12243730Srwatson * notice, this list of conditions and the following disclaimer. 13243730Srwatson * 2. Redistributions in binary form must reproduce the above copyright 14243730Srwatson * notice, this list of conditions and the following disclaimer in the 15243730Srwatson * documentation and/or other materials provided with the distribution. 16243730Srwatson * 17243730Srwatson * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 18243730Srwatson * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19243730Srwatson * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20243730Srwatson * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 21243730Srwatson * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 22243730Srwatson * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 23243730Srwatson * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 24243730Srwatson * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 25243730Srwatson * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 26243730Srwatson * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 27243730Srwatson * SUCH DAMAGE. 28243730Srwatson */ 29243730Srwatson 30243734Srwatson#include <config/config.h> 31243730Srwatson 32243730Srwatson#include <sys/param.h> 33243730Srwatson#if defined(HAVE_SYS_ENDIAN_H) && defined(HAVE_BSWAP) 34243730Srwatson#include <sys/endian.h> 35243730Srwatson#else /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */ 36243730Srwatson#ifdef HAVE_MACHINE_ENDIAN_H 37243730Srwatson#include <machine/endian.h> 38243730Srwatson#else /* !HAVE_MACHINE_ENDIAN_H */ 39243730Srwatson#ifdef HAVE_ENDIAN_H 40243730Srwatson#include <endian.h> 41243730Srwatson#else /* !HAVE_ENDIAN_H */ 42243730Srwatson#error "No supported endian.h" 43243730Srwatson#endif /* !HAVE_ENDIAN_H */ 44243730Srwatson#endif /* !HAVE_MACHINE_ENDIAN_H */ 45243730Srwatson#include <compat/endian.h> 46243730Srwatson#endif /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */ 47243730Srwatson#include <sys/queue.h> 48243730Srwatson#include <sys/stat.h> 49243730Srwatson#include <sys/wait.h> 50243730Srwatson 51243730Srwatson#include <stdio.h> 52243730Srwatson#include <stdlib.h> 53243730Srwatson#include <unistd.h> 54243730Srwatson 55243730Srwatson#include <ctype.h> 56243730Srwatson#include <dirent.h> 57243730Srwatson#include <err.h> 58243730Srwatson#include <errno.h> 59243730Srwatson#include <fcntl.h> 60243730Srwatson#ifdef HAVE_LIBUTIL_H 61243730Srwatson#include <libutil.h> 62243730Srwatson#endif 63243730Srwatson#include <signal.h> 64243730Srwatson#include <string.h> 65243730Srwatson#include <strings.h> 66243730Srwatson 67243730Srwatson#include <openssl/hmac.h> 68243730Srwatson 69243730Srwatson#ifndef HAVE_SIGTIMEDWAIT 70243730Srwatson#include "sigtimedwait.h" 71243730Srwatson#endif 72243730Srwatson 73243730Srwatson#include "auditdistd.h" 74243734Srwatson#include "pjdlog.h" 75243730Srwatson#include "proto.h" 76243730Srwatson#include "sandbox.h" 77243730Srwatson#include "subr.h" 78243730Srwatson#include "synch.h" 79243730Srwatson#include "trail.h" 80243730Srwatson 81243730Srwatsonstatic struct adist_config *adcfg; 82243730Srwatsonstatic struct adist_host *adhost; 83243730Srwatson 84243730Srwatsonstatic pthread_rwlock_t adist_remote_lock; 85243730Srwatsonstatic pthread_mutex_t adist_remote_mtx; 86243730Srwatsonstatic pthread_cond_t adist_remote_cond; 87243730Srwatsonstatic struct trail *adist_trail; 88243730Srwatson 89243730Srwatsonstatic TAILQ_HEAD(, adreq) adist_free_list; 90243730Srwatsonstatic pthread_mutex_t adist_free_list_lock; 91243730Srwatsonstatic pthread_cond_t adist_free_list_cond; 92243730Srwatsonstatic TAILQ_HEAD(, adreq) adist_send_list; 93243730Srwatsonstatic pthread_mutex_t adist_send_list_lock; 94243730Srwatsonstatic pthread_cond_t adist_send_list_cond; 95243730Srwatsonstatic TAILQ_HEAD(, adreq) adist_recv_list; 96243730Srwatsonstatic pthread_mutex_t adist_recv_list_lock; 97243730Srwatsonstatic pthread_cond_t adist_recv_list_cond; 98243730Srwatson 99243730Srwatsonstatic void 100243730Srwatsoninit_environment(void) 101243730Srwatson{ 102243730Srwatson struct adreq *adreq; 103243730Srwatson unsigned int ii; 104243730Srwatson 105243730Srwatson rw_init(&adist_remote_lock); 106243730Srwatson mtx_init(&adist_remote_mtx); 107243730Srwatson cv_init(&adist_remote_cond); 108243730Srwatson TAILQ_INIT(&adist_free_list); 109243730Srwatson mtx_init(&adist_free_list_lock); 110243730Srwatson cv_init(&adist_free_list_cond); 111243730Srwatson TAILQ_INIT(&adist_send_list); 112243730Srwatson mtx_init(&adist_send_list_lock); 113243730Srwatson cv_init(&adist_send_list_cond); 114243730Srwatson TAILQ_INIT(&adist_recv_list); 115243730Srwatson mtx_init(&adist_recv_list_lock); 116243730Srwatson cv_init(&adist_recv_list_cond); 117243730Srwatson 118243730Srwatson for (ii = 0; ii < ADIST_QUEUE_SIZE; ii++) { 119243730Srwatson adreq = malloc(sizeof(*adreq) + ADIST_BUF_SIZE); 120243730Srwatson if (adreq == NULL) { 121243730Srwatson pjdlog_exitx(EX_TEMPFAIL, 122243730Srwatson "Unable to allocate %zu bytes of memory for adreq object.", 123243730Srwatson sizeof(*adreq) + ADIST_BUF_SIZE); 124243730Srwatson } 125243730Srwatson adreq->adr_byteorder = ADIST_BYTEORDER; 126243730Srwatson adreq->adr_cmd = ADIST_CMD_UNDEFINED; 127243730Srwatson adreq->adr_seq = 0; 128243730Srwatson adreq->adr_datasize = 0; 129243730Srwatson TAILQ_INSERT_TAIL(&adist_free_list, adreq, adr_next); 130243730Srwatson } 131243730Srwatson} 132243730Srwatson 133243730Srwatsonstatic int 134243730Srwatsonsender_connect(void) 135243730Srwatson{ 136243730Srwatson unsigned char rnd[32], hash[32], resp[32]; 137243730Srwatson struct proto_conn *conn; 138243730Srwatson char welcome[8]; 139243730Srwatson int16_t val; 140243730Srwatson 141243730Srwatson val = 1; 142243730Srwatson if (proto_send(adhost->adh_conn, &val, sizeof(val)) < 0) { 143243730Srwatson pjdlog_exit(EX_TEMPFAIL, 144243730Srwatson "Unable to send connection request to parent"); 145243730Srwatson } 146243730Srwatson if (proto_recv(adhost->adh_conn, &val, sizeof(val)) < 0) { 147243730Srwatson pjdlog_exit(EX_TEMPFAIL, 148243730Srwatson "Unable to receive reply to connection request from parent"); 149243730Srwatson } 150243730Srwatson if (val != 0) { 151243730Srwatson errno = val; 152243730Srwatson pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 153243730Srwatson adhost->adh_remoteaddr); 154243730Srwatson return (-1); 155243730Srwatson } 156243730Srwatson if (proto_connection_recv(adhost->adh_conn, true, &conn) < 0) { 157243730Srwatson pjdlog_exit(EX_TEMPFAIL, 158243730Srwatson "Unable to receive connection from parent"); 159243730Srwatson } 160243730Srwatson if (proto_connect_wait(conn, adcfg->adc_timeout) < 0) { 161243730Srwatson pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 162243730Srwatson adhost->adh_remoteaddr); 163243730Srwatson proto_close(conn); 164243730Srwatson return (-1); 165243730Srwatson } 166243730Srwatson pjdlog_debug(1, "Connected to %s.", adhost->adh_remoteaddr); 167243730Srwatson /* Error in setting timeout is not critical, but why should it fail? */ 168243730Srwatson if (proto_timeout(conn, adcfg->adc_timeout) < 0) 169243730Srwatson pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 170243730Srwatson else 171243730Srwatson pjdlog_debug(1, "Timeout set to %d.", adcfg->adc_timeout); 172243730Srwatson 173243730Srwatson /* Exchange welcome message, which includes version number. */ 174243730Srwatson (void)snprintf(welcome, sizeof(welcome), "ADIST%02d", ADIST_VERSION); 175243730Srwatson if (proto_send(conn, welcome, sizeof(welcome)) < 0) { 176243730Srwatson pjdlog_errno(LOG_WARNING, 177243730Srwatson "Unable to send welcome message to %s", 178243730Srwatson adhost->adh_remoteaddr); 179243730Srwatson proto_close(conn); 180243730Srwatson return (-1); 181243730Srwatson } 182243730Srwatson pjdlog_debug(1, "Welcome message sent (%s).", welcome); 183243730Srwatson bzero(welcome, sizeof(welcome)); 184243730Srwatson if (proto_recv(conn, welcome, sizeof(welcome)) < 0) { 185243730Srwatson pjdlog_errno(LOG_WARNING, 186243730Srwatson "Unable to receive welcome message from %s", 187243730Srwatson adhost->adh_remoteaddr); 188243730Srwatson proto_close(conn); 189243730Srwatson return (-1); 190243730Srwatson } 191243730Srwatson if (strncmp(welcome, "ADIST", 5) != 0 || !isdigit(welcome[5]) || 192243730Srwatson !isdigit(welcome[6]) || welcome[7] != '\0') { 193243730Srwatson pjdlog_warning("Invalid welcome message from %s.", 194243730Srwatson adhost->adh_remoteaddr); 195243730Srwatson proto_close(conn); 196243730Srwatson return (-1); 197243730Srwatson } 198243730Srwatson pjdlog_debug(1, "Welcome message received (%s).", welcome); 199243730Srwatson /* 200243730Srwatson * Receiver can only reply with version number lower or equal to 201243730Srwatson * the one we sent. 202243730Srwatson */ 203243730Srwatson adhost->adh_version = atoi(welcome + 5); 204243730Srwatson if (adhost->adh_version > ADIST_VERSION) { 205243730Srwatson pjdlog_warning("Invalid version number from %s (%d received, up to %d supported).", 206243730Srwatson adhost->adh_remoteaddr, adhost->adh_version, ADIST_VERSION); 207243730Srwatson proto_close(conn); 208243730Srwatson return (-1); 209243730Srwatson } 210243730Srwatson 211243730Srwatson pjdlog_debug(1, "Version %d negotiated with %s.", adhost->adh_version, 212243730Srwatson adhost->adh_remoteaddr); 213243730Srwatson 214243730Srwatson if (proto_send(conn, adcfg->adc_name, sizeof(adcfg->adc_name)) == -1) { 215243730Srwatson pjdlog_errno(LOG_WARNING, "Unable to send name to %s", 216243730Srwatson adhost->adh_remoteaddr); 217243730Srwatson proto_close(conn); 218243730Srwatson return (-1); 219243730Srwatson } 220243730Srwatson pjdlog_debug(1, "Name (%s) sent.", adcfg->adc_name); 221243730Srwatson 222243730Srwatson if (proto_recv(conn, rnd, sizeof(rnd)) == -1) { 223243730Srwatson pjdlog_errno(LOG_WARNING, "Unable to receive challenge from %s", 224243730Srwatson adhost->adh_remoteaddr); 225243730Srwatson proto_close(conn); 226243730Srwatson return (-1); 227243730Srwatson } 228243730Srwatson pjdlog_debug(1, "Challenge received."); 229243730Srwatson 230243730Srwatson if (HMAC(EVP_sha256(), adhost->adh_password, 231243730Srwatson (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash, 232243730Srwatson NULL) == NULL) { 233243730Srwatson pjdlog_warning("Unable to generate response."); 234243730Srwatson proto_close(conn); 235243730Srwatson return (-1); 236243730Srwatson } 237243730Srwatson pjdlog_debug(1, "Response generated."); 238243730Srwatson 239243730Srwatson if (proto_send(conn, hash, sizeof(hash)) == -1) { 240243730Srwatson pjdlog_errno(LOG_WARNING, "Unable to send response to %s", 241243730Srwatson adhost->adh_remoteaddr); 242243730Srwatson proto_close(conn); 243243730Srwatson return (-1); 244243730Srwatson } 245243730Srwatson pjdlog_debug(1, "Response sent."); 246243730Srwatson 247243730Srwatson if (adist_random(rnd, sizeof(rnd)) == -1) { 248243730Srwatson pjdlog_warning("Unable to generate challenge."); 249243730Srwatson proto_close(conn); 250243730Srwatson return (-1); 251243730Srwatson } 252243730Srwatson pjdlog_debug(1, "Challenge generated."); 253243730Srwatson 254243730Srwatson if (proto_send(conn, rnd, sizeof(rnd)) == -1) { 255243730Srwatson pjdlog_errno(LOG_WARNING, "Unable to send challenge to %s", 256243730Srwatson adhost->adh_remoteaddr); 257243730Srwatson proto_close(conn); 258243730Srwatson return (-1); 259243730Srwatson } 260243730Srwatson pjdlog_debug(1, "Challenge sent."); 261243730Srwatson 262243730Srwatson if (proto_recv(conn, resp, sizeof(resp)) == -1) { 263243730Srwatson pjdlog_errno(LOG_WARNING, "Unable to receive response from %s", 264243730Srwatson adhost->adh_remoteaddr); 265243730Srwatson proto_close(conn); 266243730Srwatson return (-1); 267243730Srwatson } 268243730Srwatson pjdlog_debug(1, "Response received."); 269243730Srwatson 270243730Srwatson if (HMAC(EVP_sha256(), adhost->adh_password, 271243730Srwatson (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash, 272243730Srwatson NULL) == NULL) { 273243730Srwatson pjdlog_warning("Unable to generate hash."); 274243730Srwatson proto_close(conn); 275243730Srwatson return (-1); 276243730Srwatson } 277243730Srwatson pjdlog_debug(1, "Hash generated."); 278243730Srwatson 279243730Srwatson if (memcmp(resp, hash, sizeof(hash)) != 0) { 280243730Srwatson pjdlog_warning("Invalid response from %s (wrong password?).", 281243730Srwatson adhost->adh_remoteaddr); 282243730Srwatson proto_close(conn); 283243730Srwatson return (-1); 284243730Srwatson } 285243730Srwatson pjdlog_info("Receiver authenticated."); 286243730Srwatson 287243730Srwatson if (proto_recv(conn, &adhost->adh_trail_offset, 288243730Srwatson sizeof(adhost->adh_trail_offset)) == -1) { 289243730Srwatson pjdlog_errno(LOG_WARNING, 290243730Srwatson "Unable to receive size of the most recent trail file from %s", 291243730Srwatson adhost->adh_remoteaddr); 292243730Srwatson proto_close(conn); 293243730Srwatson return (-1); 294243730Srwatson } 295243730Srwatson adhost->adh_trail_offset = le64toh(adhost->adh_trail_offset); 296243730Srwatson if (proto_recv(conn, &adhost->adh_trail_name, 297243730Srwatson sizeof(adhost->adh_trail_name)) == -1) { 298243730Srwatson pjdlog_errno(LOG_WARNING, 299243730Srwatson "Unable to receive name of the most recent trail file from %s", 300243730Srwatson adhost->adh_remoteaddr); 301243730Srwatson proto_close(conn); 302243730Srwatson return (-1); 303243730Srwatson } 304243730Srwatson pjdlog_debug(1, "Trail name (%s) and offset (%ju) received.", 305243730Srwatson adhost->adh_trail_name, (uintmax_t)adhost->adh_trail_offset); 306243730Srwatson 307243730Srwatson rw_wlock(&adist_remote_lock); 308243730Srwatson mtx_lock(&adist_remote_mtx); 309243730Srwatson PJDLOG_ASSERT(adhost->adh_remote == NULL); 310243730Srwatson PJDLOG_ASSERT(conn != NULL); 311243730Srwatson adhost->adh_remote = conn; 312243730Srwatson mtx_unlock(&adist_remote_mtx); 313243730Srwatson rw_unlock(&adist_remote_lock); 314243730Srwatson cv_signal(&adist_remote_cond); 315243730Srwatson 316243730Srwatson return (0); 317243730Srwatson} 318243730Srwatson 319243730Srwatsonstatic void 320243730Srwatsonsender_disconnect(void) 321243730Srwatson{ 322243730Srwatson 323243730Srwatson rw_wlock(&adist_remote_lock); 324243730Srwatson /* 325243730Srwatson * Check for a race between dropping rlock and acquiring wlock - 326243730Srwatson * another thread can close connection in-between. 327243730Srwatson */ 328243730Srwatson if (adhost->adh_remote == NULL) { 329243730Srwatson rw_unlock(&adist_remote_lock); 330243730Srwatson return; 331243730Srwatson } 332243730Srwatson pjdlog_debug(2, "Closing connection to %s.", adhost->adh_remoteaddr); 333243730Srwatson proto_close(adhost->adh_remote); 334243730Srwatson mtx_lock(&adist_remote_mtx); 335243730Srwatson adhost->adh_remote = NULL; 336243730Srwatson adhost->adh_reset = true; 337243730Srwatson adhost->adh_trail_name[0] = '\0'; 338243730Srwatson adhost->adh_trail_offset = 0; 339243730Srwatson mtx_unlock(&adist_remote_mtx); 340243730Srwatson rw_unlock(&adist_remote_lock); 341243730Srwatson 342243730Srwatson pjdlog_warning("Disconnected from %s.", adhost->adh_remoteaddr); 343243730Srwatson 344243730Srwatson /* Move all in-flight requests back onto free list. */ 345243730Srwatson mtx_lock(&adist_free_list_lock); 346243730Srwatson mtx_lock(&adist_send_list_lock); 347243730Srwatson TAILQ_CONCAT(&adist_free_list, &adist_send_list, adr_next); 348243730Srwatson mtx_unlock(&adist_send_list_lock); 349243730Srwatson mtx_lock(&adist_recv_list_lock); 350243730Srwatson TAILQ_CONCAT(&adist_free_list, &adist_recv_list, adr_next); 351243730Srwatson mtx_unlock(&adist_recv_list_lock); 352243730Srwatson mtx_unlock(&adist_free_list_lock); 353243730Srwatson} 354243730Srwatson 355243730Srwatsonstatic void 356243730Srwatsonadreq_fill(struct adreq *adreq, uint8_t cmd, const unsigned char *data, 357243730Srwatson size_t size) 358243730Srwatson{ 359243730Srwatson static uint64_t seq = 1; 360243730Srwatson 361243730Srwatson PJDLOG_ASSERT(size <= ADIST_BUF_SIZE); 362243730Srwatson 363243730Srwatson switch (cmd) { 364243730Srwatson case ADIST_CMD_OPEN: 365243730Srwatson case ADIST_CMD_CLOSE: 366243730Srwatson PJDLOG_ASSERT(data != NULL && size == 0); 367243730Srwatson size = strlen(data) + 1; 368243730Srwatson break; 369243730Srwatson case ADIST_CMD_APPEND: 370243730Srwatson PJDLOG_ASSERT(data != NULL && size > 0); 371243730Srwatson break; 372243730Srwatson case ADIST_CMD_KEEPALIVE: 373243730Srwatson case ADIST_CMD_ERROR: 374243730Srwatson PJDLOG_ASSERT(data == NULL && size == 0); 375243730Srwatson break; 376243730Srwatson default: 377243730Srwatson PJDLOG_ABORT("Invalid command (%hhu).", cmd); 378243730Srwatson } 379243730Srwatson 380243730Srwatson adreq->adr_cmd = cmd; 381243730Srwatson adreq->adr_seq = seq++; 382243730Srwatson adreq->adr_datasize = size; 383243730Srwatson /* Don't copy if data is already in out buffer. */ 384243730Srwatson if (data != NULL && data != adreq->adr_data) 385243730Srwatson bcopy(data, adreq->adr_data, size); 386243730Srwatson} 387243730Srwatson 388243730Srwatsonstatic bool 389243730Srwatsonread_thread_wait(void) 390243730Srwatson{ 391243730Srwatson bool newfile = false; 392243730Srwatson 393243730Srwatson mtx_lock(&adist_remote_mtx); 394243730Srwatson if (adhost->adh_reset) { 395247442Spjdreset: 396243730Srwatson adhost->adh_reset = false; 397243730Srwatson if (trail_filefd(adist_trail) != -1) 398243730Srwatson trail_close(adist_trail); 399243730Srwatson trail_reset(adist_trail); 400243730Srwatson while (adhost->adh_remote == NULL) 401243730Srwatson cv_wait(&adist_remote_cond, &adist_remote_mtx); 402243730Srwatson trail_start(adist_trail, adhost->adh_trail_name, 403243730Srwatson adhost->adh_trail_offset); 404243730Srwatson newfile = true; 405243730Srwatson } 406243730Srwatson mtx_unlock(&adist_remote_mtx); 407243730Srwatson while (trail_filefd(adist_trail) == -1) { 408243730Srwatson newfile = true; 409243730Srwatson wait_for_dir(); 410247442Spjd /* 411247442Spjd * We may have been disconnected and reconnected in the 412247442Spjd * meantime, check if reset is set. 413247442Spjd */ 414247442Spjd mtx_lock(&adist_remote_mtx); 415247442Spjd if (adhost->adh_reset) 416247442Spjd goto reset; 417247442Spjd mtx_unlock(&adist_remote_mtx); 418243730Srwatson if (trail_filefd(adist_trail) == -1) 419243730Srwatson trail_next(adist_trail); 420243730Srwatson } 421243730Srwatson if (newfile) { 422243730Srwatson pjdlog_debug(1, "Trail file \"%s/%s\" opened.", 423243730Srwatson adhost->adh_directory, 424243730Srwatson trail_filename(adist_trail)); 425243730Srwatson (void)wait_for_file_init(trail_filefd(adist_trail)); 426243730Srwatson } 427243730Srwatson return (newfile); 428243730Srwatson} 429243730Srwatson 430243730Srwatsonstatic void * 431243730Srwatsonread_thread(void *arg __unused) 432243730Srwatson{ 433243730Srwatson struct adreq *adreq; 434243730Srwatson ssize_t done; 435243730Srwatson bool newfile; 436243730Srwatson 437243730Srwatson pjdlog_debug(1, "%s started.", __func__); 438243730Srwatson 439243730Srwatson for (;;) { 440243730Srwatson newfile = read_thread_wait(); 441243730Srwatson QUEUE_TAKE(adreq, &adist_free_list, 0); 442243730Srwatson if (newfile) { 443243730Srwatson adreq_fill(adreq, ADIST_CMD_OPEN, 444243730Srwatson trail_filename(adist_trail), 0); 445243730Srwatson newfile = false; 446243730Srwatson goto move; 447243730Srwatson } 448243730Srwatson 449243730Srwatson done = read(trail_filefd(adist_trail), adreq->adr_data, 450243730Srwatson ADIST_BUF_SIZE); 451243730Srwatson if (done == -1) { 452243730Srwatson off_t offset; 453243730Srwatson int error; 454243730Srwatson 455243730Srwatson error = errno; 456243730Srwatson offset = lseek(trail_filefd(adist_trail), 0, SEEK_CUR); 457243730Srwatson errno = error; 458243730Srwatson pjdlog_errno(LOG_ERR, 459243730Srwatson "Error while reading \"%s/%s\" at offset %jd", 460243730Srwatson adhost->adh_directory, trail_filename(adist_trail), 461243730Srwatson offset); 462243730Srwatson trail_close(adist_trail); 463243730Srwatson adreq_fill(adreq, ADIST_CMD_ERROR, NULL, 0); 464243730Srwatson goto move; 465243730Srwatson } else if (done == 0) { 466243730Srwatson /* End of file. */ 467243730Srwatson pjdlog_debug(3, "End of \"%s/%s\".", 468243730Srwatson adhost->adh_directory, trail_filename(adist_trail)); 469243730Srwatson if (!trail_switch(adist_trail)) { 470243730Srwatson /* More audit records can arrive. */ 471243730Srwatson mtx_lock(&adist_free_list_lock); 472243730Srwatson TAILQ_INSERT_TAIL(&adist_free_list, adreq, 473243730Srwatson adr_next); 474243730Srwatson mtx_unlock(&adist_free_list_lock); 475243730Srwatson wait_for_file(); 476243730Srwatson continue; 477243730Srwatson } 478243730Srwatson adreq_fill(adreq, ADIST_CMD_CLOSE, 479243730Srwatson trail_filename(adist_trail), 0); 480243730Srwatson trail_close(adist_trail); 481243730Srwatson goto move; 482243730Srwatson } 483243730Srwatson 484243730Srwatson adreq_fill(adreq, ADIST_CMD_APPEND, adreq->adr_data, done); 485243730Srwatsonmove: 486243730Srwatson pjdlog_debug(3, 487243730Srwatson "read thread: Moving request %p to the send queue (%hhu).", 488243730Srwatson adreq, adreq->adr_cmd); 489243730Srwatson QUEUE_INSERT(adreq, &adist_send_list); 490243730Srwatson } 491243730Srwatson /* NOTREACHED */ 492243730Srwatson return (NULL); 493243730Srwatson} 494243730Srwatson 495243730Srwatsonstatic void 496243730Srwatsonkeepalive_send(void) 497243730Srwatson{ 498243730Srwatson struct adreq *adreq; 499243730Srwatson 500243730Srwatson rw_rlock(&adist_remote_lock); 501243730Srwatson if (adhost->adh_remote == NULL) { 502243730Srwatson rw_unlock(&adist_remote_lock); 503243730Srwatson return; 504243730Srwatson } 505243730Srwatson rw_unlock(&adist_remote_lock); 506243730Srwatson 507243730Srwatson mtx_lock(&adist_free_list_lock); 508243730Srwatson adreq = TAILQ_FIRST(&adist_free_list); 509243730Srwatson if (adreq != NULL) 510243730Srwatson TAILQ_REMOVE(&adist_free_list, adreq, adr_next); 511243730Srwatson mtx_unlock(&adist_free_list_lock); 512243730Srwatson if (adreq == NULL) 513243730Srwatson return; 514243730Srwatson 515243730Srwatson adreq_fill(adreq, ADIST_CMD_KEEPALIVE, NULL, 0); 516243730Srwatson 517243730Srwatson QUEUE_INSERT(adreq, &adist_send_list); 518243730Srwatson 519243730Srwatson pjdlog_debug(3, "keepalive_send: Request sent."); 520243730Srwatson} 521243730Srwatson 522243730Srwatson/* 523243730Srwatson * Thread sends request to secondary node. 524243730Srwatson */ 525243730Srwatsonstatic void * 526243730Srwatsonsend_thread(void *arg __unused) 527243730Srwatson{ 528243730Srwatson time_t lastcheck, now; 529243730Srwatson struct adreq *adreq; 530243730Srwatson 531243730Srwatson pjdlog_debug(1, "%s started.", __func__); 532243730Srwatson 533243730Srwatson lastcheck = time(NULL); 534243730Srwatson 535243730Srwatson for (;;) { 536243730Srwatson pjdlog_debug(3, "send thread: Taking request."); 537243730Srwatson for (;;) { 538243730Srwatson QUEUE_TAKE(adreq, &adist_send_list, ADIST_KEEPALIVE); 539243730Srwatson if (adreq != NULL) 540243730Srwatson break; 541243730Srwatson now = time(NULL); 542243730Srwatson if (lastcheck + ADIST_KEEPALIVE <= now) { 543243730Srwatson keepalive_send(); 544243730Srwatson lastcheck = now; 545243730Srwatson } 546243730Srwatson } 547243730Srwatson PJDLOG_ASSERT(adreq != NULL); 548243730Srwatson pjdlog_debug(3, "send thread: (%p) Got request %hhu.", adreq, 549243730Srwatson adreq->adr_cmd); 550243730Srwatson /* 551243730Srwatson * Protect connection from disappearing. 552243730Srwatson */ 553243730Srwatson rw_rlock(&adist_remote_lock); 554243730Srwatson /* 555243730Srwatson * Move the request to the recv queue first to avoid race 556243730Srwatson * where the recv thread receives the reply before we move 557243730Srwatson * the request to the recv queue. 558243730Srwatson */ 559243730Srwatson QUEUE_INSERT(adreq, &adist_recv_list); 560243730Srwatson if (adhost->adh_remote == NULL || 561243730Srwatson proto_send(adhost->adh_remote, &adreq->adr_packet, 562243730Srwatson ADPKT_SIZE(adreq)) == -1) { 563243730Srwatson rw_unlock(&adist_remote_lock); 564243730Srwatson pjdlog_debug(1, 565243730Srwatson "send thread: (%p) Unable to send request.", adreq); 566243730Srwatson if (adhost->adh_remote != NULL) 567243730Srwatson sender_disconnect(); 568243730Srwatson continue; 569243730Srwatson } else { 570243730Srwatson pjdlog_debug(3, "Request %p sent successfully.", adreq); 571243730Srwatson adreq_log(LOG_DEBUG, 2, -1, adreq, 572243730Srwatson "send: (%p) Request sent: ", adreq); 573243730Srwatson rw_unlock(&adist_remote_lock); 574243730Srwatson } 575243730Srwatson } 576243730Srwatson /* NOTREACHED */ 577243730Srwatson return (NULL); 578243730Srwatson} 579243730Srwatson 580243730Srwatsonstatic void 581243730Srwatsonadrep_decode_header(struct adrep *adrep) 582243730Srwatson{ 583243730Srwatson 584243730Srwatson /* Byte-swap only is the receiver is using different byte order. */ 585243730Srwatson if (adrep->adrp_byteorder != ADIST_BYTEORDER) { 586243730Srwatson adrep->adrp_byteorder = ADIST_BYTEORDER; 587243730Srwatson adrep->adrp_seq = bswap64(adrep->adrp_seq); 588243730Srwatson adrep->adrp_error = bswap16(adrep->adrp_error); 589243730Srwatson } 590243730Srwatson} 591243730Srwatson 592243730Srwatson/* 593243730Srwatson * Thread receives answer from secondary node and passes it to ggate_send 594243730Srwatson * thread. 595243730Srwatson */ 596243730Srwatsonstatic void * 597243730Srwatsonrecv_thread(void *arg __unused) 598243730Srwatson{ 599243730Srwatson struct adrep adrep; 600243730Srwatson struct adreq *adreq; 601243730Srwatson 602243730Srwatson pjdlog_debug(1, "%s started.", __func__); 603243730Srwatson 604243730Srwatson for (;;) { 605243730Srwatson /* Wait until there is anything to receive. */ 606243730Srwatson QUEUE_WAIT(&adist_recv_list); 607243730Srwatson pjdlog_debug(3, "recv thread: Got something."); 608243730Srwatson rw_rlock(&adist_remote_lock); 609243730Srwatson if (adhost->adh_remote == NULL) { 610243730Srwatson /* 611243730Srwatson * Connection is dead. 612243730Srwatson * XXX: We shouldn't be here. 613243730Srwatson */ 614243730Srwatson rw_unlock(&adist_remote_lock); 615243730Srwatson continue; 616243730Srwatson } 617243730Srwatson if (proto_recv(adhost->adh_remote, &adrep, 618243730Srwatson sizeof(adrep)) == -1) { 619243730Srwatson rw_unlock(&adist_remote_lock); 620243730Srwatson pjdlog_errno(LOG_ERR, "Unable to receive reply"); 621243730Srwatson sender_disconnect(); 622243730Srwatson continue; 623243730Srwatson } 624243730Srwatson rw_unlock(&adist_remote_lock); 625243730Srwatson adrep_decode_header(&adrep); 626243730Srwatson /* 627243730Srwatson * Find the request that was just confirmed. 628243730Srwatson */ 629243730Srwatson mtx_lock(&adist_recv_list_lock); 630243730Srwatson TAILQ_FOREACH(adreq, &adist_recv_list, adr_next) { 631243730Srwatson if (adreq->adr_seq == adrep.adrp_seq) { 632243730Srwatson TAILQ_REMOVE(&adist_recv_list, adreq, 633243730Srwatson adr_next); 634243730Srwatson break; 635243730Srwatson } 636243730Srwatson } 637243730Srwatson if (adreq == NULL) { 638243730Srwatson /* 639243730Srwatson * If we disconnected in the meantime, just continue. 640243730Srwatson * On disconnect sender_disconnect() clears the queue, 641243730Srwatson * we can use that. 642243730Srwatson */ 643243730Srwatson if (TAILQ_EMPTY(&adist_recv_list)) { 644243730Srwatson rw_unlock(&adist_remote_lock); 645243730Srwatson continue; 646243730Srwatson } 647243730Srwatson mtx_unlock(&adist_recv_list_lock); 648243730Srwatson pjdlog_error("Found no request matching received 'seq' field (%ju).", 649243730Srwatson (uintmax_t)adrep.adrp_seq); 650243730Srwatson sender_disconnect(); 651243730Srwatson continue; 652243730Srwatson } 653243730Srwatson mtx_unlock(&adist_recv_list_lock); 654243730Srwatson adreq_log(LOG_DEBUG, 2, -1, adreq, 655243730Srwatson "recv thread: (%p) Request confirmed: ", adreq); 656243730Srwatson pjdlog_debug(3, "recv thread: (%p) Got request %hhu.", adreq, 657243730Srwatson adreq->adr_cmd); 658243730Srwatson if (adrep.adrp_error != 0) { 659243730Srwatson pjdlog_error("Receiver returned error (%s), disconnecting.", 660243730Srwatson adist_errstr((int)adrep.adrp_error)); 661243730Srwatson sender_disconnect(); 662243730Srwatson continue; 663243730Srwatson } 664243730Srwatson if (adreq->adr_cmd == ADIST_CMD_CLOSE) 665243730Srwatson trail_unlink(adist_trail, adreq->adr_data); 666243730Srwatson pjdlog_debug(3, "Request received successfully."); 667243730Srwatson QUEUE_INSERT(adreq, &adist_free_list); 668243730Srwatson } 669243730Srwatson /* NOTREACHED */ 670243730Srwatson return (NULL); 671243730Srwatson} 672243730Srwatson 673243730Srwatsonstatic void 674243730Srwatsonguard_check_connection(void) 675243730Srwatson{ 676243730Srwatson 677243730Srwatson PJDLOG_ASSERT(adhost->adh_role == ADIST_ROLE_SENDER); 678243730Srwatson 679243730Srwatson rw_rlock(&adist_remote_lock); 680243730Srwatson if (adhost->adh_remote != NULL) { 681243730Srwatson rw_unlock(&adist_remote_lock); 682243730Srwatson pjdlog_debug(3, "remote_guard: Connection to %s is ok.", 683243730Srwatson adhost->adh_remoteaddr); 684243730Srwatson return; 685243730Srwatson } 686243730Srwatson 687243730Srwatson /* 688243730Srwatson * Upgrade the lock. It doesn't have to be atomic as no other thread 689243730Srwatson * can change connection status from disconnected to connected. 690243730Srwatson */ 691243730Srwatson rw_unlock(&adist_remote_lock); 692243730Srwatson pjdlog_debug(1, "remote_guard: Reconnecting to %s.", 693243730Srwatson adhost->adh_remoteaddr); 694243730Srwatson if (sender_connect() == 0) { 695243730Srwatson pjdlog_info("Successfully reconnected to %s.", 696243730Srwatson adhost->adh_remoteaddr); 697243730Srwatson } else { 698243730Srwatson pjdlog_debug(1, "remote_guard: Reconnect to %s failed.", 699243730Srwatson adhost->adh_remoteaddr); 700243730Srwatson } 701243730Srwatson} 702243730Srwatson 703243730Srwatson/* 704243730Srwatson * Thread guards remote connections and reconnects when needed, handles 705243730Srwatson * signals, etc. 706243730Srwatson */ 707243730Srwatsonstatic void * 708243730Srwatsonguard_thread(void *arg __unused) 709243730Srwatson{ 710243730Srwatson struct timespec timeout; 711243730Srwatson time_t lastcheck, now; 712243730Srwatson sigset_t mask; 713243730Srwatson int signo; 714243730Srwatson 715243730Srwatson lastcheck = time(NULL); 716243730Srwatson 717243730Srwatson PJDLOG_VERIFY(sigemptyset(&mask) == 0); 718243730Srwatson PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 719243730Srwatson PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 720243730Srwatson 721243730Srwatson timeout.tv_sec = ADIST_KEEPALIVE; 722243730Srwatson timeout.tv_nsec = 0; 723243730Srwatson signo = -1; 724243730Srwatson 725243730Srwatson for (;;) { 726243730Srwatson switch (signo) { 727243730Srwatson case SIGINT: 728243730Srwatson case SIGTERM: 729243730Srwatson sigexit_received = true; 730243730Srwatson pjdlog_exitx(EX_OK, 731243730Srwatson "Termination signal received, exiting."); 732243730Srwatson break; 733243730Srwatson default: 734243730Srwatson break; 735243730Srwatson } 736243730Srwatson 737243730Srwatson pjdlog_debug(3, "remote_guard: Checking connections."); 738243730Srwatson now = time(NULL); 739243730Srwatson if (lastcheck + ADIST_KEEPALIVE <= now) { 740243730Srwatson guard_check_connection(); 741243730Srwatson lastcheck = now; 742243730Srwatson } 743243730Srwatson signo = sigtimedwait(&mask, NULL, &timeout); 744243730Srwatson } 745243730Srwatson /* NOTREACHED */ 746243730Srwatson return (NULL); 747243730Srwatson} 748243730Srwatson 749243730Srwatsonvoid 750243730Srwatsonadist_sender(struct adist_config *config, struct adist_host *adh) 751243730Srwatson{ 752243730Srwatson pthread_t td; 753243730Srwatson pid_t pid; 754243730Srwatson int error, mode, debuglevel; 755243730Srwatson 756243730Srwatson /* 757243730Srwatson * Create communication channel for sending connection requests from 758243730Srwatson * child to parent. 759243730Srwatson */ 760243730Srwatson if (proto_connect(NULL, "socketpair://", -1, &adh->adh_conn) == -1) { 761243730Srwatson pjdlog_errno(LOG_ERR, 762243730Srwatson "Unable to create connection sockets between child and parent"); 763243730Srwatson return; 764243730Srwatson } 765243730Srwatson 766243730Srwatson pid = fork(); 767243730Srwatson if (pid == -1) { 768243730Srwatson pjdlog_errno(LOG_ERR, "Unable to fork"); 769243730Srwatson proto_close(adh->adh_conn); 770243730Srwatson adh->adh_conn = NULL; 771243730Srwatson return; 772243730Srwatson } 773243730Srwatson 774243730Srwatson if (pid > 0) { 775243730Srwatson /* This is parent. */ 776243730Srwatson adh->adh_worker_pid = pid; 777243730Srwatson /* Declare that we are receiver. */ 778243730Srwatson proto_recv(adh->adh_conn, NULL, 0); 779243730Srwatson return; 780243730Srwatson } 781243730Srwatson 782243730Srwatson adcfg = config; 783243730Srwatson adhost = adh; 784243730Srwatson 785243730Srwatson mode = pjdlog_mode_get(); 786243730Srwatson debuglevel = pjdlog_debug_get(); 787243730Srwatson 788243730Srwatson /* Declare that we are sender. */ 789243730Srwatson proto_send(adhost->adh_conn, NULL, 0); 790243730Srwatson 791243730Srwatson descriptors_cleanup(adhost); 792243730Srwatson 793243730Srwatson#ifdef TODO 794243730Srwatson descriptors_assert(adhost, mode); 795243730Srwatson#endif 796243730Srwatson 797243730Srwatson pjdlog_init(mode); 798243730Srwatson pjdlog_debug_set(debuglevel); 799243730Srwatson pjdlog_prefix_set("[%s] (%s) ", adhost->adh_name, 800243730Srwatson role2str(adhost->adh_role)); 801243730Srwatson#ifdef HAVE_SETPROCTITLE 802243730Srwatson setproctitle("[%s] (%s) ", adhost->adh_name, 803243730Srwatson role2str(adhost->adh_role)); 804243730Srwatson#endif 805243730Srwatson 806243730Srwatson /* 807243730Srwatson * The sender process should be able to remove entries from its 808243730Srwatson * trail directory, but it should not be able to write to the 809243730Srwatson * trail files, only read from them. 810243730Srwatson */ 811243730Srwatson adist_trail = trail_new(adhost->adh_directory, false); 812243730Srwatson if (adist_trail == NULL) 813243730Srwatson exit(EX_OSFILE); 814243730Srwatson 815243730Srwatson if (sandbox(ADIST_USER, true, "auditdistd: %s (%s)", 816243730Srwatson role2str(adhost->adh_role), adhost->adh_name) != 0) { 817243730Srwatson exit(EX_CONFIG); 818243730Srwatson } 819243730Srwatson pjdlog_info("Privileges successfully dropped."); 820243730Srwatson 821243730Srwatson /* 822243730Srwatson * We can ignore wait_for_dir_init() failures. It will fall back to 823243730Srwatson * using sleep(3). 824243730Srwatson */ 825243730Srwatson (void)wait_for_dir_init(trail_dirfd(adist_trail)); 826243730Srwatson 827243730Srwatson init_environment(); 828243730Srwatson if (sender_connect() == 0) { 829243730Srwatson pjdlog_info("Successfully connected to %s.", 830243730Srwatson adhost->adh_remoteaddr); 831243730Srwatson } 832243730Srwatson adhost->adh_reset = true; 833243730Srwatson 834243730Srwatson /* 835243730Srwatson * Create the guard thread first, so we can handle signals from the 836243730Srwatson * very begining. 837243730Srwatson */ 838243730Srwatson error = pthread_create(&td, NULL, guard_thread, NULL); 839243730Srwatson PJDLOG_ASSERT(error == 0); 840243730Srwatson error = pthread_create(&td, NULL, send_thread, NULL); 841243730Srwatson PJDLOG_ASSERT(error == 0); 842243730Srwatson error = pthread_create(&td, NULL, recv_thread, NULL); 843243730Srwatson PJDLOG_ASSERT(error == 0); 844243730Srwatson (void)read_thread(NULL); 845243730Srwatson} 846