1#include "StreamSource.h"
2#include <string>
3#include "misc.h"
4
5using namespace std;
6
7CFStringRef gStreamSourceName = CFSTR("StreamSource");
8
9const CFIndex kMaximumSize = 2048;
10
11StreamSource::StreamSource(CFReadStreamRef input, Transform* transform, CFStringRef name)
12	: Source(gStreamSourceName, transform, name),
13	mReadStream(input),
14	mReading(dispatch_group_create())
15{
16	dispatch_group_enter(mReading);
17	CFRetain(mReadStream);
18}
19
20void StreamSource::BackgroundActivate()
21{
22	CFIndex result = 0;
23
24	do
25	{
26		// make a buffer big enough to handle the object
27		// NOTE: allocating this on the stack and letting CFDataCreate copy it is _faster_ then malloc and CFDataCreateWithBytes(..., kCFAllactorMalloc) by a fair margin.   At least for 2K chunks.   Retest if changing the size.
28		UInt8 buffer[kMaximumSize];
29
30		result = CFReadStreamRead(mReadStream, buffer, kMaximumSize);
31
32		if (result > 0) // was data returned?
33		{
34			// make the data and send it to the transform
35			CFDataRef data = CFDataCreate(NULL, buffer, result);
36
37			CFErrorRef error = mDestination->SetAttribute(mDestinationName, data);
38
39			CFRelease(data);
40
41			if (error != NULL) // we have a problem, there was probably an abort on the chain
42			{
43				return; // quiesce the source
44			}
45		}
46	} while (result > 0);
47
48	if (result < 0)
49	{
50		// we got an error!
51		CFErrorRef error = CFReadStreamCopyError(mReadStream);
52		mDestination->SetAttribute(mDestinationName, error);
53		if (error)
54		{
55			// NOTE: CF doesn't always tell us about this error.   Arguably it could be better to
56			// "invent" a generic error, but it is a hard argument that we want to crash in CFRelease(NULL)...
57			CFRelease(error);
58		}
59	}
60	else
61	{
62		// send an EOS
63		mDestination->SetAttribute(mDestinationName, NULL); // end of stream
64	}
65}
66
67void StreamSource::DoActivate()
68{
69	CFRetain(mDestination->GetCFObject());
70	dispatch_group_async(mReading, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0), ^{
71		this->BackgroundActivate();
72		CFRelease(mDestination->GetCFObject());
73	});
74	dispatch_group_leave(mReading);
75}
76
77void StreamSource::Finalize()
78{
79	dispatch_group_notify(mReading, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{
80		delete this;
81	});
82}
83
84StreamSource::~StreamSource()
85{
86	CFRelease(mReadStream);
87	mReadStream = NULL;
88	dispatch_release(mReading);
89	mReading = NULL;
90}
91
92
93Boolean StreamSource::Equal(const CoreFoundationObject* object)
94{
95	// not equal if we are not the same object
96	if (Source::Equal(object))
97	{
98		const StreamSource* ss = (StreamSource*) object;
99		return CFEqual(ss->mReadStream, mReadStream);
100	}
101
102	return false;
103}
104
105
106
107CFTypeRef StreamSource::Make(CFReadStreamRef input, Transform* transform, CFStringRef name)
108{
109	return CoreFoundationHolder::MakeHolder(gInternalCFObjectName, new StreamSource(input, transform, name));
110}
111
112
113
114string StreamSource::DebugDescription()
115{
116	string result = Source::DebugDescription() + ": Stream ";
117
118	char buffer[256];
119	snprintf(buffer, sizeof(buffer), "(mReadStream = %p)", mReadStream);
120
121	result += buffer;
122
123	return result;
124}
125