1/** 2 * \file 3 * \brief Waitset and low-level event handling mechanism 4 * 5 * A "wait set" is a collection of channels to wait on, much like an 6 * FDSET in POSIX. There should be a default, static wait set for each 7 * dispatcher. Threads which wait for events specify the wait set they 8 * are waiting on. 9 */ 10 11/* 12 * Copyright (c) 2009-2012, ETH Zurich. 13 * Copyright (c) 2015, Hewlett Packard Enterprise Development LP. 14 * All rights reserved. 15 * 16 * This file is distributed under the terms in the attached LICENSE file. 17 * If you do not find this file, copies can be found by writing to: 18 * ETH Zurich D-INFK, Universitaetstr. 6, CH-8092 Zurich. Attn: Systems Group. 19 */ 20 21#include <barrelfish/barrelfish.h> 22#include <barrelfish/waitset.h> 23#include <barrelfish/waitset_chan.h> 24#include <barrelfish/threads.h> 25#include <barrelfish/dispatch.h> 26#include "threads_priv.h" 27#include "waitset_chan_priv.h" 28#include <stdio.h> 29#include <string.h> 30 31#include <flounder/flounder.h> 32 33#ifdef CONFIG_INTERCONNECT_DRIVER_UMP 34# include <barrelfish/ump_endpoint.h> 35#endif 36 37/// Dequeue a chanstate from a queue 38static void dequeue(struct waitset_chanstate **queue, struct waitset_chanstate *chan) 39{ 40 if (chan->next == chan) { 41 assert(chan->prev == chan); 42 assert(*queue == chan); 43 *queue = NULL; 44 } else { 45 chan->prev->next = chan->next; 46 chan->next->prev = chan->prev; 47 if (*queue == chan) { 48 *queue = chan->next; 49 } 50 } 51 chan->prev = chan->next = NULL; 52} 53 54/// Enqueue a chanstate on a queue 55static void enqueue(struct waitset_chanstate **queue, struct waitset_chanstate *chan) 56{ 57 if (*queue == NULL) { 58 *queue = chan; 59 chan->next = chan->prev = chan; 60 } else { 61 chan->next = *queue; 62 chan->prev = (*queue)->prev; 63 chan->next->prev = chan; 64 chan->prev->next = chan; 65 } 66} 67 68/// Dequeue a chanstate from polled queue 69static void dequeue_polled(struct waitset_chanstate **queue, 70 struct 
waitset_chanstate *chan) 71{ 72 if (chan->polled_next == chan) { 73 assert(chan->polled_prev == chan); 74 assert(*queue == chan); 75 *queue = NULL; 76 } else { 77 chan->polled_prev->polled_next = chan->polled_next; 78 chan->polled_next->polled_prev = chan->polled_prev; 79 if (*queue == chan) { 80 *queue = chan->polled_next; 81 } 82 } 83 chan->polled_prev = chan->polled_next = NULL; 84} 85 86/// Enqueue a chanstate on polled queue 87static void enqueue_polled(struct waitset_chanstate **queue, 88 struct waitset_chanstate *chan) 89{ 90 if (*queue == NULL) { 91 *queue = chan; 92 chan->polled_next = chan->polled_prev = chan; 93 } else { 94 chan->polled_next = *queue; 95 chan->polled_prev = (*queue)->polled_prev; 96 chan->polled_next->polled_prev = chan; 97 chan->polled_prev->polled_next = chan; 98 } 99} 100 101/** 102 * \brief Initialise a new waitset 103 */ 104void waitset_init(struct waitset *ws) 105{ 106 assert(ws != NULL); 107 ws->pending = ws->polled = ws->idle = ws->waiting = NULL; 108 ws->waiting_threads = NULL; 109} 110 111/** 112 * \brief Destroy a previously initialised waitset 113 */ 114errval_t waitset_destroy(struct waitset *ws) 115{ 116 assert(ws != NULL); 117 118 // FIXME: do we want to support cancelling all the pending events/channels? 
119 if (ws->pending || ws->waiting_threads) { 120 return LIB_ERR_WAITSET_IN_USE; 121 } 122 123 // remove idle and polled channels from waitset 124 struct waitset_chanstate *chan, *next; 125 for (chan = ws->idle; chan != NULL; chan = next) { 126 next = chan->next; 127 assert(chan->state == CHAN_IDLE); 128 assert(chan->waitset == ws); 129 chan->waitset = NULL; 130 chan->next = chan->prev = NULL; 131 132 if (next == ws->idle) { 133 break; 134 } 135 } 136 ws->idle = NULL; 137 138 for (chan = ws->polled; chan != NULL; chan = next) { 139 next = chan->next; 140 assert(chan->state == CHAN_POLLED); 141 assert(chan->waitset == ws); 142 chan->waitset = NULL; 143 chan->next = chan->prev = NULL; 144 145 if (next == ws->polled) { 146 break; 147 } 148 } 149 ws->polled = NULL; 150 151 return SYS_ERR_OK; 152} 153 154/// Check if the thread can receive the event 155static bool waitset_can_receive(struct waitset_chanstate *chan, 156 struct thread *thread) 157{ 158 bool res = false; 159 160 if (chan->wait_for) // if a thread is waiting for this specific event 161 res = chan->wait_for == thread; 162 else 163 res = (chan->token & 1 && !thread->token) // incoming token is a request 164 // and a thread is not waiting for a token 165 || (!chan->token && chan != thread->channel) // there's no token 166 // and a thread is not waiting specifically for that event 167 || (chan->token == thread->token && chan == thread->channel); 168 // there is a token and it matches thread's token and event 169 return res; 170} 171 172/// Returns a channel with a pending event on the given waitset matching 173/// our thread 174static struct waitset_chanstate *get_pending_event_disabled(struct waitset *ws, 175 struct waitset_chanstate *waitfor, struct waitset_chanstate *waitfor2) 176{ 177 struct thread *me = thread_self_disabled(); 178 179 if (waitfor) { // channel that we wait for 180 if ((waitfor->state == CHAN_PENDING || waitfor->state == CHAN_WAITING) 181 && waitset_can_receive(waitfor, me)) { 182 return 
waitfor; 183 } 184 if (waitfor2 && (waitfor2->state == CHAN_PENDING || waitfor2->state == CHAN_WAITING) 185 && waitset_can_receive(waitfor2, me)) { 186 return waitfor2; 187 } 188 return NULL; 189 } 190 struct waitset_chanstate *chan; 191 // check a waiting queue for matching event 192 for (chan = ws->waiting; chan; ) { 193 if (waitset_can_receive(chan, me)) { 194 assert_disabled(chan->state == CHAN_WAITING); 195 return chan; 196 } 197 chan = chan->next; 198 if (chan == ws->waiting) 199 break; 200 } 201 // check a pending queue for matching event 202 for (chan = ws->pending; chan;) { 203 if (waitset_can_receive(chan, me)) { 204 assert_disabled(chan->state == CHAN_PENDING); 205 return chan; 206 } 207 chan = chan->next; 208 if (chan == ws->pending) 209 break; 210 } 211 return NULL; 212} 213 214void arranet_polling_loop_proxy(void) __attribute__((weak)); 215void arranet_polling_loop_proxy(void) 216{ 217 USER_PANIC("Network polling not available without Arranet!\n"); 218} 219 220void poll_ahci(struct waitset_chanstate *) __attribute__((weak)); 221void poll_ahci(struct waitset_chanstate *chan) 222{ 223 errval_t err = waitset_chan_trigger(chan); 224 assert(err_is_ok(err)); // should not be able to fail 225} 226 227/// Check polled channels 228void poll_channels_disabled(dispatcher_handle_t handle) { 229 struct dispatcher_generic *dp = get_dispatcher_generic(handle); 230 struct waitset_chanstate *chan; 231 232 if (!dp->polled_channels) 233 return; 234 chan = dp->polled_channels; 235 do { 236 switch (chan->chantype) { 237#ifdef CONFIG_INTERCONNECT_DRIVER_UMP 238 case CHANTYPE_UMP_IN: { 239 if (ump_endpoint_poll(chan)) { 240 errval_t err = waitset_chan_trigger_disabled(chan, handle); 241 assert(err_is_ok(err)); // should not fail 242 if (!dp->polled_channels) // restart scan 243 return; 244 chan = dp->polled_channels; 245 continue; 246 } else 247 chan = chan->polled_next; 248 } break; 249#endif // CONFIG_INTERCONNECT_DRIVER_UMP 250 case CHANTYPE_LWIP_SOCKET: 251 
arranet_polling_loop_proxy(); 252 break; 253 case CHANTYPE_AHCI: 254 poll_ahci(chan); 255 break; 256 default: 257 assert(!"invalid channel type to poll!"); 258 } 259 } while (chan != dp->polled_channels); 260} 261 262/// Re-register a channel (if persistent) 263static void reregister_channel(struct waitset *ws, struct waitset_chanstate *chan, 264 dispatcher_handle_t handle) 265{ 266 assert(chan->waitset == ws); 267 if (chan->state == CHAN_PENDING) { 268 dequeue(&ws->pending, chan); 269 } else { 270 assert(chan->state == CHAN_WAITING); 271 dequeue(&ws->waiting, chan); 272 } 273 274 chan->token = 0; 275 if (chan->chantype == CHANTYPE_UMP_IN 276 || chan->chantype == CHANTYPE_LWIP_SOCKET 277 || chan->chantype == CHANTYPE_AHCI) { 278 enqueue(&ws->polled, chan); 279 enqueue_polled(&get_dispatcher_generic(handle)->polled_channels, chan); 280 chan->state = CHAN_POLLED; 281 } else { 282 enqueue(&ws->idle, chan); 283 chan->state = CHAN_IDLE; 284 } 285} 286 287/// Find a thread that is able to receive an event 288static struct thread * find_recipient(struct waitset *ws, 289 struct waitset_chanstate *channel, struct thread *me) 290{ 291 struct thread *t = ws->waiting_threads; 292 293 if (!t) 294 return NULL; 295 do { 296 if (waitset_can_receive(channel, t)) 297 return t; 298 t = t->next; 299 } while (t != ws->waiting_threads); 300 return ws->waiting_threads; 301} 302 303/// Wake up other thread if there's more pending events 304static void wake_up_other_thread(dispatcher_handle_t handle, struct waitset *ws) 305{ 306 if (ws->pending && ws->waiting_threads) { 307 struct thread *t; 308 309 t = thread_unblock_one_disabled(handle, &ws->waiting_threads, NULL); 310 assert_disabled(t == NULL); // shouldn't see a remote thread 311 } 312} 313 314/** 315 * \brief Get next pending event 316 * 317 * Check if there is a pending event that matches current thread and return it. 318 * Pending events are in a pending queue and in a waiting queue. 
 * A pending event then will be removed from a pending/waiting queue and become
 * unregistered or, if it's persistent, will be re-registered to an idle queue
 * or a polled queue (UMP, LWIP socket and AHCI channels) of a waitset.
 * If there's no pending event, block this thread.
 * If there's a pending event but it doesn't match our thread, don't remove it
 * from a pending queue and wake up a matching thread.
 * If there's no matching thread, add it to a waiting queue.
 *
 * \param ws Waitset with sources of events
 * \param retchannel Holder of returned event
 * \param retclosure Holder of returned closure
 * \param waitfor Specific event that we're waiting for (can be NULL)
 * \param waitfor2 Second event accepted alongside waitfor (can be NULL;
 *                 only considered when waitfor is non-NULL)
 * \param handle Dispatcher's handle
 * \param debug Debug mode (not used)
 *
 * \returns SYS_ERR_OK once an event has been obtained; the loop has no other
 *          exit
 */

errval_t get_next_event_disabled(struct waitset *ws,
    struct waitset_chanstate **retchannel, struct event_closure *retclosure,
    struct waitset_chanstate *waitfor, struct waitset_chanstate *waitfor2,
    dispatcher_handle_t handle, bool debug)
{
    struct waitset_chanstate * chan;

//    debug_printf("%s: %p %p %p %p\n", __func__, __builtin_return_address(0), __builtin_return_address(1), __builtin_return_address(2), __builtin_return_address(3));
    for (;;) {
        chan = get_pending_event_disabled(ws, waitfor, waitfor2); // get our event
        if (chan) {
            // hand the event to the caller and take it off its queue
            *retchannel = chan;
            *retclosure = chan->closure;
            chan->wait_for = NULL;
            chan->token = 0;
            if (chan->persistent) {
                reregister_channel(ws, chan, handle);
            } else {
                waitset_chan_deregister_disabled(chan, handle);
            }
            // more events may be pending: let another blocked thread run
            wake_up_other_thread(handle, ws);
    //        debug_printf("%s.%d: %p\n", __func__, __LINE__, retclosure->handler);
            return SYS_ERR_OK;
        }
        chan = ws->pending; // check a pending queue
        if (!chan) { // if nothing then wait
            thread_block_disabled(handle, &ws->waiting_threads);
            // NOTE(review): presumably thread_block_disabled() returns with
            // the dispatcher enabled, hence the re-disable -- confirm
            disp_disable();
        } else { // something but it's not our event
            if (!ws->waiting_threads) { // no other thread interested in
                // park the event on the waiting queue
                dequeue(&ws->pending, chan);
                enqueue(&ws->waiting, chan);
                chan->state = CHAN_WAITING;
                chan->waitset = ws;
            } else {
                // find a matching thread
                struct thread *t;
                for (t = ws->waiting_threads; t; ) {
                    if (waitset_can_receive(chan, t)) { // match found, wake it
                        // make the match the queue head so it is the one unblocked
                        ws->waiting_threads = t;
                        t = thread_unblock_one_disabled(handle,
                            &ws->waiting_threads, chan);
                        assert_disabled(t == NULL); // shouldn't see a remote thread
                        break;
                    }
                    t = t->next;
                    if (t == ws->waiting_threads) { // no recipient found
                        dequeue(&ws->pending, chan);
                        enqueue(&ws->waiting, chan);
                        chan->state = CHAN_WAITING;
                        chan->waitset = ws;
                        break;
                    }
                }
            }
        }
    }
}

/**
 * \brief Wait for (block) and return next event on given waitset
 *
 * Wait until something happens, either activity on some channel, or a deferred
 * call, and then return the corresponding closure. This is the core of the
 * event-handling system.
 *
 * \param ws Waitset
 * \param retclosure Pointer to storage space for returned event closure
 *
 * \returns Result of get_next_event_disabled()
 */
errval_t get_next_event(struct waitset *ws, struct event_closure *retclosure)
{
    dispatcher_handle_t handle = disp_disable();
    struct waitset_chanstate *channel; // receives the channel; not used further
    errval_t err = get_next_event_disabled(ws, &channel, retclosure, NULL, NULL,
                                           handle, false);
    disp_enable(handle);
    return err;
}



/**
 * \brief Check if there is an event pending on given waitset
 *
 * This is essentially a non-blocking variant of get_next_event(). It should be
 * used with great care, to avoid the creation of busy-waiting loops.
421 * 422 * \param ws Waitset 423 * 424 * \returns LIB_ERR_NO_EVENT if nothing is pending 425 */ 426static errval_t check_for_event_disabled(struct waitset *ws, dispatcher_handle_t handle) 427{ 428 struct waitset_chanstate *chan; 429 430 poll_channels_disabled(handle); 431 chan = get_pending_event_disabled(ws, NULL, NULL); 432 if (chan != NULL) { 433 return SYS_ERR_OK; 434 } 435 return LIB_ERR_NO_EVENT; 436} 437 438errval_t check_for_event(struct waitset *ws) 439{ 440 errval_t err; 441 442 assert(ws != NULL); 443 dispatcher_handle_t handle = disp_disable(); 444 err = check_for_event_disabled(ws, handle); 445 disp_enable(handle); 446 return err; 447} 448 449/** 450 * \brief Wait for (block) and dispatch next event on given waitset 451 * 452 * Wait until something happens, either activity on some channel, or deferred 453 * call, and then call the corresponding closure. 454 * 455 * \param ws Waitset 456 */ 457 458errval_t event_dispatch(struct waitset *ws) 459{ 460 struct event_closure closure; 461 errval_t err = get_next_event(ws, &closure); 462 if (err_is_fail(err)) { 463 return err; 464 } 465 466 assert(closure.handler != NULL); 467 closure.handler(closure.arg); 468 return SYS_ERR_OK; 469} 470 471errval_t event_dispatch_debug(struct waitset *ws) 472{ 473 struct event_closure closure; 474 struct waitset_chanstate *channel; 475 dispatcher_handle_t handle = disp_disable(); 476 errval_t err = get_next_event_disabled(ws, &channel, &closure, NULL, NULL, 477 handle, true); 478 disp_enable(handle); 479 if (err_is_fail(err)) { 480 return err; 481 } 482 483 assert(closure.handler != NULL); 484 closure.handler(closure.arg); 485 return SYS_ERR_OK; 486} 487 488/** 489 * \brief Dispatch events until a specific event is received 490 * 491 * Wait for events and dispatch them. If a specific event comes, don't call 492 * a closure, just return. 
493 * 494 * \param ws Waitset 495 * \param waitfor Event, that we are waiting for 496 * \param error_var Error variable that can be changed by closures 497 */ 498 499errval_t wait_for_channel(struct waitset *ws, struct waitset_chanstate *waitfor, 500 errval_t *error_var) 501{ 502 assert(waitfor->waitset == ws); 503 thread_set_local_trigger(NULL); 504 for (;;) { 505 struct event_closure closure; 506 struct waitset_chanstate *channel, *trigger; 507 508 trigger = thread_get_local_trigger(); 509 dispatcher_handle_t handle = disp_disable(); 510 errval_t err = get_next_event_disabled(ws, &channel, &closure, waitfor, 511 trigger ? trigger: waitfor->trigger, handle, false); 512 disp_enable(handle); 513 if (err_is_fail(err)) { 514 assert(0); 515 return err; 516 } 517 if (channel == waitfor) 518 return SYS_ERR_OK; 519 assert(!channel->wait_for); 520 assert(closure.handler != NULL); 521 closure.handler(closure.arg); 522 if (err_is_fail(*error_var)) 523 return *error_var; 524 } 525} 526 527/** 528 * \brief check and dispatch next event on given waitset 529 * 530 * Check if there is any pending activity on some channel, or deferred 531 * call, and then call the corresponding closure. 532 * 533 * Do not wait! In case of no pending events, return err LIB_ERR_NO_EVENT. 534 * 535 * \param ws Waitset 536 */ 537errval_t event_dispatch_non_block(struct waitset *ws) 538{ 539 struct waitset_chanstate *channel; 540 struct event_closure closure; 541 542 assert(ws != NULL); 543 544 // are there any pending events on the waitset? 
545 dispatcher_handle_t handle = disp_disable(); 546 errval_t err = check_for_event_disabled(ws, handle); 547 if (err_is_fail(err)) { 548 disp_enable(handle); 549 return err; 550 } 551 err = get_next_event_disabled(ws, &channel, &closure, NULL, NULL, handle, 552 false); 553 if (err_is_fail(err)) 554 return err; 555 disp_enable(handle); 556 assert(closure.handler != NULL); 557 closure.handler(closure.arg); 558 return SYS_ERR_OK; 559} 560 561 562/** 563 * \privatesection 564 * "Private" functions that are called only by the channel implementations 565 */ 566 567/** 568 * \brief Initialise per-channel waitset state 569 * 570 * \param chan Channel state 571 * \param chantype Channel type 572 */ 573void waitset_chanstate_init(struct waitset_chanstate *chan, 574 enum ws_chantype chantype) 575{ 576 assert(chan != NULL); 577 chan->waitset = NULL; 578 chan->chantype = chantype; 579 chan->state = CHAN_UNREGISTERED; 580#ifndef NDEBUG 581 chan->prev = chan->next = NULL; 582#endif 583 chan->persistent = false; 584 chan->token = 0; 585 chan->wait_for = NULL; 586 chan->trigger = NULL; 587} 588 589/** 590 * \brief Destroy previously-initialised per-channel waitset state 591 * \param chan Channel state 592 */ 593void waitset_chanstate_destroy(struct waitset_chanstate *chan) 594{ 595 assert(chan != NULL); 596 if (chan->waitset != NULL) { 597 errval_t err = waitset_chan_deregister(chan); 598 assert(err_is_ok(err)); // can't fail if registered 599 } 600} 601 602/** 603 * \brief Register a closure to be called when a channel is triggered 604 * 605 * In the Future, call the closure on a thread associated with the waitset 606 * when the channel is triggered. Only one closure may be registered per 607 * channel state at any one time. 608 * This function must only be called when disabled. 
609 * 610 * \param ws Waitset 611 * \param chan Waitset's per-channel state 612 * \param closure Event handler 613 */ 614errval_t waitset_chan_register_disabled(struct waitset *ws, 615 struct waitset_chanstate *chan, 616 struct event_closure closure) 617{ 618 if (chan->waitset != NULL) { 619 return LIB_ERR_CHAN_ALREADY_REGISTERED; 620 } 621 622 chan->waitset = ws; 623 chan->token = 0; 624 625 // channel must not already be registered! 626 assert_disabled(chan->next == NULL && chan->prev == NULL); 627 assert_disabled(chan->state == CHAN_UNREGISTERED); 628 629 // this is probably insane! :) 630 // assert_disabled(closure.handler != NULL); 631 632 // store closure 633 chan->closure = closure; 634 635 // enqueue this channel on the waitset's queue of idle channels 636 enqueue(&ws->idle, chan); 637 chan->state = CHAN_IDLE; 638 639 return SYS_ERR_OK; 640} 641 642/** 643 * \brief Register a closure on a channel, and mark the channel as polled 644 * 645 * In the Future, call the closure on a thread associated with the waitset 646 * when the channel is triggered. Only one closure may be registered per 647 * channel state at any one time. Additionally, mark the channel as polled. 648 * This function must only be called when disabled. 649 * 650 * \param ws Waitset 651 * \param chan Waitset's per-channel state 652 * \param closure Event handler 653 * \param disp Current dispatcher pointer 654 */ 655errval_t waitset_chan_register_polled_disabled(struct waitset *ws, 656 struct waitset_chanstate *chan, 657 struct event_closure closure, 658 dispatcher_handle_t handle) 659{ 660 if (chan->waitset != NULL) { 661 return LIB_ERR_CHAN_ALREADY_REGISTERED; 662 } 663 664 chan->waitset = ws; 665 chan->token = 0; 666 667 // channel must not already be registered! 
668 assert_disabled(chan->next == NULL && chan->prev == NULL); 669 assert_disabled(chan->state == CHAN_UNREGISTERED); 670 671 // store closure 672 chan->closure = closure; 673 674 // enqueue this channel on the waitset's queue of polled channels 675 enqueue(&ws->polled, chan); 676 chan->state = CHAN_POLLED; 677 enqueue_polled(&get_dispatcher_generic(handle)->polled_channels, chan); 678 679 return SYS_ERR_OK; 680} 681 682/** 683 * \brief Register a closure to be called when a channel is triggered 684 * 685 * In the Future, call the closure on a thread associated with the waitset 686 * when the channel is triggered. Only one closure may be registered per 687 * channel state at any one time. 688 * This function must only be called when enabled. 689 * 690 * \param ws Waitset 691 * \param chan Waitset's per-channel state 692 * \param closure Event handler 693 */ 694errval_t waitset_chan_register(struct waitset *ws, struct waitset_chanstate *chan, 695 struct event_closure closure) 696{ 697 dispatcher_handle_t handle = disp_disable(); 698 errval_t err = waitset_chan_register_disabled(ws, chan, closure); 699 disp_enable(handle); 700 return err; 701} 702 703/** 704 * \brief Register a closure on a channel, and mark the channel as polled 705 * 706 * In the Future, call the closure on a thread associated with the waitset 707 * when the channel is triggered. Only one closure may be registered per 708 * channel state at any one time. Additionally, mark the channel as polled. 709 * This function must only be called when enabled. It is equivalent to 710 * calling waitset_chan_register() followed by waitset_chan_start_polling(). 
711 * 712 * \param ws Waitset 713 * \param chan Waitset's per-channel state 714 * \param closure Event handler 715 */ 716errval_t waitset_chan_register_polled(struct waitset *ws, 717 struct waitset_chanstate *chan, 718 struct event_closure closure) 719{ 720 dispatcher_handle_t handle = disp_disable(); 721 errval_t err = waitset_chan_register_polled_disabled(ws, chan, closure, handle); 722 disp_enable(handle); 723 return err; 724} 725 726/** 727 * \brief Cancel a previous callback registration 728 * 729 * Remove the registration for a callback on the given channel. 730 * This function must only be called when disabled. 731 * 732 * \param chan Waitset's per-channel state 733 */ 734errval_t waitset_chan_deregister_disabled(struct waitset_chanstate *chan, 735 dispatcher_handle_t handle) 736{ 737 assert_disabled(chan != NULL); 738 struct waitset *ws = chan->waitset; 739 if (ws == NULL) { 740 return LIB_ERR_CHAN_NOT_REGISTERED; 741 } 742 743 // remove this channel from the queue in which it is waiting 744 chan->waitset = NULL; 745 assert_disabled(chan->next != NULL && chan->prev != NULL); 746 747 switch (chan->state) { 748 case CHAN_IDLE: 749 dequeue(&ws->idle, chan); 750 break; 751 752 case CHAN_POLLED: 753 dequeue(&ws->polled, chan); 754 dequeue_polled(&get_dispatcher_generic(handle)->polled_channels, chan); 755 break; 756 757 case CHAN_PENDING: 758 dequeue(&ws->pending, chan); 759 break; 760 761 case CHAN_WAITING: 762 dequeue(&ws->waiting, chan); 763 break; 764 765 default: 766 assert_disabled(!"invalid channel state in deregister"); 767 } 768 chan->state = CHAN_UNREGISTERED; 769 chan->wait_for = NULL; 770 return SYS_ERR_OK; 771} 772 773/** 774 * \brief Cancel a previous callback registration 775 * 776 * Remove the registration for a callback on the given channel. 777 * This function must only be called when enabled. 
778 * 779 * \param chan Waitset's per-channel state 780 */ 781errval_t waitset_chan_deregister(struct waitset_chanstate *chan) 782{ 783 dispatcher_handle_t handle = disp_disable(); 784 errval_t err = waitset_chan_deregister_disabled(chan, handle); 785 disp_enable(handle); 786 return err; 787} 788 789/** 790 * \brief Migrate callback registrations to a new waitset. 791 * 792 * \param chan Old waitset's per-channel state to migrate 793 * \param new_ws New waitset to migrate to 794 */ 795void waitset_chan_migrate(struct waitset_chanstate *chan, 796 struct waitset *new_ws) 797{ 798 struct waitset *ws = chan->waitset; 799 800 // Only when registered 801 if(ws == NULL) { 802 return; 803 } 804 805 switch(chan->state) { 806 case CHAN_IDLE: 807 dequeue(&ws->idle, chan); 808 enqueue(&new_ws->idle, chan); 809 break; 810 811 case CHAN_POLLED: 812 dequeue(&ws->polled, chan); 813 enqueue(&new_ws->polled, chan); 814 break; 815 816 case CHAN_PENDING: 817 dequeue(&ws->pending, chan); 818 enqueue(&new_ws->pending, chan); 819 break; 820 821 case CHAN_WAITING: 822 dequeue(&ws->waiting, chan); 823 enqueue(&new_ws->waiting, chan); 824 break; 825 826 case CHAN_UNREGISTERED: 827 // Do nothing 828 break; 829 } 830 831 // Remember new waitset association 832 chan->waitset = new_ws; 833} 834 835/** 836 * \brief Trigger an event callback on a channel 837 * 838 * Marks the given channel as having a pending event, causing some future call 839 * to get_next_event() to return the registered closure. 840 * This function must only be called when disabled. 
841 * 842 * \param chan Waitset's per-channel state 843 * \param disp Current dispatcher pointer 844 */ 845errval_t waitset_chan_trigger_disabled(struct waitset_chanstate *chan, 846 dispatcher_handle_t handle) 847{ 848 assert_disabled(chan != NULL); 849 struct waitset *ws = chan->waitset; 850 assert_disabled(ws != NULL); 851 assert_disabled(chan->prev != NULL && chan->next != NULL); 852 853 // no-op if already pending 854 if (chan->state == CHAN_PENDING) { 855 return SYS_ERR_OK; 856 } 857 858 // remove from previous queue (either idle or polled) 859 if (chan->state == CHAN_IDLE) { 860 dequeue(&ws->idle, chan); 861 } else { 862 assert_disabled(chan->state == CHAN_POLLED); 863 dequeue(&ws->polled, chan); 864 dequeue_polled(&get_dispatcher_generic(handle)->polled_channels, chan); 865 } 866 867 // else mark channel pending and move to end of pending event queue 868 enqueue(&ws->pending, chan); 869 chan->state = CHAN_PENDING; 870 871 // is there a thread blocked on this waitset? if so, awaken it with the event 872 struct thread *thread = find_recipient(ws, chan, thread_self_disabled()); 873 if (thread) { 874 struct thread *t; 875 ws->waiting_threads = thread; 876 t = thread_unblock_one_disabled(handle, &ws->waiting_threads, chan); 877 assert_disabled(t == NULL); 878 } 879 return SYS_ERR_OK; 880} 881 882/** 883 * \brief Trigger an event callback on a channel 884 * 885 * Marks the given channel as having a pending event, causing some future call 886 * to get_next_event() to return the registered closure. 887 * This function must only be called when enabled. 
888 * 889 * \param chan Waitset's per-channel state 890 * \param disp Current dispatcher pointer 891 */ 892errval_t waitset_chan_trigger(struct waitset_chanstate *chan) 893{ 894 dispatcher_handle_t handle = disp_disable(); 895 errval_t err = waitset_chan_trigger_disabled(chan, handle); 896 disp_enable(handle); 897 return err; 898} 899 900/** 901 * \brief Trigger a specific event callback on an unregistered channel 902 * 903 * This function is equivalent to waitset_chan_register_disabled() immediately 904 * followed by waitset_chan_trigger_disabled(), but avoids unneccessary queue 905 * manipulation. This function must only be called when disabled. 906 * 907 * \param ws Waitset 908 * \param chan Waitset's per-channel state 909 * \param closure Event handler 910 * \param disp Current dispatcher pointer 911 */ 912errval_t waitset_chan_trigger_closure_disabled(struct waitset *ws, 913 struct waitset_chanstate *chan, 914 struct event_closure closure, 915 dispatcher_handle_t handle) 916{ 917 assert_disabled(chan != NULL); 918 assert_disabled(ws != NULL); 919 920 // check if already registered 921 if (chan->waitset != NULL || chan->state != CHAN_UNREGISTERED) { 922 return LIB_ERR_CHAN_ALREADY_REGISTERED; 923 } 924 925 assert_disabled(chan->prev == NULL && chan->next == NULL); 926 927 // set closure 928 chan->closure = closure; 929 930 // mark channel pending and place on end of pending event queue 931 chan->waitset = ws; 932 enqueue(&ws->pending, chan); 933 // if (first) 934 // ws->pending = chan; 935 chan->state = CHAN_PENDING; 936 937 // is there a thread blocked on this waitset? 
if so, awaken it with the event 938 struct thread *thread = find_recipient(ws, chan, thread_self_disabled()); 939 if (thread) { 940 struct thread *t; 941 ws->waiting_threads = thread; 942 t = thread_unblock_one_disabled(handle, &ws->waiting_threads, chan); 943 assert_disabled(t == NULL); 944 } 945 return SYS_ERR_OK; 946} 947 948 949/** 950 * \brief Trigger a specific event callback on an unregistered channel 951 * 952 * This function is equivalent to waitset_chan_register() 953 * followed by waitset_chan_trigger(), but avoids unneccessary queue 954 * manipulation. This function must only be called when enabled. 955 * 956 * \param ws Waitset 957 * \param chan Waitset's per-channel state 958 * \param closure Event handler 959 */ 960errval_t waitset_chan_trigger_closure(struct waitset *ws, 961 struct waitset_chanstate *chan, 962 struct event_closure closure) 963{ 964 dispatcher_handle_t disp = disp_disable(); 965 errval_t err = waitset_chan_trigger_closure_disabled(ws, chan, closure, disp); 966 disp_enable(disp); 967 return err; 968} 969