1/* Copyright (c) 2007-2009, Stanford University 2* All rights reserved. 3* 4* Redistribution and use in source and binary forms, with or without 5* modification, are permitted provided that the following conditions are met: 6* * Redistributions of source code must retain the above copyright 7* notice, this list of conditions and the following disclaimer. 8* * Redistributions in binary form must reproduce the above copyright 9* notice, this list of conditions and the following disclaimer in the 10* documentation and/or other materials provided with the distribution. 11* * Neither the name of Stanford University nor the names of its 12* contributors may be used to endorse or promote products derived from 13* this software without specific prior written permission. 14* 15* THIS SOFTWARE IS PROVIDED BY STANFORD UNIVERSITY ``AS IS'' AND ANY 16* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 17* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 18* DISCLAIMED. IN NO EVENT SHALL STANFORD UNIVERSITY BE LIABLE FOR ANY 19* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 20* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 21* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 22* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 23* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 24* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 25*/ 26 27#define TIMING 28 29#include <stdio.h> 30#include <string.h> 31#include <stddef.h> 32#include <stdlib.h> 33#include <assert.h> 34#include <string.h> 35#include <math.h> 36#include "stddefines.h" 37#include "map_reduce.h" 38 39#define DEF_NUM_POINTS 100000 40#define DEF_NUM_MEANS 100 41#define DEF_DIM 3 42#define DEF_GRID_SIZE 1000 43 44#define false 0 45#define true 1 46 47int num_points; // number of vectors 48int dim; // Dimension of each vector 49int num_means; // number of clusters 50int grid_size; // size of each dimension of vector space 51int modified; 52int num_pts = 0; 53 54typedef struct { 55 int *points; 56 keyval_t *means; // each mean is an index and a coordinate. 57 int *clusters; 58 int next_point; 59 int unit_size; 60} kmeans_data_t; 61 62kmeans_data_t kmeans_data; 63 64typedef struct { 65 int length; 66 int *points; 67 keyval_t *means; 68 int *clusters; 69} kmeans_map_data_t; 70 71/** dump_means() 72 * Helper function to Print out the mean values 73 */ 74void dump_means(keyval_t *means, int size) { 75 int i,j; 76 for (i = 0; i < size; i++) 77 { 78 for (j = 0; j < dim; j++) 79 { 80 dprintf("%5d ", ((int *)means[i].val)[j]); 81 } 82 dprintf("\n"); 83 } 84 85} 86 87/** dump_points() 88 * Helper function to print out the points 89 */ 90void dump_points(int *vals, int rows) 91{ 92 int i, j; 93 94 for (i = 0; i < rows; i++) 95 { 96 for (j = 0; j < dim; j++) 97 { 98 dprintf("%5d ",vals[i * dim + j]); 99 } 100 dprintf("\n"); 101 } 102} 103 104/** parse_args() 105 * Parse the user arguments 106 */ 107void parse_args(int argc, char **argv) 108{ 109 int c; 110 extern char *optarg; 111 extern int optind; 112 113 num_points = DEF_NUM_POINTS; 114 num_means = DEF_NUM_MEANS; 115 dim = DEF_DIM; 116 grid_size = DEF_GRID_SIZE; 117 118 while ((c = getopt(argc, argv, "d:c:p:s:")) != EOF) 119 { 120 switch (c) { 121 case 'd': 122 dim = atoi(optarg); 123 break; 124 case 'c': 125 num_means = atoi(optarg); 126 break; 127 case 'p': 128 num_points = atoi(optarg); 129 break; 130 case 's': 131 grid_size = atoi(optarg); 132 break; 133 case '?': 134 printf("Usage: %s -d <vector dimension> -c <num clusters> -p <num points> -s <max value>\n", argv[0]); 135 exit(1); 136 } 137 } 138 139 if (dim <= 0 || num_means <= 0 || num_points <= 0 || grid_size <= 0) { 140 printf("Illegal argument value. All values must be numeric and greater than 0\n"); 141 exit(1); 142 } 143 144 printf("Dimension = %d\n", dim); 145 printf("Number of clusters = %d\n", num_means); 146 printf("Number of points = %d\n", num_points); 147 printf("Size of each dimension = %d\n", grid_size); 148} 149 150/** generate_points() 151 * Generate the points 152 */ 153void generate_points(int *pts, int size) 154{ 155 int i, j; 156 157 for (i=0; i<size; i++) 158 { 159 for (j=0; j<dim; j++) 160 { 161 pts[i * dim + j] = rand() % grid_size; 162 } 163 } 164} 165 166/** generate_means() 167 * Compute the means for the various clusters 168 */ 169void generate_means(keyval_t *means, int size) 170{ 171 int i, j; 172 173 for (i=0; i<size; i++) 174 { 175 *((int *)(means[i].key)) = i; 176 177 for (j=0; j<dim; j++) 178 { 179 ((int *)(means[i].val))[j] = rand() % grid_size; 180 } 181 } 182} 183 184/** get_sq_dist() 185 * Get the squared distance between 2 points 186 */ 187static inline unsigned int get_sq_dist(int *v1, int *v2) 188{ 189 int i; 190 int p1, p2; 191 192 unsigned int sum = 0; 193 for (i = 0; i < dim; i++) 194 { 195 p1 = v1[i]; 196 p2 = v2[i]; 197 sum += (p1 - p2) * (p1 - p2); 198 } 199 return sum; 200} 201 202/** add_to_sum() 203 * Helper function to update the total distance sum 204 */ 205void add_to_sum(int *sum, int *point) 206{ 207 int i; 208 209 for (i = 0; i < dim; i++) 210 { 211 sum[i] += point[i]; 212 } 213} 214 215/** mykeycmp() 216 * Key comparison function 217 */ 218int mykeycmp(const void *s1, const void *s2) 219{ 220 if ( *((int *)s1) < *((int *)s2) ) return -1; 221 else if ( *((int *)s1) > *((int *)s2) ) return 1; 222 else return 0; 223} 224 225/** find_clusters() 226 * Find the cluster that is most suitable for a given set of points 227 */ 228void find_clusters(int *points, keyval_t *means, int *clusters, int size) 229{ 230 int i, j; 231 unsigned int min_dist, cur_dist; 232 int min_idx; 233 234 for (i = 0; i < size; i++) 235 { 236 min_dist = get_sq_dist(&points[i * dim], (int *)(means[0].val)); 237 min_idx = 0; 238 for (j = 1; j < num_means; j++) 239 { 240 cur_dist = get_sq_dist(&points[i * dim], (int *)(means[j].val)); 241 if (cur_dist < min_dist) 242 { 243 min_dist = cur_dist; 244 min_idx = j; 245 } 246 } 247 248 if (clusters[i] != min_idx) 249 { 250 clusters[i] = min_idx; 251 modified = true; 252 } 253 //dprintf("Emitting [%d,%d]\n", *((int *)means[min_idx].key), *(points[i])); 254 emit_intermediate(means[min_idx].key, (void *)(&points[i * dim]), sizeof(means[min_idx].key)); 255 } 256} 257 258/** kmeans_splitter() 259 * 260 * Assigns one or more points to each map task 261 */ 262int kmeans_splitter(void *data_in, int req_units, map_args_t *out) 263{ 264 kmeans_data_t *kmeans_data = (kmeans_data_t *)data_in; 265 kmeans_map_data_t *out_data; 266 267 assert (data_in); 268 assert (out); 269 assert (kmeans_data->points); 270 assert (kmeans_data->means); 271 assert (kmeans_data->clusters); 272 assert (req_units); 273 274 if (kmeans_data->next_point >= num_points) return 0; 275 276 out_data = (kmeans_map_data_t *)malloc(sizeof(kmeans_map_data_t)); 277 out->length = 1; 278 out->data = (void *)out_data; 279 280 out_data->points = (void *)(&(kmeans_data->points[kmeans_data->next_point * dim])); 281 out_data->means = kmeans_data->means; 282 out_data->clusters = (void *)(&(kmeans_data->clusters[kmeans_data->next_point])); 283 kmeans_data->next_point += req_units; 284 if (kmeans_data->next_point >= num_points) 285 { 286 out_data->length = num_points - kmeans_data->next_point + req_units; 287 } 288 else out_data->length = req_units; 289 290 num_pts += out_data->length; 291 // Return true since the out data is valid 292 return 1; 293} 294 295/** kmeans_locator() 296 * 297 * Returns which memory this map task would be accessing. 298 */ 299void *kmeans_locator (map_args_t *task) 300{ 301 assert (task); 302 303 kmeans_map_data_t *out_data = task->data; 304 305 return out_data->points; 306} 307 308/** kmeans_map() 309 * Finds the cluster that is most suitable for a given set of points 310 * 311 */ 312void kmeans_map(map_args_t *args) 313{ 314 assert(args); 315 assert(args->length == 1); 316 317 kmeans_map_data_t *map_data = args->data; 318 find_clusters(map_data->points, map_data->means, map_data->clusters, map_data->length); 319 free(args->data); 320} 321 322/** kmeans_reduce() 323 * Updates the sum calculation for the various points 324 */ 325void kmeans_reduce(void *key_in, iterator_t *itr) 326{ 327 assert (key_in); 328 assert (itr); 329 330 int i; 331 int *sum; 332 int *mean; 333 void *val; 334 int vals_len = iter_size (itr); 335 336 sum = (int *)calloc(dim, sizeof(int)); 337 mean = (int *)malloc(dim * sizeof(int)); 338 339 i = 0; 340 while (iter_next (itr, &val)) 341 { 342 add_to_sum (sum, val); 343 ++i; 344 } 345 assert (i == vals_len); 346 347 for (i = 0; i < dim; i++) 348 { 349 mean[i] = sum[i] / vals_len; 350 } 351 352 free(sum); 353 emit(key_in, (void *)mean); 354} 355 356int main(int argc, char **argv) 357{ 358 final_data_t kmeans_vals; 359 map_reduce_args_t map_reduce_args; 360 int i; 361 int *means; 362 bool first_run; 363 364 struct timeval begin, end; 365#ifdef TIMING 366 unsigned int library_time = 0; 367 unsigned int inter_library_time = 0; 368#endif 369 370 get_time (&begin); 371 372 parse_args(argc, argv); 373 374 // get points 375 kmeans_data.points = (int *)malloc(sizeof(int) * num_points * dim); 376 generate_points(kmeans_data.points, num_points); 377 378 // get means 379 kmeans_data.means = (keyval_t *)malloc(sizeof(keyval_t) * num_means); 380 means = malloc(sizeof(int) * dim * num_means); 381 for (i=0; i<num_means; i++) { 382 kmeans_data.means[i].val = &means[i * dim]; 383 kmeans_data.means[i].key = malloc(sizeof(void *)); 384 } 385 generate_means(kmeans_data.means, num_means); 386 387 kmeans_data.next_point = 0; 388 kmeans_data.unit_size = sizeof(int) * dim; 389 390 kmeans_data.clusters = (int *)malloc(sizeof(int) * num_points); 391 memset(kmeans_data.clusters, -1, sizeof(int) * num_points); 392 393 modified = true; 394 395 CHECK_ERROR (map_reduce_init ()); 396 397 // Setup map reduce args 398 memset(&map_reduce_args, 0, sizeof(map_reduce_args_t)); 399 map_reduce_args.task_data = &kmeans_data; 400 map_reduce_args.map = kmeans_map; 401 map_reduce_args.reduce = kmeans_reduce; 402 map_reduce_args.splitter = kmeans_splitter; 403 map_reduce_args.locator = kmeans_locator; 404 map_reduce_args.key_cmp = mykeycmp; 405 map_reduce_args.unit_size = kmeans_data.unit_size; 406 map_reduce_args.partition = NULL; // use default 407 map_reduce_args.result = &kmeans_vals; 408 map_reduce_args.data_size = (num_points + num_means) * dim * sizeof(int); 409 map_reduce_args.L1_cache_size = atoi(GETENV("MR_L1CACHESIZE"));//1024 * 8; 410 map_reduce_args.num_map_threads = atoi(GETENV("MR_NUMTHREADS"));//8; 411 map_reduce_args.num_reduce_threads = atoi(GETENV("MR_NUMTHREADS"));//16; 412 map_reduce_args.num_merge_threads = atoi(GETENV("MR_NUMTHREADS"));//8; 413 map_reduce_args.num_procs = atoi(GETENV("MR_NUMPROCS"));//16; 414 map_reduce_args.key_match_factor = (float)atof(GETENV("MR_KEYMATCHFACTOR"));//2; 415 map_reduce_args.use_one_queue_per_task = true; 416 417 printf("KMeans: Calling MapReduce Scheduler\n"); 418 419 get_time (&end); 420 421#ifdef TIMING 422 fprintf (stderr, "initialize: %u\n", time_diff (&end, &begin)); 423#endif 424 425 first_run = true; 426 while (modified == true) 427 { 428 modified = false; 429 kmeans_data.next_point = 0; 430 //dprintf("."); 431 432 get_time (&begin); 433 CHECK_ERROR (map_reduce (&map_reduce_args) < 0); 434 get_time (&end); 435 436#ifdef TIMING 437 library_time += time_diff (&end, &begin); 438#endif 439 440 get_time (&begin); 441 for (i = 0; i < kmeans_vals.length; i++) 442 { 443 int mean_idx = *((int *)(kmeans_vals.data[i].key)); 444 if (first_run == false) 445 free(kmeans_data.means[mean_idx].val); 446 kmeans_data.means[mean_idx] = kmeans_vals.data[i]; 447 } 448 if (kmeans_vals.length > 0) free(kmeans_vals.data); 449 get_time (&end); 450 451#ifdef TIMING 452 inter_library_time += time_diff (&end, &begin); 453#endif 454 first_run = false; 455 } 456 457#ifdef TIMING 458 fprintf (stderr, "library: %u\n", library_time); 459 fprintf (stderr, "inter library: %u\n", inter_library_time); 460#endif 461 462 get_time (&begin); 463 464 CHECK_ERROR (map_reduce_finalize ()); 465 466 dprintf("\n"); 467 printf("KMeans: MapReduce Completed\n"); 468 469 dprintf("\n\nFinal means:\n"); 470 dump_means(kmeans_data.means, num_means); 471 472 free(kmeans_data.points); 473 474 for (i = 0; i < num_means; i++) 475 { 476 free(kmeans_data.means[i].key); 477 free(kmeans_data.means[i].val); 478 } 479 free (kmeans_data.means); 480 free (means); 481 482 free(kmeans_data.clusters); 483 484 get_time (&end); 485 486#ifdef TIMING 487 fprintf (stderr, "finalize: %u\n", time_diff (&end, &begin)); 488#endif 489 490 return 0; 491} 492