1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2005-2009 Oracle.  All rights reserved.
5 *
6 * $Id$
7 */
8
9#include "db_config.h"
10
11#define	__INCLUDE_NETWORKING	1
12#include "db_int.h"
13
/* Forward declarations of the file-local helpers defined below. */
static int kick_blockers __P((ENV *, REPMGR_CONNECTION *, void *));
static int mismatch_err __P((const ENV *));
static int __repmgr_await_threads __P((ENV *));
17
/*
 * PUBLIC: int __repmgr_start __P((DB_ENV *, int, u_int32_t));
 *
 * __repmgr_start --
 *	DB_ENV->repmgr_start method.  After validating the call, the process
 *	either claims the "listener" role or runs as a subordinate.  It claims
 *	the role when it is the first to find rep->listener unset.
 *
 *	The listener:
 *	  - opens the listening socket;
 *	  - calls rep_start, as master when flags is DB_REP_MASTER and
 *	    otherwise as client;
 *	  - starts the select() thread and nthreads message-processing
 *	    threads;
 *	  - starts an election when it did not start as master and no master
 *	    is known.
 *
 *	A subordinate process forces nthreads to 0, adopts the master EID
 *	recorded in the shared region, and starts only the select() thread.
 *
 *	dbenv:    the environment handle.
 *	nthreads: number of message threads; must be >= 1 for the listener
 *		  (forced to 0 for a subordinate process).
 *	flags:    DB_REP_CLIENT, DB_REP_ELECTION or DB_REP_MASTER.
 *
 *	Returns 0 in the listener and DB_REP_IGNORE in a subordinate process.
 *	On failure it returns an error code (EINVAL for the argument and state
 *	checks below), after undoing whatever startup it had done.
 */
int
__repmgr_start(dbenv, nthreads, flags)
	DB_ENV *dbenv;
	int nthreads;
	u_int32_t flags;
{
	DBT my_addr;
	DB_REP *db_rep;
	REP *rep;
	DB_THREAD_INFO *ip;
	ENV *env;
	REPMGR_RUNNABLE *messenger;
	int i, is_listener, locked, need_masterseek, ret;

	env = dbenv->env;
	db_rep = env->rep_handle;

	switch (flags) {
	case DB_REP_CLIENT:
	case DB_REP_ELECTION:
	case DB_REP_MASTER:
		break;
	default:
		__db_errx(env,
		    "repmgr_start: unrecognized flags parameter value");
		return (EINVAL);
	}

	ENV_REQUIRES_CONFIG_XX(
	    env, rep_handle, "DB_ENV->repmgr_start", DB_INIT_REP);
	if (!F_ISSET(env, ENV_THREAD)) {
		__db_errx(env,
		    "Replication Manager needs an environment with DB_THREAD");
		return (EINVAL);
	}

	if (APP_IS_BASEAPI(env)) {
		__db_errx(env,
"DB_ENV->repmgr_start: cannot call from base replication application");
		return (EINVAL);
	}

	/* Check that the required initialization has been done. */
	if (db_rep->my_addr.host == NULL) {
		__db_errx(env,
		    "repmgr_set_local_site must be called before repmgr_start");
		return (EINVAL);
	}

	/*
	 * "finished" is set by __repmgr_stop_threads, so a repmgr that has
	 * already been stopped cannot be restarted either.
	 */
	if (db_rep->selector != NULL || db_rep->finished) {
		__db_errx(env,
		    "DB_ENV->repmgr_start may not be called more than once");
		return (EINVAL);
	}

	/*
	 * See if anyone else is already fulfilling the listener role.  If not,
	 * we'll do so.
	 */
	rep = db_rep->region;
	ENV_ENTER(env, ip);
	/* Test-and-set of rep->listener is done under the region's mutex. */
	MUTEX_LOCK(env, rep->mtx_repmgr);
	if (rep->listener == 0) {
		is_listener = TRUE;
		__os_id(dbenv, &rep->listener, NULL);
	} else {
		is_listener = FALSE;
		nthreads = 0;
	}
	MUTEX_UNLOCK(env, rep->mtx_repmgr);
	ENV_LEAVE(env, ip);

	/*
	 * The minimum legal number of threads is either 1 or 0, depending upon
	 * whether we're the main process or a subordinate.
	 */
	locked = FALSE;
	if (nthreads < (is_listener ? 1 : 0)) {
		__db_errx(env,
		    "repmgr_start: nthreads parameter must be >= 1");
		ret = EINVAL;
		goto err;
	}

	if ((ret = __repmgr_init(env)) != 0)
		goto err;
	if (is_listener && (ret = __repmgr_listen(env)) != 0)
		goto err;

	/*
	 * Make some sort of call to rep_start before starting other threads, to
	 * ensure that incoming messages being processed always have a rep
	 * context properly configured.  Note that in a way this is wasted, in
	 * the sense that any messages that rep_start sends won't really go
	 * anywhere, because we haven't started the select() thread yet, so we
	 * don't yet really have any connections to any remote sites.  But
	 * trying to do it the other way ends up requiring complicated code;
	 * this way we know easily that by the time we receive a message, we've
	 * already called rep_start, so it'll be legal to call
	 * rep_process_message.
	 *     Note that even if we're starting without recovery, we need a
	 * rep_start call in case we're using leases.  Leases keep track of
	 * rep_start calls even within an env region lifetime.
	 */
	if ((ret = __rep_set_transport_int(env, SELF_EID, __repmgr_send)) != 0)
		goto err;
	need_masterseek = FALSE;
	if (!is_listener) {
		/* Another process currently already listening in this env. */
		db_rep->master_eid = rep->master_id;
	} else if ((db_rep->init_policy = flags) == DB_REP_MASTER)
		ret = __repmgr_become_master(env);
	else {
		/* DB_REP_CLIENT or DB_REP_ELECTION: start as a client. */
		if ((ret = __repmgr_prepare_my_addr(env, &my_addr)) != 0)
			goto err;
		ret = __rep_start_int(env, &my_addr, DB_REP_CLIENT);
		__os_free(env, my_addr.data);

		if (rep->master_id == DB_EID_INVALID ||
		    rep->master_id == SELF_EID) {
			need_masterseek = TRUE;
		} else {
			/*
			 * Restarted without recovery.  Use existing known
			 * master.
			 */
			db_rep->master_eid = rep->master_id;
		}
	}
	/* Shared error check for whichever rep_start variant ran above. */
	if (ret != 0)
		goto err;
	if ((ret = __repmgr_start_selector(env)) != 0)
		goto err;

	if (is_listener) {
		/*
		 * Since these allocated memory blocks are used by other
		 * threads, we have to be a bit careful about freeing them in
		 * case of any errors.  __repmgr_await_threads (which we call in
		 * the err: coda below) takes care of that.
		 */
		if ((ret = __os_calloc(env, (u_int)nthreads,
		    sizeof(REPMGR_RUNNABLE *), &db_rep->messengers)) != 0)
			goto err;
		db_rep->nthreads = nthreads;

		for (i = 0; i < nthreads; i++) {
			if ((ret = __os_calloc(env, 1, sizeof(REPMGR_RUNNABLE),
			    &messenger)) != 0)
				goto err;

			messenger->env = env;
			messenger->run = __repmgr_msg_thread;
			if ((ret = __repmgr_thread_start(env,
			    messenger)) != 0) {
				__os_free(env, messenger);
				goto err;
			}
			/*
			 * Only successfully started threads are recorded; the
			 * array was zero-filled, so cleanup stops at the first
			 * empty slot.
			 */
			db_rep->messengers[i] = messenger;
		}
	}

	if (need_masterseek) {
		LOCK_MUTEX(db_rep->mutex);
		locked = TRUE;
		if ((ret = __repmgr_init_election(env, ELECT_REPSTART)) != 0)
			goto err;
		UNLOCK_MUTEX(db_rep->mutex);
		locked = FALSE;
	}

	/* Only the listening process reports plain success. */
	return (is_listener ? 0 : DB_REP_IGNORE);

err:
	/* If we couldn't succeed at everything, undo the parts we did do. */
	/*
	 * NOTE(review): rep->listener, claimed above when is_listener is TRUE,
	 * is not cleared in this function.  Confirm that one of the cleanup
	 * calls below resets it.  Otherwise other processes would keep seeing
	 * a listener after this failed start.
	 * NOTE(review): db_rep->mutex is locked below even when we failed
	 * before reaching __repmgr_init (e.g. the nthreads check).  Confirm
	 * that the mutex is usable at that point.
	 */
	if (locked)
		UNLOCK_MUTEX(db_rep->mutex);
	if (db_rep->selector != NULL) {
		(void)__repmgr_stop_threads(env);
		(void)__repmgr_await_threads(env);
	}
	LOCK_MUTEX(db_rep->mutex);
	(void)__repmgr_net_close(env);
	if (REPMGR_INITED(db_rep))
		(void)__repmgr_deinit(env);
	UNLOCK_MUTEX(db_rep->mutex);
	return (ret);
}
209
210/*
211 * PUBLIC: int __repmgr_autostart __P((ENV *));
212 *
213 * Preconditions: rep_start() has been called; we're within an ENV_ENTER.
214 *     Because of this, we mustn't call __rep_set_transport(), but rather we
215 *     poke in send() function address manually.
216 */
217int
218__repmgr_autostart(env)
219	ENV *env;
220{
221	DB_REP *db_rep;
222	int ret;
223
224	db_rep = env->rep_handle;
225
226	DB_ASSERT(env, REP_ON(env));
227	LOCK_MUTEX(db_rep->mutex);
228
229	if (REPMGR_INITED(db_rep))
230		ret = 0;
231	else
232		ret = __repmgr_init(env);
233	if (ret != 0)
234		goto out;
235
236	RPRINT(env, DB_VERB_REPMGR_MISC,
237	    (env, "Automatically joining existing repmgr env"));
238
239	db_rep->send = __repmgr_send;
240
241	if (db_rep->selector == NULL && !db_rep->finished)
242		ret = __repmgr_start_selector(env);
243
244out:
245	UNLOCK_MUTEX(db_rep->mutex);
246	return (ret);
247}
248
249/*
250 * PUBLIC: int __repmgr_start_selector __P((ENV *));
251 */
252int
253__repmgr_start_selector(env)
254	ENV *env;
255{
256	DB_REP *db_rep;
257	REPMGR_RUNNABLE *selector;
258	int ret;
259
260	db_rep = env->rep_handle;
261	if ((ret = __os_calloc(env, 1, sizeof(REPMGR_RUNNABLE), &selector))
262	    != 0)
263		return (ret);
264	selector->env = env;
265	selector->run = __repmgr_select_thread;
266
267	/*
268	 * In case the select thread ever examines db_rep->selector, set it
269	 * before starting the thread (since once we create it we could be
270	 * racing with it).
271	 */
272	db_rep->selector = selector;
273	if ((ret = __repmgr_thread_start(env, selector)) != 0) {
274		__db_err(env, ret, "can't start selector thread");
275		__os_free(env, selector);
276		db_rep->selector = NULL;
277		return (ret);
278	}
279
280	return (0);
281}
282
283/*
284 * PUBLIC: int __repmgr_close __P((ENV *));
285 */
286int
287__repmgr_close(env)
288	ENV *env;
289{
290	DB_REP *db_rep;
291	int ret, t_ret;
292
293	ret = 0;
294	db_rep = env->rep_handle;
295	if (db_rep->selector != NULL) {
296		RPRINT(env, DB_VERB_REPMGR_MISC,
297		    (env, "Stopping repmgr threads"));
298		ret = __repmgr_stop_threads(env);
299		if ((t_ret = __repmgr_await_threads(env)) != 0 && ret == 0)
300			ret = t_ret;
301		RPRINT(env, DB_VERB_REPMGR_MISC,
302		    (env, "Repmgr threads are finished"));
303	}
304
305	if ((t_ret = __repmgr_net_close(env)) != 0 && ret == 0)
306		ret = t_ret;
307
308	if ((t_ret = __repmgr_deinit(env)) != 0 && ret == 0)
309		ret = t_ret;
310
311	return (ret);
312}
313
314/*
315 * PUBLIC: int __repmgr_set_ack_policy __P((DB_ENV *, int));
316 */
317int
318__repmgr_set_ack_policy(dbenv, policy)
319	DB_ENV *dbenv;
320	int policy;
321{
322	DB_REP *db_rep;
323	ENV *env;
324
325	env = dbenv->env;
326	db_rep = env->rep_handle;
327
328	ENV_NOT_CONFIGURED(
329	    env, db_rep->region, "DB_ENV->repmgr_set_ack_policy", DB_INIT_REP);
330
331	if (APP_IS_BASEAPI(env)) {
332		__db_errx(env, "%s %s", "DB_ENV->repmgr_set_ack_policy:",
333		    "cannot call from base replication application");
334		return (EINVAL);
335	}
336
337	switch (policy) {
338	case DB_REPMGR_ACKS_ALL: /* FALLTHROUGH */
339	case DB_REPMGR_ACKS_ALL_PEERS: /* FALLTHROUGH */
340	case DB_REPMGR_ACKS_NONE: /* FALLTHROUGH */
341	case DB_REPMGR_ACKS_ONE: /* FALLTHROUGH */
342	case DB_REPMGR_ACKS_ONE_PEER: /* FALLTHROUGH */
343	case DB_REPMGR_ACKS_QUORUM:
344		env->rep_handle->perm_policy = policy;
345		/*
346		 * Setting an ack policy makes this a replication manager
347		 * application.
348		 */
349		APP_SET_REPMGR(env);
350		return (0);
351	default:
352		__db_errx(env,
353		    "unknown ack_policy in DB_ENV->repmgr_set_ack_policy");
354		return (EINVAL);
355	}
356}
357
358/*
359 * PUBLIC: int __repmgr_get_ack_policy __P((DB_ENV *, int *));
360 */
361int
362__repmgr_get_ack_policy(dbenv, policy)
363	DB_ENV *dbenv;
364	int *policy;
365{
366	DB_REP *db_rep;
367	ENV *env;
368
369	env = dbenv->env;
370	db_rep = env->rep_handle;
371
372	ENV_NOT_CONFIGURED(
373	    env, db_rep->region, "DB_ENV->repmgr_get_ack_policy", DB_INIT_REP);
374
375	*policy = env->rep_handle->perm_policy;
376	return (0);
377}
378
379/*
380 * PUBLIC: int __repmgr_env_create __P((ENV *, DB_REP *));
381 */
382int
383__repmgr_env_create(env, db_rep)
384	ENV *env;
385	DB_REP *db_rep;
386{
387	COMPQUIET(env, NULL);
388
389	/* Set some default values. */
390	db_rep->ack_timeout = DB_REPMGR_DEFAULT_ACK_TIMEOUT;
391	db_rep->connection_retry_wait = DB_REPMGR_DEFAULT_CONNECTION_RETRY;
392	db_rep->election_retry_wait = DB_REPMGR_DEFAULT_ELECTION_RETRY;
393	db_rep->config_nsites = 0;
394	db_rep->peer = DB_EID_INVALID;
395	db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM;
396
397	db_rep->listen_fd = INVALID_SOCKET;
398	db_rep->master_eid = DB_EID_INVALID;
399	TAILQ_INIT(&db_rep->connections);
400	TAILQ_INIT(&db_rep->retries);
401
402	db_rep->input_queue.size = 0;
403	STAILQ_INIT(&db_rep->input_queue.header);
404
405	__repmgr_env_create_pf(db_rep);
406
407	return (0);
408}
409
410/*
411 * PUBLIC: void __repmgr_env_destroy __P((ENV *, DB_REP *));
412 */
413void
414__repmgr_env_destroy(env, db_rep)
415	ENV *env;
416	DB_REP *db_rep;
417{
418	__repmgr_queue_destroy(env);
419	__repmgr_net_destroy(env, db_rep);
420	if (db_rep->messengers != NULL) {
421		__os_free(env, db_rep->messengers);
422		db_rep->messengers = NULL;
423	}
424}
425
426/*
427 * PUBLIC: int __repmgr_stop_threads __P((ENV *));
428 */
429int
430__repmgr_stop_threads(env)
431	ENV *env;
432{
433	DB_REP *db_rep;
434	int ret;
435
436	db_rep = env->rep_handle;
437
438	/*
439	 * Hold mutex for the purpose of waking up threads, but then get out of
440	 * the way to let them clean up and exit.
441	 */
442	LOCK_MUTEX(db_rep->mutex);
443	db_rep->finished = TRUE;
444	if (db_rep->elect_thread != NULL &&
445	    (ret = __repmgr_signal(&db_rep->check_election)) != 0)
446		goto unlock;
447
448	if ((ret = __repmgr_signal(&db_rep->queue_nonempty)) != 0)
449		goto unlock;
450
451	if ((ret = __repmgr_each_connection(env,
452	    kick_blockers, NULL, TRUE)) != 0)
453		goto unlock;
454	UNLOCK_MUTEX(db_rep->mutex);
455
456	return (__repmgr_wake_main_thread(env));
457
458unlock:
459	UNLOCK_MUTEX(db_rep->mutex);
460	return (ret);
461}
462
463static int
464kick_blockers(env, conn, unused)
465	ENV *env;
466	REPMGR_CONNECTION *conn;
467	void *unused;
468{
469	COMPQUIET(env, NULL);
470	COMPQUIET(unused, NULL);
471
472	return (conn->blockers > 0 ? __repmgr_signal(&conn->drained) : 0);
473}
474
475static int
476__repmgr_await_threads(env)
477	ENV *env;
478{
479	DB_REP *db_rep;
480	REPMGR_RUNNABLE *messenger;
481	int ret, t_ret, i;
482
483	db_rep = env->rep_handle;
484	ret = 0;
485	if (db_rep->elect_thread != NULL) {
486		ret = __repmgr_thread_join(db_rep->elect_thread);
487		__os_free(env, db_rep->elect_thread);
488		db_rep->elect_thread = NULL;
489	}
490
491	for (i = 0;
492	    i < db_rep->nthreads && db_rep->messengers[i] != NULL; i++) {
493		messenger = db_rep->messengers[i];
494		if ((t_ret = __repmgr_thread_join(messenger)) != 0 && ret == 0)
495			ret = t_ret;
496		__os_free(env, messenger);
497	}
498	__os_free(env, db_rep->messengers);
499	db_rep->messengers = NULL;
500
501	if (db_rep->selector != NULL) {
502		if ((t_ret = __repmgr_thread_join(db_rep->selector)) != 0 &&
503		    ret == 0)
504			ret = t_ret;
505		__os_free(env, db_rep->selector);
506		db_rep->selector = NULL;
507	}
508
509	return (ret);
510}
511
/*
 * __repmgr_set_local_site --
 *	DB_ENV->repmgr_set_local_site method: set this site's own address.
 *	Requirements:
 *	  - flags must be 0;
 *	  - host must be non-NULL and port non-zero;
 *	  - it must be called before the select() thread is started, i.e.
 *	    before DB_ENV->repmgr_start.
 *
 *	When the shared REP region exists (REP_ON), the address is stored in,
 *	or checked against, that region.  Otherwise it is kept only in the
 *	local DB_REP handle.  Setting the same address again succeeds.
 *	Setting a different one fails via mismatch_err.  On success the
 *	environment is marked as a Replication Manager application.
 *
 * PUBLIC: int __repmgr_set_local_site __P((DB_ENV *, const char *, u_int,
 * PUBLIC:     u_int32_t));
 */
int
__repmgr_set_local_site(dbenv, host, port, flags)
	DB_ENV *dbenv;
	const char *host;
	u_int port;
	u_int32_t flags;
{
	DB_REP *db_rep;
	DB_THREAD_INFO *ip;
	ENV *env;
	REGENV *renv;
	REGINFO *infop;
	REP *rep;
	repmgr_netaddr_t addr;
	char *myhost;
	int locked, ret;

	env = dbenv->env;
	db_rep = env->rep_handle;

	ENV_NOT_CONFIGURED(
	    env, db_rep->region, "DB_ENV->repmgr_set_local_site", DB_INIT_REP);

	if (APP_IS_BASEAPI(env)) {
		__db_errx(env, "%s %s", "DB_ENV->repmgr_set_local_site:",
		    "cannot call from base replication application");
		return (EINVAL);
	}

	if (db_rep->selector != NULL) {
		__db_errx(env,
"DB_ENV->repmgr_set_local_site: must be called before DB_ENV->repmgr_start");
		return (EINVAL);
	}

	if (flags != 0)
		return (__db_ferr(env, "DB_ENV->repmgr_set_local_site", 0));

	if (host == NULL || port == 0) {
		__db_errx(env,
		    "repmgr_set_local_site: host name and port (>0) required");
		return (EINVAL);
	}

	/*
	 * If the local site address hasn't already been set, just set it from
	 * the given inputs.  If it has, all we do is verify that it matches
	 * what had already been set previously.
	 *
	 * Do this in the shared region if we have one, or else just in the
	 * local handle.
	 *
	 * In either case, don't perturb global structures until we're sure
	 * everything will succeed.
	 */
	COMPQUIET(rep, NULL);
	COMPQUIET(ip, NULL);
	COMPQUIET(renv, NULL);
	locked = FALSE;
	ret = 0;
	if (REP_ON(env)) {
		rep = db_rep->region;
		ENV_ENTER(env, ip);
		MUTEX_LOCK(env, rep->mtx_repmgr);

		/*
		 * Also take the environment region's mutex, since we may
		 * allocate shared region memory below.  NOTE(review):
		 * presumably __env_alloc requires mtx_regenv -- confirm.
		 */
		infop = env->reginfo;
		renv = infop->primary;
		MUTEX_LOCK(env, renv->mtx_regenv);
		locked = TRUE;
		if (rep->my_addr.host == INVALID_ROFF) {
			/*
			 * First setting.  Pack the address privately first, and
			 * touch the region only once that and the allocation
			 * have succeeded.
			 */
			if ((ret = __repmgr_pack_netaddr(env,
			    host, port, NULL, &addr)) != 0)
				goto unlock;

			if ((ret = __env_alloc(infop,
			    strlen(host)+1, &myhost)) == 0) {
				(void)strcpy(myhost, host);
				rep->my_addr.host = R_OFFSET(infop, myhost);
				rep->my_addr.port = port;
			} else {
				__repmgr_cleanup_netaddr(env, &addr);
				goto unlock;
			}
			memcpy(&db_rep->my_addr, &addr, sizeof(addr));
			rep->siteaddr_seq++;
		} else {
			/*
			 * Already set in the region: only verify it matches.
			 * NOTE(review): db_rep->my_addr is not filled in on this
			 * path, yet __repmgr_start requires db_rep->my_addr.host.
			 * Confirm that env open/join populates it from the
			 * region.
			 */
			myhost = R_ADDR(infop, rep->my_addr.host);
			if (strcmp(myhost, host) != 0 ||
			    port != rep->my_addr.port) {
				ret = mismatch_err(env);
				goto unlock;
			}
		}
	} else {
		/* No shared region yet: remember the address in this handle. */
		if (db_rep->my_addr.host == NULL) {
			if ((ret = __repmgr_pack_netaddr(env,
			    host, port, NULL, &db_rep->my_addr)) != 0)
				goto unlock;
		} else if (strcmp(host, db_rep->my_addr.host) != 0 ||
		    port != db_rep->my_addr.port) {
			ret = mismatch_err(env);
			goto unlock;
		}
	}

unlock:
	/* Release in reverse order of acquisition. */
	if (locked) {
		MUTEX_UNLOCK(env, renv->mtx_regenv);
		MUTEX_UNLOCK(env, rep->mtx_repmgr);
		ENV_LEAVE(env, ip);
	}
	/*
	 * Setting a local site makes this a replication manager application.
	 */
	if (ret == 0)
		APP_SET_REPMGR(env);
	return (ret);
}
634
635static int
636mismatch_err(env)
637	const ENV *env;
638{
639	__db_errx(env, "A (different) local site address has already been set");
640	return (EINVAL);
641}
642
643/*
644 * If the application only calls this method from a single thread (e.g., during
645 * its initialization), it will avoid the problems with the non-thread-safe host
646 * name lookup.  In any case, if we relegate the blocking lookup to here it
647 * won't affect our select() loop.
648 *
649 * PUBLIC: int __repmgr_add_remote_site __P((DB_ENV *, const char *, u_int,
650 * PUBLIC: int *, u_int32_t));
651 */
652int
653__repmgr_add_remote_site(dbenv, host, port, eidp, flags)
654	DB_ENV *dbenv;
655	const char *host;
656	u_int port;
657	int *eidp;
658	u_int32_t flags;
659{
660	DB_REP *db_rep;
661	ENV *env;
662	REPMGR_SITE *site;
663	int eid, locked, ret;
664
665	env = dbenv->env;
666	db_rep = env->rep_handle;
667	locked = FALSE;
668	ret = 0;
669
670	ENV_NOT_CONFIGURED(
671	    env, db_rep->region, "DB_ENV->repmgr_add_remote_site", DB_INIT_REP);
672
673	if (APP_IS_BASEAPI(env)) {
674		__db_errx(env, "%s %s", "DB_ENV->repmgr_add_remote_site:",
675		    "cannot call from base replication application");
676		return (EINVAL);
677	}
678
679	if ((ret = __db_fchk(env,
680	    "DB_ENV->repmgr_add_remote_site", flags, DB_REPMGR_PEER)) != 0)
681		return (ret);
682
683	if (host == NULL) {
684		__db_errx(env,
685		    "repmgr_add_remote_site: host name is required");
686		return (EINVAL);
687	}
688
689	if (REP_ON(env)) {
690		LOCK_MUTEX(db_rep->mutex);
691		locked = TRUE;
692
693		ret = __repmgr_add_site(env, host, port, &site, flags);
694		if (ret == EEXIST) {
695			/*
696			 * With NEWSITE messages arriving at any time, it would
697			 * be impractical for applications to avoid this.  Also
698			 * this provides a way they can still set peer.
699			 */
700			ret = 0;
701		}
702		if (ret != 0)
703			goto out;
704		eid = EID_FROM_SITE(site);
705		if (eidp != NULL)
706			*eidp = eid;
707	} else {
708		if ((site = __repmgr_find_site(env, host, port)) == NULL &&
709		    (ret = __repmgr_new_site(env,
710		    &site, host, port, SITE_IDLE)) != 0)
711			goto out;
712		eid = EID_FROM_SITE(site);
713
714		/*
715		 * Set provisional EID of peer; may be adjusted at env open/join
716		 * time.
717		 */
718		if (LF_ISSET(DB_REPMGR_PEER))
719			db_rep->peer = eid;
720	}
721
722out:
723	if (locked)
724		UNLOCK_MUTEX(db_rep->mutex);
725	/*
726	 * Adding a remote site makes this a replication manager application.
727	 */
728	if (ret == 0)
729		APP_SET_REPMGR(env);
730	return (ret);
731}
732