1/****************************************************************************** 2 3 File: BufferStream.h 4 5 Copyright 1995-97, Be Incorporated 6 7******************************************************************************/ 8#ifndef _BUFFER_STREAM_H 9#define _BUFFER_STREAM_H 10 11#include <stdlib.h> 12#include <OS.h> 13#include <SupportDefs.h> 14#include <Locker.h> 15#include <Messenger.h> 16 17 18class BSubscriber; 19 20 21/* ================ 22 Per-subscriber information. 23 ================ */ 24 25struct _sbuf_info; 26 27typedef struct _sub_info { 28 _sub_info *fNext; /* next subscriber in the stream*/ 29 _sub_info *fPrev; /* previous subscriber in the stream */ 30 _sbuf_info *fRel; /* next buf to be released */ 31 _sbuf_info *fAcq; /* next buf to be acquired */ 32 sem_id fSem; /* semaphore used for blocking */ 33 bigtime_t fTotalTime; /* accumulated time between acq/rel */ 34 int32 fHeld; /* # of buffers acq'd but not yet rel'd */ 35 sem_id fBlockedOn; /* the semaphore being waited on */ 36 /* or B_BAD_SEM_ID if not blocked */ 37} *subscriber_id; 38 39 40/* ================ 41 Per-buffer information 42 ================ */ 43 44typedef struct _sbuf_info { 45 _sbuf_info *fNext; /* next "newer" buffer in the chain */ 46 subscriber_id fAvailTo; /* next subscriber to acquire this buffer */ 47 subscriber_id fHeldBy; /* subscriber that's acquired this buffer */ 48 bigtime_t fAcqTime; /* time at which this buffer was acquired */ 49 area_id fAreaID; /* for system memory allocation calls */ 50 char *fAddress; 51 int32 fSize; /* usable portion can be smaller than ... */ 52 int32 fAreaSize; /* ... the size of the area. */ 53 bool fIsFinal; /* TRUE => stream is stopping */ 54} *buffer_id; 55 56 57/* ================ 58 Interface definition for BBufferStream class 59 ================ */ 60 61/* We've chosen B_MAX_SUBSCRIBER_COUNT and B_MAX_BUFFER_COUNT to be small 62 * enough so that a BBufferStream structure fits in one 4096 byte page. 63 */ 64#define B_MAX_SUBSCRIBER_COUNT 52 65#define B_MAX_BUFFER_COUNT 32 66 67class BBufferStream; 68class BBufferStreamManager; 69 70typedef BBufferStream* stream_id; // for now 71 72 73class BAbstractBufferStream 74{ 75public: 76#if __GNUC__ > 3 77 virtual ~BAbstractBufferStream(); 78#endif 79 80 virtual status_t GetStreamParameters(size_t *bufferSize, 81 int32 *bufferCount, 82 bool *isRunning, 83 int32 *subscriberCount) const; 84 85 virtual status_t SetStreamBuffers(size_t bufferSize, 86 int32 bufferCount); 87 88 virtual status_t StartStreaming(); 89 virtual status_t StopStreaming(); 90 91protected: 92 93virtual void _ReservedAbstractBufferStream1(); 94virtual void _ReservedAbstractBufferStream2(); 95virtual void _ReservedAbstractBufferStream3(); 96virtual void _ReservedAbstractBufferStream4(); 97 98 friend class BSubscriber; 99 friend class BBufferStreamManager; 100 101 virtual stream_id StreamID() const; 102 /* stream identifier for direct access */ 103 104 /* Create or delete a subscriber id for subsequent operations */ 105 virtual status_t Subscribe(char *name, 106 subscriber_id *subID, 107 sem_id semID); 108 virtual status_t Unsubscribe(subscriber_id subID); 109 110/* Enter into or quit the stream */ 111 virtual status_t EnterStream(subscriber_id subID, 112 subscriber_id neighbor, 113 bool before); 114 115 virtual status_t ExitStream(subscriber_id subID); 116 117 virtual BMessenger* Server() const; /* message pipe to server */ 118 status_t SendRPC(BMessage* msg, BMessage* reply = NULL) const; 119}; 120 121 122class BBufferStream : public BAbstractBufferStream 123{ 124public: 125 126 BBufferStream(size_t headerSize, 127 BBufferStreamManager* controller, 128 BSubscriber* headFeeder, 129 BSubscriber* tailFeeder); 130 virtual ~BBufferStream(); 131 132/* BBufferStreams are allocated on shared memory pages */ 133 void *operator new(size_t size); 134 void operator delete(void *stream, size_t size); 135 136/* Return header size */ 137 size_t HeaderSize() const; 138 139/* These four functions are delegated to the stream controller */ 140 status_t GetStreamParameters(size_t *bufferSize, 141 int32 *bufferCount, 142 bool *isRunning, 143 int32 *subscriberCount) const; 144 145 status_t SetStreamBuffers(size_t bufferSize, 146 int32 bufferCount); 147 148 status_t StartStreaming(); 149 status_t StopStreaming(); 150 151/* Get the controller for delegation */ 152 BBufferStreamManager *StreamManager() const; 153 154/* number of buffers in stream */ 155 int32 CountBuffers() const; 156 157/* Create or delete a subscriber id for subsequent operations */ 158 status_t Subscribe(char *name, 159 subscriber_id *subID, 160 sem_id semID); 161 162 status_t Unsubscribe(subscriber_id subID); 163 164/* Enter into or quit the stream */ 165 status_t EnterStream(subscriber_id subID, 166 subscriber_id neighbor, 167 bool before); 168 169 status_t ExitStream(subscriber_id subID); 170 171/* queries about a subscriber */ 172 bool IsSubscribed(subscriber_id subID); 173 bool IsEntered(subscriber_id subID); 174 175 status_t SubscriberInfo(subscriber_id subID, 176 char** name, 177 stream_id* streamID, 178 int32* position); 179 180/* Force an error return of a subscriber if it's blocked */ 181 status_t UnblockSubscriber(subscriber_id subID); 182 183/* Acquire and release a buffer */ 184 status_t AcquireBuffer(subscriber_id subID, 185 buffer_id *bufID, 186 bigtime_t timeout); 187 status_t ReleaseBuffer(subscriber_id subID); 188 189/* Get the attributes of a particular buffer */ 190 size_t BufferSize(buffer_id bufID) const; 191 char *BufferData(buffer_id bufID) const; 192 bool IsFinalBuffer(buffer_id bufID) const; 193 194/* Get attributes of a particular subscriber */ 195 int32 CountBuffersHeld(subscriber_id subID); 196 197/* Queries for the BBufferStream */ 198 int32 CountSubscribers() const; 199 int32 CountEnteredSubscribers() const; 200 201 subscriber_id FirstSubscriber() const; 202 subscriber_id LastSubscriber() const; 203 subscriber_id NextSubscriber(subscriber_id subID); 204 subscriber_id PrevSubscriber(subscriber_id subID); 205 206/* debugging aids */ 207 void PrintStream(); 208 void PrintBuffers(); 209 void PrintSubscribers(); 210 211/* gaining exclusive access to the BBufferStream */ 212 bool Lock(); 213 void Unlock(); 214 215/* introduce a new buffer into the "newest" end of the chain */ 216 status_t AddBuffer(buffer_id bufID); 217 218/* remove a buffer from the "oldest" end of the chain */ 219 buffer_id RemoveBuffer(bool force); 220 221/* allocate a buffer from shared memory and create a bufID for it. */ 222 buffer_id CreateBuffer(size_t size, bool isFinal); 223 224/* deallocate a buffer and returns its bufID to the freelist */ 225 void DestroyBuffer(buffer_id bufID); 226 227/* remove and destroy any "newest" buffers from the head of the chain 228 * that have not yet been claimed by any subscribers. If there are 229 * no subscribers, this clears the entire chain. 230 */ 231 void RescindBuffers(); 232 233/* ================ 234 Private member functions that assume locking already has been done. 235 ================ */ 236 237private: 238 239virtual void _ReservedBufferStream1(); 240virtual void _ReservedBufferStream2(); 241virtual void _ReservedBufferStream3(); 242virtual void _ReservedBufferStream4(); 243 244/* initialize the free list of subscribers */ 245 void InitSubscribers(); 246 247/* return TRUE if subID appears valid */ 248 bool IsSubscribedSafe(subscriber_id subID) const; 249 250/* return TRUE if subID is entered into the stream */ 251 bool IsEnteredSafe(subscriber_id subID) const; 252 253/* initialize the free list of buffer IDs */ 254 void InitBuffers(); 255 256/* Wake a blocked subscriber */ 257 status_t WakeSubscriber(subscriber_id subID); 258 259/* Give subID all the buffers it can get */ 260 void InheritBuffers(subscriber_id subID); 261 262/* Relinquish any buffers held by subID */ 263 void BequeathBuffers(subscriber_id subID); 264 265/* Fast version of ReleaseBuffer() */ 266 status_t ReleaseBufferSafe(subscriber_id subID); 267 268/* Release a buffer to a subscriber */ 269 status_t ReleaseBufferTo(buffer_id bufID, subscriber_id subID); 270 271/* deallocate all buffers */ 272 void FreeAllBuffers(); 273 274/* deallocate all subscribers */ 275 void FreeAllSubscribers(); 276 277/* ================ 278 Private data members 279 ================ */ 280 281 BLocker fLock; 282 area_id fAreaID; /* area id for this BBufferStream */ 283 BBufferStreamManager *fStreamManager; 284 BSubscriber *fHeadFeeder; 285 BSubscriber *fTailFeeder; 286 size_t fHeaderSize; 287 288 /* ================ 289 subscribers 290 ================ */ 291 292 _sub_info *fFreeSubs; /* free list of subscribers */ 293 _sub_info *fFirstSub; /* first entered in itinierary */ 294 _sub_info *fLastSub; /* last entered in itinerary */ 295 296 sem_id fFirstSem; /* semaphore used by fFirstSub */ 297 int32 fSubCount; 298 int32 fEnteredSubCount; 299 300 _sub_info fSubscribers[B_MAX_SUBSCRIBER_COUNT]; 301 302 /* ================ 303 buffers 304 ================ */ 305 306 _sbuf_info *fFreeBuffers; 307 _sbuf_info *fOldestBuffer; /* first in line */ 308 _sbuf_info *fNewestBuffer; /* fNewest->fNext = NULL */ 309 int32 fCountBuffers; 310 311 _sbuf_info fBuffers[B_MAX_BUFFER_COUNT]; 312 313 uint32 _reserved[4]; 314}; 315 316#endif // #ifdef _BUFFER_STREAM_H 317