1/*
2 * Copyright 2008, Axel D��rfler, axeld@pinc-software.de.
3 * Distributed under the terms of the MIT License.
4 */
5
6
7#include "AdaptiveBuffering.h"
8
9#include <stdlib.h>
10
11
12//#define TRACE(x...) printf(x)
13#define TRACE(x...) ;
14
15
16AdaptiveBuffering::AdaptiveBuffering(size_t initialBufferSize,
17		size_t maxBufferSize, uint32 count)
18	:
19	fWriterThread(-1),
20	fBuffers(NULL),
21	fReadBytes(NULL),
22	fBufferCount(count),
23	fReadIndex(0),
24	fWriteIndex(0),
25	fReadCount(0),
26	fWriteCount(0),
27	fMaxBufferSize(maxBufferSize),
28	fCurrentBufferSize(initialBufferSize),
29	fReadSem(-1),
30	fWriteSem(-1),
31	fFinishedSem(-1),
32	fWriteStatus(B_OK),
33	fWriteTime(0),
34	fFinished(false),
35	fQuit(false)
36{
37}
38
39
40AdaptiveBuffering::~AdaptiveBuffering()
41{
42	_QuitWriter();
43
44	delete_sem(fReadSem);
45	delete_sem(fWriteSem);
46
47	if (fBuffers != NULL) {
48		for (uint32 i = 0; i < fBufferCount; i++) {
49			if (fBuffers[i] == NULL)
50				break;
51
52			free(fBuffers[i]);
53		}
54
55		free(fBuffers);
56	}
57
58	free(fReadBytes);
59}
60
61
62status_t
63AdaptiveBuffering::Init()
64{
65	fReadBytes = (size_t*)malloc(fBufferCount * sizeof(size_t));
66	if (fReadBytes == NULL)
67		return B_NO_MEMORY;
68
69	fBuffers = (uint8**)malloc(fBufferCount * sizeof(uint8*));
70	if (fBuffers == NULL)
71		return B_NO_MEMORY;
72
73	for (uint32 i = 0; i < fBufferCount; i++) {
74		fBuffers[i] = (uint8*)malloc(fMaxBufferSize);
75		if (fBuffers[i] == NULL)
76			return B_NO_MEMORY;
77	}
78
79	fReadSem = create_sem(0, "reader");
80	if (fReadSem < B_OK)
81		return fReadSem;
82
83	fWriteSem = create_sem(fBufferCount - 1, "writer");
84	if (fWriteSem < B_OK)
85		return fWriteSem;
86
87	fFinishedSem = create_sem(0, "finished");
88	if (fFinishedSem < B_OK)
89		return fFinishedSem;
90
91	fWriterThread = spawn_thread(&_Writer, "buffer reader", B_LOW_PRIORITY,
92		this);
93	if (fWriterThread < B_OK)
94		return fWriterThread;
95
96	return resume_thread(fWriterThread);
97}
98
99
100status_t
101AdaptiveBuffering::Read(uint8* /*buffer*/, size_t* _length)
102{
103	*_length = 0;
104	return B_OK;
105}
106
107
108status_t
109AdaptiveBuffering::Write(uint8* /*buffer*/, size_t /*length*/)
110{
111	return B_OK;
112}
113
114
115status_t
116AdaptiveBuffering::Run()
117{
118	fReadIndex = 0;
119	fWriteIndex = 0;
120	fReadCount = 0;
121	fWriteCount = 0;
122	fWriteStatus = B_OK;
123	fWriteTime = 0;
124
125	while (fWriteStatus >= B_OK) {
126		bigtime_t start = system_time();
127		int32 index = fReadIndex;
128
129		TRACE("%ld. read index %lu, buffer size %lu\n", fReadCount, index,
130			fCurrentBufferSize);
131
132		fReadBytes[index] = fCurrentBufferSize;
133		status_t status = Read(fBuffers[index], &fReadBytes[index]);
134		if (status < B_OK)
135			return status;
136
137		TRACE("%ld. read -> %lu bytes\n", fReadCount, fReadBytes[index]);
138
139		fReadCount++;
140		fReadIndex = (index + 1) % fBufferCount;
141		if (fReadBytes[index] == 0)
142			fFinished = true;
143		release_sem(fReadSem);
144
145		while (acquire_sem(fWriteSem) == B_INTERRUPTED)
146			;
147
148		if (fFinished)
149			break;
150
151		bigtime_t readTime = system_time() - start;
152		uint32 writeTime = fWriteTime;
153		if (writeTime) {
154			if (writeTime > readTime) {
155				fCurrentBufferSize = fCurrentBufferSize * 8/9;
156				fCurrentBufferSize &= ~65535;
157			} else {
158				fCurrentBufferSize = fCurrentBufferSize * 9/8;
159				fCurrentBufferSize = (fCurrentBufferSize + 65535) & ~65535;
160
161				if (fCurrentBufferSize > fMaxBufferSize)
162					fCurrentBufferSize = fMaxBufferSize;
163			}
164		}
165	}
166
167	while (acquire_sem(fFinishedSem) == B_INTERRUPTED)
168		;
169
170	return fWriteStatus;
171}
172
173
174void
175AdaptiveBuffering::_QuitWriter()
176{
177	if (fWriterThread >= B_OK) {
178		fQuit = true;
179		release_sem(fReadSem);
180
181		status_t status;
182		wait_for_thread(fWriterThread, &status);
183
184		fWriterThread = -1;
185	}
186}
187
188
189status_t
190AdaptiveBuffering::_Writer()
191{
192	while (true) {
193		while (acquire_sem(fReadSem) == B_INTERRUPTED)
194			;
195		if (fQuit)
196			break;
197
198		bigtime_t start = system_time();
199
200		TRACE("%ld. write index %lu, %p, bytes %lu\n", fWriteCount, fWriteIndex,
201			fBuffers[fWriteIndex], fReadBytes[fWriteIndex]);
202
203		fWriteStatus = Write(fBuffers[fWriteIndex], fReadBytes[fWriteIndex]);
204
205		TRACE("%ld. write done\n", fWriteCount);
206
207		fWriteIndex = (fWriteIndex + 1) % fBufferCount;
208		fWriteTime = uint32(system_time() - start);
209		fWriteCount++;
210
211		release_sem(fWriteSem);
212
213		if (fWriteStatus < B_OK)
214			return fWriteStatus;
215		if (fFinished && fWriteCount == fReadCount)
216			release_sem(fFinishedSem);
217	}
218
219	return B_OK;
220}
221
222
223/*static*/ status_t
224AdaptiveBuffering::_Writer(void* self)
225{
226	return ((AdaptiveBuffering*)self)->_Writer();
227}
228
229