1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2005,2008 Oracle.  All rights reserved.
5 *
6 * $Id: repmgr_method.c,v 1.48 2008/04/25 19:02:47 alanb Exp $
7 */
8
9#include "db_config.h"
10
11#define	__INCLUDE_NETWORKING	1
12#include "db_int.h"
13
14static int __repmgr_await_threads __P((ENV *));
15
16/*
17 * PUBLIC: int __repmgr_start __P((DB_ENV *, int, u_int32_t));
18 */
19int
20__repmgr_start(dbenv, nthreads, flags)
21	DB_ENV *dbenv;
22	int nthreads;
23	u_int32_t flags;
24{
25	DBT my_addr;
26	DB_REP *db_rep;
27	ENV *env;
28	REPMGR_RUNNABLE *selector, *messenger;
29	int ret, i;
30
31	env = dbenv->env;
32	db_rep = env->rep_handle;
33
34	if (!F_ISSET(env, ENV_THREAD)) {
35		__db_errx(env,
36		    "Replication Manager needs an environment with DB_THREAD");
37		return (EINVAL);
38	}
39
40	/* Check that the required initialization has been done. */
41	if (db_rep->my_addr.port == 0) {
42		__db_errx(env,
43		    "repmgr_set_local_site must be called before repmgr_start");
44		return (EINVAL);
45	}
46
47	if (db_rep->selector != NULL || db_rep->finished) {
48		__db_errx(env,
49		    "DB_ENV->repmgr_start may not be called more than once");
50		return (EINVAL);
51	}
52
53	switch (flags) {
54	case DB_REP_CLIENT:
55	case DB_REP_ELECTION:
56	case DB_REP_MASTER:
57		break;
58	default:
59		__db_errx(env,
60		    "repmgr_start: unrecognized flags parameter value");
61		return (EINVAL);
62	}
63
64	if (nthreads <= 0) {
65		__db_errx(env,
66		    "repmgr_start: nthreads parameter must be >= 1");
67		return (EINVAL);
68	}
69
70	if ((ret = __os_calloc(env, (u_int)nthreads,
71	   sizeof(REPMGR_RUNNABLE *), &db_rep->messengers)) != 0)
72		return (ret);
73	db_rep->nthreads = nthreads;
74
75	if ((ret = __repmgr_net_init(env, db_rep)) != 0 ||
76	    (ret = __repmgr_init_sync(env, db_rep)) != 0 ||
77	    (ret = __rep_set_transport(dbenv, SELF_EID, __repmgr_send)) != 0)
78		return (ret);
79
80	/*
81	 * Make some sort of call to rep_start before starting other threads, to
82	 * ensure that incoming messages being processed always have a rep
83	 * context properly configured.
84	 */
85	if ((db_rep->init_policy = flags) == DB_REP_MASTER)
86		ret = __repmgr_become_master(env);
87	else {
88		if ((ret = __repmgr_prepare_my_addr(env, &my_addr)) != 0)
89			return (ret);
90		ret = __rep_start(dbenv, &my_addr, DB_REP_CLIENT);
91		__os_free(env, my_addr.data);
92		if (ret == 0) {
93			LOCK_MUTEX(db_rep->mutex);
94			ret = __repmgr_init_election(env, ELECT_SEEK_MASTER);
95			UNLOCK_MUTEX(db_rep->mutex);
96		}
97	}
98	if (ret != 0)
99		return (ret);
100
101	if ((ret = __os_calloc(env, 1, sizeof(REPMGR_RUNNABLE), &selector))
102	    != 0)
103		return (ret);
104	selector->env = env;
105	selector->run = __repmgr_select_thread;
106	if ((ret = __repmgr_thread_start(env, selector)) != 0) {
107		__db_err(env, ret, "can't start selector thread");
108		__os_free(env, selector);
109		return (ret);
110	}
111	db_rep->selector = selector;
112
113	for (i=0; i<nthreads; i++) {
114		if ((ret = __os_calloc(env, 1, sizeof(REPMGR_RUNNABLE),
115		    &messenger)) != 0)
116			return (ret);
117
118		messenger->env = env;
119		messenger->run = __repmgr_msg_thread;
120		if ((ret = __repmgr_thread_start(env, messenger)) != 0) {
121			__os_free(env, messenger);
122			return (ret);
123		}
124		db_rep->messengers[i] = messenger;
125	}
126
127	return (ret);
128}
129
130/*
131 * PUBLIC: int __repmgr_close __P((ENV *));
132 */
133int
134__repmgr_close(env)
135	ENV *env;
136{
137	DB_REP *db_rep;
138	int ret, t_ret;
139
140	ret = 0;
141	db_rep = env->rep_handle;
142	if (db_rep->selector != NULL) {
143		RPRINT(env, DB_VERB_REPMGR_MISC,
144		    (env, "Stopping repmgr threads"));
145		ret = __repmgr_stop_threads(env);
146		if ((t_ret = __repmgr_await_threads(env)) != 0 && ret == 0)
147			ret = t_ret;
148		RPRINT(env, DB_VERB_REPMGR_MISC,
149		    (env, "Repmgr threads are finished"));
150	}
151
152	if ((t_ret = __repmgr_net_close(env)) != 0 && ret == 0)
153		ret = t_ret;
154
155	if ((t_ret = __repmgr_close_sync(env)) != 0 && ret == 0)
156		ret = t_ret;
157
158	return (ret);
159}
160
161/*
162 * PUBLIC: int __repmgr_set_ack_policy __P((DB_ENV *, int));
163 */
164int
165__repmgr_set_ack_policy(dbenv, policy)
166	DB_ENV *dbenv;
167	int policy;
168{
169	ENV *env;
170
171	env = dbenv->env;
172
173	switch (policy) {
174	case DB_REPMGR_ACKS_ALL: /* FALLTHROUGH */
175	case DB_REPMGR_ACKS_ALL_PEERS: /* FALLTHROUGH */
176	case DB_REPMGR_ACKS_NONE: /* FALLTHROUGH */
177	case DB_REPMGR_ACKS_ONE: /* FALLTHROUGH */
178	case DB_REPMGR_ACKS_ONE_PEER: /* FALLTHROUGH */
179	case DB_REPMGR_ACKS_QUORUM:
180		env->rep_handle->perm_policy = policy;
181		return (0);
182	default:
183		__db_errx(env,
184		    "unknown ack_policy in DB_ENV->repmgr_set_ack_policy");
185		return (EINVAL);
186	}
187}
188
189/*
190 * PUBLIC: int __repmgr_get_ack_policy __P((DB_ENV *, int *));
191 */
192int
193__repmgr_get_ack_policy(dbenv, policy)
194	DB_ENV *dbenv;
195	int *policy;
196{
197	ENV *env;
198
199	env = dbenv->env;
200	*policy = env->rep_handle->perm_policy;
201	return (0);
202}
203
204/*
205 * PUBLIC: int __repmgr_env_create __P((ENV *, DB_REP *));
206 */
207int
208__repmgr_env_create(env, db_rep)
209	ENV *env;
210	DB_REP *db_rep;
211{
212	int ret;
213
214	/* Set some default values. */
215	db_rep->ack_timeout = DB_REPMGR_DEFAULT_ACK_TIMEOUT;
216	db_rep->connection_retry_wait = DB_REPMGR_DEFAULT_CONNECTION_RETRY;
217	db_rep->election_retry_wait = DB_REPMGR_DEFAULT_ELECTION_RETRY;
218	db_rep->config_nsites = 0;
219	db_rep->peer = DB_EID_INVALID;
220	db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM;
221
222#ifdef DB_WIN32
223	db_rep->waiters = NULL;
224#else
225	db_rep->read_pipe = db_rep->write_pipe = -1;
226#endif
227	if ((ret = __repmgr_net_create(db_rep)) == 0)
228		ret = __repmgr_queue_create(env, db_rep);
229
230	return (ret);
231}
232
233/*
234 * PUBLIC: void __repmgr_env_destroy __P((ENV *, DB_REP *));
235 */
236void
237__repmgr_env_destroy(env, db_rep)
238	ENV *env;
239	DB_REP *db_rep;
240{
241	__repmgr_queue_destroy(env);
242	__repmgr_net_destroy(env, db_rep);
243	if (db_rep->messengers != NULL) {
244		__os_free(env, db_rep->messengers);
245		db_rep->messengers = NULL;
246	}
247}
248
249/*
250 * PUBLIC: int __repmgr_stop_threads __P((ENV *));
251 */
252int
253__repmgr_stop_threads(env)
254	ENV *env;
255{
256	DB_REP *db_rep;
257	REPMGR_CONNECTION *conn;
258	int ret;
259
260	db_rep = env->rep_handle;
261
262	/*
263	 * Hold mutex for the purpose of waking up threads, but then get out of
264	 * the way to let them clean up and exit.
265	 */
266	LOCK_MUTEX(db_rep->mutex);
267	db_rep->finished = TRUE;
268	if (db_rep->elect_thread != NULL &&
269	    (ret = __repmgr_signal(&db_rep->check_election)) != 0)
270		goto unlock;
271
272	if ((ret = __repmgr_signal(&db_rep->queue_nonempty)) != 0)
273		goto unlock;
274
275	TAILQ_FOREACH(conn, &db_rep->connections, entries) {
276		if (conn->blockers > 0 &&
277		    ((ret = __repmgr_signal(&conn->drained)) != 0))
278			goto unlock;
279	}
280	UNLOCK_MUTEX(db_rep->mutex);
281
282	return (__repmgr_wake_main_thread(env));
283
284unlock:
285	UNLOCK_MUTEX(db_rep->mutex);
286	return (ret);
287}
288
289static int
290__repmgr_await_threads(env)
291	ENV *env;
292{
293	DB_REP *db_rep;
294	REPMGR_RUNNABLE *messenger;
295	int ret, t_ret, i;
296
297	db_rep = env->rep_handle;
298	ret = 0;
299	if (db_rep->elect_thread != NULL) {
300		ret = __repmgr_thread_join(db_rep->elect_thread);
301		__os_free(env, db_rep->elect_thread);
302		db_rep->elect_thread = NULL;
303	}
304
305	for (i=0; i<db_rep->nthreads && db_rep->messengers[i] != NULL; i++) {
306		messenger = db_rep->messengers[i];
307		if ((t_ret = __repmgr_thread_join(messenger)) != 0 && ret == 0)
308			ret = t_ret;
309		__os_free(env, messenger);
310		db_rep->messengers[i] = NULL; /* necessary? */
311	}
312	__os_free(env, db_rep->messengers);
313	db_rep->messengers = NULL;
314
315	if (db_rep->selector != NULL) {
316		if ((t_ret = __repmgr_thread_join(db_rep->selector)) != 0 &&
317		    ret == 0)
318			ret = t_ret;
319		__os_free(env, db_rep->selector);
320		db_rep->selector = NULL;
321	}
322
323	return (ret);
324}
325
326/*
327 * PUBLIC: int __repmgr_set_local_site __P((DB_ENV *, const char *, u_int,
328 * PUBLIC:     u_int32_t));
329 */
330int
331__repmgr_set_local_site(dbenv, host, port, flags)
332	DB_ENV *dbenv;
333	const char *host;
334	u_int port;
335	u_int32_t flags;
336{
337	ADDRINFO *address_list;
338	DB_REP *db_rep;
339	ENV *env;
340	repmgr_netaddr_t addr;
341	int locked, ret;
342
343	env = dbenv->env;
344
345	if (flags != 0)
346		return (__db_ferr(env, "DB_ENV->repmgr_set_local_site", 0));
347
348	db_rep = env->rep_handle;
349	if (db_rep->my_addr.port != 0) {
350		__db_errx(env, "Listen address already set");
351		return (EINVAL);
352	}
353
354	if (host == NULL) {
355		__db_errx(env,
356		    "repmgr_set_local_site: host name is required");
357		return (EINVAL);
358	}
359
360	if ((ret = __repmgr_getaddr(
361	    env, host, port, AI_PASSIVE, &address_list)) != 0)
362		return (ret);
363
364	if ((ret = __repmgr_pack_netaddr(env,
365	     host, port, address_list, &addr)) != 0) {
366		__os_freeaddrinfo(env, address_list);
367		return (ret);
368	}
369
370	if (REPMGR_SYNC_INITED(db_rep)) {
371		LOCK_MUTEX(db_rep->mutex);
372		locked = TRUE;
373	} else
374		locked = FALSE;
375
376	memcpy(&db_rep->my_addr, &addr, sizeof(addr));
377
378	if (locked)
379		UNLOCK_MUTEX(db_rep->mutex);
380	return (0);
381}
382
383/*
384 * If the application only calls this method from a single thread (e.g., during
385 * its initialization), it will avoid the problems with the non-thread-safe host
386 * name lookup.  In any case, if we relegate the blocking lookup to here it
387 * won't affect our select() loop.
388 *
389 * PUBLIC: int __repmgr_add_remote_site __P((DB_ENV *, const char *, u_int,
390 * PUBLIC: int *, u_int32_t));
391 */
392int
393__repmgr_add_remote_site(dbenv, host, port, eidp, flags)
394	DB_ENV *dbenv;
395	const char *host;
396	u_int port;
397	int *eidp;
398	u_int32_t flags;
399{
400	DB_REP *db_rep;
401	ENV *env;
402	REPMGR_SITE *site;
403	int eid, locked, ret;
404
405	env = dbenv->env;
406
407	if ((ret = __db_fchk(env,
408	    "DB_ENV->repmgr_add_remote_site", flags, DB_REPMGR_PEER)) != 0)
409		return (ret);
410
411	if (host == NULL) {
412		__db_errx(env,
413		    "repmgr_add_remote_site: host name is required");
414		return (EINVAL);
415	}
416
417	db_rep = env->rep_handle;
418
419	if (REPMGR_SYNC_INITED(db_rep)) {
420		LOCK_MUTEX(db_rep->mutex);
421		locked = TRUE;
422	} else
423		locked = FALSE;
424
425	switch (ret = __repmgr_add_site(env, host, port, &site)) {
426	case 0:
427	case EEXIST:
428		ret = 0;
429		break;
430	default:
431		goto unlock;
432	}
433	eid = EID_FROM_SITE(site);
434
435	if (LF_ISSET(DB_REPMGR_PEER))
436		db_rep->peer = eid;
437	if (eidp != NULL)
438		*eidp = eid;
439
440unlock:	if (locked)
441		UNLOCK_MUTEX(db_rep->mutex);
442	return (ret);
443}
444