1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2001-2009 Oracle.  All rights reserved.
5 *
6 * $Id$
7 */
8
9#include "db_config.h"
10
11#include "db_int.h"
12#include "dbinc/db_page.h"
13#include "dbinc/db_am.h"
14#include "dbinc/lock.h"
15#include "dbinc/log.h"
16#include "dbinc/mp.h"
17#include "dbinc/txn.h"
18
/*
 * Prototypes for the file-local (static) helper routines.  __P() wraps
 * each parameter list.
 */
static int __rep_collect_txn __P((ENV *, DB_LSN *, LSN_COLLECTION *));
static int __rep_do_ckp __P((ENV *, DBT *, __rep_control_args *));
static int __rep_fire_newmaster __P((ENV *, u_int32_t, int));
static int __rep_fire_startupdone __P((ENV *, u_int32_t, int));
static int __rep_getnext __P((ENV *, DB_THREAD_INFO *));
static int __rep_lsn_cmp __P((const void *, const void *));
static int __rep_newfile __P((ENV *, __rep_control_args *, DBT *));
static int __rep_process_rec __P((ENV *, DB_THREAD_INFO *, __rep_control_args *,
    DBT *, db_timespec *, DB_LSN *));
static int __rep_remfirst __P((ENV *, DB_THREAD_INFO *, DBT *, DBT *));
static int __rep_skip_msg __P((ENV *, REP *, int, u_int32_t));
30
31/* Used to consistently designate which messages ought to be received where. */
32
/*
 * MASTER_ONLY --
 *	Guard for message types that only a master acts on.  When rep does
 *	not have REP_F_MASTER set, the message is logged, ret is set to 0
 *	(the message is ignored rather than reported as an error) and
 *	control jumps to the errlock label.
 *
 *	In addition to its two arguments, the expansion uses the env, eid
 *	and ret variables and the errlock label of the enclosing function.
 */
#define	MASTER_ONLY(rep, rp) do {					\
	if (!F_ISSET(rep, REP_F_MASTER)) {				\
		RPRINT(env, DB_VERB_REP_MSGS,				\
		    (env, "Master record received on client"));		\
		REP_PRINT_MESSAGE(env,					\
		    eid, rp, "rep_process_message", 0);			\
		/* Just skip/ignore it. */				\
		ret = 0;						\
		goto errlock;						\
	}								\
} while (0)
44
/*
 * CLIENT_ONLY --
 *	Guard for message types that only a client acts on.  When rep does
 *	not have REP_F_CLIENT set, ret is set to DB_REP_DUPMASTER and
 *	control jumps to the errlock label.  Before jumping, a REP_DUPMASTER
 *	message is broadcast unless leases are in use; with leases the macro
 *	only asserts that the lease check reports DB_REP_LEASE_EXPIRED.
 *
 *	In addition to its two arguments, the expansion uses the env, eid
 *	and ret variables and the errlock label of the enclosing function.
 */
#define	CLIENT_ONLY(rep, rp) do {					\
	if (!F_ISSET(rep, REP_F_CLIENT)) {				\
		RPRINT(env, DB_VERB_REP_MSGS,				\
		    (env, "Client record received on master"));		\
		/*							\
		 * Only broadcast DUPMASTER if leases are not		\
		 * in effect.  If I am an old master, using		\
		 * leases and I get a newer message, my leases		\
		 * had better all be expired.				\
		 */							\
		if (IS_USING_LEASES(env))				\
			DB_ASSERT(env,					\
			    __rep_lease_check(env, 0) ==		\
			    DB_REP_LEASE_EXPIRED);			\
		else {							\
			REP_PRINT_MESSAGE(env,				\
			    eid, rp, "rep_process_message", 0);		\
			(void)__rep_send_message(env, DB_EID_BROADCAST, \
			    REP_DUPMASTER, NULL, NULL, 0, 0);		\
		}							\
		ret = DB_REP_DUPMASTER;					\
		goto errlock;						\
	}								\
} while (0)
69
/*
 * CLIENT_REREQ --
 *	Post-processing for request messages that a client tried to service.
 *	If a client is attempting to service a request it does not have
 *	(ret == DB_NOTFOUND), call __rep_skip_msg to skip this message and
 *	force a rerequest to the sender; ret is replaced by that call's
 *	result.  We don't hold the mutex for the stats and may miscount.
 *
 *	The macro takes no arguments: it uses the env, rep, rp, eid and ret
 *	variables of the enclosing function.
 */
#define	CLIENT_REREQ do {						\
	if (F_ISSET(rep, REP_F_CLIENT)) {				\
		STAT(rep->stat.st_client_svc_req++);			\
		if (ret == DB_NOTFOUND) {				\
			STAT(rep->stat.st_client_svc_miss++);		\
			ret = __rep_skip_msg(env, rep, eid, rp->rectype);\
		}							\
	}								\
} while (0)
84
/*
 * MASTER_UPDATE --
 *	While holding the replication system lock, set DB_REGENV_REPLOCKED
 *	on renv and store the current time in renv->op_timestamp.
 */
#define	MASTER_UPDATE(env, renv) do {					\
	REP_SYSTEM_LOCK(env);						\
	F_SET((renv), DB_REGENV_REPLOCKED);				\
	(void)time(&(renv)->op_timestamp);				\
	REP_SYSTEM_UNLOCK(env);					\
} while (0)
91
/*
 * RECOVERING_SKIP --
 *	On a client with the recovering flag set, count the message as
 *	skipped, hand it to __rep_skip_msg (whose result becomes ret) and
 *	jump to the errlock label.
 *
 *	The macro takes no arguments: it uses the env, rep, rp, eid, ret and
 *	recovering variables and the errlock label of the enclosing function.
 */
#define	RECOVERING_SKIP do {						\
	if (IS_REP_CLIENT(env) && recovering) {			\
		/* Not holding region mutex, may miscount */		\
		STAT(rep->stat.st_msgs_recover++);			\
		ret = __rep_skip_msg(env, rep, eid, rp->rectype);	\
		goto errlock;						\
	}								\
} while (0)
100
/*
 * RECOVERING_LOG_SKIP --
 *	If we're recovering the log we only want log records that are in the
 *	range we need to recover.  Otherwise we can end up storing a huge
 *	number of "new" records, only to truncate the temp database later
 *	after we run recovery.  If we are actively delaying a sync-up, we
 *	also skip all incoming log records until the application requests
 *	sync-up.
 *
 *	Concretely, the message is skipped (via __rep_skip_msg, whose result
 *	becomes ret, followed by a jump to errlock) when any of these holds:
 *	  - REP_F_DELAY is set on rep;
 *	  - rep->master_id is DB_EID_INVALID;
 *	  - recovering is set and either REP_F_RECOVER_LOG is not set or the
 *	    message LSN compares greater than rep->last_lsn.
 *
 *	The macro takes no arguments: it uses the env, rep, rp, eid, ret and
 *	recovering variables and the errlock label of the enclosing function.
 */
#define	RECOVERING_LOG_SKIP do {					\
	if (F_ISSET(rep, REP_F_DELAY) ||				\
	    rep->master_id == DB_EID_INVALID ||				\
	    (recovering &&						\
	    (!F_ISSET(rep, REP_F_RECOVER_LOG) ||			\
	     LOG_COMPARE(&rp->lsn, &rep->last_lsn) > 0))) {		\
		/* Not holding region mutex, may miscount */		\
		STAT(rep->stat.st_msgs_recover++);			\
		ret = __rep_skip_msg(env, rep, eid, rp->rectype);	\
		goto errlock;						\
	}								\
} while (0)
120
/*
 * ANYSITE --
 *	Expands to nothing.  It sits where MASTER_ONLY/CLIENT_ONLY would go
 *	to mark, in the source, message types that need no role check.
 */
#define	ANYSITE(rep)
122
123/*
124 * __rep_process_message_pp --
125 *
126 * This routine takes an incoming message and processes it.
127 *
128 * control: contains the control fields from the record
129 * rec: contains the actual record
130 * eid: the environment id of the sender of the message;
131 * ret_lsnp: On DB_REP_ISPERM and DB_REP_NOTPERM returns, contains the
132 *	lsn of the maximum permanent or current not permanent log record
133 *	(respectively).
134 *
135 * PUBLIC: int __rep_process_message_pp
136 * PUBLIC:      __P((DB_ENV *, DBT *, DBT *, int, DB_LSN *));
137 */
138int
139__rep_process_message_pp(dbenv, control, rec, eid, ret_lsnp)
140	DB_ENV *dbenv;
141	DBT *control, *rec;
142	int eid;
143	DB_LSN *ret_lsnp;
144{
145	ENV *env;
146	int ret;
147
148	env = dbenv->env;
149	ret = 0;
150
151	ENV_REQUIRES_CONFIG_XX(
152	    env, rep_handle, "DB_ENV->rep_process_message", DB_INIT_REP);
153
154	if (APP_IS_REPMGR(env)) {
155		__db_errx(env, "%s %s", "DB_ENV->rep_process_message:",
156		    "cannot call from Replication Manager application");
157		return (EINVAL);
158	}
159
160	/* Control argument must be non-Null. */
161	if (control == NULL || control->size == 0) {
162		__db_errx(env,
163	"DB_ENV->rep_process_message: control argument must be specified");
164		return (EINVAL);
165	}
166
167	/*
168	 * Make sure site is a master or a client, which implies that
169	 * replication has been started.
170	 */
171	if (!IS_REP_MASTER(env) && !IS_REP_CLIENT(env)) {
172		__db_errx(env,
173	"Environment not configured as replication master or client");
174		return (EINVAL);
175	}
176
177	if ((ret = __dbt_usercopy(env, control)) != 0 ||
178	    (ret = __dbt_usercopy(env, rec)) != 0) {
179		__dbt_userfree(env, control, rec, NULL);
180		__db_errx(env,
181	"DB_ENV->rep_process_message: error retrieving DBT contents");
182		return ret;
183	}
184
185	ret = __rep_process_message_int(env, control, rec, eid, ret_lsnp);
186
187	return (ret);
188}
189
190/*
191 * __rep_process_message_int --
192 *
193 * This routine performs the internal steps to process an incoming message.
194 *
195 * PUBLIC: int __rep_process_message_int
196 * PUBLIC:     __P((ENV *, DBT *, DBT *, int, DB_LSN *));
197 */
198int
199__rep_process_message_int(env, control, rec, eid, ret_lsnp)
200	ENV *env;
201	DBT *control, *rec;
202	int eid;
203	DB_LSN *ret_lsnp;
204{
205	DBT data_dbt;
206	DB_LOG *dblp;
207	DB_LSN last_lsn, lsn;
208	DB_REP *db_rep;
209	DB_THREAD_INFO *ip;
210	LOG *lp;
211	REGENV *renv;
212	REGINFO *infop;
213	REP *rep;
214	REP_46_CONTROL *rp46;
215	REP_OLD_CONTROL *orp;
216	__rep_control_args *rp, tmprp;
217	__rep_egen_args egen_arg;
218	size_t len;
219	u_int32_t gen, rep_version;
220	int cmp, do_sync, lockout, recovering, ret, t_ret;
221	time_t savetime;
222	u_int8_t buf[__REP_MAXMSG_SIZE];
223
224	ret = 0;
225	do_sync = 0;
226	lockout = 0;
227	db_rep = env->rep_handle;
228	rep = db_rep->region;
229	dblp = env->lg_handle;
230	lp = dblp->reginfo.primary;
231	infop = env->reginfo;
232	renv = infop->primary;
233	/*
234	 * Casting this to REP_OLD_CONTROL is just kind of stylistic: the
235	 * rep_version field of course has to be in the same offset in all
236	 * versions in order for this to work.
237	 *
238	 * We can look at the rep_version unswapped here because if we're
239	 * talking to an old version, it will always be unswapped.  If
240	 * we're talking to a new version, the only issue is if it is
241	 * swapped and we take one of the old version conditionals
242	 * incorrectly.  The rep_version would need to be very, very
243	 * large for a swapped version to look like a small, older
244	 * version.  There is no problem here looking at it unswapped.
245	 */
246	rep_version = ((REP_OLD_CONTROL *)control->data)->rep_version;
247	if (rep_version <= DB_REPVERSION_45) {
248		orp = (REP_OLD_CONTROL *)control->data;
249		if (rep_version == DB_REPVERSION_45 &&
250		    F_ISSET(orp, REPCTL_INIT_45)) {
251			F_CLR(orp, REPCTL_INIT_45);
252			F_SET(orp, REPCTL_INIT);
253		}
254		tmprp.rep_version = orp->rep_version;
255		tmprp.log_version = orp->log_version;
256		tmprp.lsn = orp->lsn;
257		tmprp.rectype = orp->rectype;
258		tmprp.gen = orp->gen;
259		tmprp.flags = orp->flags;
260		tmprp.msg_sec = 0;
261		tmprp.msg_nsec = 0;
262	} else if (rep_version == DB_REPVERSION_46) {
263		rp46 = (REP_46_CONTROL *)control->data;
264		tmprp.rep_version = rp46->rep_version;
265		tmprp.log_version = rp46->log_version;
266		tmprp.lsn = rp46->lsn;
267		tmprp.rectype = rp46->rectype;
268		tmprp.gen = rp46->gen;
269		tmprp.flags = rp46->flags;
270		tmprp.msg_sec = (u_int32_t)rp46->msg_time.tv_sec;
271		tmprp.msg_nsec = (u_int32_t)rp46->msg_time.tv_nsec;
272	} else
273		if ((ret = __rep_control_unmarshal(env, &tmprp,
274		    control->data, control->size, NULL)) != 0)
275			return (ret);
276	rp = &tmprp;
277	if (ret_lsnp != NULL)
278		ZERO_LSN(*ret_lsnp);
279
280	ENV_ENTER(env, ip);
281
282	REP_PRINT_MESSAGE(env, eid, rp, "rep_process_message", 0);
283	/*
284	 * Check the version number for both rep and log.  If it is
285	 * an old version we support, convert it.  Otherwise complain.
286	 */
287	if (rp->rep_version < DB_REPVERSION) {
288		if (rp->rep_version < DB_REPVERSION_MIN) {
289			__db_errx(env,
290 "unsupported old replication message version %lu, minimum version %d",
291			    (u_long)rp->rep_version, DB_REPVERSION_MIN);
292			ret = EINVAL;
293			goto errlock;
294		}
295		RPRINT(env, DB_VERB_REP_MSGS, (env,
296		    "Received record %lu with old rep version %lu",
297		    (u_long)rp->rectype, (u_long)rp->rep_version));
298		rp->rectype = __rep_msg_from_old(rp->rep_version, rp->rectype);
299		DB_ASSERT(env, rp->rectype != REP_INVALID);
300		/*
301		 * We should have a valid new record type for all the old
302		 * versions.
303		 */
304		RPRINT(env, DB_VERB_REP_MSGS, (env,
305		    "Converted to record %lu with old rep version %lu",
306		    (u_long)rp->rectype, (u_long)rp->rep_version));
307	} else if (rp->rep_version > DB_REPVERSION) {
308		__db_errx(env,
309		    "unexpected replication message version %lu, expected %d",
310		    (u_long)rp->rep_version, DB_REPVERSION);
311		ret = EINVAL;
312		goto errlock;
313	}
314
315	if (rp->log_version < DB_LOGVERSION) {
316		if (rp->log_version < DB_LOGVERSION_MIN) {
317			__db_errx(env,
318 "unsupported old replication log version %lu, minimum version %d",
319			    (u_long)rp->log_version, DB_LOGVERSION_MIN);
320			ret = EINVAL;
321			goto errlock;
322		}
323		RPRINT(env, DB_VERB_REP_MSGS, (env,
324		    "Received record %lu with old log version %lu",
325		    (u_long)rp->rectype, (u_long)rp->log_version));
326	} else if (rp->log_version > DB_LOGVERSION) {
327		__db_errx(env,
328		    "unexpected log record version %lu, expected %d",
329		    (u_long)rp->log_version, DB_LOGVERSION);
330		ret = EINVAL;
331		goto errlock;
332	}
333
334	/*
335	 * Acquire the replication lock.
336	 */
337	REP_SYSTEM_LOCK(env);
338	if (F_ISSET(rep, REP_F_READY_MSG)) {
339		/*
340		 * If we're racing with a thread in rep_start, then
341		 * just ignore the message and return.
342		 */
343		RPRINT(env, DB_VERB_REP_MSGS, (env,
344		    "Racing replication msg lockout, ignore message."));
345		if (F_ISSET(rp, REPCTL_PERM))
346			ret = DB_REP_IGNORE;
347		REP_SYSTEM_UNLOCK(env);
348		/*
349		 * If another client has sent a c2c request to us, it may be a
350		 * long time before it resends the request (due to its dual data
351		 * streams avoidance heuristic); let it know we can't serve the
352		 * request just now.
353		 */
354		if (F_ISSET(rep, REP_F_CLIENT) && REP_MSG_REQ(rp->rectype)) {
355			STAT(rep->stat.st_client_svc_req++);
356			STAT(rep->stat.st_client_svc_miss++);
357			(void)__rep_send_message(env,
358			    eid, REP_REREQUEST, NULL, NULL, 0, 0);
359		}
360		goto out;
361	}
362	rep->msg_th++;
363	gen = rep->gen;
364	recovering = F_ISSET(rep, REP_F_RECOVER_MASK);
365	savetime = renv->rep_timestamp;
366
367	STAT(rep->stat.st_msgs_processed++);
368	REP_SYSTEM_UNLOCK(env);
369
370	/*
371	 * Check for lease configuration matching.  Leases must be
372	 * configured all or none.  If I am a client and I receive a
373	 * message requesting a lease, and I'm not using leases, that
374	 * is an error.
375	 */
376	if (!IS_USING_LEASES(env) &&
377	    (F_ISSET(rp, REPCTL_LEASE) || rp->rectype == REP_LEASE_GRANT)) {
378		__db_errx(env,
379		    "Inconsistent lease configuration");
380		RPRINT(env, DB_VERB_REP_MSGS, (env,
381		    "Client received lease message and not using leases"));
382		ret = EINVAL;
383		ret = __env_panic(env, ret);
384		goto errlock;
385	}
386
387	/*
388	 * Check for generation number matching.  Ignore any old messages
389	 * except requests that are indicative of a new client that needs
390	 * to get in sync.
391	 */
392	if (rp->gen < gen && rp->rectype != REP_ALIVE_REQ &&
393	    rp->rectype != REP_NEWCLIENT && rp->rectype != REP_MASTER_REQ &&
394	    rp->rectype != REP_DUPMASTER && rp->rectype != REP_VOTE1) {
395		/*
396		 * We don't hold the rep mutex, and could miscount if we race.
397		 */
398		STAT(rep->stat.st_msgs_badgen++);
399		if (F_ISSET(rp, REPCTL_PERM))
400			ret = DB_REP_IGNORE;
401		goto errlock;
402	}
403
404	if (rp->gen > gen) {
405		/*
406		 * If I am a master and am out of date with a lower generation
407		 * number, I am in bad shape and should downgrade.
408		 */
409		if (F_ISSET(rep, REP_F_MASTER)) {
410			STAT(rep->stat.st_dupmasters++);
411			ret = DB_REP_DUPMASTER;
412			/*
413			 * Only broadcast DUPMASTER if leases are not
414			 * in effect.  If I am an old master, using
415			 * leases and I get a newer message, my leases
416			 * had better all be expired.
417			 */
418			if (IS_USING_LEASES(env))
419				DB_ASSERT(env,
420				    __rep_lease_check(env, 0) ==
421				    DB_REP_LEASE_EXPIRED);
422			else if (rp->rectype != REP_DUPMASTER)
423				(void)__rep_send_message(env,
424				    DB_EID_BROADCAST, REP_DUPMASTER,
425				    NULL, NULL, 0, 0);
426			goto errlock;
427		}
428
429		/*
430		 * I am a client and am out of date.  If this is an election,
431		 * or a response from the first site I contacted, then I can
432		 * accept the generation number and participate in future
433		 * elections and communication. Otherwise, I need to hear about
434		 * a new master and sync up.
435		 */
436		if (rp->rectype == REP_ALIVE ||
437		    rp->rectype == REP_VOTE1 || rp->rectype == REP_VOTE2) {
438			REP_SYSTEM_LOCK(env);
439			RPRINT(env, DB_VERB_REP_MSGS, (env,
440			    "Updating gen from %lu to %lu",
441			    (u_long)gen, (u_long)rp->gen));
442			rep->master_id = DB_EID_INVALID;
443			gen = rep->gen = rp->gen;
444			/*
445			 * Updating of egen will happen when we process the
446			 * message below for each message type.
447			 */
448			REP_SYSTEM_UNLOCK(env);
449			if (rp->rectype == REP_ALIVE)
450				(void)__rep_send_message(env,
451				    DB_EID_BROADCAST, REP_MASTER_REQ, NULL,
452				    NULL, 0, 0);
453		} else if (rp->rectype != REP_NEWMASTER) {
454			/*
455			 * Ignore this message, retransmit if needed.
456			 */
457			if (__rep_check_doreq(env, rep))
458				(void)__rep_send_message(env,
459				    DB_EID_BROADCAST, REP_MASTER_REQ,
460				    NULL, NULL, 0, 0);
461			goto errlock;
462		}
463		/*
464		 * If you get here, then you're a client and either you're
465		 * in an election or you have a NEWMASTER or an ALIVE message
466		 * whose processing will do the right thing below.
467		 */
468	}
469
470	/*
471	 * If the sender is part of an established group, so are we now.
472	 */
473	if (F_ISSET(rp, REPCTL_GROUP_ESTD)) {
474		REP_SYSTEM_LOCK(env);
475#ifdef	DIAGNOSTIC
476		if (!F_ISSET(rep, REP_F_GROUP_ESTD))
477			RPRINT(env, DB_VERB_REP_MSGS, (env,
478			    "I am now part of an established group"));
479#endif
480		F_SET(rep, REP_F_GROUP_ESTD);
481		REP_SYSTEM_UNLOCK(env);
482	}
483
484	/*
485	 * We need to check if we're in recovery and if we are
486	 * then we need to ignore any messages except VERIFY*, VOTE*,
487	 * NEW* and ALIVE_REQ, or backup related messages: UPDATE*,
488	 * PAGE* and FILE*.  We need to also accept LOG messages
489	 * if we're copying the log for recovery/backup.
490	 */
491	switch (rp->rectype) {
492	case REP_ALIVE:
493		/*
494		 * Handle even if we're recovering.
495		 */
496		ANYSITE(rep);
497		if (rp->rep_version < DB_REPVERSION_47)
498			egen_arg.egen = *(u_int32_t *)rec->data;
499		else if ((ret = __rep_egen_unmarshal(env, &egen_arg,
500		    rec->data, rec->size, NULL)) != 0)
501			return (ret);
502		REP_SYSTEM_LOCK(env);
503		RPRINT(env, DB_VERB_REP_MSGS, (env,
504		    "Received ALIVE egen of %lu, mine %lu",
505		    (u_long)egen_arg.egen, (u_long)rep->egen));
506		if (egen_arg.egen > rep->egen) {
507			/*
508			 * We're changing egen, need to clear out any old
509			 * election information.  We need to set the
510			 * REP_F_EGENUPDATE flag here so that any thread
511			 * waiting in rep_elect/rep_wait can distinguish
512			 * this situation (and restart its election) from
513			 * a current master saying it is still master and
514			 * the egen getting incremented on that path.
515			 */
516			__rep_elect_done(env, rep, 0);
517			rep->egen = egen_arg.egen;
518			F_SET(rep, REP_F_EGENUPDATE);
519		}
520		REP_SYSTEM_UNLOCK(env);
521		break;
522	case REP_ALIVE_REQ:
523		/*
524		 * Handle even if we're recovering.
525		 */
526		ANYSITE(rep);
527		LOG_SYSTEM_LOCK(env);
528		lsn = lp->lsn;
529		LOG_SYSTEM_UNLOCK(env);
530#ifdef	CONFIG_TEST
531		/*
532		 * Send this first, before the ALIVE message because of the
533		 * way the test suite and messaging is done sequentially.
534		 * In some sequences it is possible to get into a situation
535		 * where the test suite cannot get the later NEWMASTER because
536		 * we break out of the messaging loop too early.
537		 */
538		if (F_ISSET(rep, REP_F_MASTER))
539			(void)__rep_send_message(env,
540			    DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0, 0);
541#endif
542		REP_SYSTEM_LOCK(env);
543		egen_arg.egen = rep->egen;
544		REP_SYSTEM_UNLOCK(env);
545		if (rep->version < DB_REPVERSION_47)
546			DB_INIT_DBT(data_dbt, &egen_arg.egen,
547			    sizeof(egen_arg.egen));
548		else {
549			if ((ret = __rep_egen_marshal(env,
550			    &egen_arg, buf, __REP_EGEN_SIZE, &len)) != 0)
551				goto errlock;
552			DB_INIT_DBT(data_dbt, buf, len);
553		}
554		(void)__rep_send_message(env,
555		    eid, REP_ALIVE, &lsn, &data_dbt, 0, 0);
556		break;
557	case REP_ALL_REQ:
558		RECOVERING_SKIP;
559		ret = __rep_allreq(env, rp, eid);
560		CLIENT_REREQ;
561		break;
562	case REP_BULK_LOG:
563		RECOVERING_LOG_SKIP;
564		CLIENT_ONLY(rep, rp);
565		ret = __rep_bulk_log(env, ip, rp, rec, savetime, ret_lsnp);
566		break;
567	case REP_BULK_PAGE:
568		/*
569		 * Handle even if we're recovering.
570		 */
571		CLIENT_ONLY(rep, rp);
572		ret = __rep_bulk_page(env, ip, eid, rp, rec);
573		break;
574	case REP_DUPMASTER:
575		/*
576		 * Handle even if we're recovering.
577		 */
578		if (F_ISSET(rep, REP_F_MASTER))
579			ret = DB_REP_DUPMASTER;
580		break;
581#ifdef NOTYET
582	case REP_FILE: /* TODO */
583		CLIENT_ONLY(rep, rp);
584		break;
585	case REP_FILE_REQ:
586		ret = __rep_send_file(env, rec, eid);
587		break;
588#endif
589	case REP_FILE_FAIL:
590		/*
591		 * Handle even if we're recovering.
592		 */
593		CLIENT_ONLY(rep, rp);
594		/*
595		 * Clean up any internal init that was in progress.
596		 */
597		if (eid == rep->master_id) {
598			REP_SYSTEM_LOCK(env);
599			/*
600			 * If we're already locking out messages, give up.
601			 */
602			if (F_ISSET(rep, REP_F_READY_MSG))
603				goto errhlk;
604			/*
605			 * Lock out other messages to prevent race
606			 * conditions.
607			 */
608			if ((ret =
609			    __rep_lockout_msg(env, rep, 1)) != 0) {
610				goto errhlk;
611			}
612			lockout = 1;
613			/*
614			 * Need mtx_clientdb to safely clean up
615			 * page database in __rep_init_cleanup().
616			 */
617			REP_SYSTEM_UNLOCK(env);
618			MUTEX_LOCK(env, rep->mtx_clientdb);
619			REP_SYSTEM_LOCK(env);
620			/*
621			 * Clean up internal init if one was in progress.
622			 */
623			if (F_ISSET(rep, REP_F_READY_API | REP_F_READY_OP)) {
624				RPRINT(env, DB_VERB_REP_MSGS, (env,
625    "FILE_FAIL is cleaning up old internal init"));
626#ifdef	CONFIG_TEST
627				STAT(rep->stat.st_filefail_cleanups++);
628#endif
629				ret = __rep_init_cleanup(env, rep, DB_FORCE);
630				F_CLR(rep,
631				    REP_F_ABBREVIATED | REP_F_RECOVER_MASK);
632			}
633			MUTEX_UNLOCK(env, rep->mtx_clientdb);
634			if (ret != 0) {
635				RPRINT(env, DB_VERB_REP_MSGS, (env,
636    "FILE_FAIL error cleaning up internal init: %d", ret));
637				goto errhlk;
638			}
639			F_CLR(rep, REP_F_READY_MSG);
640			lockout = 0;
641			/*
642			 * Restart internal init, setting UPDATE flag and
643			 * zeroing applicable LSNs.
644			 */
645			F_SET(rep, REP_F_RECOVER_UPDATE);
646			ZERO_LSN(rep->first_lsn);
647			ZERO_LSN(rep->ckp_lsn);
648			REP_SYSTEM_UNLOCK(env);
649			(void)__rep_send_message(env, eid, REP_UPDATE_REQ,
650			    NULL, NULL, 0, 0);
651		}
652		break;
653	case REP_LEASE_GRANT:
654		/*
655		 * Handle even if we're recovering.
656		 */
657		MASTER_ONLY(rep, rp);
658		ret = __rep_lease_grant(env, rp, rec, eid);
659		break;
660	case REP_LOG:
661	case REP_LOG_MORE:
662		RECOVERING_LOG_SKIP;
663		CLIENT_ONLY(rep, rp);
664		ret = __rep_log(env, ip, rp, rec, savetime, ret_lsnp);
665		break;
666	case REP_LOG_REQ:
667		RECOVERING_SKIP;
668		if (F_ISSET(rp, REPCTL_INIT))
669			MASTER_UPDATE(env, renv);
670		ret = __rep_logreq(env, rp, rec, eid);
671		CLIENT_REREQ;
672		break;
673	case REP_NEWSITE:
674		/*
675		 * Handle even if we're recovering.
676		 */
677		/* We don't hold the rep mutex, and may miscount. */
678		STAT(rep->stat.st_newsites++);
679
680		/* This is a rebroadcast; simply tell the application. */
681		if (F_ISSET(rep, REP_F_MASTER)) {
682			dblp = env->lg_handle;
683			lp = dblp->reginfo.primary;
684			LOG_SYSTEM_LOCK(env);
685			lsn = lp->lsn;
686			LOG_SYSTEM_UNLOCK(env);
687			(void)__rep_send_message(env,
688			    eid, REP_NEWMASTER, &lsn, NULL, 0, 0);
689		}
690		ret = DB_REP_NEWSITE;
691		break;
692	case REP_NEWCLIENT:
693		/*
694		 * Handle even if we're recovering.
695		 */
696		/*
697		 * This message was received and should have resulted in the
698		 * application entering the machine ID in its machine table.
699		 * We respond to this with an ALIVE to send relevant information
700		 * to the new client (if we are a master, we'll send a
701		 * NEWMASTER, so we only need to send the ALIVE if we're a
702		 * client).  But first, broadcast the new client's record to
703		 * all the clients.
704		 */
705		(void)__rep_send_message(env,
706		    DB_EID_BROADCAST, REP_NEWSITE, &rp->lsn, rec, 0, 0);
707
708		ret = DB_REP_NEWSITE;
709
710		if (F_ISSET(rep, REP_F_CLIENT)) {
711			REP_SYSTEM_LOCK(env);
712			egen_arg.egen = rep->egen;
713
714			/*
715			 * Clean up any previous master remnants by making
716			 * master_id invalid and cleaning up any internal
717			 * init that was in progress.
718			 */
719			if (eid == rep->master_id) {
720				rep->master_id = DB_EID_INVALID;
721
722				/*
723				 * Already locking out messages, must be
724				 * in sync-up recover or internal init,
725				 * give up.
726				 */
727				if (F_ISSET(rep, REP_F_READY_MSG))
728					goto errhlk;
729
730				/*
731				 * Lock out other messages to prevent race
732				 * conditions.
733				 */
734				if ((t_ret =
735				    __rep_lockout_msg(env, rep, 1)) != 0) {
736					ret = t_ret;
737					goto errhlk;
738				}
739				lockout = 1;
740
741				/*
742				 * Need mtx_clientdb to safely clean up
743				 * page database in __rep_init_cleanup().
744				 */
745				REP_SYSTEM_UNLOCK(env);
746				MUTEX_LOCK(env, rep->mtx_clientdb);
747				REP_SYSTEM_LOCK(env);
748
749				/*
750				 * Clean up internal init if one was in
751				 * progress.
752				 */
753				if (F_ISSET(rep, REP_F_READY_API |
754				    REP_F_READY_OP)) {
755					RPRINT(env, DB_VERB_REP_MSGS, (env,
756    "NEWCLIENT is cleaning up old internal init for invalid master"));
757					t_ret = __rep_init_cleanup(env,
758					    rep, DB_FORCE);
759					F_CLR(rep, REP_F_ABBREVIATED |
760					    REP_F_RECOVER_MASK);
761				}
762				MUTEX_UNLOCK(env, rep->mtx_clientdb);
763				if (t_ret != 0) {
764					ret = t_ret;
765					RPRINT(env, DB_VERB_REP_MSGS, (env,
766    "NEWCLIENT error cleaning up internal init for invalid master: %d", ret));
767					goto errhlk;
768				}
769				F_CLR(rep, REP_F_READY_MSG);
770				lockout = 0;
771			}
772			REP_SYSTEM_UNLOCK(env);
773			if (rep->version < DB_REPVERSION_47)
774				DB_INIT_DBT(data_dbt, &egen_arg.egen,
775				    sizeof(egen_arg.egen));
776			else {
777				if ((ret = __rep_egen_marshal(env, &egen_arg,
778				    buf, __REP_EGEN_SIZE, &len)) != 0)
779					goto errlock;
780				DB_INIT_DBT(data_dbt, buf, len);
781			}
782			(void)__rep_send_message(env, DB_EID_BROADCAST,
783			    REP_ALIVE, &rp->lsn, &data_dbt, 0, 0);
784			break;
785		}
786		/* FALLTHROUGH */
787	case REP_MASTER_REQ:
788		RECOVERING_SKIP;
789		if (F_ISSET(rep, REP_F_MASTER)) {
790			LOG_SYSTEM_LOCK(env);
791			lsn = lp->lsn;
792			LOG_SYSTEM_UNLOCK(env);
793			(void)__rep_send_message(env,
794			    DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0, 0);
795			if (IS_USING_LEASES(env))
796				(void)__rep_lease_refresh(env);
797		}
798		/*
799		 * If there is no master, then we could get into a state
800		 * where an old client lost the initial ALIVE message and
801		 * is calling an election under an old gen and can
802		 * never get to the current gen.
803		 */
804		if (F_ISSET(rep, REP_F_CLIENT) && rp->gen < gen) {
805			REP_SYSTEM_LOCK(env);
806			egen_arg.egen = rep->egen;
807			if (eid == rep->master_id)
808				rep->master_id = DB_EID_INVALID;
809			REP_SYSTEM_UNLOCK(env);
810			if (rep->version < DB_REPVERSION_47)
811				DB_INIT_DBT(data_dbt, &egen_arg.egen,
812				    sizeof(egen_arg.egen));
813			else {
814				if ((ret = __rep_egen_marshal(env, &egen_arg,
815				    buf, __REP_EGEN_SIZE, &len)) != 0)
816					goto errlock;
817				DB_INIT_DBT(data_dbt, buf, len);
818			}
819			(void)__rep_send_message(env, eid,
820			    REP_ALIVE, &rp->lsn, &data_dbt, 0, 0);
821		}
822		break;
823	case REP_NEWFILE:
824		RECOVERING_LOG_SKIP;
825		CLIENT_ONLY(rep, rp);
826		ret = __rep_apply(env,
827		     ip, rp, rec, ret_lsnp, NULL, &last_lsn);
828		if (ret == DB_REP_LOGREADY)
829			ret = __rep_logready(env, rep, savetime, &last_lsn);
830		break;
831	case REP_NEWMASTER:
832		/*
833		 * Handle even if we're recovering.
834		 */
835		ANYSITE(rep);
836		if (F_ISSET(rep, REP_F_MASTER) &&
837		    eid != rep->eid) {
838			/* We don't hold the rep mutex, and may miscount. */
839			STAT(rep->stat.st_dupmasters++);
840			ret = DB_REP_DUPMASTER;
841			if (IS_USING_LEASES(env))
842				DB_ASSERT(env,
843				    __rep_lease_check(env, 0) ==
844				    DB_REP_LEASE_EXPIRED);
845			else
846				(void)__rep_send_message(env,
847				    DB_EID_BROADCAST, REP_DUPMASTER,
848				    NULL, NULL, 0, 0);
849			break;
850		}
851		if ((ret =
852		    __rep_new_master(env, rp, eid)) == DB_REP_NEWMASTER)
853			ret = __rep_fire_newmaster(env, rp->gen, eid);
854		break;
855	case REP_PAGE:
856	case REP_PAGE_MORE:
857		/*
858		 * Handle even if we're recovering.
859		 */
860		CLIENT_ONLY(rep, rp);
861		ret = __rep_page(env, ip, eid, rp, rec);
862		if (ret == DB_REP_PAGEDONE)
863			ret = 0;
864		break;
865	case REP_PAGE_FAIL:
866		/*
867		 * Handle even if we're recovering.
868		 */
869		CLIENT_ONLY(rep, rp);
870		ret = __rep_page_fail(env, ip, eid, rp, rec);
871		break;
872	case REP_PAGE_REQ:
873		RECOVERING_SKIP;
874		MASTER_UPDATE(env, renv);
875		ret = __rep_page_req(env, ip, eid, rp, rec);
876		CLIENT_REREQ;
877		break;
878	case REP_REREQUEST:
879		/*
880		 * Handle even if we're recovering.  Don't do a master
881		 * check.
882		 */
883		CLIENT_ONLY(rep, rp);
884		/*
885		 * Don't hold any mutex, may miscount.
886		 */
887		STAT(rep->stat.st_client_rerequests++);
888		ret = __rep_resend_req(env, 1);
889		break;
890	case REP_START_SYNC:
891		RECOVERING_SKIP;
892		MUTEX_LOCK(env, rep->mtx_clientdb);
893		cmp = LOG_COMPARE(&rp->lsn, &lp->ready_lsn);
894		/*
895		 * The comparison needs to be <= because the LSN in
896		 * the message can be the LSN of the first outstanding
897		 * txn, which may be the LSN immediately after the
898		 * previous commit.  The ready_lsn is the LSN of the
899		 * next record expected.  In that case, the LSNs
900		 * could be equal and the client has the commit and
901		 * wants to sync. [SR #15338]
902		 */
903		if (cmp <= 0) {
904			MUTEX_UNLOCK(env, rep->mtx_clientdb);
905			do_sync = 1;
906		} else {
907			STAT(rep->stat.st_startsync_delayed++);
908			/*
909			 * There are cases where keeping the first ckp_lsn
910			 * LSN is advantageous and cases where keeping
911			 * a later LSN is better.  If random, earlier
912			 * log records are missing, keeping the later
913			 * LSN seems to be better.  That is what we'll
914			 * do for now.
915			 */
916			if (LOG_COMPARE(&rp->lsn, &rep->ckp_lsn) > 0)
917				rep->ckp_lsn = rp->lsn;
918			RPRINT(env, DB_VERB_REP_MSGS, (env,
919    "Delayed START_SYNC memp_sync due to missing records."));
920			RPRINT(env, DB_VERB_REP_MSGS, (env,
921    "ready LSN [%lu][%lu], ckp_lsn [%lu][%lu]",
922		    (u_long)lp->ready_lsn.file, (u_long)lp->ready_lsn.offset,
923		    (u_long)rep->ckp_lsn.file, (u_long)rep->ckp_lsn.offset));
924			MUTEX_UNLOCK(env, rep->mtx_clientdb);
925		}
926		break;
927	case REP_UPDATE:
928		/*
929		 * Handle even if we're recovering.
930		 */
931		CLIENT_ONLY(rep, rp);
932		ret = __rep_update_setup(env, eid, rp, rec, savetime);
933		break;
934	case REP_UPDATE_REQ:
935		/*
936		 * Handle even if we're recovering.
937		 */
938		MASTER_ONLY(rep, rp);
939		infop = env->reginfo;
940		renv = infop->primary;
941		MASTER_UPDATE(env, renv);
942		ret = __rep_update_req(env, rp, eid);
943		break;
944	case REP_VERIFY:
945		if (recovering) {
946			MUTEX_LOCK(env, rep->mtx_clientdb);
947			cmp = LOG_COMPARE(&lp->verify_lsn, &rp->lsn);
948			MUTEX_UNLOCK(env, rep->mtx_clientdb);
949			/*
950			 * If this is not the verify record I want, skip it.
951			 */
952			if (cmp != 0) {
953				ret = __rep_skip_msg(
954				    env, rep, eid, rp->rectype);
955				break;
956			}
957		}
958		CLIENT_ONLY(rep, rp);
959		ret = __rep_verify(env, rp, rec, eid, savetime);
960		break;
961	case REP_VERIFY_FAIL:
962		/*
963		 * Handle even if we're recovering.
964		 */
965		CLIENT_ONLY(rep, rp);
966		ret = __rep_verify_fail(env, rp);
967		break;
968	case REP_VERIFY_REQ:
969		RECOVERING_SKIP;
970		ret = __rep_verify_req(env, rp, eid);
971		CLIENT_REREQ;
972		break;
973	case REP_VOTE1:
974		/*
975		 * Handle even if we're recovering.
976		 */
977		ret = __rep_vote1(env, rp, rec, eid);
978		break;
979	case REP_VOTE2:
980		/*
981		 * Handle even if we're recovering.
982		 */
983		ret = __rep_vote2(env, rp, rec, eid);
984		break;
985	default:
986		__db_errx(env,
987	"DB_ENV->rep_process_message: unknown replication message: type %lu",
988		   (u_long)rp->rectype);
989		ret = EINVAL;
990		break;
991	}
992
993errlock:
994	REP_SYSTEM_LOCK(env);
995errhlk:	if (lockout)
996		F_CLR(rep, REP_F_READY_MSG);
997	rep->msg_th--;
998	REP_SYSTEM_UNLOCK(env);
999	if (do_sync) {
1000		MUTEX_LOCK(env, rep->mtx_ckp);
1001		lsn = rp->lsn;
1002		/*
1003		 * This is the REP_START_SYNC sync, and so we permit it to be
1004		 * interrupted.
1005		 */
1006		ret = __memp_sync(
1007		    env, DB_SYNC_CHECKPOINT | DB_SYNC_INTERRUPT_OK, &lsn);
1008		MUTEX_UNLOCK(env, rep->mtx_ckp);
1009		RPRINT(env, DB_VERB_REP_MSGS,
1010		    (env, "ALIVE: Completed sync [%lu][%lu]",
1011		    (u_long)lsn.file, (u_long)lsn.offset));
1012	}
1013out:
1014	if (ret == 0 && F_ISSET(rp, REPCTL_PERM)) {
1015		if (ret_lsnp != NULL)
1016			*ret_lsnp = rp->lsn;
1017		ret = DB_REP_NOTPERM;
1018	}
1019	__dbt_userfree(env, control, rec, NULL);
1020	ENV_LEAVE(env, ip);
1021	return (ret);
1022}
1023
1024/*
1025 * __rep_apply --
1026 *
1027 * Handle incoming log records on a client, applying when possible and
1028 * entering into the bookkeeping table otherwise.  This routine manages
1029 * the state of the incoming message stream -- processing records, via
1030 * __rep_process_rec, when possible and enqueuing in the __db.rep.db
1031 * when necessary.  As gaps in the stream are filled in, this is where
1032 * we try to process as much as possible from __db.rep.db to catch up.
1033 *
1034 * PUBLIC: int __rep_apply __P((ENV *, DB_THREAD_INFO *, __rep_control_args *,
1035 * PUBLIC:     DBT *, DB_LSN *, int *, DB_LSN *));
1036 */
1037int
1038__rep_apply(env, ip, rp, rec, ret_lsnp, is_dupp, last_lsnp)
1039	ENV *env;
1040	DB_THREAD_INFO *ip;
1041	__rep_control_args *rp;
1042	DBT *rec;
1043	DB_LSN *ret_lsnp;
1044	int *is_dupp;
1045	DB_LSN *last_lsnp;
1046{
1047	DB *dbp;
1048	DBT control_dbt, key_dbt;
1049	DBT rec_dbt;
1050	DB_LOG *dblp;
1051	DB_LSN max_lsn, save_lsn;
1052	DB_REP *db_rep;
1053	LOG *lp;
1054	REP *rep;
1055	db_timespec msg_time, max_ts;
1056	u_int32_t gen, rectype;
1057	int cmp, event, master, newfile_seen, ret, set_apply, t_ret;
1058
1059	COMPQUIET(gen, 0);
1060	COMPQUIET(master, DB_EID_INVALID);
1061
1062	db_rep = env->rep_handle;
1063	rep = db_rep->region;
1064	event = ret = set_apply = 0;
1065	memset(&control_dbt, 0, sizeof(control_dbt));
1066	memset(&rec_dbt, 0, sizeof(rec_dbt));
1067	ZERO_LSN(max_lsn);
1068	timespecclear(&max_ts);
1069	timespecset(&msg_time, rp->msg_sec, rp->msg_nsec);
1070	cmp = -2;		/* OOB value that LOG_COMPARE can't return. */
1071
1072	dblp = env->lg_handle;
1073	MUTEX_LOCK(env, rep->mtx_clientdb);
1074	/*
1075	 * Lazily open the temp db.  Always set the startup flag to 0
1076	 * because it was initialized from rep_start.
1077	 */
1078	if (db_rep->rep_db == NULL &&
1079	    (ret = __rep_client_dbinit(env, 0, REP_DB)) != 0) {
1080		MUTEX_UNLOCK(env, rep->mtx_clientdb);
1081		goto out;
1082	}
1083	dbp = db_rep->rep_db;
1084	lp = dblp->reginfo.primary;
1085	newfile_seen = 0;
1086	REP_SYSTEM_LOCK(env);
1087	if (F_ISSET(rep, REP_F_RECOVER_LOG) &&
1088	    LOG_COMPARE(&lp->ready_lsn, &rep->first_lsn) < 0)
1089		lp->ready_lsn = rep->first_lsn;
1090	cmp = LOG_COMPARE(&rp->lsn, &lp->ready_lsn);
1091	/*
1092	 * If we are going to skip or process any message other
1093	 * than a duplicate, make note of it if we're in an
1094	 * election so that the election can rerequest proactively.
1095	 */
1096	if (F_ISSET(rep, REP_F_READY_APPLY) && cmp >= 0)
1097		F_SET(rep, REP_F_SKIPPED_APPLY);
1098
1099	/*
1100	 * If we're in the middle of processing a NEWFILE, we've dropped
1101	 * the mutex and if this matches it is a duplicate record.  We
1102	 * do not want this call taking the "matching" code below because
1103	 * we may then process later records in the temp db and the
1104	 * original NEWFILE may not have the log file ready.  It will
1105	 * process those temp db items when it completes.
1106	 */
1107	if (F_ISSET(rep, REP_F_NEWFILE) && cmp == 0)
1108		cmp = -1;
1109
1110	if (cmp == 0) {
1111		/*
1112		 * If we are in an election (i.e. we've sent a vote
1113		 * with an LSN in it), then we drop the next record
1114		 * we're expecting.  When we find a master, we'll
1115		 * either go into sync, or if it was an existing
1116		 * master, rerequest this one record (later records
1117		 * are accumulating in the temp db).
1118		 *
1119		 * We can simply return here, and rep_process_message
1120		 * will set NOTPERM if necessary for this record.
1121		 */
1122		if (F_ISSET(rep, REP_F_READY_APPLY)) {
1123			/*
1124			 * We will simply return now.  All special return
1125			 * processing should be ignored because the special
1126			 * values are just initialized.  Variables like
1127			 * max_lsn are still 0.
1128			 */
1129			RPRINT(env, DB_VERB_REP_MISC, (env,
1130			    "rep_apply: In election. Ignoring [%lu][%lu]",
1131			    (u_long)rp->lsn.file, (u_long)rp->lsn.offset));
1132			REP_SYSTEM_UNLOCK(env);
1133			MUTEX_UNLOCK(env, rep->mtx_clientdb);
1134			goto out;
1135		}
1136		rep->apply_th++;
1137		set_apply = 1;
1138		RPRINT(env, DB_VERB_REP_MISC, (env,
1139		    "rep_apply: Set apply_th %d", rep->apply_th));
1140		REP_SYSTEM_UNLOCK(env);
1141		if (rp->rectype == REP_NEWFILE)
1142			newfile_seen = 1;
1143		if ((ret = __rep_process_rec(env, ip,
1144		    rp, rec, &max_ts, &max_lsn)) != 0)
1145			goto err;
1146		/*
1147		 * If we get the record we are expecting, reset
1148		 * the count of records we've received and are applying
1149		 * towards the request interval.
1150		 */
1151		__os_gettime(env, &lp->rcvd_ts, 1);
1152		ZERO_LSN(lp->max_wait_lsn);
1153
1154		/*
1155		 * The __rep_remfirst() and __rep_getnext() functions each open,
1156		 * use and then close a cursor on the temp db, each time through
1157		 * the loop.  Although this may seem excessive, it is necessary
1158		 * to avoid locking problems with checkpoints.
1159		 */
1160		while (ret == 0 &&
1161		    LOG_COMPARE(&lp->ready_lsn, &lp->waiting_lsn) == 0) {
1162			/*
1163			 * We just filled in a gap in the log record stream.
1164			 * Write subsequent records to the log.
1165			 */
1166gap_check:
1167			if ((ret = __rep_remfirst(env, ip,
1168			     &control_dbt, &rec_dbt)) != 0)
1169				goto err;
1170
1171			rp = (__rep_control_args *)control_dbt.data;
1172			timespecset(&msg_time, rp->msg_sec, rp->msg_nsec);
1173			rec = &rec_dbt;
1174			if (rp->rectype == REP_NEWFILE)
1175				newfile_seen = 1;
1176			if ((ret = __rep_process_rec(env, ip,
1177			    rp, rec, &max_ts, &max_lsn)) != 0)
1178				goto err;
1179
1180			--rep->stat.st_log_queued;
1181
1182			/*
1183			 * Since we just filled a gap in the log stream, and
1184			 * we're writing subsequent records to the log, we want
1185			 * to use rcvd_ts and wait_ts so that we will
1186			 * request the next gap if we end up with a gap and
1187			 * not so recent records in the temp db, but not
1188			 * request if recent records are in the temp db and
1189			 * likely to arrive on its own shortly.  We want to
1190			 * avoid requesting the record in that case.  Also
1191			 * reset max_wait_lsn because the next gap is a
1192			 * fresh gap.
1193			 */
1194			lp->rcvd_ts = lp->last_ts;
1195			lp->wait_ts = rep->request_gap;
1196			if ((ret = __rep_getnext(env, ip)) == DB_NOTFOUND) {
1197				__os_gettime(env, &lp->rcvd_ts, 1);
1198				ret = 0;
1199				break;
1200			} else if (ret != 0)
1201				goto err;
1202		}
1203
1204		/*
1205		 * Check if we're at a gap in the table and if so, whether we
1206		 * need to ask for any records.
1207		 */
1208		if (!IS_ZERO_LSN(lp->waiting_lsn) &&
1209		    LOG_COMPARE(&lp->ready_lsn, &lp->waiting_lsn) != 0) {
1210			/*
1211			 * We got a record and processed it, but we may
1212			 * still be waiting for more records.  If we
1213			 * filled a gap we keep a count of how many other
1214			 * records are in the temp database and if we should
1215			 * request the next gap at this time.
1216			 */
1217			if (__rep_check_doreq(env, rep) && (ret =
1218			    __rep_loggap_req(env, rep, &rp->lsn, 0)) != 0)
1219				goto err;
1220		} else {
1221			lp->wait_ts = rep->request_gap;
1222			ZERO_LSN(lp->max_wait_lsn);
1223		}
1224
1225	} else if (cmp > 0) {
1226		/*
1227		 * The LSN is higher than the one we were waiting for.
1228		 * This record isn't in sequence; add it to the temporary
1229		 * database, update waiting_lsn if necessary, and perform
1230		 * calculations to determine if we should issue requests
1231		 * for new records.
1232		 */
1233		REP_SYSTEM_UNLOCK(env);
1234		memset(&key_dbt, 0, sizeof(key_dbt));
1235		key_dbt.data = rp;
1236		key_dbt.size = sizeof(*rp);
1237		ret = __db_put(dbp, ip, NULL, &key_dbt, rec, DB_NOOVERWRITE);
1238		if (ret == 0) {
1239			rep->stat.st_log_queued++;
1240			__os_gettime(env, &lp->last_ts, 1);
1241#ifdef HAVE_STATISTICS
1242			STAT(rep->stat.st_log_queued_total++);
1243			if (rep->stat.st_log_queued_max <
1244			    rep->stat.st_log_queued)
1245				rep->stat.st_log_queued_max =
1246				    rep->stat.st_log_queued;
1247#endif
1248		}
1249
1250		if (ret == DB_KEYEXIST)
1251			ret = 0;
1252		if (ret != 0)
1253			goto done;
1254
1255		if (IS_ZERO_LSN(lp->waiting_lsn) ||
1256		    LOG_COMPARE(&rp->lsn, &lp->waiting_lsn) < 0) {
1257			/*
1258			 * If this is a new gap, then reset the rcvd_ts so
1259			 * that an out-of-order record after an idle period
1260			 * does not (likely) immediately rerequest.
1261			 */
1262			if (IS_ZERO_LSN(lp->waiting_lsn))
1263				__os_gettime(env, &lp->rcvd_ts, 1);
1264			lp->waiting_lsn = rp->lsn;
1265		}
1266
1267		if (__rep_check_doreq(env, rep) &&
1268		    (ret = __rep_loggap_req(env, rep, &rp->lsn, 0) != 0))
1269			goto err;
1270
1271		/*
1272		 * If this is permanent; let the caller know that we have
1273		 * not yet written it to disk, but we've accepted it.
1274		 */
1275		if (ret == 0 && F_ISSET(rp, REPCTL_PERM)) {
1276			max_lsn = rp->lsn;
1277			ret = DB_REP_NOTPERM;
1278		}
1279		goto done;
1280	} else {
1281		STAT(rep->stat.st_log_duplicated++);
1282		REP_SYSTEM_UNLOCK(env);
1283		if (is_dupp != NULL)
1284			*is_dupp = 1;
1285		LOGCOPY_32(env, &rectype, rec->data);
1286		if (rectype == DB___txn_regop || rectype == DB___txn_ckp)
1287			max_lsn = lp->max_perm_lsn;
1288		/*
1289		 * We check REPCTL_LEASE here, because this client may
1290		 * have leases configured but the master may not (especially
1291		 * in a mixed version group.  If the master has leases
1292		 * configured, all clients must also.
1293		 */
1294		if (IS_USING_LEASES(env) &&
1295		    F_ISSET(rp, REPCTL_LEASE) &&
1296		    timespecisset(&msg_time)) {
1297			if (timespeccmp(&msg_time, &lp->max_lease_ts, >))
1298				max_ts = msg_time;
1299			else
1300				max_ts = lp->max_lease_ts;
1301		}
1302		goto done;
1303	}
1304
1305	/* Check if we need to go back into the table. */
1306	if (ret == 0 && LOG_COMPARE(&lp->ready_lsn, &lp->waiting_lsn) == 0)
1307		goto gap_check;
1308
1309done:
1310err:	/*
1311	 * In case of a race, to make sure only one thread can get
1312	 * DB_REP_LOGREADY, zero out rep->last_lsn to show that we've gotten to
1313	 * this point.
1314	 */
1315	REP_SYSTEM_LOCK(env);
1316	if (ret == 0 &&
1317	    F_ISSET(rep, REP_F_RECOVER_LOG) &&
1318	    !IS_ZERO_LSN(rep->last_lsn) &&
1319	    LOG_COMPARE(&lp->ready_lsn, &rep->last_lsn) >= 0) {
1320		*last_lsnp = max_lsn;
1321		ZERO_LSN(rep->last_lsn);
1322		ZERO_LSN(max_lsn);
1323		ret = DB_REP_LOGREADY;
1324	}
1325	/*
1326	 * Only decrement if we were actually applying log records.
1327	 * We do not care if we processed a dup record or put one
1328	 * in the temp db.
1329	 */
1330	if (set_apply) {
1331		rep->apply_th--;
1332		RPRINT(env, DB_VERB_REP_MISC, (env,
1333		    "rep_apply: Decrement apply_th %d [%lu][%lu]",
1334		    rep->apply_th, (u_long)lp->ready_lsn.file,
1335		    (u_long)lp->ready_lsn.offset));
1336	}
1337
1338	if (ret == 0 && !F_ISSET(rep, REP_F_RECOVER_LOG) &&
1339	    !IS_ZERO_LSN(max_lsn)) {
1340		if (ret_lsnp != NULL)
1341			*ret_lsnp = max_lsn;
1342		ret = DB_REP_ISPERM;
1343		DB_ASSERT(env, LOG_COMPARE(&max_lsn, &lp->max_perm_lsn) >= 0);
1344		lp->max_perm_lsn = max_lsn;
1345	}
1346
1347	/*
1348	 * Start-up is complete when we process (or have already processed) up
1349	 * to the end of the replication group's log.  In case we miss that
1350	 * message, as a back-up, we also recognize start-up completion when we
1351	 * actually process a live log record.  Having cmp==0 here (with a good
1352	 * "ret" value) implies we actually processed the record.
1353	 */
1354	if ((ret == 0 || ret == DB_REP_ISPERM) &&
1355	    rep->stat.st_startup_complete == 0 &&
1356	    !F_ISSET(rep, REP_F_RECOVER_LOG) &&
1357	    ((cmp <= 0 && F_ISSET(rp, REPCTL_LOG_END)) ||
1358	    (cmp == 0 && !F_ISSET(rp, REPCTL_RESEND)))) {
1359		rep->stat.st_startup_complete = 1;
1360		event = 1;
1361		gen = rep->gen;
1362		master = rep->master_id;
1363	}
1364	REP_SYSTEM_UNLOCK(env);
1365	/*
1366	 * If we've processed beyond the needed LSN for a pending
1367	 * start sync, start it now.  We can compare >= here
1368	 * because ready_lsn is the next record we expect.
1369	 * Since ckp_lsn can point to the last commit record itself,
1370	 * but if it does and ready_lsn == commit (i.e. we haven't
1371	 * written the commit yet), we can still start to sync
1372	 * because we're guaranteed no additional buffers can
1373	 * be dirtied.
1374	 */
1375	if (!IS_ZERO_LSN(rep->ckp_lsn) &&
1376	    LOG_COMPARE(&lp->ready_lsn, &rep->ckp_lsn) >= 0) {
1377		save_lsn = rep->ckp_lsn;
1378		ZERO_LSN(rep->ckp_lsn);
1379	} else
1380		ZERO_LSN(save_lsn);
1381
1382	/*
1383	 * If this is a perm record, we are using leases, update the lease
1384	 * grant.  We must hold the clientdb mutex.  We must not hold
1385	 * the region mutex because rep_update_grant will acquire it.
1386	 */
1387	if (ret == DB_REP_ISPERM && IS_USING_LEASES(env) &&
1388	    timespecisset(&max_ts)) {
1389		if ((t_ret = __rep_update_grant(env, &max_ts)) != 0)
1390			ret = t_ret;
1391		else if (timespeccmp(&max_ts, &lp->max_lease_ts, >))
1392			lp->max_lease_ts = max_ts;
1393	}
1394
1395	MUTEX_UNLOCK(env, rep->mtx_clientdb);
1396	if (!IS_ZERO_LSN(save_lsn)) {
1397		/*
1398		 * Now call memp_sync holding only the ckp mutex.
1399		 */
1400		MUTEX_LOCK(env, rep->mtx_ckp);
1401		RPRINT(env, DB_VERB_REP_MISC, (env,
1402		    "Starting delayed __memp_sync call [%lu][%lu]",
1403		    (u_long)save_lsn.file, (u_long)save_lsn.offset));
1404		t_ret = __memp_sync(env,
1405		    DB_SYNC_CHECKPOINT | DB_SYNC_INTERRUPT_OK, &save_lsn);
1406		MUTEX_UNLOCK(env, rep->mtx_ckp);
1407	}
1408	if (event) {
1409		RPRINT(env, DB_VERB_REP_MISC, (env,
1410		    "Start-up is done [%lu][%lu]",
1411		    (u_long)rp->lsn.file, (u_long)rp->lsn.offset));
1412
1413		if ((t_ret = __rep_fire_startupdone(env, gen, master)) != 0) {
1414			DB_ASSERT(env, ret == 0 || ret == DB_REP_ISPERM);
1415			/* Failure trumps either of those values. */
1416			ret = t_ret;
1417			goto out;
1418		}
1419	}
1420	if ((ret == 0 || ret == DB_REP_ISPERM) &&
1421	    newfile_seen && lp->db_log_autoremove)
1422		__log_autoremove(env);
1423	if (control_dbt.data != NULL)
1424		__os_ufree(env, control_dbt.data);
1425	if (rec_dbt.data != NULL)
1426		__os_ufree(env, rec_dbt.data);
1427
1428out:
1429	switch (ret) {
1430	case 0:
1431		break;
1432	case DB_REP_ISPERM:
1433		RPRINT(env, DB_VERB_REP_MSGS,
1434		    (env, "Returning ISPERM [%lu][%lu], cmp = %d",
1435		    (u_long)max_lsn.file, (u_long)max_lsn.offset, cmp));
1436		break;
1437	case DB_REP_LOGREADY:
1438		RPRINT(env, DB_VERB_REP_MSGS, (env,
1439		    "Returning LOGREADY up to [%lu][%lu], cmp = %d",
1440		    (u_long)last_lsnp->file,
1441		    (u_long)last_lsnp->offset, cmp));
1442		break;
1443	case DB_REP_NOTPERM:
1444		if (!F_ISSET(rep, REP_F_RECOVER_LOG) &&
1445		    !IS_ZERO_LSN(max_lsn) && ret_lsnp != NULL)
1446			*ret_lsnp = max_lsn;
1447
1448		RPRINT(env, DB_VERB_REP_MSGS,
1449		    (env, "Returning NOTPERM [%lu][%lu], cmp = %d",
1450		    (u_long)max_lsn.file, (u_long)max_lsn.offset, cmp));
1451		break;
1452	default:
1453		RPRINT(env, DB_VERB_REP_MSGS,
1454		    (env, "Returning %d [%lu][%lu], cmp = %d", ret,
1455		    (u_long)max_lsn.file, (u_long)max_lsn.offset, cmp));
1456		break;
1457	}
1458
1459	return (ret);
1460}
1461
/*
 * __rep_process_txn --
 *
 * This is the routine that actually gets a transaction ready for
 * processing.
 *
 * "rec" holds a txn_regop (commit) or txn_prepare log record.  The record
 * is decoded to find the transaction's previous LSN and its lock list, the
 * listed locks are acquired for write under a freshly allocated locker,
 * every LSN in the transaction chain is collected and sorted, and each
 * log record is then read and dispatched with DB_TXN_APPLY.
 *
 * A txn_regop record whose opcode is not TXN_COMMIT is ignored and 0 is
 * returned.  Otherwise returns 0 on success or the first error seen;
 * errors from the cleanup steps are only reported when nothing failed
 * earlier.
 *
 * PUBLIC: int __rep_process_txn __P((ENV *, DBT *));
 */
int
__rep_process_txn(env, rec)
	ENV *env;
	DBT *rec;
{
	DBT data_dbt, *lock_dbt;
	DB_LOCKER *locker;
	DB_LOCKREQ req, *lvp;
	DB_LOGC *logc;
	DB_LSN prev_lsn, *lsnp;
	DB_REP *db_rep;
	DB_THREAD_INFO *ip;
	DB_TXNHEAD *txninfo;
	LSN_COLLECTION lc;
	REP *rep;
	__txn_regop_args *txn_args;
	__txn_regop_42_args *txn42_args;
	__txn_prepare_args *prep_args;
	u_int32_t rectype;
	u_int i;
	int ret, t_ret;

	db_rep = env->rep_handle;
	rep = db_rep->region;
	/*
	 * Everything the cleanup code at err1 inspects starts out NULL so
	 * that it only releases what was actually obtained.
	 */
	logc = NULL;
	txn_args = NULL;
	txn42_args = NULL;
	prep_args = NULL;
	txninfo = NULL;

	/*
	 * NOTE(review): nothing in this function calls ENV_LEAVE, including
	 * the early returns below -- confirm that is intended for a routine
	 * that runs ENV_ENTER.
	 */
	ENV_ENTER(env, ip);
	memset(&data_dbt, 0, sizeof(data_dbt));
	/* In a threaded environment the log cursor reallocates our buffer. */
	if (F_ISSET(env, ENV_THREAD))
		F_SET(&data_dbt, DB_DBT_REALLOC);

	/*
	 * There are two phases:  First, we have to traverse backwards through
	 * the log records gathering the list of all LSNs in the transaction.
	 * Once we have this information, we can loop through and then apply it.
	 *
	 * We may be passed a prepare (if we're restoring a prepare on upgrade)
	 * instead of a commit (the common case).  Check which it is and behave
	 * appropriately.
	 */
	LOGCOPY_32(env, &rectype, rec->data);
	/* lc.array must be NULL before any path can reach err1. */
	memset(&lc, 0, sizeof(lc));
	if (rectype == DB___txn_regop) {
		/*
		 * We're the end of a transaction.  Make sure this is
		 * really a commit and not an abort!
		 */
		if (rep->version >= DB_REPVERSION_44) {
			if ((ret = __txn_regop_read(
			    env, rec->data, &txn_args)) != 0)
				return (ret);
			if (txn_args->opcode != TXN_COMMIT) {
				__os_free(env, txn_args);
				return (0);
			}
			prev_lsn = txn_args->prev_lsn;
			lock_dbt = &txn_args->locks;
		} else {
			/* Older replication versions use the 4.2 layout. */
			if ((ret = __txn_regop_42_read(
			    env, rec->data, &txn42_args)) != 0)
				return (ret);
			if (txn42_args->opcode != TXN_COMMIT) {
				__os_free(env, txn42_args);
				return (0);
			}
			prev_lsn = txn42_args->prev_lsn;
			lock_dbt = &txn42_args->locks;
		}
	} else {
		/* We're a prepare. */
		DB_ASSERT(env, rectype == DB___txn_prepare);

		if ((ret = __txn_prepare_read(
		    env, rec->data, &prep_args)) != 0)
			return (ret);
		prev_lsn = prep_args->prev_lsn;
		lock_dbt = &prep_args->locks;
	}

	/*
	 * Get locks.  If we cannot even get a locker there is nothing to
	 * unlock, so that failure goes to err1 rather than err.
	 */
	if ((ret = __lock_id(env, NULL, &locker)) != 0)
		goto err1;

	if ((ret =
	      __lock_get_list(env, locker, 0, DB_LOCK_WRITE, lock_dbt)) != 0)
		goto err;

	/* Phase 1.  Get a list of the LSNs in this transaction, and sort it. */
	if ((ret = __rep_collect_txn(env, &prev_lsn, &lc)) != 0)
		goto err;
	qsort(lc.array, lc.nlsns, sizeof(DB_LSN), __rep_lsn_cmp);

	/*
	 * The set of records for a transaction may include dbreg_register
	 * records.  Create a txnlist so that they can keep track of file
	 * state between records.
	 */
	if ((ret = __db_txnlist_init(env, ip, 0, 0, NULL, &txninfo)) != 0)
		goto err;

	/* Phase 2: Apply updates. */
	if ((ret = __log_cursor(env, &logc)) != 0)
		goto err;
	for (lsnp = &lc.array[0], i = 0; i < lc.nlsns; i++, lsnp++) {
		if ((ret = __logc_get(logc, lsnp, &data_dbt, DB_SET)) != 0) {
			__db_errx(env, "failed to read the log at [%lu][%lu]",
			    (u_long)lsnp->file, (u_long)lsnp->offset);
			goto err;
		}
		if ((ret = __db_dispatch(env, &env->recover_dtab,
		    &data_dbt, lsnp, DB_TXN_APPLY, txninfo)) != 0) {
			__db_errx(env, "transaction failed at [%lu][%lu]",
			    (u_long)lsnp->file, (u_long)lsnp->offset);
			goto err;
		}
	}

	/*
	 * err: reached on success as well as failure once a locker exists.
	 * Release every lock held by the locker, then free the locker
	 * itself.
	 */
err:	memset(&req, 0, sizeof(req));
	req.op = DB_LOCK_PUT_ALL;
	if ((t_ret =
	     __lock_vec(env, locker, 0, &req, 1, &lvp)) != 0 && ret == 0)
		ret = t_ret;

	if ((t_ret = __lock_id_free(env, locker)) != 0 && ret == 0)
		ret = t_ret;

	/* err1: release everything that does not depend on the locker. */
err1:	if (txn_args != NULL)
		__os_free(env, txn_args);
	if (txn42_args != NULL)
		__os_free(env, txn42_args);
	if (prep_args != NULL)
		__os_free(env, prep_args);
	if (lc.array != NULL)
		__os_free(env, lc.array);

	if (logc != NULL && (t_ret = __logc_close(logc)) != 0 && ret == 0)
		ret = t_ret;

	if (txninfo != NULL)
		__db_txnlist_end(env, txninfo);

	/* Only free the buffer if we asked for it to be reallocated. */
	if (F_ISSET(&data_dbt, DB_DBT_REALLOC) && data_dbt.data != NULL)
		__os_ufree(env, data_dbt.data);

#ifdef HAVE_STATISTICS
	if (ret == 0)
		/*
		 * We don't hold the rep mutex, and could miscount if we race.
		 */
		rep->stat.st_txns_applied++;
#endif

	return (ret);
}
1628
1629/*
1630 * __rep_collect_txn
1631 *	Recursive function that will let us visit every entry in a transaction
1632 *	chain including all child transactions so that we can then apply
1633 *	the entire transaction family at once.
1634 */
1635static int
1636__rep_collect_txn(env, lsnp, lc)
1637	ENV *env;
1638	DB_LSN *lsnp;
1639	LSN_COLLECTION *lc;
1640{
1641	__txn_child_args *argp;
1642	DB_LOGC *logc;
1643	DB_LSN c_lsn;
1644	DBT data;
1645	u_int32_t rectype;
1646	u_int nalloc;
1647	int ret, t_ret;
1648
1649	memset(&data, 0, sizeof(data));
1650	F_SET(&data, DB_DBT_REALLOC);
1651
1652	if ((ret = __log_cursor(env, &logc)) != 0)
1653		return (ret);
1654
1655	while (!IS_ZERO_LSN(*lsnp) &&
1656	    (ret = __logc_get(logc, lsnp, &data, DB_SET)) == 0) {
1657		LOGCOPY_32(env, &rectype, data.data);
1658		if (rectype == DB___txn_child) {
1659			if ((ret = __txn_child_read(
1660			    env, data.data, &argp)) != 0)
1661				goto err;
1662			c_lsn = argp->c_lsn;
1663			*lsnp = argp->prev_lsn;
1664			__os_free(env, argp);
1665			ret = __rep_collect_txn(env, &c_lsn, lc);
1666		} else {
1667			if (lc->nalloc < lc->nlsns + 1) {
1668				nalloc = lc->nalloc == 0 ? 20 : lc->nalloc * 2;
1669				if ((ret = __os_realloc(env,
1670				    nalloc * sizeof(DB_LSN), &lc->array)) != 0)
1671					goto err;
1672				lc->nalloc = nalloc;
1673			}
1674			lc->array[lc->nlsns++] = *lsnp;
1675
1676			/*
1677			 * Explicitly copy the previous lsn.  The record
1678			 * starts with a u_int32_t record type, a u_int32_t
1679			 * txn id, and then the DB_LSN (prev_lsn) that we
1680			 * want.  We copy explicitly because we have no idea
1681			 * what kind of record this is.
1682			 */
1683			LOGCOPY_TOLSN(env, lsnp, (u_int8_t *)data.data +
1684			    sizeof(u_int32_t) + sizeof(u_int32_t));
1685		}
1686
1687		if (ret != 0)
1688			goto err;
1689	}
1690	if (ret != 0)
1691		__db_errx(env, "collect failed at: [%lu][%lu]",
1692		    (u_long)lsnp->file, (u_long)lsnp->offset);
1693
1694err:	if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
1695		ret = t_ret;
1696	if (data.data != NULL)
1697		__os_ufree(env, data.data);
1698	return (ret);
1699}
1700
1701/*
1702 * __rep_lsn_cmp --
1703 *	qsort-type-compatible wrapper for LOG_COMPARE.
1704 */
1705static int
1706__rep_lsn_cmp(lsn1, lsn2)
1707	const void *lsn1, *lsn2;
1708{
1709
1710	return (LOG_COMPARE((DB_LSN *)lsn1, (DB_LSN *)lsn2));
1711}
1712
1713/*
1714 * __rep_newfile --
1715 *	NEWFILE messages have the LSN of the last record in the previous
1716 * log file.  When applying a NEWFILE message, make sure we haven't already
1717 * swapped files.  Assume caller hold mtx_clientdb.
1718 */
1719static int
1720__rep_newfile(env, rp, rec)
1721	ENV *env;
1722	__rep_control_args *rp;
1723	DBT *rec;
1724{
1725	DB_LOG *dblp;
1726	DB_LSN tmplsn;
1727	DB_REP *db_rep;
1728	LOG *lp;
1729	REP *rep;
1730	__rep_newfile_args nf_args;
1731	int ret;
1732
1733	dblp = env->lg_handle;
1734	lp = dblp->reginfo.primary;
1735	db_rep = env->rep_handle;
1736	rep = db_rep->region;
1737
1738	/*
1739	 * If a newfile is already in progress, just ignore.
1740	 */
1741	if (F_ISSET(rep, REP_F_NEWFILE))
1742		return (0);
1743	if (rp->lsn.file + 1 > lp->ready_lsn.file) {
1744		if (rec == NULL || rec->size == 0) {
1745			RPRINT(env, DB_VERB_REP_MISC, (env,
1746"rep_newfile: Old-style NEWFILE msg.  Use control msg log version: %lu",
1747    (u_long) rp->log_version));
1748			nf_args.version = rp->log_version;
1749		} else if (rp->rep_version < DB_REPVERSION_47)
1750			nf_args.version = *(u_int32_t *)rec->data;
1751		else if ((ret = __rep_newfile_unmarshal(env, &nf_args,
1752		    rec->data, rec->size, NULL)) != 0)
1753			return (ret);
1754		RPRINT(env, DB_VERB_REP_MISC,
1755		    (env, "rep_newfile: File %lu vers %lu",
1756		    (u_long)rp->lsn.file + 1, (u_long)nf_args.version));
1757
1758		/*
1759		 * We drop the mtx_clientdb mutex during
1760		 * the file operation, and then reacquire it when
1761		 * we're done.  We avoid colliding with new incoming
1762		 * log records because lp->ready_lsn is not getting
1763		 * updated and there is no real log record at this
1764		 * ready_lsn.  We avoid colliding with a duplicate
1765		 * NEWFILE message by setting an in-progress flag.
1766		 */
1767		REP_SYSTEM_LOCK(env);
1768		F_SET(rep, REP_F_NEWFILE);
1769		REP_SYSTEM_UNLOCK(env);
1770		MUTEX_UNLOCK(env, rep->mtx_clientdb);
1771		LOG_SYSTEM_LOCK(env);
1772		ret = __log_newfile(dblp, &tmplsn, 0, nf_args.version);
1773		LOG_SYSTEM_UNLOCK(env);
1774		MUTEX_LOCK(env, rep->mtx_clientdb);
1775		REP_SYSTEM_LOCK(env);
1776		F_CLR(rep, REP_F_NEWFILE);
1777		REP_SYSTEM_UNLOCK(env);
1778		if (ret == 0)
1779			lp->ready_lsn = tmplsn;
1780		return (ret);
1781	} else
1782		/* We've already applied this NEWFILE.  Just ignore it. */
1783		return (0);
1784}
1785
1786/*
1787 * __rep_do_ckp --
1788 * Perform the memp_sync necessary for this checkpoint without holding the
1789 * REP->mtx_clientdb.  Callers of this function must hold REP->mtx_clientdb
1790 * and must not be holding the region mutex.
1791 */
1792static int
1793__rep_do_ckp(env, rec, rp)
1794	ENV *env;
1795	DBT *rec;
1796	__rep_control_args *rp;
1797{
1798	DB_ENV *dbenv;
1799	__txn_ckp_args *ckp_args;
1800	DB_LSN ckp_lsn;
1801	REP *rep;
1802	int ret;
1803
1804	dbenv = env->dbenv;
1805
1806	/* Crack the log record and extract the checkpoint LSN. */
1807	if ((ret = __txn_ckp_read(env, rec->data, &ckp_args)) != 0)
1808		return (ret);
1809	ckp_lsn = ckp_args->ckp_lsn;
1810	__os_free(env, ckp_args);
1811
1812	rep = env->rep_handle->region;
1813
1814	MUTEX_UNLOCK(env, rep->mtx_clientdb);
1815	DB_TEST_WAIT(env, env->test_check);
1816
1817	/*
1818	 * Sync the memory pool.
1819	 *
1820	 * This is the real PERM lock record/ckp.  We cannot return ISPERM
1821	 * if we haven't truly completed the checkpoint, so we don't allow
1822	 * this call to be interrupted.
1823	 *
1824	 * We may be overlapping our log record with an in-progress startsync
1825	 * of this checkpoint; suppress the max_write settings on any running
1826	 * cache-flush operation so it completes quickly.
1827	 */
1828	(void)__memp_set_config(dbenv, DB_MEMP_SUPPRESS_WRITE, 1);
1829	MUTEX_LOCK(env, rep->mtx_ckp);
1830	ret = __memp_sync(env, DB_SYNC_CHECKPOINT, &ckp_lsn);
1831	MUTEX_UNLOCK(env, rep->mtx_ckp);
1832	(void)__memp_set_config(dbenv, DB_MEMP_SUPPRESS_WRITE, 0);
1833
1834	/* Update the last_ckp in the txn region. */
1835	if (ret == 0)
1836		ret = __txn_updateckp(env, &rp->lsn);
1837	else {
1838		__db_errx(env, "Error syncing ckp [%lu][%lu]",
1839		    (u_long)ckp_lsn.file, (u_long)ckp_lsn.offset);
1840		ret = __env_panic(env, ret);
1841	}
1842
1843	MUTEX_LOCK(env, rep->mtx_clientdb);
1844	return (ret);
1845}
1846
1847/*
1848 * __rep_remfirst --
1849 * Remove the first entry from the __db.rep.db
1850 */
1851static int
1852__rep_remfirst(env, ip, cntrl, rec)
1853	ENV *env;
1854	DB_THREAD_INFO *ip;
1855	DBT *cntrl;
1856	DBT *rec;
1857{
1858	DB *dbp;
1859	DBC *dbc;
1860	DB_REP *db_rep;
1861	int ret, t_ret;
1862
1863	db_rep = env->rep_handle;
1864	dbp = db_rep->rep_db;
1865	if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0)
1866		return (ret);
1867
1868	/* The DBTs need to persist through another call. */
1869	F_SET(cntrl, DB_DBT_REALLOC);
1870	F_SET(rec, DB_DBT_REALLOC);
1871	if ((ret = __dbc_get(dbc, cntrl, rec, DB_RMW | DB_FIRST)) == 0)
1872		ret = __dbc_del(dbc, 0);
1873	if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
1874		ret = t_ret;
1875
1876	return (ret);
1877}
1878
1879/*
1880 * __rep_getnext --
1881 * Get the next record out of the __db.rep.db table.
1882 */
1883static int
1884__rep_getnext(env, ip)
1885	ENV *env;
1886	DB_THREAD_INFO *ip;
1887{
1888	DB *dbp;
1889	DBC *dbc;
1890	DBT lsn_dbt, nextrec_dbt;
1891	DB_LOG *dblp;
1892	DB_REP *db_rep;
1893	LOG *lp;
1894	__rep_control_args *rp;
1895	int ret, t_ret;
1896
1897	dblp = env->lg_handle;
1898	lp = dblp->reginfo.primary;
1899
1900	db_rep = env->rep_handle;
1901	dbp = db_rep->rep_db;
1902
1903	if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0)
1904		return (ret);
1905
1906	/*
1907	 * Update waiting_lsn.  We need to move it
1908	 * forward to the LSN of the next record
1909	 * in the queue.
1910	 *
1911	 * If the next item in the database is a log
1912	 * record--the common case--we're not
1913	 * interested in its contents, just in its LSN.
1914	 * Optimize by doing a partial get of the data item.
1915	 */
1916	memset(&nextrec_dbt, 0, sizeof(nextrec_dbt));
1917	F_SET(&nextrec_dbt, DB_DBT_PARTIAL);
1918	nextrec_dbt.ulen = nextrec_dbt.dlen = 0;
1919
1920	memset(&lsn_dbt, 0, sizeof(lsn_dbt));
1921	ret = __dbc_get(dbc, &lsn_dbt, &nextrec_dbt, DB_FIRST);
1922	if (ret != DB_NOTFOUND && ret != 0)
1923		goto err;
1924
1925	if (ret == DB_NOTFOUND) {
1926		ZERO_LSN(lp->waiting_lsn);
1927		/*
1928		 * Whether or not the current record is
1929		 * simple, there's no next one, and
1930		 * therefore we haven't got anything
1931		 * else to do right now.  Break out.
1932		 */
1933		goto err;
1934	}
1935	rp = (__rep_control_args *)lsn_dbt.data;
1936	lp->waiting_lsn = rp->lsn;
1937
1938err:	if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
1939		ret = t_ret;
1940	return (ret);
1941}
1942
1943/*
1944 * __rep_process_rec --
1945 *
1946 * Given a record in 'rp', process it.  In the case of a NEWFILE, that means
1947 * potentially switching files.  In the case of a checkpoint, it means doing
1948 * the checkpoint, and in other cases, it means simply writing the record into
1949 * the log.
1950 */
1951static int
1952__rep_process_rec(env, ip, rp, rec, ret_tsp, ret_lsnp)
1953	ENV *env;
1954	DB_THREAD_INFO *ip;
1955	__rep_control_args *rp;
1956	DBT *rec;
1957	db_timespec *ret_tsp;
1958	DB_LSN *ret_lsnp;
1959{
1960	DB *dbp;
1961	DBT control_dbt, key_dbt, rec_dbt;
1962	DB_REP *db_rep;
1963	REP *rep;
1964	db_timespec msg_time;
1965	u_int32_t rectype, txnid;
1966	int ret, t_ret;
1967
1968	db_rep = env->rep_handle;
1969	rep = db_rep->region;
1970	dbp = db_rep->rep_db;
1971	ret = 0;
1972
1973	if (rp->rectype == REP_NEWFILE) {
1974		ret = __rep_newfile(env, rp, rec);
1975		return (0);
1976	}
1977
1978	LOGCOPY_32(env, &rectype, rec->data);
1979	memset(&control_dbt, 0, sizeof(control_dbt));
1980	memset(&rec_dbt, 0, sizeof(rec_dbt));
1981	timespecset(&msg_time, rp->msg_sec, rp->msg_nsec);
1982
1983	/*
1984	 * We write all records except for checkpoint records here.
1985	 * All non-checkpoint records need to appear in the log before
1986	 * we take action upon them (i.e., we enforce write-ahead logging).
1987	 * However, we can't write the checkpoint record here until the
1988	 * data buffers are actually written to disk, else we are creating
1989	 * an invalid log -- one that says all data before a certain point
1990	 * has been written to disk.
1991	 *
1992	 * If two threads are both processing the same checkpoint record
1993	 * (because, for example, it was resent and the original finally
1994	 * arrived), we handle that below by checking for the existence of
1995	 * the log record when we add it to the replication database.
1996	 *
1997	 * Any log records that arrive while we are processing the checkpoint
1998	 * are added to the bookkeeping database because ready_lsn is not yet
1999	 * updated to point after the checkpoint record.
2000	 */
2001	if (rectype != DB___txn_ckp || F_ISSET(rep, REP_F_RECOVER_LOG)) {
2002		if ((ret = __log_rep_put(env, &rp->lsn, rec, 0)) != 0)
2003			return (ret);
2004		STAT(rep->stat.st_log_records++);
2005		if (F_ISSET(rep, REP_F_RECOVER_LOG)) {
2006			*ret_lsnp = rp->lsn;
2007			goto out;
2008		}
2009	}
2010
2011	switch (rectype) {
2012	case DB___dbreg_register:
2013		/*
2014		 * DB opens occur in the context of a transaction, so we can
2015		 * simply handle them when we process the transaction.  Closes,
2016		 * however, are not transaction-protected, so we have to handle
2017		 * them here.
2018		 *
2019		 * It should be unsafe for the master to do a close of a file
2020		 * that was opened in an active transaction, so we should be
2021		 * guaranteed to get the ordering right.
2022		 *
2023		 * !!!
2024		 * The txn ID is the second 4-byte field of the log record.
2025		 * We should really be calling __dbreg_register_read() and
2026		 * working from the __dbreg_register_args structure, but this
2027		 * is considerably faster and the order of the fields won't
2028		 * change.
2029		 */
2030		LOGCOPY_32(env, &txnid,
2031		    (u_int8_t *)rec->data + sizeof(u_int32_t));
2032		if (txnid == TXN_INVALID)
2033			ret = __db_dispatch(env, &env->recover_dtab,
2034			    rec, &rp->lsn, DB_TXN_APPLY, NULL);
2035		break;
2036	case DB___txn_regop:
2037		/*
2038		 * If an application is doing app-specific recovery
2039		 * and acquires locks while applying a transaction,
2040		 * it can deadlock.  Any other locks held by this
2041		 * thread should have been discarded in the
2042		 * __rep_process_txn error path, so if we simply
2043		 * retry, we should eventually succeed.
2044		 */
2045		do {
2046			ret = 0;
2047			if (!F_ISSET(db_rep, DBREP_OPENFILES)) {
2048				ret = __txn_openfiles(env, ip, NULL, 1);
2049				F_SET(db_rep, DBREP_OPENFILES);
2050			}
2051			if (ret == 0)
2052				ret = __rep_process_txn(env, rec);
2053		} while (ret == DB_LOCK_DEADLOCK || ret == DB_LOCK_NOTGRANTED);
2054
2055		/* Now flush the log unless we're running TXN_NOSYNC. */
2056		if (ret == 0 && !F_ISSET(env->dbenv, DB_ENV_TXN_NOSYNC))
2057			ret = __log_flush(env, NULL);
2058		if (ret != 0) {
2059			__db_errx(env, "Error processing txn [%lu][%lu]",
2060			    (u_long)rp->lsn.file, (u_long)rp->lsn.offset);
2061			ret = __env_panic(env, ret);
2062		}
2063		*ret_lsnp = rp->lsn;
2064		break;
2065	case DB___txn_prepare:
2066		ret = __log_flush(env, NULL);
2067		/*
2068		 * Save the biggest prepared LSN we've seen.
2069		 */
2070		rep->max_prep_lsn = rp->lsn;
2071		RPRINT(env, DB_VERB_REP_MSGS,
2072		    (env, "process_rec: prepare at [%lu][%lu]",
2073		    (u_long)rep->max_prep_lsn.file,
2074		    (u_long)rep->max_prep_lsn.offset));
2075		break;
2076	case DB___txn_ckp:
2077		/*
2078		 * We do not want to hold the REP->mtx_clientdb mutex while
2079		 * syncing the mpool, so if we get a checkpoint record we are
2080		 * supposed to process, add it to the __db.rep.db, do the
2081		 * memp_sync and then go back and process it later, when the
2082		 * sync has finished.  If this record is already in the table,
2083		 * then some other thread will process it, so simply return
2084		 * REP_NOTPERM.
2085		 */
2086		memset(&key_dbt, 0, sizeof(key_dbt));
2087		key_dbt.data = rp;
2088		key_dbt.size = sizeof(*rp);
2089
2090		/*
2091		 * We want to put this record into the tmp DB only if
2092		 * it doesn't exist, so use DB_NOOVERWRITE.
2093		 */
2094		ret = __db_put(dbp, ip, NULL, &key_dbt, rec, DB_NOOVERWRITE);
2095		if (ret == DB_KEYEXIST) {
2096			if (ret_lsnp != NULL)
2097				*ret_lsnp = rp->lsn;
2098			ret = DB_REP_NOTPERM;
2099		}
2100		if (ret != 0)
2101			break;
2102
2103		/*
2104		 * Now, do the checkpoint.  Regardless of
2105		 * whether the checkpoint succeeds or not,
2106		 * we need to remove the record we just put
2107		 * in the temporary database.  If the
2108		 * checkpoint failed, return an error.  We
2109		 * will act like we never received the
2110		 * checkpoint.
2111		 */
2112		if ((ret = __rep_do_ckp(env, rec, rp)) == 0)
2113			ret = __log_rep_put(env, &rp->lsn, rec,
2114			    DB_LOG_CHKPNT);
2115		if ((t_ret = __rep_remfirst(env, ip,
2116		    &control_dbt, &rec_dbt)) != 0 && ret == 0)
2117			ret = t_ret;
2118		/*
2119		 * If we're successful putting the log record in the
2120		 * log, flush it for a checkpoint.
2121		 */
2122		if (ret == 0) {
2123			*ret_lsnp = rp->lsn;
2124			ret = __log_flush(env, NULL);
2125		}
2126		break;
2127	default:
2128		break;
2129	}
2130
2131out:
2132	if (ret == 0 && F_ISSET(rp, REPCTL_PERM))
2133		*ret_lsnp = rp->lsn;
2134	if (IS_USING_LEASES(env) &&
2135	    F_ISSET(rp, REPCTL_LEASE))
2136		*ret_tsp = msg_time;
2137	/*
2138	 * Set ret_lsnp before flushing the log because if the
2139	 * flush fails, we've still written the record to the
2140	 * log and the LSN has been entered.
2141	 */
2142	if (ret == 0 && F_ISSET(rp, REPCTL_FLUSH))
2143		ret = __log_flush(env, NULL);
2144	if (control_dbt.data != NULL)
2145		__os_ufree(env, control_dbt.data);
2146	if (rec_dbt.data != NULL)
2147		__os_ufree(env, rec_dbt.data);
2148
2149	return (ret);
2150}
2151
2152/*
2153 * __rep_resend_req --
2154 *	We might have dropped a message, we need to resend our request.
2155 *	The request we send is dependent on what recovery state we're in.
2156 *	The caller holds no locks.
2157 *
2158 * PUBLIC: int __rep_resend_req __P((ENV *, int));
2159 */
2160int
2161__rep_resend_req(env, rereq)
2162	ENV *env;
2163	int rereq;
2164{
2165	DB_LOG *dblp;
2166	DB_LSN lsn, *lsnp;
2167	DB_REP *db_rep;
2168	LOG *lp;
2169	REP *rep;
2170	int master, ret;
2171	u_int32_t gapflags, msgtype, repflags, sendflags;
2172
2173	db_rep = env->rep_handle;
2174	rep = db_rep->region;
2175	dblp = env->lg_handle;
2176	lp = dblp->reginfo.primary;
2177	ret = 0;
2178	lsnp = NULL;
2179	msgtype = REP_INVALID;
2180	sendflags = 0;
2181
2182	repflags = rep->flags;
2183	/*
2184	 * If we are delayed we do not rerequest anything.
2185	 */
2186	if (FLD_ISSET(repflags, REP_F_DELAY))
2187		return (ret);
2188	gapflags = rereq ? REP_GAP_REREQUEST : 0;
2189
2190	if (FLD_ISSET(repflags, REP_F_RECOVER_VERIFY)) {
2191		MUTEX_LOCK(env, rep->mtx_clientdb);
2192		lsn = lp->verify_lsn;
2193		MUTEX_UNLOCK(env, rep->mtx_clientdb);
2194		if (!IS_ZERO_LSN(lsn)) {
2195			msgtype = REP_VERIFY_REQ;
2196			lsnp = &lsn;
2197			sendflags = DB_REP_REREQUEST;
2198		}
2199	} else if (FLD_ISSET(repflags, REP_F_RECOVER_UPDATE)) {
2200		/*
2201		 * UPDATE_REQ only goes to the master.
2202		 */
2203		msgtype = REP_UPDATE_REQ;
2204	} else if (FLD_ISSET(repflags, REP_F_RECOVER_PAGE)) {
2205		REP_SYSTEM_LOCK(env);
2206		ret = __rep_pggap_req(env, rep, NULL, gapflags);
2207		REP_SYSTEM_UNLOCK(env);
2208	} else {
2209		MUTEX_LOCK(env, rep->mtx_clientdb);
2210		ret = __rep_loggap_req(env, rep, NULL, gapflags);
2211		MUTEX_UNLOCK(env, rep->mtx_clientdb);
2212	}
2213
2214	if (msgtype != REP_INVALID) {
2215		master = rep->master_id;
2216		if (master == DB_EID_INVALID)
2217			(void)__rep_send_message(env,
2218			    DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0, 0);
2219		else
2220			(void)__rep_send_message(env,
2221			    master, msgtype, lsnp, NULL, 0, sendflags);
2222	}
2223
2224	return (ret);
2225}
2226
2227/*
2228 * __rep_check_doreq --
2229 * PUBLIC: int __rep_check_doreq __P((ENV *, REP *));
2230 *
2231 * Check if we need to send another request.  If so, compare with
2232 * the request limits the user might have set.  This assumes the
2233 * caller holds the REP->mtx_clientdb mutex.  Returns 1 if a request
2234 * needs to be made, and 0 if it does not.
2235 */
2236int
2237__rep_check_doreq(env, rep)
2238	ENV *env;
2239	REP *rep;
2240{
2241
2242	DB_LOG *dblp;
2243	LOG *lp;
2244	db_timespec now;
2245	int req;
2246
2247	dblp = env->lg_handle;
2248	lp = dblp->reginfo.primary;
2249	__os_gettime(env, &now, 1);
2250	timespecsub(&now, &lp->rcvd_ts);
2251	req = timespeccmp(&now, &lp->wait_ts, >=);
2252	if (req) {
2253		/*
2254		 * Add wait_ts to itself to double it.
2255		 */
2256		timespecadd(&lp->wait_ts, &lp->wait_ts);
2257		if (timespeccmp(&lp->wait_ts, &rep->max_gap, >))
2258			lp->wait_ts = rep->max_gap;
2259		__os_gettime(env, &lp->rcvd_ts, 1);
2260	}
2261	return (req);
2262}
2263
2264/*
2265 * __rep_skip_msg -
2266 *
2267 *	If we're in recovery we want to skip/ignore the message, but
2268 *	we also need to see if we need to re-request any retransmissions.
2269 */
2270static int
2271__rep_skip_msg(env, rep, eid, rectype)
2272	ENV *env;
2273	REP *rep;
2274	int eid;
2275	u_int32_t rectype;
2276{
2277	int do_req, ret;
2278
2279	ret = 0;
2280	/*
2281	 * If we have a request message from a client then immediately
2282	 * send a REP_REREQUEST back to that client since we're skipping it.
2283	 */
2284	if (F_ISSET(rep, REP_F_CLIENT) && REP_MSG_REQ(rectype))
2285		do_req = 1;
2286	else {
2287		/* Check for need to retransmit. */
2288		MUTEX_LOCK(env, rep->mtx_clientdb);
2289		do_req = __rep_check_doreq(env, rep);
2290		MUTEX_UNLOCK(env, rep->mtx_clientdb);
2291	}
2292	/*
2293	 * Don't respond to a MASTER_REQ with
2294	 * a MASTER_REQ or REREQUEST.
2295	 */
2296	if (do_req && rectype != REP_MASTER_REQ) {
2297		/*
2298		 * There are three cases:
2299		 * 1.  If we don't know who the master is, then send MASTER_REQ.
2300		 * 2.  If the message we're skipping came from the master,
2301		 * then we need to rerequest.
2302		 * 3.  If the message didn't come from a master (i.e. client
2303		 * to client), then send a rerequest back to the sender so
2304		 * the sender can rerequest it elsewhere, if we are a client.
2305		 */
2306		if (rep->master_id == DB_EID_INVALID)	/* Case 1. */
2307			(void)__rep_send_message(env,
2308			    DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0, 0);
2309		else if (eid == rep->master_id)		/* Case 2. */
2310			ret = __rep_resend_req(env, 0);
2311		else if (F_ISSET(rep, REP_F_CLIENT))	/* Case 3. */
2312			(void)__rep_send_message(env,
2313			    eid, REP_REREQUEST, NULL, NULL, 0, 0);
2314	}
2315	return (ret);
2316}
2317
2318static int
2319__rep_fire_newmaster(env, gen, master)
2320	ENV *env;
2321	u_int32_t gen;
2322	int master;
2323{
2324	DB_REP *db_rep;
2325	REP *rep;
2326
2327	db_rep = env->rep_handle;
2328	rep = db_rep->region;
2329
2330	REP_EVENT_LOCK(env);
2331	/*
2332	 * The firing of this event should be idempotent with respect to a
2333	 * particular generation number.
2334	 */
2335	if (rep->newmaster_event_gen < gen) {
2336		__rep_fire_event(env, DB_EVENT_REP_NEWMASTER, &master);
2337		rep->newmaster_event_gen = gen;
2338	}
2339	REP_EVENT_UNLOCK(env);
2340	return (0);
2341}
2342
2343static int
2344__rep_fire_startupdone(env, gen, master)
2345	ENV *env;
2346	u_int32_t gen;
2347	int master;
2348{
2349	DB_REP *db_rep;
2350	REP *rep;
2351
2352	db_rep = env->rep_handle;
2353	rep = db_rep->region;
2354
2355	REP_EVENT_LOCK(env);
2356	/*
2357	 * Usually NEWMASTER will already have been fired.  But if not, fire
2358	 * it here now, to ensure the application receives events in the
2359	 * expected order.
2360	 */
2361	if (rep->newmaster_event_gen < gen) {
2362		__rep_fire_event(env, DB_EVENT_REP_NEWMASTER, &master);
2363		rep->newmaster_event_gen = gen;
2364	}
2365
2366	/*
2367	 * Caller already ensures that it only tries to fire STARTUPDONE once
2368	 * per generation.  If we did not want to rely on that, we could add a
2369	 * simple boolean flag (to the set of data protected by the mtx_event).
2370	 * The precise meaning of that flag would be "STARTUPDONE has been fired
2371	 * for the generation value stored in `newmaster_event_gen'".  Then the
2372	 * more accurate test here would be simply to check that flag, and fire
2373	 * the event (and set the flag) if it were not already set.
2374	 */
2375	if (rep->newmaster_event_gen == gen)
2376		__rep_fire_event(env, DB_EVENT_REP_STARTUPDONE, NULL);
2377	REP_EVENT_UNLOCK(env);
2378	return (0);
2379}
2380