1/** 2 * \file 3 * \brief Network server thread of the bulk server 4 */ 5 6/* 7 * Copyright (c) 2007, 2008, 2009, 2010, ETH Zurich. 8 * All rights reserved. 9 * 10 * This file is distributed under the terms in the attached LICENSE file. 11 * If you do not find this file, copies can be found by writing to: 12 * ETH Zurich D-INFK, Universitaetstrasse 6, CH-8092 Zurich. Attn: Systems Group. 13 */ 14#include <string.h> 15 16#include <barrelfish/barrelfish.h> 17#include <barrelfish/nameservice_client.h> 18 19#include <bulk_transfer/bulk_transfer.h> 20#include <bulk_transfer/bulk_sm.h> 21 22#include <if/block_service_defs.h> 23 24#include "block_storage.h" 25#include "block_server.h" 26#include "network_client.h" 27#include "local_server.h" 28 29/* --------------------------- Server State ---------------------------- */ 30static uint8_t server_state = SERVICE_STATE_UNINITIALIZED; 31 32static iref_t server_iref = 0; 33 34static uint8_t is_client = 0; 35 36static struct block_net_service *block_server; 37 38static uint8_t is_bound = false; 39 40static struct block_local_service local_server; 41 42static void send_status_reply(struct block_service_binding *binding, 43 enum block_err_code error_code, 44 uint32_t seqn, 45 enum block_net_msg_type reqid); 46 47#if BLOCK_ENABLE_NETWORKING 48// condition used to singal controlling code to wait for a condition 49static bool wait_cond; 50 51static inline void wait_for_condition(volatile bool *cond) 52{ 53 while (*cond) { 54 messages_wait_and_handle_next(); 55 } 56} 57 58struct pool_assign_arg 59{ 60 struct bulk_channel *chan_server; 61 struct bulk_channel *chan_client; 62 struct bulk_pool *pool; 63 volatile bool wait_cond; 64 errval_t err; 65}; 66 67/* ------------------------ Bulk Transfer Callbacks ----------------------*/ 68 69static void pool_assigned_handler(void *arg, 70 errval_t err, 71 struct bulk_channel *chan) 72{ 73 struct pool_assign_arg *pa = arg; 74 pa->err = err; 75 pa->wait_cond = false; 76} 77 78#endif 79 80static errval_t bulk_pool_assigned_cb(struct bulk_channel *channel, 81 struct bulk_pool *pool) 82{ 83 84#if !BLOCK_ENABLE_NETWORKING 85 return SYS_ERR_OK; 86#else 87 88 errval_t err = SYS_ERR_OK; 89 90 debug_printf("Pool assign request received. \n"); 91 92 struct pool_assign_arg *arg = malloc(sizeof(struct pool_assign_arg)); 93 assert(arg); 94 arg->chan_client = channel; 95 arg->pool = pool; 96 arg->wait_cond = 1; 97 98 struct bulk_continuation cont = { 99 .handler = pool_assigned_handler, 100 .arg = arg, }; 101 102 /* forward the pool assignment */ 103 if (channel->direction == BULK_DIRECTION_RX) { 104 arg->chan_server = &block_server->tx_chan; 105 err = bulk_channel_assign_pool(&block_server->tx_chan, pool, cont); 106 } else { 107 arg->chan_server = &block_server->rx_chan; 108 err = bulk_channel_assign_pool(&block_server->rx_chan, pool, cont); 109 } 110 111 if (err_is_fail(err)) { 112 return err; 113 } 114 115 wait_cond = true; 116 wait_for_condition(&arg->wait_cond); 117 118 assert(pool == arg->pool); 119 assert(arg->chan_client == channel); 120 err = arg->err; 121 free(arg); 122 123 return err; 124#endif 125} 126 127static errval_t bulk_pool_removed_cb(struct bulk_channel *channel, 128 struct bulk_pool *pool) 129{ 130 return SYS_ERR_OK; 131} 132 133 134static void bulk_move_received(struct bulk_channel *channel, 135 struct bulk_buffer *buffer, 136 void *meta) 137{ 138 BS_LOCAL_DEBUG_TRACE 139 140 errval_t err; 141 142 struct bs_meta_data *bs = (struct bs_meta_data *) meta; 143 144 if (is_client) { 145 err = block_net_write(block_server, 1, &buffer, meta, BULK_CONT_NOP); 146 if (err_is_fail(err)) { 147 debug_printf("ERROR: block net write. %s", err_getstring(err)); 148 } 149 } else { 150 err = block_storage_write(bs->block_id, buffer->address); 151 if (err_is_fail(err)) { 152 BS_LOCAL_DEBUG("%s", "ERROR: block could not be written"); 153 } 154 err = bulk_channel_pass(channel, buffer, meta, 155 BULK_CONT_NOP); 156 if (err_is_fail(err)) { 157 DEBUG_ERR(err, "failed to pass the buffer back"); 158 }; 159 send_status_reply(local_server.binding, err, bs->req_id, 160 BLOCK_NET_MSG_WRITE); 161 } 162} 163 164static void bulk_copy_received(struct bulk_channel *channel, 165 struct bulk_buffer *buffer, 166 void *meta) 167{ 168 BS_LOCAL_DEBUG_TRACE 169 170 assert(!"Block server does not deal with copies for now.."); 171 172} 173 174static void bulk_buffer_received(struct bulk_channel *channel, 175 struct bulk_buffer *buffer, 176 void *meta) 177{ 178 BS_LOCAL_DEBUG_TRACE 179 180 if (is_client) { 181 /* forward buffer to the server */ 182 if (channel->direction == BULK_DIRECTION_RX) { 183 bulk_channel_pass(&block_server->tx_chan, buffer, meta, 184 BULK_CONT_NOP); 185 } else { 186 bulk_channel_pass(&block_server->rx_chan, buffer, meta, 187 BULK_CONT_NOP); 188 } 189 190 } else { 191 /* put it into the buffer store */ 192 block_server_insert_buffer(&bs_bulk_buffers, buffer, channel); 193 } 194} 195 196static void bulk_copy_released(struct bulk_channel *channel, 197 struct bulk_buffer *buffer) 198{ 199 if (is_client) { 200 201 } else { 202 block_server_insert_buffer(&bs_bulk_buffers, buffer, NULL); 203 } 204} 205 206static void bulk_bind_done_cb(void *arg, 207 errval_t err, 208 struct bulk_channel *channel) 209{ 210 BS_LOCAL_DEBUG_TRACE 211 struct block_local_service *ls = (struct block_local_service *) arg; 212 213 if (err_is_fail(err)) { 214 DEBUG_ERR(err, "callback: Binding failed.\n"); 215 return; 216 } 217 218 ls->bound++; 219} 220 221static struct bulk_channel_callbacks bulk_rx_cb = { 222 .pool_assigned = bulk_pool_assigned_cb, 223 .pool_removed = bulk_pool_removed_cb, 224 .move_received = bulk_move_received, 225 .copy_received = bulk_copy_received }; 226 227static struct bulk_channel_callbacks bulk_tx_cb = { 228 .pool_assigned = bulk_pool_assigned_cb, 229 .pool_removed = bulk_pool_removed_cb, 230 .buffer_received = bulk_buffer_received, 231 .copy_released = bulk_copy_released }; 232 233/* ------------------------ Sending Status Reply ---------------------------*/ 234 235struct bs_status_reply 236{ 237 enum block_net_msg_type req; 238 uint32_t seqn; 239 enum block_net_err err; 240 struct block_service_binding *binding; 241}; 242 243static void status_reply_sent_cb(void *a) 244{ 245 free(a); 246} 247 248static void status_reply_do_send(void *a) 249{ 250 errval_t err; 251 252 struct bs_status_reply *req = (struct bs_status_reply*) a; 253 254 struct event_closure txcont = MKCONT(status_reply_sent_cb, req); 255 err = block_service_status__tx(req->binding, txcont, req->err, req->seqn, 256 req->req); 257 if (err_is_fail(err)) { 258 if (err_no(err) == FLOUNDER_ERR_TX_BUSY) { 259 struct waitset *ws = get_default_waitset(); 260 txcont = MKCONT(status_reply_do_send, req); 261 err = req->binding->register_send(req->binding, ws, txcont); 262 if (err_is_fail(err)) { 263 // note that only one continuation may be registered at a time 264 DEBUG_ERR(err, "register_send on binding failed!"); 265 } 266 267 } 268 } 269} 270 271static void send_status_reply(struct block_service_binding *binding, 272 enum block_err_code error_code, 273 uint32_t seqn, 274 enum block_net_msg_type reqid) 275{ 276 struct bs_status_reply *req = malloc(sizeof(struct bs_status_reply)); 277 278 req->seqn = seqn; 279 req->binding = binding; 280 req->err = error_code; 281 req->req = reqid; 282 283 status_reply_do_send(req); 284} 285 286/* ------------------------- Request Callbacks ------------------------- */ 287 288/** 289 * \brief callback for read requests on the flounder channel 290 */ 291static void rx_read_request(struct block_service_binding *binding, 292 uint32_t start_block, 293 uint32_t count, 294 uint32_t seqn) 295{ 296 BS_LOCAL_DEBUG_TRACE 297 298 errval_t err; 299 /* 300 * if is server, then serve the request locally 301 * else forward it to the network server 302 */ 303 304 if (is_client) { 305 err = block_net_read(block_server, start_block, count, seqn, 306 BULK_CONT_NOP); 307 if (err_is_fail(err)) { 308 debug_printf("failed to issue read request to server.\n"); 309 } 310 return; 311 } 312 313 // send_status_reply(binding, SYS_ERR_OK, seqn, BLOCK_NET_MSG_READ); 314 315 /* we're the server, so we can serve it locally */ 316 317 struct block_local_service *ls = (struct block_local_service *) binding->st; 318 struct bulk_channel *chan = &ls->tx_chan; 319 320 struct bs_meta_data *meta_data = malloc( 321 count * sizeof(struct bs_meta_data)); 322 assert(meta_data); 323 324 for (uint32_t i = 0; i < count; ++i) { 325 /* TODO: specify a pool */ 326 struct bulk_buffer *buf = block_server_get_buffer(&bs_bulk_buffers, 327 chan); 328 if (!buf) { 329 debug_printf("ERROR: Has no buffers left...\n"); 330 send_status_reply(binding, BLOCK_NET_MSG_READ, seqn, 331 BLOCK_NET_ERR_NO_BUFS); 332 free(meta_data); 333 return ; 334 } 335 336 err = block_storage_read(start_block + i, buf->address); 337 if (err_is_fail(err)) { 338 debug_printf("ERROR: block id is out of range: %i", 339 (uint32_t) (start_block + count)); 340 send_status_reply(binding, BLOCK_NET_MSG_READ, seqn, 341 BLOCK_NET_ERR_BLOCK_ID); 342 } 343 meta_data[i].block_id = start_block + i; 344 meta_data[i].req_id = 0; // XXX not available. not used by bs_user. 345 meta_data[i].cont = BULK_CONT_NOP; 346 BS_LOCAL_DEBUG("bulk_channel_move: chan=%p, buf=%p\n", chan, buf); 347 err = bulk_channel_move(chan, buf, meta_data + i, BULK_CONT_NOP); 348 if (err_is_fail(err)) { 349 send_status_reply(binding, BLOCK_NET_MSG_READ, seqn, err); 350 debug_printf("channel move failed"); 351 } 352 } 353 354 /* XXX: assume that the meta data has been copied... */ 355 free(meta_data); 356} 357 358static void rx_setup_request(struct block_service_binding *binding, 359 iref_t tx_iref, 360 iref_t rx_iref) 361{ 362 BS_LOCAL_DEBUG("tx_iref=%i, rx_iref=%i", (uint32_t )tx_iref, 363 (uint32_t )rx_iref); 364 365 errval_t err; 366 367 struct block_local_service *ls = (struct block_local_service *) binding->st; 368 369 assert(ls); 370 371 if (ls->bound != 0) { 372 /* binding already in progress */ 373 debug_printf("already bound\n"); 374 return; 375 } 376 377 err = bulk_sm_ep_create_remote(&ls->rx_ep, rx_iref); 378 assert(!err_is_fail(err)); 379 380 err = bulk_sm_ep_create_remote(&ls->tx_ep, tx_iref); 381 assert(!err_is_fail(err)); 382 383 struct bulk_channel_bind_params params = { 384 .role = BULK_ROLE_GENERIC, 385 .waitset = get_default_waitset(), 386 .user_state = ls, 387 .trust = BULK_TRUST_FULL, }; 388 389 struct bulk_continuation cont = { 390 .arg = ls, 391 .handler = bulk_bind_done_cb, }; 392 393 err = bulk_channel_bind(&ls->rx_chan, 394 (struct bulk_endpoint_descriptor *) &ls->rx_ep, 395 &bulk_rx_cb, ¶ms, cont); 396 if (err_is_fail(err)) { 397 DEBUG_ERR(err, "binding failed."); 398 } 399 400 err = bulk_channel_bind(&ls->tx_chan, 401 (struct bulk_endpoint_descriptor *) &ls->tx_ep, 402 &bulk_tx_cb, ¶ms, cont); 403 if (err_is_fail(err)) { 404 DEBUG_ERR(err, "binding failed."); 405 } 406} 407 408/* --------------------- Connection Initialization --------------------- */ 409 410/** 411 * \brief accepts new connections to the local via the flounder interface 412 */ 413static errval_t block_local_accept_cb(void *st, struct block_service_binding *b) 414{ 415 BS_LOCAL_DEBUG_TRACE 416 417 debug_printf("> New connection on the flounder interface\n"); 418 419 // do the channel initialization 420 b->rx_vtbl.read = rx_read_request; 421 b->rx_vtbl.setup = rx_setup_request; 422 423 assert(!is_bound); 424 425 is_bound = true; 426 427 b->st = &local_server; 428 local_server.binding = b; 429 430 return SYS_ERR_OK; 431} 432 433/* ---------------------- Channel Initialization ----------------------- */ 434 435/** 436 * \brief callback for interface export 437 */ 438static void block_local_export_cb(void *st, errval_t err, iref_t iref) 439{ 440 BS_LOCAL_DEBUG_TRACE 441 if (err_is_fail(err)) { 442 /* TODO: Error handling */ 443 server_state = SERVICE_STATE_FAILURE; 444 } 445 446 server_iref = iref; 447 448 server_state = SERVICE_STATE_EXPORTED; 449} 450 451/** 452 * \brief initializes the machine local block server 453 */ 454errval_t block_local_init(struct block_net_service *server, uint32_t flags) 455{ 456 BS_LOCAL_DEBUG_TRACE 457 458 errval_t err; 459 460 is_client = (flags & SERVICE_FLAG_CLIENT) && 1; 461 462 block_server = server; 463 464 // export the interface 465 err = block_service_export(NULL, block_local_export_cb, 466 block_local_accept_cb, get_default_waitset(), 467 IDC_EXPORT_FLAGS_DEFAULT); 468 if (err_is_fail(err)) { 469 return err; 470 } 471 472 while (server_state != SERVICE_STATE_EXPORTED) { 473 if (server_state == SERVICE_STATE_FAILURE) { 474 USER_PANIC("export resulted in a failure condition"); 475 } 476 messages_wait_and_handle_next(); 477 } 478 479 return SYS_ERR_OK; 480} 481 482/* ------------------------- Server Management ------------------------- */ 483 484/** 485 * \brief starts the machine local server of block service to accept requests 486 * 487 * This function should not return until stopped. 488 */ 489errval_t block_local_start(void) 490{ 491 BS_LOCAL_DEBUG_TRACE 492 493 if (server_state != SERVICE_STATE_EXPORTED) { 494 assert(server_state == SERVICE_STATE_EXPORTED); 495 /* TODO: return with error */ 496 } 497 498 errval_t err = nameservice_register(BLOCK_SERVER_NAME, server_iref); 499 if (err_is_fail(err)) { 500 return err; 501 } 502 503 return SYS_ERR_OK; 504} 505/** 506 * \brief stops the request handling of the machine local block service requests 507 */ 508errval_t block_local_stop(void) 509{ 510 // set the stop flag. 511 512 // tear down all bulk channels 513 514 // stop the flounder service 515 516 // free up resources 517 518 assert(!"NYI: block_local_stop"); 519 return SYS_ERR_OK; 520} 521 522/* ------------------ callbacks for the network server --------------------- */ 523 524/** 525 * forwards the received buffer to the requesting client 526 */ 527void block_local_data_ready(struct bulk_buffer *buffer, void *meta) 528{ 529 BS_LOCAL_DEBUG_TRACE 530 531 errval_t err; 532 533 err = bulk_channel_move(&local_server.tx_chan, buffer, meta, BULK_CONT_NOP); 534 if (err_is_fail(err)) { 535 DEBUG_ERR(err, "could not do the move"); 536 } 537} 538/** 539 * forwards the buffer ot the originating client 540 */ 541errval_t block_local_return_buffer(struct bulk_channel *chan, 542 struct bulk_buffer *buffer, 543 void *meta) 544{ 545 BS_LOCAL_DEBUG_TRACE 546 547 errval_t err; 548 struct bulk_channel *local_chan; 549 if (chan->direction == BULK_DIRECTION_RX) { 550 local_chan = &local_server.tx_chan; 551 } else { 552 local_chan = &local_server.rx_chan; 553 } 554 err = bulk_channel_pass(local_chan, buffer, meta, BULK_CONT_NOP); 555 if (err_is_fail(err)) { 556 DEBUG_ERR(err, "could not pass the buffer"); 557 } 558 559 return SYS_ERR_OK; 560} 561 562/** 563 * 564 */ 565errval_t block_local_release_copy(struct bulk_buffer *buffer) 566{ 567 //struct bs_meta_data *data = (struct bs_meta_data *)meta; 568 assert(!"NYI"); 569 570 return SYS_ERR_OK; 571} 572 573errval_t block_local_send_status(enum block_net_msg_type req, 574 uint32_t reqid, 575 enum block_net_err stats) 576{ 577 BS_LOCAL_DEBUG_TRACE 578 send_status_reply(local_server.binding, stats, reqid, req); 579 return SYS_ERR_OK; 580} 581