1/*
2 * Copyright (c) 2012, 2016 ETH Zurich.
3 * All rights reserved.
4 *
5 * This file is distributed under the terms in the attached LICENSE file.
6 * If you do not find this file, copies can be found by writing to:
7 * ETH Zurich D-INFK, Universitaetstrasse 6, CH-8092 Zurich. Attn: Systems Group.
8 */
9
10#include <barrelfish/barrelfish.h>
11#include <barrelfish/deferred.h>
12#include "capsend.h"
13#include "monitor.h"
14#include "capops.h"
15#include "internal.h"
16
17/*
18 * Single-cast {{{1
19 */
20
21errval_t
22capsend_target(coreid_t dest, struct msg_queue_elem *queue_elem)
23{
24    errval_t err;
25
26    // get destination intermon_binding and _state
27    struct intermon_binding *dest_b;
28    err = intermon_binding_get(dest, &dest_b);
29    if (err_is_fail(err)) {
30        return err;
31    }
32    DEBUG_CAPOPS("capsend_target: ->%d (%p)\n", dest, queue_elem);
33    struct intermon_state *inter_st = (struct intermon_state*)dest_b->st;
34    if (!inter_st->capops_ready) {
35        // XXX: custom error value
36        return MON_ERR_CAPOPS_BUSY;
37    }
38
39    // enqueue message
40    return intermon_enqueue_send(dest_b, &inter_st->queue, dest_b->waitset, queue_elem);
41}
42
43errval_t
44capsend_owner(struct domcapref capref, struct msg_queue_elem *queue_elem)
45{
46    errval_t err;
47
48    // read cap owner
49    coreid_t owner;
50    err = monitor_get_domcap_owner(capref, &owner);
51    if (err_is_fail(err)) {
52        return err;
53    }
54
55    // enqueue to owner
56    return capsend_target(owner, queue_elem);
57}
58
59/*
60 * Multicast helpers {{{2
61 */
62
struct capsend_mc_msg_st;
struct capsend_mc_st;

// Continuation type used to transmit one multicast message on a binding.
typedef errval_t (*capsend_mc_send_cont_t)(struct intermon_binding*, struct capsend_mc_st*);

// Per-destination message state for one leg of a multicast/broadcast.
struct capsend_mc_msg_st {
    struct intermon_msg_queue_elem queue_elem; // element for the binding's send queue
    struct capsend_mc_st *mc_st;               // back-pointer to shared multicast state
    coreid_t dest;                             // destination core id
};
73
// Send continuation for one destination of a multicast: performs the actual
// send via mc_st->send_fn, retries on transient TX_BUSY, and releases the
// shared multicast bookkeeping once the last queued message is dealt with.
static void
capsend_mc_send_cont(struct intermon_binding *b, struct intermon_msg_queue_elem *e)
{
    struct capsend_mc_msg_st *msg_st = (struct capsend_mc_msg_st*)e;
    struct capsend_mc_st *mc_st = msg_st->mc_st;
    errval_t err = SYS_ERR_OK;

    // if do_send is false, an error occured in the multicast setup, so do not
    // send anything
    if (mc_st->do_send) {
        err = mc_st->send_fn(b, &mc_st->caprep, mc_st);
    }

    // transport was busy: put the message back on the send queue for a retry
    if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
        err = capsend_target(msg_st->dest, (struct msg_queue_elem*)msg_st);
    }

    if (err_is_fail(err)) {
        USER_PANIC_ERR(err, "sending dequeued capops message");
    }

    // decrement counter of number of queued messages
    // NOTE(review): this also decrements on the TX_BUSY requeue path above,
    // after which this continuation will run again for the same message and
    // decrement a second time — confirm the requeue path is not expected to
    // keep the count unchanged
    if (!--mc_st->num_queued) {
        // if counter is zero, cleanup outgoing memory
        free(mc_st->msg_st_arr);
        mc_st->msg_st_arr = NULL;
        if (!mc_st->do_send || !mc_st->num_pending) {
            // if the send has been aborted, also cleanup cross-call state
            free(mc_st);
        }
    }
}
106
// Initialize the next per-destination message state and enqueue it on the
// destination core's intermon binding.  Bookkeeping (num_queued/num_pending)
// is only updated when the enqueue succeeds.
static errval_t
capsend_mc_enqueue(struct capsend_mc_st *mc_st, coreid_t dest)
{
    errval_t err;

    // get next msg_st
    struct capsend_mc_msg_st *msg_st = &mc_st->msg_st_arr[mc_st->num_queued];
    msg_st->queue_elem.cont = capsend_mc_send_cont;
    msg_st->mc_st = mc_st;
    msg_st->dest = dest;

    err = capsend_target(dest, (struct msg_queue_elem*)msg_st);
    if (err_is_ok(err)) {
        // count successful enqueue
        mc_st->num_queued++;
        if (mc_st->num_pending >= 0) {
            // also track number of pending exchanges if requested
            // (num_pending == -1 means reply tracking is disabled)
            mc_st->num_pending++;
        }
    }
    return err;
}
129
130static errval_t
131capsend_mc_init(struct capsend_mc_st *mc_st, struct capability *cap,
132                capsend_send_fn send_fn,
133                size_t num_dests, bool track_pending)
134{
135    mc_st->num_queued = 0;
136    mc_st->num_pending = track_pending ? 0 : -1;
137    mc_st->do_send = true;
138    mc_st->send_fn = send_fn;
139    if (cap) {
140        capability_to_caprep(cap, &mc_st->caprep);
141    }
142    mc_st->msg_st_arr = calloc(num_dests, sizeof(*mc_st->msg_st_arr));
143    if (!mc_st->msg_st_arr) {
144        return LIB_ERR_MALLOC_FAIL;
145    }
146    return SYS_ERR_OK;
147}
148
149bool capsend_handle_mc_reply(struct capsend_mc_st *st)
150{
151    // return true iff st->num_pending == 0 after acking one more reply
152    return --st->num_pending == 0;
153}
154
155/*
156 * Broadcast helpers {{{2
157 */
158
159static errval_t
160capsend_broadcast(struct capsend_mc_st *bc_st, struct capsend_destset *dests,
161        struct capability *cap, capsend_send_fn send_cont)
162{
163    DEBUG_CAPOPS("%s\n", __FUNCTION__);
164    errval_t err;
165    size_t dest_count;
166    bool init_destset = false;
167    size_t online_monitors = num_monitors_ready_for_capops();
168    // do not count self when calculating #dest cores
169    dest_count = online_monitors - 1;
170    DEBUG_CAPOPS("%s: dest_count = %zu\n", __FUNCTION__, dest_count);
171    DEBUG_CAPOPS("%s: num_queued = %d\n", __FUNCTION__, bc_st->num_queued);
172    DEBUG_CAPOPS("%s: num_pending = %d\n", __FUNCTION__, bc_st->num_pending);
173    if (dests && dests->set == NULL) {
174        dests->set = calloc(dest_count, sizeof(coreid_t));
175        dests->capacity = dest_count;
176        dests->count = 0;
177        init_destset = true;
178    } else if (dests) {
179        dest_count = dests->count;
180    }
181    err = capsend_mc_init(bc_st, cap, send_cont, dest_count, true);
182    if (err_is_fail(err)) {
183        free(bc_st);
184    }
185
186    if (init_destset || !dests) {
187        for (coreid_t dest = 0; dest < MAX_COREID && bc_st->num_queued < dest_count; dest++)
188        {
189            if (dest == my_core_id) {
190                // do not send to self
191                continue;
192            }
193            err = capsend_mc_enqueue(bc_st, dest);
194            if (err_is_ok(err) && dests) {
195                // if we're initializing destination set, add destination
196                // cores that we were able to enqueue msg for to set.
197                dests->set[dests->count++] = dest;
198            }
199            if (err_no(err) == MON_ERR_NO_MONITOR_FOR_CORE) {
200                // no connection for this core, skip
201                continue;
202            } else if (err_no(err) == MON_ERR_CAPOPS_BUSY) {
203                debug_printf("monitor.%d not ready to participate in distops, skipping\n",
204                        dest);
205            } else if (err_is_fail(err)) {
206                // failure, disable broadcast
207                bc_st->do_send = false;
208                if (!bc_st->num_queued) {
209                    // only cleanup of no messages have been enqueued
210                    free(bc_st->msg_st_arr);
211                    free(bc_st);
212                }
213                return err;
214            }
215        }
216    } else {
217        for (int i = 0; i < dest_count; i++) {
218            coreid_t dest = dests->set[i];
219
220            err = capsend_mc_enqueue(bc_st, dest);
221            if (err_no(err) == MON_ERR_NO_MONITOR_FOR_CORE) {
222                // no connection for this core, skip
223                continue;
224            } else if (err_no(err) == MON_ERR_CAPOPS_BUSY) {
225                debug_printf("monitor.%d not ready to participate in distops, skipping\n",
226                        dest);
227            } else if (err_is_fail(err)) {
228                // failure, disable broadcast
229                bc_st->do_send = false;
230                if (!bc_st->num_queued) {
231                    // only cleanup of no messages have been enqueued
232                    free(bc_st->msg_st_arr);
233                    free(bc_st);
234                }
235                return err;
236            }
237        }
238    }
239
240    if (!bc_st->num_pending && dest_count > 1) {
241        // XXX: needs sane error -SG
242        return MON_ERR_NO_MONITOR_FOR_CORE;
243    }
244
245    return SYS_ERR_OK;
246}
247
248/*
249 * Find relations {{{1
250 */
251
252/*
253 * Find copies {{{2
254 */
255
256/*
257 * Find copies broadcast {{{3
258 */
259
struct find_cap_broadcast_msg_st;

// Cross-call state for a find_cap broadcast.  `bc` must stay the first
// member, as the generic multicast code casts between the two types.
struct find_cap_broadcast_st {
    struct capsend_mc_st bc;                  // generic multicast state (must be first)
    capsend_find_cap_result_fn result_handler; // caller's result callback
    bool found;                                // set once a positive reply arrived
    void *st;                                  // caller's opaque state
};
268
269static errval_t
270find_cap_broadcast_send_cont(struct intermon_binding *b, intermon_caprep_t *caprep, struct capsend_mc_st *st)
271{
272    DEBUG_CAPOPS("%s\n", __FUNCTION__);
273    return intermon_capops_find_cap__tx(b, NOP_CONT, *caprep, (uintptr_t)st);
274}
275
276errval_t
277capsend_find_cap(struct capability *cap, capsend_find_cap_result_fn result_handler, void *st)
278{
279    DEBUG_CAPOPS("%s\n", __FUNCTION__);
280    struct find_cap_broadcast_st *bc_st = calloc(1, sizeof(struct find_cap_broadcast_st));
281    if (!bc_st) {
282        return LIB_ERR_MALLOC_FAIL;
283    }
284    bc_st->result_handler = result_handler;
285    bc_st->found = false;
286    bc_st->st = st;
287
288    return capsend_broadcast((struct capsend_mc_st*)bc_st, NULL, cap, find_cap_broadcast_send_cont);
289}
290
291/*
292 * Find copies result {{{3
293 */
294
// Queued outgoing find_cap reply: result code plus the requester's opaque
// state token, echoed back unchanged.
struct find_cap_result_msg_st {
    struct intermon_msg_queue_elem queue_elem; // element for the binding's send queue
    errval_t result;                           // outcome of the local copy check
    genvaddr_t st;                             // requester's opaque state token
};
300
// Send continuation for a find_cap reply.  Requeues itself at the front of
// the send queue on FLOUNDER_ERR_TX_BUSY; frees the message state once sent.
static void
find_cap_result_send_cont(struct intermon_binding *b, struct intermon_msg_queue_elem *e)
{
    DEBUG_CAPOPS("%s\n", __FUNCTION__);
    errval_t err;
    struct find_cap_result_msg_st *msg_st = (struct find_cap_result_msg_st*)e;

    err = intermon_capops_find_cap_result__tx(b, NOP_CONT, msg_st->result, msg_st->st);

    if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
        DEBUG_CAPOPS("%s: got FLOUNDER_ERR_TX_BUSY; requeueing msg.\n", __FUNCTION__);
        struct intermon_state *inter_st = (struct intermon_state *)b->st;
        // requeue send request at front and return
        err = intermon_enqueue_send_at_front(b, &inter_st->queue, b->waitset,
                                             (struct msg_queue_elem *)e);
        GOTO_IF_ERR(err, handle_err);
        // message is still queued; keep msg_st alive for the retry
        return;
    }

handle_err:
    if (err_is_fail(err)) {
        USER_PANIC_ERR(err, "failed to send find_cap_result message");
    }
    free(msg_st);
}
326
327static errval_t
328find_cap_result(coreid_t dest, errval_t result, genvaddr_t st)
329{
330    DEBUG_CAPOPS("%s\n", __FUNCTION__);
331    errval_t err;
332    struct find_cap_result_msg_st *msg_st = calloc(1, sizeof(struct find_cap_result_msg_st));
333    if (!msg_st) {
334        return LIB_ERR_MALLOC_FAIL;
335    }
336    msg_st->queue_elem.cont = find_cap_result_send_cont;
337    msg_st->result = result;
338    msg_st->st = st;
339
340    err = capsend_target(dest, (struct msg_queue_elem*)msg_st);
341    if (err_is_fail(err)) {
342        free(msg_st);
343    }
344
345    return err;
346}
347
348/*
349 * Find copies receive handlers {{{3
350 */
351
352void
353find_cap__rx_handler(struct intermon_binding *b, intermon_caprep_t caprep, genvaddr_t st)
354{
355    DEBUG_CAPOPS("%s\n", __FUNCTION__);
356    errval_t err, cleanup_err;
357    struct intermon_state *inter_st = (struct intermon_state*)b->st;
358    coreid_t from = inter_st->core_id;
359    struct capability cap;
360    caprep_to_capability(&caprep, &cap);
361    struct capref capref;
362
363    err = slot_alloc(&capref);
364    if (err_is_fail(err)) {
365        goto send_err;
366    }
367
368    err = monitor_copy_if_exists(&cap, capref);
369    if (err_is_fail(err)) {
370        goto free_slot;
371    }
372
373    cleanup_err = cap_delete(capref);
374    if (err_is_fail(cleanup_err)) {
375        USER_PANIC_ERR(err, "failed to delete temporary cap");
376    }
377
378free_slot:
379    cleanup_err = slot_free(capref);
380    if (err_is_fail(cleanup_err)) {
381        USER_PANIC_ERR(err, "failed to free slot for temporary cap");
382    }
383
384send_err:
385    cleanup_err = find_cap_result(from, err, st);
386    if (err_is_fail(cleanup_err)) {
387        USER_PANIC_ERR(err, "failed to send find_cap result");
388    }
389}
390
// Receive handler for find_cap replies.  Forwards the first positive answer
// to the caller immediately; once all replies are in without a hit, reports
// SYS_ERR_CAP_NOT_FOUND and frees the broadcast state.
void
find_cap_result__rx_handler(struct intermon_binding *b, errval_t result, genvaddr_t st)
{
    DEBUG_CAPOPS("%s\n", __FUNCTION__);
    // if we receive a positive result, immediately forward to caller
    lvaddr_t lst = (lvaddr_t)st;
    struct find_cap_broadcast_st *fc_bc_st = (struct find_cap_broadcast_st*)lst;
    if (err_is_ok(result)) {
        if (!fc_bc_st->found) {
            // first positive reply wins; later positives are ignored
            fc_bc_st->found = true;
            struct intermon_state *inter_st = (struct intermon_state*)b->st;
            coreid_t from = inter_st->core_id;
            fc_bc_st->result_handler(SYS_ERR_OK, from, fc_bc_st->st);
        }
    }
    else if (err_no(result) != SYS_ERR_CAP_NOT_FOUND) {
        // CAP_NOT_FOUND is the expected negative answer; anything else is odd
        DEBUG_ERR(result, "ignoring bad find_cap_result");
    }

    // check to see if broadcast is complete
    if (capsend_handle_mc_reply(&fc_bc_st->bc)) {
        if (!fc_bc_st->found) {
            // broadcast did not find a core, report notfound to caller
            fc_bc_st->result_handler(SYS_ERR_CAP_NOT_FOUND, 0, fc_bc_st->st);
        }
        free(fc_bc_st);
    }
}
419
420/*
421 * Find descendants {{{2
422 */
423
// Cross-call state for a find_descendants multicast.  `mc_st` must stay the
// first member, as the generic multicast code casts between the two types.
struct find_descendants_mc_st {
    struct capsend_mc_st mc_st;  // generic multicast state (must be first)
    capsend_result_fn result_fn; // caller's result callback
    void *st;                    // caller's opaque state
    bool have_result;            // set once a positive reply arrived
};
430
431static errval_t
432find_descendants_send_cont(struct intermon_binding *b, intermon_caprep_t *caprep, struct capsend_mc_st *mc_st)
433{
434    DEBUG_CAPOPS("%s\n", __FUNCTION__);
435    lvaddr_t lst = (lvaddr_t)mc_st;
436    return intermon_capops_find_descendants__tx(b, NOP_CONT, *caprep, (genvaddr_t)lst);
437}
438
439errval_t
440capsend_find_descendants(struct domcapref src, capsend_result_fn result_fn, void *st)
441{
442    DEBUG_CAPOPS("%s\n", __FUNCTION__);
443    errval_t err;
444
445    struct capability cap;
446    err = monitor_domains_cap_identify(src.croot, src.cptr, src.level, &cap);
447    if (err_is_fail(err)) {
448        return err;
449    }
450
451    struct find_descendants_mc_st *mc_st;
452    mc_st = malloc(sizeof(*mc_st));
453    if (!mc_st) {
454        return LIB_ERR_MALLOC_FAIL;
455    }
456
457    mc_st->result_fn = result_fn;
458    mc_st->st = st;
459    mc_st->have_result = false;
460    DEBUG_CAPOPS("%s: broadcasting find_descendants\n", __FUNCTION__);
461    return capsend_relations(&cap, find_descendants_send_cont,
462            (struct capsend_mc_st*)mc_st, NULL);
463}
464
465
// Queued outgoing find_descendants reply: status plus the requester's opaque
// state token, echoed back unchanged.
struct find_descendants_result_msg_st {
    struct intermon_msg_queue_elem queue_elem; // element for the binding's send queue
    errval_t status;                           // SYS_ERR_OK iff descendants exist locally
    genvaddr_t st;                             // requester's opaque state token
};
471
// Send continuation for a find_descendants reply.  Requeues itself at the
// front of the send queue on FLOUNDER_ERR_TX_BUSY; frees the message state
// once sent (or on unrecoverable failure, after panicking).
static void
find_descendants_result_send_cont(struct intermon_binding *b, struct intermon_msg_queue_elem *e)
{
    DEBUG_CAPOPS("%s\n", __FUNCTION__);
    errval_t err;
    struct find_descendants_result_msg_st *msg_st;
    msg_st = (struct find_descendants_result_msg_st*)e;
    err = intermon_capops_find_descendants_result__tx(b, NOP_CONT, msg_st->status, msg_st->st);

    if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
        DEBUG_CAPOPS("%s: got FLOUNDER_ERR_TX_BUSY; requeueing msg.\n", __FUNCTION__);
        struct intermon_state *inter_st = (struct intermon_state *)b->st;
        // requeue send request at front and return
        err = intermon_enqueue_send_at_front(b, &inter_st->queue, b->waitset,
                                             (struct msg_queue_elem *)e);
        GOTO_IF_ERR(err, handle_err);
        // message is still queued; keep msg_st alive for the retry
        return;
    }

handle_err:
    free(msg_st);
    if (err_is_fail(err)) {
        USER_PANIC_ERR(err, "could not send find_descendants_result");
    }
}
497
// Receive handler for a find_descendants request: queries the local kernel
// for descendants of the given cap and sends SYS_ERR_OK (descendants exist)
// or SYS_ERR_CAP_NOT_FOUND back to the requesting core.
void
find_descendants__rx_handler(struct intermon_binding *b, intermon_caprep_t caprep, genvaddr_t st)
{
    DEBUG_CAPOPS("%s\n", __FUNCTION__);
    errval_t err;

    struct intermon_state *inter_st = (struct intermon_state*)b->st;
    coreid_t from = inter_st->core_id;

    struct capability cap;
    caprep_to_capability(&caprep, &cap);

    // ask the kernel whether local descendants of this cap exist
    bool has_descendants;
    err = monitor_has_descendants(&cap, &has_descendants);
    assert(err_is_ok(err));

    struct find_descendants_result_msg_st *msg_st;
    msg_st = malloc(sizeof(*msg_st));
    if (!msg_st) {
        err = LIB_ERR_MALLOC_FAIL;
        USER_PANIC_ERR(err, "could not alloc find_descendants_result_msg_st");
    }
    msg_st->queue_elem.cont = find_descendants_result_send_cont;
    msg_st->st = st;

    // err is SYS_ERR_OK here (asserted above); translate the boolean answer
    // into the status code carried by the reply
    if (err_is_ok(err)) {
        err = has_descendants ? SYS_ERR_OK : SYS_ERR_CAP_NOT_FOUND;
    }
    msg_st->status = err;

    err = capsend_target(from, (struct msg_queue_elem*)msg_st);
    if (err_is_fail(err)) {
        USER_PANIC_ERR(err, "could not enqueue find_descendants_result msg");
    }
}
533
534void
535find_descendants_result__rx_handler(struct intermon_binding *b, errval_t status, genvaddr_t st)
536{
537    DEBUG_CAPOPS("%s\n", __FUNCTION__);
538    lvaddr_t lst = (lvaddr_t) st;
539    struct find_descendants_mc_st *mc_st = (struct find_descendants_mc_st*)lst;
540
541    if (err_is_ok(status)) {
542        // found result
543        if (!mc_st->have_result) {
544            mc_st->have_result = true;
545            mc_st->result_fn(SYS_ERR_OK, mc_st->st);
546        }
547    }
548    else if (err_no(status) != SYS_ERR_CAP_NOT_FOUND) {
549        DEBUG_ERR(status, "ignoring bad find_descendants result");
550    }
551
552    if (capsend_handle_mc_reply(&mc_st->mc_st)) {
553        if (!mc_st->have_result) {
554            mc_st->result_fn(SYS_ERR_CAP_NOT_FOUND, mc_st->st);
555        }
556        free(mc_st);
557    }
558}
559
560
561/*
562 * Check retypeability {{{1
563 */
564
// Cross-call state for a check_retypeable multicast.  `mc_st` must stay the
// first member, as the generic multicast code casts between the two types.
struct check_retypeable_mc_st {
    struct capsend_mc_st mc_st;  // generic multicast state (must be first)
    capsend_result_fn result_fn; // caller's result callback
    void *st;                    // caller's opaque state
    // msg args
    gensize_t offset;            // offset of the proposed retype in the source cap
    gensize_t objsize;           // size of each new object
    size_t count;                // number of new objects
    bool have_result;            // set once a short-circuit reply arrived
};
575
576static errval_t
577check_retypeable_send_cont(struct intermon_binding *b, intermon_caprep_t *caprep, struct capsend_mc_st *mc_st)
578{
579    DEBUG_CAPOPS("%s\n", __FUNCTION__);
580    lvaddr_t lst = (lvaddr_t)mc_st;
581    struct check_retypeable_mc_st *rst = (struct check_retypeable_mc_st *)mc_st;
582    return intermon_capops_check_retypeable__tx(b, NOP_CONT, *caprep,
583                (genvaddr_t)lst, rst->offset, rst->objsize, rst->count);
584}
585
586errval_t
587capsend_check_retypeable(struct domcapref src, gensize_t offset, gensize_t objsize,
588                         size_t count, capsend_result_fn result_fn, void *st)
589{
590    TRACE(CAPOPS, CAPSEND_CHECK_RETYPEABLE, 0);
591    DEBUG_CAPOPS("%s\n", __FUNCTION__);
592    errval_t err;
593
594    struct capability cap;
595    err = monitor_domains_cap_identify(src.croot, src.cptr, src.level, &cap);
596    if (err_is_fail(err)) {
597        return err;
598    }
599
600    struct check_retypeable_mc_st *mc_st;
601    mc_st = malloc(sizeof(*mc_st));
602    if (!mc_st) {
603        return LIB_ERR_MALLOC_FAIL;
604    }
605
606    // Setup multicast state
607    mc_st->result_fn   = result_fn;
608    mc_st->st          = st;
609    mc_st->offset      = offset;
610    mc_st->objsize     = objsize;
611    mc_st->count       = count;
612    mc_st->have_result = false;
613
614    DEBUG_CAPOPS("%s: broadcasting check_retypeable\n", __FUNCTION__);
615    return capsend_relations(&cap, check_retypeable_send_cont,
616            (struct capsend_mc_st*)mc_st, NULL);
617}
618
619
// Queued outgoing check_retypeable reply: status plus the requester's opaque
// state token, echoed back unchanged.
struct check_retypeable_result_msg_st {
    struct intermon_msg_queue_elem queue_elem; // element for the binding's send queue
    errval_t status;                           // kernel's verdict on the proposed retype
    genvaddr_t st;                             // requester's opaque state token
};
625
// Send continuation for a check_retypeable reply.  Requeues itself at the
// front of the send queue on FLOUNDER_ERR_TX_BUSY; frees the message state
// once sent (or on unrecoverable failure, after panicking).
static void
check_retypeable_result_send_cont(struct intermon_binding *b, struct intermon_msg_queue_elem *e)
{
    DEBUG_CAPOPS("%s\n", __FUNCTION__);
    errval_t err;
    struct check_retypeable_result_msg_st *msg_st;
    msg_st = (struct check_retypeable_result_msg_st*)e;
    err = intermon_capops_check_retypeable_result__tx(b, NOP_CONT, msg_st->status, msg_st->st);

    if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
        DEBUG_CAPOPS("%s: got FLOUNDER_ERR_TX_BUSY; requeueing msg.\n", __FUNCTION__);
        struct intermon_state *inter_st = (struct intermon_state *)b->st;
        // requeue send request at front and return
        err = intermon_enqueue_send_at_front(b, &inter_st->queue, b->waitset,
                                             (struct msg_queue_elem *)e);
        GOTO_IF_ERR(err, handle_err);
        // message is still queued; keep msg_st alive for the retry
        return;
    }

handle_err:
    free(msg_st);
    if (err_is_fail(err)) {
        USER_PANIC_ERR(err, "could not send check_retypeable_result");
    }
}
651
// Receive handler for a check_retypeable request: asks the local kernel
// whether the proposed retype (offset/objsize/count) conflicts with local
// descendants, then sends the kernel's verdict back to the requesting core.
void
check_retypeable__rx_handler(struct intermon_binding *b, intermon_caprep_t caprep,
                             genvaddr_t st, uint64_t offset, uint64_t objsize,
                             uint64_t count)
{
    TRACE(CAPOPS, CAPSEND_CHECK_RETYPEABLE_RX, 0);
    DEBUG_CAPOPS("%s\n", __FUNCTION__);
    errval_t err;

    struct intermon_state *inter_st = (struct intermon_state*)b->st;
    coreid_t from = inter_st->core_id;

    struct capability cap;
    caprep_to_capability(&caprep, &cap);

    // kernel check; the resulting error code is forwarded verbatim as status
    err = monitor_is_retypeable(&cap, offset, objsize, count);

    DEBUG_CAPOPS("%s: got %s from kernel\n", __FUNCTION__, err_getcode(err));

    struct check_retypeable_result_msg_st *msg_st;
    msg_st = malloc(sizeof(*msg_st));
    if (!msg_st) {
        err = LIB_ERR_MALLOC_FAIL;
        USER_PANIC_ERR(err, "could not alloc check_retypeable_result_msg_st");
    }
    msg_st->queue_elem.cont = check_retypeable_result_send_cont;
    msg_st->st = st;
    msg_st->status = err;

    err = capsend_target(from, (struct msg_queue_elem*)msg_st);
    if (err_is_fail(err)) {
        USER_PANIC_ERR(err, "could not enqueue check_retypeable_result msg");
    }
}
686
// Receive handler for check_retypeable replies.  SYS_ERR_REVOKE_FIRST from
// any core short-circuits to the caller immediately; otherwise the status of
// the final reply is reported once all replies are in.
void
check_retypeable_result__rx_handler(struct intermon_binding *b, errval_t status, genvaddr_t st)
{
    TRACE(CAPOPS, CAPSEND_CHECK_RETYPEABLE_RESULT_RX, 0);
    DEBUG_CAPOPS("%s: got %s from %d\n", __FUNCTION__, err_getcode(status),
                 ((struct intermon_state *) b->st)->core_id);
    lvaddr_t lst = (lvaddr_t) st;
    struct check_retypeable_mc_st *mc_st = (struct check_retypeable_mc_st*)lst;

    // Short-circuit when we get SYS_ERR_REVOKE_FIRST
    if (err_no(status) == SYS_ERR_REVOKE_FIRST) {
        if (!mc_st->have_result) {
            DEBUG_CAPOPS("%s: short-circuit with status=%s\n", __FUNCTION__,
                    err_getcode(status));
            mc_st->have_result = true;
            mc_st->result_fn(status, mc_st->st);
        }
    }

    // last reply: release the multicast state
    if (capsend_handle_mc_reply(&mc_st->mc_st)) {
        // If we haven't called the callback yet, call it now with the last
        // status value. Calling code needs to figure out what
        // SYS_ERR_CAP_NOT_FOUND means.
        if (!mc_st->have_result) {
            DEBUG_CAPOPS("%s: notifying caller with final status=%s\n", __FUNCTION__,
                    err_getcode(status));
            mc_st->result_fn(status, mc_st->st);
        }
        free(mc_st);
    }
}
718
719/*
720 * Ownership update {{{1
721 */
722
723/*
724 * Update owner broadcast {{{2
725 */
726
// Cross-call state for an update_owner broadcast.  `bc` must stay the first
// member, as the generic multicast code casts between the two types.
struct update_owner_broadcast_st {
    struct capsend_mc_st bc;                      // generic multicast state (must be first)
    struct event_closure completion_continuation; // invoked after all cores acked
};
731
732static errval_t
733update_owner_broadcast_send_cont(struct intermon_binding *b, intermon_caprep_t *caprep, struct capsend_mc_st *bc_st)
734{
735    lvaddr_t lst = (lvaddr_t)bc_st;
736    return intermon_capops_update_owner__tx(b, NOP_CONT, *caprep, (genvaddr_t)lst);
737}
738
739errval_t
740capsend_update_owner(struct domcapref capref, struct event_closure completion_continuation)
741{
742    errval_t err;
743    struct capability cap;
744    err = monitor_domains_cap_identify(capref.croot, capref.cptr, capref.level,
745                                       &cap);
746    if (err_is_fail(err)) {
747        return err;
748    }
749
750    struct update_owner_broadcast_st *bc_st = calloc(1, sizeof(struct update_owner_broadcast_st));
751    if (!bc_st) {
752        return LIB_ERR_MALLOC_FAIL;
753    }
754    bc_st->completion_continuation = completion_continuation;
755
756    return capsend_broadcast((struct capsend_mc_st*)bc_st, NULL, &cap, update_owner_broadcast_send_cont);
757}
758
759/*
760 * Owner updated response {{{2
761 */
762
// Queued outgoing owner_updated acknowledgement, echoing the opaque state
// token of the originating broadcast.
struct owner_updated_msg_st {
    struct intermon_msg_queue_elem queue_elem; // element for the binding's send queue
    genvaddr_t st;                             // originator's opaque state token
};
767
// Send continuation for an owner_updated acknowledgement.  Requeues itself at
// the front of the send queue on FLOUNDER_ERR_TX_BUSY; frees the message
// state once sent.
static void
owner_updated_send_cont(struct intermon_binding *b, struct intermon_msg_queue_elem *e)
{
    errval_t err;
    struct owner_updated_msg_st *msg_st = (struct owner_updated_msg_st*)e;

    err = intermon_capops_owner_updated__tx(b, NOP_CONT, msg_st->st);

    if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
        DEBUG_CAPOPS("%s: got FLOUNDER_ERR_TX_BUSY; requeueing msg.\n", __FUNCTION__);
        struct intermon_state *inter_st = (struct intermon_state *)b->st;
        // requeue send request at front and return
        err = intermon_enqueue_send_at_front(b, &inter_st->queue, b->waitset,
                                             (struct msg_queue_elem *)e);
        GOTO_IF_ERR(err, handle_err);
        // message is still queued; keep msg_st alive for the retry
        return;
    }

handle_err:
    if (err_is_fail(err)) {
        USER_PANIC_ERR(err, "failed to send owner_updated message");
    }
    free(msg_st);
}
792
793static errval_t
794owner_updated(coreid_t owner, genvaddr_t st)
795{
796    errval_t err;
797    struct owner_updated_msg_st *msg_st = calloc(1, sizeof(struct owner_updated_msg_st));
798    if (!msg_st) {
799        return LIB_ERR_MALLOC_FAIL;
800    }
801    msg_st->queue_elem.cont = owner_updated_send_cont;
802    msg_st->st = st;
803
804    err = capsend_target(owner, (struct msg_queue_elem*)msg_st);
805    if (err_is_fail(err)) {
806        free(msg_st);
807    }
808
809    return err;
810}
811
812/*
813 * Receive handlers {{{2
814 */
815
816void
817owner_updated__rx_handler(struct intermon_binding *b, genvaddr_t st)
818{
819    lvaddr_t lst = (lvaddr_t)st;
820    struct update_owner_broadcast_st *uo_bc_st = (struct update_owner_broadcast_st*)lst;
821    if (!capsend_handle_mc_reply(&uo_bc_st->bc)) {
822        // broadcast is not complete
823        return;
824    }
825    struct event_closure *cl = &uo_bc_st->completion_continuation;
826    cl->handler(cl->arg);
827    free(uo_bc_st);
828}
829
// State for a deferred cap_destroy retry with exponential backoff, used when
// a cap received in update_owner__rx_handler is still locked.
struct delayed_cleanup_st {
    struct deferred_event d;  // deferred event used to schedule the retry
    struct event_closure ev;  // closure invoking free_owner_rx_cap on this state
    struct capref capref;     // the cap still awaiting destruction
    delayus_t delay;          // current backoff delay in microseconds
};
836
// Schedule st->ev to run after st->delay on the default waitset.  If the
// deferred event cannot be registered, the cap is leaked (with a log entry)
// and the state is freed.
static void defer_free_owner_rx_cap(struct delayed_cleanup_st *st)
{
    errval_t err;
    deferred_event_init(&st->d);
    err = deferred_event_register(&st->d, get_default_waitset(), st->delay, st->ev);
    if (err_is_fail(err)) {
        DEBUG_ERR(err, "unable to register deferred event, leaking cap");
        free(st);
    }
}
847
// Deferred-event callback: retry destroying the cap.  If it is still locked,
// double the delay and reschedule; otherwise free the retry state.
static void free_owner_rx_cap(void *st_)
{
    errval_t err;
    struct delayed_cleanup_st *st = st_;
    err = cap_destroy(st->capref);
    if (err_no(err) == SYS_ERR_CAP_LOCKED) {
        // exponential backoff
        st->delay *= 2;
        defer_free_owner_rx_cap(st);
        return;
    }
    PANIC_IF_ERR(err, "cap cleanup after update_owner_rx");
    free(st);
}
862
// Receive handler for an update_owner broadcast: if a local copy of the cap
// exists, record the sender as the new owner, then acknowledge.  The
// temporary local copy is destroyed immediately, or — if still locked by an
// in-flight operation — via a deferred retry with exponential backoff.
void
update_owner__rx_handler(struct intermon_binding *b, intermon_caprep_t caprep, genvaddr_t st)
{
    errval_t err;
    struct intermon_state *inter_st = (struct intermon_state*)b->st;
    coreid_t from = inter_st->core_id;
    struct capref capref;
    struct capability cap;
    caprep_to_capability(&caprep, &cap);

    err = slot_alloc(&capref);
    if (err_is_fail(err)) {
        USER_PANIC_ERR(err, "failed to allocate slot for owner update");
    }

    // copy succeeds iff a local copy exists; then mark the sender as owner
    err = monitor_copy_if_exists(&cap, capref);
    if (err_is_ok(err)) {
        err = monitor_set_cap_owner(cap_root, get_cap_addr(capref),
                                    get_cap_level(capref), from);
    }
    if (err_no(err) == SYS_ERR_CAP_NOT_FOUND) {
        // no local copy: nothing to update, just acknowledge
        err = SYS_ERR_OK;
        slot_free(capref);
        goto reply;
    }

    if (err_is_fail(err)) {
        USER_PANIC_ERR(err, "failed to update cap ownership");
    }

    err = cap_destroy(capref);
    if (err_no(err) == SYS_ERR_CAP_LOCKED) {
        // ownership updates still in flight, delete cap later
        struct delayed_cleanup_st *dst = malloc(sizeof(*dst));
        assert(dst);
        dst->capref = capref;
        dst->ev = MKCLOSURE(free_owner_rx_cap, dst);
        dst->delay = 1000; // 1ms delay
        defer_free_owner_rx_cap(dst);
    }

reply:
    // acknowledge the ownership update to the new owner
    err = owner_updated(from, st);
    if (err_is_fail(err)) {
        USER_PANIC_ERR(err, "failed to send ownership update response");
    }
}
910
911/*
912 * Send to all relations of cap {{{1
913 */
914
915errval_t
916capsend_copies(struct capability *cap,
917            capsend_send_fn send_fn,
918            struct capsend_mc_st *mc_st)
919{
920    DEBUG_CAPOPS("%s: doing broadcast\n", __FUNCTION__);
921    // this is currently just a broadcast
922    return capsend_broadcast(mc_st, NULL, cap, send_fn);
923}
924
925errval_t
926capsend_relations(struct capability *cap,
927                  capsend_send_fn send_fn,
928                  struct capsend_mc_st *mc_st,
929                  struct capsend_destset *dests)
930{
931    DEBUG_CAPOPS("%s: doing broadcast\n", __FUNCTION__);
932    // this is currently just a broadcast
933    return capsend_broadcast(mc_st, dests, cap, send_fn);
934}
935