1/*
2 * Copyright (c) 2000-2007 Apple Inc. All rights reserved.
3 *
4 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5 *
6 * This file contains Original Code and/or Modifications of Original Code
7 * as defined in and that are subject to the Apple Public Source License
8 * Version 2.0 (the 'License'). You may not use this file except in
9 * compliance with the License. The rights granted to you under the License
10 * may not be used to create, or enable the creation or redistribution of,
11 * unlawful or unlicensed copies of an Apple operating system, or to
12 * circumvent, violate, or enable the circumvention or violation of, any
13 * terms of an Apple operating system software license agreement.
14 *
15 * Please obtain a copy of the License at
16 * http://www.opensource.apple.com/apsl/ and read it before using this file.
17 *
18 * The Original Code and all software distributed under the License are
19 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23 * Please see the License for the specific language governing rights and
24 * limitations under the License.
25 *
26 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27 */
28/*
29 * @OSF_FREE_COPYRIGHT@
30 */
31/*
32 * Mach Operating System
33 * Copyright (c) 1991,1990,1989 Carnegie Mellon University
34 * All Rights Reserved.
35 *
36 * Permission to use, copy, modify and distribute this software and its
37 * documentation is hereby granted, provided that both the copyright
38 * notice and this permission notice appear in all copies of the
39 * software, derivative works or modified versions, and any portions
40 * thereof, and that both notices appear in supporting documentation.
41 *
42 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
43 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
44 * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
45 *
46 * Carnegie Mellon requests users of this software to return to
47 *
48 *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
49 *  School of Computer Science
50 *  Carnegie Mellon University
51 *  Pittsburgh PA 15213-3890
52 *
53 * any improvements or extensions that they make and grant Carnegie Mellon
54 * the rights to redistribute these changes.
55 */
56/*
57 */
58/*
59 *	File:	ipc/ipc_mqueue.c
60 *	Author:	Rich Draves
61 *	Date:	1989
62 *
63 *	Functions to manipulate IPC message queues.
64 */
65/*
66 * NOTICE: This file was modified by SPARTA, Inc. in 2006 to introduce
67 * support for mandatory and extensible security protections.  This notice
68 * is included in support of clause 2.2 (b) of the Apple Public License,
69 * Version 2.0.
70 */
71
72
73#include <mach/port.h>
74#include <mach/message.h>
75#include <mach/sync_policy.h>
76
77#include <kern/assert.h>
78#include <kern/counters.h>
79#include <kern/sched_prim.h>
80#include <kern/ipc_kobject.h>
81#include <kern/ipc_mig.h>	/* XXX - for mach_msg_receive_continue */
82#include <kern/misc_protos.h>
83#include <kern/task.h>
84#include <kern/thread.h>
85#include <kern/wait_queue.h>
86
87#include <ipc/ipc_mqueue.h>
88#include <ipc/ipc_kmsg.h>
89#include <ipc/ipc_port.h>
90#include <ipc/ipc_pset.h>
91#include <ipc/ipc_space.h>
92
93#ifdef __LP64__
94#include <vm/vm_map.h>
95#endif
96
97#if CONFIG_MACF_MACH
98#include <security/mac_mach_internal.h>
99#endif
100
101int ipc_mqueue_full;		/* address is event for queue space */
102int ipc_mqueue_rcv;		/* address is event for message arrival */
103
104/* forward declarations */
105void ipc_mqueue_receive_results(wait_result_t result);
106
107/*
108 *	Routine:	ipc_mqueue_init
109 *	Purpose:
110 *		Initialize a newly-allocated message queue.
111 */
112void
113ipc_mqueue_init(
114	ipc_mqueue_t	mqueue,
115	boolean_t	is_set)
116{
117	if (is_set) {
118		wait_queue_set_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO|SYNC_POLICY_PREPOST);
119	} else {
120		wait_queue_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO);
121		ipc_kmsg_queue_init(&mqueue->imq_messages);
122		mqueue->imq_seqno = 0;
123		mqueue->imq_msgcount = 0;
124		mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
125		mqueue->imq_fullwaiters = FALSE;
126	}
127}
128
129/*
130 *	Routine:	ipc_mqueue_member
131 *	Purpose:
132 *		Indicate whether the (port) mqueue is a member of
133 *		this portset's mqueue.  We do this by checking
134 *		whether the portset mqueue's waitq is an member of
135 *		the port's mqueue waitq.
136 *	Conditions:
137 *		the portset's mqueue is not already a member
138 *		this may block while allocating linkage structures.
139 */
140
141boolean_t
142ipc_mqueue_member(
143	ipc_mqueue_t		port_mqueue,
144	ipc_mqueue_t		set_mqueue)
145{
146	wait_queue_t	port_waitq = &port_mqueue->imq_wait_queue;
147	wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
148
149	return (wait_queue_member(port_waitq, set_waitq));
150
151}
152
153/*
154 *	Routine:	ipc_mqueue_remove
155 *	Purpose:
156 *		Remove the association between the queue and the specified
157 *		set message queue.
158 */
159
160kern_return_t
161ipc_mqueue_remove(
162	ipc_mqueue_t	  mqueue,
163	ipc_mqueue_t	  set_mqueue,
164	wait_queue_link_t *wqlp)
165{
166	wait_queue_t	 mq_waitq = &mqueue->imq_wait_queue;
167	wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
168
169	return wait_queue_unlink_nofree(mq_waitq, set_waitq, wqlp);
170}
171
172/*
173 *	Routine:	ipc_mqueue_remove_from_all
174 *	Purpose:
175 *		Remove the mqueue from all the sets it is a member of
176 *	Conditions:
177 *		Nothing locked.
178 */
179void
180ipc_mqueue_remove_from_all(
181	ipc_mqueue_t	mqueue,
182	queue_t 	links)
183{
184	wait_queue_t	mq_waitq = &mqueue->imq_wait_queue;
185
186	wait_queue_unlink_all_nofree(mq_waitq, links);
187	return;
188}
189
190/*
191 *	Routine:	ipc_mqueue_remove_all
192 *	Purpose:
193 *		Remove all the member queues from the specified set.
194 *	Conditions:
195 *		Nothing locked.
196 */
197void
198ipc_mqueue_remove_all(
199	ipc_mqueue_t	mqueue,
200	queue_t		links)
201{
202	wait_queue_set_t	mq_setq = &mqueue->imq_set_queue;
203
204	wait_queue_set_unlink_all_nofree(mq_setq, links);
205	return;
206}
207
208
/*
 *	Routine:	ipc_mqueue_add
 *	Purpose:
 *		Associate the portset's mqueue with the port's mqueue.
 *		This has to be done so that posting the port will wakeup
 *		a portset waiter.  If there are waiters on the portset
 *		mqueue and messages on the port mqueue, try to match them
 *		up now.
 *	Conditions:
 *		May block.
 */
kern_return_t
ipc_mqueue_add(
	ipc_mqueue_t	 port_mqueue,
	ipc_mqueue_t	 set_mqueue,
	wait_queue_link_t wql)
{
	wait_queue_t	 port_waitq = &port_mqueue->imq_wait_queue;
	wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
	ipc_kmsg_queue_t kmsgq;
	ipc_kmsg_t       kmsg, next;
	kern_return_t	 kr;
	spl_t		 s;

	/* link with the caller-provided (pre-allocated) link structure */
	kr = wait_queue_link_noalloc(port_waitq, set_waitq, wql);
	if (kr != KERN_SUCCESS)
		return kr;

	/*
	 * Now that the set has been added to the port, there may be
	 * messages queued on the port and threads waiting on the set
	 * waitq.  Lets get them together.
	 */
	s = splsched();
	imq_lock(port_mqueue);
	kmsgq = &port_mqueue->imq_messages;
	/* outer loop: one pass per queued message; "next" is captured
	 * up front because kmsg may be removed from the queue below */
	for (kmsg = ipc_kmsg_queue_first(kmsgq);
	     kmsg != IKM_NULL;
	     kmsg = next) {
		next = ipc_kmsg_queue_next(kmsgq, kmsg);

		/* inner loop: find a waiting thread able to take this kmsg */
		for (;;) {
			thread_t th;
			mach_msg_size_t msize;

			th = wait_queue_wakeup64_identity_locked(
						port_waitq,
						IPC_MQUEUE_RECEIVE,
						THREAD_AWAKENED,
						FALSE);
			/* waitq/mqueue still locked, thread locked */

			/* no more waiters: leave remaining kmsgs queued */
			if (th == THREAD_NULL)
				goto leave;

			/*
			 * If the receiver waited with a facility not directly
			 * related to Mach messaging, then it isn't prepared to get
			 * handed the message directly.  Just set it running, and
			 * go look for another thread that can.
			 */
			if (th->ith_state != MACH_RCV_IN_PROGRESS) {
				  thread_unlock(th);
				  continue;
			}

			/*
			 * Found a receiver. see if they can handle the message
			 * correctly (the message is not too large for them, or
			 * they didn't care to be informed that the message was
			 * too large).  If they can't handle it, take them off
			 * the list and let them go back and figure it out and
			 * just move onto the next.
			 */
			msize = ipc_kmsg_copyout_size(kmsg, th->map);
			if (th->ith_msize <
					(msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(th), th->ith_option))) {
				th->ith_state = MACH_RCV_TOO_LARGE;
				th->ith_msize = msize;	/* report needed size */
				if (th->ith_option & MACH_RCV_LARGE) {
					/*
					 * let him go without message
					 */
					th->ith_receiver_name = port_mqueue->imq_receiver_name;
					th->ith_kmsg = IKM_NULL;
					th->ith_seqno = 0;
					thread_unlock(th);
					continue; /* find another thread */
				}
				/* no MACH_RCV_LARGE: hand over the kmsg anyway,
				 * with TOO_LARGE status (falls through below) */
			} else {
				th->ith_state = MACH_MSG_SUCCESS;
			}

			/*
			 * This thread is going to take this message,
			 * so give it to him.
			 */
			ipc_kmsg_rmqueue(kmsgq, kmsg);
			ipc_mqueue_release_msgcount(port_mqueue);

			th->ith_kmsg = kmsg;
			th->ith_seqno = port_mqueue->imq_seqno++;
			thread_unlock(th);
			break;  /* go to next message */
		}

	}
 leave:
	imq_unlock(port_mqueue);
	splx(s);
	return KERN_SUCCESS;
}
321
322/*
323 *	Routine:	ipc_mqueue_changed
324 *	Purpose:
325 *		Wake up receivers waiting in a message queue.
326 *	Conditions:
327 *		The message queue is locked.
328 */
329
330void
331ipc_mqueue_changed(
332	ipc_mqueue_t		mqueue)
333{
334	wait_queue_wakeup64_all_locked(
335				&mqueue->imq_wait_queue,
336				IPC_MQUEUE_RECEIVE,
337				THREAD_RESTART,
338				FALSE);		/* unlock waitq? */
339}
340
341
342
343
/*
 *	Routine:	ipc_mqueue_send
 *	Purpose:
 *		Send a message to a message queue.  The message holds a reference
 *		for the destination port for this message queue in the
 *		msgh_remote_port field.
 *
 *		If unsuccessful, the caller still has possession of
 *		the message and must do something with it.  If successful,
 *		the message is queued, given to a receiver, or destroyed.
 *	Conditions:
 *		mqueue is locked by the caller at splsched; "s" is the
 *		caller's spl cookie.  This routine unlocks the mqueue and
 *		restores "s" on every path.
 *	Returns:
 *		MACH_MSG_SUCCESS	The message was accepted.
 *		MACH_SEND_TIMED_OUT	Caller still has message.
 *		MACH_SEND_INTERRUPTED	Caller still has message.
 */
mach_msg_return_t
ipc_mqueue_send(
	ipc_mqueue_t		mqueue,
	ipc_kmsg_t		kmsg,
	mach_msg_option_t	option,
	mach_msg_timeout_t	send_timeout,
	spl_t			s)
{
	int wresult;

	/*
	 *  Don't block if:
	 *	1) We're under the queue limit.
	 *	2) Caller used the MACH_SEND_ALWAYS internal option.
	 *	3) Message is sent to a send-once right.
	 */
	if (!imq_full(mqueue) ||
	    (!imq_full_kernel(mqueue) &&
	     ((option & MACH_SEND_ALWAYS) ||
	      (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
	       MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
		/* reserve our slot in the queue before dropping the lock */
		mqueue->imq_msgcount++;
		assert(mqueue->imq_msgcount > 0);
		imq_unlock(mqueue);
		splx(s);
	} else {
		thread_t cur_thread = current_thread();
		uint64_t deadline;

		/*
		 * We have to wait for space to be granted to us.
		 */
		if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
			/* zero timeout: fail immediately rather than block */
			imq_unlock(mqueue);
			splx(s);
			return MACH_SEND_TIMED_OUT;
		}
		if (imq_full_kernel(mqueue)) {
			/* hard kernel limit reached: never block on this */
			imq_unlock(mqueue);
			splx(s);
			return MACH_SEND_NO_BUFFER;
		}
		/* flag that at least one sender is parked on IPC_MQUEUE_FULL */
		mqueue->imq_fullwaiters = TRUE;
		thread_lock(cur_thread);
		if (option & MACH_SEND_TIMEOUT)
			clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline);
		else
			deadline = 0;
		wresult = wait_queue_assert_wait64_locked(
						&mqueue->imq_wait_queue,
						IPC_MQUEUE_FULL,
						THREAD_ABORTSAFE, deadline,
						cur_thread);
		thread_unlock(cur_thread);
		imq_unlock(mqueue);
		splx(s);

		if (wresult == THREAD_WAITING) {
			wresult = thread_block(THREAD_CONTINUE_NULL);
			counter(c_ipc_mqueue_send_block++);
		}

		switch (wresult) {
		case THREAD_TIMED_OUT:
			assert(option & MACH_SEND_TIMEOUT);
			return MACH_SEND_TIMED_OUT;

		case THREAD_AWAKENED:
			/* we can proceed - inherited msgcount from waker */
			assert(mqueue->imq_msgcount > 0);
			break;

		case THREAD_INTERRUPTED:
			return MACH_SEND_INTERRUPTED;

		case THREAD_RESTART:
			/* mqueue is being destroyed */
			return MACH_SEND_INVALID_DEST;
		default:
			panic("ipc_mqueue_send");
		}
	}

	/* slot reserved (or inherited) — deliver or enqueue the message */
	ipc_mqueue_post(mqueue, kmsg);
	return MACH_MSG_SUCCESS;
}
447
/*
 *	Routine:	ipc_mqueue_release_msgcount
 *	Purpose:
 *		Release a message queue reference in the case where we
 *		found a waiter.
 *
 *		If releasing the slot brings the queue under its limit and
 *		senders are blocked, either hand the freed slot directly to
 *		one awakened sender or note that no senders remain.
 *	Conditions:
 *		The message queue is locked.
 *		The message corresponding to this reference is off the queue.
 */
void
ipc_mqueue_release_msgcount(
	ipc_mqueue_t mqueue)
{
	assert(imq_held(mqueue));
	assert(mqueue->imq_msgcount > 1 || ipc_kmsg_queue_empty(&mqueue->imq_messages));

	mqueue->imq_msgcount--;

	if (!imq_full(mqueue) && mqueue->imq_fullwaiters) {
		if (wait_queue_wakeup64_one_locked(
						&mqueue->imq_wait_queue,
						IPC_MQUEUE_FULL,
						THREAD_AWAKENED,
						FALSE) != KERN_SUCCESS) {
			/* nobody was actually waiting — clear the flag */
			mqueue->imq_fullwaiters = FALSE;
		} else {
			/* gave away our slot - add reference back */
			mqueue->imq_msgcount++;
		}
	}
}
480
/*
 *	Routine:	ipc_mqueue_post
 *	Purpose:
 *		Post a message to a waiting receiver or enqueue it.  If a
 *		receiver is waiting, we can release our reserved space in
 *		the message queue.
 *
 *	Conditions:
 *		mqueue unlocked on entry (this routine takes the lock).
 *		If we need to queue, our space in the message queue is reserved.
 */
void
ipc_mqueue_post(
	register ipc_mqueue_t 	mqueue,
	register ipc_kmsg_t		kmsg)
{
	spl_t s;

	/*
	 *	While the msg queue	is locked, we have control of the
	 *  kmsg, so the ref in	it for the port is still good.
	 *
	 *	Check for a receiver for the message.
	 */
	s = splsched();
	imq_lock(mqueue);
	for (;;) {
		wait_queue_t waitq = &mqueue->imq_wait_queue;
		thread_t receiver;
		mach_msg_size_t msize;

		receiver = wait_queue_wakeup64_identity_locked(
							waitq,
							IPC_MQUEUE_RECEIVE,
							THREAD_AWAKENED,
							FALSE);
		/* waitq still locked, thread locked */

		if (receiver == THREAD_NULL) {
			/*
			 * no receivers; queue kmsg
			 * (uses the slot reserved by our caller)
			 */
			assert(mqueue->imq_msgcount > 0);
			ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
			break;
		}

		/*
		 * If the receiver waited with a facility not directly
		 * related to Mach messaging, then it isn't prepared to get
		 * handed the message directly.  Just set it running, and
		 * go look for another thread that can.
		 */
		if (receiver->ith_state != MACH_RCV_IN_PROGRESS) {
				  thread_unlock(receiver);
				  continue;
		}


		/*
		 * We found a waiting thread.
		 * If the message is too large or the scatter list is too small
		 * the thread we wake up will get that as its status.
		 */
		msize =	ipc_kmsg_copyout_size(kmsg, receiver->map);
		if (receiver->ith_msize <
				(msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(receiver), receiver->ith_option))) {
			receiver->ith_msize = msize;	/* tell it the needed size */
			receiver->ith_state = MACH_RCV_TOO_LARGE;
		} else {
			receiver->ith_state = MACH_MSG_SUCCESS;
		}

		/*
		 * If there is no problem with the upcoming receive, or the
		 * receiver thread didn't specifically ask for special too
		 * large error condition, go ahead and select it anyway.
		 */
		if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
		    !(receiver->ith_option & MACH_RCV_LARGE)) {

			receiver->ith_kmsg = kmsg;
			receiver->ith_seqno = mqueue->imq_seqno++;
			thread_unlock(receiver);

			/* we didn't need our reserved spot in the queue */
			ipc_mqueue_release_msgcount(mqueue);
			break;
		}

		/*
		 * Otherwise, this thread needs to be released to run
		 * and handle its error without getting the message.  We
		 * need to go back and pick another one.
		 */
		receiver->ith_kmsg = IKM_NULL;
		receiver->ith_seqno = 0;
		thread_unlock(receiver);
	}

	imq_unlock(mqueue);
	splx(s);

	/* bookkeeping: attribute the send to the current task */
	current_task()->messages_sent++;
	return;
}
586
587
/*
 * Translate the wait result of a blocked receive into the thread's
 * ith_state (the mach_msg return value the receive path will report).
 * Runs on the receiving thread itself after it resumes.
 */
/* static */ void
ipc_mqueue_receive_results(wait_result_t saved_wait_result)
{
	thread_t     		self = current_thread();
	mach_msg_option_t	option = self->ith_option;

	/*
	 * why did we wake up?
	 */
	switch (saved_wait_result) {
	case THREAD_TIMED_OUT:
		self->ith_state = MACH_RCV_TIMED_OUT;
		return;

	case THREAD_INTERRUPTED:
		self->ith_state = MACH_RCV_INTERRUPTED;
		return;

	case THREAD_RESTART:
		/* something bad happened to the port/set */
		self->ith_state = MACH_RCV_PORT_CHANGED;
		return;

	case THREAD_AWAKENED:
		/*
		 * We do not need to go select a message, somebody
		 * handed us one (or a too-large indication).
		 */
		switch (self->ith_state) {
		case MACH_RCV_SCATTER_SMALL:
		case MACH_RCV_TOO_LARGE:
			/*
			 * Somebody tried to give us a too large
			 * message. If we indicated that we cared,
			 * then they only gave us the indication,
			 * otherwise they gave us the indication
			 * AND the message anyway.
			 */
			if (option & MACH_RCV_LARGE) {
				return;
			}
			/* fall through: no MACH_RCV_LARGE, we also got the msg */

		case MACH_MSG_SUCCESS:
			return;

		default:
			panic("ipc_mqueue_receive_results: strange ith_state");
		}

	default:
		panic("ipc_mqueue_receive_results: strange wait_result");
	}
}
641
/*
 * Continuation entered after a receive blocked and discarded its
 * kernel stack: classify the wakeup, then resume the mach_msg
 * receive path.  Never returns to the caller.
 */
void
ipc_mqueue_receive_continue(
	__unused void *param,
	wait_result_t wresult)
{
	ipc_mqueue_receive_results(wresult);
	mach_msg_receive_continue();  /* hard-coded for now */
}
650
/*
 *	Routine:	ipc_mqueue_receive
 *	Purpose:
 *		Receive a message from a message queue.
 *
 *		If continuation is non-zero, then we might discard
 *		our kernel stack when we block.  We will continue
 *		after unblocking by executing continuation.
 *
 *		If resume is true, then we are resuming a receive
 *		operation after a blocked receive discarded our stack.
 *	Conditions:
 *		Our caller must hold a reference for the port or port set
 *		to which this queue belongs, to keep the queue
 *		from being deallocated.
 *
 *		The kmsg is returned with clean header fields
 *		and with the circular bit turned off.
 *	Returns:
 *		MACH_MSG_SUCCESS	Message returned in kmsgp.
 *		MACH_RCV_TOO_LARGE	Message size returned in kmsgp.
 *		MACH_RCV_TIMED_OUT	No message obtained.
 *		MACH_RCV_INTERRUPTED	No message obtained.
 *		MACH_RCV_PORT_DIED	Port/set died; no message.
 *		MACH_RCV_PORT_CHANGED	Port moved into set; no msg.
 *
 */

void
ipc_mqueue_receive(
	ipc_mqueue_t            mqueue,
	mach_msg_option_t       option,
	mach_msg_size_t         max_size,
	mach_msg_timeout_t      rcv_timeout,
	int                     interruptible)
{
	wait_result_t           wresult;
        thread_t                self = current_thread();

        wresult = ipc_mqueue_receive_on_thread(mqueue, option, max_size,
                                               rcv_timeout, interruptible,
                                               self);
        /* a message (or error state) was selected without blocking */
        if (wresult == THREAD_NOT_WAITING)
                return;

	if (wresult == THREAD_WAITING) {
		counter((interruptible == THREAD_ABORTSAFE) ?
			c_ipc_mqueue_receive_block_user++ :
			c_ipc_mqueue_receive_block_kernel++);

		/* with a continuation, the stack is discarded and
		 * thread_block does not return here */
		if (self->ith_continuation)
			thread_block(ipc_mqueue_receive_continue);
			/* NOTREACHED */

		wresult = thread_block(THREAD_CONTINUE_NULL);
	}
	ipc_mqueue_receive_results(wresult);
}
709
/*
 * Attempt to select a message for "thread" from the given mqueue (a
 * single port, or a port set searched via its prepost list).  Returns
 * THREAD_NOT_WAITING if a message or error state was posted to the
 * thread without blocking; otherwise asserts a wait on the mqueue and
 * returns the wait result (typically THREAD_WAITING).
 */
wait_result_t
ipc_mqueue_receive_on_thread(
        ipc_mqueue_t            mqueue,
	mach_msg_option_t       option,
	mach_msg_size_t         max_size,
	mach_msg_timeout_t      rcv_timeout,
	int                     interruptible,
	thread_t                thread)
{
	ipc_kmsg_queue_t        kmsgs;
	wait_result_t           wresult;
	uint64_t		deadline;
	spl_t                   s;
#if CONFIG_MACF_MACH
	ipc_labelh_t lh;
	task_t task;
	int rc;
#endif

	s = splsched();
	imq_lock(mqueue);

	if (imq_is_set(mqueue)) {
		queue_t q;

		q = &mqueue->imq_preposts;

		/*
		 * If we are waiting on a portset mqueue, we need to see if
		 * any of the member ports have work for us.  Ports that
		 * have (or recently had) messages will be linked in the
		 * prepost queue for the portset. By holding the portset's
		 * mqueue lock during the search, we tie up any attempts by
		 * mqueue_deliver or portset membership changes that may
		 * cross our path.
		 */
	search_set:
		while(!queue_empty(q)) {
			wait_queue_link_t wql;
			ipc_mqueue_t port_mq;

			queue_remove_first(q, wql, wait_queue_link_t, wql_preposts);
			assert(!wql_is_preposted(wql));

			/*
			 * This is a lock order violation, so we have to do it
			 * "softly," putting the link back on the prepost list
			 * if it fails (at the tail is fine since the order of
			 * handling messages from different sources in a set is
			 * not guaranteed and we'd like to skip to the next source
			 * if one is available).
			 */
			port_mq = (ipc_mqueue_t)wql->wql_queue;
			if (!imq_lock_try(port_mq)) {
				/* back off completely (drop set lock, pause)
				 * and restart the search from scratch */
				queue_enter(q, wql, wait_queue_link_t, wql_preposts);
				imq_unlock(mqueue);
				splx(s);
				mutex_pause(0);
				s = splsched();
				imq_lock(mqueue);
				goto search_set; /* start again at beginning - SMP */
			}

			/*
			 * If there are no messages on this queue, just skip it
			 * (we already removed the link from the set's prepost queue).
			 */
			kmsgs = &port_mq->imq_messages;
			if (ipc_kmsg_queue_first(kmsgs) == IKM_NULL) {
				imq_unlock(port_mq);
				continue;
			}

			/*
			 * There are messages, so reinsert the link back
			 * at the tail of the preposted queue (for fairness)
			 * while we still have the portset mqueue locked.
			 */
			queue_enter(q, wql, wait_queue_link_t, wql_preposts);
			imq_unlock(mqueue);

			/*
			 * Continue on to handling the message with just
			 * the port mqueue locked.
			 */
			ipc_mqueue_select_on_thread(port_mq, option, max_size, thread);
			imq_unlock(port_mq);
#if CONFIG_MACF_MACH
			/* MAC check: may veto the receive after selection */
			if (thread->task != TASK_NULL &&
			    thread->ith_kmsg != NULL &&
			    thread->ith_kmsg->ikm_sender != NULL) {
				lh = thread->ith_kmsg->ikm_sender->label;
				tasklabel_lock(thread->task);
				ip_lock(lh->lh_port);
				rc = mac_port_check_receive(&thread->task->maclabel,
                                                            &lh->lh_label);
				ip_unlock(lh->lh_port);
				tasklabel_unlock(thread->task);
				if (rc)
					thread->ith_state = MACH_RCV_INVALID_DATA;
			}
#endif
			splx(s);
			return THREAD_NOT_WAITING;

		}

	} else {

		/*
		 * Receive on a single port. Just try to get the messages.
		 */
	  	kmsgs = &mqueue->imq_messages;
		if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
			ipc_mqueue_select_on_thread(mqueue, option, max_size, thread);
			imq_unlock(mqueue);
#if CONFIG_MACF_MACH
			/* MAC check: may veto the receive after selection */
			if (thread->task != TASK_NULL &&
			    thread->ith_kmsg != NULL &&
			    thread->ith_kmsg->ikm_sender != NULL) {
				lh = thread->ith_kmsg->ikm_sender->label;
				tasklabel_lock(thread->task);
				ip_lock(lh->lh_port);
				rc = mac_port_check_receive(&thread->task->maclabel,
                                                            &lh->lh_label);
				ip_unlock(lh->lh_port);
				tasklabel_unlock(thread->task);
				if (rc)
					thread->ith_state = MACH_RCV_INVALID_DATA;
			}
#endif
			splx(s);
			return THREAD_NOT_WAITING;
		}
	}

	/*
	 * Looks like we'll have to block.  The mqueue we will
	 * block on (whether the set's or the local port's) is
	 * still locked.
	 */
	if (option & MACH_RCV_TIMEOUT) {
		if (rcv_timeout == 0) {
			/* poll semantics: report timeout immediately */
			imq_unlock(mqueue);
			splx(s);
			thread->ith_state = MACH_RCV_TIMED_OUT;
			return THREAD_NOT_WAITING;
		}
	}

	thread_lock(thread);
	thread->ith_state = MACH_RCV_IN_PROGRESS;
	thread->ith_option = option;
	thread->ith_msize = max_size;

	if (option & MACH_RCV_TIMEOUT)
		clock_interval_to_deadline(rcv_timeout, 1000*NSEC_PER_USEC, &deadline);
	else
		deadline = 0;

	wresult = wait_queue_assert_wait64_locked(&mqueue->imq_wait_queue,
						  IPC_MQUEUE_RECEIVE,
						  interruptible, deadline,
						  thread);
	/* preposts should be detected above, not here */
	if (wresult == THREAD_AWAKENED)
		panic("ipc_mqueue_receive_on_thread: sleep walking");

	thread_unlock(thread);
	imq_unlock(mqueue);
	splx(s);
	return wresult;
}
883
884
/*
 *	Routine:	ipc_mqueue_select_on_thread
 *	Purpose:
 *		A receiver discovered that there was a message on the queue
 *		before they had to block.  Pick the message off the queue and
 *		"post" it to thread (via the thread's ith_* fields).
 *	Conditions:
 *		mqueue locked.
 *              thread not locked.
 *		There is a message.
 *	Returns (via thread->ith_state):
 *		MACH_MSG_SUCCESS	Actually selected a message for ourselves.
 *		MACH_RCV_TOO_LARGE  May or may not have pull it, but it is large
 */
void
ipc_mqueue_select_on_thread(
	ipc_mqueue_t		mqueue,
	mach_msg_option_t	option,
	mach_msg_size_t		max_size,
	thread_t                thread)
{
	ipc_kmsg_t kmsg;
	mach_msg_return_t mr = MACH_MSG_SUCCESS;
	mach_msg_size_t rcv_size;

	/*
	 * Do some sanity checking of our ability to receive
	 * before pulling the message off the queue.
	 */
	kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages);
	assert(kmsg != IKM_NULL);

	/*
	 * If we really can't receive it, but we had the
	 * MACH_RCV_LARGE option set, then don't take it off
	 * the queue, instead return the appropriate error
	 * (and size needed).
	 */
	rcv_size = ipc_kmsg_copyout_size(kmsg, thread->map);
	if (rcv_size + REQUESTED_TRAILER_SIZE(thread_is_64bit(thread), option) > max_size) {
		mr = MACH_RCV_TOO_LARGE;
		if (option & MACH_RCV_LARGE) {
			/* leave kmsg queued; report name and needed size only */
			thread->ith_receiver_name = mqueue->imq_receiver_name;
			thread->ith_kmsg = IKM_NULL;
			thread->ith_msize = rcv_size;
			thread->ith_seqno = 0;
			thread->ith_state = mr;
			return;
		}
		/* no MACH_RCV_LARGE: deliver the message anyway with
		 * MACH_RCV_TOO_LARGE status (falls through below) */
	}

	ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg);
	ipc_mqueue_release_msgcount(mqueue);
	thread->ith_seqno = mqueue->imq_seqno++;
	thread->ith_kmsg = kmsg;
	thread->ith_state = mr;

	/* bookkeeping: attribute the receive to the current task */
	current_task()->messages_received++;
	return;
}
945
/*
 *	Routine:	ipc_mqueue_peek
 *	Purpose:
 *		Peek at a message queue to see if it has any messages
 *		(in it or contained message queues for a set).
 *
 *	Conditions:
 *		Locks may be held by callers, so this routine cannot block.
 *		Caller holds reference on the message queue.
 *	Returns:
 *		non-zero if a message appears to be available, 0 otherwise.
 */
unsigned
ipc_mqueue_peek(ipc_mqueue_t mq)
{
	wait_queue_link_t	wql;
	queue_t			q;
	spl_t s;

	/* single port: an unlocked glance at the head of the message queue */
	if (!imq_is_set(mq))
		return (ipc_kmsg_queue_first(&mq->imq_messages) != IKM_NULL);

	/*
	 * Don't block trying to get the lock.
	 * NOTE(review): despite the comment, imq_lock() (not a try-lock)
	 * is used below — confirm imq_lock cannot block in this context.
	 */
	s = splsched();
	imq_lock(mq);

	/*
	 * peek at the contained port message queues, return as soon as
	 * we spot a message on one of the message queues linked on the
	 * prepost list.
	 */
	q = &mq->imq_preposts;
	queue_iterate(q, wql, wait_queue_link_t, wql_preposts) {
		ipc_mqueue_t port_mq = (ipc_mqueue_t)wql->wql_queue;
		ipc_kmsg_queue_t kmsgs = &port_mq->imq_messages;

		/* note: the member port's mqueue is not locked here */
		if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
			imq_unlock(mq);
			splx(s);
			return 1;
		}
	}
	imq_unlock(mq);
	splx(s);
	return 0;
}
992
/*
 *	Routine:	ipc_mqueue_destroy
 *	Purpose:
 *		Destroy a (non-set) message queue.
 *		Set any blocked senders running.
 *	   	Destroy the kmsgs in the queue.
 *	Conditions:
 *		Nothing locked.
 *		Receivers were removed when the receive right was "changed"
 */
void
ipc_mqueue_destroy(
	ipc_mqueue_t	mqueue)
{
	ipc_kmsg_queue_t kmqueue;
	ipc_kmsg_t kmsg;
	boolean_t reap = FALSE;	/* TRUE if we own the delayed-destroy reap */
	spl_t s;


	s = splsched();
	imq_lock(mqueue);
	/*
	 *	rouse all blocked senders
	 *	(THREAD_RESTART makes ipc_mqueue_send report INVALID_DEST)
	 */
	mqueue->imq_fullwaiters = FALSE;
	wait_queue_wakeup64_all_locked(
				&mqueue->imq_wait_queue,
				IPC_MQUEUE_FULL,
				THREAD_RESTART,
				FALSE);

	/*
	 * Move messages from the specified queue to the per-thread
	 * clean/drain queue while we have the mqueue lock.
	 */
	kmqueue = &mqueue->imq_messages;
	while ((kmsg = ipc_kmsg_dequeue(kmqueue)) != IKM_NULL) {
		boolean_t first;
		/* first == TRUE only for the outermost drain on this thread */
		first = ipc_kmsg_delayed_destroy(kmsg);
		if (first)
			reap = first;
	}

	imq_unlock(mqueue);
	splx(s);

	/*
	 * Destroy the messages we enqueued if we aren't nested
	 * inside some other attempt to drain the same queue.
	 */
	if (reap)
		ipc_kmsg_reap_delayed();
}
1047
/*
 *	Routine:	ipc_mqueue_set_qlimit
 *	Purpose:
 *		Changes a message queue limit; the maximum number
 *		of messages which may be queued.  Raising the limit
 *		wakes up to (new - old) blocked senders, granting each
 *		a reserved slot.
 *	Conditions:
 *		Nothing locked.
 */

void
ipc_mqueue_set_qlimit(
	 ipc_mqueue_t			mqueue,
	 mach_port_msgcount_t	qlimit)
{
	 spl_t s;

	 assert(qlimit <= MACH_PORT_QLIMIT_MAX);

	 /* wake up senders allowed by the new qlimit */
	 s = splsched();
	 imq_lock(mqueue);
	 if (qlimit > mqueue->imq_qlimit) {
		 mach_port_msgcount_t i, wakeup;

		 /* caution: wakeup, qlimit are unsigned */
		 wakeup = qlimit - mqueue->imq_qlimit;

		 for (i = 0; i < wakeup; i++) {
			 if (wait_queue_wakeup64_one_locked(
							&mqueue->imq_wait_queue,
							IPC_MQUEUE_FULL,
							THREAD_AWAKENED,
							FALSE) == KERN_NOT_WAITING) {
					 /* no more blocked senders */
					 mqueue->imq_fullwaiters = FALSE;
					 break;
			 }
			 mqueue->imq_msgcount++;  /* give it to the awakened thread */
		 }
	 }
	mqueue->imq_qlimit = qlimit;
	imq_unlock(mqueue);
	splx(s);
}
1091
1092/*
1093 *	Routine:	ipc_mqueue_set_seqno
1094 *	Purpose:
1095 *		Changes an mqueue's sequence number.
1096 *	Conditions:
1097 *		Caller holds a reference to the queue's containing object.
1098 */
1099void
1100ipc_mqueue_set_seqno(
1101	ipc_mqueue_t		mqueue,
1102	mach_port_seqno_t	seqno)
1103{
1104	spl_t s;
1105
1106	s = splsched();
1107	imq_lock(mqueue);
1108	mqueue->imq_seqno = seqno;
1109	imq_unlock(mqueue);
1110	splx(s);
1111}
1112
1113
/*
 *	Routine:	ipc_mqueue_copyin
 *	Purpose:
 *		Convert a name in a space to a message queue.
 *	Conditions:
 *		Nothing locked.  If successful, the caller gets a ref for
 *		for the object.	This ref ensures the continued existence of
 *		the queue.
 *	Returns:
 *		MACH_MSG_SUCCESS	Found a message queue.
 *		MACH_RCV_INVALID_NAME	The space is dead.
 *		MACH_RCV_INVALID_NAME	The name doesn't denote a right.
 *		MACH_RCV_INVALID_NAME
 *			The denoted right is not receive or port set.
 *		MACH_RCV_IN_SET		Receive right is a member of a set.
 */

mach_msg_return_t
ipc_mqueue_copyin(
	ipc_space_t		space,
	mach_port_name_t	name,
	ipc_mqueue_t		*mqueuep,
	ipc_object_t		*objectp)
{
	ipc_entry_t entry;
	ipc_object_t object;
	ipc_mqueue_t mqueue;

	is_read_lock(space);
	if (!is_active(space)) {
		is_read_unlock(space);
		return MACH_RCV_INVALID_NAME;
	}

	entry = ipc_entry_lookup(space, name);
	if (entry == IE_NULL) {
		is_read_unlock(space);
		return MACH_RCV_INVALID_NAME;
	}

	object = entry->ie_object;

	if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
		ipc_port_t port;

		port = (ipc_port_t) object;
		assert(port != IP_NULL);

		/* lock the port before dropping the space lock (handoff) */
		ip_lock(port);
		assert(ip_active(port));
		assert(port->ip_receiver_name == name);
		assert(port->ip_receiver == space);
		is_read_unlock(space);
		mqueue = &port->ip_messages;

	} else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) {
		ipc_pset_t pset;

		pset = (ipc_pset_t) object;
		assert(pset != IPS_NULL);

		/* lock the pset before dropping the space lock (handoff) */
		ips_lock(pset);
		assert(ips_active(pset));
		assert(pset->ips_local_name == name);
		is_read_unlock(space);

		mqueue = &pset->ips_messages;
	} else {
		/* neither a receive right nor a port set */
		is_read_unlock(space);
		return MACH_RCV_INVALID_NAME;
	}

	/*
	 *	At this point, the object is locked and active,
	 *	the space is unlocked, and mqueue is initialized.
	 */

	/* take the caller's reference while still locked, then unlock */
	io_reference(object);
	io_unlock(object);

	*objectp = object;
	*mqueuep = mqueue;
	return MACH_MSG_SUCCESS;
}
1198
1199