1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2001,2008 Oracle.  All rights reserved.
5 *
6 * $Id: rep_method.c,v 12.122 2008/04/28 18:18:36 sue Exp $
7 */
8
9#include "db_config.h"
10
11#include "db_int.h"
12#include "dbinc/db_page.h"
13#include "dbinc/btree.h"
14#include "dbinc/log.h"
15#include "dbinc/mp.h"
16#include "dbinc/txn.h"
17
18static int  __rep_abort_prepared __P((ENV *));
19static int  __rep_bt_cmp __P((DB *, const DBT *, const DBT *));
20static void __rep_config_map __P((ENV *, u_int32_t *, u_int32_t *));
21static u_int32_t __rep_conv_vers __P((ENV *, u_int32_t));
22static int  __rep_restore_prepared __P((ENV *));
23
24/*
25 * __rep_env_create --
26 *	Replication-specific initialization of the ENV structure.
27 *
28 * PUBLIC: int __rep_env_create __P((DB_ENV *));
29 */
30int
31__rep_env_create(dbenv)
32	DB_ENV *dbenv;
33{
34	DB_REP *db_rep;
35	ENV *env;
36	int ret;
37
38	env = dbenv->env;
39
40	if ((ret = __os_calloc(env, 1, sizeof(DB_REP), &db_rep)) != 0)
41		return (ret);
42
43	db_rep->eid = DB_EID_INVALID;
44	db_rep->bytes = REP_DEFAULT_THROTTLE;
45	DB_TIMEOUT_TO_TIMESPEC(DB_REP_REQUEST_GAP, &db_rep->request_gap);
46	DB_TIMEOUT_TO_TIMESPEC(DB_REP_MAX_GAP, &db_rep->max_gap);
47	db_rep->elect_timeout = 2 * US_PER_SEC;			/*  2 seconds */
48	db_rep->chkpt_delay = 30 * US_PER_SEC;			/* 30 seconds */
49	db_rep->my_priority = DB_REP_DEFAULT_PRIORITY;
50	/*
51	 * Make no clock skew the default.  Setting both fields
52	 * to the same non-zero value means no skew.
53	 */
54	db_rep->clock_skew = 1;
55	db_rep->clock_base = 1;
56
57#ifdef HAVE_REPLICATION_THREADS
58	if ((ret = __repmgr_env_create(env, db_rep)) != 0) {
59		__os_free(env, db_rep);
60		return (ret);
61	}
62#endif
63
64	env->rep_handle = db_rep;
65	return (0);
66}
67
68/*
69 * __rep_env_destroy --
70 *	Replication-specific destruction of the ENV structure.
71 *
72 * PUBLIC: void __rep_env_destroy __P((DB_ENV *));
73 */
74void
75__rep_env_destroy(dbenv)
76	DB_ENV *dbenv;
77{
78	ENV *env;
79
80	env = dbenv->env;
81
82	if (env->rep_handle != NULL) {
83#ifdef HAVE_REPLICATION_THREADS
84		__repmgr_env_destroy(env, env->rep_handle);
85#endif
86		__os_free(env, env->rep_handle);
87		env->rep_handle = NULL;
88	}
89}
90
91/*
92 * __rep_get_config --
93 *	Return the replication subsystem configuration.
94 *
95 * PUBLIC: int __rep_get_config __P((DB_ENV *, u_int32_t, int *));
96 */
97int
98__rep_get_config(dbenv, which, onp)
99	DB_ENV *dbenv;
100	u_int32_t which;
101	int *onp;
102{
103	DB_REP *db_rep;
104	ENV *env;
105	REP *rep;
106	u_int32_t mapped;
107
108	env = dbenv->env;
109
110#undef	OK_FLAGS
111#define	OK_FLAGS							\
112	(DB_REP_CONF_BULK | DB_REP_CONF_DELAYCLIENT |			\
113	DB_REP_CONF_NOAUTOINIT | DB_REP_CONF_NOWAIT)
114
115	if (FLD_ISSET(which, ~OK_FLAGS))
116		return (__db_ferr(env, "DB_ENV->rep_get_config", 0));
117
118	db_rep = env->rep_handle;
119	ENV_NOT_CONFIGURED(
120	    env, db_rep->region, "DB_ENV->rep_get_config", DB_INIT_REP);
121
122	mapped = 0;
123	__rep_config_map(env, &which, &mapped);
124	if (REP_ON(env)) {
125		rep = db_rep->region;
126		if (FLD_ISSET(rep->config, mapped))
127			*onp = 1;
128		else
129			*onp = 0;
130	} else {
131		if (FLD_ISSET(db_rep->config, mapped))
132			*onp = 1;
133		else
134			*onp = 0;
135	}
136	return (0);
137}
138
139/*
140 * __rep_set_config --
141 *	Configure the replication subsystem.
142 *
143 * PUBLIC: int __rep_set_config __P((DB_ENV *, u_int32_t, int));
144 */
145int
146__rep_set_config(dbenv, which, on)
147	DB_ENV *dbenv;
148	u_int32_t which;
149	int on;
150{
151	DB_LOG *dblp;
152	DB_REP *db_rep;
153	DB_THREAD_INFO *ip;
154	ENV *env;
155	LOG *lp;
156	REP *rep;
157	REP_BULK bulk;
158	u_int32_t mapped, orig;
159	int ret;
160
161	env = dbenv->env;
162	db_rep = env->rep_handle;
163	ret = 0;
164
165#undef	OK_FLAGS
166#define	OK_FLAGS							\
167    (DB_REP_CONF_BULK | DB_REP_CONF_DELAYCLIENT | DB_REP_CONF_LEASE |	\
168    DB_REP_CONF_NOAUTOINIT | DB_REP_CONF_NOWAIT | DB_REPMGR_CONF_2SITE_STRICT)
169
170	ENV_NOT_CONFIGURED(
171	    env, db_rep->region, "DB_ENV->rep_set_config", DB_INIT_REP);
172
173	if (FLD_ISSET(which, ~OK_FLAGS))
174		return (__db_ferr(env, "DB_ENV->rep_set_config", 0));
175
176	mapped = 0;
177	__rep_config_map(env, &which, &mapped);
178
179	if (REP_ON(env)) {
180		ENV_ENTER(env, ip);
181
182		rep = db_rep->region;
183		/*
184		 * Leases must be turned on before calling rep_start.
185		 * Leases can never be turned off once they're turned on.
186		 */
187		if (FLD_ISSET(mapped, REP_C_LEASE)) {
188			if (F_ISSET(rep, REP_F_START_CALLED)) {
189				__db_errx(env,
190"DB_ENV->rep_set_config: leases must be configured before DB_ENV->rep_start");
191				ret = EINVAL;
192			}
193			if (on == 0) {
194				__db_errx(env,
195	"DB_ENV->rep_set_config: leases cannot be turned off");
196				ret = EINVAL;
197			}
198			if (ret != 0)
199				return (ret);
200		}
201		MUTEX_LOCK(env, rep->mtx_clientdb);
202		REP_SYSTEM_LOCK(env);
203		orig = rep->config;
204		if (on)
205			FLD_SET(rep->config, mapped);
206		else
207			FLD_CLR(rep->config, mapped);
208
209		/*
210		 * Bulk transfer requires special processing if it is getting
211		 * toggled.
212		 */
213		dblp = env->lg_handle;
214		lp = dblp->reginfo.primary;
215		if (FLD_ISSET(rep->config, REP_C_BULK) &&
216		    !FLD_ISSET(orig, REP_C_BULK))
217			db_rep->bulk = R_ADDR(&dblp->reginfo, lp->bulk_buf);
218		REP_SYSTEM_UNLOCK(env);
219
220		/*
221		 * If turning bulk off and it was on, send out whatever is in
222		 * the buffer already.
223		 */
224		if (FLD_ISSET(orig, REP_C_BULK) &&
225		    !FLD_ISSET(rep->config, REP_C_BULK) && lp->bulk_off != 0) {
226			memset(&bulk, 0, sizeof(bulk));
227			if (db_rep->bulk == NULL)
228				bulk.addr =
229				    R_ADDR(&dblp->reginfo, lp->bulk_buf);
230			else
231				bulk.addr = db_rep->bulk;
232			bulk.offp = &lp->bulk_off;
233			bulk.len = lp->bulk_len;
234			bulk.type = REP_BULK_LOG;
235			bulk.eid = DB_EID_BROADCAST;
236			bulk.flagsp = &lp->bulk_flags;
237			ret = __rep_send_bulk(env, &bulk, 0);
238		}
239		MUTEX_UNLOCK(env, rep->mtx_clientdb);
240
241		ENV_LEAVE(env, ip);
242	} else {
243		if (on)
244			FLD_SET(db_rep->config, mapped);
245		else
246			FLD_CLR(db_rep->config, mapped);
247	}
248	return (ret);
249}
250
251static void
252__rep_config_map(env, inflagsp, outflagsp)
253	ENV *env;
254	u_int32_t *inflagsp, *outflagsp;
255{
256	COMPQUIET(env, NULL);
257
258	if (FLD_ISSET(*inflagsp, DB_REP_CONF_BULK)) {
259		FLD_SET(*outflagsp, REP_C_BULK);
260		FLD_CLR(*inflagsp, DB_REP_CONF_BULK);
261	}
262	if (FLD_ISSET(*inflagsp, DB_REP_CONF_DELAYCLIENT)) {
263		FLD_SET(*outflagsp, REP_C_DELAYCLIENT);
264		FLD_CLR(*inflagsp, DB_REP_CONF_DELAYCLIENT);
265	}
266	if (FLD_ISSET(*inflagsp, DB_REP_CONF_LEASE)) {
267		FLD_SET(*outflagsp, REP_C_LEASE);
268		FLD_CLR(*inflagsp, DB_REP_CONF_LEASE);
269	}
270	if (FLD_ISSET(*inflagsp, DB_REP_CONF_NOAUTOINIT)) {
271		FLD_SET(*outflagsp, REP_C_NOAUTOINIT);
272		FLD_CLR(*inflagsp, DB_REP_CONF_NOAUTOINIT);
273	}
274	if (FLD_ISSET(*inflagsp, DB_REP_CONF_NOWAIT)) {
275		FLD_SET(*outflagsp, REP_C_NOWAIT);
276		FLD_CLR(*inflagsp, DB_REP_CONF_NOWAIT);
277	}
278	if (FLD_ISSET(*inflagsp, DB_REPMGR_CONF_2SITE_STRICT)) {
279		FLD_SET(*outflagsp, REP_C_2SITE_STRICT);
280		FLD_CLR(*inflagsp, DB_REPMGR_CONF_2SITE_STRICT);
281	}
282}
283
284/*
285 * __rep_start --
286 *	Become a master or client, and start sending messages to participate
287 * in the replication environment.  Must be called after the environment
288 * is open.
289 *
290 * We must protect rep_start, which may change the world, with the rest
291 * of the DB library.  Each API interface will count itself as it enters
292 * the library.  Rep_start checks the following:
293 *
294 * rep->msg_th - this is the count of threads currently in rep_process_message
295 * rep->handle_cnt - number of threads actively using a dbp in library.
296 * rep->txn_cnt - number of active txns.
297 * REP_F_READY_* - Replication flag that indicates that we wish to run
298 * recovery, and want to prohibit new transactions from entering and cause
299 * existing ones to return immediately (with a DB_LOCK_DEADLOCK error).
300 *
301 * There is also the renv->rep_timestamp which is updated whenever significant
302 * events (i.e., new masters, log rollback, etc).  Upon creation, a handle
303 * is associated with the current timestamp.  Each time a handle enters the
304 * library it must check if the handle timestamp is the same as the one
305 * stored in the replication region.  This prevents the use of handles on
306 * clients that reference non-existent files whose creation was backed out
307 * during a synchronizing recovery.
308 *
309 * PUBLIC: int __rep_start __P((DB_ENV *, DBT *, u_int32_t));
310 */
311int
312__rep_start(dbenv, dbt, flags)
313	DB_ENV *dbenv;
314	DBT *dbt;
315	u_int32_t flags;
316{
317	DB *dbp;
318	DB_LOG *dblp;
319	DB_LSN lsn;
320	DB_REP *db_rep;
321	DB_THREAD_INFO *ip;
322	DB_TXNREGION *region;
323	ENV *env;
324	LOG *lp;
325	REGINFO *infop;
326	REP *rep;
327	db_timeout_t tmp;
328	u_int32_t oldvers, pending_event, repflags, role;
329	int announce, locked, ret, role_chg;
330	int t_ret;
331
332	env = dbenv->env;
333
334	ENV_REQUIRES_CONFIG_XX(
335	    env, rep_handle, "DB_ENV->rep_start", DB_INIT_REP);
336
337	db_rep = env->rep_handle;
338	rep = db_rep->region;
339	infop = env->reginfo;
340	locked = 0;
341	pending_event = DB_EVENT_NO_SUCH_EVENT;
342
343	switch (role = LF_ISSET(DB_REP_CLIENT | DB_REP_MASTER)) {
344	case DB_REP_CLIENT:
345	case DB_REP_MASTER:
346		break;
347	default:
348		__db_errx(env,
349	"DB_ENV->rep_start: must specify DB_REP_CLIENT or DB_REP_MASTER");
350		return (EINVAL);
351	}
352
353	/* We need a transport function. */
354	if (db_rep->send == NULL) {
355		__db_errx(env,
356    "DB_ENV->rep_set_transport must be called before DB_ENV->rep_start");
357		return (EINVAL);
358	}
359
360	/*
361	 * If we're using master leases, check that all needed
362	 * setup has been done.
363	 */
364	if (IS_USING_LEASES(env) && rep->lease_timeout == 0) {
365		__db_errx(env,
366"DB_ENV->rep_start: must call DB_ENV->rep_set_timeout for leases first");
367		return (EINVAL);
368	}
369
370	ENV_ENTER(env, ip);
371
372	/*
373	 * In order to correctly check log files for old versions, we
374	 * need to flush the logs.
375	 */
376	if ((ret = __log_flush(env, NULL)) != 0)
377		goto out;
378
379	REP_SYSTEM_LOCK(env);
380	/*
381	 * We only need one thread to start-up replication, so if
382	 * there is another thread in rep_start, we'll let it finish
383	 * its work and have this thread simply return.  Similarly,
384	 * if a thread is in a critical lockout section we return.
385	 */
386	if (F_ISSET(rep, REP_F_READY_MSG)) {
387		/*
388		 * There is already someone in lockout.  Return.
389		 */
390		RPRINT(env, DB_VERB_REP_MISC,
391		    (env, "Thread already in lockout"));
392		REP_SYSTEM_UNLOCK(env);
393		goto out;
394	} else if ((ret = __rep_lockout_msg(env, rep, 0)) != 0)
395		goto errunlock;
396
397	role_chg = (!F_ISSET(rep, REP_F_MASTER) && role == DB_REP_MASTER) ||
398	    (!F_ISSET(rep, REP_F_CLIENT) && role == DB_REP_CLIENT);
399
400	/*
401	 * Wait for any active txns or mpool ops to complete, and
402	 * prevent any new ones from occurring, only if we're
403	 * changing roles.
404	 */
405	if (role_chg) {
406		if ((ret = __rep_lockout_api(env, rep)) != 0)
407			goto errunlock;
408		locked = 1;
409	}
410
411	dblp = env->lg_handle;
412	lp = dblp->reginfo.primary;
413	if (role == DB_REP_MASTER) {
414		if (role_chg) {
415			/*
416			 * If we're upgrading from having been a client,
417			 * preclose, so that we close our temporary database
418			 * and any files we opened while doing a rep_apply.
419			 * If we don't we can infinitely leak file ids if
420			 * the master crashed with files open (the likely
421			 * case).  If we don't close them we can run into
422			 * problems if we try to remove that file or long
423			 * running applications end up with an unbounded
424			 * number of used fileids, each getting written
425			 * on checkpoint.  Just close them.
426			 * Then invalidate all files open in the logging
427			 * region.  These are files open by other processes
428			 * attached to the environment.  They must be
429			 * closed by the other processes when they notice
430			 * the change in role.
431			 */
432			if ((ret = __rep_preclose(env)) != 0)
433				goto errunlock;
434
435			rep->gen++;
436			/*
437			 * There could have been any number of failed
438			 * elections, so jump the gen if we need to now.
439			 */
440			if (rep->egen > rep->gen)
441				rep->gen = rep->egen;
442			if (IS_USING_LEASES(env) &&
443			    !F_ISSET(rep, REP_F_MASTERELECT)) {
444				__db_errx(env,
445    "rep_start: Cannot become master without being elected when using leases.");
446				ret = EINVAL;
447				goto errunlock;
448			}
449			if (F_ISSET(rep, REP_F_MASTERELECT)) {
450				__rep_elect_done(env, rep, 0);
451				F_CLR(rep, REP_F_MASTERELECT);
452			}
453			if (rep->egen <= rep->gen)
454				rep->egen = rep->gen + 1;
455			RPRINT(env, DB_VERB_REP_MISC, (env,
456			    "New master gen %lu, egen %lu",
457			    (u_long)rep->gen, (u_long)rep->egen));
458			if ((ret = __rep_write_gen(env, rep->gen)) != 0)
459				goto errunlock;
460		}
461		/*
462		 * Set lease duration assuming clients have faster clock.
463		 * Master needs to compensate so that clients do not
464		 * expire their grant while the master thinks it is valid.
465		 */
466		if (IS_USING_LEASES(env) &&
467		    (role_chg || !F_ISSET(rep, REP_F_START_CALLED))) {
468			/*
469			 * If we have already granted our lease, we
470			 * cannot become master.
471			 */
472			if ((ret = __rep_islease_granted(env))) {
473				__db_errx(env,
474    "rep_start: Cannot become master with outstanding lease granted.");
475				ret = EINVAL;
476				goto errunlock;
477			}
478			/*
479			 * Simply compute the larger ratio for the lease.
480			 */
481			tmp = (db_timeout_t)((double)rep->lease_timeout /
482			    ((double)rep->clock_skew /
483			    (double)rep->clock_base));
484			DB_TIMEOUT_TO_TIMESPEC(tmp, &rep->lease_duration);
485			/*
486			 * Keep track of last perm LSN on master for
487			 * lease refresh.
488			 */
489			INIT_LSN(lp->max_perm_lsn);
490			if ((ret = __rep_lease_table_alloc(env,
491			    rep->nsites)) != 0)
492				goto errunlock;
493		}
494		rep->master_id = rep->eid;
495
496		/*
497		 * Clear out almost everything, and then set MASTER.  Leave
498		 * READY_* alone in case we did a lockout above;
499		 * we'll clear it in a moment (below), once we've written
500		 * the txn_recycle into the log.
501		 */
502		repflags = F_ISSET(rep, REP_F_READY_API | REP_F_READY_MSG |
503		    REP_F_READY_OP);
504#ifdef	DIAGNOSTIC
505		if (!F_ISSET(rep, REP_F_GROUP_ESTD))
506			RPRINT(env, DB_VERB_REP_MISC, (env,
507			    "Establishing group as master."));
508#endif
509		FLD_SET(repflags, REP_F_MASTER | REP_F_GROUP_ESTD);
510		rep->flags = repflags;
511
512		/*
513		 * We're master.  Set the versions to the current ones.
514		 */
515		oldvers = lp->persist.version;
516		/*
517		 * If we're moving forward to the current version, we need
518		 * to force the log file to advance and reset the
519		 * recovery table since it contains pointers to old
520		 * recovery functions.
521		 */
522		RPRINT(env, DB_VERB_REP_MISC, (env,
523		    "rep_start: Old log version was %lu", (u_long)oldvers));
524		if (lp->persist.version != DB_LOGVERSION) {
525			if ((ret = __env_init_rec(env, DB_LOGVERSION)) != 0)
526				goto errunlock;
527		}
528		rep->version = DB_REPVERSION;
529		F_CLR(rep, REP_F_READY_MSG);
530		REP_SYSTEM_UNLOCK(env);
531		LOG_SYSTEM_LOCK(env);
532		lsn = lp->lsn;
533		LOG_SYSTEM_UNLOCK(env);
534
535		/*
536		 * Send the NEWMASTER message first so that clients know
537		 * subsequent messages are coming from the right master.
538		 * We need to perform all actions below no matter what
539		 * regarding errors.
540		 */
541		(void)__rep_send_message(env,
542		    DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0, 0);
543		ret = 0;
544		if (role_chg) {
545			pending_event = DB_EVENT_REP_MASTER;
546			/*
547			 * If prepared transactions have not been restored
548			 * look to see if there are any.  If there are,
549			 * then mark the open files, otherwise close them.
550			 */
551			region = env->tx_handle->reginfo.primary;
552			if (region->stat.st_nrestores == 0 &&
553			    (t_ret = __rep_restore_prepared(env)) != 0 &&
554			    ret == 0)
555				ret = t_ret;
556			if (region->stat.st_nrestores != 0) {
557			    if ((t_ret = __dbreg_mark_restored(env)) != 0 &&
558				    ret == 0)
559					ret = t_ret;
560			} else {
561				ret = __dbreg_invalidate_files(env, 0);
562				if ((t_ret = __rep_closefiles(
563				    env, 0)) != 0 && ret == 0)
564					ret = t_ret;
565			}
566			if ((t_ret = __txn_recycle_id(env)) != 0 && ret == 0)
567				ret = t_ret;
568			REP_SYSTEM_LOCK(env);
569			F_CLR(rep, REP_F_READY_API | REP_F_READY_OP);
570			locked = 0;
571			REP_SYSTEM_UNLOCK(env);
572			(void)__memp_set_config(
573			    dbenv, DB_MEMP_SYNC_INTERRUPT, 0);
574		}
575	} else {
576		announce = role_chg || rep->master_id == DB_EID_INVALID;
577
578		if (role_chg)
579			rep->master_id = DB_EID_INVALID;
580		/* Zero out everything except recovery and tally flags. */
581		repflags = F_ISSET(rep, REP_F_NOARCHIVE | REP_F_READY_MSG |
582		    REP_F_RECOVER_MASK | REP_F_TALLY);
583		FLD_SET(repflags, REP_F_CLIENT);
584		if (role_chg) {
585			if ((ret = __log_get_oldversion(env, &oldvers)) != 0)
586				goto errunlock;
587			RPRINT(env, DB_VERB_REP_MISC, (env,
588			    "rep_start: Found old version log %d", oldvers));
589			if (oldvers >= DB_LOGVERSION_MIN) {
590				__log_set_version(env, oldvers);
591				oldvers = __rep_conv_vers(env, oldvers);
592				DB_ASSERT(
593				    env, oldvers != DB_REPVERSION_INVALID);
594				rep->version = oldvers;
595			}
596		}
597		rep->flags = repflags;
598		/*
599		 * On a client, compute the lease duration on the
600		 * assumption that the client has a fast clock.
601		 * Expire any existing leases we might have held as
602		 * a master.
603		 */
604		if (IS_USING_LEASES(env) &&
605		    (role_chg || !F_ISSET(rep, REP_F_START_CALLED))) {
606			if ((ret = __rep_lease_expire(env, 1)) != 0)
607				goto errunlock;
608			/*
609			 * Since the master is also compensating on its
610			 * side as well, we're being doubly conservative
611			 * to compensate on the client side.  Theoretically,
612			 * this compensation is not necessary, as it is
613			 * effectively doubling the skew compensation.
614			 * But we are making guarantees based on time and
615			 * skews across machines.  So we are being extra
616			 * cautious.
617			 */
618			tmp = (db_timeout_t)((double)rep->lease_timeout *
619			    ((double)rep->clock_skew /
620			    (double)rep->clock_base));
621			DB_TIMEOUT_TO_TIMESPEC(tmp, &rep->lease_duration);
622			if (rep->lease_off != INVALID_ROFF) {
623				__env_alloc_free(infop,
624				    R_ADDR(infop, rep->lease_off));
625				rep->lease_off = INVALID_ROFF;
626			}
627		}
628		REP_SYSTEM_UNLOCK(env);
629
630		/*
631		 * Abort any prepared transactions that were restored
632		 * by recovery.  We won't be able to create any txns of
633		 * our own until they're resolved, but we can't resolve
634		 * them ourselves;  the master has to.  If any get
635		 * resolved as commits, we'll redo them when commit
636		 * records come in.  Aborts will simply be ignored.
637		 */
638		if ((ret = __rep_abort_prepared(env)) != 0)
639			goto errlock;
640
641		/*
642		 * If we're changing roles we need to init the db.
643		 */
644		if (role_chg) {
645			if ((ret = db_create(&dbp, dbenv, 0)) != 0)
646				goto errlock;
647			/*
648			 * Ignore errors, because if the file doesn't exist,
649			 * this is perfectly OK.
650			 */
651			MUTEX_LOCK(env, rep->mtx_clientdb);
652			(void)__db_remove(dbp, ip, NULL, REPDBNAME,
653			    NULL, DB_FORCE);
654			MUTEX_UNLOCK(env, rep->mtx_clientdb);
655			/*
656			 * Set pending_event after calls that can fail.
657			 */
658			pending_event = DB_EVENT_REP_CLIENT;
659		}
660		REP_SYSTEM_LOCK(env);
661		F_CLR(rep, REP_F_READY_MSG);
662		if (locked) {
663			F_CLR(rep, REP_F_READY_API | REP_F_READY_OP);
664			locked = 0;
665		}
666		REP_SYSTEM_UNLOCK(env);
667
668		/*
669		 * If this client created a newly replicated environment,
670		 * then announce the existence of this client.  The master
671		 * should respond with a message that will tell this client
672		 * the current generation number and the current LSN.  This
673		 * will allow the client to either perform recovery or
674		 * simply join in.
675		 */
676		if (announce) {
677			/*
678			 * If we think we're a new client, and we have a
679			 * private env, set our gen number down to 0.
680			 * Otherwise, we can restart and think
681			 * we're ready to accept a new record (because our
682			 * gen is okay), but really this client needs to
683			 * sync with the master.  So, if we are announcing
684			 * ourselves force ourselves to find the master
685			 * and sync up.
686			 */
687			if (F_ISSET(env, ENV_PRIVATE))
688				rep->gen = 0;
689			if ((ret = __dbt_usercopy(env, dbt)) != 0)
690				goto out;
691			(void)__rep_send_message(env,
692			    DB_EID_BROADCAST, REP_NEWCLIENT, NULL, dbt, 0, 0);
693		} else
694			(void)__rep_send_message(env,
695			    DB_EID_BROADCAST, REP_ALIVE_REQ, NULL, NULL, 0, 0);
696	}
697
698	if (0) {
699		/*
700		 * We have separate labels for errors.  If we're returning an
701		 * error before we've set REP_F_READY_MSG, we use 'err'.  If
702		 * we are erroring while holding the region mutex, then we use
703		 * 'errunlock' label.  If we error without holding the rep
704		 * mutex we must use 'errlock'.
705		 */
706errlock:	REP_SYSTEM_LOCK(env);
707errunlock:	F_CLR(rep, REP_F_READY_MSG);
708		if (locked)
709			F_CLR(rep, REP_F_READY_API | REP_F_READY_OP);
710		REP_SYSTEM_UNLOCK(env);
711	}
712out:
713	if (ret == 0) {
714		REP_SYSTEM_LOCK(env);
715		F_SET(rep, REP_F_START_CALLED);
716		REP_SYSTEM_UNLOCK(env);
717	}
718	if (pending_event != DB_EVENT_NO_SUCH_EVENT)
719		__rep_fire_event(env, pending_event, NULL);
720	__dbt_userfree(env, dbt, NULL, NULL);
721	ENV_LEAVE(env, ip);
722	return (ret);
723}
724
725/*
726 * __rep_client_dbinit --
727 *
728 * Initialize the LSN database on the client side.  This is called from the
729 * client initialization code.  The startup flag value indicates if
730 * this is the first thread/process starting up and therefore should create
731 * the LSN database.  This routine must be called once by each process acting
732 * as a client.
733 *
734 * Assumes caller holds appropriate mutex.
735 *
736 * PUBLIC: int __rep_client_dbinit __P((ENV *, int, repdb_t));
737 */
738int
739__rep_client_dbinit(env, startup, which)
740	ENV *env;
741	int startup;
742	repdb_t which;
743{
744	DB *dbp, **rdbpp;
745	DB_ENV *dbenv;
746	DB_REP *db_rep;
747	DB_THREAD_INFO *ip;
748	REP *rep;
749	int ret, t_ret;
750	u_int32_t flags;
751	const char *name;
752
753	dbenv = env->dbenv;
754	db_rep = env->rep_handle;
755	rep = db_rep->region;
756	dbp = NULL;
757
758	if (which == REP_DB) {
759		name = REPDBNAME;
760		rdbpp = &db_rep->rep_db;
761	} else {
762		name = REPPAGENAME;
763		rdbpp = &rep->file_dbp;
764	}
765	/* Check if this has already been called on this environment. */
766	if (*rdbpp != NULL)
767		return (0);
768
769	ENV_GET_THREAD_INFO(env, ip);
770
771	if (startup) {
772		if ((ret = db_create(&dbp, dbenv, 0)) != 0)
773			goto err;
774		/*
775		 * Ignore errors, because if the file doesn't exist, this
776		 * is perfectly OK.
777		 */
778		(void)__db_remove(dbp, ip, NULL, name, NULL, DB_FORCE);
779	}
780
781	if ((ret = db_create(&dbp, dbenv, 0)) != 0)
782		goto err;
783	if (which == REP_DB &&
784	    (ret = __bam_set_bt_compare(dbp, __rep_bt_cmp)) != 0)
785		goto err;
786
787	/* Don't write log records on the client. */
788	if ((ret = __db_set_flags(dbp, DB_TXN_NOT_DURABLE)) != 0)
789		goto err;
790
791	flags = DB_NO_AUTO_COMMIT | DB_CREATE |
792	    (F_ISSET(env, ENV_THREAD) ? DB_THREAD : 0);
793
794	if ((ret = __db_open(dbp, ip, NULL, name, NULL,
795	    (which == REP_DB ? DB_BTREE : DB_RECNO),
796	    flags, 0, PGNO_BASE_MD)) != 0)
797		goto err;
798
799	*rdbpp = dbp;
800
801	if (0) {
802err:		if (dbp != NULL &&
803		    (t_ret = __db_close(dbp, NULL, DB_NOSYNC)) != 0 && ret == 0)
804			ret = t_ret;
805		*rdbpp = NULL;
806	}
807
808	return (ret);
809}
810
811/*
812 * __rep_bt_cmp --
813 *
814 * Comparison function for the LSN table.  We use the entire control
815 * structure as a key (for simplicity, so we don't have to merge the
816 * other fields in the control with the data field), but really only
817 * care about the LSNs.
818 */
819static int
820__rep_bt_cmp(dbp, dbt1, dbt2)
821	DB *dbp;
822	const DBT *dbt1, *dbt2;
823{
824	DB_LSN lsn1, lsn2;
825	__rep_control_args *rp1, *rp2;
826
827	COMPQUIET(dbp, NULL);
828
829	rp1 = dbt1->data;
830	rp2 = dbt2->data;
831
832	(void)__ua_memcpy(&lsn1, &rp1->lsn, sizeof(DB_LSN));
833	(void)__ua_memcpy(&lsn2, &rp2->lsn, sizeof(DB_LSN));
834
835	if (lsn1.file > lsn2.file)
836		return (1);
837
838	if (lsn1.file < lsn2.file)
839		return (-1);
840
841	if (lsn1.offset > lsn2.offset)
842		return (1);
843
844	if (lsn1.offset < lsn2.offset)
845		return (-1);
846
847	return (0);
848}
849
850/*
851 * __rep_abort_prepared --
852 *	Abort any prepared transactions that recovery restored.
853 *
854 *	This is used by clients that have just run recovery, since
855 * they cannot/should not call txn_recover and handle prepared transactions
856 * themselves.
857 */
858static int
859__rep_abort_prepared(env)
860	ENV *env;
861{
862#define	PREPLISTSIZE	50
863	DB_LOG *dblp;
864	DB_PREPLIST prep[PREPLISTSIZE], *p;
865	DB_TXNMGR *mgr;
866	DB_TXNREGION *region;
867	LOG *lp;
868	int ret;
869	long count, i;
870	u_int32_t op;
871
872	mgr = env->tx_handle;
873	region = mgr->reginfo.primary;
874	dblp = env->lg_handle;
875	lp = dblp->reginfo.primary;
876
877	if (region->stat.st_nrestores == 0)
878		return (0);
879
880	op = DB_FIRST;
881	do {
882		if ((ret = __txn_recover(env,
883		    prep, PREPLISTSIZE, &count, op)) != 0)
884			return (ret);
885		for (i = 0; i < count; i++) {
886			p = &prep[i];
887			if ((ret = __txn_abort(p->txn)) != 0)
888				return (ret);
889			env->rep_handle->region->op_cnt--;
890			env->rep_handle->region->max_prep_lsn = lp->lsn;
891			region->stat.st_nrestores--;
892		}
893		op = DB_NEXT;
894	} while (count == PREPLISTSIZE);
895
896	return (0);
897}
898
899/*
900 * __rep_restore_prepared --
901 *	Restore to a prepared state any prepared but not yet committed
902 * transactions.
903 *
904 *	This performs, in effect, a "mini-recovery";  it is called from
905 * __rep_start by newly upgraded masters.  There may be transactions that an
906 * old master prepared but did not resolve, which we need to restore to an
907 * active state.
908 */
909static int
910__rep_restore_prepared(env)
911	ENV *env;
912{
913	DBT rec;
914	DB_LOGC *logc;
915	DB_LSN ckp_lsn, lsn;
916	DB_REP *db_rep;
917	DB_TXNHEAD *txninfo;
918	REP *rep;
919	__txn_ckp_args *ckp_args;
920	__txn_regop_args *regop_args;
921	__txn_xa_regop_args *prep_args;
922	int ret, t_ret;
923	u_int32_t hi_txn, low_txn, rectype, status, txnid, txnop;
924
925	db_rep = env->rep_handle;
926	rep = db_rep->region;
927	if (IS_ZERO_LSN(rep->max_prep_lsn)) {
928		RPRINT(env, DB_VERB_REP_MISC,
929		    (env, "restore_prep: No prepares. Skip."));
930		return (0);
931	}
932	txninfo = NULL;
933	ckp_args = NULL;
934	prep_args = NULL;
935	regop_args = NULL;
936	ZERO_LSN(ckp_lsn);
937	ZERO_LSN(lsn);
938
939	if ((ret = __log_cursor(env, &logc)) != 0)
940		return (ret);
941
942	/*
943	 * Get our first LSN to see if the prepared LSN is still
944	 * available.  If so, it might be unresolved.  If not,
945	 * then it is guaranteed to be resolved.
946	 */
947	memset(&rec, 0, sizeof(DBT));
948	if ((ret = __logc_get(logc, &lsn, &rec, DB_FIRST)) != 0)  {
949		__db_errx(env, "First record not found");
950		goto err;
951	}
952	/*
953	 * If the max_prep_lsn is no longer available, we're sure
954	 * that txn has been resolved.  We're done.
955	 */
956	if (rep->max_prep_lsn.file < lsn.file) {
957		RPRINT(env, DB_VERB_REP_MISC,
958		    (env, "restore_prep: Prepare resolved. Skip"));
959		ZERO_LSN(rep->max_prep_lsn);
960		goto done;
961	}
962	/*
963	 * We need to consider the set of records between the most recent
964	 * checkpoint LSN and the end of the log;  any txn in that
965	 * range, and only txns in that range, could still have been
966	 * active, and thus prepared but not yet committed (PBNYC),
967	 * when the old master died.
968	 *
969	 * Find the most recent checkpoint LSN, and get the record there.
970	 * If there is no checkpoint in the log, start off by getting
971	 * the very first record in the log instead.
972	 */
973	if ((ret = __txn_getckp(env, &lsn)) == 0) {
974		if ((ret = __logc_get(logc, &lsn, &rec, DB_SET)) != 0)  {
975			__db_errx(env,
976			    "Checkpoint record at LSN [%lu][%lu] not found",
977			    (u_long)lsn.file, (u_long)lsn.offset);
978			goto err;
979		}
980
981		if ((ret = __txn_ckp_read(
982		    env, rec.data, &ckp_args)) == 0) {
983			ckp_lsn = ckp_args->ckp_lsn;
984			__os_free(env, ckp_args);
985		}
986		if (ret != 0) {
987			__db_errx(env,
988			    "Invalid checkpoint record at [%lu][%lu]",
989			    (u_long)lsn.file, (u_long)lsn.offset);
990			goto err;
991		}
992
993		if ((ret = __logc_get(logc, &ckp_lsn, &rec, DB_SET)) != 0) {
994			__db_errx(env,
995			    "Checkpoint LSN record [%lu][%lu] not found",
996			    (u_long)ckp_lsn.file, (u_long)ckp_lsn.offset);
997			goto err;
998		}
999	} else if ((ret = __logc_get(logc, &lsn, &rec, DB_FIRST)) != 0) {
1000		if (ret == DB_NOTFOUND) {
1001			/* An empty log means no PBNYC txns. */
1002			ret = 0;
1003			goto done;
1004		}
1005		__db_errx(env, "Attempt to get first log record failed");
1006		goto err;
1007	}
1008
1009	/*
1010	 * We use the same txnlist infrastructure that recovery does;
1011	 * it demands an estimate of the high and low txnids for
1012	 * initialization.
1013	 *
1014	 * First, the low txnid.
1015	 */
1016	do {
1017		/* txnid is after rectype, which is a u_int32. */
1018		LOGCOPY_32(env, &low_txn,
1019		    (u_int8_t *)rec.data + sizeof(u_int32_t));
1020		if (low_txn != 0)
1021			break;
1022	} while ((ret = __logc_get(logc, &lsn, &rec, DB_NEXT)) == 0);
1023
1024	/* If there are no txns, there are no PBNYC txns. */
1025	if (ret == DB_NOTFOUND) {
1026		ret = 0;
1027		goto done;
1028	} else if (ret != 0)
1029		goto err;
1030
1031	/* Now, the high txnid. */
1032	if ((ret = __logc_get(logc, &lsn, &rec, DB_LAST)) != 0) {
1033		/*
1034		 * Note that DB_NOTFOUND is unacceptable here because we
1035		 * had to have looked at some log record to get this far.
1036		 */
1037		__db_errx(env, "Final log record not found");
1038		goto err;
1039	}
1040	do {
1041		/* txnid is after rectype, which is a u_int32. */
1042		LOGCOPY_32(env, &hi_txn,
1043		    (u_int8_t *)rec.data + sizeof(u_int32_t));
1044		if (hi_txn != 0)
1045			break;
1046	} while ((ret = __logc_get(logc, &lsn, &rec, DB_PREV)) == 0);
1047	if (ret == DB_NOTFOUND) {
1048		ret = 0;
1049		goto done;
1050	} else if (ret != 0)
1051		goto err;
1052
1053	/* We have a high and low txnid.  Initialise the txn list. */
1054	if ((ret = __db_txnlist_init(env,
1055	    NULL, low_txn, hi_txn, NULL, &txninfo)) != 0)
1056		goto err;
1057
1058	/*
1059	 * Now, walk backward from the end of the log to ckp_lsn.  Any
1060	 * prepares that we hit without first hitting a commit or
1061	 * abort belong to PBNYC txns, and we need to apply them and
1062	 * restore them to a prepared state.
1063	 *
1064	 * Note that we wind up applying transactions out of order.
1065	 * Since all PBNYC txns still held locks on the old master and
1066	 * were isolated, this should be safe.
1067	 */
1068	F_SET(env->lg_handle, DBLOG_RECOVER);
1069	for (ret = __logc_get(logc, &lsn, &rec, DB_LAST);
1070	    ret == 0 && LOG_COMPARE(&lsn, &ckp_lsn) > 0;
1071	    ret = __logc_get(logc, &lsn, &rec, DB_PREV)) {
1072		LOGCOPY_32(env, &rectype, rec.data);
1073		switch (rectype) {
1074		case DB___txn_regop:
1075			/*
1076			 * It's a commit or abort--but we don't care
1077			 * which!  Just add it to the list of txns
1078			 * that are resolved.
1079			 */
1080			if ((ret = __txn_regop_read(
1081			    env, rec.data, &regop_args)) != 0)
1082				goto err;
1083			txnid = regop_args->txnp->txnid;
1084			txnop = regop_args->opcode;
1085			__os_free(env, regop_args);
1086
1087			ret = __db_txnlist_find(env,
1088			    txninfo, txnid, &status);
1089			if (ret == DB_NOTFOUND)
1090				ret = __db_txnlist_add(env, txninfo,
1091				    txnid, txnop, &lsn);
1092			else if (ret != 0)
1093				goto err;
1094			break;
1095		case DB___txn_xa_regop:
1096			/*
1097			 * It's a prepare.  If its not aborted and
1098			 * we haven't put the txn on our list yet, it
1099			 * hasn't been resolved, so apply and restore it.
1100			 */
1101			if ((ret = __txn_xa_regop_read(
1102			    env, rec.data, &prep_args)) != 0)
1103				goto err;
1104			ret = __db_txnlist_find(env, txninfo,
1105			    prep_args->txnp->txnid, &status);
1106			if (ret == DB_NOTFOUND) {
1107				if (prep_args->opcode == TXN_ABORT)
1108					ret = __db_txnlist_add(env, txninfo,
1109					    prep_args->txnp->txnid,
1110					    prep_args->opcode, &lsn);
1111				else if ((ret =
1112				    __rep_process_txn(env, &rec)) == 0) {
1113					/*
1114					 * We are guaranteed to be single
1115					 * threaded here.  We need to
1116					 * account for this newly
1117					 * instantiated txn in the op_cnt
1118					 * so that it is counted when it is
1119					 * resolved.
1120					 */
1121					rep->op_cnt++;
1122					ret = __txn_restore_txn(env,
1123					    &lsn, prep_args);
1124				}
1125			} else if (ret != 0)
1126				goto err;
1127			__os_free(env, prep_args);
1128			break;
1129		default:
1130			continue;
1131		}
1132	}
1133
1134	/* It's not an error to have hit the beginning of the log. */
1135	if (ret == DB_NOTFOUND)
1136		ret = 0;
1137
1138done:
1139err:	t_ret = __logc_close(logc);
1140	F_CLR(env->lg_handle, DBLOG_RECOVER);
1141
1142	if (txninfo != NULL)
1143		__db_txnlist_end(env, txninfo);
1144
1145	return (ret == 0 ? t_ret : ret);
1146}
1147
1148/*
1149 * __rep_get_limit --
1150 *	Get the limit on the amount of data that will be sent during a single
1151 * invocation of __rep_process_message.
1152 *
1153 * PUBLIC: int __rep_get_limit __P((DB_ENV *, u_int32_t *, u_int32_t *));
1154 */
1155int
1156__rep_get_limit(dbenv, gbytesp, bytesp)
1157	DB_ENV *dbenv;
1158	u_int32_t *gbytesp, *bytesp;
1159{
1160	DB_REP *db_rep;
1161	DB_THREAD_INFO *ip;
1162	ENV *env;
1163	REP *rep;
1164
1165	env = dbenv->env;
1166	db_rep = env->rep_handle;
1167
1168	ENV_NOT_CONFIGURED(
1169	    env, db_rep->region, "DB_ENV->rep_get_limit", DB_INIT_REP);
1170
1171	if (REP_ON(env)) {
1172		rep = db_rep->region;
1173		ENV_ENTER(env, ip);
1174		REP_SYSTEM_LOCK(env);
1175		if (gbytesp != NULL)
1176			*gbytesp = rep->gbytes;
1177		if (bytesp != NULL)
1178			*bytesp = rep->bytes;
1179		REP_SYSTEM_UNLOCK(env);
1180		ENV_LEAVE(env, ip);
1181	} else {
1182		if (gbytesp != NULL)
1183			*gbytesp = db_rep->gbytes;
1184		if (bytesp != NULL)
1185			*bytesp = db_rep->bytes;
1186	}
1187
1188	return (0);
1189}
1190
1191/*
1192 * __rep_set_limit --
1193 *	Set a limit on the amount of data that will be sent during a single
1194 * invocation of __rep_process_message.
1195 *
1196 * PUBLIC: int __rep_set_limit __P((DB_ENV *, u_int32_t, u_int32_t));
1197 */
1198int
1199__rep_set_limit(dbenv, gbytes, bytes)
1200	DB_ENV *dbenv;
1201	u_int32_t gbytes, bytes;
1202{
1203	DB_REP *db_rep;
1204	DB_THREAD_INFO *ip;
1205	ENV *env;
1206	REP *rep;
1207
1208	env = dbenv->env;
1209	db_rep = env->rep_handle;
1210
1211	ENV_NOT_CONFIGURED(
1212	    env, db_rep->region, "DB_ENV->rep_set_limit", DB_INIT_REP);
1213
1214	if (bytes > GIGABYTE) {
1215		gbytes += bytes / GIGABYTE;
1216		bytes = bytes % GIGABYTE;
1217	}
1218
1219	if (REP_ON(env)) {
1220		rep = db_rep->region;
1221		ENV_ENTER(env, ip);
1222		REP_SYSTEM_LOCK(env);
1223		rep->gbytes = gbytes;
1224		rep->bytes = bytes;
1225		REP_SYSTEM_UNLOCK(env);
1226		ENV_LEAVE(env, ip);
1227	} else {
1228		db_rep->gbytes = gbytes;
1229		db_rep->bytes = bytes;
1230	}
1231
1232	return (0);
1233}
1234
1235/*
1236 * PUBLIC: int __rep_set_nsites __P((DB_ENV *, u_int32_t));
1237 */
1238int
1239__rep_set_nsites(dbenv, n)
1240	DB_ENV *dbenv;
1241	u_int32_t n;
1242{
1243	DB_REP *db_rep;
1244	ENV *env;
1245	REP *rep;
1246
1247	env = dbenv->env;
1248	db_rep = env->rep_handle;
1249
1250	ENV_NOT_CONFIGURED(
1251	    env, db_rep->region, "DB_ENV->rep_set_nsites", DB_INIT_REP);
1252
1253	if (REP_ON(env)) {
1254		rep = db_rep->region;
1255		if (rep != NULL && F_ISSET(rep, REP_F_START_CALLED)) {
1256			__db_errx(env,
1257	"DB_ENV->rep_set_nsites: must be called before DB_ENV->rep_start");
1258			return (EINVAL);
1259		}
1260		rep->config_nsites = n;
1261	} else
1262		db_rep->config_nsites = n;
1263	return (0);
1264}
1265
1266/*
1267 * PUBLIC: int __rep_get_nsites __P((DB_ENV *, u_int32_t *));
1268 */
1269int
1270__rep_get_nsites(dbenv, n)
1271	DB_ENV *dbenv;
1272	u_int32_t *n;
1273{
1274	DB_REP *db_rep;
1275	ENV *env;
1276	REP *rep;
1277
1278	env = dbenv->env;
1279	db_rep = env->rep_handle;
1280
1281	/* TODO: ENV_REQUIRES_CONFIG(... ) and/or ENV_NOT_CONFIGURED (?) */
1282
1283	if (REP_ON(env)) {
1284		rep = db_rep->region;
1285		*n = rep->config_nsites;
1286	} else
1287		*n = db_rep->config_nsites;
1288
1289	return (0);
1290}
1291
1292/*
1293 * PUBLIC: int __rep_set_priority __P((DB_ENV *, u_int32_t));
1294 */
1295int
1296__rep_set_priority(dbenv, priority)
1297	DB_ENV *dbenv;
1298	u_int32_t priority;
1299{
1300	DB_REP *db_rep;
1301	ENV *env;
1302	REP *rep;
1303
1304	env = dbenv->env;
1305	db_rep = env->rep_handle;
1306
1307	if (REP_ON(env)) {
1308		rep = db_rep->region;
1309		rep->priority = priority;
1310	} else
1311		db_rep->my_priority = priority;
1312	return (0);
1313}
1314
1315/*
1316 * PUBLIC: int __rep_get_priority __P((DB_ENV *, u_int32_t *));
1317 */
1318int
1319__rep_get_priority(dbenv, priority)
1320	DB_ENV *dbenv;
1321	u_int32_t *priority;
1322{
1323	DB_REP *db_rep;
1324	ENV *env;
1325	REP *rep;
1326
1327	env = dbenv->env;
1328	db_rep = env->rep_handle;
1329
1330	if (REP_ON(env)) {
1331		rep = db_rep->region;
1332		*priority = rep->priority;
1333	} else
1334		*priority = db_rep->my_priority;
1335	return (0);
1336}
1337
1338/*
1339 * PUBLIC: int __rep_set_timeout __P((DB_ENV *, int, db_timeout_t));
1340 */
1341int
1342__rep_set_timeout(dbenv, which, timeout)
1343	DB_ENV *dbenv;
1344	int which;
1345	db_timeout_t timeout;
1346{
1347	DB_REP *db_rep;
1348	ENV *env;
1349	REP *rep;
1350	int ret;
1351
1352	env = dbenv->env;
1353	db_rep = env->rep_handle;
1354	rep = db_rep->region;
1355	ret = 0;
1356
1357	switch (which) {
1358	case DB_REP_CHECKPOINT_DELAY:
1359		if (REP_ON(env))
1360			rep->chkpt_delay = timeout;
1361		else
1362			db_rep->chkpt_delay = timeout;
1363		break;
1364	case DB_REP_ELECTION_TIMEOUT:
1365		if (REP_ON(env))
1366			rep->elect_timeout = timeout;
1367		else
1368			db_rep->elect_timeout = timeout;
1369		break;
1370	case DB_REP_FULL_ELECTION_TIMEOUT:
1371		if (REP_ON(env))
1372			rep->full_elect_timeout = timeout;
1373		else
1374			db_rep->full_elect_timeout = timeout;
1375		break;
1376	case DB_REP_LEASE_TIMEOUT:
1377		if (REP_ON(env) && F_ISSET(rep, REP_F_START_CALLED)) {
1378			ret = EINVAL;
1379			__db_errx(env,
1380"DB_ENV->rep_set_timeout: lease timeout must be set before DB_ENV->rep_start.");
1381			goto out;
1382		}
1383		if (REP_ON(env))
1384			rep->lease_timeout = timeout;
1385		else
1386			db_rep->lease_timeout = timeout;
1387		break;
1388#ifdef HAVE_REPLICATION_THREADS
1389	case DB_REP_ACK_TIMEOUT:
1390		db_rep->ack_timeout = timeout;
1391		break;
1392	case DB_REP_CONNECTION_RETRY:
1393		db_rep->connection_retry_wait = timeout;
1394		break;
1395	case DB_REP_ELECTION_RETRY:
1396		db_rep->election_retry_wait = timeout;
1397		break;
1398	case DB_REP_HEARTBEAT_MONITOR:
1399		db_rep->heartbeat_monitor_timeout = timeout;
1400		break;
1401	case DB_REP_HEARTBEAT_SEND:
1402		db_rep->heartbeat_frequency = timeout;
1403		break;
1404#endif
1405	default:
1406		__db_errx(env,
1407		    "Unknown timeout type argument to DB_ENV->rep_set_timeout");
1408		ret = EINVAL;
1409	}
1410
1411out:
1412	return (ret);
1413}
1414
1415/*
1416 * PUBLIC: int __rep_get_timeout __P((DB_ENV *, int, db_timeout_t *));
1417 */
1418int
1419__rep_get_timeout(dbenv, which, timeout)
1420	DB_ENV *dbenv;
1421	int which;
1422	db_timeout_t *timeout;
1423{
1424	DB_REP *db_rep;
1425	ENV *env;
1426	REP *rep;
1427
1428	env = dbenv->env;
1429	db_rep = env->rep_handle;
1430	rep = db_rep->region;
1431
1432	switch (which) {
1433	case DB_REP_CHECKPOINT_DELAY:
1434		*timeout = REP_ON(env) ?
1435		    rep->chkpt_delay : db_rep->chkpt_delay;
1436		break;
1437	case DB_REP_ELECTION_TIMEOUT:
1438		*timeout = REP_ON(env) ?
1439		    rep->elect_timeout : db_rep->elect_timeout;
1440		break;
1441	case DB_REP_FULL_ELECTION_TIMEOUT:
1442		*timeout = REP_ON(env) ?
1443		    rep->full_elect_timeout : db_rep->full_elect_timeout;
1444		break;
1445	case DB_REP_LEASE_TIMEOUT:
1446		*timeout = REP_ON(env) ?
1447		    rep->lease_timeout : db_rep->lease_timeout;
1448		break;
1449#ifdef HAVE_REPLICATION_THREADS
1450	case DB_REP_ACK_TIMEOUT:
1451		*timeout = db_rep->ack_timeout;
1452		break;
1453	case DB_REP_ELECTION_RETRY:
1454		*timeout = db_rep->election_retry_wait;
1455		break;
1456	case DB_REP_CONNECTION_RETRY:
1457		*timeout = db_rep->connection_retry_wait;
1458		break;
1459#endif
1460	default:
1461		__db_errx(env,
1462		    "unknown timeout type argument to DB_ENV->rep_get_timeout");
1463		return (EINVAL);
1464	}
1465
1466	return (0);
1467}
1468
1469/*
1470 * __rep_get_request --
1471 *	Get the minimum and maximum number of log records that we wait
1472 *	before retransmitting.
1473 *
1474 * PUBLIC: int __rep_get_request
1475 * PUBLIC:     __P((DB_ENV *, db_timeout_t *, db_timeout_t *));
1476 */
1477int
1478__rep_get_request(dbenv, minp, maxp)
1479	DB_ENV *dbenv;
1480	db_timeout_t *minp, *maxp;
1481{
1482	DB_REP *db_rep;
1483	DB_THREAD_INFO *ip;
1484	ENV *env;
1485	REP *rep;
1486
1487	env = dbenv->env;
1488	db_rep = env->rep_handle;
1489
1490	ENV_NOT_CONFIGURED(
1491	    env, db_rep->region, "DB_ENV->rep_get_request", DB_INIT_REP);
1492
1493	if (REP_ON(env)) {
1494		rep = db_rep->region;
1495		ENV_ENTER(env, ip);
1496		/*
1497		 * We acquire the mtx_region or mtx_clientdb mutexes as needed.
1498		 */
1499		REP_SYSTEM_LOCK(env);
1500		if (minp != NULL)
1501			DB_TIMESPEC_TO_TIMEOUT((*minp), &rep->request_gap, 0);
1502		if (maxp != NULL)
1503			DB_TIMESPEC_TO_TIMEOUT((*maxp), &rep->max_gap, 0);
1504		REP_SYSTEM_UNLOCK(env);
1505		ENV_LEAVE(env, ip);
1506	} else {
1507		if (minp != NULL)
1508			DB_TIMESPEC_TO_TIMEOUT((*minp),
1509			    &db_rep->request_gap, 0);
1510		if (maxp != NULL)
1511			DB_TIMESPEC_TO_TIMEOUT((*maxp), &db_rep->max_gap, 0);
1512	}
1513
1514	return (0);
1515}
1516
1517/*
1518 * __rep_set_request --
1519 *	Set the minimum and maximum number of log records that we wait
1520 *	before retransmitting.
1521 *
1522 * PUBLIC: int __rep_set_request __P((DB_ENV *, db_timeout_t, db_timeout_t));
1523 */
1524int
1525__rep_set_request(dbenv, min, max)
1526	DB_ENV *dbenv;
1527	db_timeout_t min, max;
1528{
1529	DB_LOG *dblp;
1530	DB_REP *db_rep;
1531	DB_THREAD_INFO *ip;
1532	ENV *env;
1533	LOG *lp;
1534	REP *rep;
1535
1536	env = dbenv->env;
1537	db_rep = env->rep_handle;
1538
1539	ENV_NOT_CONFIGURED(
1540	    env, db_rep->region, "DB_ENV->rep_set_request", DB_INIT_REP);
1541
1542	if (min == 0 || max < min) {
1543		__db_errx(env,
1544		    "DB_ENV->rep_set_request: Invalid min or max values");
1545		return (EINVAL);
1546	}
1547	if (REP_ON(env)) {
1548		rep = db_rep->region;
1549		ENV_ENTER(env, ip);
1550		/*
1551		 * We acquire the mtx_region or mtx_clientdb mutexes as needed.
1552		 */
1553		REP_SYSTEM_LOCK(env);
1554		DB_TIMEOUT_TO_TIMESPEC(min, &rep->request_gap);
1555		DB_TIMEOUT_TO_TIMESPEC(max, &rep->max_gap);
1556		REP_SYSTEM_UNLOCK(env);
1557
1558		MUTEX_LOCK(env, rep->mtx_clientdb);
1559		dblp = env->lg_handle;
1560		if (dblp != NULL && (lp = dblp->reginfo.primary) != NULL) {
1561			DB_TIMEOUT_TO_TIMESPEC(min, &lp->wait_ts);
1562		}
1563		MUTEX_UNLOCK(env, rep->mtx_clientdb);
1564		ENV_LEAVE(env, ip);
1565	} else {
1566		DB_TIMEOUT_TO_TIMESPEC(min, &db_rep->request_gap);
1567		DB_TIMEOUT_TO_TIMESPEC(max, &db_rep->max_gap);
1568	}
1569
1570	return (0);
1571}
1572
1573/*
1574 * __rep_set_transport --
1575 *	Set the transport function for replication.
1576 *
1577 * PUBLIC: int __rep_set_transport __P((DB_ENV *, int,
1578 * PUBLIC:     int (*)(DB_ENV *, const DBT *, const DBT *, const DB_LSN *,
1579 * PUBLIC:     int, u_int32_t)));
1580 */
1581int
1582__rep_set_transport(dbenv, eid, f_send)
1583	DB_ENV *dbenv;
1584	int eid;
1585	int (*f_send) __P((DB_ENV *,
1586	    const DBT *, const DBT *, const DB_LSN *, int, u_int32_t));
1587{
1588	DB_REP *db_rep;
1589	DB_THREAD_INFO *ip;
1590	ENV *env;
1591	REP *rep;
1592
1593	env = dbenv->env;
1594
1595	if (f_send == NULL) {
1596		__db_errx(env,
1597		    "DB_ENV->rep_set_transport: no send function specified");
1598		return (EINVAL);
1599	}
1600
1601	if (eid < 0) {
1602		__db_errx(env,
1603	"DB_ENV->rep_set_transport: eid must be greater than or equal to 0");
1604		return (EINVAL);
1605	}
1606
1607	db_rep = env->rep_handle;
1608	db_rep->send = f_send;
1609
1610	if (REP_ON(env)) {
1611		rep = db_rep->region;
1612		ENV_ENTER(env, ip);
1613		REP_SYSTEM_LOCK(env);
1614		rep->eid = eid;
1615		REP_SYSTEM_UNLOCK(env);
1616		ENV_LEAVE(env, ip);
1617	} else
1618		db_rep->eid = eid;
1619	return (0);
1620}
1621
1622/*
1623 * PUBLIC: int __rep_get_clockskew __P((DB_ENV *, u_int32_t *, u_int32_t *));
1624 */
1625int
1626__rep_get_clockskew(dbenv, fast_clockp, slow_clockp)
1627	DB_ENV *dbenv;
1628	u_int32_t *fast_clockp, *slow_clockp;
1629{
1630	DB_REP *db_rep;
1631	ENV *env;
1632	REP *rep;
1633
1634	env = dbenv->env;
1635	db_rep = env->rep_handle;
1636
1637	if (REP_ON(env)) {
1638		rep = db_rep->region;
1639		*fast_clockp = rep->clock_skew;
1640		*slow_clockp = rep->clock_base;
1641	} else {
1642		*fast_clockp = db_rep->clock_skew;
1643		*slow_clockp = db_rep->clock_base;
1644	}
1645
1646	return (0);
1647}
1648
1649/*
1650 * PUBLIC: int __rep_set_clockskew __P((DB_ENV *, u_int32_t, u_int32_t));
1651 */
1652int
1653__rep_set_clockskew(dbenv, fast_clock, slow_clock)
1654	DB_ENV *dbenv;
1655	u_int32_t fast_clock, slow_clock;
1656{
1657	DB_REP *db_rep;
1658	DB_THREAD_INFO *ip;
1659	ENV *env;
1660	REP *rep;
1661	int ret;
1662
1663	env = dbenv->env;
1664	db_rep = env->rep_handle;
1665	ret = 0;
1666
1667	ENV_NOT_CONFIGURED(
1668	    env, db_rep->region, "DB_ENV->rep_set_clockskew", DB_INIT_REP);
1669
1670	/*
1671	 * Check for valid values.  The fast clock should be a larger
1672	 * number than the slow clock.  We use the slow clock value as
1673	 * our base for adjustment - therefore, a 2% difference should
1674	 * be fast == 102, slow == 100.  Check for values being 0.  If
1675	 * they are, then set them both to 1 internally.
1676	 *
1677	 * We will use these numbers to compute the larger ratio to be
1678	 * most conservative about the user's intention.
1679	 */
1680	if (fast_clock == 0 || slow_clock == 0) {
1681		/*
1682		 * If one value is zero, reject if both aren't zero.
1683		 */
1684		if (slow_clock != 0 || fast_clock != 0) {
1685			__db_errx(env,
1686"DB_ENV->rep_set_clockskew: Zero only valid for when used for both arguments");
1687			return (EINVAL);
1688		}
1689		fast_clock = 1;
1690		slow_clock = 1;
1691	}
1692	if (fast_clock < slow_clock) {
1693		__db_errx(env,
1694"DB_ENV->rep_set_clockskew: slow_clock value is larger than fast_clock_value");
1695		return (EINVAL);
1696	}
1697	if (REP_ON(env)) {
1698		rep = db_rep->region;
1699		if (F_ISSET(rep, REP_F_START_CALLED)) {
1700			__db_errx(env,
1701	"DB_ENV->rep_set_clockskew: must be called before DB_ENV->rep_start");
1702			return (EINVAL);
1703		}
1704		ENV_ENTER(env, ip);
1705		REP_SYSTEM_LOCK(env);
1706		rep->clock_skew = fast_clock;
1707		rep->clock_base = slow_clock;
1708		REP_SYSTEM_UNLOCK(env);
1709		ENV_LEAVE(env, ip);
1710	} else {
1711		db_rep->clock_skew = fast_clock;
1712		db_rep->clock_base = slow_clock;
1713	}
1714	return (ret);
1715}
1716
1717/*
1718 * __rep_flush --
1719 *	Re-push the last log record to all clients, in case they've lost
1720 *	messages and don't know it.
1721 *
1722 * PUBLIC: int __rep_flush __P((DB_ENV *));
1723 */
1724int
1725__rep_flush(dbenv)
1726	DB_ENV *dbenv;
1727{
1728	DBT rec;
1729	DB_LOGC *logc;
1730	DB_LSN lsn;
1731	DB_THREAD_INFO *ip;
1732	ENV *env;
1733	int ret, t_ret;
1734
1735	env = dbenv->env;
1736
1737	ENV_REQUIRES_CONFIG_XX(
1738	    env, rep_handle, "DB_ENV->rep_flush", DB_INIT_REP);
1739	ENV_ENTER(env, ip);
1740
1741	if ((ret = __log_cursor(env, &logc)) != 0)
1742		return (ret);
1743
1744	memset(&rec, 0, sizeof(rec));
1745	memset(&lsn, 0, sizeof(lsn));
1746
1747	if ((ret = __logc_get(logc, &lsn, &rec, DB_LAST)) != 0)
1748		goto err;
1749
1750	(void)__rep_send_message(env,
1751	    DB_EID_BROADCAST, REP_LOG, &lsn, &rec, 0, 0);
1752
1753err:	if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
1754		ret = t_ret;
1755	ENV_LEAVE(env, ip);
1756	return (ret);
1757}
1758
/*
 * __rep_sync --
 *	Force a synchronization to occur between this client and the master.
 *	This is the other half of configuring DELAYCLIENT.
 *
 *	The flags argument is unused.  Outcomes, in the order tested:
 *	  - no known master (master_id is DB_EID_INVALID): broadcast a
 *	    REP_MASTER_REQ and return 0;
 *	  - REP_F_DELAY is not set: nothing to do, return 0;
 *	  - the saved verify LSN is zero and REP_C_NOAUTOINIT is configured:
 *	    clear the delay and recovery flags and return
 *	    DB_REP_JOIN_FAILURE;
 *	  - otherwise clear REP_F_DELAY and send the master either a
 *	    REP_UPDATE_REQ (zero verify LSN) or a REP_VERIFY_REQ, then
 *	    return 0.
 *	The results of the message sends are ignored.
 *
 * PUBLIC: int __rep_sync __P((DB_ENV *, u_int32_t));
 */
int
__rep_sync(dbenv, flags)
	DB_ENV *dbenv;
	u_int32_t flags;
{
	DB_LOG *dblp;
	DB_LSN lsn;
	DB_REP *db_rep;
	DB_THREAD_INFO *ip;
	ENV *env;
	LOG *lp;
	REP *rep;
	int master, ret;
	u_int32_t repflags, type;

	env = dbenv->env;

	COMPQUIET(flags, 0);

	ENV_REQUIRES_CONFIG_XX(
	    env, rep_handle, "DB_ENV->rep_sync", DB_INIT_REP);

	dblp = env->lg_handle;
	lp = dblp->reginfo.primary;
	db_rep = env->rep_handle;
	rep = db_rep->region;
	ret = 0;

	ENV_ENTER(env, ip);

	/*
	 * Simple cases.  If we're not in the DELAY state we have nothing
	 * to do.  If we don't know who the master is, send a MASTER_REQ.
	 */
	/* Snapshot the verify LSN under mtx_clientdb before taking the rep lock. */
	MUTEX_LOCK(env, rep->mtx_clientdb);
	lsn = lp->verify_lsn;
	MUTEX_UNLOCK(env, rep->mtx_clientdb);
	REP_SYSTEM_LOCK(env);
	master = rep->master_id;
	if (master == DB_EID_INVALID) {
		/* Drop the lock before sending; ret is still 0 here. */
		REP_SYSTEM_UNLOCK(env);
		(void)__rep_send_message(env, DB_EID_BROADCAST,
		    REP_MASTER_REQ, NULL, NULL, 0, 0);
		goto out;
	}
	/*
	 * We want to hold the rep mutex to test and then clear the
	 * DELAY flag.  Racing threads in here could otherwise result
	 * in dual data streams.
	 */
	if (!F_ISSET(rep, REP_F_DELAY)) {
		REP_SYSTEM_UNLOCK(env);
		goto out;
	}

	DB_ASSERT(env,
	    !IS_USING_LEASES(env) || __rep_islease_granted(env) == 0);

	/*
	 * If we get here, we clear the delay flag and kick off a
	 * synchronization.  From this point forward, we will
	 * synchronize until the next time the master changes.
	 */
	F_CLR(rep, REP_F_DELAY);
	/*
	 * A zero verify LSN with REP_C_NOAUTOINIT configured is reported
	 * to the caller as DB_REP_JOIN_FAILURE instead of sending a request.
	 */
	if (IS_ZERO_LSN(lsn) && FLD_ISSET(rep->config, REP_C_NOAUTOINIT)) {
		F_CLR(rep, REP_F_NOARCHIVE | REP_F_RECOVER_MASK);
		ret = DB_REP_JOIN_FAILURE;
		REP_SYSTEM_UNLOCK(env);
		goto out;
	}
	REP_SYSTEM_UNLOCK(env);
	/*
	 * When we set REP_F_DELAY, we set verify_lsn to the real verify lsn if
	 * we need to verify, or we zeroed it out if this is a client that needs
	 * internal init.  So, send the type of message now that
	 * __rep_new_master delayed sending.
	 */
	if (IS_ZERO_LSN(lsn)) {
		DB_ASSERT(env, F_ISSET(rep, REP_F_RECOVER_UPDATE));
		type = REP_UPDATE_REQ;
		repflags = 0;
	} else {
		DB_ASSERT(env, F_ISSET(rep, REP_F_RECOVER_VERIFY));
		type = REP_VERIFY_REQ;
		repflags = DB_REP_ANYWHERE;
	}
	/* The request goes to the master id sampled under the lock above. */
	(void)__rep_send_message(env, master, type, &lsn, NULL, 0, repflags);

	/* Common exit: balance the ENV_ENTER above. */
out:	ENV_LEAVE(env, ip);
	return (ret);
}
1857
1858/*
1859 * __rep_conv_vers --
1860 *	Convert from a log version to the replication message version
1861 *	that release used.
1862 */
1863static u_int32_t
1864__rep_conv_vers(env, log_ver)
1865	ENV *env;
1866	u_int32_t log_ver;
1867{
1868	COMPQUIET(env, NULL);
1869
1870	/*
1871	 * We can't use a switch statement, some of the DB_LOGVERSION_XX
1872	 * constants are the same
1873	 */
1874	if (log_ver == DB_LOGVERSION_44)
1875		return (DB_REPVERSION_44);
1876	if (log_ver == DB_LOGVERSION_45)
1877		return (DB_REPVERSION_45);
1878	if (log_ver == DB_LOGVERSION_46)
1879		return (DB_REPVERSION_46);
1880	if (log_ver == DB_LOGVERSION_47)
1881		return (DB_REPVERSION_47);
1882	return (DB_REPVERSION_INVALID);
1883}
1884