1/* Copyright (c) 2007-2009, Stanford University 2* All rights reserved. 3* 4* Redistribution and use in source and binary forms, with or without 5* modification, are permitted provided that the following conditions are met: 6* * Redistributions of source code must retain the above copyright 7* notice, this list of conditions and the following disclaimer. 8* * Redistributions in binary form must reproduce the above copyright 9* notice, this list of conditions and the following disclaimer in the 10* documentation and/or other materials provided with the distribution. 11* * Neither the name of Stanford University nor the names of its 12* contributors may be used to endorse or promote products derived from 13* this software without specific prior written permission. 14* 15* THIS SOFTWARE IS PROVIDED BY STANFORD UNIVERSITY ``AS IS'' AND ANY 16* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 17* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 18* DISCLAIMED. IN NO EVENT SHALL STANFORD UNIVERSITY BE LIABLE FOR ANY 19* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 20* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 21* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 22* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 23* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 24* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 25*/ 26 27#include <assert.h> 28#include <sys/types.h> 29#include <stdlib.h> 30#include <stdio.h> 31#include <errno.h> 32#include <string.h> 33#include <inttypes.h> 34 35#include "map_reduce.h" 36#include "memory.h" 37#include "processor.h" 38#include "defines.h" 39#include "scheduler.h" 40#include "synch.h" 41#include "taskQ.h" 42#include "queue.h" 43#include "stddefines.h" 44#include "iterator.h" 45#include "locality.h" 46#include "struct.h" 47#include "tpool.h" 48 49#if !defined(_LINUX_) && !defined(_SOLARIS_) && !defined(BARRELFISH) 50#error OS not supported 51#endif 52 53/* Begin tunables. */ 54//#define INCREMENTAL_COMBINER 55 56#define DEFAULT_NUM_REDUCE_TASKS 256 57#define EXTENDED_NUM_REDUCE_TASKS (DEFAULT_NUM_REDUCE_TASKS * 128) 58#define DEFAULT_CACHE_SIZE (64 * 1024) 59//#define DEFAULT_CACHE_SIZE (8 * 1024) 60#define DEFAULT_KEYVAL_ARR_LEN 10 61#define DEFAULT_VALS_ARR_LEN 10 62#define L2_CACHE_LINE_SIZE 64 63/* End tunables. */ 64 65/* Debug printf */ 66#ifdef dprintf 67#undef dprintf 68#define dprintf(...) //printf(__VA_ARGS__) 69#endif 70 71#ifndef MIN 72#define MIN(X,Y) ((X) < (Y) ? (X) : (Y)) 73#endif 74#ifndef MAX 75#define MAX(X,Y) ((X) > (Y) ? (X) : (Y)) 76#endif 77#define OUT_PREFIX "[Phoenix] " 78 79/* A key and a value pair. */ 80typedef struct 81{ 82 /* TODO add static assertion to make sure this fits in L2 line */ 83 union { 84 struct { 85 int len; 86 int alloc_len; 87 int pos; 88 keyval_t *arr; 89 }; 90 char pad[L2_CACHE_LINE_SIZE]; 91 }; 92} keyval_arr_t; 93 94/* Array of keyvals_t. */ 95typedef struct 96{ 97 int len; 98 int alloc_len; 99 int pos; 100 keyvals_t *arr; 101} keyvals_arr_t; 102 103/* Thread information. 104 Denotes the id and the assigned CPU of a thread. */ 105typedef struct 106{ 107 union { 108 struct { 109 struct thread *tid; 110 int curr_task; 111 }; 112 char pad[L2_CACHE_LINE_SIZE]; 113 }; 114} thread_info_t; 115 116typedef struct 117{ 118 uintptr_t work_time; 119 uintptr_t user_time; 120 uintptr_t combiner_time; 121} thread_timing_t; 122 123typedef struct { 124 task_t task; 125 queue_elem_t queue_elem; 126} task_queued; 127 128/* Internal map reduce state. */ 129typedef struct 130{ 131 /* Parameters. */ 132 int num_map_tasks; /* # of map tasks. */ 133 int num_reduce_tasks; /* # of reduce tasks. */ 134 int chunk_size; /* # of units of data for each map task. */ 135 int num_procs; /* # of processors to run on. */ 136 int num_map_threads; /* # of threads for map tasks. */ 137 int num_reduce_threads; /* # of threads for reduce tasks. */ 138 int num_merge_threads; /* # of threads for merge tasks. */ 139 float key_match_factor; /* # of values likely to be matched 140 to the same key. */ 141 142 bool oneOutputQueuePerMapTask; /* One output queue per map task? */ 143 bool oneOutputQueuePerReduceTask; /* One output queue per reduce task? */ 144 145 int intermediate_task_alloc_len; 146 147 /* Callbacks. */ 148 map_t map; /* Map function. */ 149 reduce_t reduce; /* Reduce function. */ 150 combiner_t combiner; /* Combiner function. */ 151 partition_t partition; /* Partition function. */ 152 splitter_t splitter; /* Splitter function. */ 153 locator_t locator; /* Locator function. */ 154 key_cmp_t key_cmp; /* Key comparator function. */ 155 156 /* Structures. */ 157 map_reduce_args_t * args; /* Args passed in by the user. */ 158 thread_info_t * tinfo; /* Thread information array. */ 159 160 keyvals_arr_t **intermediate_vals; 161 /* Array to send to reduce task. */ 162 163 keyval_arr_t *final_vals; /* Array to send to merge task. */ 164 keyval_arr_t *merge_vals; /* Array to send to user. */ 165 166 uintptr_t splitter_pos; /* Tracks position in array_splitter(). */ 167 168 /* Policy for mapping threads to cpus. */ 169 sched_policy *schedPolicies[TASK_TYPE_TOTAL]; 170 171 172 taskQ_t *taskQueue; /* Queues of tasks. */ 173 tpool_t *tpool; /* Thread pool. */ 174} mr_env_t; 175 176#ifdef TIMING 177static pthread_key_t emit_time_key; 178#endif 179struct tls { 180 tpool_t *tpool; 181 mr_env_t *env; 182}; 183 184/* Data passed on to each worker thread. */ 185typedef struct 186{ 187 int cpu_id; /* CPU this thread is to run. */ 188 int thread_id; /* Thread index. */ 189 TASK_TYPE_T task_type; /* Assigned task type. */ 190 int merge_len; 191 keyval_arr_t *merge_input; 192 int merge_round; 193 mr_env_t *env; 194} thread_arg_t; 195 196static inline mr_env_t* env_init (map_reduce_args_t *); 197static void env_fini(mr_env_t* env); 198static inline void env_print (mr_env_t* env); 199static inline void start_workers (mr_env_t* env, thread_arg_t *); 200static inline void *start_my_work (thread_arg_t *); 201static inline void emit_inline (mr_env_t* env, void *, void *); 202static inline mr_env_t* get_env(void); 203static inline int getCurrThreadIndex (TASK_TYPE_T); 204static inline int getNumTaskThreads (mr_env_t* env, TASK_TYPE_T); 205static inline void insert_keyval ( 206 mr_env_t* env, keyval_arr_t *, void *, void *); 207static inline void insert_keyval_merged ( 208 mr_env_t* env, keyvals_arr_t *, void *, void *); 209 210static int array_splitter (void *, int, map_args_t *); 211static void identity_reduce (void *, iterator_t *itr); 212static inline void merge_results (mr_env_t* env, keyval_arr_t*, int); 213 214static void *map_worker (void *); 215static void *reduce_worker (void *); 216static void *merge_worker (void *); 217 218static int gen_map_tasks (mr_env_t* env); 219static int gen_map_tasks_split(mr_env_t* env, queue_t* q); 220static int gen_reduce_tasks (mr_env_t* env); 221 222static void map(mr_env_t* mr); 223static void reduce(mr_env_t* mr); 224static void merge(mr_env_t* mr); 225 226#ifndef INCREMENTAL_COMBINER 227static void run_combiner (mr_env_t* env, int thread_idx); 228#endif 229 230int 231map_reduce_init () 232{ 233 struct tls *tls = calloc(1, sizeof(tls)); 234 thread_set_tls(tls); 235 /* CHECK_ERROR (pthread_key_create (&tpool_key, NULL)); */ 236 237 /* CHECK_ERROR (pthread_setspecific (tpool_key, NULL)); */ 238 239 return 0; 240} 241 242int 243map_reduce (map_reduce_args_t * args) 244{ 245 struct timeval begin, end; 246 mr_env_t* env; 247 248 assert (args != NULL); 249 assert (args->map != NULL); 250 assert (args->key_cmp != NULL); 251 assert (args->unit_size > 0); 252 assert (args->result != NULL); 253 254 get_time (&begin); 255 256 /* Initialize environment. */ 257 env = env_init (args); 258 if (env == NULL) { 259 /* could not allocate environment */ 260 return -1; 261 } 262 //env_print (env); 263 env->taskQueue = tq_init (env->num_map_threads); 264 assert (env->taskQueue != NULL); 265 266 /* Reuse thread pool. */ 267 struct tls *tls = thread_get_tls(); 268 if(tls == NULL) { 269 tls = calloc(1, sizeof(struct tls)); 270 thread_set_tls(tls); 271 } 272 env->tpool = tls->tpool; 273 if (env->tpool == NULL) { 274 tpool_t *tpool; 275 276 tpool = tpool_create (env->num_map_threads); 277 CHECK_ERROR (tpool == NULL); 278 279 env->tpool = tpool; 280 tls->tpool = tpool; 281 } 282 283#ifdef TIMING 284 CHECK_ERROR (pthread_key_create (&emit_time_key, NULL)); 285#endif 286 tls->env = env; 287 288 get_time (&end); 289 290#ifdef TIMING 291 fprintf (stderr, "library init: %u\n", time_diff (&end, &begin)); 292#endif 293 294 /* Run map tasks and get intermediate values. */ 295 get_time (&begin); 296 map (env); 297 get_time (&end); 298 299#ifdef TIMING 300 fprintf (stderr, "map phase: %u\n", time_diff (&end, &begin)); 301#endif 302 303 dprintf("In scheduler, all map tasks are done, now scheduling reduce tasks\n"); 304 305 /* Run reduce tasks and get final values. */ 306 get_time (&begin); 307 reduce (env); 308 get_time (&end); 309 310#ifdef TIMING 311 fprintf (stderr, "reduce phase: %u\n", time_diff (&end, &begin)); 312#endif 313 314 dprintf("In scheduler, all reduce tasks are done, now scheduling merge tasks\n"); 315 316 get_time (&begin); 317 merge (env); 318 get_time (&end); 319 320#ifdef TIMING 321 fprintf (stderr, "merge phase: %u\n", time_diff (&end, &begin)); 322#endif 323 324 /* Cleanup. */ 325 get_time (&begin); 326 env_fini(env); 327 get_time (&end); 328 329#ifdef TIMING 330 fprintf (stderr, "library finalize: %u\n", time_diff (&end, &begin)); 331 CHECK_ERROR (pthread_key_delete (emit_time_key)); 332#endif 333 334 return 0; 335} 336 337int map_reduce_finalize () 338{ 339 tpool_t *tpool; 340 341 struct tls *tls = thread_get_tls(); 342 tpool = tls->tpool; 343 CHECK_ERROR (tpool_destroy (tpool)); 344 345 return 0; 346} 347 348/** 349 * Frees memory used by map reduce environment once map reduce has completed. 350 * Frees environment pointer. 351 */ 352static void env_fini (mr_env_t* env) 353{ 354 int i; 355 356 tq_finalize (env->taskQueue); 357 358 for (i = 0; i < TASK_TYPE_TOTAL; i++) 359 sched_policy_put(env->schedPolicies[i]); 360 361 phoenix_mem_free (env); 362} 363 364/* Setup global state. */ 365static mr_env_t* 366env_init (map_reduce_args_t *args) 367{ 368 mr_env_t *env; 369 int i; 370 int num_procs; 371 372 env = phoenix_mem_malloc (sizeof (mr_env_t)); 373 if (env == NULL) { 374 return NULL; 375 } 376 377 mem_memset (env, 0, sizeof (mr_env_t)); 378 379 env->args = args; 380 381 /* 1. Determine paramenters. */ 382 383 /* Determine the number of processors to use. */ 384 num_procs = proc_get_num_cpus (); 385 if (args->num_procs > 0) 386 { 387 /* Can't have more processors than there are physically present. */ 388 CHECK_ERROR (args->num_procs > num_procs); 389 num_procs = args->num_procs; 390 } 391 env->num_procs = num_procs; 392 393 env->oneOutputQueuePerMapTask = false; 394 env->oneOutputQueuePerReduceTask = args->use_one_queue_per_task; 395 396 /* Determine the number of threads to schedule for each type of task. */ 397 env->num_map_threads = (args->num_map_threads > 0) ? 398 args->num_map_threads : num_procs; 399 400 env->num_reduce_threads = (args->num_reduce_threads > 0) ? 401 args->num_reduce_threads : num_procs; 402 403 env->num_merge_threads = (args->num_merge_threads > 0) ? 404 args->num_merge_threads : env->num_reduce_threads; 405 406 if (env->oneOutputQueuePerReduceTask == false) { 407 env->num_merge_threads = env->num_reduce_threads / 2; 408 } 409 410 /* Assign at least one merge thread. */ 411 env->num_merge_threads = MAX(env->num_merge_threads, 1); 412 413 env->key_match_factor = (args->key_match_factor > 0) ? 414 args->key_match_factor : 2; 415 416 /* Set num_map_tasks to 0 since we cannot anticipate how many map tasks 417 there would be. This does not matter since map workers will loop 418 until there is no more data left. */ 419 env->num_map_tasks = 0; 420 421 if (args->L1_cache_size > 0) 422 { 423 env->chunk_size = args->L1_cache_size / args->unit_size; 424 env->num_reduce_tasks = (int) 425 ( (env->key_match_factor * args->data_size) / 426 args->L1_cache_size); 427 } 428 else 429 { 430 env->chunk_size = DEFAULT_CACHE_SIZE / args->unit_size; 431 env->num_reduce_tasks = (int) 432 ( (env->key_match_factor * args->data_size) / 433 DEFAULT_CACHE_SIZE ); 434 } 435 436 if (env->num_reduce_tasks <= 0) env->num_reduce_tasks = 1; 437 if (env->chunk_size <= 0) env->chunk_size = 1; 438 439 if (env->oneOutputQueuePerReduceTask == false) 440 { 441 env->num_reduce_tasks = EXTENDED_NUM_REDUCE_TASKS; 442 } 443 else 444 { 445 env->num_reduce_tasks = DEFAULT_NUM_REDUCE_TASKS; 446 } 447 env->num_merge_threads = MIN ( 448 env->num_merge_threads, env->num_reduce_tasks / 2); 449 450 if (env->oneOutputQueuePerMapTask) 451 env->intermediate_task_alloc_len = 452 args->data_size / env->chunk_size + 1; 453 else 454 env->intermediate_task_alloc_len = env->num_map_threads; 455 456 /* Register callbacks. */ 457 env->map = args->map; 458 env->reduce = (args->reduce) ? args->reduce : identity_reduce; 459 env->combiner = args->combiner; 460 env->partition = (args->partition) ? args->partition : default_partition; 461 env->splitter = (args->splitter) ? args->splitter : array_splitter; 462 env->locator = args->locator; 463 env->key_cmp = args->key_cmp; 464 465 /* 2. Initialize structures. */ 466 467 env->intermediate_vals = (keyvals_arr_t **)phoenix_mem_malloc ( 468 env->intermediate_task_alloc_len * sizeof (keyvals_arr_t*)); 469 470 for (i = 0; i < env->intermediate_task_alloc_len; i++) 471 { 472 env->intermediate_vals[i] = (keyvals_arr_t *)phoenix_mem_calloc ( 473 env->num_reduce_tasks, sizeof (keyvals_arr_t)); 474 } 475 476 if (env->oneOutputQueuePerReduceTask) 477 { 478 env->final_vals = 479 (keyval_arr_t *)phoenix_mem_calloc ( 480 env->num_reduce_tasks, sizeof (keyval_arr_t)); 481 } 482 else 483 { 484 env->final_vals = 485 (keyval_arr_t *)phoenix_mem_calloc ( 486 env->num_reduce_threads, sizeof (keyval_arr_t)); 487 } 488 489 for (i = 0; i < TASK_TYPE_TOTAL; i++) { 490 /* TODO: Make this tunable */ 491 env->schedPolicies[i] = sched_policy_get(SCHED_POLICY_STRAND_FILL); 492 } 493 494 return env; 495} 496 497void env_print (mr_env_t* env) 498{ 499 printf (OUT_PREFIX "num_reduce_tasks = %u\n", env->num_reduce_tasks); 500 printf (OUT_PREFIX "num_procs = %u\n", env->num_procs); 501 printf (OUT_PREFIX "proc_offset = %u\n", env->args->proc_offset); 502 printf (OUT_PREFIX "num_map_threads = %u\n", env->num_map_threads); 503 printf (OUT_PREFIX "num_reduce_threads = %u\n", env->num_reduce_threads); 504 printf (OUT_PREFIX "num_merge_threads = %u\n", env->num_merge_threads); 505} 506 507/** 508 * Execute the same function as worker. 509 * @param th_arg arguments 510 */ 511static void *start_my_work (thread_arg_t* th_arg) 512{ 513 switch (th_arg->task_type) { 514 case TASK_TYPE_MAP: 515 return map_worker (th_arg); 516 break; 517 case TASK_TYPE_REDUCE: 518 return reduce_worker (th_arg); 519 break; 520 case TASK_TYPE_MERGE: 521 return merge_worker (th_arg); 522 break; 523 default: 524 assert (0); 525 break; 526 } 527 528 return NULL; 529} 530 531static void start_thread_pool ( 532 tpool_t *tpool, TASK_TYPE_T task_type, thread_arg_t** th_arg_array, int num_workers) 533{ 534 thread_func thread_func; 535 536 switch (task_type) { 537 case TASK_TYPE_MAP: 538 thread_func = map_worker; 539 break; 540 case TASK_TYPE_REDUCE: 541 thread_func = reduce_worker; 542 break; 543 case TASK_TYPE_MERGE: 544 thread_func = merge_worker; 545 break; 546 default: 547 assert (0); 548 thread_func = NULL; 549 break; 550 } 551 552 CHECK_ERROR (tpool_set (tpool, thread_func, (void **)th_arg_array, num_workers)); 553 CHECK_ERROR (tpool_begin (tpool)); 554} 555 556/** start_workers() 557 * thread_func - function pointer to process splitter data 558 * splitter_func - splitter function pointer 559 * splitter_init - splitter_init function pointer 560 * runs map tasks in a new thread on each the available processors. 561 * returns pointer intermediate value array 562 */ 563static void 564start_workers (mr_env_t* env, thread_arg_t *th_arg) 565{ 566 int thread_index; 567 TASK_TYPE_T task_type; 568 int num_threads; 569 int cpu; 570 intptr_t ret_val; 571 thread_arg_t **th_arg_array; 572 void **rets; 573#ifdef TIMING 574 uint64_t work_time = 0; 575 uint64_t user_time = 0; 576 uint64_t combiner_time = 0; 577#endif 578 579 assert(th_arg != NULL); 580 581 task_type = th_arg->task_type; 582 num_threads = getNumTaskThreads (env, task_type); 583 584 env->tinfo = (thread_info_t *)phoenix_mem_calloc ( 585 num_threads, sizeof (thread_info_t)); 586 th_arg->env = env; 587 588 th_arg_array = (thread_arg_t **)phoenix_mem_malloc ( 589 sizeof (thread_arg_t *) * num_threads); 590 CHECK_ERROR (th_arg_array == NULL); 591 592 for (thread_index = 0; thread_index < num_threads; ++thread_index) { 593 594 cpu = sched_thr_to_cpu (env->schedPolicies[task_type], thread_index + env->args->proc_offset); 595 th_arg->cpu_id = cpu; 596 th_arg->thread_id = thread_index; 597 598 th_arg_array[thread_index] = phoenix_mem_malloc (sizeof (thread_arg_t)); 599 CHECK_ERROR (th_arg_array[thread_index] == NULL); 600 mem_memcpy (th_arg_array[thread_index], th_arg, sizeof (thread_arg_t)); 601 } 602 603 start_thread_pool ( 604 env->tpool, task_type, &th_arg_array[1], num_threads - 1); 605 606 dprintf("Status: All %d threads have been created\n", num_threads); 607 608 ret_val = (intptr_t)start_my_work (th_arg_array[0]); 609#ifdef TIMING 610 thread_timing_t *timing = (thread_timing_t *)ret_val; 611 work_time += timing->work_time; 612 user_time += timing->user_time; 613 combiner_time += timing->combiner_time; 614 phoenix_mem_free (timing); 615#endif 616 phoenix_mem_free (th_arg_array[0]); 617 618 /* Barrier, wait for all threads to finish. */ 619 CHECK_ERROR (tpool_wait (env->tpool)); 620 rets = tpool_get_results (env->tpool); 621 622 for (thread_index = 1; thread_index < num_threads; ++thread_index) 623 { 624#ifdef TIMING 625 ret_val = (intptr_t)rets[thread_index - 1]; 626 thread_timing_t *timing = (thread_timing_t *)ret_val; 627 work_time += timing->work_time; 628 user_time += timing->user_time; 629 combiner_time += timing->combiner_time; 630 phoenix_mem_free (timing); 631#endif 632 phoenix_mem_free (th_arg_array[thread_index]); 633 } 634 635 phoenix_mem_free (th_arg_array); 636 phoenix_mem_free (rets); 637 638#ifdef TIMING 639 switch (task_type) 640 { 641 case TASK_TYPE_MAP: 642 fprintf (stderr, "map work time: %" PRIu64 "\n", 643 work_time / num_threads); 644 fprintf (stderr, "map user time: %" PRIu64 "\n", 645 user_time / num_threads); 646 fprintf (stderr, "map combiner time: %" PRIu64 "\n", 647 combiner_time / num_threads); 648 break; 649 650 case TASK_TYPE_REDUCE: 651 fprintf (stderr, "reduce work time: %" PRIu64 "\n", 652 work_time / num_threads); 653 fprintf (stderr, "reduce user time: %" PRIu64 "\n", 654 user_time / num_threads); 655 break; 656 657 case TASK_TYPE_MERGE: 658 fprintf (stderr, "merge work time: %" PRIu64 "\n", 659 work_time / num_threads); 660 661 default: 662 break; 663 } 664#endif 665 666 phoenix_mem_free(env->tinfo); 667 dprintf("Status: All tasks have completed\n"); 668} 669 670typedef struct { 671 uint64_t run_time; 672 int lgrp; 673} map_worker_task_args_t; 674 675/** 676 * Dequeue the latest task and run it 677 * @return true if ran a task, false otherwise 678 */ 679static bool map_worker_do_next_task ( 680 mr_env_t *env, int thread_index, map_worker_task_args_t *args) 681{ 682 struct timeval begin, end; 683 int alloc_len; 684 int curr_task; 685 task_t map_task; 686 map_args_t thread_func_arg; 687 bool oneOutputQueuePerMapTask; 688 int lgrp = args->lgrp; 689 690 oneOutputQueuePerMapTask = env->oneOutputQueuePerMapTask; 691 692 alloc_len = env->intermediate_task_alloc_len; 693 694 /* Get new map task. */ 695 if (tq_dequeue (env->taskQueue, &map_task, lgrp, thread_index) == 0) { 696 /* no more map tasks */ 697 return false; 698 } 699 700 curr_task = env->num_map_tasks++; 701 env->tinfo[thread_index].curr_task = curr_task; 702 703 thread_func_arg.length = map_task.len; 704 thread_func_arg.data = (void *)map_task.data; 705 706 dprintf("Task %d: cpu_id -> %d - Started\n", curr_task, th_arg->cpu_id); 707 708 /* Perform map task. */ 709 get_time (&begin); 710 env->map (&thread_func_arg); 711 get_time (&end); 712 713#ifdef TIMING 714 args->run_time = time_diff (&end, &begin); 715#endif 716 717 dprintf("Task %d: cpu_id -> %d - Done\n", curr_task, th_arg->cpu_id); 718 719 return true; 720} 721 722/** 723 * map_worker() 724 * args - pointer to thread_arg_t 725 * returns 0 on success 726 * This runs thread_func() until there is no more data from the splitter(). 727 * The pointer to results are stored in return_values array. 728 */ 729static void * 730map_worker (void *args) 731{ 732 assert (args != NULL); 733 734 struct timeval begin, end; 735 struct timeval work_begin, work_end; 736 uintptr_t user_time = 0; 737 thread_arg_t *th_arg = (thread_arg_t *)args; 738 mr_env_t *env = th_arg->env; 739 int thread_index = th_arg->thread_id; 740 int num_assigned = 0; 741 map_worker_task_args_t mwta; 742#ifdef TIMING 743 uintptr_t work_time = 0; 744 uintptr_t combiner_time = 0; 745#endif 746 747 env->tinfo[thread_index].tid = thread_self(); 748 749 /* Bind thread. */ 750 CHECK_ERROR (proc_bind_thread (th_arg->cpu_id) != 0); 751 752 struct tls *tls = thread_get_tls(); 753 if(tls == NULL) { 754 tls = calloc(1, sizeof(struct tls)); 755 thread_set_tls(tls); 756 } 757 758 tls->env = env; 759#ifdef TIMING 760 CHECK_ERROR (pthread_setspecific (emit_time_key, 0)); 761#endif 762 763 mwta.lgrp = loc_get_lgrp(); 764 765 get_time (&work_begin); 766 while (map_worker_do_next_task (env, thread_index, &mwta)) { 767 user_time += mwta.run_time; 768 num_assigned++; 769 } 770 get_time (&work_end); 771 772#ifdef TIMING 773 work_time = time_diff (&work_end, &work_begin); 774#endif 775 776 get_time (&begin); 777 778 /* Apply combiner to local map results. */ 779#ifndef INCREMENTAL_COMBINER 780 if (env->combiner != NULL) 781 run_combiner (env, thread_index); 782#endif 783 784 get_time (&end); 785 786#ifdef TIMING 787 combiner_time = time_diff (&end, &begin); 788#endif 789 790 dprintf("Status: Total of %d tasks were assigned to cpu_id %d\n", 791 num_assigned, th_arg->cpu_id); 792 793 /* Unbind thread. */ 794 CHECK_ERROR (proc_unbind_thread () != 0); 795 796#ifdef TIMING 797 thread_timing_t *timing = calloc (1, sizeof (thread_timing_t)); 798 uintptr_t emit_time = (uintptr_t)pthread_getspecific (emit_time_key); 799 timing->user_time = user_time - emit_time; 800 timing->work_time = work_time - timing->user_time; 801 timing->combiner_time = combiner_time; 802 return (void *)timing; 803#else 804 return (void *)0; 805#endif 806} 807 808typedef struct { 809 struct iterator_t itr; 810 uint64_t run_time; 811 int num_map_threads; 812 int lgrp; 813} reduce_worker_task_args_t; 814 815/** 816 * Dequeue next reduce task and do it 817 * TODO Refactor this even more. It is still gross. 818 * @return true if did work, false otherwise 819 */ 820static bool reduce_worker_do_next_task ( 821 mr_env_t *env, int thread_index, reduce_worker_task_args_t *args) 822{ 823 struct timeval begin, end; 824 intptr_t curr_reduce_task = 0; 825 keyvals_t *min_key_val, *next_min; 826 task_t reduce_task; 827 int num_map_threads; 828 int curr_thread; 829 int lgrp = args->lgrp; 830 831 /* Get the next reduce task. */ 832 if (tq_dequeue (env->taskQueue, &reduce_task, lgrp, thread_index) == 0) { 833 /* No more reduce tasks. */ 834 return false; 835 } 836 837 curr_reduce_task = (intptr_t)reduce_task.id; 838 839 env->tinfo[thread_index].curr_task = curr_reduce_task; 840 841 num_map_threads = args->num_map_threads; 842 843 args->run_time = 0; 844 min_key_val = NULL; 845 next_min = NULL; 846 847 do { 848 for (curr_thread = 0; curr_thread < num_map_threads; curr_thread++) { 849 keyvals_t *curr_key_val; 850 keyvals_arr_t *thread_array; 851 852 /* Find the next array to search. */ 853 thread_array = 854 &env->intermediate_vals[curr_thread][curr_reduce_task]; 855 856 /* Check if the current processor array has been 857 completely searched. */ 858 if (thread_array->pos >= thread_array->len) { 859 continue; 860 } 861 862 /* Get the next key in the processor array. */ 863 curr_key_val = &thread_array->arr[thread_array->pos]; 864 865 /* If the key matches the minimum value, 866 add the value to the list of values for that key. */ 867 if (min_key_val != NULL && 868 !env->key_cmp(curr_key_val->key, min_key_val->key)) { 869 CHECK_ERROR (iter_add (&args->itr, curr_key_val)); 870 thread_array->pos += 1; 871 --curr_thread; 872 } 873 /* Find the location of the next min. */ 874 else if (next_min == NULL || 875 env->key_cmp(curr_key_val->key, next_min->key) < 0) 876 { 877 next_min = curr_key_val; 878 } 879 } 880 881 if (min_key_val != NULL) { 882 keyvals_t *curr_key_val; 883 884 if (env->reduce != identity_reduce) { 885 get_time (&begin); 886 env->reduce (min_key_val->key, &args->itr); 887 get_time (&end); 888#ifdef TIMING 889 args->run_time += time_diff (&end, &begin); 890#endif 891 } else { 892 env->reduce (min_key_val->key, &args->itr); 893 } 894 895 /* Free up memory */ 896 iter_rewind (&args->itr); 897 while (iter_next_list (&args->itr, &curr_key_val)) { 898 val_t *vals, *next; 899 900 vals = curr_key_val->vals; 901 while (vals != NULL) { 902 next = vals->next_val; 903 phoenix_mem_free (vals); 904 vals = next; 905 } 906 } 907 908 iter_reset(&args->itr); 909 } 910 911 min_key_val = next_min; 912 next_min = NULL; 913 914 /* See if there are any elements left. */ 915 for(curr_thread = 0; 916 curr_thread < num_map_threads && 917 env->intermediate_vals[curr_thread][curr_reduce_task].pos >= 918 env->intermediate_vals[curr_thread][curr_reduce_task].len; 919 curr_thread++); 920 } while (curr_thread != num_map_threads); 921 922 /* Free up the memory. */ 923 for (curr_thread = 0; curr_thread < num_map_threads; curr_thread++) { 924 keyvals_arr_t *arr; 925 926 arr = &env->intermediate_vals[curr_thread][curr_reduce_task]; 927 if (arr->alloc_len != 0) 928 phoenix_mem_free(arr->arr); 929 } 930 931 return true; 932} 933 934static void * 935reduce_worker (void *args) 936{ 937 assert(args != NULL); 938 939 struct timeval work_begin, work_end; 940 uintptr_t user_time = 0; 941 thread_arg_t *th_arg = (thread_arg_t *)args; 942 int thread_index = th_arg->thread_id; 943 mr_env_t *env = th_arg->env; 944 reduce_worker_task_args_t rwta; 945 int num_map_threads; 946#ifdef TIMING 947 uintptr_t work_time = 0; 948#endif 949 950 env->tinfo[thread_index].tid = thread_self(); 951 952 /* Bind thread. */ 953 CHECK_ERROR (proc_bind_thread (th_arg->cpu_id) != 0); 954 955 struct tls *tls = thread_get_tls(); 956 if(tls == NULL) { 957 tls = calloc(1, sizeof(struct tls)); 958 thread_set_tls(tls); 959 } 960 tls->env = env; 961#ifdef TIMING 962 CHECK_ERROR (pthread_setspecific (emit_time_key, 0)); 963#endif 964 965 if (env->oneOutputQueuePerMapTask) 966 num_map_threads = env->num_map_tasks; 967 else 968 num_map_threads = env->num_map_threads; 969 970 /* Assuming !oneOutputQueuePerMapTask */ 971 CHECK_ERROR (iter_init (&rwta.itr, env->num_map_threads)); 972 rwta.num_map_threads = num_map_threads; 973 rwta.lgrp = loc_get_lgrp(); 974 975 get_time (&work_begin); 976 977 while (reduce_worker_do_next_task (env, thread_index, &rwta)) { 978 user_time += rwta.run_time; 979 } 980 981 get_time (&work_end); 982 983#ifdef TIMING 984 work_time = time_diff (&work_end, &work_begin); 985#endif 986 987 iter_finalize (&rwta.itr); 988 989 /* Unbind thread. */ 990 CHECK_ERROR (proc_unbind_thread () != 0); 991 992#ifdef TIMING 993 thread_timing_t *timing = calloc (1, sizeof (thread_timing_t)); 994 uintptr_t emit_time = (uintptr_t)pthread_getspecific (emit_time_key); 995 timing->user_time = user_time - emit_time; 996 timing->work_time = work_time - timing->user_time; 997 return (void *)timing; 998#else 999 return (void *)0; 1000#endif 1001} 1002 1003/** merge_worker() 1004* args - pointer to thread_arg_t 1005* returns 0 on success 1006*/ 1007static void * 1008merge_worker (void *args) 1009{ 1010 assert(args != NULL); 1011 1012 struct timeval work_begin, work_end; 1013 thread_arg_t *th_arg = (thread_arg_t *)args; 1014 int thread_index = th_arg->thread_id; 1015 mr_env_t *env = th_arg->env; 1016 int cpu; 1017#ifdef TIMING 1018 uintptr_t work_time = 0; 1019#endif 1020 1021 env->tinfo[thread_index].tid = thread_self(); 1022 1023 /* Bind thread. 1024 Spread out the merge workers as much as possible. */ 1025 if (env->oneOutputQueuePerReduceTask) 1026 cpu = th_arg->cpu_id * (1 << (th_arg->merge_round - 1)); 1027 else 1028 cpu = th_arg->cpu_id * (1 << th_arg->merge_round); 1029 1030 CHECK_ERROR (proc_bind_thread (cpu) != 0); 1031 1032 struct tls *tls = thread_get_tls(); 1033 if(tls == NULL) { 1034 tls = calloc(1, sizeof(struct tls)); 1035 thread_set_tls(tls); 1036 } 1037 tls->env = env; 1038 1039 /* Assumes num_merge_threads is modified before each call. */ 1040 int length = th_arg->merge_len / env->num_merge_threads; 1041 int modlen = th_arg->merge_len % env->num_merge_threads; 1042 1043 /* Let's make some progress here. */ 1044 if (length <= 1) { 1045 length = 2; 1046 modlen = th_arg->merge_len % 2; 1047 } 1048 1049 int pos = thread_index * length + 1050 ((thread_index < modlen) ? thread_index : modlen); 1051 1052 if (pos < th_arg->merge_len) { 1053 1054 keyval_arr_t *vals = &th_arg->merge_input[pos]; 1055 1056 dprintf("Thread %d: cpu_id -> %d - Started\n", 1057 thread_index, th_arg->cpu_id); 1058 1059 get_time (&work_begin); 1060 merge_results (th_arg->env, vals, length + (thread_index < modlen)); 1061 get_time (&work_end); 1062 1063#ifdef TIMING 1064 work_time = time_diff (&work_end, &work_begin); 1065#endif 1066 1067 dprintf("Thread %d: cpu_id -> %d - Done\n", 1068 thread_index, th_arg->cpu_id); 1069 } 1070 1071 /* Unbind thread. */ 1072 CHECK_ERROR (proc_unbind_thread () != 0); 1073 1074#ifdef TIMING 1075 thread_timing_t *timing = calloc (1, sizeof (thread_timing_t)); 1076 timing->work_time = work_time; 1077 return (void *)timing; 1078#else 1079 return (void *)0; 1080#endif 1081} 1082 1083/** 1084 * Split phase of map task generation, creates all tasks and throws in single 1085 * queue. 1086 * 1087 * @param q queue to place tasks into 1088 * @return number of tasks generated, or 0 on error 1089 */ 1090static int gen_map_tasks_split (mr_env_t* env, queue_t* q) 1091{ 1092 int cur_task_id; 1093 map_args_t args; 1094 task_queued *task = NULL; 1095 1096 /* split until complete */ 1097 cur_task_id = 0; 1098 while (env->splitter (env->args->task_data, env->chunk_size, &args)) 1099 { 1100 task = (task_queued *)phoenix_mem_malloc (sizeof (task_queued)); 1101 task->task.id = cur_task_id; 1102 task->task.len = (uint64_t)args.length; 1103 task->task.data = (uint64_t)args.data; 1104 1105 queue_push_back (q, &task->queue_elem); 1106 1107 ++cur_task_id; 1108 } 1109 1110 if (task == NULL) { 1111 /* not enough memory, undo what's been done, error out */ 1112 queue_elem_t *queue_elem; 1113 1114 while (queue_pop_front (q, &queue_elem)) 1115 { 1116 task = queue_entry (queue_elem, task_queued, queue_elem); 1117 assert (task != NULL); 1118 phoenix_mem_free (task); 1119 } 1120 1121 return 0; 1122 } 1123 1124 return cur_task_id; 1125} 1126 1127/** 1128 * User provided own splitter function but did not supply a locator function. 1129 * Nothing to do here about locality, so just try to put consecutive tasks 1130 * in the same task queue. 1131 */ 1132static int gen_map_tasks_distribute_lgrp ( 1133 mr_env_t* env, int num_map_tasks, queue_t* q) 1134{ 1135 queue_elem_t *queue_elem; 1136 int tasks_per_lgrp; 1137 int tasks_leftover; 1138 int num_lgrps; 1139 int lgrp; 1140 1141 num_lgrps = env->num_map_threads / loc_get_lgrp_size(); 1142 if (num_lgrps == 0) num_lgrps = 1; 1143 1144 tasks_per_lgrp = num_map_tasks / num_lgrps; 1145 tasks_leftover = num_map_tasks - tasks_per_lgrp * num_lgrps; 1146 1147 /* distribute tasks across locality groups */ 1148 for (lgrp = 0; lgrp < num_lgrps; ++lgrp) 1149 { 1150 int remaining_cur_lgrp_tasks; 1151 1152 remaining_cur_lgrp_tasks = tasks_per_lgrp; 1153 if (tasks_leftover > 0) { 1154 remaining_cur_lgrp_tasks++; 1155 tasks_leftover--; 1156 } 1157 do { 1158 task_queued *task; 1159 1160 if (queue_pop_front (q, &queue_elem) == 0) { 1161 /* queue is empty, everything is distributed */ 1162 break; 1163 } 1164 1165 task = queue_entry (queue_elem, task_queued, queue_elem); 1166 assert (task != NULL); 1167 1168 if (tq_enqueue_seq (env->taskQueue, &task->task, lgrp) < 0) { 1169 phoenix_mem_free (task); 1170 return -1; 1171 } 1172 1173 phoenix_mem_free (task); 1174 remaining_cur_lgrp_tasks--; 1175 } while (remaining_cur_lgrp_tasks); 1176 1177 if (remaining_cur_lgrp_tasks != 0) { 1178 break; 1179 } 1180 } 1181 1182 return 0; 1183} 1184 1185/** 1186 * We can try to queue tasks based on locality info 1187 */ 1188static int gen_map_tasks_distribute_locator ( 1189 mr_env_t* env, int num_map_tasks, queue_t* q) 1190{ 1191 queue_elem_t *queue_elem; 1192 1193 while (queue_pop_front (q, &queue_elem)) 1194 { 1195 task_queued *task; 1196 int lgrp; 1197 map_args_t args; 1198 1199 task = queue_entry (queue_elem, task_queued, queue_elem); 1200 assert (task != NULL); 1201 1202 args.length = task->task.len; 1203 args.data = (void*)task->task.data; 1204 1205 if (env->locator != NULL) { 1206 void *addr; 1207 addr = env->locator (&args); 1208 lgrp = loc_mem_to_lgrp (addr); 1209 } else { 1210 lgrp = loc_mem_to_lgrp (args.data); 1211 } 1212 1213 task->task.v[3] = lgrp; /* For debugging. */ 1214 if (tq_enqueue_seq (env->taskQueue, &task->task, lgrp) != 0) { 1215 phoenix_mem_free (task); 1216 return -1; 1217 } 1218 1219 phoenix_mem_free (task); 1220 } 1221 1222 return 0; 1223} 1224 1225/** 1226 * Distributes tasks to threads 1227 * @param num_map_tasks number of map tasks in queue 1228 * @param q queue of tasks to distribute 1229 * @return 0 on success, less than 0 on failure 1230 */ 1231static int gen_map_tasks_distribute ( 1232 mr_env_t* env, int num_map_tasks, queue_t* q) 1233{ 1234 if ((env->splitter != array_splitter) && 1235 (env->locator == NULL)) { 1236 return gen_map_tasks_distribute_lgrp (env, num_map_tasks, q); 1237 } else { 1238 return gen_map_tasks_distribute_locator (env, num_map_tasks, q); 1239 } 1240 1241 return 0; 1242} 1243 1244/** 1245 * Generate all map tasks and queue them up 1246 * @return number of map tasks created if successful, negative value on error 1247 */ 1248static int gen_map_tasks (mr_env_t* env) 1249{ 1250 int ret; 1251 int num_map_tasks; 1252 queue_t temp_queue; 1253 int num_map_threads; 1254 1255 queue_init (&temp_queue); 1256 1257 num_map_tasks = gen_map_tasks_split (env, &temp_queue); 1258 if (num_map_tasks <= 0) { 1259 return -1; 1260 } 1261 1262 num_map_threads = env->num_map_threads; 1263 if (num_map_tasks < num_map_threads) 1264 num_map_threads = num_map_tasks; 1265 tq_reset (env->taskQueue, num_map_threads); 1266 1267 ret = gen_map_tasks_distribute (env, num_map_tasks, &temp_queue); 1268 if (ret == 0) ret = num_map_tasks; 1269 1270 return num_map_tasks; 1271} 1272 1273static int gen_reduce_tasks (mr_env_t* env) 1274{ 1275 int ret, tid; 1276 int tasks_per_thread; 1277 int tasks_leftover; 1278 uint64_t task_id; 1279 task_t reduce_task; 1280 1281 tq_reset (env->taskQueue, env->num_reduce_threads); 1282 1283 tasks_per_thread = env->num_reduce_tasks / env->num_reduce_threads; 1284 tasks_leftover = env->num_reduce_tasks - 1285 tasks_per_thread * env->num_map_threads; 1286 1287 task_id = 0; 1288 for (tid = 0; tid < env->num_reduce_threads; ++tid) { 1289 int remaining_cur_thread_tasks; 1290 1291 remaining_cur_thread_tasks = tasks_per_thread; 1292 if (tasks_leftover > 0) { 1293 remaining_cur_thread_tasks += 1; 1294 --tasks_leftover; 1295 } 1296 do { 1297 if (task_id == env->num_reduce_tasks) { 1298 return 0; 1299 } 1300 1301 /* New task. */ 1302 reduce_task.id = task_id; 1303 1304 /* TODO: Implement locality optimization. */ 1305 ret = tq_enqueue_seq (env->taskQueue, &reduce_task, -1); 1306 if (ret < 0) { 1307 return -1; 1308 } 1309 ++task_id; 1310 --remaining_cur_thread_tasks; 1311 } while (remaining_cur_thread_tasks); 1312 } 1313 1314 return 0; 1315} 1316 1317#ifndef INCREMENTAL_COMBINER 1318static void run_combiner (mr_env_t* env, int thread_index) 1319{ 1320 assert (! env->oneOutputQueuePerMapTask); 1321 1322 int i, j; 1323 keyvals_arr_t *my_output; 1324 keyvals_t *reduce_pos; 1325 void *reduced_val; 1326 iterator_t itr; 1327 val_t *val, *next; 1328 1329 CHECK_ERROR (iter_init (&itr, 1)); 1330 1331 for (i = 0; i < env->num_reduce_tasks; ++i) 1332 { 1333 my_output = &env->intermediate_vals[thread_index][i]; 1334 for (j = 0; j < my_output->len; ++j) 1335 { 1336 reduce_pos = &(my_output->arr[j]); 1337 if (reduce_pos->len == 0) continue; 1338 1339 CHECK_ERROR (iter_add (&itr, reduce_pos)); 1340 1341 reduced_val = env->combiner (&itr); 1342 1343 /* Shed off trailing chunks. */ 1344 assert (reduce_pos->vals); 1345 val = reduce_pos->vals->next_val; 1346 while (val) 1347 { 1348 next = val->next_val; 1349 phoenix_mem_free (val); 1350 val = next; 1351 } 1352 1353 /* Update the entry. */ 1354 val = reduce_pos->vals; 1355 val->next_insert_pos = 0; 1356 val->next_val = NULL; 1357 val->array[val->next_insert_pos++] = reduced_val; 1358 reduce_pos->len = 1; 1359 1360 iter_reset (&itr); 1361 } 1362 } 1363 1364 iter_finalize (&itr); 1365} 1366#endif 1367 1368/** emit_intermediate() 1369 * inserts the key, val pair into the intermediate array 1370 */ 1371void 1372emit_intermediate (void *key, void *val, int key_size) 1373{ 1374 struct timeval begin, end; 1375 int curr_thread = -1; 1376 int curr_task; 1377 bool oneOutputQueuePerMapTask; 1378 keyvals_arr_t *arr; 1379 mr_env_t *env; 1380 1381 get_time (&begin); 1382 1383 env = get_env(); 1384 if (curr_thread < 0) 1385 curr_thread = getCurrThreadIndex (TASK_TYPE_MAP); 1386 1387 oneOutputQueuePerMapTask = env->oneOutputQueuePerMapTask; 1388 1389 if (oneOutputQueuePerMapTask) 1390 curr_task = env->tinfo[curr_thread].curr_task; 1391 else 1392 curr_task = curr_thread; 1393 1394 int reduce_pos = env->partition (env->num_reduce_tasks, key, key_size); 1395 reduce_pos %= env->num_reduce_tasks; 1396 1397 /* Insert sorted in global queue at pos curr_proc */ 1398 arr = &env->intermediate_vals[curr_task][reduce_pos]; 1399 1400 insert_keyval_merged (env, arr, key, val); 1401 1402 get_time (&end); 1403 1404#ifdef TIMING 1405 uintptr_t total_emit_time = (uintptr_t)pthread_getspecific (emit_time_key); 1406 uintptr_t emit_time = time_diff (&end, &begin); 1407 total_emit_time += emit_time; 1408 CHECK_ERROR (pthread_setspecific (emit_time_key, (void *)total_emit_time)); 1409#endif 1410} 1411 1412/** emit_inline () 1413 * inserts the key, val pair into the final output array 1414 */ 1415static inline void 1416emit_inline (mr_env_t* env, void *key, void *val) 1417{ 1418 keyval_arr_t *arr; 1419 int curr_red_queue; 1420 int thread_index = -1; 1421 1422 if (thread_index < 0) 1423 thread_index = getCurrThreadIndex (TASK_TYPE_REDUCE); 1424 1425 if (env->oneOutputQueuePerReduceTask) { 1426 curr_red_queue = env->tinfo[thread_index].curr_task; 1427 } 1428 else { 1429 curr_red_queue = thread_index; 1430 } 1431 1432 /* Insert sorted in global queue at pos curr_proc */ 1433 arr = &env->final_vals[curr_red_queue]; 1434 insert_keyval (env, arr, key, val); 1435} 1436 1437/** emit () 1438 */ 1439void 1440emit (void *key, void *val) 1441{ 1442 struct timeval begin, end; 1443 1444 get_time (&begin); 1445 1446 emit_inline (get_env(), key, val); 1447 1448 get_time (&end); 1449 1450#ifdef TIMING 1451 uintptr_t total_emit_time = (uintptr_t)pthread_getspecific (emit_time_key); 1452 uintptr_t emit_time = time_diff (&end, &begin); 1453 total_emit_time += emit_time; 1454 CHECK_ERROR (pthread_setspecific (emit_time_key, (void *)total_emit_time)); 1455#endif 1456} 1457 1458static inline void 1459insert_keyval_merged (mr_env_t* env, keyvals_arr_t *arr, void *key, void *val) 1460{ 1461 int high = arr->len, low = -1, next; 1462 int cmp = 1; 1463 keyvals_t *insert_pos; 1464 val_t *new_vals; 1465 1466 assert(arr->len <= arr->alloc_len); 1467 if (arr->len > 0) 1468 cmp = env->key_cmp(arr->arr[arr->len - 1].key, key); 1469 1470 if (cmp > 0) 1471 { 1472 /* Binary search the array to find the key. */ 1473 while (high - low > 1) 1474 { 1475 next = (high + low) / 2; 1476 if (env->key_cmp(arr->arr[next].key, key) > 0) 1477 high = next; 1478 else 1479 low = next; 1480 } 1481 1482 if (low < 0) low = 0; 1483 if (arr->len > 0 && 1484 (cmp = env->key_cmp(arr->arr[low].key, key)) < 0) 1485 low++; 1486 } 1487 else if (cmp < 0) 1488 low = arr->len; 1489 else 1490 low = arr->len-1; 1491 1492 if (arr->len == 0 || cmp) 1493 { 1494 /* if array is full, double and copy over. */ 1495 if (arr->len == arr->alloc_len) 1496 { 1497 if (arr->alloc_len == 0) 1498 { 1499 arr->alloc_len = DEFAULT_KEYVAL_ARR_LEN; 1500 arr->arr = (keyvals_t *) 1501 phoenix_mem_malloc (arr->alloc_len * sizeof (keyvals_t)); 1502 } 1503 else 1504 { 1505 arr->alloc_len *= 2; 1506 arr->arr = (keyvals_t *) 1507 phoenix_mem_realloc (arr->arr, arr->alloc_len * sizeof (keyvals_t)); 1508 } 1509 } 1510 1511 /* Insert into array. */ 1512 memmove (&arr->arr[low+1], &arr->arr[low], 1513 (arr->len - low) * sizeof(keyvals_t)); 1514 1515 arr->arr[low].key = key; 1516 arr->arr[low].len = 0; 1517 arr->arr[low].vals = NULL; 1518 arr->len++; 1519 } 1520 1521 insert_pos = &(arr->arr[low]); 1522 1523 if (insert_pos->vals == NULL) 1524 { 1525 /* Allocate a chunk for the first time. */ 1526 new_vals = phoenix_mem_malloc 1527 (sizeof (val_t) + DEFAULT_VALS_ARR_LEN * sizeof (void *)); 1528 assert (new_vals); 1529 1530 new_vals->size = DEFAULT_VALS_ARR_LEN; 1531 new_vals->next_insert_pos = 0; 1532 new_vals->next_val = NULL; 1533 1534 insert_pos->vals = new_vals; 1535 } 1536 else if (insert_pos->vals->next_insert_pos >= insert_pos->vals->size) 1537 { 1538#ifdef INCREMENTAL_COMBINER 1539 if (env->combiner != NULL) { 1540 iterator_t itr; 1541 void *reduced_val; 1542 1543 CHECK_ERROR (iter_init (&itr, 1)); 1544 CHECK_ERROR (iter_add (&itr, insert_pos)); 1545 1546 reduced_val = env->combiner (&itr); 1547 1548 insert_pos->vals->array[0] = reduced_val; 1549 insert_pos->vals->next_insert_pos = 1; 1550 insert_pos->len = 1; 1551 1552 iter_finalize (&itr); 1553 } else { 1554#endif 1555 /* Need a new chunk. */ 1556 int alloc_size; 1557 1558 alloc_size = insert_pos->vals->size * 2; 1559 new_vals = phoenix_mem_malloc (sizeof (val_t) + alloc_size * sizeof (void *)); 1560 assert (new_vals); 1561 1562 new_vals->size = alloc_size; 1563 new_vals->next_insert_pos = 0; 1564 new_vals->next_val = insert_pos->vals; 1565 1566 insert_pos->vals = new_vals; 1567#ifdef INCREMENTAL_COMBINER 1568 } 1569#endif 1570 } 1571 1572 insert_pos->vals->array[insert_pos->vals->next_insert_pos++] = val; 1573 1574 insert_pos->len += 1; 1575} 1576 1577static inline void 1578insert_keyval (mr_env_t* env, keyval_arr_t *arr, void *key, void *val) 1579{ 1580 int high = arr->len, low = -1, next; 1581 int cmp = 1; 1582 1583 assert(arr->len <= arr->alloc_len); 1584 1585 /* If array is full, double and copy over. */ 1586 if (arr->len == arr->alloc_len) 1587 { 1588 if (arr->alloc_len == 0) 1589 { 1590 arr->alloc_len = DEFAULT_KEYVAL_ARR_LEN; 1591 arr->arr = (keyval_t*)phoenix_mem_malloc(arr->alloc_len * sizeof(keyval_t)); 1592 } 1593 else 1594 { 1595 arr->alloc_len *= 2; 1596 arr->arr = (keyval_t*)phoenix_mem_realloc(arr->arr, arr->alloc_len * sizeof(keyval_t)); 1597 } 1598 } 1599 1600 if (env->oneOutputQueuePerReduceTask == false) 1601 { 1602 /* Need to sort. */ 1603 if (arr->len > 0) 1604 cmp = env->key_cmp(arr->arr[arr->len - 1].key, key); 1605 1606 if (cmp > 0) 1607 { 1608 /* Binary search the array to find the key. */ 1609 while (high - low > 1) 1610 { 1611 next = (high + low) / 2; 1612 if (env->key_cmp(arr->arr[next].key, key) > 0) 1613 high = next; 1614 else 1615 low = next; 1616 } 1617 1618 if (low < 0) low = 0; 1619 if (arr->len > 0 && env->key_cmp(arr->arr[low].key, key) < 0) 1620 low++; 1621 } 1622 else 1623 low = arr->len; 1624 1625 1626 /* Insert into array. */ 1627 memmove (&arr->arr[low+1], &arr->arr[low], 1628 (arr->len - low) * sizeof(keyval_t)); 1629 1630 arr->arr[low].key = key; 1631 arr->arr[low].val = val; 1632 } 1633 else 1634 { 1635 /* No need to sort. Just append. */ 1636 arr->arr[arr->len].key = key; 1637 arr->arr[arr->len].val = val; 1638 } 1639 1640 arr->len++; 1641} 1642 1643static inline void 1644merge_results (mr_env_t* env, keyval_arr_t *vals, int length) 1645{ 1646 int data_idx; 1647 int total_num_keys = 0; 1648 int i; 1649 int curr_thread = -1; 1650 1651 if (curr_thread < 0) 1652 curr_thread = getCurrThreadIndex (TASK_TYPE_MERGE); 1653 1654 for (i = 0; i < length; i++) { 1655 total_num_keys += vals[i].len; 1656 } 1657 1658 env->merge_vals[curr_thread].len = total_num_keys; 1659 env->merge_vals[curr_thread].alloc_len = total_num_keys; 1660 env->merge_vals[curr_thread].pos = 0; 1661 env->merge_vals[curr_thread].arr = (keyval_t *) 1662 phoenix_mem_malloc(sizeof(keyval_t) * total_num_keys); 1663 1664 for (data_idx = 0; data_idx < total_num_keys; data_idx++) { 1665 /* For each keyval_t. */ 1666 int min_idx; 1667 keyval_t *min_keyval; 1668 1669 for (i = 0; i < length && vals[i].pos >= vals[i].len; i++); 1670 1671 if (i == length) break; 1672 1673 /* Determine the minimum key. */ 1674 min_idx = i; 1675 min_keyval = &vals[i].arr[vals[i].pos]; 1676 1677 for (i++; i < length; i++) { 1678 if (vals[i].pos < vals[i].len) 1679 { 1680 int cmp_ret; 1681 cmp_ret = env->key_cmp( 1682 min_keyval->key, 1683 vals[i].arr[vals[i].pos].key); 1684 1685 if (cmp_ret > 0) { 1686 min_idx = i; 1687 min_keyval = &vals[i].arr[vals[i].pos]; 1688 } 1689 } 1690 } 1691 1692 mem_memcpy (&env->merge_vals[curr_thread].arr[data_idx], 1693 min_keyval, sizeof(keyval_t)); 1694 vals[min_idx].pos += 1; 1695 } 1696 1697 for (i = 0; i < length; i++) { 1698 phoenix_mem_free(vals[i].arr); 1699 } 1700} 1701 1702static inline int 1703getNumTaskThreads (mr_env_t* env, TASK_TYPE_T task_type) 1704{ 1705 int num_threads; 1706 1707 switch (task_type) 1708 { 1709 case TASK_TYPE_MAP: 1710 num_threads = env->num_map_threads; 1711 break; 1712 1713 case TASK_TYPE_REDUCE: 1714 num_threads = env->num_reduce_threads; 1715 break; 1716 1717 case TASK_TYPE_MERGE: 1718 num_threads = env->num_merge_threads; 1719 break; 1720 1721 default: 1722 assert (0); 1723 num_threads = env->num_map_threads; 1724 break; 1725 } 1726 1727 return num_threads; 1728} 1729 1730/** getCurrThreadIndex() 1731 * Returns the processor the current thread is running on 1732 */ 1733static inline int 1734getCurrThreadIndex (TASK_TYPE_T task_type) 1735{ 1736 int i; 1737 int num_threads; 1738 mr_env_t *env; 1739 struct thread *mytid; 1740 1741 env = get_env(); 1742 num_threads = getNumTaskThreads (env, task_type); 1743 mytid = thread_self(); 1744 for (i = 0; i < num_threads && env->tinfo[i].tid != mytid; i++); 1745 1746 assert(i != num_threads); 1747 1748 return i; 1749} 1750 1751/** array_splitter() 1752 * 1753 */ 1754int 1755array_splitter (void *data_in, int req_units, map_args_t *out) 1756{ 1757 assert(out != NULL); 1758 1759 mr_env_t *env; 1760 int unit_size; 1761 int data_units; 1762 1763 env = get_env(); 1764 unit_size = env->args->unit_size; 1765 data_units = env->args->data_size / unit_size; 1766 1767 /* End of data reached, return FALSE. */ 1768 if (env->splitter_pos >= data_units) 1769 return 0; 1770 1771 /* Set the start of the next data. */ 1772 out->data = ((void *)env->args->task_data) + env->splitter_pos*unit_size; 1773 1774 /* Determine the nominal length. */ 1775 if (env->splitter_pos + req_units > data_units) 1776 out->length = data_units - env->splitter_pos; 1777 else 1778 out->length = req_units; 1779 1780 env->splitter_pos += out->length; 1781 1782 /* Return true since the out data is valid. */ 1783 return 1; 1784} 1785 1786void 1787identity_reduce (void *key, iterator_t *itr) 1788{ 1789 void *val; 1790 mr_env_t *env; 1791 1792 env = get_env(); 1793 while (iter_next (itr, &val)) 1794 { 1795 emit_inline (env, key, val); 1796 } 1797} 1798 1799int 1800default_partition (int num_reduce_tasks, void* key, int key_size) 1801{ 1802 unsigned long hash = 5381; 1803 char *str = (char *)key; 1804 int i; 1805 1806 for (i = 0; i < key_size; i++) 1807 { 1808 hash = ((hash << 5) + hash) + ((int)str[i]); /* hash * 33 + c */ 1809 } 1810 1811 return hash % num_reduce_tasks; 1812} 1813 1814/** 1815 * Run map tasks and get intermediate values 1816 */ 1817static void map (mr_env_t* env) 1818{ 1819 thread_arg_t th_arg; 1820 int num_map_tasks; 1821 1822 num_map_tasks = gen_map_tasks (env); 1823 assert (num_map_tasks >= 0); 1824 1825 env->num_map_tasks = num_map_tasks; 1826 if (num_map_tasks < env->num_map_threads) 1827 env->num_map_threads = num_map_tasks; 1828 1829 //printf (OUT_PREFIX "num_map_tasks = %d\n", env->num_map_tasks); 1830 1831 mem_memset (&th_arg, 0, sizeof(thread_arg_t)); 1832 th_arg.task_type = TASK_TYPE_MAP; 1833 1834 start_workers (env, &th_arg); 1835} 1836 1837/** 1838 * Run reduce tasks and get final values. 1839 */ 1840static void reduce (mr_env_t* env) 1841{ 1842 int i; 1843 thread_arg_t th_arg; 1844 1845 CHECK_ERROR (gen_reduce_tasks (env)); 1846 1847 mem_memset (&th_arg, 0, sizeof(thread_arg_t)); 1848 th_arg.task_type = TASK_TYPE_REDUCE; 1849 1850 start_workers (env, &th_arg); 1851 1852 /* Cleanup intermediate results. */ 1853 for (i = 0; i < env->intermediate_task_alloc_len; ++i) 1854 { 1855 phoenix_mem_free (env->intermediate_vals[i]); 1856 } 1857 phoenix_mem_free (env->intermediate_vals); 1858} 1859 1860/** 1861 * Merge all reduced data 1862 */ 1863static void merge (mr_env_t* env) 1864{ 1865 thread_arg_t th_arg; 1866 1867 mem_memset (&th_arg, 0, sizeof (thread_arg_t)); 1868 th_arg.task_type = TASK_TYPE_MERGE; 1869 1870 if (env->oneOutputQueuePerReduceTask) { 1871 th_arg.merge_len = env->num_reduce_tasks; 1872 } else { 1873 th_arg.merge_len = env->num_reduce_threads; 1874 } 1875 th_arg.merge_input = env->final_vals; 1876 1877 if (th_arg.merge_len <= 1) { 1878 /* Already merged, nothing to do here */ 1879 env->args->result->data = env->final_vals->arr; 1880 env->args->result->length = env->final_vals->len; 1881 1882 phoenix_mem_free(env->final_vals); 1883 1884 return; 1885 } 1886 1887 /* have work to merge! */ 1888 while (th_arg.merge_len > 1) { 1889 th_arg.merge_round += 1; 1890 1891 /* This is the worst case length, 1892 depending on the value of num_merge_threads. */ 1893 env->merge_vals = (keyval_arr_t*) 1894 phoenix_mem_malloc (env->num_merge_threads * sizeof(keyval_arr_t)); 1895 1896 /* Run merge tasks and get merge values. */ 1897 start_workers (env, &th_arg); 1898 1899 phoenix_mem_free (th_arg.merge_input); 1900 th_arg.merge_len = env->num_merge_threads; 1901 1902 env->num_merge_threads /= 2; 1903 if (env->num_merge_threads == 0) 1904 env->num_merge_threads = 1; 1905 1906 th_arg.merge_input = env->merge_vals; 1907 } 1908 1909 env->args->result->data = env->merge_vals[0].arr; 1910 env->args->result->length = env->merge_vals[0].len; 1911 1912 phoenix_mem_free(env->merge_vals); 1913} 1914 1915static inline mr_env_t* get_env (void) 1916{ 1917 struct tls *tls = thread_get_tls(); 1918 if(tls == NULL) { 1919 tls = calloc(1, sizeof(struct tls)); 1920 thread_set_tls(tls); 1921 } 1922 return tls->env; 1923} 1924