1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2005,2008 Oracle. All rights reserved. 5 * 6 * $Id: repmgr_msg.c,v 1.42 2008/01/08 20:58:48 bostic Exp $ 7 */ 8 9#include "db_config.h" 10 11#define __INCLUDE_NETWORKING 1 12#include "db_int.h" 13 14static int message_loop __P((ENV *)); 15static int process_message __P((ENV*, DBT*, DBT*, int)); 16static int handle_newsite __P((ENV *, const DBT *)); 17static int ack_message __P((ENV *, u_int32_t, DB_LSN *)); 18 19/* 20 * PUBLIC: void *__repmgr_msg_thread __P((void *)); 21 */ 22void * 23__repmgr_msg_thread(args) 24 void *args; 25{ 26 ENV *env = args; 27 int ret; 28 29 if ((ret = message_loop(env)) != 0) { 30 __db_err(env, ret, "message thread failed"); 31 __repmgr_thread_failure(env, ret); 32 } 33 return (NULL); 34} 35 36static int 37message_loop(env) 38 ENV *env; 39{ 40 REPMGR_MESSAGE *msg; 41 int ret; 42 43 while ((ret = __repmgr_queue_get(env, &msg)) == 0) { 44 while ((ret = process_message(env, &msg->control, &msg->rec, 45 msg->originating_eid)) == DB_LOCK_DEADLOCK) 46 RPRINT(env, DB_VERB_REPMGR_MISC, 47 (env, "repmgr deadlock retry")); 48 49 __os_free(env, msg); 50 if (ret != 0) 51 return (ret); 52 } 53 54 return (ret == DB_REP_UNAVAIL ? 0 : ret); 55} 56 57static int 58process_message(env, control, rec, eid) 59 ENV *env; 60 DBT *control, *rec; 61 int eid; 62{ 63 DB_LSN permlsn; 64 DB_REP *db_rep; 65 REP *rep; 66 int ret; 67 u_int32_t generation; 68 69 db_rep = env->rep_handle; 70 71 /* 72 * Save initial generation number, in case it changes in a close race 73 * with a NEWMASTER. See msgdir.10000/10039/msg00086.html. 74 */ 75 generation = db_rep->generation; 76 77 switch (ret = 78 __rep_process_message(env->dbenv, control, rec, eid, &permlsn)) { 79 case 0: 80 if (db_rep->takeover_pending) { 81 db_rep->takeover_pending = FALSE; 82 return (__repmgr_become_master(env)); 83 } 84 break; 85 86 case DB_REP_NEWSITE: 87 return (handle_newsite(env, rec)); 88 89 case DB_REP_HOLDELECTION: 90 LOCK_MUTEX(db_rep->mutex); 91 ret = __repmgr_init_election(env, ELECT_ELECTION); 92 UNLOCK_MUTEX(db_rep->mutex); 93 if (ret != 0) 94 return (ret); 95 break; 96 97 case DB_REP_DUPMASTER: 98 if ((ret = __repmgr_repstart(env, DB_REP_CLIENT)) != 0) 99 return (ret); 100 LOCK_MUTEX(db_rep->mutex); 101 ret = __repmgr_init_election(env, ELECT_ELECTION); 102 UNLOCK_MUTEX(db_rep->mutex); 103 if (ret != 0) 104 return (ret); 105 break; 106 107 case DB_REP_ISPERM: 108 /* 109 * Don't bother sending ack if master doesn't care about it. 110 */ 111 rep = db_rep->region; 112 if (db_rep->perm_policy == DB_REPMGR_ACKS_NONE || 113 (IS_PEER_POLICY(db_rep->perm_policy) && 114 rep->priority == 0)) 115 break; 116 117 if ((ret = ack_message(env, generation, &permlsn)) != 0) 118 return (ret); 119 120 break; 121 122 case DB_REP_NOTPERM: /* FALLTHROUGH */ 123 case DB_REP_IGNORE: /* FALLTHROUGH */ 124 case DB_LOCK_DEADLOCK: 125 break; 126 127 default: 128 __db_err(env, ret, "DB_ENV->rep_process_message"); 129 return (ret); 130 } 131 return (0); 132} 133 134/* 135 * Handle replication-related events. Returns only 0 or DB_EVENT_NOT_HANDLED; 136 * no other error returns are tolerated. 137 * 138 * PUBLIC: int __repmgr_handle_event __P((ENV *, u_int32_t, void *)); 139 */ 140int 141__repmgr_handle_event(env, event, info) 142 ENV *env; 143 u_int32_t event; 144 void *info; 145{ 146 DB_REP *db_rep; 147 148 db_rep = env->rep_handle; 149 150 if (db_rep->selector == NULL) { 151 /* Repmgr is not in use, so all events go to application. */ 152 return (DB_EVENT_NOT_HANDLED); 153 } 154 155 switch (event) { 156 case DB_EVENT_REP_ELECTED: 157 DB_ASSERT(env, info == NULL); 158 159 db_rep->found_master = TRUE; 160 db_rep->takeover_pending = TRUE; 161 162 /* 163 * The application doesn't really need to see this, because the 164 * purpose of this event is to tell the winning site that it 165 * should call rep_start(MASTER), and in repmgr we do that 166 * automatically. Still, they could conceivably be curious, and 167 * it doesn't hurt anything to let them know. 168 */ 169 break; 170 case DB_EVENT_REP_NEWMASTER: 171 DB_ASSERT(env, info != NULL); 172 173 db_rep->found_master = TRUE; 174 db_rep->master_eid = *(int *)info; 175 __repmgr_stash_generation(env); 176 177 /* Application still needs to see this. */ 178 break; 179 default: 180 break; 181 } 182 return (DB_EVENT_NOT_HANDLED); 183} 184 185/* 186 * Acknowledges a message. 187 */ 188static int 189ack_message(env, generation, lsn) 190 ENV *env; 191 u_int32_t generation; 192 DB_LSN *lsn; 193{ 194 DBT control2, rec2; 195 DB_REP *db_rep; 196 __repmgr_ack_args ack; 197 u_int8_t buf[__REPMGR_ACK_SIZE]; 198 REPMGR_CONNECTION *conn; 199 REPMGR_SITE *site; 200 int ret; 201 202 db_rep = env->rep_handle; 203 /* 204 * Regardless of where a message came from, all ack's go to the master 205 * site. If we're not in touch with the master, we drop it, since 206 * there's not much else we can do. 207 */ 208 if (!IS_VALID_EID(db_rep->master_eid) || 209 db_rep->master_eid == SELF_EID) { 210 RPRINT(env, DB_VERB_REPMGR_MISC, (env, 211 "dropping ack with master %d", db_rep->master_eid)); 212 return (0); 213 } 214 215 ret = 0; 216 LOCK_MUTEX(db_rep->mutex); 217 site = SITE_FROM_EID(db_rep->master_eid); 218 if (site->state == SITE_CONNECTED && 219 site->ref.conn->state == CONN_READY) { 220 conn = site->ref.conn; 221 DB_ASSERT(env, conn->version > 0); 222 ack.generation = generation; 223 memcpy(&ack.lsn, lsn, sizeof(DB_LSN)); 224 if (conn->version == 1) { 225 control2.data = &ack; 226 control2.size = sizeof(ack); 227 } else { 228 __repmgr_ack_marshal(env, &ack, buf); 229 control2.data = buf; 230 control2.size = __REPMGR_ACK_SIZE; 231 } 232 rec2.size = 0; 233 /* 234 * It's hard to imagine anyone would care about a lost ack if 235 * the path to the master is so congested as to need blocking; 236 * so pass "blockable" argument as FALSE. 237 */ 238 if ((ret = __repmgr_send_one(env, conn, REPMGR_ACK, 239 &control2, &rec2, FALSE)) == DB_REP_UNAVAIL) 240 ret = __repmgr_bust_connection(env, conn); 241 } 242 243 UNLOCK_MUTEX(db_rep->mutex); 244 return (ret); 245} 246 247/* 248 * Does everything necessary to handle the processing of a NEWSITE return. 249 */ 250static int 251handle_newsite(env, rec) 252 ENV *env; 253 const DBT *rec; 254{ 255 ADDRINFO *ai; 256 DB_REP *db_rep; 257 REPMGR_SITE *site; 258 SITE_STRING_BUFFER buffer; 259 repmgr_netaddr_t *addr; 260 size_t hlen; 261 u_int16_t port; 262 int ret; 263 char *host; 264 265 db_rep = env->rep_handle; 266 /* 267 * Check if we got sent connect information and if we did, if 268 * this is me or if we already have a connection to this new 269 * site. If we don't, establish a new one. 270 * 271 * Unmarshall the cdata: a 2-byte port number, in network byte order, 272 * followed by the host name string, which should already be 273 * null-terminated, but let's make sure. 274 */ 275 if (rec->size < sizeof(port) + 1) { 276 __db_errx(env, "unexpected cdata size, msg ignored"); 277 return (0); 278 } 279 memcpy(&port, rec->data, sizeof(port)); 280 port = ntohs(port); 281 282 host = (char*)((u_int8_t*)rec->data + sizeof(port)); 283 hlen = (rec->size - sizeof(port)) - 1; 284 host[hlen] = '\0'; 285 286 /* It's me, do nothing. */ 287 if (strcmp(host, db_rep->my_addr.host) == 0 && 288 port == db_rep->my_addr.port) { 289 RPRINT(env, DB_VERB_REPMGR_MISC, 290 (env, "repmgr ignores own NEWSITE info")); 291 return (0); 292 } 293 294 LOCK_MUTEX(db_rep->mutex); 295 if ((ret = __repmgr_add_site(env, host, port, &site)) == EEXIST) { 296 RPRINT(env, DB_VERB_REPMGR_MISC, (env, 297 "NEWSITE info from %s was already known", 298 __repmgr_format_site_loc(site, buffer))); 299 /* 300 * In case we already know about this site only because it 301 * first connected to us, we may not yet have had a chance to 302 * look up its addresses. Even though we don't need them just 303 * now, this is an advantageous opportunity to get them since we 304 * can do so away from the critical select thread. Give up only 305 * for a disastrous failure. 306 */ 307 addr = &site->net_addr; 308 if (addr->address_list == NULL) { 309 if ((ret = __repmgr_getaddr(env, 310 addr->host, addr->port, 0, &ai)) == 0) 311 addr->address_list = ai; 312 else if (ret != DB_REP_UNAVAIL) 313 goto unlock; 314 } 315 316 ret = 0; 317 if (site->state == SITE_CONNECTED) 318 goto unlock; /* Nothing to do. */ 319 } else { 320 if (ret != 0) 321 goto unlock; 322 RPRINT(env, DB_VERB_REPMGR_MISC, 323 (env, "NEWSITE info added %s", 324 __repmgr_format_site_loc(site, buffer))); 325 } 326 327 /* 328 * Wake up the main thread to connect to the new or reawakened 329 * site. 330 */ 331 ret = __repmgr_wake_main_thread(env); 332 333unlock: UNLOCK_MUTEX(db_rep->mutex); 334 return (ret); 335} 336 337/* 338 * PUBLIC: void __repmgr_stash_generation __P((ENV *)); 339 */ 340void 341__repmgr_stash_generation(env) 342 ENV *env; 343{ 344 DB_REP *db_rep; 345 REP *rep; 346 347 db_rep = env->rep_handle; 348 rep = db_rep->region; 349 350 db_rep->generation = rep->gen; 351} 352