1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2001-2009 Oracle.  All rights reserved.
5 *
6 * $Id$
7 */
8
9#include "db_config.h"
10
11#include "db_int.h"
12#include "dbinc/db_page.h"
13#include "dbinc/db_am.h"
14#include "dbinc/log.h"
15
16static int __rep_egen_init  __P((ENV *, REP *));
17static int __rep_gen_init  __P((ENV *, REP *));
18
19/*
20 * __rep_open --
21 *	Initialize the shared memory state for the replication system.
22 *
23 * PUBLIC: int __rep_open __P((ENV *));
24 */
25int
26__rep_open(env)
27	ENV *env;
28{
29	DB_REP *db_rep;
30	REGENV *renv;
31	REGINFO *infop;
32	REP *rep;
33	int ret;
34
35	db_rep = env->rep_handle;
36	infop = env->reginfo;
37	renv = infop->primary;
38	ret = 0;
39
40	if (renv->rep_off == INVALID_ROFF) {
41		/* Must create the region. */
42		if ((ret = __env_alloc(infop, sizeof(REP), &rep)) != 0)
43			return (ret);
44		memset(rep, 0, sizeof(*rep));
45
46		/*
47		 * We have the region; fill in the values.  Some values may
48		 * have been configured before we open the region, and those
49		 * are taken from the DB_REP structure.
50		 */
51		if ((ret = __mutex_alloc(
52		    env, MTX_REP_REGION, 0, &rep->mtx_region)) != 0)
53			return (ret);
54		/*
55		 * Because we have no way to prevent deadlocks and cannot log
56		 * changes made to it, we single-thread access to the client
57		 * bookkeeping database.  This is suboptimal, but it only gets
58		 * accessed when messages arrive out-of-order, so it should
59		 * stay small and not be used in a high-performance app.
60		 */
61		if ((ret = __mutex_alloc(
62		    env, MTX_REP_DATABASE, 0, &rep->mtx_clientdb)) != 0)
63			return (ret);
64
65		if ((ret = __mutex_alloc(
66		    env, MTX_REP_CHKPT, 0, &rep->mtx_ckp)) != 0)
67			return (ret);
68
69		if ((ret = __mutex_alloc(
70		    env, MTX_REP_EVENT, 0, &rep->mtx_event)) != 0)
71			return (ret);
72
73		rep->newmaster_event_gen = 0;
74		rep->notified_egen = 0;
75		rep->lease_off = INVALID_ROFF;
76		rep->tally_off = INVALID_ROFF;
77		rep->v2tally_off = INVALID_ROFF;
78		rep->eid = db_rep->eid;
79		rep->master_id = DB_EID_INVALID;
80		rep->gen = 0;
81		rep->version = DB_REPVERSION;
82		rep->config = db_rep->config;
83		if ((ret = __rep_gen_init(env, rep)) != 0)
84			return (ret);
85		if ((ret = __rep_egen_init(env, rep)) != 0)
86			return (ret);
87		rep->gbytes = db_rep->gbytes;
88		rep->bytes = db_rep->bytes;
89		rep->request_gap = db_rep->request_gap;
90		rep->max_gap = db_rep->max_gap;
91		rep->config_nsites = db_rep->config_nsites;
92		rep->elect_timeout = db_rep->elect_timeout;
93		rep->full_elect_timeout = db_rep->full_elect_timeout;
94		rep->lease_timeout = db_rep->lease_timeout;
95		rep->clock_skew = db_rep->clock_skew;
96		rep->clock_base = db_rep->clock_base;
97		timespecclear(&rep->lease_duration);
98		timespecclear(&rep->grant_expire);
99		rep->chkpt_delay = db_rep->chkpt_delay;
100		rep->priority = db_rep->my_priority;
101
102		F_SET(rep, REP_F_NOARCHIVE);
103
104		/* Copy application type flags if set before env open. */
105		if (F_ISSET(db_rep, DBREP_APP_REPMGR))
106			F_SET(rep, REP_F_APP_REPMGR);
107		if (F_ISSET(db_rep, DBREP_APP_BASEAPI))
108			F_SET(rep, REP_F_APP_BASEAPI);
109
110		/* Initialize encapsulating region. */
111		renv->rep_off = R_OFFSET(infop, rep);
112		(void)time(&renv->rep_timestamp);
113		renv->op_timestamp = 0;
114		F_CLR(renv, DB_REGENV_REPLOCKED);
115
116#ifdef HAVE_REPLICATION_THREADS
117		if ((ret = __repmgr_open(env, rep)) != 0)
118			return (ret);
119#endif
120	} else {
121		rep = R_ADDR(infop, renv->rep_off);
122		/*
123		 * Prevent an application type mismatch between a process
124		 * and the environment it is trying to join.
125		 */
126		if ((F_ISSET(db_rep, DBREP_APP_REPMGR) &&
127		    F_ISSET(rep, REP_F_APP_BASEAPI)) ||
128		    (F_ISSET(db_rep, DBREP_APP_BASEAPI) &&
129		    F_ISSET(rep, REP_F_APP_REPMGR))) {
130			__db_errx(env,
131"Application type mismatch for a replication process joining the environment");
132			return (EINVAL);
133		}
134#ifdef HAVE_REPLICATION_THREADS
135		if ((ret = __repmgr_join(env, rep)) != 0)
136			return (ret);
137#endif
138	}
139
140	db_rep->region = rep;
141
142	return (0);
143}
144
145/*
146 * __rep_env_refresh --
147 *	Replication-specific refresh of the ENV structure.
148 *
149 * PUBLIC: int __rep_env_refresh __P((ENV *));
150 */
151int
152__rep_env_refresh(env)
153	ENV *env;
154{
155	DB_REP *db_rep;
156	REGENV *renv;
157	REGINFO *infop;
158	REP *rep;
159	int ret, t_ret;
160
161	db_rep = env->rep_handle;
162	rep = db_rep->region;
163	infop = env->reginfo;
164	renv = infop->primary;
165	ret = 0;
166
167	/*
168	 * If we are the last reference closing the env, clear our knowledge of
169	 * belonging to a group and that there is a valid handle where
170	 * rep_start had already been called.
171	 */
172	if (renv->refcnt == 1) {
173		F_CLR(rep, REP_F_GROUP_ESTD);
174		F_CLR(rep, REP_F_START_CALLED);
175	}
176
177#ifdef HAVE_REPLICATION_THREADS
178	ret = __repmgr_env_refresh(env);
179#endif
180
181	/*
182	 * If a private region, return the memory to the heap.  Not needed for
183	 * filesystem-backed or system shared memory regions, that memory isn't
184	 * owned by any particular process.
185	 */
186	if (F_ISSET(env, ENV_PRIVATE)) {
187		db_rep = env->rep_handle;
188		if (db_rep->region != NULL) {
189			ret = __mutex_free(env, &db_rep->region->mtx_region);
190			if ((t_ret = __mutex_free(env,
191			    &db_rep->region->mtx_clientdb)) != 0 && ret == 0)
192				ret = t_ret;
193			if ((t_ret = __mutex_free(env,
194			    &db_rep->region->mtx_ckp)) != 0 && ret == 0)
195				ret = t_ret;
196			if ((t_ret = __mutex_free(env,
197			    &db_rep->region->mtx_event)) != 0 && ret == 0)
198				ret = t_ret;
199		}
200
201		if (renv->rep_off != INVALID_ROFF)
202			__env_alloc_free(infop, R_ADDR(infop, renv->rep_off));
203	}
204
205	env->rep_handle->region = NULL;
206	return (ret);
207}
208
209/*
210 * __rep_close --
211 *      Shut down all of replication.
212 *
213 * PUBLIC: int __rep_env_close __P((ENV *));
214 */
215int
216__rep_env_close(env)
217	ENV *env;
218{
219	int ret, t_ret;
220
221	ret = __rep_preclose(env);
222	if ((t_ret = __rep_closefiles(env)) != 0 && ret == 0)
223		ret = t_ret;
224	return (ret);
225}
226
227/*
228 * __rep_preclose --
229 *	If we are a client, shut down our client database and send
230 * any outstanding bulk buffers.
231 *
232 * PUBLIC: int __rep_preclose __P((ENV *));
233 */
234int
235__rep_preclose(env)
236	ENV *env;
237{
238	DB_LOG *dblp;
239	DB_REP *db_rep;
240	LOG *lp;
241	REP_BULK bulk;
242	int ret;
243
244	ret = 0;
245
246	db_rep = env->rep_handle;
247	dblp = env->lg_handle;
248
249	/*
250	 * If we have a rep region, we can preclose.  Otherwise, return.
251	 * If we're on an error path from env open, we may not have
252	 * a region, even though we have a handle.
253	 */
254	if (db_rep == NULL || db_rep->region == NULL)
255		return (ret);
256	MUTEX_LOCK(env, db_rep->region->mtx_clientdb);
257	if (db_rep->rep_db != NULL) {
258		ret = __db_close(db_rep->rep_db, NULL, DB_NOSYNC);
259		db_rep->rep_db = NULL;
260	}
261	/*
262	 * We could be called early in an env_open error path, so
263	 * only do this if we have a log region set up.
264	 */
265	if (dblp == NULL)
266		goto out;
267	lp = dblp->reginfo.primary;
268	/*
269	 * If we have something in the bulk buffer, send anything in it
270	 * if we are able to.
271	 */
272	if (lp->bulk_off != 0 && db_rep->send != NULL) {
273		memset(&bulk, 0, sizeof(bulk));
274		bulk.addr = R_ADDR(&dblp->reginfo, lp->bulk_buf);
275		bulk.offp = &lp->bulk_off;
276		bulk.len = lp->bulk_len;
277		bulk.type = REP_BULK_LOG;
278		bulk.eid = DB_EID_BROADCAST;
279		bulk.flagsp = &lp->bulk_flags;
280		/*
281		 * Ignore send errors here.  This can be called on the
282		 * env->close path - make a best attempt to send.
283		 */
284		(void)__rep_send_bulk(env, &bulk, 0);
285	}
286out:	MUTEX_UNLOCK(env, db_rep->region->mtx_clientdb);
287	return (ret);
288}
289
290/*
291 * __rep_closefiles --
292 *	If we were a client and are now a master, close all databases
293 *	we've opened while applying messages as a client.  This can
294 *	be called from __env_close and we need to check if the env,
295 *	handles and regions are set up, or not.
296 *
297 * PUBLIC: int __rep_closefiles __P((ENV *));
298 */
299int
300__rep_closefiles(env)
301	ENV *env;
302{
303	DB_LOG *dblp;
304	DB_REP *db_rep;
305	int ret;
306
307	ret = 0;
308
309	db_rep = env->rep_handle;
310	dblp = env->lg_handle;
311
312	if (db_rep == NULL || db_rep->region == NULL)
313		return (ret);
314	if (dblp == NULL)
315		return (ret);
316	if ((ret = __dbreg_close_files(env, 0)) == 0)
317		F_CLR(db_rep, DBREP_OPENFILES);
318
319	return (ret);
320}
321
322/*
323 * __rep_egen_init --
324 *	Initialize the value of egen in the region.  Called only from
325 *	__rep_region_init, which is guaranteed to be single-threaded
326 *	as we create the rep region.  We set the rep->egen field which
327 *	is normally protected by db_rep->region->mutex.
328 */
329static int
330__rep_egen_init(env, rep)
331	ENV *env;
332	REP *rep;
333{
334	DB_FH *fhp;
335	int ret;
336	size_t cnt;
337	char *p;
338
339	if ((ret = __db_appname(env,
340	    DB_APP_NONE, REP_EGENNAME, NULL, &p)) != 0)
341		return (ret);
342	/*
343	 * If the file doesn't exist, create it now and initialize with 1.
344	 */
345	if (__os_exists(env, p, NULL) != 0) {
346		rep->egen = rep->gen + 1;
347		if ((ret = __rep_write_egen(env, rep, rep->egen)) != 0)
348			goto err;
349	} else {
350		/*
351		 * File exists, open it and read in our egen.
352		 */
353		if ((ret = __os_open(env, p, 0,
354		    DB_OSO_RDONLY, DB_MODE_600, &fhp)) != 0)
355			goto err;
356		if ((ret = __os_read(env, fhp, &rep->egen, sizeof(u_int32_t),
357		    &cnt)) != 0 || cnt != sizeof(u_int32_t))
358			goto err1;
359		RPRINT(env, DB_VERB_REP_MISC,
360		    (env, "Read in egen %lu", (u_long)rep->egen));
361err1:		 (void)__os_closehandle(env, fhp);
362	}
363err:	__os_free(env, p);
364	return (ret);
365}
366
367/*
368 * __rep_write_egen --
369 *	Write out the egen into the env file.
370 *
371 * PUBLIC: int __rep_write_egen __P((ENV *, REP *, u_int32_t));
372 */
373int
374__rep_write_egen(env, rep, egen)
375	ENV *env;
376	REP *rep;
377	u_int32_t egen;
378{
379	DB_FH *fhp;
380	int ret;
381	size_t cnt;
382	char *p;
383
384	/*
385	 * If running in-memory replication, return without any file
386	 * operations.
387	 */
388	if (FLD_ISSET(rep->config, REP_C_INMEM)) {
389		return (0);
390	}
391
392	if ((ret = __db_appname(env,
393	    DB_APP_NONE, REP_EGENNAME, NULL, &p)) != 0)
394		return (ret);
395	if ((ret = __os_open(
396	    env, p, 0, DB_OSO_CREATE | DB_OSO_TRUNC, DB_MODE_600, &fhp)) == 0) {
397		if ((ret = __os_write(env, fhp, &egen, sizeof(u_int32_t),
398		    &cnt)) != 0 || ((ret = __os_fsync(env, fhp)) != 0))
399			__db_err(env, ret, "%s", p);
400		(void)__os_closehandle(env, fhp);
401	}
402	__os_free(env, p);
403	return (ret);
404}
405
406/*
407 * __rep_gen_init --
408 *	Initialize the value of gen in the region.  Called only from
409 *	__rep_region_init, which is guaranteed to be single-threaded
410 *	as we create the rep region.  We set the rep->gen field which
411 *	is normally protected by db_rep->region->mutex.
412 */
413static int
414__rep_gen_init(env, rep)
415	ENV *env;
416	REP *rep;
417{
418	DB_FH *fhp;
419	int ret;
420	size_t cnt;
421	char *p;
422
423	if ((ret = __db_appname(env,
424	    DB_APP_NONE, REP_GENNAME, NULL, &p)) != 0)
425		return (ret);
426	/*
427	 * If the file doesn't exist, create it now and initialize with 0.
428	 */
429	if (__os_exists(env, p, NULL) != 0) {
430		rep->gen = 0;
431		if ((ret = __rep_write_gen(env, rep, rep->gen)) != 0)
432			goto err;
433	} else {
434		/*
435		 * File exists, open it and read in our gen.
436		 */
437		if ((ret = __os_open(env, p, 0,
438		    DB_OSO_RDONLY, DB_MODE_600, &fhp)) != 0)
439			goto err;
440		if ((ret = __os_read(env, fhp, &rep->gen, sizeof(u_int32_t),
441		    &cnt)) < 0 || cnt == 0)
442			goto err1;
443		RPRINT(env, DB_VERB_REP_MISC, (env, "Read in gen %lu",
444		    (u_long)rep->gen));
445err1:		 (void)__os_closehandle(env, fhp);
446	}
447err:	__os_free(env, p);
448	return (ret);
449}
450
451/*
452 * __rep_write_gen --
453 *	Write out the gen into the env file.
454 *
455 * PUBLIC: int __rep_write_gen __P((ENV *, REP *, u_int32_t));
456 */
457int
458__rep_write_gen(env, rep, gen)
459	ENV *env;
460	REP *rep;
461	u_int32_t gen;
462{
463	DB_FH *fhp;
464	int ret;
465	size_t cnt;
466	char *p;
467
468	/*
469	 * If running in-memory replication, return without any file
470	 * operations.
471	 */
472	if (FLD_ISSET(rep->config, REP_C_INMEM)) {
473		return (0);
474	}
475
476	if ((ret = __db_appname(env,
477	    DB_APP_NONE, REP_GENNAME, NULL, &p)) != 0)
478		return (ret);
479	if ((ret = __os_open(
480	    env, p, 0, DB_OSO_CREATE | DB_OSO_TRUNC, DB_MODE_600, &fhp)) == 0) {
481		if ((ret = __os_write(env, fhp, &gen, sizeof(u_int32_t),
482		    &cnt)) != 0 || ((ret = __os_fsync(env, fhp)) != 0))
483			__db_err(env, ret, "%s", p);
484		(void)__os_closehandle(env, fhp);
485	}
486	__os_free(env, p);
487	return (ret);
488}
489