1/**
2 * \file
3 * \brief Unidirectional bulk data transfer via shared memory
4 */
5
6/*
7 * Copyright (c) 2009, 2010, 2011, 2012, ETH Zurich.
8 * All rights reserved.
9 *
10 * This file is distributed under the terms in the attached LICENSE file.
11 * If you do not find this file, copies can be found by writing to:
12 * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
13 */
14
15#include <barrelfish/barrelfish.h>
16
17#include <bulk_transfer/bulk_transfer.h>
18#include <bulk_transfer/bulk_sm.h>
19
20#include "../../helpers.h"
21#include "bulk_sm_impl.h" // XXX legal? where to put impl headers?
22
23
// Function table wiring the shared-memory backend into the generic
// bulk_transfer interface. Handed out via bulk_sm_get_implementation().
struct bulk_implementation bulk_sm_implementation = {
    .channel_create  = bulk_sm_channel_create,
    .channel_bind    = bulk_sm_channel_bind,
    .channel_destroy = bulk_sm_channel_destroy,
    .assign_pool     = bulk_sm_assign_pool,
    .remove_pool     = bulk_sm_remove_pool,
    .move            = bulk_sm_move,
    .copy            = bulk_sm_copy,
    .release         = bulk_sm_release,
    .pass            = bulk_sm_pass,
    .request         = NULL, // supported? (no request handler in this backend)
};
36
37struct bulk_implementation *bulk_sm_get_implementation(void)
38{
39    return &bulk_sm_implementation;
40}
41
// Receive vtable installed on every Flounder control binding (both on the
// creator's connect path and the binder's bind path). Maps incoming control
// messages to their handlers; trusted/untrusted variants exist for the
// move/copy/pass operations.
struct bulk_ctrl_rx_vtbl bulk_ctrl_rx_vtbl = {
    .negotiate_call       = bulk_sm_channel_negotiate_rx_call,
    .negotiate_response   = bulk_sm_channel_negotiate_rx_reply,
    .assign_pool_call     = bulk_sm_assign_pool_rx_call,
    .assign_pool_response = bulk_sm_assign_pool_rx_response,
    .move_untrusted_call  = bulk_sm_move_rx_call,
    .move_trusted_call    = bulk_sm_move_trusted_rx_call,
    .move_response        = bulk_sm_move_rx_response,
    .copy_untrusted_call  = bulk_sm_copy_rx_call,
    .copy_trusted_call    = bulk_sm_copy_trusted_rx_call,
    .copy_response        = bulk_sm_copy_rx_response,
    .pass_untrusted_call  = bulk_sm_pass_rx_call,
    .pass_trusted_call    = bulk_sm_pass_trusted_rx_call,
    .pass_response        = bulk_sm_pass_rx_response,
    .release_call         = bulk_sm_release_rx_call,
    .release_response     = bulk_sm_release_rx_response,
};
59
60// Channel Management -----------------------------------------------------
61
62// Functions involved (C = Creator, B = Binder) (+ = public interface)
63// ==================
64//
65// Endpoint creation:
66//   C + bulk_sm_ep_create
67//
68// Channel creation
69//   C + bulk_sm_channel_create
70//   C   bulk_sm_channel_create_cb             (Flounder: if.export_cb)
71//
72// Channel binding 1: establish flounder channel
73//   B + bulk_sm_channel_bind
74//   C   bulk_sm_channel_connect               (Flounder: if.connect_cb)
75//   B   bulk_sm_channel_bind_cb               (Flounder: if.bind_cb)
76//
77// Channel binding 2: negotiate direction, role and trust level
78//   B   bulk_sm_channel_negotiate
79//   C   bulk_sm_channel_negotiate_rx_call     (Flounder: if.rx_bind_negotiate_call)
80//   C   bulk_sm_channel_negotiate_send_reply
81//   C   bulk_sm_channel_negotiate_replied     (Flounder: if.send_bind_negotiate_response_cb)
82//   B   bulk_sm_channel_negotiate_rx_reply    (Flounder: if.rx_bind_negotiate_response)
83//
// Generalized functions used in multiple places (Flounder helpers):
//   bulk_sm_flounder_msg_sent_debug_cb()
86
87// Channel binding 2: negotiate direction, role and trust level -----------
88
// One side's view of the channel properties exchanged during binding
// phase 2. Used by the negotiate handler to reconcile the two endpoints'
// settings.
struct bulk_sm_properties {
    enum bulk_channel_role      role;   // MASTER/SLAVE, or GENERIC = adapt to peer
    enum bulk_trust_level       trust;  // must compare equal on both ends
};
93
94void bulk_sm_channel_negotiate_rx_reply(
95        struct bulk_ctrl_binding   *b,
96        uint64_t                   error,
97        enum bulk_ctrl_direction_t match_direction,
98        enum bulk_ctrl_role_t      match_role,
99        uint64_t                   meta_size)
100{
101    assert( sizeof(errval_t) == sizeof(uint64_t) );
102    errval_t err = (errval_t) error;
103
104    struct bulk_channel      *channel = VOID2CHANNEL(b->st);
105    struct bulk_sm_impl_data *data    = CHANNEL_DATA(channel);
106
107    if (err_is_ok(err)) {
108        channel->direction = flounder2bulk_direction(match_direction);
109        channel->role      = flounder2bulk_role(match_role);
110        channel->meta_size = meta_size;
111        channel->state     = BULK_STATE_CONNECTED;
112    } else {
113        channel->state = BULK_STATE_UNINITIALIZED;
114    }
115
116    // notify user
117    bulk_continuation_call(data->bind_cont, err, channel);
118}
119
/**
 * \brief Send-completion callback for the negotiate response (Flounder
 *        txcont).
 *
 * Intentionally a no-op. The state transition that used to live here was
 * moved to bulk_sm_channel_negotiate_send_reply(): once the peer receives
 * the negotiate reply it considers binding done and may stop dispatching
 * events on the waitset, in which case this notification would never fire.
 *
 * \param a the bulk channel (unused)
 */
static void bulk_sm_channel_negotiate_replied(void *a)
{
    (void) a; // unused; signature dictated by event_closure
}
138
/**
 * \brief Creator side: transmits the negotiate response to the binder.
 *
 * Invoked via the FIFO send mechanism (queued by
 * bulk_sm_channel_negotiate_rx_call). Sends the outcome stored in
 * data->bind_error together with the direction/role the binder has to adopt
 * (the complement of our own), then performs the creator-side state
 * transition if the message was handed to Flounder successfully.
 *
 * \param a the bulk channel, passed as void* by the FIFO mechanism
 * \return  result of the Flounder tx call; on failure the message is
 *          presumably resent by the FIFO machinery — confirm in
 *          bulk_sm_flounder_send_fifo_msg
 */
static errval_t bulk_sm_channel_negotiate_send_reply(void *a)
{
    struct bulk_channel      *channel = VOID2CHANNEL(a);
    struct bulk_sm_impl_data *data    = CHANNEL_DATA(channel);
    struct bulk_ctrl_binding *b       = CHANNEL_BINDING(channel);

    struct event_closure txcont = MKCONT(bulk_sm_channel_negotiate_replied,
            channel);

    // the binder gets the opposite direction/role of the creator's
    bulk_ctrl_direction_t peer_direction =
            bulk2flounder_direction(bulk_direction_other(channel->direction));
    bulk_ctrl_role_t peer_role =
            bulk2flounder_role(bulk_role_other(channel->role));

    errval_t err = bulk_ctrl_negotiate_response__tx(b, txcont,
            data->bind_error, peer_direction, peer_role, channel->meta_size);

    if (err_is_ok(err)) {
        // set new channel state. don't do this in
        // bulk_sm_channel_negotiate_replied.
        // Reason: when peer receives negotiate_reply, binding is done. If
        // peer then no longer dispatches events on the waitset, we never get
        // the above notification.

        // rx_call leaves state at BIND_NEGOTIATE on success and resets it
        // to BINDING on failure; branch on that.
        if (channel->state == BULK_STATE_BIND_NEGOTIATE) {
            // negotiate was successful
            channel->state = BULK_STATE_CONNECTED;

            if (channel->callbacks->bind_received) {
                channel->callbacks->bind_received(channel);
            }
        } else {
            // negotiation failed. go back to wait for binding
            channel->state = BULK_STATE_INITIALIZED;
        }
    }

    return err;
}
178
179void bulk_sm_channel_negotiate_rx_call(
180        struct bulk_ctrl_binding   *b,
181        enum bulk_ctrl_role_t      role,
182        enum bulk_ctrl_trust_t     trust)
183{
184    struct bulk_channel      *channel = VOID2CHANNEL(b->st);
185    struct bulk_sm_impl_data *data    = CHANNEL_DATA(channel);
186
187    assert(channel->state == BULK_STATE_BIND_NEGOTIATE);
188
189    // helper structs
190    struct bulk_sm_properties me = {
191        .role      = channel->role,
192        .trust     = channel->trust,
193    };
194
195    struct bulk_sm_properties peer = {
196        .role      = flounder2bulk_role(role),
197        .trust     = flounder2bulk_trust(trust),
198    };
199
200    // Let's decide on the properties.
201    bool valid = true;
202
203    if (me.role == BULK_ROLE_GENERIC) {
204        if (peer.role == BULK_ROLE_GENERIC) {
205            me.role   = BULK_ROLE_MASTER;
206            peer.role = BULK_ROLE_SLAVE;
207        } else {
208            me.role   = bulk_role_other(peer.role);
209        }
210    } else {
211        if (peer.role == BULK_ROLE_GENERIC) {
212            peer.role = bulk_role_other(me.role);
213        } else {
214            valid = valid && (me.role == bulk_role_other(peer.role));
215        }
216    }
217
218    valid = valid && (bulk_trust_compare(me.trust, peer.trust) == 0);
219
220    // Successful?
221    if (valid) {
222        // update possibly updated role
223        channel->role = me.role;
224        data->bind_error = SYS_ERR_OK;
225    } else {
226        // reset binding state
227        channel->state = BULK_STATE_BINDING;
228        data->bind_error = BULK_TRANSFER_CHAN_BIND;
229    }
230
231    bulk_sm_flounder_send_fifo_msg(channel,
232                                   bulk_sm_channel_negotiate_send_reply);
233}
234
235static errval_t bulk_sm_channel_negotiate(void *a)
236{
237    struct bulk_channel      *channel = VOID2CHANNEL(a);
238    struct bulk_ctrl_binding *b       = CHANNEL_BINDING(channel);
239
240    assert(channel->state == BULK_STATE_BIND_NEGOTIATE);
241
242    struct event_closure txcont = MKCONT(bulk_sm_flounder_msg_sent_debug_cb,
243            "bulk_sm_channel_negotiate sent");
244
245    errval_t err = bulk_ctrl_negotiate_call__tx(b, txcont,
246            bulk2flounder_role(channel->role),
247            bulk2flounder_trust(channel->trust)
248            );
249
250    return err;
251}
252
253// Channel binding 1: establish flounder channel --------------------------
254
255static void bulk_sm_channel_bind_cb(void                     *st,
256                                    errval_t                 err,
257                                    struct bulk_ctrl_binding *b)
258{
259    struct bulk_channel      *channel = VOID2CHANNEL(st);
260    struct bulk_sm_impl_data *data    = CHANNEL_DATA(channel);
261    assert(channel);
262
263    assert(err_is_ok(err)); // current implementation doesn't generate failure
264
265    // mutual pointers
266    b->rx_vtbl = bulk_ctrl_rx_vtbl;
267    b->st      = channel;
268    data->b    = b;
269
270    // channel update
271    channel->state     = BULK_STATE_BIND_NEGOTIATE;
272
273    // Flounder channel established. let's negotiate channel properties
274    bulk_sm_flounder_send_fifo_msg(channel, bulk_sm_channel_negotiate);
275}
276
277static errval_t bulk_sm_channel_connect(void                     *st,
278                                        struct bulk_ctrl_binding *b)
279{
280    struct bulk_channel *channel = VOID2CHANNEL(st);
281    assert(channel);
282
283    struct bulk_sm_impl_data *data = CHANNEL_DATA(channel);
284
285    // mutual pointers
286    b->rx_vtbl       = bulk_ctrl_rx_vtbl;
287    b->error_handler = bulk_sm_error_handler_debug;
288    b->st            = channel;
289    data->b          = b;
290
291    // channel update
292    channel->state     = BULK_STATE_BIND_NEGOTIATE;
293
294    // Let binding side advance channel state and start negotiate properties.
295    return SYS_ERR_OK;
296}
297
298errval_t bulk_sm_channel_bind(struct bulk_channel      *channel,
299                              struct bulk_continuation cont)
300{
301    assert(channel);
302    assert(channel->state == BULK_STATE_UNINITIALIZED);
303    assert(channel->waitset);
304    assert(channel->ep);
305
306    struct bulk_sm_endpoint_descriptor *ep   = CHANNEL_EP(channel);
307
308    assert(ep->state == BULK_EPSTATE_IREF_EXPORTED);
309
310    // allocate implementation-specific data
311    struct bulk_sm_impl_data *data = malloc(sizeof(struct bulk_sm_impl_data));
312    channel->impl_data = data;
313    if (!data) {
314        return BULK_TRANSFER_MEM;
315    }
316    data->root = NULL;
317    thread_mutex_init(&data->mutex);
318
319    thread_mutex_init(&data->resend_lock);
320    data->resend_closure = NULL;
321
322    // Bind to iref
323    errval_t err = bulk_ctrl_bind(ep->iref,
324                                  bulk_sm_channel_bind_cb,
325                                  channel,
326                                  channel->waitset,
327                                  IDC_EXPORT_FLAGS_DEFAULT);
328
329    if (err_is_fail(err)) {
330        DEBUG_ERR(err, "bulk_sm_channel_bind");
331        free(channel->impl_data);
332        channel->impl_data = NULL;
333        return BULK_TRANSFER_CHAN_BIND;
334    }
335
336    data->bind_cont = cont;
337    channel->state = BULK_STATE_BINDING;
338
339    return SYS_ERR_OK;
340}
341
342// Channel Creation -------------------------------------------------------
343
344static void bulk_sm_channel_create_cb(void *st, errval_t err, iref_t iref)
345{
346    struct bulk_channel *channel = VOID2CHANNEL(st);
347
348    assert(channel);
349
350    struct bulk_sm_endpoint_descriptor *ep = CHANNEL_EP(channel);
351
352    assert(ep);
353    assert(ep->state == BULK_EPSTATE_CREATED);
354
355    ep->iref  = iref;
356    ep->err   = err;
357    ep->state = BULK_EPSTATE_IREF_EXPORTED;
358}
359
360errval_t bulk_sm_channel_create(struct bulk_channel *channel)
361{
362    // We cannot use bulk_continuation here as we do not have a channel.
363    // Given the interface, we cannot take a barrelfish-style continuation.
364    // Mixing different continuation styles in the same library is ugly anyway.
365    // Thus, this call is blocking.
366
367    assert(channel);
368    assert(channel->state == BULK_STATE_UNINITIALIZED);
369    assert(channel->waitset);
370    assert(channel->ep);
371
372    struct bulk_sm_endpoint_descriptor *ep = CHANNEL_EP(channel);
373
374    assert(ep->state == BULK_EPSTATE_CREATED); // interface not yet exported
375
376    // export interface and bind iref
377    ep->err = bulk_ctrl_export(channel,
378                               bulk_sm_channel_create_cb,
379                               bulk_sm_channel_connect,
380                               channel->waitset,
381                               IDC_EXPORT_FLAGS_DEFAULT);
382
383    if (err_is_fail(ep->err)) {
384        DEBUG_ERR(ep->err, "bulk_sm_channel_create");
385        return BULK_TRANSFER_CHAN_CREATE;
386    }
387
388    // wait for export to finish
389    while (ep->state != BULK_EPSTATE_IREF_EXPORTED) {
390        // need to dispatch both, channel waitset and default waitset.
391        // explanation very much appreciated.
392        // (export uses default?)
393
394        struct bulk_sm_ws_item ws_list[2];
395        ws_list[0].ws   = get_default_waitset();
396        ws_list[1].ws   = channel->waitset;
397        ws_list[0].next = &ws_list[1];
398        ws_list[1].next = NULL;
399
400        errval_t err = bulk_sm_multiple_event_dispatch(ws_list);
401        if (err_is_fail(err)) {
402            USER_PANIC_ERR(err, "bulk_sm_channel_create: event_dispatch");
403        }
404    }
405
406    if (err_is_fail(ep->err)) {
407        DEBUG_ERR(ep->err, "bulk_sm_channel_create");
408        return BULK_TRANSFER_CHAN_CREATE;
409    }
410
411    // allocate implementation-specific data
412    struct bulk_sm_impl_data *data = malloc(sizeof(struct bulk_sm_impl_data));
413    channel->impl_data = data;
414    if (!data) {
415        return BULK_TRANSFER_MEM;
416    }
417    data->root = NULL;
418    thread_mutex_init(&data->mutex);
419
420    thread_mutex_init(&data->resend_lock);
421    data->resend_closure = NULL;
422
423    // channel initialized
424    channel->state = BULK_STATE_INITIALIZED;
425    return SYS_ERR_OK;
426}
427
428// Channel destroyal ------------------------------------------------------
429
/**
 * \brief Destroys a shared-memory bulk channel.
 *
 * Not yet implemented: unconditionally aborts via the failing assertion
 * (the return statement is unreachable in debug builds).
 */
errval_t bulk_sm_channel_destroy(struct bulk_channel *channel)
{
    assert(!"NYI");
    return SYS_ERR_OK;
}
435