1/** 2 * \file 3 * \brief block_server client process. 4 */ 5 6/* 7 * Copyright (c) 2007, 2008, 2009, 2010, ETH Zurich. 8 * All rights reserved. 9 * 10 * This file is distributed under the terms in the attached LICENSE file. 11 * If you do not find this file, copies can be found by writing to: 12 * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group. 13 */ 14 15#include <stdio.h> 16 17#include <barrelfish/barrelfish.h> 18#include <barrelfish/waitset.h> 19#include <barrelfish/nameservice_client.h> 20 21#include <bulk_transfer/bulk_transfer.h> 22#include <bulk_transfer/bulk_sm.h> 23 24#include "bs_connector.h" 25#include "benchmark.h" 26 27enum bs_connect_stage 28{ 29 CSTAGE_START, 30 CSTAGE_SERVICE_BOUND, 31 CSTAGE_SERVICE_SETUP, 32 CSTAGE_ERR 33}; 34 35static uint32_t seq_number = 0; 36 37volatile enum bs_connect_stage connection_stage = CSTAGE_START; 38 39struct waitset *bs_service_ws = NULL; 40struct waitset *bs_service_tx_ws = NULL; 41struct waitset *bs_service_rx_ws = NULL; 42 43static inline void wait_for_condition(enum bs_connect_stage stage) 44{ 45 struct bulk_sm_ws_item ws_list[3]; 46 47 ws_list[0].ws = bs_service_ws; 48 ws_list[1].ws = bs_service_tx_ws; 49 ws_list[2].ws = bs_service_rx_ws; 50 ws_list[0].next = &ws_list[1]; 51 ws_list[1].next = &ws_list[2]; 52 ws_list[2].next = NULL; 53 54 while (connection_stage != stage) { 55 errval_t err = bulk_sm_multiple_event_dispatch(ws_list); 56 if (err_is_fail(err)) { 57 USER_PANIC_ERR(err, "wait_for_condition: event_dispach"); 58 } 59 } 60} 61 62 63static void bs_service_status_cb(struct block_service_binding *b, 64 errval_t err, 65 uint32_t seqn, 66 uint32_t req) 67{ 68 BS_CONN_DEBUG("status message: error=%s, seqn=%u, req=%u", 69 err_getstring(err), seqn, req); 70 bench_signal(err, seqn, req); 71} 72 73static struct block_service_rx_vtbl bs_rx_vtbl = { 74 .status = bs_service_status_cb }; 75 76static void bs_service_bind_cb(void *st, 77 errval_t err, 78 struct block_service_binding *b) 79{ 80 struct bs_connection *conn = (struct bs_connection *) st; 81 82 b->st = conn; 83 conn->service = b; 84 85 conn->state = BS_CONN_SERVICE_BOUND; 86 87 b->rx_vtbl = bs_rx_vtbl; 88 89 connection_stage = CSTAGE_SERVICE_BOUND; 90} 91 92static void bs_service_setup_cb(void *a) 93{ 94 /* DUMMY */ 95} 96 97static errval_t bs_bulk_bind_cb(struct bulk_channel *channel) 98{ 99 struct bs_connection *conn = (struct bs_connection *) channel->user_state; 100 if (conn->state == BS_CONN_SERVICE_BOUND) { 101 BS_CONN_DEBUG("%s", "first bulk channel bound."); 102 conn->state = BS_CONN_BULK_BINDING; 103 return SYS_ERR_OK; 104 } else if (conn->state == BS_CONN_BULK_BINDING) { 105 BS_CONN_DEBUG("%s", "second channel bound."); 106 conn->state = BS_CONN_CONNECTED; 107 connection_stage = CSTAGE_SERVICE_SETUP; 108 return SYS_ERR_OK; 109 } 110 assert(!"This should not happen..."); 111 return SYS_ERR_OK; 112} 113 114static void bs_service_setup(void *st) 115{ 116 errval_t err; 117 BS_CONN_DEBUG("sending setup message to %s.", BLOCK_SERVICE_NAME); 118 119 struct bs_connection *conn = (struct bs_connection *) st; 120 121 struct event_closure txcont = MKCONT(bs_service_setup_cb, conn); 122 123 BS_CONN_DEBUG("tx_iref=%i, rx_iref=%i", (uint32_t )conn->tx_ep.iref, 124 (uint32_t )conn->rx_ep.iref); 125 126 err = block_service_setup__tx(conn->service, txcont, conn->rx_ep.iref, 127 conn->tx_ep.iref); 128 if (err_is_fail(err)) { 129 if (err_no(err) == FLOUNDER_ERR_TX_BUSY) { 130 txcont = MKCONT(bs_service_setup, conn); 131 struct waitset *ws = get_default_waitset(); 132 err = conn->service->register_send(conn->service, ws, txcont); 133 if (err_is_fail(err)) { 134 connection_stage = CSTAGE_ERR; 135 conn->state = BS_CONN_ERR; 136 USER_PANIC_ERR(err, "could not register send\n"); 137 } 138 } else { 139 BS_CONN_DEBUG("sending setup message %s", BLOCK_SERVICE_NAME); 140 connection_stage = CSTAGE_ERR; 141 conn->state = BS_CONN_ERR; 142 USER_PANIC_ERR(err, "flounder failure\n"); 143 } 144 } 145} 146 147errval_t bs_service_connect(struct bs_connection *conn, 148 struct bulk_channel_callbacks *rx_cb, 149 struct bulk_channel_callbacks *tx_cb) 150{ 151 errval_t err; 152 iref_t service_iref; 153 154 BS_CONN_DEBUG("looking up service: %s", BLOCK_SERVICE_NAME); 155 /* connect to the block service */ 156 err = nameservice_blocking_lookup(BLOCK_SERVICE_NAME, &service_iref); 157 if (err_is_fail(err)) { 158 return err; 159 } 160 161 BS_CONN_DEBUG("binding to iref %i", (uint32_t )service_iref); 162 err = block_service_bind(service_iref, bs_service_bind_cb, conn, 163 get_default_waitset(), IDC_BIND_FLAGS_DEFAULT); 164 if (err_is_fail(err)) { 165 return err; 166 } 167 168 bs_service_ws = get_default_waitset(); 169 170 wait_for_condition(CSTAGE_SERVICE_BOUND); 171 172 BS_CONN_DEBUG("%s", "setting up bulk endpoints and channels"); 173 174 /* setting up bulk endpoints */ 175 err = bulk_sm_ep_create(&conn->rx_ep); 176 assert(!err_is_fail(err)); 177 err = bulk_sm_ep_create(&conn->tx_ep); 178 assert(!err_is_fail(err)); 179 180 // need exclusive waitset per channel 181 struct waitset *rx_ws = malloc(sizeof(*rx_ws)); 182 struct waitset *tx_ws = malloc(sizeof(*tx_ws)); 183 assert(rx_ws); 184 assert(tx_ws); 185 waitset_init(rx_ws); 186 waitset_init(tx_ws); 187 188 bs_service_tx_ws = tx_ws; 189 bs_service_rx_ws = rx_ws; 190 191 struct bulk_channel_setup setup = { 192 .role = BULK_ROLE_MASTER, 193 .trust = BULK_TRUST_FULL, 194 .direction = BULK_DIRECTION_RX, 195 .meta_size = sizeof(struct bs_meta_data), 196 .waitset = rx_ws, 197 .user_state = conn }; 198 199 rx_cb->bind_received = bs_bulk_bind_cb; 200 201 err = bulk_channel_create(&conn->rx_channel, 202 (struct bulk_endpoint_descriptor *) &conn->rx_ep, 203 rx_cb, &setup); 204 if (err_is_fail(err)) { 205 return err; 206 } 207 208 setup.direction = BULK_DIRECTION_TX; 209 setup.waitset = tx_ws; 210 211 tx_cb->bind_received = bs_bulk_bind_cb; 212 213 err = bulk_channel_create(&conn->tx_channel, 214 (struct bulk_endpoint_descriptor *) &conn->tx_ep, 215 tx_cb, &setup); 216 if (err_is_fail(err)) { 217 return err; 218 } 219 220 bs_service_setup(conn); 221 222 wait_for_condition(CSTAGE_SERVICE_SETUP); 223 224 BS_CONN_DEBUG("%s", "Service and bulk channels setup."); 225 226 return SYS_ERR_OK; 227} 228 229struct bs_read_request { 230 struct bs_connection *conn; 231 uint32_t block_id; 232 uint32_t block_count; 233 struct bulk_continuation cont; 234}; 235 236static void bs_service_read_sent_cb(void *a) 237{ 238 free(a); 239} 240 241static errval_t bs_service_read_send(void *a) 242{ 243 errval_t err; 244 245 struct bs_read_request *req = (struct bs_read_request*)a; 246 247 struct event_closure txcont = MKCONT(bs_service_read_sent_cb, req); 248 err = block_service_read__tx(req->conn->service, txcont, req->block_id, req->block_count, seq_number); 249 if (err_is_fail(err)) { 250 if (err_no(err) == FLOUNDER_ERR_TX_BUSY) { 251 // cannot do this: would need to register multiple resend functions. 252 // struct waitset *ws = get_default_waitset(); 253 // txcont = MKCONT(bs_service_read_send, req); 254 // err = req->conn->service->register_send(req->conn->service, ws, txcont); 255 // if (err_is_fail(err)) { 256 // // note that only one continuation may be registered at a time 257 // DEBUG_ERR(err, "register_send on binding failed!"); 258 // } 259 } 260 } else { 261 seq_number++; 262 } 263 264 return err; 265} 266 267errval_t bs_service_read(struct bs_connection *conn, 268 uint32_t block_id, 269 uint32_t block_count, 270 struct bulk_continuation cont) 271{ 272 struct bs_read_request *req = malloc(sizeof(struct bs_read_request)); 273 274 req->block_count = block_count; 275 req->conn = conn; 276 req->block_id = block_id; 277 req->cont = cont; 278 279 errval_t err; 280 do { // busy wait because no send queue available. 281 err = bs_service_read_send(req); 282 if (err_is_fail(err) && err != FLOUNDER_ERR_TX_BUSY) { 283 USER_PANIC_ERR(err, "bs_service_read"); 284 } 285 286 errval_t other = event_dispatch(get_default_waitset()); 287 // maybe need dispatch service->tx/rx_channel.waitset as well? 288 assert(err_is_ok(other)); 289 } while (err_is_fail(err)); 290 291 return SYS_ERR_OK; 292} 293