1#include <stdio.h> 2#include <string.h> 3#include <lwip/inet.h> 4 5#include<bulk_transfer/bulk_transfer.h> 6#include<bulk_transfer/bulk_allocator.h> 7#include<bulk_transfer/bulk_local.h> 8#include<bulk_transfer/bulk_net_proxy.h> 9 10#include "sleep.h" 11 12#define BUFSZ 0x1000 13#define PORT 1234 14 15static void dummy_cont_cb(void *arg, errval_t err, struct bulk_channel *channel) 16{ 17 //printf("DUMMY CONT\n"); 18} 19 20struct bulk_continuation dummy_cont = { .arg = NULL, .handler = dummy_cont_cb, }; 21 22/******************************************************************************/ 23/* Receiver */ 24 25#define RXNBUFS 4 26 27static errval_t cb_rx_bind_received(struct bulk_channel *channel); 28static void cb_rx_move_received(struct bulk_channel *channel, 29 struct bulk_buffer *buffer, 30 void *meta); 31static void cb_rx_copy_received(struct bulk_channel *channel, 32 struct bulk_buffer *buffer, 33 void *meta); 34static errval_t cb_rx_pool_assigned(struct bulk_channel *channel, 35 struct bulk_pool *pool); 36 37static struct bulk_channel rx_channel; 38static struct bulk_allocator rx_allocator; 39static struct bulk_channel_callbacks rx_callbacks = { .bind_received = 40 cb_rx_bind_received, .move_received = cb_rx_move_received, .copy_received = 41 cb_rx_copy_received, .pool_assigned = cb_rx_pool_assigned }; 42static struct bulk_net_proxy rx_proxy; 43 44static errval_t cb_rx_bind_received(struct bulk_channel *channel) 45{ 46 debug_printf("APPL: cb_rx_bind_received\n"); 47 return SYS_ERR_OK; 48} 49 50static errval_t cb_rx_pool_assigned(struct bulk_channel *channel, 51 struct bulk_pool *pool) 52{ 53 debug_printf("APPL: cb_rx_pool_assigned. Checking and reply OK.\n"); 54 return SYS_ERR_OK; 55} 56 57static void cb_rx_move_received(struct bulk_channel *channel, 58 struct bulk_buffer *buffer, 59 void *meta) 60{ 61 uint32_t meta_val = *((uint32_t*) meta); 62 debug_printf("APPL: cb_rx_move_received: %p, meta=%x\n", buffer->address, 63 meta_val); 64 meta_val++; 65 66 volatile char *e = buffer->address; 67 for (int i = 0; i < buffer->pool->buffer_size; ++i) { 68 assert(e[i] == 123); 69 } 70 71 errval_t err = bulk_channel_pass(channel, buffer, &meta_val, dummy_cont); 72 assert(!err_is_fail(err)); 73} 74 75static void cb_rx_copy_received(struct bulk_channel *channel, 76 struct bulk_buffer *buffer, 77 void *meta) 78{ 79 debug_printf("APPL: cb_rx_copy_received: meta=%x\n", *((uint32_t*) meta)); 80 volatile char *e = buffer->address; 81 for (int i = 0; i < buffer->pool->buffer_size; ++i) { 82 assert(e[i] == 101); 83 } 84 errval_t err = bulk_channel_release(channel, buffer, dummy_cont); 85 if (err_is_fail(err)) { 86 DEBUG_ERR(err, "channel release\n"); 87 assert(!err_is_fail(err)); 88 } 89 90} 91 92static void cb_rx_pool_assign_done(void *arg, 93 errval_t err, 94 struct bulk_channel *channel) 95{ 96 size_t i; 97 struct bulk_buffer *b; 98 uint32_t meta = 0; 99 100 debug_printf("APPL: cb_rx_pool_assign_done()\n"); 101 assert(arg == rx_allocator.pool); 102 assert(err_is_ok(err)); 103 104 // Register buffers 105 debug_printf("Registering buffers...\n"); 106 for (i = 0; i < RXNBUFS; i++) { 107 b = bulk_alloc_new_buffer(&rx_allocator); 108 assert(b != NULL); 109 debug_printf(" b[%p]->p=%"PRIx64"\n", b, b->phys); 110 err = bulk_channel_pass(channel, b, &meta, dummy_cont); 111 if (err_is_fail(err)) { 112 DEBUG_ERR(err, "Chan pass failed %d\n", i); 113 } 114 assert(!err_is_fail(err)); 115 event_dispatch_non_block(channel->waitset); 116 event_dispatch_non_block(channel->waitset); 117 event_dispatch_non_block(channel->waitset); 118 } 119 debug_printf("Buffers registered\n"); 120} 121 122static void cb_rx_connected(struct bulk_net_proxy *proxy) 123{ 124 errval_t err; 125 debug_printf("APPL: cb_rx_connected\n"); 126 127 struct bulk_continuation cont = { .handler = cb_rx_pool_assign_done, .arg = 128 rx_allocator.pool }; 129 130 err = bulk_channel_assign_pool(&rx_channel, rx_allocator.pool, cont); 131 assert(!err_is_fail(err)); 132} 133 134static void init_receiver(struct waitset *waitset) 135{ 136 static struct bulk_local_endpoint ep, p_ep; 137 errval_t err; 138 struct bulk_channel_setup setup = { .direction = BULK_DIRECTION_RX, 139 .role = BULK_ROLE_MASTER, 140 .meta_size = sizeof(uint32_t), 141 .waitset = waitset, 142 .trust = BULK_TRUST_FULL, }; 143 144 err = bulk_alloc_init(&rx_allocator, RXNBUFS, BUFSZ, NULL); 145 assert(!err_is_fail(err)); 146 bulk_local_init_endpoint(&ep, NULL); 147 err = bulk_channel_create(&rx_channel, &ep.generic, &rx_callbacks, &setup); 148 assert(!err_is_fail(err)); 149 150 bulk_local_init_endpoint(&p_ep, &rx_channel); 151 err = bulk_net_proxy_listen(&rx_proxy, &p_ep.generic, waitset, BUFSZ, 152 "e10k", 3, PORT, cb_rx_connected); 153 assert(err_is_ok(err)); 154 155} 156 157/******************************************************************************/ 158/* Sender */ 159 160static errval_t cb_tx_bind_received(struct bulk_channel *channel); 161static void cb_tx_pool_assign_done(void *arg, 162 errval_t err, 163 struct bulk_channel *channel); 164static void cb_tx_buffer_received(struct bulk_channel *channel, 165 struct bulk_buffer *buffer, 166 void *meta); 167static void cb_tx_copy_released(struct bulk_channel *channel, 168 struct bulk_buffer *buffer); 169static errval_t cb_tx_pool_assigned(struct bulk_channel *channel, 170 struct bulk_pool *pool); 171 172static struct bulk_channel tx_channel; 173static int tx_phase = 0; 174static struct bulk_allocator tx_allocator; 175static struct bulk_channel_callbacks tx_callbacks = { 176 .bind_received = cb_tx_bind_received, 177 .buffer_received = cb_tx_buffer_received, 178 .copy_released = cb_tx_copy_released, 179 .pool_assigned = cb_tx_pool_assigned, }; 180static struct bulk_net_proxy tx_proxy; 181 182static errval_t cb_tx_bind_received(struct bulk_channel *channel) 183{ 184 debug_printf("APPL: cb_tx_bind_received\n"); 185 return SYS_ERR_OK; 186} 187 188static void cb_tx_pool_assign_done(void *arg, 189 errval_t err, 190 struct bulk_channel *channel) 191{ 192 debug_printf("APPL: cb_tx_pool_assign_done()\n"); 193 assert(tx_allocator.pool == arg); 194 debug_printf("Waiting for some time to make sure other side is ready\n"); 195 milli_sleep(2000); 196 debug_printf("Sleep done\n"); 197 tx_phase = 2; 198} 199 200 201static errval_t cb_tx_pool_assigned(struct bulk_channel *channel, 202 struct bulk_pool *pool) 203{ 204 debug_printf("APPL: cb_tx_pool_assigned()\n"); 205 206 return SYS_ERR_OK; 207} 208 209static void cb_tx_buffer_received(struct bulk_channel *channel, 210 struct bulk_buffer *buffer, 211 void *meta) 212{ 213 debug_printf("APPL: cp_tx_buffer_received: %p, meta=%x\n", buffer->address, 214 *((uint32_t*) meta)); 215 errval_t err = bulk_alloc_return_buffer(&tx_allocator, buffer); 216 if (err_is_fail(err)) { 217 DEBUG_ERR(err, "Returning the buffer\n"); 218 assert(!err_is_fail(err)); 219 } 220 221 tx_phase = 4; 222} 223 224static void cb_tx_copy_released(struct bulk_channel *channel, 225 struct bulk_buffer *buffer) 226{ 227 debug_printf("APPL: cp_tx_copy_released\n"); 228 tx_phase = 6; 229 errval_t err = bulk_alloc_return_buffer(&tx_allocator, buffer); 230 assert(!err_is_fail(err)); 231 232} 233 234static void cb_tx_connected(struct bulk_net_proxy *proxy) 235{ 236 errval_t err; 237 debug_printf("APPL: cb_tx_connected\n"); 238 tx_phase = 1; 239 240 struct bulk_continuation cont = { .handler = cb_tx_pool_assign_done, .arg = 241 tx_allocator.pool }; 242 243 err = bulk_channel_assign_pool(&tx_channel, tx_allocator.pool, cont); 244 assert(!err_is_fail(err)); 245} 246 247static void init_sender(struct waitset *waitset) 248{ 249 static struct bulk_local_endpoint ep, p_ep; 250 errval_t err; 251 struct bulk_channel_setup setup = { .direction = BULK_DIRECTION_TX, 252 .role = BULK_ROLE_MASTER, 253 .meta_size = sizeof(uint32_t), 254 .waitset = waitset, 255 .trust = BULK_TRUST_FULL, }; 256 err = bulk_alloc_init(&tx_allocator, 4, BUFSZ, NULL); 257 assert(!err_is_fail(err)); 258 259 bulk_local_init_endpoint(&ep, NULL); 260 err = bulk_channel_create(&tx_channel, &ep.generic, &tx_callbacks, &setup); 261 assert(!err_is_fail(err)); 262 263 bulk_local_init_endpoint(&p_ep, &tx_channel); 264 err = bulk_net_proxy_connect(&tx_proxy, &p_ep.generic, waitset, BUFSZ, 265 "e10k", 3, ntohl(inet_addr("192.168.99.2")), PORT, cb_tx_connected); 266 assert(!err_is_fail(err)); 267} 268 269static void tx_process(void) 270{ 271 errval_t err; 272 uint32_t meta; 273 struct bulk_buffer *buffer; 274 275 if (tx_phase == 2) { 276 meta = 42; 277 debug_printf("APPL: Allocating buffer...\n"); 278 buffer = bulk_alloc_new_buffer(&tx_allocator); 279 assert(buffer); 280 memset(buffer->address, 123, buffer->pool->buffer_size); 281 debug_printf("APPL: Starting move... meta=%x\n", meta); 282 err = bulk_channel_move(&tx_channel, buffer, &meta, dummy_cont); 283 if (err_is_fail(err)) { 284 DEBUG_ERR(err, "Bulk Channel Move Failed...\n"); 285 assert(!"FOOFO"); 286 } 287 288 tx_phase++; 289 } else if (tx_phase == 4) { 290 meta = 44; 291 debug_printf("APPL: Allocating buffer...\n"); 292 buffer = bulk_alloc_new_buffer(&tx_allocator); 293 assert(buffer); 294 memset(buffer->address, 101, buffer->pool->buffer_size); 295 debug_printf("APPL: Starting copy... meta=%x\n", meta); 296 err = bulk_channel_copy(&tx_channel, buffer, &meta, dummy_cont); 297 if (err_is_fail(err)) { 298 DEBUG_ERR(err, "Bulk Channel Move Failed...\n"); 299 assert(!"FOOFO"); 300 } 301 302 tx_phase++; 303 } else if (tx_phase == 6) { 304 debug_printf("DONE.\n"); 305 exit(0); 306 } 307 308} 309 310/******************************************************************************/ 311/* Test control */ 312 313static void print_usage(void) 314{ 315 printf("Usage: bulk_netproxy (rx|tx)\n"); 316 exit(0); 317} 318 319int main(int argc, char *argv[]) 320{ 321 if (argc != 2) { 322 print_usage(); 323 } 324 bool is_tx = !strcmp(argv[1], "tx"); 325 bool is_rx = !strcmp(argv[1], "rx"); 326 if (!is_tx && !is_rx) { 327 print_usage(); 328 } 329 330 struct waitset *ws = get_default_waitset(); 331 debug_printf("bulk_mini: enter\n"); 332 if (is_tx) { 333 init_sender(ws); 334 debug_printf("bulk_mini: tx_init done\n"); 335 } else if (is_rx) { 336 init_receiver(ws); 337 debug_printf("bulk_mini: rx_init done\n"); 338 } 339 while (true) { 340 if (is_tx) { 341 tx_process(); 342 } 343 event_dispatch(ws); 344 } 345 return 0; 346} 347