1#include <stdio.h>
2#include <string.h>
3#include <barrelfish/barrelfish.h>
4
5#include "common.h"
6
7#define USE_DEFWAITSET 1
8
9#define NO_COPY_MOVE_BACK 1
10#define NO_COPY_MOVE_BACK_WITH_COPY 0
11
12#define DEBUG(x...) do {} while (0)
13//#define DEBUG(x...) debug_printf("echo: " x)
14
15#define BUFSZ 0x1000
16#define NUMBUFS 500
17
18static struct bulk_allocator txalloc;
19
20static uint32_t ctrl_value = 0;
21
22static volatile uint8_t wait_flag = 0;
23
24static void panic_handler(void *arg, errval_t err, struct bulk_channel *chan)
25{
26    expect_success(err);
27}
28
29static void wait_handler(void *arg, errval_t err, struct bulk_channel *chan)
30{
31    if (!err_is_ok(err)) {
32        err_print_calltrace(err);
33    }
34    expect_success(err);
35    wait_flag = 1;
36}
37
38static struct bulk_continuation panic_cont = {
39    .handler = panic_handler,
40    .arg = NULL, };
41
42static struct bulk_continuation wait_cont = {
43    .handler = wait_handler,
44    .arg = NULL, };
45
46static struct bulk_channel rxc, txc;
47
48static errval_t cb_pool_assigned(struct bulk_channel *channel,
49                                 struct bulk_pool *pool)
50{
51    if (channel == &rxc) {
52        debug_printf("pool_assigned: RX %p [%d,%d,%d]\n", pool,
53                     pool->id.machine, pool->id.dom, pool->id.local);
54        //there is a race condition between the two channels, so we have to check it here
55        while (txc.state != BULK_STATE_CONNECTED) {
56            event_dispatch(txc.waitset);
57        }
58        wait_flag = 0;
59        if (!is_no_copy) {
60            expect_success(bulk_channel_assign_pool(&txc, pool, wait_cont));
61            //XXX: there is still a possible race condition, if we are in receive master mode,
62            //but in that case, we don't expect to get a pool over this channel anyway
63            while (!wait_flag)
64                event_dispatch(txc.waitset);       //wait until pool is assigned
65        }
66    } else {
67        debug_printf("pool_assigned: TX %p [%d,%d,%d]\n", pool,
68                     pool->id.machine, pool->id.dom, pool->id.local);
69    }
70
71    if (is_no_copy) {
72        struct bulk_pool_constraints pool_constraints = {
73            .range_min = 0,
74            .range_max = 0,
75            .alignment = 0,
76            .trust = txc.trust, };
77        expect_success(bulk_alloc_init(&txalloc, NUMBUFS, BUFSZ, &pool_constraints));
78        DEBUG("TX Pool alloc: %p\n", txalloc.pool);
79
80        wait_flag = 0;
81        expect_success(bulk_channel_assign_pool(&txc, txalloc.pool, wait_cont));
82        while (!wait_flag)
83            event_dispatch(txc.waitset);       //wait until pool is assigned
84    }
85
86    return SYS_ERR_OK;
87}
88
89static void cb_move_received(struct bulk_channel *channel,
90                             struct bulk_buffer *buffer,
91                             void *meta)
92{
93    static unsigned count = 0;
94    DEBUG("move_received: %d b->p=%p\n", count, buffer->pool);
95    count++;
96    ctrl_value += *((uint32_t *) buffer->address);
97    assert(channel == &rxc);
98    if (is_no_copy) {
99        if (NO_COPY_MOVE_BACK) {
100            struct bulk_buffer *reply = bulk_alloc_new_buffer(&txalloc);
101            assert(reply);
102            if (NO_COPY_MOVE_BACK_WITH_COPY) {
103                memcpy(reply->address, buffer->address,
104                       buffer->pool->buffer_size);
105            }
106            *((uint32_t *) reply->address) = *((uint32_t *) buffer->address) +1;
107            expect_success(bulk_channel_move(&txc, reply, meta, panic_cont));
108        }
109        expect_success(bulk_channel_pass(&rxc, buffer, meta, panic_cont));
110    } else {
111        *((uint32_t *) buffer->address) = *((uint32_t *) buffer->address) +1;
112        expect_success(bulk_channel_move(&txc, buffer, meta, panic_cont));
113    }
114}
115
116static void cb_buffer_received(struct bulk_channel *channel,
117                               struct bulk_buffer *buffer,
118                               void *meta)
119{
120    static unsigned count = 0;
121    DEBUG("buffer_received: %d b->p=%p\n", count, buffer->pool);
122    count++;
123    assert(channel == &txc);
124    if (is_no_copy) {
125        expect_success(bulk_alloc_return_buffer(&txalloc, buffer));
126    } else {
127        expect_success(bulk_channel_pass(&rxc, buffer, meta, panic_cont));
128    }
129}
130
131static void init(void)
132{
133    static struct bulk_allocator rxalloc;
134    struct bulk_buffer *buf;
135    size_t i;
136    debug_printf("init: enter\n");
137
138    if (rxc.role == BULK_ROLE_MASTER) {
139        // If we're in receive master mode, we need to allocate and pass buffers
140        //set the trust level we want in our pool from the start
141        struct bulk_pool_constraints pool_constraints = {
142            .range_min = 0,
143            .range_max = 0,
144            .alignment = 0,
145            .trust = rxc.trust, };
146        expect_success(bulk_alloc_init(&rxalloc, NUMBUFS, BUFSZ, &pool_constraints));
147        DEBUG("RX Pool alloc: %p\n", rxalloc.pool);
148
149        wait_flag = 0;
150        expect_success(bulk_channel_assign_pool(&rxc, rxalloc.pool, wait_cont));
151        while (!wait_flag)
152            event_dispatch(rxc.waitset);
153
154        wait_flag = 0;
155        expect_success(bulk_channel_assign_pool(&txc, rxalloc.pool, wait_cont));
156        while (!wait_flag)
157            event_dispatch(txc.waitset);
158
159        for (i = 0; i < NUMBUFS; i++) {
160            buf = bulk_alloc_new_buffer(&rxalloc);
161            assert(buf != NULL);
162            expect_success(bulk_channel_pass(&rxc, buf, NULL, panic_cont));
163        }
164    }
165    debug_printf("init: done\n");
166}
167
168static struct bulk_channel_callbacks cb = {
169    .bind_received = cb_bind_received,
170    .pool_assigned = cb_pool_assigned,
171    .move_received = cb_move_received,
172    .buffer_received = cb_buffer_received, };
173
174int main(int argc, char *argv[])
175{
176    struct waitset *ws;
177#if !USE_DEFWAITSET
178    struct waitset l_ws;
179    waitset_init(&l_ws);
180    ws = &l_ws;
181#else
182    ws = get_default_waitset();
183#endif
184
185    bool rx_done = false, tx_done = false;
186
187    debug_printf("bulk echo service starting\n");
188    assert(argc == 3);
189    debug_printf("Initialzing RX channel... [%s]\n", argv[1]);
190    initialize_channel(argv[1], &rxc, &cb, ws, BULK_DIRECTION_RX, BUFSZ, 0,
191                       &rx_done);
192    debug_printf("Initialzing TX channel... [%s]\n", argv[2]);
193    initialize_channel(argv[2], &txc, &cb, ws, BULK_DIRECTION_TX, BUFSZ, 0,
194                       &tx_done);
195
196    printf("Benchmark Server Ready!\n");
197    while (!rx_done || !tx_done) {
198        event_dispatch(ws);
199    }
200
201    init();
202    while (1) {
203        event_dispatch(ws);
204    }
205
206    return 0;
207}
208
209