primary.c (212034) | primary.c (212038) |
---|---|
1/*- 2 * Copyright (c) 2009 The FreeBSD Foundation 3 * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org> 4 * All rights reserved. 5 * 6 * This software was developed by Pawel Jakub Dawidek under sponsorship from 7 * the FreeBSD Foundation. 8 * --- 15 unchanged lines hidden (view full) --- 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28 * SUCH DAMAGE. 29 */ 30 31#include <sys/cdefs.h> | 1/*- 2 * Copyright (c) 2009 The FreeBSD Foundation 3 * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org> 4 * All rights reserved. 5 * 6 * This software was developed by Pawel Jakub Dawidek under sponsorship from 7 * the FreeBSD Foundation. 8 * --- 15 unchanged lines hidden (view full) --- 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28 * SUCH DAMAGE. 29 */ 30 31#include <sys/cdefs.h> |
32__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 212034 2010-08-30 22:28:04Z pjd $"); | 32__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 212038 2010-08-30 23:26:10Z pjd $"); |
33 34#include <sys/types.h> 35#include <sys/time.h> 36#include <sys/bio.h> 37#include <sys/disk.h> 38#include <sys/refcount.h> 39#include <sys/stat.h> 40 --- 12 unchanged lines hidden (view full) --- 53#include <sysexits.h> 54#include <unistd.h> 55 56#include <activemap.h> 57#include <nv.h> 58#include <rangelock.h> 59 60#include "control.h" | 33 34#include <sys/types.h> 35#include <sys/time.h> 36#include <sys/bio.h> 37#include <sys/disk.h> 38#include <sys/refcount.h> 39#include <sys/stat.h> 40 --- 12 unchanged lines hidden (view full) --- 53#include <sysexits.h> 54#include <unistd.h> 55 56#include <activemap.h> 57#include <nv.h> 58#include <rangelock.h> 59 60#include "control.h" |
61#include "event.h" |
|
61#include "hast.h" 62#include "hast_proto.h" 63#include "hastd.h" 64#include "hooks.h" 65#include "metadata.h" 66#include "proto.h" 67#include "pjdlog.h" 68#include "subr.h" --- 152 unchanged lines hidden (view full) --- 221static void *local_send_thread(void *arg); 222static void *remote_send_thread(void *arg); 223static void *remote_recv_thread(void *arg); 224static void *ggate_send_thread(void *arg); 225static void *sync_thread(void *arg); 226static void *guard_thread(void *arg); 227 228static void | 62#include "hast.h" 63#include "hast_proto.h" 64#include "hastd.h" 65#include "hooks.h" 66#include "metadata.h" 67#include "proto.h" 68#include "pjdlog.h" 69#include "subr.h" --- 152 unchanged lines hidden (view full) --- 222static void *local_send_thread(void *arg); 223static void *remote_send_thread(void *arg); 224static void *remote_recv_thread(void *arg); 225static void *ggate_send_thread(void *arg); 226static void *sync_thread(void *arg); 227static void *guard_thread(void *arg); 228 229static void |
229dummy_sighandler(int sig __unused) 230{ 231 /* Nothing to do. */ 232} 233 234static void | |
235cleanup(struct hast_resource *res) 236{ 237 int rerrno; 238 239 /* Remember errno. */ 240 rerrno = errno; 241 242 /* --- 181 unchanged lines hidden (view full) --- 424 hio->hio_ggio.gctl_length = MAXPHYS; 425 hio->hio_ggio.gctl_error = 0; 426 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 427 } 428 429 /* 430 * Turn on signals handling. 431 */ | 230cleanup(struct hast_resource *res) 231{ 232 int rerrno; 233 234 /* Remember errno. */ 235 rerrno = errno; 236 237 /* --- 181 unchanged lines hidden (view full) --- 419 hio->hio_ggio.gctl_length = MAXPHYS; 420 hio->hio_ggio.gctl_error = 0; 421 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 422 } 423 424 /* 425 * Turn on signals handling. 426 */ |
432 /* Because SIGCHLD is ignored by default, setup dummy handler for it. */ 433 PJDLOG_VERIFY(signal(SIGCHLD, dummy_sighandler) != SIG_ERR); | |
434 PJDLOG_VERIFY(sigfillset(&mask) == 0); 435 PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0); 436} 437 438static void 439init_local(struct hast_resource *res) 440{ 441 unsigned char *buf; --- 225 unchanged lines hidden (view full) --- 667 pjdlog_info("Connected to %s.", res->hr_remoteaddr); 668 if (inp != NULL && outp != NULL) { 669 *inp = in; 670 *outp = out; 671 } else { 672 res->hr_remotein = in; 673 res->hr_remoteout = out; 674 } | 427 PJDLOG_VERIFY(sigfillset(&mask) == 0); 428 PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0); 429} 430 431static void 432init_local(struct hast_resource *res) 433{ 434 unsigned char *buf; --- 225 unchanged lines hidden (view full) --- 660 pjdlog_info("Connected to %s.", res->hr_remoteaddr); 661 if (inp != NULL && outp != NULL) { 662 *inp = in; 663 *outp = out; 664 } else { 665 res->hr_remotein = in; 666 res->hr_remoteout = out; 667 } |
675 hook_exec(res->hr_exec, "connect", res->hr_name, NULL); | 668 event_send(res, EVENT_CONNECT); |
676 return (true); 677close: 678 if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) | 669 return (true); 670close: 671 if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) |
679 hook_exec(res->hr_exec, "split-brain", res->hr_name, NULL); | 672 event_send(res, EVENT_SPLITBRAIN); |
680 proto_close(out); 681 if (in != NULL) 682 proto_close(in); 683 return (false); 684} 685 686static void 687sync_start(void) --- 81 unchanged lines hidden (view full) --- 769 /* 770 * Create communication channel between parent and child. 771 */ 772 if (proto_client("socketpair://", &res->hr_ctrl) < 0) { 773 KEEP_ERRNO((void)pidfile_remove(pfh)); 774 pjdlog_exit(EX_OSERR, 775 "Unable to create control sockets between parent and child"); 776 } | 673 proto_close(out); 674 if (in != NULL) 675 proto_close(in); 676 return (false); 677} 678 679static void 680sync_start(void) --- 81 unchanged lines hidden (view full) --- 762 /* 763 * Create communication channel between parent and child. 764 */ 765 if (proto_client("socketpair://", &res->hr_ctrl) < 0) { 766 KEEP_ERRNO((void)pidfile_remove(pfh)); 767 pjdlog_exit(EX_OSERR, 768 "Unable to create control sockets between parent and child"); 769 } |
770 /* 771 * Create communication channel between child and parent. 772 */ 773 if (proto_client("socketpair://", &res->hr_event) < 0) { 774 KEEP_ERRNO((void)pidfile_remove(pfh)); 775 pjdlog_exit(EX_OSERR, 776 "Unable to create event sockets between child and parent"); 777 } |
|
777 778 pid = fork(); 779 if (pid < 0) { 780 KEEP_ERRNO((void)pidfile_remove(pfh)); 781 pjdlog_exit(EX_TEMPFAIL, "Unable to fork"); 782 } 783 784 if (pid > 0) { 785 /* This is parent. */ | 778 779 pid = fork(); 780 if (pid < 0) { 781 KEEP_ERRNO((void)pidfile_remove(pfh)); 782 pjdlog_exit(EX_TEMPFAIL, "Unable to fork"); 783 } 784 785 if (pid > 0) { 786 /* This is parent. */ |
787 /* Declare that we are receiver. */ 788 proto_recv(res->hr_event, NULL, 0); |
|
786 res->hr_workerpid = pid; 787 return; 788 } 789 790 gres = res; 791 792 (void)pidfile_close(pfh); 793 hook_fini(); 794 795 setproctitle("%s (primary)", res->hr_name); 796 797 signal(SIGHUP, SIG_DFL); 798 signal(SIGCHLD, SIG_DFL); 799 | 789 res->hr_workerpid = pid; 790 return; 791 } 792 793 gres = res; 794 795 (void)pidfile_close(pfh); 796 hook_fini(); 797 798 setproctitle("%s (primary)", res->hr_name); 799 800 signal(SIGHUP, SIG_DFL); 801 signal(SIGCHLD, SIG_DFL); 802 |
800 hook_init(); | 803 /* Declare that we are sender. */ 804 proto_send(res->hr_event, NULL, 0); 805 |
801 init_local(res); 802 if (real_remote(res) && init_remote(res, NULL, NULL)) 803 sync_start(); 804 init_ggate(res); 805 init_environment(res); 806 error = pthread_create(&td, NULL, ggate_recv_thread, res); 807 assert(error == 0); 808 error = pthread_create(&td, NULL, local_send_thread, res); --- 82 unchanged lines hidden (view full) --- 891 892 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 893 894 /* 895 * Stop synchronization if in-progress. 896 */ 897 sync_stop(); 898 | 806 init_local(res); 807 if (real_remote(res) && init_remote(res, NULL, NULL)) 808 sync_start(); 809 init_ggate(res); 810 init_environment(res); 811 error = pthread_create(&td, NULL, ggate_recv_thread, res); 812 assert(error == 0); 813 error = pthread_create(&td, NULL, local_send_thread, res); --- 82 unchanged lines hidden (view full) --- 896 897 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 898 899 /* 900 * Stop synchronization if in-progress. 901 */ 902 sync_stop(); 903 |
899 hook_exec(res->hr_exec, "disconnect", res->hr_name, NULL); | 904 event_send(res, EVENT_DISCONNECT); |
900} 901 902/* 903 * Thread receives ggate I/O requests from the kernel and passes them to 904 * appropriate threads: 905 * WRITE - always goes to both local_send and remote_send threads 906 * READ (when the block is up-to-date on local component) - 907 * only local_send thread --- 599 unchanged lines hidden (view full) --- 1507 offset = -1; 1508 1509 for (;;) { 1510 mtx_lock(&sync_lock); 1511 if (offset >= 0 && !sync_inprogress) { 1512 pjdlog_info("Synchronization interrupted. " 1513 "%jd bytes synchronized so far.", 1514 (intmax_t)synced); | 905} 906 907/* 908 * Thread receives ggate I/O requests from the kernel and passes them to 909 * appropriate threads: 910 * WRITE - always goes to both local_send and remote_send threads 911 * READ (when the block is up-to-date on local component) - 912 * only local_send thread --- 599 unchanged lines hidden (view full) --- 1512 offset = -1; 1513 1514 for (;;) { 1515 mtx_lock(&sync_lock); 1516 if (offset >= 0 && !sync_inprogress) { 1517 pjdlog_info("Synchronization interrupted. " 1518 "%jd bytes synchronized so far.", 1519 (intmax_t)synced); |
1515 hook_exec(res->hr_exec, "syncintr", res->hr_name, NULL); | 1520 event_send(res, EVENT_SYNCINTR); |
1516 } 1517 while (!sync_inprogress) { 1518 dorewind = true; 1519 synced = 0; 1520 cv_wait(&sync_cond, &sync_lock); 1521 } 1522 mtx_unlock(&sync_lock); 1523 /* --- 16 unchanged lines hidden (view full) --- 1540 if (dorewind) { 1541 dorewind = false; 1542 if (offset < 0) 1543 pjdlog_info("Nodes are in sync."); 1544 else { 1545 pjdlog_info("Synchronization started. %ju bytes to go.", 1546 (uintmax_t)(res->hr_extentsize * 1547 activemap_ndirty(res->hr_amp))); | 1521 } 1522 while (!sync_inprogress) { 1523 dorewind = true; 1524 synced = 0; 1525 cv_wait(&sync_cond, &sync_lock); 1526 } 1527 mtx_unlock(&sync_lock); 1528 /* --- 16 unchanged lines hidden (view full) --- 1545 if (dorewind) { 1546 dorewind = false; 1547 if (offset < 0) 1548 pjdlog_info("Nodes are in sync."); 1549 else { 1550 pjdlog_info("Synchronization started. %ju bytes to go.", 1551 (uintmax_t)(res->hr_extentsize * 1552 activemap_ndirty(res->hr_amp))); |
1548 hook_exec(res->hr_exec, "syncstart", 1549 res->hr_name, NULL); | 1553 event_send(res, EVENT_SYNCSTART); |
1550 } 1551 } 1552 if (offset < 0) { 1553 sync_stop(); 1554 pjdlog_debug(1, "Nothing to synchronize."); 1555 /* 1556 * Synchronization complete, make both localcnt and 1557 * remotecnt equal. 1558 */ 1559 ncomp = 1; 1560 rw_rlock(&hio_remote_lock[ncomp]); 1561 if (ISCONNECTED(res, ncomp)) { 1562 if (synced > 0) { 1563 pjdlog_info("Synchronization complete. " 1564 "%jd bytes synchronized.", 1565 (intmax_t)synced); | 1554 } 1555 } 1556 if (offset < 0) { 1557 sync_stop(); 1558 pjdlog_debug(1, "Nothing to synchronize."); 1559 /* 1560 * Synchronization complete, make both localcnt and 1561 * remotecnt equal. 1562 */ 1563 ncomp = 1; 1564 rw_rlock(&hio_remote_lock[ncomp]); 1565 if (ISCONNECTED(res, ncomp)) { 1566 if (synced > 0) { 1567 pjdlog_info("Synchronization complete. " 1568 "%jd bytes synchronized.", 1569 (intmax_t)synced); |
1566 hook_exec(res->hr_exec, "syncdone", 1567 res->hr_name, NULL); | 1570 event_send(res, EVENT_SYNCDONE); |
1568 } 1569 mtx_lock(&metadata_lock); 1570 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1571 res->hr_primary_localcnt = 1572 res->hr_secondary_localcnt; 1573 res->hr_primary_remotecnt = 1574 res->hr_secondary_remotecnt; 1575 pjdlog_debug(1, --- 369 unchanged lines hidden (view full) --- 1945 1946 ncomps = HAST_NCOMPONENTS; 1947 lastcheck = time(NULL); 1948 1949 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 1950 PJDLOG_VERIFY(sigaddset(&mask, SIGHUP) == 0); 1951 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 1952 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); | 1571 } 1572 mtx_lock(&metadata_lock); 1573 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1574 res->hr_primary_localcnt = 1575 res->hr_secondary_localcnt; 1576 res->hr_primary_remotecnt = 1577 res->hr_secondary_remotecnt; 1578 pjdlog_debug(1, --- 369 unchanged lines hidden (view full) --- 1948 1949 ncomps = HAST_NCOMPONENTS; 1950 lastcheck = time(NULL); 1951 1952 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 1953 PJDLOG_VERIFY(sigaddset(&mask, SIGHUP) == 0); 1954 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 1955 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); |
1953 PJDLOG_VERIFY(sigaddset(&mask, SIGCHLD) == 0); | |
1954 1955 timeout.tv_nsec = 0; 1956 signo = -1; 1957 1958 for (;;) { 1959 switch (signo) { 1960 case SIGHUP: 1961 config_reload(); 1962 break; 1963 case SIGINT: 1964 case SIGTERM: 1965 sigexit_received = true; 1966 primary_exitx(EX_OK, 1967 "Termination signal received, exiting."); 1968 break; 1969 default: 1970 break; 1971 } | 1956 1957 timeout.tv_nsec = 0; 1958 signo = -1; 1959 1960 for (;;) { 1961 switch (signo) { 1962 case SIGHUP: 1963 config_reload(); 1964 break; 1965 case SIGINT: 1966 case SIGTERM: 1967 sigexit_received = true; 1968 primary_exitx(EX_OK, 1969 "Termination signal received, exiting."); 1970 break; 1971 default: 1972 break; 1973 } |
1972 hook_check(signo == SIGCHLD); | |
1973 1974 pjdlog_debug(2, "remote_guard: Checking connections."); 1975 now = time(NULL); 1976 if (lastcheck + RETRY_SLEEP <= now) { 1977 for (ii = 0; ii < ncomps; ii++) 1978 guard_one(res, ii); 1979 lastcheck = now; 1980 } 1981 timeout.tv_sec = RETRY_SLEEP; 1982 signo = sigtimedwait(&mask, NULL, &timeout); 1983 } 1984 /* NOTREACHED */ 1985 return (NULL); 1986} | 1974 1975 pjdlog_debug(2, "remote_guard: Checking connections."); 1976 now = time(NULL); 1977 if (lastcheck + RETRY_SLEEP <= now) { 1978 for (ii = 0; ii < ncomps; ii++) 1979 guard_one(res, ii); 1980 lastcheck = now; 1981 } 1982 timeout.tv_sec = RETRY_SLEEP; 1983 signo = sigtimedwait(&mask, NULL, &timeout); 1984 } 1985 /* NOTREACHED */ 1986 return (NULL); 1987} |