1/** 2 * \file 3 * \brief Unidirectional bulk data transfer via shared memory 4 */ 5 6/* 7 * Copyright (c) 2013, ETH Zurich. 8 * All rights reserved. 9 * 10 * This file is distributed under the terms in the attached LICENSE file. 11 * If you do not find this file, copies can be found by writing to: 12 * ETH Zurich D-INFK, Universitaetstrasse 6, CH-8092 Zurich. Attn: Systems Group. 13 */ 14 15#include <barrelfish/event_queue.h> 16#include <string.h> 17 18#include <bulk_transfer/bulk_transfer.h> 19#include <bulk_transfer/bulk_allocator.h> 20#include <bulk_transfer/bulk_local.h> 21 22#include "../../bulk_pool.h" 23#include "../../bulk_buffer.h" 24 25#include "../../helpers.h" 26 27//#define IMPL_DEBUG(fmt, msg...) debug_printf("%s: "fmt"\n", __func__, msg); 28#define IMPL_DEBUG(fmt, msg...) 29 30//#define EVENT_DEBUG(fmt, msg...) debug_printf("%s: "fmt"\n", __func__, msg); 31#define EVENT_DEBUG(fmt, msg...) 32//#define EVENT_DEBUG_TRACE debug_printf("%s\n", __func__); 33#define EVENT_DEBUG_TRACE 34 35struct local_channel 36{ 37 struct bulk_channel *other; 38 struct event_queue events; 39}; 40 41struct local_event 42{ 43 struct event_queue_node eqn; 44 struct bulk_channel *channel; 45 void (*handler)(struct local_event *); 46 struct bulk_continuation cont; 47 union 48 { 49 struct 50 { 51 errval_t err; 52 } status; 53 54 struct 55 { 56 errval_t err; 57 } bind_done; 58 59 struct 60 { 61 errval_t err; 62 struct bulk_pool *pool; 63 } pool_assigned; 64 65 struct 66 { 67 struct bulk_pool_id pool_id; 68 size_t buf_id; 69 struct bulk_buffer *buffer; 70 void *meta; 71 } move_received; 72 73 struct 74 { 75 struct bulk_buffer *buffer; 76 struct bulk_pool_id pool_id; 77 size_t buf_id; 78 void *meta; 79 } buffer_received; 80 81 struct 82 { 83 struct bulk_pool_id pool_id; 84 size_t buf_id; 85 struct bulk_buffer *buffer; 86 void *meta; 87 } copy_received; 88 89 struct 90 { 91 struct bulk_pool_id pool_id; 92 size_t buf_id; 93 struct bulk_buffer *buffer; 94 } copy_released; 95 } params; 96}; 97 98static void event_handler(void *arg); 99 100static errval_t impl_create(struct bulk_channel *channel); 101static errval_t impl_bind(struct bulk_channel *channel, 102 struct bulk_continuation cont); 103static errval_t impl_assign_pool(struct bulk_channel *channel, 104 struct bulk_pool *pool, 105 struct bulk_continuation cont); 106static errval_t impl_move(struct bulk_channel *channel, 107 struct bulk_buffer *buffer, 108 void *meta, 109 struct bulk_continuation cont); 110static errval_t impl_pass(struct bulk_channel *channel, 111 struct bulk_buffer *buffer, 112 void *meta, 113 struct bulk_continuation cont); 114static errval_t impl_copy(struct bulk_channel *channel, 115 struct bulk_buffer *buffer, 116 void *meta, 117 struct bulk_continuation cont); 118static errval_t impl_release(struct bulk_channel *channel, 119 struct bulk_buffer *buffer, 120 struct bulk_continuation cont); 121 122static struct bulk_implementation implementation = { 123 .channel_create = impl_create, 124 .channel_bind = impl_bind, 125 .assign_pool = impl_assign_pool, 126 .move = impl_move, 127 .pass = impl_pass, 128 .copy = impl_copy, 129 .release = impl_release, }; 130 131static errval_t init_channel(struct bulk_channel *channel) 132{ 133 struct local_channel *l = malloc(sizeof(*l)); 134 if (l == NULL) { 135 return BULK_TRANSFER_MEM; 136 } 137 138 channel->impl_data = l; 139 event_queue_init(&l->events, channel->waitset, EVENT_QUEUE_CONTINUOUS); 140 return SYS_ERR_OK; 141} 142 143/* ----------------------- event management --------------------------------*/ 144 145/** 146 * allocates a new event 147 */ 148static errval_t event_alloc(struct bulk_channel *channel, 149 struct local_event **ev, 150 void (*handler)(struct local_event *), 151 size_t extra) 152{ 153 *ev = malloc(sizeof(struct local_event) + extra); 154 if (*ev == NULL) { 155 return BULK_TRANSFER_MEM; 156 } 157 158 (*ev)->channel = channel; 159 (*ev)->handler = handler; 160 return SYS_ERR_OK; 161} 162 163/** 164 * enqueues the local event to the event queue of the channel 165 */ 166static void event_enqueue(struct local_event *lev) 167{ 168 struct local_channel *local = lev->channel->impl_data; 169 event_queue_add(&local->events, &lev->eqn, MKCLOSURE(event_handler, lev)); 170} 171 172/* ========================= EVENT HANDLERS ================================ */ 173 174/* ------------------------- event handlers -------------------------------- */ 175 176/** 177 * 178 */ 179static void event_handler(void *arg) 180{ 181 struct local_event *lev = arg; 182 lev->handler(lev); 183 free(lev); 184} 185 186static void event_op_done(struct local_event *lev) 187{ 188 EVENT_DEBUG_TRACE 189 if (lev->cont.handler) { 190 lev->cont.handler(NULL, SYS_ERR_OK, lev->channel); 191 } else { 192 EVENT_DEBUG("event_op_done(): no handler set...\n"); 193 } 194 free(lev); 195} 196 197/* -------------------------- event bind ----------------------------------- */ 198 199/** 200 * Gets called when the binding procedure is over. 201 * 202 * Side: Binding Domain. 203 */ 204static void event_bind_done(struct local_event *lev) 205{ 206 EVENT_DEBUG_TRACE; 207 assert(lev); 208 209 if (lev->cont.handler == NULL) { 210 EVENT_DEBUG("%s", "handler not set"); 211 return; 212 } 213 214 lev->cont.handler(lev->cont.arg, lev->params.bind_done.err, lev->channel); 215} 216 217/** 218 * Gets called when a bind request has been received 219 * 220 * Side: Creating Side 221 */ 222static void event_bind_received(struct local_event *lev) 223{ 224 EVENT_DEBUG_TRACE 225 226 errval_t err, reply = SYS_ERR_OK; 227 struct local_event *ev; 228 struct local_channel *l = lev->channel->impl_data; 229 230 /* do the callback to the application to inform about the binding */ 231 if (lev->channel->callbacks->bind_received) { 232 err = lev->channel->callbacks->bind_received(lev->channel); 233 if (err_is_fail(err)) { 234 reply = err; 235 } 236 } else { 237 /* XXX: or if no cb set, just say SYS_ERR_OK ? */ 238 reply = BULK_TRANSFER_NO_CALLBACK; 239 } 240 241 /* allocate and trigger event bind_done */ 242 err = event_alloc(l->other, &ev, event_bind_done, 0); 243 if (!err_is_fail(err)) { 244 ev->params.bind_done.err = reply; 245 ev->cont = lev->cont; 246 event_enqueue(ev); 247 } 248} 249 250/** 251 * Implementation specific bind procedure 252 * 253 * Side: Binding Side 254 */ 255static errval_t impl_bind(struct bulk_channel *channel, 256 struct bulk_continuation cont) 257{ 258 errval_t err; 259 struct local_channel *l, *o_l; 260 struct bulk_local_endpoint *ep; 261 struct local_event *ev; 262 263 /* Initialize the channel */ 264 err = init_channel(channel); 265 if (err_is_fail(err)) { 266 return err; 267 } 268 269 /* setting the pointers to the other channel */ 270 ep = (struct bulk_local_endpoint *) channel->ep; 271 l = channel->impl_data; 272 l->other = ep->other_channel; 273 o_l = l->other->impl_data; 274 o_l->other = channel; 275 276 /* set channel parameters from the other side */ 277 channel->role = bulk_role_other(l->other->role); 278 channel->direction = bulk_direction_other(l->other->role); 279 channel->meta_size = l->other->meta_size; 280 281 /* update the channel state */ 282 channel->state = BULK_STATE_CONNECTED; 283 l->other->state = BULK_STATE_CONNECTED; 284 285 /* allocate and trigger the bind event to the other side */ 286 err = event_alloc(l->other, &ev, event_bind_received, 0); 287 if (err_is_fail(err)) { 288 goto error; 289 } 290 291 // Trigger first event 292 ev->cont = cont; 293 event_enqueue(ev); 294 295 return SYS_ERR_OK; 296 297 error: free(l); 298 299 return err; 300} 301 302/* -------------------------- event pool assign ---------------------------- */ 303 304/** 305 * Gets called when the pool assignment on the other side is completed 306 * 307 * Side: Assigning Side 308 */ 309static void event_pool_assigned(struct local_event *lev) 310{ 311 EVENT_DEBUG_TRACE 312 313 errval_t err; 314 errval_t result = lev->params.pool_assigned.err; 315 316 if (lev->cont.handler) { 317 if (!err_is_fail(result)) { 318 err = bulk_pool_assign(lev->params.pool_assigned.pool, 319 lev->channel); 320 if (err_is_fail(err)) { 321 result = err; 322 } 323 } 324 325 EVENT_DEBUG(" > [%s]", (result==SYS_ERR_OK) ? "Success", "Failure"); 326 327 /* call the continuation */ 328 lev->cont.handler(lev->params.pool_assigned.pool, result, lev->channel); 329 } else { 330 EVENT_DEBUG("%s", "continuation handler not set"); 331 } 332} 333 334/** 335 * Gets called when a pool is assigned to the channel 336 * 337 * Side: Other 338 */ 339static void event_pool_assign(struct local_event *lev) 340{ 341 EVENT_DEBUG_TRACE 342 343 errval_t err; 344 errval_t assigned; 345 struct local_event *ev; 346 347 struct bulk_pool *pool = lev->params.pool_assigned.pool; 348 struct bulk_channel *chan = lev->channel; 349 struct local_channel *l = chan->impl_data; 350 351 if (bulk_pool_is_assigned(pool, chan)) { 352 /* channel is already assigned */ 353 EVENT_DEBUG("pool [%p] is already assigned to channel.", pool); 354 err = event_alloc(l->other, &ev, event_pool_assigned, 0); 355 if (!err_is_fail(err)) { 356 ev->params.pool_assigned.err = BULK_TRANSFER_POOL_ALREADY_ASSIGNED; 357 ev->cont = lev->cont; 358 event_enqueue(ev); 359 } 360 return; 361 } 362 363 /* allocate the structures for the pool */ 364 err = bulk_pool_alloc_with_id(&pool, pool->num_buffers, pool->buffer_size, 365 pool->id); 366 if (err_is_fail(err)) { 367 USER_PANIC_ERR(err, "Failed to allocate pool struct\n"); 368 return; 369 } 370 371 /* 372 * prepare the cap 373 */ 374 if (lev->params.pool_assigned.pool->trust == BULK_TRUST_FULL) { 375 err = slot_alloc(&pool->pool_cap); 376 if (err_is_fail(err)) { 377 EVENT_DEBUG("could not allocate a new slot for the cap: %s", 378 err_getstring(err)); 379 assigned = err; 380 goto done; 381 } 382 383 err = cap_copy(pool->pool_cap, 384 lev->params.pool_assigned.pool->pool_cap); 385 if (err_is_fail(err)) { 386 EVENT_DEBUG("could not allocate a new slot for the cap: %s", 387 err_getstring(err)); 388 assigned = err; 389 goto done; 390 } 391 } 392 393 pool->trust = lev->params.pool_assigned.pool->trust; 394 395 /* 396 * XXX: we set the trust level to none here to avoid the creation of 397 * the buffer caps. These have already been created and cannot 398 * be created a second time. [SYS_ERR_REVOKE_FIRST] 399 */ 400 401 err = bulk_pool_map(pool); 402 if (err_is_fail(err)) { 403 assigned = err; 404 goto done; 405 } 406 407 assert(lev->channel->callbacks->pool_assigned); 408 if (lev->channel->callbacks->pool_assigned) { 409 assigned = lev->channel->callbacks->pool_assigned(lev->channel, pool); 410 } else { 411 /* XXX: or if no cb set, just say SYS_ERR_OK ? */ 412 assigned = BULK_TRANSFER_NO_CALLBACK; 413 } 414 done: 415 416 if (err_is_fail(assigned)) { 417 bulk_pool_unmap(pool); 418 bulk_pool_dealloc(pool); 419 } else { 420 err = bulk_pool_assign(pool, lev->channel); 421 if (err_is_fail(err)) { 422 USER_PANIC_ERR(err, "Failed to assign the pool to the channel\n"); 423 } 424 } 425 426 err = event_alloc(l->other, &ev, event_pool_assigned, 0); 427 if (!err_is_fail(err)) { 428 ev->params.pool_assigned.err = assigned; 429 ev->params.pool_assigned.pool = lev->params.pool_assigned.pool; 430 ev->cont = lev->cont; 431 event_enqueue(ev); 432 } 433 434} 435 436/** 437 * Implementation specific handler for pool assing requests 438 * 439 * Side: Assigning Side 440 */ 441static errval_t impl_assign_pool(struct bulk_channel *channel, 442 struct bulk_pool *pool, 443 struct bulk_continuation cont) 444{ 445 errval_t err; 446 struct local_event *ev; 447 struct local_channel *l = channel->impl_data; 448 449 /* allocate and trigger the event */ 450 err = event_alloc(l->other, &ev, event_pool_assign, 0); 451 if (!err_is_fail(err)) { 452 ev->params.pool_assigned.pool = pool; 453 ev->cont = cont; 454 event_enqueue(ev); 455 } 456 return err; 457} 458 459/* -------------------------- event buffer move ---------------------------- */ 460 461/** 462 * Gets called when a buffer arrives via move operation 463 * 464 * Side: Receiving Side (SINK) 465 */ 466static void event_move_received(struct local_event *lev) 467{ 468 errval_t err; 469 470 struct bulk_pool *pool = bulk_pool_get(&lev->params.move_received.pool_id, 471 lev->channel); 472 size_t bufid = lev->params.copy_released.buf_id; 473 474 EVENT_DEBUG(" > pool=%p, bufid=%x", pool, (unsigned int )bufid); 475 assert(pool); 476 477 struct bulk_buffer *buf = pool->buffers[bufid]; 478 479 err = bulk_buffer_change_state(buf, BULK_BUFFER_READ_WRITE); 480 if (err_is_fail(err)) { 481 USER_PANIC_ERR(err, "could not change the state of the buffer."); 482 } 483 484 if (lev->channel->callbacks->move_received) { 485 lev->channel->callbacks->move_received(lev->channel, buf, 486 lev->params.move_received.meta); 487 } 488} 489 490/** 491 * Implementation specific handler of the buffer move operation 492 * 493 * Side: Sending Side (SOURCE) 494 */ 495static errval_t impl_move(struct bulk_channel *channel, 496 struct bulk_buffer *buffer, 497 void *meta, 498 struct bulk_continuation cont) 499{ 500 errval_t err; 501 struct local_event *ev, *ev2; 502 503 struct local_channel *l = channel->impl_data; 504 505 void *m; 506 507 IMPL_DEBUG(" > buffer=%p", buffer->address); 508 509 /* trigger event to other channel */ 510 size_t meta_size = 0; 511 if (meta) { 512 meta_size = channel->meta_size; 513 } 514 err = event_alloc(l->other, &ev, event_move_received, meta_size); 515 if (!err_is_fail(err)) { 516 ev->params.move_received.meta = NULL; 517 if (meta) { 518 /* copy the meta data */ 519 m = ev + 1; 520 memcpy(m, meta, channel->meta_size); 521 ev->params.move_received.meta = m; 522 } 523 /* set parameters of the event */ 524 ev->params.move_received.pool_id = buffer->pool->id; 525 ev->params.move_received.buf_id = ((lvaddr_t) buffer->address 526 - buffer->pool->base_address) 527 / buffer->pool->buffer_size; 528 ev->params.move_received.buffer = buffer; 529 event_enqueue(ev); 530 } 531 532 /* trigger operation done event to this channel */ 533 err = event_alloc(channel, &ev2, event_op_done, 0); 534 if (!err_is_fail(err)) { 535 ev2->cont = cont; 536 event_op_done(ev2); 537 } 538 539 return err; 540} 541 542/* -------------------------- event buffer pass ---------------------------- */ 543 544/** 545 * Gets called when a buffer pass event occurs on the sending side 546 * 547 * Side: Sending Side (SOURCE) 548 */ 549static void event_buffer_received(struct local_event *lev) 550{ 551 errval_t err; 552 553 struct bulk_pool *pool = bulk_pool_get(&lev->params.buffer_received.pool_id, 554 lev->channel); 555 assert(pool); 556 557 size_t bufid = lev->params.buffer_received.buf_id; 558 struct bulk_buffer *buf = pool->buffers[bufid]; 559 560 assert(bufid < pool->num_buffers); 561 562 EVENT_DEBUG(" > buffer=[%p], bufid=0x%x", buf, (unsigned int )bufid); 563 564 /* we need to change the state of the buffer */ 565 err = bulk_buffer_change_state(buf, BULK_BUFFER_READ_WRITE); 566 if (err_is_fail(err)) { 567 /* TODO: error handling */ 568 USER_PANIC_ERR(err, "could not change the state of the buffer."); 569 } 570 571 /* inform the application */ 572 if (lev->channel->callbacks->buffer_received) { 573 lev->channel->callbacks->buffer_received( 574 lev->channel, buf, lev->params.buffer_received.meta); 575 } 576} 577 578/** 579 * Backend specific handler for buffer pass operations 580 * 581 * Side: Receiving Side (SINK) 582 */ 583static errval_t impl_pass(struct bulk_channel *channel, 584 struct bulk_buffer *buffer, 585 void *meta, 586 struct bulk_continuation cont) 587{ 588 errval_t err; 589 struct local_event *ev, *ev2; 590 void *m; 591 struct local_channel *l = channel->impl_data; 592 593 IMPL_DEBUG(" > buffer=%p", buffer->address); 594 595 size_t meta_size = 0; 596 if (meta) { 597 meta_size = channel->meta_size; 598 } 599 /* allocate and trigger event */ 600 err = event_alloc(l->other, &ev, event_buffer_received, meta_size); 601 if (!err_is_fail(err)) { 602 ev->params.buffer_received.meta = NULL; 603 604 if (meta) { 605 /* copy meta data */ 606 m = ev + 1; 607 memcpy(m, meta, channel->meta_size); 608 ev->params.buffer_received.meta = m; 609 610 } 611 /* set event params */ 612 ev->params.buffer_received.pool_id = buffer->pool->id; 613 ev->params.buffer_received.buf_id = ((lvaddr_t) buffer->address 614 - buffer->pool->base_address) 615 / buffer->pool->buffer_size; 616 ev->params.buffer_received.buffer = buffer; 617 event_enqueue(ev); 618 } 619 620 /* trigger op done event */ 621 err = event_alloc(channel, &ev2, event_op_done, 0); 622 if (!err_is_fail(err)) { 623 ev2->cont = cont; 624 event_op_done(ev2); 625 } 626 return err; 627} 628 629/* -------------------------- event buffer copy ---------------------------- */ 630 631static void event_copy_received(struct local_event *lev) 632{ 633 struct bulk_pool *pool = bulk_pool_get(&lev->params.copy_received.pool_id, 634 lev->channel); 635 size_t bufid = lev->params.copy_released.buf_id; 636 EVENT_DEBUG(" > pool=%p, bufid=%x", pool, (unsigned int )bufid); 637 assert(pool); 638 639 struct bulk_buffer *buf = pool->buffers[bufid]; 640 641 errval_t err = bulk_buffer_change_state(buf, BULK_BUFFER_READ_ONLY); 642 if (err_is_fail(err)) { 643 /* TODO: error handling */ 644 USER_PANIC_ERR(err, "failed to change the state"); 645 } 646 if (lev->channel->callbacks->copy_received) { 647 lev->channel->callbacks->copy_received(lev->channel, buf, 648 lev->params.copy_received.meta); 649 650 } 651} 652 653static errval_t impl_copy(struct bulk_channel *channel, 654 struct bulk_buffer *buffer, 655 void *meta, 656 struct bulk_continuation cont) 657{ 658 IMPL_DEBUG(" > buffer=%p", buffer->address); 659 struct local_channel *l = channel->impl_data; 660 struct local_event *ev, *ev2; 661 void *m; 662 errval_t err; 663 size_t meta_size = 0; 664 if (meta) { 665 meta_size = channel->meta_size; 666 } 667 err = event_alloc(l->other, &ev, event_copy_received, meta_size); 668 if (!err_is_fail(err)) { 669 ev->params.copy_received.meta = NULL; 670 if (meta) { 671 m = ev + 1; 672 memcpy(m, meta, channel->meta_size); 673 ev->params.copy_received.meta = m; 674 } 675 ev->params.copy_received.buffer = buffer; 676 ev->params.copy_received.pool_id = buffer->pool->id; 677 ev->params.copy_received.buf_id = ((lvaddr_t) buffer->address 678 - buffer->pool->base_address) 679 / buffer->pool->buffer_size; 680 event_enqueue(ev); 681 } 682 683 /* trigger op done event */ 684 err = event_alloc(channel, &ev2, event_op_done, 0); 685 if (!err_is_fail(err)) { 686 ev2->cont = cont; 687 event_op_done(ev2); 688 } 689 return err; 690 691} 692 693/* -------------------------- event copy release --------------------------- */ 694 695/** 696 * Gets called when a copy release event occurred 697 * 698 * Side: Sending Side (SOURCE) 699 */ 700static void event_copy_released(struct local_event *lev) 701{ 702 errval_t err; 703 704 struct bulk_pool *pool = bulk_pool_get(&lev->params.copy_released.pool_id, 705 lev->channel); 706 assert(pool); 707 708 size_t bufid = lev->params.copy_released.buf_id; 709 struct bulk_buffer *buf = pool->buffers[bufid]; 710 711 buf->local_ref_count--; 712 713 EVENT_DEBUG(" > buffer=[%p], bufid=0x%x", buf, (unsigned int )bufid); 714 715 /* change the state of the buffer */ 716 if (buf->state == BULK_BUFFER_RO_OWNED && bulk_buffer_can_release(buf)) { 717 err = bulk_buffer_change_state(buf, BULK_BUFFER_READ_WRITE); 718 if (err_is_fail(err)) { 719 USER_PANIC_ERR(err, "failed to change the state"); 720 } 721 } 722 723 /* inform the application */ 724 if (lev->channel->callbacks->copy_released) { 725 lev->channel->callbacks->copy_released(lev->channel, buf); 726 } 727} 728 729/** 730 * 731 */ 732static errval_t impl_release(struct bulk_channel *channel, 733 struct bulk_buffer *buffer, 734 struct bulk_continuation cont) 735{ 736 struct local_event *ev, *ev2; 737 errval_t err; 738 struct local_channel *l = channel->impl_data; 739 740 IMPL_DEBUG(" > buffer=%p", buffer->address); 741 742 /* allocate and trigger event */ 743 err = event_alloc(l->other, &ev, event_copy_released, 0); 744 if (!err_is_fail(err)) { 745 ev->params.copy_released.buffer = buffer; 746 ev->params.copy_released.pool_id = buffer->pool->id; 747 ev->params.copy_released.buf_id = ((lvaddr_t) buffer->address 748 - buffer->pool->base_address) 749 / buffer->pool->buffer_size; 750 751 event_enqueue(ev); 752 } 753 754 /* trigger op done event */ 755 err = event_alloc(channel, &ev2, event_op_done, 0); 756 if (!err_is_fail(err)) { 757 ev2->cont = cont; 758 event_op_done(ev2); 759 } 760 return err; 761} 762 763/* -------------------------- channel creation ----------------------------- */ 764 765/** 766 * Implementation specific handler for channel creation 767 */ 768static errval_t impl_create(struct bulk_channel *channel) 769{ 770 return init_channel(channel); 771} 772 773/** 774 * initializes the local endpoint 775 */ 776void bulk_local_init_endpoint(struct bulk_local_endpoint *endpoint, 777 struct bulk_channel *other_channel) 778{ 779 endpoint->generic.f = &implementation; 780 endpoint->other_channel = other_channel; 781} 782 783