1/**
2 * \file
3 * \brief Multi-hop channel support at the monitor
4 */
5
6/*
7 * Copyright (c) 2009, 2010, ETH Zurich.
8 * All rights reserved.
9 *
10 * This file is distributed under the terms in the attached LICENSE file.
11 * If you do not find this file, copies can be found by writing to:
12 * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
13 */
14
15#include "monitor.h"
16#include <collections/hash_table.h>
17#include <bench/bench.h>
18#include <barrelfish/multihop_chan.h>
19
20///////////////////////////////////////////////////////
21
22// ROUTING TABLE
23
24///////////////////////////////////////////////////////
25
26/* The routing table is used to determine where to
27 * forward a connection set-up request.
28 *
29 * The routing table is constructed by the RTS
30 * (Routing table set-up dispatcher), using
31 * information from the System Knowledge Base (SKB).
32 * The RTS will send the routing table to the monitor
33 * that is first booted once it has constructed the
34 * routing table.
35 *
36 * In cases where there is no SKB, the RTS will also
 * never send us a routing table. We use direct routing
 * in this case.
39 */
40
// the routing table (as two dimensional array indexed by source and dest core);
// a NULL table — or a NULL row — means no routing information is available and
// direct routing is used instead (see get_next_hop())
static coreid_t **routing_table;

// the maximum source core ID in the routing table
static coreid_t routing_table_max_coreid;

// the number of outstanding entries to expect in the routing table
// (this is a kludge used while receiving entries from the rts program)
static coreid_t routing_table_nentries;

// hack for synchronisation when requesting routing table from another monitor
static bool saw_routing_table_response;
53
54/*
55 *  Print the routing table of a monitor, if present.
56 */
/*
 * Print the routing table of a monitor, if present (debug builds only).
 *
 * Both the header row and each table row are rendered into a stack buffer;
 * all writes are bounded with snprintf (the original header loop used an
 * unbounded sprintf) and truncation is caught by assertion.
 */
static void multihop_print_routing_table(void)
{
#if MULTIHOP_DEBUG_ENABLED
    if (routing_table == NULL) {
        MULTIHOP_DEBUG("routing table not present on core %u\n", my_core_id);
        return;
    }

    // 5 bytes per entry: " %3u" is at least 4 chars, plus slack / NUL
    size_t buffer_size = (((size_t)routing_table_max_coreid) + 1) * 5;
    char buffer[buffer_size];

    // Print the header
    MULTIHOP_DEBUG("routing table of monitor %u:\n", my_core_id);
    {
        size_t used = 0;
        for (unsigned i = 0; i <= routing_table_max_coreid; i++) {
            int w = snprintf(buffer + used, buffer_size - used, " %3u", i);
            assert(w > 0 && (size_t)w < buffer_size - used); // no truncation
            used += (size_t)w;
        }
        MULTIHOP_DEBUG("      To:%s\n", buffer);
    }

    // Print one line per source core that has routes configured
    for (unsigned src = 0; src <= routing_table_max_coreid; src++) {
        if (routing_table[src] == NULL) {
            continue;
        }

        // convert (my part of) the routing table into a single string
        size_t used = 0;
        for (unsigned i = 0; i <= routing_table_max_coreid; i++) {
            int w = snprintf(buffer + used, buffer_size - used, " %3u",
                             routing_table[src][i]);
            assert(w > 0 && (size_t)w < buffer_size - used); // no truncation
            used += (size_t)w;
        }
        MULTIHOP_DEBUG("From %3u:%s\n", src, buffer);
    }
#endif // MULTIHOP_DEBUG_ENABLED
}
98
99// start to receive a new routing table from the RTS
100static void multihop_routing_table_new(struct monitor_binding *b,
101                                       coreid_t max_coreid, coreid_t nentries)
102{
103    // sanity-check input (FIXME: report errors!)
104    assert(max_coreid >= my_core_id);
105    assert(nentries > 0 && nentries <= (max_coreid + 1));
106
107    // FIXME: we don't yet support changes to the existing routing table
108    assert(routing_table == NULL);
109
110    routing_table_max_coreid = max_coreid;
111    routing_table_nentries = nentries;
112
113    // allocate space for the max core ID
114    routing_table = calloc(((uintptr_t)max_coreid) + 1, sizeof(coreid_t *));
115    assert(routing_table != NULL);
116}
117
118// receive a part of the routing table from RTS (routing table set-up dispatcher)
119static void multihop_routing_table_set(struct monitor_binding *b,
120                                       coreid_t from, const coreid_t *to,
121                                       size_t len)
122{
123    // sanity-check input (FIXME: report errors!)
124    // FIXME: we don't yet support changes to the existing routing table
125    assert(routing_table != NULL);
126    assert(from <= routing_table_max_coreid);
127    assert(routing_table[from] == NULL);
128    assert(len == routing_table_max_coreid + 1);
129    routing_table[from] = memdup(to, len * sizeof(coreid_t));
130
131    if (--routing_table_nentries == 0) {
132        // we have received the complete table!
133        MULTIHOP_DEBUG("monitor on core %d has received the complete"
134                       " routing table (from RTS)\n", my_core_id);
135        multihop_print_routing_table();
136    }
137}
138
139/*
140 * Request (my part of) the routing table from another monitor.
141 * This method blocks until a reply is received.
142 */
143errval_t multihop_request_routing_table(struct intermon_binding *b)
144{
145    errval_t err;
146
147    // request the routing table
148    err = b->tx_vtbl.multihop_routing_table_request(b, NOP_CONT, my_core_id);
149    if (err_is_fail(err)) {
150        return err;
151    }
152
153    // wait until we have received a reply
154    while (!saw_routing_table_response) {
155        messages_wait_and_handle_next();
156    }
157
158    return SYS_ERR_OK;
159}
160
161// handle request for a portion of the routing table from another monitor
162static void multihop_handle_routing_table_request(struct intermon_binding *b,
163                                                  coreid_t core_id)
164{
165    errval_t err;
166
167    if (routing_table != NULL && core_id <= routing_table_max_coreid
168        && routing_table[core_id] != NULL) {
169        // if we have a routing table, send routing table to other core
170        err = b->tx_vtbl.multihop_routing_table_response(b, NOP_CONT,
171                SYS_ERR_OK, core_id, routing_table_max_coreid,
172                routing_table[core_id], routing_table_max_coreid + 1);
173    } else {
174        // if we don't have a routing table, send an error reply
175        err = b->tx_vtbl.multihop_routing_table_response(b, NOP_CONT,
176                MON_ERR_INCOMPLETE_ROUTE, core_id, routing_table_max_coreid,
177                NULL, 0);
178    }
179
180    assert(err_is_ok(err)); // FIXME
181}
182
183// handle the response to a routing table request from the other monitor
184static void multihop_handle_routing_table_response(struct intermon_binding *b,
185                                                   errval_t err,
186                                                   coreid_t source_coreid,
187                                                   coreid_t max_coreid,
188                                                   const coreid_t *to, size_t len)
189{
190    assert(routing_table == NULL);
191    assert(source_coreid == my_core_id);
192
193    if (err_is_ok(err)) {
194        assert(to != NULL);
195        routing_table = calloc(((uintptr_t)max_coreid) + 1, sizeof(coreid_t *));
196        assert(routing_table != NULL);
197        routing_table_max_coreid = max_coreid;
198
199        assert(len == max_coreid + 1);
200        assert(source_coreid <= max_coreid);
201        routing_table[source_coreid] = memdup(to, len * sizeof(coreid_t));
202    } else {
203        if (err_no(err) != MON_ERR_INCOMPLETE_ROUTE) {
204            DEBUG_ERR(err, "unexpected error retrieving routing table");
205        }
206    }
207
208    saw_routing_table_response = true;
209}
210
// grow the routing table to a set of destination cores, via a given forwarder:
// afterwards, traffic for any of the destinations is routed the same way as
// traffic to the forwarder
static void multihop_routing_table_grow(struct intermon_binding *b,
                                        coreid_t forwarder,
                                        const coreid_t *destinations,
                                        size_t ndests)
{
    assert(ndests > 0);

    // check the max core ID in the destinations (at least our own core ID)
    coreid_t max_coreid = my_core_id;
    for (unsigned i = 0; i < ndests; i++) {
        if (destinations[i] > max_coreid) {
            max_coreid = destinations[i];
        }
    }

    // ensure we have an allocated routing table; if necessary, grow it
    if (routing_table == NULL) {
        // no table yet: allocate one with all rows NULL
        routing_table = calloc(((uintptr_t)max_coreid) + 1, sizeof(coreid_t *));
        assert(routing_table != NULL);
        routing_table_max_coreid = max_coreid;
    } else if (max_coreid > routing_table_max_coreid) {
        // first grow every existing row to the new width...
        for (unsigned i = 0; i <= routing_table_max_coreid; i++) {
            if (routing_table[i] != NULL) {
                routing_table[i] = realloc(routing_table[i],
                                           (((uintptr_t)max_coreid) + 1)
                                           * sizeof(coreid_t));
                assert(routing_table[i] != NULL);
                // XXX: the default for the unconfigured part of the routing
                // table is direct routing
                for (unsigned j = routing_table_max_coreid + 1; j <= max_coreid; j++) {
                    routing_table[i][j] = j;
                }
            }
        }

        // ...then grow the row-pointer array itself and NULL out the new slots
        routing_table = realloc(routing_table, (((uintptr_t)max_coreid) + 1)
                                               * sizeof(coreid_t *));
        assert(routing_table != NULL);
        memset(&routing_table[routing_table_max_coreid + 1], 0,
               (max_coreid - routing_table_max_coreid) * sizeof(coreid_t *));
        routing_table_max_coreid = max_coreid;
    }

    // ensure I have my own routes (the default is direct routing)
    if (routing_table[my_core_id] == NULL) {
        routing_table[my_core_id] = malloc((((uintptr_t)routing_table_max_coreid) + 1)
                                           * sizeof(coreid_t));
        assert(routing_table[my_core_id] != NULL);
        for (unsigned i = 0; i <= routing_table_max_coreid; i++) {
            routing_table[my_core_id][i] = i;
        }
    }

    // update routes to destinations for all origins in my routing table and myself
    // NOTE(review): this indexes row[forwarder]; it assumes
    // forwarder <= routing_table_max_coreid — TODO confirm at callers
    for (unsigned src = 0; src <= routing_table_max_coreid; src++) {
        if (routing_table[src] != NULL) {
            for (unsigned i = 0; i < ndests; i++) {
                routing_table[src][destinations[i]] = routing_table[src][forwarder];
            }
        }
    }
}
274
275// return the next hop (based on the routing table)
276static inline coreid_t get_next_hop(coreid_t dest)
277{
278
279    assert(dest != my_core_id);
280
281    if (routing_table != NULL
282        && my_core_id <= routing_table_max_coreid
283        && dest <= routing_table_max_coreid
284        && routing_table[my_core_id] != NULL) {
285        // if we have a routing table, look up next hop
286        return routing_table[my_core_id][dest];
287    } else {
288        // if we don't have a routing table, route directly
289        return dest;
290    }
291}
292
293///////////////////////////////////////////////////////
294
295// FORWARDING (HASH) TABLE
296
297///////////////////////////////////////////////////////
298
299/**
300 * Messages are forwarded based on the forwarding table.
301 * We use a hash table to map virtual circuit identifiers (VCIs)
302 * to a pointer to the channel state.
303 */
304static collections_hash_table *forwarding_table;
305
306// is forwarding table initialized?
307static bool is_forwarding_table_initialized = false;
308
309struct monitor_multihop_chan_state;
310
311// initialize the forwarding table
312static inline void init_forwarding_table(void)
313{
314
315    if (!is_forwarding_table_initialized) {
316        is_forwarding_table_initialized = true;
317        collections_hash_create_with_buckets(&forwarding_table,
318                MULTIHOP_FORWARDING_TABLE_BUCKETS, free);
319
320        /**
321         * We initialize the random function with the current time stamp
322         * in order to make assigned VCIs unpredictable. This makes it hard
323         * for an attacker that sends message with manipulated VCIs to actually
324         * find a valid VCI.
325         */
326        srand(bench_tsc());
327    }
328}
329
330// insert entry in forwarding table and return VCI
331static inline multihop_vci_t forwarding_table_insert(
332        struct monitor_multihop_chan_state *chan_state)
333{
334
335    assert(chan_state != NULL);
336    multihop_vci_t vci;
337
338    // we call initialize before we insert an entry
339    init_forwarding_table();
340
341    do {
342        // we assign VCIs randomly, but need to
343        // make sure, that it is not yet taken
344        vci = (multihop_vci_t) rand();
345    } while (collections_hash_find(forwarding_table, vci) != NULL);
346
347    // insert into forwarding table
348    collections_hash_insert(forwarding_table, vci, chan_state);
349    return vci;
350}
351
352// delete entry from forwarding table
353static inline void forwarding_table_delete(multihop_vci_t vci)
354{
355    assert(is_forwarding_table_initialized);
356    collections_hash_delete(forwarding_table, vci);
357}
358
359// get entry from the forwarding table
360static inline struct monitor_multihop_chan_state* forwarding_table_lookup(
361        multihop_vci_t vci)
362{
363
364    assert(is_forwarding_table_initialized);
365    struct monitor_multihop_chan_state *chan_state = collections_hash_find(forwarding_table,
366            vci);
367
368    if (chan_state == NULL) {
369        USER_PANIC("invalid virtual circuit identifier in multi-hop channel");
370    }
371    return chan_state;
372}
373
374///////////////////////////////////////////////////////
375
376// STRUCT FOR THE PER - CHANNEL STATE
377
378///////////////////////////////////////////////////////
379
// per-channel state held at this monitor: the two directions of the channel
// (dir1 towards the service side, dir2 towards the client side — see the
// set-up handlers below) plus the connection-establishment state
struct monitor_multihop_chan_state {
    struct direction {
        enum {
            MULTIHOP_ENDPOINT, // if this is an endpoint, the communication partner is on the same core
            MULTIHOP_NODE
        // communication partner is a monitor on another core
        } type;

        multihop_vci_t vci; // the virtual circuit identifier to use on outgoing messages

        // bindings to the "next hop"; which member is valid is given by 'type'
        union {
            struct monitor_binding *monitor_binding; // used at endpoints to identify the dispatcher
            struct intermon_binding *intermon_binding; // monitor binding of next hop
        } binding;
    } dir1, dir2;

    // temporary storage for a virtual circuit identifier
    multihop_vci_t tmp_vci;

    // connection state
    enum {
        MONTIOR_MULTIHOP_DISCONNECTED, // Disconnected (NB: "MONTIOR" typo kept — renaming would break users)
        MONITOR_MULTIHOP_BIND_WAIT, // Waiting for a bind reply message
        MONITOR_MULTIHOP_CONNECTED,
    // Connection established
    } connstate;
};
408
409// get the direction
410static inline struct direction* multihop_get_direction(
411        struct monitor_multihop_chan_state *chan_state, uint8_t direction)
412{
413    if (direction == 1) {
414        return &chan_state->dir1;
415    } else if (direction == 2) {
416        return &chan_state->dir2;
417    } else {
418        USER_PANIC("unknown direction in multihop channel: %d", direction);
419        return NULL;
420    }
421}
422
423// get the opposite direction
424static inline uint8_t multihop_get_opposite_direction(
425        struct monitor_multihop_chan_state *chan_state, uint8_t direction,
426        struct direction **dir)
427{
428    if (direction == 2) {
429        *dir = &chan_state->dir1;
430        return 1;
431    } else if (direction == 1) {
432        *dir = &chan_state->dir2;
433        return 2;
434    } else {
435        USER_PANIC("unknown direction in multihop channel: %d", direction);
436        return 0;
437    }
438}
439
440////////////////////////////////////////////////////////////
441
442// MULTI-HOP CHANNEL SETUP
443
444////////////////////////////////////////////////////////////
445
// forward declarations for the (stack-ripped) channel set-up path below
static void
multihop_monitor_bind_request_busy_cont(struct intermon_binding *b,
        struct intermon_msg_queue_elem *e);

static void
multihop_monitor_bind_request_cont(
        struct monitor_multihop_chan_state *chan_state, iref_t iref,
        coreid_t core);

static void
multihop_bind_service_request(uintptr_t service_id,
        struct monitor_multihop_chan_state *chan_state);

static void
multihop_intermon_bind_reply_cont(struct intermon_binding *intermon_binding,
        multihop_vci_t receiver_vci, multihop_vci_t sender_vci, errval_t msgerr);

static void
multihop_monitor_bind_reply_client(struct monitor_binding *domain_binding,
        multihop_vci_t receiver_vci, multihop_vci_t sender_vci, errval_t msgerr);

static inline void
multihop_monitor_request_error(struct monitor_multihop_chan_state *chan_state,
        errval_t msgerr);
470
471/**
472 * \brief This method handles a bind request message from a local dispatcher
473 *
474 * \param b The monitor binding
475 * \param iref The iref of the service
476 * \param vci The vci of the local dispatcher (this vci should be used for messages sent to the dispatcher)
477 */
/**
 * \brief This method handles a bind request message from a local dispatcher
 *
 * Allocates channel state (dir2 = the requesting client endpoint), assigns a
 * VCI, and forwards the request to the next-hop monitor as chosen by the
 * routing table. On any error, the client is notified and the forwarding
 * table entry is removed again.
 *
 * \param b The monitor binding
 * \param iref The iref of the service
 * \param vci The vci of the local dispatcher (this vci should be used for messages sent to the dispatcher)
 */
static void multihop_monitor_bind_request_handler(struct monitor_binding *b,
        iref_t iref, multihop_vci_t vci)
{

    errval_t err;
    coreid_t core_id;
    struct monitor_multihop_chan_state *chan_state = NULL;

    MULTIHOP_DEBUG(
            "monitor on core %d received a bind multi-hop message from a local dispatcher, iref: %d\n", my_core_id, (int) iref);

    // Look up core_id from the iref
    err = iref_get_core_id(iref, &core_id);
    if (err_is_fail(err)) {
        debug_err(__FILE__, __func__, __LINE__, err, "iref_get_core_id failed");
        multihop_monitor_bind_reply_client(b, vci, 0, err); // send back error message
        return;
    }

    // allocate local state for the connection; dir2 points back at the client
    chan_state = malloc(sizeof(struct monitor_multihop_chan_state));
    assert(chan_state != NULL);
    chan_state->connstate = MONITOR_MULTIHOP_BIND_WAIT;
    chan_state->dir2.type = MULTIHOP_ENDPOINT;
    chan_state->dir2.vci = vci;
    chan_state->dir2.binding.monitor_binding = b;

    // get a virtual circuit identifier (VCI) for this channel
    // and insert mapping into forwarding table
    chan_state->tmp_vci = forwarding_table_insert(chan_state);

    // make sure that service is not on same core as the client
    if (core_id == my_core_id) {
        multihop_monitor_request_error(chan_state,
                LIB_ERR_BIND_MULTIHOP_SAME_CORE);
        // deleting the entry also frees chan_state (table destructor is free())
        forwarding_table_delete(chan_state->tmp_vci);
        return;
    }

    // determine where to forward the message
    coreid_t next_hop = get_next_hop(core_id);

    // Get connection to the monitor to forward request to
    err = intermon_binding_get(next_hop,
            &chan_state->dir1.binding.intermon_binding);
    if (err_is_fail(err)) {
        debug_err(__FILE__, __func__, __LINE__, err,
                "intermon_binding_get failed");
        multihop_monitor_request_error(chan_state, err);
        forwarding_table_delete(chan_state->tmp_vci);
        return;
    }

    // call continuation function
    multihop_monitor_bind_request_cont(chan_state, iref, core_id);
}
534
// state kept while a bind request waits for a busy intermonitor channel;
// the busy continuation casts the queue element back to this struct,
// so 'elem' must stay the first member
struct multihop_monitor_bind_request_state {
    struct intermon_msg_queue_elem elem;            // send-queue element
    struct monitor_multihop_chan_state *chan_state; // channel being set up
    iref_t iref;                                    // iref of the service
    coreid_t core;                                  // core ID of the service
};
541
542// called when channel is no longer busy
543static void multihop_monitor_bind_request_busy_cont(struct intermon_binding *b,
544        struct intermon_msg_queue_elem *e)
545{
546    struct multihop_monitor_bind_request_state *st =
547            (struct multihop_monitor_bind_request_state *) e;
548    multihop_monitor_bind_request_cont(st->chan_state, st->iref, st->core);
549    free(e);
550}
551
552/**
553 * \brief Sends a bind request to the "next hop"
554 *
555 * \param chan_state pointer to the channel state
556 * \param iref the iref of the service
557 * \param core core ID of the service
558 */
559static void multihop_monitor_bind_request_cont(
560        struct monitor_multihop_chan_state *chan_state, iref_t iref,
561        coreid_t core)
562{
563
564    errval_t err;
565    struct intermon_binding *mon_binding =
566            chan_state->dir1.binding.intermon_binding;
567
568    // send request to next hop
569    err = mon_binding->tx_vtbl.bind_multihop_intermon_request(mon_binding,
570            NOP_CONT, iref, chan_state->tmp_vci, core);
571
572    if (err_is_fail(err)) {
573        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
574            struct multihop_monitor_bind_request_state *me = malloc(
575                    sizeof(struct multihop_monitor_bind_request_state));
576            struct intermon_state *ist = mon_binding->st;
577            me->chan_state = chan_state;
578            me->iref = iref;
579            me->elem.cont = multihop_monitor_bind_request_busy_cont;
580
581            err = intermon_enqueue_send(mon_binding, &ist->queue,
582                    get_default_waitset(), &me->elem.queue);
583            assert(err_is_ok(err));
584            return;
585        }
586        // return error code to client
587        multihop_monitor_request_error(chan_state, err);
588        forwarding_table_delete(chan_state->tmp_vci);
589    }
590}
591
592/**
593 * \brief Handles a bind request from another monitor
594 *
595 * \param b intermonitor binding
596 * \param iref the iref of the service
597 * \param vci The vci to use
598 * \param the core ID of the service
599 */
600static void multihop_intermon_bind_request_handler(struct intermon_binding *b,
601        iref_t iref, multihop_vci_t vci, coreid_t core)
602{
603    errval_t err;
604
605    MULTIHOP_DEBUG(
606            "monitor on core %d received multi-hop bind request with vci %d\n", my_core_id, (int) vci);
607
608    // allocate channel state & fill in needed information
609    struct monitor_multihop_chan_state *chan_state = malloc(
610            sizeof(struct monitor_multihop_chan_state));
611    chan_state->connstate = MONITOR_MULTIHOP_BIND_WAIT;
612    chan_state->dir2.vci = vci;
613    chan_state->dir2.binding.intermon_binding = b;
614    chan_state->dir2.type = MULTIHOP_NODE;
615    chan_state->tmp_vci = forwarding_table_insert(chan_state);
616
617    if (core == my_core_id) {
618        // service is on same core than this monitor, therefore we forward to local dispatcher
619
620        // get the service's connection
621        err = iref_get_binding(iref, &chan_state->dir1.binding.monitor_binding);
622        if (err_is_fail(err)) {
623            USER_PANIC_ERR(err,
624                    "Multihop set-up: could not get domain-binding for iref");
625        }
626
627        // get the service id
628        uintptr_t service_id = 0;
629        err = iref_get_service_id(iref, &service_id);
630        if (err_is_fail(err)) {
631            USER_PANIC_ERR(err,
632                    "Multihop set-up: could not get service id for iref");
633        }
634
635        // forward request to service
636        multihop_bind_service_request(service_id, chan_state);
637
638    } else {
639        // we have to forward the request to another monitor
640        // we get the core id of the next hop from the routing table
641        coreid_t next_hop = get_next_hop(core);
642
643        // get connection to the "next-hop" monitor
644        err = intermon_binding_get(next_hop,
645                &chan_state->dir1.binding.intermon_binding);
646        if (err_is_fail(err)) {
647            debug_err(__FILE__, __func__, __LINE__, err,
648                    "intermon_binding_get failed");
649            multihop_monitor_request_error(chan_state, err);
650            forwarding_table_delete(chan_state->tmp_vci);
651            return;
652        }
653
654        // send request to next hop
655        multihop_monitor_bind_request_cont(chan_state, iref, core);
656    }
657}
658
659// used if channel is busy while sending request to service
// state kept while a bind-service request waits for a busy monitor channel;
// the busy continuation casts the queue element back to this struct,
// so 'elem' must stay the first member
struct multihop_bind_service_request_state {
    struct monitor_msg_queue_elem elem;             // send-queue element
    uintptr_t service_id;                           // id of the target service
    struct monitor_multihop_chan_state *chan_state; // channel being set up
};
665
666// used when channel is no longer busy
667static void multihop_bind_service_busy_cont(struct monitor_binding *b,
668        struct monitor_msg_queue_elem *e)
669{
670    struct multihop_bind_service_request_state *st =
671            (struct multihop_bind_service_request_state *) e;
672    multihop_bind_service_request(st->service_id, st->chan_state);
673    free(e);
674}
675
676/**
677 * \brief Forward bind request to service's dispatcher
678 *
679 * \param domain_binding binding to service
680 * \param service_id Id of the service
681 * \param vci my vci
682 */
683static void multihop_bind_service_request(uintptr_t service_id,
684        struct monitor_multihop_chan_state *chan_state)
685{
686    errval_t err;
687    MULTIHOP_DEBUG(
688            "monitor on core %d is forwarding bind request to local dispatcher...\n", my_core_id);
689    err =
690            chan_state->dir1.binding.monitor_binding->tx_vtbl.multihop_bind_service_request(
691                    chan_state->dir1.binding.monitor_binding, NOP_CONT,
692                    service_id, chan_state->tmp_vci);
693    if (err_is_fail(err)) {
694        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
695            struct monitor_binding *monitor_binding =
696                    chan_state->dir1.binding.monitor_binding;
697            struct multihop_bind_service_request_state *me = malloc(
698                    sizeof(struct multihop_bind_service_request_state));
699            struct monitor_state *ist = monitor_binding->st;
700            me->service_id = service_id;
701            me->chan_state = chan_state;
702            me->elem.cont = &multihop_bind_service_busy_cont;
703
704            err = monitor_enqueue_send(monitor_binding, &ist->queue,
705                    get_default_waitset(), &me->elem.queue);
706            assert(err_is_ok(err));
707            return;
708        }
709
710        // return error code to client
711        multihop_monitor_request_error(chan_state, err);
712        forwarding_table_delete(chan_state->tmp_vci);
713    }
714}
715
716/**
717 * \brief Handle a reply message coming from service's dispatcher
718 *
719 * \param mon_binding Binding to service's dispatcher
720 * \param my_vci my virtual circuit identifier
721 * \param sender_vci virtual circuit identifier of the sender
722 * \param msgerr error code
723 */
/**
 * \brief Handle a reply message coming from service's dispatcher
 *
 * \param mon_binding Binding to service's dispatcher
 * \param receiver_vci our virtual circuit identifier (assigned when the
 *        request passed through this monitor)
 * \param sender_vci virtual circuit identifier of the sender (the service)
 * \param msgerr error code (bind accepted or refused)
 */
static void multihop_monitor_service_bind_reply_handler(
        struct monitor_binding *mon_binding, multihop_vci_t receiver_vci,
        multihop_vci_t sender_vci, errval_t msgerr)
{
    MULTIHOP_DEBUG(
            "monitor on core %d received bind reply message. Status: %s. my_vci: %d\n", my_core_id, err_is_ok(msgerr) ? "success" : "failed", (int) receiver_vci);

    struct monitor_multihop_chan_state *chan_state = forwarding_table_lookup(
            receiver_vci);

    assert(chan_state->connstate == MONITOR_MULTIHOP_BIND_WAIT);

    // capture the outgoing direction BEFORE the possible delete below,
    // which frees chan_state (the table's destructor is free())
    // NOTE(review): dir2 is assumed to be MULTIHOP_NODE here (same-core
    // binds are rejected in the request handler) — TODO confirm
    multihop_vci_t next_receiver_vci = chan_state->dir2.vci;
    struct intermon_binding *next_hop_binding =
            chan_state->dir2.binding.intermon_binding;
    if (err_is_ok(msgerr)) { /* bind succeeded */
        chan_state->dir1.type = MULTIHOP_ENDPOINT;
        chan_state->dir1.vci = sender_vci;
        chan_state->dir1.binding.monitor_binding = mon_binding;
        chan_state->connstate = MONITOR_MULTIHOP_CONNECTED;
    } else {
        // bind refused: delete entry from forwarding table
        forwarding_table_delete(receiver_vci);
    }

    // (stack-ripped) forward reply to next monitor
    multihop_intermon_bind_reply_cont(next_hop_binding, next_receiver_vci,
            receiver_vci, msgerr);
}
753
// state kept while a bind reply waits for a busy intermonitor channel;
// 'elem' must stay first (the busy continuation casts the element back)
struct multihop_intermon_bind_reply_state {
    struct intermon_msg_queue_elem elem;                        // send-queue element
    struct intermon_bind_multihop_intermon_reply__tx_args args; // saved message arguments
};
758
759// called when channel is no longer busy
760static void multihop_intermon_bind_reply_busy_cont(struct intermon_binding *b,
761        struct intermon_msg_queue_elem *e)
762{
763    struct multihop_intermon_bind_reply_state *st =
764            (struct multihop_intermon_bind_reply_state *) e;
765    multihop_intermon_bind_reply_cont(b, st->args.receiver_vci,
766            st->args.sender_vci, st->args.err);
767    free(e);
768}
769
770/**
771 * \brief Forward a bind reply message to the next monitor
772 *
773 * \param intermon_binding binding to the next monitor
774 */
775static void multihop_intermon_bind_reply_cont(
776        struct intermon_binding *intermon_binding, multihop_vci_t receiver_vci,
777        multihop_vci_t sender_vci, errval_t msgerr)
778{
779    errval_t err;
780    MULTIHOP_DEBUG("monitor on core %d is forwarding reply\n", my_core_id);
781    err = intermon_binding->tx_vtbl.bind_multihop_intermon_reply(
782            intermon_binding, NOP_CONT, receiver_vci, sender_vci, msgerr);
783    if (err_is_fail(err)) {
784        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
785            struct multihop_intermon_bind_reply_state *me = malloc(
786                    sizeof(struct multihop_intermon_bind_reply_state));
787            struct intermon_state *ist = intermon_binding->st;
788            me->args.sender_vci = sender_vci;
789            me->args.receiver_vci = receiver_vci;
790            me->args.err = msgerr;
791            me->elem.cont = multihop_intermon_bind_reply_busy_cont;
792
793            err = intermon_enqueue_send(intermon_binding, &ist->queue,
794                    get_default_waitset(), &me->elem.queue);
795            assert(err_is_ok(err));
796            return;
797        }USER_PANIC_ERR(err,
798                "Could not forward bind reply message in multi-hop channel");
799    }
800}
801
802/**
803 * \brief Handles a reply message from another monitor
804 *
805 * \param binding Binding to the other monitor
806 * \param my_vci My virtual circuit identifier
807 * \param sender_vci virtual circuit identifier of the sender
808 * \param msgerr error code
809 */
810static void multihop_intermon_bind_reply_handler(
811        struct intermon_binding *binding, multihop_vci_t receiver_vci,
812        multihop_vci_t sender_vci, errval_t msgerr)
813{
814    MULTIHOP_DEBUG(
815            "monitor on core %d has received a bind reply\n", my_core_id);
816    struct monitor_multihop_chan_state *chan_state = forwarding_table_lookup(
817            receiver_vci);
818
819    assert(chan_state->connstate == MONITOR_MULTIHOP_BIND_WAIT);
820
821    if (err_is_ok(msgerr)) {
822        chan_state->dir1.type = MULTIHOP_NODE;
823        chan_state->dir1.binding.intermon_binding = binding;
824        chan_state->dir1.vci = sender_vci;
825        chan_state->connstate = MONITOR_MULTIHOP_CONNECTED;
826
827        if (chan_state->dir2.type == MULTIHOP_NODE) {
828            multihop_intermon_bind_reply_cont(
829                    chan_state->dir2.binding.intermon_binding,
830                    chan_state->dir2.vci, receiver_vci, msgerr);
831        } else {
832            multihop_monitor_bind_reply_client(
833                    chan_state->dir2.binding.monitor_binding,
834                    chan_state->dir2.vci, receiver_vci, msgerr);
835        }
836    } else {
837
838        // connection was refused
839
840        if (chan_state->dir2.type == MULTIHOP_NODE) {
841            multihop_intermon_bind_reply_cont(
842                    chan_state->dir2.binding.intermon_binding,
843                    chan_state->dir2.vci, 0, msgerr);
844        } else {
845            multihop_monitor_bind_reply_client(
846                    chan_state->dir2.binding.monitor_binding,
847                    chan_state->dir2.vci, 0, msgerr);
848        }
849
850        // delete entry from forwarding table
851        forwarding_table_delete(receiver_vci);
852    }
853}
854
// Saved arguments of a bind reply that could not be sent to a local
// dispatcher because the binding was busy; queued for retransmission.
// 'elem' is the first member so the queue element pointer handed to the
// continuation can be cast back to this struct.
struct multihop_monitor_bind_reply_state {
    struct monitor_msg_queue_elem elem;
    struct monitor_multihop_bind_client_reply__tx_args args;
};
859
860// continue function to forward a message to a dispatcher
861static void multihop_monitor_bind_reply_busy_cont(struct monitor_binding *b,
862        struct monitor_msg_queue_elem *e)
863{
864    struct multihop_monitor_bind_reply_state *st =
865            (struct multihop_monitor_bind_reply_state *) e;
866    multihop_monitor_bind_reply_client(b, st->args.receiver_vci,
867            st->args.sender_vci, st->args.err);
868    free(e);
869}
870
871/**
872 * \brief Send a reply to the dispatcher who originally sent the request
873 *
874 * \param domain_binding The monitor_binding to use
875 * \param receiver_vci The VCI of the receiver
876 * \param sender_vci The VCI of the sender
877 * \param msgerr The error code
878 */
879static void multihop_monitor_bind_reply_client(
880        struct monitor_binding *domain_binding, multihop_vci_t receiver_vci,
881        multihop_vci_t sender_vci, errval_t msgerr)
882{
883    errval_t err;
884    MULTIHOP_DEBUG(
885            "monitor on core %d is sending reply to dispatcher\n", my_core_id);
886    err = domain_binding->tx_vtbl.multihop_bind_client_reply(domain_binding,
887            NOP_CONT, receiver_vci, sender_vci, msgerr);
888    if (err_is_fail(err)) {
889        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
890            struct multihop_monitor_bind_reply_state *me = malloc(
891                    sizeof(struct multihop_monitor_bind_reply_state));
892            assert(me != NULL);
893            struct monitor_state *ist = domain_binding->st;
894            me->args.receiver_vci = receiver_vci;
895            me->args.sender_vci = sender_vci;
896            me->args.err = msgerr;
897            me->elem.cont = multihop_monitor_bind_reply_busy_cont;
898
899            err = monitor_enqueue_send(domain_binding, &ist->queue,
900                    get_default_waitset(), &me->elem.queue);
901            assert(err_is_ok(err));
902            return;
903
904        }
905
906        USER_PANIC_ERR(
907                err,
908                "Could not forward bind reply to client's dispatcher in multi-hop channel");
909    }
910}
911
912/**
913 * \brief send an error code back
914 *
915 */
916static inline void multihop_monitor_request_error(
917        struct monitor_multihop_chan_state *chan_state, errval_t msgerr)
918{
919    assert(chan_state != NULL);
920    if (chan_state->dir2.type == MULTIHOP_NODE) {
921        multihop_intermon_bind_reply_cont(
922                chan_state->dir2.binding.intermon_binding, chan_state->dir2.vci,
923                0, msgerr);
924    } else {
925        multihop_monitor_bind_reply_client(
926                chan_state->dir2.binding.monitor_binding, chan_state->dir2.vci,
927                0, msgerr);
928    }
929}
930
931///////////////////////////////////////////////////////
932
933// MESSAGE FORWARDING
934
935///////////////////////////////////////////////////////
936
// Forward declarations for message forwarding (to a local dispatcher and to
// another monitor) and the queue continuations used when a binding is busy.
// 'first_try' selects where a busy message is queued: back of the queue on
// the first attempt, front on a retry (preserving message order).
static inline void multihop_message_monitor_forward(struct monitor_binding *b,
        multihop_vci_t vci, uint8_t direction, uint8_t flags, uint32_t ack,
        const uint8_t *payload, size_t size, bool first_try);

static void multihop_message_forward_continue(struct monitor_binding *b,
        struct monitor_msg_queue_elem *e);

static inline void multihop_message_intermon_forward(struct intermon_binding *b,
        multihop_vci_t vci, uint8_t direction, uint8_t flags, uint32_t ack,
        const uint8_t *payload, size_t size, bool first_try);

static void multihop_message_intermon_forward_cont(struct intermon_binding *b,
        struct intermon_msg_queue_elem *e);
950
// monitor message forwarding state
// (saved copy of a message that could not be sent to a local dispatcher;
// 'elem' is first so the queue element can be cast back to this struct)
struct monitor_multihop_message_forwarding_state {
    struct monitor_msg_queue_elem elem;
    struct monitor_multihop_message__rx_args args;
};

// inter-monitor forwarding state
// (saved copy of a message that could not be sent to another monitor)
struct intermon_message_forwarding_state {
    struct intermon_msg_queue_elem elem;
    struct intermon_multihop_message__rx_args args;
};
962
963/**
964 * \brief Handle a multi-hop message coming from a local dispatcher.
965 *        The message must be forwarded to the next hop.
966 *
967 * \param mon_binding the monitor binding
968 * \param vci the virtual circuit identifier of the message
969 * \param direction direction of the message
970 * \param flags message flags
971 * \param ack number of messages acknowledged with this message
972 * \param payload pointer to the message payload
973 * \size size of the message payload
974 *
975 */
976static void multihop_message_handler(struct monitor_binding *mon_binding,
977        multihop_vci_t vci, uint8_t direction, uint8_t flags, uint32_t ack,
978        const uint8_t *payload, size_t size)
979{
980
981    MULTIHOP_DEBUG(
982            "monitor on core %d received multi-hop message (from local dispatcher). VCI %llu, direction %d, flags %d, ack %d\n", my_core_id, (unsigned long long) vci, direction, flags, ack);
983
984    // get forwarding information
985    errval_t err;
986    struct monitor_multihop_chan_state *chan_state = forwarding_table_lookup(
987            vci);
988    struct direction *dir = multihop_get_direction(chan_state, direction);
989    struct intermon_binding *b = dir->binding.intermon_binding;
990
991    struct intermon_state *ist = b->st;
992    if (msg_queue_is_empty(&ist->queue)) {
993
994        // if the message queue is empty, we can directly forward
995        // the message
996        multihop_message_intermon_forward(b, dir->vci, direction, flags, ack,
997                payload, size, true);
998    } else {
999        // if the message queue is not empty, we have to
1000        // enqueue the message (to make sure we do not bypass
1001        // other messages)
1002        struct intermon_message_forwarding_state *me = malloc(
1003                sizeof(struct intermon_message_forwarding_state));
1004        me->args.vci = dir->vci;
1005        me->args.direction = direction;
1006        me->args.flags = flags;
1007        me->args.ack = ack;
1008        memcpy(me->args.payload, payload, size);
1009
1010        me->args.size = size;
1011        me->elem.cont = multihop_message_intermon_forward_cont;
1012
1013        err = intermon_enqueue_send(b, &ist->queue, get_default_waitset(),
1014                &me->elem.queue);
1015        assert(err_is_ok(err));
1016    }
1017}
1018
1019// continue function for intermonitor message forwarding
1020static void multihop_message_intermon_forward_cont(struct intermon_binding *b,
1021        struct intermon_msg_queue_elem *e)
1022{
1023
1024    struct intermon_message_forwarding_state *st =
1025            (struct intermon_message_forwarding_state *) e;
1026
1027    multihop_message_intermon_forward(b, st->args.vci, st->args.direction,
1028            st->args.flags, st->args.ack, st->args.payload, st->args.size,
1029            false);
1030    free(e);
1031}
1032
1033/**
1034 * \brief Forward a message to another monitor.
1035 *
1036 */
1037static inline void multihop_message_intermon_forward(struct intermon_binding *b,
1038        multihop_vci_t vci, uint8_t direction, uint8_t flags, uint32_t ack,
1039        const uint8_t *payload, size_t size, bool first_try)
1040{
1041
1042    errval_t err;
1043
1044    // try to forward message
1045    err = b->tx_vtbl.multihop_message(b, NOP_CONT, vci, direction,
1046            flags, ack, payload, size);
1047
1048    if (err_is_fail(err)) {
1049        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
1050            struct intermon_message_forwarding_state *me = malloc(
1051                    sizeof(struct intermon_message_forwarding_state));
1052            struct intermon_state *ist = b->st;
1053            me->args.vci = vci;
1054            me->args.direction = direction;
1055            me->args.flags = flags;
1056            me->args.ack = ack;
1057            memcpy(me->args.payload, payload, size);
1058
1059            me->args.size = size;
1060            me->elem.cont = multihop_message_intermon_forward_cont;
1061
1062            if (first_try) {
1063                // if this is the first time that we try to send this message
1064                // we can enqueue it at the back of the message queue
1065                err = intermon_enqueue_send(b, &ist->queue,
1066                        get_default_waitset(), &me->elem.queue);
1067            } else {
1068                // if this is NOT the first time that we try to send this message
1069                // we have to enqueue it at the FRONT to make sure that the
1070                // original message order is preserved
1071                err = intermon_enqueue_send_at_front(b, &ist->queue,
1072                        get_default_waitset(), &me->elem.queue);
1073            }
1074
1075            assert(err_is_ok(err));
1076            return;
1077        }
1078
1079        USER_PANIC_ERR(err, "Could not forward multi-hop message\n");
1080    }
1081}
1082
1083/**
1084 * \brief Handle a message coming from another monitor. We have
1085 *        to forward the message either to another monitor or
1086 *        to a dispatcher
1087 *
1088 * \param mon_binding the monitor binding
1089 * \param vci the virtual circuit identifier of the message
1090 * \param direction direction of the message
1091 * \param flags message flags
1092 * \param ack number of messages acknowledged with this message
1093 * \param payload pointer to the message payload
1094 * \size size of the message payload
1095 */
1096static void intermon_multihop_message_handler(struct intermon_binding *binding,
1097        multihop_vci_t vci, uint8_t direction, uint8_t flags, uint32_t ack,
1098        const uint8_t *payload, size_t size)
1099{
1100
1101    MULTIHOP_DEBUG(
1102            "monitor on core %d received multi-hop message (from other monitor). VCI %llu, direction %d, flags %d, ack %d\n", my_core_id, (unsigned long long) vci, direction, flags, ack);
1103
1104    errval_t err;
1105    struct monitor_multihop_chan_state *chan_state = forwarding_table_lookup(
1106            vci);
1107    struct direction *dir = multihop_get_direction(chan_state, direction);
1108
1109    if (dir->type == MULTIHOP_ENDPOINT) {
1110        // we have to forward the message to a local dispatcher
1111        struct monitor_binding *b = dir->binding.monitor_binding;
1112        struct monitor_state *ist = b->st;
1113
1114        if (msg_queue_is_empty(&ist->queue)) {
1115            // if the message queue is empty, we can directly forward
1116            // the message
1117            multihop_message_monitor_forward(b, dir->vci, direction, flags, ack,
1118                    payload, size, true);
1119        } else {
1120            // if the message queue is not empty, we have to
1121            // enqueue the message (to make sure we do not bypass
1122            // other messages)
1123            struct monitor_multihop_message_forwarding_state *me = malloc(
1124                    sizeof(struct monitor_multihop_message_forwarding_state));
1125            assert(me != NULL);
1126            me->args.vci = dir->vci;
1127            me->args.direction = direction;
1128            me->args.flags = flags;
1129            me->args.ack = ack;
1130            memcpy(me->args.payload, payload, size);
1131
1132            me->args.size = size;
1133            me->elem.cont = multihop_message_forward_continue;
1134
1135            err = monitor_enqueue_send(b, &ist->queue, get_default_waitset(),
1136                    &me->elem.queue);
1137            assert(err_is_ok(err));
1138        }
1139        return;
1140    } else {
1141        // we have to forward the message to the next hop (--> another monitor)
1142        struct intermon_binding *b = dir->binding.intermon_binding;
1143        struct intermon_state *ist = b->st;
1144
1145        if (msg_queue_is_empty(&ist->queue)) {
1146            // message queue is empty --> send directly
1147            multihop_message_intermon_forward(b, dir->vci, direction, flags,
1148                    ack, payload, size, true);
1149        } else {
1150            // enqueue message
1151            struct intermon_message_forwarding_state *me = malloc(
1152                    sizeof(struct intermon_message_forwarding_state));
1153            me->args.vci = dir->vci;
1154            me->args.direction = direction;
1155            me->args.flags = flags;
1156            me->args.ack = ack;
1157            memcpy(me->args.payload, payload, size);
1158
1159            me->args.size = size;
1160            me->elem.cont = multihop_message_intermon_forward_cont;
1161
1162            err = intermon_enqueue_send(b, &ist->queue, get_default_waitset(),
1163                    &me->elem.queue);
1164            assert(err_is_ok(err));
1165        }
1166        return;
1167    }
1168}
1169
1170// continue function to forward a message to a dispatcher
1171static void multihop_message_forward_continue(struct monitor_binding *b,
1172        struct monitor_msg_queue_elem *e)
1173{
1174
1175    struct monitor_multihop_message_forwarding_state *st =
1176            (struct monitor_multihop_message_forwarding_state *) e;
1177
1178    multihop_message_monitor_forward(b, st->args.vci, st->args.direction,
1179            st->args.flags, st->args.ack, st->args.payload, st->args.size,
1180            false);
1181    free(e);
1182}
1183
1184/**
1185 * \brief Forward a message to a dispatcher
1186 */
1187static inline void multihop_message_monitor_forward(struct monitor_binding *b,
1188        multihop_vci_t vci, uint8_t direction, uint8_t flags, uint32_t ack,
1189        const uint8_t *payload, size_t size, bool first_try)
1190{
1191
1192    errval_t err;
1193
1194    // try to forward message
1195    err = b->tx_vtbl.multihop_message(b, NOP_CONT, vci, direction,
1196            flags, ack, payload, size);
1197
1198    if (err_is_fail(err)) {
1199        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
1200            struct monitor_multihop_message_forwarding_state *me = malloc(
1201                    sizeof(struct monitor_multihop_message_forwarding_state));
1202            assert(me != NULL);
1203            struct monitor_state *ist = b->st;
1204            me->args.vci = vci;
1205            me->args.direction = direction;
1206            me->args.flags = flags;
1207            me->args.ack = ack;
1208            memcpy(me->args.payload, payload, size);
1209
1210            me->args.size = size;
1211            me->elem.cont = multihop_message_forward_continue;
1212
1213            if (first_try) {
1214                err = monitor_enqueue_send(b, &ist->queue,
1215                        get_default_waitset(), &me->elem.queue);
1216            } else {
1217                err = monitor_enqueue_send_at_front(b, &ist->queue,
1218                        get_default_waitset(), &me->elem.queue);
1219            }
1220
1221            assert(err_is_ok(err));
1222            return;
1223        }
1224
1225        USER_PANIC_ERR(err, "failed forwarding multihop message\n");
1226    }
1227}
1228
1229///////////////////////////////////////////////////////
1230
1231// CAPABILITY FORWARDING
1232
1233///////////////////////////////////////////////////////
1234
// Forward declarations for capability forwarding (to another monitor and to
// a local dispatcher) and the queue continuations used when a binding is busy.
static void multihop_cap_send_intermon_forward_cont(struct intermon_binding *b,
        struct intermon_msg_queue_elem *e);

static inline void multihop_cap_send_intermon_forward(
        struct intermon_binding *b, multihop_vci_t vci, uint8_t direction,
        uint32_t capid, errval_t msgerr, intermon_caprep_t caprep, bool null_cap,
        coreid_t owner);

static void multihop_cap_send_forward_cont(struct monitor_binding *b,
        struct monitor_msg_queue_elem *e);

inline static void multihop_cap_send_forward(struct monitor_binding *b,
        multihop_vci_t vci, uint8_t direction, uint32_t capid, errval_t msgerr,
        struct capref cap);
1249
// intermonitor capability forwarding state
// (saved arguments of a capability send that could not be transmitted to
// another monitor; 'elem' is first so the queue element can be cast back)
struct multihop_intermon_capability_forwarding_state {
    struct intermon_msg_queue_elem elem;
    struct intermon_multihop_cap_send__tx_args args;
};

// monitor capability forwarding state
// (saved arguments of a capability send towards a local dispatcher)
struct multihop_capability_forwarding_state {
    struct monitor_msg_queue_elem elem;
    struct monitor_multihop_cap_send__tx_args args;
};
1261
1262/**
1263 * \brief Handle capability send request from local monitor.
1264 *        The capability must be forwarded to the next hop.
1265 *
1266 * \param monitor_binding
1267 * \param vci the virtual circuit identifier (VCI)
1268 * \param direction the direction
1269 * \param cap reference to the capability
1270 * \capid ID of the capability
1271 *
1272 */
1273static void multihop_cap_send_request_handler(
1274        struct monitor_binding *monitor_binding, multihop_vci_t vci,
1275        uint8_t direction, errval_t msgerr, struct capref cap, uint32_t capid)
1276{
1277
1278    MULTIHOP_DEBUG(
1279            "monitor on core %d received a capability (from local dispatcher). VCI %llu, direction %d, cap ID %d\n", my_core_id, (unsigned long long) vci, direction, capid);
1280
1281    errval_t err;
1282    struct capability capability;
1283    intermon_caprep_t caprep;
1284    coreid_t capowner = 0;
1285    memset(&caprep, 0, sizeof(caprep));
1286    bool null_cap = capref_is_null(cap);
1287
1288    // XXX: this field is ignored when the local dispatcher originates the cap
1289    msgerr = SYS_ERR_OK;
1290
1291    // get forwarding information
1292    struct monitor_multihop_chan_state *chan_state = forwarding_table_lookup(
1293            vci);
1294    struct direction *dir = multihop_get_direction(chan_state, direction);
1295    struct intermon_binding *b = dir->binding.intermon_binding;
1296
1297    if (!null_cap) {
1298        // get binary representation of capability
1299        err = monitor_cap_identify(cap, &capability);
1300        if (err_is_fail(err)) {
1301            USER_PANIC_ERR(err, "monitor_cap_identify failed, ignored");
1302            return;
1303        }
1304
1305        err = monitor_get_domcap_owner(get_cap_domref(cap), &capowner);
1306        if (err_is_fail(err)) {
1307            USER_PANIC_ERR(err, "getting owner failed, ignored");
1308            return;
1309        }
1310
1311        // if we can't transfer the cap, it is delivered as NULL
1312        if (!monitor_can_send_cap(&capability)) {
1313            cap = NULL_CAP;
1314            null_cap = true;
1315            msgerr = MON_ERR_CAP_SEND;
1316        }
1317    }
1318
1319    if (!null_cap) {
1320        // FIXME: this seems to be totally bogus. it assumes a give_away cap -AB
1321
1322        // mark capability as remote
1323        err = monitor_remote_relations(cap, RRELS_COPY_BIT, RRELS_COPY_BIT, NULL);
1324        if (err_is_fail(err)) {
1325            USER_PANIC_ERR(err, "monitor_cap_remote failed");
1326            return;
1327        }
1328
1329        // XXX: This is a typedef of struct that flounder is generating.
1330        // Flounder should not be generating this and we shouldn't be using it.
1331        capability_to_caprep(&capability, &caprep);
1332
1333        // destroy capability on this core
1334        err = cap_destroy(cap);
1335        if (err_is_fail(err)) {
1336            USER_PANIC_ERR(err, "cap destroy failed");
1337        }
1338    }
1339
1340    // enqueue capability in order to be forwarded
1341    struct multihop_intermon_capability_forwarding_state *me = malloc(
1342            sizeof(struct multihop_intermon_capability_forwarding_state));
1343    struct intermon_state *ist = b->st;
1344    me->args.vci = dir->vci;
1345    me->args.direction = direction;
1346    me->args.capid = capid;
1347    me->args.err = msgerr;
1348    me->args.cap = caprep;
1349    me->args.null_cap = null_cap;
1350    me->args.owner = capowner;
1351    me->elem.cont = multihop_cap_send_intermon_forward_cont;
1352
1353    err = intermon_enqueue_send(b, &ist->queue, get_default_waitset(),
1354            &me->elem.queue);
1355    assert(err_is_ok(err));
1356}
1357
1358// continue function for intermonitor capability forwarding
1359static void multihop_cap_send_intermon_forward_cont(struct intermon_binding *b,
1360        struct intermon_msg_queue_elem *e)
1361{
1362    struct multihop_intermon_capability_forwarding_state *st =
1363            (struct multihop_intermon_capability_forwarding_state *) e;
1364    multihop_cap_send_intermon_forward(b, st->args.vci, st->args.direction,
1365        st->args.capid, st->args.err, st->args.cap, st->args.null_cap,
1366        st->args.owner);
1367    free(e);
1368}
1369
1370/**
1371 * \brief Forward capability to the next hop
1372 *
1373 */
1374static inline void multihop_cap_send_intermon_forward(
1375        struct intermon_binding *b, multihop_vci_t vci, uint8_t direction,
1376        uint32_t capid, errval_t msgerr, intermon_caprep_t caprep, bool null_cap,
1377        coreid_t owner)
1378{
1379
1380    errval_t err;
1381
1382    // try to forward
1383    err = b->tx_vtbl.multihop_cap_send(b, NOP_CONT, vci, direction, capid, msgerr,
1384            caprep, null_cap, owner);
1385
1386    if (err_is_fail(err)) {
1387        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
1388            struct multihop_intermon_capability_forwarding_state *me =
1389                    malloc(
1390                            sizeof(struct multihop_intermon_capability_forwarding_state));
1391            struct intermon_state *ist = b->st;
1392            me->args.vci = vci;
1393            me->args.direction = direction;
1394            me->args.capid = capid;
1395            me->args.err = msgerr;
1396            me->args.cap = caprep;
1397            me->args.null_cap = null_cap;
1398            me->elem.cont = multihop_cap_send_intermon_forward_cont;
1399
1400            err = intermon_enqueue_send_at_front(b, &ist->queue,
1401                    get_default_waitset(), &me->elem.queue);
1402            assert(err_is_ok(err));
1403            return;
1404        }
1405
1406        USER_PANIC_ERR(err,
1407                "Could not forward capability over multi-hop channel\n");
1408    }
1409}
1410
1411/**
1412 * \brief Handle a capability coming from another monitor.
1413 *        The capability must be either forwarded to another
1414 *        monitor or to a local dispatcher.
1415 *
1416 */
1417static void multihop_intermon_cap_send_handler(
1418        struct intermon_binding *intermon_binding, multihop_vci_t vci,
1419        uint8_t direction, uint32_t capid, errval_t msgerr,
1420        intermon_caprep_t caprep, bool null_cap, coreid_t owner)
1421{
1422
1423    MULTIHOP_DEBUG(
1424            "monitor on core %d received a capability (from other monitor). VCI %llu, direction %d, cap ID %d, owner %d\n", my_core_id, (unsigned long long) vci, direction, capid, owner);
1425
1426    errval_t err;
1427    struct monitor_multihop_chan_state *chan_state = forwarding_table_lookup(
1428            vci);
1429    struct direction *dir = multihop_get_direction(chan_state, direction);
1430
1431    if (dir->type == MULTIHOP_ENDPOINT) {
1432        // we have to forward the message to a local dispatcher
1433
1434        // Construct the capability
1435        struct capability *capability = (struct capability *) &caprep;
1436        struct capref cap;
1437
1438        if (null_cap) {
1439            cap = NULL_CAP;
1440        } else {
1441            err = slot_alloc(&cap);
1442            if (err_is_fail(err)) {
1443
1444                // send a msg indicating that we failed
1445                // to allocate a slot for the capability
1446                cap = NULL_CAP;
1447                msgerr = err;
1448                goto do_send;
1449            }
1450
1451            // create capability
1452            err = monitor_cap_create(cap, capability, owner);
1453            if (err_is_fail(err)) {
1454                slot_free(cap);
1455
1456                // send a msg indicating that we failed
1457                // to create the capability
1458                cap = NULL_CAP;
1459                msgerr = err_push(err, MON_ERR_CAP_CREATE);
1460                goto do_send;
1461            }
1462
1463            // mark capability as remote
1464            err = monitor_remote_relations(cap, RRELS_COPY_BIT, RRELS_COPY_BIT, NULL);
1465            if (err_is_fail(err)) {
1466                USER_PANIC_ERR(err, "monitor_cap_remote failed");
1467                return;
1468            }
1469        }
1470
1471do_send: ;
1472        // enqueue the capability in order to be forwarded to
1473        // the local dispatcher
1474        struct monitor_binding *b = dir->binding.monitor_binding;
1475        struct multihop_capability_forwarding_state *me = malloc(
1476                sizeof(struct multihop_capability_forwarding_state));
1477        assert(me != NULL);
1478        struct monitor_state *ist = b->st;
1479        me->args.vci = dir->vci;
1480        me->args.direction = direction;
1481        me->args.cap = cap;
1482        me->args.capid = capid;
1483        me->args.err = msgerr;
1484        me->elem.cont = multihop_cap_send_forward_cont;
1485
1486        err = monitor_enqueue_send(b, &ist->queue, get_default_waitset(),
1487                &me->elem.queue);
1488        assert(err_is_ok(err));
1489        return;
1490
1491    } else {
1492        // we have to forward the capability to the next hop
1493        // we therefore enqueue the capability
1494        struct intermon_binding *b = dir->binding.intermon_binding;
1495        struct multihop_intermon_capability_forwarding_state *me = malloc(
1496                sizeof(struct multihop_intermon_capability_forwarding_state));
1497        struct intermon_state *ist = b->st;
1498        me->args.vci = dir->vci;
1499        me->args.direction = direction;
1500        me->args.capid = capid;
1501        me->args.err = msgerr;
1502        me->args.cap = caprep;
1503        me->args.null_cap = null_cap;
1504        me->elem.cont = multihop_cap_send_intermon_forward_cont;
1505
1506        err = intermon_enqueue_send(b, &ist->queue, get_default_waitset(),
1507                &me->elem.queue);
1508        assert(err_is_ok(err));
1509        return;
1510    }
1511}
1512
1513// continue function for monitor capability forwarding
1514static void multihop_cap_send_forward_cont(struct monitor_binding *b,
1515        struct monitor_msg_queue_elem *e)
1516{
1517    struct multihop_capability_forwarding_state *st =
1518            (struct multihop_capability_forwarding_state *) e;
1519    multihop_cap_send_forward(b, st->args.vci, st->args.direction,
1520                              st->args.capid, st->args.err, st->args.cap);
1521    free(e);
1522}
1523
1524/**
1525 * \brief Forward capability to a local dispatcher
1526 *
1527 */
1528inline static void multihop_cap_send_forward(struct monitor_binding *b,
1529        multihop_vci_t vci, uint8_t direction, uint32_t capid, errval_t msgerr,
1530        struct capref cap)
1531{
1532    errval_t err;
1533
1534// try to send
1535    err = b->tx_vtbl.multihop_cap_send(b, NOP_CONT, vci, direction, msgerr,
1536                                       cap, capid);
1537
1538    if (err_is_fail(err)) {
1539        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
1540            struct multihop_capability_forwarding_state *me = malloc(
1541                    sizeof(struct multihop_capability_forwarding_state));
1542            assert(me != NULL);
1543            struct monitor_state *ist = b->st;
1544            me->args.vci = vci;
1545            me->args.direction = direction;
1546            me->args.cap = cap;
1547            me->args.capid = capid;
1548            me->args.err = msgerr;
1549            me->elem.cont = multihop_cap_send_forward_cont;
1550
1551            err = monitor_enqueue_send_at_front(b, &ist->queue,
1552                    get_default_waitset(), &me->elem.queue);
1553            assert(err_is_ok(err));
1554            return;
1555        }
1556
1557        USER_PANIC_ERR(err,
1558                "failed to forward capability over multi-hop channel\n");
1559    }
1560}
1561
1562///////////////////////////////////////////////////////
1563
1564// INITIALIZATION
1565
1566///////////////////////////////////////////////////////
1567
1568// set up receive vtable in the intermonitor interface
1569errval_t multihop_intermon_init(struct intermon_binding *ib)
1570{
1571    ib->rx_vtbl.bind_multihop_intermon_request =
1572            &multihop_intermon_bind_request_handler;
1573    ib->rx_vtbl.bind_multihop_intermon_reply =
1574            &multihop_intermon_bind_reply_handler;
1575    ib->rx_vtbl.multihop_message = &intermon_multihop_message_handler;
1576    ib->rx_vtbl.multihop_cap_send = &multihop_intermon_cap_send_handler;
1577    ib->rx_vtbl.multihop_routing_table_request =
1578            &multihop_handle_routing_table_request;
1579    ib->rx_vtbl.multihop_routing_table_response =
1580            &multihop_handle_routing_table_response;
1581    ib->rx_vtbl.multihop_routing_table_grow =
1582            &multihop_routing_table_grow;
1583
1584    return SYS_ERR_OK;
1585}
1586
1587// set up receive vtable in the monitor interface
1588errval_t multihop_monitor_init(struct monitor_binding *mb)
1589{
1590    mb->rx_vtbl.multihop_bind_client_request =
1591            &multihop_monitor_bind_request_handler;
1592    mb->rx_vtbl.multihop_bind_service_reply =
1593            &multihop_monitor_service_bind_reply_handler;
1594    mb->rx_vtbl.multihop_message = &multihop_message_handler;
1595    mb->rx_vtbl.multihop_cap_send = &multihop_cap_send_request_handler;
1596    mb->rx_vtbl.multihop_routing_table_new =
1597            &multihop_routing_table_new;
1598    mb->rx_vtbl.multihop_routing_table_set =
1599            &multihop_routing_table_set;
1600
1601    return SYS_ERR_OK;
1602}
1603