1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2005-2009 Oracle. All rights reserved. 5 * 6 * $Id$ 7 */ 8 9#include "db_config.h" 10 11#define __INCLUDE_NETWORKING 1 12#include "db_int.h" 13 14static int message_loop __P((ENV *)); 15static int process_message __P((ENV*, DBT*, DBT*, int)); 16static int handle_newsite __P((ENV *, const DBT *)); 17static int ack_message __P((ENV *, u_int32_t, DB_LSN *)); 18static int ack_msg_conn __P((ENV *, REPMGR_CONNECTION *, u_int32_t, DB_LSN *)); 19 20/* 21 * PUBLIC: void *__repmgr_msg_thread __P((void *)); 22 */ 23void * 24__repmgr_msg_thread(args) 25 void *args; 26{ 27 ENV *env = args; 28 int ret; 29 30 if ((ret = message_loop(env)) != 0) { 31 __db_err(env, ret, "message thread failed"); 32 __repmgr_thread_failure(env, ret); 33 } 34 return (NULL); 35} 36 37static int 38message_loop(env) 39 ENV *env; 40{ 41 REPMGR_MESSAGE *msg; 42 int ret; 43 44 while ((ret = __repmgr_queue_get(env, &msg)) == 0) { 45 while ((ret = process_message(env, &msg->control, &msg->rec, 46 msg->originating_eid)) == DB_LOCK_DEADLOCK) 47 RPRINT(env, DB_VERB_REPMGR_MISC, 48 (env, "repmgr deadlock retry")); 49 50 __os_free(env, msg); 51 if (ret != 0) 52 return (ret); 53 } 54 55 return (ret == DB_REP_UNAVAIL ? 0 : ret); 56} 57 58static int 59process_message(env, control, rec, eid) 60 ENV *env; 61 DBT *control, *rec; 62 int eid; 63{ 64 DB_LSN permlsn; 65 DB_REP *db_rep; 66 REP *rep; 67 int ret; 68 u_int32_t generation; 69 70 db_rep = env->rep_handle; 71 rep = db_rep->region; 72 73 /* 74 * Save initial generation number, in case it changes in a close race 75 * with a NEWMASTER. 76 */ 77 generation = rep->gen; 78 79 switch (ret = 80 __rep_process_message_int(env, control, rec, eid, &permlsn)) { 81 case 0: 82 if (db_rep->takeover_pending) { 83 db_rep->takeover_pending = FALSE; 84 return (__repmgr_become_master(env)); 85 } 86 break; 87 88 case DB_REP_NEWSITE: 89 return (handle_newsite(env, rec)); 90 91 case DB_REP_HOLDELECTION: 92 LOCK_MUTEX(db_rep->mutex); 93 ret = __repmgr_init_election(env, ELECT_ELECTION); 94 UNLOCK_MUTEX(db_rep->mutex); 95 if (ret != 0) 96 return (ret); 97 break; 98 99 case DB_REP_DUPMASTER: 100 if ((ret = __repmgr_repstart(env, DB_REP_CLIENT)) != 0) 101 return (ret); 102 LOCK_MUTEX(db_rep->mutex); 103 ret = __repmgr_init_election(env, ELECT_ELECTION); 104 UNLOCK_MUTEX(db_rep->mutex); 105 if (ret != 0) 106 return (ret); 107 break; 108 109 case DB_REP_ISPERM: 110 /* 111 * Don't bother sending ack if master doesn't care about it. 112 */ 113 if (db_rep->perm_policy == DB_REPMGR_ACKS_NONE || 114 (IS_PEER_POLICY(db_rep->perm_policy) && 115 rep->priority == 0)) 116 break; 117 118 if ((ret = ack_message(env, generation, &permlsn)) != 0) 119 return (ret); 120 121 break; 122 123 case DB_REP_NOTPERM: /* FALLTHROUGH */ 124 case DB_REP_IGNORE: /* FALLTHROUGH */ 125 case DB_LOCK_DEADLOCK: 126 break; 127 128 default: 129 __db_err(env, ret, "DB_ENV->rep_process_message"); 130 return (ret); 131 } 132 return (0); 133} 134 135/* 136 * Handle replication-related events. Returns only 0 or DB_EVENT_NOT_HANDLED; 137 * no other error returns are tolerated. 138 * 139 * PUBLIC: int __repmgr_handle_event __P((ENV *, u_int32_t, void *)); 140 */ 141int 142__repmgr_handle_event(env, event, info) 143 ENV *env; 144 u_int32_t event; 145 void *info; 146{ 147 DB_REP *db_rep; 148 149 db_rep = env->rep_handle; 150 151 if (db_rep->selector == NULL) { 152 /* Repmgr is not in use, so all events go to application. */ 153 return (DB_EVENT_NOT_HANDLED); 154 } 155 156 switch (event) { 157 case DB_EVENT_REP_ELECTED: 158 DB_ASSERT(env, info == NULL); 159 160 db_rep->found_master = TRUE; 161 db_rep->takeover_pending = TRUE; 162 163 /* 164 * The application doesn't really need to see this, because the 165 * purpose of this event is to tell the winning site that it 166 * should call rep_start(MASTER), and in repmgr we do that 167 * automatically. Still, they could conceivably be curious, and 168 * it doesn't hurt anything to let them know. 169 */ 170 break; 171 case DB_EVENT_REP_NEWMASTER: 172 DB_ASSERT(env, info != NULL); 173 174 db_rep->found_master = TRUE; 175 db_rep->master_eid = *(int *)info; 176 177 /* Application still needs to see this. */ 178 break; 179 default: 180 break; 181 } 182 return (DB_EVENT_NOT_HANDLED); 183} 184 185static int 186ack_message(env, generation, lsn) 187 ENV *env; 188 u_int32_t generation; 189 DB_LSN *lsn; 190{ 191 DB_REP *db_rep; 192 REPMGR_CONNECTION *conn; 193 REPMGR_SITE *site; 194 int ret; 195 196 db_rep = env->rep_handle; 197 /* 198 * Regardless of where a message came from, all ack's go to the master 199 * site. If we're not in touch with the master, we drop it, since 200 * there's not much else we can do. 201 */ 202 ret = 0; 203 LOCK_MUTEX(db_rep->mutex); 204 if (!IS_VALID_EID(db_rep->master_eid) || 205 db_rep->master_eid == SELF_EID) { 206 RPRINT(env, DB_VERB_REPMGR_MISC, (env, 207 "dropping ack with master %d", db_rep->master_eid)); 208 goto unlock; 209 } 210 211 site = SITE_FROM_EID(db_rep->master_eid); 212 213 /* 214 * Send the ack out on any/all connections that need it, rather than 215 * going to the trouble of trying to keep track of what LSN's each 216 * connection may be waiting for. 217 */ 218 if (site->state == SITE_CONNECTED && 219 (ret = ack_msg_conn(env, site->ref.conn, generation, lsn)) != 0) 220 goto unlock; 221 TAILQ_FOREACH(conn, &site->sub_conns, entries) { 222 if ((ret = ack_msg_conn(env, conn, generation, lsn)) != 0) 223 goto unlock; 224 } 225 226unlock: 227 UNLOCK_MUTEX(db_rep->mutex); 228 return (ret); 229} 230 231/* 232 * Sends an acknowledgment on one connection, if it needs it. 233 * 234 * !!! Called with mutex held. 235 */ 236static int 237ack_msg_conn(env, conn, generation, lsn) 238 ENV *env; 239 REPMGR_CONNECTION *conn; 240 u_int32_t generation; 241 DB_LSN *lsn; 242{ 243 DBT control2, rec2; 244 __repmgr_ack_args ack; 245 u_int8_t buf[__REPMGR_ACK_SIZE]; 246 int ret; 247 248 ret = 0; 249 250 if (conn->state == CONN_READY) { 251 DB_ASSERT(env, conn->version > 0); 252 ack.generation = generation; 253 memcpy(&ack.lsn, lsn, sizeof(DB_LSN)); 254 if (conn->version == 1) { 255 control2.data = &ack; 256 control2.size = sizeof(ack); 257 } else { 258 __repmgr_ack_marshal(env, &ack, buf); 259 control2.data = buf; 260 control2.size = __REPMGR_ACK_SIZE; 261 } 262 rec2.size = 0; 263 /* 264 * It's hard to imagine anyone would care about a lost ack if 265 * the path to the master is so congested as to need blocking; 266 * so pass "blockable" argument as FALSE. 267 */ 268 if ((ret = __repmgr_send_one(env, conn, REPMGR_ACK, 269 &control2, &rec2, FALSE)) == DB_REP_UNAVAIL) 270 ret = __repmgr_bust_connection(env, conn); 271 } 272 return (ret); 273} 274 275/* 276 * Does everything necessary to handle the processing of a NEWSITE return. 277 */ 278static int 279handle_newsite(env, rec) 280 ENV *env; 281 const DBT *rec; 282{ 283 DB_REP *db_rep; 284 REPMGR_SITE *site; 285 SITE_STRING_BUFFER buffer; 286 size_t hlen; 287 u_int16_t port; 288 int ret; 289 char *host; 290 291 db_rep = env->rep_handle; 292 /* 293 * Check if we got sent connect information and if we did, if 294 * this is me or if we already have a connection to this new 295 * site. If we don't, establish a new one. 296 * 297 * Unmarshall the cdata: a 2-byte port number, in network byte order, 298 * followed by the host name string, which should already be 299 * null-terminated, but let's make sure. 300 */ 301 if (rec->size < sizeof(port) + 1) { 302 __db_errx(env, "unexpected cdata size, msg ignored"); 303 return (0); 304 } 305 memcpy(&port, rec->data, sizeof(port)); 306 port = ntohs(port); 307 308 host = (char*)((u_int8_t*)rec->data + sizeof(port)); 309 hlen = (rec->size - sizeof(port)) - 1; 310 host[hlen] = '\0'; 311 312 /* It's me, do nothing. */ 313 if (strcmp(host, db_rep->my_addr.host) == 0 && 314 port == db_rep->my_addr.port) { 315 RPRINT(env, DB_VERB_REPMGR_MISC, 316 (env, "repmgr ignores own NEWSITE info")); 317 return (0); 318 } 319 320 LOCK_MUTEX(db_rep->mutex); 321 if ((ret = __repmgr_add_site(env, host, port, &site, 0)) == EEXIST) { 322 RPRINT(env, DB_VERB_REPMGR_MISC, (env, 323 "NEWSITE info from %s was already known", 324 __repmgr_format_site_loc(site, buffer))); 325 /* 326 * This is a good opportunity to look up the host name, if 327 * needed, because we're on a message thread (not the critical 328 * select() thread). 329 */ 330 if ((ret = __repmgr_check_host_name(env, 331 EID_FROM_SITE(site))) != 0) 332 return (ret); 333 334 if (site->state == SITE_CONNECTED) 335 goto unlock; /* Nothing to do. */ 336 } else { 337 if (ret != 0) 338 goto unlock; 339 RPRINT(env, DB_VERB_REPMGR_MISC, 340 (env, "NEWSITE info added %s", 341 __repmgr_format_site_loc(site, buffer))); 342 } 343 344 /* 345 * Wake up the main thread to connect to the new or reawakened 346 * site. 347 */ 348 ret = __repmgr_wake_main_thread(env); 349 350unlock: UNLOCK_MUTEX(db_rep->mutex); 351 return (ret); 352} 353