1/*
2 * Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
3 * Distributed under the terms of the MIT License.
4 */
5
6
7#include "UnixStreamEndpoint.h"
8
9#include <stdio.h>
10#include <sys/stat.h>
11
12#include <AutoDeleter.h>
13
14#include <vfs.h>
15
16#include "UnixAddressManager.h"
17#include "UnixFifo.h"
18
19
20#define UNIX_STREAM_ENDPOINT_DEBUG_LEVEL	0
21#define UNIX_DEBUG_LEVEL					UNIX_STREAM_ENDPOINT_DEBUG_LEVEL
22#include "UnixDebug.h"
23
24
25// Note on locking order (outermost -> innermost):
26// UnixStreamEndpoint: connecting -> listening -> child
27// -> UnixFifo (never lock more than one at a time)
28// -> UnixAddressManager
29
30
31UnixStreamEndpoint::UnixStreamEndpoint(net_socket* socket)
32	:
33	UnixEndpoint(socket),
34	fPeerEndpoint(NULL),
35	fReceiveFifo(NULL),
36	fState(unix_stream_endpoint_state::Closed),
37	fAcceptSemaphore(-1),
38	fIsChild(false),
39	fWasConnected(false)
40{
41	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::UnixStreamEndpoint()\n",
42		find_thread(NULL), this);
43}
44
45
46UnixStreamEndpoint::~UnixStreamEndpoint()
47{
48	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::~UnixStreamEndpoint()\n",
49		find_thread(NULL), this);
50}
51
52
53status_t
54UnixStreamEndpoint::Init()
55{
56	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Init()\n", find_thread(NULL),
57		this);
58
59	RETURN_ERROR(B_OK);
60}
61
62
63void
64UnixStreamEndpoint::Uninit()
65{
66	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Uninit()\n", find_thread(NULL),
67		this);
68
69	// check whether we're closed
70	UnixStreamEndpointLocker locker(this);
71	bool closed = (fState == unix_stream_endpoint_state::Closed);
72	locker.Unlock();
73
74	if (!closed) {
75		// That probably means, we're a child endpoint of a listener and
76		// have been fully connected, but not yet accepted. Our Close()
77		// hook isn't called in this case. Do it manually.
78		Close();
79	}
80
81	ReleaseReference();
82}
83
84
85status_t
86UnixStreamEndpoint::Open()
87{
88	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Open()\n", find_thread(NULL),
89		this);
90
91	status_t error = ProtocolSocket::Open();
92	if (error != B_OK)
93		RETURN_ERROR(error);
94
95	fState = unix_stream_endpoint_state::NotConnected;
96
97	RETURN_ERROR(B_OK);
98}
99
100
101status_t
102UnixStreamEndpoint::Close()
103{
104	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Close()\n", find_thread(NULL),
105		this);
106
107	UnixStreamEndpointLocker locker(this);
108
109	if (fState == unix_stream_endpoint_state::Connected) {
110		UnixStreamEndpointLocker peerLocker;
111		if (_LockConnectedEndpoints(locker, peerLocker) == B_OK) {
112			// We're still connected. Disconnect both endpoints!
113			fPeerEndpoint->_Disconnect();
114			_Disconnect();
115		}
116	}
117
118	if (fState == unix_stream_endpoint_state::Listening)
119		_StopListening();
120
121	_Unbind();
122
123	fState = unix_stream_endpoint_state::Closed;
124	RETURN_ERROR(B_OK);
125}
126
127
128status_t
129UnixStreamEndpoint::Free()
130{
131	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Free()\n", find_thread(NULL),
132		this);
133
134	UnixStreamEndpointLocker locker(this);
135
136	_UnsetReceiveFifo();
137
138	RETURN_ERROR(B_OK);
139}
140
141
142status_t
143UnixStreamEndpoint::Bind(const struct sockaddr* _address)
144{
145	if (_address->sa_family != AF_UNIX)
146		RETURN_ERROR(EAFNOSUPPORT);
147
148	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Bind(\"%s\")\n",
149		find_thread(NULL), this,
150		ConstSocketAddress(&gAddressModule, _address).AsString().Data());
151
152	const sockaddr_un* address = (const sockaddr_un*)_address;
153
154	UnixStreamEndpointLocker endpointLocker(this);
155
156	if (fState != unix_stream_endpoint_state::NotConnected || IsBound())
157		RETURN_ERROR(B_BAD_VALUE);
158
159	RETURN_ERROR(_Bind(address));
160}
161
162
163status_t
164UnixStreamEndpoint::Unbind()
165{
166	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Unbind()\n", find_thread(NULL),
167		this);
168
169	UnixStreamEndpointLocker endpointLocker(this);
170
171	RETURN_ERROR(_Unbind());
172}
173
174
175status_t
176UnixStreamEndpoint::Listen(int backlog)
177{
178	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Listen(%d)\n", find_thread(NULL),
179		this, backlog);
180
181	UnixStreamEndpointLocker endpointLocker(this);
182
183	if (!IsBound())
184		RETURN_ERROR(EDESTADDRREQ);
185	if (fState != unix_stream_endpoint_state::NotConnected
186		&& fState != unix_stream_endpoint_state::Listening)
187		RETURN_ERROR(EINVAL);
188
189	gSocketModule->set_max_backlog(socket, backlog);
190
191	if (fState == unix_stream_endpoint_state::NotConnected) {
192		fAcceptSemaphore = create_sem(0, "unix accept");
193		if (fAcceptSemaphore < 0)
194			RETURN_ERROR(ENOBUFS);
195
196		_UnsetReceiveFifo();
197
198		fCredentials.pid = getpid();
199		fCredentials.uid = geteuid();
200		fCredentials.gid = getegid();
201
202		fState = unix_stream_endpoint_state::Listening;
203	}
204
205	RETURN_ERROR(B_OK);
206}
207
208
209status_t
210UnixStreamEndpoint::Connect(const struct sockaddr* _address)
211{
212	if (_address->sa_family != AF_UNIX)
213		RETURN_ERROR(EAFNOSUPPORT);
214
215	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Connect(\"%s\")\n",
216		find_thread(NULL), this,
217		ConstSocketAddress(&gAddressModule, _address).AsString().Data());
218
219	const sockaddr_un* address = (const sockaddr_un*)_address;
220
221	UnixStreamEndpointLocker endpointLocker(this);
222
223	if (fState == unix_stream_endpoint_state::Connected)
224		RETURN_ERROR(EISCONN);
225
226	if (fState != unix_stream_endpoint_state::NotConnected)
227		RETURN_ERROR(B_BAD_VALUE);
228// TODO: If listening, we could set the backlog to 0 and connect.
229
230	// check the address first
231	UnixAddress unixAddress;
232
233	if (address->sun_path[0] == '\0') {
234		// internal address space (or empty address)
235		int32 internalID;
236		if (UnixAddress::IsEmptyAddress(*address))
237			RETURN_ERROR(B_BAD_VALUE);
238
239		internalID = UnixAddress::InternalID(*address);
240		if (internalID < 0)
241			RETURN_ERROR(internalID);
242
243		unixAddress.SetTo(internalID);
244	} else {
245		// FS address space
246		size_t pathLen = strnlen(address->sun_path, sizeof(address->sun_path));
247		if (pathLen == 0 || pathLen == sizeof(address->sun_path))
248			RETURN_ERROR(B_BAD_VALUE);
249
250		struct stat st;
251		status_t error = vfs_read_stat(-1, address->sun_path, true, &st,
252			!gStackModule->is_syscall());
253		if (error != B_OK)
254			RETURN_ERROR(error);
255
256		if (!S_ISSOCK(st.st_mode))
257			RETURN_ERROR(B_BAD_VALUE);
258
259		unixAddress.SetTo(st.st_dev, st.st_ino, NULL);
260	}
261
262	// get the peer endpoint
263	UnixAddressManagerLocker addressLocker(gAddressManager);
264	UnixEndpoint* listeningUnixEndpoint = gAddressManager.Lookup(unixAddress);
265	if (listeningUnixEndpoint == NULL)
266		RETURN_ERROR(ECONNREFUSED);
267	UnixStreamEndpoint* listeningEndpoint
268		= dynamic_cast<UnixStreamEndpoint*>(listeningUnixEndpoint);
269	if (listeningEndpoint == NULL)
270		RETURN_ERROR(EPROTOTYPE);
271	BReference<UnixStreamEndpoint> peerReference(listeningEndpoint);
272	addressLocker.Unlock();
273
274	UnixStreamEndpointLocker peerLocker(listeningEndpoint);
275
276	if (!listeningEndpoint->IsBound()
277		|| listeningEndpoint->fState != unix_stream_endpoint_state::Listening
278		|| listeningEndpoint->fAddress != unixAddress) {
279		RETURN_ERROR(ECONNREFUSED);
280	}
281
282	// Allocate FIFOs for us and the socket we're going to spawn. We do that
283	// now, so that the mess we need to cleanup, if allocating them fails, is
284	// harmless.
285	UnixFifo* fifo = new(nothrow) UnixFifo(UNIX_MAX_TRANSFER_UNIT, UnixFifoType::Stream);
286	UnixFifo* peerFifo = new(nothrow) UnixFifo(UNIX_MAX_TRANSFER_UNIT, UnixFifoType::Stream);
287	ObjectDeleter<UnixFifo> fifoDeleter(fifo);
288	ObjectDeleter<UnixFifo> peerFifoDeleter(peerFifo);
289
290	status_t error;
291	if ((error = fifo->Init()) != B_OK || (error = peerFifo->Init()) != B_OK)
292		return error;
293
294	// spawn new endpoint for accept()
295	net_socket* newSocket;
296	error = gSocketModule->spawn_pending_socket(listeningEndpoint->socket,
297		&newSocket);
298	if (error != B_OK)
299		RETURN_ERROR(error);
300
301	// init connected peer endpoint
302	UnixStreamEndpoint* connectedEndpoint = (UnixStreamEndpoint*)newSocket->first_protocol;
303
304	UnixStreamEndpointLocker connectedLocker(connectedEndpoint);
305
306	connectedEndpoint->_Spawn(this, listeningEndpoint, peerFifo);
307
308	// update our attributes
309	_UnsetReceiveFifo();
310
311	fPeerEndpoint = connectedEndpoint;
312	PeerAddress().SetTo(&connectedEndpoint->socket->address);
313	fPeerEndpoint->AcquireReference();
314	fReceiveFifo = fifo;
315
316	fCredentials.pid = getpid();
317	fCredentials.uid = geteuid();
318	fCredentials.gid = getegid();
319
320	fifoDeleter.Detach();
321	peerFifoDeleter.Detach();
322
323	fState = unix_stream_endpoint_state::Connected;
324	fWasConnected = true;
325
326	gSocketModule->set_connected(Socket());
327
328	release_sem(listeningEndpoint->fAcceptSemaphore);
329
330	connectedLocker.Unlock();
331	peerLocker.Unlock();
332	endpointLocker.Unlock();
333
334	RETURN_ERROR(B_OK);
335}
336
337
338status_t
339UnixStreamEndpoint::Accept(net_socket** _acceptedSocket)
340{
341	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Accept()\n", find_thread(NULL),
342		this);
343
344	bigtime_t timeout = absolute_timeout(socket->receive.timeout);
345	if (gStackModule->is_restarted_syscall())
346		timeout = gStackModule->restore_syscall_restart_timeout();
347	else
348		gStackModule->store_syscall_restart_timeout(timeout);
349
350	UnixStreamEndpointLocker locker(this);
351
352	status_t error;
353	do {
354		locker.Unlock();
355
356		error = acquire_sem_etc(fAcceptSemaphore, 1,
357			B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
358		if (error < B_OK)
359			break;
360
361		locker.Lock();
362		error = gSocketModule->dequeue_connected(socket, _acceptedSocket);
363	} while (error != B_OK);
364
365	if (error == B_TIMED_OUT && timeout == 0) {
366		// translate non-blocking timeouts to the correct error code
367		error = B_WOULD_BLOCK;
368	}
369
370	RETURN_ERROR(error);
371}
372
373
374ssize_t
375UnixStreamEndpoint::Send(const iovec* vecs, size_t vecCount,
376	ancillary_data_container* ancillaryData,
377	const struct sockaddr* address, socklen_t addressLength, int flags)
378{
379	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Send(%p, %ld, %p)\n",
380		find_thread(NULL), this, vecs, vecCount, ancillaryData);
381
382	if ((flags & ~(MSG_DONTWAIT)) != 0)
383		return EOPNOTSUPP;
384
385	bigtime_t timeout = 0;
386	if ((flags & MSG_DONTWAIT) == 0) {
387		timeout = absolute_timeout(socket->send.timeout);
388		if (gStackModule->is_restarted_syscall())
389			timeout = gStackModule->restore_syscall_restart_timeout();
390		else
391			gStackModule->store_syscall_restart_timeout(timeout);
392	}
393
394	UnixStreamEndpointLocker locker(this);
395
396	BReference<UnixStreamEndpoint> peerReference;
397	UnixStreamEndpointLocker peerLocker;
398
399	status_t error = _LockConnectedEndpoints(locker, peerLocker);
400	if (error != B_OK)
401		RETURN_ERROR(error);
402
403	UnixStreamEndpoint* peerEndpoint = fPeerEndpoint;
404	peerReference.SetTo(peerEndpoint);
405
406	// lock the peer's FIFO
407	UnixFifo* peerFifo = peerEndpoint->fReceiveFifo;
408	BReference<UnixFifo> _(peerFifo);
409	UnixFifoLocker fifoLocker(peerFifo);
410
411	// unlock endpoints
412	locker.Unlock();
413	peerLocker.Unlock();
414
415	ssize_t result = peerFifo->Write(vecs, vecCount, ancillaryData, NULL, timeout);
416
417	// Notify select()ing readers, if we successfully wrote anything.
418	size_t readable = peerFifo->Readable();
419	bool notifyRead = (error == B_OK && readable > 0
420		&& !peerFifo->IsReadShutdown());
421
422	// Notify select()ing writers, if we failed to write anything and there's
423	// still room to write.
424	size_t writable = peerFifo->Writable();
425	bool notifyWrite = (error != B_OK && writable > 0
426		&& !peerFifo->IsWriteShutdown());
427
428	// re-lock our endpoint (unlock FIFO to respect locking order)
429	fifoLocker.Unlock();
430	locker.Lock();
431
432	bool peerLocked = (fPeerEndpoint == peerEndpoint
433		&& _LockConnectedEndpoints(locker, peerLocker) == B_OK);
434
435	// send notifications
436	if (peerLocked && notifyRead)
437		gSocketModule->notify(peerEndpoint->socket, B_SELECT_READ, readable);
438	if (notifyWrite)
439		gSocketModule->notify(socket, B_SELECT_WRITE, writable);
440
441	switch (result) {
442		case UNIX_FIFO_SHUTDOWN:
443			if (fPeerEndpoint == peerEndpoint
444					&& fState == unix_stream_endpoint_state::Connected) {
445				// Orderly write shutdown on our side.
446				// Note: Linux and Solaris also send a SIGPIPE, but according
447				// the send() specification that shouldn't be done.
448				result = EPIPE;
449			} else {
450				// The FD has been closed.
451				result = EBADF;
452			}
453			break;
454		case EPIPE:
455			// The socket module will generate SIGPIPE for us, if necessary.
456			break;
457		case B_TIMED_OUT:
458			// Translate non-blocking timeouts to the correct error code.
459			if (timeout == 0)
460				result = B_WOULD_BLOCK;
461			break;
462	}
463
464	RETURN_ERROR(result);
465}
466
467
468ssize_t
469UnixStreamEndpoint::Receive(const iovec* vecs, size_t vecCount,
470	ancillary_data_container** _ancillaryData, struct sockaddr* _address,
471	socklen_t* _addressLength, int flags)
472{
473	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Receive(%p, %ld)\n",
474		find_thread(NULL), this, vecs, vecCount);
475
476	if ((flags & ~(MSG_DONTWAIT)) != 0)
477		return EOPNOTSUPP;
478
479	bigtime_t timeout = 0;
480	if ((flags & MSG_DONTWAIT) == 0) {
481		timeout = absolute_timeout(socket->receive.timeout);
482		if (gStackModule->is_restarted_syscall())
483			timeout = gStackModule->restore_syscall_restart_timeout();
484		else
485			gStackModule->store_syscall_restart_timeout(timeout);
486	}
487
488	UnixStreamEndpointLocker locker(this);
489
490	// We can read as long as we have a FIFO. I.e. we are still connected, or
491	// disconnected and not yet reconnected/listening/closed.
492	if (fReceiveFifo == NULL)
493		RETURN_ERROR(ENOTCONN);
494
495	UnixStreamEndpoint* peerEndpoint = fPeerEndpoint;
496	BReference<UnixStreamEndpoint> peerReference(peerEndpoint);
497
498	// Copy the peer address upfront. This way, if we read something, we don't
499	// get into a potential race with Close().
500	if (_address != NULL) {
501		socklen_t addrLen = min_c(*_addressLength, socket->peer.ss_len);
502		memcpy(_address, &socket->peer, addrLen);
503		*_addressLength = addrLen;
504	}
505
506	// lock our FIFO
507	UnixFifo* fifo = fReceiveFifo;
508	BReference<UnixFifo> _(fifo);
509	UnixFifoLocker fifoLocker(fifo);
510
511	// unlock endpoint
512	locker.Unlock();
513
514	ssize_t result = fifo->Read(vecs, vecCount, _ancillaryData, NULL, timeout);
515
516	// Notify select()ing writers, if we successfully read anything.
517	size_t writable = fifo->Writable();
518	bool notifyWrite = (result >= 0 && writable > 0
519		&& !fifo->IsWriteShutdown());
520
521	// Notify select()ing readers, if we failed to read anything and there's
522	// still something left to read.
523	size_t readable = fifo->Readable();
524	bool notifyRead = (result < 0 && readable > 0
525		&& !fifo->IsReadShutdown());
526
527	// re-lock our endpoint (unlock FIFO to respect locking order)
528	fifoLocker.Unlock();
529	locker.Lock();
530
531	UnixStreamEndpointLocker peerLocker;
532	bool peerLocked = (peerEndpoint != NULL && fPeerEndpoint == peerEndpoint
533		&& _LockConnectedEndpoints(locker, peerLocker) == B_OK);
534
535	// send notifications
536	if (notifyRead)
537		gSocketModule->notify(socket, B_SELECT_READ, readable);
538	if (peerLocked && notifyWrite)
539		gSocketModule->notify(peerEndpoint->socket, B_SELECT_WRITE, writable);
540
541	switch (result) {
542		case UNIX_FIFO_SHUTDOWN:
543			// Either our socket was closed or read shutdown.
544			if (fState == unix_stream_endpoint_state::Closed) {
545				// The FD has been closed.
546				result = EBADF;
547			} else {
548				// if (fReceiveFifo == fifo) {
549				// 		Orderly shutdown or the peer closed the connection.
550				// } else {
551				//		Weird case: Peer closed connection and we are already
552				// 		reconnected (or listening).
553				// }
554				result = 0;
555			}
556			break;
557		case B_TIMED_OUT:
558			// translate non-blocking timeouts to the correct error code
559			if (timeout == 0)
560				result = B_WOULD_BLOCK;
561			break;
562	}
563
564	RETURN_ERROR(result);
565}
566
567
568ssize_t
569UnixStreamEndpoint::Sendable()
570{
571	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Sendable()\n", find_thread(NULL),
572		this);
573
574	UnixStreamEndpointLocker locker(this);
575	UnixStreamEndpointLocker peerLocker;
576
577	status_t error = _LockConnectedEndpoints(locker, peerLocker);
578	if (error != B_OK)
579		RETURN_ERROR(error);
580
581	// lock the peer's FIFO
582	UnixFifo* peerFifo = fPeerEndpoint->fReceiveFifo;
583	UnixFifoLocker fifoLocker(peerFifo);
584
585	RETURN_ERROR(peerFifo->Writable());
586}
587
588
589ssize_t
590UnixStreamEndpoint::Receivable()
591{
592	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Receivable()\n", find_thread(NULL),
593		this);
594
595	UnixStreamEndpointLocker locker(this);
596
597	if (fState == unix_stream_endpoint_state::Listening)
598		return gSocketModule->count_connected(socket);
599
600	if (fState != unix_stream_endpoint_state::Connected)
601		RETURN_ERROR(ENOTCONN);
602
603	UnixFifoLocker fifoLocker(fReceiveFifo);
604	ssize_t readable = fReceiveFifo->Readable();
605	if (readable == 0 && (fReceiveFifo->IsWriteShutdown()
606			|| fReceiveFifo->IsReadShutdown())) {
607		RETURN_ERROR(ENOTCONN);
608	}
609	RETURN_ERROR(readable);
610}
611
612
613status_t
614UnixStreamEndpoint::SetReceiveBufferSize(size_t size)
615{
616	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::SetReceiveBufferSize(%lu)\n",
617		find_thread(NULL), this, size);
618
619	UnixStreamEndpointLocker locker(this);
620
621	if (fReceiveFifo == NULL)
622		return B_BAD_VALUE;
623
624	UnixFifoLocker fifoLocker(fReceiveFifo);
625	return fReceiveFifo->SetBufferCapacity(size);
626}
627
628
629status_t
630UnixStreamEndpoint::GetPeerCredentials(ucred* credentials)
631{
632	UnixStreamEndpointLocker locker(this);
633	UnixStreamEndpointLocker peerLocker;
634
635	status_t error = _LockConnectedEndpoints(locker, peerLocker);
636	if (error != B_OK)
637		RETURN_ERROR(error);
638
639	*credentials = fPeerEndpoint->fCredentials;
640
641	return B_OK;
642}
643
644
645status_t
646UnixStreamEndpoint::Shutdown(int direction)
647{
648	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Shutdown(%d)\n",
649		find_thread(NULL), this, direction);
650
651	uint32 shutdown;
652	uint32 peerShutdown;
653
654	// translate the direction into shutdown flags for our and the peer fifo
655	switch (direction) {
656		case SHUT_RD:
657			shutdown = UNIX_FIFO_SHUTDOWN_READ;
658			peerShutdown = 0;
659			break;
660		case SHUT_WR:
661			shutdown = 0;
662			peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE;
663			break;
664		case SHUT_RDWR:
665			shutdown = UNIX_FIFO_SHUTDOWN_READ;
666			peerShutdown = UNIX_FIFO_SHUTDOWN_WRITE;
667			break;
668		default:
669			RETURN_ERROR(B_BAD_VALUE);
670	}
671
672	// lock endpoints
673	UnixStreamEndpointLocker locker(this);
674	UnixStreamEndpointLocker peerLocker;
675
676	status_t error = _LockConnectedEndpoints(locker, peerLocker);
677	if (error != B_OK)
678		RETURN_ERROR(error);
679
680	// shutdown our FIFO
681	fReceiveFifo->Lock();
682	fReceiveFifo->Shutdown(shutdown);
683	fReceiveFifo->Unlock();
684
685	// shutdown peer FIFO
686	fPeerEndpoint->fReceiveFifo->Lock();
687	fPeerEndpoint->fReceiveFifo->Shutdown(peerShutdown);
688	fPeerEndpoint->fReceiveFifo->Unlock();
689
690	// send select notifications
691	if (direction == SHUT_RD || direction == SHUT_RDWR) {
692		gSocketModule->notify(socket, B_SELECT_READ, EPIPE);
693		gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_WRITE, EPIPE);
694	}
695	if (direction == SHUT_WR || direction == SHUT_RDWR) {
696		gSocketModule->notify(socket, B_SELECT_WRITE, EPIPE);
697		gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_READ, EPIPE);
698	}
699
700	RETURN_ERROR(B_OK);
701}
702
703
704void
705UnixStreamEndpoint::_Spawn(UnixStreamEndpoint* connectingEndpoint,
706	UnixStreamEndpoint* listeningEndpoint, UnixFifo* fifo)
707{
708	ProtocolSocket::Open();
709
710	fIsChild = true;
711	fPeerEndpoint = connectingEndpoint;
712	fPeerEndpoint->AcquireReference();
713
714	fReceiveFifo = fifo;
715
716	PeerAddress().SetTo(&connectingEndpoint->socket->address);
717
718	fCredentials = listeningEndpoint->fCredentials;
719
720	fState = unix_stream_endpoint_state::Connected;
721
722	gSocketModule->set_connected(Socket());
723}
724
725
726void
727UnixStreamEndpoint::_Disconnect()
728{
729	// Both endpoints must be locked.
730
731	// Write shutdown the receive FIFO.
732	fReceiveFifo->Lock();
733	fReceiveFifo->Shutdown(UNIX_FIFO_SHUTDOWN_WRITE);
734	fReceiveFifo->Unlock();
735
736	// select() notification.
737	gSocketModule->notify(socket, B_SELECT_READ, ECONNRESET);
738	gSocketModule->notify(socket, B_SELECT_WRITE, ECONNRESET);
739
740	// Unset the peer endpoint.
741	fPeerEndpoint->ReleaseReference();
742	fPeerEndpoint = NULL;
743
744	// We're officially disconnected.
745// TODO: Deal with non accept()ed connections correctly!
746	fIsChild = false;
747	fState = unix_stream_endpoint_state::NotConnected;
748}
749
750
751status_t
752UnixStreamEndpoint::_LockConnectedEndpoints(UnixStreamEndpointLocker& locker,
753	UnixStreamEndpointLocker& peerLocker)
754{
755	if (fState != unix_stream_endpoint_state::Connected)
756		RETURN_ERROR(fWasConnected ? EPIPE : ENOTCONN);
757
758	// We need to lock the peer, too. Get a reference -- we might need to
759	// unlock ourselves to get the locking order right.
760	BReference<UnixStreamEndpoint> peerReference(fPeerEndpoint);
761	UnixStreamEndpoint* peerEndpoint = fPeerEndpoint;
762
763	if (fIsChild) {
764		// We're the child, but locking order is the other way around.
765		locker.Unlock();
766		peerLocker.SetTo(peerEndpoint, false);
767
768		locker.Lock();
769
770		// recheck our state, also whether the peer is still the same
771		if (fState != unix_stream_endpoint_state::Connected || peerEndpoint != fPeerEndpoint)
772			RETURN_ERROR(ENOTCONN);
773	} else
774		peerLocker.SetTo(peerEndpoint, false);
775
776	RETURN_ERROR(B_OK);
777}
778
779
780status_t
781UnixStreamEndpoint::_Unbind()
782{
783	if (fState == unix_stream_endpoint_state::Connected
784		|| fState == unix_stream_endpoint_state::Listening)
785		RETURN_ERROR(B_BAD_VALUE);
786
787	if (IsBound())
788		RETURN_ERROR(UnixEndpoint::_Unbind());
789
790	RETURN_ERROR(B_OK);
791}
792
793
794void
795UnixStreamEndpoint::_UnsetReceiveFifo()
796{
797	if (fReceiveFifo) {
798		fReceiveFifo->ReleaseReference();
799		fReceiveFifo = NULL;
800	}
801}
802
803
804void
805UnixStreamEndpoint::_StopListening()
806{
807	if (fState == unix_stream_endpoint_state::Listening) {
808		delete_sem(fAcceptSemaphore);
809		fAcceptSemaphore = -1;
810		fState = unix_stream_endpoint_state::NotConnected;
811	}
812}
813