1/*
2 * Copyright (c) 2014 ETH Zurich.
3 * All rights reserved.
4 *
5 * This file is distributed under the terms in the attached LICENSE file.
6 * If you do not find this file, copies can be found by writing to:
7 * ETH Zurich D-INFK, Universitaetsstrasse 6, CH-8092 Zurich. Attn: Systems Group.
8 */
9
10/**
11 * \brief this file contains the controll channel implementation between the
12 *        node masters and the bomp worker threads
13 */
14
15#include <bomp_internal.h>
16
17#include <if/bomp_defs.h>
18
19
20/* forward declaration */
21static int bomp_thread_msg_handler(void *arg);
22
23///< stores the state of a message in transit
24struct bomp_msg_st
25{
26    struct txq_msg_st common;       ///< common msg state
27
28    uint32_t *message_sent;         ///<
29
30    /* union of arguments */
31    union {
32        struct {
33            uint64_t fn;            ///< the function
34            uint64_t arg;           ///< the argument to the function
35            uint32_t tid;           ///< thread ID
36            uint64_t icv;           ///< thread's control variables
37        } exec;                     ///< execution
38    } args;
39};
40
41/*
42 * ==============================================================================
43 * Control Channel: Node Master Side
44 * ==============================================================================
45 */
46
47/*
48 * -----------------------------------------------------------------------------
49 * RX Handlers
50 * -----------------------------------------------------------------------------
51 */
52
53static void done__rx(struct bomp_binding *_binding, bomp_errval_t status)
54{
55    BOMP_DEBUG_THREAD("done__rx from thread\n");
56
57    struct bomp_thread *t = _binding->st;
58
59    struct bomp_node *node = t->node;
60
61
62    node->threads_active--;
63
64    BOMP_DEBUG_THREAD("threads active %u\n", node->threads_active);
65
66}
67
68/*
69 * -----------------------------------------------------------------------------
70 * TX Handlers
71 * -----------------------------------------------------------------------------
72 */
73
74static void txq_msg_sent_cb(void *st)
75{
76    struct bomp_msg_st *msg_st = (struct bomp_msg_st *)st;
77    *(msg_st->message_sent) = 1;
78}
79
80static errval_t execute__tx(struct txq_msg_st *msg_st)
81{
82    struct bomp_msg_st *st = (struct bomp_msg_st *)msg_st;
83
84    return bomp_execute__tx(msg_st->queue->binding, TXQCONT(msg_st),
85                            st->args.exec.fn, st->args.exec.arg, st->args.exec.tid,
86                            st->args.exec.icv);
87}
88
89
90
91/**
92 * \brief callback when the BOMP thread connects to the node
93 *
94 * \param st    state pointer
95 * \param err   status of the connect
96 * \param _b    created BOMP binding
97 */
98static void bomp_thread_accept_cb(void *st,
99                                  errval_t err,
100                                  struct bomp_binding *_b)
101{
102    struct bomp_thread *t = st;
103
104    BOMP_DEBUG_THREAD("connection accepted. tid=%" PRIuCOREID "\n", t->coreid);
105
106    t->thread_err = err;
107
108    txq_init(&t->txq, _b, _b->waitset, (txq_register_fn_t) _b->register_send,
109                 sizeof(struct bomp_msg_st));
110
111    _b->st = st;
112    t->ctrl = _b;
113
114    _b->rx_vtbl.done = done__rx;
115}
116
117/**
118 * \brief callback for creating the dispatcher on the remote core
119 *
120 * \param arg   argument for the callback
121 * \param err   outcome of the spanning request
122 */
123static void bomp_thread_init_done(void *arg, errval_t err)
124{
125    assert(err_is_ok(err));
126
127    uint32_t *done = arg;
128    *done = 1;
129}
130
131/**
132 * \brief initializes a thread on the given core
133 *
134 * \@param core         ID of the core on which to create the tread on
135 * \param stack_size    size of the stack of the tread to be created
136 * \param thread        pointer to the thread struct to create
137 *
138 * \returns SYS_ERR_OK on SUCCESS
139 *          errval on FAILURE
140 */
141errval_t bomp_thread_init(coreid_t core,
142                          size_t stack_size,
143                          struct bomp_thread *thread)
144{
145    errval_t err;
146
147    BOMP_DEBUG_THREAD("Creating thread on core %"PRIuCOREID " \n", core);
148
149    uint32_t done;
150
151    err = domain_new_dispatcher(core, bomp_thread_init_done, &done);
152    if (err_is_fail(err)) {
153        BOMP_ERROR("creating new dispatcher on core %" PRIuCOREID "failed\n",
154                   core);
155        return err;
156    }
157
158    while(!done) {
159        thread_yield();
160    }
161
162    BOMP_DEBUG_THREAD("dispatcher ready. allocating memory for msg channel\n");
163
164    size_t msg_frame_size;
165    err = frame_alloc(&thread->msgframe, 2 * BOMP_CHANNEL_SIZE, &msg_frame_size);
166    if (err_is_fail(err)) {
167        return err;
168    }
169
170    err = vspace_map_one_frame(&thread->msgbuf, msg_frame_size, thread->msgframe,
171                               NULL, NULL);
172    if (err_is_fail(err)) {
173        return err;
174    }
175
176    struct bomp_frameinfo fi = {
177        .sendbase = (lpaddr_t)thread->msgbuf + BOMP_CHANNEL_SIZE,
178        .inbuf = thread->msgbuf,
179        .inbufsize = BOMP_CHANNEL_SIZE,
180        .outbuf = ((uint8_t *) thread->msgbuf) + BOMP_CHANNEL_SIZE,
181        .outbufsize = BOMP_CHANNEL_SIZE
182    };
183
184    BOMP_DEBUG_THREAD("creating channel on %p\n", thread->msgbuf);
185
186    err = bomp_accept(&fi, thread, bomp_thread_accept_cb,
187                      get_default_waitset(), IDC_EXPORT_FLAGS_DEFAULT);
188
189    if (err_is_fail(err)) {
190        // XXX> error handling
191        return err;
192    }
193
194    BOMP_DEBUG_THREAD("creating thread on core %" PRIuCOREID "\n", core);
195    err = domain_thread_create_on(core, bomp_thread_msg_handler,
196                                  thread->msgbuf, NULL);
197    if (err_is_fail(err)) {
198        // XXX> error handling
199        return err;
200    }
201
202    while (thread->ctrl == NULL) {
203        err = event_dispatch(get_default_waitset());
204        if (err_is_fail(err)) {
205            USER_PANIC_ERR(err, "event dispatch\n");
206        }
207    }
208
209    BOMP_DEBUG_THREAD("thread on core %" PRIuCOREID " connected \n", core);
210
211    return thread->thread_err;
212}
213
214errval_t bomp_thread_exec(struct bomp_thread *thread,
215                          bomp_thread_fn_t fn, void *arg, uint32_t tid)
216{
217    debug_printf("bomp_thread_exec(%p, %p, %p, %u) %p\n", thread, fn, arg, tid, thread->icvt);
218    struct txq_msg_st *msg_st = txq_msg_st_alloc(&thread->txq);
219    if (msg_st == NULL) {
220        return LIB_ERR_MALLOC_FAIL;
221    }
222
223    uint32_t msg_sent = 0;
224
225    msg_st->send = execute__tx;
226    msg_st->cleanup = (txq_cleanup_fn_t)txq_msg_sent_cb;
227
228    struct bomp_msg_st *bomp_msg_st = (struct bomp_msg_st *)msg_st;
229
230    bomp_msg_st->args.exec.arg = (uint64_t)arg;
231    bomp_msg_st->args.exec.fn = (uint64_t)fn;
232    bomp_msg_st->args.exec.tid = tid;
233    bomp_msg_st->args.exec.icv = (uint64_t)thread->icvt;
234    bomp_msg_st->message_sent = &msg_sent;
235
236    txq_send(msg_st);
237
238    while(msg_sent == 0) {
239        event_dispatch(get_default_waitset());
240    }
241
242    //return event_dispatch_non_block(get_default_waitset());
243    return SYS_ERR_OK;
244}
245
246/*
247 * ==============================================================================
248 * Control Channel: Worker Thread Side
249 * ==============================================================================
250 */
251
252/*
253 * -----------------------------------------------------------------------------
254 * TX Handlers
255 * -----------------------------------------------------------------------------
256 */
257
258static errval_t done__tx(struct txq_msg_st *msg_st)
259{
260    BOMP_DEBUG_THREAD("done__tx\n");
261
262    return bomp_done__tx(msg_st->queue->binding, TXQCONT(msg_st),msg_st->err);
263}
264
265/*
266 * -----------------------------------------------------------------------------
267 * RX Handlers
268 * -----------------------------------------------------------------------------
269 */
270
271static void execute__rx(struct bomp_binding *_binding,
272                        uint64_t fn, uint64_t arg, uint32_t tid, uint64_t icv_task)
273{
274
275
276    struct bomp_thread *t = _binding->st;
277    struct bomp_tls *tls = thread_get_tls();
278
279    BOMP_DEBUG_THREAD("execute__rx: %p %p, %lx\n", t, tls, icv_task);
280
281    assert(t == &tls->r.thread);
282
283    struct omp_icv_task icvt;
284    memcpy(&icvt, (void *)icv_task, sizeof(struct omp_icv_task));
285
286    bomp_icv_set_task(&icvt);
287
288    tls->thread_id = tid;
289
290    bomp_thread_fn_t func= (bomp_thread_fn_t)fn;
291
292    // calling the function
293    func((void *)arg);
294
295    bomp_icv_set_task(NULL);
296    tls->thread_id = -1;
297
298    struct txq_msg_st *msg_st = txq_msg_st_alloc(&t->txq);
299    if (msg_st == NULL) {
300        BOMP_ERROR("allocation of message state failed: %" PRIu32 "\n", tid);
301        return;
302    }
303
304    msg_st->send = done__tx;
305    msg_st->err = SYS_ERR_OK;
306
307    txq_send(msg_st);
308}
309
310/**
311 * \brief XOMP channel connect callback called by the Flounder backend
312 *
313 * \param st    Supplied worker state
314 * \param err   outcome of the connect attempt
315 * \param xb    XOMP Flounder binding
316 */
317static void bomp_thread_connect_cb(void *st,
318                                   errval_t err,
319                                   struct bomp_binding *b)
320{
321    struct bomp_thread *t = st;
322
323    BOMP_DEBUG_THREAD("connected to node master.\n");
324
325    t->ctrl = b;
326
327    txq_init(&t->txq, b, b->waitset, (txq_register_fn_t) b->register_send,
328             sizeof(struct bomp_msg_st));
329
330    b->rx_vtbl.execute = execute__rx;
331}
332
333
334/**
335 * \brief
336 *
337 * \param arg
338 *
339 * \return
340 */
341static int bomp_thread_msg_handler(void *arg)
342{
343
344
345    errval_t err;
346
347    struct bomp_tls *tls = calloc(1, sizeof(struct bomp_tls));
348    if (tls == NULL) {
349        BOMP_ERROR("Could not allocate memory for TLS. %p\n", arg);
350        return -1;
351    }
352
353    BOMP_DEBUG_THREAD("thread message handler started %p\n", tls);
354
355    tls->role = BOMP_THREAD_ROLE_WORKER;
356    tls->self = thread_self();
357    tls->r.thread.coreid = disp_get_core_id();
358    tls->r.thread.msgbuf = arg;
359    tls->r.thread.tls = tls;
360
361    struct waitset local_waitset;
362    //struct waitset *ws = get_default_waitset();
363    struct waitset *ws = &local_waitset;
364
365    waitset_init(ws);
366
367
368    struct bomp_frameinfo fi = {
369        .sendbase = (lpaddr_t)arg,
370        .inbuf = ((uint8_t *) arg) + BOMP_CHANNEL_SIZE,
371        .inbufsize = BOMP_CHANNEL_SIZE,
372        .outbuf = ((uint8_t *) arg),
373        .outbufsize = BOMP_CHANNEL_SIZE
374    };
375
376
377
378    err = bomp_connect(&fi, bomp_thread_connect_cb, &tls->r.thread, ws,
379                       IDC_EXPORT_FLAGS_DEFAULT);
380
381
382    if (err_is_fail(err)) {
383        /* TODO: Clean up */
384        return err_push(err, XOMP_ERR_WORKER_INIT_FAILED);
385    }
386
387    thread_set_tls(tls);
388
389
390    while(1) {
391        err = event_dispatch_non_block(ws);
392        switch(err_no(err)) {
393            case LIB_ERR_NO_EVENT :
394                thread_yield();
395                continue;
396                break;
397            case SYS_ERR_OK:
398                continue;
399            break;
400            default:
401                USER_PANIC_ERR(err, "event dispatch");
402                break;
403        }
404    }
405
406    BOMP_NOTICE("thread %" PRIuCOREID " terminated", disp_get_core_id());
407
408
409    return 0;
410}
411