/*
 * Copyright (c) 2010, 2011, 2012, ETH Zurich.
 * All rights reserved.
 *
 * This file is distributed under the terms in the attached LICENSE file.
 * If you do not find this file, copies can be found by writing to:
 * ETH Zurich D-INFK, CAB F.78, Universitaetstrasse 6, CH-8092 Zurich,
 * Attn: Systems Group.
 */

#include <barrelfish/barrelfish.h>
#include <barrelfish/spawn_client.h>

#include <flounder/flounder_txqueue.h>
#include <spawndomain/spawndomain.h>

#include <xeon_phi/xeon_phi.h>
#include <xeon_phi/xeon_phi_client.h>
#include <xeon_phi/xeon_phi_domain.h>

#include <bomp_internal.h>
#include <xomp/xomp.h>

#include <if/xomp_defs.h>

#include <xomp_debug.h>

/// enables the virtual threads
#define XOMP_VTHREADS (XOMP_VTHREAD_COUNT + 1)

/**
 * \brief worker state enumeration.
 *
 * Describes the possible states a worker can be in.
 */
typedef enum xomp_worker_state
{
    XOMP_WORKER_ST_INVALID  = 0,  ///< this worker has not been initialized
    XOMP_WORKER_ST_FAILURE  = 1,  ///< an error occurred during an operation
    XOMP_WORKER_ST_SPAWNING = 2,  ///< worker is being spawned
    XOMP_WORKER_ST_SPAWNED  = 3,  ///< worker is spawned and connected to master
    XOMP_WORKER_ST_READY    = 4,  ///< worker is ready to service requests
    XOMP_WORKER_ST_BUSY     = 5   ///< worker is busy servicing requests
} xomp_worker_st_t;

/**
 * \brief worker type enumeration
 *
 * Describes the possible worker types i.e. where the domain is running on
 */
typedef enum xomp_worker_type
{
    XOMP_WORKER_TYPE_INVALID = 0,  ///< invalid worker type (not initialized)
    XOMP_WORKER_TYPE_LOCAL   = 1,  ///< worker runs local to master
    XOMP_WORKER_TYPE_REMOTE  = 2   ///< worker runs remote to master
} xomp_worker_type_t;

/**
 * \brief XOMP worker
 *
 * Per-worker bookkeeping held by the master: identity, connection state,
 * the Flounder binding/TX queue used to talk to the worker, and the shared
 * messaging frame (which also carries the worker's TLS region).
 */
struct xomp_worker
{
    xomp_wid_t id;                ///< worker ID
    xomp_worker_type_t type;      ///< worker type
    xomp_worker_st_t state;       ///< worker state
    struct capref domain;         ///< domain cap of the worker
    xphi_dom_id_t domainid;       ///< domain ID of the worker

    struct xomp_binding *binding; ///< Control channel binding
    struct tx_queue txq;          ///< Flounder TX queue

    errval_t err;                 ///< error number in case an error occurred
    uint8_t add_mem_st;           ///< state flag when we adding a frame
                                  ///  (0x0 idle, 0x1 request sent, 0x2 reply seen)

    struct capref msgframe;       ///< messaging frame + tls for the worker
    lpaddr_t msgbase;             ///< physical base of the messaging frame
    void *msgbuf;                 ///< where the messaging frame is mapped

    void *tls;                    ///< pointer to the thread local storage

#if XOMP_BENCH_ENABLED
    cycles_t start;               ///< start time of the operation
    uint32_t index;               ///< worker index into the per-worker bench counters
#endif
};

/**
 * \brief XOMP master
 *
 * Holds the two worker pools (local = host cores, remote = Xeon Phi cores).
 */
struct xomp_master
{
    uint32_t numworker;               ///< total number of worker spawned
    struct {
        uint32_t num;                 ///< number of local workers
        struct xomp_worker *workers;  ///< array of local workers
        uint32_t next;                ///< next local worker to "allocate"
    } local;
    struct {
        uint32_t num;                 ///< number of remote workers
        struct xomp_worker *workers;  ///< array of remote workers
        uint32_t next;                ///< next remote worker to "allocate"
    } remote;
};

/**
 * \brief Message state for the TX queue
 *
 * Embeds the common txq message state plus the arguments of the pending
 * send, so the send handler can retry with identical parameters.
 */
struct xomp_msg_st
{
    struct txq_msg_st common;   ///< common msg state
    /* union of arguments */
    union {
        struct {
            uint64_t fn;
            uint64_t arg;
            uint64_t id;
            uint64_t flags;
        } do_work;
        struct {
            struct capref frame;
            uint64_t vaddr;
            uint8_t type;
        } add_mem;
    } args;
};

/// initialized flag
static uint8_t xomp_master_initialized = 0x0;

/// XOMP master
static struct xomp_master xmaster;

/// exported service iref (for local workers)
static iref_t svc_iref;

/// number of present Xeon Phis
static uint8_t num_phi = 0;

/// where workers are placed (local / remote / mixed); default is mixed
static xomp_wloc_t worker_loc = XOMP_WORKER_LOC_MIXED;

/// stride for core allocation (in case of hyperthreads)
static coreid_t core_stride;

/// arguments to supply to the local spawned workers
static struct xomp_spawn spawn_args_local;

/// arguments to supply to the remote spawned workers
static struct xomp_spawn spawn_args_remote;

/// buffer for the worker id argument ("--wid=%016"PRIx64 -> 23 bytes incl. NUL)
static char worker_id_buf[26];

/// buffer for the iref argument ("--iref=0x%08x" -> 18 bytes incl. NUL)
static char iref_buf[19];

#if XOMP_BENCH_ENABLED

#include <bench/bench.h>

static bench_ctl_t **xomp_bench_mem_add;
static bench_ctl_t **xomp_bench_do_work;
static bench_ctl_t **xomp_bench_spawn;

#endif

#if XOMP_BENCH_MASTER_EN
static cycles_t local_timer;
static cycles_t remote_timer;
#endif

/**
 * \brief enters the barrier when a worker finished his work, this function
 *        is called on the main thread (master domain)
 *
 * \param barrier The barrier to enter
 */
static inline void xbomp_barrier_enter_no_wait(struct bomp_barrier *barrier)
{
    /* last thread to arrive resets the counter and flips the cycle */
    if (__sync_fetch_and_add(&barrier->counter, 1) == (barrier->max - 1)) {
        barrier->counter = 0;
        barrier->cycle = !barrier->cycle;
    }
}

/// upper bound on local (host) workers in mixed mode; 0 disables the cap
#define XOMP_LOCAL_THREADS_MAX 10

/*
 * ----------------------------------------------------------------------------
 * Helper functions
 * ----------------------------------------------------------------------------
 */
static inline uint32_t
xomp_master_get_local_threads(uint32_t nthreads)
{
    /* number of workers to spawn on the host; the master thread itself
     * accounts for the "- 1" in each branch */
    switch (worker_loc) {
        case XOMP_WORKER_LOC_LOCAL:
            return nthreads - 1;
        case XOMP_WORKER_LOC_MIXED:
#if XOMP_LOCAL_THREADS_MAX
            if (nthreads > XOMP_LOCAL_THREADS_MAX) {
                return XOMP_LOCAL_THREADS_MAX - 1;
            } else {
                return nthreads - (num_phi * ((nthreads) / (1 + num_phi))) - 1;
            }
#else
            return nthreads - (num_phi * ((nthreads) / (1 + num_phi))) - 1;
#endif
        case XOMP_WORKER_LOC_REMOTE:
            return 0;
        default:
            USER_PANIC("unknown worker location!");
    }
    USER_PANIC("unknown worker location!");
    return 0;
}

static inline uint32_t xomp_master_get_remote_threads(uint32_t nthreads)
{
    /* number of workers to spawn on the Xeon Phi(s); complements
     * xomp_master_get_local_threads() so local + remote + 1 == nthreads */
    switch (worker_loc) {
        case XOMP_WORKER_LOC_LOCAL:
            return 0;
        case XOMP_WORKER_LOC_MIXED:
#if XOMP_LOCAL_THREADS_MAX
            if (nthreads > XOMP_LOCAL_THREADS_MAX) {
                return nthreads - XOMP_LOCAL_THREADS_MAX;
            } else {
                return ((nthreads) / (1 + num_phi)) * num_phi;
            }
#else
            return ((nthreads) / (1 + num_phi)) * num_phi;
#endif
        case XOMP_WORKER_LOC_REMOTE:
            return nthreads - 1;
        default:
            USER_PANIC("unknown worker location!");
    }

    return 0;
}

/*
 * ----------------------------------------------------------------------------
 * XOMP channel send handlers
 * ----------------------------------------------------------------------------
 */

/// txq send handler: transmits a queued do_work request to a worker
static errval_t do_work_tx(struct txq_msg_st *msg_st)
{

    struct xomp_msg_st *st = (struct xomp_msg_st *) msg_st;

    return xomp_do_work__tx(msg_st->queue->binding, TXQCONT(msg_st),
                            st->args.do_work.fn, st->args.do_work.arg,
                            st->args.do_work.id, st->args.do_work.flags);
}

/// txq send handler: asks a remote worker to obtain memory via its gateway
static errval_t gw_req_memory_call_tx(struct txq_msg_st *msg_st)
{

    struct xomp_msg_st *st = (struct xomp_msg_st *) msg_st;

    return xomp_gw_req_memory_call__tx(msg_st->queue->binding, TXQCONT(msg_st),
                                       st->args.add_mem.vaddr,
                                       st->args.add_mem.type);
}

/// txq send handler: sends the frame cap directly to a local worker
static errval_t add_memory_call_tx(struct txq_msg_st *msg_st)
{

    struct xomp_msg_st *st = (struct xomp_msg_st *) msg_st;

    return xomp_add_memory_call__tx(msg_st->queue->binding, TXQCONT(msg_st),
                                    st->args.add_mem.frame,
                                    st->args.add_mem.vaddr, st->args.add_mem.type);
}

/*
 * ----------------------------------------------------------------------------
 * XOMP channel receive handlers
 * ----------------------------------------------------------------------------
 */

/// reply to gw_req_memory_call: records the outcome and unblocks the waiter
static void gw_req_memory_response_rx(struct xomp_binding *b,
                                      errval_t msgerr)
{
    XMP_DEBUG("gw_req_memory_response_rx: %s\n", err_getstring(msgerr));

    struct xomp_worker *worker = b->st;

    worker->err = msgerr;
    worker->add_mem_st = 0x2;  /* reply received; see xomp_master_add_memory */

#if XOMP_BENCH_ENABLED
    cycles_t timer = bench_tsc();
    if (xomp_bench_mem_add) {
        timer = bench_time_diff(worker->start, timer);
        bench_ctl_add_run(xomp_bench_mem_add[1], &timer);
        bench_ctl_add_run(xomp_bench_mem_add[2+worker->index], &timer);
    }
#endif

#if XOMP_BENCH_MASTER_EN
    cycles_t duration = bench_tsc() - worker->start;
    remote_timer += duration;
    debug_printf("remote worker %016lx: add memory took %lu cycles, %lu ms\n",
                 worker->id, duration, bench_tsc_to_ms(duration));
#endif
}

/// reply to add_memory_call: records the outcome and unblocks the waiter
static void add_memory_response_rx(struct xomp_binding *b,
                                   errval_t msgerr)
{
    XMP_DEBUG("add_memory_response_rx: %s\n", err_getstring(msgerr));

    struct xomp_worker *worker = b->st;

    worker->err = msgerr;
    worker->add_mem_st = 0x2;  /* reply received; see xomp_master_add_memory */

#if XOMP_BENCH_ENABLED
    cycles_t timer = bench_tsc();
    if (xomp_bench_mem_add) {
        timer = bench_time_diff(worker->start, timer);
        bench_ctl_add_run(xomp_bench_mem_add[0], &timer);
        bench_ctl_add_run(xomp_bench_mem_add[2+worker->index], &timer);
    }
#endif

#if XOMP_BENCH_MASTER_EN
    cycles_t duration = bench_tsc() - worker->start;
    local_timer += duration;
    debug_printf("local worker %016lx: add memory took %lu cycles, %lu ms\n",
                 worker->id, duration, bench_tsc_to_ms(duration));
#endif
}

/**
 * \brief common handling for a worker's "done" message
 *
 * \param b       binding of the worker that finished
 * \param tid     task identifier; this is the pointer to the xomp_task
 * \param msgerr  outcome reported by the worker
 *
 * Marks the worker ready (or failed), enters the task barrier on its
 * behalf, and frees the task once all its workers have reported back.
 */
static inline void done_msg_common(struct xomp_binding *b,
                                   uint64_t tid,
                                   errval_t msgerr)
{
    /* the task id is the task pointer itself */
    struct xomp_task *task = (struct xomp_task *) tid;

    struct xomp_worker *worker = b->st;
    if (err_is_fail(msgerr)) {
        worker->state = XOMP_WORKER_ST_FAILURE;
    } else {
        worker->state = XOMP_WORKER_ST_READY;
    }

#if XOMP_BENCH_ENABLED
    cycles_t timer = bench_tsc();
    if (xomp_bench_do_work) {
        timer = bench_time_diff(worker->start, timer);
        if (worker->type == XOMP_WORKER_TYPE_LOCAL) {
            bench_ctl_add_run(xomp_bench_do_work[0], &timer);
        } else if (worker->type == XOMP_WORKER_TYPE_REMOTE){
            bench_ctl_add_run(xomp_bench_do_work[1], &timer);
        }
        bench_ctl_add_run(xomp_bench_do_work[2 + worker->index], &timer);
    }
#endif

#if XOMP_BENCH_MASTER_EN
    cycles_t duration = bench_tsc()- worker->start;
    debug_printf("generic worker %u, %lu cycles, %lu ms\n",
                 (uint16_t)worker->id, duration, bench_tsc_to_ms(duration));
#endif

    xbomp_barrier_enter_no_wait(task->barrier);

    /* if the last worker returns, free up the task data structure */
    task->done++;
    if (task->done == task->total_threads) {
        free(task);
    }
}

/// worker finished and returned an argument value
static void done_with_arg_rx(struct xomp_binding *b,
                             uint64_t tid,
                             uint64_t arg,
                             errval_t msgerr)
{
    XMP_DEBUG("done_with_arg_rx: arg:%lx, id:%lx\n", arg, tid);

    done_msg_common(b, tid, msgerr);

    /* XXX: do something with the argument */
}

/// worker finished without a return argument
static void done_notify_rx(struct xomp_binding *b,
                           uint64_t tid,
                           errval_t msgerr)
{
    XMP_DEBUG("done_notify_rx: id:%lx\n", tid);

    done_msg_common(b, tid, msgerr);
}

/// receive vtable installed on every worker binding
static struct xomp_rx_vtbl rx_vtbl = {
    .gw_req_memory_response =
                             gw_req_memory_response_rx,
    .add_memory_response = add_memory_response_rx,
    .done_notify = done_notify_rx,
    .done_with_arg = done_with_arg_rx
};

/*
 * ----------------------------------------------------------------------------
 * XOMP channel connect handler
 * ----------------------------------------------------------------------------
 */

/**
 * \brief connect callback for LOCAL workers binding to the exported service
 *
 * \param st unused connect state
 * \param xb the new XOMP binding
 *
 * Hands out the next local worker slot in spawn order.
 * NOTE(review): assumes workers connect in the order they were spawned and
 * that no more than local.num connections arrive — no bounds check on next.
 */
static errval_t xomp_svc_connect_cb(void *st,
                                    struct xomp_binding *xb)
{
    struct xomp_worker *worker = xmaster.local.workers + xmaster.local.next++;

    XMI_DEBUG("xomp_svc_connect_cb:%lx connected: %p\n", worker->id, worker);

    xb->rx_vtbl = rx_vtbl;
    xb->st = worker;

    txq_init(&worker->txq, xb, xb->waitset, (txq_register_fn_t) xb->register_send,
             sizeof(struct xomp_msg_st));

    worker->binding = xb;
    worker->state = XOMP_WORKER_ST_SPAWNED;

    return SYS_ERR_OK;
}

/// export callback: records the service iref that local workers connect to
static void xomp_svc_export_cb(void *st,
                               errval_t err,
                               iref_t iref)
{
    XMI_DEBUG("Service exported @ iref:%"PRIuIREF", %s\n", iref, err_getstring(err));

    svc_iref = iref;
}

/**
 * \brief XOMP channel connect callback called by the Flounder backend
 *
 * \param st  Supplied worker state
 * \param err outcome of the connect attempt
 * \param xb  XOMP Flounder binding
 */
static void worker_connect_cb(void *st,
                              errval_t err,
                              struct xomp_binding *xb)
{

    struct xomp_worker *worker = st;

    XMI_DEBUG("worker:%lx connected: %s\n", worker->id, err_getstring(err));

    if (err_is_fail(err)) {
        worker->state = XOMP_WORKER_ST_FAILURE;
        return;
    }

    xb->rx_vtbl = rx_vtbl;
    xb->st = worker;

    txq_init(&worker->txq, xb, xb->waitset, (txq_register_fn_t) xb->register_send,
             sizeof(struct xomp_msg_st));

    worker->binding = xb;
    worker->state = XOMP_WORKER_ST_SPAWNED;
}

/*
 * ============================================================================
 * Public Interface
 * ============================================================================
 */

/**
 * \brief initializes the Xeon Phi openMP library
 *
 * \param args struct containing the master initialization values
 *
 * \returns SYS_ERR_OK on success
 *          errval on failure
 */

errval_t xomp_master_init(struct xomp_args *args)
{
    errval_t err;

    if (xomp_master_initialized) {
        XMI_DEBUG("WARNIG: XOMP master already initialized\n");
        return SYS_ERR_OK;
    }

    /* this entry point is for the master only */
    if (args->type == XOMP_ARG_TYPE_WORKER) {
        return -1;  // TODO: ERRNO
    }

#if XOMP_BENCH_MASTER_EN
    bench_init();
#endif

    if (args->core_stride != 0) {
        core_stride = args->core_stride;
    } else {
        core_stride = BOMP_DEFAULT_CORE_STRIDE;
    }

    if (args->type == XOMP_ARG_TYPE_UNIFORM) {
        num_phi = args->args.uniform.nphi;
        worker_loc = args->args.uniform.worker_loc;
    } else {
        num_phi = args->args.distinct.nphi;

        worker_loc = args->args.distinct.worker_loc;
    }

    /* NOTE(review): reads the uniform union member even for the distinct
     * case — debug output only, but the values may be meaningless then */
    XMI_DEBUG("Initializing XOMP master with nthreads:%u, nphi:%u\n",
              args->args.uniform.nthreads, args->args.uniform.nphi);

    /* exporting the interface for local workers */
    err = xomp_export(NULL, xomp_svc_export_cb, xomp_svc_connect_cb,
                      get_default_waitset(), IDC_EXPORT_FLAGS_DEFAULT);
    if (err_is_fail(err)) {
        return err;
    }

    /* spin until the export callback has delivered the iref */
    while (svc_iref == 0) {
        err = event_dispatch(get_default_waitset());
        if (err_is_fail(err)) {
            USER_PANIC_ERR(err, "event dispatch\n");
        }
    }

    char **argv = NULL;

    if (args->type == XOMP_ARG_TYPE_UNIFORM) {

        spawn_args_local.argc = args->args.uniform.argc;
        spawn_args_remote.argc = args->args.uniform.argc;

        /* uniform mode: derive both binary paths from our own name */
        err = xomp_master_build_path(&spawn_args_local.path, &spawn_args_remote.path);
        if (err_is_fail(err)) {
            USER_PANIC_ERR(err, "could not build the path");
        }
        argv = args->args.uniform.argv;
    } else {
spawn_args_local.argc = args->args.distinct.local.argc; 549 spawn_args_local.path = args->args.distinct.local.path; 550 spawn_args_remote.path = args->args.distinct.remote.path; 551 argv = args->args.distinct.local.argv; 552 } 553 554 spawn_args_local.argv = calloc(spawn_args_local.argc + 3, sizeof(char *)); 555 556 if (spawn_args_local.argv == NULL) { 557 return LIB_ERR_MALLOC_FAIL; 558 } 559 560 for (uint8_t i = 0; i < spawn_args_local.argc; ++i) { 561 spawn_args_local.argv[i] = argv[i]; 562 } 563 564 spawn_args_local.argv[spawn_args_local.argc++] = XOMP_WORKER_ARG; 565 spawn_args_local.argv[spawn_args_local.argc++] = worker_id_buf; 566 spawn_args_local.argv[spawn_args_local.argc++] = iref_buf; 567 spawn_args_local.argv[spawn_args_local.argc] = NULL; 568 569 snprintf(iref_buf, sizeof(iref_buf), "--iref=0x%08x", svc_iref); 570 571 /* remote initialization */ 572 573 if (args->type == XOMP_ARG_TYPE_DISTINCT) { 574 argv = args->args.distinct.remote.argv; 575 spawn_args_remote.argc = args->args.distinct.remote.argc; 576 } 577 578 spawn_args_remote.argv = calloc(spawn_args_remote.argc + 2, sizeof(char *)); 579 580 if (spawn_args_remote.argv == NULL) { 581 free(spawn_args_local.argv); 582 return LIB_ERR_MALLOC_FAIL; 583 } 584 585 for (uint8_t i = 0; i < spawn_args_remote.argc; ++i) { 586 spawn_args_remote.argv[i] = argv[i]; 587 } 588 589 spawn_args_remote.argv[spawn_args_remote.argc++] = XOMP_WORKER_ARG; 590 spawn_args_remote.argv[spawn_args_remote.argc++] = worker_id_buf; 591 spawn_args_remote.argv[spawn_args_remote.argc] = NULL; 592 593 xomp_master_initialized = 0x1; 594 595 return SYS_ERR_OK; 596} 597 598/** 599 * \brief Spawns the worker threads on the Xeon Phi 600 * 601 * \param nworkers Number of total workers this includes the Master 602 * 603 * \returns SYS_ERR_OK on success 604 * errval on failure 605 */ 606errval_t xomp_master_spawn_workers(uint32_t nworkers) 607{ 608 errval_t err; 609 610 if (!xomp_master_initialized) { 611 return XOMP_ERR_MASTER_NOT_INIT; 
612 } 613 614 xmaster.numworker = nworkers; 615 616 struct xomp_worker *workers = calloc(nworkers, sizeof(struct xomp_worker)); 617 618 if (workers == NULL) { 619 return LIB_ERR_MALLOC_FAIL; 620 } 621 622 uint32_t remote_threads = xomp_master_get_remote_threads(nworkers); 623 uint32_t local_threads = xomp_master_get_local_threads(nworkers); 624 625 xmaster.local.next = 0; 626 xmaster.remote.next = 0; 627 xmaster.local.num = local_threads; 628 xmaster.remote.num = remote_threads; 629 xmaster.local.workers = workers; 630 631 if (remote_threads > 0) { 632 err = spawn_symval_cache_init(0); 633 if (err_is_fail(err)) { 634 USER_PANIC_ERR(err, "domain no spawned with appropriate flags\n"); 635 return err; 636 } 637 } 638 639 if (num_phi > 0) { 640 xmaster.remote.workers = workers + local_threads; 641 } 642 643 XMI_DEBUG("spawning %u workers: local:%u, remote: %ux%u\n", nworkers - 1, 644 local_threads, num_phi, 645 (num_phi != 0 ? remote_threads / num_phi : remote_threads)); 646 647 assert((remote_threads + local_threads + 1) == nworkers); 648 649 xphi_id_t xid = 0; 650 coreid_t core = disp_get_core_id() + core_stride; 651 652#if XOMP_BENCH_MASTER_EN 653 cycles_t spawn_timer; 654 cycles_t remote_spawn_timer = 0; 655 cycles_t remote_connect_timer = 0; 656 cycles_t local_spawn_timer = 0; 657 cycles_t local_connect_timer = 0; 658#endif 659 660 for (uint32_t i = 0; i < remote_threads + local_threads; ++i) { 661#ifdef __k1om__ 662 if (xid == disp_xeon_phi_id()) { 663 xid = (xid + 1) % num_phi; 664 } 665#endif 666 if (i == local_threads) { 667 core = XOMP_REMOTE_COREID_START; 668 } 669 670 struct xomp_worker *worker = workers + i; 671 672#if XOMP_BENCH_ENABLED 673 worker->index = i; 674 worker->start = bench_tsc(); 675#endif 676 677#ifndef __k1om__ 678 /* 679 * XXX: we have to set the ram affinity in order to have a higher chance 680 * the node gets found at the Xeon Phi. 
It may be split up otherwise 681 */ 682 uint64_t min_base, max_limit; 683 ram_get_affinity(&min_base, &max_limit); 684 ram_set_affinity(XOMP_RAM_MIN_BASE, XOMP_RAM_MAX_LIMIT); 685#endif 686 687 if (i < local_threads) { 688 err = frame_alloc(&worker->msgframe, XOMP_TLS_SIZE, NULL); 689 } else { 690 err = frame_alloc(&worker->msgframe, XOMP_FRAME_SIZE, NULL); 691 } 692 693#ifndef __k1om__ 694 ram_set_affinity(min_base, max_limit); 695#endif 696 697 if (err_is_fail(err)) { 698 /* TODO: cleanup */ 699 worker->state = XOMP_WORKER_ST_FAILURE; 700 return err_push(err, XOMP_ERR_SPAWN_WORKER_FAILED); 701 } 702 703 struct frame_identity id; 704 err = frame_identify(worker->msgframe, &id); 705 if (err_is_fail(err)) { 706 /* TODO: cleanup */ 707 return err_push(err, XOMP_ERR_SPAWN_WORKER_FAILED); 708 } 709 710 /* TODO: build a good id */ 711 worker->id = ((uint64_t) disp_get_domain_id()) << 48 | ((uint64_t)core) << 32; 712 if (i < local_threads) { 713 worker->id |= ((uint64_t)0xFF) << 24; 714 } else { 715 worker->id |= ((uint64_t)xid) << 24; 716 } 717 worker->id |= i+1; 718 719 worker->msgbase = id.base; 720 worker->state = XOMP_WORKER_ST_SPAWNING; 721 722 err = vspace_map_one_frame(&worker->msgbuf, id.bytes, 723 worker->msgframe, NULL, NULL); 724 725 if (err_is_fail(err)) { 726 /* TODO: cleanup */ 727 return err_push(err, XOMP_ERR_SPAWN_WORKER_FAILED); 728 } 729 730 XMI_DEBUG("messaging frame mapped: [%016lx] @ [%016lx]\n", 731 worker->msgbase, (lvaddr_t )worker->msgbuf); 732 733 if (i < local_threads) { 734 snprintf(worker_id_buf, sizeof(worker_id_buf), "--wid=%016"PRIx64, 735 worker->id); 736 /* 737 * TODO: set a gateway domain for each NUMA node as it is done with 738 * the Xeon Phi 739 */ 740 worker->tls = worker->msgbuf; 741 742 XMI_DEBUG("spawning {%s} on host, core:%u\n", spawn_args_local.path, 743 core); 744#if XOMP_BENCH_MASTER_EN 745 spawn_timer = bench_tsc(); 746#endif 747 748 struct capref did; 749 err = spawn_program_with_caps(core, spawn_args_local.path, 750 
spawn_args_local.argv, NULL, NULL_CAP, 751 worker->msgframe, SPAWN_FLAGS_OMP, 752 &did); 753#if XOMP_BENCH_MASTER_EN 754 local_spawn_timer += bench_tsc() - spawn_timer; 755 spawn_timer = bench_tsc(); 756#endif 757 worker->domain = did; 758 worker->type = XOMP_WORKER_TYPE_LOCAL; 759 if (err_is_fail(err)) { 760 /* TODO: cleanup */ 761 return err_push(err, XOMP_ERR_SPAWN_WORKER_FAILED); 762 } 763 764 core += core_stride; 765 } else { 766 /* 767 * we give give the first worker domains the gateway flag so it 768 * initializes the gateway service while others will connect to it 769 */ 770 if (core == XOMP_REMOTE_COREID_START) { 771 worker->id |= XOMP_WID_GATEWAY_FLAG; 772 } 773 774 snprintf(worker_id_buf, sizeof(worker_id_buf), "--wid=%016"PRIx64, 775 worker->id); 776 777 worker->tls = ((uint8_t *) worker->msgbuf) + XOMP_MSG_FRAME_SIZE; 778 779 struct xomp_frameinfo fi = { 780 .sendbase = worker->msgbase + XOMP_MSG_CHAN_SIZE, 781 .inbuf = worker->msgbuf, 782 .inbufsize = XOMP_MSG_CHAN_SIZE, 783 .outbuf = ((uint8_t *) worker->msgbuf) + XOMP_MSG_CHAN_SIZE, 784 .outbufsize = XOMP_MSG_CHAN_SIZE 785 }; 786 787 err = xomp_accept(&fi, worker, worker_connect_cb, 788 get_default_waitset(), IDC_EXPORT_FLAGS_DEFAULT); 789 790 if (err_is_fail(err)) { 791 /* TODO: Clean up */ 792 return err_push(err, XOMP_ERR_SPAWN_WORKER_FAILED); 793 } 794 795 XMI_DEBUG("spawning {%s} on xid:%u, core:%u\n", 796 spawn_args_remote.path, xid, core); 797#if XOMP_BENCH_MASTER_EN 798 spawn_timer = bench_tsc(); 799#endif 800 err = xeon_phi_client_spawn(xid, core, spawn_args_remote.path, 801 spawn_args_remote.argv, worker->msgframe, 802 SPAWN_FLAGS_OMP, &worker->domainid); 803 804#if XOMP_BENCH_MASTER_EN 805 remote_spawn_timer += bench_tsc() - spawn_timer; 806 spawn_timer = bench_tsc(); 807#endif 808 809 if (err_is_fail(err)) { 810 /* TODO: cleanup */ 811 return err_push(err, XOMP_ERR_SPAWN_WORKER_FAILED); 812 } 813 worker->type = XOMP_WORKER_TYPE_REMOTE; 814 xid++; 815 } 816 817 XMI_DEBUG("waiting for 
client %p to connect...\n", worker); 818 819 while (worker->state == XOMP_WORKER_ST_SPAWNING) { 820 err = event_dispatch(get_default_waitset()); 821 if (err_is_fail(err)) { 822 USER_PANIC_ERR(err, "event dispatch\n"); 823 } 824 } 825#if XOMP_BENCH_MASTER_EN 826 if (worker->type == XOMP_WORKER_TYPE_REMOTE) { 827 remote_connect_timer += bench_tsc() - spawn_timer; 828 } else { 829 local_connect_timer += bench_tsc() - spawn_timer; 830 } 831#endif 832 833 if (worker->state == XOMP_WORKER_ST_FAILURE) { 834 return XOMP_ERR_SPAWN_WORKER_FAILED; 835 } 836 837#if XOMP_BENCH_ENABLED 838 cycles_t timer = bench_tsc(); 839 if (xomp_bench_spawn) { 840 timer = bench_time_diff(worker->start, timer); 841 if (i < local_threads) { 842 bench_ctl_add_run(xomp_bench_spawn[0], &timer); 843 } else { 844 bench_ctl_add_run(xomp_bench_spawn[1], &timer); 845 } 846 } 847#endif 848 849 worker->state = XOMP_WORKER_ST_READY; 850 851 if (i >= local_threads) { 852 if (xid == num_phi) { 853 xid = 0; 854 core++; // no stride on xeon phi 855 } 856 } 857 } 858 859#if XOMP_BENCH_MASTER_EN 860 remote_spawn_timer /= (remote_threads ? remote_threads : 1); 861 local_spawn_timer /= (local_threads ? local_threads : 1); 862 remote_connect_timer /= (remote_threads ? remote_threads : 1); 863 local_connect_timer /= (local_threads ? 
                            local_threads : 1);
    debug_printf("Avg spawn time remote: %lu cycles, %lu ms\n",
                 remote_spawn_timer, bench_tsc_to_ms(remote_spawn_timer));
    debug_printf("Avg spawn time local: %lu cycles, %lu ms\n",
                 local_spawn_timer, bench_tsc_to_ms(local_spawn_timer));
    debug_printf("Avg connect time remote: %lu cycles, %lu ms\n",
                 remote_connect_timer, bench_tsc_to_ms(remote_connect_timer));
    debug_printf("Avg connect time local: %lu cycles, %lu ms\n",
                 local_connect_timer, bench_tsc_to_ms(local_connect_timer));
#endif

    /* reset the allocation cursors for the connect callbacks */
    xmaster.local.next = 0;
    xmaster.remote.next = 0;

    return SYS_ERR_OK;
}

/**
 * \brief Adds a memory region to be used for work
 *
 * \param frame Frame to be shared
 * \param info  information about the frame i.e. virtual address to map
 * \param type  Type of the frame
 *
 * \returns SYS_ERR_OK on success
 *          errval on error
 */

errval_t xomp_master_add_memory(struct capref frame,
                                uint64_t info,
                                xomp_frame_type_t type)
{
    errval_t err;

    if (!xomp_master_initialized) {
        return XOMP_ERR_MASTER_NOT_INIT;
    }

#if XOMP_BENCH_MASTER_EN
    remote_timer = 0;
    local_timer = 0;
#endif

    struct xomp_worker *worker;

    XMI_DEBUG("adding memory of type %u @ info: %016lx\n", type, info);

    /*
     * we adding the memory to the worker domains with the Xeon Phi Gateway
     * domains first. This is expected to take the longest time. (potential
     * replication and going through the Xeon Phi drivers).
     *
     * For subsequent worker domains, we just send the messages asynchronously
     */
    for (uint32_t i = 0; i < xmaster.remote.num; ++i) {
        worker = &xmaster.remote.workers[i];
#if XOMP_BENCH_ENABLED
        worker->start = bench_tsc();
#endif
        if (worker->id & XOMP_WID_GATEWAY_FLAG) {
            /* gateway worker: open the channel synchronously via the driver */
            xphi_id_t xid = xeon_phi_domain_get_xid(worker->domainid);
            err = xeon_phi_client_chan_open(xid, worker->domainid, info, frame,
                                            type);
            if (err_is_fail(err)) {
                worker->state = XOMP_WORKER_ST_FAILURE;
                /*
                 * XXX: if the gateway domain fails, the entire node is not
                 *      operational.
                 */
                return err;
            }
#if XOMP_BENCH_ENABLED
            if (xomp_bench_mem_add) {
                cycles_t timer = bench_tsc();
                timer = bench_time_diff(worker->start, timer);
                bench_ctl_add_run(xomp_bench_mem_add[1], &timer);
                bench_ctl_add_run(xomp_bench_mem_add[2 + worker->index], &timer);
            }
#endif
#if XOMP_BENCH_MASTER_EN
            cycles_t duration = bench_tsc() - worker->start;
            debug_printf("remote worker %lx: chan open took %lu cycles, %lu ms\n",
                         worker->id, duration, bench_tsc_to_ms(duration));
            remote_timer += duration;
#endif
        } else {
            /* non-gateway remote worker: ask it (async) to fetch the memory
             * through its gateway; reply handled in gw_req_memory_response_rx */
            assert(worker->add_mem_st == 0x0);

            worker->add_mem_st = 0x1;

            struct txq_msg_st *msg_st = txq_msg_st_alloc(&worker->txq);

            if (msg_st == NULL) {
                return LIB_ERR_MALLOC_FAIL;
            }

            msg_st->send = gw_req_memory_call_tx;
            msg_st->cleanup = NULL;

            struct xomp_msg_st *st = (struct xomp_msg_st *) msg_st;
            st->args.add_mem.vaddr = info;
            st->args.add_mem.type = type;

            txq_send(msg_st);
        }
    }

    /* send the memory caps to the local workers directly */
    for (uint32_t i = 0; i < xmaster.local.num; ++i) {
        worker = &xmaster.local.workers[i];
#if XOMP_BENCH_ENABLED
        worker->start = bench_tsc();
#endif
        assert(worker->type == XOMP_WORKER_TYPE_LOCAL);
        assert(worker->add_mem_st == 0x0);

        worker->add_mem_st = 0x1;

        struct txq_msg_st *msg_st = txq_msg_st_alloc(&worker->txq);

        if (msg_st == NULL) {
            return LIB_ERR_MALLOC_FAIL;
        }

        msg_st->send = add_memory_call_tx;
        msg_st->cleanup = NULL;

        struct xomp_msg_st *st = (struct xomp_msg_st *) msg_st;
        st->args.add_mem.frame = frame;
        st->args.add_mem.vaddr = info;

        // XXX: overwriting replication on the host for now
        if (type == XOMP_FRAME_TYPE_REPL_RW) {
            st->args.add_mem.type = XOMP_FRAME_TYPE_SHARED_RW;
        } else {
            st->args.add_mem.type = type;
        }

        txq_send(msg_st);
    }

    /* wait for the replies */

    for (uint32_t i = 0; i < xmaster.remote.num; ++i) {
        worker = &xmaster.remote.workers[i];
        if (worker->id & XOMP_WID_GATEWAY_FLAG) {
            /* gateway was handled synchronously above */
            continue;
        }
        while (worker->add_mem_st == 0x1) {
            err = event_dispatch(get_default_waitset());
            if (err_is_fail(err)) {
                USER_PANIC_ERR(err, "event dispatch\n");
            }
        }
        if (err_is_fail(worker->err)) {
            worker->state = XOMP_WORKER_ST_FAILURE;
            return worker->err;
        }
        worker->add_mem_st = 0x0;
    }

    for (uint32_t i = 0; i < xmaster.local.num; ++i) {
        worker = &xmaster.local.workers[i];
        assert(worker->type == XOMP_WORKER_TYPE_LOCAL);

        while (worker->add_mem_st == 0x1) {
            err = event_dispatch(get_default_waitset());
            if (err_is_fail(err)) {
                USER_PANIC_ERR(err, "event dispatch\n");
            }
        }
        if (err_is_fail(worker->err)) {
            worker->state = XOMP_WORKER_ST_FAILURE;
            return worker->err;
        }
        worker->add_mem_st = 0x0;
    }

#if XOMP_BENCH_MASTER_EN
    remote_timer /= (xmaster.remote.num ? xmaster.remote.num : 1);
    local_timer /= (xmaster.local.num ?
xmaster.local.num : 1);

    debug_printf("Avg mem add time remote: %lu cycles, %lu ms\n",
                 remote_timer, bench_tsc_to_ms(remote_timer));
    debug_printf("Avg mem add time local: %lu cycles, %lu ms\n",
                 local_timer, bench_tsc_to_ms(local_timer));

#endif

    return SYS_ERR_OK;
}

/**
 * \brief builds the argument path based on the own binary name
 *
 * \param local  pointer where to store the local (/x86_64/sbin/...) path,
 *               may be NULL
 * \param remote pointer where to store the remote (/k1om/sbin/...) path,
 *               may be NULL
 *
 * \returns SYS_ERR_OK on success
 *          LIB_ERR_MALLOC_FAIL if the path buffer cannot be allocated
 *
 * Both strings are carved out of one single malloc'd buffer: *local points
 * at its start, *remote points directly behind the NUL terminator of the
 * local path.  Freeing *local therefore releases both strings.
 * NOTE(review): if local == NULL the pointer to the start of the buffer is
 * dropped and the allocation cannot be freed through *remote -- confirm
 * callers always pass a non-NULL local.
 */
errval_t xomp_master_build_path(char **local,
                                char **remote)
{
    size_t length, size = 0;

    /* snprintf(NULL, 0, ...) returns the formatted length excluding the
     * terminating NUL, hence the +1 per string */
    size += snprintf(NULL, 0, "/x86_64/sbin/%s", disp_name()) + 1;
    size += snprintf(NULL, 0, "/k1om/sbin/%s", disp_name()) + 1;

    char *path = malloc(size);
    if (path == NULL) {
        return LIB_ERR_MALLOC_FAIL;
    }

    length = snprintf(path, size, "/x86_64/sbin/%s", disp_name());
    path[length] = '\0';
    /* ++length accounts for the NUL terminator of the first string */
    size -= (++length);

    if (local) {
        *local = path;
    }

    /* the remote path starts directly behind the local one */
    path += length;
    length = snprintf(path, size, "/k1om/sbin/%s", disp_name());
    path[length] = '\0';

    if (remote) {
        *remote = path;
    }

    return SYS_ERR_OK;
}

/**
 * \brief executes some work on each worker domains
 *
 * \param task  information about the task
 *
 * \returns SYS_ERR_OK on success
 *          errval on error
 */
errval_t xomp_master_do_work(struct xomp_task *task)
{
    errval_t err;

    if (!xomp_master_initialized) {
        return XOMP_ERR_MASTER_NOT_INIT;
    }

#ifndef __k1om__
    struct waitset *ws = get_default_waitset();
#endif

    uint64_t fn = 0;

#if XOMP_BENCH_MASTER_EN
    remote_timer = 0;
    local_timer = 0;
#endif

    /* split the requested thread count into local and remote shares; the
     * calling master thread itself counts as one additional thread */
    uint32_t remote_threads = xomp_master_get_remote_threads(task->total_threads);
    uint32_t local_threads =
xomp_master_get_local_threads(task->total_threads);

    XMP_DEBUG("Executing task with %u workers host:%u, xphi:%ux%u]\n",
              task->total_threads, local_threads + 1, num_phi, remote_threads);

    assert(local_threads <= xmaster.local.num);
    assert(remote_threads <= xmaster.remote.num);
    /* the +1 accounts for the master thread itself */
    assert((local_threads + remote_threads + 1) == task->total_threads);

    uint32_t fn_idx;
    char *fn_name;

    if (remote_threads > 0) {
        /*
         * do the address translation for the remote workers: the raw
         * function pointer is translated into a symbol index that can be
         * resolved on the remote side
         */
        err = spawn_symval_lookup_addr((genvaddr_t) task->fn, &fn_idx, &fn_name);
        if (err_is_fail(err)) {
            USER_PANIC_ERR(err, "looking up address\n");
            return err;
        }
    }

    /* overwrite the global num threads counter */
    g_bomp_state->num_threads += ((local_threads) * (XOMP_VTHREAD_COUNT));

    /* thread id 0 is the master; worker thread ids start at 1 */
    uint32_t threadid = 1;

    for (uint32_t i = 1; i < task->total_threads; ++i) {
        struct xomp_worker *worker = NULL;

        if (i <= local_threads) {
            /* round-robin allocation of the local workers */
            worker = &xmaster.local.workers[xmaster.local.next++];
            assert(worker->type == XOMP_WORKER_TYPE_LOCAL);

            if (xmaster.local.next == xmaster.local.num) {
                xmaster.local.next = 0;
            }

            XMP_DEBUG("local worker id:%lx\n", worker->id);

            /* local workers can be handed the raw function pointer */
            fn = (uint64_t) task->fn;

        } else {
            /* round-robin allocation of the remote workers */
            worker = &xmaster.remote.workers[xmaster.remote.next++];
            assert(worker->type == XOMP_WORKER_TYPE_REMOTE);
            assert(fn_idx != 0);

            if (xmaster.remote.next == xmaster.remote.num) {
                xmaster.remote.next = 0;
            }
            // build the function address based on the flag and the index
            fn = (uint64_t) fn_idx | XOMP_FN_INDEX_FLAG;

            XMP_DEBUG("remote worker id: %016lx, function %s @ index %u\n",
                      worker->id, fn_name, fn_idx);
        }

#if XOMP_BENCH_ENABLED
        worker->start = bench_tsc();
#endif

        if (worker->state != XOMP_WORKER_ST_READY) {
            return XOMP_ERR_WORKER_STATE;
        }
        assert(worker->state == XOMP_WORKER_ST_READY);
        worker->state = XOMP_WORKER_ST_BUSY;

        /* fill in the work descriptor located in the worker's TLS area */
        struct bomp_work *work = worker->tls;

        work->fn = task->fn;

        work->barrier = NULL;
        work->thread_id = threadid;
        work->num_threads = g_bomp_state->num_threads;

        /* local workers service XOMP_VTHREADS virtual threads each and thus
         * consume that many thread ids; remote workers consume one */
        if (i <= local_threads) {
            work->num_vtreads = XOMP_VTHREADS;
            threadid += XOMP_VTHREADS;
        } else {
            work->num_vtreads = 1;
            threadid++;
        }

        /* XXX: hack, we do not know how big the data section is...
         * copies 64-bit words behind the work descriptor until a zero word
         * is seen, but at least 64 bytes.  NOTE(review): this reads past the
         * end of task->arg if the data contains no zero word -- confirm the
         * argument buffer is zero-terminated/padded by the caller. */
        if (task->arg) {
            uint64_t *src = task->arg;
            uint64_t *dst = (uint64_t *) (work + 1);
            uint32_t bytes = 0;
            while (*src != 0 || bytes < 64) {
                *dst++ = *src++;
                bytes += 8;
            }
        }

        struct txq_msg_st *msg_st = txq_msg_st_alloc(&worker->txq);

        if (msg_st == NULL) {
            /* no message has been sent yet when i == 1, so the task can
             * still be freed here */
            if (i == 1) {
                free(task);
            }
            return LIB_ERR_MALLOC_FAIL;
        }

        msg_st->send = do_work_tx;
        msg_st->cleanup = NULL;

        struct xomp_msg_st *st = (struct xomp_msg_st *) msg_st;
        st->args.do_work.arg = (uint64_t) work->data;
        st->args.do_work.fn = fn;
        st->args.do_work.id = (uint64_t) task;
        st->args.do_work.flags = 0;

        txq_send(msg_st);

#ifndef __k1om__
        /* drain pending events non-blocking so sends make progress */
        do {
            err = event_dispatch_non_block(ws);
        } while(err_is_ok(err));
#endif
    }

    return SYS_ERR_OK;
}


/**
 * \brief tells the gateway domains to update their local replicas
 *
 * \param frame     capability of the shared frame
 * \param offset    offset into the capability to copy
 * \param length    number of bytes to copy
 * \param node      which node to send the copy request to
 * \param direction UPDATE or WRITE BACK
 *
 * \return SYS_ERR_OK on success,
 *         errval on failure
 *
 */
errval_t xomp_master_copy_memory(struct capref frame,
                                 size_t offset,
                                 size_t length,
                                 uint16_t node,
                                 xomp_master_copy_t
direction) 1267{ 1268 assert(!"NYI"); 1269 return SYS_ERR_OK; 1270} 1271 1272#if XOMP_BENCH_ENABLED 1273/** 1274 * \brief enables basic benchmarking facilities 1275 * 1276 * \param runs the number of runs of the experiment 1277 * \param flags flags which benchmarks to enable 1278 * 1279 * \returns SYS_ERR_OK on success 1280 */ 1281errval_t xomp_master_bench_enable(size_t runs, 1282 size_t nthreads, 1283 uint8_t flags) 1284{ 1285 bench_init(); 1286 1287 bench_ctl_t **mem = NULL; 1288 1289 if (!flags) { 1290 return -1; 1291 } 1292 1293 mem = calloc(2 + 2 * (2 + nthreads), sizeof(bench_ctl_t*)); 1294 1295 1296 if (flags & XOMP_MASTER_BENCH_SPAWN) { 1297 xomp_bench_spawn = mem; 1298 xomp_bench_spawn[0] = bench_ctl_init(BENCH_MODE_FIXEDRUNS, 1, nthreads); 1299 xomp_bench_spawn[1] = bench_ctl_init(BENCH_MODE_FIXEDRUNS, 1, nthreads); 1300 mem += (2); 1301 } 1302 1303 if (flags & XOMP_MASTER_BENCH_DO_WORK) { 1304 xomp_bench_do_work = mem; 1305 xomp_bench_do_work[0] = bench_ctl_init(BENCH_MODE_FIXEDRUNS, 1, nthreads * runs); 1306 xomp_bench_do_work[1] = bench_ctl_init(BENCH_MODE_FIXEDRUNS, 1, nthreads * runs); 1307 for (uint32_t i = 0; i < nthreads; ++i) { 1308 xomp_bench_do_work[i + 2] = bench_ctl_init(BENCH_MODE_FIXEDRUNS, 1, runs); 1309 } 1310 mem += (2 + nthreads); 1311 } 1312 1313 if (flags & XOMP_MASTER_BENCH_MEM_ADD) { 1314 xomp_bench_mem_add = mem; 1315 xomp_bench_mem_add[0] = bench_ctl_init(BENCH_MODE_FIXEDRUNS, 1, nthreads * runs); 1316 xomp_bench_mem_add[1] = bench_ctl_init(BENCH_MODE_FIXEDRUNS, 1, nthreads * runs); 1317 for (uint32_t i = 0; i < nthreads; ++i) { 1318 xomp_bench_mem_add[i + 2] = bench_ctl_init(BENCH_MODE_FIXEDRUNS, 1, runs); 1319 } 1320 } 1321 1322 return SYS_ERR_OK; 1323} 1324 1325/** 1326 * \brief prints the results of the enabled benchmarks 1327 */ 1328void xomp_master_bench_print_results(void) 1329{ 1330 cycles_t tsc_per_us = bench_tsc_per_us(); 1331 if (xomp_bench_spawn) { 1332 bench_ctl_dump_analysis(xomp_bench_spawn[0], 0, "SPAWN LOCAL", 
tsc_per_us); 1333 bench_ctl_dump_analysis(xomp_bench_spawn[1], 0, "SPAWN REMOTE", tsc_per_us); 1334 } 1335 1336 uint32_t nthreads = xmaster.local.num + xmaster.remote.num; 1337 1338 char buf[20]; 1339 1340 if (xomp_bench_do_work) { 1341 bench_ctl_dump_analysis(xomp_bench_do_work[0], 0, "WORK LOCAL", tsc_per_us); 1342 bench_ctl_dump_analysis(xomp_bench_do_work[1], 0, "WORK REMOTE", tsc_per_us); 1343 for (uint32_t i = 0; i < nthreads; ++i) { 1344 snprintf(buf, 20, "work w.%u", i+1); 1345 bench_ctl_dump_analysis(xomp_bench_spawn[2+i], 0, buf, tsc_per_us); 1346 } 1347 } 1348 1349 if (xomp_bench_mem_add) { 1350 bench_ctl_dump_analysis(xomp_bench_mem_add[0], 0, "MEM ADD LOCAL", tsc_per_us); 1351 bench_ctl_dump_analysis(xomp_bench_mem_add[1], 0, "MEM ADD REMOTE", tsc_per_us); 1352 for (uint32_t i = 0; i < nthreads; ++i) { 1353 snprintf(buf, 20, "memadd w.%u", i+1); 1354 bench_ctl_dump_analysis(xomp_bench_mem_add[2+i], 0, buf, tsc_per_us); 1355 } 1356 } 1357 1358} 1359#endif 1360