/**
 * \file
 * \brief Ring-based barrier synchronization and message primitives for RCCE.
 */

/*
 * Copyright (c) 2009, 2010, ETH Zurich.
 * All rights reserved.
 *
 * This file is distributed under the terms in the attached LICENSE file.
 * If you do not find this file, copies can be found by writing to:
 * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
 */

#include <string.h>
#include "internal.h"

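/*
 * The participating cores form a logical ring ordered by core ID and
 * rooted at the BSP. A barrier passes a token (a ring_request message)
 * around this ring: the coordinator injects it, every other core waits
 * for it and forwards it to its ring successor.
 */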
enum role_type {
    MSG_WAIT_MSG_WAIT,      // coordinator: opens each round
    WAIT_MSG_WAIT_MSG,      // everyone else: waits for the token first
};

#define COORDINATOR (bsp_id)
static enum role_type my_role;
static bool round;                   // this round's ring_request has arrived
static coreid_t experiment_max_cpus; // number of cores in the ring

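/**
 * \brief Determine this core's role: the BSP coordinates and opens each
 *        barrier round; every other core waits for the token first.
 */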
static void barrier_init(void)
{
    round = false;

    /* Determine roles as per my_core_id */
    if (my_core_id == COORDINATOR) {
        my_role = MSG_WAIT_MSG_WAIT;
    } else {
        my_role = WAIT_MSG_WAIT_MSG;
    }
}

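/**
 * \brief Receive handler for ring_reply: a bare ack with no payload,
 *        so there is nothing to do.
 */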
static void ring_reply(struct rcce_binding *st)
{
    /* nop */
}

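/**
 * \brief Receive handler for ring_request: the token from our ring
 *        predecessor has arrived. Complete this round and ack the sender.
 */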
static void ring_request(struct rcce_binding *st)
{
    assert(!round);
    round = true;

    errval_t err = st->tx_vtbl.ring_reply(st, NOP_CONT);
    if (err_is_fail(err)) {
        DEBUG_ERR(err, "send ring reply");
        abort();
    }
}

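/**
 * \brief Send a ring_request on the binding \p arg, re-registering itself
 *        as the send continuation while the channel is busy.
 */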
static void ring_request_cont(void *arg)
{
    struct rcce_binding *b = arg;
    errval_t err = b->tx_vtbl.ring_request(b, NOP_CONT);
    if (err_is_fail(err)) {
        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
            err = b->register_send(b, get_default_waitset(),
                                   MKCONT(ring_request_cont, b));
            assert(err_is_ok(err));
            return;
        }
        DEBUG_ERR(err, "send ring request");
        abort();
    }
}

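/**
 * \brief Pass the token to our ring successor: scan core IDs upwards from
 *        our own (wrapping modulo experiment_max_cpus, relative to bsp_id)
 *        and send a ring_request to the first core with a binding.
 */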
static void message(void)
{
    for (coreid_t i = bsp_id + ((my_core_id - bsp_id + 1) % experiment_max_cpus);
         i != my_core_id;
         i = bsp_id + ((i - bsp_id + 1) % experiment_max_cpus)) {
        if (barray[i] != NULL) {
            ring_request_cont(barray[i]);
            return;
        }
    }
    assert(!"Should not get here");
}

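// Per-sender receive buffers, indexed by source core ID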
struct msg_buf msgbuf[MAX_CPUS];

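/**
 * \brief Receive handler for a message sent via UMP: copy the payload into
 *        the sender's slot of msgbuf and mark it pending.
 */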
static void message_request(struct rcce_binding *st, uint16_t coreid,
                            const uint8_t *msg, size_t size)
{
    assert(!msgbuf[coreid].pending);
    struct msg_buf *m = &msgbuf[coreid];

    m->msg = memdup(msg, size);
    m->length = size;
    m->pending = true;
    m->bulk = false;
    dprintf("%d: msg arrived, (%d, %zu)\n", my_core_id, coreid, size);
}

#ifdef BULK_TRANSFER_ENABLED
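/**
 * \brief Receive handler for one fragment of a bulk-transfer message:
 *        append the fragment to the sender's buffer, mark the message
 *        pending once the last fragment is in, and ack the fragment.
 */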
static void bulk_message_request(struct rcce_binding *b, uint16_t coreid,
                                 uint64_t id, uint64_t size,
                                 uint8_t last_fragment)
{
    struct rcce_state *st = b->st;
    assert(!msgbuf[coreid].pending);
    struct msg_buf *m = &msgbuf[coreid];
    assert(m->bulk_ready);
    assert(m->length == size);
    void *buf = bulk_slave_buf_get_mem(&st->btr, id, NULL);
    size_t copysize = last_fragment ? size - m->current : BLOCK_SIZE;

    memcpy(m->msg + m->current, buf, copysize);
    m->current += copysize;

    if(last_fragment) {
        m->pending = true;
        m->bulk = true;
        m->id = id;
        m->bulk_ready = false;
    }

    errval_t err = barray[coreid]->tx_vtbl.
        bulk_message_reply(barray[coreid], NOP_CONT, my_core_id, id);
    assert(err_is_ok(err));
}
#endif

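/**
 * \brief Ack for a UMP message: the receiver is done with it, so clear
 *        the sender-side waitmsg flag and let the next send proceed.
 */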
static void message_reply(struct rcce_binding *b, uint16_t coreid)
{
    struct rcce_state *st = b->st;
    assert(st->waitmsg == true);
    st->waitmsg = false;
}

#ifdef BULK_TRANSFER_ENABLED
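/**
 * \brief Ack for a bulk fragment: release the bulk buffer and clear the
 *        sender-side bulk_waitmsg flag.
 */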
static void bulk_message_reply(struct rcce_binding *b, uint16_t coreid,
                               uint64_t id)
{
    struct rcce_state *st = b->st;
    errval_t err = bulk_free(&st->bt, id);
    assert(err_is_ok(err));
    assert(st->bulk_waitmsg == true);
    st->bulk_waitmsg = false;
}
#endif

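/**
 * \brief Send continuation for message_request: the payload has been
 *        handed to the channel, so send_message may stop spinning.
 */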
static void message_request_cont(void *arg)
{
    struct rcce_state *st = arg;
    assert(!st->request_done);
    st->request_done = true;
}

#ifdef BULK_TRANSFER_ENABLED
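/**
 * \brief The receiver has posted a buffer for the next bulk message.
 */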
static void bulk_recv_ready(struct rcce_binding *b, uint16_t coreid,
                            uint64_t size)
{
    struct rcce_state *st = b->st;

    assert(!st->recv_ready);
    st->recv_ready = true;
    dprintf("bulk_recv_ready\n");
}
#endif

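/*
 * With RCCE_PERF_MEASURE defined, PERF(x) stores an rdtsc timestamp in
 * slot x of the current dispatcher's timestamp array and PERFM(x) passes
 * its argument through; otherwise both compile to nothing.
 */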
#ifdef RCCE_PERF_MEASURE
#       include <barrelfish/dispatcher_arch.h>
#       include <barrelfish/curdispatcher_arch.h>
#       define PERF(x)  d->timestamp[x] = rdtsc()
#       define PERFM(x) x
#else
#       define PERF(x)
#       define PERFM(x)
#endif

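/**
 * \brief Synchronously send \p size bytes at \p msg to core \p dest.
 *
 * Blocks until the receiver has consumed any previous message, then sends
 * either a single UMP message or, with BULK_TRANSFER_ENABLED, a sequence
 * of BLOCK_SIZE bulk fragments, each individually acknowledged.
 */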
errval_t send_message(char *msg, size_t size, coreid_t dest)
{
    assert(barray[dest] != NULL);
    struct rcce_state *st = barray[dest]->st;
    errval_t err;

#ifdef RCCE_PERF_MEASURE
    dispatcher_handle_t handle = curdispatcher();
    struct dispatcher_shared_generic* d =
        get_dispatcher_shared_generic(handle);
#endif

    dprintf("%d: S(%zu,%d,%p,%d)\n", my_core_id, size, dest, st, st->waitmsg);

#ifdef BULK_TRANSFER_ENABLED
    // XXX: For performance we would like every message to fit into a single
    // bulk buffer (size <= BLOCK_SIZE), but larger messages are legal and
    // are fragmented into BLOCK_SIZE chunks below.
#endif

    PERF(0);

    // Wait until the previous message has been processed by the receiver
#ifdef BULK_TRANSFER_ENABLED
    while(st->waitmsg || st->bulk_waitmsg || !st->recv_ready) {
#else
    while(st->waitmsg) {
#endif
        dprintf("waiting\n");
        messages_wait_and_handle_next();
    }
#ifdef BULK_TRANSFER_ENABLED
    st->recv_ready = false;
#endif

    PERF(1);

#ifndef BULK_TRANSFER_ENABLED
    st->waitmsg = true;
    // Send via UMP
    st->request_done = false;
    PERF(2);
    err = barray[dest]->
        tx_vtbl.message_request(barray[dest], MKCONT(message_request_cont, st),
                                my_core_id, (uint8_t *)msg, size);
    assert(err_is_ok(err));
    PERF(16);
    while(!st->request_done) {
        messages_wait_and_handle_next();
    }
    PERF(17);
#else
    // Send via bulk transfer
    for(size_t i = 0; i < size; i += BLOCK_SIZE) {
        struct bulk_buf *bb = bulk_alloc(&st->bt);
        assert(bb != NULL);
        void *buf = bulk_buf_get_mem(bb);
        size_t sendsize = i + BLOCK_SIZE < size ? BLOCK_SIZE : size - i;
        bool last_fragment = i + BLOCK_SIZE >= size;

        memcpy(buf, msg + i, sendsize);
        uintptr_t id = bulk_prepare_send(bb);
        st->bulk_waitmsg = true;
        err = barray[dest]->tx_vtbl.
            bulk_message_request(barray[dest], NOP_CONT, my_core_id, id,
                                 size, last_fragment);
        assert(err_is_ok(err));
        while(st->bulk_waitmsg) {
            dprintf("waiting for bulk reply\n");
            messages_wait_and_handle_next();
        }
    }
#endif

    return SYS_ERR_OK;
}

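/**
 * \brief Block until this round's ring_request has arrived, then reset
 *        the round flag for the next round.
 */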
static void wait(void)
{
    while (!round) {
        messages_wait_and_handle_next();
    }
    round = false;
}

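/**
 * \brief Global barrier: the token circulates the ring twice. The first
 *        pass tells the coordinator that every core has arrived; the
 *        second releases them, so no core leaves the barrier early.
 */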
void barrier_wait(void)
{
    switch(my_role) {
    case MSG_WAIT_MSG_WAIT:
        message();
        wait();
        message();
        wait();
        break;

    case WAIT_MSG_WAIT_MSG:
        wait();
        message();
        wait();
        message();
        break;

    default:
        assert(!"should not get here");
    }
}

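/**
 * \brief Install this module's receive handlers on a new binding.
 */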
void barrier_binding_init(struct rcce_binding *binding)
{
    binding->rx_vtbl.ring_request = ring_request;
    binding->rx_vtbl.ring_reply   = ring_reply;
    binding->rx_vtbl.message_request = message_request;
    binding->rx_vtbl.message_reply = message_reply;
#ifdef BULK_TRANSFER_ENABLED
    binding->rx_vtbl.bulk_message_request = bulk_message_request;
    binding->rx_vtbl.bulk_message_reply = bulk_message_reply;
    binding->rx_vtbl.bulk_recv_ready = bulk_recv_ready;
#endif
}

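/**
 * \brief Initialize the barrier module for \p max_cpus participating cores.
 */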
void barriers_init(coreid_t max_cpus)
{
    experiment_max_cpus = max_cpus;

    barrier_init();
}