1/*
2 * Copyright (c) 2012 ETH Zurich.
3 * All rights reserved.
4 *
5 * This file is distributed under the terms in the attached LICENSE file.
6 * If you do not find this file, copies can be found by writing to:
7 * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
8 */
9
10#include <barrelfish/barrelfish.h>
11#include <barrelfish_kpi/distcaps.h>
12#include <if/intermon_defs.h>
13#include "monitor.h"
14#include "capops.h"
15#include "caplock.h"
16#include "capsend.h"
17#include "internal.h"
18
/// Per-RPC bookkeeping for an in-flight cap copy; defined further down.
struct cap_copy_rpc_st;

/**
 * \brief Signature of the internal handlers that consume a copy result.
 *
 * Arguments are, in order: the result status, the cap address, level and
 * slot carried by the result, and the rpc state the result belongs to.
 */
typedef void (*copy_result_recv_fn_t)(errval_t, capaddr_t, uint8_t, cslot_t,
                                      struct cap_copy_rpc_st*);
/// Result handler installed when this core is the origin of the copy.
static void
recv_copy_result__src(errval_t status, capaddr_t capaddr, uint8_t level,
                      cslot_t slot, struct cap_copy_rpc_st *rpc_st);
/// Result handler installed when this core forwards a copy for another core.
static void
recv_copy_result__fwd(errval_t status, capaddr_t capaddr, uint8_t level,
                      cslot_t slot, struct cap_copy_rpc_st *rpc_st);
29
30/*
31 * RPC state {{{1
32 */
33
/**
 * \brief State kept for one outstanding copy RPC.
 *
 * Allocated by owner_copy__enq / request_copy__enq and freed by whichever
 * recv_handler eventually runs for it.
 */
struct cap_copy_rpc_st {
    /// caller/sender st; handed back to result_handler (as a pointer) or
    /// sent back to `from` unchanged when forwarding
    genvaddr_t st;
    /// sender if acting as intermediary (my_core_id when the copy
    /// originates on this core)
    coreid_t from;
    /// cap that is being copied out
    struct capref cap;
    /// copy destination; becomes the new owner after a true give_away
    coreid_t to;
    /// handler for receiving rpc result (recv_copy_result__src or
    /// recv_copy_result__fwd)
    copy_result_recv_fn_t recv_handler;
    /// result handler if being called directly; NULL when forwarding
    copy_result_handler_t result_handler;
    /// whether the local cap should be deleted when the rpc is complete
    bool delete_after;
    /// cap was last copy on request source (only relevant if delete_after is true)
    bool is_last;
};
52
53
54/*
55 * Copy result {{{1
56 */
57
/**
 * \brief Send state struct for recv_copy_result
 *
 * Holds the arguments of one recv_copy_result message while it waits in the
 * intermon send queue.
 */
struct recv_copy_result_msg_st {
    /// queue linkage; kept as first member because this struct is cast to
    /// and from queue element pointers
    struct intermon_msg_queue_elem queue_elem;
    /// outcome of the copy being reported
    errval_t status;
    /// address, level and slot reported for the resulting cap
    capaddr_t capaddr;
    uint8_t level;
    cslot_t slot;
    /// opaque state of the core that is waiting for this result
    genvaddr_t st;
};
69
70/**
71 * \brief Intermon is ready to send recv_copy_result
72 */
73static void
74recv_copy_result_send__rdy(struct intermon_binding *b,
75                           struct intermon_msg_queue_elem *e)
76{
77    DEBUG_CAPOPS("recv_copy_result_send__rdy: %p, %p\n", b, e);
78    errval_t err;
79    struct recv_copy_result_msg_st *msg_st = (struct recv_copy_result_msg_st*)e;
80    err = intermon_capops_recv_copy_result__tx(b, NOP_CONT, msg_st->status,
81                                               msg_st->capaddr, msg_st->level,
82                                               msg_st->slot, msg_st->st);
83
84    if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
85        DEBUG_CAPOPS("%s: got FLOUNDER_ERR_TX_BUSY; requeueing msg.\n", __FUNCTION__);
86        struct intermon_state *inter_st = (struct intermon_state *)b->st;
87        // requeue send request at front and return
88        err = intermon_enqueue_send_at_front(b, &inter_st->queue, b->waitset,
89                                             (struct msg_queue_elem *)e);
90        GOTO_IF_ERR(err, handle_err);
91        return;
92    }
93
94handle_err:
95    PANIC_IF_ERR(err, "failed to send recv_copy_result");
96    free(msg_st);
97}
98
99/**
100 * \brief A recv_copy_result needs to be enqueued
101 */
102static errval_t
103recv_copy_result__enq(coreid_t dest, errval_t status, capaddr_t capaddr,
104                      uint8_t level, cslot_t slot, genvaddr_t st)
105{
106    errval_t err;
107    DEBUG_CAPOPS("recv_copy_result__enq: ->%d, %s\n", dest, err_getstring(status));
108
109    // create send state
110    struct recv_copy_result_msg_st *msg_st = calloc(1, sizeof(struct recv_copy_result_msg_st));
111    if (!msg_st) {
112        return LIB_ERR_MALLOC_FAIL;
113    }
114    msg_st->queue_elem.cont = recv_copy_result_send__rdy;
115    msg_st->status = status;
116    msg_st->capaddr = capaddr;
117    msg_st->level = level;
118    msg_st->slot = slot;
119    msg_st->st = st;
120
121    // enqueue message
122    err = capsend_target(dest, (struct msg_queue_elem*)msg_st);
123    if (err_is_fail(err)) {
124        free(msg_st);
125        return err;
126    }
127
128    return SYS_ERR_OK;
129}
130
131/*
132 * Copy from owner to dest (possibly as intermediary) {{{1
133 */
134
/**
 * \brief Send state struct for owner_copy
 *
 * Holds the arguments of one recv_copy message while it waits in the
 * intermon send queue.
 */
struct owner_copy_msg_st {
    /// queue linkage; kept as first member because this struct is cast to
    /// and from queue element pointers
    struct intermon_msg_queue_elem queue_elem;
    /// wire representation of the capability being copied
    intermon_caprep_t caprep;
    /// relation bits sent along with the cap
    uint8_t owner_relations;
    /// address of the local struct cap_copy_rpc_st for this copy
    genvaddr_t st;
};
144
145/**
146 * \brief Intermon is ready to send owner_copy
147 */
148static void
149owner_copy_send__rdy(struct intermon_binding *b,
150                     struct intermon_msg_queue_elem *e)
151{
152    DEBUG_CAPOPS("owner_copy_send__rdy: ->%p, %p\n", b, e);
153    struct owner_copy_msg_st *msg_st = (struct owner_copy_msg_st*)e;
154    struct cap_copy_rpc_st *rpc_st = (struct cap_copy_rpc_st*)((lvaddr_t)msg_st->st);
155    assert(rpc_st);
156    errval_t err;
157
158    err = intermon_capops_recv_copy__tx(b, NOP_CONT, msg_st->caprep,
159                                        msg_st->owner_relations, msg_st->st);
160
161    if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
162        DEBUG_CAPOPS("%s: got FLOUNDER_ERR_TX_BUSY; requeueing msg.\n", __FUNCTION__);
163        struct intermon_state *inter_st = (struct intermon_state *)b->st;
164        // requeue send request at front and return
165        err = intermon_enqueue_send_at_front(b, &inter_st->queue, b->waitset,
166                                             (struct msg_queue_elem *)e);
167        GOTO_IF_ERR(err, handle_err);
168        return;
169    }
170
171handle_err:
172    if (err_is_fail(err)) {
173        // send failed, report result
174        rpc_st->recv_handler(err, 0, 0, 0, rpc_st);
175    }
176
177    free(msg_st);
178    msg_st = NULL;
179}
180
181/**
182 * \brief An owner_copy is to be enqueued
183 */
184static void
185owner_copy__enq(struct capref capref, struct capability *cap, coreid_t from,
186                coreid_t dest, bool give_away,
187                copy_result_handler_t result_handler, genvaddr_t st)
188{
189    DEBUG_CAPOPS("owner_copy__enq: %d->%d, give_away=%d\n", from, dest, give_away);
190    errval_t err = SYS_ERR_OK, err2;
191
192    // create new rpc state to associate return message
193    struct cap_copy_rpc_st *rpc_st;
194    err = calloce(1, sizeof(struct cap_copy_rpc_st), &rpc_st);
195    if (err_is_fail(err)) {
196        // rpc_st hasn't been set up so we have to do failure handling manually
197        if (from == my_core_id) {
198            result_handler(err, 0, 0, 0, (void*)(lvaddr_t)st);
199        }
200        else {
201            err2 = recv_copy_result__enq(from, err, 0, 0, 0, st);
202            PANIC_IF_ERR2(err2, "failed to send recv_copy_result",
203                          err, "allocating rpc forwarding state on owner");
204        }
205        return;
206    }
207    rpc_st->st = st;
208    rpc_st->from = from;
209    rpc_st->cap = capref;
210    rpc_st->to = dest;
211    rpc_st->delete_after = give_away;
212    rpc_st->is_last = false;
213    rpc_st->recv_handler = (from == my_core_id ?
214                            recv_copy_result__src :
215                            recv_copy_result__fwd);
216    rpc_st->result_handler = result_handler;
217
218    uint8_t remote_relations = RRELS_COPY_BIT;
219
220    // XXX: short-circuit out if cap we're sending is null cap
221    if (cap->type == ObjType_Null) {
222        rpc_st->cap = NULL_CAP;
223        goto null_shortcircuit;
224    }
225
226    // check for special handling of giving away last copy
227    if (rpc_st->delete_after) {
228        uint8_t relations = 0;
229        err = monitor_cap_has_relations(capref, RRELS_COPY_BIT, &relations);
230        PANIC_IF_ERR(err, "checking for local copies of give_away cap");
231        rpc_st->is_last = !(relations & RRELS_COPY_BIT);
232        remote_relations = relations;
233    }
234    if (rpc_st->is_last) {
235        uint8_t relations = 0;
236        err = monitor_remote_relations(capref, 0, 0, &relations);
237        if (err_is_fail(err)) {
238            DEBUG_ERR(err, "checking for remote copies of give_away cap");
239            rpc_st->is_last = false;
240            remote_relations |= RRELS_COPY_BIT;
241        }
242        else {
243            rpc_st->is_last = !(remote_relations & RRELS_COPY_BIT);
244            remote_relations |= relations;
245        }
246    }
247
248    /*
249     * Here, we're handling the give_away parameter. If set, the copy operation
250     * is also responsible for deleting the input cap. In the case when the
251     * input cap has no other copies, this allows the copy operation to
252     * optimize transfer across cores by simultaneously passing over ownership
253     * (this is called a "true" give away below).
254     */
255
256    // unless we're performing a "true" give_away, set the remote relations
257    // copy bit
258    if (!(rpc_st->delete_after && rpc_st->is_last)) {
259        err = monitor_remote_relations(capref, RRELS_COPY_BIT, RRELS_COPY_BIT, NULL);
260        GOTO_IF_ERR(err, err_cont);
261    }
262
263    // if we're performing a "true" give_away, lock the cap locally as the
264    // intermediate state will be inconsistent
265    if (rpc_st->delete_after && rpc_st->is_last) {
266        struct domcapref domcapref = get_cap_domref(capref);
267        err = monitor_lock_cap(domcapref.croot, domcapref.cptr,
268                               domcapref.level);
269        // callers of owner_copy should already check cap lock state
270        PANIC_IF_ERR(err, "locking cap for true give_away failed");
271        assert(!(remote_relations & RRELS_COPY_BIT));
272    }
273
274    // create send state
275    struct owner_copy_msg_st *msg_st;
276null_shortcircuit:
277    err = calloce(1, sizeof(struct owner_copy_msg_st), &msg_st);
278    GOTO_IF_ERR(err, err_cont);
279
280    msg_st->queue_elem.cont = owner_copy_send__rdy;
281    capability_to_caprep(cap, &msg_st->caprep);
282    msg_st->owner_relations = remote_relations;
283    msg_st->st = (lvaddr_t)rpc_st;
284
285    // enqueue message
286    err = capsend_target(dest, (struct msg_queue_elem*)msg_st);
287    GOTO_IF_ERR(err, free_msg_st);
288
289    return;
290
291free_msg_st:
292    free(msg_st);
293
294err_cont:
295    rpc_st->recv_handler(err, 0, 0, 0, rpc_st);
296}
297
298/*
299 * Copy request from non-owner to owner {{{1
300 */
301
/**
 * \brief Send state struct for request_copy
 *
 * Holds the arguments of one request_copy message while it waits in the
 * intermon send queue.
 */
struct request_copy_msg_st {
    /// queue linkage; kept as first member because this struct is cast to
    /// and from queue element pointers
    struct intermon_msg_queue_elem queue_elem;
    /// wire representation of the capability to be copied
    intermon_caprep_t caprep;
    /// core that should receive the copy
    coreid_t dest;
    /// local rpc state; its address is sent along as the message's st
    struct cap_copy_rpc_st *st;
};
311
312/**
313 * \brief Intermon is ready to send request_copy
314 */
315static void
316request_copy_send__rdy(struct intermon_binding *b,
317                       struct intermon_msg_queue_elem *e)
318{
319    errval_t err;
320    struct request_copy_msg_st *msg_st = (struct request_copy_msg_st*)e;
321    err = intermon_capops_request_copy__tx(b, NOP_CONT, msg_st->dest,
322                                           msg_st->caprep,
323                                           (lvaddr_t)msg_st->st);
324
325    if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
326        DEBUG_CAPOPS("%s: got FLOUNDER_ERR_TX_BUSY; requeueing msg.\n", __FUNCTION__);
327        struct intermon_state *inter_st = (struct intermon_state *)b->st;
328        // requeue send request at front and return
329        err = intermon_enqueue_send_at_front(b, &inter_st->queue, b->waitset,
330                                             (struct msg_queue_elem *)e);
331        GOTO_IF_ERR(err, handle_err);
332        return;
333    }
334
335handle_err:
336    if (err_is_fail(err)) {
337        assert(msg_st->st);
338        struct cap_copy_rpc_st *rpc_st = (struct cap_copy_rpc_st*)msg_st->st;
339        rpc_st->recv_handler(err, 0, 0, 0, rpc_st);
340    }
341
342    free(msg_st);
343}
344
345/**
346 * \brief A copy request is to be enqueued
347 */
348static void
349request_copy__enq(struct capref capref, coreid_t dest, bool give_away,
350                  copy_result_handler_t result_handler, void *st)
351{
352    errval_t err = SYS_ERR_OK;
353    struct capability cap;
354    err = monitor_cap_identify(capref, &cap);
355    GOTO_IF_ERR(err, cont);
356
357    // cap is foreign so it must be a type that needs "locality" on a particular core
358    assert(distcap_needs_locality(cap.type));
359
360    // create new rpc state to associate return message
361    struct cap_copy_rpc_st *rpc_st;
362    err = calloce(1, sizeof(struct cap_copy_rpc_st), &rpc_st);
363    if (err_is_fail(err)) {
364        result_handler(err, 0, 0, 0, st);
365        return;
366    }
367    rpc_st->st = (lvaddr_t)st;
368    rpc_st->from = my_core_id;
369    rpc_st->delete_after = give_away;
370    rpc_st->is_last = false;
371    rpc_st->recv_handler = recv_copy_result__src;
372    rpc_st->result_handler = result_handler;
373
374    // create send state
375    struct request_copy_msg_st *msg_st;
376    err = calloce(1, sizeof(struct request_copy_msg_st), &msg_st);
377    GOTO_IF_ERR(err, free_rpc_st);
378    msg_st->queue_elem.cont = request_copy_send__rdy;
379    msg_st->dest = dest;
380    capability_to_caprep(&cap, &msg_st->caprep);
381    msg_st->st = rpc_st;
382
383    // enqueue message
384    err = capsend_owner(get_cap_domref(capref), (struct msg_queue_elem*)msg_st);
385    GOTO_IF_ERR(err, free_msg_st);
386
387    return;
388
389free_msg_st:
390    free(msg_st);
391
392free_rpc_st:
393    free(rpc_st);
394
395cont:
396    result_handler((err), 0, 0, 0, st);
397}
398
399/*
400 * Receive copy result {{{1
401 */
402
403static void
404recv_copy_result__fwd(errval_t status, capaddr_t capaddr, uint8_t level,
405                      cslot_t slot, struct cap_copy_rpc_st *rpc_st)
406{
407    // acting as intermediary, forward to origin
408    errval_t err;
409
410    assert(!rpc_st->result_handler); // must not have a result handler
411    assert(rpc_st->delete_after); // also temp cap should be del'd
412    assert(!rpc_st->is_last); // and must have other local copies
413
414    err = cap_destroy(rpc_st->cap);
415    PANIC_IF_ERR(err, "destroying temp cap for f-to-f copy");
416    rpc_st->cap = NULL_CAP;
417    err = recv_copy_result__enq(rpc_st->from, status, capaddr, level, slot,
418                                rpc_st->st);
419    PANIC_IF_ERR2(err, "failed to send recv_copy_result",
420                  status, "sending copy from owner");
421
422    free(rpc_st);
423}
424
425static void
426recv_copy_result__src(errval_t status, capaddr_t capaddr, uint8_t level,
427                      cslot_t slot, struct cap_copy_rpc_st *rpc_st)
428{
429    DEBUG_CAPOPS("recv_copy_result__src: %s\n", err_getstring(status));
430    // origin of copy
431    errval_t err;
432
433    if (err_is_ok(status) && !capref_is_null(rpc_st->cap)) {
434        if (rpc_st->delete_after) {
435            DEBUG_CAPOPS("deleting our copy\n");
436            if (rpc_st->is_last) {
437                DEBUG_CAPOPS("our copy is last\n");
438                // a give_away was performed, need to unlock and set new owner
439                err = monitor_set_cap_owner(cap_root,
440                                            get_cap_addr(rpc_st->cap),
441                                            get_cap_level(rpc_st->cap),
442                                            rpc_st->to);
443                PANIC_IF_ERR(err, "updating owner after true"
444                             " give_away failed");
445                caplock_unlock(get_cap_domref(rpc_st->cap));
446            }
447            // this should always succeed either because there are local
448            // copies or because the cap is now foreign
449            DEBUG_CAPOPS("%s st=%p, cap = %#"PRIxCADDR"\n", __FUNCTION__,
450                    rpc_st, get_cap_addr(rpc_st->cap));
451            err = cap_destroy(rpc_st->cap);
452            PANIC_IF_ERR(err, "cap_destroy after give_away failed (st=%p)", rpc_st);
453        } else if (err_is_ok(status)) {
454            debug_printf("copy succeeded but we're leaving monitor's copy alone?\n");
455        }
456    }
457    // call result handler
458    DEBUG_CAPOPS("result_handler: %p\n", rpc_st->result_handler);
459    if (rpc_st->result_handler) {
460        rpc_st->result_handler(status, capaddr, level, slot, (void*)((lvaddr_t)rpc_st->st));
461    }
462
463    free(rpc_st);
464}
465
466/**
467 * \brief Result from sending copy to dest has been received
468 */
469void
470recv_copy_result__rx(struct intermon_binding *b, errval_t status,
471                     capaddr_t capaddr, uint8_t level, cslot_t slot,
472                     genvaddr_t st)
473{
474    assert(st);
475    DEBUG_CAPOPS("recv_copy_result__rx: %p, st=%p, %s\n", b, (void*)st,
476            err_getstring(status));
477    struct cap_copy_rpc_st *rpc_st = (struct cap_copy_rpc_st*)((lvaddr_t)st);
478    rpc_st->recv_handler(status, capaddr, level, slot, rpc_st);
479}
480
481/*
 * Receive cap copy {{{1
483 */
484
485/**
486 * \brief A cap has been received
487 */
488void
489recv_copy__rx(struct intermon_binding *b, intermon_caprep_t caprep,
490              uint8_t owner_relations, genvaddr_t st)
491{
492    DEBUG_CAPOPS("recv_copy__rx: %p, %"PRIu8"\n", b, owner_relations);
493    debug_print_caprep(&caprep);
494    errval_t err, err2;
495    struct intermon_state *inter_st = (struct intermon_state*)b->st;
496    coreid_t from = inter_st->core_id;
497    assert(from != my_core_id);
498    struct capref dest = NULL_CAP;
499    struct capability cap;
500
501    caprep_to_capability(&caprep, &cap);
502
503    if (cap.type == ObjType_Null) {
504        // short-circuit null-cap transfer
505        err = SYS_ERR_OK;
506        goto zero_slot;
507    }
508
509    // get a slot to put the cap
510    err = slot_alloc(&dest);
511    GOTO_IF_ERR(err, zero_slot);
512
513    coreid_t owner;
514    if (distcap_needs_locality(cap.type)) {
515        if (owner_relations & RRELS_COPY_BIT) {
516            // if cap needs locality and ownership is not being transferred,
517            // message source is owner
518            owner = from;
519        }
520        else {
521            // if there are no remote copies, ownership is being transferred to
522            // this core
523            owner = my_core_id;
524        }
525    }
526    else {
527        // otherwise every core is owner
528        owner = my_core_id;
529    }
530
531    // create a cap from the cap data and owner. may fail if given owner does
532    // not match owner of existing copies
533    err = monitor_cap_create(dest, &cap, owner);
534    GOTO_IF_ERR(err, free_slot);
535
536    // if we are now owner we need to set up the relations
537    if (owner == my_core_id && owner_relations) {
538        err = monitor_remote_relations(dest, owner_relations, ~(uint8_t)0, NULL);
539        PANIC_IF_ERR(err, "setting remote rels on copy recv with ownership");
540    }
541
542    goto send_result;
543
544free_slot:
545    err2 = slot_free(dest);
546    if (err_is_fail(err2)) {
547        DEBUG_ERR(err2, "slot_free failed, slot will leak");
548    }
549
550zero_slot:
551    dest = NULL_CAP;
552
553send_result:
554    err2 = recv_copy_result__enq(from, err, get_cnode_addr(dest),
555                                 get_cnode_level(dest), dest.slot, st);
556    if (err_is_fail(err2)) {
557        USER_PANIC_ERR(err2, "recv_copy_result enque failed, cap will leak");
558    }
559}
560
561/**
562 * \brief A copy request has been received
563 */
564void
565request_copy__rx(struct intermon_binding *b, coreid_t dest,
566                 intermon_caprep_t caprep, genvaddr_t st)
567{
568    errval_t err, err2;
569    struct intermon_state *inter_st = (struct intermon_state*)b->st;
570    coreid_t from = inter_st->core_id;
571    assert(from != my_core_id);
572    struct capref capref = NULL_CAP;
573    memset(&capref, 0, sizeof(capref));
574    struct capability cap;
575    caprep_to_capability(&caprep, &cap);
576    distcap_state_t state;
577
578    // copy requests should never happen for types that don't need locality,
579    // since every core is owner and can copy directly
580    assert(distcap_needs_locality(cap.type));
581
582    // find and validate cap
583    err = slot_alloc(&capref);
584    GOTO_IF_ERR(err, send_err);
585    err = monitor_copy_if_exists(&cap, capref);
586    GOTO_IF_ERR(err, free_slot);
587    err = cap_get_state(capref, &state);
588    GOTO_IF_ERR(err, destroy_cap);
589    if (distcap_state_is_foreign(state)) {
590        err = MON_ERR_CAP_FOREIGN;
591        goto destroy_cap;
592    }
593    if (distcap_state_is_busy(state)) {
594        err = MON_ERR_REMOTE_CAP_RETRY;
595        goto destroy_cap;
596    }
597
598    if (dest == my_core_id) {
599        // tried to send copy to owning core, success!
600        err = recv_copy_result__enq(from, SYS_ERR_OK, get_cnode_addr(capref),
601                                    get_cnode_level(capref), capref.slot,
602                                    st);
603        PANIC_IF_ERR(err, "sending result to request_copy sender");
604    }
605    else {
606        // forward copy to destination core
607        DEBUG_CAPOPS("%s: owner_copy__enq(st=%p) capref = %#"PRIxCADDR"\n",
608            __FUNCTION__, (void *)st, get_cap_addr(capref));
609        owner_copy__enq(capref, &cap, from, dest, true, NULL, st);
610    }
611
612    return;
613
614destroy_cap:
615    err2 = cap_delete(capref);
616    PANIC_IF_ERR2(err2, "deleting received cap",
617                  err, "handling copy request");
618
619free_slot:
620    err2 = slot_free(capref);
621    PANIC_IF_ERR2(err2, "freeing slot for cap recv",
622                  err, "handling copy request");
623
624send_err:
625    err2 = recv_copy_result__enq(from, err, 0, 0, 0, st);
626    PANIC_IF_ERR2(err2, "sending error to request_copy sender",
627                  err, "handling copy request");
628}
629
630/*
631 * Copy operation {{{1
632 */
633
634void
635capops_copy(struct capref capref, coreid_t dest, bool give_away,
636            copy_result_handler_t result_handler, void *st)
637{
638#if defined(DEBUG_MONITOR_CAPOPS)
639    char buf[256];
640    debug_print_capref(buf, 256, capref);
641    DEBUG_CAPOPS("capops_copy: dest=%d, give_away=%d, capref=%s\n", dest, give_away, buf);
642#endif
643    errval_t err, err2;
644    struct capability cap;
645    distcap_state_t state;
646
647    // short-circuit out with null capref
648    if (capref_is_null(capref)) {
649        memset(&cap, sizeof(cap), 0);
650        cap.type = ObjType_Null;
651        DEBUG_CAPOPS("%s: owner_copy__enq(st=%p) capref = %#"PRIxCADDR"\n",
652            __FUNCTION__, st, get_cap_addr(capref));
653        owner_copy__enq(capref, &cap, my_core_id, dest, give_away,
654                result_handler, (lvaddr_t)st);
655        return;
656    }
657
658    // check that cap is valid
659    err = cap_get_state(capref, &state);
660    GOTO_IF_ERR(err, err_cont);
661    if (distcap_state_is_busy(state)) {
662        err = MON_ERR_REMOTE_CAP_RETRY;
663        goto err_cont;
664    }
665
666    // get internal cap representation
667    err = monitor_cap_identify(capref, &cap);
668    GOTO_IF_ERR(err, err_cont);
669
670    if (dest == my_core_id && !(cap.type == ObjType_Null)) {
671        // tried to send to self, just create a local copy
672        struct capref res;
673        err = slot_alloc(&res);
674        GOTO_IF_ERR(err, err_cont);
675
676        err = cap_copy(capref, res);
677        if (err_is_fail(err)) {
678            err2 = slot_free(res);
679            PANIC_IF_ERR(err2, "while freeing slot due to local copy failure");
680            goto err_cont;
681        }
682
683        if (give_away) {
684            err2 = cap_delete(capref);
685            DEBUG_IF_ERR(err2, "delete for give away failed, cap will leak");
686        }
687
688        result_handler(err, get_cnode_addr(res),
689                       get_cnode_level(res), res.slot, st);
690    } else if (distcap_state_is_foreign(state) && distcap_needs_locality(cap.type)) {
691        DEBUG_CAPOPS("capops_copy: sending copy from non-owner, forward request to owner\n");
692
693        // sending copy from non-owner, send copy request to owner
694        DEBUG_CAPOPS("%s: request_copy__enq(st=%p) capref = %#"PRIxCADDR"\n",
695            __FUNCTION__, st, get_cap_addr(capref));
696        request_copy__enq(capref, dest, give_away, result_handler, st);
697    } else {
698        DEBUG_CAPOPS("capops_copy: sending copy from here/owner\n");
699
700        // sending copy from here/owner
701        DEBUG_CAPOPS("%s: owner_copy__enq(st=%p) capref = %#"PRIxCADDR"\n",
702            __FUNCTION__, st, get_cap_addr(capref));
703        owner_copy__enq(capref, &cap, my_core_id, dest, give_away,
704                        result_handler, (lvaddr_t)st);
705    }
706
707    return;
708
709err_cont:
710    result_handler(err, 0, 0, 0, st);
711}
712