1#include "common.h"
2
3#include <barrelfish/nameservice_client.h>
4#include <lwip/inet.h>
5
6#include <bulk_transfer/bulk_local.h>
7#include <bulk_transfer/bulk_net_proxy.h>
8#include <bulk_transfer/bulk_sm.h>
9
/* Trust level placed in the .trust field of every channel setup / bind
 * parameter block built in this file. Swap with the commented-out line below
 * to run with BULK_TRUST_NONE instead. */
#define BENCH_TRUST BULK_TRUST_FULL
//#define BENCH_TRUST BULK_TRUST_NONE

/* OR-ed into is_no_copy by the transparent network helpers: when true,
 * is_no_copy is set even if the channel description did not ask for the
 * no-copy variant. It does not change the .no_copy field of the endpoint
 * setup, which always follows the nocopy argument. */
#define FORCE_NO_COPY_SCENARIO false

/* Values assigned to the max_queues, buffer_size and buffer_count fields of
 * struct bulk_net_ep_setup for transparent network endpoints. */
#define BENCH_NET_MAX_QUEUES 5
#define BENCH_NET_BUFFER_SIZE 0x1000
#define BENCH_NET_BUFFER_COUNT 512

/* Written by net_transparent_listen() / net_transparent_connect() with
 * (nocopy || FORCE_NO_COPY_SCENARIO). Has external linkage, so it is
 * presumably read by other benchmark files -- confirm against common.h. */
bool is_no_copy = false;
20
/**
 * Tests whether the string s begins with prefix.
 *
 * Only the first strlen(prefix) characters of s are compared, so an empty
 * prefix matches every string and a string shorter than the prefix never
 * matches.
 */
static inline bool has_prefix(const char *s, const char *prefix)
{
    size_t prefix_len = strlen(prefix);

    return strncmp(s, prefix, prefix_len) == 0;
}
26
27
28static void proxy_connected(struct bulk_net_proxy *p)
29{
30    bool *done = p->user_state;
31    debug_printf("proxy_connected()\n");
32    *done = true;
33}
34
35errval_t cb_bind_received(struct bulk_channel *channel)
36{
37    bool *done = channel->user_state;
38    debug_printf("Bound: %p\n", done);
39    if (done != NULL) {
40        *done = true;
41    }
42    return SYS_ERR_OK;
43}
44
/* TODO: these functions leak memory all over the place, but that is tolerable
 * since we only use them for benchmark initialization. */
47
48// npl:card:queue:port
49static void net_proxy_listen(char *str, struct bulk_channel *channel,
50        struct bulk_channel_callbacks *cb, struct waitset *ws,
51        enum bulk_channel_direction dir,  size_t bufsz, size_t metasz,
52        bool *done)
53{
54    errval_t err;
55    struct bulk_net_proxy *proxy = calloc(1, sizeof(*proxy));
56    struct bulk_local_endpoint *adesc = malloc(sizeof(*adesc));
57    struct bulk_local_endpoint *pdesc = malloc(sizeof(*pdesc));
58    struct bulk_channel_setup setup = {
59        .direction = dir,
60        .role = BULK_ROLE_MASTER,
61        .trust = BENCH_TRUST,
62        .meta_size = metasz,
63        .waitset = ws,
64    };
65
66    // Parse string
67    char *card = strtok(str, ":");
68    uint8_t queue = atoi(strtok(NULL, ":"));
69    uint16_t port = atoi(strtok(NULL, ":"));
70
71    // Prepare local channel
72    bulk_local_init_endpoint(adesc, NULL);
73    err = bulk_channel_create(channel, &adesc->generic, cb, &setup);
74    assert(err_is_ok(err));
75    channel->user_state = NULL;
76
77    // Initialize proxy
78    bulk_local_init_endpoint(pdesc, channel);
79    proxy->user_state = done;
80    err = bulk_net_proxy_listen(proxy, &pdesc->generic, ws, bufsz,
81            card, queue, port, proxy_connected);
82    if (!err_is_ok(err)) {
83        printf("err_string= %s\n", err_getstring(err));
84        err_print_calltrace(err);
85    }
86}
87
88// npc:card:queue:ip:port
89static void net_proxy_connect(char *str, struct bulk_channel *channel,
90        struct bulk_channel_callbacks *cb, struct waitset *ws,
91        enum bulk_channel_direction dir,  size_t bufsz, size_t metasz,
92        bool *done)
93{
94    errval_t err;
95    struct bulk_net_proxy *proxy = calloc(1, sizeof(*proxy));
96    struct bulk_local_endpoint *adesc = malloc(sizeof(*adesc));
97    struct bulk_local_endpoint *pdesc = malloc(sizeof(*pdesc));
98    struct bulk_channel_setup setup = {
99        .direction = dir,
100        .role = BULK_ROLE_MASTER,
101        .trust = BENCH_TRUST,
102        .meta_size = metasz,
103        .waitset = ws,
104    };
105
106    // Parse string
107    char *card = strtok(str, ":");
108    uint8_t queue = atoi(strtok(NULL, ":"));
109    uint32_t ip = ntohl(inet_addr(strtok(NULL, ":")));
110    uint16_t port = atoi(strtok(NULL, ":"));
111
112    // Prepare local channel
113    bulk_local_init_endpoint(adesc, NULL);
114    err = bulk_channel_create(channel, &adesc->generic, cb, &setup);
115    assert(err_is_ok(err));
116    channel->user_state = NULL;
117
118    // Initialize proxy
119    bulk_local_init_endpoint(pdesc, channel);
120    proxy->user_state = done;
121    err = bulk_net_proxy_connect(proxy, &pdesc->generic, ws, bufsz,
122            card, queue, ip, port, proxy_connected);
123    if (!err_is_ok(err)) {
124        printf("err_string= %s\n", err_getstring(err));
125        err_print_calltrace(err);
126    }
127}
128
129// ntl:card:queue:port
130static void net_transparent_listen(char *str, struct bulk_channel *channel,
131        struct bulk_channel_callbacks *cb, struct waitset *ws,
132        enum bulk_channel_direction dir,  size_t bufsz, size_t metasz,
133        bool *done, bool nocopy)
134{
135    struct bulk_net_endpoint_descriptor *epd = malloc(sizeof(*epd));
136
137    is_no_copy = nocopy;
138
139    char *card_str = strtok(str, ":");
140    char *queue_str = strtok(NULL, ":");
141    char *port_str = strtok(NULL, ":");
142
143    struct bulk_net_ep_setup epsetup = {
144        .port = atoi(port_str),
145        .queue = atoi(queue_str),
146        .max_queues = BENCH_NET_MAX_QUEUES,
147        .buffer_size = BENCH_NET_BUFFER_SIZE,
148        .buffer_count = BENCH_NET_BUFFER_COUNT,
149        .cardname = card_str,
150        .no_copy = nocopy,
151    };
152
153    is_no_copy = nocopy || FORCE_NO_COPY_SCENARIO;
154
155    expect_success(bulk_net_ep_create(epd, &epsetup));
156    struct bulk_channel_setup setup = {
157        .direction = dir,
158        .role = (dir == BULK_DIRECTION_TX ? BULK_ROLE_MASTER : BULK_ROLE_SLAVE),
159        .trust = BENCH_TRUST,
160        .meta_size = metasz,
161        .waitset = ws,
162        .user_state = done,
163    };
164    channel->user_state = done;
165    expect_success(bulk_channel_create(channel, &epd->ep_generic, cb, &setup));
166}
167
168static void bind_done_cb(void *arg, errval_t err, struct bulk_channel *channel)
169{
170    bool *done = arg;
171    assert(err_is_ok(err));
172    debug_printf("bind_done_cb()\n");
173    *done = true;
174}
175
176// ntc:card:queue:ip:port
177static void net_transparent_connect(char *str, struct bulk_channel *channel,
178        struct bulk_channel_callbacks *cb, struct waitset *ws,
179        enum bulk_channel_direction dir,  size_t bufsz, size_t metasz,
180        bool *done, bool nocopy)
181{
182    struct bulk_net_endpoint_descriptor *epd = malloc(sizeof(*epd));
183
184    is_no_copy = nocopy;
185
186    char *card_str = strtok(str, ":");
187    char *queue_str = strtok(NULL, ":");
188    char *ip_str = strtok(NULL, ":");
189    char *port_str = strtok(NULL, ":");
190
191    struct bulk_net_ep_setup epsetup = {
192        .ip.addr = inet_addr(ip_str),
193        .port = atoi(port_str),
194        .queue = atoi(queue_str),
195        .max_queues = BENCH_NET_MAX_QUEUES,
196        .buffer_size = BENCH_NET_BUFFER_SIZE,
197        .buffer_count = BENCH_NET_BUFFER_COUNT,
198        .cardname = card_str,
199        .no_copy = nocopy,
200    };
201
202    expect_success(bulk_net_ep_create_remote(epd, &epsetup));
203    struct bulk_channel_bind_params setup = {
204        .role = (dir == BULK_DIRECTION_TX ? BULK_ROLE_MASTER : BULK_ROLE_SLAVE),
205        .trust = BENCH_TRUST,
206        .waitset = ws,
207    };
208
209    is_no_copy = nocopy || FORCE_NO_COPY_SCENARIO;
210
211    expect_success(bulk_channel_bind(channel, &epd->ep_generic, cb, &setup,
212            MK_BULK_CONT(bind_done_cb, done)));
213}
214
215// sml:name
216static void sm_listen(char *str, struct bulk_channel *channel,
217        struct bulk_channel_callbacks *cb, struct waitset *ws,
218        enum bulk_channel_direction dir,  size_t bufsz, size_t metasz,
219        bool *done)
220{
221    char *name = str;
222
223    struct bulk_sm_endpoint_descriptor *epd = malloc(sizeof(*epd));
224    expect_success(bulk_sm_ep_create(epd));
225    struct bulk_channel_setup setup = {
226        .direction = dir,
227        .role = (dir == BULK_DIRECTION_TX ? BULK_ROLE_MASTER : BULK_ROLE_SLAVE),
228        .trust = BENCH_TRUST,
229        .meta_size = metasz,
230        .waitset = ws,
231        .user_state = done,
232    };
233    channel->user_state = done;
234    expect_success(bulk_channel_create(channel, &epd->ep_generic, cb, &setup));
235    expect_success(nameservice_register(name, epd->iref));
236}
237
238// smc:name
239static void sm_connect(char *str, struct bulk_channel *channel,
240        struct bulk_channel_callbacks *cb, struct waitset *ws,
241        enum bulk_channel_direction dir,  size_t bufsz, size_t metasz,
242        bool *done)
243{
244    char *name = str;
245
246    struct bulk_sm_endpoint_descriptor *epd = malloc(sizeof(*epd));
247    iref_t iref;
248
249    expect_success(nameservice_blocking_lookup(name, &iref));
250    expect_success(bulk_sm_ep_create_remote(epd, iref));
251
252    struct bulk_channel_bind_params setup = {
253        .role = (dir == BULK_DIRECTION_TX ? BULK_ROLE_MASTER : BULK_ROLE_SLAVE),
254        .trust = BENCH_TRUST,
255        .waitset = ws,
256    };
257
258    expect_success(bulk_channel_bind(channel, &epd->ep_generic, cb, &setup,
259            MK_BULK_CONT(bind_done_cb, done)));
260}
261
262
263void initialize_channel(const char *str, struct bulk_channel *channel,
264        struct bulk_channel_callbacks *cb, struct waitset *ws,
265        enum bulk_channel_direction dir,  size_t bufsz, size_t metasz,
266        bool *done)
267{
268    char *s = malloc(strlen(str) + 1);
269    strcpy(s, str);
270
271    memset(channel, 0, sizeof(*channel));
272
273    if (has_prefix(s, "npl:")) {
274        net_proxy_listen(s + 4, channel, cb, ws, dir, bufsz, metasz, done);
275    } else if (has_prefix(s, "npc:")) {
276        net_proxy_connect(s + 4, channel, cb, ws, dir, bufsz, metasz, done);
277    } else if (has_prefix(s, "ntl:")) {
278        net_transparent_listen(s + 4, channel, cb, ws, dir, bufsz, metasz,
279                done, false);
280    } else if (has_prefix(s, "ntc:")) {
281        net_transparent_connect(s + 4, channel, cb, ws, dir, bufsz, metasz,
282                done, false);
283    } else if (has_prefix(s, "ncl:")) {
284        net_transparent_listen(s + 4, channel, cb, ws, dir, bufsz, metasz,
285                done, true);
286    } else if (has_prefix(s, "ncc:")) {
287        net_transparent_connect(s + 4, channel, cb, ws, dir, bufsz, metasz,
288                done, true);
289    } else if (has_prefix(s, "sml:")) {
290        sm_listen(s + 4, channel, cb, ws, dir, bufsz, metasz, done);
291    } else if (has_prefix(s, "smc:")) {
292        sm_connect(s + 4, channel, cb, ws, dir, bufsz, metasz, done);
293    } else {
294        USER_PANIC("Invalid channel description prefix");
295    }
296}
297