/* secondary.c revision 229509 */
/*-
 * Copyright (c) 2009-2010 The FreeBSD Foundation
 * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org>
 * All rights reserved.
 *
 * This software was developed by Pawel Jakub Dawidek under sponsorship from
 * the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
291590Srgrimes */ 301590Srgrimes 311590Srgrimes#include <sys/cdefs.h> 321590Srgrimes__FBSDID("$FreeBSD: stable/9/sbin/hastd/secondary.c 229509 2012-01-04 17:22:10Z trociny $"); 331590Srgrimes 341590Srgrimes#include <sys/param.h> 351590Srgrimes#include <sys/time.h> 361590Srgrimes#include <sys/bio.h> 3794609Sdwmalone#include <sys/disk.h> 3894609Sdwmalone#include <sys/stat.h> 3994609Sdwmalone 4094609Sdwmalone#include <err.h> 4194609Sdwmalone#include <errno.h> 4294609Sdwmalone#include <fcntl.h> 4387712Smarkm#include <libgeom.h> 4487712Smarkm#include <pthread.h> 4587712Smarkm#include <signal.h> 461590Srgrimes#include <stdint.h> 471590Srgrimes#include <stdio.h> 481590Srgrimes#include <string.h> 491590Srgrimes#include <sysexits.h> 5087712Smarkm#include <unistd.h> 5187712Smarkm 521590Srgrimes#include <activemap.h> 53139994Sdwmalone#include <nv.h> 541590Srgrimes#include <pjdlog.h> 551590Srgrimes 561590Srgrimes#include "control.h" 5787712Smarkm#include "event.h" 5887712Smarkm#include "hast.h" 591590Srgrimes#include "hast_proto.h" 601590Srgrimes#include "hastd.h" 61193488Sbrian#include "hooks.h" 62193488Sbrian#include "metadata.h" 631590Srgrimes#include "proto.h" 641590Srgrimes#include "subr.h" 651590Srgrimes#include "synch.h" 661590Srgrimes 671590Srgrimesstruct hio { 681590Srgrimes uint64_t hio_seq; 691590Srgrimes int hio_error; 701590Srgrimes void *hio_data; 711590Srgrimes uint8_t hio_cmd; 721590Srgrimes uint64_t hio_offset; 731590Srgrimes uint64_t hio_length; 741590Srgrimes TAILQ_ENTRY(hio) hio_next; 751590Srgrimes}; 761590Srgrimes 771590Srgrimesstatic struct hast_resource *gres; 781590Srgrimes 791590Srgrimes/* 801590Srgrimes * Free list holds unused structures. When free list is empty, we have to wait 811590Srgrimes * until some in-progress requests are freed. 
821590Srgrimes */ 83193488Sbrianstatic TAILQ_HEAD(, hio) hio_free_list; 841590Srgrimesstatic pthread_mutex_t hio_free_list_lock; 851590Srgrimesstatic pthread_cond_t hio_free_list_cond; 861590Srgrimes/* 871590Srgrimes * Disk thread (the one that do I/O requests) takes requests from this list. 881590Srgrimes */ 89193488Sbrianstatic TAILQ_HEAD(, hio) hio_disk_list; 901590Srgrimesstatic pthread_mutex_t hio_disk_list_lock; 911590Srgrimesstatic pthread_cond_t hio_disk_list_cond; 921590Srgrimes/* 931590Srgrimes * There is one recv list for every component, although local components don't 94193488Sbrian * use recv lists as local requests are done synchronously. 951590Srgrimes */ 961590Srgrimesstatic TAILQ_HEAD(, hio) hio_send_list; 971590Srgrimesstatic pthread_mutex_t hio_send_list_lock; 98193488Sbrianstatic pthread_cond_t hio_send_list_cond; 991590Srgrimes 1001590Srgrimes/* 101193488Sbrian * Maximum number of outstanding I/O requests. 1021590Srgrimes */ 10387712Smarkm#define HAST_HIO_MAX 256 10494178Smurray 1051590Srgrimesstatic void *recv_thread(void *arg); 1061590Srgrimesstatic void *disk_thread(void *arg); 1071590Srgrimesstatic void *send_thread(void *arg); 1081590Srgrimes 1091590Srgrimes#define QUEUE_INSERT(name, hio) do { \ 1101590Srgrimes bool _wakeup; \ 1111590Srgrimes \ 112193488Sbrian mtx_lock(&hio_##name##_list_lock); \ 1131590Srgrimes _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 11474876Sdwmalone TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_next); \ 11574876Sdwmalone mtx_unlock(&hio_##name##_list_lock); \ 11674876Sdwmalone if (_wakeup) \ 1171590Srgrimes cv_signal(&hio_##name##_list_cond); \ 1181590Srgrimes} while (0) 1191590Srgrimes#define QUEUE_TAKE(name, hio) do { \ 1201590Srgrimes mtx_lock(&hio_##name##_list_lock); \ 12174876Sdwmalone while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 12274876Sdwmalone cv_wait(&hio_##name##_list_cond, \ 12374876Sdwmalone &hio_##name##_list_lock); \ 1241590Srgrimes } \ 12574876Sdwmalone 
TAILQ_REMOVE(&hio_##name##_list, (hio), hio_next); \ 12674876Sdwmalone mtx_unlock(&hio_##name##_list_lock); \ 12774876Sdwmalone} while (0) 12874876Sdwmalone 12974876Sdwmalonestatic void 13074876Sdwmalonehio_clear(struct hio *hio) 13174876Sdwmalone{ 132139994Sdwmalone 133139994Sdwmalone hio->hio_seq = 0; 13474876Sdwmalone hio->hio_error = 0; 135193488Sbrian hio->hio_cmd = HIO_UNDEF; 13674876Sdwmalone hio->hio_offset = 0; 13774876Sdwmalone hio->hio_length = 0; 13874876Sdwmalone} 13974876Sdwmalone 14074876Sdwmalonestatic void 14174876Sdwmaloneinit_environment(void) 14274876Sdwmalone{ 14374876Sdwmalone struct hio *hio; 14474876Sdwmalone unsigned int ii; 14574876Sdwmalone 14674876Sdwmalone /* 14774876Sdwmalone * Initialize lists, their locks and theirs condition variables. 14874876Sdwmalone */ 1491590Srgrimes TAILQ_INIT(&hio_free_list); 15074876Sdwmalone mtx_init(&hio_free_list_lock); 15174876Sdwmalone cv_init(&hio_free_list_cond); 152193488Sbrian TAILQ_INIT(&hio_disk_list); 15374876Sdwmalone mtx_init(&hio_disk_list_lock); 15474876Sdwmalone cv_init(&hio_disk_list_cond); 15574876Sdwmalone TAILQ_INIT(&hio_send_list); 15674876Sdwmalone mtx_init(&hio_send_list_lock); 1571590Srgrimes cv_init(&hio_send_list_cond); 15874876Sdwmalone 15974876Sdwmalone /* 16074876Sdwmalone * Allocate requests pool and initialize requests. 
16174876Sdwmalone */ 16274876Sdwmalone for (ii = 0; ii < HAST_HIO_MAX; ii++) { 16374876Sdwmalone hio = malloc(sizeof(*hio)); 16474876Sdwmalone if (hio == NULL) { 1651590Srgrimes pjdlog_exitx(EX_TEMPFAIL, 16674876Sdwmalone "Unable to allocate memory (%zu bytes) for hio request.", 16774876Sdwmalone sizeof(*hio)); 168193488Sbrian } 16974876Sdwmalone hio->hio_data = malloc(MAXPHYS); 17074876Sdwmalone if (hio->hio_data == NULL) { 17174876Sdwmalone pjdlog_exitx(EX_TEMPFAIL, 172193488Sbrian "Unable to allocate memory (%zu bytes) for gctl_data.", 1731590Srgrimes (size_t)MAXPHYS); 1741590Srgrimes } 1751590Srgrimes hio_clear(hio); 1761590Srgrimes TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next); 1771590Srgrimes } 1781590Srgrimes} 1791590Srgrimes 1801590Srgrimesstatic void 1811590Srgrimesinit_local(struct hast_resource *res) 1821590Srgrimes{ 1831590Srgrimes 1841590Srgrimes if (metadata_read(res, true) < 0) 1851590Srgrimes exit(EX_NOINPUT); 1861590Srgrimes} 1871590Srgrimes 1881590Srgrimesstatic void 1891590Srgrimesinit_remote(struct hast_resource *res, struct nv *nvin) 1901590Srgrimes{ 1911590Srgrimes uint64_t resuid; 1921590Srgrimes struct nv *nvout; 193193488Sbrian unsigned char *map; 1941590Srgrimes size_t mapsize; 19569552Sasmodai 19669552Sasmodai#ifdef notyet 19769552Sasmodai /* Setup direction. 
*/ 1981590Srgrimes if (proto_send(res->hr_remoteout, NULL, 0) == -1) 1991590Srgrimes pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 200173285Scharnier#endif 2011590Srgrimes 2021590Srgrimes nvout = nv_alloc(); 2031590Srgrimes nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize"); 2041590Srgrimes nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize"); 2051590Srgrimes resuid = nv_get_uint64(nvin, "resuid"); 2061590Srgrimes res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt"); 2071590Srgrimes res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 2081590Srgrimes nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt"); 2091590Srgrimes nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt"); 2101590Srgrimes mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - 21117825Speter METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize); 2121590Srgrimes map = malloc(mapsize); 2131590Srgrimes if (map == NULL) { 2141590Srgrimes pjdlog_exitx(EX_TEMPFAIL, 2151590Srgrimes "Unable to allocate memory (%zu bytes) for activemap.", 2161590Srgrimes mapsize); 2171590Srgrimes } 2181590Srgrimes /* 21994609Sdwmalone * When we work as primary and secondary is missing we will increase 22094609Sdwmalone * localcnt in our metadata. When secondary is connected and synced 22194609Sdwmalone * we make localcnt be equal to remotecnt, which means nodes are more 22294609Sdwmalone * or less in sync. 2231590Srgrimes * Split-brain condition is when both nodes are not able to communicate 2241590Srgrimes * and are both configured as primary nodes. In turn, they can both 2251590Srgrimes * make incompatible changes to the data and we have to detect that. 2261590Srgrimes * Under split-brain condition we will increase our localcnt on first 2271590Srgrimes * write and remote node will increase its localcnt on first write. 
2281590Srgrimes * When we connect we can see that primary's localcnt is greater than 22917339Sadam * our remotecnt (primary was modified while we weren't watching) and 230193488Sbrian * our localcnt is greater than primary's remotecnt (we were modified 23117339Sadam * while primary wasn't watching). 23217339Sadam * There are many possible combinations which are all gathered below. 23317339Sadam * Don't pay too much attention to exact numbers, the more important 2341590Srgrimes * is to compare them. We compare secondary's local with primary's 2351590Srgrimes * remote and secondary's remote with primary's local. 2361590Srgrimes * Note that every case where primary's localcnt is smaller than 2371590Srgrimes * secondary's remotecnt and where secondary's localcnt is smaller than 238143891Siedowse * primary's remotecnt should be impossible in practise. We will perform 239143891Siedowse * full synchronization then. Those cases are marked with an asterisk. 2401590Srgrimes * Regular synchronization means that only extents marked as dirty are 2411590Srgrimes * synchronized (regular synchronization). 2421590Srgrimes * 2431590Srgrimes * SECONDARY METADATA PRIMARY METADATA 2441590Srgrimes * local=3 remote=3 local=2 remote=2* ?! Full sync from secondary. 2451590Srgrimes * local=3 remote=3 local=2 remote=3* ?! Full sync from primary. 2461590Srgrimes * local=3 remote=3 local=2 remote=4* ?! Full sync from primary. 2471590Srgrimes * local=3 remote=3 local=3 remote=2 Primary is out-of-date, 2481590Srgrimes * regular sync from secondary. 2491590Srgrimes * local=3 remote=3 local=3 remote=3 Regular sync just in case. 250139994Sdwmalone * local=3 remote=3 local=3 remote=4* ?! Full sync from primary. 2511590Srgrimes * local=3 remote=3 local=4 remote=2 Split-brain condition. 2521590Srgrimes * local=3 remote=3 local=4 remote=3 Secondary out-of-date, 2531590Srgrimes * regular sync from primary. 2541590Srgrimes * local=3 remote=3 local=4 remote=4* ?! Full sync from primary. 
2551590Srgrimes */ 2561590Srgrimes if (res->hr_resuid == 0) { 2571590Srgrimes /* 2581590Srgrimes * Provider is used for the first time. If primary node done no 2591590Srgrimes * writes yet as well (we will find "virgin" argument) then 2601590Srgrimes * there is no need to synchronize anything. If primary node 2611590Srgrimes * done any writes already we have to synchronize everything. 2621590Srgrimes */ 2631590Srgrimes PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); 2641590Srgrimes res->hr_resuid = resuid; 2651590Srgrimes if (metadata_write(res) < 0) 2661590Srgrimes exit(EX_NOINPUT); 2671590Srgrimes if (nv_exists(nvin, "virgin")) { 2681590Srgrimes free(map); 2691590Srgrimes map = NULL; 2701590Srgrimes mapsize = 0; 2711590Srgrimes } else { 2721590Srgrimes memset(map, 0xff, mapsize); 2731590Srgrimes } 2741590Srgrimes nv_add_int8(nvout, 1, "virgin"); 2751590Srgrimes nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 2761590Srgrimes } else if (res->hr_resuid != resuid) { 2771590Srgrimes char errmsg[256]; 2781590Srgrimes 2791590Srgrimes free(map); 2801590Srgrimes (void)snprintf(errmsg, sizeof(errmsg), 2811590Srgrimes "Resource unique ID mismatch (primary=%ju, secondary=%ju).", 2821590Srgrimes (uintmax_t)resuid, (uintmax_t)res->hr_resuid); 2831590Srgrimes pjdlog_error("%s", errmsg); 2841590Srgrimes nv_add_string(nvout, errmsg, "errmsg"); 2851590Srgrimes if (hast_proto_send(res, res->hr_remotein, nvout, NULL, 0) < 0) { 2861590Srgrimes pjdlog_exit(EX_TEMPFAIL, "Unable to send response to %s", 2871590Srgrimes res->hr_remoteaddr); 2881590Srgrimes } 289 nv_free(nvout); 290 exit(EX_CONFIG); 291 } else if ( 292 /* Is primary out-of-date? */ 293 (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 294 res->hr_secondary_remotecnt == res->hr_primary_localcnt) || 295 /* Are the nodes more or less in sync? */ 296 (res->hr_secondary_localcnt == res->hr_primary_remotecnt && 297 res->hr_secondary_remotecnt == res->hr_primary_localcnt) || 298 /* Is secondary out-of-date? 
*/ 299 (res->hr_secondary_localcnt == res->hr_primary_remotecnt && 300 res->hr_secondary_remotecnt < res->hr_primary_localcnt)) { 301 /* 302 * Nodes are more or less in sync or one of the nodes is 303 * out-of-date. 304 * It doesn't matter at this point which one, we just have to 305 * send out local bitmap to the remote node. 306 */ 307 if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) != 308 (ssize_t)mapsize) { 309 pjdlog_exit(LOG_ERR, "Unable to read activemap"); 310 } 311 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 312 res->hr_secondary_remotecnt == res->hr_primary_localcnt) { 313 /* Primary is out-of-date, sync from secondary. */ 314 nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); 315 } else { 316 /* 317 * Secondary is out-of-date or counts match. 318 * Sync from primary. 319 */ 320 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 321 } 322 } else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 323 res->hr_primary_localcnt > res->hr_secondary_remotecnt) { 324 /* 325 * Not good, we have split-brain condition. 326 */ 327 free(map); 328 pjdlog_error("Split-brain detected, exiting."); 329 nv_add_string(nvout, "Split-brain condition!", "errmsg"); 330 if (hast_proto_send(res, res->hr_remotein, nvout, NULL, 0) < 0) { 331 pjdlog_exit(EX_TEMPFAIL, "Unable to send response to %s", 332 res->hr_remoteaddr); 333 } 334 nv_free(nvout); 335 /* Exit on split-brain. */ 336 event_send(res, EVENT_SPLITBRAIN); 337 exit(EX_CONFIG); 338 } else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt || 339 res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ { 340 /* 341 * This should never happen in practise, but we will perform 342 * full synchronization. 
343 */ 344 PJDLOG_ASSERT(res->hr_secondary_localcnt < res->hr_primary_remotecnt || 345 res->hr_primary_localcnt < res->hr_secondary_remotecnt); 346 mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - 347 METADATA_SIZE, res->hr_extentsize, 348 res->hr_local_sectorsize); 349 memset(map, 0xff, mapsize); 350 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) { 351 /* In this one of five cases sync from secondary. */ 352 nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); 353 } else { 354 /* For the rest four cases sync from primary. */ 355 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 356 } 357 pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).", 358 (uintmax_t)res->hr_primary_localcnt, 359 (uintmax_t)res->hr_primary_remotecnt, 360 (uintmax_t)res->hr_secondary_localcnt, 361 (uintmax_t)res->hr_secondary_remotecnt); 362 } 363 nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize"); 364 if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) < 0) { 365 pjdlog_exit(EX_TEMPFAIL, "Unable to send activemap to %s", 366 res->hr_remoteaddr); 367 } 368 if (map != NULL) 369 free(map); 370 nv_free(nvout); 371#ifdef notyet 372 /* Setup direction. */ 373 if (proto_recv(res->hr_remotein, NULL, 0) == -1) 374 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 375#endif 376} 377 378void 379hastd_secondary(struct hast_resource *res, struct nv *nvin) 380{ 381 sigset_t mask; 382 pthread_t td; 383 pid_t pid; 384 int error, mode, debuglevel; 385 386 /* 387 * Create communication channel between parent and child. 388 */ 389 if (proto_client(NULL, "socketpair://", &res->hr_ctrl) < 0) { 390 KEEP_ERRNO((void)pidfile_remove(pfh)); 391 pjdlog_exit(EX_OSERR, 392 "Unable to create control sockets between parent and child"); 393 } 394 /* 395 * Create communication channel between child and parent. 
396 */ 397 if (proto_client(NULL, "socketpair://", &res->hr_event) < 0) { 398 KEEP_ERRNO((void)pidfile_remove(pfh)); 399 pjdlog_exit(EX_OSERR, 400 "Unable to create event sockets between child and parent"); 401 } 402 403 pid = fork(); 404 if (pid < 0) { 405 KEEP_ERRNO((void)pidfile_remove(pfh)); 406 pjdlog_exit(EX_OSERR, "Unable to fork"); 407 } 408 409 if (pid > 0) { 410 /* This is parent. */ 411 proto_close(res->hr_remotein); 412 res->hr_remotein = NULL; 413 proto_close(res->hr_remoteout); 414 res->hr_remoteout = NULL; 415 /* Declare that we are receiver. */ 416 proto_recv(res->hr_event, NULL, 0); 417 /* Declare that we are sender. */ 418 proto_send(res->hr_ctrl, NULL, 0); 419 res->hr_workerpid = pid; 420 return; 421 } 422 423 gres = res; 424 mode = pjdlog_mode_get(); 425 debuglevel = pjdlog_debug_get(); 426 427 /* Declare that we are sender. */ 428 proto_send(res->hr_event, NULL, 0); 429 /* Declare that we are receiver. */ 430 proto_recv(res->hr_ctrl, NULL, 0); 431 descriptors_cleanup(res); 432 433 descriptors_assert(res, mode); 434 435 pjdlog_init(mode); 436 pjdlog_debug_set(debuglevel); 437 pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); 438 setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); 439 440 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 441 PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0); 442 443 /* Error in setting timeout is not critical, but why should it fail? 
*/ 444 if (proto_timeout(res->hr_remotein, 2 * HAST_KEEPALIVE) < 0) 445 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 446 if (proto_timeout(res->hr_remoteout, res->hr_timeout) < 0) 447 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 448 449 init_local(res); 450 init_environment(); 451 452 if (drop_privs(res) != 0) 453 exit(EX_CONFIG); 454 pjdlog_info("Privileges successfully dropped."); 455 456 /* 457 * Create the control thread before sending any event to the parent, 458 * as we can deadlock when parent sends control request to worker, 459 * but worker has no control thread started yet, so parent waits. 460 * In the meantime worker sends an event to the parent, but parent 461 * is unable to handle the event, because it waits for control 462 * request response. 463 */ 464 error = pthread_create(&td, NULL, ctrl_thread, res); 465 PJDLOG_ASSERT(error == 0); 466 467 init_remote(res, nvin); 468 event_send(res, EVENT_CONNECT); 469 470 error = pthread_create(&td, NULL, recv_thread, res); 471 PJDLOG_ASSERT(error == 0); 472 error = pthread_create(&td, NULL, disk_thread, res); 473 PJDLOG_ASSERT(error == 0); 474 (void)send_thread(res); 475} 476 477static void 478reqlog(int loglevel, int debuglevel, int error, struct hio *hio, const char *fmt, ...) 
479{ 480 char msg[1024]; 481 va_list ap; 482 int len; 483 484 va_start(ap, fmt); 485 len = vsnprintf(msg, sizeof(msg), fmt, ap); 486 va_end(ap); 487 if ((size_t)len < sizeof(msg)) { 488 switch (hio->hio_cmd) { 489 case HIO_READ: 490 (void)snprintf(msg + len, sizeof(msg) - len, 491 "READ(%ju, %ju).", (uintmax_t)hio->hio_offset, 492 (uintmax_t)hio->hio_length); 493 break; 494 case HIO_DELETE: 495 (void)snprintf(msg + len, sizeof(msg) - len, 496 "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset, 497 (uintmax_t)hio->hio_length); 498 break; 499 case HIO_FLUSH: 500 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 501 break; 502 case HIO_WRITE: 503 (void)snprintf(msg + len, sizeof(msg) - len, 504 "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset, 505 (uintmax_t)hio->hio_length); 506 break; 507 case HIO_KEEPALIVE: 508 (void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE."); 509 break; 510 default: 511 (void)snprintf(msg + len, sizeof(msg) - len, 512 "UNKNOWN(%u).", (unsigned int)hio->hio_cmd); 513 break; 514 } 515 } 516 pjdlog_common(loglevel, debuglevel, error, "%s", msg); 517} 518 519static int 520requnpack(struct hast_resource *res, struct hio *hio, struct nv *nv) 521{ 522 523 hio->hio_cmd = nv_get_uint8(nv, "cmd"); 524 if (hio->hio_cmd == 0) { 525 pjdlog_error("Header contains no 'cmd' field."); 526 hio->hio_error = EINVAL; 527 goto end; 528 } 529 if (hio->hio_cmd != HIO_KEEPALIVE) { 530 hio->hio_seq = nv_get_uint64(nv, "seq"); 531 if (hio->hio_seq == 0) { 532 pjdlog_error("Header contains no 'seq' field."); 533 hio->hio_error = EINVAL; 534 goto end; 535 } 536 } 537 switch (hio->hio_cmd) { 538 case HIO_FLUSH: 539 case HIO_KEEPALIVE: 540 break; 541 case HIO_READ: 542 case HIO_WRITE: 543 case HIO_DELETE: 544 hio->hio_offset = nv_get_uint64(nv, "offset"); 545 if (nv_error(nv) != 0) { 546 pjdlog_error("Header is missing 'offset' field."); 547 hio->hio_error = EINVAL; 548 goto end; 549 } 550 hio->hio_length = nv_get_uint64(nv, "length"); 551 if (nv_error(nv) != 0) { 
552 pjdlog_error("Header is missing 'length' field."); 553 hio->hio_error = EINVAL; 554 goto end; 555 } 556 if (hio->hio_length == 0) { 557 pjdlog_error("Data length is zero."); 558 hio->hio_error = EINVAL; 559 goto end; 560 } 561 if (hio->hio_length > MAXPHYS) { 562 pjdlog_error("Data length is too large (%ju > %ju).", 563 (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS); 564 hio->hio_error = EINVAL; 565 goto end; 566 } 567 if ((hio->hio_offset % res->hr_local_sectorsize) != 0) { 568 pjdlog_error("Offset %ju is not multiple of sector size.", 569 (uintmax_t)hio->hio_offset); 570 hio->hio_error = EINVAL; 571 goto end; 572 } 573 if ((hio->hio_length % res->hr_local_sectorsize) != 0) { 574 pjdlog_error("Length %ju is not multiple of sector size.", 575 (uintmax_t)hio->hio_length); 576 hio->hio_error = EINVAL; 577 goto end; 578 } 579 if (hio->hio_offset + hio->hio_length > 580 (uint64_t)res->hr_datasize) { 581 pjdlog_error("Data offset is too large (%ju > %ju).", 582 (uintmax_t)(hio->hio_offset + hio->hio_length), 583 (uintmax_t)res->hr_datasize); 584 hio->hio_error = EINVAL; 585 goto end; 586 } 587 break; 588 default: 589 pjdlog_error("Header contains invalid 'cmd' (%hhu).", 590 hio->hio_cmd); 591 hio->hio_error = EINVAL; 592 goto end; 593 } 594 hio->hio_error = 0; 595end: 596 return (hio->hio_error); 597} 598 599static __dead2 void 600secondary_exit(int exitcode, const char *fmt, ...) 601{ 602 va_list ap; 603 604 PJDLOG_ASSERT(exitcode != EX_OK); 605 va_start(ap, fmt); 606 pjdlogv_errno(LOG_ERR, fmt, ap); 607 va_end(ap); 608 event_send(gres, EVENT_DISCONNECT); 609 exit(exitcode); 610} 611 612/* 613 * Thread receives requests from the primary node. 
614 */ 615static void * 616recv_thread(void *arg) 617{ 618 struct hast_resource *res = arg; 619 struct hio *hio; 620 struct nv *nv; 621 622 for (;;) { 623 pjdlog_debug(2, "recv: Taking free request."); 624 QUEUE_TAKE(free, hio); 625 pjdlog_debug(2, "recv: (%p) Got request.", hio); 626 if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) { 627 secondary_exit(EX_TEMPFAIL, 628 "Unable to receive request header"); 629 } 630 if (requnpack(res, hio, nv) != 0) { 631 nv_free(nv); 632 pjdlog_debug(2, 633 "recv: (%p) Moving request to the send queue.", 634 hio); 635 QUEUE_INSERT(send, hio); 636 continue; 637 } 638 switch (hio->hio_cmd) { 639 case HIO_READ: 640 res->hr_stat_read++; 641 break; 642 case HIO_WRITE: 643 res->hr_stat_write++; 644 break; 645 case HIO_DELETE: 646 res->hr_stat_delete++; 647 break; 648 case HIO_FLUSH: 649 res->hr_stat_flush++; 650 break; 651 case HIO_KEEPALIVE: 652 break; 653 default: 654 PJDLOG_ABORT("Unexpected command (cmd=%hhu).", 655 hio->hio_cmd); 656 } 657 reqlog(LOG_DEBUG, 2, -1, hio, 658 "recv: (%p) Got request header: ", hio); 659 if (hio->hio_cmd == HIO_KEEPALIVE) { 660 nv_free(nv); 661 pjdlog_debug(2, 662 "recv: (%p) Moving request to the free queue.", 663 hio); 664 hio_clear(hio); 665 QUEUE_INSERT(free, hio); 666 continue; 667 } else if (hio->hio_cmd == HIO_WRITE) { 668 if (hast_proto_recv_data(res, res->hr_remotein, nv, 669 hio->hio_data, MAXPHYS) < 0) { 670 secondary_exit(EX_TEMPFAIL, 671 "Unable to receive request data"); 672 } 673 } 674 nv_free(nv); 675 pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.", 676 hio); 677 QUEUE_INSERT(disk, hio); 678 } 679 /* NOTREACHED */ 680 return (NULL); 681} 682 683/* 684 * Thread reads from or writes to local component and also handles DELETE and 685 * FLUSH requests. 
686 */ 687static void * 688disk_thread(void *arg) 689{ 690 struct hast_resource *res = arg; 691 struct hio *hio; 692 ssize_t ret; 693 bool clear_activemap, logerror; 694 695 clear_activemap = true; 696 697 for (;;) { 698 pjdlog_debug(2, "disk: Taking request."); 699 QUEUE_TAKE(disk, hio); 700 while (clear_activemap) { 701 unsigned char *map; 702 size_t mapsize; 703 704 /* 705 * When first request is received, it means that primary 706 * already received our activemap, merged it and stored 707 * locally. We can now safely clear our activemap. 708 */ 709 mapsize = 710 activemap_calc_ondisk_size(res->hr_local_mediasize - 711 METADATA_SIZE, res->hr_extentsize, 712 res->hr_local_sectorsize); 713 map = calloc(1, mapsize); 714 if (map == NULL) { 715 pjdlog_warning("Unable to allocate memory to clear local activemap."); 716 break; 717 } 718 if (pwrite(res->hr_localfd, map, mapsize, 719 METADATA_SIZE) != (ssize_t)mapsize) { 720 pjdlog_errno(LOG_WARNING, 721 "Unable to store cleared activemap"); 722 free(map); 723 break; 724 } 725 free(map); 726 clear_activemap = false; 727 pjdlog_debug(1, "Local activemap cleared."); 728 break; 729 } 730 reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio); 731 logerror = true; 732 /* Handle the actual request. 
*/ 733 switch (hio->hio_cmd) { 734 case HIO_READ: 735 ret = pread(res->hr_localfd, hio->hio_data, 736 hio->hio_length, 737 hio->hio_offset + res->hr_localoff); 738 if (ret < 0) 739 hio->hio_error = errno; 740 else if (ret != (int64_t)hio->hio_length) 741 hio->hio_error = EIO; 742 else 743 hio->hio_error = 0; 744 break; 745 case HIO_WRITE: 746 ret = pwrite(res->hr_localfd, hio->hio_data, 747 hio->hio_length, 748 hio->hio_offset + res->hr_localoff); 749 if (ret < 0) 750 hio->hio_error = errno; 751 else if (ret != (int64_t)hio->hio_length) 752 hio->hio_error = EIO; 753 else 754 hio->hio_error = 0; 755 break; 756 case HIO_DELETE: 757 ret = g_delete(res->hr_localfd, 758 hio->hio_offset + res->hr_localoff, 759 hio->hio_length); 760 if (ret < 0) 761 hio->hio_error = errno; 762 else 763 hio->hio_error = 0; 764 break; 765 case HIO_FLUSH: 766 if (!res->hr_localflush) { 767 ret = -1; 768 hio->hio_error = EOPNOTSUPP; 769 logerror = false; 770 break; 771 } 772 ret = g_flush(res->hr_localfd); 773 if (ret < 0) { 774 if (errno == EOPNOTSUPP) 775 res->hr_localflush = false; 776 hio->hio_error = errno; 777 } else { 778 hio->hio_error = 0; 779 } 780 break; 781 default: 782 PJDLOG_ABORT("Unexpected command (cmd=%hhu).", 783 hio->hio_cmd); 784 } 785 if (logerror && hio->hio_error != 0) { 786 reqlog(LOG_ERR, 0, hio->hio_error, hio, 787 "Request failed: "); 788 } 789 pjdlog_debug(2, "disk: (%p) Moving request to the send queue.", 790 hio); 791 QUEUE_INSERT(send, hio); 792 } 793 /* NOTREACHED */ 794 return (NULL); 795} 796 797/* 798 * Thread sends requests back to primary node. 799 */ 800static void * 801send_thread(void *arg) 802{ 803 struct hast_resource *res = arg; 804 struct nv *nvout; 805 struct hio *hio; 806 void *data; 807 size_t length; 808 809 for (;;) { 810 pjdlog_debug(2, "send: Taking request."); 811 QUEUE_TAKE(send, hio); 812 reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio); 813 nvout = nv_alloc(); 814 /* Copy sequence number. 
*/ 815 nv_add_uint64(nvout, hio->hio_seq, "seq"); 816 switch (hio->hio_cmd) { 817 case HIO_READ: 818 if (hio->hio_error == 0) { 819 data = hio->hio_data; 820 length = hio->hio_length; 821 break; 822 } 823 /* 824 * We send no data in case of an error. 825 */ 826 /* FALLTHROUGH */ 827 case HIO_DELETE: 828 case HIO_FLUSH: 829 case HIO_WRITE: 830 data = NULL; 831 length = 0; 832 break; 833 default: 834 PJDLOG_ABORT("Unexpected command (cmd=%hhu).", 835 hio->hio_cmd); 836 } 837 if (hio->hio_error != 0) 838 nv_add_int16(nvout, hio->hio_error, "error"); 839 if (hast_proto_send(res, res->hr_remoteout, nvout, data, 840 length) < 0) { 841 secondary_exit(EX_TEMPFAIL, "Unable to send reply."); 842 } 843 nv_free(nvout); 844 pjdlog_debug(2, "send: (%p) Moving request to the free queue.", 845 hio); 846 hio_clear(hio); 847 QUEUE_INSERT(free, hio); 848 } 849 /* NOTREACHED */ 850 return (NULL); 851} 852