1/*
2 * threadalloc.c
3 *
4 *  Created on: Mar 21, 2011
5 *      Author: scadrian
6 */
7
8#include <stdio.h>
9#include <barrelfish/barrelfish.h>
10#include <barrelfish/nameservice_client.h>
11#include <barrelfish/waitset.h>
12#include <skb/skb.h>
13#include <unistd.h>
14
15#include <barrelfish/curdispatcher_arch.h>
16#include <arch/x86/barrelfish/perfmon.h>
17#include <arch/x86/barrelfish_kpi/perfmon_amd.h>
18
/* Size of the array exercised by writemem() ("Groessi" = German for
 * "size"): 128 MiB. */
#define GROESSI (128*1024*1024)

/* Core count reported by the SKB; defaults to 1 until queried in main(). */
static volatile int nr_available_cores = 1;
/* Number of remote cores the domain has been spanned to so far
 * (incremented from spanned_cb). */
static volatile int nr_spanned_cores = 0;

/* Join state for a batch of worker threads: main() busy-waits until
 * nr_finished_threads catches up with nr_active_threads. */
static volatile int nr_active_threads = 0;
static volatile int nr_finished_threads = 0;
/* rdtsc timestamps bracketing the measured sections. */
static volatile uint64_t end = 0;
static volatile uint64_t start = 0;

/* Lock candidates for nr_finished_threads; only the spinlock s is
 * currently used (see thread_done(), the mutex path is commented out). */
static struct thread_mutex m;
static volatile spinlock_t s = 0;

/* Shared counter for the two-thread increment test, plus its lock
 * candidates (the spinlock variant is commented out in
 * increment_tmp_var(); the mutex mtmp is the one in use). */
static volatile uint64_t tmp_increments = 0;
static spinlock_t stmp = 0;
static struct thread_mutex mtmp;

/* Condition/mutex pair reused for two purposes: first to wait for the
 * domain to span all cores, then (re-initialized) for the increment
 * test. */
static struct thread_cond all_done_condition;
static struct thread_mutex all_done_mutex;

/* Describes one worker thread's slice of a shared buffer. */
struct addr {
    uint8_t *startaddr;  // first byte of the slice
    int size;            // slice length in bytes
};

/* Worker-thread count and total buffer size for the memory benchmarks;
 * each thread gets BUFFER_SIZE / NR_ALLOCATED_THREADS bytes. */
#define NR_ALLOCATED_THREADS 22
//#define BUFFER_SIZE (1024 * 1024 * 1024)
#define BUFFER_SIZE (4 * 1024 * 1024)

static struct addr as[NR_ALLOCATED_THREADS];

/* Increment-test configuration: each thread performs value_per_thread
 * increments; totalincrements is set by main() to the expected grand
 * total (threads * value_per_thread). */
static volatile int totalincrements = 0;
static volatile int value_per_thread = 1000000;
52static int increment_tmp_var(void *arg)
53{
54    for (uint64_t i = 0; i < value_per_thread; i++) {
55//        acquire_spinlock(&stmp);
56        thread_mutex_lock(&mtmp);
57        tmp_increments++;
58//        if (tmp_increments % 10000 == 0) {
59//            printf("tmp_increments: %lu\n", tmp_increments);
60//        }
61        if (tmp_increments == totalincrements) {
62            thread_mutex_lock(&all_done_mutex);
63            thread_cond_signal(&all_done_condition);
64            thread_mutex_unlock(&all_done_mutex);
65        }
66        thread_mutex_unlock(&mtmp);
67//        release_spinlock(&stmp);
68    }
69    return 0;
70}
/* Mark the calling worker thread as finished.
 * Increments the shared nr_finished_threads counter under spinlock s;
 * main() busy-waits on this counter to join a batch of worker threads.
 * (The mutex-based variant and the end-timestamp capture are currently
 * disabled.) */
static void thread_done(void)
{
//    thread_mutex_lock(&m);
    acquire_spinlock(&s);
    nr_finished_threads++;
//    if (nr_finished_threads == nr_active_threads) {
//        end = rdtsc();
//    }
//    thread_mutex_unlock(&m);
    release_spinlock(&s);
}
/* Completion callback for domain_new_dispatcher(): counts spanned
 * dispatchers and, once the domain covers all nr_available_cores - 1
 * remote cores, signals all_done_condition to wake the waiter in main().
 * NOTE(review): main() initializes all_done_mutex/all_done_condition
 * only after the spanning requests are issued, and its
 * thread_cond_wait() has no predicate loop -- if this signal fires
 * before main() starts waiting, the wakeup is lost (classic lost-wakeup
 * race); the wait should be guarded by a condition re-check loop. */
static void spanned_cb(void *arg, errval_t reterr)
{
    assert(err_is_ok(reterr));
    nr_spanned_cores++;
    if (nr_spanned_cores == nr_available_cores - 1) {
        thread_mutex_lock(&all_done_mutex);
        thread_cond_signal(&all_done_condition);
        thread_mutex_unlock(&all_done_mutex);
    }
//    printf("nr_spanned_cores = %d\n", nr_spanned_cores);
}
93
94static int create_skb_channel(void *arg)
95{
96    errval_t err = skb_client_connect();
97    if (err_is_fail(err)) {
98        DEBUG_ERR(err, "connection to the SKB from thread failed. Terminating.\n");
99        return (1);
100    }
101    printf("SKB channel on core %"PRIuCOREID" created\n", disp_get_core_id());
102    thread_done();
103    return (0);
104}
105
106static int initialize_memory(void *arg)
107{
108    struct addr *ptr = (struct addr*)arg;
109    for (int i = 0; i < ptr->size; i++) {
110        ptr->startaddr[i] = 0;
111    }
112    thread_done();
113    return 0;
114}
115
116static int initialize_number(void *arg)
117{
118    struct addr *ptr = (struct addr*)arg;
119    for (int i = 0; i < ptr->size; i++) {
120        ptr->startaddr[i] = i;
121    }
122    thread_done();
123    return 0;
124}
125
126static int sqr(void *arg)
127{
128    struct addr *ptr = (struct addr*)arg;
129    for (int i = 0; i < ptr->size; i++) {
130        ptr->startaddr[i] = ptr->startaddr[i] * ptr->startaddr[i];
131    }
132    thread_done();
133    return 0;
134}
135
136//static int writemem(void *arg)
137//{
138//    printf("very first program:\n");
139//    unsigned char *intarray = (unsigned char *)arg;
140//
141//    for (int i = 0; i < GROESSI; i++) {
142//        intarray[i] = 0;
143//    }
144//    start = rdtsc();
145//    for (int i = 0; i < GROESSI; i++) {
146//        intarray[i] = 0;
147//    }
148//    end = rdtsc();
149//    printf("zykla: %lu\n", end - start);
150//    printf("done.\n");
151//    return 0;
152//}
153
154
155static int writemem(void *arg)
156{
157    printf("very first program:\n");
158    unsigned char *intarray = (unsigned char *)arg;
159    uint64_t *data = (uint64_t*)malloc(sizeof(uint64_t) * GROESSI);
160
161    for (int i = 0; i < GROESSI; i++) {
162        intarray[i] = 0;
163    }
164    for (int i = 0; i < GROESSI; i++) {
165        start = rdtsc();
166        intarray[i] = 0;
167        end = rdtsc();
168        data[i] = end - start;
169    }
170    printf("\n**************start***********\n\n");
171    for (int i = 0; i < GROESSI; i++) {
172        printf("###RES%d\t%lu\n",i, data[i]);
173    }
174    printf("\n**************stop***********\n\n");
175
176//    printf("zykla: %lu\n", end - start);
177    printf("done.\n");
178    return 0;
179}
180
181
/*
 * Benchmark driver: spans the domain across all available cores, runs a
 * contended-counter test, creates per-core SKB channels, and times three
 * parallel passes (zeroing, numbering, squaring) over a shared buffer
 * while sampling an AMD L2 performance counter.  Never returns normally.
 */
int main(int argc, char **argv)
{
    errval_t err;

    printf("%s starting up...\n", argv[0]);

    // Connect the initial dispatcher to the SKB.
    err = skb_client_connect();
    if (err_is_fail(err)) {
        DEBUG_ERR(err, "connection to the SKB failed. Terminating.\n");
        // NOTE(review): returns 0 (success) on a fatal error -- confirm
        // whether a non-zero exit status was intended.
        return (0);
    }

    // Block until the datagatherer service has populated the SKB.
    iref_t iref;
    err = nameservice_blocking_lookup("datagatherer_done", &iref);
    if (err_is_fail(err)) {
        USER_PANIC_ERR(err, "nameservice_blocking_lookup failed");
    }

    // Ask the SKB how many cores exist; on any failure we keep the
    // default of one core.
    err = skb_execute("available_nr_cores(Nr),write(nrcores(Nr)).");
    if (err_is_fail(err)) {
        DEBUG_ERR(err, "Could not determine the number of cores."
                "Stay with one core...\n");
    }
    err = skb_read_output("nrcores(%d)", &nr_available_cores);
    if (err_is_fail(err)) {
        DEBUG_ERR(err, "Could not determine the number of cores <2>."
                "Stay with one core...\n");
    }
    printf("nr available cores: %d\n", nr_available_cores);

    coreid_t my_core_id = disp_get_core_id();

    // Span the domain to every other core; spanned_cb fires once per
    // new dispatcher.
    int curr_core_nr = (my_core_id + 1) % nr_available_cores;
    while (curr_core_nr != my_core_id) {
//    while (curr_core_nr != ((my_core_id + 11)%nr_available_cores)) {
        err = domain_new_dispatcher(curr_core_nr, spanned_cb, NULL);
        if (err_is_fail(err)) {
            DEBUG_ERR(err, "failed to span domain\n");
        }
        assert(err_is_ok(err));
//        printf("new thread on core %d\n", curr_core_nr);
        curr_core_nr = (curr_core_nr + 1) % nr_available_cores;
    }

    printf("ack\n");
    // we need to wait until the domain got spanned to all other cores
    // this means that we need to wait for n-1 callbacks
    // NOTE(review): the mutex/condition are initialized only here,
    // AFTER the spanning requests were issued, and the wait below has
    // no predicate loop -- if all spanned_cb signals fire before
    // thread_cond_wait() is reached, the wakeup is lost and this wait
    // never returns (should loop on nr_spanned_cores).
    thread_mutex_init(&all_done_mutex);
    thread_cond_init(&all_done_condition);

    printf("wait for the domain to be spanned to all other cores...\n");
    thread_mutex_lock(&all_done_mutex);
    thread_cond_wait(&all_done_condition, &all_done_mutex);
    thread_mutex_unlock(&all_done_mutex);
    printf("wait for the domain to be spanned: signalled.\n");



    printf("spanned to all cores!\n");
    // NOTE(review): malloc result is unchecked; writemem() would fault
    // on NULL if this 128 MiB allocation fails.
    unsigned char *intarray = (unsigned char*)malloc(GROESSI);
    printf("allocated.\n");


    // Run the per-store latency benchmark on the next core.
    curr_core_nr = (my_core_id + 1) % nr_available_cores;
    err = domain_thread_create_on(curr_core_nr, writemem, intarray, NULL);
    if (err_is_fail(err)) {
        DEBUG_ERR(err, "could not create thread on core\n");
    }

    // test: two threads that increment the same variable,
    // each one increments it by 100. the result should be
    // 200
    // NOTE(review): stale comment -- each thread actually performs
    // value_per_thread (1000000) increments, 2000000 in total.

    thread_mutex_init(&mtmp);

    thread_mutex_init(&all_done_mutex);
    thread_cond_init(&all_done_condition);

    // Start one incrementer on the next core and one on this core.
    curr_core_nr = (my_core_id + 1) % nr_available_cores;
    err = domain_thread_create_on(curr_core_nr, increment_tmp_var, NULL, NULL);
    if (err_is_fail(err)) {
        DEBUG_ERR(err, "could not create thread on core\n");
    }
    curr_core_nr = (my_core_id) % nr_available_cores;
    err = domain_thread_create_on(curr_core_nr, increment_tmp_var, NULL, NULL);
    if (err_is_fail(err)) {
        DEBUG_ERR(err, "could not create thread on core\n");
    }

    // NOTE(review): totalincrements is published only after the workers
    // have started; they may race past the equality check in
    // increment_tmp_var() while it is still 0.
    totalincrements = value_per_thread * 2;
    printf("set totalincrements to %d\n", totalincrements);
//    thread_mutex_lock(&all_done_mutex);
//    thread_cond_wait(&all_done_condition, &all_done_mutex);
//    thread_mutex_unlock(&all_done_mutex);
    printf("signalled, wait now in while loop.\n");
    // Busy-wait (burns this core) until both workers are done.
    while(tmp_increments < totalincrements);
//    for (int i = 0; i < 5000; i++) {
//        printf(".");
//    }
    // NOTE(review): "%ld" formats a uint64_t as signed long -- should
    // be "%lu"/PRIu64.
    printf("var = %ld\n", tmp_increments);

    // each dispatcher on every core except the original core needs
    // to create a channel to the SKB, otherwise it can't use the SKB
//    nr_active_threads = nr_available_cores - 1;
    nr_active_threads = nr_spanned_cores;
    nr_finished_threads = 0;
    thread_mutex_init(&m);
    s = 0;
    curr_core_nr = (my_core_id + 1) % nr_available_cores;
    while (curr_core_nr != my_core_id) {
        err = domain_thread_create_on(curr_core_nr, create_skb_channel, 0, NULL);
        if (err_is_fail(err)) {
            DEBUG_ERR(err, "could not create thread on core\n");
            break;
        }
        curr_core_nr = (curr_core_nr + 1) % nr_available_cores;
    }
    // Busy-wait until every spanned core has its SKB channel.
    while (nr_finished_threads != nr_active_threads);
    printf("\nall SKB channels created\n");


    // Carve an sbrk'd buffer into equal per-thread slices.
    void *ptr = sbrk(BUFFER_SIZE);
    printf("address = %p\n", ptr);
    void *saddr = ptr;
    for (int i = 0; i < NR_ALLOCATED_THREADS; i++) {
        as[i].startaddr = saddr;
        as[i].size = BUFFER_SIZE / NR_ALLOCATED_THREADS;
        // NOTE(review): arithmetic on void* is a GNU extension, not
        // standard C.
        saddr += (BUFFER_SIZE / NR_ALLOCATED_THREADS);
        printf("start %d: %p, size %d: %d\n", i, as[i].startaddr, i, as[i].size);
    }

    thread_cond_init(&all_done_condition);

    // Program performance counter 0 to count AMD L2 cache misses.
    end = 0;
    dispatcher_handle_t my_dispatcher = curdispatcher();
//    uint64_t event = EVENT_AMD_L2_REQUESTS;
    uint64_t event = EVENT_AMD_L2_CACHE_MISSES;
    uint64_t umask = UMASK_AMD_L2_REQUEST_DC_FILL;
    int counter = 0;
    perfmon_setup(my_dispatcher, counter, event, umask, false);

    // Baseline: zero the whole buffer single-threaded on this core.
    static struct addr as_master;
    as_master.size = BUFFER_SIZE;
    as_master.startaddr = ptr;

    start = rdtsc();
    initialize_memory(&as_master);
    end = rdtsc();
    printf("master thread: %lu cycles\n", end - start);

    // Round 0: zero the buffer with one thread per slice, spread
    // round-robin over the cores; time it and sample the L2 counter.
    start = rdtsc();

    nr_active_threads = NR_ALLOCATED_THREADS;
    nr_finished_threads = 0;
    thread_mutex_init(&m);
    s = 0;
    curr_core_nr = (my_core_id + 1) % nr_available_cores;

    uint64_t start_l2_cache_misses = rdpmc(0);
    for (int i = 0; i < NR_ALLOCATED_THREADS; i++) {
        err = domain_thread_create_on(curr_core_nr, initialize_memory, &as[i], NULL);
        if (err_is_fail(err)) {
            DEBUG_ERR(err, "could not create thread on core\n");
        }
        curr_core_nr = (curr_core_nr + 1) % nr_available_cores;
    }
    while (nr_active_threads != nr_finished_threads);
    uint64_t stop_l2_cache_misses = rdpmc(0);
    printf("Round 0: %lu Cache requests\n", stop_l2_cache_misses - start_l2_cache_misses);
    end = rdtsc();
    printf("round 0: %lu cycles\n", end - start);

    // Round 1: fill each slice with its byte index, same thread layout.
    // NOTE(review): the printfs below still say "Round 0"/"round 0".
    start = rdtsc();

    nr_active_threads = NR_ALLOCATED_THREADS;
    nr_finished_threads = 0;
    thread_mutex_init(&m);
    s = 0;

    curr_core_nr = (my_core_id + 1) % nr_available_cores;
    start_l2_cache_misses = rdpmc(0);
    for (int i = 0; i < NR_ALLOCATED_THREADS; i++) {
        err = domain_thread_create_on(curr_core_nr, initialize_number, &as[i], NULL);
        if (err_is_fail(err)) {
            DEBUG_ERR(err, "could not create thread on core\n");
        }
        curr_core_nr = (curr_core_nr + 1) % nr_available_cores;
    }
    while (nr_active_threads != nr_finished_threads);
    stop_l2_cache_misses = rdpmc(0);
    printf("Round 0: %lu Cache requests\n", stop_l2_cache_misses - start_l2_cache_misses);
    end = rdtsc();
    printf("round 0: %lu cycles\n", end - start);

    // Round 2: square every byte in place, same thread layout.
    start = rdtsc();

    nr_active_threads = NR_ALLOCATED_THREADS;
    nr_finished_threads = 0;
    thread_mutex_init(&m);
    s = 0;

    curr_core_nr = (my_core_id + 1) % nr_available_cores;
    start_l2_cache_misses = rdpmc(0);
    for (int i = 0; i < NR_ALLOCATED_THREADS; i++) {
        err = domain_thread_create_on(curr_core_nr, sqr, &as[i], NULL);
        if (err_is_fail(err)) {
            DEBUG_ERR(err, "could not create thread on core\n");
        }
        curr_core_nr = (curr_core_nr + 1) % nr_available_cores;
    }
    while (nr_active_threads != nr_finished_threads);
    stop_l2_cache_misses = rdpmc(0);
    printf("Round 0: %lu Cache requests\n", stop_l2_cache_misses - start_l2_cache_misses);




    end = rdtsc();
    printf("Start = %lu, end = %lu, Time: %lu\n", start, end, end - start);

    // Keep servicing events so spanned dispatchers and the writemem
    // thread can make progress; this program never exits.
    struct waitset *ws = get_default_waitset();
//    while (nr_spanned_cores != nr_available_cores - 1) {
    while (1) {
        err = event_dispatch(ws);
        if (err_is_fail(err)) {
            DEBUG_ERR(err, "in event_dispatch");
            break;
        }
    }
    // NOTE(review): reached only if event_dispatch fails; then we spin
    // forever without ever returning a value from main.
while(1);
}
415