#include <assert.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <barrelfish/barrelfish.h>
#include <barrelfish/nameservice_client.h>
#include <if/mt_waitset_defs.h>
#include <if/mt_waitset_defs.h>
#include <barrelfish/deferred.h>
#include <barrelfish/inthandler.h>
#include <bench/bench.h>
#include <sys/time.h>
#include "../lib/barrelfish/include/threads_priv.h"
#include <barrelfish/debug.h>
#include <barrelfish/spawn_client.h>
#include <barrelfish/event_mutex.h>
16
17const static char *service_name = "mt_waitset_service";
18coreid_t my_core_id, num_cores;
19struct thread *threads[256];
20
21static int server_threads = 10;
22static int client_threads = 1;
23static int iteration_count = 1000;
24
25static int client_counter = 0;
26static int64_t server_calls[256];
27static int64_t client_calls[256][256];
28
29static void show_stats(void)
30{
31    debug_printf("Stats: %zd %zd %zd %zd %zd %zd %zd %zd %zd %zd\n",
32        server_calls[0], server_calls[1], server_calls[2], server_calls[3],
33        server_calls[4], server_calls[5], server_calls[6], server_calls[7],
34        server_calls[8], server_calls[9]);
35}
36
37static void show_client_stats(void)
38{
39    int i, j, s;
40    char text[256];
41
42    for (i = 0; i < num_cores; i++) {
43        s = sprintf(text, "Core %d:", i);
44        for (j = 0; j < 16; j++)
45            s += sprintf(text + s, "\t%zd", client_calls[i][j]);
46        s += sprintf(text + s, "\n");
47        debug_printf("%s", text);
48    }
49}
50
51static int client_thread(void * arg)
52{
53    struct mt_waitset_binding *binding;
54    errval_t err;
55    binding = arg;
56    int i, j, k, l;
57    uint64_t payload[512];
58    uint64_t result[512];
59    size_t result_size;
60    uint64_t o1;
61    uint32_t o2;
62    uint32_t i1 = my_core_id << 8 | thread_self()->id;
63    uint64_t mmm = ((uint64_t)my_core_id << 56) | ((uint64_t)thread_self()->id << 48);
64
65    debug_printf("Start\n");
66
67    for (k = 0; k < iteration_count; k++) {
68        uint64_t i2 = (rdtsc() & 0xffffffff) | mmm | (((uint64_t)k & 0xffffL) << 32);
69
70        j = ((i2 >> 5) & 511) + 1;
71
72        i2 &= 0xfffffffffffff000;
73
74        for (i = 0; i < j; i++)
75            payload[i] = i2 + i;
76        err = binding->rpc_tx_vtbl.rpc_method(binding, i2, (uint8_t *)payload, 8 * j, i1, &o1, (uint8_t *)result, &result_size, &o2);
77
78        assert(err == SYS_ERR_OK);
79        l = 0;
80        for (i = 0; i < j; i++) {
81            if (result[i] == payload[i] + i)
82                l++;
83        }
84        if (!(i2 + 1 == o1) || result_size != (8 * j) || l != j) {
85            debug_printf("%d: wrong %016lx != %016lx  %d %zd    %d %d\n", k, i2 + 1, o1, 8 * j, result_size, j, l);
86            for (i = 0; i < j; i++)
87                debug_printf("\t%d: %016lx %016lx\n", i, payload[i], result[i]);
88        }
89        server_calls[o2]++;
90        if (err_is_fail(err)) {
91            DEBUG_ERR(err, "error sending message\n");
92        }
93    }
94
95    dispatcher_handle_t handle = disp_disable();
96
97    client_counter--;
98    debug_printf("Done, threads left:%d\n", client_counter);
99
100    if (client_counter == 0) {
101        disp_enable(handle);
102        // all threads have finished, we're done, inform the server
103        payload[0] = mmm;
104        err = binding->rpc_tx_vtbl.rpc_method(binding, mmm, (uint8_t *)payload, 8, 65536, &o1, (uint8_t *)result, &result_size, &o2);
105        show_stats();
106    } else
107        disp_enable(handle);
108    return 0;
109}
110
111static void bind_cb(void *st, errval_t err, struct mt_waitset_binding *b)
112{
113    int i = (long int)st;
114
115    mt_waitset_rpc_client_init(b);
116
117    client_counter = client_threads;
118    for (i = 1; i < client_threads; i++)
119        thread_create(client_thread, b);
120    client_thread(b);
121}
122
123static void start_client(void)
124{
125    char name[64];
126    errval_t err;
127    iref_t iref;
128
129    debug_printf("Start client\n");
130    sprintf(name, "%s%d", service_name, 0);
131    err = nameservice_blocking_lookup(service_name, &iref);
132    if (err_is_fail(err)) {
133        USER_PANIC_ERR(err, "nameservice_blocking_lookup failed");
134    }
135    err = mt_waitset_bind(iref, bind_cb,  (void *)0, get_default_waitset(), IDC_BIND_FLAGS_DEFAULT);
136    if (err_is_fail(err)) {
137        USER_PANIC_ERR(err, "bind failed");
138    }
139}
140
141
142// server
143
144static void export_cb(void *st, errval_t err, iref_t iref)
145{
146    if (err_is_fail(err)) {
147        USER_PANIC_ERR(err, "export failed");
148    }
149    err = nameservice_register(service_name, iref);
150    if (err_is_fail(err)) {
151            USER_PANIC_ERR(err, "nameservice_register failed");
152    }
153}
154
155static errval_t server_rpc_method_call(struct mt_waitset_binding *b, uint64_t i1, const uint8_t *s, size_t ss, uint32_t i2, uint64_t *o1, uint8_t *r, size_t *rs, uint32_t *o2)
156{
157    int i, j, k, me;
158    static int count = 0;
159    static uint64_t calls = 0;
160    uint64_t *response = (uint64_t *)r;
161
162    for (i = 0;; i++) {
163        if (thread_self() == threads[i]) {
164            server_calls[i]++;
165            me = i;
166            break;
167        }
168    }
169
170    if (i2 == 65536) {
171        count++;    // client has finished
172    } else
173        client_calls[i2 >> 8][i2 & 255]++;
174
175    j = ss / 8;
176    k = 0;
177    for (i = 0; i < j; i++) {
178        response[i] = ((uint64_t *)s)[i];
179        if (response[i] == i1 + i)
180            k++;
181        response[i] += i;
182    }
183    if (k != j && i2 != 65536)
184        debug_printf("%s: binding:%p %08x %08x  %d %d   %016lx:%d\n", __func__, b, i2, b->incoming_token, k, j, response[0], me);
185    if (count == num_cores) {
186        bool failed = false;
187
188        debug_printf("Final statistics\n");
189        show_stats();
190        show_client_stats();
191        for (i = 0; i < num_cores; i++) {
192            for (j = 0; j < client_threads; j++) {
193                if (client_calls[i][j] != iteration_count) {
194                    failed = true;
195                    goto out;
196                }
197            }
198        }
199out:
200        if (failed)
201            debug_printf("Test FAILED\n");
202        else
203            debug_printf("Test PASSED\n");
204    }
205    calls++;
206    if ((calls % 10000) == 0) {
207        show_stats();
208    }
209
210    *o1 = i1 + 1;
211    *rs = 8 * j;
212    *o2 = me;
213
214    return SYS_ERR_OK;
215}
216
217static struct mt_waitset_rpc_rx_vtbl rpc_rx_vtbl = {
218    .rpc_method_call = server_rpc_method_call
219};
220
221static errval_t connect_cb(void *st, struct mt_waitset_binding *b)
222{
223    b->rpc_rx_vtbl = rpc_rx_vtbl;
224    return SYS_ERR_OK;
225}
226
227static int run_server(void * arg)
228{
229    int i = (uintptr_t)arg;
230    struct waitset *ws = get_default_waitset();
231    errval_t err;
232
233
234    debug_printf("Server dispatch loop %d\n", i);
235    threads[i] = thread_self();
236
237    for (;;) {
238        err = event_dispatch(ws);
239        if (err_is_fail(err)) {
240            DEBUG_ERR(err, "in event_dispatch");
241            break;
242        }
243    }
244    return SYS_ERR_OK;
245}
246
247static void start_server(void)
248{
249    struct waitset *ws = get_default_waitset();
250    errval_t err;
251    int i;
252
253    debug_printf("Start server\n");
254
255    err = mt_waitset_export(NULL, export_cb, connect_cb, ws,
256                            IDC_EXPORT_FLAGS_DEFAULT);
257    if (err_is_fail(err)) {
258        USER_PANIC_ERR(err, "export failed");
259    }
260    for (i = 1; i < server_threads; i++) {
261        thread_create(run_server, (void *)(uintptr_t)i);
262    }
263}
264
265int main(int argc, char *argv[])
266{
267    errval_t err;
268    char *my_name = strdup(argv[0]);
269
270    my_core_id = disp_get_core_id();
271
272    memset(server_calls, 0, sizeof(server_calls));
273    memset(client_calls, 0, sizeof(client_calls));
274
275    if (argc == 1) {
276        debug_printf("Usage: %s server_threads client_threads iteration_count\n", argv[0]);
277    } else if (argc == 4) {
278        char *xargv[] = {my_name, argv[2], argv[3], NULL};
279
280        server_threads = atoi(argv[1]);
281        client_threads = atoi(argv[2]);
282        iteration_count = atoi(argv[3]);
283
284        err = spawn_program_on_all_cores(true, xargv[0], xargv, NULL,
285            SPAWN_FLAGS_DEFAULT, NULL, &num_cores);
286        debug_printf("spawn program on all cores (%d)\n", num_cores);
287        assert(err_is_ok(err));
288
289        start_server();
290
291        struct waitset *ws = get_default_waitset();
292
293        threads[0] = thread_self();
294        for (;;) {
295            err = event_dispatch(ws);
296            if (err_is_fail(err)) {
297                DEBUG_ERR(err, "in event_dispatch");
298                break;
299            }
300        }
301    } else {
302        client_threads = atoi(argv[1]);
303        iteration_count = atoi(argv[2]);
304
305        struct waitset *ws = get_default_waitset();
306        start_client();
307        debug_printf("Client process events\n");
308        for (;;) {
309            err = event_dispatch(ws);
310            if (err_is_fail(err)) {
311                DEBUG_ERR(err, "in event_dispatch");
312                break;
313            }
314        }
315    }
316    return EXIT_FAILURE;
317}
318