1/*
2 * Copyright (c) 2016 ETH Zurich.
3 * All rights reserved.
4 *
5 * This file is distributed under the terms in the attached LICENSE file.
6 * If you do not find this file, copies can be found by writing to:
7 * ETH Zurich D-INFK, Universitaetstr. 6, CH-8092 Zurich. Attn: Systems Group.
8 */
9
10#include <barrelfish/barrelfish.h>
11#include <barrelfish/nameservice_client.h>
12#include <devif/queue_interface.h>
13#include <devif/backends/descq.h>
14#include <devif/queue_interface_backend.h>
15#include <if/descq_defs.h>
16#include "descq_debug.h"
17#include <barrelfish/systime.h>
18#include <barrelfish/notificator.h>
19#include <barrelfish/waitset_chan.h>
20
// One descriptor slot of a ring. The struct is exactly 64 bytes and aligned
// to DESCQ_ALIGNMENT (presumably one cache line — TODO confirm) so slots do
// not straddle alignment boundaries in the shared frame.
struct __attribute__((aligned(DESCQ_ALIGNMENT))) desc {
    genoffset_t offset; // 8   -- offset of the buffer within its region
    genoffset_t length; // 16  -- total length of the buffer
    genoffset_t valid_data; // 24  -- offset of valid data within the buffer
    genoffset_t valid_length; // 32  -- length of the valid data
    uint64_t flags; // 40  -- miscellaneous flags, passed through verbatim
    uint64_t seq; // 48  -- sequence number; written last to publish the slot
    regionid_t rid; // 52  -- id of the region the buffer belongs to
    uint8_t pad[12]; // pad the struct out to 64 bytes
};
31
// Sequence-number acknowledgement slot shared with the peer. It occupies the
// first slot of each ring frame and is padded to 64 bytes so the reader and
// writer do not false-share with the adjacent descriptors.
union __attribute__((aligned(DESCQ_ALIGNMENT))) pointer {
    volatile size_t value; // last sequence number consumed by the reader
    uint8_t pad[64];
};
36
37
// Per-endpoint state of a descriptor queue. One side's TX ring is the other
// side's RX ring (the frames are swapped during setup, see mp_create).
struct descq {
    struct devq q;               // generic devq state; must stay the first
                                 // member, code casts devq* <-> descq*
    struct descq_func_pointer f; // user-supplied callbacks

    // General info
    size_t slots;     // number of usable descriptor slots per ring
                      // (one frame slot is reserved for the ack pointer)
    char* name;       // strdup'd service name, freed in descq_destroy
    bool bound_done;  // set by bind_cb once the flounder bind completed

    // Descriptor Ring
    struct desc* rx_descs;  // points one slot past the frame base (ack slot)
    struct desc* tx_descs;  // points one slot past the frame base (ack slot)

    // Flow control
    uint64_t rx_seq;            // next sequence number to consume
    uint64_t tx_seq;            // next sequence number to produce
    union pointer* rx_seq_ack;  // shared slot: acks we publish to the peer
    union pointer* tx_seq_ack;  // shared slot: acks the peer publishes to us

    // Flounder
    struct descq_binding* binding;
    bool local_bind;       // true if binding->local_binding != NULL
    uint64_t resend_args;  // region id stashed for try_deregister resends

    // linked list of queues hanging off a descq_endpoint_state
    struct descq* next;
    uint64_t qid;          // id assigned by connect_cb

    struct notificator notificator;
    bool notifications;    // whether notifications were requested at create
};
69
// State of an exported (server-side) endpoint, shared by all queues
// accepted through connect_cb.
struct descq_endpoint_state {
    bool exp_done;               // set by export_cb once the iref is registered
    char* name;                  // service name registered with the nameservice
    struct descq_func_pointer f; // callbacks copied into each accepted queue
    struct descq* head;          // singly-linked list of accepted queues
    struct descq* tail;          // tail of that list for O(1) append
    uint64_t qid;                // next queue id to hand out
};
78
79// Check if there's anything to read from the queue
80static bool descq_can_read(void *arg)
81{
82    struct descq *q = arg;
83    uint64_t seq = q->rx_descs[q->rx_seq % q->slots].seq;
84
85    if (q->rx_seq > seq) { // the queue is empty
86        return false;
87    }
88    return true;
89}
90
91// Check if we can write to the queue
92static bool descq_can_write(void *arg)
93{
94    struct descq *q = arg;
95
96    if ((q->tx_seq - q->tx_seq_ack->value) >= q->slots) { // the queue is full
97        return false;
98    }
99    return true;
100}
101
102
103/**
104 * @brief Enqueue a descriptor (as seperate fields)
105 *        into the descriptor queue
106 *
107 * @param q                     The descriptor queue
108 * @param region_id             Region id of the enqueued buffer
109 * @param offset                Offset into the region where the buffer resides
110 * @param length                Length of the buffer
111 * @param valid_data            Offset into the region where the valid data
112 *                              of the buffer resides
113 * @param valid_length          Length of the valid data of the buffer
114 * @param misc_flags            Miscellaneous flags
115 *
116 * @returns error if queue is full or SYS_ERR_OK on success
117 */
118static errval_t descq_enqueue(struct devq* queue,
119                              regionid_t region_id,
120                              genoffset_t offset,
121                              genoffset_t length,
122                              genoffset_t valid_data,
123                              genoffset_t valid_length,
124                              uint64_t misc_flags)
125{
126    struct descq* q = (struct descq*) queue;
127    size_t head = q->tx_seq % q->slots;
128
129    if (!descq_can_write(queue)) {
130        return DEVQ_ERR_QUEUE_FULL;
131    }
132
133    q->tx_descs[head].rid = region_id;
134    q->tx_descs[head].offset = offset;
135    q->tx_descs[head].length = length;
136    q->tx_descs[head].valid_data = valid_data;
137    q->tx_descs[head].valid_length = valid_length;
138    q->tx_descs[head].flags = misc_flags;
139
140    __sync_synchronize();
141
142    q->tx_descs[head].seq = q->tx_seq;
143
144    // only write local head
145    q->tx_seq++;
146
147    DESCQ_DEBUG("tx_seq=%lu tx_seq_ack=%lu \n",
148                    q->tx_seq, q->tx_seq_ack->value);
149    return SYS_ERR_OK;
150}
151
152/**
153 * @brief Dequeue a descriptor (as seperate fields)
154 *        from the descriptor queue
155 *
156 * @param q                     The descriptor queue
157 * @param region_id             Return pointer to the region id of
158 *                              the denqueued buffer
159 * @param offset                Return pointer to the offset into the region
160 *                              where the buffer resides
161 * @param length                Return pointer to the length of the buffer
162 * @param valid_data            Return pointer to the offset into the region
163 *                              where the valid data of the buffer resides
164 * @param valid_lenght          Return pointer to the length of the valid
165 *                              data of the buffer
166 * @param misc_flags            Return pointer to miscellaneous flags
167 *
168 * @returns error if queue is empty or SYS_ERR_OK on success
169 */
170static errval_t descq_dequeue(struct devq* queue,
171                              regionid_t* region_id,
172                              genoffset_t* offset,
173                              genoffset_t* length,
174                              genoffset_t* valid_data,
175                              genoffset_t* valid_length,
176                              uint64_t* misc_flags)
177{
178    struct descq* q = (struct descq*) queue;
179
180    if (!descq_can_read(queue)) {
181        return DEVQ_ERR_QUEUE_EMPTY;
182    }
183
184    size_t tail = q->rx_seq % q->slots;
185    *region_id = q->rx_descs[tail].rid;
186    *offset = q->rx_descs[tail].offset;
187    *length = q->rx_descs[tail].length;
188    *valid_data = q->rx_descs[tail].valid_data;
189    *valid_length = q->rx_descs[tail].valid_length;
190    *misc_flags = q->rx_descs[tail].flags;
191
192    //assert(*length > 0);
193
194    q->rx_seq++;
195    q->rx_seq_ack->value = q->rx_seq;
196
197    DESCQ_DEBUG("rx_seq_ack=%lu\n", q->rx_seq_ack->value);
198    return SYS_ERR_OK;
199}
200
201static errval_t descq_notify(struct devq* q)
202{
203    // errval_t err;
204    //errval_t err2;
205    // struct descq* queue = (struct descq*) q;
206    //
207    // err = queue->binding->tx_vtbl.notify(queue->binding, NOP_CONT);
208    // if (err_is_fail(err)) {
209    //
210    //     err = queue->binding->register_send(queue->binding, get_default_waitset(),
211    //                                         MKCONT(resend_notify, queue));
212    //     if (err == LIB_ERR_CHAN_ALREADY_REGISTERED) {
213    //         // dont care about this failure since there is an oustanding message
214    //         // anyway if this fails
215    //         return SYS_ERR_OK;
216    //     } else {
217    //         return err;
218    //     }
219    // }
220    return SYS_ERR_OK;
221}
222
223static errval_t descq_control(struct devq* q, uint64_t cmd,
224                              uint64_t value, uint64_t *result)
225{
226    errval_t err, err2;
227    struct descq* queue = (struct descq*) q;
228
229    DESCQ_DEBUG("start \n");
230    err = queue->binding->rpc_tx_vtbl.control(queue->binding, cmd, value, result, &err2);
231    err = err_is_fail(err) ? err : err2;
232    DESCQ_DEBUG("end\n");
233    return err;
234}
235
236static errval_t descq_register(struct devq* q, struct capref cap,
237                               regionid_t rid)
238{
239    errval_t err, err2;
240    struct descq* queue = (struct descq*) q;
241
242    DESCQ_DEBUG("start %p\n", queue);
243    err = queue->binding->rpc_tx_vtbl.register_region(queue->binding, cap, rid, &err2);
244    err = err_is_fail(err) ? err : err2;
245    DESCQ_DEBUG("end\n");
246    return err;
247}
248
249
250
251/**
252 * @brief Destroys a descriptor queue and frees its resources
253 *
254 * @param que                     The descriptor queue
255 *
256 * @returns error on failure or SYS_ERR_OK on success
257 */
258static errval_t descq_destroy(struct devq* que)
259{
260    errval_t err;
261
262    struct descq* q = (struct descq*) que;
263
264    err = vspace_unmap(q->tx_descs);
265    if (err_is_fail(err)) {
266        return err;
267    }
268
269    err = vspace_unmap(q->rx_descs);
270    if (err_is_fail(err)) {
271        return err;
272    }
273    free(q->name);
274    free(q);
275
276    return SYS_ERR_OK;
277}
278
279static void try_deregister(void* a)
280{
281    errval_t err, err2;
282    struct descq* queue = (struct descq*) a;
283
284    err = queue->binding->rpc_tx_vtbl.deregister_region(queue->binding, queue->resend_args,
285                                                        &err2);
286    assert(err_is_ok(err2) && err_is_ok(err));
287}
288
289
/**
 * @brief Deregister a region with the peer, retrying until the transport
 *        accepts the message.
 *
 * If the initial RPC fails, the region id is stashed in resend_args and a
 * resend continuation (try_deregister) is registered, dispatching events
 * until register_send succeeds. NOTE(review): in the resend case the
 * returned err2 is still the initial SYS_ERR_OK — the eventual RPC outcome
 * is only checked by the assert inside try_deregister.
 */
static errval_t descq_deregister(struct devq* q, regionid_t rid)
{
    errval_t err, err2;
    err2 = SYS_ERR_OK;
    struct descq* queue = (struct descq*) q;

    err = queue->binding->rpc_tx_vtbl.deregister_region(queue->binding, rid, &err2);
    if (err_is_fail(err)) {
        queue->resend_args = rid;
        // Spin until the send slot frees up; dispatch events in between so
        // progress can be made on the waitset.
        while(err_is_fail(err)) {
            err = queue->binding->register_send(queue->binding, get_default_waitset(),
                                                MKCONT(try_deregister, queue));
            if (err_is_fail(err)) {
                event_dispatch(get_default_waitset());
            }
        }
    }
    return err2;
}
309
310/*
311 * Flounder interface implementation
312 */
313
314static void mp_notify(void *arg) {
315    DESCQ_DEBUG("start \n");
316    errval_t err;
317    struct descq* q = arg;
318
319    DESCQ_DEBUG("%p \n",q->f.notify);
320    err = q->f.notify(q);
321
322    DESCQ_DEBUG("end\n");
323    assert(err_is_ok(err));
324}
325
326
327static errval_t mp_reg(struct descq_binding* b, struct capref cap,
328                   uint32_t rid, errval_t *err)
329{
330    DESCQ_DEBUG("start \n");
331    struct descq* q = (struct descq*) b->st;
332
333    *err = devq_add_region((struct devq*) q, cap, rid);
334    if (err_is_fail(*err)) {
335        return SYS_ERR_OK;
336    }
337
338    *err = q->f.reg(q, cap, rid);
339    DESCQ_DEBUG("end \n");
340    return SYS_ERR_OK;
341}
342
343static errval_t mp_dereg(struct descq_binding* b, uint32_t rid,
344                         errval_t *err)
345{
346    struct descq* q = (struct descq*) b->st;
347
348    *err = devq_remove_region((struct devq*) q, rid);
349    if (err_is_fail(*err)) {
350        return SYS_ERR_OK;
351    }
352
353    *err = q->f.dereg(q, rid);
354    return SYS_ERR_OK;
355}
356
357static errval_t mp_control(struct descq_binding* b, uint64_t cmd,
358                       uint64_t value, uint64_t *result, errval_t *err)
359{
360    struct descq* q = (struct descq*) b->st;
361
362    *err = q->f.control(q, cmd, value, result);
363    return SYS_ERR_OK;
364}
365
366static errval_t mp_destroy(struct descq_binding* b, errval_t *err)
367{
368    struct descq* q = (struct descq*) b->st;
369
370    *err = q->f.destroy(q);
371
372    USER_PANIC("Destroy NYI \n");
373    return SYS_ERR_OK;
374}
375
376static errval_t mp_create(struct descq_binding* b, uint32_t slots,
377        struct capref rx, struct capref tx, bool notifications, uint8_t role,
378        errval_t *err, uint64_t *queue_id) {
379
380    struct descq* q = (struct descq*) b->st;
381    DESCQ_DEBUG("start %p\n",q);
382
383    // switch RX/TX for correct setup
384    *err = vspace_map_one_frame_attr((void**) &(q->rx_descs),
385                                    slots*DESCQ_ALIGNMENT, tx,
386                                    VREGION_FLAGS_READ_WRITE, NULL, NULL);
387    if (err_is_fail(*err)) {
388        goto end2;
389    }
390
391    *err = vspace_map_one_frame_attr((void**) &(q->tx_descs),
392                                    slots*DESCQ_ALIGNMENT, rx,
393                                    VREGION_FLAGS_READ_WRITE, NULL, NULL);
394    if (err_is_fail(*err)) {
395        goto end1;
396    }
397
398    q->tx_seq_ack = (void*)q->tx_descs;
399    q->rx_seq_ack = (void*)q->rx_descs;
400    q->tx_descs++;
401    q->rx_descs++;
402    q->slots = slots-1;
403    q->rx_seq = 1;
404    q->tx_seq = 1;
405
406    devq_init(&q->q, true);
407
408    q->q.f.enq = descq_enqueue;
409    q->q.f.deq = descq_dequeue;
410    q->q.f.notify = descq_notify;
411    q->q.f.reg = descq_register;
412    q->q.f.dereg = descq_deregister;
413    q->q.f.ctrl = descq_control;
414    q->q.f.destroy = descq_destroy;
415
416    notificator_init(&q->notificator, q, descq_can_read, descq_can_write);
417    *err = waitset_chan_register(get_default_waitset(), &q->notificator.ready_to_read, MKCLOSURE(mp_notify, q));
418    assert(err_is_ok(*err));
419
420    *err = q->f.create(q, notifications, role, queue_id);
421    if (err_is_ok(*err)) {
422        goto end2;
423    }
424
425end1:
426    *err = vspace_unmap(q->rx_descs);
427    assert(err_is_ok(*err));
428end2:
429    DESCQ_DEBUG("end \n");
430    return SYS_ERR_OK;
431}
432
// RPC handler table installed on every server-side binding (see connect_cb).
static struct descq_rpc_rx_vtbl rpc_rx_vtbl = {
    .create_queue_call = mp_create,
    .destroy_queue_call = mp_destroy,
    .register_region_call = mp_reg,
    .deregister_region_call = mp_dereg,
    .control_call = mp_control,
};
440
441static void export_cb(void *st, errval_t err, iref_t iref)
442{
443    struct descq_endpoint_state* q = (struct descq_endpoint_state*) st;
444
445    err = nameservice_register(q->name, iref);
446    assert(err_is_ok(err));
447    q->exp_done = true;
448    // state is only function pointers
449    DESCQ_DEBUG("Control interface exported (%s)\n", q->name);
450}
451
452static errval_t connect_cb(void *st, struct descq_binding* b)
453{
454    struct descq* q;
455    struct descq_endpoint_state* state = (struct descq_endpoint_state*) st;
456    // Allocate state
457    q = malloc(sizeof(struct descq));
458    if (q == NULL) {
459        return DEVQ_ERR_DESCQ_INIT;
460    }
461    q->binding = b;
462
463    q->qid = state->qid;
464    state->qid++;
465    q->next = NULL;
466
467    q->f.create = state->f.create;
468    q->f.notify = state->f.notify;
469    q->f.destroy = state->f.destroy;
470    q->f.control = state->f.control;
471    q->f.reg = state->f.reg;
472    q->f.dereg = state->f.dereg;
473
474    if (state->head == NULL) {
475        // allocated state
476        state->head = q;
477        state->tail = q;
478    } else {
479        state->tail->next = q;
480        state->tail = q;
481    }
482
483    b->rpc_rx_vtbl = rpc_rx_vtbl;
484    b->st = q;
485    q->local_bind = b->local_binding != NULL;
486
487    return SYS_ERR_OK;
488}
489
490
491static void bind_cb(void *st, errval_t err, struct descq_binding* b)
492
493{
494    struct descq* q = (struct descq*) st;
495    DESCQ_DEBUG("Interface bound \n");
496    q->binding = b;
497    descq_rpc_client_init(q->binding);
498
499    q->bound_done = true;
500    b->st = q;
501}
502
503/**
504 * @brief initialized a descriptor queue
505 */
506errval_t descq_create(struct descq** q,
507                      size_t slots,
508                      char* name,
509                      bool exp,
510                      bool notifications,
511                      uint8_t role,
512                      uint64_t *queue_id,
513                      struct descq_func_pointer* f)
514{
515    DESCQ_DEBUG("create start\n");
516    errval_t err;
517    struct descq* tmp;
518    struct capref rx;
519    struct capref tx;
520
521    // Init basic struct fields
522    tmp = malloc(sizeof(struct descq));
523    assert(tmp != NULL);
524    tmp->name = strdup(name);
525    assert(tmp->name != NULL);
526
527    if (exp) {  // exporting
528        struct descq_endpoint_state* state = malloc(sizeof(struct descq_endpoint_state));
529        state->name = strdup(name);
530        assert(state->name);
531
532        state->f.notify = f->notify;
533        state->f.dereg = f->dereg;
534        state->f.reg = f->reg;
535        state->f.create = f->create;
536        state->f.destroy = f->destroy;
537        state->f.control = f->control;
538
539        err = descq_export(state, export_cb, connect_cb,
540                                get_default_waitset(), IDC_BIND_FLAGS_DEFAULT);
541        if (err_is_fail(err)) {
542            goto cleanup1;
543        }
544
545        while(!state->exp_done) {
546            event_dispatch(get_default_waitset());
547        }
548
549    } else {
550
551        tmp->f.notify = f->notify;
552        tmp->f.dereg = f->dereg;
553        tmp->f.reg = f->reg;
554        tmp->f.create = f->create;
555        tmp->f.destroy = f->destroy;
556        tmp->f.control = f->control;
557        size_t bytes;
558
559        err = frame_alloc(&rx, DESCQ_ALIGNMENT*slots, &bytes);
560        if (err_is_fail(err)) {
561            goto cleanup1;
562        }
563
564        assert(bytes >= DESCQ_ALIGNMENT*slots);
565
566        err = frame_alloc(&tx, DESCQ_ALIGNMENT*slots, &bytes);
567        if (err_is_fail(err)) {
568            goto cleanup2;
569        }
570
571        assert(bytes >= DESCQ_ALIGNMENT*slots);
572
573        err = vspace_map_one_frame_attr((void**) &(tmp->rx_descs),
574                                        slots*DESCQ_ALIGNMENT, rx,
575                                        VREGION_FLAGS_READ_WRITE, NULL, NULL);
576        if (err_is_fail(err)) {
577            goto cleanup3;
578        }
579
580        err = vspace_map_one_frame_attr((void**) &(tmp->tx_descs),
581                                        slots*DESCQ_ALIGNMENT, tx,
582                                        VREGION_FLAGS_READ_WRITE, NULL, NULL);
583        if (err_is_fail(err)) {
584            goto cleanup4;
585        }
586
587        memset(tmp->tx_descs, 0, slots*DESCQ_ALIGNMENT);
588        memset(tmp->rx_descs, 0, slots*DESCQ_ALIGNMENT);
589
590        tmp->bound_done = false;
591        iref_t iref;
592
593        err = nameservice_blocking_lookup(name, &iref);
594        if (err_is_fail(err)) {
595            goto cleanup5;
596        }
597
598        err = descq_bind(iref, bind_cb, tmp, get_default_waitset(),
599                              IDC_BIND_FLAGS_DEFAULT);
600        if (err_is_fail(err)) {
601            goto cleanup5;
602        }
603
604        while(!tmp->bound_done) {
605            event_dispatch(get_default_waitset());
606        }
607
608        tmp->local_bind = tmp->binding->local_binding != NULL;
609
610        errval_t err2;
611        err = tmp->binding->rpc_tx_vtbl.create_queue(tmp->binding, slots, rx, tx,
612            notifications, role, &err2, queue_id);
613        if (err_is_fail(err) || err_is_fail(err2)) {
614            err = err_is_fail(err) ? err: err2;
615            goto cleanup5;
616        }
617
618        tmp->tx_seq_ack = (void*)tmp->tx_descs;
619        tmp->rx_seq_ack = (void*)tmp->rx_descs;
620        tmp->tx_seq_ack->value = 0;
621        tmp->rx_seq_ack->value = 0;
622        tmp->tx_descs++;
623        tmp->rx_descs++;
624        tmp->slots = slots-1;
625        tmp->rx_seq = 1;
626        tmp->tx_seq = 1;
627
628        devq_init(&tmp->q, false);
629
630        tmp->q.f.enq = descq_enqueue;
631        tmp->q.f.deq = descq_dequeue;
632        tmp->q.f.notify = descq_notify;
633        tmp->q.f.reg = descq_register;
634        tmp->q.f.dereg = descq_deregister;
635        tmp->q.f.ctrl = descq_control;
636
637        tmp->notifications = notifications;
638
639        notificator_init(&tmp->notificator, tmp, descq_can_read, descq_can_write);
640        err = waitset_chan_register(get_default_waitset(), &tmp->notificator.ready_to_read, MKCLOSURE(mp_notify, tmp));
641        assert(err_is_ok(err));
642    }
643
644
645    *q = tmp;
646
647    DESCQ_DEBUG("create end %p \n", *q);
648    return SYS_ERR_OK;
649
650cleanup5:
651    vspace_unmap(tmp->rx_descs);
652cleanup4:
653    vspace_unmap(tmp->rx_descs);
654cleanup3:
655    cap_destroy(tx);
656cleanup2:
657    cap_destroy(rx);
658cleanup1:
659    free(tmp->name);
660    free(tmp);
661
662    return err;
663
664}
665
666
667