1/** \file
2 *  \brief A simple work stealing library based upon both cilk and wool.
3 */
4
5/*
6 * Copyright (c) 2010, ETH Zurich.
7 * All rights reserved.
8 *
9 * This file is distributed under the terms in the attached LICENSE file.
10 * If you do not find this file, copies can be found by writing to:
11 * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
12 */
13#include "tweed/tweed.h"
14#include <stdio.h>
15#include <stdlib.h>
16#include <string.h>
17#include <barrelfish/dispatch.h>
18#include "trace/trace.h"
19#include <trace_definitions/trace_defs.h>
20
/** Worker descriptor table, indexed by worker id */
static struct worker_desc *workers;
/** Number of workers requested when the library was initialised */
static int num_workers;
/** Count of dispatchers / worker threads that have reported in so far */
static volatile int num_dispatchers = 1;
/** Backing storage for the per-worker task stacks */
static char *task_stack_space;
/** Becomes non-zero once the main task has returned; tells workers to quit */
static volatile int do_quit = 0;
31
/*
 * Lock helpers used around accesses to a worker descriptor.
 *
 * Both macros expand to a single expression WITHOUT a trailing semicolon,
 * so `LOCK(x);` behaves like an ordinary statement (and is safe as the
 * body of an unbraced if/else).  The argument is parenthesised so any
 * lvalue expression may be passed.  When TWEED_LOCK_FREE is defined the
 * macros do nothing and do not evaluate their argument.
 */
#ifndef TWEED_LOCK_FREE
// TODO - replace these with mutexes, once they work
#define LOCK(x)   acquire_spinlock(&(x))
#define UNLOCK(x) release_spinlock(&(x))
#else
#define LOCK(x)   ((void)0)
#define UNLOCK(x) ((void)0)
#endif
40
41
/** Start-up parameters handed to a worker thread when it is launched */
struct worker_args {
    int id;      /**< index of the worker in the workers array */
    int origin;  /**< core id of the dispatcher that launched the worker */
};

/* Forward declaration; the definition appears further down in this file */
static int steal(struct generic_task_desc *_tweed_top_,
                 struct worker_desc *victim);
48
49/** Start the main task */
50static int main_worker (int id,
51                        int(*main_func)(struct generic_task_desc *,void*),
52                        void * main_args) {
53
54    workers[id].worker_thr = thread_self();
55    workers[id].id = id;
56    workers[id].core_id = disp_get_core_id();
57
58    struct generic_task_desc * _tweed_top_ = NULL;
59
60    thread_set_tls(&(workers[id]));
61
62    int ret = main_func(_tweed_top_, main_args);
63
64    // signal exit to other workers
65    do_quit = 1;
66
67    return ret;
68}
69
70
71
72/** Work-stealing loop for workers */
73static int worker_run (void * data) {
74    struct worker_args * args = (struct worker_args*) data;
75    int id = args->id;
76
77    errval_t err = thread_detach(thread_self());
78    assert(err_is_ok(err));
79
80    free(args);
81    workers[id].worker_thr = thread_self();
82    workers[id].id = id;
83    workers[id].core_id = disp_get_core_id();
84
85    struct generic_task_desc * _tweed_top_ = NULL;
86
87    thread_set_tls( &(workers[id]));
88
89    trace_init_disp();
90
91    num_dispatchers += 1;
92
93    // start trying to steal work
94    int steal_id = (id+1) % num_workers;
95    while(!do_quit) {
96        int success = steal(_tweed_top_, &workers[steal_id]);
97        if (!success) {
98            // try next worker
99            steal_id = (steal_id+1) % num_workers;
100        }
101    }
102    exit(0);
103    return 0;
104}
105
106static int start_worker_thread(void * data) {
107    thread_create(&worker_run, data);
108    return 0;
109}
110
111
112/** Initialize a worker's data-structures */
113static void init_worker(int id, void* stack_start) {
114    workers[id].task_desc_stack = (struct generic_task_desc *) stack_start;
115    memset(workers[id].task_desc_stack, 0, TWEED_TASK_STACK_SIZE);
116    workers[id].bot = NULL;
117    // TODO - when mutexes work - workers[id].lock =
118    //    (struct thread_mutex *) malloc (sizeof(struct thread_mutex));
119    // thread_mutex_init (workers[id].lock);
120}
121
122
123static void domain_spanned_callback(void *arg, errval_t err)
124{
125    num_dispatchers++;
126}
127
128
129/** Initialise the tweed library - must be called before any other
130 *  tweed calls
131 */
132int init_tweed(int workers_requested,
133              int(*main_func)(struct generic_task_desc *,void*),
134              void* main_args) {
135    int i, err;
136
137    if (workers_requested < 1) {
138        fprintf(stderr,
139                "Error initalizing tweed - requested less than 1 worker\n");
140        return -1;
141    }
142
143    num_workers = workers_requested;
144    workers = (struct worker_desc *) malloc (
145	          num_workers * sizeof(struct worker_desc));
146    // alloc task stack space for all workers, leave space for alignment
147    task_stack_space = malloc (TWEED_TASK_STACK_SIZE * (num_workers + 1));
148    char * curr_stack_space = (char*)(((unsigned long)task_stack_space + TWEED_TASK_STACK_SIZE) & ~TWEED_TASK_STACK_MASK);
149
150    // Initialize worker data-structures
151    for (i=0; i<num_workers; i++) {
152        init_worker(i, curr_stack_space);
153        curr_stack_space += TWEED_TASK_STACK_SIZE;
154    }
155
156    // create dispatchers on all other cores required for num_workers
157    for (i=1; i<num_workers; i++) {
158        err = domain_new_dispatcher(i + disp_get_core_id(),
159                                    domain_spanned_callback,
160                                    (void*)(uintptr_t)i);
161        if (err_is_fail(err)) {
162            DEBUG_ERR(err, "domain_new_dispatcher failed");
163            printf("%d failed\n", i);
164        }
165    }
166
167    // wait for all dispatchers to come up
168    while (num_dispatchers < num_workers) {
169        messages_wait_and_handle_next();
170    }
171    num_dispatchers = 1;  // reset
172
173    // start work stealing threads on newly created domains
174    for (i = 1; i < num_workers; i++) {
175        struct worker_args * args = (struct worker_args *) malloc (
176                                        sizeof(struct worker_args));
177        args->id = i;
178        args->origin = disp_get_core_id();
179
180        err = domain_thread_create_on(i + disp_get_core_id(), start_worker_thread,
181                                 args, NULL);
182        if (err_is_fail(err)) {
183            DEBUG_ERR(err, "Failed to run a function on remote core");
184        }
185    }
186
187    // wait for all dispatchers to come up
188    while (num_dispatchers < num_workers) {
189        messages_wait_and_handle_next();
190    }
191
192    // now start the main worker on the current dispatcher
193    return main_worker(0, main_func, main_args);
194}
195
196static inline tweed_task_func_t steal_task(struct generic_task_desc * task,
197                                           struct worker_desc * thief) {
198#ifdef TWEED_USE_CAS
199    tweed_task_func_t func = task->f.func;
200    int success = cmpxchg128((uint64_t *)&(task->f.func),
201                             (uint64_t)func, TWEED_TASK_NEW,
202                             (uint64_t)thief, TWEED_TASK_STOLEN);
203    return success ? func : NULL;
204#else
205    task->balarm = TWEED_TASK_STOLEN;
206    mfence();
207    tweed_task_func_t func = task->f.func;
208    task->thief = thief;
209    mfence();
210    return func;
211#endif
212}
213
214
215
216/** Initializes _tweed_top_ to start of this worker's task block
217 */
218struct generic_task_desc * set_top(void) {
219    trace_event(TRACE_SUBSYS_TWEED, TRACE_EVENT_TWEED_LOCKING, 0);
220    struct worker_desc * tls = (struct worker_desc *) thread_get_tls();
221    LOCK(tls->lock);
222    trace_event(TRACE_SUBSYS_TWEED, TRACE_EVENT_TWEED_LOCKING_END, 0);
223    tls->bot = workers[tls->id].task_desc_stack;
224    UNLOCK(tls->lock);
225    return workers[tls->id].task_desc_stack;
226}
227
228
229/** Called when a worker spawns its first task to set its bot value so other
230 *  workers can steal tasks from it.
231 */
232static inline void set_bot(struct generic_task_desc * val) {
233    trace_event(TRACE_SUBSYS_TWEED, TRACE_EVENT_TWEED_LOCKING, 0);
234    struct worker_desc * tls = (struct worker_desc *) thread_get_tls();
235    LOCK(tls->lock);
236    trace_event(TRACE_SUBSYS_TWEED, TRACE_EVENT_TWEED_LOCKING_END, 0);
237    tls->bot = val;
238    UNLOCK(tls->lock);
239}
240
241
242
/**
 * Atomically advance a task-descriptor pointer by a number of bytes.
 *
 * The pointer object is updated as an integer of pointer width
 * (uintptr_t), so the atomic access covers exactly the pointer and
 * nothing beside it, whatever the pointer size of the target.
 *
 * \param bot    address of the pointer to advance
 * \param bytes  byte offset to add; a negative value moves the pointer back
 */
static inline void atomic_inc(struct generic_task_desc **bot, int bytes) {
    __sync_fetch_and_add((uintptr_t *) bot, (uintptr_t) (intptr_t) bytes);
}
249
250
251/** Steal work from another worker's task stack */
252static int steal(struct generic_task_desc * _tweed_top_,
253                 struct worker_desc * victim) {
254    struct generic_task_desc * stolenTask;
255    struct worker_desc * me = (struct worker_desc *) thread_get_tls();
256
257    LOCK(victim->lock);
258
259    stolenTask = victim->bot;
260    // check if there is actually work to steal
261    if (stolenTask != NULL && stolenTask->balarm == TWEED_TASK_NEW) {
262
263        // try to steal task
264        tweed_task_func_t func = steal_task(stolenTask, me);
265
266        if (func == NULL) {
267            // we didn't succeed in the steal, back off
268#ifndef TWEED_USE_CAS
269            stolenTask->balarm  = TWEED_TASK_INLINED;
270            stolenTask->thief = NULL;
271#endif
272            UNLOCK(victim->lock);
273            return 0; // didn't steal anything
274        } else {
275            // we have stolen the task, update bot
276            atomic_inc(&(victim->bot), stolenTask->size);
277            UNLOCK(victim->lock);
278
279            // and run task
280            trace_event(TRACE_SUBSYS_TWEED, TRACE_EVENT_TWEED_STEAL, victim->core_id);
281            func(_tweed_top_, stolenTask);
282            trace_event(TRACE_SUBSYS_TWEED, TRACE_EVENT_TWEED_STEAL_END,
283                        victim->core_id);
284
285            // signal task completion
286            stolenTask->balarm |= TWEED_TASK_COMPLETE;
287            return 1;
288        }
289    } else {
290        UNLOCK(victim->lock);
291        return 0; // didn't steal anything
292    }
293}
294
295/** Check if syncing task really is stolen */
296int sync_stolen(struct generic_task_desc * _tweed_top_) {
297#ifndef TWEED_LOCK_FREE
298    struct worker_desc * tls = (struct worker_desc *) thread_get_tls();
299#endif
300    LOCK(tls->lock);
301    int ret = ((_tweed_top_->balarm & TWEED_TASK_STOLEN) != 0);
302    UNLOCK(tls->lock);
303    return ret;
304}
305
/*
 * Yields the thief recorded in task descriptor pointer `t`.  With
 * TWEED_USE_CAS the thief is read from the `f` member, otherwise from the
 * descriptor's own `thief` field.  The argument and the expansion are
 * parenthesised so any pointer expression (e.g. `&task`) can be passed.
 */
#ifdef TWEED_USE_CAS
#define GET_THIEF(t) ((t)->f.thief)
#else
#define GET_THIEF(t) ((t)->thief)
#endif
311
312#if defined(TWEED_WAITING)
313static inline int waiting(struct generic_task_desc * _tweed_top_) {
314    // do nothing, just wait
315    return 0;
316}
317#elif defined(TWEED_LEAPFROG)
318static inline int waiting(struct generic_task_desc * _tweed_top_) {
319    // steal work from the thief
320    return steal(_tweed_top_, (struct worker_desc *) GET_THIEF(_tweed_top_));
321}
322#elif defined(TWEED_PARK)
323#error "NYI"
324#else
325#error "One of TWEED_WAITING, TWEED_LEAPFROG or TWEED_PARK must be defined"
326#endif
327
328/** Handle stolen task */
329int handle_stolen_task(struct generic_task_desc * _tweed_top_) {
330    trace_event(TRACE_SUBSYS_TWEED, TRACE_EVENT_TWEED_WAIT,
331                GET_THIEF(_tweed_top_)->core_id);
332
333    while ((_tweed_top_->balarm & TWEED_TASK_COMPLETE) == 0) {
334        if (!waiting(_tweed_top_)) {
335            thread_yield();
336        }
337    }
338    trace_event(TRACE_SUBSYS_TWEED, TRACE_EVENT_TWEED_WAIT_END,
339                GET_THIEF(_tweed_top_)->core_id); ;
340
341    // update bot
342    set_bot(_tweed_top_);
343    return 0;
344}
345