1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2005,2008 Oracle.  All rights reserved.
5 *
6 * $Id: repmgr_util.c,v 1.45 2008/04/30 02:33:34 alexg Exp $
7 */
8
9#include "db_config.h"
10
11#define	__INCLUDE_NETWORKING	1
12#include "db_int.h"
13
14/*
15 * Schedules a future attempt to re-establish a connection with the given site.
16 * Usually, we wait the configured retry_wait period.  But if the "immediate"
17 * parameter is given as TRUE, we'll make the wait time 0, and put the request
18 * at the _beginning_ of the retry queue.  Note how this allows us to preserve
19 * the property that the queue stays in time order simply by appending to the
20 * end.
21 *
22 * PUBLIC: int __repmgr_schedule_connection_attempt __P((ENV *, u_int, int));
23 *
24 * !!!
25 * Caller should hold mutex.
26 *
27 * Unless an error occurs, we always attempt to wake the main thread;
28 * __repmgr_bust_connection relies on this behavior.
29 */
30int
31__repmgr_schedule_connection_attempt(env, eid, immediate)
32	ENV *env;
33	u_int eid;
34	int immediate;
35{
36	DB_REP *db_rep;
37	REPMGR_RETRY *retry;
38	REPMGR_SITE *site;
39	db_timespec t;
40	int ret;
41
42	db_rep = env->rep_handle;
43	if ((ret = __os_malloc(env, sizeof(*retry), &retry)) != 0)
44		return (ret);
45
46	__os_gettime(env, &t, 1);
47	if (immediate)
48		TAILQ_INSERT_HEAD(&db_rep->retries, retry, entries);
49	else {
50		TIMESPEC_ADD_DB_TIMEOUT(&t, db_rep->connection_retry_wait);
51		TAILQ_INSERT_TAIL(&db_rep->retries, retry, entries);
52	}
53	retry->eid = eid;
54	retry->time = t;
55
56	site = SITE_FROM_EID(eid);
57	site->state = SITE_IDLE;
58	site->ref.retry = retry;
59
60	return (__repmgr_wake_main_thread(env));
61}
62
63/*
64 * Initialize the necessary control structures to begin reading a new input
65 * message.
66 *
67 * PUBLIC: void __repmgr_reset_for_reading __P((REPMGR_CONNECTION *));
68 */
69void
70__repmgr_reset_for_reading(con)
71	REPMGR_CONNECTION *con;
72{
73	con->reading_phase = SIZES_PHASE;
74	__repmgr_iovec_init(&con->iovecs);
75	__repmgr_add_buffer(&con->iovecs, &con->msg_type,
76	    sizeof(con->msg_type));
77	__repmgr_add_buffer(&con->iovecs, &con->control_size_buf,
78	    sizeof(con->control_size_buf));
79	__repmgr_add_buffer(&con->iovecs, &con->rec_size_buf,
80	    sizeof(con->rec_size_buf));
81}
82
83/*
84 * Constructs a DB_REPMGR_CONNECTION structure, and puts it on the main list of
85 * connections.  It does not initialize eid, since that isn't needed and/or
86 * immediately known in all cases.
87 *
88 * PUBLIC:  int __repmgr_new_connection __P((ENV *, REPMGR_CONNECTION **,
89 * PUBLIC:				   socket_t, int));
90 */
91int
92__repmgr_new_connection(env, connp, s, state)
93	ENV *env;
94	REPMGR_CONNECTION **connp;
95	socket_t s;
96	int state;
97{
98	DB_REP *db_rep;
99	REPMGR_CONNECTION *c;
100	int ret;
101
102	db_rep = env->rep_handle;
103	if ((ret = __os_calloc(env, 1, sizeof(REPMGR_CONNECTION), &c)) != 0)
104		return (ret);
105	if ((ret = __repmgr_alloc_cond(&c->drained)) != 0) {
106		__os_free(env, c);
107		return (ret);
108	}
109	c->blockers = 0;
110
111	c->fd = s;
112	c->state = state;
113
114	STAILQ_INIT(&c->outbound_queue);
115	c->out_queue_length = 0;
116
117	__repmgr_reset_for_reading(c);
118	TAILQ_INSERT_TAIL(&db_rep->connections, c, entries);
119	*connp = c;
120
121	return (0);
122}
123
124/*
125 * PUBLIC: int __repmgr_new_site __P((ENV *, REPMGR_SITE**,
126 * PUBLIC:     const repmgr_netaddr_t *, int));
127 *
128 * !!!
129 * Caller must hold mutex.
130 */
131int
132__repmgr_new_site(env, sitep, addr, state)
133	ENV *env;
134	REPMGR_SITE **sitep;
135	const repmgr_netaddr_t *addr;
136	int state;
137{
138	DB_REP *db_rep;
139	REPMGR_SITE *site;
140	SITE_STRING_BUFFER buffer;
141	u_int new_site_max, eid;
142	int ret;
143
144	db_rep = env->rep_handle;
145	if (db_rep->site_cnt >= db_rep->site_max) {
146#define	INITIAL_SITES_ALLOCATION	10		/* Arbitrary guess. */
147		new_site_max = db_rep->site_max == 0 ?
148		    INITIAL_SITES_ALLOCATION : db_rep->site_max * 2;
149		if ((ret = __os_realloc(env,
150		     sizeof(REPMGR_SITE) * new_site_max, &db_rep->sites)) != 0)
151			 return (ret);
152		db_rep->site_max = new_site_max;
153	}
154	eid = db_rep->site_cnt++;
155
156	site = &db_rep->sites[eid];
157
158	memcpy(&site->net_addr, addr, sizeof(*addr));
159	ZERO_LSN(site->max_ack);
160	site->flags = 0;
161	timespecclear(&site->last_rcvd_timestamp);
162	site->state = state;
163
164	RPRINT(env, DB_VERB_REPMGR_MISC,
165	    (env, "EID %u is assigned for %s", eid,
166	    __repmgr_format_site_loc(site, buffer)));
167	*sitep = site;
168	return (0);
169}
170
171/*
172 * Destructor for a repmgr_netaddr_t, cleans up any allocated memory pointed to
173 * by the addr.
174 *
175 * PUBLIC: void __repmgr_cleanup_netaddr __P((ENV *, repmgr_netaddr_t *));
176 */
177void
178__repmgr_cleanup_netaddr(env, addr)
179	ENV *env;
180	repmgr_netaddr_t *addr;
181{
182	if (addr->address_list != NULL) {
183		__os_freeaddrinfo(env, addr->address_list);
184		addr->address_list = addr->current = NULL;
185	}
186	if (addr->host != NULL) {
187		__os_free(env, addr->host);
188		addr->host = NULL;
189	}
190}
191
192/*
193 * PUBLIC: void __repmgr_iovec_init __P((REPMGR_IOVECS *));
194 */
195void
196__repmgr_iovec_init(v)
197	REPMGR_IOVECS *v;
198{
199	v->offset = v->count = 0;
200	v->total_bytes = 0;
201}
202
203/*
204 * PUBLIC: void __repmgr_add_buffer __P((REPMGR_IOVECS *, void *, size_t));
205 *
206 * !!!
207 * There is no checking for overflow of the vectors[5] array.
208 */
209void
210__repmgr_add_buffer(v, address, length)
211	REPMGR_IOVECS *v;
212	void *address;
213	size_t length;
214{
215	v->vectors[v->count].iov_base = address;
216	v->vectors[v->count++].iov_len = length;
217	v->total_bytes += length;
218}
219
220/*
221 * PUBLIC: void __repmgr_add_dbt __P((REPMGR_IOVECS *, const DBT *));
222 */
223void
224__repmgr_add_dbt(v, dbt)
225	REPMGR_IOVECS *v;
226	const DBT *dbt;
227{
228	v->vectors[v->count].iov_base = dbt->data;
229	v->vectors[v->count++].iov_len = dbt->size;
230	v->total_bytes += dbt->size;
231}
232
233/*
234 * Update a set of iovecs to reflect the number of bytes transferred in an I/O
235 * operation, so that the iovecs can be used to continue transferring where we
236 * left off.
237 *     Returns TRUE if the set of buffers is now fully consumed, FALSE if more
238 * remains.
239 *
240 * PUBLIC: int __repmgr_update_consumed __P((REPMGR_IOVECS *, size_t));
241 */
242int
243__repmgr_update_consumed(v, byte_count)
244	REPMGR_IOVECS *v;
245	size_t byte_count;
246{
247	db_iovec_t *iov;
248	int i;
249
250	for (i = v->offset; ; i++) {
251		DB_ASSERT(NULL, i < v->count && byte_count > 0);
252		iov = &v->vectors[i];
253		if (byte_count > iov->iov_len) {
254			/*
255			 * We've consumed (more than) this vector's worth.
256			 * Adjust count and continue.
257			 */
258			byte_count -= iov->iov_len;
259		} else {
260			/*
261			 * Adjust length of remaining portion of vector.
262			 * byte_count can never be greater than iov_len, or we
263			 * would not be in this section of the if clause.
264			 */
265			iov->iov_len -= (u_int32_t)byte_count;
266			if (iov->iov_len > 0) {
267				/*
268				 * Still some left in this vector.  Adjust base
269				 * address too, and leave offset pointing here.
270				 */
271				iov->iov_base = (void *)
272				    ((u_int8_t *)iov->iov_base + byte_count);
273				v->offset = i;
274			} else {
275				/*
276				 * Consumed exactly to a vector boundary.
277				 * Advance to next vector for next time.
278				 */
279				v->offset = i+1;
280			}
281			/*
282			 * If offset has reached count, the entire thing is
283			 * consumed.
284			 */
285			return (v->offset >= v->count);
286		}
287	}
288}
289
290/*
291 * Builds a buffer containing our network address information, suitable for
292 * publishing as cdata via a call to rep_start, and sets up the given DBT to
293 * point to it.  The buffer is dynamically allocated memory, and the caller must
294 * assume responsibility for it.
295 *
296 * PUBLIC: int __repmgr_prepare_my_addr __P((ENV *, DBT *));
297 */
298int
299__repmgr_prepare_my_addr(env, dbt)
300	ENV *env;
301	DBT *dbt;
302{
303	DB_REP *db_rep;
304	size_t size, hlen;
305	u_int16_t port_buffer;
306	u_int8_t *ptr;
307	int ret;
308
309	db_rep = env->rep_handle;
310
311	/*
312	 * The cdata message consists of the 2-byte port number, in network byte
313	 * order, followed by the null-terminated host name string.
314	 */
315	port_buffer = htons(db_rep->my_addr.port);
316	size = sizeof(port_buffer) +
317	    (hlen = strlen(db_rep->my_addr.host) + 1);
318	if ((ret = __os_malloc(env, size, &ptr)) != 0)
319		return (ret);
320
321	DB_INIT_DBT(*dbt, ptr, size);
322
323	memcpy(ptr, &port_buffer, sizeof(port_buffer));
324	ptr = &ptr[sizeof(port_buffer)];
325	memcpy(ptr, db_rep->my_addr.host, hlen);
326
327	return (0);
328}
329
330/*
331 * Provide the appropriate value for nsites, the number of sites in the
332 * replication group.  If the application has specified a value, use that.
333 * Otherwise, just use the number of sites we know of.
334 *
335 * !!!
336 * This may only be called after the environment has been opened, because we
337 * assume we have a rep region.  That should be OK, because we only need this
338 * for starting an election, or counting acks after sending a PERM message.
339 *
340 * PUBLIC: u_int __repmgr_get_nsites __P((DB_REP *));
341 */
342u_int
343__repmgr_get_nsites(db_rep)
344	DB_REP *db_rep;
345{
346	REP *rep;
347
348	rep = db_rep->region;
349	if (rep->config_nsites > 0)
350		return ((u_int)rep->config_nsites);
351
352	/*
353	 * The number of other sites in our table, plus 1 to count ourself.
354	 */
355	return (db_rep->site_cnt + 1);
356}
357
358/*
359 * PUBLIC: void __repmgr_thread_failure __P((ENV *, int));
360 */
361void
362__repmgr_thread_failure(env, why)
363	ENV *env;
364	int why;
365{
366	(void)__repmgr_stop_threads(env);
367	(void)__env_panic(env, why);
368}
369
370/*
371 * Format a printable representation of a site location, suitable for inclusion
372 * in an error message.  The buffer must be at least as big as
373 * MAX_SITE_LOC_STRING.
374 *
375 * PUBLIC: char *__repmgr_format_eid_loc __P((DB_REP *, int, char *));
376 */
377char *
378__repmgr_format_eid_loc(db_rep, eid, buffer)
379	DB_REP *db_rep;
380	int eid;
381	char *buffer;
382{
383	if (IS_VALID_EID(eid))
384		return (__repmgr_format_site_loc(SITE_FROM_EID(eid), buffer));
385
386	snprintf(buffer, MAX_SITE_LOC_STRING, "(unidentified site)");
387	return (buffer);
388}
389
390/*
391 * PUBLIC: char *__repmgr_format_site_loc __P((REPMGR_SITE *, char *));
392 */
393char *
394__repmgr_format_site_loc(site, buffer)
395	REPMGR_SITE *site;
396	char *buffer;
397{
398	snprintf(buffer, MAX_SITE_LOC_STRING, "site %s:%lu",
399	    site->net_addr.host, (u_long)site->net_addr.port);
400	return (buffer);
401}
402
403/*
404 * PUBLIC: int __repmgr_repstart __P((ENV *, u_int32_t));
405 */
406int
407__repmgr_repstart(env, flags)
408	ENV *env;
409	u_int32_t flags;
410{
411	DBT my_addr;
412	int ret;
413
414	if ((ret = __repmgr_prepare_my_addr(env, &my_addr)) != 0)
415		return (ret);
416	ret = __rep_start(env->dbenv, &my_addr, flags);
417	__os_free(env, my_addr.data);
418	if (ret != 0)
419		__db_err(env, ret, "rep_start");
420	return (ret);
421}
422