1#include <stdio.h> 2#include <stdlib.h> 3#include <assert.h> 4#include <string.h> 5#ifndef __linux__ 6#include <barrelfish/barrelfish.h> 7#include <vfs/vfs.h> 8#include <barrelfish/nameservice_client.h> 9#include <barrelfish/bulk_transfer.h> 10#include <if/replay_defs.h> 11#include <errno.h> 12#else 13static const char vfs_cache_str[] = "linux (TCP/IP)"; 14#include <unistd.h> 15#include <stdbool.h> 16#include <sys/types.h> 17#include <sys/socket.h> 18#include <sys/time.h> 19#include <netinet/in.h> 20#include <netinet/ip.h> 21#include <poll.h> 22#include <netdb.h> 23#include <errno.h> 24#include <sched.h> 25#include <inttypes.h> 26#endif 27 28#include "defs.h" 29#include "hash.h" 30 31#define MAX_LINE 1024 32#define MAX_SLAVES 64 33#define MAX_DEPS 60 34#define BULK_BLOCK_SIZE (4096*256) // 1MiB 35#define BULK_BLOCKS_NR 1 36#define BULK_TOTAL_SIZE (BULK_BLOCK_SIZE*BULK_BLOCKS_NR) 37 38/* TRACE LIST */ 39 40struct trace_list { 41 struct trace_entry *head, *tail; 42}; 43 44static inline void trace_add_tail(struct trace_list *tl, struct trace_entry *te) 45{ 46 te->next = NULL; 47 48 if (tl->head == NULL) { 49 tl->head = te; 50 } else { 51 tl->tail->next = te; 52 } 53 54 tl->tail = te; 55} 56 57/* PID ENTRIES (TASKS) */ 58 59struct slave; /* forward reference */ 60struct pid_entry { 61 int pid; 62 struct trace_list trace_l; 63 size_t tentries_nr; 64 /* list of children */ 65 struct pid_entry *children[MAX_DEPS]; 66 size_t children_nr; 67 /* list of parents */ 68 struct pid_entry *parents[MAX_DEPS]; 69 size_t parents_nr; 70 71 /* buffer for trace entries */ 72 replay_eventrec_t *tes; 73 size_t tes_size; /* size of tes buffer in bytes */ 74 75 size_t parents_completed; 76 struct slave *sl; 77 unsigned completed:1; 78}; 79 80/* SLAVES */ 81 82struct slave { 83 //int pid[MAX_PIDS]; 84 struct pid_entry *pe; 85 //struct trace_entry *current_te; 86 //struct qelem *queue, *qend; 87#ifndef __linux__ 88 struct replay_binding *b; 89 /* bulk transfer data */ 90 struct capref frame; 91 struct bulk_transfer bt; 92 uintptr_t current_bid; 93#else 94 int socket; 95 ssize_t sentbytes; 96#endif 97}; 98 99static struct { 100 int num_slaves; 101 int num_finished; 102 struct slave slaves[MAX_SLAVES]; 103} SlState; 104 105//struct qelem { 106// replay_eventrec_t er; 107// struct qelem *next, *prev; 108//}; 109// 110//struct writer { 111// int fnum, pid; 112// struct slave *slave; 113// struct writer *prev, *next; 114//}; 115 116/* 117 * Dependencies: 118 * W writes to a file that R reads 119 * R needs to wait for W to finish 120 * W is parent, R is child 121 * nodes that are ready to execute have no parents 122 */ 123struct task_graph { 124 struct pid_entry *pes; /* array of (all) pid entries */ 125 int pes_nr, pes_size; /* number of pid entries, size of array */ 126 struct pid_entry **stack; /* current stack */ 127 int stack_nr, stack_size; /* stack entries/size */ 128 int pes_completed; /* completed entries */ 129 hash_t *pids_h; /* pid -> pid_entry hash */ 130}; 131static struct task_graph TG; 132 133static struct pid_entry * 134tg_stack_peek(struct task_graph *tg, int i) 135{ 136 assert(i < tg->stack_nr); 137 return tg->stack[tg->stack_size - i]; 138} 139 140static struct pid_entry * 141tg_pop(struct task_graph *tg) 142{ 143 struct pid_entry *ret; 144 if (tg->stack_nr == 0) 145 return NULL; 146 ret = tg->stack[tg->stack_size +1 - tg->stack_nr]; 147 tg->stack_nr--; 148 assert(ret != NULL); 149 return ret; 150} 151 152static void 153tg_push(struct task_graph *tg, struct pid_entry *pe) 154{ 155 assert(tg->stack_nr < tg->stack_size); 156 tg->stack[tg->stack_size - tg->stack_nr] = pe; 157 tg->stack_nr++; 158} 159 160static void 161__build_taskgraph_stack(struct task_graph *tg) 162{ 163 tg->stack_nr = 0; 164 for (int i=0; i<tg->pes_nr; i++) { 165 struct pid_entry *pe = tg->pes + i; 166 if (pe->parents_nr == 0) 167 tg_push(tg, pe); 168 } 169} 170 171static void 172build_taskgraph_stack(struct task_graph *tg) 173{ 174 // build stack, and fill it with ready nodes (i.e., no parent nodes) 175 tg->stack_size = TOTAL_PIDS; 176 tg->stack = calloc(tg->stack_size, sizeof(struct pid_entry *)); 177 assert(tg->stack); 178 __build_taskgraph_stack(tg); 179} 180 181static void __attribute__((unused)) 182tg_reset(struct task_graph *tg) 183{ 184 for (int i=0; i<tg->pes_nr; i++) { 185 tg->pes[i].completed = 0; 186 tg->pes[i].parents_completed = 0; 187 } 188 tg->pes_completed = 0; 189 __build_taskgraph_stack(tg); 190} 191 192static void 193tg_complete(struct task_graph *tg, struct pid_entry *pe) 194{ 195 assert(pe->completed == 0); 196 pe->completed = 1; 197 tg->pes_completed++; 198 dmsg("\tpe: %p (idx:%ld,pid:%d) completed (%d/%d)\n", pe, (pe - tg->pes), pe->pid, tg->pes_completed, tg->pes_nr); 199 for (int i=0; i < pe->children_nr; i++) { 200 struct pid_entry *pe_child = pe->children[i]; 201 if (++pe_child->parents_completed == pe_child->parents_nr) { 202 tg_push(tg, pe_child); 203 } 204 } 205} 206 207 208//static struct writer *writers = NULL; 209#ifndef __linux__ 210static bool bound; /* XXX: make this volatile? No, handler runs on the same context */ 211static bool init_ok; 212static bool print_stats_ok; 213#endif 214 215#ifdef __linux__ 216#endif 217 218#ifndef __linux__ 219//static void event_done(struct replay_binding *b, uint32_t fnum) 220//{ 221// /* if(te->op == TOP_Close) { */ 222// // See if it was a writer and remove 223// 224// printf("writer done for %u\n", fnum); 225// 226// for(struct writer *w = writers; w != NULL; w = w->next) { 227// if(w->fnum == fnum) { 228// assert(w != NULL); 229// if(w != writers) { 230// assert(w != NULL); 231// assert(w->prev != NULL); 232// w->prev->next = w->next; 233// } else { 234// writers = w->next; 235// } 236// free(w); 237// break; 238// } 239// } 240// /* } */ 241//} 242 243static void 244init_reply_handler(struct replay_binding *b) 245{ 246 assert(!init_ok); 247 init_ok = true; 248} 249 250static void 251print_stats_reply_handler(struct replay_binding *b) 252{ 253 assert(!print_stats_ok); 254 print_stats_ok = true; 255} 256static void 257task_completed_handler(struct replay_binding *b, uint16_t pid, uint64_t bulk_id) 258{ 259 struct task_graph *tg = b->st; 260 struct slave *sl; 261 struct pid_entry *pe; 262 263 pe = (void *)hash_lookup(tg->pids_h, pid); 264 assert(pe != (void *)HASH_ENTRY_NOTFOUND); 265 sl = pe->sl; 266 tg_complete(tg, pe); 267 bulk_free(&sl->bt, bulk_id); 268 sl->pe = NULL; 269} 270 271static void 272finish_reply_handler(struct replay_binding *b) 273{ 274 dmsg("ENTER\n"); 275 SlState.num_finished++; 276} 277 278static struct replay_rx_vtbl replay_vtbl = { 279 .task_completed = task_completed_handler, 280 .slave_finish_reply = finish_reply_handler, 281 .slave_init_reply = init_reply_handler, 282 .slave_print_stats_reply = print_stats_reply_handler 283}; 284 285static void replay_bind_cont(void *st, errval_t err, struct replay_binding *b) 286{ 287 static int slavenum = 0; 288 struct slave *sl = &SlState.slaves[slavenum]; 289 slavenum++; 290 291 dmsg("ENTER\n"); 292 //printf("%s:%s MY TASKGRAPH IS %p\n", __FILE__, __FUNCTION__, &TG); 293 294 assert(err_is_ok(err)); 295 sl->b = b; 296 b->rx_vtbl = replay_vtbl; 297 b->st = &TG; 298 bound = true; 299 /* printf("assigned binding to %p\n", sl); */ 300} 301#else 302static inline uint64_t rdtsc(void) 303{ 304 uint32_t eax, edx; 305 __asm volatile ("rdtsc" : "=a" (eax), "=d" (edx)); 306 return ((uint64_t)edx << 32) | eax; 307} 308#endif 309 310//static bool printall = false; 311 312 313static inline void 314pentry_add_dependency(struct pid_entry *parent, struct pid_entry *child) 315{ 316 int idx; 317 for (int i=0; i<parent->children_nr; i++) 318 if (parent->children[i]->pid == child->pid) 319 return; 320 321 //printf("%d -> %d;\n", parent->pid, child->pid); 322 323 idx = parent->children_nr; 324 assert(idx < MAX_DEPS); 325 parent->children[idx] = child; 326 parent->children_nr++; 327 328 idx = child->parents_nr; 329 assert(idx < MAX_DEPS); 330 child->parents[idx] = parent; 331 child->parents_nr++; 332} 333 334//static struct pid_entry *allpids[nOTAL_PIDS]; 335 336static void 337parse_tracefile_line(char *line, int linen, struct trace_entry *te) 338{ 339 size_t fnum, size; 340 char flags[1024]; 341 int fd; 342 unsigned int pid; 343 344 if(sscanf(line, "open %zu %s %d %u", &fnum, flags, &fd, &pid) >= 4) { 345 te->op = TOP_Open; 346 te->fd = fd; 347 te->u.fnum = fnum; 348 } else if(sscanf(line, "close %d %u", &fd, &pid) >= 2) { 349 te->op = TOP_Close; 350 te->fd = fd; 351 } else if(sscanf(line, "read %d %zu %u", &fd, &size, &pid) >= 3) { 352 te->op = TOP_Read; 353 te->fd = fd; 354 te->u.size = size; 355 } else if(sscanf(line, "write %d %zu %u", &fd, &size, &pid) >= 3) { 356 te->op = TOP_Write; 357 te->fd = fd; 358 te->u.size = size; 359 } else if(sscanf(line, "seek %d %zu %u", &fd, &size, &pid) >= 3) { 360 te->op = TOP_Seek; 361 te->fd = fd; 362 te->u.size = size; 363 } else if(sscanf(line, "creat %zu %s %d %u", &fnum, flags, &fd, &pid) >= 4) { 364 te->op = TOP_Create; 365 te->fd = fd; 366 te->u.fnum = fnum; 367 } else if(sscanf(line, "unlink %zu %s %u", &fnum, flags, &pid) >= 3) { 368 te->op = TOP_Unlink; 369 te->u.fnum = fnum; 370 } else if(sscanf(line, "exit %u", &pid) >= 1) { 371 te->op = TOP_Exit; 372 } else { 373 printf("Invalid line %d: %s\n", linen, line); 374 exit(EXIT_FAILURE); 375 } 376 377 // There's always a PID 378 te->pid = pid; 379 assert(pid != 0); 380 te->fline = linen; 381 382 // If we have flags, set them now 383 if(te->op == TOP_Open || te->op == TOP_Create) { 384 if(!strcmp(flags, "rdonly")) { 385 te->mode = FLAGS_RdOnly; 386 } else if(!strcmp(flags, "wronly")) { 387 te->mode = FLAGS_WrOnly; 388 } else if(!strcmp(flags, "rdwr")) { 389 te->mode = FLAGS_RdWr; 390 } else { 391 printf("Invalid open flags: %s\n", flags); 392 exit(EXIT_FAILURE); 393 } 394 } 395} 396 397static void 398parse_tracefile(const char *tracefile, struct trace_list *tlist) 399{ 400 printf("reading tracefile...\n"); 401 402 FILE *f = fopen(tracefile, "r"); 403 assert(f != NULL); 404 int linen = 0; 405 406 while(!feof(f)) { 407 char line[MAX_LINE]; 408 409 if (fgets(line, MAX_LINE, f) == NULL) { 410 break; 411 } 412 413 linen++; 414 if (linen % 1000 == 0) { 415 printf("---- %s:%s() parsing line = %d\n", __FILE__, __FUNCTION__, linen); 416 } 417 418 struct trace_entry *tentry = malloc(sizeof(struct trace_entry)); 419 assert(tentry != NULL); 420 421 // parse current line to tentry 422 parse_tracefile_line(line, linen, tentry); 423 424 // Link it in with the rest of the list (forward order) 425 trace_add_tail(tlist, tentry); 426 } 427 fclose(f); 428 printf("tracefile read [number of lines:%d]\n", linen); 429} 430 431static void 432build_taskgraph(struct trace_list *tl, struct task_graph *tg) 433{ 434 assert(tl->head != NULL && tl->tail != NULL); 435 struct trace_entry *te, *te_next = NULL; 436 hash_t *writers_h = hash_init(TOTAL_PIDS); /* fnum -> pid_entry */ 437 // alloc a sequential array for all pid entries 438 tg->pids_h = hash_init(TOTAL_PIDS); /* pid -> pid_entry */ 439 tg->pes_size = TOTAL_PIDS; 440 tg->pes = calloc(tg->pes_size, sizeof(struct pid_entry)); // we can realloc it later 441 tg->pes_nr = 0; 442 443 for (te = tl->head; te != NULL; te = te_next) { 444 // store next pointer (trace_add_tail will change it) 445 te_next = te->next; 446 447 // if this is a new pid, create a new pid_entry 448 struct pid_entry *pe = (void *)hash_lookup(tg->pids_h, te->pid); 449 if (pe == (void *)HASH_ENTRY_NOTFOUND) { 450 // if a pid's first operation is not Open/Create ignore it 451 if (te->op != TOP_Open && te->op != TOP_Create) { 452 printf("%s() :: IGNORING operation %d for pid %d\n", __FUNCTION__, te->op, te->pid); 453 continue; 454 } 455 assert(tg->pes_nr < tg->pes_size); 456 pe = tg->pes + tg->pes_nr++; 457 pe->pid = te->pid; 458 hash_insert(tg->pids_h, pe->pid, (unsigned long)pe); 459 //printf("%d;\n", pe->pid); 460 } 461 462 // add trace entry into pid list 463 trace_add_tail(&pe->trace_l, te); 464 pe->tentries_nr++; 465 466 /* track dependencies: 467 * - look at open/create operations 468 * - put writers in a hash table, based on the file they write 469 * - for readers, check if a writer exists for the file they read 470 * - avoid cyclic dependencies of RW open/create 471 * Note: For multiple writers we just track the latest open/create */ 472 if (te->op == TOP_Open && te->op == TOP_Create) { 473 size_t fnum = te->u.fnum; 474 struct pid_entry *pe_writer = (void *)hash_lookup(writers_h, fnum); 475 476 if (te->mode != FLAGS_RdOnly) { // writer 477 if (pe_writer != (void *)HASH_ENTRY_NOTFOUND && pe_writer->pid != pe->pid) { 478 //assert(0 && "multiple different writers"); 479 } 480 hash_insert(writers_h, fnum, (unsigned long)pe); 481 pe_writer = pe; 482 } 483 484 if ((te->mode != FLAGS_WrOnly) && // this is a reader 485 (pe_writer != (void *)HASH_ENTRY_NOTFOUND) && // a writer exists 486 (pe_writer->pid != pe->pid)) { // and is not the reader 487 pentry_add_dependency(pe_writer, pe); 488 } 489 } 490 } 491 build_taskgraph_stack(tg); 492 //cleanup and return 493 tl->head = tl->tail = NULL; 494 hash_destroy(writers_h); 495} 496 497static void 498__print_taskgraph(struct pid_entry *root, int level) 499{ 500 if (root == NULL) 501 return; 502 for (int i=0; i<level; i++) 503 printf("\t"); 504 printf("%d (completed:%d)\n", root->pid, root->completed); 505 506 for (int i=0; i<root->children_nr; i++) { 507 __print_taskgraph(root->children[i], level+1); 508 } 509} 510 511static void __attribute__((unused)) 512print_taskgraph(struct task_graph *tg) 513{ 514 for (int i=0; i<tg->stack_nr; i++) { 515 struct pid_entry *pe = tg_stack_peek(tg, i); 516 __print_taskgraph(pe, 0); 517 } 518} 519 520static void 521print_pid_entry(struct pid_entry *pe, int print_ops) 522{ 523 struct trace_entry *te; 524 printf("pid entry (%p) pid:%d children:%zu parents:%zu completed:%d tentries:%zd\n", pe, pe->pid, pe->children_nr, pe->parents_nr, pe->completed, pe->tentries_nr); 525 te = pe->trace_l.head; 526 527 if (!print_ops) { 528 return; 529 } 530 531 do { 532 printf("\t op:%d pid:%d\n", te->op, te->pid); 533 } while ((te = te->next) != NULL); 534 printf("\tEND\n"); 535} 536 537static void __attribute__((unused)) 538print_task(struct task_graph *tg, int pid) 539{ 540 for (int i=0; i<tg->pes_nr; i++) { 541 struct pid_entry *pe = tg->pes + i; 542 if (pe->pid == pid) { 543 print_pid_entry(pe, 0); 544 } 545 } 546} 547 548static void 549mk_replay_event_req(struct trace_entry *te, replay_eventrec_t *req) 550{ 551 req->op = te->op; 552 req->fd = te->fd; 553 req->mode = te->mode; 554 //req->fline = te->fline; 555 req->pid = te->pid; 556 557 switch(te->op) { 558 case TOP_Open: 559 case TOP_Create: 560 case TOP_Unlink: 561 req->fnumsize = te->u.fnum; 562 break; 563 564 case TOP_Read: 565 case TOP_Write: 566 case TOP_Seek: 567 req->fnumsize = te->u.size; 568 break; 569 570 case TOP_Close: 571 case TOP_Exit: 572 break; 573 574 default: 575 assert(0); 576 break; 577 } 578 579 assert(req->pid != 0); 580} 581 582 583static void 584trace_bufs_init(struct task_graph *tg) 585{ 586 for (int i=0; i < tg->pes_nr; i++) { 587 struct pid_entry *pe = tg->pes + i; 588 589 size_t size = pe->tes_size = sizeof(replay_eventrec_t)*pe->tentries_nr; 590 if (size >= BULK_TOTAL_SIZE) { 591 msg("size for pid:%d [%zd] larger than %d\n", pe->pid, size, BULK_TOTAL_SIZE); 592 assert(0); 593 } 594 595 assert(size <= BULK_TOTAL_SIZE); 596 pe->tes = malloc(size); 597 assert(pe->tes); 598 599 struct trace_entry *te = pe->trace_l.head; 600 for (int ti=0; ti < pe->tentries_nr; ti++) { 601 mk_replay_event_req(te, pe->tes + ti); 602 te = te->next; 603 } 604 assert(te == NULL); // make sure the count is right! 605 } 606} 607 608static void __attribute__((unused)) 609print_all_tasks(struct task_graph *tg) 610{ 611 for (int i=0; i<tg->pes_nr; i++) { 612 struct pid_entry *pe = tg->pes + i; 613 print_pid_entry(pe, 0); 614 } 615} 616 617/* functions to be implemented seperately by bfish/linux */ 618static void slaves_connect(struct task_graph *tg); 619static void slave_push_work(struct slave *); 620static void slaves_finalize(void); 621static void slaves_print_stats(void); 622static void master_process_reqs(void); 623uint64_t tscperms; 624 625int main(int argc, char *argv[]) 626{ 627#ifndef __linux__ 628 if(argc < 5) { 629 printf("Usage: %s tracefile nslaves mountdir mount-URL\n", argv[0]); 630 exit(EXIT_FAILURE); 631 } 632 633 assert(err_is_ok(sys_debug_get_tsc_per_ms(&tscperms))); 634 errval_t err = vfs_mkdir(argv[3]); 635 assert(err_is_ok(err)); 636 637 printf("----------------------------------- VFS MOUNT\n"); 638 err = vfs_mount(argv[3], argv[4]); 639 if(err_is_fail(err)) { 640 DEBUG_ERR(err, "vfs_mount"); 641 } 642 printf("----------------------------------- VFS MOUNT DONE\n"); 643 assert(err_is_ok(err)); 644#else 645 if(argc < 3) { 646 printf("Usage: %s tracefile nslaves\n", argv[0]); 647 exit(EXIT_FAILURE); 648 } 649#endif 650 651 memset(&SlState, 0, sizeof(SlState)); 652 //SlState.waitset = get_default_waitset(); 653 //struct waitset ws; 654 //waitset_init(&ws); 655 //SlState.waitset = &ws; 656 657 char *tracefile = argv[1]; 658 SlState.num_slaves = atoi(argv[2]); 659 printf("tracefile=%s\n", tracefile); 660 661 printf("reading dependency graph...\n"); 662 663 // Parse trace file into memory records 664 struct trace_list tlist = {.head = NULL, .tail = NULL}; 665 parse_tracefile(tracefile, &tlist); 666 667 // Build task graph. roots are nodes without dependencies 668 #ifndef __linux__ 669 msg("[MASTER] My cpu is: %d\n", disp_get_core_id()); 670 #endif 671 memset(&TG, 0, sizeof(TG)); 672 build_taskgraph(&tlist, &TG); 673 //print_all_tasks(&TG); 674 //print_taskgraph(&TG); 675 //printf("TG entries:%d completed:%d stack_size:%d\n", TG.pes_nr, TG.pes_completed, TG.stack_nr); 676 677 msg("[MASTER] INITIALIZING BUFFERS...\n"); 678 trace_bufs_init(&TG); 679 680 msg("[MASTER] CONNECTING TO SLAVES...\n"); 681 slaves_connect(&TG); 682 683 msg("[MASTER] STARTING WORK...\n"); 684 uint64_t start_ticks = rdtsc(); 685 for (;;) { 686 /* enqueue work to the slaves */ 687 for (int sid=0; sid < SlState.num_slaves; sid++) { 688 struct slave *sl = SlState.slaves + sid; 689 // try to assign a pid entry to a slave, if it doesn't hove one 690 if (sl->pe == NULL) { 691 sl->pe = tg_pop(&TG); 692 if (sl->pe == NULL) { 693 continue; /* no more tasks in the stack */ 694 } 695 dmsg("[MASTER] assigned pid:%d to sl:%d (stack_nr:%d completed:%d total:%d bytes:%zd)\n", 696 sl->pe->pid, sid, TG.stack_nr, TG.pes_completed, TG.pes_nr, sl->pe->tes_size); 697 sl->pe->sl = sl; 698 slave_push_work(sl); 699 } 700 master_process_reqs(); 701 } 702 703 if (TG.pes_completed == TG.pes_nr) 704 break; 705 } 706 707 uint64_t work_ticks = rdtsc() - start_ticks; 708 slaves_finalize(); 709 uint64_t total_ticks = rdtsc() - start_ticks; 710 printf("[MASTER] replay done> cache:%s slaves:%d ticks:%" PRIu64 711 " (%lf ms) [total: %lfms]\n", 712 vfs_cache_str, SlState.num_slaves, work_ticks, 713 (double)work_ticks /(double)tscperms, 714 (double)total_ticks/(double)tscperms); 715 slaves_print_stats(); 716 return 0; 717} 718 719#ifndef __linux__ 720static void 721master_process_reqs(void) 722{ 723 /* process slave requests */ 724 for (;;){ 725 struct event_closure closure; 726 errval_t ret; 727 ret = check_for_event(get_default_waitset(), &closure); 728 if (ret == LIB_ERR_NO_EVENT) 729 break; 730 assert(err_is_ok(ret)); 731 assert(closure.handler != NULL); 732 //printf(RED("-------- GOT A MASTER EVENT handler=%p arg=%p"), closure.handler, closure.arg); 733 closure.handler(closure.arg); 734 } 735} 736 737static void 738slaves_finalize(void) 739{ 740 int err; 741 742 /* notify slaves */ 743 for (int sid=0; sid < SlState.num_slaves; sid++) { 744 struct slave *sl = SlState.slaves + sid; 745 do { 746 err = sl->b->tx_vtbl.slave_finish(sl->b, NOP_CONT); 747 } while (err_no(err) == FLOUNDER_ERR_TX_BUSY); 748 assert(err_is_ok(err)); 749 } 750 751 /* wait for their replies */ 752 do { 753 err = event_dispatch(get_default_waitset()); 754 assert(err_is_ok(err)); 755 } while (SlState.num_finished < SlState.num_slaves); 756} 757 758static void 759slaves_print_stats(void) 760{ 761 int err; 762 763 /* have slaves print stats synchronously */ 764 for (int sid=0; sid < SlState.num_slaves; sid++) { 765 struct slave *sl = SlState.slaves + sid; 766 print_stats_ok = false; 767 err = sl->b->tx_vtbl.slave_print_stats(sl->b, NOP_CONT); 768 assert(err_is_ok(err)); 769 while (!print_stats_ok) { 770 err = event_dispatch(get_default_waitset()); 771 assert(err_is_ok(err)); 772 } 773 } 774} 775 776static void 777slave_push_work(struct slave *sl) 778{ 779 int err; 780 struct bulk_buf *bb; 781 uint64_t bulk_id; 782 783 //dmsg("pushing work for slave: %ld (%p) pid:%d (completed:%d)\n", sl-slaves, sl, sl->pe->pid, sl->pe->completed); 784 bb = bulk_alloc(&sl->bt); 785 if (bb == NULL) { 786 return; 787 } 788 bulk_buf_copy(bb, sl->pe->tes, sl->pe->tes_size); 789 bulk_id = bulk_prepare_send(bb); 790 err = sl->b->tx_vtbl.new_task(sl->b, NOP_CONT, bulk_id, sl->pe->tes_size); 791 if (err == FLOUNDER_ERR_TX_BUSY) { 792 bulk_free(&sl->bt, bulk_id); 793 return; 794 } 795 assert(err_is_ok(err)); 796} 797 798static void 799slaves_connect(struct task_graph *tg) 800{ 801 char name[128]; 802 iref_t iref; 803 int err; 804 805 for (int sid=0; sid < SlState.num_slaves; sid++) { 806 int r = snprintf(name, 128, "replay_slave.%u", sid + 1); 807 struct slave *sl = SlState.slaves + sid; 808 assert(r != -1); 809 810 err = nameservice_blocking_lookup(name, &iref); 811 if (err_is_fail(err)) { 812 DEBUG_ERR(err, "could not lookup IREF for replay slave"); 813 abort(); 814 } 815 816 /* bound to slave */ 817 bound = false; 818 err = replay_bind(iref, replay_bind_cont, NULL, 819 get_default_waitset(), 820 IDC_BIND_FLAGS_DEFAULT); 821 if(err_is_fail(err)) { 822 DEBUG_ERR(err, "replay_bind"); 823 } 824 while(!bound) { 825 err = event_dispatch(get_default_waitset()); 826 assert(err_is_ok(err)); 827 } 828 msg("Bound to slave %d\n", sid); 829 830 /* initialize bulk transfer for slave */ 831 init_ok = false; 832 err = bulk_create(BULK_TOTAL_SIZE, BULK_BLOCK_SIZE, &sl->frame, &sl->bt, false); 833 assert(err_is_ok(err)); 834 err = sl->b->tx_vtbl.slave_init(sl->b, NOP_CONT, sl->frame, BULK_TOTAL_SIZE); 835 assert(err_is_ok(err)); 836 while (!init_ok) { 837 err = event_dispatch(get_default_waitset()); 838 assert(err_is_ok(err)); 839 } 840 841 msg("Slave %d initialized\n", sid); 842 } 843} 844#endif 845 846#ifdef __linux__ 847static void 848slave_push_work(struct slave *sl) 849{ 850 ssize_t ret; 851 852 assert(sl->pe != NULL); 853 assert(sl->pe->tes_size > sl->sentbytes); 854 855 dmsg("TRYING TO SEND: %zd bytes for pid=%u\n", sl->pe->tes_size - sl->sentbytes, sl->pe->pid); 856 ret = send(sl->socket, 857 (char *)sl->pe->tes + sl->sentbytes, 858 sl->pe->tes_size - sl->sentbytes, 859 0); /* setting MSG_DONTWAIT seems to cause problems */ 860 if (ret <= 0 && errno != EAGAIN) { 861 perror("send"); 862 exit(1); 863 } 864 dmsg("SENT: %zd bytes for pid=%u\n", ret, sl->pe->pid); 865 sl->sentbytes += ret; 866} 867 868static void 869slaves_finalize(void) 870{ 871 for (int i=0; i<SlState.num_slaves; i++) { 872 struct slave *sl = SlState.slaves + i; 873 close(sl->socket); 874 } 875} 876 877static void 878master_process_reqs(void) 879{ 880 uint16_t rpid; 881 // read from available fds 882 for (int i=0; i<SlState.num_slaves; i++) { 883 struct slave *sl = SlState.slaves + i; 884 885 if (sl->pe == NULL) { 886 continue; 887 } 888 889 /* first check if we need to send more data to the slave 890 * (main loop will only call slave_push_work() once) */ 891 if (sl->pe->tes_size > sl->sentbytes) { 892 slave_push_work(sl); 893 } 894 895 int r = recv(sl->socket, &rpid, sizeof(rpid), MSG_DONTWAIT); 896 if (r == -1) { 897 assert(errno == EWOULDBLOCK || errno == EAGAIN); 898 continue; /* no data here, move on */ 899 } 900 /* slave is done with task */ 901 assert(r == sizeof(rpid)); 902 assert(rpid == sl->pe->pid); 903 tg_complete(&TG, sl->pe); 904 sl->pe = NULL; 905 sl->sentbytes = 0; 906 } 907} 908 909static void slaves_print_stats(void) 910{ 911} 912 913/* connection info */ 914 915static void 916slaves_connect(struct task_graph *tg) 917{ 918 msg("connecting to slaves...\n"); 919 for(int i = 0; i < SlState.num_slaves; i++) { 920 int ret; 921 struct slave *sl = &SlState.slaves[i]; 922 923 sl->socket = socket(AF_INET, SOCK_STREAM, 0); 924 if (sl->socket == -1) { 925 perror("socket"); 926 exit(1); 927 } 928 929 struct sockaddr_in a = { 930 .sin_family = PF_INET, 931 .sin_port = htons(0), 932 .sin_addr = { 933 .s_addr = htonl(INADDR_ANY) 934 } 935 }; 936 937 ret = bind(sl->socket, (struct sockaddr *)&a, sizeof(a)); 938 if (ret != 0) { 939 perror("bind"); 940 exit(1); 941 } 942 943 int port = 1234; 944 char host[128]; 945 snprintf(host, 128, "rck%02u", i + 1); 946 947 // FOR DEBUGGING! 948 snprintf(host, 128, "localhost"); 949 printf("connecting to %s:%d ...\n", host, port); 950 port = 1234 + i; 951 952 struct hostent *h; 953 h = gethostbyname(host); 954 assert(h != NULL && h->h_length == sizeof(struct in_addr)); 955 956 struct sockaddr_in sa = { 957 .sin_family = AF_INET, 958 .sin_port = htons(port), 959 .sin_addr = *(struct in_addr *)h->h_addr_list[0] 960 }; 961 962 ret = connect(sl->socket, (struct sockaddr *)&sa, sizeof(sa)); 963 if (ret < 0) { 964 perror("connect"); 965 exit(1); 966 } 967 968 sl->sentbytes = 0; 969 #if 0 /* do a recv with MSG_DONTWAIT */ 970 /* set non-blocking flag */ 971 int sock_fl = fcntl(sl->socket, F_GETFD); 972 sock_fl |= O_NONBLOCK; 973 sock_fl = fcntl(sl->socket, F_SETFD, sock_fl); 974 assert(sock_fl & O_NONBLOCK); 975 #endif 976 } 977} 978#endif 979 980// 981// 982// uint64_t tscperms; 983//#ifndef __linux__ 984// err = sys_debug_get_tsc_per_ms(&tscperms); 985// assert(err_is_ok(err)); 986// 987//#else 988// tscperms = 533000; 989// 990//#endif 991// 992// printf("starting replay\n"); 993// 994// /* for(struct trace_entry *te = trace; te != NULL; te = te->next) { */ 995// /* static int cnt = 0; */ 996// /* printf("%d: %d, %zu, %d, %d, %d, fline %d\n", */ 997// /* cnt, te->op, te->u.fnum, te->fd, te->mode, te->pid, te->fline); */ 998// /* cnt++; */ 999// /* } */ 1000// 1001// uint64_t start = rdtsc(); 1002// 1003// // Start trace replay 1004// for(struct trace_entry *te = trace; te != NULL; te = te->next) { 1005// // Distribute work to slaves -- either they are empty (PID == 1006// // 0) or they already execute for a PID, in which case we keep 1007// // sending them that PID's work until the PID exits) 1008// 1009// static int cnt = 0; 1010// 1011// /* if(((cnt * 100) / linen) % 5 == 0) { */ 1012// /* printf("%d / %d\n", cnt, linen); */ 1013// /* } */ 1014// cnt++; 1015// 1016// /* printall = false; */ 1017// /* if(cnt == 6186 || cnt == 5840) { */ 1018// /* printall = true; */ 1019// /* } */ 1020// 1021// // If this is an exit, remove the PID and continue 1022// if(te->op == TOP_Exit) { 1023// int i; 1024// /* printf("PIDs: "); */ 1025// for(i = 0; i < num_slaves; i++) { 1026// /* printf("%u ", slaves[i].pid); */ 1027// for(int j = 0; j < MAX_PIDS; j++) { 1028// if(slaves[i].pid[j] == te->pid) { 1029// slaves[i].pid[j] = 0; 1030// goto outexit; 1031// } 1032// } 1033// } 1034// outexit: 1035// /* printf("\n"); */ 1036// 1037// if(i < num_slaves) { 1038// continue; 1039// } else { 1040// printf("%d: exit on non-existant PID (%u), file line %d\n", 1041// cnt, te->pid, te->fline); 1042// exit(EXIT_FAILURE); 1043// } 1044// } 1045// 1046// if(printall) { 1047// printf("find slave\n"); 1048// } 1049// 1050// /* again: */ 1051// // Find a slave with the same PID 1052// struct slave *emptyslave = NULL, *s = NULL; 1053// int i; 1054// for(i = 0; i < num_slaves; i++) { 1055// s = &slaves[i]; 1056// 1057// /* if(s->pid == 0) { */ 1058// /* /\* printf("slave %d is the empty slave\n", i); *\/ */ 1059// /* emptyslave = s; */ 1060// /* } */ 1061// 1062// for(int j = 0; j < MAX_PIDS; j++) { 1063// if(s->pid[j] == te->pid) { 1064// goto out; 1065// } 1066// } 1067// } 1068// out: 1069// 1070// // Didn't find one, find an empty one 1071// if(i == num_slaves) { 1072// // No empty slave -- wait for something to happen and try again 1073// if(emptyslave == NULL) { 1074// // Pick one randomly 1075// int randslave = rand() / (RAND_MAX / num_slaves); 1076// assert(randslave < num_slaves); 1077// s = &slaves[randslave]; 1078// 1079// /* printf("no empty slave\n"); */ 1080// /* err = event_dispatch(get_default_waitset()); */ 1081// /* assert(err_is_ok(err)); */ 1082// /* printf("past no empty slave\n"); */ 1083// /* goto again; */ 1084// } else { 1085// s = emptyslave; 1086// } 1087// } 1088// 1089// // Assign slave this PID 1090// int j; 1091// for(j = 0; j < MAX_PIDS; j++) { 1092// if(s->pid[j] == 0 || s->pid[j] == te->pid) { 1093// break; 1094// } 1095// } 1096// assert(j < MAX_PIDS); 1097// s->pid[j] = te->pid; 1098// 1099// /* if(i == num_slaves) { */ 1100// /* printf("found empty slave\n"); */ 1101// /* } else { */ 1102// /* printf("found slave %d, PID %d\n", i, s->pid); */ 1103// /* } */ 1104// 1105// /* if(te->fline >= 41352 && te->fline <= 41365) { */ 1106// /* printf("%d: %d, %zu, %d, %d, %d to slave %d, fline %d\n", */ 1107// /* cnt, te->op, te->u.fnum, te->fd, te->mode, te->pid, i, te->fline); */ 1108// /* } */ 1109// 1110//#if 1 1111// if(te->op == TOP_Exit) { 1112// printf("exit %u\n", te->pid); 1113// // See if it was a writer and remove 1114// for(struct writer *w = writers; w != NULL; w = w->next) { 1115// assert(te != NULL); 1116// assert(w != NULL); 1117// if(w->pid == te->pid) { 1118// assert(w != NULL); 1119// if(w != writers) { 1120// assert(w != NULL); 1121// assert(w->prev != NULL); 1122// w->prev->next = w->next; 1123// } else { 1124// writers = w->next; 1125// } 1126// free(w); 1127// break; 1128// } 1129// } 1130// } 1131//#endif 1132// 1133// // If someone opens a file, we have to make sure 1134// // that anyone else has stopped writing to that file. 1135// if(te->op == TOP_Open || te->op == TOP_Create) { 1136// /* for(;;) { */ 1137// if(printall) { 1138// printf("find writer\n"); 1139// } 1140// 1141// struct writer *w; 1142// for(w = writers; w != NULL; w = w->next) { 1143// assert(w != NULL); 1144// assert(te != NULL); 1145// if(w->fnum == te->u.fnum) { 1146// // Somebody's writing to this file -- wait for him to finish 1147// /* printf("Warning: Concurrent file writer, fline = %d, fnum = %zu\n", */ 1148// /* te->fline, te->u.fnum); */ 1149// /* assert(!"NYI"); */ 1150// break; 1151// } 1152// } 1153// 1154//#if 0 1155// // There's a writer -- wait for it to finish 1156// if(w != NULL) { 1157// printf("Waiting for close from previous writer\n"); 1158// err = event_dispatch(get_default_waitset()); 1159// assert(err_is_ok(err)); 1160// } else { 1161// break; 1162// } 1163//#endif 1164// } 1165// 1166// // Add a new writer to the list 1167// if(te->mode != FLAGS_RdOnly) { 1168// struct writer *w = malloc(sizeof(struct writer)); 1169// 1170// /* printf("new writer to file %zu\n", te->u.fnum); */ 1171// 1172// // printall = true; 1173// 1174// w->fnum = te->u.fnum; 1175// w->pid = te->pid; 1176// w->slave = s; 1177// w->prev = NULL; 1178// w->next = writers; 1179// if(writers) { 1180// w->next->prev = w; 1181// } 1182// writers = w; 1183// } 1184// /* } */ 1185// 1186// 1187// if(printall) { 1188// printf("sending\n"); 1189// } 1190// 1191// assert(s != NULL); 1192// if(s->queue == NULL) { 1193//#ifndef __linux__ 1194// BARELLFISH -> send request 1195//#else 1196// if(printall) { 1197// printf("send_buf 1\n"); 1198// } 1199// ssize_t r = send_buf(s, &er); 1200// if(printall) { 1201// printf("after send_buf 1\n"); 1202// } 1203// /* ssize_t r = send(s->socket, &er, sizeof(er), MSG_DONTWAIT); */ 1204// if(r == -1) { 1205// if(errno == EAGAIN) { 1206// if(printall) { 1207// printf("queueing\n"); 1208// } 1209// /* printf("queueing\n"); */ 1210// struct qelem *q = malloc(sizeof(struct qelem)); 1211// assert(q != NULL); 1212// q->er = er; 1213// q->next = s->queue; 1214// if(s->queue != NULL) { 1215// s->queue->prev = q; 1216// } else { 1217// assert(s->qend == NULL); 1218// } 1219// q->prev = NULL; 1220// s->queue = q; 1221// if(s->qend == NULL) { 1222// s->qend = q; 1223// } 1224// } else { 1225// printf("send_message to %d: %s\n", s->num, strerror(errno)); 1226// abort(); 1227// } 1228// } else { 1229// if(r != sizeof(er)) { 1230// printf("send_message: r == %zd, size = %zu\n", r, sizeof(er)); 1231// } 1232// assert(r == sizeof(er)); 1233// } 1234//#endif 1235// } else { 1236// // Put on slave's queue 1237// if(printall) { 1238// printf("queueing\n"); 1239// } 1240// /* printf("queueing\n"); */ 1241// struct qelem *q = malloc(sizeof(struct qelem)); 1242// assert(q != NULL); 1243// q->er = er; 1244// q->next = s->queue; 1245// if(s->queue != NULL) { 1246// s->queue->prev = q; 1247// } else { 1248// assert(s->qend == NULL); 1249// } 1250// q->prev = NULL; 1251// s->queue = q; 1252// if(s->qend == NULL) { 1253// s->qend = q; 1254// } 1255// } 1256// 1257// if(printall) { 1258// printf("resending\n"); 1259// } 1260// 1261// // Resend items that got queued 1262// for(i = 0; i < num_slaves; i++) { 1263// s = &slaves[i]; 1264// for(struct qelem *q = s->qend; q != NULL;) { 1265// // Need to keep pumping and dispatch at least one event 1266// 1267//#ifndef __linux__ 1268// err = s->b->tx_vtbl.event(s->b, NOP_CONT, q->er); 1269// if(err_is_ok(err)) { 1270// if(printall) { 1271// printf("resent %d\n", q->er.fline); 1272// } 1273// struct qelem *oldq = q; 1274// s->qend = q = q->prev; 1275// free(oldq); 1276// if(s->qend == NULL) { 1277// s->queue = NULL; 1278// } 1279// } else if(err_no(err) != FLOUNDER_ERR_TX_BUSY) { 1280// DEBUG_ERR(err, "error"); 1281// abort(); 1282// } else { 1283// // still busy, can't dequeue anything 1284// /* printf("busy2\n"); */ 1285// err = event_dispatch(get_default_waitset()); 1286// assert(err_is_ok(err)); 1287// break; 1288// /* printf("still busy\n"); */ 1289// /* qend = q = q->prev; */ 1290// } 1291//#else 1292// if(printall) { 1293// printf("send_buf 2\n"); 1294// } 1295// ssize_t r = send_buf(s, &q->er); 1296// if(printall) { 1297// printf("after send_buf 2\n"); 1298// } 1299// /* ssize_t r = send(s->socket, &q->er, sizeof(q->er), MSG_DONTWAIT); */ 1300// if(r == -1) { 1301// if(errno == EAGAIN) { 1302// break; 1303// } else { 1304// printf("send_message to %d: %s\n", s->num, strerror(errno)); 1305// abort(); 1306// } 1307// } else { 1308// if(r != sizeof(er)) { 1309// printf("send_message: r == %zd, size = %zu\n", r, sizeof(er)); 1310// } 1311// assert(r == sizeof(er)); 1312// struct qelem *oldq = q; 1313// s->qend = q = q->prev; 1314// free(oldq); 1315// if(s->qend == NULL) { 1316// s->queue = NULL; 1317// } 1318// } 1319//#endif 1320// } 1321// } 1322// } 1323// 1324// printf("draining\n"); 1325// 1326// // Drain the queue 1327// for(int i = 0; i < num_slaves; i++) { 1328// struct slave *s = &slaves[i]; 1329// for(struct qelem *q = s->qend; q != NULL;) { 1330//#ifndef __linux__ 1331// err = s->b->tx_vtbl.event(s->b, NOP_CONT, q->er); 1332// if(err_is_ok(err)) { 1333// /* printf("resent %d\n", q->er.fline); */ 1334// struct qelem *oldq = q; 1335// s->qend = q = q->prev; 1336// free(oldq); 1337// if(s->qend == NULL) { 1338// s->queue = NULL; 1339// } 1340// } else if(err_no(err) != FLOUNDER_ERR_TX_BUSY) { 1341// DEBUG_ERR(err, "error"); 1342// abort(); 1343// } else { 1344// // still busy, can't dequeue anything 1345// break; 1346// /* printf("still busy\n"); */ 1347// /* qend = q = q->prev; */ 1348// } 1349//#else 1350// ssize_t r = send_buf(s, &q->er); 1351// /* ssize_t r = send(s->socket, &q->er, sizeof(q->er), MSG_DONTWAIT); */ 1352// if(r == -1) { 1353// if(errno == EAGAIN) { 1354// break; 1355// } else { 1356// printf("send_message to %d: %s\n", s->num, strerror(errno)); 1357// abort(); 1358// } 1359// } else { 1360// if(r != sizeof(q->er)) { 1361// printf("send_message: r == %zd, size = %zu\n", r, sizeof(q->er)); 1362// } 1363// assert(r == sizeof(q->er)); 1364// struct qelem *oldq = q; 1365// s->qend = q = q->prev; 1366// free(oldq); 1367// if(s->qend == NULL) { 1368// s->queue = NULL; 1369// } 1370// } 1371//#endif 1372// } 1373// } 1374// 1375// for(int i = 0; i < num_slaves; i++) { 1376// struct slave *s = &slaves[i]; 1377// replay_eventrec_t er = { 1378// .op = TOP_End 1379// }; 1380//#ifndef __linux__ 1381// err = s->b->tx_vtbl.event(s->b, NOP_CONT, er); 1382// assert(err_is_ok(err)); 1383//#else 1384// ssize_t r = send_buf(s, &er); 1385// if(r == -1) { 1386// if(errno == EAGAIN) { 1387// printf("buffer full\n"); 1388// abort(); 1389// } else { 1390// printf("send_message to %d: %s\n", s->num, strerror(errno)); 1391// abort(); 1392// } 1393// } 1394//#endif 1395// } 1396// 1397// do { 1398// err = event_dispatch(get_default_waitset()); 1399// assert(err_is_ok(err)); 1400// } while(num_finished < num_slaves); 1401// 1402// uint64_t end = rdtsc(); 1403// 1404//#if 0 1405// // Wait for 5 seconds 1406// uint64_t beg = rdtsc(); 1407// while(rdtsc() - beg < tscperms * 5000) { 1408//#ifndef __linux__ 1409// thread_yield(); 1410//#else 1411// sched_yield(); 1412//#endif 1413// } 1414//#endif 1415// 1416// printf("replay done, took %" PRIu64" ms\n", (end - start) / tscperms); 1417// 1418// 1419