1#include <stdio.h>
2#include <stdlib.h>
3#include <assert.h>
4#include <string.h>
5#ifndef __linux__
6#include <barrelfish/barrelfish.h>
7#include <vfs/vfs.h>
8#include <barrelfish/nameservice_client.h>
9#include <barrelfish/bulk_transfer.h>
10#include <if/replay_defs.h>
11#include <errno.h>
12#else
13static const char vfs_cache_str[] = "linux (TCP/IP)";
14#include <unistd.h>
15#include <stdbool.h>
16#include <sys/types.h>
17#include <sys/socket.h>
18#include <sys/time.h>
19#include <netinet/in.h>
20#include <netinet/ip.h>
21#include <poll.h>
22#include <netdb.h>
23#include <errno.h>
24#include <sched.h>
25#include <inttypes.h>
26#endif
27
28#include "defs.h"
29#include "hash.h"
30
/* Parser and task-graph limits. */
#define MAX_LINE        1024    /* buffer size for one trace line (fgets) */
#define MAX_SLAVES      64      /* capacity of SlState.slaves[] */
#define MAX_DEPS        60      /* capacity of pid_entry children[]/parents[] */

/* Bulk transfer geometry: BULK_BLOCKS_NR blocks of 1 MiB each. */
#define BULK_BLOCK_SIZE (1024 * 1024)
#define BULK_BLOCKS_NR  1
#define BULK_TOTAL_SIZE (BULK_BLOCK_SIZE * BULK_BLOCKS_NR)
37
38/* TRACE LIST */
39
/* Singly-linked FIFO of trace entries, chained through trace_entry::next. */
struct trace_list {
    struct trace_entry *head;   /* first entry, NULL when the list is empty */
    struct trace_entry *tail;   /* last entry; only meaningful when head != NULL */
};
43
44static inline void trace_add_tail(struct trace_list *tl, struct trace_entry *te)
45{
46    te->next = NULL;
47
48    if (tl->head == NULL) {
49        tl->head = te;
50    } else {
51        tl->tail->next = te;
52    }
53
54    tl->tail = te;
55}
56
57/* PID ENTRIES (TASKS) */
58
59struct slave; /* forward reference */
60struct pid_entry {
61    int pid;
62    struct trace_list trace_l;
63    size_t tentries_nr;
64    /* list of children */
65    struct pid_entry *children[MAX_DEPS];
66    size_t children_nr;
67    /* list of parents */
68    struct pid_entry *parents[MAX_DEPS];
69    size_t parents_nr;
70
71    /* buffer for trace entries */
72    replay_eventrec_t *tes;
73    size_t tes_size; /* size of tes buffer in bytes */
74
75    size_t parents_completed;
76    struct slave *sl;
77    unsigned completed:1;
78};
79
80/* SLAVES */
81
/* Per-slave state kept by the master. */
struct slave {
    struct pid_entry *pe;           /* task currently assigned; NULL when idle */
#ifndef __linux__
    struct replay_binding *b;       /* Flounder binding to the slave */
    /* bulk transfer data */
    struct capref        frame;     /* frame handed to the slave in slave_init */
    struct bulk_transfer bt;
    uintptr_t            current_bid;
#else
    int     socket;                 /* connected TCP socket */
    ssize_t sentbytes;              /* bytes of pe->tes already sent */
#endif
};
98
99static struct {
100    int num_slaves;
101    int num_finished;
102    struct slave slaves[MAX_SLAVES];
103} SlState;
104
105//struct qelem {
106//    replay_eventrec_t er;
107//    struct qelem *next, *prev;
108//};
109//
110//struct writer {
111//    int fnum, pid;
112//    struct slave *slave;
113//    struct writer *prev, *next;
114//};
115
116/*
117 * Dependencies:
118 *  W writes to a file that R reads
119 *  R needs to wait for W to finish
120 *  W is parent, R is child
121 *  nodes that are ready to execute have no parents
122 */
123struct task_graph {
124    struct pid_entry *pes;      /* array of (all) pid entries */
125    int pes_nr, pes_size;       /* number of pid entries, size of array */
126    struct pid_entry **stack;   /* current stack */
127    int stack_nr, stack_size;   /* stack entries/size */
128    int pes_completed;          /* completed entries */
129    hash_t *pids_h;             /* pid -> pid_entry hash */
130};
131static struct task_graph TG;
132
133static struct pid_entry *
134tg_stack_peek(struct task_graph *tg, int i)
135{
136    assert(i < tg->stack_nr);
137    return tg->stack[tg->stack_size - i];
138}
139
140static struct pid_entry *
141tg_pop(struct task_graph *tg)
142{
143    struct pid_entry *ret;
144    if (tg->stack_nr == 0)
145        return NULL;
146    ret = tg->stack[tg->stack_size +1 - tg->stack_nr];
147    tg->stack_nr--;
148    assert(ret != NULL);
149    return ret;
150}
151
152static void
153tg_push(struct task_graph *tg, struct pid_entry *pe)
154{
155    assert(tg->stack_nr < tg->stack_size);
156    tg->stack[tg->stack_size - tg->stack_nr] = pe;
157    tg->stack_nr++;
158}
159
160static void
161__build_taskgraph_stack(struct task_graph *tg)
162{
163    tg->stack_nr = 0;
164    for (int i=0; i<tg->pes_nr; i++) {
165        struct pid_entry *pe = tg->pes + i;
166        if (pe->parents_nr == 0)
167            tg_push(tg, pe);
168    }
169}
170
171static void
172build_taskgraph_stack(struct task_graph *tg)
173{
174    // build stack, and fill it with ready nodes (i.e., no parent nodes)
175    tg->stack_size = TOTAL_PIDS;
176    tg->stack = calloc(tg->stack_size, sizeof(struct pid_entry *));
177    assert(tg->stack);
178    __build_taskgraph_stack(tg);
179}
180
181static void __attribute__((unused))
182tg_reset(struct task_graph *tg)
183{
184    for (int i=0; i<tg->pes_nr; i++) {
185        tg->pes[i].completed = 0;
186        tg->pes[i].parents_completed = 0;
187    }
188    tg->pes_completed = 0;
189    __build_taskgraph_stack(tg);
190}
191
192static void
193tg_complete(struct task_graph *tg, struct pid_entry *pe)
194{
195    assert(pe->completed == 0);
196    pe->completed = 1;
197    tg->pes_completed++;
198    dmsg("\tpe: %p (idx:%ld,pid:%d) completed (%d/%d)\n", pe, (pe - tg->pes), pe->pid, tg->pes_completed, tg->pes_nr);
199    for (int i=0; i < pe->children_nr; i++) {
200        struct pid_entry *pe_child = pe->children[i];
201        if (++pe_child->parents_completed == pe_child->parents_nr) {
202            tg_push(tg, pe_child);
203        }
204    }
205}
206
207
#ifndef __linux__
/*
 * Handshake flags set by the Flounder reply handlers and polled by the
 * connect/stats loops. No volatile needed: the handlers run on the same
 * context, from event_dispatch() inside those loops.
 */
static bool bound;          /* set by replay_bind_cont() */
static bool init_ok;        /* set by init_reply_handler() */
static bool print_stats_ok; /* set by print_stats_reply_handler() */
#endif
217
218#ifndef __linux__
219//static void event_done(struct replay_binding *b, uint32_t fnum)
220//{
221//    /* if(te->op == TOP_Close) { */
222//        // See if it was a writer and remove
223//
224//    printf("writer done for %u\n", fnum);
225//
226//        for(struct writer *w = writers; w != NULL; w = w->next) {
227//            if(w->fnum == fnum) {
228//                assert(w != NULL);
229//                if(w != writers) {
230//                    assert(w != NULL);
231//                    assert(w->prev != NULL);
232//                    w->prev->next = w->next;
233//                } else {
234//                    writers = w->next;
235//                }
236//                free(w);
237//                break;
238//            }
239//        }
240//    /* } */
241//}
242
243static void
244init_reply_handler(struct replay_binding *b)
245{
246    assert(!init_ok);
247    init_ok = true;
248}
249
250static void
251print_stats_reply_handler(struct replay_binding *b)
252{
253    assert(!print_stats_ok);
254    print_stats_ok = true;
255}
256static void
257task_completed_handler(struct replay_binding *b, uint16_t pid, uint64_t bulk_id)
258{
259    struct task_graph *tg = b->st;
260    struct slave *sl;
261    struct pid_entry *pe;
262
263    pe = (void *)hash_lookup(tg->pids_h, pid);
264    assert(pe != (void *)HASH_ENTRY_NOTFOUND);
265    sl = pe->sl;
266    tg_complete(tg, pe);
267    bulk_free(&sl->bt, bulk_id);
268    sl->pe = NULL;
269}
270
271static void
272finish_reply_handler(struct replay_binding *b)
273{
274    dmsg("ENTER\n");
275    SlState.num_finished++;
276}
277
278static struct replay_rx_vtbl replay_vtbl = {
279    .task_completed           = task_completed_handler,
280    .slave_finish_reply      = finish_reply_handler,
281    .slave_init_reply        = init_reply_handler,
282    .slave_print_stats_reply = print_stats_reply_handler
283};
284
285static void replay_bind_cont(void *st, errval_t err, struct replay_binding *b)
286{
287    static int slavenum = 0;
288    struct slave *sl = &SlState.slaves[slavenum];
289    slavenum++;
290
291    dmsg("ENTER\n");
292    //printf("%s:%s MY TASKGRAPH IS %p\n", __FILE__, __FUNCTION__, &TG);
293
294    assert(err_is_ok(err));
295    sl->b = b;
296    b->rx_vtbl = replay_vtbl;
297    b->st = &TG;
298    bound = true;
299    /* printf("assigned binding to %p\n", sl); */
300}
301#else
/* Read the x86 time-stamp counter (EDX:EAX). */
static inline uint64_t rdtsc(void)
{
    uint32_t lo, hi;

    __asm volatile ("rdtsc" : "=a" (lo), "=d" (hi));
    return (uint64_t)lo | ((uint64_t)hi << 32);
}
308#endif
309
310//static bool printall = false;
311
312
313static inline void
314pentry_add_dependency(struct pid_entry *parent, struct pid_entry *child)
315{
316    int idx;
317    for (int i=0; i<parent->children_nr; i++)
318        if (parent->children[i]->pid == child->pid)
319            return;
320
321    //printf("%d -> %d;\n", parent->pid, child->pid);
322
323    idx = parent->children_nr;
324    assert(idx < MAX_DEPS);
325    parent->children[idx] = child;
326    parent->children_nr++;
327
328    idx = child->parents_nr;
329    assert(idx < MAX_DEPS);
330    child->parents[idx] = parent;
331    child->parents_nr++;
332}
333
334//static struct pid_entry *allpids[nOTAL_PIDS];
335
336static void
337parse_tracefile_line(char *line, int linen, struct trace_entry *te)
338{
339        size_t fnum, size;
340        char flags[1024];
341        int fd;
342        unsigned int pid;
343
344        if(sscanf(line, "open %zu %s %d %u", &fnum, flags, &fd, &pid) >= 4) {
345            te->op = TOP_Open;
346            te->fd = fd;
347            te->u.fnum = fnum;
348        } else if(sscanf(line, "close %d %u", &fd, &pid) >= 2) {
349            te->op = TOP_Close;
350            te->fd = fd;
351        } else if(sscanf(line, "read %d %zu %u", &fd, &size, &pid) >= 3) {
352            te->op = TOP_Read;
353            te->fd = fd;
354            te->u.size = size;
355        } else if(sscanf(line, "write %d %zu %u", &fd, &size, &pid) >= 3) {
356            te->op = TOP_Write;
357            te->fd = fd;
358            te->u.size = size;
359        } else if(sscanf(line, "seek %d %zu %u", &fd, &size, &pid) >= 3) {
360            te->op = TOP_Seek;
361            te->fd = fd;
362            te->u.size = size;
363        } else if(sscanf(line, "creat %zu %s %d %u", &fnum, flags, &fd, &pid) >= 4) {
364            te->op = TOP_Create;
365            te->fd = fd;
366            te->u.fnum = fnum;
367        } else if(sscanf(line, "unlink %zu %s %u", &fnum, flags, &pid) >= 3) {
368            te->op = TOP_Unlink;
369            te->u.fnum = fnum;
370        } else if(sscanf(line, "exit %u", &pid) >= 1) {
371            te->op = TOP_Exit;
372        } else {
373            printf("Invalid line %d: %s\n", linen, line);
374            exit(EXIT_FAILURE);
375        }
376
377        // There's always a PID
378        te->pid = pid;
379        assert(pid != 0);
380        te->fline = linen;
381
382        // If we have flags, set them now
383        if(te->op == TOP_Open || te->op == TOP_Create) {
384            if(!strcmp(flags, "rdonly")) {
385                te->mode = FLAGS_RdOnly;
386            } else if(!strcmp(flags, "wronly")) {
387                te->mode = FLAGS_WrOnly;
388            } else if(!strcmp(flags, "rdwr")) {
389                te->mode = FLAGS_RdWr;
390            } else {
391                printf("Invalid open flags: %s\n", flags);
392                exit(EXIT_FAILURE);
393            }
394        }
395}
396
397static void
398parse_tracefile(const char *tracefile, struct trace_list *tlist)
399{
400    printf("reading tracefile...\n");
401
402    FILE *f = fopen(tracefile, "r");
403    assert(f != NULL);
404    int linen = 0;
405
406    while(!feof(f)) {
407        char line[MAX_LINE];
408
409        if (fgets(line, MAX_LINE, f) == NULL) {
410            break;
411        }
412
413        linen++;
414        if (linen % 1000 == 0) {
415            printf("---- %s:%s() parsing line = %d\n", __FILE__, __FUNCTION__, linen);
416        }
417
418        struct trace_entry *tentry = malloc(sizeof(struct trace_entry));
419        assert(tentry != NULL);
420
421        // parse current line to tentry
422        parse_tracefile_line(line, linen, tentry);
423
424        // Link it in with the rest of the list (forward order)
425        trace_add_tail(tlist, tentry);
426    }
427    fclose(f);
428    printf("tracefile read [number of lines:%d]\n", linen);
429}
430
431static void
432build_taskgraph(struct trace_list *tl, struct task_graph *tg)
433{
434    assert(tl->head != NULL && tl->tail != NULL);
435    struct trace_entry *te, *te_next = NULL;
436    hash_t *writers_h = hash_init(TOTAL_PIDS); /* fnum -> pid_entry */
437    // alloc a sequential array for all pid entries
438    tg->pids_h = hash_init(TOTAL_PIDS); /* pid  -> pid_entry */
439    tg->pes_size = TOTAL_PIDS;
440    tg->pes = calloc(tg->pes_size, sizeof(struct pid_entry)); // we can realloc it later
441    tg->pes_nr = 0;
442
443    for (te = tl->head; te != NULL; te = te_next) {
444        // store next pointer (trace_add_tail will change it)
445        te_next = te->next;
446
447        // if this is a new pid, create a new pid_entry
448        struct pid_entry *pe = (void *)hash_lookup(tg->pids_h, te->pid);
449        if (pe == (void *)HASH_ENTRY_NOTFOUND) {
450            // if a pid's first operation is not Open/Create ignore it
451            if (te->op != TOP_Open && te->op != TOP_Create) {
452                printf("%s() :: IGNORING operation %d for pid %d\n", __FUNCTION__, te->op, te->pid);
453                continue;
454            }
455            assert(tg->pes_nr < tg->pes_size);
456            pe = tg->pes + tg->pes_nr++;
457            pe->pid = te->pid;
458            hash_insert(tg->pids_h, pe->pid, (unsigned long)pe);
459            //printf("%d;\n", pe->pid);
460        }
461
462        // add trace entry into pid list
463        trace_add_tail(&pe->trace_l, te);
464        pe->tentries_nr++;
465
466        /* track dependencies:
467         *  - look at open/create operations
468         *  - put writers in a hash table, based on the file they write
469         *  - for readers, check if a writer exists for the file they read
470         *  - avoid cyclic dependencies of RW open/create
471         *  Note: For multiple writers we just track the latest open/create */
472        if (te->op == TOP_Open && te->op == TOP_Create) {
473            size_t fnum = te->u.fnum;
474            struct pid_entry *pe_writer = (void *)hash_lookup(writers_h, fnum);
475
476            if (te->mode != FLAGS_RdOnly) { // writer
477                if (pe_writer != (void *)HASH_ENTRY_NOTFOUND && pe_writer->pid != pe->pid) {
478                     //assert(0 && "multiple different writers");
479                }
480                hash_insert(writers_h, fnum, (unsigned long)pe);
481                pe_writer = pe;
482            }
483
484            if ((te->mode != FLAGS_WrOnly) &&                 // this is a reader
485                (pe_writer != (void *)HASH_ENTRY_NOTFOUND) && // a writer exists
486                (pe_writer->pid != pe->pid)) {                // and is not the reader
487                pentry_add_dependency(pe_writer, pe);
488            }
489        }
490    }
491    build_taskgraph_stack(tg);
492    //cleanup and return
493    tl->head = tl->tail = NULL;
494    hash_destroy(writers_h);
495}
496
497static void
498__print_taskgraph(struct pid_entry *root, int level)
499{
500    if (root == NULL)
501        return;
502    for (int i=0; i<level; i++)
503        printf("\t");
504    printf("%d (completed:%d)\n", root->pid, root->completed);
505
506    for (int i=0; i<root->children_nr; i++) {
507        __print_taskgraph(root->children[i], level+1);
508    }
509}
510
511static void __attribute__((unused))
512print_taskgraph(struct task_graph *tg)
513{
514    for (int i=0; i<tg->stack_nr; i++) {
515        struct pid_entry *pe = tg_stack_peek(tg, i);
516        __print_taskgraph(pe, 0);
517    }
518}
519
520static void
521print_pid_entry(struct pid_entry *pe, int print_ops)
522{
523    struct trace_entry *te;
524    printf("pid entry (%p) pid:%d children:%zu parents:%zu completed:%d tentries:%zd\n", pe, pe->pid, pe->children_nr, pe->parents_nr, pe->completed, pe->tentries_nr);
525    te = pe->trace_l.head;
526
527    if (!print_ops) {
528        return;
529    }
530
531    do {
532        printf("\t op:%d pid:%d\n", te->op, te->pid);
533    } while ((te = te->next) != NULL);
534    printf("\tEND\n");
535}
536
537static void __attribute__((unused))
538print_task(struct task_graph *tg, int pid)
539{
540    for (int i=0; i<tg->pes_nr; i++) {
541        struct pid_entry *pe = tg->pes + i;
542        if (pe->pid == pid) {
543            print_pid_entry(pe, 0);
544        }
545    }
546}
547
548static void
549mk_replay_event_req(struct trace_entry *te, replay_eventrec_t *req)
550{
551    req->op    = te->op;
552    req->fd    = te->fd;
553    req->mode  = te->mode;
554    //req->fline = te->fline;
555    req->pid   = te->pid;
556
557    switch(te->op) {
558    case TOP_Open:
559    case TOP_Create:
560    case TOP_Unlink:
561        req->fnumsize = te->u.fnum;
562        break;
563
564    case TOP_Read:
565    case TOP_Write:
566    case TOP_Seek:
567        req->fnumsize = te->u.size;
568        break;
569
570    case TOP_Close:
571    case TOP_Exit:
572        break;
573
574    default:
575        assert(0);
576        break;
577    }
578
579    assert(req->pid != 0);
580}
581
582
583static void
584trace_bufs_init(struct task_graph *tg)
585{
586    for (int i=0; i < tg->pes_nr; i++) {
587        struct pid_entry *pe = tg->pes + i;
588
589        size_t size = pe->tes_size = sizeof(replay_eventrec_t)*pe->tentries_nr;
590        if (size >= BULK_TOTAL_SIZE) {
591            msg("size for pid:%d [%zd] larger than %d\n", pe->pid, size, BULK_TOTAL_SIZE);
592            assert(0);
593        }
594
595        assert(size <= BULK_TOTAL_SIZE);
596        pe->tes = malloc(size);
597        assert(pe->tes);
598
599        struct trace_entry *te = pe->trace_l.head;
600        for (int ti=0; ti < pe->tentries_nr; ti++) {
601            mk_replay_event_req(te, pe->tes + ti);
602            te = te->next;
603        }
604        assert(te == NULL); // make sure the count is right!
605    }
606}
607
608static void __attribute__((unused))
609print_all_tasks(struct task_graph *tg)
610{
611    for (int i=0; i<tg->pes_nr; i++) {
612        struct pid_entry *pe = tg->pes + i;
613        print_pid_entry(pe, 0);
614    }
615}
616
617/* functions to be implemented seperately by bfish/linux */
618static void slaves_connect(struct task_graph *tg);
619static void slave_push_work(struct slave *);
620static void slaves_finalize(void);
621static void slaves_print_stats(void);
622static void master_process_reqs(void);
623cycles_t tscperms;
624
625int main(int argc, char *argv[])
626{
627#ifndef __linux__
628    if(argc < 5) {
629        printf("Usage: %s tracefile nslaves mountdir mount-URL\n", argv[0]);
630        exit(EXIT_FAILURE);
631    }
632
633    assert(err_is_ok(sys_debug_get_tsc_per_ms(&tscperms)));
634    errval_t err = vfs_mkdir(argv[3]);
635    assert(err_is_ok(err));
636
637    printf("----------------------------------- VFS MOUNT\n");
638    err = vfs_mount(argv[3], argv[4]);
639    if(err_is_fail(err)) {
640        DEBUG_ERR(err, "vfs_mount");
641    }
642    printf("----------------------------------- VFS MOUNT DONE\n");
643    assert(err_is_ok(err));
644#else
645    if(argc < 3) {
646        printf("Usage: %s tracefile nslaves\n", argv[0]);
647        exit(EXIT_FAILURE);
648    }
649#endif
650
651    memset(&SlState, 0, sizeof(SlState));
652    //SlState.waitset = get_default_waitset();
653    //struct waitset ws;
654    //waitset_init(&ws);
655    //SlState.waitset = &ws;
656
657    char *tracefile = argv[1];
658    SlState.num_slaves = atoi(argv[2]);
659    printf("tracefile=%s\n", tracefile);
660
661    printf("reading dependency graph...\n");
662
663    // Parse trace file into memory records
664    struct trace_list tlist = {.head = NULL, .tail = NULL};
665    parse_tracefile(tracefile, &tlist);
666
667    // Build task graph. roots are nodes without dependencies
668    #ifndef __linux__
669    msg("[MASTER] My cpu is: %d\n", disp_get_core_id());
670    #endif
671    memset(&TG, 0, sizeof(TG));
672    build_taskgraph(&tlist, &TG);
673    //print_all_tasks(&TG);
674    //print_taskgraph(&TG);
675    //printf("TG entries:%d completed:%d stack_size:%d\n", TG.pes_nr, TG.pes_completed, TG.stack_nr);
676
677    msg("[MASTER] INITIALIZING BUFFERS...\n");
678    trace_bufs_init(&TG);
679
680    msg("[MASTER] CONNECTING TO SLAVES...\n");
681    slaves_connect(&TG);
682
683    msg("[MASTER] STARTING WORK...\n");
684    uint64_t start_ticks = rdtsc();
685    for (;;) {
686        /* enqueue work to the slaves */
687        for (int sid=0; sid < SlState.num_slaves; sid++) {
688            struct slave *sl = SlState.slaves + sid;
689            // try to assign a pid entry to a slave, if it doesn't hove one
690            if (sl->pe == NULL) {
691                sl->pe = tg_pop(&TG);
692                if (sl->pe == NULL) {
693                    continue; /* no more tasks in the stack */
694                }
695                dmsg("[MASTER] assigned pid:%d to sl:%d (stack_nr:%d completed:%d total:%d bytes:%zd)\n",
696                      sl->pe->pid, sid, TG.stack_nr, TG.pes_completed, TG.pes_nr, sl->pe->tes_size);
697                sl->pe->sl = sl;
698                slave_push_work(sl);
699            }
700            master_process_reqs();
701        }
702
703        if (TG.pes_completed == TG.pes_nr)
704            break;
705    }
706
707    uint64_t work_ticks = rdtsc() - start_ticks;
708    slaves_finalize();
709    uint64_t total_ticks = rdtsc() - start_ticks;
710    printf("[MASTER] replay done> cache:%s slaves:%d ticks:%" PRIu64
711           " (%lf ms) [total: %lfms]\n",
712            vfs_cache_str, SlState.num_slaves, work_ticks,
713            (double)work_ticks /(double)tscperms,
714            (double)total_ticks/(double)tscperms);
715    slaves_print_stats();
716    return 0;
717}
718
719#ifndef __linux__
720static void
721master_process_reqs(void)
722{
723    /* process slave requests */
724    for (;;){
725        errval_t ret;
726        ret = event_dispatch(get_default_waitset());
727        if (ret == LIB_ERR_NO_EVENT)
728            break;
729        assert(err_is_ok(ret));
730    }
731}
732
733static void
734slaves_finalize(void)
735{
736    int err;
737
738    /* notify slaves */
739    for (int sid=0; sid < SlState.num_slaves; sid++) {
740        struct slave *sl = SlState.slaves + sid;
741        do {
742            err = sl->b->tx_vtbl.slave_finish(sl->b, NOP_CONT);
743        } while (err_no(err) == FLOUNDER_ERR_TX_BUSY);
744        assert(err_is_ok(err));
745    }
746
747    /* wait for their replies */
748    do {
749        err = event_dispatch(get_default_waitset());
750        assert(err_is_ok(err));
751    } while (SlState.num_finished < SlState.num_slaves);
752}
753
754static void
755slaves_print_stats(void)
756{
757    int err;
758
759    /* have slaves print stats synchronously */
760    for (int sid=0; sid < SlState.num_slaves; sid++) {
761        struct slave *sl = SlState.slaves + sid;
762        print_stats_ok = false;
763        err = sl->b->tx_vtbl.slave_print_stats(sl->b, NOP_CONT);
764        assert(err_is_ok(err));
765        while (!print_stats_ok) {
766            err = event_dispatch(get_default_waitset());
767            assert(err_is_ok(err));
768        }
769    }
770}
771
772static void
773slave_push_work(struct slave *sl)
774{
775    int err;
776    struct bulk_buf *bb;
777    uint64_t bulk_id;
778
779    //dmsg("pushing work for slave: %ld (%p) pid:%d (completed:%d)\n", sl-slaves, sl, sl->pe->pid, sl->pe->completed);
780    bb = bulk_alloc(&sl->bt);
781    if (bb == NULL) {
782        return;
783    }
784    bulk_buf_copy(bb, sl->pe->tes, sl->pe->tes_size);
785    bulk_id = bulk_prepare_send(bb);
786    err = sl->b->tx_vtbl.new_task(sl->b, NOP_CONT, bulk_id, sl->pe->tes_size);
787    if (err == FLOUNDER_ERR_TX_BUSY) {
788        bulk_free(&sl->bt, bulk_id);
789        return;
790    }
791    assert(err_is_ok(err));
792}
793
794static void
795slaves_connect(struct task_graph *tg)
796{
797    char name[128];
798    iref_t iref;
799    int err;
800
801    for (int sid=0; sid < SlState.num_slaves; sid++) {
802        int r = snprintf(name, 128, "replay_slave.%u", sid + 1);
803        struct slave *sl = SlState.slaves + sid;
804        assert(r != -1);
805
806        err = nameservice_blocking_lookup(name, &iref);
807        if (err_is_fail(err)) {
808            DEBUG_ERR(err, "could not lookup IREF for replay slave");
809            abort();
810        }
811
812        /* bound to slave  */
813        bound = false;
814        err = replay_bind(iref, replay_bind_cont, NULL,
815                          get_default_waitset(),
816                          IDC_BIND_FLAGS_DEFAULT);
817        if(err_is_fail(err)) {
818            DEBUG_ERR(err, "replay_bind");
819        }
820        while(!bound) {
821            err = event_dispatch(get_default_waitset());
822            assert(err_is_ok(err));
823        }
824        msg("Bound to slave %d\n", sid);
825
826        /* initialize bulk transfer for slave */
827        init_ok = false;
828        err = bulk_create(BULK_TOTAL_SIZE, BULK_BLOCK_SIZE, &sl->frame, &sl->bt);
829        assert(err_is_ok(err));
830        err = sl->b->tx_vtbl.slave_init(sl->b, NOP_CONT, sl->frame, BULK_TOTAL_SIZE);
831        assert(err_is_ok(err));
832        while (!init_ok) {
833            err = event_dispatch(get_default_waitset());
834            assert(err_is_ok(err));
835        }
836
837        msg("Slave %d initialized\n", sid);
838    }
839}
840#endif
841
842#ifdef __linux__
843static void
844slave_push_work(struct slave *sl)
845{
846    ssize_t ret;
847
848    assert(sl->pe != NULL);
849    assert(sl->pe->tes_size > sl->sentbytes);
850
851    dmsg("TRYING TO SEND: %zd bytes for pid=%u\n", sl->pe->tes_size - sl->sentbytes, sl->pe->pid);
852    ret = send(sl->socket,
853               (char *)sl->pe->tes + sl->sentbytes,
854               sl->pe->tes_size - sl->sentbytes,
855               0); /* setting MSG_DONTWAIT seems to cause problems */
856    if (ret <= 0 && errno != EAGAIN) {
857        perror("send");
858        exit(1);
859    }
860    dmsg("SENT: %zd bytes for pid=%u\n", ret, sl->pe->pid);
861    sl->sentbytes += ret;
862}
863
864static void
865slaves_finalize(void)
866{
867    for (int i=0; i<SlState.num_slaves; i++) {
868        struct slave *sl = SlState.slaves + i;
869        close(sl->socket);
870    }
871}
872
873static void
874master_process_reqs(void)
875{
876    uint16_t rpid;
877    // read from available fds
878    for (int i=0; i<SlState.num_slaves; i++) {
879        struct slave *sl = SlState.slaves + i;
880
881        if (sl->pe == NULL) {
882            continue;
883        }
884
885        /* first check if we need to send more data to the slave
886         * (main loop will only call slave_push_work() once) */
887         if (sl->pe->tes_size > sl->sentbytes) {
888            slave_push_work(sl);
889         }
890
891        int r = recv(sl->socket, &rpid, sizeof(rpid), MSG_DONTWAIT);
892        if (r == -1) {
893            assert(errno == EWOULDBLOCK || errno == EAGAIN);
894            continue; /* no data here, move on */
895        }
896        /* slave is done with task */
897        assert(r == sizeof(rpid));
898        assert(rpid == sl->pe->pid);
899        tg_complete(&TG, sl->pe);
900        sl->pe = NULL;
901        sl->sentbytes = 0;
902    }
903}
904
/* The Linux backend collects no per-slave statistics: nothing to print. */
static void
slaves_print_stats(void)
{
    return;
}
908
909/* connection info */
910
911static void
912slaves_connect(struct task_graph *tg)
913{
914    msg("connecting to slaves...\n");
915    for(int i = 0; i < SlState.num_slaves; i++) {
916        int ret;
917        struct slave *sl = &SlState.slaves[i];
918
919        sl->socket = socket(AF_INET, SOCK_STREAM, 0);
920        if (sl->socket == -1) {
921            perror("socket");
922            exit(1);
923        }
924
925        struct sockaddr_in a = {
926            .sin_family = PF_INET,
927            .sin_port = htons(0),
928            .sin_addr = {
929                .s_addr = htonl(INADDR_ANY)
930            }
931        };
932
933        ret = bind(sl->socket, (struct sockaddr *)&a, sizeof(a));
934        if (ret != 0) {
935            perror("bind");
936            exit(1);
937        }
938
939        int port = 1234;
940        char host[128];
941        snprintf(host, 128, "rck%02u", i + 1);
942
943        // FOR DEBUGGING!
944        snprintf(host, 128, "localhost");
945        printf("connecting to %s:%d ...\n", host, port);
946        port = 1234 + i;
947
948        struct hostent *h;
949        h = gethostbyname(host);
950        assert(h != NULL && h->h_length == sizeof(struct in_addr));
951
952        struct sockaddr_in sa = {
953            .sin_family = AF_INET,
954            .sin_port = htons(port),
955            .sin_addr = *(struct in_addr *)h->h_addr_list[0]
956        };
957
958        ret = connect(sl->socket, (struct sockaddr *)&sa, sizeof(sa));
959        if (ret < 0) {
960            perror("connect");
961            exit(1);
962        }
963
964        sl->sentbytes = 0;
965        #if 0 /* do a recv with MSG_DONTWAIT */
966        /* set non-blocking flag */
967        int sock_fl = fcntl(sl->socket, F_GETFD);
968        sock_fl |= O_NONBLOCK;
969        sock_fl = fcntl(sl->socket, F_SETFD, sock_fl);
970        assert(sock_fl & O_NONBLOCK);
971        #endif
972    }
973}
974#endif
975
976//
977//
978//    uint64_t tscperms;
979//#ifndef __linux__
980//    err = sys_debug_get_tsc_per_ms(&tscperms);
981//    assert(err_is_ok(err));
982//
983//#else
984//    tscperms = 533000;
985//
986//#endif
987//
988//    printf("starting replay\n");
989//
990//    /* for(struct trace_entry *te = trace; te != NULL; te = te->next) { */
991//    /*     static int cnt = 0; */
992//    /*     printf("%d: %d, %zu, %d, %d, %d, fline %d\n", */
993//    /*            cnt, te->op, te->u.fnum, te->fd, te->mode, te->pid, te->fline); */
994//    /*     cnt++; */
995//    /* } */
996//
997//    uint64_t start = rdtsc();
998//
999//    // Start trace replay
1000//    for(struct trace_entry *te = trace; te != NULL; te = te->next) {
1001//        // Distribute work to slaves -- either they are empty (PID ==
1002//        // 0) or they already execute for a PID, in which case we keep
1003//        // sending them that PID's work until the PID exits)
1004//
1005//        static int cnt = 0;
1006//
1007//        /* if(((cnt * 100) / linen) % 5 == 0) { */
1008//            /* printf("%d / %d\n", cnt, linen); */
1009//        /* } */
1010//        cnt++;
1011//
1012//        /* printall = false; */
1013//        /* if(cnt == 6186 || cnt == 5840) { */
1014//        /*     printall = true; */
1015//        /* } */
1016//
1017//        // If this is an exit, remove the PID and continue
1018//        if(te->op == TOP_Exit) {
1019//            int i;
1020//            /* printf("PIDs: "); */
1021//            for(i = 0; i < num_slaves; i++) {
1022//                /* printf("%u ", slaves[i].pid); */
1023//                for(int j = 0; j < MAX_PIDS; j++) {
1024//                    if(slaves[i].pid[j] == te->pid) {
1025//                        slaves[i].pid[j] = 0;
1026//                        goto outexit;
1027//                    }
1028//                }
1029//            }
1030//        outexit:
1031//            /* printf("\n"); */
1032//
1033//            if(i < num_slaves) {
1034//                continue;
1035//            } else {
1036//                printf("%d: exit on non-existant PID (%u), file line %d\n",
1037//                       cnt, te->pid, te->fline);
1038//                exit(EXIT_FAILURE);
1039//            }
1040//        }
1041//
1042//        if(printall) {
1043//            printf("find slave\n");
1044//        }
1045//
1046//    /* again: */
1047//        // Find a slave with the same PID
1048//        struct slave *emptyslave = NULL, *s = NULL;
1049//        int i;
1050//        for(i = 0; i < num_slaves; i++) {
1051//            s = &slaves[i];
1052//
1053//            /* if(s->pid == 0) { */
1054//            /*     /\* printf("slave %d is the empty slave\n", i); *\/ */
1055//            /*     emptyslave = s; */
1056//            /* } */
1057//
1058//            for(int j = 0; j < MAX_PIDS; j++) {
1059//                if(s->pid[j] == te->pid) {
1060//                    goto out;
1061//                }
1062//            }
1063//        }
1064//    out:
1065//
1066//        // Didn't find one, find an empty one
1067//        if(i == num_slaves) {
//            // No empty slave -- pick one at random (the old wait-and-retry path below is disabled)
1069//            if(emptyslave == NULL) {
1070//                // Pick one randomly
1071//                int randslave = rand() / (RAND_MAX / num_slaves);
1072//                assert(randslave < num_slaves);
1073//                s = &slaves[randslave];
1074//
1075//                /* printf("no empty slave\n"); */
1076//                /* err = event_dispatch(get_default_waitset()); */
1077//                /* assert(err_is_ok(err)); */
1078//                /* printf("past no empty slave\n"); */
1079//                /* goto again; */
1080//            } else {
1081//                s = emptyslave;
1082//            }
1083//        }
1084//
1085//        // Assign slave this PID
1086//        int j;
1087//        for(j = 0; j < MAX_PIDS; j++) {
1088//            if(s->pid[j] == 0 || s->pid[j] == te->pid) {
1089//                break;
1090//            }
1091//        }
1092//        assert(j < MAX_PIDS);
1093//        s->pid[j] = te->pid;
1094//
1095//        /* if(i == num_slaves) { */
1096//        /*     printf("found empty slave\n"); */
1097//        /* } else { */
1098//        /*     printf("found slave %d, PID %d\n", i, s->pid); */
1099//        /* } */
1100//
1101//        /* if(te->fline >= 41352 && te->fline <= 41365) { */
1102//        /*     printf("%d: %d, %zu, %d, %d, %d to slave %d, fline %d\n", */
1103//        /*            cnt, te->op, te->u.fnum, te->fd, te->mode, te->pid, i, te->fline); */
1104//        /* } */
1105//
1106//#if 1
1107//        if(te->op == TOP_Exit) {
1108//            printf("exit %u\n", te->pid);
1109//            // See if it was a writer and remove
1110//            for(struct writer *w = writers; w != NULL; w = w->next) {
1111//                assert(te != NULL);
1112//                assert(w != NULL);
1113//                if(w->pid == te->pid) {
1114//                    assert(w != NULL);
1115//                    if(w != writers) {
1116//                        assert(w != NULL);
1117//                        assert(w->prev != NULL);
1118//                        w->prev->next = w->next;
1119//                    } else {
1120//                        writers = w->next;
1121//                    }
1122//                    free(w);
1123//                    break;
1124//                }
1125//            }
1126//        }
1127//#endif
1128//
1129//        // If someone opens a file, we have to make sure
1130//        // that anyone else has stopped writing to that file.
1131//        if(te->op == TOP_Open || te->op == TOP_Create) {
1132//            /* for(;;) { */
1133//                if(printall) {
1134//                    printf("find writer\n");
1135//                }
1136//
1137//                struct writer *w;
1138//                for(w = writers; w != NULL; w = w->next) {
1139//                    assert(w != NULL);
1140//                    assert(te != NULL);
1141//                    if(w->fnum == te->u.fnum) {
1142//                        // Somebody's writing to this file -- wait for him to finish
1143//                        /* printf("Warning: Concurrent file writer, fline = %d, fnum = %zu\n", */
1144//                        /*        te->fline, te->u.fnum); */
1145//                        /* assert(!"NYI"); */
1146//                        break;
1147//                    }
1148//                }
1149//
1150//#if 0
1151//                // There's a writer -- wait for it to finish
1152//                if(w != NULL) {
1153//                    printf("Waiting for close from previous writer\n");
1154//                    err = event_dispatch(get_default_waitset());
1155//                    assert(err_is_ok(err));
1156//                } else {
1157//                    break;
1158//                }
1159//#endif
1160//            }
1161//
1162//            // Add a new writer to the list
1163//            if(te->mode != FLAGS_RdOnly) {
1164//                struct writer *w = malloc(sizeof(struct writer));
1165//
1166//                /* printf("new writer to file %zu\n", te->u.fnum); */
1167//
1168//                //                printall = true;
1169//
1170//                w->fnum = te->u.fnum;
1171//                w->pid = te->pid;
1172//                w->slave = s;
1173//                w->prev = NULL;
1174//                w->next = writers;
1175//                if(writers) {
1176//                    w->next->prev = w;
1177//                }
1178//                writers = w;
1179//            }
1180//    /* } */
1181//
1182//
1183//        if(printall) {
1184//            printf("sending\n");
1185//        }
1186//
1187//        assert(s != NULL);
1188//        if(s->queue == NULL) {
1189//#ifndef __linux__
//            // Barrelfish: send request (not implemented in this copy)
1191//#else
1192//            if(printall) {
1193//                printf("send_buf 1\n");
1194//            }
1195//            ssize_t r = send_buf(s, &er);
1196//            if(printall) {
1197//                printf("after send_buf 1\n");
1198//            }
1199//            /* ssize_t r = send(s->socket, &er, sizeof(er), MSG_DONTWAIT); */
1200//            if(r == -1) {
1201//                if(errno == EAGAIN) {
1202//                    if(printall) {
1203//                        printf("queueing\n");
1204//                    }
1205//                    /* printf("queueing\n"); */
1206//                    struct qelem *q = malloc(sizeof(struct qelem));
1207//                    assert(q != NULL);
1208//                    q->er = er;
1209//                    q->next = s->queue;
1210//                    if(s->queue != NULL) {
1211//                        s->queue->prev = q;
1212//                    } else {
1213//                        assert(s->qend == NULL);
1214//                    }
1215//                    q->prev = NULL;
1216//                    s->queue = q;
1217//                    if(s->qend == NULL) {
1218//                        s->qend = q;
1219//                    }
1220//                } else {
1221//                    printf("send_message to %d: %s\n", s->num, strerror(errno));
1222//                    abort();
1223//                }
1224//            } else {
1225//                if(r != sizeof(er)) {
1226//                    printf("send_message: r == %zd, size = %zu\n", r, sizeof(er));
1227//                }
1228//                assert(r == sizeof(er));
1229//            }
1230//#endif
1231//        } else {
1232//            // Put on slave's queue
1233//            if(printall) {
1234//                printf("queueing\n");
1235//            }
1236//            /* printf("queueing\n"); */
1237//            struct qelem *q = malloc(sizeof(struct qelem));
1238//            assert(q != NULL);
1239//            q->er = er;
1240//            q->next = s->queue;
1241//            if(s->queue != NULL) {
1242//                s->queue->prev = q;
1243//            } else {
1244//                assert(s->qend == NULL);
1245//            }
1246//            q->prev = NULL;
1247//            s->queue = q;
1248//            if(s->qend == NULL) {
1249//                s->qend = q;
1250//            }
1251//        }
1252//
1253//        if(printall) {
1254//            printf("resending\n");
1255//        }
1256//
1257//        // Resend items that got queued
1258//        for(i = 0; i < num_slaves; i++) {
1259//            s = &slaves[i];
1260//            for(struct qelem *q = s->qend; q != NULL;) {
//                // Send queued entries oldest-first (from qend); stop at the first one the channel cannot accept yet
1262//
1263//#ifndef __linux__
1264//                err = s->b->tx_vtbl.event(s->b, NOP_CONT, q->er);
1265//                if(err_is_ok(err)) {
1266//                    if(printall) {
1267//                        printf("resent %d\n", q->er.fline);
1268//                    }
1269//                    struct qelem *oldq = q;
1270//                    s->qend = q = q->prev;
1271//                    free(oldq);
1272//                    if(s->qend == NULL) {
1273//                        s->queue = NULL;
1274//                    }
1275//                } else if(err_no(err) != FLOUNDER_ERR_TX_BUSY) {
1276//                    DEBUG_ERR(err, "error");
1277//                    abort();
1278//                } else {
1279//                    // still busy, can't dequeue anything
1280//                    /* printf("busy2\n"); */
1281//                    err = event_dispatch(get_default_waitset());
1282//                    assert(err_is_ok(err));
1283//                    break;
1284//                    /* printf("still busy\n"); */
1285//                    /* qend = q = q->prev; */
1286//                }
1287//#else
1288//                if(printall) {
1289//                    printf("send_buf 2\n");
1290//                }
1291//                ssize_t r = send_buf(s, &q->er);
1292//                if(printall) {
1293//                    printf("after send_buf 2\n");
1294//                }
1295//                /* ssize_t r = send(s->socket, &q->er, sizeof(q->er), MSG_DONTWAIT); */
1296//                if(r == -1) {
1297//                    if(errno == EAGAIN) {
1298//                        break;
1299//                    } else {
1300//                        printf("send_message to %d: %s\n", s->num, strerror(errno));
1301//                        abort();
1302//                    }
1303//                } else {
1304//                    if(r != sizeof(er)) {
1305//                        printf("send_message: r == %zd, size = %zu\n", r, sizeof(er));
1306//                    }
1307//                    assert(r == sizeof(er));
1308//                    struct qelem *oldq = q;
1309//                    s->qend = q = q->prev;
1310//                    free(oldq);
1311//                    if(s->qend == NULL) {
1312//                        s->queue = NULL;
1313//                    }
1314//                }
1315//#endif
1316//            }
1317//        }
1318//    }
1319//
1320//    printf("draining\n");
1321//
1322//    // Drain the queue
1323//    for(int i = 0; i < num_slaves; i++) {
1324//        struct slave *s = &slaves[i];
1325//        for(struct qelem *q = s->qend; q != NULL;) {
1326//#ifndef __linux__
1327//            err = s->b->tx_vtbl.event(s->b, NOP_CONT, q->er);
1328//            if(err_is_ok(err)) {
1329//                /* printf("resent %d\n", q->er.fline); */
1330//                struct qelem *oldq = q;
1331//                s->qend = q = q->prev;
1332//                free(oldq);
1333//                if(s->qend == NULL) {
1334//                    s->queue = NULL;
1335//                }
1336//            } else if(err_no(err) != FLOUNDER_ERR_TX_BUSY) {
1337//                DEBUG_ERR(err, "error");
1338//                abort();
1339//            } else {
1340//                // still busy, can't dequeue anything
1341//                break;
1342//                /* printf("still busy\n"); */
1343//                /* qend = q = q->prev; */
1344//            }
1345//#else
1346//            ssize_t r = send_buf(s, &q->er);
1347//            /* ssize_t r = send(s->socket, &q->er, sizeof(q->er), MSG_DONTWAIT); */
1348//            if(r == -1) {
1349//                if(errno == EAGAIN) {
1350//                    break;
1351//                } else {
1352//                    printf("send_message to %d: %s\n", s->num, strerror(errno));
1353//                    abort();
1354//                }
1355//            } else {
1356//                if(r != sizeof(q->er)) {
1357//                    printf("send_message: r == %zd, size = %zu\n", r, sizeof(q->er));
1358//                }
1359//                assert(r == sizeof(q->er));
1360//                struct qelem *oldq = q;
1361//                s->qend = q = q->prev;
1362//                free(oldq);
1363//                if(s->qend == NULL) {
1364//                    s->queue = NULL;
1365//                }
1366//            }
1367//#endif
1368//        }
1369//    }
1370//
1371//    for(int i = 0; i < num_slaves; i++) {
1372//        struct slave *s = &slaves[i];
1373//        replay_eventrec_t er = {
1374//            .op = TOP_End
1375//        };
1376//#ifndef __linux__
1377//        err = s->b->tx_vtbl.event(s->b, NOP_CONT, er);
1378//        assert(err_is_ok(err));
1379//#else
1380//        ssize_t r = send_buf(s, &er);
1381//        if(r == -1) {
1382//            if(errno == EAGAIN) {
1383//                printf("buffer full\n");
1384//                abort();
1385//            } else {
1386//                printf("send_message to %d: %s\n", s->num, strerror(errno));
1387//                abort();
1388//            }
1389//        }
1390//#endif
1391//    }
1392//
1393//    do {
1394//        err = event_dispatch(get_default_waitset());
1395//        assert(err_is_ok(err));
1396//    } while(num_finished < num_slaves);
1397//
1398//    uint64_t end = rdtsc();
1399//
1400//#if 0
1401//    // Wait for 5 seconds
1402//    uint64_t beg = rdtsc();
1403//    while(rdtsc() - beg < tscperms * 5000) {
1404//#ifndef __linux__
1405//        thread_yield();
1406//#else
1407//        sched_yield();
1408//#endif
1409//    }
1410//#endif
1411//
1412//    printf("replay done, took %" PRIu64" ms\n", (end - start) / tscperms);
1413//
1414//
1415