1/* Copyright (c) 2007-2009, Stanford University
2* All rights reserved.
3*
4* Redistribution and use in source and binary forms, with or without
5* modification, are permitted provided that the following conditions are met:
6*      * Redistributions of source code must retain the above copyright
7*         notice, this list of conditions and the following disclaimer.
8*      * Redistributions in binary form must reproduce the above copyright
9*         notice, this list of conditions and the following disclaimer in the
10*         documentation and/or other materials provided with the distribution.
11*      * Neither the name of Stanford University nor the names of its
12*         contributors may be used to endorse or promote products derived from
13*         this software without specific prior written permission.
14*
15* THIS SOFTWARE IS PROVIDED BY STANFORD UNIVERSITY ``AS IS'' AND ANY
16* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18* DISCLAIMED. IN NO EVENT SHALL STANFORD UNIVERSITY BE LIABLE FOR ANY
19* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25*/
26
/* Enable the timing instrumentation wrapped in #ifdef TIMING in main(),
 * which prints per-phase durations to stderr. Defined ahead of the project
 * headers (NOTE(review): presumably so stddefines.h also sees it — confirm). */
#define TIMING
28
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <math.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <string.h>
#include <unistd.h>

#include "stddefines.h"
#include "map_reduce.h"
38
/* Default problem size; each value can be overridden on the command line
 * (-p points, -c clusters, -d dimension, -s grid size). */
enum {
    DEF_NUM_POINTS = 100000,
    DEF_NUM_MEANS  = 100,
    DEF_DIM        = 3,
    DEF_GRID_SIZE  = 1000
};

#define false 0
#define true 1

int num_points;   /* number of vectors */
int dim;          /* number of coordinates in each vector */
int num_means;    /* number of clusters */
int grid_size;    /* coordinates are drawn from [0, grid_size) */
int modified;     /* set when a map pass moves any point to another cluster */
int num_pts = 0;  /* running total of points handed out by kmeans_splitter */
53
54typedef struct {
55    int *points;
56    keyval_t *means;  // each mean is an index and a coordinate.
57    int *clusters;
58    int next_point;
59    int unit_size;
60} kmeans_data_t;
61
62kmeans_data_t kmeans_data;
63
64typedef struct {
65    int length;
66    int *points;
67    keyval_t *means;
68    int *clusters;
69} kmeans_map_data_t;
70
71/** dump_means()
72 *  Helper function to Print out the mean values
73 */
74void dump_means(keyval_t *means, int size) {
75    int i,j;
76    for (i = 0; i < size; i++)
77    {
78        for (j = 0; j < dim; j++)
79        {
80            dprintf("%5d ", ((int *)means[i].val)[j]);
81        }
82        dprintf("\n");
83    }
84
85}
86
87/** dump_points()
88 *  Helper function to print out the points
89 */
90void dump_points(int *vals, int rows)
91{
92    int i, j;
93
94    for (i = 0; i < rows; i++)
95    {
96        for (j = 0; j < dim; j++)
97        {
98            dprintf("%5d ",vals[i * dim + j]);
99        }
100        dprintf("\n");
101    }
102}
103
104/** parse_args()
105 *  Parse the user arguments
106 */
107void parse_args(int argc, char **argv)
108{
109    int c;
110    extern char *optarg;
111    extern int optind;
112
113    num_points = DEF_NUM_POINTS;
114    num_means = DEF_NUM_MEANS;
115    dim = DEF_DIM;
116    grid_size = DEF_GRID_SIZE;
117
118    while ((c = getopt(argc, argv, "d:c:p:s:")) != EOF)
119    {
120        switch (c) {
121            case 'd':
122                dim = atoi(optarg);
123                break;
124            case 'c':
125                num_means = atoi(optarg);
126                break;
127            case 'p':
128                num_points = atoi(optarg);
129                break;
130            case 's':
131                grid_size = atoi(optarg);
132                break;
133            case '?':
134                printf("Usage: %s -d <vector dimension> -c <num clusters> -p <num points> -s <max value>\n", argv[0]);
135                exit(1);
136        }
137    }
138
139    if (dim <= 0 || num_means <= 0 || num_points <= 0 || grid_size <= 0) {
140        printf("Illegal argument value. All values must be numeric and greater than 0\n");
141        exit(1);
142    }
143
144    printf("Dimension = %d\n", dim);
145    printf("Number of clusters = %d\n", num_means);
146    printf("Number of points = %d\n", num_points);
147    printf("Size of each dimension = %d\n", grid_size);
148}
149
150/** generate_points()
151 *  Generate the points
152 */
153void generate_points(int *pts, int size)
154{
155    int i, j;
156
157    for (i=0; i<size; i++)
158    {
159        for (j=0; j<dim; j++)
160        {
161            pts[i * dim + j] = rand() % grid_size;
162        }
163    }
164}
165
166/** generate_means()
167 *  Compute the means for the various clusters
168 */
169void generate_means(keyval_t *means, int size)
170{
171    int i, j;
172
173    for (i=0; i<size; i++)
174    {
175        *((int *)(means[i].key)) = i;
176
177        for (j=0; j<dim; j++)
178        {
179            ((int *)(means[i].val))[j] = rand() % grid_size;
180        }
181    }
182}
183
184/** get_sq_dist()
185 *  Get the squared distance between 2 points
186 */
187static inline unsigned int get_sq_dist(int *v1, int *v2)
188{
189    int i;
190    int p1, p2;
191
192    unsigned int sum = 0;
193    for (i = 0; i < dim; i++)
194    {
195        p1 = v1[i];
196        p2 = v2[i];
197        sum += (p1 - p2) * (p1 - p2);
198    }
199    return sum;
200}
201
202/** add_to_sum()
203 *	Helper function to update the total distance sum
204 */
205void add_to_sum(int *sum, int *point)
206{
207    int i;
208
209    for (i = 0; i < dim; i++)
210    {
211        sum[i] += point[i];
212    }
213}
214
/** mykeycmp()
 *  Three-way comparison of two int keys: -1, 0 or 1 as *s1 is less than,
 *  equal to, or greater than *s2.
 */
int mykeycmp(const void *s1, const void *s2)
{
    const int lhs = *(const int *)s1;
    const int rhs = *(const int *)s2;

    return (lhs > rhs) - (lhs < rhs);
}
224
225/** find_clusters()
226 *  Find the cluster that is most suitable for a given set of points
227 */
228void find_clusters(int *points, keyval_t *means, int *clusters, int size)
229{
230    int i, j;
231    unsigned int min_dist, cur_dist;
232    int min_idx;
233
234    for (i = 0; i < size; i++)
235    {
236        min_dist = get_sq_dist(&points[i * dim], (int *)(means[0].val));
237        min_idx = 0;
238        for (j = 1; j < num_means; j++)
239        {
240            cur_dist = get_sq_dist(&points[i * dim], (int *)(means[j].val));
241            if (cur_dist < min_dist)
242            {
243                min_dist = cur_dist;
244                min_idx = j;
245            }
246        }
247
248        if (clusters[i] != min_idx)
249        {
250            clusters[i] = min_idx;
251            modified = true;
252        }
253        //dprintf("Emitting [%d,%d]\n", *((int *)means[min_idx].key), *(points[i]));
254        emit_intermediate(means[min_idx].key, (void *)(&points[i * dim]), sizeof(means[min_idx].key));
255    }
256}
257
258/** kmeans_splitter()
259 *
260 * Assigns one or more points to each map task
261 */
262int kmeans_splitter(void *data_in, int req_units, map_args_t *out)
263{
264    kmeans_data_t *kmeans_data = (kmeans_data_t *)data_in;
265    kmeans_map_data_t *out_data;
266
267    assert (data_in);
268    assert (out);
269    assert (kmeans_data->points);
270    assert (kmeans_data->means);
271    assert (kmeans_data->clusters);
272    assert (req_units);
273
274    if (kmeans_data->next_point >= num_points) return 0;
275
276    out_data = (kmeans_map_data_t *)malloc(sizeof(kmeans_map_data_t));
277    out->length = 1;
278    out->data = (void *)out_data;
279
280    out_data->points = (void *)(&(kmeans_data->points[kmeans_data->next_point * dim]));
281    out_data->means = kmeans_data->means;
282    out_data->clusters = (void *)(&(kmeans_data->clusters[kmeans_data->next_point]));
283    kmeans_data->next_point += req_units;
284    if (kmeans_data->next_point >= num_points)
285    {
286        out_data->length = num_points - kmeans_data->next_point + req_units;
287    }
288    else out_data->length = req_units;
289
290    num_pts += out_data->length;
291    // Return true since the out data is valid
292    return 1;
293}
294
295/** kmeans_locator()
296 *
297 * Returns which memory this map task would be accessing.
298 */
299void *kmeans_locator (map_args_t *task)
300{
301    assert (task);
302
303    kmeans_map_data_t *out_data = task->data;
304
305    return out_data->points;
306}
307
308/** kmeans_map()
309 * Finds the cluster that is most suitable for a given set of points
310 *
311 */
312void kmeans_map(map_args_t *args)
313{
314    assert(args);
315    assert(args->length == 1);
316
317    kmeans_map_data_t *map_data = args->data;
318    find_clusters(map_data->points, map_data->means, map_data->clusters, map_data->length);
319    free(args->data);
320}
321
322/** kmeans_reduce()
323 *	Updates the sum calculation for the various points
324 */
325void kmeans_reduce(void *key_in, iterator_t *itr)
326{
327    assert (key_in);
328    assert (itr);
329
330    int i;
331    int *sum;
332    int *mean;
333    void *val;
334    int vals_len = iter_size (itr);
335
336    sum = (int *)calloc(dim, sizeof(int));
337    mean = (int *)malloc(dim * sizeof(int));
338
339    i = 0;
340    while (iter_next (itr, &val))
341    {
342        add_to_sum (sum, val);
343        ++i;
344    }
345    assert (i == vals_len);
346
347    for (i = 0; i < dim; i++)
348    {
349        mean[i] = sum[i] / vals_len;
350    }
351
352    free(sum);
353    emit(key_in, (void *)mean);
354}
355
356int main(int argc, char **argv)
357{
358    final_data_t kmeans_vals;
359    map_reduce_args_t map_reduce_args;
360    int i;
361    int *means;
362    bool first_run;
363
364    struct timeval begin, end;
365#ifdef TIMING
366    unsigned int library_time = 0;
367    unsigned int inter_library_time = 0;
368#endif
369
370    get_time (&begin);
371
372    parse_args(argc, argv);
373
374    // get points
375    kmeans_data.points = (int *)malloc(sizeof(int) * num_points * dim);
376    generate_points(kmeans_data.points, num_points);
377
378    // get means
379    kmeans_data.means = (keyval_t *)malloc(sizeof(keyval_t) * num_means);
380    means = malloc(sizeof(int) * dim * num_means);
381    for (i=0; i<num_means; i++) {
382        kmeans_data.means[i].val = &means[i * dim];
383        kmeans_data.means[i].key = malloc(sizeof(void *));
384    }
385    generate_means(kmeans_data.means, num_means);
386
387    kmeans_data.next_point = 0;
388    kmeans_data.unit_size = sizeof(int) * dim;
389
390    kmeans_data.clusters = (int *)malloc(sizeof(int) * num_points);
391    memset(kmeans_data.clusters, -1, sizeof(int) * num_points);
392
393    modified = true;
394
395    CHECK_ERROR (map_reduce_init ());
396
397    // Setup map reduce args
398    memset(&map_reduce_args, 0, sizeof(map_reduce_args_t));
399    map_reduce_args.task_data = &kmeans_data;
400    map_reduce_args.map = kmeans_map;
401    map_reduce_args.reduce = kmeans_reduce;
402    map_reduce_args.splitter = kmeans_splitter;
403    map_reduce_args.locator = kmeans_locator;
404    map_reduce_args.key_cmp = mykeycmp;
405    map_reduce_args.unit_size = kmeans_data.unit_size;
406    map_reduce_args.partition = NULL; // use default
407    map_reduce_args.result = &kmeans_vals;
408    map_reduce_args.data_size = (num_points + num_means) * dim * sizeof(int);
409    map_reduce_args.L1_cache_size = atoi(GETENV("MR_L1CACHESIZE"));//1024 * 8;
410    map_reduce_args.num_map_threads = atoi(GETENV("MR_NUMTHREADS"));//8;
411    map_reduce_args.num_reduce_threads = atoi(GETENV("MR_NUMTHREADS"));//16;
412    map_reduce_args.num_merge_threads = atoi(GETENV("MR_NUMTHREADS"));//8;
413    map_reduce_args.num_procs = atoi(GETENV("MR_NUMPROCS"));//16;
414    map_reduce_args.key_match_factor = (float)atof(GETENV("MR_KEYMATCHFACTOR"));//2;
415    map_reduce_args.use_one_queue_per_task = true;
416
417    printf("KMeans: Calling MapReduce Scheduler\n");
418
419    get_time (&end);
420
421#ifdef TIMING
422    fprintf (stderr, "initialize: %u\n", time_diff (&end, &begin));
423#endif
424
425    first_run = true;
426    while (modified == true)
427    {
428        modified = false;
429        kmeans_data.next_point = 0;
430        //dprintf(".");
431
432        get_time (&begin);
433        CHECK_ERROR (map_reduce (&map_reduce_args) < 0);
434        get_time (&end);
435
436#ifdef TIMING
437        library_time += time_diff (&end, &begin);
438#endif
439
440        get_time (&begin);
441        for (i = 0; i < kmeans_vals.length; i++)
442        {
443            int mean_idx = *((int *)(kmeans_vals.data[i].key));
444            if (first_run == false)
445                free(kmeans_data.means[mean_idx].val);
446            kmeans_data.means[mean_idx] = kmeans_vals.data[i];
447        }
448        if (kmeans_vals.length > 0) free(kmeans_vals.data);
449        get_time (&end);
450
451#ifdef TIMING
452        inter_library_time += time_diff (&end, &begin);
453#endif
454        first_run = false;
455    }
456
457#ifdef TIMING
458    fprintf (stderr, "library: %u\n", library_time);
459    fprintf (stderr, "inter library: %u\n", inter_library_time);
460#endif
461
462    get_time (&begin);
463
464    CHECK_ERROR (map_reduce_finalize ());
465
466    dprintf("\n");
467    printf("KMeans: MapReduce Completed\n");
468
469    dprintf("\n\nFinal means:\n");
470    dump_means(kmeans_data.means, num_means);
471
472    free(kmeans_data.points);
473
474    for (i = 0; i < num_means; i++)
475    {
476        free(kmeans_data.means[i].key);
477        free(kmeans_data.means[i].val);
478    }
479    free (kmeans_data.means);
480    free (means);
481
482    free(kmeans_data.clusters);
483
484    get_time (&end);
485
486#ifdef TIMING
487    fprintf (stderr, "finalize: %u\n", time_diff (&end, &begin));
488#endif
489
490    return 0;
491}
492