1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2001,2008 Oracle. All rights reserved. 5 * 6 * $Id: rep_net.c,v 12.20 2008/02/27 22:04:15 alanb Exp $ 7 */ 8 9#include <sys/types.h> 10#include <errno.h> 11#include <stdio.h> 12#include <stdlib.h> 13#include <string.h> 14 15#include <db.h> 16#include "rep_base.h" 17#ifndef _SYS_QUEUE_H 18/* 19 * Some *BSD Unix variants include the Queue macros in their libraries and 20 * these might already have been included. In that case, it would be bad 21 * to include them again. 22 */ 23#include <dbinc/queue.h> /* !!!: for the LIST_XXX macros. */ 24#endif 25 26int machtab_add __P((machtab_t *, socket_t, u_int32_t, int, int *)); 27#ifdef DIAGNOSTIC 28void machtab_print __P((machtab_t *)); 29#endif 30ssize_t readn __P((socket_t, void *, size_t)); 31 32/* 33 * This file defines the communication infrastructure for the ex_repquote 34 * sample application. 35 * 36 * This application uses TCP/IP for its communication. In an N-site 37 * replication group, this means that there are N * N communication 38 * channels so that every site can communicate with every other site 39 * (this allows elections to be held when the master fails). We do 40 * not require that anyone know about all sites when the application 41 * starts up. In order to communicate, the application should know 42 * about someone, else it has no idea how to ever get in the game. 43 * 44 * Communication is handled via a number of different threads. These 45 * thread functions are implemented in rep_util.c In this file, we 46 * define the data structures that maintain the state that describes 47 * the comm infrastructure, the functions that manipulates this state 48 * and the routines used to actually send and receive data over the 49 * sockets. 50 */ 51 52/* 53 * The communication infrastructure is represented by a machine table, 54 * machtab_t, which is essentially a mutex-protected linked list of members 55 * of the group. The machtab also contains the parameters that are needed 56 * to call for an election. We hardwire values for these parameters in the 57 * init function, but these could be set via some configuration setup in a 58 * real application. We reserve the machine-id 1 to refer to ourselves and 59 * make the machine-id 0 be invalid. 60 */ 61 62#define MACHID_INVALID 0 63#define MACHID_SELF 1 64 65struct __machtab { 66 LIST_HEAD(__machlist, __member) machlist; 67 int nextid; 68 mutex_t mtmutex; 69 u_int32_t timeout_time; 70 int current; 71 int max; 72 int nsites; 73}; 74 75/* Data structure that describes each entry in the machtab. */ 76struct __member { 77 u_int32_t hostaddr; /* Host IP address. */ 78 int port; /* Port number. */ 79 int eid; /* Application-specific machine id. */ 80 socket_t fd; /* File descriptor for the socket. */ 81 LIST_ENTRY(__member) links; 82 /* For linked list of all members we know of. */ 83}; 84 85static int quote_send_broadcast __P((machtab_t *, 86 const DBT *, const DBT *, u_int32_t)); 87static int quote_send_one __P((const DBT *, const DBT *, socket_t, u_int32_t)); 88 89/* 90 * machtab_init -- 91 * Initialize the machine ID table. 92 * XXX Right now we treat the number of sites as the maximum 93 * number we've ever had on the list at one time. We probably 94 * want to make that smarter. 95 */ 96int 97machtab_init(machtabp, nsites) 98 machtab_t **machtabp; 99 int nsites; 100{ 101 int ret; 102 machtab_t *machtab; 103 104 if ((machtab = malloc(sizeof(machtab_t))) == NULL) { 105 fprintf(stderr, "can't allocate memory\n"); 106 return (ENOMEM); 107 } 108 109 LIST_INIT(&machtab->machlist); 110 111 /* Reserve eid's 0 and 1. */ 112 machtab->nextid = 2; 113 machtab->timeout_time = 2 * 1000000; /* 2 seconds. */ 114 machtab->current = machtab->max = 0; 115 machtab->nsites = nsites; 116 117 ret = mutex_init(&machtab->mtmutex, NULL); 118 *machtabp = machtab; 119 120 return (ret); 121} 122 123/* 124 * machtab_add -- 125 * Add a file descriptor to the table of machines, returning 126 * a new machine ID. 127 */ 128int 129machtab_add(machtab, fd, hostaddr, port, idp) 130 machtab_t *machtab; 131 socket_t fd; 132 u_int32_t hostaddr; 133 int port, *idp; 134{ 135 int ret; 136 member_t *m, *member; 137 138 ret = 0; 139 if ((member = malloc(sizeof(member_t))) == NULL) { 140 fprintf(stderr, "can't allocate memory\n"); 141 return (ENOMEM); 142 } 143 144 member->fd = fd; 145 member->hostaddr = hostaddr; 146 member->port = port; 147 148 if ((ret = mutex_lock(&machtab->mtmutex)) != 0) { 149 fprintf(stderr, "can't lock mutex"); 150 return (ret); 151 } 152 153 for (m = LIST_FIRST(&machtab->machlist); 154 m != NULL; m = LIST_NEXT(m, links)) 155 if (m->hostaddr == hostaddr && m->port == port) 156 break; 157 158 if (m == NULL) { 159 member->eid = machtab->nextid++; 160 LIST_INSERT_HEAD(&machtab->machlist, member, links); 161 } else 162 member->eid = m->eid; 163 164 if ((ret = mutex_unlock(&machtab->mtmutex)) != 0) { 165 fprintf(stderr, "can't unlock mutex\n"); 166 return (ret); 167 } 168 169 if (idp != NULL) 170 *idp = member->eid; 171 172 if (m == NULL) { 173 if (++machtab->current > machtab->max) 174 machtab->max = machtab->current; 175 } else { 176 free(member); 177 ret = EEXIST; 178 } 179#ifdef DIAGNOSTIC 180 printf("Exiting machtab_add\n"); 181 machtab_print(machtab); 182#endif 183 return (ret); 184} 185 186/* 187 * machtab_getinfo -- 188 * Return host and port information for a particular machine id. 189 */ 190int 191machtab_getinfo(machtab, eid, hostp, portp) 192 machtab_t *machtab; 193 int eid; 194 u_int32_t *hostp; 195 int *portp; 196{ 197 int ret; 198 member_t *member; 199 200 if ((ret = mutex_lock(&machtab->mtmutex)) != 0) { 201 fprintf(stderr, "can't lock mutex\n"); 202 return (ret); 203 } 204 205 for (member = LIST_FIRST(&machtab->machlist); 206 member != NULL; 207 member = LIST_NEXT(member, links)) 208 if (member->eid == eid) { 209 *hostp = member->hostaddr; 210 *portp = member->port; 211 break; 212 } 213 214 if ((ret = mutex_unlock(&machtab->mtmutex)) != 0) { 215 fprintf(stderr, "can't unlock mutex\n"); 216 return (ret); 217 } 218 219 return (member != NULL ? 0 : EINVAL); 220} 221 222/* 223 * machtab_rem -- 224 * Remove a mapping from the table of machines. Lock indicates 225 * whether we need to lock the machtab or not (0 indicates we do not 226 * need to lock; non-zero indicates that we do need to lock). 227 */ 228int 229machtab_rem(machtab, eid, lock) 230 machtab_t *machtab; 231 int eid; 232 int lock; 233{ 234 int found, ret; 235 member_t *member; 236 237 ret = 0; 238 if (lock && (ret = mutex_lock(&machtab->mtmutex)) != 0) { 239 fprintf(stderr, "can't lock mutex\n"); 240 return (ret); 241 } 242 243 for (found = 0, member = LIST_FIRST(&machtab->machlist); 244 member != NULL; 245 member = LIST_NEXT(member, links)) 246 if (member->eid == eid) { 247 found = 1; 248 LIST_REMOVE(member, links); 249 (void)closesocket(member->fd); 250 free(member); 251 machtab->current--; 252 break; 253 } 254 255 if (LIST_FIRST(&machtab->machlist) == NULL) 256 machtab->nextid = 2; 257 258 if (lock && (ret = mutex_unlock(&machtab->mtmutex)) != 0) 259 fprintf(stderr, "can't unlock mutex\n"); 260 261#ifdef DIAGNOSTIC 262 printf("Exiting machtab_rem\n"); 263 machtab_print(machtab); 264#endif 265 return (ret); 266} 267 268void 269machtab_parm(machtab, nump, timeoutp) 270 machtab_t *machtab; 271 int *nump; 272 u_int32_t *timeoutp; 273{ 274 if (machtab->nsites == 0) 275 *nump = machtab->max; 276 else 277 *nump = machtab->nsites; 278 *timeoutp = machtab->timeout_time; 279} 280 281#ifdef DIAGNOSTIC 282void 283machtab_print(machtab) 284 machtab_t *machtab; 285{ 286 member_t *m; 287 288 if (mutex_lock(&machtab->mtmutex) != 0) { 289 fprintf(stderr, "can't lock mutex\n"); 290 abort(); 291 } 292 293 for (m = LIST_FIRST(&machtab->machlist); 294 m != NULL; m = LIST_NEXT(m, links)) { 295 296 printf("IP: %lx Port: %6d EID: %2d FD: %3d\n", 297 (long)m->hostaddr, m->port, m->eid, m->fd); 298 } 299 300 if (mutex_unlock(&machtab->mtmutex) != 0) { 301 fprintf(stderr, "can't unlock mutex\n"); 302 abort(); 303 } 304} 305#endif 306/* 307 * listen_socket_init -- 308 * Initialize a socket for listening on the specified port. Returns 309 * a file descriptor for the socket, ready for an accept() call 310 * in a thread that we're happy to let block. 311 */ 312socket_t 313listen_socket_init(progname, port) 314 const char *progname; 315 int port; 316{ 317 socket_t s; 318 int sockopt; 319 struct sockaddr_in si; 320 321 COMPQUIET(progname, NULL); 322 323 if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { 324 perror("can't create listen socket"); 325 return (-1); 326 } 327 328 memset(&si, 0, sizeof(si)); 329 si.sin_family = AF_INET; 330 si.sin_addr.s_addr = htonl(INADDR_ANY); 331 si.sin_port = htons((unsigned short)port); 332 333 /* 334 * When using this example for testing, it's common to kill and restart 335 * regularly. On some systems, this causes bind to fail with "address 336 * in use" errors unless this option is set. 337 */ 338 sockopt = 1; 339 setsockopt(s, SOL_SOCKET, SO_REUSEADDR, 340 (const char *)&sockopt, sizeof(sockopt)); 341 342 if (bind(s, (struct sockaddr *)&si, sizeof(si)) != 0) { 343 perror("can't bind listen socket"); 344 goto err; 345 } 346 347 if (listen(s, 5) != 0) { 348 perror("can't establish listen queue"); 349 goto err; 350 } 351 352 return (s); 353 354err: closesocket(s); 355 return (-1); 356} 357 358/* 359 * listen_socket_accept -- 360 * Accept a connection on a socket. This is essentially just a wrapper 361 * for accept(3). 362 */ 363socket_t 364listen_socket_accept(machtab, progname, s, eidp) 365 machtab_t *machtab; 366 const char *progname; 367 socket_t s; 368 int *eidp; 369{ 370 struct sockaddr_in si; 371 socklen_t si_len; 372 int host, ret; 373 socket_t ns; 374 u_int16_t port; 375 376 COMPQUIET(progname, NULL); 377 378accept_wait: 379 memset(&si, 0, sizeof(si)); 380 si_len = sizeof(si); 381 ns = accept(s, (struct sockaddr *)&si, &si_len); 382 if (ns == SOCKET_CREATION_FAILURE) { 383 fprintf(stderr, "can't accept incoming connection\n"); 384 return ns; 385 } 386 host = ntohl(si.sin_addr.s_addr); 387 388 /* 389 * Sites send their listening port when connections are first 390 * established, as it will be different from the outgoing port 391 * for this connection. 392 */ 393 if (readn(ns, &port, 2) != 2) 394 goto err; 395 port = ntohs(port); 396 397 ret = machtab_add(machtab, ns, host, port, eidp); 398 if (ret == EEXIST) { 399 closesocket(ns); 400 goto accept_wait; 401 } else if (ret != 0) 402 goto err; 403 printf("Connected to host %x port %d, eid = %d\n", host, port, *eidp); 404 return (ns); 405 406err: closesocket(ns); 407 return SOCKET_CREATION_FAILURE; 408} 409 410/* 411 * get_connected_socket -- 412 * Connect to the specified port of the specified remote machine, 413 * and return a file descriptor when we have accepted a connection on it. 414 * Add this connection to the machtab. If we already have a connection 415 * open to this machine, then don't create another one, return the eid 416 * of the connection (in *eidp) and set is_open to 1. Return 0. 417 */ 418socket_t 419get_connected_socket(machtab, progname, remotehost, port, is_open, eidp) 420 machtab_t *machtab; 421 const char *progname, *remotehost; 422 int port, *is_open, *eidp; 423{ 424 int ret; 425 socket_t s; 426 struct hostent *hp; 427 struct sockaddr_in si; 428 u_int32_t addr; 429 u_int16_t nport; 430 431 *is_open = 0; 432 433 if ((hp = gethostbyname(remotehost)) == NULL) { 434 fprintf(stderr, "%s: host not found: %s\n", progname, 435 strerror(net_errno)); 436 return (-1); 437 } 438 439 if ((s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { 440 perror("can't create outgoing socket"); 441 return (-1); 442 } 443 memset(&si, 0, sizeof(si)); 444 memcpy((char *)&si.sin_addr, hp->h_addr, hp->h_length); 445 addr = ntohl(si.sin_addr.s_addr); 446 ret = machtab_add(machtab, s, addr, port, eidp); 447 if (ret == EEXIST) { 448 *is_open = 1; 449 closesocket(s); 450 return (0); 451 } else if (ret != 0) { 452 closesocket(s); 453 return (-1); 454 } 455 456 si.sin_family = AF_INET; 457 si.sin_port = htons((unsigned short)port); 458 if (connect(s, (struct sockaddr *)&si, sizeof(si)) < 0) { 459 fprintf(stderr, "%s: connection failed: %s\n", 460 progname, strerror(net_errno)); 461 (void)machtab_rem(machtab, *eidp, 1); 462 return (-1); 463 } 464 465 /* 466 * The first thing we send on the socket is our (listening) port 467 * so the site we are connecting to can register us correctly in 468 * its machtab. 469 */ 470 nport = htons(myport); 471 writesocket(s, &nport, 2); 472 473 return (s); 474} 475 476/* 477 * get_next_message -- 478 * Read a single message from the specified file descriptor, and 479 * return it in the format used by rep functions (two DBTs and a type). 480 * 481 * This function is called in a loop by both clients and masters, and 482 * the resulting DBTs are manually dispatched to DB_ENV->rep_process_message(). 483 */ 484int 485get_next_message(fd, rec, control) 486 socket_t fd; 487 DBT *rec, *control; 488{ 489 size_t nr; 490 u_int32_t rsize, csize; 491 u_int8_t *recbuf, *controlbuf; 492 493 /* 494 * The protocol we use on the wire is dead simple: 495 * 496 * 4 bytes - rec->size 497 * (# read above) - rec->data 498 * 4 bytes - control->size 499 * (# read above) - control->data 500 */ 501 502 /* Read rec->size. */ 503 nr = readn(fd, &rsize, 4); 504 if (nr != 4) 505 return (1); 506 507 /* Read the record itself. */ 508 if (rsize > 0) { 509 if (rec->size < rsize) 510 rec->data = realloc(rec->data, rsize); 511 recbuf = rec->data; 512 nr = readn(fd, recbuf, rsize); 513 } else { 514 if (rec->data != NULL) 515 free(rec->data); 516 rec->data = NULL; 517 } 518 rec->size = rsize; 519 520 /* Read control->size. */ 521 nr = readn(fd, &csize, 4); 522 if (nr != 4) 523 return (1); 524 525 /* Read the control struct itself. */ 526 if (csize > 0) { 527 controlbuf = control->data; 528 if (control->size < csize) 529 controlbuf = realloc(controlbuf, csize); 530 nr = readn(fd, controlbuf, csize); 531 if (nr != csize) 532 return (1); 533 } else { 534 if (control->data != NULL) 535 free(control->data); 536 controlbuf = NULL; 537 } 538 control->data = controlbuf; 539 control->size = csize; 540 541 return (0); 542} 543 544/* 545 * readn -- 546 * Read a full n characters from a file descriptor, unless we get an error 547 * or EOF. 548 */ 549ssize_t 550readn(fd, vptr, n) 551 socket_t fd; 552 void *vptr; 553 size_t n; 554{ 555 size_t nleft; 556 ssize_t nread; 557 char *ptr; 558 559 ptr = vptr; 560 nleft = n; 561 while (nleft > 0) { 562 if ((nread = readsocket(fd, ptr, nleft)) < 0) { 563 /* 564 * Call read() again on interrupted system call; 565 * on other errors, bail. 566 */ 567 if (net_errno == EINTR) 568 nread = 0; 569 else { 570 perror("can't read from socket"); 571 return (-1); 572 } 573 } else if (nread == 0) 574 break; /* EOF */ 575 576 nleft -= nread; 577 ptr += nread; 578 } 579 580 return (n - nleft); 581} 582 583/* 584 * quote_send -- 585 * The f_send function for DB_ENV->set_rep_transport. 586 */ 587int 588quote_send(dbenv, control, rec, lsnp, eid, flags) 589 DB_ENV *dbenv; 590 const DBT *control, *rec; 591 const DB_LSN *lsnp; 592 int eid; 593 u_int32_t flags; 594{ 595 int n, ret, t_ret; 596 socket_t fd; 597 machtab_t *machtab; 598 member_t *m; 599 600 COMPQUIET(lsnp, NULL); 601 machtab = 602 (machtab_t *)((APP_DATA*)dbenv->app_private)->comm_infrastructure; 603 604 if (eid == DB_EID_BROADCAST) { 605 /* 606 * Right now, we do not require successful transmission. 607 * I'd like to move this requiring at least one successful 608 * transmission on PERMANENT requests. 609 */ 610 n = quote_send_broadcast(machtab, rec, control, flags); 611 if (n < 0 /*|| (n == 0 && LF_ISSET(DB_REP_PERMANENT))*/) 612 return (DB_REP_UNAVAIL); 613 return (0); 614 } 615 616 if ((ret = mutex_lock(&machtab->mtmutex)) != 0) { 617 dbenv->errx(dbenv, "can't lock mutex"); 618 return (ret); 619 } 620 621 fd = 0; 622 for (m = LIST_FIRST(&machtab->machlist); m != NULL; 623 m = LIST_NEXT(m, links)) { 624 if (m->eid == eid) { 625 fd = m->fd; 626 break; 627 } 628 } 629 630 if (fd == 0) { 631 dbenv->err(dbenv, DB_REP_UNAVAIL, 632 "quote_send: cannot find machine ID %d", eid); 633 return (DB_REP_UNAVAIL); 634 } 635 636 if ((ret = quote_send_one(rec, control, fd, flags)) != 0) 637 fprintf(stderr, "socket write error in send() function\n"); 638 639 if ((t_ret = mutex_unlock(&machtab->mtmutex)) != 0) { 640 dbenv->errx(dbenv, "can't unlock mutex"); 641 if (ret == 0) 642 ret = t_ret; 643 } 644 645 return (ret); 646} 647 648/* 649 * quote_send_broadcast -- 650 * Send a message to everybody. 651 * Returns the number of sites to which this message was successfully 652 * communicated. A -1 indicates a fatal error. 653 */ 654static int 655quote_send_broadcast(machtab, rec, control, flags) 656 machtab_t *machtab; 657 const DBT *rec, *control; 658 u_int32_t flags; 659{ 660 int ret, sent; 661 member_t *m, *next; 662 663 if ((ret = mutex_lock(&machtab->mtmutex)) != 0) { 664 fprintf(stderr, "can't lock mutex\n"); 665 return (ret); 666 } 667 668 sent = 0; 669 for (m = LIST_FIRST(&machtab->machlist); m != NULL; m = next) { 670 next = LIST_NEXT(m, links); 671 if ((ret = quote_send_one(rec, control, m->fd, flags)) != 0) { 672 fprintf(stderr, "socket write error in broadcast\n"); 673 (void)machtab_rem(machtab, m->eid, 0); 674 } else 675 sent++; 676 } 677 678 if (mutex_unlock(&machtab->mtmutex) != 0) { 679 fprintf(stderr, "can't unlock mutex\n"); 680 return (-1); 681 } 682 683 return (sent); 684} 685 686/* 687 * quote_send_one -- 688 * Send a message to a single machine, given that machine's file 689 * descriptor. 690 * 691 * !!! 692 * Note that the machtab mutex should be held through this call. 693 * It doubles as a synchronizer to make sure that two threads don't 694 * intersperse writes that are part of two single messages. 695 */ 696static int 697quote_send_one(rec, control, fd, flags) 698 const DBT *rec, *control; 699 socket_t fd; 700 u_int32_t flags; 701 702{ 703 int retry; 704 ssize_t bytes_left, nw; 705 u_int8_t *wp; 706 707 COMPQUIET(flags, 0); 708 709 /* 710 * The protocol is simply: write rec->size, write rec->data, 711 * write control->size, write control->data. 712 */ 713 nw = writesocket(fd, (const char *)&rec->size, 4); 714 if (nw != 4) 715 return (DB_REP_UNAVAIL); 716 717 if (rec->size > 0) { 718 nw = writesocket(fd, rec->data, rec->size); 719 if (nw < 0) 720 return (DB_REP_UNAVAIL); 721 if (nw != (ssize_t)rec->size) { 722 /* Try a couple of times to finish the write. */ 723 wp = (u_int8_t *)rec->data + nw; 724 bytes_left = rec->size - nw; 725 for (retry = 0; bytes_left > 0 && retry < 3; retry++) { 726 nw = writesocket(fd, wp, bytes_left); 727 if (nw < 0) 728 return (DB_REP_UNAVAIL); 729 bytes_left -= nw; 730 wp += nw; 731 } 732 if (bytes_left > 0) 733 return (DB_REP_UNAVAIL); 734 } 735 } 736 737 nw = writesocket(fd, (const char *)&control->size, 4); 738 if (nw != 4) 739 return (DB_REP_UNAVAIL); 740 if (control->size > 0) { 741 nw = writesocket(fd, control->data, control->size); 742 if (nw != (ssize_t)control->size) 743 return (DB_REP_UNAVAIL); 744 } 745 return (0); 746} 747