1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2006,2008 Oracle.  All rights reserved.
5 *
6 * $Id: repmgr_queue.c,v 1.12 2008/01/08 20:58:48 bostic Exp $
7 */
8
9#include "db_config.h"
10
11#define	__INCLUDE_NETWORKING	1
12#include "db_int.h"
13
14typedef STAILQ_HEAD(__repmgr_q_header, __repmgr_message) QUEUE_HEADER;
15struct __repmgr_queue {
16	int size;
17	QUEUE_HEADER header;
18};
19
20/*
21 * PUBLIC: int __repmgr_queue_create __P((ENV *, DB_REP *));
22 */
23int
24__repmgr_queue_create(env, db_rep)
25	ENV *env;
26	DB_REP *db_rep;
27{
28	REPMGR_QUEUE *q;
29	int ret;
30
31	if ((ret = __os_calloc(env, 1, sizeof(REPMGR_QUEUE), &q)) != 0)
32		return (ret);
33	q->size = 0;
34	STAILQ_INIT(&q->header);
35	db_rep->input_queue = q;
36	return (0);
37}
38
39/*
40 * Frees not only the queue header, but also any messages that may be on it,
41 * along with their data buffers.
42 *
43 * PUBLIC: void __repmgr_queue_destroy __P((ENV *));
44 */
45void
46__repmgr_queue_destroy(env)
47	ENV *env;
48{
49	REPMGR_MESSAGE *m;
50	REPMGR_QUEUE *q;
51
52	if ((q = env->rep_handle->input_queue) == NULL)
53		return;
54
55	while (!STAILQ_EMPTY(&q->header)) {
56		m = STAILQ_FIRST(&q->header);
57		STAILQ_REMOVE_HEAD(&q->header, entries);
58		__os_free(env, m);
59	}
60	__os_free(env, q);
61}
62
63/*
64 * PUBLIC: int __repmgr_queue_get __P((ENV *, REPMGR_MESSAGE **));
65 *
66 * Get the first input message from the queue and return it to the caller.  The
67 * caller hereby takes responsibility for the entire message buffer, and should
68 * free it when done.
69 *
70 * Note that caller is NOT expected to hold the mutex.  This is asymmetric with
71 * put(), because put() is expected to be called in a loop after select, where
72 * it's already necessary to be holding the mutex.
73 */
74int
75__repmgr_queue_get(env, msgp)
76	ENV *env;
77	REPMGR_MESSAGE **msgp;
78{
79	DB_REP *db_rep;
80	REPMGR_MESSAGE *m;
81	REPMGR_QUEUE *q;
82	int ret;
83
84	ret = 0;
85	db_rep = env->rep_handle;
86	q = db_rep->input_queue;
87
88	LOCK_MUTEX(db_rep->mutex);
89	while (STAILQ_EMPTY(&q->header) && !db_rep->finished) {
90#ifdef DB_WIN32
91		if (!ResetEvent(db_rep->queue_nonempty)) {
92			ret = GetLastError();
93			goto err;
94		}
95		if (SignalObjectAndWait(db_rep->mutex, db_rep->queue_nonempty,
96			INFINITE, FALSE) != WAIT_OBJECT_0) {
97			ret = GetLastError();
98			goto err;
99		}
100		LOCK_MUTEX(db_rep->mutex);
101#else
102		if ((ret = pthread_cond_wait(&db_rep->queue_nonempty,
103		    &db_rep->mutex)) != 0)
104			goto err;
105#endif
106	}
107	if (db_rep->finished)
108		ret = DB_REP_UNAVAIL;
109	else {
110		m = STAILQ_FIRST(&q->header);
111		STAILQ_REMOVE_HEAD(&q->header, entries);
112		q->size--;
113		*msgp = m;
114	}
115
116err:
117	UNLOCK_MUTEX(db_rep->mutex);
118	return (ret);
119}
120
121/*
122 * PUBLIC: int __repmgr_queue_put __P((ENV *, REPMGR_MESSAGE *));
123 *
124 * !!!
125 * Caller must hold repmgr->mutex.
126 */
127int
128__repmgr_queue_put(env, msg)
129	ENV *env;
130	REPMGR_MESSAGE *msg;
131{
132	DB_REP *db_rep;
133	REPMGR_QUEUE *q;
134
135	db_rep = env->rep_handle;
136	q = db_rep->input_queue;
137
138	STAILQ_INSERT_TAIL(&q->header, msg, entries);
139	q->size++;
140
141	return (__repmgr_signal(&db_rep->queue_nonempty));
142}
143
144/*
145 * PUBLIC: int __repmgr_queue_size __P((ENV *));
146 *
147 * !!!
148 * Caller must hold repmgr->mutex.
149 */
150int
151__repmgr_queue_size(env)
152	ENV *env;
153{
154	return (env->rep_handle->input_queue->size);
155}
156