• Home
  • History
  • Annotate
  • Line#
  • Navigate
  • Raw
  • Download
  • only in /asuswrt-rt-n18u-9.0.0.4.380.2695/release/src-rt-6.x.4708/router/db-4.8.30/examples_cxx/excxx_repquote/
1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2001-2009 Oracle.  All rights reserved.
5 *
6 * $Id$
7 */
8
9/*
10 * In this application, we specify all communication via the command line.  In
11 * a real application, we would expect that information about the other sites
12 * in the system would be maintained in some sort of configuration file.  The
13 * critical part of this interface is that we assume at startup that we can
14 * find out
15 * 	1) what our Berkeley DB home environment is,
16 * 	2) what host/port we wish to listen on for connections; and
17 * 	3) an optional list of other sites we should attempt to connect to.
18 *
19 * These pieces of information are expressed by the following flags.
20 * -h home (required; h stands for home directory)
21 * -l host:port (required; l stands for local)
22 * -C or -M (optional; start up as client or master)
23 * -r host:port (optional; r stands for remote; any number of these may be
24 *	specified)
25 * -R host:port (optional; R stands for remote peer; only one of these may
26 *      be specified)
27 * -a all|quorum (optional; a stands for ack policy)
28 * -b (optional; b stands for bulk)
29 * -n nsites (optional; number of sites in replication group; defaults to 0
30 *	to try to dynamically compute nsites)
31 * -p priority (optional; defaults to 100)
32 * -v (optional; v stands for verbose)
33 */
34
35#include <cstdlib>
36#include <cstring>
37
38#include <iostream>
39#include <string>
40#include <sstream>
41
42#include <db_cxx.h>
43#include "RepConfigInfo.h"
44#include "dbc_auto.h"
45
46using std::cout;
47using std::cin;
48using std::cerr;
49using std::endl;
50using std::flush;
51using std::istream;
52using std::istringstream;
53using std::string;
54using std::getline;
55
56#define	CACHESIZE	(10 * 1024 * 1024)
57#define	DATABASE	"quote.db"
58
59const char *progname = "excxx_repquote";
60
61#include <errno.h>
62#ifdef _WIN32
63#define WIN32_LEAN_AND_MEAN
64#include <windows.h>
65#define	snprintf		_snprintf
66#define	sleep(s)		Sleep(1000 * (s))
67
68extern "C" {
69  extern int getopt(int, char * const *, const char *);
70  extern char *optarg;
71}
72
73typedef HANDLE thread_t;
74typedef DWORD thread_exit_status_t;
75#define	thread_create(thrp, attr, func, arg)				   \
76    (((*(thrp) = CreateThread(NULL, 0,					   \
77	(LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0)
78#define	thread_join(thr, statusp)					   \
79    ((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) &&		   \
80    GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1)
81#else /* !_WIN32 */
82#include <pthread.h>
83
84typedef pthread_t thread_t;
85typedef void* thread_exit_status_t;
86#define	thread_create(thrp, attr, func, arg)				   \
87    pthread_create((thrp), (attr), (func), (arg))
88#define	thread_join(thr, statusp) pthread_join((thr), (statusp))
89#endif
90
// Application state reachable through the DbEnv app_private pointer.
// The command loop, the replication event callback and the two support
// threads in this file all read or write these flags.
struct APP_DATA {
	bool app_finished;	// set when the user enters "quit" or "exit"
	bool in_client_sync;	// set while client synchronization is pending
	bool is_master;		// set once this site has become the master
};
97
98static void log(const char *);
99void *checkpoint_thread (void *);
100void *log_archive_thread (void *);
101
102class RepQuoteExample {
103public:
104	RepQuoteExample();
105	void init(RepConfigInfo* config);
106	void doloop();
107	int terminate();
108
109	static void event_callback(DbEnv* dbenv, u_int32_t which, void *info);
110
111private:
112	// disable copy constructor.
113	RepQuoteExample(const RepQuoteExample &);
114	void operator = (const RepQuoteExample &);
115
116	// internal data members.
117	APP_DATA		app_data;
118	RepConfigInfo   *app_config;
119	DbEnv		   cur_env;
120	thread_t ckp_thr;
121	thread_t lga_thr;
122
123	// private methods.
124	void print_stocks(Db *dbp);
125	void prompt();
126};
127
128class DbHolder {
129public:
130	DbHolder(DbEnv *env) : env(env) {
131	dbp = 0;
132	}
133
134	~DbHolder() {
135	try {
136		close();
137	} catch (...) {
138		// Ignore: this may mean another exception is pending
139	}
140	}
141
142	bool ensure_open(bool creating) {
143	if (dbp)
144		return (true);
145	dbp = new Db(env, 0);
146
147	u_int32_t flags = DB_AUTO_COMMIT;
148	if (creating)
149		flags |= DB_CREATE;
150	try {
151		dbp->open(NULL, DATABASE, NULL, DB_BTREE, flags, 0);
152		return (true);
153	} catch (DbDeadlockException e) {
154	} catch (DbRepHandleDeadException e) {
155	} catch (DbException e) {
156		if (e.get_errno() == DB_REP_LOCKOUT) {
157		// Just fall through.
158		} else if (e.get_errno() == ENOENT && !creating) {
159		// Provide a bit of extra explanation.
160		//
161		log("Stock DB does not yet exist");
162		} else
163		throw;
164	}
165
166	// (All retryable errors fall through to here.)
167	//
168	log("please retry the operation");
169	close();
170	return (false);
171	}
172
173	void close() {
174	if (dbp) {
175		try {
176		dbp->close(0);
177		delete dbp;
178		dbp = 0;
179		} catch (...) {
180		delete dbp;
181		dbp = 0;
182		throw;
183		}
184	}
185	}
186
187	operator Db *() {
188	return dbp;
189	}
190
191	Db *operator->() {
192	return dbp;
193	}
194
195private:
196	Db *dbp;
197	DbEnv *env;
198};
199
200class StringDbt : public Dbt {
201public:
202#define GET_STRING_OK 0
203#define GET_STRING_INVALID_PARAM 1
204#define GET_STRING_SMALL_BUFFER 2
205#define GET_STRING_EMPTY_DATA 3
206	int get_string(char **buf, size_t buf_len)
207	{
208		size_t copy_len;
209		int ret = GET_STRING_OK;
210		if (buf == NULL) {
211			cerr << "Invalid input buffer to get_string" << endl;
212			return GET_STRING_INVALID_PARAM;
213		}
214
215		// make sure the string is null terminated.
216		memset(*buf, 0, buf_len);
217
218		// if there is no string, just return.
219		if (get_data() == NULL || get_size() == 0)
220			return GET_STRING_OK;
221
222		if (get_size() >= buf_len) {
223			ret = GET_STRING_SMALL_BUFFER;
224			copy_len = buf_len - 1; // save room for a terminator.
225		} else
226			copy_len = get_size();
227		memcpy(*buf, get_data(), copy_len);
228
229		return ret;
230	}
231	size_t get_string_length()
232	{
233		if (get_size() == 0)
234			return 0;
235		return strlen((char *)get_data());
236	}
237	void set_string(char *string)
238	{
239		set_data(string);
240		set_size((u_int32_t)strlen(string));
241	}
242
243	StringDbt(char *string) :
244	    Dbt(string, (u_int32_t)strlen(string)) {};
245	StringDbt() : Dbt() {};
246	~StringDbt() {};
247
248	// Don't add extra data to this sub-class since we want it to remain
249	// compatible with Dbt objects created internally by Berkeley DB.
250};
251
252RepQuoteExample::RepQuoteExample() : app_config(0), cur_env(0) {
253	app_data.app_finished = 0;
254	app_data.in_client_sync = 0;
255	app_data.is_master = 0; // assume I start out as client
256}
257
258void RepQuoteExample::init(RepConfigInfo *config) {
259	app_config = config;
260
261	cur_env.set_app_private(&app_data);
262	cur_env.set_errfile(stderr);
263	cur_env.set_errpfx(progname);
264	cur_env.set_event_notify(event_callback);
265
266	// Configure bulk transfer to send groups of records to clients
267	// in a single network transfer.  This is useful for master sites
268	// and clients participating in client-to-client synchronization.
269	//
270	if (app_config->bulk)
271		cur_env.rep_set_config(DB_REP_CONF_BULK, 1);
272
273	// Set the total number of sites in the replication group.
274	// This is used by repmgr internal election processing.
275	//
276	if (app_config->totalsites > 0)
277		cur_env.rep_set_nsites(app_config->totalsites);
278
279	// Turn on debugging and informational output if requested.
280	if (app_config->verbose)
281		cur_env.set_verbose(DB_VERB_REPLICATION, 1);
282
283	// Set replication group election priority for this environment.
284	// An election first selects the site with the most recent log
285	// records as the new master.  If multiple sites have the most
286	// recent log records, the site with the highest priority value
287	// is selected as master.
288	//
289	cur_env.rep_set_priority(app_config->priority);
290
291	// Set the policy that determines how master and client sites
292	// handle acknowledgement of replication messages needed for
293	// permanent records.  The default policy of "quorum" requires only
294	// a quorum of electable peers sufficient to ensure a permanent
295	// record remains durable if an election is held.  The "all" option
296	// requires all clients to acknowledge a permanent replication
297	// message instead.
298	//
299	cur_env.repmgr_set_ack_policy(app_config->ack_policy);
300
301	// Set the threshold for the minimum and maximum time the client
302	// waits before requesting retransmission of a missing message.
303	// Base these values on the performance and load characteristics
304	// of the master and client host platforms as well as the round
305	// trip message time.
306	//
307	cur_env.rep_set_request(20000, 500000);
308
309	// Configure deadlock detection to ensure that any deadlocks
310	// are broken by having one of the conflicting lock requests
311	// rejected. DB_LOCK_DEFAULT uses the lock policy specified
312	// at environment creation time or DB_LOCK_RANDOM if none was
313	// specified.
314	//
315	cur_env.set_lk_detect(DB_LOCK_DEFAULT);
316
317	// The following base replication features may also be useful to your
318	// application. See Berkeley DB documentation for more details.
319	//   - Master leases: Provide stricter consistency for data reads
320	//     on a master site.
321	//   - Timeouts: Customize the amount of time Berkeley DB waits
322	//     for such things as an election to be concluded or a master
323	//     lease to be granted.
324	//   - Delayed client synchronization: Manage the master site's
325	//     resources by spreading out resource-intensive client
326	//     synchronizations.
327	//   - Blocked client operations: Return immediately with an error
328	//     instead of waiting indefinitely if a client operation is
329	//     blocked by an ongoing client synchronization.
330
331	cur_env.repmgr_set_local_site(app_config->this_host.host,
332	    app_config->this_host.port, 0);
333
334	for ( REP_HOST_INFO *cur = app_config->other_hosts; cur != NULL;
335		cur = cur->next) {
336		cur_env.repmgr_add_remote_site(cur->host, cur->port,
337		    NULL, cur->peer ? DB_REPMGR_PEER : 0);
338	}
339
340	// Configure heartbeat timeouts so that repmgr monitors the
341	// health of the TCP connection.  Master sites broadcast a heartbeat
342	// at the frequency specified by the DB_REP_HEARTBEAT_SEND timeout.
343	// Client sites wait for message activity the length of the
344	// DB_REP_HEARTBEAT_MONITOR timeout before concluding that the
345	// connection to the master is lost.  The DB_REP_HEARTBEAT_MONITOR
346	// timeout should be longer than the DB_REP_HEARTBEAT_SEND timeout.
347	//
348	cur_env.rep_set_timeout(DB_REP_HEARTBEAT_SEND, 5000000);
349	cur_env.rep_set_timeout(DB_REP_HEARTBEAT_MONITOR, 10000000);
350
351	// The following repmgr features may also be useful to your
352	// application.  See Berkeley DB documentation for more details.
353	//  - Two-site strict majority rule - In a two-site replication
354	//    group, require both sites to be available to elect a new
355	//    master.
356	//  - Timeouts - Customize the amount of time repmgr waits
357	//    for such things as waiting for acknowledgements or attempting
358	//    to reconnect to other sites.
359	//  - Site list - return a list of sites currently known to repmgr.
360
361	// We can now open our environment, although we're not ready to
362	// begin replicating.  However, we want to have a dbenv around
363	// so that we can send it into any of our message handlers.
364	//
365	cur_env.set_cachesize(0, CACHESIZE, 0);
366	cur_env.set_flags(DB_TXN_NOSYNC, 1);
367
368	cur_env.open(app_config->home, DB_CREATE | DB_RECOVER |
369	    DB_THREAD | DB_INIT_REP | DB_INIT_LOCK | DB_INIT_LOG |
370	    DB_INIT_MPOOL | DB_INIT_TXN, 0);
371
372	// Start checkpoint and log archive support threads.
373	(void)thread_create(&ckp_thr, NULL, checkpoint_thread, &cur_env);
374	(void)thread_create(&lga_thr, NULL, log_archive_thread, &cur_env);
375
376	cur_env.repmgr_start(3, app_config->start_policy);
377}
378
379int RepQuoteExample::terminate() {
380	try {
381		// Wait for checkpoint and log archive threads to finish.
382		// Windows does not allow NULL pointer for exit code variable.
383		thread_exit_status_t exstat;
384
385		(void)thread_join(lga_thr, &exstat);
386		(void)thread_join(ckp_thr, &exstat);
387
388		// We have used the DB_TXN_NOSYNC environment flag for
389		// improved performance without the usual sacrifice of
390		// transactional durability, as discussed in the
391		// "Transactional guarantees" page of the Reference
392		// Guide: if one replication site crashes, we can
393		// expect the data to exist at another site.  However,
394		// in case we shut down all sites gracefully, we push
395		// out the end of the log here so that the most
396		// recent transactions don't mysteriously disappear.
397		//
398		cur_env.log_flush(NULL);
399
400		cur_env.close(0);
401	} catch (DbException dbe) {
402		cout << "error closing environment: " << dbe.what() << endl;
403	}
404	return 0;
405}
406
407void RepQuoteExample::prompt() {
408	cout << "QUOTESERVER";
409	if (!app_data.is_master)
410		cout << "(read-only)";
411	cout << "> " << flush;
412}
413
// Write one message line to standard error and flush the stream.
void log(const char *msg) {
	std::cerr << msg << '\n' << std::flush;
}
417
418// Simple command-line user interface:
419//  - enter "<stock symbol> <price>" to insert or update a record in the
420//	database;
421//  - just press Return (i.e., blank input line) to print out the contents of
422//	the database;
423//  - enter "quit" or "exit" to quit.
424//
425void RepQuoteExample::doloop() {
426	DbHolder dbh(&cur_env);
427
428	string input;
429	while (prompt(), getline(cin, input)) {
430		istringstream is(input);
431		string token1, token2;
432
433		// Read 0, 1 or 2 tokens from the input.
434		//
435		int count = 0;
436		if (is >> token1) {
437			count++;
438			if (is >> token2)
439			count++;
440		}
441
442		if (count == 1) {
443			if (token1 == "exit" || token1 == "quit") {
444				app_data.app_finished = 1;
445				break;
446			} else {
447				log("Format: <stock> <price>");
448				continue;
449			}
450		}
451
452		// Here we know count is either 0 or 2, so we're about to try a
453		// DB operation.
454		//
455		// Open database with DB_CREATE only if this is a master
456		// database.  A client database uses polling to attempt
457		// to open the database without DB_CREATE until it is
458		// successful.
459		//
460		// This DB_CREATE polling logic can be simplified under
461		// some circumstances.  For example, if the application can
462		// be sure a database is already there, it would never need
463		// to open it with DB_CREATE.
464		//
465		if (!dbh.ensure_open(app_data.is_master))
466			continue;
467
468		try {
469			if (count == 0)
470				if (app_data.in_client_sync)
471					log(
472    "Cannot read data during client initialization - please try again.");
473				else
474					print_stocks(dbh);
475			else if (!app_data.is_master)
476				log("Can't update at client");
477			else {
478				const char *symbol = token1.c_str();
479				StringDbt key(const_cast<char*>(symbol));
480
481				const char *price = token2.c_str();
482				StringDbt data(const_cast<char*>(price));
483
484				dbh->put(NULL, &key, &data, 0);
485			}
486		} catch (DbDeadlockException e) {
487			log("please retry the operation");
488			dbh.close();
489		} catch (DbRepHandleDeadException e) {
490			log("please retry the operation");
491			dbh.close();
492		} catch (DbException e) {
493			if (e.get_errno() == DB_REP_LOCKOUT) {
494			log("please retry the operation");
495			dbh.close();
496			} else
497			throw;
498		}
499	}
500
501	dbh.close();
502}
503
504void RepQuoteExample::event_callback(DbEnv* dbenv, u_int32_t which, void *info)
505{
506	APP_DATA *app = (APP_DATA*)dbenv->get_app_private();
507
508	info = NULL;		/* Currently unused. */
509
510	switch (which) {
511	case DB_EVENT_REP_CLIENT:
512		app->is_master = 0;
513		app->in_client_sync = 1;
514		break;
515	case DB_EVENT_REP_MASTER:
516		app->is_master = 1;
517		app->in_client_sync = 0;
518		break;
519	case DB_EVENT_REP_NEWMASTER:
520		app->in_client_sync = 1;
521		break;
522	case DB_EVENT_REP_PERM_FAILED:
523		// Did not get enough acks to guarantee transaction
524		// durability based on the configured ack policy.  This
525		// transaction will be flushed to the master site's
526		// local disk storage for durability.
527		//
528		log(
529    "Insufficient acknowledgements to guarantee transaction durability.");
530		break;
531	case DB_EVENT_REP_STARTUPDONE:
532		app->in_client_sync = 0;
533		break;
534	default:
535		dbenv->errx("ignoring event %d", which);
536	}
537}
538
539void RepQuoteExample::print_stocks(Db *dbp) {
540	StringDbt key, data;
541#define	MAXKEYSIZE	10
542#define	MAXDATASIZE	20
543	char keybuf[MAXKEYSIZE + 1], databuf[MAXDATASIZE + 1];
544	char *kbuf, *dbuf;
545
546	memset(&key, 0, sizeof(key));
547	memset(&data, 0, sizeof(data));
548	kbuf = keybuf;
549	dbuf = databuf;
550
551	DbcAuto dbc(dbp, 0, 0);
552	cout << "\tSymbol\tPrice" << endl
553		<< "\t======\t=====" << endl;
554
555	for (int ret = dbc->get(&key, &data, DB_FIRST);
556		ret == 0;
557		ret = dbc->get(&key, &data, DB_NEXT)) {
558		key.get_string(&kbuf, MAXKEYSIZE);
559		data.get_string(&dbuf, MAXDATASIZE);
560
561		cout << "\t" << keybuf << "\t" << databuf << endl;
562	}
563	cout << endl << flush;
564	dbc.close();
565}
566
567static void usage() {
568	cerr << "usage: " << progname << " -h home -l host:port [-CM]"
569	    << "[-r host:port][-R host:port]" << endl
570	    << "  [-a all|quorum][-b][-n nsites][-p priority][-v]" << endl;
571
572	cerr << "\t -h home (required; h stands for home directory)" << endl
573	    << "\t -l host:port (required; l stands for local)" << endl
574	    << "\t -C or -M (optional; start up as client or master)" << endl
575	    << "\t -r host:port (optional; r stands for remote; any "
576	    << "number of these" << endl
577	    << "\t               may be specified)" << endl
578	    << "\t -R host:port (optional; R stands for remote peer; only "
579	    << "one of" << endl
580	    << "\t               these may be specified)" << endl
581	    << "\t -a all|quorum (optional; a stands for ack policy)" << endl
582	    << "\t -b (optional; b stands for bulk)" << endl
583	    << "\t -n nsites (optional; number of sites in replication "
584	    << "group; defaults " << endl
585	    << "\t	    to 0 to try to dynamically compute nsites)" << endl
586	    << "\t -p priority (optional; defaults to 100)" << endl
587	    << "\t -v (optional; v stands for verbose)" << endl;
588
589	exit(EXIT_FAILURE);
590}
591
592int main(int argc, char **argv) {
593	RepConfigInfo config;
594	char ch, *portstr, *tmphost;
595	int tmpport;
596	bool tmppeer;
597
598	// Extract the command line parameters
599	while ((ch = getopt(argc, argv, "a:bCh:l:Mn:p:R:r:v")) != EOF) {
600		tmppeer = false;
601		switch (ch) {
602		case 'a':
603			if (strncmp(optarg, "all", 3) == 0)
604				config.ack_policy = DB_REPMGR_ACKS_ALL;
605			else if (strncmp(optarg, "quorum", 6) != 0)
606				usage();
607			break;
608		case 'b':
609			config.bulk = true;
610			break;
611		case 'C':
612			config.start_policy = DB_REP_CLIENT;
613			break;
614		case 'h':
615			config.home = optarg;
616			break;
617		case 'l':
618			config.this_host.host = strtok(optarg, ":");
619			if ((portstr = strtok(NULL, ":")) == NULL) {
620				cerr << "Bad host specification." << endl;
621				usage();
622			}
623			config.this_host.port = (unsigned short)atoi(portstr);
624			config.got_listen_address = true;
625			break;
626		case 'M':
627			config.start_policy = DB_REP_MASTER;
628			break;
629		case 'n':
630			config.totalsites = atoi(optarg);
631			break;
632		case 'p':
633			config.priority = atoi(optarg);
634			break;
635		case 'R':
636			tmppeer = true; // FALLTHROUGH
637		case 'r':
638			tmphost = strtok(optarg, ":");
639			if ((portstr = strtok(NULL, ":")) == NULL) {
640				cerr << "Bad host specification." << endl;
641				usage();
642			}
643			tmpport = (unsigned short)atoi(portstr);
644
645			config.addOtherHost(tmphost, tmpport, tmppeer);
646
647			break;
648		case 'v':
649			config.verbose = true;
650			break;
651		case '?':
652		default:
653			usage();
654		}
655	}
656
657	// Error check command line.
658	if ((!config.got_listen_address) || config.home == NULL)
659		usage();
660
661	RepQuoteExample runner;
662	try {
663		runner.init(&config);
664		runner.doloop();
665	} catch (DbException dbe) {
666		cerr << "Caught an exception during initialization or"
667			<< " processing: " << dbe.what() << endl;
668	}
669	runner.terminate();
670	return 0;
671}
672
673// This is a very simple thread that performs checkpoints at a fixed
674// time interval.  For a master site, the time interval is one minute
675// plus the duration of the checkpoint_delay timeout (30 seconds by
676// default.)  For a client site, the time interval is one minute.
677//
678void *checkpoint_thread(void *args)
679{
680	DbEnv *env;
681	APP_DATA *app;
682	int i, ret;
683
684	env = (DbEnv *)args;
685	app = (APP_DATA *)env->get_app_private();
686
687	for (;;) {
688		// Wait for one minute, polling once per second to see if
689		// application has finished.  When application has finished,
690		// terminate this thread.
691		//
692		for (i = 0; i < 60; i++) {
693			sleep(1);
694			if (app->app_finished == 1)
695				return ((void *)EXIT_SUCCESS);
696		}
697
698		// Perform a checkpoint.
699		if ((ret = env->txn_checkpoint(0, 0, 0)) != 0) {
700			env->err(ret, "Could not perform checkpoint.\n");
701			return ((void *)EXIT_FAILURE);
702		}
703	}
704}
705
706// This is a simple log archive thread.  Once per minute, it removes all but
707// the most recent 3 logs that are safe to remove according to a call to
708// DBENV->log_archive().
709//
710// Log cleanup is needed to conserve disk space, but aggressive log cleanup
711// can cause more frequent client initializations if a client lags too far
712// behind the current master.  This can happen in the event of a slow client,
713// a network partition, or a new master that has not kept as many logs as the
714// previous master.
715//
716// The approach in this routine balances the need to mitigate against a
717// lagging client by keeping a few more of the most recent unneeded logs
718// with the need to conserve disk space by regularly cleaning up log files.
719// Use of automatic log removal (DBENV->log_set_config() DB_LOG_AUTO_REMOVE
720// flag) is not recommended for replication due to the risk of frequent
721// client initializations.
722//
723void *log_archive_thread(void *args)
724{
725	DbEnv *env;
726	APP_DATA *app;
727	char **begin, **list;
728	int i, listlen, logs_to_keep, minlog, ret;
729
730	env = (DbEnv *)args;
731	app = (APP_DATA *)env->get_app_private();
732	logs_to_keep = 3;
733
734	for (;;) {
735		// Wait for one minute, polling once per second to see if
736		// application has finished.  When application has finished,
737		// terminate this thread.
738		//
739		for (i = 0; i < 60; i++) {
740			sleep(1);
741			if (app->app_finished == 1)
742				return ((void *)EXIT_SUCCESS);
743		}
744
745		// Get the list of unneeded log files.
746		if ((ret = env->log_archive(&list, DB_ARCH_ABS)) != 0) {
747			env->err(ret, "Could not get log archive list.");
748			return ((void *)EXIT_FAILURE);
749		}
750		if (list != NULL) {
751			listlen = 0;
752			// Get the number of logs in the list.
753			for (begin = list; *begin != NULL; begin++, listlen++);
754			// Remove all but the logs_to_keep most recent
755			// unneeded log files.
756			//
757			minlog = listlen - logs_to_keep;
758			for (begin = list, i= 0; i < minlog; list++, i++) {
759				if ((ret = unlink(*list)) != 0) {
760					env->err(ret,
761					    "logclean: remove %s", *list);
762					env->errx(
763					    "logclean: Error remove %s", *list);
764					free(begin);
765					return ((void *)EXIT_FAILURE);
766				}
767			}
768			free(begin);
769		}
770	}
771}
772