1/*
2 * Copyright 2008-2011, Haiku, Inc. All rights reserved.
3 * Distributed under the terms of the MIT License.
4 *
5 * Authors:
6 *		Salvatore Benedetto <salvatore.benedetto@gmail.com>
7 */
8
9#include <posix/xsi_message_queue.h>
10
11#include <new>
12
13#include <sys/ipc.h>
14#include <sys/types.h>
15
16#include <OS.h>
17
18#include <kernel.h>
19#include <syscall_restart.h>
20
21#include <util/atomic.h>
22#include <util/AutoLock.h>
23#include <util/DoublyLinkedList.h>
24#include <util/OpenHashTable.h>
25
26
// Debug output switches. While TRACE_XSI_MSG_QUEUE is defined, TRACE()
// forwards its (double-parenthesized) argument list to dprintf(); otherwise
// it expands to nothing. TRACE_ERROR() is forwarded to dprintf() in both
// configurations.
#define TRACE_XSI_MSG_QUEUE
#ifdef TRACE_XSI_MSG_QUEUE
#	define TRACE(x)			dprintf x
#	define TRACE_ERROR(x)	dprintf x
#else
#	define TRACE(x)			/* nothing */
#	define TRACE_ERROR(x)	dprintf x
#endif
35
36// Queue for holding blocked threads
37struct queued_thread : DoublyLinkedListLinkImpl<queued_thread> {
38	queued_thread(Thread *_thread, int32 _message_length)
39		:
40		thread(_thread),
41		message_length(_message_length),
42		queued(false)
43	{
44	}
45
46	Thread	*thread;
47	int32	message_length;
48	bool	queued;
49};
50
// List of queued_thread records, used for the per-queue waiting lists
typedef DoublyLinkedList<queued_thread> ThreadQueue;
52
53
54struct queued_message : DoublyLinkedListLinkImpl<queued_message> {
55	queued_message(const void *_message, ssize_t _length)
56		:
57		initOK(false),
58		length(_length)
59	{
60		message = (char *)malloc(sizeof(char) * _length);
61		if (message == NULL)
62			return;
63
64		if (user_memcpy(&type, _message, sizeof(long)) != B_OK
65			|| user_memcpy(message, (void *)((char *)_message + sizeof(long)),
66			_length) != B_OK) {
67			free(message);
68			return;
69		}
70		initOK = true;
71	}
72
73	~queued_message()
74	{
75		if (initOK)
76			free(message);
77	}
78
79	ssize_t copy_to_user_buffer(void *_message, ssize_t _length)
80	{
81		if (_length > length)
82			_length = length;
83
84		if (user_memcpy(_message, &type, sizeof(long)) != B_OK
85			|| user_memcpy((void *)((char *)_message + sizeof(long)), message,
86			_length) != B_OK)
87			return B_ERROR;
88		return _length;
89	}
90
91	bool		initOK;
92	ssize_t		length;
93	char		*message;
94	long		type;
95};
96
// List of the messages currently stored in a queue
typedef DoublyLinkedList<queued_message> MessageQueue;
98
// Arbitrary limit: initial msg_qbytes of every new queue and the largest
// message size accepted by the msgsnd/msgrcv syscalls below
#define MAX_BYTES_PER_QUEUE		2048
101
102class XsiMessageQueue {
103public:
104	XsiMessageQueue(int flags)
105		:
106		fBytesInQueue(0),
107		fThreadsWaitingToReceive(0),
108		fThreadsWaitingToSend(0)
109	{
110		mutex_init(&fLock, "XsiMessageQueue private mutex");
111		SetIpcKey((key_t)-1);
112		SetPermissions(flags);
113		// Initialize all fields to zero
114		memset((void *)&fMessageQueue, 0, sizeof(struct msqid_ds));
115		fMessageQueue.msg_ctime = (time_t)real_time_clock();
116		fMessageQueue.msg_qbytes = MAX_BYTES_PER_QUEUE;
117	}
118
119	// Implemented after sXsiMessageCount is declared
120	~XsiMessageQueue();
121
122	status_t BlockAndUnlock(Thread *thread, MutexLocker *queueLocker)
123	{
124		thread_prepare_to_block(thread, B_CAN_INTERRUPT,
125				THREAD_BLOCK_TYPE_OTHER, (void*)"xsi message queue");
126		// Unlock the queue before blocking
127		queueLocker->Unlock();
128
129		InterruptsSpinLocker schedulerLocker(gSchedulerLock);
130// TODO: We've got a serious race condition: If BlockAndUnlock() returned due to
131// interruption, we will still be queued. A WakeUpThread() at this point will
132// call thread_unblock() and might thus screw with our trying to re-lock the
133// mutex.
134		return thread_block_locked(thread);
135	}
136
137	void DoIpcSet(struct msqid_ds *result)
138	{
139		fMessageQueue.msg_perm.uid = result->msg_perm.uid;
140		fMessageQueue.msg_perm.gid = result->msg_perm.gid;
141		fMessageQueue.msg_perm.mode = (fMessageQueue.msg_perm.mode & ~0x01ff)
142			| (result->msg_perm.mode & 0x01ff);
143		fMessageQueue.msg_qbytes = result->msg_qbytes;
144		fMessageQueue.msg_ctime = (time_t)real_time_clock();
145	}
146
147	void Deque(queued_thread *queueEntry, bool waitForMessage)
148	{
149		if (queueEntry->queued) {
150			if (waitForMessage) {
151				fWaitingToReceive.Remove(queueEntry);
152				fThreadsWaitingToReceive--;
153			} else {
154				fWaitingToSend.Remove(queueEntry);
155				fThreadsWaitingToSend--;
156			}
157		}
158	}
159
160	void Enqueue(queued_thread *queueEntry, bool waitForMessage)
161	{
162		if (waitForMessage) {
163			fWaitingToReceive.Add(queueEntry);
164			fThreadsWaitingToReceive++;
165		} else {
166			fWaitingToSend.Add(queueEntry);
167			fThreadsWaitingToSend++;
168		}
169		queueEntry->queued = true;
170	}
171
172	struct msqid_ds &GetMessageQueue()
173	{
174		return fMessageQueue;
175	}
176
177	bool HasPermission() const
178	{
179		if ((fMessageQueue.msg_perm.mode & S_IWOTH) != 0)
180			return true;
181
182		uid_t uid = geteuid();
183		if (uid == 0 || (uid == fMessageQueue.msg_perm.uid
184			&& (fMessageQueue.msg_perm.mode & S_IWUSR) != 0))
185			return true;
186
187		gid_t gid = getegid();
188		if (gid == fMessageQueue.msg_perm.gid
189			&& (fMessageQueue.msg_perm.mode & S_IWGRP) != 0)
190			return true;
191
192		return false;
193	}
194
195	bool HasReadPermission() const
196	{
197		// TODO: fix this
198		return HasPermission();
199	}
200
201	int ID() const
202	{
203		return fID;
204	}
205
206	// Implemented after sXsiMessageCount is declared
207	bool Insert(queued_message *message);
208
209	key_t IpcKey() const
210	{
211		return fMessageQueue.msg_perm.key;
212	}
213
214	mutex &Lock()
215	{
216		return fLock;
217	}
218
219	msglen_t MaxBytes() const
220	{
221		return fMessageQueue.msg_qbytes;
222	}
223
224	// Implemented after sXsiMessageCount is declared
225	queued_message *Remove(long typeRequested);
226
227	uint32 SequenceNumber() const
228	{
229		return fSequenceNumber;
230	}
231
232	// Implemented after sMessageQueueHashTable is declared
233	void SetID();
234
235	void SetIpcKey(key_t key)
236	{
237		fMessageQueue.msg_perm.key = key;
238	}
239
240	void SetPermissions(int flags)
241	{
242		fMessageQueue.msg_perm.uid = fMessageQueue.msg_perm.cuid = geteuid();
243		fMessageQueue.msg_perm.gid = fMessageQueue.msg_perm.cgid = getegid();
244		fMessageQueue.msg_perm.mode = (flags & 0x01ff);
245	}
246
247	void WakeUpThread(bool waitForMessage)
248	{
249		InterruptsSpinLocker schedulerLocker(gSchedulerLock);
250
251		if (waitForMessage) {
252			// Wake up all waiting thread for a message
253			// TODO: this can cause starvation for any
254			// very-unlucky-and-slow thread
255			while (queued_thread *entry = fWaitingToReceive.RemoveHead()) {
256				entry->queued = false;
257				fThreadsWaitingToReceive--;
258				thread_unblock_locked(entry->thread, 0);
259			}
260		} else {
261			// Wake up only one thread waiting to send
262			if (queued_thread *entry = fWaitingToSend.RemoveHead()) {
263				entry->queued = false;
264				fThreadsWaitingToSend--;
265				thread_unblock_locked(entry->thread, 0);
266			}
267		}
268	}
269
270	XsiMessageQueue*& Link()
271	{
272		return fLink;
273	}
274
275private:
276	msglen_t			fBytesInQueue;
277	int					fID;
278	mutex				fLock;
279	MessageQueue		fMessage;
280	struct msqid_ds		fMessageQueue;
281	uint32				fSequenceNumber;
282	uint32				fThreadsWaitingToReceive;
283	uint32				fThreadsWaitingToSend;
284
285	ThreadQueue			fWaitingToReceive;
286	ThreadQueue			fWaitingToSend;
287
288	XsiMessageQueue*	fLink;
289};
290
291
292// Xsi message queue hash table
293struct MessageQueueHashTableDefinition {
294	typedef int					KeyType;
295	typedef XsiMessageQueue		ValueType;
296
297	size_t HashKey (const int key) const
298	{
299		return (size_t)key;
300	}
301
302	size_t Hash(XsiMessageQueue *variable) const
303	{
304		return (size_t)variable->ID();
305	}
306
307	bool Compare(const int key, XsiMessageQueue *variable) const
308	{
309		return (int)key == (int)variable->ID();
310	}
311
312	XsiMessageQueue*& GetLink(XsiMessageQueue *variable) const
313	{
314		return variable->Link();
315	}
316};
317
318
319// IPC class
320class Ipc {
321public:
322	Ipc(key_t key)
323		: fKey(key),
324		fMessageQueueId(-1)
325	{
326	}
327
328	key_t Key() const
329	{
330		return fKey;
331	}
332
333	int MessageQueueID() const
334	{
335		return fMessageQueueId;
336	}
337
338	void SetMessageQueueID(XsiMessageQueue *messageQueue)
339	{
340		fMessageQueueId = messageQueue->ID();
341	}
342
343	Ipc*& Link()
344	{
345		return fLink;
346	}
347
348private:
349	key_t				fKey;
350	int					fMessageQueueId;
351	Ipc*				fLink;
352};
353
354
355struct IpcHashTableDefinition {
356	typedef key_t	KeyType;
357	typedef Ipc		ValueType;
358
359	size_t HashKey (const key_t key) const
360	{
361		return (size_t)(key);
362	}
363
364	size_t Hash(Ipc *variable) const
365	{
366		return (size_t)HashKey(variable->Key());
367	}
368
369	bool Compare(const key_t key, Ipc *variable) const
370	{
371		return (key_t)key == (key_t)variable->Key();
372	}
373
374	Ipc*& GetLink(Ipc *variable) const
375	{
376		return variable->Link();
377	}
378};
379
// Arbitrary limits
// MAX_XSI_MESSAGE bounds sXsiMessageCount (checked in
// XsiMessageQueue::Insert()), MAX_XSI_MESSAGE_QUEUE bounds
// sXsiMessageQueueCount (checked in _user_xsi_msgget()).
#define MAX_XSI_MESSAGE			4096
#define MAX_XSI_MESSAGE_QUEUE	1024
// IPC key -> Ipc object
static BOpenHashTable<IpcHashTableDefinition> sIpcHashTable;
// Message queue ID -> XsiMessageQueue object
static BOpenHashTable<MessageQueueHashTableDefinition> sMessageQueueHashTable;

// Both mutexes are initialized in xsi_msg_init(). The syscalls below acquire
// sXsiMessageQueueLock around their sMessageQueueHashTable accesses;
// _user_xsi_msgctl() additionally acquires sIpcLock first.
static mutex sIpcLock;
static mutex sXsiMessageQueueLock;

// Source of the per-queue sequence numbers, advanced in SetID()
static uint32 sGlobalSequenceNumber = 1;
// Number of messages currently stored in all queues together
static vint32 sXsiMessageCount = 0;
// Number of existing message queues
static vint32 sXsiMessageQueueCount = 0;
392
393
394//	#pragma mark -
395
396
397XsiMessageQueue::~XsiMessageQueue()
398{
399	mutex_destroy(&fLock);
400
401	// Wake up any threads still waiting
402	if (fThreadsWaitingToSend || fThreadsWaitingToReceive) {
403		InterruptsSpinLocker schedulerLocker(gSchedulerLock);
404
405		while (queued_thread *entry = fWaitingToReceive.RemoveHead()) {
406			entry->queued = false;
407			thread_unblock_locked(entry->thread, EIDRM);
408		}
409		while (queued_thread *entry = fWaitingToSend.RemoveHead()) {
410			entry->queued = false;
411			thread_unblock_locked(entry->thread, EIDRM);
412		}
413	}
414
415	// Free up any remaining messages
416	if (fMessageQueue.msg_qnum) {
417		while (queued_message *message = fMessage.RemoveHead()) {
418			atomic_add(&sXsiMessageCount, -1);
419			delete message;
420		}
421	}
422}
423
424
425bool
426XsiMessageQueue::Insert(queued_message *message)
427{
428	// The only situation that would make us (potentially) wait
429	// is that we exceed with bytes or with the total number of messages
430	if (fBytesInQueue + message->length > fMessageQueue.msg_qbytes)
431		return true;
432
433	while (true) {
434		int32 oldCount = atomic_get(&sXsiMessageCount);
435		if (oldCount >= MAX_XSI_MESSAGE)
436			return true;
437		// If another thread updates the counter we keep
438		// iterating
439		if (atomic_test_and_set(&sXsiMessageCount, oldCount + 1, oldCount)
440			== oldCount)
441			break;
442	}
443
444	fMessage.Add(message);
445	fMessageQueue.msg_qnum++;
446	fMessageQueue.msg_lspid = getpid();
447	fMessageQueue.msg_stime = real_time_clock();
448	fBytesInQueue += message->length;
449	if (fThreadsWaitingToReceive)
450		WakeUpThread(true /* WaitForMessage */);
451	return false;
452}
453
454
455queued_message*
456XsiMessageQueue::Remove(long typeRequested)
457{
458	queued_message *message = NULL;
459	if (typeRequested < 0) {
460		// Return first message of the lowest type
461		// that is less than or equal to the absolute
462		// value of type requested.
463		MessageQueue::Iterator iterator = fMessage.GetIterator();
464		while (iterator.HasNext()) {
465			queued_message *current = iterator.Next();
466			if (current->type <= -typeRequested) {
467				message = iterator.Remove();
468				break;
469			}
470		}
471	} else if (typeRequested == 0) {
472		// Return the first message on the queue
473		message = fMessage.RemoveHead();
474	} else {
475		// Return the first message of type requested
476		MessageQueue::Iterator iterator = fMessage.GetIterator();
477		while (iterator.HasNext()) {
478			queued_message *current = iterator.Next();
479			if (current->type == typeRequested) {
480				message = iterator.Remove();
481				break;
482			}
483		}
484	}
485
486	if (message == NULL)
487		return NULL;
488
489	fMessageQueue.msg_qnum--;
490	fMessageQueue.msg_lrpid = getpid();
491	fMessageQueue.msg_rtime = real_time_clock();
492	fBytesInQueue -= message->length;
493	atomic_add(&sXsiMessageCount, -1);
494	if (fThreadsWaitingToSend)
495		WakeUpThread(false /* WaitForMessage */);
496	return message;
497}
498
499
500void
501XsiMessageQueue::SetID()
502{
503	fID = real_time_clock();
504	// The lock is held before calling us
505	while (true) {
506		if (sMessageQueueHashTable.Lookup(fID) == NULL)
507			break;
508		fID++;
509	}
510	sGlobalSequenceNumber = (sGlobalSequenceNumber + 1) % UINT_MAX;
511	fSequenceNumber = sGlobalSequenceNumber;
512}
513
514
515//	#pragma mark - Kernel exported API
516
517
518void
519xsi_msg_init()
520{
521	// Initialize hash tables
522	status_t status = sIpcHashTable.Init();
523	if (status != B_OK)
524		panic("xsi_msg_init() failed to initialize ipc hash table\n");
525	status =  sMessageQueueHashTable.Init();
526	if (status != B_OK)
527		panic("xsi_msg_init() failed to initialize message queue hash table\n");
528
529	mutex_init(&sIpcLock, "global POSIX message queue IPC table");
530	mutex_init(&sXsiMessageQueueLock, "global POSIX xsi message queue table");
531}
532
533
534//	#pragma mark - Syscalls
535
536
537int
538_user_xsi_msgctl(int messageQueueID, int command, struct msqid_ds *buffer)
539{
540	TRACE(("xsi_msgctl: messageQueueID = %d, command = %d\n", messageQueueID, command));
541	MutexLocker ipcHashLocker(sIpcLock);
542	MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
543	XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
544	if (messageQueue == NULL) {
545		TRACE_ERROR(("xsi_msgctl: message queue id %d not valid\n", messageQueueID));
546		return EINVAL;
547	}
548	if (!IS_USER_ADDRESS(buffer)) {
549		TRACE_ERROR(("xsi_msgctl: buffer address is not valid\n"));
550		return B_BAD_ADDRESS;
551	}
552
553	// Lock the message queue itself and release both the ipc hash table lock
554	// and the message queue hash table lock _only_ if the command it's not
555	// IPC_RMID, this prevents undesidered situation from happening while
556	// (hopefully) improving the concurrency.
557	MutexLocker messageQueueLocker;
558	if (command != IPC_RMID) {
559		messageQueueLocker.SetTo(&messageQueue->Lock(), false);
560		messageQueueHashLocker.Unlock();
561		ipcHashLocker.Unlock();
562	} else
563		// Since we are going to delete the message queue object
564		// along with its mutex, we can't use a MutexLocker object,
565		// as the mutex itself won't exist on function exit
566		mutex_lock(&messageQueue->Lock());
567
568	switch (command) {
569		case IPC_STAT: {
570			if (!messageQueue->HasReadPermission()) {
571				TRACE_ERROR(("xsi_msgctl: calling process has not read "
572					"permission on message queue %d, key %d\n", messageQueueID,
573					(int)messageQueue->IpcKey()));
574				return EACCES;
575			}
576			struct msqid_ds msg = messageQueue->GetMessageQueue();
577			if (user_memcpy(buffer, &msg, sizeof(struct msqid_ds)) < B_OK) {
578				TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n"));
579				return B_BAD_ADDRESS;
580			}
581			break;
582		}
583
584		case IPC_SET: {
585			if (!messageQueue->HasPermission()) {
586				TRACE_ERROR(("xsi_msgctl: calling process has not permission "
587					"on message queue %d, key %d\n", messageQueueID,
588					(int)messageQueue->IpcKey()));
589				return EPERM;
590			}
591			struct msqid_ds msg;
592			if (user_memcpy(&msg, buffer, sizeof(struct msqid_ds)) < B_OK) {
593				TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n"));
594				return B_BAD_ADDRESS;
595			}
596			if (msg.msg_qbytes > messageQueue->MaxBytes() && getuid() != 0) {
597				TRACE_ERROR(("xsi_msgctl: user does not have permission to "
598					"increase the maximum number of bytes allowed on queue\n"));
599				return EPERM;
600			}
601			if (msg.msg_qbytes == 0) {
602				TRACE_ERROR(("xsi_msgctl: can't set msg_qbytes to 0!\n"));
603				return EINVAL;
604			}
605
606			messageQueue->DoIpcSet(&msg);
607			break;
608		}
609
610		case IPC_RMID: {
611			// If this was the command, we are still holding the message
612			// queue hash table lock along with the ipc one, but not the
613			// message queue lock itself. This prevents other process
614			// to try and acquire a destroyed mutex
615			if (!messageQueue->HasPermission()) {
616				TRACE_ERROR(("xsi_msgctl: calling process has not permission "
617					"on message queue %d, key %d\n", messageQueueID,
618					(int)messageQueue->IpcKey()));
619				return EPERM;
620			}
621			key_t key = messageQueue->IpcKey();
622			Ipc *ipcKey = NULL;
623			if (key != -1) {
624				ipcKey = sIpcHashTable.Lookup(key);
625				sIpcHashTable.Remove(ipcKey);
626			}
627			sMessageQueueHashTable.Remove(messageQueue);
628			// Wake up of any threads waiting on this
629			// queue happens in destructor
630			if (key != -1)
631				delete ipcKey;
632			atomic_add(&sXsiMessageQueueCount, -1);
633
634			delete messageQueue;
635			break;
636		}
637
638		default:
639			TRACE_ERROR(("xsi_semctl: command %d not valid\n", command));
640			return EINVAL;
641	}
642
643	return B_OK;
644}
645
646
647int
648_user_xsi_msgget(key_t key, int flags)
649{
650	TRACE(("xsi_msgget: key = %d, flags = %d\n", (int)key, flags));
651	XsiMessageQueue *messageQueue = NULL;
652	Ipc *ipcKey = NULL;
653	// Default assumptions
654	bool isPrivate = true;
655	bool create = true;
656
657	if (key != IPC_PRIVATE) {
658		isPrivate = false;
659		// Check if key already exist, if it does it already has a message
660		// queue associated with it
661		ipcKey = sIpcHashTable.Lookup(key);
662		if (ipcKey == NULL) {
663			if (!(flags & IPC_CREAT)) {
664				TRACE_ERROR(("xsi_msgget: key %d does not exist, but the "
665					"caller did not ask for creation\n", (int)key));
666				return ENOENT;
667			}
668			ipcKey = new(std::nothrow) Ipc(key);
669			if (ipcKey == NULL) {
670				TRACE_ERROR(("xsi_msgget: failed to create new Ipc object "
671					"for key %d\n", (int)key));
672				return ENOMEM;
673			}
674			sIpcHashTable.Insert(ipcKey);
675		} else {
676			// The IPC key exist and it already has a message queue
677			if ((flags & IPC_CREAT) && (flags & IPC_EXCL)) {
678				TRACE_ERROR(("xsi_msgget: key %d already exist\n", (int)key));
679				return EEXIST;
680			}
681			int messageQueueID = ipcKey->MessageQueueID();
682
683			MutexLocker _(sXsiMessageQueueLock);
684			messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
685			if (!messageQueue->HasPermission()) {
686				TRACE_ERROR(("xsi_msgget: calling process has not permission "
687					"on message queue %d, key %d\n", messageQueue->ID(),
688					(int)key));
689				return EACCES;
690			}
691			create = false;
692		}
693	}
694
695	if (create) {
696		// Create a new message queue for this key
697		if (sXsiMessageQueueCount >= MAX_XSI_MESSAGE_QUEUE) {
698			TRACE_ERROR(("xsi_msgget: reached limit of maximun number of "
699				"message queues\n"));
700			return ENOSPC;
701		}
702
703		messageQueue = new(std::nothrow) XsiMessageQueue(flags);
704		if (messageQueue == NULL) {
705			TRACE_ERROR(("xsi_msgget: failed to allocate new xsi "
706				"message queue\n"));
707			return ENOMEM;
708		}
709		atomic_add(&sXsiMessageQueueCount, 1);
710
711		MutexLocker _(sXsiMessageQueueLock);
712		messageQueue->SetID();
713		if (isPrivate)
714			messageQueue->SetIpcKey((key_t)-1);
715		else {
716			messageQueue->SetIpcKey(key);
717			ipcKey->SetMessageQueueID(messageQueue);
718		}
719		sMessageQueueHashTable.Insert(messageQueue);
720	}
721
722	return messageQueue->ID();
723}
724
725
726ssize_t
727_user_xsi_msgrcv(int messageQueueID, void *messagePointer,
728	size_t messageSize, long messageType, int messageFlags)
729{
730	TRACE(("xsi_msgrcv: messageQueueID = %d, messageSize = %ld\n",
731		messageQueueID, messageSize));
732	MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
733	XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
734	if (messageQueue == NULL) {
735		TRACE_ERROR(("xsi_msgrcv: message queue id %d not valid\n",
736			messageQueueID));
737		return EINVAL;
738	}
739	MutexLocker messageQueueLocker(messageQueue->Lock());
740	messageQueueHashLocker.Unlock();
741
742	if (messageSize > MAX_BYTES_PER_QUEUE) {
743		TRACE_ERROR(("xsi_msgrcv: message size is out of range\n"));
744		return EINVAL;
745	}
746	if (!messageQueue->HasPermission()) {
747		TRACE_ERROR(("xsi_msgrcv: calling process has not permission "
748			"on message queue id %d, key %d\n", messageQueueID,
749			(int)messageQueue->IpcKey()));
750		return EACCES;
751	}
752	if (!IS_USER_ADDRESS(messagePointer)) {
753		TRACE_ERROR(("xsi_msgrcv: message address is not valid\n"));
754		return B_BAD_ADDRESS;
755	}
756
757	queued_message *message = NULL;
758	while (true) {
759		message = messageQueue->Remove(messageType);
760
761		if (message == NULL && !(messageFlags & IPC_NOWAIT)) {
762			// We are going to sleep
763			Thread *thread = thread_get_current_thread();
764			queued_thread queueEntry(thread, messageSize);
765			messageQueue->Enqueue(&queueEntry, /* waitForMessage */ true);
766
767			uint32 sequenceNumber = messageQueue->SequenceNumber();
768
769			TRACE(("xsi_msgrcv: thread %d going to sleep\n", (int)thread->id));
770			status_t result
771				= messageQueue->BlockAndUnlock(thread, &messageQueueLocker);
772			TRACE(("xsi_msgrcv: thread %d back to life\n", (int)thread->id));
773
774			messageQueueHashLocker.Lock();
775			messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
776			if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL
777				&& sequenceNumber != messageQueue->SequenceNumber())) {
778				TRACE_ERROR(("xsi_msgrcv: message queue id %d (sequence = "
779					"%" B_PRIu32 ") got destroyed\n", messageQueueID,
780					sequenceNumber));
781				return EIDRM;
782			} else if (result == B_INTERRUPTED) {
783				TRACE_ERROR(("xsi_msgrcv: thread %d got interrupted while "
784					"waiting on message queue %d\n",(int)thread->id,
785					messageQueueID));
786				messageQueue->Deque(&queueEntry, /* waitForMessage */ true);
787				return EINTR;
788			} else {
789				messageQueueLocker.Lock();
790				messageQueueHashLocker.Unlock();
791			}
792		} else if (message == NULL) {
793			// There is not message of type requested and
794			// we can't wait
795			return ENOMSG;
796		} else {
797			// Message received correctly (so far)
798			if ((ssize_t)messageSize < message->length
799				&& !(messageFlags & MSG_NOERROR)) {
800				TRACE_ERROR(("xsi_msgrcv: message too big!\n"));
801				// Put the message back inside. Since we hold the
802				// queue message lock, not one else could have filled
803				// up the queue meanwhile
804				messageQueue->Insert(message);
805				return E2BIG;
806			}
807
808			ssize_t result
809				= message->copy_to_user_buffer(messagePointer, messageSize);
810			if (result < 0) {
811				messageQueue->Insert(message);
812				return B_BAD_ADDRESS;
813			}
814
815			delete message;
816			TRACE(("xsi_msgrcv: message received correctly\n"));
817			return result;
818		}
819	}
820
821	return B_OK;
822}
823
824
825int
826_user_xsi_msgsnd(int messageQueueID, const void *messagePointer,
827	size_t messageSize, int messageFlags)
828{
829	TRACE(("xsi_msgsnd: messageQueueID = %d, messageSize = %ld\n",
830		messageQueueID, messageSize));
831	MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
832	XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
833	if (messageQueue == NULL) {
834		TRACE_ERROR(("xsi_msgsnd: message queue id %d not valid\n",
835			messageQueueID));
836		return EINVAL;
837	}
838	MutexLocker messageQueueLocker(messageQueue->Lock());
839	messageQueueHashLocker.Unlock();
840
841	if (messageSize > MAX_BYTES_PER_QUEUE) {
842		TRACE_ERROR(("xsi_msgsnd: message size is out of range\n"));
843		return EINVAL;
844	}
845	if (!messageQueue->HasPermission()) {
846		TRACE_ERROR(("xsi_msgsnd: calling process has not permission "
847			"on message queue id %d, key %d\n", messageQueueID,
848			(int)messageQueue->IpcKey()));
849		return EACCES;
850	}
851	if (!IS_USER_ADDRESS(messagePointer)) {
852		TRACE_ERROR(("xsi_msgsnd: message address is not valid\n"));
853		return B_BAD_ADDRESS;
854	}
855
856	queued_message *message
857		= new(std::nothrow) queued_message(messagePointer, messageSize);
858	if (message == NULL || message->initOK != true) {
859		TRACE_ERROR(("xsi_msgsnd: failed to create new message to queue\n"));
860		delete message;
861		return ENOMEM;
862	}
863
864	bool notSent = true;
865	status_t result = B_OK;
866	while (notSent) {
867		bool goToSleep = messageQueue->Insert(message);
868
869		if (goToSleep && !(messageFlags & IPC_NOWAIT)) {
870			// We are going to sleep
871			Thread *thread = thread_get_current_thread();
872			queued_thread queueEntry(thread, messageSize);
873			messageQueue->Enqueue(&queueEntry, /* waitForMessage */ false);
874
875			uint32 sequenceNumber = messageQueue->SequenceNumber();
876
877			TRACE(("xsi_msgsnd: thread %d going to sleep\n", (int)thread->id));
878			result = messageQueue->BlockAndUnlock(thread, &messageQueueLocker);
879			TRACE(("xsi_msgsnd: thread %d back to life\n", (int)thread->id));
880
881			messageQueueHashLocker.Lock();
882			messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
883			if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL
884				&& sequenceNumber != messageQueue->SequenceNumber())) {
885				TRACE_ERROR(("xsi_msgsnd: message queue id %d (sequence = "
886					"%" B_PRIu32 ") got destroyed\n", messageQueueID,
887					sequenceNumber));
888				delete message;
889				notSent = false;
890				result = EIDRM;
891			} else if (result == B_INTERRUPTED) {
892				TRACE_ERROR(("xsi_msgsnd: thread %d got interrupted while "
893					"waiting on message queue %d\n",(int)thread->id,
894					messageQueueID));
895				messageQueue->Deque(&queueEntry, /* waitForMessage */ false);
896				delete message;
897				notSent = false;
898				result = EINTR;
899			} else {
900				messageQueueLocker.Lock();
901				messageQueueHashLocker.Unlock();
902			}
903		} else if (goToSleep) {
904			// We did not send the message and we can't wait
905			delete message;
906			notSent = false;
907			result = EAGAIN;
908		} else {
909			// Message delivered correctly
910			TRACE(("xsi_msgsnd: message sent correctly\n"));
911			notSent = false;
912		}
913	}
914
915	return result;
916}
917