1/* 2 * Copyright (c) 2014 ETH Zurich. 3 * All rights reserved. 4 * 5 * This file is distributed under the terms in the attached LICENSE file. 6 * If you do not find this file, copies can be found by writing to: 7 * ETH Zurich D-INFK, Universitaetsstrasse 6, CH-8092 Zurich. Attn: Systems Group. 8 */ 9 10/** 11 * \brief this file contains the controll channel implementation between the 12 * node masters and the bomp worker threads 13 */ 14 15#include <bomp_internal.h> 16 17#include <if/bomp_defs.h> 18 19 20/* forward declaration */ 21static int bomp_thread_msg_handler(void *arg); 22 23///< stores the state of a message in transit 24struct bomp_msg_st 25{ 26 struct txq_msg_st common; ///< common msg state 27 28 uint32_t *message_sent; ///< 29 30 /* union of arguments */ 31 union { 32 struct { 33 uint64_t fn; ///< the function 34 uint64_t arg; ///< the argument to the function 35 uint32_t tid; ///< thread ID 36 uint64_t icv; ///< thread's control variables 37 } exec; ///< execution 38 } args; 39}; 40 41/* 42 * ============================================================================== 43 * Control Channel: Node Master Side 44 * ============================================================================== 45 */ 46 47/* 48 * ----------------------------------------------------------------------------- 49 * RX Handlers 50 * ----------------------------------------------------------------------------- 51 */ 52 53static void done__rx(struct bomp_binding *_binding, bomp_errval_t status) 54{ 55 BOMP_DEBUG_THREAD("done__rx from thread\n"); 56 57 struct bomp_thread *t = _binding->st; 58 59 struct bomp_node *node = t->node; 60 61 62 node->threads_active--; 63 64 BOMP_DEBUG_THREAD("threads active %u\n", node->threads_active); 65 66} 67 68/* 69 * ----------------------------------------------------------------------------- 70 * TX Handlers 71 * ----------------------------------------------------------------------------- 72 */ 73 74static void txq_msg_sent_cb(void *st) 75{ 76 struct bomp_msg_st *msg_st = (struct bomp_msg_st *)st; 77 *(msg_st->message_sent) = 1; 78} 79 80static errval_t execute__tx(struct txq_msg_st *msg_st) 81{ 82 struct bomp_msg_st *st = (struct bomp_msg_st *)msg_st; 83 84 return bomp_execute__tx(msg_st->queue->binding, TXQCONT(msg_st), 85 st->args.exec.fn, st->args.exec.arg, st->args.exec.tid, 86 st->args.exec.icv); 87} 88 89 90 91/** 92 * \brief callback when the BOMP thread connects to the node 93 * 94 * \param st state pointer 95 * \param err status of the connect 96 * \param _b created BOMP binding 97 */ 98static void bomp_thread_accept_cb(void *st, 99 errval_t err, 100 struct bomp_binding *_b) 101{ 102 struct bomp_thread *t = st; 103 104 BOMP_DEBUG_THREAD("connection accepted. tid=%" PRIuCOREID "\n", t->coreid); 105 106 t->thread_err = err; 107 108 txq_init(&t->txq, _b, _b->waitset, (txq_register_fn_t) _b->register_send, 109 sizeof(struct bomp_msg_st)); 110 111 _b->st = st; 112 t->ctrl = _b; 113 114 _b->rx_vtbl.done = done__rx; 115} 116 117/** 118 * \brief callback for creating the dispatcher on the remote core 119 * 120 * \param arg argument for the callback 121 * \param err outcome of the spanning request 122 */ 123static void bomp_thread_init_done(void *arg, errval_t err) 124{ 125 assert(err_is_ok(err)); 126 127 uint32_t *done = arg; 128 *done = 1; 129} 130 131/** 132 * \brief initializes a thread on the given core 133 * 134 * \@param core ID of the core on which to create the tread on 135 * \param stack_size size of the stack of the tread to be created 136 * \param thread pointer to the thread struct to create 137 * 138 * \returns SYS_ERR_OK on SUCCESS 139 * errval on FAILURE 140 */ 141errval_t bomp_thread_init(coreid_t core, 142 size_t stack_size, 143 struct bomp_thread *thread) 144{ 145 errval_t err; 146 147 BOMP_DEBUG_THREAD("Creating thread on core %"PRIuCOREID " \n", core); 148 149 uint32_t done; 150 151 err = domain_new_dispatcher(core, bomp_thread_init_done, &done); 152 if (err_is_fail(err)) { 153 BOMP_ERROR("creating new dispatcher on core %" PRIuCOREID "failed\n", 154 core); 155 return err; 156 } 157 158 while(!done) { 159 thread_yield(); 160 } 161 162 BOMP_DEBUG_THREAD("dispatcher ready. allocating memory for msg channel\n"); 163 164 size_t msg_frame_size; 165 err = frame_alloc(&thread->msgframe, 2 * BOMP_CHANNEL_SIZE, &msg_frame_size); 166 if (err_is_fail(err)) { 167 return err; 168 } 169 170 err = vspace_map_one_frame(&thread->msgbuf, msg_frame_size, thread->msgframe, 171 NULL, NULL); 172 if (err_is_fail(err)) { 173 return err; 174 } 175 176 struct bomp_frameinfo fi = { 177 .sendbase = (lpaddr_t)thread->msgbuf + BOMP_CHANNEL_SIZE, 178 .inbuf = thread->msgbuf, 179 .inbufsize = BOMP_CHANNEL_SIZE, 180 .outbuf = ((uint8_t *) thread->msgbuf) + BOMP_CHANNEL_SIZE, 181 .outbufsize = BOMP_CHANNEL_SIZE 182 }; 183 184 BOMP_DEBUG_THREAD("creating channel on %p\n", thread->msgbuf); 185 186 err = bomp_accept(&fi, thread, bomp_thread_accept_cb, 187 get_default_waitset(), IDC_EXPORT_FLAGS_DEFAULT); 188 189 if (err_is_fail(err)) { 190 // XXX> error handling 191 return err; 192 } 193 194 BOMP_DEBUG_THREAD("creating thread on core %" PRIuCOREID "\n", core); 195 err = domain_thread_create_on(core, bomp_thread_msg_handler, 196 thread->msgbuf, NULL); 197 if (err_is_fail(err)) { 198 // XXX> error handling 199 return err; 200 } 201 202 while (thread->ctrl == NULL) { 203 err = event_dispatch(get_default_waitset()); 204 if (err_is_fail(err)) { 205 USER_PANIC_ERR(err, "event dispatch\n"); 206 } 207 } 208 209 BOMP_DEBUG_THREAD("thread on core %" PRIuCOREID " connected \n", core); 210 211 return thread->thread_err; 212} 213 214errval_t bomp_thread_exec(struct bomp_thread *thread, 215 bomp_thread_fn_t fn, void *arg, uint32_t tid) 216{ 217 debug_printf("bomp_thread_exec(%p, %p, %p, %u) %p\n", thread, fn, arg, tid, thread->icvt); 218 struct txq_msg_st *msg_st = txq_msg_st_alloc(&thread->txq); 219 if (msg_st == NULL) { 220 return LIB_ERR_MALLOC_FAIL; 221 } 222 223 uint32_t msg_sent = 0; 224 225 msg_st->send = execute__tx; 226 msg_st->cleanup = (txq_cleanup_fn_t)txq_msg_sent_cb; 227 228 struct bomp_msg_st *bomp_msg_st = (struct bomp_msg_st *)msg_st; 229 230 bomp_msg_st->args.exec.arg = (uint64_t)arg; 231 bomp_msg_st->args.exec.fn = (uint64_t)fn; 232 bomp_msg_st->args.exec.tid = tid; 233 bomp_msg_st->args.exec.icv = (uint64_t)thread->icvt; 234 bomp_msg_st->message_sent = &msg_sent; 235 236 txq_send(msg_st); 237 238 while(msg_sent == 0) { 239 event_dispatch(get_default_waitset()); 240 } 241 242 //return event_dispatch_non_block(get_default_waitset()); 243 return SYS_ERR_OK; 244} 245 246/* 247 * ============================================================================== 248 * Control Channel: Worker Thread Side 249 * ============================================================================== 250 */ 251 252/* 253 * ----------------------------------------------------------------------------- 254 * TX Handlers 255 * ----------------------------------------------------------------------------- 256 */ 257 258static errval_t done__tx(struct txq_msg_st *msg_st) 259{ 260 BOMP_DEBUG_THREAD("done__tx\n"); 261 262 return bomp_done__tx(msg_st->queue->binding, TXQCONT(msg_st),msg_st->err); 263} 264 265/* 266 * ----------------------------------------------------------------------------- 267 * RX Handlers 268 * ----------------------------------------------------------------------------- 269 */ 270 271static void execute__rx(struct bomp_binding *_binding, 272 uint64_t fn, uint64_t arg, uint32_t tid, uint64_t icv_task) 273{ 274 275 276 struct bomp_thread *t = _binding->st; 277 struct bomp_tls *tls = thread_get_tls(); 278 279 BOMP_DEBUG_THREAD("execute__rx: %p %p, %lx\n", t, tls, icv_task); 280 281 assert(t == &tls->r.thread); 282 283 struct omp_icv_task icvt; 284 memcpy(&icvt, (void *)icv_task, sizeof(struct omp_icv_task)); 285 286 bomp_icv_set_task(&icvt); 287 288 tls->thread_id = tid; 289 290 bomp_thread_fn_t func= (bomp_thread_fn_t)fn; 291 292 // calling the function 293 func((void *)arg); 294 295 bomp_icv_set_task(NULL); 296 tls->thread_id = -1; 297 298 struct txq_msg_st *msg_st = txq_msg_st_alloc(&t->txq); 299 if (msg_st == NULL) { 300 BOMP_ERROR("allocation of message state failed: %" PRIu32 "\n", tid); 301 return; 302 } 303 304 msg_st->send = done__tx; 305 msg_st->err = SYS_ERR_OK; 306 307 txq_send(msg_st); 308} 309 310/** 311 * \brief XOMP channel connect callback called by the Flounder backend 312 * 313 * \param st Supplied worker state 314 * \param err outcome of the connect attempt 315 * \param xb XOMP Flounder binding 316 */ 317static void bomp_thread_connect_cb(void *st, 318 errval_t err, 319 struct bomp_binding *b) 320{ 321 struct bomp_thread *t = st; 322 323 BOMP_DEBUG_THREAD("connected to node master.\n"); 324 325 t->ctrl = b; 326 327 txq_init(&t->txq, b, b->waitset, (txq_register_fn_t) b->register_send, 328 sizeof(struct bomp_msg_st)); 329 330 b->rx_vtbl.execute = execute__rx; 331} 332 333 334/** 335 * \brief 336 * 337 * \param arg 338 * 339 * \return 340 */ 341static int bomp_thread_msg_handler(void *arg) 342{ 343 344 345 errval_t err; 346 347 struct bomp_tls *tls = calloc(1, sizeof(struct bomp_tls)); 348 if (tls == NULL) { 349 BOMP_ERROR("Could not allocate memory for TLS. %p\n", arg); 350 return -1; 351 } 352 353 BOMP_DEBUG_THREAD("thread message handler started %p\n", tls); 354 355 tls->role = BOMP_THREAD_ROLE_WORKER; 356 tls->self = thread_self(); 357 tls->r.thread.coreid = disp_get_core_id(); 358 tls->r.thread.msgbuf = arg; 359 tls->r.thread.tls = tls; 360 361 struct waitset local_waitset; 362 //struct waitset *ws = get_default_waitset(); 363 struct waitset *ws = &local_waitset; 364 365 waitset_init(ws); 366 367 368 struct bomp_frameinfo fi = { 369 .sendbase = (lpaddr_t)arg, 370 .inbuf = ((uint8_t *) arg) + BOMP_CHANNEL_SIZE, 371 .inbufsize = BOMP_CHANNEL_SIZE, 372 .outbuf = ((uint8_t *) arg), 373 .outbufsize = BOMP_CHANNEL_SIZE 374 }; 375 376 377 378 err = bomp_connect(&fi, bomp_thread_connect_cb, &tls->r.thread, ws, 379 IDC_EXPORT_FLAGS_DEFAULT); 380 381 382 if (err_is_fail(err)) { 383 /* TODO: Clean up */ 384 return err_push(err, XOMP_ERR_WORKER_INIT_FAILED); 385 } 386 387 thread_set_tls(tls); 388 389 390 while(1) { 391 err = event_dispatch_non_block(ws); 392 switch(err_no(err)) { 393 case LIB_ERR_NO_EVENT : 394 thread_yield(); 395 continue; 396 break; 397 case SYS_ERR_OK: 398 continue; 399 break; 400 default: 401 USER_PANIC_ERR(err, "event dispatch"); 402 break; 403 } 404 } 405 406 BOMP_NOTICE("thread %" PRIuCOREID " terminated", disp_get_core_id()); 407 408 409 return 0; 410} 411