/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2005-2009 Oracle.  All rights reserved.
 *
 * $Id$
 */

#include "db_config.h"

#define	__INCLUDE_NETWORKING	1
#define	__INCLUDE_SELECT_H	1
#include "db_int.h"

/*
 * A very rough guess at the maximum stack space one of our threads could ever
 * need, which we hope is plenty conservative.  This can be patched in the field
 * if necessary.
 */
#ifdef _POSIX_THREAD_ATTR_STACKSIZE
size_t __repmgr_guesstimated_max = (128 * 1024);
#endif
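
/*
 * For example, the value could be changed in a live process from a
 * debugger (a hypothetical gdb session; the new size is arbitrary):
 *
 *	(gdb) set var __repmgr_guesstimated_max = (256 * 1024)
 */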

/*
 * An invalid file-descriptor value, used as an out-of-band sentinel to mark
 * our signalling pipe as unopened.
 */
#define	NO_SUCH_FILE_DESC	(-1)

/* Aggregated control info needed for preparing for the select() call. */
struct io_info {
	fd_set *reads, *writes;
	int maxfd;
};

static int __repmgr_conn_work __P((ENV *, REPMGR_CONNECTION *, void *));
static int finish_connecting __P((ENV *, REPMGR_CONNECTION *));
static int prepare_io __P((ENV *, REPMGR_CONNECTION *, void *));

/*
 * Starts the thread described in the argument, and stores the resulting thread
 * ID therein.
 *
 * PUBLIC: int __repmgr_thread_start __P((ENV *, REPMGR_RUNNABLE *));
 */
int
__repmgr_thread_start(env, runnable)
	ENV *env;
	REPMGR_RUNNABLE *runnable;
{
	pthread_attr_t *attrp;
#ifdef _POSIX_THREAD_ATTR_STACKSIZE
	pthread_attr_t attributes;
	size_t size;
	int ret;
#endif

	runnable->finished = FALSE;

#ifdef _POSIX_THREAD_ATTR_STACKSIZE
	attrp = &attributes;
	if ((ret = pthread_attr_init(&attributes)) != 0) {
		__db_err(env,
		    ret, "pthread_attr_init in repmgr_thread_start");
		return (ret);
	}

	/*
	 * On a 64-bit machine it seems reasonable that we could need twice as
	 * much stack space as we did on a 32-bit machine.
	 */
	size = __repmgr_guesstimated_max;
	if (sizeof(size_t) > 4)
		size *= 2;
#ifdef PTHREAD_STACK_MIN
	if (size < PTHREAD_STACK_MIN)
		size = PTHREAD_STACK_MIN;
#endif
	if ((ret = pthread_attr_setstacksize(&attributes, size)) != 0) {
		__db_err(env,
		    ret, "pthread_attr_setstacksize in repmgr_thread_start");
		return (ret);
	}
#else
	attrp = NULL;
#endif

	return (pthread_create(&runnable->thread_id, attrp,
		    runnable->run, env));
}

/*
 * PUBLIC: int __repmgr_thread_join __P((REPMGR_RUNNABLE *));
 */
int
__repmgr_thread_join(thread)
	REPMGR_RUNNABLE *thread;
{
	return (pthread_join(thread->thread_id, NULL));
}

/*
 * PUBLIC: int __repmgr_set_nonblocking __P((socket_t));
 */
int
__repmgr_set_nonblocking(fd)
	socket_t fd;
{
	int flags;

	if ((flags = fcntl(fd, F_GETFL, 0)) < 0)
		return (errno);
	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
		return (errno);
	return (0);
}
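
/*
 * A hypothetical call site, for illustration only: a freshly created or
 * accepted socket is typically put into non-blocking mode before being
 * handed to the select() loop.
 *
 *	if ((ret = __repmgr_set_nonblocking(s)) != 0) {
 *		__db_err(env, ret, "can't set nonblock for socket");
 *		(void)close(s);
 *		return (ret);
 *	}
 */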

/*
 * PUBLIC: int __repmgr_wake_waiting_senders __P((ENV *));
 *
 * Wake any send()-ing threads waiting for an acknowledgement.
 *
 * !!!
 * Caller must hold the db_rep->mutex, if this thread synchronization is to work
 * properly.
 */
int
__repmgr_wake_waiting_senders(env)
	ENV *env;
{
	return (pthread_cond_broadcast(&env->rep_handle->ack_condition));
}

/*
 * PUBLIC: int __repmgr_await_ack __P((ENV *, const DB_LSN *));
 *
 * Waits (a limited time) for the configured number of remote sites to ack the
 * given LSN.
 *
 * !!!
 * Caller must hold repmgr->mutex.
 */
int
__repmgr_await_ack(env, lsnp)
	ENV *env;
	const DB_LSN *lsnp;
{
	DB_REP *db_rep;
	struct timespec deadline;
	int ret, timed;

	db_rep = env->rep_handle;

	if ((timed = (db_rep->ack_timeout > 0)))
		__repmgr_compute_wait_deadline(env, &deadline,
		    db_rep->ack_timeout);
	else
		COMPQUIET(deadline.tv_sec, 0);

	while (!__repmgr_is_permanent(env, lsnp)) {
		if (timed)
			ret = pthread_cond_timedwait(&db_rep->ack_condition,
			    db_rep->mutex, &deadline);
		else
			ret = pthread_cond_wait(&db_rep->ack_condition,
			    db_rep->mutex);
		if (db_rep->finished)
			return (DB_REP_UNAVAIL);
		if (ret != 0)
			return (ret);
	}
	return (0);
}

/*
 * __repmgr_compute_wait_deadline --
 *	Computes a deadline time a certain distance into the future.
 *
 * PUBLIC: void __repmgr_compute_wait_deadline __P((ENV*,
 * PUBLIC:    struct timespec *, db_timeout_t));
 */
void
__repmgr_compute_wait_deadline(env, result, wait)
	ENV *env;
	struct timespec *result;
	db_timeout_t wait;
{
	/*
	 * The result is suitable for the pthread_cond_timedwait call.  (That
	 * call uses nano-second resolution; elsewhere we use microseconds.)
	 *
	 * Start with "now"; then add the "wait" offset.
	 *
	 * A db_timespec is the same as a "struct timespec" so we can pass
	 * result directly to the underlying Berkeley DB OS routine.
	 *
	 * !!!
	 * We use the system clock for the pthread_cond_timedwait call, but
	 * that's not optimal on systems with monotonic timers.  Instead, we
	 * should call pthread_condattr_setclock on systems where it and
	 * monotonic timers are available, and then configure both this call
	 * and the subsequent pthread_cond_timedwait call to use a monotonic
	 * timer.  (See the sketch following this function.)
	 */
	__os_gettime(env, (db_timespec *)result, 0);
	TIMESPEC_ADD_DB_TIMEOUT(result, wait);
}
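
/*
 * A minimal sketch of the monotonic-timer alternative described above --
 * an assumption, not code used here -- for systems that provide
 * _POSIX_MONOTONIC_CLOCK and pthread_condattr_setclock().  The condition
 * variable and the deadline computation must agree on the clock ("cond",
 * "mutex" and "wait" are placeholders):
 *
 *	pthread_condattr_t cattr;
 *	struct timespec deadline;
 *
 *	(void)pthread_condattr_init(&cattr);
 *	(void)pthread_condattr_setclock(&cattr, CLOCK_MONOTONIC);
 *	(void)pthread_cond_init(&cond, &cattr);
 *	...
 *	(void)clock_gettime(CLOCK_MONOTONIC, &deadline);
 *	TIMESPEC_ADD_DB_TIMEOUT(&deadline, wait);
 *	ret = pthread_cond_timedwait(&cond, &mutex, &deadline);
 */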

/*
 * PUBLIC: int __repmgr_await_drain __P((ENV *,
 * PUBLIC:    REPMGR_CONNECTION *, db_timeout_t));
 *
 * Waits for space to become available on the connection's output queue.
 * Various ways we can exit:
 *
 * 1. queue becomes non-full
 * 2. exceed time limit
 * 3. connection becomes defunct (due to error in another thread)
 * 4. repmgr is shutting down
 * 5. any unexpected system resource failure
 *
 * In cases #3 and #5 we return an error code.  Caller is responsible for
 * distinguishing the remaining cases if desired.
 *
 * !!!
 * Caller must hold repmgr->mutex.
 */
int
__repmgr_await_drain(env, conn, timeout)
	ENV *env;
	REPMGR_CONNECTION *conn;
	db_timeout_t timeout;
{
	DB_REP *db_rep;
	struct timespec deadline;
	int ret;

	db_rep = env->rep_handle;

	__repmgr_compute_wait_deadline(env, &deadline, timeout);

	ret = 0;
	while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
		ret = pthread_cond_timedwait(&conn->drained,
		    db_rep->mutex, &deadline);
		switch (ret) {
		case 0:
			if (db_rep->finished)
				goto out; /* #4. */
			/*
			 * Another thread could have stumbled into an error on
			 * the socket while we were waiting.
			 */
			if (conn->state == CONN_DEFUNCT) {
				ret = DB_REP_UNAVAIL; /* #3. */
				goto out;
			}
			break;
		case ETIMEDOUT:
			conn->state = CONN_CONGESTED;
			ret = 0;
			goto out; /* #2. */
		default:
			goto out; /* #5. */
		}
	}
	/* #1. */

out:
	return (ret);
}

/*
 * PUBLIC: int __repmgr_alloc_cond __P((cond_var_t *));
 *
 * Initialize a condition variable (in allocated space).
 */
int
__repmgr_alloc_cond(c)
	cond_var_t *c;
{
	return (pthread_cond_init(c, NULL));
}

/*
 * PUBLIC: int __repmgr_free_cond __P((cond_var_t *));
 *
 * Clean up a previously initialized condition variable.
 */
int
__repmgr_free_cond(c)
	cond_var_t *c;
{
	return (pthread_cond_destroy(c));
}

/*
 * PUBLIC: void __repmgr_env_create_pf __P((DB_REP *));
 */
void
__repmgr_env_create_pf(db_rep)
	DB_REP *db_rep;
{
	db_rep->read_pipe = db_rep->write_pipe = NO_SUCH_FILE_DESC;
}

/*
 * PUBLIC: int __repmgr_create_mutex_pf __P((mgr_mutex_t *));
 */
int
__repmgr_create_mutex_pf(mutex)
	mgr_mutex_t *mutex;
{
	return (pthread_mutex_init(mutex, NULL));
}

/*
 * PUBLIC: int __repmgr_destroy_mutex_pf __P((mgr_mutex_t *));
 */
int
__repmgr_destroy_mutex_pf(mutex)
	mgr_mutex_t *mutex;
{
	return (pthread_mutex_destroy(mutex));
}

/*
 * PUBLIC: int __repmgr_init __P((ENV *));
 */
int
__repmgr_init(env)
	ENV *env;
{
	DB_REP *db_rep;
	struct sigaction sigact;
	int ack_inited, elect_inited, file_desc[2], queue_inited, ret;

	db_rep = env->rep_handle;

	/*
	 * Make sure we're not taking the default action for SIGPIPE, 'cuz
	 * otherwise we'd be killed just for trying to write onto a socket
	 * that had been reset.  Note that we don't undo this in case of a
	 * later error, since we document that we leave the signal handling
	 * state like this, even after env close.
	 */
	if (sigaction(SIGPIPE, NULL, &sigact) == -1) {
		ret = errno;
		__db_err(env, ret, "can't access signal handler");
		return (ret);
	}
	if (sigact.sa_handler == SIG_DFL) {
		sigact.sa_handler = SIG_IGN;
		sigact.sa_flags = 0;
		if (sigaction(SIGPIPE, &sigact, NULL) == -1) {
			ret = errno;
			__db_err(env, ret, "can't set signal handler");
			return (ret);
		}
	}
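
	/*
	 * (An aside, not done here: some platforms can suppress SIGPIPE
	 * per-socket or per-call rather than process-wide, e.g. the BSD
	 * SO_NOSIGPIPE socket option or the MSG_NOSIGNAL flag to send(2).)
	 */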

	ack_inited = elect_inited = queue_inited = FALSE;
	if ((ret = pthread_cond_init(&db_rep->ack_condition, NULL)) != 0)
		goto err;
	ack_inited = TRUE;

	if ((ret = pthread_cond_init(&db_rep->check_election, NULL)) != 0)
		goto err;
	elect_inited = TRUE;

	if ((ret = pthread_cond_init(&db_rep->queue_nonempty, NULL)) != 0)
		goto err;
	queue_inited = TRUE;

	if ((ret = pipe(file_desc)) == -1) {
		ret = errno;
		goto err;
	}

	db_rep->read_pipe = file_desc[0];
	db_rep->write_pipe = file_desc[1];
	return (0);
err:
	if (queue_inited)
		(void)pthread_cond_destroy(&db_rep->queue_nonempty);
	if (elect_inited)
		(void)pthread_cond_destroy(&db_rep->check_election);
	if (ack_inited)
		(void)pthread_cond_destroy(&db_rep->ack_condition);
	db_rep->read_pipe = db_rep->write_pipe = NO_SUCH_FILE_DESC;

	return (ret);
}

/*
 * PUBLIC: int __repmgr_deinit __P((ENV *));
 */
int
__repmgr_deinit(env)
	ENV *env;
{
	DB_REP *db_rep;
	int ret, t_ret;

	db_rep = env->rep_handle;

	if (!(REPMGR_INITED(db_rep)))
		return (0);

	ret = pthread_cond_destroy(&db_rep->queue_nonempty);

	if ((t_ret = pthread_cond_destroy(&db_rep->check_election)) != 0 &&
	    ret == 0)
		ret = t_ret;

	if ((t_ret = pthread_cond_destroy(&db_rep->ack_condition)) != 0 &&
	    ret == 0)
		ret = t_ret;

	if (close(db_rep->read_pipe) == -1 && ret == 0)
		ret = errno;
	if (close(db_rep->write_pipe) == -1 && ret == 0)
		ret = errno;

	db_rep->read_pipe = db_rep->write_pipe = NO_SUCH_FILE_DESC;
	return (ret);
}

/*
 * PUBLIC: int __repmgr_lock_mutex __P((mgr_mutex_t *));
 */
int
__repmgr_lock_mutex(mutex)
	mgr_mutex_t *mutex;
{
	return (pthread_mutex_lock(mutex));
}

/*
 * PUBLIC: int __repmgr_unlock_mutex __P((mgr_mutex_t *));
 */
int
__repmgr_unlock_mutex(mutex)
	mgr_mutex_t *mutex;
{
	return (pthread_mutex_unlock(mutex));
}

/*
 * Signals a condition variable.
 *
 * !!!
 * Caller must hold mutex.
 *
 * PUBLIC: int __repmgr_signal __P((cond_var_t *));
 */
int
__repmgr_signal(v)
	cond_var_t *v;
{
	return (pthread_cond_broadcast(v));
}
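
/*
 * (A note on the design: broadcasting, rather than pthread_cond_signal,
 * wakes every waiter; each then re-checks its own predicate.  That is
 * safe, if conservative, when threads wait on the same condition
 * variable for different reasons.)
 */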

/*
 * PUBLIC: int __repmgr_wake_main_thread __P((ENV*));
 */
int
__repmgr_wake_main_thread(env)
	ENV *env;
{
	DB_REP *db_rep;
	u_int8_t any_value;

	COMPQUIET(any_value, 0);
	db_rep = env->rep_handle;

	/*
	 * It doesn't matter what byte value we write.  Just the appearance of a
	 * byte in the stream is enough to wake up the select() thread reading
	 * the pipe.
	 */
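	/*
	 * (POSIX guarantees that a write of PIPE_BUF bytes or fewer to a
	 * pipe is atomic, so this single-byte write needs no partial-write
	 * handling.)
	 */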
	if (write(db_rep->write_pipe, &any_value, 1) == -1)
		return (errno);
	return (0);
}

/*
 * PUBLIC: int __repmgr_writev __P((socket_t, db_iovec_t *, int, size_t *));
 */
int
__repmgr_writev(fd, iovec, buf_count, byte_count_p)
	socket_t fd;
	db_iovec_t *iovec;
	int buf_count;
	size_t *byte_count_p;
{
	ssize_t nw;

	if ((nw = writev(fd, iovec, buf_count)) == -1)
		return (errno);
	*byte_count_p = (size_t)nw;
	return (0);
}

/*
 * PUBLIC: int __repmgr_readv __P((socket_t, db_iovec_t *, int, size_t *));
 */
int
__repmgr_readv(fd, iovec, buf_count, byte_count_p)
	socket_t fd;
	db_iovec_t *iovec;
	int buf_count;
	size_t *byte_count_p;
{
	ssize_t nw;

	if ((nw = readv(fd, iovec, buf_count)) == -1)
		return (errno);
	*byte_count_p = (size_t)nw;
	return (0);
}
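
/*
 * (As with the underlying readv()/writev() calls, a success return from
 * these wrappers reports only how many bytes were transferred; callers
 * are responsible for detecting and resuming short transfers.)
 */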

/*
 * PUBLIC: int __repmgr_select_loop __P((ENV *));
 */
int
__repmgr_select_loop(env)
	ENV *env;
{
	struct timeval select_timeout, *select_timeout_p;
	DB_REP *db_rep;
	db_timespec timeout;
	fd_set reads, writes;
	struct io_info io_info;
	int ret;
	u_int8_t buf[10];	/* arbitrary size */

	db_rep = env->rep_handle;
	/*
	 * Almost this entire thread operates while holding the mutex.  But note
	 * that it never blocks, except in the call to select() (which is the
	 * one place we relinquish the mutex).
	 */
	LOCK_MUTEX(db_rep->mutex);
	if ((ret = __repmgr_first_try_connections(env)) != 0)
		goto out;
	for (;;) {
		FD_ZERO(&reads);
		FD_ZERO(&writes);

		/*
		 * Figure out which sockets to ask for input and output.  It's
		 * simple for the signalling pipe and listen socket, but it
		 * depends on backlog states for the connections to other sites.
		 */
		FD_SET((u_int)db_rep->read_pipe, &reads);
		io_info.maxfd = db_rep->read_pipe;

		if (!IS_SUBORDINATE(db_rep)) {
			FD_SET((u_int)db_rep->listen_fd, &reads);
			if (db_rep->listen_fd > io_info.maxfd)
				io_info.maxfd = db_rep->listen_fd;
		}

		io_info.reads = &reads;
		io_info.writes = &writes;
		if ((ret = __repmgr_each_connection(env,
		    prepare_io, &io_info, TRUE)) != 0)
			goto out;

		if (__repmgr_compute_timeout(env, &timeout)) {
			/* Convert the timespec to a timeval. */
			select_timeout.tv_sec = timeout.tv_sec;
			select_timeout.tv_usec = timeout.tv_nsec / NS_PER_US;
			select_timeout_p = &select_timeout;
		} else {
			/* No time-based events, so wait only for I/O. */
			select_timeout_p = NULL;
		}

		UNLOCK_MUTEX(db_rep->mutex);

		if ((ret = select(io_info.maxfd + 1,
		    &reads, &writes, NULL, select_timeout_p)) == -1) {
			switch (ret = errno) {
			case EINTR:
			case EWOULDBLOCK:
				LOCK_MUTEX(db_rep->mutex);
				continue; /* simply retry */
			default:
				__db_err(env, ret, "select");
				return (ret);
			}
		}
		LOCK_MUTEX(db_rep->mutex);

		/*
		 * Timer expiration events include retrying of lost connections.
		 * Obviously elements can be added to the connection list there.
		 */
		if ((ret = __repmgr_check_timeouts(env)) != 0)
			goto out;

		if ((ret = __repmgr_each_connection(env,
		    __repmgr_conn_work, &io_info, TRUE)) != 0)
			goto out;

		/*
		 * Read any bytes in the signalling pipe.  Note that we don't
		 * actually need to do anything with them; they're just there to
		 * wake us up when necessary.
		 */
		if (FD_ISSET((u_int)db_rep->read_pipe, &reads)) {
			if (read(db_rep->read_pipe, buf, sizeof(buf)) <= 0) {
				ret = errno;
				goto out;
			} else if (db_rep->finished) {
				ret = 0;
				goto out;
			}
		}
		/*
		 * Obviously elements can be added to the connection list here.
		 */
		if (!IS_SUBORDINATE(db_rep) &&
		    FD_ISSET((u_int)db_rep->listen_fd, &reads) &&
		    (ret = __repmgr_accept(env)) != 0)
			goto out;
	}
out:
	UNLOCK_MUTEX(db_rep->mutex);
	return (ret);
}

/*
 * Examines a connection to see what sort of I/O to ask for, and cleans up
 * defunct connections.
 */
static int
prepare_io(env, conn, info_)
	ENV *env;
	REPMGR_CONNECTION *conn;
	void *info_;
{
	struct io_info *info;

	info = info_;

	if (conn->state == CONN_DEFUNCT)
		return (__repmgr_cleanup_connection(env, conn));

	if (conn->state == CONN_CONNECTING) {
		FD_SET((u_int)conn->fd, info->reads);
		FD_SET((u_int)conn->fd, info->writes);
		if (conn->fd > info->maxfd)
			info->maxfd = conn->fd;
		return (0);
	}

	if (!STAILQ_EMPTY(&conn->outbound_queue)) {
		FD_SET((u_int)conn->fd, info->writes);
		if (conn->fd > info->maxfd)
			info->maxfd = conn->fd;
	}
	/*
	 * For now we always accept incoming data.  If we ever implement some
	 * kind of flow control, we should override it for fledgling connections
	 * (!IS_VALID_EID(conn->eid)) -- in other words, allow reading such a
	 * connection even during flow control duress.
	 */
	FD_SET((u_int)conn->fd, info->reads);
	if (conn->fd > info->maxfd)
		info->maxfd = conn->fd;

	return (0);
}

/*
 * Examines a connection to see what work needs to be done.
 */
static int
__repmgr_conn_work(env, conn, info_)
	ENV *env;
	REPMGR_CONNECTION *conn;
	void *info_;
{
	struct io_info *info;
	int ret;
	u_int fd;

	ret = 0;
	fd = (u_int)conn->fd;
	info = info_;

	if (conn->state == CONN_DEFUNCT)
		return (0);

	if (conn->state == CONN_CONNECTING) {
		if (FD_ISSET(fd, info->reads) || FD_ISSET(fd, info->writes))
			ret = finish_connecting(env, conn);
	} else {
		/*
		 * Here, the site is connected, and the fd sets are valid.
		 */
		if (FD_ISSET(fd, info->writes))
			ret = __repmgr_write_some(env, conn);

		if (ret == 0 && FD_ISSET(fd, info->reads))
			ret = __repmgr_read_from_site(env, conn);
	}

	if (ret == DB_REP_UNAVAIL)
		ret = __repmgr_bust_connection(env, conn);
	return (ret);
}

static int
finish_connecting(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	socklen_t len;
	SITE_STRING_BUFFER buffer;
	u_int eid;
	int error, ret;

	db_rep = env->rep_handle;

	DB_ASSERT(env, IS_VALID_EID(conn->eid));
	eid = (u_int)conn->eid;
	site = SITE_FROM_EID(eid);

	len = sizeof(error);
	if (getsockopt(
	    conn->fd, SOL_SOCKET, SO_ERROR, (sockopt_t)&error, &len) < 0)
		goto err_rpt;
	if (error) {
		errno = error;
		goto err_rpt;
	}

	conn->state = CONN_CONNECTED;
	__os_gettime(env, &site->last_rcvd_timestamp, 1);
	return (__repmgr_propose_version(env, conn));

err_rpt:
	__db_err(env, errno,
	    "connecting to %s", __repmgr_format_site_loc(site, buffer));

	/* If we've exhausted the list of possible addresses, give up. */
	if (ADDR_LIST_NEXT(&site->net_addr) == NULL) {
		STAT(db_rep->region->mstat.st_connect_fail++);
		return (DB_REP_UNAVAIL);
	}

	/*
	 * Since we're immediately trying the next address in the list, simply
	 * disable the failed connection, without the usual recovery.
	 */
	__repmgr_disable_connection(env, conn);

	ret = __repmgr_connect_site(env, eid);
	DB_ASSERT(env, ret != DB_REP_UNAVAIL);
	return (ret);
}