1/* 2 * Copyright (c) 2000-2007 Apple Inc. All rights reserved. 3 * 4 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@ 5 * 6 * This file contains Original Code and/or Modifications of Original Code 7 * as defined in and that are subject to the Apple Public Source License 8 * Version 2.0 (the 'License'). You may not use this file except in 9 * compliance with the License. The rights granted to you under the License 10 * may not be used to create, or enable the creation or redistribution of, 11 * unlawful or unlicensed copies of an Apple operating system, or to 12 * circumvent, violate, or enable the circumvention or violation of, any 13 * terms of an Apple operating system software license agreement. 14 * 15 * Please obtain a copy of the License at 16 * http://www.opensource.apple.com/apsl/ and read it before using this file. 17 * 18 * The Original Code and all software distributed under the License are 19 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER 20 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES, 21 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY, 22 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT. 23 * Please see the License for the specific language governing rights and 24 * limitations under the License. 25 * 26 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@ 27 */ 28/* 29 * @OSF_FREE_COPYRIGHT@ 30 */ 31/* 32 * Mach Operating System 33 * Copyright (c) 1991,1990,1989 Carnegie Mellon University 34 * All Rights Reserved. 35 * 36 * Permission to use, copy, modify and distribute this software and its 37 * documentation is hereby granted, provided that both the copyright 38 * notice and this permission notice appear in all copies of the 39 * software, derivative works or modified versions, and any portions 40 * thereof, and that both notices appear in supporting documentation. 41 * 42 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS" 43 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR 44 * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE. 45 * 46 * Carnegie Mellon requests users of this software to return to 47 * 48 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU 49 * School of Computer Science 50 * Carnegie Mellon University 51 * Pittsburgh PA 15213-3890 52 * 53 * any improvements or extensions that they make and grant Carnegie Mellon 54 * the rights to redistribute these changes. 55 */ 56/* 57 */ 58/* 59 * File: ipc/ipc_mqueue.c 60 * Author: Rich Draves 61 * Date: 1989 62 * 63 * Functions to manipulate IPC message queues. 64 */ 65/* 66 * NOTICE: This file was modified by SPARTA, Inc. in 2006 to introduce 67 * support for mandatory and extensible security protections. This notice 68 * is included in support of clause 2.2 (b) of the Apple Public License, 69 * Version 2.0. 70 */ 71 72 73#include <mach/port.h> 74#include <mach/message.h> 75#include <mach/sync_policy.h> 76 77#include <kern/assert.h> 78#include <kern/counters.h> 79#include <kern/sched_prim.h> 80#include <kern/ipc_kobject.h> 81#include <kern/ipc_mig.h> /* XXX - for mach_msg_receive_continue */ 82#include <kern/misc_protos.h> 83#include <kern/task.h> 84#include <kern/thread.h> 85#include <kern/wait_queue.h> 86 87#include <ipc/ipc_mqueue.h> 88#include <ipc/ipc_kmsg.h> 89#include <ipc/ipc_port.h> 90#include <ipc/ipc_pset.h> 91#include <ipc/ipc_space.h> 92 93#ifdef __LP64__ 94#include <vm/vm_map.h> 95#endif 96 97#if CONFIG_MACF_MACH 98#include <security/mac_mach_internal.h> 99#endif 100 101int ipc_mqueue_full; /* address is event for queue space */ 102int ipc_mqueue_rcv; /* address is event for message arrival */ 103 104/* forward declarations */ 105void ipc_mqueue_receive_results(wait_result_t result); 106 107/* 108 * Routine: ipc_mqueue_init 109 * Purpose: 110 * Initialize a newly-allocated message queue. 111 */ 112void 113ipc_mqueue_init( 114 ipc_mqueue_t mqueue, 115 boolean_t is_set) 116{ 117 if (is_set) { 118 wait_queue_set_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO|SYNC_POLICY_PREPOST); 119 } else { 120 wait_queue_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO); 121 ipc_kmsg_queue_init(&mqueue->imq_messages); 122 mqueue->imq_seqno = 0; 123 mqueue->imq_msgcount = 0; 124 mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT; 125 mqueue->imq_fullwaiters = FALSE; 126 } 127} 128 129/* 130 * Routine: ipc_mqueue_member 131 * Purpose: 132 * Indicate whether the (port) mqueue is a member of 133 * this portset's mqueue. We do this by checking 134 * whether the portset mqueue's waitq is an member of 135 * the port's mqueue waitq. 136 * Conditions: 137 * the portset's mqueue is not already a member 138 * this may block while allocating linkage structures. 139 */ 140 141boolean_t 142ipc_mqueue_member( 143 ipc_mqueue_t port_mqueue, 144 ipc_mqueue_t set_mqueue) 145{ 146 wait_queue_t port_waitq = &port_mqueue->imq_wait_queue; 147 wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue; 148 149 return (wait_queue_member(port_waitq, set_waitq)); 150 151} 152 153/* 154 * Routine: ipc_mqueue_remove 155 * Purpose: 156 * Remove the association between the queue and the specified 157 * set message queue. 158 */ 159 160kern_return_t 161ipc_mqueue_remove( 162 ipc_mqueue_t mqueue, 163 ipc_mqueue_t set_mqueue, 164 wait_queue_link_t *wqlp) 165{ 166 wait_queue_t mq_waitq = &mqueue->imq_wait_queue; 167 wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue; 168 169 return wait_queue_unlink_nofree(mq_waitq, set_waitq, wqlp); 170} 171 172/* 173 * Routine: ipc_mqueue_remove_from_all 174 * Purpose: 175 * Remove the mqueue from all the sets it is a member of 176 * Conditions: 177 * Nothing locked. 178 */ 179void 180ipc_mqueue_remove_from_all( 181 ipc_mqueue_t mqueue, 182 queue_t links) 183{ 184 wait_queue_t mq_waitq = &mqueue->imq_wait_queue; 185 186 wait_queue_unlink_all_nofree(mq_waitq, links); 187 return; 188} 189 190/* 191 * Routine: ipc_mqueue_remove_all 192 * Purpose: 193 * Remove all the member queues from the specified set. 194 * Conditions: 195 * Nothing locked. 196 */ 197void 198ipc_mqueue_remove_all( 199 ipc_mqueue_t mqueue, 200 queue_t links) 201{ 202 wait_queue_set_t mq_setq = &mqueue->imq_set_queue; 203 204 wait_queue_set_unlink_all_nofree(mq_setq, links); 205 return; 206} 207 208 209/* 210 * Routine: ipc_mqueue_add 211 * Purpose: 212 * Associate the portset's mqueue with the port's mqueue. 213 * This has to be done so that posting the port will wakeup 214 * a portset waiter. If there are waiters on the portset 215 * mqueue and messages on the port mqueue, try to match them 216 * up now. 217 * Conditions: 218 * May block. 219 */ 220kern_return_t 221ipc_mqueue_add( 222 ipc_mqueue_t port_mqueue, 223 ipc_mqueue_t set_mqueue, 224 wait_queue_link_t wql) 225{ 226 wait_queue_t port_waitq = &port_mqueue->imq_wait_queue; 227 wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue; 228 ipc_kmsg_queue_t kmsgq; 229 ipc_kmsg_t kmsg, next; 230 kern_return_t kr; 231 spl_t s; 232 233 kr = wait_queue_link_noalloc(port_waitq, set_waitq, wql); 234 if (kr != KERN_SUCCESS) 235 return kr; 236 237 /* 238 * Now that the set has been added to the port, there may be 239 * messages queued on the port and threads waiting on the set 240 * waitq. Lets get them together. 241 */ 242 s = splsched(); 243 imq_lock(port_mqueue); 244 kmsgq = &port_mqueue->imq_messages; 245 for (kmsg = ipc_kmsg_queue_first(kmsgq); 246 kmsg != IKM_NULL; 247 kmsg = next) { 248 next = ipc_kmsg_queue_next(kmsgq, kmsg); 249 250 for (;;) { 251 thread_t th; 252 mach_msg_size_t msize; 253 254 th = wait_queue_wakeup64_identity_locked( 255 port_waitq, 256 IPC_MQUEUE_RECEIVE, 257 THREAD_AWAKENED, 258 FALSE); 259 /* waitq/mqueue still locked, thread locked */ 260 261 if (th == THREAD_NULL) 262 goto leave; 263 264 /* 265 * If the receiver waited with a facility not directly 266 * related to Mach messaging, then it isn't prepared to get 267 * handed the message directly. Just set it running, and 268 * go look for another thread that can. 269 */ 270 if (th->ith_state != MACH_RCV_IN_PROGRESS) { 271 thread_unlock(th); 272 continue; 273 } 274 275 /* 276 * Found a receiver. see if they can handle the message 277 * correctly (the message is not too large for them, or 278 * they didn't care to be informed that the message was 279 * too large). If they can't handle it, take them off 280 * the list and let them go back and figure it out and 281 * just move onto the next. 282 */ 283 msize = ipc_kmsg_copyout_size(kmsg, th->map); 284 if (th->ith_msize < 285 (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(th), th->ith_option))) { 286 th->ith_state = MACH_RCV_TOO_LARGE; 287 th->ith_msize = msize; 288 if (th->ith_option & MACH_RCV_LARGE) { 289 /* 290 * let him go without message 291 */ 292 th->ith_receiver_name = port_mqueue->imq_receiver_name; 293 th->ith_kmsg = IKM_NULL; 294 th->ith_seqno = 0; 295 thread_unlock(th); 296 continue; /* find another thread */ 297 } 298 } else { 299 th->ith_state = MACH_MSG_SUCCESS; 300 } 301 302 /* 303 * This thread is going to take this message, 304 * so give it to him. 305 */ 306 ipc_kmsg_rmqueue(kmsgq, kmsg); 307 ipc_mqueue_release_msgcount(port_mqueue); 308 309 th->ith_kmsg = kmsg; 310 th->ith_seqno = port_mqueue->imq_seqno++; 311 thread_unlock(th); 312 break; /* go to next message */ 313 } 314 315 } 316 leave: 317 imq_unlock(port_mqueue); 318 splx(s); 319 return KERN_SUCCESS; 320} 321 322/* 323 * Routine: ipc_mqueue_changed 324 * Purpose: 325 * Wake up receivers waiting in a message queue. 326 * Conditions: 327 * The message queue is locked. 328 */ 329 330void 331ipc_mqueue_changed( 332 ipc_mqueue_t mqueue) 333{ 334 wait_queue_wakeup64_all_locked( 335 &mqueue->imq_wait_queue, 336 IPC_MQUEUE_RECEIVE, 337 THREAD_RESTART, 338 FALSE); /* unlock waitq? */ 339} 340 341 342 343 344/* 345 * Routine: ipc_mqueue_send 346 * Purpose: 347 * Send a message to a message queue. The message holds a reference 348 * for the destination port for this message queue in the 349 * msgh_remote_port field. 350 * 351 * If unsuccessful, the caller still has possession of 352 * the message and must do something with it. If successful, 353 * the message is queued, given to a receiver, or destroyed. 354 * Conditions: 355 * Nothing locked. 356 * Returns: 357 * MACH_MSG_SUCCESS The message was accepted. 358 * MACH_SEND_TIMED_OUT Caller still has message. 359 * MACH_SEND_INTERRUPTED Caller still has message. 360 */ 361mach_msg_return_t 362ipc_mqueue_send( 363 ipc_mqueue_t mqueue, 364 ipc_kmsg_t kmsg, 365 mach_msg_option_t option, 366 mach_msg_timeout_t send_timeout, 367 spl_t s) 368{ 369 int wresult; 370 371 /* 372 * Don't block if: 373 * 1) We're under the queue limit. 374 * 2) Caller used the MACH_SEND_ALWAYS internal option. 375 * 3) Message is sent to a send-once right. 376 */ 377 if (!imq_full(mqueue) || 378 (!imq_full_kernel(mqueue) && 379 ((option & MACH_SEND_ALWAYS) || 380 (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) == 381 MACH_MSG_TYPE_PORT_SEND_ONCE)))) { 382 mqueue->imq_msgcount++; 383 assert(mqueue->imq_msgcount > 0); 384 imq_unlock(mqueue); 385 splx(s); 386 } else { 387 thread_t cur_thread = current_thread(); 388 uint64_t deadline; 389 390 /* 391 * We have to wait for space to be granted to us. 392 */ 393 if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) { 394 imq_unlock(mqueue); 395 splx(s); 396 return MACH_SEND_TIMED_OUT; 397 } 398 if (imq_full_kernel(mqueue)) { 399 imq_unlock(mqueue); 400 splx(s); 401 return MACH_SEND_NO_BUFFER; 402 } 403 mqueue->imq_fullwaiters = TRUE; 404 thread_lock(cur_thread); 405 if (option & MACH_SEND_TIMEOUT) 406 clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline); 407 else 408 deadline = 0; 409 wresult = wait_queue_assert_wait64_locked( 410 &mqueue->imq_wait_queue, 411 IPC_MQUEUE_FULL, 412 THREAD_ABORTSAFE, deadline, 413 cur_thread); 414 thread_unlock(cur_thread); 415 imq_unlock(mqueue); 416 splx(s); 417 418 if (wresult == THREAD_WAITING) { 419 wresult = thread_block(THREAD_CONTINUE_NULL); 420 counter(c_ipc_mqueue_send_block++); 421 } 422 423 switch (wresult) { 424 case THREAD_TIMED_OUT: 425 assert(option & MACH_SEND_TIMEOUT); 426 return MACH_SEND_TIMED_OUT; 427 428 case THREAD_AWAKENED: 429 /* we can proceed - inherited msgcount from waker */ 430 assert(mqueue->imq_msgcount > 0); 431 break; 432 433 case THREAD_INTERRUPTED: 434 return MACH_SEND_INTERRUPTED; 435 436 case THREAD_RESTART: 437 /* mqueue is being destroyed */ 438 return MACH_SEND_INVALID_DEST; 439 default: 440 panic("ipc_mqueue_send"); 441 } 442 } 443 444 ipc_mqueue_post(mqueue, kmsg); 445 return MACH_MSG_SUCCESS; 446} 447 448/* 449 * Routine: ipc_mqueue_release_msgcount 450 * Purpose: 451 * Release a message queue reference in the case where we 452 * found a waiter. 453 * 454 * Conditions: 455 * The message queue is locked. 456 * The message corresponding to this reference is off the queue. 457 */ 458void 459ipc_mqueue_release_msgcount( 460 ipc_mqueue_t mqueue) 461{ 462 assert(imq_held(mqueue)); 463 assert(mqueue->imq_msgcount > 1 || ipc_kmsg_queue_empty(&mqueue->imq_messages)); 464 465 mqueue->imq_msgcount--; 466 467 if (!imq_full(mqueue) && mqueue->imq_fullwaiters) { 468 if (wait_queue_wakeup64_one_locked( 469 &mqueue->imq_wait_queue, 470 IPC_MQUEUE_FULL, 471 THREAD_AWAKENED, 472 FALSE) != KERN_SUCCESS) { 473 mqueue->imq_fullwaiters = FALSE; 474 } else { 475 /* gave away our slot - add reference back */ 476 mqueue->imq_msgcount++; 477 } 478 } 479} 480 481/* 482 * Routine: ipc_mqueue_post 483 * Purpose: 484 * Post a message to a waiting receiver or enqueue it. If a 485 * receiver is waiting, we can release our reserved space in 486 * the message queue. 487 * 488 * Conditions: 489 * If we need to queue, our space in the message queue is reserved. 490 */ 491void 492ipc_mqueue_post( 493 register ipc_mqueue_t mqueue, 494 register ipc_kmsg_t kmsg) 495{ 496 spl_t s; 497 498 /* 499 * While the msg queue is locked, we have control of the 500 * kmsg, so the ref in it for the port is still good. 501 * 502 * Check for a receiver for the message. 503 */ 504 s = splsched(); 505 imq_lock(mqueue); 506 for (;;) { 507 wait_queue_t waitq = &mqueue->imq_wait_queue; 508 thread_t receiver; 509 mach_msg_size_t msize; 510 511 receiver = wait_queue_wakeup64_identity_locked( 512 waitq, 513 IPC_MQUEUE_RECEIVE, 514 THREAD_AWAKENED, 515 FALSE); 516 /* waitq still locked, thread locked */ 517 518 if (receiver == THREAD_NULL) { 519 /* 520 * no receivers; queue kmsg 521 */ 522 assert(mqueue->imq_msgcount > 0); 523 ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg); 524 break; 525 } 526 527 /* 528 * If the receiver waited with a facility not directly 529 * related to Mach messaging, then it isn't prepared to get 530 * handed the message directly. Just set it running, and 531 * go look for another thread that can. 532 */ 533 if (receiver->ith_state != MACH_RCV_IN_PROGRESS) { 534 thread_unlock(receiver); 535 continue; 536 } 537 538 539 /* 540 * We found a waiting thread. 541 * If the message is too large or the scatter list is too small 542 * the thread we wake up will get that as its status. 543 */ 544 msize = ipc_kmsg_copyout_size(kmsg, receiver->map); 545 if (receiver->ith_msize < 546 (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(receiver), receiver->ith_option))) { 547 receiver->ith_msize = msize; 548 receiver->ith_state = MACH_RCV_TOO_LARGE; 549 } else { 550 receiver->ith_state = MACH_MSG_SUCCESS; 551 } 552 553 /* 554 * If there is no problem with the upcoming receive, or the 555 * receiver thread didn't specifically ask for special too 556 * large error condition, go ahead and select it anyway. 557 */ 558 if ((receiver->ith_state == MACH_MSG_SUCCESS) || 559 !(receiver->ith_option & MACH_RCV_LARGE)) { 560 561 receiver->ith_kmsg = kmsg; 562 receiver->ith_seqno = mqueue->imq_seqno++; 563 thread_unlock(receiver); 564 565 /* we didn't need our reserved spot in the queue */ 566 ipc_mqueue_release_msgcount(mqueue); 567 break; 568 } 569 570 /* 571 * Otherwise, this thread needs to be released to run 572 * and handle its error without getting the message. We 573 * need to go back and pick another one. 574 */ 575 receiver->ith_kmsg = IKM_NULL; 576 receiver->ith_seqno = 0; 577 thread_unlock(receiver); 578 } 579 580 imq_unlock(mqueue); 581 splx(s); 582 583 current_task()->messages_sent++; 584 return; 585} 586 587 588/* static */ void 589ipc_mqueue_receive_results(wait_result_t saved_wait_result) 590{ 591 thread_t self = current_thread(); 592 mach_msg_option_t option = self->ith_option; 593 594 /* 595 * why did we wake up? 596 */ 597 switch (saved_wait_result) { 598 case THREAD_TIMED_OUT: 599 self->ith_state = MACH_RCV_TIMED_OUT; 600 return; 601 602 case THREAD_INTERRUPTED: 603 self->ith_state = MACH_RCV_INTERRUPTED; 604 return; 605 606 case THREAD_RESTART: 607 /* something bad happened to the port/set */ 608 self->ith_state = MACH_RCV_PORT_CHANGED; 609 return; 610 611 case THREAD_AWAKENED: 612 /* 613 * We do not need to go select a message, somebody 614 * handed us one (or a too-large indication). 615 */ 616 switch (self->ith_state) { 617 case MACH_RCV_SCATTER_SMALL: 618 case MACH_RCV_TOO_LARGE: 619 /* 620 * Somebody tried to give us a too large 621 * message. If we indicated that we cared, 622 * then they only gave us the indication, 623 * otherwise they gave us the indication 624 * AND the message anyway. 625 */ 626 if (option & MACH_RCV_LARGE) { 627 return; 628 } 629 630 case MACH_MSG_SUCCESS: 631 return; 632 633 default: 634 panic("ipc_mqueue_receive_results: strange ith_state"); 635 } 636 637 default: 638 panic("ipc_mqueue_receive_results: strange wait_result"); 639 } 640} 641 642void 643ipc_mqueue_receive_continue( 644 __unused void *param, 645 wait_result_t wresult) 646{ 647 ipc_mqueue_receive_results(wresult); 648 mach_msg_receive_continue(); /* hard-coded for now */ 649} 650 651/* 652 * Routine: ipc_mqueue_receive 653 * Purpose: 654 * Receive a message from a message queue. 655 * 656 * If continuation is non-zero, then we might discard 657 * our kernel stack when we block. We will continue 658 * after unblocking by executing continuation. 659 * 660 * If resume is true, then we are resuming a receive 661 * operation after a blocked receive discarded our stack. 662 * Conditions: 663 * Our caller must hold a reference for the port or port set 664 * to which this queue belongs, to keep the queue 665 * from being deallocated. 666 * 667 * The kmsg is returned with clean header fields 668 * and with the circular bit turned off. 669 * Returns: 670 * MACH_MSG_SUCCESS Message returned in kmsgp. 671 * MACH_RCV_TOO_LARGE Message size returned in kmsgp. 672 * MACH_RCV_TIMED_OUT No message obtained. 673 * MACH_RCV_INTERRUPTED No message obtained. 674 * MACH_RCV_PORT_DIED Port/set died; no message. 675 * MACH_RCV_PORT_CHANGED Port moved into set; no msg. 676 * 677 */ 678 679void 680ipc_mqueue_receive( 681 ipc_mqueue_t mqueue, 682 mach_msg_option_t option, 683 mach_msg_size_t max_size, 684 mach_msg_timeout_t rcv_timeout, 685 int interruptible) 686{ 687 wait_result_t wresult; 688 thread_t self = current_thread(); 689 690 wresult = ipc_mqueue_receive_on_thread(mqueue, option, max_size, 691 rcv_timeout, interruptible, 692 self); 693 if (wresult == THREAD_NOT_WAITING) 694 return; 695 696 if (wresult == THREAD_WAITING) { 697 counter((interruptible == THREAD_ABORTSAFE) ? 698 c_ipc_mqueue_receive_block_user++ : 699 c_ipc_mqueue_receive_block_kernel++); 700 701 if (self->ith_continuation) 702 thread_block(ipc_mqueue_receive_continue); 703 /* NOTREACHED */ 704 705 wresult = thread_block(THREAD_CONTINUE_NULL); 706 } 707 ipc_mqueue_receive_results(wresult); 708} 709 710wait_result_t 711ipc_mqueue_receive_on_thread( 712 ipc_mqueue_t mqueue, 713 mach_msg_option_t option, 714 mach_msg_size_t max_size, 715 mach_msg_timeout_t rcv_timeout, 716 int interruptible, 717 thread_t thread) 718{ 719 ipc_kmsg_queue_t kmsgs; 720 wait_result_t wresult; 721 uint64_t deadline; 722 spl_t s; 723#if CONFIG_MACF_MACH 724 ipc_labelh_t lh; 725 task_t task; 726 int rc; 727#endif 728 729 s = splsched(); 730 imq_lock(mqueue); 731 732 if (imq_is_set(mqueue)) { 733 queue_t q; 734 735 q = &mqueue->imq_preposts; 736 737 /* 738 * If we are waiting on a portset mqueue, we need to see if 739 * any of the member ports have work for us. Ports that 740 * have (or recently had) messages will be linked in the 741 * prepost queue for the portset. By holding the portset's 742 * mqueue lock during the search, we tie up any attempts by 743 * mqueue_deliver or portset membership changes that may 744 * cross our path. 745 */ 746 search_set: 747 while(!queue_empty(q)) { 748 wait_queue_link_t wql; 749 ipc_mqueue_t port_mq; 750 751 queue_remove_first(q, wql, wait_queue_link_t, wql_preposts); 752 assert(!wql_is_preposted(wql)); 753 754 /* 755 * This is a lock order violation, so we have to do it 756 * "softly," putting the link back on the prepost list 757 * if it fails (at the tail is fine since the order of 758 * handling messages from different sources in a set is 759 * not guaranteed and we'd like to skip to the next source 760 * if one is available). 761 */ 762 port_mq = (ipc_mqueue_t)wql->wql_queue; 763 if (!imq_lock_try(port_mq)) { 764 queue_enter(q, wql, wait_queue_link_t, wql_preposts); 765 imq_unlock(mqueue); 766 splx(s); 767 mutex_pause(0); 768 s = splsched(); 769 imq_lock(mqueue); 770 goto search_set; /* start again at beginning - SMP */ 771 } 772 773 /* 774 * If there are no messages on this queue, just skip it 775 * (we already removed the link from the set's prepost queue). 776 */ 777 kmsgs = &port_mq->imq_messages; 778 if (ipc_kmsg_queue_first(kmsgs) == IKM_NULL) { 779 imq_unlock(port_mq); 780 continue; 781 } 782 783 /* 784 * There are messages, so reinsert the link back 785 * at the tail of the preposted queue (for fairness) 786 * while we still have the portset mqueue locked. 787 */ 788 queue_enter(q, wql, wait_queue_link_t, wql_preposts); 789 imq_unlock(mqueue); 790 791 /* 792 * Continue on to handling the message with just 793 * the port mqueue locked. 794 */ 795 ipc_mqueue_select_on_thread(port_mq, option, max_size, thread); 796 imq_unlock(port_mq); 797#if CONFIG_MACF_MACH 798 if (thread->task != TASK_NULL && 799 thread->ith_kmsg != NULL && 800 thread->ith_kmsg->ikm_sender != NULL) { 801 lh = thread->ith_kmsg->ikm_sender->label; 802 tasklabel_lock(thread->task); 803 ip_lock(lh->lh_port); 804 rc = mac_port_check_receive(&thread->task->maclabel, 805 &lh->lh_label); 806 ip_unlock(lh->lh_port); 807 tasklabel_unlock(thread->task); 808 if (rc) 809 thread->ith_state = MACH_RCV_INVALID_DATA; 810 } 811#endif 812 splx(s); 813 return THREAD_NOT_WAITING; 814 815 } 816 817 } else { 818 819 /* 820 * Receive on a single port. Just try to get the messages. 821 */ 822 kmsgs = &mqueue->imq_messages; 823 if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) { 824 ipc_mqueue_select_on_thread(mqueue, option, max_size, thread); 825 imq_unlock(mqueue); 826#if CONFIG_MACF_MACH 827 if (thread->task != TASK_NULL && 828 thread->ith_kmsg != NULL && 829 thread->ith_kmsg->ikm_sender != NULL) { 830 lh = thread->ith_kmsg->ikm_sender->label; 831 tasklabel_lock(thread->task); 832 ip_lock(lh->lh_port); 833 rc = mac_port_check_receive(&thread->task->maclabel, 834 &lh->lh_label); 835 ip_unlock(lh->lh_port); 836 tasklabel_unlock(thread->task); 837 if (rc) 838 thread->ith_state = MACH_RCV_INVALID_DATA; 839 } 840#endif 841 splx(s); 842 return THREAD_NOT_WAITING; 843 } 844 } 845 846 /* 847 * Looks like we'll have to block. The mqueue we will 848 * block on (whether the set's or the local port's) is 849 * still locked. 850 */ 851 if (option & MACH_RCV_TIMEOUT) { 852 if (rcv_timeout == 0) { 853 imq_unlock(mqueue); 854 splx(s); 855 thread->ith_state = MACH_RCV_TIMED_OUT; 856 return THREAD_NOT_WAITING; 857 } 858 } 859 860 thread_lock(thread); 861 thread->ith_state = MACH_RCV_IN_PROGRESS; 862 thread->ith_option = option; 863 thread->ith_msize = max_size; 864 865 if (option & MACH_RCV_TIMEOUT) 866 clock_interval_to_deadline(rcv_timeout, 1000*NSEC_PER_USEC, &deadline); 867 else 868 deadline = 0; 869 870 wresult = wait_queue_assert_wait64_locked(&mqueue->imq_wait_queue, 871 IPC_MQUEUE_RECEIVE, 872 interruptible, deadline, 873 thread); 874 /* preposts should be detected above, not here */ 875 if (wresult == THREAD_AWAKENED) 876 panic("ipc_mqueue_receive_on_thread: sleep walking"); 877 878 thread_unlock(thread); 879 imq_unlock(mqueue); 880 splx(s); 881 return wresult; 882} 883 884 885/* 886 * Routine: ipc_mqueue_select_on_thread 887 * Purpose: 888 * A receiver discovered that there was a message on the queue 889 * before they had to block. Pick the message off the queue and 890 * "post" it to thread. 891 * Conditions: 892 * mqueue locked. 893 * thread not locked. 894 * There is a message. 895 * Returns: 896 * MACH_MSG_SUCCESS Actually selected a message for ourselves. 897 * MACH_RCV_TOO_LARGE May or may not have pull it, but it is large 898 */ 899void 900ipc_mqueue_select_on_thread( 901 ipc_mqueue_t mqueue, 902 mach_msg_option_t option, 903 mach_msg_size_t max_size, 904 thread_t thread) 905{ 906 ipc_kmsg_t kmsg; 907 mach_msg_return_t mr = MACH_MSG_SUCCESS; 908 mach_msg_size_t rcv_size; 909 910 /* 911 * Do some sanity checking of our ability to receive 912 * before pulling the message off the queue. 913 */ 914 kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages); 915 assert(kmsg != IKM_NULL); 916 917 /* 918 * If we really can't receive it, but we had the 919 * MACH_RCV_LARGE option set, then don't take it off 920 * the queue, instead return the appropriate error 921 * (and size needed). 922 */ 923 rcv_size = ipc_kmsg_copyout_size(kmsg, thread->map); 924 if (rcv_size + REQUESTED_TRAILER_SIZE(thread_is_64bit(thread), option) > max_size) { 925 mr = MACH_RCV_TOO_LARGE; 926 if (option & MACH_RCV_LARGE) { 927 thread->ith_receiver_name = mqueue->imq_receiver_name; 928 thread->ith_kmsg = IKM_NULL; 929 thread->ith_msize = rcv_size; 930 thread->ith_seqno = 0; 931 thread->ith_state = mr; 932 return; 933 } 934 } 935 936 ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg); 937 ipc_mqueue_release_msgcount(mqueue); 938 thread->ith_seqno = mqueue->imq_seqno++; 939 thread->ith_kmsg = kmsg; 940 thread->ith_state = mr; 941 942 current_task()->messages_received++; 943 return; 944} 945 946/* 947 * Routine: ipc_mqueue_peek 948 * Purpose: 949 * Peek at a message queue to see if it has any messages 950 * (in it or contained message queues for a set). 951 * 952 * Conditions: 953 * Locks may be held by callers, so this routine cannot block. 954 * Caller holds reference on the message queue. 955 */ 956unsigned 957ipc_mqueue_peek(ipc_mqueue_t mq) 958{ 959 wait_queue_link_t wql; 960 queue_t q; 961 spl_t s; 962 963 if (!imq_is_set(mq)) 964 return (ipc_kmsg_queue_first(&mq->imq_messages) != IKM_NULL); 965 966 /* 967 * Don't block trying to get the lock. 968 */ 969 s = splsched(); 970 imq_lock(mq); 971 972 /* 973 * peek at the contained port message queues, return as soon as 974 * we spot a message on one of the message queues linked on the 975 * prepost list. 976 */ 977 q = &mq->imq_preposts; 978 queue_iterate(q, wql, wait_queue_link_t, wql_preposts) { 979 ipc_mqueue_t port_mq = (ipc_mqueue_t)wql->wql_queue; 980 ipc_kmsg_queue_t kmsgs = &port_mq->imq_messages; 981 982 if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) { 983 imq_unlock(mq); 984 splx(s); 985 return 1; 986 } 987 } 988 imq_unlock(mq); 989 splx(s); 990 return 0; 991} 992 993/* 994 * Routine: ipc_mqueue_destroy 995 * Purpose: 996 * Destroy a (non-set) message queue. 997 * Set any blocked senders running. 998 * Destroy the kmsgs in the queue. 999 * Conditions: 1000 * Nothing locked. 1001 * Receivers were removed when the receive right was "changed" 1002 */ 1003void 1004ipc_mqueue_destroy( 1005 ipc_mqueue_t mqueue) 1006{ 1007 ipc_kmsg_queue_t kmqueue; 1008 ipc_kmsg_t kmsg; 1009 boolean_t reap = FALSE; 1010 spl_t s; 1011 1012 1013 s = splsched(); 1014 imq_lock(mqueue); 1015 /* 1016 * rouse all blocked senders 1017 */ 1018 mqueue->imq_fullwaiters = FALSE; 1019 wait_queue_wakeup64_all_locked( 1020 &mqueue->imq_wait_queue, 1021 IPC_MQUEUE_FULL, 1022 THREAD_RESTART, 1023 FALSE); 1024 1025 /* 1026 * Move messages from the specified queue to the per-thread 1027 * clean/drain queue while we have the mqueue lock. 1028 */ 1029 kmqueue = &mqueue->imq_messages; 1030 while ((kmsg = ipc_kmsg_dequeue(kmqueue)) != IKM_NULL) { 1031 boolean_t first; 1032 first = ipc_kmsg_delayed_destroy(kmsg); 1033 if (first) 1034 reap = first; 1035 } 1036 1037 imq_unlock(mqueue); 1038 splx(s); 1039 1040 /* 1041 * Destroy the messages we enqueued if we aren't nested 1042 * inside some other attempt to drain the same queue. 1043 */ 1044 if (reap) 1045 ipc_kmsg_reap_delayed(); 1046} 1047 1048/* 1049 * Routine: ipc_mqueue_set_qlimit 1050 * Purpose: 1051 * Changes a message queue limit; the maximum number 1052 * of messages which may be queued. 1053 * Conditions: 1054 * Nothing locked. 1055 */ 1056 1057void 1058ipc_mqueue_set_qlimit( 1059 ipc_mqueue_t mqueue, 1060 mach_port_msgcount_t qlimit) 1061{ 1062 spl_t s; 1063 1064 assert(qlimit <= MACH_PORT_QLIMIT_MAX); 1065 1066 /* wake up senders allowed by the new qlimit */ 1067 s = splsched(); 1068 imq_lock(mqueue); 1069 if (qlimit > mqueue->imq_qlimit) { 1070 mach_port_msgcount_t i, wakeup; 1071 1072 /* caution: wakeup, qlimit are unsigned */ 1073 wakeup = qlimit - mqueue->imq_qlimit; 1074 1075 for (i = 0; i < wakeup; i++) { 1076 if (wait_queue_wakeup64_one_locked( 1077 &mqueue->imq_wait_queue, 1078 IPC_MQUEUE_FULL, 1079 THREAD_AWAKENED, 1080 FALSE) == KERN_NOT_WAITING) { 1081 mqueue->imq_fullwaiters = FALSE; 1082 break; 1083 } 1084 mqueue->imq_msgcount++; /* give it to the awakened thread */ 1085 } 1086 } 1087 mqueue->imq_qlimit = qlimit; 1088 imq_unlock(mqueue); 1089 splx(s); 1090} 1091 1092/* 1093 * Routine: ipc_mqueue_set_seqno 1094 * Purpose: 1095 * Changes an mqueue's sequence number. 1096 * Conditions: 1097 * Caller holds a reference to the queue's containing object. 1098 */ 1099void 1100ipc_mqueue_set_seqno( 1101 ipc_mqueue_t mqueue, 1102 mach_port_seqno_t seqno) 1103{ 1104 spl_t s; 1105 1106 s = splsched(); 1107 imq_lock(mqueue); 1108 mqueue->imq_seqno = seqno; 1109 imq_unlock(mqueue); 1110 splx(s); 1111} 1112 1113 1114/* 1115 * Routine: ipc_mqueue_copyin 1116 * Purpose: 1117 * Convert a name in a space to a message queue. 1118 * Conditions: 1119 * Nothing locked. If successful, the caller gets a ref for 1120 * for the object. This ref ensures the continued existence of 1121 * the queue. 1122 * Returns: 1123 * MACH_MSG_SUCCESS Found a message queue. 1124 * MACH_RCV_INVALID_NAME The space is dead. 1125 * MACH_RCV_INVALID_NAME The name doesn't denote a right. 1126 * MACH_RCV_INVALID_NAME 1127 * The denoted right is not receive or port set. 1128 * MACH_RCV_IN_SET Receive right is a member of a set. 1129 */ 1130 1131mach_msg_return_t 1132ipc_mqueue_copyin( 1133 ipc_space_t space, 1134 mach_port_name_t name, 1135 ipc_mqueue_t *mqueuep, 1136 ipc_object_t *objectp) 1137{ 1138 ipc_entry_t entry; 1139 ipc_object_t object; 1140 ipc_mqueue_t mqueue; 1141 1142 is_read_lock(space); 1143 if (!is_active(space)) { 1144 is_read_unlock(space); 1145 return MACH_RCV_INVALID_NAME; 1146 } 1147 1148 entry = ipc_entry_lookup(space, name); 1149 if (entry == IE_NULL) { 1150 is_read_unlock(space); 1151 return MACH_RCV_INVALID_NAME; 1152 } 1153 1154 object = entry->ie_object; 1155 1156 if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) { 1157 ipc_port_t port; 1158 1159 port = (ipc_port_t) object; 1160 assert(port != IP_NULL); 1161 1162 ip_lock(port); 1163 assert(ip_active(port)); 1164 assert(port->ip_receiver_name == name); 1165 assert(port->ip_receiver == space); 1166 is_read_unlock(space); 1167 mqueue = &port->ip_messages; 1168 1169 } else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) { 1170 ipc_pset_t pset; 1171 1172 pset = (ipc_pset_t) object; 1173 assert(pset != IPS_NULL); 1174 1175 ips_lock(pset); 1176 assert(ips_active(pset)); 1177 assert(pset->ips_local_name == name); 1178 is_read_unlock(space); 1179 1180 mqueue = &pset->ips_messages; 1181 } else { 1182 is_read_unlock(space); 1183 return MACH_RCV_INVALID_NAME; 1184 } 1185 1186 /* 1187 * At this point, the object is locked and active, 1188 * the space is unlocked, and mqueue is initialized. 1189 */ 1190 1191 io_reference(object); 1192 io_unlock(object); 1193 1194 *objectp = object; 1195 *mqueuep = mqueue; 1196 return MACH_MSG_SUCCESS; 1197} 1198 1199