/* Copyright (c) 2007-2009, Stanford University
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*     * Redistributions of source code must retain the above copyright
*       notice, this list of conditions and the following disclaimer.
*     * Redistributions in binary form must reproduce the above copyright
*       notice, this list of conditions and the following disclaimer in the
*       documentation and/or other materials provided with the distribution.
*     * Neither the name of Stanford University nor the names of its
*       contributors may be used to endorse or promote products derived from
*       this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY STANFORD UNIVERSITY ``AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL STANFORD UNIVERSITY BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#include <assert.h>
#include <sys/types.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <inttypes.h>

#include "map_reduce.h"
#include "memory.h"
#include "processor.h"
#include "defines.h"
#include "scheduler.h"
#include "synch.h"
#include "taskQ.h"
#include "queue.h"
#include "stddefines.h"
#include "iterator.h"
#include "locality.h"
#include "struct.h"
#include "tpool.h"

#if !defined(_LINUX_) && !defined(_SOLARIS_) && !defined(BARRELFISH)
#error OS not supported
#endif

/* Begin tunables. */
//#define INCREMENTAL_COMBINER

#define DEFAULT_NUM_REDUCE_TASKS    256
#define EXTENDED_NUM_REDUCE_TASKS   (DEFAULT_NUM_REDUCE_TASKS * 128)
#define DEFAULT_CACHE_SIZE          (64 * 1024)
//#define DEFAULT_CACHE_SIZE        (8 * 1024)
#define DEFAULT_KEYVAL_ARR_LEN      10
#define DEFAULT_VALS_ARR_LEN        10
#define L2_CACHE_LINE_SIZE          64
/* End tunables. */

/* Debug printf */
#ifdef dprintf
#undef dprintf
#define dprintf(...) //printf(__VA_ARGS__)
#endif

#ifndef MIN
#define MIN(X,Y) ((X) < (Y) ? (X) : (Y))
#endif
#ifndef MAX
#define MAX(X,Y) ((X) > (Y) ? (X) : (Y))
#endif
#define OUT_PREFIX "[Phoenix] "

/* A key and a value pair. */
typedef struct
{
    /* TODO add static assertion to make sure this fits in L2 line */
    union {
        struct {
            int         len;
            int         alloc_len;
            int         pos;
            keyval_t    *arr;
        };
        char pad[L2_CACHE_LINE_SIZE];
    };
} keyval_arr_t;
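
/* A sketch of the static assertion the TODO above asks for, assuming a
   C11 toolchain (pre-C11 compilers would need the classic negative-size
   array trick instead). Because the union pads the struct out to
   L2_CACHE_LINE_SIZE, this can only fire if the inner struct ever
   outgrows one cache line. Left disabled to keep the build unchanged. */
#if 0
_Static_assert (sizeof (keyval_arr_t) == L2_CACHE_LINE_SIZE,
                "keyval_arr_t must fit in one L2 cache line");
#endif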

/* Array of keyvals_t. */
typedef struct
{
    int len;
    int alloc_len;
    int pos;
    keyvals_t *arr;
} keyvals_arr_t;

/* Thread information.
   Denotes the id and the assigned CPU of a thread. */
typedef struct
{
    union {
        struct {
            struct thread *tid;
            int curr_task;
        };
        char pad[L2_CACHE_LINE_SIZE];
    };
} thread_info_t;

typedef struct
{
    uintptr_t work_time;
    uintptr_t user_time;
    uintptr_t combiner_time;
} thread_timing_t;

typedef struct {
    task_t          task;
    queue_elem_t    queue_elem;
} task_queued;

/* Internal map reduce state. */
typedef struct
{
    /* Parameters. */
    int num_map_tasks;              /* # of map tasks. */
    int num_reduce_tasks;           /* # of reduce tasks. */
    int chunk_size;                 /* # of units of data for each map task. */
    int num_procs;                  /* # of processors to run on. */
    int num_map_threads;            /* # of threads for map tasks. */
    int num_reduce_threads;         /* # of threads for reduce tasks. */
    int num_merge_threads;          /* # of threads for merge tasks. */
    float key_match_factor;         /* # of values likely to be matched
                                       to the same key. */

    bool oneOutputQueuePerMapTask;      /* One output queue per map task? */
    bool oneOutputQueuePerReduceTask;   /* One output queue per reduce task? */

    int intermediate_task_alloc_len;

    /* Callbacks. */
    map_t map;                      /* Map function. */
    reduce_t reduce;                /* Reduce function. */
    combiner_t combiner;            /* Combiner function. */
    partition_t partition;          /* Partition function. */
    splitter_t splitter;            /* Splitter function. */
    locator_t locator;              /* Locator function. */
    key_cmp_t key_cmp;              /* Key comparator function. */

    /* Structures. */
    map_reduce_args_t * args;       /* Args passed in by the user. */
    thread_info_t * tinfo;          /* Thread information array. */

    keyvals_arr_t **intermediate_vals;
                                    /* Array to send to reduce task. */

    keyval_arr_t *final_vals;       /* Array to send to merge task. */
    keyval_arr_t *merge_vals;       /* Array to send to user. */

    uintptr_t splitter_pos;         /* Tracks position in array_splitter(). */

    /* Policy for mapping threads to cpus. */
    sched_policy    *schedPolicies[TASK_TYPE_TOTAL];


    taskQ_t         *taskQueue;     /* Queues of tasks. */
    tpool_t         *tpool;         /* Thread pool. */
} mr_env_t;

#ifdef TIMING
static pthread_key_t emit_time_key;
#endif
struct tls {
    tpool_t     *tpool;
    mr_env_t    *env;
};

/* Data passed on to each worker thread. */
typedef struct
{
    int             cpu_id;             /* CPU this thread is to run. */
    int             thread_id;          /* Thread index. */
    TASK_TYPE_T     task_type;          /* Assigned task type. */
    int             merge_len;
    keyval_arr_t    *merge_input;
    int             merge_round;
    mr_env_t        *env;
} thread_arg_t;

static inline mr_env_t* env_init (map_reduce_args_t *);
static void env_fini(mr_env_t* env);
static inline void env_print (mr_env_t* env);
static inline void start_workers (mr_env_t* env, thread_arg_t *);
static inline void *start_my_work (thread_arg_t *);
static inline void emit_inline (mr_env_t* env, void *, void *);
static inline mr_env_t* get_env(void);
static inline int getCurrThreadIndex (TASK_TYPE_T);
static inline int getNumTaskThreads (mr_env_t* env, TASK_TYPE_T);
static inline void insert_keyval (
    mr_env_t* env, keyval_arr_t *, void *, void *);
static inline void insert_keyval_merged (
    mr_env_t* env, keyvals_arr_t *, void *, void *);

static int array_splitter (void *, int, map_args_t *);
static void identity_reduce (void *, iterator_t *itr);
static inline void merge_results (mr_env_t* env, keyval_arr_t*, int);

static void *map_worker (void *);
static void *reduce_worker (void *);
static void *merge_worker (void *);

static int gen_map_tasks (mr_env_t* env);
static int gen_map_tasks_split(mr_env_t* env, queue_t* q);
static int gen_reduce_tasks (mr_env_t* env);

static void map(mr_env_t* mr);
static void reduce(mr_env_t* mr);
static void merge(mr_env_t* mr);

#ifndef INCREMENTAL_COMBINER
static void run_combiner (mr_env_t* env, int thread_idx);
#endif

int
map_reduce_init ()
{
    struct tls *tls = calloc(1, sizeof(struct tls));
    thread_set_tls(tls);
    /* CHECK_ERROR (pthread_key_create (&tpool_key, NULL)); */

    /* CHECK_ERROR (pthread_setspecific (tpool_key, NULL)); */

    return 0;
}

int
map_reduce (map_reduce_args_t * args)
{
    struct timeval begin, end;
    mr_env_t* env;

    assert (args != NULL);
    assert (args->map != NULL);
    assert (args->key_cmp != NULL);
    assert (args->unit_size > 0);
    assert (args->result != NULL);

    get_time (&begin);

    /* Initialize environment. */
    env = env_init (args);
    if (env == NULL) {
       /* could not allocate environment */
       return -1;
    }
    //env_print (env);
    env->taskQueue = tq_init (env->num_map_threads);
    assert (env->taskQueue != NULL);

    /* Reuse thread pool. */
    struct tls *tls = thread_get_tls();
    if(tls == NULL) {
        tls = calloc(1, sizeof(struct tls));
        thread_set_tls(tls);
    }
    env->tpool = tls->tpool;
    if (env->tpool == NULL) {
        tpool_t *tpool;

        tpool = tpool_create (env->num_map_threads);
        CHECK_ERROR (tpool == NULL);

        env->tpool = tpool;
        tls->tpool = tpool;
    }

#ifdef TIMING
    CHECK_ERROR (pthread_key_create (&emit_time_key, NULL));
#endif
    tls->env = env;

    get_time (&end);

#ifdef TIMING
    fprintf (stderr, "library init: %u\n", time_diff (&end, &begin));
#endif

    /* Run map tasks and get intermediate values. */
    get_time (&begin);
    map (env);
    get_time (&end);

#ifdef TIMING
    fprintf (stderr, "map phase: %u\n", time_diff (&end, &begin));
#endif

    dprintf("In scheduler, all map tasks are done, now scheduling reduce tasks\n");

    /* Run reduce tasks and get final values. */
    get_time (&begin);
    reduce (env);
    get_time (&end);

#ifdef TIMING
    fprintf (stderr, "reduce phase: %u\n", time_diff (&end, &begin));
#endif

    dprintf("In scheduler, all reduce tasks are done, now scheduling merge tasks\n");

    get_time (&begin);
    merge (env);
    get_time (&end);

#ifdef TIMING
    fprintf (stderr, "merge phase: %u\n", time_diff (&end, &begin));
#endif

    /* Cleanup. */
    get_time (&begin);
    env_fini(env);
    get_time (&end);

#ifdef TIMING
    fprintf (stderr, "library finalize: %u\n", time_diff (&end, &begin));
    CHECK_ERROR (pthread_key_delete (emit_time_key));
#endif

    return 0;
}

int map_reduce_finalize ()
{
    tpool_t *tpool;

    struct tls *tls = thread_get_tls();
    tpool = tls->tpool;
    CHECK_ERROR (tpool_destroy (tpool));

    return 0;
}
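
/* Example (illustrative only, not part of the library): a word-count
 * style client would drive the three entry points above roughly like
 * this. The my_map and my_key_cmp callbacks are hypothetical user code;
 * see map_reduce.h for the authoritative argument list.
 *
 *     map_reduce_args_t args;
 *     final_data_t result;
 *
 *     mem_memset (&args, 0, sizeof (args));
 *     args.task_data = input_buf;   // data handed to the splitter
 *     args.data_size = input_len;
 *     args.unit_size = 1;           // bytes, for a text input
 *     args.map = my_map;            // required (asserted above)
 *     args.key_cmp = my_key_cmp;    // required (asserted above)
 *     args.result = &result;        // required; receives the merged pairs
 *
 *     CHECK_ERROR (map_reduce_init ());
 *     CHECK_ERROR (map_reduce (&args));
 *     map_reduce_finalize ();       // tears down the cached thread pool
 */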

/**
 * Frees memory used by map reduce environment once map reduce has completed.
 * Frees environment pointer.
 */
static void env_fini (mr_env_t* env)
{
    int i;

    tq_finalize (env->taskQueue);

    for (i = 0; i < TASK_TYPE_TOTAL; i++)
        sched_policy_put(env->schedPolicies[i]);

    phoenix_mem_free (env);
}

/* Setup global state. */
static mr_env_t*
env_init (map_reduce_args_t *args)
{
    mr_env_t    *env;
    int         i;
    int         num_procs;

    env = phoenix_mem_malloc (sizeof (mr_env_t));
    if (env == NULL) {
       return NULL;
    }

    mem_memset (env, 0, sizeof (mr_env_t));

    env->args = args;

    /* 1. Determine parameters. */

    /* Determine the number of processors to use. */
    num_procs = proc_get_num_cpus ();
    if (args->num_procs > 0)
    {
        /* Can't have more processors than there are physically present. */
        CHECK_ERROR (args->num_procs > num_procs);
        num_procs = args->num_procs;
    }
    env->num_procs = num_procs;

    env->oneOutputQueuePerMapTask = false;
    env->oneOutputQueuePerReduceTask = args->use_one_queue_per_task;

    /* Determine the number of threads to schedule for each type of task. */
    env->num_map_threads = (args->num_map_threads > 0) ?
        args->num_map_threads : num_procs;

    env->num_reduce_threads = (args->num_reduce_threads > 0) ?
        args->num_reduce_threads : num_procs;

    env->num_merge_threads = (args->num_merge_threads > 0) ?
        args->num_merge_threads : env->num_reduce_threads;

    if (env->oneOutputQueuePerReduceTask == false) {
        env->num_merge_threads = env->num_reduce_threads / 2;
    }

    /* Assign at least one merge thread. */
    env->num_merge_threads = MAX(env->num_merge_threads, 1);

    env->key_match_factor = (args->key_match_factor > 0) ?
        args->key_match_factor : 2;

    /* Set num_map_tasks to 0 since we cannot anticipate how many map tasks
       there would be. This does not matter since map workers will loop
       until there is no more data left. */
    env->num_map_tasks = 0;

    if (args->L1_cache_size > 0)
    {
        env->chunk_size = args->L1_cache_size / args->unit_size;
        env->num_reduce_tasks = (int)
            ( (env->key_match_factor * args->data_size) /
                args->L1_cache_size);
    }
    else
    {
        env->chunk_size = DEFAULT_CACHE_SIZE / args->unit_size;
        env->num_reduce_tasks = (int)
            ( (env->key_match_factor * args->data_size) /
                DEFAULT_CACHE_SIZE );
    }

    if (env->num_reduce_tasks <= 0) env->num_reduce_tasks = 1;
    if (env->chunk_size <= 0) env->chunk_size = 1;

    if (env->oneOutputQueuePerReduceTask == false)
    {
        env->num_reduce_tasks = EXTENDED_NUM_REDUCE_TASKS;
    }
    else
    {
        env->num_reduce_tasks = DEFAULT_NUM_REDUCE_TASKS;
    }
    env->num_merge_threads = MIN (
        env->num_merge_threads, env->num_reduce_tasks / 2);

    if (env->oneOutputQueuePerMapTask)
        env->intermediate_task_alloc_len =
            args->data_size / env->chunk_size + 1;
    else
        env->intermediate_task_alloc_len = env->num_map_threads;

    /* Register callbacks. */
    env->map = args->map;
    env->reduce = (args->reduce) ? args->reduce : identity_reduce;
    env->combiner = args->combiner;
    env->partition = (args->partition) ? args->partition : default_partition;
    env->splitter = (args->splitter) ? args->splitter : array_splitter;
    env->locator = args->locator;
    env->key_cmp = args->key_cmp;

    /* 2. Initialize structures. */

    env->intermediate_vals = (keyvals_arr_t **)phoenix_mem_malloc (
        env->intermediate_task_alloc_len * sizeof (keyvals_arr_t*));

    for (i = 0; i < env->intermediate_task_alloc_len; i++)
    {
        env->intermediate_vals[i] = (keyvals_arr_t *)phoenix_mem_calloc (
            env->num_reduce_tasks, sizeof (keyvals_arr_t));
    }

    if (env->oneOutputQueuePerReduceTask)
    {
        env->final_vals =
            (keyval_arr_t *)phoenix_mem_calloc (
                env->num_reduce_tasks, sizeof (keyval_arr_t));
    }
    else
    {
        env->final_vals =
            (keyval_arr_t *)phoenix_mem_calloc (
                env->num_reduce_threads, sizeof (keyval_arr_t));
    }

    for (i = 0; i < TASK_TYPE_TOTAL; i++) {
        /* TODO: Make this tunable */
        env->schedPolicies[i] = sched_policy_get(SCHED_POLICY_STRAND_FILL);
    }

    return env;
}
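
/* Worked example of the sizing logic above (illustrative numbers): with
 * no L1_cache_size hint and unit_size = 4, chunk_size becomes
 * DEFAULT_CACHE_SIZE / 4 = 16384 units per map task. Note that the
 * data-size based num_reduce_tasks estimate is then overwritten
 * unconditionally: 256 tasks (DEFAULT_NUM_REDUCE_TASKS) with one output
 * queue per reduce task, 256 * 128 tasks otherwise. */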

void env_print (mr_env_t* env)
{
    printf (OUT_PREFIX "num_reduce_tasks = %u\n", env->num_reduce_tasks);
    printf (OUT_PREFIX "num_procs = %u\n", env->num_procs);
    printf (OUT_PREFIX "proc_offset = %u\n", env->args->proc_offset);
    printf (OUT_PREFIX "num_map_threads = %u\n", env->num_map_threads);
    printf (OUT_PREFIX "num_reduce_threads = %u\n", env->num_reduce_threads);
    printf (OUT_PREFIX "num_merge_threads = %u\n", env->num_merge_threads);
}

/**
 * Execute the same worker function as the pool threads, in the calling
 * thread.
 * @param th_arg    arguments
 */
static void *start_my_work (thread_arg_t* th_arg)
{
    switch (th_arg->task_type) {
    case TASK_TYPE_MAP:
        return map_worker (th_arg);
        break;
    case TASK_TYPE_REDUCE:
        return reduce_worker (th_arg);
        break;
    case TASK_TYPE_MERGE:
        return merge_worker (th_arg);
        break;
    default:
        assert (0);
        break;
    }

    return NULL;
}

static void start_thread_pool (
    tpool_t *tpool, TASK_TYPE_T task_type, thread_arg_t** th_arg_array, int num_workers)
{
    thread_func     thread_func;

    switch (task_type) {
    case TASK_TYPE_MAP:
        thread_func = map_worker;
        break;
    case TASK_TYPE_REDUCE:
        thread_func = reduce_worker;
        break;
    case TASK_TYPE_MERGE:
        thread_func = merge_worker;
        break;
    default:
        assert (0);
        thread_func = NULL;
        break;
    }

    CHECK_ERROR (tpool_set (tpool, thread_func, (void **)th_arg_array, num_workers));
    CHECK_ERROR (tpool_begin (tpool));
}

/** start_workers()
 *  env - map reduce environment
 *  th_arg - template thread argument copied to each worker
 *  Runs the task type named in th_arg in a new thread on each of the
 *  available processors, then waits for all workers to finish.
 */
static void
start_workers (mr_env_t* env, thread_arg_t *th_arg)
{
    int             thread_index;
    TASK_TYPE_T     task_type;
    int             num_threads;
    int             cpu;
    intptr_t        ret_val;
    thread_arg_t    **th_arg_array;
    void            **rets;
#ifdef TIMING
    uint64_t        work_time = 0;
    uint64_t        user_time = 0;
    uint64_t        combiner_time = 0;
#endif

    assert(th_arg != NULL);

    task_type = th_arg->task_type;
    num_threads = getNumTaskThreads (env, task_type);

    env->tinfo = (thread_info_t *)phoenix_mem_calloc (
        num_threads, sizeof (thread_info_t));
    th_arg->env = env;

    th_arg_array = (thread_arg_t **)phoenix_mem_malloc (
        sizeof (thread_arg_t *) * num_threads);
    CHECK_ERROR (th_arg_array == NULL);

    for (thread_index = 0; thread_index < num_threads; ++thread_index) {

        cpu = sched_thr_to_cpu (env->schedPolicies[task_type], thread_index + env->args->proc_offset);
        th_arg->cpu_id = cpu;
        th_arg->thread_id = thread_index;

        th_arg_array[thread_index] = phoenix_mem_malloc (sizeof (thread_arg_t));
        CHECK_ERROR (th_arg_array[thread_index] == NULL);
        mem_memcpy (th_arg_array[thread_index], th_arg, sizeof (thread_arg_t));
    }

    start_thread_pool (
        env->tpool, task_type, &th_arg_array[1], num_threads - 1);

    dprintf("Status: All %d threads have been created\n", num_threads);

    ret_val = (intptr_t)start_my_work (th_arg_array[0]);
#ifdef TIMING
    thread_timing_t *timing = (thread_timing_t *)ret_val;
    work_time += timing->work_time;
    user_time += timing->user_time;
    combiner_time += timing->combiner_time;
    phoenix_mem_free (timing);
#endif
    phoenix_mem_free (th_arg_array[0]);

    /* Barrier, wait for all threads to finish. */
    CHECK_ERROR (tpool_wait (env->tpool));
    rets = tpool_get_results (env->tpool);

    for (thread_index = 1; thread_index < num_threads; ++thread_index)
    {
#ifdef TIMING
        ret_val = (intptr_t)rets[thread_index - 1];
        thread_timing_t *timing = (thread_timing_t *)ret_val;
        work_time += timing->work_time;
        user_time += timing->user_time;
        combiner_time += timing->combiner_time;
        phoenix_mem_free (timing);
#endif
        phoenix_mem_free (th_arg_array[thread_index]);
    }

    phoenix_mem_free (th_arg_array);
    phoenix_mem_free (rets);

#ifdef TIMING
    switch (task_type)
    {
        case TASK_TYPE_MAP:
            fprintf (stderr, "map work time: %" PRIu64 "\n",
                                        work_time / num_threads);
            fprintf (stderr, "map user time: %" PRIu64 "\n",
                                        user_time / num_threads);
            fprintf (stderr, "map combiner time: %" PRIu64 "\n",
                                        combiner_time / num_threads);
            break;

        case TASK_TYPE_REDUCE:
            fprintf (stderr, "reduce work time: %" PRIu64 "\n",
                                        work_time / num_threads);
            fprintf (stderr, "reduce user time: %" PRIu64 "\n",
                                        user_time / num_threads);
            break;

        case TASK_TYPE_MERGE:
            fprintf (stderr, "merge work time: %" PRIu64 "\n",
                                        work_time / num_threads);

        default:
            break;
    }
#endif

    phoenix_mem_free(env->tinfo);
    dprintf("Status: All tasks have completed\n");
}

typedef struct {
    uint64_t            run_time;
    int                 lgrp;
} map_worker_task_args_t;

/**
 * Dequeue the next map task and run it
 * @return true if ran a task, false otherwise
 */
static bool map_worker_do_next_task (
    mr_env_t *env, int thread_index, map_worker_task_args_t *args)
{
    struct timeval  begin, end;
    int             alloc_len;
    int             curr_task;
    task_t          map_task;
    map_args_t      thread_func_arg;
    bool            oneOutputQueuePerMapTask;
    int             lgrp = args->lgrp;

    oneOutputQueuePerMapTask = env->oneOutputQueuePerMapTask;

    alloc_len = env->intermediate_task_alloc_len;

    /* Get new map task. */
    if (tq_dequeue (env->taskQueue, &map_task, lgrp, thread_index) == 0) {
        /* no more map tasks */
        return false;
    }

    curr_task = env->num_map_tasks++;
    env->tinfo[thread_index].curr_task = curr_task;

    thread_func_arg.length = map_task.len;
    thread_func_arg.data = (void *)map_task.data;

    dprintf("Task %d: thread %d - Started\n", curr_task, thread_index);

    /* Perform map task. */
    get_time (&begin);
    env->map (&thread_func_arg);
    get_time (&end);

#ifdef TIMING
    args->run_time = time_diff (&end, &begin);
#endif

    dprintf("Task %d: thread %d - Done\n", curr_task, thread_index);

    return true;
}

/**
 * map_worker()
 * args - pointer to thread_arg_t
 * returns 0 on success (a thread_timing_t pointer when TIMING is enabled)
 * Runs map tasks until there is no more data from the splitter(), then
 * applies the combiner (if any) to this thread's local results.
 */
static void *
map_worker (void *args)
{
    assert (args != NULL);

    struct timeval          begin, end;
    struct timeval          work_begin, work_end;
    uintptr_t               user_time = 0;
    thread_arg_t            *th_arg = (thread_arg_t *)args;
    mr_env_t                *env = th_arg->env;
    int                     thread_index = th_arg->thread_id;
    int                     num_assigned = 0;
    map_worker_task_args_t  mwta;
#ifdef TIMING
    uintptr_t               work_time = 0;
    uintptr_t               combiner_time = 0;
#endif

    env->tinfo[thread_index].tid = thread_self();

    /* Bind thread. */
    CHECK_ERROR (proc_bind_thread (th_arg->cpu_id) != 0);

    struct tls *tls = thread_get_tls();
    if(tls == NULL) {
        tls = calloc(1, sizeof(struct tls));
        thread_set_tls(tls);
    }

    tls->env = env;
#ifdef TIMING
    CHECK_ERROR (pthread_setspecific (emit_time_key, 0));
#endif

    mwta.lgrp = loc_get_lgrp();

    get_time (&work_begin);
    while (map_worker_do_next_task (env, thread_index, &mwta)) {
        user_time += mwta.run_time;
        num_assigned++;
    }
    get_time (&work_end);

#ifdef TIMING
    work_time = time_diff (&work_end, &work_begin);
#endif

    get_time (&begin);

    /* Apply combiner to local map results. */
#ifndef INCREMENTAL_COMBINER
    if (env->combiner != NULL)
        run_combiner (env, thread_index);
#endif

    get_time (&end);

#ifdef TIMING
    combiner_time = time_diff (&end, &begin);
#endif

    dprintf("Status: Total of %d tasks were assigned to cpu_id %d\n",
        num_assigned, th_arg->cpu_id);

    /* Unbind thread. */
    CHECK_ERROR (proc_unbind_thread () != 0);

#ifdef TIMING
    thread_timing_t *timing = calloc (1, sizeof (thread_timing_t));
    uintptr_t emit_time = (uintptr_t)pthread_getspecific (emit_time_key);
    timing->user_time = user_time - emit_time;
    timing->work_time = work_time - timing->user_time;
    timing->combiner_time = combiner_time;
    return (void *)timing;
#else
    return (void *)0;
#endif
}

typedef struct {
    struct iterator_t   itr;
    uint64_t            run_time;
    int                 num_map_threads;
    int                 lgrp;
} reduce_worker_task_args_t;

/**
 * Dequeue next reduce task and do it
 * TODO Refactor this even more. It is still gross.
 * @return true if did work, false otherwise
 */
static bool reduce_worker_do_next_task (
    mr_env_t *env, int thread_index, reduce_worker_task_args_t *args)
{
    struct timeval  begin, end;
    intptr_t        curr_reduce_task = 0;
    keyvals_t       *min_key_val, *next_min;
    task_t          reduce_task;
    int             num_map_threads;
    int             curr_thread;
    int             lgrp = args->lgrp;

    /* Get the next reduce task. */
    if (tq_dequeue (env->taskQueue, &reduce_task, lgrp, thread_index) == 0) {
        /* No more reduce tasks. */
        return false;
    }

    curr_reduce_task = (intptr_t)reduce_task.id;

    env->tinfo[thread_index].curr_task = curr_reduce_task;

    num_map_threads =  args->num_map_threads;

    args->run_time = 0;
    min_key_val = NULL;
    next_min = NULL;

    do {
        for (curr_thread = 0; curr_thread < num_map_threads; curr_thread++) {
            keyvals_t       *curr_key_val;
            keyvals_arr_t   *thread_array;

            /* Find the next array to search. */
            thread_array =
                &env->intermediate_vals[curr_thread][curr_reduce_task];

            /* Check if the current processor array has been
               completely searched. */
            if (thread_array->pos >= thread_array->len) {
                continue;
            }

            /* Get the next key in the processor array. */
            curr_key_val = &thread_array->arr[thread_array->pos];

            /* If the key matches the minimum value,
               add the value to the list of values for that key. */
            if (min_key_val != NULL &&
                !env->key_cmp(curr_key_val->key, min_key_val->key)) {
                CHECK_ERROR (iter_add (&args->itr, curr_key_val));
                thread_array->pos += 1;
                --curr_thread;
            }
            /* Find the location of the next min. */
            else if (next_min == NULL ||
                env->key_cmp(curr_key_val->key, next_min->key) < 0)
            {
                next_min = curr_key_val;
            }
        }

        if (min_key_val != NULL) {
            keyvals_t       *curr_key_val;

            if (env->reduce != identity_reduce) {
                get_time (&begin);
                env->reduce (min_key_val->key, &args->itr);
                get_time (&end);
#ifdef TIMING
                args->run_time += time_diff (&end, &begin);
#endif
            } else {
                env->reduce (min_key_val->key, &args->itr);
            }

            /* Free up memory */
            iter_rewind (&args->itr);
            while (iter_next_list (&args->itr, &curr_key_val)) {
                val_t   *vals, *next;

                vals = curr_key_val->vals;
                while (vals != NULL) {
                    next = vals->next_val;
                    phoenix_mem_free (vals);
                    vals = next;
                }
            }

            iter_reset(&args->itr);
        }

        min_key_val = next_min;
        next_min = NULL;

        /* See if there are any elements left. */
        for(curr_thread = 0;
            curr_thread < num_map_threads &&
            env->intermediate_vals[curr_thread][curr_reduce_task].pos >=
            env->intermediate_vals[curr_thread][curr_reduce_task].len;
            curr_thread++);
    } while (curr_thread != num_map_threads);

    /* Free up the memory. */
    for (curr_thread = 0; curr_thread < num_map_threads; curr_thread++) {
        keyvals_arr_t   *arr;

        arr = &env->intermediate_vals[curr_thread][curr_reduce_task];
        if (arr->alloc_len != 0)
            phoenix_mem_free(arr->arr);
    }

    return true;
}

static void *
reduce_worker (void *args)
{
    assert(args != NULL);

    struct timeval              work_begin, work_end;
    uintptr_t                   user_time = 0;
    thread_arg_t                *th_arg = (thread_arg_t *)args;
    int                         thread_index = th_arg->thread_id;
    mr_env_t                    *env = th_arg->env;
    reduce_worker_task_args_t   rwta;
    int                         num_map_threads;
#ifdef TIMING
    uintptr_t                   work_time = 0;
#endif

    env->tinfo[thread_index].tid = thread_self();

    /* Bind thread. */
    CHECK_ERROR (proc_bind_thread (th_arg->cpu_id) != 0);

    struct tls *tls = thread_get_tls();
    if(tls == NULL) {
        tls = calloc(1, sizeof(struct tls));
        thread_set_tls(tls);
    }
    tls->env = env;
#ifdef TIMING
    CHECK_ERROR (pthread_setspecific (emit_time_key, 0));
#endif

    if (env->oneOutputQueuePerMapTask)
        num_map_threads = env->num_map_tasks;
    else
        num_map_threads = env->num_map_threads;

    /* Assuming !oneOutputQueuePerMapTask */
    CHECK_ERROR (iter_init (&rwta.itr, env->num_map_threads));
    rwta.num_map_threads = num_map_threads;
    rwta.lgrp = loc_get_lgrp();

    get_time (&work_begin);

    while (reduce_worker_do_next_task (env, thread_index, &rwta)) {
        user_time += rwta.run_time;
    }

    get_time (&work_end);

#ifdef TIMING
    work_time = time_diff (&work_end, &work_begin);
#endif

    iter_finalize (&rwta.itr);

    /* Unbind thread. */
    CHECK_ERROR (proc_unbind_thread () != 0);

#ifdef TIMING
    thread_timing_t *timing = calloc (1, sizeof (thread_timing_t));
    uintptr_t emit_time = (uintptr_t)pthread_getspecific (emit_time_key);
    timing->user_time = user_time - emit_time;
    timing->work_time = work_time - timing->user_time;
    return (void *)timing;
#else
    return (void *)0;
#endif
}

/** merge_worker()
* args - pointer to thread_arg_t
* returns 0 on success
*/
static void *
merge_worker (void *args)
{
    assert(args != NULL);

    struct timeval  work_begin, work_end;
    thread_arg_t    *th_arg = (thread_arg_t *)args;
    int             thread_index = th_arg->thread_id;
    mr_env_t        *env = th_arg->env;
    int             cpu;
#ifdef TIMING
    uintptr_t       work_time = 0;
#endif

    env->tinfo[thread_index].tid = thread_self();

    /* Bind thread.
       Spread out the merge workers as much as possible. */
    if (env->oneOutputQueuePerReduceTask)
        cpu = th_arg->cpu_id * (1 << (th_arg->merge_round - 1));
    else
        cpu = th_arg->cpu_id * (1 << th_arg->merge_round);

    CHECK_ERROR (proc_bind_thread (cpu) != 0);

    struct tls *tls = thread_get_tls();
    if(tls == NULL) {
        tls = calloc(1, sizeof(struct tls));
        thread_set_tls(tls);
    }
    tls->env = env;

    /* Assumes num_merge_threads is modified before each call. */
    int length = th_arg->merge_len / env->num_merge_threads;
    int modlen = th_arg->merge_len % env->num_merge_threads;

    /* Let's make some progress here. */
    if (length <= 1) {
        length = 2;
        modlen = th_arg->merge_len % 2;
    }

    int pos = thread_index * length +
                ((thread_index < modlen) ? thread_index : modlen);

    if (pos < th_arg->merge_len) {

        keyval_arr_t *vals = &th_arg->merge_input[pos];

        dprintf("Thread %d: cpu_id -> %d - Started\n",
                    thread_index, th_arg->cpu_id);

        get_time (&work_begin);
        merge_results (th_arg->env, vals, length + (thread_index < modlen));
        get_time (&work_end);

#ifdef TIMING
        work_time = time_diff (&work_end, &work_begin);
#endif

        dprintf("Thread %d: cpu_id -> %d - Done\n",
                    thread_index, th_arg->cpu_id);
    }

    /* Unbind thread. */
    CHECK_ERROR (proc_unbind_thread () != 0);

#ifdef TIMING
    thread_timing_t *timing = calloc (1, sizeof (thread_timing_t));
    timing->work_time = work_time;
    return (void *)timing;
#else
    return (void *)0;
#endif
}

/**
 * Split phase of map task generation: creates all the tasks and places
 * them in a single queue.
 *
 * @param q     queue to place tasks into
 * @return number of tasks generated, or 0 on error
 */
static int gen_map_tasks_split (mr_env_t* env, queue_t* q)
{
    int                 cur_task_id;
    map_args_t          args;
    task_queued         *task = NULL;

    /* split until complete */
    cur_task_id = 0;
    while (env->splitter (env->args->task_data, env->chunk_size, &args))
    {
        task = (task_queued *)phoenix_mem_malloc (sizeof (task_queued));
        task->task.id = cur_task_id;
        task->task.len = (uint64_t)args.length;
        task->task.data = (uint64_t)args.data;

        queue_push_back (q, &task->queue_elem);

        ++cur_task_id;
    }

    if (task == NULL) {
        /* not enough memory, undo what's been done, error out */
        queue_elem_t    *queue_elem;

        while (queue_pop_front (q, &queue_elem))
        {
            task = queue_entry (queue_elem, task_queued, queue_elem);
            assert (task != NULL);
            phoenix_mem_free (task);
        }

        return 0;
    }

    return cur_task_id;
}

/**
 * The user provided their own splitter function but did not supply a
 * locator function. There is nothing we can do about locality here, so
 * just try to put consecutive tasks in the same task queue.
 */
static int gen_map_tasks_distribute_lgrp (
    mr_env_t* env, int num_map_tasks, queue_t* q)
{
    queue_elem_t    *queue_elem;
    int             tasks_per_lgrp;
    int             tasks_leftover;
    int             num_lgrps;
    int             lgrp;

    num_lgrps = env->num_map_threads / loc_get_lgrp_size();
    if (num_lgrps == 0) num_lgrps = 1;

    tasks_per_lgrp = num_map_tasks / num_lgrps;
    tasks_leftover = num_map_tasks - tasks_per_lgrp * num_lgrps;

    /* distribute tasks across locality groups */
    for (lgrp = 0; lgrp < num_lgrps; ++lgrp)
    {
        int remaining_cur_lgrp_tasks;

        remaining_cur_lgrp_tasks = tasks_per_lgrp;
        if (tasks_leftover > 0) {
            remaining_cur_lgrp_tasks++;
            tasks_leftover--;
        }
        do {
            task_queued *task;

            if (queue_pop_front (q, &queue_elem) == 0) {
                /* queue is empty, everything is distributed */
                break;
            }

            task = queue_entry (queue_elem, task_queued, queue_elem);
            assert (task != NULL);

            if (tq_enqueue_seq (env->taskQueue, &task->task, lgrp) < 0) {
                phoenix_mem_free (task);
                return -1;
            }

            phoenix_mem_free (task);
            remaining_cur_lgrp_tasks--;
        } while (remaining_cur_lgrp_tasks);

        if (remaining_cur_lgrp_tasks != 0) {
            break;
        }
    }

    return 0;
}

/**
 * We can try to queue tasks based on locality info
 */
static int gen_map_tasks_distribute_locator (
    mr_env_t* env, int num_map_tasks, queue_t* q)
{
    queue_elem_t    *queue_elem;

    while (queue_pop_front (q, &queue_elem))
    {
        task_queued *task;
        int         lgrp;
        map_args_t  args;

        task = queue_entry (queue_elem, task_queued, queue_elem);
        assert (task != NULL);

        args.length = task->task.len;
        args.data = (void*)task->task.data;

        if (env->locator != NULL) {
            void    *addr;
            addr = env->locator (&args);
            lgrp = loc_mem_to_lgrp (addr);
        } else {
            lgrp = loc_mem_to_lgrp (args.data);
        }

        task->task.v[3] = lgrp;         /* For debugging. */
        if (tq_enqueue_seq (env->taskQueue, &task->task, lgrp) != 0) {
            phoenix_mem_free (task);
            return -1;
        }

        phoenix_mem_free (task);
    }

    return 0;
}

/**
 * Distributes tasks to threads
 * @param num_map_tasks number of map tasks in queue
 * @param q queue of tasks to distribute
 * @return 0 on success, less than 0 on failure
 */
static int gen_map_tasks_distribute (
    mr_env_t* env, int num_map_tasks, queue_t* q)
{
    if ((env->splitter != array_splitter) &&
        (env->locator == NULL)) {
        return gen_map_tasks_distribute_lgrp (env, num_map_tasks, q);
    } else {
        return gen_map_tasks_distribute_locator (env, num_map_tasks, q);
    }
}

/**
 * Generate all map tasks and queue them up
 * @return number of map tasks created if successful, negative value on error
 */
static int gen_map_tasks (mr_env_t* env)
{
    int             ret;
    int             num_map_tasks;
    queue_t         temp_queue;
    int             num_map_threads;

    queue_init (&temp_queue);

    num_map_tasks = gen_map_tasks_split (env, &temp_queue);
    if (num_map_tasks <= 0) {
        return -1;
    }

    num_map_threads = env->num_map_threads;
    if (num_map_tasks < num_map_threads)
        num_map_threads = num_map_tasks;
    tq_reset (env->taskQueue, num_map_threads);

    ret = gen_map_tasks_distribute (env, num_map_tasks, &temp_queue);
    if (ret == 0) ret = num_map_tasks;

    return ret;
}

static int gen_reduce_tasks (mr_env_t* env)
{
    int ret, tid;
    int tasks_per_thread;
    int tasks_leftover;
    uint64_t task_id;
    task_t reduce_task;

    tq_reset (env->taskQueue, env->num_reduce_threads);

    tasks_per_thread = env->num_reduce_tasks / env->num_reduce_threads;
    tasks_leftover = env->num_reduce_tasks -
        tasks_per_thread * env->num_reduce_threads;

    task_id = 0;
    for (tid = 0; tid < env->num_reduce_threads; ++tid) {
        int remaining_cur_thread_tasks;

        remaining_cur_thread_tasks = tasks_per_thread;
        if (tasks_leftover > 0) {
            remaining_cur_thread_tasks += 1;
            --tasks_leftover;
        }
        do {
            if (task_id == env->num_reduce_tasks) {
                return 0;
            }

            /* New task. */
            reduce_task.id = task_id;

            /* TODO: Implement locality optimization. */
            ret = tq_enqueue_seq (env->taskQueue, &reduce_task,  -1);
            if (ret < 0) {
                return -1;
            }
            ++task_id;
            --remaining_cur_thread_tasks;
        } while (remaining_cur_thread_tasks);
    }

    return 0;
}
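
/* Worked example of the distribution above (illustrative numbers): with
 * num_reduce_tasks = 256 and num_reduce_threads = 6, tasks_per_thread = 42
 * and tasks_leftover = 4, so threads 0-3 enqueue 43 consecutive task ids
 * each and threads 4-5 enqueue 42 each (4 * 43 + 2 * 42 = 256). */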

#ifndef INCREMENTAL_COMBINER
static void run_combiner (mr_env_t* env, int thread_index)
{
    assert (! env->oneOutputQueuePerMapTask);

    int i, j;
    keyvals_arr_t *my_output;
    keyvals_t *reduce_pos;
    void *reduced_val;
    iterator_t itr;
    val_t *val, *next;

    CHECK_ERROR (iter_init (&itr, 1));

    for (i = 0; i < env->num_reduce_tasks; ++i)
    {
        my_output = &env->intermediate_vals[thread_index][i];
        for (j = 0; j < my_output->len; ++j)
        {
            reduce_pos = &(my_output->arr[j]);
            if (reduce_pos->len == 0) continue;

            CHECK_ERROR (iter_add (&itr, reduce_pos));

            reduced_val = env->combiner (&itr);

            /* Shed off trailing chunks. */
            assert (reduce_pos->vals);
            val = reduce_pos->vals->next_val;
            while (val)
            {
                next = val->next_val;
                phoenix_mem_free (val);
                val = next;
            }

            /* Update the entry. */
            val = reduce_pos->vals;
            val->next_insert_pos = 0;
            val->next_val = NULL;
            val->array[val->next_insert_pos++] = reduced_val;
            reduce_pos->len = 1;

            iter_reset (&itr);
        }
    }

    iter_finalize (&itr);
}
#endif

/** emit_intermediate()
 *  inserts the key, val pair into the intermediate array
 */
void
emit_intermediate (void *key, void *val, int key_size)
{
    struct timeval  begin, end;
    int curr_thread = -1;
    int             curr_task;
    bool            oneOutputQueuePerMapTask;
    keyvals_arr_t   *arr;
    mr_env_t        *env;

    get_time (&begin);

    env = get_env();
    if (curr_thread < 0)
        curr_thread = getCurrThreadIndex (TASK_TYPE_MAP);

    oneOutputQueuePerMapTask = env->oneOutputQueuePerMapTask;

    if (oneOutputQueuePerMapTask)
        curr_task = env->tinfo[curr_thread].curr_task;
    else
        curr_task = curr_thread;

    int reduce_pos = env->partition (env->num_reduce_tasks, key, key_size);
    reduce_pos %= env->num_reduce_tasks;

    /* Insert sorted in global queue at pos curr_proc */
    arr = &env->intermediate_vals[curr_task][reduce_pos];

    insert_keyval_merged (env, arr, key, val);

    get_time (&end);

#ifdef TIMING
    uintptr_t total_emit_time = (uintptr_t)pthread_getspecific (emit_time_key);
    uintptr_t emit_time = time_diff (&end, &begin);
    total_emit_time += emit_time;
    CHECK_ERROR (pthread_setspecific (emit_time_key, (void *)total_emit_time));
#endif
}
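
/* Example (hypothetical user code, not part of this file): a word-count
 * style map callback feeds emit_intermediate() one pair per word, using
 * the word itself as the key and 1 as the value; next_word() is an
 * assumed helper that walks the chunk handed to the task.
 *
 *     static void my_map (map_args_t *args)
 *     {
 *         char *data = (char *)args->data;
 *         char *word;
 *         while ((word = next_word (&data, args->length)) != NULL)
 *             emit_intermediate (word, (void *)1, strlen (word));
 *     }
 */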

/** emit_inline ()
 *  inserts the key, val pair into the final output array
 */
static inline void
emit_inline (mr_env_t* env, void *key, void *val)
{
    keyval_arr_t    *arr;
    int             curr_red_queue;
    int thread_index = -1;

    if (thread_index < 0)
        thread_index = getCurrThreadIndex (TASK_TYPE_REDUCE);

    if (env->oneOutputQueuePerReduceTask) {
        curr_red_queue = env->tinfo[thread_index].curr_task;
    }
    else {
        curr_red_queue = thread_index;
    }

    /* Insert sorted in global queue at pos curr_proc */
    arr = &env->final_vals[curr_red_queue];
    insert_keyval (env, arr, key, val);
}

/** emit ()
 */
void
emit (void *key, void *val)
{
    struct timeval begin, end;

    get_time (&begin);

    emit_inline (get_env(), key, val);

    get_time (&end);

#ifdef TIMING
    uintptr_t total_emit_time = (uintptr_t)pthread_getspecific (emit_time_key);
    uintptr_t emit_time = time_diff (&end, &begin);
    total_emit_time += emit_time;
    CHECK_ERROR (pthread_setspecific (emit_time_key, (void *)total_emit_time));
#endif
}

static inline void
insert_keyval_merged (mr_env_t* env, keyvals_arr_t *arr, void *key, void *val)
{
    int high = arr->len, low = -1, next;
    int cmp = 1;
    keyvals_t *insert_pos;
    val_t *new_vals;

    assert(arr->len <= arr->alloc_len);
    if (arr->len > 0)
        cmp = env->key_cmp(arr->arr[arr->len - 1].key, key);

    if (cmp > 0)
    {
        /* Binary search the array to find the key. */
        while (high - low > 1)
        {
            next = (high + low) / 2;
            if (env->key_cmp(arr->arr[next].key, key) > 0)
                high = next;
            else
                low = next;
        }

        if (low < 0) low = 0;
        if (arr->len > 0 &&
                (cmp = env->key_cmp(arr->arr[low].key, key)) < 0)
            low++;
    }
    else if (cmp < 0)
        low = arr->len;
    else
        low = arr->len-1;

    if (arr->len == 0 || cmp)
    {
        /* if array is full, double and copy over. */
        if (arr->len == arr->alloc_len)
        {
            if (arr->alloc_len == 0)
            {
                arr->alloc_len = DEFAULT_KEYVAL_ARR_LEN;
                arr->arr = (keyvals_t *)
                    phoenix_mem_malloc (arr->alloc_len * sizeof (keyvals_t));
            }
            else
            {
                arr->alloc_len *= 2;
                arr->arr = (keyvals_t *)
                    phoenix_mem_realloc (arr->arr, arr->alloc_len * sizeof (keyvals_t));
            }
        }

        /* Insert into array. */
        memmove (&arr->arr[low+1], &arr->arr[low],
                        (arr->len - low) * sizeof(keyvals_t));

        arr->arr[low].key = key;
        arr->arr[low].len = 0;
        arr->arr[low].vals = NULL;
        arr->len++;
    }

    insert_pos = &(arr->arr[low]);

    if (insert_pos->vals == NULL)
    {
        /* Allocate a chunk for the first time. */
        new_vals = phoenix_mem_malloc
            (sizeof (val_t) + DEFAULT_VALS_ARR_LEN * sizeof (void *));
        assert (new_vals);

        new_vals->size = DEFAULT_VALS_ARR_LEN;
        new_vals->next_insert_pos = 0;
        new_vals->next_val = NULL;

        insert_pos->vals = new_vals;
    }
    else if (insert_pos->vals->next_insert_pos >= insert_pos->vals->size)
    {
#ifdef INCREMENTAL_COMBINER
        if (env->combiner != NULL) {
            iterator_t itr;
            void *reduced_val;

            CHECK_ERROR (iter_init (&itr, 1));
            CHECK_ERROR (iter_add (&itr, insert_pos));

            reduced_val = env->combiner (&itr);

            insert_pos->vals->array[0] = reduced_val;
            insert_pos->vals->next_insert_pos = 1;
            insert_pos->len = 1;

            iter_finalize (&itr);
        } else {
#endif
            /* Need a new chunk. */
            int alloc_size;

            alloc_size = insert_pos->vals->size * 2;
            new_vals = phoenix_mem_malloc (sizeof (val_t) + alloc_size * sizeof (void *));
            assert (new_vals);

            new_vals->size = alloc_size;
            new_vals->next_insert_pos = 0;
            new_vals->next_val = insert_pos->vals;

            insert_pos->vals = new_vals;
#ifdef INCREMENTAL_COMBINER
        }
#endif
    }

    insert_pos->vals->array[insert_pos->vals->next_insert_pos++] = val;

    insert_pos->len += 1;
}

static inline void
insert_keyval (mr_env_t* env, keyval_arr_t *arr, void *key, void *val)
{
    int high = arr->len, low = -1, next;
    int cmp = 1;

    assert(arr->len <= arr->alloc_len);

    /* If array is full, double and copy over. */
    if (arr->len == arr->alloc_len)
    {
        if (arr->alloc_len == 0)
        {
            arr->alloc_len = DEFAULT_KEYVAL_ARR_LEN;
            arr->arr = (keyval_t*)phoenix_mem_malloc(arr->alloc_len * sizeof(keyval_t));
        }
        else
        {
            arr->alloc_len *= 2;
            arr->arr = (keyval_t*)phoenix_mem_realloc(arr->arr, arr->alloc_len * sizeof(keyval_t));
        }
    }

    if (env->oneOutputQueuePerReduceTask == false)
    {
        /* Need to sort. */
        if (arr->len > 0)
            cmp = env->key_cmp(arr->arr[arr->len - 1].key, key);

        if (cmp > 0)
        {
            /* Binary search the array to find the key. */
            while (high - low > 1)
            {
                next = (high + low) / 2;
                if (env->key_cmp(arr->arr[next].key, key) > 0)
                    high = next;
                else
                    low = next;
            }

            if (low < 0) low = 0;
            if (arr->len > 0 && env->key_cmp(arr->arr[low].key, key) < 0)
                low++;
        }
        else
            low = arr->len;


        /* Insert into array. */
        memmove (&arr->arr[low+1], &arr->arr[low],
            (arr->len - low) * sizeof(keyval_t));

        arr->arr[low].key = key;
        arr->arr[low].val = val;
    }
    else
    {
        /* No need to sort. Just append. */
        arr->arr[arr->len].key = key;
        arr->arr[arr->len].val = val;
    }

    arr->len++;
}

static inline void
merge_results (mr_env_t* env, keyval_arr_t *vals, int length)
{
    int data_idx;
    int total_num_keys = 0;
    int i;
    int curr_thread = -1;

    if (curr_thread < 0)
        curr_thread = getCurrThreadIndex (TASK_TYPE_MERGE);

    for (i = 0; i < length; i++) {
        total_num_keys += vals[i].len;
    }

    env->merge_vals[curr_thread].len = total_num_keys;
    env->merge_vals[curr_thread].alloc_len = total_num_keys;
    env->merge_vals[curr_thread].pos = 0;
    env->merge_vals[curr_thread].arr = (keyval_t *)
        phoenix_mem_malloc(sizeof(keyval_t) * total_num_keys);

    for (data_idx = 0; data_idx < total_num_keys; data_idx++) {
        /* For each keyval_t. */
        int         min_idx;
        keyval_t    *min_keyval;

        for (i = 0; i < length && vals[i].pos >= vals[i].len; i++);

        if (i == length) break;

        /* Determine the minimum key. */
        min_idx = i;
        min_keyval = &vals[i].arr[vals[i].pos];

        for (i++; i < length; i++) {
            if (vals[i].pos < vals[i].len)
            {
                int cmp_ret;
                cmp_ret = env->key_cmp(
                    min_keyval->key,
                    vals[i].arr[vals[i].pos].key);

                if (cmp_ret > 0) {
                    min_idx = i;
                    min_keyval = &vals[i].arr[vals[i].pos];
                }
            }
        }

        mem_memcpy (&env->merge_vals[curr_thread].arr[data_idx],
                        min_keyval, sizeof(keyval_t));
        vals[min_idx].pos += 1;
    }

    for (i = 0; i < length; i++) {
        phoenix_mem_free(vals[i].arr);
    }
}

static inline int
getNumTaskThreads (mr_env_t* env, TASK_TYPE_T task_type)
{
    int num_threads;

    switch (task_type)
    {
        case TASK_TYPE_MAP:
            num_threads = env->num_map_threads;
            break;

        case TASK_TYPE_REDUCE:
            num_threads = env->num_reduce_threads;
            break;

        case TASK_TYPE_MERGE:
            num_threads = env->num_merge_threads;
            break;

        default:
            assert (0);
            num_threads = env->num_map_threads;
            break;
    }

    return num_threads;
}

/** getCurrThreadIndex()
 *  Returns the index of the calling thread within the tinfo array
 */
static inline int
getCurrThreadIndex (TASK_TYPE_T task_type)
{
    int         i;
    int         num_threads;
    mr_env_t    *env;
    struct thread   *mytid;

    env = get_env();
    num_threads = getNumTaskThreads (env, task_type);
    mytid = thread_self();
    for (i = 0; i < num_threads && env->tinfo[i].tid != mytid; i++);

    assert(i != num_threads);

    return i;
}

/** array_splitter()
 *
 */
int
array_splitter (void *data_in, int req_units, map_args_t *out)
{
    assert(out != NULL);

    mr_env_t    *env;
    int         unit_size;
    int         data_units;

    env = get_env();
    unit_size = env->args->unit_size;
    data_units = env->args->data_size / unit_size;

    /* End of data reached, return FALSE. */
    if (env->splitter_pos >= data_units)
        return 0;

    /* Set the start of the next data. */
    out->data = ((void *)env->args->task_data) + env->splitter_pos*unit_size;

    /* Determine the nominal length. */
    if (env->splitter_pos + req_units > data_units)
        out->length = data_units - env->splitter_pos;
    else
        out->length = req_units;

    env->splitter_pos += out->length;

    /* Return true since the out data is valid. */
    return 1;
}
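
/* Worked example (illustrative numbers): with unit_size = 4, data_size =
 * 100 bytes (25 units) and req_units = 10, successive calls return chunks
 * of 10, 10 and 5 units at byte offsets 0, 40 and 80, then return 0 once
 * splitter_pos reaches 25. */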

void
identity_reduce (void *key, iterator_t *itr)
{
    void        *val;
    mr_env_t    *env;

    env = get_env();
    while (iter_next (itr, &val))
    {
        emit_inline (env, key, val);
    }
}

int
default_partition (int num_reduce_tasks, void* key, int key_size)
{
    unsigned long hash = 5381;
    char *str = (char *)key;
    int i;

    for (i = 0; i < key_size; i++)
    {
        hash = ((hash << 5) + hash) + ((int)str[i]); /* hash * 33 + c */
    }

    return hash % num_reduce_tasks;
}
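
/* Worked example of the djb2-style hash above: for the 2-byte key "ab"
 * (a = 97, b = 98), hash = (5381 * 33 + 97) * 33 + 98 = 5863208, so with
 * 256 reduce tasks the pair is assigned to bucket 5863208 % 256 = 40. */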

/**
 * Run map tasks and get intermediate values
 */
static void map (mr_env_t* env)
{
    thread_arg_t   th_arg;
    int            num_map_tasks;

    num_map_tasks = gen_map_tasks (env);
    assert (num_map_tasks >= 0);

    env->num_map_tasks = num_map_tasks;
    if (num_map_tasks < env->num_map_threads)
        env->num_map_threads = num_map_tasks;

    //printf (OUT_PREFIX "num_map_tasks = %d\n", env->num_map_tasks);

    mem_memset (&th_arg, 0, sizeof(thread_arg_t));
    th_arg.task_type = TASK_TYPE_MAP;

    start_workers (env, &th_arg);
}

/**
 * Run reduce tasks and get final values.
 */
static void reduce (mr_env_t* env)
{
    int            i;
    thread_arg_t   th_arg;

    CHECK_ERROR (gen_reduce_tasks (env));

    mem_memset (&th_arg, 0, sizeof(thread_arg_t));
    th_arg.task_type = TASK_TYPE_REDUCE;

    start_workers (env, &th_arg);

    /* Cleanup intermediate results. */
    for (i = 0; i < env->intermediate_task_alloc_len; ++i)
    {
        phoenix_mem_free (env->intermediate_vals[i]);
    }
    phoenix_mem_free (env->intermediate_vals);
}

/**
 * Merge all reduced data
 */
static void merge (mr_env_t* env)
{
    thread_arg_t   th_arg;

    mem_memset (&th_arg, 0, sizeof (thread_arg_t));
    th_arg.task_type = TASK_TYPE_MERGE;

    if (env->oneOutputQueuePerReduceTask) {
        th_arg.merge_len = env->num_reduce_tasks;
    } else {
        th_arg.merge_len = env->num_reduce_threads;
    }
    th_arg.merge_input = env->final_vals;

    if (th_arg.merge_len <= 1) {
        /* Already merged, nothing to do here */
        env->args->result->data = env->final_vals->arr;
        env->args->result->length = env->final_vals->len;

        phoenix_mem_free(env->final_vals);

        return;
    }

    /* have work to merge! */
    while (th_arg.merge_len > 1) {
        th_arg.merge_round += 1;

        /* This is the worst case length,
           depending on the value of num_merge_threads. */
        env->merge_vals = (keyval_arr_t*)
            phoenix_mem_malloc (env->num_merge_threads * sizeof(keyval_arr_t));

        /* Run merge tasks and get merge values. */
        start_workers (env, &th_arg);

        phoenix_mem_free (th_arg.merge_input);
        th_arg.merge_len = env->num_merge_threads;

        env->num_merge_threads /= 2;
        if (env->num_merge_threads == 0)
            env->num_merge_threads = 1;

        th_arg.merge_input = env->merge_vals;
    }

    env->args->result->data = env->merge_vals[0].arr;
    env->args->result->length = env->merge_vals[0].len;

    phoenix_mem_free(env->merge_vals);
}
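
/* Example trace of the merge rounds above (illustrative numbers): with 8
 * reduce output queues and num_merge_threads = 4, round 1 merges 8 sorted
 * queues into 4, round 2 (2 threads) merges those into 2, and round 3
 * (1 thread) produces the single sorted array handed back to the user
 * through env->args->result. */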

static inline mr_env_t* get_env (void)
{
    struct tls *tls = thread_get_tls();
    if(tls == NULL) {
        tls = calloc(1, sizeof(struct tls));
        thread_set_tls(tls);
    }
    return tls->env;
}
