1/*
2 * Copyright (c) 2000-2007 Apple Inc. All rights reserved.
3 *
4 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5 *
6 * This file contains Original Code and/or Modifications of Original Code
7 * as defined in and that are subject to the Apple Public Source License
8 * Version 2.0 (the 'License'). You may not use this file except in
9 * compliance with the License. The rights granted to you under the License
10 * may not be used to create, or enable the creation or redistribution of,
11 * unlawful or unlicensed copies of an Apple operating system, or to
12 * circumvent, violate, or enable the circumvention or violation of, any
13 * terms of an Apple operating system software license agreement.
14 *
15 * Please obtain a copy of the License at
16 * http://www.opensource.apple.com/apsl/ and read it before using this file.
17 *
18 * The Original Code and all software distributed under the License are
19 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23 * Please see the License for the specific language governing rights and
24 * limitations under the License.
25 *
26 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27 */
28/*
29 * @OSF_FREE_COPYRIGHT@
30 */
31/*
32 * Mach Operating System
33 * Copyright (c) 1991,1990,1989 Carnegie Mellon University
34 * All Rights Reserved.
35 *
36 * Permission to use, copy, modify and distribute this software and its
37 * documentation is hereby granted, provided that both the copyright
38 * notice and this permission notice appear in all copies of the
39 * software, derivative works or modified versions, and any portions
40 * thereof, and that both notices appear in supporting documentation.
41 *
42 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
43 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
44 * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
45 *
46 * Carnegie Mellon requests users of this software to return to
47 *
48 *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
49 *  School of Computer Science
50 *  Carnegie Mellon University
51 *  Pittsburgh PA 15213-3890
52 *
53 * any improvements or extensions that they make and grant Carnegie Mellon
54 * the rights to redistribute these changes.
55 */
56/*
57 */
58/*
59 *	File:	ipc/ipc_mqueue.c
60 *	Author:	Rich Draves
61 *	Date:	1989
62 *
63 *	Functions to manipulate IPC message queues.
64 */
65/*
66 * NOTICE: This file was modified by SPARTA, Inc. in 2006 to introduce
67 * support for mandatory and extensible security protections.  This notice
68 * is included in support of clause 2.2 (b) of the Apple Public License,
69 * Version 2.0.
70 */
71
72
73#include <mach/port.h>
74#include <mach/message.h>
75#include <mach/sync_policy.h>
76
77#include <kern/assert.h>
78#include <kern/counters.h>
79#include <kern/sched_prim.h>
80#include <kern/ipc_kobject.h>
81#include <kern/ipc_mig.h>	/* XXX - for mach_msg_receive_continue */
82#include <kern/misc_protos.h>
83#include <kern/task.h>
84#include <kern/thread.h>
85#include <kern/wait_queue.h>
86
87#include <ipc/ipc_mqueue.h>
88#include <ipc/ipc_kmsg.h>
89#include <ipc/ipc_port.h>
90#include <ipc/ipc_pset.h>
91#include <ipc/ipc_space.h>
92
93#ifdef __LP64__
94#include <vm/vm_map.h>
95#endif
96
97int ipc_mqueue_full;		/* address is event for queue space */
98int ipc_mqueue_rcv;		/* address is event for message arrival */
99
100/* forward declarations */
101void ipc_mqueue_receive_results(wait_result_t result);
102
103/*
104 *	Routine:	ipc_mqueue_init
105 *	Purpose:
106 *		Initialize a newly-allocated message queue.
107 */
108void
109ipc_mqueue_init(
110	ipc_mqueue_t	mqueue,
111	boolean_t	is_set)
112{
113	if (is_set) {
114		wait_queue_set_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO|SYNC_POLICY_PREPOST);
115	} else {
116		wait_queue_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO);
117		ipc_kmsg_queue_init(&mqueue->imq_messages);
118		mqueue->imq_seqno = 0;
119		mqueue->imq_msgcount = 0;
120		mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
121		mqueue->imq_fullwaiters = FALSE;
122	}
123}
124
125/*
126 *	Routine:	ipc_mqueue_member
127 *	Purpose:
128 *		Indicate whether the (port) mqueue is a member of
129 *		this portset's mqueue.  We do this by checking
130 *		whether the portset mqueue's waitq is an member of
131 *		the port's mqueue waitq.
132 *	Conditions:
133 *		the portset's mqueue is not already a member
134 *		this may block while allocating linkage structures.
135 */
136
137boolean_t
138ipc_mqueue_member(
139	ipc_mqueue_t		port_mqueue,
140	ipc_mqueue_t		set_mqueue)
141{
142	wait_queue_t	port_waitq = &port_mqueue->imq_wait_queue;
143	wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
144
145	return (wait_queue_member(port_waitq, set_waitq));
146
147}
148
149/*
150 *	Routine:	ipc_mqueue_remove
151 *	Purpose:
152 *		Remove the association between the queue and the specified
153 *		set message queue.
154 */
155
156kern_return_t
157ipc_mqueue_remove(
158	ipc_mqueue_t	  mqueue,
159	ipc_mqueue_t	  set_mqueue,
160	wait_queue_link_t *wqlp)
161{
162	wait_queue_t	 mq_waitq = &mqueue->imq_wait_queue;
163	wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
164
165	return wait_queue_unlink_nofree(mq_waitq, set_waitq, wqlp);
166}
167
168/*
169 *	Routine:	ipc_mqueue_remove_from_all
170 *	Purpose:
171 *		Remove the mqueue from all the sets it is a member of
172 *	Conditions:
173 *		Nothing locked.
174 */
175void
176ipc_mqueue_remove_from_all(
177	ipc_mqueue_t	mqueue,
178	queue_t 	links)
179{
180	wait_queue_t	mq_waitq = &mqueue->imq_wait_queue;
181
182	wait_queue_unlink_all_nofree(mq_waitq, links);
183	return;
184}
185
186/*
187 *	Routine:	ipc_mqueue_remove_all
188 *	Purpose:
189 *		Remove all the member queues from the specified set.
190 *	Conditions:
191 *		Nothing locked.
192 */
193void
194ipc_mqueue_remove_all(
195	ipc_mqueue_t	mqueue,
196	queue_t		links)
197{
198	wait_queue_set_t	mq_setq = &mqueue->imq_set_queue;
199
200	wait_queue_set_unlink_all_nofree(mq_setq, links);
201	return;
202}
203
204
205/*
206 *	Routine:	ipc_mqueue_add
207 *	Purpose:
208 *		Associate the portset's mqueue with the port's mqueue.
209 *		This has to be done so that posting the port will wakeup
210 *		a portset waiter.  If there are waiters on the portset
211 *		mqueue and messages on the port mqueue, try to match them
212 *		up now.
213 *	Conditions:
214 *		May block.
215 */
216kern_return_t
217ipc_mqueue_add(
218	ipc_mqueue_t	 port_mqueue,
219	ipc_mqueue_t	 set_mqueue,
220	wait_queue_link_t wql)
221{
222	wait_queue_t	 port_waitq = &port_mqueue->imq_wait_queue;
223	wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
224	ipc_kmsg_queue_t kmsgq;
225	ipc_kmsg_t       kmsg, next;
226	kern_return_t	 kr;
227	spl_t		 s;
228
229	kr = wait_queue_link_noalloc(port_waitq, set_waitq, wql);
230	if (kr != KERN_SUCCESS)
231		return kr;
232
233	/*
234	 * Now that the set has been added to the port, there may be
235	 * messages queued on the port and threads waiting on the set
236	 * waitq.  Lets get them together.
237	 */
238	s = splsched();
239	imq_lock(port_mqueue);
240	kmsgq = &port_mqueue->imq_messages;
241	for (kmsg = ipc_kmsg_queue_first(kmsgq);
242	     kmsg != IKM_NULL;
243	     kmsg = next) {
244		next = ipc_kmsg_queue_next(kmsgq, kmsg);
245
246		for (;;) {
247			thread_t th;
248			mach_msg_size_t msize;
249
250			th = wait_queue_wakeup64_identity_locked(
251						port_waitq,
252						IPC_MQUEUE_RECEIVE,
253						THREAD_AWAKENED,
254						FALSE);
255			/* waitq/mqueue still locked, thread locked */
256
257			if (th == THREAD_NULL)
258				goto leave;
259
260			/*
261			 * If the receiver waited with a facility not directly
262			 * related to Mach messaging, then it isn't prepared to get
263			 * handed the message directly.  Just set it running, and
264			 * go look for another thread that can.
265			 */
266			if (th->ith_state != MACH_RCV_IN_PROGRESS) {
267				  thread_unlock(th);
268				  continue;
269			}
270
271			/*
272			 * Found a receiver. see if they can handle the message
273			 * correctly (the message is not too large for them, or
274			 * they didn't care to be informed that the message was
275			 * too large).  If they can't handle it, take them off
276			 * the list and let them go back and figure it out and
277			 * just move onto the next.
278			 */
279			msize = ipc_kmsg_copyout_size(kmsg, th->map);
280			if (th->ith_msize <
281					(msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(th), th->ith_option))) {
282				th->ith_state = MACH_RCV_TOO_LARGE;
283				th->ith_msize = msize;
284				if (th->ith_option & MACH_RCV_LARGE) {
285					/*
286					 * let him go without message
287					 */
288					th->ith_receiver_name = port_mqueue->imq_receiver_name;
289					th->ith_kmsg = IKM_NULL;
290					th->ith_seqno = 0;
291					thread_unlock(th);
292					continue; /* find another thread */
293				}
294			} else {
295				th->ith_state = MACH_MSG_SUCCESS;
296			}
297
298			/*
299			 * This thread is going to take this message,
300			 * so give it to him.
301			 */
302			ipc_kmsg_rmqueue(kmsgq, kmsg);
303			ipc_mqueue_release_msgcount(port_mqueue);
304
305			th->ith_kmsg = kmsg;
306			th->ith_seqno = port_mqueue->imq_seqno++;
307			thread_unlock(th);
308			break;  /* go to next message */
309		}
310
311	}
312 leave:
313	imq_unlock(port_mqueue);
314	splx(s);
315	return KERN_SUCCESS;
316}
317
318/*
319 *	Routine:	ipc_mqueue_changed
320 *	Purpose:
321 *		Wake up receivers waiting in a message queue.
322 *	Conditions:
323 *		The message queue is locked.
324 */
325
326void
327ipc_mqueue_changed(
328	ipc_mqueue_t		mqueue)
329{
330	wait_queue_wakeup64_all_locked(
331				&mqueue->imq_wait_queue,
332				IPC_MQUEUE_RECEIVE,
333				THREAD_RESTART,
334				FALSE);		/* unlock waitq? */
335}
336
337
338
339
340/*
341 *	Routine:	ipc_mqueue_send
342 *	Purpose:
343 *		Send a message to a message queue.  The message holds a reference
344 *		for the destination port for this message queue in the
345 *		msgh_remote_port field.
346 *
347 *		If unsuccessful, the caller still has possession of
348 *		the message and must do something with it.  If successful,
349 *		the message is queued, given to a receiver, or destroyed.
350 *	Conditions:
351 *		mqueue is locked.
352 *	Returns:
353 *		MACH_MSG_SUCCESS	The message was accepted.
354 *		MACH_SEND_TIMED_OUT	Caller still has message.
355 *		MACH_SEND_INTERRUPTED	Caller still has message.
356 */
357mach_msg_return_t
358ipc_mqueue_send(
359	ipc_mqueue_t		mqueue,
360	ipc_kmsg_t		kmsg,
361	mach_msg_option_t	option,
362	mach_msg_timeout_t	send_timeout,
363	spl_t			s)
364{
365	int wresult;
366
367	/*
368	 *  Don't block if:
369	 *	1) We're under the queue limit.
370	 *	2) Caller used the MACH_SEND_ALWAYS internal option.
371	 *	3) Message is sent to a send-once right.
372	 */
373	if (!imq_full(mqueue) ||
374	    (!imq_full_kernel(mqueue) &&
375	     ((option & MACH_SEND_ALWAYS) ||
376	      (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
377	       MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
378		mqueue->imq_msgcount++;
379		assert(mqueue->imq_msgcount > 0);
380		imq_unlock(mqueue);
381		splx(s);
382	} else {
383		thread_t cur_thread = current_thread();
384		uint64_t deadline;
385
386		/*
387		 * We have to wait for space to be granted to us.
388		 */
389		if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
390			imq_unlock(mqueue);
391			splx(s);
392			return MACH_SEND_TIMED_OUT;
393		}
394		if (imq_full_kernel(mqueue)) {
395			imq_unlock(mqueue);
396			splx(s);
397			return MACH_SEND_NO_BUFFER;
398		}
399		mqueue->imq_fullwaiters = TRUE;
400		thread_lock(cur_thread);
401		if (option & MACH_SEND_TIMEOUT)
402			clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline);
403		else
404			deadline = 0;
405		wresult = wait_queue_assert_wait64_locked(
406						&mqueue->imq_wait_queue,
407						IPC_MQUEUE_FULL,
408						THREAD_ABORTSAFE,
409						TIMEOUT_URGENCY_USER_NORMAL,
410						deadline, 0,
411						cur_thread);
412		thread_unlock(cur_thread);
413		imq_unlock(mqueue);
414		splx(s);
415
416		if (wresult == THREAD_WAITING) {
417			wresult = thread_block(THREAD_CONTINUE_NULL);
418			counter(c_ipc_mqueue_send_block++);
419		}
420
421		switch (wresult) {
422		case THREAD_TIMED_OUT:
423			assert(option & MACH_SEND_TIMEOUT);
424			return MACH_SEND_TIMED_OUT;
425
426		case THREAD_AWAKENED:
427			/* we can proceed - inherited msgcount from waker */
428			assert(mqueue->imq_msgcount > 0);
429			break;
430
431		case THREAD_INTERRUPTED:
432			return MACH_SEND_INTERRUPTED;
433
434		case THREAD_RESTART:
435			/* mqueue is being destroyed */
436			return MACH_SEND_INVALID_DEST;
437		default:
438			panic("ipc_mqueue_send");
439		}
440	}
441
442	ipc_mqueue_post(mqueue, kmsg);
443	return MACH_MSG_SUCCESS;
444}
445
446
447/*
448 *	Routine:	ipc_mqueue_release_msgcount
449 *	Purpose:
450 *		Release a message queue reference in the case where we
451 *		found a waiter.
452 *
453 *	Conditions:
454 *		The message queue is locked.
455 *		The message corresponding to this reference is off the queue.
456 */
457void
458ipc_mqueue_release_msgcount(
459	ipc_mqueue_t mqueue)
460{
461	assert(imq_held(mqueue));
462	assert(mqueue->imq_msgcount > 1 || ipc_kmsg_queue_empty(&mqueue->imq_messages));
463
464	mqueue->imq_msgcount--;
465
466	if (!imq_full(mqueue) && mqueue->imq_fullwaiters) {
467		if (wait_queue_wakeup64_one_locked(
468						&mqueue->imq_wait_queue,
469						IPC_MQUEUE_FULL,
470						THREAD_AWAKENED,
471						FALSE) != KERN_SUCCESS) {
472			mqueue->imq_fullwaiters = FALSE;
473		} else {
474			/* gave away our slot - add reference back */
475			mqueue->imq_msgcount++;
476		}
477	}
478}
479
480/*
481 *	Routine:	ipc_mqueue_post
482 *	Purpose:
483 *		Post a message to a waiting receiver or enqueue it.  If a
484 *		receiver is waiting, we can release our reserved space in
485 *		the message queue.
486 *
487 *	Conditions:
488 *		If we need to queue, our space in the message queue is reserved.
489 */
490void
491ipc_mqueue_post(
492	register ipc_mqueue_t 	mqueue,
493	register ipc_kmsg_t		kmsg)
494{
495	spl_t s;
496
497	/*
498	 *	While the msg queue	is locked, we have control of the
499	 *  kmsg, so the ref in	it for the port is still good.
500	 *
501	 *	Check for a receiver for the message.
502	 */
503	s = splsched();
504	imq_lock(mqueue);
505	for (;;) {
506		wait_queue_t waitq = &mqueue->imq_wait_queue;
507		thread_t receiver;
508		mach_msg_size_t msize;
509
510		receiver = wait_queue_wakeup64_identity_locked(
511							waitq,
512							IPC_MQUEUE_RECEIVE,
513							THREAD_AWAKENED,
514							FALSE);
515		/* waitq still locked, thread locked */
516
517		if (receiver == THREAD_NULL) {
518			/*
519			 * no receivers; queue kmsg
520			 */
521			assert(mqueue->imq_msgcount > 0);
522			ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
523			break;
524		}
525
526		/*
527		 * If the receiver waited with a facility not directly
528		 * related to Mach messaging, then it isn't prepared to get
529		 * handed the message directly.  Just set it running, and
530		 * go look for another thread that can.
531		 */
532		if (receiver->ith_state != MACH_RCV_IN_PROGRESS) {
533				  thread_unlock(receiver);
534				  continue;
535		}
536
537
538		/*
539		 * We found a waiting thread.
540		 * If the message is too large or the scatter list is too small
541		 * the thread we wake up will get that as its status.
542		 */
543		msize =	ipc_kmsg_copyout_size(kmsg, receiver->map);
544		if (receiver->ith_msize <
545				(msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(receiver), receiver->ith_option))) {
546			receiver->ith_msize = msize;
547			receiver->ith_state = MACH_RCV_TOO_LARGE;
548		} else {
549			receiver->ith_state = MACH_MSG_SUCCESS;
550		}
551
552		/*
553		 * If there is no problem with the upcoming receive, or the
554		 * receiver thread didn't specifically ask for special too
555		 * large error condition, go ahead and select it anyway.
556		 */
557		if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
558		    !(receiver->ith_option & MACH_RCV_LARGE)) {
559
560			receiver->ith_kmsg = kmsg;
561			receiver->ith_seqno = mqueue->imq_seqno++;
562			thread_unlock(receiver);
563
564			/* we didn't need our reserved spot in the queue */
565			ipc_mqueue_release_msgcount(mqueue);
566			break;
567		}
568
569		/*
570		 * Otherwise, this thread needs to be released to run
571		 * and handle its error without getting the message.  We
572		 * need to go back and pick another one.
573		 */
574		receiver->ith_receiver_name = mqueue->imq_receiver_name;
575		receiver->ith_kmsg = IKM_NULL;
576		receiver->ith_seqno = 0;
577		thread_unlock(receiver);
578	}
579
580	imq_unlock(mqueue);
581	splx(s);
582
583	current_task()->messages_sent++;
584	return;
585}
586
587
/*
 *	Routine:	ipc_mqueue_receive_results
 *	Purpose:
 *		Translate the wait result from a (possibly aborted) blocked
 *		receive into the current thread's ith_state receive status.
 *	Conditions:
 *		Runs on the current thread; touches only its ith_* fields.
 */
/* static */ void
ipc_mqueue_receive_results(wait_result_t saved_wait_result)
{
	thread_t     		self = current_thread();
	mach_msg_option_t	option = self->ith_option;

	/*
	 * why did we wake up?
	 */
	switch (saved_wait_result) {
	case THREAD_TIMED_OUT:
		self->ith_state = MACH_RCV_TIMED_OUT;
		return;

	case THREAD_INTERRUPTED:
		self->ith_state = MACH_RCV_INTERRUPTED;
		return;

	case THREAD_RESTART:
		/* something bad happened to the port/set */
		self->ith_state = MACH_RCV_PORT_CHANGED;
		return;

	case THREAD_AWAKENED:
		/*
		 * We do not need to go select a message, somebody
		 * handed us one (or a too-large indication).
		 */
		switch (self->ith_state) {
		case MACH_RCV_SCATTER_SMALL:
		case MACH_RCV_TOO_LARGE:
			/*
			 * Somebody tried to give us a too large
			 * message. If we indicated that we cared,
			 * then they only gave us the indication,
			 * otherwise they gave us the indication
			 * AND the message anyway.
			 */
			if (option & MACH_RCV_LARGE) {
				return;
			}
			/* fall through: no MACH_RCV_LARGE, so the waker
			 * handed us the message despite the size error */

		case MACH_MSG_SUCCESS:
			return;

		default:
			panic("ipc_mqueue_receive_results: strange ith_state");
		}

	default:
		panic("ipc_mqueue_receive_results: strange wait_result");
	}
}
641
642void
643ipc_mqueue_receive_continue(
644	__unused void *param,
645	wait_result_t wresult)
646{
647	ipc_mqueue_receive_results(wresult);
648	mach_msg_receive_continue();  /* hard-coded for now */
649}
650
651/*
652 *	Routine:	ipc_mqueue_receive
653 *	Purpose:
654 *		Receive a message from a message queue.
655 *
656 *		If continuation is non-zero, then we might discard
657 *		our kernel stack when we block.  We will continue
658 *		after unblocking by executing continuation.
659 *
660 *		If resume is true, then we are resuming a receive
661 *		operation after a blocked receive discarded our stack.
662 *	Conditions:
663 *		Our caller must hold a reference for the port or port set
664 *		to which this queue belongs, to keep the queue
665 *		from being deallocated.
666 *
667 *		The kmsg is returned with clean header fields
668 *		and with the circular bit turned off.
669 *	Returns:
670 *		MACH_MSG_SUCCESS	Message returned in kmsgp.
671 *		MACH_RCV_TOO_LARGE	Message size returned in kmsgp.
672 *		MACH_RCV_TIMED_OUT	No message obtained.
673 *		MACH_RCV_INTERRUPTED	No message obtained.
674 *		MACH_RCV_PORT_DIED	Port/set died; no message.
675 *		MACH_RCV_PORT_CHANGED	Port moved into set; no msg.
676 *
677 */
678
679void
680ipc_mqueue_receive(
681	ipc_mqueue_t            mqueue,
682	mach_msg_option_t       option,
683	mach_msg_size_t         max_size,
684	mach_msg_timeout_t      rcv_timeout,
685	int                     interruptible)
686{
687	wait_result_t           wresult;
688        thread_t                self = current_thread();
689
690        wresult = ipc_mqueue_receive_on_thread(mqueue, option, max_size,
691                                               rcv_timeout, interruptible,
692                                               self);
693        if (wresult == THREAD_NOT_WAITING)
694                return;
695
696	if (wresult == THREAD_WAITING) {
697		counter((interruptible == THREAD_ABORTSAFE) ?
698			c_ipc_mqueue_receive_block_user++ :
699			c_ipc_mqueue_receive_block_kernel++);
700
701		if (self->ith_continuation)
702			thread_block(ipc_mqueue_receive_continue);
703			/* NOTREACHED */
704
705		wresult = thread_block(THREAD_CONTINUE_NULL);
706	}
707	ipc_mqueue_receive_results(wresult);
708}
709
/*
 *	Routine:	ipc_mqueue_receive_on_thread
 *	Purpose:
 *		Try to satisfy a receive on behalf of "thread" without
 *		blocking: pull a message off a single port's queue, or for
 *		a port set, off one of the member ports on the set's
 *		prepost list.  Otherwise set the thread up to wait on the
 *		mqueue.
 *	Returns:
 *		THREAD_NOT_WAITING	a message (or error such as
 *					MACH_RCV_TIMED_OUT) was delivered
 *					into the thread's ith_* fields.
 *		Other wait_result_t	from wait_queue_assert_wait64_locked;
 *					the thread must block.
 */
wait_result_t
ipc_mqueue_receive_on_thread(
        ipc_mqueue_t            mqueue,
	mach_msg_option_t       option,
	mach_msg_size_t         max_size,
	mach_msg_timeout_t      rcv_timeout,
	int                     interruptible,
	thread_t                thread)
{
	ipc_kmsg_queue_t        kmsgs;
	wait_result_t           wresult;
	uint64_t		deadline;
	spl_t                   s;

	s = splsched();
	imq_lock(mqueue);

	if (imq_is_set(mqueue)) {
		queue_t q;

		q = &mqueue->imq_preposts;

		/*
		 * If we are waiting on a portset mqueue, we need to see if
		 * any of the member ports have work for us.  Ports that
		 * have (or recently had) messages will be linked in the
		 * prepost queue for the portset. By holding the portset's
		 * mqueue lock during the search, we tie up any attempts by
		 * mqueue_deliver or portset membership changes that may
		 * cross our path.
		 */
	search_set:
		while(!queue_empty(q)) {
			wait_queue_link_t wql;
			ipc_mqueue_t port_mq;

			queue_remove_first(q, wql, wait_queue_link_t, wql_preposts);
			assert(!wql_is_preposted(wql));

			/*
			 * This is a lock order violation, so we have to do it
			 * "softly," putting the link back on the prepost list
			 * if it fails (at the tail is fine since the order of
			 * handling messages from different sources in a set is
			 * not guaranteed and we'd like to skip to the next source
			 * if one is available).
			 */
			port_mq = (ipc_mqueue_t)wql->wql_queue;
			if (!imq_lock_try(port_mq)) {
				/* couldn't get the member port's lock: back off
				 * completely, pause, and restart the scan */
				queue_enter(q, wql, wait_queue_link_t, wql_preposts);
				imq_unlock(mqueue);
				splx(s);
				mutex_pause(0);
				s = splsched();
				imq_lock(mqueue);
				goto search_set; /* start again at beginning - SMP */
			}

			/*
			 * If there are no messages on this queue, just skip it
			 * (we already removed the link from the set's prepost queue).
			 */
			kmsgs = &port_mq->imq_messages;
			if (ipc_kmsg_queue_first(kmsgs) == IKM_NULL) {
				imq_unlock(port_mq);
				continue;
			}

			/*
			 * There are messages, so reinsert the link back
			 * at the tail of the preposted queue (for fairness)
			 * while we still have the portset mqueue locked.
			 */
			queue_enter(q, wql, wait_queue_link_t, wql_preposts);
			imq_unlock(mqueue);

			/*
			 * Continue on to handling the message with just
			 * the port mqueue locked.
			 */
			ipc_mqueue_select_on_thread(port_mq, option, max_size, thread);
			imq_unlock(port_mq);
			splx(s);
			return THREAD_NOT_WAITING;

		}

	} else {

		/*
		 * Receive on a single port. Just try to get the messages.
		 */
	  	kmsgs = &mqueue->imq_messages;
		if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
			ipc_mqueue_select_on_thread(mqueue, option, max_size, thread);
			imq_unlock(mqueue);
			splx(s);
			return THREAD_NOT_WAITING;
		}
	}

	/*
	 * Looks like we'll have to block.  The mqueue we will
	 * block on (whether the set's or the local port's) is
	 * still locked.
	 */
	if (option & MACH_RCV_TIMEOUT) {
		if (rcv_timeout == 0) {
			/* zero timeout: report timeout without waiting */
			imq_unlock(mqueue);
			splx(s);
			thread->ith_state = MACH_RCV_TIMED_OUT;
			return THREAD_NOT_WAITING;
		}
	}

	thread_lock(thread);
	thread->ith_state = MACH_RCV_IN_PROGRESS;
	thread->ith_option = option;
	thread->ith_msize = max_size;

	if (option & MACH_RCV_TIMEOUT)
		clock_interval_to_deadline(rcv_timeout, 1000*NSEC_PER_USEC, &deadline);
	else
		deadline = 0;	/* 0 deadline == wait forever */

	wresult = wait_queue_assert_wait64_locked(&mqueue->imq_wait_queue,
						  IPC_MQUEUE_RECEIVE,
						  interruptible,
						  TIMEOUT_URGENCY_USER_NORMAL,
						  deadline, 0,
						  thread);
	/* preposts should be detected above, not here */
	if (wresult == THREAD_AWAKENED)
		panic("ipc_mqueue_receive_on_thread: sleep walking");

	thread_unlock(thread);
	imq_unlock(mqueue);
	splx(s);
	return wresult;
}
850
851
852/*
853 *	Routine:	ipc_mqueue_select_on_thread
854 *	Purpose:
855 *		A receiver discovered that there was a message on the queue
856 *		before he had to block.  Pick the message off the queue and
857 *		"post" it to thread.
858 *	Conditions:
859 *		mqueue locked.
860 *              thread not locked.
861 *		There is a message.
862 *	Returns:
863 *		MACH_MSG_SUCCESS	Actually selected a message for ourselves.
864 *		MACH_RCV_TOO_LARGE  May or may not have pull it, but it is large
865 */
866void
867ipc_mqueue_select_on_thread(
868	ipc_mqueue_t		mqueue,
869	mach_msg_option_t	option,
870	mach_msg_size_t		max_size,
871	thread_t                thread)
872{
873	ipc_kmsg_t kmsg;
874	mach_msg_return_t mr = MACH_MSG_SUCCESS;
875	mach_msg_size_t rcv_size;
876
877	/*
878	 * Do some sanity checking of our ability to receive
879	 * before pulling the message off the queue.
880	 */
881	kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages);
882	assert(kmsg != IKM_NULL);
883
884	/*
885	 * If we really can't receive it, but we had the
886	 * MACH_RCV_LARGE option set, then don't take it off
887	 * the queue, instead return the appropriate error
888	 * (and size needed).
889	 */
890	rcv_size = ipc_kmsg_copyout_size(kmsg, thread->map);
891	if (rcv_size + REQUESTED_TRAILER_SIZE(thread_is_64bit(thread), option) > max_size) {
892		mr = MACH_RCV_TOO_LARGE;
893		if (option & MACH_RCV_LARGE) {
894			thread->ith_receiver_name = mqueue->imq_receiver_name;
895			thread->ith_kmsg = IKM_NULL;
896			thread->ith_msize = rcv_size;
897			thread->ith_seqno = 0;
898			thread->ith_state = mr;
899			return;
900		}
901	}
902
903	ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg);
904	ipc_mqueue_release_msgcount(mqueue);
905	thread->ith_seqno = mqueue->imq_seqno++;
906	thread->ith_kmsg = kmsg;
907	thread->ith_state = mr;
908
909	current_task()->messages_received++;
910	return;
911}
912
913/*
914 *	Routine:	ipc_mqueue_peek
915 *	Purpose:
916 *		Peek at a (non-set) message queue to see if it has a message
917 *		matching the sequence number provided (if zero, then the
918 *		first message in the queue) and return vital info about the
919 *		message.
920 *
921 *	Conditions:
922 *		Locks may be held by callers, so this routine cannot block.
923 *		Caller holds reference on the message queue.
924 */
925unsigned
926ipc_mqueue_peek(ipc_mqueue_t 		mq,
927		mach_port_seqno_t	*seqnop,
928		mach_msg_size_t		*msg_sizep,
929		mach_msg_id_t		*msg_idp,
930		mach_msg_max_trailer_t 	*msg_trailerp)
931{
932	ipc_kmsg_queue_t kmsgq;
933	ipc_kmsg_t kmsg;
934	mach_port_seqno_t seqno, msgoff;
935	int res = 0;
936	spl_t s;
937
938	assert(!imq_is_set(mq));
939
940	s = splsched();
941	imq_lock(mq);
942
943	seqno = (seqnop != NULL) ? seqno = *seqnop : 0;
944
945	if (seqno == 0) {
946		seqno = mq->imq_seqno;
947		msgoff = 0;
948	} else if (seqno >= mq->imq_seqno &&
949		   seqno < mq->imq_seqno + mq->imq_msgcount) {
950		msgoff = seqno - mq->imq_seqno;
951	} else
952		goto out;
953
954	/* look for the message that would match that seqno */
955	kmsgq = &mq->imq_messages;
956	kmsg = ipc_kmsg_queue_first(kmsgq);
957	while (msgoff-- && kmsg != IKM_NULL) {
958		kmsg = ipc_kmsg_queue_next(kmsgq, kmsg);
959	}
960	if (kmsg == IKM_NULL)
961		goto out;
962
963	/* found one - return the requested info */
964	if (seqnop != NULL)
965		*seqnop = seqno;
966	if (msg_sizep != NULL)
967		*msg_sizep = kmsg->ikm_header->msgh_size;
968	if (msg_idp != NULL)
969		*msg_idp = kmsg->ikm_header->msgh_id;
970	if (msg_trailerp != NULL)
971		memcpy(msg_trailerp,
972		       (mach_msg_max_trailer_t *)((vm_offset_t)kmsg->ikm_header +
973						  round_msg(kmsg->ikm_header->msgh_size)),
974		       sizeof(mach_msg_max_trailer_t));
975	res = 1;
976
977 out:
978	imq_unlock(mq);
979	splx(s);
980	return res;
981}
982
983/*
984 *	Routine:	ipc_mqueue_set_peek
985 *	Purpose:
986 *		Peek at a message queue set to see if it has any ports
987 *		with messages.
988 *
989 *	Conditions:
990 *		Locks may be held by callers, so this routine cannot block.
991 *		Caller holds reference on the message queue.
992 */
993unsigned
994ipc_mqueue_set_peek(ipc_mqueue_t mq)
995{
996	wait_queue_link_t	wql;
997	queue_t			q;
998	spl_t s;
999	int res;
1000
1001	assert(imq_is_set(mq));
1002
1003	s = splsched();
1004	imq_lock(mq);
1005
1006	/*
1007	 * peek at the contained port message queues, return as soon as
1008	 * we spot a message on one of the message queues linked on the
1009	 * prepost list.  No need to lock each message queue, as only the
1010	 * head of each queue is checked. If a message wasn't there before
1011	 * we entered here, no need to find it (if we do, great).
1012	 */
1013	res = 0;
1014	q = &mq->imq_preposts;
1015	queue_iterate(q, wql, wait_queue_link_t, wql_preposts) {
1016		ipc_mqueue_t port_mq = (ipc_mqueue_t)wql->wql_queue;
1017		ipc_kmsg_queue_t kmsgs = &port_mq->imq_messages;
1018
1019		if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
1020			res = 1;
1021			break;
1022		}
1023	}
1024	imq_unlock(mq);
1025	splx(s);
1026	return res;
1027}
1028
1029/*
1030 *	Routine:	ipc_mqueue_set_gather_member_names
1031 *	Purpose:
1032 *		Iterate a message queue set to identify the member port
1033 *		names. Actual returned names is limited to maxnames entries,
1034 *		but we keep counting the actual number of members to let
1035 *		the caller decide to retry if necessary.
1036 *
1037 *	Conditions:
1038 *		Locks may be held by callers, so this routine cannot block.
1039 *		Caller holds reference on the message queue.
1040 */
1041void
1042ipc_mqueue_set_gather_member_names(
1043	ipc_mqueue_t mq,
1044	ipc_entry_num_t maxnames,
1045	mach_port_name_t *names,
1046	ipc_entry_num_t *actualp)
1047{
1048	wait_queue_link_t	wql;
1049	queue_t			q;
1050	spl_t s;
1051	ipc_entry_num_t actual = 0;
1052
1053	assert(imq_is_set(mq));
1054
1055	s = splsched();
1056	imq_lock(mq);
1057
1058	/*
1059	 * Iterate over the member ports through the mqueue set links
1060	 * capturing as many names as we can.
1061	 */
1062	q = &mq->imq_setlinks;
1063	queue_iterate(q, wql, wait_queue_link_t, wql_setlinks) {
1064		ipc_mqueue_t port_mq = (ipc_mqueue_t)wql->wql_queue;
1065
1066		if (actual < maxnames)
1067			names[actual] = port_mq->imq_receiver_name;
1068		actual++;
1069	}
1070	imq_unlock(mq);
1071	splx(s);
1072
1073	*actualp = actual;
1074}
1075
1076
1077/*
1078 *	Routine:	ipc_mqueue_destroy
1079 *	Purpose:
1080 *		Destroy a (non-set) message queue.
1081 *		Set any blocked senders running.
1082 *	   	Destroy the kmsgs in the queue.
1083 *	Conditions:
1084 *		Nothing locked.
1085 *		Receivers were removed when the receive right was "changed"
1086 */
1087void
1088ipc_mqueue_destroy(
1089	ipc_mqueue_t	mqueue)
1090{
1091	ipc_kmsg_queue_t kmqueue;
1092	ipc_kmsg_t kmsg;
1093	boolean_t reap = FALSE;
1094	spl_t s;
1095
1096	s = splsched();
1097	imq_lock(mqueue);
1098	/*
1099	 *	rouse all blocked senders
1100	 */
1101	mqueue->imq_fullwaiters = FALSE;
1102	wait_queue_wakeup64_all_locked(
1103				&mqueue->imq_wait_queue,
1104				IPC_MQUEUE_FULL,
1105				THREAD_RESTART,
1106				FALSE);
1107
1108	/*
1109	 * Move messages from the specified queue to the per-thread
1110	 * clean/drain queue while we have the mqueue lock.
1111	 */
1112	kmqueue = &mqueue->imq_messages;
1113	while ((kmsg = ipc_kmsg_dequeue(kmqueue)) != IKM_NULL) {
1114		boolean_t first;
1115		first = ipc_kmsg_delayed_destroy(kmsg);
1116		if (first)
1117			reap = first;
1118	}
1119
1120	imq_unlock(mqueue);
1121	splx(s);
1122
1123	/*
1124	 * Destroy the messages we enqueued if we aren't nested
1125	 * inside some other attempt to drain the same queue.
1126	 */
1127	if (reap)
1128		ipc_kmsg_reap_delayed();
1129}
1130
1131/*
1132 *	Routine:	ipc_mqueue_set_qlimit
1133 *	Purpose:
1134 *		Changes a message queue limit; the maximum number
1135 *		of messages which may be queued.
1136 *	Conditions:
1137 *		Nothing locked.
1138 */
1139
1140void
1141ipc_mqueue_set_qlimit(
1142	 ipc_mqueue_t			mqueue,
1143	 mach_port_msgcount_t	qlimit)
1144{
1145	 spl_t s;
1146
1147	 assert(qlimit <= MACH_PORT_QLIMIT_MAX);
1148
1149	 /* wake up senders allowed by the new qlimit */
1150	 s = splsched();
1151	 imq_lock(mqueue);
1152	 if (qlimit > mqueue->imq_qlimit) {
1153		 mach_port_msgcount_t i, wakeup;
1154
1155		 /* caution: wakeup, qlimit are unsigned */
1156		 wakeup = qlimit - mqueue->imq_qlimit;
1157
1158		 for (i = 0; i < wakeup; i++) {
1159			 if (wait_queue_wakeup64_one_locked(
1160							&mqueue->imq_wait_queue,
1161							IPC_MQUEUE_FULL,
1162							THREAD_AWAKENED,
1163							FALSE) == KERN_NOT_WAITING) {
1164					 mqueue->imq_fullwaiters = FALSE;
1165					 break;
1166			 }
1167			 mqueue->imq_msgcount++;  /* give it to the awakened thread */
1168		 }
1169	 }
1170	mqueue->imq_qlimit = qlimit;
1171	imq_unlock(mqueue);
1172	splx(s);
1173}
1174
1175/*
1176 *	Routine:	ipc_mqueue_set_seqno
1177 *	Purpose:
1178 *		Changes an mqueue's sequence number.
1179 *	Conditions:
1180 *		Caller holds a reference to the queue's containing object.
1181 */
1182void
1183ipc_mqueue_set_seqno(
1184	ipc_mqueue_t		mqueue,
1185	mach_port_seqno_t	seqno)
1186{
1187	spl_t s;
1188
1189	s = splsched();
1190	imq_lock(mqueue);
1191	mqueue->imq_seqno = seqno;
1192	imq_unlock(mqueue);
1193	splx(s);
1194}
1195
1196
1197/*
1198 *	Routine:	ipc_mqueue_copyin
1199 *	Purpose:
1200 *		Convert a name in a space to a message queue.
1201 *	Conditions:
1202 *		Nothing locked.  If successful, the caller gets a ref for
1203 *		for the object.	This ref ensures the continued existence of
1204 *		the queue.
1205 *	Returns:
1206 *		MACH_MSG_SUCCESS	Found a message queue.
1207 *		MACH_RCV_INVALID_NAME	The space is dead.
1208 *		MACH_RCV_INVALID_NAME	The name doesn't denote a right.
1209 *		MACH_RCV_INVALID_NAME
1210 *			The denoted right is not receive or port set.
1211 *		MACH_RCV_IN_SET		Receive right is a member of a set.
1212 */
1213
1214mach_msg_return_t
1215ipc_mqueue_copyin(
1216	ipc_space_t		space,
1217	mach_port_name_t	name,
1218	ipc_mqueue_t		*mqueuep,
1219	ipc_object_t		*objectp)
1220{
1221	ipc_entry_t entry;
1222	ipc_object_t object;
1223	ipc_mqueue_t mqueue;
1224
1225	is_read_lock(space);
1226	if (!is_active(space)) {
1227		is_read_unlock(space);
1228		return MACH_RCV_INVALID_NAME;
1229	}
1230
1231	entry = ipc_entry_lookup(space, name);
1232	if (entry == IE_NULL) {
1233		is_read_unlock(space);
1234		return MACH_RCV_INVALID_NAME;
1235	}
1236
1237	object = entry->ie_object;
1238
1239	if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
1240		ipc_port_t port;
1241
1242		port = (ipc_port_t) object;
1243		assert(port != IP_NULL);
1244
1245		ip_lock(port);
1246		assert(ip_active(port));
1247		assert(port->ip_receiver_name == name);
1248		assert(port->ip_receiver == space);
1249		is_read_unlock(space);
1250		mqueue = &port->ip_messages;
1251
1252	} else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) {
1253		ipc_pset_t pset;
1254
1255		pset = (ipc_pset_t) object;
1256		assert(pset != IPS_NULL);
1257
1258		ips_lock(pset);
1259		assert(ips_active(pset));
1260		assert(pset->ips_local_name == name);
1261		is_read_unlock(space);
1262
1263		mqueue = &pset->ips_messages;
1264	} else {
1265		is_read_unlock(space);
1266		return MACH_RCV_INVALID_NAME;
1267	}
1268
1269	/*
1270	 *	At this point, the object is locked and active,
1271	 *	the space is unlocked, and mqueue is initialized.
1272	 */
1273
1274	io_reference(object);
1275	io_unlock(object);
1276
1277	*objectp = object;
1278	*mqueuep = mqueue;
1279	return MACH_MSG_SUCCESS;
1280}
1281
1282