1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2001-2009 Oracle. All rights reserved. 5 * 6 * $Id$ 7 */ 8 9#include "db_config.h" 10 11#include "db_int.h" 12#include "dbinc/db_page.h" 13#include "dbinc/db_am.h" 14#include "dbinc/log.h" 15 16static int __rep_egen_init __P((ENV *, REP *)); 17static int __rep_gen_init __P((ENV *, REP *)); 18 19/* 20 * __rep_open -- 21 * Initialize the shared memory state for the replication system. 22 * 23 * PUBLIC: int __rep_open __P((ENV *)); 24 */ 25int 26__rep_open(env) 27 ENV *env; 28{ 29 DB_REP *db_rep; 30 REGENV *renv; 31 REGINFO *infop; 32 REP *rep; 33 int ret; 34 35 db_rep = env->rep_handle; 36 infop = env->reginfo; 37 renv = infop->primary; 38 ret = 0; 39 40 if (renv->rep_off == INVALID_ROFF) { 41 /* Must create the region. */ 42 if ((ret = __env_alloc(infop, sizeof(REP), &rep)) != 0) 43 return (ret); 44 memset(rep, 0, sizeof(*rep)); 45 46 /* 47 * We have the region; fill in the values. Some values may 48 * have been configured before we open the region, and those 49 * are taken from the DB_REP structure. 50 */ 51 if ((ret = __mutex_alloc( 52 env, MTX_REP_REGION, 0, &rep->mtx_region)) != 0) 53 return (ret); 54 /* 55 * Because we have no way to prevent deadlocks and cannot log 56 * changes made to it, we single-thread access to the client 57 * bookkeeping database. This is suboptimal, but it only gets 58 * accessed when messages arrive out-of-order, so it should 59 * stay small and not be used in a high-performance app. 60 */ 61 if ((ret = __mutex_alloc( 62 env, MTX_REP_DATABASE, 0, &rep->mtx_clientdb)) != 0) 63 return (ret); 64 65 if ((ret = __mutex_alloc( 66 env, MTX_REP_CHKPT, 0, &rep->mtx_ckp)) != 0) 67 return (ret); 68 69 if ((ret = __mutex_alloc( 70 env, MTX_REP_EVENT, 0, &rep->mtx_event)) != 0) 71 return (ret); 72 73 rep->newmaster_event_gen = 0; 74 rep->notified_egen = 0; 75 rep->lease_off = INVALID_ROFF; 76 rep->tally_off = INVALID_ROFF; 77 rep->v2tally_off = INVALID_ROFF; 78 rep->eid = db_rep->eid; 79 rep->master_id = DB_EID_INVALID; 80 rep->gen = 0; 81 rep->version = DB_REPVERSION; 82 rep->config = db_rep->config; 83 if ((ret = __rep_gen_init(env, rep)) != 0) 84 return (ret); 85 if ((ret = __rep_egen_init(env, rep)) != 0) 86 return (ret); 87 rep->gbytes = db_rep->gbytes; 88 rep->bytes = db_rep->bytes; 89 rep->request_gap = db_rep->request_gap; 90 rep->max_gap = db_rep->max_gap; 91 rep->config_nsites = db_rep->config_nsites; 92 rep->elect_timeout = db_rep->elect_timeout; 93 rep->full_elect_timeout = db_rep->full_elect_timeout; 94 rep->lease_timeout = db_rep->lease_timeout; 95 rep->clock_skew = db_rep->clock_skew; 96 rep->clock_base = db_rep->clock_base; 97 timespecclear(&rep->lease_duration); 98 timespecclear(&rep->grant_expire); 99 rep->chkpt_delay = db_rep->chkpt_delay; 100 rep->priority = db_rep->my_priority; 101 102 F_SET(rep, REP_F_NOARCHIVE); 103 104 /* Copy application type flags if set before env open. */ 105 if (F_ISSET(db_rep, DBREP_APP_REPMGR)) 106 F_SET(rep, REP_F_APP_REPMGR); 107 if (F_ISSET(db_rep, DBREP_APP_BASEAPI)) 108 F_SET(rep, REP_F_APP_BASEAPI); 109 110 /* Initialize encapsulating region. */ 111 renv->rep_off = R_OFFSET(infop, rep); 112 (void)time(&renv->rep_timestamp); 113 renv->op_timestamp = 0; 114 F_CLR(renv, DB_REGENV_REPLOCKED); 115 116#ifdef HAVE_REPLICATION_THREADS 117 if ((ret = __repmgr_open(env, rep)) != 0) 118 return (ret); 119#endif 120 } else { 121 rep = R_ADDR(infop, renv->rep_off); 122 /* 123 * Prevent an application type mismatch between a process 124 * and the environment it is trying to join. 125 */ 126 if ((F_ISSET(db_rep, DBREP_APP_REPMGR) && 127 F_ISSET(rep, REP_F_APP_BASEAPI)) || 128 (F_ISSET(db_rep, DBREP_APP_BASEAPI) && 129 F_ISSET(rep, REP_F_APP_REPMGR))) { 130 __db_errx(env, 131"Application type mismatch for a replication process joining the environment"); 132 return (EINVAL); 133 } 134#ifdef HAVE_REPLICATION_THREADS 135 if ((ret = __repmgr_join(env, rep)) != 0) 136 return (ret); 137#endif 138 } 139 140 db_rep->region = rep; 141 142 return (0); 143} 144 145/* 146 * __rep_env_refresh -- 147 * Replication-specific refresh of the ENV structure. 148 * 149 * PUBLIC: int __rep_env_refresh __P((ENV *)); 150 */ 151int 152__rep_env_refresh(env) 153 ENV *env; 154{ 155 DB_REP *db_rep; 156 REGENV *renv; 157 REGINFO *infop; 158 REP *rep; 159 int ret, t_ret; 160 161 db_rep = env->rep_handle; 162 rep = db_rep->region; 163 infop = env->reginfo; 164 renv = infop->primary; 165 ret = 0; 166 167 /* 168 * If we are the last reference closing the env, clear our knowledge of 169 * belonging to a group and that there is a valid handle where 170 * rep_start had already been called. 171 */ 172 if (renv->refcnt == 1) { 173 F_CLR(rep, REP_F_GROUP_ESTD); 174 F_CLR(rep, REP_F_START_CALLED); 175 } 176 177#ifdef HAVE_REPLICATION_THREADS 178 ret = __repmgr_env_refresh(env); 179#endif 180 181 /* 182 * If a private region, return the memory to the heap. Not needed for 183 * filesystem-backed or system shared memory regions, that memory isn't 184 * owned by any particular process. 185 */ 186 if (F_ISSET(env, ENV_PRIVATE)) { 187 db_rep = env->rep_handle; 188 if (db_rep->region != NULL) { 189 ret = __mutex_free(env, &db_rep->region->mtx_region); 190 if ((t_ret = __mutex_free(env, 191 &db_rep->region->mtx_clientdb)) != 0 && ret == 0) 192 ret = t_ret; 193 if ((t_ret = __mutex_free(env, 194 &db_rep->region->mtx_ckp)) != 0 && ret == 0) 195 ret = t_ret; 196 if ((t_ret = __mutex_free(env, 197 &db_rep->region->mtx_event)) != 0 && ret == 0) 198 ret = t_ret; 199 } 200 201 if (renv->rep_off != INVALID_ROFF) 202 __env_alloc_free(infop, R_ADDR(infop, renv->rep_off)); 203 } 204 205 env->rep_handle->region = NULL; 206 return (ret); 207} 208 209/* 210 * __rep_close -- 211 * Shut down all of replication. 212 * 213 * PUBLIC: int __rep_env_close __P((ENV *)); 214 */ 215int 216__rep_env_close(env) 217 ENV *env; 218{ 219 int ret, t_ret; 220 221 ret = __rep_preclose(env); 222 if ((t_ret = __rep_closefiles(env)) != 0 && ret == 0) 223 ret = t_ret; 224 return (ret); 225} 226 227/* 228 * __rep_preclose -- 229 * If we are a client, shut down our client database and send 230 * any outstanding bulk buffers. 231 * 232 * PUBLIC: int __rep_preclose __P((ENV *)); 233 */ 234int 235__rep_preclose(env) 236 ENV *env; 237{ 238 DB_LOG *dblp; 239 DB_REP *db_rep; 240 LOG *lp; 241 REP_BULK bulk; 242 int ret; 243 244 ret = 0; 245 246 db_rep = env->rep_handle; 247 dblp = env->lg_handle; 248 249 /* 250 * If we have a rep region, we can preclose. Otherwise, return. 251 * If we're on an error path from env open, we may not have 252 * a region, even though we have a handle. 253 */ 254 if (db_rep == NULL || db_rep->region == NULL) 255 return (ret); 256 MUTEX_LOCK(env, db_rep->region->mtx_clientdb); 257 if (db_rep->rep_db != NULL) { 258 ret = __db_close(db_rep->rep_db, NULL, DB_NOSYNC); 259 db_rep->rep_db = NULL; 260 } 261 /* 262 * We could be called early in an env_open error path, so 263 * only do this if we have a log region set up. 264 */ 265 if (dblp == NULL) 266 goto out; 267 lp = dblp->reginfo.primary; 268 /* 269 * If we have something in the bulk buffer, send anything in it 270 * if we are able to. 271 */ 272 if (lp->bulk_off != 0 && db_rep->send != NULL) { 273 memset(&bulk, 0, sizeof(bulk)); 274 bulk.addr = R_ADDR(&dblp->reginfo, lp->bulk_buf); 275 bulk.offp = &lp->bulk_off; 276 bulk.len = lp->bulk_len; 277 bulk.type = REP_BULK_LOG; 278 bulk.eid = DB_EID_BROADCAST; 279 bulk.flagsp = &lp->bulk_flags; 280 /* 281 * Ignore send errors here. This can be called on the 282 * env->close path - make a best attempt to send. 283 */ 284 (void)__rep_send_bulk(env, &bulk, 0); 285 } 286out: MUTEX_UNLOCK(env, db_rep->region->mtx_clientdb); 287 return (ret); 288} 289 290/* 291 * __rep_closefiles -- 292 * If we were a client and are now a master, close all databases 293 * we've opened while applying messages as a client. This can 294 * be called from __env_close and we need to check if the env, 295 * handles and regions are set up, or not. 296 * 297 * PUBLIC: int __rep_closefiles __P((ENV *)); 298 */ 299int 300__rep_closefiles(env) 301 ENV *env; 302{ 303 DB_LOG *dblp; 304 DB_REP *db_rep; 305 int ret; 306 307 ret = 0; 308 309 db_rep = env->rep_handle; 310 dblp = env->lg_handle; 311 312 if (db_rep == NULL || db_rep->region == NULL) 313 return (ret); 314 if (dblp == NULL) 315 return (ret); 316 if ((ret = __dbreg_close_files(env, 0)) == 0) 317 F_CLR(db_rep, DBREP_OPENFILES); 318 319 return (ret); 320} 321 322/* 323 * __rep_egen_init -- 324 * Initialize the value of egen in the region. Called only from 325 * __rep_region_init, which is guaranteed to be single-threaded 326 * as we create the rep region. We set the rep->egen field which 327 * is normally protected by db_rep->region->mutex. 328 */ 329static int 330__rep_egen_init(env, rep) 331 ENV *env; 332 REP *rep; 333{ 334 DB_FH *fhp; 335 int ret; 336 size_t cnt; 337 char *p; 338 339 if ((ret = __db_appname(env, 340 DB_APP_NONE, REP_EGENNAME, NULL, &p)) != 0) 341 return (ret); 342 /* 343 * If the file doesn't exist, create it now and initialize with 1. 344 */ 345 if (__os_exists(env, p, NULL) != 0) { 346 rep->egen = rep->gen + 1; 347 if ((ret = __rep_write_egen(env, rep, rep->egen)) != 0) 348 goto err; 349 } else { 350 /* 351 * File exists, open it and read in our egen. 352 */ 353 if ((ret = __os_open(env, p, 0, 354 DB_OSO_RDONLY, DB_MODE_600, &fhp)) != 0) 355 goto err; 356 if ((ret = __os_read(env, fhp, &rep->egen, sizeof(u_int32_t), 357 &cnt)) != 0 || cnt != sizeof(u_int32_t)) 358 goto err1; 359 RPRINT(env, DB_VERB_REP_MISC, 360 (env, "Read in egen %lu", (u_long)rep->egen)); 361err1: (void)__os_closehandle(env, fhp); 362 } 363err: __os_free(env, p); 364 return (ret); 365} 366 367/* 368 * __rep_write_egen -- 369 * Write out the egen into the env file. 370 * 371 * PUBLIC: int __rep_write_egen __P((ENV *, REP *, u_int32_t)); 372 */ 373int 374__rep_write_egen(env, rep, egen) 375 ENV *env; 376 REP *rep; 377 u_int32_t egen; 378{ 379 DB_FH *fhp; 380 int ret; 381 size_t cnt; 382 char *p; 383 384 /* 385 * If running in-memory replication, return without any file 386 * operations. 387 */ 388 if (FLD_ISSET(rep->config, REP_C_INMEM)) { 389 return (0); 390 } 391 392 if ((ret = __db_appname(env, 393 DB_APP_NONE, REP_EGENNAME, NULL, &p)) != 0) 394 return (ret); 395 if ((ret = __os_open( 396 env, p, 0, DB_OSO_CREATE | DB_OSO_TRUNC, DB_MODE_600, &fhp)) == 0) { 397 if ((ret = __os_write(env, fhp, &egen, sizeof(u_int32_t), 398 &cnt)) != 0 || ((ret = __os_fsync(env, fhp)) != 0)) 399 __db_err(env, ret, "%s", p); 400 (void)__os_closehandle(env, fhp); 401 } 402 __os_free(env, p); 403 return (ret); 404} 405 406/* 407 * __rep_gen_init -- 408 * Initialize the value of gen in the region. Called only from 409 * __rep_region_init, which is guaranteed to be single-threaded 410 * as we create the rep region. We set the rep->gen field which 411 * is normally protected by db_rep->region->mutex. 412 */ 413static int 414__rep_gen_init(env, rep) 415 ENV *env; 416 REP *rep; 417{ 418 DB_FH *fhp; 419 int ret; 420 size_t cnt; 421 char *p; 422 423 if ((ret = __db_appname(env, 424 DB_APP_NONE, REP_GENNAME, NULL, &p)) != 0) 425 return (ret); 426 /* 427 * If the file doesn't exist, create it now and initialize with 0. 428 */ 429 if (__os_exists(env, p, NULL) != 0) { 430 rep->gen = 0; 431 if ((ret = __rep_write_gen(env, rep, rep->gen)) != 0) 432 goto err; 433 } else { 434 /* 435 * File exists, open it and read in our gen. 436 */ 437 if ((ret = __os_open(env, p, 0, 438 DB_OSO_RDONLY, DB_MODE_600, &fhp)) != 0) 439 goto err; 440 if ((ret = __os_read(env, fhp, &rep->gen, sizeof(u_int32_t), 441 &cnt)) < 0 || cnt == 0) 442 goto err1; 443 RPRINT(env, DB_VERB_REP_MISC, (env, "Read in gen %lu", 444 (u_long)rep->gen)); 445err1: (void)__os_closehandle(env, fhp); 446 } 447err: __os_free(env, p); 448 return (ret); 449} 450 451/* 452 * __rep_write_gen -- 453 * Write out the gen into the env file. 454 * 455 * PUBLIC: int __rep_write_gen __P((ENV *, REP *, u_int32_t)); 456 */ 457int 458__rep_write_gen(env, rep, gen) 459 ENV *env; 460 REP *rep; 461 u_int32_t gen; 462{ 463 DB_FH *fhp; 464 int ret; 465 size_t cnt; 466 char *p; 467 468 /* 469 * If running in-memory replication, return without any file 470 * operations. 471 */ 472 if (FLD_ISSET(rep->config, REP_C_INMEM)) { 473 return (0); 474 } 475 476 if ((ret = __db_appname(env, 477 DB_APP_NONE, REP_GENNAME, NULL, &p)) != 0) 478 return (ret); 479 if ((ret = __os_open( 480 env, p, 0, DB_OSO_CREATE | DB_OSO_TRUNC, DB_MODE_600, &fhp)) == 0) { 481 if ((ret = __os_write(env, fhp, &gen, sizeof(u_int32_t), 482 &cnt)) != 0 || ((ret = __os_fsync(env, fhp)) != 0)) 483 __db_err(env, ret, "%s", p); 484 (void)__os_closehandle(env, fhp); 485 } 486 __os_free(env, p); 487 return (ret); 488} 489