1#include <stdio.h>
2#include <string.h>
3#include <lwip/inet.h>
4
5#include<bulk_transfer/bulk_transfer.h>
6#include<bulk_transfer/bulk_allocator.h>
7#include<bulk_transfer/bulk_local.h>
8#include<bulk_transfer/bulk_net_proxy.h>
9
10#include "sleep.h"
11
12#define BUFSZ 0x1000
13#define PORT 1234
14
15static void dummy_cont_cb(void *arg, errval_t err, struct bulk_channel *channel)
16{
17    //printf("DUMMY CONT\n");
18}
19
20struct bulk_continuation dummy_cont = { .arg = NULL, .handler = dummy_cont_cb, };
21
22/******************************************************************************/
23/* Receiver */
24
25#define RXNBUFS 4
26
27static errval_t cb_rx_bind_received(struct bulk_channel *channel);
28static void cb_rx_move_received(struct bulk_channel *channel,
29                                struct bulk_buffer *buffer,
30                                void *meta);
31static void cb_rx_copy_received(struct bulk_channel *channel,
32                                struct bulk_buffer *buffer,
33                                void *meta);
34static errval_t cb_rx_pool_assigned(struct bulk_channel *channel,
35                                    struct bulk_pool *pool);
36
37static struct bulk_channel rx_channel;
38static struct bulk_allocator rx_allocator;
39static struct bulk_channel_callbacks rx_callbacks = { .bind_received =
40    cb_rx_bind_received, .move_received = cb_rx_move_received, .copy_received =
41    cb_rx_copy_received, .pool_assigned = cb_rx_pool_assigned };
42static struct bulk_net_proxy rx_proxy;
43
44static errval_t cb_rx_bind_received(struct bulk_channel *channel)
45{
46    debug_printf("APPL: cb_rx_bind_received\n");
47    return SYS_ERR_OK;
48}
49
50static errval_t cb_rx_pool_assigned(struct bulk_channel *channel,
51                                    struct bulk_pool *pool)
52{
53    debug_printf("APPL: cb_rx_pool_assigned. Checking and reply OK.\n");
54    return SYS_ERR_OK;
55}
56
57static void cb_rx_move_received(struct bulk_channel *channel,
58                                struct bulk_buffer *buffer,
59                                void *meta)
60{
61    uint32_t meta_val = *((uint32_t*) meta);
62    debug_printf("APPL: cb_rx_move_received: %p, meta=%x\n", buffer->address,
63                    meta_val);
64    meta_val++;
65
66    volatile char *e = buffer->address;
67    for (int i = 0; i < buffer->pool->buffer_size; ++i) {
68        assert(e[i] == 123);
69    }
70
71    errval_t err = bulk_channel_pass(channel, buffer, &meta_val, dummy_cont);
72    assert(!err_is_fail(err));
73}
74
75static void cb_rx_copy_received(struct bulk_channel *channel,
76                                struct bulk_buffer *buffer,
77                                void *meta)
78{
79    debug_printf("APPL: cb_rx_copy_received: meta=%x\n", *((uint32_t*) meta));
80    volatile char *e = buffer->address;
81    for (int i = 0; i < buffer->pool->buffer_size; ++i) {
82        assert(e[i] == 101);
83    }
84    errval_t err = bulk_channel_release(channel, buffer, dummy_cont);
85    if (err_is_fail(err)) {
86        DEBUG_ERR(err, "channel release\n");
87        assert(!err_is_fail(err));
88    }
89
90}
91
92static void cb_rx_pool_assign_done(void *arg,
93                                   errval_t err,
94                                   struct bulk_channel *channel)
95{
96    size_t i;
97    struct bulk_buffer *b;
98    uint32_t meta = 0;
99
100    debug_printf("APPL: cb_rx_pool_assign_done()\n");
101    assert(arg == rx_allocator.pool);
102    assert(err_is_ok(err));
103
104    // Register buffers
105    debug_printf("Registering buffers...\n");
106    for (i = 0; i < RXNBUFS; i++) {
107        b = bulk_alloc_new_buffer(&rx_allocator);
108        assert(b != NULL);
109        debug_printf("  b[%p]->p=%"PRIx64"\n", b, b->phys);
110        err = bulk_channel_pass(channel, b, &meta, dummy_cont);
111        if (err_is_fail(err)) {
112            DEBUG_ERR(err, "Chan pass failed %d\n", i);
113        }
114        assert(!err_is_fail(err));
115        event_dispatch_non_block(channel->waitset);
116        event_dispatch_non_block(channel->waitset);
117        event_dispatch_non_block(channel->waitset);
118    }
119    debug_printf("Buffers registered\n");
120}
121
122static void cb_rx_connected(struct bulk_net_proxy *proxy)
123{
124    errval_t err;
125    debug_printf("APPL: cb_rx_connected\n");
126
127    struct bulk_continuation cont = { .handler = cb_rx_pool_assign_done, .arg =
128        rx_allocator.pool };
129
130    err = bulk_channel_assign_pool(&rx_channel, rx_allocator.pool, cont);
131    assert(!err_is_fail(err));
132}
133
134static void init_receiver(struct waitset *waitset)
135{
136    static struct bulk_local_endpoint ep, p_ep;
137    errval_t err;
138    struct bulk_channel_setup setup = { .direction = BULK_DIRECTION_RX,
139                    .role = BULK_ROLE_MASTER,
140                    .meta_size = sizeof(uint32_t),
141                    .waitset = waitset,
142                    .trust = BULK_TRUST_FULL, };
143
144    err = bulk_alloc_init(&rx_allocator, RXNBUFS, BUFSZ, NULL);
145    assert(!err_is_fail(err));
146    bulk_local_init_endpoint(&ep, NULL);
147    err = bulk_channel_create(&rx_channel, &ep.generic, &rx_callbacks, &setup);
148    assert(!err_is_fail(err));
149
150    bulk_local_init_endpoint(&p_ep, &rx_channel);
151    err = bulk_net_proxy_listen(&rx_proxy, &p_ep.generic, waitset, BUFSZ,
152            "e10k", 3, PORT, cb_rx_connected);
153    assert(err_is_ok(err));
154
155}
156
157/******************************************************************************/
158/* Sender */
159
160static errval_t cb_tx_bind_received(struct bulk_channel *channel);
161static void cb_tx_pool_assign_done(void *arg,
162                                   errval_t err,
163                                   struct bulk_channel *channel);
164static void cb_tx_buffer_received(struct bulk_channel *channel,
165                                  struct bulk_buffer *buffer,
166                                  void *meta);
167static void cb_tx_copy_released(struct bulk_channel *channel,
168                                struct bulk_buffer *buffer);
169static errval_t cb_tx_pool_assigned(struct bulk_channel *channel,
170                                    struct bulk_pool *pool);
171
172static struct bulk_channel tx_channel;
173static int tx_phase = 0;
174static struct bulk_allocator tx_allocator;
175static struct bulk_channel_callbacks tx_callbacks = {
176                .bind_received = cb_tx_bind_received,
177                .buffer_received = cb_tx_buffer_received,
178                .copy_released = cb_tx_copy_released,
179                .pool_assigned = cb_tx_pool_assigned, };
180static struct bulk_net_proxy tx_proxy;
181
182static errval_t cb_tx_bind_received(struct bulk_channel *channel)
183{
184    debug_printf("APPL: cb_tx_bind_received\n");
185    return SYS_ERR_OK;
186}
187
188static void cb_tx_pool_assign_done(void *arg,
189                                   errval_t err,
190                                   struct bulk_channel *channel)
191{
192    debug_printf("APPL: cb_tx_pool_assign_done()\n");
193    assert(tx_allocator.pool == arg);
194    debug_printf("Waiting for some time to make sure other side is ready\n");
195    milli_sleep(2000);
196    debug_printf("Sleep done\n");
197    tx_phase = 2;
198}
199
200
201static errval_t cb_tx_pool_assigned(struct bulk_channel *channel,
202                                    struct bulk_pool *pool)
203{
204    debug_printf("APPL: cb_tx_pool_assigned()\n");
205
206    return SYS_ERR_OK;
207}
208
209static void cb_tx_buffer_received(struct bulk_channel *channel,
210                                  struct bulk_buffer *buffer,
211                                  void *meta)
212{
213    debug_printf("APPL: cp_tx_buffer_received: %p, meta=%x\n", buffer->address,
214                    *((uint32_t*) meta));
215    errval_t err = bulk_alloc_return_buffer(&tx_allocator, buffer);
216    if (err_is_fail(err)) {
217        DEBUG_ERR(err, "Returning the buffer\n");
218        assert(!err_is_fail(err));
219    }
220
221    tx_phase = 4;
222}
223
224static void cb_tx_copy_released(struct bulk_channel *channel,
225                                struct bulk_buffer *buffer)
226{
227    debug_printf("APPL: cp_tx_copy_released\n");
228    tx_phase = 6;
229    errval_t err = bulk_alloc_return_buffer(&tx_allocator, buffer);
230    assert(!err_is_fail(err));
231
232}
233
234static void cb_tx_connected(struct bulk_net_proxy *proxy)
235{
236    errval_t err;
237    debug_printf("APPL: cb_tx_connected\n");
238    tx_phase = 1;
239
240    struct bulk_continuation cont = { .handler = cb_tx_pool_assign_done, .arg =
241        tx_allocator.pool };
242
243    err = bulk_channel_assign_pool(&tx_channel, tx_allocator.pool, cont);
244    assert(!err_is_fail(err));
245}
246
247static void init_sender(struct waitset *waitset)
248{
249    static struct bulk_local_endpoint ep, p_ep;
250    errval_t err;
251    struct bulk_channel_setup setup = { .direction = BULK_DIRECTION_TX,
252                    .role = BULK_ROLE_MASTER,
253                    .meta_size = sizeof(uint32_t),
254                    .waitset = waitset,
255                    .trust = BULK_TRUST_FULL, };
256    err = bulk_alloc_init(&tx_allocator, 4, BUFSZ, NULL);
257    assert(!err_is_fail(err));
258
259    bulk_local_init_endpoint(&ep, NULL);
260    err = bulk_channel_create(&tx_channel, &ep.generic, &tx_callbacks, &setup);
261    assert(!err_is_fail(err));
262
263    bulk_local_init_endpoint(&p_ep, &tx_channel);
264    err = bulk_net_proxy_connect(&tx_proxy, &p_ep.generic, waitset, BUFSZ,
265            "e10k", 3, ntohl(inet_addr("192.168.99.2")), PORT, cb_tx_connected);
266    assert(!err_is_fail(err));
267}
268
269static void tx_process(void)
270{
271    errval_t err;
272    uint32_t meta;
273    struct bulk_buffer *buffer;
274
275    if (tx_phase == 2) {
276        meta = 42;
277        debug_printf("APPL: Allocating buffer...\n");
278        buffer = bulk_alloc_new_buffer(&tx_allocator);
279        assert(buffer);
280        memset(buffer->address, 123, buffer->pool->buffer_size);
281        debug_printf("APPL: Starting move... meta=%x\n", meta);
282        err = bulk_channel_move(&tx_channel, buffer, &meta, dummy_cont);
283        if (err_is_fail(err)) {
284            DEBUG_ERR(err, "Bulk Channel Move Failed...\n");
285            assert(!"FOOFO");
286        }
287
288        tx_phase++;
289    } else if (tx_phase == 4) {
290        meta = 44;
291        debug_printf("APPL: Allocating buffer...\n");
292        buffer = bulk_alloc_new_buffer(&tx_allocator);
293        assert(buffer);
294        memset(buffer->address, 101, buffer->pool->buffer_size);
295        debug_printf("APPL: Starting copy... meta=%x\n", meta);
296        err = bulk_channel_copy(&tx_channel, buffer, &meta, dummy_cont);
297        if (err_is_fail(err)) {
298            DEBUG_ERR(err, "Bulk Channel Move Failed...\n");
299            assert(!"FOOFO");
300        }
301
302        tx_phase++;
303    } else if (tx_phase == 6) {
304        debug_printf("DONE.\n");
305        exit(0);
306    }
307
308}
309
310/******************************************************************************/
311/* Test control */
312
313static void print_usage(void)
314{
315    printf("Usage: bulk_netproxy (rx|tx)\n");
316    exit(0);
317}
318
319int main(int argc, char *argv[])
320{
321    if (argc != 2) {
322        print_usage();
323    }
324    bool is_tx = !strcmp(argv[1], "tx");
325    bool is_rx = !strcmp(argv[1], "rx");
326    if (!is_tx && !is_rx) {
327        print_usage();
328    }
329
330    struct waitset *ws = get_default_waitset();
331    debug_printf("bulk_mini: enter\n");
332    if (is_tx) {
333        init_sender(ws);
334        debug_printf("bulk_mini: tx_init done\n");
335    } else if (is_rx) {
336        init_receiver(ws);
337        debug_printf("bulk_mini: rx_init done\n");
338    }
339    while (true) {
340        if (is_tx) {
341            tx_process();
342        }
343        event_dispatch(ws);
344    }
345    return 0;
346}
347