1#include "ConsumerNode.h"
2
3#include <stdlib.h>
4#include <string.h>
5
6#include <Buffer.h>
7#include <TimeSource.h>
8
9#include "misc.h"
10
11
12ConsumerNode::ConsumerNode()
13	:
14	BMediaNode("ConsumerNode"),
15	BBufferConsumer(B_MEDIA_RAW_AUDIO),
16	BMediaEventLooper()
17{
18	out("ConsumerNode::ConsumerNode\n");
19}
20
21
22ConsumerNode::~ConsumerNode()
23{
24	out("ConsumerNode::~ConsumerNode\n");
25	Quit();
26}
27
28
29void
30ConsumerNode::NodeRegistered()
31{
32	out("ConsumerNode::NodeRegistered\n");
33	InitializeInput();
34	SetPriority(108);
35	Run();
36}
37
38
39status_t
40ConsumerNode::AcceptFormat(const media_destination& dest, media_format* format)
41{
42	out("ConsumerNode::AcceptFormat\n");
43
44	if (dest != fInput.destination)
45		return B_MEDIA_BAD_DESTINATION;
46
47	if (format == NULL)
48		return B_BAD_VALUE;
49
50	if (format->type != B_MEDIA_RAW_AUDIO)
51		return B_MEDIA_BAD_FORMAT;
52
53	return B_OK;
54}
55
56
57status_t
58ConsumerNode::GetNextInput(int32* cookie, media_input* _input)
59{
60	out("ConsumerNode::GetNextInput\n");
61
62	if (_input == NULL)
63		return B_BAD_VALUE;
64
65	if (++(*cookie) > 1)
66		return B_BAD_INDEX;
67
68	*_input = fInput;
69	return B_OK;
70}
71
72
73void
74ConsumerNode::DisposeInputCookie(int32 cookie)
75{
76	out("ConsumerNode::DisposeInputCookie\n");
77	return;
78}
79
80
81void
82ConsumerNode::BufferReceived(BBuffer* buffer)
83{
84	out("ConsumerNode::BufferReceived, sheduled time = %5.4f\n",
85		buffer->Header()->start_time / 1E6);
86
87	media_timed_event event(buffer->Header()->start_time,
88		BTimedEventQueue::B_HANDLE_BUFFER, buffer,
89		BTimedEventQueue::B_RECYCLE_BUFFER);
90	EventQueue()->AddEvent(event);
91	return;
92}
93
94
95void
96ConsumerNode::ProducerDataStatus(const media_destination& forWhom, int32 status,
97	bigtime_t atPerformanceTime)
98{
99	out("ConsumerNode::ProducerDataStatus\n");
100
101	if (forWhom == fInput.destination) {
102		media_timed_event event(atPerformanceTime,
103			BTimedEventQueue::B_DATA_STATUS, &fInput,
104			BTimedEventQueue::B_NO_CLEANUP, status, 0, NULL);
105		EventQueue()->AddEvent(event);
106	}
107}
108
109
110status_t
111ConsumerNode::GetLatencyFor(const media_destination& forWhom,
112	bigtime_t* _latency, media_node_id* _timesource)
113{
114	out("ConsumerNode::GetLatencyFor\n");
115	// make sure this is one of my valid inputs
116	if (forWhom != fInput.destination)
117		return B_MEDIA_BAD_DESTINATION;
118
119	*_latency = 23000;
120	*_timesource = TimeSource()->ID();
121	return B_OK;
122}
123
124
125status_t
126ConsumerNode::Connected(const media_source& producer,
127	const media_destination& where, const media_format& withFormat,
128	media_input* _input)
129{
130	out("ConsumerNode::Connected\n");
131	if (where != fInput.destination)
132		return B_MEDIA_BAD_DESTINATION;
133
134	// calculate my latency here, because it may depend on buffer
135	// sizes/durations, then tell the BMediaEventLooper how early
136	// we need to get the buffers
137	SetEventLatency(10 * 1000); // TODO: fix me
138
139	/* reserve the connection */
140	fInput.source = producer;
141	fInput.format = withFormat;
142
143	/* and publish it's name and connection info */
144	*_input = fInput;
145
146#if 0
147	/* create the buffer group */
148	if (mBufferGroup == NULL) {
149		create_own_buffer_group();
150		mBufferGroup = mOwnBufferGroup;
151	}
152
153	/* set the duration of the node's buffers */
154	int32 numBuffers;
155	mBufferGroup->CountBuffers(&numBuffers);
156	SetBufferDuration((1000000LL * numBuffers) / mOutput.format.u.raw_video.field_rate);
157#endif
158
159	return B_OK;
160}
161
162
163void
164ConsumerNode::Disconnected(const media_source& producer,
165	const media_destination& where)
166{
167	out("ConsumerNode::Disconnected\n");
168
169	/* unreserve the connection */
170	InitializeInput();
171
172#if 0
173	/* release buffer group */
174	mBufferGroup = NULL;
175	if (mOwnBufferGroup != NULL) {
176		delete_own_buffer_group();
177	}
178#endif
179
180	return;
181}
182
183
184status_t
185ConsumerNode::FormatChanged(const media_source& producer,
186	const media_destination& consumer, int32 changeTag,
187	const media_format& format)
188{
189	out("ConsumerNode::FormatChanged\n");
190	return B_OK;
191}
192
193
194status_t
195ConsumerNode::SeekTagRequested(const media_destination& destination,
196	bigtime_t targetTime, uint32 flags, media_seek_tag* _seekTag,
197	bigtime_t* _taggedTime, uint32* _flags)
198{
199	out("ConsumerNode::SeekTagRequested\n");
200	return B_OK;
201}
202
203
204BMediaAddOn*
205ConsumerNode::AddOn(int32* internalID) const
206{
207	out("ConsumerNode::AddOn\n");
208	return NULL;
209}
210
211
212void
213ConsumerNode::HandleEvent(const media_timed_event* event, bigtime_t lateness,
214	bool realTimeEvent)
215{
216	switch (event->type) {
217		case BTimedEventQueue::B_HANDLE_BUFFER:
218		{
219			out("ConsumerNode::HandleEvent B_HANDLE_BUFFER\n");
220			BBuffer* buffer = const_cast<BBuffer*>((BBuffer*)event->pointer);
221
222			out("### sheduled time = %5.4f, current time = %5.4f, lateness = "
223				"%5.4f\n", buffer->Header()->start_time / 1E6,
224				TimeSource()->Now() / 1E6,lateness / 1E6);
225
226			snooze((rand() * 100) % 200000);
227
228			if (buffer)
229				buffer->Recycle();
230			break;
231		}
232
233		case BTimedEventQueue::B_PARAMETER:
234			out("ConsumerNode::HandleEvent B_PARAMETER\n");
235			break;
236
237		case BTimedEventQueue::B_START:
238			out("ConsumerNode::HandleEvent B_START\n");
239			break;
240
241		case BTimedEventQueue::B_STOP:
242			out("ConsumerNode::HandleEvent B_STOP\n");
243			// stopping implies not handling any more buffers.  So, we flush
244			// all pending buffers out of the event queue before returning to
245			// the event loop.
246			EventQueue()->FlushEvents(0, BTimedEventQueue::B_ALWAYS, true,
247				BTimedEventQueue::B_HANDLE_BUFFER);
248			break;
249
250		case BTimedEventQueue::B_SEEK:
251			out("ConsumerNode::HandleEvent B_SEEK\n");
252			break;
253
254		case BTimedEventQueue::B_WARP:
255			out("ConsumerNode::HandleEvent B_WARP\n");
256			// similarly, time warps aren't meaningful to the logger, so just
257			// record it and return
258			//mLogger->Log(LOG_WARP_HANDLED, logMsg);
259			break;
260
261		case BTimedEventQueue::B_DATA_STATUS:
262			out("ConsumerNode::HandleEvent B_DATA_STATUS\n");
263			break;
264
265		default:
266			out("ConsumerNode::HandleEvent default\n");
267			break;
268	}
269}
270
271
272status_t
273ConsumerNode::HandleMessage(int32 message, const void* data, size_t size)
274{
275	out("ConsumerNode::HandleMessage %lx\n", message);
276
277	if (BBufferConsumer::HandleMessage(message, data, size) == B_OK
278		|| BMediaEventLooper::HandleMessage(message, data, size) == B_OK)
279		return B_OK;
280
281	return BMediaNode::HandleMessage(message, data, size);
282}
283
284
285void
286ConsumerNode::InitializeInput()
287{
288	out("ConsumerNode::InitializeInput()\n");
289	fInput.source = media_source::null;
290	fInput.destination.port = ControlPort();
291	fInput.destination.id = 0;
292	fInput.node = Node();
293	fInput.format.type = B_MEDIA_RAW_AUDIO;
294	fInput.format.u.raw_audio = media_raw_audio_format::wildcard;
295	fInput.format.u.raw_audio.format = media_raw_audio_format::B_AUDIO_FLOAT;
296	fInput.format.u.raw_audio.channel_count = 1;
297	fInput.format.u.raw_audio.frame_rate = 44100;
298	fInput.format.u.raw_audio.byte_order = B_HOST_IS_BENDIAN
299		? B_MEDIA_BIG_ENDIAN : B_MEDIA_LITTLE_ENDIAN;
300	strcpy(fInput.name, "this way in");
301}
302