1/*
2 * Copyright (c) 2016 ETH Zurich.
3 * All rights reserved.
4 *
5 * This file is distributed under the terms in the attached LICENSE file.
6 * If you do not find this file, copies can be found by writing to:
7 * ETH Zurich D-INFK, Universitaetstrasse 6, CH-8092 Zurich. Attn: Systems Group.
8 */
9
10#include <barrelfish/barrelfish.h>
11#include <barrelfish/nameservice_client.h>
12#include <devif/queue_interface.h>
13#include <devif/backends/descq.h>
14#include <devif/queue_interface_backend.h>
15#include <if/descq_defs.h>
16#include "descq_debug.h"
17#include <barrelfish/systime.h>
18#include <barrelfish/notificator.h>
19#include <barrelfish/waitset_chan.h>
20
// A single descriptor slot in the shared ring. The trailing comments give the
// running byte offset after each field; the pad brings the struct up to one
// DESCQ_ALIGNMENT-sized slot so descriptors never straddle slot boundaries.
struct __attribute__((aligned(DESCQ_ALIGNMENT))) desc {
    genoffset_t offset; // 8
    genoffset_t length; // 16
    genoffset_t valid_data; // 24
    genoffset_t valid_length; // 32
    uint64_t flags; // 40
    uint64_t seq; // 48   sequence number that publishes this descriptor
    regionid_t rid; // 52 region the buffer belongs to
    uint8_t pad[12];
};
31
// Sequence-number ack cell shared with the peer. `value` is the last
// acknowledged sequence number; the padding gives the cell its own
// aligned slot so peer updates don't share a line with other data.
union __attribute__((aligned(DESCQ_ALIGNMENT))) pointer {
    volatile size_t value;
    uint8_t pad[64];
};
36
37
// Per-connection descriptor queue state. Embeds the generic devq as its
// first member so (struct devq*) and (struct descq*) are interchangeable.
struct descq {
    struct devq q;                  // generic queue header (must stay first)
    struct descq_func_pointer f;    // user callbacks (create/notify/reg/...)

    // to get endpoints (non-NULL only on an exporting queue)
    struct descq_endpoint_state* state;

    // General info
    size_t slots;       // usable descriptor slots (frame slots minus ack slot)
    char* name;         // strdup'd service name; freed in descq_destroy
    bool bound_done;    // set by bind_cb to unblock the creator's event loop

    // Descriptor Ring (mapped shared frames; first slot is the ack cell)
    struct desc* rx_descs;
    struct desc* tx_descs;

    // Flow control
    uint64_t rx_seq;                // next sequence number to consume
    uint64_t tx_seq;                // next sequence number to produce
    union pointer* rx_seq_ack;      // we write: last consumed seq
    union pointer* tx_seq_ack;      // peer writes: last seq it consumed

    // Flounder
    struct descq_binding* binding;
    bool local_bind;                // true if bound over a local (LMP) channel
    uint64_t resend_args;           // region id stashed for try_deregister()

    // linked list (exporting side: chain of accepted connections)
    struct descq* next;
    uint64_t qid;

    struct notificator notificator; // readiness signalling via waitset
};
71
// State of an exporting (server) side: the registered name, the callback
// table copied into every accepted connection, and the list of connections.
struct descq_endpoint_state {
    bool exp_done;                  // set by export_cb once the name is registered
    char* name;                     // strdup'd service name
    struct descq_func_pointer f;    // template callbacks for new connections
    struct descq* head;             // singly-linked list of accepted queues
    struct descq* tail;
    uint64_t qid;                   // next queue id to hand out
};
80
81// Check if there's anything to read from the queue
82static bool descq_can_read(void *arg)
83{
84    struct descq *q = arg;
85    uint64_t seq = q->rx_descs[q->rx_seq % q->slots].seq;
86
87    if (q->rx_seq > seq) { // the queue is empty
88        return false;
89    }
90    return true;
91}
92
93// Check if we can write to the queue
94static bool descq_can_write(void *arg)
95{
96    struct descq *q = arg;
97
98    if ((q->tx_seq - q->tx_seq_ack->value) >= q->slots) { // the queue is full
99        return false;
100    }
101    return true;
102}
103
104
105/**
106 * @brief Enqueue a descriptor (as seperate fields)
107 *        into the descriptor queue
108 *
109 * @param q                     The descriptor queue
110 * @param region_id             Region id of the enqueued buffer
111 * @param offset                Offset into the region where the buffer resides
112 * @param length                Length of the buffer
113 * @param valid_data            Offset into the region where the valid data
114 *                              of the buffer resides
115 * @param valid_length          Length of the valid data of the buffer
116 * @param misc_flags            Miscellaneous flags
117 *
118 * @returns error if queue is full or SYS_ERR_OK on success
119 */
120static errval_t descq_enqueue(struct devq* queue,
121                              regionid_t region_id,
122                              genoffset_t offset,
123                              genoffset_t length,
124                              genoffset_t valid_data,
125                              genoffset_t valid_length,
126                              uint64_t misc_flags)
127{
128    struct descq* q = (struct descq*) queue;
129    size_t head = q->tx_seq % q->slots;
130
131    if (!descq_can_write(queue)) {
132        return DEVQ_ERR_QUEUE_FULL;
133    }
134
135    q->tx_descs[head].rid = region_id;
136    q->tx_descs[head].offset = offset;
137    q->tx_descs[head].length = length;
138    q->tx_descs[head].valid_data = valid_data;
139    q->tx_descs[head].valid_length = valid_length;
140    q->tx_descs[head].flags = misc_flags;
141
142    //__sync_synchronize();
143
144    q->tx_descs[head].seq = q->tx_seq;
145
146    // only write local head
147    q->tx_seq++;
148
149    DESCQ_DEBUG("tx_seq=%lu tx_seq_ack=%lu \n",
150                q->tx_seq, q->tx_seq_ack->value);
151    return SYS_ERR_OK;
152}
153
154/**
155 * @brief Dequeue a descriptor (as seperate fields)
156 *        from the descriptor queue
157 *
158 * @param q                     The descriptor queue
159 * @param region_id             Return pointer to the region id of
160 *                              the denqueued buffer
161 * @param offset                Return pointer to the offset into the region
162 *                              where the buffer resides
163 * @param length                Return pointer to the length of the buffer
164 * @param valid_data            Return pointer to the offset into the region
165 *                              where the valid data of the buffer resides
166 * @param valid_lenght          Return pointer to the length of the valid
167 *                              data of the buffer
168 * @param misc_flags            Return pointer to miscellaneous flags
169 *
170 * @returns error if queue is empty or SYS_ERR_OK on success
171 */
172static errval_t descq_dequeue(struct devq* queue,
173                              regionid_t* region_id,
174                              genoffset_t* offset,
175                              genoffset_t* length,
176                              genoffset_t* valid_data,
177                              genoffset_t* valid_length,
178                              uint64_t* misc_flags)
179{
180    struct descq* q = (struct descq*) queue;
181
182    if (!descq_can_read(queue)) {
183        return DEVQ_ERR_QUEUE_EMPTY;
184    }
185
186    size_t tail = q->rx_seq % q->slots;
187    *region_id = q->rx_descs[tail].rid;
188    *offset = q->rx_descs[tail].offset;
189    *length = q->rx_descs[tail].length;
190    *valid_data = q->rx_descs[tail].valid_data;
191    *valid_length = q->rx_descs[tail].valid_length;
192    *misc_flags = q->rx_descs[tail].flags;
193
194    //assert(*length > 0);
195
196    q->rx_seq++;
197    q->rx_seq_ack->value = q->rx_seq;
198
199    DESCQ_DEBUG("rx_seq_ack=%lu\n", q->rx_seq_ack->value);
200    return SYS_ERR_OK;
201}
202
203static errval_t descq_notify(struct devq* q)
204{
205    // errval_t err;
206    //errval_t err2;
207    // struct descq* queue = (struct descq*) q;
208    //
209    // err = queue->binding->tx_vtbl.notify(queue->binding, NOP_CONT);
210    // if (err_is_fail(err)) {
211    //
212    //     err = queue->binding->register_send(queue->binding, get_default_waitset(),
213    //                                         MKCONT(resend_notify, queue));
214    //     if (err == LIB_ERR_CHAN_ALREADY_REGISTERED) {
215    //         // dont care about this failure since there is an oustanding message
216    //         // anyway if this fails
217    //         return SYS_ERR_OK;
218    //     } else {
219    //         return err;
220    //     }
221    // }
222    return SYS_ERR_OK;
223}
224
225static errval_t descq_control(struct devq* q, uint64_t cmd,
226                              uint64_t value, uint64_t *result)
227{
228    errval_t err, err2;
229    struct descq* queue = (struct descq*) q;
230
231    DESCQ_DEBUG("start \n");
232    err = queue->binding->rpc_tx_vtbl.control(queue->binding, cmd, value, result, &err2);
233    err = err_is_fail(err) ? err : err2;
234    DESCQ_DEBUG("end\n");
235    return err;
236}
237
238static errval_t descq_register(struct devq* q, struct capref cap,
239                               regionid_t rid)
240{
241    errval_t err, err2;
242    struct descq* queue = (struct descq*) q;
243
244    DESCQ_DEBUG("start %p\n", queue);
245    err = queue->binding->rpc_tx_vtbl.register_region(queue->binding, cap, rid, &err2);
246    err = err_is_fail(err) ? err : err2;
247    DESCQ_DEBUG("end\n");
248    return err;
249}
250
251
252
253/**
254 * @brief Destroys a descriptor queue and frees its resources
255 *
256 * @param que                     The descriptor queue
257 *
258 * @returns error on failure or SYS_ERR_OK on success
259 */
260static errval_t descq_destroy(struct devq* que)
261{
262    errval_t err;
263
264    struct descq* q = (struct descq*) que;
265
266    err = vspace_unmap(q->tx_descs);
267    if (err_is_fail(err)) {
268        return err;
269    }
270
271    err = vspace_unmap(q->rx_descs);
272    if (err_is_fail(err)) {
273        return err;
274    }
275    free(q->name);
276    free(q);
277
278    return SYS_ERR_OK;
279}
280
281static void try_deregister(void* a)
282{
283    errval_t err, err2;
284    struct descq* queue = (struct descq*) a;
285
286    err = queue->binding->rpc_tx_vtbl.deregister_region(queue->binding, queue->resend_args,
287                                                        &err2);
288    assert(err_is_ok(err2) && err_is_ok(err));
289}
290
291
292static errval_t descq_deregister(struct devq* q, regionid_t rid)
293{
294    errval_t err, err2;
295    err2 = SYS_ERR_OK;
296    struct descq* queue = (struct descq*) q;
297
298    err = queue->binding->rpc_tx_vtbl.deregister_region(queue->binding, rid, &err2);
299    if (err_is_fail(err)) {
300        queue->resend_args = rid;
301        while(err_is_fail(err)) {
302            err = queue->binding->register_send(queue->binding, get_default_waitset(),
303                                                MKCONT(try_deregister, queue));
304            if (err_is_fail(err)) {
305                event_dispatch(get_default_waitset());
306            }
307        }
308    }
309    return err2;
310}
311
312/*
313 * Flounder interface implementation
314 */
315
316static void mp_notify(void *arg) {
317    errval_t err;
318    struct descq* q = arg;
319
320    //DESCQ_DEBUG("%p \n",q->f.notify);
321    err = q->f.notify(q);
322
323    assert(err_is_ok(err));
324}
325
326static errval_t rpc_mp_reg(struct descq_binding* b, struct capref cap,
327                           uint32_t rid, errval_t *err)
328{
329    DESCQ_DEBUG("start \n");
330    struct descq* q = (struct descq*) b->st;
331
332    *err = devq_add_region((struct devq*) q, cap, rid);
333    if (err_is_fail(*err)) {
334        return SYS_ERR_OK;
335    }
336
337    *err = q->f.reg(q, cap, rid);
338    DESCQ_DEBUG("end \n");
339    return SYS_ERR_OK;
340}
341static void mp_reg(struct descq_binding* b, struct capref cap,
342                   uint32_t rid)
343{
344    errval_t err, err2;
345    err = rpc_mp_reg(b, cap, rid, &err2);
346    err = b->tx_vtbl.register_region_response(b, NOP_CONT, err2);
347    assert(err_is_ok(err));
348}
349
350static errval_t rpc_mp_dereg(struct descq_binding* b, uint32_t rid,
351                             errval_t *err)
352{
353    struct descq* q = (struct descq*) b->st;
354
355    *err = devq_remove_region((struct devq*) q, rid);
356    if (err_is_fail(*err)) {
357        return SYS_ERR_OK;
358    }
359
360    *err = q->f.dereg(q, rid);
361    return SYS_ERR_OK;
362}
363
364static void mp_dereg(struct descq_binding* b, uint32_t rid)
365{
366    errval_t err, err2;
367    err = rpc_mp_dereg(b, rid, &err2);
368    err = b->tx_vtbl.deregister_region_response(b, NOP_CONT, err2);
369    assert(err_is_ok(err));
370}
371
372static errval_t rpc_mp_control(struct descq_binding* b, uint64_t cmd,
373                               uint64_t value, uint64_t *result, errval_t *err)
374{
375    struct descq* q = (struct descq*) b->st;
376
377    *err = q->f.control(q, cmd, value, result);
378    return SYS_ERR_OK;
379}
380
381static void mp_control(struct descq_binding* b, uint64_t cmd,
382                       uint64_t value)
383{
384    errval_t err, err2;
385    uint64_t result;
386    err = rpc_mp_control(b, cmd, value, &result, &err2);
387    err = b->tx_vtbl.control_response(b, NOP_CONT, result, err2);
388    assert(err_is_ok(err));
389}
390
391static errval_t rpc_mp_destroy(struct descq_binding* b, errval_t *err)
392{
393    struct descq* q = (struct descq*) b->st;
394
395    *err = q->f.destroy(q);
396
397    USER_PANIC("Destroy NYI \n");
398    return SYS_ERR_OK;
399}
400
401static void  mp_destroy(struct descq_binding* b)
402{
403    errval_t err, err2;
404    err = rpc_mp_destroy(b, &err2);
405    err = b->tx_vtbl.destroy_queue_response(b, NOP_CONT, err2);
406    assert(err_is_ok(err));
407}
408
409static errval_t rpc_mp_create(struct descq_binding* b, uint32_t slots,
410                              struct capref rx, struct capref tx,
411                              errval_t *err, uint64_t *queue_id)
412{
413
414    struct descq* q = (struct descq*) b->st;
415    DESCQ_DEBUG("start %p\n",q);
416
417    // switch RX/TX for correct setup
418    *err = vspace_map_one_frame_attr((void**) &(q->rx_descs),
419                                    slots*DESCQ_ALIGNMENT, tx,
420                                    VREGION_FLAGS_READ_WRITE, NULL, NULL);
421    if (err_is_fail(*err)) {
422        goto end2;
423    }
424
425    *err = vspace_map_one_frame_attr((void**) &(q->tx_descs),
426                                    slots*DESCQ_ALIGNMENT, rx,
427                                    VREGION_FLAGS_READ_WRITE, NULL, NULL);
428    if (err_is_fail(*err)) {
429        goto end1;
430    }
431
432    q->tx_seq_ack = (void*)q->tx_descs;
433    q->rx_seq_ack = (void*)q->rx_descs;
434    q->tx_descs++;
435    q->rx_descs++;
436    q->slots = slots-1;
437    q->rx_seq = 1;
438    q->tx_seq = 1;
439
440    devq_init(&q->q, true);
441
442    q->q.f.enq = descq_enqueue;
443    q->q.f.deq = descq_dequeue;
444    q->q.f.notify = descq_notify;
445    q->q.f.reg = descq_register;
446    q->q.f.dereg = descq_deregister;
447    q->q.f.ctrl = descq_control;
448    q->q.f.destroy = descq_destroy;
449
450    notificator_init(&q->notificator, q, descq_can_read, descq_can_write);
451    *err = waitset_chan_register(get_default_waitset(), &q->notificator.ready_to_read, MKCLOSURE(mp_notify, q));
452    assert(err_is_ok(*err));
453
454    *err = q->f.create(q, queue_id);
455    if (err_is_ok(*err)) {
456        goto end2;
457    }
458
459end1:
460    *err = vspace_unmap(q->rx_descs);
461    assert(err_is_ok(*err));
462end2:
463    DESCQ_DEBUG("end \n");
464    return SYS_ERR_OK;
465}
466
467static void mp_create(struct descq_binding* b, uint32_t slots,
468                      struct capref rx, struct capref tx)
469{
470    errval_t err, err2;
471    uint64_t qid;
472    err = rpc_mp_create(b, slots, rx, tx, &err2, &qid);
473    err = b->tx_vtbl.create_queue_response(b, NOP_CONT, err2, qid);
474    assert(err_is_ok(err));
475}
476
// RPC receive handlers installed on accepted/bound connections
// (see connect_cb() and descq_create_ep()).
static struct descq_rpc_rx_vtbl rpc_rx_vtbl = {
    .create_queue_call = rpc_mp_create,
    .destroy_queue_call = rpc_mp_destroy,
    .register_region_call = rpc_mp_reg,
    .deregister_region_call = rpc_mp_dereg,
    .control_call = rpc_mp_control,
};
484
// Plain (non-RPC) receive handlers; used when creating a raw endpoint
// in descq_create_ep().
static struct descq_rx_vtbl rx_vtbl = {
    .create_queue_call = mp_create,
    .destroy_queue_call = mp_destroy,
    .register_region_call = mp_reg,
    .deregister_region_call = mp_dereg,
    .control_call = mp_control,
};
492
493static void export_cb(void *st, errval_t err, iref_t iref)
494{
495    struct descq_endpoint_state* q = (struct descq_endpoint_state*) st;
496
497    err = nameservice_register(q->name, iref);
498    assert(err_is_ok(err));
499    q->exp_done = true;
500    // state is only function pointers
501    DESCQ_DEBUG("Control interface exported (%s)\n", q->name);
502}
503
504static errval_t connect_cb(void *st, struct descq_binding* b)
505{
506    struct descq* q;
507    struct descq_endpoint_state* state = (struct descq_endpoint_state*) st;
508    // Allocate state
509    q = malloc(sizeof(struct descq));
510    if (q == NULL) {
511        return DEVQ_ERR_DESCQ_INIT;
512    }
513    q->binding = b;
514
515    q->qid = state->qid;
516    state->qid++;
517    q->next = NULL;
518
519    q->f.create = state->f.create;
520    q->f.notify = state->f.notify;
521    q->f.destroy = state->f.destroy;
522    q->f.control = state->f.control;
523    q->f.reg = state->f.reg;
524    q->f.dereg = state->f.dereg;
525
526    if (state->head == NULL) {
527        // allocated state
528        state->head = q;
529        state->tail = q;
530    } else {
531        state->tail->next = q;
532        state->tail = q;
533    }
534
535    b->rpc_rx_vtbl = rpc_rx_vtbl;
536    b->st = q;
537    q->local_bind = b->local_binding != NULL;
538
539    return SYS_ERR_OK;
540}
541
542
543static void bind_cb(void *st, errval_t err, struct descq_binding* b)
544
545{
546    struct descq* q = (struct descq*) st;
547    DESCQ_DEBUG("Interface bound \n");
548    q->binding = b;
549    descq_rpc_client_init(q->binding);
550
551    q->bound_done = true;
552    b->st = q;
553}
554
555// all arguments (some might be null) so we can crate descq by using
556// an EP or the name service
557static errval_t descq_create_internal(struct descq** q,
558                                      size_t slots,
559                                      char* name,
560                                      struct capref ep,
561                                      bool exp,
562                                      uint64_t *queue_id,
563                                      struct descq_func_pointer* f)
564{
565    DESCQ_DEBUG("create start\n");
566    errval_t err;
567    struct descq* tmp;
568    struct capref rx;
569    struct capref tx;
570
571    if (!capref_is_null(ep) && exp) {
572        printf("Can not initalized descq with an endpoint when export. Endpoints are"
573               " currently only 1:1 connections \n");
574        return DEVQ_ERR_DESCQ_INIT;
575    }
576
577    // Init basic struct fields
578    tmp = malloc(sizeof(struct descq));
579    assert(tmp != NULL);
580    tmp->name = strdup(name);
581    assert(tmp->name != NULL);
582
583    if (exp) {  // exporting
584
585        DESCQ_DEBUG("Exporting using name %s\n", name);
586        struct descq_endpoint_state* state = malloc(sizeof(struct descq_endpoint_state));
587        state->name = strdup(name);
588        assert(state->name);
589
590
591        state->f.notify = f->notify;
592        state->f.dereg = f->dereg;
593        state->f.reg = f->reg;
594        state->f.create = f->create;
595        state->f.destroy = f->destroy;
596        state->f.control = f->control;
597        tmp->state = state;
598
599        err = descq_export(tmp->state, export_cb, connect_cb,
600                           get_default_waitset(), IDC_BIND_FLAGS_DEFAULT);
601        if (err_is_fail(err)) {
602            goto cleanup1;
603        }
604
605        while(!state->exp_done) {
606            event_dispatch(get_default_waitset());
607        }
608
609        DESCQ_DEBUG("Exporting done \n");
610    } else {
611
612        tmp->f.notify = f->notify;
613        tmp->f.dereg = f->dereg;
614        tmp->f.reg = f->reg;
615        tmp->f.create = f->create;
616        tmp->f.destroy = f->destroy;
617        tmp->f.control = f->control;
618        size_t bytes;
619
620        DESCQ_DEBUG("Allocating RX/TX frame\n");
621        err = frame_alloc(&rx, DESCQ_ALIGNMENT*slots, &bytes);
622        if (err_is_fail(err)) {
623            goto cleanup1;
624        }
625
626        assert(bytes >= DESCQ_ALIGNMENT*slots);
627
628        err = frame_alloc(&tx, DESCQ_ALIGNMENT*slots, &bytes);
629        if (err_is_fail(err)) {
630            goto cleanup2;
631        }
632
633        assert(bytes >= DESCQ_ALIGNMENT*slots);
634
635        DESCQ_DEBUG("Mapping RX/TX frame\n");
636        err = vspace_map_one_frame_attr((void**) &(tmp->rx_descs),
637                                        slots*DESCQ_ALIGNMENT, rx,
638                                        VREGION_FLAGS_READ_WRITE, NULL, NULL);
639        if (err_is_fail(err)) {
640            goto cleanup3;
641        }
642
643        err = vspace_map_one_frame_attr((void**) &(tmp->tx_descs),
644                                        slots*DESCQ_ALIGNMENT, tx,
645                                        VREGION_FLAGS_READ_WRITE, NULL, NULL);
646        if (err_is_fail(err)) {
647            goto cleanup4;
648        }
649
650        memset(tmp->tx_descs, 0, slots*DESCQ_ALIGNMENT);
651        memset(tmp->rx_descs, 0, slots*DESCQ_ALIGNMENT);
652
653        tmp->bound_done = false;
654        iref_t iref;
655
656        if (!capref_is_null(ep)) {
657            DESCQ_DEBUG("Bind to other endpoint using endpoint\n");
658            err = descq_bind_to_endpoint(ep, bind_cb, tmp, get_default_waitset(),
659                                         IDC_BIND_FLAGS_DEFAULT);
660            if (err_is_fail(err)) {
661                goto cleanup5;
662            }
663        } else {
664            DESCQ_DEBUG("Bind to other endpoint using nameservice\n");
665            if (strcmp("", name) == 0) {
666                printf("Can not initalized descq with empty name \n");
667                err = DEVQ_ERR_DESCQ_INIT;
668                goto cleanup5;
669            }
670
671            err = nameservice_blocking_lookup(name, &iref);
672            if (err_is_fail(err)) {
673                goto cleanup5;
674            }
675
676            err = descq_bind(iref, bind_cb, tmp, get_default_waitset(),
677                             IDC_BIND_FLAGS_DEFAULT);
678            if (err_is_fail(err)) {
679                goto cleanup5;
680            }
681        }
682
683        while(!tmp->bound_done) {
684            event_dispatch(get_default_waitset());
685        }
686
687        tmp->local_bind = tmp->binding->local_binding != NULL;
688
689        DESCQ_DEBUG("Create queue RPC call\n");
690        errval_t err2;
691        err = tmp->binding->rpc_tx_vtbl.create_queue(tmp->binding, slots, rx, tx, &err2, queue_id);
692        if (err_is_fail(err) || err_is_fail(err2)) {
693            err = err_is_fail(err) ? err: err2;
694            goto cleanup5;
695        }
696
697        DESCQ_DEBUG("Create queue RPC call done\n");
698        tmp->tx_seq_ack = (void*)tmp->tx_descs;
699        tmp->rx_seq_ack = (void*)tmp->rx_descs;
700        tmp->tx_seq_ack->value = 0;
701        tmp->rx_seq_ack->value = 0;
702        tmp->tx_descs++;
703        tmp->rx_descs++;
704        tmp->slots = slots-1;
705        tmp->rx_seq = 1;
706        tmp->tx_seq = 1;
707
708        devq_init(&tmp->q, false);
709
710        tmp->q.f.enq = descq_enqueue;
711        tmp->q.f.deq = descq_dequeue;
712        tmp->q.f.notify = descq_notify;
713        tmp->q.f.reg = descq_register;
714        tmp->q.f.dereg = descq_deregister;
715        tmp->q.f.ctrl = descq_control;
716
717        notificator_init(&tmp->notificator, tmp, descq_can_read, descq_can_write);
718        err = waitset_chan_register(get_default_waitset(), &tmp->notificator.ready_to_read, MKCLOSURE(mp_notify, tmp));
719        assert(err_is_ok(err));
720    }
721
722
723    *q = tmp;
724
725    DESCQ_DEBUG("create end %p \n", *q);
726    return SYS_ERR_OK;
727
728cleanup5:
729    vspace_unmap(tmp->rx_descs);
730cleanup4:
731    vspace_unmap(tmp->rx_descs);
732cleanup3:
733    cap_destroy(tx);
734cleanup2:
735    cap_destroy(rx);
736cleanup1:
737    free(tmp->name);
738    free(tmp);
739
740    return err;
741
742}
743
744/**
745 * @brief initialized a descriptor queue
746 */
747errval_t descq_create(struct descq** q,
748                      size_t slots,
749                      char* name,
750                      bool exp,
751                      uint64_t *queue_id,
752                      struct descq_func_pointer* f)
753{
754    return descq_create_internal(q, slots, name, NULL_CAP, exp, queue_id, f);
755}
756
757/**
758 * @brief initialized a descriptor queue
759 */
760errval_t descq_create_with_ep(struct descq** q,
761                              size_t slots,
762                              struct capref ep,
763                              uint64_t *queue_id,
764                              struct descq_func_pointer* f)
765{
766    return descq_create_internal(q, slots, "", ep, false, queue_id, f);
767}
768
769
770/**
771 * @brief Create an endpoint from an exporting queue. If the queue is not exporting,
772 *        the call will fail.
773 *
774 * @param q                     Pointer to the descriptor queue
775 * @param slots                 Core on which the other EP will be used
776 * @param ep                    Returned endpoint
777 * @param exp                   Export desq_ctrl/descq_data flounder interface
778 *                              (At least one of the sides of the channel hast to do so)
779 * @param queue_id              queue id
780 * @param f                     Function pointers to be called on message recv
781 *
782 * @returns error on failure or SYS_ERR_OK on success
783 */
784errval_t descq_create_ep(struct descq* queue,
785                         coreid_t core,
786                         struct capref* ep)
787{
788    errval_t err = SYS_ERR_OK;
789    if (queue->state != NULL) {
790        struct descq* q;
791        struct descq_endpoint_state* state = (struct descq_endpoint_state*) queue->state;
792        // Allocate state
793        q = malloc(sizeof(struct descq));
794        if (q == NULL) {
795            return DEVQ_ERR_DESCQ_INIT;
796        }
797
798        q->qid = state->qid;
799        state->qid++;
800        q->next = NULL;
801
802        q->f.create = state->f.create;
803        q->f.notify = state->f.notify;
804        q->f.destroy = state->f.destroy;
805        q->f.control = state->f.control;
806        q->f.reg = state->f.reg;
807        q->f.dereg = state->f.dereg;
808
809
810        idc_endpoint_t type = (core == disp_get_core_id())? IDC_ENDPOINT_LMP: IDC_ENDPOINT_UMP;
811        err = descq_create_endpoint(type, &rx_vtbl, q,
812                                    get_default_waitset(),
813                                    IDC_ENDPOINT_FLAGS_DUMMY,
814                                    &q->binding, *ep);
815
816        q->local_bind = q->binding->local_binding != NULL;
817        q->binding->rpc_rx_vtbl = rpc_rx_vtbl;
818        q->binding->st = q;
819    } else {
820        printf("Can not create an endpoint from not exporting queue \n");
821        return DEVQ_ERR_DESCQ_INIT;
822    }
823    return err;
824}
825