1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2001,2008 Oracle.  All rights reserved.
5 *
6 * $Id: rep_record.c,v 12.143 2008/03/13 16:21:04 mbrey Exp $
7 */
8
9#include "db_config.h"
10
11#include "db_int.h"
12#include "dbinc/db_page.h"
13#include "dbinc/db_am.h"
14#include "dbinc/lock.h"
15#include "dbinc/log.h"
16#include "dbinc/mp.h"
17#include "dbinc/txn.h"
18
/*
 * Forward declarations for the functions in this file that have internal
 * (static) linkage.
 */
19static int __rep_collect_txn __P((ENV *, DB_LSN *, LSN_COLLECTION *));
20static int __rep_do_ckp __P((ENV *, DBT *, __rep_control_args *));
21static int __rep_fire_newmaster __P((ENV *, u_int32_t, int));
22static int __rep_fire_startupdone __P((ENV *, u_int32_t, int));
23static int __rep_getnext __P((ENV *, DB_THREAD_INFO *));
24static int __rep_lsn_cmp __P((const void *, const void *));
25static int __rep_newfile __P((ENV *, __rep_control_args *, DBT *));
26static int __rep_process_rec __P((ENV *, DB_THREAD_INFO *, __rep_control_args *,
27    DBT *, db_timespec *, DB_LSN *));
28static int __rep_remfirst __P((ENV *, DB_THREAD_INFO *, DBT *, DBT *));
29static int __rep_skip_msg __P((ENV *, REP *, int, u_int32_t));
30
31/* Used to consistently designate which messages ought to be received where. */
32
/*
 * MASTER_ONLY --
 *	Guard for message types that only a master may process.  If the
 *	local site does not have REP_F_MASTER set, the message is logged,
 *	ret is set to EINVAL and control jumps to errlock.  Relies on env,
 *	eid, ret and the errlock label existing where it is expanded.
 */
#define	MASTER_ONLY(rep, rp) do {					\
	if (F_ISSET(rep, REP_F_MASTER))					\
		break;							\
	RPRINT(env, DB_VERB_REP_MSGS,					\
	    (env, "Master record received on client"));			\
	REP_PRINT_MESSAGE(env, eid, rp, "rep_process_message", 0);	\
	ret = EINVAL;							\
	goto errlock;							\
} while (0)
43
/*
 * CLIENT_ONLY --
 *	Guard for message types that only a client may process.  If the
 *	local site does not have REP_F_CLIENT set, ret is set to
 *	DB_REP_DUPMASTER and control jumps to errlock.  Relies on env, eid,
 *	ret and the errlock label existing where it is expanded.
 */
#define	CLIENT_ONLY(rep, rp) do {					\
	if (F_ISSET(rep, REP_F_CLIENT))					\
		break;							\
	RPRINT(env, DB_VERB_REP_MSGS,					\
	    (env, "Client record received on master"));			\
	/*								\
	 * DUPMASTER is broadcast only when leases are not in		\
	 * effect.  An old master that uses leases and sees a		\
	 * newer message expects all of its leases to have		\
	 * expired already.						\
	 */								\
	if (!IS_USING_LEASES(env)) {					\
		REP_PRINT_MESSAGE(env,					\
		    eid, rp, "rep_process_message", 0);			\
		(void)__rep_send_message(env, DB_EID_BROADCAST,	\
		    REP_DUPMASTER, NULL, NULL, 0, 0);			\
	} else								\
		DB_ASSERT(env,						\
		    __rep_lease_check(env, 0) ==			\
		    DB_REP_LEASE_EXPIRED);				\
	ret = DB_REP_DUPMASTER;						\
	goto errlock;							\
} while (0)
68
/*
 * CLIENT_REREQ --
 *	Used after a client has tried to service a request.  On a client the
 *	service-request statistic is bumped; if the attempt ended with
 *	DB_NOTFOUND (the client does not have what was asked for), the miss
 *	statistic is bumped too and rep_skip_msg is called to skip this
 *	message and force a rerequest to the sender.  The statistics are
 *	updated without holding the mutex and may miscount.
 */
#define	CLIENT_REREQ do {						\
	if (!F_ISSET(rep, REP_F_CLIENT))				\
		break;							\
	STAT(rep->stat.st_client_svc_req++);				\
	if (ret != DB_NOTFOUND)						\
		break;							\
	STAT(rep->stat.st_client_svc_miss++);				\
	ret = __rep_skip_msg(env, rep, eid, rp->rectype);		\
} while (0)
83
/*
 * MASTER_UPDATE --
 *	While holding the replication system lock, set DB_REGENV_REPLOCKED
 *	on the REGENV and store the current time in its op_timestamp field.
 */
84#define	MASTER_UPDATE(env, renv) do {					\
85	REP_SYSTEM_LOCK(env);						\
86	F_SET((renv), DB_REGENV_REPLOCKED);				\
87	(void)time(&(renv)->op_timestamp);				\
88	REP_SYSTEM_UNLOCK(env);					\
89} while (0)
90
/*
 * RECOVERING_SKIP --
 *	On a client that is recovering, skip the current message via
 *	__rep_skip_msg and jump to errlock.  In every other case this is a
 *	no-op.  Relies on env, rep, rp, eid, recovering, ret and the errlock
 *	label existing where it is expanded.
 */
#define	RECOVERING_SKIP do {						\
	if (!IS_REP_CLIENT(env) || !recovering)				\
		break;							\
	/* Not holding region mutex, may miscount */			\
	STAT(rep->stat.st_msgs_recover++);				\
	ret = __rep_skip_msg(env, rep, eid, rp->rectype);		\
	goto errlock;							\
} while (0)
99
/*
 * RECOVERING_LOG_SKIP --
 *	While the log is being recovered, only log records inside the range
 *	that needs recovering are wanted.  Accepting others could store a
 *	huge number of "new" records that would only be truncated from the
 *	temp database after recovery runs.  Incoming log records are also
 *	skipped while a sync-up is actively being delayed (until the
 *	application requests it) and while no master is known.
 *
 *	The test below is written as "the record is wanted, carry on"; a
 *	record that fails it is skipped and control jumps to errlock.
 */
#define	RECOVERING_LOG_SKIP do {					\
	if (!F_ISSET(rep, REP_F_DELAY) &&				\
	    rep->master_id != DB_EID_INVALID &&				\
	    (!recovering ||						\
	    (F_ISSET(rep, REP_F_RECOVER_LOG) &&				\
	     LOG_COMPARE(&rp->lsn, &rep->last_lsn) <= 0)))		\
		break;							\
	/* Not holding region mutex, may miscount */			\
	STAT(rep->stat.st_msgs_recover++);				\
	ret = __rep_skip_msg(env, rep, eid, rp->rectype);		\
	goto errlock;							\
} while (0)
119
/*
 * ANYSITE --
 *	Expands to nothing.  It is a marker used in message cases of
 *	__rep_process_message that place no master/client restriction on
 *	the receiver, in contrast to MASTER_ONLY and CLIENT_ONLY.
 */
120#define	ANYSITE(rep)
121
122/*
123 * __rep_process_message --
124 *
125 * This routine takes an incoming message and processes it.
126 *
127 * control: contains the control fields from the record
128 * rec: contains the actual record
129 * eid: the environment id of the sender of the message;
130 * ret_lsnp: On DB_REP_ISPERM and DB_REP_NOTPERM returns, contains the
131 *	lsn of the maximum permanent or current not permanent log record
132 *	(respectively).
133 *
134 * PUBLIC: int __rep_process_message
135 * PUBLIC:      __P((DB_ENV *, DBT *, DBT *, int, DB_LSN *));
136 */
137int
138__rep_process_message(dbenv, control, rec, eid, ret_lsnp)
139	DB_ENV *dbenv;
140	DBT *control, *rec;
141	int eid;
142	DB_LSN *ret_lsnp;
143{
144	DBT data_dbt;
145	DB_LOG *dblp;
146	DB_LSN last_lsn, lsn;
147	DB_REP *db_rep;
148	DB_THREAD_INFO *ip;
149	ENV *env;
150	LOG *lp;
151	REGENV *renv;
152	REGINFO *infop;
153	REP *rep;
154	REP_46_CONTROL *rp46;
155	REP_OLD_CONTROL *orp;
156	__rep_control_args *rp, tmprp;
157	__rep_egen_args egen_arg;
158	size_t len;
159	u_int32_t gen, rep_version;
160	int cmp, do_sync, lockout, recovering, ret, t_ret;
161	time_t savetime;
162	u_int8_t buf[__REP_MAXMSG_SIZE];
163
164	env = dbenv->env;
165
166	ENV_REQUIRES_CONFIG_XX(
167	    env, rep_handle, "DB_ENV->rep_process_message", DB_INIT_REP);
168
169	/* Control argument must be non-Null. */
170	if (control == NULL || control->size == 0) {
171		__db_errx(env,
172	"DB_ENV->rep_process_message: control argument must be specified");
173		return (EINVAL);
174	}
175
176	if (!IS_REP_MASTER(env) && !IS_REP_CLIENT(env)) {
177		__db_errx(env,
178	"Environment not configured as replication master or client");
179		return (EINVAL);
180	}
181
182	if ((ret = __dbt_usercopy(env, control)) != 0 ||
183	    (ret = __dbt_usercopy(env, rec)) != 0) {
184		__dbt_userfree(env, control, rec, NULL);
185		__db_errx(env,
186	"DB_ENV->rep_process_message: error retrieving DBT contents");
187		return ret;
188	}
189
190	ret = 0;
191	do_sync = 0;
192	lockout = 0;
193	db_rep = env->rep_handle;
194	rep = db_rep->region;
195	dblp = env->lg_handle;
196	lp = dblp->reginfo.primary;
197	infop = env->reginfo;
198	renv = infop->primary;
199	/*
200	 * Casting this to REP_OLD_CONTROL is just kind of stylistic: the
201	 * rep_version field of course has to be in the same offset in all
202	 * versions in order for this to work.
203	 *
204	 * We can look at the rep_version unswapped here because if we're
205	 * talking to an old version, it will always be unswapped.  If
206	 * we're talking to a new version, the only issue is if it is
207	 * swapped and we take one of the old version conditionals
208	 * incorrectly.  The rep_version would need to be very, very
209	 * large for a swapped version to look like a small, older
210	 * version.  There is no problem here looking at it unswapped.
211	 */
212	rep_version = ((REP_OLD_CONTROL *)control->data)->rep_version;
213	if (rep_version <= DB_REPVERSION_45) {
214		orp = (REP_OLD_CONTROL *)control->data;
215		if (rep_version == DB_REPVERSION_45 &&
216		    F_ISSET(orp, REPCTL_INIT_45)) {
217			F_CLR(orp, REPCTL_INIT_45);
218			F_SET(orp, REPCTL_INIT);
219		}
220		tmprp.rep_version = orp->rep_version;
221		tmprp.log_version = orp->log_version;
222		tmprp.lsn = orp->lsn;
223		tmprp.rectype = orp->rectype;
224		tmprp.gen = orp->gen;
225		tmprp.flags = orp->flags;
226		tmprp.msg_sec = 0;
227		tmprp.msg_nsec = 0;
228	} else if (rep_version == DB_REPVERSION_46) {
229		rp46 = (REP_46_CONTROL *)control->data;
230		tmprp.rep_version = rp46->rep_version;
231		tmprp.log_version = rp46->log_version;
232		tmprp.lsn = rp46->lsn;
233		tmprp.rectype = rp46->rectype;
234		tmprp.gen = rp46->gen;
235		tmprp.flags = rp46->flags;
236		tmprp.msg_sec = (u_int32_t)rp46->msg_time.tv_sec;
237		tmprp.msg_nsec = (u_int32_t)rp46->msg_time.tv_nsec;
238	} else
239		if ((ret = __rep_control_unmarshal(env, &tmprp,
240		    control->data, control->size, NULL)) != 0)
241			return (ret);
242	rp = &tmprp;
243	if (ret_lsnp != NULL)
244		ZERO_LSN(*ret_lsnp);
245
246	ENV_ENTER(env, ip);
247
248	REP_PRINT_MESSAGE(env, eid, rp, "rep_process_message", 0);
249	/*
250	 * Check the version number for both rep and log.  If it is
251	 * an old version we support, convert it.  Otherwise complain.
252	 */
253	if (rp->rep_version < DB_REPVERSION) {
254		if (rp->rep_version < DB_REPVERSION_MIN) {
255			__db_errx(env,
256 "unsupported old replication message version %lu, minimum version %d",
257			    (u_long)rp->rep_version, DB_REPVERSION_MIN);
258			ret = EINVAL;
259			goto errlock;
260		}
261		RPRINT(env, DB_VERB_REP_MSGS, (env,
262		    "Received record %lu with old rep version %lu",
263		    (u_long)rp->rectype, (u_long)rp->rep_version));
264		rp->rectype = __rep_msg_from_old(rp->rep_version, rp->rectype);
265		DB_ASSERT(env, rp->rectype != REP_INVALID);
266		/*
267		 * We should have a valid new record type for all the old
268		 * versions.
269		 */
270		RPRINT(env, DB_VERB_REP_MSGS, (env,
271		    "Converted to record %lu with old rep version %lu",
272		    (u_long)rp->rectype, (u_long)rp->rep_version));
273	} else if (rp->rep_version > DB_REPVERSION) {
274		__db_errx(env,
275		    "unexpected replication message version %lu, expected %d",
276		    (u_long)rp->rep_version, DB_REPVERSION);
277		ret = EINVAL;
278		goto errlock;
279	}
280
281	if (rp->log_version < DB_LOGVERSION) {
282		if (rp->log_version < DB_LOGVERSION_MIN) {
283			__db_errx(env,
284 "unsupported old replication log version %lu, minimum version %d",
285			    (u_long)rp->log_version, DB_LOGVERSION_MIN);
286			ret = EINVAL;
287			goto errlock;
288		}
289		RPRINT(env, DB_VERB_REP_MSGS, (env,
290		    "Received record %lu with old log version %lu",
291		    (u_long)rp->rectype, (u_long)rp->log_version));
292	} else if (rp->log_version > DB_LOGVERSION) {
293		__db_errx(env,
294		    "unexpected log record version %lu, expected %d",
295		    (u_long)rp->log_version, DB_LOGVERSION);
296		ret = EINVAL;
297		goto errlock;
298	}
299
300	/*
301	 * Acquire the replication lock.
302	 */
303	REP_SYSTEM_LOCK(env);
304	if (F_ISSET(rep, REP_F_READY_MSG)) {
305		/*
306		 * If we're racing with a thread in rep_start, then
307		 * just ignore the message and return.
308		 */
309		RPRINT(env, DB_VERB_REP_MSGS, (env,
310		    "Racing replication msg lockout, ignore message."));
311		if (F_ISSET(rp, REPCTL_PERM))
312			ret = DB_REP_IGNORE;
313		REP_SYSTEM_UNLOCK(env);
314		/*
315		 * If another client has sent a c2c request to us, it may be a
316		 * long time before it resends the request (due to its dual data
317		 * streams avoidance heuristic); let it know we can't serve the
318		 * request just now.
319		 */
320		if (F_ISSET(rep, REP_F_CLIENT) && REP_MSG_REQ(rp->rectype)) {
321			STAT(rep->stat.st_client_svc_req++);
322			STAT(rep->stat.st_client_svc_miss++);
323			(void)__rep_send_message(env,
324			    eid, REP_REREQUEST, NULL, NULL, 0, 0);
325		}
326		goto out;
327	}
328	rep->msg_th++;
329	gen = rep->gen;
330	recovering = F_ISSET(rep, REP_F_RECOVER_MASK);
331	savetime = renv->rep_timestamp;
332
333	STAT(rep->stat.st_msgs_processed++);
334	REP_SYSTEM_UNLOCK(env);
335
336	/*
337	 * Check for lease configuration matching.  Leases must be
338	 * configured all or none.  If I am a client and I receive a
339	 * message requesting a lease, and I'm not using leases, that
340	 * is an error.
341	 */
342	if (!IS_USING_LEASES(env) &&
343	    (F_ISSET(rp, REPCTL_LEASE) || rp->rectype == REP_LEASE_GRANT)) {
344		__db_errx(env,
345		    "Inconsistent lease configuration");
346		RPRINT(env, DB_VERB_REP_MSGS, (env,
347		    "Client received lease message and not using leases"));
348		ret = EINVAL;
349		ret = __env_panic(env, ret);
350		goto errlock;
351	}
352
353	/*
354	 * Check for generation number matching.  Ignore any old messages
355	 * except requests that are indicative of a new client that needs
356	 * to get in sync.
357	 */
358	if (rp->gen < gen && rp->rectype != REP_ALIVE_REQ &&
359	    rp->rectype != REP_NEWCLIENT && rp->rectype != REP_MASTER_REQ &&
360	    rp->rectype != REP_DUPMASTER && rp->rectype != REP_VOTE1) {
361		/*
362		 * We don't hold the rep mutex, and could miscount if we race.
363		 */
364		STAT(rep->stat.st_msgs_badgen++);
365		if (F_ISSET(rp, REPCTL_PERM))
366			ret = DB_REP_IGNORE;
367		goto errlock;
368	}
369
370	if (rp->gen > gen) {
371		/*
372		 * If I am a master and am out of date with a lower generation
373		 * number, I am in bad shape and should downgrade.
374		 */
375		if (F_ISSET(rep, REP_F_MASTER)) {
376			STAT(rep->stat.st_dupmasters++);
377			ret = DB_REP_DUPMASTER;
378			/*
379			 * Only broadcast DUPMASTER if leases are not
380			 * in effect.  If I am an old master, using
381			 * leases and I get a newer message, my leases
382			 * had better all be expired.
383			 */
384			if (IS_USING_LEASES(env))
385				DB_ASSERT(env,
386				    __rep_lease_check(env, 0) ==
387				    DB_REP_LEASE_EXPIRED);
388			else if (rp->rectype != REP_DUPMASTER)
389				(void)__rep_send_message(env,
390				    DB_EID_BROADCAST, REP_DUPMASTER,
391				    NULL, NULL, 0, 0);
392			goto errlock;
393		}
394
395		/*
396		 * I am a client and am out of date.  If this is an election,
397		 * or a response from the first site I contacted, then I can
398		 * accept the generation number and participate in future
399		 * elections and communication. Otherwise, I need to hear about
400		 * a new master and sync up.
401		 */
402		if (rp->rectype == REP_ALIVE ||
403		    rp->rectype == REP_VOTE1 || rp->rectype == REP_VOTE2) {
404			REP_SYSTEM_LOCK(env);
405			RPRINT(env, DB_VERB_REP_MSGS, (env,
406			    "Updating gen from %lu to %lu",
407			    (u_long)gen, (u_long)rp->gen));
408			rep->master_id = DB_EID_INVALID;
409			gen = rep->gen = rp->gen;
410			/*
411			 * Updating of egen will happen when we process the
412			 * message below for each message type.
413			 */
414			REP_SYSTEM_UNLOCK(env);
415			if (rp->rectype == REP_ALIVE)
416				(void)__rep_send_message(env,
417				    DB_EID_BROADCAST, REP_MASTER_REQ, NULL,
418				    NULL, 0, 0);
419		} else if (rp->rectype != REP_NEWMASTER) {
420			/*
421			 * Ignore this message, retransmit if needed.
422			 */
423			if (__rep_check_doreq(env, rep))
424				(void)__rep_send_message(env,
425				    DB_EID_BROADCAST, REP_MASTER_REQ,
426				    NULL, NULL, 0, 0);
427			goto errlock;
428		}
429		/*
430		 * If you get here, then you're a client and either you're
431		 * in an election or you have a NEWMASTER or an ALIVE message
432		 * whose processing will do the right thing below.
433		 */
434	}
435
436	/*
437	 * If the sender is part of an established group, so are we now.
438	 */
439	if (F_ISSET(rp, REPCTL_GROUP_ESTD)) {
440		REP_SYSTEM_LOCK(env);
441#ifdef	DIAGNOSTIC
442		if (!F_ISSET(rep, REP_F_GROUP_ESTD))
443			RPRINT(env, DB_VERB_REP_MSGS, (env,
444			    "I am now part of an established group"));
445#endif
446		F_SET(rep, REP_F_GROUP_ESTD);
447		REP_SYSTEM_UNLOCK(env);
448	}
449
450	/*
451	 * We need to check if we're in recovery and if we are
452	 * then we need to ignore any messages except VERIFY*, VOTE*,
453	 * NEW* and ALIVE_REQ, or backup related messages: UPDATE*,
454	 * PAGE* and FILE*.  We need to also accept LOG messages
455	 * if we're copying the log for recovery/backup.
456	 */
457	switch (rp->rectype) {
458	case REP_ALIVE:
459		/*
460		 * Handle even if we're recovering.
461		 */
462		ANYSITE(rep);
463		if (rp->rep_version < DB_REPVERSION_47)
464			egen_arg.egen = *(u_int32_t *)rec->data;
465		else if ((ret = __rep_egen_unmarshal(env, &egen_arg,
466		    rec->data, rec->size, NULL)) != 0)
467			return (ret);
468		REP_SYSTEM_LOCK(env);
469		RPRINT(env, DB_VERB_REP_MSGS, (env,
470		    "Received ALIVE egen of %lu, mine %lu",
471		    (u_long)egen_arg.egen, (u_long)rep->egen));
472		if (egen_arg.egen > rep->egen) {
473			/*
474			 * We're changing egen, need to clear out any old
475			 * election information.  We need to set the
476			 * REP_F_EGENUPDATE flag here so that any thread
477			 * waiting in rep_elect/rep_wait can distinguish
478			 * this situation (and restart its election) from
479			 * a current master saying it is still master and
480			 * the egen getting incremented on that path.
481			 */
482			__rep_elect_done(env, rep, 0);
483			rep->egen = egen_arg.egen;
484			F_SET(rep, REP_F_EGENUPDATE);
485		}
486		REP_SYSTEM_UNLOCK(env);
487		break;
488	case REP_ALIVE_REQ:
489		/*
490		 * Handle even if we're recovering.
491		 */
492		ANYSITE(rep);
493		LOG_SYSTEM_LOCK(env);
494		lsn = lp->lsn;
495		LOG_SYSTEM_UNLOCK(env);
496#ifdef	CONFIG_TEST
497		/*
498		 * Send this first, before the ALIVE message because of the
499		 * way the test suite and messaging is done sequentially.
500		 * In some sequences it is possible to get into a situation
501		 * where the test suite cannot get the later NEWMASTER because
502		 * we break out of the messaging loop too early.
503		 */
504		if (F_ISSET(rep, REP_F_MASTER))
505			(void)__rep_send_message(env,
506			    DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0, 0);
507#endif
508		REP_SYSTEM_LOCK(env);
509		egen_arg.egen = rep->egen;
510		REP_SYSTEM_UNLOCK(env);
511		if (rep->version < DB_REPVERSION_47)
512			DB_INIT_DBT(data_dbt, &egen_arg.egen,
513			    sizeof(egen_arg.egen));
514		else {
515			if ((ret = __rep_egen_marshal(env,
516			    &egen_arg, buf, __REP_EGEN_SIZE, &len)) != 0)
517				goto errlock;
518			DB_INIT_DBT(data_dbt, buf, len);
519		}
520		(void)__rep_send_message(env,
521		    eid, REP_ALIVE, &lsn, &data_dbt, 0, 0);
522		break;
523	case REP_ALL_REQ:
524		RECOVERING_SKIP;
525		ret = __rep_allreq(env, rp, eid);
526		CLIENT_REREQ;
527		break;
528	case REP_BULK_LOG:
529		RECOVERING_LOG_SKIP;
530		CLIENT_ONLY(rep, rp);
531		ret = __rep_bulk_log(env, ip, rp, rec, savetime, ret_lsnp);
532		break;
533	case REP_BULK_PAGE:
534		/*
535		 * Handle even if we're recovering.
536		 */
537		CLIENT_ONLY(rep, rp);
538		ret = __rep_bulk_page(env, ip, eid, rp, rec);
539		break;
540	case REP_DUPMASTER:
541		/*
542		 * Handle even if we're recovering.
543		 */
544		if (F_ISSET(rep, REP_F_MASTER))
545			ret = DB_REP_DUPMASTER;
546		break;
547#ifdef NOTYET
548	case REP_FILE: /* TODO */
549		CLIENT_ONLY(rep, rp);
550		break;
551	case REP_FILE_REQ:
552		ret = __rep_send_file(env, rec, eid);
553		break;
554#endif
555	case REP_FILE_FAIL:
556		/*
557		 * Handle even if we're recovering.
558		 */
559		CLIENT_ONLY(rep, rp);
560		/*
561		 * XXX
562		 */
563		break;
564	case REP_LEASE_GRANT:
565		/*
566		 * Handle even if we're recovering.
567		 */
568		MASTER_ONLY(rep, rp);
569		ret = __rep_lease_grant(env, rp, rec, eid);
570		break;
571	case REP_LOG:
572	case REP_LOG_MORE:
573		RECOVERING_LOG_SKIP;
574		CLIENT_ONLY(rep, rp);
575		ret = __rep_log(env, ip, rp, rec, savetime, ret_lsnp);
576		break;
577	case REP_LOG_REQ:
578		RECOVERING_SKIP;
579		if (F_ISSET(rp, REPCTL_INIT))
580			MASTER_UPDATE(env, renv);
581		ret = __rep_logreq(env, rp, rec, eid);
582		CLIENT_REREQ;
583		break;
584	case REP_NEWSITE:
585		/*
586		 * Handle even if we're recovering.
587		 */
588		/* We don't hold the rep mutex, and may miscount. */
589		STAT(rep->stat.st_newsites++);
590
591		/* This is a rebroadcast; simply tell the application. */
592		if (F_ISSET(rep, REP_F_MASTER)) {
593			dblp = env->lg_handle;
594			lp = dblp->reginfo.primary;
595			LOG_SYSTEM_LOCK(env);
596			lsn = lp->lsn;
597			LOG_SYSTEM_UNLOCK(env);
598			(void)__rep_send_message(env,
599			    eid, REP_NEWMASTER, &lsn, NULL, 0, 0);
600			if (IS_USING_LEASES(env))
601				(void)__rep_lease_refresh(env);
602		}
603		ret = DB_REP_NEWSITE;
604		break;
605	case REP_NEWCLIENT:
606		/*
607		 * Handle even if we're recovering.
608		 */
609		/*
610		 * This message was received and should have resulted in the
611		 * application entering the machine ID in its machine table.
612		 * We respond to this with an ALIVE to send relevant information
613		 * to the new client (if we are a master, we'll send a
614		 * NEWMASTER, so we only need to send the ALIVE if we're a
615		 * client).  But first, broadcast the new client's record to
616		 * all the clients.
617		 */
618		(void)__rep_send_message(env,
619		    DB_EID_BROADCAST, REP_NEWSITE, &rp->lsn, rec, 0, 0);
620
621		ret = DB_REP_NEWSITE;
622
623		if (F_ISSET(rep, REP_F_CLIENT)) {
624			REP_SYSTEM_LOCK(env);
625			egen_arg.egen = rep->egen;
626
627			/*
628			 * Clean up any previous master remnants by making
629			 * master_id invalid and cleaning up any internal
630			 * init that was in progress.
631			 */
632			if (eid == rep->master_id) {
633				rep->master_id = DB_EID_INVALID;
634
635				/*
636				 * Already locking out messages, must be
637				 * in sync-up recover or internal init,
638				 * give up.
639				 */
640				if (F_ISSET(rep, REP_F_READY_MSG))
641					goto errhlk;
642
643				/*
644				 * Lock out other messages to prevent race
645				 * conditions.
646				 */
647				if ((t_ret =
648				    __rep_lockout_msg(env, rep, 1)) != 0) {
649					ret = t_ret;
650					goto errhlk;
651				}
652				lockout = 1;
653
654				/*
655				 * Need mtx_clientdb to safely clean up
656				 * page database in __rep_init_cleanup().
657				 */
658				REP_SYSTEM_UNLOCK(env);
659				MUTEX_LOCK(env, rep->mtx_clientdb);
660				REP_SYSTEM_LOCK(env);
661
662				/*
663				 * Clean up internal init if one was in
664				 * progress.
665				 */
666				if (F_ISSET(rep, REP_F_READY_API |
667				    REP_F_READY_OP)) {
668					RPRINT(env, DB_VERB_REP_MSGS, (env,
669    "NEWCLIENT is cleaning up old internal init for invalid master"));
670					t_ret = __rep_init_cleanup(env,
671					    rep, DB_FORCE);
672					F_CLR(rep, REP_F_RECOVER_MASK);
673				}
674				MUTEX_UNLOCK(env, rep->mtx_clientdb);
675				if (t_ret != 0) {
676					ret = t_ret;
677					RPRINT(env, DB_VERB_REP_MSGS, (env,
678    "NEWCLIENT error cleaning up internal init for invalid master: %d", ret));
679					goto errhlk;
680				}
681				F_CLR(rep, REP_F_READY_MSG);
682				lockout = 0;
683			}
684			REP_SYSTEM_UNLOCK(env);
685			if (rep->version < DB_REPVERSION_47)
686				DB_INIT_DBT(data_dbt, &egen_arg.egen,
687				    sizeof(egen_arg.egen));
688			else {
689				if ((ret = __rep_egen_marshal(env, &egen_arg,
690				    buf, __REP_EGEN_SIZE, &len)) != 0)
691					goto errlock;
692				DB_INIT_DBT(data_dbt, buf, len);
693			}
694			(void)__rep_send_message(env, DB_EID_BROADCAST,
695			    REP_ALIVE, &rp->lsn, &data_dbt, 0, 0);
696			break;
697		}
698		/* FALLTHROUGH */
699	case REP_MASTER_REQ:
700		RECOVERING_SKIP;
701		if (F_ISSET(rep, REP_F_MASTER)) {
702			LOG_SYSTEM_LOCK(env);
703			lsn = lp->lsn;
704			LOG_SYSTEM_UNLOCK(env);
705			(void)__rep_send_message(env,
706			    DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0, 0);
707			if (IS_USING_LEASES(env))
708				(void)__rep_lease_refresh(env);
709		}
710		/*
711		 * If there is no master, then we could get into a state
712		 * where an old client lost the initial ALIVE message and
713		 * is calling an election under an old gen and can
714		 * never get to the current gen.
715		 */
716		if (F_ISSET(rep, REP_F_CLIENT) && rp->gen < gen) {
717			REP_SYSTEM_LOCK(env);
718			egen_arg.egen = rep->egen;
719			if (eid == rep->master_id)
720				rep->master_id = DB_EID_INVALID;
721			REP_SYSTEM_UNLOCK(env);
722			if (rep->version < DB_REPVERSION_47)
723				DB_INIT_DBT(data_dbt, &egen_arg.egen,
724				    sizeof(egen_arg.egen));
725			else {
726				if ((ret = __rep_egen_marshal(env, &egen_arg,
727				    buf, __REP_EGEN_SIZE, &len)) != 0)
728					goto errlock;
729				DB_INIT_DBT(data_dbt, buf, len);
730			}
731			(void)__rep_send_message(env, eid,
732			    REP_ALIVE, &rp->lsn, &data_dbt, 0, 0);
733		}
734		break;
735	case REP_NEWFILE:
736		RECOVERING_LOG_SKIP;
737		CLIENT_ONLY(rep, rp);
738		ret = __rep_apply(env,
739		     ip, rp, rec, ret_lsnp, NULL, &last_lsn);
740		if (ret == DB_REP_LOGREADY)
741			ret = __rep_logready(env, rep, savetime, &last_lsn);
742		break;
743	case REP_NEWMASTER:
744		/*
745		 * Handle even if we're recovering.
746		 */
747		ANYSITE(rep);
748		if (F_ISSET(rep, REP_F_MASTER) &&
749		    eid != rep->eid) {
750			/* We don't hold the rep mutex, and may miscount. */
751			STAT(rep->stat.st_dupmasters++);
752			ret = DB_REP_DUPMASTER;
753			if (IS_USING_LEASES(env))
754				DB_ASSERT(env,
755				    __rep_lease_check(env, 0) ==
756				    DB_REP_LEASE_EXPIRED);
757			else
758				(void)__rep_send_message(env,
759				    DB_EID_BROADCAST, REP_DUPMASTER,
760				    NULL, NULL, 0, 0);
761			break;
762		}
763		if ((ret =
764		    __rep_new_master(env, rp, eid)) == DB_REP_NEWMASTER)
765			ret = __rep_fire_newmaster(env, rp->gen, eid);
766		break;
767	case REP_PAGE:
768	case REP_PAGE_MORE:
769		/*
770		 * Handle even if we're recovering.
771		 */
772		CLIENT_ONLY(rep, rp);
773		ret = __rep_page(env, ip, eid, rp, rec);
774		if (ret == DB_REP_PAGEDONE)
775			ret = 0;
776		break;
777	case REP_PAGE_FAIL:
778		/*
779		 * Handle even if we're recovering.
780		 */
781		CLIENT_ONLY(rep, rp);
782		ret = __rep_page_fail(env, ip, eid, rp, rec);
783		break;
784	case REP_PAGE_REQ:
785		RECOVERING_SKIP;
786		MASTER_UPDATE(env, renv);
787		ret = __rep_page_req(env, ip, eid, rp, rec);
788		CLIENT_REREQ;
789		break;
790	case REP_REREQUEST:
791		/*
792		 * Handle even if we're recovering.  Don't do a master
793		 * check.
794		 */
795		CLIENT_ONLY(rep, rp);
796		/*
797		 * Don't hold any mutex, may miscount.
798		 */
799		STAT(rep->stat.st_client_rerequests++);
800		ret = __rep_resend_req(env, 1);
801		break;
802	case REP_START_SYNC:
803		RECOVERING_SKIP;
804		MUTEX_LOCK(env, rep->mtx_clientdb);
805		cmp = LOG_COMPARE(&rp->lsn, &lp->ready_lsn);
806		/*
807		 * The comparison needs to be <= because the LSN in
808		 * the message can be the LSN of the first outstanding
809		 * txn, which may be the LSN immediately after the
810		 * previous commit.  The ready_lsn is the LSN of the
811		 * next record expected.  In that case, the LSNs
812		 * could be equal and the client has the commit and
813		 * wants to sync. [SR #15338]
814		 */
815		if (cmp <= 0) {
816			MUTEX_UNLOCK(env, rep->mtx_clientdb);
817			do_sync = 1;
818		} else {
819			STAT(rep->stat.st_startsync_delayed++);
820			/*
821			 * There are cases where keeping the first ckp_lsn
822			 * LSN is advantageous and cases where keeping
823			 * a later LSN is better.  If random, earlier
824			 * log records are missing, keeping the later
825			 * LSN seems to be better.  That is what we'll
826			 * do for now.
827			 */
828			if (LOG_COMPARE(&rp->lsn, &rep->ckp_lsn) > 0)
829				rep->ckp_lsn = rp->lsn;
830			RPRINT(env, DB_VERB_REP_MSGS, (env,
831    "Delayed START_SYNC memp_sync due to missing records."));
832			RPRINT(env, DB_VERB_REP_MSGS, (env,
833    "ready LSN [%lu][%lu], ckp_lsn [%lu][%lu]",
834		    (u_long)lp->ready_lsn.file, (u_long)lp->ready_lsn.offset,
835		    (u_long)rep->ckp_lsn.file, (u_long)rep->ckp_lsn.offset));
836			MUTEX_UNLOCK(env, rep->mtx_clientdb);
837		}
838		break;
839	case REP_UPDATE:
840		/*
841		 * Handle even if we're recovering.
842		 */
843		CLIENT_ONLY(rep, rp);
844		ret = __rep_update_setup(env, eid, rp, rec);
845		break;
846	case REP_UPDATE_REQ:
847		/*
848		 * Handle even if we're recovering.
849		 */
850		MASTER_ONLY(rep, rp);
851		infop = env->reginfo;
852		renv = infop->primary;
853		MASTER_UPDATE(env, renv);
854		ret = __rep_update_req(env, rp, eid);
855		break;
856	case REP_VERIFY:
857		if (recovering) {
858			MUTEX_LOCK(env, rep->mtx_clientdb);
859			cmp = LOG_COMPARE(&lp->verify_lsn, &rp->lsn);
860			MUTEX_UNLOCK(env, rep->mtx_clientdb);
861			/*
862			 * If this is not the verify record I want, skip it.
863			 */
864			if (cmp != 0) {
865				ret = __rep_skip_msg(
866				    env, rep, eid, rp->rectype);
867				break;
868			}
869		}
870		CLIENT_ONLY(rep, rp);
871		ret = __rep_verify(env, rp, rec, eid, savetime);
872		break;
873	case REP_VERIFY_FAIL:
874		/*
875		 * Handle even if we're recovering.
876		 */
877		CLIENT_ONLY(rep, rp);
878		ret = __rep_verify_fail(env, rp, eid);
879		break;
880	case REP_VERIFY_REQ:
881		RECOVERING_SKIP;
882		ret = __rep_verify_req(env, rp, eid);
883		CLIENT_REREQ;
884		break;
885	case REP_VOTE1:
886		/*
887		 * Handle even if we're recovering.
888		 */
889		ret = __rep_vote1(env, rp, rec, eid);
890		break;
891	case REP_VOTE2:
892		/*
893		 * Handle even if we're recovering.
894		 */
895		ret = __rep_vote2(env, rp, rec, eid);
896		break;
897	default:
898		__db_errx(env,
899	"DB_ENV->rep_process_message: unknown replication message: type %lu",
900		   (u_long)rp->rectype);
901		ret = EINVAL;
902		break;
903	}
904
905errlock:
906	REP_SYSTEM_LOCK(env);
907errhlk:	if (lockout)
908		F_CLR(rep, REP_F_READY_MSG);
909	rep->msg_th--;
910	REP_SYSTEM_UNLOCK(env);
911	if (do_sync) {
912		MUTEX_LOCK(env, rep->mtx_ckp);
913		lsn = rp->lsn;
914		/*
915		 * This is the REP_START_SYNC sync, and so we permit it to be
916		 * interrupted.
917		 */
918		ret = __memp_sync(
919		    env, DB_SYNC_CHECKPOINT | DB_SYNC_INTERRUPT_OK, &lsn);
920		MUTEX_UNLOCK(env, rep->mtx_ckp);
921		RPRINT(env, DB_VERB_REP_MSGS,
922		    (env, "ALIVE: Completed sync [%lu][%lu]",
923		    (u_long)lsn.file, (u_long)lsn.offset));
924	}
925out:
926	if (ret == 0 && F_ISSET(rp, REPCTL_PERM)) {
927		if (ret_lsnp != NULL)
928			*ret_lsnp = rp->lsn;
929		ret = DB_REP_NOTPERM;
930	}
931	__dbt_userfree(env, control, rec, NULL);
932	ENV_LEAVE(env, ip);
933	return (ret);
934}
935
936/*
937 * __rep_apply --
938 *
939 * Handle incoming log records on a client, applying when possible and
940 * entering into the bookkeeping table otherwise.  This routine manages
941 * the state of the incoming message stream -- processing records, via
942 * __rep_process_rec, when possible and enqueuing in the __db.rep.db
943 * when necessary.  As gaps in the stream are filled in, this is where
944 * we try to process as much as possible from __db.rep.db to catch up.
945 *
946 * PUBLIC: int __rep_apply __P((ENV *, DB_THREAD_INFO *, __rep_control_args *,
947 * PUBLIC:     DBT *, DB_LSN *, int *, DB_LSN *));
948 */
949int
950__rep_apply(env, ip, rp, rec, ret_lsnp, is_dupp, last_lsnp)
951	ENV *env;
952	DB_THREAD_INFO *ip;
953	__rep_control_args *rp;
954	DBT *rec;
955	DB_LSN *ret_lsnp;
956	int *is_dupp;
957	DB_LSN *last_lsnp;
958{
959	DB *dbp;
960	DBT control_dbt, key_dbt;
961	DBT rec_dbt;
962	DB_LOG *dblp;
963	DB_LSN max_lsn, save_lsn;
964	DB_REP *db_rep;
965	LOG *lp;
966	REP *rep;
967	db_timespec msg_time, max_ts;
968	u_int32_t gen;
969	int cmp, event, master, ret, set_apply, t_ret;
970
971	COMPQUIET(gen, 0);
972	COMPQUIET(master, DB_EID_INVALID);
973
974	db_rep = env->rep_handle;
975	rep = db_rep->region;
976	event = ret = set_apply = 0;
977	memset(&control_dbt, 0, sizeof(control_dbt));
978	memset(&rec_dbt, 0, sizeof(rec_dbt));
979	ZERO_LSN(max_lsn);
980	timespecclear(&max_ts);
981	timespecset(&msg_time, rp->msg_sec, rp->msg_nsec);
982	cmp = -2;		/* OOB value that LOG_COMPARE can't return. */
983
984	dblp = env->lg_handle;
985	MUTEX_LOCK(env, rep->mtx_clientdb);
986	/*
987	 * Lazily open the temp db.  Always set the startup flag to 0
988	 * because it was initialized from rep_start.
989	 */
990	if (db_rep->rep_db == NULL &&
991	    (ret = __rep_client_dbinit(env, 0, REP_DB)) != 0) {
992		MUTEX_UNLOCK(env, rep->mtx_clientdb);
993		goto out;
994	}
995	dbp = db_rep->rep_db;
996	lp = dblp->reginfo.primary;
997	REP_SYSTEM_LOCK(env);
998	if (F_ISSET(rep, REP_F_RECOVER_LOG) &&
999	    LOG_COMPARE(&lp->ready_lsn, &rep->first_lsn) < 0)
1000		lp->ready_lsn = rep->first_lsn;
1001	cmp = LOG_COMPARE(&rp->lsn, &lp->ready_lsn);
1002	/*
1003	 * If we are going to skip or process any message other
1004	 * than a duplicate, make note of it if we're in an
1005	 * election so that the election can rerequest proactively.
1006	 */
1007	if (F_ISSET(rep, REP_F_READY_APPLY) && cmp >= 0)
1008		F_SET(rep, REP_F_SKIPPED_APPLY);
1009
1010	/*
1011	 * If we're in the middle of processing a NEWFILE, we've dropped
1012	 * the mutex and if this matches it is a duplicate record.  We
1013	 * do not want this call taking the "matching" code below because
1014	 * we may then process later records in the temp db and the
1015	 * original NEWFILE may not have the log file ready.  It will
1016	 * process those temp db items when it completes.
1017	 */
1018	if (F_ISSET(rep, REP_F_NEWFILE) && cmp == 0)
1019		cmp = -1;
1020
1021	if (cmp == 0) {
1022		/*
1023		 * If we are in an election (i.e. we've sent a vote
1024		 * with an LSN in it), then we drop the next record
1025		 * we're expecting.  When we find a master, we'll
1026		 * either go into sync, or if it was an existing
1027		 * master, rerequest this one record (later records
1028		 * are accumulating in the temp db).
1029		 *
1030		 * We can simply return here, and rep_process_message
1031		 * will set NOTPERM if necessary for this record.
1032		 */
1033		if (F_ISSET(rep, REP_F_READY_APPLY)) {
1034			/*
1035			 * We will simply return now.  All special return
1036			 * processing should be ignored because the special
1037			 * values are just initialized.  Variables like
1038			 * max_lsn are still 0.
1039			 */
1040			RPRINT(env, DB_VERB_REP_MISC, (env,
1041			    "rep_apply: In election. Ignoring [%lu][%lu]",
1042			    (u_long)rp->lsn.file, (u_long)rp->lsn.offset));
1043			REP_SYSTEM_UNLOCK(env);
1044			MUTEX_UNLOCK(env, rep->mtx_clientdb);
1045			goto out;
1046		}
1047		rep->apply_th++;
1048		set_apply = 1;
1049		RPRINT(env, DB_VERB_REP_MISC, (env,
1050		    "rep_apply: Set apply_th %d", rep->apply_th));
1051		REP_SYSTEM_UNLOCK(env);
1052		if ((ret = __rep_process_rec(env, ip,
1053		    rp, rec, &max_ts, &max_lsn)) != 0)
1054			goto err;
1055		/*
1056		 * If we get the record we are expecting, reset
1057		 * the count of records we've received and are applying
1058		 * towards the request interval.
1059		 */
1060		__os_gettime(env, &lp->rcvd_ts, 1);
1061		ZERO_LSN(lp->max_wait_lsn);
1062
1063		while (ret == 0 &&
1064		    LOG_COMPARE(&lp->ready_lsn, &lp->waiting_lsn) == 0) {
1065			/*
1066			 * We just filled in a gap in the log record stream.
1067			 * Write subsequent records to the log.
1068			 */
1069gap_check:
1070			if ((ret = __rep_remfirst(env, ip,
1071			     &control_dbt, &rec_dbt)) != 0)
1072				goto err;
1073
1074			rp = (__rep_control_args *)control_dbt.data;
1075			timespecset(&msg_time, rp->msg_sec, rp->msg_nsec);
1076			rec = &rec_dbt;
1077			if ((ret = __rep_process_rec(env, ip,
1078			    rp, rec, &max_ts, &max_lsn)) != 0)
1079				goto err;
1080
1081			--rep->stat.st_log_queued;
1082
1083			/*
1084			 * Since we just filled a gap in the log stream, and
1085			 * we're writing subsequent records to the log, we want
1086			 * to use rcvd_ts and wait_ts so that we will
1087			 * request the next gap if we end up with a gap and
1088			 * not so recent records in the temp db, but not
1089			 * request if recent records are in the temp db and
1090			 * likely to arrive on its own shortly.  We want to
1091			 * avoid requesting the record in that case.  Also
1092			 * reset max_wait_lsn because the next gap is a
1093			 * fresh gap.
1094			 */
1095			lp->rcvd_ts = lp->last_ts;
1096			lp->wait_ts = rep->request_gap;
1097			if ((ret = __rep_getnext(env, ip)) == DB_NOTFOUND) {
1098				__os_gettime(env, &lp->rcvd_ts, 1);
1099				ret = 0;
1100				break;
1101			} else if (ret != 0)
1102				goto err;
1103		}
1104
1105		/*
1106		 * Check if we're at a gap in the table and if so, whether we
1107		 * need to ask for any records.
1108		 */
1109		if (!IS_ZERO_LSN(lp->waiting_lsn) &&
1110		    LOG_COMPARE(&lp->ready_lsn, &lp->waiting_lsn) != 0) {
1111			/*
1112			 * We got a record and processed it, but we may
1113			 * still be waiting for more records.  If we
1114			 * filled a gap we keep a count of how many other
1115			 * records are in the temp database and if we should
1116			 * request the next gap at this time.
1117			 */
1118			if (__rep_check_doreq(env, rep) && (ret =
1119			    __rep_loggap_req(env, rep, &rp->lsn, 0)) != 0)
1120				goto err;
1121		} else {
1122			lp->wait_ts = rep->request_gap;
1123			ZERO_LSN(lp->max_wait_lsn);
1124		}
1125
1126	} else if (cmp > 0) {
1127		/*
1128		 * The LSN is higher than the one we were waiting for.
1129		 * This record isn't in sequence; add it to the temporary
1130		 * database, update waiting_lsn if necessary, and perform
1131		 * calculations to determine if we should issue requests
1132		 * for new records.
1133		 */
1134		REP_SYSTEM_UNLOCK(env);
1135		memset(&key_dbt, 0, sizeof(key_dbt));
1136		key_dbt.data = rp;
1137		key_dbt.size = sizeof(*rp);
1138		ret = __db_put(dbp, ip, NULL, &key_dbt, rec, DB_NOOVERWRITE);
1139		if (ret == 0) {
1140			rep->stat.st_log_queued++;
1141			__os_gettime(env, &lp->last_ts, 1);
1142#ifdef HAVE_STATISTICS
1143			STAT(rep->stat.st_log_queued_total++);
1144			if (rep->stat.st_log_queued_max <
1145			    rep->stat.st_log_queued)
1146				rep->stat.st_log_queued_max =
1147				    rep->stat.st_log_queued;
1148#endif
1149		}
1150
1151		if (ret == DB_KEYEXIST)
1152			ret = 0;
1153		if (ret != 0)
1154			goto done;
1155
1156		if (IS_ZERO_LSN(lp->waiting_lsn) ||
1157		    LOG_COMPARE(&rp->lsn, &lp->waiting_lsn) < 0)
1158			lp->waiting_lsn = rp->lsn;
1159
1160		if (__rep_check_doreq(env, rep) &&
1161		    (ret = __rep_loggap_req(env, rep, &rp->lsn, 0) != 0))
1162			goto err;
1163
1164		/*
1165		 * If this is permanent; let the caller know that we have
1166		 * not yet written it to disk, but we've accepted it.
1167		 */
1168		if (ret == 0 && F_ISSET(rp, REPCTL_PERM)) {
1169			max_lsn = rp->lsn;
1170			ret = DB_REP_NOTPERM;
1171		}
1172		goto done;
1173	} else {
1174		STAT(rep->stat.st_log_duplicated++);
1175		REP_SYSTEM_UNLOCK(env);
1176		if (is_dupp != NULL)
1177			*is_dupp = 1;
1178		if (F_ISSET(rp, REPCTL_PERM))
1179			max_lsn = lp->max_perm_lsn;
1180		/*
1181		 * We check REPCTL_LEASE here, because this client may
1182		 * have leases configured but the master may not (especially
1183		 * in a mixed version group.  If the master has leases
1184		 * configured, all clients must also.
1185		 */
1186		if (IS_USING_LEASES(env) &&
1187		    F_ISSET(rp, REPCTL_LEASE) &&
1188		    timespecisset(&msg_time)) {
1189			if (timespeccmp(&msg_time, &lp->max_lease_ts, >))
1190				max_ts = msg_time;
1191			else
1192				max_ts = lp->max_lease_ts;
1193		}
1194		goto done;
1195	}
1196
1197	/* Check if we need to go back into the table. */
1198	if (ret == 0 && LOG_COMPARE(&lp->ready_lsn, &lp->waiting_lsn) == 0)
1199		goto gap_check;
1200
1201done:
1202err:	/*
1203	 * In case of a race, to make sure only one thread can get
1204	 * DB_REP_LOGREADY, zero out rep->last_lsn to show that we've gotten to
1205	 * this point.
1206	 */
1207	REP_SYSTEM_LOCK(env);
1208	if (ret == 0 &&
1209	    F_ISSET(rep, REP_F_RECOVER_LOG) &&
1210	    !IS_ZERO_LSN(rep->last_lsn) &&
1211	    LOG_COMPARE(&lp->ready_lsn, &rep->last_lsn) >= 0) {
1212		*last_lsnp = max_lsn;
1213		ZERO_LSN(rep->last_lsn);
1214		ZERO_LSN(max_lsn);
1215		ret = DB_REP_LOGREADY;
1216	}
1217	/*
1218	 * Only decrement if we were actually applying log records.
1219	 * We do not care if we processed a dup record or put one
1220	 * in the temp db.
1221	 */
1222	if (set_apply) {
1223		rep->apply_th--;
1224		RPRINT(env, DB_VERB_REP_MISC, (env,
1225		    "rep_apply: Decrement apply_th %d", rep->apply_th));
1226	}
1227
1228	if (ret == 0 && !F_ISSET(rep, REP_F_RECOVER_LOG) &&
1229	    !IS_ZERO_LSN(max_lsn)) {
1230		if (ret_lsnp != NULL)
1231			*ret_lsnp = max_lsn;
1232		ret = DB_REP_ISPERM;
1233		DB_ASSERT(env, LOG_COMPARE(&max_lsn, &lp->max_perm_lsn) >= 0);
1234		lp->max_perm_lsn = max_lsn;
1235	}
1236
1237	/*
1238	 * Start-up is complete when we process (or have already processed) up
1239	 * to the end of the replication group's log.  In case we miss that
1240	 * message, as a back-up, we also recognize start-up completion when we
1241	 * actually process a live log record.  Having cmp==0 here (with a good
1242	 * "ret" value) implies we actually processed the record.
1243	 */
1244	if ((ret == 0 || ret == DB_REP_ISPERM) &&
1245	    rep->stat.st_startup_complete == 0 &&
1246	    !F_ISSET(rep, REP_F_RECOVER_LOG) &&
1247	    ((cmp <= 0 && F_ISSET(rp, REPCTL_LOG_END)) ||
1248	    (cmp == 0 && !F_ISSET(rp, REPCTL_RESEND)))) {
1249		rep->stat.st_startup_complete = 1;
1250		event = 1;
1251		gen = rep->gen;
1252		master = rep->master_id;
1253	}
1254	REP_SYSTEM_UNLOCK(env);
1255	/*
1256	 * If we've processed beyond the needed LSN for a pending
1257	 * start sync, start it now.  We can compare >= here
1258	 * because ready_lsn is the next record we expect.
1259	 * Since ckp_lsn can point to the last commit record itself,
1260	 * but if it does and ready_lsn == commit (i.e. we haven't
1261	 * written the commit yet), we can still start to sync
1262	 * because we're guaranteed no additional buffers can
1263	 * be dirtied.
1264	 */
1265	if (!IS_ZERO_LSN(rep->ckp_lsn) &&
1266	    LOG_COMPARE(&lp->ready_lsn, &rep->ckp_lsn) >= 0) {
1267		save_lsn = rep->ckp_lsn;
1268		ZERO_LSN(rep->ckp_lsn);
1269	} else
1270		ZERO_LSN(save_lsn);
1271
1272	/*
1273	 * If this is a perm record, we are using leases, update the lease
1274	 * grant.  We must hold the clientdb mutex.  We must not hold
1275	 * the region mutex because rep_update_grant will acquire it.
1276	 */
1277	if (ret == DB_REP_ISPERM && IS_USING_LEASES(env) &&
1278	    timespecisset(&max_ts)) {
1279		if ((t_ret = __rep_update_grant(env, &max_ts)) != 0)
1280			ret = t_ret;
1281		else if (timespeccmp(&max_ts, &lp->max_lease_ts, >))
1282			lp->max_lease_ts = max_ts;
1283	}
1284
1285	MUTEX_UNLOCK(env, rep->mtx_clientdb);
1286	if (!IS_ZERO_LSN(save_lsn)) {
1287		/*
1288		 * Now call memp_sync holding only the ckp mutex.
1289		 */
1290		MUTEX_LOCK(env, rep->mtx_ckp);
1291		RPRINT(env, DB_VERB_REP_MISC, (env,
1292		    "Starting delayed __memp_sync call [%lu][%lu]",
1293		    (u_long)save_lsn.file, (u_long)save_lsn.offset));
1294		t_ret = __memp_sync(env, DB_SYNC_CHECKPOINT, &save_lsn);
1295		MUTEX_UNLOCK(env, rep->mtx_ckp);
1296	}
1297	if (event) {
1298		RPRINT(env, DB_VERB_REP_MISC, (env,
1299		    "Start-up is done [%lu][%lu]",
1300		    (u_long)rp->lsn.file, (u_long)rp->lsn.offset));
1301
1302		if ((t_ret = __rep_fire_startupdone(env, gen, master)) != 0) {
1303			DB_ASSERT(env, ret == 0 || ret == DB_REP_ISPERM);
1304			/* Failure trumps either of those values. */
1305			ret = t_ret;
1306			goto out;
1307		}
1308	}
1309	if (ret == 0 && rp->rectype == REP_NEWFILE && lp->db_log_autoremove)
1310		__log_autoremove(env);
1311	if (control_dbt.data != NULL)
1312		__os_ufree(env, control_dbt.data);
1313	if (rec_dbt.data != NULL)
1314		__os_ufree(env, rec_dbt.data);
1315
1316out:
1317	switch (ret) {
1318	case 0:
1319		break;
1320	case DB_REP_ISPERM:
1321		RPRINT(env, DB_VERB_REP_MSGS,
1322		    (env, "Returning ISPERM [%lu][%lu], cmp = %d",
1323		    (u_long)max_lsn.file, (u_long)max_lsn.offset, cmp));
1324		break;
1325	case DB_REP_LOGREADY:
1326		RPRINT(env, DB_VERB_REP_MSGS, (env,
1327		    "Returning LOGREADY up to [%lu][%lu], cmp = %d",
1328		    (u_long)last_lsnp->file,
1329		    (u_long)last_lsnp->offset, cmp));
1330		break;
1331	case DB_REP_NOTPERM:
1332		if (!F_ISSET(rep, REP_F_RECOVER_LOG) &&
1333		    !IS_ZERO_LSN(max_lsn) && ret_lsnp != NULL)
1334			*ret_lsnp = max_lsn;
1335
1336		RPRINT(env, DB_VERB_REP_MSGS,
1337		    (env, "Returning NOTPERM [%lu][%lu], cmp = %d",
1338		    (u_long)max_lsn.file, (u_long)max_lsn.offset, cmp));
1339		break;
1340	default:
1341		RPRINT(env, DB_VERB_REP_MSGS,
1342		    (env, "Returning %d [%lu][%lu], cmp = %d", ret,
1343		    (u_long)max_lsn.file, (u_long)max_lsn.offset, cmp));
1344		break;
1345	}
1346
1347	return (ret);
1348}
1349
1350/*
1351 * __rep_process_txn --
1352 *
1353 * This is the routine that actually gets a transaction ready for
1354 * processing.
1355 *
1356 * PUBLIC: int __rep_process_txn __P((ENV *, DBT *));
1357 */
1358int
1359__rep_process_txn(env, rec)
1360	ENV *env;
1361	DBT *rec;
1362{
1363	DBT data_dbt, *lock_dbt;
1364	DB_LOCKER *locker;
1365	DB_LOCKREQ req, *lvp;
1366	DB_LOGC *logc;
1367	DB_LSN prev_lsn, *lsnp;
1368	DB_REP *db_rep;
1369	DB_THREAD_INFO *ip;
1370	DB_TXNHEAD *txninfo;
1371	LSN_COLLECTION lc;
1372	REP *rep;
1373	__txn_regop_args *txn_args;
1374	__txn_regop_42_args *txn42_args;
1375	__txn_xa_regop_args *prep_args;
1376	u_int32_t rectype;
1377	u_int i;
1378	int ret, t_ret;
1379
1380	db_rep = env->rep_handle;
1381	rep = db_rep->region;
1382	logc = NULL;
1383	txn_args = NULL;
1384	txn42_args = NULL;
1385	prep_args = NULL;
1386	txninfo = NULL;
1387
1388	ENV_ENTER(env, ip);
1389	memset(&data_dbt, 0, sizeof(data_dbt));
1390	if (F_ISSET(env, ENV_THREAD))
1391		F_SET(&data_dbt, DB_DBT_REALLOC);
1392
1393	/*
1394	 * There are two phases:  First, we have to traverse backwards through
1395	 * the log records gathering the list of all LSNs in the transaction.
1396	 * Once we have this information, we can loop through and then apply it.
1397	 *
1398	 * We may be passed a prepare (if we're restoring a prepare on upgrade)
1399	 * instead of a commit (the common case).  Check which it is and behave
1400	 * appropriately.
1401	 */
1402	LOGCOPY_32(env, &rectype, rec->data);
1403	memset(&lc, 0, sizeof(lc));
1404	if (rectype == DB___txn_regop) {
1405		/*
1406		 * We're the end of a transaction.  Make sure this is
1407		 * really a commit and not an abort!
1408		 */
1409		if (rep->version >= DB_REPVERSION_44) {
1410			if ((ret = __txn_regop_read(
1411			    env, rec->data, &txn_args)) != 0)
1412				return (ret);
1413			if (txn_args->opcode != TXN_COMMIT) {
1414				__os_free(env, txn_args);
1415				return (0);
1416			}
1417			prev_lsn = txn_args->prev_lsn;
1418			lock_dbt = &txn_args->locks;
1419		} else {
1420			if ((ret = __txn_regop_42_read(
1421			    env, rec->data, &txn42_args)) != 0)
1422				return (ret);
1423			if (txn42_args->opcode != TXN_COMMIT) {
1424				__os_free(env, txn42_args);
1425				return (0);
1426			}
1427			prev_lsn = txn42_args->prev_lsn;
1428			lock_dbt = &txn42_args->locks;
1429		}
1430	} else {
1431		/* We're a prepare. */
1432		DB_ASSERT(env, rectype == DB___txn_xa_regop);
1433
1434		if ((ret = __txn_xa_regop_read(
1435		    env, rec->data, &prep_args)) != 0)
1436			return (ret);
1437		prev_lsn = prep_args->prev_lsn;
1438		lock_dbt = &prep_args->locks;
1439	}
1440
1441	/* Get locks. */
1442	if ((ret = __lock_id(env, NULL, &locker)) != 0)
1443		goto err1;
1444
1445	if ((ret =
1446	      __lock_get_list(env, locker, 0, DB_LOCK_WRITE, lock_dbt)) != 0)
1447		goto err;
1448
1449	/* Phase 1.  Get a list of the LSNs in this transaction, and sort it. */
1450	if ((ret = __rep_collect_txn(env, &prev_lsn, &lc)) != 0)
1451		goto err;
1452	qsort(lc.array, lc.nlsns, sizeof(DB_LSN), __rep_lsn_cmp);
1453
1454	/*
1455	 * The set of records for a transaction may include dbreg_register
1456	 * records.  Create a txnlist so that they can keep track of file
1457	 * state between records.
1458	 */
1459	if ((ret = __db_txnlist_init(env, ip, 0, 0, NULL, &txninfo)) != 0)
1460		goto err;
1461
1462	/* Phase 2: Apply updates. */
1463	if ((ret = __log_cursor(env, &logc)) != 0)
1464		goto err;
1465	for (lsnp = &lc.array[0], i = 0; i < lc.nlsns; i++, lsnp++) {
1466		if ((ret = __logc_get(logc, lsnp, &data_dbt, DB_SET)) != 0) {
1467			__db_errx(env, "failed to read the log at [%lu][%lu]",
1468			    (u_long)lsnp->file, (u_long)lsnp->offset);
1469			goto err;
1470		}
1471		if ((ret = __db_dispatch(env, &env->recover_dtab,
1472		    &data_dbt, lsnp, DB_TXN_APPLY, txninfo)) != 0) {
1473			__db_errx(env, "transaction failed at [%lu][%lu]",
1474			    (u_long)lsnp->file, (u_long)lsnp->offset);
1475			goto err;
1476		}
1477	}
1478
1479err:	memset(&req, 0, sizeof(req));
1480	req.op = DB_LOCK_PUT_ALL;
1481	if ((t_ret =
1482	     __lock_vec(env, locker, 0, &req, 1, &lvp)) != 0 && ret == 0)
1483		ret = t_ret;
1484
1485	if ((t_ret = __lock_id_free(env, locker)) != 0 && ret == 0)
1486		ret = t_ret;
1487
1488err1:	if (txn_args != NULL)
1489		__os_free(env, txn_args);
1490	if (txn42_args != NULL)
1491		__os_free(env, txn42_args);
1492	if (prep_args != NULL)
1493		__os_free(env, prep_args);
1494	if (lc.array != NULL)
1495		__os_free(env, lc.array);
1496
1497	if (logc != NULL && (t_ret = __logc_close(logc)) != 0 && ret == 0)
1498		ret = t_ret;
1499
1500	if (txninfo != NULL)
1501		__db_txnlist_end(env, txninfo);
1502
1503	if (F_ISSET(&data_dbt, DB_DBT_REALLOC) && data_dbt.data != NULL)
1504		__os_ufree(env, data_dbt.data);
1505
1506#ifdef HAVE_STATISTICS
1507	if (ret == 0)
1508		/*
1509		 * We don't hold the rep mutex, and could miscount if we race.
1510		 */
1511		rep->stat.st_txns_applied++;
1512#endif
1513
1514	return (ret);
1515}
1516
1517/*
1518 * __rep_collect_txn
1519 *	Recursive function that will let us visit every entry in a transaction
1520 *	chain including all child transactions so that we can then apply
1521 *	the entire transaction family at once.
1522 */
1523static int
1524__rep_collect_txn(env, lsnp, lc)
1525	ENV *env;
1526	DB_LSN *lsnp;
1527	LSN_COLLECTION *lc;
1528{
1529	__txn_child_args *argp;
1530	DB_LOGC *logc;
1531	DB_LSN c_lsn;
1532	DBT data;
1533	u_int32_t rectype;
1534	u_int nalloc;
1535	int ret, t_ret;
1536
1537	memset(&data, 0, sizeof(data));
1538	F_SET(&data, DB_DBT_REALLOC);
1539
1540	if ((ret = __log_cursor(env, &logc)) != 0)
1541		return (ret);
1542
1543	while (!IS_ZERO_LSN(*lsnp) &&
1544	    (ret = __logc_get(logc, lsnp, &data, DB_SET)) == 0) {
1545		LOGCOPY_32(env, &rectype, data.data);
1546		if (rectype == DB___txn_child) {
1547			if ((ret = __txn_child_read(
1548			    env, data.data, &argp)) != 0)
1549				goto err;
1550			c_lsn = argp->c_lsn;
1551			*lsnp = argp->prev_lsn;
1552			__os_free(env, argp);
1553			ret = __rep_collect_txn(env, &c_lsn, lc);
1554		} else {
1555			if (lc->nalloc < lc->nlsns + 1) {
1556				nalloc = lc->nalloc == 0 ? 20 : lc->nalloc * 2;
1557				if ((ret = __os_realloc(env,
1558				    nalloc * sizeof(DB_LSN), &lc->array)) != 0)
1559					goto err;
1560				lc->nalloc = nalloc;
1561			}
1562			lc->array[lc->nlsns++] = *lsnp;
1563
1564			/*
1565			 * Explicitly copy the previous lsn.  The record
1566			 * starts with a u_int32_t record type, a u_int32_t
1567			 * txn id, and then the DB_LSN (prev_lsn) that we
1568			 * want.  We copy explicitly because we have no idea
1569			 * what kind of record this is.
1570			 */
1571			LOGCOPY_TOLSN(env, lsnp, (u_int8_t *)data.data +
1572			    sizeof(u_int32_t) + sizeof(u_int32_t));
1573		}
1574
1575		if (ret != 0)
1576			goto err;
1577	}
1578	if (ret != 0)
1579		__db_errx(env, "collect failed at: [%lu][%lu]",
1580		    (u_long)lsnp->file, (u_long)lsnp->offset);
1581
1582err:	if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
1583		ret = t_ret;
1584	if (data.data != NULL)
1585		__os_ufree(env, data.data);
1586	return (ret);
1587}
1588
1589/*
1590 * __rep_lsn_cmp --
1591 *	qsort-type-compatible wrapper for LOG_COMPARE.
1592 */
1593static int
1594__rep_lsn_cmp(lsn1, lsn2)
1595	const void *lsn1, *lsn2;
1596{
1597
1598	return (LOG_COMPARE((DB_LSN *)lsn1, (DB_LSN *)lsn2));
1599}
1600
1601/*
1602 * __rep_newfile --
1603 *	NEWFILE messages have the LSN of the last record in the previous
1604 * log file.  When applying a NEWFILE message, make sure we haven't already
1605 * swapped files.  Assume caller hold mtx_clientdb.
1606 */
1607static int
1608__rep_newfile(env, rp, rec)
1609	ENV *env;
1610	__rep_control_args *rp;
1611	DBT *rec;
1612{
1613	DB_LOG *dblp;
1614	DB_LSN tmplsn;
1615	DB_REP *db_rep;
1616	LOG *lp;
1617	REP *rep;
1618	__rep_newfile_args nf_args;
1619	int ret;
1620
1621	dblp = env->lg_handle;
1622	lp = dblp->reginfo.primary;
1623	db_rep = env->rep_handle;
1624	rep = db_rep->region;
1625
1626	/*
1627	 * If a newfile is already in progress, just ignore.
1628	 */
1629	if (F_ISSET(rep, REP_F_NEWFILE))
1630		return (0);
1631	if (rp->lsn.file + 1 > lp->ready_lsn.file) {
1632		if (rec == NULL || rec->size == 0) {
1633			RPRINT(env, DB_VERB_REP_MISC, (env,
1634"rep_newfile: Old-style NEWFILE msg.  Use control msg log version: %lu",
1635    (u_long) rp->log_version));
1636			nf_args.version = rp->log_version;
1637		} else if (rp->rep_version < DB_REPVERSION_47)
1638			nf_args.version = *(u_int32_t *)rec->data;
1639		else if ((ret = __rep_newfile_unmarshal(env, &nf_args,
1640		    rec->data, rec->size, NULL)) != 0)
1641			return (ret);
1642		RPRINT(env, DB_VERB_REP_MISC,
1643		    (env, "rep_newfile: File %lu vers %lu",
1644		    (u_long)rp->lsn.file + 1, (u_long)nf_args.version));
1645
1646		/*
1647		 * We drop the mtx_clientdb mutex during
1648		 * the file operation, and then reacquire it when
1649		 * we're done.  We avoid colliding with new incoming
1650		 * log records because lp->ready_lsn is not getting
1651		 * updated and there is no real log record at this
1652		 * ready_lsn.  We avoid colliding with a duplicate
1653		 * NEWFILE message by setting an in-progress flag.
1654		 */
1655		REP_SYSTEM_LOCK(env);
1656		F_SET(rep, REP_F_NEWFILE);
1657		REP_SYSTEM_UNLOCK(env);
1658		MUTEX_UNLOCK(env, rep->mtx_clientdb);
1659		LOG_SYSTEM_LOCK(env);
1660		ret = __log_newfile(dblp, &tmplsn, 0, nf_args.version);
1661		LOG_SYSTEM_UNLOCK(env);
1662		MUTEX_LOCK(env, rep->mtx_clientdb);
1663		REP_SYSTEM_LOCK(env);
1664		F_CLR(rep, REP_F_NEWFILE);
1665		REP_SYSTEM_UNLOCK(env);
1666		if (ret == 0)
1667			lp->ready_lsn = tmplsn;
1668		return (ret);
1669	} else
1670		/* We've already applied this NEWFILE.  Just ignore it. */
1671		return (0);
1672}
1673
1674/*
1675 * __rep_do_ckp --
1676 * Perform the memp_sync necessary for this checkpoint without holding the
1677 * REP->mtx_clientdb.  Callers of this function must hold REP->mtx_clientdb
1678 * and must not be holding the region mutex.
1679 */
1680static int
1681__rep_do_ckp(env, rec, rp)
1682	ENV *env;
1683	DBT *rec;
1684	__rep_control_args *rp;
1685{
1686	DB_ENV *dbenv;
1687	__txn_ckp_args *ckp_args;
1688	DB_LSN ckp_lsn;
1689	REP *rep;
1690	int ret;
1691
1692	dbenv = env->dbenv;
1693
1694	/* Crack the log record and extract the checkpoint LSN. */
1695	if ((ret = __txn_ckp_read(env, rec->data, &ckp_args)) != 0)
1696		return (ret);
1697	ckp_lsn = ckp_args->ckp_lsn;
1698	__os_free(env, ckp_args);
1699
1700	rep = env->rep_handle->region;
1701
1702	MUTEX_UNLOCK(env, rep->mtx_clientdb);
1703	DB_TEST_WAIT(env, env->test_check);
1704
1705	/*
1706	 * Sync the memory pool.
1707	 *
1708	 * This is the real PERM lock record/ckp.  We cannot return ISPERM
1709	 * if we haven't truly completed the checkpoint, so we don't allow
1710	 * this call to be interrupted.
1711	 *
1712	 * We may be overlapping our log record with an in-progress startsync
1713	 * of this checkpoint; suppress the max_write settings on any running
1714	 * cache-flush operation so it completes quickly.
1715	 */
1716	(void)__memp_set_config(dbenv, DB_MEMP_SUPPRESS_WRITE, 1);
1717	MUTEX_LOCK(env, rep->mtx_ckp);
1718	ret = __memp_sync(env, DB_SYNC_CHECKPOINT, &ckp_lsn);
1719	MUTEX_UNLOCK(env, rep->mtx_ckp);
1720	(void)__memp_set_config(dbenv, DB_MEMP_SUPPRESS_WRITE, 0);
1721
1722	/* Update the last_ckp in the txn region. */
1723	if (ret == 0)
1724		ret = __txn_updateckp(env, &rp->lsn);
1725	else {
1726		__db_errx(env, "Error syncing ckp [%lu][%lu]",
1727		    (u_long)ckp_lsn.file, (u_long)ckp_lsn.offset);
1728		ret = __env_panic(env, ret);
1729	}
1730
1731	MUTEX_LOCK(env, rep->mtx_clientdb);
1732	return (ret);
1733}
1734
1735/*
1736 * __rep_remfirst --
1737 * Remove the first entry from the __db.rep.db
1738 */
1739static int
1740__rep_remfirst(env, ip, cntrl, rec)
1741	ENV *env;
1742	DB_THREAD_INFO *ip;
1743	DBT *cntrl;
1744	DBT *rec;
1745{
1746	DB *dbp;
1747	DBC *dbc;
1748	DB_REP *db_rep;
1749	int ret, t_ret;
1750
1751	db_rep = env->rep_handle;
1752	dbp = db_rep->rep_db;
1753	if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0)
1754		return (ret);
1755
1756	/* The DBTs need to persist through another call. */
1757	F_SET(cntrl, DB_DBT_REALLOC);
1758	F_SET(rec, DB_DBT_REALLOC);
1759	if ((ret = __dbc_get(dbc, cntrl, rec, DB_RMW | DB_FIRST)) == 0)
1760		ret = __dbc_del(dbc, 0);
1761	if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
1762		ret = t_ret;
1763
1764	return (ret);
1765}
1766
1767/*
1768 * __rep_getnext --
1769 * Get the next record out of the __db.rep.db table.
1770 */
1771static int
1772__rep_getnext(env, ip)
1773	ENV *env;
1774	DB_THREAD_INFO *ip;
1775{
1776	DB *dbp;
1777	DBC *dbc;
1778	DBT lsn_dbt, nextrec_dbt;
1779	DB_LOG *dblp;
1780	DB_REP *db_rep;
1781	LOG *lp;
1782	__rep_control_args *rp;
1783	int ret, t_ret;
1784
1785	dblp = env->lg_handle;
1786	lp = dblp->reginfo.primary;
1787
1788	db_rep = env->rep_handle;
1789	dbp = db_rep->rep_db;
1790
1791	if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0)
1792		return (ret);
1793
1794	/*
1795	 * Update waiting_lsn.  We need to move it
1796	 * forward to the LSN of the next record
1797	 * in the queue.
1798	 *
1799	 * If the next item in the database is a log
1800	 * record--the common case--we're not
1801	 * interested in its contents, just in its LSN.
1802	 * Optimize by doing a partial get of the data item.
1803	 */
1804	memset(&nextrec_dbt, 0, sizeof(nextrec_dbt));
1805	F_SET(&nextrec_dbt, DB_DBT_PARTIAL);
1806	nextrec_dbt.ulen = nextrec_dbt.dlen = 0;
1807
1808	memset(&lsn_dbt, 0, sizeof(lsn_dbt));
1809	ret = __dbc_get(dbc, &lsn_dbt, &nextrec_dbt, DB_FIRST);
1810	if (ret != DB_NOTFOUND && ret != 0)
1811		goto err;
1812
1813	if (ret == DB_NOTFOUND) {
1814		ZERO_LSN(lp->waiting_lsn);
1815		/*
1816		 * Whether or not the current record is
1817		 * simple, there's no next one, and
1818		 * therefore we haven't got anything
1819		 * else to do right now.  Break out.
1820		 */
1821		goto err;
1822	}
1823	rp = (__rep_control_args *)lsn_dbt.data;
1824	lp->waiting_lsn = rp->lsn;
1825
1826err:	if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
1827		ret = t_ret;
1828	return (ret);
1829}
1830
1831/*
1832 * __rep_process_rec --
1833 *
1834 * Given a record in 'rp', process it.  In the case of a NEWFILE, that means
1835 * potentially switching files.  In the case of a checkpoint, it means doing
1836 * the checkpoint, and in other cases, it means simply writing the record into
1837 * the log.
1838 */
1839static int
1840__rep_process_rec(env, ip, rp, rec, ret_tsp, ret_lsnp)
1841	ENV *env;
1842	DB_THREAD_INFO *ip;
1843	__rep_control_args *rp;
1844	DBT *rec;
1845	db_timespec *ret_tsp;
1846	DB_LSN *ret_lsnp;
1847{
1848	DB *dbp;
1849	DBT control_dbt, key_dbt, rec_dbt;
1850	DB_REP *db_rep;
1851	REP *rep;
1852	db_timespec msg_time;
1853	u_int32_t rectype, txnid;
1854	int ret, t_ret;
1855
1856	db_rep = env->rep_handle;
1857	rep = db_rep->region;
1858	dbp = db_rep->rep_db;
1859	ret = 0;
1860
1861	if (rp->rectype == REP_NEWFILE) {
1862		ret = __rep_newfile(env, rp, rec);
1863		return (0);
1864	}
1865
1866	LOGCOPY_32(env, &rectype, rec->data);
1867	memset(&control_dbt, 0, sizeof(control_dbt));
1868	memset(&rec_dbt, 0, sizeof(rec_dbt));
1869	timespecset(&msg_time, rp->msg_sec, rp->msg_nsec);
1870
1871	/*
1872	 * We write all records except for checkpoint records here.
1873	 * All non-checkpoint records need to appear in the log before
1874	 * we take action upon them (i.e., we enforce write-ahead logging).
1875	 * However, we can't write the checkpoint record here until the
1876	 * data buffers are actually written to disk, else we are creating
1877	 * an invalid log -- one that says all data before a certain point
1878	 * has been written to disk.
1879	 *
1880	 * If two threads are both processing the same checkpoint record
1881	 * (because, for example, it was resent and the original finally
1882	 * arrived), we handle that below by checking for the existence of
1883	 * the log record when we add it to the replication database.
1884	 *
1885	 * Any log records that arrive while we are processing the checkpoint
1886	 * are added to the bookkeeping database because ready_lsn is not yet
1887	 * updated to point after the checkpoint record.
1888	 */
1889	if (rectype != DB___txn_ckp || F_ISSET(rep, REP_F_RECOVER_LOG)) {
1890		if ((ret = __log_rep_put(env, &rp->lsn, rec, 0)) != 0)
1891			return (ret);
1892		STAT(rep->stat.st_log_records++);
1893		if (F_ISSET(rep, REP_F_RECOVER_LOG)) {
1894			*ret_lsnp = rp->lsn;
1895			goto out;
1896		}
1897	}
1898
1899	switch (rectype) {
1900	case DB___dbreg_register:
1901		/*
1902		 * DB opens occur in the context of a transaction, so we can
1903		 * simply handle them when we process the transaction.  Closes,
1904		 * however, are not transaction-protected, so we have to handle
1905		 * them here.
1906		 *
1907		 * It should be unsafe for the master to do a close of a file
1908		 * that was opened in an active transaction, so we should be
1909		 * guaranteed to get the ordering right.
1910		 *
1911		 * !!!
1912		 * The txn ID is the second 4-byte field of the log record.
1913		 * We should really be calling __dbreg_register_read() and
1914		 * working from the __dbreg_register_args structure, but this
1915		 * is considerably faster and the order of the fields won't
1916		 * change.
1917		 */
1918		LOGCOPY_32(env, &txnid,
1919		    (u_int8_t *)rec->data + sizeof(u_int32_t));
1920		if (txnid == TXN_INVALID)
1921			ret = __db_dispatch(env, &env->recover_dtab,
1922			    rec, &rp->lsn, DB_TXN_APPLY, NULL);
1923		break;
1924	case DB___txn_regop:
1925		/*
1926		 * If an application is doing app-specific recovery
1927		 * and acquires locks while applying a transaction,
1928		 * it can deadlock.  Any other locks held by this
1929		 * thread should have been discarded in the
1930		 * __rep_process_txn error path, so if we simply
1931		 * retry, we should eventually succeed.
1932		 */
1933		do {
1934			ret = 0;
1935			if (!F_ISSET(db_rep, DBREP_OPENFILES)) {
1936				ret = __txn_openfiles(env, ip, NULL, 1);
1937				F_SET(db_rep, DBREP_OPENFILES);
1938			}
1939			if (ret == 0)
1940				ret = __rep_process_txn(env, rec);
1941		} while (ret == DB_LOCK_DEADLOCK);
1942
1943		/* Now flush the log unless we're running TXN_NOSYNC. */
1944		if (ret == 0 && !F_ISSET(env->dbenv, DB_ENV_TXN_NOSYNC))
1945			ret = __log_flush(env, NULL);
1946		if (ret != 0) {
1947			__db_errx(env, "Error processing txn [%lu][%lu]",
1948			    (u_long)rp->lsn.file, (u_long)rp->lsn.offset);
1949			ret = __env_panic(env, ret);
1950		}
1951		break;
1952	case DB___txn_xa_regop:
1953		ret = __log_flush(env, NULL);
1954		/*
1955		 * Save the biggest prepared LSN we've seen.
1956		 */
1957		rep->max_prep_lsn = rp->lsn;
1958		RPRINT(env, DB_VERB_REP_MSGS,
1959		    (env, "process_rec: prepare at [%lu][%lu]",
1960		    (u_long)rep->max_prep_lsn.file,
1961		    (u_long)rep->max_prep_lsn.offset));
1962		break;
1963	case DB___txn_ckp:
1964		/*
1965		 * We do not want to hold the REP->mtx_clientdb mutex while
1966		 * syncing the mpool, so if we get a checkpoint record we are
1967		 * supposed to process, add it to the __db.rep.db, do the
1968		 * memp_sync and then go back and process it later, when the
1969		 * sync has finished.  If this record is already in the table,
1970		 * then some other thread will process it, so simply return
1971		 * REP_NOTPERM.
1972		 */
1973		memset(&key_dbt, 0, sizeof(key_dbt));
1974		key_dbt.data = rp;
1975		key_dbt.size = sizeof(*rp);
1976
1977		/*
1978		 * We want to put this record into the tmp DB only if
1979		 * it doesn't exist, so use DB_NOOVERWRITE.
1980		 */
1981		ret = __db_put(dbp, ip, NULL, &key_dbt, rec, DB_NOOVERWRITE);
1982		if (ret == DB_KEYEXIST) {
1983			if (ret_lsnp != NULL)
1984				*ret_lsnp = rp->lsn;
1985			ret = DB_REP_NOTPERM;
1986		}
1987		if (ret != 0)
1988			break;
1989
1990		/*
1991		 * Now, do the checkpoint.  Regardless of
1992		 * whether the checkpoint succeeds or not,
1993		 * we need to remove the record we just put
1994		 * in the temporary database.  If the
1995		 * checkpoint failed, return an error.  We
1996		 * will act like we never received the
1997		 * checkpoint.
1998		 */
1999		if ((ret = __rep_do_ckp(env, rec, rp)) == 0)
2000			ret = __log_rep_put(env, &rp->lsn, rec,
2001			    DB_LOG_CHKPNT);
2002		if ((t_ret = __rep_remfirst(env, ip,
2003		    &control_dbt, &rec_dbt)) != 0 && ret == 0)
2004			ret = t_ret;
2005		/*
2006		 * If we're successful putting the log record in the
2007		 * log, flush it for a checkpoint.
2008		 */
2009		if (ret == 0)
2010			ret = __log_flush(env, NULL);
2011		break;
2012	default:
2013		break;
2014	}
2015
2016out:
2017	if (ret == 0 && F_ISSET(rp, REPCTL_PERM))
2018		*ret_lsnp = rp->lsn;
2019	if (IS_USING_LEASES(env) &&
2020	    F_ISSET(rp, REPCTL_LEASE))
2021		*ret_tsp = msg_time;
2022	/*
2023	 * Set ret_lsnp before flushing the log because if the
2024	 * flush fails, we've still written the record to the
2025	 * log and the LSN has been entered.
2026	 */
2027	if (ret == 0 && F_ISSET(rp, REPCTL_FLUSH))
2028		ret = __log_flush(env, NULL);
2029	if (control_dbt.data != NULL)
2030		__os_ufree(env, control_dbt.data);
2031	if (rec_dbt.data != NULL)
2032		__os_ufree(env, rec_dbt.data);
2033
2034	return (ret);
2035}
2036
2037/*
2038 * __rep_resend_req --
2039 *	We might have dropped a message, we need to resend our request.
2040 *	The request we send is dependent on what recovery state we're in.
2041 *	The caller holds no locks.
2042 *
2043 * PUBLIC: int __rep_resend_req __P((ENV *, int));
2044 */
2045int
2046__rep_resend_req(env, rereq)
2047	ENV *env;
2048	int rereq;
2049{
2050	DB_LOG *dblp;
2051	DB_LSN lsn;
2052	DB_REP *db_rep;
2053	LOG *lp;
2054	REP *rep;
2055	int ret;
2056	u_int32_t gapflags, repflags;
2057
2058	db_rep = env->rep_handle;
2059	rep = db_rep->region;
2060	dblp = env->lg_handle;
2061	lp = dblp->reginfo.primary;
2062	ret = 0;
2063
2064	repflags = rep->flags;
2065	/*
2066	 * If we are delayed we do not rerequest anything.
2067	 */
2068	if (FLD_ISSET(repflags, REP_F_DELAY))
2069		return (ret);
2070	gapflags = rereq ? REP_GAP_REREQUEST : 0;
2071
2072	if (FLD_ISSET(repflags, REP_F_RECOVER_VERIFY)) {
2073		MUTEX_LOCK(env, rep->mtx_clientdb);
2074		lsn = lp->verify_lsn;
2075		MUTEX_UNLOCK(env, rep->mtx_clientdb);
2076		if (!IS_ZERO_LSN(lsn))
2077			(void)__rep_send_message(env, rep->master_id,
2078			    REP_VERIFY_REQ, &lsn, NULL, 0, DB_REP_REREQUEST);
2079	} else if (FLD_ISSET(repflags, REP_F_RECOVER_UPDATE)) {
2080		/*
2081		 * UPDATE_REQ only goes to the master.
2082		 */
2083		(void)__rep_send_message(env, rep->master_id,
2084		    REP_UPDATE_REQ, NULL, NULL, 0, 0);
2085	} else if (FLD_ISSET(repflags, REP_F_RECOVER_PAGE)) {
2086		REP_SYSTEM_LOCK(env);
2087		ret = __rep_pggap_req(env, rep, NULL, gapflags);
2088		REP_SYSTEM_UNLOCK(env);
2089	} else {
2090		MUTEX_LOCK(env, rep->mtx_clientdb);
2091		ret = __rep_loggap_req(env, rep, NULL, gapflags);
2092		MUTEX_UNLOCK(env, rep->mtx_clientdb);
2093	}
2094
2095	return (ret);
2096}
2097
2098/*
2099 * __rep_check_doreq --
2100 * PUBLIC: int __rep_check_doreq __P((ENV *, REP *));
2101 *
2102 * Check if we need to send another request.  If so, compare with
2103 * the request limits the user might have set.  This assumes the
2104 * caller holds the REP->mtx_clientdb mutex.  Returns 1 if a request
2105 * needs to be made, and 0 if it does not.
2106 */
2107int
2108__rep_check_doreq(env, rep)
2109	ENV *env;
2110	REP *rep;
2111{
2112
2113	DB_LOG *dblp;
2114	LOG *lp;
2115	db_timespec now;
2116	int req;
2117
2118	dblp = env->lg_handle;
2119	lp = dblp->reginfo.primary;
2120	__os_gettime(env, &now, 1);
2121	timespecsub(&now, &lp->rcvd_ts);
2122	req = timespeccmp(&now, &lp->wait_ts, >=);
2123	if (req) {
2124		/*
2125		 * Add wait_ts to itself to double it.
2126		 */
2127		timespecadd(&lp->wait_ts, &lp->wait_ts);
2128		if (timespeccmp(&lp->wait_ts, &rep->max_gap, >))
2129			lp->wait_ts = rep->max_gap;
2130		__os_gettime(env, &lp->rcvd_ts, 1);
2131	}
2132	return (req);
2133}
2134
2135/*
2136 * __rep_skip_msg -
2137 *
2138 *	If we're in recovery we want to skip/ignore the message, but
2139 *	we also need to see if we need to re-request any retransmissions.
2140 */
2141static int
2142__rep_skip_msg(env, rep, eid, rectype)
2143	ENV *env;
2144	REP *rep;
2145	int eid;
2146	u_int32_t rectype;
2147{
2148	int do_req, ret;
2149
2150	ret = 0;
2151	/*
2152	 * If we have a request message from a client then immediately
2153	 * send a REP_REREQUEST back to that client since we're skipping it.
2154	 */
2155	if (F_ISSET(rep, REP_F_CLIENT) && REP_MSG_REQ(rectype))
2156		do_req = 1;
2157	else {
2158		/* Check for need to retransmit. */
2159		MUTEX_LOCK(env, rep->mtx_clientdb);
2160		do_req = __rep_check_doreq(env, rep);
2161		MUTEX_UNLOCK(env, rep->mtx_clientdb);
2162	}
2163	/*
2164	 * Don't respond to a MASTER_REQ with
2165	 * a MASTER_REQ or REREQUEST.
2166	 */
2167	if (do_req && rectype != REP_MASTER_REQ) {
2168		/*
2169		 * There are three cases:
2170		 * 1.  If we don't know who the master is, then send MASTER_REQ.
2171		 * 2.  If the message we're skipping came from the master,
2172		 * then we need to rerequest.
2173		 * 3.  If the message didn't come from a master (i.e. client
2174		 * to client), then send a rerequest back to the sender so
2175		 * the sender can rerequest it elsewhere, if we are a client.
2176		 */
2177		if (rep->master_id == DB_EID_INVALID)	/* Case 1. */
2178			(void)__rep_send_message(env,
2179			    DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0, 0);
2180		else if (eid == rep->master_id)		/* Case 2. */
2181			ret = __rep_resend_req(env, 0);
2182		else if (F_ISSET(rep, REP_F_CLIENT))	/* Case 3. */
2183			(void)__rep_send_message(env,
2184			    eid, REP_REREQUEST, NULL, NULL, 0, 0);
2185	}
2186	return (ret);
2187}
2188
2189static int
2190__rep_fire_newmaster(env, gen, master)
2191	ENV *env;
2192	u_int32_t gen;
2193	int master;
2194{
2195	DB_REP *db_rep;
2196	REP *rep;
2197
2198	db_rep = env->rep_handle;
2199	rep = db_rep->region;
2200
2201	REP_EVENT_LOCK(env);
2202	/*
2203	 * The firing of this event should be idempotent with respect to a
2204	 * particular generation number.
2205	 */
2206	if (rep->newmaster_event_gen < gen) {
2207		__rep_fire_event(env, DB_EVENT_REP_NEWMASTER, &master);
2208		rep->newmaster_event_gen = gen;
2209	}
2210	REP_EVENT_UNLOCK(env);
2211	return (0);
2212}
2213
2214static int
2215__rep_fire_startupdone(env, gen, master)
2216	ENV *env;
2217	u_int32_t gen;
2218	int master;
2219{
2220	DB_REP *db_rep;
2221	REP *rep;
2222
2223	db_rep = env->rep_handle;
2224	rep = db_rep->region;
2225
2226	REP_EVENT_LOCK(env);
2227	/*
2228	 * Usually NEWMASTER will already have been fired.  But if not, fire
2229	 * it here now, to ensure the application receives events in the
2230	 * expected order.
2231	 */
2232	if (rep->newmaster_event_gen < gen) {
2233		__rep_fire_event(env, DB_EVENT_REP_NEWMASTER, &master);
2234		rep->newmaster_event_gen = gen;
2235	}
2236
2237	/*
2238	 * Caller already ensures that it only tries to fire STARTUPDONE once
2239	 * per generation.  If we did not want to rely on that, we could add a
2240	 * simple boolean flag (to the set of data protected by the mtx_event).
2241	 * The precise meaning of that flag would be "STARTUPDONE has been fired
2242	 * for the generation value stored in `newmaster_event_gen'".  Then the
2243	 * more accurate test here would be simply to check that flag, and fire
2244	 * the event (and set the flag) if it were not already set.
2245	 */
2246	if (rep->newmaster_event_gen == gen)
2247		__rep_fire_event(env, DB_EVENT_REP_STARTUPDONE, NULL);
2248	REP_EVENT_UNLOCK(env);
2249	return (0);
2250}
2251