1// BlockingQueue.h
2//
3// Copyright (c) 2004, Ingo Weinhold (bonefish@cs.tu-berlin.de)
4//
5// Permission is hereby granted, free of charge, to any person obtaining a
6// copy of this software and associated documentation files (the "Software"),
7// to deal in the Software without restriction, including without limitation
8// the rights to use, copy, modify, merge, publish, distribute, sublicense,
9// and/or sell copies of the Software, and to permit persons to whom the
10// Software is furnished to do so, subject to the following conditions:
11//
12// The above copyright notice and this permission notice shall be included in
13// all copies or substantial portions of the Software.
14//
15// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
18// THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21// DEALINGS IN THE SOFTWARE.
22//
23// Except as contained in this notice, the name of a copyright holder shall
24// not be used in advertising or otherwise to promote the sale, use or other
25// dealings in this Software without prior written authorization of the
26// copyright holder.
27
28#ifndef BLOCKING_QUEUE_H
29#define BLOCKING_QUEUE_H
30
31#include <vector>
32
33#include <Locker.h>
34
35#include "AutoLocker.h"
36
37using std::vector;
38
39typedef BLocker Locker;
40
41template<typename Element>
42class BlockingQueue : public Locker {
43public:
44								BlockingQueue(const char* name = NULL);
45								~BlockingQueue();
46
47			status_t			InitCheck() const;
48
49			status_t			Close(bool deleteElements,
50									const vector<Element*>** elements = NULL);
51
52			status_t			Push(Element* element);
53			status_t			Pop(Element** element,
54									bigtime_t timeout = B_INFINITE_TIMEOUT);
55			status_t			Peek(Element** element);
56			status_t			Remove(Element* element);
57
58			int32				Size();
59
60private:
61			vector<Element*>	fElements;
62			sem_id				fElementSemaphore;
63};
64
65// constructor
66template<typename Element>
67BlockingQueue<Element>::BlockingQueue(const char* name)
68	: fElements(),
69	  fElementSemaphore(-1)
70{
71	fElementSemaphore = create_sem(0, (name ? name : "blocking queue"));
72}
73
74// destructor
75template<typename Element>
76BlockingQueue<Element>::~BlockingQueue()
77{
78	if (fElementSemaphore >= 0)
79		delete_sem(fElementSemaphore);
80}
81
82// InitCheck
83template<typename Element>
84status_t
85BlockingQueue<Element>::InitCheck() const
86{
87	return (fElementSemaphore < 0 ? fElementSemaphore : B_OK);
88}
89
90// Close
91template<typename Element>
92status_t
93BlockingQueue<Element>::Close(bool deleteElements,
94	const vector<Element*>** elements)
95{
96	AutoLocker<Locker> _(this);
97	status_t error = delete_sem(fElementSemaphore);
98	if (error != B_OK)
99		return error;
100	fElementSemaphore = -1;
101	if (elements)
102		*elements = &fElements;
103	if (deleteElements) {
104		int32 count = fElements.size();
105		for (int32 i = 0; i < count; i++)
106			delete fElements[i];
107	}
108	return error;
109}
110
111// Push
112template<typename Element>
113status_t
114BlockingQueue<Element>::Push(Element* element)
115{
116	AutoLocker<Locker> _(this);
117	if (fElementSemaphore < 0)
118		return B_NO_INIT;
119	try {
120		fElements.push_back(element);
121	} catch (std::bad_alloc) {
122		return B_NO_MEMORY;
123	}
124	status_t error = release_sem(fElementSemaphore);
125	if (error != B_OK)
126		fElements.erase(fElements.begin() + fElements.size() - 1);
127	return error;
128}
129
130// Pop
131template<typename Element>
132status_t
133BlockingQueue<Element>::Pop(Element** element, bigtime_t timeout)
134{
135	status_t error = acquire_sem_etc(fElementSemaphore, 1, B_RELATIVE_TIMEOUT,
136		timeout);
137	if (error != B_OK)
138		return error;
139	AutoLocker<Locker> _(this);
140	if (fElementSemaphore < 0)
141		return B_NO_INIT;
142	int32 count = fElements.size();
143	if (count == 0)
144		return B_ERROR;
145	*element = fElements[0];
146	fElements.erase(fElements.begin());
147	return B_OK;
148}
149
150// Peek
151template<typename Element>
152status_t
153BlockingQueue<Element>::Peek(Element** element)
154{
155	AutoLocker<Locker> _(this);
156	if (fElementSemaphore < 0)
157		return B_NO_INIT;
158	int32 count = fElements.size();
159	if (count == 0)
160		return B_ENTRY_NOT_FOUND;
161	*element = fElements[0];
162	return B_OK;
163}
164
165// Remove
166template<typename Element>
167status_t
168BlockingQueue<Element>::Remove(Element* element)
169{
170	status_t error = acquire_sem_etc(fElementSemaphore, 1,
171		B_RELATIVE_TIMEOUT, 0);
172	if (error != B_OK)
173		return error;
174	AutoLocker<Locker> _(this);
175	if (fElementSemaphore < 0)
176		return B_NO_INIT;
177
178	int32 count = 0;
179	for (int32 i = fElements.size() - 1; i >= 0; i--) {
180		if (fElements[i] == element) {
181			fElements.erase(fElements.begin() + i);
182			count++;
183		}
184	}
185	if (count == 0) {
186		release_sem(fElementSemaphore);
187		return B_ENTRY_NOT_FOUND;
188	}
189#if 0
190	if (count > 1) {
191		ERROR(("ERROR: BlockingQueue::Remove(): Removed %ld elements!\n",
192			count));
193	}
194#endif
195	return error;
196}
197
198// Size
199template<typename Element>
200int32
201BlockingQueue<Element>::Size()
202{
203	AutoLocker<Locker> _(this);
204	return (fElements.size());
205}
206
207#endif	// BLOCKING_QUEUE_H
208