primary.c revision 212038
11556Srgrimes/*- 21556Srgrimes * Copyright (c) 2009 The FreeBSD Foundation 31556Srgrimes * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org> 41556Srgrimes * All rights reserved. 51556Srgrimes * 61556Srgrimes * This software was developed by Pawel Jakub Dawidek under sponsorship from 71556Srgrimes * the FreeBSD Foundation. 81556Srgrimes * 91556Srgrimes * Redistribution and use in source and binary forms, with or without 101556Srgrimes * modification, are permitted provided that the following conditions 111556Srgrimes * are met: 121556Srgrimes * 1. Redistributions of source code must retain the above copyright 131556Srgrimes * notice, this list of conditions and the following disclaimer. 141556Srgrimes * 2. Redistributions in binary form must reproduce the above copyright 151556Srgrimes * notice, this list of conditions and the following disclaimer in the 161556Srgrimes * documentation and/or other materials provided with the distribution. 171556Srgrimes * 181556Srgrimes * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 191556Srgrimes * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 201556Srgrimes * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 211556Srgrimes * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 221556Srgrimes * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 231556Srgrimes * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 241556Srgrimes * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 251556Srgrimes * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 261556Srgrimes * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 271556Srgrimes * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 281556Srgrimes * SUCH DAMAGE. 291556Srgrimes */ 301556Srgrimes 311556Srgrimes#include <sys/cdefs.h> 321556Srgrimes__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 212038 2010-08-30 23:26:10Z pjd $"); 331556Srgrimes 3436150Scharnier#include <sys/types.h> 3536150Scharnier#include <sys/time.h> 3636150Scharnier#include <sys/bio.h> 371556Srgrimes#include <sys/disk.h> 3899110Sobrien#include <sys/refcount.h> 3999110Sobrien#include <sys/stat.h> 401556Srgrimes 4117987Speter#include <geom/gate/g_gate.h> 4217987Speter 4317987Speter#include <assert.h> 4417987Speter#include <err.h> 4517987Speter#include <errno.h> 4617987Speter#include <fcntl.h> 4717987Speter#include <libgeom.h> 481556Srgrimes#include <pthread.h> 491556Srgrimes#include <signal.h> 501556Srgrimes#include <stdint.h> 511556Srgrimes#include <stdio.h> 521556Srgrimes#include <string.h> 5317987Speter#include <sysexits.h> 541556Srgrimes#include <unistd.h> 551556Srgrimes 561556Srgrimes#include <activemap.h> 571556Srgrimes#include <nv.h> 581556Srgrimes#include <rangelock.h> 591556Srgrimes 601556Srgrimes#include "control.h" 611556Srgrimes#include "event.h" 621556Srgrimes#include "hast.h" 63100588Stjr#include "hast_proto.h" 641556Srgrimes#include "hastd.h" 651556Srgrimes#include "hooks.h" 661556Srgrimes#include "metadata.h" 671556Srgrimes#include "proto.h" 681556Srgrimes#include "pjdlog.h" 691556Srgrimes#include "subr.h" 701556Srgrimes#include "synch.h" 711556Srgrimes 7212043Speter/* The is only one remote component for now. */ 731556Srgrimes#define ISREMOTE(no) ((no) == 1) 741556Srgrimes 751556Srgrimesstruct hio { 761556Srgrimes /* 771556Srgrimes * Number of components we are still waiting for. 781556Srgrimes * When this field goes to 0, we can send the request back to the 791556Srgrimes * kernel. Each component has to decrease this counter by one 801556Srgrimes * even on failure. 811556Srgrimes */ 821556Srgrimes unsigned int hio_countdown; 831556Srgrimes /* 841556Srgrimes * Each component has a place to store its own error. 851556Srgrimes * Once the request is handled by all components we can decide if the 8620425Ssteve * request overall is successful or not. 8720425Ssteve */ 881556Srgrimes int *hio_errors; 891556Srgrimes /* 901556Srgrimes * Structure used to comunicate with GEOM Gate class. 911556Srgrimes */ 921556Srgrimes struct g_gate_ctl_io hio_ggio; 931556Srgrimes TAILQ_ENTRY(hio) *hio_next; 941556Srgrimes}; 951556Srgrimes#define hio_free_next hio_next[0] 96201053Sjilles#define hio_done_next hio_next[0] 9712043Speter 981556Srgrimes/* 991556Srgrimes * Free list holds unused structures. When free list is empty, we have to wait 1001556Srgrimes * until some in-progress requests are freed. 101213760Sobrien */ 1021556Srgrimesstatic TAILQ_HEAD(, hio) hio_free_list; 1031556Srgrimesstatic pthread_mutex_t hio_free_list_lock; 1041556Srgrimesstatic pthread_cond_t hio_free_list_cond; 1051556Srgrimes/* 1061556Srgrimes * There is one send list for every component. One requests is placed on all 107213811Sobrien * send lists - each component gets the same request, but each component is 108213811Sobrien * responsible for managing his own send list. 109231790Sjilles */ 1101556Srgrimesstatic TAILQ_HEAD(, hio) *hio_send_list; 1111556Srgrimesstatic pthread_mutex_t *hio_send_list_lock; 1121556Srgrimesstatic pthread_cond_t *hio_send_list_cond; 1131556Srgrimes/* 1141556Srgrimes * There is one recv list for every component, although local components don't 115201053Sjilles * use recv lists as local requests are done synchronously. 116201053Sjilles */ 1171556Srgrimesstatic TAILQ_HEAD(, hio) *hio_recv_list; 1181556Srgrimesstatic pthread_mutex_t *hio_recv_list_lock; 1191556Srgrimesstatic pthread_cond_t *hio_recv_list_cond; 1201556Srgrimes/* 1211556Srgrimes * Request is placed on done list by the slowest component (the one that 122194406Sjilles * decreased hio_countdown from 1 to 0). 123218306Sjilles */ 1241556Srgrimesstatic TAILQ_HEAD(, hio) hio_done_list; 1251556Srgrimesstatic pthread_mutex_t hio_done_list_lock; 1261556Srgrimesstatic pthread_cond_t hio_done_list_cond; 1271556Srgrimes/* 1281556Srgrimes * Structure below are for interaction with sync thread. 1291556Srgrimes */ 1301556Srgrimesstatic bool sync_inprogress; 1311556Srgrimesstatic pthread_mutex_t sync_lock; 1321556Srgrimesstatic pthread_cond_t sync_cond; 13390111Simp/* 13417987Speter * The lock below allows to synchornize access to remote connections. 13525225Ssteve */ 1361556Srgrimesstatic pthread_rwlock_t *hio_remote_lock; 1371556Srgrimes 1381556Srgrimes/* 1391556Srgrimes * Lock to synchronize metadata updates. Also synchronize access to 1401556Srgrimes * hr_primary_localcnt and hr_primary_remotecnt fields. 1411556Srgrimes */ 1421556Srgrimesstatic pthread_mutex_t metadata_lock; 1431556Srgrimes 1441556Srgrimes/* 1451556Srgrimes * Maximum number of outstanding I/O requests. 1461556Srgrimes */ 1471556Srgrimes#define HAST_HIO_MAX 256 1481556Srgrimes/* 1491556Srgrimes * Number of components. At this point there are only two components: local 1501556Srgrimes * and remote, but in the future it might be possible to use multiple local 1511556Srgrimes * and remote components. 1521556Srgrimes */ 1531556Srgrimes#define HAST_NCOMPONENTS 2 1541556Srgrimes/* 1551556Srgrimes * Number of seconds to sleep between reconnect retries or keepalive packets. 1561556Srgrimes */ 1571556Srgrimes#define RETRY_SLEEP 10 1581556Srgrimes 1591556Srgrimes#define ISCONNECTED(res, no) \ 1601556Srgrimes ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 1611556Srgrimes 16290111Simp#define QUEUE_INSERT1(hio, name, ncomp) do { \ 16320425Ssteve bool _wakeup; \ 1641556Srgrimes \ 1651556Srgrimes mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 1661556Srgrimes _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 16720425Ssteve TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 168213811Sobrien hio_next[(ncomp)]); \ 16990111Simp mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 17012043Speter if (_wakeup) \ 17112043Speter cv_signal(&hio_##name##_list_cond[(ncomp)]); \ 17220425Ssteve} while (0) 1731556Srgrimes#define QUEUE_INSERT2(hio, name) do { \ 174100588Stjr bool _wakeup; \ 175100588Stjr \ 176100588Stjr mtx_lock(&hio_##name##_list_lock); \ 177100588Stjr _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 178100588Stjr TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 179100588Stjr mtx_unlock(&hio_##name##_list_lock); \ 18012043Speter if (_wakeup) \ 18125225Ssteve cv_signal(&hio_##name##_list_cond); \ 18212043Speter} while (0) 183158143Sstefanf#define QUEUE_TAKE1(hio, name, ncomp) do { \ 184158143Sstefanf mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 18512043Speter while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL) { \ 18612043Speter cv_wait(&hio_##name##_list_cond[(ncomp)], \ 187158143Sstefanf &hio_##name##_list_lock[(ncomp)]); \ 188158143Sstefanf } \ 18912043Speter TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 19012043Speter hio_next[(ncomp)]); \ 191158143Sstefanf mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 192158143Sstefanf} while (0) 193158143Sstefanf#define QUEUE_TAKE2(hio, name) do { \ 194158143Sstefanf mtx_lock(&hio_##name##_list_lock); \ 195158143Sstefanf while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 196158143Sstefanf cv_wait(&hio_##name##_list_cond, \ 197158143Sstefanf &hio_##name##_list_lock); \ 198158143Sstefanf } \ 199158143Sstefanf TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 20012043Speter mtx_unlock(&hio_##name##_list_lock); \ 20125225Ssteve} while (0) 20225225Ssteve 20312043Speter#define SYNCREQ(hio) do { \ 20412043Speter (hio)->hio_ggio.gctl_unit = -1; \ 20512043Speter (hio)->hio_ggio.gctl_seq = 1; \ 20612043Speter} while (0) 20712043Speter#define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1) 20812043Speter#define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 20912043Speter#define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 21012043Speter 21112043Speterstatic struct hast_resource *gres; 21212043Speter 21312043Speterstatic pthread_mutex_t range_lock; 214199629Sjillesstatic struct rangelocks *range_regular; 21512043Speterstatic bool range_regular_wait; 21612043Speterstatic pthread_cond_t range_regular_cond; 21712043Speterstatic struct rangelocks *range_sync; 21812043Speterstatic bool range_sync_wait; 21912043Speterstatic pthread_cond_t range_sync_cond; 22020425Ssteve 22112043Speterstatic void *ggate_recv_thread(void *arg); 22212043Speterstatic void *local_send_thread(void *arg); 22312043Speterstatic void *remote_send_thread(void *arg); 22412043Speterstatic void *remote_recv_thread(void *arg); 2251556Srgrimesstatic void *ggate_send_thread(void *arg); 2261556Srgrimesstatic void *sync_thread(void *arg); 2271556Srgrimesstatic void *guard_thread(void *arg); 2281556Srgrimes 2291556Srgrimesstatic void 2301556Srgrimescleanup(struct hast_resource *res) 23120425Ssteve{ 23220425Ssteve int rerrno; 2331556Srgrimes 2341556Srgrimes /* Remember errno. */ 2351556Srgrimes rerrno = errno; 23690111Simp 23720425Ssteve /* 23812043Speter * Close descriptor to /dev/hast/<name> 23912043Speter * to work-around race in the kernel. 24012043Speter */ 24112043Speter close(res->hr_localfd); 2421556Srgrimes 2431556Srgrimes /* Destroy ggate provider if we created one. */ 2441556Srgrimes if (res->hr_ggateunit >= 0) { 2451556Srgrimes struct g_gate_ctl_destroy ggiod; 2461556Srgrimes 2471556Srgrimes ggiod.gctl_version = G_GATE_VERSION; 2481556Srgrimes ggiod.gctl_unit = res->hr_ggateunit; 2491556Srgrimes ggiod.gctl_force = 1; 2501556Srgrimes if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) { 2511556Srgrimes pjdlog_warning("Unable to destroy hast/%s device", 2521556Srgrimes res->hr_provname); 25312043Speter } 25412043Speter res->hr_ggateunit = -1; 25525225Ssteve } 25612043Speter 25712043Speter /* Restore errno. */ 2581556Srgrimes errno = rerrno; 2591556Srgrimes} 2601556Srgrimes 26112043Speterstatic void 26212043Speterprimary_exit(int exitcode, const char *fmt, ...) 2631556Srgrimes{ 2641556Srgrimes va_list ap; 26512043Speter 26612043Speter assert(exitcode != EX_OK); 26712043Speter va_start(ap, fmt); 26812043Speter pjdlogv_errno(LOG_ERR, fmt, ap); 26912043Speter va_end(ap); 27012043Speter cleanup(gres); 27112043Speter exit(exitcode); 27212043Speter} 2731556Srgrimes 27412043Speterstatic void 27512043Speterprimary_exitx(int exitcode, const char *fmt, ...) 27612043Speter{ 27712043Speter va_list ap; 27812043Speter 27912043Speter va_start(ap, fmt); 28012043Speter pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap); 2811556Srgrimes va_end(ap); 28212043Speter cleanup(gres); 2831556Srgrimes exit(exitcode); 28412043Speter} 28512043Speter 28612043Speterstatic int 28712043Speterhast_activemap_flush(struct hast_resource *res) 28812043Speter{ 28912043Speter const unsigned char *buf; 29012043Speter size_t size; 29112043Speter 29212043Speter buf = activemap_bitmap(res->hr_amp, &size); 29312043Speter assert(buf != NULL); 2941556Srgrimes assert((size % res->hr_local_sectorsize) == 0); 29512043Speter if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 29612043Speter (ssize_t)size) { 2971556Srgrimes KEEP_ERRNO(pjdlog_errno(LOG_ERR, 2981556Srgrimes "Unable to flush activemap to disk")); 29918018Speter return (-1); 3001556Srgrimes } 30184261Sobrien return (0); 3021556Srgrimes} 30384261Sobrien 30484261Sobrienstatic bool 3051556Srgrimesreal_remote(const struct hast_resource *res) 3061556Srgrimes{ 30718018Speter 30812043Speter return (strcmp(res->hr_remoteaddr, "none") != 0); 3091556Srgrimes} 31012043Speter 3111556Srgrimesstatic void 3121556Srgrimesinit_environment(struct hast_resource *res __unused) 31312043Speter{ 31412043Speter struct hio *hio; 31512043Speter unsigned int ii, ncomps; 3161556Srgrimes sigset_t mask; 3171556Srgrimes 3181556Srgrimes /* 3191556Srgrimes * In the future it might be per-resource value. 320194128Sjilles */ 321194128Sjilles ncomps = HAST_NCOMPONENTS; 322194128Sjilles 323194128Sjilles /* 324194128Sjilles * Allocate memory needed by lists. 325194128Sjilles */ 326194128Sjilles hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 327194128Sjilles if (hio_send_list == NULL) { 328194128Sjilles primary_exitx(EX_TEMPFAIL, 329194128Sjilles "Unable to allocate %zu bytes of memory for send lists.", 330194128Sjilles sizeof(hio_send_list[0]) * ncomps); 331194128Sjilles } 332194128Sjilles hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 333194128Sjilles if (hio_send_list_lock == NULL) { 334194128Sjilles primary_exitx(EX_TEMPFAIL, 335194128Sjilles "Unable to allocate %zu bytes of memory for send list locks.", 336194128Sjilles sizeof(hio_send_list_lock[0]) * ncomps); 3371556Srgrimes } 3381556Srgrimes hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 3391556Srgrimes if (hio_send_list_cond == NULL) { 3401556Srgrimes primary_exitx(EX_TEMPFAIL, 3411556Srgrimes "Unable to allocate %zu bytes of memory for send list condition variables.", 34290111Simp sizeof(hio_send_list_cond[0]) * ncomps); 34390111Simp } 3441556Srgrimes hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 3451556Srgrimes if (hio_recv_list == NULL) { 3461556Srgrimes primary_exitx(EX_TEMPFAIL, 3471556Srgrimes "Unable to allocate %zu bytes of memory for recv lists.", 3481556Srgrimes sizeof(hio_recv_list[0]) * ncomps); 3491556Srgrimes } 3501556Srgrimes hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 3511556Srgrimes if (hio_recv_list_lock == NULL) { 3521556Srgrimes primary_exitx(EX_TEMPFAIL, 35390111Simp "Unable to allocate %zu bytes of memory for recv list locks.", 35490111Simp sizeof(hio_recv_list_lock[0]) * ncomps); 3551556Srgrimes } 3561556Srgrimes hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 3571556Srgrimes if (hio_recv_list_cond == NULL) { 358199629Sjilles primary_exitx(EX_TEMPFAIL, 3591556Srgrimes "Unable to allocate %zu bytes of memory for recv list condition variables.", 3601556Srgrimes sizeof(hio_recv_list_cond[0]) * ncomps); 3611556Srgrimes } 3621556Srgrimes hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 3631556Srgrimes if (hio_remote_lock == NULL) { 3641556Srgrimes primary_exitx(EX_TEMPFAIL, 3651556Srgrimes "Unable to allocate %zu bytes of memory for remote connections locks.", 3661556Srgrimes sizeof(hio_remote_lock[0]) * ncomps); 36712043Speter } 3681556Srgrimes 3691556Srgrimes /* 3701556Srgrimes * Initialize lists, their locks and theirs condition variables. 3711556Srgrimes */ 3721556Srgrimes TAILQ_INIT(&hio_free_list); 3731556Srgrimes mtx_init(&hio_free_list_lock); 3741556Srgrimes cv_init(&hio_free_list_cond); 3751556Srgrimes for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 376231790Sjilles TAILQ_INIT(&hio_send_list[ii]); 37790111Simp mtx_init(&hio_send_list_lock[ii]); 3781556Srgrimes cv_init(&hio_send_list_cond[ii]); 3791556Srgrimes TAILQ_INIT(&hio_recv_list[ii]); 3801556Srgrimes mtx_init(&hio_recv_list_lock[ii]); 3811556Srgrimes cv_init(&hio_recv_list_cond[ii]); 3821556Srgrimes rw_init(&hio_remote_lock[ii]); 3831556Srgrimes } 38412043Speter TAILQ_INIT(&hio_done_list); 385199629Sjilles mtx_init(&hio_done_list_lock); 3861556Srgrimes cv_init(&hio_done_list_cond); 3871556Srgrimes mtx_init(&metadata_lock); 3881556Srgrimes 3891556Srgrimes /* 3901556Srgrimes * Allocate requests pool and initialize requests. 3911556Srgrimes */ 3921556Srgrimes for (ii = 0; ii < HAST_HIO_MAX; ii++) { 3931556Srgrimes hio = malloc(sizeof(*hio)); 3941556Srgrimes if (hio == NULL) { 3951556Srgrimes primary_exitx(EX_TEMPFAIL, 3961556Srgrimes "Unable to allocate %zu bytes of memory for hio request.", 3971556Srgrimes sizeof(*hio)); 3981556Srgrimes } 3991556Srgrimes hio->hio_countdown = 0; 400200956Sjilles hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 40117987Speter if (hio->hio_errors == NULL) { 4021556Srgrimes primary_exitx(EX_TEMPFAIL, 4031556Srgrimes "Unable allocate %zu bytes of memory for hio errors.", 4041556Srgrimes sizeof(hio->hio_errors[0]) * ncomps); 4051556Srgrimes } 4061556Srgrimes hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 407222684Sjilles if (hio->hio_next == NULL) { 4081556Srgrimes primary_exitx(EX_TEMPFAIL, 409124780Sdes "Unable allocate %zu bytes of memory for hio_next field.", 4101556Srgrimes sizeof(hio->hio_next[0]) * ncomps); 4111556Srgrimes } 4121556Srgrimes hio->hio_ggio.gctl_version = G_GATE_VERSION; 4131556Srgrimes hio->hio_ggio.gctl_data = malloc(MAXPHYS); 4141556Srgrimes if (hio->hio_ggio.gctl_data == NULL) { 4151556Srgrimes primary_exitx(EX_TEMPFAIL, 4161556Srgrimes "Unable to allocate %zu bytes of memory for gctl_data.", 4171556Srgrimes MAXPHYS); 4181556Srgrimes } 4191556Srgrimes hio->hio_ggio.gctl_length = MAXPHYS; 4201556Srgrimes hio->hio_ggio.gctl_error = 0; 4211556Srgrimes TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 4221556Srgrimes } 4231556Srgrimes 4241556Srgrimes /* 4251556Srgrimes * Turn on signals handling. 42690111Simp */ 42717987Speter PJDLOG_VERIFY(sigfillset(&mask) == 0); 42825225Ssteve PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0); 4291556Srgrimes} 4301556Srgrimes 4311556Srgrimesstatic void 4321556Srgrimesinit_local(struct hast_resource *res) 4331556Srgrimes{ 4341556Srgrimes unsigned char *buf; 4351556Srgrimes size_t mapsize; 4361556Srgrimes 4371556Srgrimes if (metadata_read(res, true) < 0) 43812043Speter exit(EX_NOINPUT); 4391556Srgrimes mtx_init(&res->hr_amp_lock); 4401556Srgrimes if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 4411556Srgrimes res->hr_local_sectorsize, res->hr_keepdirty) < 0) { 4421556Srgrimes primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 4431556Srgrimes } 4441556Srgrimes mtx_init(&range_lock); 4451556Srgrimes cv_init(&range_regular_cond); 4461556Srgrimes if (rangelock_init(&range_regular) < 0) 4471556Srgrimes primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 44890111Simp cv_init(&range_sync_cond); 44990111Simp if (rangelock_init(&range_sync) < 0) 4501556Srgrimes primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 4511556Srgrimes mapsize = activemap_ondisk_size(res->hr_amp); 4521556Srgrimes buf = calloc(1, mapsize); 4531556Srgrimes if (buf == NULL) { 45412043Speter primary_exitx(EX_TEMPFAIL, 4551556Srgrimes "Unable to allocate buffer for activemap."); 4561556Srgrimes } 4571556Srgrimes if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 4581556Srgrimes (ssize_t)mapsize) { 4591556Srgrimes primary_exit(EX_NOINPUT, "Unable to read activemap"); 4601556Srgrimes } 4611556Srgrimes activemap_copyin(res->hr_amp, buf, mapsize); 4621556Srgrimes free(buf); 4631556Srgrimes if (res->hr_resuid != 0) 4641556Srgrimes return; 4651556Srgrimes /* 4661556Srgrimes * We're using provider for the first time, so we have to generate 467213811Sobrien * resource unique identifier and initialize local and remote counts. 46890111Simp */ 46990111Simp arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 4701556Srgrimes res->hr_primary_localcnt = 1; 4711556Srgrimes res->hr_primary_remotecnt = 0; 4721556Srgrimes if (metadata_write(res) < 0) 47312043Speter exit(EX_NOINPUT); 4741556Srgrimes} 4751556Srgrimes 4761556Srgrimesstatic bool 4771556Srgrimesinit_remote(struct hast_resource *res, struct proto_conn **inp, 4781556Srgrimes struct proto_conn **outp) 4791556Srgrimes{ 4801556Srgrimes struct proto_conn *in, *out; 4811556Srgrimes struct nv *nvout, *nvin; 4821556Srgrimes const unsigned char *token; 4831556Srgrimes unsigned char *map; 4841556Srgrimes const char *errmsg; 4851556Srgrimes int32_t extentsize; 48690111Simp int64_t datasize; 48790111Simp uint32_t mapsize; 4881556Srgrimes size_t size; 4891556Srgrimes 4901556Srgrimes assert((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 4911556Srgrimes assert(real_remote(res)); 4921556Srgrimes 4931556Srgrimes in = out = NULL; 4941556Srgrimes errmsg = NULL; 4951556Srgrimes 4961556Srgrimes /* Prepare outgoing connection with remote node. */ 4971556Srgrimes if (proto_client(res->hr_remoteaddr, &out) < 0) { 4981556Srgrimes primary_exit(EX_TEMPFAIL, "Unable to create connection to %s", 4991556Srgrimes res->hr_remoteaddr); 50012043Speter } 5011556Srgrimes /* Try to connect, but accept failure. */ 5021556Srgrimes if (proto_connect(out) < 0) { 5031556Srgrimes pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 5041556Srgrimes res->hr_remoteaddr); 5051556Srgrimes goto close; 5061556Srgrimes } 5071556Srgrimes /* Error in setting timeout is not critical, but why should it fail? */ 508199647Sjilles if (proto_timeout(out, res->hr_timeout) < 0) 509199647Sjilles pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 510199647Sjilles /* 511199647Sjilles * First handshake step. 512199647Sjilles * Setup outgoing connection with remote node. 513199647Sjilles */ 514199647Sjilles nvout = nv_alloc(); 515199647Sjilles nv_add_string(nvout, res->hr_name, "resource"); 516199647Sjilles if (nv_error(nvout) != 0) { 517199647Sjilles pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 518199647Sjilles "Unable to allocate header for connection with %s", 519199647Sjilles res->hr_remoteaddr); 520199647Sjilles nv_free(nvout); 521199647Sjilles goto close; 522199647Sjilles } 523199647Sjilles if (hast_proto_send(res, out, nvout, NULL, 0) < 0) { 524199647Sjilles pjdlog_errno(LOG_WARNING, 525199647Sjilles "Unable to send handshake header to %s", 526199647Sjilles res->hr_remoteaddr); 527199647Sjilles nv_free(nvout); 528199647Sjilles goto close; 529199647Sjilles } 530199647Sjilles nv_free(nvout); 531199647Sjilles if (hast_proto_recv_hdr(out, &nvin) < 0) { 532199647Sjilles pjdlog_errno(LOG_WARNING, 533199647Sjilles "Unable to receive handshake header from %s", 5341556Srgrimes res->hr_remoteaddr); 5351556Srgrimes goto close; 5361556Srgrimes } 5371556Srgrimes errmsg = nv_get_string(nvin, "errmsg"); 53890111Simp if (errmsg != NULL) { 53990111Simp pjdlog_warning("%s", errmsg); 5401556Srgrimes nv_free(nvin); 5411556Srgrimes goto close; 5421556Srgrimes } 5431556Srgrimes token = nv_get_uint8_array(nvin, &size, "token"); 5441556Srgrimes if (token == NULL) { 5451556Srgrimes pjdlog_warning("Handshake header from %s has no 'token' field.", 5461556Srgrimes res->hr_remoteaddr); 5471556Srgrimes nv_free(nvin); 5481556Srgrimes goto close; 5491556Srgrimes } 5501556Srgrimes if (size != sizeof(res->hr_token)) { 5511556Srgrimes pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 55290111Simp res->hr_remoteaddr, size, sizeof(res->hr_token)); 55390111Simp nv_free(nvin); 5541556Srgrimes goto close; 5551556Srgrimes } 5561556Srgrimes bcopy(token, res->hr_token, sizeof(res->hr_token)); 5571556Srgrimes nv_free(nvin); 5581556Srgrimes 5591556Srgrimes /* 560 * Second handshake step. 561 * Setup incoming connection with remote node. 562 */ 563 if (proto_client(res->hr_remoteaddr, &in) < 0) { 564 pjdlog_errno(LOG_WARNING, "Unable to create connection to %s", 565 res->hr_remoteaddr); 566 } 567 /* Try to connect, but accept failure. */ 568 if (proto_connect(in) < 0) { 569 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 570 res->hr_remoteaddr); 571 goto close; 572 } 573 /* Error in setting timeout is not critical, but why should it fail? */ 574 if (proto_timeout(in, res->hr_timeout) < 0) 575 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 576 nvout = nv_alloc(); 577 nv_add_string(nvout, res->hr_name, "resource"); 578 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 579 "token"); 580 nv_add_uint64(nvout, res->hr_resuid, "resuid"); 581 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 582 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 583 if (nv_error(nvout) != 0) { 584 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 585 "Unable to allocate header for connection with %s", 586 res->hr_remoteaddr); 587 nv_free(nvout); 588 goto close; 589 } 590 if (hast_proto_send(res, in, nvout, NULL, 0) < 0) { 591 pjdlog_errno(LOG_WARNING, 592 "Unable to send handshake header to %s", 593 res->hr_remoteaddr); 594 nv_free(nvout); 595 goto close; 596 } 597 nv_free(nvout); 598 if (hast_proto_recv_hdr(out, &nvin) < 0) { 599 pjdlog_errno(LOG_WARNING, 600 "Unable to receive handshake header from %s", 601 res->hr_remoteaddr); 602 goto close; 603 } 604 errmsg = nv_get_string(nvin, "errmsg"); 605 if (errmsg != NULL) { 606 pjdlog_warning("%s", errmsg); 607 nv_free(nvin); 608 goto close; 609 } 610 datasize = nv_get_int64(nvin, "datasize"); 611 if (datasize != res->hr_datasize) { 612 pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 613 (intmax_t)res->hr_datasize, (intmax_t)datasize); 614 nv_free(nvin); 615 goto close; 616 } 617 extentsize = nv_get_int32(nvin, "extentsize"); 618 if (extentsize != res->hr_extentsize) { 619 pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 620 (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 621 nv_free(nvin); 622 goto close; 623 } 624 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 625 res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 626 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 627 map = NULL; 628 mapsize = nv_get_uint32(nvin, "mapsize"); 629 if (mapsize > 0) { 630 map = malloc(mapsize); 631 if (map == NULL) { 632 pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 633 (uintmax_t)mapsize); 634 nv_free(nvin); 635 goto close; 636 } 637 /* 638 * Remote node have some dirty extents on its own, lets 639 * download its activemap. 640 */ 641 if (hast_proto_recv_data(res, out, nvin, map, 642 mapsize) < 0) { 643 pjdlog_errno(LOG_ERR, 644 "Unable to receive remote activemap"); 645 nv_free(nvin); 646 free(map); 647 goto close; 648 } 649 /* 650 * Merge local and remote bitmaps. 651 */ 652 activemap_merge(res->hr_amp, map, mapsize); 653 free(map); 654 /* 655 * Now that we merged bitmaps from both nodes, flush it to the 656 * disk before we start to synchronize. 657 */ 658 (void)hast_activemap_flush(res); 659 } 660 pjdlog_info("Connected to %s.", res->hr_remoteaddr); 661 if (inp != NULL && outp != NULL) { 662 *inp = in; 663 *outp = out; 664 } else { 665 res->hr_remotein = in; 666 res->hr_remoteout = out; 667 } 668 event_send(res, EVENT_CONNECT); 669 return (true); 670close: 671 if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) 672 event_send(res, EVENT_SPLITBRAIN); 673 proto_close(out); 674 if (in != NULL) 675 proto_close(in); 676 return (false); 677} 678 679static void 680sync_start(void) 681{ 682 683 mtx_lock(&sync_lock); 684 sync_inprogress = true; 685 mtx_unlock(&sync_lock); 686 cv_signal(&sync_cond); 687} 688 689static void 690sync_stop(void) 691{ 692 693 mtx_lock(&sync_lock); 694 if (sync_inprogress) 695 sync_inprogress = false; 696 mtx_unlock(&sync_lock); 697} 698 699static void 700init_ggate(struct hast_resource *res) 701{ 702 struct g_gate_ctl_create ggiocreate; 703 struct g_gate_ctl_cancel ggiocancel; 704 705 /* 706 * We communicate with ggate via /dev/ggctl. Open it. 707 */ 708 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 709 if (res->hr_ggatefd < 0) 710 primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 711 /* 712 * Create provider before trying to connect, as connection failure 713 * is not critical, but may take some time. 714 */ 715 ggiocreate.gctl_version = G_GATE_VERSION; 716 ggiocreate.gctl_mediasize = res->hr_datasize; 717 ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 718 ggiocreate.gctl_flags = 0; 719 ggiocreate.gctl_maxcount = G_GATE_MAX_QUEUE_SIZE; 720 ggiocreate.gctl_timeout = 0; 721 ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 722 snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 723 res->hr_provname); 724 bzero(ggiocreate.gctl_info, sizeof(ggiocreate.gctl_info)); 725 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 726 pjdlog_info("Device hast/%s created.", res->hr_provname); 727 res->hr_ggateunit = ggiocreate.gctl_unit; 728 return; 729 } 730 if (errno != EEXIST) { 731 primary_exit(EX_OSERR, "Unable to create hast/%s device", 732 res->hr_provname); 733 } 734 pjdlog_debug(1, 735 "Device hast/%s already exists, we will try to take it over.", 736 res->hr_provname); 737 /* 738 * If we received EEXIST, we assume that the process who created the 739 * provider died and didn't clean up. In that case we will start from 740 * where he left of. 741 */ 742 ggiocancel.gctl_version = G_GATE_VERSION; 743 ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 744 snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 745 res->hr_provname); 746 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 747 pjdlog_info("Device hast/%s recovered.", res->hr_provname); 748 res->hr_ggateunit = ggiocancel.gctl_unit; 749 return; 750 } 751 primary_exit(EX_OSERR, "Unable to take over hast/%s device", 752 res->hr_provname); 753} 754 755void 756hastd_primary(struct hast_resource *res) 757{ 758 pthread_t td; 759 pid_t pid; 760 int error; 761 762 /* 763 * Create communication channel between parent and child. 764 */ 765 if (proto_client("socketpair://", &res->hr_ctrl) < 0) { 766 KEEP_ERRNO((void)pidfile_remove(pfh)); 767 pjdlog_exit(EX_OSERR, 768 "Unable to create control sockets between parent and child"); 769 } 770 /* 771 * Create communication channel between child and parent. 772 */ 773 if (proto_client("socketpair://", &res->hr_event) < 0) { 774 KEEP_ERRNO((void)pidfile_remove(pfh)); 775 pjdlog_exit(EX_OSERR, 776 "Unable to create event sockets between child and parent"); 777 } 778 779 pid = fork(); 780 if (pid < 0) { 781 KEEP_ERRNO((void)pidfile_remove(pfh)); 782 pjdlog_exit(EX_TEMPFAIL, "Unable to fork"); 783 } 784 785 if (pid > 0) { 786 /* This is parent. */ 787 /* Declare that we are receiver. */ 788 proto_recv(res->hr_event, NULL, 0); 789 res->hr_workerpid = pid; 790 return; 791 } 792 793 gres = res; 794 795 (void)pidfile_close(pfh); 796 hook_fini(); 797 798 setproctitle("%s (primary)", res->hr_name); 799 800 signal(SIGHUP, SIG_DFL); 801 signal(SIGCHLD, SIG_DFL); 802 803 /* Declare that we are sender. */ 804 proto_send(res->hr_event, NULL, 0); 805 806 init_local(res); 807 if (real_remote(res) && init_remote(res, NULL, NULL)) 808 sync_start(); 809 init_ggate(res); 810 init_environment(res); 811 error = pthread_create(&td, NULL, ggate_recv_thread, res); 812 assert(error == 0); 813 error = pthread_create(&td, NULL, local_send_thread, res); 814 assert(error == 0); 815 error = pthread_create(&td, NULL, remote_send_thread, res); 816 assert(error == 0); 817 error = pthread_create(&td, NULL, remote_recv_thread, res); 818 assert(error == 0); 819 error = pthread_create(&td, NULL, ggate_send_thread, res); 820 assert(error == 0); 821 error = pthread_create(&td, NULL, sync_thread, res); 822 assert(error == 0); 823 error = pthread_create(&td, NULL, ctrl_thread, res); 824 assert(error == 0); 825 (void)guard_thread(res); 826} 827 828static void 829reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...) 830{ 831 char msg[1024]; 832 va_list ap; 833 int len; 834 835 va_start(ap, fmt); 836 len = vsnprintf(msg, sizeof(msg), fmt, ap); 837 va_end(ap); 838 if ((size_t)len < sizeof(msg)) { 839 switch (ggio->gctl_cmd) { 840 case BIO_READ: 841 (void)snprintf(msg + len, sizeof(msg) - len, 842 "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 843 (uintmax_t)ggio->gctl_length); 844 break; 845 case BIO_DELETE: 846 (void)snprintf(msg + len, sizeof(msg) - len, 847 "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 848 (uintmax_t)ggio->gctl_length); 849 break; 850 case BIO_FLUSH: 851 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 852 break; 853 case BIO_WRITE: 854 (void)snprintf(msg + len, sizeof(msg) - len, 855 "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 856 (uintmax_t)ggio->gctl_length); 857 break; 858 default: 859 (void)snprintf(msg + len, sizeof(msg) - len, 860 "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd); 861 break; 862 } 863 } 864 pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 865} 866 867static void 868remote_close(struct hast_resource *res, int ncomp) 869{ 870 871 rw_wlock(&hio_remote_lock[ncomp]); 872 /* 873 * A race is possible between dropping rlock and acquiring wlock - 874 * another thread can close connection in-between. 875 */ 876 if (!ISCONNECTED(res, ncomp)) { 877 assert(res->hr_remotein == NULL); 878 assert(res->hr_remoteout == NULL); 879 rw_unlock(&hio_remote_lock[ncomp]); 880 return; 881 } 882 883 assert(res->hr_remotein != NULL); 884 assert(res->hr_remoteout != NULL); 885 886 pjdlog_debug(2, "Closing incoming connection to %s.", 887 res->hr_remoteaddr); 888 proto_close(res->hr_remotein); 889 res->hr_remotein = NULL; 890 pjdlog_debug(2, "Closing outgoing connection to %s.", 891 res->hr_remoteaddr); 892 proto_close(res->hr_remoteout); 893 res->hr_remoteout = NULL; 894 895 rw_unlock(&hio_remote_lock[ncomp]); 896 897 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 898 899 /* 900 * Stop synchronization if in-progress. 901 */ 902 sync_stop(); 903 904 event_send(res, EVENT_DISCONNECT); 905} 906 907/* 908 * Thread receives ggate I/O requests from the kernel and passes them to 909 * appropriate threads: 910 * WRITE - always goes to both local_send and remote_send threads 911 * READ (when the block is up-to-date on local component) - 912 * only local_send thread 913 * READ (when the block isn't up-to-date on local component) - 914 * only remote_send thread 915 * DELETE - always goes to both local_send and remote_send threads 916 * FLUSH - always goes to both local_send and remote_send threads 917 */ 918static void * 919ggate_recv_thread(void *arg) 920{ 921 struct hast_resource *res = arg; 922 struct g_gate_ctl_io *ggio; 923 struct hio *hio; 924 unsigned int ii, ncomp, ncomps; 925 int error; 926 927 ncomps = HAST_NCOMPONENTS; 928 929 for (;;) { 930 pjdlog_debug(2, "ggate_recv: Taking free request."); 931 QUEUE_TAKE2(hio, free); 932 pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 933 ggio = &hio->hio_ggio; 934 ggio->gctl_unit = res->hr_ggateunit; 935 ggio->gctl_length = MAXPHYS; 936 ggio->gctl_error = 0; 937 pjdlog_debug(2, 938 "ggate_recv: (%p) Waiting for request from the kernel.", 939 hio); 940 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) { 941 if (sigexit_received) 942 pthread_exit(NULL); 943 primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 944 } 945 error = ggio->gctl_error; 946 switch (error) { 947 case 0: 948 break; 949 case ECANCELED: 950 /* Exit gracefully. */ 951 if (!sigexit_received) { 952 pjdlog_debug(2, 953 "ggate_recv: (%p) Received cancel from the kernel.", 954 hio); 955 pjdlog_info("Received cancel from the kernel, exiting."); 956 } 957 pthread_exit(NULL); 958 case ENOMEM: 959 /* 960 * Buffer too small? Impossible, we allocate MAXPHYS 961 * bytes - request can't be bigger than that. 962 */ 963 /* FALLTHROUGH */ 964 case ENXIO: 965 default: 966 primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", 967 strerror(error)); 968 } 969 for (ii = 0; ii < ncomps; ii++) 970 hio->hio_errors[ii] = EINVAL; 971 reqlog(LOG_DEBUG, 2, ggio, 972 "ggate_recv: (%p) Request received from the kernel: ", 973 hio); 974 /* 975 * Inform all components about new write request. 976 * For read request prefer local component unless the given 977 * range is out-of-date, then use remote component. 978 */ 979 switch (ggio->gctl_cmd) { 980 case BIO_READ: 981 pjdlog_debug(2, 982 "ggate_recv: (%p) Moving request to the send queue.", 983 hio); 984 refcount_init(&hio->hio_countdown, 1); 985 mtx_lock(&metadata_lock); 986 if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || 987 res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 988 /* 989 * This range is up-to-date on local component, 990 * so handle request locally. 991 */ 992 /* Local component is 0 for now. */ 993 ncomp = 0; 994 } else /* if (res->hr_syncsrc == 995 HAST_SYNCSRC_SECONDARY) */ { 996 assert(res->hr_syncsrc == 997 HAST_SYNCSRC_SECONDARY); 998 /* 999 * This range is out-of-date on local component, 1000 * so send request to the remote node. 1001 */ 1002 /* Remote component is 1 for now. */ 1003 ncomp = 1; 1004 } 1005 mtx_unlock(&metadata_lock); 1006 QUEUE_INSERT1(hio, send, ncomp); 1007 break; 1008 case BIO_WRITE: 1009 for (;;) { 1010 mtx_lock(&range_lock); 1011 if (rangelock_islocked(range_sync, 1012 ggio->gctl_offset, ggio->gctl_length)) { 1013 pjdlog_debug(2, 1014 "regular: Range offset=%jd length=%zu locked.", 1015 (intmax_t)ggio->gctl_offset, 1016 (size_t)ggio->gctl_length); 1017 range_regular_wait = true; 1018 cv_wait(&range_regular_cond, &range_lock); 1019 range_regular_wait = false; 1020 mtx_unlock(&range_lock); 1021 continue; 1022 } 1023 if (rangelock_add(range_regular, 1024 ggio->gctl_offset, ggio->gctl_length) < 0) { 1025 mtx_unlock(&range_lock); 1026 pjdlog_debug(2, 1027 "regular: Range offset=%jd length=%zu is already locked, waiting.", 1028 (intmax_t)ggio->gctl_offset, 1029 (size_t)ggio->gctl_length); 1030 sleep(1); 1031 continue; 1032 } 1033 mtx_unlock(&range_lock); 1034 break; 1035 } 1036 mtx_lock(&res->hr_amp_lock); 1037 if (activemap_write_start(res->hr_amp, 1038 ggio->gctl_offset, ggio->gctl_length)) { 1039 (void)hast_activemap_flush(res); 1040 } 1041 mtx_unlock(&res->hr_amp_lock); 1042 /* FALLTHROUGH */ 1043 case BIO_DELETE: 1044 case BIO_FLUSH: 1045 pjdlog_debug(2, 1046 "ggate_recv: (%p) Moving request to the send queues.", 1047 hio); 1048 refcount_init(&hio->hio_countdown, ncomps); 1049 for (ii = 0; ii < ncomps; ii++) 1050 QUEUE_INSERT1(hio, send, ii); 1051 break; 1052 } 1053 } 1054 /* NOTREACHED */ 1055 return (NULL); 1056} 1057 1058/* 1059 * Thread reads from or writes to local component. 1060 * If local read fails, it redirects it to remote_send thread. 1061 */ 1062static void * 1063local_send_thread(void *arg) 1064{ 1065 struct hast_resource *res = arg; 1066 struct g_gate_ctl_io *ggio; 1067 struct hio *hio; 1068 unsigned int ncomp, rncomp; 1069 ssize_t ret; 1070 1071 /* Local component is 0 for now. */ 1072 ncomp = 0; 1073 /* Remote component is 1 for now. */ 1074 rncomp = 1; 1075 1076 for (;;) { 1077 pjdlog_debug(2, "local_send: Taking request."); 1078 QUEUE_TAKE1(hio, send, ncomp); 1079 pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1080 ggio = &hio->hio_ggio; 1081 switch (ggio->gctl_cmd) { 1082 case BIO_READ: 1083 ret = pread(res->hr_localfd, ggio->gctl_data, 1084 ggio->gctl_length, 1085 ggio->gctl_offset + res->hr_localoff); 1086 if (ret == ggio->gctl_length) 1087 hio->hio_errors[ncomp] = 0; 1088 else { 1089 /* 1090 * If READ failed, try to read from remote node. 1091 */ 1092 QUEUE_INSERT1(hio, send, rncomp); 1093 continue; 1094 } 1095 break; 1096 case BIO_WRITE: 1097 ret = pwrite(res->hr_localfd, ggio->gctl_data, 1098 ggio->gctl_length, 1099 ggio->gctl_offset + res->hr_localoff); 1100 if (ret < 0) 1101 hio->hio_errors[ncomp] = errno; 1102 else if (ret != ggio->gctl_length) 1103 hio->hio_errors[ncomp] = EIO; 1104 else 1105 hio->hio_errors[ncomp] = 0; 1106 break; 1107 case BIO_DELETE: 1108 ret = g_delete(res->hr_localfd, 1109 ggio->gctl_offset + res->hr_localoff, 1110 ggio->gctl_length); 1111 if (ret < 0) 1112 hio->hio_errors[ncomp] = errno; 1113 else 1114 hio->hio_errors[ncomp] = 0; 1115 break; 1116 case BIO_FLUSH: 1117 ret = g_flush(res->hr_localfd); 1118 if (ret < 0) 1119 hio->hio_errors[ncomp] = errno; 1120 else 1121 hio->hio_errors[ncomp] = 0; 1122 break; 1123 } 1124 if (refcount_release(&hio->hio_countdown)) { 1125 if (ISSYNCREQ(hio)) { 1126 mtx_lock(&sync_lock); 1127 SYNCREQDONE(hio); 1128 mtx_unlock(&sync_lock); 1129 cv_signal(&sync_cond); 1130 } else { 1131 pjdlog_debug(2, 1132 "local_send: (%p) Moving request to the done queue.", 1133 hio); 1134 QUEUE_INSERT2(hio, done); 1135 } 1136 } 1137 } 1138 /* NOTREACHED */ 1139 return (NULL); 1140} 1141 1142/* 1143 * Thread sends request to secondary node. 1144 */ 1145static void * 1146remote_send_thread(void *arg) 1147{ 1148 struct hast_resource *res = arg; 1149 struct g_gate_ctl_io *ggio; 1150 struct hio *hio; 1151 struct nv *nv; 1152 unsigned int ncomp; 1153 bool wakeup; 1154 uint64_t offset, length; 1155 uint8_t cmd; 1156 void *data; 1157 1158 /* Remote component is 1 for now. */ 1159 ncomp = 1; 1160 1161 for (;;) { 1162 pjdlog_debug(2, "remote_send: Taking request."); 1163 QUEUE_TAKE1(hio, send, ncomp); 1164 pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1165 ggio = &hio->hio_ggio; 1166 switch (ggio->gctl_cmd) { 1167 case BIO_READ: 1168 cmd = HIO_READ; 1169 data = NULL; 1170 offset = ggio->gctl_offset; 1171 length = ggio->gctl_length; 1172 break; 1173 case BIO_WRITE: 1174 cmd = HIO_WRITE; 1175 data = ggio->gctl_data; 1176 offset = ggio->gctl_offset; 1177 length = ggio->gctl_length; 1178 break; 1179 case BIO_DELETE: 1180 cmd = HIO_DELETE; 1181 data = NULL; 1182 offset = ggio->gctl_offset; 1183 length = ggio->gctl_length; 1184 break; 1185 case BIO_FLUSH: 1186 cmd = HIO_FLUSH; 1187 data = NULL; 1188 offset = 0; 1189 length = 0; 1190 break; 1191 default: 1192 assert(!"invalid condition"); 1193 abort(); 1194 } 1195 nv = nv_alloc(); 1196 nv_add_uint8(nv, cmd, "cmd"); 1197 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1198 nv_add_uint64(nv, offset, "offset"); 1199 nv_add_uint64(nv, length, "length"); 1200 if (nv_error(nv) != 0) { 1201 hio->hio_errors[ncomp] = nv_error(nv); 1202 pjdlog_debug(2, 1203 "remote_send: (%p) Unable to prepare header to send.", 1204 hio); 1205 reqlog(LOG_ERR, 0, ggio, 1206 "Unable to prepare header to send (%s): ", 1207 strerror(nv_error(nv))); 1208 /* Move failed request immediately to the done queue. */ 1209 goto done_queue; 1210 } 1211 pjdlog_debug(2, 1212 "remote_send: (%p) Moving request to the recv queue.", 1213 hio); 1214 /* 1215 * Protect connection from disappearing. 1216 */ 1217 rw_rlock(&hio_remote_lock[ncomp]); 1218 if (!ISCONNECTED(res, ncomp)) { 1219 rw_unlock(&hio_remote_lock[ncomp]); 1220 hio->hio_errors[ncomp] = ENOTCONN; 1221 goto done_queue; 1222 } 1223 /* 1224 * Move the request to recv queue before sending it, because 1225 * in different order we can get reply before we move request 1226 * to recv queue. 1227 */ 1228 mtx_lock(&hio_recv_list_lock[ncomp]); 1229 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1230 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1231 mtx_unlock(&hio_recv_list_lock[ncomp]); 1232 if (hast_proto_send(res, res->hr_remoteout, nv, data, 1233 data != NULL ? length : 0) < 0) { 1234 hio->hio_errors[ncomp] = errno; 1235 rw_unlock(&hio_remote_lock[ncomp]); 1236 pjdlog_debug(2, 1237 "remote_send: (%p) Unable to send request.", hio); 1238 reqlog(LOG_ERR, 0, ggio, 1239 "Unable to send request (%s): ", 1240 strerror(hio->hio_errors[ncomp])); 1241 remote_close(res, ncomp); 1242 /* 1243 * Take request back from the receive queue and move 1244 * it immediately to the done queue. 1245 */ 1246 mtx_lock(&hio_recv_list_lock[ncomp]); 1247 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1248 mtx_unlock(&hio_recv_list_lock[ncomp]); 1249 goto done_queue; 1250 } 1251 rw_unlock(&hio_remote_lock[ncomp]); 1252 nv_free(nv); 1253 if (wakeup) 1254 cv_signal(&hio_recv_list_cond[ncomp]); 1255 continue; 1256done_queue: 1257 nv_free(nv); 1258 if (ISSYNCREQ(hio)) { 1259 if (!refcount_release(&hio->hio_countdown)) 1260 continue; 1261 mtx_lock(&sync_lock); 1262 SYNCREQDONE(hio); 1263 mtx_unlock(&sync_lock); 1264 cv_signal(&sync_cond); 1265 continue; 1266 } 1267 if (ggio->gctl_cmd == BIO_WRITE) { 1268 mtx_lock(&res->hr_amp_lock); 1269 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1270 ggio->gctl_length)) { 1271 (void)hast_activemap_flush(res); 1272 } 1273 mtx_unlock(&res->hr_amp_lock); 1274 } 1275 if (!refcount_release(&hio->hio_countdown)) 1276 continue; 1277 pjdlog_debug(2, 1278 "remote_send: (%p) Moving request to the done queue.", 1279 hio); 1280 QUEUE_INSERT2(hio, done); 1281 } 1282 /* NOTREACHED */ 1283 return (NULL); 1284} 1285 1286/* 1287 * Thread receives answer from secondary node and passes it to ggate_send 1288 * thread. 1289 */ 1290static void * 1291remote_recv_thread(void *arg) 1292{ 1293 struct hast_resource *res = arg; 1294 struct g_gate_ctl_io *ggio; 1295 struct hio *hio; 1296 struct nv *nv; 1297 unsigned int ncomp; 1298 uint64_t seq; 1299 int error; 1300 1301 /* Remote component is 1 for now. */ 1302 ncomp = 1; 1303 1304 for (;;) { 1305 /* Wait until there is anything to receive. */ 1306 mtx_lock(&hio_recv_list_lock[ncomp]); 1307 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1308 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1309 cv_wait(&hio_recv_list_cond[ncomp], 1310 &hio_recv_list_lock[ncomp]); 1311 } 1312 mtx_unlock(&hio_recv_list_lock[ncomp]); 1313 rw_rlock(&hio_remote_lock[ncomp]); 1314 if (!ISCONNECTED(res, ncomp)) { 1315 rw_unlock(&hio_remote_lock[ncomp]); 1316 /* 1317 * Connection is dead, so move all pending requests to 1318 * the done queue (one-by-one). 1319 */ 1320 mtx_lock(&hio_recv_list_lock[ncomp]); 1321 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1322 assert(hio != NULL); 1323 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1324 hio_next[ncomp]); 1325 mtx_unlock(&hio_recv_list_lock[ncomp]); 1326 goto done_queue; 1327 } 1328 if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) { 1329 pjdlog_errno(LOG_ERR, 1330 "Unable to receive reply header"); 1331 rw_unlock(&hio_remote_lock[ncomp]); 1332 remote_close(res, ncomp); 1333 continue; 1334 } 1335 rw_unlock(&hio_remote_lock[ncomp]); 1336 seq = nv_get_uint64(nv, "seq"); 1337 if (seq == 0) { 1338 pjdlog_error("Header contains no 'seq' field."); 1339 nv_free(nv); 1340 continue; 1341 } 1342 mtx_lock(&hio_recv_list_lock[ncomp]); 1343 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1344 if (hio->hio_ggio.gctl_seq == seq) { 1345 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1346 hio_next[ncomp]); 1347 break; 1348 } 1349 } 1350 mtx_unlock(&hio_recv_list_lock[ncomp]); 1351 if (hio == NULL) { 1352 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1353 (uintmax_t)seq); 1354 nv_free(nv); 1355 continue; 1356 } 1357 error = nv_get_int16(nv, "error"); 1358 if (error != 0) { 1359 /* Request failed on remote side. */ 1360 hio->hio_errors[ncomp] = 0; 1361 nv_free(nv); 1362 goto done_queue; 1363 } 1364 ggio = &hio->hio_ggio; 1365 switch (ggio->gctl_cmd) { 1366 case BIO_READ: 1367 rw_rlock(&hio_remote_lock[ncomp]); 1368 if (!ISCONNECTED(res, ncomp)) { 1369 rw_unlock(&hio_remote_lock[ncomp]); 1370 nv_free(nv); 1371 goto done_queue; 1372 } 1373 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1374 ggio->gctl_data, ggio->gctl_length) < 0) { 1375 hio->hio_errors[ncomp] = errno; 1376 pjdlog_errno(LOG_ERR, 1377 "Unable to receive reply data"); 1378 rw_unlock(&hio_remote_lock[ncomp]); 1379 nv_free(nv); 1380 remote_close(res, ncomp); 1381 goto done_queue; 1382 } 1383 rw_unlock(&hio_remote_lock[ncomp]); 1384 break; 1385 case BIO_WRITE: 1386 case BIO_DELETE: 1387 case BIO_FLUSH: 1388 break; 1389 default: 1390 assert(!"invalid condition"); 1391 abort(); 1392 } 1393 hio->hio_errors[ncomp] = 0; 1394 nv_free(nv); 1395done_queue: 1396 if (refcount_release(&hio->hio_countdown)) { 1397 if (ISSYNCREQ(hio)) { 1398 mtx_lock(&sync_lock); 1399 SYNCREQDONE(hio); 1400 mtx_unlock(&sync_lock); 1401 cv_signal(&sync_cond); 1402 } else { 1403 pjdlog_debug(2, 1404 "remote_recv: (%p) Moving request to the done queue.", 1405 hio); 1406 QUEUE_INSERT2(hio, done); 1407 } 1408 } 1409 } 1410 /* NOTREACHED */ 1411 return (NULL); 1412} 1413 1414/* 1415 * Thread sends answer to the kernel. 1416 */ 1417static void * 1418ggate_send_thread(void *arg) 1419{ 1420 struct hast_resource *res = arg; 1421 struct g_gate_ctl_io *ggio; 1422 struct hio *hio; 1423 unsigned int ii, ncomp, ncomps; 1424 1425 ncomps = HAST_NCOMPONENTS; 1426 1427 for (;;) { 1428 pjdlog_debug(2, "ggate_send: Taking request."); 1429 QUEUE_TAKE2(hio, done); 1430 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1431 ggio = &hio->hio_ggio; 1432 for (ii = 0; ii < ncomps; ii++) { 1433 if (hio->hio_errors[ii] == 0) { 1434 /* 1435 * One successful request is enough to declare 1436 * success. 1437 */ 1438 ggio->gctl_error = 0; 1439 break; 1440 } 1441 } 1442 if (ii == ncomps) { 1443 /* 1444 * None of the requests were successful. 1445 * Use first error. 1446 */ 1447 ggio->gctl_error = hio->hio_errors[0]; 1448 } 1449 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1450 mtx_lock(&res->hr_amp_lock); 1451 activemap_write_complete(res->hr_amp, 1452 ggio->gctl_offset, ggio->gctl_length); 1453 mtx_unlock(&res->hr_amp_lock); 1454 } 1455 if (ggio->gctl_cmd == BIO_WRITE) { 1456 /* 1457 * Unlock range we locked. 1458 */ 1459 mtx_lock(&range_lock); 1460 rangelock_del(range_regular, ggio->gctl_offset, 1461 ggio->gctl_length); 1462 if (range_sync_wait) 1463 cv_signal(&range_sync_cond); 1464 mtx_unlock(&range_lock); 1465 /* 1466 * Bump local count if this is first write after 1467 * connection failure with remote node. 1468 */ 1469 ncomp = 1; 1470 rw_rlock(&hio_remote_lock[ncomp]); 1471 if (!ISCONNECTED(res, ncomp)) { 1472 mtx_lock(&metadata_lock); 1473 if (res->hr_primary_localcnt == 1474 res->hr_secondary_remotecnt) { 1475 res->hr_primary_localcnt++; 1476 pjdlog_debug(1, 1477 "Increasing localcnt to %ju.", 1478 (uintmax_t)res->hr_primary_localcnt); 1479 (void)metadata_write(res); 1480 } 1481 mtx_unlock(&metadata_lock); 1482 } 1483 rw_unlock(&hio_remote_lock[ncomp]); 1484 } 1485 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) 1486 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1487 pjdlog_debug(2, 1488 "ggate_send: (%p) Moving request to the free queue.", hio); 1489 QUEUE_INSERT2(hio, free); 1490 } 1491 /* NOTREACHED */ 1492 return (NULL); 1493} 1494 1495/* 1496 * Thread synchronize local and remote components. 1497 */ 1498static void * 1499sync_thread(void *arg __unused) 1500{ 1501 struct hast_resource *res = arg; 1502 struct hio *hio; 1503 struct g_gate_ctl_io *ggio; 1504 unsigned int ii, ncomp, ncomps; 1505 off_t offset, length, synced; 1506 bool dorewind; 1507 int syncext; 1508 1509 ncomps = HAST_NCOMPONENTS; 1510 dorewind = true; 1511 synced = 0; 1512 offset = -1; 1513 1514 for (;;) { 1515 mtx_lock(&sync_lock); 1516 if (offset >= 0 && !sync_inprogress) { 1517 pjdlog_info("Synchronization interrupted. " 1518 "%jd bytes synchronized so far.", 1519 (intmax_t)synced); 1520 event_send(res, EVENT_SYNCINTR); 1521 } 1522 while (!sync_inprogress) { 1523 dorewind = true; 1524 synced = 0; 1525 cv_wait(&sync_cond, &sync_lock); 1526 } 1527 mtx_unlock(&sync_lock); 1528 /* 1529 * Obtain offset at which we should synchronize. 1530 * Rewind synchronization if needed. 1531 */ 1532 mtx_lock(&res->hr_amp_lock); 1533 if (dorewind) 1534 activemap_sync_rewind(res->hr_amp); 1535 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1536 if (syncext != -1) { 1537 /* 1538 * We synchronized entire syncext extent, we can mark 1539 * it as clean now. 1540 */ 1541 if (activemap_extent_complete(res->hr_amp, syncext)) 1542 (void)hast_activemap_flush(res); 1543 } 1544 mtx_unlock(&res->hr_amp_lock); 1545 if (dorewind) { 1546 dorewind = false; 1547 if (offset < 0) 1548 pjdlog_info("Nodes are in sync."); 1549 else { 1550 pjdlog_info("Synchronization started. %ju bytes to go.", 1551 (uintmax_t)(res->hr_extentsize * 1552 activemap_ndirty(res->hr_amp))); 1553 event_send(res, EVENT_SYNCSTART); 1554 } 1555 } 1556 if (offset < 0) { 1557 sync_stop(); 1558 pjdlog_debug(1, "Nothing to synchronize."); 1559 /* 1560 * Synchronization complete, make both localcnt and 1561 * remotecnt equal. 1562 */ 1563 ncomp = 1; 1564 rw_rlock(&hio_remote_lock[ncomp]); 1565 if (ISCONNECTED(res, ncomp)) { 1566 if (synced > 0) { 1567 pjdlog_info("Synchronization complete. " 1568 "%jd bytes synchronized.", 1569 (intmax_t)synced); 1570 event_send(res, EVENT_SYNCDONE); 1571 } 1572 mtx_lock(&metadata_lock); 1573 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1574 res->hr_primary_localcnt = 1575 res->hr_secondary_localcnt; 1576 res->hr_primary_remotecnt = 1577 res->hr_secondary_remotecnt; 1578 pjdlog_debug(1, 1579 "Setting localcnt to %ju and remotecnt to %ju.", 1580 (uintmax_t)res->hr_primary_localcnt, 1581 (uintmax_t)res->hr_secondary_localcnt); 1582 (void)metadata_write(res); 1583 mtx_unlock(&metadata_lock); 1584 } 1585 rw_unlock(&hio_remote_lock[ncomp]); 1586 continue; 1587 } 1588 pjdlog_debug(2, "sync: Taking free request."); 1589 QUEUE_TAKE2(hio, free); 1590 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 1591 /* 1592 * Lock the range we are going to synchronize. We don't want 1593 * race where someone writes between our read and write. 1594 */ 1595 for (;;) { 1596 mtx_lock(&range_lock); 1597 if (rangelock_islocked(range_regular, offset, length)) { 1598 pjdlog_debug(2, 1599 "sync: Range offset=%jd length=%jd locked.", 1600 (intmax_t)offset, (intmax_t)length); 1601 range_sync_wait = true; 1602 cv_wait(&range_sync_cond, &range_lock); 1603 range_sync_wait = false; 1604 mtx_unlock(&range_lock); 1605 continue; 1606 } 1607 if (rangelock_add(range_sync, offset, length) < 0) { 1608 mtx_unlock(&range_lock); 1609 pjdlog_debug(2, 1610 "sync: Range offset=%jd length=%jd is already locked, waiting.", 1611 (intmax_t)offset, (intmax_t)length); 1612 sleep(1); 1613 continue; 1614 } 1615 mtx_unlock(&range_lock); 1616 break; 1617 } 1618 /* 1619 * First read the data from synchronization source. 1620 */ 1621 SYNCREQ(hio); 1622 ggio = &hio->hio_ggio; 1623 ggio->gctl_cmd = BIO_READ; 1624 ggio->gctl_offset = offset; 1625 ggio->gctl_length = length; 1626 ggio->gctl_error = 0; 1627 for (ii = 0; ii < ncomps; ii++) 1628 hio->hio_errors[ii] = EINVAL; 1629 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1630 hio); 1631 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1632 hio); 1633 mtx_lock(&metadata_lock); 1634 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1635 /* 1636 * This range is up-to-date on local component, 1637 * so handle request locally. 1638 */ 1639 /* Local component is 0 for now. */ 1640 ncomp = 0; 1641 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1642 assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1643 /* 1644 * This range is out-of-date on local component, 1645 * so send request to the remote node. 1646 */ 1647 /* Remote component is 1 for now. */ 1648 ncomp = 1; 1649 } 1650 mtx_unlock(&metadata_lock); 1651 refcount_init(&hio->hio_countdown, 1); 1652 QUEUE_INSERT1(hio, send, ncomp); 1653 1654 /* 1655 * Let's wait for READ to finish. 1656 */ 1657 mtx_lock(&sync_lock); 1658 while (!ISSYNCREQDONE(hio)) 1659 cv_wait(&sync_cond, &sync_lock); 1660 mtx_unlock(&sync_lock); 1661 1662 if (hio->hio_errors[ncomp] != 0) { 1663 pjdlog_error("Unable to read synchronization data: %s.", 1664 strerror(hio->hio_errors[ncomp])); 1665 goto free_queue; 1666 } 1667 1668 /* 1669 * We read the data from synchronization source, now write it 1670 * to synchronization target. 1671 */ 1672 SYNCREQ(hio); 1673 ggio->gctl_cmd = BIO_WRITE; 1674 for (ii = 0; ii < ncomps; ii++) 1675 hio->hio_errors[ii] = EINVAL; 1676 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1677 hio); 1678 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1679 hio); 1680 mtx_lock(&metadata_lock); 1681 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1682 /* 1683 * This range is up-to-date on local component, 1684 * so we update remote component. 1685 */ 1686 /* Remote component is 1 for now. */ 1687 ncomp = 1; 1688 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1689 assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1690 /* 1691 * This range is out-of-date on local component, 1692 * so we update it. 1693 */ 1694 /* Local component is 0 for now. */ 1695 ncomp = 0; 1696 } 1697 mtx_unlock(&metadata_lock); 1698 1699 pjdlog_debug(2, "sync: (%p) Moving request to the send queues.", 1700 hio); 1701 refcount_init(&hio->hio_countdown, 1); 1702 QUEUE_INSERT1(hio, send, ncomp); 1703 1704 /* 1705 * Let's wait for WRITE to finish. 1706 */ 1707 mtx_lock(&sync_lock); 1708 while (!ISSYNCREQDONE(hio)) 1709 cv_wait(&sync_cond, &sync_lock); 1710 mtx_unlock(&sync_lock); 1711 1712 if (hio->hio_errors[ncomp] != 0) { 1713 pjdlog_error("Unable to write synchronization data: %s.", 1714 strerror(hio->hio_errors[ncomp])); 1715 goto free_queue; 1716 } 1717 1718 synced += length; 1719free_queue: 1720 mtx_lock(&range_lock); 1721 rangelock_del(range_sync, offset, length); 1722 if (range_regular_wait) 1723 cv_signal(&range_regular_cond); 1724 mtx_unlock(&range_lock); 1725 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 1726 hio); 1727 QUEUE_INSERT2(hio, free); 1728 } 1729 /* NOTREACHED */ 1730 return (NULL); 1731} 1732 1733static void 1734config_reload(void) 1735{ 1736 struct hastd_config *newcfg; 1737 struct hast_resource *res; 1738 unsigned int ii, ncomps; 1739 int modified; 1740 1741 pjdlog_info("Reloading configuration..."); 1742 1743 ncomps = HAST_NCOMPONENTS; 1744 1745 newcfg = yy_config_parse(cfgpath, false); 1746 if (newcfg == NULL) 1747 goto failed; 1748 1749 TAILQ_FOREACH(res, &newcfg->hc_resources, hr_next) { 1750 if (strcmp(res->hr_name, gres->hr_name) == 0) 1751 break; 1752 } 1753 /* 1754 * If resource was removed from the configuration file, resource 1755 * name, provider name or path to local component was modified we 1756 * shouldn't be here. This means that someone modified configuration 1757 * file and send SIGHUP to us instead of main hastd process. 1758 * Log advice and ignore the signal. 1759 */ 1760 if (res == NULL || strcmp(gres->hr_name, res->hr_name) != 0 || 1761 strcmp(gres->hr_provname, res->hr_provname) != 0 || 1762 strcmp(gres->hr_localpath, res->hr_localpath) != 0) { 1763 pjdlog_warning("To reload configuration send SIGHUP to the main hastd process (pid %u).", 1764 (unsigned int)getppid()); 1765 goto failed; 1766 } 1767 1768#define MODIFIED_REMOTEADDR 0x1 1769#define MODIFIED_REPLICATION 0x2 1770#define MODIFIED_TIMEOUT 0x4 1771#define MODIFIED_EXEC 0x8 1772 modified = 0; 1773 if (strcmp(gres->hr_remoteaddr, res->hr_remoteaddr) != 0) { 1774 /* 1775 * Don't copy res->hr_remoteaddr to gres just yet. 1776 * We want remote_close() to log disconnect from the old 1777 * addresses, not from the new ones. 1778 */ 1779 modified |= MODIFIED_REMOTEADDR; 1780 } 1781 if (gres->hr_replication != res->hr_replication) { 1782 gres->hr_replication = res->hr_replication; 1783 modified |= MODIFIED_REPLICATION; 1784 } 1785 if (gres->hr_timeout != res->hr_timeout) { 1786 gres->hr_timeout = res->hr_timeout; 1787 modified |= MODIFIED_TIMEOUT; 1788 } 1789 if (strcmp(gres->hr_exec, res->hr_exec) != 0) { 1790 strlcpy(gres->hr_exec, res->hr_exec, sizeof(gres->hr_exec)); 1791 modified |= MODIFIED_EXEC; 1792 } 1793 /* 1794 * If only timeout was modified we only need to change it without 1795 * reconnecting. 1796 */ 1797 if (modified == MODIFIED_TIMEOUT) { 1798 for (ii = 0; ii < ncomps; ii++) { 1799 if (!ISREMOTE(ii)) 1800 continue; 1801 rw_rlock(&hio_remote_lock[ii]); 1802 if (!ISCONNECTED(gres, ii)) { 1803 rw_unlock(&hio_remote_lock[ii]); 1804 continue; 1805 } 1806 rw_unlock(&hio_remote_lock[ii]); 1807 if (proto_timeout(gres->hr_remotein, 1808 gres->hr_timeout) < 0) { 1809 pjdlog_errno(LOG_WARNING, 1810 "Unable to set connection timeout"); 1811 } 1812 if (proto_timeout(gres->hr_remoteout, 1813 gres->hr_timeout) < 0) { 1814 pjdlog_errno(LOG_WARNING, 1815 "Unable to set connection timeout"); 1816 } 1817 } 1818 } else if ((modified & 1819 (MODIFIED_REMOTEADDR | MODIFIED_REPLICATION)) != 0) { 1820 for (ii = 0; ii < ncomps; ii++) { 1821 if (!ISREMOTE(ii)) 1822 continue; 1823 remote_close(gres, ii); 1824 } 1825 if (modified & MODIFIED_REMOTEADDR) { 1826 strlcpy(gres->hr_remoteaddr, res->hr_remoteaddr, 1827 sizeof(gres->hr_remoteaddr)); 1828 } 1829 } 1830#undef MODIFIED_REMOTEADDR 1831#undef MODIFIED_REPLICATION 1832#undef MODIFIED_TIMEOUT 1833#undef MODIFIED_EXEC 1834 1835 pjdlog_info("Configuration reloaded successfully."); 1836 return; 1837failed: 1838 if (newcfg != NULL) { 1839 if (newcfg->hc_controlconn != NULL) 1840 proto_close(newcfg->hc_controlconn); 1841 if (newcfg->hc_listenconn != NULL) 1842 proto_close(newcfg->hc_listenconn); 1843 yy_config_free(newcfg); 1844 } 1845 pjdlog_warning("Configuration not reloaded."); 1846} 1847 1848static void 1849keepalive_send(struct hast_resource *res, unsigned int ncomp) 1850{ 1851 struct nv *nv; 1852 1853 nv = nv_alloc(); 1854 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1855 if (nv_error(nv) != 0) { 1856 nv_free(nv); 1857 pjdlog_debug(1, 1858 "keepalive_send: Unable to prepare header to send."); 1859 return; 1860 } 1861 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) < 0) { 1862 pjdlog_common(LOG_DEBUG, 1, errno, 1863 "keepalive_send: Unable to send request"); 1864 nv_free(nv); 1865 rw_unlock(&hio_remote_lock[ncomp]); 1866 remote_close(res, ncomp); 1867 rw_rlock(&hio_remote_lock[ncomp]); 1868 return; 1869 } 1870 nv_free(nv); 1871 pjdlog_debug(2, "keepalive_send: Request sent."); 1872} 1873 1874static void 1875guard_one(struct hast_resource *res, unsigned int ncomp) 1876{ 1877 struct proto_conn *in, *out; 1878 1879 if (!ISREMOTE(ncomp)) 1880 return; 1881 1882 rw_rlock(&hio_remote_lock[ncomp]); 1883 1884 if (!real_remote(res)) { 1885 rw_unlock(&hio_remote_lock[ncomp]); 1886 return; 1887 } 1888 1889 if (ISCONNECTED(res, ncomp)) { 1890 assert(res->hr_remotein != NULL); 1891 assert(res->hr_remoteout != NULL); 1892 keepalive_send(res, ncomp); 1893 } 1894 1895 if (ISCONNECTED(res, ncomp)) { 1896 assert(res->hr_remotein != NULL); 1897 assert(res->hr_remoteout != NULL); 1898 rw_unlock(&hio_remote_lock[ncomp]); 1899 pjdlog_debug(2, "remote_guard: Connection to %s is ok.", 1900 res->hr_remoteaddr); 1901 return; 1902 } 1903 1904 assert(res->hr_remotein == NULL); 1905 assert(res->hr_remoteout == NULL); 1906 /* 1907 * Upgrade the lock. It doesn't have to be atomic as no other thread 1908 * can change connection status from disconnected to connected. 1909 */ 1910 rw_unlock(&hio_remote_lock[ncomp]); 1911 pjdlog_debug(2, "remote_guard: Reconnecting to %s.", 1912 res->hr_remoteaddr); 1913 in = out = NULL; 1914 if (init_remote(res, &in, &out)) { 1915 rw_wlock(&hio_remote_lock[ncomp]); 1916 assert(res->hr_remotein == NULL); 1917 assert(res->hr_remoteout == NULL); 1918 assert(in != NULL && out != NULL); 1919 res->hr_remotein = in; 1920 res->hr_remoteout = out; 1921 rw_unlock(&hio_remote_lock[ncomp]); 1922 pjdlog_info("Successfully reconnected to %s.", 1923 res->hr_remoteaddr); 1924 sync_start(); 1925 } else { 1926 /* Both connections should be NULL. */ 1927 assert(res->hr_remotein == NULL); 1928 assert(res->hr_remoteout == NULL); 1929 assert(in == NULL && out == NULL); 1930 pjdlog_debug(2, "remote_guard: Reconnect to %s failed.", 1931 res->hr_remoteaddr); 1932 } 1933} 1934 1935/* 1936 * Thread guards remote connections and reconnects when needed, handles 1937 * signals, etc. 1938 */ 1939static void * 1940guard_thread(void *arg) 1941{ 1942 struct hast_resource *res = arg; 1943 unsigned int ii, ncomps; 1944 struct timespec timeout; 1945 time_t lastcheck, now; 1946 sigset_t mask; 1947 int signo; 1948 1949 ncomps = HAST_NCOMPONENTS; 1950 lastcheck = time(NULL); 1951 1952 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 1953 PJDLOG_VERIFY(sigaddset(&mask, SIGHUP) == 0); 1954 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 1955 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 1956 1957 timeout.tv_nsec = 0; 1958 signo = -1; 1959 1960 for (;;) { 1961 switch (signo) { 1962 case SIGHUP: 1963 config_reload(); 1964 break; 1965 case SIGINT: 1966 case SIGTERM: 1967 sigexit_received = true; 1968 primary_exitx(EX_OK, 1969 "Termination signal received, exiting."); 1970 break; 1971 default: 1972 break; 1973 } 1974 1975 pjdlog_debug(2, "remote_guard: Checking connections."); 1976 now = time(NULL); 1977 if (lastcheck + RETRY_SLEEP <= now) { 1978 for (ii = 0; ii < ncomps; ii++) 1979 guard_one(res, ii); 1980 lastcheck = now; 1981 } 1982 timeout.tv_sec = RETRY_SLEEP; 1983 signo = sigtimedwait(&mask, NULL, &timeout); 1984 } 1985 /* NOTREACHED */ 1986 return (NULL); 1987} 1988