1/*
2 * Copyright (c) 2014 ETH Zurich.
3 * All rights reserved.
4 *
5 * This file is distributed under the terms in the attached LICENSE file.
6 * If you do not find this file, copies can be found by writing to:
7 * ETH Zurich D-INFK, Universitaetsstrasse 6, CH-8092 Zurich. Attn: Systems Group.
8 */
9
10#include <barrelfish/barrelfish.h>
11#include <flounder/flounder.h>
12#include <flounder/flounder_txqueue.h>
13
14static void send_cont(void *arg)
15{
16    TXQ_DEBUG("send_cont: sending message %p\n", arg);
17
18    struct txq_msg_st *st = arg;
19    errval_t err = st->send(st);
20    /* this function is called form regsiter_send callback sending shoudl not fail*/
21    if (err_is_fail(err)) {
22        USER_PANIC_ERR(err, "failed to send even in register_send callback\n");
23    }
24}
25
26static inline void try_send(struct txq_msg_st *st)
27{
28    struct tx_queue *q = st->queue;
29    assert(q->head == st);
30
31    errval_t err = st->send(st);
32
33    TXQ_DEBUG("try_send: st=%p, #queued=%u, err=%s\n", st, q->queue_count,
34              err_getstring(err));
35
36    switch (err_no(err)) {
37        case SYS_ERR_OK:
38            return;
39        case FLOUNDER_ERR_TX_BUSY:
40            /* registering here should not fail */
41            err = q->register_send(q->binding, q->waitset,
42                                   MKCONT(send_cont, q->head));
43            if (err_is_fail(err)) {
44                USER_PANIC_ERR(err, "could not register.\n");
45            }
46            break;
47        default:
48            USER_PANIC_ERR(err, "error while sending");
49            break;
50    }
51}
52
53/*
54 * ===========================================================================
55 * Public Interface
56 * ===========================================================================
57 */
58
59/**
60 * \brief initializes a tx_queue to be used for a Flounder binding
61 *
62 * \param queue         TX queue to be initialized
63 * \param binding       Flounder binding
64 * \param waitset       the waitset to be used
65 * \param register_send register send function of the binding
66 * \param can_send      can send function of the binding
67 * \param msg_st_size   size of the message state elements of this queue
68 */
69void txq_init(struct tx_queue *queue,
70              void *binding,
71              struct waitset *waitset,
72              txq_register_fn_t register_send,
73              uint32_t msg_st_size)
74{
75    assert(msg_st_size >= sizeof(struct txq_msg_st));
76    assert(binding);
77    assert(waitset);
78    assert(register_send);
79
80    queue->binding = binding;
81    queue->register_send = register_send;
82    queue->msg_st_size = msg_st_size;
83    queue->waitset = waitset;
84    queue->head = NULL;
85    queue->tail = NULL;
86    queue->free = NULL;
87#ifdef FLOUNDER_TXQUEUE_DEBUG
88    queue->alloc_count = 0;
89    queue->free_count = 0;
90    queue->queue_count = 0;
91#endif
92}
93
94/**
95 * \brief allocates new message state for an outgoing flounder message
96 *
97 * \param txq   TX queue to allocate from
98 *
99 * \returns mx_mst_st on success
100 *          NULL on failure
101 */
102struct txq_msg_st *txq_msg_st_alloc(struct tx_queue *txq)
103{
104    struct txq_msg_st *st;
105
106    if (txq->free != NULL) {
107        st = txq->free;
108        txq->free = st->next;
109
110        TXQ_ASSERT(txq->free_count);
111        TXQ_OP(txq->free_count--);
112        TXQ_DEBUG("txq_mst_st_alloc: reusing msg st %p. free [%u / %u]\n", st,
113                  txq->free_count, txq->alloc_count);
114
115        st->next = NULL;
116        return st;
117    }
118
119    TXQ_OP(txq->alloc_count++);
120    TXQ_ASSERT(txq->free_count == 0);
121
122    st = calloc(1, txq->msg_st_size);
123    if (st == NULL) {
124        return NULL;
125    }
126
127    TXQ_DEBUG("txq_msg_st_alloc: allocating new msg st %p. #allocated: %u\n", st,
128              txq->alloc_count);
129
130    st->queue = txq;
131
132    return st;
133}
134
135/**
136 * \brief frees up an unused message state
137 *
138 * \param st    txq message state to be freed
139 */
140void txq_msg_st_free(struct txq_msg_st *st)
141{
142    struct tx_queue *q = st->queue;
143
144    TXQ_ASSERT(!(q->free_count == 0) || (q->free == NULL));
145    TXQ_OP(st->queue->free_count++);
146    TXQ_DEBUG("txq_mst_st_free: %p, free msg states %p [ %u / %u ]\n", st, q->free,
147              st->queue->free_count, st->queue->alloc_count);
148
149    st->next = q->free;
150    q->free = st;
151}
152
153/**
154 * \brief handler to be called when the message was sent successfully
155 *
156 * \param st    TX queue message state
157 */
158void txq_sent_cb(void *arg)
159{
160    errval_t err;
161
162    struct txq_msg_st *st = arg;
163    struct tx_queue *q = st->queue;
164    assert(q->head == st);
165    TXQ_OP(q->queue_count--);
166
167    if (st->cleanup) {
168        st->cleanup(st);
169    }
170
171    q->head = q->head->next;
172
173    if (q->head == NULL) {
174        q->tail = NULL;
175
176        TXQ_ASSERT(q->queue_count == 0);
177        TXQ_DEBUG("txq_sent_cb: st=%p, head=%p, tail=%p, count=%u\n", st, q->head,
178                  q->tail, q->queue_count);
179    } else {
180        TXQ_ASSERT(q->queue_count);
181        TXQ_DEBUG("txq_sent_cb: st=%p, register sending head=%p, tail=%p, count=%u\n",
182                  st, q->head, q->tail, q->queue_count);
183
184        err = q->register_send(q->binding, q->waitset, MKCONT(send_cont, q->head));
185        if (err_is_fail(err)) {
186            USER_PANIC_ERR(err, "could not register for sending!\n");
187        }
188    }
189
190    txq_msg_st_free(st);
191}
192
193/**
194 * \brief sends the Flounder message
195 *
196 * \param st    txq message state of the message
197 */
198void txq_send(struct txq_msg_st *st)
199{
200    assert(st->queue);
201
202    struct tx_queue *q = st->queue;
203
204    st->next = NULL;
205    if (q->tail == NULL) {
206        q->head = st;
207        TXQ_ASSERT(q->queue_count == 0);
208    } else {
209        q->tail->next = st;
210        TXQ_ASSERT(q->queue_count);
211    }
212    q->tail = st;
213
214    TXQ_OP(q->queue_count++);
215    TXQ_DEBUG("txq_send: st=%p, head=%p, tail=%p, count=%u\n", st, q->head, q->tail,
216              q->queue_count);
217
218    if (q->tail == q->head) {
219        TXQ_ASSERT(q->queue_count == 1);
220        try_send(st);
221    }
222}
223