1/******************************************************************************
2
3	File:	BufferStream.h
4
5	Copyright 1995-97, Be Incorporated
6
7******************************************************************************/
8#ifndef _BUFFER_STREAM_H
9#define _BUFFER_STREAM_H
10
11#include <stdlib.h>
12#include <OS.h>
13#include <SupportDefs.h>
14#include <Locker.h>
15#include <Messenger.h>
16
17
18class BSubscriber;
19
20
21/* ================
22   Per-subscriber information.
23   ================ */
24
25struct _sbuf_info;
26
27typedef struct _sub_info {
28  _sub_info			*fNext;		/* next subscriber in the stream*/
29  _sub_info			*fPrev;		/* previous subscriber in the stream */
30  _sbuf_info		*fRel;		/* next buf to be released */
31  _sbuf_info		*fAcq;		/* next buf to be acquired */
32  sem_id			fSem;		/* semaphore used for blocking */
33  bigtime_t			fTotalTime;	/* accumulated time between acq/rel */
34  int32				fHeld;		/* # of buffers acq'd but not yet rel'd */
35  sem_id			fBlockedOn;	/* the semaphore being waited on */
36								/* or B_BAD_SEM_ID if not blocked */
37} *subscriber_id;
38
39
40/* ================
41   Per-buffer information
42   ================ */
43
44typedef struct _sbuf_info {
45  _sbuf_info		*fNext;		/* next "newer" buffer in the chain */
46  subscriber_id		fAvailTo;	/* next subscriber to acquire this buffer */
47  subscriber_id		fHeldBy;	/* subscriber that's acquired this buffer */
48  bigtime_t			fAcqTime;	/* time at which this buffer was acquired */
49  area_id			fAreaID;	/* for system memory allocation calls */
50  char				*fAddress;
51  int32				fSize;     /* usable portion can be smaller than ... */
52  int32				fAreaSize; /* ... the size of the area. */
53  bool				fIsFinal;	/* TRUE => stream is stopping */
54} *buffer_id;
55
56
57/* ================
58   Interface definition for BBufferStream class
59   ================ */
60
61/* We've chosen B_MAX_SUBSCRIBER_COUNT and B_MAX_BUFFER_COUNT to be small
62 * enough so that a BBufferStream structure fits in one 4096 byte page.
63 */
64#define	B_MAX_SUBSCRIBER_COUNT	52
65#define	B_MAX_BUFFER_COUNT		32
66
67class BBufferStream;
68class BBufferStreamManager;
69
70typedef BBufferStream* stream_id;  // for now
71
72
73class BAbstractBufferStream
74{
75public:
76#if __GNUC__ > 3
77	virtual				~BAbstractBufferStream();
78#endif
79
80	virtual	status_t	GetStreamParameters(size_t *bufferSize,
81											int32 *bufferCount,
82											bool *isRunning,
83											int32 *subscriberCount) const;
84
85	virtual	status_t	SetStreamBuffers(size_t bufferSize,
86										 int32 bufferCount);
87
88	virtual	status_t	StartStreaming();
89	virtual	status_t	StopStreaming();
90
91protected:
92
93virtual	void		_ReservedAbstractBufferStream1();
94virtual	void		_ReservedAbstractBufferStream2();
95virtual	void		_ReservedAbstractBufferStream3();
96virtual	void		_ReservedAbstractBufferStream4();
97
98	friend class BSubscriber;
99	friend class BBufferStreamManager;
100
101	virtual stream_id	StreamID() const;
102	/* stream identifier for direct access */
103
104	/* Create or delete a subscriber id for subsequent operations */
105	virtual	status_t	Subscribe(char *name,
106								  subscriber_id *subID,
107								  sem_id semID);
108	virtual	status_t	Unsubscribe(subscriber_id subID);
109
110/* Enter into or quit the stream */
111	virtual	status_t	EnterStream(subscriber_id subID,
112									subscriber_id neighbor,
113									bool before);
114
115	virtual	status_t	ExitStream(subscriber_id subID);
116
117	virtual BMessenger*	Server() const;	/* message pipe to server */
118	status_t 		SendRPC(BMessage* msg, BMessage* reply = NULL) const;
119};
120
121
122class BBufferStream : public BAbstractBufferStream
123{
124public:
125
126						BBufferStream(size_t headerSize,
127									  BBufferStreamManager* controller,
128									  BSubscriber* headFeeder,
129									  BSubscriber* tailFeeder);
130		virtual			~BBufferStream();
131
132/* BBufferStreams are allocated on shared memory pages */
133		void			*operator new(size_t size);
134		void			operator delete(void *stream, size_t size);
135
136/* Return header size */
137		size_t			HeaderSize() const;
138
139/* These four functions are delegated to the stream controller */
140		status_t		GetStreamParameters(size_t *bufferSize,
141											int32 *bufferCount,
142											bool *isRunning,
143											int32 *subscriberCount) const;
144
145		status_t		SetStreamBuffers(size_t bufferSize,
146										 int32 bufferCount);
147
148		status_t		StartStreaming();
149		status_t		StopStreaming();
150
151/* Get the controller for delegation */
152		BBufferStreamManager *StreamManager() const;
153
154/* number of buffers in stream */
155		int32			CountBuffers() const;
156
157/* Create or delete a subscriber id for subsequent operations */
158		status_t		Subscribe(char *name,
159								  subscriber_id *subID,
160								  sem_id semID);
161
162		status_t		Unsubscribe(subscriber_id subID);
163
164/* Enter into or quit the stream */
165		status_t		EnterStream(subscriber_id subID,
166									subscriber_id neighbor,
167									bool before);
168
169		status_t		ExitStream(subscriber_id subID);
170
171/* queries about a subscriber */
172		bool			IsSubscribed(subscriber_id subID);
173		bool			IsEntered(subscriber_id subID);
174
175		status_t		SubscriberInfo(subscriber_id subID,
176									   char** name,
177									   stream_id* streamID,
178									   int32* position);
179
180/* Force an error return of a subscriber if it's blocked */
181		status_t		UnblockSubscriber(subscriber_id subID);
182
183/* Acquire and release a buffer */
184		status_t		AcquireBuffer(subscriber_id subID,
185									  buffer_id *bufID,
186									  bigtime_t timeout);
187		status_t		ReleaseBuffer(subscriber_id subID);
188
189/* Get the attributes of a particular buffer */
190		size_t			BufferSize(buffer_id bufID) const;
191		char			*BufferData(buffer_id bufID) const;
192		bool			IsFinalBuffer(buffer_id bufID) const;
193
194/* Get attributes of a particular subscriber */
195		int32			CountBuffersHeld(subscriber_id subID);
196
197/* Queries for the BBufferStream */
198		int32			CountSubscribers() const;
199		int32			CountEnteredSubscribers() const;
200
201		subscriber_id	FirstSubscriber() const;
202		subscriber_id	LastSubscriber() const;
203		subscriber_id	NextSubscriber(subscriber_id subID);
204		subscriber_id	PrevSubscriber(subscriber_id subID);
205
206/* debugging aids */
207		void			PrintStream();
208		void			PrintBuffers();
209		void			PrintSubscribers();
210
211/* gaining exclusive access to the BBufferStream */
212		bool 			Lock();
213		void			Unlock();
214
215/* introduce a new buffer into the "newest" end of the chain */
216		status_t		AddBuffer(buffer_id bufID);
217
218/* remove a buffer from the "oldest" end of the chain */
219		buffer_id		RemoveBuffer(bool force);
220
221/* allocate a buffer from shared memory and create a bufID for it. */
222		buffer_id		CreateBuffer(size_t size, bool isFinal);
223
224/* deallocate a buffer and returns its bufID to the freelist */
225		void			DestroyBuffer(buffer_id bufID);
226
227/* remove and destroy any "newest" buffers from the head of the chain
228 * that have not yet been claimed by any subscribers.  If there are
229 * no subscribers, this clears the entire chain.
230 */
231		void			RescindBuffers();
232
233/* ================
234   Private member functions that assume locking already has been done.
235   ================ */
236
237private:
238
239virtual	void		_ReservedBufferStream1();
240virtual	void		_ReservedBufferStream2();
241virtual	void		_ReservedBufferStream3();
242virtual	void		_ReservedBufferStream4();
243
244/* initialize the free list of subscribers */
245		void			InitSubscribers();
246
247/* return TRUE if subID appears valid */
248		bool			IsSubscribedSafe(subscriber_id subID) const;
249
250/* return TRUE if subID is entered into the stream */
251		bool			IsEnteredSafe(subscriber_id subID) const;
252
253/* initialize the free list of buffer IDs */
254		void			InitBuffers();
255
256/* Wake a blocked subscriber */
257		status_t		WakeSubscriber(subscriber_id subID);
258
259/* Give subID all the buffers it can get */
260		void			InheritBuffers(subscriber_id subID);
261
262/* Relinquish any buffers held by subID */
263		void			BequeathBuffers(subscriber_id subID);
264
265/* Fast version of ReleaseBuffer() */
266		status_t		ReleaseBufferSafe(subscriber_id subID);
267
268/* Release a buffer to a subscriber */
269		status_t		ReleaseBufferTo(buffer_id bufID, subscriber_id subID);
270
271/* deallocate all buffers */
272		void			FreeAllBuffers();
273
274/* deallocate all subscribers */
275		void			FreeAllSubscribers();
276
277/* ================
278   Private data members
279   ================ */
280
281		BLocker				fLock;
282		area_id				fAreaID;		/* area id for this BBufferStream */
283		BBufferStreamManager	*fStreamManager;
284		BSubscriber			*fHeadFeeder;
285		BSubscriber			*fTailFeeder;
286		size_t				fHeaderSize;
287
288		/* ================
289		   subscribers
290		   ================ */
291
292		_sub_info			*fFreeSubs;		/* free list of subscribers */
293		_sub_info			*fFirstSub;		/* first entered in itinierary */
294		_sub_info			*fLastSub;		/* last entered in itinerary */
295
296		sem_id				fFirstSem;		/* semaphore used by fFirstSub */
297		int32				fSubCount;
298		int32				fEnteredSubCount;
299
300		_sub_info			fSubscribers[B_MAX_SUBSCRIBER_COUNT];
301
302		/* ================
303		   buffers
304		   ================ */
305
306		_sbuf_info			*fFreeBuffers;
307		_sbuf_info			*fOldestBuffer;	/* first in line */
308		_sbuf_info			*fNewestBuffer;	/* fNewest->fNext = NULL */
309		int32				fCountBuffers;
310
311		_sbuf_info			fBuffers[B_MAX_BUFFER_COUNT];
312
313		uint32				_reserved[4];
314};
315
316#endif 	// #ifdef _BUFFER_STREAM_H
317