1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2005,2008 Oracle. All rights reserved. 5 * 6 * $Id: repmgr_method.c,v 1.48 2008/04/25 19:02:47 alanb Exp $ 7 */ 8 9#include "db_config.h" 10 11#define __INCLUDE_NETWORKING 1 12#include "db_int.h" 13 14static int __repmgr_await_threads __P((ENV *)); 15 16/* 17 * PUBLIC: int __repmgr_start __P((DB_ENV *, int, u_int32_t)); 18 */ 19int 20__repmgr_start(dbenv, nthreads, flags) 21 DB_ENV *dbenv; 22 int nthreads; 23 u_int32_t flags; 24{ 25 DBT my_addr; 26 DB_REP *db_rep; 27 ENV *env; 28 REPMGR_RUNNABLE *selector, *messenger; 29 int ret, i; 30 31 env = dbenv->env; 32 db_rep = env->rep_handle; 33 34 if (!F_ISSET(env, ENV_THREAD)) { 35 __db_errx(env, 36 "Replication Manager needs an environment with DB_THREAD"); 37 return (EINVAL); 38 } 39 40 /* Check that the required initialization has been done. */ 41 if (db_rep->my_addr.port == 0) { 42 __db_errx(env, 43 "repmgr_set_local_site must be called before repmgr_start"); 44 return (EINVAL); 45 } 46 47 if (db_rep->selector != NULL || db_rep->finished) { 48 __db_errx(env, 49 "DB_ENV->repmgr_start may not be called more than once"); 50 return (EINVAL); 51 } 52 53 switch (flags) { 54 case DB_REP_CLIENT: 55 case DB_REP_ELECTION: 56 case DB_REP_MASTER: 57 break; 58 default: 59 __db_errx(env, 60 "repmgr_start: unrecognized flags parameter value"); 61 return (EINVAL); 62 } 63 64 if (nthreads <= 0) { 65 __db_errx(env, 66 "repmgr_start: nthreads parameter must be >= 1"); 67 return (EINVAL); 68 } 69 70 if ((ret = __os_calloc(env, (u_int)nthreads, 71 sizeof(REPMGR_RUNNABLE *), &db_rep->messengers)) != 0) 72 return (ret); 73 db_rep->nthreads = nthreads; 74 75 if ((ret = __repmgr_net_init(env, db_rep)) != 0 || 76 (ret = __repmgr_init_sync(env, db_rep)) != 0 || 77 (ret = __rep_set_transport(dbenv, SELF_EID, __repmgr_send)) != 0) 78 return (ret); 79 80 /* 81 * Make some sort of call to rep_start before starting other threads, to 82 * ensure that incoming messages being processed always have a rep 83 * context properly configured. 84 */ 85 if ((db_rep->init_policy = flags) == DB_REP_MASTER) 86 ret = __repmgr_become_master(env); 87 else { 88 if ((ret = __repmgr_prepare_my_addr(env, &my_addr)) != 0) 89 return (ret); 90 ret = __rep_start(dbenv, &my_addr, DB_REP_CLIENT); 91 __os_free(env, my_addr.data); 92 if (ret == 0) { 93 LOCK_MUTEX(db_rep->mutex); 94 ret = __repmgr_init_election(env, ELECT_SEEK_MASTER); 95 UNLOCK_MUTEX(db_rep->mutex); 96 } 97 } 98 if (ret != 0) 99 return (ret); 100 101 if ((ret = __os_calloc(env, 1, sizeof(REPMGR_RUNNABLE), &selector)) 102 != 0) 103 return (ret); 104 selector->env = env; 105 selector->run = __repmgr_select_thread; 106 if ((ret = __repmgr_thread_start(env, selector)) != 0) { 107 __db_err(env, ret, "can't start selector thread"); 108 __os_free(env, selector); 109 return (ret); 110 } 111 db_rep->selector = selector; 112 113 for (i=0; i<nthreads; i++) { 114 if ((ret = __os_calloc(env, 1, sizeof(REPMGR_RUNNABLE), 115 &messenger)) != 0) 116 return (ret); 117 118 messenger->env = env; 119 messenger->run = __repmgr_msg_thread; 120 if ((ret = __repmgr_thread_start(env, messenger)) != 0) { 121 __os_free(env, messenger); 122 return (ret); 123 } 124 db_rep->messengers[i] = messenger; 125 } 126 127 return (ret); 128} 129 130/* 131 * PUBLIC: int __repmgr_close __P((ENV *)); 132 */ 133int 134__repmgr_close(env) 135 ENV *env; 136{ 137 DB_REP *db_rep; 138 int ret, t_ret; 139 140 ret = 0; 141 db_rep = env->rep_handle; 142 if (db_rep->selector != NULL) { 143 RPRINT(env, DB_VERB_REPMGR_MISC, 144 (env, "Stopping repmgr threads")); 145 ret = __repmgr_stop_threads(env); 146 if ((t_ret = __repmgr_await_threads(env)) != 0 && ret == 0) 147 ret = t_ret; 148 RPRINT(env, DB_VERB_REPMGR_MISC, 149 (env, "Repmgr threads are finished")); 150 } 151 152 if ((t_ret = __repmgr_net_close(env)) != 0 && ret == 0) 153 ret = t_ret; 154 155 if ((t_ret = __repmgr_close_sync(env)) != 0 && ret == 0) 156 ret = t_ret; 157 158 return (ret); 159} 160 161/* 162 * PUBLIC: int __repmgr_set_ack_policy __P((DB_ENV *, int)); 163 */ 164int 165__repmgr_set_ack_policy(dbenv, policy) 166 DB_ENV *dbenv; 167 int policy; 168{ 169 ENV *env; 170 171 env = dbenv->env; 172 173 switch (policy) { 174 case DB_REPMGR_ACKS_ALL: /* FALLTHROUGH */ 175 case DB_REPMGR_ACKS_ALL_PEERS: /* FALLTHROUGH */ 176 case DB_REPMGR_ACKS_NONE: /* FALLTHROUGH */ 177 case DB_REPMGR_ACKS_ONE: /* FALLTHROUGH */ 178 case DB_REPMGR_ACKS_ONE_PEER: /* FALLTHROUGH */ 179 case DB_REPMGR_ACKS_QUORUM: 180 env->rep_handle->perm_policy = policy; 181 return (0); 182 default: 183 __db_errx(env, 184 "unknown ack_policy in DB_ENV->repmgr_set_ack_policy"); 185 return (EINVAL); 186 } 187} 188 189/* 190 * PUBLIC: int __repmgr_get_ack_policy __P((DB_ENV *, int *)); 191 */ 192int 193__repmgr_get_ack_policy(dbenv, policy) 194 DB_ENV *dbenv; 195 int *policy; 196{ 197 ENV *env; 198 199 env = dbenv->env; 200 *policy = env->rep_handle->perm_policy; 201 return (0); 202} 203 204/* 205 * PUBLIC: int __repmgr_env_create __P((ENV *, DB_REP *)); 206 */ 207int 208__repmgr_env_create(env, db_rep) 209 ENV *env; 210 DB_REP *db_rep; 211{ 212 int ret; 213 214 /* Set some default values. */ 215 db_rep->ack_timeout = DB_REPMGR_DEFAULT_ACK_TIMEOUT; 216 db_rep->connection_retry_wait = DB_REPMGR_DEFAULT_CONNECTION_RETRY; 217 db_rep->election_retry_wait = DB_REPMGR_DEFAULT_ELECTION_RETRY; 218 db_rep->config_nsites = 0; 219 db_rep->peer = DB_EID_INVALID; 220 db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM; 221 222#ifdef DB_WIN32 223 db_rep->waiters = NULL; 224#else 225 db_rep->read_pipe = db_rep->write_pipe = -1; 226#endif 227 if ((ret = __repmgr_net_create(db_rep)) == 0) 228 ret = __repmgr_queue_create(env, db_rep); 229 230 return (ret); 231} 232 233/* 234 * PUBLIC: void __repmgr_env_destroy __P((ENV *, DB_REP *)); 235 */ 236void 237__repmgr_env_destroy(env, db_rep) 238 ENV *env; 239 DB_REP *db_rep; 240{ 241 __repmgr_queue_destroy(env); 242 __repmgr_net_destroy(env, db_rep); 243 if (db_rep->messengers != NULL) { 244 __os_free(env, db_rep->messengers); 245 db_rep->messengers = NULL; 246 } 247} 248 249/* 250 * PUBLIC: int __repmgr_stop_threads __P((ENV *)); 251 */ 252int 253__repmgr_stop_threads(env) 254 ENV *env; 255{ 256 DB_REP *db_rep; 257 REPMGR_CONNECTION *conn; 258 int ret; 259 260 db_rep = env->rep_handle; 261 262 /* 263 * Hold mutex for the purpose of waking up threads, but then get out of 264 * the way to let them clean up and exit. 265 */ 266 LOCK_MUTEX(db_rep->mutex); 267 db_rep->finished = TRUE; 268 if (db_rep->elect_thread != NULL && 269 (ret = __repmgr_signal(&db_rep->check_election)) != 0) 270 goto unlock; 271 272 if ((ret = __repmgr_signal(&db_rep->queue_nonempty)) != 0) 273 goto unlock; 274 275 TAILQ_FOREACH(conn, &db_rep->connections, entries) { 276 if (conn->blockers > 0 && 277 ((ret = __repmgr_signal(&conn->drained)) != 0)) 278 goto unlock; 279 } 280 UNLOCK_MUTEX(db_rep->mutex); 281 282 return (__repmgr_wake_main_thread(env)); 283 284unlock: 285 UNLOCK_MUTEX(db_rep->mutex); 286 return (ret); 287} 288 289static int 290__repmgr_await_threads(env) 291 ENV *env; 292{ 293 DB_REP *db_rep; 294 REPMGR_RUNNABLE *messenger; 295 int ret, t_ret, i; 296 297 db_rep = env->rep_handle; 298 ret = 0; 299 if (db_rep->elect_thread != NULL) { 300 ret = __repmgr_thread_join(db_rep->elect_thread); 301 __os_free(env, db_rep->elect_thread); 302 db_rep->elect_thread = NULL; 303 } 304 305 for (i=0; i<db_rep->nthreads && db_rep->messengers[i] != NULL; i++) { 306 messenger = db_rep->messengers[i]; 307 if ((t_ret = __repmgr_thread_join(messenger)) != 0 && ret == 0) 308 ret = t_ret; 309 __os_free(env, messenger); 310 db_rep->messengers[i] = NULL; /* necessary? */ 311 } 312 __os_free(env, db_rep->messengers); 313 db_rep->messengers = NULL; 314 315 if (db_rep->selector != NULL) { 316 if ((t_ret = __repmgr_thread_join(db_rep->selector)) != 0 && 317 ret == 0) 318 ret = t_ret; 319 __os_free(env, db_rep->selector); 320 db_rep->selector = NULL; 321 } 322 323 return (ret); 324} 325 326/* 327 * PUBLIC: int __repmgr_set_local_site __P((DB_ENV *, const char *, u_int, 328 * PUBLIC: u_int32_t)); 329 */ 330int 331__repmgr_set_local_site(dbenv, host, port, flags) 332 DB_ENV *dbenv; 333 const char *host; 334 u_int port; 335 u_int32_t flags; 336{ 337 ADDRINFO *address_list; 338 DB_REP *db_rep; 339 ENV *env; 340 repmgr_netaddr_t addr; 341 int locked, ret; 342 343 env = dbenv->env; 344 345 if (flags != 0) 346 return (__db_ferr(env, "DB_ENV->repmgr_set_local_site", 0)); 347 348 db_rep = env->rep_handle; 349 if (db_rep->my_addr.port != 0) { 350 __db_errx(env, "Listen address already set"); 351 return (EINVAL); 352 } 353 354 if (host == NULL) { 355 __db_errx(env, 356 "repmgr_set_local_site: host name is required"); 357 return (EINVAL); 358 } 359 360 if ((ret = __repmgr_getaddr( 361 env, host, port, AI_PASSIVE, &address_list)) != 0) 362 return (ret); 363 364 if ((ret = __repmgr_pack_netaddr(env, 365 host, port, address_list, &addr)) != 0) { 366 __os_freeaddrinfo(env, address_list); 367 return (ret); 368 } 369 370 if (REPMGR_SYNC_INITED(db_rep)) { 371 LOCK_MUTEX(db_rep->mutex); 372 locked = TRUE; 373 } else 374 locked = FALSE; 375 376 memcpy(&db_rep->my_addr, &addr, sizeof(addr)); 377 378 if (locked) 379 UNLOCK_MUTEX(db_rep->mutex); 380 return (0); 381} 382 383/* 384 * If the application only calls this method from a single thread (e.g., during 385 * its initialization), it will avoid the problems with the non-thread-safe host 386 * name lookup. In any case, if we relegate the blocking lookup to here it 387 * won't affect our select() loop. 388 * 389 * PUBLIC: int __repmgr_add_remote_site __P((DB_ENV *, const char *, u_int, 390 * PUBLIC: int *, u_int32_t)); 391 */ 392int 393__repmgr_add_remote_site(dbenv, host, port, eidp, flags) 394 DB_ENV *dbenv; 395 const char *host; 396 u_int port; 397 int *eidp; 398 u_int32_t flags; 399{ 400 DB_REP *db_rep; 401 ENV *env; 402 REPMGR_SITE *site; 403 int eid, locked, ret; 404 405 env = dbenv->env; 406 407 if ((ret = __db_fchk(env, 408 "DB_ENV->repmgr_add_remote_site", flags, DB_REPMGR_PEER)) != 0) 409 return (ret); 410 411 if (host == NULL) { 412 __db_errx(env, 413 "repmgr_add_remote_site: host name is required"); 414 return (EINVAL); 415 } 416 417 db_rep = env->rep_handle; 418 419 if (REPMGR_SYNC_INITED(db_rep)) { 420 LOCK_MUTEX(db_rep->mutex); 421 locked = TRUE; 422 } else 423 locked = FALSE; 424 425 switch (ret = __repmgr_add_site(env, host, port, &site)) { 426 case 0: 427 case EEXIST: 428 ret = 0; 429 break; 430 default: 431 goto unlock; 432 } 433 eid = EID_FROM_SITE(site); 434 435 if (LF_ISSET(DB_REPMGR_PEER)) 436 db_rep->peer = eid; 437 if (eidp != NULL) 438 *eidp = eid; 439 440unlock: if (locked) 441 UNLOCK_MUTEX(db_rep->mutex); 442 return (ret); 443} 444