/*
 * Copyright (c) 2010, 2011, 2012, ETH Zurich.
 * All rights reserved.
 *
 * This file is distributed under the terms in the attached LICENSE file.
 * If you do not find this file, copies can be found by writing to:
 * ETH Zurich D-INFK, CAB F.78, Universitaetstrasse 6, CH-8092 Zurich,
 * Attn: Systems Group.
 */

#include <barrelfish/barrelfish.h>
#include <barrelfish/spawn_client.h>

#include <flounder/flounder_txqueue.h>
#include <spawndomain/spawndomain.h>

#include <xeon_phi/xeon_phi.h>
#include <xeon_phi/xeon_phi_client.h>
#include <xeon_phi/xeon_phi_domain.h>

#include <bomp_internal.h>
#include <xomp/xomp.h>

#include <if/xomp_defs.h>

#include <xomp_debug.h>

/// number of threads a local worker handles: its own thread plus the virtual threads
#define XOMP_VTHREADS (XOMP_VTHREAD_COUNT + 1)

/**
 * \brief worker state enumeration.
 *
 * Describes the possible states a worker can be in.
 */
typedef enum xomp_worker_state
{
    XOMP_WORKER_ST_INVALID  = 0,    ///< this worker has not been initialized
    XOMP_WORKER_ST_FAILURE  = 1,    ///< an error occurred during an operation
    XOMP_WORKER_ST_SPAWNING = 2,    ///< worker is being spawned
    XOMP_WORKER_ST_SPAWNED  = 3,    ///< worker is spawned and connected to master
    XOMP_WORKER_ST_READY    = 4,    ///< worker is ready to service requests
    XOMP_WORKER_ST_BUSY     = 5     ///< worker is busy servicing requests
} xomp_worker_st_t;

/**
 * \brief worker type enumeration
 *
 * Describes the possible worker types, i.e. where the worker domain runs.
 */
typedef enum xomp_worker_type
{
    XOMP_WORKER_TYPE_INVALID = 0,  ///< invalid worker type (not initialized)
    XOMP_WORKER_TYPE_LOCAL   = 1,  ///< worker runs local to master
    XOMP_WORKER_TYPE_REMOTE  = 2   ///< worker runs remote to master
} xomp_worker_type_t;

/**
 * \brief XOMP worker
 */
struct xomp_worker
{
    xomp_wid_t id;                  ///< worker ID
    xomp_worker_type_t type;        ///< worker type
    xomp_worker_st_t state;         ///< worker state
    struct capref domain;           ///< domain cap of the worker
    xphi_dom_id_t domainid;         ///< domain ID of the worker

    struct xomp_binding *binding;   ///< Control channel binding
    struct tx_queue txq;            ///< Flounder TX queue

    errval_t err;                   ///< error number in case an error occurred
    uint8_t add_mem_st;             ///< state flag used while adding a frame

    struct capref msgframe;         ///< messaging frame + TLS for the worker
    lpaddr_t msgbase;               ///< physical base of the messaging frame
    void *msgbuf;                   ///< where the messaging frame is mapped

    void *tls;                      ///< pointer to the thread local storage

#if XOMP_BENCH_ENABLED
    cycles_t start;                 ///< start time of the operation
    uint32_t index;
#endif
};

/**
 * \brief XOMP master
 */
struct xomp_master
{
    uint32_t numworker;                 ///< total number of workers spawned
    struct {
        uint32_t num;                   ///< number of local workers
        struct xomp_worker *workers;    ///< array of local workers
        uint32_t next;                  ///< next local worker to "allocate"
    } local;
    struct {
        uint32_t num;                   ///< number of remote workers
        struct xomp_worker *workers;    ///< array of remote workers
        uint32_t next;                  ///< next remote worker to "allocate"
    } remote;
};

/**
 * \brief Message state for the TX queue
 */
struct xomp_msg_st
{
    struct txq_msg_st common;       ///< common msg state
    /* union of arguments */
    union {
        struct {
            uint64_t fn;
            uint64_t arg;
            uint64_t id;
            uint64_t flags;
        } do_work;
        struct {
            struct capref frame;
            uint64_t vaddr;
            uint8_t type;
        } add_mem;
    } args;
};

/// initialized flag
static uint8_t xomp_master_initialized = 0x0;

/// XOMP master
static struct xomp_master xmaster;

/// exported service iref (for local workers)
static iref_t svc_iref;

/// number of present Xeon Phis
static uint8_t num_phi = 0;

/// location of the workers (local, remote or mixed)
static xomp_wloc_t worker_loc = XOMP_WORKER_LOC_MIXED;

/// stride for core allocation (in case of hyperthreads)
static coreid_t core_stride;

/// arguments to supply to the locally spawned workers
static struct xomp_spawn spawn_args_local;

/// arguments to supply to the remotely spawned workers
static struct xomp_spawn spawn_args_remote;

/// buffer for the worker id argument
static char worker_id_buf[26];

/// buffer for the iref argument
static char iref_buf[19];

#if XOMP_BENCH_ENABLED

#include <bench/bench.h>

static bench_ctl_t **xomp_bench_mem_add;
static bench_ctl_t **xomp_bench_do_work;
static bench_ctl_t **xomp_bench_spawn;

#endif

#if XOMP_BENCH_MASTER_EN
static cycles_t local_timer;
static cycles_t remote_timer;
#endif

/**
 * \brief enters the barrier when a worker has finished its work; this function
 *        is called on the main thread (master domain)
 *
 * \param barrier   The barrier to enter
 */
static inline void xbomp_barrier_enter_no_wait(struct bomp_barrier *barrier)
{
    if (__sync_fetch_and_add(&barrier->counter, 1) == (barrier->max - 1)) {
        barrier->counter = 0;
        barrier->cycle = !barrier->cycle;
    }
}

#define XOMP_LOCAL_THREADS_MAX 10

/*
 * ----------------------------------------------------------------------------
 * Helper functions
 * ----------------------------------------------------------------------------
 */
static inline uint32_t xomp_master_get_local_threads(uint32_t nthreads)
{
    switch (worker_loc) {
        case XOMP_WORKER_LOC_LOCAL:
            return nthreads - 1;
        case XOMP_WORKER_LOC_MIXED:
#if XOMP_LOCAL_THREADS_MAX
            if (nthreads > XOMP_LOCAL_THREADS_MAX) {
                return XOMP_LOCAL_THREADS_MAX - 1;
            } else {
                return nthreads - (num_phi * ((nthreads) / (1 + num_phi))) - 1;
            }
#else
            return nthreads - (num_phi * ((nthreads) / (1 + num_phi))) - 1;
#endif
        case XOMP_WORKER_LOC_REMOTE:
            return 0;
        default:
            USER_PANIC("unknown worker location!");
    }
    USER_PANIC("unknown worker location!");
    return 0;
}

static inline uint32_t xomp_master_get_remote_threads(uint32_t nthreads)
{
    switch (worker_loc) {
        case XOMP_WORKER_LOC_LOCAL:
            return 0;
        case XOMP_WORKER_LOC_MIXED:
#if XOMP_LOCAL_THREADS_MAX
            if (nthreads > XOMP_LOCAL_THREADS_MAX) {
                return nthreads - XOMP_LOCAL_THREADS_MAX;
            } else {
                return ((nthreads) / (1 + num_phi)) * num_phi;
            }
#else
            return ((nthreads) / (1 + num_phi)) * num_phi;
#endif
        case XOMP_WORKER_LOC_REMOTE:
            return nthreads - 1;
        default:
            USER_PANIC("unknown worker location!");
    }

    return 0;
}
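
/*
 * Worked example for the split above (illustrative values, using the
 * definitions in this file): with worker_loc == XOMP_WORKER_LOC_MIXED,
 * num_phi == 2 and nthreads == 9 (<= XOMP_LOCAL_THREADS_MAX), the remote
 * share is (9 / (1 + 2)) * 2 == 6 workers and the local share is
 * 9 - 6 - 1 == 2 workers; together with the master thread this accounts for
 * all 9 threads (cf. the assert in xomp_master_spawn_workers()).
 */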

/*
 * ----------------------------------------------------------------------------
 * XOMP channel send handlers
 * ----------------------------------------------------------------------------
 */

static errval_t do_work_tx(struct txq_msg_st *msg_st)
{
    struct xomp_msg_st *st = (struct xomp_msg_st *) msg_st;

    return xomp_do_work__tx(msg_st->queue->binding, TXQCONT(msg_st),
                            st->args.do_work.fn, st->args.do_work.arg,
                            st->args.do_work.id, st->args.do_work.flags);
}

static errval_t gw_req_memory_call_tx(struct txq_msg_st *msg_st)
{
    struct xomp_msg_st *st = (struct xomp_msg_st *) msg_st;

    return xomp_gw_req_memory_call__tx(msg_st->queue->binding, TXQCONT(msg_st),
                                       st->args.add_mem.vaddr,
                                       st->args.add_mem.type);
}

static errval_t add_memory_call_tx(struct txq_msg_st *msg_st)
{
    struct xomp_msg_st *st = (struct xomp_msg_st *) msg_st;

    return xomp_add_memory_call__tx(msg_st->queue->binding, TXQCONT(msg_st),
                                    st->args.add_mem.frame,
                                    st->args.add_mem.vaddr, st->args.add_mem.type);
}

/*
 * ----------------------------------------------------------------------------
 * XOMP channel receive handlers
 * ----------------------------------------------------------------------------
 */

static void gw_req_memory_response_rx(struct xomp_binding *b,
                                      errval_t msgerr)
{
    XMP_DEBUG("gw_req_memory_response_rx: %s\n", err_getstring(msgerr));

    struct xomp_worker *worker = b->st;

    worker->err = msgerr;
    worker->add_mem_st = 0x2;

#if XOMP_BENCH_ENABLED
    cycles_t timer = bench_tsc();
    if (xomp_bench_mem_add) {
        timer = bench_time_diff(worker->start, timer);
        bench_ctl_add_run(xomp_bench_mem_add[1], &timer);
        bench_ctl_add_run(xomp_bench_mem_add[2+worker->index], &timer);
    }
#endif

#if XOMP_BENCH_MASTER_EN
    cycles_t duration = bench_tsc() - worker->start;
    remote_timer += duration;
    debug_printf("remote worker %016lx: add memory took %lu cycles, %lu ms\n",
                 worker->id, duration, bench_tsc_to_ms(duration));
#endif
}

static void add_memory_response_rx(struct xomp_binding *b,
                                   errval_t msgerr)
{
    XMP_DEBUG("add_memory_response_rx: %s\n", err_getstring(msgerr));

    struct xomp_worker *worker = b->st;

    worker->err = msgerr;
    worker->add_mem_st = 0x2;

#if XOMP_BENCH_ENABLED
    cycles_t timer = bench_tsc();
    if (xomp_bench_mem_add) {
        timer = bench_time_diff(worker->start, timer);
        bench_ctl_add_run(xomp_bench_mem_add[0], &timer);
        bench_ctl_add_run(xomp_bench_mem_add[2+worker->index], &timer);
    }
#endif

#if XOMP_BENCH_MASTER_EN
    cycles_t duration = bench_tsc() - worker->start;
    local_timer += duration;
    debug_printf("local worker %016lx: add memory took %lu cycles, %lu ms\n",
                 worker->id, duration, bench_tsc_to_ms(duration));
#endif
}

static inline void done_msg_common(struct xomp_binding *b,
                                   uint64_t tid,
                                   errval_t msgerr)
{
    struct xomp_task *task = (struct xomp_task *) tid;

    struct xomp_worker *worker = b->st;
    if (err_is_fail(msgerr)) {
        worker->state = XOMP_WORKER_ST_FAILURE;
    } else {
        worker->state = XOMP_WORKER_ST_READY;
    }

#if XOMP_BENCH_ENABLED
    cycles_t timer = bench_tsc();
    if (xomp_bench_do_work) {
        timer = bench_time_diff(worker->start, timer);
        if (worker->type == XOMP_WORKER_TYPE_LOCAL) {
            bench_ctl_add_run(xomp_bench_do_work[0], &timer);
        } else if (worker->type == XOMP_WORKER_TYPE_REMOTE) {
            bench_ctl_add_run(xomp_bench_do_work[1], &timer);
        }
        bench_ctl_add_run(xomp_bench_do_work[2 + worker->index], &timer);
    }
#endif

#if XOMP_BENCH_MASTER_EN
    cycles_t duration = bench_tsc() - worker->start;
    debug_printf("generic worker %u, %lu cycles, %lu ms\n",
                 (uint16_t)worker->id, duration, bench_tsc_to_ms(duration));
#endif

    xbomp_barrier_enter_no_wait(task->barrier);

    /* if the last worker returns, free up the task data structure */
    task->done++;
    if (task->done == task->total_threads) {
        free(task);
    }
}

static void done_with_arg_rx(struct xomp_binding *b,
                             uint64_t tid,
                             uint64_t arg,
                             errval_t msgerr)
{
    XMP_DEBUG("done_with_arg_rx: arg:%lx, id:%lx\n", arg, tid);

    done_msg_common(b, tid, msgerr);

    /* XXX: do something with the argument */
}

static void done_notify_rx(struct xomp_binding *b,
                           uint64_t tid,
                           errval_t msgerr)
{
    XMP_DEBUG("done_notify_rx: id:%lx\n", tid);

    done_msg_common(b, tid, msgerr);
}

static struct xomp_rx_vtbl rx_vtbl = {
    .gw_req_memory_response = gw_req_memory_response_rx,
    .add_memory_response = add_memory_response_rx,
    .done_notify = done_notify_rx,
    .done_with_arg = done_with_arg_rx
};

/*
 * ----------------------------------------------------------------------------
 * XOMP channel connect handler
 * ----------------------------------------------------------------------------
 */

static errval_t xomp_svc_connect_cb(void *st,
                                    struct xomp_binding *xb)
{
    struct xomp_worker *worker = xmaster.local.workers + xmaster.local.next++;

    XMI_DEBUG("xomp_svc_connect_cb:%lx connected: %p\n", worker->id, worker);

    xb->rx_vtbl = rx_vtbl;
    xb->st = worker;

    txq_init(&worker->txq, xb, xb->waitset, (txq_register_fn_t) xb->register_send,
             sizeof(struct xomp_msg_st));

    worker->binding = xb;
    worker->state = XOMP_WORKER_ST_SPAWNED;

    return SYS_ERR_OK;
}

static void xomp_svc_export_cb(void *st,
                               errval_t err,
                               iref_t iref)
{
    XMI_DEBUG("Service exported @ iref:%"PRIuIREF", %s\n", iref, err_getstring(err));

    svc_iref = iref;
}

/**
 * \brief XOMP channel connect callback called by the Flounder backend
 *
 * \param st    Supplied worker state
 * \param err   outcome of the connect attempt
 * \param xb    XOMP Flounder binding
 */
static void worker_connect_cb(void *st,
                              errval_t err,
                              struct xomp_binding *xb)
{
    struct xomp_worker *worker = st;

    XMI_DEBUG("worker:%lx connected: %s\n", worker->id, err_getstring(err));

    if (err_is_fail(err)) {
        worker->state = XOMP_WORKER_ST_FAILURE;
        return;
    }

    xb->rx_vtbl = rx_vtbl;
    xb->st = worker;

    txq_init(&worker->txq, xb, xb->waitset, (txq_register_fn_t) xb->register_send,
             sizeof(struct xomp_msg_st));

    worker->binding = xb;
    worker->state = XOMP_WORKER_ST_SPAWNED;
}

/*
 * ============================================================================
 * Public Interface
 * ============================================================================
 */
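
/*
 * Typical call sequence on the master (a minimal sketch, not taken from an
 * application; the field and constant names are the ones used in this file,
 * error handling is omitted and the concrete values are illustrative):
 *
 *   struct xomp_args args = {
 *       .type = XOMP_ARG_TYPE_UNIFORM,
 *       .core_stride = 0,               // 0 selects BOMP_DEFAULT_CORE_STRIDE
 *       .args.uniform = {
 *           .nthreads   = 8,
 *           .nphi       = 1,
 *           .worker_loc = XOMP_WORKER_LOC_MIXED,
 *           .argc       = argc,
 *           .argv       = argv,
 *       },
 *   };
 *
 *   err = xomp_master_init(&args);
 *   err = xomp_master_spawn_workers(args.args.uniform.nthreads);
 *   err = xomp_master_add_memory(frame, vaddr, XOMP_FRAME_TYPE_SHARED_RW);
 *   err = xomp_master_do_work(task);
 */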

/**
 * \brief initializes the Xeon Phi OpenMP library
 *
 * \param args struct containing the master initialization values
 *
 * \returns SYS_ERR_OK on success
 *          errval on failure
 */
errval_t xomp_master_init(struct xomp_args *args)
{
    errval_t err;

    if (xomp_master_initialized) {
        XMI_DEBUG("WARNING: XOMP master already initialized\n");
        return SYS_ERR_OK;
    }

    if (args->type == XOMP_ARG_TYPE_WORKER) {
        return -1;  // TODO: ERRNO
    }

#if XOMP_BENCH_MASTER_EN
    bench_init();
#endif

    if (args->core_stride != 0) {
        core_stride = args->core_stride;
    } else {
        core_stride = BOMP_DEFAULT_CORE_STRIDE;
    }

    if (args->type == XOMP_ARG_TYPE_UNIFORM) {
        num_phi = args->args.uniform.nphi;
        worker_loc = args->args.uniform.worker_loc;
    } else {
        num_phi = args->args.distinct.nphi;
        worker_loc = args->args.distinct.worker_loc;
    }

    XMI_DEBUG("Initializing XOMP master with nphi:%u\n", num_phi);

    /* exporting the interface for local workers */
    err = xomp_export(NULL, xomp_svc_export_cb, xomp_svc_connect_cb,
                      get_default_waitset(), IDC_EXPORT_FLAGS_DEFAULT);
    if (err_is_fail(err)) {
        return err;
    }

    while (svc_iref == 0) {
        err = event_dispatch(get_default_waitset());
        if (err_is_fail(err)) {
            USER_PANIC_ERR(err, "event dispatch\n");
        }
    }

    char **argv = NULL;

    if (args->type == XOMP_ARG_TYPE_UNIFORM) {
        spawn_args_local.argc = args->args.uniform.argc;
        spawn_args_remote.argc = args->args.uniform.argc;

        err = xomp_master_build_path(&spawn_args_local.path, &spawn_args_remote.path);
        if (err_is_fail(err)) {
            USER_PANIC_ERR(err, "could not build the path");
        }
        argv = args->args.uniform.argv;
    } else {
        spawn_args_local.argc = args->args.distinct.local.argc;
        spawn_args_local.path = args->args.distinct.local.path;
        spawn_args_remote.path = args->args.distinct.remote.path;
        argv = args->args.distinct.local.argv;
    }

    /* argc original arguments + 3 worker arguments + NULL terminator */
    spawn_args_local.argv = calloc(spawn_args_local.argc + 4, sizeof(char *));

    if (spawn_args_local.argv == NULL) {
        return LIB_ERR_MALLOC_FAIL;
    }

    for (uint8_t i = 0; i < spawn_args_local.argc; ++i) {
        spawn_args_local.argv[i] = argv[i];
    }

    spawn_args_local.argv[spawn_args_local.argc++] = XOMP_WORKER_ARG;
    spawn_args_local.argv[spawn_args_local.argc++] = worker_id_buf;
    spawn_args_local.argv[spawn_args_local.argc++] = iref_buf;
    spawn_args_local.argv[spawn_args_local.argc] = NULL;

    snprintf(iref_buf, sizeof(iref_buf), "--iref=0x%08x", svc_iref);

    /* remote initialization */

    if (args->type == XOMP_ARG_TYPE_DISTINCT) {
        argv = args->args.distinct.remote.argv;
        spawn_args_remote.argc = args->args.distinct.remote.argc;
    }

    /* argc original arguments + 2 worker arguments + NULL terminator */
    spawn_args_remote.argv = calloc(spawn_args_remote.argc + 3, sizeof(char *));

    if (spawn_args_remote.argv == NULL) {
        free(spawn_args_local.argv);
        return LIB_ERR_MALLOC_FAIL;
    }

    for (uint8_t i = 0; i < spawn_args_remote.argc; ++i) {
        spawn_args_remote.argv[i] = argv[i];
    }

    spawn_args_remote.argv[spawn_args_remote.argc++] = XOMP_WORKER_ARG;
    spawn_args_remote.argv[spawn_args_remote.argc++] = worker_id_buf;
    spawn_args_remote.argv[spawn_args_remote.argc] = NULL;
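
    /*
     * At this point the argument vectors have the following shape (sketch):
     *   local:  { argv[0], ..., argv[argc-1], XOMP_WORKER_ARG, worker_id_buf,
     *             iref_buf, NULL }
     *   remote: { argv[0], ..., argv[argc-1], XOMP_WORKER_ARG, worker_id_buf,
     *             NULL }
     * iref_buf has already been filled in above; worker_id_buf is rewritten
     * for every worker in xomp_master_spawn_workers() right before it is
     * spawned.
     */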

    xomp_master_initialized = 0x1;

    return SYS_ERR_OK;
}

/**
 * \brief Spawns the worker domains (locally and on the Xeon Phi)
 *
 * \param nworkers    Total number of workers, including the master
 *
 * \returns SYS_ERR_OK on success
 *          errval on failure
 */
errval_t xomp_master_spawn_workers(uint32_t nworkers)
{
    errval_t err;

    if (!xomp_master_initialized) {
        return XOMP_ERR_MASTER_NOT_INIT;
    }

    xmaster.numworker = nworkers;

    struct xomp_worker *workers = calloc(nworkers, sizeof(struct xomp_worker));

    if (workers == NULL) {
        return LIB_ERR_MALLOC_FAIL;
    }

    uint32_t remote_threads = xomp_master_get_remote_threads(nworkers);
    uint32_t local_threads = xomp_master_get_local_threads(nworkers);

    xmaster.local.next = 0;
    xmaster.remote.next = 0;
    xmaster.local.num = local_threads;
    xmaster.remote.num = remote_threads;
    xmaster.local.workers = workers;

    if (remote_threads > 0) {
        err = spawn_symval_cache_init(0);
        if (err_is_fail(err)) {
            USER_PANIC_ERR(err, "domain not spawned with the appropriate flags\n");
            return err;
        }
    }

    if (num_phi > 0) {
        xmaster.remote.workers = workers + local_threads;
    }

    XMI_DEBUG("spawning %u workers: local:%u, remote: %ux%u\n", nworkers - 1,
              local_threads, num_phi,
              (num_phi != 0 ? remote_threads / num_phi : remote_threads));

    assert((remote_threads + local_threads + 1) == nworkers);

    xphi_id_t xid = 0;
    coreid_t core = disp_get_core_id() + core_stride;

#if XOMP_BENCH_MASTER_EN
    cycles_t spawn_timer;
    cycles_t remote_spawn_timer = 0;
    cycles_t remote_connect_timer = 0;
    cycles_t local_spawn_timer = 0;
    cycles_t local_connect_timer = 0;
#endif

    for (uint32_t i = 0; i < remote_threads + local_threads; ++i) {
#ifdef __k1om__
        if (xid == disp_xeon_phi_id()) {
            xid = (xid + 1) % num_phi;
        }
#endif
        if (i == local_threads) {
            core = XOMP_REMOTE_COREID_START;
        }

        struct xomp_worker *worker = workers + i;

#if XOMP_BENCH_ENABLED
        worker->index = i;
        worker->start = bench_tsc();
#endif

#ifndef __k1om__
        /*
         * XXX: we have to set the RAM affinity in order to have a higher chance
         *      that the memory is found at the Xeon Phi. It may be split up
         *      otherwise.
         */
        uint64_t min_base, max_limit;
        ram_get_affinity(&min_base, &max_limit);
        ram_set_affinity(XOMP_RAM_MIN_BASE, XOMP_RAM_MAX_LIMIT);
#endif

        if (i < local_threads) {
            err = frame_alloc(&worker->msgframe, XOMP_TLS_SIZE, NULL);
        } else {
            err = frame_alloc(&worker->msgframe, XOMP_FRAME_SIZE, NULL);
        }

#ifndef __k1om__
        ram_set_affinity(min_base, max_limit);
#endif

        if (err_is_fail(err)) {
            /* TODO: cleanup */
            worker->state = XOMP_WORKER_ST_FAILURE;
            return err_push(err, XOMP_ERR_SPAWN_WORKER_FAILED);
        }

        struct frame_identity id;
        err = frame_identify(worker->msgframe, &id);
        if (err_is_fail(err)) {
            /* TODO: cleanup */
            return err_push(err, XOMP_ERR_SPAWN_WORKER_FAILED);
        }

        /* TODO: build a good id */
        worker->id = ((uint64_t) disp_get_domain_id()) << 48 | ((uint64_t)core) << 32;
        if (i < local_threads) {
            worker->id |= ((uint64_t)0xFF) << 24;
        } else {
            worker->id |= ((uint64_t)xid) << 24;
        }
        worker->id |= i+1;
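
        /*
         * Sketch of the resulting worker ID (as constructed above):
         *   bits 63..48  domain ID of the master
         *   bits 47..32  core the worker is spawned on
         *   bits 31..24  Xeon Phi ID (0xFF for local workers)
         *   low bits     worker index + 1
         * For the first remote worker spawned on each Xeon Phi,
         * XOMP_WID_GATEWAY_FLAG is OR-ed in further below.
         */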

        worker->msgbase = id.base;
        worker->state = XOMP_WORKER_ST_SPAWNING;

        err = vspace_map_one_frame(&worker->msgbuf, id.bytes,
                                   worker->msgframe, NULL, NULL);

        if (err_is_fail(err)) {
            /* TODO: cleanup */
            return err_push(err, XOMP_ERR_SPAWN_WORKER_FAILED);
        }

        XMI_DEBUG("messaging frame mapped: [%016lx] @ [%016lx]\n",
                  worker->msgbase, (lvaddr_t) worker->msgbuf);

        if (i < local_threads) {
            snprintf(worker_id_buf, sizeof(worker_id_buf), "--wid=%016"PRIx64,
                     worker->id);
            /*
             * TODO: set a gateway domain for each NUMA node as it is done with
             *       the Xeon Phi
             */
            worker->tls = worker->msgbuf;

            XMI_DEBUG("spawning {%s} on host, core:%u\n", spawn_args_local.path,
                      core);
#if XOMP_BENCH_MASTER_EN
            spawn_timer = bench_tsc();
#endif

            struct capref did;
            err = spawn_program_with_caps(core, spawn_args_local.path,
                                          spawn_args_local.argv, NULL, NULL_CAP,
                                          worker->msgframe, SPAWN_FLAGS_OMP,
                                          &did);
#if XOMP_BENCH_MASTER_EN
            local_spawn_timer += bench_tsc() - spawn_timer;
            spawn_timer = bench_tsc();
#endif
            worker->domain = did;
            worker->type = XOMP_WORKER_TYPE_LOCAL;
            if (err_is_fail(err)) {
                /* TODO: cleanup */
                return err_push(err, XOMP_ERR_SPAWN_WORKER_FAILED);
            }

            core += core_stride;
        } else {
            /*
             * we give the first worker domain on each node the gateway flag so
             * that it initializes the gateway service while the others connect
             * to it
             */
            if (core == XOMP_REMOTE_COREID_START) {
                worker->id |= XOMP_WID_GATEWAY_FLAG;
            }

            snprintf(worker_id_buf, sizeof(worker_id_buf), "--wid=%016"PRIx64,
                     worker->id);

            worker->tls = ((uint8_t *) worker->msgbuf) + XOMP_MSG_FRAME_SIZE;

            struct xomp_frameinfo fi = {
                .sendbase = worker->msgbase + XOMP_MSG_CHAN_SIZE,
                .inbuf = worker->msgbuf,
                .inbufsize = XOMP_MSG_CHAN_SIZE,
                .outbuf = ((uint8_t *) worker->msgbuf) + XOMP_MSG_CHAN_SIZE,
                .outbufsize = XOMP_MSG_CHAN_SIZE
            };

            err = xomp_accept(&fi, worker, worker_connect_cb,
                              get_default_waitset(), IDC_EXPORT_FLAGS_DEFAULT);

            if (err_is_fail(err)) {
                /* TODO: Clean up */
                return err_push(err, XOMP_ERR_SPAWN_WORKER_FAILED);
            }

            XMI_DEBUG("spawning {%s} on xid:%u, core:%u\n",
                      spawn_args_remote.path, xid, core);
#if XOMP_BENCH_MASTER_EN
            spawn_timer = bench_tsc();
#endif
            err = xeon_phi_client_spawn(xid, core, spawn_args_remote.path,
                                        spawn_args_remote.argv, worker->msgframe,
                                        SPAWN_FLAGS_OMP, &worker->domainid);

#if XOMP_BENCH_MASTER_EN
            remote_spawn_timer += bench_tsc() - spawn_timer;
            spawn_timer = bench_tsc();
#endif

            if (err_is_fail(err)) {
                /* TODO: cleanup */
                return err_push(err, XOMP_ERR_SPAWN_WORKER_FAILED);
            }
            worker->type = XOMP_WORKER_TYPE_REMOTE;
            xid++;
        }

        XMI_DEBUG("waiting for client %p to connect...\n", worker);

        while (worker->state == XOMP_WORKER_ST_SPAWNING) {
            err = event_dispatch(get_default_waitset());
            if (err_is_fail(err)) {
                USER_PANIC_ERR(err, "event dispatch\n");
            }
        }
#if XOMP_BENCH_MASTER_EN
        if (worker->type == XOMP_WORKER_TYPE_REMOTE) {
            remote_connect_timer += bench_tsc() - spawn_timer;
        } else {
            local_connect_timer += bench_tsc() - spawn_timer;
        }
#endif

        if (worker->state == XOMP_WORKER_ST_FAILURE) {
            return XOMP_ERR_SPAWN_WORKER_FAILED;
        }

#if XOMP_BENCH_ENABLED
        cycles_t timer = bench_tsc();
        if (xomp_bench_spawn) {
            timer = bench_time_diff(worker->start, timer);
            if (i < local_threads) {
                bench_ctl_add_run(xomp_bench_spawn[0], &timer);
            } else {
                bench_ctl_add_run(xomp_bench_spawn[1], &timer);
            }
        }
#endif

        worker->state = XOMP_WORKER_ST_READY;

        if (i >= local_threads) {
            if (xid == num_phi) {
                xid = 0;
                core++; // no stride on xeon phi
            }
        }
    }

#if XOMP_BENCH_MASTER_EN
    remote_spawn_timer /= (remote_threads ? remote_threads : 1);
    local_spawn_timer /= (local_threads ? local_threads : 1);
    remote_connect_timer /= (remote_threads ? remote_threads : 1);
    local_connect_timer /= (local_threads ? local_threads : 1);
    debug_printf("Avg spawn time remote: %lu cycles, %lu ms\n",
                 remote_spawn_timer, bench_tsc_to_ms(remote_spawn_timer));
    debug_printf("Avg spawn time local: %lu cycles, %lu ms\n",
                 local_spawn_timer, bench_tsc_to_ms(local_spawn_timer));
    debug_printf("Avg connect time remote: %lu cycles, %lu ms\n",
                 remote_connect_timer, bench_tsc_to_ms(remote_connect_timer));
    debug_printf("Avg connect time local: %lu cycles, %lu ms\n",
                 local_connect_timer, bench_tsc_to_ms(local_connect_timer));
#endif

    xmaster.local.next = 0;
    xmaster.remote.next = 0;

    return SYS_ERR_OK;
}

/**
 * \brief Adds a memory region to be used for work
 *
 * \param frame Frame to be shared
 * \param info  information about the frame i.e. virtual address to map
 * \param type  Type of the frame
 *
 * \returns SYS_ERR_OK on success
 *          errval on error
 */
errval_t xomp_master_add_memory(struct capref frame,
                                uint64_t info,
                                xomp_frame_type_t type)
{
    errval_t err;

    if (!xomp_master_initialized) {
        return XOMP_ERR_MASTER_NOT_INIT;
    }

#if XOMP_BENCH_MASTER_EN
    remote_timer = 0;
    local_timer = 0;
#endif

    struct xomp_worker *worker;

    XMI_DEBUG("adding memory of type %u @ info: %016lx\n", type, info);

    /*
     * We add the memory to the worker domains with the Xeon Phi gateway
     * domains first. This is expected to take the longest time (potential
     * replication and going through the Xeon Phi drivers).
     *
     * For the subsequent worker domains, we just send the messages
     * asynchronously.
     */
    for (uint32_t i = 0; i < xmaster.remote.num; ++i) {
        worker = &xmaster.remote.workers[i];
#if XOMP_BENCH_ENABLED
        worker->start = bench_tsc();
#endif
        if (worker->id & XOMP_WID_GATEWAY_FLAG) {
            xphi_id_t xid = xeon_phi_domain_get_xid(worker->domainid);
            err = xeon_phi_client_chan_open(xid, worker->domainid, info, frame,
                                            type);
            if (err_is_fail(err)) {
                worker->state = XOMP_WORKER_ST_FAILURE;
                /*
                 * XXX: if the gateway domain fails, the entire node is not
                 *      operational.
                 */
                return err;
            }
#if XOMP_BENCH_ENABLED
            if (xomp_bench_mem_add) {
                cycles_t timer = bench_tsc();
                timer = bench_time_diff(worker->start, timer);
                bench_ctl_add_run(xomp_bench_mem_add[1], &timer);
                bench_ctl_add_run(xomp_bench_mem_add[2 + worker->index], &timer);
            }
#endif
#if XOMP_BENCH_MASTER_EN
            cycles_t duration = bench_tsc() - worker->start;
            debug_printf("remote worker %lx: chan open took %lu cycles, %lu ms\n",
                         worker->id, duration, bench_tsc_to_ms(duration));
            remote_timer += duration;
#endif
        } else {
            assert(worker->add_mem_st == 0x0);

            worker->add_mem_st = 0x1;

            struct txq_msg_st *msg_st = txq_msg_st_alloc(&worker->txq);

            if (msg_st == NULL) {
                return LIB_ERR_MALLOC_FAIL;
            }

            msg_st->send = gw_req_memory_call_tx;
            msg_st->cleanup = NULL;

            struct xomp_msg_st *st = (struct xomp_msg_st *) msg_st;
            st->args.add_mem.vaddr = info;
            st->args.add_mem.type = type;

            txq_send(msg_st);
        }
    }

    /* send the memory caps to the local workers directly */
    for (uint32_t i = 0; i < xmaster.local.num; ++i) {
        worker = &xmaster.local.workers[i];
#if XOMP_BENCH_ENABLED
        worker->start = bench_tsc();
#endif
        assert(worker->type == XOMP_WORKER_TYPE_LOCAL);
        assert(worker->add_mem_st == 0x0);

        worker->add_mem_st = 0x1;

        struct txq_msg_st *msg_st = txq_msg_st_alloc(&worker->txq);

        if (msg_st == NULL) {
            return LIB_ERR_MALLOC_FAIL;
        }

        msg_st->send = add_memory_call_tx;
        msg_st->cleanup = NULL;

        struct xomp_msg_st *st = (struct xomp_msg_st *) msg_st;
        st->args.add_mem.frame = frame;
        st->args.add_mem.vaddr = info;

        // XXX: overriding replication on the host for now
        if (type == XOMP_FRAME_TYPE_REPL_RW) {
            st->args.add_mem.type = XOMP_FRAME_TYPE_SHARED_RW;
        } else {
            st->args.add_mem.type = type;
        }

        txq_send(msg_st);
    }

    /* wait for the replies */

    for (uint32_t i = 0; i < xmaster.remote.num; ++i) {
        worker = &xmaster.remote.workers[i];
        if (worker->id & XOMP_WID_GATEWAY_FLAG) {
            continue;
        }
        while (worker->add_mem_st == 0x1) {
            err = event_dispatch(get_default_waitset());
            if (err_is_fail(err)) {
                USER_PANIC_ERR(err, "event dispatch\n");
            }
        }
        if (err_is_fail(worker->err)) {
            worker->state = XOMP_WORKER_ST_FAILURE;
            return worker->err;
        }
        worker->add_mem_st = 0x0;
    }

    for (uint32_t i = 0; i < xmaster.local.num; ++i) {
        worker = &xmaster.local.workers[i];
        assert(worker->type == XOMP_WORKER_TYPE_LOCAL);

        while (worker->add_mem_st == 0x1) {
            err = event_dispatch(get_default_waitset());
            if (err_is_fail(err)) {
                USER_PANIC_ERR(err, "event dispatch\n");
            }
        }
        if (err_is_fail(worker->err)) {
            worker->state = XOMP_WORKER_ST_FAILURE;
            return worker->err;
        }
        worker->add_mem_st = 0x0;
    }

#if XOMP_BENCH_MASTER_EN
    remote_timer /= (xmaster.remote.num ? xmaster.remote.num : 1);
    local_timer /= (xmaster.local.num ? xmaster.local.num : 1);

    debug_printf("Avg mem add time remote: %lu cycles, %lu ms\n",
                 remote_timer, bench_tsc_to_ms(remote_timer));
    debug_printf("Avg mem add time local: %lu cycles, %lu ms\n",
                 local_timer, bench_tsc_to_ms(local_timer));

#endif

    return SYS_ERR_OK;
}
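
/*
 * Example use of xomp_master_add_memory() above (a sketch under the
 * assumption that the workers should map the frame at the same virtual
 * address as the master; BUFFER_SIZE is a placeholder and error handling is
 * omitted):
 *
 *   void *buf;
 *   struct capref frame;
 *   err = frame_alloc(&frame, BUFFER_SIZE, NULL);
 *   err = vspace_map_one_frame(&buf, BUFFER_SIZE, frame, NULL, NULL);
 *   err = xomp_master_add_memory(frame, (lvaddr_t) buf,
 *                                XOMP_FRAME_TYPE_SHARED_RW);
 */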

/**
 * \brief builds the argument paths based on our own binary name
 *
 * \param local  pointer where to store the local path
 * \param remote pointer where to store the remote path
 *
 * \returns SYS_ERR_OK on success
 */
errval_t xomp_master_build_path(char **local,
                                char **remote)
{
    size_t length, size = 0;

    size += snprintf(NULL, 0, "/x86_64/sbin/%s", disp_name()) + 1;
    size += snprintf(NULL, 0, "/k1om/sbin/%s", disp_name()) + 1;

    char *path = malloc(size);
    if (path == NULL) {
        return LIB_ERR_MALLOC_FAIL;
    }

    length = snprintf(path, size, "/x86_64/sbin/%s", disp_name());
    path[length] = '\0';
    size -= (++length);

    if (local) {
        *local = path;
    }

    path += length;
    length = snprintf(path, size, "/k1om/sbin/%s", disp_name());
    path[length] = '\0';

    if (remote) {
        *remote = path;
    }

    return SYS_ERR_OK;
}
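
/*
 * Example: for a master binary named "my_omp_app", xomp_master_build_path()
 * above produces
 *   local  -> "/x86_64/sbin/my_omp_app"
 *   remote -> "/k1om/sbin/my_omp_app"
 * Both strings live in the same allocation, so freeing the local pointer
 * releases both.
 */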

/**
 * \brief executes some work on the worker domains
 *
 * \param task information about the task
 *
 * \returns SYS_ERR_OK on success
 *          errval on error
 */
errval_t xomp_master_do_work(struct xomp_task *task)
{
    errval_t err;

    if (!xomp_master_initialized) {
        return XOMP_ERR_MASTER_NOT_INIT;
    }

#ifndef __k1om__
    struct waitset *ws = get_default_waitset();
#endif

    uint64_t fn = 0;

#if XOMP_BENCH_MASTER_EN
    remote_timer = 0;
    local_timer = 0;
#endif

    uint32_t remote_threads = xomp_master_get_remote_threads(task->total_threads);
    uint32_t local_threads = xomp_master_get_local_threads(task->total_threads);

    XMP_DEBUG("Executing task with %u workers host:%u, xphi:%ux%u\n",
              task->total_threads, local_threads + 1, num_phi, remote_threads);

    assert(local_threads <= xmaster.local.num);
    assert(remote_threads <= xmaster.remote.num);
    assert((local_threads + remote_threads + 1) == task->total_threads);

    uint32_t fn_idx;
    char *fn_name;

    if (remote_threads > 0) {
        /*
         * do the address translation for the remote workers
         */
        err = spawn_symval_lookup_addr((genvaddr_t) task->fn, &fn_idx, &fn_name);
        if (err_is_fail(err)) {
            USER_PANIC_ERR(err, "looking up address\n");
            return err;
        }
    }

    /* overwrite the global num threads counter */
    g_bomp_state->num_threads += ((local_threads) * (XOMP_VTHREAD_COUNT));

    uint32_t threadid = 1;

    for (uint32_t i = 1; i < task->total_threads; ++i) {
        struct xomp_worker *worker = NULL;

        if (i <= local_threads) {
            worker = &xmaster.local.workers[xmaster.local.next++];
            assert(worker->type == XOMP_WORKER_TYPE_LOCAL);

            if (xmaster.local.next == xmaster.local.num) {
                xmaster.local.next = 0;
            }

            XMP_DEBUG("local worker id:%lx\n", worker->id);

            fn = (uint64_t) task->fn;

        } else {
            worker = &xmaster.remote.workers[xmaster.remote.next++];
            assert(worker->type == XOMP_WORKER_TYPE_REMOTE);
            assert(fn_idx != 0);

            if (xmaster.remote.next == xmaster.remote.num) {
                xmaster.remote.next = 0;
            }
            // build the function address based on the flag and the index
            fn = (uint64_t) fn_idx | XOMP_FN_INDEX_FLAG;

            XMP_DEBUG("remote worker id: %016lx, function %s @ index %u\n",
                      worker->id, fn_name, fn_idx);
        }

#if XOMP_BENCH_ENABLED
        worker->start = bench_tsc();
#endif

        if (worker->state != XOMP_WORKER_ST_READY) {
            return XOMP_ERR_WORKER_STATE;
        }
        assert(worker->state == XOMP_WORKER_ST_READY);
        worker->state = XOMP_WORKER_ST_BUSY;

        struct bomp_work *work = worker->tls;

        work->fn = task->fn;

        work->barrier = NULL;
        work->thread_id = threadid;
        work->num_threads = g_bomp_state->num_threads;

        if (i <= local_threads) {
            work->num_vtreads = XOMP_VTHREADS;
            threadid += XOMP_VTHREADS;
        } else {
            work->num_vtreads = 1;
            threadid++;
        }

        /* XXX: hack, we do not know how big the data section is... */
        if (task->arg) {
            uint64_t *src = task->arg;
            uint64_t *dst = (uint64_t *) (work + 1);
            uint32_t bytes = 0;
            while (*src != 0 || bytes < 64) {
                *dst++ = *src++;
                bytes += 8;
            }
        }

        struct txq_msg_st *msg_st = txq_msg_st_alloc(&worker->txq);

        if (msg_st == NULL) {
            if (i == 1) {
                free(task);
            }
            return LIB_ERR_MALLOC_FAIL;
        }

        msg_st->send = do_work_tx;
        msg_st->cleanup = NULL;

        struct xomp_msg_st *st = (struct xomp_msg_st *) msg_st;
        st->args.do_work.arg = (uint64_t) work->data;
        st->args.do_work.fn = fn;
        st->args.do_work.id = (uint64_t) task;
        st->args.do_work.flags = 0;

        txq_send(msg_st);

#ifndef __k1om__
        do {
            err = event_dispatch_non_block(ws);
        } while (err_is_ok(err));
#endif
    }

    return SYS_ERR_OK;
}

/**
 * \brief tells the gateway domains to update their local replicas
 *
 * \param frame      capability of the shared frame
 * \param offset     offset into the capability to copy
 * \param length     number of bytes to copy
 * \param node       which node to send the copy request to
 * \param direction  UPDATE or WRITE BACK
 *
 * \return SYS_ERR_OK on success,
 *         errval on failure
 *
 */
errval_t xomp_master_copy_memory(struct capref frame,
                                 size_t offset,
                                 size_t length,
                                 uint16_t node,
                                 xomp_master_copy_t direction)
{
    assert(!"NYI");
    return SYS_ERR_OK;
}

#if XOMP_BENCH_ENABLED
/**
 * \brief enables basic benchmarking facilities
 *
 * \param runs      the number of runs of the experiment
 * \param nthreads  the number of worker threads to keep per-worker statistics for
 * \param flags     flags selecting which benchmarks to enable
 *
 * \returns SYS_ERR_OK on success
 */
errval_t xomp_master_bench_enable(size_t runs,
                                  size_t nthreads,
                                  uint8_t flags)
{
    bench_init();

    bench_ctl_t **mem = NULL;

    if (!flags) {
        return -1;
    }

    mem = calloc(2 + 2 * (2 + nthreads), sizeof(bench_ctl_t*));

    if (flags & XOMP_MASTER_BENCH_SPAWN) {
        xomp_bench_spawn = mem;
        xomp_bench_spawn[0] = bench_ctl_init(BENCH_MODE_FIXEDRUNS, 1, nthreads);
        xomp_bench_spawn[1] = bench_ctl_init(BENCH_MODE_FIXEDRUNS, 1, nthreads);
        mem += (2);
    }

    if (flags & XOMP_MASTER_BENCH_DO_WORK) {
        xomp_bench_do_work = mem;
        xomp_bench_do_work[0] = bench_ctl_init(BENCH_MODE_FIXEDRUNS, 1, nthreads * runs);
        xomp_bench_do_work[1] = bench_ctl_init(BENCH_MODE_FIXEDRUNS, 1, nthreads * runs);
        for (uint32_t i = 0; i < nthreads; ++i) {
            xomp_bench_do_work[i + 2] = bench_ctl_init(BENCH_MODE_FIXEDRUNS, 1, runs);
        }
        mem += (2 + nthreads);
    }

    if (flags & XOMP_MASTER_BENCH_MEM_ADD) {
        xomp_bench_mem_add = mem;
        xomp_bench_mem_add[0] = bench_ctl_init(BENCH_MODE_FIXEDRUNS, 1, nthreads * runs);
        xomp_bench_mem_add[1] = bench_ctl_init(BENCH_MODE_FIXEDRUNS, 1, nthreads * runs);
        for (uint32_t i = 0; i < nthreads; ++i) {
            xomp_bench_mem_add[i + 2] = bench_ctl_init(BENCH_MODE_FIXEDRUNS, 1, runs);
        }
    }

    return SYS_ERR_OK;
}

/**
 * \brief prints the results of the enabled benchmarks
 */
void xomp_master_bench_print_results(void)
{
    cycles_t tsc_per_us = bench_tsc_per_us();
    if (xomp_bench_spawn) {
        bench_ctl_dump_analysis(xomp_bench_spawn[0], 0, "SPAWN LOCAL", tsc_per_us);
        bench_ctl_dump_analysis(xomp_bench_spawn[1], 0, "SPAWN REMOTE", tsc_per_us);
    }

    uint32_t nthreads = xmaster.local.num + xmaster.remote.num;

    char buf[20];

    if (xomp_bench_do_work) {
        bench_ctl_dump_analysis(xomp_bench_do_work[0], 0, "WORK LOCAL", tsc_per_us);
        bench_ctl_dump_analysis(xomp_bench_do_work[1], 0, "WORK REMOTE", tsc_per_us);
        for (uint32_t i = 0; i < nthreads; ++i) {
            snprintf(buf, 20, "work w.%u", i+1);
            bench_ctl_dump_analysis(xomp_bench_do_work[2+i], 0, buf, tsc_per_us);
        }
    }

    if (xomp_bench_mem_add) {
        bench_ctl_dump_analysis(xomp_bench_mem_add[0], 0, "MEM ADD LOCAL", tsc_per_us);
        bench_ctl_dump_analysis(xomp_bench_mem_add[1], 0, "MEM ADD REMOTE", tsc_per_us);
        for (uint32_t i = 0; i < nthreads; ++i) {
            snprintf(buf, 20, "memadd w.%u", i+1);
            bench_ctl_dump_analysis(xomp_bench_mem_add[2+i], 0, buf, tsc_per_us);
        }
    }

}
#endif