/*
 * Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
 * Distributed under the terms of the MIT License.
 */

#include "UnixFifo.h"

#include <new>

#include <AutoDeleter.h>

#include <net_stack.h>
#include <util/ring_buffer.h>

#include "unix.h"


#define UNIX_FIFO_DEBUG_LEVEL	0
#define UNIX_DEBUG_LEVEL		UNIX_FIFO_DEBUG_LEVEL
#include "UnixDebug.h"


// #pragma mark - UnixRequest


UnixRequest::UnixRequest(const iovec* vecs, size_t count,
		ancillary_data_container* ancillaryData,
		struct sockaddr_storage* address)
	:
	fVecs(vecs),
	fVecCount(count),
	fAncillaryData(ancillaryData),
	fTotalSize(0),
	fBytesTransferred(0),
	fVecIndex(0),
	fVecOffset(0),
	fAddress(address)
{
	for (size_t i = 0; i < fVecCount; i++)
		fTotalSize += fVecs[i].iov_len;
}


void
UnixRequest::AddBytesTransferred(size_t size)
{
	fBytesTransferred += size;

	// also adjust the current iovec index/offset
	while (fVecIndex < fVecCount
			&& fVecs[fVecIndex].iov_len - fVecOffset <= size) {
		size -= fVecs[fVecIndex].iov_len - fVecOffset;
		fVecIndex++;
		fVecOffset = 0;
	}

	if (fVecIndex < fVecCount)
		fVecOffset += size;
}


bool
UnixRequest::GetCurrentChunk(void*& data, size_t& size)
{
	while (fVecIndex < fVecCount
			&& fVecOffset >= fVecs[fVecIndex].iov_len) {
		fVecIndex++;
		fVecOffset = 0;
	}
	if (fVecIndex >= fVecCount)
		return false;

	data = (uint8*)fVecs[fVecIndex].iov_base + fVecOffset;
	size = fVecs[fVecIndex].iov_len - fVecOffset;
	return true;
}

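
// A minimal consumption sketch (illustrative only; transfer() is a
// hypothetical stand-in for any function that moves up to `size` bytes):
//
//	void* data;
//	size_t size;
//	while (request.GetCurrentChunk(data, size)) {
//		ssize_t transferred = transfer(data, size);
//		if (transferred <= 0)
//			break;
//		request.AddBytesTransferred(transferred);
//	}
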

void
UnixRequest::SetAncillaryData(ancillary_data_container* data)
{
	fAncillaryData = data;
}


void
UnixRequest::AddAncillaryData(ancillary_data_container* data)
{
	if (fAncillaryData != NULL) {
		gStackModule->move_ancillary_data(data, fAncillaryData);
		gStackModule->delete_ancillary_data_container(data);
	} else
		fAncillaryData = data;
}


// #pragma mark - UnixBufferQueue

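
// UnixBufferQueue keeps the payload bytes of all pending messages in a single
// ring buffer. For datagram FIFOs, message boundaries and sender addresses
// are tracked separately in fDatagrams; ancillary data attachment points live
// in fAncillaryData, each entry's offset being relative to the previous one.
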

UnixBufferQueue::UnixBufferQueue(size_t capacity, UnixFifoType type)
	:
	fBuffer(NULL),
	fCapacity(capacity),
	fType(type)
{
}


UnixBufferQueue::~UnixBufferQueue()
{
	while (AncillaryDataEntry* entry = fAncillaryData.RemoveHead()) {
		gStackModule->delete_ancillary_data_container(entry->data);
		delete entry;
	}

	delete_ring_buffer(fBuffer);
}


status_t
UnixBufferQueue::Init()
{
	fBuffer = create_ring_buffer(fCapacity);
	if (fBuffer == NULL)
		return B_NO_MEMORY;
	return B_OK;
}


size_t
UnixBufferQueue::Readable() const
{
	return ring_buffer_readable(fBuffer);
}


size_t
UnixBufferQueue::Writable() const
{
	return ring_buffer_writable(fBuffer);
}


status_t
UnixBufferQueue::Read(UnixRequest& request)
{
	bool user = gStackModule->is_syscall();

	size_t readable = Readable();
	void* data;
	size_t size;

	DatagramEntry* datagramEntry = NULL;
	if (fType == UnixFifoType::Datagram) {
		datagramEntry = fDatagrams.Head();
		if (datagramEntry == NULL)
			return B_ERROR;

		if (datagramEntry->size > readable)
			TRACE("UnixBufferQueue::Read(): expected to read a datagram of "
				"size %" B_PRIuSIZE ", but only %" B_PRIuSIZE " bytes are "
				"readable\n", datagramEntry->size, readable);
		else
			readable = datagramEntry->size;
	}

	while (readable > 0 && request.GetCurrentChunk(data, size)) {
		if (size > readable)
			size = readable;

		ssize_t bytesRead;
		if (user)
			bytesRead = ring_buffer_user_read(fBuffer, (uint8*)data, size);
		else
			bytesRead = ring_buffer_read(fBuffer, (uint8*)data, size);

		if (bytesRead < 0)
			return bytesRead;
		if (bytesRead == 0)
			return B_ERROR;

		// Adjust the ancillary data entry offsets, and attach the entries
		// whose data has just been read to the request.
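		// Worked example (illustrative): with queued entries at relative
		// offsets {5, 3} and bytesRead == 7, the first entry (absolute
		// offset 5) is attached to the request, and the second entry's
		// offset shrinks to 3 - (7 - 5) == 1.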
		if (AncillaryDataEntry* entry = fAncillaryData.Head()) {
			size_t offsetDelta = bytesRead;
			while (entry != NULL && offsetDelta > entry->offset) {
				// entry data have been read -- add ancillary data to request
				fAncillaryData.RemoveHead();
				offsetDelta -= entry->offset;
				request.AddAncillaryData(entry->data);
				delete entry;

				entry = fAncillaryData.Head();
			}

			if (entry != NULL)
				entry->offset -= offsetDelta;
		}

		request.AddBytesTransferred(bytesRead);
		readable -= bytesRead;
	}

	if (fType == UnixFifoType::Datagram) {
		fDatagrams.RemoveHead();

		memcpy(request.Address(), &datagramEntry->address,
			sizeof(datagramEntry->address));
		delete datagramEntry;

		if (readable > 0) {
			ring_buffer_flush(fBuffer, readable);
			if (AncillaryDataEntry* entry = fAncillaryData.Head()) {
				size_t offsetDelta = readable;
				while (entry != NULL && offsetDelta > entry->offset) {
					fAncillaryData.RemoveHead();
					offsetDelta -= entry->offset;
					delete entry;

					entry = fAncillaryData.Head();
				}

				if (entry != NULL)
					entry->offset -= offsetDelta;
			}
		}
	}

	return B_OK;
}


status_t
UnixBufferQueue::Write(UnixRequest& request)
{
	bool user = gStackModule->is_syscall();

	size_t writable = Writable();
	void* data;
	size_t size;

	DatagramEntry* datagramEntry = NULL;
	ObjectDeleter<DatagramEntry> datagramEntryDeleter;
	if (fType == UnixFifoType::Datagram) {
		datagramEntry = new(std::nothrow) DatagramEntry;
		if (datagramEntry == NULL)
			return B_NO_MEMORY;

		datagramEntryDeleter.SetTo(datagramEntry);
		memcpy(&datagramEntry->address, request.Address(),
			sizeof(datagramEntry->address));
		datagramEntry->size = request.TotalSize();

		// This should have been handled in UnixFifo
		if (writable < datagramEntry->size) {
			TRACE("UnixBufferQueue::Write(): not enough space for datagram "
				"of size %" B_PRIuSIZE " (%" B_PRIuSIZE " bytes left)\n",
				datagramEntry->size, writable);
			return B_ERROR;
		}
	}

	// If the request has ancillary data, create an entry first.
	AncillaryDataEntry* ancillaryEntry = NULL;
	ObjectDeleter<AncillaryDataEntry> ancillaryEntryDeleter;
	if (writable > 0 && request.AncillaryData() != NULL) {
		ancillaryEntry = new(std::nothrow) AncillaryDataEntry;
		if (ancillaryEntry == NULL)
			return B_NO_MEMORY;

		ancillaryEntryDeleter.SetTo(ancillaryEntry);
		ancillaryEntry->data = request.AncillaryData();
		ancillaryEntry->offset = Readable();

		// The offsets are relative to the previous entry.
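		// Worked example (illustrative): with queued entries at relative
		// offsets {5, 3} and Readable() == 12, the new entry gets offset
		// 12 - 5 - 3 == 4, i.e. it is anchored 4 bytes past the last
		// queued entry.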
		AncillaryDataList::Iterator it = fAncillaryData.GetIterator();
		while (AncillaryDataEntry* entry = it.Next())
			ancillaryEntry->offset -= entry->offset;
			// TODO: This is inefficient when the list is long. Rather also
			// store and maintain the absolute offset of the last queued entry.
	}

	// write as much as we can
	while (writable > 0 && request.GetCurrentChunk(data, size)) {
		if (size > writable)
			size = writable;

		ssize_t bytesWritten;
		if (user)
			bytesWritten = ring_buffer_user_write(fBuffer, (uint8*)data, size);
		else
			bytesWritten = ring_buffer_write(fBuffer, (uint8*)data, size);

		if (bytesWritten < 0)
			return bytesWritten;
		if (bytesWritten == 0)
			return B_ERROR;

		if (ancillaryEntry != NULL) {
			fAncillaryData.Add(ancillaryEntry);
			ancillaryEntryDeleter.Detach();
			request.SetAncillaryData(NULL);
			ancillaryEntry = NULL;
		}

		request.AddBytesTransferred(bytesWritten);
		writable -= bytesWritten;
	}

	if (fType == UnixFifoType::Datagram) {
		fDatagrams.Add(datagramEntry);
		datagramEntryDeleter.Detach();
	}

	return B_OK;
}


status_t
UnixBufferQueue::SetCapacity(size_t capacity)
{
	if (capacity <= fCapacity)
		return B_OK;

	ring_buffer* newBuffer = create_ring_buffer(capacity);
	if (newBuffer == NULL)
		return B_NO_MEMORY;

	ring_buffer_move(newBuffer, ring_buffer_readable(fBuffer), fBuffer);
	delete_ring_buffer(fBuffer);

	fBuffer = newBuffer;
	fCapacity = capacity;

	return B_OK;
}


// #pragma mark -

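
// A minimal usage sketch (illustrative only; the capacity value is arbitrary,
// error handling is elided, and the UNIX_FIFO_SHUTDOWN_* flags are assumed to
// be the ones declared in UnixFifo.h). Note that Read() and Write() unlock
// fLock while waiting, i.e. callers are expected to hold the FIFO's lock (cf.
// the mutex_unlock()/mutex_lock() pairs below):
//
//	UnixFifo* fifo = new(std::nothrow) UnixFifo(65536, UnixFifoType::Stream);
//	if (fifo != NULL && fifo->Init() == B_OK) {
//		// ... transfer data, then shut down both directions ...
//		fifo->Shutdown(UNIX_FIFO_SHUTDOWN_READ | UNIX_FIFO_SHUTDOWN_WRITE);
//	}
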

UnixFifo::UnixFifo(size_t capacity, UnixFifoType type)
	:
	fBuffer(capacity, type),
	fReaders(),
	fWriters(),
	fReadRequested(0),
	fWriteRequested(0),
	fShutdown(0)
{
	fReadCondition.Init(this, "unix fifo read");
	fWriteCondition.Init(this, "unix fifo write");
	mutex_init(&fLock, "unix fifo");
}


UnixFifo::~UnixFifo()
{
	mutex_destroy(&fLock);
}


status_t
UnixFifo::Init()
{
	return fBuffer.Init();
}


void
UnixFifo::Shutdown(uint32 shutdown)
{
	TRACE("[%" B_PRId32 "] %p->UnixFifo::Shutdown(0x%" B_PRIx32 ")\n",
		find_thread(NULL), this, shutdown);

	fShutdown |= shutdown;

	if (shutdown != 0) {
		// Shutting down either end also affects the other, so notify both.
		fReadCondition.NotifyAll();
		fWriteCondition.NotifyAll();
	}
}


ssize_t
UnixFifo::Read(const iovec* vecs, size_t vecCount,
	ancillary_data_container** _ancillaryData,
	struct sockaddr_storage* address, bigtime_t timeout)
{
	TRACE("[%" B_PRId32 "] %p->UnixFifo::Read(%p, %" B_PRIuSIZE ", %"
		B_PRIdBIGTIME ")\n", find_thread(NULL), this, vecs, vecCount,
		timeout);

	if (IsReadShutdown() && fBuffer.Readable() == 0)
		RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

	UnixRequest request(vecs, vecCount, NULL, address);
	fReaders.Add(&request);
	fReadRequested += request.TotalSize();

	status_t error = _Read(request, timeout);

	bool firstInQueue = fReaders.Head() == &request;
	fReaders.Remove(&request);
	fReadRequested -= request.TotalSize();

	if (firstInQueue && !fReaders.IsEmpty() && fBuffer.Readable() > 0
			&& !IsReadShutdown()) {
		// There's more to read, other readers, and we were first in the queue.
		// So we need to notify the others.
		fReadCondition.NotifyAll();
	}

	if (request.BytesTransferred() > 0 && !fWriters.IsEmpty()
			&& !IsWriteShutdown()) {
		// We read something and there are writers. Notify them.
		fWriteCondition.NotifyAll();
	}

	*_ancillaryData = request.AncillaryData();

	if (request.BytesTransferred() > 0) {
		if (request.BytesTransferred() > SSIZE_MAX)
			RETURN_ERROR(SSIZE_MAX);
		RETURN_ERROR((ssize_t)request.BytesTransferred());
	}

	RETURN_ERROR(error);
}


ssize_t
UnixFifo::Write(const iovec* vecs, size_t vecCount,
	ancillary_data_container* ancillaryData,
	const struct sockaddr_storage* address, bigtime_t timeout)
{
	TRACE("[%" B_PRId32 "] %p->UnixFifo::Write(%p, %" B_PRIuSIZE ", %p, %"
		B_PRIdBIGTIME ")\n", find_thread(NULL), this, vecs, vecCount,
		ancillaryData, timeout);

	if (IsWriteShutdown())
		RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

	if (IsReadShutdown())
		RETURN_ERROR(EPIPE);

	UnixRequest request(vecs, vecCount, ancillaryData,
		(struct sockaddr_storage*)address);
	fWriters.Add(&request);
	fWriteRequested += request.TotalSize();

	status_t error = _Write(request, timeout);

	bool firstInQueue = fWriters.Head() == &request;
	fWriters.Remove(&request);
	fWriteRequested -= request.TotalSize();

	if (firstInQueue && !fWriters.IsEmpty() && fBuffer.Writable() > 0
			&& !IsWriteShutdown()) {
		// There's more space for writing, other writers, and we were first in
		// the queue. So we need to notify the others.
		fWriteCondition.NotifyAll();
	}

	if (request.BytesTransferred() > 0 && !fReaders.IsEmpty()
			&& !IsReadShutdown()) {
		// We've written something and there are readers. Notify them.
		fReadCondition.NotifyAll();
	}

	if (request.BytesTransferred() > 0) {
		if (request.BytesTransferred() > SSIZE_MAX)
			RETURN_ERROR(SSIZE_MAX);
		RETURN_ERROR((ssize_t)request.BytesTransferred());
	}

	RETURN_ERROR(error);
}


size_t
UnixFifo::Readable() const
{
	size_t readable = fBuffer.Readable();
	return (off_t)readable > fReadRequested ? readable - fReadRequested : 0;
}


size_t
UnixFifo::Writable() const
{
	size_t writable = fBuffer.Writable();
	return (off_t)writable > fWriteRequested ? writable - fWriteRequested : 0;
}
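

// Example (illustrative): with 100 bytes in the buffer and queued read
// requests totalling 60 bytes, Readable() reports 40 -- the amount a new
// reader could consume without starving the already queued requests.
// Writable() applies the same logic to the free space.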


status_t
UnixFifo::SetBufferCapacity(size_t capacity)
{
	// check against allowed minimal/maximal value
	if (capacity > UNIX_FIFO_MAXIMAL_CAPACITY)
		capacity = UNIX_FIFO_MAXIMAL_CAPACITY;
	else if (capacity < UNIX_FIFO_MINIMAL_CAPACITY)
		capacity = UNIX_FIFO_MINIMAL_CAPACITY;

	size_t oldCapacity = fBuffer.Capacity();
	if (capacity == oldCapacity)
		return B_OK;

	// set capacity
	status_t error = fBuffer.SetCapacity(capacity);
	if (error != B_OK)
		return error;

	// wake up waiting writers, if the capacity increased
	if (!fWriters.IsEmpty() && !IsWriteShutdown())
		fWriteCondition.NotifyAll();

	return B_OK;
}


status_t
UnixFifo::_Read(UnixRequest& request, bigtime_t timeout)
{
	// wait for the request to reach the front of the queue
	if (fReaders.Head() != &request && timeout == 0)
		RETURN_ERROR(B_WOULD_BLOCK);

	while (fReaders.Head() != &request
		&& !(IsReadShutdown() && fBuffer.Readable() == 0)) {
		ConditionVariableEntry entry;
		fReadCondition.Add(&entry);

		mutex_unlock(&fLock);
		status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
			timeout);
		mutex_lock(&fLock);

		if (error != B_OK)
			RETURN_ERROR(error);
	}

	if (fBuffer.Readable() == 0) {
		if (IsReadShutdown())
			RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

		if (IsWriteShutdown())
			RETURN_ERROR(0);

		if (timeout == 0)
			RETURN_ERROR(B_WOULD_BLOCK);
	}

	// wait for any data to become available
// TODO: Support low water marks!
	while (fBuffer.Readable() == 0
			&& !IsReadShutdown() && !IsWriteShutdown()) {
		ConditionVariableEntry entry;
		fReadCondition.Add(&entry);

		mutex_unlock(&fLock);
		status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
			timeout);
		mutex_lock(&fLock);

		if (error != B_OK)
			RETURN_ERROR(error);
	}

	if (fBuffer.Readable() == 0) {
		if (IsReadShutdown())
			RETURN_ERROR(UNIX_FIFO_SHUTDOWN);
		if (IsWriteShutdown())
			RETURN_ERROR(0);
	}

	RETURN_ERROR(fBuffer.Read(request));
}


status_t
UnixFifo::_Write(UnixRequest& request, bigtime_t timeout)
{
	if (timeout == 0)
		RETURN_ERROR(_WriteNonBlocking(request));

	// wait for the request to reach the front of the queue
	while (fWriters.Head() != &request && !IsWriteShutdown()) {
		ConditionVariableEntry entry;
		fWriteCondition.Add(&entry);

		mutex_unlock(&fLock);
		status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
			timeout);
		mutex_lock(&fLock);

		if (error != B_OK)
			RETURN_ERROR(error);
	}

	if (IsWriteShutdown())
		RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

	if (IsReadShutdown())
		RETURN_ERROR(EPIPE);

	if (request.TotalSize() == 0)
		return 0;

	status_t error = B_OK;

	while (error == B_OK && request.BytesRemaining() > 0) {
		// wait for any space to become available
		while (error == B_OK
				&& fBuffer.Writable() < _MinimumWritableSize(request)
				&& !IsWriteShutdown() && !IsReadShutdown()) {
			ConditionVariableEntry entry;
			fWriteCondition.Add(&entry);

			mutex_unlock(&fLock);
			error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
			mutex_lock(&fLock);

			if (error != B_OK)
				RETURN_ERROR(error);
		}

		if (IsWriteShutdown())
			RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

		if (IsReadShutdown())
			RETURN_ERROR(EPIPE);

		// write as much as we can
		error = fBuffer.Write(request);

		if (error == B_OK) {
// TODO: Whenever we've successfully written a part, we should reset the
// timeout!
		}
	}

	RETURN_ERROR(error);
}


status_t
UnixFifo::_WriteNonBlocking(UnixRequest& request)
{
	// To succeed we need to be first in the queue, and enough space must be
	// available right now; otherwise we fail with B_WOULD_BLOCK.
	if (fWriters.Head() != &request
			|| fBuffer.Writable() < _MinimumWritableSize(request))
		RETURN_ERROR(B_WOULD_BLOCK);

	if (request.TotalSize() == 0)
		return 0;

	// Write as much as we can.
	RETURN_ERROR(fBuffer.Write(request));
}


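// Datagrams are delivered atomically, so a datagram writer only makes
// progress once its complete request fits into the buffer, while a stream
// writer can proceed as soon as a single byte of space is available.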
size_t
UnixFifo::_MinimumWritableSize(const UnixRequest& request) const
{
	switch (fType) {
		case UnixFifoType::Datagram:
			return request.TotalSize();
		case UnixFifoType::Stream:
		default:
			return 1;
	}
}
