1/**
2 * \file
3 * \brief Unidirectional bulk data transfer via shared memory
4 */
5
6/*
7 * Copyright (c) 2013, ETH Zurich.
8 * All rights reserved.
9 *
10 * This file is distributed under the terms in the attached LICENSE file.
11 * If you do not find this file, copies can be found by writing to:
12 * ETH Zurich D-INFK, Universitaetstrasse 6, CH-8092 Zurich. Attn: Systems Group.
13 */
14
15#include <barrelfish/event_queue.h>
16#include <string.h>
17
18#include <bulk_transfer/bulk_transfer.h>
19#include <bulk_transfer/bulk_allocator.h>
20#include <bulk_transfer/bulk_local.h>
21
22#include "../../bulk_pool.h"
23#include "../../bulk_buffer.h"
24
25#include "../../helpers.h"
26
27//#define IMPL_DEBUG(fmt, msg...) debug_printf("%s: "fmt"\n", __func__, msg);
28#define IMPL_DEBUG(fmt, msg...)
29
30//#define EVENT_DEBUG(fmt, msg...) debug_printf("%s: "fmt"\n", __func__, msg);
31#define EVENT_DEBUG(fmt, msg...)
32//#define EVENT_DEBUG_TRACE debug_printf("%s\n", __func__);
33#define EVENT_DEBUG_TRACE
34
35struct local_channel
36{
37    struct bulk_channel *other;
38    struct event_queue events;
39};
40
41struct local_event
42{
43    struct event_queue_node eqn;
44    struct bulk_channel *channel;
45    void (*handler)(struct local_event *);
46    struct bulk_continuation cont;
47    union
48    {
49        struct
50        {
51            errval_t err;
52        } status;
53
54        struct
55        {
56            errval_t err;
57        } bind_done;
58
59        struct
60        {
61            errval_t err;
62            struct bulk_pool *pool;
63        } pool_assigned;
64
65        struct
66        {
67            struct bulk_pool_id pool_id;
68            size_t buf_id;
69            struct bulk_buffer *buffer;
70            void *meta;
71        } move_received;
72
73        struct
74        {
75            struct bulk_buffer *buffer;
76            struct bulk_pool_id pool_id;
77            size_t buf_id;
78            void *meta;
79        } buffer_received;
80
81        struct
82        {
83            struct bulk_pool_id pool_id;
84            size_t buf_id;
85            struct bulk_buffer *buffer;
86            void *meta;
87        } copy_received;
88
89        struct
90        {
91            struct bulk_pool_id pool_id;
92            size_t buf_id;
93            struct bulk_buffer *buffer;
94        } copy_released;
95    } params;
96};
97
98static void event_handler(void *arg);
99
100static errval_t impl_create(struct bulk_channel *channel);
101static errval_t impl_bind(struct bulk_channel *channel,
102                          struct bulk_continuation cont);
103static errval_t impl_assign_pool(struct bulk_channel *channel,
104                                 struct bulk_pool *pool,
105                                 struct bulk_continuation cont);
106static errval_t impl_move(struct bulk_channel *channel,
107                          struct bulk_buffer *buffer,
108                          void *meta,
109                          struct bulk_continuation cont);
110static errval_t impl_pass(struct bulk_channel *channel,
111                          struct bulk_buffer *buffer,
112                          void *meta,
113                          struct bulk_continuation cont);
114static errval_t impl_copy(struct bulk_channel *channel,
115                          struct bulk_buffer *buffer,
116                          void *meta,
117                          struct bulk_continuation cont);
118static errval_t impl_release(struct bulk_channel *channel,
119                             struct bulk_buffer *buffer,
120                             struct bulk_continuation cont);
121
122static struct bulk_implementation implementation = {
123    .channel_create = impl_create,
124    .channel_bind = impl_bind,
125    .assign_pool = impl_assign_pool,
126    .move = impl_move,
127    .pass = impl_pass,
128    .copy = impl_copy,
129    .release = impl_release, };
130
131static errval_t init_channel(struct bulk_channel *channel)
132{
133    struct local_channel *l = malloc(sizeof(*l));
134    if (l == NULL) {
135        return BULK_TRANSFER_MEM;
136    }
137
138    channel->impl_data = l;
139    event_queue_init(&l->events, channel->waitset, EVENT_QUEUE_CONTINUOUS);
140    return SYS_ERR_OK;
141}
142
143/* ----------------------- event management --------------------------------*/
144
145/**
146 * allocates a new event
147 */
148static errval_t event_alloc(struct bulk_channel *channel,
149                            struct local_event **ev,
150                            void (*handler)(struct local_event *),
151                            size_t extra)
152{
153    *ev = malloc(sizeof(struct local_event) + extra);
154    if (*ev == NULL) {
155        return BULK_TRANSFER_MEM;
156    }
157
158    (*ev)->channel = channel;
159    (*ev)->handler = handler;
160    return SYS_ERR_OK;
161}
162
163/**
164 * enqueues the local event to the event queue of the channel
165 */
166static void event_enqueue(struct local_event *lev)
167{
168    struct local_channel *local = lev->channel->impl_data;
169    event_queue_add(&local->events, &lev->eqn, MKCLOSURE(event_handler, lev));
170}
171
172/* ========================= EVENT HANDLERS ================================ */
173
174/* ------------------------- event handlers -------------------------------- */
175
176/**
177 *
178 */
179static void event_handler(void *arg)
180{
181    struct local_event *lev = arg;
182    lev->handler(lev);
183    free(lev);
184}
185
186static void event_op_done(struct local_event *lev)
187{
188    EVENT_DEBUG_TRACE
189    if (lev->cont.handler) {
190        lev->cont.handler(NULL, SYS_ERR_OK, lev->channel);
191    } else {
192        EVENT_DEBUG("event_op_done(): no handler set...\n");
193    }
194    free(lev);
195}
196
197/* -------------------------- event bind ----------------------------------- */
198
199/**
200 * Gets called when the binding procedure is over.
201 *
202 * Side: Binding Domain.
203 */
204static void event_bind_done(struct local_event *lev)
205{
206    EVENT_DEBUG_TRACE;
207    assert(lev);
208
209    if (lev->cont.handler == NULL) {
210        EVENT_DEBUG("%s", "handler not set");
211        return;
212    }
213
214    lev->cont.handler(lev->cont.arg, lev->params.bind_done.err, lev->channel);
215}
216
217/**
218 * Gets called when a bind request has been received
219 *
220 * Side: Creating Side
221 */
222static void event_bind_received(struct local_event *lev)
223{
224    EVENT_DEBUG_TRACE
225
226    errval_t err, reply = SYS_ERR_OK;
227    struct local_event *ev;
228    struct local_channel *l = lev->channel->impl_data;
229
230    /* do the callback to the application to inform about the binding */
231    if (lev->channel->callbacks->bind_received) {
232        err = lev->channel->callbacks->bind_received(lev->channel);
233        if (err_is_fail(err)) {
234            reply = err;
235        }
236    } else {
237        /* XXX: or if no cb set, just say SYS_ERR_OK ? */
238        reply = BULK_TRANSFER_NO_CALLBACK;
239    }
240
241    /* allocate and trigger event bind_done */
242    err = event_alloc(l->other, &ev, event_bind_done, 0);
243    if (!err_is_fail(err)) {
244        ev->params.bind_done.err = reply;
245        ev->cont = lev->cont;
246        event_enqueue(ev);
247    }
248}
249
250/**
251 * Implementation specific bind procedure
252 *
253 * Side: Binding Side
254 */
255static errval_t impl_bind(struct bulk_channel *channel,
256                          struct bulk_continuation cont)
257{
258    errval_t err;
259    struct local_channel *l, *o_l;
260    struct bulk_local_endpoint *ep;
261    struct local_event *ev;
262
263    /* Initialize the channel */
264    err = init_channel(channel);
265    if (err_is_fail(err)) {
266        return err;
267    }
268
269    /* setting the pointers to the other channel */
270    ep = (struct bulk_local_endpoint *) channel->ep;
271    l = channel->impl_data;
272    l->other = ep->other_channel;
273    o_l = l->other->impl_data;
274    o_l->other = channel;
275
276    /* set channel parameters from the other side */
277    channel->role = bulk_role_other(l->other->role);
278    channel->direction = bulk_direction_other(l->other->role);
279    channel->meta_size = l->other->meta_size;
280
281    /* update the channel state */
282    channel->state = BULK_STATE_CONNECTED;
283    l->other->state = BULK_STATE_CONNECTED;
284
285    /* allocate and trigger the bind event to the other side */
286    err = event_alloc(l->other, &ev, event_bind_received, 0);
287    if (err_is_fail(err)) {
288        goto error;
289    }
290
291    // Trigger first event
292    ev->cont = cont;
293    event_enqueue(ev);
294
295    return SYS_ERR_OK;
296
297    error: free(l);
298
299    return err;
300}
301
302/* -------------------------- event pool assign ---------------------------- */
303
304/**
305 * Gets called when the pool assignment on the other side is completed
306 *
307 * Side: Assigning Side
308 */
309static void event_pool_assigned(struct local_event *lev)
310{
311    EVENT_DEBUG_TRACE
312
313    errval_t err;
314    errval_t result = lev->params.pool_assigned.err;
315
316    if (lev->cont.handler) {
317        if (!err_is_fail(result)) {
318            err = bulk_pool_assign(lev->params.pool_assigned.pool,
319                                   lev->channel);
320            if (err_is_fail(err)) {
321                result = err;
322            }
323        }
324
325        EVENT_DEBUG(" > [%s]", (result==SYS_ERR_OK) ? "Success", "Failure");
326
327        /* call the continuation */
328        lev->cont.handler(lev->params.pool_assigned.pool, result, lev->channel);
329    } else {
330        EVENT_DEBUG("%s", "continuation handler not set");
331    }
332}
333
334/**
335 * Gets called when a pool is assigned to the channel
336 *
337 * Side: Other
338 */
339static void event_pool_assign(struct local_event *lev)
340{
341    EVENT_DEBUG_TRACE
342
343    errval_t err;
344    errval_t assigned;
345    struct local_event *ev;
346
347    struct bulk_pool *pool = lev->params.pool_assigned.pool;
348    struct bulk_channel *chan = lev->channel;
349    struct local_channel *l = chan->impl_data;
350
351    if (bulk_pool_is_assigned(pool, chan)) {
352        /* channel is already assigned */
353        EVENT_DEBUG("pool [%p] is already assigned to channel.", pool);
354        err = event_alloc(l->other, &ev, event_pool_assigned, 0);
355        if (!err_is_fail(err)) {
356            ev->params.pool_assigned.err = BULK_TRANSFER_POOL_ALREADY_ASSIGNED;
357            ev->cont = lev->cont;
358            event_enqueue(ev);
359        }
360        return;
361    }
362
363    /* allocate the structures for the pool */
364    err = bulk_pool_alloc_with_id(&pool, pool->num_buffers, pool->buffer_size,
365                                  pool->id);
366    if (err_is_fail(err)) {
367        USER_PANIC_ERR(err, "Failed to allocate pool struct\n");
368        return;
369    }
370
371    /*
372     * prepare the cap
373     */
374    if (lev->params.pool_assigned.pool->trust == BULK_TRUST_FULL) {
375        err = slot_alloc(&pool->pool_cap);
376        if (err_is_fail(err)) {
377            EVENT_DEBUG("could not allocate a new slot for the cap: %s",
378                            err_getstring(err));
379            assigned = err;
380            goto done;
381        }
382
383        err = cap_copy(pool->pool_cap,
384                       lev->params.pool_assigned.pool->pool_cap);
385        if (err_is_fail(err)) {
386            EVENT_DEBUG("could not allocate a new slot for the cap: %s",
387                            err_getstring(err));
388            assigned = err;
389            goto done;
390        }
391    }
392
393    pool->trust = lev->params.pool_assigned.pool->trust;
394
395    /*
396     * XXX: we set the trust level to none here to avoid the creation of
397     *      the buffer caps. These have already been created and cannot
398     *      be created a second time. [SYS_ERR_REVOKE_FIRST]
399     */
400
401    err = bulk_pool_map(pool);
402    if (err_is_fail(err)) {
403        assigned = err;
404        goto done;
405    }
406
407    assert(lev->channel->callbacks->pool_assigned);
408    if (lev->channel->callbacks->pool_assigned) {
409        assigned = lev->channel->callbacks->pool_assigned(lev->channel, pool);
410    } else {
411        /* XXX: or if no cb set, just say SYS_ERR_OK ? */
412        assigned = BULK_TRANSFER_NO_CALLBACK;
413    }
414    done:
415
416    if (err_is_fail(assigned)) {
417        bulk_pool_unmap(pool);
418        bulk_pool_dealloc(pool);
419    } else {
420        err = bulk_pool_assign(pool, lev->channel);
421        if (err_is_fail(err)) {
422            USER_PANIC_ERR(err, "Failed to assign the pool to the channel\n");
423        }
424    }
425
426    err = event_alloc(l->other, &ev, event_pool_assigned, 0);
427    if (!err_is_fail(err)) {
428        ev->params.pool_assigned.err = assigned;
429        ev->params.pool_assigned.pool = lev->params.pool_assigned.pool;
430        ev->cont = lev->cont;
431        event_enqueue(ev);
432    }
433
434}
435
436/**
437 * Implementation specific handler for pool assing requests
438 *
439 * Side: Assigning Side
440 */
441static errval_t impl_assign_pool(struct bulk_channel *channel,
442                                 struct bulk_pool *pool,
443                                 struct bulk_continuation cont)
444{
445    errval_t err;
446    struct local_event *ev;
447    struct local_channel *l = channel->impl_data;
448
449    /* allocate and trigger the event */
450    err = event_alloc(l->other, &ev, event_pool_assign, 0);
451    if (!err_is_fail(err)) {
452        ev->params.pool_assigned.pool = pool;
453        ev->cont = cont;
454        event_enqueue(ev);
455    }
456    return err;
457}
458
459/* -------------------------- event buffer move ---------------------------- */
460
461/**
462 * Gets called when a buffer arrives via move operation
463 *
464 * Side: Receiving Side (SINK)
465 */
466static void event_move_received(struct local_event *lev)
467{
468    errval_t err;
469
470    struct bulk_pool *pool = bulk_pool_get(&lev->params.move_received.pool_id,
471                                           lev->channel);
472    size_t bufid = lev->params.copy_released.buf_id;
473
474    EVENT_DEBUG("  > pool=%p, bufid=%x", pool, (unsigned int )bufid);
475    assert(pool);
476
477    struct bulk_buffer *buf = pool->buffers[bufid];
478
479    err = bulk_buffer_change_state(buf, BULK_BUFFER_READ_WRITE);
480    if (err_is_fail(err)) {
481        USER_PANIC_ERR(err, "could not change the state of the buffer.");
482    }
483
484    if (lev->channel->callbacks->move_received) {
485        lev->channel->callbacks->move_received(lev->channel, buf,
486                                               lev->params.move_received.meta);
487    }
488}
489
490/**
491 * Implementation specific handler of the buffer move operation
492 *
493 * Side: Sending Side (SOURCE)
494 */
495static errval_t impl_move(struct bulk_channel *channel,
496                          struct bulk_buffer *buffer,
497                          void *meta,
498                          struct bulk_continuation cont)
499{
500    errval_t err;
501    struct local_event *ev, *ev2;
502
503    struct local_channel *l = channel->impl_data;
504
505    void *m;
506
507    IMPL_DEBUG("  > buffer=%p", buffer->address);
508
509    /* trigger event to other channel */
510    size_t meta_size = 0;
511    if (meta) {
512        meta_size = channel->meta_size;
513    }
514    err = event_alloc(l->other, &ev, event_move_received, meta_size);
515    if (!err_is_fail(err)) {
516        ev->params.move_received.meta = NULL;
517        if (meta) {
518            /* copy the meta data */
519            m = ev + 1;
520            memcpy(m, meta, channel->meta_size);
521            ev->params.move_received.meta = m;
522        }
523        /* set parameters of the event */
524        ev->params.move_received.pool_id = buffer->pool->id;
525        ev->params.move_received.buf_id = ((lvaddr_t) buffer->address
526                        - buffer->pool->base_address)
527                        / buffer->pool->buffer_size;
528        ev->params.move_received.buffer = buffer;
529        event_enqueue(ev);
530    }
531
532    /* trigger operation done event to this channel */
533    err = event_alloc(channel, &ev2, event_op_done, 0);
534    if (!err_is_fail(err)) {
535        ev2->cont = cont;
536        event_op_done(ev2);
537    }
538
539    return err;
540}
541
542/* -------------------------- event buffer pass ---------------------------- */
543
544/**
545 * Gets called when a buffer pass event occurs on the sending side
546 *
547 * Side: Sending Side (SOURCE)
548 */
549static void event_buffer_received(struct local_event *lev)
550{
551    errval_t err;
552
553    struct bulk_pool *pool = bulk_pool_get(&lev->params.buffer_received.pool_id,
554                                           lev->channel);
555    assert(pool);
556
557    size_t bufid = lev->params.buffer_received.buf_id;
558    struct bulk_buffer *buf = pool->buffers[bufid];
559
560    assert(bufid < pool->num_buffers);
561
562    EVENT_DEBUG("  > buffer=[%p], bufid=0x%x", buf, (unsigned int )bufid);
563
564    /* we need to change the state of the buffer */
565    err = bulk_buffer_change_state(buf, BULK_BUFFER_READ_WRITE);
566    if (err_is_fail(err)) {
567        /* TODO: error handling */
568        USER_PANIC_ERR(err, "could not change the state of the buffer.");
569    }
570
571    /* inform the application */
572    if (lev->channel->callbacks->buffer_received) {
573        lev->channel->callbacks->buffer_received(
574                        lev->channel, buf, lev->params.buffer_received.meta);
575    }
576}
577
578/**
579 * Backend specific handler for buffer pass operations
580 *
581 * Side: Receiving Side (SINK)
582 */
583static errval_t impl_pass(struct bulk_channel *channel,
584                          struct bulk_buffer *buffer,
585                          void *meta,
586                          struct bulk_continuation cont)
587{
588    errval_t err;
589    struct local_event *ev, *ev2;
590    void *m;
591    struct local_channel *l = channel->impl_data;
592
593    IMPL_DEBUG("  > buffer=%p", buffer->address);
594
595    size_t meta_size = 0;
596    if (meta) {
597        meta_size = channel->meta_size;
598    }
599    /* allocate and trigger event */
600    err = event_alloc(l->other, &ev, event_buffer_received, meta_size);
601    if (!err_is_fail(err)) {
602        ev->params.buffer_received.meta = NULL;
603
604        if (meta) {
605            /* copy meta data */
606            m = ev + 1;
607            memcpy(m, meta, channel->meta_size);
608            ev->params.buffer_received.meta = m;
609
610        }
611        /* set event params */
612        ev->params.buffer_received.pool_id = buffer->pool->id;
613        ev->params.buffer_received.buf_id = ((lvaddr_t) buffer->address
614                        - buffer->pool->base_address)
615                        / buffer->pool->buffer_size;
616        ev->params.buffer_received.buffer = buffer;
617        event_enqueue(ev);
618    }
619
620    /* trigger op done event */
621    err = event_alloc(channel, &ev2, event_op_done, 0);
622    if (!err_is_fail(err)) {
623        ev2->cont = cont;
624        event_op_done(ev2);
625    }
626    return err;
627}
628
629/* -------------------------- event buffer copy ---------------------------- */
630
631static void event_copy_received(struct local_event *lev)
632{
633    struct bulk_pool *pool = bulk_pool_get(&lev->params.copy_received.pool_id,
634                                           lev->channel);
635    size_t bufid = lev->params.copy_released.buf_id;
636    EVENT_DEBUG("  > pool=%p, bufid=%x", pool, (unsigned int )bufid);
637    assert(pool);
638
639    struct bulk_buffer *buf = pool->buffers[bufid];
640
641    errval_t err = bulk_buffer_change_state(buf, BULK_BUFFER_READ_ONLY);
642    if (err_is_fail(err)) {
643        /* TODO: error handling */
644        USER_PANIC_ERR(err, "failed to change the state");
645    }
646    if (lev->channel->callbacks->copy_received) {
647        lev->channel->callbacks->copy_received(lev->channel, buf,
648                                               lev->params.copy_received.meta);
649
650    }
651}
652
653static errval_t impl_copy(struct bulk_channel *channel,
654                          struct bulk_buffer *buffer,
655                          void *meta,
656                          struct bulk_continuation cont)
657{
658    IMPL_DEBUG("  > buffer=%p", buffer->address);
659    struct local_channel *l = channel->impl_data;
660    struct local_event *ev, *ev2;
661    void *m;
662    errval_t err;
663    size_t meta_size = 0;
664    if (meta) {
665        meta_size = channel->meta_size;
666    }
667    err = event_alloc(l->other, &ev, event_copy_received, meta_size);
668    if (!err_is_fail(err)) {
669        ev->params.copy_received.meta = NULL;
670        if (meta) {
671            m = ev + 1;
672            memcpy(m, meta, channel->meta_size);
673            ev->params.copy_received.meta = m;
674        }
675        ev->params.copy_received.buffer = buffer;
676        ev->params.copy_received.pool_id = buffer->pool->id;
677        ev->params.copy_received.buf_id = ((lvaddr_t) buffer->address
678                        - buffer->pool->base_address)
679                        / buffer->pool->buffer_size;
680        event_enqueue(ev);
681    }
682
683    /* trigger op done event */
684    err = event_alloc(channel, &ev2, event_op_done, 0);
685    if (!err_is_fail(err)) {
686        ev2->cont = cont;
687        event_op_done(ev2);
688    }
689    return err;
690
691}
692
693/* -------------------------- event copy release --------------------------- */
694
695/**
696 * Gets called when a copy release event occurred
697 *
698 * Side: Sending Side (SOURCE)
699 */
700static void event_copy_released(struct local_event *lev)
701{
702    errval_t err;
703
704    struct bulk_pool *pool = bulk_pool_get(&lev->params.copy_released.pool_id,
705                                           lev->channel);
706    assert(pool);
707
708    size_t bufid = lev->params.copy_released.buf_id;
709    struct bulk_buffer *buf = pool->buffers[bufid];
710
711    buf->local_ref_count--;
712
713    EVENT_DEBUG("  > buffer=[%p], bufid=0x%x", buf, (unsigned int )bufid);
714
715    /* change the state of the buffer */
716    if (buf->state == BULK_BUFFER_RO_OWNED && bulk_buffer_can_release(buf)) {
717        err = bulk_buffer_change_state(buf, BULK_BUFFER_READ_WRITE);
718        if (err_is_fail(err)) {
719            USER_PANIC_ERR(err, "failed to change the state");
720        }
721    }
722
723    /* inform the application */
724    if (lev->channel->callbacks->copy_released) {
725        lev->channel->callbacks->copy_released(lev->channel, buf);
726    }
727}
728
729/**
730 *
731 */
732static errval_t impl_release(struct bulk_channel *channel,
733                             struct bulk_buffer *buffer,
734                             struct bulk_continuation cont)
735{
736    struct local_event *ev, *ev2;
737    errval_t err;
738    struct local_channel *l = channel->impl_data;
739
740    IMPL_DEBUG("  > buffer=%p", buffer->address);
741
742    /* allocate and trigger event */
743    err = event_alloc(l->other, &ev, event_copy_released, 0);
744    if (!err_is_fail(err)) {
745        ev->params.copy_released.buffer = buffer;
746        ev->params.copy_released.pool_id = buffer->pool->id;
747        ev->params.copy_released.buf_id = ((lvaddr_t) buffer->address
748                        - buffer->pool->base_address)
749                        / buffer->pool->buffer_size;
750
751        event_enqueue(ev);
752    }
753
754    /* trigger op done event */
755    err = event_alloc(channel, &ev2, event_op_done, 0);
756    if (!err_is_fail(err)) {
757        ev2->cont = cont;
758        event_op_done(ev2);
759    }
760    return err;
761}
762
763/* -------------------------- channel creation ----------------------------- */
764
765/**
766 * Implementation specific handler for channel creation
767 */
768static errval_t impl_create(struct bulk_channel *channel)
769{
770    return init_channel(channel);
771}
772
773/**
774 * initializes the local endpoint
775 */
776void bulk_local_init_endpoint(struct bulk_local_endpoint *endpoint,
777                              struct bulk_channel *other_channel)
778{
779    endpoint->generic.f = &implementation;
780    endpoint->other_channel = other_channel;
781}
782
783