1/** 2 * \file 3 * \brief Network client of the block service 4 */ 5 6/* 7 * Copyright (c) 2007, 2008, 2009, 2010, ETH Zurich. 8 * All rights reserved. 9 * 10 * This file is distributed under the terms in the attached LICENSE file. 11 * If you do not find this file, copies can be found by writing to: 12 * ETH Zurich D-INFK, Universitaetstrasse 6, CH-8092 Zurich. Attn: Systems Group. 13 */ 14#include <string.h> 15 16#include <barrelfish/barrelfish.h> 17 18#include "block_server.h" 19#include <lwip/init.h> 20#include <lwip/tcp.h> 21#include <lwip/ip_addr.h> 22 23#include "network_common.h" 24#include "network_client.h" 25#include "local_server.h" 26 27#if BULK_NET_BACKEND_PROXY 28#include <bulk_transfer/bulk_net_proxy.h> 29#include <bulk_transfer/bulk_local.h> 30#endif 31 32/* number of bulk connections */ 33static volatile uint32_t bulk_connections = 0; 34 35// condition used to singal controlling code to wait for a condition 36static bool wait_cond; 37 38static inline void wait_for_condition(void) 39{ 40 while (wait_cond) { 41 messages_wait_and_handle_next(); 42 } 43} 44 45/* ----------------------bulk transfer callbacks ----------------------------*/ 46 47static errval_t bulk_pool_assigned_cb(struct bulk_channel *channel, 48 struct bulk_pool *pool); 49static errval_t bulk_pool_removed_cb(struct bulk_channel *channel, 50 struct bulk_pool *pool); 51static void bulk_move_received(struct bulk_channel *channel, 52 struct bulk_buffer *buffer, 53 void *meta); 54static void bulk_buffer_received(struct bulk_channel *channel, 55 struct bulk_buffer *buffer, 56 void *meta); 57static void bulk_copy_received(struct bulk_channel *channel, 58 struct bulk_buffer *buffer, 59 void *meta); 60static void bulk_copy_released(struct bulk_channel *channel, 61 struct bulk_buffer *buffer); 62static errval_t bulk_bind_received(struct bulk_channel *channel); 63 64static struct bulk_channel_callbacks bulk_rx_cb = { 65 .bind_received = bulk_bind_received, 66 .pool_assigned = bulk_pool_assigned_cb, 67 .pool_removed = bulk_pool_removed_cb, 68 .move_received = bulk_move_received, 69 .copy_received = bulk_copy_received, 70 .buffer_received = bulk_buffer_received, 71 .copy_released = bulk_copy_released, }; 72 73static struct bulk_channel_callbacks bulk_tx_cb = { 74 .bind_received = bulk_bind_received, 75 .pool_assigned = bulk_pool_assigned_cb, 76 .pool_removed = bulk_pool_removed_cb, 77 .buffer_received = bulk_buffer_received, 78 .copy_released = bulk_copy_released, 79 .move_received = bulk_move_received, 80 .copy_received = bulk_copy_received, 81 82}; 83 84/* --------------------- lwip/tcp callbacks ---------------------------- */ 85 86/** 87 * callback when a send event has been completed 88 */ 89static err_t block_net_sent_cb(void *arg, struct tcp_pcb *pcb, u16_t len) 90{ 91 BS_NET_DEBUG_TRACE 92 93 assert(pcb != NULL); 94 95 return ERR_OK; 96} 97 98/** 99 * callback when a new packet has been arrived 100 */ 101static err_t block_net_recv_cb(void *arg, 102 struct tcp_pcb *pcb, 103 struct pbuf *pb, 104 err_t err) 105{ 106 BS_NET_DEBUG_TRACE 107 108 assert(pcb != NULL); 109 assert(pb); 110 if (pb == NULL) { 111 /* connection closed. clean up and EXIT the program. */ 112 tcp_close(pcb); 113 assert(!"NYI: cleaning up the resources!"); 114 exit(EXIT_SUCCESS); 115 } 116 117 if ((err != ERR_OK) || !pb) { 118 /* there was an error... TODO */ 119 debug_printf("there was an error...\n"); 120 return err; 121 } 122 123 struct block_net_msg *msg = (struct block_net_msg *) pb->payload; 124 assert(msg); 125 if (pb->tot_len != msg->size) { 126 /* some thing wrong... */ 127 } 128 129 struct block_net_service *c = (struct block_net_service *) arg; 130 131 enum block_net_msg_type req; 132 133 switch (msg->type) { 134 case BLOCK_NET_MSG_STATUS: 135 136 req = msg->msg.status.req; 137 uint32_t reqid = msg->msg.status.reqid; 138 enum block_net_err status = msg->msg.status.code; 139 tcp_recved(pcb, pb->tot_len); 140 141 pbuf_free(pb); 142 143 switch (msg->msg.status.req) { 144 case BLOCK_NET_MSG_INIT: 145 if (status == BLOCK_NET_ERR_OK) { 146 debug_printf("/* all fine, connection fully established */\n"); 147 wait_cond = 0; 148 c->bound = 1; 149 } 150 break; 151 case BLOCK_NET_MSG_READ: 152 block_local_send_status(req, reqid, status); 153 break; 154 case BLOCK_NET_MSG_WRITE: 155#if BLOCK_BENCH_ENABLE 156 testrun_handle_status(req, reqid, status); 157#else 158 block_local_send_status(req, reqid, status); 159#endif 160 break; 161 default: 162 break; 163 } 164 return ERR_OK; 165 break; 166 default: 167 debug_printf("got an unknown reply..."); 168 break; 169 } 170 171 tcp_recved(pcb, pb->tot_len); 172 173 pbuf_free(pb); 174 175 return ERR_OK; 176} 177 178/** 179 * callback when error occures 180 */ 181static void block_net_err_cb(void *arg, err_t err) 182{ 183 BS_NET_DEBUG_TRACE 184 185 debug_printf("tcp is err: %d\n", (int) err); 186} 187 188/** 189 * callback for polling the connection 190 */ 191static err_t block_net_poll_cb(void *arg, struct tcp_pcb *pcb) 192{ 193 194 return ERR_OK; 195} 196 197/** 198 * callback when the connection to the server is established. 199 */ 200static err_t block_net_connected_cb(void *arg, struct tcp_pcb *pcb, err_t err) 201{ 202 BS_NET_DEBUG_TRACE 203 204 if (err != ERR_OK) { 205 fprintf(stderr, "tcp connection failed\n"); 206 if (err == ERR_TIMEOUT) { 207 debug_printf("connection attempt timed out..\n"); 208 /* TODO: try again */ 209 } 210 wait_cond = false; 211 return err; 212 } 213 214 tcp_sent(pcb, block_net_sent_cb); 215 tcp_recv(pcb, block_net_recv_cb); 216 tcp_err(pcb, block_net_err_cb); 217 tcp_poll(pcb, block_net_poll_cb, 10); 218 219 wait_cond = false; 220 221 return ERR_OK; 222} 223 224/* ------------------------- connect / disconnect ---------------------------*/ 225 226static errval_t block_net_send_ep(struct block_net_service *server) 227{ 228 BS_NET_DEBUG_TRACE 229 230 assert(server->rx_chan.state != BULK_STATE_UNINITIALIZED); 231 assert(server->tx_chan.state != BULK_STATE_UNINITIALIZED); 232 err_t err; 233 234 uint16_t size = sizeof(struct block_net_msg); 235 struct block_net_msg *msg = malloc(size); 236 if (!msg) { 237 return LWIP_ERR_MEM; /* ERROR CODE ?? */ 238 } 239 240 msg->type = BLOCK_NET_MSG_INIT; 241 msg->size = size; 242#if BULK_NET_BACKEND_PROXY 243 msg->msg.setup.tx_ep.ip = server->tpcb->local_ip; 244 msg->msg.setup.tx_ep.port = BLOCK_NET_PORT_RX; 245 msg->msg.setup.rx_ep.ip = server->tpcb->local_ip; 246 msg->msg.setup.rx_ep.port = BLOCK_NET_PORT_TX; 247#else 248 msg->msg.setup.rx_ep = *((struct bulk_net_endpoint_descriptor*) server 249 ->tx_chan.ep); 250 msg->msg.setup.do_bind = 1; 251 msg->msg.setup.tx_ep = *((struct bulk_net_endpoint_descriptor*) server 252 ->rx_chan.ep); 253#endif 254 /* TODO: check if there is enough space left... */ 255 256 err = tcp_write(server->tpcb, msg, size, TCP_WRITE_FLAG_COPY); 257 if (err != ERR_OK) { 258 /* abort the sending */ 259 fprintf(stderr, "error writing %d\n", err); 260 return LWIP_ERR_MEM; 261 } 262 263 err = tcp_output(server->tpcb); 264 265 if (err != ERR_OK) { 266 fprintf(stderr, "error in tcp_output %d\n", err); 267 return LWIP_ERR_MEM; 268 } 269 270 wait_cond = 1; 271 272 BS_NET_DEBUG_BULK("%s", "waiting for bulk binding\n"); 273 274 wait_for_condition(); 275 276 BS_NET_DEBUG_BULK("%s", "binding complete. \n"); 277 278 return SYS_ERR_OK; 279} 280 281#if BULK_NET_BACKEND_PROXY 282 283static void proxy_connected_cb(struct bulk_net_proxy *proxy) 284{ 285 debug_printf("APPL: cb_rx_connected\n"); 286 287 bulk_connections++; 288 289 if (bulk_connections == 2) { 290 wait_cond = 0; 291 } 292} 293#endif 294 295/** 296 * \brief connects to a network block service 297 */ 298errval_t block_net_connect(struct block_net_service *server, 299 struct ip_addr *ip, 300 uint16_t port) 301{ 302 BS_NET_DEBUG_TRACE 303 304 err_t err; 305 struct tcp_pcb *pcb; 306 307 memset(server, 0, sizeof(*server)); 308 memcpy(&server->ip, ip, sizeof(*ip)); 309 server->port = port; 310 311 BS_NET_DEBUG_NET("%s", "lwip initializing... "); 312 if (lwip_init("e10k", 1) == false) { 313 debug_printf("ERROR: lwip_init_auto failed!\n"); 314 return 1; 315 } 316 317 BS_NET_DEBUG_NET("%s", "lwip init done."); 318 319 // initialize network connection to the block server 320 pcb = tcp_new(); 321 if (pcb == NULL) { 322 return LWIP_ERR_CONN; 323 } 324 325 wait_cond = true; 326 err = tcp_connect(pcb, &server->ip, server->port, block_net_connected_cb); 327 // obtain basic information about the block server 328 329 BS_NET_DEBUG_NET("%s", "waiting for condition"); 330 wait_for_condition(); 331 332 if (err != ERR_OK) { 333 /* there was something wrong */ 334 return err; 335 } 336 337 tcp_arg(pcb, server); 338 server->tpcb = pcb; 339 340 // initialize the bulk channel to the block server 341 errval_t b_err; 342 343#if BULK_NET_BACKEND_PROXY 344 BS_NET_DEBUG_NET("%s", "creating bulk net proxy channel"); 345 346 bulk_local_init_endpoint(&server->rx_ep, NULL); 347 bulk_local_init_endpoint(&server->tx_ep, NULL); 348 349#else 350 BS_NET_DEBUG_NET("%s", "creating bulk channels"); 351 352 struct bulk_net_ep_setup ep_setup = { 353 .port = BLOCK_NET_PORT_RX, 354 .ip = pcb->local_ip, /// XXX: is this correct? 355 .queue = BLOCK_NET_RX_QUEUE, 356 .max_queues = BLOCK_NET_MAX_QUEUES, 357 .buffer_size = BLOCK_SIZE, 358 .buffer_count = BLOCK_NET_BUFFER_COUNT, 359 .no_copy = BULK_NET_BACKEND_NOCOPY }; 360 b_err = bulk_net_ep_create(&server->rx_ep, &ep_setup); 361 assert(!err_is_fail(b_err)); 362 363 ep_setup.port = BLOCK_NET_PORT_TX; 364 ep_setup.queue = BLOCK_NET_TX_QUEUE; 365 b_err = bulk_net_ep_create(&server->tx_ep, &ep_setup); 366 assert(!err_is_fail(b_err)); 367#endif 368 369 struct bulk_channel_setup chan_setup = { 370 .direction = BULK_DIRECTION_TX, 371 .role = BULK_ROLE_MASTER, 372 .trust = BULK_TRUST_FULL, 373 .meta_size = sizeof(struct bs_meta_data), 374 .waitset = get_default_waitset(), 375 .user_state = server, }; 376 377 b_err = bulk_channel_create( 378 &server->tx_chan, 379 (struct bulk_endpoint_descriptor *) &server->tx_ep, 380 &bulk_tx_cb, &chan_setup); 381 if (err_is_fail(b_err)) { 382 bulk_channel_destroy(&server->tx_chan, BULK_CONT_NOP); 383 debug_printf("Failed to create the TX channel\n"); 384 return b_err; 385 } 386 387 chan_setup.direction = BULK_DIRECTION_RX; 388 b_err = bulk_channel_create( 389 &server->rx_chan, 390 (struct bulk_endpoint_descriptor *) &server->rx_ep, 391 &bulk_rx_cb, &chan_setup); 392 if (err_is_fail(b_err)) { 393 bulk_channel_destroy(&server->tx_chan, BULK_CONT_NOP); 394 debug_printf("Failed to create the RX channel\n"); 395 return b_err; 396 } 397#if BULK_NET_BACKEND_PROXY 398 399 bulk_local_init_endpoint(&server->rx_p_ep, &server->rx_chan); 400 bulk_local_init_endpoint(&server->tx_p_ep, &server->tx_chan); 401 402 err = bulk_net_proxy_listen(&server->rx_proxy, &server->rx_p_ep.generic, 403 server->rx_chan.waitset, BLOCK_SIZE, "e10k", 404 BLOCK_NET_RX_QUEUE, 405 BLOCK_NET_PORT_RX, proxy_connected_cb); 406 if (err_is_fail(err)) { 407 return err; 408 } 409 server->rx_proxy.user_state = server; 410 411 err = bulk_net_proxy_listen(&server->tx_proxy, &server->tx_p_ep.generic, 412 server->tx_chan.waitset, BLOCK_SIZE, "e10k", 413 BLOCK_NET_TX_QUEUE, 414 BLOCK_NET_PORT_TX, proxy_connected_cb); 415 server->tx_proxy.user_state = server; 416 if (err_is_fail(err)) { 417 return err; 418 } 419 420#endif 421 return block_net_send_ep(server); 422} 423 424/** 425 * \brief disconnects form the network block service 426 */ 427errval_t block_net_disconnect(struct block_net_service *server) 428{ 429 BS_NET_DEBUG_TRACE 430 431 if (server->tx_chan.state == BULK_STATE_CONNECTED) { 432 /* free up the bulk channel */ 433 } 434 435 if (server->rx_chan.state == BULK_STATE_CONNECTED) { 436 /* free up the bulk channel */ 437 } 438 439 // tear down the bulk channel 440 if (server->tpcb) { 441 // tear down the service level network connection 442 tcp_close(server->tpcb); 443 tcp_arg(server->tpcb, NULL); 444 tcp_sent(server->tpcb, NULL); 445 tcp_recv(server->tpcb, NULL); 446 } 447 /*TODO: do we have to free up something from lwip ? */ 448 449 return SYS_ERR_OK; 450} 451 452/* ------------------------- read / write ---------------------------*/ 453 454/** 455 * \brief issues a new retrieval request to the network block server 456 * 457 * \param block_start the id of the first block 458 * \param count the number of blocks to read 459 */ 460errval_t block_net_read(struct block_net_service *server, 461 size_t block_start, 462 size_t count, 463 uint32_t seqn, 464 struct bulk_continuation cont) 465{ 466 BS_NET_DEBUG_TRACE 467 468 err_t err; 469 470 if (server->rx_chan.state != BULK_STATE_CONNECTED || !server->tpcb) { 471 return BLOCK_ERR_NOT_CONNECTED; 472 } 473 474 if (server->bound != 1) { 475 /* the connection has not yet been fully established yet */ 476 return BLOCK_ERR_NOT_CONNECTED; 477 } 478 479 uint16_t size = sizeof(struct block_net_msg); 480 struct block_net_msg *msg = malloc(size); 481 if (!msg) { 482 return LWIP_ERR_MEM; /* ERROR CODE ?? */ 483 } 484 485 msg->type = BLOCK_NET_MSG_READ; 486 msg->size = size; 487 msg->msg.read.block_id = block_start; 488 msg->msg.read.count = count; 489 msg->msg.read.req_id = seqn; 490 msg->msg.read.cont = cont; 491 492 server->tpcb->flags |= TF_NODELAY; 493 494 /* TODO: check if there is enough space left... */ 495 496 err = tcp_write(server->tpcb, msg, size, TCP_WRITE_FLAG_COPY); 497 if (err != ERR_OK) { 498 /* abort the sending */ 499 fprintf(stderr, "error writing %d\n", err); 500 return LWIP_ERR_MEM; 501 } 502 503 err = tcp_output(server->tpcb); 504 505 if (err != ERR_OK) { 506 fprintf(stderr, "error in tcp_output %d\n", err); 507 return LWIP_ERR_MEM; 508 } 509 510 /* XXX: assume that the data hase been copied into the pbuf */ 511 free(msg); 512 513 return SYS_ERR_OK; 514} 515 516/** 517 * \brief forwards the write request to the network block server 518 * 519 * \param block_start the id of the first block 520 * \param count the numbers of blocks to write 521 * \param buf pointer to an array of buffers 522 * \param cont continuation for the callback when the buffer is sent 523 * 524 */ 525errval_t block_net_write(struct block_net_service *server, 526 size_t count, 527 struct bulk_buffer **buf, 528 struct bs_meta_data *meta, 529 struct bulk_continuation cont) 530{ 531 BS_NET_DEBUG_TRACE 532 533 assert(server->tx_chan.state == BULK_STATE_CONNECTED); 534 535 if (server->bound != 1) { 536 /* the connection has not yet been fully established */ 537 return 1; 538 } 539 540 /* 541 * Note: we just need to send the buffers over the bulk channel with the 542 * corresponding meta data. 543 */ 544 errval_t err; 545 for (uint32_t i = 0; i < count; ++i) { 546 err = bulk_channel_move(&server->tx_chan, buf[i], meta + i, cont); 547 if (err_is_fail(err)) { 548 if (err != BULK_TRANSFER_BUFFER_NOT_OWNED) { 549 /* this indicates a serious problem and all will fail */ 550 return err; 551 } 552 } 553 } 554 555 // return status code 556 return SYS_ERR_OK; 557} 558 559errval_t block_net_pass(struct block_net_service *server, 560 size_t count, 561 struct bulk_buffer **buf, 562 struct bs_meta_data *meta, 563 struct bulk_continuation cont) 564{ 565 BS_NET_DEBUG_TRACE 566 567 assert(server->rx_chan.state == BULK_STATE_CONNECTED); 568 569 if (server->bound != 1) { 570 /* the connection has not yet been fully established */ 571 return 1; 572 } 573 574 /* 575 * Note: we just need to send the buffers over the bulk channel with the 576 * corresponding meta data. 577 */ 578 errval_t err; 579 for (uint32_t i = 0; i < count; ++i) { 580 err = bulk_channel_pass(&server->rx_chan, buf[i], meta + i, cont); 581 if (err_is_fail(err)) { 582 if (err != BULK_TRANSFER_BUFFER_NOT_OWNED) { 583 /* this indicates a serious problem and all will fail */ 584 return err; 585 } 586 } 587 } 588 589 // return status code 590 return SYS_ERR_OK; 591} 592 593errval_t block_net_release(struct block_net_service *server, 594 size_t count, 595 struct bulk_buffer **buf, 596 struct bs_meta_data *meta, 597 struct bulk_continuation cont) 598{ 599 BS_NET_DEBUG_TRACE 600 601 assert(server->rx_chan.state == BULK_STATE_CONNECTED); 602 603 if (server->bound != 1) { 604 /* the connection has not yet been fully established */ 605 return 1; 606 } 607 608 /* 609 * Note: we just need to send the buffers over the bulk channel with the 610 * corresponding meta data. 611 */ 612 errval_t err; 613 for (uint32_t i = 0; i < count; ++i) { 614 err = bulk_channel_release(&server->rx_chan, buf[i], cont); 615 if (err_is_fail(err)) { 616 if (err != BULK_TRANSFER_BUFFER_NOT_OWNED) { 617 /* this indicates a serious problem and all will fail */ 618 return err; 619 } 620 } 621 } 622 623 // return status code 624 return SYS_ERR_OK; 625} 626 627/* ------------------------ bulk transfer callbacks --------------------------*/ 628 629static errval_t bulk_bind_received(struct bulk_channel *channel) 630{ 631 BS_NET_DEBUG_TRACE 632 633 bulk_connections++; 634 635 if (bulk_connections == 2) { 636 wait_cond = 0; 637 } 638 return SYS_ERR_OK; 639} 640 641static errval_t bulk_pool_assigned_cb(struct bulk_channel *channel, 642 struct bulk_pool *pool) 643{ 644 BS_NET_DEBUG_TRACE 645 646 return SYS_ERR_OK; 647} 648 649static errval_t bulk_pool_removed_cb(struct bulk_channel *channel, 650 struct bulk_pool *pool) 651{ 652 assert(!"NYI"); 653 return SYS_ERR_OK; 654} 655 656static void bulk_move_received(struct bulk_channel *channel, 657 struct bulk_buffer *buffer, 658 void *meta) 659{ 660 BS_NET_DEBUG_TRACE 661 662#if BLOCK_BENCH_ENABLE 663 testrun_bulk_move_received(channel, buffer,meta); 664#else 665 /* todo directly forward.. */ 666 block_local_data_ready(buffer, meta); 667#endif 668} 669 670static void bulk_buffer_received(struct bulk_channel *channel, 671 struct bulk_buffer *buffer, 672 void *meta) 673{ 674#if BLOCK_BENCH_ENABLE 675 testrun_bulk_buffer_received(channel, 676 buffer, 677 meta); 678#else 679 block_local_return_buffer(channel, buffer, meta); 680#endif 681} 682 683static void bulk_copy_received(struct bulk_channel *channel, 684 struct bulk_buffer *buffer, 685 void *meta) 686{ 687#if BLOCK_BENCH_ENABLE 688 struct bs_meta_data *bsmeta = (struct bs_meta_data *) meta; 689 690 struct bs_callback_arg arg = { 691 .binding = bsmeta->cont.arg, 692 .buf = buffer, 693 .block_id = bsmeta->block_id, 694 .req_id = bsmeta->req_id}; 695 696 if (bsmeta->cont.handler) { 697 bsmeta->cont.handler(&arg, SYS_ERR_OK, channel); 698 } 699#else 700 /* todo directly forward.. */ 701#endif 702} 703 704static void bulk_copy_released(struct bulk_channel *channel, 705 struct bulk_buffer *buffer) 706{ 707#if BLOCK_BENCH_ENABLE 708 709#else 710 /* todo directly forward.. */ 711 block_local_release_copy(buffer); 712#endif 713} 714