1/** \file 2 * \brief A simple work stealing library based upon both cilk and wool. 3 */ 4 5/* 6 * Copyright (c) 2010, ETH Zurich. 7 * All rights reserved. 8 * 9 * This file is distributed under the terms in the attached LICENSE file. 10 * If you do not find this file, copies can be found by writing to: 11 * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group. 12 */ 13#include "tweed/tweed.h" 14#include <stdio.h> 15#include <stdlib.h> 16#include <string.h> 17#include <barrelfish/dispatch.h> 18#include "trace/trace.h" 19#include <trace_definitions/trace_defs.h> 20 21/** Array of worker descriptors */ 22static struct worker_desc * workers; 23/** Number of workers started */ 24static int num_workers; 25/** Number of dispatchers started */ 26static volatile int num_dispatchers = 1; 27/** Space used to store task for each worker (accessed as a stack) */ 28static char * task_stack_space; 29/** Signals program exit */ 30static volatile int do_quit = 0; 31 32#ifndef TWEED_LOCK_FREE 33// TODO - replace these with mutexes, once they work 34#define LOCK(x) acquire_spinlock(&x); 35#define UNLOCK(x) release_spinlock(&x) 36#else 37#define LOCK(x) 38#define UNLOCK(x) 39#endif 40 41 42struct worker_args { 43 int id; 44 int origin; 45}; 46static int steal(struct generic_task_desc * _tweed_top_, 47 struct worker_desc * victim); 48 49/** Start the main task */ 50static int main_worker (int id, 51 int(*main_func)(struct generic_task_desc *,void*), 52 void * main_args) { 53 54 workers[id].worker_thr = thread_self(); 55 workers[id].id = id; 56 workers[id].core_id = disp_get_core_id(); 57 58 struct generic_task_desc * _tweed_top_ = NULL; 59 60 thread_set_tls(&(workers[id])); 61 62 int ret = main_func(_tweed_top_, main_args); 63 64 // signal exit to other workers 65 do_quit = 1; 66 67 return ret; 68} 69 70 71 72/** Work-stealing loop for workers */ 73static int worker_run (void * data) { 74 struct worker_args * args = (struct worker_args*) data; 75 int id = args->id; 76 77 errval_t err = thread_detach(thread_self()); 78 assert(err_is_ok(err)); 79 80 free(args); 81 workers[id].worker_thr = thread_self(); 82 workers[id].id = id; 83 workers[id].core_id = disp_get_core_id(); 84 85 struct generic_task_desc * _tweed_top_ = NULL; 86 87 thread_set_tls( &(workers[id])); 88 89 trace_init_disp(); 90 91 num_dispatchers += 1; 92 93 // start trying to steal work 94 int steal_id = (id+1) % num_workers; 95 while(!do_quit) { 96 int success = steal(_tweed_top_, &workers[steal_id]); 97 if (!success) { 98 // try next worker 99 steal_id = (steal_id+1) % num_workers; 100 } 101 } 102 exit(0); 103 return 0; 104} 105 106static int start_worker_thread(void * data) { 107 thread_create(&worker_run, data); 108 return 0; 109} 110 111 112/** Initialize a worker's data-structures */ 113static void init_worker(int id, void* stack_start) { 114 workers[id].task_desc_stack = (struct generic_task_desc *) stack_start; 115 memset(workers[id].task_desc_stack, 0, TWEED_TASK_STACK_SIZE); 116 workers[id].bot = NULL; 117 // TODO - when mutexes work - workers[id].lock = 118 // (struct thread_mutex *) malloc (sizeof(struct thread_mutex)); 119 // thread_mutex_init (workers[id].lock); 120} 121 122 123static void domain_spanned_callback(void *arg, errval_t err) 124{ 125 num_dispatchers++; 126} 127 128 129/** Initialise the tweed library - must be called before any other 130 * tweed calls 131 */ 132int init_tweed(int workers_requested, 133 int(*main_func)(struct generic_task_desc *,void*), 134 void* main_args) { 135 int i, err; 136 137 if (workers_requested < 1) { 138 fprintf(stderr, 139 "Error initalizing tweed - requested less than 1 worker\n"); 140 return -1; 141 } 142 143 num_workers = workers_requested; 144 workers = (struct worker_desc *) malloc ( 145 num_workers * sizeof(struct worker_desc)); 146 // alloc task stack space for all workers, leave space for alignment 147 task_stack_space = malloc (TWEED_TASK_STACK_SIZE * (num_workers + 1)); 148 char * curr_stack_space = (char*)(((unsigned long)task_stack_space + TWEED_TASK_STACK_SIZE) & ~TWEED_TASK_STACK_MASK); 149 150 // Initialize worker data-structures 151 for (i=0; i<num_workers; i++) { 152 init_worker(i, curr_stack_space); 153 curr_stack_space += TWEED_TASK_STACK_SIZE; 154 } 155 156 // create dispatchers on all other cores required for num_workers 157 for (i=1; i<num_workers; i++) { 158 err = domain_new_dispatcher(i + disp_get_core_id(), 159 domain_spanned_callback, 160 (void*)(uintptr_t)i); 161 if (err_is_fail(err)) { 162 DEBUG_ERR(err, "domain_new_dispatcher failed"); 163 printf("%d failed\n", i); 164 } 165 } 166 167 // wait for all dispatchers to come up 168 while (num_dispatchers < num_workers) { 169 messages_wait_and_handle_next(); 170 } 171 num_dispatchers = 1; // reset 172 173 // start work stealing threads on newly created domains 174 for (i = 1; i < num_workers; i++) { 175 struct worker_args * args = (struct worker_args *) malloc ( 176 sizeof(struct worker_args)); 177 args->id = i; 178 args->origin = disp_get_core_id(); 179 180 err = domain_thread_create_on(i + disp_get_core_id(), start_worker_thread, 181 args, NULL); 182 if (err_is_fail(err)) { 183 DEBUG_ERR(err, "Failed to run a function on remote core"); 184 } 185 } 186 187 // wait for all dispatchers to come up 188 while (num_dispatchers < num_workers) { 189 messages_wait_and_handle_next(); 190 } 191 192 // now start the main worker on the current dispatcher 193 return main_worker(0, main_func, main_args); 194} 195 196static inline tweed_task_func_t steal_task(struct generic_task_desc * task, 197 struct worker_desc * thief) { 198#ifdef TWEED_USE_CAS 199 tweed_task_func_t func = task->f.func; 200 int success = cmpxchg128((uint64_t *)&(task->f.func), 201 (uint64_t)func, TWEED_TASK_NEW, 202 (uint64_t)thief, TWEED_TASK_STOLEN); 203 return success ? func : NULL; 204#else 205 task->balarm = TWEED_TASK_STOLEN; 206 mfence(); 207 tweed_task_func_t func = task->f.func; 208 task->thief = thief; 209 mfence(); 210 return func; 211#endif 212} 213 214 215 216/** Initializes _tweed_top_ to start of this worker's task block 217 */ 218struct generic_task_desc * set_top(void) { 219 trace_event(TRACE_SUBSYS_TWEED, TRACE_EVENT_TWEED_LOCKING, 0); 220 struct worker_desc * tls = (struct worker_desc *) thread_get_tls(); 221 LOCK(tls->lock); 222 trace_event(TRACE_SUBSYS_TWEED, TRACE_EVENT_TWEED_LOCKING_END, 0); 223 tls->bot = workers[tls->id].task_desc_stack; 224 UNLOCK(tls->lock); 225 return workers[tls->id].task_desc_stack; 226} 227 228 229/** Called when a worker spawns its first task to set its bot value so other 230 * workers can steal tasks from it. 231 */ 232static inline void set_bot(struct generic_task_desc * val) { 233 trace_event(TRACE_SUBSYS_TWEED, TRACE_EVENT_TWEED_LOCKING, 0); 234 struct worker_desc * tls = (struct worker_desc *) thread_get_tls(); 235 LOCK(tls->lock); 236 trace_event(TRACE_SUBSYS_TWEED, TRACE_EVENT_TWEED_LOCKING_END, 0); 237 tls->bot = val; 238 UNLOCK(tls->lock); 239} 240 241 242 243/** Called when a worker spawns its first task to set its bot value so other 244 * workers can steal tasks from it. 245 */ 246static inline void atomic_inc(struct generic_task_desc ** bot, int bytes) { 247 __sync_fetch_and_add((uint64_t*)bot, (uint64_t) bytes); 248} 249 250 251/** Steal work from another worker's task stack */ 252static int steal(struct generic_task_desc * _tweed_top_, 253 struct worker_desc * victim) { 254 struct generic_task_desc * stolenTask; 255 struct worker_desc * me = (struct worker_desc *) thread_get_tls(); 256 257 LOCK(victim->lock); 258 259 stolenTask = victim->bot; 260 // check if there is actually work to steal 261 if (stolenTask != NULL && stolenTask->balarm == TWEED_TASK_NEW) { 262 263 // try to steal task 264 tweed_task_func_t func = steal_task(stolenTask, me); 265 266 if (func == NULL) { 267 // we didn't succeed in the steal, back off 268#ifndef TWEED_USE_CAS 269 stolenTask->balarm = TWEED_TASK_INLINED; 270 stolenTask->thief = NULL; 271#endif 272 UNLOCK(victim->lock); 273 return 0; // didn't steal anything 274 } else { 275 // we have stolen the task, update bot 276 atomic_inc(&(victim->bot), stolenTask->size); 277 UNLOCK(victim->lock); 278 279 // and run task 280 trace_event(TRACE_SUBSYS_TWEED, TRACE_EVENT_TWEED_STEAL, victim->core_id); 281 func(_tweed_top_, stolenTask); 282 trace_event(TRACE_SUBSYS_TWEED, TRACE_EVENT_TWEED_STEAL_END, 283 victim->core_id); 284 285 // signal task completion 286 stolenTask->balarm |= TWEED_TASK_COMPLETE; 287 return 1; 288 } 289 } else { 290 UNLOCK(victim->lock); 291 return 0; // didn't steal anything 292 } 293} 294 295/** Check if syncing task really is stolen */ 296int sync_stolen(struct generic_task_desc * _tweed_top_) { 297#ifndef TWEED_LOCK_FREE 298 struct worker_desc * tls = (struct worker_desc *) thread_get_tls(); 299#endif 300 LOCK(tls->lock); 301 int ret = ((_tweed_top_->balarm & TWEED_TASK_STOLEN) != 0); 302 UNLOCK(tls->lock); 303 return ret; 304} 305 306#ifdef TWEED_USE_CAS 307#define GET_THIEF(t) t->f.thief 308#else 309#define GET_THIEF(t) t->thief 310#endif 311 312#if defined(TWEED_WAITING) 313static inline int waiting(struct generic_task_desc * _tweed_top_) { 314 // do nothing, just wait 315 return 0; 316} 317#elif defined(TWEED_LEAPFROG) 318static inline int waiting(struct generic_task_desc * _tweed_top_) { 319 // steal work from the thief 320 return steal(_tweed_top_, (struct worker_desc *) GET_THIEF(_tweed_top_)); 321} 322#elif defined(TWEED_PARK) 323#error "NYI" 324#else 325#error "One of TWEED_WAITING, TWEED_LEAPFROG or TWEED_PARK must be defined" 326#endif 327 328/** Handle stolen task */ 329int handle_stolen_task(struct generic_task_desc * _tweed_top_) { 330 trace_event(TRACE_SUBSYS_TWEED, TRACE_EVENT_TWEED_WAIT, 331 GET_THIEF(_tweed_top_)->core_id); 332 333 while ((_tweed_top_->balarm & TWEED_TASK_COMPLETE) == 0) { 334 if (!waiting(_tweed_top_)) { 335 thread_yield(); 336 } 337 } 338 trace_event(TRACE_SUBSYS_TWEED, TRACE_EVENT_TWEED_WAIT_END, 339 GET_THIEF(_tweed_top_)->core_id); ; 340 341 // update bot 342 set_bot(_tweed_top_); 343 return 0; 344} 345