1 2#define _POSIX_C_SOURCE 200809L 3 4 5#include <stdio.h> 6#include <stdlib.h> 7#include <assert.h> 8#include <string.h> 9#include <unistd.h> 10#include <errno.h> 11#include <fcntl.h> 12#include <sys/stat.h> 13#ifndef __linux__ 14#include <barrelfish/barrelfish.h> 15#include <vfs/vfs.h> 16#include <barrelfish/nameservice_client.h> 17#include <if/replay_defs.h> 18#include <barrelfish/bulk_transfer.h> 19#else 20#include <stdbool.h> 21#include <sys/types.h> 22#include <sys/socket.h> 23#include <netinet/in.h> 24#include <netinet/ip.h> 25#include <netdb.h> 26#include <errno.h> 27#include <sched.h> 28#include <inttypes.h> 29#include <sys/time.h> 30#include <sys/resource.h> 31#endif 32 33#include "defs.h" 34 35#ifndef MIN 36#define MIN(x,y) (x < y ? x : y) 37#endif 38 39static char *defdir; 40static struct { 41 uint64_t op_ticks[TOPs_Total]; 42 uint64_t op_count[TOPs_Total]; 43 uint64_t total_ticks; 44} Stats = {{0}, {0}, 0}; 45 46//static uint64_t total_ticks=0, open_ticks=0, read_ticks=0, unlink_ticks=0; 47 48#ifndef __linux__ 49static void export_cb(void *st, errval_t err, iref_t iref) 50{ 51 assert(err_is_ok(err)); 52 char name[256]; 53 snprintf(name, 256, "replay_slave.%u", disp_get_core_id()); 54 msg("%s:%s() :: === registering %s\n", __FILE__, __FUNCTION__, name); 55 err = nameservice_register(name, iref); 56 assert(err_is_ok(err)); 57} 58#else 59static int connsock = -1; 60#endif 61 62#define MAX_FD_CONV 256 63#define MAX_DATA (2*1024*1024) 64#define WBUF_SIZE (16*1024*1024) 65 66struct wbuf; 67 68struct wbuf_entry { 69 struct wbuf *next, *prev; 70}; 71 72typedef struct wbuf { 73 int w_fd; 74 size_t w_i; 75 unsigned char w_buf[WBUF_SIZE]; 76 struct wbuf *next, *prev; 77} wbuf_t; 78 79static struct { 80 struct wbuf *head; 81 int cnt; 82} Wbufs = { 83 .head= NULL, 84 .cnt = 0 85}; 86 87static void 88wb_add(wbuf_t *wb) 89{ 90 if (Wbufs.head == NULL) { 91 assert(Wbufs.cnt == 0); 92 Wbufs.head = wb->next = wb->prev = wb; 93 } else { 94 struct wbuf *head = Wbufs.head; 95 wb->next = head; 96 wb->prev = head->prev; 97 head->prev->next = wb; 98 head->prev = wb; 99 } 100 Wbufs.cnt++; 101} 102 103static void 104wb_remove(wbuf_t *wb) 105{ 106 if (Wbufs.cnt == 1) { 107 Wbufs.head = NULL; 108 } else { 109 wb->next->prev = wb->prev; 110 wb->prev->next = wb->next; 111 } 112 Wbufs.cnt--; 113} 114 115static int openfiles = 0, read_fails=0, write_fails=0, seek_fails=0; 116static int tfd2fd[MAX_FD_CONV] = {0}; /* trace fd -> fd */ 117static int tfd2fname[MAX_FD_CONV] = {0}; /* trace fd -> name */ 118wbuf_t *tfd2wb[MAX_FD_CONV] = {0}; /* trace fd -> write buffer */ 119static char data[MAX_DATA]; 120 121#ifndef __linux__ 122static struct bulk_transfer_slave bulk_slave; 123static cycles_t tscperms; 124#endif 125 126#ifdef __linux__ 127static int 128disp_get_core_id(void) 129{ 130 return getpid(); 131} 132static inline uint64_t rdtsc(void) 133{ 134 uint32_t eax, edx; 135 __asm volatile ("rdtsc" : "=a" (eax), "=d" (edx)); 136 return ((uint64_t)edx << 32) | eax; 137} 138#endif 139 140#ifndef __linux__ 141static void handle_init(struct replay_binding *b, struct capref shared_mem, uint32_t size) 142{ 143 errval_t err; 144 vregion_flags_t vspace_fl; 145 146 vspace_fl = VREGION_FLAGS_READ_WRITE; 147 148 // Map the frame in local memory 149 void *pool; 150 err = vspace_map_one_frame_attr(&pool, size, shared_mem, vspace_fl, NULL, NULL); 151 assert(pool != NULL); 152 assert(err_is_ok(err)); 153 154 // Init receiver 155 err = bulk_slave_init(pool, size, &bulk_slave); 156 assert(err_is_ok(err)); 157 msg("%s:%s: done\n", __FILE__, __FUNCTION__); 158 159 err = b->tx_vtbl.slave_init_reply(b, NOP_CONT); 160 assert(err_is_ok(err)); 161} 162 163static void handle_finish(struct replay_binding *b) 164{ 165 errval_t err; 166 err = b->tx_vtbl.slave_finish_reply(b, NOP_CONT); 167 assert(err_is_ok(err)); 168} 169 170static void handle_print_stats(struct replay_binding *b) 171{ 172 errval_t err; 173 msg("SLAVE[%u]: END took %" PRIu64 " ticks (%lf ms)\n", disp_get_core_id(), Stats.total_ticks, (double)Stats.total_ticks/(double)tscperms); 174 for (int i=0; i<TOPs_Total; i++) { 175 uint64_t op_cnt = Stats.op_count[i]; 176 double op_time = (double)Stats.op_ticks[i]/(double)tscperms; 177 msg(" op:%-10s cnt:%8" PRIu64 " time:%13.2lf avg:%9.3lf\n", top2str[i], op_cnt, op_time, op_time/(double)op_cnt); 178 } 179 msg("SLAVE[%u]: CACHE STATISTICS\n", disp_get_core_id()); 180 err = b->tx_vtbl.slave_print_stats_reply(b, NOP_CONT); 181 assert(err_is_ok(err)); 182} 183#endif 184 185static void 186do_handle_event(replay_eventrec_t *er) 187{ 188 static int pid = 0; 189 static int op_id = 0; 190 const int open_mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; 191 192 /* pick a file for this operation */ 193 // (only needed for Open/Create/Unlink) 194 char fname[256]; 195 snprintf(fname, 256, "%s/data/%"PRIu32"", defdir, er->fnumsize); 196 197 198 op_id++; 199 enum top op = er->op; 200 dmsg("SLAVE[%u]: REQ pid:%d op:%d fd:%d fnumsize:%d [op_id:%d] er:%p\n", disp_get_core_id(), er->pid, op, er->fd, er->fnumsize, op_id, er); 201 202 /* - master will send consecutive operations with the same pid 203 * - the pid will change after an TOP_Exit, and a subsequent Open/Create */ 204 if (pid == 0) { 205 assert(er->op == TOP_Open || er->op == TOP_Create); 206 pid = er->pid; 207 dmsg("SLAVE[%u]: got new pid:%d\n", disp_get_core_id(), pid); 208 op_id = 0; 209 } else if (er->pid != pid) { 210 printf("REQ does not match current pid (%u). Aborting\n", pid); 211 assert(0); 212 } else if (op == TOP_Exit) { 213 pid = 0; 214 } 215 216 int open_flags = 0; 217 218 switch(op) { 219 case TOP_Create: 220 open_flags = O_CREAT; 221 /* fallthrough */ 222 case TOP_Open: 223 /* set flags */ 224 switch(er->mode) { 225 case FLAGS_RdOnly: 226 open_flags |= O_RDONLY; 227 break; 228 229 case FLAGS_WrOnly: 230 open_flags |= O_WRONLY; 231 break; 232 233 case FLAGS_RdWr: 234 open_flags |= O_RDWR; 235 break; 236 } 237 238 /* allocate a write buffer */ 239 if (open_flags != O_RDONLY) { 240 wbuf_t *wb = tfd2wb[er->fd] = malloc(sizeof(wbuf_t)); 241 assert(wb); 242 wb->w_i = 0; 243 wb->w_fd = er->fd; 244 wb_add(wb); 245 } 246 247 /* assert(fd2fptr[er->fd] == NULL); 248 * the above assertion will fail, because some close() operations are 249 * missing from the file: 250 * $ egrep -c -e 'close' <kernelcompile.trace.anon 251 * 10779 252 * $ egrep -c -e '(open|creat)' <kernelcompile.trace.anon 253 * 10974 */ 254 255 /* open the file */ 256 tfd2fd[er->fd] = open(fname, open_flags, open_mode); 257 tfd2fname[er->fd] = er->fnumsize; 258 if (tfd2fd[er->fd] != -1) { 259 openfiles++; 260 } else { 261 printf("Open file:%s (%d) failed\n", fname, open_flags); 262 perror(fname); 263 assert(0); 264 } 265 break; 266 267 case TOP_Unlink: { 268 int ret = unlink(fname); 269 assert(ret != -1); 270 break; 271 } 272 273 case TOP_Read: { 274 //uint64_t ticks = rdtsc(); 275 if (er->fnumsize > MAX_DATA) { 276 printf("er->fnumsize == %"PRIu32"\n", er->fnumsize); 277 assert(0); 278 } 279 int fd = tfd2fd[er->fd]; 280 int ret = read(fd, data, er->fnumsize); 281 if (ret != er->fnumsize) { 282 dmsg("[R] op_id:%d er->fnumsize:%u, read:%d fname:%d pid:%d error:%d eof:%d pos:%ld\n", op_id, er->fnumsize, ret, fd2fname[er->fd], er->pid, ferror(fptr), feof(fptr), ftell(fptr)); 283 read_fails++; 284 } 285 //ticks = rdtsc() - ticks; 286 //msg("SLAVE[%d] READ %d took %lu ticks (%lf ms)\n", disp_get_core_id(), rdcnt++, ticks, (double)ticks/(double)tscperms); 287 break; 288 } 289 290 case TOP_Write: { 291 if (er->fnumsize > MAX_DATA) { 292 printf("er->fnumsize == %"PRIu32"\n", er->fnumsize); 293 assert(0); 294 } 295 wbuf_t *wb = tfd2wb[er->fd]; 296 int fd = tfd2fd[er->fd]; 297 assert(wb); 298 size_t rem_bytes = er->fnumsize; /* remaining bytes to write */ 299 for (;;) { 300 size_t buff_bytes = WBUF_SIZE - wb->w_i; /* wbuf available bytes */ 301 if (buff_bytes > rem_bytes) { 302 memcpy(wb->w_buf + wb->w_i, data, rem_bytes); 303 wb->w_i += rem_bytes; 304 break; 305 } else { 306 memcpy(wb->w_buf + wb->w_i, data, buff_bytes); 307 int ret = write(fd, wb->w_buf, WBUF_SIZE); 308 if (ret != WBUF_SIZE) { 309 write_fails++; 310 } 311 rem_bytes -= buff_bytes; 312 wb->w_i = 0; 313 } 314 } 315 break; 316 } 317 318 case TOP_Seek: { 319 int fd = tfd2fd[er->fd]; 320 int ret = lseek(fd, er->fnumsize, SEEK_SET); 321 if (ret != er->fnumsize) { 322 seek_fails++; 323 //msg("[S] op_id:%d er->fnumsize:%u, seek:%d fname:%d pid:%d error:%d eof:%d pos:%ld\n", op_id, er->fnumsize, ret, fd2fname[er->fd], er->pid, ferror(fptr), feof(fptr), ftell(fptr)); 324 } 325 break; 326 } 327 328 case TOP_Close: { 329 wbuf_t *wb = tfd2wb[er->fd]; 330 int fd = tfd2fd[er->fd]; 331 if (wb != NULL) { 332 /* flush write buffer */ 333 int r = write(fd, wb->w_buf, wb->w_i); 334 if (r != wb->w_i) { 335 write_fails++; 336 } 337 wb_remove(wb); 338 free(wb); 339 tfd2wb[er->fd] = NULL; 340 } 341 int ret = close(fd); 342 assert(ret == 0); 343 openfiles--; 344 tfd2fd[er->fd] = 0; 345 tfd2fname[er->fd] = 0; 346 break; 347 } 348 349 case TOP_Exit: { 350 wbuf_t *wb; 351 wb = Wbufs.head; 352 for (int i=0; i<Wbufs.cnt; i++) { 353 wbuf_t *wb_tmp = wb->next; 354 int fd = tfd2fd[wb->w_fd]; 355 int r = write(fd, wb->w_buf, wb->w_i); 356 if (r != wb->w_i) { 357 write_fails++; 358 } 359 tfd2wb[wb->w_fd] = 0; 360 free(wb); 361 wb = wb_tmp; 362 } 363 Wbufs.cnt = 0; 364 Wbufs.head = NULL; 365 dmsg("SLAVE[%u]: TOP_Exit on %d\n", disp_get_core_id(), er->pid); 366 break; 367 } 368 369 default: 370 printf("Invalid request: %d\n", op); 371 assert(0); 372 break; 373 } 374} 375 376static void 377handle_event(replay_eventrec_t *er) 378{ 379 uint64_t handle_ticks = rdtsc(); 380 enum top op = er->op; 381 382 do_handle_event(er); 383 384 /* update stats */ 385 handle_ticks = (rdtsc() - handle_ticks); 386 Stats.total_ticks += handle_ticks; 387 Stats.op_count[op]++; 388 Stats.op_ticks[op] += handle_ticks; 389} 390#ifndef __linux__ 391 392static void handle_new_task(struct replay_binding *b, uint64_t bulk_id, uint32_t tes_size) 393{ 394 errval_t err; 395 replay_eventrec_t *tes; 396 size_t tes_nr; 397 int pid; 398 399 tes = bulk_slave_buf_get_mem(&bulk_slave, bulk_id, NULL); 400 tes_nr = tes_size / sizeof(replay_eventrec_t); 401 assert(tes_size % sizeof(replay_eventrec_t) == 0); 402 403 if (tes->op != TOP_Open && tes->op != TOP_Create) { 404 msg("ABORTING: task with pid:%d starts with op:%d\n", tes->pid, tes->op); 405 assert(false); 406 } 407 pid = tes->pid; 408 for (size_t i=0; i<tes_nr; i++) { 409 replay_eventrec_t *er = tes + i; 410 handle_event(er); 411 } 412 413 err = b->tx_vtbl.task_completed(b, NOP_CONT, pid, bulk_id); 414 assert(err_is_ok(err)); 415} 416 417 418static struct replay_rx_vtbl replay_vtbl = { 419 .new_task = handle_new_task, 420 .slave_init = handle_init, 421 .slave_finish = handle_finish, 422 .slave_print_stats = handle_print_stats 423}; 424 425static errval_t connect_cb(void *st, struct replay_binding *b) 426{ 427 b->rx_vtbl = replay_vtbl; 428 return SYS_ERR_OK; 429} 430#endif 431 432int main(int argc, char *argv[]) 433{ 434#ifndef __linux__ 435 assert(argc >= 4); 436#else 437 const int default_port = 1234; 438 if (argc < 2) { 439 fprintf(stderr, "Usage: %s <data_dir> [port (default:%d)]\n", argv[0], default_port); 440 exit(1); 441 } 442#endif 443 defdir = argv[1]; 444 445 msg("===================> replay slave up\n"); 446#ifndef __linux__ 447 assert(err_is_ok(sys_debug_get_tsc_per_ms(&tscperms))); 448 449 errval_t err = vfs_mkdir(argv[2]); 450 if(err_is_fail(err) && err_no(err) != FS_ERR_EXISTS) { 451 DEBUG_ERR(err, "vfs_mkdir"); 452 } 453 /* assert(err_is_ok(err)); */ 454 455 err = vfs_mount(argv[2], argv[3]); 456 assert(err_is_ok(err)); 457 458 err = replay_export(NULL, export_cb, connect_cb, 459 get_default_waitset(), 460 IDC_EXPORT_FLAGS_DEFAULT); 461 assert(err_is_ok(err)); 462 463 msg("%s:%s() :: slave starts servicing requests\n", __FILE__, __FUNCTION__); 464 for(;;) { 465 err = event_dispatch(get_default_waitset()); 466 assert(err_is_ok(err)); 467 } 468#else 469 /* { */ 470 /* struct rlimit rl; */ 471 /* rl.rlim_cur = 2048; */ 472 /* rl.rlim_max = 2050; */ 473 /* int r = setrlimit(RLIMIT_NOFILE, &rl); */ 474 /* if(r == -1) { */ 475 /* printf("setrlimit errno = %s\n", strerror(errno)); */ 476 /* } */ 477 /* assert(r == 0); */ 478 /* } */ 479 480 int port = argc > 2 ? atoi(argv[2]) : default_port; 481 // Listen on port 1234 482 int listensock = socket(AF_INET, SOCK_STREAM, 0); 483 assert(listensock != -1); 484 struct sockaddr_in a = { 485 .sin_family = PF_INET, 486 .sin_port = htons(port), 487 .sin_addr = { 488 .s_addr = htonl(INADDR_ANY) 489 } 490 }; 491 int r = bind(listensock, (struct sockaddr *)&a, sizeof(a)); 492 if(r == -1) { 493 printf("bind: %s\n", strerror(errno)); 494 } 495 assert(r == 0); 496 r = listen(listensock, 5); 497 assert(r == 0); 498 499 socklen_t sizea = sizeof(a); 500 connsock = accept(listensock, (struct sockaddr *)&a, &sizea); 501 assert(connsock != -1); 502 assert(sizea == sizeof(a)); 503 504 int from = (ntohl(a.sin_addr.s_addr) & 0xff) - 1; 505 printf("got connection from %d\n", from); 506 507 508 /* circular buffer: 509 * - writer writes at byte grain 510 * - reader reads at sizeof(replay_eventrec_t) grain */ 511 const size_t er_size_elems = 1024; /* size in elements */ 512 replay_eventrec_t ers[er_size_elems]; 513 const size_t er_size = sizeof(ers); /* size in bytes */ 514 uint64_t er_r, er_w; /* full indices (in bytes) */ 515 516 er_r = er_w = 0; 517 518 for(;;) { 519 520 dmsg("r:%zd w:%zd er_size:%zd\n", er_r, er_w, er_size); 521 size_t er_avail = er_size - (er_w - er_r); 522 size_t er_w_idx = er_w % er_size; 523 char *xfer_start = (char *)ers + er_w_idx; 524 size_t xfer_len = MIN(er_avail, er_size - er_w_idx); 525 526 if (xfer_len == 0) { 527 continue; 528 } 529 530 /* fetch events */ 531 dmsg("RECV: from:%lu len:%zd\n", xfer_start - (char *)ers, xfer_len); 532 ssize_t ret = recv(connsock, xfer_start, xfer_len, 0); 533 if(ret == -1) { 534 perror("recv"); 535 exit(1); 536 } else if (ret == 0) { 537 printf("end of session\n"); 538 break; 539 } 540 dmsg("GOT DATA=%zd!\n", ret); 541 er_w += ret; 542 543 /* handle events */ 544 assert(er_r % sizeof(replay_eventrec_t) == 0); 545 assert(er_w > er_r); 546 while (er_w - er_r >= sizeof(replay_eventrec_t)) { 547 size_t er_r_items = er_r / sizeof(replay_eventrec_t); 548 replay_eventrec_t *er = ers + (er_r_items % er_size_elems); 549 550 handle_event(er); 551 552 // notify server that we are done with a task 553 if (er->op == TOP_Exit) { 554 uint16_t pid = er->pid; 555 dmsg("SENDING PID: %d\n", pid); 556 if (send(connsock, &pid, sizeof(pid), 0) != sizeof(pid)) { 557 perror("send"); 558 exit(1); 559 } 560 } 561 562 // increase read pointer 563 er_r += sizeof(replay_eventrec_t); 564 } 565 } 566#endif 567 568 return 0; 569} 570