1#include <stdio.h> 2#include <stdlib.h> 3#include <assert.h> 4#include <string.h> 5#ifndef __linux__ 6#include <barrelfish/barrelfish.h> 7#include <vfs/vfs.h> 8#include <barrelfish/nameservice_client.h> 9#include <barrelfish/bulk_transfer.h> 10#include <if/replay_defs.h> 11#include <errno.h> 12#else 13static const char vfs_cache_str[] = "linux (TCP/IP)"; 14#include <unistd.h> 15#include <stdbool.h> 16#include <sys/types.h> 17#include <sys/socket.h> 18#include <sys/time.h> 19#include <netinet/in.h> 20#include <netinet/ip.h> 21#include <poll.h> 22#include <netdb.h> 23#include <errno.h> 24#include <sched.h> 25#include <inttypes.h> 26#endif 27 28#include "defs.h" 29#include "hash.h" 30 31#define MAX_LINE 1024 32#define MAX_SLAVES 64 33#define MAX_DEPS 60 34#define BULK_BLOCK_SIZE (4096*256) // 1MiB 35#define BULK_BLOCKS_NR 1 36#define BULK_TOTAL_SIZE (BULK_BLOCK_SIZE*BULK_BLOCKS_NR) 37 38/* TRACE LIST */ 39 40struct trace_list { 41 struct trace_entry *head, *tail; 42}; 43 44static inline void trace_add_tail(struct trace_list *tl, struct trace_entry *te) 45{ 46 te->next = NULL; 47 48 if (tl->head == NULL) { 49 tl->head = te; 50 } else { 51 tl->tail->next = te; 52 } 53 54 tl->tail = te; 55} 56 57/* PID ENTRIES (TASKS) */ 58 59struct slave; /* forward reference */ 60struct pid_entry { 61 int pid; 62 struct trace_list trace_l; 63 size_t tentries_nr; 64 /* list of children */ 65 struct pid_entry *children[MAX_DEPS]; 66 size_t children_nr; 67 /* list of parents */ 68 struct pid_entry *parents[MAX_DEPS]; 69 size_t parents_nr; 70 71 /* buffer for trace entries */ 72 replay_eventrec_t *tes; 73 size_t tes_size; /* size of tes buffer in bytes */ 74 75 size_t parents_completed; 76 struct slave *sl; 77 unsigned completed:1; 78}; 79 80/* SLAVES */ 81 82struct slave { 83 //int pid[MAX_PIDS]; 84 struct pid_entry *pe; 85 //struct trace_entry *current_te; 86 //struct qelem *queue, *qend; 87#ifndef __linux__ 88 struct replay_binding *b; 89 /* bulk transfer data */ 90 struct capref frame; 91 struct bulk_transfer bt; 92 uintptr_t current_bid; 93#else 94 int socket; 95 ssize_t sentbytes; 96#endif 97}; 98 99static struct { 100 int num_slaves; 101 int num_finished; 102 struct slave slaves[MAX_SLAVES]; 103} SlState; 104 105//struct qelem { 106// replay_eventrec_t er; 107// struct qelem *next, *prev; 108//}; 109// 110//struct writer { 111// int fnum, pid; 112// struct slave *slave; 113// struct writer *prev, *next; 114//}; 115 116/* 117 * Dependencies: 118 * W writes to a file that R reads 119 * R needs to wait for W to finish 120 * W is parent, R is child 121 * nodes that are ready to execute have no parents 122 */ 123struct task_graph { 124 struct pid_entry *pes; /* array of (all) pid entries */ 125 int pes_nr, pes_size; /* number of pid entries, size of array */ 126 struct pid_entry **stack; /* current stack */ 127 int stack_nr, stack_size; /* stack entries/size */ 128 int pes_completed; /* completed entries */ 129 hash_t *pids_h; /* pid -> pid_entry hash */ 130}; 131static struct task_graph TG; 132 133static struct pid_entry * 134tg_stack_peek(struct task_graph *tg, int i) 135{ 136 assert(i < tg->stack_nr); 137 return tg->stack[tg->stack_size - i]; 138} 139 140static struct pid_entry * 141tg_pop(struct task_graph *tg) 142{ 143 struct pid_entry *ret; 144 if (tg->stack_nr == 0) 145 return NULL; 146 ret = tg->stack[tg->stack_size +1 - tg->stack_nr]; 147 tg->stack_nr--; 148 assert(ret != NULL); 149 return ret; 150} 151 152static void 153tg_push(struct task_graph *tg, struct pid_entry *pe) 154{ 155 assert(tg->stack_nr < tg->stack_size); 156 tg->stack[tg->stack_size - tg->stack_nr] = pe; 157 tg->stack_nr++; 158} 159 160static void 161__build_taskgraph_stack(struct task_graph *tg) 162{ 163 tg->stack_nr = 0; 164 for (int i=0; i<tg->pes_nr; i++) { 165 struct pid_entry *pe = tg->pes + i; 166 if (pe->parents_nr == 0) 167 tg_push(tg, pe); 168 } 169} 170 171static void 172build_taskgraph_stack(struct task_graph *tg) 173{ 174 // build stack, and fill it with ready nodes (i.e., no parent nodes) 175 tg->stack_size = TOTAL_PIDS; 176 tg->stack = calloc(tg->stack_size, sizeof(struct pid_entry *)); 177 assert(tg->stack); 178 __build_taskgraph_stack(tg); 179} 180 181static void __attribute__((unused)) 182tg_reset(struct task_graph *tg) 183{ 184 for (int i=0; i<tg->pes_nr; i++) { 185 tg->pes[i].completed = 0; 186 tg->pes[i].parents_completed = 0; 187 } 188 tg->pes_completed = 0; 189 __build_taskgraph_stack(tg); 190} 191 192static void 193tg_complete(struct task_graph *tg, struct pid_entry *pe) 194{ 195 assert(pe->completed == 0); 196 pe->completed = 1; 197 tg->pes_completed++; 198 dmsg("\tpe: %p (idx:%ld,pid:%d) completed (%d/%d)\n", pe, (pe - tg->pes), pe->pid, tg->pes_completed, tg->pes_nr); 199 for (int i=0; i < pe->children_nr; i++) { 200 struct pid_entry *pe_child = pe->children[i]; 201 if (++pe_child->parents_completed == pe_child->parents_nr) { 202 tg_push(tg, pe_child); 203 } 204 } 205} 206 207 208//static struct writer *writers = NULL; 209#ifndef __linux__ 210static bool bound; /* XXX: make this volatile? No, handler runs on the same context */ 211static bool init_ok; 212static bool print_stats_ok; 213#endif 214 215#ifdef __linux__ 216#endif 217 218#ifndef __linux__ 219//static void event_done(struct replay_binding *b, uint32_t fnum) 220//{ 221// /* if(te->op == TOP_Close) { */ 222// // See if it was a writer and remove 223// 224// printf("writer done for %u\n", fnum); 225// 226// for(struct writer *w = writers; w != NULL; w = w->next) { 227// if(w->fnum == fnum) { 228// assert(w != NULL); 229// if(w != writers) { 230// assert(w != NULL); 231// assert(w->prev != NULL); 232// w->prev->next = w->next; 233// } else { 234// writers = w->next; 235// } 236// free(w); 237// break; 238// } 239// } 240// /* } */ 241//} 242 243static void 244init_reply_handler(struct replay_binding *b) 245{ 246 assert(!init_ok); 247 init_ok = true; 248} 249 250static void 251print_stats_reply_handler(struct replay_binding *b) 252{ 253 assert(!print_stats_ok); 254 print_stats_ok = true; 255} 256static void 257task_completed_handler(struct replay_binding *b, uint16_t pid, uint64_t bulk_id) 258{ 259 struct task_graph *tg = b->st; 260 struct slave *sl; 261 struct pid_entry *pe; 262 263 pe = (void *)hash_lookup(tg->pids_h, pid); 264 assert(pe != (void *)HASH_ENTRY_NOTFOUND); 265 sl = pe->sl; 266 tg_complete(tg, pe); 267 bulk_free(&sl->bt, bulk_id); 268 sl->pe = NULL; 269} 270 271static void 272finish_reply_handler(struct replay_binding *b) 273{ 274 dmsg("ENTER\n"); 275 SlState.num_finished++; 276} 277 278static struct replay_rx_vtbl replay_vtbl = { 279 .task_completed = task_completed_handler, 280 .slave_finish_reply = finish_reply_handler, 281 .slave_init_reply = init_reply_handler, 282 .slave_print_stats_reply = print_stats_reply_handler 283}; 284 285static void replay_bind_cont(void *st, errval_t err, struct replay_binding *b) 286{ 287 static int slavenum = 0; 288 struct slave *sl = &SlState.slaves[slavenum]; 289 slavenum++; 290 291 dmsg("ENTER\n"); 292 //printf("%s:%s MY TASKGRAPH IS %p\n", __FILE__, __FUNCTION__, &TG); 293 294 assert(err_is_ok(err)); 295 sl->b = b; 296 b->rx_vtbl = replay_vtbl; 297 b->st = &TG; 298 bound = true; 299 /* printf("assigned binding to %p\n", sl); */ 300} 301#else 302static inline uint64_t rdtsc(void) 303{ 304 uint32_t eax, edx; 305 __asm volatile ("rdtsc" : "=a" (eax), "=d" (edx)); 306 return ((uint64_t)edx << 32) | eax; 307} 308#endif 309 310//static bool printall = false; 311 312 313static inline void 314pentry_add_dependency(struct pid_entry *parent, struct pid_entry *child) 315{ 316 int idx; 317 for (int i=0; i<parent->children_nr; i++) 318 if (parent->children[i]->pid == child->pid) 319 return; 320 321 //printf("%d -> %d;\n", parent->pid, child->pid); 322 323 idx = parent->children_nr; 324 assert(idx < MAX_DEPS); 325 parent->children[idx] = child; 326 parent->children_nr++; 327 328 idx = child->parents_nr; 329 assert(idx < MAX_DEPS); 330 child->parents[idx] = parent; 331 child->parents_nr++; 332} 333 334//static struct pid_entry *allpids[nOTAL_PIDS]; 335 336static void 337parse_tracefile_line(char *line, int linen, struct trace_entry *te) 338{ 339 size_t fnum, size; 340 char flags[1024]; 341 int fd; 342 unsigned int pid; 343 344 if(sscanf(line, "open %zu %s %d %u", &fnum, flags, &fd, &pid) >= 4) { 345 te->op = TOP_Open; 346 te->fd = fd; 347 te->u.fnum = fnum; 348 } else if(sscanf(line, "close %d %u", &fd, &pid) >= 2) { 349 te->op = TOP_Close; 350 te->fd = fd; 351 } else if(sscanf(line, "read %d %zu %u", &fd, &size, &pid) >= 3) { 352 te->op = TOP_Read; 353 te->fd = fd; 354 te->u.size = size; 355 } else if(sscanf(line, "write %d %zu %u", &fd, &size, &pid) >= 3) { 356 te->op = TOP_Write; 357 te->fd = fd; 358 te->u.size = size; 359 } else if(sscanf(line, "seek %d %zu %u", &fd, &size, &pid) >= 3) { 360 te->op = TOP_Seek; 361 te->fd = fd; 362 te->u.size = size; 363 } else if(sscanf(line, "creat %zu %s %d %u", &fnum, flags, &fd, &pid) >= 4) { 364 te->op = TOP_Create; 365 te->fd = fd; 366 te->u.fnum = fnum; 367 } else if(sscanf(line, "unlink %zu %s %u", &fnum, flags, &pid) >= 3) { 368 te->op = TOP_Unlink; 369 te->u.fnum = fnum; 370 } else if(sscanf(line, "exit %u", &pid) >= 1) { 371 te->op = TOP_Exit; 372 } else { 373 printf("Invalid line %d: %s\n", linen, line); 374 exit(EXIT_FAILURE); 375 } 376 377 // There's always a PID 378 te->pid = pid; 379 assert(pid != 0); 380 te->fline = linen; 381 382 // If we have flags, set them now 383 if(te->op == TOP_Open || te->op == TOP_Create) { 384 if(!strcmp(flags, "rdonly")) { 385 te->mode = FLAGS_RdOnly; 386 } else if(!strcmp(flags, "wronly")) { 387 te->mode = FLAGS_WrOnly; 388 } else if(!strcmp(flags, "rdwr")) { 389 te->mode = FLAGS_RdWr; 390 } else { 391 printf("Invalid open flags: %s\n", flags); 392 exit(EXIT_FAILURE); 393 } 394 } 395} 396 397static void 398parse_tracefile(const char *tracefile, struct trace_list *tlist) 399{ 400 printf("reading tracefile...\n"); 401 402 FILE *f = fopen(tracefile, "r"); 403 assert(f != NULL); 404 int linen = 0; 405 406 while(!feof(f)) { 407 char line[MAX_LINE]; 408 409 if (fgets(line, MAX_LINE, f) == NULL) { 410 break; 411 } 412 413 linen++; 414 if (linen % 1000 == 0) { 415 printf("---- %s:%s() parsing line = %d\n", __FILE__, __FUNCTION__, linen); 416 } 417 418 struct trace_entry *tentry = malloc(sizeof(struct trace_entry)); 419 assert(tentry != NULL); 420 421 // parse current line to tentry 422 parse_tracefile_line(line, linen, tentry); 423 424 // Link it in with the rest of the list (forward order) 425 trace_add_tail(tlist, tentry); 426 } 427 fclose(f); 428 printf("tracefile read [number of lines:%d]\n", linen); 429} 430 431static void 432build_taskgraph(struct trace_list *tl, struct task_graph *tg) 433{ 434 assert(tl->head != NULL && tl->tail != NULL); 435 struct trace_entry *te, *te_next = NULL; 436 hash_t *writers_h = hash_init(TOTAL_PIDS); /* fnum -> pid_entry */ 437 // alloc a sequential array for all pid entries 438 tg->pids_h = hash_init(TOTAL_PIDS); /* pid -> pid_entry */ 439 tg->pes_size = TOTAL_PIDS; 440 tg->pes = calloc(tg->pes_size, sizeof(struct pid_entry)); // we can realloc it later 441 tg->pes_nr = 0; 442 443 for (te = tl->head; te != NULL; te = te_next) { 444 // store next pointer (trace_add_tail will change it) 445 te_next = te->next; 446 447 // if this is a new pid, create a new pid_entry 448 struct pid_entry *pe = (void *)hash_lookup(tg->pids_h, te->pid); 449 if (pe == (void *)HASH_ENTRY_NOTFOUND) { 450 // if a pid's first operation is not Open/Create ignore it 451 if (te->op != TOP_Open && te->op != TOP_Create) { 452 printf("%s() :: IGNORING operation %d for pid %d\n", __FUNCTION__, te->op, te->pid); 453 continue; 454 } 455 assert(tg->pes_nr < tg->pes_size); 456 pe = tg->pes + tg->pes_nr++; 457 pe->pid = te->pid; 458 hash_insert(tg->pids_h, pe->pid, (unsigned long)pe); 459 //printf("%d;\n", pe->pid); 460 } 461 462 // add trace entry into pid list 463 trace_add_tail(&pe->trace_l, te); 464 pe->tentries_nr++; 465 466 /* track dependencies: 467 * - look at open/create operations 468 * - put writers in a hash table, based on the file they write 469 * - for readers, check if a writer exists for the file they read 470 * - avoid cyclic dependencies of RW open/create 471 * Note: For multiple writers we just track the latest open/create */ 472 if (te->op == TOP_Open && te->op == TOP_Create) { 473 size_t fnum = te->u.fnum; 474 struct pid_entry *pe_writer = (void *)hash_lookup(writers_h, fnum); 475 476 if (te->mode != FLAGS_RdOnly) { // writer 477 if (pe_writer != (void *)HASH_ENTRY_NOTFOUND && pe_writer->pid != pe->pid) { 478 //assert(0 && "multiple different writers"); 479 } 480 hash_insert(writers_h, fnum, (unsigned long)pe); 481 pe_writer = pe; 482 } 483 484 if ((te->mode != FLAGS_WrOnly) && // this is a reader 485 (pe_writer != (void *)HASH_ENTRY_NOTFOUND) && // a writer exists 486 (pe_writer->pid != pe->pid)) { // and is not the reader 487 pentry_add_dependency(pe_writer, pe); 488 } 489 } 490 } 491 build_taskgraph_stack(tg); 492 //cleanup and return 493 tl->head = tl->tail = NULL; 494 hash_destroy(writers_h); 495} 496 497static void 498__print_taskgraph(struct pid_entry *root, int level) 499{ 500 if (root == NULL) 501 return; 502 for (int i=0; i<level; i++) 503 printf("\t"); 504 printf("%d (completed:%d)\n", root->pid, root->completed); 505 506 for (int i=0; i<root->children_nr; i++) { 507 __print_taskgraph(root->children[i], level+1); 508 } 509} 510 511static void __attribute__((unused)) 512print_taskgraph(struct task_graph *tg) 513{ 514 for (int i=0; i<tg->stack_nr; i++) { 515 struct pid_entry *pe = tg_stack_peek(tg, i); 516 __print_taskgraph(pe, 0); 517 } 518} 519 520static void 521print_pid_entry(struct pid_entry *pe, int print_ops) 522{ 523 struct trace_entry *te; 524 printf("pid entry (%p) pid:%d children:%zu parents:%zu completed:%d tentries:%zd\n", pe, pe->pid, pe->children_nr, pe->parents_nr, pe->completed, pe->tentries_nr); 525 te = pe->trace_l.head; 526 527 if (!print_ops) { 528 return; 529 } 530 531 do { 532 printf("\t op:%d pid:%d\n", te->op, te->pid); 533 } while ((te = te->next) != NULL); 534 printf("\tEND\n"); 535} 536 537static void __attribute__((unused)) 538print_task(struct task_graph *tg, int pid) 539{ 540 for (int i=0; i<tg->pes_nr; i++) { 541 struct pid_entry *pe = tg->pes + i; 542 if (pe->pid == pid) { 543 print_pid_entry(pe, 0); 544 } 545 } 546} 547 548static void 549mk_replay_event_req(struct trace_entry *te, replay_eventrec_t *req) 550{ 551 req->op = te->op; 552 req->fd = te->fd; 553 req->mode = te->mode; 554 //req->fline = te->fline; 555 req->pid = te->pid; 556 557 switch(te->op) { 558 case TOP_Open: 559 case TOP_Create: 560 case TOP_Unlink: 561 req->fnumsize = te->u.fnum; 562 break; 563 564 case TOP_Read: 565 case TOP_Write: 566 case TOP_Seek: 567 req->fnumsize = te->u.size; 568 break; 569 570 case TOP_Close: 571 case TOP_Exit: 572 break; 573 574 default: 575 assert(0); 576 break; 577 } 578 579 assert(req->pid != 0); 580} 581 582 583static void 584trace_bufs_init(struct task_graph *tg) 585{ 586 for (int i=0; i < tg->pes_nr; i++) { 587 struct pid_entry *pe = tg->pes + i; 588 589 size_t size = pe->tes_size = sizeof(replay_eventrec_t)*pe->tentries_nr; 590 if (size >= BULK_TOTAL_SIZE) { 591 msg("size for pid:%d [%zd] larger than %d\n", pe->pid, size, BULK_TOTAL_SIZE); 592 assert(0); 593 } 594 595 assert(size <= BULK_TOTAL_SIZE); 596 pe->tes = malloc(size); 597 assert(pe->tes); 598 599 struct trace_entry *te = pe->trace_l.head; 600 for (int ti=0; ti < pe->tentries_nr; ti++) { 601 mk_replay_event_req(te, pe->tes + ti); 602 te = te->next; 603 } 604 assert(te == NULL); // make sure the count is right! 605 } 606} 607 608static void __attribute__((unused)) 609print_all_tasks(struct task_graph *tg) 610{ 611 for (int i=0; i<tg->pes_nr; i++) { 612 struct pid_entry *pe = tg->pes + i; 613 print_pid_entry(pe, 0); 614 } 615} 616 617/* functions to be implemented seperately by bfish/linux */ 618static void slaves_connect(struct task_graph *tg); 619static void slave_push_work(struct slave *); 620static void slaves_finalize(void); 621static void slaves_print_stats(void); 622static void master_process_reqs(void); 623cycles_t tscperms; 624 625int main(int argc, char *argv[]) 626{ 627#ifndef __linux__ 628 if(argc < 5) { 629 printf("Usage: %s tracefile nslaves mountdir mount-URL\n", argv[0]); 630 exit(EXIT_FAILURE); 631 } 632 633 assert(err_is_ok(sys_debug_get_tsc_per_ms(&tscperms))); 634 errval_t err = vfs_mkdir(argv[3]); 635 assert(err_is_ok(err)); 636 637 printf("----------------------------------- VFS MOUNT\n"); 638 err = vfs_mount(argv[3], argv[4]); 639 if(err_is_fail(err)) { 640 DEBUG_ERR(err, "vfs_mount"); 641 } 642 printf("----------------------------------- VFS MOUNT DONE\n"); 643 assert(err_is_ok(err)); 644#else 645 if(argc < 3) { 646 printf("Usage: %s tracefile nslaves\n", argv[0]); 647 exit(EXIT_FAILURE); 648 } 649#endif 650 651 memset(&SlState, 0, sizeof(SlState)); 652 //SlState.waitset = get_default_waitset(); 653 //struct waitset ws; 654 //waitset_init(&ws); 655 //SlState.waitset = &ws; 656 657 char *tracefile = argv[1]; 658 SlState.num_slaves = atoi(argv[2]); 659 printf("tracefile=%s\n", tracefile); 660 661 printf("reading dependency graph...\n"); 662 663 // Parse trace file into memory records 664 struct trace_list tlist = {.head = NULL, .tail = NULL}; 665 parse_tracefile(tracefile, &tlist); 666 667 // Build task graph. roots are nodes without dependencies 668 #ifndef __linux__ 669 msg("[MASTER] My cpu is: %d\n", disp_get_core_id()); 670 #endif 671 memset(&TG, 0, sizeof(TG)); 672 build_taskgraph(&tlist, &TG); 673 //print_all_tasks(&TG); 674 //print_taskgraph(&TG); 675 //printf("TG entries:%d completed:%d stack_size:%d\n", TG.pes_nr, TG.pes_completed, TG.stack_nr); 676 677 msg("[MASTER] INITIALIZING BUFFERS...\n"); 678 trace_bufs_init(&TG); 679 680 msg("[MASTER] CONNECTING TO SLAVES...\n"); 681 slaves_connect(&TG); 682 683 msg("[MASTER] STARTING WORK...\n"); 684 uint64_t start_ticks = rdtsc(); 685 for (;;) { 686 /* enqueue work to the slaves */ 687 for (int sid=0; sid < SlState.num_slaves; sid++) { 688 struct slave *sl = SlState.slaves + sid; 689 // try to assign a pid entry to a slave, if it doesn't hove one 690 if (sl->pe == NULL) { 691 sl->pe = tg_pop(&TG); 692 if (sl->pe == NULL) { 693 continue; /* no more tasks in the stack */ 694 } 695 dmsg("[MASTER] assigned pid:%d to sl:%d (stack_nr:%d completed:%d total:%d bytes:%zd)\n", 696 sl->pe->pid, sid, TG.stack_nr, TG.pes_completed, TG.pes_nr, sl->pe->tes_size); 697 sl->pe->sl = sl; 698 slave_push_work(sl); 699 } 700 master_process_reqs(); 701 } 702 703 if (TG.pes_completed == TG.pes_nr) 704 break; 705 } 706 707 uint64_t work_ticks = rdtsc() - start_ticks; 708 slaves_finalize(); 709 uint64_t total_ticks = rdtsc() - start_ticks; 710 printf("[MASTER] replay done> cache:%s slaves:%d ticks:%" PRIu64 711 " (%lf ms) [total: %lfms]\n", 712 vfs_cache_str, SlState.num_slaves, work_ticks, 713 (double)work_ticks /(double)tscperms, 714 (double)total_ticks/(double)tscperms); 715 slaves_print_stats(); 716 return 0; 717} 718 719#ifndef __linux__ 720static void 721master_process_reqs(void) 722{ 723 /* process slave requests */ 724 for (;;){ 725 errval_t ret; 726 ret = event_dispatch(get_default_waitset()); 727 if (ret == LIB_ERR_NO_EVENT) 728 break; 729 assert(err_is_ok(ret)); 730 } 731} 732 733static void 734slaves_finalize(void) 735{ 736 int err; 737 738 /* notify slaves */ 739 for (int sid=0; sid < SlState.num_slaves; sid++) { 740 struct slave *sl = SlState.slaves + sid; 741 do { 742 err = sl->b->tx_vtbl.slave_finish(sl->b, NOP_CONT); 743 } while (err_no(err) == FLOUNDER_ERR_TX_BUSY); 744 assert(err_is_ok(err)); 745 } 746 747 /* wait for their replies */ 748 do { 749 err = event_dispatch(get_default_waitset()); 750 assert(err_is_ok(err)); 751 } while (SlState.num_finished < SlState.num_slaves); 752} 753 754static void 755slaves_print_stats(void) 756{ 757 int err; 758 759 /* have slaves print stats synchronously */ 760 for (int sid=0; sid < SlState.num_slaves; sid++) { 761 struct slave *sl = SlState.slaves + sid; 762 print_stats_ok = false; 763 err = sl->b->tx_vtbl.slave_print_stats(sl->b, NOP_CONT); 764 assert(err_is_ok(err)); 765 while (!print_stats_ok) { 766 err = event_dispatch(get_default_waitset()); 767 assert(err_is_ok(err)); 768 } 769 } 770} 771 772static void 773slave_push_work(struct slave *sl) 774{ 775 int err; 776 struct bulk_buf *bb; 777 uint64_t bulk_id; 778 779 //dmsg("pushing work for slave: %ld (%p) pid:%d (completed:%d)\n", sl-slaves, sl, sl->pe->pid, sl->pe->completed); 780 bb = bulk_alloc(&sl->bt); 781 if (bb == NULL) { 782 return; 783 } 784 bulk_buf_copy(bb, sl->pe->tes, sl->pe->tes_size); 785 bulk_id = bulk_prepare_send(bb); 786 err = sl->b->tx_vtbl.new_task(sl->b, NOP_CONT, bulk_id, sl->pe->tes_size); 787 if (err == FLOUNDER_ERR_TX_BUSY) { 788 bulk_free(&sl->bt, bulk_id); 789 return; 790 } 791 assert(err_is_ok(err)); 792} 793 794static void 795slaves_connect(struct task_graph *tg) 796{ 797 char name[128]; 798 iref_t iref; 799 int err; 800 801 for (int sid=0; sid < SlState.num_slaves; sid++) { 802 int r = snprintf(name, 128, "replay_slave.%u", sid + 1); 803 struct slave *sl = SlState.slaves + sid; 804 assert(r != -1); 805 806 err = nameservice_blocking_lookup(name, &iref); 807 if (err_is_fail(err)) { 808 DEBUG_ERR(err, "could not lookup IREF for replay slave"); 809 abort(); 810 } 811 812 /* bound to slave */ 813 bound = false; 814 err = replay_bind(iref, replay_bind_cont, NULL, 815 get_default_waitset(), 816 IDC_BIND_FLAGS_DEFAULT); 817 if(err_is_fail(err)) { 818 DEBUG_ERR(err, "replay_bind"); 819 } 820 while(!bound) { 821 err = event_dispatch(get_default_waitset()); 822 assert(err_is_ok(err)); 823 } 824 msg("Bound to slave %d\n", sid); 825 826 /* initialize bulk transfer for slave */ 827 init_ok = false; 828 err = bulk_create(BULK_TOTAL_SIZE, BULK_BLOCK_SIZE, &sl->frame, &sl->bt); 829 assert(err_is_ok(err)); 830 err = sl->b->tx_vtbl.slave_init(sl->b, NOP_CONT, sl->frame, BULK_TOTAL_SIZE); 831 assert(err_is_ok(err)); 832 while (!init_ok) { 833 err = event_dispatch(get_default_waitset()); 834 assert(err_is_ok(err)); 835 } 836 837 msg("Slave %d initialized\n", sid); 838 } 839} 840#endif 841 842#ifdef __linux__ 843static void 844slave_push_work(struct slave *sl) 845{ 846 ssize_t ret; 847 848 assert(sl->pe != NULL); 849 assert(sl->pe->tes_size > sl->sentbytes); 850 851 dmsg("TRYING TO SEND: %zd bytes for pid=%u\n", sl->pe->tes_size - sl->sentbytes, sl->pe->pid); 852 ret = send(sl->socket, 853 (char *)sl->pe->tes + sl->sentbytes, 854 sl->pe->tes_size - sl->sentbytes, 855 0); /* setting MSG_DONTWAIT seems to cause problems */ 856 if (ret <= 0 && errno != EAGAIN) { 857 perror("send"); 858 exit(1); 859 } 860 dmsg("SENT: %zd bytes for pid=%u\n", ret, sl->pe->pid); 861 sl->sentbytes += ret; 862} 863 864static void 865slaves_finalize(void) 866{ 867 for (int i=0; i<SlState.num_slaves; i++) { 868 struct slave *sl = SlState.slaves + i; 869 close(sl->socket); 870 } 871} 872 873static void 874master_process_reqs(void) 875{ 876 uint16_t rpid; 877 // read from available fds 878 for (int i=0; i<SlState.num_slaves; i++) { 879 struct slave *sl = SlState.slaves + i; 880 881 if (sl->pe == NULL) { 882 continue; 883 } 884 885 /* first check if we need to send more data to the slave 886 * (main loop will only call slave_push_work() once) */ 887 if (sl->pe->tes_size > sl->sentbytes) { 888 slave_push_work(sl); 889 } 890 891 int r = recv(sl->socket, &rpid, sizeof(rpid), MSG_DONTWAIT); 892 if (r == -1) { 893 assert(errno == EWOULDBLOCK || errno == EAGAIN); 894 continue; /* no data here, move on */ 895 } 896 /* slave is done with task */ 897 assert(r == sizeof(rpid)); 898 assert(rpid == sl->pe->pid); 899 tg_complete(&TG, sl->pe); 900 sl->pe = NULL; 901 sl->sentbytes = 0; 902 } 903} 904 905static void slaves_print_stats(void) 906{ 907} 908 909/* connection info */ 910 911static void 912slaves_connect(struct task_graph *tg) 913{ 914 msg("connecting to slaves...\n"); 915 for(int i = 0; i < SlState.num_slaves; i++) { 916 int ret; 917 struct slave *sl = &SlState.slaves[i]; 918 919 sl->socket = socket(AF_INET, SOCK_STREAM, 0); 920 if (sl->socket == -1) { 921 perror("socket"); 922 exit(1); 923 } 924 925 struct sockaddr_in a = { 926 .sin_family = PF_INET, 927 .sin_port = htons(0), 928 .sin_addr = { 929 .s_addr = htonl(INADDR_ANY) 930 } 931 }; 932 933 ret = bind(sl->socket, (struct sockaddr *)&a, sizeof(a)); 934 if (ret != 0) { 935 perror("bind"); 936 exit(1); 937 } 938 939 int port = 1234; 940 char host[128]; 941 snprintf(host, 128, "rck%02u", i + 1); 942 943 // FOR DEBUGGING! 944 snprintf(host, 128, "localhost"); 945 printf("connecting to %s:%d ...\n", host, port); 946 port = 1234 + i; 947 948 struct hostent *h; 949 h = gethostbyname(host); 950 assert(h != NULL && h->h_length == sizeof(struct in_addr)); 951 952 struct sockaddr_in sa = { 953 .sin_family = AF_INET, 954 .sin_port = htons(port), 955 .sin_addr = *(struct in_addr *)h->h_addr_list[0] 956 }; 957 958 ret = connect(sl->socket, (struct sockaddr *)&sa, sizeof(sa)); 959 if (ret < 0) { 960 perror("connect"); 961 exit(1); 962 } 963 964 sl->sentbytes = 0; 965 #if 0 /* do a recv with MSG_DONTWAIT */ 966 /* set non-blocking flag */ 967 int sock_fl = fcntl(sl->socket, F_GETFD); 968 sock_fl |= O_NONBLOCK; 969 sock_fl = fcntl(sl->socket, F_SETFD, sock_fl); 970 assert(sock_fl & O_NONBLOCK); 971 #endif 972 } 973} 974#endif 975 976// 977// 978// uint64_t tscperms; 979//#ifndef __linux__ 980// err = sys_debug_get_tsc_per_ms(&tscperms); 981// assert(err_is_ok(err)); 982// 983//#else 984// tscperms = 533000; 985// 986//#endif 987// 988// printf("starting replay\n"); 989// 990// /* for(struct trace_entry *te = trace; te != NULL; te = te->next) { */ 991// /* static int cnt = 0; */ 992// /* printf("%d: %d, %zu, %d, %d, %d, fline %d\n", */ 993// /* cnt, te->op, te->u.fnum, te->fd, te->mode, te->pid, te->fline); */ 994// /* cnt++; */ 995// /* } */ 996// 997// uint64_t start = rdtsc(); 998// 999// // Start trace replay 1000// for(struct trace_entry *te = trace; te != NULL; te = te->next) { 1001// // Distribute work to slaves -- either they are empty (PID == 1002// // 0) or they already execute for a PID, in which case we keep 1003// // sending them that PID's work until the PID exits) 1004// 1005// static int cnt = 0; 1006// 1007// /* if(((cnt * 100) / linen) % 5 == 0) { */ 1008// /* printf("%d / %d\n", cnt, linen); */ 1009// /* } */ 1010// cnt++; 1011// 1012// /* printall = false; */ 1013// /* if(cnt == 6186 || cnt == 5840) { */ 1014// /* printall = true; */ 1015// /* } */ 1016// 1017// // If this is an exit, remove the PID and continue 1018// if(te->op == TOP_Exit) { 1019// int i; 1020// /* printf("PIDs: "); */ 1021// for(i = 0; i < num_slaves; i++) { 1022// /* printf("%u ", slaves[i].pid); */ 1023// for(int j = 0; j < MAX_PIDS; j++) { 1024// if(slaves[i].pid[j] == te->pid) { 1025// slaves[i].pid[j] = 0; 1026// goto outexit; 1027// } 1028// } 1029// } 1030// outexit: 1031// /* printf("\n"); */ 1032// 1033// if(i < num_slaves) { 1034// continue; 1035// } else { 1036// printf("%d: exit on non-existant PID (%u), file line %d\n", 1037// cnt, te->pid, te->fline); 1038// exit(EXIT_FAILURE); 1039// } 1040// } 1041// 1042// if(printall) { 1043// printf("find slave\n"); 1044// } 1045// 1046// /* again: */ 1047// // Find a slave with the same PID 1048// struct slave *emptyslave = NULL, *s = NULL; 1049// int i; 1050// for(i = 0; i < num_slaves; i++) { 1051// s = &slaves[i]; 1052// 1053// /* if(s->pid == 0) { */ 1054// /* /\* printf("slave %d is the empty slave\n", i); *\/ */ 1055// /* emptyslave = s; */ 1056// /* } */ 1057// 1058// for(int j = 0; j < MAX_PIDS; j++) { 1059// if(s->pid[j] == te->pid) { 1060// goto out; 1061// } 1062// } 1063// } 1064// out: 1065// 1066// // Didn't find one, find an empty one 1067// if(i == num_slaves) { 1068// // No empty slave -- wait for something to happen and try again 1069// if(emptyslave == NULL) { 1070// // Pick one randomly 1071// int randslave = rand() / (RAND_MAX / num_slaves); 1072// assert(randslave < num_slaves); 1073// s = &slaves[randslave]; 1074// 1075// /* printf("no empty slave\n"); */ 1076// /* err = event_dispatch(get_default_waitset()); */ 1077// /* assert(err_is_ok(err)); */ 1078// /* printf("past no empty slave\n"); */ 1079// /* goto again; */ 1080// } else { 1081// s = emptyslave; 1082// } 1083// } 1084// 1085// // Assign slave this PID 1086// int j; 1087// for(j = 0; j < MAX_PIDS; j++) { 1088// if(s->pid[j] == 0 || s->pid[j] == te->pid) { 1089// break; 1090// } 1091// } 1092// assert(j < MAX_PIDS); 1093// s->pid[j] = te->pid; 1094// 1095// /* if(i == num_slaves) { */ 1096// /* printf("found empty slave\n"); */ 1097// /* } else { */ 1098// /* printf("found slave %d, PID %d\n", i, s->pid); */ 1099// /* } */ 1100// 1101// /* if(te->fline >= 41352 && te->fline <= 41365) { */ 1102// /* printf("%d: %d, %zu, %d, %d, %d to slave %d, fline %d\n", */ 1103// /* cnt, te->op, te->u.fnum, te->fd, te->mode, te->pid, i, te->fline); */ 1104// /* } */ 1105// 1106//#if 1 1107// if(te->op == TOP_Exit) { 1108// printf("exit %u\n", te->pid); 1109// // See if it was a writer and remove 1110// for(struct writer *w = writers; w != NULL; w = w->next) { 1111// assert(te != NULL); 1112// assert(w != NULL); 1113// if(w->pid == te->pid) { 1114// assert(w != NULL); 1115// if(w != writers) { 1116// assert(w != NULL); 1117// assert(w->prev != NULL); 1118// w->prev->next = w->next; 1119// } else { 1120// writers = w->next; 1121// } 1122// free(w); 1123// break; 1124// } 1125// } 1126// } 1127//#endif 1128// 1129// // If someone opens a file, we have to make sure 1130// // that anyone else has stopped writing to that file. 1131// if(te->op == TOP_Open || te->op == TOP_Create) { 1132// /* for(;;) { */ 1133// if(printall) { 1134// printf("find writer\n"); 1135// } 1136// 1137// struct writer *w; 1138// for(w = writers; w != NULL; w = w->next) { 1139// assert(w != NULL); 1140// assert(te != NULL); 1141// if(w->fnum == te->u.fnum) { 1142// // Somebody's writing to this file -- wait for him to finish 1143// /* printf("Warning: Concurrent file writer, fline = %d, fnum = %zu\n", */ 1144// /* te->fline, te->u.fnum); */ 1145// /* assert(!"NYI"); */ 1146// break; 1147// } 1148// } 1149// 1150//#if 0 1151// // There's a writer -- wait for it to finish 1152// if(w != NULL) { 1153// printf("Waiting for close from previous writer\n"); 1154// err = event_dispatch(get_default_waitset()); 1155// assert(err_is_ok(err)); 1156// } else { 1157// break; 1158// } 1159//#endif 1160// } 1161// 1162// // Add a new writer to the list 1163// if(te->mode != FLAGS_RdOnly) { 1164// struct writer *w = malloc(sizeof(struct writer)); 1165// 1166// /* printf("new writer to file %zu\n", te->u.fnum); */ 1167// 1168// // printall = true; 1169// 1170// w->fnum = te->u.fnum; 1171// w->pid = te->pid; 1172// w->slave = s; 1173// w->prev = NULL; 1174// w->next = writers; 1175// if(writers) { 1176// w->next->prev = w; 1177// } 1178// writers = w; 1179// } 1180// /* } */ 1181// 1182// 1183// if(printall) { 1184// printf("sending\n"); 1185// } 1186// 1187// assert(s != NULL); 1188// if(s->queue == NULL) { 1189//#ifndef __linux__ 1190// BARELLFISH -> send request 1191//#else 1192// if(printall) { 1193// printf("send_buf 1\n"); 1194// } 1195// ssize_t r = send_buf(s, &er); 1196// if(printall) { 1197// printf("after send_buf 1\n"); 1198// } 1199// /* ssize_t r = send(s->socket, &er, sizeof(er), MSG_DONTWAIT); */ 1200// if(r == -1) { 1201// if(errno == EAGAIN) { 1202// if(printall) { 1203// printf("queueing\n"); 1204// } 1205// /* printf("queueing\n"); */ 1206// struct qelem *q = malloc(sizeof(struct qelem)); 1207// assert(q != NULL); 1208// q->er = er; 1209// q->next = s->queue; 1210// if(s->queue != NULL) { 1211// s->queue->prev = q; 1212// } else { 1213// assert(s->qend == NULL); 1214// } 1215// q->prev = NULL; 1216// s->queue = q; 1217// if(s->qend == NULL) { 1218// s->qend = q; 1219// } 1220// } else { 1221// printf("send_message to %d: %s\n", s->num, strerror(errno)); 1222// abort(); 1223// } 1224// } else { 1225// if(r != sizeof(er)) { 1226// printf("send_message: r == %zd, size = %zu\n", r, sizeof(er)); 1227// } 1228// assert(r == sizeof(er)); 1229// } 1230//#endif 1231// } else { 1232// // Put on slave's queue 1233// if(printall) { 1234// printf("queueing\n"); 1235// } 1236// /* printf("queueing\n"); */ 1237// struct qelem *q = malloc(sizeof(struct qelem)); 1238// assert(q != NULL); 1239// q->er = er; 1240// q->next = s->queue; 1241// if(s->queue != NULL) { 1242// s->queue->prev = q; 1243// } else { 1244// assert(s->qend == NULL); 1245// } 1246// q->prev = NULL; 1247// s->queue = q; 1248// if(s->qend == NULL) { 1249// s->qend = q; 1250// } 1251// } 1252// 1253// if(printall) { 1254// printf("resending\n"); 1255// } 1256// 1257// // Resend items that got queued 1258// for(i = 0; i < num_slaves; i++) { 1259// s = &slaves[i]; 1260// for(struct qelem *q = s->qend; q != NULL;) { 1261// // Need to keep pumping and dispatch at least one event 1262// 1263//#ifndef __linux__ 1264// err = s->b->tx_vtbl.event(s->b, NOP_CONT, q->er); 1265// if(err_is_ok(err)) { 1266// if(printall) { 1267// printf("resent %d\n", q->er.fline); 1268// } 1269// struct qelem *oldq = q; 1270// s->qend = q = q->prev; 1271// free(oldq); 1272// if(s->qend == NULL) { 1273// s->queue = NULL; 1274// } 1275// } else if(err_no(err) != FLOUNDER_ERR_TX_BUSY) { 1276// DEBUG_ERR(err, "error"); 1277// abort(); 1278// } else { 1279// // still busy, can't dequeue anything 1280// /* printf("busy2\n"); */ 1281// err = event_dispatch(get_default_waitset()); 1282// assert(err_is_ok(err)); 1283// break; 1284// /* printf("still busy\n"); */ 1285// /* qend = q = q->prev; */ 1286// } 1287//#else 1288// if(printall) { 1289// printf("send_buf 2\n"); 1290// } 1291// ssize_t r = send_buf(s, &q->er); 1292// if(printall) { 1293// printf("after send_buf 2\n"); 1294// } 1295// /* ssize_t r = send(s->socket, &q->er, sizeof(q->er), MSG_DONTWAIT); */ 1296// if(r == -1) { 1297// if(errno == EAGAIN) { 1298// break; 1299// } else { 1300// printf("send_message to %d: %s\n", s->num, strerror(errno)); 1301// abort(); 1302// } 1303// } else { 1304// if(r != sizeof(er)) { 1305// printf("send_message: r == %zd, size = %zu\n", r, sizeof(er)); 1306// } 1307// assert(r == sizeof(er)); 1308// struct qelem *oldq = q; 1309// s->qend = q = q->prev; 1310// free(oldq); 1311// if(s->qend == NULL) { 1312// s->queue = NULL; 1313// } 1314// } 1315//#endif 1316// } 1317// } 1318// } 1319// 1320// printf("draining\n"); 1321// 1322// // Drain the queue 1323// for(int i = 0; i < num_slaves; i++) { 1324// struct slave *s = &slaves[i]; 1325// for(struct qelem *q = s->qend; q != NULL;) { 1326//#ifndef __linux__ 1327// err = s->b->tx_vtbl.event(s->b, NOP_CONT, q->er); 1328// if(err_is_ok(err)) { 1329// /* printf("resent %d\n", q->er.fline); */ 1330// struct qelem *oldq = q; 1331// s->qend = q = q->prev; 1332// free(oldq); 1333// if(s->qend == NULL) { 1334// s->queue = NULL; 1335// } 1336// } else if(err_no(err) != FLOUNDER_ERR_TX_BUSY) { 1337// DEBUG_ERR(err, "error"); 1338// abort(); 1339// } else { 1340// // still busy, can't dequeue anything 1341// break; 1342// /* printf("still busy\n"); */ 1343// /* qend = q = q->prev; */ 1344// } 1345//#else 1346// ssize_t r = send_buf(s, &q->er); 1347// /* ssize_t r = send(s->socket, &q->er, sizeof(q->er), MSG_DONTWAIT); */ 1348// if(r == -1) { 1349// if(errno == EAGAIN) { 1350// break; 1351// } else { 1352// printf("send_message to %d: %s\n", s->num, strerror(errno)); 1353// abort(); 1354// } 1355// } else { 1356// if(r != sizeof(q->er)) { 1357// printf("send_message: r == %zd, size = %zu\n", r, sizeof(q->er)); 1358// } 1359// assert(r == sizeof(q->er)); 1360// struct qelem *oldq = q; 1361// s->qend = q = q->prev; 1362// free(oldq); 1363// if(s->qend == NULL) { 1364// s->queue = NULL; 1365// } 1366// } 1367//#endif 1368// } 1369// } 1370// 1371// for(int i = 0; i < num_slaves; i++) { 1372// struct slave *s = &slaves[i]; 1373// replay_eventrec_t er = { 1374// .op = TOP_End 1375// }; 1376//#ifndef __linux__ 1377// err = s->b->tx_vtbl.event(s->b, NOP_CONT, er); 1378// assert(err_is_ok(err)); 1379//#else 1380// ssize_t r = send_buf(s, &er); 1381// if(r == -1) { 1382// if(errno == EAGAIN) { 1383// printf("buffer full\n"); 1384// abort(); 1385// } else { 1386// printf("send_message to %d: %s\n", s->num, strerror(errno)); 1387// abort(); 1388// } 1389// } 1390//#endif 1391// } 1392// 1393// do { 1394// err = event_dispatch(get_default_waitset()); 1395// assert(err_is_ok(err)); 1396// } while(num_finished < num_slaves); 1397// 1398// uint64_t end = rdtsc(); 1399// 1400//#if 0 1401// // Wait for 5 seconds 1402// uint64_t beg = rdtsc(); 1403// while(rdtsc() - beg < tscperms * 5000) { 1404//#ifndef __linux__ 1405// thread_yield(); 1406//#else 1407// sched_yield(); 1408//#endif 1409// } 1410//#endif 1411// 1412// printf("replay done, took %" PRIu64" ms\n", (end - start) / tscperms); 1413// 1414// 1415