1
2#define _POSIX_C_SOURCE 200809L
3
4
5#include <stdio.h>
6#include <stdlib.h>
7#include <assert.h>
8#include <string.h>
9#include <unistd.h>
10#include <errno.h>
11#include <fcntl.h>
12#include <sys/stat.h>
13#ifndef __linux__
14#include <barrelfish/barrelfish.h>
15#include <vfs/vfs.h>
16#include <barrelfish/nameservice_client.h>
17#include <if/replay_defs.h>
18#include <barrelfish/bulk_transfer.h>
19#else
20#include <stdbool.h>
21#include <sys/types.h>
22#include <sys/socket.h>
23#include <netinet/in.h>
24#include <netinet/ip.h>
25#include <netdb.h>
26#include <errno.h>
27#include <sched.h>
28#include <inttypes.h>
29#include <sys/time.h>
30#include <sys/resource.h>
31#endif
32
33#include "defs.h"
34
#ifndef MIN
/* Smaller of two values.  Every argument use and the whole expansion are
 * parenthesized so that operator precedence at the call site cannot change
 * the result.  Each argument may still be evaluated twice, so do not pass
 * expressions with side effects. */
#define MIN(x,y) ((x) < (y) ? (x) : (y))
#endif
38
39static char *defdir;
40static struct {
41    uint64_t op_ticks[TOPs_Total];
42    uint64_t op_count[TOPs_Total];
43    uint64_t total_ticks;
44} Stats = {{0}, {0}, 0};
45
46//static uint64_t total_ticks=0, open_ticks=0, read_ticks=0, unlink_ticks=0;
47
#ifndef __linux__
/* Callback passed to replay_export(): once the export has succeeded,
 * register the interface reference with the name service under
 * "replay_slave.<core id>". */
static void export_cb(void *st, errval_t err, iref_t iref)
{
    char name[256];

    assert(err_is_ok(err));

    snprintf(name, sizeof(name), "replay_slave.%u", disp_get_core_id());
    msg("%s:%s() :: === registering %s\n", __FILE__, __FUNCTION__, name);

    err = nameservice_register(name, iref);
    assert(err_is_ok(err));
}
#else
/* Socket of the accepted master connection; stays -1 until main() has
 * accepted one. */
static int connsock = -1;
#endif
61
/* Number of slots in the trace-fd translation tables. */
#define MAX_FD_CONV     256
/* Size in bytes of the scratch buffer used for reads and writes. */
#define MAX_DATA        (2 * 1024 * 1024)
/* Capacity in bytes of one per-file write buffer. */
#define WBUF_SIZE       (16 * 1024 * 1024)

struct wbuf;

/* Pair of list links to write buffers. */
struct wbuf_entry {
    struct wbuf *next;
    struct wbuf *prev;
};

/* Write buffer attached to one trace fd; live buffers are chained into a
 * circular doubly-linked list through next/prev. */
struct wbuf {
    int           w_fd;              /* trace fd this buffer belongs to */
    size_t        w_i;               /* bytes currently held in w_buf */
    unsigned char w_buf[WBUF_SIZE];  /* buffered data */
    struct wbuf   *next;
    struct wbuf   *prev;
};

typedef struct wbuf wbuf_t;
78
/* Circular list of all live write buffers: `head` is NULL exactly when the
 * list is empty, `cnt` is the number of buffers linked in. */
static struct {
    struct wbuf *head;
    int cnt;
} Wbufs = { NULL, 0 };
86
87static void
88wb_add(wbuf_t *wb)
89{
90    if (Wbufs.head == NULL) {
91        assert(Wbufs.cnt == 0);
92        Wbufs.head = wb->next = wb->prev = wb;
93    } else {
94        struct wbuf *head = Wbufs.head;
95        wb->next = head;
96        wb->prev = head->prev;
97        head->prev->next = wb;
98        head->prev = wb;
99    }
100    Wbufs.cnt++;
101}
102
103static void
104wb_remove(wbuf_t *wb)
105{
106    if (Wbufs.cnt == 1) {
107        Wbufs.head = NULL;
108    } else {
109        wb->next->prev = wb->prev;
110        wb->prev->next = wb->next;
111    }
112    Wbufs.cnt--;
113}
114
115static int openfiles = 0, read_fails=0, write_fails=0, seek_fails=0;
116static int tfd2fd[MAX_FD_CONV] = {0};    /* trace fd -> fd */
117static int tfd2fname[MAX_FD_CONV] = {0}; /* trace fd -> name */
118wbuf_t    *tfd2wb[MAX_FD_CONV] = {0};    /* trace fd -> write buffer */
119static char data[MAX_DATA];
120
121#ifndef __linux__
122static struct bulk_transfer_slave bulk_slave;
123static cycles_t tscperms;
124#endif
125
126#ifdef __linux__
/* Linux stand-in for the identifier the Barrelfish build obtains from
 * disp_get_core_id(): here the process id identifies the slave. */
static int disp_get_core_id(void)
{
    const int id = getpid();

    return id;
}
/* Read the x86 time-stamp counter: the RDTSC instruction returns the low
 * 32 bits in EAX and the high 32 bits in EDX, which are combined into one
 * 64-bit value. */
static inline uint64_t rdtsc(void)
{
    uint32_t lo;
    uint32_t hi;

    __asm volatile ("rdtsc" : "=a" (lo), "=d" (hi));

    uint64_t tsc = (uint64_t)hi << 32;
    tsc |= lo;
    return tsc;
}
138#endif
139
140#ifndef __linux__
141static void handle_init(struct replay_binding *b, struct capref shared_mem, uint32_t size)
142{
143    errval_t err;
144    vregion_flags_t vspace_fl;
145
146    vspace_fl = VREGION_FLAGS_READ_WRITE;
147
148    // Map the frame in local memory
149    void *pool;
150    err = vspace_map_one_frame_attr(&pool, size, shared_mem, vspace_fl, NULL, NULL);
151    assert(pool != NULL);
152    assert(err_is_ok(err));
153
154    // Init receiver
155    err = bulk_slave_init(pool, size, &bulk_slave);
156    assert(err_is_ok(err));
157    msg("%s:%s: done\n", __FILE__, __FUNCTION__);
158
159    err = b->tx_vtbl.slave_init_reply(b, NOP_CONT);
160    assert(err_is_ok(err));
161}
162
163static void handle_finish(struct replay_binding *b)
164{
165    errval_t err;
166    err = b->tx_vtbl.slave_finish_reply(b, NOP_CONT);
167    assert(err_is_ok(err));
168}
169
170static void handle_print_stats(struct replay_binding *b)
171{
172    errval_t err;
173    msg("SLAVE[%u]: END took %" PRIu64 " ticks (%lf ms)\n", disp_get_core_id(), Stats.total_ticks, (double)Stats.total_ticks/(double)tscperms);
174    for (int i=0; i<TOPs_Total; i++) {
175        uint64_t op_cnt = Stats.op_count[i];
176        double op_time = (double)Stats.op_ticks[i]/(double)tscperms;
177        msg(" op:%-10s cnt:%8" PRIu64  " time:%13.2lf avg:%9.3lf\n", top2str[i], op_cnt, op_time, op_time/(double)op_cnt);
178    }
179    msg("SLAVE[%u]: CACHE STATISTICS\n", disp_get_core_id());
180    err = b->tx_vtbl.slave_print_stats_reply(b, NOP_CONT);
181    assert(err_is_ok(err));
182}
183#endif
184
185static void
186do_handle_event(replay_eventrec_t *er)
187{
188    static int pid = 0;
189    static int op_id = 0;
190    const int open_mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
191
192    /* pick a file for this operation */
193    // (only needed for Open/Create/Unlink)
194    char fname[256];
195    snprintf(fname, 256, "%s/data/%"PRIu32"", defdir, er->fnumsize);
196
197
198    op_id++;
199    enum top op = er->op;
200    dmsg("SLAVE[%u]: REQ pid:%d op:%d fd:%d fnumsize:%d [op_id:%d] er:%p\n", disp_get_core_id(), er->pid, op, er->fd, er->fnumsize, op_id, er);
201
202    /* - master will send consecutive operations with the same pid
203     * - the pid will change after an TOP_Exit, and a subsequent Open/Create */
204    if (pid == 0) {
205        assert(er->op == TOP_Open || er->op == TOP_Create);
206        pid = er->pid;
207        dmsg("SLAVE[%u]: got new pid:%d\n", disp_get_core_id(), pid);
208        op_id = 0;
209    } else if (er->pid != pid) {
210        printf("REQ does not match current pid (%u). Aborting\n", pid);
211        assert(0);
212    } else if (op == TOP_Exit) {
213        pid = 0;
214    }
215
216    int open_flags = 0;
217
218    switch(op) {
219    case TOP_Create:
220        open_flags = O_CREAT;
221    /* fallthrough */
222    case TOP_Open:
223        /* set flags */
224        switch(er->mode) {
225        case FLAGS_RdOnly:
226            open_flags |= O_RDONLY;
227            break;
228
229        case FLAGS_WrOnly:
230            open_flags |= O_WRONLY;
231            break;
232
233        case FLAGS_RdWr:
234            open_flags |= O_RDWR;
235            break;
236        }
237
238        /* allocate a write buffer */
239        if (open_flags != O_RDONLY) {
240            wbuf_t *wb = tfd2wb[er->fd] = malloc(sizeof(wbuf_t));
241            assert(wb);
242            wb->w_i = 0;
243            wb->w_fd = er->fd;
244            wb_add(wb);
245        }
246
247        /* assert(fd2fptr[er->fd] == NULL);
248         * the above assertion will fail, because some close() operations are
249         * missing from the file:
250         *  $ egrep -c -e 'close' <kernelcompile.trace.anon
251         *  10779
252         *  $ egrep -c -e '(open|creat)' <kernelcompile.trace.anon
253         *  10974  */
254
255        /* open the file */
256        tfd2fd[er->fd] = open(fname, open_flags, open_mode);
257        tfd2fname[er->fd] = er->fnumsize;
258        if (tfd2fd[er->fd] != -1) {
259            openfiles++;
260        } else {
261            printf("Open file:%s (%d) failed\n", fname, open_flags);
262            perror(fname);
263            assert(0);
264        }
265        break;
266
267    case TOP_Unlink: {
268        int ret = unlink(fname);
269        assert(ret != -1);
270        break;
271    }
272
273    case TOP_Read: {
274        //uint64_t ticks = rdtsc();
275        if (er->fnumsize > MAX_DATA) {
276            printf("er->fnumsize == %"PRIu32"\n", er->fnumsize);
277            assert(0);
278        }
279        int fd = tfd2fd[er->fd];
280        int ret = read(fd, data, er->fnumsize);
281        if (ret != er->fnumsize) {
282            dmsg("[R] op_id:%d er->fnumsize:%u, read:%d fname:%d pid:%d error:%d eof:%d pos:%ld\n", op_id, er->fnumsize, ret, fd2fname[er->fd], er->pid, ferror(fptr), feof(fptr), ftell(fptr));
283            read_fails++;
284        }
285        //ticks = rdtsc() - ticks;
286        //msg("SLAVE[%d] READ %d took %lu ticks (%lf ms)\n", disp_get_core_id(), rdcnt++, ticks, (double)ticks/(double)tscperms);
287        break;
288    }
289
290    case TOP_Write: {
291        if (er->fnumsize > MAX_DATA) {
292            printf("er->fnumsize == %"PRIu32"\n", er->fnumsize);
293            assert(0);
294        }
295        wbuf_t *wb = tfd2wb[er->fd];
296        int fd = tfd2fd[er->fd];
297        assert(wb);
298        size_t rem_bytes = er->fnumsize; /* remaining bytes to write */
299        for (;;) {
300            size_t buff_bytes = WBUF_SIZE - wb->w_i; /* wbuf available bytes */
301            if (buff_bytes > rem_bytes) {
302                memcpy(wb->w_buf + wb->w_i, data, rem_bytes);
303                wb->w_i += rem_bytes;
304                break;
305            } else {
306                memcpy(wb->w_buf + wb->w_i, data, buff_bytes);
307                int ret = write(fd, wb->w_buf, WBUF_SIZE);
308                if (ret != WBUF_SIZE) {
309                    write_fails++;
310                }
311                rem_bytes -= buff_bytes;
312                wb->w_i = 0;
313            }
314        }
315        break;
316    }
317
318    case TOP_Seek: {
319        int fd = tfd2fd[er->fd];
320        int ret = lseek(fd, er->fnumsize, SEEK_SET);
321        if (ret != er->fnumsize) {
322            seek_fails++;
323            //msg("[S] op_id:%d er->fnumsize:%u, seek:%d fname:%d pid:%d error:%d eof:%d pos:%ld\n", op_id, er->fnumsize, ret, fd2fname[er->fd], er->pid, ferror(fptr), feof(fptr), ftell(fptr));
324        }
325        break;
326    }
327
328    case TOP_Close: {
329        wbuf_t *wb = tfd2wb[er->fd];
330        int fd = tfd2fd[er->fd];
331        if (wb != NULL) {
332            /* flush write buffer */
333            int r = write(fd, wb->w_buf, wb->w_i);
334            if (r != wb->w_i) {
335                write_fails++;
336            }
337            wb_remove(wb);
338            free(wb);
339            tfd2wb[er->fd] = NULL;
340        }
341        int ret = close(fd);
342        assert(ret == 0);
343        openfiles--;
344        tfd2fd[er->fd] = 0;
345        tfd2fname[er->fd] = 0;
346        break;
347    }
348
349    case TOP_Exit: {
350        wbuf_t *wb;
351        wb = Wbufs.head;
352        for (int i=0; i<Wbufs.cnt; i++) {
353            wbuf_t *wb_tmp = wb->next;
354            int fd = tfd2fd[wb->w_fd];
355            int r = write(fd, wb->w_buf, wb->w_i);
356            if (r != wb->w_i) {
357                write_fails++;
358            }
359            tfd2wb[wb->w_fd] = 0;
360            free(wb);
361            wb = wb_tmp;
362        }
363        Wbufs.cnt  = 0;
364        Wbufs.head = NULL;
365        dmsg("SLAVE[%u]: TOP_Exit on %d\n", disp_get_core_id(), er->pid);
366        break;
367    }
368
369    default:
370        printf("Invalid request: %d\n", op);
371        assert(0);
372        break;
373    }
374}
375
376static void
377handle_event(replay_eventrec_t *er)
378{
379    uint64_t handle_ticks = rdtsc();
380    enum top op = er->op;
381
382    do_handle_event(er);
383
384    /* update stats */
385    handle_ticks = (rdtsc() - handle_ticks);
386    Stats.total_ticks += handle_ticks;
387    Stats.op_count[op]++;
388    Stats.op_ticks[op] += handle_ticks;
389}
390#ifndef __linux__
391
392static void handle_new_task(struct replay_binding *b, uint64_t bulk_id, uint32_t tes_size)
393{
394    errval_t err;
395    replay_eventrec_t *tes;
396    size_t tes_nr;
397    int pid;
398
399    tes = bulk_slave_buf_get_mem(&bulk_slave, bulk_id, NULL);
400    tes_nr = tes_size / sizeof(replay_eventrec_t);
401    assert(tes_size % sizeof(replay_eventrec_t) == 0);
402
403    if (tes->op != TOP_Open && tes->op != TOP_Create) {
404        msg("ABORTING: task with pid:%d starts with op:%d\n", tes->pid, tes->op);
405        assert(false);
406    }
407    pid = tes->pid;
408    for (size_t i=0; i<tes_nr; i++) {
409        replay_eventrec_t *er = tes + i;
410        handle_event(er);
411    }
412
413    err = b->tx_vtbl.task_completed(b, NOP_CONT, pid, bulk_id);
414    assert(err_is_ok(err));
415}
416
417
418static struct replay_rx_vtbl replay_vtbl = {
419    .new_task          = handle_new_task,
420    .slave_init        = handle_init,
421    .slave_finish      = handle_finish,
422    .slave_print_stats = handle_print_stats
423};
424
425static errval_t connect_cb(void *st, struct replay_binding *b)
426{
427    b->rx_vtbl = replay_vtbl;
428    return SYS_ERR_OK;
429}
430#endif
431
432int main(int argc, char *argv[])
433{
434#ifndef __linux__
435    assert(argc >= 4);
436#else
437    const int default_port = 1234;
438    if (argc < 2) {
439        fprintf(stderr, "Usage: %s <data_dir> [port (default:%d)]\n", argv[0], default_port);
440        exit(1);
441    }
442#endif
443    defdir = argv[1];
444
445    msg("===================> replay slave up\n");
446#ifndef __linux__
447    assert(err_is_ok(sys_debug_get_tsc_per_ms(&tscperms)));
448
449    errval_t err = vfs_mkdir(argv[2]);
450    if(err_is_fail(err) && err_no(err) != FS_ERR_EXISTS) {
451        DEBUG_ERR(err, "vfs_mkdir");
452    }
453    /* assert(err_is_ok(err)); */
454
455    err = vfs_mount(argv[2], argv[3]);
456    assert(err_is_ok(err));
457
458    err = replay_export(NULL, export_cb, connect_cb,
459                        get_default_waitset(),
460                        IDC_EXPORT_FLAGS_DEFAULT);
461    assert(err_is_ok(err));
462
463    msg("%s:%s() :: slave starts servicing requests\n", __FILE__, __FUNCTION__);
464    for(;;) {
465        err = event_dispatch(get_default_waitset());
466        assert(err_is_ok(err));
467    }
468#else
469    /* { */
470    /*     struct rlimit rl; */
471    /*     rl.rlim_cur = 2048; */
472    /*     rl.rlim_max = 2050; */
473    /*     int r = setrlimit(RLIMIT_NOFILE, &rl); */
474    /*     if(r == -1) { */
475    /*         printf("setrlimit errno = %s\n", strerror(errno)); */
476    /*     } */
477    /*     assert(r == 0); */
478    /* } */
479
480    int port = argc > 2 ? atoi(argv[2]) : default_port;
481    // Listen on port 1234
482    int listensock = socket(AF_INET, SOCK_STREAM, 0);
483    assert(listensock != -1);
484    struct sockaddr_in a = {
485        .sin_family = PF_INET,
486        .sin_port = htons(port),
487        .sin_addr = {
488            .s_addr = htonl(INADDR_ANY)
489        }
490    };
491    int r = bind(listensock, (struct sockaddr *)&a, sizeof(a));
492    if(r == -1) {
493        printf("bind: %s\n", strerror(errno));
494    }
495    assert(r == 0);
496    r = listen(listensock, 5);
497    assert(r == 0);
498
499    socklen_t sizea = sizeof(a);
500    connsock = accept(listensock, (struct sockaddr *)&a, &sizea);
501    assert(connsock != -1);
502    assert(sizea == sizeof(a));
503
504    int from = (ntohl(a.sin_addr.s_addr) & 0xff) - 1;
505    printf("got connection from %d\n", from);
506
507
508    /* circular buffer:
509     *  - writer writes at byte grain
510     *  - reader reads at sizeof(replay_eventrec_t) grain */
511    const size_t er_size_elems = 1024;   /* size in elements */
512    replay_eventrec_t ers[er_size_elems];
513    const size_t er_size = sizeof(ers);   /* size in bytes */
514    uint64_t er_r, er_w;                  /* full indices (in bytes) */
515
516    er_r = er_w = 0;
517
518    for(;;) {
519
520        dmsg("r:%zd w:%zd er_size:%zd\n", er_r, er_w, er_size);
521        size_t er_avail = er_size - (er_w - er_r);
522        size_t er_w_idx = er_w % er_size;
523        char *xfer_start = (char *)ers + er_w_idx;
524        size_t xfer_len = MIN(er_avail, er_size - er_w_idx);
525
526        if (xfer_len == 0) {
527            continue;
528        }
529
530        /* fetch events */
531        dmsg("RECV: from:%lu len:%zd\n", xfer_start - (char  *)ers, xfer_len);
532        ssize_t ret = recv(connsock, xfer_start, xfer_len, 0);
533        if(ret == -1) {
534            perror("recv");
535            exit(1);
536        } else if (ret == 0) {
537            printf("end of session\n");
538            break;
539        }
540        dmsg("GOT DATA=%zd!\n", ret);
541        er_w += ret;
542
543        /* handle events */
544        assert(er_r % sizeof(replay_eventrec_t) == 0);
545        assert(er_w > er_r);
546        while (er_w - er_r >= sizeof(replay_eventrec_t)) {
547            size_t er_r_items = er_r / sizeof(replay_eventrec_t);
548            replay_eventrec_t *er = ers + (er_r_items % er_size_elems);
549
550            handle_event(er);
551
552            // notify server that we are done with a task
553            if (er->op == TOP_Exit) {
554                uint16_t pid = er->pid;
555                dmsg("SENDING PID: %d\n", pid);
556                if (send(connsock, &pid, sizeof(pid), 0) != sizeof(pid)) {
557                    perror("send");
558                    exit(1);
559                }
560            }
561
562            // increase read pointer
563            er_r += sizeof(replay_eventrec_t);
564        }
565    }
566#endif
567
568    return 0;
569}
570