1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2005,2008 Oracle. All rights reserved. 5 * 6 * $Id: repmgr_util.c,v 1.45 2008/04/30 02:33:34 alexg Exp $ 7 */ 8 9#include "db_config.h" 10 11#define __INCLUDE_NETWORKING 1 12#include "db_int.h" 13 14/* 15 * Schedules a future attempt to re-establish a connection with the given site. 16 * Usually, we wait the configured retry_wait period. But if the "immediate" 17 * parameter is given as TRUE, we'll make the wait time 0, and put the request 18 * at the _beginning_ of the retry queue. Note how this allows us to preserve 19 * the property that the queue stays in time order simply by appending to the 20 * end. 21 * 22 * PUBLIC: int __repmgr_schedule_connection_attempt __P((ENV *, u_int, int)); 23 * 24 * !!! 25 * Caller should hold mutex. 26 * 27 * Unless an error occurs, we always attempt to wake the main thread; 28 * __repmgr_bust_connection relies on this behavior. 29 */ 30int 31__repmgr_schedule_connection_attempt(env, eid, immediate) 32 ENV *env; 33 u_int eid; 34 int immediate; 35{ 36 DB_REP *db_rep; 37 REPMGR_RETRY *retry; 38 REPMGR_SITE *site; 39 db_timespec t; 40 int ret; 41 42 db_rep = env->rep_handle; 43 if ((ret = __os_malloc(env, sizeof(*retry), &retry)) != 0) 44 return (ret); 45 46 __os_gettime(env, &t, 1); 47 if (immediate) 48 TAILQ_INSERT_HEAD(&db_rep->retries, retry, entries); 49 else { 50 TIMESPEC_ADD_DB_TIMEOUT(&t, db_rep->connection_retry_wait); 51 TAILQ_INSERT_TAIL(&db_rep->retries, retry, entries); 52 } 53 retry->eid = eid; 54 retry->time = t; 55 56 site = SITE_FROM_EID(eid); 57 site->state = SITE_IDLE; 58 site->ref.retry = retry; 59 60 return (__repmgr_wake_main_thread(env)); 61} 62 63/* 64 * Initialize the necessary control structures to begin reading a new input 65 * message. 66 * 67 * PUBLIC: void __repmgr_reset_for_reading __P((REPMGR_CONNECTION *)); 68 */ 69void 70__repmgr_reset_for_reading(con) 71 REPMGR_CONNECTION *con; 72{ 73 con->reading_phase = SIZES_PHASE; 74 __repmgr_iovec_init(&con->iovecs); 75 __repmgr_add_buffer(&con->iovecs, &con->msg_type, 76 sizeof(con->msg_type)); 77 __repmgr_add_buffer(&con->iovecs, &con->control_size_buf, 78 sizeof(con->control_size_buf)); 79 __repmgr_add_buffer(&con->iovecs, &con->rec_size_buf, 80 sizeof(con->rec_size_buf)); 81} 82 83/* 84 * Constructs a DB_REPMGR_CONNECTION structure, and puts it on the main list of 85 * connections. It does not initialize eid, since that isn't needed and/or 86 * immediately known in all cases. 87 * 88 * PUBLIC: int __repmgr_new_connection __P((ENV *, REPMGR_CONNECTION **, 89 * PUBLIC: socket_t, int)); 90 */ 91int 92__repmgr_new_connection(env, connp, s, state) 93 ENV *env; 94 REPMGR_CONNECTION **connp; 95 socket_t s; 96 int state; 97{ 98 DB_REP *db_rep; 99 REPMGR_CONNECTION *c; 100 int ret; 101 102 db_rep = env->rep_handle; 103 if ((ret = __os_calloc(env, 1, sizeof(REPMGR_CONNECTION), &c)) != 0) 104 return (ret); 105 if ((ret = __repmgr_alloc_cond(&c->drained)) != 0) { 106 __os_free(env, c); 107 return (ret); 108 } 109 c->blockers = 0; 110 111 c->fd = s; 112 c->state = state; 113 114 STAILQ_INIT(&c->outbound_queue); 115 c->out_queue_length = 0; 116 117 __repmgr_reset_for_reading(c); 118 TAILQ_INSERT_TAIL(&db_rep->connections, c, entries); 119 *connp = c; 120 121 return (0); 122} 123 124/* 125 * PUBLIC: int __repmgr_new_site __P((ENV *, REPMGR_SITE**, 126 * PUBLIC: const repmgr_netaddr_t *, int)); 127 * 128 * !!! 129 * Caller must hold mutex. 130 */ 131int 132__repmgr_new_site(env, sitep, addr, state) 133 ENV *env; 134 REPMGR_SITE **sitep; 135 const repmgr_netaddr_t *addr; 136 int state; 137{ 138 DB_REP *db_rep; 139 REPMGR_SITE *site; 140 SITE_STRING_BUFFER buffer; 141 u_int new_site_max, eid; 142 int ret; 143 144 db_rep = env->rep_handle; 145 if (db_rep->site_cnt >= db_rep->site_max) { 146#define INITIAL_SITES_ALLOCATION 10 /* Arbitrary guess. */ 147 new_site_max = db_rep->site_max == 0 ? 148 INITIAL_SITES_ALLOCATION : db_rep->site_max * 2; 149 if ((ret = __os_realloc(env, 150 sizeof(REPMGR_SITE) * new_site_max, &db_rep->sites)) != 0) 151 return (ret); 152 db_rep->site_max = new_site_max; 153 } 154 eid = db_rep->site_cnt++; 155 156 site = &db_rep->sites[eid]; 157 158 memcpy(&site->net_addr, addr, sizeof(*addr)); 159 ZERO_LSN(site->max_ack); 160 site->flags = 0; 161 timespecclear(&site->last_rcvd_timestamp); 162 site->state = state; 163 164 RPRINT(env, DB_VERB_REPMGR_MISC, 165 (env, "EID %u is assigned for %s", eid, 166 __repmgr_format_site_loc(site, buffer))); 167 *sitep = site; 168 return (0); 169} 170 171/* 172 * Destructor for a repmgr_netaddr_t, cleans up any allocated memory pointed to 173 * by the addr. 174 * 175 * PUBLIC: void __repmgr_cleanup_netaddr __P((ENV *, repmgr_netaddr_t *)); 176 */ 177void 178__repmgr_cleanup_netaddr(env, addr) 179 ENV *env; 180 repmgr_netaddr_t *addr; 181{ 182 if (addr->address_list != NULL) { 183 __os_freeaddrinfo(env, addr->address_list); 184 addr->address_list = addr->current = NULL; 185 } 186 if (addr->host != NULL) { 187 __os_free(env, addr->host); 188 addr->host = NULL; 189 } 190} 191 192/* 193 * PUBLIC: void __repmgr_iovec_init __P((REPMGR_IOVECS *)); 194 */ 195void 196__repmgr_iovec_init(v) 197 REPMGR_IOVECS *v; 198{ 199 v->offset = v->count = 0; 200 v->total_bytes = 0; 201} 202 203/* 204 * PUBLIC: void __repmgr_add_buffer __P((REPMGR_IOVECS *, void *, size_t)); 205 * 206 * !!! 207 * There is no checking for overflow of the vectors[5] array. 208 */ 209void 210__repmgr_add_buffer(v, address, length) 211 REPMGR_IOVECS *v; 212 void *address; 213 size_t length; 214{ 215 v->vectors[v->count].iov_base = address; 216 v->vectors[v->count++].iov_len = length; 217 v->total_bytes += length; 218} 219 220/* 221 * PUBLIC: void __repmgr_add_dbt __P((REPMGR_IOVECS *, const DBT *)); 222 */ 223void 224__repmgr_add_dbt(v, dbt) 225 REPMGR_IOVECS *v; 226 const DBT *dbt; 227{ 228 v->vectors[v->count].iov_base = dbt->data; 229 v->vectors[v->count++].iov_len = dbt->size; 230 v->total_bytes += dbt->size; 231} 232 233/* 234 * Update a set of iovecs to reflect the number of bytes transferred in an I/O 235 * operation, so that the iovecs can be used to continue transferring where we 236 * left off. 237 * Returns TRUE if the set of buffers is now fully consumed, FALSE if more 238 * remains. 239 * 240 * PUBLIC: int __repmgr_update_consumed __P((REPMGR_IOVECS *, size_t)); 241 */ 242int 243__repmgr_update_consumed(v, byte_count) 244 REPMGR_IOVECS *v; 245 size_t byte_count; 246{ 247 db_iovec_t *iov; 248 int i; 249 250 for (i = v->offset; ; i++) { 251 DB_ASSERT(NULL, i < v->count && byte_count > 0); 252 iov = &v->vectors[i]; 253 if (byte_count > iov->iov_len) { 254 /* 255 * We've consumed (more than) this vector's worth. 256 * Adjust count and continue. 257 */ 258 byte_count -= iov->iov_len; 259 } else { 260 /* 261 * Adjust length of remaining portion of vector. 262 * byte_count can never be greater than iov_len, or we 263 * would not be in this section of the if clause. 264 */ 265 iov->iov_len -= (u_int32_t)byte_count; 266 if (iov->iov_len > 0) { 267 /* 268 * Still some left in this vector. Adjust base 269 * address too, and leave offset pointing here. 270 */ 271 iov->iov_base = (void *) 272 ((u_int8_t *)iov->iov_base + byte_count); 273 v->offset = i; 274 } else { 275 /* 276 * Consumed exactly to a vector boundary. 277 * Advance to next vector for next time. 278 */ 279 v->offset = i+1; 280 } 281 /* 282 * If offset has reached count, the entire thing is 283 * consumed. 284 */ 285 return (v->offset >= v->count); 286 } 287 } 288} 289 290/* 291 * Builds a buffer containing our network address information, suitable for 292 * publishing as cdata via a call to rep_start, and sets up the given DBT to 293 * point to it. The buffer is dynamically allocated memory, and the caller must 294 * assume responsibility for it. 295 * 296 * PUBLIC: int __repmgr_prepare_my_addr __P((ENV *, DBT *)); 297 */ 298int 299__repmgr_prepare_my_addr(env, dbt) 300 ENV *env; 301 DBT *dbt; 302{ 303 DB_REP *db_rep; 304 size_t size, hlen; 305 u_int16_t port_buffer; 306 u_int8_t *ptr; 307 int ret; 308 309 db_rep = env->rep_handle; 310 311 /* 312 * The cdata message consists of the 2-byte port number, in network byte 313 * order, followed by the null-terminated host name string. 314 */ 315 port_buffer = htons(db_rep->my_addr.port); 316 size = sizeof(port_buffer) + 317 (hlen = strlen(db_rep->my_addr.host) + 1); 318 if ((ret = __os_malloc(env, size, &ptr)) != 0) 319 return (ret); 320 321 DB_INIT_DBT(*dbt, ptr, size); 322 323 memcpy(ptr, &port_buffer, sizeof(port_buffer)); 324 ptr = &ptr[sizeof(port_buffer)]; 325 memcpy(ptr, db_rep->my_addr.host, hlen); 326 327 return (0); 328} 329 330/* 331 * Provide the appropriate value for nsites, the number of sites in the 332 * replication group. If the application has specified a value, use that. 333 * Otherwise, just use the number of sites we know of. 334 * 335 * !!! 336 * This may only be called after the environment has been opened, because we 337 * assume we have a rep region. That should be OK, because we only need this 338 * for starting an election, or counting acks after sending a PERM message. 339 * 340 * PUBLIC: u_int __repmgr_get_nsites __P((DB_REP *)); 341 */ 342u_int 343__repmgr_get_nsites(db_rep) 344 DB_REP *db_rep; 345{ 346 REP *rep; 347 348 rep = db_rep->region; 349 if (rep->config_nsites > 0) 350 return ((u_int)rep->config_nsites); 351 352 /* 353 * The number of other sites in our table, plus 1 to count ourself. 354 */ 355 return (db_rep->site_cnt + 1); 356} 357 358/* 359 * PUBLIC: void __repmgr_thread_failure __P((ENV *, int)); 360 */ 361void 362__repmgr_thread_failure(env, why) 363 ENV *env; 364 int why; 365{ 366 (void)__repmgr_stop_threads(env); 367 (void)__env_panic(env, why); 368} 369 370/* 371 * Format a printable representation of a site location, suitable for inclusion 372 * in an error message. The buffer must be at least as big as 373 * MAX_SITE_LOC_STRING. 374 * 375 * PUBLIC: char *__repmgr_format_eid_loc __P((DB_REP *, int, char *)); 376 */ 377char * 378__repmgr_format_eid_loc(db_rep, eid, buffer) 379 DB_REP *db_rep; 380 int eid; 381 char *buffer; 382{ 383 if (IS_VALID_EID(eid)) 384 return (__repmgr_format_site_loc(SITE_FROM_EID(eid), buffer)); 385 386 snprintf(buffer, MAX_SITE_LOC_STRING, "(unidentified site)"); 387 return (buffer); 388} 389 390/* 391 * PUBLIC: char *__repmgr_format_site_loc __P((REPMGR_SITE *, char *)); 392 */ 393char * 394__repmgr_format_site_loc(site, buffer) 395 REPMGR_SITE *site; 396 char *buffer; 397{ 398 snprintf(buffer, MAX_SITE_LOC_STRING, "site %s:%lu", 399 site->net_addr.host, (u_long)site->net_addr.port); 400 return (buffer); 401} 402 403/* 404 * PUBLIC: int __repmgr_repstart __P((ENV *, u_int32_t)); 405 */ 406int 407__repmgr_repstart(env, flags) 408 ENV *env; 409 u_int32_t flags; 410{ 411 DBT my_addr; 412 int ret; 413 414 if ((ret = __repmgr_prepare_my_addr(env, &my_addr)) != 0) 415 return (ret); 416 ret = __rep_start(env->dbenv, &my_addr, flags); 417 __os_free(env, my_addr.data); 418 if (ret != 0) 419 __db_err(env, ret, "rep_start"); 420 return (ret); 421} 422