1/*
2 * Copyright (c) 2004-2007 Marcus Overhagen <marcus@overhagen.de>
3 *
4 * Permission is hereby granted, free of charge, to any person
5 * obtaining a copy of this software and associated documentation
6 * files (the "Software"), to deal in the Software without restriction,
7 * including without limitation the rights to use, copy, modify,
8 * merge, publish, distribute, sublicense, and/or sell copies of
9 * the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be
13 * included in all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
17 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
19 * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
20 * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
22 * OTHER DEALINGS IN THE SOFTWARE.
23 */
24
25#include <stdio.h>
26#include <OS.h>
27
28#include "Packet.h"
29#include "PacketQueue.h"
30
31#define TRACE_PACKET_QUEUE
32#ifdef TRACE_PACKET_QUEUE
33  #define TRACE printf
34#else
35  #define TRACE(a...)
36#endif
37
38
39PacketQueue::PacketQueue(int max_packets)
40 :	fQueue(new Packet* [max_packets])
41 ,	fSem(create_sem(0, "packet queue sem"))
42 ,	fTerminate(false)
43 ,	fWriteIndex(0)
44 ,	fReadIndex(0)
45 ,	fMaxPackets(max_packets)
46 ,	fPacketCount(0)
47{
48}
49
50
51PacketQueue::~PacketQueue()
52{
53	Empty();
54	delete_sem(fSem);
55	delete [] fQueue;
56}
57
58
59void
60PacketQueue::Empty()
61{
62	while (fPacketCount--) {
63		delete fQueue[fReadIndex];
64		fReadIndex = (fReadIndex + 1) % fMaxPackets;
65	}
66}
67
68
69status_t
70PacketQueue::Insert(Packet *packet)
71{
72	if (fTerminate) {
73		return B_NOT_ALLOWED;
74	}
75	if (atomic_add(&fPacketCount, 1) == fMaxPackets) {
76		atomic_add(&fPacketCount, -1);
77		return B_WOULD_BLOCK;
78	}
79	fQueue[fWriteIndex] = packet;
80	fWriteIndex = (fWriteIndex + 1) % fMaxPackets;
81	release_sem(fSem);
82	return B_OK;
83}
84
85
86status_t
87PacketQueue::Remove(Packet **packet)
88{
89	if (fTerminate) {
90		return B_NOT_ALLOWED;
91	}
92	if (acquire_sem(fSem) != B_OK)
93		return B_ERROR;
94	if (fTerminate) {
95		return B_NOT_ALLOWED;
96	}
97	*packet = fQueue[fReadIndex];
98	atomic_add(&fPacketCount, -1);
99	fReadIndex = (fReadIndex + 1) % fMaxPackets;
100	return B_OK;
101}
102
103
104void
105PacketQueue::Flush(bigtime_t timeout)
106{
107	if (fTerminate) {
108		return;
109	}
110
111	timeout += system_time();
112
113	while (acquire_sem_etc(fSem, 1, B_ABSOLUTE_TIMEOUT, timeout) == B_OK) {
114		if (fTerminate) {
115			return;
116		}
117		Packet *packet = fQueue[fReadIndex];
118		atomic_add(&fPacketCount, -1);
119		fReadIndex = (fReadIndex + 1) % fMaxPackets;
120		delete packet;
121	}
122}
123
124
125// peeks into queue and delivers a copy
126status_t
127PacketQueue::Peek(Packet **packet)
128{
129	if (fTerminate) {
130		return B_NOT_ALLOWED;
131	}
132	if (acquire_sem(fSem) != B_OK)
133		return B_ERROR;
134	if (fTerminate) {
135		return B_NOT_ALLOWED;
136	}
137	*packet = new Packet(*fQueue[fReadIndex]);
138	release_sem(fSem);
139	return B_OK;
140}
141
142
143void
144PacketQueue::Terminate()
145{
146	fTerminate = true;
147	release_sem(fSem);
148}
149
150
151void
152PacketQueue::Restart()
153{
154	Empty();
155
156	delete_sem(fSem);
157	fSem = create_sem(0, "packet queue sem");
158	fTerminate = false;
159	fWriteIndex = 0;
160	fReadIndex = 0;
161	fPacketCount = 0;
162}
163