1// Copyright 2016 The Fuchsia Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include <assert.h> 6#include <errno.h> 7#include <fcntl.h> 8#include <stdbool.h> 9#include <stdint.h> 10#include <stdio.h> 11#include <stdlib.h> 12#include <string.h> 13#include <threads.h> 14#include <unistd.h> 15 16#include <zircon/compiler.h> 17#include <zircon/listnode.h> 18#include <lib/zircon-internal/xorshiftrand.h> 19#include <zircon/types.h> 20 21#include "filesystems.h" 22#include "misc.h" 23 24#define FAIL -1 25#define BUSY 0 26#define DONE 1 27 28#define FBUFSIZE 65536 29 30static_assert(FBUFSIZE == ((FBUFSIZE / sizeof(uint64_t)) * sizeof(uint64_t)), 31 "FBUFSIZE not multiple of uint64_t"); 32 33typedef struct worker worker_t; 34// global environment variables 35typedef struct env { 36 worker_t* all_workers; 37 38 list_node_t threads; 39} env_t; 40 41typedef struct worker { 42 env_t* env; 43 44 worker_t* next; 45 int (*work)(worker_t* w); 46 47 rand64_t rdata; 48 rand32_t rops; 49 50 int fd; 51 int status; 52 uint32_t flags; 53 uint32_t size; 54 uint32_t pos; 55 56 union { 57 uint8_t u8[FBUFSIZE]; 58 uint64_t u64[FBUFSIZE / sizeof(uint64_t)]; 59 }; 60 61 char name[256]; 62} worker_t; 63#define F_RAND_IOSIZE 1 64 65 66bool worker_new(env_t* env, const char* where, const char* fn, 67 int (*work)(worker_t* w), uint32_t size, uint32_t flags); 68int worker_writer(worker_t* w); 69static bool init_environment(env_t* env); 70 71typedef struct thread_list { 72 list_node_t node; 73 thrd_t t; 74} thread_list_t; 75 76int worker_rw(worker_t* w, bool do_read) { 77 if (w->pos == w->size) { 78 return DONE; 79 } 80 81 // offset into buffer 82 uint32_t off = w->pos % FBUFSIZE; 83 84 // fill our content buffer if it's empty 85 if (off == 0) { 86 for (unsigned n = 0; n < (FBUFSIZE / sizeof(uint64_t)); n++) { 87 w->u64[n] = rand64(&w->rdata); 88 } 89 } 90 91 // data in buffer available to write 92 uint32_t xfer = FBUFSIZE - off; 93 94 // do not exceed our desired size 95 if (xfer > (w->size - w->pos)) { 96 xfer = w->size - w->pos; 97 } 98 99 if ((w->flags & F_RAND_IOSIZE) && (xfer > 3000)) { 100 xfer = 3000 + (rand32(&w->rops) % (xfer - 3000)); 101 } 102 103 int r; 104 if (do_read) { 105 uint8_t buffer[FBUFSIZE]; 106 if ((r = read(w->fd, buffer, xfer)) < 0) { 107 fprintf(stderr, "worker('%s') read failed @%u: %d\n", 108 w->name, w->pos, errno); 109 return FAIL; 110 } 111 if (memcmp(buffer, w->u8 + off, r)) { 112 fprintf(stderr, "worker('%s) verify failed @%u\n", 113 w->name, w->pos); 114 return FAIL; 115 } 116 } else { 117 if ((r = write(w->fd, w->u8 + off, xfer)) < 0) { 118 fprintf(stderr, "worker('%s') write failed @%u: %d\n", 119 w->name, w->pos, errno); 120 return FAIL; 121 } 122 } 123 124 // advance 125 w->pos += r; 126 return BUSY; 127} 128 129int worker_verify(worker_t* w) { 130 int r = worker_rw(w, true); 131 if (r == DONE) { 132 close(w->fd); 133 } 134 return r; 135} 136 137int worker_writer(worker_t* w) { 138 int r = worker_rw(w, false); 139 if (r == DONE) { 140 if (lseek(w->fd, 0, SEEK_SET) != 0) { 141 fprintf(stderr, "worker('%s') seek failed: %s\n", 142 w->name, strerror(errno)); 143 return FAIL; 144 } 145 // start at 0 and reset our data generator seed 146 srand64(&w->rdata, w->name); 147 w->pos = 0; 148 w->work = worker_verify; 149 return BUSY; 150 } 151 return r; 152} 153 154bool worker_new(env_t* env, const char* where, const char* fn, 155 int (*work)(worker_t* w), uint32_t size, uint32_t flags) { 156 worker_t* w = calloc(1, sizeof(worker_t)); 157 ASSERT_NE(w, NULL, ""); 158 159 w->env = env; 160 161 snprintf(w->name, sizeof(w->name), "%s%s", where, fn); 162 srand64(&w->rdata, w->name); 163 srand32(&w->rops, w->name); 164 w->size = size; 165 w->work = work; 166 w->flags = flags; 167 168 ASSERT_GT((w->fd = open(w->name, O_RDWR | O_CREAT | O_EXCL, 0644)), 0, ""); 169 170 w->next = w->env->all_workers; 171 env->all_workers = w; 172 173 return true; 174} 175 176int do_work(env_t* env) { 177 uint32_t busy_count = 0; 178 for (worker_t* w = env->all_workers; w != NULL; w = w->next) { 179 w->env = env; 180 if (w->status == BUSY) { 181 busy_count++; 182 if ((w->status = w->work(w)) == FAIL) { 183 EXPECT_EQ(unlink(w->name), 0, ""); 184 return FAIL; 185 } 186 if (w->status == DONE) { 187 fprintf(stderr, "worker('%s') finished\n", w->name); 188 EXPECT_EQ(unlink(w->name), 0, ""); 189 } 190 } 191 } 192 return busy_count ? BUSY : DONE; 193} 194 195bool test_work_single_thread(void) { 196 BEGIN_TEST; 197 198 env_t env; 199 init_environment(&env); 200 201 for (;;) { 202 int r = do_work(&env); 203 assert(r != FAIL); 204 if (r == DONE) { 205 break; 206 } 207 } 208 209 worker_t* w = env.all_workers; 210 worker_t* next; 211 while (w != NULL) { 212 next = w->next; 213 free(w); 214 w = next; 215 } 216 END_TEST; 217} 218 219#define KB(n) ((n)*1024) 220#define MB(n) ((n)*1024 * 1024) 221 222static struct { 223 int (*work)(worker_t*); 224 const char* name; 225 uint32_t size; 226 uint32_t flags; 227} WORK[] = { 228 { 229 worker_writer, "file0000", KB(512), F_RAND_IOSIZE, 230 }, 231 { 232 worker_writer, "file0001", MB(10), F_RAND_IOSIZE, 233 }, 234 { 235 worker_writer, "file0002", KB(512), F_RAND_IOSIZE, 236 }, 237 { 238 worker_writer, "file0003", KB(512), F_RAND_IOSIZE, 239 }, 240 { 241 worker_writer, "file0004", KB(512), 0, 242 }, 243 { 244 worker_writer, "file0005", MB(20), 0, 245 }, 246 { 247 worker_writer, "file0006", KB(512), 0, 248 }, 249 { 250 worker_writer, "file0007", KB(512), 0, 251 }, 252}; 253 254static bool init_environment(env_t* env) { 255 256 // tests are run repeatedly, so reinitialize each time 257 env->all_workers = NULL; 258 259 list_initialize(&env->threads); 260 261 // assemble the work 262 const char* where = "::"; 263 for (unsigned n = 0; n < countof(WORK); n++) { 264 ASSERT_TRUE(worker_new(env, where, WORK[n].name, WORK[n].work, 265 WORK[n].size, WORK[n].flags), ""); 266 } 267 return true; 268} 269 270static int do_threaded_work(void* arg) { 271 worker_t* w = arg; 272 273 fprintf(stderr, "work thread(%s) started\n", w->name); 274 while ((w->status = w->work(w)) == BUSY) { 275 thrd_yield(); 276 } 277 278 fprintf(stderr, "work thread(%s) %s\n", w->name, 279 w->status == DONE ? "finished" : "failed"); 280 EXPECT_EQ(unlink(w->name), 0, ""); 281 282 zx_status_t status = w->status; 283 free(w); 284 return status; 285} 286 287static bool test_work_concurrently(void) { 288 BEGIN_TEST; 289 290 env_t env; 291 ASSERT_TRUE(init_environment(&env), ""); 292 293 for (worker_t* w = env.all_workers; w != NULL; w = w->next) { 294 // start the workers on separate threads 295 thrd_t t; 296 ASSERT_EQ(thrd_create(&t, do_threaded_work, w), thrd_success, ""); 297 thread_list_t* thread = malloc(sizeof(thread_list_t)); 298 ASSERT_NE(thread, NULL, ""); 299 thread->t = t; 300 list_add_tail(&env.threads, &thread->node); 301 } 302 303 thread_list_t* next; 304 thread_list_t* tmp; 305 list_for_every_entry_safe(&env.threads, next, tmp, thread_list_t, node) { 306 int rc; 307 ASSERT_EQ(thrd_join(next->t, &rc), thrd_success, ""); 308 ASSERT_EQ(rc, DONE, "Thread joined, but failed"); 309 free(next); 310 } 311 312 END_TEST; 313} 314 315const test_disk_t disk = { 316 .block_count = 3 * (1LLU << 16), 317 .block_size = 1LLU << 9, 318 .slice_size = 1LLU << 23, 319}; 320 321RUN_FOR_ALL_FILESYSTEMS_SIZE(rw_workers_test, disk, 322 RUN_TEST_MEDIUM(test_work_single_thread) 323 RUN_TEST_LARGE(test_work_concurrently) 324) 325