1/*
2 * Copyright (c) 2014 ETH Zurich.
3 * All rights reserved.
4 *
5 * This file is distributed under the terms in the attached LICENSE file.
6 * If you do not find this file, copies can be found by writing to:
7 * ETH Zurich D-INFK, Universitaetsstrasse 6, CH-8092 Zurich. Attn: Systems Group.
8 */
9
10/**
 * \brief this file contains the control channel implementation between the
12 *        node masters and the bomp worker threads
13 */
14#include <barrelfish/barrelfish.h>
15#include <bomp_internal.h>
16#include <bitmap.h>
17
/* forward declaration: thread entry point of the node master; it is started
 * on the node's core by bomp_node_init_local() */
static int bomp_node_msg_handler(void *arg);

/**
 * \brief stores the state of a message in transit
 *
 * sizeof(struct bomp_msg_st) is passed to txq_init() as the per-message state
 * size on both ends of the control channel (bomp_node_accept_cb() and
 * bomp_node_connect_cb()).
 */
struct bomp_msg_st
{
    struct txq_msg_st common;       ///< common msg state

    /* union of message arguments */
    union {
        struct {
            uint64_t fn;            ///< the function
            uint64_t arg;           ///< the argument to the function
            uint32_t tid;           ///< thread ID
            uint64_t icv;           ///< thread's control variables
        } exec;                     ///< execution
    } args;
};
36
37/*
38 * ==============================================================================
39 * Control Channel: Program Master Side
40 * ==============================================================================
41 */
42
43
44static errval_t bomp_node_init_threads(nodeid_t nodeid,
45                                       nodeid_t numanode,
46                                       coreid_t nthreads,
47                                       size_t stack_size,
48                                       struct bomp_node *node)
49{
50    errval_t err;
51
52    BOMP_DEBUG_NODE("Initialize worker threads for node %" PRIuNODEID " with %"
53                    PRIuCOREID " threads\n", nodeid, nthreads);
54
55    node->threads = calloc(nthreads, sizeof(struct bomp_thread));
56    if (node->threads == NULL) {
57        return LIB_ERR_MALLOC_FAIL;
58    }
59
60#if 0
61    struct bitmap *bm = numa_allocate_cpumask();
62    err = numa_node_to_cpus(numanode, bm);
63    assert(err_is_ok(err));
64#else
65    struct bitmap *bm = numa_all_cpus_ptr;
66#endif
67
68
69    coreid_t core = (coreid_t)bitmap_get_first(bm);
70
71    for (coreid_t i = 0; i < nthreads; ++i) {
72        BOMP_DEBUG_NODE("spanning to core %u\n", core);
73        node->threads_max++;
74        node->threads[i].node = node;
75        if (core == disp_get_core_id()) {
76            /* master thread */
77            core = (coreid_t)bitmap_get_next(bm, core);
78            continue;
79        }
80
81        err = bomp_thread_init(core, stack_size, &node->threads[i]);
82        if (err_is_fail(err)) {
83            DEBUG_ERR(err, "creating thread\n");
84            return err;
85        }
86
87        core = (coreid_t)bitmap_get_next(bm, core);
88    }
89    node->threads_active = 0;
90    node->tls = thread_get_tls();
91
92    return SYS_ERR_OK;
93}
94
95/**
96 * \brief callback for creating the dispatcher on the remote core
97 *
98 * \param arg   argument for the callback
99 * \param err   outcome of the spanning request
100 */
101static void bomp_node_init_done(void *arg, errval_t err)
102{
103    assert(err_is_ok(err));
104
105    uint32_t *done = arg;
106    *done = 1;
107}
108
109
110/**
111 * \brief callback when the BOMP thread connects to the node
112 *
113 * \param st    state pointer
114 * \param err   status of the connect
115 * \param _b    created BOMP binding
116 */
117static void bomp_node_accept_cb(void *st,
118                                errval_t err,
119                                struct bomp_binding *_b)
120{
121    struct bomp_node *n = st;
122
123    BOMP_DEBUG_NODE("connection accepted. tid=%" PRIuCOREID "\n", n->id);
124
125    n->node_err = err;
126
127    txq_init(&n->txq, _b, _b->waitset, (txq_register_fn_t) _b->register_send,
128                 sizeof(struct bomp_msg_st));
129
130    _b->st = st;
131    n->ctrl = _b;
132
133 //   _b->rx_vtbl.done = done__rx;
134}
135
136/* a node that is on our local address space */
137static errval_t bomp_node_init_local(nodeid_t nodeid,
138                                     nodeid_t numanode,
139                                     coreid_t nthreads,
140                                     size_t stack_size,
141                                     struct bomp_node *node)
142{
143    BOMP_DEBUG_NODE("Initialize local node node %" PRIuNODEID " with %"
144                     PRIuCOREID " threads\n", nodeid, nthreads);
145
146    errval_t err;
147
148    uint32_t done;
149
150    node->id = nodeid;
151    node->numa_node = numanode;
152    node->threads_max = nthreads;
153    node->stack_size = stack_size;
154    node->threads_max = nthreads;
155
156    struct bitmap *bm = numa_allocate_cpumask();
157    err = numa_node_to_cpus(numanode, bm);
158    assert(err_is_ok(err));
159
160    coreid_t core = (coreid_t)bitmap_get_first(bm);
161
162    err = domain_new_dispatcher(core, bomp_node_init_done, &done);
163    if (err_is_fail(err)) {
164        BOMP_ERROR("creating new dispatcher on core %" PRIuCOREID "failed\n",
165                   core);
166        return err;
167    }
168
169    while(!done) {
170        thread_yield();
171    }
172
173    BOMP_DEBUG_NODE("dispatcher ready. allocating memory for msg channel\n");
174
175    size_t msg_frame_size;
176    err = frame_alloc(&node->msgframe, 2 * BOMP_CHANNEL_SIZE, &msg_frame_size);
177    if (err_is_fail(err)) {
178        return err;
179    }
180
181    err = vspace_map_one_frame(&node->msgbuf, msg_frame_size, node->msgframe,
182                               NULL, NULL);
183    if (err_is_fail(err)) {
184        return err;
185    }
186
187    struct bomp_frameinfo fi = {
188        .sendbase = (lpaddr_t)node->msgbuf + BOMP_CHANNEL_SIZE,
189        .inbuf = node->msgbuf,
190        .inbufsize = BOMP_CHANNEL_SIZE,
191        .outbuf = ((uint8_t *) node->msgbuf) + BOMP_CHANNEL_SIZE,
192        .outbufsize = BOMP_CHANNEL_SIZE
193    };
194
195    BOMP_DEBUG_NODE("creating channel on %p\n", node->msgbuf);
196
197    err = bomp_accept(&fi, node, bomp_node_accept_cb,
198                      get_default_waitset(), IDC_EXPORT_FLAGS_DEFAULT);
199
200    if (err_is_fail(err)) {
201        // XXX> error handling
202        return err;
203    }
204
205    BOMP_DEBUG_NODE("creating thread on core %" PRIuCOREID "\n", core);
206    err = domain_thread_create_on(core, bomp_node_msg_handler, node, NULL);
207    if (err_is_fail(err)) {
208        // XXX> error handling
209        return err;
210    }
211
212    while (node->ctrl == NULL) {
213        err = event_dispatch(get_default_waitset());
214        if (err_is_fail(err)) {
215            USER_PANIC_ERR(err, "event dispatch\n");
216        }
217    }
218
219    BOMP_DEBUG_NODE("node master on node %" PRIuNODEID " connected \n", nodeid);
220
221    return node->node_err;
222}
223
224/* remote node: a node that is in a foreign address space */
225static errval_t bomp_node_init_remote(nodeid_t nodeid,
226                                      coreid_t nthreads,
227                                      size_t stack_size,
228                                      struct bomp_node *node)
229{
230    BOMP_DEBUG_NODE("Initialize remote node node %" PRIuNODEID " with %"
231                    PRIuCOREID " threads\n", nodeid, nthreads);
232
233    assert(!"NYI");
234    return SYS_ERR_OK;
235}
236
237
238/**
239 * \brief
240 */
241errval_t bomp_node_init(bomp_node_type_t type,
242                        nodeid_t numanode,
243                        nodeid_t nodeid,
244                        coreid_t nthreads,
245                        size_t stack_size,
246                        struct bomp_node *node)
247{
248    node->type = type;
249
250    switch(type) {
251        case BOMP_NODE_MASTER :
252            return bomp_node_init_threads(nodeid, numanode, nthreads, stack_size, node);
253            break;
254        case BOMP_NODE_LOCAL:
255            return bomp_node_init_local(nodeid, numanode, nthreads, stack_size, node);
256            break;
257        case BOMP_NODE_REMOTE :
258            return bomp_node_init_remote(nodeid, nthreads, stack_size, node);
259            break;
260        default:
261            return -1;
262            break;
263    }
264}
265
266
267
268coreid_t bomp_node_exec(struct bomp_node *node, void *fn, void *arg, coreid_t tid_start, coreid_t nthreads)
269{
270    debug_printf("Executing on node %u\n", node->id);
271    assert(!"NYI");
272    return node->threads_max;
273
274    return 0;
275}
276
#if 0
/*
 * NOTE: this section is compiled out. bomp_noded_channel_bind() is an
 * unimplemented placeholder; the node master side of the control channel is
 * currently handled by bomp_node_connect_cb() and bomp_node_msg_handler()
 * further down in this file.
 */

/*
 * ==============================================================================
 * Control Channel: Node Master Side
 * ==============================================================================
 */

/**
 * \brief initializes the shared memory channel Node Master - Worker Threads
 *        (Worker Side)
 *
 * \param channel    address of the message buffers to use
 *
 * \return SYS_ERR_OK (placeholder: asserts as not yet implemented)
 */
errval_t bomp_noded_channel_bind(void *channel)
{
    assert(!"NYI");

    return SYS_ERR_OK;
}

#endif
300
301/**
302 * \brief XOMP channel connect callback called by the Flounder backend
303 *
304 * \param st    Supplied worker state
305 * \param err   outcome of the connect attempt
306 * \param xb    XOMP Flounder binding
307 */
308static void bomp_node_connect_cb(void *st,
309                                   errval_t err,
310                                   struct bomp_binding *b)
311{
312    struct bomp_thread *t = st;
313
314    BOMP_DEBUG_THREAD("connected to node master.\n");
315
316    t->ctrl = b;
317
318    txq_init(&t->txq, b, b->waitset, (txq_register_fn_t) b->register_send,
319             sizeof(struct bomp_msg_st));
320
321    //b->rx_vtbl.execute = execute__rx;
322}
323
324/**
325 * \brief
326 *
327 * \param arg
328 *
329 * \return
330 */
331static int bomp_node_msg_handler(void *arg)
332{
333    BOMP_DEBUG_NODE("node master message handler started\n");
334
335    errval_t err;
336
337    struct bomp_tls *tls = calloc(1, sizeof(struct bomp_tls));
338    if (tls == NULL) {
339        BOMP_ERROR("Could not allocate memory for TLS. %p\n", arg);
340        return -1;
341    }
342
343    struct bomp_node *node = arg;
344
345    assert(numa_current_node() == node->numa_node);
346
347    tls->role = BOMP_THREAD_ROLE_NODE;
348    tls->self = thread_self();
349    tls->r.node.id = node->id;
350    tls->r.node.msgbuf = node->msgbuf;
351    tls->r.node.tls = tls;
352    tls->r.node.stack_size = node->stack_size;
353
354    struct bomp_frameinfo fi = {
355        .sendbase = (lpaddr_t)arg,
356        .inbuf = ((uint8_t *) arg) + BOMP_CHANNEL_SIZE,
357        .inbufsize = BOMP_CHANNEL_SIZE,
358        .outbuf = ((uint8_t *) arg),
359        .outbufsize = BOMP_CHANNEL_SIZE
360    };
361
362    struct waitset *ws = get_default_waitset();
363
364    BOMP_DEBUG_NODE("initializing local worker threads\n");
365    err = bomp_node_init_threads(node->id, node->numa_node, node->threads_max,
366                                 node->stack_size, &tls->r.node);
367    if (err_is_fail(err)) {
368        DEBUG_ERR(err, "init threads\n");
369    }
370
371    assert(node->threads_max == tls->r.node.threads_max);
372
373    BOMP_DEBUG_NODE("connecting to program master\n");
374
375    err = bomp_connect(&fi, bomp_node_connect_cb, &tls->r.thread, ws,
376                       IDC_EXPORT_FLAGS_DEFAULT);
377
378
379    if (err_is_fail(err)) {
380        /* TODO: Clean up */
381        return err_push(err, XOMP_ERR_WORKER_INIT_FAILED);
382    }
383
384    thread_set_tls(tls);
385
386    while(1) {
387        err = event_dispatch_non_block(ws);
388        switch(err_no(err)) {
389            case LIB_ERR_NO_EVENT :
390                thread_yield();
391                continue;
392                break;
393            case SYS_ERR_OK:
394                continue;
395            break;
396            default:
397                USER_PANIC_ERR(err, "event dispatch");
398                break;
399        }
400    }
401
402    BOMP_NOTICE("node master %" PRIuNODEID " terminated", tls->r.node.id);
403
404
405    return 0;
406}
407