1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2006,2008 Oracle. All rights reserved. 5 * 6 * $Id: repmgr.h,v 12.19 2008/01/08 20:58:18 bostic Exp $ 7 */ 8 9#ifndef _DB_REPMGR_H_ 10#define _DB_REPMGR_H_ 11 12#include "dbinc_auto/repmgr_auto.h" 13 14#if defined(__cplusplus) 15extern "C" { 16#endif 17 18/* 19 * Replication Framework message types. These values are transmitted to 20 * identify messages sent between sites, even sites running differing versions 21 * of software. Therefore, once assigned, the values are permanently "frozen". 22 * New message types added in later versions always get new (higher) values. 23 * 24 * For example, in repmgr wire protocol version 1 the highest assigned message 25 * type value was 3, for REPMGR_REP_MESSAGE. Wire protocol version 2 added the 26 * HEARTBEAT message type (4). 27 * 28 * We still list them in alphabetical order, for ease of reference. But this 29 * generally does not correspond to numerical order. 30 */ 31#define REPMGR_ACK 1 /* Acknowledgement. */ 32#define REPMGR_HANDSHAKE 2 /* Connection establishment sequence. */ 33#define REPMGR_HEARTBEAT 4 /* Monitor connection health. */ 34#define REPMGR_REP_MESSAGE 3 /* Normal replication message. */ 35 36/* Heartbeats were introduced in version 2. */ 37#define REPMGR_MAX_V1_MSG_TYPE 3 38#define REPMGR_MAX_V2_MSG_TYPE 4 39#define HEARTBEAT_MIN_VERSION 2 40 41/* The range of protocol versions we're willing to support. */ 42#define DB_REPMGR_VERSION 2 43#define DB_REPMGR_MIN_VERSION 1 44 45#ifdef DB_WIN32 46typedef SOCKET socket_t; 47typedef HANDLE thread_id_t; 48typedef HANDLE mgr_mutex_t; 49typedef HANDLE cond_var_t; 50typedef WSABUF db_iovec_t; 51#else 52typedef int socket_t; 53typedef pthread_t thread_id_t; 54typedef pthread_mutex_t mgr_mutex_t; 55typedef pthread_cond_t cond_var_t; 56typedef struct iovec db_iovec_t; 57#endif 58 59/* 60 * The (arbitrary) maximum number of outgoing messages we're willing to hold, on 61 * a queue per connection, waiting for TCP buffer space to become available in 62 * the kernel. Rather than exceeding this limit, we simply discard additional 63 * messages (since this is always allowed by the replication protocol). 64 * As a special dispensation, if a message is destined for a specific remote 65 * site (i.e., it's not a broadcast), then we first try blocking the sending 66 * thread, waiting for space to become available (though we only wait a limited 67 * time). This is so as to be able to handle the immediate flood of (a 68 * potentially large number of) outgoing messages that replication generates, in 69 * a tight loop, when handling PAGE_REQ, LOG_REQ and ALL_REQ requests. 70 */ 71#define OUT_QUEUE_LIMIT 10 72 73/* 74 * The system value is available from sysconf(_SC_HOST_NAME_MAX). 75 * Historically, the maximum host name was 256. 76 */ 77#ifndef MAXHOSTNAMELEN 78#define MAXHOSTNAMELEN 256 79#endif 80 81/* A buffer big enough for the string "site host.domain.com:65535". */ 82#define MAX_SITE_LOC_STRING (MAXHOSTNAMELEN+20) 83typedef char SITE_STRING_BUFFER[MAX_SITE_LOC_STRING+1]; 84 85/* Default timeout values, in seconds. */ 86#define DB_REPMGR_DEFAULT_ACK_TIMEOUT (1 * US_PER_SEC) 87#define DB_REPMGR_DEFAULT_CONNECTION_RETRY (30 * US_PER_SEC) 88#define DB_REPMGR_DEFAULT_ELECTION_RETRY (10 * US_PER_SEC) 89 90struct __repmgr_connection; 91 typedef struct __repmgr_connection REPMGR_CONNECTION; 92struct __repmgr_queue; typedef struct __repmgr_queue REPMGR_QUEUE; 93struct __queued_output; typedef struct __queued_output QUEUED_OUTPUT; 94struct __repmgr_retry; typedef struct __repmgr_retry REPMGR_RETRY; 95struct __repmgr_runnable; typedef struct __repmgr_runnable REPMGR_RUNNABLE; 96struct __repmgr_site; typedef struct __repmgr_site REPMGR_SITE; 97struct __ack_waiters_table; 98 typedef struct __ack_waiters_table ACK_WAITERS_TABLE; 99 100typedef TAILQ_HEAD(__repmgr_conn_list, __repmgr_connection) CONNECTION_LIST; 101typedef STAILQ_HEAD(__repmgr_out_q_head, __queued_output) OUT_Q_HEADER; 102typedef TAILQ_HEAD(__repmgr_retry_q, __repmgr_retry) RETRY_Q_HEADER; 103 104/* Information about threads managed by Replication Framework. */ 105struct __repmgr_runnable { 106 ENV *env; 107 thread_id_t thread_id; 108 void *(*run) __P((void *)); 109 int finished; 110}; 111 112/* 113 * Information about pending connection establishment retry operations. 114 * 115 * We keep these in order by time. This works, under the assumption that the 116 * DB_REP_CONNECTION_RETRY never changes once we get going (though that 117 * assumption is of course wrong, so this needs to be fixed). 118 * 119 * Usually, we put things onto the tail end of the list. But when we add a new 120 * site while threads are running, we trigger its first connection attempt by 121 * scheduling a retry for "0" microseconds from now, putting its retry element 122 * at the head of the list instead. 123 * 124 * TODO: I think this can be fixed by defining "time" to be the time the element 125 * was added (with some convention like "0" meaning immediate), rather than the 126 * deadline time. 127 */ 128struct __repmgr_retry { 129 TAILQ_ENTRY(__repmgr_retry) entries; 130 u_int eid; 131 db_timespec time; 132}; 133 134/* 135 * We use scatter/gather I/O for both reading and writing. The largest number 136 * of buffers we ever try to use at once is 5, corresponding to the 5 segments 137 * of a message described in the "wire protocol" (repmgr_net.c). 138 */ 139typedef struct { 140 db_iovec_t vectors[5]; 141 142 /* 143 * Index of the first iovec to be used. Initially of course this is 144 * zero. But as we progress through partial I/O transfers, it ends up 145 * pointing to the first iovec to be used on the next operation. 146 */ 147 int offset; 148 149 /* 150 * Total number of pieces defined for this message; equal to the number 151 * of times add_buffer and/or add_dbt were called to populate it. We do 152 * *NOT* revise this as we go along. So subsequent I/O operations must 153 * use count-offset to get the number of active vector pieces still 154 * remaining. 155 */ 156 int count; 157 158 /* 159 * Total number of bytes accounted for in all the pieces of this 160 * message. We do *NOT* revise this as we go along (though it's not 161 * clear we shouldn't). 162 */ 163 size_t total_bytes; 164} REPMGR_IOVECS; 165 166typedef struct { 167 size_t length; /* number of bytes in data */ 168 int ref_count; /* # of sites' send queues pointing to us */ 169 u_int8_t data[1]; /* variable size data area */ 170} REPMGR_FLAT; 171 172struct __queued_output { 173 STAILQ_ENTRY(__queued_output) entries; 174 REPMGR_FLAT *msg; 175 size_t offset; 176}; 177 178/* 179 * The following is for input. Once we know the sizes of the pieces of an 180 * incoming message, we can create this struct (and also the data areas for the 181 * pieces themselves, in the same memory allocation). This is also the struct 182 * in which the message lives while it's waiting to be processed by message 183 * threads. 184 */ 185typedef struct __repmgr_message { 186 STAILQ_ENTRY(__repmgr_message) entries; 187 int originating_eid; 188 DBT control, rec; 189} REPMGR_MESSAGE; 190 191typedef enum { 192 SIZES_PHASE, 193 DATA_PHASE 194} phase_t; 195 196/* 197 * If another site initiates a connection to us, when we receive it the 198 * connection state is immediately "connected". But when we initiate the 199 * connection to another site, it first has to go through a "connecting" state, 200 * until the non-blocking connect() I/O operation completes successfully. 201 * With an outgoing connection, we always know the associated site (and so 202 * we have a valid eid). But with an incoming connection, we don't know the 203 * site until we get a handshake message, so until that time the eid is 204 * invalid. 205 */ 206struct __repmgr_connection { 207 TAILQ_ENTRY(__repmgr_connection) entries; 208 209 int eid; /* index into sites array in machtab */ 210 socket_t fd; 211#ifdef DB_WIN32 212 WSAEVENT event_object; 213#endif 214 215 u_int32_t version; /* Wire protocol version on this connection. */ 216 /* (0 means not yet determined.) */ 217 218#define CONN_INCOMING 0x01 /* We received this via accept(). */ 219 u_int32_t flags; 220 221/* 222 * When we initiate an outgoing connection, it starts off in CONNECTING state 223 * (or possibly CONNECTED). When the (non-blocking) connection operation later 224 * completes, we move to CONNECTED state. When we get the response to our 225 * version negotiation, we move to READY. 226 * For incoming connections that we accept, we start in NEGOTIATE, then to 227 * PARAMETERS, and then to READY. 228 * CONGESTED is a hierarchical substate of READY: it's just like READY, with 229 * the additional wrinkle that we don't bother waiting for the outgoing queue to 230 * drain in certain circumstances. 231 */ 232#define CONN_CONGESTED 1 /* Long-lived full outgoing queue. */ 233#define CONN_CONNECTED 2 /* Awaiting reply to our version negotiation. */ 234#define CONN_CONNECTING 3 /* Awaiting completion of non-block connect. */ 235#define CONN_DEFUNCT 4 /* Basically dead, awaiting clean-up. */ 236#define CONN_NEGOTIATE 5 /* Awaiting version proposal. */ 237#define CONN_PARAMETERS 6 /* Awaiting parameters handshake. */ 238#define CONN_READY 7 /* Everything's fine. */ 239 int state; 240 241 /* 242 * Output: usually we just simply write messages right in line, in the 243 * send() function's thread. But if TCP doesn't have enough network 244 * buffer space for us when we first try it, we instead allocate some 245 * memory, and copy the message, and then send it as space becomes 246 * available in our main select() thread. In some cases, if the queue 247 * gets too long we wait until it's drained, and then append to it. 248 * This condition variable's associated mutex is the normal per-repmgr 249 * db_rep->mutex, because that mutex is always held anyway whenever the 250 * output queue is consulted. 251 */ 252 OUT_Q_HEADER outbound_queue; 253 int out_queue_length; 254 cond_var_t drained; 255 int blockers; /* ref count of msg threads waiting on us */ 256 257 /* 258 * Input: while we're reading a message, we keep track of what phase 259 * we're in. In both phases, we use a REPMGR_IOVECS to keep track of 260 * our progress within the phase. Depending upon the message type, we 261 * end up with either a rep_message (which is a wrapper for the control 262 * and rec DBTs), or a single generic DBT. 263 * Any time we're in DATA_PHASE, it means we have already received 264 * the message header (consisting of msg_type and 2 sizes), and 265 * therefore we have allocated buffer space to read the data. (This is 266 * important for resource clean-up.) 267 */ 268 phase_t reading_phase; 269 REPMGR_IOVECS iovecs; 270 271 u_int8_t msg_type; 272 u_int32_t control_size_buf, rec_size_buf; 273 274 union { 275 REPMGR_MESSAGE *rep_message; 276 struct { 277 DBT cntrl, rec; 278 } repmgr_msg; 279 } input; 280}; 281 282#define IS_READY_STATE(s) ((s) == CONN_READY || (s) == CONN_CONGESTED) 283 284#ifdef HAVE_GETADDRINFO 285typedef struct addrinfo ADDRINFO; 286#else 287/* 288 * Some windows platforms have getaddrinfo (Windows XP), some don't. We don't 289 * support conditional compilation in our Windows build, so we always use our 290 * own getaddrinfo implementation. Rename everything so that we don't collide 291 * with the system libraries. 292 */ 293#undef AI_PASSIVE 294#define AI_PASSIVE 0x01 295#undef AI_CANONNAME 296#define AI_CANONNAME 0x02 297#undef AI_NUMERICHOST 298#define AI_NUMERICHOST 0x04 299 300typedef struct __addrinfo { 301 int ai_flags; /* AI_PASSIVE, AI_CANONNAME, AI_NUMERICHOST */ 302 int ai_family; /* PF_xxx */ 303 int ai_socktype; /* SOCK_xxx */ 304 int ai_protocol; /* 0 or IPPROTO_xxx for IPv4 and IPv6 */ 305 size_t ai_addrlen; /* length of ai_addr */ 306 char *ai_canonname; /* canonical name for nodename */ 307 struct sockaddr *ai_addr; /* binary address */ 308 struct __addrinfo *ai_next; /* next structure in linked list */ 309} ADDRINFO; 310#endif /* HAVE_GETADDRINFO */ 311 312typedef struct { 313 char *host; /* Separately allocated copy of string. */ 314 u_int16_t port; /* Stored in plain old host-byte-order. */ 315 ADDRINFO *address_list; 316 ADDRINFO *current; 317} repmgr_netaddr_t; 318 319/* 320 * Each site that we know about is either idle or connected. If it's connected, 321 * we have a reference to a connection object; if it's idle, we have a reference 322 * to a retry object. 323 * We store site objects in a simple array in the machtab, indexed by EID. 324 * (We allocate EID numbers for other sites simply according to their index 325 * within this array; we use the special value INT_MAX to represent our own 326 * EID.) 327 */ 328struct __repmgr_site { 329 repmgr_netaddr_t net_addr; 330 DB_LSN max_ack; /* Best ack we've heard from this site. */ 331 u_int32_t priority; 332 db_timespec last_rcvd_timestamp; 333 334#define SITE_IDLE 1 /* Waiting til time to retry connecting. */ 335#define SITE_CONNECTED 2 336 int state; 337 338#define SITE_HAS_PRIO 0x01 /* Set if priority field has valid value. */ 339 u_int32_t flags; 340 341 union { 342 REPMGR_CONNECTION *conn; /* when CONNECTED */ 343 REPMGR_RETRY *retry; /* when IDLE */ 344 } ref; 345}; 346 347/* 348 * Repmgr message formats. We pass these in the "control" portion of a message. 349 * For an ack, we just let the "rec" part go unused. But for a handshake, the 350 * "rec" part contains the variable-length host name (including terminating NUL 351 * character). 352 */ 353 354/* 355 * The hand-shake message is exchanged upon establishment of a connection. The 356 * message protocol version number here refers to the connection as a whole. In 357 * other words, it's an assertion that every message sent or received on this 358 * connection shall be of the specified version. Since repmgr uses TCP, a 359 * reliable stream-oriented protocol, this assertion is meaningful. 360 */ 361typedef struct { 362 u_int32_t version; 363 u_int16_t port; 364 u_int32_t priority; 365} DB_REPMGR_V1_HANDSHAKE; 366 367/* 368 * We store site structs in a dynamically allocated, growable array, indexed by 369 * EID. We allocate EID numbers for remote sites simply according to their 370 * index within this array. We don't need (the same kind of) info for ourself 371 * (the local site), so we use an EID value that won't conflict with any valid 372 * array index. 373 */ 374#define SITE_FROM_EID(eid) (&db_rep->sites[eid]) 375#define EID_FROM_SITE(s) ((int)((s) - (&db_rep->sites[0]))) 376#define IS_VALID_EID(e) ((e) >= 0) 377#define SELF_EID INT_MAX 378 379#define IS_PEER_POLICY(p) ((p) == DB_REPMGR_ACKS_ALL_PEERS || \ 380 (p) == DB_REPMGR_ACKS_QUORUM || \ 381 (p) == DB_REPMGR_ACKS_ONE_PEER) 382 383#define LOCK_MUTEX(m) do { \ 384 int __ret; \ 385 if ((__ret = __repmgr_lock_mutex(&(m))) != 0) \ 386 return (__ret); \ 387} while (0) 388 389#define UNLOCK_MUTEX(m) do { \ 390 int __ret; \ 391 if ((__ret = __repmgr_unlock_mutex(&(m))) != 0) \ 392 return (__ret); \ 393} while (0) 394 395/* POSIX/Win32 socket (and other) portability. */ 396#ifdef DB_WIN32 397#define WOULDBLOCK WSAEWOULDBLOCK 398#define INPROGRESS WSAEWOULDBLOCK 399 400#define net_errno WSAGetLastError() 401typedef int socklen_t; 402typedef char * sockopt_t; 403 404#define iov_len len 405#define iov_base buf 406 407typedef DWORD threadsync_timeout_t; 408 409#define REPMGR_SYNC_INITED(db_rep) (db_rep->waiters != NULL) 410#else 411 412#define INVALID_SOCKET -1 413#define SOCKET_ERROR -1 414#define WOULDBLOCK EWOULDBLOCK 415#define INPROGRESS EINPROGRESS 416 417#define net_errno errno 418typedef void * sockopt_t; 419 420#define closesocket(fd) close(fd) 421 422typedef struct timespec threadsync_timeout_t; 423 424#define REPMGR_SYNC_INITED(db_rep) (db_rep->read_pipe >= 0) 425#endif 426 427/* Macros to proceed, as with a cursor, through the address_list: */ 428#define ADDR_LIST_CURRENT(na) ((na)->current) 429#define ADDR_LIST_FIRST(na) ((na)->current = (na)->address_list) 430#define ADDR_LIST_NEXT(na) ((na)->current = (na)->current->ai_next) 431 432/* 433 * Various threads write onto TCP/IP sockets, and an I/O error could occur at 434 * any time. However, only the dedicated "select()" thread may close the socket 435 * file descriptor, because under POSIX we have to drop our mutex and then call 436 * select() as two distinct (non-atomic) operations. 437 * 438 * To simplify matters, there is a single place in the select thread where we 439 * close and clean up after any defunct connection. Even if the I/O error 440 * happens in the select thread we follow this convention. 441 * 442 * When an error occurs, we disable the connection (mark it defunct so that no 443 * one else will try to use it, and so that the select thread will find it and 444 * clean it up), and then usually take some additional recovery action: schedule 445 * a connection retry for later, and possibly call for an election if it was a 446 * connection to the master. (This happens in the function 447 * __repmgr_bust_connection.) But sometimes we don't want to do the recovery 448 * part; just the disabling part. 449 */ 450#define DISABLE_CONNECTION(conn) do { \ 451 (conn)->state = CONN_DEFUNCT; \ 452 (conn)->eid = -1; \ 453} while (0) 454 455#include "dbinc_auto/repmgr_ext.h" 456 457#if defined(__cplusplus) 458} 459#endif 460#endif /* !_DB_REPMGR_H_ */ 461