1/*
2 * Copyright 2019, Data61
3 * Commonwealth Scientific and Industrial Research Organisation (CSIRO)
4 * ABN 41 687 119 230.
5 *
6 * This software may be distributed and modified according to the terms of
7 * the GNU General Public License version 2. Note that NO WARRANTY is provided.
8 * See "LICENSE_GPLv2.txt" for details.
9 *
10 * @TAG(DATA61_GPL)
11 */
12
13#include <autoconf.h>
14
15#include <string.h>
16#include <pico_stack.h>
17#include <pico_socket.h>
18#include <pico_device.h>
19#include <pico_addressing.h>
20#include <pico_ipv4.h>
21#undef PACKED
22#include <sel4/sel4.h>
23#include <utils/util.h>
24#include <sel4utils/sel4_zf_logif.h>
25#include "picoserver_client.h"
26#include "picoserver_socket.h"
27#include <picoserver_event.h>
28#include <picoserver_peer.h>
29#include <platsupport/io.h>
30#include <picotcp-socket-sync.h>
31#include <virtqueue.h>
32#include <camkes/virtqueue.h>
33
34/*
35 * Functions exposed by the connection from the interfaces.
36 */
/* Control interface: identifies the calling client and emits towards clients. */
seL4_Word pico_control_get_sender_id(void);
void pico_control_emit(unsigned int);
int pico_control_largest_badge(void);

/* Receive interface: per-badge buffer that received socket data is written into. */
seL4_Word pico_recv_get_sender_id(void);
void *pico_recv_buf(seL4_Word);
size_t pico_recv_buf_size(seL4_Word);
seL4_Word pico_recv_enumerate_badge(unsigned int);

/* Send interface: per-badge buffer that outgoing socket data is read from. */
seL4_Word pico_send_get_sender_id(void);
void *pico_send_buf(seL4_Word);
size_t pico_send_buf_size(seL4_Word);
seL4_Word pico_send_enumerate_badge(unsigned int);

/* Virtqueues of the asynchronous transport; initialised in picotcp_socket_sync_server_init() */
virtqueue_device_t tx_virtqueue;
virtqueue_device_t rx_virtqueue;

/* Number of clients, as passed to picotcp_socket_sync_server_init() */
int num_clients;
/* Set by socket_cb() once an event has been recorded; cleared by notify_client() */
int emit_client;
/* Set by tx_complete()/rx_complete(); cleared by notify_client() after notifying the virtqueue */
int emit_client_async;
57
58/*
59 * Gets the client's ID and checks that it is valid.
60 */
61static inline seL4_Word client_check(void)
62{
63    /* Client IDs start from one to avoid using the zero badge */
64    seL4_Word client_id = pico_control_get_sender_id();
65    ZF_LOGF_IF(client_id >= num_clients, "Client ID is greater than the number of clients registered!");
66    return client_id;
67}
68
69/*
70 * Performs the common operations in the various control RPC calls.
71 * This includes error checking and fetching the client's socket structure.
72 */
73static int server_control_common(seL4_Word client_id, int socket_fd, picoserver_socket_t **ret_socket)
74{
75    if (socket_fd < 0) {
76        return -1;
77    }
78
79    *ret_socket = client_get_socket(client_id, socket_fd);
80    if (*ret_socket == NULL) {
81        return -1;
82    }
83
84    return 0;
85}
86
87/*
88 * Performs the common operations between the send and receive RPC calls.
89 * These include error checking, fetching the client's bookkeeping structures,
90 * their sockets etc.
91 */
92static int server_communication_common(seL4_Word client_id, int socket_fd, int len, int buffer_offset,
93                                       size_t buffer_size, picoserver_socket_t **ret_socket,
94                                       void **ret_buffer)
95{
96    ZF_LOGF_IF(ret_socket == NULL || ret_buffer == NULL, "Passed in NULL for ret_socket or ret_buffer");
97
98    if (socket_fd < 0 || len < 0 || buffer_offset < 0) {
99        return -1;
100    }
101
102    if ((buffer_offset + len) > buffer_size) {
103        /* Make sure we don't overflow the buffer */
104        return -1;
105    }
106
107    *ret_socket = client_get_socket(client_id, socket_fd);
108    if (*ret_socket == NULL) {
109        return -1;
110    }
111
112    *ret_buffer += buffer_offset;
113
114    return 0;
115}
116
117
/*
 * Forward declarations of the asynchronous (virtqueue) transport helpers,
 * which are defined further down but used by socket_cb() and
 * cleanup_async_socket().
 */
static void tx_complete(void *cookie, int len);
static void rx_complete(void *cookie, int len);
static void tx_socket(picoserver_socket_t *client_socket);
static void rx_socket(picoserver_socket_t *client_socket);
static void tx_queue_handle(void);
static void rx_queue_handle(void);
124
125static void socket_cb(uint16_t ev, struct pico_socket *s)
126{
127    /* ZF_LOGE("\033[32mpico_socket addr = %x, ev = %d\033[0m", s, ev); */
128
129    /* Find the picoserver_socket struct that houses the pico_socket */
130    picoserver_socket_t *client_socket = client_get_socket_by_addr(s);
131
132    if (client_socket == NULL && s != NULL) {
133        /*
134         * For some reason, if pico_socket_listen is called to listen on
135         * a socket and when a client connects to the socket, PicoTCP will
136         * allocate another socket structure that will process callbacks even
137         * though we have not called pico_socket_accept. This results in a
138         * situation where we try to retrieve a socket from our hash table
139         * without having registed it in the first place. The solution now is
140         * to just ignore it and this shouldn't be a problem as PicoTCP does
141         * not allow interactions without accepting the client's connection.
142         *
143         *
144         *
145         * The dangling callback may also happen when the client calls
146         * pico_control_close instead of pico_control_shutdown +
147         * pico_control_close.
148         */
149        return;
150    }
151    if (client_socket->async_transport) {
152        if (ev & PICO_SOCK_EV_RD) {
153            ev &= ~PICO_SOCK_EV_RD;
154            rx_queue_handle();
155            rx_socket(client_socket);
156        }
157        if (ev & PICO_SOCK_EV_WR) {
158            ev &= ~PICO_SOCK_EV_WR;
159            tx_queue_handle();
160            tx_socket(client_socket);
161        }
162    }
163    if (ev) {
164        seL4_Word client_id = client_socket->client_id;
165        int ret = client_put_event(client_id, client_socket->socket_fd, ev);
166        ZF_LOGF_IF(ret == -1, "Failed to set the event flags for client %"PRIuPTR"'s socket %d",
167                   client_id + 1, client_socket->socket_fd);
168
169        emit_client = 1;
170    }
171}
172
173int pico_control_open(bool is_udp)
174{
175    seL4_Word client_id = client_check();
176    picoserver_socket_t *new_socket = calloc(1, sizeof(picoserver_socket_t));
177    if (new_socket == NULL) {
178        ZF_LOGE("Failed to malloc memory for the picoserver struct");
179        return -1;
180    }
181
182    new_socket->client_id = client_id;
183    uint16_t protocol = (is_udp) ? PICO_PROTO_UDP : PICO_PROTO_TCP;
184    new_socket->socket = pico_socket_open(PICO_PROTO_IPV4, protocol, &socket_cb);
185    if (new_socket->socket == NULL) {
186        ZF_LOGE("Failed to open a new socket through picotcp");
187        free(new_socket);
188        return -1;
189    }
190    new_socket->protocol = protocol;
191
192    int ret = client_put_socket(client_id, new_socket);
193    if (ret == -1) {
194        ZF_LOGE("Failed to put the socket into the client's hash table");
195        pico_socket_close(new_socket->socket);
196        free(new_socket);
197        return -1;
198    }
199    new_socket->socket_fd = ret;
200
201    return ret;
202}
203
204int pico_control_bind(int socket_fd, uint32_t local_addr, uint16_t port)
205{
206    seL4_Word client_id = client_check();
207
208
209    picoserver_socket_t *client_socket = NULL;
210
211    int ret = server_control_common(client_id, socket_fd, &client_socket);
212    if (ret) {
213        return -1;
214    }
215
216    port = short_be(port);
217
218    ret = pico_socket_bind(client_socket->socket, &local_addr, &port);
219    return ret;
220}
221
222int pico_control_connect(int socket_fd, uint32_t server_addr, uint16_t port)
223{
224    seL4_Word client_id = client_check();
225
226    picoserver_socket_t *client_socket = NULL;
227
228    int ret = server_control_common(client_id, socket_fd, &client_socket);
229    if (ret) {
230        return -1;
231    }
232
233    port = short_be(port);
234
235    ret = pico_socket_connect(client_socket->socket, &server_addr, port);
236    return ret;
237}
238
239int pico_control_listen(int socket_fd, int backlog)
240{
241    seL4_Word client_id = client_check();
242
243    picoserver_socket_t *client_socket = NULL;
244
245    int ret = server_control_common(client_id, socket_fd, &client_socket);
246    if (ret) {
247        return -1;
248    }
249
250    ret = pico_socket_listen(client_socket->socket, backlog);
251    return ret;
252}
253
254picoserver_peer_t pico_control_accept(int socket_fd)
255{
256    seL4_Word client_id = client_check();
257
258    picoserver_peer_t peer = {0};
259
260    picoserver_socket_t *client_socket = NULL;
261
262    int ret = server_control_common(client_id, socket_fd, &client_socket);
263    if (ret) {
264        peer.result = -1;
265        return peer;
266    }
267
268    uint32_t peer_addr;
269    uint16_t remote_port;
270
271    struct pico_socket *socket = pico_socket_accept(client_socket->socket, &peer_addr, &remote_port);
272    if (socket == NULL) {
273        peer.result = -1;
274        return peer;
275    }
276
277    picoserver_socket_t *new_socket = calloc(1, sizeof(picoserver_socket_t));
278    if (new_socket == NULL) {
279        peer.result = -1;
280        pico_socket_close(socket);
281        return peer;
282    }
283
284    new_socket->client_id = client_id;
285    new_socket->socket = socket;
286    new_socket->protocol = PICO_PROTO_TCP;
287
288    ret = client_put_socket(client_id, new_socket);
289    if (ret == -1) {
290        peer.result = -1;
291        pico_socket_close(socket);
292        free(new_socket);
293        return peer;
294    }
295    new_socket->socket_fd = ret;
296
297    peer.result = 0;
298    peer.socket = ret;
299    peer.peer_addr = peer_addr;
300    peer.peer_port = remote_port;
301
302    return peer;
303}
304
305int pico_control_shutdown(int socket_fd, int mode)
306{
307    seL4_Word client_id = client_check();
308
309    picoserver_socket_t *client_socket = NULL;
310
311    int ret = server_control_common(client_id, socket_fd, &client_socket);
312    if (ret) {
313        return -1;
314    }
315
316    ret = pico_socket_shutdown(client_socket->socket, mode);
317    return ret;
318}
319
320static void cleanup_async_socket(picoserver_socket_t *client_socket)
321{
322    ZF_LOGF_IF(client_socket == NULL, "Invalid arg");
323    if (client_socket->async_transport) {
324        tx_msg_t *msg;
325        while (client_socket->async_transport->tx_pending_queue) {
326            ZF_LOGF_IF(client_socket->async_transport->tx_pending_queue_end == NULL, "Inconsistent queue state");
327            msg = client_socket->async_transport->tx_pending_queue;
328            msg->done_len = -1;
329            client_socket->async_transport->tx_pending_queue = msg->next;
330            if (client_socket->async_transport->tx_pending_queue_end == msg) {
331                client_socket->async_transport->tx_pending_queue_end = NULL;
332            }
333            tx_complete(msg->cookie_save, 0);
334
335        }
336        while (client_socket->async_transport->rx_pending_queue) {
337            ZF_LOGF_IF(client_socket->async_transport->rx_pending_queue_end == NULL, "Inconsistent queue state");
338            msg = client_socket->async_transport->rx_pending_queue;
339            msg->done_len = -1;
340            client_socket->async_transport->rx_pending_queue = msg->next;
341            if (client_socket->async_transport->rx_pending_queue_end == msg) {
342                client_socket->async_transport->rx_pending_queue_end = NULL;
343            }
344            rx_complete(msg->cookie_save, 0);
345
346        }
347
348        free(client_socket->async_transport);
349        client_socket->async_transport = NULL;
350    }
351}
352
353int pico_control_close(int socket_fd)
354{
355    seL4_Word client_id = client_check();
356
357    picoserver_socket_t *client_socket = NULL;
358
359    int ret = server_control_common(client_id, socket_fd, &client_socket);
360    if (ret) {
361        return -1;
362    }
363
364    cleanup_async_socket(client_socket);
365
366    ret = client_delete_socket(client_id, socket_fd);
367    return ret;
368}
369
370
371int pico_control_set_async(int socket_fd, bool enabled)
372{
373    seL4_Word client_id = client_check();
374
375    picoserver_socket_t *client_socket = NULL;
376
377    int ret = server_control_common(client_id, socket_fd, &client_socket);
378    if (ret) {
379        return -1;
380    }
381
382    if (enabled && (client_socket->async_transport == NULL)) {
383        client_socket->async_transport = calloc(1, sizeof(picoserver_socket_async_t));
384        if (client_socket->async_transport == NULL) {
385            ZF_LOGE("Failed to malloc memory for the picoserver async struct");
386            return -1;
387        }
388    } else if (!enabled && client_socket->async_transport) {
389        cleanup_async_socket(client_socket);
390    } else {
391        ZF_LOGW("pico_control_set_async called with no-op");
392    }
393    return 0;
394}
395
396
397picoserver_event_t pico_control_event_poll(void)
398{
399    seL4_Word client_id = client_check();
400
401    /* Retrieve the client's outstanding events */
402    picoserver_event_t event = {0};
403    client_get_event(client_id, &event);
404
405    return event;
406}
407
408int pico_control_get_ipv4(uint32_t *addr)
409{
410    struct pico_device *dev = pico_get_device("eth0");
411    if (dev == NULL) {
412        return -1;
413    }
414    *addr = pico_ipv4_link_by_dev(dev)->address.addr;
415
416    return 0;
417}
418
419int pico_send_write(int socket_fd, int len, int buffer_offset)
420{
421    seL4_Word client_id = client_check();
422    /*
423     * client_id needs to be incremented here as the CAmkES generated interfaces are one off
424     * from ours
425     */
426
427    size_t buffer_size = pico_send_buf_size(pico_send_enumerate_badge(client_id));
428    void *client_buf = pico_send_buf(pico_send_enumerate_badge(client_id));
429    picoserver_socket_t *client_socket = NULL;
430
431    int ret = server_communication_common(client_id, socket_fd, len, buffer_offset,
432                                          buffer_size, &client_socket, &client_buf);
433    if (ret) {
434        return -1;
435    }
436
437    ret = pico_socket_write(client_socket->socket, client_buf, len);
438    return ret;
439}
440
441int pico_send_send(int socket_fd, int len, int buffer_offset)
442{
443    seL4_Word client_id = client_check();
444    /*
445     * client_id needs to be incremented here as the CAmkES generated interfaces are one off
446     * from ours
447     */
448    size_t buffer_size = pico_send_buf_size(pico_send_enumerate_badge(client_id));
449    void *client_buf = pico_send_buf(pico_send_enumerate_badge(client_id));
450    picoserver_socket_t *client_socket = NULL;
451
452    int ret = server_communication_common(client_id, socket_fd, len, buffer_offset,
453                                          buffer_size, &client_socket, &client_buf);
454    if (ret) {
455        return -1;
456    }
457
458    ret = pico_socket_send(client_socket->socket, client_buf, len);
459    return ret;
460}
461
462int pico_send_sendto(int socket_fd, int len, int buffer_offset, uint32_t dst_addr, uint16_t remote_port)
463{
464    seL4_Word client_id = client_check();
465    /*
466     * client_id needs to be incremented here as the CAmkES generated interfaces are one off
467     * from ours
468     */
469    size_t buffer_size = pico_send_buf_size(pico_send_enumerate_badge(client_id));
470    void *client_buf = pico_send_buf(pico_send_enumerate_badge(client_id));
471    picoserver_socket_t *client_socket = NULL;
472
473    int ret = server_communication_common(client_id, socket_fd, len, buffer_offset,
474                                          buffer_size, &client_socket, &client_buf);
475    if (ret) {
476        return -1;
477    }
478
479    remote_port = short_be(remote_port);
480
481    ret = pico_socket_sendto(client_socket->socket, client_buf, len, &dst_addr, remote_port);
482    return ret;
483}
484
485int pico_recv_read(int socket_fd, int len, int buffer_offset)
486{
487    seL4_Word client_id = client_check();
488    /*
489     * client_id needs to be incremented here as the CAmkES generated interfaces are one off
490     * from ours
491     */
492    size_t buffer_size = pico_recv_buf_size(pico_recv_enumerate_badge(client_id));
493    void *client_buf = pico_recv_buf(pico_recv_enumerate_badge(client_id));
494    picoserver_socket_t *client_socket = NULL;
495
496    int ret = server_communication_common(client_id, socket_fd, len, buffer_offset,
497                                          buffer_size, &client_socket, &client_buf);
498    if (ret) {
499        return -1;
500    }
501
502    ret = pico_socket_read(client_socket->socket, client_buf, len);
503    return ret;
504}
505
506int pico_recv_recv(int socket_fd, int len, int buffer_offset)
507{
508    seL4_Word client_id = client_check();
509    /*
510     * client_id needs to be incremented here as the CAmkES generated interfaces are one off
511     * from ours
512     */
513    size_t buffer_size = pico_recv_buf_size(pico_recv_enumerate_badge(client_id));
514    void *client_buf = pico_recv_buf(pico_recv_enumerate_badge(client_id));
515    picoserver_socket_t *client_socket = NULL;
516
517    int ret = server_communication_common(client_id, socket_fd, len, buffer_offset,
518                                          buffer_size, &client_socket, &client_buf);
519    if (ret) {
520        return -1;
521    }
522
523    ret = pico_socket_recv(client_socket->socket, client_buf, len);
524
525    return ret;
526}
527
528int pico_recv_recvfrom(int socket_fd, int len, int buffer_offset, uint32_t *src_addr, uint16_t *remote_port)
529{
530    seL4_Word client_id = client_check();
531    /*
532     * client_id needs to be incremented here as the CAmkES generated interfaces are one off
533     * from ours
534     */
535    size_t buffer_size = pico_recv_buf_size(pico_recv_enumerate_badge(client_id));
536    void *client_buf = pico_recv_buf(pico_recv_enumerate_badge(client_id));
537    picoserver_socket_t *client_socket = NULL;
538
539    int ret = server_communication_common(client_id, socket_fd, len, buffer_offset,
540                                          buffer_size, &client_socket, &client_buf);
541    if (ret) {
542        return -1;
543    }
544
545    ret = pico_socket_recvfrom(client_socket->socket, client_buf, len, src_addr, remote_port);
546
547    /* Reverse the big endian port number */
548    *remote_port = short_be(*remote_port);
549    return ret;
550}
551
552static void notify_client(UNUSED seL4_Word badge, void *cookie)
553{
554    if (emit_client) {
555        pico_control_emit(1);
556        emit_client = 0;
557    }
558    if (emit_client_async) {
559        tx_virtqueue.notify();
560        emit_client_async = 0;
561    }
562}
563
564static void tx_complete(void *cookie, int len)
565{
566    virtqueue_ring_object_t handle;
567    handle.first = (uint32_t)(uintptr_t)cookie;
568    handle.cur = (uint32_t)(uintptr_t)cookie;
569    if (!virtqueue_add_used_buf(&tx_virtqueue, &handle, len)) {
570        ZF_LOGE("TX: Error while enqueuing available buffer");
571    }
572    emit_client_async = true;
573}
574
575
576static void tx_socket(picoserver_socket_t *client_socket)
577{
578    if (client_socket == NULL || client_socket->socket == NULL) {
579        ZF_LOGE("Socket is null");
580        return;
581    }
582
583    if (client_socket->async_transport == NULL) {
584        ZF_LOGE("Socket isn't setup for async");
585        return;
586    }
587
588    while (client_socket->async_transport->tx_pending_queue) {
589        ZF_LOGF_IF(client_socket->async_transport->tx_pending_queue_end == NULL, "Inconsistent queue state");
590        int ret;
591        tx_msg_t *msg = client_socket->async_transport->tx_pending_queue;
592        if (client_socket->protocol == PICO_PROTO_UDP) {
593            ret = pico_socket_sendto(client_socket->socket, msg->buf + msg->done_len, msg->total_len - msg->done_len,
594                                     &msg->src_addr, msg->remote_port);
595        } else {
596            ret = pico_socket_send(client_socket->socket, msg->buf + msg->done_len, msg->total_len - msg->done_len);
597        }
598        if (ret == -1) {
599            /* Free the internal tx buffer in case tx fails. Up to the client to retry the trasmission */
600            ZF_LOGE("tx main: This shouldn't happen.  Handle error case");
601            msg->done_len = -1;
602            client_socket->async_transport->tx_pending_queue = msg->next;
603            if (client_socket->async_transport->tx_pending_queue_end == msg) {
604                client_socket->async_transport->tx_pending_queue_end = NULL;
605            }
606            tx_complete(msg->cookie_save, 0);
607            continue;
608        }
609        if (ret < (msg->total_len - msg->done_len)) {
610            msg->done_len += ret;
611            return;
612        } else {
613            msg->done_len = msg->total_len;
614            client_socket->async_transport->tx_pending_queue = msg->next;
615            if (client_socket->async_transport->tx_pending_queue_end == msg) {
616                client_socket->async_transport->tx_pending_queue_end = NULL;
617            }
618            tx_complete(msg->cookie_save, msg->total_len);
619        }
620    }
621}
622
623
624static void tx_queue_handle(void)
625{
626    while (1) {
627
628        virtqueue_ring_object_t handle;
629
630        if (virtqueue_get_available_buf(&tx_virtqueue, &handle) == 0) {
631            break;
632        }
633        void *buf;
634        unsigned len;
635        vq_flags_t flag;
636        int more = virtqueue_gather_available(&tx_virtqueue, &handle, &buf, &len, &flag);
637        if (more == 0) {
638            ZF_LOGE("No message received");
639        }
640        tx_msg_t *msg = DECODE_DMA_ADDRESS(buf);
641        ZF_LOGF_IF(msg == NULL, "msg is null");
642        ZF_LOGF_IF((msg->total_len > 1400) || (msg->total_len == 0), "bad msg len in tx %zd", msg->total_len);
643
644        picoserver_socket_t *client_socket = client_get_socket(0, msg->socket_fd);
645        if (client_socket == NULL || client_socket->socket == NULL) {
646            ZF_LOGE("Socket is null");
647            msg->done_len = -1;
648            tx_complete((void *)(uintptr_t)handle.first, 0);
649            continue;
650        }
651
652        if (client_socket->async_transport == NULL) {
653            ZF_LOGE("Socket isn't setup for async");
654            msg->done_len = -1;
655            tx_complete((void *)(uintptr_t)handle.first, 0);
656            continue;
657        }
658
659        if (client_socket->async_transport->tx_pending_queue) {
660            ZF_LOGF_IF(client_socket->async_transport->tx_pending_queue_end == NULL, "Inconsistent queue state");
661            client_socket->async_transport->tx_pending_queue_end->next = msg;
662            client_socket->async_transport->tx_pending_queue_end = msg;
663            msg->next = NULL;
664            msg->cookie_save = (void *)(uintptr_t)handle.first;
665            continue;
666        }
667        int ret;
668        if (client_socket->protocol == PICO_PROTO_UDP) {
669            ret = pico_socket_sendto(client_socket->socket, msg->buf, msg->total_len, &msg->src_addr, msg->remote_port);
670        } else {
671            ret = pico_socket_send(client_socket->socket, msg->buf, msg->total_len);
672        }
673        if (ret == -1) {
674            /* Free the internal tx buffer in case tx fails. Up to the client to retry the trasmission */
675            ZF_LOGE("tx main: This shouldn't happen.  Handle error case: %d", pico_err);
676            msg->done_len = -1;
677            tx_complete((void *)(uintptr_t)handle.first, 0);
678            continue;
679        }
680        if (ret < msg->total_len) {
681            msg->done_len = ret;
682            msg->cookie_save = (void *)(uintptr_t)handle.first;
683            msg->next = NULL;
684            client_socket->async_transport->tx_pending_queue = msg;
685            client_socket->async_transport->tx_pending_queue_end = msg;
686        } else {
687            msg->done_len = msg->total_len;
688            tx_complete((void *)(uintptr_t)handle.first, msg->total_len);
689        }
690
691    }
692}
693
694
695
696static void rx_complete(void *cookie, int len)
697{
698    virtqueue_ring_object_t handle;
699    handle.first = (uint32_t)(uintptr_t)cookie;
700    handle.cur = (uint32_t)(uintptr_t)cookie;
701    if (!virtqueue_add_used_buf(&rx_virtqueue, &handle, len)) {
702        ZF_LOGE("RX: Error while enqueuing available buffer");
703    }
704    emit_client_async = true;
705}
706
707static void rx_socket(picoserver_socket_t *client_socket)
708{
709    if (client_socket == NULL || client_socket->socket == NULL) {
710        ZF_LOGE("Socket is null");
711        return;
712    }
713
714    if (client_socket->async_transport == NULL) {
715        ZF_LOGE("Socket isn't setup for async");
716        return;
717    }
718
719    while (client_socket->async_transport->rx_pending_queue) {
720        ZF_LOGF_IF(client_socket->async_transport->rx_pending_queue_end == NULL, "Inconsistent queue state");
721        int ret;
722        tx_msg_t *msg = client_socket->async_transport->rx_pending_queue;
723        if (client_socket->protocol == PICO_PROTO_UDP) {
724            ret = pico_socket_recvfrom(client_socket->socket, msg->buf + msg->done_len, msg->total_len - msg->done_len,
725                                       &msg->src_addr, &msg->remote_port);
726        } else {
727            ret = pico_socket_recv(client_socket->socket, msg->buf + msg->done_len, msg->total_len - msg->done_len);
728        }
729
730        if (ret == -1) {
731            /* Free the internal tx buffer in case tx fails. Up to the client to retry the trasmission */
732            ZF_LOGE("rx_socket: This shouldn't happen.  Handle error case: %d", pico_err);
733            msg->done_len = -1;
734            client_socket->async_transport->rx_pending_queue = msg->next;
735            if (client_socket->async_transport->rx_pending_queue_end == msg) {
736                client_socket->async_transport->rx_pending_queue_end = NULL;
737            }
738            rx_complete(msg->cookie_save, 0);
739            return;
740        }
741        if ((client_socket->protocol == PICO_PROTO_TCP && ret < (msg->total_len - msg->done_len)) ||
742            (client_socket->protocol == PICO_PROTO_UDP && ret == 0)) {
743            msg->done_len += ret;
744            return;
745        } else {
746            msg->done_len += ret;
747            client_socket->async_transport->rx_pending_queue = msg->next;
748            if (client_socket->async_transport->rx_pending_queue_end == msg) {
749                client_socket->async_transport->rx_pending_queue_end = NULL;
750            }
751            rx_complete(msg->cookie_save, msg->total_len);
752        }
753    }
754}
755
756static void rx_queue_handle(void)
757{
758    while (1) {
759        virtqueue_ring_object_t handle;
760
761        if (virtqueue_get_available_buf(&rx_virtqueue, &handle) == 0) {
762            break;
763        }
764        void *buf;
765        unsigned len;
766        vq_flags_t flag;
767        int more = virtqueue_gather_available(&rx_virtqueue, &handle, &buf, &len, &flag);
768        if (more == 0) {
769            ZF_LOGE("No message received");
770        }
771
772        tx_msg_t *msg = DECODE_DMA_ADDRESS(buf);
773        ZF_LOGF_IF(msg == NULL, "msg is null");
774        ZF_LOGF_IF((msg->total_len > 1400) || (msg->total_len == 0), "bad msg len in rx %zd", msg->total_len);
775
776        picoserver_socket_t *client_socket = client_get_socket(0, msg->socket_fd);
777        if (client_socket == NULL || client_socket->socket == NULL) {
778            ZF_LOGE("Socket is null");
779            msg->done_len = -1;
780            rx_complete((void *)(uintptr_t)handle.first, 0);
781            continue;
782        }
783
784        if (client_socket->async_transport == NULL) {
785            ZF_LOGE("Socket isn't setup for async");
786            msg->done_len = -1;
787            rx_complete((void *)(uintptr_t)handle.first, 0);
788            continue;
789        }
790        if (client_socket->async_transport->rx_pending_queue) {
791            ZF_LOGF_IF(client_socket->async_transport->rx_pending_queue_end == NULL, "Inconsistent queue state");
792            client_socket->async_transport->rx_pending_queue_end->next = msg;
793            client_socket->async_transport->rx_pending_queue_end = msg;
794            msg->next = NULL;
795            msg->cookie_save = (void *)(uintptr_t)handle.first;
796            continue;
797        }
798
799        int ret;
800        if (client_socket->protocol == PICO_PROTO_UDP) {
801            ret = pico_socket_recvfrom(client_socket->socket, msg->buf, msg->total_len, &msg->src_addr, &msg->remote_port);
802        } else {
803            ret = pico_socket_recv(client_socket->socket, msg->buf, msg->total_len);
804        }
805        if (ret == -1) {
806            /* Free the internal tx buffer in case tx fails. Up to the client to retry the trasmission */
807            ZF_LOGE("Picosocket_rx: This shouldn't happen.  Handle error case");
808            msg->done_len = -1;
809            rx_complete((void *)(uintptr_t)handle.first, 0);
810            continue;
811        }
812        if ((client_socket->protocol == PICO_PROTO_TCP && ret < msg->total_len) ||
813            (client_socket->protocol == PICO_PROTO_UDP && ret == 0)) {
814            msg->done_len = ret;
815            msg->cookie_save = (void *)(uintptr_t)handle.first;
816            msg->next = NULL;
817            client_socket->async_transport->rx_pending_queue = msg;
818            client_socket->async_transport->rx_pending_queue_end = msg;
819        } else {
820            msg->done_len = ret;
821            rx_complete((void *)(uintptr_t)handle.first, msg->done_len);
822        }
823    }
824}
825
826static void tx_queue_handle_irq(seL4_Word badge, void *cookie)
827{
828
829    rx_queue_handle();
830    tx_queue_handle();
831    pico_stack_tick();
832}
833
834int picotcp_socket_sync_server_init_late(register_callback_handler_fn_t callback_handler)
835{
836    callback_handler(0, "notify_client", notify_client, NULL);
837    return 0;
838}
839
840
841int picotcp_socket_sync_server_init(ps_io_ops_t *io_ops, int num_clients_,
842                                    register_callback_handler_fn_t callback_handler)
843{
844    num_clients = num_clients_;
845    picoserver_clients_init(num_clients);
846
847    seL4_Word tx_badge;
848    seL4_Word rx_badge;
849
850    int num_registered_virtqueues = camkes_virtqueue_channel_num();
851    if (num_registered_virtqueues < 2) {
852        /* Amount of virtqueues is less than expected */
853        return 0;
854    }
855
856    int tx_virtqueue_id = camkes_virtqueue_get_id_from_name("pico_tx");
857    int rx_virtqueue_id = camkes_virtqueue_get_id_from_name("pico_rx");
858
859    if (tx_virtqueue_id == -1 || rx_virtqueue_id == -1) {
860        /* We don't have the virtqueues we expect */
861        return 0;
862    }
863
864    /* Initialise read virtqueue */
865    int error = camkes_virtqueue_device_init_with_recv(&tx_virtqueue, (unsigned) tx_virtqueue_id,
866                                                       NULL, &tx_badge);
867    if (error) {
868        ZF_LOGE("Unable to initialise serial server read virtqueue");
869    }
870    /* Initialise write virtqueue */
871    error = camkes_virtqueue_device_init_with_recv(&rx_virtqueue, (unsigned) rx_virtqueue_id,
872                                                   NULL, &rx_badge);
873    if (error) {
874        ZF_LOGE("Unable to initialise serial server write virtqueue");
875    }
876    error = callback_handler(tx_badge, "client_event_handler", tx_queue_handle_irq, NULL);
877    if (error) {
878        ZF_LOGE("Unable to register handler");
879    }
880
881    return 0;
882}
883