1#include "StreamSource.h" 2#include <string> 3#include "misc.h" 4 5using namespace std; 6 7CFStringRef gStreamSourceName = CFSTR("StreamSource"); 8 9const CFIndex kMaximumSize = 2048; 10 11StreamSource::StreamSource(CFReadStreamRef input, Transform* transform, CFStringRef name) 12 : Source(gStreamSourceName, transform, name), 13 mReadStream(input), 14 mReading(dispatch_group_create()) 15{ 16 dispatch_group_enter(mReading); 17 CFRetain(mReadStream); 18} 19 20void StreamSource::BackgroundActivate() 21{ 22 CFIndex result = 0; 23 24 do 25 { 26 // make a buffer big enough to handle the object 27 // NOTE: allocating this on the stack and letting CFDataCreate copy it is _faster_ then malloc and CFDataCreateWithBytes(..., kCFAllactorMalloc) by a fair margin. At least for 2K chunks. Retest if changing the size. 28 UInt8 buffer[kMaximumSize]; 29 30 result = CFReadStreamRead(mReadStream, buffer, kMaximumSize); 31 32 if (result > 0) // was data returned? 33 { 34 // make the data and send it to the transform 35 CFDataRef data = CFDataCreate(NULL, buffer, result); 36 37 CFErrorRef error = mDestination->SetAttribute(mDestinationName, data); 38 39 CFRelease(data); 40 41 if (error != NULL) // we have a problem, there was probably an abort on the chain 42 { 43 return; // quiesce the source 44 } 45 } 46 } while (result > 0); 47 48 if (result < 0) 49 { 50 // we got an error! 51 CFErrorRef error = CFReadStreamCopyError(mReadStream); 52 mDestination->SetAttribute(mDestinationName, error); 53 if (error) 54 { 55 // NOTE: CF doesn't always tell us about this error. Arguably it could be better to 56 // "invent" a generic error, but it is a hard argument that we want to crash in CFRelease(NULL)... 57 CFRelease(error); 58 } 59 } 60 else 61 { 62 // send an EOS 63 mDestination->SetAttribute(mDestinationName, NULL); // end of stream 64 } 65} 66 67void StreamSource::DoActivate() 68{ 69 CFRetain(mDestination->GetCFObject()); 70 dispatch_group_async(mReading, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0), ^{ 71 this->BackgroundActivate(); 72 CFRelease(mDestination->GetCFObject()); 73 }); 74 dispatch_group_leave(mReading); 75} 76 77void StreamSource::Finalize() 78{ 79 dispatch_group_notify(mReading, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{ 80 delete this; 81 }); 82} 83 84StreamSource::~StreamSource() 85{ 86 CFRelease(mReadStream); 87 mReadStream = NULL; 88 dispatch_release(mReading); 89 mReading = NULL; 90} 91 92 93Boolean StreamSource::Equal(const CoreFoundationObject* object) 94{ 95 // not equal if we are not the same object 96 if (Source::Equal(object)) 97 { 98 const StreamSource* ss = (StreamSource*) object; 99 return CFEqual(ss->mReadStream, mReadStream); 100 } 101 102 return false; 103} 104 105 106 107CFTypeRef StreamSource::Make(CFReadStreamRef input, Transform* transform, CFStringRef name) 108{ 109 return CoreFoundationHolder::MakeHolder(gInternalCFObjectName, new StreamSource(input, transform, name)); 110} 111 112 113 114string StreamSource::DebugDescription() 115{ 116 string result = Source::DebugDescription() + ": Stream "; 117 118 char buffer[256]; 119 snprintf(buffer, sizeof(buffer), "(mReadStream = %p)", mReadStream); 120 121 result += buffer; 122 123 return result; 124} 125