/**
 * \file
 * \brief Distributed (percore) memory server
 */

/*
 * Copyright (c) 2007-2011, ETH Zurich.
 * All rights reserved.
 *
 * This file is distributed under the terms in the attached LICENSE file.
 * If you do not find this file, copies can be found by writing to:
 * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
 */

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <getopt.h>

#include <inttypes.h>
#include <barrelfish/barrelfish.h>
#include <barrelfish/dispatch.h>
#include <mm/mm.h>
#include <trace/trace.h>
#include <trace_definitions/trace_defs.h>
#include <barrelfish/morecore.h>
#include <barrelfish/monitor_client.h>
#include <barrelfish/spawn_client.h>
#include <skb/skb.h>
#include <dist/barrier.h>

#include <if/mem_defs.h>
#include <if/monitor_defs.h>
#include <if/spawn_defs.h>

#include "skb.h"
#include "args.h"

// #include "barrier.h"

#include "mem_serv.h"
#include "steal.h"

/*
 * TODO:
 * - currently requests too much memory from the initial mem_serv, so other
 *   (non-dist) mem_serv clients may suffer
 */

/// Globally track the total memory available
memsize_t mem_total = 0;
/// Globally track the actual memory available to allocate
memsize_t mem_avail = 0;
/// Globally track the local reserve memory available to allocate
memsize_t mem_local = 0;

/// MM per-core allocator instance data: B-tree to manage mem regions
struct mm mm_percore;
// static storage for MM allocator to get it started
static char percore_nodebuf[SLAB_STATIC_SIZE(MINSPARENODES,
                                             MM_NODE_SIZE(MAXCHILDBITS))];

/// MM allocator for reserve of emergency memory used within the mem_serv only
struct mm mm_local;
// static storage for MM allocator to get it started
static char local_nodebuf[SLAB_STATIC_SIZE(MINSPARENODES,
                                           MM_NODE_SIZE(MAXCHILDBITS))];

/// simple slot allocator used by MM
static struct slot_prealloc percore_slot_alloc;

static struct mm *mm_slots = &mm_percore;

#if 0
static void dump_ram_region(int index, struct mem_region* m)
{
    uintptr_t start, limit;

    start = (uintptr_t)m->mr_base;
    limit = start + (1UL << m->mr_bits);

    char prefix = ' ';
    size_t quantity = 1UL << m->mr_bits;

    if (m->mr_bits >= 30) {
        prefix = 'G';
        quantity >>= 30;
    }
    else if (m->mr_bits >= 20) {
        prefix = 'M';
        quantity >>= 20;
    }
    else if (m->mr_bits >= 10) {
        prefix = 'K';
        quantity >>= 10;
    }

    printf("RAM region %d: 0x%" PRIxPTR
           " - 0x%" PRIxPTR " (%lu %cB, %u bits)\n",
           index, start, limit, quantity, prefix, m->mr_bits);
}
#endif // 0

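/**
 * \brief Refill the given slab allocator if it is running low
 *
 * Keeps allocating, creating and mapping fresh frames until the allocator
 * has more than MINSPARENODES free slabs.
 *
 * \param slabs Slab allocator to refill
 */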
errval_t slab_refill(struct slab_allocator *slabs)
{
    errval_t err;

    // refill slab allocator if needed
    while (slab_freecount(slabs) <= MINSPARENODES) {
        // debug_printf("running low on free slabs: slabs=%ld\n",
        //             slab_freecount(&mm_percore.slabs));
        struct capref frame;
        err = slot_alloc(&frame);
        if (err_is_fail(err)) {
            return err_push(err, LIB_ERR_SLOT_ALLOC);
        }
        err = frame_create(frame, BASE_PAGE_SIZE * 8, NULL);
        if (err_is_fail(err)) {
            slot_free(frame);
            return err_push(err, LIB_ERR_FRAME_CREATE);
        }
        void *buf;
        err = vspace_map_one_frame(&buf, BASE_PAGE_SIZE * 8, frame,
                                   NULL, NULL);
        if (err_is_fail(err)) {
            cap_destroy(frame);
            return err_push(err, LIB_ERR_VSPACE_MAP);
        }
        slab_grow(slabs, buf, BASE_PAGE_SIZE * 8);
    }

    return SYS_ERR_OK;
}

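/**
 * \brief Return a RAM cap to the given MM allocator
 *
 * If the memory was not previously managed by the allocator, it is added
 * to it (and to the global total) instead.
 *
 * \param mm            MM allocator to free to
 * \param ramcap        RAM cap to free
 * \param base          Physical base address of the RAM cap
 * \param bits          Size of the RAM cap, as a power of two
 * \param mem_available Available-memory counter to credit
 */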
static errval_t do_free(struct mm *mm, struct capref ramcap,
                        genpaddr_t base, uint8_t bits,
                        memsize_t *mem_available)
{
    errval_t ret;
    memsize_t mem_to_add;

    mem_to_add = (memsize_t)1 << bits;

    ret = mm_free(mm, ramcap, base, bits);
    if (err_is_fail(ret)) {
        if (err_no(ret) == MM_ERR_NOT_FOUND) {
            // memory wasn't there initially, add it
            ret = mm_add(mm, ramcap, bits, base);
            if (err_is_fail(ret)) {
                return err_push(ret, MM_ERR_MM_ADD);
            }
            mem_total += mem_to_add;
        } else {
            return err_push(ret, MM_ERR_MM_FREE);
        }
    }

    *mem_available += mem_to_add;

    return SYS_ERR_OK;
}

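/**
 * \brief Free a RAM cap back to the per-core allocator
 *
 * Identifies the cap first and rejects anything that is not a RAM cap.
 *
 * \param ramcap RAM cap to free
 */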
static errval_t percore_free(struct capref ramcap)
{
    struct capability info;
    errval_t ret;

    ret = debug_cap_identify(ramcap, &info);
    if (err_is_fail(ret)) {
        return err_push(ret, MON_ERR_CAP_IDENTIFY);
    }

    if (info.type != ObjType_RAM) {
        return SYS_ERR_INVALID_SOURCE_TYPE;
    }

#if 0
    printf("%d: Cap is type %d Ram base 0x%"PRIxGENPADDR
           " (%"PRIuGENPADDR") Bits %d\n", disp_get_core_id(),
           info.type, info.u.ram.base, info.u.ram.base,
           info.u.ram.bits);
#endif

    return do_free(&mm_percore, ramcap, info.u.ram.base,
                   log2ceil(info.u.ram.bytes), &mem_avail);
}

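/**
 * \brief Handler-side entry point for freeing a RAM cap to the per-core pool
 *
 * \param ramcap RAM cap to free
 * \param base   Physical base address of the RAM cap
 * \param bits   Size of the RAM cap, as a power of two
 */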
errval_t percore_free_handler_common(struct capref ramcap, genpaddr_t base,
                                     uint8_t bits)
{
    return do_free(&mm_percore, ramcap, base, bits, &mem_avail);
}

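/// Handler-side entry point for querying the available per-core memory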
memsize_t mem_available_handler_common(void)
{
    return mem_avail;
}


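/**
 * \brief Allocate a RAM cap from the given MM allocator
 *
 * \param mm            MM allocator to allocate from
 * \param ret           Returns the allocated RAM cap
 * \param bits          Size of the requested cap, as a power of two
 * \param minbase       Minimum physical base address (used with maxlimit)
 * \param maxlimit      Maximum physical limit; 0 means no range constraint
 * \param mem_available Available-memory counter to debit on success
 */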
static errval_t do_alloc(struct mm *mm, struct capref *ret, uint8_t bits,
                         genpaddr_t minbase, genpaddr_t maxlimit,
                         memsize_t *mem_available)
{
    errval_t err;

    assert(bits >= MINSIZEBITS);

    if (((memsize_t)1 << bits) > *mem_available) {
        return MM_ERR_NOT_FOUND;
    }

    if (maxlimit == 0) {
        err = mm_alloc(mm, bits, ret, NULL);
    } else {
        err = mm_alloc_range(mm, bits, minbase, maxlimit, ret, NULL);
    }

    if (err_is_ok(err)) {
        *mem_available -= (memsize_t)1 << bits;
    }

    return err;
}


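/// Allocate a RAM cap from the general per-core memory pool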
errval_t percore_alloc(struct capref *ret, uint8_t bits,
                       genpaddr_t minbase, genpaddr_t maxlimit)
{
    return do_alloc(&mm_percore, ret, bits, minbase, maxlimit, &mem_avail);
}


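/**
 * \brief Allocate a RAM cap, falling back to the local emergency reserve
 *
 * First tries the general per-core pool, then the local reserve that is
 * used within the mem_serv only.
 */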
static errval_t local_alloc(struct capref *ret, uint8_t bits,
                            genpaddr_t minbase, genpaddr_t maxlimit)
{
    errval_t err;

    // first try the general percore memory
    err = percore_alloc(ret, bits, minbase, maxlimit);

    // then try the local reserve
    if (err_is_fail(err)) {
        err = do_alloc(&mm_local, ret, bits, minbase, maxlimit, &mem_local);
    }

    return err;
}

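/**
 * \brief Acquire more RAM for the per-core allocator
 *
 * Tries to steal a RAM cap from a peer first, then falls back to the
 * local reserve. Any cap obtained is freed into the per-core pool so
 * that a subsequent allocation can use it.
 */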
static errval_t get_more_ram(uint8_t bits, genpaddr_t minbase,
                             genpaddr_t maxlimit)
{
    errval_t err;
    struct capref cap;

    // try to steal a RAM cap
    try_steal(&err, &cap, bits, minbase, maxlimit);
    if (err_is_fail(err)) {
        // try to get a local reserve RAM cap
        err = local_alloc(&cap, bits, minbase, maxlimit);
        if (err_is_fail(err)) {
            return err;
        }
    }
    // make the cap available for a subsequent alloc
    percore_free(cap);

    return SYS_ERR_OK;
}

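/// Refill the slot allocator, acquiring more RAM if the first attempt fails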
static errval_t do_slot_prealloc_refill(struct slot_prealloc *slot_alloc_inst)
{
    errval_t err;

    assert(slot_alloc_inst != NULL);

    err = slot_prealloc_refill(slot_alloc_inst);
    if (err_is_fail(err)) {
        err = get_more_ram(L2_CNODE_BITS, 0, 0);
        if (err_is_fail(err)) {
            // debug_printf("get_more_ram failed\n");
        }
        err = slot_prealloc_refill(slot_alloc_inst);
        if (err_is_ok(err)) {
            // debug_printf("second refill succeeded\n");
        }
    }
    return err;
}

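/**
 * \brief Handler-side entry point for a per-core allocation request
 *
 * Refills the slot and slab allocators as needed, performs the actual
 * allocation, and tries to steal from a peer if it fails locally.
 *
 * \param bits     Size of the requested cap, as a power of two
 * \param minbase  Minimum physical base address (used with maxlimit)
 * \param maxlimit Maximum physical limit; 0 means no range constraint
 * \param retcap   Returns the allocated RAM cap
 */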
errval_t percore_allocate_handler_common(uint8_t bits,
                                         genpaddr_t minbase,
                                         genpaddr_t maxlimit,
                                         struct capref *retcap)
{
    struct capref cap;
    errval_t err, ret;

    // debug_printf("percore alloc request: bits: %d\n", bits);

    trace_event(TRACE_SUBSYS_MEMSERV, TRACE_EVENT_MEMSERV_PERCORE_ALLOC, bits);

    // refill slot allocator if needed
    err = do_slot_prealloc_refill(mm_slots->slot_alloc_inst);
    if (err_is_fail(err)) {
        DEBUG_ERR(err, "Warning: failure in slot_prealloc_refill");
    }

    // refill slab allocators if needed
    err = slab_refill(&mm_percore.slabs);
    if (err_is_fail(err)) {
        DEBUG_ERR(err, "Warning: failure when refilling mm_percore slab");
    }

    err = slab_refill(&mm_local.slabs);
    if (err_is_fail(err)) {
        DEBUG_ERR(err, "Warning: failure when refilling mm_local slab");
    }

    // do the actual allocation
    ret = percore_alloc(&cap, bits, minbase, maxlimit);

    if (err_is_fail(ret)) {
        // debug_printf("percore_alloc(%d (%lu)) failed\n", bits, 1UL << bits);
        printf("[%d][%"PRIuDOMAINID"] percore_alloc failed, going to steal\n",
               disp_get_core_id(), disp_get_domain_id());
        try_steal(&ret, &cap, bits, minbase, maxlimit);
    }

    trace_event(TRACE_SUBSYS_MEMSERV,
                TRACE_EVENT_MEMSERV_PERCORE_ALLOC_COMPLETE, bits);

    *retcap = cap;
    return ret;
}


/**
 * \brief Determine how much memory to manage per core
 *
 * This is a candidate for a smarter calculation, possibly by the SKB.
 */
static memsize_t get_percore_size(int num_cores)
{
#ifdef MEMSERV_PERCORE_DYNAMIC
    errval_t err;
    memsize_t all_mem_avail, mem_percore, tot_mem;

    // send message to mem_serv
    struct mem_binding *b = get_mem_client();
    err = b->rpc_tx_vtbl.available(b, &all_mem_avail, &tot_mem);

    if (err_is_fail(err)) {
        DEBUG_ERR(err, "Warning: failure in call to mem_serv.available");
        // default to predetermined amount of memory per core
        return PERCORE_MEM;
    }

    debug_printf("available memory: %"PRIuMEMSIZE" bytes over %d cores\n",
                 all_mem_avail, num_cores);

    mem_percore = all_mem_avail / num_cores;

    debug_printf("available memory per core: %"PRIuMEMSIZE" bytes\n",
                 mem_percore);

    return mem_percore;
#else
    // Use predetermined amount of memory per core

    debug_printf("available memory per core: %"PRIuMEMSIZE" bytes\n",
                 PERCORE_MEM);

    return PERCORE_MEM;
#endif
}


#ifdef MEMSERV_AFFINITY
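/// Set this core's affinity range as the default for ram_alloc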
static void set_affinity(coreid_t core)
{
    // get core affinity range and set it as the default for ram_alloc
    errval_t err;
    genpaddr_t base;
    genpaddr_t limit;
    err = get_percore_affinity(core, &base, &limit);
    if (err_is_fail(err)) {
        DEBUG_ERR(err, "Warning: failure in get_percore_affinity");
        base = 0;
        limit = 0;
    }
    ram_set_affinity(base, limit);

    debug_printf("affinity range is base: %"PRIuGENPADDR", limit: %"
                 PRIuGENPADDR"\n", base, limit);
}
#endif


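/**
 * \brief Fill an MM allocator with RAM caps from the central mem_serv
 *
 * Requests progressively smaller RAM caps until either the requested
 * amount has been added or the minimum allocation size is reached.
 *
 * \param mm            MM allocator to fill
 * \param mem_requested Amount of memory to try to add
 * \param bits          Size of the first cap to request, as a power of two
 * \param mem_tot       Total-memory counter to credit
 *
 * \returns Amount of memory actually added to the allocator
 */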
static memsize_t fill_mm(struct mm *mm, memsize_t mem_requested, uint8_t bits,
                         memsize_t *mem_tot)
{
    errval_t err;

    memsize_t mem_added = 0;
    memsize_t mem_to_add = 0;
    struct capref ramcap;
    struct capability info;

    // get as much of the requested memory as we can, requesting ever
    // smaller RAM caps until we hit the smallest RAM cap size

    while (bits >= MINALLOCBITS) {
        // debug_printf("adding memory %"PRIuMEMSIZE" (%d bits)\n",
        //             (memsize_t)1<<bits, bits);

        err = ram_alloc(&ramcap, bits);
        if (err_is_fail(err)) {
            // skip this size and try the next size down
            bits--;
            continue;
        }

        // XXX: Hack until we have cross-core cap management.
        // Forget about remote relations of this cap. This will ensure
        // that the monitor will hand it back to us in case anyone on
        // this core deletes it.
        err = monitor_cap_set_remote(ramcap, false);
        if (err_is_fail(err)) {
            DEBUG_ERR(err, "Warning: failed to set cap non-remote. Trying next one.");
            continue;
        }

        err = debug_cap_identify(ramcap, &info);
        if (err_is_fail(err)) {
            DEBUG_ERR(err, "Warning: failed to identify cap. Trying next one.");
            continue;
        }
#if 0
        debug_printf("Cap is type %d Ram base 0x%"PRIxGENPADDR
                     " (%"PRIuGENPADDR") Bits %d\n",
                     info.type, info.u.ram.base, info.u.ram.base,
                     info.u.ram.bits);
#endif
        assert(bits == log2ceil(info.u.ram.bytes));

        mem_to_add = (memsize_t)1 << bits;

        *mem_tot += mem_to_add;

        err = mm_add(mm, ramcap, bits, info.u.ram.base);
        if (err_is_ok(err)) {
            mem_added += mem_to_add;

            mem_requested -= mem_to_add;
            uint8_t new_bits = log2floor(mem_requested);
            bits = MIN(bits, new_bits);
        } else {
            DEBUG_ERR(err, "Warning: adding RAM region (%p/0x%"PRIxGENSIZE") FAILED",
                      info.u.ram.base, info.u.ram.bytes);
        }
    }

    return mem_added;
}


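/**
 * \brief Initialise an MM allocator and seed it with a small RAM cap
 *
 * \param mm              MM allocator to initialise
 * \param nodebuf         Static buffer for the allocator's first slabs
 * \param nodebuf_size    Size of nodebuf in bytes
 * \param slot_alloc_inst Slot allocator instance for the MM allocator
 * \param mem_added       Counter credited with the memory added
 * \param mem_tot         Total-memory counter to credit
 */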
static errval_t init_mm(struct mm *mm, char nodebuf[], memsize_t nodebuf_size,
                        struct slot_prealloc *slot_alloc_inst,
                        memsize_t *mem_added, memsize_t *mem_tot)
{
    errval_t err;

    struct capref ramcap;
    struct capability info;

    /* XXX: base shouldn't need to be 0? */
    err = mm_init(mm, ObjType_RAM,
                  0, MAXSIZEBITS, MAXCHILDBITS, NULL,
                  slot_alloc_prealloc, NULL, slot_alloc_inst, true);
    if (err_is_fail(err)) {
        return err_push(err, MM_ERR_MM_INIT);
    }

    slab_grow(&mm->slabs, nodebuf, nodebuf_size);

    // Need to bootstrap with a small cap first!
    err = ram_alloc(&ramcap, SMALLCAP_BITS);
    if (err_is_fail(err)) {
        DEBUG_ERR(err, "failed to get small RAM from mem_serv");
        return err_push(err, LIB_ERR_RAM_ALLOC);
    }

    err = debug_cap_identify(ramcap, &info);
    if (err_is_fail(err)) {
        percore_free(ramcap);
        return err_push(err, MON_ERR_CAP_IDENTIFY);
    }

#if 0
    printf("Cap is type %d Ram base 0x%"PRIxGENPADDR" Bits %d\n",
           info.type, info.u.ram.base, info.u.ram.bits);
#endif
    assert(SMALLCAP_BITS == log2ceil(info.u.ram.bytes));

    *mem_tot += (memsize_t)1 << SMALLCAP_BITS;

    err = mm_add(mm, ramcap, SMALLCAP_BITS, info.u.ram.base);
    if (err_is_ok(err)) {
        *mem_added += (memsize_t)1 << SMALLCAP_BITS;
    } else {
        percore_free(ramcap);
        return err_push(err, MM_ERR_MM_ADD);
    }

    // try to refill slot allocator (may fail or do nothing)
    slot_prealloc_refill(mm->slot_alloc_inst);

    return SYS_ERR_OK;
}

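/// Initialise the preallocating slot allocator used by MM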
static errval_t init_slot_allocator(struct slot_prealloc *slot_alloc_inst,
                                    struct mm *mm)
{
    errval_t err;

    // Use ROOTCN_SLOT_SLOT_ALLOC0 as initial cnode for mm slot allocator
    struct capref cnode_start_cap = {
        .cnode = {
            .croot = CPTR_ROOTCN,
            .cnode = ROOTCN_SLOT_ADDR(ROOTCN_SLOT_SLOT_ALLOC0),
            .level = CNODE_TYPE_OTHER,
        },
        .slot = 0,
    };

    // init slot allocator
    err = slot_prealloc_init(slot_alloc_inst, MAXCHILDBITS,
                             cnode_start_cap, L2_CNODE_SLOTS, mm);
    if (err_is_fail(err)) {
        return err_push(err, MM_ERR_SLOT_ALLOC_INIT);
    }

    return SYS_ERR_OK;
}

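/**
 * \brief Set up the per-core memory server
 *
 * Initialises the slot, slab and MM allocators, fills the per-core and
 * local-reserve pools with RAM from the central mem_serv, and sets up the
 * peer data structures used for stealing.
 *
 * \param core        Core this server runs on
 * \param cores       All cores running a per-core mem_serv
 * \param len_cores   Number of entries in cores
 * \param percore_mem Amount of memory to manage on this core
 */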
errval_t initialize_percore_mem_serv(coreid_t core, coreid_t *cores,
                                     int len_cores, memsize_t percore_mem)
{
    errval_t err;

    mem_avail = 0;
    mem_total = 0;

    trace_event(TRACE_SUBSYS_MEMSERV, TRACE_EVENT_MEMSERV_PERCORE_INIT, 0);

    err = init_slot_allocator(&percore_slot_alloc, mm_slots);
    if (err_is_fail(err)) {
        return err_push(err, MM_ERR_SLOT_ALLOC_INIT);
    }

    err = init_mm(&mm_percore, percore_nodebuf, sizeof(percore_nodebuf),
                  &percore_slot_alloc, &mem_avail, &mem_total);
    if (err_is_fail(err)) {
        return err;
    }
    err = init_mm(&mm_local, local_nodebuf, sizeof(local_nodebuf),
                  &percore_slot_alloc, &mem_local, &mem_total);
    if (err_is_fail(err)) {
        return err;
    }

#ifdef MEMSERV_AFFINITY
    set_affinity(core);
#endif

    // determine how much memory we need to get to fill up the percore mm
    percore_mem -= mem_total; // memory we've already taken
    percore_mem -= LOCAL_MEM; // memory we'll take for mm_local

    uint8_t percore_bits = log2floor(percore_mem);
    if (percore_bits > MAXSIZEBITS) {
        percore_bits = MAXSIZEBITS;
    }
    // debug_printf("memory to use: %"PRIuMEMSIZE"\n", percore_mem);

    mem_local += fill_mm(&mm_local, LOCAL_MEM, LOCAL_MEMBITS, &mem_total);

    mem_avail += fill_mm(&mm_percore, percore_mem, percore_bits, &mem_total);

    // from now on we don't care where memory comes from anymore
    ram_set_affinity(0, 0);
    // also use our own memory, rather than the remote central mem_serv
    ram_alloc_set(local_alloc);

    // try to refill slot allocator (may fail or do nothing)
    // TODO: is this necessary?
    slot_prealloc_refill(mm_slots->slot_alloc_inst);

    // refill slab allocator if needed and possible
    if (slab_freecount(&mm_percore.slabs) <= MINSPARENODES
        && mem_avail > (1UL << (CNODE_BITS + OBJBITS_CTE)) * 2
                       + 10 * BASE_PAGE_SIZE) {
        slab_default_refill(&mm_percore.slabs); // may fail
    }

    if (slab_freecount(&mm_local.slabs) <= MINSPARENODES
        && mem_avail > (1UL << (CNODE_BITS + OBJBITS_CTE)) * 2
                       + 10 * BASE_PAGE_SIZE) {
        slab_default_refill(&mm_local.slabs); // may fail
    }

    // try to refill slot allocator - now it shouldn't fail!
    err = slot_prealloc_refill(mm_slots->slot_alloc_inst);
    if (err_is_fail(err)) {
        DEBUG_ERR(err, "Fatal internal error in RAM allocator: "
                  "failed to refill slot allocator");
        return err;
    }

    // init peer data structures so we know who to contact to steal memory
    err = init_peers(core, len_cores, cores);
    if (err_is_fail(err)) {
        return err_push(err, MS_ERR_INIT_PEERS);
    }

    // done.
    debug_printf("Percore RAM allocator initialised, %"PRIuMEMSIZE
                 " MB (of %"PRIuMEMSIZE" MB) available\n",
                 mem_avail / 1024 / 1024, mem_total / 1024 / 1024);

    trace_event(TRACE_SUBSYS_MEMSERV, TRACE_EVENT_MEMSERV_PERCORE_INIT, 9);

    return SYS_ERR_OK;
}


/**
 * \brief Request a spawnd to reconnect to a local memserv
 *
 * \param coreid The core that the spawnd is running on
 */
errval_t set_local_spawnd_memserv(coreid_t coreid)
{
    struct spawn_binding *cl;
    errval_t err = spawn_binding(coreid, &cl);
    if (err_is_fail(err)) {
        return err;
    }

    return cl->rpc_tx_vtbl.use_local_memserv(cl);
}


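/// Run a per-core memory server; should never return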
static int run_worker(coreid_t core, struct args *args)
{
    assert(args != NULL);

    // debug_printf("Distributed mem_serv. percore server on core %d\n", core);

    // this should never return
    percore_mem_serv(core, args->cores, args->cores_len, args->ram);
    return EXIT_FAILURE; // so we should never reach here
}


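/**
 * \brief Spawn per-core memory servers on all requested cores
 *
 * Works out the per-core memory share, spawns a worker on each core, and
 * waits on a barrier until all of them have started up.
 */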
static int run_master(coreid_t core, struct args *args)
{
    assert(args != NULL);

    errval_t err;

    debug_printf("Distributed mem_serv. master on core %d\n", core);

    memsize_t percore_mem;
    if (args->ram > 0) {
        percore_mem = args->ram;
    } else {
        percore_mem = get_percore_size(args->cores_len);
    }

    // debug_printf("spawning on %d cores\n", args->cores_len);

    // set up args for the spawn
    // -w
    // -c <core list>
    // -r <percore_mem>
    char *new_argv[7];
    new_argv[0] = args->path;
    new_argv[1] = "-w";
    new_argv[2] = "-c";
    new_argv[3] = list_to_string(args->cores, args->cores_len);
    if (new_argv[3] == NULL) {
        DEBUG_ERR(LIB_ERR_MALLOC_FAIL, "out of memory");
        return EXIT_FAILURE;
    }
    new_argv[4] = "-r";
    new_argv[5] = malloc(21); // enough to fit a 64-bit number plus NUL
    if (new_argv[5] == NULL) {
        DEBUG_ERR(LIB_ERR_MALLOC_FAIL, "out of memory");
        return EXIT_FAILURE;
    }
    sprintf(new_argv[5], "%"PRIuMEMSIZE, percore_mem);
    new_argv[6] = NULL;

    for (int i = 0; i < args->cores_len; i++) {
        err = spawn_program(args->cores[i], new_argv[0], new_argv,
                            NULL, 0, NULL);
        if (err_is_fail(err)) {
            DEBUG_ERR(err, "spawning percore mem_serv on core %d",
                      args->cores[i]);
            return EXIT_FAILURE;
        }
    }

    // wait for all the spawned mem_servs to start up
    // err = ns_barrier_master_l(args->cores, args->cores_len, MEMSERV_DIST);
    err = nsb_master_l(args->cores, args->cores_len, MEMSERV_DIST);
    if (err_is_fail(err)) {
        DEBUG_ERR(err, "barrier_master failed");
        return EXIT_FAILURE;
    }

    return EXIT_SUCCESS;
}

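/// Common entry point: dispatch to master or worker depending on the args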
int common_main(int argc, char **argv)
{
    coreid_t core = disp_get_core_id();

    struct args my_args;
    my_args = process_args(argc, argv);

    if (my_args.master) {
        return run_master(core, &my_args);
    } else {
        return run_worker(core, &my_args);
    }
}