1/*
2 * Copyright (c) 2012, 2016 ETH Zurich.
3 * All rights reserved.
4 *
5 * This file is distributed under the terms in the attached LICENSE file.
6 * If you do not find this file, copies can be found by writing to:
7 * ETH Zurich D-INFK, Universitaetstrasse 6, CH-8092 Zurich. Attn: Systems Group.
8 */
9
10#include <barrelfish/barrelfish.h>
11#include <barrelfish/cap_predicates.h>
12#include "monitor.h"
13#include "capops.h"
14#include "capsend.h"
15#include "caplock.h"
16#include "internal.h"
17#include "delete_int.h"
18#include "dom_invocations.h"
19#include "monitor_debug.h"
20
21
/*
 * Singly-linked list (via revoke_slave_st.next) of slave-side revocation
 * states: revoke_mark__rx() appends at the tail, revoke_commit__rx() searches
 * it from the head, and remove_slave_from_list() unlinks entries.
 */
struct revoke_slave_st *slaves_head = 0, *slaves_tail = 0;
23
/*
 * State of one revocation on the core that initiated it (the "master").
 * Allocated in capops_revoke() and freed in revoke_result__rx().
 */
struct revoke_master_st {
    // node handed to delete_queue_wait() to wait for the delete steps
    struct delete_queue_node del_qn;
    // capability being revoked, as passed to capops_revoke()
    struct domcapref cap;
    // representation of `cap` filled in by monitor_domains_cap_identify()
    struct capability rawcap;
    // multicast state for capsend_relations(); the send callbacks recover the
    // enclosing revoke_master_st from a pointer to this member via offsetof()
    struct capsend_mc_st revoke_mc_st;
    // destination set passed to capsend_relations()
    struct capsend_destset dests;
    // caller's completion callback, invoked from revoke_result__rx()
    revoke_result_handler_t result_handler;
    // number of agreement requests sent to subscribers and not yet answered
    size_t pending_agreements;
    // opaque argument handed back to result_handler
    void *st;
    // local_fin: set in revoke_master_steps__fin(); remote_fin: set in
    // revoke_done__rx() / revoke_no_remote(). The result is reported once
    // both are true.
    bool local_fin, remote_fin;
};
35
/*
 * State of one revocation on a core that received a revoke_mark message
 * (a "slave"). Allocated in revoke_mark__rx() and freed in
 * revoke_done__send().
 */
struct revoke_slave_st {
    // Must remain the first member: pointers to this struct are cast to
    // struct msg_queue_elem* for capsend_target() and back from
    // struct intermon_msg_queue_elem* in the send continuations.
    struct intermon_msg_queue_elem im_qn;
    // node handed to delete_queue_wait() to wait for the delete steps
    struct delete_queue_node del_qn;
    // capability being revoked, decoded from the received caprep
    struct capability rawcap;
    // NOTE(review): not referenced by the code visible in this file
    struct capref cap;
    // core the revoke_mark message came from; replies are sent there
    coreid_t from;
    // opaque handle received with revoke_mark; echoed back in revoke_ready
    // and revoke_done, and used to match the revoke_commit message
    genvaddr_t st;
    // NOTE(review): not referenced by the code visible in this file
    errval_t status;
    // number of agreement requests sent to subscribers and not yet answered
    size_t pending_agreements;
    // next entry in the slaves_head/slaves_tail list
    struct revoke_slave_st *next;
    // value of revoke_slave_seqnum when the mark was received (trace argument)
    uint64_t seqnum;
};
48
/*
 * Forward declarations of the file-local protocol steps, so that they can be
 * referenced before their definitions further down in this file.
 */
static void revoke_result__rx(errval_t result,
                              struct revoke_master_st *st,
                              bool locked);
static void revoke_retrieve__rx(errval_t result, void *st_);
static void revoke_local(struct revoke_master_st *st);
static void revoke_no_remote(struct revoke_master_st *st);
static errval_t revoke_mark__send(struct intermon_binding *b,
                                  intermon_caprep_t *caprep,
                                  struct capsend_mc_st *mc_st);
static void revoke_ready__send(struct intermon_binding *b,
                               struct intermon_msg_queue_elem *e);
static errval_t revoke_commit__send(struct intermon_binding *b,
                                    intermon_caprep_t *caprep,
                                    struct capsend_mc_st *mc_st);
static void revoke_slave_steps__fin(void *st);
static void revoke_done__send(struct intermon_binding *b,
                              struct intermon_msg_queue_elem *e);
static void revoke_master_steps__fin(void *st);
67//static errval_t capops_revoke_subscribe()
68
/*
 * Incremented at the start of every capops_revoke() call; in the code visible
 * here it is only used as a trace argument.
 */
static uint64_t revoke_seqnum = 0;
70
71
/*
 * One subscription to revocations covering an address range, created by
 * capops_revoke_register_subscribe().
 */
struct revoke_register_st
{
    // Must remain the first member: cap_revoke_response() casts a
    // struct monitor_client_req* found in the binding's request list back to
    // struct revoke_register_st*.
    struct monitor_client_req req;
    // Links for the revoke_subs list; prev_next points at whichever pointer
    // currently references this node (the list head or the predecessor's
    // `next`).
    struct revoke_register_st *next, **prev_next;
    // binding of the subscriber that is asked for agreement
    struct monitor_binding *subscriber;
    // first address of the subscribed range
    genpaddr_t base;
    // last address of the subscribed range (base + size - 1)
    genpaddr_t limit;
    // NOTE(review): not referenced by the code visible in this file
    bool acked;
    // set once an agreement request has been sent for this subscription
    bool notified;
    // continuation run by cap_revoke_response(): handler is called with this
    // struct, arg holds the master or slave revocation state
    struct event_closure cont;
};
83
/* Head of the list of registered revoke subscriptions (NULL when empty). */
struct revoke_register_st *revoke_subs = NULL;
85
86static inline void revoke_subs_add(struct revoke_register_st  *rvk_st)
87{
88    rvk_st->next = revoke_subs;
89
90    if (revoke_subs) {
91        revoke_subs->prev_next = &rvk_st->next;
92    }
93    rvk_st->prev_next = &revoke_subs;
94    revoke_subs = rvk_st;
95}
96
97static inline void revoke_subs_remove(struct revoke_register_st  *rvk_st)
98{
99    if (rvk_st->next) {
100        rvk_st->next->prev_next = rvk_st->prev_next;
101    }
102
103    *rvk_st->prev_next = rvk_st->next;
104}
105
106static inline struct revoke_register_st *revoke_subs_lookup_by_id(uintptr_t id)
107{
108    struct revoke_register_st *rvk_st = revoke_subs;
109    while(rvk_st) {
110        if (rvk_st->req.reqid == id) {
111            return rvk_st;
112        }
113        rvk_st = rvk_st->next;
114    }
115}
116
117static struct revoke_register_st *
118revoke_subs_remove_by_range(genpaddr_t base, genpaddr_t limit,
119                            struct revoke_register_st *curr)
120{
121    if (curr == NULL) {
122        curr = revoke_subs;
123    } else {
124        curr = curr->next;
125    }
126
127    struct revoke_register_st *rvk_st = curr;
128    while(rvk_st) {
129        if (rvk_st->base <= base && rvk_st->limit >= limit) {
130            assert(rvk_st->prev_next);
131            *rvk_st->prev_next = rvk_st->next;
132            return rvk_st;
133        }
134        rvk_st = rvk_st->next;
135    }
136    return NULL;
137}
138
139
140static void cap_revoke_response(struct monitor_binding *sub, uintptr_t id)
141{
142    struct monitor_state *mst = sub->st;
143
144    if (mst->reqs == NULL) {
145        DEBUG_CAPOPS("Received message but no outstanding requests???");
146        assert(mst->reqs == NULL);
147    }
148
149    struct monitor_client_req *reqs = mst->reqs;
150    struct monitor_client_req **prev_next = &mst->reqs;
151    while(reqs) {
152        if (reqs->reqid == id) {
153            *prev_next = reqs->next;
154            break;
155        }
156        prev_next = &reqs->next;
157        reqs = reqs->next;
158    }
159
160    struct revoke_register_st *rvk_st = (struct revoke_register_st *)reqs;
161    rvk_st->cont.handler(rvk_st);
162}
163
164errval_t capops_revoke_register_subscribe(struct capability *cap, uintptr_t id,
165                                          struct monitor_binding *subscriber)
166{
167    assert(cap);
168    assert(subscriber);
169
170    struct revoke_register_st *rvk_st = calloc(1, sizeof(*rvk_st));
171    if (rvk_st == NULL) {
172        return LIB_ERR_MALLOC_FAIL;
173    }
174
175    rvk_st->req.reqid = id;
176    rvk_st->subscriber = subscriber;
177
178    subscriber->rx_vtbl.cap_revoke_response = cap_revoke_response;
179
180    /* set the ranges */
181    rvk_st->base = get_address(cap);
182    rvk_st->limit = rvk_st->base + get_size(cap) - 1;
183
184    DEBUG_CAPOPS("%s:%u: cap=[%" PRIxGENPADDR "..%" PRIxGENPADDR "], id=%"
185                  PRIuPTR " sub=%p\n", __FUNCTION__, __LINE__,
186                 rvk_st->base, rvk_st->limit, id, subscriber);
187
188     /* add it to the subscribers */
189    revoke_subs_add(rvk_st);
190
191    return SYS_ERR_OK;
192}
193
194
195
196static void
197revoke_agreement_request_cont(struct monitor_binding *b,
198                              struct revoke_register_st *st)
199{
200
201    errval_t err = b->tx_vtbl.cap_revoke_request(b, NOP_CONT, 0, st->req.reqid);
202    assert(err_is_ok(err));
203}
204
205static void revoke_master_cont(void *arg)
206{
207    errval_t err;
208    struct revoke_register_st *st = arg;
209    struct revoke_master_st *rvk_st = st->cont.arg;
210
211    free(arg);
212
213    rvk_st->pending_agreements--;
214    if (rvk_st->pending_agreements) {
215        return;
216    }
217
218    /* continue with the protocol */
219    DEBUG_CAPOPS("%s ## revocation: commit phase\n", __FUNCTION__);
220    err = capsend_relations(&rvk_st->rawcap, revoke_commit__send,
221                            &rvk_st->revoke_mc_st, &rvk_st->dests);
222    PANIC_IF_ERR(err, "enqueing revoke_commit multicast");
223
224    delete_steps_resume();
225
226    struct event_closure steps_fin_cont
227            = MKCLOSURE(revoke_master_steps__fin, rvk_st);
228    delete_queue_wait(&rvk_st->del_qn, steps_fin_cont);
229
230}
231
232static void revoke_slave_cont(void *arg)
233{
234    errval_t err;
235    struct revoke_register_st *st = arg;
236    struct revoke_slave_st *rvk_st = st->cont.arg;
237
238    free(arg);
239
240    rvk_st->pending_agreements--;
241    if (rvk_st->pending_agreements) {
242        return;
243    }
244
245    DEBUG_CAPOPS("### %s:%u continue with the protocol\n",
246                 __FUNCTION__, __LINE__);
247
248    /* continue with the protocol */
249    rvk_st->im_qn.cont = revoke_ready__send;
250    err = capsend_target(rvk_st->from, (struct msg_queue_elem*)rvk_st);
251    PANIC_IF_ERR(err, "enqueing revoke_ready");
252}
253
254/*
255 * TODO: the following two functions essentially do the same thing, but with
256 *       a different revoke state (slave or master)
257 */
258
259static bool capops_revoke_requires_agreement_local(struct revoke_master_st *rvk_st)
260{
261
262    if (revoke_subs == NULL) {
263        return false;
264    }
265
266    genpaddr_t base = get_address(&rvk_st->rawcap);
267    gensize_t limit = base + get_size(&rvk_st->rawcap) - 1;
268
269    DEBUG_CAPOPS("%s:%u: cap=[%" PRIxGENPADDR "..%" PRIxGENPADDR "]\n",
270                 __FUNCTION__, __LINE__, base, limit);
271
272    struct revoke_register_st *st = revoke_subs_remove_by_range(base, limit, NULL);
273    if (st == NULL) {
274        DEBUG_CAPOPS("### %s:%u no matching request\n",
275                     __FUNCTION__, __LINE__);
276        return false;
277    }
278
279    while(st != NULL) {
280        if (st->notified) {
281            /* don't notify two times, shouldn't actually happen */
282            st = revoke_subs_remove_by_range(base, limit, st);
283            continue;
284        }
285
286        struct monitor_binding *b = st->subscriber;
287        struct monitor_state *mst = b->st;
288
289        st->req.next = mst->reqs;
290        mst->reqs = &st->req;
291        st->notified = true;
292        st->cont.arg = rvk_st;
293        st->cont.handler = revoke_master_cont;
294        rvk_st->pending_agreements++;
295        revoke_agreement_request_cont(b, st);
296
297        st = revoke_subs_remove_by_range(base, limit, st);
298    }
299
300    return true;
301}
302
303
304static bool capops_revoke_requires_agreement_relations(struct revoke_slave_st *rvk_st)
305{
306    if (revoke_subs == NULL) {
307        return false;
308    }
309
310    genpaddr_t base = get_address(&rvk_st->rawcap);
311    gensize_t limit = base + get_size(&rvk_st->rawcap) - 1;
312
313
314    DEBUG_CAPOPS("%s:%u: cap=[%" PRIxGENPADDR "..%" PRIxGENPADDR "]\n",
315                 __FUNCTION__, __LINE__, base, limit);
316
317    struct revoke_register_st *st = revoke_subs_remove_by_range(base, limit, NULL);
318    if (st == NULL) {
319        DEBUG_CAPOPS("### %s:%u no matching request\n",
320                     __FUNCTION__, __LINE__);
321        return false;
322    }
323
324    while(st != NULL) {
325        if (st->notified) {
326            /* don't notify two times, should'nt actually happen */
327            st = revoke_subs_remove_by_range(base, limit, st);
328            continue;
329        }
330
331        struct monitor_binding *b = st->subscriber;
332        struct monitor_state *mst = b->st;
333
334        st->req.next = mst->reqs;
335        mst->reqs = &st->req;
336        st->notified = true;
337        st->cont.arg = rvk_st;
338        st->cont.handler = revoke_slave_cont;
339        rvk_st->pending_agreements++;
340        revoke_agreement_request_cont(b, st);
341
342        st = revoke_subs_remove_by_range(base, limit, st);
343    }
344
345    return true;
346}
347
348void
349capops_revoke(struct domcapref cap,
350              revoke_result_handler_t result_handler,
351              void *st)
352{
353    errval_t err;
354
355    TRACE(CAPOPS, REVOKE_START, ++revoke_seqnum);
356    DEBUG_CAPOPS("%s ## start revocation protocol\n", __FUNCTION__);
357
358    distcap_state_t state;
359    err = dom_cnode_get_state(cap, &state);
360    GOTO_IF_ERR(err, report_error);
361
362    if (distcap_state_is_busy(state)) {
363        DEBUG_CAPOPS("%s MON_ERR_REMOTE_CAP_RETRY\n", __FUNCTION__);
364        err = MON_ERR_REMOTE_CAP_RETRY;
365        goto report_error;
366    }
367
368    struct revoke_master_st *rst;
369    err = calloce(1, sizeof(*rst), &rst);
370    GOTO_IF_ERR(err, report_error);
371    rst->cap = cap;
372    err = monitor_domains_cap_identify(cap.croot, cap.cptr, cap.level, &rst->rawcap);
373    GOTO_IF_ERR(err, free_st);
374    rst->result_handler = result_handler;
375    rst->st = st;
376
377    if (distcap_state_is_foreign(state)) {
378        // need to retrieve ownership
379        TRACE(CAPOPS, REVOKE_RETRIEVE, 0);
380        DEBUG_CAPOPS("%s getting cap ownership\n", __FUNCTION__);
381        capops_retrieve(rst->cap, revoke_retrieve__rx, rst);
382    }
383    else {
384        if (num_monitors_ready_for_capops() == 1) {
385            DEBUG_CAPOPS("%s: only one monitor: do simpler revoke\n",
386                    __FUNCTION__);
387            // no remote monitors exist; do simplified revocation process
388            revoke_no_remote(rst);
389            // return here
390            return;
391        }
392        // have ownership, initiate revoke
393        revoke_local(rst);
394    }
395
396    return;
397
398free_st:
399    free(rst);
400
401report_error:
402    TRACE(CAPOPS, REVOKE_CALL_RESULT, revoke_seqnum);
403    result_handler(err, st);
404}
405
406static void
407revoke_result__rx(errval_t result,
408                  struct revoke_master_st *st,
409                  bool locked)
410{
411    TRACE(CAPOPS, REVOKE_RESULT_RX, 0);
412    DEBUG_CAPOPS("%s\n", __FUNCTION__);
413    errval_t err;
414
415    if (locked) {
416        caplock_unlock(st->cap);
417    }
418
419    if (err_is_ok(result)) {
420        // clear the remote copies bit
421        err = monitor_domcap_remote_relations(st->cap.croot, st->cap.cptr,
422                                              st->cap.level, 0, RRELS_COPY_BIT,
423                                              NULL);
424        if (err_is_fail(err) && err_no(err) != SYS_ERR_CAP_NOT_FOUND) {
425            DEBUG_ERR(err, "resetting remote copies bit after revoke");
426        }
427    }
428
429    DEBUG_CAPOPS("%s ## revocation completed, calling %p\n", __FUNCTION__,
430                 st->result_handler);
431
432    err = cap_destroy(st->cap.croot);
433    PANIC_IF_ERR(err, "deleting monitor's copy of rootcn");
434    TRACE(CAPOPS, REVOKE_CALL_RESULT, revoke_seqnum);
435    st->result_handler(result, st->st);
436    free(st);
437}
438
439static void
440revoke_retrieve__rx(errval_t result, void *st_)
441{
442    TRACE(CAPOPS, REVOKE_RETRIEVE_RX, 0);
443    struct revoke_master_st *st = (struct revoke_master_st*)st_;
444
445    if (err_is_fail(result)) {
446        revoke_result__rx(result, st, false);
447    }
448    else {
449
450#ifndef NDEBUG
451        distcap_state_t state;
452        errval_t err = dom_cnode_get_state(st->cap, &state);
453        PANIC_IF_ERR(err, "dom_cnode_get_state");
454        assert(!distcap_state_is_foreign(state));
455#endif
456        revoke_local(st);
457    }
458}
459
460static void
461revoke_local(struct revoke_master_st *st)
462{
463    TRACE(CAPOPS, REVOKE_LOCAL, 0);
464    DEBUG_CAPOPS("%s: called from %p\n", __FUNCTION__,
465            __builtin_return_address(0));
466    errval_t err;
467
468    delete_steps_pause();
469
470    err = monitor_revoke_mark_target(st->cap.croot,
471                                     st->cap.cptr,
472                                     st->cap.level);
473    PANIC_IF_ERR(err, "marking revoke");
474
475    TRACE(CAPOPS, REVOKE_DO_MARK, 0);
476    DEBUG_CAPOPS("%s ## revocation: mark phase\n", __FUNCTION__);
477    // XXX: could check whether remote copies exist here(?), -SG, 2014-11-05
478    err = capsend_relations(&st->rawcap, revoke_mark__send,
479            &st->revoke_mc_st, &st->dests);
480    PANIC_IF_ERR(err, "initiating revoke mark multicast");
481}
482
483static void
484revoke_no_remote(struct revoke_master_st *st)
485{
486    TRACE(CAPOPS, REVOKE_NO_REMOTE, 0);
487    assert(num_monitors_ready_for_capops() == 1);
488
489    if (!delete_steps_get_waitset()) {
490        delete_steps_init(get_default_waitset());
491    }
492
493    errval_t err;
494    DEBUG_CAPOPS("%s\n", __FUNCTION__);
495
496    // pause deletion steps
497    DEBUG_CAPOPS("%s: delete_steps_pause()\n", __FUNCTION__);
498    delete_steps_pause();
499
500    // mark target of revoke
501    DEBUG_CAPOPS("%s: mon_revoke_mark_tgt()\n", __FUNCTION__);
502    err = monitor_revoke_mark_target(st->cap.croot,
503                                     st->cap.cptr,
504                                     st->cap.level);
505    PANIC_IF_ERR(err, "marking revoke");
506
507
508    // resume delete steps
509    DEBUG_CAPOPS("%s: delete_steps_resume()\n", __FUNCTION__);
510    delete_steps_resume();
511
512    // wait on delete queue, marking that remote cores are done
513    st->remote_fin = true;
514    DEBUG_CAPOPS("%s: delete_queue_wait()\n", __FUNCTION__);
515    struct event_closure steps_fin_cont
516        = MKCLOSURE(revoke_master_steps__fin, st);
517    delete_queue_wait(&st->del_qn, steps_fin_cont);
518}
519
520static errval_t
521revoke_mark__send(struct intermon_binding *b,
522                  intermon_caprep_t *caprep,
523                  struct capsend_mc_st *mc_st)
524{
525    struct intermon_state *ist = b->st;
526    TRACE(CAPOPS, REVOKE_MARK_SEND, ist->core_id);
527    struct revoke_master_st *st;
528    ptrdiff_t off = offsetof(struct revoke_master_st, revoke_mc_st);
529    st = (struct revoke_master_st*)((uintptr_t)mc_st - off);
530    return intermon_capops_revoke_mark__tx(b, NOP_CONT, *caprep, (lvaddr_t)st);
531}
532
533static uint64_t revoke_slave_seqnum = 0;
534void
535revoke_mark__rx(struct intermon_binding *b,
536                intermon_caprep_t caprep,
537                genvaddr_t st)
538{
539    TRACE(CAPOPS, REVOKE_MARK_RX, ++revoke_slave_seqnum);
540    DEBUG_CAPOPS("%s\n", __FUNCTION__);
541    errval_t err;
542    struct intermon_state *inter_st = (struct intermon_state*)b->st;
543
544    struct revoke_slave_st *rvk_st;
545    err = calloce(1, sizeof(*rvk_st), &rvk_st);
546    PANIC_IF_ERR(err, "allocating revoke slave state");
547
548    rvk_st->seqnum = revoke_slave_seqnum;
549    rvk_st->from = inter_st->core_id;
550    rvk_st->st = st;
551    caprep_to_capability(&caprep, &rvk_st->rawcap);
552
553    if (!slaves_head) {
554        assert(!slaves_tail);
555        slaves_head = slaves_tail = rvk_st;
556    }
557    else {
558        assert(slaves_tail);
559        assert(!slaves_tail->next);
560        slaves_tail->next = rvk_st;
561        slaves_tail = rvk_st;
562    }
563
564    // pause any ongoing "delete stepping" as mark phases on other nodes need
565    // to delete all foreign copies before we can delete locally owned caps
566    delete_steps_pause();
567
568    // XXX: this invocation could create a scheduling hole that could be
569    // problematic in RT systems and should probably be done in a loop.
570    err = monitor_revoke_mark_relations(&rvk_st->rawcap);
571    if (err_no(err) == SYS_ERR_CAP_NOT_FOUND) {
572        // found no copies or descendants of capability on this core,
573        // do nothing. -SG
574        DEBUG_CAPOPS("no copies on core %d\n", disp_get_core_id());
575    } else if (err_is_fail(err)) {
576        USER_PANIC_ERR(err, "marking revoke");
577    }
578
579    if (capops_revoke_requires_agreement_relations(rvk_st)) {
580        return;
581    }
582
583    rvk_st->im_qn.cont = revoke_ready__send;
584    err = capsend_target(rvk_st->from, (struct msg_queue_elem*)rvk_st);
585    PANIC_IF_ERR(err, "enqueing revoke_ready");
586}
587
588static void
589revoke_ready__send(struct intermon_binding *b,
590                   struct intermon_msg_queue_elem *e)
591{
592    errval_t err;
593    struct revoke_slave_st *rvk_st = (struct revoke_slave_st*)e;
594    TRACE(CAPOPS, REVOKE_READY_SEND, rvk_st->seqnum);
595    err = intermon_capops_revoke_ready__tx(b, NOP_CONT, rvk_st->st);
596
597    if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
598        DEBUG_CAPOPS("%s: got FLOUNDER_ERR_TX_BUSY; requeueing msg.\n", __FUNCTION__);
599        struct intermon_state *inter_st = (struct intermon_state *)b->st;
600        // requeue send request at front and return
601        err = intermon_enqueue_send_at_front(b, &inter_st->queue, b->waitset,
602                                             (struct msg_queue_elem *)e);
603        GOTO_IF_ERR(err, handle_err);
604        return;
605    }
606
607handle_err:
608    PANIC_IF_ERR(err, "sending revoke_ready");
609}
610
611void
612revoke_ready__rx(struct intermon_binding *b, genvaddr_t st)
613{
614    struct intermon_state *ist = b->st;
615    TRACE(CAPOPS, REVOKE_READY_RX, ist->core_id);
616    DEBUG_CAPOPS("%s\n", __FUNCTION__);
617    errval_t err;
618
619    struct revoke_master_st *rvk_st = (struct revoke_master_st*)(lvaddr_t)st;
620    if (!capsend_handle_mc_reply(&rvk_st->revoke_mc_st)) {
621        DEBUG_CAPOPS("%s: waiting for remote cores\n", __FUNCTION__);
622        // multicast not complete
623        return;
624    }
625
626    TRACE(CAPOPS, REVOKE_DO_COMMIT, 0);
627    if (capops_revoke_requires_agreement_local(rvk_st)) {
628        return;
629    }
630
631    DEBUG_CAPOPS("%s ## revocation: commit phase\n", __FUNCTION__);
632    err = capsend_relations(&rvk_st->rawcap, revoke_commit__send,
633            &rvk_st->revoke_mc_st, &rvk_st->dests);
634    PANIC_IF_ERR(err, "enqueing revoke_commit multicast");
635
636    delete_steps_resume();
637
638    struct event_closure steps_fin_cont
639        = MKCLOSURE(revoke_master_steps__fin, rvk_st);
640    delete_queue_wait(&rvk_st->del_qn, steps_fin_cont);
641}
642
643static errval_t
644revoke_commit__send(struct intermon_binding *b,
645                    intermon_caprep_t *caprep,
646                    struct capsend_mc_st *mc_st)
647{
648    struct intermon_state *ist = b->st;
649    TRACE(CAPOPS, REVOKE_COMMIT_SEND, ist->core_id);
650    struct revoke_master_st *st;
651    ptrdiff_t off = offsetof(struct revoke_master_st, revoke_mc_st);
652    st = (struct revoke_master_st*)((char*)mc_st - off);
653    return intermon_capops_revoke_commit__tx(b, NOP_CONT, (lvaddr_t)st);
654}
655
656void
657revoke_commit__rx(struct intermon_binding *b,
658                  genvaddr_t st)
659{
660    assert(slaves_head);
661    assert(slaves_tail);
662    assert(!slaves_tail->next);
663
664    struct revoke_slave_st *rvk_st = slaves_head;
665    TRACE(CAPOPS, REVOKE_COMMIT_RX, rvk_st->seqnum);
666    while (rvk_st && rvk_st->st != st) { rvk_st = rvk_st->next; }
667    assert(rvk_st);
668
669    delete_steps_resume();
670
671    struct event_closure steps_fin_cont
672        = MKCLOSURE(revoke_slave_steps__fin, rvk_st);
673    delete_queue_wait(&rvk_st->del_qn, steps_fin_cont);
674}
675
676static void
677revoke_slave_steps__fin(void *st)
678{
679    errval_t err;
680    struct revoke_slave_st *rvk_st = (struct revoke_slave_st*)st;
681    TRACE(CAPOPS, REVOKE_SLAVE_STEPS_FIN, rvk_st->seqnum);
682
683    rvk_st->im_qn.cont = revoke_done__send;
684    err = capsend_target(rvk_st->from, (struct msg_queue_elem*)rvk_st);
685    PANIC_IF_ERR(err, "enqueueing revoke_done");
686}
687
688inline static void
689remove_slave_from_list(struct revoke_slave_st *rvk_st)
690{
691    // remove from slave list
692    if (slaves_head == slaves_tail) {
693        // only one element in list
694        if (rvk_st == slaves_head) {
695            // we're only, clear list
696            slaves_head = slaves_tail = 0;
697        } else {
698            // we're not the element in list??
699            printf("rvk_st: %p; head&tail: %p\n", rvk_st, slaves_head);
700        }
701    } else {
702        // more than one element in list
703        if (rvk_st == slaves_head) {
704            // we're first, remove from head of list
705            slaves_head=slaves_head->next;
706        } else {
707            // we're non-first
708            // find prev
709            struct revoke_slave_st *p = slaves_head;
710            for (;p&&p->next!=rvk_st;p=p->next);
711            // make sure we found prev of us
712            assert(p&&p->next==rvk_st);
713            // remove us
714            p->next = rvk_st->next;
715            if (rvk_st == slaves_tail) {
716                // we were last, set last to prev
717                slaves_tail = p;
718            }
719        }
720    }
721}
722
723static void
724revoke_done__send(struct intermon_binding *b,
725                  struct intermon_msg_queue_elem *e)
726{
727    errval_t err;
728    struct revoke_slave_st *rvk_st = (struct revoke_slave_st*)e;
729    err = intermon_capops_revoke_done__tx(b, NOP_CONT, rvk_st->st);
730
731    if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
732        DEBUG_CAPOPS("%s: got FLOUNDER_ERR_TX_BUSY; requeueing msg.\n", __FUNCTION__);
733        struct intermon_state *inter_st = (struct intermon_state *)b->st;
734        // requeue send request at front and return
735        err = intermon_enqueue_send_at_front(b, &inter_st->queue, b->waitset,
736                                             (struct msg_queue_elem *)e);
737        GOTO_IF_ERR(err, handle_err);
738        return;
739    }
740
741handle_err:
742    PANIC_IF_ERR(err, "sending revoke_done");
743    remove_slave_from_list(rvk_st);
744    free(rvk_st);
745}
746
747void
748revoke_done__rx(struct intermon_binding *b,
749                genvaddr_t st)
750{
751    struct intermon_state *ist = b->st;
752    TRACE(CAPOPS, REVOKE_DONE_RX, ist->core_id);
753    DEBUG_CAPOPS("%s\n", __FUNCTION__);
754
755    struct revoke_master_st *rvk_st = (struct revoke_master_st*)(lvaddr_t)st;
756
757    if (!capsend_handle_mc_reply(&rvk_st->revoke_mc_st)) {
758        // multicast not complete
759        return;
760    }
761
762    DEBUG_CAPOPS("%s ## revocation: fin phase\n", __FUNCTION__);
763    rvk_st->remote_fin = true;
764    if (rvk_st->local_fin) {
765        revoke_result__rx(SYS_ERR_OK, rvk_st, true);
766    }
767}
768
769static void
770revoke_master_steps__fin(void *st)
771{
772    TRACE(CAPOPS, REVOKE_MASTER_STEPS_FIN, 0);
773    struct revoke_master_st *rvk_st = (struct revoke_master_st*)st;
774    rvk_st->local_fin = true;
775    if (rvk_st->remote_fin) {
776        revoke_result__rx(SYS_ERR_OK, rvk_st, true);
777    }
778}
779