/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2005,2008 Oracle.  All rights reserved.
 *
 * $Id: repmgr_posix.c,v 1.37 2008/03/13 17:31:28 mbrey Exp $
 */
8
9#include "db_config.h"
10
11#define	__INCLUDE_NETWORKING	1
12#define	__INCLUDE_SELECT_H	1
13#include "db_int.h"
14
/*
 * A very rough guess at the maximum stack space one of our threads could ever
 * need, which we hope is plenty conservative.  This can be patched in the field
 * if necessary.
 *
 * Non-static on purpose, so a debugger (or a binary patch) can adjust it
 * without rebuilding.  Only meaningful where thread stack sizes can be set.
 */
#ifdef _POSIX_THREAD_ATTR_STACKSIZE
size_t __repmgr_guesstimated_max = (128 * 1024);
#endif

/* Forward declarations for the static helpers defined at the end of the file. */
static int __repmgr_conn_work __P((ENV *,
    REPMGR_CONNECTION *, fd_set *, fd_set *, int));
static int finish_connecting __P((ENV *, REPMGR_CONNECTION *));
27
28/*
29 * Starts the thread described in the argument, and stores the resulting thread
30 * ID therein.
31 *
32 * PUBLIC: int __repmgr_thread_start __P((ENV *, REPMGR_RUNNABLE *));
33 */
34int
35__repmgr_thread_start(env, runnable)
36	ENV *env;
37	REPMGR_RUNNABLE *runnable;
38{
39	pthread_attr_t *attrp;
40#ifdef _POSIX_THREAD_ATTR_STACKSIZE
41	pthread_attr_t attributes;
42	size_t size;
43	int ret;
44#endif
45
46	runnable->finished = FALSE;
47
48#ifdef _POSIX_THREAD_ATTR_STACKSIZE
49	attrp = &attributes;
50	if ((ret = pthread_attr_init(&attributes)) != 0) {
51		__db_err(env,
52		    ret, "pthread_attr_init in repmgr_thread_start");
53		return (ret);
54	}
55
56	/*
57	 * On a 64-bit machine it seems reasonable that we could need twice as
58	 * much stack space as we did on a 32-bit machine.
59	 */
60	size = __repmgr_guesstimated_max;
61	if (sizeof(size_t) > 4)
62		size *= 2;
63#ifdef PTHREAD_STACK_MIN
64	if (size < PTHREAD_STACK_MIN)
65		size = PTHREAD_STACK_MIN;
66#endif
67	if ((ret = pthread_attr_setstacksize(&attributes, size)) != 0) {
68		__db_err(env,
69		    ret, "pthread_attr_setstacksize in repmgr_thread_start");
70		return (ret);
71	}
72#else
73	attrp = NULL;
74#endif
75
76	return (pthread_create(&runnable->thread_id, attrp,
77		    runnable->run, env));
78}
79
80/*
81 * PUBLIC: int __repmgr_thread_join __P((REPMGR_RUNNABLE *));
82 */
83int
84__repmgr_thread_join(thread)
85	REPMGR_RUNNABLE *thread;
86{
87	return (pthread_join(thread->thread_id, NULL));
88}
89
90/*
91 * PUBLIC: int __repmgr_set_nonblocking __P((socket_t));
92 */
93int
94__repmgr_set_nonblocking(fd)
95	socket_t fd;
96{
97	int flags;
98
99	if ((flags = fcntl(fd, F_GETFL, 0)) < 0)
100		return (errno);
101	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
102		return (errno);
103	return (0);
104}
105
106/*
107 * PUBLIC: int __repmgr_wake_waiting_senders __P((ENV *));
108 *
109 * Wake any send()-ing threads waiting for an acknowledgement.
110 *
111 * !!!
112 * Caller must hold the db_rep->mutex, if this thread synchronization is to work
113 * properly.
114 */
115int
116__repmgr_wake_waiting_senders(env)
117	ENV *env;
118{
119	return (pthread_cond_broadcast(&env->rep_handle->ack_condition));
120}
121
/*
 * PUBLIC: int __repmgr_await_ack __P((ENV *, const DB_LSN *));
 *
 * Waits (a limited time) for configured number of remote sites to ack the given
 * LSN.
 *
 * Returns 0 once the LSN is deemed permanent, DB_REP_UNAVAIL if repmgr is
 * shutting down, or a pthreads error code (e.g., ETIMEDOUT) otherwise.
 *
 * !!!
 * Caller must hold repmgr->mutex.
 */
int
__repmgr_await_ack(env, lsnp)
	ENV *env;
	const DB_LSN *lsnp;
{
	DB_REP *db_rep;
	struct timespec deadline;
	int ret, timed;

	db_rep = env->rep_handle;

	/* An ack_timeout of 0 means wait with no deadline at all. */
	if ((timed = (db_rep->ack_timeout > 0)))
		__repmgr_compute_wait_deadline(env, &deadline,
		    db_rep->ack_timeout);
	else
		COMPQUIET(deadline.tv_sec, 0);

	while (!__repmgr_is_permanent(env, lsnp)) {
		if (timed)
			ret = pthread_cond_timedwait(&db_rep->ack_condition,
			    &db_rep->mutex, &deadline);
		else
			ret = pthread_cond_wait(&db_rep->ack_condition,
			    &db_rep->mutex);
		/*
		 * Check the shutdown flag before the wait result: even a
		 * timed-out or failed wait must report UNAVAIL when repmgr
		 * is shutting down.
		 */
		if (db_rep->finished)
			return (DB_REP_UNAVAIL);
		if (ret != 0)
			return (ret);
	}
	return (0);
}
162
/*
 * __repmgr_compute_wait_deadline --
 *	Computes a deadline time a certain distance into the future.
 *
 * PUBLIC: void __repmgr_compute_wait_deadline __P((ENV*,
 * PUBLIC:    struct timespec *, db_timeout_t));
 */
void
__repmgr_compute_wait_deadline(env, result, wait)
	ENV *env;
	struct timespec *result;
	db_timeout_t wait;
{
	/*
	 * The result is suitable for the pthread_cond_timedwait call.  (That
	 * call uses nano-second resolution; elsewhere we use microseconds.)
	 *
	 * Start with "now"; then add the "wait" offset.
	 *
	 * A db_timespec is the same as a "struct timespec" so we can pass
	 * result directly to the underlying Berkeley DB OS routine.
	 *
	 * !!!
	 * We use the system clock for the pthread_cond_timedwait call, but
	 * that's not optimal on systems with monotonic timers.   Instead,
	 * we should call pthread_condattr_setclock on systems where it and
	 * monotonic timers are available, and then configure both this call
	 * and the subsequent pthread_cond_timedwait call to use a monotonic
	 * timer.
	 */
	__os_gettime(env, (db_timespec *)result, 0);
	TIMESPEC_ADD_DB_TIMEOUT(result, wait);
}
196
197/*
198 * PUBLIC: int __repmgr_await_drain __P((ENV *,
199 * PUBLIC:    REPMGR_CONNECTION *, db_timeout_t));
200 *
201 * Waits for space to become available on the connection's output queue.
202 * Various ways we can exit:
203 *
204 * 1. queue becomes non-full
205 * 2. exceed time limit
206 * 3. connection becomes defunct (due to error in another thread)
207 * 4. repmgr is shutting down
208 * 5. any unexpected system resource failure
209 *
210 * In cases #3 and #5 we return an error code.  Caller is responsible for
211 * distinguishing the remaining cases if desired.
212 *
213 * !!!
214 * Caller must hold repmgr->mutex.
215 */
216int
217__repmgr_await_drain(env, conn, timeout)
218	ENV *env;
219	REPMGR_CONNECTION *conn;
220	db_timeout_t timeout;
221{
222	DB_REP *db_rep;
223	struct timespec deadline;
224	int ret;
225
226	db_rep = env->rep_handle;
227
228	__repmgr_compute_wait_deadline(env, &deadline, timeout);
229
230	ret = 0;
231	while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
232		ret = pthread_cond_timedwait(&conn->drained,
233		    &db_rep->mutex, &deadline);
234		switch (ret) {
235		case 0:
236			if (db_rep->finished)
237				goto out; /* #4. */
238			/*
239			 * Another thread could have stumbled into an error on
240			 * the socket while we were waiting.
241			 */
242			if (conn->state == CONN_DEFUNCT) {
243				ret = DB_REP_UNAVAIL; /* #3. */
244				goto out;
245			}
246			break;
247		case ETIMEDOUT:
248			conn->state = CONN_CONGESTED;
249			ret = 0;
250			goto out; /* #2. */
251		default:
252			goto out; /* #5. */
253		}
254	}
255	/* #1. */
256
257out:
258	return (ret);
259}
260
261/*
262 * PUBLIC: int __repmgr_alloc_cond __P((cond_var_t *));
263 *
264 * Initialize a condition variable (in allocated space).
265 */
266int
267__repmgr_alloc_cond(c)
268	cond_var_t *c;
269{
270	return (pthread_cond_init(c, NULL));
271}
272
273/*
274 * PUBLIC: int __repmgr_free_cond __P((cond_var_t *));
275 *
276 * Clean up a previously initialized condition variable.
277 */
278int
279__repmgr_free_cond(c)
280	cond_var_t *c;
281{
282	return (pthread_cond_destroy(c));
283}
284
285/*
286 * PUBLIC: int __repmgr_init_sync __P((ENV *, DB_REP *));
287 *
288 * Allocate/initialize all data necessary for thread synchronization.  This
289 * should be an all-or-nothing affair.  Other than here and in _close_sync there
290 * should never be a time when these resources aren't either all allocated or
291 * all freed.  If that's true, then we can safely use the values of the file
292 * descriptor(s) to keep track of which it is.
293 */
294int
295__repmgr_init_sync(env, db_rep)
296	ENV *env;
297	DB_REP *db_rep;
298{
299	int ret, mutex_inited, ack_inited, elect_inited, queue_inited,
300	    file_desc[2];
301
302	COMPQUIET(env, NULL);
303
304	mutex_inited = ack_inited = elect_inited = queue_inited = FALSE;
305
306	if ((ret = pthread_mutex_init(&db_rep->mutex, NULL)) != 0)
307		goto err;
308	mutex_inited = TRUE;
309
310	if ((ret = pthread_cond_init(&db_rep->ack_condition, NULL)) != 0)
311		goto err;
312	ack_inited = TRUE;
313
314	if ((ret = pthread_cond_init(&db_rep->check_election, NULL)) != 0)
315		goto err;
316	elect_inited = TRUE;
317
318	if ((ret = pthread_cond_init(&db_rep->queue_nonempty, NULL)) != 0)
319		goto err;
320	queue_inited = TRUE;
321
322	if ((ret = pipe(file_desc)) == -1) {
323		ret = errno;
324		goto err;
325	}
326
327	db_rep->read_pipe = file_desc[0];
328	db_rep->write_pipe = file_desc[1];
329	return (0);
330err:
331	if (queue_inited)
332		(void)pthread_cond_destroy(&db_rep->queue_nonempty);
333	if (elect_inited)
334		(void)pthread_cond_destroy(&db_rep->check_election);
335	if (ack_inited)
336		(void)pthread_cond_destroy(&db_rep->ack_condition);
337	if (mutex_inited)
338		(void)pthread_mutex_destroy(&db_rep->mutex);
339	db_rep->read_pipe = db_rep->write_pipe = -1;
340
341	return (ret);
342}
343
344/*
345 * PUBLIC: int __repmgr_close_sync __P((ENV *));
346 *
347 * Frees the thread synchronization data within a repmgr struct, in a
348 * platform-specific way.
349 */
350int
351__repmgr_close_sync(env)
352	ENV *env;
353{
354	DB_REP *db_rep;
355	int ret, t_ret;
356
357	db_rep = env->rep_handle;
358
359	if (!(REPMGR_SYNC_INITED(db_rep)))
360		return (0);
361
362	ret = pthread_cond_destroy(&db_rep->queue_nonempty);
363
364	if ((t_ret = pthread_cond_destroy(&db_rep->check_election)) != 0 &&
365	    ret == 0)
366		ret = t_ret;
367
368	if ((t_ret = pthread_cond_destroy(&db_rep->ack_condition)) != 0 &&
369	    ret == 0)
370		ret = t_ret;
371
372	if ((t_ret = pthread_mutex_destroy(&db_rep->mutex)) != 0 &&
373	    ret == 0)
374		ret = t_ret;
375
376	if (close(db_rep->read_pipe) == -1 && ret == 0)
377		ret = errno;
378	if (close(db_rep->write_pipe) == -1 && ret == 0)
379		ret = errno;
380
381	db_rep->read_pipe = db_rep->write_pipe = -1;
382	return (ret);
383}
384
385/*
386 * Performs net-related resource initialization other than memory initialization
387 * and allocation.  A valid db_rep->listen_fd acts as the "all-or-nothing"
388 * sentinel signifying that these resources are allocated.
389 *
390 * PUBLIC: int __repmgr_net_init __P((ENV *, DB_REP *));
391 */
392int
393__repmgr_net_init(env, db_rep)
394	ENV *env;
395	DB_REP *db_rep;
396{
397	int ret;
398	struct sigaction sigact;
399
400	if ((ret = __repmgr_listen(env)) != 0)
401		return (ret);
402
403	/*
404	 * Make sure we're not ignoring SIGPIPE, 'cuz otherwise we'd be killed
405	 * just for trying to write onto a socket that had been reset.
406	 */
407	if (sigaction(SIGPIPE, NULL, &sigact) == -1) {
408		ret = errno;
409		__db_err(env, ret, "can't access signal handler");
410		goto err;
411	}
412	/*
413	 * If we need to change the sig handler, do so, and also set a flag so
414	 * that we remember we did.
415	 */
416	if ((db_rep->chg_sig_handler = (sigact.sa_handler == SIG_DFL))) {
417		sigact.sa_handler = SIG_IGN;
418		sigact.sa_flags = 0;
419		if (sigaction(SIGPIPE, &sigact, NULL) == -1) {
420			ret = errno;
421			__db_err(env, ret, "can't access signal handler");
422			goto err;
423		}
424	}
425	return (0);
426
427err:
428	(void)closesocket(db_rep->listen_fd);
429	db_rep->listen_fd = INVALID_SOCKET;
430	return (ret);
431}
432
433/*
434 * PUBLIC: int __repmgr_lock_mutex __P((mgr_mutex_t *));
435 */
436int
437__repmgr_lock_mutex(mutex)
438	mgr_mutex_t  *mutex;
439{
440	return (pthread_mutex_lock(mutex));
441}
442
443/*
444 * PUBLIC: int __repmgr_unlock_mutex __P((mgr_mutex_t *));
445 */
446int
447__repmgr_unlock_mutex(mutex)
448	mgr_mutex_t  *mutex;
449{
450	return (pthread_mutex_unlock(mutex));
451}
452
453/*
454 * Signals a condition variable.
455 *
456 * !!!
457 * Caller must hold mutex.
458 *
459 * PUBLIC: int __repmgr_signal __P((cond_var_t *));
460 */
461int
462__repmgr_signal(v)
463	cond_var_t *v;
464{
465	return (pthread_cond_broadcast(v));
466}
467
468/*
469 * PUBLIC: int __repmgr_wake_main_thread __P((ENV*));
470 */
471int
472__repmgr_wake_main_thread(env)
473	ENV *env;
474{
475	DB_REP *db_rep;
476	u_int8_t any_value;
477
478	COMPQUIET(any_value, 0);
479	db_rep = env->rep_handle;
480
481	/*
482	 * It doesn't matter what byte value we write.  Just the appearance of a
483	 * byte in the stream is enough to wake up the select() thread reading
484	 * the pipe.
485	 */
486	if (write(db_rep->write_pipe, &any_value, 1) == -1)
487		return (errno);
488	return (0);
489}
490
491/*
492 * PUBLIC: int __repmgr_writev __P((socket_t, db_iovec_t *, int, size_t *));
493 */
494int
495__repmgr_writev(fd, iovec, buf_count, byte_count_p)
496	socket_t fd;
497	db_iovec_t *iovec;
498	int buf_count;
499	size_t *byte_count_p;
500{
501	int nw;
502
503	if ((nw = writev(fd, iovec, buf_count)) == -1)
504		return (errno);
505	*byte_count_p = (size_t)nw;
506	return (0);
507}
508
509/*
510 * PUBLIC: int __repmgr_readv __P((socket_t, db_iovec_t *, int, size_t *));
511 */
512int
513__repmgr_readv(fd, iovec, buf_count, byte_count_p)
514	socket_t fd;
515	db_iovec_t *iovec;
516	int buf_count;
517	size_t *byte_count_p;
518{
519	ssize_t nw;
520
521	if ((nw = readv(fd, iovec, buf_count)) == -1)
522		return (errno);
523	*byte_count_p = (size_t)nw;
524	return (0);
525}
526
/*
 * PUBLIC: int __repmgr_select_loop __P((ENV *));
 *
 * The replication manager's main event loop: builds fd_sets for the listening
 * socket, the internal signalling pipe, and each connection, then dispatches
 * the I/O that select() reports ready.  Runs until shutdown or error.
 */
int
__repmgr_select_loop(env)
	ENV *env;
{
	struct timeval select_timeout, *select_timeout_p;
	DB_REP *db_rep;
	REPMGR_CONNECTION *conn, *next;
	db_timespec timeout;
	fd_set reads, writes;
	int ret, flow_control, maxfd;
	u_int8_t buf[10];	/* arbitrary size */

	flow_control = FALSE;

	db_rep = env->rep_handle;
	/*
	 * Almost this entire thread operates while holding the mutex.  But note
	 * that it never blocks, except in the call to select() (which is the
	 * one place we relinquish the mutex).
	 */
	LOCK_MUTEX(db_rep->mutex);
	if ((ret = __repmgr_first_try_connections(env)) != 0)
		goto out;
	for (;;) {
		FD_ZERO(&reads);
		FD_ZERO(&writes);

		/*
		 * Always ask for input on listening socket and signalling
		 * pipe.
		 */
		FD_SET((u_int)db_rep->listen_fd, &reads);
		maxfd = db_rep->listen_fd;

		FD_SET((u_int)db_rep->read_pipe, &reads);
		if (db_rep->read_pipe > maxfd)
			maxfd = db_rep->read_pipe;

		/*
		 * Examine all connections to see what sort of I/O to ask for on
		 * each one.  Clean up defunct connections; note that this is
		 * the only place where elements get deleted from this list.
		 *
		 * The TAILQ_FOREACH macro would be suitable here, except that
		 * it doesn't allow unlinking the current element., which is
		 * needed for cleanup_connection.
		 */
		for (conn = TAILQ_FIRST(&db_rep->connections);
		     conn != NULL;
		     conn = next) {
			next = TAILQ_NEXT(conn, entries);

			if (conn->state == CONN_DEFUNCT) {
				if ((ret = __repmgr_cleanup_connection(env,
				    conn)) != 0)
					goto out;
				continue;
			}

			if (conn->state == CONN_CONNECTING) {
				/*
				 * A non-blocking connect() in progress shows
				 * completion as readiness in either direction.
				 */
				FD_SET((u_int)conn->fd, &reads);
				FD_SET((u_int)conn->fd, &writes);
				if (conn->fd > maxfd)
					maxfd = conn->fd;
				continue;
			}

			if (!STAILQ_EMPTY(&conn->outbound_queue)) {
				FD_SET((u_int)conn->fd, &writes);
				if (conn->fd > maxfd)
					maxfd = conn->fd;
			}
			/*
			 * If we haven't yet gotten site's handshake, then read
			 * from it even if we're flow-controlling.
			 */
			if (!flow_control || !IS_VALID_EID(conn->eid)) {
				FD_SET((u_int)conn->fd, &reads);
				if (conn->fd > maxfd)
					maxfd = conn->fd;
			}
		}

		if (__repmgr_compute_timeout(env, &timeout)) {
			/* Convert the timespec to a timeval. */
			select_timeout.tv_sec = timeout.tv_sec;
			select_timeout.tv_usec = timeout.tv_nsec / NS_PER_US;
			select_timeout_p = &select_timeout;
		} else {
			/* No time-based events, so wait only for I/O. */
			select_timeout_p = NULL;
		}

		/* The one blocking call; drop the mutex around it. */
		UNLOCK_MUTEX(db_rep->mutex);

		if ((ret = select(maxfd + 1,
		    &reads, &writes, NULL, select_timeout_p)) == -1) {
			switch (ret = errno) {
			case EINTR:
			case EWOULDBLOCK:
				LOCK_MUTEX(db_rep->mutex);
				continue; /* simply retry */
			default:
				/* Mutex is not held here, so return directly. */
				__db_err(env, ret, "select");
				return (ret);
			}
		}
		LOCK_MUTEX(db_rep->mutex);

		/*
		 * Timer expiration events include retrying of lost connections.
		 * Obviously elements can be added to the connection list there.
		 */
		if ((ret = __repmgr_check_timeouts(env)) != 0)
			goto out;

		/*
		 * Examine each connection, to see what work needs to be done.
		 * Except for one obscure case in finish_connecting, no
		 * structural change to the connections list happens here.
		 */
		TAILQ_FOREACH(conn, &db_rep->connections, entries) {
			if (conn->state == CONN_DEFUNCT)
				continue;

			if ((ret = __repmgr_conn_work(env,
			    conn, &reads, &writes, flow_control)) != 0)
				goto out;
		}

		/*
		 * Read any bytes in the signalling pipe.  Note that we don't
		 * actually need to do anything with them; they're just there to
		 * wake us up when necessary.
		 */
		if (FD_ISSET((u_int)db_rep->read_pipe, &reads)) {
			if (read(db_rep->read_pipe, buf, sizeof(buf)) <= 0) {
				ret = errno;
				goto out;
			} else if (db_rep->finished) {
				/* Normal shutdown path. */
				ret = 0;
				goto out;
			}
		}
		/*
		 * Obviously elements can be added to the connection list here.
		 */
		if (FD_ISSET((u_int)db_rep->listen_fd, &reads) &&
		    (ret = __repmgr_accept(env)) != 0)
			goto out;
	}
out:
	UNLOCK_MUTEX(db_rep->mutex);
	return (ret);
}
685
686static int
687__repmgr_conn_work(env, conn, reads, writes, flow_control)
688	ENV *env;
689	REPMGR_CONNECTION *conn;
690	fd_set *reads, *writes;
691	int flow_control;
692{
693	int ret;
694	u_int fd;
695
696	ret = 0;
697	fd = (u_int)conn->fd;
698
699	if (conn->state == CONN_CONNECTING) {
700		if (FD_ISSET(fd, reads) || FD_ISSET(fd, writes))
701			ret = finish_connecting(env, conn);
702	} else {
703		/*
704		 * Here, the site is connected, and the FD_SET's are valid.
705		 */
706		if (FD_ISSET(fd, writes))
707			ret = __repmgr_write_some(env, conn);
708
709		if (ret == 0 && !flow_control && FD_ISSET(fd, reads))
710			ret = __repmgr_read_from_site(env, conn);
711	}
712
713	if (ret == DB_REP_UNAVAIL)
714		ret = __repmgr_bust_connection(env, conn);
715	return (ret);
716}
717
/*
 * Completes a non-blocking connect() once select() reports the socket ready:
 * checks the deferred connection result via SO_ERROR, and on success starts
 * the protocol version negotiation.  On failure, tries the site's next
 * address, or gives up with DB_REP_UNAVAIL when addresses are exhausted.
 */
static int
finish_connecting(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	socklen_t len;
	SITE_STRING_BUFFER buffer;
	u_int eid;
	int error, ret;

	/*
	 * The result of the asynchronous connect() is retrieved with
	 * getsockopt(SO_ERROR); 0 means the connection was established.
	 */
	len = sizeof(error);
	if (getsockopt(
	    conn->fd, SOL_SOCKET, SO_ERROR, (sockopt_t)&error, &len) < 0)
		goto err_rpt;
	if (error) {
		/* Propagate the deferred error so __db_err reports it. */
		errno = error;
		goto err_rpt;
	}

	conn->state = CONN_CONNECTED;
	return (__repmgr_propose_version(env, conn));

err_rpt:
	db_rep = env->rep_handle;

	/* Outgoing connections always have a known site; assert it. */
	DB_ASSERT(env, IS_VALID_EID(conn->eid));
	eid = (u_int)conn->eid;

	site = SITE_FROM_EID(eid);
	__db_err(env, errno,
	    "connecting to %s", __repmgr_format_site_loc(site, buffer));

	/* If we've exhausted the list of possible addresses, give up. */
	if (ADDR_LIST_NEXT(&site->net_addr) == NULL) {
		STAT(db_rep->region->mstat.st_connect_fail++);
		return (DB_REP_UNAVAIL);
	}

	/*
	 * Since we're immediately trying the next address in the list, simply
	 * disable the failed connection, without the usual recovery.
	 */
	DISABLE_CONNECTION(conn);

	/*
	 * NOTE(review): this is the "obscure case" mentioned in
	 * __repmgr_select_loop where the connection list can change.
	 */
	ret = __repmgr_connect_site(env, eid);
	DB_ASSERT(env, ret != DB_REP_UNAVAIL);
	return (ret);
}
768