1/**
2 * \file
3 * \brief RCCE library
4 */
5
6/*
7 * Copyright (c) 2009, 2010, 2012, ETH Zurich.
8 * All rights reserved.
9 *
10 * This file is distributed under the terms in the attached LICENSE file.
11 * If you do not find this file, copies can be found by writing to:
12 * ETH Zurich D-INFK, Universitaetstrasse 6, CH-8092 Zurich. Attn: Systems Group.
13 */
14
15#include <ctype.h>
16#include <RCCE_lib.h>
17#include <barrelfish/barrelfish.h>
18#include <barrelfish/nameservice_client.h>
19#include <barrelfish/spawn_client.h>
20
21#include "internal.h"
22
23static bool core_present[MAX_CPUS] = {false};
24coreid_t bsp_id;
25static bool request_done = false;
26coreid_t my_core_id;
27coreid_t num_cores = 0;
28static char my_name[100];
29struct rcce_binding *barray[MAX_CPUS] = {NULL};
30static struct rcce_binding *connect_request = NULL;
31static uintptr_t connect_state = 0;
32
33static void error_reply(struct rcce_binding *st, errval_t err, uint64_t state);
34static void init_request(struct rcce_binding *st, coreid_t id, coreid_t bspid,
35                         uint64_t state, struct capref shared_mem);
36
37static struct rcce_rx_vtbl rcce_vtbl = {
38    .init_request = init_request,
39    .error_reply = error_reply,
40};
41
42static void client_connected(void *st, errval_t err, struct rcce_binding *b)
43{
44    struct rcce_state *cs = st;
45    assert(err_is_ok(err));
46
47    /* printf("%s: Am connected to client\n", my_name); */
48
49    b->rx_vtbl = rcce_vtbl;
50    b->st = cs;
51
52    // Create a Frame Capability
53    size_t allocated_size;
54    struct capref shared_mem;
55    errval_t r = frame_alloc(&shared_mem, BULK_SIZE * 2, &allocated_size);
56    assert(err_is_ok(r));
57
58    // Map the frame in local memory
59    void *pool;
60    r = vspace_map_one_frame_attr(&pool, allocated_size, shared_mem,
61                                  BULK_PAGE_MAP, NULL, NULL);
62    assert(pool != NULL);
63    assert(err_is_ok(r));
64    assert(allocated_size >= BULK_SIZE * 2);
65
66    // Init sender
67    err = bulk_init(pool, BULK_SIZE, BLOCK_SIZE, &cs->bt);
68    assert(err_is_ok(err));
69
70    // Init receiver
71    err = bulk_slave_init(pool + BULK_SIZE, BULK_SIZE, &cs->btr);
72    assert(err_is_ok(err));
73
74    barrier_binding_init(b);
75    barray[cs->index] = b;
76
77    err = barray[cs->index]->tx_vtbl.init_request(barray[cs->index],
78                                                  NOP_CONT, my_core_id, bsp_id,
79                                                  (uint64_t)(uintptr_t)cs,
80                                                  shared_mem);
81    assert(err_is_ok(err));
82}
83
84static void connect(coreid_t idx)
85{
86    errval_t err;
87    char id[100];
88    snprintf(id, sizeof(id), "%s%d", my_name, idx);
89
90    iref_t iref;
91    err = nameservice_blocking_lookup(id, &iref);
92    if (err_is_fail(err)) {
93        DEBUG_ERR(err, "nameservice_blocking_lookup failed");
94        abort();
95    }
96    assert(iref != 0);
97
98    struct rcce_state *st = malloc(sizeof(struct rcce_state));
99    assert(st != NULL);
100    memset(st, 0, sizeof(struct rcce_state));
101    st->index = idx;
102    st->request_done = false;
103
104    /* printf("%s: rcce_bind\n", my_name); */
105
106    err = rcce_bind(iref, client_connected, st, get_default_waitset(),
107                    IDC_BIND_FLAGS_DEFAULT);
108    assert(err_is_ok(err));
109
110    /* printf("%s: waiting\n", my_name); */
111
112    while (!st->request_done) {
113        messages_wait_and_handle_next();
114    }
115
116    /* printf("%s: done\n", my_name); */
117}
118
119static void init_request(struct rcce_binding *st, coreid_t id, coreid_t bspid,
120                         uint64_t state, struct capref shared_mem)
121{
122    errval_t err;
123    struct rcce_state *rs = st->st;
124
125    // Initialize local state for incoming connection
126    barray[id] = st;
127    bsp_id = bspid;
128
129    // Map the frame in local memory
130    void *pool;
131    err = vspace_map_one_frame_attr(&pool, BULK_SIZE * 2, shared_mem,
132                                    BULK_PAGE_MAP, NULL, NULL);
133    assert(pool != NULL);
134    assert(err_is_ok(err));
135
136    // Init receiver
137    err = bulk_slave_init(pool, BULK_SIZE, &rs->btr);
138    assert(err_is_ok(err));
139
140    // Init sender
141    err = bulk_init(pool + BULK_SIZE, BULK_SIZE, BLOCK_SIZE, &rs->bt);
142    assert(err_is_ok(err));
143
144    if(connect_request == NULL && my_core_id != bspid) {
145        connect_request = st;
146        connect_state = state;
147    } else {
148        err = st->tx_vtbl.error_reply(st, NOP_CONT, SYS_ERR_OK, state);
149        if (err_is_fail(err)) {
150            DEBUG_ERR(err, "init_reply failed");
151            abort();
152        }
153    }
154}
155
156static void error_reply(struct rcce_binding *st, errval_t err, uint64_t state)
157{
158    struct rcce_state *cs = (struct rcce_state *)(uintptr_t)state;
159    if(err_is_fail(err)) {
160        DEBUG_ERR(err, "error_reply");
161    }
162    assert(err_is_ok(err));
163    if(cs != NULL) {
164        assert(!cs->request_done);
165        cs->request_done = true;
166    } else {
167        assert(!request_done);
168        request_done = true;
169    }
170}
171
172static void _listening(void *st, errval_t err, iref_t iref)
173{
174    assert(err_is_ok(err));
175
176    /* printf("%s: listening\n", my_name); */
177
178    /* Register the service with the nameserver */
179    char serv[100];
180    snprintf(serv, sizeof(serv), "%s%d", my_name, my_core_id);
181
182    err = nameservice_register(serv, iref);
183    if (err_is_fail(err)) {
184        DEBUG_ERR(err, "nameservice_register failed");
185        abort();
186    }
187
188    /* printf("%s: registered '%s'\n", my_name, serv); */
189
190    assert(!request_done);
191    request_done = true;
192}
193
194static errval_t _connected(void *st, struct rcce_binding *b)
195{
196    /* printf("%s: connected\n", my_name); */
197    b->st = malloc(sizeof(struct rcce_state));
198    assert(b->st != NULL);
199    memset(b->st, 0, sizeof(struct rcce_state));
200    b->rx_vtbl = rcce_vtbl;
201    barrier_binding_init(b);
202    return SYS_ERR_OK;
203}
204
205static void monitor_reply(struct monitor_binding *st, errval_t msgerr)
206{
207    assert(!request_done);
208    assert(err_is_ok(msgerr));
209    request_done = true;
210}
211
212static void set_present(char *str)
213{
214    while (*str != '\0') {
215        if (!isdigit((int)*str)) {
216            str++;
217            continue;
218        }
219        int num = strtol(str, &str, 10);
220        if (num < MAX_CPUS) {
221            num_cores++;
222            core_present[num] = true;
223        }
224    }
225}
226
227void setup_routes(int argc, char **argv)
228{
229    errval_t err;
230    struct monitor_binding *st = get_monitor_binding();
231
232    /* printf("%s: setup_routes\n", argv[0]); */
233
234    /* Set core id */
235    my_core_id = disp_get_core_id();
236    strcpy(my_name, argv[0]);
237
238    // Get number of cores
239    coreid_t cores = atoi(argv[1]);
240
241    // Get list of present cores
242    for(int i = 3; i < argc; i++) {
243        set_present(argv[i]);
244    }
245
246    if (strcmp(argv[argc - 1], "dummy")) { /* bsp core */
247        // Spawn all copies
248        bsp_id = my_core_id;
249
250        /* Spawn on all cores */
251        char *spawnargv[argc + 2];
252        for (int i = 0; i < argc; i++) {
253            spawnargv[i] = argv[i];
254        }
255        spawnargv[argc] = "dummy";
256        spawnargv[argc + 1] = NULL;
257        for(coreid_t i = 0; i < MAX_CPUS; i++) {
258            if(core_present[i] && i != my_core_id) {
259                err = spawn_program(i, my_name, spawnargv, NULL,
260                                    SPAWN_FLAGS_DEFAULT, NULL);
261                assert(err_is_ok(err));
262            }
263        }
264    }
265
266    /* printf("%s: exporting service\n", argv[0]); */
267    /* Setup a server */
268    request_done = false;
269    err = rcce_export(NULL, _listening, _connected, get_default_waitset(),
270                      IDC_EXPORT_FLAGS_DEFAULT);
271    if (err_is_fail(err)) {
272        DEBUG_ERR(err, "rcce_export failed");
273        abort();
274    }
275    while (!request_done) {
276        event_dispatch(get_default_waitset());
277    }
278
279    if (strcmp(argv[argc - 1], "dummy")) { /* bsp core */
280        for (coreid_t i = 0; i < MAX_CPUS; i++) {
281            /* Connect to all cores */
282            if (core_present[i] && i != my_core_id && barray[i] == NULL) {
283                /* printf("%s: connecting to core %d\n", argv[0], i); */
284                connect(i);
285            }
286        }
287    } else {
288        /* printf("%s: waiting for connection\n", argv[0]); */
289        // Wait for an incoming connection request
290        while(connect_request == NULL) {
291            event_dispatch(get_default_waitset());
292        }
293
294        /* Connect to all cores to which we have not connected already */
295        for (coreid_t i = 0; i < MAX_CPUS; i++) {
296            if (core_present[i] && i != my_core_id && barray[i] == NULL) {
297                /* printf("%s: slave connecting to core %d\n", argv[0], i); */
298                connect(i);
299            }
300        }
301
302        /* printf("%s: sending connect reply\n", argv[0]); */
303        // Send the reply back
304        err = connect_request->tx_vtbl.
305            error_reply(connect_request, NOP_CONT, SYS_ERR_OK, connect_state);
306        if (err_is_fail(err)) {
307            DEBUG_ERR(err, "init_reply failed");
308            abort();
309        }
310    }
311
312    /* printf("%s: done\n", argv[0]); */
313
314    // Determine maximum core ID
315    coreid_t maxcore = 0;
316    for(coreid_t i = 0; i < MAX_CPUS; i++) {
317        if(core_present[i]) {
318            maxcore = i;
319        }
320    }
321
322    barriers_init(maxcore + 1);
323}
324