/**
 * \file
 * \brief Waitset and low-level event handling mechanism
 *
 * A "wait set" is a collection of channels to wait on, much like an
 * FDSET in POSIX. There should be a default, static wait set for each
 * dispatcher. Threads which wait for events specify the wait set they
 * are waiting on.
 */
10
/*
 * Copyright (c) 2009-2012, ETH Zurich.
 * Copyright (c) 2015, Hewlett Packard Enterprise Development LP.
 * All rights reserved.
 *
 * This file is distributed under the terms in the attached LICENSE file.
 * If you do not find this file, copies can be found by writing to:
 * ETH Zurich D-INFK, Universitaetstr. 6, CH-8092 Zurich. Attn: Systems Group.
 */
20
21#include <barrelfish/barrelfish.h>
22#include <barrelfish/waitset.h>
23#include <barrelfish/waitset_chan.h>
24#include <barrelfish/threads.h>
25#include <barrelfish/dispatch.h>
26#include "threads_priv.h"
27#include "waitset_chan_priv.h"
28#include <stdio.h>
29#include <string.h>
30
31#include <flounder/flounder.h>
32
33#ifdef CONFIG_INTERCONNECT_DRIVER_UMP
34#  include <barrelfish/ump_endpoint.h>
35#endif
36
37/// Dequeue a chanstate from a queue
38static void dequeue(struct waitset_chanstate **queue, struct waitset_chanstate *chan)
39{
40    if (chan->next == chan) {
41        assert(chan->prev == chan);
42        assert(*queue == chan);
43        *queue = NULL;
44    } else {
45        chan->prev->next = chan->next;
46        chan->next->prev = chan->prev;
47        if (*queue == chan) {
48            *queue = chan->next;
49        }
50    }
51    chan->prev = chan->next = NULL;
52}
53
54/// Enqueue a chanstate on a queue
55static void enqueue(struct waitset_chanstate **queue, struct waitset_chanstate *chan)
56{
57    if (*queue == NULL) {
58        *queue = chan;
59        chan->next = chan->prev = chan;
60    } else {
61        chan->next = *queue;
62        chan->prev = (*queue)->prev;
63        chan->next->prev = chan;
64        chan->prev->next = chan;
65    }
66}
67
68/// Dequeue a chanstate from polled queue
69static void dequeue_polled(struct waitset_chanstate **queue,
70                            struct waitset_chanstate *chan)
71{
72    if (chan->polled_next == chan) {
73        assert(chan->polled_prev == chan);
74        assert(*queue == chan);
75        *queue = NULL;
76    } else {
77        chan->polled_prev->polled_next = chan->polled_next;
78        chan->polled_next->polled_prev = chan->polled_prev;
79        if (*queue == chan) {
80            *queue = chan->polled_next;
81        }
82    }
83    chan->polled_prev = chan->polled_next = NULL;
84}
85
86/// Enqueue a chanstate on polled queue
87static void enqueue_polled(struct waitset_chanstate **queue,
88                            struct waitset_chanstate *chan)
89{
90    if (*queue == NULL) {
91        *queue = chan;
92        chan->polled_next = chan->polled_prev = chan;
93    } else {
94        chan->polled_next = *queue;
95        chan->polled_prev = (*queue)->polled_prev;
96        chan->polled_next->polled_prev = chan;
97        chan->polled_prev->polled_next = chan;
98    }
99}
100
101/**
102 * \brief Initialise a new waitset
103 */
104void waitset_init(struct waitset *ws)
105{
106    assert(ws != NULL);
107    ws->pending = ws->polled = ws->idle = ws->waiting = NULL;
108    ws->waiting_threads = NULL;
109}
110
111/**
112 * \brief Destroy a previously initialised waitset
113 */
114errval_t waitset_destroy(struct waitset *ws)
115{
116    assert(ws != NULL);
117
118    // FIXME: do we want to support cancelling all the pending events/channels?
119    if (ws->pending || ws->waiting_threads) {
120        return LIB_ERR_WAITSET_IN_USE;
121    }
122
123    // remove idle and polled channels from waitset
124    struct waitset_chanstate *chan, *next;
125    for (chan = ws->idle; chan != NULL; chan = next) {
126        next = chan->next;
127        assert(chan->state == CHAN_IDLE);
128        assert(chan->waitset == ws);
129        chan->waitset = NULL;
130        chan->next = chan->prev = NULL;
131
132        if (next == ws->idle) {
133            break;
134        }
135    }
136    ws->idle = NULL;
137
138    for (chan = ws->polled; chan != NULL; chan = next) {
139        next = chan->next;
140        assert(chan->state == CHAN_POLLED);
141        assert(chan->waitset == ws);
142        chan->waitset = NULL;
143        chan->next = chan->prev = NULL;
144
145        if (next == ws->polled) {
146            break;
147        }
148    }
149    ws->polled = NULL;
150
151    return SYS_ERR_OK;
152}
153
154/// Check if the thread can receive the event
155static bool waitset_can_receive(struct waitset_chanstate *chan,
156                                struct thread *thread)
157{
158    bool res = false;
159
160    if (chan->wait_for) // if a thread is waiting for this specific event
161        res = chan->wait_for == thread;
162    else
163        res = (chan->token & 1 && !thread->token) // incoming token is a request
164            // and a thread is not waiting for a token
165            || (!chan->token && chan != thread->channel) // there's no token
166            // and a thread is not waiting specifically for that event
167            || (chan->token == thread->token && chan == thread->channel);
168            // there is a token and it matches thread's token and event
169    return res;
170}
171
172/// Returns a channel with a pending event on the given waitset matching
173/// our thread
174static struct waitset_chanstate *get_pending_event_disabled(struct waitset *ws,
175          struct waitset_chanstate *waitfor, struct waitset_chanstate *waitfor2)
176{
177    struct thread *me = thread_self_disabled();
178
179    if (waitfor) { // channel that we wait for
180        if ((waitfor->state == CHAN_PENDING || waitfor->state == CHAN_WAITING)
181            && waitset_can_receive(waitfor, me)) {
182            return waitfor;
183        }
184        if (waitfor2 && (waitfor2->state == CHAN_PENDING || waitfor2->state == CHAN_WAITING)
185            && waitset_can_receive(waitfor2, me)) {
186            return waitfor2;
187        }
188        return NULL;
189    }
190    struct waitset_chanstate *chan;
191    // check a waiting queue for matching event
192    for (chan = ws->waiting; chan; ) {
193        if (waitset_can_receive(chan, me)) {
194            assert_disabled(chan->state == CHAN_WAITING);
195            return chan;
196        }
197        chan = chan->next;
198        if (chan == ws->waiting)
199            break;
200    }
201    // check a pending queue for matching event
202    for (chan = ws->pending; chan;) {
203        if (waitset_can_receive(chan, me)) {
204            assert_disabled(chan->state == CHAN_PENDING);
205            return chan;
206        }
207        chan = chan->next;
208        if (chan == ws->pending)
209            break;
210    }
211    return NULL;
212}
213
/// Weak default for the network polling hook: panics, since this default has
/// no polling to perform. A non-weak definition elsewhere overrides it.
void arranet_polling_loop_proxy(void) __attribute__((weak));

void arranet_polling_loop_proxy(void)
{
    USER_PANIC("Network polling not available without Arranet!\n");
}
219
220void poll_ahci(struct waitset_chanstate *) __attribute__((weak));
221void poll_ahci(struct waitset_chanstate *chan)
222{
223    errval_t err = waitset_chan_trigger(chan);
224    assert(err_is_ok(err)); // should not be able to fail
225}
226
227/// Check polled channels
228void poll_channels_disabled(dispatcher_handle_t handle) {
229    struct dispatcher_generic *dp = get_dispatcher_generic(handle);
230    struct waitset_chanstate *chan;
231
232    if (!dp->polled_channels)
233        return;
234    chan = dp->polled_channels;
235    do {
236        switch (chan->chantype) {
237#ifdef CONFIG_INTERCONNECT_DRIVER_UMP
238        case CHANTYPE_UMP_IN: {
239            if (ump_endpoint_poll(chan)) {
240                errval_t err = waitset_chan_trigger_disabled(chan, handle);
241                assert(err_is_ok(err)); // should not fail
242                if (!dp->polled_channels) // restart scan
243                    return;
244                chan = dp->polled_channels;
245                continue;
246            } else
247                chan = chan->polled_next;
248        } break;
249#endif // CONFIG_INTERCONNECT_DRIVER_UMP
250        case CHANTYPE_LWIP_SOCKET:
251            arranet_polling_loop_proxy();
252            break;
253        case CHANTYPE_AHCI:
254            poll_ahci(chan);
255            break;
256        default:
257            assert(!"invalid channel type to poll!");
258        }
259    } while (chan != dp->polled_channels);
260}
261
262/// Re-register a channel (if persistent)
263static void reregister_channel(struct waitset *ws, struct waitset_chanstate *chan,
264                                dispatcher_handle_t handle)
265{
266    assert(chan->waitset == ws);
267    if (chan->state == CHAN_PENDING) {
268        dequeue(&ws->pending, chan);
269    } else {
270        assert(chan->state == CHAN_WAITING);
271        dequeue(&ws->waiting, chan);
272    }
273
274    chan->token = 0;
275    if (chan->chantype == CHANTYPE_UMP_IN
276        || chan->chantype == CHANTYPE_LWIP_SOCKET
277        || chan->chantype == CHANTYPE_AHCI) {
278        enqueue(&ws->polled, chan);
279        enqueue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
280        chan->state = CHAN_POLLED;
281    } else {
282        enqueue(&ws->idle, chan);
283        chan->state = CHAN_IDLE;
284    }
285}
286
287/// Find a thread that is able to receive an event
288static struct thread * find_recipient(struct waitset *ws,
289                        struct waitset_chanstate *channel, struct thread *me)
290{
291    struct thread *t = ws->waiting_threads;
292
293    if (!t)
294        return NULL;
295    do {
296        if (waitset_can_receive(channel, t))
297            return t;
298        t = t->next;
299    } while (t != ws->waiting_threads);
300    return ws->waiting_threads;
301}
302
303/// Wake up other thread if there's more pending events
304static void wake_up_other_thread(dispatcher_handle_t handle, struct waitset *ws)
305{
306    if (ws->pending && ws->waiting_threads) {
307        struct thread *t;
308
309        t = thread_unblock_one_disabled(handle, &ws->waiting_threads, NULL);
310        assert_disabled(t == NULL); // shouldn't see a remote thread
311    }
312}
313
314/**
315 * \brief Get next pending event
316 *
317 * Check if there is a pending event that matches current thread and return it.
318 * Pending events are in a pending queue and in a waiting queue.
319 * A pending event then will be removed from a pending/waiting queue and become
320 * unregistered or, if it's persistent, will be re-registered to an idle queue
321 * or a polled queue (UMP channels) of a waitset.
322 * If there's no pending event, block this thread.
323 * If there's a pending event but it doesn't match our thread, don't remove it
324 * from a pending queue and wake up a matching thread.
325 * If there's no matching thread, add it to a waiting queue.
326 *
327 * \param ws Waitset with sources of events
328 * \param retchannel Holder of returned event
329 * \param retclosure Holder of returned closure
330 * \param waitfor Specific event that we're waiting for (can be NULL)
331 * \param handle Dispatcher's handle
332 * \param debug Debug mode (not used)
333 */
334
335errval_t get_next_event_disabled(struct waitset *ws,
336    struct waitset_chanstate **retchannel, struct event_closure *retclosure,
337    struct waitset_chanstate *waitfor, struct waitset_chanstate *waitfor2,
338    dispatcher_handle_t handle, bool debug)
339{
340    struct waitset_chanstate * chan;
341
342// debug_printf("%s: %p %p %p %p\n", __func__, __builtin_return_address(0), __builtin_return_address(1), __builtin_return_address(2), __builtin_return_address(3));
343    for (;;) {
344        chan = get_pending_event_disabled(ws, waitfor, waitfor2); // get our event
345        if (chan) {
346            *retchannel = chan;
347            *retclosure = chan->closure;
348            chan->wait_for = NULL;
349            chan->token = 0;
350            if (chan->persistent) {
351                reregister_channel(ws, chan, handle);
352            } else {
353                waitset_chan_deregister_disabled(chan, handle);
354            }
355            wake_up_other_thread(handle, ws);
356    // debug_printf("%s.%d: %p\n", __func__, __LINE__, retclosure->handler);
357            return SYS_ERR_OK;
358        }
359        chan = ws->pending; // check a pending queue
360        if (!chan) { // if nothing then wait
361            thread_block_disabled(handle, &ws->waiting_threads);
362            disp_disable();
363        } else { // something but it's not our event
364            if (!ws->waiting_threads) { // no other thread interested in
365                dequeue(&ws->pending, chan);
366                enqueue(&ws->waiting, chan);
367                chan->state = CHAN_WAITING;
368                chan->waitset = ws;
369            } else {
370                // find a matching thread
371                struct thread *t;
372                for (t = ws->waiting_threads; t; ) {
373                    if (waitset_can_receive(chan, t)) { // match found, wake it
374                        ws->waiting_threads = t;
375                        t = thread_unblock_one_disabled(handle,
376                                                    &ws->waiting_threads, chan);
377                        assert_disabled(t == NULL); // shouldn't see a remote thread
378                        break;
379                    }
380                    t = t->next;
381                    if (t == ws->waiting_threads) { // no recipient found
382                        dequeue(&ws->pending, chan);
383                        enqueue(&ws->waiting, chan);
384                        chan->state = CHAN_WAITING;
385                        chan->waitset = ws;
386                        break;
387                    }
388                }
389            }
390        }
391    }
392}
393
394/**
395 * \brief Wait for (block) and return next event on given waitset
396 *
397 * Wait until something happens, either activity on some channel, or a deferred
398 * call, and then return the corresponding closure. This is the core of the
399 * event-handling system.
400 *
401 * \param ws Waitset
402 * \param retclosure Pointer to storage space for returned event closure
403 */
404errval_t get_next_event(struct waitset *ws, struct event_closure *retclosure)
405{
406    dispatcher_handle_t handle = disp_disable();
407    struct waitset_chanstate *channel;
408    errval_t err = get_next_event_disabled(ws, &channel, retclosure, NULL, NULL,
409                                            handle, false);
410    disp_enable(handle);
411    return err;
412}
413
414
415
416/**
417 * \brief Check if there is an event pending on given waitset
418 *
419 * This is essentially a non-blocking variant of get_next_event(). It should be
420 * used with great care, to avoid the creation of busy-waiting loops.
421 *
422 * \param ws Waitset
423 *
424 * \returns LIB_ERR_NO_EVENT if nothing is pending
425 */
426static errval_t check_for_event_disabled(struct waitset *ws, dispatcher_handle_t handle)
427{
428    struct waitset_chanstate *chan;
429
430    poll_channels_disabled(handle);
431    chan = get_pending_event_disabled(ws, NULL, NULL);
432    if (chan != NULL) {
433        return SYS_ERR_OK;
434    }
435    return LIB_ERR_NO_EVENT;
436}
437
438errval_t check_for_event(struct waitset *ws)
439{
440    errval_t err;
441
442    assert(ws != NULL);
443    dispatcher_handle_t handle = disp_disable();
444    err = check_for_event_disabled(ws, handle);
445    disp_enable(handle);
446    return err;
447}
448
449/**
450 * \brief Wait for (block) and dispatch next event on given waitset
451 *
452 * Wait until something happens, either activity on some channel, or deferred
453 * call, and then call the corresponding closure.
454 *
455 * \param ws Waitset
456 */
457
458errval_t event_dispatch(struct waitset *ws)
459{
460    struct event_closure closure;
461    errval_t err = get_next_event(ws, &closure);
462    if (err_is_fail(err)) {
463        return err;
464    }
465
466    assert(closure.handler != NULL);
467    closure.handler(closure.arg);
468    return SYS_ERR_OK;
469}
470
471errval_t event_dispatch_debug(struct waitset *ws)
472{
473    struct event_closure closure;
474    struct waitset_chanstate *channel;
475    dispatcher_handle_t handle = disp_disable();
476    errval_t err = get_next_event_disabled(ws, &channel, &closure, NULL, NULL,
477                                           handle, true);
478    disp_enable(handle);
479    if (err_is_fail(err)) {
480        return err;
481    }
482
483    assert(closure.handler != NULL);
484    closure.handler(closure.arg);
485    return SYS_ERR_OK;
486}
487
488/**
489 * \brief Dispatch events until a specific event is received
490 *
491 * Wait for events and dispatch them. If a specific event comes, don't call
492 * a closure, just return.
493 *
494 * \param ws Waitset
495 * \param waitfor Event, that we are waiting for
496 * \param error_var Error variable that can be changed by closures
497 */
498
499errval_t wait_for_channel(struct waitset *ws, struct waitset_chanstate *waitfor,
500                          errval_t *error_var)
501{
502    assert(waitfor->waitset == ws);
503    thread_set_local_trigger(NULL);
504    for (;;) {
505        struct event_closure closure;
506        struct waitset_chanstate *channel, *trigger;
507
508        trigger = thread_get_local_trigger();
509        dispatcher_handle_t handle = disp_disable();
510        errval_t err = get_next_event_disabled(ws, &channel, &closure, waitfor,
511            trigger ? trigger: waitfor->trigger, handle, false);
512        disp_enable(handle);
513        if (err_is_fail(err)) {
514            assert(0);
515            return err;
516        }
517        if (channel == waitfor)
518            return SYS_ERR_OK;
519        assert(!channel->wait_for);
520        assert(closure.handler != NULL);
521        closure.handler(closure.arg);
522        if (err_is_fail(*error_var))
523            return *error_var;
524    }
525}
526
527/**
528 * \brief check and dispatch next event on given waitset
529 *
530 * Check if there is any pending activity on some channel, or deferred
531 * call, and then call the corresponding closure.
532 *
533 * Do not wait!  In case of no pending events, return err LIB_ERR_NO_EVENT.
534 *
535 * \param ws Waitset
536 */
537errval_t event_dispatch_non_block(struct waitset *ws)
538{
539    struct waitset_chanstate *channel;
540    struct event_closure closure;
541
542    assert(ws != NULL);
543
544    // are there any pending events on the waitset?
545    dispatcher_handle_t handle = disp_disable();
546    errval_t err = check_for_event_disabled(ws, handle);
547    if (err_is_fail(err)) {
548        disp_enable(handle);
549        return err;
550    }
551    err = get_next_event_disabled(ws, &channel, &closure, NULL, NULL, handle,
552                                            false);
553    if (err_is_fail(err))
554        return err;
555    disp_enable(handle);
556    assert(closure.handler != NULL);
557    closure.handler(closure.arg);
558    return SYS_ERR_OK;
559}
560
561
/**
 * \privatesection
 * "Private" functions that are called only by the channel implementations
 */
566
567/**
568 * \brief Initialise per-channel waitset state
569 *
570 * \param chan Channel state
571 * \param chantype Channel type
572 */
573void waitset_chanstate_init(struct waitset_chanstate *chan,
574                            enum ws_chantype chantype)
575{
576    assert(chan != NULL);
577    chan->waitset = NULL;
578    chan->chantype = chantype;
579    chan->state = CHAN_UNREGISTERED;
580#ifndef NDEBUG
581    chan->prev = chan->next = NULL;
582#endif
583    chan->persistent = false;
584    chan->token = 0;
585    chan->wait_for = NULL;
586    chan->trigger = NULL;
587}
588
589/**
590 * \brief Destroy previously-initialised per-channel waitset state
591 * \param chan Channel state
592 */
593void waitset_chanstate_destroy(struct waitset_chanstate *chan)
594{
595    assert(chan != NULL);
596    if (chan->waitset != NULL) {
597        errval_t err = waitset_chan_deregister(chan);
598        assert(err_is_ok(err)); // can't fail if registered
599    }
600}
601
602/**
603 * \brief Register a closure to be called when a channel is triggered
604 *
605 * In the Future, call the closure on a thread associated with the waitset
606 * when the channel is triggered. Only one closure may be registered per
607 * channel state at any one time.
608 * This function must only be called when disabled.
609 *
610 * \param ws Waitset
611 * \param chan Waitset's per-channel state
612 * \param closure Event handler
613 */
614errval_t waitset_chan_register_disabled(struct waitset *ws,
615                                        struct waitset_chanstate *chan,
616                                        struct event_closure closure)
617{
618    if (chan->waitset != NULL) {
619        return LIB_ERR_CHAN_ALREADY_REGISTERED;
620    }
621
622    chan->waitset = ws;
623    chan->token = 0;
624
625    // channel must not already be registered!
626    assert_disabled(chan->next == NULL && chan->prev == NULL);
627    assert_disabled(chan->state == CHAN_UNREGISTERED);
628
629    // this is probably insane! :)
630    // assert_disabled(closure.handler != NULL);
631
632    // store closure
633    chan->closure = closure;
634
635    // enqueue this channel on the waitset's queue of idle channels
636    enqueue(&ws->idle, chan);
637    chan->state = CHAN_IDLE;
638
639    return SYS_ERR_OK;
640}
641
642/**
643 * \brief Register a closure on a channel, and mark the channel as polled
644 *
645 * In the Future, call the closure on a thread associated with the waitset
646 * when the channel is triggered. Only one closure may be registered per
647 * channel state at any one time. Additionally, mark the channel as polled.
648 * This function must only be called when disabled.
649 *
650 * \param ws Waitset
651 * \param chan Waitset's per-channel state
652 * \param closure Event handler
653 * \param disp Current dispatcher pointer
654 */
655errval_t waitset_chan_register_polled_disabled(struct waitset *ws,
656                                               struct waitset_chanstate *chan,
657                                               struct event_closure closure,
658                                               dispatcher_handle_t handle)
659{
660    if (chan->waitset != NULL) {
661        return LIB_ERR_CHAN_ALREADY_REGISTERED;
662    }
663
664    chan->waitset = ws;
665    chan->token = 0;
666
667    // channel must not already be registered!
668    assert_disabled(chan->next == NULL && chan->prev == NULL);
669    assert_disabled(chan->state == CHAN_UNREGISTERED);
670
671    // store closure
672    chan->closure = closure;
673
674    // enqueue this channel on the waitset's queue of polled channels
675    enqueue(&ws->polled, chan);
676    chan->state = CHAN_POLLED;
677    enqueue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
678
679    return SYS_ERR_OK;
680}
681
682/**
683 * \brief Register a closure to be called when a channel is triggered
684 *
685 * In the Future, call the closure on a thread associated with the waitset
686 * when the channel is triggered. Only one closure may be registered per
687 * channel state at any one time.
688 * This function must only be called when enabled.
689 *
690 * \param ws Waitset
691 * \param chan Waitset's per-channel state
692 * \param closure Event handler
693 */
694errval_t waitset_chan_register(struct waitset *ws, struct waitset_chanstate *chan,
695                               struct event_closure closure)
696{
697    dispatcher_handle_t handle = disp_disable();
698    errval_t err = waitset_chan_register_disabled(ws, chan, closure);
699    disp_enable(handle);
700    return err;
701}
702
703/**
704 * \brief Register a closure on a channel, and mark the channel as polled
705 *
706 * In the Future, call the closure on a thread associated with the waitset
707 * when the channel is triggered. Only one closure may be registered per
708 * channel state at any one time. Additionally, mark the channel as polled.
709 * This function must only be called when enabled. It is equivalent to
710 * calling waitset_chan_register() followed by waitset_chan_start_polling().
711 *
712 * \param ws Waitset
713 * \param chan Waitset's per-channel state
714 * \param closure Event handler
715 */
716errval_t waitset_chan_register_polled(struct waitset *ws,
717                                      struct waitset_chanstate *chan,
718                                      struct event_closure closure)
719{
720    dispatcher_handle_t handle = disp_disable();
721    errval_t err = waitset_chan_register_polled_disabled(ws, chan, closure, handle);
722    disp_enable(handle);
723    return err;
724}
725
726/**
727 * \brief Cancel a previous callback registration
728 *
729 * Remove the registration for a callback on the given channel.
730 * This function must only be called when disabled.
731 *
732 * \param chan Waitset's per-channel state
733 */
734errval_t waitset_chan_deregister_disabled(struct waitset_chanstate *chan,
735                                          dispatcher_handle_t handle)
736{
737    assert_disabled(chan != NULL);
738    struct waitset *ws = chan->waitset;
739    if (ws == NULL) {
740        return LIB_ERR_CHAN_NOT_REGISTERED;
741    }
742
743    // remove this channel from the queue in which it is waiting
744    chan->waitset = NULL;
745    assert_disabled(chan->next != NULL && chan->prev != NULL);
746
747    switch (chan->state) {
748    case CHAN_IDLE:
749        dequeue(&ws->idle, chan);
750        break;
751
752    case CHAN_POLLED:
753        dequeue(&ws->polled, chan);
754        dequeue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
755        break;
756
757    case CHAN_PENDING:
758        dequeue(&ws->pending, chan);
759        break;
760
761    case CHAN_WAITING:
762        dequeue(&ws->waiting, chan);
763        break;
764
765    default:
766        assert_disabled(!"invalid channel state in deregister");
767    }
768    chan->state = CHAN_UNREGISTERED;
769    chan->wait_for = NULL;
770    return SYS_ERR_OK;
771}
772
773/**
774 * \brief Cancel a previous callback registration
775 *
776 * Remove the registration for a callback on the given channel.
777 * This function must only be called when enabled.
778 *
779 * \param chan Waitset's per-channel state
780 */
781errval_t waitset_chan_deregister(struct waitset_chanstate *chan)
782{
783    dispatcher_handle_t handle = disp_disable();
784    errval_t err = waitset_chan_deregister_disabled(chan, handle);
785    disp_enable(handle);
786    return err;
787}
788
789/**
790 * \brief Migrate callback registrations to a new waitset.
791 *
792 * \param chan Old waitset's per-channel state to migrate
793 * \param new_ws New waitset to migrate to
794 */
795void waitset_chan_migrate(struct waitset_chanstate *chan,
796                          struct waitset *new_ws)
797{
798    struct waitset *ws = chan->waitset;
799
800    // Only when registered
801    if(ws == NULL) {
802        return;
803    }
804
805    switch(chan->state) {
806    case CHAN_IDLE:
807        dequeue(&ws->idle, chan);
808        enqueue(&new_ws->idle, chan);
809        break;
810
811    case CHAN_POLLED:
812        dequeue(&ws->polled, chan);
813        enqueue(&new_ws->polled, chan);
814        break;
815
816    case CHAN_PENDING:
817        dequeue(&ws->pending, chan);
818        enqueue(&new_ws->pending, chan);
819        break;
820
821    case CHAN_WAITING:
822        dequeue(&ws->waiting, chan);
823        enqueue(&new_ws->waiting, chan);
824        break;
825
826    case CHAN_UNREGISTERED:
827        // Do nothing
828        break;
829    }
830
831    // Remember new waitset association
832    chan->waitset = new_ws;
833}
834
835/**
836 * \brief Trigger an event callback on a channel
837 *
838 * Marks the given channel as having a pending event, causing some future call
839 * to get_next_event() to return the registered closure.
840 * This function must only be called when disabled.
841 *
842 * \param chan Waitset's per-channel state
843 * \param disp Current dispatcher pointer
844 */
845errval_t waitset_chan_trigger_disabled(struct waitset_chanstate *chan,
846                                       dispatcher_handle_t handle)
847{
848    assert_disabled(chan != NULL);
849    struct waitset *ws = chan->waitset;
850    assert_disabled(ws != NULL);
851    assert_disabled(chan->prev != NULL && chan->next != NULL);
852
853    // no-op if already pending
854    if (chan->state == CHAN_PENDING) {
855        return SYS_ERR_OK;
856    }
857
858    // remove from previous queue (either idle or polled)
859    if (chan->state == CHAN_IDLE) {
860        dequeue(&ws->idle, chan);
861    } else {
862        assert_disabled(chan->state == CHAN_POLLED);
863        dequeue(&ws->polled, chan);
864        dequeue_polled(&get_dispatcher_generic(handle)->polled_channels, chan);
865    }
866
867    // else mark channel pending and move to end of pending event queue
868    enqueue(&ws->pending, chan);
869    chan->state = CHAN_PENDING;
870
871    // is there a thread blocked on this waitset? if so, awaken it with the event
872    struct thread *thread = find_recipient(ws, chan, thread_self_disabled());
873    if (thread) {
874        struct thread *t;
875        ws->waiting_threads = thread;
876        t = thread_unblock_one_disabled(handle, &ws->waiting_threads, chan);
877        assert_disabled(t == NULL);
878    }
879    return SYS_ERR_OK;
880}
881
882/**
883 * \brief Trigger an event callback on a channel
884 *
885 * Marks the given channel as having a pending event, causing some future call
886 * to get_next_event() to return the registered closure.
887 * This function must only be called when enabled.
888 *
889 * \param chan Waitset's per-channel state
890 * \param disp Current dispatcher pointer
891 */
892errval_t waitset_chan_trigger(struct waitset_chanstate *chan)
893{
894    dispatcher_handle_t handle = disp_disable();
895    errval_t err = waitset_chan_trigger_disabled(chan, handle);
896    disp_enable(handle);
897    return err;
898}
899
900/**
901 * \brief Trigger a specific event callback on an unregistered channel
902 *
903 * This function is equivalent to waitset_chan_register_disabled() immediately
904 * followed by waitset_chan_trigger_disabled(), but avoids unneccessary queue
905 * manipulation. This function must only be called when disabled.
906 *
907 * \param ws Waitset
908 * \param chan Waitset's per-channel state
909 * \param closure Event handler
910 * \param disp Current dispatcher pointer
911 */
912errval_t waitset_chan_trigger_closure_disabled(struct waitset *ws,
913                                               struct waitset_chanstate *chan,
914                                               struct event_closure closure,
915                                               dispatcher_handle_t handle)
916{
917    assert_disabled(chan != NULL);
918    assert_disabled(ws != NULL);
919
920    // check if already registered
921    if (chan->waitset != NULL || chan->state != CHAN_UNREGISTERED) {
922        return LIB_ERR_CHAN_ALREADY_REGISTERED;
923    }
924
925    assert_disabled(chan->prev == NULL && chan->next == NULL);
926
927    // set closure
928    chan->closure = closure;
929
930    // mark channel pending and place on end of pending event queue
931    chan->waitset = ws;
932    enqueue(&ws->pending, chan);
933    // if (first)
934    //     ws->pending = chan;
935    chan->state = CHAN_PENDING;
936
937    // is there a thread blocked on this waitset? if so, awaken it with the event
938    struct thread *thread = find_recipient(ws, chan, thread_self_disabled());
939    if (thread) {
940        struct thread *t;
941        ws->waiting_threads = thread;
942        t = thread_unblock_one_disabled(handle, &ws->waiting_threads, chan);
943        assert_disabled(t == NULL);
944    }
945    return SYS_ERR_OK;
946}
947
948
949/**
950 * \brief Trigger a specific event callback on an unregistered channel
951 *
952 * This function is equivalent to waitset_chan_register()
953 * followed by waitset_chan_trigger(), but avoids unneccessary queue
954 * manipulation. This function must only be called when enabled.
955 *
956 * \param ws Waitset
957 * \param chan Waitset's per-channel state
958 * \param closure Event handler
959 */
960errval_t waitset_chan_trigger_closure(struct waitset *ws,
961                                      struct waitset_chanstate *chan,
962                                      struct event_closure closure)
963{
964    dispatcher_handle_t disp = disp_disable();
965    errval_t err = waitset_chan_trigger_closure_disabled(ws, chan, closure, disp);
966    disp_enable(disp);
967    return err;
968}
969