1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2006,2008 Oracle.  All rights reserved.
5 *
6 * $Id: repmgr.h,v 12.19 2008/01/08 20:58:18 bostic Exp $
7 */
8
9#ifndef _DB_REPMGR_H_
10#define	_DB_REPMGR_H_
11
12#include "dbinc_auto/repmgr_auto.h"
13
14#if defined(__cplusplus)
15extern "C" {
16#endif
17
/*
 * Replication Framework message types.  These values are transmitted to
 * identify messages sent between sites, even sites running differing versions
 * of software.  Therefore, once assigned, the values are permanently "frozen".
 * New message types added in later versions always get new (higher) values.
 *
 * For example, in repmgr wire protocol version 1 the highest assigned message
 * type value was 3, for REPMGR_REP_MESSAGE.  Wire protocol version 2 added the
 * HEARTBEAT message type (4).
 *
 * We still list them in alphabetical order, for ease of reference.  But this
 * generally does not correspond to numerical order.
 */
#define	REPMGR_ACK		1	/* Acknowledgement. */
#define	REPMGR_HANDSHAKE	2	/* Connection establishment sequence. */
#define	REPMGR_HEARTBEAT	4	/* Monitor connection health. */
#define	REPMGR_REP_MESSAGE	3	/* Normal replication message. */

/*
 * Highest message type value assigned in each wire protocol version.
 * Heartbeats were introduced in version 2.
 */
#define	REPMGR_MAX_V1_MSG_TYPE	3
#define	REPMGR_MAX_V2_MSG_TYPE	4
#define	HEARTBEAT_MIN_VERSION	2

/* The range of protocol versions we're willing to support. */
#define	DB_REPMGR_VERSION	2
#define	DB_REPMGR_MIN_VERSION	1
44
/*
 * Platform-neutral aliases for the socket, thread, mutex, condition-variable
 * and scatter/gather-buffer primitives, mapped to the native Win32 or POSIX
 * types.
 */
#ifdef DB_WIN32
typedef SOCKET socket_t;
typedef HANDLE thread_id_t;
typedef HANDLE mgr_mutex_t;
typedef HANDLE cond_var_t;
typedef WSABUF db_iovec_t;
#else
typedef int socket_t;
typedef pthread_t thread_id_t;
typedef pthread_mutex_t mgr_mutex_t;
typedef pthread_cond_t cond_var_t;
typedef struct iovec db_iovec_t;
#endif
58
/*
 * The (arbitrary) maximum number of outgoing messages we're willing to hold, on
 * a queue per connection, waiting for TCP buffer space to become available in
 * the kernel.  Rather than exceeding this limit, we simply discard additional
 * messages (since this is always allowed by the replication protocol).
 *    As a special dispensation, if a message is destined for a specific remote
 * site (i.e., it's not a broadcast), then we first try blocking the sending
 * thread, waiting for space to become available (though we only wait a limited
 * time).  This is so as to be able to handle the immediate flood of (a
 * potentially large number of) outgoing messages that replication generates, in
 * a tight loop, when handling PAGE_REQ, LOG_REQ and ALL_REQ requests.
 */
#define	OUT_QUEUE_LIMIT	10	/* Counted in messages, not bytes. */
72
/*
 * The system value is available from sysconf(_SC_HOST_NAME_MAX).
 * Historically, the maximum host name was 256.
 */
#ifndef MAXHOSTNAMELEN
#define	MAXHOSTNAMELEN	256
#endif

/*
 * A buffer big enough for the string "site host.domain.com:65535": the 20
 * extra bytes cover the "site " prefix, the ':' and the longest port number.
 * The typedef adds one more byte for the terminating NUL.
 */
#define	MAX_SITE_LOC_STRING (MAXHOSTNAMELEN+20)
typedef char SITE_STRING_BUFFER[MAX_SITE_LOC_STRING+1];
84
/*
 * Default timeout values.  Although expressed here as counts of seconds, the
 * multiplication by US_PER_SEC means the resulting constants are in
 * microseconds.
 */
#define	DB_REPMGR_DEFAULT_ACK_TIMEOUT		(1 * US_PER_SEC)
#define	DB_REPMGR_DEFAULT_CONNECTION_RETRY	(30 * US_PER_SEC)
#define	DB_REPMGR_DEFAULT_ELECTION_RETRY	(10 * US_PER_SEC)
89
/* Forward declarations, with uppercase aliases, for the repmgr structs. */
struct __repmgr_connection;
    typedef struct __repmgr_connection REPMGR_CONNECTION;
struct __repmgr_queue; typedef struct __repmgr_queue REPMGR_QUEUE;
struct __queued_output; typedef struct __queued_output QUEUED_OUTPUT;
struct __repmgr_retry; typedef struct __repmgr_retry REPMGR_RETRY;
struct __repmgr_runnable; typedef struct __repmgr_runnable REPMGR_RUNNABLE;
struct __repmgr_site; typedef struct __repmgr_site REPMGR_SITE;
struct __ack_waiters_table;
    typedef struct __ack_waiters_table ACK_WAITERS_TABLE;

/* BSD-style (TAILQ/STAILQ) list heads for connections, output and retries. */
typedef TAILQ_HEAD(__repmgr_conn_list, __repmgr_connection) CONNECTION_LIST;
typedef STAILQ_HEAD(__repmgr_out_q_head, __queued_output) OUT_Q_HEADER;
typedef TAILQ_HEAD(__repmgr_retry_q, __repmgr_retry) RETRY_Q_HEADER;
103
/* Information about threads managed by Replication Framework. */
struct __repmgr_runnable {
	ENV *env;
	thread_id_t thread_id;
	void *(*run) __P((void *));	/* The thread's entry function. */
	int finished;			/* Nonzero once the thread is done. */
};
111
/*
 * Information about pending connection establishment retry operations.
 *
 * We keep these in order by time.  This works, under the assumption that the
 * DB_REP_CONNECTION_RETRY never changes once we get going (though that
 * assumption is of course wrong, so this needs to be fixed).
 *
 * Usually, we put things onto the tail end of the list.  But when we add a new
 * site while threads are running, we trigger its first connection attempt by
 * scheduling a retry for "0" microseconds from now, putting its retry element
 * at the head of the list instead.
 *
 * TODO: I think this can be fixed by defining "time" to be the time the element
 * was added (with some convention like "0" meaning immediate), rather than the
 * deadline time.
 */
struct __repmgr_retry {
	TAILQ_ENTRY(__repmgr_retry) entries;
	u_int eid;		/* Which site to retry connecting to. */
	db_timespec time;	/* Deadline for the retry attempt. */
};
133
/*
 * We use scatter/gather I/O for both reading and writing.  The largest number
 * of buffers we ever try to use at once is 5, corresponding to the 5 segments
 * of a message described in the "wire protocol" (repmgr_net.c).
 */
typedef struct {
	db_iovec_t vectors[5];	/* Fixed space for the (up to 5) segments. */

	/*
	 * Index of the first iovec to be used.  Initially of course this is
	 * zero.  But as we progress through partial I/O transfers, it ends up
	 * pointing to the first iovec to be used on the next operation.
	 */
	int offset;

	/*
	 * Total number of pieces defined for this message; equal to the number
	 * of times add_buffer and/or add_dbt were called to populate it.  We do
	 * *NOT* revise this as we go along.  So subsequent I/O operations must
	 * use count-offset to get the number of active vector pieces still
	 * remaining.
	 */
	int count;

	/*
	 * Total number of bytes accounted for in all the pieces of this
	 * message.  We do *NOT* revise this as we go along (though it's not
	 * clear we shouldn't).
	 */
	size_t total_bytes;
} REPMGR_IOVECS;
165
/*
 * A flattened copy of an outgoing message, held in a single allocation so it
 * can be queued and shared (via ref_count) by several sites' send queues.
 */
typedef struct {
	size_t length;		/* number of bytes in data */
	int ref_count;		/* # of sites' send queues pointing to us */
	u_int8_t data[1];	/* variable size data area */
} REPMGR_FLAT;
171
/* An element of a connection's outbound (waiting-to-send) queue. */
struct __queued_output {
	STAILQ_ENTRY(__queued_output) entries;
	REPMGR_FLAT *msg;	/* Shared, reference-counted message body. */
	size_t offset;		/* Progress offset into msg->data, for
				   partial transfers. */
};
177
/*
 * The following is for input.  Once we know the sizes of the pieces of an
 * incoming message, we can create this struct (and also the data areas for the
 * pieces themselves, in the same memory allocation).  This is also the struct
 * in which the message lives while it's waiting to be processed by message
 * threads.
 */
typedef struct __repmgr_message {
	STAILQ_ENTRY(__repmgr_message) entries;
	int originating_eid;	/* EID of the site the message came from. */
	DBT control, rec;	/* The two payload segments of the message. */
} REPMGR_MESSAGE;
190
/*
 * Input-reading phases: first we read the fixed-size header that tells us the
 * segment sizes (SIZES_PHASE), then the message data itself (DATA_PHASE).
 */
typedef enum {
	SIZES_PHASE,
	DATA_PHASE
} phase_t;
195
/*
 * If another site initiates a connection to us, when we receive it the
 * connection state is immediately "connected".  But when we initiate the
 * connection to another site, it first has to go through a "connecting" state,
 * until the non-blocking connect() I/O operation completes successfully.
 *     With an outgoing connection, we always know the associated site (and so
 * we have a valid eid).  But with an incoming connection, we don't know the
 * site until we get a handshake message, so until that time the eid is
 * invalid.
 */
struct __repmgr_connection {
	TAILQ_ENTRY(__repmgr_connection) entries;

	int eid;		/* index into sites array in machtab */
	socket_t fd;		/* The connection's socket. */
#ifdef DB_WIN32
	WSAEVENT event_object;	/* WSA event object (Win32 only). */
#endif

	u_int32_t version;	/* Wire protocol version on this connection. */
				/* (0 means not yet determined.) */

#define	CONN_INCOMING	0x01	/* We received this via accept(). */
	u_int32_t flags;

/*
 * When we initiate an outgoing connection, it starts off in CONNECTING state
 * (or possibly CONNECTED).  When the (non-blocking) connection operation later
 * completes, we move to CONNECTED state.  When we get the response to our
 * version negotiation, we move to READY.
 *     For incoming connections that we accept, we start in NEGOTIATE, then to
 * PARAMETERS, and then to READY.
 *     CONGESTED is a hierarchical substate of READY: it's just like READY, with
 * the additional wrinkle that we don't bother waiting for the outgoing queue to
 * drain in certain circumstances.
 */
#define	CONN_CONGESTED	1	/* Long-lived full outgoing queue. */
#define	CONN_CONNECTED	2	/* Awaiting reply to our version negotiation. */
#define	CONN_CONNECTING	3	/* Awaiting completion of non-block connect. */
#define	CONN_DEFUNCT	4	/* Basically dead, awaiting clean-up. */
#define	CONN_NEGOTIATE	5	/* Awaiting version proposal. */
#define	CONN_PARAMETERS	6	/* Awaiting parameters handshake. */
#define	CONN_READY	7	/* Everything's fine. */
	int state;

	/*
	 * Output: usually we just simply write messages right in line, in the
	 * send() function's thread.  But if TCP doesn't have enough network
	 * buffer space for us when we first try it, we instead allocate some
	 * memory, and copy the message, and then send it as space becomes
	 * available in our main select() thread.  In some cases, if the queue
	 * gets too long we wait until it's drained, and then append to it.
	 * This condition variable's associated mutex is the normal per-repmgr
	 * db_rep->mutex, because that mutex is always held anyway whenever the
	 * output queue is consulted.
	 */
	OUT_Q_HEADER outbound_queue;
	int out_queue_length;	/* # of messages on outbound_queue. */
	cond_var_t drained;	/* Signalled as the queue empties. */
	int blockers;		/* ref count of msg threads waiting on us */

	/*
	 * Input: while we're reading a message, we keep track of what phase
	 * we're in.  In both phases, we use a REPMGR_IOVECS to keep track of
	 * our progress within the phase.  Depending upon the message type, we
	 * end up with either a rep_message (which is a wrapper for the control
	 * and rec DBTs), or a single generic DBT.
	 *     Any time we're in DATA_PHASE, it means we have already received
	 * the message header (consisting of msg_type and 2 sizes), and
	 * therefore we have allocated buffer space to read the data.  (This is
	 * important for resource clean-up.)
	 */
	phase_t		reading_phase;
	REPMGR_IOVECS iovecs;

	/* Message header fields, as read directly off the wire. */
	u_int8_t	msg_type;
	u_int32_t	control_size_buf, rec_size_buf;

	union {
		REPMGR_MESSAGE *rep_message;
		struct {
			DBT cntrl, rec;
		} repmgr_msg;
	} input;
};

/* A connection in READY, or its CONGESTED substate, is usable for sending. */
#define	IS_READY_STATE(s)	((s) == CONN_READY || (s) == CONN_CONGESTED)
283
#ifdef HAVE_GETADDRINFO
/* Use the system getaddrinfo() result type directly. */
typedef struct addrinfo	ADDRINFO;
#else
/*
 * Some windows platforms have getaddrinfo (Windows XP), some don't.  We don't
 * support conditional compilation in our Windows build, so we always use our
 * own getaddrinfo implementation.  Rename everything so that we don't collide
 * with the system libraries.
 */
#undef	AI_PASSIVE
#define	AI_PASSIVE	0x01
#undef	AI_CANONNAME
#define	AI_CANONNAME	0x02
#undef	AI_NUMERICHOST
#define	AI_NUMERICHOST	0x04

/* Layout mirrors the standard struct addrinfo. */
typedef struct __addrinfo {
	int ai_flags;		/* AI_PASSIVE, AI_CANONNAME, AI_NUMERICHOST */
	int ai_family;		/* PF_xxx */
	int ai_socktype;	/* SOCK_xxx */
	int ai_protocol;	/* 0 or IPPROTO_xxx for IPv4 and IPv6 */
	size_t ai_addrlen;	/* length of ai_addr */
	char *ai_canonname;	/* canonical name for nodename */
	struct sockaddr *ai_addr;	/* binary address */
	struct __addrinfo *ai_next;	/* next structure in linked list */
} ADDRINFO;
#endif /* HAVE_GETADDRINFO */
311
/* A site's network address: host name, port, and resolved address list. */
typedef struct {
	char *host;		/* Separately allocated copy of string. */
	u_int16_t port;		/* Stored in plain old host-byte-order. */
	ADDRINFO *address_list;	/* Resolved addresses for host. */
	ADDRINFO *current;	/* Cursor into address_list (ADDR_LIST_*). */
} repmgr_netaddr_t;
318
/*
 * Each site that we know about is either idle or connected.  If it's connected,
 * we have a reference to a connection object; if it's idle, we have a reference
 * to a retry object.
 *     We store site objects in a simple array in the machtab, indexed by EID.
 * (We allocate EID numbers for other sites simply according to their index
 * within this array; we use the special value INT_MAX to represent our own
 * EID.)
 */
struct __repmgr_site {
	repmgr_netaddr_t net_addr;
	DB_LSN max_ack;		/* Best ack we've heard from this site. */
	u_int32_t priority;	/* Valid only when SITE_HAS_PRIO is set. */
	db_timespec last_rcvd_timestamp;	/* When we last heard from it. */

#define	SITE_IDLE 1		/* Waiting til time to retry connecting. */
#define	SITE_CONNECTED 2
	int state;		/* SITE_IDLE or SITE_CONNECTED. */

#define	SITE_HAS_PRIO	0x01	/* Set if priority field has valid value. */
	u_int32_t flags;

	/* Which member is valid depends on "state", above. */
	union {
		REPMGR_CONNECTION *conn; /* when CONNECTED */
		REPMGR_RETRY *retry; /* when IDLE */
	} ref;
};
346
/*
 * Repmgr message formats.  We pass these in the "control" portion of a message.
 * For an ack, we just let the "rec" part go unused.  But for a handshake, the
 * "rec" part contains the variable-length host name (including terminating NUL
 * character).
 */

/*
 * The hand-shake message is exchanged upon establishment of a connection.  The
 * message protocol version number here refers to the connection as a whole.  In
 * other words, it's an assertion that every message sent or received on this
 * connection shall be of the specified version.  Since repmgr uses TCP, a
 * reliable stream-oriented protocol, this assertion is meaningful.
 *
 * NB: this struct travels on the wire, so its layout must not change.
 */
typedef struct {
	u_int32_t version;	/* Wire protocol version for the connection. */
	u_int16_t port;		/* Sender's TCP port. */
	u_int32_t priority;	/* Sender's priority value. */
} DB_REPMGR_V1_HANDSHAKE;
366
/*
 * We store site structs in a dynamically allocated, growable array, indexed by
 * EID.  We allocate EID numbers for remote sites simply according to their
 * index within this array.  We don't need (the same kind of) info for ourself
 * (the local site), so we use an EID value that won't conflict with any valid
 * array index.
 *
 * NB: these macros expand in a scope where a "db_rep" pointer is visible, and
 * SITE_FROM_EID performs no bounds checking on "eid".
 */
#define	SITE_FROM_EID(eid)	(&db_rep->sites[eid])
#define	EID_FROM_SITE(s)	((int)((s) - (&db_rep->sites[0])))
#define	IS_VALID_EID(e)		((e) >= 0)
#define	SELF_EID		INT_MAX

/* True for the acknowledgement policies that are defined in terms of peers. */
#define	IS_PEER_POLICY(p) ((p) == DB_REPMGR_ACKS_ALL_PEERS ||		\
    (p) == DB_REPMGR_ACKS_QUORUM ||		\
    (p) == DB_REPMGR_ACKS_ONE_PEER)
382
/*
 * Lock/unlock a repmgr mutex.  NB: on failure these macros execute a "return"
 * from the *enclosing* function, so they may only be used inside functions
 * that return int.
 */
#define	LOCK_MUTEX(m) do {						\
	int __ret;							\
	if ((__ret = __repmgr_lock_mutex(&(m))) != 0)			\
		return (__ret);						\
} while (0)

#define	UNLOCK_MUTEX(m) do {						\
	int __ret;							\
	if ((__ret = __repmgr_unlock_mutex(&(m))) != 0)			\
		return (__ret);						\
} while (0)
394
/* POSIX/Win32 socket (and other) portability. */
#ifdef DB_WIN32
#define	WOULDBLOCK		WSAEWOULDBLOCK
#define	INPROGRESS		WSAEWOULDBLOCK

#define	net_errno		WSAGetLastError()
typedef int socklen_t;
typedef char * sockopt_t;

/* Map the POSIX iovec member names onto WSABUF's members. */
#define	iov_len len
#define	iov_base buf

typedef DWORD threadsync_timeout_t;

#define	REPMGR_SYNC_INITED(db_rep) (db_rep->waiters != NULL)
#else

#define	INVALID_SOCKET		-1
#define	SOCKET_ERROR		-1
#define	WOULDBLOCK		EWOULDBLOCK
#define	INPROGRESS		EINPROGRESS

#define	net_errno		errno
typedef void * sockopt_t;

#define	closesocket(fd)		close(fd)

typedef struct timespec threadsync_timeout_t;

#define	REPMGR_SYNC_INITED(db_rep) (db_rep->read_pipe >= 0)
#endif
426
/* Macros to proceed, as with a cursor, through the address_list: */
#define	ADDR_LIST_CURRENT(na)	((na)->current)
#define	ADDR_LIST_FIRST(na)	((na)->current = (na)->address_list)
/* NB: no NULL check here; callers stop when CURRENT yields NULL. */
#define	ADDR_LIST_NEXT(na)	((na)->current = (na)->current->ai_next)
431
/*
 * Various threads write onto TCP/IP sockets, and an I/O error could occur at
 * any time.  However, only the dedicated "select()" thread may close the socket
 * file descriptor, because under POSIX we have to drop our mutex and then call
 * select() as two distinct (non-atomic) operations.
 *
 * To simplify matters, there is a single place in the select thread where we
 * close and clean up after any defunct connection.  Even if the I/O error
 * happens in the select thread we follow this convention.
 *
 * When an error occurs, we disable the connection (mark it defunct so that no
 * one else will try to use it, and so that the select thread will find it and
 * clean it up), and then usually take some additional recovery action: schedule
 * a connection retry for later, and possibly call for an election if it was a
 * connection to the master.  (This happens in the function
 * __repmgr_bust_connection.)  But sometimes we don't want to do the recovery
 * part; just the disabling part.
 */
#define	DISABLE_CONNECTION(conn) do {					 \
	(conn)->state = CONN_DEFUNCT;					 \
	(conn)->eid = -1;						 \
} while (0)
454
455#include "dbinc_auto/repmgr_ext.h"
456
457#if defined(__cplusplus)
458}
459#endif
460#endif /* !_DB_REPMGR_H_ */
461