Deleted Added
full compact
primary.c (212034) primary.c (212038)
1/*-
2 * Copyright (c) 2009 The FreeBSD Foundation
3 * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org>
4 * All rights reserved.
5 *
6 * This software was developed by Pawel Jakub Dawidek under sponsorship from
7 * the FreeBSD Foundation.
8 *

--- 15 unchanged lines hidden (view full) ---

24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28 * SUCH DAMAGE.
29 */
30
31#include <sys/cdefs.h>
1/*-
2 * Copyright (c) 2009 The FreeBSD Foundation
3 * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org>
4 * All rights reserved.
5 *
6 * This software was developed by Pawel Jakub Dawidek under sponsorship from
7 * the FreeBSD Foundation.
8 *

--- 15 unchanged lines hidden (view full) ---

24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28 * SUCH DAMAGE.
29 */
30
31#include <sys/cdefs.h>
32__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 212034 2010-08-30 22:28:04Z pjd $");
32__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 212038 2010-08-30 23:26:10Z pjd $");
33
34#include <sys/types.h>
35#include <sys/time.h>
36#include <sys/bio.h>
37#include <sys/disk.h>
38#include <sys/refcount.h>
39#include <sys/stat.h>
40

--- 12 unchanged lines hidden (view full) ---

53#include <sysexits.h>
54#include <unistd.h>
55
56#include <activemap.h>
57#include <nv.h>
58#include <rangelock.h>
59
60#include "control.h"
33
34#include <sys/types.h>
35#include <sys/time.h>
36#include <sys/bio.h>
37#include <sys/disk.h>
38#include <sys/refcount.h>
39#include <sys/stat.h>
40

--- 12 unchanged lines hidden (view full) ---

53#include <sysexits.h>
54#include <unistd.h>
55
56#include <activemap.h>
57#include <nv.h>
58#include <rangelock.h>
59
60#include "control.h"
61#include "event.h"
61#include "hast.h"
62#include "hast_proto.h"
63#include "hastd.h"
64#include "hooks.h"
65#include "metadata.h"
66#include "proto.h"
67#include "pjdlog.h"
68#include "subr.h"

--- 152 unchanged lines hidden (view full) ---

221static void *local_send_thread(void *arg);
222static void *remote_send_thread(void *arg);
223static void *remote_recv_thread(void *arg);
224static void *ggate_send_thread(void *arg);
225static void *sync_thread(void *arg);
226static void *guard_thread(void *arg);
227
228static void
62#include "hast.h"
63#include "hast_proto.h"
64#include "hastd.h"
65#include "hooks.h"
66#include "metadata.h"
67#include "proto.h"
68#include "pjdlog.h"
69#include "subr.h"

--- 152 unchanged lines hidden (view full) ---

222static void *local_send_thread(void *arg);
223static void *remote_send_thread(void *arg);
224static void *remote_recv_thread(void *arg);
225static void *ggate_send_thread(void *arg);
226static void *sync_thread(void *arg);
227static void *guard_thread(void *arg);
228
229static void
229dummy_sighandler(int sig __unused)
230{
231 /* Nothing to do. */
232}
233
234static void
235cleanup(struct hast_resource *res)
236{
237 int rerrno;
238
239 /* Remember errno. */
240 rerrno = errno;
241
242 /*

--- 181 unchanged lines hidden (view full) ---

424 hio->hio_ggio.gctl_length = MAXPHYS;
425 hio->hio_ggio.gctl_error = 0;
426 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next);
427 }
428
429 /*
430 * Turn on signals handling.
431 */
230cleanup(struct hast_resource *res)
231{
232 int rerrno;
233
234 /* Remember errno. */
235 rerrno = errno;
236
237 /*

--- 181 unchanged lines hidden (view full) ---

419 hio->hio_ggio.gctl_length = MAXPHYS;
420 hio->hio_ggio.gctl_error = 0;
421 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next);
422 }
423
424 /*
425 * Turn on signals handling.
426 */
432 /* Because SIGCHLD is ignored by default, setup dummy handler for it. */
433 PJDLOG_VERIFY(signal(SIGCHLD, dummy_sighandler) != SIG_ERR);
434 PJDLOG_VERIFY(sigfillset(&mask) == 0);
435 PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
436}
437
438static void
439init_local(struct hast_resource *res)
440{
441 unsigned char *buf;

--- 225 unchanged lines hidden (view full) ---

667 pjdlog_info("Connected to %s.", res->hr_remoteaddr);
668 if (inp != NULL && outp != NULL) {
669 *inp = in;
670 *outp = out;
671 } else {
672 res->hr_remotein = in;
673 res->hr_remoteout = out;
674 }
427 PJDLOG_VERIFY(sigfillset(&mask) == 0);
428 PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
429}
430
431static void
432init_local(struct hast_resource *res)
433{
434 unsigned char *buf;

--- 225 unchanged lines hidden (view full) ---

660 pjdlog_info("Connected to %s.", res->hr_remoteaddr);
661 if (inp != NULL && outp != NULL) {
662 *inp = in;
663 *outp = out;
664 } else {
665 res->hr_remotein = in;
666 res->hr_remoteout = out;
667 }
675 hook_exec(res->hr_exec, "connect", res->hr_name, NULL);
668 event_send(res, EVENT_CONNECT);
676 return (true);
677close:
678 if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0)
669 return (true);
670close:
671 if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0)
679 hook_exec(res->hr_exec, "split-brain", res->hr_name, NULL);
672 event_send(res, EVENT_SPLITBRAIN);
680 proto_close(out);
681 if (in != NULL)
682 proto_close(in);
683 return (false);
684}
685
686static void
687sync_start(void)

--- 81 unchanged lines hidden (view full) ---

769 /*
770 * Create communication channel between parent and child.
771 */
772 if (proto_client("socketpair://", &res->hr_ctrl) < 0) {
773 KEEP_ERRNO((void)pidfile_remove(pfh));
774 pjdlog_exit(EX_OSERR,
775 "Unable to create control sockets between parent and child");
776 }
673 proto_close(out);
674 if (in != NULL)
675 proto_close(in);
676 return (false);
677}
678
679static void
680sync_start(void)

--- 81 unchanged lines hidden (view full) ---

762 /*
763 * Create communication channel between parent and child.
764 */
765 if (proto_client("socketpair://", &res->hr_ctrl) < 0) {
766 KEEP_ERRNO((void)pidfile_remove(pfh));
767 pjdlog_exit(EX_OSERR,
768 "Unable to create control sockets between parent and child");
769 }
770 /*
771 * Create communication channel between child and parent.
772 */
773 if (proto_client("socketpair://", &res->hr_event) < 0) {
774 KEEP_ERRNO((void)pidfile_remove(pfh));
775 pjdlog_exit(EX_OSERR,
776 "Unable to create event sockets between child and parent");
777 }
777
778 pid = fork();
779 if (pid < 0) {
780 KEEP_ERRNO((void)pidfile_remove(pfh));
781 pjdlog_exit(EX_TEMPFAIL, "Unable to fork");
782 }
783
784 if (pid > 0) {
785 /* This is parent. */
778
779 pid = fork();
780 if (pid < 0) {
781 KEEP_ERRNO((void)pidfile_remove(pfh));
782 pjdlog_exit(EX_TEMPFAIL, "Unable to fork");
783 }
784
785 if (pid > 0) {
786 /* This is parent. */
787 /* Declare that we are receiver. */
788 proto_recv(res->hr_event, NULL, 0);
786 res->hr_workerpid = pid;
787 return;
788 }
789
790 gres = res;
791
792 (void)pidfile_close(pfh);
793 hook_fini();
794
795 setproctitle("%s (primary)", res->hr_name);
796
797 signal(SIGHUP, SIG_DFL);
798 signal(SIGCHLD, SIG_DFL);
799
789 res->hr_workerpid = pid;
790 return;
791 }
792
793 gres = res;
794
795 (void)pidfile_close(pfh);
796 hook_fini();
797
798 setproctitle("%s (primary)", res->hr_name);
799
800 signal(SIGHUP, SIG_DFL);
801 signal(SIGCHLD, SIG_DFL);
802
800 hook_init();
803 /* Declare that we are sender. */
804 proto_send(res->hr_event, NULL, 0);
805
801 init_local(res);
802 if (real_remote(res) && init_remote(res, NULL, NULL))
803 sync_start();
804 init_ggate(res);
805 init_environment(res);
806 error = pthread_create(&td, NULL, ggate_recv_thread, res);
807 assert(error == 0);
808 error = pthread_create(&td, NULL, local_send_thread, res);

--- 82 unchanged lines hidden (view full) ---

891
892 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr);
893
894 /*
895 * Stop synchronization if in-progress.
896 */
897 sync_stop();
898
806 init_local(res);
807 if (real_remote(res) && init_remote(res, NULL, NULL))
808 sync_start();
809 init_ggate(res);
810 init_environment(res);
811 error = pthread_create(&td, NULL, ggate_recv_thread, res);
812 assert(error == 0);
813 error = pthread_create(&td, NULL, local_send_thread, res);

--- 82 unchanged lines hidden (view full) ---

896
897 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr);
898
899 /*
900 * Stop synchronization if in-progress.
901 */
902 sync_stop();
903
899 hook_exec(res->hr_exec, "disconnect", res->hr_name, NULL);
904 event_send(res, EVENT_DISCONNECT);
900}
901
902/*
903 * Thread receives ggate I/O requests from the kernel and passes them to
904 * appropriate threads:
905 * WRITE - always goes to both local_send and remote_send threads
906 * READ (when the block is up-to-date on local component) -
907 * only local_send thread

--- 599 unchanged lines hidden (view full) ---

1507 offset = -1;
1508
1509 for (;;) {
1510 mtx_lock(&sync_lock);
1511 if (offset >= 0 && !sync_inprogress) {
1512 pjdlog_info("Synchronization interrupted. "
1513 "%jd bytes synchronized so far.",
1514 (intmax_t)synced);
905}
906
907/*
908 * Thread receives ggate I/O requests from the kernel and passes them to
909 * appropriate threads:
910 * WRITE - always goes to both local_send and remote_send threads
911 * READ (when the block is up-to-date on local component) -
912 * only local_send thread

--- 599 unchanged lines hidden (view full) ---

1512 offset = -1;
1513
1514 for (;;) {
1515 mtx_lock(&sync_lock);
1516 if (offset >= 0 && !sync_inprogress) {
1517 pjdlog_info("Synchronization interrupted. "
1518 "%jd bytes synchronized so far.",
1519 (intmax_t)synced);
1515 hook_exec(res->hr_exec, "syncintr", res->hr_name, NULL);
1520 event_send(res, EVENT_SYNCINTR);
1516 }
1517 while (!sync_inprogress) {
1518 dorewind = true;
1519 synced = 0;
1520 cv_wait(&sync_cond, &sync_lock);
1521 }
1522 mtx_unlock(&sync_lock);
1523 /*

--- 16 unchanged lines hidden (view full) ---

1540 if (dorewind) {
1541 dorewind = false;
1542 if (offset < 0)
1543 pjdlog_info("Nodes are in sync.");
1544 else {
1545 pjdlog_info("Synchronization started. %ju bytes to go.",
1546 (uintmax_t)(res->hr_extentsize *
1547 activemap_ndirty(res->hr_amp)));
1521 }
1522 while (!sync_inprogress) {
1523 dorewind = true;
1524 synced = 0;
1525 cv_wait(&sync_cond, &sync_lock);
1526 }
1527 mtx_unlock(&sync_lock);
1528 /*

--- 16 unchanged lines hidden (view full) ---

1545 if (dorewind) {
1546 dorewind = false;
1547 if (offset < 0)
1548 pjdlog_info("Nodes are in sync.");
1549 else {
1550 pjdlog_info("Synchronization started. %ju bytes to go.",
1551 (uintmax_t)(res->hr_extentsize *
1552 activemap_ndirty(res->hr_amp)));
1548 hook_exec(res->hr_exec, "syncstart",
1549 res->hr_name, NULL);
1553 event_send(res, EVENT_SYNCSTART);
1550 }
1551 }
1552 if (offset < 0) {
1553 sync_stop();
1554 pjdlog_debug(1, "Nothing to synchronize.");
1555 /*
1556 * Synchronization complete, make both localcnt and
1557 * remotecnt equal.
1558 */
1559 ncomp = 1;
1560 rw_rlock(&hio_remote_lock[ncomp]);
1561 if (ISCONNECTED(res, ncomp)) {
1562 if (synced > 0) {
1563 pjdlog_info("Synchronization complete. "
1564 "%jd bytes synchronized.",
1565 (intmax_t)synced);
1554 }
1555 }
1556 if (offset < 0) {
1557 sync_stop();
1558 pjdlog_debug(1, "Nothing to synchronize.");
1559 /*
1560 * Synchronization complete, make both localcnt and
1561 * remotecnt equal.
1562 */
1563 ncomp = 1;
1564 rw_rlock(&hio_remote_lock[ncomp]);
1565 if (ISCONNECTED(res, ncomp)) {
1566 if (synced > 0) {
1567 pjdlog_info("Synchronization complete. "
1568 "%jd bytes synchronized.",
1569 (intmax_t)synced);
1566 hook_exec(res->hr_exec, "syncdone",
1567 res->hr_name, NULL);
1570 event_send(res, EVENT_SYNCDONE);
1568 }
1569 mtx_lock(&metadata_lock);
1570 res->hr_syncsrc = HAST_SYNCSRC_UNDEF;
1571 res->hr_primary_localcnt =
1572 res->hr_secondary_localcnt;
1573 res->hr_primary_remotecnt =
1574 res->hr_secondary_remotecnt;
1575 pjdlog_debug(1,

--- 369 unchanged lines hidden (view full) ---

1945
1946 ncomps = HAST_NCOMPONENTS;
1947 lastcheck = time(NULL);
1948
1949 PJDLOG_VERIFY(sigemptyset(&mask) == 0);
1950 PJDLOG_VERIFY(sigaddset(&mask, SIGHUP) == 0);
1951 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
1952 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);
1571 }
1572 mtx_lock(&metadata_lock);
1573 res->hr_syncsrc = HAST_SYNCSRC_UNDEF;
1574 res->hr_primary_localcnt =
1575 res->hr_secondary_localcnt;
1576 res->hr_primary_remotecnt =
1577 res->hr_secondary_remotecnt;
1578 pjdlog_debug(1,

--- 369 unchanged lines hidden (view full) ---

1948
1949 ncomps = HAST_NCOMPONENTS;
1950 lastcheck = time(NULL);
1951
1952 PJDLOG_VERIFY(sigemptyset(&mask) == 0);
1953 PJDLOG_VERIFY(sigaddset(&mask, SIGHUP) == 0);
1954 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
1955 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);
1953 PJDLOG_VERIFY(sigaddset(&mask, SIGCHLD) == 0);
1954
1955 timeout.tv_nsec = 0;
1956 signo = -1;
1957
1958 for (;;) {
1959 switch (signo) {
1960 case SIGHUP:
1961 config_reload();
1962 break;
1963 case SIGINT:
1964 case SIGTERM:
1965 sigexit_received = true;
1966 primary_exitx(EX_OK,
1967 "Termination signal received, exiting.");
1968 break;
1969 default:
1970 break;
1971 }
1956
1957 timeout.tv_nsec = 0;
1958 signo = -1;
1959
1960 for (;;) {
1961 switch (signo) {
1962 case SIGHUP:
1963 config_reload();
1964 break;
1965 case SIGINT:
1966 case SIGTERM:
1967 sigexit_received = true;
1968 primary_exitx(EX_OK,
1969 "Termination signal received, exiting.");
1970 break;
1971 default:
1972 break;
1973 }
1972 hook_check(signo == SIGCHLD);
1973
1974 pjdlog_debug(2, "remote_guard: Checking connections.");
1975 now = time(NULL);
1976 if (lastcheck + RETRY_SLEEP <= now) {
1977 for (ii = 0; ii < ncomps; ii++)
1978 guard_one(res, ii);
1979 lastcheck = now;
1980 }
1981 timeout.tv_sec = RETRY_SLEEP;
1982 signo = sigtimedwait(&mask, NULL, &timeout);
1983 }
1984 /* NOTREACHED */
1985 return (NULL);
1986}
1974
1975 pjdlog_debug(2, "remote_guard: Checking connections.");
1976 now = time(NULL);
1977 if (lastcheck + RETRY_SLEEP <= now) {
1978 for (ii = 0; ii < ncomps; ii++)
1979 guard_one(res, ii);
1980 lastcheck = now;
1981 }
1982 timeout.tv_sec = RETRY_SLEEP;
1983 signo = sigtimedwait(&mask, NULL, &timeout);
1984 }
1985 /* NOTREACHED */
1986 return (NULL);
1987}