/*
 * Copyright (c) 2014 ETH Zurich.
 * All rights reserved.
 *
 * This file is distributed under the terms in the attached LICENSE file.
 * If you do not find this file, copies can be found by writing to:
 * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
 */

#include <stdio.h>
#include <sys/param.h>

#include <barrelfish/barrelfish.h>
#include <bulk_transfer/bulk_transfer.h>
#include <bulk_transfer/bulk_net.h>
#include <bulk_transfer/bulk_allocator.h>

#include "../../bulk_pool.h"
#include "../../bulk_buffer.h"
#include "../../helpers.h"

#include "bulk_net_backend.h"
#include "bulk_net_transfer.h"

#if BULK_NET_ENABLE_DEBUG_BACKEND
#define BNT_DEBUG_TRACE BULK_NET_TRACE
#define BNT_DEBUG(fmt, msg...) BULK_NET_DEBUG(fmt, msg)
#else
/* no trailing semicolon: callers terminate BNT_DEBUG() like a statement.
 * BNT_DEBUG_TRACE keeps its semicolon because it is used without one. */
#define BNT_DEBUG(fmt, msg...) do{}while(0)
#define BNT_DEBUG_TRACE do{}while(0);
#endif

#if BULK_NET_ENABLE_STATUS_BACKEND
#define BNT_STATUS(fmt, msg...) BULK_NET_STATUS(fmt, msg)
#else
#define BNT_STATUS(fmt, msg...) do{}while(0)
#endif

#define BULK_NET_CTRL_CHANNEL_BUF_SIZE 256

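/**
 * Per-queue state of the no-copy network backend. One instance exists for
 * the control channel, where bulk_control points to the instance itself,
 * and one per assigned pool (data queue), where bulk_control points back to
 * the control channel's instance.
 */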
struct bulk_net_nocopy
{
    struct bulk_net_control net_ctrl;

    struct bulk_net_nocopy *bulk_control;
    struct bulk_channel *channel;
    struct bulk_pool *pool;
    struct bulk_continuation bind_cont;
    struct pending_pool_request *pending_pool_requests;
    struct receive_buffer *meta_rb;
    errval_t err;
    bool bound;
    struct bulk_continuation panic_cont;
    void *zero_meta;

    void *user_state;
};

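/**
 * Message types of the no-copy wire protocol. Every message carries a
 * trailer in its last non-empty descriptor part; the final byte of that
 * trailer is the 'type' field below, which tcb_received() uses to dispatch
 * to the appropriate handler.
 */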
enum proto_msg
{
    PROTO_INVALID,
    PROTO_BIND_REQUEST,
    PROTO_BIND_RESPONSE,
    PROTO_POOL_REQUEST,
    PROTO_POOL_RESPONSE,
    PROTO_BUFFER_MOVE,
    PROTO_BUFFER_COPY,
    PROTO_BUFFER_PASS,
    PROTO_BUFFER_RELEASE,
    PROTO_STATUS,

    /* NOT IMPLEMENTED */
    PROTO_POOL_REMOVE,
    PROTO_TEARDOWN
};

struct proto_trail_bind_req
{
    uint32_t buffer_size;
    uint8_t trust_level;
    uint8_t role;
    /* XXX: there are no constraints on this channel */

    uint8_t type;
}__attribute__((packed));

struct proto_trail_bind_resp
{
    uint32_t buffer_size;   ///< XXX: given by the creator side
    uint32_t meta_size;     ///< XXX: given by the creator side
    uint8_t direction;
    uint8_t trust_level;
    uint8_t role;
    errval_t err;

    uint8_t type;
}__attribute__((packed));

struct proto_trail_pool_req
{
    uint32_t buffer_count;
    uint32_t buffer_size;
    uint32_t pool_machine_id;
    domainid_t pool_domain_id;
    uint32_t pool_local_id;
    uint16_t port;

    uint8_t type;
}__attribute__((packed));

struct proto_trail_pool_resp
{
    errval_t err;
    uint32_t pool_machine_id;
    domainid_t pool_domain_id;
    uint32_t pool_local_id;
    uint16_t port;

    uint8_t type;
}__attribute__((packed));

struct proto_trail_move
{
    uint32_t buffer_id;

    uint8_t type;
}__attribute__((packed));

struct proto_trail_copy
{
    uint32_t buffer_id;

    uint8_t type;
}__attribute__((packed));

struct proto_trail_pass
{
    uint32_t pool_machine_id;
    domainid_t pool_domain_id;
    uint32_t pool_local_id;
    uint32_t buffer_id;

    uint8_t type;
}__attribute__((packed));

struct proto_trail_release
{
    uint32_t pool_machine_id;
    domainid_t pool_domain_id;
    uint32_t pool_local_id;
    uint32_t buffer_id;

    uint8_t type;
}__attribute__((packed));

struct proto_trail_status
{
    errval_t err;

    uint8_t type;
}__attribute__((packed));

static void tcb_received(struct bulk_e10k *bu, struct bulk_net_msgdesc *msg);
static void tcb_transmitted(struct bulk_e10k *bu, void *opaque);

struct pending_pool_request
{
    struct bulk_pool *pool;
    struct bulk_net_nocopy *bnt;
    struct bulk_continuation cont;
    struct pending_pool_request *next;
};

struct bulk_net_pool_data
{
    struct bulk_net_nocopy *p;
    uint32_t *buf_id_local_to_remote;
    uint32_t *buf_id_remote_to_local;
};

/* ----------------------------- pools ------------------------------------- */
static inline struct bulk_net_pool_data *get_pool_data(struct bulk_pool *pool)
{
    return ((struct bulk_pool_internal*) pool)->impl_data;
}

static inline struct bulk_net_nocopy *get_net_nocopy(struct bulk_pool *pool)
{
    struct bulk_net_pool_data *pd = get_pool_data(pool);
    if (pd) {
        return pd->p;
    }
    return NULL;
}

static inline struct bulk_buffer *get_buffer(struct bulk_channel *chan,
                                             struct bulk_pool_id *pool_id,
                                             uint32_t buffer_id)
{
    struct bulk_pool *pool = bulk_pool_get(pool_id, chan);
    assert(pool);
    assert(buffer_id < pool->num_buffers);
    return pool->buffers[buffer_id];
}

/* ---------------------------- buffer id translation ---------------------- */

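/*
 * Buffer IDs are indices into a pool's buffer array. Because both endpoints
 * set up their copy of the pool independently, the same buffer may have a
 * different index on each side; the two tables in bulk_net_pool_data
 * translate between the local and the remote numbering.
 */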
static inline uint32_t get_local_bufid(struct bulk_buffer *buf)
{
    return ((lvaddr_t) buf->address - buf->pool->base_address)
                    / buf->pool->buffer_size;
}

/// XXX: assumes the pool is used over a single network channel only
static inline uint32_t get_remote_bufid(struct bulk_pool *pool,
                                        uint32_t local_buf_id)
{
    struct bulk_net_pool_data *pd = get_pool_data(pool);
    assert(pd);
    return pd->buf_id_local_to_remote[local_buf_id];
}

static inline void set_remote_bufid(struct bulk_pool *pool,
                                    uint32_t local_buf_id,
                                    uint32_t remote_buf_id)
{
    struct bulk_net_pool_data *pd = get_pool_data(pool);
    assert(pd);
    pd->buf_id_local_to_remote[local_buf_id] = remote_buf_id;
    pd->buf_id_remote_to_local[remote_buf_id] = local_buf_id;
}

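/**
 * Initializes the receive buffers used for per-buffer metadata: a single
 * frame of num * size bytes is allocated and mapped, and each of the num
 * receive buffers is pointed at its slice of that frame.
 */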
static errval_t bulk_net_init_meta_rb(struct receive_buffer *rbs,
                                      uint32_t num,
                                      uint32_t size)
{
    errval_t err;
    struct receive_buffer tmp_rb, *rb;

    if (BULK_NET_NOCOPY_META_BUFFER_SIZE) {
        size = BULK_NET_NOCOPY_META_BUFFER_SIZE;
    }

    err = allocmap_frame(num * size, &tmp_rb.virt, &tmp_rb.phys, NULL);
    assert(err_is_ok(err));

    for (uint32_t j = 0; j < num; ++j) {
        rb = rbs + j;
        rb->buffer = NULL;
        rb->is_meta = true;
        rb->virt = tmp_rb.virt + (j * size);
        rb->phys = tmp_rb.phys + (j * size);
    }
    return SYS_ERR_OK;
}

/* --------------------------- binding ------------------------------------- */

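/*
 * Channel binding is a two-message handshake: the binding side sends a
 * PROTO_BIND_REQUEST carrying its buffer size, trust level and role; the
 * creator side validates the request, negotiates role and trust, and
 * answers with a PROTO_BIND_RESPONSE that also carries the agreed meta
 * size and direction.
 */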
static void send_bind_response(struct bulk_net_nocopy *p,
                               uint32_t buffer_size,
                               uint32_t meta_size,
                               uint8_t direction,
                               uint8_t role,
                               uint8_t trust,
                               errval_t err)
{
    BNT_DEBUG_TRACE

    struct proto_trail_bind_resp *t;
    struct transmit_buffer *tb;
    struct bulk_net_msgdesc msg;

    tb = stack_alloc_alloc(&p->net_ctrl.tb_stack);
    assert(tb != NULL);

    t = tb->int_virt;
    t->err = err;
    t->buffer_size = buffer_size;
    t->meta_size = meta_size;
    t->direction = direction;
    t->role = role;
    t->trust_level = trust;
    t->type = PROTO_BIND_RESPONSE;

    msg.parts[1].phys = tb->int_phys;
    msg.parts[1].size = sizeof(*t);
    msg.parts[1].opaque = tb;
    msg.parts[2].size = 0;

    bulk_net_transfer_add_header(&msg);
    err = bulk_e10k_send(&p->net_ctrl.transfer, &msg);
    assert(err_is_ok(err));
}

static void handle_bind_response(struct bulk_net_nocopy *p,
                                 struct proto_trail_bind_resp *t,
                                 struct bulk_net_msgdesc *msg)
{
    BNT_DEBUG_TRACE

    struct bulk_channel *chan = p->channel;

    if (p->bound) {
        BNT_DEBUG("channel [%p] already bound. request ignored.", chan);
        goto free_rx;
    }

    assert(chan->state == BULK_STATE_BINDING);

    chan->meta_size = t->meta_size;
    chan->trust = t->trust_level;
    chan->role = t->role;
    chan->direction = t->direction;

    if (err_is_fail(t->err)) {
        BNT_STATUS("ERROR: binding failed on channel [%p].", chan);
        chan->state = BULK_STATE_CLOSED;
    } else {
        BNT_STATUS("SUCCESS: channel [%p] bound.", chan);
        chan->state = BULK_STATE_CONNECTED;
        p->bound = true;
    }

    if (p->bind_cont.handler) {
        p->bind_cont.handler(p->bind_cont.arg, t->err, p->channel);
    }

    free_rx: bulk_net_transfer_free_rx(&p->net_ctrl, msg);
}

static void send_bind_request(struct bulk_net_nocopy *p,
                              uint32_t buffer_size,
                              uint8_t trust_level,
                              uint8_t role)
{
    BNT_DEBUG_TRACE

    errval_t err;

    struct proto_trail_bind_req *t;
    struct transmit_buffer *tb;
    struct bulk_net_msgdesc msg;

    tb = stack_alloc_alloc(&p->net_ctrl.tb_stack);
    assert(tb != NULL);

    t = tb->int_virt;
    t->buffer_size = buffer_size;
    t->role = role;
    t->trust_level = trust_level;
    t->type = PROTO_BIND_REQUEST;

    msg.parts[1].phys = tb->int_phys;
    msg.parts[1].size = sizeof(*t);
    msg.parts[1].opaque = tb;
    msg.parts[2].size = 0;

    bulk_net_transfer_add_header(&msg);
    err = bulk_e10k_send(&p->net_ctrl.transfer, &msg);
    assert(err_is_ok(err));
}

static void handle_bind_request(struct bulk_net_nocopy *p,
                                struct proto_trail_bind_req *t,
                                struct bulk_net_msgdesc *msg)
{
    BNT_DEBUG_TRACE

    errval_t err;

    assert(p->bulk_control == p);

    struct receive_buffer *rb = msg->parts[0].opaque;
    struct packet_header *hdr = rb->hdr_virt;

    if (p->bound) {
        BNT_DEBUG("channel [%p] already bound. request ignored.", p->channel);

        goto free_rx;
    }

    /* update mac address */
    p->net_ctrl.r_mac = 0;
    memcpy(&p->net_ctrl.r_mac, hdr->l2.smac, 6);

    /* set the remote ip and ports */
    p->net_ctrl.r_ip = ntohl(hdr->l3.s_ip);
    p->net_ctrl.r_port = ntohs(hdr->l4.s_port);

    /* update the TX headers */
    bulk_net_transfer_update_tx_headers(&p->net_ctrl);

    if (t->buffer_size != p->net_ctrl.buffer_size) {
        BNT_DEBUG("ERROR: wrong buffer size: [%x] [%x]", t->buffer_size,
                  (uint32_t) p->net_ctrl.buffer_size);
        err = BULK_TRANSFER_ALLOC_BUFFER_SIZE;
        goto send_and_free;
    }

    /* update the roles */
    if (p->channel->role == BULK_ROLE_GENERIC) {
        if (t->role == BULK_ROLE_GENERIC) {
            p->channel->role = BULK_ROLE_MASTER;
        } else {
            p->channel->role = bulk_role_other(t->role);
        }
    }

    /* update the trust level */
    if (p->channel->trust != t->trust_level) {
        /* TODO: choose the appropriate trust level */
        if (p->channel->trust == BULK_TRUST_FULL) {
            p->channel->trust = t->trust_level;
        } else if (p->channel->trust == BULK_TRUST_HALF) {
            if (t->trust_level == BULK_TRUST_NONE) {
                p->channel->trust = BULK_TRUST_NONE;
            }
        }
    }

    /* do the callback to the application */
    err = p->channel->callbacks->bind_received(p->channel);

    /* update the connection state */
    p->channel->state = BULK_STATE_CONNECTED;
    p->bound = true;

    send_and_free: send_bind_response(
                    p, p->net_ctrl.buffer_size, p->channel->meta_size,
                    bulk_direction_other(p->channel->direction),
                    bulk_role_other(p->channel->role), p->channel->trust, err);

    free_rx: bulk_net_transfer_free_rx(&p->net_ctrl, msg);
}

/* -------------------------- pool assignment -------------------------------*/

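/*
 * Pool assignment: the requesting side allocates a dedicated data queue and
 * a pending_pool_request, then sends a PROTO_POOL_REQUEST over the control
 * channel. The other side allocates (or looks up) a matching pool, binds a
 * new data queue for it, asks the application for approval, and answers
 * with a PROTO_POOL_RESPONSE carrying its data port and an error code.
 */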
static void send_pool_assign_response(struct bulk_net_nocopy *p,
                                      errval_t err,
                                      struct bulk_pool *pool,
                                      uint16_t l_port)
{
    BNT_DEBUG_TRACE

    struct proto_trail_pool_resp *t;
    struct transmit_buffer *tb;
    struct bulk_net_msgdesc msg;

    tb = stack_alloc_alloc(&p->net_ctrl.tb_stack);
    assert(tb != NULL);

    t = tb->int_virt;
    t->err = err;

    t->pool_domain_id = pool->id.dom;
    t->pool_machine_id = pool->id.machine;
    t->pool_local_id = pool->id.local;

    t->port = l_port;

    t->type = PROTO_POOL_RESPONSE;

    msg.parts[1].phys = tb->int_phys;
    msg.parts[1].size = sizeof(*t);
    msg.parts[1].opaque = tb;
    msg.parts[2].size = 0;

    bulk_net_transfer_add_header(&msg);
    err = bulk_e10k_send(&p->net_ctrl.transfer, &msg);
    assert(err_is_ok(err));
}

static void handle_pool_assign_response(struct bulk_net_nocopy *p,
                                        struct proto_trail_pool_resp *t,
                                        struct bulk_net_msgdesc *msg)
{
    BNT_DEBUG_TRACE

    errval_t err;

    assert(p->bulk_control == p);

    struct pending_pool_request *ppr = p->pending_pool_requests;
    struct pending_pool_request *prev = NULL;

    struct bulk_pool_id id = {
        .dom = t->pool_domain_id,
        .machine = t->pool_machine_id,
        .local = t->pool_local_id };

    /* find and unlink the matching pending pool request */
    while (ppr) {
        if (bulk_pool_cmp_id(&id, &ppr->pool->id) == 0) {
            if (prev == NULL) {
                p->pending_pool_requests = ppr->next;
            } else {
                prev->next = ppr->next;
            }
            break;
        }
        prev = ppr;
        ppr = ppr->next;
    }

    if (ppr == NULL) {
        BNT_DEBUG("ERROR: no pending pool request (ignored). [%i, %i]",
                  (uint32_t) id.dom, id.local);
        goto free_rx;
    }

    struct bulk_pool *pool = ppr->pool;
    if (err_is_fail(t->err)) {
        BNT_STATUS("FAILED: Pool [%x, %x, %x] assign to channel [%p] vetoed",
                   pool->id.machine, pool->id.dom, pool->id.local, p->channel);
        goto free_and_cont;
    }

    err = bulk_pool_assign(pool, p->channel);
    if (err_is_fail(err)) {
        BNT_STATUS("FAILED: Pool [%x, %x, %x] assignment to channel [%p] \n%s",
                   pool->id.machine, pool->id.dom, pool->id.local, p->channel,
                   err_getstring(err));
        goto free_and_cont;
    }

    /* update status values */
    struct bulk_net_nocopy *bnt = ppr->bnt;
    bnt->bound = true;

    /* update port information */
    bnt->net_ctrl.r_port = t->port;
    bulk_net_transfer_update_tx_headers(&bnt->net_ctrl);

    BNT_STATUS("SUCCESS: Pool [%x, %x, %x] assigned to channel [%p]",
               pool->id.machine, pool->id.dom, pool->id.local, p->channel);

    free_and_cont:

    if (ppr->cont.handler) {
        ppr->cont.handler(ppr->cont.arg, t->err, p->channel);
    }

    if (err_is_fail(t->err)) {
        assert(!"NYI: Cleaning up of network structs...");
    }

    free(ppr);
    free_rx: bulk_net_transfer_free_rx(&p->net_ctrl, msg);
}

static void send_pool_assign_request(struct bulk_net_nocopy *p,
                                     struct bulk_pool *pool,
                                     uint16_t l_port)
{
    BNT_DEBUG_TRACE

    errval_t err;

    struct proto_trail_pool_req *t;
    struct transmit_buffer *tb;
    struct bulk_net_msgdesc msg;

    tb = stack_alloc_alloc(&p->net_ctrl.tb_stack);
    assert(tb != NULL);

    t = tb->int_virt;
    t->buffer_size = pool->buffer_size;
    t->buffer_count = pool->num_buffers;
    t->pool_domain_id = pool->id.dom;
    t->pool_machine_id = pool->id.machine;
    t->pool_local_id = pool->id.local;
    t->port = l_port;
    t->type = PROTO_POOL_REQUEST;

    msg.parts[1].phys = tb->int_phys;
    msg.parts[1].size = sizeof(*t);
    msg.parts[1].opaque = tb;
    msg.parts[2].size = 0;

    bulk_net_transfer_add_header(&msg);
    err = bulk_e10k_send(&p->net_ctrl.transfer, &msg);
    assert(err_is_ok(err));
}

static void handle_pool_assign_request(struct bulk_net_nocopy *p,
                                       struct proto_trail_pool_req *t,
                                       struct bulk_net_msgdesc *msg)
{
    BNT_DEBUG_TRACE

    assert(p->bulk_control == p);

    errval_t err;
    uint16_t port = 0;
    uint8_t first_assignment = 0;

    struct bulk_net_pool_data *pd = NULL;
    struct bulk_net_nocopy *bnt = NULL;

    /* check if the pool is already present in the domain */
    struct bulk_pool_id id = {
        .dom = t->pool_domain_id,
        .machine = t->pool_machine_id,
        .local = t->pool_local_id };

    struct bulk_pool *pool = bulk_pool_domain_list_get(&id);

    /* check the queue limit before allocating a new queue id */
    if (p->net_ctrl.num_queues >= p->net_ctrl.max_queues) {
        err = BULK_TRANSFER_NET_MAX_QUEUES;
        goto send_and_free;
    }

    /* calculate the new queue */
    uint8_t queueid = p->net_ctrl.queue + p->net_ctrl.num_queues;
    p->net_ctrl.num_queues++;

    /* there is no such pool */
    if (pool == NULL) {
        struct bulk_allocator pool_alloc;

        struct bulk_pool_constraints constr = {
            .range_min = p->channel->constraints.mem_range_min,
            .range_max = p->channel->constraints.mem_range_max,
            .alignment = p->channel->constraints.men_align,
            .trust = p->channel->trust };

        err = bulk_alloc_init(&pool_alloc, t->buffer_count, t->buffer_size,
                              &constr);
        if (err_is_fail(err)) {
            DEBUG_ERR(err, "Failed to allocate memory for the pool\n");
            goto send_and_free;
        }

        /* free the allocator's bookkeeping memory */
        free(pool_alloc.mngs);

        /* overwrite the ID */
        pool = pool_alloc.pool;
        pool->id = id;

        first_assignment = 1;

        BNT_DEBUG("New pool allocated: [%x, %x, %x]", pool->id.machine,
                  pool->id.dom, pool->id.local);

    } else {
        BNT_DEBUG("Pool already present in domain: [%x, %x, %x]",
                  pool->id.machine, pool->id.dom, pool->id.local);
        if (get_net_nocopy(pool)) {
            err = BULK_TRANSFER_NET_POOL_USED;
            goto send_and_free;
        }

        if (bulk_pool_is_assigned(pool, p->channel)) {
            err = BULK_TRANSFER_POOL_ALREADY_ASSIGNED;
            goto send_and_free;
        }
    }

    /* we have a pool, and it is not yet used over a network channel */

    struct bulk_pool_internal *pool_int = (struct bulk_pool_internal *) pool;

    assert(!pool_int->impl_data);

    size_t pd_size = sizeof(struct bulk_net_pool_data)
                    + 2 * t->buffer_count * sizeof(uint32_t);

    pd = malloc(pd_size);

    if (!pd) {
        err = BULK_TRANSFER_MEM;
        goto send_and_free;
    }

    pd->buf_id_local_to_remote = (uint32_t *) (pd + 1);
    pd->buf_id_remote_to_local = (pd->buf_id_local_to_remote + t->buffer_count);

    for (uint32_t i = 0; i < t->buffer_count; ++i) {
        pd->buf_id_remote_to_local[i] = 0;
        pd->buf_id_local_to_remote[i] = 0;
    }

    pool_int->impl_data = pd;

    bnt = calloc(1, sizeof(struct bulk_net_nocopy)
                    + t->buffer_count * sizeof(struct receive_buffer));
    if (!bnt) {
        err = BULK_TRANSFER_MEM;
        goto send_and_free;
    }

    bnt->meta_rb = (struct receive_buffer *) (bnt + 1);
    err = bulk_net_init_meta_rb(bnt->meta_rb, t->buffer_count,
                                pool->buffer_size);
    assert(!err_is_fail(err));

    memcpy(&bnt->net_ctrl, &p->net_ctrl, sizeof(bnt->net_ctrl));
    bnt->net_ctrl.queue = queueid;
    bnt->net_ctrl.buffer_count = 0;

    pd->p = bnt;
    bnt->net_ctrl.r_port = t->port;

    bnt->bulk_control = p;
    bnt->channel = p->channel;
    bnt->pool = pool;

    err = bulk_net_transfer_bind(&bnt->net_ctrl, tcb_transmitted, tcb_received);
    if (err_is_fail(err)) {
        goto send_and_free;
    }

    err = p->channel->callbacks->pool_assigned(p->channel, pool);
    if (err_is_fail(err)) {
        BNT_STATUS("VETO: Pool [%x, %x, %x] not assigned to channel [%p]",
                   pool->id.machine, pool->id.dom, pool->id.local, p->channel);
        goto send_and_free;
    }

    err = bulk_pool_assign(pool, p->channel);
    assert(!err_is_fail(err)); // should not fail

    BNT_STATUS("SUCCESS: Pool [%x, %x, %x] assigned to channel [%p]",
               pool->id.machine, pool->id.dom, pool->id.local, p->channel);

    /* update status */
    bnt->bound = true;

    /* we must make sure that the buffers are ready for receiving */
    if (p->channel->direction == BULK_DIRECTION_RX) {
        BNT_STATUS("Adding %i receive buffers.", (uint32_t) pool->num_buffers);
        for (uint32_t i = 0; i < pool->num_buffers; ++i) {
            struct receive_buffer *rb;
            struct bulk_buffer *buffer = pool->buffers[i];
            rb = stack_alloc_alloc(&bnt->net_ctrl.rb_stack);
            assert(rb != NULL);

            rb->virt = buffer->address;
            rb->phys = buffer->phys;
            rb->buffer = buffer;

            err = bulk_e10k_rx_add(&bnt->net_ctrl.transfer, rb->phys,
                                   rb->hdr_phys, rb);
            assert(err_is_ok(err));

            rb = bnt->meta_rb + i;
            err = bulk_e10k_rx_add(&bnt->net_ctrl.transfer, rb->phys,
                                   rb->hdr_phys, rb);
            assert(err_is_ok(err));
        }
    }

    port = bnt->net_ctrl.l_port;

    send_and_free: ;

    /* on early failures the pool may not exist yet; use a stack dummy that
     * only carries the requested ID so the response can still be sent */
    struct bulk_pool tmp_pool;
    if (!pool) {
        tmp_pool.id = id;
        pool = &tmp_pool;
    }

    send_pool_assign_response(p, err, pool, port);

    if (err_is_fail(err)) {
        if (pd) {
            free(pd);
        }
        if (bnt) {
            /* TODO: Free up net resources */
            free(bnt);
        }
        if (first_assignment) {
            bulk_pool_dealloc(pool);
        }
    }

    bulk_net_transfer_free_rx(&p->net_ctrl, msg);
}

/* ---------------------------- move operation ----------------------------- */

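/*
 * Moves and copies transmit the buffer contents by value: part[0] of the
 * message descriptor holds the packet header, part[1] the buffer payload
 * itself, and part[2] the metadata followed by the protocol trailer. On the
 * receive side the payload lands directly in a pool buffer that was
 * pre-posted to the RX queue, which is what makes this backend "no-copy".
 */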
static void send_buffer_move(struct bulk_net_nocopy *p,
                             struct bulk_buffer *b,
                             void *meta,
                             struct bulk_continuation cont)
{
    BNT_DEBUG_TRACE

    assert(p->bulk_control != p);
    assert(p->channel->direction == BULK_DIRECTION_TX);

    errval_t err;
    struct proto_trail_move *t;
    struct transmit_buffer *tb_d, *tb;
    struct bulk_net_msgdesc msg;

    tb_d = stack_alloc_alloc(&p->net_ctrl.tb_stack);
    assert(tb_d != NULL);
    tb_d->buffer = b;
    tb_d->is_copy = false;
    tb_d->cont = cont;

    // prepare trailer
    tb = stack_alloc_alloc(&p->net_ctrl.tb_stack);
    assert(tb != NULL);

    if (meta != NULL) {
        memcpy(tb->int_virt, meta, p->channel->meta_size);
    } else {
        memset(tb->int_virt, 0, p->channel->meta_size);
    }
    t = (void *) ((uint8_t *) tb->int_virt + p->channel->meta_size);
    t->type = PROTO_BUFFER_MOVE;

    uint32_t local_id = get_local_bufid(b);
    t->buffer_id = get_remote_bufid(b->pool, local_id);

    msg.parts[1].phys = b->phys;
    msg.parts[1].size = p->net_ctrl.buffer_size;
    msg.parts[1].opaque = tb_d;
    msg.parts[2].phys = tb->int_phys;
    msg.parts[2].size = sizeof(*t) + p->channel->meta_size;
    msg.parts[2].opaque = tb;
    msg.parts[3].size = 0;

    bulk_net_transfer_add_header(&msg);
    err = bulk_e10k_send(&p->net_ctrl.transfer, &msg);
    assert(err_is_ok(err));
}

static void handle_buffer_move(struct bulk_net_nocopy *p,
                               struct proto_trail_move *t,
                               struct bulk_net_msgdesc *msg)
{
    BNT_DEBUG_TRACE

    assert(p->bulk_control != p);

    errval_t err;
    struct receive_buffer *rb;
    struct bulk_buffer *buffer;

    rb = msg->parts[1].opaque;
    buffer = rb->buffer;
    stack_alloc_free(&p->net_ctrl.rb_stack, rb);

    uint32_t local_id = get_local_bufid(buffer);

    set_remote_bufid(buffer->pool, local_id, t->buffer_id);

    err = bulk_buffer_change_state(buffer, BULK_BUFFER_READ_WRITE);
    assert(!err_is_fail(err));

    rb = msg->parts[2].opaque;

    p->channel->callbacks->move_received(p->channel, buffer, rb->virt);

    assert(rb->is_meta == true);

    // bulk_net_transfer_free_rb(&p->net_ctrl, rb);
}

/* ----------------------------- copy operation ---------------------------- */

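/*
 * Copies differ from moves only in the resulting buffer state: the receiver
 * marks the buffer read-only (RO_OWNED if it is also the owner), so several
 * outstanding copies of the same buffer may exist until they are released.
 */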
static void send_buffer_copy(struct bulk_net_nocopy *p,
                             struct bulk_buffer *b,
                             void *meta,
                             struct bulk_continuation cont)
{
    BNT_DEBUG_TRACE

    assert(p->bulk_control != p);
    assert(p->channel->direction == BULK_DIRECTION_TX);

    errval_t err;
    struct proto_trail_copy *t;
    struct transmit_buffer *tb_d, *tb;
    struct bulk_net_msgdesc msg;

    tb_d = stack_alloc_alloc(&p->net_ctrl.tb_stack);
    assert(tb_d != NULL);
    tb_d->buffer = b;
    tb_d->is_copy = true;
    tb_d->cont = cont;

    // prepare trailer
    tb = stack_alloc_alloc(&p->net_ctrl.tb_stack);
    assert(tb != NULL);

    if (meta != NULL) {
        memcpy(tb->int_virt, meta, p->channel->meta_size);
    } else {
        memset(tb->int_virt, 0, p->channel->meta_size);
    }
    t = (void *) ((uint8_t *) tb->int_virt + p->channel->meta_size);
    t->type = PROTO_BUFFER_COPY;

    uint32_t local_id = get_local_bufid(b);
    t->buffer_id = get_remote_bufid(b->pool, local_id);

    msg.parts[1].phys = b->phys;
    msg.parts[1].size = p->net_ctrl.buffer_size;
    msg.parts[1].opaque = tb_d;
    msg.parts[2].phys = tb->int_phys;
    msg.parts[2].size = sizeof(*t) + p->channel->meta_size;
    msg.parts[2].opaque = tb;
    msg.parts[3].size = 0;

    bulk_net_transfer_add_header(&msg);
    err = bulk_e10k_send(&p->net_ctrl.transfer, &msg);
    assert(err_is_ok(err));
}

static void handle_buffer_copy(struct bulk_net_nocopy *p,
                               struct proto_trail_copy *t,
                               struct bulk_net_msgdesc *msg)
{
    BNT_DEBUG_TRACE

    assert(p->bulk_control != p);

    errval_t err;
    struct receive_buffer *rb;

    /* part[1] holds the payload buffer, part[2] the metadata trailer
     * (mirrors handle_buffer_move above) */
    rb = msg->parts[1].opaque;
    struct bulk_buffer *buf = rb->buffer;

    assert(buf);

    stack_alloc_free(&p->net_ctrl.rb_stack, rb);

    uint32_t local_id = get_local_bufid(buf);

    set_remote_bufid(buf->pool, local_id, t->buffer_id);

    enum bulk_buffer_state st = BULK_BUFFER_READ_ONLY;
    if (bulk_buffer_is_owner(buf)) {
        st = BULK_BUFFER_RO_OWNED;
    }
    err = bulk_buffer_change_state(buf, st);
    assert(!err_is_fail(err));

    rb = msg->parts[2].opaque;

    p->channel->callbacks->copy_received(p->channel, buf, rb->virt);

    assert(rb->is_meta == true);

    // bulk_net_transfer_free_rb(&p->net_ctrl, rb);
}

/* ------------------------------ pass operation --------------------------- */

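/*
 * Buffer passes carry only the metadata plus the trailer (no payload) and
 * travel over the control channel; note the assert(p->bulk_control == p)
 * below. On receipt the buffer is marked READ_WRITE again and handed to the
 * application via the buffer_received callback.
 */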
static void send_buffer_pass(struct bulk_net_nocopy *p,
                             struct bulk_buffer *b,
                             void *meta,
                             struct bulk_continuation cont)
{
    BNT_DEBUG_TRACE

    assert(p->bulk_control == p);

    errval_t err;
    struct proto_trail_pass *t;
    struct transmit_buffer *tb;
    struct bulk_net_msgdesc msg;

    // prepare trailer
    tb = stack_alloc_alloc(&p->net_ctrl.tb_stack);
    assert(tb != NULL);
    tb->cont = cont;
    tb->buffer = b;
    if (meta != NULL) {
        memcpy(tb->int_virt, meta, p->channel->meta_size);
    } else {
        memset(tb->int_virt, 0, p->channel->meta_size);
    }
    t = (void *) ((uint8_t *) tb->int_virt + p->channel->meta_size);
    t->type = PROTO_BUFFER_PASS;
    t->pool_domain_id = b->pool->id.dom;
    t->pool_local_id = b->pool->id.local;
    t->pool_machine_id = b->pool->id.machine;

    uint32_t local_id = get_local_bufid(b);

    t->buffer_id = get_remote_bufid(b->pool, local_id);

    msg.parts[1].phys = tb->int_phys;
    msg.parts[1].size = sizeof(*t) + p->channel->meta_size;
    msg.parts[1].opaque = tb;
    msg.parts[2].size = 0;

    bulk_net_transfer_add_header(&msg);
    err = bulk_e10k_send(&p->net_ctrl.transfer, &msg);
    assert(err_is_ok(err));
}

static void handle_buffer_pass(struct bulk_net_nocopy *p,
                               struct proto_trail_pass *t,
                               struct bulk_net_msgdesc *msg)
{
    BNT_DEBUG_TRACE

    assert(p->bulk_control == p);

    errval_t err;
    struct receive_buffer *rb;

    struct bulk_pool_id id = {
        .machine = t->pool_machine_id,
        .local = t->pool_local_id,
        .dom = t->pool_domain_id, };

    struct bulk_buffer *buf = get_buffer(p->channel, &id, t->buffer_id);

    assert(buf);

    uint32_t local_id = get_local_bufid(buf);

    set_remote_bufid(buf->pool, local_id, t->buffer_id);

    err = bulk_buffer_change_state(buf, BULK_BUFFER_READ_WRITE);
    assert(!err_is_fail(err));

    rb = msg->parts[1].opaque;

    p->channel->callbacks->buffer_received(p->channel, buf, rb->virt);

    bulk_net_transfer_free_rb(&p->net_ctrl, rb);
}

/* ----------------------------- release operation ------------------------- */

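/*
 * Releases also travel over the control channel. The receiver decrements
 * the buffer's local reference count and only flips the buffer back to
 * READ_WRITE once it owns it and the last outstanding copy is gone.
 */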
static void send_buffer_release(struct bulk_net_nocopy *p,
                                struct bulk_buffer *b,
                                void *meta,
                                struct bulk_continuation cont)
{
    BNT_DEBUG_TRACE

    errval_t err;
    struct proto_trail_release *t;
    struct transmit_buffer *tb;
    struct bulk_net_msgdesc msg;

    assert(p->bulk_control == p);

    // prepare trailer
    tb = stack_alloc_alloc(&p->net_ctrl.tb_stack);
    assert(tb != NULL);
    tb->cont = cont;
    tb->buffer = b;
    if (meta != NULL) {
        memcpy(tb->int_virt, meta, p->channel->meta_size);
    } else {
        memset(tb->int_virt, 0, p->channel->meta_size);
    }
    t = (void *) ((uint8_t *) tb->int_virt + p->channel->meta_size);
    t->type = PROTO_BUFFER_RELEASE;
    t->pool_domain_id = b->pool->id.dom;
    t->pool_local_id = b->pool->id.local;
    t->pool_machine_id = b->pool->id.machine;

    uint32_t local_id = get_local_bufid(b);
    t->buffer_id = get_remote_bufid(b->pool, local_id);

    msg.parts[1].phys = tb->int_phys;
    msg.parts[1].size = sizeof(*t) + p->channel->meta_size;
    msg.parts[1].opaque = tb;
    msg.parts[2].size = 0;

    bulk_net_transfer_add_header(&msg);
    err = bulk_e10k_send(&p->net_ctrl.transfer, &msg);
    assert(err_is_ok(err));
}

static void handle_buffer_release(struct bulk_net_nocopy *p,
                                  struct proto_trail_release *t,
                                  struct bulk_net_msgdesc *msg)
{
    BNT_DEBUG_TRACE

    assert(p->bulk_control == p);

    errval_t err;
    struct receive_buffer *rb;

    struct bulk_pool_id id = {
        .machine = t->pool_machine_id,
        .local = t->pool_local_id,
        .dom = t->pool_domain_id, };

    struct bulk_buffer *buf = get_buffer(p->channel, &id, t->buffer_id);

    assert(buf);

    uint32_t local_id = get_local_bufid(buf);

    set_remote_bufid(buf->pool, local_id, t->buffer_id);

    rb = msg->parts[1].opaque;

    buf->local_ref_count--;

    if (buf->state == BULK_BUFFER_RO_OWNED && bulk_buffer_can_release(buf)) {
        err = bulk_buffer_change_state(buf, BULK_BUFFER_READ_WRITE);
        assert(!err_is_fail(err));
    }

    p->channel->callbacks->copy_released(p->channel, buf);

    bulk_net_transfer_free_rb(&p->net_ctrl, rb);
}

/* ---------------------------- status message ----------------------------- */
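/* XXX: status messages are defined in the protocol (PROTO_STATUS) but are
 * currently no-ops. */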
static void send_status_msg(void)
{

}

static void handle_status_msg(struct bulk_net_nocopy *p,
                              struct proto_trail_status *t,
                              struct bulk_net_msgdesc *msg)
{

}

/* ------------------------ network management ----------------------------- */
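/*
 * RX dispatch: the trailer is always in the last non-empty message part,
 * and its final byte is the protocol message type. tcb_received() locates
 * that part, reads the type, and casts the trailer to the matching
 * proto_trail_* struct before invoking the handler.
 */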
static void tcb_transmitted(struct bulk_e10k *bu, void *opaque)
{
    BNT_DEBUG_TRACE

    struct bulk_net_nocopy *p = bu->opaque;
    struct transmit_buffer *tb = opaque;

    if (opaque == NULL) {
        // We can ignore the header buffers
        return;
    }

    if (tb->buffer != NULL) {
        if (tb->cont.handler) {
            tb->cont.handler(tb->cont.arg, SYS_ERR_OK, p->channel);
        }
        tb->buffer = NULL;
        tb->cont = BULK_CONT_NOP;
    }

    stack_alloc_free(&p->net_ctrl.tb_stack, tb);
}

static void tcb_received(struct bulk_e10k *bu, struct bulk_net_msgdesc *msg)
{
    BNT_DEBUG_TRACE

    struct bulk_net_nocopy *p = bu->opaque;
    size_t i;
    struct receive_buffer *rb;
    uint8_t *t;

    assert(msg->parts[0].size == sizeof(struct packet_header));
    bulk_net_transfer_strip_padding(msg);

    /* find the last non-empty part; it carries the protocol trailer */
    for (i = 0; i < BULK_NET_DESCLEN && msg->parts[i].size != 0; i++)
        ;
    i--;

    rb = msg->parts[i].opaque;
    t = rb->virt;

    /* the last byte of the trailer is the message type */
    switch (t[msg->parts[i].size - 1]) {
        case PROTO_BIND_REQUEST:
            handle_bind_request(
                            p,
                            (struct proto_trail_bind_req *) (t
                                            + msg->parts[i].size
                                            - sizeof(struct proto_trail_bind_req)),
                            msg);
            break;
        case PROTO_BIND_RESPONSE:
            handle_bind_response(
                            p,
                            (struct proto_trail_bind_resp *) (t
                                            + msg->parts[i].size
                                            - sizeof(struct proto_trail_bind_resp)),
                            msg);
            break;
        case PROTO_POOL_REQUEST:
            handle_pool_assign_request(
                            p,
                            (struct proto_trail_pool_req *) (t
                                            + msg->parts[i].size
                                            - sizeof(struct proto_trail_pool_req)),
                            msg);
            break;
        case PROTO_POOL_RESPONSE:
            handle_pool_assign_response(
                            p,
                            (struct proto_trail_pool_resp *) (t
                                            + msg->parts[i].size
                                            - sizeof(struct proto_trail_pool_resp)),
                            msg);
            break;
        case PROTO_BUFFER_MOVE:
            handle_buffer_move(
                            p,
                            (struct proto_trail_move *) (t + msg->parts[i].size
                                            - sizeof(struct proto_trail_move)),
                            msg);
            break;
        case PROTO_BUFFER_COPY:
            handle_buffer_copy(
                            p,
                            (struct proto_trail_copy *) (t + msg->parts[i].size
                                            - sizeof(struct proto_trail_copy)),
                            msg);
            break;
        case PROTO_BUFFER_PASS:
            handle_buffer_pass(
                            p,
                            (struct proto_trail_pass *) (t + msg->parts[i].size
                                            - sizeof(struct proto_trail_pass)),
                            msg);
            break;
        case PROTO_BUFFER_RELEASE:
            handle_buffer_release(
                            p,
                            (struct proto_trail_release *) (t
                                            + msg->parts[i].size
                                            - sizeof(struct proto_trail_release)),
                            msg);
            break;
        case PROTO_STATUS:
            handle_status_msg(
                            p,
                            (struct proto_trail_status *) (t
                                            + msg->parts[i].size
                                            - sizeof(struct proto_trail_status)),
                            msg);
            break;
        default:
            USER_PANIC("Unsupported Request");
            break;
    }
}

/* --------------------- implementation callbacks -------------------------- */

static errval_t impl_channel_create(struct bulk_channel *channel)
{
    BNT_DEBUG_TRACE

    errval_t err;

    BNT_STATUS("Creating new bulk channel [%p] using net.no-copy backend",
               channel);

    struct bulk_net_nocopy *p = calloc(1, sizeof(struct bulk_net_nocopy));
    if (p == NULL) {
        return BULK_TRANSFER_MEM;
    }

    struct bulk_net_endpoint_descriptor *ep =
        (struct bulk_net_endpoint_descriptor *) channel->ep;

    p->net_ctrl.card = ep->cardname;
    p->net_ctrl.l_port = ep->port;
    p->net_ctrl.queue = ep->queue;
    p->net_ctrl.ws = channel->waitset;
    p->net_ctrl.buffer_size = BULK_NET_CTRL_CHANNEL_BUF_SIZE;
    /* this is the control channel, has just two buffers */
    p->net_ctrl.buffer_count = ep->buffer_count;
    p->net_ctrl.max_queues = ep->max_queues;
    p->net_ctrl.num_queues = 1;

    p->bulk_control = p;

    err = bulk_net_transfer_export(&p->net_ctrl, tcb_transmitted, tcb_received);
    if (err_is_fail(err)) {
        free(p);
        return err;
    }

    p->net_ctrl.buffer_size = ep->buffer_size;

    channel->state = BULK_STATE_BINDING;
    p->channel = channel;
    channel->impl_data = p;

    return err;
}

static errval_t impl_channel_bind(struct bulk_channel *channel,
                                  struct bulk_continuation cont)
{
    BNT_DEBUG_TRACE

    errval_t err;

    BNT_STATUS("Binding new bulk channel [%p] using net.no-copy backend",
               channel);

    struct bulk_net_nocopy *bnt = calloc(1, sizeof(struct bulk_net_nocopy));
    if (!bnt) {
        return BULK_TRANSFER_MEM;
    }

    struct bulk_net_endpoint_descriptor *ep =
        (struct bulk_net_endpoint_descriptor *) channel->ep;

    bnt->net_ctrl.card = ep->cardname;
    bnt->net_ctrl.r_port = ep->port;
    bnt->net_ctrl.r_ip = ep->ip.addr;   ///< XXX: IP already in network byte order
    bnt->net_ctrl.queue = ep->queue;
    bnt->net_ctrl.ws = channel->waitset;
    bnt->net_ctrl.buffer_size = BULK_NET_CTRL_CHANNEL_BUF_SIZE;
    /* this is the control channel, has just two buffers */
    bnt->net_ctrl.buffer_count = ep->buffer_count;
    bnt->net_ctrl.max_queues = ep->max_queues;
    bnt->net_ctrl.num_queues = 1;
    bnt->bulk_control = bnt;

    err = bulk_net_transfer_bind(&bnt->net_ctrl, tcb_transmitted, tcb_received);
    if (err_is_fail(err)) {
        free(bnt);
        return err;
    }

    bnt->net_ctrl.buffer_size = ep->buffer_size;

    channel->impl_data = bnt;
    bnt->channel = channel;

    channel->state = BULK_STATE_BINDING;

    bnt->bind_cont = cont;

    send_bind_request(bnt, bnt->net_ctrl.buffer_size, channel->trust,
                      channel->role);
    send_status_msg();

    return SYS_ERR_OK;
}

static errval_t impl_channel_assign_pool(struct bulk_channel *channel,
                                         struct bulk_pool *pool,
                                         struct bulk_continuation cont)
{
    BNT_DEBUG_TRACE

    errval_t err;
    struct bulk_net_nocopy *bnt, *p;
    struct bulk_pool_internal *pool_int = (struct bulk_pool_internal *) pool;

    p = get_net_nocopy(pool);
    if (p) {
        return BULK_TRANSFER_NET_POOL_USED;
    }

    bnt = (struct bulk_net_nocopy *) channel->impl_data;

    /* need to take the control channel for this */
    assert(bnt->bulk_control == bnt);

    if (bnt->net_ctrl.buffer_size != pool->buffer_size) {
        return BULK_TRANSFER_ALLOC_BUFFER_SIZE;
    }

    if (bnt->net_ctrl.num_queues == bnt->net_ctrl.max_queues) {
        return BULK_TRANSFER_NET_MAX_QUEUES;
    }

    uint8_t queueid = bnt->net_ctrl.queue + bnt->net_ctrl.num_queues;
    bnt->net_ctrl.num_queues++;

    /* allocate a new queue for this pool */
    p = calloc(1, sizeof(struct bulk_net_nocopy)
                    + pool->num_buffers * sizeof(struct receive_buffer));
    if (!p) {
        return BULK_TRANSFER_MEM;
    }

    p->meta_rb = (struct receive_buffer *) (p + 1);
    err = bulk_net_init_meta_rb(p->meta_rb, pool->num_buffers,
                                pool->buffer_size);
    assert(!err_is_fail(err));

    memcpy(&p->net_ctrl, &bnt->net_ctrl, sizeof(p->net_ctrl));
    p->net_ctrl.queue = queueid;
    p->net_ctrl.buffer_count = 0;

    err = bulk_net_transfer_bind(&p->net_ctrl, tcb_transmitted, tcb_received);
    if (err_is_fail(err)) {
        free(p);
        return err;
    }

    p->channel = channel;
    p->pool = pool;
    p->bulk_control = bnt;
    p->bind_cont = cont;

    assert(!pool_int->impl_data);

    size_t pd_size = sizeof(struct bulk_net_pool_data)
                    + 2 * pool->num_buffers * sizeof(uint32_t);

    struct bulk_net_pool_data *pd = malloc(pd_size);

    if (!pd) {
        free(p);
        /* TODO: Free network resources */
        return BULK_TRANSFER_MEM;
    }

    pd->buf_id_local_to_remote = (uint32_t *) (pd + 1);
    pd->buf_id_remote_to_local =
        (pd->buf_id_local_to_remote + pool->num_buffers);
    for (uint32_t i = 0; i < pool->num_buffers; ++i) {
        pd->buf_id_local_to_remote[i] = i;
        pd->buf_id_remote_to_local[i] = i;
    }
    pd->p = p;
    pool_int->impl_data = pd;

    struct pending_pool_request *req = malloc(
                    sizeof(struct pending_pool_request));
    if (!req) {
        free(p);
        free(pd);
        /* TODO: free network resources */
        return BULK_TRANSFER_MEM;
    }

    req->cont = cont;
    req->pool = pool;
    req->bnt = p;
    /* prepend the request to the pending list */
    req->next = bnt->pending_pool_requests;
    bnt->pending_pool_requests = req;

    send_pool_assign_request(bnt, pool, p->net_ctrl.l_port);

    return SYS_ERR_OK;
}

static errval_t impl_channel_move(struct bulk_channel *channel,
                                  struct bulk_buffer *buffer,
                                  void *meta,
                                  struct bulk_continuation cont)
{
    struct bulk_pool_internal *pool = (struct bulk_pool_internal *) buffer->pool;
    struct bulk_net_pool_data *pd = pool->impl_data;
    struct bulk_net_nocopy *bnt = pd->p;

    send_buffer_move(bnt, buffer, meta, cont);
    return SYS_ERR_OK;
}

/**
 * Passes a buffer back to the other side: the buffer and its metadata
 * receive buffer are re-posted to the RX queue so the peer can fill them,
 * then the pass message is sent over the control channel.
 */
static errval_t impl_channel_pass(struct bulk_channel *channel,
                                  struct bulk_buffer *buffer,
                                  void *meta,
                                  struct bulk_continuation cont)
{
    errval_t err;

    struct bulk_pool_internal *pool = (struct bulk_pool_internal *) buffer->pool;
    struct bulk_net_pool_data *pd = pool->impl_data;
    struct bulk_net_nocopy *bnt = pd->p;
    struct bulk_net_nocopy *p = channel->impl_data;

    if (channel->direction == BULK_DIRECTION_TX) {
        return BULK_TRANSFER_CHAN_DIRECTION;
    }

    assert(bnt != NULL);

    struct receive_buffer *rb;
    rb = stack_alloc_alloc(&bnt->net_ctrl.rb_stack);
    assert(rb != NULL);

    rb->virt = buffer->address;
    rb->phys = buffer->phys;
    rb->buffer = buffer;

    err = bulk_e10k_rx_add(&bnt->net_ctrl.transfer, rb->phys, rb->hdr_phys, rb);
    if (err_is_fail(err)) {
        return err;
    }
    uint32_t local_id = get_local_bufid(buffer);
    rb = bnt->meta_rb + local_id;
    err = bulk_e10k_rx_add(&bnt->net_ctrl.transfer, rb->phys, rb->hdr_phys, rb);
    if (err_is_fail(err)) {
        return err;
    }

    /* send the buffer pass over the control channel */
    send_buffer_pass(p, buffer, meta, cont);
    return SYS_ERR_OK;
}

static errval_t impl_channel_copy(struct bulk_channel *channel,
                                  struct bulk_buffer *buffer,
                                  void *meta,
                                  struct bulk_continuation cont)
{
    struct bulk_pool_internal *pool = (struct bulk_pool_internal *) buffer->pool;
    struct bulk_net_pool_data *pd = pool->impl_data;
    struct bulk_net_nocopy *bnt = pd->p;

    send_buffer_copy(bnt, buffer, meta, cont);
    return SYS_ERR_OK;
}

static errval_t impl_channel_release(struct bulk_channel *channel,
                                     struct bulk_buffer *buffer,
                                     struct bulk_continuation cont)
{
    errval_t err;

    struct bulk_pool_internal *pool = (struct bulk_pool_internal *) buffer->pool;
    struct bulk_net_pool_data *pd = pool->impl_data;
    struct bulk_net_nocopy *bnt = pd->p;
    struct bulk_net_nocopy *p = channel->impl_data;

    if (channel->direction == BULK_DIRECTION_TX) {
        return BULK_TRANSFER_CHAN_DIRECTION;
    }

    struct receive_buffer *rb;
    rb = stack_alloc_alloc(&bnt->net_ctrl.rb_stack);
    assert(rb != NULL);

    rb->virt = buffer->address;
    rb->phys = buffer->phys;
    rb->buffer = buffer;

    err = bulk_e10k_rx_add(&bnt->net_ctrl.transfer, rb->phys, rb->hdr_phys, rb);
    if (err_is_fail(err)) {
        return err;
    }
    uint32_t local_id = get_local_bufid(buffer);
    rb = bnt->meta_rb + local_id;
    err = bulk_e10k_rx_add(&bnt->net_ctrl.transfer, rb->phys, rb->hdr_phys, rb);
    if (err_is_fail(err)) {
        return err;
    }
    /* send the buffer release over the control channel */
    send_buffer_release(p, buffer, bnt->zero_meta, cont);
    return SYS_ERR_OK;
}

static struct bulk_implementation bulk_net_implementation = {
    .channel_create = impl_channel_create,
    .channel_bind = impl_channel_bind,
    .assign_pool = impl_channel_assign_pool,
    .move = impl_channel_move,
    .pass = impl_channel_pass,
    .copy = impl_channel_copy,
    .release = impl_channel_release };

struct bulk_implementation *bulk_net_get_impl_no_copy(void)
{
    return &bulk_net_implementation;
}

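/*
 * Usage sketch (illustrative only): an application does not call the impl_*
 * functions above directly. It fills a bulk_net_endpoint_descriptor with
 * card, port, and queue information, then creates or binds a channel
 * through the generic bulk-transfer API, which looks up this backend's
 * vtable via bulk_net_get_impl_no_copy(). The variable names below are
 * hypothetical; the exact setup calls live in the generic library and in
 * bulk_net.h.
 *
 *   struct bulk_channel chan;                  // hypothetical
 *   struct bulk_net_endpoint_descriptor ep;    // filled with card/port/queue
 *   // ... initialize ep and the application's callback table ...
 *   // channel creation then routes through impl_channel_create() above.
 */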