1/* Copyright (c) 2007-2009, Stanford University
2* All rights reserved.
3*
4* Redistribution and use in source and binary forms, with or without
5* modification, are permitted provided that the following conditions are met:
6*     * Redistributions of source code must retain the above copyright
7*       notice, this list of conditions and the following disclaimer.
8*     * Redistributions in binary form must reproduce the above copyright
9*       notice, this list of conditions and the following disclaimer in the
10*       documentation and/or other materials provided with the distribution.
11*     * Neither the name of Stanford University nor the
12*       names of its contributors may be used to endorse or promote products
13*       derived from this software without specific prior written permission.
14*
15* THIS SOFTWARE IS PROVIDED BY STANFORD UNIVERSITY ``AS IS'' AND ANY
16* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18* DISCLAIMED. IN NO EVENT SHALL STANFORD UNIVERSITY BE LIABLE FOR ANY
19* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25*/
26
27#ifndef MAP_REDUCE_H_
28#define MAP_REDUCE_H_
29
30#ifdef BARRELFISH
31#       include <barrelfish/barrelfish.h>
32#endif
33
#include <sys/types.h>
#include <stdbool.h>
#include <stdint.h>
36
37/* Standard data types for the function arguments and results */
38
/* Argument to the map function. It is filled in by the splitter function.
 * length - number of elements of data. The default splitter function gives
 *          length in terms of the # of elements of unit_size bytes.
 * data   - data to process, of a user defined type.
 *
 * length is an intptr_t, which is declared by <stdint.h>; <sys/types.h> is
 * not required by POSIX to provide it, so the header must include
 * <stdint.h> itself to be self-contained.
 */
typedef struct
{
   intptr_t length;
   void *data;
} map_args_t;
49
/* One key/value pair of the result.
 * Both members are untyped pointers; the application decides what they
 * point at.
 */
typedef struct {
    void *key;   /* pointer to the key */
    void *val;   /* pointer to the value */
} keyval_t;
59
60/* List of results
61 * length - number of key value pairs
62 * data - array of key value pairs
63 */
64typedef struct
65{
66   int length;
67   keyval_t *data;
68} final_data_t;
69
70/* Scheduler function pointer type definitions */
71
72/* Map function takes in map_args_t, as supplied by the splitter
73 * and emit_intermediate() should be called on any key value pairs
74 * in the intermediate result set.
75 */
76typedef void (*map_t)(map_args_t *);
77
/* Iterator over values; an incomplete type in this header, so callers can
 * only hold pointers to it. Its definition lives outside this header. */
typedef struct iterator_t iterator_t;

/* Retrieves the next value from itr through val.
 * NOTE(review): the meaning of the int result is defined by the runtime,
 * not shown here; presumably nonzero while a value was produced. Confirm. */
int iter_next(iterator_t *itr, void **val);

/* NOTE(review): by its name, returns the number of values in itr;
 * confirm against the runtime implementation. */
int iter_size(iterator_t *itr);
82
83/* Reduce function takes in a key pointer, a list of value pointers, and a
84 * length of the list. emit() should be called on any key value pairs
85 * in the result set.
86 */
87typedef void (*reduce_t)(void *, iterator_t *itr);
88
89/* Combiner function takes in an iterator for a particular key,
90 * and returns a reduced value. The operation should be identical to the
91 * reduce function, except that this function returns the reduced value
92 * directly. */
93typedef void *(*combiner_t)(iterator_t *itr);
94
95/* Splitter function takes in a pointer to the input data, an interger of
96 * the number of bytes requested, and an uninitialized pointer to a
97 * map_args_t pointer. The result is stored in map_args_t. The splitter
98 * should return 1 if the result is valid or 0 if there is no more data.
99 */
100typedef int (*splitter_t)(void *, int, map_args_t *);
101
102/* Locator function takes in a pointer to map_args_t, and returns
103 * the memory address where this map task would be heavily accessing.
104 * The runtime would then try to dispatch the task to a thread that
105 * is nearby the physical memory that backs the address. */
106typedef void* (*locator_t)(map_args_t *);
107
/* Partition function: takes the number of reduce tasks, a pointer to a key,
 * and the length of that key in bytes, and assigns the key to a reduce task.
 * The return value is the number of the reduce task that will process the
 * key. Keys that are equal must always yield the same value.
 */
typedef int (*partition_t)(int reduce_tasks, void *key, int key_size);
114
/* Key comparison function. key_cmp(key1, key2) returns
 *   zero     if key1 == key2,
 *   positive if key1 >  key2,
 *   negative if key1 <  key2.
 */
typedef int (*key_cmp_t)(const void *key1, const void *key2);
121
122/* The arguments to operate the runtime. */
123typedef struct
124{
125    void * task_data;           /* The data to run MapReduce on.
126                                 * If splitter is NULL, this should be an array. */
127    off_t data_size;            /* Total # of bytes of data */
128    int unit_size;              /* # of bytes for one element
129                                 * (if necessary, on average) */
130
131    map_t map;                  /* Map function pointer, must be user defined */
132    reduce_t reduce;            /* If NULL, identity reduce function is used,
133                                 * which emits a keyval pair for each val. */
134    combiner_t combiner;        /* If NULL, no combiner would be called. */
135    splitter_t splitter;        /* If NULL, the array splitter is used.*/
136    locator_t locator;          /* If NULL, no locality based optimization is
137                                   performed. */
138    key_cmp_t key_cmp;          /* Key comparison function.
139                                   Must be user defined.*/
140
141    final_data_t *result;       /* Pointer to output data.
142                                 * Must be allocated by user */
143
144    /*** Optional arguments must be zero if not used ***/
145    partition_t partition;      /* Default partition function is a
146                                 * hash function */
147
148    /* Creates one emit queue for each reduce task,
149    * instead of per reduce thread. This improves
150    * time to emit if data is emitted in order,
151    * but can increase merge time. */
152    bool use_one_queue_per_task;
153
154    int L1_cache_size;     /* Size of L1 cache in bytes */
155    int num_map_threads;   /* # of threads to run map tasks on.
156                                 * Default is one per processor */
157    int num_reduce_threads;     /* # of threads to run reduce tasks on.
158    * Default is one per processor */
159    int num_merge_threads;      /* # of threads to run merge tasks on.
160    * Default is one per processor */
161    int num_procs;              /* Maximum number of processors to use. */
162
163    int proc_offset;            /* number of procs to skip for thread binding */
164                                /* (useful if you have multiple MR's running
165                                 *  and you don't want them binding to the same
166                                 *  hardware thread). */
167
168    float key_match_factor;     /* Magic number that describes the ratio of
169    * the input data size to the output data size.
170    * This is used as a hint. */
171} map_reduce_args_t;
172
173/* Runtime defined functions. */
174
/* Initializes the MapReduce runtime. Call once per process.
 * NOTE(review): before C23 an empty parameter list declares no prototype,
 * so argument mistakes are not diagnosed; consider (void) here and below
 * once the definitions are confirmed to take no arguments. */
int map_reduce_init();

/* Finalizes the MapReduce runtime. Call once per process. */
int map_reduce_finalize();
180
181/* The main MapReduce engine. This is the function called by the application.
182 * It is responsible for creating and scheduling all map and reduce tasks, and
183 * also organizes and maintains the data which is passed from application to
184 * map tasks, map tasks to reduce tasks, and reduce tasks back to the
185 * application. Results are stored in args->result. A return value less than zero
186 * represents an error. This function is not thread safe.
187 */
188int map_reduce (map_reduce_args_t * args);
189
/* Call from within the map function. Stores a key of key_size bytes and a
 * value in the intermediate queues, to be processed by a reduce task. The
 * runtime calls the partition function to assign the key to a reduce task.
 */
void emit_intermediate(void *key, void *val, int key_size);
196
/* Call from within the reduce function. Stores a key and a value in the
 * reduce queue; the pair will appear in the final result array.
 */
void emit(void *key, void *val);
201
/* Built-in partition function, implemented as a hash. It is exported so
 * that a user-defined partition function can call it.
 */
int default_partition(int reduce_tasks, void *key, int key_size);
206
207#endif // MAP_REDUCE_H_
208