1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2004,2008 Oracle.  All rights reserved.
5 *
6 * $Id: rep_log.c,v 12.79 2008/03/13 16:21:04 mbrey Exp $
7 */
8
9#include "db_config.h"
10
11#include "db_int.h"
12#include "dbinc/log.h"
13
14static int __rep_chk_newfile __P((ENV *, DB_LOGC *, REP *,
15    __rep_control_args *, int));
16
17/*
18 * __rep_allreq --
19 *      Handle a REP_ALL_REQ message.
20 *
21 * PUBLIC: int __rep_allreq __P((ENV *, __rep_control_args *, int));
22 */
23int
24__rep_allreq(env, rp, eid)
25	ENV *env;
26	__rep_control_args *rp;
27	int eid;
28{
29	DBT data_dbt, newfiledbt;
30	DB_LOGC *logc;
31	DB_LSN log_end, oldfilelsn;
32	DB_REP *db_rep;
33	REP *rep;
34	REP_BULK bulk;
35	REP_THROTTLE repth;
36	__rep_newfile_args nf_args;
37	uintptr_t bulkoff;
38	u_int32_t bulkflags, end_flag, flags, use_bulk;
39	int ret, t_ret;
40	u_int8_t buf[__REP_NEWFILE_SIZE];
41	size_t len;
42
43	ret = 0;
44	db_rep = env->rep_handle;
45	rep = db_rep->region;
46	end_flag = 0;
47
48	if ((ret = __log_cursor(env, &logc)) != 0)
49		return (ret);
50	memset(&data_dbt, 0, sizeof(data_dbt));
51	/*
52	 * If we're doing bulk transfer, allocate a bulk buffer to put our
53	 * log records in.  We still need to initialize the throttle info
54	 * because if we encounter a log record larger than our entire bulk
55	 * buffer, we need to send it as a singleton and also we want to
56	 * support throttling with bulk.
57	 *
58	 * Use a local var so we don't need to worry if someone else turns
59	 * on/off bulk in the middle of our call.
60	 */
61	use_bulk = FLD_ISSET(rep->config, REP_C_BULK);
62	bulk.addr = NULL;
63	if (use_bulk && (ret = __rep_bulk_alloc(env, &bulk, eid,
64	    &bulkoff, &bulkflags, REP_BULK_LOG)) != 0)
65		goto err;
66	memset(&repth, 0, sizeof(repth));
67	REP_SYSTEM_LOCK(env);
68	repth.gbytes = rep->gbytes;
69	repth.bytes = rep->bytes;
70	oldfilelsn = repth.lsn = rp->lsn;
71	repth.type = REP_LOG;
72	repth.data_dbt = &data_dbt;
73	REP_SYSTEM_UNLOCK(env);
74
75	/*
76	 * Get the LSN of the end of the log, so that in our reading loop
77	 * (below), we can recognize when we get there, and set the
78	 * REPCTL_LOG_END flag.
79	 */
80	if ((ret = __logc_get(logc, &log_end, &data_dbt, DB_LAST)) != 0) {
81		if (ret == DB_NOTFOUND && F_ISSET(rep, REP_F_MASTER))
82			ret = 0;
83		goto err;
84	}
85
86	flags = IS_ZERO_LSN(rp->lsn) ||
87	    IS_INIT_LSN(rp->lsn) ?  DB_FIRST : DB_SET;
88	/*
89	 * We get the first item so that a client servicing requests
90	 * can distinguish between not having the records and reaching
91	 * the end of its log.  Return the DB_NOTFOUND if the client
92	 * cannot get the record.  Return 0 if we finish the loop and
93	 * sent all that we have.
94	 */
95	ret = __logc_get(logc, &repth.lsn, &data_dbt, flags);
96	/*
97	 * If the client is asking for all records
98	 * because it doesn't have any, and our first
99	 * record is not in the first log file, then
100	 * the client is outdated and needs to get a
101	 * VERIFY_FAIL.
102	 */
103	if (ret == 0 && repth.lsn.file != 1 && flags == DB_FIRST) {
104		(void)__rep_send_message(env, eid,
105		    REP_VERIFY_FAIL, &repth.lsn, NULL, 0, 0);
106		goto err;
107	}
108	/*
109	 * If we got DB_NOTFOUND it could be because the LSN we were
110	 * given is at the end of the log file and we need to switch
111	 * log files.  Reinitialize and get the current record when we return.
112	 */
113	if (ret == DB_NOTFOUND) {
114		ret = __rep_chk_newfile(env, logc, rep, rp, eid);
115		/*
116		 * If we still get DB_NOTFOUND the client gave us a
117		 * bad or unknown LSN.  Ignore it if we're the master.
118		 * Any other error is returned.
119		 */
120		if (ret == 0)
121			ret = __logc_get(logc, &repth.lsn,
122			    &data_dbt, DB_CURRENT);
123		if (ret == DB_NOTFOUND && F_ISSET(rep, REP_F_MASTER)) {
124			ret = 0;
125			goto err;
126		}
127		if (ret != 0)
128			goto err;
129	}
130
131	/*
132	 * For singleton log records, we break when we get a REP_LOG_MORE.
133	 * Or if we're not using throttling, or we are using bulk, we stop
134	 * when we reach the end (i.e. ret != 0).
135	 */
136	for (end_flag = 0;
137	    ret == 0 && repth.type != REP_LOG_MORE && end_flag == 0;
138	    ret = __logc_get(logc, &repth.lsn, &data_dbt, DB_NEXT)) {
139		/*
140		 * If we just changed log files, we need to send the
141		 * version of this log file to the client.
142		 */
143		if (repth.lsn.file != oldfilelsn.file) {
144			if ((ret = __logc_version(logc, &nf_args.version)) != 0)
145				break;
146			memset(&newfiledbt, 0, sizeof(newfiledbt));
147			if (rep->version < DB_REPVERSION_47)
148				DB_INIT_DBT(newfiledbt, &nf_args.version,
149				    sizeof(nf_args.version));
150			else {
151				if ((ret = __rep_newfile_marshal(env, &nf_args,
152				    buf, __REP_NEWFILE_SIZE, &len)) != 0)
153					goto err;
154				DB_INIT_DBT(newfiledbt, buf, len);
155			}
156			(void)__rep_send_message(env,
157			    eid, REP_NEWFILE, &oldfilelsn, &newfiledbt,
158			    REPCTL_RESEND, 0);
159		}
160
161		/*
162		 * Mark the end of the ALL_REQ response to show that the
163		 * receiving client should now be "caught up" with the
164		 * replication group.  If we're the master, then our log end is
165		 * certainly authoritative.  If we're another client, only if we
166		 * ourselves have reached STARTUPDONE.
167		 */
168		end_flag = (LOG_COMPARE(&repth.lsn, &log_end) >= 0 &&
169		    (F_ISSET(rep, REP_F_MASTER) ||
170		    rep->stat.st_startup_complete)) ?
171		    REPCTL_LOG_END : 0;
172		/*
173		 * If we are configured for bulk, try to send this as a bulk
174		 * request.  If not configured, or it is too big for bulk
175		 * then just send normally.
176		 */
177		if (use_bulk)
178			ret = __rep_bulk_message(env, &bulk, &repth,
179			    &repth.lsn, &data_dbt, (REPCTL_RESEND | end_flag));
180		if (!use_bulk || ret == DB_REP_BULKOVF)
181			ret = __rep_send_throttle(env,
182			    eid, &repth, 0, end_flag);
183		if (ret != 0)
184			break;
185		/*
186		 * If we are about to change files, then we'll need the
187		 * last LSN in the previous file.  Save it here.
188		 */
189		oldfilelsn = repth.lsn;
190		oldfilelsn.offset += logc->len;
191	}
192
193	if (ret == DB_NOTFOUND || ret == DB_REP_UNAVAIL)
194		ret = 0;
195	/*
196	 * We're done, force out whatever remains in the bulk buffer and
197	 * free it.
198	 */
199err:
200	if (bulk.addr != NULL && (t_ret = __rep_bulk_free(env, &bulk,
201	    (REPCTL_RESEND | end_flag))) != 0 && ret == 0)
202		ret = t_ret;
203	if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
204		ret = t_ret;
205	return (ret);
206}
207
208/*
209 * __rep_log --
210 *      Handle a REP_LOG/REP_LOG_MORE message.
211 *
212 * PUBLIC: int __rep_log __P((ENV *, DB_THREAD_INFO *,
213 * PUBLIC:     __rep_control_args *, DBT *, time_t, DB_LSN *));
214 */
215int
216__rep_log(env, ip, rp, rec, savetime, ret_lsnp)
217	ENV *env;
218	DB_THREAD_INFO *ip;
219	__rep_control_args *rp;
220	DBT *rec;
221	time_t savetime;
222	DB_LSN *ret_lsnp;
223{
224	DB_LOG *dblp;
225	DB_LSN last_lsn, lsn;
226	DB_REP *db_rep;
227	LOG *lp;
228	REP *rep;
229	int is_dup, master, ret;
230
231	is_dup = ret = 0;
232	db_rep = env->rep_handle;
233	rep = db_rep->region;
234	dblp = env->lg_handle;
235	lp = dblp->reginfo.primary;
236
237	ret = __rep_apply(env, ip, rp, rec, ret_lsnp, &is_dup, &last_lsn);
238	switch (ret) {
239	/*
240	 * We're in an internal backup and we've gotten
241	 * all the log we need to run recovery.  Do so now.
242	 */
243	case DB_REP_LOGREADY:
244		if ((ret =
245		    __rep_logready(env, rep, savetime, &last_lsn)) != 0)
246			goto out;
247		break;
248	/*
249	 * If we get any of the "normal" returns, we only process
250	 * LOG_MORE if this is not a duplicate record.  If the
251	 * record is a duplicate we don't want to handle LOG_MORE
252	 * and request a multiple data stream (or trigger internal
253	 * initialization) since this could be a very old record
254	 * that no longer exists on the master.
255	 */
256	case DB_REP_ISPERM:
257	case DB_REP_NOTPERM:
258	case 0:
259		if (is_dup)
260			goto out;
261		else
262			break;
263	/*
264	 * Any other return (errors), we're done.
265	 */
266	default:
267		goto out;
268	}
269	if (rp->rectype == REP_LOG_MORE) {
270		master = rep->master_id;
271
272		/*
273		 * Keep the cycle from stalling: In case we got the LOG_MORE out
274		 * of order, before some preceding log records, we want to make
275		 * sure our follow-up request resumes from where the LOG_MORE
276		 * said it should.  (If the preceding log records never arrive,
277		 * normal gap processing should take care of asking for them.)
278		 * But if we already have this record and/or more, we need to
279		 * ask to resume from what we need.  The upshot is we need the
280		 * max of lp->lsn and the lsn from the message.
281		 */
282		MUTEX_LOCK(env, rep->mtx_clientdb);
283		lsn = lp->ready_lsn;
284		if (LOG_COMPARE(&rp->lsn, &lsn) > 0)
285			lsn = rp->lsn;
286
287		/*
288		 * If the master_id is invalid, this means that since
289		 * the last record was sent, somebody declared an
290		 * election and we may not have a master to request
291		 * things of.
292		 *
293		 * This is not an error;  when we find a new master,
294		 * we'll re-negotiate where the end of the log is and
295		 * try to bring ourselves up to date again anyway.
296		 */
297		if (master == DB_EID_INVALID) {
298			ret = 0;
299			MUTEX_UNLOCK(env, rep->mtx_clientdb);
300			goto out;
301		}
302		/*
303		 * If we're waiting for records, set the wait_ts
304		 * high so that we avoid re-requesting too soon and
305		 * end up with multiple data streams.
306		 */
307		if (IS_ZERO_LSN(lp->waiting_lsn))
308			lp->wait_ts = rep->max_gap;
309		ret = __rep_loggap_req(env, rep, &lsn, REP_GAP_FORCE);
310		MUTEX_UNLOCK(env, rep->mtx_clientdb);
311	}
312out:
313	return (ret);
314}
315
316/*
317 * __rep_bulk_log --
318 *      Handle a REP_BULK_LOG message.
319 *
320 * PUBLIC: int __rep_bulk_log __P((ENV *, DB_THREAD_INFO *,
321 * PUBLIC:     __rep_control_args *, DBT *, time_t, DB_LSN *));
322 */
323int
324__rep_bulk_log(env, ip, rp, rec, savetime, ret_lsnp)
325	ENV *env;
326	DB_THREAD_INFO *ip;
327	__rep_control_args *rp;
328	DBT *rec;
329	time_t savetime;
330	DB_LSN *ret_lsnp;
331{
332	DB_LSN last_lsn;
333	DB_REP *db_rep;
334	REP *rep;
335	int ret;
336
337	db_rep = env->rep_handle;
338	rep = db_rep->region;
339
340	ret = __log_rep_split(env, ip, rp, rec, ret_lsnp, &last_lsn);
341	switch (ret) {
342	/*
343	 * We're in an internal backup and we've gotten
344	 * all the log we need to run recovery.  Do so now.
345	 */
346	case DB_REP_LOGREADY:
347		ret = __rep_logready(env, rep, savetime, &last_lsn);
348		break;
349	/*
350	 * Any other return (errors), we're done.
351	 */
352	default:
353		break;
354	}
355	return (ret);
356}
357
358/*
359 * __rep_log_req --
360 *      Handle a REP_LOG_REQ message.
361 *
362 * PUBLIC: int __rep_logreq __P((ENV *, __rep_control_args *, DBT *, int));
363 */
364int
365__rep_logreq(env, rp, rec, eid)
366	ENV *env;
367	__rep_control_args *rp;
368	DBT *rec;
369	int eid;
370{
371	DBT data_dbt, newfiledbt;
372	DB_LOGC *logc;
373	DB_LSN firstlsn, lsn, oldfilelsn;
374	DB_REP *db_rep;
375	REP *rep;
376	REP_BULK bulk;
377	REP_THROTTLE repth;
378	__rep_logreq_args lr_args;
379	__rep_newfile_args nf_args;
380	uintptr_t bulkoff;
381	u_int32_t bulkflags, use_bulk;
382	int ret, t_ret;
383	u_int8_t buf[__REP_NEWFILE_SIZE];
384	size_t len;
385
386	ret = 0;
387	db_rep = env->rep_handle;
388	rep = db_rep->region;
389
390	/* COMPQUIET_LSN is what this is...  */
391	ZERO_LSN(lr_args.endlsn);
392
393	if (rec != NULL && rec->size != 0) {
394		if (rp->rep_version < DB_REPVERSION_47)
395			lr_args.endlsn = *(DB_LSN *)rec->data;
396		else if ((ret = __rep_logreq_unmarshal(env, &lr_args,
397		    rec->data, rec->size, NULL)) != 0)
398			return (ret);
399		RPRINT(env, DB_VERB_REP_MISC, (env,
400		    "[%lu][%lu]: LOG_REQ max lsn: [%lu][%lu]",
401		    (u_long) rp->lsn.file, (u_long)rp->lsn.offset,
402		    (u_long)lr_args.endlsn.file,
403		    (u_long)lr_args.endlsn.offset));
404	}
405	/*
406	 * There are several different cases here.
407	 * 1. We asked logc_get for a particular LSN and got it.
408	 * 2. We asked logc_get for an LSN and it's not found because it is
409	 *	beyond the end of a log file and we need a NEWFILE msg.
410	 *	and then the record that was requested.
411	 * 3. We asked logc_get for an LSN and it is already archived.
412	 * 4. We asked logc_get for an LSN and it simply doesn't exist, but
413	 *    doesn't meet any of those other criteria, in which case
414	 *    it's an error (that should never happen on a master).
415	 *
416	 * If we have a valid LSN and the request has a data_dbt with
417	 * it, the sender is asking for a chunk of log records.
418	 * Then we need to send all records up to the LSN in the data dbt.
419	 */
420	memset(&data_dbt, 0, sizeof(data_dbt));
421	oldfilelsn = lsn = rp->lsn;
422	if ((ret = __log_cursor(env, &logc)) != 0)
423		return (ret);
424	if ((ret = __logc_get(logc, &firstlsn, &data_dbt, DB_FIRST)) != 0)
425		goto err;
426	ret = __logc_get(logc, &lsn, &data_dbt, DB_SET);
427	if (ret == 0) {		/* Case 1 */
428		(void)__rep_send_message(env,
429		   eid, REP_LOG, &lsn, &data_dbt, REPCTL_RESEND, 0);
430		oldfilelsn.offset += logc->len;
431	} else if (ret == DB_NOTFOUND) {
432		if (LOG_COMPARE(&firstlsn, &rp->lsn) > 0) {
433			/* Case 3 */
434			(void)__rep_send_message(env, eid,
435			    REP_VERIFY_FAIL, &rp->lsn, NULL, 0, 0);
436			ret = 0;
437			goto err;
438		}
439		ret = __rep_chk_newfile(env, logc, rep, rp, eid);
440		if (ret == DB_NOTFOUND) {
441			/* Case 4 */
442			/*
443			 * If we're a master, this is a problem.
444			 * If we're a client servicing a request
445			 * just return the DB_NOTFOUND.
446			 */
447			if (F_ISSET(rep, REP_F_MASTER)) {
448				__db_errx(env,
449				    "Request for LSN [%lu][%lu] fails",
450				    (u_long)rp->lsn.file,
451				    (u_long)rp->lsn.offset);
452				ret = EINVAL;
453			} else
454				ret = DB_NOTFOUND;
455		}
456	}
457
458	if (ret != 0)
459		goto err;
460
461	/*
462	 * If the user requested a gap, send the whole thing, while observing
463	 * the limits from rep_set_limit.
464	 *
465	 * If we're doing bulk transfer, allocate a bulk buffer to put our
466	 * log records in.  We still need to initialize the throttle info
467	 * because if we encounter a log record larger than our entire bulk
468	 * buffer, we need to send it as a singleton.
469	 *
470	 * Use a local var so we don't need to worry if someone else turns
471	 * on/off bulk in the middle of our call.
472	 */
473	use_bulk = FLD_ISSET(rep->config, REP_C_BULK);
474	if (use_bulk && (ret = __rep_bulk_alloc(env, &bulk, eid,
475	    &bulkoff, &bulkflags, REP_BULK_LOG)) != 0)
476		goto err;
477	memset(&repth, 0, sizeof(repth));
478	REP_SYSTEM_LOCK(env);
479	repth.gbytes = rep->gbytes;
480	repth.bytes = rep->bytes;
481	repth.type = REP_LOG;
482	repth.data_dbt = &data_dbt;
483	REP_SYSTEM_UNLOCK(env);
484	while (ret == 0 && rec != NULL && rec->size != 0 &&
485	    repth.type == REP_LOG) {
486		if ((ret =
487		    __logc_get(logc, &repth.lsn, &data_dbt, DB_NEXT)) != 0) {
488			/*
489			 * If we're a client and we only have part of the gap,
490			 * return DB_NOTFOUND so that we send a REREQUEST
491			 * back to the requester and it can ask for more.
492			 */
493			if (ret == DB_NOTFOUND && F_ISSET(rep, REP_F_MASTER))
494				ret = 0;
495			break;
496		}
497		if (LOG_COMPARE(&repth.lsn, &lr_args.endlsn) >= 0)
498			break;
499		if (repth.lsn.file != oldfilelsn.file) {
500			if ((ret = __logc_version(logc, &nf_args.version)) != 0)
501				break;
502			memset(&newfiledbt, 0, sizeof(newfiledbt));
503			if (rep->version < DB_REPVERSION_47)
504				DB_INIT_DBT(newfiledbt, &nf_args.version,
505				    sizeof(nf_args.version));
506			else {
507				if ((ret = __rep_newfile_marshal(env, &nf_args,
508				    buf, __REP_NEWFILE_SIZE, &len)) != 0)
509					goto err;
510				DB_INIT_DBT(newfiledbt, buf, len);
511			}
512			(void)__rep_send_message(env,
513			    eid, REP_NEWFILE, &oldfilelsn, &newfiledbt,
514			    REPCTL_RESEND, 0);
515		}
516		/*
517		 * If we are configured for bulk, try to send this as a bulk
518		 * request.  If not configured, or it is too big for bulk
519		 * then just send normally.
520		 */
521		if (use_bulk)
522			ret = __rep_bulk_message(env, &bulk, &repth,
523			    &repth.lsn, &data_dbt, REPCTL_RESEND);
524		if (!use_bulk || ret == DB_REP_BULKOVF)
525			ret = __rep_send_throttle(env, eid, &repth, 0, 0);
526		if (ret != 0) {
527			/* Ignore send failure, except to break the loop. */
528			if (ret == DB_REP_UNAVAIL)
529				ret = 0;
530			break;
531		}
532		/*
533		 * If we are about to change files, then we'll need the
534		 * last LSN in the previous file.  Save it here.
535		 */
536		oldfilelsn = repth.lsn;
537		oldfilelsn.offset += logc->len;
538	}
539
540	/*
541	 * We're done, force out whatever remains in the bulk buffer and
542	 * free it.
543	 */
544	if (use_bulk && (t_ret = __rep_bulk_free(env, &bulk,
545	    REPCTL_RESEND)) != 0 && ret == 0)
546		ret = t_ret;
547err:
548	if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
549		ret = t_ret;
550	return (ret);
551}
552
553/*
554 * __rep_loggap_req -
555 *	Request a log gap.  Assumes the caller holds the REP->mtx_clientdb.
556 *
557 * lsnp is the current LSN we're handling.  It is used to help decide
558 *	if we ask for a gap or singleton.
559 * gapflags are flags that may override the algorithm or control the
560 *	processing in some way.
561 *
562 * PUBLIC: int __rep_loggap_req __P((ENV *, REP *, DB_LSN *, u_int32_t));
563 */
564int
565__rep_loggap_req(env, rep, lsnp, gapflags)
566	ENV *env;
567	REP *rep;
568	DB_LSN *lsnp;
569	u_int32_t gapflags;
570{
571	DBT max_lsn_dbt, *max_lsn_dbtp;
572	DB_LOG *dblp;
573	DB_LSN next_lsn;
574	LOG *lp;
575	__rep_logreq_args lr_args;
576	size_t len;
577	u_int32_t ctlflags, flags, type;
578	int ret;
579	u_int8_t buf[__REP_LOGREQ_SIZE];
580
581	dblp = env->lg_handle;
582	lp = dblp->reginfo.primary;
583	if (FLD_ISSET(gapflags, REP_GAP_FORCE))
584		next_lsn = *lsnp;
585	else
586		next_lsn = lp->ready_lsn;
587	ctlflags = flags = 0;
588	type = REP_LOG_REQ;
589	ret = 0;
590
591	/*
592	 * Check if we need to ask for the gap.
593	 * We ask for the gap if:
594	 *	We are forced to with gapflags.
595	 *	If max_wait_lsn is ZERO_LSN - we've never asked for
596	 *	  records before.
597	 *	If we asked for a single record and received it.
598	 *
599	 * If we want a gap, but don't have an ending LSN (waiting_lsn)
600	 * send an ALL_REQ.  This is primarily used by REP_REREQUEST when
601	 * an ALL_REQ was not able to be fulfilled by another client.
602	 */
603	if (FLD_ISSET(gapflags, (REP_GAP_FORCE | REP_GAP_REREQUEST)) ||
604	    IS_ZERO_LSN(lp->max_wait_lsn) ||
605	    (lsnp != NULL && LOG_COMPARE(lsnp, &lp->max_wait_lsn) == 0)) {
606		lp->max_wait_lsn = lp->waiting_lsn;
607		/*
608		 * If we are forcing a gap, we need to send a max_wait_lsn
609		 * that may be beyond the current gap/waiting_lsn (but
610		 * it may not be).  If we cannot determine any future
611		 * waiting LSN, then it should be zero.  If we're in
612		 * internal init, it should be our ending LSN.
613		 */
614		if (FLD_ISSET(gapflags, REP_GAP_FORCE)) {
615			if (LOG_COMPARE(&lp->max_wait_lsn, lsnp) <= 0) {
616				if (F_ISSET(rep, REP_F_RECOVER_LOG)) {
617					DB_ASSERT(env, LOG_COMPARE(lsnp,
618					    &rep->last_lsn) <= 0);
619					lp->max_wait_lsn = rep->last_lsn;
620				} else
621					ZERO_LSN(lp->max_wait_lsn);
622			}
623		}
624		if (IS_ZERO_LSN(lp->max_wait_lsn))
625			type = REP_ALL_REQ;
626		memset(&max_lsn_dbt, 0, sizeof(max_lsn_dbt));
627		lr_args.endlsn = lp->max_wait_lsn;
628		if (rep->version < DB_REPVERSION_47)
629			DB_INIT_DBT(max_lsn_dbt, &lp->max_wait_lsn,
630			    sizeof(DB_LSN));
631		else {
632			if ((ret = __rep_logreq_marshal(env, &lr_args, buf,
633			    __REP_LOGREQ_SIZE, &len)) != 0)
634				goto err;
635			DB_INIT_DBT(max_lsn_dbt, buf, len);
636		}
637		max_lsn_dbtp = &max_lsn_dbt;
638		/*
639		 * Gap requests are "new" and can go anywhere, unless
640		 * this is already a re-request.
641		 */
642		if (FLD_ISSET(gapflags, REP_GAP_REREQUEST))
643			flags = DB_REP_REREQUEST;
644		else
645			flags = DB_REP_ANYWHERE;
646	} else {
647		max_lsn_dbtp = NULL;
648		lp->max_wait_lsn = next_lsn;
649		/*
650		 * If we're dropping to singletons, this is a re-request.
651		 */
652		flags = DB_REP_REREQUEST;
653	}
654	if (rep->master_id != DB_EID_INVALID) {
655		STAT(rep->stat.st_log_requested++);
656		if (F_ISSET(rep, REP_F_RECOVER_LOG))
657			ctlflags = REPCTL_INIT;
658		(void)__rep_send_message(env, rep->master_id,
659		    type, &next_lsn, max_lsn_dbtp, ctlflags, flags);
660	} else
661		(void)__rep_send_message(env, DB_EID_BROADCAST,
662		    REP_MASTER_REQ, NULL, NULL, 0, 0);
663err:
664	return (ret);
665}
666
667/*
668 * __rep_logready -
669 *	Handle getting back REP_LOGREADY.  Any call to __rep_apply
670 * can return it.
671 *
672 * PUBLIC: int __rep_logready __P((ENV *, REP *, time_t, DB_LSN *));
673 */
674int
675__rep_logready(env, rep, savetime, last_lsnp)
676	ENV *env;
677	REP *rep;
678	time_t savetime;
679	DB_LSN *last_lsnp;
680{
681	int ret;
682
683	if ((ret = __log_flush(env, NULL)) != 0)
684		goto out;
685	if ((ret = __rep_verify_match(env, last_lsnp,
686	    savetime)) == 0) {
687		REP_SYSTEM_LOCK(env);
688		ZERO_LSN(rep->first_lsn);
689
690		if (rep->originfo != NULL) {
691			__os_free(env, rep->originfo);
692			rep->originfo = NULL;
693		}
694
695		F_CLR(rep, REP_F_RECOVER_LOG);
696		REP_SYSTEM_UNLOCK(env);
697	} else {
698out:		__db_errx(env,
699	"Client initialization failed.  Need to manually restore client");
700		return (__env_panic(env, ret));
701	}
702	return (ret);
703
704}
705
706/*
707 * __rep_chk_newfile --
708 *     Determine if getting DB_NOTFOUND is because we're at the
709 * end of a log file and need to send a NEWFILE message.
710 *
711 * This function handles these cases:
712 * [Case 1 was that we found the record we were looking for - it
713 * is already handled by the caller.]
714 * 2. We asked logc_get for an LSN and it's not found because it is
715 *	beyond the end of a log file and we need a NEWFILE msg.
716 * 3. We asked logc_get for an LSN and it simply doesn't exist, but
717 *    doesn't meet any of those other criteria, in which case
718 *    we return DB_NOTFOUND and the caller decides if it's an error.
719 *
720 * This function returns 0 if we had to send a message and the bad
721 * LSN is dealt with and DB_NOTFOUND if this really is an unknown LSN
722 * (on a client) and errors if it isn't found on the master.
723 */
724static int
725__rep_chk_newfile(env, logc, rep, rp, eid)
726	ENV *env;
727	DB_LOGC *logc;
728	REP *rep;
729	__rep_control_args *rp;
730	int eid;
731{
732	DBT data_dbt, newfiledbt;
733	DB_LOG *dblp;
734	DB_LSN endlsn;
735	LOG *lp;
736	__rep_newfile_args nf_args;
737	int ret;
738	u_int8_t buf[__REP_NEWFILE_SIZE];
739	size_t len;
740
741	ret = 0;
742	dblp = env->lg_handle;
743	lp = dblp->reginfo.primary;
744	memset(&data_dbt, 0, sizeof(data_dbt));
745	LOG_SYSTEM_LOCK(env);
746	endlsn = lp->lsn;
747	LOG_SYSTEM_UNLOCK(env);
748	if (endlsn.file > rp->lsn.file) {
749		/*
750		 * Case 2:
751		 * Need to find the LSN of the last record in
752		 * file lsn.file so that we can send it with
753		 * the NEWFILE call.  In order to do that, we
754		 * need to try to get {lsn.file + 1, 0} and
755		 * then backup.
756		 */
757		endlsn.file = rp->lsn.file + 1;
758		endlsn.offset = 0;
759		if ((ret = __logc_get(logc,
760		    &endlsn, &data_dbt, DB_SET)) != 0 ||
761		    (ret = __logc_get(logc,
762			&endlsn, &data_dbt, DB_PREV)) != 0) {
763			RPRINT(env, DB_VERB_REP_MISC, (env,
764			    "Unable to get prev of [%lu][%lu]",
765			    (u_long)rp->lsn.file,
766			    (u_long)rp->lsn.offset));
767			/*
768			 * We want to push the error back
769			 * to the client so that the client
770			 * does an internal backup.  The
771			 * client asked for a log record
772			 * we no longer have and it is
773			 * outdated.
774			 * XXX - This could be optimized by
775			 * having the master perform and
776			 * send a REP_UPDATE message.  We
777			 * currently want the client to set
778			 * up its 'update' state prior to
779			 * requesting REP_UPDATE_REQ.
780			 *
781			 * If we're a client servicing a request
782			 * just return DB_NOTFOUND.
783			 */
784			if (F_ISSET(rep, REP_F_MASTER)) {
785				ret = 0;
786				(void)__rep_send_message(env, eid,
787				    REP_VERIFY_FAIL, &rp->lsn,
788				    NULL, 0, 0);
789			} else
790				ret = DB_NOTFOUND;
791		} else {
792			endlsn.offset += logc->len;
793			if ((ret = __logc_version(logc,
794			    &nf_args.version)) == 0) {
795				memset(&newfiledbt, 0,
796				    sizeof(newfiledbt));
797				if (rep->version < DB_REPVERSION_47)
798					DB_INIT_DBT(newfiledbt,
799					    &nf_args.version,
800					    sizeof(nf_args.version));
801				else {
802					if ((ret = __rep_newfile_marshal(env,
803					    &nf_args, buf, __REP_NEWFILE_SIZE,
804					    &len)) != 0)
805						return (ret);
806					DB_INIT_DBT(newfiledbt, buf, len);
807				}
808				(void)__rep_send_message(env, eid,
809				    REP_NEWFILE, &endlsn,
810				    &newfiledbt, REPCTL_RESEND, 0);
811			}
812		}
813	} else
814		ret = DB_NOTFOUND;
815
816	return (ret);
817}
818