1#include <stdio.h> 2#include <stdlib.h> 3#include <assert.h> 4#include <string.h> 5#include <unistd.h> 6#include <errno.h> 7#include <sys/stat.h> 8#ifndef __linux__ 9#include <barrelfish/barrelfish.h> 10#include <vfs/vfs.h> 11#include <barrelfish/nameservice_client.h> 12#include <if/replay_defs.h> 13#include <barrelfish/bulk_transfer.h> 14#else 15#include <stdbool.h> 16#include <sys/types.h> 17#include <fcntl.h> 18#include <sys/socket.h> 19#include <netinet/in.h> 20#include <netinet/ip.h> 21#include <netdb.h> 22#include <errno.h> 23#include <sched.h> 24#include <inttypes.h> 25#include <sys/time.h> 26#include <sys/resource.h> 27#endif 28 29#include "defs.h" 30 31#ifndef MIN 32#define MIN(x,y) (x < y ? x : y) 33#endif 34 35static char *defdir; 36static struct { 37 uint64_t op_ticks[TOPs_Total]; 38 uint64_t op_count[TOPs_Total]; 39 uint64_t total_ticks; 40} Stats = {{0}, {0}, 0}; 41 42//static uint64_t total_ticks=0, open_ticks=0, read_ticks=0, unlink_ticks=0; 43 44#ifndef __linux__ 45static void export_cb(void *st, errval_t err, iref_t iref) 46{ 47 assert(err_is_ok(err)); 48 char name[256]; 49 snprintf(name, 256, "replay_slave.%u", disp_get_core_id()); 50 msg("%s:%s() :: === registering %s\n", __FILE__, __FUNCTION__, name); 51 err = nameservice_register(name, iref); 52 assert(err_is_ok(err)); 53} 54#else 55static int connsock = -1; 56#endif 57 58#define MAX_FD_CONV 256 59#define MAX_DATA (2*1024*1024) 60#define WBUF_SIZE (16*1024*1024) 61 62struct wbuf; 63 64struct wbuf_entry { 65 struct wbuf *next, *prev; 66}; 67 68typedef struct wbuf { 69 int w_fd; 70 size_t w_i; 71 unsigned char w_buf[WBUF_SIZE]; 72 struct wbuf *next, *prev; 73} wbuf_t; 74 75static struct { 76 struct wbuf *head; 77 int cnt; 78} Wbufs = { 79 .head= NULL, 80 .cnt = 0 81}; 82 83static void 84wb_add(wbuf_t *wb) 85{ 86 if (Wbufs.head == NULL) { 87 assert(Wbufs.cnt == 0); 88 Wbufs.head = wb->next = wb->prev = wb; 89 } else { 90 struct wbuf *head = Wbufs.head; 91 wb->next = head; 92 wb->prev = head->prev; 93 head->prev->next = wb; 94 head->prev = wb; 95 } 96 Wbufs.cnt++; 97} 98 99static void 100wb_remove(wbuf_t *wb) 101{ 102 if (Wbufs.cnt == 1) { 103 Wbufs.head = NULL; 104 } else { 105 wb->next->prev = wb->prev; 106 wb->prev->next = wb->next; 107 } 108 Wbufs.cnt--; 109} 110 111static int openfiles = 0, read_fails=0, write_fails=0, seek_fails=0; 112static int tfd2fd[MAX_FD_CONV] = {0}; /* trace fd -> fd */ 113static int tfd2fname[MAX_FD_CONV] = {0}; /* trace fd -> name */ 114wbuf_t *tfd2wb[MAX_FD_CONV] = {0}; /* trace fd -> write buffer */ 115static char data[MAX_DATA]; 116 117#ifndef __linux__ 118static struct bulk_transfer_slave bulk_slave; 119static uint64_t tscperms; 120#endif 121 122#ifdef __linux__ 123static int 124disp_get_core_id(void) 125{ 126 return getpid(); 127} 128static inline uint64_t rdtsc(void) 129{ 130 uint32_t eax, edx; 131 __asm volatile ("rdtsc" : "=a" (eax), "=d" (edx)); 132 return ((uint64_t)edx << 32) | eax; 133} 134#endif 135 136#ifndef __linux__ 137static void handle_init(struct replay_binding *b, struct capref shared_mem, uint32_t size) 138{ 139 errval_t err; 140 vregion_flags_t vspace_fl; 141 142 vspace_fl = VREGION_FLAGS_READ_WRITE; 143 144 // Map the frame in local memory 145 void *pool; 146 err = vspace_map_one_frame_attr(&pool, size, shared_mem, vspace_fl, NULL, NULL); 147 assert(pool != NULL); 148 assert(err_is_ok(err)); 149 150 // Init receiver 151 err = bulk_slave_init(pool, size, &bulk_slave); 152 assert(err_is_ok(err)); 153 msg("%s:%s: done\n", __FILE__, __FUNCTION__); 154 155 err = b->tx_vtbl.slave_init_reply(b, NOP_CONT); 156 assert(err_is_ok(err)); 157} 158 159static void handle_finish(struct replay_binding *b) 160{ 161 errval_t err; 162 err = b->tx_vtbl.slave_finish_reply(b, NOP_CONT); 163 assert(err_is_ok(err)); 164} 165 166static void handle_print_stats(struct replay_binding *b) 167{ 168 errval_t err; 169 msg("SLAVE[%u]: END took %" PRIu64 " ticks (%lf ms)\n", disp_get_core_id(), Stats.total_ticks, (double)Stats.total_ticks/(double)tscperms); 170 for (int i=0; i<TOPs_Total; i++) { 171 uint64_t op_cnt = Stats.op_count[i]; 172 double op_time = (double)Stats.op_ticks[i]/(double)tscperms; 173 msg(" op:%-10s cnt:%8" PRIu64 " time:%13.2lf avg:%9.3lf\n", top2str[i], op_cnt, op_time, op_time/(double)op_cnt); 174 } 175 msg("SLAVE[%u]: CACHE STATISTICS\n", disp_get_core_id()); 176 err = b->tx_vtbl.slave_print_stats_reply(b, NOP_CONT); 177 assert(err_is_ok(err)); 178} 179#endif 180 181static void 182do_handle_event(replay_eventrec_t *er) 183{ 184 static int pid = 0; 185 static int op_id = 0; 186 const int open_mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; 187 188 /* pick a file for this operation */ 189 // (only needed for Open/Create/Unlink) 190 char fname[256]; 191 snprintf(fname, 256, "%s/data/%"PRIu32"", defdir, er->fnumsize); 192 193 194 op_id++; 195 enum top op = er->op; 196 dmsg("SLAVE[%u]: REQ pid:%d op:%d fd:%d fnumsize:%d [op_id:%d] er:%p\n", disp_get_core_id(), er->pid, op, er->fd, er->fnumsize, op_id, er); 197 198 /* - master will send consecutive operations with the same pid 199 * - the pid will change after an TOP_Exit, and a subsequent Open/Create */ 200 if (pid == 0) { 201 assert(er->op == TOP_Open || er->op == TOP_Create); 202 pid = er->pid; 203 dmsg("SLAVE[%u]: got new pid:%d\n", disp_get_core_id(), pid); 204 op_id = 0; 205 } else if (er->pid != pid) { 206 printf("REQ does not match current pid (%u). Aborting\n", pid); 207 assert(0); 208 } else if (op == TOP_Exit) { 209 pid = 0; 210 } 211 212 int open_flags = 0; 213 214 switch(op) { 215 case TOP_Create: 216 open_flags = O_CREAT; 217 /* fallthrough */ 218 case TOP_Open: 219 /* set flags */ 220 switch(er->mode) { 221 case FLAGS_RdOnly: 222 open_flags |= O_RDONLY; 223 break; 224 225 case FLAGS_WrOnly: 226 open_flags |= O_WRONLY; 227 break; 228 229 case FLAGS_RdWr: 230 open_flags |= O_RDWR; 231 break; 232 } 233 234 /* allocate a write buffer */ 235 if (open_flags != O_RDONLY) { 236 wbuf_t *wb = tfd2wb[er->fd] = malloc(sizeof(wbuf_t)); 237 assert(wb); 238 wb->w_i = 0; 239 wb->w_fd = er->fd; 240 wb_add(wb); 241 } 242 243 /* assert(fd2fptr[er->fd] == NULL); 244 * the above assertion will fail, because some close() operations are 245 * missing from the file: 246 * $ egrep -c -e 'close' <kernelcompile.trace.anon 247 * 10779 248 * $ egrep -c -e '(open|creat)' <kernelcompile.trace.anon 249 * 10974 */ 250 251 /* open the file */ 252 tfd2fd[er->fd] = open(fname, open_flags, open_mode); 253 tfd2fname[er->fd] = er->fnumsize; 254 if (tfd2fd[er->fd] != -1) { 255 openfiles++; 256 } else { 257 printf("Open file:%s (%d) failed\n", fname, open_flags); 258 perror(fname); 259 assert(0); 260 } 261 break; 262 263 case TOP_Unlink: { 264 int ret = unlink(fname); 265 assert(ret != -1); 266 break; 267 } 268 269 case TOP_Read: { 270 //uint64_t ticks = rdtsc(); 271 if (er->fnumsize > MAX_DATA) { 272 printf("er->fnumsize == %"PRIu32"\n", er->fnumsize); 273 assert(0); 274 } 275 int fd = tfd2fd[er->fd]; 276 int ret = read(fd, data, er->fnumsize); 277 if (ret != er->fnumsize) { 278 dmsg("[R] op_id:%d er->fnumsize:%u, read:%d fname:%d pid:%d error:%d eof:%d pos:%ld\n", op_id, er->fnumsize, ret, fd2fname[er->fd], er->pid, ferror(fptr), feof(fptr), ftell(fptr)); 279 read_fails++; 280 } 281 //ticks = rdtsc() - ticks; 282 //msg("SLAVE[%d] READ %d took %lu ticks (%lf ms)\n", disp_get_core_id(), rdcnt++, ticks, (double)ticks/(double)tscperms); 283 break; 284 } 285 286 case TOP_Write: { 287 if (er->fnumsize > MAX_DATA) { 288 printf("er->fnumsize == %"PRIu32"\n", er->fnumsize); 289 assert(0); 290 } 291 wbuf_t *wb = tfd2wb[er->fd]; 292 int fd = tfd2fd[er->fd]; 293 assert(wb); 294 size_t rem_bytes = er->fnumsize; /* remaining bytes to write */ 295 for (;;) { 296 size_t buff_bytes = WBUF_SIZE - wb->w_i; /* wbuf available bytes */ 297 if (buff_bytes > rem_bytes) { 298 memcpy(wb->w_buf + wb->w_i, data, rem_bytes); 299 wb->w_i += rem_bytes; 300 break; 301 } else { 302 memcpy(wb->w_buf + wb->w_i, data, buff_bytes); 303 int ret = write(fd, wb->w_buf, WBUF_SIZE); 304 if (ret != WBUF_SIZE) { 305 write_fails++; 306 } 307 rem_bytes -= buff_bytes; 308 wb->w_i = 0; 309 } 310 } 311 break; 312 } 313 314 case TOP_Seek: { 315 int fd = tfd2fd[er->fd]; 316 int ret = lseek(fd, er->fnumsize, SEEK_SET); 317 if (ret != er->fnumsize) { 318 seek_fails++; 319 //msg("[S] op_id:%d er->fnumsize:%u, seek:%d fname:%d pid:%d error:%d eof:%d pos:%ld\n", op_id, er->fnumsize, ret, fd2fname[er->fd], er->pid, ferror(fptr), feof(fptr), ftell(fptr)); 320 } 321 break; 322 } 323 324 case TOP_Close: { 325 wbuf_t *wb = tfd2wb[er->fd]; 326 int fd = tfd2fd[er->fd]; 327 if (wb != NULL) { 328 /* flush write buffer */ 329 int r = write(fd, wb->w_buf, wb->w_i); 330 if (r != wb->w_i) { 331 write_fails++; 332 } 333 wb_remove(wb); 334 free(wb); 335 tfd2wb[er->fd] = NULL; 336 } 337 int ret = close(fd); 338 assert(ret == 0); 339 openfiles--; 340 tfd2fd[er->fd] = 0; 341 tfd2fname[er->fd] = 0; 342 break; 343 } 344 345 case TOP_Exit: { 346 wbuf_t *wb; 347 wb = Wbufs.head; 348 for (int i=0; i<Wbufs.cnt; i++) { 349 wbuf_t *wb_tmp = wb->next; 350 int fd = tfd2fd[wb->w_fd]; 351 int r = write(fd, wb->w_buf, wb->w_i); 352 if (r != wb->w_i) { 353 write_fails++; 354 } 355 tfd2wb[wb->w_fd] = 0; 356 free(wb); 357 wb = wb_tmp; 358 } 359 Wbufs.cnt = 0; 360 Wbufs.head = NULL; 361 dmsg("SLAVE[%u]: TOP_Exit on %d\n", disp_get_core_id(), er->pid); 362 break; 363 } 364 365 default: 366 printf("Invalid request: %d\n", op); 367 assert(0); 368 break; 369 } 370} 371 372static void 373handle_event(replay_eventrec_t *er) 374{ 375 uint64_t handle_ticks = rdtsc(); 376 enum top op = er->op; 377 378 do_handle_event(er); 379 380 /* update stats */ 381 handle_ticks = (rdtsc() - handle_ticks); 382 Stats.total_ticks += handle_ticks; 383 Stats.op_count[op]++; 384 Stats.op_ticks[op] += handle_ticks; 385} 386#ifndef __linux__ 387 388static void handle_new_task(struct replay_binding *b, uint64_t bulk_id, uint32_t tes_size) 389{ 390 errval_t err; 391 replay_eventrec_t *tes; 392 size_t tes_nr; 393 int pid; 394 395 tes = bulk_slave_buf_get_mem(&bulk_slave, bulk_id, NULL); 396 tes_nr = tes_size / sizeof(replay_eventrec_t); 397 assert(tes_size % sizeof(replay_eventrec_t) == 0); 398 399 if (tes->op != TOP_Open && tes->op != TOP_Create) { 400 msg("ABORTING: task with pid:%d starts with op:%d\n", tes->pid, tes->op); 401 assert(false); 402 } 403 pid = tes->pid; 404 for (size_t i=0; i<tes_nr; i++) { 405 replay_eventrec_t *er = tes + i; 406 handle_event(er); 407 } 408 409 err = b->tx_vtbl.task_completed(b, NOP_CONT, pid, bulk_id); 410 assert(err_is_ok(err)); 411} 412 413 414static struct replay_rx_vtbl replay_vtbl = { 415 .new_task = handle_new_task, 416 .slave_init = handle_init, 417 .slave_finish = handle_finish, 418 .slave_print_stats = handle_print_stats 419}; 420 421static errval_t connect_cb(void *st, struct replay_binding *b) 422{ 423 b->rx_vtbl = replay_vtbl; 424 return SYS_ERR_OK; 425} 426#endif 427 428int main(int argc, char *argv[]) 429{ 430#ifndef __linux__ 431 assert(argc >= 4); 432#else 433 const int default_port = 1234; 434 if (argc < 2) { 435 fprintf(stderr, "Usage: %s <data_dir> [port (default:%d)]\n", argv[0], default_port); 436 exit(1); 437 } 438#endif 439 defdir = argv[1]; 440 441 msg("===================> replay slave up\n"); 442#ifndef __linux__ 443 assert(err_is_ok(sys_debug_get_tsc_per_ms(&tscperms))); 444 445 errval_t err = vfs_mkdir(argv[2]); 446 if(err_is_fail(err) && err_no(err) != FS_ERR_EXISTS) { 447 DEBUG_ERR(err, "vfs_mkdir"); 448 } 449 /* assert(err_is_ok(err)); */ 450 451 err = vfs_mount(argv[2], argv[3]); 452 assert(err_is_ok(err)); 453 454 err = replay_export(NULL, export_cb, connect_cb, 455 get_default_waitset(), 456 IDC_EXPORT_FLAGS_DEFAULT); 457 assert(err_is_ok(err)); 458 459 msg("%s:%s() :: slave starts servicing requests\n", __FILE__, __FUNCTION__); 460 for(;;) { 461 err = event_dispatch(get_default_waitset()); 462 assert(err_is_ok(err)); 463 } 464#else 465 /* { */ 466 /* struct rlimit rl; */ 467 /* rl.rlim_cur = 2048; */ 468 /* rl.rlim_max = 2050; */ 469 /* int r = setrlimit(RLIMIT_NOFILE, &rl); */ 470 /* if(r == -1) { */ 471 /* printf("setrlimit errno = %s\n", strerror(errno)); */ 472 /* } */ 473 /* assert(r == 0); */ 474 /* } */ 475 476 int port = argc > 2 ? atoi(argv[2]) : default_port; 477 // Listen on port 1234 478 int listensock = socket(AF_INET, SOCK_STREAM, 0); 479 assert(listensock != -1); 480 struct sockaddr_in a = { 481 .sin_family = PF_INET, 482 .sin_port = htons(port), 483 .sin_addr = { 484 .s_addr = htonl(INADDR_ANY) 485 } 486 }; 487 int r = bind(listensock, (struct sockaddr *)&a, sizeof(a)); 488 if(r == -1) { 489 printf("bind: %s\n", strerror(errno)); 490 } 491 assert(r == 0); 492 r = listen(listensock, 5); 493 assert(r == 0); 494 495 socklen_t sizea = sizeof(a); 496 connsock = accept(listensock, (struct sockaddr *)&a, &sizea); 497 assert(connsock != -1); 498 assert(sizea == sizeof(a)); 499 500 int from = (ntohl(a.sin_addr.s_addr) & 0xff) - 1; 501 printf("got connection from %d\n", from); 502 503 504 /* circular buffer: 505 * - writer writes at byte grain 506 * - reader reads at sizeof(replay_eventrec_t) grain */ 507 const size_t er_size_elems = 1024; /* size in elements */ 508 replay_eventrec_t ers[er_size_elems]; 509 const size_t er_size = sizeof(ers); /* size in bytes */ 510 uint64_t er_r, er_w; /* full indices (in bytes) */ 511 512 er_r = er_w = 0; 513 514 for(;;) { 515 516 dmsg("r:%zd w:%zd er_size:%zd\n", er_r, er_w, er_size); 517 size_t er_avail = er_size - (er_w - er_r); 518 size_t er_w_idx = er_w % er_size; 519 char *xfer_start = (char *)ers + er_w_idx; 520 size_t xfer_len = MIN(er_avail, er_size - er_w_idx); 521 522 if (xfer_len == 0) { 523 continue; 524 } 525 526 /* fetch events */ 527 dmsg("RECV: from:%lu len:%zd\n", xfer_start - (char *)ers, xfer_len); 528 ssize_t ret = recv(connsock, xfer_start, xfer_len, 0); 529 if(ret == -1) { 530 perror("recv"); 531 exit(1); 532 } else if (ret == 0) { 533 printf("end of session\n"); 534 break; 535 } 536 dmsg("GOT DATA=%zd!\n", ret); 537 er_w += ret; 538 539 /* handle events */ 540 assert(er_r % sizeof(replay_eventrec_t) == 0); 541 assert(er_w > er_r); 542 while (er_w - er_r >= sizeof(replay_eventrec_t)) { 543 size_t er_r_items = er_r / sizeof(replay_eventrec_t); 544 replay_eventrec_t *er = ers + (er_r_items % er_size_elems); 545 546 handle_event(er); 547 548 // notify server that we are done with a task 549 if (er->op == TOP_Exit) { 550 uint16_t pid = er->pid; 551 dmsg("SENDING PID: %d\n", pid); 552 if (send(connsock, &pid, sizeof(pid), 0) != sizeof(pid)) { 553 perror("send"); 554 exit(1); 555 } 556 } 557 558 // increase read pointer 559 er_r += sizeof(replay_eventrec_t); 560 } 561 } 562#endif 563 564 return 0; 565} 566