Deleted Added
full compact
secondary.c (230092) secondary.c (246922)
1/*-
2 * Copyright (c) 2009-2010 The FreeBSD Foundation
3 * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org>
4 * All rights reserved.
5 *
6 * This software was developed by Pawel Jakub Dawidek under sponsorship from
7 * the FreeBSD Foundation.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions
11 * are met:
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in the
16 * documentation and/or other materials provided with the distribution.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28 * SUCH DAMAGE.
29 */
30
31#include <sys/cdefs.h>
1/*-
2 * Copyright (c) 2009-2010 The FreeBSD Foundation
3 * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org>
4 * All rights reserved.
5 *
6 * This software was developed by Pawel Jakub Dawidek under sponsorship from
7 * the FreeBSD Foundation.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions
11 * are met:
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in the
16 * documentation and/or other materials provided with the distribution.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28 * SUCH DAMAGE.
29 */
30
31#include <sys/cdefs.h>
32__FBSDID("$FreeBSD: head/sbin/hastd/secondary.c 230092 2012-01-13 23:25:35Z pjd $");
32__FBSDID("$FreeBSD: head/sbin/hastd/secondary.c 246922 2013-02-17 21:12:34Z pjd $");
33
34#include <sys/param.h>
35#include <sys/time.h>
36#include <sys/bio.h>
37#include <sys/disk.h>
38#include <sys/stat.h>
39
40#include <err.h>
41#include <errno.h>
42#include <fcntl.h>
43#include <libgeom.h>
44#include <pthread.h>
45#include <signal.h>
46#include <stdint.h>
47#include <stdio.h>
48#include <string.h>
49#include <sysexits.h>
50#include <unistd.h>
51
52#include <activemap.h>
53#include <nv.h>
54#include <pjdlog.h>
55
56#include "control.h"
57#include "event.h"
58#include "hast.h"
59#include "hast_proto.h"
60#include "hastd.h"
61#include "hooks.h"
62#include "metadata.h"
63#include "proto.h"
64#include "subr.h"
65#include "synch.h"
66
67struct hio {
68 uint64_t hio_seq;
69 int hio_error;
70 void *hio_data;
71 uint8_t hio_cmd;
72 uint64_t hio_offset;
73 uint64_t hio_length;
33
34#include <sys/param.h>
35#include <sys/time.h>
36#include <sys/bio.h>
37#include <sys/disk.h>
38#include <sys/stat.h>
39
40#include <err.h>
41#include <errno.h>
42#include <fcntl.h>
43#include <libgeom.h>
44#include <pthread.h>
45#include <signal.h>
46#include <stdint.h>
47#include <stdio.h>
48#include <string.h>
49#include <sysexits.h>
50#include <unistd.h>
51
52#include <activemap.h>
53#include <nv.h>
54#include <pjdlog.h>
55
56#include "control.h"
57#include "event.h"
58#include "hast.h"
59#include "hast_proto.h"
60#include "hastd.h"
61#include "hooks.h"
62#include "metadata.h"
63#include "proto.h"
64#include "subr.h"
65#include "synch.h"
66
67struct hio {
68 uint64_t hio_seq;
69 int hio_error;
70 void *hio_data;
71 uint8_t hio_cmd;
72 uint64_t hio_offset;
73 uint64_t hio_length;
74 bool hio_memsync;
74 TAILQ_ENTRY(hio) hio_next;
75};
76
77static struct hast_resource *gres;
78
79/*
80 * Free list holds unused structures. When free list is empty, we have to wait
81 * until some in-progress requests are freed.
82 */
83static TAILQ_HEAD(, hio) hio_free_list;
84static pthread_mutex_t hio_free_list_lock;
85static pthread_cond_t hio_free_list_cond;
86/*
87 * Disk thread (the one that does I/O requests) takes requests from this list.
88 */
89static TAILQ_HEAD(, hio) hio_disk_list;
90static pthread_mutex_t hio_disk_list_lock;
91static pthread_cond_t hio_disk_list_cond;
92/*
93 * Thread that sends requests back to primary takes requests from this
94 * list.
95 */
96static TAILQ_HEAD(, hio) hio_send_list;
97static pthread_mutex_t hio_send_list_lock;
98static pthread_cond_t hio_send_list_cond;
99
100/*
101 * Maximum number of outstanding I/O requests.
102 */
103#define HAST_HIO_MAX 256
104
105static void *recv_thread(void *arg);
106static void *disk_thread(void *arg);
107static void *send_thread(void *arg);
108
/*
 * QUEUE_INSERT(name, hio) - append a request to the tail of hio_<name>_list.
 *
 * The list is modified with hio_<name>_list_lock held.  The condition
 * variable hio_<name>_list_cond is signalled only when the list was empty
 * before the insertion, and the signal is sent after the lock is released.
 * 'name' is pasted into the identifiers, so it must be one of the list
 * suffixes declared in this file (free, disk, send).
 */
109#define QUEUE_INSERT(name, hio) do { \
110 bool _wakeup; \
111 \
112 mtx_lock(&hio_##name##_list_lock); \
113 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \
114 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_next); \
115 mtx_unlock(&hio_##name##_list_lock); \
116 if (_wakeup) \
117 cv_signal(&hio_##name##_list_cond); \
118} while (0)
/*
 * QUEUE_TAKE(name, hio) - remove the first request from hio_<name>_list and
 * store it in 'hio'.
 *
 * While the list is empty the caller waits on hio_<name>_list_cond with
 * hio_<name>_list_lock held; the emptiness test is repeated after every
 * wake-up.  'hio' is assigned to, so it must be an lvalue.
 */
119#define QUEUE_TAKE(name, hio) do { \
120 mtx_lock(&hio_##name##_list_lock); \
121 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \
122 cv_wait(&hio_##name##_list_cond, \
123 &hio_##name##_list_lock); \
124 } \
125 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_next); \
126 mtx_unlock(&hio_##name##_list_lock); \
127} while (0)
128
129static void
130hio_clear(struct hio *hio)
131{
132
133 hio->hio_seq = 0;
134 hio->hio_error = 0;
135 hio->hio_cmd = HIO_UNDEF;
136 hio->hio_offset = 0;
137 hio->hio_length = 0;
75 TAILQ_ENTRY(hio) hio_next;
76};
77
78static struct hast_resource *gres;
79
80/*
81 * Free list holds unused structures. When free list is empty, we have to wait
82 * until some in-progress requests are freed.
83 */
84static TAILQ_HEAD(, hio) hio_free_list;
85static pthread_mutex_t hio_free_list_lock;
86static pthread_cond_t hio_free_list_cond;
87/*
88 * Disk thread (the one that does I/O requests) takes requests from this list.
89 */
90static TAILQ_HEAD(, hio) hio_disk_list;
91static pthread_mutex_t hio_disk_list_lock;
92static pthread_cond_t hio_disk_list_cond;
93/*
94 * Thread that sends requests back to primary takes requests from this
95 * list.
96 */
97static TAILQ_HEAD(, hio) hio_send_list;
98static pthread_mutex_t hio_send_list_lock;
99static pthread_cond_t hio_send_list_cond;
100
101/*
102 * Maximum number of outstanding I/O requests.
103 */
104#define HAST_HIO_MAX 256
105
106static void *recv_thread(void *arg);
107static void *disk_thread(void *arg);
108static void *send_thread(void *arg);
109
/*
 * QUEUE_INSERT(name, hio) - append a request to the tail of hio_<name>_list.
 *
 * The list is modified with hio_<name>_list_lock held.  The condition
 * variable hio_<name>_list_cond is signalled only when the list was empty
 * before the insertion, and the signal is sent after the lock is released.
 * 'name' is pasted into the identifiers, so it must be one of the list
 * suffixes declared in this file (free, disk, send).
 */
110#define QUEUE_INSERT(name, hio) do { \
111 bool _wakeup; \
112 \
113 mtx_lock(&hio_##name##_list_lock); \
114 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \
115 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_next); \
116 mtx_unlock(&hio_##name##_list_lock); \
117 if (_wakeup) \
118 cv_signal(&hio_##name##_list_cond); \
119} while (0)
/*
 * QUEUE_TAKE(name, hio) - remove the first request from hio_<name>_list and
 * store it in 'hio'.
 *
 * While the list is empty the caller waits on hio_<name>_list_cond with
 * hio_<name>_list_lock held; the emptiness test is repeated after every
 * wake-up.  'hio' is assigned to, so it must be an lvalue.
 */
120#define QUEUE_TAKE(name, hio) do { \
121 mtx_lock(&hio_##name##_list_lock); \
122 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \
123 cv_wait(&hio_##name##_list_cond, \
124 &hio_##name##_list_lock); \
125 } \
126 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_next); \
127 mtx_unlock(&hio_##name##_list_lock); \
128} while (0)
129
130static void
131hio_clear(struct hio *hio)
132{
133
134 hio->hio_seq = 0;
135 hio->hio_error = 0;
136 hio->hio_cmd = HIO_UNDEF;
137 hio->hio_offset = 0;
138 hio->hio_length = 0;
139 hio->hio_memsync = false;
138}
139
140static void
140}
141
142static void
143hio_copy(const struct hio *srchio, struct hio *dsthio)
144{
145
146 /*
147 * We don't copy hio_error, hio_data and hio_next fields.
148 */
149
150 dsthio->hio_seq = srchio->hio_seq;
151 dsthio->hio_cmd = srchio->hio_cmd;
152 dsthio->hio_offset = srchio->hio_offset;
153 dsthio->hio_length = srchio->hio_length;
154 dsthio->hio_memsync = srchio->hio_memsync;
155}
156
157static void
141init_environment(void)
142{
143 struct hio *hio;
144 unsigned int ii;
145
146 /*
147 * Initialize lists, their locks and theirs condition variables.
148 */
149 TAILQ_INIT(&hio_free_list);
150 mtx_init(&hio_free_list_lock);
151 cv_init(&hio_free_list_cond);
152 TAILQ_INIT(&hio_disk_list);
153 mtx_init(&hio_disk_list_lock);
154 cv_init(&hio_disk_list_cond);
155 TAILQ_INIT(&hio_send_list);
156 mtx_init(&hio_send_list_lock);
157 cv_init(&hio_send_list_cond);
158
159 /*
160 * Allocate requests pool and initialize requests.
161 */
162 for (ii = 0; ii < HAST_HIO_MAX; ii++) {
163 hio = malloc(sizeof(*hio));
164 if (hio == NULL) {
165 pjdlog_exitx(EX_TEMPFAIL,
166 "Unable to allocate memory (%zu bytes) for hio request.",
167 sizeof(*hio));
168 }
169 hio->hio_data = malloc(MAXPHYS);
170 if (hio->hio_data == NULL) {
171 pjdlog_exitx(EX_TEMPFAIL,
172 "Unable to allocate memory (%zu bytes) for gctl_data.",
173 (size_t)MAXPHYS);
174 }
175 hio_clear(hio);
176 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next);
177 }
178}
179
/*
 * Load the local metadata for the resource.
 * The process exits with EX_NOINPUT when metadata_read() reports failure.
 */
static void
init_local(struct hast_resource *res)
{
	int rv;

	rv = metadata_read(res, true);
	if (rv == -1)
		exit(EX_NOINPUT);
}
187
188static void
189init_remote(struct hast_resource *res, struct nv *nvin)
190{
191 uint64_t resuid;
192 struct nv *nvout;
193 unsigned char *map;
194 size_t mapsize;
195
196#ifdef notyet
197 /* Setup direction. */
198 if (proto_send(res->hr_remoteout, NULL, 0) == -1)
199 pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
200#endif
201
202 nvout = nv_alloc();
203 nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize");
204 nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize");
205 resuid = nv_get_uint64(nvin, "resuid");
206 res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt");
207 res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt");
208 nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt");
209 nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt");
210 mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
211 METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize);
212 map = malloc(mapsize);
213 if (map == NULL) {
214 pjdlog_exitx(EX_TEMPFAIL,
215 "Unable to allocate memory (%zu bytes) for activemap.",
216 mapsize);
217 }
218 /*
219 * When we work as primary and secondary is missing we will increase
220 * localcnt in our metadata. When secondary is connected and synced
221 * we make localcnt be equal to remotecnt, which means nodes are more
222 * or less in sync.
223 * Split-brain condition is when both nodes are not able to communicate
224 * and are both configured as primary nodes. In turn, they can both
225 * make incompatible changes to the data and we have to detect that.
226 * Under split-brain condition we will increase our localcnt on first
227 * write and remote node will increase its localcnt on first write.
228 * When we connect we can see that primary's localcnt is greater than
229 * our remotecnt (primary was modified while we weren't watching) and
230 * our localcnt is greater than primary's remotecnt (we were modified
231 * while primary wasn't watching).
232 * There are many possible combinations which are all gathered below.
233 * Don't pay too much attention to exact numbers, the more important
234 * is to compare them. We compare secondary's local with primary's
235 * remote and secondary's remote with primary's local.
236 * Note that every case where primary's localcnt is smaller than
237 * secondary's remotecnt and where secondary's localcnt is smaller than
238 * primary's remotecnt should be impossible in practise. We will perform
239 * full synchronization then. Those cases are marked with an asterisk.
240 * Regular synchronization means that only extents marked as dirty are
241 * synchronized (regular synchronization).
242 *
243 * SECONDARY METADATA PRIMARY METADATA
244 * local=3 remote=3 local=2 remote=2* ?! Full sync from secondary.
245 * local=3 remote=3 local=2 remote=3* ?! Full sync from primary.
246 * local=3 remote=3 local=2 remote=4* ?! Full sync from primary.
247 * local=3 remote=3 local=3 remote=2 Primary is out-of-date,
248 * regular sync from secondary.
249 * local=3 remote=3 local=3 remote=3 Regular sync just in case.
250 * local=3 remote=3 local=3 remote=4* ?! Full sync from primary.
251 * local=3 remote=3 local=4 remote=2 Split-brain condition.
252 * local=3 remote=3 local=4 remote=3 Secondary out-of-date,
253 * regular sync from primary.
254 * local=3 remote=3 local=4 remote=4* ?! Full sync from primary.
255 */
256 if (res->hr_resuid == 0) {
257 /*
258 * Provider is used for the first time. If primary node done no
259 * writes yet as well (we will find "virgin" argument) then
260 * there is no need to synchronize anything. If primary node
261 * done any writes already we have to synchronize everything.
262 */
263 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0);
264 res->hr_resuid = resuid;
265 if (metadata_write(res) == -1)
266 exit(EX_NOINPUT);
267 if (nv_exists(nvin, "virgin")) {
268 free(map);
269 map = NULL;
270 mapsize = 0;
271 } else {
272 memset(map, 0xff, mapsize);
273 }
274 nv_add_int8(nvout, 1, "virgin");
275 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
276 } else if (res->hr_resuid != resuid) {
277 char errmsg[256];
278
279 free(map);
280 (void)snprintf(errmsg, sizeof(errmsg),
281 "Resource unique ID mismatch (primary=%ju, secondary=%ju).",
282 (uintmax_t)resuid, (uintmax_t)res->hr_resuid);
283 pjdlog_error("%s", errmsg);
284 nv_add_string(nvout, errmsg, "errmsg");
285 if (hast_proto_send(res, res->hr_remotein, nvout,
286 NULL, 0) == -1) {
287 pjdlog_exit(EX_TEMPFAIL,
288 "Unable to send response to %s",
289 res->hr_remoteaddr);
290 }
291 nv_free(nvout);
292 exit(EX_CONFIG);
293 } else if (
294 /* Is primary out-of-date? */
295 (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
296 res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
297 /* Are the nodes more or less in sync? */
298 (res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
299 res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
300 /* Is secondary out-of-date? */
301 (res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
302 res->hr_secondary_remotecnt < res->hr_primary_localcnt)) {
303 /*
304 * Nodes are more or less in sync or one of the nodes is
305 * out-of-date.
306 * It doesn't matter at this point which one, we just have to
307 * send out local bitmap to the remote node.
308 */
309 if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) !=
310 (ssize_t)mapsize) {
311 pjdlog_exit(LOG_ERR, "Unable to read activemap");
312 }
313 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
314 res->hr_secondary_remotecnt == res->hr_primary_localcnt) {
315 /* Primary is out-of-date, sync from secondary. */
316 nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
317 } else {
318 /*
319 * Secondary is out-of-date or counts match.
320 * Sync from primary.
321 */
322 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
323 }
324 } else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
325 res->hr_primary_localcnt > res->hr_secondary_remotecnt) {
326 /*
327 * Not good, we have split-brain condition.
328 */
329 free(map);
330 pjdlog_error("Split-brain detected, exiting.");
331 nv_add_string(nvout, "Split-brain condition!", "errmsg");
332 if (hast_proto_send(res, res->hr_remotein, nvout,
333 NULL, 0) == -1) {
334 pjdlog_exit(EX_TEMPFAIL,
335 "Unable to send response to %s",
336 res->hr_remoteaddr);
337 }
338 nv_free(nvout);
339 /* Exit on split-brain. */
340 event_send(res, EVENT_SPLITBRAIN);
341 exit(EX_CONFIG);
342 } else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
343 res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ {
344 /*
345 * This should never happen in practise, but we will perform
346 * full synchronization.
347 */
348 PJDLOG_ASSERT(res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
349 res->hr_primary_localcnt < res->hr_secondary_remotecnt);
350 mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
351 METADATA_SIZE, res->hr_extentsize,
352 res->hr_local_sectorsize);
353 memset(map, 0xff, mapsize);
354 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) {
355 /* In this one of five cases sync from secondary. */
356 nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
357 } else {
358 /* For the rest four cases sync from primary. */
359 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
360 }
361 pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).",
362 (uintmax_t)res->hr_primary_localcnt,
363 (uintmax_t)res->hr_primary_remotecnt,
364 (uintmax_t)res->hr_secondary_localcnt,
365 (uintmax_t)res->hr_secondary_remotecnt);
366 }
367 nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize");
368 if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) == -1) {
369 pjdlog_exit(EX_TEMPFAIL, "Unable to send activemap to %s",
370 res->hr_remoteaddr);
371 }
372 if (map != NULL)
373 free(map);
374 nv_free(nvout);
375#ifdef notyet
376 /* Setup direction. */
377 if (proto_recv(res->hr_remotein, NULL, 0) == -1)
378 pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
379#endif
380}
381
/*
 * Entry point for serving a resource in the secondary role.
 *
 * Creates the control and event socketpairs and forks.  If either socketpair
 * or the fork fails, the pidfile is removed and the process exits with
 * EX_OSERR.
 *
 * The parent closes its copies of the remote connections, records the
 * worker's pid in res->hr_workerpid and returns.
 *
 * The child becomes the worker: it re-initializes logging, loads local
 * metadata, allocates the request pool, drops privileges, starts the control
 * thread, performs the handshake with the primary (init_remote()), starts
 * the recv and disk threads and finally calls send_thread() itself.
 *
 * res  - resource to serve; hr_ctrl, hr_event, hr_remotein, hr_remoteout
 *        and hr_workerpid are modified here.
 * nvin - nv list handed on to init_remote(), which reads the primary's
 *        resuid and counters from it.
 */
382void
383hastd_secondary(struct hast_resource *res, struct nv *nvin)
384{
385 sigset_t mask;
386 pthread_t td;
387 pid_t pid;
388 int error, mode, debuglevel;
389
390 /*
391 * Create communication channel between parent and child.
392 */
393 if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) {
394 KEEP_ERRNO((void)pidfile_remove(pfh));
395 pjdlog_exit(EX_OSERR,
396 "Unable to create control sockets between parent and child");
397 }
398 /*
399 * Create communication channel between child and parent.
400 */
401 if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) {
402 KEEP_ERRNO((void)pidfile_remove(pfh));
403 pjdlog_exit(EX_OSERR,
404 "Unable to create event sockets between child and parent");
405 }
406
407 pid = fork();
408 if (pid == -1) {
409 KEEP_ERRNO((void)pidfile_remove(pfh));
410 pjdlog_exit(EX_OSERR, "Unable to fork");
411 }
412
413 if (pid > 0) {
414 /* This is parent. */
/* The remote connections are used by the worker only. */
415 proto_close(res->hr_remotein);
416 res->hr_remotein = NULL;
417 proto_close(res->hr_remoteout);
418 res->hr_remoteout = NULL;
419 /* Declare that we are receiver. */
420 proto_recv(res->hr_event, NULL, 0);
421 /* Declare that we are sender. */
422 proto_send(res->hr_ctrl, NULL, 0);
423 res->hr_workerpid = pid;
424 return;
425 }
426
/* Everything below runs in the child (worker) process only. */
427 gres = res;
/* Remember the logging settings; they are re-applied after pjdlog_init(). */
428 mode = pjdlog_mode_get();
429 debuglevel = pjdlog_debug_get();
430
431 /* Declare that we are sender. */
432 proto_send(res->hr_event, NULL, 0);
433 /* Declare that we are receiver. */
434 proto_recv(res->hr_ctrl, NULL, 0);
435 descriptors_cleanup(res);
436
437 descriptors_assert(res, mode);
438
439 pjdlog_init(mode);
440 pjdlog_debug_set(debuglevel);
441 pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
442 setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role));
443
/* Install an empty signal mask for the worker. */
444 PJDLOG_VERIFY(sigemptyset(&mask) == 0);
445 PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
446
447 /* Error in setting timeout is not critical, but why should it fail? */
448 if (proto_timeout(res->hr_remotein, 2 * HAST_KEEPALIVE) == -1)
449 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
450 if (proto_timeout(res->hr_remoteout, res->hr_timeout) == -1)
451 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
452
/* Local metadata and the request pool are set up before privileges are dropped. */
453 init_local(res);
454 init_environment();
455
456 if (drop_privs(res) != 0)
457 exit(EX_CONFIG);
458 pjdlog_info("Privileges successfully dropped.");
459
460 /*
461 * Create the control thread before sending any event to the parent,
462 * as we can deadlock when parent sends control request to worker,
463 * but worker has no control thread started yet, so parent waits.
464 * In the meantime worker sends an event to the parent, but parent
465 * is unable to handle the event, because it waits for control
466 * request response.
467 */
468 error = pthread_create(&td, NULL, ctrl_thread, res);
469 PJDLOG_ASSERT(error == 0);
470
471 init_remote(res, nvin);
472 event_send(res, EVENT_CONNECT);
473
/* 'td' is overwritten by each pthread_create(); the threads are not joined here. */
474 error = pthread_create(&td, NULL, recv_thread, res);
475 PJDLOG_ASSERT(error == 0);
476 error = pthread_create(&td, NULL, disk_thread, res);
477 PJDLOG_ASSERT(error == 0);
/* send_thread() is called directly on this thread rather than via pthread_create(). */
478 (void)send_thread(res);
479}
480
481static void
482reqlog(int loglevel, int debuglevel, int error, struct hio *hio,
483 const char *fmt, ...)
484{
485 char msg[1024];
486 va_list ap;
487 int len;
488
489 va_start(ap, fmt);
490 len = vsnprintf(msg, sizeof(msg), fmt, ap);
491 va_end(ap);
492 if ((size_t)len < sizeof(msg)) {
493 switch (hio->hio_cmd) {
494 case HIO_READ:
495 (void)snprintf(msg + len, sizeof(msg) - len,
496 "READ(%ju, %ju).", (uintmax_t)hio->hio_offset,
497 (uintmax_t)hio->hio_length);
498 break;
499 case HIO_DELETE:
500 (void)snprintf(msg + len, sizeof(msg) - len,
501 "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset,
502 (uintmax_t)hio->hio_length);
503 break;
504 case HIO_FLUSH:
505 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
506 break;
507 case HIO_WRITE:
508 (void)snprintf(msg + len, sizeof(msg) - len,
509 "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset,
510 (uintmax_t)hio->hio_length);
511 break;
512 case HIO_KEEPALIVE:
513 (void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE.");
514 break;
515 default:
516 (void)snprintf(msg + len, sizeof(msg) - len,
517 "UNKNOWN(%u).", (unsigned int)hio->hio_cmd);
518 break;
519 }
520 }
521 pjdlog_common(loglevel, debuglevel, error, "%s", msg);
522}
523
524static int
525requnpack(struct hast_resource *res, struct hio *hio, struct nv *nv)
526{
527
528 hio->hio_cmd = nv_get_uint8(nv, "cmd");
529 if (hio->hio_cmd == 0) {
530 pjdlog_error("Header contains no 'cmd' field.");
531 hio->hio_error = EINVAL;
532 goto end;
533 }
534 if (hio->hio_cmd != HIO_KEEPALIVE) {
535 hio->hio_seq = nv_get_uint64(nv, "seq");
536 if (hio->hio_seq == 0) {
537 pjdlog_error("Header contains no 'seq' field.");
538 hio->hio_error = EINVAL;
539 goto end;
540 }
541 }
542 switch (hio->hio_cmd) {
543 case HIO_FLUSH:
544 case HIO_KEEPALIVE:
545 break;
158init_environment(void)
159{
/*
 * Prepare the worker's request queues: initialize the free, disk and send
 * lists with their locks and condition variables, then fill the free list
 * with HAST_HIO_MAX pre-allocated requests.
 */
160 struct hio *hio;
161 unsigned int ii;
162
163 /*
164 * Initialize lists, their locks and their condition variables.
165 */
166 TAILQ_INIT(&hio_free_list);
167 mtx_init(&hio_free_list_lock);
168 cv_init(&hio_free_list_cond);
169 TAILQ_INIT(&hio_disk_list);
170 mtx_init(&hio_disk_list_lock);
171 cv_init(&hio_disk_list_cond);
172 TAILQ_INIT(&hio_send_list);
173 mtx_init(&hio_send_list_lock);
174 cv_init(&hio_send_list_cond);
175
176 /*
177 * Allocate requests pool and initialize requests.
178 */
179 for (ii = 0; ii < HAST_HIO_MAX; ii++) {
180 hio = malloc(sizeof(*hio));
181 if (hio == NULL) {
182 pjdlog_exitx(EX_TEMPFAIL,
183 "Unable to allocate memory (%zu bytes) for hio request.",
184 sizeof(*hio));
185 }
/* Every request owns a MAXPHYS-sized data buffer. */
186 hio->hio_data = malloc(MAXPHYS);
187 if (hio->hio_data == NULL) {
188 pjdlog_exitx(EX_TEMPFAIL,
189 "Unable to allocate memory (%zu bytes) for gctl_data.",
190 (size_t)MAXPHYS);
191 }
/* hio_clear() does not touch hio_data, so the buffer assigned above is kept. */
192 hio_clear(hio);
193 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next);
194 }
195}
196
/*
 * Load the local metadata for the resource.
 * The process exits with EX_NOINPUT when metadata_read() reports failure.
 */
static void
init_local(struct hast_resource *res)
{
	int rv;

	rv = metadata_read(res, true);
	if (rv == -1)
		exit(EX_NOINPUT);
}
204
205static void
206init_remote(struct hast_resource *res, struct nv *nvin)
207{
208 uint64_t resuid;
209 struct nv *nvout;
210 unsigned char *map;
211 size_t mapsize;
212
213#ifdef notyet
214 /* Setup direction. */
215 if (proto_send(res->hr_remoteout, NULL, 0) == -1)
216 pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
217#endif
218
219 nvout = nv_alloc();
220 nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize");
221 nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize");
222 resuid = nv_get_uint64(nvin, "resuid");
223 res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt");
224 res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt");
225 nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt");
226 nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt");
227 mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
228 METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize);
229 map = malloc(mapsize);
230 if (map == NULL) {
231 pjdlog_exitx(EX_TEMPFAIL,
232 "Unable to allocate memory (%zu bytes) for activemap.",
233 mapsize);
234 }
235 /*
236 * When we work as primary and secondary is missing we will increase
237 * localcnt in our metadata. When secondary is connected and synced
238 * we make localcnt be equal to remotecnt, which means nodes are more
239 * or less in sync.
240 * Split-brain condition is when both nodes are not able to communicate
241 * and are both configured as primary nodes. In turn, they can both
242 * make incompatible changes to the data and we have to detect that.
243 * Under split-brain condition we will increase our localcnt on first
244 * write and remote node will increase its localcnt on first write.
245 * When we connect we can see that primary's localcnt is greater than
246 * our remotecnt (primary was modified while we weren't watching) and
247 * our localcnt is greater than primary's remotecnt (we were modified
248 * while primary wasn't watching).
249 * There are many possible combinations which are all gathered below.
250 * Don't pay too much attention to exact numbers, the more important
251 * is to compare them. We compare secondary's local with primary's
252 * remote and secondary's remote with primary's local.
253 * Note that every case where primary's localcnt is smaller than
254 * secondary's remotecnt and where secondary's localcnt is smaller than
255 * primary's remotecnt should be impossible in practise. We will perform
256 * full synchronization then. Those cases are marked with an asterisk.
257 * Regular synchronization means that only extents marked as dirty are
258 * synchronized (regular synchronization).
259 *
260 * SECONDARY METADATA PRIMARY METADATA
261 * local=3 remote=3 local=2 remote=2* ?! Full sync from secondary.
262 * local=3 remote=3 local=2 remote=3* ?! Full sync from primary.
263 * local=3 remote=3 local=2 remote=4* ?! Full sync from primary.
264 * local=3 remote=3 local=3 remote=2 Primary is out-of-date,
265 * regular sync from secondary.
266 * local=3 remote=3 local=3 remote=3 Regular sync just in case.
267 * local=3 remote=3 local=3 remote=4* ?! Full sync from primary.
268 * local=3 remote=3 local=4 remote=2 Split-brain condition.
269 * local=3 remote=3 local=4 remote=3 Secondary out-of-date,
270 * regular sync from primary.
271 * local=3 remote=3 local=4 remote=4* ?! Full sync from primary.
272 */
273 if (res->hr_resuid == 0) {
274 /*
275 * Provider is used for the first time. If primary node done no
276 * writes yet as well (we will find "virgin" argument) then
277 * there is no need to synchronize anything. If primary node
278 * done any writes already we have to synchronize everything.
279 */
280 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0);
281 res->hr_resuid = resuid;
282 if (metadata_write(res) == -1)
283 exit(EX_NOINPUT);
284 if (nv_exists(nvin, "virgin")) {
285 free(map);
286 map = NULL;
287 mapsize = 0;
288 } else {
289 memset(map, 0xff, mapsize);
290 }
291 nv_add_int8(nvout, 1, "virgin");
292 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
293 } else if (res->hr_resuid != resuid) {
294 char errmsg[256];
295
296 free(map);
297 (void)snprintf(errmsg, sizeof(errmsg),
298 "Resource unique ID mismatch (primary=%ju, secondary=%ju).",
299 (uintmax_t)resuid, (uintmax_t)res->hr_resuid);
300 pjdlog_error("%s", errmsg);
301 nv_add_string(nvout, errmsg, "errmsg");
302 if (hast_proto_send(res, res->hr_remotein, nvout,
303 NULL, 0) == -1) {
304 pjdlog_exit(EX_TEMPFAIL,
305 "Unable to send response to %s",
306 res->hr_remoteaddr);
307 }
308 nv_free(nvout);
309 exit(EX_CONFIG);
310 } else if (
311 /* Is primary out-of-date? */
312 (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
313 res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
314 /* Are the nodes more or less in sync? */
315 (res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
316 res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
317 /* Is secondary out-of-date? */
318 (res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
319 res->hr_secondary_remotecnt < res->hr_primary_localcnt)) {
320 /*
321 * Nodes are more or less in sync or one of the nodes is
322 * out-of-date.
323 * It doesn't matter at this point which one, we just have to
324 * send out local bitmap to the remote node.
325 */
326 if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) !=
327 (ssize_t)mapsize) {
328 pjdlog_exit(LOG_ERR, "Unable to read activemap");
329 }
330 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
331 res->hr_secondary_remotecnt == res->hr_primary_localcnt) {
332 /* Primary is out-of-date, sync from secondary. */
333 nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
334 } else {
335 /*
336 * Secondary is out-of-date or counts match.
337 * Sync from primary.
338 */
339 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
340 }
341 } else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
342 res->hr_primary_localcnt > res->hr_secondary_remotecnt) {
343 /*
344 * Not good, we have split-brain condition.
345 */
346 free(map);
347 pjdlog_error("Split-brain detected, exiting.");
348 nv_add_string(nvout, "Split-brain condition!", "errmsg");
349 if (hast_proto_send(res, res->hr_remotein, nvout,
350 NULL, 0) == -1) {
351 pjdlog_exit(EX_TEMPFAIL,
352 "Unable to send response to %s",
353 res->hr_remoteaddr);
354 }
355 nv_free(nvout);
356 /* Exit on split-brain. */
357 event_send(res, EVENT_SPLITBRAIN);
358 exit(EX_CONFIG);
359 } else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
360 res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ {
361 /*
362 * This should never happen in practise, but we will perform
363 * full synchronization.
364 */
365 PJDLOG_ASSERT(res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
366 res->hr_primary_localcnt < res->hr_secondary_remotecnt);
367 mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
368 METADATA_SIZE, res->hr_extentsize,
369 res->hr_local_sectorsize);
370 memset(map, 0xff, mapsize);
371 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) {
372 /* In this one of five cases sync from secondary. */
373 nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
374 } else {
375 /* For the rest four cases sync from primary. */
376 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
377 }
378 pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).",
379 (uintmax_t)res->hr_primary_localcnt,
380 (uintmax_t)res->hr_primary_remotecnt,
381 (uintmax_t)res->hr_secondary_localcnt,
382 (uintmax_t)res->hr_secondary_remotecnt);
383 }
384 nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize");
385 if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) == -1) {
386 pjdlog_exit(EX_TEMPFAIL, "Unable to send activemap to %s",
387 res->hr_remoteaddr);
388 }
389 if (map != NULL)
390 free(map);
391 nv_free(nvout);
392#ifdef notyet
393 /* Setup direction. */
394 if (proto_recv(res->hr_remotein, NULL, 0) == -1)
395 pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
396#endif
397}
398
399void
400hastd_secondary(struct hast_resource *res, struct nv *nvin)
401{
402 sigset_t mask;
403 pthread_t td;
404 pid_t pid;
405 int error, mode, debuglevel;
406
407 /*
408 * Create communication channel between parent and child.
409 */
410 if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) {
411 KEEP_ERRNO((void)pidfile_remove(pfh));
412 pjdlog_exit(EX_OSERR,
413 "Unable to create control sockets between parent and child");
414 }
415 /*
416 * Create communication channel between child and parent.
417 */
418 if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) {
419 KEEP_ERRNO((void)pidfile_remove(pfh));
420 pjdlog_exit(EX_OSERR,
421 "Unable to create event sockets between child and parent");
422 }
423
424 pid = fork();
425 if (pid == -1) {
426 KEEP_ERRNO((void)pidfile_remove(pfh));
427 pjdlog_exit(EX_OSERR, "Unable to fork");
428 }
429
430 if (pid > 0) {
431 /* This is parent. */
432 proto_close(res->hr_remotein);
433 res->hr_remotein = NULL;
434 proto_close(res->hr_remoteout);
435 res->hr_remoteout = NULL;
436 /* Declare that we are receiver. */
437 proto_recv(res->hr_event, NULL, 0);
438 /* Declare that we are sender. */
439 proto_send(res->hr_ctrl, NULL, 0);
440 res->hr_workerpid = pid;
441 return;
442 }
443
444 gres = res;
445 mode = pjdlog_mode_get();
446 debuglevel = pjdlog_debug_get();
447
448 /* Declare that we are sender. */
449 proto_send(res->hr_event, NULL, 0);
450 /* Declare that we are receiver. */
451 proto_recv(res->hr_ctrl, NULL, 0);
452 descriptors_cleanup(res);
453
454 descriptors_assert(res, mode);
455
456 pjdlog_init(mode);
457 pjdlog_debug_set(debuglevel);
458 pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
459 setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role));
460
461 PJDLOG_VERIFY(sigemptyset(&mask) == 0);
462 PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
463
464 /* Error in setting timeout is not critical, but why should it fail? */
465 if (proto_timeout(res->hr_remotein, 2 * HAST_KEEPALIVE) == -1)
466 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
467 if (proto_timeout(res->hr_remoteout, res->hr_timeout) == -1)
468 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
469
470 init_local(res);
471 init_environment();
472
473 if (drop_privs(res) != 0)
474 exit(EX_CONFIG);
475 pjdlog_info("Privileges successfully dropped.");
476
477 /*
478 * Create the control thread before sending any event to the parent,
479 * as we can deadlock when parent sends control request to worker,
480 * but worker has no control thread started yet, so parent waits.
481 * In the meantime worker sends an event to the parent, but parent
482 * is unable to handle the event, because it waits for control
483 * request response.
484 */
485 error = pthread_create(&td, NULL, ctrl_thread, res);
486 PJDLOG_ASSERT(error == 0);
487
488 init_remote(res, nvin);
489 event_send(res, EVENT_CONNECT);
490
491 error = pthread_create(&td, NULL, recv_thread, res);
492 PJDLOG_ASSERT(error == 0);
493 error = pthread_create(&td, NULL, disk_thread, res);
494 PJDLOG_ASSERT(error == 0);
495 (void)send_thread(res);
496}
497
498static void
499reqlog(int loglevel, int debuglevel, int error, struct hio *hio,
500 const char *fmt, ...)
501{
502 char msg[1024];
503 va_list ap;
504 int len;
505
506 va_start(ap, fmt);
507 len = vsnprintf(msg, sizeof(msg), fmt, ap);
508 va_end(ap);
509 if ((size_t)len < sizeof(msg)) {
510 switch (hio->hio_cmd) {
511 case HIO_READ:
512 (void)snprintf(msg + len, sizeof(msg) - len,
513 "READ(%ju, %ju).", (uintmax_t)hio->hio_offset,
514 (uintmax_t)hio->hio_length);
515 break;
516 case HIO_DELETE:
517 (void)snprintf(msg + len, sizeof(msg) - len,
518 "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset,
519 (uintmax_t)hio->hio_length);
520 break;
521 case HIO_FLUSH:
522 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
523 break;
524 case HIO_WRITE:
525 (void)snprintf(msg + len, sizeof(msg) - len,
526 "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset,
527 (uintmax_t)hio->hio_length);
528 break;
529 case HIO_KEEPALIVE:
530 (void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE.");
531 break;
532 default:
533 (void)snprintf(msg + len, sizeof(msg) - len,
534 "UNKNOWN(%u).", (unsigned int)hio->hio_cmd);
535 break;
536 }
537 }
538 pjdlog_common(loglevel, debuglevel, error, "%s", msg);
539}
540
541static int
542requnpack(struct hast_resource *res, struct hio *hio, struct nv *nv)
543{
544
545 hio->hio_cmd = nv_get_uint8(nv, "cmd");
546 if (hio->hio_cmd == 0) {
547 pjdlog_error("Header contains no 'cmd' field.");
548 hio->hio_error = EINVAL;
549 goto end;
550 }
551 if (hio->hio_cmd != HIO_KEEPALIVE) {
552 hio->hio_seq = nv_get_uint64(nv, "seq");
553 if (hio->hio_seq == 0) {
554 pjdlog_error("Header contains no 'seq' field.");
555 hio->hio_error = EINVAL;
556 goto end;
557 }
558 }
559 switch (hio->hio_cmd) {
560 case HIO_FLUSH:
561 case HIO_KEEPALIVE:
562 break;
546 case HIO_READ:
547 case HIO_WRITE:
563 case HIO_WRITE:
564 hio->hio_memsync = nv_exists(nv, "memsync");
565 /* FALLTHROUGH */
566 case HIO_READ:
548 case HIO_DELETE:
549 hio->hio_offset = nv_get_uint64(nv, "offset");
550 if (nv_error(nv) != 0) {
551 pjdlog_error("Header is missing 'offset' field.");
552 hio->hio_error = EINVAL;
553 goto end;
554 }
555 hio->hio_length = nv_get_uint64(nv, "length");
556 if (nv_error(nv) != 0) {
557 pjdlog_error("Header is missing 'length' field.");
558 hio->hio_error = EINVAL;
559 goto end;
560 }
561 if (hio->hio_length == 0) {
562 pjdlog_error("Data length is zero.");
563 hio->hio_error = EINVAL;
564 goto end;
565 }
566 if (hio->hio_length > MAXPHYS) {
567 pjdlog_error("Data length is too large (%ju > %ju).",
568 (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS);
569 hio->hio_error = EINVAL;
570 goto end;
571 }
572 if ((hio->hio_offset % res->hr_local_sectorsize) != 0) {
573 pjdlog_error("Offset %ju is not multiple of sector size.",
574 (uintmax_t)hio->hio_offset);
575 hio->hio_error = EINVAL;
576 goto end;
577 }
578 if ((hio->hio_length % res->hr_local_sectorsize) != 0) {
579 pjdlog_error("Length %ju is not multiple of sector size.",
580 (uintmax_t)hio->hio_length);
581 hio->hio_error = EINVAL;
582 goto end;
583 }
584 if (hio->hio_offset + hio->hio_length >
585 (uint64_t)res->hr_datasize) {
586 pjdlog_error("Data offset is too large (%ju > %ju).",
587 (uintmax_t)(hio->hio_offset + hio->hio_length),
588 (uintmax_t)res->hr_datasize);
589 hio->hio_error = EINVAL;
590 goto end;
591 }
592 break;
593 default:
594 pjdlog_error("Header contains invalid 'cmd' (%hhu).",
595 hio->hio_cmd);
596 hio->hio_error = EINVAL;
597 goto end;
598 }
599 hio->hio_error = 0;
600end:
601 return (hio->hio_error);
602}
603
604static __dead2 void
605secondary_exit(int exitcode, const char *fmt, ...)
606{
607 va_list ap;
608
609 PJDLOG_ASSERT(exitcode != EX_OK);
610 va_start(ap, fmt);
611 pjdlogv_errno(LOG_ERR, fmt, ap);
612 va_end(ap);
613 event_send(gres, EVENT_DISCONNECT);
614 exit(exitcode);
615}
616
617/*
618 * Thread receives requests from the primary node.
619 */
620static void *
621recv_thread(void *arg)
622{
623 struct hast_resource *res = arg;
567 case HIO_DELETE:
568 hio->hio_offset = nv_get_uint64(nv, "offset");
569 if (nv_error(nv) != 0) {
570 pjdlog_error("Header is missing 'offset' field.");
571 hio->hio_error = EINVAL;
572 goto end;
573 }
574 hio->hio_length = nv_get_uint64(nv, "length");
575 if (nv_error(nv) != 0) {
576 pjdlog_error("Header is missing 'length' field.");
577 hio->hio_error = EINVAL;
578 goto end;
579 }
580 if (hio->hio_length == 0) {
581 pjdlog_error("Data length is zero.");
582 hio->hio_error = EINVAL;
583 goto end;
584 }
585 if (hio->hio_length > MAXPHYS) {
586 pjdlog_error("Data length is too large (%ju > %ju).",
587 (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS);
588 hio->hio_error = EINVAL;
589 goto end;
590 }
591 if ((hio->hio_offset % res->hr_local_sectorsize) != 0) {
592 pjdlog_error("Offset %ju is not multiple of sector size.",
593 (uintmax_t)hio->hio_offset);
594 hio->hio_error = EINVAL;
595 goto end;
596 }
597 if ((hio->hio_length % res->hr_local_sectorsize) != 0) {
598 pjdlog_error("Length %ju is not multiple of sector size.",
599 (uintmax_t)hio->hio_length);
600 hio->hio_error = EINVAL;
601 goto end;
602 }
603 if (hio->hio_offset + hio->hio_length >
604 (uint64_t)res->hr_datasize) {
605 pjdlog_error("Data offset is too large (%ju > %ju).",
606 (uintmax_t)(hio->hio_offset + hio->hio_length),
607 (uintmax_t)res->hr_datasize);
608 hio->hio_error = EINVAL;
609 goto end;
610 }
611 break;
612 default:
613 pjdlog_error("Header contains invalid 'cmd' (%hhu).",
614 hio->hio_cmd);
615 hio->hio_error = EINVAL;
616 goto end;
617 }
618 hio->hio_error = 0;
619end:
620 return (hio->hio_error);
621}
622
623static __dead2 void
624secondary_exit(int exitcode, const char *fmt, ...)
625{
626 va_list ap;
627
628 PJDLOG_ASSERT(exitcode != EX_OK);
629 va_start(ap, fmt);
630 pjdlogv_errno(LOG_ERR, fmt, ap);
631 va_end(ap);
632 event_send(gres, EVENT_DISCONNECT);
633 exit(exitcode);
634}
635
636/*
637 * Thread receives requests from the primary node.
638 */
639static void *
640recv_thread(void *arg)
641{
642 struct hast_resource *res = arg;
624 struct hio *hio;
643 struct hio *hio, *mshio;
625 struct nv *nv;
626
627 for (;;) {
628 pjdlog_debug(2, "recv: Taking free request.");
629 QUEUE_TAKE(free, hio);
630 pjdlog_debug(2, "recv: (%p) Got request.", hio);
631 if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) {
632 secondary_exit(EX_TEMPFAIL,
633 "Unable to receive request header");
634 }
635 if (requnpack(res, hio, nv) != 0) {
636 nv_free(nv);
637 pjdlog_debug(2,
638 "recv: (%p) Moving request to the send queue.",
639 hio);
640 QUEUE_INSERT(send, hio);
641 continue;
642 }
643 switch (hio->hio_cmd) {
644 case HIO_READ:
645 res->hr_stat_read++;
646 break;
647 case HIO_WRITE:
648 res->hr_stat_write++;
649 break;
650 case HIO_DELETE:
651 res->hr_stat_delete++;
652 break;
653 case HIO_FLUSH:
654 res->hr_stat_flush++;
655 break;
656 case HIO_KEEPALIVE:
657 break;
658 default:
659 PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
660 hio->hio_cmd);
661 }
662 reqlog(LOG_DEBUG, 2, -1, hio,
663 "recv: (%p) Got request header: ", hio);
664 if (hio->hio_cmd == HIO_KEEPALIVE) {
665 nv_free(nv);
666 pjdlog_debug(2,
667 "recv: (%p) Moving request to the free queue.",
668 hio);
669 hio_clear(hio);
670 QUEUE_INSERT(free, hio);
671 continue;
672 } else if (hio->hio_cmd == HIO_WRITE) {
673 if (hast_proto_recv_data(res, res->hr_remotein, nv,
674 hio->hio_data, MAXPHYS) == -1) {
675 secondary_exit(EX_TEMPFAIL,
676 "Unable to receive request data");
677 }
644 struct nv *nv;
645
646 for (;;) {
647 pjdlog_debug(2, "recv: Taking free request.");
648 QUEUE_TAKE(free, hio);
649 pjdlog_debug(2, "recv: (%p) Got request.", hio);
650 if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) {
651 secondary_exit(EX_TEMPFAIL,
652 "Unable to receive request header");
653 }
654 if (requnpack(res, hio, nv) != 0) {
655 nv_free(nv);
656 pjdlog_debug(2,
657 "recv: (%p) Moving request to the send queue.",
658 hio);
659 QUEUE_INSERT(send, hio);
660 continue;
661 }
662 switch (hio->hio_cmd) {
663 case HIO_READ:
664 res->hr_stat_read++;
665 break;
666 case HIO_WRITE:
667 res->hr_stat_write++;
668 break;
669 case HIO_DELETE:
670 res->hr_stat_delete++;
671 break;
672 case HIO_FLUSH:
673 res->hr_stat_flush++;
674 break;
675 case HIO_KEEPALIVE:
676 break;
677 default:
678 PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
679 hio->hio_cmd);
680 }
681 reqlog(LOG_DEBUG, 2, -1, hio,
682 "recv: (%p) Got request header: ", hio);
683 if (hio->hio_cmd == HIO_KEEPALIVE) {
684 nv_free(nv);
685 pjdlog_debug(2,
686 "recv: (%p) Moving request to the free queue.",
687 hio);
688 hio_clear(hio);
689 QUEUE_INSERT(free, hio);
690 continue;
691 } else if (hio->hio_cmd == HIO_WRITE) {
692 if (hast_proto_recv_data(res, res->hr_remotein, nv,
693 hio->hio_data, MAXPHYS) == -1) {
694 secondary_exit(EX_TEMPFAIL,
695 "Unable to receive request data");
696 }
697 if (hio->hio_memsync) {
698 /*
699 * For memsync requests we expect two replies.
700 * Clone the hio so we can handle both of them.
701 */
702 pjdlog_debug(2, "recv: Taking free request.");
703 QUEUE_TAKE(free, mshio);
704 pjdlog_debug(2, "recv: (%p) Got request.",
705 mshio);
706 hio_copy(hio, mshio);
707 mshio->hio_error = 0;
708 /*
709 * We want to keep 'memsync' tag only on the
710 * request going onto send queue (mshio).
711 */
712 hio->hio_memsync = false;
713 pjdlog_debug(2,
714 "recv: (%p) Moving memsync request to the send queue.",
715 mshio);
716 QUEUE_INSERT(send, mshio);
717 }
678 }
679 nv_free(nv);
680 pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.",
681 hio);
682 QUEUE_INSERT(disk, hio);
683 }
684 /* NOTREACHED */
685 return (NULL);
686}
687
688/*
689 * Thread reads from or writes to local component and also handles DELETE and
690 * FLUSH requests.
691 */
692static void *
693disk_thread(void *arg)
694{
695 struct hast_resource *res = arg;
696 struct hio *hio;
697 ssize_t ret;
698 bool clear_activemap, logerror;
699
700 clear_activemap = true;
701
702 for (;;) {
703 pjdlog_debug(2, "disk: Taking request.");
704 QUEUE_TAKE(disk, hio);
705 while (clear_activemap) {
706 unsigned char *map;
707 size_t mapsize;
708
709 /*
710 * When first request is received, it means that primary
711 * already received our activemap, merged it and stored
712 * locally. We can now safely clear our activemap.
713 */
714 mapsize =
715 activemap_calc_ondisk_size(res->hr_local_mediasize -
716 METADATA_SIZE, res->hr_extentsize,
717 res->hr_local_sectorsize);
718 map = calloc(1, mapsize);
719 if (map == NULL) {
720 pjdlog_warning("Unable to allocate memory to clear local activemap.");
721 break;
722 }
723 if (pwrite(res->hr_localfd, map, mapsize,
724 METADATA_SIZE) != (ssize_t)mapsize) {
725 pjdlog_errno(LOG_WARNING,
726 "Unable to store cleared activemap");
727 free(map);
728 break;
729 }
730 free(map);
731 clear_activemap = false;
732 pjdlog_debug(1, "Local activemap cleared.");
733 break;
734 }
735 reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio);
736 logerror = true;
737 /* Handle the actual request. */
738 switch (hio->hio_cmd) {
739 case HIO_READ:
740 ret = pread(res->hr_localfd, hio->hio_data,
741 hio->hio_length,
742 hio->hio_offset + res->hr_localoff);
743 if (ret == -1)
744 hio->hio_error = errno;
745 else if (ret != (int64_t)hio->hio_length)
746 hio->hio_error = EIO;
747 else
748 hio->hio_error = 0;
749 break;
750 case HIO_WRITE:
751 ret = pwrite(res->hr_localfd, hio->hio_data,
752 hio->hio_length,
753 hio->hio_offset + res->hr_localoff);
754 if (ret == -1)
755 hio->hio_error = errno;
756 else if (ret != (int64_t)hio->hio_length)
757 hio->hio_error = EIO;
758 else
759 hio->hio_error = 0;
760 break;
761 case HIO_DELETE:
762 ret = g_delete(res->hr_localfd,
763 hio->hio_offset + res->hr_localoff,
764 hio->hio_length);
765 if (ret == -1)
766 hio->hio_error = errno;
767 else
768 hio->hio_error = 0;
769 break;
770 case HIO_FLUSH:
771 if (!res->hr_localflush) {
772 ret = -1;
773 hio->hio_error = EOPNOTSUPP;
774 logerror = false;
775 break;
776 }
777 ret = g_flush(res->hr_localfd);
778 if (ret == -1) {
779 if (errno == EOPNOTSUPP)
780 res->hr_localflush = false;
781 hio->hio_error = errno;
782 } else {
783 hio->hio_error = 0;
784 }
785 break;
786 default:
787 PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
788 hio->hio_cmd);
789 }
790 if (logerror && hio->hio_error != 0) {
791 reqlog(LOG_ERR, 0, hio->hio_error, hio,
792 "Request failed: ");
793 }
794 pjdlog_debug(2, "disk: (%p) Moving request to the send queue.",
795 hio);
796 QUEUE_INSERT(send, hio);
797 }
798 /* NOTREACHED */
799 return (NULL);
800}
801
802/*
803 * Thread sends requests back to primary node.
804 */
805static void *
806send_thread(void *arg)
807{
808 struct hast_resource *res = arg;
809 struct nv *nvout;
810 struct hio *hio;
811 void *data;
812 size_t length;
813
814 for (;;) {
815 pjdlog_debug(2, "send: Taking request.");
816 QUEUE_TAKE(send, hio);
817 reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio);
818 nvout = nv_alloc();
819 /* Copy sequence number. */
820 nv_add_uint64(nvout, hio->hio_seq, "seq");
718 }
719 nv_free(nv);
720 pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.",
721 hio);
722 QUEUE_INSERT(disk, hio);
723 }
724 /* NOTREACHED */
725 return (NULL);
726}
727
728/*
729 * Thread reads from or writes to local component and also handles DELETE and
730 * FLUSH requests.
731 */
732static void *
733disk_thread(void *arg)
734{
735 struct hast_resource *res = arg;
736 struct hio *hio;
737 ssize_t ret;
738 bool clear_activemap, logerror;
739
740 clear_activemap = true;
741
742 for (;;) {
743 pjdlog_debug(2, "disk: Taking request.");
744 QUEUE_TAKE(disk, hio);
745 while (clear_activemap) {
746 unsigned char *map;
747 size_t mapsize;
748
749 /*
750 * When first request is received, it means that primary
751 * already received our activemap, merged it and stored
752 * locally. We can now safely clear our activemap.
753 */
754 mapsize =
755 activemap_calc_ondisk_size(res->hr_local_mediasize -
756 METADATA_SIZE, res->hr_extentsize,
757 res->hr_local_sectorsize);
758 map = calloc(1, mapsize);
759 if (map == NULL) {
760 pjdlog_warning("Unable to allocate memory to clear local activemap.");
761 break;
762 }
763 if (pwrite(res->hr_localfd, map, mapsize,
764 METADATA_SIZE) != (ssize_t)mapsize) {
765 pjdlog_errno(LOG_WARNING,
766 "Unable to store cleared activemap");
767 free(map);
768 break;
769 }
770 free(map);
771 clear_activemap = false;
772 pjdlog_debug(1, "Local activemap cleared.");
773 break;
774 }
775 reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio);
776 logerror = true;
777 /* Handle the actual request. */
778 switch (hio->hio_cmd) {
779 case HIO_READ:
780 ret = pread(res->hr_localfd, hio->hio_data,
781 hio->hio_length,
782 hio->hio_offset + res->hr_localoff);
783 if (ret == -1)
784 hio->hio_error = errno;
785 else if (ret != (int64_t)hio->hio_length)
786 hio->hio_error = EIO;
787 else
788 hio->hio_error = 0;
789 break;
790 case HIO_WRITE:
791 ret = pwrite(res->hr_localfd, hio->hio_data,
792 hio->hio_length,
793 hio->hio_offset + res->hr_localoff);
794 if (ret == -1)
795 hio->hio_error = errno;
796 else if (ret != (int64_t)hio->hio_length)
797 hio->hio_error = EIO;
798 else
799 hio->hio_error = 0;
800 break;
801 case HIO_DELETE:
802 ret = g_delete(res->hr_localfd,
803 hio->hio_offset + res->hr_localoff,
804 hio->hio_length);
805 if (ret == -1)
806 hio->hio_error = errno;
807 else
808 hio->hio_error = 0;
809 break;
810 case HIO_FLUSH:
811 if (!res->hr_localflush) {
812 ret = -1;
813 hio->hio_error = EOPNOTSUPP;
814 logerror = false;
815 break;
816 }
817 ret = g_flush(res->hr_localfd);
818 if (ret == -1) {
819 if (errno == EOPNOTSUPP)
820 res->hr_localflush = false;
821 hio->hio_error = errno;
822 } else {
823 hio->hio_error = 0;
824 }
825 break;
826 default:
827 PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
828 hio->hio_cmd);
829 }
830 if (logerror && hio->hio_error != 0) {
831 reqlog(LOG_ERR, 0, hio->hio_error, hio,
832 "Request failed: ");
833 }
834 pjdlog_debug(2, "disk: (%p) Moving request to the send queue.",
835 hio);
836 QUEUE_INSERT(send, hio);
837 }
838 /* NOTREACHED */
839 return (NULL);
840}
841
842/*
843 * Thread sends requests back to primary node.
844 */
845static void *
846send_thread(void *arg)
847{
848 struct hast_resource *res = arg;
849 struct nv *nvout;
850 struct hio *hio;
851 void *data;
852 size_t length;
853
854 for (;;) {
855 pjdlog_debug(2, "send: Taking request.");
856 QUEUE_TAKE(send, hio);
857 reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio);
858 nvout = nv_alloc();
859 /* Copy sequence number. */
860 nv_add_uint64(nvout, hio->hio_seq, "seq");
861 if (hio->hio_memsync) {
862 PJDLOG_ASSERT(hio->hio_cmd == HIO_WRITE);
863 nv_add_int8(nvout, 1, "received");
864 }
821 switch (hio->hio_cmd) {
822 case HIO_READ:
823 if (hio->hio_error == 0) {
824 data = hio->hio_data;
825 length = hio->hio_length;
826 break;
827 }
828 /*
829 * We send no data in case of an error.
830 */
831 /* FALLTHROUGH */
832 case HIO_DELETE:
833 case HIO_FLUSH:
834 case HIO_WRITE:
835 data = NULL;
836 length = 0;
837 break;
838 default:
839 PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
840 hio->hio_cmd);
841 }
842 if (hio->hio_error != 0)
843 nv_add_int16(nvout, hio->hio_error, "error");
844 if (hast_proto_send(res, res->hr_remoteout, nvout, data,
845 length) == -1) {
846 secondary_exit(EX_TEMPFAIL, "Unable to send reply");
847 }
848 nv_free(nvout);
849 pjdlog_debug(2, "send: (%p) Moving request to the free queue.",
850 hio);
851 hio_clear(hio);
852 QUEUE_INSERT(free, hio);
853 }
854 /* NOTREACHED */
855 return (NULL);
856}
865 switch (hio->hio_cmd) {
866 case HIO_READ:
867 if (hio->hio_error == 0) {
868 data = hio->hio_data;
869 length = hio->hio_length;
870 break;
871 }
872 /*
873 * We send no data in case of an error.
874 */
875 /* FALLTHROUGH */
876 case HIO_DELETE:
877 case HIO_FLUSH:
878 case HIO_WRITE:
879 data = NULL;
880 length = 0;
881 break;
882 default:
883 PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
884 hio->hio_cmd);
885 }
886 if (hio->hio_error != 0)
887 nv_add_int16(nvout, hio->hio_error, "error");
888 if (hast_proto_send(res, res->hr_remoteout, nvout, data,
889 length) == -1) {
890 secondary_exit(EX_TEMPFAIL, "Unable to send reply");
891 }
892 nv_free(nvout);
893 pjdlog_debug(2, "send: (%p) Moving request to the free queue.",
894 hio);
895 hio_clear(hio);
896 QUEUE_INSERT(free, hio);
897 }
898 /* NOTREACHED */
899 return (NULL);
900}