1/******************************************************************************
2/
3/	File:			SoundConsumer.cpp
4/
5/   Description:	Record sound from some sound-producing Node.
6/
7/	Copyright 1998-1999, Be Incorporated, All Rights Reserved
8/
9******************************************************************************/
10#include "SoundConsumer.h"
11
12#include <new>
13#include <stdio.h>
14
15#include <OS.h>
16#include <scheduler.h>
17#include <Buffer.h>
18#include <TimeSource.h>
19
20#include "AutoDeleter.h"
21
22
23using std::nothrow;
24
25
26namespace BPrivate {
27
28
29//	If we don't mind the format changing to another format while
30//	running, we can define this to 1. Look for the symbol down in the source.
31#define ACCEPT_ANY_FORMAT_CHANGE 0
32
33
//	Tracing is compiled in for debug builds only. Defining NDEBUG -- with or
//	without a value -- means "release": it is the same condition that turns
//	off assert(), and here it also removes all trace output. (Testing with
//	#ifndef rather than "#if !NDEBUG" keeps this valid when NDEBUG is
//	defined to nothing.)
#ifndef NDEBUG
#define TRACE_SOUNDCONSUMER
#endif

//	NODE() traces node-level events; MESSAGE() traces every message the
//	service thread reads from the control port (including each buffer).
//	Both are invoked like fprintf(stream, format, ...). To reduce verbiage
//	in a tracing build, give the macro you want silenced the empty
//	definition from the #else branch. Leaving MESSAGE enabled will kill
//	performance on slower machines, because it prints for each message.
#ifdef TRACE_SOUNDCONSUMER
#define NODE fprintf
#define MESSAGE fprintf
#else
#define NODE(x...)
#define MESSAGE(x...)
#endif
51
52
53//	This structure is the body of a request that we use to
54//	implement SetHooks().
55struct set_hooks_q {
56	port_id reply;
57	void * cookie;
58	SoundProcessFunc process;
59	SoundNotifyFunc notify;
60};
61
62
//	Buffers and Media Kit requests reach a media node as messages, which
//	are generally dispatched for you by your superclasses' HandleMessage
//	implementations. Every message carries a 'type', analogous to the
//	'what' field of a BMessage. The message types below are private to
//	SoundConsumer and SoundProducer. The BeOS reserves a range,
//	0x60000000 to 0x7fffffff, for us to use.
enum {
	//	Tells the service thread to leave its loop.
	MSG_QUIT_NOW = 0x60000000L,
	//	Carries a set_hooks_q describing the new hooks.
	MSG_CHANGE_HOOKS = MSG_QUIT_NOW + 1
};
75
76
77SoundConsumer::SoundConsumer(
78	const char * name,
79	SoundProcessFunc recordFunc,
80	SoundNotifyFunc notifyFunc,
81	void * cookie) :
82	BMediaNode(name ? name : "SoundConsumer"),
83	BBufferConsumer(B_MEDIA_RAW_AUDIO)
84{
85	NODE(stderr, "SoundConsumer::SoundConsumer(%p, %p, %p, %p)\n", name,
86		recordFunc, notifyFunc, cookie);
87
88	if (!name) name = "SoundConsumer";
89
90	//	Set up the hook functions.
91	m_recordHook = recordFunc;
92	m_notifyHook = notifyFunc;
93	m_cookie = cookie;
94
95	//	Create the port that we publish as our Control Port.
96	char pname[32];
97	sprintf(pname, "%.20s Control", name);
98	m_port = create_port(10, pname);
99
100	// Initialize our single media_input. Make sure it knows
101	// the Control Port associated with the destination, and
102	// the index of the destination (since we only have one,
103	// that's trivial).
104	m_input.destination.port = m_port;
105	m_input.destination.id = 1;
106	sprintf(m_input.name, "%.20s Input", name);
107
108	// Set up the timing variables that we'll be using.
109	m_trTimeout = 0LL;
110	m_tpSeekAt = 0;
111	m_tmSeekTo = 0;
112	m_delta = 0;
113	m_seeking = false;
114
115	//	Create, and run, the thread that we use to service
116	// the Control Port.
117	sprintf(pname, "%.20s Service", name);
118	m_thread = spawn_thread(ThreadEntry, pname, 110, this);
119	resume_thread(m_thread);
120}
121
122
123SoundConsumer::~SoundConsumer()
124{
125	NODE(stderr, "SoundConsumer::~SoundConsumer()\n");
126	//	Signal to our thread that it's time to go home.
127	write_port(m_port, MSG_QUIT_NOW, 0, 0);
128	status_t s;
129	while (wait_for_thread(m_thread, &s) == B_INTERRUPTED)
130		NODE(stderr, "wait_for_thread() B_INTERRUPTED\n");
131	delete_port(m_port);
132}
133
134
135status_t
136SoundConsumer::SetHooks(SoundProcessFunc recordFunc, SoundNotifyFunc notifyFunc,
137	void* cookie)
138{
139	//	SetHooks needs to be synchronized with the service thread, else we may
140	//	call the wrong hook function with the wrong cookie, which would be bad.
141	//	Rather than do locking, which is expensive, we streamline the process
142	//	by sending our service thread a request to change the hooks, and waiting
143	//	for the acknowledge.
144	status_t err = B_OK;
145	set_hooks_q cmd;
146	cmd.process = recordFunc;
147	cmd.notify = notifyFunc;
148	cmd.cookie = cookie;
149	//	If we're not in the service thread, we need to round-trip a message.
150	if (find_thread(0) != m_thread) {
151		cmd.reply = create_port(1, "SetHooks reply");
152		//	Send the private message to our service thread.
153		err = write_port(ControlPort(), MSG_CHANGE_HOOKS, &cmd, sizeof(cmd));
154		if (err >= 0) {
155			int32 code;
156			//	Wait for acknowledge from the service thread.
157			err = read_port_etc(cmd.reply, &code, 0, 0, B_TIMEOUT, 6000000LL);
158			if (err > 0) err = 0;
159			NODE(stderr, "SoundConsumer::SetHooks read reply: %#" B_PRIx32
160				"\n", err);
161		}
162		//	Clean up.
163		delete_port(cmd.reply);
164	} else {
165		//	Within the service thread, it's OK to just go ahead and do the
166		//	change.
167		DoHookChange(&cmd);
168	}
169	return err;
170}
171
172
173// #pragma mark -BMediaNode-derived methods
174
175
176port_id
177SoundConsumer::ControlPort() const
178{
179	return m_port;
180}
181
182
183BMediaAddOn*
184SoundConsumer::AddOn(int32 * internal_id) const
185{
186	//	This object is instantiated inside an application.
187	//	Therefore, it has no add-on.
188	if (internal_id) *internal_id = 0;
189	return 0;
190}
191
192
193void
194SoundConsumer::Start(bigtime_t performance_time)
195{
196	//	Since we are a consumer and just blindly accept buffers that are
197	//	thrown at us, we don't need to do anything special in Start()/Stop().
198	//	If we were (also) a producer, we'd have to be more elaborate.
199	//	The only thing we do is immediately perform any queued Seek based on
200	//	the start time, which is the right thing to do (seeing as we were
201	//	Seek()-ed when we weren't started).
202	if (m_seeking) {
203		m_delta = performance_time - m_tmSeekTo;
204		m_seeking = false;
205	}
206	if (m_notifyHook)
207		(*m_notifyHook)(m_cookie, B_WILL_START, performance_time);
208	else
209		Notify(B_WILL_START, performance_time);
210}
211
212
213void
214SoundConsumer::Stop(bigtime_t performance_time, bool immediate)
215{
216	//	Since we are a consumer and just blindly accept buffers that are
217	//	thrown at us, we don't need to do anything special in Start()/Stop().
218	//	If we were (also) a producer, we'd have to be more elaborate.
219	//	Note that this is not strictly in conformance with The Rules,
220	//	but since this is not an add-on Node for use with any application;
221	//	it's a Node over which we have complete control, we can live with
222	//	treating buffers received before the start time or after the stop
223	//	time as any other buffer.
224	if (m_notifyHook)
225		(*m_notifyHook)(m_cookie, B_WILL_STOP, performance_time, immediate);
226	else
227		Notify(B_WILL_STOP, performance_time, immediate);
228}
229
230
231void
232SoundConsumer::Seek(bigtime_t media_time, bigtime_t performance_time)
233{
234	//	Seek() on a consumer just serves to offset the time stamp
235	//	of received buffers passed to our Record hook function.
236	//	In the hook function, you may wish to save those time stamps
237	//	to disk or otherwise store them. You may also want to
238	//	synchronize this node's media time with an upstream
239	//	producer's media time to make this offset meaningful.
240	if (m_notifyHook)
241		(*m_notifyHook)(m_cookie, B_WILL_SEEK, performance_time, media_time);
242	else
243		Notify(B_WILL_SEEK, performance_time, media_time);
244
245	m_tpSeekAt = performance_time;
246	m_tmSeekTo = media_time;
247	m_seeking = true;
248}
249
250
251void
252SoundConsumer::SetRunMode(run_mode mode)
253{
254	if (mode == BMediaNode::B_OFFLINE) {
255		// BMediaNode::B_OFFLINE means we don't need to run in
256		// real time. So, we shouldn't run as a real time
257		// thread.
258		int32 new_prio = suggest_thread_priority(B_OFFLINE_PROCESSING);
259		set_thread_priority(m_thread, new_prio);
260	} else {
261		//	We're running in real time, so we'd better have
262		//	a big enough thread priority to handle it!
263		//	Here's where those magic scheduler values
264		//	come from:
265		//
266		//	* In the worst case, we process one buffer per
267		//	  reschedule (we get rescheduled when we go to
268		//	  look for a message on our Control Port), so
269		//	  in order to keep up with the incoming buffers,
270		//	  the duration of one buffer becomes our
271		//	  scheduling period. If we don't know anything
272		//	  about the buffers, we pick a reasonable
273		//	  default.
274		//	* We're a simple consumer, so we don't have to
275		//	  be too picky about the jitter. Half a period
276		//	  of jitter means that we'd have to get two
277		//	  consecutive worst-case reschedules before
278		//	  we'd fall behind.
279		//	* The amount of time we spend processing is
280		//	  our ProcessingLatency().
281		bigtime_t period = 10000;
282		if (buffer_duration(m_input.format.u.raw_audio) > 0)
283			period = buffer_duration(m_input.format.u.raw_audio);
284
285		//	assuming we're running for 500 us or less per buffer
286		int32 new_prio = suggest_thread_priority(B_AUDIO_RECORDING,
287			period, period / 2, ProcessingLatency());
288		set_thread_priority(m_thread, new_prio);
289	}
290}
291
292
293void
294SoundConsumer::TimeWarp(bigtime_t at_real_time, bigtime_t to_performance_time)
295{
296	//	Since buffers will come pre-time-stamped, we only need to look
297	//	at them, so we can ignore the time warp as a consumer.
298	if (m_notifyHook) {
299		(*m_notifyHook)(m_cookie, B_WILL_TIMEWARP, at_real_time,
300			to_performance_time);
301	} else
302		Notify(B_WILL_TIMEWARP, at_real_time, to_performance_time);
303}
304
305
306void
307SoundConsumer::Preroll()
308{
309	//	There is nothing for us to do in Preroll()
310}
311
312
313void
314SoundConsumer::SetTimeSource(BTimeSource* /* time_source */)
315{
316	//	We don't need to do anything special to take note of the
317	//	fact that the time source changed, because we get our timing
318	//	information from the buffers we receive.
319}
320
321
322status_t
323SoundConsumer::HandleMessage(int32 message, const void* data, size_t size)
324{
325	// Check with each of our superclasses to see if they
326	// understand the message. If none of them do, call
327	// BMediaNode::HandleBadMessage().
328	if (BBufferConsumer::HandleMessage(message, data, size) < 0
329		&& BMediaNode::HandleMessage(message, data, size) < 0) {
330		HandleBadMessage(message, data, size);
331		return B_ERROR;
332	}
333	return B_OK;
334}
335
336
337// #pragma mark - BBufferConsumer-derived methods
338
339
340status_t
341SoundConsumer::AcceptFormat(const media_destination& dest, media_format* format)
342{
343	//	We only accept formats aimed at our single input.
344	if (dest != m_input.destination)
345		return B_MEDIA_BAD_DESTINATION;
346
347	if (format->type <= 0) {
348		//	If no format is specified, we say we want raw audio.
349		format->type = B_MEDIA_RAW_AUDIO;
350		format->u.raw_audio = media_raw_audio_format::wildcard;
351	} else if (format->type != B_MEDIA_RAW_AUDIO) {
352		//	If a non-raw-audio format is specified, we tell the world what
353		//	we want, and that the specified format was unacceptable to us.
354		format->type = B_MEDIA_RAW_AUDIO;
355		format->u.raw_audio = media_raw_audio_format::wildcard;
356		return B_MEDIA_BAD_FORMAT;
357	}
358#if !ACCEPT_ANY_FORMAT_CHANGE
359	//	If we're already connected, and this format doesn't go with the
360	//	format in effect, we dont' accept this new format.
361	if (!format_is_compatible(*format, m_input.format)) {
362		*format = m_input.format;
363		return B_MEDIA_BAD_FORMAT;
364	}
365#endif
366	//	I guess we're OK by now, because we have no particular needs as
367	//	far as frame rate, sample format, etc go.
368	return B_OK;
369}
370
371
372status_t
373SoundConsumer::GetNextInput(int32* cookie, media_input* out_input)
374{
375	NODE(stderr, "SoundConsumer: GetNextInput()\n");
376	//	The "next" is kind of misleading, since it's also used for
377	//	getting the first (and only) input.
378	if (*cookie == 0) {
379		if (m_input.source == media_source::null) {
380			//	If there's no current connection, make sure we return a
381			//	reasonable format telling the world we accept any raw audio.
382			m_input.format.type = B_MEDIA_RAW_AUDIO;
383			m_input.format.u.raw_audio = media_raw_audio_format::wildcard;
384			m_input.node = Node();
385			m_input.destination.port = ControlPort();
386			m_input.destination.id = 1;
387		}
388		*out_input = m_input;
389		*cookie = 1;
390		return B_OK;
391	}
392	//	There's only one input.
393	return B_BAD_INDEX;
394}
395
396
397void
398SoundConsumer::DisposeInputCookie(int32 /* cookie */)
399{
400	//	We didn't allocate any memory or set any state in GetNextInput()
401	//	so this function is a no-op.
402}
403
404
405void
406SoundConsumer::BufferReceived(BBuffer* buffer)
407{
408	NODE(stderr, "SoundConsumer::BufferReceived()\n");
409	//	Whee, a buffer! Update the seek info, if necessary.
410	if (m_seeking && buffer->Header()->start_time >= m_tpSeekAt) {
411		m_delta = m_tpSeekAt - m_tmSeekTo;
412		m_seeking = false;
413	}
414	//	If there is a record hook, let the interested party have at it!
415	if (m_recordHook) {
416		(*m_recordHook)(m_cookie, buffer->Header()->start_time-m_delta,
417			buffer->Data(), buffer->Header()->size_used,
418			m_input.format.u.raw_audio);
419	} else {
420		Record(buffer->Header()->start_time-m_delta, buffer->Data(),
421			buffer->Header()->size_used, m_input.format.u.raw_audio);
422	}
423	//	Buffers should ALWAYS be recycled, else whomever is producing them
424	//	will starve.
425	buffer->Recycle();
426}
427
428
429void
430SoundConsumer::ProducerDataStatus(const media_destination& for_whom,
431	int32 status, bigtime_t at_media_time)
432{
433	if (for_whom != m_input.destination)
434		return;
435
436	//	Tell whomever is interested that the upstream producer will or won't
437	//	send more data in the immediate future.
438	if (m_notifyHook) {
439		(*m_notifyHook)(m_cookie, B_PRODUCER_DATA_STATUS, status,
440			at_media_time);
441	} else
442		Notify(B_PRODUCER_DATA_STATUS, status, at_media_time);
443}
444
445
446status_t
447SoundConsumer::GetLatencyFor(const media_destination& for_whom,
448	bigtime_t* out_latency, media_node_id* out_timesource)
449{
450	//	We only accept requests for the one-and-only input of our Node.
451	if (for_whom != m_input.destination)
452		return B_MEDIA_BAD_DESTINATION;
453
454	//	Tell the world about our latency information (overridable by user).
455	*out_latency = TotalLatency();
456	*out_timesource = TimeSource()->Node().node;
457	return B_OK;
458}
459
460
461status_t
462SoundConsumer::Connected(const media_source& producer,
463	const media_destination& where, const media_format& with_format,
464	media_input* out_input)
465{
466	NODE(stderr, "SoundConsumer::Connected()\n");
467	//	Only accept connection requests when we're not already connected.
468	if (m_input.source != media_source::null)
469		return B_MEDIA_BAD_DESTINATION;
470
471	//	Only accept connection requests on the one-and-only available input.
472	if (where != m_input.destination)
473		return B_MEDIA_BAD_DESTINATION;
474
475	//	Other than that, we accept pretty much anything. The format has been
476	//	pre-cleared through AcceptFormat(), and we accept any format anyway.
477	m_input.source = producer;
478	m_input.format = with_format;
479	//	Tell whomever is interested that there's now a connection.
480	if (m_notifyHook)
481		(*m_notifyHook)(m_cookie, B_CONNECTED, m_input.name);
482	else
483		Notify(B_CONNECTED, m_input.name);
484
485	//	This is the most important line -- return our connection information
486	//	to the world so it can use it!
487	*out_input = m_input;
488	return B_OK;
489}
490
491
492void
493SoundConsumer::Disconnected(const media_source& producer,
494	const media_destination& where)
495{
496	//	We can't disconnect something which isn't us.
497	if (where != m_input.destination)
498		return;
499	//	We can't disconnect from someone who isn't connected to us.
500	if (producer != m_input.source)
501		return;
502	//	Tell the interested party that it's time to leave.
503	if (m_notifyHook)
504		(*m_notifyHook)(m_cookie, B_DISCONNECTED);
505	else
506		Notify(B_DISCONNECTED);
507	//	Mark ourselves as not-connected.
508	m_input.source = media_source::null;
509}
510
511
512status_t
513SoundConsumer::FormatChanged(const media_source& producer,
514	const media_destination& consumer, int32 from_change_count,
515	const media_format& format)
516{
517	NODE(stderr, "SoundConsumer::Connected()\n");
518	//	The up-stream guy feels like changing the format. If we can accept
519	//	arbitrary format changes, we just say "OK". If, however, we're recording
520	//	to a file, that's not such a good idea; we only accept format changes
521	//	that are compatible with the format we're already using. You set this
522	//	behaviour at compile time by defining ACCEPT_ANY_FORMAT_CHANGE to 1 or
523	//	0.
524	status_t err = B_OK;
525#if ACCEPT_ANY_FORMAT_CHANGE
526	media_format fmt(format);
527	err = AcceptFormat(m_input.destination, &fmt);
528#else
529	if (m_input.source != media_source::null) {
530		err = format_is_compatible(format, m_input.format) ? B_OK
531			: B_MEDIA_BAD_FORMAT;
532	}
533#endif
534	if (err >= B_OK) {
535		m_input.format = format;
536		if (m_notifyHook) {
537			(*m_notifyHook)(m_cookie, B_FORMAT_CHANGED,
538				&m_input.format.u.raw_audio);
539		} else
540			Notify(B_FORMAT_CHANGED, &m_input.format.u.raw_audio);
541	}
542	return err;
543}
544
545
546void
547SoundConsumer::DoHookChange(void* msg)
548{
549	//	Tell the old guy we're changing the hooks ...
550	if (m_notifyHook)
551		(*m_notifyHook)(m_cookie, B_HOOKS_CHANGED);
552	else
553		Notify(B_HOOKS_CHANGED);
554
555	//	... and then do it.
556	set_hooks_q * ptr = (set_hooks_q *)msg;
557	m_recordHook = ptr->process;
558	m_notifyHook = ptr->notify;
559	m_cookie = ptr->cookie;
560}
561
562
563status_t
564SoundConsumer::ThreadEntry(void* cookie)
565{
566	SoundConsumer* consumer = (SoundConsumer*)cookie;
567	consumer->ServiceThread();
568	return 0;
569}
570
571
572void
573SoundConsumer::ServiceThread()
574{
575	//	The Big Bad ServiceThread receives messages aimed at this
576	//	Node and dispatches them (typically to HandleMessage()).
577	//	If we were a Producer, we might have to do finicky timing and
578	//	queued Start()/Stop() processing in here. But we ain't.
579
580	//	A media kit message will never be bigger than B_MEDIA_MESSAGE_SIZE.
581	//	Avoid wasing stack space by dynamically allocating at start.
582	char* msg = new (nothrow) char[B_MEDIA_MESSAGE_SIZE];
583	if (msg == NULL)
584		return;
585	//	Make sure we clean up this data when we exit the function.
586	ArrayDeleter<char> _(msg);
587	int bad = 0;
588	while (true) {
589		//	Call read_port_etc() with a timeout derived from a virtual function,
590		//	to allow clients to do special processing if necessary.
591		bigtime_t timeout = Timeout();
592		int32 code = 0;
593		status_t err = read_port_etc(m_port, &code, msg, B_MEDIA_MESSAGE_SIZE,
594			B_TIMEOUT, timeout);
595		MESSAGE(stderr, "SoundConsumer::ServiceThread() port %" B_PRId32
596			" message %#" B_PRIx32 "\n", m_port, code);
597		//	If we received a message, err will be the size of the message
598		//	(including 0).
599		if (err >= B_OK) {
600			//	Real messages reset the timeout time.
601			m_trTimeout = 0;
602			bad = 0;
603			if (code == MSG_QUIT_NOW) {
604				//	Check for our private stop message.
605				if (m_notifyHook)
606					(*m_notifyHook)(m_cookie, B_NODE_DIES, 0);
607				else
608					Notify(B_NODE_DIES, 0);
609				break;
610			} else if (code == MSG_CHANGE_HOOKS) {
611			//	Else check for our private change-hooks message.
612				DoHookChange(msg);
613				//	Write acknowledge to waiting thread.
614				write_port(((set_hooks_q *)msg)->reply, 0, 0, 0);
615			} else {
616				//	Else it has to be a regular media kit message;
617				//	go ahead and dispatch it.
618				HandleMessage(code, msg, err);
619			}
620		} else if (err == B_TIMED_OUT) {
621			//	Timing out means that there was no buffer. Tell the interested
622			//	party.
623			if (m_notifyHook)
624				(*m_notifyHook)(m_cookie, B_OP_TIMED_OUT, timeout);
625			else
626				Notify(B_OP_TIMED_OUT, timeout);
627		} else {
628			//	Other errors are bad.
629			MESSAGE(stderr, "SoundConsumer: error %#" B_PRIx32 "\n", err);
630			bad++;
631			//	If we receive three bad reads with no good messages inbetween,
632			//	things are probably not going to improve (like the port
633			//	disappeared or something) so we call it a day.
634			if (bad > 3) {
635				if (m_notifyHook)
636					(*m_notifyHook)(m_cookie, B_NODE_DIES, bad, err, code, msg);
637				else
638					Notify(B_NODE_DIES, bad, err, code, msg);
639				break;
640			}
641		}
642	}
643}
644
645
646bigtime_t
647SoundConsumer::Timeout()
648{
649	//	Timeout() is called for each call to read_port_etc() in the service
650	//	thread to figure out a reasonable time-out value. The default behaviour
651	//	we've picked is to exponentially back off from one second and upwards.
652	//	While it's true that 44 back-offs will run us out of precision in a
653	//	bigtime_t, the time to actually reach 44 consecutive back-offs is longer
654	//	than the expected market longevity of just about any piece of real
655	//	estate. Is that the sound of an impending year-fifteen-million software
656	//	problem? :-)
657	m_trTimeout = (m_trTimeout < 1000000) ? 1000000 : m_trTimeout*2;
658	return m_trTimeout;
659}
660
661
662bigtime_t
663SoundConsumer::ProcessingLatency()
664{
665	//	We're saying it takes us 500 us to process each buffer. If all we do is
666	//	copy the data, it probably takes much less than that, but it doesn't
667	//	hurt to be slightly conservative.
668	return 500LL;
669}
670
671
672bigtime_t
673SoundConsumer::TotalLatency()
674{
675	//	Had we been a producer that passes buffers on, we'd have to
676	//	include downstream latency in this value. But we are not.
677	return ProcessingLatency();
678}
679
680
681// #pragma mark -
682
683
684void
685SoundConsumer::Record(bigtime_t /*time*/, const void* /*data*/,
686	size_t /*size*/, const media_raw_audio_format& /*format*/)
687{
688	//	If there is no record hook installed, we instead call this function
689	//	for received buffers.
690}
691
692
693void
694SoundConsumer::Notify(int32 /*cause*/, ...)
695{
696	//	If there is no notification hook installed, we instead call this
697	//	function for giving notification of various events.
698}
699
700}
701