1/*
2 * Copyright 2008-2023, Haiku, Inc. All rights reserved.
3 * Distributed under the terms of the MIT License.
4 *
5 * Authors:
6 *		Salvatore Benedetto <salvatore.benedetto@gmail.com>
7 */
8
9#include <posix/xsi_message_queue.h>
10
11#include <new>
12
13#include <sys/ipc.h>
14#include <sys/types.h>
15
16#include <OS.h>
17
18#include <kernel.h>
19#include <syscall_restart.h>
20
21#include <util/atomic.h>
22#include <util/AutoLock.h>
23#include <util/DoublyLinkedList.h>
24#include <util/OpenHashTable.h>
25
26
27#define TRACE_XSI_MSG_QUEUE
28#ifdef TRACE_XSI_MSG_QUEUE
29#	define TRACE(x)			dprintf x
30#	define TRACE_ERROR(x)	dprintf x
31#else
32#	define TRACE(x)			/* nothing */
33#	define TRACE_ERROR(x)	dprintf x
34#endif
35
36
37namespace {
38
39
40struct queued_message : DoublyLinkedListLinkImpl<queued_message> {
41	queued_message(const void *_message, ssize_t _length)
42		:
43		initOK(false),
44		length(_length)
45	{
46		message = (char *)malloc(sizeof(char) * _length);
47		if (message == NULL)
48			return;
49
50		if (user_memcpy(&type, _message, sizeof(long)) != B_OK
51			|| user_memcpy(message, (void *)((char *)_message + sizeof(long)),
52			_length) != B_OK) {
53			free(message);
54			return;
55		}
56		initOK = true;
57	}
58
59	~queued_message()
60	{
61		if (initOK)
62			free(message);
63	}
64
65	ssize_t copy_to_user_buffer(void *_message, ssize_t _length)
66	{
67		if (_length > length)
68			_length = length;
69
70		if (user_memcpy(_message, &type, sizeof(long)) != B_OK
71			|| user_memcpy((void *)((char *)_message + sizeof(long)), message,
72			_length) != B_OK)
73			return B_ERROR;
74		return _length;
75	}
76
77	bool		initOK;
78	ssize_t		length;
79	char		*message;
80	long		type;
81};
82
83typedef DoublyLinkedList<queued_message> MessageQueue;
84
85// Arbitrary limit
86#define MAX_BYTES_PER_QUEUE		2048
87
88class XsiMessageQueue {
89public:
90	XsiMessageQueue(int flags)
91		:
92		fBytesInQueue(0)
93	{
94		mutex_init(&fLock, "XsiMessageQueue private mutex");
95		fWaitingToReceive.Init(this, "XsiMessageQueue");
96		fWaitingToSend.Init(this, "XsiMessageQueue");
97
98		SetIpcKey((key_t)-1);
99		SetPermissions(flags);
100		// Initialize all fields to zero
101		memset((void *)&fMessageQueue, 0, sizeof(struct msqid_ds));
102		fMessageQueue.msg_ctime = (time_t)real_time_clock();
103		fMessageQueue.msg_qbytes = MAX_BYTES_PER_QUEUE;
104	}
105
106	// Implemented after sXsiMessageCount is declared
107	~XsiMessageQueue();
108
109	status_t BlockAndUnlock(ConditionVariableEntry *queueEntry, MutexLocker *queueLocker)
110	{
111		// Unlock the queue before blocking
112		queueLocker->Unlock();
113		return queueEntry->Wait(B_CAN_INTERRUPT);
114	}
115
116	void DoIpcSet(struct msqid_ds *result)
117	{
118		fMessageQueue.msg_perm.uid = result->msg_perm.uid;
119		fMessageQueue.msg_perm.gid = result->msg_perm.gid;
120		fMessageQueue.msg_perm.mode = (fMessageQueue.msg_perm.mode & ~0x01ff)
121			| (result->msg_perm.mode & 0x01ff);
122		fMessageQueue.msg_qbytes = result->msg_qbytes;
123		fMessageQueue.msg_ctime = (time_t)real_time_clock();
124	}
125
126	void Dequeue(ConditionVariableEntry *queueEntry)
127	{
128		queueEntry->Wait(B_RELATIVE_TIMEOUT, 0);
129	}
130
131	void Enqueue(ConditionVariableEntry *queueEntry, bool waitForMessage)
132	{
133		if (waitForMessage) {
134			fWaitingToReceive.Add(queueEntry);
135		} else {
136			fWaitingToSend.Add(queueEntry);
137		}
138	}
139
140	struct msqid_ds &GetMessageQueue()
141	{
142		return fMessageQueue;
143	}
144
145	bool HasPermission() const
146	{
147		if ((fMessageQueue.msg_perm.mode & S_IWOTH) != 0)
148			return true;
149
150		uid_t uid = geteuid();
151		if (uid == 0 || (uid == fMessageQueue.msg_perm.uid
152			&& (fMessageQueue.msg_perm.mode & S_IWUSR) != 0))
153			return true;
154
155		gid_t gid = getegid();
156		if (gid == fMessageQueue.msg_perm.gid
157			&& (fMessageQueue.msg_perm.mode & S_IWGRP) != 0)
158			return true;
159
160		return false;
161	}
162
163	bool HasReadPermission() const
164	{
165		// TODO: fix this
166		return HasPermission();
167	}
168
169	int ID() const
170	{
171		return fID;
172	}
173
174	// Implemented after sXsiMessageCount is declared
175	bool Insert(queued_message *message);
176
177	key_t IpcKey() const
178	{
179		return fMessageQueue.msg_perm.key;
180	}
181
182	mutex &Lock()
183	{
184		return fLock;
185	}
186
187	msglen_t MaxBytes() const
188	{
189		return fMessageQueue.msg_qbytes;
190	}
191
192	// Implemented after sXsiMessageCount is declared
193	queued_message *Remove(long typeRequested);
194
195	uint32 SequenceNumber() const
196	{
197		return fSequenceNumber;
198	}
199
200	// Implemented after sMessageQueueHashTable is declared
201	void SetID();
202
203	void SetIpcKey(key_t key)
204	{
205		fMessageQueue.msg_perm.key = key;
206	}
207
208	void SetPermissions(int flags)
209	{
210		fMessageQueue.msg_perm.uid = fMessageQueue.msg_perm.cuid = geteuid();
211		fMessageQueue.msg_perm.gid = fMessageQueue.msg_perm.cgid = getegid();
212		fMessageQueue.msg_perm.mode = (flags & 0x01ff);
213	}
214
215	void WakeUpThread(bool waitForMessage)
216	{
217		if (waitForMessage) {
218			// Wake up all waiting thread for a message
219			// TODO: this can cause starvation for any
220			// very-unlucky-and-slow thread
221			fWaitingToReceive.NotifyAll();
222		} else {
223			// Wake up only one thread waiting to send
224			fWaitingToSend.NotifyOne();
225		}
226	}
227
228	XsiMessageQueue*& Link()
229	{
230		return fLink;
231	}
232
233private:
234	msglen_t			fBytesInQueue;
235	int					fID;
236	mutex				fLock;
237	MessageQueue		fMessage;
238	struct msqid_ds		fMessageQueue;
239	uint32				fSequenceNumber;
240
241	ConditionVariable	fWaitingToReceive;
242	ConditionVariable	fWaitingToSend;
243
244	XsiMessageQueue*	fLink;
245};
246
247
248// Xsi message queue hash table
249struct MessageQueueHashTableDefinition {
250	typedef int					KeyType;
251	typedef XsiMessageQueue		ValueType;
252
253	size_t HashKey (const int key) const
254	{
255		return (size_t)key;
256	}
257
258	size_t Hash(XsiMessageQueue *variable) const
259	{
260		return (size_t)variable->ID();
261	}
262
263	bool Compare(const int key, XsiMessageQueue *variable) const
264	{
265		return (int)key == (int)variable->ID();
266	}
267
268	XsiMessageQueue*& GetLink(XsiMessageQueue *variable) const
269	{
270		return variable->Link();
271	}
272};
273
274
275// IPC class
276class Ipc {
277public:
278	Ipc(key_t key)
279		: fKey(key),
280		fMessageQueueId(-1)
281	{
282	}
283
284	key_t Key() const
285	{
286		return fKey;
287	}
288
289	int MessageQueueID() const
290	{
291		return fMessageQueueId;
292	}
293
294	void SetMessageQueueID(XsiMessageQueue *messageQueue)
295	{
296		fMessageQueueId = messageQueue->ID();
297	}
298
299	Ipc*& Link()
300	{
301		return fLink;
302	}
303
304private:
305	key_t				fKey;
306	int					fMessageQueueId;
307	Ipc*				fLink;
308};
309
310
311struct IpcHashTableDefinition {
312	typedef key_t	KeyType;
313	typedef Ipc		ValueType;
314
315	size_t HashKey (const key_t key) const
316	{
317		return (size_t)(key);
318	}
319
320	size_t Hash(Ipc *variable) const
321	{
322		return (size_t)HashKey(variable->Key());
323	}
324
325	bool Compare(const key_t key, Ipc *variable) const
326	{
327		return (key_t)key == (key_t)variable->Key();
328	}
329
330	Ipc*& GetLink(Ipc *variable) const
331	{
332		return variable->Link();
333	}
334};
335
336} // namespace
337
338
339// Arbitrary limits
340#define MAX_XSI_MESSAGE			4096
341#define MAX_XSI_MESSAGE_QUEUE	1024
342static BOpenHashTable<IpcHashTableDefinition> sIpcHashTable;
343static BOpenHashTable<MessageQueueHashTableDefinition> sMessageQueueHashTable;
344
345static mutex sIpcLock;
346static mutex sXsiMessageQueueLock;
347
348static uint32 sGlobalSequenceNumber = 1;
349static int32 sXsiMessageCount = 0;
350static int32 sXsiMessageQueueCount = 0;
351
352
353//	#pragma mark -
354
355
356XsiMessageQueue::~XsiMessageQueue()
357{
358	mutex_destroy(&fLock);
359
360	// Wake up any threads still waiting
361	fWaitingToReceive.NotifyAll(EIDRM);
362	fWaitingToSend.NotifyAll(EIDRM);
363
364	// Free up any remaining messages
365	if (fMessageQueue.msg_qnum) {
366		while (queued_message *message = fMessage.RemoveHead()) {
367			atomic_add(&sXsiMessageCount, -1);
368			delete message;
369		}
370	}
371}
372
373
374bool
375XsiMessageQueue::Insert(queued_message *message)
376{
377	// The only situation that would make us (potentially) wait
378	// is that we exceed with bytes or with the total number of messages
379	if (fBytesInQueue + message->length > fMessageQueue.msg_qbytes)
380		return true;
381
382	while (true) {
383		int32 oldCount = atomic_get(&sXsiMessageCount);
384		if (oldCount >= MAX_XSI_MESSAGE)
385			return true;
386		// If another thread updates the counter we keep
387		// iterating
388		if (atomic_test_and_set(&sXsiMessageCount, oldCount + 1, oldCount)
389			== oldCount)
390			break;
391	}
392
393	fMessage.Add(message);
394	fMessageQueue.msg_qnum++;
395	fMessageQueue.msg_lspid = getpid();
396	fMessageQueue.msg_stime = real_time_clock();
397	fBytesInQueue += message->length;
398
399	WakeUpThread(true /* WaitForMessage */);
400	return false;
401}
402
403
404queued_message*
405XsiMessageQueue::Remove(long typeRequested)
406{
407	queued_message *message = NULL;
408	if (typeRequested < 0) {
409		// Return first message of the lowest type
410		// that is less than or equal to the absolute
411		// value of type requested.
412		MessageQueue::Iterator iterator = fMessage.GetIterator();
413		while (iterator.HasNext()) {
414			queued_message *current = iterator.Next();
415			if (current->type <= -typeRequested) {
416				message = iterator.Remove();
417				break;
418			}
419		}
420	} else if (typeRequested == 0) {
421		// Return the first message on the queue
422		message = fMessage.RemoveHead();
423	} else {
424		// Return the first message of type requested
425		MessageQueue::Iterator iterator = fMessage.GetIterator();
426		while (iterator.HasNext()) {
427			queued_message *current = iterator.Next();
428			if (current->type == typeRequested) {
429				message = iterator.Remove();
430				break;
431			}
432		}
433	}
434
435	if (message == NULL)
436		return NULL;
437
438	fMessageQueue.msg_qnum--;
439	fMessageQueue.msg_lrpid = getpid();
440	fMessageQueue.msg_rtime = real_time_clock();
441	fBytesInQueue -= message->length;
442	atomic_add(&sXsiMessageCount, -1);
443
444	WakeUpThread(false /* WaitForMessage */);
445	return message;
446}
447
448
449void
450XsiMessageQueue::SetID()
451{
452	fID = real_time_clock();
453	// The lock is held before calling us
454	while (true) {
455		if (sMessageQueueHashTable.Lookup(fID) == NULL)
456			break;
457		fID++;
458	}
459	sGlobalSequenceNumber = (sGlobalSequenceNumber + 1) % UINT_MAX;
460	fSequenceNumber = sGlobalSequenceNumber;
461}
462
463
464//	#pragma mark - Kernel exported API
465
466
467void
468xsi_msg_init()
469{
470	// Initialize hash tables
471	status_t status = sIpcHashTable.Init();
472	if (status != B_OK)
473		panic("xsi_msg_init() failed to initialize ipc hash table\n");
474	status =  sMessageQueueHashTable.Init();
475	if (status != B_OK)
476		panic("xsi_msg_init() failed to initialize message queue hash table\n");
477
478	mutex_init(&sIpcLock, "global POSIX message queue IPC table");
479	mutex_init(&sXsiMessageQueueLock, "global POSIX xsi message queue table");
480}
481
482
483//	#pragma mark - Syscalls
484
485
486int
487_user_xsi_msgctl(int messageQueueID, int command, struct msqid_ds *buffer)
488{
489	TRACE(("xsi_msgctl: messageQueueID = %d, command = %d\n", messageQueueID, command));
490	MutexLocker ipcHashLocker(sIpcLock);
491	MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
492	XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
493	if (messageQueue == NULL) {
494		TRACE(("xsi_msgctl: message queue id %d not valid\n", messageQueueID));
495		return EINVAL;
496	}
497	if (buffer != NULL && !IS_USER_ADDRESS(buffer)) {
498		TRACE(("xsi_msgctl: buffer address is not valid\n"));
499		return B_BAD_ADDRESS;
500	}
501
502	// Lock the message queue itself and release both the ipc hash table lock
503	// and the message queue hash table lock _only_ if the command it's not
504	// IPC_RMID, this prevents undesidered situation from happening while
505	// (hopefully) improving the concurrency.
506	MutexLocker messageQueueLocker;
507	if (command != IPC_RMID) {
508		messageQueueLocker.SetTo(&messageQueue->Lock(), false);
509		messageQueueHashLocker.Unlock();
510		ipcHashLocker.Unlock();
511	} else
512		// Since we are going to delete the message queue object
513		// along with its mutex, we can't use a MutexLocker object,
514		// as the mutex itself won't exist on function exit
515		mutex_lock(&messageQueue->Lock());
516
517	switch (command) {
518		case IPC_STAT: {
519			if (!messageQueue->HasReadPermission()) {
520				TRACE(("xsi_msgctl: calling process has not read "
521					"permission on message queue %d, key %d\n", messageQueueID,
522					(int)messageQueue->IpcKey()));
523				return EACCES;
524			}
525			struct msqid_ds msg = messageQueue->GetMessageQueue();
526			if (user_memcpy(buffer, &msg, sizeof(struct msqid_ds)) < B_OK) {
527				TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n"));
528				return B_BAD_ADDRESS;
529			}
530			break;
531		}
532
533		case IPC_SET: {
534			if (!messageQueue->HasPermission()) {
535				TRACE(("xsi_msgctl: calling process has not permission "
536					"on message queue %d, key %d\n", messageQueueID,
537					(int)messageQueue->IpcKey()));
538				return EPERM;
539			}
540			struct msqid_ds msg;
541			if (user_memcpy(&msg, buffer, sizeof(struct msqid_ds)) < B_OK) {
542				TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n"));
543				return B_BAD_ADDRESS;
544			}
545			if (msg.msg_qbytes > messageQueue->MaxBytes() && getuid() != 0) {
546				TRACE(("xsi_msgctl: user does not have permission to "
547					"increase the maximum number of bytes allowed on queue\n"));
548				return EPERM;
549			}
550			if (msg.msg_qbytes == 0) {
551				TRACE(("xsi_msgctl: can't set msg_qbytes to 0!\n"));
552				return EINVAL;
553			}
554
555			messageQueue->DoIpcSet(&msg);
556			break;
557		}
558
559		case IPC_RMID: {
560			// If this was the command, we are still holding the message
561			// queue hash table lock along with the ipc one, but not the
562			// message queue lock itself. This prevents other process
563			// to try and acquire a destroyed mutex
564			if (!messageQueue->HasPermission()) {
565				TRACE(("xsi_msgctl: calling process has not permission "
566					"on message queue %d, key %d\n", messageQueueID,
567					(int)messageQueue->IpcKey()));
568				return EPERM;
569			}
570			key_t key = messageQueue->IpcKey();
571			Ipc *ipcKey = NULL;
572			if (key != -1) {
573				ipcKey = sIpcHashTable.Lookup(key);
574				sIpcHashTable.Remove(ipcKey);
575			}
576			sMessageQueueHashTable.Remove(messageQueue);
577			// Wake up of any threads waiting on this
578			// queue happens in destructor
579			if (key != -1)
580				delete ipcKey;
581			atomic_add(&sXsiMessageQueueCount, -1);
582
583			delete messageQueue;
584			break;
585		}
586
587		default:
588			TRACE_ERROR(("xsi_semctl: command %d not valid\n", command));
589			return EINVAL;
590	}
591
592	return B_OK;
593}
594
595
596int
597_user_xsi_msgget(key_t key, int flags)
598{
599	TRACE(("xsi_msgget: key = %d, flags = %d\n", (int)key, flags));
600	XsiMessageQueue *messageQueue = NULL;
601	Ipc *ipcKey = NULL;
602	// Default assumptions
603	bool isPrivate = true;
604	bool create = true;
605
606	if (key != IPC_PRIVATE) {
607		isPrivate = false;
608		// Check if key already exist, if it does it already has a message
609		// queue associated with it
610		ipcKey = sIpcHashTable.Lookup(key);
611		if (ipcKey == NULL || ipcKey->MessageQueueID() == -1) {
612			if (!(flags & IPC_CREAT)) {
613				TRACE(("xsi_msgget: key %d does not exist, but the "
614					"caller did not ask for creation\n", (int)key));
615				return ENOENT;
616			}
617			if (ipcKey == NULL) {
618				ipcKey = new(std::nothrow) Ipc(key);
619				if (ipcKey == NULL) {
620					TRACE(("xsi_msgget: failed to create new Ipc object "
621						"for key %d\n", (int)key));
622					return ENOMEM;
623				}
624				sIpcHashTable.Insert(ipcKey);
625			}
626		} else {
627			// The IPC key exist and it already has a message queue
628			if ((flags & IPC_CREAT) && (flags & IPC_EXCL)) {
629				TRACE_ERROR(("xsi_msgget: key %d already exist\n", (int)key));
630				return EEXIST;
631			}
632			int messageQueueID = ipcKey->MessageQueueID();
633
634			MutexLocker _(sXsiMessageQueueLock);
635			messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
636			if (!messageQueue->HasPermission()) {
637				TRACE(("xsi_msgget: calling process has not permission "
638					"on message queue %d, key %d\n", messageQueue->ID(),
639					(int)key));
640				return EACCES;
641			}
642			create = false;
643		}
644	}
645
646	if (create) {
647		// Create a new message queue for this key
648		if (atomic_get(&sXsiMessageQueueCount) >= MAX_XSI_MESSAGE_QUEUE) {
649			TRACE_ERROR(("xsi_msgget: reached limit of maximun number of "
650				"message queues\n"));
651			return ENOSPC;
652		}
653
654		messageQueue = new(std::nothrow) XsiMessageQueue(flags);
655		if (messageQueue == NULL) {
656			TRACE_ERROR(("xsi_msgget: failed to allocate new xsi "
657				"message queue\n"));
658			return ENOMEM;
659		}
660		atomic_add(&sXsiMessageQueueCount, 1);
661
662		MutexLocker _(sXsiMessageQueueLock);
663		messageQueue->SetID();
664		if (isPrivate)
665			messageQueue->SetIpcKey((key_t)-1);
666		else {
667			messageQueue->SetIpcKey(key);
668			ipcKey->SetMessageQueueID(messageQueue);
669		}
670		sMessageQueueHashTable.Insert(messageQueue);
671	}
672
673	return messageQueue->ID();
674}
675
676
677ssize_t
678_user_xsi_msgrcv(int messageQueueID, void *messagePointer,
679	size_t messageSize, long messageType, int messageFlags)
680{
681	TRACE(("xsi_msgrcv: messageQueueID = %d, messageSize = %ld\n",
682		messageQueueID, messageSize));
683	MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
684	XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
685	if (messageQueue == NULL) {
686		TRACE(("xsi_msgrcv: message queue id %d not valid\n",
687			messageQueueID));
688		return EINVAL;
689	}
690	MutexLocker messageQueueLocker(messageQueue->Lock());
691	messageQueueHashLocker.Unlock();
692
693	if (messageSize > MAX_BYTES_PER_QUEUE) {
694		TRACE_ERROR(("xsi_msgrcv: message size is out of range\n"));
695		return EINVAL;
696	}
697	if (!messageQueue->HasPermission()) {
698		TRACE(("xsi_msgrcv: calling process has not permission "
699			"on message queue id %d, key %d\n", messageQueueID,
700			(int)messageQueue->IpcKey()));
701		return EACCES;
702	}
703	if (!IS_USER_ADDRESS(messagePointer)) {
704		TRACE(("xsi_msgrcv: message address is not valid\n"));
705		return B_BAD_ADDRESS;
706	}
707
708	queued_message *message = NULL;
709	while (true) {
710		message = messageQueue->Remove(messageType);
711
712		if (message == NULL && !(messageFlags & IPC_NOWAIT)) {
713			// We are going to sleep
714			ConditionVariableEntry queueEntry;
715			messageQueue->Enqueue(&queueEntry, /* waitForMessage */ true);
716
717			uint32 sequenceNumber = messageQueue->SequenceNumber();
718
719			TRACE(("xsi_msgrcv: thread %d going to sleep\n", (int)thread_get_current_thread_id()));
720			status_t result
721				= messageQueue->BlockAndUnlock(&queueEntry, &messageQueueLocker);
722			TRACE(("xsi_msgrcv: thread %d back to life\n", (int)thread_get_current_thread_id()));
723
724			messageQueueHashLocker.Lock();
725			messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
726			if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL
727				&& sequenceNumber != messageQueue->SequenceNumber())) {
728				TRACE(("xsi_msgrcv: message queue id %d (sequence = "
729					"%" B_PRIu32 ") got destroyed\n", messageQueueID,
730					sequenceNumber));
731				return EIDRM;
732			} else if (result == B_INTERRUPTED) {
733				TRACE(("xsi_msgrcv: thread %d got interrupted while "
734					"waiting on message queue %d\n", (int)thread_get_current_thread_id(),
735					messageQueueID));
736				messageQueue->Dequeue(&queueEntry);
737				return EINTR;
738			} else {
739				messageQueueLocker.Lock();
740				messageQueueHashLocker.Unlock();
741			}
742		} else if (message == NULL) {
743			// There is not message of type requested and
744			// we can't wait
745			return ENOMSG;
746		} else {
747			// Message received correctly (so far)
748			if ((ssize_t)messageSize < message->length
749				&& !(messageFlags & MSG_NOERROR)) {
750				TRACE_ERROR(("xsi_msgrcv: message too big!\n"));
751				// Put the message back inside. Since we hold the
752				// queue message lock, not one else could have filled
753				// up the queue meanwhile
754				messageQueue->Insert(message);
755				return E2BIG;
756			}
757
758			ssize_t result
759				= message->copy_to_user_buffer(messagePointer, messageSize);
760			if (result < 0) {
761				messageQueue->Insert(message);
762				return B_BAD_ADDRESS;
763			}
764
765			delete message;
766			TRACE(("xsi_msgrcv: message received correctly\n"));
767			return result;
768		}
769	}
770
771	return B_OK;
772}
773
774
775int
776_user_xsi_msgsnd(int messageQueueID, const void *messagePointer,
777	size_t messageSize, int messageFlags)
778{
779	TRACE(("xsi_msgsnd: messageQueueID = %d, messageSize = %ld\n",
780		messageQueueID, messageSize));
781	MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
782	XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
783	if (messageQueue == NULL) {
784		TRACE(("xsi_msgsnd: message queue id %d not valid\n",
785			messageQueueID));
786		return EINVAL;
787	}
788	MutexLocker messageQueueLocker(messageQueue->Lock());
789	messageQueueHashLocker.Unlock();
790
791	if (messageSize > MAX_BYTES_PER_QUEUE) {
792		TRACE_ERROR(("xsi_msgsnd: message size is out of range\n"));
793		return EINVAL;
794	}
795	if (!messageQueue->HasPermission()) {
796		TRACE(("xsi_msgsnd: calling process has not permission "
797			"on message queue id %d, key %d\n", messageQueueID,
798			(int)messageQueue->IpcKey()));
799		return EACCES;
800	}
801	if (!IS_USER_ADDRESS(messagePointer)) {
802		TRACE(("xsi_msgsnd: message address is not valid\n"));
803		return B_BAD_ADDRESS;
804	}
805
806	queued_message *message
807		= new(std::nothrow) queued_message(messagePointer, messageSize);
808	if (message == NULL || message->initOK != true) {
809		TRACE_ERROR(("xsi_msgsnd: failed to create new message to queue\n"));
810		delete message;
811		return ENOMEM;
812	}
813
814	bool notSent = true;
815	status_t result = B_OK;
816	while (notSent) {
817		bool goToSleep = messageQueue->Insert(message);
818
819		if (goToSleep && !(messageFlags & IPC_NOWAIT)) {
820			// We are going to sleep
821			ConditionVariableEntry queueEntry;
822			messageQueue->Enqueue(&queueEntry, /* waitForMessage */ false);
823
824			uint32 sequenceNumber = messageQueue->SequenceNumber();
825
826			TRACE(("xsi_msgsnd: thread %d going to sleep\n", (int)thread_get_current_thread_id()));
827			result = messageQueue->BlockAndUnlock(&queueEntry, &messageQueueLocker);
828			TRACE(("xsi_msgsnd: thread %d back to life\n", (int)thread_get_current_thread_id()));
829
830			messageQueueHashLocker.Lock();
831			messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
832			if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL
833				&& sequenceNumber != messageQueue->SequenceNumber())) {
834				TRACE(("xsi_msgsnd: message queue id %d (sequence = "
835					"%" B_PRIu32 ") got destroyed\n", messageQueueID,
836					sequenceNumber));
837				delete message;
838				notSent = false;
839				result = EIDRM;
840			} else if (result == B_INTERRUPTED) {
841				TRACE(("xsi_msgsnd: thread %d got interrupted while "
842					"waiting on message queue %d\n", (int)thread_get_current_thread_id(),
843					messageQueueID));
844				messageQueue->Dequeue(&queueEntry);
845				delete message;
846				notSent = false;
847				result = EINTR;
848			} else {
849				messageQueueLocker.Lock();
850				messageQueueHashLocker.Unlock();
851			}
852		} else if (goToSleep) {
853			// We did not send the message and we can't wait
854			delete message;
855			notSent = false;
856			result = EAGAIN;
857		} else {
858			// Message delivered correctly
859			TRACE(("xsi_msgsnd: message sent correctly\n"));
860			notSent = false;
861		}
862	}
863
864	return result;
865}
866