1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2005-2009 Oracle.  All rights reserved.
5 *
6 * $Id$
7 */
8
9#include "db_config.h"
10
11#define	__INCLUDE_NETWORKING	1
12#include "db_int.h"
13
14static int message_loop __P((ENV *));
15static int process_message __P((ENV*, DBT*, DBT*, int));
16static int handle_newsite __P((ENV *, const DBT *));
17static int ack_message __P((ENV *, u_int32_t, DB_LSN *));
18static int ack_msg_conn __P((ENV *, REPMGR_CONNECTION *, u_int32_t, DB_LSN *));
19
20/*
21 * PUBLIC: void *__repmgr_msg_thread __P((void *));
22 */
23void *
24__repmgr_msg_thread(args)
25	void *args;
26{
27	ENV *env = args;
28	int ret;
29
30	if ((ret = message_loop(env)) != 0) {
31		__db_err(env, ret, "message thread failed");
32		__repmgr_thread_failure(env, ret);
33	}
34	return (NULL);
35}
36
37static int
38message_loop(env)
39	ENV *env;
40{
41	REPMGR_MESSAGE *msg;
42	int ret;
43
44	while ((ret = __repmgr_queue_get(env, &msg)) == 0) {
45		while ((ret = process_message(env, &msg->control, &msg->rec,
46		    msg->originating_eid)) == DB_LOCK_DEADLOCK)
47			RPRINT(env, DB_VERB_REPMGR_MISC,
48			    (env, "repmgr deadlock retry"));
49
50		__os_free(env, msg);
51		if (ret != 0)
52			return (ret);
53	}
54
55	return (ret == DB_REP_UNAVAIL ? 0 : ret);
56}
57
58static int
59process_message(env, control, rec, eid)
60	ENV *env;
61	DBT *control, *rec;
62	int eid;
63{
64	DB_LSN permlsn;
65	DB_REP *db_rep;
66	REP *rep;
67	int ret;
68	u_int32_t generation;
69
70	db_rep = env->rep_handle;
71	rep = db_rep->region;
72
73	/*
74	 * Save initial generation number, in case it changes in a close race
75	 * with a NEWMASTER.
76	 */
77	generation = rep->gen;
78
79	switch (ret =
80	    __rep_process_message_int(env, control, rec, eid, &permlsn)) {
81	case 0:
82		if (db_rep->takeover_pending) {
83			db_rep->takeover_pending = FALSE;
84			return (__repmgr_become_master(env));
85		}
86		break;
87
88	case DB_REP_NEWSITE:
89		return (handle_newsite(env, rec));
90
91	case DB_REP_HOLDELECTION:
92		LOCK_MUTEX(db_rep->mutex);
93		ret = __repmgr_init_election(env, ELECT_ELECTION);
94		UNLOCK_MUTEX(db_rep->mutex);
95		if (ret != 0)
96			return (ret);
97		break;
98
99	case DB_REP_DUPMASTER:
100		if ((ret = __repmgr_repstart(env, DB_REP_CLIENT)) != 0)
101			return (ret);
102		LOCK_MUTEX(db_rep->mutex);
103		ret = __repmgr_init_election(env, ELECT_ELECTION);
104		UNLOCK_MUTEX(db_rep->mutex);
105		if (ret != 0)
106			return (ret);
107		break;
108
109	case DB_REP_ISPERM:
110		/*
111		 * Don't bother sending ack if master doesn't care about it.
112		 */
113		if (db_rep->perm_policy == DB_REPMGR_ACKS_NONE ||
114		    (IS_PEER_POLICY(db_rep->perm_policy) &&
115		    rep->priority == 0))
116			break;
117
118		if ((ret = ack_message(env, generation, &permlsn)) != 0)
119			return (ret);
120
121		break;
122
123	case DB_REP_NOTPERM: /* FALLTHROUGH */
124	case DB_REP_IGNORE: /* FALLTHROUGH */
125	case DB_LOCK_DEADLOCK:
126		break;
127
128	default:
129		__db_err(env, ret, "DB_ENV->rep_process_message");
130		return (ret);
131	}
132	return (0);
133}
134
135/*
136 * Handle replication-related events.  Returns only 0 or DB_EVENT_NOT_HANDLED;
137 * no other error returns are tolerated.
138 *
139 * PUBLIC: int __repmgr_handle_event __P((ENV *, u_int32_t, void *));
140 */
141int
142__repmgr_handle_event(env, event, info)
143	ENV *env;
144	u_int32_t event;
145	void *info;
146{
147	DB_REP *db_rep;
148
149	db_rep = env->rep_handle;
150
151	if (db_rep->selector == NULL) {
152		/* Repmgr is not in use, so all events go to application. */
153		return (DB_EVENT_NOT_HANDLED);
154	}
155
156	switch (event) {
157	case DB_EVENT_REP_ELECTED:
158		DB_ASSERT(env, info == NULL);
159
160		db_rep->found_master = TRUE;
161		db_rep->takeover_pending = TRUE;
162
163		/*
164		 * The application doesn't really need to see this, because the
165		 * purpose of this event is to tell the winning site that it
166		 * should call rep_start(MASTER), and in repmgr we do that
167		 * automatically.  Still, they could conceivably be curious, and
168		 * it doesn't hurt anything to let them know.
169		 */
170		break;
171	case DB_EVENT_REP_NEWMASTER:
172		DB_ASSERT(env, info != NULL);
173
174		db_rep->found_master = TRUE;
175		db_rep->master_eid = *(int *)info;
176
177		/* Application still needs to see this. */
178		break;
179	default:
180		break;
181	}
182	return (DB_EVENT_NOT_HANDLED);
183}
184
185static int
186ack_message(env, generation, lsn)
187	ENV *env;
188	u_int32_t generation;
189	DB_LSN *lsn;
190{
191	DB_REP *db_rep;
192	REPMGR_CONNECTION *conn;
193	REPMGR_SITE *site;
194	int ret;
195
196	db_rep = env->rep_handle;
197	/*
198	 * Regardless of where a message came from, all ack's go to the master
199	 * site.  If we're not in touch with the master, we drop it, since
200	 * there's not much else we can do.
201	 */
202	ret = 0;
203	LOCK_MUTEX(db_rep->mutex);
204	if (!IS_VALID_EID(db_rep->master_eid) ||
205	    db_rep->master_eid == SELF_EID) {
206		RPRINT(env, DB_VERB_REPMGR_MISC, (env,
207		    "dropping ack with master %d", db_rep->master_eid));
208		goto unlock;
209	}
210
211	site = SITE_FROM_EID(db_rep->master_eid);
212
213	/*
214	 * Send the ack out on any/all connections that need it, rather than
215	 * going to the trouble of trying to keep track of what LSN's each
216	 * connection may be waiting for.
217	 */
218	if (site->state == SITE_CONNECTED &&
219	    (ret = ack_msg_conn(env, site->ref.conn, generation, lsn)) != 0)
220		goto unlock;
221	TAILQ_FOREACH(conn, &site->sub_conns, entries) {
222		if ((ret = ack_msg_conn(env, conn, generation, lsn)) != 0)
223			goto unlock;
224	}
225
226unlock:
227	UNLOCK_MUTEX(db_rep->mutex);
228	return (ret);
229}
230
231/*
232 * Sends an acknowledgment on one connection, if it needs it.
233 *
234 * !!! Called with mutex held.
235 */
236static int
237ack_msg_conn(env, conn, generation, lsn)
238	ENV *env;
239	REPMGR_CONNECTION *conn;
240	u_int32_t generation;
241	DB_LSN *lsn;
242{
243	DBT control2, rec2;
244	__repmgr_ack_args ack;
245	u_int8_t buf[__REPMGR_ACK_SIZE];
246	int ret;
247
248	ret = 0;
249
250	if (conn->state == CONN_READY) {
251		DB_ASSERT(env, conn->version > 0);
252		ack.generation = generation;
253		memcpy(&ack.lsn, lsn, sizeof(DB_LSN));
254		if (conn->version == 1) {
255			control2.data = &ack;
256			control2.size = sizeof(ack);
257		} else {
258			__repmgr_ack_marshal(env, &ack, buf);
259			control2.data = buf;
260			control2.size = __REPMGR_ACK_SIZE;
261		}
262		rec2.size = 0;
263		/*
264		 * It's hard to imagine anyone would care about a lost ack if
265		 * the path to the master is so congested as to need blocking;
266		 * so pass "blockable" argument as FALSE.
267		 */
268		if ((ret = __repmgr_send_one(env, conn, REPMGR_ACK,
269		    &control2, &rec2, FALSE)) == DB_REP_UNAVAIL)
270			ret = __repmgr_bust_connection(env, conn);
271	}
272	return (ret);
273}
274
275/*
276 * Does everything necessary to handle the processing of a NEWSITE return.
277 */
278static int
279handle_newsite(env, rec)
280	ENV *env;
281	const DBT *rec;
282{
283	DB_REP *db_rep;
284	REPMGR_SITE *site;
285	SITE_STRING_BUFFER buffer;
286	size_t hlen;
287	u_int16_t port;
288	int ret;
289	char *host;
290
291	db_rep = env->rep_handle;
292	/*
293	 * Check if we got sent connect information and if we did, if
294	 * this is me or if we already have a connection to this new
295	 * site.  If we don't, establish a new one.
296	 *
297	 * Unmarshall the cdata: a 2-byte port number, in network byte order,
298	 * followed by the host name string, which should already be
299	 * null-terminated, but let's make sure.
300	 */
301	if (rec->size < sizeof(port) + 1) {
302		__db_errx(env, "unexpected cdata size, msg ignored");
303		return (0);
304	}
305	memcpy(&port, rec->data, sizeof(port));
306	port = ntohs(port);
307
308	host = (char*)((u_int8_t*)rec->data + sizeof(port));
309	hlen = (rec->size - sizeof(port)) - 1;
310	host[hlen] = '\0';
311
312	/* It's me, do nothing. */
313	if (strcmp(host, db_rep->my_addr.host) == 0 &&
314	    port == db_rep->my_addr.port) {
315		RPRINT(env, DB_VERB_REPMGR_MISC,
316		    (env, "repmgr ignores own NEWSITE info"));
317		return (0);
318	}
319
320	LOCK_MUTEX(db_rep->mutex);
321	if ((ret = __repmgr_add_site(env, host, port, &site, 0)) == EEXIST) {
322		RPRINT(env, DB_VERB_REPMGR_MISC, (env,
323		    "NEWSITE info from %s was already known",
324		    __repmgr_format_site_loc(site, buffer)));
325		/*
326		 * This is a good opportunity to look up the host name, if
327		 * needed, because we're on a message thread (not the critical
328		 * select() thread).
329		 */
330		if ((ret = __repmgr_check_host_name(env,
331		    EID_FROM_SITE(site))) != 0)
332			return (ret);
333
334		if (site->state == SITE_CONNECTED)
335			goto unlock; /* Nothing to do. */
336	} else {
337		if (ret != 0)
338			goto unlock;
339		RPRINT(env, DB_VERB_REPMGR_MISC,
340		    (env, "NEWSITE info added %s",
341		    __repmgr_format_site_loc(site, buffer)));
342	}
343
344	/*
345	 * Wake up the main thread to connect to the new or reawakened
346	 * site.
347	 */
348	ret = __repmgr_wake_main_thread(env);
349
350unlock: UNLOCK_MUTEX(db_rep->mutex);
351	return (ret);
352}
353