1/*
2 * Copyright 2009, 2017, Haiku, Inc.
3 * Distributed under the terms of the MIT License.
4 *
5 * Authors:
6 *		Michael Lotz <mmlr@mlotz.ch>
7 */
8
9#include "StreamingRingBuffer.h"
10
11#include <Autolock.h>
12
13#include <stdio.h>
14#include <stdlib.h>
15#include <string.h>
16
17
18#ifdef CLIENT_COMPILE
19#define TRACE_ALWAYS(x...)		printf("StreamingRingBuffer: " x)
20#else
21#define TRACE_ALWAYS(x...)		debug_printf("StreamingRingBuffer: " x)
22#endif
23
24#define TRACE(x...)				/*TRACE_ALWAYS(x)*/
25#define TRACE_ERROR(x...)		TRACE_ALWAYS(x)
26
27
28StreamingRingBuffer::StreamingRingBuffer(size_t bufferSize)
29	:
30	fReaderWaiting(false),
31	fWriterWaiting(false),
32	fCancelRead(false),
33	fCancelWrite(false),
34	fReaderNotifier(-1),
35	fWriterNotifier(-1),
36	fReaderLocker("StreamingRingBuffer reader"),
37	fWriterLocker("StreamingRingBuffer writer"),
38	fDataLocker("StreamingRingBuffer data"),
39	fBuffer(NULL),
40	fBufferSize(bufferSize),
41	fReadable(0),
42	fReadPosition(0),
43	fWritePosition(0)
44{
45	fReaderNotifier = create_sem(0, "StreamingRingBuffer read notify");
46	fWriterNotifier = create_sem(0, "StreamingRingBuffer write notify");
47
48	fBuffer = (uint8 *)malloc(fBufferSize);
49	if (fBuffer == NULL)
50		fBufferSize = 0;
51}
52
53
54StreamingRingBuffer::~StreamingRingBuffer()
55{
56	delete_sem(fReaderNotifier);
57	delete_sem(fWriterNotifier);
58	free(fBuffer);
59}
60
61
62status_t
63StreamingRingBuffer::InitCheck()
64{
65	if (fReaderNotifier < 0)
66		return fReaderNotifier;
67	if (fWriterNotifier < 0)
68		return fWriterNotifier;
69	if (fBuffer == NULL)
70		return B_NO_MEMORY;
71
72	return B_OK;
73}
74
75
76int32
77StreamingRingBuffer::Read(void *buffer, size_t length, bool onlyBlockOnNoData)
78{
79	BAutolock readerLock(fReaderLocker);
80	if (!readerLock.IsLocked())
81		return B_ERROR;
82
83	BAutolock dataLock(fDataLocker);
84	if (!dataLock.IsLocked())
85		return B_ERROR;
86
87	int32 readSize = 0;
88	while (length > 0) {
89		size_t copyLength = min_c(length, fBufferSize - fReadPosition);
90		copyLength = min_c(copyLength, fReadable);
91
92		if (copyLength == 0) {
93			if (onlyBlockOnNoData && readSize > 0)
94				return readSize;
95
96			fReaderWaiting = true;
97			dataLock.Unlock();
98
99			status_t result;
100			do {
101				TRACE("waiting in reader\n");
102				result = acquire_sem(fReaderNotifier);
103				TRACE("done waiting in reader with status: %#" B_PRIx32 "\n",
104					result);
105			} while (result == B_INTERRUPTED);
106
107			if (result != B_OK)
108				return result;
109
110			if (!dataLock.Lock()) {
111				TRACE_ERROR("failed to acquire data lock\n");
112				return B_ERROR;
113			}
114
115			if (fCancelRead) {
116				TRACE("read canceled\n");
117				fCancelRead = false;
118				return B_CANCELED;
119			}
120
121			continue;
122		}
123
124		// support discarding input
125		if (buffer != NULL) {
126			memcpy(buffer, fBuffer + fReadPosition, copyLength);
127			buffer = (uint8 *)buffer + copyLength;
128		}
129
130		fReadPosition = (fReadPosition + copyLength) % fBufferSize;
131		fReadable -= copyLength;
132		readSize += copyLength;
133		length -= copyLength;
134
135		if (fWriterWaiting) {
136			release_sem_etc(fWriterNotifier, 1, B_DO_NOT_RESCHEDULE);
137			fWriterWaiting = false;
138		}
139	}
140
141	return readSize;
142}
143
144
145status_t
146StreamingRingBuffer::Write(const void *buffer, size_t length)
147{
148	BAutolock writerLock(fWriterLocker);
149	if (!writerLock.IsLocked())
150		return B_ERROR;
151
152	BAutolock dataLock(fDataLocker);
153	if (!dataLock.IsLocked())
154		return B_ERROR;
155
156	while (length > 0) {
157		size_t copyLength = min_c(length, fBufferSize - fWritePosition);
158		copyLength = min_c(copyLength, fBufferSize - fReadable);
159
160		if (copyLength == 0) {
161			fWriterWaiting = true;
162			dataLock.Unlock();
163
164			status_t result;
165			do {
166				TRACE("waiting in writer\n");
167				result = acquire_sem(fWriterNotifier);
168				TRACE("done waiting in writer with status: %#" B_PRIx32 "\n",
169					result);
170			} while (result == B_INTERRUPTED);
171
172			if (result != B_OK)
173				return result;
174
175			if (!dataLock.Lock()) {
176				TRACE_ERROR("failed to acquire data lock\n");
177				return B_ERROR;
178			}
179
180			if (fCancelWrite) {
181				TRACE("write canceled\n");
182				fCancelWrite = false;
183				return B_CANCELED;
184			}
185
186			continue;
187		}
188
189		memcpy(fBuffer + fWritePosition, buffer, copyLength);
190		fWritePosition = (fWritePosition + copyLength) % fBufferSize;
191		fReadable += copyLength;
192
193		buffer = (uint8 *)buffer + copyLength;
194		length -= copyLength;
195
196		if (fReaderWaiting) {
197			release_sem_etc(fReaderNotifier, 1, B_DO_NOT_RESCHEDULE);
198			fReaderWaiting = false;
199		}
200	}
201
202	return B_OK;
203}
204
205
206void
207StreamingRingBuffer::MakeEmpty()
208{
209	BAutolock dataLock(fDataLocker);
210	if (!dataLock.IsLocked())
211		return;
212
213	fReadPosition = fWritePosition = 0;
214	fReadable = 0;
215
216	if (fWriterWaiting) {
217		release_sem_etc(fWriterNotifier, 1, 0);
218		fWriterWaiting = false;
219		fCancelWrite = true;
220	}
221
222	if (fReaderWaiting) {
223		release_sem_etc(fReaderNotifier, 1, 0);
224		fReaderWaiting = false;
225		fCancelRead = true;
226	}
227}
228