1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2001,2008 Oracle.  All rights reserved.
5 *
6 * $Id: rep_msg.c,v 12.16 2008/01/08 20:58:24 bostic Exp $
7 */
8
9#include <sys/types.h>
10#include <errno.h>
11#include <stdio.h>
12#include <stdlib.h>
13#include <string.h>
14
15#include <db.h>
16
17#include "rep_base.h"
18
/*
 * Forward declarations for the functions that are private to this file.
 * __P wraps the parameter lists, matching the K&R-style definitions below.
 */
static int   connect_site __P((DB_ENV *, machtab_t *,
		 const char *, repsite_t *, int *, thread_t *));
static void *elect_thread __P((void *));
static void *hm_loop __P((void *));
23
/*
 * Arguments handed to elect_thread.  The creator allocates one on the heap
 * and passes it to thread_create; elect_thread copies the fields out and
 * frees the structure.
 */
typedef struct {
	DB_ENV *dbenv;		/* Environment the election is held in. */
	machtab_t *machtab;	/* Table queried with machtab_parm(). */
} elect_args;
28
/*
 * Arguments handed to hm_loop.  The creator allocates one on the heap and
 * passes it to thread_create; hm_loop copies the fields out and frees the
 * structure.
 */
typedef struct {
	DB_ENV *dbenv;		/* Environment that processes the messages. */
	const char *progname;	/* Passed on to connect_site(). */
	/*
	 * Filled in by connect_thread only; connect_site leaves it zeroed.
	 * Nothing in hm_loop's message handling depends on it.
	 */
	const char *home;
	socket_t fd;		/* Socket the messages are read from. */
	/* ID of the remote site: given to rep_process_message/machtab_rem. */
	u_int32_t eid;
	machtab_t *tab;		/* Machine table shared with the other threads. */
} hm_loop_args;
37
38/*
39 * This is a generic message handling loop that is used both by the
40 * master to accept messages from a client as well as by clients
41 * to communicate with other clients.
42 */
43static void *
44hm_loop(args)
45	void *args;
46{
47	DB_ENV *dbenv;
48	DB_LSN permlsn;
49	DBT rec, control;
50	APP_DATA *app;
51	const char *c, *home, *progname;
52	elect_args *ea;
53	hm_loop_args *ha;
54	machtab_t *tab;
55	thread_t elect_thr, *site_thrs, *tmp, tid;
56	repsite_t self;
57	u_int32_t timeout;
58	int eid, n, nsites, nsites_allocd;
59	int already_open, r, ret, t_ret;
60	socket_t fd;
61	void *status;
62
63	ea = NULL;
64	site_thrs = NULL;
65	nsites_allocd = 0;
66	nsites = 0;
67
68	ha = (hm_loop_args *)args;
69	dbenv = ha->dbenv;
70	fd = ha->fd;
71	home = ha->home;
72	eid = ha->eid;
73	progname = ha->progname;
74	tab = ha->tab;
75	free(ha);
76	app = dbenv->app_private;
77
78	memset(&rec, 0, sizeof(DBT));
79	memset(&control, 0, sizeof(DBT));
80
81	for (ret = 0; ret == 0;) {
82		if ((ret = get_next_message(fd, &rec, &control)) != 0) {
83			/*
84			 * Close this connection; if it's the master call
85			 * for an election.
86			 */
87			closesocket(fd);
88			if ((ret = machtab_rem(tab, eid, 1)) != 0)
89				break;
90
91			/*
92			 * If I'm the master, I just lost a client and this
93			 * thread is done.
94			 */
95			if (master_eid == SELF_EID)
96				break;
97
98			/*
99			 * If I was talking with the master and the master
100			 * went away, I need to call an election; else I'm
101			 * done.
102			 */
103			if (master_eid != eid)
104				break;
105
106			master_eid = DB_EID_INVALID;
107			machtab_parm(tab, &n, &timeout);
108			(void)dbenv->rep_set_timeout(dbenv,
109			    DB_REP_ELECTION_TIMEOUT, timeout);
110			if ((ret = dbenv->rep_elect(dbenv,
111			    n, (n/2+1), 0)) != 0)
112				continue;
113
114			/*
115			 * Regardless of the results, the site I was talking
116			 * to is gone, so I have nothing to do but exit.
117			 */
118			if (app->elected) {
119				app->elected = 0;
120				ret = dbenv->rep_start(dbenv,
121				    NULL, DB_REP_MASTER);
122			}
123			break;
124		}
125
126		switch (r = dbenv->rep_process_message(dbenv,
127		    &control, &rec, eid, &permlsn)) {
128		case DB_REP_NEWSITE:
129			/*
130			 * Check if we got sent connect information and if we
131			 * did, if this is me or if we already have a
132			 * connection to this new site.  If we don't,
133			 * establish a new one.
134			 */
135
136			/* No connect info. */
137			if (rec.size == 0)
138				break;
139
140			/* It's me, do nothing. */
141			if (strncmp(myaddr, rec.data, rec.size) == 0)
142				break;
143
144			self.host = (char *)rec.data;
145			self.host = strtok(self.host, ":");
146			if ((c = strtok(NULL, ":")) == NULL) {
147				dbenv->errx(dbenv, "Bad host specification");
148				goto out;
149			}
150			self.port = atoi(c);
151
152			/*
153			 * We try to connect to the new site.  If we can't,
154			 * we treat it as an error since we know that the site
155			 * should be up if we got a message from it (even
156			 * indirectly).
157			 */
158			if (nsites == nsites_allocd) {
159				/* Need to allocate more space. */
160				if ((tmp = realloc(
161				    site_thrs, (10 + nsites) *
162				    sizeof(thread_t))) == NULL) {
163					ret = errno;
164					goto out;
165				}
166				site_thrs = tmp;
167				nsites_allocd += 10;
168			}
169			if ((ret = connect_site(dbenv, tab, progname,
170			    &self, &already_open, &tid)) != 0)
171				goto out;
172			if (!already_open)
173				memcpy(&site_thrs
174				    [nsites++], &tid, sizeof(thread_t));
175			break;
176		case DB_REP_HOLDELECTION:
177			if (master_eid == SELF_EID)
178				break;
179			/* Make sure that previous election has finished. */
180			if (ea != NULL) {
181				if (thread_join(elect_thr, &status) != 0) {
182					dbenv->errx(dbenv,
183					    "thread join failure");
184					goto out;
185				}
186				ea = NULL;
187			}
188			if ((ea = calloc(sizeof(elect_args), 1)) == NULL) {
189				dbenv->errx(dbenv, "can't allocate memory");
190				ret = errno;
191				goto out;
192			}
193			ea->dbenv = dbenv;
194			ea->machtab = tab;
195			if ((ret = thread_create(&elect_thr,
196			     NULL, elect_thread, (void *)ea)) != 0) {
197				dbenv->errx(dbenv,
198				    "can't create election thread");
199			}
200			break;
201		case DB_REP_ISPERM:
202			break;
203		case 0:
204			if (app->elected) {
205				app->elected = 0;
206				if ((ret = dbenv->rep_start(dbenv,
207				    NULL, DB_REP_MASTER)) != 0) {
208					dbenv->err(dbenv, ret,
209					    "can't start as master");
210					goto out;
211				}
212			}
213			break;
214		default:
215			dbenv->err(dbenv, r, "DB_ENV->rep_process_message");
216			break;
217		}
218	}
219
220out:	if ((t_ret = machtab_rem(tab, eid, 1)) != 0 && ret == 0)
221		ret = t_ret;
222
223	/* Don't close the environment before any children exit. */
224	if (ea != NULL && thread_join(elect_thr, &status) != 0)
225		dbenv->errx(dbenv, "can't join election thread");
226
227	if (site_thrs != NULL)
228		while (--nsites >= 0)
229			if (thread_join(site_thrs[nsites], &status) != 0)
230				dbenv->errx(dbenv, "can't join site thread");
231
232	return ((void *)(uintptr_t)ret);
233}
234
235/*
236 * This is a generic thread that spawns a thread to listen for connections
237 * on a socket and then spawns off child threads to handle each new
238 * connection.
239 */
240void *
241connect_thread(args)
242	void *args;
243{
244	DB_ENV *dbenv;
245	const char *home, *progname;
246	hm_loop_args *ha;
247	connect_args *cargs;
248	machtab_t *machtab;
249	thread_t hm_thrs[MAX_THREADS];
250	void *status;
251	int i, eid, port, ret;
252	socket_t fd, ns;
253
254	ha = NULL;
255	cargs = (connect_args *)args;
256	dbenv = cargs->dbenv;
257	home = cargs->home;
258	progname = cargs->progname;
259	machtab = cargs->machtab;
260	port = cargs->port;
261
262	/*
263	 * Loop forever, accepting connections from new machines,
264	 * and forking off a thread to handle each.
265	 */
266	if ((fd = listen_socket_init(progname, port)) < 0) {
267		ret = errno;
268		goto err;
269	}
270
271	for (i = 0; i < MAX_THREADS; i++) {
272		if ((ns = listen_socket_accept(machtab,
273		    progname, fd, &eid)) == SOCKET_CREATION_FAILURE) {
274			ret = errno;
275			goto err;
276		}
277		if ((ha = calloc(sizeof(hm_loop_args), 1)) == NULL) {
278			dbenv->errx(dbenv, "can't allocate memory");
279			ret = errno;
280			goto err;
281		}
282		ha->progname = progname;
283		ha->home = home;
284		ha->fd = ns;
285		ha->eid = eid;
286		ha->tab = machtab;
287		ha->dbenv = dbenv;
288		if ((ret = thread_create(&hm_thrs[i++], NULL,
289		    hm_loop, (void *)ha)) != 0) {
290			dbenv->errx(dbenv, "can't create thread for site");
291			goto err;
292		}
293		ha = NULL;
294	}
295
296	/* If we fell out, we ended up with too many threads. */
297	dbenv->errx(dbenv, "Too many threads");
298	ret = ENOMEM;
299
300	/* Do not return until all threads have exited. */
301	while (--i >= 0)
302		if (thread_join(hm_thrs[i], &status) != 0)
303			dbenv->errx(dbenv, "can't join site thread");
304
305err:	return (ret == 0 ? (void *)EXIT_SUCCESS : (void *)EXIT_FAILURE);
306}
307
308/*
309 * Open a connection to everyone that we've been told about.  If we
310 * cannot open some connections, keep trying.
311 */
312void *
313connect_all(args)
314	void *args;
315{
316	DB_ENV *dbenv;
317	all_args *aa;
318	const char *home, *progname;
319	hm_loop_args *ha;
320	int failed, i, nsites, open, ret, *success;
321	machtab_t *machtab;
322	thread_t *hm_thr;
323	repsite_t *sites;
324
325	ha = NULL;
326	aa = (all_args *)args;
327	dbenv = aa->dbenv;
328	progname = aa->progname;
329	home = aa->home;
330	machtab = aa->machtab;
331	nsites = aa->nsites;
332	sites = aa->sites;
333
334	ret = 0;
335	hm_thr = NULL;
336	success = NULL;
337
338	/* Some implementations of calloc are sad about allocating 0 things. */
339	if ((success = calloc(nsites > 0 ? nsites : 1, sizeof(int))) == NULL) {
340		dbenv->err(dbenv, errno, "connect_all");
341		ret = 1;
342		goto err;
343	}
344
345	if (nsites > 0 && (hm_thr = calloc(nsites, sizeof(int))) == NULL) {
346		dbenv->err(dbenv, errno, "connect_all");
347		ret = 1;
348		goto err;
349	}
350
351	for (failed = nsites; failed > 0;) {
352		for (i = 0; i < nsites; i++) {
353			if (success[i])
354				continue;
355
356			ret = connect_site(dbenv, machtab,
357			    progname, &sites[i], &open, &hm_thr[i]);
358
359			/*
360			 * If we couldn't make the connection, this isn't
361			 * fatal to the loop, but we have nothing further
362			 * to do on this machine at the moment.
363			 */
364			if (ret == DB_REP_UNAVAIL)
365				continue;
366
367			if (ret != 0)
368				goto err;
369
370			failed--;
371			success[i] = 1;
372
373			/* If the connection is already open, we're done. */
374			if (ret == 0 && open == 1)
375				continue;
376
377		}
378		sleep(1);
379	}
380
381err:	if (success != NULL)
382		free(success);
383	if (hm_thr != NULL)
384		free(hm_thr);
385	return (ret ? (void *)EXIT_FAILURE : (void *)EXIT_SUCCESS);
386}
387
388static int
389connect_site(dbenv, machtab, progname, site, is_open, hm_thrp)
390	DB_ENV *dbenv;
391	machtab_t *machtab;
392	const char *progname;
393	repsite_t *site;
394	int *is_open;
395	thread_t *hm_thrp;
396{
397	int eid, ret;
398	socket_t s;
399	hm_loop_args *ha;
400
401	if ((s = get_connected_socket(machtab, progname,
402	    site->host, site->port, is_open, &eid)) < 0)
403		return (DB_REP_UNAVAIL);
404
405	if (*is_open)
406		return (0);
407
408	if ((ha = calloc(sizeof(hm_loop_args), 1)) == NULL) {
409		dbenv->errx(dbenv, "can't allocate memory");
410		ret = errno;
411		goto err;
412	}
413
414	ha->progname = progname;
415	ha->fd = s;
416	ha->eid = eid;
417	ha->tab = machtab;
418	ha->dbenv = dbenv;
419
420	if ((ret = thread_create(hm_thrp, NULL,
421	    hm_loop, (void *)ha)) != 0) {
422		dbenv->errx(dbenv, "can't create thread for connected site");
423		goto err1;
424	}
425
426	return (0);
427
428err1:	free(ha);
429err:
430	return (ret);
431}
432
433/*
434 * We need to spawn off a new thread in which to hold an election in
435 * case we are the only thread listening on for messages.
436 */
437static void *
438elect_thread(args)
439	void *args;
440{
441	DB_ENV *dbenv;
442	elect_args *eargs;
443	machtab_t *machtab;
444	u_int32_t timeout;
445	int n, ret;
446	APP_DATA *app;
447
448	eargs = (elect_args *)args;
449	dbenv = eargs->dbenv;
450	machtab = eargs->machtab;
451	free(eargs);
452	app = dbenv->app_private;
453
454	machtab_parm(machtab, &n, &timeout);
455	(void)dbenv->rep_set_timeout(dbenv, DB_REP_ELECTION_TIMEOUT, timeout);
456	while ((ret = dbenv->rep_elect(dbenv, n, (n/2+1), 0)) != 0)
457		sleep(2);
458
459	if (app->elected) {
460		app->elected = 0;
461		if ((ret = dbenv->rep_start(dbenv, NULL, DB_REP_MASTER)) != 0)
462			dbenv->err(dbenv, ret,
463			    "can't start as master in election thread");
464	}
465
466	return (NULL);
467}
468