/**
 * \file
 * \brief block_server client process.
 */

/*
 * Copyright (c) 2007, 2008, 2009, 2010, ETH Zurich.
 * All rights reserved.
 *
 * This file is distributed under the terms in the attached LICENSE file.
 * If you do not find this file, copies can be found by writing to:
 * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
 */

#include <stdio.h>
#include <stdlib.h>

#include <barrelfish/barrelfish.h>
#include <barrelfish/waitset.h>
#include <barrelfish/nameservice_client.h>

#include <bulk_transfer/bulk_transfer.h>
#include <bulk_transfer/bulk_sm.h>

#include "bs_connector.h"
#include "benchmark.h"

enum bs_connect_stage
{
    CSTAGE_START,
    CSTAGE_SERVICE_BOUND,
    CSTAGE_SERVICE_SETUP,
    CSTAGE_ERR
};

/* sequence number for outgoing read requests */
static uint32_t seq_number = 0;

/* current stage of the connection setup, advanced by the callbacks below */
volatile enum bs_connect_stage connection_stage = CSTAGE_START;

/* waitsets dispatched while waiting for the connection setup to complete */
struct waitset *bs_service_ws    = NULL;
struct waitset *bs_service_tx_ws = NULL;
struct waitset *bs_service_rx_ws = NULL;

/**
 * \brief dispatches events on all service waitsets until the connection
 *        setup reaches the given stage
 */
static inline void wait_for_condition(enum bs_connect_stage stage)
{
    struct bulk_sm_ws_item ws_list[3];

    ws_list[0].ws   = bs_service_ws;
    ws_list[1].ws   = bs_service_tx_ws;
    ws_list[2].ws   = bs_service_rx_ws;
    ws_list[0].next = &ws_list[1];
    ws_list[1].next = &ws_list[2];
    ws_list[2].next = NULL;

    while (connection_stage != stage) {
        errval_t err = bulk_sm_multiple_event_dispatch(ws_list);
        if (err_is_fail(err)) {
            USER_PANIC_ERR(err, "wait_for_condition: event_dispatch");
        }
    }
}


/**
 * \brief handles status replies from the block service
 */
static void bs_service_status_cb(struct block_service_binding *b,
                                 errval_t err,
                                 uint32_t seqn,
                                 uint32_t req)
{
    BS_CONN_DEBUG("status message: error=%s, seqn=%u, req=%u",
            err_getstring(err), seqn, req);
    bench_signal(err, seqn, req);
}

static struct block_service_rx_vtbl bs_rx_vtbl = {
    .status = bs_service_status_cb };

/**
 * \brief invoked when the flounder binding to the block service is complete
 */
static void bs_service_bind_cb(void *st,
                               errval_t err,
                               struct block_service_binding *b)
{
    struct bs_connection *conn = (struct bs_connection *) st;

    if (err_is_fail(err)) {
        conn->state = BS_CONN_ERR;
        connection_stage = CSTAGE_ERR;
        USER_PANIC_ERR(err, "binding to the block service failed");
    }

    b->st = conn;
    conn->service = b;

    conn->state = BS_CONN_SERVICE_BOUND;

    b->rx_vtbl = bs_rx_vtbl;

    connection_stage = CSTAGE_SERVICE_BOUND;
}

static void bs_service_setup_cb(void *a)
{
    /* DUMMY */
}

/**
 * \brief bind callback for the two bulk channels; the connection is fully
 *        established once both channels are bound
 */
static errval_t bs_bulk_bind_cb(struct bulk_channel *channel)
{
    struct bs_connection *conn = (struct bs_connection *) channel->user_state;
    if (conn->state == BS_CONN_SERVICE_BOUND) {
        BS_CONN_DEBUG("%s", "first bulk channel bound.");
        conn->state = BS_CONN_BULK_BINDING;
        return SYS_ERR_OK;
    } else if (conn->state == BS_CONN_BULK_BINDING) {
        BS_CONN_DEBUG("%s", "second channel bound.");
        conn->state = BS_CONN_CONNECTED;
        connection_stage = CSTAGE_SERVICE_SETUP;
        return SYS_ERR_OK;
    }
    assert(!"This should not happen...");
    return SYS_ERR_OK;
}

/**
 * \brief sends the setup message with the two bulk endpoint irefs to the
 *        block service; re-registers itself if the binding is busy
 */
static void bs_service_setup(void *st)
{
    errval_t err;
    BS_CONN_DEBUG("sending setup message to %s.", BLOCK_SERVICE_NAME);

    struct bs_connection *conn = (struct bs_connection *) st;

    struct event_closure txcont = MKCONT(bs_service_setup_cb, conn);

    BS_CONN_DEBUG("tx_iref=%i, rx_iref=%i", (uint32_t) conn->tx_ep.iref,
                  (uint32_t) conn->rx_ep.iref);

    err = block_service_setup__tx(conn->service, txcont, conn->rx_ep.iref,
                                  conn->tx_ep.iref);
    if (err_is_fail(err)) {
        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
            txcont = MKCONT(bs_service_setup, conn);
            struct waitset *ws = get_default_waitset();
            err = conn->service->register_send(conn->service, ws, txcont);
            if (err_is_fail(err)) {
                connection_stage = CSTAGE_ERR;
                conn->state = BS_CONN_ERR;
                USER_PANIC_ERR(err, "could not register send\n");
            }
        } else {
            BS_CONN_DEBUG("failed to send setup message to %s",
                          BLOCK_SERVICE_NAME);
            connection_stage = CSTAGE_ERR;
            conn->state = BS_CONN_ERR;
            USER_PANIC_ERR(err, "flounder failure\n");
        }
    }
}

/**
 * \brief connects to the block service: binds the flounder service interface,
 *        creates the two bulk channels and sends the setup message
 */
errval_t bs_service_connect(struct bs_connection *conn,
                            struct bulk_channel_callbacks *rx_cb,
                            struct bulk_channel_callbacks *tx_cb)
{
    errval_t err;
    iref_t service_iref;

    BS_CONN_DEBUG("looking up service: %s", BLOCK_SERVICE_NAME);
    /* connect to the block service */
    err = nameservice_blocking_lookup(BLOCK_SERVICE_NAME, &service_iref);
    if (err_is_fail(err)) {
        return err;
    }

    BS_CONN_DEBUG("binding to iref %i", (uint32_t) service_iref);
    err = block_service_bind(service_iref, bs_service_bind_cb, conn,
                             get_default_waitset(), IDC_BIND_FLAGS_DEFAULT);
    if (err_is_fail(err)) {
        return err;
    }

    bs_service_ws = get_default_waitset();

    wait_for_condition(CSTAGE_SERVICE_BOUND);

    BS_CONN_DEBUG("%s", "setting up bulk endpoints and channels");

    /* setting up bulk endpoints */
    err = bulk_sm_ep_create(&conn->rx_ep);
    assert(err_is_ok(err));
    err = bulk_sm_ep_create(&conn->tx_ep);
    assert(err_is_ok(err));

    // need exclusive waitset per channel
    struct waitset *rx_ws = malloc(sizeof(*rx_ws));
    struct waitset *tx_ws = malloc(sizeof(*tx_ws));
    assert(rx_ws);
    assert(tx_ws);
    waitset_init(rx_ws);
    waitset_init(tx_ws);

    bs_service_tx_ws = tx_ws;
    bs_service_rx_ws = rx_ws;

    struct bulk_channel_setup setup = {
        .role = BULK_ROLE_MASTER,
        .trust = BULK_TRUST_FULL,
        .direction = BULK_DIRECTION_RX,
        .meta_size = sizeof(struct bs_meta_data),
        .waitset = rx_ws,
        .user_state = conn };

    rx_cb->bind_received = bs_bulk_bind_cb;

    err = bulk_channel_create(&conn->rx_channel,
                              (struct bulk_endpoint_descriptor *) &conn->rx_ep,
                              rx_cb, &setup);
    if (err_is_fail(err)) {
        return err;
    }

    setup.direction = BULK_DIRECTION_TX;
    setup.waitset   = tx_ws;

    tx_cb->bind_received = bs_bulk_bind_cb;

    err = bulk_channel_create(&conn->tx_channel,
                              (struct bulk_endpoint_descriptor *) &conn->tx_ep,
                              tx_cb, &setup);
    if (err_is_fail(err)) {
        return err;
    }

    bs_service_setup(conn);

    wait_for_condition(CSTAGE_SERVICE_SETUP);

    BS_CONN_DEBUG("%s", "Service and bulk channels setup.");

    return SYS_ERR_OK;
}

struct bs_read_request {
    struct bs_connection *conn;
    uint32_t block_id;
    uint32_t block_count;
    struct bulk_continuation cont;
};

static void bs_service_read_sent_cb(void *a)
{
    free(a);
}

static errval_t bs_service_read_send(void *a)
{
    errval_t err;

    struct bs_read_request *req = (struct bs_read_request *) a;

    struct event_closure txcont = MKCONT(bs_service_read_sent_cb, req);
    err = block_service_read__tx(req->conn->service, txcont, req->block_id,
                                 req->block_count, seq_number);
    if (err_is_fail(err)) {
        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
            // cannot do this: would need to register multiple resend functions.
            // struct waitset *ws = get_default_waitset();
            // txcont = MKCONT(bs_service_read_send, req);
            // err = req->conn->service->register_send(req->conn->service, ws, txcont);
            // if (err_is_fail(err)) {
            //     // note that only one continuation may be registered at a time
            //     DEBUG_ERR(err, "register_send on binding failed!");
            // }
        }
    } else {
        seq_number++;
    }

    return err;
}

/**
 * \brief issues a read request for block_count blocks starting at block_id
 */
errval_t bs_service_read(struct bs_connection *conn,
                         uint32_t block_id,
                         uint32_t block_count,
                         struct bulk_continuation cont)
{
    struct bs_read_request *req = malloc(sizeof(struct bs_read_request));
    if (req == NULL) {
        return LIB_ERR_MALLOC_FAIL;
    }

    req->block_count = block_count;
    req->conn = conn;
    req->block_id = block_id;
    req->cont = cont;

    errval_t err;
    do { // busy wait because no send queue available.
        err = bs_service_read_send(req);
        if (err_is_fail(err) && err_no(err) != FLOUNDER_ERR_TX_BUSY) {
            USER_PANIC_ERR(err, "bs_service_read");
        }

        errval_t other = event_dispatch(get_default_waitset());
        // maybe need dispatch service->tx/rx_channel.waitset as well?
        assert(err_is_ok(other));
    } while (err_is_fail(err));

    return SYS_ERR_OK;
}
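
/*
 * Illustrative usage sketch (not part of this module): a client fills in its
 * bulk channel callbacks, connects to the block service, and then issues read
 * requests. The callback struct contents and the no-op continuation below are
 * assumptions made for the sake of the example, not part of this interface.
 *
 *   static struct bs_connection conn;
 *   static struct bulk_channel_callbacks rx_cb, tx_cb;  // move/buffer handlers
 *                                                       // filled in by caller
 *
 *   errval_t err = bs_service_connect(&conn, &rx_cb, &tx_cb);
 *   assert(err_is_ok(err));
 *
 *   struct bulk_continuation cont = { .handler = NULL, .arg = NULL };
 *   err = bs_service_read(&conn, 0, 1, cont);   // read one block at LBA 0
 *   assert(err_is_ok(err));
 */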