1#include <stdio.h>
2#include <stdlib.h>
3#include <assert.h>
4#include <string.h>
5#include <unistd.h>
6#include <errno.h>
7#include <sys/stat.h>
8#ifndef __linux__
9#include <barrelfish/barrelfish.h>
10#include <vfs/vfs.h>
11#include <barrelfish/nameservice_client.h>
12#include <if/replay_defs.h>
13#include <barrelfish/bulk_transfer.h>
14#else
15#include <stdbool.h>
16#include <sys/types.h>
17#include <fcntl.h>
18#include <sys/socket.h>
19#include <netinet/in.h>
20#include <netinet/ip.h>
21#include <netdb.h>
22#include <errno.h>
23#include <sched.h>
24#include <inttypes.h>
25#include <sys/time.h>
26#include <sys/resource.h>
27#endif
28
29#include "defs.h"
30
#ifndef MIN
/*
 * Smaller of two values.
 * Every argument is parenthesized so that operands containing operators of
 * lower precedence than '<' (e.g. MIN(a | b, c)) expand correctly.
 * NOTE: arguments are evaluated more than once; do not pass expressions
 * with side effects.
 */
#define MIN(x,y) ((x) < (y) ? (x) : (y))
#endif
34
35static char *defdir;
36static struct {
37    uint64_t op_ticks[TOPs_Total];
38    uint64_t op_count[TOPs_Total];
39    uint64_t total_ticks;
40} Stats = {{0}, {0}, 0};
41
42//static uint64_t total_ticks=0, open_ticks=0, read_ticks=0, unlink_ticks=0;
43
44#ifndef __linux__
45static void export_cb(void *st, errval_t err, iref_t iref)
46{
47    assert(err_is_ok(err));
48    char name[256];
49    snprintf(name, 256, "replay_slave.%u", disp_get_core_id());
50    msg("%s:%s() :: === registering %s\n", __FILE__, __FUNCTION__, name);
51    err = nameservice_register(name, iref);
52    assert(err_is_ok(err));
53}
54#else
/* Socket of the accepted master connection (Linux build); -1 until main() accepts one. */
static int connsock = -1;
56#endif
57
/* Number of entries in the trace-fd lookup tables (tfd2fd, tfd2fname, tfd2wb). */
#define MAX_FD_CONV     256
/* Largest size accepted for a single read/write event: 2 MiB. */
#define MAX_DATA        (2 * 1024 * 1024)
/* Capacity of one per-file write buffer: 16 MiB. */
#define WBUF_SIZE       (16 * 1024 * 1024)
61
62struct wbuf;
63
64struct wbuf_entry {
65    struct wbuf *next, *prev;
66};
67
68typedef struct wbuf {
69    int           w_fd;
70    size_t        w_i;
71    unsigned char w_buf[WBUF_SIZE];
72    struct wbuf   *next, *prev;
73} wbuf_t;
74
/* Circular list of all live write buffers: first element and element count. */
static struct {
    struct wbuf *head;
    int cnt;
} Wbufs = { NULL, 0 };
82
83static void
84wb_add(wbuf_t *wb)
85{
86    if (Wbufs.head == NULL) {
87        assert(Wbufs.cnt == 0);
88        Wbufs.head = wb->next = wb->prev = wb;
89    } else {
90        struct wbuf *head = Wbufs.head;
91        wb->next = head;
92        wb->prev = head->prev;
93        head->prev->next = wb;
94        head->prev = wb;
95    }
96    Wbufs.cnt++;
97}
98
99static void
100wb_remove(wbuf_t *wb)
101{
102    if (Wbufs.cnt == 1) {
103        Wbufs.head = NULL;
104    } else {
105        wb->next->prev = wb->prev;
106        wb->prev->next = wb->next;
107    }
108    Wbufs.cnt--;
109}
110
111static int openfiles = 0, read_fails=0, write_fails=0, seek_fails=0;
112static int tfd2fd[MAX_FD_CONV] = {0};    /* trace fd -> fd */
113static int tfd2fname[MAX_FD_CONV] = {0}; /* trace fd -> name */
114wbuf_t    *tfd2wb[MAX_FD_CONV] = {0};    /* trace fd -> write buffer */
115static char data[MAX_DATA];
116
117#ifndef __linux__
/* Bulk-transfer receiver state; initialised by handle_init(). */
static struct bulk_transfer_slave bulk_slave;
/* TSC ticks per millisecond, obtained in main(); used to convert ticks to ms. */
static uint64_t tscperms;
120#endif
121
122#ifdef __linux__
/*
 * Linux replacement for disp_get_core_id(): the slave is identified by
 * its process id.
 */
static int
disp_get_core_id(void)
{
    const int id = getpid();

    return id;
}
/*
 * Read the time-stamp counter with the "rdtsc" instruction and return it
 * as a single 64-bit value (EDX holds the high half, EAX the low half).
 */
static inline uint64_t rdtsc(void)
{
    uint32_t lo, hi;

    __asm volatile ("rdtsc" : "=a" (lo), "=d" (hi));

    uint64_t tsc = hi;
    tsc <<= 32;
    tsc |= lo;
    return tsc;
}
134#endif
135
136#ifndef __linux__
137static void handle_init(struct replay_binding *b, struct capref shared_mem, uint32_t size)
138{
139    errval_t err;
140    vregion_flags_t vspace_fl;
141
142    vspace_fl = VREGION_FLAGS_READ_WRITE;
143
144    // Map the frame in local memory
145    void *pool;
146    err = vspace_map_one_frame_attr(&pool, size, shared_mem, vspace_fl, NULL, NULL);
147    assert(pool != NULL);
148    assert(err_is_ok(err));
149
150    // Init receiver
151    err = bulk_slave_init(pool, size, &bulk_slave);
152    assert(err_is_ok(err));
153    msg("%s:%s: done\n", __FILE__, __FUNCTION__);
154
155    err = b->tx_vtbl.slave_init_reply(b, NOP_CONT);
156    assert(err_is_ok(err));
157}
158
159static void handle_finish(struct replay_binding *b)
160{
161    errval_t err;
162    err = b->tx_vtbl.slave_finish_reply(b, NOP_CONT);
163    assert(err_is_ok(err));
164}
165
166static void handle_print_stats(struct replay_binding *b)
167{
168    errval_t err;
169    msg("SLAVE[%u]: END took %" PRIu64 " ticks (%lf ms)\n", disp_get_core_id(), Stats.total_ticks, (double)Stats.total_ticks/(double)tscperms);
170    for (int i=0; i<TOPs_Total; i++) {
171        uint64_t op_cnt = Stats.op_count[i];
172        double op_time = (double)Stats.op_ticks[i]/(double)tscperms;
173        msg(" op:%-10s cnt:%8" PRIu64  " time:%13.2lf avg:%9.3lf\n", top2str[i], op_cnt, op_time, op_time/(double)op_cnt);
174    }
175    msg("SLAVE[%u]: CACHE STATISTICS\n", disp_get_core_id());
176    err = b->tx_vtbl.slave_print_stats_reply(b, NOP_CONT);
177    assert(err_is_ok(err));
178}
179#endif
180
181static void
182do_handle_event(replay_eventrec_t *er)
183{
184    static int pid = 0;
185    static int op_id = 0;
186    const int open_mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
187
188    /* pick a file for this operation */
189    // (only needed for Open/Create/Unlink)
190    char fname[256];
191    snprintf(fname, 256, "%s/data/%"PRIu32"", defdir, er->fnumsize);
192
193
194    op_id++;
195    enum top op = er->op;
196    dmsg("SLAVE[%u]: REQ pid:%d op:%d fd:%d fnumsize:%d [op_id:%d] er:%p\n", disp_get_core_id(), er->pid, op, er->fd, er->fnumsize, op_id, er);
197
198    /* - master will send consecutive operations with the same pid
199     * - the pid will change after an TOP_Exit, and a subsequent Open/Create */
200    if (pid == 0) {
201        assert(er->op == TOP_Open || er->op == TOP_Create);
202        pid = er->pid;
203        dmsg("SLAVE[%u]: got new pid:%d\n", disp_get_core_id(), pid);
204        op_id = 0;
205    } else if (er->pid != pid) {
206        printf("REQ does not match current pid (%u). Aborting\n", pid);
207        assert(0);
208    } else if (op == TOP_Exit) {
209        pid = 0;
210    }
211
212    int open_flags = 0;
213
214    switch(op) {
215    case TOP_Create:
216        open_flags = O_CREAT;
217    /* fallthrough */
218    case TOP_Open:
219        /* set flags */
220        switch(er->mode) {
221        case FLAGS_RdOnly:
222            open_flags |= O_RDONLY;
223            break;
224
225        case FLAGS_WrOnly:
226            open_flags |= O_WRONLY;
227            break;
228
229        case FLAGS_RdWr:
230            open_flags |= O_RDWR;
231            break;
232        }
233
234        /* allocate a write buffer */
235        if (open_flags != O_RDONLY) {
236            wbuf_t *wb = tfd2wb[er->fd] = malloc(sizeof(wbuf_t));
237            assert(wb);
238            wb->w_i = 0;
239            wb->w_fd = er->fd;
240            wb_add(wb);
241        }
242
243        /* assert(fd2fptr[er->fd] == NULL);
244         * the above assertion will fail, because some close() operations are
245         * missing from the file:
246         *  $ egrep -c -e 'close' <kernelcompile.trace.anon
247         *  10779
248         *  $ egrep -c -e '(open|creat)' <kernelcompile.trace.anon
249         *  10974  */
250
251        /* open the file */
252        tfd2fd[er->fd] = open(fname, open_flags, open_mode);
253        tfd2fname[er->fd] = er->fnumsize;
254        if (tfd2fd[er->fd] != -1) {
255            openfiles++;
256        } else {
257            printf("Open file:%s (%d) failed\n", fname, open_flags);
258            perror(fname);
259            assert(0);
260        }
261        break;
262
263    case TOP_Unlink: {
264        int ret = unlink(fname);
265        assert(ret != -1);
266        break;
267    }
268
269    case TOP_Read: {
270        //uint64_t ticks = rdtsc();
271        if (er->fnumsize > MAX_DATA) {
272            printf("er->fnumsize == %"PRIu32"\n", er->fnumsize);
273            assert(0);
274        }
275        int fd = tfd2fd[er->fd];
276        int ret = read(fd, data, er->fnumsize);
277        if (ret != er->fnumsize) {
278            dmsg("[R] op_id:%d er->fnumsize:%u, read:%d fname:%d pid:%d error:%d eof:%d pos:%ld\n", op_id, er->fnumsize, ret, fd2fname[er->fd], er->pid, ferror(fptr), feof(fptr), ftell(fptr));
279            read_fails++;
280        }
281        //ticks = rdtsc() - ticks;
282        //msg("SLAVE[%d] READ %d took %lu ticks (%lf ms)\n", disp_get_core_id(), rdcnt++, ticks, (double)ticks/(double)tscperms);
283        break;
284    }
285
286    case TOP_Write: {
287        if (er->fnumsize > MAX_DATA) {
288            printf("er->fnumsize == %"PRIu32"\n", er->fnumsize);
289            assert(0);
290        }
291        wbuf_t *wb = tfd2wb[er->fd];
292        int fd = tfd2fd[er->fd];
293        assert(wb);
294        size_t rem_bytes = er->fnumsize; /* remaining bytes to write */
295        for (;;) {
296            size_t buff_bytes = WBUF_SIZE - wb->w_i; /* wbuf available bytes */
297            if (buff_bytes > rem_bytes) {
298                memcpy(wb->w_buf + wb->w_i, data, rem_bytes);
299                wb->w_i += rem_bytes;
300                break;
301            } else {
302                memcpy(wb->w_buf + wb->w_i, data, buff_bytes);
303                int ret = write(fd, wb->w_buf, WBUF_SIZE);
304                if (ret != WBUF_SIZE) {
305                    write_fails++;
306                }
307                rem_bytes -= buff_bytes;
308                wb->w_i = 0;
309            }
310        }
311        break;
312    }
313
314    case TOP_Seek: {
315        int fd = tfd2fd[er->fd];
316        int ret = lseek(fd, er->fnumsize, SEEK_SET);
317        if (ret != er->fnumsize) {
318            seek_fails++;
319            //msg("[S] op_id:%d er->fnumsize:%u, seek:%d fname:%d pid:%d error:%d eof:%d pos:%ld\n", op_id, er->fnumsize, ret, fd2fname[er->fd], er->pid, ferror(fptr), feof(fptr), ftell(fptr));
320        }
321        break;
322    }
323
324    case TOP_Close: {
325        wbuf_t *wb = tfd2wb[er->fd];
326        int fd = tfd2fd[er->fd];
327        if (wb != NULL) {
328            /* flush write buffer */
329            int r = write(fd, wb->w_buf, wb->w_i);
330            if (r != wb->w_i) {
331                write_fails++;
332            }
333            wb_remove(wb);
334            free(wb);
335            tfd2wb[er->fd] = NULL;
336        }
337        int ret = close(fd);
338        assert(ret == 0);
339        openfiles--;
340        tfd2fd[er->fd] = 0;
341        tfd2fname[er->fd] = 0;
342        break;
343    }
344
345    case TOP_Exit: {
346        wbuf_t *wb;
347        wb = Wbufs.head;
348        for (int i=0; i<Wbufs.cnt; i++) {
349            wbuf_t *wb_tmp = wb->next;
350            int fd = tfd2fd[wb->w_fd];
351            int r = write(fd, wb->w_buf, wb->w_i);
352            if (r != wb->w_i) {
353                write_fails++;
354            }
355            tfd2wb[wb->w_fd] = 0;
356            free(wb);
357            wb = wb_tmp;
358        }
359        Wbufs.cnt  = 0;
360        Wbufs.head = NULL;
361        dmsg("SLAVE[%u]: TOP_Exit on %d\n", disp_get_core_id(), er->pid);
362        break;
363    }
364
365    default:
366        printf("Invalid request: %d\n", op);
367        assert(0);
368        break;
369    }
370}
371
372static void
373handle_event(replay_eventrec_t *er)
374{
375    uint64_t handle_ticks = rdtsc();
376    enum top op = er->op;
377
378    do_handle_event(er);
379
380    /* update stats */
381    handle_ticks = (rdtsc() - handle_ticks);
382    Stats.total_ticks += handle_ticks;
383    Stats.op_count[op]++;
384    Stats.op_ticks[op] += handle_ticks;
385}
386#ifndef __linux__
387
388static void handle_new_task(struct replay_binding *b, uint64_t bulk_id, uint32_t tes_size)
389{
390    errval_t err;
391    replay_eventrec_t *tes;
392    size_t tes_nr;
393    int pid;
394
395    tes = bulk_slave_buf_get_mem(&bulk_slave, bulk_id, NULL);
396    tes_nr = tes_size / sizeof(replay_eventrec_t);
397    assert(tes_size % sizeof(replay_eventrec_t) == 0);
398
399    if (tes->op != TOP_Open && tes->op != TOP_Create) {
400        msg("ABORTING: task with pid:%d starts with op:%d\n", tes->pid, tes->op);
401        assert(false);
402    }
403    pid = tes->pid;
404    for (size_t i=0; i<tes_nr; i++) {
405        replay_eventrec_t *er = tes + i;
406        handle_event(er);
407    }
408
409    err = b->tx_vtbl.task_completed(b, NOP_CONT, pid, bulk_id);
410    assert(err_is_ok(err));
411}
412
413
414static struct replay_rx_vtbl replay_vtbl = {
415    .new_task          = handle_new_task,
416    .slave_init        = handle_init,
417    .slave_finish      = handle_finish,
418    .slave_print_stats = handle_print_stats
419};
420
421static errval_t connect_cb(void *st, struct replay_binding *b)
422{
423    b->rx_vtbl = replay_vtbl;
424    return SYS_ERR_OK;
425}
426#endif
427
428int main(int argc, char *argv[])
429{
430#ifndef __linux__
431    assert(argc >= 4);
432#else
433    const int default_port = 1234;
434    if (argc < 2) {
435        fprintf(stderr, "Usage: %s <data_dir> [port (default:%d)]\n", argv[0], default_port);
436        exit(1);
437    }
438#endif
439    defdir = argv[1];
440
441    msg("===================> replay slave up\n");
442#ifndef __linux__
443    assert(err_is_ok(sys_debug_get_tsc_per_ms(&tscperms)));
444
445    errval_t err = vfs_mkdir(argv[2]);
446    if(err_is_fail(err) && err_no(err) != FS_ERR_EXISTS) {
447        DEBUG_ERR(err, "vfs_mkdir");
448    }
449    /* assert(err_is_ok(err)); */
450
451    err = vfs_mount(argv[2], argv[3]);
452    assert(err_is_ok(err));
453
454    err = replay_export(NULL, export_cb, connect_cb,
455                        get_default_waitset(),
456                        IDC_EXPORT_FLAGS_DEFAULT);
457    assert(err_is_ok(err));
458
459    msg("%s:%s() :: slave starts servicing requests\n", __FILE__, __FUNCTION__);
460    for(;;) {
461        err = event_dispatch(get_default_waitset());
462        assert(err_is_ok(err));
463    }
464#else
465    /* { */
466    /*     struct rlimit rl; */
467    /*     rl.rlim_cur = 2048; */
468    /*     rl.rlim_max = 2050; */
469    /*     int r = setrlimit(RLIMIT_NOFILE, &rl); */
470    /*     if(r == -1) { */
471    /*         printf("setrlimit errno = %s\n", strerror(errno)); */
472    /*     } */
473    /*     assert(r == 0); */
474    /* } */
475
476    int port = argc > 2 ? atoi(argv[2]) : default_port;
477    // Listen on port 1234
478    int listensock = socket(AF_INET, SOCK_STREAM, 0);
479    assert(listensock != -1);
480    struct sockaddr_in a = {
481        .sin_family = PF_INET,
482        .sin_port = htons(port),
483        .sin_addr = {
484            .s_addr = htonl(INADDR_ANY)
485        }
486    };
487    int r = bind(listensock, (struct sockaddr *)&a, sizeof(a));
488    if(r == -1) {
489        printf("bind: %s\n", strerror(errno));
490    }
491    assert(r == 0);
492    r = listen(listensock, 5);
493    assert(r == 0);
494
495    socklen_t sizea = sizeof(a);
496    connsock = accept(listensock, (struct sockaddr *)&a, &sizea);
497    assert(connsock != -1);
498    assert(sizea == sizeof(a));
499
500    int from = (ntohl(a.sin_addr.s_addr) & 0xff) - 1;
501    printf("got connection from %d\n", from);
502
503
504    /* circular buffer:
505     *  - writer writes at byte grain
506     *  - reader reads at sizeof(replay_eventrec_t) grain */
507    const size_t er_size_elems = 1024;   /* size in elements */
508    replay_eventrec_t ers[er_size_elems];
509    const size_t er_size = sizeof(ers);   /* size in bytes */
510    uint64_t er_r, er_w;                  /* full indices (in bytes) */
511
512    er_r = er_w = 0;
513
514    for(;;) {
515
516        dmsg("r:%zd w:%zd er_size:%zd\n", er_r, er_w, er_size);
517        size_t er_avail = er_size - (er_w - er_r);
518        size_t er_w_idx = er_w % er_size;
519        char *xfer_start = (char *)ers + er_w_idx;
520        size_t xfer_len = MIN(er_avail, er_size - er_w_idx);
521
522        if (xfer_len == 0) {
523            continue;
524        }
525
526        /* fetch events */
527        dmsg("RECV: from:%lu len:%zd\n", xfer_start - (char  *)ers, xfer_len);
528        ssize_t ret = recv(connsock, xfer_start, xfer_len, 0);
529        if(ret == -1) {
530            perror("recv");
531            exit(1);
532        } else if (ret == 0) {
533            printf("end of session\n");
534            break;
535        }
536        dmsg("GOT DATA=%zd!\n", ret);
537        er_w += ret;
538
539        /* handle events */
540        assert(er_r % sizeof(replay_eventrec_t) == 0);
541        assert(er_w > er_r);
542        while (er_w - er_r >= sizeof(replay_eventrec_t)) {
543            size_t er_r_items = er_r / sizeof(replay_eventrec_t);
544            replay_eventrec_t *er = ers + (er_r_items % er_size_elems);
545
546            handle_event(er);
547
548            // notify server that we are done with a task
549            if (er->op == TOP_Exit) {
550                uint16_t pid = er->pid;
551                dmsg("SENDING PID: %d\n", pid);
552                if (send(connsock, &pid, sizeof(pid), 0) != sizeof(pid)) {
553                    perror("send");
554                    exit(1);
555                }
556            }
557
558            // increase read pointer
559            er_r += sizeof(replay_eventrec_t);
560        }
561    }
562#endif
563
564    return 0;
565}
566