/*
 * Copyright (c) 2009, 2010, 2011, 2012, ETH Zurich.
 * All rights reserved.
 *
 * This file is distributed under the terms in the attached LICENSE file.
 * If you do not find this file, copies can be found by writing to:
 * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
 */

#include <barrelfish/barrelfish.h>

#include <bulk_transfer/bulk_transfer.h>
#include <bulk_transfer/bulk_sm.h>

#include "bulk_sm_impl.h"
#include "../../bulk_pool.h"

#if 0
#define BULK_FH_MSG(fmt, msg...) debug_printf("[%03u: "fmt"]\n", \
        disp_get_domain_id(), msg)
#else
#define BULK_FH_MSG(fmt, msg...)
#endif

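/**
 * \brief Debug handler for asynchronous errors on the bulk control binding.
 *        Prints the error; does not attempt any recovery.
 */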
void bulk_sm_error_handler_debug(struct bulk_ctrl_binding *_binding, errval_t err)
{
    DEBUG_ERR(err, "BulkTransfer@SHM: async error");
}

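/**
 * \brief Debug continuation for flounder sends: logs the string passed as
 *        argument once the message has been sent.
 */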
void bulk_sm_flounder_msg_sent_debug_cb(void *a)
{
    BULK_FH_MSG("flounder_sent_cb : %s", (char*)a);
}

// Called from the waitset when the binding can accept another message.
static void bulk_sm_flounder_resend_handler(void *a)
{
    // debug_printf("======== RESEND HANDLER ==============================\n");
    struct bulk_channel      *channel = VOID2CHANNEL(a);
    struct waitset           *ws      = channel->waitset;
    struct bulk_sm_impl_data *data    = CHANNEL_DATA(channel);
    struct bulk_ctrl_binding *b       = CHANNEL_BINDING(channel);
    errval_t err = SYS_ERR_OK;

    // <-- BEGIN LOCK
    thread_mutex_lock(&data->resend_lock);

    // dequeue first element
    struct bulk_sm_resend_item *item = data->resend_closure;

    if (!item) {
        USER_PANIC("bulk_sm_flounder_resend_handler called "
                   "but no message to send.\n");
    }

    // call the registered send function
    BULK_FH_MSG("Dispatching resend item : %p(%p) -> %p",
            item->event.handler, item->event.arg, item);
    struct event_closure ev = item->event;

    if (ev.handler) {
        // XXX: we expect the handler to be a simple function that just tries
        // to send a message and nothing else. We still hold the lock while
        // calling it.
        errval_t (*f)(void*) = (errval_t (*)(void*)) ev.handler;
        err = f(ev.arg);
    }

    if (err_is_ok(err)) {
        data->resend_closure = item->next; // remove item from list
        free(item);
    } else if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
        // handler failed; keep item at the head of the list to preserve order
    } else {
        DEBUG_ERR(err, "not trying to resend\n");
        data->resend_closure = item->next; // remove item from list
        free(item);
    }

    // if there are more messages, re-register the resend handler
    if (data->resend_closure) {
        err = b->register_send(b, ws,
                            MKCONT(bulk_sm_flounder_resend_handler, channel));
        if (err_is_fail(err)) {
            // somebody else already registered a resend handler.
            // we cannot tolerate this.
            USER_PANIC_ERR(BULK_TRANSFER_SM_EXCLUSIVE_WS,
                    "bulk_sm_flounder_resend_handler");
        }
    }

    thread_mutex_unlock(&data->resend_lock);
    // --> END LOCK
}

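/**
 * \brief Convenience wrapper around bulk_sm_flounder_send_fifo_msg_with_arg()
 *        that passes the channel itself as the argument to send_fn.
 */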
void bulk_sm_flounder_send_fifo_msg(struct bulk_channel *channel,
                                    errval_t (*send_fn)(void *arg))
{
    bulk_sm_flounder_send_fifo_msg_with_arg(channel, send_fn, channel);
}

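/**
 * \brief Sends a flounder message on the channel's control binding while
 *        preserving FIFO order.
 *
 * If the resend queue is empty, send_fn is invoked immediately. If the
 * binding is busy (FLOUNDER_ERR_TX_BUSY) or other sends are already queued,
 * the send is enqueued and retried from the resend handler. send_fn may
 * therefore be called more than once and should do nothing but attempt the
 * send.
 */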
void bulk_sm_flounder_send_fifo_msg_with_arg(struct bulk_channel *channel,
                                             errval_t (*send_fn)(void *arg),
                                             void *arg)
{
    struct waitset           *ws   = channel->waitset;
    struct bulk_sm_impl_data *data = CHANNEL_DATA(channel);
    struct bulk_ctrl_binding *b    = CHANNEL_BINDING(channel);
    errval_t err = SYS_ERR_OK;

    // <-- BEGIN LOCK
    thread_mutex_lock(&data->resend_lock);
    struct bulk_sm_resend_item *head = data->resend_closure;

    if (head == NULL) {
        // no other messages in the queue -> try sending directly
        err = send_fn(arg);
    }

    if (head != NULL || err_no(err) == FLOUNDER_ERR_TX_BUSY) {
        // insert continuation into the resend queue
        struct bulk_sm_resend_item *new_item = malloc(sizeof(*new_item));
        assert(new_item);
        new_item->event = MKCLOSURE((void (*)(void*)) send_fn, arg);
        new_item->next  = NULL;

        if (head == NULL) {
            data->resend_closure = new_item;

            // queue was empty: register resend handler
            err = b->register_send(b, ws,
                    MKCONT(bulk_sm_flounder_resend_handler, channel));
            if (err_is_fail(err)) {
                // somebody else already registered a resend handler.
                // we cannot tolerate this.
                USER_PANIC_ERR(BULK_TRANSFER_SM_EXCLUSIVE_WS,
                        "bulk_sm_flounder_send_fifo_msg_with_arg");
            }
        } else {
            // append to the tail to preserve FIFO order
            while (head->next) head = head->next;
            head->next = new_item;
        }
        BULK_FH_MSG("Registered resend item  : %p(%p) -> %p",
                new_item->event.handler, new_item->event.arg, new_item);

    } else if (err_is_fail(err)) {
        debug_printf("bulk_sm_flounder_send_fifo_msg_with_arg: "
                     "sending failed %s\n", err_getstring(err));
    }
    thread_mutex_unlock(&data->resend_lock);
    // --> END LOCK
}
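
/*
 * Usage sketch (hypothetical names): a send function passed to the FIFO
 * helpers should only attempt a single flounder send and return its error,
 * so the helpers can retry it on FLOUNDER_ERR_TX_BUSY. Assuming a generated
 * tx_vtbl entry called status_msg, this would look roughly like:
 *
 *     static errval_t my_status_tx(void *arg)
 *     {
 *         struct bulk_channel      *channel = VOID2CHANNEL(arg);
 *         struct bulk_ctrl_binding *b       = CHANNEL_BINDING(channel);
 *         // a FLOUNDER_ERR_TX_BUSY return causes the helper to re-queue us
 *         return b->tx_vtbl.status_msg(b, NOP_CONT, ...);
 *     }
 *
 *     bulk_sm_flounder_send_fifo_msg(channel, my_status_tx);
 */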
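
/**
 * \brief Allocates and initializes a struct bulk_pool from its flounder
 *        wire representation. The pool capability is only taken over on
 *        trusted channels; otherwise it is set to NULL_CAP.
 */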
errval_t create_pool_from_flounder(struct bulk_pool       **pool,
                                   const bulk_ctrl_pool_t *f_pool)
{
    assert(pool);
    assert(f_pool);
    errval_t err;

    // allocate pool
    struct bulk_pool_id pool_id = {
        .machine = f_pool->pool_id_machine,
        .dom     = f_pool->pool_id_dom,
        .local   = f_pool->pool_id_local
    };

    struct bulk_pool *p;
    err = bulk_pool_alloc_with_id(&p,
            f_pool->num_buffers, f_pool->buffer_size, pool_id);
    if (err_is_fail(err)) {
        return err;
    }

    // update trust level
    p->trust = flounder2bulk_trust(f_pool->trust);

    // add capability
    if (p->trust != BULK_TRUST_NONE) {
        p->pool_cap = f_pool->cap;
    } else {
        p->pool_cap = NULL_CAP;
    }

    *pool = p;
    return SYS_ERR_OK;
}

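/**
 * \brief Fills in the flounder wire representation of a pool from an
 *        existing struct bulk_pool. Does not allocate any new memory.
 */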
void generate_pool_for_flounder(const struct bulk_pool *pool,
                                bulk_ctrl_pool_t       *f_pool)
{
    assert(pool);
    assert(f_pool);

    f_pool->pool_id_machine = pool->id.machine;
    f_pool->pool_id_dom     = pool->id.dom;
    f_pool->pool_id_local   = pool->id.local;
    f_pool->trust           = bulk2flounder_trust(pool->trust);
    f_pool->buffer_size     = pool->buffer_size;
    f_pool->num_buffers     = pool->num_buffers;
    f_pool->cap             = pool->pool_cap;
}


// fills in the pool id fields; does not allocate any new memory
void fill_pool_id_from_flounder(struct bulk_pool_id      *poolid,
                                const bulk_ctrl_poolid_t *f_poolid)
{
    assert(poolid);
    assert(f_poolid);
    poolid->machine = f_poolid->pool_id_machine;
    poolid->dom     = f_poolid->pool_id_dom;
    poolid->local   = f_poolid->pool_id_local;
}

// fills in the pool id fields; does not allocate any new memory
void fill_pool_id_for_flounder(const struct bulk_pool_id *poolid,
                               bulk_ctrl_poolid_t        *f_poolid)
{
    assert(poolid);
    assert(f_poolid);
    f_poolid->pool_id_machine = poolid->machine;
    f_poolid->pool_id_dom     = poolid->dom;
    f_poolid->pool_id_local   = poolid->local;
}