secondary.c revision 211977
1121986Sjhb/*- 2121986Sjhb * Copyright (c) 2009-2010 The FreeBSD Foundation 3121986Sjhb * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org> 4121986Sjhb * All rights reserved. 5121986Sjhb * 6121986Sjhb * This software was developed by Pawel Jakub Dawidek under sponsorship from 7121986Sjhb * the FreeBSD Foundation. 8121986Sjhb * 9121986Sjhb * Redistribution and use in source and binary forms, with or without 10121986Sjhb * modification, are permitted provided that the following conditions 11121986Sjhb * are met: 12121986Sjhb * 1. Redistributions of source code must retain the above copyright 13121986Sjhb * notice, this list of conditions and the following disclaimer. 14121986Sjhb * 2. Redistributions in binary form must reproduce the above copyright 15121986Sjhb * notice, this list of conditions and the following disclaimer in the 16121986Sjhb * documentation and/or other materials provided with the distribution. 17121986Sjhb * 18121986Sjhb * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19121986Sjhb * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20121986Sjhb * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21121986Sjhb * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22121986Sjhb * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23121986Sjhb * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24121986Sjhb * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25121986Sjhb * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26121986Sjhb * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27121986Sjhb * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28121986Sjhb * SUCH DAMAGE. 29121986Sjhb */ 30121986Sjhb 31121986Sjhb#include <sys/cdefs.h> 32121986Sjhb__FBSDID("$FreeBSD: head/sbin/hastd/secondary.c 211977 2010-08-29 21:41:53Z pjd $"); 33121986Sjhb 34121986Sjhb#include <sys/param.h> 35121986Sjhb#include <sys/time.h> 36167240Sjhb#include <sys/bio.h> 37121986Sjhb#include <sys/disk.h> 38167240Sjhb#include <sys/stat.h> 39121986Sjhb 40148538Sjhb#include <assert.h> 41121986Sjhb#include <err.h> 42167240Sjhb#include <errno.h> 43167240Sjhb#include <fcntl.h> 44167240Sjhb#include <libgeom.h> 45121986Sjhb#include <pthread.h> 46121986Sjhb#include <stdint.h> 47121986Sjhb#include <stdio.h> 48214631Sjhb#include <string.h> 49121986Sjhb#include <sysexits.h> 50121986Sjhb#include <unistd.h> 51121986Sjhb 52167747Sjhb#include <activemap.h> 53121986Sjhb#include <nv.h> 54121986Sjhb#include <pjdlog.h> 55121986Sjhb 56121986Sjhb#include "control.h" 57121986Sjhb#include "hast.h" 58121986Sjhb#include "hast_proto.h" 59121986Sjhb#include "hastd.h" 60151979Sjhb#include "hooks.h" 61151979Sjhb#include "metadata.h" 62151979Sjhb#include "proto.h" 63151979Sjhb#include "subr.h" 64121986Sjhb#include "synch.h" 65151897Srwatson 66121986Sjhbstruct hio { 67121986Sjhb uint64_t hio_seq; 68151979Sjhb int hio_error; 69151979Sjhb struct nv *hio_nv; 70151979Sjhb void *hio_data; 71151979Sjhb uint8_t hio_cmd; 72151979Sjhb uint64_t hio_offset; 73151979Sjhb uint64_t hio_length; 74151979Sjhb TAILQ_ENTRY(hio) hio_next; 75152461Sandre}; 76152461Sandre 77152461Sandre/* 78152461Sandre * Free list holds unused structures. When free list is empty, we have to wait 79121986Sjhb * until some in-progress requests are freed. 80121986Sjhb */ 81121986Sjhbstatic TAILQ_HEAD(, hio) hio_free_list; 82121986Sjhbstatic pthread_mutex_t hio_free_list_lock; 83151979Sjhbstatic pthread_cond_t hio_free_list_cond; 84122124Sjhb/* 85122124Sjhb * Disk thread (the one that do I/O requests) takes requests from this list. 86156124Sjhb */ 87122124Sjhbstatic TAILQ_HEAD(, hio) hio_disk_list; 88122124Sjhbstatic pthread_mutex_t hio_disk_list_lock; 89122124Sjhbstatic pthread_cond_t hio_disk_list_cond; 90130980Sjhb/* 91157541Sjhb * There is one recv list for every component, although local components don't 92121986Sjhb * use recv lists as local requests are done synchronously. 93121986Sjhb */ 94121986Sjhbstatic TAILQ_HEAD(, hio) hio_send_list; 95121986Sjhbstatic pthread_mutex_t hio_send_list_lock; 96121986Sjhbstatic pthread_cond_t hio_send_list_cond; 97121986Sjhb 98121986Sjhb/* 99121986Sjhb * Maximum number of outstanding I/O requests. 100121986Sjhb */ 101167747Sjhb#define HAST_HIO_MAX 256 102121986Sjhb 103121986Sjhbstatic void *recv_thread(void *arg); 104121986Sjhbstatic void *disk_thread(void *arg); 105121986Sjhbstatic void *send_thread(void *arg); 106121986Sjhb 107121986Sjhb#define QUEUE_INSERT(name, hio) do { \ 108130980Sjhb bool _wakeup; \ 109151979Sjhb \ 110121986Sjhb mtx_lock(&hio_##name##_list_lock); \ 111133017Sscottl _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 112121986Sjhb TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_next); \ 113121986Sjhb mtx_unlock(&hio_##name##_list_lock); \ 114169391Sjhb if (_wakeup) \ 115121986Sjhb cv_signal(&hio_##name##_list_cond); \ 116121986Sjhb} while (0) 117128931Sjhb#define QUEUE_TAKE(name, hio) do { \ 118128931Sjhb mtx_lock(&hio_##name##_list_lock); \ 119163219Sjhb while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 120195249Sjhb cv_wait(&hio_##name##_list_cond, \ 121129964Sjhb &hio_##name##_list_lock); \ 122121986Sjhb } \ 123129097Sjhb TAILQ_REMOVE(&hio_##name##_list, (hio), hio_next); \ 124121986Sjhb mtx_unlock(&hio_##name##_list_lock); \ 125121986Sjhb} while (0) 126169391Sjhb 127169391Sjhbstatic void 128156124Sjhbinit_environment(void) 129121986Sjhb{ 130156124Sjhb struct hio *hio; 131156124Sjhb unsigned int ii; 132156124Sjhb 133248085Smarius /* 134148538Sjhb * Initialize lists, their locks and theirs condition variables. 135148538Sjhb */ 136148538Sjhb TAILQ_INIT(&hio_free_list); 137148538Sjhb mtx_init(&hio_free_list_lock); 138148538Sjhb cv_init(&hio_free_list_cond); 139133017Sscottl TAILQ_INIT(&hio_disk_list); 140133017Sscottl mtx_init(&hio_disk_list_lock); 141133017Sscottl cv_init(&hio_disk_list_cond); 142133017Sscottl TAILQ_INIT(&hio_send_list); 143133017Sscottl mtx_init(&hio_send_list_lock); 144133017Sscottl cv_init(&hio_send_list_cond); 145121986Sjhb 146121986Sjhb /* 147121986Sjhb * Allocate requests pool and initialize requests. 148121986Sjhb */ 149121986Sjhb for (ii = 0; ii < HAST_HIO_MAX; ii++) { 150121986Sjhb hio = malloc(sizeof(*hio)); 151121986Sjhb if (hio == NULL) { 152121986Sjhb pjdlog_exitx(EX_TEMPFAIL, 153121986Sjhb "Unable to allocate memory (%zu bytes) for hio request.", 154121986Sjhb sizeof(*hio)); 155121986Sjhb } 156121986Sjhb hio->hio_error = 0; 157121986Sjhb hio->hio_data = malloc(MAXPHYS); 158121986Sjhb if (hio->hio_data == NULL) { 159121986Sjhb pjdlog_exitx(EX_TEMPFAIL, 160121986Sjhb "Unable to allocate memory (%zu bytes) for gctl_data.", 161121986Sjhb (size_t)MAXPHYS); 162121986Sjhb } 163130980Sjhb TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next); 164130980Sjhb } 165130980Sjhb} 166130980Sjhb 167130980Sjhbstatic void 168130980Sjhbinit_local(struct hast_resource *res) 169130980Sjhb{ 170130980Sjhb 171130980Sjhb if (metadata_read(res, true) < 0) 172130980Sjhb exit(EX_NOINPUT); 173130980Sjhb} 174130980Sjhb 175130980Sjhbstatic void 176130980Sjhbinit_remote(struct hast_resource *res, struct nv *nvin) 177130980Sjhb{ 178130980Sjhb uint64_t resuid; 179121986Sjhb struct nv *nvout; 180151979Sjhb unsigned char *map; 181130980Sjhb size_t mapsize; 182130980Sjhb 183151979Sjhb map = NULL; 184151979Sjhb mapsize = 0; 185130980Sjhb nvout = nv_alloc(); 186130980Sjhb nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize"); 187151979Sjhb nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize"); 188130980Sjhb resuid = nv_get_uint64(nvin, "resuid"); 189130980Sjhb res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt"); 190151979Sjhb res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 191130980Sjhb nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt"); 192130980Sjhb nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt"); 193151979Sjhb mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - 194130980Sjhb METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize); 195130980Sjhb map = malloc(mapsize); 196130980Sjhb if (map == NULL) { 197130980Sjhb pjdlog_exitx(EX_TEMPFAIL, 198151979Sjhb "Unable to allocate memory (%zu bytes) for activemap.", 199130980Sjhb mapsize); 200130980Sjhb } 201130980Sjhb nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize"); 202130980Sjhb /* 203121986Sjhb * When we work as primary and secondary is missing we will increase 204121986Sjhb * localcnt in our metadata. When secondary is connected and synced 205121986Sjhb * we make localcnt be equal to remotecnt, which means nodes are more 206121986Sjhb * or less in sync. 207121986Sjhb * Split-brain condition is when both nodes are not able to communicate 208121986Sjhb * and are both configured as primary nodes. In turn, they can both 209121986Sjhb * make incompatible changes to the data and we have to detect that. 210121986Sjhb * Under split-brain condition we will increase our localcnt on first 211157541Sjhb * write and remote node will increase its localcnt on first write. 212121986Sjhb * When we connect we can see that primary's localcnt is greater than 213121986Sjhb * our remotecnt (primary was modified while we weren't watching) and 214121986Sjhb * our localcnt is greater than primary's remotecnt (we were modified 215121986Sjhb * while primary wasn't watching). 216121986Sjhb * There are many possible combinations which are all gathered below. 217121986Sjhb * Don't pay too much attention to exact numbers, the more important 218121986Sjhb * is to compare them. We compare secondary's local with primary's 219121986Sjhb * remote and secondary's remote with primary's local. 220133017Sscottl * Note that every case where primary's localcnt is smaller than 221121986Sjhb * secondary's remotecnt and where secondary's localcnt is smaller than 222121986Sjhb * primary's remotecnt should be impossible in practise. We will perform 223121986Sjhb * full synchronization then. Those cases are marked with an asterisk. 224121986Sjhb * Regular synchronization means that only extents marked as dirty are 225121986Sjhb * synchronized (regular synchronization). 226121986Sjhb * 227121986Sjhb * SECONDARY METADATA PRIMARY METADATA 228157541Sjhb * local=3 remote=3 local=2 remote=2* ?! Full sync from secondary. 229121986Sjhb * local=3 remote=3 local=2 remote=3* ?! Full sync from primary. 230121986Sjhb * local=3 remote=3 local=2 remote=4* ?! Full sync from primary. 231121986Sjhb * local=3 remote=3 local=3 remote=2 Primary is out-of-date, 232121986Sjhb * regular sync from secondary. 233133017Sscottl * local=3 remote=3 local=3 remote=3 Regular sync just in case. 234133017Sscottl * local=3 remote=3 local=3 remote=4* ?! Full sync from primary. 235133017Sscottl * local=3 remote=3 local=4 remote=2 Split-brain condition. 236133017Sscottl * local=3 remote=3 local=4 remote=3 Secondary out-of-date, 237121986Sjhb * regular sync from primary. 238121986Sjhb * local=3 remote=3 local=4 remote=4* ?! Full sync from primary. 239121986Sjhb */ 240121986Sjhb if (res->hr_resuid == 0) { 241121986Sjhb /* 242121986Sjhb * Provider is used for the first time. Initialize everything. 243122148Sjhb */ 244133017Sscottl assert(res->hr_secondary_localcnt == 0); 245121986Sjhb res->hr_resuid = resuid; 246121986Sjhb if (metadata_write(res) < 0) 247121986Sjhb exit(EX_NOINPUT); 248129964Sjhb memset(map, 0xff, mapsize); 249129964Sjhb nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 250129964Sjhb } else if ( 251129964Sjhb /* Is primary is out-of-date? */ 252129964Sjhb (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 253129964Sjhb res->hr_secondary_remotecnt == res->hr_primary_localcnt) || 254129964Sjhb /* Node are more or less in sync? */ 255129964Sjhb (res->hr_secondary_localcnt == res->hr_primary_remotecnt && 256129964Sjhb res->hr_secondary_remotecnt == res->hr_primary_localcnt) || 257151979Sjhb /* Is secondary is out-of-date? */ 258151979Sjhb (res->hr_secondary_localcnt == res->hr_primary_remotecnt && 259151979Sjhb res->hr_secondary_remotecnt < res->hr_primary_localcnt)) { 260151979Sjhb /* 261208915Sjhb * Nodes are more or less in sync or one of the nodes is 262151979Sjhb * out-of-date. 263151979Sjhb * It doesn't matter at this point which one, we just have to 264129964Sjhb * send out local bitmap to the remote node. 265129964Sjhb */ 266129964Sjhb if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) != 267129964Sjhb (ssize_t)mapsize) { 268129964Sjhb pjdlog_exit(LOG_ERR, "Unable to read activemap"); 269129964Sjhb } 270129964Sjhb if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 271129964Sjhb res->hr_secondary_remotecnt == res->hr_primary_localcnt) { 272129964Sjhb /* Primary is out-of-date, sync from secondary. */ 273129964Sjhb nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); 274156124Sjhb } else { 275156124Sjhb /* 276129964Sjhb * Secondary is out-of-date or counts match. 277129964Sjhb * Sync from primary. 278129964Sjhb */ 279129964Sjhb nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 280129964Sjhb } 281129964Sjhb } else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 282129964Sjhb res->hr_primary_localcnt > res->hr_secondary_remotecnt) { 283129964Sjhb /* 284129964Sjhb * Not good, we have split-brain condition. 285129964Sjhb */ 286129964Sjhb pjdlog_error("Split-brain detected, exiting."); 287129964Sjhb nv_add_string(nvout, "Split-brain condition!", "errmsg"); 288151979Sjhb free(map); 289151979Sjhb map = NULL; 290129964Sjhb mapsize = 0; 291148538Sjhb } else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt || 292129964Sjhb res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ { 293129964Sjhb /* 294151979Sjhb * This should never happen in practise, but we will perform 295129964Sjhb * full synchronization. 296129964Sjhb */ 297129964Sjhb assert(res->hr_secondary_localcnt < res->hr_primary_remotecnt || 298129964Sjhb res->hr_primary_localcnt < res->hr_secondary_remotecnt); 299151979Sjhb mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - 300129964Sjhb METADATA_SIZE, res->hr_extentsize, 301129964Sjhb res->hr_local_sectorsize); 302129964Sjhb memset(map, 0xff, mapsize); 303129964Sjhb if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) { 304129964Sjhb /* In this one of five cases sync from secondary. */ 305151979Sjhb nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); 306151979Sjhb } else { 307156124Sjhb /* For the rest four cases sync from primary. */ 308129964Sjhb nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 309129964Sjhb } 310129964Sjhb pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).", 311157541Sjhb (uintmax_t)res->hr_primary_localcnt, 312129964Sjhb (uintmax_t)res->hr_primary_remotecnt, 313129964Sjhb (uintmax_t)res->hr_secondary_localcnt, 314129964Sjhb (uintmax_t)res->hr_secondary_remotecnt); 315129964Sjhb } 316129964Sjhb if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) < 0) { 317129964Sjhb pjdlog_errno(LOG_WARNING, "Unable to send activemap to %s", 318129964Sjhb res->hr_remoteaddr); 319195249Sjhb nv_free(nvout); 320156124Sjhb exit(EX_TEMPFAIL); 321121986Sjhb } 322156124Sjhb nv_free(nvout); 323156124Sjhb if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 324195415Sjhb res->hr_primary_localcnt > res->hr_secondary_remotecnt) { 325187880Sjeff /* Exit on split-brain. */ 326121986Sjhb exit(EX_CONFIG); 327187880Sjeff } 328187880Sjeff} 329187880Sjeff 330187880Sjeffvoid 331187880Sjeffhastd_secondary(struct hast_resource *res, struct nv *nvin) 332187880Sjeff{ 333187880Sjeff pthread_t td; 334187880Sjeff pid_t pid; 335187880Sjeff int error; 336187880Sjeff 337187880Sjeff /* 338187880Sjeff * Create communication channel between parent and child. 339195249Sjhb */ 340187880Sjeff if (proto_client("socketpair://", &res->hr_ctrl) < 0) { 341187880Sjeff KEEP_ERRNO((void)pidfile_remove(pfh)); 342187880Sjeff pjdlog_exit(EX_OSERR, 343187880Sjeff "Unable to create control sockets between parent and child"); 344187880Sjeff } 345195415Sjhb 346195415Sjhb pid = fork(); 347195249Sjhb if (pid < 0) { 348195249Sjhb KEEP_ERRNO((void)pidfile_remove(pfh)); 349208915Sjhb pjdlog_exit(EX_OSERR, "Unable to fork"); 350208915Sjhb } 351208915Sjhb 352208915Sjhb if (pid > 0) { 353208915Sjhb /* This is parent. */ 354208915Sjhb proto_close(res->hr_remotein); 355208915Sjhb res->hr_remotein = NULL; 356208991Smav proto_close(res->hr_remoteout); 357208915Sjhb res->hr_remoteout = NULL; 358208915Sjhb res->hr_workerpid = pid; 359216679Sjhb return; 360208915Sjhb } 361216679Sjhb 362208915Sjhb (void)pidfile_close(pfh); 363208915Sjhb hook_fini(); 364195415Sjhb 365195415Sjhb setproctitle("%s (secondary)", res->hr_name); 366195415Sjhb 367195415Sjhb signal(SIGHUP, SIG_DFL); 368121986Sjhb signal(SIGCHLD, SIG_DFL); 369187880Sjeff 370187880Sjeff /* Error in setting timeout is not critical, but why should it fail? */ 371151979Sjhb if (proto_timeout(res->hr_remotein, 0) < 0) 372187880Sjeff pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 373187880Sjeff if (proto_timeout(res->hr_remoteout, res->hr_timeout) < 0) 374121986Sjhb pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 375129964Sjhb 376208915Sjhb init_local(res); 377208915Sjhb init_remote(res, nvin); 378187880Sjeff init_environment(); 379187880Sjeff 380187880Sjeff error = pthread_create(&td, NULL, recv_thread, res); 381187880Sjeff assert(error == 0); 382195415Sjhb error = pthread_create(&td, NULL, disk_thread, res); 383195415Sjhb assert(error == 0); 384195415Sjhb error = pthread_create(&td, NULL, send_thread, res); 385187880Sjeff assert(error == 0); 386195415Sjhb (void)ctrl_thread(res); 387195249Sjhb} 388121986Sjhb 389121986Sjhbstatic void 390121986Sjhbreqlog(int loglevel, int debuglevel, int error, struct hio *hio, const char *fmt, ...) 391121986Sjhb{ 392121986Sjhb char msg[1024]; 393121986Sjhb va_list ap; 394121986Sjhb int len; 395187880Sjeff 396195249Sjhb va_start(ap, fmt); 397195249Sjhb len = vsnprintf(msg, sizeof(msg), fmt, ap); 398195249Sjhb va_end(ap); 399187880Sjeff if ((size_t)len < sizeof(msg)) { 400121986Sjhb switch (hio->hio_cmd) { 401121986Sjhb case HIO_READ: 402187880Sjeff (void)snprintf(msg + len, sizeof(msg) - len, 403169391Sjhb "READ(%ju, %ju).", (uintmax_t)hio->hio_offset, 404169391Sjhb (uintmax_t)hio->hio_length); 405169391Sjhb break; 406169391Sjhb case HIO_DELETE: 407169391Sjhb (void)snprintf(msg + len, sizeof(msg) - len, 408169391Sjhb "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset, 409169391Sjhb (uintmax_t)hio->hio_length); 410169391Sjhb break; 411169391Sjhb case HIO_FLUSH: 412187880Sjeff (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 413208915Sjhb break; 414169391Sjhb case HIO_WRITE: 415169391Sjhb (void)snprintf(msg + len, sizeof(msg) - len, 416169391Sjhb "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset, 417208915Sjhb (uintmax_t)hio->hio_length); 418187880Sjeff break; 419169391Sjhb case HIO_KEEPALIVE: 420169391Sjhb (void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE."); 421169391Sjhb break; 422121986Sjhb default: 423121986Sjhb (void)snprintf(msg + len, sizeof(msg) - len, 424121986Sjhb "UNKNOWN(%u).", (unsigned int)hio->hio_cmd); 425121986Sjhb break; 426121986Sjhb } 427121986Sjhb } 428151979Sjhb pjdlog_common(loglevel, debuglevel, error, "%s", msg); 429121986Sjhb} 430121986Sjhb 431121986Sjhbstatic int 432121986Sjhbrequnpack(struct hast_resource *res, struct hio *hio) 433121986Sjhb{ 434121986Sjhb 435121986Sjhb hio->hio_cmd = nv_get_uint8(hio->hio_nv, "cmd"); 436151979Sjhb if (hio->hio_cmd == 0) { 437151979Sjhb pjdlog_error("Header contains no 'cmd' field."); 438121986Sjhb hio->hio_error = EINVAL; 439121986Sjhb goto end; 440121986Sjhb } 441128931Sjhb switch (hio->hio_cmd) { 442128931Sjhb case HIO_KEEPALIVE: 443128931Sjhb break; 444128931Sjhb case HIO_READ: 445128931Sjhb case HIO_WRITE: 446128931Sjhb case HIO_DELETE: 447130984Sjhb hio->hio_offset = nv_get_uint64(hio->hio_nv, "offset"); 448128931Sjhb if (nv_error(hio->hio_nv) != 0) { 449128931Sjhb pjdlog_error("Header is missing 'offset' field."); 450128931Sjhb hio->hio_error = EINVAL; 451128931Sjhb goto end; 452128931Sjhb } 453130984Sjhb hio->hio_length = nv_get_uint64(hio->hio_nv, "length"); 454130984Sjhb if (nv_error(hio->hio_nv) != 0) { 455130984Sjhb pjdlog_error("Header is missing 'length' field."); 456130984Sjhb hio->hio_error = EINVAL; 457140452Sjhb goto end; 458128931Sjhb } 459208915Sjhb if (hio->hio_length == 0) { 460130984Sjhb pjdlog_error("Data length is zero."); 461130984Sjhb hio->hio_error = EINVAL; 462130984Sjhb goto end; 463130984Sjhb } 464130984Sjhb if (hio->hio_length > MAXPHYS) { 465130984Sjhb pjdlog_error("Data length is too large (%ju > %ju).", 466130984Sjhb (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS); 467130984Sjhb hio->hio_error = EINVAL; 468130984Sjhb goto end; 469130984Sjhb } 470130984Sjhb if ((hio->hio_offset % res->hr_local_sectorsize) != 0) { 471130984Sjhb pjdlog_error("Offset %ju is not multiple of sector size.", 472130984Sjhb (uintmax_t)hio->hio_offset); 473130984Sjhb hio->hio_error = EINVAL; 474130984Sjhb goto end; 475130984Sjhb } 476130984Sjhb if ((hio->hio_length % res->hr_local_sectorsize) != 0) { 477130984Sjhb pjdlog_error("Length %ju is not multiple of sector size.", 478130984Sjhb (uintmax_t)hio->hio_length); 479130984Sjhb hio->hio_error = EINVAL; 480130984Sjhb goto end; 481208915Sjhb } 482128931Sjhb if (hio->hio_offset + hio->hio_length > 483128931Sjhb (uint64_t)res->hr_datasize) { 484128931Sjhb pjdlog_error("Data offset is too large (%ju > %ju).", 485121986Sjhb (uintmax_t)(hio->hio_offset + hio->hio_length), 486163219Sjhb (uintmax_t)res->hr_datasize); 487121986Sjhb hio->hio_error = EINVAL; 488163219Sjhb goto end; 489163219Sjhb } 490121986Sjhb break; 491208915Sjhb default: 492163219Sjhb pjdlog_error("Header contains invalid 'cmd' (%hhu).", 493163219Sjhb hio->hio_cmd); 494208915Sjhb hio->hio_error = EINVAL; 495121986Sjhb goto end; 496121986Sjhb } 497121986Sjhb hio->hio_error = 0; 498121986Sjhbend: 499121986Sjhb return (hio->hio_error); 500121986Sjhb} 501167247Sjhb 502121986Sjhb/* 503121986Sjhb * Thread receives requests from the primary node. 504121986Sjhb */ 505121986Sjhbstatic void * 506121986Sjhbrecv_thread(void *arg) 507121986Sjhb{ 508121986Sjhb struct hast_resource *res = arg; 509145054Sjhb struct hio *hio; 510156920Sjhb 511121986Sjhb for (;;) { 512145054Sjhb pjdlog_debug(2, "recv: Taking free request."); 513121986Sjhb QUEUE_TAKE(free, hio); 514145054Sjhb pjdlog_debug(2, "recv: (%p) Got request.", hio); 515145054Sjhb if (hast_proto_recv_hdr(res->hr_remotein, &hio->hio_nv) < 0) { 516152528Sjhb pjdlog_exit(EX_TEMPFAIL, 517145057Sjhb "Unable to receive request header"); 518145054Sjhb } 519145054Sjhb if (requnpack(res, hio) != 0) { 520145054Sjhb pjdlog_debug(2, 521145054Sjhb "recv: (%p) Moving request to the send queue.", 522145054Sjhb hio); 523121986Sjhb QUEUE_INSERT(send, hio); 524121986Sjhb continue; 525121986Sjhb } 526121986Sjhb reqlog(LOG_DEBUG, 2, -1, hio, 527121986Sjhb "recv: (%p) Got request header: ", hio); 528195249Sjhb if (hio->hio_cmd == HIO_KEEPALIVE) { 529121986Sjhb pjdlog_debug(2, 530121986Sjhb "recv: (%p) Moving request to the free queue.", 531121986Sjhb hio); 532121986Sjhb nv_free(hio->hio_nv); 533121986Sjhb QUEUE_INSERT(free, hio); 534121986Sjhb continue; 535121986Sjhb } else if (hio->hio_cmd == HIO_WRITE) { 536121986Sjhb if (hast_proto_recv_data(res, res->hr_remotein, 537121986Sjhb hio->hio_nv, hio->hio_data, MAXPHYS) < 0) { 538121986Sjhb pjdlog_exit(EX_TEMPFAIL, 539121986Sjhb "Unable to receive reply data"); 540121986Sjhb } 541170340Sjhb } 542121986Sjhb pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.", 543121986Sjhb hio); 544121986Sjhb QUEUE_INSERT(disk, hio); 545122124Sjhb } 546121986Sjhb /* NOTREACHED */ 547121986Sjhb return (NULL); 548167747Sjhb} 549121986Sjhb 550121986Sjhb/* 551121986Sjhb * Thread reads from or writes to local component and also handles DELETE and 552121986Sjhb * FLUSH requests. 553121986Sjhb */ 554121986Sjhbstatic void * 555121986Sjhbdisk_thread(void *arg) 556121986Sjhb{ 557121986Sjhb struct hast_resource *res = arg; 558121986Sjhb struct hio *hio; 559121986Sjhb ssize_t ret; 560151979Sjhb bool clear_activemap; 561121986Sjhb 562121986Sjhb clear_activemap = true; 563145080Sjhb 564142256Sjhb for (;;) { 565130980Sjhb pjdlog_debug(2, "disk: Taking request."); 566121986Sjhb QUEUE_TAKE(disk, hio); 567151979Sjhb while (clear_activemap) { 568130980Sjhb unsigned char *map; 569151979Sjhb size_t mapsize; 570130980Sjhb 571121986Sjhb /* 572121986Sjhb * When first request is received, it means that primary 573121986Sjhb * already received our activemap, merged it and stored 574121986Sjhb * locally. We can now safely clear our activemap. 575130980Sjhb */ 576121986Sjhb mapsize = 577121986Sjhb activemap_calc_ondisk_size(res->hr_local_mediasize - 578121986Sjhb METADATA_SIZE, res->hr_extentsize, 579121986Sjhb res->hr_local_sectorsize); 580121986Sjhb map = calloc(1, mapsize); 581121986Sjhb if (map == NULL) { 582156124Sjhb pjdlog_warning("Unable to allocate memory to clear local activemap."); 583156124Sjhb break; 584121986Sjhb } 585156124Sjhb if (pwrite(res->hr_localfd, map, mapsize, 586121986Sjhb METADATA_SIZE) != (ssize_t)mapsize) { 587121986Sjhb pjdlog_errno(LOG_WARNING, 588121986Sjhb "Unable to store cleared activemap"); 589121986Sjhb free(map); 590121986Sjhb break; 591121986Sjhb } 592121986Sjhb free(map); 593121986Sjhb clear_activemap = false; 594121986Sjhb pjdlog_debug(1, "Local activemap cleared."); 595121986Sjhb } 596121986Sjhb reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio); 597121986Sjhb /* Handle the actual request. */ 598121986Sjhb switch (hio->hio_cmd) { 599121986Sjhb case HIO_READ: 600121986Sjhb ret = pread(res->hr_localfd, hio->hio_data, 601121986Sjhb hio->hio_length, 602151979Sjhb hio->hio_offset + res->hr_localoff); 603121986Sjhb if (ret < 0) 604121986Sjhb hio->hio_error = errno; 605121986Sjhb else if (ret != (int64_t)hio->hio_length) 606121986Sjhb hio->hio_error = EIO; 607121986Sjhb else 608121986Sjhb hio->hio_error = 0; 609121986Sjhb break; 610121986Sjhb case HIO_WRITE: 611121986Sjhb ret = pwrite(res->hr_localfd, hio->hio_data, 612121986Sjhb hio->hio_length, 613151979Sjhb hio->hio_offset + res->hr_localoff); 614121986Sjhb if (ret < 0) 615151979Sjhb hio->hio_error = errno; 616121986Sjhb else if (ret != (int64_t)hio->hio_length) 617121986Sjhb hio->hio_error = EIO; 618121986Sjhb else 619121986Sjhb hio->hio_error = 0; 620121986Sjhb break; 621121986Sjhb case HIO_DELETE: 622121986Sjhb ret = g_delete(res->hr_localfd, 623121986Sjhb hio->hio_offset + res->hr_localoff, 624121986Sjhb hio->hio_length); 625121986Sjhb if (ret < 0) 626121986Sjhb hio->hio_error = errno; 627121986Sjhb else 628121986Sjhb hio->hio_error = 0; 629151979Sjhb break; 630121986Sjhb case HIO_FLUSH: 631151979Sjhb ret = g_flush(res->hr_localfd); 632121986Sjhb if (ret < 0) 633121986Sjhb hio->hio_error = errno; 634121986Sjhb else 635121986Sjhb hio->hio_error = 0; 636121986Sjhb break; 637121986Sjhb } 638121986Sjhb if (hio->hio_error != 0) { 639130980Sjhb reqlog(LOG_ERR, 0, hio->hio_error, hio, 640130980Sjhb "Request failed: "); 641130980Sjhb } 642130980Sjhb pjdlog_debug(2, "disk: (%p) Moving request to the send queue.", 643130980Sjhb hio); 644130980Sjhb QUEUE_INSERT(send, hio); 645130980Sjhb } 646130980Sjhb /* NOTREACHED */ 647130980Sjhb return (NULL); 648151979Sjhb} 649130980Sjhb 650164358Sjhb/* 651164358Sjhb * Thread sends requests back to primary node. 652130980Sjhb */ 653130980Sjhbstatic void * 654130980Sjhbsend_thread(void *arg) 655130980Sjhb{ 656130980Sjhb struct hast_resource *res = arg; 657130980Sjhb struct nv *nvout; 658130980Sjhb struct hio *hio; 659130980Sjhb void *data; 660121986Sjhb size_t length; 661121986Sjhb 662121986Sjhb for (;;) { 663121986Sjhb pjdlog_debug(2, "send: Taking request."); 664121986Sjhb QUEUE_TAKE(send, hio); 665121986Sjhb reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio); 666121986Sjhb nvout = nv_alloc(); 667151979Sjhb /* Copy sequence number. */ 668130980Sjhb nv_add_uint64(nvout, nv_get_uint64(hio->hio_nv, "seq"), "seq"); 669151979Sjhb switch (hio->hio_cmd) { 670121986Sjhb case HIO_READ: 671130980Sjhb if (hio->hio_error == 0) { 672151979Sjhb data = hio->hio_data; 673121986Sjhb length = hio->hio_length; 674121986Sjhb break; 675121986Sjhb } 676121986Sjhb /* 677121986Sjhb * We send no data in case of an error. 678121986Sjhb */ 679121986Sjhb /* FALLTHROUGH */ 680121986Sjhb case HIO_DELETE: 681121986Sjhb case HIO_FLUSH: 682121986Sjhb case HIO_WRITE: 683121986Sjhb data = NULL; 684121986Sjhb length = 0; 685121986Sjhb break; 686121986Sjhb default: 687121986Sjhb abort(); 688121986Sjhb break; 689121986Sjhb } 690151979Sjhb if (hio->hio_error != 0) 691130980Sjhb nv_add_int16(nvout, hio->hio_error, "error"); 692151979Sjhb if (hast_proto_send(res, res->hr_remoteout, nvout, data, 693121986Sjhb length) < 0) { 694130980Sjhb pjdlog_exit(EX_TEMPFAIL, "Unable to send reply."); 695151979Sjhb } 696121986Sjhb nv_free(nvout); 697121986Sjhb pjdlog_debug(2, "send: (%p) Moving request to the free queue.", 698121986Sjhb hio); 699121986Sjhb nv_free(hio->hio_nv); 700121986Sjhb hio->hio_error = 0; 701121986Sjhb QUEUE_INSERT(free, hio); 702121986Sjhb } 703121986Sjhb /* NOTREACHED */ 704121986Sjhb return (NULL); 705121986Sjhb} 706121986Sjhb