1/**
2 * \file
3 * \brief Network client of the block service
4 */
5
6/*
7 * Copyright (c) 2007, 2008, 2009, 2010, ETH Zurich.
8 * All rights reserved.
9 *
10 * This file is distributed under the terms in the attached LICENSE file.
11 * If you do not find this file, copies can be found by writing to:
12 * ETH Zurich D-INFK, Universitaetstrasse 6, CH-8092 Zurich. Attn: Systems Group.
13 */
14#include <string.h>
15
16#include <barrelfish/barrelfish.h>
17
18#include "block_server.h"
19#include <lwip/init.h>
20#include <lwip/tcp.h>
21#include <lwip/ip_addr.h>
22
23#include "network_common.h"
24#include "network_client.h"
25#include "local_server.h"
26
27#if BULK_NET_BACKEND_PROXY
28#include <bulk_transfer/bulk_net_proxy.h>
29#include <bulk_transfer/bulk_local.h>
30#endif
31
/*
 * Number of bulk connections reported so far. It is incremented by the
 * bind / proxy-connected callbacks in this file.
 */
static volatile uint32_t bulk_connections = 0;

/*
 * Flag polled by wait_for_condition(): the controlling code sets it before
 * starting an asynchronous operation and a callback clears it again.
 */
static bool wait_cond = false;
37
38static inline void wait_for_condition(void)
39{
40    while (wait_cond) {
41        messages_wait_and_handle_next();
42    }
43}
44
45/* ----------------------bulk transfer callbacks ----------------------------*/
46
47static errval_t bulk_pool_assigned_cb(struct bulk_channel *channel,
48                                      struct bulk_pool *pool);
49static errval_t bulk_pool_removed_cb(struct bulk_channel *channel,
50                                     struct bulk_pool *pool);
51static void bulk_move_received(struct bulk_channel *channel,
52                               struct bulk_buffer *buffer,
53                               void *meta);
54static void bulk_buffer_received(struct bulk_channel *channel,
55                                 struct bulk_buffer *buffer,
56                                 void *meta);
57static void bulk_copy_received(struct bulk_channel *channel,
58                               struct bulk_buffer *buffer,
59                               void *meta);
60static void bulk_copy_released(struct bulk_channel *channel,
61                               struct bulk_buffer *buffer);
62static errval_t bulk_bind_received(struct bulk_channel *channel);
63
64static struct bulk_channel_callbacks bulk_rx_cb = {
65    .bind_received = bulk_bind_received,
66    .pool_assigned = bulk_pool_assigned_cb,
67    .pool_removed = bulk_pool_removed_cb,
68    .move_received = bulk_move_received,
69    .copy_received = bulk_copy_received,
70    .buffer_received = bulk_buffer_received,
71    .copy_released = bulk_copy_released, };
72
73static struct bulk_channel_callbacks bulk_tx_cb = {
74    .bind_received = bulk_bind_received,
75    .pool_assigned = bulk_pool_assigned_cb,
76    .pool_removed = bulk_pool_removed_cb,
77    .buffer_received = bulk_buffer_received,
78    .copy_released = bulk_copy_released,
79    .move_received = bulk_move_received,
80    .copy_received = bulk_copy_received,
81
82};
83
84/* --------------------- lwip/tcp callbacks ---------------------------- */
85
86/**
87 * callback when a send event has been completed
88 */
89static err_t block_net_sent_cb(void *arg, struct tcp_pcb *pcb, u16_t len)
90{
91    BS_NET_DEBUG_TRACE
92
93    assert(pcb != NULL);
94
95    return ERR_OK;
96}
97
98/**
99 * callback when a new packet has been arrived
100 */
101static err_t block_net_recv_cb(void *arg,
102                               struct tcp_pcb *pcb,
103                               struct pbuf *pb,
104                               err_t err)
105{
106    BS_NET_DEBUG_TRACE
107
108    assert(pcb != NULL);
109    assert(pb);
110    if (pb == NULL) {
111        /* connection closed. clean up and EXIT the program. */
112        tcp_close(pcb);
113        assert(!"NYI: cleaning up the resources!");
114        exit(EXIT_SUCCESS);
115    }
116
117    if ((err != ERR_OK) || !pb) {
118        /* there was an error... TODO */
119        debug_printf("there was an error...\n");
120        return err;
121    }
122
123    struct block_net_msg *msg = (struct block_net_msg *) pb->payload;
124    assert(msg);
125    if (pb->tot_len != msg->size) {
126        /* some thing wrong... */
127    }
128
129    struct block_net_service *c = (struct block_net_service *) arg;
130
131    enum block_net_msg_type req;
132
133    switch (msg->type) {
134        case BLOCK_NET_MSG_STATUS:
135
136            req = msg->msg.status.req;
137            uint32_t reqid = msg->msg.status.reqid;
138            enum block_net_err status = msg->msg.status.code;
139            tcp_recved(pcb, pb->tot_len);
140
141            pbuf_free(pb);
142
143            switch (msg->msg.status.req) {
144                case BLOCK_NET_MSG_INIT:
145                    if (status == BLOCK_NET_ERR_OK) {
146                        debug_printf("/* all fine, connection fully established */\n");
147                        wait_cond = 0;
148                        c->bound = 1;
149                    }
150                    break;
151                case BLOCK_NET_MSG_READ:
152                    block_local_send_status(req, reqid, status);
153                    break;
154                case BLOCK_NET_MSG_WRITE:
155#if BLOCK_BENCH_ENABLE
156                    testrun_handle_status(req, reqid, status);
157#else
158                    block_local_send_status(req, reqid, status);
159#endif
160                    break;
161                default:
162                    break;
163            }
164            return ERR_OK;
165            break;
166        default:
167            debug_printf("got an unknown reply...");
168            break;
169    }
170
171    tcp_recved(pcb, pb->tot_len);
172
173    pbuf_free(pb);
174
175    return ERR_OK;
176}
177
178/**
179 * callback when error occures
180 */
181static void block_net_err_cb(void *arg, err_t err)
182{
183    BS_NET_DEBUG_TRACE
184
185    debug_printf("tcp is err: %d\n", (int) err);
186}
187
188/**
189 * callback for polling the connection
190 */
191static err_t block_net_poll_cb(void *arg, struct tcp_pcb *pcb)
192{
193
194    return ERR_OK;
195}
196
197/**
198 * callback when the connection to the server is established.
199 */
200static err_t block_net_connected_cb(void *arg, struct tcp_pcb *pcb, err_t err)
201{
202    BS_NET_DEBUG_TRACE
203
204    if (err != ERR_OK) {
205        fprintf(stderr, "tcp connection failed\n");
206        if (err == ERR_TIMEOUT) {
207            debug_printf("connection attempt timed out..\n");
208            /* TODO: try again */
209        }
210        wait_cond = false;
211        return err;
212    }
213
214    tcp_sent(pcb, block_net_sent_cb);
215    tcp_recv(pcb, block_net_recv_cb);
216    tcp_err(pcb, block_net_err_cb);
217    tcp_poll(pcb, block_net_poll_cb, 10);
218
219    wait_cond = false;
220
221    return ERR_OK;
222}
223
224/* ------------------------- connect / disconnect ---------------------------*/
225
226static errval_t block_net_send_ep(struct block_net_service *server)
227{
228    BS_NET_DEBUG_TRACE
229
230    assert(server->rx_chan.state != BULK_STATE_UNINITIALIZED);
231    assert(server->tx_chan.state != BULK_STATE_UNINITIALIZED);
232    err_t err;
233
234    uint16_t size = sizeof(struct block_net_msg);
235    struct block_net_msg *msg = malloc(size);
236    if (!msg) {
237        return LWIP_ERR_MEM; /* ERROR CODE ?? */
238    }
239
240    msg->type = BLOCK_NET_MSG_INIT;
241    msg->size = size;
242#if BULK_NET_BACKEND_PROXY
243    msg->msg.setup.tx_ep.ip = server->tpcb->local_ip;
244    msg->msg.setup.tx_ep.port = BLOCK_NET_PORT_RX;
245    msg->msg.setup.rx_ep.ip = server->tpcb->local_ip;
246    msg->msg.setup.rx_ep.port = BLOCK_NET_PORT_TX;
247#else
248    msg->msg.setup.rx_ep = *((struct bulk_net_endpoint_descriptor*) server
249                    ->tx_chan.ep);
250    msg->msg.setup.do_bind = 1;
251    msg->msg.setup.tx_ep = *((struct bulk_net_endpoint_descriptor*) server
252                    ->rx_chan.ep);
253#endif
254    /* TODO: check if there is enough space left... */
255
256    err = tcp_write(server->tpcb, msg, size, TCP_WRITE_FLAG_COPY);
257    if (err != ERR_OK) {
258        /* abort the sending */
259        fprintf(stderr, "error writing %d\n", err);
260        return LWIP_ERR_MEM;
261    }
262
263    err = tcp_output(server->tpcb);
264
265    if (err != ERR_OK) {
266        fprintf(stderr, "error in tcp_output %d\n", err);
267        return LWIP_ERR_MEM;
268    }
269
270    wait_cond = 1;
271
272    BS_NET_DEBUG_BULK("%s", "waiting for bulk binding\n");
273
274    wait_for_condition();
275
276    BS_NET_DEBUG_BULK("%s", "binding complete. \n");
277
278    return SYS_ERR_OK;
279}
280
#if BULK_NET_BACKEND_PROXY

/**
 * \brief callback invoked by a bulk net proxy once it is connected
 *
 * Counts the connection and clears wait_cond when the count reaches two.
 *
 * \param proxy     unused
 */
static void proxy_connected_cb(struct bulk_net_proxy *proxy)
{
    (void) proxy;

    debug_printf("APPL: cb_rx_connected\n");

    bulk_connections += 1;

    if (bulk_connections != 2) {
        return;
    }

    wait_cond = 0;
}
#endif
294
295/**
296 * \brief connects to a network block service
297 */
298errval_t block_net_connect(struct block_net_service *server,
299                           struct ip_addr *ip,
300                           uint16_t port)
301{
302    BS_NET_DEBUG_TRACE
303
304    err_t err;
305    struct tcp_pcb *pcb;
306
307    memset(server, 0, sizeof(*server));
308    memcpy(&server->ip, ip, sizeof(*ip));
309    server->port = port;
310
311    BS_NET_DEBUG_NET("%s", "lwip initializing... ");
312    if (lwip_init("e10k", 1) == false) {
313        debug_printf("ERROR: lwip_init_auto failed!\n");
314        return 1;
315    }
316
317    BS_NET_DEBUG_NET("%s", "lwip init done.");
318
319    // initialize network connection to the block server
320    pcb = tcp_new();
321    if (pcb == NULL) {
322        return LWIP_ERR_CONN;
323    }
324
325    wait_cond = true;
326    err = tcp_connect(pcb, &server->ip, server->port, block_net_connected_cb);
327    // obtain basic information about the block server
328
329    BS_NET_DEBUG_NET("%s", "waiting for condition");
330    wait_for_condition();
331
332    if (err != ERR_OK) {
333        /* there was something wrong */
334        return err;
335    }
336
337    tcp_arg(pcb, server);
338    server->tpcb = pcb;
339
340    // initialize the bulk channel to the block server
341    errval_t b_err;
342
343#if BULK_NET_BACKEND_PROXY
344    BS_NET_DEBUG_NET("%s", "creating bulk net proxy channel");
345
346    bulk_local_init_endpoint(&server->rx_ep, NULL);
347    bulk_local_init_endpoint(&server->tx_ep, NULL);
348
349#else
350    BS_NET_DEBUG_NET("%s", "creating bulk channels");
351
352    struct bulk_net_ep_setup ep_setup = {
353        .port = BLOCK_NET_PORT_RX,
354        .ip = pcb->local_ip,    /// XXX: is this correct?
355        .queue = BLOCK_NET_RX_QUEUE,
356        .max_queues = BLOCK_NET_MAX_QUEUES,
357        .buffer_size = BLOCK_SIZE,
358        .buffer_count = BLOCK_NET_BUFFER_COUNT,
359        .no_copy = BULK_NET_BACKEND_NOCOPY };
360    b_err = bulk_net_ep_create(&server->rx_ep, &ep_setup);
361    assert(!err_is_fail(b_err));
362
363    ep_setup.port = BLOCK_NET_PORT_TX;
364    ep_setup.queue = BLOCK_NET_TX_QUEUE;
365    b_err = bulk_net_ep_create(&server->tx_ep, &ep_setup);
366    assert(!err_is_fail(b_err));
367#endif
368
369    struct bulk_channel_setup chan_setup = {
370        .direction = BULK_DIRECTION_TX,
371        .role = BULK_ROLE_MASTER,
372        .trust = BULK_TRUST_FULL,
373        .meta_size = sizeof(struct bs_meta_data),
374        .waitset = get_default_waitset(),
375        .user_state = server, };
376
377    b_err = bulk_channel_create(
378                    &server->tx_chan,
379                    (struct bulk_endpoint_descriptor *) &server->tx_ep,
380                    &bulk_tx_cb, &chan_setup);
381    if (err_is_fail(b_err)) {
382        bulk_channel_destroy(&server->tx_chan, BULK_CONT_NOP);
383        debug_printf("Failed to create the TX channel\n");
384        return b_err;
385    }
386
387    chan_setup.direction = BULK_DIRECTION_RX;
388    b_err = bulk_channel_create(
389                    &server->rx_chan,
390                    (struct bulk_endpoint_descriptor *) &server->rx_ep,
391                    &bulk_rx_cb, &chan_setup);
392    if (err_is_fail(b_err)) {
393        bulk_channel_destroy(&server->tx_chan, BULK_CONT_NOP);
394        debug_printf("Failed to create the RX channel\n");
395        return b_err;
396    }
397#if BULK_NET_BACKEND_PROXY
398
399    bulk_local_init_endpoint(&server->rx_p_ep, &server->rx_chan);
400    bulk_local_init_endpoint(&server->tx_p_ep, &server->tx_chan);
401
402    err = bulk_net_proxy_listen(&server->rx_proxy, &server->rx_p_ep.generic,
403                    server->rx_chan.waitset, BLOCK_SIZE, "e10k",
404                    BLOCK_NET_RX_QUEUE,
405                    BLOCK_NET_PORT_RX, proxy_connected_cb);
406    if (err_is_fail(err)) {
407        return err;
408    }
409    server->rx_proxy.user_state = server;
410
411    err = bulk_net_proxy_listen(&server->tx_proxy, &server->tx_p_ep.generic,
412                    server->tx_chan.waitset, BLOCK_SIZE, "e10k",
413                    BLOCK_NET_TX_QUEUE,
414                    BLOCK_NET_PORT_TX, proxy_connected_cb);
415    server->tx_proxy.user_state = server;
416    if (err_is_fail(err)) {
417        return err;
418    }
419
420#endif
421    return block_net_send_ep(server);
422}
423
424/**
425 * \brief disconnects form the network block service
426 */
427errval_t block_net_disconnect(struct block_net_service *server)
428{
429    BS_NET_DEBUG_TRACE
430
431    if (server->tx_chan.state == BULK_STATE_CONNECTED) {
432        /* free up the bulk channel */
433    }
434
435    if (server->rx_chan.state == BULK_STATE_CONNECTED) {
436        /* free up the bulk channel */
437    }
438
439    // tear down the bulk channel
440    if (server->tpcb) {
441        // tear down the service level network connection
442        tcp_close(server->tpcb);
443        tcp_arg(server->tpcb, NULL);
444        tcp_sent(server->tpcb, NULL);
445        tcp_recv(server->tpcb, NULL);
446    }
447    /*TODO:  do we have to free up something from lwip ? */
448
449    return SYS_ERR_OK;
450}
451
452/* ------------------------- read / write  ---------------------------*/
453
454/**
455 * \brief issues a new retrieval request to the network block server
456 *
457 * \param block_start   the id of the first block
458 * \param count         the number of blocks to read
459 */
460errval_t block_net_read(struct block_net_service *server,
461                        size_t block_start,
462                        size_t count,
463                        uint32_t seqn,
464                        struct bulk_continuation cont)
465{
466    BS_NET_DEBUG_TRACE
467
468    err_t err;
469
470    if (server->rx_chan.state != BULK_STATE_CONNECTED || !server->tpcb) {
471        return BLOCK_ERR_NOT_CONNECTED;
472    }
473
474    if (server->bound != 1) {
475        /* the connection has not yet been fully established yet */
476        return BLOCK_ERR_NOT_CONNECTED;
477    }
478
479    uint16_t size = sizeof(struct block_net_msg);
480    struct block_net_msg *msg = malloc(size);
481    if (!msg) {
482        return LWIP_ERR_MEM; /* ERROR CODE ?? */
483    }
484
485    msg->type = BLOCK_NET_MSG_READ;
486    msg->size = size;
487    msg->msg.read.block_id = block_start;
488    msg->msg.read.count = count;
489    msg->msg.read.req_id = seqn;
490    msg->msg.read.cont = cont;
491
492    server->tpcb->flags |= TF_NODELAY;
493
494    /* TODO: check if there is enough space left... */
495
496    err = tcp_write(server->tpcb, msg, size, TCP_WRITE_FLAG_COPY);
497    if (err != ERR_OK) {
498        /* abort the sending */
499        fprintf(stderr, "error writing %d\n", err);
500        return LWIP_ERR_MEM;
501    }
502
503    err = tcp_output(server->tpcb);
504
505    if (err != ERR_OK) {
506        fprintf(stderr, "error in tcp_output %d\n", err);
507        return LWIP_ERR_MEM;
508    }
509
510    /* XXX: assume that the data hase been copied into the pbuf */
511    free(msg);
512
513    return SYS_ERR_OK;
514}
515
516/**
517 * \brief forwards the write request to the network block server
518 *
519 * \param block_start   the id of the first block
520 * \param count         the numbers of blocks to write
521 * \param buf           pointer to an array of buffers
522 * \param cont          continuation for the callback when the buffer is sent
523 *
524 */
525errval_t block_net_write(struct block_net_service *server,
526                         size_t count,
527                         struct bulk_buffer **buf,
528                         struct bs_meta_data *meta,
529                         struct bulk_continuation cont)
530{
531    BS_NET_DEBUG_TRACE
532
533    assert(server->tx_chan.state == BULK_STATE_CONNECTED);
534
535    if (server->bound != 1) {
536        /* the connection has not yet been fully established */
537        return 1;
538    }
539
540    /*
541     * Note: we just need to send the buffers over the bulk channel with the
542     *       corresponding meta data.
543     */
544    errval_t err;
545    for (uint32_t i = 0; i < count; ++i) {
546        err = bulk_channel_move(&server->tx_chan, buf[i], meta + i, cont);
547        if (err_is_fail(err)) {
548            if (err != BULK_TRANSFER_BUFFER_NOT_OWNED) {
549                /* this indicates a serious problem and all will fail */
550                return err;
551            }
552        }
553    }
554
555    // return status code
556    return SYS_ERR_OK;
557}
558
559errval_t block_net_pass(struct block_net_service *server,
560                        size_t count,
561                        struct bulk_buffer **buf,
562                        struct bs_meta_data *meta,
563                        struct bulk_continuation cont)
564{
565    BS_NET_DEBUG_TRACE
566
567    assert(server->rx_chan.state == BULK_STATE_CONNECTED);
568
569    if (server->bound != 1) {
570        /* the connection has not yet been fully established */
571        return 1;
572    }
573
574    /*
575     * Note: we just need to send the buffers over the bulk channel with the
576     *       corresponding meta data.
577     */
578    errval_t err;
579    for (uint32_t i = 0; i < count; ++i) {
580        err = bulk_channel_pass(&server->rx_chan, buf[i], meta + i, cont);
581        if (err_is_fail(err)) {
582            if (err != BULK_TRANSFER_BUFFER_NOT_OWNED) {
583                /* this indicates a serious problem and all will fail */
584                return err;
585            }
586        }
587    }
588
589    // return status code
590    return SYS_ERR_OK;
591}
592
593errval_t block_net_release(struct block_net_service *server,
594                           size_t count,
595                           struct bulk_buffer **buf,
596                           struct bs_meta_data *meta,
597                           struct bulk_continuation cont)
598{
599    BS_NET_DEBUG_TRACE
600
601    assert(server->rx_chan.state == BULK_STATE_CONNECTED);
602
603    if (server->bound != 1) {
604        /* the connection has not yet been fully established */
605        return 1;
606    }
607
608    /*
609     * Note: we just need to send the buffers over the bulk channel with the
610     *       corresponding meta data.
611     */
612    errval_t err;
613    for (uint32_t i = 0; i < count; ++i) {
614        err = bulk_channel_release(&server->rx_chan, buf[i], cont);
615        if (err_is_fail(err)) {
616            if (err != BULK_TRANSFER_BUFFER_NOT_OWNED) {
617                /* this indicates a serious problem and all will fail */
618                return err;
619            }
620        }
621    }
622
623    // return status code
624    return SYS_ERR_OK;
625}
626
627/* ------------------------ bulk transfer callbacks --------------------------*/
628
629static errval_t bulk_bind_received(struct bulk_channel *channel)
630{
631    BS_NET_DEBUG_TRACE
632
633    bulk_connections++;
634
635    if (bulk_connections == 2) {
636        wait_cond = 0;
637    }
638    return SYS_ERR_OK;
639}
640
641static errval_t bulk_pool_assigned_cb(struct bulk_channel *channel,
642                                      struct bulk_pool *pool)
643{
644    BS_NET_DEBUG_TRACE
645
646    return SYS_ERR_OK;
647}
648
649static errval_t bulk_pool_removed_cb(struct bulk_channel *channel,
650                                     struct bulk_pool *pool)
651{
652    assert(!"NYI");
653    return SYS_ERR_OK;
654}
655
656static void bulk_move_received(struct bulk_channel *channel,
657                               struct bulk_buffer *buffer,
658                               void *meta)
659{
660    BS_NET_DEBUG_TRACE
661
662#if BLOCK_BENCH_ENABLE
663    testrun_bulk_move_received(channel, buffer,meta);
664#else
665    /* todo directly forward.. */
666    block_local_data_ready(buffer, meta);
667#endif
668}
669
/**
 * \brief bulk callback invoked when a passed buffer arrived
 *
 * With BLOCK_BENCH_ENABLE the buffer goes to the test run handler, otherwise
 * it is handed to block_local_return_buffer().
 *
 * \param channel   the channel the buffer arrived on
 * \param buffer    the received buffer
 * \param meta      the meta data that came with the buffer
 */
static void bulk_buffer_received(struct bulk_channel *channel,
                                 struct bulk_buffer *buffer,
                                 void *meta)
{
#if BLOCK_BENCH_ENABLE
    testrun_bulk_buffer_received(channel, buffer, meta);
#else
    block_local_return_buffer(channel, buffer, meta);
#endif
}
682
/**
 * \brief bulk callback invoked when a copied buffer arrived
 *
 * With BLOCK_BENCH_ENABLE the continuation stored in the meta data is called
 * (if it has a handler); otherwise nothing happens yet.
 *
 * \param channel   the channel the buffer arrived on
 * \param buffer    the received buffer
 * \param meta      the meta data (struct bs_meta_data) that came with it
 */
static void bulk_copy_received(struct bulk_channel *channel,
                               struct bulk_buffer *buffer,
                               void *meta)
{
#if BLOCK_BENCH_ENABLE
    struct bs_meta_data *md = meta;

    /* nothing to notify without a handler */
    if (!md->cont.handler) {
        return;
    }

    struct bs_callback_arg cb_arg = {
        .binding = md->cont.arg,
        .buf = buffer,
        .block_id = md->block_id,
        .req_id = md->req_id,
    };

    md->cont.handler(&cb_arg, SYS_ERR_OK, channel);
#else
    (void) channel;
    (void) buffer;
    (void) meta;
    /* todo directly forward.. */
#endif
}
703
/**
 * \brief bulk callback invoked when a copied buffer has been released
 *
 * Without BLOCK_BENCH_ENABLE the buffer is handed to
 * block_local_release_copy(); with it nothing happens.
 *
 * \param channel   unused
 * \param buffer    the released buffer
 */
static void bulk_copy_released(struct bulk_channel *channel,
                               struct bulk_buffer *buffer)
{
    (void) channel;

#if BLOCK_BENCH_ENABLE
    (void) buffer;
#else
    /* todo directly forward.. */
    block_local_release_copy(buffer);
#endif
}
714