sender.c revision 243730
1/*- 2 * Copyright (c) 2012 The FreeBSD Foundation 3 * All rights reserved. 4 * 5 * This software was developed by Pawel Jakub Dawidek under sponsorship from 6 * the FreeBSD Foundation. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 27 * SUCH DAMAGE. 28 * 29 * $P4: //depot/projects/trustedbsd/openbsm/bin/auditdistd/sender.c#1 $ 30 */ 31 32#include "config.h" 33 34#include <sys/param.h> 35#if defined(HAVE_SYS_ENDIAN_H) && defined(HAVE_BSWAP) 36#include <sys/endian.h> 37#else /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */ 38#ifdef HAVE_MACHINE_ENDIAN_H 39#include <machine/endian.h> 40#else /* !HAVE_MACHINE_ENDIAN_H */ 41#ifdef HAVE_ENDIAN_H 42#include <endian.h> 43#else /* !HAVE_ENDIAN_H */ 44#error "No supported endian.h" 45#endif /* !HAVE_ENDIAN_H */ 46#endif /* !HAVE_MACHINE_ENDIAN_H */ 47#include <compat/endian.h> 48#endif /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */ 49#include <sys/queue.h> 50#include <sys/stat.h> 51#include <sys/wait.h> 52 53#include <stdio.h> 54#include <stdlib.h> 55#include <unistd.h> 56 57#include <ctype.h> 58#include <dirent.h> 59#include <err.h> 60#include <errno.h> 61#include <fcntl.h> 62#ifdef HAVE_LIBUTIL_H 63#include <libutil.h> 64#endif 65#include <signal.h> 66#include <string.h> 67#include <strings.h> 68 69#include <openssl/hmac.h> 70 71#ifndef HAVE_SIGTIMEDWAIT 72#include "sigtimedwait.h" 73#endif 74 75#include <pjdlog.h> 76 77#include "auditdistd.h" 78#include "proto.h" 79#include "sandbox.h" 80#include "subr.h" 81#include "synch.h" 82#include "trail.h" 83 84static struct adist_config *adcfg; 85static struct adist_host *adhost; 86 87static pthread_rwlock_t adist_remote_lock; 88static pthread_mutex_t adist_remote_mtx; 89static pthread_cond_t adist_remote_cond; 90static struct trail *adist_trail; 91 92static TAILQ_HEAD(, adreq) adist_free_list; 93static pthread_mutex_t adist_free_list_lock; 94static pthread_cond_t adist_free_list_cond; 95static TAILQ_HEAD(, adreq) adist_send_list; 96static pthread_mutex_t adist_send_list_lock; 97static pthread_cond_t adist_send_list_cond; 98static TAILQ_HEAD(, adreq) adist_recv_list; 99static pthread_mutex_t adist_recv_list_lock; 100static pthread_cond_t adist_recv_list_cond; 101 102static void 103init_environment(void) 104{ 105 struct adreq *adreq; 106 unsigned int ii; 107 108 rw_init(&adist_remote_lock); 109 mtx_init(&adist_remote_mtx); 110 cv_init(&adist_remote_cond); 111 TAILQ_INIT(&adist_free_list); 112 mtx_init(&adist_free_list_lock); 113 cv_init(&adist_free_list_cond); 114 TAILQ_INIT(&adist_send_list); 115 mtx_init(&adist_send_list_lock); 116 cv_init(&adist_send_list_cond); 117 TAILQ_INIT(&adist_recv_list); 118 mtx_init(&adist_recv_list_lock); 119 cv_init(&adist_recv_list_cond); 120 121 for (ii = 0; ii < ADIST_QUEUE_SIZE; ii++) { 122 adreq = malloc(sizeof(*adreq) + ADIST_BUF_SIZE); 123 if (adreq == NULL) { 124 pjdlog_exitx(EX_TEMPFAIL, 125 "Unable to allocate %zu bytes of memory for adreq object.", 126 sizeof(*adreq) + ADIST_BUF_SIZE); 127 } 128 adreq->adr_byteorder = ADIST_BYTEORDER; 129 adreq->adr_cmd = ADIST_CMD_UNDEFINED; 130 adreq->adr_seq = 0; 131 adreq->adr_datasize = 0; 132 TAILQ_INSERT_TAIL(&adist_free_list, adreq, adr_next); 133 } 134} 135 136static int 137sender_connect(void) 138{ 139 unsigned char rnd[32], hash[32], resp[32]; 140 struct proto_conn *conn; 141 char welcome[8]; 142 int16_t val; 143 144 val = 1; 145 if (proto_send(adhost->adh_conn, &val, sizeof(val)) < 0) { 146 pjdlog_exit(EX_TEMPFAIL, 147 "Unable to send connection request to parent"); 148 } 149 if (proto_recv(adhost->adh_conn, &val, sizeof(val)) < 0) { 150 pjdlog_exit(EX_TEMPFAIL, 151 "Unable to receive reply to connection request from parent"); 152 } 153 if (val != 0) { 154 errno = val; 155 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 156 adhost->adh_remoteaddr); 157 return (-1); 158 } 159 if (proto_connection_recv(adhost->adh_conn, true, &conn) < 0) { 160 pjdlog_exit(EX_TEMPFAIL, 161 "Unable to receive connection from parent"); 162 } 163 if (proto_connect_wait(conn, adcfg->adc_timeout) < 0) { 164 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 165 adhost->adh_remoteaddr); 166 proto_close(conn); 167 return (-1); 168 } 169 pjdlog_debug(1, "Connected to %s.", adhost->adh_remoteaddr); 170 /* Error in setting timeout is not critical, but why should it fail? */ 171 if (proto_timeout(conn, adcfg->adc_timeout) < 0) 172 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 173 else 174 pjdlog_debug(1, "Timeout set to %d.", adcfg->adc_timeout); 175 176 /* Exchange welcome message, which includes version number. */ 177 (void)snprintf(welcome, sizeof(welcome), "ADIST%02d", ADIST_VERSION); 178 if (proto_send(conn, welcome, sizeof(welcome)) < 0) { 179 pjdlog_errno(LOG_WARNING, 180 "Unable to send welcome message to %s", 181 adhost->adh_remoteaddr); 182 proto_close(conn); 183 return (-1); 184 } 185 pjdlog_debug(1, "Welcome message sent (%s).", welcome); 186 bzero(welcome, sizeof(welcome)); 187 if (proto_recv(conn, welcome, sizeof(welcome)) < 0) { 188 pjdlog_errno(LOG_WARNING, 189 "Unable to receive welcome message from %s", 190 adhost->adh_remoteaddr); 191 proto_close(conn); 192 return (-1); 193 } 194 if (strncmp(welcome, "ADIST", 5) != 0 || !isdigit(welcome[5]) || 195 !isdigit(welcome[6]) || welcome[7] != '\0') { 196 pjdlog_warning("Invalid welcome message from %s.", 197 adhost->adh_remoteaddr); 198 proto_close(conn); 199 return (-1); 200 } 201 pjdlog_debug(1, "Welcome message received (%s).", welcome); 202 /* 203 * Receiver can only reply with version number lower or equal to 204 * the one we sent. 205 */ 206 adhost->adh_version = atoi(welcome + 5); 207 if (adhost->adh_version > ADIST_VERSION) { 208 pjdlog_warning("Invalid version number from %s (%d received, up to %d supported).", 209 adhost->adh_remoteaddr, adhost->adh_version, ADIST_VERSION); 210 proto_close(conn); 211 return (-1); 212 } 213 214 pjdlog_debug(1, "Version %d negotiated with %s.", adhost->adh_version, 215 adhost->adh_remoteaddr); 216 217 if (proto_send(conn, adcfg->adc_name, sizeof(adcfg->adc_name)) == -1) { 218 pjdlog_errno(LOG_WARNING, "Unable to send name to %s", 219 adhost->adh_remoteaddr); 220 proto_close(conn); 221 return (-1); 222 } 223 pjdlog_debug(1, "Name (%s) sent.", adcfg->adc_name); 224 225 if (proto_recv(conn, rnd, sizeof(rnd)) == -1) { 226 pjdlog_errno(LOG_WARNING, "Unable to receive challenge from %s", 227 adhost->adh_remoteaddr); 228 proto_close(conn); 229 return (-1); 230 } 231 pjdlog_debug(1, "Challenge received."); 232 233 if (HMAC(EVP_sha256(), adhost->adh_password, 234 (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash, 235 NULL) == NULL) { 236 pjdlog_warning("Unable to generate response."); 237 proto_close(conn); 238 return (-1); 239 } 240 pjdlog_debug(1, "Response generated."); 241 242 if (proto_send(conn, hash, sizeof(hash)) == -1) { 243 pjdlog_errno(LOG_WARNING, "Unable to send response to %s", 244 adhost->adh_remoteaddr); 245 proto_close(conn); 246 return (-1); 247 } 248 pjdlog_debug(1, "Response sent."); 249 250 if (adist_random(rnd, sizeof(rnd)) == -1) { 251 pjdlog_warning("Unable to generate challenge."); 252 proto_close(conn); 253 return (-1); 254 } 255 pjdlog_debug(1, "Challenge generated."); 256 257 if (proto_send(conn, rnd, sizeof(rnd)) == -1) { 258 pjdlog_errno(LOG_WARNING, "Unable to send challenge to %s", 259 adhost->adh_remoteaddr); 260 proto_close(conn); 261 return (-1); 262 } 263 pjdlog_debug(1, "Challenge sent."); 264 265 if (proto_recv(conn, resp, sizeof(resp)) == -1) { 266 pjdlog_errno(LOG_WARNING, "Unable to receive response from %s", 267 adhost->adh_remoteaddr); 268 proto_close(conn); 269 return (-1); 270 } 271 pjdlog_debug(1, "Response received."); 272 273 if (HMAC(EVP_sha256(), adhost->adh_password, 274 (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash, 275 NULL) == NULL) { 276 pjdlog_warning("Unable to generate hash."); 277 proto_close(conn); 278 return (-1); 279 } 280 pjdlog_debug(1, "Hash generated."); 281 282 if (memcmp(resp, hash, sizeof(hash)) != 0) { 283 pjdlog_warning("Invalid response from %s (wrong password?).", 284 adhost->adh_remoteaddr); 285 proto_close(conn); 286 return (-1); 287 } 288 pjdlog_info("Receiver authenticated."); 289 290 if (proto_recv(conn, &adhost->adh_trail_offset, 291 sizeof(adhost->adh_trail_offset)) == -1) { 292 pjdlog_errno(LOG_WARNING, 293 "Unable to receive size of the most recent trail file from %s", 294 adhost->adh_remoteaddr); 295 proto_close(conn); 296 return (-1); 297 } 298 adhost->adh_trail_offset = le64toh(adhost->adh_trail_offset); 299 if (proto_recv(conn, &adhost->adh_trail_name, 300 sizeof(adhost->adh_trail_name)) == -1) { 301 pjdlog_errno(LOG_WARNING, 302 "Unable to receive name of the most recent trail file from %s", 303 adhost->adh_remoteaddr); 304 proto_close(conn); 305 return (-1); 306 } 307 pjdlog_debug(1, "Trail name (%s) and offset (%ju) received.", 308 adhost->adh_trail_name, (uintmax_t)adhost->adh_trail_offset); 309 310 rw_wlock(&adist_remote_lock); 311 mtx_lock(&adist_remote_mtx); 312 PJDLOG_ASSERT(adhost->adh_remote == NULL); 313 PJDLOG_ASSERT(conn != NULL); 314 adhost->adh_remote = conn; 315 mtx_unlock(&adist_remote_mtx); 316 rw_unlock(&adist_remote_lock); 317 cv_signal(&adist_remote_cond); 318 319 return (0); 320} 321 322static void 323sender_disconnect(void) 324{ 325 326 rw_wlock(&adist_remote_lock); 327 /* 328 * Check for a race between dropping rlock and acquiring wlock - 329 * another thread can close connection in-between. 330 */ 331 if (adhost->adh_remote == NULL) { 332 rw_unlock(&adist_remote_lock); 333 return; 334 } 335 pjdlog_debug(2, "Closing connection to %s.", adhost->adh_remoteaddr); 336 proto_close(adhost->adh_remote); 337 mtx_lock(&adist_remote_mtx); 338 adhost->adh_remote = NULL; 339 adhost->adh_reset = true; 340 adhost->adh_trail_name[0] = '\0'; 341 adhost->adh_trail_offset = 0; 342 mtx_unlock(&adist_remote_mtx); 343 rw_unlock(&adist_remote_lock); 344 345 pjdlog_warning("Disconnected from %s.", adhost->adh_remoteaddr); 346 347 /* Move all in-flight requests back onto free list. */ 348 mtx_lock(&adist_free_list_lock); 349 mtx_lock(&adist_send_list_lock); 350 TAILQ_CONCAT(&adist_free_list, &adist_send_list, adr_next); 351 mtx_unlock(&adist_send_list_lock); 352 mtx_lock(&adist_recv_list_lock); 353 TAILQ_CONCAT(&adist_free_list, &adist_recv_list, adr_next); 354 mtx_unlock(&adist_recv_list_lock); 355 mtx_unlock(&adist_free_list_lock); 356} 357 358static void 359adreq_fill(struct adreq *adreq, uint8_t cmd, const unsigned char *data, 360 size_t size) 361{ 362 static uint64_t seq = 1; 363 364 PJDLOG_ASSERT(size <= ADIST_BUF_SIZE); 365 366 switch (cmd) { 367 case ADIST_CMD_OPEN: 368 case ADIST_CMD_CLOSE: 369 PJDLOG_ASSERT(data != NULL && size == 0); 370 size = strlen(data) + 1; 371 break; 372 case ADIST_CMD_APPEND: 373 PJDLOG_ASSERT(data != NULL && size > 0); 374 break; 375 case ADIST_CMD_KEEPALIVE: 376 case ADIST_CMD_ERROR: 377 PJDLOG_ASSERT(data == NULL && size == 0); 378 break; 379 default: 380 PJDLOG_ABORT("Invalid command (%hhu).", cmd); 381 } 382 383 adreq->adr_cmd = cmd; 384 adreq->adr_seq = seq++; 385 adreq->adr_datasize = size; 386 /* Don't copy if data is already in out buffer. */ 387 if (data != NULL && data != adreq->adr_data) 388 bcopy(data, adreq->adr_data, size); 389} 390 391static bool 392read_thread_wait(void) 393{ 394 bool newfile = false; 395 396 mtx_lock(&adist_remote_mtx); 397 if (adhost->adh_reset) { 398 adhost->adh_reset = false; 399 if (trail_filefd(adist_trail) != -1) 400 trail_close(adist_trail); 401 trail_reset(adist_trail); 402 while (adhost->adh_remote == NULL) 403 cv_wait(&adist_remote_cond, &adist_remote_mtx); 404 trail_start(adist_trail, adhost->adh_trail_name, 405 adhost->adh_trail_offset); 406 newfile = true; 407 } 408 mtx_unlock(&adist_remote_mtx); 409 while (trail_filefd(adist_trail) == -1) { 410 newfile = true; 411 wait_for_dir(); 412 if (trail_filefd(adist_trail) == -1) 413 trail_next(adist_trail); 414 } 415 if (newfile) { 416 pjdlog_debug(1, "Trail file \"%s/%s\" opened.", 417 adhost->adh_directory, 418 trail_filename(adist_trail)); 419 (void)wait_for_file_init(trail_filefd(adist_trail)); 420 } 421 return (newfile); 422} 423 424static void * 425read_thread(void *arg __unused) 426{ 427 struct adreq *adreq; 428 ssize_t done; 429 bool newfile; 430 431 pjdlog_debug(1, "%s started.", __func__); 432 433 for (;;) { 434 newfile = read_thread_wait(); 435 QUEUE_TAKE(adreq, &adist_free_list, 0); 436 if (newfile) { 437 adreq_fill(adreq, ADIST_CMD_OPEN, 438 trail_filename(adist_trail), 0); 439 newfile = false; 440 goto move; 441 } 442 443 done = read(trail_filefd(adist_trail), adreq->adr_data, 444 ADIST_BUF_SIZE); 445 if (done == -1) { 446 off_t offset; 447 int error; 448 449 error = errno; 450 offset = lseek(trail_filefd(adist_trail), 0, SEEK_CUR); 451 errno = error; 452 pjdlog_errno(LOG_ERR, 453 "Error while reading \"%s/%s\" at offset %jd", 454 adhost->adh_directory, trail_filename(adist_trail), 455 offset); 456 trail_close(adist_trail); 457 adreq_fill(adreq, ADIST_CMD_ERROR, NULL, 0); 458 goto move; 459 } else if (done == 0) { 460 /* End of file. */ 461 pjdlog_debug(3, "End of \"%s/%s\".", 462 adhost->adh_directory, trail_filename(adist_trail)); 463 if (!trail_switch(adist_trail)) { 464 /* More audit records can arrive. */ 465 mtx_lock(&adist_free_list_lock); 466 TAILQ_INSERT_TAIL(&adist_free_list, adreq, 467 adr_next); 468 mtx_unlock(&adist_free_list_lock); 469 wait_for_file(); 470 continue; 471 } 472 adreq_fill(adreq, ADIST_CMD_CLOSE, 473 trail_filename(adist_trail), 0); 474 trail_close(adist_trail); 475 goto move; 476 } 477 478 adreq_fill(adreq, ADIST_CMD_APPEND, adreq->adr_data, done); 479move: 480 pjdlog_debug(3, 481 "read thread: Moving request %p to the send queue (%hhu).", 482 adreq, adreq->adr_cmd); 483 QUEUE_INSERT(adreq, &adist_send_list); 484 } 485 /* NOTREACHED */ 486 return (NULL); 487} 488 489static void 490keepalive_send(void) 491{ 492 struct adreq *adreq; 493 494 rw_rlock(&adist_remote_lock); 495 if (adhost->adh_remote == NULL) { 496 rw_unlock(&adist_remote_lock); 497 return; 498 } 499 rw_unlock(&adist_remote_lock); 500 501 mtx_lock(&adist_free_list_lock); 502 adreq = TAILQ_FIRST(&adist_free_list); 503 if (adreq != NULL) 504 TAILQ_REMOVE(&adist_free_list, adreq, adr_next); 505 mtx_unlock(&adist_free_list_lock); 506 if (adreq == NULL) 507 return; 508 509 adreq_fill(adreq, ADIST_CMD_KEEPALIVE, NULL, 0); 510 511 QUEUE_INSERT(adreq, &adist_send_list); 512 513 pjdlog_debug(3, "keepalive_send: Request sent."); 514} 515 516/* 517 * Thread sends request to secondary node. 518 */ 519static void * 520send_thread(void *arg __unused) 521{ 522 time_t lastcheck, now; 523 struct adreq *adreq; 524 525 pjdlog_debug(1, "%s started.", __func__); 526 527 lastcheck = time(NULL); 528 529 for (;;) { 530 pjdlog_debug(3, "send thread: Taking request."); 531 for (;;) { 532 QUEUE_TAKE(adreq, &adist_send_list, ADIST_KEEPALIVE); 533 if (adreq != NULL) 534 break; 535 now = time(NULL); 536 if (lastcheck + ADIST_KEEPALIVE <= now) { 537 keepalive_send(); 538 lastcheck = now; 539 } 540 } 541 PJDLOG_ASSERT(adreq != NULL); 542 pjdlog_debug(3, "send thread: (%p) Got request %hhu.", adreq, 543 adreq->adr_cmd); 544 /* 545 * Protect connection from disappearing. 546 */ 547 rw_rlock(&adist_remote_lock); 548 /* 549 * Move the request to the recv queue first to avoid race 550 * where the recv thread receives the reply before we move 551 * the request to the recv queue. 552 */ 553 QUEUE_INSERT(adreq, &adist_recv_list); 554 if (adhost->adh_remote == NULL || 555 proto_send(adhost->adh_remote, &adreq->adr_packet, 556 ADPKT_SIZE(adreq)) == -1) { 557 rw_unlock(&adist_remote_lock); 558 pjdlog_debug(1, 559 "send thread: (%p) Unable to send request.", adreq); 560 if (adhost->adh_remote != NULL) 561 sender_disconnect(); 562 continue; 563 } else { 564 pjdlog_debug(3, "Request %p sent successfully.", adreq); 565 adreq_log(LOG_DEBUG, 2, -1, adreq, 566 "send: (%p) Request sent: ", adreq); 567 rw_unlock(&adist_remote_lock); 568 } 569 } 570 /* NOTREACHED */ 571 return (NULL); 572} 573 574static void 575adrep_decode_header(struct adrep *adrep) 576{ 577 578 /* Byte-swap only is the receiver is using different byte order. */ 579 if (adrep->adrp_byteorder != ADIST_BYTEORDER) { 580 adrep->adrp_byteorder = ADIST_BYTEORDER; 581 adrep->adrp_seq = bswap64(adrep->adrp_seq); 582 adrep->adrp_error = bswap16(adrep->adrp_error); 583 } 584} 585 586/* 587 * Thread receives answer from secondary node and passes it to ggate_send 588 * thread. 589 */ 590static void * 591recv_thread(void *arg __unused) 592{ 593 struct adrep adrep; 594 struct adreq *adreq; 595 596 pjdlog_debug(1, "%s started.", __func__); 597 598 for (;;) { 599 /* Wait until there is anything to receive. */ 600 QUEUE_WAIT(&adist_recv_list); 601 pjdlog_debug(3, "recv thread: Got something."); 602 rw_rlock(&adist_remote_lock); 603 if (adhost->adh_remote == NULL) { 604 /* 605 * Connection is dead. 606 * XXX: We shouldn't be here. 607 */ 608 rw_unlock(&adist_remote_lock); 609 continue; 610 } 611 if (proto_recv(adhost->adh_remote, &adrep, 612 sizeof(adrep)) == -1) { 613 rw_unlock(&adist_remote_lock); 614 pjdlog_errno(LOG_ERR, "Unable to receive reply"); 615 sender_disconnect(); 616 continue; 617 } 618 rw_unlock(&adist_remote_lock); 619 adrep_decode_header(&adrep); 620 /* 621 * Find the request that was just confirmed. 622 */ 623 mtx_lock(&adist_recv_list_lock); 624 TAILQ_FOREACH(adreq, &adist_recv_list, adr_next) { 625 if (adreq->adr_seq == adrep.adrp_seq) { 626 TAILQ_REMOVE(&adist_recv_list, adreq, 627 adr_next); 628 break; 629 } 630 } 631 if (adreq == NULL) { 632 /* 633 * If we disconnected in the meantime, just continue. 634 * On disconnect sender_disconnect() clears the queue, 635 * we can use that. 636 */ 637 if (TAILQ_EMPTY(&adist_recv_list)) { 638 rw_unlock(&adist_remote_lock); 639 continue; 640 } 641 mtx_unlock(&adist_recv_list_lock); 642 pjdlog_error("Found no request matching received 'seq' field (%ju).", 643 (uintmax_t)adrep.adrp_seq); 644 sender_disconnect(); 645 continue; 646 } 647 mtx_unlock(&adist_recv_list_lock); 648 adreq_log(LOG_DEBUG, 2, -1, adreq, 649 "recv thread: (%p) Request confirmed: ", adreq); 650 pjdlog_debug(3, "recv thread: (%p) Got request %hhu.", adreq, 651 adreq->adr_cmd); 652 if (adrep.adrp_error != 0) { 653 pjdlog_error("Receiver returned error (%s), disconnecting.", 654 adist_errstr((int)adrep.adrp_error)); 655 sender_disconnect(); 656 continue; 657 } 658 if (adreq->adr_cmd == ADIST_CMD_CLOSE) 659 trail_unlink(adist_trail, adreq->adr_data); 660 pjdlog_debug(3, "Request received successfully."); 661 QUEUE_INSERT(adreq, &adist_free_list); 662 } 663 /* NOTREACHED */ 664 return (NULL); 665} 666 667static void 668guard_check_connection(void) 669{ 670 671 PJDLOG_ASSERT(adhost->adh_role == ADIST_ROLE_SENDER); 672 673 rw_rlock(&adist_remote_lock); 674 if (adhost->adh_remote != NULL) { 675 rw_unlock(&adist_remote_lock); 676 pjdlog_debug(3, "remote_guard: Connection to %s is ok.", 677 adhost->adh_remoteaddr); 678 return; 679 } 680 681 /* 682 * Upgrade the lock. It doesn't have to be atomic as no other thread 683 * can change connection status from disconnected to connected. 684 */ 685 rw_unlock(&adist_remote_lock); 686 pjdlog_debug(1, "remote_guard: Reconnecting to %s.", 687 adhost->adh_remoteaddr); 688 if (sender_connect() == 0) { 689 pjdlog_info("Successfully reconnected to %s.", 690 adhost->adh_remoteaddr); 691 } else { 692 pjdlog_debug(1, "remote_guard: Reconnect to %s failed.", 693 adhost->adh_remoteaddr); 694 } 695} 696 697/* 698 * Thread guards remote connections and reconnects when needed, handles 699 * signals, etc. 700 */ 701static void * 702guard_thread(void *arg __unused) 703{ 704 struct timespec timeout; 705 time_t lastcheck, now; 706 sigset_t mask; 707 int signo; 708 709 lastcheck = time(NULL); 710 711 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 712 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 713 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 714 715 timeout.tv_sec = ADIST_KEEPALIVE; 716 timeout.tv_nsec = 0; 717 signo = -1; 718 719 for (;;) { 720 switch (signo) { 721 case SIGINT: 722 case SIGTERM: 723 sigexit_received = true; 724 pjdlog_exitx(EX_OK, 725 "Termination signal received, exiting."); 726 break; 727 default: 728 break; 729 } 730 731 pjdlog_debug(3, "remote_guard: Checking connections."); 732 now = time(NULL); 733 if (lastcheck + ADIST_KEEPALIVE <= now) { 734 guard_check_connection(); 735 lastcheck = now; 736 } 737 signo = sigtimedwait(&mask, NULL, &timeout); 738 } 739 /* NOTREACHED */ 740 return (NULL); 741} 742 743void 744adist_sender(struct adist_config *config, struct adist_host *adh) 745{ 746 pthread_t td; 747 pid_t pid; 748 int error, mode, debuglevel; 749 750 /* 751 * Create communication channel for sending connection requests from 752 * child to parent. 753 */ 754 if (proto_connect(NULL, "socketpair://", -1, &adh->adh_conn) == -1) { 755 pjdlog_errno(LOG_ERR, 756 "Unable to create connection sockets between child and parent"); 757 return; 758 } 759 760 pid = fork(); 761 if (pid == -1) { 762 pjdlog_errno(LOG_ERR, "Unable to fork"); 763 proto_close(adh->adh_conn); 764 adh->adh_conn = NULL; 765 return; 766 } 767 768 if (pid > 0) { 769 /* This is parent. */ 770 adh->adh_worker_pid = pid; 771 /* Declare that we are receiver. */ 772 proto_recv(adh->adh_conn, NULL, 0); 773 return; 774 } 775 776 adcfg = config; 777 adhost = adh; 778 779 mode = pjdlog_mode_get(); 780 debuglevel = pjdlog_debug_get(); 781 782 /* Declare that we are sender. */ 783 proto_send(adhost->adh_conn, NULL, 0); 784 785 descriptors_cleanup(adhost); 786 787#ifdef TODO 788 descriptors_assert(adhost, mode); 789#endif 790 791 pjdlog_init(mode); 792 pjdlog_debug_set(debuglevel); 793 pjdlog_prefix_set("[%s] (%s) ", adhost->adh_name, 794 role2str(adhost->adh_role)); 795#ifdef HAVE_SETPROCTITLE 796 setproctitle("[%s] (%s) ", adhost->adh_name, 797 role2str(adhost->adh_role)); 798#endif 799 800 /* 801 * The sender process should be able to remove entries from its 802 * trail directory, but it should not be able to write to the 803 * trail files, only read from them. 804 */ 805 adist_trail = trail_new(adhost->adh_directory, false); 806 if (adist_trail == NULL) 807 exit(EX_OSFILE); 808 809 if (sandbox(ADIST_USER, true, "auditdistd: %s (%s)", 810 role2str(adhost->adh_role), adhost->adh_name) != 0) { 811 exit(EX_CONFIG); 812 } 813 pjdlog_info("Privileges successfully dropped."); 814 815 /* 816 * We can ignore wait_for_dir_init() failures. It will fall back to 817 * using sleep(3). 818 */ 819 (void)wait_for_dir_init(trail_dirfd(adist_trail)); 820 821 init_environment(); 822 if (sender_connect() == 0) { 823 pjdlog_info("Successfully connected to %s.", 824 adhost->adh_remoteaddr); 825 } 826 adhost->adh_reset = true; 827 828 /* 829 * Create the guard thread first, so we can handle signals from the 830 * very begining. 831 */ 832 error = pthread_create(&td, NULL, guard_thread, NULL); 833 PJDLOG_ASSERT(error == 0); 834 error = pthread_create(&td, NULL, send_thread, NULL); 835 PJDLOG_ASSERT(error == 0); 836 error = pthread_create(&td, NULL, recv_thread, NULL); 837 PJDLOG_ASSERT(error == 0); 838 (void)read_thread(NULL); 839} 840