150476Speter/*- 21802Sphk * Copyright (c) 2012 The FreeBSD Foundation 3298107Sgjb * All rights reserved. 41802Sphk * 5234746Sobrien * This software was developed by Pawel Jakub Dawidek under sponsorship from 6284421Sbapt * the FreeBSD Foundation. 7234746Sobrien * 844301Swollman * Redistribution and use in source and binary forms, with or without 9143334Scperciva * modification, are permitted provided that the following conditions 10220496Smarkm * are met: 11292782Sallanjude * 1. Redistributions of source code must retain the above copyright 12300903Sallanjude * notice, this list of conditions and the following disclaimer. 13300903Sallanjude * 2. Redistributions in binary form must reproduce the above copyright 1455955Srgrimes * notice, this list of conditions and the following disclaimer in the 15201381Sed * documentation and/or other materials provided with the distribution. 16201381Sed * 17234746Sobrien * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 189488Sphk * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 1974385Sphk * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 2074385Sphk * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 219488Sphk * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 2274385Sphk * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 2374385Sphk * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 2444437Sache * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 2544437Sache * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 2644437Sache * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 2774385Sphk * SUCH DAMAGE. 2844437Sache */ 2974385Sphk 3074385Sphk#include <config/config.h> 3144437Sache 3274385Sphk#include <sys/param.h> 3374385Sphk#if defined(HAVE_SYS_ENDIAN_H) && defined(HAVE_BSWAP) 34143334Scperciva#include <sys/endian.h> 35143334Scperciva#else /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */ 36143334Scperciva#ifdef HAVE_MACHINE_ENDIAN_H 37143334Scperciva#include <machine/endian.h> 38292782Sallanjude#else /* !HAVE_MACHINE_ENDIAN_H */ 39292782Sallanjude#ifdef HAVE_ENDIAN_H 40292782Sallanjude#include <endian.h> 41292782Sallanjude#else /* !HAVE_ENDIAN_H */ 42220496Smarkm#error "No supported endian.h" 43220496Smarkm#endif /* !HAVE_ENDIAN_H */ 44220496Smarkm#endif /* !HAVE_MACHINE_ENDIAN_H */ 45220496Smarkm#include <compat/endian.h> 46300903Sallanjude#endif /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */ 47300903Sallanjude#include <sys/queue.h> 48300903Sallanjude#include <sys/stat.h> 49300903Sallanjude#include <sys/wait.h> 5044290Swollman 5144301Swollman#include <stdio.h> 52143334Scperciva#include <stdlib.h> 53292782Sallanjude#include <unistd.h> 54300903Sallanjude 55220496Smarkm#include <ctype.h> 56282774Sthomas#include <dirent.h> 57282774Sthomas#include <err.h> 58282774Sthomas#include <errno.h> 59282774Sthomas#include <fcntl.h> 60282774Sthomas#ifdef HAVE_LIBUTIL_H 61282774Sthomas#include <libutil.h> 62285417Srodrigc#endif 63285417Srodrigc#include <signal.h> 64263218Sjmg#include <string.h> 651802Sphk#include <strings.h> 6644290Swollman 6744301Swollman#include <openssl/hmac.h> 68218723Sdim 6944290Swollman#ifndef HAVE_SIGTIMEDWAIT 7044301Swollman#include "sigtimedwait.h" 7144301Swollman#endif 72218723Sdim 7344301Swollman#include "auditdistd.h" 74218723Sdim#include "pjdlog.h" 75218723Sdim#include "proto.h" 76218723Sdim#include "sandbox.h" 7744290Swollman#include "subr.h" 781802Sphk#include "synch.h" 7944290Swollman#include "trail.h" 8044290Swollman 811802Sphkstatic struct adist_config *adcfg; 821802Sphkstatic struct adist_host *adhost; 8344290Swollman 8444290Swollmanstatic pthread_rwlock_t adist_remote_lock; 851802Sphkstatic pthread_mutex_t adist_remote_mtx; 8644290Swollmanstatic pthread_cond_t adist_remote_cond; 8744290Swollmanstatic struct trail *adist_trail; 8844290Swollman 8944290Swollmanstatic TAILQ_HEAD(, adreq) adist_free_list; 9044290Swollmanstatic pthread_mutex_t adist_free_list_lock; 9144290Swollmanstatic pthread_cond_t adist_free_list_cond; 9244290Swollmanstatic TAILQ_HEAD(, adreq) adist_send_list; 9344290Swollmanstatic pthread_mutex_t adist_send_list_lock; 9444290Swollmanstatic pthread_cond_t adist_send_list_cond; 9544290Swollmanstatic TAILQ_HEAD(, adreq) adist_recv_list; 96143334Scpercivastatic pthread_mutex_t adist_recv_list_lock; 97143334Scpercivastatic pthread_cond_t adist_recv_list_cond; 98143334Scperciva 99143334Scpercivastatic void 100143334Scpercivainit_environment(void) 101143334Scperciva{ 102292782Sallanjude struct adreq *adreq; 103292782Sallanjude unsigned int ii; 104292782Sallanjude 105292782Sallanjude rw_init(&adist_remote_lock); 106292782Sallanjude mtx_init(&adist_remote_mtx); 107292782Sallanjude cv_init(&adist_remote_cond); 108220496Smarkm TAILQ_INIT(&adist_free_list); 109220496Smarkm mtx_init(&adist_free_list_lock); 110220496Smarkm cv_init(&adist_free_list_cond); 111220496Smarkm TAILQ_INIT(&adist_send_list); 112220496Smarkm mtx_init(&adist_send_list_lock); 113220496Smarkm cv_init(&adist_send_list_cond); 114300903Sallanjude TAILQ_INIT(&adist_recv_list); 115300903Sallanjude mtx_init(&adist_recv_list_lock); 116300903Sallanjude cv_init(&adist_recv_list_cond); 117300903Sallanjude 118300903Sallanjude for (ii = 0; ii < ADIST_QUEUE_SIZE; ii++) { 119300903Sallanjude adreq = malloc(sizeof(*adreq) + ADIST_BUF_SIZE); 120300903Sallanjude if (adreq == NULL) { 12144301Swollman pjdlog_exitx(EX_TEMPFAIL, 12244301Swollman "Unable to allocate %zu bytes of memory for adreq object.", 12344301Swollman sizeof(*adreq) + ADIST_BUF_SIZE); 12444301Swollman } 12544301Swollman adreq->adr_byteorder = ADIST_BYTEORDER; 12644301Swollman adreq->adr_cmd = ADIST_CMD_UNDEFINED; 12794367Sru adreq->adr_seq = 0; 12894367Sru adreq->adr_datasize = 0; 12994367Sru TAILQ_INSERT_TAIL(&adist_free_list, adreq, adr_next); 13094367Sru } 13194367Sru} 1321802Sphk 1331802Sphkstatic int 13439063Simpsender_connect(void) 13539063Simp{ 13639063Simp unsigned char rnd[32], hash[32], resp[32]; 13739063Simp struct proto_conn *conn; 13839063Simp char welcome[8]; 13939063Simp int16_t val; 140220496Smarkm 141220496Smarkm val = 1; 142220496Smarkm if (proto_send(adhost->adh_conn, &val, sizeof(val)) < 0) { 143220496Smarkm pjdlog_exit(EX_TEMPFAIL, 1441802Sphk "Unable to send connection request to parent"); 1451802Sphk } 14639063Simp if (proto_recv(adhost->adh_conn, &val, sizeof(val)) < 0) { 14739063Simp pjdlog_exit(EX_TEMPFAIL, 14839063Simp "Unable to receive reply to connection request from parent"); 14939063Simp } 15039063Simp if (val != 0) { 15139063Simp errno = val; 15239063Simp pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 15339063Simp adhost->adh_remoteaddr); 1541802Sphk return (-1); 15544290Swollman } 156220496Smarkm if (proto_connection_recv(adhost->adh_conn, true, &conn) < 0) { 157220496Smarkm pjdlog_exit(EX_TEMPFAIL, 158220496Smarkm "Unable to receive connection from parent"); 159220496Smarkm } 160220496Smarkm if (proto_connect_wait(conn, adcfg->adc_timeout) < 0) { 161220496Smarkm pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 162220496Smarkm adhost->adh_remoteaddr); 163220496Smarkm proto_close(conn); 164220496Smarkm return (-1); 165220496Smarkm } 166220496Smarkm pjdlog_debug(1, "Connected to %s.", adhost->adh_remoteaddr); 16744290Swollman /* Error in setting timeout is not critical, but why should it fail? */ 16844290Swollman if (proto_timeout(conn, adcfg->adc_timeout) < 0) 169220496Smarkm pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 170220496Smarkm else 171220496Smarkm pjdlog_debug(1, "Timeout set to %d.", adcfg->adc_timeout); 172220496Smarkm 173220496Smarkm /* Exchange welcome message, which includes version number. */ 174220496Smarkm (void)snprintf(welcome, sizeof(welcome), "ADIST%02d", ADIST_VERSION); 175220496Smarkm if (proto_send(conn, welcome, sizeof(welcome)) < 0) { 176220496Smarkm pjdlog_errno(LOG_WARNING, 177220496Smarkm "Unable to send welcome message to %s", 178220496Smarkm adhost->adh_remoteaddr); 179220496Smarkm proto_close(conn); 18044290Swollman return (-1); 181143334Scperciva } 182143334Scperciva pjdlog_debug(1, "Welcome message sent (%s).", welcome); 183143334Scperciva bzero(welcome, sizeof(welcome)); 184220496Smarkm if (proto_recv(conn, welcome, sizeof(welcome)) < 0) { 185220496Smarkm pjdlog_errno(LOG_WARNING, 186220496Smarkm "Unable to receive welcome message from %s", 187220496Smarkm adhost->adh_remoteaddr); 188220496Smarkm proto_close(conn); 189220496Smarkm return (-1); 190220496Smarkm } 191220496Smarkm if (strncmp(welcome, "ADIST", 5) != 0 || !isdigit(welcome[5]) || 192220496Smarkm !isdigit(welcome[6]) || welcome[7] != '\0') { 193220496Smarkm pjdlog_warning("Invalid welcome message from %s.", 194143334Scperciva adhost->adh_remoteaddr); 195292782Sallanjude proto_close(conn); 196292782Sallanjude return (-1); 197292782Sallanjude } 198292782Sallanjude pjdlog_debug(1, "Welcome message received (%s).", welcome); 199292782Sallanjude /* 200292782Sallanjude * Receiver can only reply with version number lower or equal to 201292782Sallanjude * the one we sent. 202292782Sallanjude */ 203292782Sallanjude adhost->adh_version = atoi(welcome + 5); 204292782Sallanjude if (adhost->adh_version > ADIST_VERSION) { 205292782Sallanjude pjdlog_warning("Invalid version number from %s (%d received, up to %d supported).", 206292782Sallanjude adhost->adh_remoteaddr, adhost->adh_version, ADIST_VERSION); 207292782Sallanjude proto_close(conn); 208292782Sallanjude return (-1); 209292782Sallanjude } 210220496Smarkm 211220496Smarkm pjdlog_debug(1, "Version %d negotiated with %s.", adhost->adh_version, 212220496Smarkm adhost->adh_remoteaddr); 213220496Smarkm 214220496Smarkm if (proto_send(conn, adcfg->adc_name, sizeof(adcfg->adc_name)) == -1) { 215220496Smarkm pjdlog_errno(LOG_WARNING, "Unable to send name to %s", 216220496Smarkm adhost->adh_remoteaddr); 217220496Smarkm proto_close(conn); 218220496Smarkm return (-1); 219220496Smarkm } 220220496Smarkm pjdlog_debug(1, "Name (%s) sent.", adcfg->adc_name); 221220496Smarkm 222220496Smarkm if (proto_recv(conn, rnd, sizeof(rnd)) == -1) { 223220496Smarkm pjdlog_errno(LOG_WARNING, "Unable to receive challenge from %s", 224220496Smarkm adhost->adh_remoteaddr); 225300903Sallanjude proto_close(conn); 226300903Sallanjude return (-1); 227300903Sallanjude } 228300903Sallanjude pjdlog_debug(1, "Challenge received."); 229300903Sallanjude 230300903Sallanjude if (HMAC(EVP_sha256(), adhost->adh_password, 231300903Sallanjude (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash, 232300903Sallanjude NULL) == NULL) { 233300903Sallanjude pjdlog_warning("Unable to generate response."); 234300903Sallanjude proto_close(conn); 235300903Sallanjude return (-1); 236300903Sallanjude } 237300903Sallanjude pjdlog_debug(1, "Response generated."); 238300903Sallanjude 239300903Sallanjude if (proto_send(conn, hash, sizeof(hash)) == -1) { 24044301Swollman pjdlog_errno(LOG_WARNING, "Unable to send response to %s", 241220496Smarkm adhost->adh_remoteaddr); 242220496Smarkm proto_close(conn); 243220496Smarkm return (-1); 244220496Smarkm } 245220496Smarkm pjdlog_debug(1, "Response sent."); 246220496Smarkm 247220496Smarkm if (adist_random(rnd, sizeof(rnd)) == -1) { 248220496Smarkm pjdlog_warning("Unable to generate challenge."); 249220496Smarkm proto_close(conn); 250220496Smarkm return (-1); 251220496Smarkm } 25244301Swollman pjdlog_debug(1, "Challenge generated."); 253292782Sallanjude 254300903Sallanjude if (proto_send(conn, rnd, sizeof(rnd)) == -1) { 2552368Sbde pjdlog_errno(LOG_WARNING, "Unable to send challenge to %s", 2562368Sbde adhost->adh_remoteaddr); 257185568Sphk proto_close(conn); 2581964Sjkh return (-1); 2592368Sbde } 260185568Sphk pjdlog_debug(1, "Challenge sent."); 2611964Sjkh 2622368Sbde if (proto_recv(conn, resp, sizeof(resp)) == -1) { 2631802Sphk pjdlog_errno(LOG_WARNING, "Unable to receive response from %s", 264185568Sphk adhost->adh_remoteaddr); 26544301Swollman proto_close(conn); 26644301Swollman return (-1); 26744301Swollman } 268185568Sphk pjdlog_debug(1, "Response received."); 26944290Swollman 27044290Swollman if (HMAC(EVP_sha256(), adhost->adh_password, 271185568Sphk (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash, 27244290Swollman NULL) == NULL) { 27344290Swollman pjdlog_warning("Unable to generate hash."); 274185568Sphk proto_close(conn); 275143334Scperciva return (-1); 276143334Scperciva } 277292782Sallanjude pjdlog_debug(1, "Hash generated."); 278292782Sallanjude 279292782Sallanjude if (memcmp(resp, hash, sizeof(hash)) != 0) { 280220496Smarkm pjdlog_warning("Invalid response from %s (wrong password?).", 281220496Smarkm adhost->adh_remoteaddr); 282220496Smarkm proto_close(conn); 283300903Sallanjude return (-1); 284300903Sallanjude } 285300903Sallanjude pjdlog_info("Receiver authenticated."); 28644290Swollman 2871802Sphk if (proto_recv(conn, &adhost->adh_trail_offset, 2881802Sphk sizeof(adhost->adh_trail_offset)) == -1) { 289 pjdlog_errno(LOG_WARNING, 290 "Unable to receive size of the most recent trail file from %s", 291 adhost->adh_remoteaddr); 292 proto_close(conn); 293 return (-1); 294 } 295 adhost->adh_trail_offset = le64toh(adhost->adh_trail_offset); 296 if (proto_recv(conn, &adhost->adh_trail_name, 297 sizeof(adhost->adh_trail_name)) == -1) { 298 pjdlog_errno(LOG_WARNING, 299 "Unable to receive name of the most recent trail file from %s", 300 adhost->adh_remoteaddr); 301 proto_close(conn); 302 return (-1); 303 } 304 pjdlog_debug(1, "Trail name (%s) and offset (%ju) received.", 305 adhost->adh_trail_name, (uintmax_t)adhost->adh_trail_offset); 306 307 rw_wlock(&adist_remote_lock); 308 mtx_lock(&adist_remote_mtx); 309 PJDLOG_ASSERT(adhost->adh_remote == NULL); 310 PJDLOG_ASSERT(conn != NULL); 311 adhost->adh_remote = conn; 312 mtx_unlock(&adist_remote_mtx); 313 rw_unlock(&adist_remote_lock); 314 cv_signal(&adist_remote_cond); 315 316 return (0); 317} 318 319static void 320sender_disconnect(void) 321{ 322 323 rw_wlock(&adist_remote_lock); 324 /* 325 * Check for a race between dropping rlock and acquiring wlock - 326 * another thread can close connection in-between. 327 */ 328 if (adhost->adh_remote == NULL) { 329 rw_unlock(&adist_remote_lock); 330 return; 331 } 332 pjdlog_debug(2, "Closing connection to %s.", adhost->adh_remoteaddr); 333 proto_close(adhost->adh_remote); 334 mtx_lock(&adist_remote_mtx); 335 adhost->adh_remote = NULL; 336 adhost->adh_reset = true; 337 adhost->adh_trail_name[0] = '\0'; 338 adhost->adh_trail_offset = 0; 339 mtx_unlock(&adist_remote_mtx); 340 rw_unlock(&adist_remote_lock); 341 342 pjdlog_warning("Disconnected from %s.", adhost->adh_remoteaddr); 343 344 /* Move all in-flight requests back onto free list. */ 345 mtx_lock(&adist_free_list_lock); 346 mtx_lock(&adist_send_list_lock); 347 TAILQ_CONCAT(&adist_free_list, &adist_send_list, adr_next); 348 mtx_unlock(&adist_send_list_lock); 349 mtx_lock(&adist_recv_list_lock); 350 TAILQ_CONCAT(&adist_free_list, &adist_recv_list, adr_next); 351 mtx_unlock(&adist_recv_list_lock); 352 mtx_unlock(&adist_free_list_lock); 353} 354 355static void 356adreq_fill(struct adreq *adreq, uint8_t cmd, const unsigned char *data, 357 size_t size) 358{ 359 static uint64_t seq = 1; 360 361 PJDLOG_ASSERT(size <= ADIST_BUF_SIZE); 362 363 switch (cmd) { 364 case ADIST_CMD_OPEN: 365 case ADIST_CMD_CLOSE: 366 PJDLOG_ASSERT(data != NULL && size == 0); 367 size = strlen(data) + 1; 368 break; 369 case ADIST_CMD_APPEND: 370 PJDLOG_ASSERT(data != NULL && size > 0); 371 break; 372 case ADIST_CMD_KEEPALIVE: 373 case ADIST_CMD_ERROR: 374 PJDLOG_ASSERT(data == NULL && size == 0); 375 break; 376 default: 377 PJDLOG_ABORT("Invalid command (%hhu).", cmd); 378 } 379 380 adreq->adr_cmd = cmd; 381 adreq->adr_seq = seq++; 382 adreq->adr_datasize = size; 383 /* Don't copy if data is already in out buffer. */ 384 if (data != NULL && data != adreq->adr_data) 385 bcopy(data, adreq->adr_data, size); 386} 387 388static bool 389read_thread_wait(void) 390{ 391 bool newfile = false; 392 393 mtx_lock(&adist_remote_mtx); 394 if (adhost->adh_reset) { 395reset: 396 adhost->adh_reset = false; 397 if (trail_filefd(adist_trail) != -1) 398 trail_close(adist_trail); 399 trail_reset(adist_trail); 400 while (adhost->adh_remote == NULL) 401 cv_wait(&adist_remote_cond, &adist_remote_mtx); 402 trail_start(adist_trail, adhost->adh_trail_name, 403 adhost->adh_trail_offset); 404 newfile = true; 405 } 406 mtx_unlock(&adist_remote_mtx); 407 while (trail_filefd(adist_trail) == -1) { 408 newfile = true; 409 wait_for_dir(); 410 /* 411 * We may have been disconnected and reconnected in the 412 * meantime, check if reset is set. 413 */ 414 mtx_lock(&adist_remote_mtx); 415 if (adhost->adh_reset) 416 goto reset; 417 mtx_unlock(&adist_remote_mtx); 418 if (trail_filefd(adist_trail) == -1) 419 trail_next(adist_trail); 420 } 421 if (newfile) { 422 pjdlog_debug(1, "Trail file \"%s/%s\" opened.", 423 adhost->adh_directory, 424 trail_filename(adist_trail)); 425 (void)wait_for_file_init(trail_filefd(adist_trail)); 426 } 427 return (newfile); 428} 429 430static void * 431read_thread(void *arg __unused) 432{ 433 struct adreq *adreq; 434 ssize_t done; 435 bool newfile; 436 437 pjdlog_debug(1, "%s started.", __func__); 438 439 for (;;) { 440 newfile = read_thread_wait(); 441 QUEUE_TAKE(adreq, &adist_free_list, 0); 442 if (newfile) { 443 adreq_fill(adreq, ADIST_CMD_OPEN, 444 trail_filename(adist_trail), 0); 445 newfile = false; 446 goto move; 447 } 448 449 done = read(trail_filefd(adist_trail), adreq->adr_data, 450 ADIST_BUF_SIZE); 451 if (done == -1) { 452 off_t offset; 453 int error; 454 455 error = errno; 456 offset = lseek(trail_filefd(adist_trail), 0, SEEK_CUR); 457 errno = error; 458 pjdlog_errno(LOG_ERR, 459 "Error while reading \"%s/%s\" at offset %jd", 460 adhost->adh_directory, trail_filename(adist_trail), 461 offset); 462 trail_close(adist_trail); 463 adreq_fill(adreq, ADIST_CMD_ERROR, NULL, 0); 464 goto move; 465 } else if (done == 0) { 466 /* End of file. */ 467 pjdlog_debug(3, "End of \"%s/%s\".", 468 adhost->adh_directory, trail_filename(adist_trail)); 469 if (!trail_switch(adist_trail)) { 470 /* More audit records can arrive. */ 471 mtx_lock(&adist_free_list_lock); 472 TAILQ_INSERT_TAIL(&adist_free_list, adreq, 473 adr_next); 474 mtx_unlock(&adist_free_list_lock); 475 wait_for_file(); 476 continue; 477 } 478 adreq_fill(adreq, ADIST_CMD_CLOSE, 479 trail_filename(adist_trail), 0); 480 trail_close(adist_trail); 481 goto move; 482 } 483 484 adreq_fill(adreq, ADIST_CMD_APPEND, adreq->adr_data, done); 485move: 486 pjdlog_debug(3, 487 "read thread: Moving request %p to the send queue (%hhu).", 488 adreq, adreq->adr_cmd); 489 QUEUE_INSERT(adreq, &adist_send_list); 490 } 491 /* NOTREACHED */ 492 return (NULL); 493} 494 495static void 496keepalive_send(void) 497{ 498 struct adreq *adreq; 499 500 rw_rlock(&adist_remote_lock); 501 if (adhost->adh_remote == NULL) { 502 rw_unlock(&adist_remote_lock); 503 return; 504 } 505 rw_unlock(&adist_remote_lock); 506 507 mtx_lock(&adist_free_list_lock); 508 adreq = TAILQ_FIRST(&adist_free_list); 509 if (adreq != NULL) 510 TAILQ_REMOVE(&adist_free_list, adreq, adr_next); 511 mtx_unlock(&adist_free_list_lock); 512 if (adreq == NULL) 513 return; 514 515 adreq_fill(adreq, ADIST_CMD_KEEPALIVE, NULL, 0); 516 517 QUEUE_INSERT(adreq, &adist_send_list); 518 519 pjdlog_debug(3, "keepalive_send: Request sent."); 520} 521 522/* 523 * Thread sends request to secondary node. 524 */ 525static void * 526send_thread(void *arg __unused) 527{ 528 time_t lastcheck, now; 529 struct adreq *adreq; 530 531 pjdlog_debug(1, "%s started.", __func__); 532 533 lastcheck = time(NULL); 534 535 for (;;) { 536 pjdlog_debug(3, "send thread: Taking request."); 537 for (;;) { 538 QUEUE_TAKE(adreq, &adist_send_list, ADIST_KEEPALIVE); 539 if (adreq != NULL) 540 break; 541 now = time(NULL); 542 if (lastcheck + ADIST_KEEPALIVE <= now) { 543 keepalive_send(); 544 lastcheck = now; 545 } 546 } 547 PJDLOG_ASSERT(adreq != NULL); 548 pjdlog_debug(3, "send thread: (%p) Got request %hhu.", adreq, 549 adreq->adr_cmd); 550 /* 551 * Protect connection from disappearing. 552 */ 553 rw_rlock(&adist_remote_lock); 554 /* 555 * Move the request to the recv queue first to avoid race 556 * where the recv thread receives the reply before we move 557 * the request to the recv queue. 558 */ 559 QUEUE_INSERT(adreq, &adist_recv_list); 560 if (adhost->adh_remote == NULL || 561 proto_send(adhost->adh_remote, &adreq->adr_packet, 562 ADPKT_SIZE(adreq)) == -1) { 563 rw_unlock(&adist_remote_lock); 564 pjdlog_debug(1, 565 "send thread: (%p) Unable to send request.", adreq); 566 if (adhost->adh_remote != NULL) 567 sender_disconnect(); 568 continue; 569 } else { 570 pjdlog_debug(3, "Request %p sent successfully.", adreq); 571 adreq_log(LOG_DEBUG, 2, -1, adreq, 572 "send: (%p) Request sent: ", adreq); 573 rw_unlock(&adist_remote_lock); 574 } 575 } 576 /* NOTREACHED */ 577 return (NULL); 578} 579 580static void 581adrep_decode_header(struct adrep *adrep) 582{ 583 584 /* Byte-swap only is the receiver is using different byte order. */ 585 if (adrep->adrp_byteorder != ADIST_BYTEORDER) { 586 adrep->adrp_byteorder = ADIST_BYTEORDER; 587 adrep->adrp_seq = bswap64(adrep->adrp_seq); 588 adrep->adrp_error = bswap16(adrep->adrp_error); 589 } 590} 591 592/* 593 * Thread receives answer from secondary node and passes it to ggate_send 594 * thread. 595 */ 596static void * 597recv_thread(void *arg __unused) 598{ 599 struct adrep adrep; 600 struct adreq *adreq; 601 602 pjdlog_debug(1, "%s started.", __func__); 603 604 for (;;) { 605 /* Wait until there is anything to receive. */ 606 QUEUE_WAIT(&adist_recv_list); 607 pjdlog_debug(3, "recv thread: Got something."); 608 rw_rlock(&adist_remote_lock); 609 if (adhost->adh_remote == NULL) { 610 /* 611 * Connection is dead. 612 * XXX: We shouldn't be here. 613 */ 614 rw_unlock(&adist_remote_lock); 615 continue; 616 } 617 if (proto_recv(adhost->adh_remote, &adrep, 618 sizeof(adrep)) == -1) { 619 rw_unlock(&adist_remote_lock); 620 pjdlog_errno(LOG_ERR, "Unable to receive reply"); 621 sender_disconnect(); 622 continue; 623 } 624 rw_unlock(&adist_remote_lock); 625 adrep_decode_header(&adrep); 626 /* 627 * Find the request that was just confirmed. 628 */ 629 mtx_lock(&adist_recv_list_lock); 630 TAILQ_FOREACH(adreq, &adist_recv_list, adr_next) { 631 if (adreq->adr_seq == adrep.adrp_seq) { 632 TAILQ_REMOVE(&adist_recv_list, adreq, 633 adr_next); 634 break; 635 } 636 } 637 if (adreq == NULL) { 638 /* 639 * If we disconnected in the meantime, just continue. 640 * On disconnect sender_disconnect() clears the queue, 641 * we can use that. 642 */ 643 if (TAILQ_EMPTY(&adist_recv_list)) { 644 rw_unlock(&adist_remote_lock); 645 continue; 646 } 647 mtx_unlock(&adist_recv_list_lock); 648 pjdlog_error("Found no request matching received 'seq' field (%ju).", 649 (uintmax_t)adrep.adrp_seq); 650 sender_disconnect(); 651 continue; 652 } 653 mtx_unlock(&adist_recv_list_lock); 654 adreq_log(LOG_DEBUG, 2, -1, adreq, 655 "recv thread: (%p) Request confirmed: ", adreq); 656 pjdlog_debug(3, "recv thread: (%p) Got request %hhu.", adreq, 657 adreq->adr_cmd); 658 if (adrep.adrp_error != 0) { 659 pjdlog_error("Receiver returned error (%s), disconnecting.", 660 adist_errstr((int)adrep.adrp_error)); 661 sender_disconnect(); 662 continue; 663 } 664 if (adreq->adr_cmd == ADIST_CMD_CLOSE) 665 trail_unlink(adist_trail, adreq->adr_data); 666 pjdlog_debug(3, "Request received successfully."); 667 QUEUE_INSERT(adreq, &adist_free_list); 668 } 669 /* NOTREACHED */ 670 return (NULL); 671} 672 673static void 674guard_check_connection(void) 675{ 676 677 PJDLOG_ASSERT(adhost->adh_role == ADIST_ROLE_SENDER); 678 679 rw_rlock(&adist_remote_lock); 680 if (adhost->adh_remote != NULL) { 681 rw_unlock(&adist_remote_lock); 682 pjdlog_debug(3, "remote_guard: Connection to %s is ok.", 683 adhost->adh_remoteaddr); 684 return; 685 } 686 687 /* 688 * Upgrade the lock. It doesn't have to be atomic as no other thread 689 * can change connection status from disconnected to connected. 690 */ 691 rw_unlock(&adist_remote_lock); 692 pjdlog_debug(1, "remote_guard: Reconnecting to %s.", 693 adhost->adh_remoteaddr); 694 if (sender_connect() == 0) { 695 pjdlog_info("Successfully reconnected to %s.", 696 adhost->adh_remoteaddr); 697 } else { 698 pjdlog_debug(1, "remote_guard: Reconnect to %s failed.", 699 adhost->adh_remoteaddr); 700 } 701} 702 703/* 704 * Thread guards remote connections and reconnects when needed, handles 705 * signals, etc. 706 */ 707static void * 708guard_thread(void *arg __unused) 709{ 710 struct timespec timeout; 711 time_t lastcheck, now; 712 sigset_t mask; 713 int signo; 714 715 lastcheck = time(NULL); 716 717 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 718 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 719 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 720 721 timeout.tv_sec = ADIST_KEEPALIVE; 722 timeout.tv_nsec = 0; 723 signo = -1; 724 725 for (;;) { 726 switch (signo) { 727 case SIGINT: 728 case SIGTERM: 729 sigexit_received = true; 730 pjdlog_exitx(EX_OK, 731 "Termination signal received, exiting."); 732 break; 733 default: 734 break; 735 } 736 737 pjdlog_debug(3, "remote_guard: Checking connections."); 738 now = time(NULL); 739 if (lastcheck + ADIST_KEEPALIVE <= now) { 740 guard_check_connection(); 741 lastcheck = now; 742 } 743 signo = sigtimedwait(&mask, NULL, &timeout); 744 } 745 /* NOTREACHED */ 746 return (NULL); 747} 748 749void 750adist_sender(struct adist_config *config, struct adist_host *adh) 751{ 752 pthread_t td; 753 pid_t pid; 754 int error, mode, debuglevel; 755 756 /* 757 * Create communication channel for sending connection requests from 758 * child to parent. 759 */ 760 if (proto_connect(NULL, "socketpair://", -1, &adh->adh_conn) == -1) { 761 pjdlog_errno(LOG_ERR, 762 "Unable to create connection sockets between child and parent"); 763 return; 764 } 765 766 pid = fork(); 767 if (pid == -1) { 768 pjdlog_errno(LOG_ERR, "Unable to fork"); 769 proto_close(adh->adh_conn); 770 adh->adh_conn = NULL; 771 return; 772 } 773 774 if (pid > 0) { 775 /* This is parent. */ 776 adh->adh_worker_pid = pid; 777 /* Declare that we are receiver. */ 778 proto_recv(adh->adh_conn, NULL, 0); 779 return; 780 } 781 782 adcfg = config; 783 adhost = adh; 784 785 mode = pjdlog_mode_get(); 786 debuglevel = pjdlog_debug_get(); 787 788 /* Declare that we are sender. */ 789 proto_send(adhost->adh_conn, NULL, 0); 790 791 descriptors_cleanup(adhost); 792 793#ifdef TODO 794 descriptors_assert(adhost, mode); 795#endif 796 797 pjdlog_init(mode); 798 pjdlog_debug_set(debuglevel); 799 pjdlog_prefix_set("[%s] (%s) ", adhost->adh_name, 800 role2str(adhost->adh_role)); 801#ifdef HAVE_SETPROCTITLE 802 setproctitle("[%s] (%s) ", adhost->adh_name, 803 role2str(adhost->adh_role)); 804#endif 805 806 /* 807 * The sender process should be able to remove entries from its 808 * trail directory, but it should not be able to write to the 809 * trail files, only read from them. 810 */ 811 adist_trail = trail_new(adhost->adh_directory, false); 812 if (adist_trail == NULL) 813 exit(EX_OSFILE); 814 815 if (sandbox(ADIST_USER, true, "auditdistd: %s (%s)", 816 role2str(adhost->adh_role), adhost->adh_name) != 0) { 817 exit(EX_CONFIG); 818 } 819 pjdlog_info("Privileges successfully dropped."); 820 821 /* 822 * We can ignore wait_for_dir_init() failures. It will fall back to 823 * using sleep(3). 824 */ 825 (void)wait_for_dir_init(trail_dirfd(adist_trail)); 826 827 init_environment(); 828 if (sender_connect() == 0) { 829 pjdlog_info("Successfully connected to %s.", 830 adhost->adh_remoteaddr); 831 } 832 adhost->adh_reset = true; 833 834 /* 835 * Create the guard thread first, so we can handle signals from the 836 * very begining. 837 */ 838 error = pthread_create(&td, NULL, guard_thread, NULL); 839 PJDLOG_ASSERT(error == 0); 840 error = pthread_create(&td, NULL, send_thread, NULL); 841 PJDLOG_ASSERT(error == 0); 842 error = pthread_create(&td, NULL, recv_thread, NULL); 843 PJDLOG_ASSERT(error == 0); 844 (void)read_thread(NULL); 845} 846