1/* 2 * Copyright (c) 2014 ETH Zurich. 3 * All rights reserved. 4 * 5 * This file is distributed under the terms in the attached LICENSE file. 6 * If you do not find this file, copies can be found by writing to: 7 * ETH Zurich D-INFK, Universitaetsstrasse 6, CH-8092 Zurich. Attn: Systems Group. 8 */ 9 10/** 11 * \brief this file contains the controll channel implementation between the 12 * node masters and the bomp worker threads 13 */ 14#include <barrelfish/barrelfish.h> 15#include <bomp_internal.h> 16#include <bitmap.h> 17 18/* forward declaration */ 19static int bomp_node_msg_handler(void *arg); 20 21///< stores the state of a message in transit 22struct bomp_msg_st 23{ 24 struct txq_msg_st common; ///< common msg state 25 26 /* union of arguments */ 27 union { 28 struct { 29 uint64_t fn; ///< the function 30 uint64_t arg; ///< the argument to the function 31 uint32_t tid; ///< thread ID 32 uint64_t icv; ///< thread's control variables 33 } exec; ///< execution 34 } args; 35}; 36 37/* 38 * ============================================================================== 39 * Control Channel: Program Master Side 40 * ============================================================================== 41 */ 42 43 44static errval_t bomp_node_init_threads(nodeid_t nodeid, 45 nodeid_t numanode, 46 coreid_t nthreads, 47 size_t stack_size, 48 struct bomp_node *node) 49{ 50 errval_t err; 51 52 BOMP_DEBUG_NODE("Initialize worker threads for node %" PRIuNODEID " with %" 53 PRIuCOREID " threads\n", nodeid, nthreads); 54 55 node->threads = calloc(nthreads, sizeof(struct bomp_thread)); 56 if (node->threads == NULL) { 57 return LIB_ERR_MALLOC_FAIL; 58 } 59 60#if 0 61 struct bitmap *bm = numa_allocate_cpumask(); 62 err = numa_node_to_cpus(numanode, bm); 63 assert(err_is_ok(err)); 64#else 65 struct bitmap *bm = numa_all_cpus_ptr; 66#endif 67 68 69 coreid_t core = (coreid_t)bitmap_get_first(bm); 70 71 for (coreid_t i = 0; i < nthreads; ++i) { 72 BOMP_DEBUG_NODE("spanning to core %u\n", core); 73 node->threads_max++; 74 node->threads[i].node = node; 75 if (core == disp_get_core_id()) { 76 /* master thread */ 77 core = (coreid_t)bitmap_get_next(bm, core); 78 continue; 79 } 80 81 err = bomp_thread_init(core, stack_size, &node->threads[i]); 82 if (err_is_fail(err)) { 83 DEBUG_ERR(err, "creating thread\n"); 84 return err; 85 } 86 87 core = (coreid_t)bitmap_get_next(bm, core); 88 } 89 node->threads_active = 0; 90 node->tls = thread_get_tls(); 91 92 return SYS_ERR_OK; 93} 94 95/** 96 * \brief callback for creating the dispatcher on the remote core 97 * 98 * \param arg argument for the callback 99 * \param err outcome of the spanning request 100 */ 101static void bomp_node_init_done(void *arg, errval_t err) 102{ 103 assert(err_is_ok(err)); 104 105 uint32_t *done = arg; 106 *done = 1; 107} 108 109 110/** 111 * \brief callback when the BOMP thread connects to the node 112 * 113 * \param st state pointer 114 * \param err status of the connect 115 * \param _b created BOMP binding 116 */ 117static void bomp_node_accept_cb(void *st, 118 errval_t err, 119 struct bomp_binding *_b) 120{ 121 struct bomp_node *n = st; 122 123 BOMP_DEBUG_NODE("connection accepted. tid=%" PRIuCOREID "\n", n->id); 124 125 n->node_err = err; 126 127 txq_init(&n->txq, _b, _b->waitset, (txq_register_fn_t) _b->register_send, 128 sizeof(struct bomp_msg_st)); 129 130 _b->st = st; 131 n->ctrl = _b; 132 133 // _b->rx_vtbl.done = done__rx; 134} 135 136/* a node that is on our local address space */ 137static errval_t bomp_node_init_local(nodeid_t nodeid, 138 nodeid_t numanode, 139 coreid_t nthreads, 140 size_t stack_size, 141 struct bomp_node *node) 142{ 143 BOMP_DEBUG_NODE("Initialize local node node %" PRIuNODEID " with %" 144 PRIuCOREID " threads\n", nodeid, nthreads); 145 146 errval_t err; 147 148 uint32_t done; 149 150 node->id = nodeid; 151 node->numa_node = numanode; 152 node->threads_max = nthreads; 153 node->stack_size = stack_size; 154 node->threads_max = nthreads; 155 156 struct bitmap *bm = numa_allocate_cpumask(); 157 err = numa_node_to_cpus(numanode, bm); 158 assert(err_is_ok(err)); 159 160 coreid_t core = (coreid_t)bitmap_get_first(bm); 161 162 err = domain_new_dispatcher(core, bomp_node_init_done, &done); 163 if (err_is_fail(err)) { 164 BOMP_ERROR("creating new dispatcher on core %" PRIuCOREID "failed\n", 165 core); 166 return err; 167 } 168 169 while(!done) { 170 thread_yield(); 171 } 172 173 BOMP_DEBUG_NODE("dispatcher ready. allocating memory for msg channel\n"); 174 175 size_t msg_frame_size; 176 err = frame_alloc(&node->msgframe, 2 * BOMP_CHANNEL_SIZE, &msg_frame_size); 177 if (err_is_fail(err)) { 178 return err; 179 } 180 181 err = vspace_map_one_frame(&node->msgbuf, msg_frame_size, node->msgframe, 182 NULL, NULL); 183 if (err_is_fail(err)) { 184 return err; 185 } 186 187 struct bomp_frameinfo fi = { 188 .sendbase = (lpaddr_t)node->msgbuf + BOMP_CHANNEL_SIZE, 189 .inbuf = node->msgbuf, 190 .inbufsize = BOMP_CHANNEL_SIZE, 191 .outbuf = ((uint8_t *) node->msgbuf) + BOMP_CHANNEL_SIZE, 192 .outbufsize = BOMP_CHANNEL_SIZE 193 }; 194 195 BOMP_DEBUG_NODE("creating channel on %p\n", node->msgbuf); 196 197 err = bomp_accept(&fi, node, bomp_node_accept_cb, 198 get_default_waitset(), IDC_EXPORT_FLAGS_DEFAULT); 199 200 if (err_is_fail(err)) { 201 // XXX> error handling 202 return err; 203 } 204 205 BOMP_DEBUG_NODE("creating thread on core %" PRIuCOREID "\n", core); 206 err = domain_thread_create_on(core, bomp_node_msg_handler, node, NULL); 207 if (err_is_fail(err)) { 208 // XXX> error handling 209 return err; 210 } 211 212 while (node->ctrl == NULL) { 213 err = event_dispatch(get_default_waitset()); 214 if (err_is_fail(err)) { 215 USER_PANIC_ERR(err, "event dispatch\n"); 216 } 217 } 218 219 BOMP_DEBUG_NODE("node master on node %" PRIuNODEID " connected \n", nodeid); 220 221 return node->node_err; 222} 223 224/* remote node: a node that is in a foreign address space */ 225static errval_t bomp_node_init_remote(nodeid_t nodeid, 226 coreid_t nthreads, 227 size_t stack_size, 228 struct bomp_node *node) 229{ 230 BOMP_DEBUG_NODE("Initialize remote node node %" PRIuNODEID " with %" 231 PRIuCOREID " threads\n", nodeid, nthreads); 232 233 assert(!"NYI"); 234 return SYS_ERR_OK; 235} 236 237 238/** 239 * \brief 240 */ 241errval_t bomp_node_init(bomp_node_type_t type, 242 nodeid_t numanode, 243 nodeid_t nodeid, 244 coreid_t nthreads, 245 size_t stack_size, 246 struct bomp_node *node) 247{ 248 node->type = type; 249 250 switch(type) { 251 case BOMP_NODE_MASTER : 252 return bomp_node_init_threads(nodeid, numanode, nthreads, stack_size, node); 253 break; 254 case BOMP_NODE_LOCAL: 255 return bomp_node_init_local(nodeid, numanode, nthreads, stack_size, node); 256 break; 257 case BOMP_NODE_REMOTE : 258 return bomp_node_init_remote(nodeid, nthreads, stack_size, node); 259 break; 260 default: 261 return -1; 262 break; 263 } 264} 265 266 267 268coreid_t bomp_node_exec(struct bomp_node *node, void *fn, void *arg, coreid_t tid_start, coreid_t nthreads) 269{ 270 debug_printf("Executing on node %u\n", node->id); 271 assert(!"NYI"); 272 return node->threads_max; 273 274 return 0; 275} 276 277#if 0 278 279 280/* 281 * ============================================================================== 282 * Control Channel: Node Master Side 283 * ============================================================================== 284 */ 285 286/** 287 * \brief initializes the shared memory channel Node Master - Worker Threads 288 * (Worker Side) 289 * 290 * \param channel address of the message buffers to use 291 */ 292errval_t bomp_noded_channel_bind(void *channel) 293{ 294 assert(!"NYI"); 295 296 return SYS_ERR_OK; 297} 298 299#endif 300 301/** 302 * \brief XOMP channel connect callback called by the Flounder backend 303 * 304 * \param st Supplied worker state 305 * \param err outcome of the connect attempt 306 * \param xb XOMP Flounder binding 307 */ 308static void bomp_node_connect_cb(void *st, 309 errval_t err, 310 struct bomp_binding *b) 311{ 312 struct bomp_thread *t = st; 313 314 BOMP_DEBUG_THREAD("connected to node master.\n"); 315 316 t->ctrl = b; 317 318 txq_init(&t->txq, b, b->waitset, (txq_register_fn_t) b->register_send, 319 sizeof(struct bomp_msg_st)); 320 321 //b->rx_vtbl.execute = execute__rx; 322} 323 324/** 325 * \brief 326 * 327 * \param arg 328 * 329 * \return 330 */ 331static int bomp_node_msg_handler(void *arg) 332{ 333 BOMP_DEBUG_NODE("node master message handler started\n"); 334 335 errval_t err; 336 337 struct bomp_tls *tls = calloc(1, sizeof(struct bomp_tls)); 338 if (tls == NULL) { 339 BOMP_ERROR("Could not allocate memory for TLS. %p\n", arg); 340 return -1; 341 } 342 343 struct bomp_node *node = arg; 344 345 assert(numa_current_node() == node->numa_node); 346 347 tls->role = BOMP_THREAD_ROLE_NODE; 348 tls->self = thread_self(); 349 tls->r.node.id = node->id; 350 tls->r.node.msgbuf = node->msgbuf; 351 tls->r.node.tls = tls; 352 tls->r.node.stack_size = node->stack_size; 353 354 struct bomp_frameinfo fi = { 355 .sendbase = (lpaddr_t)arg, 356 .inbuf = ((uint8_t *) arg) + BOMP_CHANNEL_SIZE, 357 .inbufsize = BOMP_CHANNEL_SIZE, 358 .outbuf = ((uint8_t *) arg), 359 .outbufsize = BOMP_CHANNEL_SIZE 360 }; 361 362 struct waitset *ws = get_default_waitset(); 363 364 BOMP_DEBUG_NODE("initializing local worker threads\n"); 365 err = bomp_node_init_threads(node->id, node->numa_node, node->threads_max, 366 node->stack_size, &tls->r.node); 367 if (err_is_fail(err)) { 368 DEBUG_ERR(err, "init threads\n"); 369 } 370 371 assert(node->threads_max == tls->r.node.threads_max); 372 373 BOMP_DEBUG_NODE("connecting to program master\n"); 374 375 err = bomp_connect(&fi, bomp_node_connect_cb, &tls->r.thread, ws, 376 IDC_EXPORT_FLAGS_DEFAULT); 377 378 379 if (err_is_fail(err)) { 380 /* TODO: Clean up */ 381 return err_push(err, XOMP_ERR_WORKER_INIT_FAILED); 382 } 383 384 thread_set_tls(tls); 385 386 while(1) { 387 err = event_dispatch_non_block(ws); 388 switch(err_no(err)) { 389 case LIB_ERR_NO_EVENT : 390 thread_yield(); 391 continue; 392 break; 393 case SYS_ERR_OK: 394 continue; 395 break; 396 default: 397 USER_PANIC_ERR(err, "event dispatch"); 398 break; 399 } 400 } 401 402 BOMP_NOTICE("node master %" PRIuNODEID " terminated", tls->r.node.id); 403 404 405 return 0; 406} 407