/* Copyright (c) 2007-2009, Stanford University
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*     * Redistributions of source code must retain the above copyright
*       notice, this list of conditions and the following disclaimer.
*     * Redistributions in binary form must reproduce the above copyright
*       notice, this list of conditions and the following disclaimer in the
*       documentation and/or other materials provided with the distribution.
*     * Neither the name of Stanford University nor the
*       names of its contributors may be used to endorse or promote products
*       derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY STANFORD UNIVERSITY ``AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL STANFORD UNIVERSITY BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#ifndef MAP_REDUCE_H_
#define MAP_REDUCE_H_

#ifdef BARRELFISH
# include <barrelfish/barrelfish.h>
#endif

#include <sys/types.h>  /* off_t (map_reduce_args_t.data_size) */
#include <stdbool.h>    /* bool (map_reduce_args_t.use_one_queue_per_task) */
#include <stdint.h>     /* intptr_t (map_args_t.length); keeps this header
                         * self-contained instead of relying on other
                         * headers to pull it in. */

/* Standard data types for the function arguments and results */

/* Argument to map function. This is specified by the splitter function.
 * length - number of elements of data. The default splitter function gives
 *          length in terms of the # of elements of unit_size bytes.
 * data   - data to process of a user defined type
 */
typedef struct
{
    intptr_t length;
    void *data;
} map_args_t;

/* Single element of result
 * key - pointer to the key
 * val - pointer to the value
 */
typedef struct
{
    void *key;
    void *val;
} keyval_t;

/* List of results
 * length - number of key value pairs
 * data   - array of key value pairs
 */
typedef struct
{
    int length;
    keyval_t *data;
} final_data_t;

/* Scheduler function pointer type definitions */

/* Map function takes in map_args_t, as supplied by the splitter,
 * and emit_intermediate() should be called on any key value pairs
 * in the intermediate result set.
 */
typedef void (*map_t)(map_args_t *args);

/* Opaque iterator over the values associated with one key. It is handed to
 * the reduce and combiner functions; its layout is private to the runtime. */
struct iterator_t;
typedef struct iterator_t iterator_t;

/* Fetches the next value from itr into *val.
 * NOTE(review): the return-value convention (presumably nonzero while a value
 * was produced, 0 when exhausted) is defined by the runtime — confirm. */
int iter_next (iterator_t *itr, void **val);

/* Returns the number of values reachable through itr.
 * NOTE(review): semantics defined by the runtime — confirm whether this is
 * the total count or the count remaining. */
int iter_size (iterator_t *itr);

/* Reduce function takes in a key pointer and an iterator over the values
 * emitted for that key. emit() should be called on any key value pairs
 * in the result set.
 */
typedef void (*reduce_t)(void *key, iterator_t *itr);

/* Combiner function takes in an iterator for a particular key,
 * and returns a reduced value. The operation should be identical to the
 * reduce function, except that this function returns the reduced value
 * directly. */
typedef void *(*combiner_t)(iterator_t *itr);

/* Splitter function takes in a pointer to the input data, an integer giving
 * the number of bytes requested, and a pointer to a map_args_t that the
 * splitter fills in with the next chunk. The splitter should return 1 if the
 * result is valid or 0 if there is no more data.
 */
typedef int (*splitter_t)(void *task_data, int req_size, map_args_t *out);

/* Locator function takes in a pointer to map_args_t, and returns
 * the memory address where this map task would be heavily accessing.
 * The runtime would then try to dispatch the task to a thread that
 * is nearby the physical memory that backs the address. */
typedef void *(*locator_t)(map_args_t *args);

/* Partition function takes in the number of reduce tasks, a pointer to
 * a key, and the length of the key in bytes. It assigns a key to a reduce
 * task. The value returned is the # of the reduce task where the key will be
 * processed. This value should be the same for all keys that are equal.
 */
typedef int (*partition_t)(int reduce_tasks, void *key, int key_size);

/* key_cmp(key1, key2) returns:
 *   0 if key1 == key2
 *   + if key1 >  key2
 *   - if key1 <  key2
 */
typedef int (*key_cmp_t)(const void *key1, const void *key2);

/* The arguments to operate the runtime. */
typedef struct
{
    void *task_data;          /* The data to run MapReduce on.
                               * If splitter is NULL, this should be an array. */
    off_t data_size;          /* Total # of bytes of data */
    int unit_size;            /* # of bytes for one element
                               * (if necessary, on average) */

    map_t map;                /* Map function pointer, must be user defined */
    reduce_t reduce;          /* If NULL, identity reduce function is used,
                               * which emits a keyval pair for each val. */
    combiner_t combiner;      /* If NULL, no combiner is called. */
    splitter_t splitter;      /* If NULL, the array splitter is used. */
    locator_t locator;        /* If NULL, no locality based optimization is
                               * performed. */
    key_cmp_t key_cmp;        /* Key comparison function.
                               * Must be user defined. */

    final_data_t *result;     /* Pointer to output data.
                               * Must be allocated by user */

    /*** Optional arguments must be zero if not used ***/
    partition_t partition;    /* Default partition function is a
                               * hash function */

    /* Creates one emit queue for each reduce task,
     * instead of per reduce thread. This improves
     * time to emit if data is emitted in order,
     * but can increase merge time. */
    bool use_one_queue_per_task;

    int L1_cache_size;        /* Size of L1 cache in bytes */
    int num_map_threads;      /* # of threads to run map tasks on.
                               * Default is one per processor */
    int num_reduce_threads;   /* # of threads to run reduce tasks on.
                               * Default is one per processor */
    int num_merge_threads;    /* # of threads to run merge tasks on.
                               * Default is one per processor */
    int num_procs;            /* Maximum number of processors to use. */

    int proc_offset;          /* number of procs to skip for thread binding
                               * (useful if you have multiple MR's running
                               * and you don't want them binding to the same
                               * hardware thread). */

    float key_match_factor;   /* Magic number that describes the ratio of
                               * the input data size to the output data size.
                               * This is used as a hint. */
} map_reduce_args_t;

/* Runtime defined functions. */

/* MapReduce initialization function. Called once per process.
 * Declared with (void): an empty parameter list in C before C23 means
 * "unspecified arguments" and disables argument checking at call sites. */
int map_reduce_init (void);

/* MapReduce finalization function. Called once per process. */
int map_reduce_finalize (void);

/* The main MapReduce engine. This is the function called by the application.
 * It is responsible for creating and scheduling all map and reduce tasks, and
 * also organizes and maintains the data which is passed from application to
 * map tasks, map tasks to reduce tasks, and reduce tasks back to the
 * application. Results are stored in args->result. A return value less than
 * zero represents an error. This function is not thread safe.
 */
int map_reduce (map_reduce_args_t *args);

/* This should be called from the map function. It stores a key with key_size
 * bytes and a value in the intermediate queues for processing by the reduce
 * task. The runtime will call the partition function to assign the key to a
 * reduce task.
 */
void emit_intermediate (void *key, void *val, int key_size);

/* This should be called from the reduce function. It stores a key and a value
 * in the reduce queue. This will be in the final result array.
 */
void emit (void *key, void *val);

/* This is the built in partition function which is a hash. It is global so
 * the user defined partition function can call it.
 */
int default_partition (int reduce_tasks, void *key, int key_size);

#endif /* MAP_REDUCE_H_ */