1/*
2 * Copyright 2007-2011, Ingo Weinhold, ingo_weinhold@gmx.de.
3 * Copyright 2003-2010, Axel Dörfler, axeld@pinc-software.de.
4 * Distributed under the terms of the MIT License.
5 */
6
7
8#include "fifo.h"
9
10#include <limits.h>
11#include <stdio.h>
12#include <stdlib.h>
13#include <string.h>
14#include <sys/stat.h>
15
16#include <new>
17
18#include <KernelExport.h>
19#include <NodeMonitor.h>
20#include <Select.h>
21
22#include <condition_variable.h>
23#include <debug.h>
24#include <khash.h>
25#include <lock.h>
26#include <select_sync_pool.h>
27#include <team.h>
28#include <thread.h>
29#include <util/DoublyLinkedList.h>
30#include <util/AutoLock.h>
31#include <util/ring_buffer.h>
32#include <vfs.h>
33#include <vm/vm.h>
34
35
36//#define TRACE_FIFO
37#ifdef TRACE_FIFO
38#	define TRACE(x...) dprintf(x)
39#else
40#	define TRACE(x...)
41#endif
42
43
44#define PIPEFS_HASH_SIZE		16
45#define PIPEFS_MAX_BUFFER_SIZE	32768
46
47
48// TODO: PIPE_BUF is supposed to be defined somewhere else.
49#define PIPE_BUF	_POSIX_PIPE_BUF
50
51
52namespace fifo {
53
54
55struct file_cookie;
56class Inode;
57
58
// Thin wrapper around the kernel ring_buffer that tolerates a NULL buffer:
// the buffer is only allocated while the FIFO is active (has had both a
// reader and a writer) and is freed again when it becomes inactive.
class RingBuffer {
public:
								RingBuffer();
								~RingBuffer();

			// Lazily allocates the underlying buffer
			// (PIPEFS_MAX_BUFFER_SIZE bytes); B_OK if it already exists.
			status_t			CreateBuffer();
			void				DeleteBuffer();

			// Kernel-memory copy variants.
			size_t				Write(const void* buffer, size_t length);
			size_t				Read(void* buffer, size_t length);
			// Userland copy variants; may fail with a negative error code
			// if the user address is invalid.
			ssize_t				UserWrite(const void* buffer, ssize_t length);
			ssize_t				UserRead(void* buffer, ssize_t length);

			// Bytes currently buffered / space currently free (0 if no
			// buffer is allocated).
			size_t				Readable() const;
			size_t				Writable() const;

private:
			struct ring_buffer*	fBuffer;
};
78
79
// Represents one blocked (or about-to-block) reader. Requests are queued on
// the Inode so readers are served in FIFO order, and so other threads can
// wake the owning thread via Notify().
class ReadRequest : public DoublyLinkedListLinkImpl<ReadRequest> {
public:
	ReadRequest(file_cookie* cookie)
		:
		fThread(thread_get_current_thread()),
		fCookie(cookie),
		fNotified(true)
	{
		B_INITIALIZE_SPINLOCK(&fLock);
	}

	// Marks whether the thread still needs a wake-up. Set to false right
	// before blocking and back to true before re-acquiring the request
	// mutex (see Inode::WaitForReadRequest()), so a late Notify() cannot
	// interfere with mutex_lock().
	void SetNotified(bool notified)
	{
		InterruptsSpinLocker _(fLock);
		fNotified = notified;
	}

	// Unblocks the owning thread (at most once per SetNotified(false));
	// \a status becomes the thread's block result.
	void Notify(status_t status = B_OK)
	{
		InterruptsSpinLocker _(fLock);
		TRACE("ReadRequest %p::Notify(), fNotified %d\n", this, fNotified);

		if (!fNotified) {
			// thread_unblock_locked() requires the scheduler lock
			SpinLocker schedulerLocker(gSchedulerLock);
			thread_unblock_locked(fThread, status);
			fNotified = true;
		}
	}

	file_cookie* Cookie() const
	{
		return fCookie;
	}

private:
	spinlock		fLock;		// guards fNotified
	Thread*			fThread;	// the (potentially) blocked reader thread
	file_cookie*	fCookie;	// cookie the read was issued through
	volatile bool	fNotified;	// true once the thread has been unblocked
};
120
121
// Queue entry for a blocked writer. Records how many bytes must become
// writable before waking the writer is worthwhile (see
// Inode::NotifyBytesRead()).
class WriteRequest : public DoublyLinkedListLinkImpl<WriteRequest> {
public:
	WriteRequest(size_t minimalWriteCount)
		:
		fMinimalWriteCount(minimalWriteCount)
	{
	}

	size_t MinimalWriteCount() const
	{
		return fMinimalWriteCount;
	}

private:
	size_t	fMinimalWriteCount;
};
138
139
140typedef DoublyLinkedList<ReadRequest> ReadRequestList;
141typedef DoublyLinkedList<WriteRequest> WriteRequestList;
142
143
// Core FIFO state: the ring buffer, reader/writer accounting, the queues of
// blocked readers/writers, and select() support. All non-trivial methods
// require fRequestLock to be held (or acquire it themselves).
class Inode {
public:
								Inode();
								~Inode();

			status_t			InitCheck();

			// A FIFO is "active" once it has had both a reader and a
			// writer; the ring buffer exists only while active.
			bool				IsActive() const { return fActive; }
			timespec			CreationTime() const { return fCreationTime; }
			void				SetCreationTime(timespec creationTime)
									{ fCreationTime = creationTime; }
			timespec			ModificationTime() const
									{ return fModificationTime; }
			void				SetModificationTime(timespec modificationTime)
									{ fModificationTime = modificationTime; }

			// Big lock serializing all request processing on this FIFO.
			mutex*				RequestLock() { return &fRequestLock; }

			// Blocking data transfer; both must be called with the
			// request lock held (they drop it while waiting).
			status_t			WriteDataToBuffer(const void* data,
									size_t* _length, bool nonBlocking);
			status_t			ReadDataFromBuffer(void* data, size_t* _length,
									bool nonBlocking, ReadRequest& request);
			size_t				BytesAvailable() const
									{ return fBuffer.Readable(); }
			size_t				BytesWritable() const
									{ return fBuffer.Writable(); }

			void				AddReadRequest(ReadRequest& request);
			void				RemoveReadRequest(ReadRequest& request);
			status_t			WaitForReadRequest(ReadRequest& request);

			// Wake-up helpers, called with the request lock held.
			void				NotifyBytesRead(size_t bytes);
			void				NotifyReadDone();
			void				NotifyBytesWritten(size_t bytes);
			void				NotifyEndClosed(bool writer);

			void				Open(int openMode);
			void				Close(int openMode, file_cookie* cookie);
			int32				ReaderCount() const { return fReaderCount; }
			int32				WriterCount() const { return fWriterCount; }

			status_t			Select(uint8 event, selectsync* sync,
									int openMode);
			status_t			Deselect(uint8 event, selectsync* sync,
									int openMode);

private:
			timespec			fCreationTime;
			timespec			fModificationTime;

			RingBuffer			fBuffer;

			// blocked readers (FIFO order) and writers
			ReadRequestList		fReadRequests;
			WriteRequestList	fWriteRequests;

			mutex				fRequestLock;

			// writers block here; readers block via ReadRequest
			ConditionVariable	fWriteCondition;

			int32				fReaderCount;
			int32				fWriterCount;
			bool				fActive;

			select_sync_pool*	fReadSelectSyncPool;
			select_sync_pool*	fWriteSelectSyncPool;
};
210
211
// An Inode that wraps ("overlays") an existing file system vnode. The FIFO
// vnode ops forward the operations they don't handle to this super vnode.
class FIFOInode : public Inode {
public:
	FIFOInode(fs_vnode* vnode)
		:
		Inode(),
		fSuperVnode(*vnode)
	{
	}

	fs_vnode*	SuperVnode() { return &fSuperVnode; }

private:
	fs_vnode	fSuperVnode;	// copy of the wrapped vnode
};
226
227
// Per-open-file state; only the open mode (access mode plus status flags
// like O_NONBLOCK) is needed.
struct file_cookie {
	int				open_mode;
};
231
232
233// #pragma mark -
234
235
// The underlying buffer is created lazily via CreateBuffer().
RingBuffer::RingBuffer()
	:
	fBuffer(NULL)
{
}
241
242
RingBuffer::~RingBuffer()
{
	DeleteBuffer();
}
247
248
249status_t
250RingBuffer::CreateBuffer()
251{
252	if (fBuffer != NULL)
253		return B_OK;
254
255	fBuffer = create_ring_buffer(PIPEFS_MAX_BUFFER_SIZE);
256	return fBuffer != NULL ? B_OK : B_NO_MEMORY;
257}
258
259
260void
261RingBuffer::DeleteBuffer()
262{
263	if (fBuffer != NULL) {
264		delete_ring_buffer(fBuffer);
265		fBuffer = NULL;
266	}
267}
268
269
/*!	Copies up to \a length bytes from kernel memory \a buffer into the ring
	buffer; returns the number of bytes actually written.
	NOTE(review): when no buffer is allocated this returns B_NO_MEMORY
	through a size_t return type, which a caller would see as a huge byte
	count -- callers must ensure the buffer exists (no caller in this file
	hits that path); confirm before relying on the error value.
*/
inline size_t
RingBuffer::Write(const void* buffer, size_t length)
{
	if (fBuffer == NULL)
		return B_NO_MEMORY;

	return ring_buffer_write(fBuffer, (const uint8*)buffer, length);
}
278
279
/*!	Copies up to \a length bytes from the ring buffer into kernel memory
	\a buffer; returns the number of bytes actually read.
	NOTE(review): like Write(), returns B_NO_MEMORY through a size_t return
	type when no buffer is allocated -- see the caveat there.
*/
inline size_t
RingBuffer::Read(void* buffer, size_t length)
{
	if (fBuffer == NULL)
		return B_NO_MEMORY;

	return ring_buffer_read(fBuffer, (uint8*)buffer, length);
}
288
289
290inline ssize_t
291RingBuffer::UserWrite(const void* buffer, ssize_t length)
292{
293	if (fBuffer == NULL)
294		return B_NO_MEMORY;
295
296	return ring_buffer_user_write(fBuffer, (const uint8*)buffer, length);
297}
298
299
300inline ssize_t
301RingBuffer::UserRead(void* buffer, ssize_t length)
302{
303	if (fBuffer == NULL)
304		return B_NO_MEMORY;
305
306	return ring_buffer_user_read(fBuffer, (uint8*)buffer, length);
307}
308
309
310inline size_t
311RingBuffer::Readable() const
312{
313	return fBuffer != NULL ? ring_buffer_readable(fBuffer) : 0;
314}
315
316
317inline size_t
318RingBuffer::Writable() const
319{
320	return fBuffer != NULL ? ring_buffer_writable(fBuffer) : 0;
321}
322
323
324//	#pragma mark -
325
326
327Inode::Inode()
328	:
329	fReadRequests(),
330	fWriteRequests(),
331	fReaderCount(0),
332	fWriterCount(0),
333	fActive(false),
334	fReadSelectSyncPool(NULL),
335	fWriteSelectSyncPool(NULL)
336{
337	fWriteCondition.Publish(this, "pipe");
338	mutex_init(&fRequestLock, "pipe request");
339
340	bigtime_t time = real_time_clock();
341	fModificationTime.tv_sec = time / 1000000;
342	fModificationTime.tv_nsec = (time % 1000000) * 1000;
343	fCreationTime = fModificationTime;
344}
345
346
Inode::~Inode()
{
	// tear down in reverse order of construction
	fWriteCondition.Unpublish();
	mutex_destroy(&fRequestLock);
}
352
353
/*!	Post-construction check; currently nothing can fail, so always B_OK. */
status_t
Inode::InitCheck()
{
	return B_OK;
}
359
360
/*!	Writes the specified data bytes to the inode's ring buffer. The
	request lock must be held when calling this method (it is temporarily
	released while blocking).
	Notifies readers if necessary, so that blocking readers will get started.
	Returns B_OK for success, B_BAD_ADDRESS if copying from the buffer failed,
	and various semaphore errors (like B_WOULD_BLOCK in non-blocking mode). If
	the returned length is > 0, the returned error code can be ignored.
	Sends SIGPIPE and returns EPIPE when all readers are gone before
	anything was written. On return \a *_length is the byte count written.
*/
status_t
Inode::WriteDataToBuffer(const void* _data, size_t* _length, bool nonBlocking)
{
	const uint8* data = (const uint8*)_data;
	size_t dataSize = *_length;
	size_t& written = *_length;
	written = 0;

	TRACE("Inode %p::WriteDataToBuffer(data = %p, bytes = %zu)\n", this, data,
		dataSize);

	// According to the standard, request up to PIPE_BUF bytes shall not be
	// interleaved with other writer's data.
	size_t minToWrite = 1;
	if (dataSize <= PIPE_BUF)
		minToWrite = dataSize;

	while (dataSize > 0) {
		// Wait until enough space in the buffer is available. Also blocks
		// while the FIFO is not yet active (no reader has opened it), but
		// only as long as readers exist -- once the last reader is gone
		// the loop exits and EPIPE is returned below.
		while (!fActive
				|| (fBuffer.Writable() < minToWrite && fReaderCount > 0)) {
			if (nonBlocking)
				return B_WOULD_BLOCK;

			ConditionVariableEntry entry;
			entry.Add(this);

			// Queue our minimal write count so NotifyBytesRead() knows
			// when waking writers is worthwhile.
			WriteRequest request(minToWrite);
			fWriteRequests.Add(&request);

			// drop the request lock while blocking
			mutex_unlock(&fRequestLock);
			status_t status = entry.Wait(B_CAN_INTERRUPT);
			mutex_lock(&fRequestLock);

			fWriteRequests.Remove(&request);

			if (status != B_OK)
				return status;
		}

		// write only as long as there are readers left
		if (fActive && fReaderCount == 0) {
			// POSIX: writing to a pipe with no readers raises SIGPIPE --
			// but only if nothing has been written yet
			if (written == 0)
				send_signal(find_thread(NULL), SIGPIPE);
			return EPIPE;
		}

		// write as much as we can

		size_t toWrite = (fActive ? fBuffer.Writable() : 0);
		if (toWrite > dataSize)
			toWrite = dataSize;

		if (toWrite > 0 && fBuffer.UserWrite(data, toWrite) < B_OK)
			return B_BAD_ADDRESS;

		data += toWrite;
		dataSize -= toWrite;
		written += toWrite;

		// wake readers / read-select()ors now that data are available
		NotifyBytesWritten(toWrite);
	}

	return B_OK;
}
433
434
/*!	Reads up to \a *_length bytes from the ring buffer into \a data (a
	userland buffer). The request lock must be held (it is temporarily
	released while blocking); \a request must already be queued via
	AddReadRequest(). Readers are served strictly in queue order.
	Returns B_OK with \a *_length == 0 on end-of-file (active FIFO, no
	writers, nothing buffered), B_WOULD_BLOCK in non-blocking mode,
	B_BAD_ADDRESS if the user copy fails, or the wait error.
*/
status_t
Inode::ReadDataFromBuffer(void* data, size_t* _length, bool nonBlocking,
	ReadRequest& request)
{
	size_t dataSize = *_length;
	*_length = 0;

	// wait until our request is first in queue
	status_t error;
	if (fReadRequests.Head() != &request) {
		if (nonBlocking)
			return B_WOULD_BLOCK;

		TRACE("Inode %p::%s(): wait for request %p to become the first "
			"request.\n", this, __FUNCTION__, &request);

		error = WaitForReadRequest(request);
		if (error != B_OK)
			return error;
	}

	// wait until data are available
	while (fBuffer.Readable() == 0) {
		if (nonBlocking)
			return B_WOULD_BLOCK;

		// writer end closed and nothing buffered: report end-of-file
		if (fActive && fWriterCount == 0)
			return B_OK;

		TRACE("Inode %p::%s(): wait for data, request %p\n", this, __FUNCTION__,
			&request);

		error = WaitForReadRequest(request);
		if (error != B_OK)
			return error;
	}

	// read as much as we can
	size_t toRead = fBuffer.Readable();
	if (toRead > dataSize)
		toRead = dataSize;

	if (fBuffer.UserRead(data, toRead) < B_OK)
		return B_BAD_ADDRESS;

	// wake writers (and write-select()ors) that may now have room
	NotifyBytesRead(toRead);

	*_length = toRead;

	return B_OK;
}
486
487
/*!	Appends \a request to the read request queue. The request lock must be
	held.
*/
void
Inode::AddReadRequest(ReadRequest& request)
{
	fReadRequests.Add(&request);
}
493
494
/*!	Removes \a request from the read request queue. The request lock must
	be held.
*/
void
Inode::RemoveReadRequest(ReadRequest& request)
{
	fReadRequests.Remove(&request);
}
500
501
/*!	Blocks the calling thread until \a request is notified (see
	ReadRequest::Notify()). The request lock must be held on entry; it is
	released while blocked and re-acquired before returning. Returns the
	thread's block result (B_OK, or e.g. B_INTERRUPTED).
*/
status_t
Inode::WaitForReadRequest(ReadRequest& request)
{
	// add the entry to wait on
	thread_prepare_to_block(thread_get_current_thread(), B_CAN_INTERRUPT,
		THREAD_BLOCK_TYPE_OTHER, "fifo read request");

	// from now on a Notify() will actually unblock us
	request.SetNotified(false);

	// wait
	mutex_unlock(&fRequestLock);
	status_t status = thread_block();

	// Before going to lock again, we need to make sure no one tries to
	// unblock us. Otherwise that would screw with mutex_lock().
	request.SetNotified(true);

	mutex_lock(&fRequestLock);

	return status;
}
523
524
/*!	Called after \a bytes have been consumed from the buffer. Wakes write
	select()ors if the buffer was completely full before, and wakes blocked
	writers whose minimal write count has just become satisfiable. The
	request lock must be held.
*/
void
Inode::NotifyBytesRead(size_t bytes)
{
	// notify writer, if something can be written now
	size_t writable = fBuffer.Writable();
	if (bytes > 0) {
		// notify select()ors only, if nothing was writable before
		if (writable == bytes) {
			if (fWriteSelectSyncPool)
				notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
		}

		// If any of the waiting writers has a minimal write count that has
		// now become satisfied, we notify all of them (condition variables
		// don't support doing that selectively).
		WriteRequest* request;
		WriteRequestList::Iterator iterator = fWriteRequests.GetIterator();
		while ((request = iterator.Next()) != NULL) {
			size_t minWriteCount = request->MinimalWriteCount();
			// satisfiable now, but was not before this read
			if (minWriteCount > 0 && minWriteCount <= writable
					&& minWriteCount > writable - bytes) {
				fWriteCondition.NotifyAll();
				break;
			}
		}
	}
}
552
553
554void
555Inode::NotifyReadDone()
556{
557	// notify next reader, if there's still something to be read
558	if (fBuffer.Readable() > 0) {
559		if (ReadRequest* request = fReadRequests.First())
560			request->Notify();
561	}
562}
563
564
565void
566Inode::NotifyBytesWritten(size_t bytes)
567{
568	// notify reader, if something can be read now
569	if (bytes > 0 && fBuffer.Readable() == bytes) {
570		if (fReadSelectSyncPool)
571			notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ);
572
573		if (ReadRequest* request = fReadRequests.First())
574			request->Notify();
575	}
576}
577
578
/*!	Called when the last writer (\a writer == true) or the last reader
	(\a writer == false) has been closed, to wake up the opposite end. The
	request lock must be held.
*/
void
Inode::NotifyEndClosed(bool writer)
{
	TRACE("Inode %p::%s(%s)\n", this, __FUNCTION__,
		writer ? "writer" : "reader");

	if (writer) {
		// Our last writer has been closed; if the pipe
		// contains no data, unlock all waiting readers
		// (they will then see end-of-file)
		TRACE("  buffer readable: %zu\n", fBuffer.Readable());
		if (fBuffer.Readable() == 0) {
			ReadRequestList::Iterator iterator = fReadRequests.GetIterator();
			while (ReadRequest* request = iterator.Next())
				request->Notify();

			if (fReadSelectSyncPool)
				notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ);
		}
	} else {
		// Last reader is gone. Wake up all writers.
		// (they will then raise SIGPIPE/EPIPE)
		fWriteCondition.NotifyAll();

		if (fWriteSelectSyncPool) {
			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_ERROR);
		}
	}
}
607
608
/*!	Accounts for a newly opened file descriptor. O_WRONLY opens count as
	writers; O_RDONLY and O_RDWR opens count as readers (Close() uses the
	same classification). Once both ends are present the ring buffer is
	created, the FIFO becomes active, and blocked/select()ing writers are
	woken.
*/
void
Inode::Open(int openMode)
{
	MutexLocker locker(RequestLock());

	if ((openMode & O_ACCMODE) == O_WRONLY)
		fWriterCount++;

	if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR)
		fReaderCount++;

	if (fReaderCount > 0 && fWriterCount > 0) {
		TRACE("Inode %p::Open(): fifo becomes active\n", this);
		fBuffer.CreateBuffer();
		fActive = true;

		// notify all waiting writers that they can start
		if (fWriteSelectSyncPool)
			notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
		fWriteCondition.NotifyAll();
	}
}
631
632
/*!	Accounts for a closed file descriptor: aborts reads issued through
	\a cookie, notifies the opposite end when the last reader or writer of
	this FIFO disappears, and deactivates the FIFO (freeing the ring
	buffer) once both counts reach zero.
*/
void
Inode::Close(int openMode, file_cookie* cookie)
{
	TRACE("Inode %p::Close(openMode = %d)\n", this, openMode);

	MutexLocker locker(RequestLock());

	// Notify all currently reading file descriptors
	// (their read is aborted with B_FILE_ERROR)
	ReadRequestList::Iterator iterator = fReadRequests.GetIterator();
	while (ReadRequest* request = iterator.Next()) {
		if (request->Cookie() == cookie)
			request->Notify(B_FILE_ERROR);
	}

	if ((openMode & O_ACCMODE) == O_WRONLY && --fWriterCount == 0)
		NotifyEndClosed(true);

	if ((openMode & O_ACCMODE) == O_RDONLY
		|| (openMode & O_ACCMODE) == O_RDWR) {
		if (--fReaderCount == 0)
			NotifyEndClosed(false);
	}

	if (fWriterCount == 0) {
		// Notify any still reading writers to stop
		// TODO: This only works reliable if there is only one writer - we could
		// do the same thing done for the read requests.
		fWriteCondition.NotifyAll(B_FILE_ERROR);
	}

	if (fReaderCount == 0 && fWriterCount == 0) {
		fActive = false;
		fBuffer.DeleteBuffer();
	}
}
668
669
/*!	Registers \a sync for \a event on the pool matching the cookie's access
	mode, and immediately delivers the event if its condition already
	holds. O_RDWR cookies are rejected (B_NOT_ALLOWED) -- the two pools are
	strictly per-direction. The request lock must be held (see
	fifo_select()).
*/
status_t
Inode::Select(uint8 event, selectsync* sync, int openMode)
{
	bool writer = true;
	select_sync_pool** pool;
	if ((openMode & O_RWMASK) == O_RDONLY) {
		pool = &fReadSelectSyncPool;
		writer = false;
	} else if ((openMode & O_RWMASK) == O_WRONLY) {
		pool = &fWriteSelectSyncPool;
	} else
		return B_NOT_ALLOWED;

	if (add_select_sync_pool_entry(pool, sync, event) != B_OK)
		return B_ERROR;

	// signal right away, if the condition holds already
	if (writer) {
		// writable, or no readers left (write would fail immediately)
		if ((event == B_SELECT_WRITE
				&& (fBuffer.Writable() > 0 || fReaderCount == 0))
			|| (event == B_SELECT_ERROR && fReaderCount == 0)) {
			return notify_select_event(sync, event);
		}
	} else {
		// readable, or no writers left (read would return EOF immediately)
		if (event == B_SELECT_READ
				&& (fBuffer.Readable() > 0 || fWriterCount == 0)) {
			return notify_select_event(sync, event);
		}
	}

	return B_OK;
}
702
703
704status_t
705Inode::Deselect(uint8 event, selectsync* sync, int openMode)
706{
707	select_sync_pool** pool;
708	if ((openMode & O_RWMASK) == O_RDONLY) {
709		pool = &fReadSelectSyncPool;
710	} else if ((openMode & O_RWMASK) == O_WRONLY) {
711		pool = &fWriteSelectSyncPool;
712	} else
713		return B_NOT_ALLOWED;
714
715	remove_select_sync_pool_entry(pool, sync, event);
716	return B_OK;
717}
718
719
720//	#pragma mark - vnode API
721
722
723static status_t
724fifo_put_vnode(fs_volume* volume, fs_vnode* vnode, bool reenter)
725{
726	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
727	fs_vnode* superVnode = fifo->SuperVnode();
728
729	status_t error = B_OK;
730	if (superVnode->ops->put_vnode != NULL)
731		error = superVnode->ops->put_vnode(volume, superVnode, reenter);
732
733	delete fifo;
734
735	return error;
736}
737
738
739static status_t
740fifo_remove_vnode(fs_volume* volume, fs_vnode* vnode, bool reenter)
741{
742	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
743	fs_vnode* superVnode = fifo->SuperVnode();
744
745	status_t error = B_OK;
746	if (superVnode->ops->remove_vnode != NULL)
747		error = superVnode->ops->remove_vnode(volume, superVnode, reenter);
748
749	delete fifo;
750
751	return error;
752}
753
754
755static status_t
756fifo_open(fs_volume* _volume, fs_vnode* _node, int openMode,
757	void** _cookie)
758{
759	Inode* inode = (Inode*)_node->private_node;
760
761	TRACE("fifo_open(): node = %p, openMode = %d\n", inode, openMode);
762
763	file_cookie* cookie = (file_cookie*)malloc(sizeof(file_cookie));
764	if (cookie == NULL)
765		return B_NO_MEMORY;
766
767	TRACE("  open cookie = %p\n", cookie);
768	cookie->open_mode = openMode;
769	inode->Open(openMode);
770
771	*_cookie = (void*)cookie;
772
773	return B_OK;
774}
775
776
777static status_t
778fifo_close(fs_volume* volume, fs_vnode* vnode, void* _cookie)
779{
780	file_cookie* cookie = (file_cookie*)_cookie;
781	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
782
783	fifo->Close(cookie->open_mode, cookie);
784
785	return B_OK;
786}
787
788
789static status_t
790fifo_free_cookie(fs_volume* _volume, fs_vnode* _node, void* _cookie)
791{
792	file_cookie* cookie = (file_cookie*)_cookie;
793
794	TRACE("fifo_freecookie: entry vnode %p, cookie %p\n", _node, _cookie);
795
796	free(cookie);
797
798	return B_OK;
799}
800
801
/*!	Hook: a FIFO has no backing store, so there is nothing to sync. */
static status_t
fifo_fsync(fs_volume* _volume, fs_vnode* _node)
{
	return B_OK;
}
807
808
/*!	Hook: reads up to \a *_length bytes from the FIFO into \a buffer
	(\a pos is ignored -- pipes have no file position). Blocks unless
	O_NONBLOCK is set; returns B_OK with 0 bytes at end-of-file (active
	FIFO, empty buffer, no writers left).
*/
static status_t
fifo_read(fs_volume* _volume, fs_vnode* _node, void* _cookie,
	off_t /*pos*/, void* buffer, size_t* _length)
{
	file_cookie* cookie = (file_cookie*)_cookie;
	Inode* inode = (Inode*)_node->private_node;

	TRACE("fifo_read(vnode = %p, cookie = %p, length = %lu, mode = %d)\n",
		inode, cookie, *_length, cookie->open_mode);

	// only descriptors opened for reading may read
	if ((cookie->open_mode & O_RWMASK) != O_RDONLY)
		return B_NOT_ALLOWED;

	MutexLocker locker(inode->RequestLock());

	if (inode->IsActive() && inode->WriterCount() == 0) {
		// as long there is no writer, and the pipe is empty,
		// we always just return 0 to indicate end of file
		if (inode->BytesAvailable() == 0) {
			*_length = 0;
			return B_OK;
		}
	}

	// issue read request

	ReadRequest request(cookie);
	inode->AddReadRequest(request);

	TRACE("  issue read request %p\n", &request);

	size_t length = *_length;
	status_t status = inode->ReadDataFromBuffer(buffer, &length,
		(cookie->open_mode & O_NONBLOCK) != 0, request);

	inode->RemoveReadRequest(request);
	// pass the baton to the next queued reader, if data remain
	inode->NotifyReadDone();

	TRACE("  done reading request %p, length %zu\n", &request, length);

	// a partial read is a success, whatever happened afterwards
	if (length > 0)
		status = B_OK;

	*_length = length;
	return status;
}
855
856
/*!	Hook: writes up to \a *_length bytes from \a buffer into the FIFO
	(\a pos is ignored -- pipes have no file position). Blocks unless
	O_NONBLOCK is set; may raise SIGPIPE/EPIPE when no readers remain (see
	Inode::WriteDataToBuffer()).
*/
static status_t
fifo_write(fs_volume* _volume, fs_vnode* _node, void* _cookie,
	off_t /*pos*/, const void* buffer, size_t* _length)
{
	file_cookie* cookie = (file_cookie*)_cookie;
	Inode* inode = (Inode*)_node->private_node;

	TRACE("fifo_write(vnode = %p, cookie = %p, length = %lu)\n",
		_node, cookie, *_length);

	// only descriptors opened for writing may write
	if ((cookie->open_mode & O_RWMASK) != O_WRONLY)
		return B_NOT_ALLOWED;

	MutexLocker locker(inode->RequestLock());

	size_t length = *_length;
	if (length == 0)
		return B_OK;

	// copy data into ring buffer
	status_t status = inode->WriteDataToBuffer(buffer, &length,
		(cookie->open_mode & O_NONBLOCK) != 0);

	// a partial write is a success, whatever happened afterwards
	if (length > 0)
		status = B_OK;

	*_length = length;
	return status;
}
886
887
888static status_t
889fifo_read_stat(fs_volume* volume, fs_vnode* vnode, struct ::stat* st)
890{
891	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
892	fs_vnode* superVnode = fifo->SuperVnode();
893
894	if (superVnode->ops->read_stat == NULL)
895		return B_BAD_VALUE;
896
897	status_t error = superVnode->ops->read_stat(volume, superVnode, st);
898	if (error != B_OK)
899		return error;
900
901
902	MutexLocker locker(fifo->RequestLock());
903
904	st->st_size = fifo->BytesAvailable();
905
906	st->st_blksize = 4096;
907
908	// TODO: Just pass the changes to our modification time on to the super node.
909	st->st_atim.tv_sec = time(NULL);
910	st->st_atim.tv_nsec = 0;
911	st->st_mtim = st->st_ctim = fifo->ModificationTime();
912
913	return B_OK;
914}
915
916
917static status_t
918fifo_write_stat(fs_volume* volume, fs_vnode* vnode, const struct ::stat* st,
919	uint32 statMask)
920{
921	// we cannot change the size of anything
922	if ((statMask & B_STAT_SIZE) != 0)
923		return B_BAD_VALUE;
924
925	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
926	fs_vnode* superVnode = fifo->SuperVnode();
927
928	if (superVnode->ops->write_stat == NULL)
929		return B_BAD_VALUE;
930
931	status_t error = superVnode->ops->write_stat(volume, superVnode, st,
932		statMask);
933	if (error != B_OK)
934		return error;
935
936	return B_OK;
937}
938
939
/*!	Hook: FIFOs implement no ioctls; every request fails with EINVAL. */
static status_t
fifo_ioctl(fs_volume* _volume, fs_vnode* _vnode, void* _cookie, uint32 op,
	void* buffer, size_t length)
{
	TRACE("fifo_ioctl: vnode %p, cookie %p, op %ld, buf %p, len %ld\n",
		_vnode, _cookie, op, buffer, length);

	return EINVAL;
}
949
950
/*!	Hook: replaces the settable status flags (O_APPEND/O_NONBLOCK) in the
	cookie's stored open mode; all other open-mode bits are preserved.
*/
static status_t
fifo_set_flags(fs_volume* _volume, fs_vnode* _vnode, void* _cookie,
	int flags)
{
	file_cookie* cookie = (file_cookie*)_cookie;

	TRACE("fifo_set_flags(vnode = %p, flags = %x)\n", _vnode, flags);
	cookie->open_mode = (cookie->open_mode & ~(O_APPEND | O_NONBLOCK)) | flags;
	return B_OK;
}
961
962
963static status_t
964fifo_select(fs_volume* _volume, fs_vnode* _node, void* _cookie,
965	uint8 event, selectsync* sync)
966{
967	file_cookie* cookie = (file_cookie*)_cookie;
968
969	TRACE("fifo_select(vnode = %p)\n", _node);
970	Inode* inode = (Inode*)_node->private_node;
971	if (!inode)
972		return B_ERROR;
973
974	MutexLocker locker(inode->RequestLock());
975	return inode->Select(event, sync, cookie->open_mode);
976}
977
978
979static status_t
980fifo_deselect(fs_volume* _volume, fs_vnode* _node, void* _cookie,
981	uint8 event, selectsync* sync)
982{
983	file_cookie* cookie = (file_cookie*)_cookie;
984
985	TRACE("fifo_deselect(vnode = %p)\n", _node);
986	Inode* inode = (Inode*)_node->private_node;
987	if (inode == NULL)
988		return B_ERROR;
989
990	MutexLocker locker(inode->RequestLock());
991	return inode->Deselect(event, sync, cookie->open_mode);
992}
993
994
/*!	Hook: FIFOs cannot be memory mapped / paged. */
static bool
fifo_can_page(fs_volume* _volume, fs_vnode* _node, void* cookie)
{
	return false;
}
1000
1001
/*!	Hook: page-wise reading is not supported on a FIFO. */
static status_t
fifo_read_pages(fs_volume* _volume, fs_vnode* _node, void* cookie, off_t pos,
	const iovec* vecs, size_t count, size_t* _numBytes)
{
	return B_NOT_ALLOWED;
}
1008
1009
/*!	Hook: page-wise writing is not supported on a FIFO. */
static status_t
fifo_write_pages(fs_volume* _volume, fs_vnode* _node, void* cookie,
	off_t pos, const iovec* vecs, size_t count, size_t* _numBytes)
{
	return B_NOT_ALLOWED;
}
1016
1017
1018static status_t
1019fifo_get_super_vnode(fs_volume* volume, fs_vnode* vnode, fs_volume* superVolume,
1020	fs_vnode* _superVnode)
1021{
1022	FIFOInode* fifo = (FIFOInode*)vnode->private_node;
1023	fs_vnode* superVnode = fifo->SuperVnode();
1024
1025	if (superVnode->ops->get_super_vnode != NULL) {
1026		return superVnode->ops->get_super_vnode(volume, superVnode, superVolume,
1027			_superVnode);
1028	}
1029
1030	*_superVnode = *superVnode;
1031
1032	return B_OK;
1033}
1034
1035
// Vnode operation table installed on FIFO nodes. Unsupported operations are
// NULL; a few hooks forward to the wrapped super vnode (see above).
static fs_vnode_ops sFIFOVnodeOps = {
	NULL,	// lookup
	NULL,	// get_vnode_name
					// TODO: This is suboptimal! We'd need to forward the
					// super node's hook, if it has got one.

	&fifo_put_vnode,
	&fifo_remove_vnode,

	&fifo_can_page,
	&fifo_read_pages,
	&fifo_write_pages,

	NULL,	// io()
	NULL,	// cancel_io()

	NULL,	// get_file_map

	/* common */
	&fifo_ioctl,
	&fifo_set_flags,
	&fifo_select,
	&fifo_deselect,
	&fifo_fsync,

	NULL,	// fs_read_link
	NULL,	// fs_symlink
	NULL,	// fs_link
	NULL,	// unlink
	NULL,	// rename

	NULL,	// fs_access()
	&fifo_read_stat,
	&fifo_write_stat,
	NULL,	// slot following write_stat was unnamed in the original
			// (presumably preallocate) -- confirm against fs_interface.h

	/* file */
	NULL,	// create()
	&fifo_open,
	&fifo_close,
	&fifo_free_cookie,
	&fifo_read,
	&fifo_write,

	/* directory */
	NULL,	// create_dir
	NULL,	// remove_dir
	NULL,	// open_dir
	NULL,	// close_dir
	NULL,	// free_dir_cookie
	NULL,	// read_dir
	NULL,	// rewind_dir

	/* attribute directory operations */
	NULL,	// open_attr_dir
	NULL,	// close_attr_dir
	NULL,	// free_attr_dir_cookie
	NULL,	// read_attr_dir
	NULL,	// rewind_attr_dir

	/* attribute operations */
	NULL,	// create_attr
	NULL,	// open_attr
	NULL,	// close_attr
	NULL,	// free_attr_cookie
	NULL,	// read_attr
	NULL,	// write_attr

	NULL,	// read_attr_stat
	NULL,	// write_attr_stat
	NULL,	// rename_attr
	NULL,	// remove_attr

	/* support for node and FS layers */
	NULL,	// create_special_node
	&fifo_get_super_vnode,
};
1113
1114
1115}	// namespace fifo
1116
1117
1118using namespace fifo;
1119
1120
1121// #pragma mark -
1122
1123
1124status_t
1125create_fifo_vnode(fs_volume* superVolume, fs_vnode* vnode)
1126{
1127	FIFOInode* fifo = new(std::nothrow) FIFOInode(vnode);
1128	if (fifo == NULL)
1129		return B_NO_MEMORY;
1130
1131	status_t status = fifo->InitCheck();
1132	if (status != B_OK) {
1133		delete fifo;
1134		return status;
1135	}
1136
1137	vnode->private_node = fifo;
1138	vnode->ops = &sFIFOVnodeOps;
1139
1140	return B_OK;
1141}
1142