1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2004,2008 Oracle.  All rights reserved.
5 *
6 * $Id: rep_backup.c,v 12.153 2008/05/05 17:47:02 sue Exp $
7 */
8
9#include "db_config.h"
10
11#include "db_int.h"
12#include "dbinc/db_page.h"
13#include "dbinc/db_am.h"
14#include "dbinc/fop.h"
15#include "dbinc/lock.h"
16#include "dbinc/log.h"
17#include "dbinc/mp.h"
18#include "dbinc/qam.h"
19#include "dbinc/txn.h"
20
21static int __rep_check_uid __P((ENV *, u_int8_t *, u_int8_t *, u_int32_t,
22    u_int8_t *));
23static int __rep_filedone __P((ENV *, DB_THREAD_INFO *ip, int,
24     REP *, __rep_fileinfo_args *, u_int32_t));
25static int __rep_find_dbs __P((ENV *, u_int32_t, u_int8_t **, size_t *,
26    size_t *, u_int32_t *));
27static int __rep_get_fileinfo __P((ENV *, const char *,
28    const char *, __rep_fileinfo_args *, u_int8_t *, u_int32_t *));
29static int __rep_get_file_list __P((ENV *,
30    DB_FH *, u_int32_t, u_int32_t *, DBT *));
31static int __rep_log_setup __P((ENV *,
32    REP *, u_int32_t, u_int32_t, DB_LSN *));
33static int __rep_mpf_open __P((ENV *, DB_MPOOLFILE **,
34    __rep_fileinfo_args *, u_int32_t));
35static int __rep_nextfile __P((ENV *, int, REP *));
36static int __rep_page_gap __P((ENV *,
37     REP *, __rep_fileinfo_args *, u_int32_t));
38static int __rep_page_sendpages __P((ENV *, DB_THREAD_INFO *, int,
39    __rep_control_args *, __rep_fileinfo_args *, DB_MPOOLFILE *, DB *));
40static int __rep_queue_filedone __P((ENV *,
41    DB_THREAD_INFO *, REP *, __rep_fileinfo_args *));
42static int __rep_remove_all __P((ENV *, u_int32_t, DBT *));
43static int __rep_remove_file __P((ENV *, u_int8_t *, const char *,
44    u_int32_t, u_int32_t));
45static int __rep_remove_logs __P((ENV *));
46static int __rep_remove_by_list __P((ENV *, u_int32_t,
47    u_int8_t *, u_int32_t, u_int32_t));
48static int __rep_remove_by_prefix __P((ENV *, const char *, const char *,
49    size_t, APPNAME));
50static int __rep_walk_dir __P((ENV *, const char *, u_int32_t, u_int8_t **,
51    u_int8_t *, size_t *, size_t *, u_int32_t *));
52static int __rep_write_page __P((ENV *,
53    DB_THREAD_INFO *, REP *, __rep_fileinfo_args *));
54
55/*
56 * __rep_update_req -
57 *	Process an update_req and send the file information to the client.
58 *
59 * PUBLIC: int __rep_update_req __P((ENV *, __rep_control_args *, int));
60 */
61int
62__rep_update_req(env, rp, eid)
63	ENV *env;
64	__rep_control_args *rp;
65	int eid;
66{
67	DBT updbt, vdbt;
68	DB_LOG *dblp;
69	DB_LOGC *logc;
70	DB_LSN lsn;
71	__rep_update_args u_args;
72	size_t filelen, filesz, updlen;
73	u_int32_t filecnt, flag, version;
74	u_int8_t *buf, *fp;
75	int ret, t_ret;
76
77	/*
78	 * Allocate enough for all currently open files and then some.
79	 * Optimize for the common use of having most databases open.
80	 * Allocate dbentry_cnt * 2 plus an estimated 60 bytes per
81	 * file for the filename/path (or multiplied by 120).
82	 *
83	 * The data we send looks like this:
84	 *	__rep_update_args
85	 *	__rep_fileinfo_args
86	 *	__rep_fileinfo_args
87	 *	...
88	 */
89	dblp = env->lg_handle;
90	logc = NULL;
91	filecnt = 0;
92	filelen = 0;
93	updlen = 0;
94	filesz = MEGABYTE;
95	if ((ret = __os_calloc(env, 1, filesz, &buf)) != 0)
96		return (ret);
97
98	/*
99	 * First get our file information.  Get in-memory files first
100	 * then get on-disk files.
101	 */
102	fp = buf + __REP_UPDATE_SIZE;
103	if ((ret = __rep_find_dbs(env, rp->rep_version,
104	    &fp, &filesz, &filelen, &filecnt)) != 0)
105		goto err;
106
107	/*
108	 * Now get our first LSN.  We send the lsn of the first
109	 * non-archivable log file.
110	 */
111	flag = DB_SET;
112	if ((ret = __log_get_stable_lsn(env, &lsn)) != 0) {
113		if (ret != DB_NOTFOUND)
114			goto err;
115		/*
116		 * If ret is DB_NOTFOUND then there is no checkpoint
117		 * in this log, that is okay, just start at the beginning.
118		 */
119		ret = 0;
120		flag = DB_FIRST;
121	}
122
123	/*
124	 * Now get the version number of the log file of that LSN.
125	 */
126	if ((ret = __log_cursor(env, &logc)) != 0)
127		goto err;
128
129	memset(&vdbt, 0, sizeof(vdbt));
130	/*
131	 * Set our log cursor on the LSN we are sending.  Or
132	 * to the first LSN if we have no stable LSN.
133	 */
134	if ((ret = __logc_get(logc, &lsn, &vdbt, flag)) != 0) {
135		/*
136		 * We could be racing a fresh master starting up.  If we
137		 * have no log records, assume an initial LSN and current
138		 * log version.
139		 */
140		if (ret != DB_NOTFOUND)
141			goto err;
142		INIT_LSN(lsn);
143		version = DB_LOGVERSION;
144	} else {
145		if ((ret = __logc_version(logc, &version)) != 0)
146			goto err;
147	}
148	/*
149	 * Package up the update information.
150	 */
151	u_args.first_lsn = lsn;
152	u_args.first_vers = version;
153	u_args.num_files = filecnt;
154	if ((ret = __rep_update_marshal(env, rp->rep_version,
155	    &u_args, buf, filesz, &updlen)) != 0)
156		goto err;
157	/*
158	 * We have all the file information now.  Send it to the client.
159	 */
160	DB_INIT_DBT(updbt, buf, filelen + updlen);
161
162	LOG_SYSTEM_LOCK(env);
163	lsn = ((LOG *)dblp->reginfo.primary)->lsn;
164	LOG_SYSTEM_UNLOCK(env);
165	(void)__rep_send_message(
166	    env, eid, REP_UPDATE, &lsn, &updbt, 0, 0);
167
168err:	__os_free(env, buf);
169	if (logc != NULL && (t_ret = __logc_close(logc)) != 0 && ret == 0)
170		ret = t_ret;
171	return (ret);
172}
173
174/*
175 * __rep_find_dbs -
176 *	Walk through all the named files/databases including those in the
177 *	environment or data_dirs and those that in named and in-memory.  We
178 *	need to	open them, gather the necessary information and then close
179 *	them.
180 *
181 * !!!
182 * The pointer *fp is expected to point into a buffer that may be used for an
183 * UPDATE message, at an offset equal to the size of __rep_update_args.  This
184 * assumption is relied upon if the buffer is found to be too small and must be
185 * reallocated.
186 */
187static int
188__rep_find_dbs(env, version, fp, fileszp, filelenp, filecntp)
189	ENV *env;
190	u_int32_t version;
191	u_int8_t **fp;
192	size_t *fileszp, *filelenp;
193	u_int32_t *filecntp;
194{
195	DB_ENV *dbenv;
196	int ret;
197	char **ddir, *real_dir;
198	u_int8_t *origfp;
199
200	dbenv = env->dbenv;
201	ret = 0;
202	real_dir = NULL;
203
204	if (dbenv->db_data_dir == NULL) {
205		/*
206		 * If we don't have a data dir, we have just the
207		 * env home dir.
208		 */
209		ret = __rep_walk_dir(env, env->db_home, version, fp, NULL,
210		    fileszp, filelenp, filecntp);
211	} else {
212		origfp = *fp;
213		for (ddir = dbenv->db_data_dir; *ddir != NULL; ++ddir) {
214			if ((ret = __db_appname(env, DB_APP_NONE,
215			    *ddir, 0, NULL, &real_dir)) != 0)
216				break;
217			if ((ret = __rep_walk_dir(env, real_dir, version, fp,
218			    origfp, fileszp, filelenp, filecntp)) != 0)
219				break;
220			__os_free(env, real_dir);
221			real_dir = NULL;
222		}
223	}
224
225	/* Now, collect any in-memory named databases. */
226	if (ret == 0)
227		ret = __rep_walk_dir(env, NULL, version,
228		    fp, NULL, fileszp, filelenp, filecntp);
229
230	if (real_dir != NULL)
231		__os_free(env, real_dir);
232	return (ret);
233}
234
235/*
236 * __rep_walk_dir --
237 *
238 * This is the routine that walks a directory and fills in the structures
239 * that we use to generate messages to the client telling it what
240 * files are available.  If the directory name is NULL, then we should
241 * walk the list of in-memory named files.
242 */
243static int
244__rep_walk_dir(env, dir, version, fp, origfp, fileszp, filelenp, filecntp)
245	ENV *env;
246	const char *dir;
247	u_int32_t version;
248	u_int8_t **fp, *origfp;
249	size_t *fileszp, *filelenp;
250	u_int32_t *filecntp;
251{
252	__rep_fileinfo_args tmpfp;
253	size_t len, offset;
254	int cnt, first_file, i, ret;
255	u_int8_t *rfp, uid[DB_FILE_ID_LEN];
256	char *file, **names, *subdb;
257
258	if (dir == NULL) {
259		RPRINT(env, DB_VERB_REP_SYNC, (env,
260		    "Walk_dir: Getting info for in-memory named files"));
261		if ((ret = __memp_inmemlist(env, &names, &cnt)) != 0)
262			return (ret);
263	} else {
264		RPRINT(env, DB_VERB_REP_SYNC, (env,
265		    "Walk_dir: Getting info for dir: %s", dir));
266		if ((ret = __os_dirlist(env, dir, 0, &names, &cnt)) != 0)
267			return (ret);
268	}
269	rfp = NULL;
270	if (fp != NULL)
271		rfp = *fp;
272	RPRINT(env, DB_VERB_REP_SYNC, (env, "Walk_dir: Dir %s has %d files",
273	    (dir == NULL) ? "INMEM" : dir, cnt));
274	first_file = 1;
275	for (i = 0; i < cnt; i++) {
276		RPRINT(env, DB_VERB_REP_SYNC, (env,
277		    "Walk_dir: File %d name: %s", i, names[i]));
278		/*
279		 * Skip DB-owned files: __db*, DB_CONFIG, log*
280		 */
281		if (strncmp(names[i],
282		    DB_REGION_PREFIX, sizeof(DB_REGION_PREFIX) - 1) == 0)
283			continue;
284		if (strncmp(names[i], "DB_CONFIG", 9) == 0)
285			continue;
286		if (strncmp(names[i], "log.", 4) == 0)
287			continue;
288		/*
289		 * We found a file to process.  Check if we need
290		 * to allocate more space.
291		 */
292		if (dir == NULL) {
293			file = NULL;
294			subdb = names[i];
295		} else {
296			file = names[i];
297			subdb = NULL;
298		}
299		if ((ret = __rep_get_fileinfo(env,
300		    file, subdb, &tmpfp, uid, filecntp)) != 0) {
301			/*
302			 * If we find a file that isn't a database, skip it.
303			 */
304			RPRINT(env, DB_VERB_REP_SYNC, (env,
305			    "Walk_dir: File %d %s: returned error %s",
306			    i, names[i], db_strerror(ret)));
307			ret = 0;
308			continue;
309		}
310		RPRINT(env, DB_VERB_REP_SYNC, (env,
311    "Walk_dir: File %d (of %d) %s at 0x%lx: pgsize %lu, max_pgno %lu",
312		    tmpfp.filenum, *filecntp, names[i], P_TO_ULONG(rfp),
313		    (u_long)tmpfp.pgsize, (u_long)tmpfp.max_pgno));
314
315		/*
316		 * Check if we already have info on this file.  Since we're
317		 * walking directories, we only need to check the first
318		 * file to discover if we have a duplicate data_dir.
319		 */
320		if (first_file && origfp != NULL) {
321			/*
322			 * If we have any file info, check if we have this uid.
323			 */
324			if (rfp != origfp &&
325			    (ret = __rep_check_uid(env, origfp,
326			    origfp + *filelenp, version, uid)) != 0) {
327				/*
328				 * If we have this uid.  Adjust the file
329				 * count and stop processing this dir.
330				 */
331				if (ret == DB_KEYEXIST) {
332					ret = 0;
333					(*filecntp)--;
334				}
335				goto err;
336			}
337			first_file = 0;
338		}
339
340		DB_SET_DBT(tmpfp.info, names[i], strlen(names[i]) + 1);
341		DB_SET_DBT(tmpfp.uid, uid, DB_FILE_ID_LEN);
342retry:		ret = __rep_fileinfo_marshal(env, version,
343		    &tmpfp, rfp, *fileszp, &len);
344		if (ret == ENOMEM) {
345			offset = (size_t)(rfp - *fp);
346			*fileszp *= 2;
347			/*
348			 * Need to account for update info on both sides
349			 * of the allocation.
350			 */
351			*fp -= __REP_UPDATE_SIZE;
352			if ((ret = __os_realloc(env, *fileszp, *fp)) != 0)
353				break;
354			*fp += __REP_UPDATE_SIZE;
355			rfp = *fp + offset;
356			/*
357			 * Now that we've reallocated the space, try to
358			 * store it again.
359			 */
360			goto retry;
361		}
362		rfp += len;
363		*fp = rfp;
364		*filelenp += len;
365	}
366err:
367	__os_dirfree(env, names, cnt);
368	return (ret);
369}
370
371/*
372 * This function is called when we process the first file of any
373 * new directory for internal init.  We walk the list of current
374 * files to see if we have already processed these files.  This
375 * is to prevent transmitting the same file multiple times if the
376 * user calls env->set_data_dir on the same directory more than once.
377 */
378static int
379__rep_check_uid(env, origfp, endfp, version, uid)
380	ENV *env;
381	u_int32_t version;
382	u_int8_t *origfp, *endfp, *uid;
383{
384	__rep_fileinfo_args *rfp;
385	size_t filesz;
386	u_int8_t *fp, *fuid, *new_fp;
387	int ret;
388
389	ret = 0;
390	fp = origfp;
391	rfp = NULL;
392	/*
393	 * We don't know how many fp's there are, so compute the maximum
394	 * size based on the endfp and the first fp.
395	 */
396	filesz = (uintptr_t)endfp - (uintptr_t)origfp;
397	while (fp <= endfp) {
398		if ((ret = __rep_fileinfo_unmarshal(env, version,
399		    &rfp, fp, filesz, &new_fp)) != 0) {
400			__db_errx(env, "rep_check_uid: Could not malloc");
401			goto err;
402		}
403		filesz -= (u_int32_t)(new_fp - fp);
404		fp = new_fp;
405		fuid = (u_int8_t *)rfp->uid.data;
406		if (memcmp(fuid, uid, DB_FILE_ID_LEN) == 0) {
407			RPRINT(env, DB_VERB_REP_SYNC, (env,
408			    "Check_uid: Found matching file."));
409			ret = DB_KEYEXIST;
410			goto err;
411		}
412		__os_free(env, rfp);
413		rfp = NULL;
414	}
415err:
416	if (rfp != NULL)
417		__os_free(env, rfp);
418	return (ret);
419
420}
421
422static int
423__rep_get_fileinfo(env, file, subdb, rfp, uid, filecntp)
424	ENV *env;
425	const char *file, *subdb;
426	__rep_fileinfo_args *rfp;
427	u_int8_t *uid;
428	u_int32_t *filecntp;
429{
430	DB *dbp;
431	DBC *dbc;
432	DBMETA *dbmeta;
433	DB_LOCK lk;
434	DB_MPOOLFILE *mpf;
435	DB_THREAD_INFO *ip;
436	PAGE *pagep;
437	int lorder, ret, t_ret;
438
439	dbp = NULL;
440	dbc = NULL;
441	pagep = NULL;
442	mpf = NULL;
443	LOCK_INIT(lk);
444
445	ENV_GET_THREAD_INFO(env, ip);
446
447	if ((ret = __db_create_internal(&dbp, env, 0)) != 0)
448		goto err;
449	if ((ret = __db_open(dbp, ip, NULL, file, subdb, DB_UNKNOWN,
450	    DB_RDONLY | (F_ISSET(env, ENV_THREAD) ? DB_THREAD : 0),
451	    0, PGNO_BASE_MD)) != 0)
452		goto err;
453
454	if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0)
455		goto err;
456	if ((ret = __db_lget(
457	    dbc, 0, dbp->meta_pgno, DB_LOCK_READ, 0, &lk)) != 0)
458		goto err;
459	if ((ret = __memp_fget(dbp->mpf, &dbp->meta_pgno, ip, dbc->txn,
460	    0, &pagep)) != 0)
461		goto err;
462	/*
463	 * We have the meta page.  Set up our information.
464	 */
465	dbmeta = (DBMETA *)pagep;
466	rfp->pgno = 0;
467	/*
468	 * Queue is a special-case.  We need to set max_pgno to 0 so that
469	 * the client can compute the pages from the meta-data.
470	 */
471	if (dbp->type == DB_QUEUE)
472		rfp->max_pgno = 0;
473	else
474		rfp->max_pgno = dbmeta->last_pgno;
475	rfp->pgsize = dbp->pgsize;
476	memcpy(uid, dbp->fileid, DB_FILE_ID_LEN);
477	rfp->filenum = (*filecntp)++;
478	rfp->type = (u_int32_t)dbp->type;
479	rfp->db_flags = dbp->flags;
480	rfp->finfo_flags = 0;
481	/*
482	 * Send the lorder of this database.
483	 */
484	(void)__db_get_lorder(dbp, &lorder);
485	if (lorder == 1234)
486		FLD_SET(rfp->finfo_flags, REPINFO_DB_LITTLEENDIAN);
487	else
488		FLD_CLR(rfp->finfo_flags, REPINFO_DB_LITTLEENDIAN);
489
490	ret = __memp_fput(dbp->mpf, ip, pagep, dbc->priority);
491	pagep = NULL;
492	if ((t_ret = __LPUT(dbc, lk)) != 0 && ret == 0)
493		ret = t_ret;
494	if (ret != 0)
495		goto err;
496err:
497	if ((t_ret = __LPUT(dbc, lk)) != 0 && ret == 0)
498		ret = t_ret;
499	if (pagep != NULL && (t_ret =
500	    __memp_fput(mpf, ip, pagep, dbc->priority)) != 0 && ret == 0)
501		ret = t_ret;
502	if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0)
503		ret = t_ret;
504	if (dbp != NULL && (t_ret = __db_close(dbp, NULL, 0)) != 0 && ret == 0)
505		ret = t_ret;
506	return (ret);
507}
508
509/*
510 * __rep_page_req
511 *	Process a page_req and send the page information to the client.
512 *
513 * PUBLIC: int __rep_page_req __P((ENV *,
514 * PUBLIC:     DB_THREAD_INFO *, int, __rep_control_args *, DBT *));
515 */
516int
517__rep_page_req(env, ip, eid, rp, rec)
518	ENV *env;
519	DB_THREAD_INFO *ip;
520	int eid;
521	__rep_control_args *rp;
522	DBT *rec;
523{
524	__rep_fileinfo_args *msgfp;
525	DB_MPOOLFILE *mpf;
526	DB_REP *db_rep;
527	REP *rep;
528	int ret, t_ret;
529	u_int8_t *next;
530
531	db_rep = env->rep_handle;
532	rep = db_rep->region;
533
534	if ((ret = __rep_fileinfo_unmarshal(env, rp->rep_version,
535	    &msgfp, rec->data, rec->size, &next)) != 0)
536		return (ret);
537
538	RPRINT(env, DB_VERB_REP_SYNC,
539	    (env, "page_req: file %d page %lu to %lu",
540	    msgfp->filenum, (u_long)msgfp->pgno, (u_long)msgfp->max_pgno));
541
542	/*
543	 * We need to open the file and then send its pages.
544	 * If we cannot open the file, we send REP_FILE_FAIL.
545	 */
546	RPRINT(env, DB_VERB_REP_SYNC,
547	    (env, "page_req: Open %d via mpf_open", msgfp->filenum));
548	if ((ret = __rep_mpf_open(env, &mpf, msgfp, 0)) != 0) {
549		RPRINT(env, DB_VERB_REP_SYNC,
550		    (env, "page_req: Open %d failed", msgfp->filenum));
551		if (F_ISSET(rep, REP_F_MASTER))
552			(void)__rep_send_message(env, eid, REP_FILE_FAIL,
553			    NULL, rec, 0, 0);
554		else
555			ret = DB_NOTFOUND;
556		goto err;
557	}
558
559	ret = __rep_page_sendpages(env, ip, eid, rp, msgfp, mpf, NULL);
560	t_ret = __memp_fclose(mpf, 0);
561	if (ret == 0 && t_ret != 0)
562		ret = t_ret;
563err:
564	__os_free(env, msgfp);
565	return (ret);
566}
567
/*
 * __rep_page_sendpages --
 *	Send pages msgfp->pgno through msgfp->max_pgno of the file behind mpf
 *	to site eid as REP_PAGE records, through a bulk buffer when REP_C_BULK
 *	is configured.
 *
 *	For a queue database a DB handle is needed to fetch pages other than
 *	page 0.  The caller may supply one in dbp; otherwise the database
 *	named by msgfp->info is opened read-only here and closed again before
 *	returning.
 *
 *	msgfp is modified along the way: pgno follows the page being
 *	processed, info is pointed at the page image and finfo_flags carries
 *	the byte order of the page.
 *
 *	A page whose lock cannot be granted immediately is skipped.  The loop
 *	ends early when the throttle switches to REP_PAGE_MORE or a send
 *	fails; DB_REP_UNAVAIL from the send is not treated as an error.
 *	Returns 0 or the first error encountered, including errors from the
 *	cleanup steps.
 */
static int
__rep_page_sendpages(env, ip, eid, rp, msgfp, mpf, dbp)
	ENV *env;
	DB_THREAD_INFO *ip;
	int eid;
	__rep_control_args *rp;
	__rep_fileinfo_args *msgfp;
	DB_MPOOLFILE *mpf;
	DB *dbp;
{
	DB *qdbp;
	DBC *qdbc;
	DBT lockdbt, msgdbt;
	DB_LOCK lock;
	DB_LOCKER *locker;
	DB_LOCK_ILOCK lock_obj;
	DB_LOG *dblp;
	DB_LSN lsn;
	DB_REP *db_rep;
	PAGE *pagep;
	REP *rep;
	REP_BULK bulk;
	REP_THROTTLE repth;
	db_pgno_t p;
	uintptr_t bulkoff;
	size_t len, msgsz;
	u_int32_t bulkflags, use_bulk;
	int opened, ret, t_ret;
	u_int8_t *buf;

	db_rep = env->rep_handle;
	rep = db_rep->region;
	/*
	 * Everything the cleanup code at "err" looks at is given a known
	 * value before the first statement that can jump there.
	 */
	locker = NULL;
	opened = 0;
	t_ret = 0;
	qdbp = NULL;
	qdbc = NULL;
	buf = NULL;
	bulk.addr = NULL;
	use_bulk = FLD_ISSET(rep->config, REP_C_BULK);
	if (msgfp->type == (u_int32_t)DB_QUEUE) {
		if (dbp == NULL) {
			if ((ret = __db_create_internal(&qdbp, env, 0)) != 0)
				goto err;
			/*
			 * We need to check whether this is in-memory so that
			 * we pass the name correctly as either the file or
			 * the database name.
			 */
			if ((ret = __db_open(qdbp, ip, NULL,
			    FLD_ISSET(msgfp->db_flags, DB_AM_INMEM) ?
			    NULL : msgfp->info.data,
			    FLD_ISSET(msgfp->db_flags, DB_AM_INMEM) ?
			    msgfp->info.data : NULL,
			    DB_UNKNOWN,
			DB_RDONLY | (F_ISSET(env, ENV_THREAD) ? DB_THREAD : 0),
			    0, PGNO_BASE_MD)) != 0)
				goto err;
			/* Only a handle opened here is closed here. */
			opened = 1;
		} else
			qdbp = dbp;
		if ((ret = __db_cursor(qdbp, ip, NULL, &qdbc, 0)) != 0)
			goto err;
	}
	/*
	 * One message buffer is reused for every page.  It is sized for the
	 * fixed part of a fileinfo record, a file ID and one page image.
	 */
	msgsz = __REP_FILEINFO_SIZE + DB_FILE_ID_LEN + msgfp->pgsize;
	if ((ret = __os_calloc(env, 1, msgsz, &buf)) != 0)
		goto err;
	memset(&msgdbt, 0, sizeof(msgdbt));
	RPRINT(env, DB_VERB_REP_SYNC,
	    (env, "sendpages: file %d page %lu to %lu",
	    msgfp->filenum, (u_long)msgfp->pgno, (u_long)msgfp->max_pgno));
	memset(&repth, 0, sizeof(repth));
	/*
	 * If we're doing bulk transfer, allocate a bulk buffer to put our
	 * pages in.  We still need to initialize the throttle info
	 * because if we encounter a page larger than our entire bulk
	 * buffer, we need to send it as a singleton.
	 *
	 * Use a local var so that we don't need to worry if someone else
	 * turns on/off bulk in the middle of our call here.
	 */
	if (use_bulk && (ret = __rep_bulk_alloc(env, &bulk, eid,
	    &bulkoff, &bulkflags, REP_BULK_PAGE)) != 0)
		goto err;
	/* Copy the byte limits out of the region while holding its lock. */
	REP_SYSTEM_LOCK(env);
	repth.gbytes = rep->gbytes;
	repth.bytes = rep->bytes;
	repth.type = REP_PAGE;
	repth.data_dbt = &msgdbt;
	REP_SYSTEM_UNLOCK(env);

	/*
	 * Set up locking.
	 */
	LOCK_INIT(lock);
	memset(&lock_obj, 0, sizeof(lock_obj));
	if ((ret = __lock_id(env, NULL, &locker)) != 0)
		goto err;
	/* The lock object names a page of this file; pgno is set per page. */
	memcpy(lock_obj.fileid, mpf->fileid, DB_FILE_ID_LEN);
	lock_obj.type = DB_PAGE_LOCK;

	memset(&lockdbt, 0, sizeof(lockdbt));
	lockdbt.data = &lock_obj;
	lockdbt.size = sizeof(lock_obj);

	for (p = msgfp->pgno; p <= msgfp->max_pgno; p++) {
		/*
		 * We're not waiting for the lock, if we cannot get
		 * the lock for this page, skip it.  The gap
		 * code will rerequest it.
		 */
		lock_obj.pgno = p;
		if ((ret = __lock_get(env, locker, DB_LOCK_NOWAIT, &lockdbt,
		    DB_LOCK_READ, &lock)) != 0) {
			/*
			 * Continue if we couldn't get the lock.
			 */
			if (ret == DB_LOCK_NOTGRANTED) {
				ret = 0;
				continue;
			}
			/*
			 * Otherwise we have an error.
			 */
			goto err;
		}
		/*
		 * Queue pages other than page 0 are fetched through the
		 * queue access method; everything else comes straight from
		 * the memory pool.  Without queue support such a page is
		 * reported as not found.
		 */
		if (msgfp->type == (u_int32_t)DB_QUEUE && p != 0)
#ifdef HAVE_QUEUE
			ret = __qam_fget(qdbc, &p, DB_MPOOL_CREATE, &pagep);
#else
			ret = DB_PAGE_NOTFOUND;
#endif
		else
			ret = __memp_fget(mpf, &p, ip, NULL,
			    DB_MPOOL_CREATE, &pagep);
		msgfp->pgno = p;
		if (ret == DB_PAGE_NOTFOUND) {
			/*
			 * A master tells the requester the page is missing
			 * and returns success; other sites return
			 * DB_NOTFOUND.  Either way the loop ends here.
			 */
			ZERO_LSN(lsn);
			if (F_ISSET(rep, REP_F_MASTER)) {
				ret = 0;
				RPRINT(env, DB_VERB_REP_SYNC, (env,
				    "sendpages: PAGE_FAIL on page %lu",
				    (u_long)p));
				/*
				 * NOTE(review): msgdbt still holds whatever
				 * it held before this page (nothing on the
				 * first pass) -- confirm that is what the
				 * receiver of REP_PAGE_FAIL expects.
				 */
				(void)__rep_send_message(env, eid,
				    REP_PAGE_FAIL, &lsn, &msgdbt, 0, 0);
			} else
				ret = DB_NOTFOUND;
			goto lockerr;
		} else if (ret != 0)
			goto lockerr;
		else
			/* The page image becomes the record's payload. */
			DB_SET_DBT(msgfp->info, pagep, msgfp->pgsize);
		len = 0;
		/*
		 * Send along an indication of the byte order of this mpool
		 * page.  Since mpool always keeps pages in the native byte
		 * order of the local environment, this is simply my
		 * environment's byte order.
		 *
		 * Since pages can be served from a variety of sites when using
		 * client-to-client synchronization, the receiving client needs
		 * to know the byte order of each page independently.
		 */
		if (F_ISSET(env, ENV_LITTLEENDIAN))
			FLD_SET(msgfp->finfo_flags, REPINFO_PG_LITTLEENDIAN);
		else
			FLD_CLR(msgfp->finfo_flags, REPINFO_PG_LITTLEENDIAN);
		RPRINT(env, DB_VERB_REP_SYNC, (env,
		    "sendpages: %lu, page lsn [%lu][%lu]", (u_long)p,
		    (u_long)pagep->lsn.file, (u_long)pagep->lsn.offset));
		/*
		 * Marshal into our own buffer, then give the page back the
		 * same way it was fetched and drop the page lock before
		 * looking at any error.
		 */
		ret = __rep_fileinfo_marshal(env, rp->rep_version,
		    msgfp, buf, msgsz, &len);
		if (msgfp->type != (u_int32_t)DB_QUEUE || p == 0)
			t_ret = __memp_fput(mpf,
			    ip, pagep, DB_PRIORITY_UNCHANGED);
#ifdef HAVE_QUEUE
		else
			/*
			 * We don't need an #else for HAVE_QUEUE here because if
			 * we're not compiled with queue, then we're guaranteed
			 * to have set REP_PAGE_FAIL above.
			 */
			t_ret = __qam_fput(qdbc, p, pagep, qdbp->priority);
#endif
		if (t_ret != 0 && ret == 0)
			ret = t_ret;
		if ((t_ret = __ENV_LPUT(env, lock)) != 0 && ret == 0)
			ret = t_ret;
		if (ret != 0)
			goto err;

		DB_ASSERT(env, len <= msgsz);
		DB_SET_DBT(msgdbt, buf, len);

		/* Stamp the record with the current end-of-log LSN. */
		dblp = env->lg_handle;
		LOG_SYSTEM_LOCK(env);
		repth.lsn = ((LOG *)dblp->reginfo.primary)->lsn;
		LOG_SYSTEM_UNLOCK(env);
		/*
		 * If we are configured for bulk, try to send this as a bulk
		 * request.  If not configured, or it is too big for bulk
		 * then just send normally.
		 */
		if (use_bulk)
			ret = __rep_bulk_message(env, &bulk, &repth,
			    &repth.lsn, &msgdbt, 0);
		if (!use_bulk || ret == DB_REP_BULKOVF)
			ret = __rep_send_throttle(env, eid, &repth, 0, 0);
		RPRINT(env, DB_VERB_REP_SYNC, (env,
		    "sendpages: %lu, lsn [%lu][%lu]", (u_long)p,
		    (u_long)repth.lsn.file, (u_long)repth.lsn.offset));
		/*
		 * If we have REP_PAGE_MORE we need to break this loop.
		 * Otherwise, with REP_PAGE, we keep going.
		 */
		if (repth.type == REP_PAGE_MORE || ret != 0) {
			/* Ignore send failure, except to break the loop. */
			if (ret == DB_REP_UNAVAIL)
				ret = 0;
			break;
		}
	}

	/*
	 * "lockerr" is only reached by goto, from the paths above that
	 * leave the loop while still holding the page lock.
	 */
	if (0) {
lockerr:	if ((t_ret = __ENV_LPUT(env, lock)) != 0 && ret == 0)
			ret = t_ret;
	}
err:
	/*
	 * We're done, force out whatever remains in the bulk buffer and
	 * free it.
	 */
	if (use_bulk && bulk.addr != NULL &&
	    (t_ret = __rep_bulk_free(env, &bulk, 0)) != 0 && ret == 0)
		ret = t_ret;
	if (qdbc != NULL && (t_ret = __dbc_close(qdbc)) != 0 && ret == 0)
		ret = t_ret;
	if (opened && (t_ret = __db_close(qdbp, NULL, DB_NOSYNC)) != 0 &&
	    ret == 0)
		ret = t_ret;
	if (buf != NULL)
		__os_free(env, buf);
	if (locker != NULL && (t_ret = __lock_id_free(env,
	    locker)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}
815
816/*
817 * __rep_update_setup
818 *	Process and setup with this file information.
819 *
820 * PUBLIC: int __rep_update_setup __P((ENV *, int, __rep_control_args *,
821 * PUBLIC:     DBT *));
822 */
823int
824__rep_update_setup(env, eid, rp, rec)
825	ENV *env;
826	int eid;
827	__rep_control_args *rp;
828	DBT *rec;
829{
830	DB_LOG *dblp;
831	DB_REP *db_rep;
832	DB_THREAD_INFO *ip;
833	LOG *lp;
834	REGENV *renv;
835	REGINFO *infop;
836	REP *rep;
837	__rep_update_args *rup;
838	int ret;
839	u_int32_t count;
840	u_int8_t *next;
841
842	db_rep = env->rep_handle;
843	rep = db_rep->region;
844	dblp = env->lg_handle;
845	lp = dblp->reginfo.primary;
846	ret = 0;
847
848	REP_SYSTEM_LOCK(env);
849	if (!F_ISSET(rep, REP_F_RECOVER_UPDATE) || IN_ELECTION(rep)) {
850		REP_SYSTEM_UNLOCK(env);
851		return (0);
852	}
853	F_CLR(rep, REP_F_RECOVER_UPDATE);
854	/*
855	 * We know we're the first to come in here due to the
856	 * REP_F_RECOVER_UPDATE flag.
857	 */
858	F_SET(rep, REP_F_RECOVER_PAGE);
859	/*
860	 * We should not ever be in internal init with a lease granted.
861	 */
862	DB_ASSERT(env,
863	    !IS_USING_LEASES(env) || __rep_islease_granted(env) == 0);
864
865	/*
866	 * We do not clear REP_F_READY_* in this code.
867	 * We'll eventually call the normal __rep_verify_match recovery
868	 * code and that will clear all the flags and allow others to
869	 * proceed.  We only need to lockout the API here.  We do not
870	 * need to lockout other message threads.
871	 */
872	if ((ret = __rep_lockout_api(env, rep)) != 0)
873		goto err;
874	/*
875	 * We need to update the timestamp and kill any open handles
876	 * on this client.  The files are changing completely.
877	 */
878	infop = env->reginfo;
879	renv = infop->primary;
880	(void)time(&renv->rep_timestamp);
881
882	REP_SYSTEM_UNLOCK(env);
883	MUTEX_LOCK(env, rep->mtx_clientdb);
884	__os_gettime(env, &lp->rcvd_ts, 1);
885	lp->wait_ts = rep->request_gap;
886	ZERO_LSN(lp->ready_lsn);
887	ZERO_LSN(lp->verify_lsn);
888	ZERO_LSN(lp->waiting_lsn);
889	ZERO_LSN(lp->max_wait_lsn);
890	ZERO_LSN(lp->max_perm_lsn);
891	if (db_rep->rep_db == NULL)
892		ret = __rep_client_dbinit(env, 0, REP_DB);
893	MUTEX_UNLOCK(env, rep->mtx_clientdb);
894	if (ret != 0)
895		goto err_nolock;
896	if ((ret = __rep_update_unmarshal(env, rp->rep_version,
897	    &rup, rec->data, rec->size, &next)) != 0)
898		goto err_nolock;
899
900	/*
901	 * We need to empty out any old log records that might be in the
902	 * temp database.
903	 */
904	ENV_GET_THREAD_INFO(env, ip);
905	if ((ret = __db_truncate(db_rep->rep_db, ip, NULL, &count)) != 0)
906		goto err_nolock;
907
908	/*
909	 * We will remove all logs we have so we need to request
910	 * from the master's beginning.
911	 */
912	REP_SYSTEM_LOCK(env);
913	rep->first_lsn = rup->first_lsn;
914	rep->first_vers = rup->first_vers;
915	rep->last_lsn = rp->lsn;
916	rep->nfiles = rup->num_files;
917
918	__os_free(env, rup);
919
920	RPRINT(env, DB_VERB_REP_SYNC,
921	    (env, "Update setup for %d files.", rep->nfiles));
922	RPRINT(env, DB_VERB_REP_SYNC,
923	    (env, "Update setup:  First LSN [%lu][%lu].",
924	    (u_long)rep->first_lsn.file, (u_long)rep->first_lsn.offset));
925	RPRINT(env, DB_VERB_REP_SYNC,
926	    (env, "Update setup:  Last LSN [%lu][%lu]",
927	    (u_long)rep->last_lsn.file, (u_long)rep->last_lsn.offset));
928
929	if (rep->nfiles > 0) {
930		rep->infoversion = rp->rep_version;
931		rep->originfolen = rep->infolen =
932		    rec->size - __REP_UPDATE_SIZE;
933		if ((ret = __os_calloc(env, 1, rep->infolen,
934		    &rep->originfo)) != 0)
935			goto err;
936		memcpy(rep->originfo, next, rep->infolen);
937		rep->nextinfo = rep->originfo;
938	}
939
940	/*
941	 * We need to remove all logs and databases the client has prior to
942	 * getting pages for current databases on the master.
943	 */
944	if ((ret = __rep_remove_all(env, rp->rep_version, rec)) != 0)
945		goto err;
946
947	rep->curfile = 0;
948	if ((ret = __rep_nextfile(env, eid, rep)) != 0)
949		goto err;
950
951	if (0) {
952err_nolock:	REP_SYSTEM_LOCK(env);
953	}
954
955err:	/*
956	 * If we get an error, we cannot leave ourselves in the RECOVER_PAGE
957	 * state because we have no file information.  That also means undo'ing
958	 * the rep_lockout.  We need to move back to the RECOVER_UPDATE stage.
959	 */
960	if (ret != 0) {
961		if (rep->originfo != NULL) {
962			__os_free(env, rep->originfo);
963			rep->originfo = NULL;
964		}
965		RPRINT(env, DB_VERB_REP_SYNC, (env,
966		    "Update_setup: Error: Clear PAGE, set UPDATE again. %s",
967		    db_strerror(ret)));
968		F_CLR(rep, REP_F_RECOVER_PAGE | REP_F_READY_API |
969		    REP_F_READY_OP);
970		F_SET(rep, REP_F_RECOVER_UPDATE);
971	}
972	REP_SYSTEM_UNLOCK(env);
973	return (ret);
974}
975
976/*
977 * Removes all existing logs and databases, at the start of internal init.  But
978 * before we do, write a list of the databases onto the init file, so that in
979 * case we crash in the middle, we'll know how to resume when we restart.
980 * Finally, also write into the init file the UPDATE message from the master (in
981 * the "rec" DBT), which includes the (new) list of databases we intend to
982 * request copies of (again, so that we know what to do if we crash in the
983 * middle).
984 *
985 * For the sake of simplicity, these database lists are in the form of an UPDATE
986 * message (since we already have the mechanisms in place), even though strictly
987 * speaking that contains more information than we really need to store.
988 */
989static int
990__rep_remove_all(env, msg_version, rec)
991	ENV *env;
992	u_int32_t msg_version;
993	DBT *rec;
994{
995	__rep_fileinfo_args *finfo;
996	__rep_update_args u_args;
997	DB_FH *fhp;
998	size_t cnt, filelen, filesz, updlen;
999	u_int32_t bufsz, filecnt, fvers, mvers, zero;
1000	u_int8_t *buf, *fp, *new_fp, *origfp;
1001	int ret, t_ret;
1002	char *fname;
1003
1004	finfo = NULL;
1005	fname = NULL;
1006	fhp = NULL;
1007
1008	/*
1009	 * 1. Get list of databases currently present at this client, which we
1010	 *    intend to remove.
1011	 */
1012	filelen = 0;
1013	filecnt = 0;
1014	filesz = MEGABYTE;
1015	if ((ret = __os_calloc(env, 1, filesz, &buf)) != 0)
1016		return (ret);
1017	origfp = fp = buf + __REP_UPDATE_SIZE;
1018	filesz -= __REP_UPDATE_SIZE;
1019	if ((ret = __rep_find_dbs(env, DB_REPVERSION,
1020	    &fp, &filesz, &filelen, &filecnt)) != 0)
1021		goto out;
1022	ZERO_LSN(u_args.first_lsn);
1023	u_args.first_vers = 0;
1024	u_args.num_files = filecnt;
1025	if ((ret = __rep_update_marshal(env, DB_REPVERSION,
1026	    &u_args, buf, filesz, &updlen)) != 0)
1027		goto out;
1028
1029	/*
1030	 * 2. Before removing anything, safe-store the database list, so that in
1031	 *    case we crash before we've removed them all, when we restart we
1032	 *    can clean up what we were doing.
1033	 *
1034	 * The original version of the file contains:
1035	 * data1 size (4 bytes)
1036	 * data1
1037	 * data2 size (possibly) (4 bytes)
1038	 * data2 (possibly)
1039	 *
1040	 * As of 4.7 the file has the following form:
1041	 * 0 (4 bytes - to indicate a new style file)
1042	 * file version (4 bytes)
1043	 * data1 version (4 bytes)
1044	 * data1 size (4 bytes)
1045	 * data1
1046	 * data2 version (possibly) (4 bytes)
1047	 * data2 size (possibly) (4 bytes)
1048	 * data2 (possibly)
1049	 */
1050	if ((ret = __db_appname(
1051	    env, DB_APP_NONE, REP_INITNAME, 0, NULL, &fname)) != 0)
1052		goto out;
1053	/* Sanity check that the write size fits into 32 bits. */
1054	DB_ASSERT(env, updlen + filelen == (u_int32_t)(updlen + filelen));
1055	bufsz = (u_int32_t)(updlen + filelen);
1056
1057	/*
1058	 * (Short writes aren't possible, so we don't have to verify 'cnt'.)
1059	 * This first list is generated internally, so it is always in
1060	 * the form of the current message version.
1061	 */
1062	zero = 0;
1063	fvers = REP_INITVERSION;
1064	mvers = DB_REPVERSION;
1065	if ((ret = __os_open(env, fname, 0,
1066	    DB_OSO_CREATE | DB_OSO_TRUNC, DB_MODE_600, &fhp)) != 0 ||
1067	    (ret = __os_write(env, fhp, &zero, sizeof(zero), &cnt)) != 0 ||
1068	    (ret = __os_write(env, fhp, &fvers, sizeof(fvers), &cnt)) != 0 ||
1069	    (ret = __os_write(env, fhp, &mvers, sizeof(mvers), &cnt)) != 0 ||
1070	    (ret = __os_write(env, fhp, &bufsz, sizeof(bufsz), &cnt)) != 0 ||
1071	    (ret = __os_write(env, fhp, buf, bufsz, &cnt)) != 0 ||
1072	    (ret = __os_fsync(env, fhp)) != 0) {
1073		__db_err(env, ret, "%s", fname);
1074		goto out;
1075	}
1076
1077	/*
1078	 * 3. Go ahead and remove logs and databases.  The databases get removed
1079	 *    according to the list we just finished safe-storing.
1080	 */
1081	if ((ret = __rep_remove_logs(env)) != 0)
1082		goto out;
1083	if ((ret = __rep_closefiles(env, 0)) != 0)
1084		goto out;
1085	fp = origfp;
1086	while (filecnt-- > 0) {
1087		if ((ret = __rep_fileinfo_unmarshal(env, DB_REPVERSION,
1088		    &finfo, fp, filesz, &new_fp)) != 0)
1089			goto out;
1090		if ((ret = __rep_remove_file(env, finfo->uid.data,
1091		    finfo->info.data, finfo->type, finfo->db_flags)) != 0)
1092			goto out;
1093		filesz -= (u_int32_t)(new_fp - fp);
1094		fp = new_fp;
1095		__os_free(env, finfo);
1096		finfo = NULL;
1097	}
1098
1099	/*
1100	 * 4. Safe-store the (new) list of database files we intend to copy from
1101	 *    the master (again, so that in case we crash before we're finished
1102	 *    doing so, we'll have enough information to clean up and start over
1103	 *    again).  This list is the list from the master, so it uses
1104	 *    the message version.
1105	 */
1106	mvers = msg_version;
1107	if ((ret = __os_write(env, fhp, &mvers, sizeof(mvers), &cnt)) != 0 ||
1108	    (ret = __os_write(env, fhp,
1109	    &rec->size, sizeof(rec->size), &cnt)) != 0 ||
1110	    (ret = __os_write(env, fhp, rec->data, rec->size, &cnt)) != 0 ||
1111	    (ret = __os_fsync(env, fhp)) != 0) {
1112		__db_err(env, ret, "%s", fname);
1113		goto out;
1114	}
1115
1116out:
1117	if (fhp != NULL && (t_ret = __os_closehandle(env, fhp)) && ret == 0)
1118		ret = t_ret;
1119	if (fname != NULL)
1120		__os_free(env, fname);
1121	if (finfo != NULL)
1122		__os_free(env, finfo);
1123	__os_free(env, buf);
1124	return (ret);
1125}
1126
1127/*
1128 * __rep_remove_logs -
1129 *	Remove our logs to prepare for internal init.
1130 */
1131static int
1132__rep_remove_logs(env)
1133	ENV *env;
1134{
1135	DB_LOG *dblp;
1136	DB_LSN lsn;
1137	LOG *lp;
1138	u_int32_t fnum, lastfile;
1139	int ret;
1140	char *name;
1141
1142	dblp = env->lg_handle;
1143	lp = dblp->reginfo.primary;
1144	ret = 0;
1145
1146	/*
1147	 * Call memp_sync to flush any pages that might be in the log buffers
1148	 * and not on disk before we remove files on disk.  If there were no
1149	 * dirty pages, the log isn't flushed.  Yet the log buffers could still
1150	 * be dirty: __log_flush should take care of this rare situation.
1151	 */
1152	if ((ret = __memp_sync_int(env,
1153	    NULL, 0, DB_SYNC_CACHE | DB_SYNC_INTERRUPT_OK, NULL, NULL)) != 0)
1154		return (ret);
1155	if ((ret = __log_flush(env, NULL)) != 0)
1156		return (ret);
1157	/*
1158	 * Forcibly remove existing log files or reset
1159	 * the in-memory log space.
1160	 */
1161	if (lp->db_log_inmemory) {
1162		ZERO_LSN(lsn);
1163		if ((ret = __log_zero(env, &lsn)) != 0)
1164			return (ret);
1165	} else {
1166		lastfile = lp->lsn.file;
1167		for (fnum = 1; fnum <= lastfile; fnum++) {
1168			if ((ret = __log_name(dblp, fnum, &name, NULL, 0)) != 0)
1169				return (ret);
1170			(void)time(&lp->timestamp);
1171			(void)__os_unlink(env, name, 0);
1172			__os_free(env, name);
1173		}
1174	}
1175	return (0);
1176}
1177
1178/*
1179 * Removes a file during internal init.  Assumes underlying subsystems are
1180 * active; therefore, this can't be used for internal init crash recovery.
1181 */
1182static int
1183__rep_remove_file(env, uid, name, type, flags)
1184	ENV *env;
1185	u_int8_t *uid;
1186	const char *name;
1187	u_int32_t type, flags;
1188{
1189	/*
1190	 * Calling __fop_remove will both purge any matching
1191	 * fileid from mpool and unlink it on disk.
1192	 */
1193#ifdef HAVE_QUEUE
1194	DB *dbp;
1195	int ret;
1196
1197	/*
1198	 * Handle queue separately.  __fop_remove will not
1199	 * remove extent files.  Use __qam_remove to remove
1200	 * extent files that might exist under this name.  Note that
1201	 * in-memory queue databases can't have extent files.
1202	 */
1203	if (type == (u_int32_t)DB_QUEUE && !LF_ISSET(DB_AM_INMEM)) {
1204		if ((ret = __db_create_internal(&dbp, env, 0)) != 0)
1205			return (ret);
1206
1207		/*
1208		 * At present, qam_remove expects the passed-in dbp to have a
1209		 * locker allocated, and if not, db_open allocates a locker
1210		 * which qam_remove then leaks.
1211		 *
1212		 * TODO: it would be better to avoid cobbling together this
1213		 * sequence of low-level operations, if fileops provided some
1214		 * API to allow us to remove a database without write-locking
1215		 * its handle.
1216		 */
1217		if ((ret = __lock_id(env, NULL, &dbp->locker)) != 0)
1218			return (ret);
1219
1220		RPRINT(env, DB_VERB_REP_SYNC,
1221		    (env, "QAM: Unlink %s via __qam_remove", name));
1222		if ((ret = __qam_remove(dbp, NULL, name, NULL)) != 0) {
1223			RPRINT(env, DB_VERB_REP_SYNC,
1224			    (env, "qam_remove returned %d", ret));
1225			(void)__db_close(dbp, NULL, DB_NOSYNC);
1226			return (ret);
1227		}
1228		if ((ret = __db_close(dbp, NULL, DB_NOSYNC)) != 0)
1229			return (ret);
1230	}
1231#else
1232	COMPQUIET(type, 0);
1233	COMPQUIET(flags, 0);
1234#endif
1235	/*
1236	 * We call fop_remove even if we've called qam_remove.
1237	 * That will only have removed extent files.  Now
1238	 * we need to deal with the actual file itself.
1239	 */
1240	return (__fop_remove(env, NULL, uid, name, DB_APP_DATA, 0));
1241}
1242
1243/*
1244 * __rep_bulk_page
1245 *	Process a bulk page message.
1246 *
1247 * PUBLIC: int __rep_bulk_page __P((ENV *,
1248 * PUBLIC:     DB_THREAD_INFO *, int, __rep_control_args *, DBT *));
1249 */
1250int
1251__rep_bulk_page(env, ip, eid, rp, rec)
1252	ENV *env;
1253	DB_THREAD_INFO *ip;
1254	int eid;
1255	__rep_control_args *rp;
1256	DBT *rec;
1257{
1258	__rep_control_args tmprp;
1259	__rep_bulk_args b_args;
1260	int ret;
1261	u_int8_t *p, *ep;
1262
1263	/*
1264	 * We're going to be modifying the rp LSN contents so make
1265	 * our own private copy to play with.  We need to set the
1266	 * rectype to REP_PAGE because we're calling through __rep_page
1267	 * to process each page, and lower functions make decisions
1268	 * based on the rectypes (for throttling/gap processing)
1269	 */
1270	memcpy(&tmprp, rp, sizeof(tmprp));
1271	tmprp.rectype = REP_PAGE;
1272	ret = 0;
1273	for (ep = (u_int8_t *)rec->data + rec->size, p = (u_int8_t *)rec->data;
1274	    p < ep;) {
1275		/*
1276		 * First thing in the buffer is the length.  Then the LSN
1277		 * of this page, then the page info itself.
1278		 */
1279		if ((ret = __rep_bulk_unmarshal(env,
1280		    &b_args, p, rec->size, &p)) != 0)
1281			return (ret);
1282		RPRINT(env, DB_VERB_REP_SYNC, (env,
1283		    "rep_bulk_page: Processing LSN [%lu][%lu]",
1284		    (u_long)tmprp.lsn.file, (u_long)tmprp.lsn.offset));
1285		RPRINT(env, DB_VERB_REP_SYNC, (env,
1286    "rep_bulk_page: p %#lx ep %#lx pgrec data %#lx, size %lu (%#lx)",
1287		    P_TO_ULONG(p), P_TO_ULONG(ep),
1288		    P_TO_ULONG(b_args.bulkdata.data),
1289		    (u_long)b_args.bulkdata.size,
1290		    (u_long)b_args.bulkdata.size));
1291		/*
1292		 * Now send the page info DBT to the page processing function.
1293		 */
1294		ret = __rep_page(env, ip, eid, &tmprp, &b_args.bulkdata);
1295		RPRINT(env, DB_VERB_REP_SYNC, (env,
1296		    "rep_bulk_page: rep_page ret %d", ret));
1297
1298		/*
1299		 * If this set of pages is already done just return.
1300		 */
1301		if (ret != 0) {
1302			if (ret == DB_REP_PAGEDONE)
1303				ret = 0;
1304			break;
1305		}
1306	}
1307	return (ret);
1308}
1309
1310/*
1311 * __rep_page
1312 *	Process a page message.
1313 *
1314 * PUBLIC: int __rep_page __P((ENV *,
1315 * PUBLIC:     DB_THREAD_INFO *, int, __rep_control_args *, DBT *));
1316 */
1317int
1318__rep_page(env, ip, eid, rp, rec)
1319	ENV *env;
1320	DB_THREAD_INFO *ip;
1321	int eid;
1322	__rep_control_args *rp;
1323	DBT *rec;
1324{
1325
1326	DB_REP *db_rep;
1327	DBT key, data;
1328	REP *rep;
1329	__rep_fileinfo_args *msgfp;
1330	db_recno_t recno;
1331	int ret;
1332
1333	ret = 0;
1334	db_rep = env->rep_handle;
1335	rep = db_rep->region;
1336
1337	if (!F_ISSET(rep, REP_F_RECOVER_PAGE))
1338		return (DB_REP_PAGEDONE);
1339	/*
1340	 * If we restarted internal init, it is possible to receive
1341	 * an old REP_PAGE message, while we're in the current
1342	 * stage of recovering pages.  Until we have some sort of
1343	 * an init generation number, ignore any message that has
1344	 * a message LSN that is before this internal init's first_lsn.
1345	 */
1346	if (LOG_COMPARE(&rp->lsn, &rep->first_lsn) < 0) {
1347		RPRINT(env, DB_VERB_REP_SYNC, (env,
1348		    "PAGE: Old page: msg LSN [%lu][%lu] first_lsn [%lu][%lu]",
1349		    (u_long)rp->lsn.file, (u_long)rp->lsn.offset,
1350		    (u_long)rep->first_lsn.file,
1351		    (u_long)rep->first_lsn.offset));
1352		return (DB_REP_PAGEDONE);
1353	}
1354	if ((ret = __rep_fileinfo_unmarshal(env, rp->rep_version,
1355	    &msgfp, rec->data, rec->size, NULL)) != 0)
1356		return (ret);
1357	MUTEX_LOCK(env, rep->mtx_clientdb);
1358	REP_SYSTEM_LOCK(env);
1359	/*
1360	 * We should not ever be in internal init with a lease granted.
1361	 */
1362	DB_ASSERT(env,
1363	    !IS_USING_LEASES(env) || __rep_islease_granted(env) == 0);
1364
1365	RPRINT(env, DB_VERB_REP_SYNC, (env,
1366	    "PAGE: Received page %lu from file %d",
1367	    (u_long)msgfp->pgno, msgfp->filenum));
1368	/*
1369	 * Check if this page is from the file we're expecting.
1370	 * This may be an old or delayed page message.
1371	 */
1372	/*
1373	 * !!!
1374	 * If we allow dbrename/dbremove on the master while a client
1375	 * is updating, then we'd have to verify the file's uid here too.
1376	 */
1377	if (msgfp->filenum != rep->curfile) {
1378		RPRINT(env, DB_VERB_REP_SYNC,
1379		    (env, "Msg file %d != curfile %d",
1380		    msgfp->filenum, rep->curfile));
1381		ret = DB_REP_PAGEDONE;
1382		goto err;
1383	}
1384	/*
1385	 * We want to create/open our dbp to the database
1386	 * where we'll keep our page information.
1387	 */
1388	if ((ret = __rep_client_dbinit(env, 1, REP_PG)) != 0) {
1389		RPRINT(env, DB_VERB_REP_SYNC, (env,
1390		    "PAGE: Client_dbinit %s", db_strerror(ret)));
1391		goto err;
1392	}
1393
1394	memset(&key, 0, sizeof(key));
1395	memset(&data, 0, sizeof(data));
1396	recno = (db_recno_t)(msgfp->pgno + 1);
1397	key.data = &recno;
1398	key.ulen = key.size = sizeof(db_recno_t);
1399	key.flags = DB_DBT_USERMEM;
1400
1401	/*
1402	 * If we already have this page, then we don't want to bother
1403	 * rewriting it into the file.  Otherwise, any other error
1404	 * we want to return.
1405	 */
1406	ret = __db_put(rep->file_dbp, ip, NULL, &key, &data, DB_NOOVERWRITE);
1407	if (ret == DB_KEYEXIST) {
1408		RPRINT(env, DB_VERB_REP_SYNC, (env,
1409		    "PAGE: Received duplicate page %lu from file %d",
1410		    (u_long)msgfp->pgno, msgfp->filenum));
1411		STAT(rep->stat.st_pg_duplicated++);
1412		ret = 0;
1413		goto err;
1414	}
1415	if (ret != 0)
1416		goto err;
1417
1418	RPRINT(env, DB_VERB_REP_SYNC, (env,
1419	    "PAGE: Write page %lu into mpool", (u_long)msgfp->pgno));
1420	/*
1421	 * We put the page in the database file itself.
1422	 */
1423	ret = __rep_write_page(env, ip, rep, msgfp);
1424	if (ret != 0) {
1425		/*
1426		 * We got an error storing the page, therefore, we need
1427		 * remove this page marker from the page database too.
1428		 * !!!
1429		 * I'm ignoring errors from the delete because we want to
1430		 * return the original error.  If we cannot write the page
1431		 * and we cannot delete the item we just put, what should
1432		 * we do?  Panic the env and return DB_RUNRECOVERY?
1433		 */
1434		(void)__db_del(rep->file_dbp, NULL, NULL, &key, 0);
1435		goto err;
1436	}
1437	STAT(rep->stat.st_pg_records++);
1438	rep->npages++;
1439
1440	/*
1441	 * Now check the LSN on the page and save it if it is later
1442	 * than the one we have.
1443	 */
1444	if (LOG_COMPARE(&rp->lsn, &rep->last_lsn) > 0)
1445		rep->last_lsn = rp->lsn;
1446
1447	/*
1448	 * We've successfully written the page.  Now we need to see if
1449	 * we're done with this file.  __rep_filedone will check if we
1450	 * have all the pages expected and if so, set up for the next
1451	 * file and send out a page request for the next file's pages.
1452	 */
1453	ret = __rep_filedone(env, ip, eid, rep, msgfp, rp->rectype);
1454
1455err:	REP_SYSTEM_UNLOCK(env);
1456	MUTEX_UNLOCK(env, rep->mtx_clientdb);
1457
1458	__os_free(env, msgfp);
1459	return (ret);
1460}
1461
1462/*
1463 * __rep_page_fail
1464 *	Process a page fail message.
1465 *
1466 * PUBLIC: int __rep_page_fail __P((ENV *,
1467 * PUBLIC:     DB_THREAD_INFO *, int, __rep_control_args *, DBT *));
1468 */
1469int
1470__rep_page_fail(env, ip, eid, rp, rec)
1471	ENV *env;
1472	DB_THREAD_INFO *ip;
1473	int eid;
1474	__rep_control_args *rp;
1475	DBT *rec;
1476{
1477
1478	DB_REP *db_rep;
1479	REP *rep;
1480	__rep_fileinfo_args *msgfp, *rfp;
1481	int ret;
1482
1483	ret = 0;
1484	db_rep = env->rep_handle;
1485	rep = db_rep->region;
1486
1487	if (!F_ISSET(rep, REP_F_RECOVER_PAGE))
1488		return (0);
1489	if ((ret = __rep_fileinfo_unmarshal(env, rp->rep_version,
1490	    &msgfp, rec->data, rec->size, NULL)) != 0)
1491		return (ret);
1492	/*
1493	 * Check if this page is from the file we're expecting.
1494	 * This may be an old or delayed page message.
1495	 */
1496	/*
1497	 * !!!
1498	 * If we allow dbrename/dbremove on the master while a client
1499	 * is updating, then we'd have to verify the file's uid here too.
1500	 */
1501	MUTEX_LOCK(env, rep->mtx_clientdb);
1502	REP_SYSTEM_LOCK(env);
1503	/*
1504	 * We should not ever be in internal init with a lease granted.
1505	 */
1506	DB_ASSERT(env,
1507	    !IS_USING_LEASES(env) || __rep_islease_granted(env) == 0);
1508
1509	if (msgfp->filenum != rep->curfile) {
1510		RPRINT(env, DB_VERB_REP_SYNC,
1511		    (env, "Msg file %d != curfile %d",
1512		    msgfp->filenum, rep->curfile));
1513		goto out;
1514	}
1515	rfp = rep->curinfo;
1516	if (rfp->type != (u_int32_t)DB_QUEUE)
1517		--rfp->max_pgno;
1518	else {
1519		/*
1520		 * Queue is special.  Pages at the beginning of the queue
1521		 * may disappear, as well as at the end.  Use msgfp->pgno
1522		 * to adjust accordingly.
1523		 */
1524		RPRINT(env, DB_VERB_REP_SYNC, (env,
1525	    "page_fail: BEFORE page %lu failed. ready %lu, max %lu, npages %d",
1526		    (u_long)msgfp->pgno, (u_long)rep->ready_pg,
1527		    (u_long)rfp->max_pgno, rep->npages));
1528		if (msgfp->pgno == rfp->max_pgno)
1529			--rfp->max_pgno;
1530		if (msgfp->pgno >= rep->ready_pg) {
1531			rep->ready_pg = msgfp->pgno + 1;
1532			rep->npages = rep->ready_pg;
1533		}
1534		RPRINT(env, DB_VERB_REP_SYNC, (env,
1535	    "page_fail: AFTER page %lu failed. ready %lu, max %lu, npages %d",
1536		    (u_long)msgfp->pgno, (u_long)rep->ready_pg,
1537		    (u_long)rfp->max_pgno, rep->npages));
1538	}
1539
1540	/*
1541	 * We've lowered the number of pages expected.  It is possible that
1542	 * this was the last page we were expecting.  Now we need to see if
1543	 * we're done with this file.  __rep_filedone will check if we have
1544	 * all the pages expected and if so, set up for the next file and
1545	 * send out a page request for the next file's pages.
1546	 */
1547	ret = __rep_filedone(env, ip, eid, rep, msgfp, REP_PAGE_FAIL);
1548out:
1549	REP_SYSTEM_UNLOCK(env);
1550	MUTEX_UNLOCK(env, rep->mtx_clientdb);
1551	__os_free(env, msgfp);
1552	return (ret);
1553}
1554
1555/*
1556 * __rep_write_page -
1557 *	Write this page into a database.
1558 */
1559static int
1560__rep_write_page(env, ip, rep, msgfp)
1561	ENV *env;
1562	DB_THREAD_INFO *ip;
1563	REP *rep;
1564	__rep_fileinfo_args *msgfp;
1565{
1566	DB db;
1567	DBT pgcookie;
1568	DB_MPOOLFILE *mpf;
1569	DB_PGINFO *pginfo;
1570	__rep_fileinfo_args *rfp;
1571	int ret;
1572	void *dst;
1573
1574	rfp = NULL;
1575
1576	/*
1577	 * If this is the first page we're putting in this database, we need
1578	 * to create the mpool file.  Otherwise call memp_fget to create the
1579	 * page in mpool.  Then copy the data to the page, and memp_fput the
1580	 * page to give it back to mpool.
1581	 *
1582	 * We need to create the file, removing any existing file and associate
1583	 * the correct file ID with the new one.
1584	 */
1585	rfp = rep->curinfo;
1586	if (rep->file_mpf == NULL) {
1587		if (!FLD_ISSET(rfp->db_flags, DB_AM_INMEM)) {
1588			/*
1589			 * Recreate the file on disk.  We'll be putting
1590			 * the data into the file via mpool.
1591			 */
1592			RPRINT(env, DB_VERB_REP_SYNC, (env,
1593			    "rep_write_page: Calling fop_create for %s",
1594			    (char *)rfp->info.data));
1595			if ((ret = __fop_create(env, NULL, NULL,
1596			    rfp->info.data, DB_APP_DATA,
1597			    env->db_mode, 0)) != 0)
1598				goto err;
1599		}
1600
1601		if ((ret =
1602		    __rep_mpf_open(env, &rep->file_mpf, rep->curinfo,
1603		    FLD_ISSET(rfp->db_flags, DB_AM_INMEM) ? DB_CREATE : 0)) != 0)
1604			goto err;
1605	}
1606	/*
1607	 * Handle queue specially.  If we're a QUEUE database, we need to
1608	 * use the __qam_fget/put calls.  We need to use rep->queue_dbc for
1609	 * that.  That dbp is opened after getting the metapage for the
1610	 * queue database.  Since the meta-page is always in the queue file,
1611	 * we'll use the normal path for that first page.  After that we
1612	 * can assume the dbp is opened.
1613	 */
1614	if (msgfp->type == (u_int32_t)DB_QUEUE && msgfp->pgno != 0) {
1615#ifdef HAVE_QUEUE
1616		ret = __qam_fget(rep->queue_dbc, &msgfp->pgno,
1617		    DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &dst);
1618#else
1619		/*
1620		 * This always returns an error.
1621		 */
1622		ret = __db_no_queue_am(env);
1623#endif
1624	} else
1625		ret = __memp_fget(rep->file_mpf, &msgfp->pgno, ip, NULL,
1626		    DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &dst);
1627
1628	if (ret != 0)
1629		goto err;
1630
1631	/*
1632	 * Before writing this page into our local mpool, see if its byte order
1633	 * needs to be swapped.  When in mpool the page should be in the native
1634	 * byte order of our local environment.  But the page image we've
1635	 * received may be in the opposite order (as indicated in finfo_flags).
1636	 */
1637	if ((F_ISSET(env, ENV_LITTLEENDIAN) &&
1638	    !FLD_ISSET(msgfp->finfo_flags, REPINFO_PG_LITTLEENDIAN)) ||
1639	    (!F_ISSET(env, ENV_LITTLEENDIAN) &&
1640	    FLD_ISSET(msgfp->finfo_flags, REPINFO_PG_LITTLEENDIAN))) {
1641		RPRINT(env, DB_VERB_REP_SYNC, (env,
1642		    "write_page: Page %d needs to be swapped", msgfp->pgno));
1643		/*
1644		 * Set up a dbp to pass into the swap functions.  We need
1645		 * only a few things:  The environment and any special
1646		 * dbp flags and some obvious basics like db type and
1647		 * pagesize.  Those flags were set back in rep_mpf_open
1648		 * and are available in the pgcookie set up with the
1649		 * mpoolfile associated with this database.
1650		 */
1651		memset(&db, 0, sizeof(db));
1652		db.env = env;
1653		db.type = (DBTYPE)msgfp->type;
1654		db.pgsize = msgfp->pgsize;
1655		mpf = rep->file_mpf;
1656		if ((ret = __memp_get_pgcookie(mpf, &pgcookie)) != 0)
1657			goto err;
1658		pginfo = (DB_PGINFO *)pgcookie.data;
1659		db.flags = pginfo->flags;
1660		if ((ret = __db_pageswap(&db, msgfp->info.data, msgfp->pgsize,
1661		    NULL, 1)) != 0)
1662			goto err;
1663	}
1664
1665	memcpy(dst, msgfp->info.data, msgfp->pgsize);
1666#ifdef HAVE_QUEUE
1667	if (msgfp->type == (u_int32_t)DB_QUEUE && msgfp->pgno != 0)
1668		ret = __qam_fput(rep->queue_dbc,
1669		     msgfp->pgno, dst, rep->queue_dbc->priority);
1670	else
1671#endif
1672		ret = __memp_fput(rep->file_mpf,
1673		    ip, dst, rep->file_dbp->priority);
1674
1675err:	return (ret);
1676}
1677
1678/*
1679 * __rep_page_gap -
1680 *	After we've put the page into the database, we need to check if
1681 *	we have a page gap and whether we need to request pages.
1682 */
1683static int
1684__rep_page_gap(env, rep, msgfp, type)
1685	ENV *env;
1686	REP *rep;
1687	__rep_fileinfo_args *msgfp;
1688	u_int32_t type;
1689{
1690	DBC *dbc;
1691	DBT data, key;
1692	DB_LOG *dblp;
1693	DB_THREAD_INFO *ip;
1694	LOG *lp;
1695	__rep_fileinfo_args *rfp;
1696	db_recno_t recno;
1697	int ret, t_ret;
1698
1699	dblp = env->lg_handle;
1700	lp = dblp->reginfo.primary;
1701	ret = 0;
1702	dbc = NULL;
1703
1704	/*
1705	 * We've successfully put this page into our file.
1706	 * Now we need to account for it and re-request new pages
1707	 * if necessary.
1708	 */
1709	/*
1710	 * We already hold both the db mutex and rep mutex.
1711	 */
1712	rfp = rep->curinfo;
1713
1714	/*
1715	 * Make sure we're still talking about the same file.
1716	 * If not, we're done here.
1717	 */
1718	if (rfp->filenum != msgfp->filenum) {
1719		ret = DB_REP_PAGEDONE;
1720		goto err;
1721	}
1722
1723	/*
1724	 * We have 3 possible states:
1725	 * 1.  We receive a page we already have accounted for.
1726	 *	msg pgno < ready pgno
1727	 * 2.  We receive a page that is beyond a gap.
1728	 *	msg pgno > ready pgno
1729	 * 3.  We receive the page we're expecting next.
1730	 *	msg pgno == ready pgno
1731	 */
1732	/*
1733	 * State 1.  This can happen once we put our page record into the
1734	 * database, but by the time we acquire the mutex other
1735	 * threads have already accounted for this page and moved on.
1736	 * We just want to return.
1737	 */
1738	if (msgfp->pgno < rep->ready_pg) {
1739		RPRINT(env, DB_VERB_REP_SYNC, (env,
1740		    "PAGE_GAP: pgno %lu < ready %lu, waiting %lu",
1741		    (u_long)msgfp->pgno, (u_long)rep->ready_pg,
1742		    (u_long)rep->waiting_pg));
1743		goto err;
1744	}
1745
1746	/*
1747	 * State 2.  This page is beyond the page we're expecting.
1748	 * We need to update waiting_pg if this page is less than
1749	 * (earlier) the current waiting_pg.  There is nothing
1750	 * to do but see if we need to request.
1751	 */
1752	RPRINT(env, DB_VERB_REP_SYNC, (env,
1753    "PAGE_GAP: pgno %lu, max_pg %lu ready %lu, waiting %lu max_wait %lu",
1754	    (u_long)msgfp->pgno, (u_long)rfp->max_pgno, (u_long)rep->ready_pg,
1755	    (u_long)rep->waiting_pg, (u_long)rep->max_wait_pg));
1756	if (msgfp->pgno > rep->ready_pg) {
1757		if (rep->waiting_pg == PGNO_INVALID ||
1758		    msgfp->pgno < rep->waiting_pg)
1759			rep->waiting_pg = msgfp->pgno;
1760	} else {
1761		/*
1762		 * We received the page we're expecting.
1763		 */
1764		rep->ready_pg++;
1765		__os_gettime(env, &lp->rcvd_ts, 1);
1766		if (rep->ready_pg == rep->waiting_pg) {
1767			/*
1768			 * If we get here we know we just filled a gap.
1769			 * Move the cursor to that place and then walk
1770			 * forward looking for the next gap, if it exists.
1771			 */
1772			lp->wait_ts = rep->request_gap;
1773			rep->max_wait_pg = PGNO_INVALID;
1774			/*
1775			 * We need to walk the recno database looking for the
1776			 * next page we need or expect.
1777			 */
1778			memset(&key, 0, sizeof(key));
1779			memset(&data, 0, sizeof(data));
1780			ENV_GET_THREAD_INFO(env, ip);
1781			if ((ret = __db_cursor(rep->file_dbp, ip, NULL,
1782			    &dbc, 0)) != 0)
1783				goto err;
1784			/*
1785			 * Set cursor to the first waiting page.
1786			 * Page numbers/record numbers are offset by 1.
1787			 */
1788			recno = (db_recno_t)rep->waiting_pg + 1;
1789			key.data = &recno;
1790			key.ulen = key.size = sizeof(db_recno_t);
1791			key.flags = DB_DBT_USERMEM;
1792			/*
1793			 * We know that page is there, this should
1794			 * find the record.
1795			 */
1796			ret = __dbc_get(dbc, &key, &data, DB_SET);
1797			if (ret != 0)
1798				goto err;
1799			RPRINT(env, DB_VERB_REP_SYNC, (env,
1800			    "PAGE_GAP: Set cursor for ready %lu, waiting %lu",
1801			    (u_long)rep->ready_pg, (u_long)rep->waiting_pg));
1802		}
1803		while (ret == 0 && rep->ready_pg == rep->waiting_pg) {
1804			rep->ready_pg++;
1805			ret = __dbc_get(dbc, &key, &data, DB_NEXT);
1806			/*
1807			 * If we get to the end of the list, there are no
1808			 * more gaps.  Reset waiting_pg.
1809			 */
1810			if (ret == DB_NOTFOUND || ret == DB_KEYEMPTY) {
1811				rep->waiting_pg = PGNO_INVALID;
1812				RPRINT(env, DB_VERB_REP_SYNC, (env,
1813	    "PAGE_GAP: Next cursor No next - ready %lu, waiting %lu",
1814				    (u_long)rep->ready_pg,
1815				    (u_long)rep->waiting_pg));
1816				break;
1817			}
1818			/*
1819			 * Subtract 1 from waiting_pg because record numbers
1820			 * are 1-based and pages are 0-based and we added 1
1821			 * into the page number when we put it into the db.
1822			 */
1823			rep->waiting_pg = *(db_pgno_t *)key.data;
1824			rep->waiting_pg--;
1825			RPRINT(env, DB_VERB_REP_SYNC, (env,
1826	    "PAGE_GAP: Next cursor ready %lu, waiting %lu",
1827			    (u_long)rep->ready_pg, (u_long)rep->waiting_pg));
1828		}
1829	}
1830
1831	/*
1832	 * If we filled a gap and now have the entire file, there's
1833	 * nothing to do.  We're done when ready_pg is > max_pgno
1834	 * because ready_pg is larger than the last page we received.
1835	 */
1836	if (rep->ready_pg > rfp->max_pgno)
1837		goto err;
1838
1839	/*
1840	 * Check if we need to ask for more pages.
1841	 */
1842	if ((rep->waiting_pg != PGNO_INVALID &&
1843	    rep->ready_pg != rep->waiting_pg) || type == REP_PAGE_MORE) {
1844		/*
1845		 * We got a page but we may still be waiting for more.
1846		 * If we got REP_PAGE_MORE we always want to ask for more.
1847		 * We need to set rfp->pgno to the current page number
1848		 * we will use to ask for more pages.
1849		 */
1850		if (type == REP_PAGE_MORE)
1851			rfp->pgno = msgfp->pgno;
1852		if ((__rep_check_doreq(env, rep) || type == REP_PAGE_MORE) &&
1853		    ((ret = __rep_pggap_req(env, rep, rfp,
1854		    (type == REP_PAGE_MORE) ? REP_GAP_FORCE : 0)) != 0))
1855			goto err;
1856	} else {
1857		lp->wait_ts = rep->request_gap;
1858		rep->max_wait_pg = PGNO_INVALID;
1859	}
1860
1861err:
1862	if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0)
1863		ret = t_ret;
1864
1865	return (ret);
1866}
1867
1868/*
1869 * __rep_init_cleanup -
1870 *	Clean up internal initialization pieces.
1871 *
1872 * !!!
1873 * Caller must hold client database mutex (mtx_clientdb) and REP_SYSTEM_LOCK.
1874 *
1875 * PUBLIC: int __rep_init_cleanup __P((ENV *, REP *, int));
1876 */
1877int
1878__rep_init_cleanup(env, rep, force)
1879	ENV *env;
1880	REP *rep;
1881	int force;
1882{
1883	DB *queue_dbp;
1884	DB_LOG *dblp;
1885	LOG *lp;
1886	int cleanup_failure, ret, t_ret;
1887
1888	ret = 0;
1889	/*
1890	 * 1.  Close up the file data pointer we used.
1891	 * 2.  Close/reset the page database.
1892	 * 3.  Close/reset the queue database if we're forcing a cleanup.
1893	 * 4.  Free current file info.
1894	 * 5.  If we have all files or need to force, free original file info.
1895	 */
1896	if (rep->file_mpf != NULL) {
1897		ret = __memp_fclose(rep->file_mpf, 0);
1898		rep->file_mpf = NULL;
1899	}
1900	if (rep->file_dbp != NULL) {
1901		t_ret = __db_close(rep->file_dbp, NULL, DB_NOSYNC);
1902		rep->file_dbp = NULL;
1903		if (t_ret != 0 && ret == 0)
1904			ret = t_ret;
1905	}
1906	if (force && rep->queue_dbc != NULL) {
1907		queue_dbp = rep->queue_dbc->dbp;
1908		if ((t_ret = __dbc_close(rep->queue_dbc)) != 0 && ret == 0)
1909			ret = t_ret;
1910		rep->queue_dbc = NULL;
1911		if ((t_ret = __db_close(queue_dbp, NULL, DB_NOSYNC)) != 0 &&
1912		    ret == 0)
1913			ret = t_ret;
1914	}
1915	if (rep->curinfo != NULL) {
1916		__os_free(env, rep->curinfo);
1917		rep->curinfo = NULL;
1918	}
1919	if (F_ISSET(rep, REP_F_INTERNAL_INIT_MASK) && force) {
1920		/*
1921		 * Clean up files involved in an interrupted internal init.
1922		 *
1923		 * 1. logs
1924		 *   a) remove old log files
1925		 *   b) set up initial log file #1
1926		 * 2. database files
1927		 * 3. the "init file"
1928		 *
1929		 * Steps 1 and 2 can be attempted independently.  Step 1b is
1930		 * dependent on successful completion of 1a.  Step 3 must not be
1931		 * done if anything fails along the way, because the init file's
1932		 * raison d'etre is to show that some files remain to be cleaned
1933		 * up.
1934		 */
1935		RPRINT(env, DB_VERB_REP_SYNC,
1936		    (env, "clean up interrupted internal init"));
1937		cleanup_failure = 0;
1938
1939		if ((t_ret = __rep_remove_logs(env)) == 0) {
1940			/*
1941			 * Since we have no logs, recover by making it look like
1942			 * the case when a new client first starts up, namely we
1943			 * have nothing but a fresh log file #1.  This is a
1944			 * little wasteful, since we may soon remove this log
1945			 * file again.  But that's OK, because this is the
1946			 * unusual case of NEWMASTER during internal init, and
1947			 * the rest of internal init doubtless dwarfs this.
1948			 */
1949			dblp = env->lg_handle;
1950			lp = dblp->reginfo.primary;
1951
1952			if ((t_ret = __rep_log_setup(env,
1953			    rep, 1, DB_LOGVERSION, &lp->ready_lsn)) != 0) {
1954				cleanup_failure = 1;
1955				if (ret == 0)
1956					ret = t_ret;
1957			}
1958		} else {
1959			cleanup_failure = 1;
1960			if (ret == 0)
1961				ret = t_ret;
1962		}
1963
1964		if ((t_ret = __rep_remove_by_list(env, rep->infoversion,
1965		    rep->originfo, rep->originfolen, rep->nfiles)) != 0) {
1966			cleanup_failure = 1;
1967			if (ret == 0)
1968				ret = t_ret;
1969		}
1970
1971		if (!cleanup_failure &&
1972		    (t_ret = __rep_remove_init_file(env)) != 0) {
1973			if (ret == 0)
1974				ret = t_ret;
1975		}
1976
1977		if (rep->originfo != NULL) {
1978			__os_free(env, rep->originfo);
1979			rep->originfo = NULL;
1980		}
1981	}
1982
1983	return (ret);
1984}
1985
1986/*
1987 * __rep_filedone -
1988 *	We need to check if we're done with the current file after
1989 *	processing the current page.  Stat the database to see if
1990 *	we have all the pages.  If so, we need to clean up/close
1991 *	this one, set up for the next one, and ask for its pages,
1992 *	or if this is the last file, request the log records and
1993 *	move to the REP_RECOVER_LOG state.
1994 */
1995static int
1996__rep_filedone(env, ip, eid, rep, msgfp, type)
1997	ENV *env;
1998	DB_THREAD_INFO *ip;
1999	int eid;
2000	REP *rep;
2001	__rep_fileinfo_args *msgfp;
2002	u_int32_t type;
2003{
2004	__rep_fileinfo_args *rfp;
2005	int ret;
2006
2007	/*
2008	 * We've put our page, now we need to do any gap processing
2009	 * that might be needed to re-request pages.
2010	 */
2011	ret = __rep_page_gap(env, rep, msgfp, type);
2012	/*
2013	 * The world changed while we were doing gap processing.
2014	 * We're done here.
2015	 */
2016	if (ret == DB_REP_PAGEDONE)
2017		return (0);
2018
2019	rfp = rep->curinfo;
2020	/*
2021	 * max_pgno is 0-based and npages is 1-based, so we don't have
2022	 * all the pages until npages is > max_pgno.
2023	 */
2024	RPRINT(env, DB_VERB_REP_SYNC,
2025	    (env, "FILEDONE: have %lu pages. Need %lu.",
2026	    (u_long)rep->npages, (u_long)rfp->max_pgno + 1));
2027	if (rep->npages <= rfp->max_pgno)
2028		return (0);
2029
2030	/*
2031	 * If we're queue and we think we have all the pages for this file,
2032	 * we need to do special queue processing.  Queue is handled in
2033	 * several stages.
2034	 */
2035	if (rfp->type == (u_int32_t)DB_QUEUE &&
2036	    ((ret = __rep_queue_filedone(env, ip, rep, rfp)) !=
2037	    DB_REP_PAGEDONE))
2038		return (ret);
2039	/*
2040	 * We have all the pages for this file.  Clean up.
2041	 */
2042	if ((ret = __rep_init_cleanup(env, rep, 0)) != 0)
2043		goto err;
2044
2045	rep->curfile++;
2046	ret = __rep_nextfile(env, eid, rep);
2047err:
2048	return (ret);
2049}
2050
2051/*
2052 * Starts requesting pages for the next file in the list (if any), or if not,
2053 * proceeds to the next stage: requesting logs.
2054 *
2055 * !!!
2056 * Called with REP_SYSTEM_LOCK held.
2057 */
2058static int
2059__rep_nextfile(env, eid, rep)
2060	ENV *env;
2061	int eid;
2062	REP *rep;
2063{
2064	DBT dbt;
2065	__rep_logreq_args lr_args;
2066	int ret;
2067	u_int8_t *buf, lrbuf[__REP_LOGREQ_SIZE];
2068	size_t len, msgsz;
2069
2070	/*
2071	 * Always direct the next request to the master (at least nominally),
2072	 * regardless of where the current response came from.  The application
2073	 * can always still redirect it to another client.
2074	 */
2075	if (rep->master_id != DB_EID_INVALID)
2076		eid = rep->master_id;
2077	if (rep->curfile == rep->nfiles) {
2078		RPRINT(env, DB_VERB_REP_SYNC, (env,
2079		    "NEXTFILE: have %d files.  RECOVER_LOG now", rep->nfiles));
2080		/*
2081		 * Move to REP_RECOVER_LOG state.
2082		 * Request logs.
2083		 */
2084		/*
2085		 * We need to do a sync here so that any later opens
2086		 * can find the file and file id.  We need to do it
2087		 * before we clear REP_F_RECOVER_PAGE so that we do not
2088		 * try to flush the log.
2089		 */
2090		if ((ret = __memp_sync_int(env, NULL, 0,
2091		    DB_SYNC_CACHE | DB_SYNC_INTERRUPT_OK, NULL, NULL)) != 0)
2092			return (ret);
2093		F_CLR(rep, REP_F_RECOVER_PAGE);
2094		F_SET(rep, REP_F_RECOVER_LOG);
2095		memset(&dbt, 0, sizeof(dbt));
2096		lr_args.endlsn = rep->last_lsn;
2097		if ((ret = __rep_logreq_marshal(env, &lr_args, lrbuf,
2098		    __REP_LOGREQ_SIZE, &len)) != 0)
2099			return (ret);
2100		DB_INIT_DBT(dbt, lrbuf, len);
2101		REP_SYSTEM_UNLOCK(env);
2102		if ((ret = __rep_log_setup(env, rep,
2103		    rep->first_lsn.file, rep->first_vers, NULL)) != 0)
2104			return (ret);
2105		RPRINT(env, DB_VERB_REP_SYNC, (env,
2106		    "NEXTFILE: LOG_REQ from LSN [%lu][%lu] to [%lu][%lu]",
2107		    (u_long)rep->first_lsn.file, (u_long)rep->first_lsn.offset,
2108		    (u_long)rep->last_lsn.file, (u_long)rep->last_lsn.offset));
2109		(void)__rep_send_message(env, eid,
2110		    REP_LOG_REQ, &rep->first_lsn, &dbt,
2111		    REPCTL_INIT, DB_REP_ANYWHERE);
2112		REP_SYSTEM_LOCK(env);
2113		return (0);
2114	}
2115
2116	/*
2117	 * 4.  If not, set curinfo to next file and request its pages.
2118	 */
2119	rep->finfo = rep->nextinfo;
2120	if ((ret = __rep_fileinfo_unmarshal(env, rep->infoversion,
2121	    &rep->curinfo, rep->finfo, rep->infolen, &rep->nextinfo)) != 0) {
2122		RPRINT(env, DB_VERB_REP_SYNC, (env,
2123		    "NEXTINFO: Fileinfo read: %s", db_strerror(ret)));
2124		return (ret);
2125	}
2126	DB_ASSERT(env, rep->curinfo->pgno == 0);
2127	rep->infolen -= (u_int32_t)(rep->nextinfo - rep->finfo);
2128	rep->ready_pg = 0;
2129	rep->npages = 0;
2130	rep->waiting_pg = PGNO_INVALID;
2131	rep->max_wait_pg = PGNO_INVALID;
2132	memset(&dbt, 0, sizeof(dbt));
2133	RPRINT(env, DB_VERB_REP_SYNC, (env,
2134	    "Next file %d: pgsize %lu, maxpg %lu", rep->curinfo->filenum,
2135	    (u_long)rep->curinfo->pgsize, (u_long)rep->curinfo->max_pgno));
2136	msgsz = __REP_FILEINFO_SIZE +
2137	    rep->curinfo->uid.size + rep->curinfo->info.size;
2138	if ((ret = __os_calloc(env, 1, msgsz, &buf)) != 0)
2139		return (ret);
2140	if ((ret = __rep_fileinfo_marshal(env, rep->infoversion,
2141	    rep->curinfo, buf, msgsz, &len)) != 0)
2142		return (ret);
2143	DB_INIT_DBT(dbt, buf, len);
2144	(void)__rep_send_message(env, eid, REP_PAGE_REQ,
2145	    NULL, &dbt, 0, DB_REP_ANYWHERE);
2146	__os_free(env, buf);
2147
2148	return (0);
2149}
2150
2151/*
2152 * __rep_mpf_open -
2153 *	Create and open the mpool file for a database.
2154 *	Used by both master and client to bring files into mpool.
2155 */
2156static int
2157__rep_mpf_open(env, mpfp, rfp, flags)
2158	ENV *env;
2159	DB_MPOOLFILE **mpfp;
2160	__rep_fileinfo_args *rfp;
2161	u_int32_t flags;
2162{
2163	DB db;
2164	int ret;
2165
2166	if ((ret = __memp_fcreate(env, mpfp)) != 0)
2167		return (ret);
2168
2169	/*
2170	 * We need a dbp to pass into to __env_mpool.  Set up
2171	 * only the parts that it needs.
2172	 */
2173	db.env = env;
2174	db.type = (DBTYPE)rfp->type;
2175	db.pgsize = rfp->pgsize;
2176	memcpy(db.fileid, rfp->uid.data, DB_FILE_ID_LEN);
2177	db.flags = rfp->db_flags;
2178	/* We need to make sure the dbp isn't marked open. */
2179	F_CLR(&db, DB_AM_OPEN_CALLED);
2180	/*
2181	 * The byte order of this database may be different from my local native
2182	 * byte order.  If so, set the swap bit so that the necessary swapping
2183	 * will be done during file I/O.
2184	 */
2185	if ((F_ISSET(env, ENV_LITTLEENDIAN) &&
2186	    !FLD_ISSET(rfp->finfo_flags, REPINFO_DB_LITTLEENDIAN)) ||
2187	    (!F_ISSET(env, ENV_LITTLEENDIAN) &&
2188	    FLD_ISSET(rfp->finfo_flags, REPINFO_DB_LITTLEENDIAN))) {
2189		RPRINT(env, DB_VERB_REP_SYNC, (env,
2190		    "rep_mpf_open: Different endian database.  Set swap bit."));
2191		F_SET(&db, DB_AM_SWAP);
2192	} else
2193		F_CLR(&db, DB_AM_SWAP);
2194
2195	db.mpf = *mpfp;
2196	if (F_ISSET(&db, DB_AM_INMEM))
2197		(void)__memp_set_flags(db.mpf, DB_MPOOL_NOFILE, 1);
2198	if ((ret = __env_mpool(&db, rfp->info.data, flags)) != 0) {
2199		(void)__memp_fclose(db.mpf, 0);
2200		*mpfp = NULL;
2201	}
2202	return (ret);
2203}
2204
2205/*
2206 * __rep_pggap_req -
2207 *	Request a page gap.  Assumes the caller holds the rep_mutex.
2208 *
2209 * PUBLIC: int __rep_pggap_req __P((ENV *, REP *, __rep_fileinfo_args *,
2210 * PUBLIC:     u_int32_t));
2211 */
2212int
2213__rep_pggap_req(env, rep, reqfp, gapflags)
2214	ENV *env;
2215	REP *rep;
2216	__rep_fileinfo_args *reqfp;
2217	u_int32_t gapflags;
2218{
2219	DBT max_pg_dbt;
2220	__rep_fileinfo_args *tmpfp, t;
2221	size_t len, msgsz;
2222	u_int32_t flags;
2223	int alloc, ret;
2224	u_int8_t *buf;
2225
2226	ret = 0;
2227	alloc = 0;
2228	/*
2229	 * There is a window where we have to set REP_RECOVER_PAGE when
2230	 * we receive the update information to transition from getting
2231	 * file information to getting page information.  However, that
2232	 * thread does release and then reacquire mutexes.  So, we might
2233	 * try re-requesting before the original thread can get curinfo
2234	 * setup.  If curinfo isn't set up there is nothing to do.
2235	 */
2236	if (rep->curinfo == NULL)
2237		return (0);
2238	if (reqfp == NULL) {
2239		if ((ret = __rep_finfo_alloc(env, rep->curinfo, &tmpfp)) != 0)
2240			return (ret);
2241		alloc = 1;
2242	} else {
2243		t = *reqfp;
2244		tmpfp = &t;
2245	}
2246
2247	/*
2248	 * If we've never requested this page, then
2249	 * request everything between it and the first
2250	 * page we have.  If we have requested this page
2251	 * then only request this record, not the entire gap.
2252	 */
2253	flags = 0;
2254	memset(&max_pg_dbt, 0, sizeof(max_pg_dbt));
2255	/*
2256	 * If this is a PAGE_MORE and we're forcing then we want to
2257	 * force the request to ask for the next page after this one.
2258	 */
2259	if (FLD_ISSET(gapflags, REP_GAP_FORCE))
2260		tmpfp->pgno++;
2261	else
2262		tmpfp->pgno = rep->ready_pg;
2263	msgsz = __REP_FILEINFO_SIZE +
2264	    tmpfp->uid.size + tmpfp->info.size;
2265	if ((ret = __os_calloc(env, 1, msgsz, &buf)) != 0)
2266		goto err;
2267	if (rep->max_wait_pg == PGNO_INVALID ||
2268	    FLD_ISSET(gapflags, REP_GAP_FORCE | REP_GAP_REREQUEST)) {
2269		/*
2270		 * Request the gap - set max to waiting_pg - 1 or if
2271		 * there is no waiting_pg, just ask for one.
2272		 */
2273		if (rep->waiting_pg == PGNO_INVALID) {
2274			if (FLD_ISSET(gapflags,
2275			    REP_GAP_FORCE | REP_GAP_REREQUEST))
2276				rep->max_wait_pg = rep->curinfo->max_pgno;
2277			else
2278				rep->max_wait_pg = rep->ready_pg;
2279		} else {
2280			/*
2281			 * If we're forcing, and waiting_pg is less than
2282			 * the page we want to start this request at, then
2283			 * we set max_wait_pg to the max pgno in the file.
2284			 */
2285			if (FLD_ISSET(gapflags, REP_GAP_FORCE) &&
2286			  rep->waiting_pg < tmpfp->pgno)
2287				rep->max_wait_pg = rep->curinfo->max_pgno;
2288			else
2289				rep->max_wait_pg = rep->waiting_pg - 1;
2290		}
2291		tmpfp->max_pgno = rep->max_wait_pg;
2292		/*
2293		 * Gap requests are "new" and can go anywhere.
2294		 */
2295		if (FLD_ISSET(gapflags, REP_GAP_REREQUEST))
2296			flags = DB_REP_REREQUEST;
2297		else
2298			flags = DB_REP_ANYWHERE;
2299	} else {
2300		/*
2301		 * Request 1 page - set max to ready_pg.
2302		 */
2303		rep->max_wait_pg = rep->ready_pg;
2304		tmpfp->max_pgno = rep->ready_pg;
2305		/*
2306		 * If we're dropping to singletons, this is a rerequest.
2307		 */
2308		flags = DB_REP_REREQUEST;
2309	}
2310	if (rep->master_id != DB_EID_INVALID) {
2311		STAT(rep->stat.st_pg_requested++);
2312		/*
2313		 * We need to request the pages, but we need to get the
2314		 * new info into rep->finfo.  Assert that the sizes never
2315		 * change.  The only thing this should do is change
2316		 * the pgno field.  Everything else remains the same.
2317		 */
2318		if ((ret = __rep_fileinfo_marshal(env, rep->infoversion,
2319		    tmpfp, buf, msgsz, &len)) == 0) {
2320			DB_INIT_DBT(max_pg_dbt, buf, len);
2321			DB_ASSERT(env, len == max_pg_dbt.size);
2322			(void)__rep_send_message(env, rep->master_id,
2323			    REP_PAGE_REQ, NULL, &max_pg_dbt, 0, flags);
2324		}
2325	} else
2326		(void)__rep_send_message(env, DB_EID_BROADCAST,
2327		    REP_MASTER_REQ, NULL, NULL, 0, 0);
2328
2329	__os_free(env, buf);
2330err:
2331	if (alloc)
2332		__os_free(env, tmpfp);
2333	return (ret);
2334}
2335
2336/*
2337 * __rep_finfo_alloc -
2338 *	Allocate and initialize a fileinfo structure.
2339 *
2340 * PUBLIC: int __rep_finfo_alloc __P((ENV *, __rep_fileinfo_args *,
2341 * PUBLIC:     __rep_fileinfo_args **));
2342 */
2343int
2344__rep_finfo_alloc(env, rfpsrc, rfpp)
2345	ENV *env;
2346	__rep_fileinfo_args *rfpsrc, **rfpp;
2347{
2348	__rep_fileinfo_args *rfp;
2349	size_t size;
2350	int ret;
2351	void *uidp, *infop;
2352
2353	/*
2354	 * Allocate enough for the structure and the two DBT data areas.
2355	 */
2356	size = sizeof(__rep_fileinfo_args) + rfpsrc->uid.size +
2357	    rfpsrc->info.size;
2358	if ((ret = __os_malloc(env, size, &rfp)) != 0)
2359		return (ret);
2360
2361	/*
2362	 * Copy the structure itself, and then set the DBT data pointers
2363	 * to their space and copy the data itself as well.
2364	 */
2365	memcpy(rfp, rfpsrc, sizeof(__rep_fileinfo_args));
2366	uidp = (u_int8_t *)rfp + sizeof(__rep_fileinfo_args);
2367	rfp->uid.data = uidp;
2368	memcpy(uidp, rfpsrc->uid.data, rfpsrc->uid.size);
2369
2370	infop = (u_int8_t *)uidp + rfpsrc->uid.size;
2371	rfp->info.data = infop;
2372	memcpy(infop, rfpsrc->info.data, rfpsrc->info.size);
2373	*rfpp = rfp;
2374	return (ret);
2375}
2376
2377/*
2378 * __rep_log_setup -
2379 *	We know our first LSN and need to reset the log subsystem
2380 *	to get our logs set up for the proper file.
2381 */
2382static int
2383__rep_log_setup(env, rep, file, version, lsnp)
2384	ENV *env;
2385	REP *rep;
2386	u_int32_t file;
2387	u_int32_t version;
2388	DB_LSN *lsnp;
2389{
2390	DB_LOG *dblp;
2391	DB_LSN lsn;
2392	DB_TXNMGR *mgr;
2393	DB_TXNREGION *region;
2394	LOG *lp;
2395	int ret;
2396
2397	dblp = env->lg_handle;
2398	lp = dblp->reginfo.primary;
2399	mgr = env->tx_handle;
2400	region = mgr->reginfo.primary;
2401
2402	/*
2403	 * Set up the log starting at the file number of the first LSN we
2404	 * need to get from the master.
2405	 */
2406	LOG_SYSTEM_LOCK(env);
2407	if ((ret = __log_newfile(dblp, &lsn, file, version)) == 0 &&
2408	    lsnp != NULL)
2409		*lsnp = lsn;
2410	LOG_SYSTEM_UNLOCK(env);
2411
2412	/*
2413	 * We reset first_lsn to the lp->lsn.  We were given the LSN of
2414	 * the checkpoint and we now need the LSN for the beginning of
2415	 * the file, which __log_newfile conveniently set up for us
2416	 * in lp->lsn.
2417	 */
2418	rep->first_lsn = lp->lsn;
2419	TXN_SYSTEM_LOCK(env);
2420	ZERO_LSN(region->last_ckp);
2421	TXN_SYSTEM_UNLOCK(env);
2422	return (ret);
2423}
2424
/*
 * __rep_queue_filedone -
 *	Determine if we're really done getting the pages for a queue file.
 *	Queue is handled in several steps.
 *	1.  First we get the meta page only.
 *	2.  We use the meta-page information to figure out first and last
 *	    page numbers (and if queue wraps, first can be > last).
 *	3.  If first < last, we do a REP_PAGE_REQ for all pages.
 *	4.  If first > last, we REP_PAGE_REQ from first -> max page number.
 *	    Then we'll ask for page 1 -> last.
 *
 * This function can return several things:
 *	DB_REP_PAGEDONE - if we're done with this file.
 *	0 - if we're not done with this file.
 *	error - if we get an error doing some operations.
 *
 * This function will open a dbp handle to the queue file.  This is needed
 * by most of the QAM macros.  We'll open it on the first pass through
 * here and we'll close it whenever we decide we're done.  Between passes
 * the handle is reachable through the cursor kept in rep->queue_dbc.
 *
 * When the library is built without HAVE_QUEUE, this simply returns the
 * result of __db_no_queue_am.
 */
static int
__rep_queue_filedone(env, ip, rep, rfp)
	ENV *env;
	DB_THREAD_INFO *ip;
	REP *rep;
	__rep_fileinfo_args *rfp;
{
#ifndef HAVE_QUEUE
	COMPQUIET(ip, NULL);
	COMPQUIET(rep, NULL);
	COMPQUIET(rfp, NULL);
	return (__db_no_queue_am(env));
#else
	DB *queue_dbp;
	db_pgno_t first, last;
	u_int32_t flags;
	int empty, ret, t_ret;

	ret = 0;
	queue_dbp = NULL;
	/*
	 * No cursor yet means this is the first pass for this file: open
	 * the database and a cursor on it.  On later passes the database
	 * handle is recovered from the saved cursor.
	 */
	if (rep->queue_dbc == NULL) {
		/*
		 * We need to do a sync here so that the open
		 * can find the file and file id.
		 */
		if ((ret = __memp_sync_int(env, NULL, 0,
		    DB_SYNC_CACHE | DB_SYNC_INTERRUPT_OK, NULL, NULL)) != 0)
			goto out;
		if ((ret =
		    __db_create_internal(&queue_dbp, env, 0)) != 0)
			goto out;
		flags = DB_NO_AUTO_COMMIT |
		    (F_ISSET(env, ENV_THREAD) ? DB_THREAD : 0);
		/*
		 * We need to check whether this is in-memory so that we pass
		 * the name correctly as either the file or the database name.
		 */
		if ((ret = __db_open(queue_dbp, ip, NULL,
		    FLD_ISSET(rfp->db_flags, DB_AM_INMEM) ? NULL :
			rfp->info.data,
		    FLD_ISSET(rfp->db_flags, DB_AM_INMEM) ? rfp->info.data :
			NULL,
		    DB_QUEUE, flags, 0, PGNO_BASE_MD)) != 0)
			goto out;

		if ((ret = __db_cursor(queue_dbp,
		    ip, NULL, &rep->queue_dbc, 0)) != 0)
			goto out;
	} else
		queue_dbp = rep->queue_dbc->dbp;

	/* Ask the queue access method for the first and last page. */
	if ((ret = __queue_pageinfo(queue_dbp,
	    &first, &last, &empty, 0, 0)) != 0)
		goto out;
	RPRINT(env, DB_VERB_REP_SYNC, (env,
	    "Queue fileinfo: first %lu, last %lu, empty %d",
	    (u_long)first, (u_long)last, empty));
	/*
	 * We can be at the end of 3 possible states.
	 * 1.  We have received the meta-page and now need to get the
	 *     rest of the pages in the database.
	 * 2.  We have received from first -> max_pgno.  We might be done,
	 *     or we might need to ask for wrapped pages.
	 * 3.  We have received all pages in the file.  We're done.
	 */
	if (rfp->max_pgno == 0) {
		/*
		 * We have just received the meta page.  Set up the next
		 * pages to ask for and check if the file is empty.
		 * An empty queue has no further pages, so we are done.
		 */
		if (empty)
			goto out;
		if (first > last) {
			/*
			 * The queue wraps: first ask for everything from
			 * first up to the page holding the largest record
			 * number.
			 */
			rfp->max_pgno =
			    QAM_RECNO_PAGE(rep->queue_dbc->dbp, UINT32_MAX);
		} else
			rfp->max_pgno = last;
		RPRINT(env, DB_VERB_REP_SYNC, (env,
		    "Queue fileinfo: First req: first %lu, last %lu",
		    (u_long)first, (u_long)rfp->max_pgno));
		/* Note: this jumps into the body of the else-if below. */
		goto req;
	} else if (rfp->max_pgno != last) {
		/*
		 * If max_pgno != last that means we're dealing with a
		 * wrapped situation.  Request next batch of pages.
		 * Set npages to 1 because we already have page 0, the
		 * meta-page, now we need pages 1-max_pgno.
		 */
		first = 1;
		rfp->max_pgno = last;
		RPRINT(env, DB_VERB_REP_SYNC, (env,
		    "Queue fileinfo: Wrap req: first %lu, last %lu",
		    (u_long)first, (u_long)last));
req:
		/*
		 * Since we're simulating a "gap" to resend new PAGE_REQ
		 * for this file, we need to set waiting page to last + 1
		 * so that we'll ask for all from ready_pg -> last.
		 *
		 * We return straight from here, so the cursor and the
		 * database handle stay open for the next pass.
		 */
		rep->npages = first;
		rep->ready_pg = first;
		rep->waiting_pg = rfp->max_pgno + 1;
		rep->max_wait_pg = PGNO_INVALID;
		ret = __rep_pggap_req(env, rep, rfp, 0);
		return (ret);
	}
	/*
	 * max_pgno == last
	 * If we get here, we have all the pages we need.
	 * Close the dbp and return.
	 */
out:
	/*
	 * Close the cursor before the database handle.  The first error
	 * seen wins; a clean finish is reported as DB_REP_PAGEDONE.
	 */
	if (rep->queue_dbc != NULL &&
	    (t_ret = __dbc_close(rep->queue_dbc)) != 0 && ret == 0)
		ret = t_ret;
	rep->queue_dbc = NULL;

	if (queue_dbp != NULL &&
	    (t_ret = __db_close(queue_dbp, NULL, DB_NOSYNC)) != 0 && ret == 0)
		ret = t_ret;
	if (ret == 0)
		ret = DB_REP_PAGEDONE;
	return (ret);
#endif
}
2570
2571/*
2572 * PUBLIC: int __rep_remove_init_file __P((ENV *));
2573 */
2574int
2575__rep_remove_init_file(env)
2576	ENV *env;
2577{
2578	int ret;
2579	char *name;
2580
2581	if ((ret = __db_appname(
2582	    env, DB_APP_NONE, REP_INITNAME, 0, NULL, &name)) != 0)
2583		return (ret);
2584	(void)__os_unlink(env, name, 0);
2585	__os_free(env, name);
2586	return (0);
2587}
2588
2589/*
2590 * Checks for the existence of the internal init flag file.  If it exists, we
2591 * remove all logs and databases, and then remove the flag file.  This is
2592 * intended to force the internal init to start over again, and thus affords
2593 * protection against a client crashing during internal init.  This function
2594 * must be called before normal recovery in order to be properly effective.
2595 *
2596 * !!!
2597 * This function should only be called during initial set-up of the environment,
2598 * before various subsystems are initialized.  It doesn't rely on the
2599 * subsystems' code having been initialized, and it summarily deletes files "out
2600 * from under" them, which might disturb the subsystems if they were up.
2601 *
2602 * PUBLIC: int __rep_reset_init __P((ENV *));
2603 */
2604int
2605__rep_reset_init(env)
2606	ENV *env;
2607{
2608	DB_FH *fhp;
2609	__rep_update_args *rup;
2610	DBT dbt;
2611	char *allocated_dir, *dir, *init_name;
2612	size_t cnt;
2613	u_int32_t dbtvers, fvers, zero;
2614	u_int8_t *next;
2615	int ret, t_ret;
2616
2617	allocated_dir = NULL;
2618	rup = NULL;
2619	dbt.data = NULL;
2620
2621	if ((ret = __db_appname(
2622	    env, DB_APP_NONE, REP_INITNAME, 0, NULL, &init_name)) != 0)
2623		return (ret);
2624
2625	if ((ret = __os_open(
2626	    env, init_name, 0, DB_OSO_RDONLY, DB_MODE_600, &fhp)) != 0) {
2627		if (ret == ENOENT)
2628			ret = 0;
2629		goto out;
2630	}
2631
2632	RPRINT(env, DB_VERB_REP_SYNC,
2633	    (env, "Cleaning up interrupted internal init"));
2634
2635	/* There are a few possibilities:
2636	 *   1. no init file, or less than 1 full file list
2637	 *   2. exactly one full file list
2638	 *   3. more than one, less then a second full file list
2639	 *   4. second file list in full
2640	 *
2641	 * In cases 2 or 4, we need to remove all logs, and then remove files
2642	 * according to the (most recent) file list.  (In case 1 or 3, we don't
2643	 * have to do anything.)
2644	 *
2645	 * The __rep_get_file_list function takes care of folding these cases
2646	 * into two simple outcomes.
2647	 *
2648	 * As of 4.7, the first 4 bytes are 0.  Read the first 4 bytes now.
2649	 * If they are non-zero it means we have an old-style init file.
2650	 * Otherwise, pass the file version in to rep_get_file_list.
2651	 */
2652	if ((ret = __os_read(env, fhp, &zero, sizeof(zero), &cnt)) != 0)
2653		goto out;
2654	/*
2655	 * If we read successfully, but not enough, then unlink the file.
2656	 */
2657	if (cnt != sizeof(zero))
2658		goto rm;
2659	if (zero != 0) {
2660		/*
2661		 * Old style file.  We have to set fvers to the 4.6
2662		 * version of the file and also rewind the file so
2663		 * that __rep_get_file_list can read out the length itself.
2664		 */
2665		if ((ret = __os_seek(env, fhp, 0, 0, 0)) != 0)
2666			goto out;
2667		fvers = REP_INITVERSION_46;
2668	} else if ((ret = __os_read(env,
2669	    fhp, &fvers, sizeof(fvers), &cnt)) != 0)
2670		goto out;
2671	else if (cnt != sizeof(fvers))
2672		goto rm;
2673	ret = __rep_get_file_list(env, fhp, fvers, &dbtvers, &dbt);
2674	if ((t_ret = __os_closehandle(env, fhp)) != 0 || ret != 0) {
2675		if (ret == 0)
2676			ret = t_ret;
2677		goto out;
2678	}
2679	if (dbt.data == NULL) {
2680		/*
2681		 * The init file did not end with an intact file list.  Since we
2682		 * never start log/db removal without an intact file list
2683		 * sync'ed to the init file, this must mean we don't have any
2684		 * partial set of files to clean up.  So all we need to do is
2685		 * remove the init file.
2686		 */
2687		goto rm;
2688	}
2689
2690	/* Remove all log files. */
2691	if (env->dbenv->db_log_dir == NULL)
2692		dir = env->db_home;
2693	else {
2694		if ((ret = __db_appname(env, DB_APP_NONE,
2695		    env->dbenv->db_log_dir, 0, NULL, &dir)) != 0)
2696			goto out;
2697		allocated_dir = dir;
2698	}
2699
2700	if ((ret = __rep_remove_by_prefix(env,
2701	    dir, LFPREFIX, sizeof(LFPREFIX)-1, DB_APP_LOG)) != 0)
2702		goto out;
2703
2704	/*
2705	 * Remove databases according to the list, and queue extent files by
2706	 * searching them out on a walk through the data_dir's.
2707	 */
2708	if ((ret = __rep_update_unmarshal(env, dbtvers,
2709	    &rup, dbt.data, dbt.size, &next)) != 0)
2710		goto out;
2711	if ((ret = __rep_remove_by_list(env, dbtvers,
2712	    next, dbt.size, rup->num_files)) != 0)
2713		goto out;
2714
2715	/* Here, we've established that the file exists. */
2716rm:	(void)__os_unlink(env, init_name, 0);
2717out:	if (rup != NULL)
2718		__os_free(env, rup);
2719	if (allocated_dir != NULL)
2720		__os_free(env, allocated_dir);
2721	if (dbt.data != NULL)
2722		__os_free(env, dbt.data);
2723
2724	__os_free(env, init_name);
2725	return (ret);
2726}
2727
2728/*
2729 * Reads the last fully intact file list from the init file.  If the file ends
2730 * with a partial list (or is empty), we're not interested in it.  Lack of a
2731 * full file list is indicated by a NULL dbt->data.  On success, the list is
2732 * returned in allocated space, which becomes the responsibility of the caller.
2733 *
2734 * The file format is a u_int32_t buffer length, in native format, followed by
2735 * the file list itself, in the same format as in an UPDATE message (though
2736 * many parts of it in this case are meaningless).
2737 */
2738static int
2739__rep_get_file_list(env, fhp, fvers, dbtvers, dbt)
2740	ENV *env;
2741	DB_FH *fhp;
2742	u_int32_t fvers;
2743	u_int32_t *dbtvers;
2744	DBT *dbt;
2745{
2746	u_int32_t length, mvers;
2747	size_t cnt;
2748	int i, ret;
2749
2750	/* At most 2 file lists: old and new. */
2751	dbt->data = NULL;
2752	mvers = DB_REPVERSION_46;
2753	length = 0;
2754	for (i = 1; i <= 2; i++) {
2755		if (fvers >= REP_INITVERSION_47) {
2756			if ((ret = __os_read(env, fhp, &mvers,
2757			    sizeof(mvers), &cnt)) != 0)
2758				goto err;
2759			if (cnt == 0 && dbt->data != NULL)
2760				break;
2761			if (cnt != sizeof(mvers))
2762				goto err;
2763		}
2764		if ((ret = __os_read(env,
2765		    fhp, &length, sizeof(length), &cnt)) != 0)
2766			goto err;
2767
2768		/*
2769		 * Reaching the end here is fine, if we've been through at least
2770		 * once already.
2771		 */
2772		if (cnt == 0 && dbt->data != NULL)
2773			break;
2774		if (cnt != sizeof(length))
2775			goto err;
2776
2777		if ((ret = __os_realloc(env,
2778		    (size_t)length, &dbt->data)) != 0)
2779			goto err;
2780
2781		if ((ret = __os_read(
2782		    env, fhp, dbt->data, length, &cnt)) != 0 ||
2783		    cnt != (size_t)length)
2784			goto err;
2785	}
2786
2787	*dbtvers = mvers;
2788	dbt->size = length;
2789	return (0);
2790
2791err:
2792	/*
2793	 * Note that it's OK to get here with a zero value in 'ret': it means we
2794	 * read less than we expected, and dbt->data == NULL indicates to the
2795	 * caller that we don't have an intact list.
2796	 */
2797	if (dbt->data != NULL)
2798		__os_free(env, dbt->data);
2799	dbt->data = NULL;
2800	return (ret);
2801}
2802
2803/*
2804 * Removes every file in a given directory that matches a given prefix.  Notice
2805 * how similar this is to __rep_walk_dir.
2806 */
2807static int
2808__rep_remove_by_prefix(env, dir, prefix, pref_len, appname)
2809	ENV *env;
2810	const char *dir;
2811	const char *prefix;
2812	size_t pref_len;
2813	APPNAME appname;	/* What kind of name. */
2814{
2815	char *namep, **names;
2816	int cnt, i, ret;
2817
2818	if ((ret = __os_dirlist(env, dir, 0, &names, &cnt)) != 0)
2819		return (ret);
2820	for (i = 0; i < cnt; i++) {
2821		if (strncmp(names[i], prefix, pref_len) == 0) {
2822			if ((ret = __db_appname(env,
2823			    appname, names[i], 0, NULL, &namep)) != 0)
2824				goto out;
2825			(void)__os_unlink(env, namep, 0);
2826			__os_free(env, namep);
2827		}
2828	}
2829out:	__os_dirfree(env, names, cnt);
2830	return (ret);
2831}
2832
2833/*
2834 * Removes database files according to the contents of a list.
2835 *
2836 * This function must support removal either during environment creation, or
2837 * when an internal init is reset in the middle.  This means it must work
2838 * regardless of whether underlying subsystems are initialized.  However, it may
2839 * assume that databases are not open.  That means there is no REP!
2840 */
2841static int
2842__rep_remove_by_list(env, version, filelist, filesz, count)
2843	ENV *env;
2844	u_int32_t version;
2845	u_int8_t *filelist;
2846	u_int32_t filesz;
2847	u_int32_t count;
2848{
2849	DB_ENV *dbenv;
2850	__rep_fileinfo_args *rfp;
2851	char **ddir, *dir, *namep;
2852	u_int8_t *new_fp;
2853	int ret;
2854
2855	dbenv = env->dbenv;
2856	ret = 0;
2857	rfp = NULL;
2858	while (count-- > 0) {
2859		if ((ret = __rep_fileinfo_unmarshal(env, version,
2860		    &rfp, filelist, filesz, &new_fp)) != 0)
2861			goto out;
2862		filesz -= (u_int32_t)(new_fp - filelist);
2863		filelist = new_fp;
2864		if ((ret = __db_appname(env,
2865		    DB_APP_DATA, rfp->info.data, 0, NULL, &namep)) != 0)
2866			goto out;
2867		(void)__os_unlink(env, namep, 0);
2868		__os_free(env, namep);
2869		__os_free(env, rfp);
2870		rfp = NULL;
2871	}
2872
2873	/* Notice how similar this code is to __rep_find_dbs. */
2874	if (dbenv->db_data_dir == NULL)
2875		ret = __rep_remove_by_prefix(env, env->db_home,
2876		    QUEUE_EXTENT_PREFIX, sizeof(QUEUE_EXTENT_PREFIX) - 1,
2877		    DB_APP_DATA);
2878	else {
2879		for (ddir = dbenv->db_data_dir; *ddir != NULL; ++ddir) {
2880			if ((ret = __db_appname(env, DB_APP_NONE,
2881			    *ddir, 0, NULL, &dir)) != 0)
2882				break;
2883			ret = __rep_remove_by_prefix(env, dir,
2884			    QUEUE_EXTENT_PREFIX, sizeof(QUEUE_EXTENT_PREFIX)-1,
2885			    DB_APP_DATA);
2886			__os_free(env, dir);
2887			if (ret != 0)
2888				break;
2889		}
2890	}
2891
2892out:
2893	if (rfp != NULL)
2894		__os_free(env, rfp);
2895	return (ret);
2896}
2897