1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2005,2008 Oracle.  All rights reserved.
5 *
6 * $Id: repmgr_msg.c,v 1.42 2008/01/08 20:58:48 bostic Exp $
7 */
8
9#include "db_config.h"
10
11#define	__INCLUDE_NETWORKING	1
12#include "db_int.h"
13
/* Forward declarations of this file's private (static) helpers. */
static int message_loop __P((ENV *));
static int process_message __P((ENV*, DBT*, DBT*, int));
static int handle_newsite __P((ENV *, const DBT *));
static int ack_message __P((ENV *, u_int32_t, DB_LSN *));
18
19/*
20 * PUBLIC: void *__repmgr_msg_thread __P((void *));
21 */
22void *
23__repmgr_msg_thread(args)
24	void *args;
25{
26	ENV *env = args;
27	int ret;
28
29	if ((ret = message_loop(env)) != 0) {
30		__db_err(env, ret, "message thread failed");
31		__repmgr_thread_failure(env, ret);
32	}
33	return (NULL);
34}
35
36static int
37message_loop(env)
38	ENV *env;
39{
40	REPMGR_MESSAGE *msg;
41	int ret;
42
43	while ((ret = __repmgr_queue_get(env, &msg)) == 0) {
44		while ((ret = process_message(env, &msg->control, &msg->rec,
45		    msg->originating_eid)) == DB_LOCK_DEADLOCK)
46			RPRINT(env, DB_VERB_REPMGR_MISC,
47			    (env, "repmgr deadlock retry"));
48
49		__os_free(env, msg);
50		if (ret != 0)
51			return (ret);
52	}
53
54	return (ret == DB_REP_UNAVAIL ? 0 : ret);
55}
56
57static int
58process_message(env, control, rec, eid)
59	ENV *env;
60	DBT *control, *rec;
61	int eid;
62{
63	DB_LSN permlsn;
64	DB_REP *db_rep;
65	REP *rep;
66	int ret;
67	u_int32_t generation;
68
69	db_rep = env->rep_handle;
70
71	/*
72	 * Save initial generation number, in case it changes in a close race
73	 * with a NEWMASTER.  See msgdir.10000/10039/msg00086.html.
74	 */
75	generation = db_rep->generation;
76
77	switch (ret =
78	    __rep_process_message(env->dbenv, control, rec, eid, &permlsn)) {
79	case 0:
80		if (db_rep->takeover_pending) {
81			db_rep->takeover_pending = FALSE;
82			return (__repmgr_become_master(env));
83		}
84		break;
85
86	case DB_REP_NEWSITE:
87		return (handle_newsite(env, rec));
88
89	case DB_REP_HOLDELECTION:
90		LOCK_MUTEX(db_rep->mutex);
91		ret = __repmgr_init_election(env, ELECT_ELECTION);
92		UNLOCK_MUTEX(db_rep->mutex);
93		if (ret != 0)
94			return (ret);
95		break;
96
97	case DB_REP_DUPMASTER:
98		if ((ret = __repmgr_repstart(env, DB_REP_CLIENT)) != 0)
99			return (ret);
100		LOCK_MUTEX(db_rep->mutex);
101		ret = __repmgr_init_election(env, ELECT_ELECTION);
102		UNLOCK_MUTEX(db_rep->mutex);
103		if (ret != 0)
104			return (ret);
105		break;
106
107	case DB_REP_ISPERM:
108		/*
109		 * Don't bother sending ack if master doesn't care about it.
110		 */
111		rep = db_rep->region;
112		if (db_rep->perm_policy == DB_REPMGR_ACKS_NONE ||
113		    (IS_PEER_POLICY(db_rep->perm_policy) &&
114		    rep->priority == 0))
115			break;
116
117		if ((ret = ack_message(env, generation, &permlsn)) != 0)
118			return (ret);
119
120		break;
121
122	case DB_REP_NOTPERM: /* FALLTHROUGH */
123	case DB_REP_IGNORE: /* FALLTHROUGH */
124	case DB_LOCK_DEADLOCK:
125		break;
126
127	default:
128		__db_err(env, ret, "DB_ENV->rep_process_message");
129		return (ret);
130	}
131	return (0);
132}
133
134/*
135 * Handle replication-related events.  Returns only 0 or DB_EVENT_NOT_HANDLED;
136 * no other error returns are tolerated.
137 *
138 * PUBLIC: int __repmgr_handle_event __P((ENV *, u_int32_t, void *));
139 */
140int
141__repmgr_handle_event(env, event, info)
142	ENV *env;
143	u_int32_t event;
144	void *info;
145{
146	DB_REP *db_rep;
147
148	db_rep = env->rep_handle;
149
150	if (db_rep->selector == NULL) {
151		/* Repmgr is not in use, so all events go to application. */
152		return (DB_EVENT_NOT_HANDLED);
153	}
154
155	switch (event) {
156	case DB_EVENT_REP_ELECTED:
157		DB_ASSERT(env, info == NULL);
158
159		db_rep->found_master = TRUE;
160		db_rep->takeover_pending = TRUE;
161
162		/*
163		 * The application doesn't really need to see this, because the
164		 * purpose of this event is to tell the winning site that it
165		 * should call rep_start(MASTER), and in repmgr we do that
166		 * automatically.  Still, they could conceivably be curious, and
167		 * it doesn't hurt anything to let them know.
168		 */
169		break;
170	case DB_EVENT_REP_NEWMASTER:
171		DB_ASSERT(env, info != NULL);
172
173		db_rep->found_master = TRUE;
174		db_rep->master_eid = *(int *)info;
175		__repmgr_stash_generation(env);
176
177		/* Application still needs to see this. */
178		break;
179	default:
180		break;
181	}
182	return (DB_EVENT_NOT_HANDLED);
183}
184
185/*
186 * Acknowledges a message.
187 */
188static int
189ack_message(env, generation, lsn)
190	ENV *env;
191	u_int32_t generation;
192	DB_LSN *lsn;
193{
194	DBT control2, rec2;
195	DB_REP *db_rep;
196	__repmgr_ack_args ack;
197	u_int8_t buf[__REPMGR_ACK_SIZE];
198	REPMGR_CONNECTION *conn;
199	REPMGR_SITE *site;
200	int ret;
201
202	db_rep = env->rep_handle;
203	/*
204	 * Regardless of where a message came from, all ack's go to the master
205	 * site.  If we're not in touch with the master, we drop it, since
206	 * there's not much else we can do.
207	 */
208	if (!IS_VALID_EID(db_rep->master_eid) ||
209	    db_rep->master_eid == SELF_EID) {
210		RPRINT(env, DB_VERB_REPMGR_MISC, (env,
211		    "dropping ack with master %d", db_rep->master_eid));
212		return (0);
213	}
214
215	ret = 0;
216	LOCK_MUTEX(db_rep->mutex);
217	site = SITE_FROM_EID(db_rep->master_eid);
218	if (site->state == SITE_CONNECTED &&
219	    site->ref.conn->state == CONN_READY) {
220		conn = site->ref.conn;
221		DB_ASSERT(env, conn->version > 0);
222		ack.generation = generation;
223		memcpy(&ack.lsn, lsn, sizeof(DB_LSN));
224		if (conn->version == 1) {
225			control2.data = &ack;
226			control2.size = sizeof(ack);
227		} else {
228			__repmgr_ack_marshal(env, &ack, buf);
229			control2.data = buf;
230			control2.size = __REPMGR_ACK_SIZE;
231		}
232		rec2.size = 0;
233		/*
234		 * It's hard to imagine anyone would care about a lost ack if
235		 * the path to the master is so congested as to need blocking;
236		 * so pass "blockable" argument as FALSE.
237		 */
238		if ((ret = __repmgr_send_one(env, conn, REPMGR_ACK,
239		    &control2, &rec2, FALSE)) == DB_REP_UNAVAIL)
240			ret = __repmgr_bust_connection(env, conn);
241	}
242
243	UNLOCK_MUTEX(db_rep->mutex);
244	return (ret);
245}
246
247/*
248 * Does everything necessary to handle the processing of a NEWSITE return.
249 */
250static int
251handle_newsite(env, rec)
252	ENV *env;
253	const DBT *rec;
254{
255	ADDRINFO *ai;
256	DB_REP *db_rep;
257	REPMGR_SITE *site;
258	SITE_STRING_BUFFER buffer;
259	repmgr_netaddr_t *addr;
260	size_t hlen;
261	u_int16_t port;
262	int ret;
263	char *host;
264
265	db_rep = env->rep_handle;
266	/*
267	 * Check if we got sent connect information and if we did, if
268	 * this is me or if we already have a connection to this new
269	 * site.  If we don't, establish a new one.
270	 *
271	 * Unmarshall the cdata: a 2-byte port number, in network byte order,
272	 * followed by the host name string, which should already be
273	 * null-terminated, but let's make sure.
274	 */
275	if (rec->size < sizeof(port) + 1) {
276		__db_errx(env, "unexpected cdata size, msg ignored");
277		return (0);
278	}
279	memcpy(&port, rec->data, sizeof(port));
280	port = ntohs(port);
281
282	host = (char*)((u_int8_t*)rec->data + sizeof(port));
283	hlen = (rec->size - sizeof(port)) - 1;
284	host[hlen] = '\0';
285
286	/* It's me, do nothing. */
287	if (strcmp(host, db_rep->my_addr.host) == 0 &&
288	    port == db_rep->my_addr.port) {
289		RPRINT(env, DB_VERB_REPMGR_MISC,
290		    (env, "repmgr ignores own NEWSITE info"));
291		return (0);
292	}
293
294	LOCK_MUTEX(db_rep->mutex);
295	if ((ret = __repmgr_add_site(env, host, port, &site)) == EEXIST) {
296		RPRINT(env, DB_VERB_REPMGR_MISC, (env,
297		    "NEWSITE info from %s was already known",
298		    __repmgr_format_site_loc(site, buffer)));
299		/*
300		 * In case we already know about this site only because it
301		 * first connected to us, we may not yet have had a chance to
302		 * look up its addresses.  Even though we don't need them just
303		 * now, this is an advantageous opportunity to get them since we
304		 * can do so away from the critical select thread.  Give up only
305		 * for a disastrous failure.
306		 */
307		addr = &site->net_addr;
308		if (addr->address_list == NULL) {
309			if ((ret = __repmgr_getaddr(env,
310			    addr->host, addr->port, 0, &ai)) == 0)
311				addr->address_list = ai;
312			else if (ret != DB_REP_UNAVAIL)
313				goto unlock;
314		}
315
316		ret = 0;
317		if (site->state == SITE_CONNECTED)
318			goto unlock; /* Nothing to do. */
319	} else {
320		if (ret != 0)
321			goto unlock;
322		RPRINT(env, DB_VERB_REPMGR_MISC,
323		    (env, "NEWSITE info added %s",
324		    __repmgr_format_site_loc(site, buffer)));
325	}
326
327	/*
328	 * Wake up the main thread to connect to the new or reawakened
329	 * site.
330	 */
331	ret = __repmgr_wake_main_thread(env);
332
333unlock: UNLOCK_MUTEX(db_rep->mutex);
334	return (ret);
335}
336
337/*
338 * PUBLIC: void __repmgr_stash_generation __P((ENV *));
339 */
340void
341__repmgr_stash_generation(env)
342	ENV *env;
343{
344	DB_REP *db_rep;
345	REP *rep;
346
347	db_rep = env->rep_handle;
348	rep = db_rep->region;
349
350	db_rep->generation = rep->gen;
351}
352