1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2001,2008 Oracle.  All rights reserved.
5 *
6 * $Id: rep_util.c,v 12.149 2008/03/13 16:21:04 mbrey Exp $
7 */
8
9#include "db_config.h"
10
11#include "db_int.h"
12#include "dbinc/db_page.h"
13#include "dbinc/db_am.h"
14#include "dbinc/log.h"
15#include "dbinc/mp.h"
16#include "dbinc/txn.h"
17
18#ifdef REP_DIAGNOSTIC
19#include "dbinc/db_page.h"
20#include "dbinc/fop.h"
21#include "dbinc/btree.h"
22#include "dbinc/hash.h"
23#include "dbinc/qam.h"
24#endif
25
26/*
27 * rep_util.c:
28 *	Miscellaneous replication-related utility functions, including
29 *	those called by other subsystems.
30 */
31#define	TIMESTAMP_CHECK(env, ts, renv) do {				\
32	if (renv->op_timestamp != 0 &&					\
33	    renv->op_timestamp + DB_REGENV_TIMEOUT < ts) {		\
34		REP_SYSTEM_LOCK(env);					\
35		F_CLR(renv, DB_REGENV_REPLOCKED);			\
36		renv->op_timestamp = 0;					\
37		REP_SYSTEM_UNLOCK(env);					\
38	}								\
39} while (0)
40
41static int __rep_lockout_int __P((ENV *, REP *, u_int32_t *, u_int32_t,
42    const char *, u_int32_t));
43static int __rep_newmaster_empty __P((ENV *, int));
44#ifdef REP_DIAGNOSTIC
45static void __rep_print_logmsg __P((ENV *, const DBT *, DB_LSN *));
46#endif
47
48/*
49 * __rep_bulk_message --
50 *	This is a wrapper for putting a record into a bulk buffer.  Since
51 * we have different bulk buffers, the caller must hand us the information
52 * we need to put the record into the correct buffer.  All bulk buffers
53 * are protected by the REP->mtx_clientdb.
54 *
55 * PUBLIC: int __rep_bulk_message __P((ENV *, REP_BULK *, REP_THROTTLE *,
56 * PUBLIC:     DB_LSN *, const DBT *, u_int32_t));
57 */
58int
59__rep_bulk_message(env, bulk, repth, lsn, dbt, flags)
60	ENV *env;
61	REP_BULK *bulk;
62	REP_THROTTLE *repth;
63	DB_LSN *lsn;
64	const DBT *dbt;
65	u_int32_t flags;
66{
67	DB_REP *db_rep;
68	REP *rep;
69	__rep_bulk_args b_args;
70	size_t len;
71	int ret;
72	u_int32_t recsize, typemore;
73	u_int8_t *p;
74
75	db_rep = env->rep_handle;
76	rep = db_rep->region;
77	ret = 0;
78
79	/*
80	 * Figure out the total number of bytes needed for this record.
81	 */
82	recsize = dbt->size + sizeof(DB_LSN) + sizeof(dbt->size);
83
84	/*
85	 * If *this* buffer is actively being transmitted, wait until
86	 * we can use it.
87	 */
88	MUTEX_LOCK(env, rep->mtx_clientdb);
89	while (FLD_ISSET(*(bulk->flagsp), BULK_XMIT)) {
90		MUTEX_UNLOCK(env, rep->mtx_clientdb);
91		__os_yield(env, 1, 0);
92		MUTEX_LOCK(env, rep->mtx_clientdb);
93	}
94
95	/*
96	 * If the record is bigger than the buffer entirely, send the
97	 * current buffer and then return DB_REP_BULKOVF so that this
98	 * record is sent as a singleton.  Do we have enough info to
99	 * do that here?  XXX
100	 */
101	if (recsize > bulk->len) {
102		RPRINT(env, DB_VERB_REP_MSGS, (env,
103		    "bulk_msg: Record %d (0x%x) larger than entire buffer 0x%x",
104		    recsize, recsize, bulk->len));
105		STAT(rep->stat.st_bulk_overflows++);
106		(void)__rep_send_bulk(env, bulk, flags);
107		/*
108		 * XXX __rep_send_message...
109		 */
110		MUTEX_UNLOCK(env, rep->mtx_clientdb);
111		return (DB_REP_BULKOVF);
112	}
113	/*
114	 * If this record doesn't fit, send the current buffer.
115	 * Sending the buffer will reset the offset, but we will
116	 * drop the mutex while sending so we need to keep checking
117	 * if we're racing.
118	 */
119	while (recsize + *(bulk->offp) > bulk->len) {
120		RPRINT(env, DB_VERB_REP_MSGS, (env,
121	    "bulk_msg: Record %lu (%#lx) doesn't fit.  Send %lu (%#lx) now.",
122		    (u_long)recsize, (u_long)recsize,
123		    (u_long)bulk->len, (u_long)bulk->len));
124		STAT(rep->stat.st_bulk_fills++);
125		if ((ret = __rep_send_bulk(env, bulk, flags)) != 0) {
126			MUTEX_UNLOCK(env, rep->mtx_clientdb);
127			return (ret);
128		}
129	}
130
131	/*
132	 * If we're using throttling, see if we are at the throttling
133	 * limit before we do any more work here, by checking if the
134	 * call to rep_send_throttle changed the repth->type to the
135	 * *_MORE message type.  If the throttling code hits the limit
136	 * then we're done here.
137	 */
138	if (bulk->type == REP_BULK_LOG)
139		typemore = REP_LOG_MORE;
140	else
141		typemore = REP_PAGE_MORE;
142	if (repth != NULL) {
143		if ((ret = __rep_send_throttle(env,
144		    bulk->eid, repth, REP_THROTTLE_ONLY, flags)) != 0) {
145			MUTEX_UNLOCK(env, rep->mtx_clientdb);
146			return (ret);
147		}
148		if (repth->type == typemore) {
149			RPRINT(env, DB_VERB_REP_MSGS, (env,
150			    "bulk_msg: Record %lu (0x%lx) hit throttle limit.",
151			    (u_long)recsize, (u_long)recsize));
152			MUTEX_UNLOCK(env, rep->mtx_clientdb);
153			return (ret);
154		}
155	}
156
157	/*
158	 * Now we own the buffer, and we know our record fits into it.
159	 * The buffer is structured with the len, LSN and then the record.
160	 * Copy the record into the buffer.  Then if we need to,
161	 * send the buffer.
162	 */
163	p = bulk->addr + *(bulk->offp);
164	b_args.len = dbt->size;
165	b_args.lsn = *lsn;
166	b_args.bulkdata = *dbt;
167	/*
168	 * If we're the first record, we need to save the first
169	 * LSN in the bulk structure.
170	 */
171	if (*(bulk->offp) == 0)
172		bulk->lsn = *lsn;
173	if (rep->version < DB_REPVERSION_47) {
174		len = 0;
175		memcpy(p, &dbt->size, sizeof(dbt->size));
176		p += sizeof(dbt->size);
177		memcpy(p, lsn, sizeof(DB_LSN));
178		p += sizeof(DB_LSN);
179		memcpy(p, dbt->data, dbt->size);
180		p += dbt->size;
181	} else if ((ret = __rep_bulk_marshal(env, &b_args, p,
182	    bulk->len, &len)) != 0)
183		goto err;
184	*(bulk->offp) = (uintptr_t)p + (uintptr_t)len - (uintptr_t)bulk->addr;
185	STAT(rep->stat.st_bulk_records++);
186	/*
187	 * Send the buffer if it is a perm record or a force.
188	 */
189	if (LF_ISSET(REPCTL_PERM)) {
190		RPRINT(env, DB_VERB_REP_MSGS, (env,
191		    "bulk_msg: Send buffer after copy due to PERM"));
192		ret = __rep_send_bulk(env, bulk, flags);
193	}
194err:
195	MUTEX_UNLOCK(env, rep->mtx_clientdb);
196	return (ret);
197
198}
199
200/*
201 * __rep_send_bulk --
202 *	This function transmits the bulk buffer given.  It assumes the
203 * caller holds the REP->mtx_clientdb.  We may release it and reacquire
204 * it during this call.  We will return with it held.
205 *
206 * PUBLIC: int __rep_send_bulk __P((ENV *, REP_BULK *, u_int32_t));
207 */
208int
209__rep_send_bulk(env, bulkp, ctlflags)
210	ENV *env;
211	REP_BULK *bulkp;
212	u_int32_t ctlflags;
213{
214	DBT dbt;
215	DB_REP *db_rep;
216	REP *rep;
217	int ret;
218
219	/*
220	 * If the offset is 0, we're done.  There is nothing to send.
221	 */
222	if (*(bulkp->offp) == 0)
223		return (0);
224
225	db_rep = env->rep_handle;
226	rep = db_rep->region;
227
228	/*
229	 * Set that this buffer is being actively transmitted.
230	 */
231	FLD_SET(*(bulkp->flagsp), BULK_XMIT);
232	DB_INIT_DBT(dbt, bulkp->addr, *(bulkp->offp));
233	MUTEX_UNLOCK(env, rep->mtx_clientdb);
234	RPRINT(env, DB_VERB_REP_MSGS, (env,
235	    "send_bulk: Send %d (0x%x) bulk buffer bytes", dbt.size, dbt.size));
236
237	/*
238	 * Unlocked the mutex and now send the message.
239	 */
240	STAT(rep->stat.st_bulk_transfers++);
241	if ((ret = __rep_send_message(env,
242	    bulkp->eid, bulkp->type, &bulkp->lsn, &dbt, ctlflags, 0)) != 0)
243		ret = DB_REP_UNAVAIL;
244
245	MUTEX_LOCK(env, rep->mtx_clientdb);
246	/*
247	 * Ready the buffer for further records.
248	 */
249	*(bulkp->offp) = 0;
250	FLD_CLR(*(bulkp->flagsp), BULK_XMIT);
251	return (ret);
252}
253
254/*
255 * __rep_bulk_alloc --
256 *	This function allocates and initializes an internal bulk buffer.
257 * This is used by the master when fulfilling a request for a chunk of
258 * log records or a bunch of pages.
259 *
260 * PUBLIC: int __rep_bulk_alloc __P((ENV *, REP_BULK *, int, uintptr_t *,
261 * PUBLIC:    u_int32_t *, u_int32_t));
262 */
263int
264__rep_bulk_alloc(env, bulkp, eid, offp, flagsp, type)
265	ENV *env;
266	REP_BULK *bulkp;
267	int eid;
268	uintptr_t *offp;
269	u_int32_t *flagsp, type;
270{
271	int ret;
272
273	memset(bulkp, 0, sizeof(REP_BULK));
274	*offp = *flagsp = 0;
275	bulkp->len = MEGABYTE;
276	if ((ret = __os_malloc(env, bulkp->len, &bulkp->addr)) != 0)
277		return (ret);
278	bulkp->offp = offp;
279	bulkp->type = type;
280	bulkp->eid = eid;
281	bulkp->flagsp = flagsp;
282	return (ret);
283}
284
285/*
286 * __rep_bulk_free --
287 *	This function sends the remainder of the bulk buffer and frees it.
288 *
289 * PUBLIC: int __rep_bulk_free __P((ENV *, REP_BULK *, u_int32_t));
290 */
291int
292__rep_bulk_free(env, bulkp, flags)
293	ENV *env;
294	REP_BULK *bulkp;
295	u_int32_t flags;
296{
297	DB_REP *db_rep;
298	int ret;
299
300	db_rep = env->rep_handle;
301
302	MUTEX_LOCK(env, db_rep->region->mtx_clientdb);
303	ret = __rep_send_bulk(env, bulkp, flags);
304	MUTEX_UNLOCK(env, db_rep->region->mtx_clientdb);
305	__os_free(env, bulkp->addr);
306	return (ret);
307}
308
309/*
310 * __rep_send_message --
311 *	This is a wrapper for sending a message.  It takes care of constructing
312 * the control structure and calling the user's specified send function.
313 *
314 * PUBLIC: int __rep_send_message __P((ENV *, int,
315 * PUBLIC:     u_int32_t, DB_LSN *, const DBT *, u_int32_t, u_int32_t));
316 */
317int
318__rep_send_message(env, eid, rtype, lsnp, dbt, ctlflags, repflags)
319	ENV *env;
320	int eid;
321	u_int32_t rtype;
322	DB_LSN *lsnp;
323	const DBT *dbt;
324	u_int32_t ctlflags, repflags;
325{
326	DBT cdbt, scrap_dbt;
327	DB_ENV *dbenv;
328	DB_LOG *dblp;
329	DB_REP *db_rep;
330	LOG *lp;
331	REP *rep;
332	REP_46_CONTROL cntrl46;
333	REP_OLD_CONTROL ocntrl;
334	__rep_control_args cntrl;
335	db_timespec msg_time;
336	int ret;
337	u_int32_t myflags, rectype;
338	u_int8_t buf[__REP_CONTROL_SIZE];
339	size_t len;
340
341	dbenv = env->dbenv;
342	db_rep = env->rep_handle;
343	rep = db_rep->region;
344	dblp = env->lg_handle;
345	lp = dblp->reginfo.primary;
346	ret = 0;
347
348#if defined(DEBUG_ROP) || defined(DEBUG_WOP)
349	if (db_rep->send == NULL)
350		return (0);
351#endif
352
353	/* Set up control structure. */
354	memset(&cntrl, 0, sizeof(cntrl));
355	memset(&ocntrl, 0, sizeof(ocntrl));
356	memset(&cntrl46, 0, sizeof(cntrl46));
357	if (lsnp == NULL)
358		ZERO_LSN(cntrl.lsn);
359	else
360		cntrl.lsn = *lsnp;
361	/*
362	 * Set the rectype based on the version we need to speak.
363	 */
364	if (rep->version == DB_REPVERSION)
365		cntrl.rectype = rtype;
366	else if (rep->version < DB_REPVERSION) {
367		cntrl.rectype = __rep_msg_to_old(rep->version, rtype);
368		RPRINT(env, DB_VERB_REP_MSGS, (env,
369		    "rep_send_msg: rtype %lu to version %lu record %lu.",
370		    (u_long)rtype, (u_long)rep->version,
371		    (u_long)cntrl.rectype));
372		if (cntrl.rectype == REP_INVALID)
373			return (ret);
374	} else {
375		__db_errx(env,
376    "rep_send_message: Unknown rep version %lu, my version %lu",
377		    (u_long)rep->version, (u_long)DB_REPVERSION);
378		return (__env_panic(env, EINVAL));
379	}
380	cntrl.flags = ctlflags;
381	cntrl.rep_version = rep->version;
382	cntrl.log_version = lp->persist.version;
383	cntrl.gen = rep->gen;
384
385	/* Don't assume the send function will be tolerant of NULL records. */
386	if (dbt == NULL) {
387		memset(&scrap_dbt, 0, sizeof(DBT));
388		dbt = &scrap_dbt;
389	}
390
391	/*
392	 * There are several types of records: commit and checkpoint records
393	 * that affect database durability, regular log records that might
394	 * be buffered on the master before being transmitted, and control
395	 * messages which don't require the guarantees of permanency, but
396	 * should not be buffered.
397	 *
398	 * There are request records that can be sent anywhere, and there
399	 * are rerequest records that the app might want to send to the master.
400	 */
401	myflags = repflags;
402	if (FLD_ISSET(ctlflags, REPCTL_PERM))
403		myflags |= DB_REP_PERMANENT;
404	else if (rtype != REP_LOG || FLD_ISSET(ctlflags, REPCTL_RESEND))
405		myflags |= DB_REP_NOBUFFER;
406	if (rtype == REP_LOG && !FLD_ISSET(ctlflags, REPCTL_PERM)) {
407		/*
408		 * Check if this is a log record we just read that
409		 * may need a REPCTL_PERM.  This is of type REP_LOG,
410		 * so we know that dbt is a log record.
411		 */
412		LOGCOPY_32(env, &rectype, dbt->data);
413		if (rectype == DB___txn_regop || rectype == DB___txn_ckp)
414			F_SET(&cntrl, REPCTL_PERM);
415	}
416
417	/*
418	 * Let everyone know if we've been in an established group.
419	 */
420	if (F_ISSET(rep, REP_F_GROUP_ESTD))
421		F_SET(&cntrl, REPCTL_GROUP_ESTD);
422
423	/*
424	 * We're sending messages to some other version.  We cannot
425	 * assume DB_REP_ANYWHERE is available.  Turn it off.
426	 */
427	if (rep->version != DB_REPVERSION)
428		FLD_CLR(myflags, DB_REP_ANYWHERE);
429
430	/*
431	 * If we are a master sending a perm record, then set the
432	 * REPCTL_LEASE flag to have the client reply.  Also set
433	 * the start time that the client will echo back to us.
434	 *
435	 * !!! If we are a master, using leases, we had better not be
436	 * sending to an older version.
437	 */
438	if (IS_REP_MASTER(env) && IS_USING_LEASES(env) &&
439	    FLD_ISSET(ctlflags, REPCTL_PERM)) {
440		F_SET(&cntrl, REPCTL_LEASE);
441		DB_ASSERT(env, rep->version == DB_REPVERSION);
442		__os_gettime(env, &msg_time, 1);
443		cntrl.msg_sec = (u_int32_t)msg_time.tv_sec;
444		cntrl.msg_nsec = (u_int32_t)msg_time.tv_nsec;
445	}
446
447	REP_PRINT_MESSAGE(env, eid, &cntrl, "rep_send_message", myflags);
448#ifdef REP_DIAGNOSTIC
449	if (FLD_ISSET(
450	    env->dbenv->verbose, DB_VERB_REP_MSGS) && rtype == REP_LOG)
451		__rep_print_logmsg(env, dbt, lsnp);
452#endif
453
454	/*
455	 * If DB_REP_PERMANENT is set, the LSN better be non-zero.
456	 */
457	DB_ASSERT(env, !FLD_ISSET(myflags, DB_REP_PERMANENT) ||
458	    !IS_ZERO_LSN(cntrl.lsn));
459
460	/*
461	 * If we're talking to an old version, send an old control structure.
462	 */
463	memset(&cdbt, 0, sizeof(cdbt));
464	if (rep->version <= DB_REPVERSION_45) {
465		if (rep->version == DB_REPVERSION_45 &&
466		    F_ISSET(&cntrl, REPCTL_INIT)) {
467			F_CLR(&cntrl, REPCTL_INIT);
468			F_SET(&cntrl, REPCTL_INIT_45);
469		}
470		ocntrl.rep_version = cntrl.rep_version;
471		ocntrl.log_version = cntrl.log_version;
472		ocntrl.lsn = cntrl.lsn;
473		ocntrl.rectype = cntrl.rectype;
474		ocntrl.gen = cntrl.gen;
475		ocntrl.flags = cntrl.flags;
476		cdbt.data = &ocntrl;
477		cdbt.size = sizeof(ocntrl);
478	} else if (rep->version == DB_REPVERSION_46) {
479		cntrl46.rep_version = cntrl.rep_version;
480		cntrl46.log_version = cntrl.log_version;
481		cntrl46.lsn = cntrl.lsn;
482		cntrl46.rectype = cntrl.rectype;
483		cntrl46.gen = cntrl.gen;
484		cntrl46.msg_time.tv_sec = (time_t)cntrl.msg_sec;
485		cntrl46.msg_time.tv_nsec = (long)cntrl.msg_nsec;
486		cntrl46.flags = cntrl.flags;
487		cdbt.data = &cntrl46;
488		cdbt.size = sizeof(cntrl46);
489	} else {
490		(void)__rep_control_marshal(env, &cntrl, buf,
491		    __REP_CONTROL_SIZE, &len);
492		DB_INIT_DBT(cdbt, buf, len);
493	}
494
495	/*
496	 * We set the LSN above to something valid.  Give the master the
497	 * actual LSN so that they can coordinate with permanent records from
498	 * the client if they want to.
499	 *
500	 * !!! Even though we marshalled the control message for transmission,
501	 * give the transport function the real LSN.
502	 */
503	ret = db_rep->send(dbenv, &cdbt, dbt, &cntrl.lsn, eid, myflags);
504
505	/*
506	 * We don't hold the rep lock, so this could miscount if we race.
507	 * I don't think it's worth grabbing the mutex for that bit of
508	 * extra accuracy.
509	 */
510	if (ret != 0) {
511		RPRINT(env, DB_VERB_REP_MSGS, (env,
512		    "rep_send_function returned: %d", ret));
513#ifdef HAVE_STATISTICS
514		rep->stat.st_msgs_send_failures++;
515	} else
516		rep->stat.st_msgs_sent++;
517#else
518	}
519#endif
520	return (ret);
521}
522
#ifdef REP_DIAGNOSTIC
/*
 * __rep_print_logmsg --
 *	Debugging aid: print a log record that is about to be transmitted.
 * The print dispatch table is built once, on the first call, and kept in
 * static storage for later calls.
 */
static void
__rep_print_logmsg(env, logdbt, lsnp)
	ENV *env;
	const DBT *logdbt;
	DB_LSN *lsnp;
{
	static DB_DISTAB print_tab;
	static int tab_ready = 0;

	if (!tab_ready) {
		tab_ready = 1;

		/* Register the print routines of each access method. */
		(void)__bam_init_print(env, &print_tab);
		(void)__crdel_init_print(env, &print_tab);
		(void)__db_init_print(env, &print_tab);
		(void)__dbreg_init_print(env, &print_tab);
		(void)__fop_init_print(env, &print_tab);
		(void)__ham_init_print(env, &print_tab);
		(void)__qam_init_print(env, &print_tab);
		(void)__txn_init_print(env, &print_tab);
	}

	(void)__db_dispatch(env,
	    &print_tab, (DBT *)logdbt, lsnp, DB_TXN_PRINT, NULL);
}
#endif
555
556/*
557 * __rep_new_master --
558 *	Called after a master election to sync back up with a new master.
559 * It's possible that we already know of this new master in which case
560 * we don't need to do anything.
561 *
562 * This is written assuming that this message came from the master; we
563 * need to enforce that in __rep_process_record, but right now, we have
564 * no way to identify the master.
565 *
566 * PUBLIC: int __rep_new_master __P((ENV *, __rep_control_args *, int));
567 */
568int
569__rep_new_master(env, cntrl, eid)
570	ENV *env;
571	__rep_control_args *cntrl;
572	int eid;
573{
574	DBT dbt;
575	DB_ENV *dbenv;
576	DB_LOG *dblp;
577	DB_LOGC *logc;
578	DB_LSN first_lsn, lsn;
579	DB_REP *db_rep;
580	DB_THREAD_INFO *ip;
581	LOG *lp;
582	REGENV *renv;
583	REGINFO *infop;
584	REP *rep;
585	db_timeout_t lease_to;
586	u_int32_t unused;
587	int change, do_req, lockout, ret, t_ret;
588
589	dbenv = env->dbenv;
590	db_rep = env->rep_handle;
591	rep = db_rep->region;
592	dblp = env->lg_handle;
593	lp = dblp->reginfo.primary;
594	ret = 0;
595	logc = NULL;
596	lockout = 0;
597	REP_SYSTEM_LOCK(env);
598	change = rep->gen != cntrl->gen || rep->master_id != eid;
599	if (change) {
600		/*
601		 * If we are already locking out others, we're either
602		 * in the middle of sync-up recovery or internal init
603		 * when this newmaster comes in (we also lockout in
604		 * rep_start, but we cannot be racing that because we
605		 * don't allow rep_proc_msg when rep_start is going on).
606		 *
607		 * If we were in the middle of an internal initialization
608		 * and we've discovered a new master instead, clean up
609		 * our old internal init information.  We need to clean
610		 * up any flags and unlock our lockout.
611		 */
612		if (F_ISSET(rep, REP_F_READY_MSG))
613			goto lckout;
614
615		if ((ret = __rep_lockout_msg(env, rep, 1)) != 0)
616			goto errlck;
617
618		lockout = 1;
619		/*
620		 * We must wait any remaining lease time before accepting
621		 * this new master.  This must be after the lockout above
622		 * so that no new message can be processed and re-grant
623		 * the lease out from under us.
624		 */
625		if (IS_USING_LEASES(env) &&
626		    ((lease_to = __rep_lease_waittime(env)) != 0)) {
627			REP_SYSTEM_UNLOCK(env);
628			__os_yield(env, 0, (u_long)lease_to);
629			REP_SYSTEM_LOCK(env);
630		}
631
632		if ((ret = __env_init_rec(env, cntrl->log_version)) != 0)
633			goto errlck;
634
635		REP_SYSTEM_UNLOCK(env);
636
637		MUTEX_LOCK(env, rep->mtx_clientdb);
638		__os_gettime(env, &lp->rcvd_ts, 1);
639		lp->wait_ts = rep->request_gap;
640		ZERO_LSN(lp->verify_lsn);
641		ZERO_LSN(lp->waiting_lsn);
642		ZERO_LSN(lp->max_wait_lsn);
643		/*
644		 * Open if we need to, in preparation for the truncate
645		 * we'll do in a moment.
646		 */
647		if (db_rep->rep_db == NULL &&
648		    (ret = __rep_client_dbinit(env, 0, REP_DB)) != 0) {
649			MUTEX_UNLOCK(env, rep->mtx_clientdb);
650			goto err;
651		}
652
653		REP_SYSTEM_LOCK(env);
654		if (F_ISSET(rep, REP_F_READY_API | REP_F_READY_OP)) {
655			ret = __rep_init_cleanup(env, rep, DB_FORCE);
656			/*
657			 * Note that if an in-progress internal init was indeed
658			 * "cleaned up", clearing these flags now will allow the
659			 * application to see a completely empty database
660			 * environment for a moment (until the master responds
661			 * to our ALL_REQ).
662			 */
663			F_CLR(rep, REP_F_RECOVER_MASK);
664		}
665		MUTEX_UNLOCK(env, rep->mtx_clientdb);
666		if (ret != 0) {
667			/* TODO: consider add'l error recovery steps. */
668			goto errlck;
669		}
670		ENV_GET_THREAD_INFO(env, ip);
671		if ((ret = __db_truncate(db_rep->rep_db, ip, NULL, &unused))
672		    != 0)
673			goto errlck;
674
675		/*
676		 * This needs to be performed under message lockout
677		 * if we're actually changing master.
678		 */
679		__rep_elect_done(env, rep, 1);
680		RPRINT(env, DB_VERB_REP_MISC, (env,
681		    "Updating gen from %lu to %lu from master %d",
682		    (u_long)rep->gen, (u_long)cntrl->gen, eid));
683		rep->gen = cntrl->gen;
684		(void)__rep_write_gen(env, rep->gen);
685		if (rep->egen <= rep->gen)
686			rep->egen = rep->gen + 1;
687		rep->master_id = eid;
688		STAT(rep->stat.st_master_changes++);
689		rep->stat.st_startup_complete = 0;
690		__log_set_version(env, cntrl->log_version);
691		rep->version = cntrl->rep_version;
692		RPRINT(env, DB_VERB_REP_MISC, (env,
693		    "egen: %lu. rep version %lu",
694		    (u_long)rep->egen, (u_long)rep->version));
695
696		/*
697		 * If we're delaying client sync-up, we know we have a
698		 * new/changed master now, set flag indicating we are
699		 * actively delaying.
700		 */
701		if (FLD_ISSET(rep->config, REP_C_DELAYCLIENT))
702			F_SET(rep, REP_F_DELAY);
703		F_SET(rep, REP_F_NOARCHIVE | REP_F_RECOVER_VERIFY);
704		F_CLR(rep, REP_F_READY_MSG);
705		lockout = 0;
706	} else
707		__rep_elect_done(env, rep, 1);
708	REP_SYSTEM_UNLOCK(env);
709
710	MUTEX_LOCK(env, rep->mtx_clientdb);
711	lsn = lp->ready_lsn;
712
713	if (!change) {
714		ret = 0;
715		do_req = __rep_check_doreq(env, rep);
716		MUTEX_UNLOCK(env, rep->mtx_clientdb);
717		/*
718		 * If there wasn't a change, we might still have some
719		 * catching up or verification to do.
720		 */
721		if (do_req &&
722		    (F_ISSET(rep, REP_F_RECOVER_MASK) ||
723		    LOG_COMPARE(&lsn, &cntrl->lsn) < 0)) {
724			ret = __rep_resend_req(env, 0);
725			if (ret != 0)
726				RPRINT(env, DB_VERB_REP_MISC, (env,
727				    "resend_req ret is %lu", (u_long)ret));
728		}
729		/*
730		 * If we're not in one of the recovery modes, we need to
731		 * clear the NOARCHIVE flag.  Elections set NOARCHIVE
732		 * and if we called an election and found the same
733		 * master, we need to clear NOARCHIVE here.
734		 */
735		if (!F_ISSET(rep, REP_F_RECOVER_MASK)) {
736			REP_SYSTEM_LOCK(env);
737			F_CLR(rep, REP_F_NOARCHIVE);
738			REP_SYSTEM_UNLOCK(env);
739		}
740		return (ret);
741	}
742	MUTEX_UNLOCK(env, rep->mtx_clientdb);
743
744	/*
745	 * If the master changed, we need to start the process of
746	 * figuring out what our last valid log record is.  However,
747	 * if both the master and we agree that the max LSN is 0,0,
748	 * then there is no recovery to be done.  If we are at 0 and
749	 * the master is not, then we just need to request all the log
750	 * records from the master.
751	 */
752	if (IS_INIT_LSN(lsn) || IS_ZERO_LSN(lsn)) {
753		if ((ret = __rep_newmaster_empty(env, eid)) != 0)
754			goto err;
755		(void)__memp_set_config(dbenv, DB_MEMP_SYNC_INTERRUPT, 0);
756		return (DB_REP_NEWMASTER);
757	}
758
759	memset(&dbt, 0, sizeof(dbt));
760	/*
761	 * If this client is farther ahead on the log file than the master, see
762	 * if there is any overlap in the logs.  If not, the client is too
763	 * far ahead of the master and we cannot determine they're part of
764	 * the same replication group.
765	 */
766	if (cntrl->lsn.file < lsn.file) {
767		if ((ret = __log_cursor(env, &logc)) != 0)
768			goto err;
769		ret = __logc_get(logc, &first_lsn, &dbt, DB_FIRST);
770		if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
771			ret = t_ret;
772		if (ret == DB_NOTFOUND)
773			goto notfound;
774		else if (ret != 0)
775			goto err;
776		if (cntrl->lsn.file < first_lsn.file)
777			goto notfound;
778	}
779	if ((ret = __log_cursor(env, &logc)) != 0)
780		goto err;
781	ret = __rep_log_backup(env, rep, logc, &lsn);
782	if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
783		ret = t_ret;
784	if (ret == DB_NOTFOUND)
785		goto notfound;
786	else if (ret != 0)
787		goto err;
788
789	/*
790	 * Finally, we have a record to ask for.
791	 */
792	MUTEX_LOCK(env, rep->mtx_clientdb);
793	lp->verify_lsn = lsn;
794	__os_gettime(env, &lp->rcvd_ts, 1);
795	lp->wait_ts = rep->request_gap;
796	MUTEX_UNLOCK(env, rep->mtx_clientdb);
797	if (!F_ISSET(rep, REP_F_DELAY))
798		(void)__rep_send_message(env,
799		    eid, REP_VERIFY_REQ, &lsn, NULL, 0, DB_REP_ANYWHERE);
800
801	(void)__memp_set_config(dbenv, DB_MEMP_SYNC_INTERRUPT, 0);
802	return (DB_REP_NEWMASTER);
803
804err:	/*
805	 * If we failed, we need to clear the flags we may have set above
806	 * because we're not going to be setting the verify_lsn.
807	 */
808	REP_SYSTEM_LOCK(env);
809errlck:	if (lockout)
810		F_CLR(rep, REP_F_READY_MSG);
811	F_CLR(rep, REP_F_RECOVER_MASK | REP_F_DELAY);
812lckout:	REP_SYSTEM_UNLOCK(env);
813	return (ret);
814
815notfound:
816	/*
817	 * If we don't have an identification record, we still
818	 * might have some log records but we're discarding them
819	 * to sync up with the master from the start.
820	 * Therefore, truncate our log and treat it as if it
821	 * were empty.  In-memory logs can't be completely
822	 * zeroed using __log_vtruncate, so just zero them out.
823	 */
824	if (lp->db_log_inmemory)
825		ZERO_LSN(lsn);
826	else
827		INIT_LSN(lsn);
828	RPRINT(env, DB_VERB_REP_MISC,
829	    (env, "No commit or ckp found.  Truncate log."));
830	ret = lp->db_log_inmemory ?
831	    __log_zero(env, &lsn) :
832	    __log_vtruncate(env, &lsn, &lsn, NULL);
833	if (ret != 0 && ret != DB_NOTFOUND)
834		return (ret);
835	infop = env->reginfo;
836	renv = infop->primary;
837	REP_SYSTEM_LOCK(env);
838	(void)time(&renv->rep_timestamp);
839	REP_SYSTEM_UNLOCK(env);
840	if ((ret = __rep_newmaster_empty(env, eid)) != 0)
841		goto err;
842	return (DB_REP_NEWMASTER);
843}
844
845/*
846 * __rep_newmaster_empty
847 *      Handle the case of a NEWMASTER message received when we have an empty
848 * log.  This requires internal init.  If we can't do that because of
849 * NOAUTOINIT, return JOIN_FAILURE.  If F_DELAY is in effect, don't even
850 * consider NOAUTOINIT yet, because they could change it before rep_sync call.
851 */
852static int
853__rep_newmaster_empty(env, eid)
854	ENV *env;
855	int eid;
856{
857	DB_REP *db_rep;
858	LOG *lp;
859	REP *rep;
860	int msg, ret;
861
862	db_rep = env->rep_handle;
863	rep = db_rep->region;
864	lp = env->lg_handle->reginfo.primary;
865	msg = ret = 0;
866
867	MUTEX_LOCK(env, rep->mtx_clientdb);
868	REP_SYSTEM_LOCK(env);
869	lp->wait_ts = rep->request_gap;
870
871	/* Usual case is to skip to UPDATE state; we may revise this below. */
872	F_CLR(rep, REP_F_RECOVER_VERIFY);
873	F_SET(rep, REP_F_RECOVER_UPDATE);
874
875	if (F_ISSET(rep, REP_F_DELAY)) {
876		/*
877		 * Having properly set up wait_ts for later, nothing more to
878		 * do now.
879		 */
880	} else if (FLD_ISSET(rep->config, REP_C_NOAUTOINIT)) {
881		F_CLR(rep, REP_F_NOARCHIVE | REP_F_RECOVER_MASK);
882		ret = DB_REP_JOIN_FAILURE;
883	} else {
884		/* Normal case: neither DELAY nor NOAUTOINIT. */
885		msg = 1;
886	}
887	REP_SYSTEM_UNLOCK(env);
888	MUTEX_UNLOCK(env, rep->mtx_clientdb);
889
890	if (msg)
891		(void)__rep_send_message(env, eid, REP_UPDATE_REQ,
892		    NULL, NULL, 0, 0);
893	return (ret);
894}
895
896/*
897 * __rep_noarchive
898 *	Used by log_archive to determine if it is okay to remove
899 * log files.
900 *
901 * PUBLIC: int __rep_noarchive __P((ENV *));
902 */
903int
904__rep_noarchive(env)
905	ENV *env;
906{
907	DB_REP *db_rep;
908	REGENV *renv;
909	REGINFO *infop;
910	REP *rep;
911	time_t timestamp;
912
913	infop = env->reginfo;
914	renv = infop->primary;
915
916	/*
917	 * This is tested before REP_ON below because we always need
918	 * to obey if any replication process has disabled archiving.
919	 * Everything is in the environment region that we need here.
920	 */
921	if (F_ISSET(renv, DB_REGENV_REPLOCKED)) {
922		(void)time(&timestamp);
923		TIMESTAMP_CHECK(env, timestamp, renv);
924		/*
925		 * Check if we're still locked out after checking
926		 * the timestamp.
927		 */
928		if (F_ISSET(renv, DB_REGENV_REPLOCKED))
929			return (EINVAL);
930	}
931
932	if (!REP_ON(env))
933		return (0);
934
935	db_rep = env->rep_handle;
936	rep = db_rep->region;
937	return (F_ISSET(rep, REP_F_NOARCHIVE) ? 1 : 0);
938}
939
940/*
941 * __rep_send_vote
942 *	Send this site's vote for the election.
943 *
944 * PUBLIC: void __rep_send_vote __P((ENV *, DB_LSN *, u_int32_t, u_int32_t,
945 * PUBLIC:    u_int32_t, u_int32_t, u_int32_t, int, u_int32_t, u_int32_t));
946 */
947void
948__rep_send_vote(env, lsnp, nsites, nvotes, pri, tie, egen, eid, vtype, flags)
949	ENV *env;
950	DB_LSN *lsnp;
951	int eid;
952	u_int32_t nsites, nvotes, pri;
953	u_int32_t flags, egen, tie, vtype;
954{
955	DB_REP *db_rep;
956	DBT vote_dbt;
957	REP *rep;
958	REP_OLD_VOTE_INFO ovi;
959	__rep_vote_info_args vi;
960	u_int8_t buf[__REP_VOTE_INFO_SIZE];
961	size_t len;
962
963	db_rep = env->rep_handle;
964	rep = db_rep->region;
965
966	memset(&vi, 0, sizeof(vi));
967	memset(&vote_dbt, 0, sizeof(vote_dbt));
968
969	/*
970	 * In 4.7 we went to fixed sized fields.  They may not be
971	 * the same as the sizes in older versions.
972	 */
973	if (rep->version < DB_REPVERSION_47) {
974		memset(&ovi, 0, sizeof(ovi));
975		ovi.egen = egen;
976		ovi.priority = (int) pri;
977		ovi.nsites = (int) nsites;
978		ovi.nvotes = (int) nvotes;
979		ovi.tiebreaker = tie;
980		vote_dbt.data = &ovi;
981		vote_dbt.size = sizeof(ovi);
982	} else {
983		vi.egen = egen;
984		vi.priority = pri;
985		vi.nsites = nsites;
986		vi.nvotes = nvotes;
987		vi.tiebreaker = tie;
988		(void)__rep_vote_info_marshal(env, &vi, buf,
989		    __REP_VOTE_INFO_SIZE, &len);
990		DB_INIT_DBT(vote_dbt, buf, len);
991	}
992
993	(void)__rep_send_message(env, eid, vtype, lsnp, &vote_dbt, flags, 0);
994}
995
996/*
997 * __rep_elect_done
998 *	Clear all election information for this site.  Assumes the
999 *	caller hold the region mutex.
1000 *
1001 * PUBLIC: void __rep_elect_done __P((ENV *, REP *, int));
1002 */
1003void
1004__rep_elect_done(env, rep, found_master)
1005	ENV *env;
1006	REP *rep;
1007	int found_master;
1008{
1009	int inelect;
1010	db_timespec endtime;
1011
1012	inelect = IN_ELECTION(rep);
1013	F_CLR(rep,
1014	    REP_F_EPHASE0 | REP_F_EPHASE1 | REP_F_EPHASE2 | REP_F_TALLY);
1015	/*
1016	 * Finding a master trumps finding a new egen.
1017	 */
1018	if (found_master)
1019		F_CLR(rep, REP_F_EGENUPDATE);
1020	rep->sites = 0;
1021	rep->votes = 0;
1022	if (inelect) {
1023		if (timespecisset(&rep->etime)) {
1024			__os_gettime(env, &endtime, 1);
1025			timespecsub(&endtime, &rep->etime);
1026#ifdef HAVE_STATISTICS
1027			rep->stat.st_election_sec = (u_int32_t)endtime.tv_sec;
1028			rep->stat.st_election_usec = (u_int32_t)
1029			    (endtime.tv_nsec / NS_PER_US);
1030#endif
1031			RPRINT(env, DB_VERB_REP_ELECT, (env,
1032			    "Election finished in %lu.%09lu sec",
1033			    (u_long)endtime.tv_sec, (u_long)endtime.tv_nsec));
1034			timespecclear(&rep->etime);
1035		}
1036		rep->egen++;
1037	}
1038	RPRINT(env, DB_VERB_REP_ELECT,
1039	    (env, "Election done; egen %lu", (u_long)rep->egen));
1040}
1041
1042/*
1043 * __rep_grow_sites --
1044 *	Called to allocate more space in the election tally information.
1045 * Called with the rep mutex held.  We need to call the region mutex, so
1046 * we need to make sure that we *never* acquire those mutexes in the
1047 * opposite order.
1048 *
1049 * PUBLIC: int __rep_grow_sites __P((ENV *, u_int32_t));
1050 */
1051int
1052__rep_grow_sites(env, nsites)
1053	ENV *env;
1054	u_int32_t nsites;
1055{
1056	REGENV *renv;
1057	REGINFO *infop;
1058	REP *rep;
1059	int ret, *tally;
1060	u_int32_t nalloc;
1061
1062	rep = env->rep_handle->region;
1063
1064	/*
1065	 * Allocate either twice the current allocation or nsites,
1066	 * whichever is more.
1067	 */
1068	nalloc = 2 * rep->asites;
1069	if (nalloc < nsites)
1070		nalloc = nsites;
1071
1072	infop = env->reginfo;
1073	renv = infop->primary;
1074	MUTEX_LOCK(env, renv->mtx_regenv);
1075
1076	/*
1077	 * We allocate 2 tally regions, one for tallying VOTE1's and
1078	 * one for VOTE2's.  Always grow them in tandem, because if we
1079	 * get more VOTE1's we'll always expect more VOTE2's then too.
1080	 */
1081	if ((ret = __env_alloc(infop,
1082	    (size_t)nalloc * sizeof(REP_VTALLY), &tally)) == 0) {
1083		if (rep->tally_off != INVALID_ROFF)
1084			 __env_alloc_free(
1085			     infop, R_ADDR(infop, rep->tally_off));
1086		rep->tally_off = R_OFFSET(infop, tally);
1087		if ((ret = __env_alloc(infop,
1088		    (size_t)nalloc * sizeof(REP_VTALLY), &tally)) == 0) {
1089			/* Success */
1090			if (rep->v2tally_off != INVALID_ROFF)
1091				 __env_alloc_free(infop,
1092				    R_ADDR(infop, rep->v2tally_off));
1093			rep->v2tally_off = R_OFFSET(infop, tally);
1094			rep->asites = nalloc;
1095			rep->nsites = nsites;
1096		} else {
1097			/*
1098			 * We were unable to allocate both.  So, we must
1099			 * free the first one and reinitialize.  If
1100			 * v2tally_off is valid, it is from an old
1101			 * allocation and we are clearing it all out due
1102			 * to the error.
1103			 */
1104			if (rep->v2tally_off != INVALID_ROFF)
1105				 __env_alloc_free(infop,
1106				    R_ADDR(infop, rep->v2tally_off));
1107			__env_alloc_free(infop,
1108			    R_ADDR(infop, rep->tally_off));
1109			rep->v2tally_off = rep->tally_off = INVALID_ROFF;
1110			rep->asites = 0;
1111			rep->nsites = 0;
1112		}
1113	}
1114	MUTEX_UNLOCK(env, renv->mtx_regenv);
1115	return (ret);
1116}
1117
1118/*
1119 * __env_rep_enter --
1120 *
1121 * Check if we are in the middle of replication initialization and/or
1122 * recovery, and if so, disallow operations.  If operations are allowed,
1123 * increment handle-counts, so that we do not start recovery while we
1124 * are operating in the library.
1125 *
1126 * PUBLIC: int __env_rep_enter __P((ENV *, int));
1127 */
1128int
1129__env_rep_enter(env, checklock)
1130	ENV *env;
1131	int checklock;
1132{
1133	DB_REP *db_rep;
1134	REGENV *renv;
1135	REGINFO *infop;
1136	REP *rep;
1137	int cnt;
1138	time_t	timestamp;
1139
1140	/* Check if locks have been globally turned off. */
1141	if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING))
1142		return (0);
1143
1144	db_rep = env->rep_handle;
1145	rep = db_rep->region;
1146
1147	infop = env->reginfo;
1148	renv = infop->primary;
1149	if (checklock && F_ISSET(renv, DB_REGENV_REPLOCKED)) {
1150		(void)time(&timestamp);
1151		TIMESTAMP_CHECK(env, timestamp, renv);
1152		/*
1153		 * Check if we're still locked out after checking
1154		 * the timestamp.
1155		 */
1156		if (F_ISSET(renv, DB_REGENV_REPLOCKED))
1157			return (EINVAL);
1158	}
1159
1160	REP_SYSTEM_LOCK(env);
1161	for (cnt = 0; F_ISSET(rep, REP_F_READY_API);) {
1162		REP_SYSTEM_UNLOCK(env);
1163		if (FLD_ISSET(rep->config, REP_C_NOWAIT)) {
1164			__db_errx(env,
1165    "Operation locked out.  Waiting for replication lockout to complete");
1166			return (DB_REP_LOCKOUT);
1167		}
1168		__os_yield(env, 1, 0);
1169		REP_SYSTEM_LOCK(env);
1170		if (++cnt % 60 == 0)
1171			__db_errx(env,
1172    "DB_ENV handle waiting %d minutes for replication lockout to complete",
1173			    cnt / 60);
1174	}
1175	rep->handle_cnt++;
1176	REP_SYSTEM_UNLOCK(env);
1177
1178	return (0);
1179}
1180
1181/*
1182 * __env_db_rep_exit --
1183 *
1184 *	Decrement handle count upon routine exit.
1185 *
1186 * PUBLIC: int __env_db_rep_exit __P((ENV *));
1187 */
1188int
1189__env_db_rep_exit(env)
1190	ENV *env;
1191{
1192	DB_REP *db_rep;
1193	REP *rep;
1194
1195	/* Check if locks have been globally turned off. */
1196	if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING))
1197		return (0);
1198
1199	db_rep = env->rep_handle;
1200	rep = db_rep->region;
1201
1202	REP_SYSTEM_LOCK(env);
1203	rep->handle_cnt--;
1204	REP_SYSTEM_UNLOCK(env);
1205
1206	return (0);
1207}
1208
1209/*
1210 * __db_rep_enter --
1211 *	Called in replicated environments to keep track of in-use handles
1212 * and prevent any concurrent operation during recovery.  If checkgen is
1213 * non-zero, then we verify that the dbp has the same handle as the env.
1214 *
1215 * If return_now is non-zero, we'll return DB_DEADLOCK immediately, else we'll
1216 * sleep before returning DB_DEADLOCK.  Without the sleep, it is likely
1217 * the application will immediately try again and could reach a retry
1218 * limit before replication has a chance to finish.  The sleep increases
1219 * the probability that an application retry will succeed.
1220 *
1221 * PUBLIC: int __db_rep_enter __P((DB *, int, int, int));
1222 */
1223int
1224__db_rep_enter(dbp, checkgen, checklock, return_now)
1225	DB *dbp;
1226	int checkgen, checklock, return_now;
1227{
1228	DB_REP *db_rep;
1229	ENV *env;
1230	REGENV *renv;
1231	REGINFO *infop;
1232	REP *rep;
1233	time_t	timestamp;
1234
1235	env = dbp->env;
1236	/* Check if locks have been globally turned off. */
1237	if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING))
1238		return (0);
1239
1240	db_rep = env->rep_handle;
1241	rep = db_rep->region;
1242	infop = env->reginfo;
1243	renv = infop->primary;
1244
1245	if (checklock && F_ISSET(renv, DB_REGENV_REPLOCKED)) {
1246		(void)time(&timestamp);
1247		TIMESTAMP_CHECK(env, timestamp, renv);
1248		/*
1249		 * Check if we're still locked out after checking
1250		 * the timestamp.
1251		 */
1252		if (F_ISSET(renv, DB_REGENV_REPLOCKED))
1253			return (EINVAL);
1254	}
1255	REP_SYSTEM_LOCK(env);
1256	/*
1257	 * !!!
1258	 * Note, we are checking REP_F_READY_OP, but we are
1259	 * incrementing rep->handle_cnt.  That seems like a mismatch,
1260	 * but the intention is to return DEADLOCK to the application
1261	 * which will cause them to abort the txn quickly and allow
1262	 * the lockout to proceed.
1263	 *
1264	 * The correctness of doing this depends on the fact that
1265	 * lockout of the API always sets REP_F_READY_OP first.
1266	 */
1267	if (F_ISSET(rep, REP_F_READY_OP)) {
1268		REP_SYSTEM_UNLOCK(env);
1269		if (!return_now)
1270			__os_yield(env, 5, 0);
1271		return (DB_LOCK_DEADLOCK);
1272	}
1273
1274	if (checkgen && dbp->timestamp != renv->rep_timestamp) {
1275		REP_SYSTEM_UNLOCK(env);
1276		__db_errx(env, "%s %s",
1277		    "replication recovery unrolled committed transactions;",
1278		    "open DB and DBcursor handles must be closed");
1279		return (DB_REP_HANDLE_DEAD);
1280	}
1281	rep->handle_cnt++;
1282	REP_SYSTEM_UNLOCK(env);
1283
1284	return (0);
1285}
1286
1287/*
1288 * __op_rep_enter --
1289 *
1290 *	Check if we are in the middle of replication initialization and/or
1291 * recovery, and if so, disallow new multi-step operations, such as
1292 * transaction and memp gets.  If operations are allowed,
1293 * increment the op_cnt, so that we do not start recovery while we have
1294 * active operations.
1295 *
1296 * PUBLIC: int __op_rep_enter __P((ENV *));
1297 */
1298int
1299__op_rep_enter(env)
1300	ENV *env;
1301{
1302	DB_REP *db_rep;
1303	REP *rep;
1304	int cnt;
1305
1306	/* Check if locks have been globally turned off. */
1307	if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING))
1308		return (0);
1309
1310	db_rep = env->rep_handle;
1311	rep = db_rep->region;
1312
1313	REP_SYSTEM_LOCK(env);
1314	for (cnt = 0; F_ISSET(rep, REP_F_READY_OP);) {
1315		REP_SYSTEM_UNLOCK(env);
1316		if (FLD_ISSET(rep->config, REP_C_NOWAIT)) {
1317			__db_errx(env,
1318    "Operation locked out.  Waiting for replication lockout to complete");
1319			return (DB_REP_LOCKOUT);
1320		}
1321		__os_yield(env, 5, 0);
1322		cnt += 5;
1323		REP_SYSTEM_LOCK(env);
1324		if (cnt % 60 == 0)
1325			__db_errx(env,
1326	"__op_rep_enter waiting %d minutes for lockout to complete",
1327			    cnt / 60);
1328	}
1329	rep->op_cnt++;
1330	REP_SYSTEM_UNLOCK(env);
1331
1332	return (0);
1333}
1334
1335/*
1336 * __op_rep_exit --
1337 *
1338 *	Decrement op count upon transaction commit/abort/discard or
1339 *	memp_fput.
1340 *
1341 * PUBLIC: int __op_rep_exit __P((ENV *));
1342 */
1343int
1344__op_rep_exit(env)
1345	ENV *env;
1346{
1347	DB_REP *db_rep;
1348	REP *rep;
1349
1350	/* Check if locks have been globally turned off. */
1351	if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING))
1352		return (0);
1353
1354	db_rep = env->rep_handle;
1355	rep = db_rep->region;
1356
1357	REP_SYSTEM_LOCK(env);
1358	DB_ASSERT(env, rep->op_cnt > 0);
1359	rep->op_cnt--;
1360	REP_SYSTEM_UNLOCK(env);
1361
1362	return (0);
1363}
1364
1365/*
1366 * __rep_lockout_api --
1367 *	Coordinate with other threads in the library and active txns so
1368 *	that we can run single-threaded, for recovery or internal backup.
1369 *	Assumes the caller holds the region mutex.
1370 *
1371 * PUBLIC: int __rep_lockout_api __P((ENV *, REP *));
1372 */
1373int
1374__rep_lockout_api(env, rep)
1375	ENV *env;
1376	REP *rep;
1377{
1378	int ret;
1379
1380	/*
1381	 * We must drain long-running operations first.  We check
1382	 * REP_F_READY_OP in __db_rep_enter in order to allow them
1383	 * to abort existing txns quickly.  Therefore, we must
1384	 * always lockout REP_F_READY_OP first, then REP_F_READY_API.
1385	 */
1386	if ((ret = __rep_lockout_int(env, rep, &rep->op_cnt, 0,
1387	    "op_cnt", REP_F_READY_OP)) != 0)
1388		return (ret);
1389	return (__rep_lockout_int(env, rep, &rep->handle_cnt, 0,
1390	    "handle_cnt", REP_F_READY_API));
1391}
1392
1393/*
1394 * __rep_lockout_apply --
1395 *	Coordinate with other threads processing messages so that
1396 *	we can run single-threaded and know that no incoming
1397 *	message can apply new log records.
1398 *	This call should be short-term covering a specific critical
1399 *	operation where we need to make sure no new records change
1400 *	the log.  Currently used to coordinate with elections.
1401 *	Assumes the caller holds the region mutex.
1402 *
1403 * PUBLIC: int __rep_lockout_apply __P((ENV *, REP *, u_int32_t));
1404 */
1405int
1406__rep_lockout_apply(env, rep, apply_th)
1407	ENV *env;
1408	REP *rep;
1409	u_int32_t apply_th;
1410{
1411	return (__rep_lockout_int(env, rep, &rep->apply_th, apply_th,
1412	    "apply_th", REP_F_READY_APPLY));
1413}
1414
1415/*
1416 * __rep_lockout_msg --
1417 *	Coordinate with other threads processing messages so that
1418 *	we can run single-threaded and know that no incoming
1419 *	message can change the world (i.e., like a NEWMASTER message).
1420 *	This call should be short-term covering a specific critical
1421 *	operation where we need to make sure no new messages arrive
1422 *	in the middle and all message threads are out before we start it.
1423 *	Assumes the caller holds the region mutex.
1424 *
1425 * PUBLIC: int __rep_lockout_msg __P((ENV *, REP *, u_int32_t));
1426 */
1427int
1428__rep_lockout_msg(env, rep, msg_th)
1429	ENV *env;
1430	REP *rep;
1431	u_int32_t msg_th;
1432{
1433	return (__rep_lockout_int(env, rep, &rep->msg_th, msg_th,
1434	    "msg_th", REP_F_READY_MSG));
1435}
1436
1437/*
1438 * __rep_lockout_int --
1439 *	Internal common code for locking out and coordinating
1440 *	with other areas of the code.
1441 *	Assumes the caller holds the region mutex.
1442 *
1443 */
1444static int
1445__rep_lockout_int(env, rep, fieldp, field_val, msg, lockout_flag)
1446	ENV *env;
1447	REP *rep;
1448	u_int32_t *fieldp;
1449	const char *msg;
1450	u_int32_t field_val, lockout_flag;
1451{
1452	int wait_cnt;
1453
1454	F_SET(rep, lockout_flag);
1455	for (wait_cnt = 0; *fieldp > field_val;) {
1456		REP_SYSTEM_UNLOCK(env);
1457		__os_yield(env, 1, 0);
1458#ifdef DIAGNOSTIC
1459		if (wait_cnt == 5)
1460			__db_errx(env,
1461"Waiting for %s (%lu) to complete replication lockout",
1462			msg, (u_long)*fieldp);
1463		if (++wait_cnt % 60 == 0)
1464			__db_errx(env,
1465"Waiting for %s (%lu) to complete replication lockout for %d minutes",
1466			msg, (u_long)*fieldp, wait_cnt / 60);
1467#endif
1468		REP_SYSTEM_LOCK(env);
1469	}
1470
1471	COMPQUIET(msg, NULL);
1472	return (0);
1473}
1474
1475/*
1476 * __rep_send_throttle -
1477 *	Send a record, throttling if necessary.  Callers of this function
1478 * will throttle - breaking out of their loop, if the repth->type field
1479 * changes from the normal message type to the *_MORE message type.
1480 * This function will send the normal type unless throttling gets invoked.
1481 * Then it sets the type field and sends the _MORE message.
1482 *
1483 * Throttling is always only relevant in serving requests, so we always send
1484 * with REPCTL_RESEND.  Additional desired flags can be passed in the ctlflags
1485 * argument.
1486 *
1487 * PUBLIC: int __rep_send_throttle __P((ENV *, int, REP_THROTTLE *,
1488 * PUBLIC:    u_int32_t, u_int32_t));
1489 */
1490int
1491__rep_send_throttle(env, eid, repth, flags, ctlflags)
1492	ENV *env;
1493	int eid;
1494	REP_THROTTLE *repth;
1495	u_int32_t ctlflags, flags;
1496{
1497	DB_REP *db_rep;
1498	REP *rep;
1499	u_int32_t size, typemore;
1500	int check_limit;
1501
1502	check_limit = repth->gbytes != 0 || repth->bytes != 0;
1503	/*
1504	 * If we only want to do throttle processing and we don't have it
1505	 * turned on, return immediately.
1506	 */
1507	if (!check_limit && LF_ISSET(REP_THROTTLE_ONLY))
1508		return (0);
1509
1510	db_rep = env->rep_handle;
1511	rep = db_rep->region;
1512	typemore = 0;
1513	if (repth->type == REP_LOG)
1514		typemore = REP_LOG_MORE;
1515	if (repth->type == REP_PAGE)
1516		typemore = REP_PAGE_MORE;
1517	DB_ASSERT(env, typemore != 0);
1518
1519	/*
1520	 * data_dbt.size is only the size of the log
1521	 * record;  it doesn't count the size of the
1522	 * control structure. Factor that in as well
1523	 * so we're not off by a lot if our log records
1524	 * are small.
1525	 */
1526	size = repth->data_dbt->size + sizeof(__rep_control_args);
1527	if (check_limit) {
1528		while (repth->bytes <= size) {
1529			if (repth->gbytes > 0) {
1530				repth->bytes += GIGABYTE;
1531				--(repth->gbytes);
1532				continue;
1533			}
1534			/*
1535			 * We don't hold the rep mutex,
1536			 * and may miscount.
1537			 */
1538			STAT(rep->stat.st_nthrottles++);
1539			repth->type = typemore;
1540			goto send;
1541		}
1542		repth->bytes -= size;
1543	}
1544	/*
1545	 * Always send if it is typemore, otherwise send only if
1546	 * REP_THROTTLE_ONLY is not set.
1547	 *
1548	 * NOTE:  It is the responsibility of the caller to marshal, if
1549	 * needed, the data_dbt.  This function just sends what it is given.
1550	 */
1551send:	if ((repth->type == typemore || !LF_ISSET(REP_THROTTLE_ONLY)) &&
1552	    (__rep_send_message(env, eid, repth->type,
1553	    &repth->lsn, repth->data_dbt, (REPCTL_RESEND | ctlflags), 0) != 0))
1554		return (DB_REP_UNAVAIL);
1555	return (0);
1556}
1557
1558/*
1559 * __rep_msg_to_old --
1560 *	Convert current message numbers to old message numbers.
1561 *
1562 * PUBLIC: u_int32_t __rep_msg_to_old __P((u_int32_t, u_int32_t));
1563 */
1564u_int32_t
1565__rep_msg_to_old(version, rectype)
1566	u_int32_t version, rectype;
1567{
1568	/*
1569	 * We need to convert from current message numbers to old numbers and
1570	 * we need to convert from old numbers to current numbers.  Offset by
1571	 * one for more readable code.
1572	 */
1573	/*
1574	 * Everything for version 0 is invalid, there is no version 0.
1575	 */
1576	static const u_int32_t table[DB_REPVERSION][REP_MAX_MSG+1] = {
1577	/* There is no DB_REPVERSION 0. */
1578	{   REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1579	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1580	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1581	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1582	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1583	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1584	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1585	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
1586	/*
1587	 * 4.2/DB_REPVERSION 1 no longer supported.
1588	 */
1589	{   REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1590	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1591	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1592	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1593	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1594	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1595	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1596	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
1597	/*
1598	 * 4.3/DB_REPVERSION 2 no longer supported.
1599	 */
1600	{   REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1601	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1602	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1603	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1604	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1605	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1606	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1607	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
1608	/*
1609	 * From 4.7 message number To 4.4/4.5 message number
1610	 */
1611	{   REP_INVALID,	/* NO message 0 */
1612	    1,			/* REP_ALIVE */
1613	    2,			/* REP_ALIVE_REQ */
1614	    3,			/* REP_ALL_REQ */
1615	    4,			/* REP_BULK_LOG */
1616	    5,			/* REP_BULK_PAGE */
1617	    6,			/* REP_DUPMASTER */
1618	    7,			/* REP_FILE */
1619	    8,			/* REP_FILE_FAIL */
1620	    9,			/* REP_FILE_REQ */
1621	    REP_INVALID,	/* REP_LEASE_GRANT */
1622	    10,			/* REP_LOG */
1623	    11,			/* REP_LOG_MORE */
1624	    12,			/* REP_LOG_REQ */
1625	    13,			/* REP_MASTER_REQ */
1626	    14,			/* REP_NEWCLIENT */
1627	    15,			/* REP_NEWFILE */
1628	    16,			/* REP_NEWMASTER */
1629	    17,			/* REP_NEWSITE */
1630	    18,			/* REP_PAGE */
1631	    19,			/* REP_PAGE_FAIL */
1632	    20,			/* REP_PAGE_MORE */
1633	    21,			/* REP_PAGE_REQ */
1634	    22,			/* REP_REREQUEST */
1635	    REP_INVALID,	/* REP_START_SYNC */
1636	    23,			/* REP_UPDATE */
1637	    24,			/* REP_UPDATE_REQ */
1638	    25,			/* REP_VERIFY */
1639	    26,			/* REP_VERIFY_FAIL */
1640	    27,			/* REP_VERIFY_REQ */
1641	    28,			/* REP_VOTE1 */
1642	    29			/* REP_VOTE2 */
1643	},
1644	/*
1645	 * From 4.7 message number To 4.6 message number.  There are
1646	 * NO message differences between 4.6 and 4.7.  The
1647	 * control structure changed.
1648	 */
1649	{   REP_INVALID,	/* NO message 0 */
1650	    1,			/* REP_ALIVE */
1651	    2,			/* REP_ALIVE_REQ */
1652	    3,			/* REP_ALL_REQ */
1653	    4,			/* REP_BULK_LOG */
1654	    5,			/* REP_BULK_PAGE */
1655	    6,			/* REP_DUPMASTER */
1656	    7,			/* REP_FILE */
1657	    8,			/* REP_FILE_FAIL */
1658	    9,			/* REP_FILE_REQ */
1659	    10,			/* REP_LEASE_GRANT */
1660	    11,			/* REP_LOG */
1661	    12,			/* REP_LOG_MORE */
1662	    13,			/* REP_LOG_REQ */
1663	    14,			/* REP_MASTER_REQ */
1664	    15,			/* REP_NEWCLIENT */
1665	    16,			/* REP_NEWFILE */
1666	    17,			/* REP_NEWMASTER */
1667	    18,			/* REP_NEWSITE */
1668	    19,			/* REP_PAGE */
1669	    20,			/* REP_PAGE_FAIL */
1670	    21,			/* REP_PAGE_MORE */
1671	    22,			/* REP_PAGE_REQ */
1672	    23,			/* REP_REREQUEST */
1673	    24,			/* REP_START_SYNC */
1674	    25,			/* REP_UPDATE */
1675	    26,			/* REP_UPDATE_REQ */
1676	    27,			/* REP_VERIFY */
1677	    28,			/* REP_VERIFY_FAIL */
1678	    29,			/* REP_VERIFY_REQ */
1679	    30,			/* REP_VOTE1 */
1680	    31			/* REP_VOTE2 */
1681	}
1682	};
1683	return (table[version][rectype]);
1684}
1685
1686/*
1687 * __rep_msg_from_old --
1688 *	Convert old message numbers to current message numbers.
1689 *
1690 * PUBLIC: u_int32_t __rep_msg_from_old __P((u_int32_t, u_int32_t));
1691 */
1692u_int32_t
1693__rep_msg_from_old(version, rectype)
1694	u_int32_t version, rectype;
1695{
1696	/*
1697	 * We need to convert from current message numbers to old numbers and
1698	 * we need to convert from old numbers to current numbers.  Offset by
1699	 * one for more readable code.
1700	 */
1701	/*
1702	 * Everything for version 0 is invalid, there is no version 0.
1703	 */
1704	static const u_int32_t table[DB_REPVERSION][REP_MAX_MSG+1] = {
1705	/* There is no DB_REPVERSION 0. */
1706	{   REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1707	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1708	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1709	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1710	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1711	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1712	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1713	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
1714	/*
1715	 * 4.2/DB_REPVERSION 1 no longer supported.
1716	 */
1717	{   REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1718	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1719	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1720	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1721	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1722	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1723	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1724	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
1725	/*
1726	 * 4.3/DB_REPVERSION 2 no longer supported.
1727	 */
1728	{   REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1729	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1730	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1731	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1732	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1733	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1734	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
1735	    REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
1736	/*
1737	 * From 4.4/4.5 message number To 4.7 message number
1738	 */
1739	{   REP_INVALID,	/* NO message 0 */
1740	    1,			/* 1, REP_ALIVE */
1741	    2,			/* 2, REP_ALIVE_REQ */
1742	    3,			/* 3, REP_ALL_REQ */
1743	    4,			/* 4, REP_BULK_LOG */
1744	    5,			/* 5, REP_BULK_PAGE */
1745	    6,			/* 6, REP_DUPMASTER */
1746	    7,			/* 7, REP_FILE */
1747	    8,			/* 8, REP_FILE_FAIL */
1748	    9,			/* 9, REP_FILE_REQ */
1749	    /* 10, REP_LEASE_GRANT doesn't exist */
1750	    11,			/* 10, REP_LOG */
1751	    12,			/* 11, REP_LOG_MORE */
1752	    13,			/* 12, REP_LOG_REQ */
1753	    14,			/* 13, REP_MASTER_REQ */
1754	    15,			/* 14, REP_NEWCLIENT */
1755	    16,			/* 15, REP_NEWFILE */
1756	    17,			/* 16, REP_NEWMASTER */
1757	    18,			/* 17, REP_NEWSITE */
1758	    19,			/* 18, REP_PAGE */
1759	    20,			/* 19, REP_PAGE_FAIL */
1760	    21,			/* 20, REP_PAGE_MORE */
1761	    22,			/* 21, REP_PAGE_REQ */
1762	    23,			/* 22, REP_REREQUEST */
1763	    /* 24, REP_START_SYNC doesn't exist */
1764	    25,			/* 23, REP_UPDATE */
1765	    26,			/* 24, REP_UPDATE_REQ */
1766	    27,			/* 25, REP_VERIFY */
1767	    28,			/* 26, REP_VERIFY_FAIL */
1768	    29,			/* 27, REP_VERIFY_REQ */
1769	    30,			/* 28, REP_VOTE1 */
1770	    31,			/* 29, REP_VOTE2 */
1771	    REP_INVALID,	/* 30, 4.4/4.5 no message */
1772	    REP_INVALID		/* 31, 4.4/4.5 no message */
1773	},
1774	/*
1775	 * From 4.6 message number To 4.6 message number.  There are
1776	 * NO message differences between 4.6 and 4.7.  The
1777	 * control structure changed.
1778	 */
1779	{   REP_INVALID,	/* NO message 0 */
1780	    1,			/* 1, REP_ALIVE */
1781	    2,			/* 2, REP_ALIVE_REQ */
1782	    3,			/* 3, REP_ALL_REQ */
1783	    4,			/* 4, REP_BULK_LOG */
1784	    5,			/* 5, REP_BULK_PAGE */
1785	    6,			/* 6, REP_DUPMASTER */
1786	    7,			/* 7, REP_FILE */
1787	    8,			/* 8, REP_FILE_FAIL */
1788	    9,			/* 9, REP_FILE_REQ */
1789	    10,			/* 10, REP_LEASE_GRANT */
1790	    11,			/* 11, REP_LOG */
1791	    12,			/* 12, REP_LOG_MORE */
1792	    13,			/* 13, REP_LOG_REQ */
1793	    14,			/* 14, REP_MASTER_REQ */
1794	    15,			/* 15, REP_NEWCLIENT */
1795	    16,			/* 16, REP_NEWFILE */
1796	    17,			/* 17, REP_NEWMASTER */
1797	    18,			/* 18, REP_NEWSITE */
1798	    19,			/* 19, REP_PAGE */
1799	    20,			/* 20, REP_PAGE_FAIL */
1800	    21,			/* 21, REP_PAGE_MORE */
1801	    22,			/* 22, REP_PAGE_REQ */
1802	    23,			/* 22, REP_REREQUEST */
1803	    24,			/* 24, REP_START_SYNC */
1804	    25,			/* 25, REP_UPDATE */
1805	    26,			/* 26, REP_UPDATE_REQ */
1806	    27,			/* 27, REP_VERIFY */
1807	    28,			/* 28, REP_VERIFY_FAIL */
1808	    29,			/* 29, REP_VERIFY_REQ */
1809	    30,			/* 30, REP_VOTE1 */
1810	    31			/* 31, REP_VOTE2 */
1811	}
1812	};
1813	return (table[version][rectype]);
1814}
1815
1816/*
1817 * __rep_print --
1818 *	Optionally print a verbose message.
1819 *
1820 * PUBLIC: void __rep_print __P((ENV *, const char *, ...))
1821 * PUBLIC:    __attribute__ ((__format__ (__printf__, 2, 3)));
1822 */
1823void
1824#ifdef STDC_HEADERS
1825__rep_print(ENV *env, const char *fmt, ...)
1826#else
1827__rep_print(env, fmt, va_alist)
1828	ENV *env;
1829	const char *fmt;
1830	va_dcl
1831#endif
1832{
1833	va_list ap;
1834	DB_MSGBUF mb;
1835	REP *rep;
1836	const char *s;
1837
1838	DB_MSGBUF_INIT(&mb);
1839
1840	s = NULL;
1841	if (env->dbenv->db_errpfx != NULL)
1842		s = env->dbenv->db_errpfx;
1843	else if (REP_ON(env)) {
1844		rep = env->rep_handle->region;
1845		if (F_ISSET(rep, REP_F_CLIENT))
1846			s = "CLIENT";
1847		else if (F_ISSET(rep, REP_F_MASTER))
1848			s = "MASTER";
1849	}
1850	if (s == NULL)
1851		s = "REP_UNDEF";
1852	__db_msgadd(env, &mb, "%s: ", s);
1853
1854#ifdef STDC_HEADERS
1855	va_start(ap, fmt);
1856#else
1857	va_start(ap);
1858#endif
1859	__db_msgadd_ap(env, &mb, fmt, ap);
1860	va_end(ap);
1861
1862	DB_MSGBUF_FLUSH(env, &mb);
1863}
1864
1865/*
1866 * PUBLIC: void __rep_print_message
1867 * PUBLIC:     __P((ENV *, int, __rep_control_args *, char *, u_int32_t));
1868 */
1869void
1870__rep_print_message(env, eid, rp, str, flags)
1871	ENV *env;
1872	int eid;
1873	__rep_control_args *rp;
1874	char *str;
1875	u_int32_t flags;
1876{
1877	u_int32_t ctlflags, rectype;
1878	char ftype[64], *type;
1879
1880	rectype = rp->rectype;
1881	ctlflags = rp->flags;
1882	if (rp->rep_version != DB_REPVERSION)
1883		rectype = __rep_msg_from_old(rp->rep_version, rectype);
1884	switch (rectype) {
1885	case REP_ALIVE:
1886		type = "alive";
1887		break;
1888	case REP_ALIVE_REQ:
1889		type = "alive_req";
1890		break;
1891	case REP_ALL_REQ:
1892		type = "all_req";
1893		break;
1894	case REP_BULK_LOG:
1895		type = "bulk_log";
1896		break;
1897	case REP_BULK_PAGE:
1898		type = "bulk_page";
1899		break;
1900	case REP_DUPMASTER:
1901		type = "dupmaster";
1902		break;
1903	case REP_FILE:
1904		type = "file";
1905		break;
1906	case REP_FILE_FAIL:
1907		type = "file_fail";
1908		break;
1909	case REP_FILE_REQ:
1910		type = "file_req";
1911		break;
1912	case REP_LEASE_GRANT:
1913		type = "lease_grant";
1914		break;
1915	case REP_LOG:
1916		type = "log";
1917		break;
1918	case REP_LOG_MORE:
1919		type = "log_more";
1920		break;
1921	case REP_LOG_REQ:
1922		type = "log_req";
1923		break;
1924	case REP_MASTER_REQ:
1925		type = "master_req";
1926		break;
1927	case REP_NEWCLIENT:
1928		type = "newclient";
1929		break;
1930	case REP_NEWFILE:
1931		type = "newfile";
1932		break;
1933	case REP_NEWMASTER:
1934		type = "newmaster";
1935		break;
1936	case REP_NEWSITE:
1937		type = "newsite";
1938		break;
1939	case REP_PAGE:
1940		type = "page";
1941		break;
1942	case REP_PAGE_FAIL:
1943		type = "page_fail";
1944		break;
1945	case REP_PAGE_MORE:
1946		type = "page_more";
1947		break;
1948	case REP_PAGE_REQ:
1949		type = "page_req";
1950		break;
1951	case REP_REREQUEST:
1952		type = "rerequest";
1953		break;
1954	case REP_START_SYNC:
1955		type = "start_sync";
1956		break;
1957	case REP_UPDATE:
1958		type = "update";
1959		break;
1960	case REP_UPDATE_REQ:
1961		type = "update_req";
1962		break;
1963	case REP_VERIFY:
1964		type = "verify";
1965		break;
1966	case REP_VERIFY_FAIL:
1967		type = "verify_fail";
1968		break;
1969	case REP_VERIFY_REQ:
1970		type = "verify_req";
1971		break;
1972	case REP_VOTE1:
1973		type = "vote1";
1974		break;
1975	case REP_VOTE2:
1976		type = "vote2";
1977		break;
1978	default:
1979		type = "NOTYPE";
1980		break;
1981	}
1982
1983	/*
1984	 * !!!
1985	 * If adding new flags to print out make sure the aggregate
1986	 * length cannot overflow the buffer.
1987	 */
1988	ftype[0] = '\0';
1989	if (LF_ISSET(DB_REP_ANYWHERE))
1990		(void)strcat(ftype, " any");		/* 4 */
1991	if (FLD_ISSET(ctlflags, REPCTL_FLUSH))
1992		(void)strcat(ftype, " flush");		/* 10 */
1993	/*
1994	 * We expect most of the time the messages will indicate
1995	 * group membership.  Only print if we're not already
1996	 * part of a group.
1997	 */
1998	if (!FLD_ISSET(ctlflags, REPCTL_GROUP_ESTD))
1999		(void)strcat(ftype, " nogroup");	/* 18 */
2000	if (FLD_ISSET(ctlflags, REPCTL_LEASE))
2001		(void)strcat(ftype, " lease");		/* 24 */
2002	if (LF_ISSET(DB_REP_NOBUFFER))
2003		(void)strcat(ftype, " nobuf");		/* 30 */
2004	if (LF_ISSET(DB_REP_PERMANENT))
2005		(void)strcat(ftype, " perm");		/* 35 */
2006	if (LF_ISSET(DB_REP_REREQUEST))
2007		(void)strcat(ftype, " rereq");		/* 41 */
2008	if (FLD_ISSET(ctlflags, REPCTL_RESEND))
2009		(void)strcat(ftype, " resend");		/* 48 */
2010	if (FLD_ISSET(ctlflags, REPCTL_LOG_END))
2011		(void)strcat(ftype, " logend");		/* 55 */
2012	RPRINT(env, DB_VERB_REP_MSGS,
2013	    (env,
2014    "%s %s: msgv = %lu logv %lu gen = %lu eid %d, type %s, LSN [%lu][%lu] %s",
2015	    env->db_home, str,
2016	    (u_long)rp->rep_version, (u_long)rp->log_version, (u_long)rp->gen,
2017	    eid, type, (u_long)rp->lsn.file, (u_long)rp->lsn.offset, ftype));
2018	/*
2019	 * Make sure the version is close, and not swapped
2020	 * here.  Check for current version,  +/- a little bit.
2021	 */
2022	DB_ASSERT(env, rp->rep_version <= DB_REPVERSION+10);
2023	DB_ASSERT(env, rp->log_version <= DB_LOGVERSION+10);
2024}
2025
2026/*
2027 * PUBLIC: void __rep_fire_event __P((ENV *, u_int32_t, void *));
2028 */
2029void
2030__rep_fire_event(env, event, info)
2031	ENV *env;
2032	u_int32_t event;
2033	void *info;
2034{
2035	int ret;
2036
2037	/*
2038	 * Give repmgr first crack at handling all replication-related events.
2039	 * If it can't (or chooses not to) handle the event fully, then pass it
2040	 * along to the application.
2041	 */
2042	ret = __repmgr_handle_event(env, event, info);
2043	DB_ASSERT(env, ret == 0 || ret == DB_EVENT_NOT_HANDLED);
2044
2045	if (ret == DB_EVENT_NOT_HANDLED)
2046		DB_EVENT(env, event, info);
2047}
2048