1/* 2 * Copyright (c) 2014 ETH Zurich. 3 * All rights reserved. 4 * 5 * This file is distributed under the terms in the attached LICENSE file. 6 * If you do not find this file, copies can be found by writing to: 7 * ETH Zurich D-INFK, Universitaetsstrasse 6, CH-8092 Zurich. Attn: Systems Group. 8 */ 9 10#include <barrelfish/barrelfish.h> 11#include <flounder/flounder.h> 12#include <flounder/flounder_txqueue.h> 13 14static void send_cont(void *arg) 15{ 16 TXQ_DEBUG("send_cont: sending message %p\n", arg); 17 18 struct txq_msg_st *st = arg; 19 errval_t err = st->send(st); 20 /* this function is called form regsiter_send callback sending shoudl not fail*/ 21 if (err_is_fail(err)) { 22 USER_PANIC_ERR(err, "failed to send even in register_send callback\n"); 23 } 24} 25 26static inline void try_send(struct txq_msg_st *st) 27{ 28 struct tx_queue *q = st->queue; 29 assert(q->head == st); 30 31 errval_t err = st->send(st); 32 33 TXQ_DEBUG("try_send: st=%p, #queued=%u, err=%s\n", st, q->queue_count, 34 err_getstring(err)); 35 36 switch (err_no(err)) { 37 case SYS_ERR_OK: 38 return; 39 case FLOUNDER_ERR_TX_BUSY: 40 /* registering here should not fail */ 41 err = q->register_send(q->binding, q->waitset, 42 MKCONT(send_cont, q->head)); 43 if (err_is_fail(err)) { 44 USER_PANIC_ERR(err, "could not register.\n"); 45 } 46 break; 47 default: 48 USER_PANIC_ERR(err, "error while sending"); 49 break; 50 } 51} 52 53/* 54 * =========================================================================== 55 * Public Interface 56 * =========================================================================== 57 */ 58 59/** 60 * \brief initializes a tx_queue to be used for a Flounder binding 61 * 62 * \param queue TX queue to be initialized 63 * \param binding Flounder binding 64 * \param waitset the waitset to be used 65 * \param register_send register send function of the binding 66 * \param can_send can send function of the binding 67 * \param msg_st_size size of the message state elements of this queue 68 */ 69void txq_init(struct tx_queue *queue, 70 void *binding, 71 struct waitset *waitset, 72 txq_register_fn_t register_send, 73 uint32_t msg_st_size) 74{ 75 assert(msg_st_size >= sizeof(struct txq_msg_st)); 76 assert(binding); 77 assert(waitset); 78 assert(register_send); 79 80 queue->binding = binding; 81 queue->register_send = register_send; 82 queue->msg_st_size = msg_st_size; 83 queue->waitset = waitset; 84 queue->head = NULL; 85 queue->tail = NULL; 86 queue->free = NULL; 87#ifdef FLOUNDER_TXQUEUE_DEBUG 88 queue->alloc_count = 0; 89 queue->free_count = 0; 90 queue->queue_count = 0; 91#endif 92} 93 94/** 95 * \brief allocates new message state for an outgoing flounder message 96 * 97 * \param txq TX queue to allocate from 98 * 99 * \returns mx_mst_st on success 100 * NULL on failure 101 */ 102struct txq_msg_st *txq_msg_st_alloc(struct tx_queue *txq) 103{ 104 struct txq_msg_st *st; 105 106 if (txq->free != NULL) { 107 st = txq->free; 108 txq->free = st->next; 109 110 TXQ_ASSERT(txq->free_count); 111 TXQ_OP(txq->free_count--); 112 TXQ_DEBUG("txq_mst_st_alloc: reusing msg st %p. free [%u / %u]\n", st, 113 txq->free_count, txq->alloc_count); 114 115 st->next = NULL; 116 return st; 117 } 118 119 TXQ_OP(txq->alloc_count++); 120 TXQ_ASSERT(txq->free_count == 0); 121 122 st = calloc(1, txq->msg_st_size); 123 if (st == NULL) { 124 return NULL; 125 } 126 127 TXQ_DEBUG("txq_msg_st_alloc: allocating new msg st %p. #allocated: %u\n", st, 128 txq->alloc_count); 129 130 st->queue = txq; 131 132 return st; 133} 134 135/** 136 * \brief frees up an unused message state 137 * 138 * \param st txq message state to be freed 139 */ 140void txq_msg_st_free(struct txq_msg_st *st) 141{ 142 struct tx_queue *q = st->queue; 143 144 TXQ_ASSERT(!(q->free_count == 0) || (q->free == NULL)); 145 TXQ_OP(st->queue->free_count++); 146 TXQ_DEBUG("txq_mst_st_free: %p, free msg states %p [ %u / %u ]\n", st, q->free, 147 st->queue->free_count, st->queue->alloc_count); 148 149 st->next = q->free; 150 q->free = st; 151} 152 153/** 154 * \brief handler to be called when the message was sent successfully 155 * 156 * \param st TX queue message state 157 */ 158void txq_sent_cb(void *arg) 159{ 160 errval_t err; 161 162 struct txq_msg_st *st = arg; 163 struct tx_queue *q = st->queue; 164 assert(q->head == st); 165 TXQ_OP(q->queue_count--); 166 167 if (st->cleanup) { 168 st->cleanup(st); 169 } 170 171 q->head = q->head->next; 172 173 if (q->head == NULL) { 174 q->tail = NULL; 175 176 TXQ_ASSERT(q->queue_count == 0); 177 TXQ_DEBUG("txq_sent_cb: st=%p, head=%p, tail=%p, count=%u\n", st, q->head, 178 q->tail, q->queue_count); 179 } else { 180 TXQ_ASSERT(q->queue_count); 181 TXQ_DEBUG("txq_sent_cb: st=%p, register sending head=%p, tail=%p, count=%u\n", 182 st, q->head, q->tail, q->queue_count); 183 184 err = q->register_send(q->binding, q->waitset, MKCONT(send_cont, q->head)); 185 if (err_is_fail(err)) { 186 USER_PANIC_ERR(err, "could not register for sending!\n"); 187 } 188 } 189 190 txq_msg_st_free(st); 191} 192 193/** 194 * \brief sends the Flounder message 195 * 196 * \param st txq message state of the message 197 */ 198void txq_send(struct txq_msg_st *st) 199{ 200 assert(st->queue); 201 202 struct tx_queue *q = st->queue; 203 204 st->next = NULL; 205 if (q->tail == NULL) { 206 q->head = st; 207 TXQ_ASSERT(q->queue_count == 0); 208 } else { 209 q->tail->next = st; 210 TXQ_ASSERT(q->queue_count); 211 } 212 q->tail = st; 213 214 TXQ_OP(q->queue_count++); 215 TXQ_DEBUG("txq_send: st=%p, head=%p, tail=%p, count=%u\n", st, q->head, q->tail, 216 q->queue_count); 217 218 if (q->tail == q->head) { 219 TXQ_ASSERT(q->queue_count == 1); 220 try_send(st); 221 } 222} 223