1/** 2 * \file 3 * \brief producer consumer library 4 * 5 * This file provides a producer consumer protocol 6 */ 7 8/* 9 * Copyright (c) 2007-12 ETH Zurich. 10 * All rights reserved. 11 * 12 * This file is distributed under the terms in the attached LICENSE file. 13 * If you do not find this file, copies can be found by writing to: 14 * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group. 15 */ 16#include <stdio.h> 17#include <string.h> 18#include <barrelfish/barrelfish.h> 19#include <barrelfish/bulk_transfer.h> 20#include <procon/procon.h> 21 22 23 24// Hack for profiling to see how much mfence slows the code down 25// should probably not be disabled. 26#define DISABLE_MFENCE 1 27 28#if 0 29static uint64_t sp_atomic_read_reg(union vreg *reg) 30{ 31 return reg->value; 32#ifndef DISABLE_MFENCE 33 mfence(); 34#endif 35} // end function: sp_atomic_read_reg 36#endif // 0 37 38static void sp_atomic_set_reg(union vreg *reg, uint64_t value) 39{ 40 reg->value = value; 41#ifndef DISABLE_MFENCE 42 mfence(); 43#endif 44/* 45#if !defined(__i386__) 46 cache_flush_range(reg, CACHESIZE); 47#endif // !defined(__i386__) 48*/ 49} 50// 3 mfence 51void sp_reload_regs(struct shared_pool_private *spp) 52{ 53 assert(spp != NULL); 54 struct shared_pool *sp = spp->sp; 55 assert(sp != NULL); 56 spp->c_read_id = spp->sp->read_reg.value; 57 spp->c_write_id = spp->sp->write_reg.value; 58 spp->c_size = spp->sp->size_reg.value; 59// spp->c_read_id = sp_atomic_read_reg(&spp->sp->read_reg); 60// spp->c_write_id = sp_atomic_read_reg(&spp->sp->write_reg); 61// spp->c_size = sp_atomic_read_reg(&spp->sp->size_reg); 62} 63 64 65 66// **************************** generic queue based code 67bool sp_gen_queue_empty(uint64_t read, uint64_t write) 68{ 69 return (read == write); 70} 71 72bool sp_gen_queue_full(uint64_t read, uint64_t write, uint64_t size) 73{ 74 return (((write + 1) % size ) == read); 75} 76 77uint64_t sp_c_range_size(uint64_t start, uint64_t end, uint64_t size) 78{ 79 80 // simple, non-wrapped space 81 if (start <= end) { 82 return (end - start); 83 } 84 85 // wrapped queue, so more complicated! 86 return ((size - start) + end); 87} 88 89 90// checks for (start <= value < end ) in circular queue of size "size" 91bool sp_c_between(uint64_t start, uint64_t value, uint64_t end, uint64_t size) 92{ 93 94 // sanity check: value must be less than size 95 if (value >= size) { 96 return false; 97 } 98 99 // Logical queue empty state 100 if (start == end) { 101 if (start == value) { 102 return true; 103 } 104 return false; 105 } 106 107 // simple, non-wrapped space 108 if (start < end) { 109 if ((start <= value) && (value < end)) { 110 return true; 111 } 112 return false; 113 } 114 115 // wrapped space, more complicated 116 if ((value < end) || (start <= value)) { 117 return true; 118 } 119 return false; 120} 121 122// ******************* spp queue code for condition checking 123 124// 4 mfence 125uint64_t sp_get_read_index(struct shared_pool_private *spp) 126{ 127 sp_reload_regs(spp); 128 return spp->c_read_id; 129} 130 131uint64_t sp_get_write_index(struct shared_pool_private *spp) 132{ 133 sp_reload_regs(spp); 134 return spp->c_write_id; 135} 136 137uint64_t sp_get_queue_size(struct shared_pool_private *spp) 138{ 139 sp_reload_regs(spp); 140 return spp->c_size; 141} 142 143 144// 0 mfence 145// Checks for queue empty condition 146bool sp_queue_empty(struct shared_pool_private *spp) 147{ 148// sp_reload_regs(spp); 149 return sp_gen_queue_empty(spp->c_read_id, spp->c_write_id); 150} 151 152 153// Check for queue full condition 154bool sp_queue_full(struct shared_pool_private *spp) 155{ 156 return sp_gen_queue_full(spp->c_read_id, spp->c_write_id, 157 spp->c_size); 158} 159 160 161// Checks if given index is peekable or not 162bool sp_read_peekable_index(struct shared_pool_private *spp, uint64_t idx) 163{ 164 sp_reload_regs(spp); 165 return sp_c_between(spp->c_read_id, idx, spp->c_write_id, spp->c_size); 166} // end function: sp_read_peekable_index 167 168 169// Checks if given index is settable for not for read_reg 170bool sp_validate_read_index(struct shared_pool_private *spp, uint64_t idx) 171{ 172 sp_reload_regs(spp); 173 // Since sp_c_between only checks for value < end and we want <= end, we 174 // check this case manually here 175 if (idx == spp->c_write_id) { 176 return true; 177 } 178 return sp_c_between(spp->c_read_id, idx, spp->c_write_id, spp->c_size); 179} 180 181 182// Returns no. of elements available for consumption 183uint64_t sp_queue_elements_count(struct shared_pool_private *spp) 184{ 185// sp_reload_regs(spp); 186 return sp_c_range_size(spp->c_read_id, spp->c_write_id, spp->c_size); 187} // end function: sp_queue_elements_count 188 189// Checks if given index is write peekable or not 190bool sp_write_peekable_index(struct shared_pool_private *spp, uint64_t idx) 191{ 192 sp_reload_regs(spp); 193 194 // Trivial case: index bigger than queue size 195 if (idx >= spp->c_size){ 196 return false; 197 } 198 199 // Trivial case: queue empty 200 if (sp_queue_empty(spp)) { 201 return true; 202 } 203 204 return sp_c_between(spp->c_write_id, idx, spp->c_read_id, spp->c_size); 205} // end function: sp_write_peekable_index 206 207 208// Checks if given index is valid for write or not 209bool sp_validate_write_index(struct shared_pool_private *spp, uint64_t idx) 210{ 211 return sp_write_peekable_index(spp, idx); 212} // end function: sp_validate_write_index 213 214// 4 mfence 215// Returns no. of free slots available for production 216uint64_t sp_queue_free_slots_count(struct shared_pool_private *spp) 217{ 218 sp_reload_regs(spp); 219 if (sp_queue_empty(spp)) { 220 return spp->c_size; 221 } 222 return sp_c_range_size(spp->c_write_id, spp->c_read_id, spp->c_size); 223} // end function: sp_queue_free_slots_count 224 225 226// ************* Initialization functions *********************** 227 228static size_t calculate_shared_pool_size(uint64_t slot_no) 229{ 230 return (sizeof(struct shared_pool) + 231 ((sizeof(union slot)) * (slot_no - TMP_SLOTS))); 232} 233 234// 4 mfence 235static void sp_reset_pool(struct shared_pool_private *spp, uint64_t slot_count) 236{ 237 assert(spp != NULL); 238 struct shared_pool *sp = spp->sp; 239 assert(sp != NULL); 240 assert(slot_count > TMP_SLOTS); 241 242 int i = 0; 243 244 // Esure that slot_count is <= alloted_slots 245 assert(slot_count <= spp->alloted_slots); 246 247 sp_atomic_set_reg(&sp->read_reg, 0); 248 sp_atomic_set_reg(&sp->write_reg, 0); 249 sp_atomic_set_reg(&sp->size_reg, slot_count); 250 for(i = 0; i < slot_count; ++i) { 251 memset(&sp->slot_list[i], 0, sizeof(union slot)); 252 } // end for: 253 254 sp_reload_regs(spp); 255 spp->notify_other_side = 0; 256 spp->ghost_read_id = spp->c_read_id; 257 spp->ghost_write_id = spp->c_write_id; 258 spp->pre_write_id = spp->c_read_id; 259 spp->produce_counter = 0; 260 spp->consume_counter = 0; 261 spp->clear_counter = 0; 262#ifndef DISABLE_MFENCE 263 mfence(); 264#endif 265} // sp_reset_pool 266 267 268// Creates a new shared_pool area and initializes it as creator 269struct shared_pool_private *sp_create_shared_pool(uint64_t slot_no, 270 uint8_t role) 271{ 272 273 struct shared_pool_private *spp = (struct shared_pool_private *) 274 malloc(sizeof(struct shared_pool_private)); 275 assert(spp != NULL); 276 277 errval_t err; 278 assert(slot_no > 2); 279 280 // adding 1 more slot for safety 281 size_t mem_size = calculate_shared_pool_size((slot_no)); 282 283 // NOTE: using bulk create here because bulk_create code has 284 // been modified to suit the shared buffer allocation 285 // FIXME: code repetation with mem_barrelfish_alloc_and_register 286 struct bulk_transfer bt_sp; 287 err = bulk_create(mem_size, sizeof(union slot), &(spp->cap), &bt_sp); 288 if (err_is_fail(err)) { 289 DEBUG_ERR(err, "bulk_create failed."); 290 return NULL; 291 } 292 spp->va = bt_sp.mem; 293 spp->sp = (struct shared_pool *)spp->va; 294 295 struct frame_identity f; 296 297 err = frame_identify(spp->cap, &f); 298 if (err_is_fail(err)) { 299 DEBUG_ERR(err, "frame_identify failed"); 300 return NULL; 301 } 302 spp->pa = f.base; 303 spp->mem_size = f.bytes; 304 spp->alloted_slots = slot_no; 305 spp->is_creator = true; 306 spp->role = role; 307 308 sp_reset_pool(spp, slot_no); 309 printf("Created shared_pool of size(Req %"PRIu64", Actual %"PRIu64") " 310 "with role [%"PRIu8"] and slots [%"PRIu64"]\n", 311 (uint64_t)mem_size, spp->mem_size, spp->role, 312 spp->alloted_slots); 313 314/* printf("##### procon sizeof spp[%lu], sizeof sp[%lu]\n", 315 sizeof(struct shared_pool_private), 316 sizeof(struct shared_pool) ); 317*/ 318#ifndef DISABLE_MFENCE 319 mfence(); 320#endif 321 return spp; 322} // end function: sp_create_shared_pool 323 324 325// Loads shared_pool area which is already created by some other creator 326errval_t sp_map_shared_pool(struct shared_pool_private *spp, struct capref cap, 327 uint64_t slot_no, uint8_t role) 328{ 329 errval_t err = SYS_ERR_OK; 330 assert(spp != NULL); 331 assert(spp->sp == NULL); 332 assert(slot_no > 2); 333 spp->cap = cap; 334 spp->alloted_slots = slot_no; 335 spp->role = role; 336 spp->is_creator = 0; 337 338 struct frame_identity f; 339 340 err = frame_identify(cap, &f); 341 if (err_is_fail(err)) { 342 DEBUG_ERR(err, "frame_identify failed"); 343 return err; 344 } 345 spp->pa = f.base; 346 spp->mem_size = f.bytes; 347 size_t mem_size = calculate_shared_pool_size(slot_no); 348 349 assert(spp->mem_size >= mem_size); 350 351 err = vspace_map_one_frame_attr(&spp->va, f.bytes, cap, 352 VREGION_FLAGS_READ_WRITE_NOCACHE, NULL, NULL); 353 354 if (err_is_fail(err)) { 355 DEBUG_ERR(err, "vspace_map_one_frame failed"); 356 return err; 357 } 358 359 spp->sp = (struct shared_pool *)spp->va; 360 361 sp_reload_regs(spp); 362 assert(spp->c_size == spp->alloted_slots); 363 364 spp->ghost_read_id = spp->c_read_id; 365 spp->ghost_write_id = spp->c_write_id; 366 spp->pre_write_id = spp->c_read_id; 367 spp->notify_other_side = 0; 368 spp->produce_counter = 0; 369 spp->consume_counter = 0; 370 spp->clear_counter = 0; 371 372 printf("Mapped shared_pool of size(R %"PRIu64", A %"PRIu64") " 373 "with role [%"PRIu8"], slots[%"PRIu64"] and pool len[%"PRIu64"]\n", 374 (uint64_t)mem_size, spp->mem_size, spp->role, spp->alloted_slots, 375 spp->c_size); 376 377#ifndef DISABLE_MFENCE 378 mfence(); 379#endif 380 return SYS_ERR_OK; 381 382} // end function: sp_map_shared_pool 383 384 385// *************************** State modifying functions ************* 386static bool validate_slot(struct slot_data *d) 387{ 388 if (d == NULL) { 389 return false; 390 } 391 392 // FIXME: check if the buffer_id, pbuff_id, len and all are sensible! 393 return true; 394} // end function: validate_slot 395 396void copy_data_into_slot(struct shared_pool_private *spp, uint64_t buf_id, 397 uint64_t id, uint64_t offset, uint64_t len, uint64_t no_pbufs, 398 uint64_t client_data, uint64_t ts) 399{ 400 assert(id < spp->c_size); 401 spp->sp->slot_list[id].d.buffer_id = buf_id; 402 spp->sp->slot_list[id].d.no_pbufs = no_pbufs; 403 spp->sp->slot_list[id].d.pbuf_id = id; 404 spp->sp->slot_list[id].d.offset = offset; 405 spp->sp->slot_list[id].d.len = len; 406 spp->sp->slot_list[id].d.client_data = client_data; 407 spp->sp->slot_list[id].d.ts = ts; 408 409#ifndef DISABLE_MFENCE 410 mfence(); 411#endif 412 // copy the s into shared_pool 413#if 0 414#if !defined(__i386__) 415 cache_flush_range(&spp->sp->slot_list[id], SLOT_SIZE); 416#endif // !defined(__i386__) 417#endif // 0 418} 419 420void sp_copy_slot_data(struct slot_data *d, struct slot_data *s) 421{ 422 assert(d != NULL); 423 assert(s != NULL); 424 *d = *s; 425 /* 426 d->buffer_id = s->buffer_id; 427 d->pbuf_id = s->pbuf_id; 428 d->offset = s->offset; 429 d->len = s->len; 430 d->no_pbufs = s->no_pbufs; 431 d->client_data = s->client_data; 432 d->ts = s->ts; 433#ifndef DISABLE_MFENCE 434 mfence(); 435#endif 436*/ 437} 438 439void sp_copy_slot_data_from_index(struct shared_pool_private *spp, 440 uint64_t idx, struct slot_data *d) 441{ 442 sp_copy_slot_data(d, &spp->sp->slot_list[idx].d); 443} // end function: sp_copy_slot_data_index 444 445 446// Set the value of read index 447// To be used with sp_read_peek_slot 448bool sp_set_read_index(struct shared_pool_private *spp, uint64_t idx) 449{ 450 451 sp_reload_regs(spp); 452 // Trivial case: 453 if (spp->c_read_id == idx) { 454 return true; 455 } 456 457 if (!sp_validate_read_index(spp, idx)) { 458 // The value in index is invalid! 459 return false; 460 } 461 462 if (sp_queue_full(spp)) { 463 // Producer is assuming that there is no free space in this pool. 464 // As we have created some free space by reading, we should inform 465 // the producer to produce more! 466 // Typically means, I am slow! 467 ++spp->notify_other_side; 468 } 469 470 sp_atomic_set_reg(&spp->sp->read_reg, idx); 471 sp_reload_regs(spp); 472 473// spp->ghost_read_id = spp->c_read_id; 474// printf("changing read_index!\n"); 475 if (sp_queue_empty(spp)) { 476 // There is nothing more to consume, 477 // We should inform producer to produce quickly 478 // Typically means, Producer is slow! 479 ++spp->notify_other_side; 480 } 481 482 ++spp->consume_counter; 483 return true; 484} // end function: sp_set_read_index 485 486 487// 9 mfence 488// Set the value of write index 489// To be used with sp_ghost_produce_slot 490bool sp_set_write_index(struct shared_pool_private *spp, uint64_t idx) 491{ 492 sp_reload_regs(spp); 493 494 // Trivial case: 495 if (spp->c_write_id == idx) { 496 return true; 497 } 498 499 if (!sp_validate_write_index(spp, idx)) { 500 // The value in index is invalid! 501 return false; 502 } 503 504 if (sp_queue_empty(spp)) { 505 // Consumer is assuming that there is no data in the pool 506 // As we have created new data, we should inform 507 // the consumer to consume more! 508 // Typically means, I am slow! 509 ++spp->notify_other_side; 510 } 511 512 sp_atomic_set_reg(&spp->sp->write_reg, idx); 513 sp_reload_regs(spp); 514// spp->ghost_write_id = spp->c_write_id; 515 if (sp_queue_elements_count(spp) <= 1) { 516 ++spp->notify_other_side; 517 } 518 519 if (sp_queue_full(spp)) { 520 // There no free space left to create new items. 521 // We should inform the consumer that it is slow! 522 // Typically means, consumer is slow! 523 ++spp->notify_other_side; 524 } 525 526 ++spp->produce_counter; 527 return true; 528} // end function: sp_set_write_index 529 530bool sp_increment_write_index(struct shared_pool_private *spp) 531{ 532 sp_reload_regs(spp); 533 uint64_t idx = ((spp->c_write_id + 1) % spp->c_size); 534 535 if (sp_queue_empty(spp)) { 536 // Consumer is assuming that there is no data in the pool 537 // As we have created new data, we should inform 538 // the consumer to consume more! 539 // Typically means, I am slow! 540 ++spp->notify_other_side; 541 } 542 543 544 sp_atomic_set_reg(&spp->sp->write_reg, idx); 545 spp->c_write_id = idx; 546 547 if (sp_queue_full(spp)) { 548 // There no free space left to create new items. 549 // We should inform the consumer that it is slow! 550 // Typically means, consumer is slow! 551 ++spp->notify_other_side; 552 } 553 554 ++spp->produce_counter; 555 return true; 556} // end function: sp_increment_write_index 557 558 559uint64_t sp_is_slot_clear(struct shared_pool_private *spp, uint64_t id) 560{ 561 sp_reload_regs(spp); 562 if (!sp_queue_empty(spp)) { 563 if (!sp_c_between(spp->c_write_id, id, spp->c_read_id, spp->c_size)) { 564 sp_print_metadata(spp); 565 printf("failed for id %"PRIu64"\n", id); 566/* 567 printf("callstack: %p %p %p %p\n", 568 __builtin_return_address(0), 569 __builtin_return_address(1), 570 __builtin_return_address(2), 571 __builtin_return_address(3)); 572*/ 573 } 574 if (!sp_c_between(spp->c_write_id, id, spp->c_read_id, spp->c_size)) { 575 printf("sp_is_slot_clear failed: " 576 " (%"PRIu64", %"PRIu64", %"PRIu64") S %"PRIu64"\n", 577 spp->c_write_id, id, spp->c_read_id, spp->c_size); 578// abort(); 579 } 580 581 } 582 /* 583 else { 584 // queue empty! 585 if (id == spp->c_write_id) { 586 sp_print_metadata(spp); 587 printf("failed for id %"PRIu64"\n", id); 588 printf("callstack: %p %p %p %p\n", 589 __builtin_return_address(0), 590 __builtin_return_address(1), 591 __builtin_return_address(2), 592 __builtin_return_address(3)); 593 } 594 assert(id != spp->c_write_id); 595 } 596 */ 597 return spp->sp->slot_list[id].d.client_data; 598} 599 600bool sp_clear_slot(struct shared_pool_private *spp, struct slot_data *d, 601 uint64_t id) 602{ 603 sp_reload_regs(spp); 604 605 if (sp_queue_full(spp)) { 606 return false; 607 } 608 609 if (sp_queue_empty(spp) || 610 sp_c_between(spp->c_write_id, id, spp->c_read_id, spp->c_size)) { 611 612 sp_copy_slot_data(d, &spp->sp->slot_list[id].d); 613 spp->pre_write_id = id; 614// printf("%s Slot %p with id %"PRIu64" is cleared and had " 615// "%"PRIu64", %"PRIu64"\n", 616// disp_name(), &spp->sp->slot_list[id].d, 617// id, spp->sp->slot_list[id].d.client_data, d->client_data); 618 619 spp->sp->slot_list[id].d.client_data = 0; 620 ++spp->clear_counter; 621 return true; 622 } 623 624 return false; 625} // end function: sp_clear_slot 626 627bool validate_and_empty_produce_slot(struct shared_pool_private *spp, 628 uint64_t produced_slot_id) 629{ 630 sp_reload_regs(spp); 631 632 if (sp_queue_full(spp)) { 633 return false; 634 } 635 636 uint64_t wi = spp->c_write_id; 637 assert(spp->c_write_id == produced_slot_id); 638 // If needed, mark the slot as produced 639 if(!sp_set_write_index(spp, ((wi + 1) % spp->c_size))) { 640 printf("ERROR: validate_and_empty_produce_slot: sp_set_write_index " 641 "failed\n"); 642 abort(); 643 } 644 return true; 645} // end function: validate_and_empty_produce_slot 646 647 648// Adds the data from parameter d into appropriate slot of shared pool queue 649bool sp_produce_slot(struct shared_pool_private *spp, struct slot_data *d) 650{ 651 652 sp_reload_regs(spp); 653 654 if (sp_queue_full(spp)) { 655 return false; 656 } 657 658 uint64_t wi = spp->c_write_id; 659 sp_copy_slot_data(&spp->sp->slot_list[wi].d, d); 660 661#if 0 662#if !defined(__i386__) 663 cache_flush_range(&spp->sp->slot_list[wi], SLOT_SIZE); 664#endif // !defined(__i386__) 665#endif // 0 666 667 // Incrementing write pointer 668 if(!sp_set_write_index(spp, ((wi + 1) % spp->c_size))) { 669 printf("ERROR: sp_produce_slot: sp_set_write_index failed\n"); 670 abort(); 671 } 672 return true; 673} // end function: sp_produce_slot 674 675 676// 9 mfence 677// Gost-add data into shared_pool 678// Add data into free slots, but don't increment write index 679// This allows adding multiple slots and then atomically increment write index 680bool sp_ghost_produce_slot(struct shared_pool_private *spp, 681 struct slot_data *d, uint64_t idx) 682{ 683 sp_reload_regs(spp); 684 685 // Make sure that slot provided is proper 686 assert(d != NULL); 687 688 if (sp_queue_full(spp)) { 689// printf("sp_ghost_produce_slot: queue full\n"); 690 return false; 691 } 692 693 // Check if the requested peak is valid or not 694 if (!sp_write_peekable_index(spp, idx)) 695 { 696 return false; 697 } 698 699 sp_copy_slot_data(&spp->sp->slot_list[idx].d, d); 700#if 0 701#if !defined(__i386__) 702 cache_flush_range(&spp->sp->slot_list[idx], SLOT_SIZE); 703#endif // !defined(__i386__) 704#endif // 0 705 // Incrementing write pointer 706 spp->ghost_write_id = (idx + 1) % spp->c_size; 707 /* 708 printf("ghost produce slot, producing for %"PRIu64", val %"PRIu64"\n", 709 idx, d->client_data); 710 sp_print_slot(&spp->sp->slot_list[idx].d); 711 */ 712 return true; 713} // end function: sp_produce_slot 714 715// Reads the slot without changing the read pointer, instead changes the local 716// ghost_read_id to know how much is read. 717// To bu used by driver when it adds the packet in hardware queue for sending 718// but the packet is not yet sent. 719// when packet is actually done, then read pointer can be changed. 720bool sp_ghost_read_slot(struct shared_pool_private *spp, struct slot_data *dst) 721{ 722 sp_reload_regs(spp); 723 724 // Make sure that slot provided is proper 725 assert(dst != NULL); 726 727 // Make sure that there is slot available for consumption 728 if (sp_queue_empty(spp)) { 729 return false; 730 } 731 732 // Check if the requested peak is valid or not 733 if (!sp_read_peekable_index(spp, spp->ghost_read_id)) 734 { 735 return false; 736 } 737 738 // Copying the slot data contents into provided slot 739/* 740#if !defined(__i386__) 741 cache_flush_range(&spp->sp->slot_list[spp->ghost_read_id], SLOT_SIZE); 742#endif // !defined(__i386__) 743*/ 744 sp_copy_slot_data(dst, &spp->sp->slot_list[spp->ghost_read_id].d); 745/* printf("After copying data from id %"PRIu64"\n", spp->ghost_read_id); 746 sp_print_slot(&spp->sp->slot_list[spp->ghost_read_id].d); 747*/ 748 spp->ghost_read_id = (spp->ghost_read_id + 1) % spp->c_size; 749 return true; 750} // end function: sp_read_peak_slot 751 752 753 754// FIXME: not used, may be it should be removed 755bool sp_ghost_read_confirm(struct shared_pool_private *spp) 756{ 757 return (sp_set_read_index(spp, spp->ghost_read_id)); 758} 759 760// swaps the slot provided in parameter d with next available slot for 761// consumption. 762// TO be used by application to receive packet and register new pbuf 763// at same time. 764bool sp_replace_slot(struct shared_pool_private *spp, struct slot_data *new_slot) 765{ 766 sp_reload_regs(spp); 767 768 // Make sure that slot provided is proper 769 if (!validate_slot(new_slot)) { 770 return false; 771 } 772 773 // Make sure that there is slot available for consumption 774 if (sp_queue_empty(spp)) { 775 return false; 776 } 777 778 uint64_t ri = spp->c_read_id; 779 // swapping the slot_data contents between ri and new_slot 780 struct slot_data tmp; 781#if 0 782#if !defined(__i386__) 783 cache_flush_range(&spp->sp->slot_list[ri], SLOT_SIZE); 784#endif // !defined(__i386__) 785#endif // 0 786 sp_copy_slot_data(&tmp, &spp->sp->slot_list[ri].d); 787 sp_copy_slot_data(&spp->sp->slot_list[ri].d, new_slot); 788 sp_copy_slot_data(new_slot, &tmp); 789#if 0 790#if !defined(__i386__) 791 cache_flush_range(&spp->sp->slot_list[ri], SLOT_SIZE); 792#endif // !defined(__i386__) 793#endif // 0 794 // Incrementing read index 795 if(!sp_set_read_index(spp, ((ri + 1) % spp->c_size))) { 796 printf("sp_set_read_index failed\n"); 797 sp_print_metadata(spp); 798 abort(); 799 } 800 return true; 801} // end function: sp_consume_slot 802 803 804// ****************** For debugging purposes ************** 805void sp_print_metadata(struct shared_pool_private *spp) 806{ 807 assert(spp != NULL); 808// sp_reload_regs(spp); 809/* printf("SPP Q C[%"PRIu8"], R[%"PRIu8"], GRI[%"PRIu64"], GWI[%"PRIu64"] " 810 "pre_write_id[%"PRIu64"]\n", 811 spp->is_creator?1:0, spp->role, 812 spp->ghost_read_id, spp->ghost_write_id, spp->pre_write_id); 813*/ 814 printf("SPP S PRO[%"PRIu64"], CON[%"PRIu64"], CLEAR[%"PRIu64"]\n", 815 spp->produce_counter, spp->consume_counter, spp->clear_counter); 816 printf("SPP S C C-R[%"PRIu64"], C-W[%"PRIu64"] C-S[%"PRIu64"]\n", 817 spp->c_read_id, spp->c_write_id, spp->c_size); 818 819 struct shared_pool *sp = spp->sp; 820 assert(sp != NULL); 821/* 822 printf("SP Q len[%"PRIu64"], RI[%"PRIu64"], WI[%"PRIu64"], elem[%"PRIu64"]" 823 " free[%"PRIu64"]\n", 824 sp->size_reg.value, sp->read_reg.value, sp->write_reg.value, 825 sp_queue_elements_count(spp), 826 sp_queue_free_slots_count(spp)); 827*/ 828} 829 830 831void sp_print_slot(struct slot_data *d) 832{ 833 printf("@%p, buf[%"PRIu64"], pbuf_id[%"PRIu64"], offset[%"PRIu64"], " 834 "len[%"PRIu64"], n_p[%"PRIu64"], CL[%"PRIu64"], ts[%"PRIu64"]\n", 835 d, d->buffer_id, d->pbuf_id, d->offset, d->len, 836 d->no_pbufs, d->client_data, d->ts); 837} 838 839// Code for testing and debugging the library 840void sp_print_pool(struct shared_pool_private *spp) 841{ 842 sp_reload_regs(spp); 843 assert(spp != NULL); 844 struct shared_pool *sp = spp->sp; 845 assert(sp != NULL); 846 847 uint64_t queue_size = sp->size_reg.value; 848 sp_print_metadata(spp); 849 int i = 0; 850 for(i = 0; i < queue_size; ++i) { 851 sp_print_slot(&sp->slot_list[i].d); 852 } 853} 854 855