1/**
2 * \file
3 * \brief Network server thread of the bulk server
4 */
5
6/*
7 * Copyright (c) 2007, 2008, 2009, 2010, ETH Zurich.
8 * All rights reserved.
9 *
10 * This file is distributed under the terms in the attached LICENSE file.
11 * If you do not find this file, copies can be found by writing to:
12 * ETH Zurich D-INFK, Universitaetstrasse 6, CH-8092 Zurich. Attn: Systems Group.
13 */
14#include <string.h>
15
16#include <barrelfish/barrelfish.h>
17#include <barrelfish/nameservice_client.h>
18
19#include <bulk_transfer/bulk_transfer.h>
20#include <bulk_transfer/bulk_sm.h>
21
22#include <if/block_service_defs.h>
23
24#include "block_storage.h"
25#include "block_server.h"
26#include "network_client.h"
27#include "local_server.h"
28
29/* ---------------------------  Server State  ---------------------------- */
30static uint8_t server_state = SERVICE_STATE_UNINITIALIZED;
31
32static iref_t server_iref = 0;
33
34static uint8_t is_client = 0;
35
36static struct block_net_service *block_server;
37
38static uint8_t is_bound = false;
39
40static struct block_local_service local_server;
41
42static void send_status_reply(struct block_service_binding *binding,
43                              enum block_err_code error_code,
44                              uint32_t seqn,
45                              enum block_net_msg_type reqid);
46
47#if BLOCK_ENABLE_NETWORKING
// flag raised before the controlling code starts waiting for a condition
// NOTE(review): only written, never read, in the code visible here
static bool wait_cond = false;
50
/**
 * \brief keeps handling messages until the given flag has been cleared
 *
 * \param cond  flag to poll; the function returns as soon as it reads false
 */
static inline void wait_for_condition(volatile bool *cond)
{
    for (;;) {
        if (!*cond) {
            return;
        }
        messages_wait_and_handle_next();
    }
}
57
58struct pool_assign_arg
59{
60    struct bulk_channel *chan_server;
61    struct bulk_channel *chan_client;
62    struct bulk_pool *pool;
63    volatile bool wait_cond;
64    errval_t err;
65};
66
67/* ------------------------  Bulk Transfer Callbacks ----------------------*/
68
69static void pool_assigned_handler(void *arg,
70                                  errval_t err,
71                                  struct bulk_channel *chan)
72{
73    struct pool_assign_arg *pa = arg;
74    pa->err = err;
75    pa->wait_cond = false;
76}
77
78#endif
79
80static errval_t bulk_pool_assigned_cb(struct bulk_channel *channel,
81                                      struct bulk_pool *pool)
82{
83
84#if !BLOCK_ENABLE_NETWORKING
85    return SYS_ERR_OK;
86#else
87
88    errval_t err = SYS_ERR_OK;
89
90    debug_printf("Pool assign request received. \n");
91
92    struct pool_assign_arg *arg = malloc(sizeof(struct pool_assign_arg));
93    assert(arg);
94    arg->chan_client = channel;
95    arg->pool = pool;
96    arg->wait_cond = 1;
97
98    struct bulk_continuation cont = {
99        .handler = pool_assigned_handler,
100        .arg = arg, };
101
102    /* forward the pool assignment */
103    if (channel->direction == BULK_DIRECTION_RX) {
104        arg->chan_server = &block_server->tx_chan;
105        err = bulk_channel_assign_pool(&block_server->tx_chan, pool, cont);
106    } else {
107        arg->chan_server = &block_server->rx_chan;
108        err = bulk_channel_assign_pool(&block_server->rx_chan, pool, cont);
109    }
110
111    if (err_is_fail(err)) {
112        return err;
113    }
114
115    wait_cond = true;
116    wait_for_condition(&arg->wait_cond);
117
118    assert(pool == arg->pool);
119    assert(arg->chan_client == channel);
120    err = arg->err;
121    free(arg);
122
123    return err;
124#endif
125}
126
127static errval_t bulk_pool_removed_cb(struct bulk_channel *channel,
128                                     struct bulk_pool *pool)
129{
130    return SYS_ERR_OK;
131}
132
133
134static void bulk_move_received(struct bulk_channel *channel,
135                               struct bulk_buffer *buffer,
136                               void *meta)
137{
138    BS_LOCAL_DEBUG_TRACE
139
140    errval_t err;
141
142    struct bs_meta_data *bs = (struct bs_meta_data *) meta;
143
144    if (is_client) {
145        err = block_net_write(block_server, 1, &buffer, meta, BULK_CONT_NOP);
146        if (err_is_fail(err)) {
147            debug_printf("ERROR: block net write. %s", err_getstring(err));
148        }
149    } else {
150        err = block_storage_write(bs->block_id, buffer->address);
151        if (err_is_fail(err)) {
152            BS_LOCAL_DEBUG("%s", "ERROR: block could not be written");
153        }
154        err = bulk_channel_pass(channel, buffer, meta,
155        BULK_CONT_NOP);
156        if (err_is_fail(err)) {
157            DEBUG_ERR(err, "failed to pass the buffer back");
158        };
159        send_status_reply(local_server.binding, err, bs->req_id,
160                          BLOCK_NET_MSG_WRITE);
161    }
162}
163
164static void bulk_copy_received(struct bulk_channel *channel,
165                               struct bulk_buffer *buffer,
166                               void *meta)
167{
168    BS_LOCAL_DEBUG_TRACE
169
170    assert(!"Block server does not deal with copies for now..");
171
172}
173
174static void bulk_buffer_received(struct bulk_channel *channel,
175                                 struct bulk_buffer *buffer,
176                                 void *meta)
177{
178    BS_LOCAL_DEBUG_TRACE
179
180    if (is_client) {
181        /* forward buffer to the server */
182        if (channel->direction == BULK_DIRECTION_RX) {
183            bulk_channel_pass(&block_server->tx_chan, buffer, meta,
184            BULK_CONT_NOP);
185        } else {
186            bulk_channel_pass(&block_server->rx_chan, buffer, meta,
187            BULK_CONT_NOP);
188        }
189
190    } else {
191        /* put it into the buffer store */
192        block_server_insert_buffer(&bs_bulk_buffers, buffer, channel);
193    }
194}
195
196static void bulk_copy_released(struct bulk_channel *channel,
197                               struct bulk_buffer *buffer)
198{
199    if (is_client) {
200
201    } else {
202        block_server_insert_buffer(&bs_bulk_buffers, buffer, NULL);
203    }
204}
205
206static void bulk_bind_done_cb(void *arg,
207                              errval_t err,
208                              struct bulk_channel *channel)
209{
210    BS_LOCAL_DEBUG_TRACE
211    struct block_local_service *ls = (struct block_local_service *) arg;
212
213    if (err_is_fail(err)) {
214        DEBUG_ERR(err, "callback: Binding failed.\n");
215        return;
216    }
217
218    ls->bound++;
219}
220
221static struct bulk_channel_callbacks bulk_rx_cb = {
222    .pool_assigned = bulk_pool_assigned_cb,
223    .pool_removed = bulk_pool_removed_cb,
224    .move_received = bulk_move_received,
225    .copy_received = bulk_copy_received };
226
227static struct bulk_channel_callbacks bulk_tx_cb = {
228    .pool_assigned = bulk_pool_assigned_cb,
229    .pool_removed = bulk_pool_removed_cb,
230    .buffer_received = bulk_buffer_received,
231    .copy_released = bulk_copy_released };
232
233/* ------------------------ Sending Status Reply ---------------------------*/
234
235struct bs_status_reply
236{
237    enum block_net_msg_type req;
238    uint32_t seqn;
239    enum block_net_err err;
240    struct block_service_binding *binding;
241};
242
/**
 * \brief continuation run once a status reply was sent; frees the request
 *
 * \param a     the struct bs_status_reply allocated by send_status_reply()
 */
static void status_reply_sent_cb(void *a)
{
    /* the reply is on its way, the bookkeeping is no longer needed */
    free(a);
}
247
248static void status_reply_do_send(void *a)
249{
250    errval_t err;
251
252    struct bs_status_reply *req = (struct bs_status_reply*) a;
253
254    struct event_closure txcont = MKCONT(status_reply_sent_cb, req);
255    err = block_service_status__tx(req->binding, txcont, req->err, req->seqn,
256                                   req->req);
257    if (err_is_fail(err)) {
258        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
259            struct waitset *ws = get_default_waitset();
260            txcont = MKCONT(status_reply_do_send, req);
261            err = req->binding->register_send(req->binding, ws, txcont);
262            if (err_is_fail(err)) {
263                // note that only one continuation may be registered at a time
264                DEBUG_ERR(err, "register_send on binding failed!");
265            }
266
267        }
268    }
269}
270
271static void send_status_reply(struct block_service_binding *binding,
272                              enum block_err_code error_code,
273                              uint32_t seqn,
274                              enum block_net_msg_type reqid)
275{
276    struct bs_status_reply *req = malloc(sizeof(struct bs_status_reply));
277
278    req->seqn = seqn;
279    req->binding = binding;
280    req->err = error_code;
281    req->req = reqid;
282
283    status_reply_do_send(req);
284}
285
286/* -------------------------  Request Callbacks  ------------------------- */
287
288/**
289 * \brief callback for read requests on the flounder channel
290 */
291static void rx_read_request(struct block_service_binding *binding,
292                            uint32_t start_block,
293                            uint32_t count,
294                            uint32_t seqn)
295{
296    BS_LOCAL_DEBUG_TRACE
297
298    errval_t err;
299    /*
300     * if is server, then serve the request locally
301     * else forward it to the network server
302     */
303
304    if (is_client) {
305        err = block_net_read(block_server, start_block, count, seqn,
306        BULK_CONT_NOP);
307        if (err_is_fail(err)) {
308            debug_printf("failed to issue read request to server.\n");
309        }
310        return;
311    }
312
313    // send_status_reply(binding, SYS_ERR_OK, seqn, BLOCK_NET_MSG_READ);
314
315    /* we're the server, so we can serve it locally */
316
317    struct block_local_service *ls = (struct block_local_service *) binding->st;
318    struct bulk_channel        *chan = &ls->tx_chan;
319
320    struct bs_meta_data *meta_data = malloc(
321                    count * sizeof(struct bs_meta_data));
322    assert(meta_data);
323
324    for (uint32_t i = 0; i < count; ++i) {
325        /* TODO: specify a pool */
326        struct bulk_buffer *buf = block_server_get_buffer(&bs_bulk_buffers,
327                                                          chan);
328        if (!buf) {
329            debug_printf("ERROR: Has no buffers left...\n");
330            send_status_reply(binding, BLOCK_NET_MSG_READ, seqn,
331                    BLOCK_NET_ERR_NO_BUFS);
332            free(meta_data);
333            return ;
334        }
335
336        err = block_storage_read(start_block + i, buf->address);
337        if (err_is_fail(err)) {
338            debug_printf("ERROR: block id is out of range: %i",
339                         (uint32_t) (start_block + count));
340            send_status_reply(binding, BLOCK_NET_MSG_READ, seqn,
341                    BLOCK_NET_ERR_BLOCK_ID);
342        }
343        meta_data[i].block_id = start_block + i;
344        meta_data[i].req_id = 0; // XXX not available. not used by bs_user.
345        meta_data[i].cont = BULK_CONT_NOP;
346        BS_LOCAL_DEBUG("bulk_channel_move: chan=%p, buf=%p\n", chan, buf);
347        err = bulk_channel_move(chan, buf, meta_data + i, BULK_CONT_NOP);
348        if (err_is_fail(err)) {
349            send_status_reply(binding, BLOCK_NET_MSG_READ, seqn, err);
350            debug_printf("channel move failed");
351        }
352    }
353
354    /* XXX: assume that the meta data has been copied... */
355    free(meta_data);
356}
357
358static void rx_setup_request(struct block_service_binding *binding,
359                             iref_t tx_iref,
360                             iref_t rx_iref)
361{
362    BS_LOCAL_DEBUG("tx_iref=%i, rx_iref=%i", (uint32_t )tx_iref,
363                   (uint32_t )rx_iref);
364
365    errval_t err;
366
367    struct block_local_service *ls = (struct block_local_service *) binding->st;
368
369    assert(ls);
370
371    if (ls->bound != 0) {
372        /* binding already in progress */
373        debug_printf("already bound\n");
374        return;
375    }
376
377    err = bulk_sm_ep_create_remote(&ls->rx_ep, rx_iref);
378    assert(!err_is_fail(err));
379
380    err = bulk_sm_ep_create_remote(&ls->tx_ep, tx_iref);
381    assert(!err_is_fail(err));
382
383    struct bulk_channel_bind_params params = {
384        .role = BULK_ROLE_GENERIC,
385        .waitset = get_default_waitset(),
386        .user_state = ls,
387        .trust = BULK_TRUST_FULL, };
388
389    struct bulk_continuation cont = {
390        .arg = ls,
391        .handler = bulk_bind_done_cb, };
392
393    err = bulk_channel_bind(&ls->rx_chan,
394                            (struct bulk_endpoint_descriptor *) &ls->rx_ep,
395                            &bulk_rx_cb, &params, cont);
396    if (err_is_fail(err)) {
397        DEBUG_ERR(err, "binding failed.");
398    }
399
400    err = bulk_channel_bind(&ls->tx_chan,
401                            (struct bulk_endpoint_descriptor *) &ls->tx_ep,
402                            &bulk_tx_cb, &params, cont);
403    if (err_is_fail(err)) {
404        DEBUG_ERR(err, "binding failed.");
405    }
406}
407
408/* ---------------------  Connection Initialization  --------------------- */
409
410/**
411 * \brief accepts new connections to the local  via the flounder interface
412 */
413static errval_t block_local_accept_cb(void *st, struct block_service_binding *b)
414{
415    BS_LOCAL_DEBUG_TRACE
416
417    debug_printf("> New connection on the flounder interface\n");
418
419    // do the channel initialization
420    b->rx_vtbl.read = rx_read_request;
421    b->rx_vtbl.setup = rx_setup_request;
422
423    assert(!is_bound);
424
425    is_bound = true;
426
427    b->st = &local_server;
428    local_server.binding = b;
429
430    return SYS_ERR_OK;
431}
432
433/* ----------------------  Channel Initialization  ----------------------- */
434
435/**
436 * \brief callback for interface export
437 */
438static void block_local_export_cb(void *st, errval_t err, iref_t iref)
439{
440    BS_LOCAL_DEBUG_TRACE
441    if (err_is_fail(err)) {
442        /* TODO: Error handling */
443        server_state = SERVICE_STATE_FAILURE;
444    }
445
446    server_iref = iref;
447
448    server_state = SERVICE_STATE_EXPORTED;
449}
450
451/**
452 * \brief initializes the machine local block server
453 */
454errval_t block_local_init(struct block_net_service *server, uint32_t flags)
455{
456    BS_LOCAL_DEBUG_TRACE
457
458    errval_t err;
459
460    is_client = (flags & SERVICE_FLAG_CLIENT) && 1;
461
462    block_server = server;
463
464    // export the interface
465    err = block_service_export(NULL, block_local_export_cb,
466                               block_local_accept_cb, get_default_waitset(),
467                               IDC_EXPORT_FLAGS_DEFAULT);
468    if (err_is_fail(err)) {
469        return err;
470    }
471
472    while (server_state != SERVICE_STATE_EXPORTED) {
473        if (server_state == SERVICE_STATE_FAILURE) {
474            USER_PANIC("export resulted in a failure condition");
475        }
476        messages_wait_and_handle_next();
477    }
478
479    return SYS_ERR_OK;
480}
481
482/* -------------------------  Server Management  ------------------------- */
483
484/**
485 * \brief starts the machine local server of block service to accept requests
486 *
487 * This function should not return until stopped.
488 */
489errval_t block_local_start(void)
490{
491    BS_LOCAL_DEBUG_TRACE
492
493    if (server_state != SERVICE_STATE_EXPORTED) {
494        assert(server_state == SERVICE_STATE_EXPORTED);
495        /* TODO: return with error */
496    }
497
498    errval_t err = nameservice_register(BLOCK_SERVER_NAME, server_iref);
499    if (err_is_fail(err)) {
500        return err;
501    }
502
503    return SYS_ERR_OK;
504}
505/**
506 * \brief stops the request handling of the machine local block service requests
507 */
508errval_t block_local_stop(void)
509{
510    // set the stop flag.
511
512    // tear down all bulk channels
513
514    // stop the flounder service
515
516    // free up resources
517
518    assert(!"NYI: block_local_stop");
519    return SYS_ERR_OK;
520}
521
522/* ------------------ callbacks for the network server --------------------- */
523
524/**
525 * forwards the received buffer to the requesting client
526 */
527void block_local_data_ready(struct bulk_buffer *buffer, void *meta)
528{
529    BS_LOCAL_DEBUG_TRACE
530
531    errval_t err;
532
533    err = bulk_channel_move(&local_server.tx_chan, buffer, meta, BULK_CONT_NOP);
534    if (err_is_fail(err)) {
535        DEBUG_ERR(err, "could not do the move");
536    }
537}
538/**
539 * forwards the buffer ot the originating client
540 */
541errval_t block_local_return_buffer(struct bulk_channel *chan,
542                                   struct bulk_buffer *buffer,
543                                   void *meta)
544{
545    BS_LOCAL_DEBUG_TRACE
546
547    errval_t err;
548    struct bulk_channel *local_chan;
549    if (chan->direction == BULK_DIRECTION_RX) {
550        local_chan = &local_server.tx_chan;
551    } else {
552        local_chan = &local_server.rx_chan;
553    }
554    err = bulk_channel_pass(local_chan, buffer, meta, BULK_CONT_NOP);
555    if (err_is_fail(err)) {
556        DEBUG_ERR(err, "could not pass the buffer");
557    }
558
559    return SYS_ERR_OK;
560}
561
562/**
563 *
564 */
565errval_t block_local_release_copy(struct bulk_buffer *buffer)
566{
567    //struct bs_meta_data *data = (struct bs_meta_data *)meta;
568    assert(!"NYI");
569
570    return SYS_ERR_OK;
571}
572
573errval_t block_local_send_status(enum block_net_msg_type req,
574                                 uint32_t reqid,
575                                 enum block_net_err stats)
576{
577    BS_LOCAL_DEBUG_TRACE
578    send_status_reply(local_server.binding, stats, reqid, req);
579    return SYS_ERR_OK;
580}
581