1#include "ProducerNode.h"
2
3#include <string.h>
4
5#include <Buffer.h>
6#include <BufferGroup.h>
7#include <MediaNode.h>
8#include <TimeSource.h>
9
10#include "misc.h"
11
12
13#define DELAY 2000000
14
15
16ProducerNode::ProducerNode()
17	:
18	BMediaNode("ProducerNode"),
19	BBufferProducer(B_MEDIA_RAW_AUDIO),
20	BMediaEventLooper(),
21	mBufferGroup(0),
22	mBufferProducerSem(-1),
23	mBufferProducer(-1),
24	mOutputEnabled(false)
25{
26	out("ProducerNode::ProducerNode\n");
27	mBufferGroup = new BBufferGroup(4096,3);
28}
29
30
31ProducerNode::~ProducerNode()
32{
33	out("ProducerNode::~ProducerNode\n");
34	Quit();
35	delete mBufferGroup;
36}
37
38
39void
40ProducerNode::NodeRegistered()
41{
42	out("ProducerNode::NodeRegistered\n");
43	InitializeOutput();
44	SetPriority(108);
45	Run();
46}
47
48
49status_t
50ProducerNode::GetNodeAttributes(
51	media_node_attribute* attributes, size_t count)
52{
53	uint32 what = media_node_attribute::B_FIRST_USER_ATTRIBUTE;
54	for (size_t i = 0; i < count; i++) {
55		attributes[i].what = what;
56		what++;
57	}
58	return B_OK;
59}
60
61
62status_t
63ProducerNode::FormatSuggestionRequested(media_type type, int32 quality,
64	media_format* format)
65{
66	out("ProducerNode::FormatSuggestionRequested\n");
67
68	if (type != B_MEDIA_RAW_AUDIO)
69		return B_MEDIA_BAD_FORMAT;
70
71	format->u.raw_audio = media_raw_audio_format::wildcard;
72	format->u.raw_audio.format = media_raw_audio_format::B_AUDIO_FLOAT;
73	format->u.raw_audio.channel_count = 1;
74	format->u.raw_audio.frame_rate = 44100;
75	format->u.raw_audio.byte_order = (B_HOST_IS_BENDIAN) ? B_MEDIA_BIG_ENDIAN : B_MEDIA_LITTLE_ENDIAN;
76
77	return B_OK;
78}
79
80
81status_t
82ProducerNode::FormatProposal(const media_source& output, media_format* format)
83{
84	out("ProducerNode::FormatProposal\n");
85
86	if (format == NULL)
87		return B_BAD_VALUE;
88
89	if (output != mOutput.source)
90		return B_MEDIA_BAD_SOURCE;
91
92	return B_OK;
93}
94
95
96status_t
97ProducerNode::FormatChangeRequested(const media_source& source,
98	const media_destination& destination, media_format* _format,
99	int32* _deprecated_)
100{
101	out("ProducerNode::FormatChangeRequested\n");
102	return B_ERROR;
103}
104
105
106status_t
107ProducerNode::GetNextOutput(int32* cookie, media_output* _output)
108{
109	out("ProducerNode::GetNextOutput\n");
110	if (++(*cookie) > 1)
111		return B_BAD_INDEX;
112
113	mOutput.node = Node();
114	*_output = mOutput;
115	return B_OK;
116}
117
118
119status_t
120ProducerNode::DisposeOutputCookie(int32 cookie)
121{
122	out("ProducerNode::DisposeOutputCookie\n");
123	return B_OK;
124}
125
126
127/*!	In this function, you should either pass on the group to your upstream guy,
128	or delete your current group and hang on to this group. Deleting the
129	previous group (unless you passed it on with the reclaim flag set to false)
130	is very important, else you will 1) leak memory and 2) block someone who may
131	want to reclaim the buffers living in that group.
132*/
133status_t
134ProducerNode::SetBufferGroup(const media_source& forSource, BBufferGroup* group)
135{
136	out("ProducerNode::SetBufferGroup\n");
137
138	if (forSource != mOutput.source)
139		return B_MEDIA_BAD_SOURCE;
140
141#if 0
142	if (mBufferGroup != NULL && mBufferGroup != mOwnBufferGroup) {
143		// fixme! really delete if it isn't ours ?
144		trace("deleting buffer group!...\n");
145		delete mBufferGroup;
146		trace("done!\n");
147	}
148
149	/* release the previous buffer group */
150	if (mOwnBufferGroup != NULL) {
151		delete_own_buffer_group();
152	}
153
154	mBufferGroup = group;
155
156	/* allocate new buffer group if necessary */
157	if (mBufferGroup == NULL) {
158		create_own_buffer_group();
159		mBufferGroup = mOwnBufferGroup;
160	}
161	return B_OK;
162#endif
163
164	return B_ERROR;
165}
166
167
168status_t
169ProducerNode::VideoClippingChanged(const media_source& forSource,
170	int16 numShorts, int16* clipData, const media_video_display_info& display,
171	int32* _deprecated_)
172{
173	out("ProducerNode::VideoClippingChanged\n");
174	return B_ERROR;
175}
176
177
178status_t
179ProducerNode::GetLatency(bigtime_t* _latency)
180{
181	out("ProducerNode::GetLatency\n");
182	*_latency = 23000;
183	return B_OK;
184}
185
186
187status_t
188ProducerNode::PrepareToConnect(const media_source& what,
189	const media_destination& where, media_format* format, media_source* _source,
190	char* _name)
191{
192	out("ProducerNode::PrepareToConnect\n");
193
194	if (mOutput.source != what)
195		return B_MEDIA_BAD_SOURCE;
196
197	if (mOutput.destination != media_destination::null)
198		return B_MEDIA_ALREADY_CONNECTED;
199
200	if (format == NULL || _source == NULL || _name == NULL)
201		return B_BAD_VALUE;
202
203#if 0
204	ASSERT(mOutputEnabled == false);
205
206	trace("old format:\n");
207	dump_format(format);
208
209	status_t status;
210
211	status = specialize_format_to_inputformat(format);
212	if (status != B_OK)
213		return status;
214
215#endif
216
217
218	*_source = mOutput.source;
219	strcpy(_name, mOutput.name);
220	//mOutput.destination = where; //really now? fixme
221
222	return B_OK;
223}
224
225
226void
227ProducerNode::Connect(status_t error, const media_source& source,
228	const media_destination& destination, const media_format& format,
229	char* name)
230{
231	out("ProducerNode::Connect\n");
232
233	if (error != B_OK) {
234		InitializeOutput();
235		return;
236	}
237/*
238	if (mOutput.destination != destination) { //if connected in PrepareToConnect fixme?
239		trace("error mOutput.destination != destination\n");
240		return;
241	}
242*/
243	mOutput.destination = destination;
244
245	if (mOutput.source != source) {
246		out("error mOutput.source != source\n");
247		return;
248	}
249
250	strcpy(name, mOutput.name);
251
252#if 0
253	trace("format (final and approved):\n");
254	dump_format(&format);
255#endif
256
257	mOutputEnabled = true;
258
259	return;
260}
261
262
263void
264ProducerNode::Disconnect(const media_source& what,
265	const media_destination& where)
266{
267	out("ProducerNode::Disconnect\n");
268	mOutputEnabled = false;
269
270	// unreserve connection
271	InitializeOutput();
272}
273
274
275void
276ProducerNode::LateNoticeReceived(const media_source& what, bigtime_t howMuch,
277	bigtime_t performanceTime)
278{
279	out("ProducerNode::LateNoticeReceived\n");
280	return;
281}
282
283
284void
285ProducerNode::EnableOutput(const media_source& what, bool enabled,
286	int32* _deprecated_)
287{
288	out("ProducerNode::EnableOutput\n");
289	mOutputEnabled = enabled;
290	return;
291}
292
293
294BMediaAddOn*
295ProducerNode::AddOn(int32* internalID) const
296{
297	out("ProducerNode::AddOn\n");
298	return NULL;
299}
300
301
302void
303ProducerNode::HandleEvent(const media_timed_event* event, bigtime_t lateness,
304	bool realTimeEvent)
305{
306	out("ProducerNode::HandleEvent\n");
307	switch (event->type) {
308		case BTimedEventQueue::B_HANDLE_BUFFER:
309			out("B_HANDLE_BUFFER (should not happen)\n");
310			break;
311
312		case BTimedEventQueue::B_PARAMETER:
313			out("B_PARAMETER\n");
314			break;
315
316		case BTimedEventQueue::B_START:
317		{
318			out("B_START\n");
319			if (mBufferProducer != -1) {
320				out("already running\n");
321				break;
322			}
323			mBufferProducerSem = create_sem(0, "producer blocking sem");
324			mBufferProducer = spawn_thread(_bufferproducer, "Buffer Producer",
325				B_NORMAL_PRIORITY, this);
326			resume_thread(mBufferProducer);
327			break;
328		}
329
330		case BTimedEventQueue::B_STOP:
331		{
332			out("B_STOP\n");
333			if (mBufferProducer == -1) {
334				out("not running\n");
335				break;
336			}
337			status_t err;
338			delete_sem(mBufferProducerSem);
339			wait_for_thread(mBufferProducer,&err);
340			mBufferProducer = -1;
341			mBufferProducerSem = -1;
342
343			// stopping implies not handling any more buffers.  So, we flush
344			// all pending buffers out of the event queue before returning to
345			// the event loop.
346			EventQueue()->FlushEvents(0, BTimedEventQueue::B_ALWAYS, true,
347				BTimedEventQueue::B_HANDLE_BUFFER);
348			break;
349		}
350
351		case BTimedEventQueue::B_SEEK:
352			out("B_SEEK\n");
353			break;
354
355		case BTimedEventQueue::B_WARP:
356			out("B_WARP\n");
357			// similarly, time warps aren't meaningful to the logger, so just
358			// record it and return
359			//mLogger->Log(LOG_WARP_HANDLED, logMsg);
360			break;
361
362		case BTimedEventQueue::B_DATA_STATUS:
363			out("B_DATA_STATUS\n");
364			break;
365
366		default:
367			out("default\n");
368			break;
369	}
370}
371
372
373status_t
374ProducerNode::HandleMessage(int32 message,const void *data, size_t size)
375{
376	out("ProducerNode::HandleMessage %lx\n",message);
377	if (B_OK == BBufferProducer::HandleMessage(message,data,size))
378		return B_OK;
379	if (B_OK == BMediaEventLooper::HandleMessage(message,data,size))
380		return B_OK;
381	return BMediaNode::HandleMessage(message,data,size);
382}
383
384
385void
386ProducerNode::AdditionalBufferRequested(const media_source& source,
387	media_buffer_id previousBuffer, bigtime_t previousTime,
388	const media_seek_tag* previousTag)
389{
390	out("ProducerNode::AdditionalBufferRequested\n");
391	release_sem(mBufferProducerSem);
392}
393
394
395void
396ProducerNode::LatencyChanged(const media_source& source,
397	const media_destination& destination, bigtime_t newLatency, uint32 flags)
398{
399	out("ProducerNode::LatencyChanged\n");
400}
401
402
403void
404ProducerNode::InitializeOutput()
405{
406	out("ConsumerNode::InitializeOutput()\n");
407	mOutput.source.port = ControlPort();
408	mOutput.source.id = 0;
409	mOutput.destination = media_destination::null;
410	mOutput.node = Node();
411	mOutput.format.type = B_MEDIA_RAW_AUDIO;
412	mOutput.format.u.raw_audio = media_raw_audio_format::wildcard;
413	mOutput.format.u.raw_audio.format = media_raw_audio_format::B_AUDIO_FLOAT;
414	mOutput.format.u.raw_audio.channel_count = 1;
415	mOutput.format.u.raw_audio.frame_rate = 44100;
416	mOutput.format.u.raw_audio.byte_order = B_HOST_IS_BENDIAN
417		? B_MEDIA_BIG_ENDIAN : B_MEDIA_LITTLE_ENDIAN;
418	strcpy(mOutput.name, "this way out");
419}
420
421
422int32
423ProducerNode::_bufferproducer(void* arg)
424{
425	((ProducerNode*)arg)->BufferProducer();
426	return 0;
427}
428
429
430void
431ProducerNode::BufferProducer()
432{
433	// this thread produces one buffer each two seconds,
434	// and shedules it to be handled one second later than produced
435	// assuming a realtime timesource
436
437	status_t rv;
438	for (;;) {
439		rv = acquire_sem_etc(mBufferProducerSem, 1, B_RELATIVE_TIMEOUT, DELAY);
440		if (rv == B_INTERRUPTED) {
441			continue;
442		} else if (rv == B_OK) {
443			// triggered by AdditionalBufferRequested
444			release_sem(mBufferProducerSem);
445		} else if (rv != B_TIMED_OUT) {
446			// triggered by deleting the semaphore (stop request)
447			break;
448		}
449		if (!mOutputEnabled)
450			continue;
451
452		BBuffer *buffer;
453//		out("ProducerNode: RequestBuffer\n");
454		buffer = mBufferGroup->RequestBuffer(2048);
455		if (!buffer) {
456		}
457		buffer->Header()->start_time = TimeSource()->Now() + DELAY / 2;
458		out("ProducerNode: SendBuffer, sheduled time = %5.4f\n",
459			buffer->Header()->start_time / 1E6);
460		rv = SendBuffer(buffer, mOutput.source, mOutput.destination);
461		if (rv != B_OK) {
462		}
463	}
464}
465