1// Copyright 2016 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include <assert.h>
6#include <errno.h>
7#include <fcntl.h>
8#include <stdbool.h>
9#include <stdint.h>
10#include <stdio.h>
11#include <stdlib.h>
12#include <string.h>
13#include <threads.h>
14#include <unistd.h>
15
16#include <zircon/compiler.h>
17#include <zircon/listnode.h>
18#include <lib/zircon-internal/xorshiftrand.h>
19#include <zircon/types.h>
20
21#include "filesystems.h"
22#include "misc.h"
23
24#define FAIL -1
25#define BUSY 0
26#define DONE 1
27
28#define FBUFSIZE 65536
29
30static_assert(FBUFSIZE == ((FBUFSIZE / sizeof(uint64_t)) * sizeof(uint64_t)),
31              "FBUFSIZE not multiple of uint64_t");
32
33typedef struct worker worker_t;
34// global environment variables
35typedef struct env {
36    worker_t* all_workers;
37
38    list_node_t threads;
39} env_t;
40
41typedef struct worker {
42    env_t* env;
43
44    worker_t* next;
45    int (*work)(worker_t* w);
46
47    rand64_t rdata;
48    rand32_t rops;
49
50    int fd;
51    int status;
52    uint32_t flags;
53    uint32_t size;
54    uint32_t pos;
55
56    union {
57        uint8_t u8[FBUFSIZE];
58        uint64_t u64[FBUFSIZE / sizeof(uint64_t)];
59    };
60
61    char name[256];
62} worker_t;
63#define F_RAND_IOSIZE 1
64
65
66bool worker_new(env_t* env, const char* where, const char* fn,
67                int (*work)(worker_t* w), uint32_t size, uint32_t flags);
68int worker_writer(worker_t* w);
69static bool init_environment(env_t* env);
70
71typedef struct thread_list {
72    list_node_t node;
73    thrd_t t;
74} thread_list_t;
75
76int worker_rw(worker_t* w, bool do_read) {
77    if (w->pos == w->size) {
78        return DONE;
79    }
80
81    // offset into buffer
82    uint32_t off = w->pos % FBUFSIZE;
83
84    // fill our content buffer if it's empty
85    if (off == 0) {
86        for (unsigned n = 0; n < (FBUFSIZE / sizeof(uint64_t)); n++) {
87            w->u64[n] = rand64(&w->rdata);
88        }
89    }
90
91    // data in buffer available to write
92    uint32_t xfer = FBUFSIZE - off;
93
94    // do not exceed our desired size
95    if (xfer > (w->size - w->pos)) {
96        xfer = w->size - w->pos;
97    }
98
99    if ((w->flags & F_RAND_IOSIZE) && (xfer > 3000)) {
100        xfer = 3000 + (rand32(&w->rops) % (xfer - 3000));
101    }
102
103    int r;
104    if (do_read) {
105        uint8_t buffer[FBUFSIZE];
106        if ((r = read(w->fd, buffer, xfer)) < 0) {
107            fprintf(stderr, "worker('%s') read failed @%u: %d\n",
108                    w->name, w->pos, errno);
109            return FAIL;
110        }
111        if (memcmp(buffer, w->u8 + off, r)) {
112            fprintf(stderr, "worker('%s) verify failed @%u\n",
113                    w->name, w->pos);
114            return FAIL;
115        }
116    } else {
117        if ((r = write(w->fd, w->u8 + off, xfer)) < 0) {
118            fprintf(stderr, "worker('%s') write failed @%u: %d\n",
119                    w->name, w->pos, errno);
120            return FAIL;
121        }
122    }
123
124    // advance
125    w->pos += r;
126    return BUSY;
127}
128
129int worker_verify(worker_t* w) {
130    int r = worker_rw(w, true);
131    if (r == DONE) {
132        close(w->fd);
133    }
134    return r;
135}
136
137int worker_writer(worker_t* w) {
138    int r = worker_rw(w, false);
139    if (r == DONE) {
140        if (lseek(w->fd, 0, SEEK_SET) != 0) {
141            fprintf(stderr, "worker('%s') seek failed: %s\n",
142                    w->name, strerror(errno));
143            return FAIL;
144        }
145        // start at 0 and reset our data generator seed
146        srand64(&w->rdata, w->name);
147        w->pos = 0;
148        w->work = worker_verify;
149        return BUSY;
150    }
151    return r;
152}
153
154bool worker_new(env_t* env, const char* where, const char* fn,
155                int (*work)(worker_t* w), uint32_t size, uint32_t flags) {
156    worker_t* w = calloc(1, sizeof(worker_t));
157    ASSERT_NE(w, NULL, "");
158
159    w->env = env;
160
161    snprintf(w->name, sizeof(w->name), "%s%s", where, fn);
162    srand64(&w->rdata, w->name);
163    srand32(&w->rops, w->name);
164    w->size = size;
165    w->work = work;
166    w->flags = flags;
167
168    ASSERT_GT((w->fd = open(w->name, O_RDWR | O_CREAT | O_EXCL, 0644)), 0, "");
169
170    w->next = w->env->all_workers;
171    env->all_workers = w;
172
173    return true;
174}
175
176int do_work(env_t* env) {
177    uint32_t busy_count = 0;
178    for (worker_t* w = env->all_workers; w != NULL; w = w->next) {
179        w->env = env;
180        if (w->status == BUSY) {
181            busy_count++;
182            if ((w->status = w->work(w)) == FAIL) {
183                EXPECT_EQ(unlink(w->name), 0, "");
184                return FAIL;
185            }
186            if (w->status == DONE) {
187                fprintf(stderr, "worker('%s') finished\n", w->name);
188                EXPECT_EQ(unlink(w->name), 0, "");
189            }
190        }
191    }
192    return busy_count ? BUSY : DONE;
193}
194
195bool test_work_single_thread(void) {
196    BEGIN_TEST;
197
198    env_t env;
199    init_environment(&env);
200
201    for (;;) {
202        int r = do_work(&env);
203        assert(r != FAIL);
204        if (r == DONE) {
205            break;
206        }
207    }
208
209    worker_t* w = env.all_workers;
210    worker_t* next;
211    while (w != NULL) {
212        next = w->next;
213        free(w);
214        w = next;
215    }
216    END_TEST;
217}
218
219#define KB(n) ((n)*1024)
220#define MB(n) ((n)*1024 * 1024)
221
222static struct {
223    int (*work)(worker_t*);
224    const char* name;
225    uint32_t size;
226    uint32_t flags;
227} WORK[] = {
228    {
229        worker_writer, "file0000", KB(512), F_RAND_IOSIZE,
230    },
231    {
232        worker_writer, "file0001", MB(10), F_RAND_IOSIZE,
233    },
234    {
235        worker_writer, "file0002", KB(512), F_RAND_IOSIZE,
236    },
237    {
238        worker_writer, "file0003", KB(512), F_RAND_IOSIZE,
239    },
240    {
241        worker_writer, "file0004", KB(512), 0,
242    },
243    {
244        worker_writer, "file0005", MB(20), 0,
245    },
246    {
247        worker_writer, "file0006", KB(512), 0,
248    },
249    {
250        worker_writer, "file0007", KB(512), 0,
251    },
252};
253
254static bool init_environment(env_t* env) {
255
256    // tests are run repeatedly, so reinitialize each time
257    env->all_workers = NULL;
258
259    list_initialize(&env->threads);
260
261    // assemble the work
262    const char* where = "::";
263    for (unsigned n = 0; n < countof(WORK); n++) {
264        ASSERT_TRUE(worker_new(env, where, WORK[n].name, WORK[n].work,
265                               WORK[n].size, WORK[n].flags), "");
266    }
267    return true;
268}
269
270static int do_threaded_work(void* arg) {
271    worker_t* w = arg;
272
273    fprintf(stderr, "work thread(%s) started\n", w->name);
274    while ((w->status = w->work(w)) == BUSY) {
275        thrd_yield();
276    }
277
278    fprintf(stderr, "work thread(%s) %s\n", w->name,
279            w->status == DONE ? "finished" : "failed");
280    EXPECT_EQ(unlink(w->name), 0, "");
281
282    zx_status_t status = w->status;
283    free(w);
284    return status;
285}
286
287static bool test_work_concurrently(void) {
288    BEGIN_TEST;
289
290    env_t env;
291    ASSERT_TRUE(init_environment(&env), "");
292
293    for (worker_t* w = env.all_workers; w != NULL; w = w->next) {
294        // start the workers on separate threads
295        thrd_t t;
296        ASSERT_EQ(thrd_create(&t, do_threaded_work, w), thrd_success, "");
297        thread_list_t* thread = malloc(sizeof(thread_list_t));
298        ASSERT_NE(thread, NULL, "");
299        thread->t = t;
300        list_add_tail(&env.threads, &thread->node);
301    }
302
303    thread_list_t* next;
304    thread_list_t* tmp;
305    list_for_every_entry_safe(&env.threads, next, tmp, thread_list_t, node) {
306        int rc;
307        ASSERT_EQ(thrd_join(next->t, &rc), thrd_success, "");
308        ASSERT_EQ(rc, DONE, "Thread joined, but failed");
309        free(next);
310    }
311
312    END_TEST;
313}
314
315const test_disk_t disk = {
316    .block_count = 3 * (1LLU << 16),
317    .block_size = 1LLU << 9,
318    .slice_size = 1LLU << 23,
319};
320
321RUN_FOR_ALL_FILESYSTEMS_SIZE(rw_workers_test, disk,
322    RUN_TEST_MEDIUM(test_work_single_thread)
323    RUN_TEST_LARGE(test_work_concurrently)
324)
325