1/**
2 * \file
3 * \brief block_server client process.
4 */
5
6/*
7 * Copyright (c) 2007, 2008, 2009, 2010, ETH Zurich.
8 * All rights reserved.
9 *
10 * This file is distributed under the terms in the attached LICENSE file.
11 * If you do not find this file, copies can be found by writing to:
12 * ETH Zurich D-INFK, Universitaetstrasse 6, CH-8092 Zurich. Attn: Systems Group.
13 */
14
15#include <stdio.h>
16
17#include <barrelfish/barrelfish.h>
18#include <bulk_transfer/bulk_transfer.h>
19#include <bulk_transfer/bulk_allocator.h>
20
21#include <if/block_service_defs.h>
22
23#include "benchmark.h"
24#include "bs_connector.h"
25
/** Connection to the block service used by the benchmark; set in bench_init(). */
static struct bs_connection *service;

/** Allocator providing the buffers that are moved to the service on the TX channel. */
static struct bulk_allocator tx_alloc;
/** Allocator providing the buffers that are passed to the service on the RX channel. */
static struct bulk_allocator rx_alloc;
30
/**
 * \brief States of the benchmark state machine.
 *
 * The current state is kept in bench_state; callbacks advance it and
 * wait_cond() dispatches events until a requested state is reached.
 */
enum bs_bench_state
{
    BS_BENCH_STATE_INIT,    ///< set by bench_init() before the pools are assigned
    BS_BENCH_STATE_POOL_1,  ///< first pool assignment completed (pool_assigned_cb)
    BS_BENCH_STATE_POOL_2,  ///< second pool assignment completed (pool_assigned_cb)
    BS_BENCH_STATE_READY,   ///< BS_BENCH_NUM_BUFS buffer passes completed (buff_passed_cb)
    BS_BENCH_STATE_READ,    ///< set by bench_signal() for request type 4; a read is issued next
    BS_BENCH_STATE_WRITE    ///< set by bench_run() and move_recvd_cb(); a write is issued next
};
40
/** Number of successfully completed buffer passes; incremented in buff_passed_cb(). */
volatile uint32_t num_buf_otherside = 0;

/** Request id stored in the meta data of each write; incremented once per run. */
static uint32_t req_id = 0;

/** Current benchmark state; changed by the callbacks and polled in wait_cond(). */
volatile enum bs_bench_state bench_state;
46
47static inline void wait_cond(enum bs_bench_state state)
48{
49    struct bulk_sm_ws_item ws_list[3];
50
51    ws_list[0].ws   = get_default_waitset();
52    ws_list[1].ws   = (service ? service->tx_channel.waitset : NULL);
53    ws_list[2].ws   = (service ? service->rx_channel.waitset : NULL);
54    ws_list[0].next = &ws_list[1];
55    ws_list[1].next = &ws_list[2];
56    ws_list[2].next = NULL;
57
58    while (bench_state != state) {
59        errval_t err = bulk_sm_multiple_event_dispatch(ws_list);
60        if (err_is_fail(err)) {
61            USER_PANIC_ERR(err, "wait_for_condition: event_dispach");
62        }
63    }
64}
65
66/* ---------------------- bulk channel callback ------------------------------*/
67
68static void move_recvd_cb(struct bulk_channel *channel,
69                          struct bulk_buffer *buffer,
70                          void *meta)
71{
72    errval_t err;
73    struct bs_meta_data *bs = meta;
74
75    BS_BENCH_DEBUG(" Block Received: id=[%i] data=[0x%x]", (uint32_t)bs->block_id, *(uint32_t*)buffer->address);
76
77
78
79    char *data = buffer->address;
80    for (uint32_t i = 0; i < buffer->pool->buffer_size; ++i) {
81        assert(data[i] == (((bs->block_id % BS_BENCH_MAX_BLOCKS)+5) % 255));
82    }
83
84    err = bulk_channel_pass(channel, buffer, meta, BULK_CONT_NOP);
85    if (err_is_fail(err)) {
86        USER_PANIC_ERR(err, "Could not pass the buffer");
87    }
88
89    bench_state = BS_BENCH_STATE_WRITE;
90}
91
92static void buffer_recvd_cb(struct bulk_channel *channel,
93                            struct bulk_buffer *buffer,
94                            void *meta)
95{
96    BS_BENCH_DEBUG("Buffer: %p", buffer);
97
98    bulk_alloc_return_buffer(&tx_alloc, buffer);
99}
100
/**
 * \brief Callback for a buffer copy received on the RX channel.
 *
 * Copy transfers are not implemented by the benchmark: the assertion below
 * always fails. Note that assert() is compiled out when NDEBUG is defined,
 * in which case this callback does nothing.
 */
static void copy_recvd_cb(struct bulk_channel *channel,
                          struct bulk_buffer *buffer,
                          void *meta)
{
    assert(!"NYI: copy");
}
107
/**
 * \brief Callback for a released buffer copy on the TX channel.
 *
 * Copy transfers are not implemented by the benchmark: the assertion below
 * always fails. Note that assert() is compiled out when NDEBUG is defined,
 * in which case this callback does nothing.
 */
static void copy_released_cb(struct bulk_channel *channel,
                             struct bulk_buffer *buffer)
{
    assert(!"NYI: copy");
}
113
/** Callbacks for the RX channel; handed out through bench_get_rx_cb(). */
static struct bulk_channel_callbacks bulk_rx_cb = {
    .move_received = move_recvd_cb,
    .copy_received = copy_recvd_cb, };
117
/** Callbacks for the TX channel; handed out through bench_get_tx_cb(). */
static struct bulk_channel_callbacks bulk_tx_cb = {
    .copy_released = copy_released_cb,
    .buffer_received = buffer_recvd_cb, };
121
122struct bulk_channel_callbacks *bench_get_rx_cb(void)
123{
124    return &bulk_rx_cb;
125}
126
127struct bulk_channel_callbacks *bench_get_tx_cb(void)
128{
129    return &bulk_tx_cb;
130}
131
132static void buff_passed_cb(void *arg,
133                           errval_t err,
134                           struct bulk_channel *channel)
135{
136    if (err_is_fail(err)) {
137        DEBUG_ERR(err, "failed passing buffer");
138        return;
139    }
140
141    num_buf_otherside++;
142    if (bench_state == BS_BENCH_STATE_POOL_2
143                    && num_buf_otherside == BS_BENCH_NUM_BUFS) {
144        bench_state = BS_BENCH_STATE_READY;
145    }
146}
147
148static void pool_assigned_cb(void *arg,
149                             errval_t err,
150                             struct bulk_channel *channel)
151{
152    if (err_is_fail(err)) {
153        USER_PANIC_ERR(err, "error while assigning pool");
154        return;
155    }
156    if (bench_state == BS_BENCH_STATE_INIT) {
157        BS_BENCH_DEBUG("%s", "first pool assigned\n\n");
158        bench_state = BS_BENCH_STATE_POOL_1;
159    } else if (bench_state == BS_BENCH_STATE_POOL_1) {
160        BS_BENCH_DEBUG("%s", "second pool assigned\n\n");
161        bench_state = BS_BENCH_STATE_POOL_2;
162    }
163}
164
165errval_t bench_init(struct bs_connection *conn)
166{
167    errval_t err;
168
169    service = conn;
170
171    BS_BENCH_DEBUG("Allocating pools: size=%i",
172                   BS_BENCH_NUM_BUFS * BS_BENCH_BUF_SIZE)
173
174    err = bulk_alloc_init(&tx_alloc, BS_BENCH_NUM_BUFS, BS_BENCH_BUF_SIZE,
175    NULL);
176    if (err_is_fail(err)) {
177        USER_PANIC_ERR(err, "allocating buffer");
178    }
179
180    err = bulk_alloc_init(&rx_alloc, BS_BENCH_NUM_BUFS, BS_BENCH_BUF_SIZE,
181    NULL);
182    if (err_is_fail(err)) {
183        USER_PANIC_ERR(err, "allocating buffer");
184    }
185
186    bench_state = BS_BENCH_STATE_INIT;
187
188    struct bulk_continuation cont = {
189        .arg = NULL,
190        .handler = pool_assigned_cb };
191
192    BS_BENCH_DEBUG("%s", "Assigning first... ");
193
194    err = bulk_channel_assign_pool(&service->rx_channel, rx_alloc.pool, cont);
195    if (err_is_fail(err)) {
196        return err;
197    }
198
199    BS_BENCH_DEBUG("%s", "Assigning second pool... ");
200
201    err = bulk_channel_assign_pool(&service->tx_channel, tx_alloc.pool, cont);
202    if (err_is_fail(err)) {
203        return err;
204    }
205    wait_cond(BS_BENCH_STATE_POOL_2);
206
207    BS_BENCH_DEBUG("%s", "Passing buffers");
208
209    cont.handler = buff_passed_cb;
210
211    for (uint32_t i = 0; i < BS_BENCH_NUM_BUFS; ++i) {
212        // BS_BENCH_DEBUG("Passing buffer %u\n", i);
213        struct bulk_buffer *buf = bulk_alloc_new_buffer(&rx_alloc);
214        assert(buf);
215        err = bulk_channel_pass(&service->rx_channel, buf, NULL, cont);
216    }
217
218
219    wait_cond(BS_BENCH_STATE_READY);
220
221    BS_BENCH_DEBUG("%s", "Benchmark initiated.")
222    return SYS_ERR_OK;
223}
224
/**
 * \brief Continuation passed to bulk_channel_move() in bench_do_single_run().
 *
 * Intentionally does nothing: the state change that used to happen here is
 * commented out, and BS_BENCH_STATE_READ is set in bench_signal() instead.
 */
static void free_cont(void *arg, errval_t err, struct bulk_channel *channel)
{
   // bench_state = BS_BENCH_STATE_READ;
}
229
230void bench_signal(errval_t err,
231                  uint32_t seqn,
232                  uint32_t req)
233{
234    if (err != SYS_ERR_OK) {
235        BS_BENCH_DEBUG("SIGNAL: error = %i", (uint32_t)err);
236    }
237
238    // bench_state = BS_BENCH_STATE_READ;
239    if (req == 4) { // usr/block_server/network_common.h:block_net_msg_type:BLOCK_NET_MSG_WRITE
240        // this is messy business.
241        bench_state = BS_BENCH_STATE_READ;
242    }
243
244}
245
246
247static errval_t bench_do_single_run(uint32_t i)
248{
249    errval_t err;
250
251    printf("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n");
252
253    wait_cond(BS_BENCH_STATE_WRITE);
254
255    struct bulk_buffer *buf = bulk_alloc_new_buffer(&tx_alloc);
256    assert(buf);
257
258    struct bs_meta_data meta;
259    meta.block_id = i % BS_BENCH_MAX_BLOCKS;
260    meta.req_id = req_id++;
261
262    memset(buf->address, (((i % BS_BENCH_MAX_BLOCKS)+5) % 255),  buf->pool->buffer_size);
263
264    err = bulk_channel_move(&service->tx_channel, buf, &meta,
265                            MK_BULK_CONT(free_cont, NULL));
266    if (err_is_fail(err)) {
267        USER_PANIC_ERR(err, "Failed to write to channel");
268    }
269
270    /* TODO: Wait for status reply here */
271    wait_cond(BS_BENCH_STATE_READ);
272
273    err = bs_service_read(service, i % BS_BENCH_MAX_BLOCKS, 1, BULK_CONT_NOP);
274    if (err_is_fail(err)) {
275        USER_PANIC_ERR(err, "could not read request");
276    }
277
278    return SYS_ERR_OK;
279}
280
281errval_t bench_run(void)
282{
283    BS_BENCH_DEBUG("Starting benchmark. NRUNS=%i", BS_BENCH_NUM_RUNS);
284
285    bench_state = BS_BENCH_STATE_WRITE;
286
287    for (uint32_t i=0; i < BS_BENCH_NUM_RUNS; ++i) {
288        if(err_is_fail(bench_do_single_run(i))) {
289            USER_PANIC("Benchmark crashed.");
290            break;
291        }
292    }
293
294    wait_cond(BS_BENCH_STATE_WRITE);
295
296    BS_BENCH_DEBUG("%s", "Benchmark completed: ");
297    return SYS_ERR_OK;
298}
299