1243730Srwatson/*- 2243730Srwatson * Copyright (c) 2012 The FreeBSD Foundation 3243730Srwatson * All rights reserved. 4243730Srwatson * 5243730Srwatson * This software was developed by Pawel Jakub Dawidek under sponsorship from 6243730Srwatson * the FreeBSD Foundation. 7243730Srwatson * 8243730Srwatson * Redistribution and use in source and binary forms, with or without 9243730Srwatson * modification, are permitted provided that the following conditions 10243730Srwatson * are met: 11243730Srwatson * 1. Redistributions of source code must retain the above copyright 12243730Srwatson * notice, this list of conditions and the following disclaimer. 13243730Srwatson * 2. Redistributions in binary form must reproduce the above copyright 14243730Srwatson * notice, this list of conditions and the following disclaimer in the 15243730Srwatson * documentation and/or other materials provided with the distribution. 16243730Srwatson * 17243730Srwatson * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 18243730Srwatson * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19243730Srwatson * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20243730Srwatson * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 21243730Srwatson * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 22243730Srwatson * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 23243730Srwatson * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 24243730Srwatson * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 25243730Srwatson * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 26243730Srwatson * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 27243730Srwatson * SUCH DAMAGE. 28243730Srwatson * 29243734Srwatson * $P4: //depot/projects/trustedbsd/openbsm/bin/auditdistd/sender.c#3 $ 30243730Srwatson */ 31243730Srwatson 32243734Srwatson#include <config/config.h> 33243730Srwatson 34243730Srwatson#include <sys/param.h> 35243730Srwatson#if defined(HAVE_SYS_ENDIAN_H) && defined(HAVE_BSWAP) 36243730Srwatson#include <sys/endian.h> 37243730Srwatson#else /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */ 38243730Srwatson#ifdef HAVE_MACHINE_ENDIAN_H 39243730Srwatson#include <machine/endian.h> 40243730Srwatson#else /* !HAVE_MACHINE_ENDIAN_H */ 41243730Srwatson#ifdef HAVE_ENDIAN_H 42243730Srwatson#include <endian.h> 43243730Srwatson#else /* !HAVE_ENDIAN_H */ 44243730Srwatson#error "No supported endian.h" 45243730Srwatson#endif /* !HAVE_ENDIAN_H */ 46243730Srwatson#endif /* !HAVE_MACHINE_ENDIAN_H */ 47243730Srwatson#include <compat/endian.h> 48243730Srwatson#endif /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */ 49243730Srwatson#include <sys/queue.h> 50243730Srwatson#include <sys/stat.h> 51243730Srwatson#include <sys/wait.h> 52243730Srwatson 53243730Srwatson#include <stdio.h> 54243730Srwatson#include <stdlib.h> 55243730Srwatson#include <unistd.h> 56243730Srwatson 57243730Srwatson#include <ctype.h> 58243730Srwatson#include <dirent.h> 59243730Srwatson#include <err.h> 60243730Srwatson#include <errno.h> 61243730Srwatson#include <fcntl.h> 62243730Srwatson#ifdef HAVE_LIBUTIL_H 63243730Srwatson#include <libutil.h> 64243730Srwatson#endif 65243730Srwatson#include <signal.h> 66243730Srwatson#include <string.h> 67243730Srwatson#include <strings.h> 68243730Srwatson 69243730Srwatson#include <openssl/hmac.h> 70243730Srwatson 71243730Srwatson#ifndef HAVE_SIGTIMEDWAIT 72243730Srwatson#include "sigtimedwait.h" 73243730Srwatson#endif 74243730Srwatson 75243730Srwatson#include "auditdistd.h" 76243734Srwatson#include "pjdlog.h" 77243730Srwatson#include "proto.h" 78243730Srwatson#include "sandbox.h" 79243730Srwatson#include "subr.h" 80243730Srwatson#include "synch.h" 81243730Srwatson#include "trail.h" 82243730Srwatson 83243730Srwatsonstatic struct adist_config *adcfg; 84243730Srwatsonstatic struct adist_host *adhost; 85243730Srwatson 86243730Srwatsonstatic pthread_rwlock_t adist_remote_lock; 87243730Srwatsonstatic pthread_mutex_t adist_remote_mtx; 88243730Srwatsonstatic pthread_cond_t adist_remote_cond; 89243730Srwatsonstatic struct trail *adist_trail; 90243730Srwatson 91243730Srwatsonstatic TAILQ_HEAD(, adreq) adist_free_list; 92243730Srwatsonstatic pthread_mutex_t adist_free_list_lock; 93243730Srwatsonstatic pthread_cond_t adist_free_list_cond; 94243730Srwatsonstatic TAILQ_HEAD(, adreq) adist_send_list; 95243730Srwatsonstatic pthread_mutex_t adist_send_list_lock; 96243730Srwatsonstatic pthread_cond_t adist_send_list_cond; 97243730Srwatsonstatic TAILQ_HEAD(, adreq) adist_recv_list; 98243730Srwatsonstatic pthread_mutex_t adist_recv_list_lock; 99243730Srwatsonstatic pthread_cond_t adist_recv_list_cond; 100243730Srwatson 101243730Srwatsonstatic void 102243730Srwatsoninit_environment(void) 103243730Srwatson{ 104243730Srwatson struct adreq *adreq; 105243730Srwatson unsigned int ii; 106243730Srwatson 107243730Srwatson rw_init(&adist_remote_lock); 108243730Srwatson mtx_init(&adist_remote_mtx); 109243730Srwatson cv_init(&adist_remote_cond); 110243730Srwatson TAILQ_INIT(&adist_free_list); 111243730Srwatson mtx_init(&adist_free_list_lock); 112243730Srwatson cv_init(&adist_free_list_cond); 113243730Srwatson TAILQ_INIT(&adist_send_list); 114243730Srwatson mtx_init(&adist_send_list_lock); 115243730Srwatson cv_init(&adist_send_list_cond); 116243730Srwatson TAILQ_INIT(&adist_recv_list); 117243730Srwatson mtx_init(&adist_recv_list_lock); 118243730Srwatson cv_init(&adist_recv_list_cond); 119243730Srwatson 120243730Srwatson for (ii = 0; ii < ADIST_QUEUE_SIZE; ii++) { 121243730Srwatson adreq = malloc(sizeof(*adreq) + ADIST_BUF_SIZE); 122243730Srwatson if (adreq == NULL) { 123243730Srwatson pjdlog_exitx(EX_TEMPFAIL, 124243730Srwatson "Unable to allocate %zu bytes of memory for adreq object.", 125243730Srwatson sizeof(*adreq) + ADIST_BUF_SIZE); 126243730Srwatson } 127243730Srwatson adreq->adr_byteorder = ADIST_BYTEORDER; 128243730Srwatson adreq->adr_cmd = ADIST_CMD_UNDEFINED; 129243730Srwatson adreq->adr_seq = 0; 130243730Srwatson adreq->adr_datasize = 0; 131243730Srwatson TAILQ_INSERT_TAIL(&adist_free_list, adreq, adr_next); 132243730Srwatson } 133243730Srwatson} 134243730Srwatson 135243730Srwatsonstatic int 136243730Srwatsonsender_connect(void) 137243730Srwatson{ 138243730Srwatson unsigned char rnd[32], hash[32], resp[32]; 139243730Srwatson struct proto_conn *conn; 140243730Srwatson char welcome[8]; 141243730Srwatson int16_t val; 142243730Srwatson 143243730Srwatson val = 1; 144243730Srwatson if (proto_send(adhost->adh_conn, &val, sizeof(val)) < 0) { 145243730Srwatson pjdlog_exit(EX_TEMPFAIL, 146243730Srwatson "Unable to send connection request to parent"); 147243730Srwatson } 148243730Srwatson if (proto_recv(adhost->adh_conn, &val, sizeof(val)) < 0) { 149243730Srwatson pjdlog_exit(EX_TEMPFAIL, 150243730Srwatson "Unable to receive reply to connection request from parent"); 151243730Srwatson } 152243730Srwatson if (val != 0) { 153243730Srwatson errno = val; 154243730Srwatson pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 155243730Srwatson adhost->adh_remoteaddr); 156243730Srwatson return (-1); 157243730Srwatson } 158243730Srwatson if (proto_connection_recv(adhost->adh_conn, true, &conn) < 0) { 159243730Srwatson pjdlog_exit(EX_TEMPFAIL, 160243730Srwatson "Unable to receive connection from parent"); 161243730Srwatson } 162243730Srwatson if (proto_connect_wait(conn, adcfg->adc_timeout) < 0) { 163243730Srwatson pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 164243730Srwatson adhost->adh_remoteaddr); 165243730Srwatson proto_close(conn); 166243730Srwatson return (-1); 167243730Srwatson } 168243730Srwatson pjdlog_debug(1, "Connected to %s.", adhost->adh_remoteaddr); 169243730Srwatson /* Error in setting timeout is not critical, but why should it fail? */ 170243730Srwatson if (proto_timeout(conn, adcfg->adc_timeout) < 0) 171243730Srwatson pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 172243730Srwatson else 173243730Srwatson pjdlog_debug(1, "Timeout set to %d.", adcfg->adc_timeout); 174243730Srwatson 175243730Srwatson /* Exchange welcome message, which includes version number. */ 176243730Srwatson (void)snprintf(welcome, sizeof(welcome), "ADIST%02d", ADIST_VERSION); 177243730Srwatson if (proto_send(conn, welcome, sizeof(welcome)) < 0) { 178243730Srwatson pjdlog_errno(LOG_WARNING, 179243730Srwatson "Unable to send welcome message to %s", 180243730Srwatson adhost->adh_remoteaddr); 181243730Srwatson proto_close(conn); 182243730Srwatson return (-1); 183243730Srwatson } 184243730Srwatson pjdlog_debug(1, "Welcome message sent (%s).", welcome); 185243730Srwatson bzero(welcome, sizeof(welcome)); 186243730Srwatson if (proto_recv(conn, welcome, sizeof(welcome)) < 0) { 187243730Srwatson pjdlog_errno(LOG_WARNING, 188243730Srwatson "Unable to receive welcome message from %s", 189243730Srwatson adhost->adh_remoteaddr); 190243730Srwatson proto_close(conn); 191243730Srwatson return (-1); 192243730Srwatson } 193243730Srwatson if (strncmp(welcome, "ADIST", 5) != 0 || !isdigit(welcome[5]) || 194243730Srwatson !isdigit(welcome[6]) || welcome[7] != '\0') { 195243730Srwatson pjdlog_warning("Invalid welcome message from %s.", 196243730Srwatson adhost->adh_remoteaddr); 197243730Srwatson proto_close(conn); 198243730Srwatson return (-1); 199243730Srwatson } 200243730Srwatson pjdlog_debug(1, "Welcome message received (%s).", welcome); 201243730Srwatson /* 202243730Srwatson * Receiver can only reply with version number lower or equal to 203243730Srwatson * the one we sent. 204243730Srwatson */ 205243730Srwatson adhost->adh_version = atoi(welcome + 5); 206243730Srwatson if (adhost->adh_version > ADIST_VERSION) { 207243730Srwatson pjdlog_warning("Invalid version number from %s (%d received, up to %d supported).", 208243730Srwatson adhost->adh_remoteaddr, adhost->adh_version, ADIST_VERSION); 209243730Srwatson proto_close(conn); 210243730Srwatson return (-1); 211243730Srwatson } 212243730Srwatson 213243730Srwatson pjdlog_debug(1, "Version %d negotiated with %s.", adhost->adh_version, 214243730Srwatson adhost->adh_remoteaddr); 215243730Srwatson 216243730Srwatson if (proto_send(conn, adcfg->adc_name, sizeof(adcfg->adc_name)) == -1) { 217243730Srwatson pjdlog_errno(LOG_WARNING, "Unable to send name to %s", 218243730Srwatson adhost->adh_remoteaddr); 219243730Srwatson proto_close(conn); 220243730Srwatson return (-1); 221243730Srwatson } 222243730Srwatson pjdlog_debug(1, "Name (%s) sent.", adcfg->adc_name); 223243730Srwatson 224243730Srwatson if (proto_recv(conn, rnd, sizeof(rnd)) == -1) { 225243730Srwatson pjdlog_errno(LOG_WARNING, "Unable to receive challenge from %s", 226243730Srwatson adhost->adh_remoteaddr); 227243730Srwatson proto_close(conn); 228243730Srwatson return (-1); 229243730Srwatson } 230243730Srwatson pjdlog_debug(1, "Challenge received."); 231243730Srwatson 232243730Srwatson if (HMAC(EVP_sha256(), adhost->adh_password, 233243730Srwatson (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash, 234243730Srwatson NULL) == NULL) { 235243730Srwatson pjdlog_warning("Unable to generate response."); 236243730Srwatson proto_close(conn); 237243730Srwatson return (-1); 238243730Srwatson } 239243730Srwatson pjdlog_debug(1, "Response generated."); 240243730Srwatson 241243730Srwatson if (proto_send(conn, hash, sizeof(hash)) == -1) { 242243730Srwatson pjdlog_errno(LOG_WARNING, "Unable to send response to %s", 243243730Srwatson adhost->adh_remoteaddr); 244243730Srwatson proto_close(conn); 245243730Srwatson return (-1); 246243730Srwatson } 247243730Srwatson pjdlog_debug(1, "Response sent."); 248243730Srwatson 249243730Srwatson if (adist_random(rnd, sizeof(rnd)) == -1) { 250243730Srwatson pjdlog_warning("Unable to generate challenge."); 251243730Srwatson proto_close(conn); 252243730Srwatson return (-1); 253243730Srwatson } 254243730Srwatson pjdlog_debug(1, "Challenge generated."); 255243730Srwatson 256243730Srwatson if (proto_send(conn, rnd, sizeof(rnd)) == -1) { 257243730Srwatson pjdlog_errno(LOG_WARNING, "Unable to send challenge to %s", 258243730Srwatson adhost->adh_remoteaddr); 259243730Srwatson proto_close(conn); 260243730Srwatson return (-1); 261243730Srwatson } 262243730Srwatson pjdlog_debug(1, "Challenge sent."); 263243730Srwatson 264243730Srwatson if (proto_recv(conn, resp, sizeof(resp)) == -1) { 265243730Srwatson pjdlog_errno(LOG_WARNING, "Unable to receive response from %s", 266243730Srwatson adhost->adh_remoteaddr); 267243730Srwatson proto_close(conn); 268243730Srwatson return (-1); 269243730Srwatson } 270243730Srwatson pjdlog_debug(1, "Response received."); 271243730Srwatson 272243730Srwatson if (HMAC(EVP_sha256(), adhost->adh_password, 273243730Srwatson (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash, 274243730Srwatson NULL) == NULL) { 275243730Srwatson pjdlog_warning("Unable to generate hash."); 276243730Srwatson proto_close(conn); 277243730Srwatson return (-1); 278243730Srwatson } 279243730Srwatson pjdlog_debug(1, "Hash generated."); 280243730Srwatson 281243730Srwatson if (memcmp(resp, hash, sizeof(hash)) != 0) { 282243730Srwatson pjdlog_warning("Invalid response from %s (wrong password?).", 283243730Srwatson adhost->adh_remoteaddr); 284243730Srwatson proto_close(conn); 285243730Srwatson return (-1); 286243730Srwatson } 287243730Srwatson pjdlog_info("Receiver authenticated."); 288243730Srwatson 289243730Srwatson if (proto_recv(conn, &adhost->adh_trail_offset, 290243730Srwatson sizeof(adhost->adh_trail_offset)) == -1) { 291243730Srwatson pjdlog_errno(LOG_WARNING, 292243730Srwatson "Unable to receive size of the most recent trail file from %s", 293243730Srwatson adhost->adh_remoteaddr); 294243730Srwatson proto_close(conn); 295243730Srwatson return (-1); 296243730Srwatson } 297243730Srwatson adhost->adh_trail_offset = le64toh(adhost->adh_trail_offset); 298243730Srwatson if (proto_recv(conn, &adhost->adh_trail_name, 299243730Srwatson sizeof(adhost->adh_trail_name)) == -1) { 300243730Srwatson pjdlog_errno(LOG_WARNING, 301243730Srwatson "Unable to receive name of the most recent trail file from %s", 302243730Srwatson adhost->adh_remoteaddr); 303243730Srwatson proto_close(conn); 304243730Srwatson return (-1); 305243730Srwatson } 306243730Srwatson pjdlog_debug(1, "Trail name (%s) and offset (%ju) received.", 307243730Srwatson adhost->adh_trail_name, (uintmax_t)adhost->adh_trail_offset); 308243730Srwatson 309243730Srwatson rw_wlock(&adist_remote_lock); 310243730Srwatson mtx_lock(&adist_remote_mtx); 311243730Srwatson PJDLOG_ASSERT(adhost->adh_remote == NULL); 312243730Srwatson PJDLOG_ASSERT(conn != NULL); 313243730Srwatson adhost->adh_remote = conn; 314243730Srwatson mtx_unlock(&adist_remote_mtx); 315243730Srwatson rw_unlock(&adist_remote_lock); 316243730Srwatson cv_signal(&adist_remote_cond); 317243730Srwatson 318243730Srwatson return (0); 319243730Srwatson} 320243730Srwatson 321243730Srwatsonstatic void 322243730Srwatsonsender_disconnect(void) 323243730Srwatson{ 324243730Srwatson 325243730Srwatson rw_wlock(&adist_remote_lock); 326243730Srwatson /* 327243730Srwatson * Check for a race between dropping rlock and acquiring wlock - 328243730Srwatson * another thread can close connection in-between. 329243730Srwatson */ 330243730Srwatson if (adhost->adh_remote == NULL) { 331243730Srwatson rw_unlock(&adist_remote_lock); 332243730Srwatson return; 333243730Srwatson } 334243730Srwatson pjdlog_debug(2, "Closing connection to %s.", adhost->adh_remoteaddr); 335243730Srwatson proto_close(adhost->adh_remote); 336243730Srwatson mtx_lock(&adist_remote_mtx); 337243730Srwatson adhost->adh_remote = NULL; 338243730Srwatson adhost->adh_reset = true; 339243730Srwatson adhost->adh_trail_name[0] = '\0'; 340243730Srwatson adhost->adh_trail_offset = 0; 341243730Srwatson mtx_unlock(&adist_remote_mtx); 342243730Srwatson rw_unlock(&adist_remote_lock); 343243730Srwatson 344243730Srwatson pjdlog_warning("Disconnected from %s.", adhost->adh_remoteaddr); 345243730Srwatson 346243730Srwatson /* Move all in-flight requests back onto free list. */ 347243730Srwatson mtx_lock(&adist_free_list_lock); 348243730Srwatson mtx_lock(&adist_send_list_lock); 349243730Srwatson TAILQ_CONCAT(&adist_free_list, &adist_send_list, adr_next); 350243730Srwatson mtx_unlock(&adist_send_list_lock); 351243730Srwatson mtx_lock(&adist_recv_list_lock); 352243730Srwatson TAILQ_CONCAT(&adist_free_list, &adist_recv_list, adr_next); 353243730Srwatson mtx_unlock(&adist_recv_list_lock); 354243730Srwatson mtx_unlock(&adist_free_list_lock); 355243730Srwatson} 356243730Srwatson 357243730Srwatsonstatic void 358243730Srwatsonadreq_fill(struct adreq *adreq, uint8_t cmd, const unsigned char *data, 359243730Srwatson size_t size) 360243730Srwatson{ 361243730Srwatson static uint64_t seq = 1; 362243730Srwatson 363243730Srwatson PJDLOG_ASSERT(size <= ADIST_BUF_SIZE); 364243730Srwatson 365243730Srwatson switch (cmd) { 366243730Srwatson case ADIST_CMD_OPEN: 367243730Srwatson case ADIST_CMD_CLOSE: 368243730Srwatson PJDLOG_ASSERT(data != NULL && size == 0); 369243730Srwatson size = strlen(data) + 1; 370243730Srwatson break; 371243730Srwatson case ADIST_CMD_APPEND: 372243730Srwatson PJDLOG_ASSERT(data != NULL && size > 0); 373243730Srwatson break; 374243730Srwatson case ADIST_CMD_KEEPALIVE: 375243730Srwatson case ADIST_CMD_ERROR: 376243730Srwatson PJDLOG_ASSERT(data == NULL && size == 0); 377243730Srwatson break; 378243730Srwatson default: 379243730Srwatson PJDLOG_ABORT("Invalid command (%hhu).", cmd); 380243730Srwatson } 381243730Srwatson 382243730Srwatson adreq->adr_cmd = cmd; 383243730Srwatson adreq->adr_seq = seq++; 384243730Srwatson adreq->adr_datasize = size; 385243730Srwatson /* Don't copy if data is already in out buffer. */ 386243730Srwatson if (data != NULL && data != adreq->adr_data) 387243730Srwatson bcopy(data, adreq->adr_data, size); 388243730Srwatson} 389243730Srwatson 390243730Srwatsonstatic bool 391243730Srwatsonread_thread_wait(void) 392243730Srwatson{ 393243730Srwatson bool newfile = false; 394243730Srwatson 395243730Srwatson mtx_lock(&adist_remote_mtx); 396243730Srwatson if (adhost->adh_reset) { 397243730Srwatson adhost->adh_reset = false; 398243730Srwatson if (trail_filefd(adist_trail) != -1) 399243730Srwatson trail_close(adist_trail); 400243730Srwatson trail_reset(adist_trail); 401243730Srwatson while (adhost->adh_remote == NULL) 402243730Srwatson cv_wait(&adist_remote_cond, &adist_remote_mtx); 403243730Srwatson trail_start(adist_trail, adhost->adh_trail_name, 404243730Srwatson adhost->adh_trail_offset); 405243730Srwatson newfile = true; 406243730Srwatson } 407243730Srwatson mtx_unlock(&adist_remote_mtx); 408243730Srwatson while (trail_filefd(adist_trail) == -1) { 409243730Srwatson newfile = true; 410243730Srwatson wait_for_dir(); 411243730Srwatson if (trail_filefd(adist_trail) == -1) 412243730Srwatson trail_next(adist_trail); 413243730Srwatson } 414243730Srwatson if (newfile) { 415243730Srwatson pjdlog_debug(1, "Trail file \"%s/%s\" opened.", 416243730Srwatson adhost->adh_directory, 417243730Srwatson trail_filename(adist_trail)); 418243730Srwatson (void)wait_for_file_init(trail_filefd(adist_trail)); 419243730Srwatson } 420243730Srwatson return (newfile); 421243730Srwatson} 422243730Srwatson 423243730Srwatsonstatic void * 424243730Srwatsonread_thread(void *arg __unused) 425243730Srwatson{ 426243730Srwatson struct adreq *adreq; 427243730Srwatson ssize_t done; 428243730Srwatson bool newfile; 429243730Srwatson 430243730Srwatson pjdlog_debug(1, "%s started.", __func__); 431243730Srwatson 432243730Srwatson for (;;) { 433243730Srwatson newfile = read_thread_wait(); 434243730Srwatson QUEUE_TAKE(adreq, &adist_free_list, 0); 435243730Srwatson if (newfile) { 436243730Srwatson adreq_fill(adreq, ADIST_CMD_OPEN, 437243730Srwatson trail_filename(adist_trail), 0); 438243730Srwatson newfile = false; 439243730Srwatson goto move; 440243730Srwatson } 441243730Srwatson 442243730Srwatson done = read(trail_filefd(adist_trail), adreq->adr_data, 443243730Srwatson ADIST_BUF_SIZE); 444243730Srwatson if (done == -1) { 445243730Srwatson off_t offset; 446243730Srwatson int error; 447243730Srwatson 448243730Srwatson error = errno; 449243730Srwatson offset = lseek(trail_filefd(adist_trail), 0, SEEK_CUR); 450243730Srwatson errno = error; 451243730Srwatson pjdlog_errno(LOG_ERR, 452243730Srwatson "Error while reading \"%s/%s\" at offset %jd", 453243730Srwatson adhost->adh_directory, trail_filename(adist_trail), 454243730Srwatson offset); 455243730Srwatson trail_close(adist_trail); 456243730Srwatson adreq_fill(adreq, ADIST_CMD_ERROR, NULL, 0); 457243730Srwatson goto move; 458243730Srwatson } else if (done == 0) { 459243730Srwatson /* End of file. */ 460243730Srwatson pjdlog_debug(3, "End of \"%s/%s\".", 461243730Srwatson adhost->adh_directory, trail_filename(adist_trail)); 462243730Srwatson if (!trail_switch(adist_trail)) { 463243730Srwatson /* More audit records can arrive. */ 464243730Srwatson mtx_lock(&adist_free_list_lock); 465243730Srwatson TAILQ_INSERT_TAIL(&adist_free_list, adreq, 466243730Srwatson adr_next); 467243730Srwatson mtx_unlock(&adist_free_list_lock); 468243730Srwatson wait_for_file(); 469243730Srwatson continue; 470243730Srwatson } 471243730Srwatson adreq_fill(adreq, ADIST_CMD_CLOSE, 472243730Srwatson trail_filename(adist_trail), 0); 473243730Srwatson trail_close(adist_trail); 474243730Srwatson goto move; 475243730Srwatson } 476243730Srwatson 477243730Srwatson adreq_fill(adreq, ADIST_CMD_APPEND, adreq->adr_data, done); 478243730Srwatsonmove: 479243730Srwatson pjdlog_debug(3, 480243730Srwatson "read thread: Moving request %p to the send queue (%hhu).", 481243730Srwatson adreq, adreq->adr_cmd); 482243730Srwatson QUEUE_INSERT(adreq, &adist_send_list); 483243730Srwatson } 484243730Srwatson /* NOTREACHED */ 485243730Srwatson return (NULL); 486243730Srwatson} 487243730Srwatson 488243730Srwatsonstatic void 489243730Srwatsonkeepalive_send(void) 490243730Srwatson{ 491243730Srwatson struct adreq *adreq; 492243730Srwatson 493243730Srwatson rw_rlock(&adist_remote_lock); 494243730Srwatson if (adhost->adh_remote == NULL) { 495243730Srwatson rw_unlock(&adist_remote_lock); 496243730Srwatson return; 497243730Srwatson } 498243730Srwatson rw_unlock(&adist_remote_lock); 499243730Srwatson 500243730Srwatson mtx_lock(&adist_free_list_lock); 501243730Srwatson adreq = TAILQ_FIRST(&adist_free_list); 502243730Srwatson if (adreq != NULL) 503243730Srwatson TAILQ_REMOVE(&adist_free_list, adreq, adr_next); 504243730Srwatson mtx_unlock(&adist_free_list_lock); 505243730Srwatson if (adreq == NULL) 506243730Srwatson return; 507243730Srwatson 508243730Srwatson adreq_fill(adreq, ADIST_CMD_KEEPALIVE, NULL, 0); 509243730Srwatson 510243730Srwatson QUEUE_INSERT(adreq, &adist_send_list); 511243730Srwatson 512243730Srwatson pjdlog_debug(3, "keepalive_send: Request sent."); 513243730Srwatson} 514243730Srwatson 515243730Srwatson/* 516243730Srwatson * Thread sends request to secondary node. 517243730Srwatson */ 518243730Srwatsonstatic void * 519243730Srwatsonsend_thread(void *arg __unused) 520243730Srwatson{ 521243730Srwatson time_t lastcheck, now; 522243730Srwatson struct adreq *adreq; 523243730Srwatson 524243730Srwatson pjdlog_debug(1, "%s started.", __func__); 525243730Srwatson 526243730Srwatson lastcheck = time(NULL); 527243730Srwatson 528243730Srwatson for (;;) { 529243730Srwatson pjdlog_debug(3, "send thread: Taking request."); 530243730Srwatson for (;;) { 531243730Srwatson QUEUE_TAKE(adreq, &adist_send_list, ADIST_KEEPALIVE); 532243730Srwatson if (adreq != NULL) 533243730Srwatson break; 534243730Srwatson now = time(NULL); 535243730Srwatson if (lastcheck + ADIST_KEEPALIVE <= now) { 536243730Srwatson keepalive_send(); 537243730Srwatson lastcheck = now; 538243730Srwatson } 539243730Srwatson } 540243730Srwatson PJDLOG_ASSERT(adreq != NULL); 541243730Srwatson pjdlog_debug(3, "send thread: (%p) Got request %hhu.", adreq, 542243730Srwatson adreq->adr_cmd); 543243730Srwatson /* 544243730Srwatson * Protect connection from disappearing. 545243730Srwatson */ 546243730Srwatson rw_rlock(&adist_remote_lock); 547243730Srwatson /* 548243730Srwatson * Move the request to the recv queue first to avoid race 549243730Srwatson * where the recv thread receives the reply before we move 550243730Srwatson * the request to the recv queue. 551243730Srwatson */ 552243730Srwatson QUEUE_INSERT(adreq, &adist_recv_list); 553243730Srwatson if (adhost->adh_remote == NULL || 554243730Srwatson proto_send(adhost->adh_remote, &adreq->adr_packet, 555243730Srwatson ADPKT_SIZE(adreq)) == -1) { 556243730Srwatson rw_unlock(&adist_remote_lock); 557243730Srwatson pjdlog_debug(1, 558243730Srwatson "send thread: (%p) Unable to send request.", adreq); 559243730Srwatson if (adhost->adh_remote != NULL) 560243730Srwatson sender_disconnect(); 561243730Srwatson continue; 562243730Srwatson } else { 563243730Srwatson pjdlog_debug(3, "Request %p sent successfully.", adreq); 564243730Srwatson adreq_log(LOG_DEBUG, 2, -1, adreq, 565243730Srwatson "send: (%p) Request sent: ", adreq); 566243730Srwatson rw_unlock(&adist_remote_lock); 567243730Srwatson } 568243730Srwatson } 569243730Srwatson /* NOTREACHED */ 570243730Srwatson return (NULL); 571243730Srwatson} 572243730Srwatson 573243730Srwatsonstatic void 574243730Srwatsonadrep_decode_header(struct adrep *adrep) 575243730Srwatson{ 576243730Srwatson 577243730Srwatson /* Byte-swap only is the receiver is using different byte order. */ 578243730Srwatson if (adrep->adrp_byteorder != ADIST_BYTEORDER) { 579243730Srwatson adrep->adrp_byteorder = ADIST_BYTEORDER; 580243730Srwatson adrep->adrp_seq = bswap64(adrep->adrp_seq); 581243730Srwatson adrep->adrp_error = bswap16(adrep->adrp_error); 582243730Srwatson } 583243730Srwatson} 584243730Srwatson 585243730Srwatson/* 586243730Srwatson * Thread receives answer from secondary node and passes it to ggate_send 587243730Srwatson * thread. 588243730Srwatson */ 589243730Srwatsonstatic void * 590243730Srwatsonrecv_thread(void *arg __unused) 591243730Srwatson{ 592243730Srwatson struct adrep adrep; 593243730Srwatson struct adreq *adreq; 594243730Srwatson 595243730Srwatson pjdlog_debug(1, "%s started.", __func__); 596243730Srwatson 597243730Srwatson for (;;) { 598243730Srwatson /* Wait until there is anything to receive. */ 599243730Srwatson QUEUE_WAIT(&adist_recv_list); 600243730Srwatson pjdlog_debug(3, "recv thread: Got something."); 601243730Srwatson rw_rlock(&adist_remote_lock); 602243730Srwatson if (adhost->adh_remote == NULL) { 603243730Srwatson /* 604243730Srwatson * Connection is dead. 605243730Srwatson * XXX: We shouldn't be here. 606243730Srwatson */ 607243730Srwatson rw_unlock(&adist_remote_lock); 608243730Srwatson continue; 609243730Srwatson } 610243730Srwatson if (proto_recv(adhost->adh_remote, &adrep, 611243730Srwatson sizeof(adrep)) == -1) { 612243730Srwatson rw_unlock(&adist_remote_lock); 613243730Srwatson pjdlog_errno(LOG_ERR, "Unable to receive reply"); 614243730Srwatson sender_disconnect(); 615243730Srwatson continue; 616243730Srwatson } 617243730Srwatson rw_unlock(&adist_remote_lock); 618243730Srwatson adrep_decode_header(&adrep); 619243730Srwatson /* 620243730Srwatson * Find the request that was just confirmed. 621243730Srwatson */ 622243730Srwatson mtx_lock(&adist_recv_list_lock); 623243730Srwatson TAILQ_FOREACH(adreq, &adist_recv_list, adr_next) { 624243730Srwatson if (adreq->adr_seq == adrep.adrp_seq) { 625243730Srwatson TAILQ_REMOVE(&adist_recv_list, adreq, 626243730Srwatson adr_next); 627243730Srwatson break; 628243730Srwatson } 629243730Srwatson } 630243730Srwatson if (adreq == NULL) { 631243730Srwatson /* 632243730Srwatson * If we disconnected in the meantime, just continue. 633243730Srwatson * On disconnect sender_disconnect() clears the queue, 634243730Srwatson * we can use that. 635243730Srwatson */ 636243730Srwatson if (TAILQ_EMPTY(&adist_recv_list)) { 637243730Srwatson rw_unlock(&adist_remote_lock); 638243730Srwatson continue; 639243730Srwatson } 640243730Srwatson mtx_unlock(&adist_recv_list_lock); 641243730Srwatson pjdlog_error("Found no request matching received 'seq' field (%ju).", 642243730Srwatson (uintmax_t)adrep.adrp_seq); 643243730Srwatson sender_disconnect(); 644243730Srwatson continue; 645243730Srwatson } 646243730Srwatson mtx_unlock(&adist_recv_list_lock); 647243730Srwatson adreq_log(LOG_DEBUG, 2, -1, adreq, 648243730Srwatson "recv thread: (%p) Request confirmed: ", adreq); 649243730Srwatson pjdlog_debug(3, "recv thread: (%p) Got request %hhu.", adreq, 650243730Srwatson adreq->adr_cmd); 651243730Srwatson if (adrep.adrp_error != 0) { 652243730Srwatson pjdlog_error("Receiver returned error (%s), disconnecting.", 653243730Srwatson adist_errstr((int)adrep.adrp_error)); 654243730Srwatson sender_disconnect(); 655243730Srwatson continue; 656243730Srwatson } 657243730Srwatson if (adreq->adr_cmd == ADIST_CMD_CLOSE) 658243730Srwatson trail_unlink(adist_trail, adreq->adr_data); 659243730Srwatson pjdlog_debug(3, "Request received successfully."); 660243730Srwatson QUEUE_INSERT(adreq, &adist_free_list); 661243730Srwatson } 662243730Srwatson /* NOTREACHED */ 663243730Srwatson return (NULL); 664243730Srwatson} 665243730Srwatson 666243730Srwatsonstatic void 667243730Srwatsonguard_check_connection(void) 668243730Srwatson{ 669243730Srwatson 670243730Srwatson PJDLOG_ASSERT(adhost->adh_role == ADIST_ROLE_SENDER); 671243730Srwatson 672243730Srwatson rw_rlock(&adist_remote_lock); 673243730Srwatson if (adhost->adh_remote != NULL) { 674243730Srwatson rw_unlock(&adist_remote_lock); 675243730Srwatson pjdlog_debug(3, "remote_guard: Connection to %s is ok.", 676243730Srwatson adhost->adh_remoteaddr); 677243730Srwatson return; 678243730Srwatson } 679243730Srwatson 680243730Srwatson /* 681243730Srwatson * Upgrade the lock. It doesn't have to be atomic as no other thread 682243730Srwatson * can change connection status from disconnected to connected. 683243730Srwatson */ 684243730Srwatson rw_unlock(&adist_remote_lock); 685243730Srwatson pjdlog_debug(1, "remote_guard: Reconnecting to %s.", 686243730Srwatson adhost->adh_remoteaddr); 687243730Srwatson if (sender_connect() == 0) { 688243730Srwatson pjdlog_info("Successfully reconnected to %s.", 689243730Srwatson adhost->adh_remoteaddr); 690243730Srwatson } else { 691243730Srwatson pjdlog_debug(1, "remote_guard: Reconnect to %s failed.", 692243730Srwatson adhost->adh_remoteaddr); 693243730Srwatson } 694243730Srwatson} 695243730Srwatson 696243730Srwatson/* 697243730Srwatson * Thread guards remote connections and reconnects when needed, handles 698243730Srwatson * signals, etc. 699243730Srwatson */ 700243730Srwatsonstatic void * 701243730Srwatsonguard_thread(void *arg __unused) 702243730Srwatson{ 703243730Srwatson struct timespec timeout; 704243730Srwatson time_t lastcheck, now; 705243730Srwatson sigset_t mask; 706243730Srwatson int signo; 707243730Srwatson 708243730Srwatson lastcheck = time(NULL); 709243730Srwatson 710243730Srwatson PJDLOG_VERIFY(sigemptyset(&mask) == 0); 711243730Srwatson PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 712243730Srwatson PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 713243730Srwatson 714243730Srwatson timeout.tv_sec = ADIST_KEEPALIVE; 715243730Srwatson timeout.tv_nsec = 0; 716243730Srwatson signo = -1; 717243730Srwatson 718243730Srwatson for (;;) { 719243730Srwatson switch (signo) { 720243730Srwatson case SIGINT: 721243730Srwatson case SIGTERM: 722243730Srwatson sigexit_received = true; 723243730Srwatson pjdlog_exitx(EX_OK, 724243730Srwatson "Termination signal received, exiting."); 725243730Srwatson break; 726243730Srwatson default: 727243730Srwatson break; 728243730Srwatson } 729243730Srwatson 730243730Srwatson pjdlog_debug(3, "remote_guard: Checking connections."); 731243730Srwatson now = time(NULL); 732243730Srwatson if (lastcheck + ADIST_KEEPALIVE <= now) { 733243730Srwatson guard_check_connection(); 734243730Srwatson lastcheck = now; 735243730Srwatson } 736243730Srwatson signo = sigtimedwait(&mask, NULL, &timeout); 737243730Srwatson } 738243730Srwatson /* NOTREACHED */ 739243730Srwatson return (NULL); 740243730Srwatson} 741243730Srwatson 742243730Srwatsonvoid 743243730Srwatsonadist_sender(struct adist_config *config, struct adist_host *adh) 744243730Srwatson{ 745243730Srwatson pthread_t td; 746243730Srwatson pid_t pid; 747243730Srwatson int error, mode, debuglevel; 748243730Srwatson 749243730Srwatson /* 750243730Srwatson * Create communication channel for sending connection requests from 751243730Srwatson * child to parent. 752243730Srwatson */ 753243730Srwatson if (proto_connect(NULL, "socketpair://", -1, &adh->adh_conn) == -1) { 754243730Srwatson pjdlog_errno(LOG_ERR, 755243730Srwatson "Unable to create connection sockets between child and parent"); 756243730Srwatson return; 757243730Srwatson } 758243730Srwatson 759243730Srwatson pid = fork(); 760243730Srwatson if (pid == -1) { 761243730Srwatson pjdlog_errno(LOG_ERR, "Unable to fork"); 762243730Srwatson proto_close(adh->adh_conn); 763243730Srwatson adh->adh_conn = NULL; 764243730Srwatson return; 765243730Srwatson } 766243730Srwatson 767243730Srwatson if (pid > 0) { 768243730Srwatson /* This is parent. */ 769243730Srwatson adh->adh_worker_pid = pid; 770243730Srwatson /* Declare that we are receiver. */ 771243730Srwatson proto_recv(adh->adh_conn, NULL, 0); 772243730Srwatson return; 773243730Srwatson } 774243730Srwatson 775243730Srwatson adcfg = config; 776243730Srwatson adhost = adh; 777243730Srwatson 778243730Srwatson mode = pjdlog_mode_get(); 779243730Srwatson debuglevel = pjdlog_debug_get(); 780243730Srwatson 781243730Srwatson /* Declare that we are sender. */ 782243730Srwatson proto_send(adhost->adh_conn, NULL, 0); 783243730Srwatson 784243730Srwatson descriptors_cleanup(adhost); 785243730Srwatson 786243730Srwatson#ifdef TODO 787243730Srwatson descriptors_assert(adhost, mode); 788243730Srwatson#endif 789243730Srwatson 790243730Srwatson pjdlog_init(mode); 791243730Srwatson pjdlog_debug_set(debuglevel); 792243730Srwatson pjdlog_prefix_set("[%s] (%s) ", adhost->adh_name, 793243730Srwatson role2str(adhost->adh_role)); 794243730Srwatson#ifdef HAVE_SETPROCTITLE 795243730Srwatson setproctitle("[%s] (%s) ", adhost->adh_name, 796243730Srwatson role2str(adhost->adh_role)); 797243730Srwatson#endif 798243730Srwatson 799243730Srwatson /* 800243730Srwatson * The sender process should be able to remove entries from its 801243730Srwatson * trail directory, but it should not be able to write to the 802243730Srwatson * trail files, only read from them. 803243730Srwatson */ 804243730Srwatson adist_trail = trail_new(adhost->adh_directory, false); 805243730Srwatson if (adist_trail == NULL) 806243730Srwatson exit(EX_OSFILE); 807243730Srwatson 808243730Srwatson if (sandbox(ADIST_USER, true, "auditdistd: %s (%s)", 809243730Srwatson role2str(adhost->adh_role), adhost->adh_name) != 0) { 810243730Srwatson exit(EX_CONFIG); 811243730Srwatson } 812243730Srwatson pjdlog_info("Privileges successfully dropped."); 813243730Srwatson 814243730Srwatson /* 815243730Srwatson * We can ignore wait_for_dir_init() failures. It will fall back to 816243730Srwatson * using sleep(3). 817243730Srwatson */ 818243730Srwatson (void)wait_for_dir_init(trail_dirfd(adist_trail)); 819243730Srwatson 820243730Srwatson init_environment(); 821243730Srwatson if (sender_connect() == 0) { 822243730Srwatson pjdlog_info("Successfully connected to %s.", 823243730Srwatson adhost->adh_remoteaddr); 824243730Srwatson } 825243730Srwatson adhost->adh_reset = true; 826243730Srwatson 827243730Srwatson /* 828243730Srwatson * Create the guard thread first, so we can handle signals from the 829243730Srwatson * very begining. 830243730Srwatson */ 831243730Srwatson error = pthread_create(&td, NULL, guard_thread, NULL); 832243730Srwatson PJDLOG_ASSERT(error == 0); 833243730Srwatson error = pthread_create(&td, NULL, send_thread, NULL); 834243730Srwatson PJDLOG_ASSERT(error == 0); 835243730Srwatson error = pthread_create(&td, NULL, recv_thread, NULL); 836243730Srwatson PJDLOG_ASSERT(error == 0); 837243730Srwatson (void)read_thread(NULL); 838243730Srwatson} 839