1// Copyright 2017 The Fuchsia Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include <threads.h> 6 7#include <zircon/process.h> 8#include <zircon/syscalls.h> 9#include <zircon/syscalls/exception.h> 10 11#include <lib/async-loop/cpp/loop.h> 12#include <lib/async/cpp/time.h> 13#include <lib/async/exception.h> 14#include <lib/async/receiver.h> 15#include <lib/async/task.h> 16#include <lib/async/time.h> 17#include <lib/async/wait.h> 18 19#include <fbl/atomic.h> 20#include <fbl/auto_lock.h> 21#include <fbl/function.h> 22#include <fbl/mutex.h> 23#include <lib/zx/event.h> 24#include <unittest/unittest.h> 25#include <zircon/threads.h> 26 27namespace { 28 29class TestWait : public async_wait_t { 30public: 31 TestWait(zx_handle_t object, zx_signals_t trigger) 32 : async_wait_t{{ASYNC_STATE_INIT}, &TestWait::CallHandler, object, trigger} { 33 } 34 35 virtual ~TestWait() = default; 36 37 uint32_t run_count = 0u; 38 zx_status_t last_status = ZX_ERR_INTERNAL; 39 const zx_packet_signal_t* last_signal = nullptr; 40 41 zx_status_t Begin(async_dispatcher_t* dispatcher) { 42 return async_begin_wait(dispatcher, this); 43 } 44 45 zx_status_t Cancel(async_dispatcher_t* dispatcher) { 46 return async_cancel_wait(dispatcher, this); 47 } 48 49protected: 50 virtual void Handle(async_dispatcher_t* dispatcher, zx_status_t status, 51 const zx_packet_signal_t* signal) { 52 run_count++; 53 last_status = status; 54 if (signal) { 55 last_signal_storage_ = *signal; 56 last_signal = &last_signal_storage_; 57 } else { 58 last_signal = nullptr; 59 } 60 } 61 62private: 63 static void CallHandler(async_dispatcher_t* dispatcher, async_wait_t* wait, 64 zx_status_t status, const zx_packet_signal_t* signal) { 65 static_cast<TestWait*>(wait)->Handle(dispatcher, status, signal); 66 } 67 68 zx_packet_signal_t last_signal_storage_; 69}; 70 71class CascadeWait : public TestWait { 72public: 73 CascadeWait(zx_handle_t object, zx_signals_t trigger, 74 zx_signals_t signals_to_clear, zx_signals_t signals_to_set, 75 bool repeat) 76 : TestWait(object, trigger), 77 signals_to_clear_(signals_to_clear), 78 signals_to_set_(signals_to_set), 79 repeat_(repeat) {} 80 81protected: 82 zx_signals_t signals_to_clear_; 83 zx_signals_t signals_to_set_; 84 bool repeat_; 85 86 void Handle(async_dispatcher_t* dispatcher, zx_status_t status, 87 const zx_packet_signal_t* signal) override { 88 TestWait::Handle(dispatcher, status, signal); 89 zx_object_signal(object, signals_to_clear_, signals_to_set_); 90 if (repeat_ && status == ZX_OK) { 91 Begin(dispatcher); 92 } 93 } 94}; 95 96class SelfCancelingWait : public TestWait { 97public: 98 SelfCancelingWait(zx_handle_t object, zx_signals_t trigger) 99 : TestWait(object, trigger) {} 100 101 zx_status_t cancel_result = ZX_ERR_INTERNAL; 102 103protected: 104 void Handle(async_dispatcher_t* dispatcher, zx_status_t status, 105 const zx_packet_signal_t* signal) override { 106 TestWait::Handle(dispatcher, status, signal); 107 cancel_result = Cancel(dispatcher); 108 } 109}; 110 111class TestTask : public async_task_t { 112public: 113 TestTask() 114 : async_task_t{{ASYNC_STATE_INIT}, &TestTask::CallHandler, ZX_TIME_INFINITE} {} 115 116 virtual ~TestTask() = default; 117 118 zx_status_t Post(async_dispatcher_t* dispatcher) { 119 this->deadline = async_now(dispatcher); 120 return async_post_task(dispatcher, this); 121 } 122 123 zx_status_t PostForTime(async_dispatcher_t* dispatcher, zx::time deadline) { 124 this->deadline = deadline.get(); 125 return async_post_task(dispatcher, this); 126 } 127 128 zx_status_t Cancel(async_dispatcher_t* dispatcher) { 129 return async_cancel_task(dispatcher, this); 130 } 131 132 uint32_t run_count = 0u; 133 zx_status_t last_status = ZX_ERR_INTERNAL; 134 135protected: 136 virtual void Handle(async_dispatcher_t* dispatcher, zx_status_t status) { 137 run_count++; 138 last_status = status; 139 } 140 141private: 142 static void CallHandler(async_dispatcher_t* dispatcher, async_task_t* task, zx_status_t status) { 143 static_cast<TestTask*>(task)->Handle(dispatcher, status); 144 } 145}; 146 147class QuitTask : public TestTask { 148public: 149 QuitTask() = default; 150 151protected: 152 void Handle(async_dispatcher_t* dispatcher, zx_status_t status) override { 153 TestTask::Handle(dispatcher, status); 154 async_loop_quit(async_loop_from_dispatcher(dispatcher)); 155 } 156}; 157 158class ResetQuitTask : public TestTask { 159public: 160 ResetQuitTask() = default; 161 162 zx_status_t result = ZX_ERR_INTERNAL; 163 164protected: 165 void Handle(async_dispatcher_t* dispatcher, zx_status_t status) override { 166 TestTask::Handle(dispatcher, status); 167 result = async_loop_reset_quit(async_loop_from_dispatcher(dispatcher)); 168 } 169}; 170 171class RepeatingTask : public TestTask { 172public: 173 RepeatingTask(zx::duration interval, uint32_t repeat_count) 174 : interval_(interval), repeat_count_(repeat_count) {} 175 176 void set_finish_callback(fbl::Closure callback) { 177 finish_callback_ = fbl::move(callback); 178 } 179 180protected: 181 zx::duration interval_; 182 uint32_t repeat_count_; 183 fbl::Closure finish_callback_; 184 185 void Handle(async_dispatcher_t* dispatcher, zx_status_t status) override { 186 TestTask::Handle(dispatcher, status); 187 if (repeat_count_ == 0) { 188 if (finish_callback_) 189 finish_callback_(); 190 } else { 191 repeat_count_ -= 1; 192 if (status == ZX_OK) { 193 deadline = zx_time_add_duration(deadline, interval_.get()); 194 Post(dispatcher); 195 } 196 } 197 } 198}; 199 200class SelfCancelingTask : public TestTask { 201public: 202 SelfCancelingTask() = default; 203 204 zx_status_t cancel_result = ZX_ERR_INTERNAL; 205 206protected: 207 void Handle(async_dispatcher_t* dispatcher, zx_status_t status) override { 208 TestTask::Handle(dispatcher, status); 209 cancel_result = Cancel(dispatcher); 210 } 211}; 212 213class TestReceiver : async_receiver_t { 214public: 215 TestReceiver() 216 : async_receiver_t{{ASYNC_STATE_INIT}, &TestReceiver::CallHandler} { 217 } 218 219 virtual ~TestReceiver() = default; 220 221 zx_status_t QueuePacket(async_dispatcher_t* dispatcher, const zx_packet_user_t* data) { 222 return async_queue_packet(dispatcher, this, data); 223 } 224 225 uint32_t run_count = 0u; 226 zx_status_t last_status = ZX_ERR_INTERNAL; 227 const zx_packet_user_t* last_data; 228 229protected: 230 virtual void Handle(async_dispatcher_t* dispatcher, zx_status_t status, 231 const zx_packet_user_t* data) { 232 run_count++; 233 last_status = status; 234 if (data) { 235 last_data_storage_ = *data; 236 last_data = &last_data_storage_; 237 } else { 238 last_data = nullptr; 239 } 240 } 241 242private: 243 static void CallHandler(async_dispatcher_t* dispatcher, async_receiver_t* receiver, 244 zx_status_t status, const zx_packet_user_t* data) { 245 static_cast<TestReceiver*>(receiver)->Handle(dispatcher, status, data); 246 } 247 248 zx_packet_user_t last_data_storage_{}; 249}; 250 251class TestException : public async_exception_t { 252public: 253 TestException(zx_handle_t task, uint32_t options) 254 : async_exception_t{{ASYNC_STATE_INIT}, &TestException::CallHandler, task, options} { 255 } 256 257 ~TestException() = default; 258 259 uint32_t run_count = 0u; 260 zx_status_t last_status = ZX_ERR_INTERNAL; 261 const zx_port_packet_t* last_report = nullptr; 262 263 zx_status_t Bind(async_dispatcher_t* dispatcher) { 264 return async_bind_exception_port(dispatcher, this); 265 } 266 267 zx_status_t Unbind(async_dispatcher_t* dispatcher) { 268 return async_unbind_exception_port(dispatcher, this); 269 } 270 271protected: 272 virtual void Handle(async_dispatcher_t* dispatcher, zx_status_t status, 273 const zx_port_packet_t* report) { 274 run_count++; 275 last_status = status; 276 if (report) { 277 last_report_storage_ = *report; 278 last_report = &last_report_storage_; 279 } else { 280 last_report = nullptr; 281 } 282 } 283 284private: 285 static void CallHandler(async_dispatcher_t* dispatcher, 286 async_exception_t* receiver, 287 zx_status_t status, 288 const zx_port_packet_t* report) { 289 static_cast<TestException*>(receiver)->Handle(dispatcher, status, report); 290 } 291 292 zx_port_packet_t last_report_storage_; 293}; 294 295// The C++ loop wrapper is one-to-one with the underlying C API so for the 296// most part we will test through that interface but here we make sure that 297// the C API actually exists but we don't comprehensively test what it does. 298bool c_api_basic_test() { 299 BEGIN_TEST; 300 301 async_loop_t* loop; 302 ASSERT_EQ(ZX_OK, async_loop_create(&kAsyncLoopConfigNoAttachToThread, &loop), "create"); 303 ASSERT_NONNULL(loop, "loop"); 304 305 EXPECT_EQ(ASYNC_LOOP_RUNNABLE, async_loop_get_state(loop), "runnable"); 306 307 async_loop_quit(loop); 308 EXPECT_EQ(ASYNC_LOOP_QUIT, async_loop_get_state(loop), "quitting"); 309 async_loop_run(loop, ZX_TIME_INFINITE, false); 310 EXPECT_EQ(ZX_OK, async_loop_reset_quit(loop)); 311 312 thrd_t thread{}; 313 EXPECT_EQ(ZX_OK, async_loop_start_thread(loop, "name", &thread), "thread start"); 314 EXPECT_NE(thrd_t{}, thread, "thread ws initialized"); 315 async_loop_quit(loop); 316 async_loop_join_threads(loop); 317 318 async_loop_shutdown(loop); 319 EXPECT_EQ(ASYNC_LOOP_SHUTDOWN, async_loop_get_state(loop), "shutdown"); 320 321 async_loop_destroy(loop); 322 323 END_TEST; 324} 325 326bool make_default_false_test() { 327 BEGIN_TEST; 328 329 { 330 async::Loop loop(&kAsyncLoopConfigNoAttachToThread); 331 EXPECT_NULL(async_get_default_dispatcher(), "not default"); 332 } 333 EXPECT_NULL(async_get_default_dispatcher(), "still not default"); 334 335 END_TEST; 336} 337 338bool make_default_true_test() { 339 BEGIN_TEST; 340 341 async_loop_config_t config{}; 342 config.make_default_for_current_thread = true; 343 { 344 async::Loop loop(&config); 345 EXPECT_EQ(loop.dispatcher(), async_get_default_dispatcher(), "became default"); 346 } 347 EXPECT_NULL(async_get_default_dispatcher(), "no longer default"); 348 349 END_TEST; 350} 351 352bool create_default_test() { 353 BEGIN_TEST; 354 355 { 356 async::Loop loop(&kAsyncLoopConfigAttachToThread); 357 EXPECT_EQ(loop.dispatcher(), async_get_default_dispatcher(), "became default"); 358 } 359 EXPECT_NULL(async_get_default_dispatcher(), "no longer default"); 360 361 END_TEST; 362} 363 364bool quit_test() { 365 BEGIN_TEST; 366 367 async::Loop loop(&kAsyncLoopConfigNoAttachToThread); 368 EXPECT_EQ(ASYNC_LOOP_RUNNABLE, loop.GetState(), "initially not quitting"); 369 370 loop.Quit(); 371 EXPECT_EQ(ASYNC_LOOP_QUIT, loop.GetState(), "quitting when quit"); 372 EXPECT_EQ(ZX_ERR_CANCELED, loop.Run(), "run returns immediately"); 373 EXPECT_EQ(ASYNC_LOOP_QUIT, loop.GetState(), "still quitting"); 374 375 ResetQuitTask reset_quit_task; 376 EXPECT_EQ(ZX_OK, reset_quit_task.Post(loop.dispatcher()), "can post tasks even after quit"); 377 QuitTask quit_task; 378 EXPECT_EQ(ZX_OK, quit_task.Post(loop.dispatcher()), "can post tasks even after quit"); 379 380 EXPECT_EQ(ZX_OK, loop.ResetQuit()); 381 EXPECT_EQ(ASYNC_LOOP_RUNNABLE, loop.GetState(), "not quitting after reset"); 382 383 EXPECT_EQ(ZX_OK, loop.Run(zx::time::infinite(), true /*once*/), "run tasks"); 384 385 EXPECT_EQ(1u, reset_quit_task.run_count, "reset quit task ran"); 386 EXPECT_EQ(ZX_ERR_BAD_STATE, reset_quit_task.result, "can't reset quit while loop is running"); 387 388 EXPECT_EQ(1u, quit_task.run_count, "quit task ran"); 389 EXPECT_EQ(ASYNC_LOOP_QUIT, loop.GetState(), "quitted"); 390 391 EXPECT_EQ(ZX_ERR_CANCELED, loop.Run(), "runs returns immediately when quitted"); 392 393 loop.Shutdown(); 394 EXPECT_EQ(ASYNC_LOOP_SHUTDOWN, loop.GetState(), "shut down"); 395 EXPECT_EQ(ZX_ERR_BAD_STATE, loop.Run(), "run returns immediately when shut down"); 396 EXPECT_EQ(ZX_ERR_BAD_STATE, loop.ResetQuit()); 397 398 END_TEST; 399} 400 401bool time_test() { 402 BEGIN_TEST; 403 404 // Verify that the dispatcher's time-telling is strictly monotonic, 405 // which is constent with ZX_CLOCK_MONOTONIC. 406 async::Loop loop(&kAsyncLoopConfigNoAttachToThread); 407 zx::time t0 = zx::clock::get_monotonic(); 408 zx::time t1 = async::Now(loop.dispatcher()); 409 zx::time t2 = async::Now(loop.dispatcher()); 410 zx::time t3 = zx::clock::get_monotonic(); 411 412 EXPECT_LE(t0.get(), t1.get()); 413 EXPECT_LE(t1.get(), t2.get()); 414 EXPECT_LE(t2.get(), t3.get()); 415 416 END_TEST; 417} 418 419bool wait_test() { 420 BEGIN_TEST; 421 422 async::Loop loop(&kAsyncLoopConfigNoAttachToThread); 423 zx::event event; 424 EXPECT_EQ(ZX_OK, zx::event::create(0u, &event), "create event"); 425 426 CascadeWait wait1(event.get(), ZX_USER_SIGNAL_1, 427 0u, ZX_USER_SIGNAL_2, false); 428 CascadeWait wait2(event.get(), ZX_USER_SIGNAL_2, 429 ZX_USER_SIGNAL_1 | ZX_USER_SIGNAL_2, 0u, true); 430 CascadeWait wait3(event.get(), ZX_USER_SIGNAL_3, 431 ZX_USER_SIGNAL_3, 0u, true); 432 EXPECT_EQ(ZX_OK, wait1.Begin(loop.dispatcher()), "wait 1"); 433 EXPECT_EQ(ZX_OK, wait2.Begin(loop.dispatcher()), "wait 2"); 434 EXPECT_EQ(ZX_OK, wait3.Begin(loop.dispatcher()), "wait 3"); 435 436 // Initially nothing is signaled. 437 EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop"); 438 EXPECT_EQ(0u, wait1.run_count, "run count 1"); 439 EXPECT_EQ(0u, wait2.run_count, "run count 2"); 440 EXPECT_EQ(0u, wait3.run_count, "run count 3"); 441 442 // Set signal 1: notifies |wait1| which sets signal 2 and notifies |wait2| 443 // which clears signal 1 and 2 again. 444 EXPECT_EQ(ZX_OK, event.signal(0u, ZX_USER_SIGNAL_1), "signal 1"); 445 EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop"); 446 EXPECT_EQ(1u, wait1.run_count, "run count 1"); 447 EXPECT_EQ(ZX_OK, wait1.last_status, "status 1"); 448 EXPECT_NONNULL(wait1.last_signal); 449 EXPECT_EQ(ZX_USER_SIGNAL_1, wait1.last_signal->trigger & ZX_USER_SIGNAL_ALL, "trigger 1"); 450 EXPECT_EQ(ZX_USER_SIGNAL_1, wait1.last_signal->observed & ZX_USER_SIGNAL_ALL, "observed 1"); 451 EXPECT_EQ(1u, wait1.last_signal->count, "count 1"); 452 EXPECT_EQ(1u, wait2.run_count, "run count 2"); 453 EXPECT_EQ(ZX_OK, wait2.last_status, "status 2"); 454 EXPECT_NONNULL(wait2.last_signal); 455 EXPECT_EQ(ZX_USER_SIGNAL_2, wait2.last_signal->trigger & ZX_USER_SIGNAL_ALL, "trigger 2"); 456 EXPECT_EQ(ZX_USER_SIGNAL_1 | ZX_USER_SIGNAL_2, wait2.last_signal->observed & ZX_USER_SIGNAL_ALL, "observed 2"); 457 EXPECT_EQ(1u, wait2.last_signal->count, "count 2"); 458 EXPECT_EQ(0u, wait3.run_count, "run count 3"); 459 460 // Set signal 1 again: does nothing because |wait1| was a one-shot. 461 EXPECT_EQ(ZX_OK, event.signal(0u, ZX_USER_SIGNAL_1), "signal 1"); 462 EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop"); 463 EXPECT_EQ(1u, wait1.run_count, "run count 1"); 464 EXPECT_EQ(1u, wait2.run_count, "run count 2"); 465 EXPECT_EQ(0u, wait3.run_count, "run count 3"); 466 467 // Set signal 2 again: notifies |wait2| which clears signal 1 and 2 again. 468 EXPECT_EQ(ZX_OK, event.signal(0u, ZX_USER_SIGNAL_2), "signal 2"); 469 EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop"); 470 EXPECT_EQ(1u, wait1.run_count, "run count 1"); 471 EXPECT_EQ(2u, wait2.run_count, "run count 2"); 472 EXPECT_EQ(ZX_OK, wait2.last_status, "status 2"); 473 EXPECT_NONNULL(wait2.last_signal); 474 EXPECT_EQ(ZX_USER_SIGNAL_2, wait2.last_signal->trigger & ZX_USER_SIGNAL_ALL, "trigger 2"); 475 EXPECT_EQ(ZX_USER_SIGNAL_1 | ZX_USER_SIGNAL_2, wait2.last_signal->observed & ZX_USER_SIGNAL_ALL, "observed 2"); 476 EXPECT_EQ(1u, wait2.last_signal->count, "count 2"); 477 EXPECT_EQ(0u, wait3.run_count, "run count 3"); 478 479 // Set signal 3: notifies |wait3| which clears signal 3. 480 // Do this a couple of times. 481 for (uint32_t i = 0; i < 3; i++) { 482 EXPECT_EQ(ZX_OK, event.signal(0u, ZX_USER_SIGNAL_3), "signal 3"); 483 EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop"); 484 EXPECT_EQ(1u, wait1.run_count, "run count 1"); 485 EXPECT_EQ(2u, wait2.run_count, "run count 2"); 486 EXPECT_EQ(i + 1u, wait3.run_count, "run count 3"); 487 EXPECT_EQ(ZX_OK, wait3.last_status, "status 3"); 488 EXPECT_NONNULL(wait3.last_signal); 489 EXPECT_EQ(ZX_USER_SIGNAL_3, wait3.last_signal->trigger & ZX_USER_SIGNAL_ALL, "trigger 3"); 490 EXPECT_EQ(ZX_USER_SIGNAL_3, wait3.last_signal->observed & ZX_USER_SIGNAL_ALL, "observed 3"); 491 EXPECT_EQ(1u, wait3.last_signal->count, "count 3"); 492 } 493 494 // Cancel wait 3 then set signal 3 again: nothing happens this time. 495 EXPECT_EQ(ZX_OK, wait3.Cancel(loop.dispatcher()), "cancel"); 496 EXPECT_EQ(ZX_OK, event.signal(0u, ZX_USER_SIGNAL_3), "signal 3"); 497 EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop"); 498 EXPECT_EQ(1u, wait1.run_count, "run count 1"); 499 EXPECT_EQ(2u, wait2.run_count, "run count 2"); 500 EXPECT_EQ(3u, wait3.run_count, "run count 3"); 501 502 // Redundant cancel returns an error. 503 EXPECT_EQ(ZX_ERR_NOT_FOUND, wait3.Cancel(loop.dispatcher()), "cancel again"); 504 EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop"); 505 EXPECT_EQ(1u, wait1.run_count, "run count 1"); 506 EXPECT_EQ(2u, wait2.run_count, "run count 2"); 507 EXPECT_EQ(3u, wait3.run_count, "run count 3"); 508 509 loop.Shutdown(); 510 511 END_TEST; 512} 513 514bool wait_unwaitable_handle_test() { 515 BEGIN_TEST; 516 517 async::Loop loop(&kAsyncLoopConfigNoAttachToThread); 518 zx::event event; 519 EXPECT_EQ(ZX_OK, zx::event::create(0u, &event), "create event"); 520 event.replace(ZX_RIGHT_NONE, &event); 521 522 TestWait wait(event.get(), ZX_USER_SIGNAL_0); 523 EXPECT_EQ(ZX_ERR_ACCESS_DENIED, wait.Begin(loop.dispatcher()), "begin"); 524 EXPECT_EQ(ZX_ERR_NOT_FOUND, wait.Cancel(loop.dispatcher()), "cancel"); 525 EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop"); 526 EXPECT_EQ(0u, wait.run_count, "run count"); 527 528 END_TEST; 529} 530 531bool wait_shutdown_test() { 532 BEGIN_TEST; 533 534 async::Loop loop(&kAsyncLoopConfigNoAttachToThread); 535 zx::event event; 536 EXPECT_EQ(ZX_OK, zx::event::create(0u, &event), "create event"); 537 538 CascadeWait wait1(event.get(), ZX_USER_SIGNAL_0, 0u, 0u, false); 539 CascadeWait wait2(event.get(), ZX_USER_SIGNAL_0, ZX_USER_SIGNAL_0, 0u, true); 540 TestWait wait3(event.get(), ZX_USER_SIGNAL_1); 541 SelfCancelingWait wait4(event.get(), ZX_USER_SIGNAL_0); 542 SelfCancelingWait wait5(event.get(), ZX_USER_SIGNAL_1); 543 544 EXPECT_EQ(ZX_OK, wait1.Begin(loop.dispatcher()), "begin 1"); 545 EXPECT_EQ(ZX_OK, wait2.Begin(loop.dispatcher()), "begin 2"); 546 EXPECT_EQ(ZX_OK, wait3.Begin(loop.dispatcher()), "begin 3"); 547 EXPECT_EQ(ZX_OK, wait4.Begin(loop.dispatcher()), "begin 4"); 548 EXPECT_EQ(ZX_OK, wait5.Begin(loop.dispatcher()), "begin 5"); 549 550 // Nothing signaled so nothing happens at first. 551 EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop"); 552 EXPECT_EQ(0u, wait1.run_count, "run count 1"); 553 EXPECT_EQ(0u, wait2.run_count, "run count 2"); 554 EXPECT_EQ(0u, wait3.run_count, "run count 3"); 555 EXPECT_EQ(0u, wait4.run_count, "run count 4"); 556 EXPECT_EQ(0u, wait5.run_count, "run count 5"); 557 558 // Set signal 1: notifies both waiters, |wait2| clears the signal and repeats 559 EXPECT_EQ(ZX_OK, event.signal(0u, ZX_USER_SIGNAL_0), "signal 1"); 560 EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop"); 561 EXPECT_EQ(1u, wait1.run_count, "run count 1"); 562 EXPECT_EQ(ZX_OK, wait1.last_status, "status 1"); 563 EXPECT_NONNULL(wait1.last_signal); 564 EXPECT_EQ(ZX_USER_SIGNAL_0, wait1.last_signal->trigger & ZX_USER_SIGNAL_ALL, "trigger 1"); 565 EXPECT_EQ(ZX_USER_SIGNAL_0, wait1.last_signal->observed & ZX_USER_SIGNAL_ALL, "observed 1"); 566 EXPECT_EQ(1u, wait1.last_signal->count, "count 1"); 567 EXPECT_EQ(1u, wait2.run_count, "run count 2"); 568 EXPECT_EQ(ZX_OK, wait2.last_status, "status 2"); 569 EXPECT_NONNULL(wait2.last_signal); 570 EXPECT_EQ(ZX_USER_SIGNAL_0, wait2.last_signal->trigger & ZX_USER_SIGNAL_ALL, "trigger 2"); 571 EXPECT_EQ(ZX_USER_SIGNAL_0, wait2.last_signal->observed & ZX_USER_SIGNAL_ALL, "observed 2"); 572 EXPECT_EQ(1u, wait2.last_signal->count, "count 2"); 573 EXPECT_EQ(0u, wait3.run_count, "run count 3"); 574 EXPECT_EQ(1u, wait4.run_count, "run count 4"); 575 EXPECT_EQ(ZX_USER_SIGNAL_0, wait4.last_signal->trigger & ZX_USER_SIGNAL_ALL, "trigger 4"); 576 EXPECT_EQ(ZX_USER_SIGNAL_0, wait4.last_signal->observed & ZX_USER_SIGNAL_ALL, "observed 4"); 577 EXPECT_EQ(ZX_ERR_NOT_FOUND, wait4.cancel_result, "cancel result 4"); 578 EXPECT_EQ(0u, wait5.run_count, "run count 5"); 579 580 // When the loop shuts down: 581 // |wait1| not notified because it was serviced and didn't repeat 582 // |wait2| notified because it repeated 583 // |wait3| notified because it was not yet serviced 584 // |wait4| not notified because it was serviced 585 // |wait5| notified because it was not yet serviced 586 loop.Shutdown(); 587 EXPECT_EQ(1u, wait1.run_count, "run count 1"); 588 EXPECT_EQ(2u, wait2.run_count, "run count 2"); 589 EXPECT_EQ(ZX_ERR_CANCELED, wait2.last_status, "status 2"); 590 EXPECT_NULL(wait2.last_signal, "signal 2"); 591 EXPECT_EQ(1u, wait3.run_count, "run count 3"); 592 EXPECT_EQ(ZX_ERR_CANCELED, wait3.last_status, "status 3"); 593 EXPECT_NULL(wait3.last_signal, "signal 3"); 594 EXPECT_EQ(1u, wait4.run_count, "run count 4"); 595 EXPECT_EQ(1u, wait5.run_count, "run count 5"); 596 EXPECT_EQ(ZX_ERR_CANCELED, wait5.last_status, "status 5"); 597 EXPECT_NULL(wait5.last_signal, "signal 5"); 598 EXPECT_EQ(ZX_ERR_NOT_FOUND, wait5.cancel_result, "cancel result 5"); 599 600 // Try to add or cancel work after shutdown. 601 TestWait wait6(event.get(), ZX_USER_SIGNAL_0); 602 EXPECT_EQ(ZX_ERR_BAD_STATE, wait6.Begin(loop.dispatcher()), "begin after shutdown"); 603 EXPECT_EQ(ZX_ERR_NOT_FOUND, wait6.Cancel(loop.dispatcher()), "cancel after shutdown"); 604 EXPECT_EQ(0u, wait6.run_count, "run count 6"); 605 606 END_TEST; 607} 608 609bool task_test() { 610 BEGIN_TEST; 611 612 async::Loop loop(&kAsyncLoopConfigNoAttachToThread); 613 614 zx::time start_time = async::Now(loop.dispatcher()); 615 TestTask task1; 616 RepeatingTask task2(zx::msec(1), 3u); 617 TestTask task3; 618 QuitTask task4; 619 TestTask task5; // posted after quit 620 621 EXPECT_EQ(ZX_OK, task1.PostForTime(loop.dispatcher(), start_time + zx::msec(1)), "post 1"); 622 EXPECT_EQ(ZX_OK, task2.PostForTime(loop.dispatcher(), start_time + zx::msec(1)), "post 2"); 623 EXPECT_EQ(ZX_OK, task3.PostForTime(loop.dispatcher(), start_time), "post 3"); 624 task2.set_finish_callback([&loop, &task4, &task5, start_time] { 625 task4.PostForTime(loop.dispatcher(), start_time + zx::msec(10)); 626 task5.PostForTime(loop.dispatcher(), start_time + zx::msec(10)); 627 }); 628 629 // Cancel task 3. 630 EXPECT_EQ(ZX_OK, task3.Cancel(loop.dispatcher()), "cancel 3"); 631 632 // Run until quit. 633 EXPECT_EQ(ZX_ERR_CANCELED, loop.Run(), "run loop"); 634 EXPECT_EQ(ASYNC_LOOP_QUIT, loop.GetState(), "quitting"); 635 EXPECT_EQ(1u, task1.run_count, "run count 1"); 636 EXPECT_EQ(ZX_OK, task1.last_status, "status 1"); 637 EXPECT_EQ(4u, task2.run_count, "run count 2"); 638 EXPECT_EQ(ZX_OK, task2.last_status, "status 2"); 639 EXPECT_EQ(0u, task3.run_count, "run count 3"); 640 EXPECT_EQ(1u, task4.run_count, "run count 4"); 641 EXPECT_EQ(ZX_OK, task4.last_status, "status 4"); 642 EXPECT_EQ(0u, task5.run_count, "run count 5"); 643 644 // Reset quit and keep running, now task5 should go ahead followed 645 // by any subsequently posted tasks even if they have earlier deadlines. 646 QuitTask task6; 647 TestTask task7; 648 EXPECT_EQ(ZX_OK, task6.PostForTime(loop.dispatcher(), start_time), "post 6"); 649 EXPECT_EQ(ZX_OK, task7.PostForTime(loop.dispatcher(), start_time), "post 7"); 650 EXPECT_EQ(ZX_OK, loop.ResetQuit()); 651 EXPECT_EQ(ZX_ERR_CANCELED, loop.Run(), "run loop"); 652 EXPECT_EQ(ASYNC_LOOP_QUIT, loop.GetState(), "quitting"); 653 654 EXPECT_EQ(1u, task5.run_count, "run count 5"); 655 EXPECT_EQ(ZX_OK, task5.last_status, "status 5"); 656 EXPECT_EQ(1u, task6.run_count, "run count 6"); 657 EXPECT_EQ(ZX_OK, task6.last_status, "status 6"); 658 EXPECT_EQ(0u, task7.run_count, "run count 7"); 659 660 loop.Shutdown(); 661 662 END_TEST; 663} 664 665bool task_shutdown_test() { 666 BEGIN_TEST; 667 668 async::Loop loop(&kAsyncLoopConfigNoAttachToThread); 669 670 zx::time start_time = async::Now(loop.dispatcher()); 671 TestTask task1; 672 RepeatingTask task2(zx::msec(1000), 1u); 673 TestTask task3; 674 TestTask task4; 675 QuitTask task5; 676 SelfCancelingTask task6; 677 SelfCancelingTask task7; 678 679 EXPECT_EQ(ZX_OK, task1.PostForTime(loop.dispatcher(), start_time + zx::msec(1)), "post 1"); 680 EXPECT_EQ(ZX_OK, task2.PostForTime(loop.dispatcher(), start_time + zx::msec(1)), "post 2"); 681 EXPECT_EQ(ZX_OK, task3.PostForTime(loop.dispatcher(), zx::time::infinite()), "post 3"); 682 EXPECT_EQ(ZX_OK, task4.PostForTime(loop.dispatcher(), zx::time::infinite()), "post 4"); 683 EXPECT_EQ(ZX_OK, task5.PostForTime(loop.dispatcher(), start_time + zx::msec(1)), "post 5"); 684 EXPECT_EQ(ZX_OK, task6.PostForTime(loop.dispatcher(), start_time), "post 6"); 685 EXPECT_EQ(ZX_OK, task7.PostForTime(loop.dispatcher(), zx::time::infinite()), "post 7"); 686 687 // Run tasks which are due up to the time when the quit task runs. 688 EXPECT_EQ(ZX_ERR_CANCELED, loop.Run(), "run loop"); 689 EXPECT_EQ(1u, task1.run_count, "run count 1"); 690 EXPECT_EQ(ZX_OK, task1.last_status, "status 1"); 691 EXPECT_EQ(1u, task2.run_count, "run count 2"); 692 EXPECT_EQ(ZX_OK, task2.last_status, "status 2"); 693 EXPECT_EQ(0u, task3.run_count, "run count 3"); 694 EXPECT_EQ(0u, task4.run_count, "run count 4"); 695 EXPECT_EQ(1u, task5.run_count, "run count 5"); 696 EXPECT_EQ(ZX_OK, task5.last_status, "status 5"); 697 EXPECT_EQ(1u, task6.run_count, "run count 6"); 698 EXPECT_EQ(ZX_OK, task6.last_status, "status 6"); 699 EXPECT_EQ(ZX_ERR_NOT_FOUND, task6.cancel_result, "cancel result 6"); 700 EXPECT_EQ(0u, task7.run_count, "run count 7"); 701 702 // Cancel task 4. 703 EXPECT_EQ(ZX_OK, task4.Cancel(loop.dispatcher()), "cancel 4"); 704 705 // When the loop shuts down: 706 // |task1| not notified because it was serviced 707 // |task2| notified because it requested a repeat 708 // |task3| notified because it was not yet serviced 709 // |task4| not notified because it was canceled 710 // |task5| not notified because it was serviced 711 // |task6| not notified because it was serviced 712 // |task7| notified because it was not yet serviced 713 loop.Shutdown(); 714 EXPECT_EQ(1u, task1.run_count, "run count 1"); 715 EXPECT_EQ(2u, task2.run_count, "run count 2"); 716 EXPECT_EQ(ZX_ERR_CANCELED, task2.last_status, "status 2"); 717 EXPECT_EQ(1u, task3.run_count, "run count 3"); 718 EXPECT_EQ(ZX_ERR_CANCELED, task3.last_status, "status 3"); 719 EXPECT_EQ(0u, task4.run_count, "run count 4"); 720 EXPECT_EQ(1u, task5.run_count, "run count 5"); 721 EXPECT_EQ(1u, task6.run_count, "run count 6"); 722 EXPECT_EQ(1u, task7.run_count, "run count 7"); 723 EXPECT_EQ(ZX_ERR_CANCELED, task7.last_status, "status 7"); 724 EXPECT_EQ(ZX_ERR_NOT_FOUND, task7.cancel_result, "cancel result 7"); 725 726 // Try to add or cancel work after shutdown. 727 TestTask task8; 728 EXPECT_EQ(ZX_ERR_BAD_STATE, task8.PostForTime(loop.dispatcher(), zx::time::infinite()), "post after shutdown"); 729 EXPECT_EQ(ZX_ERR_NOT_FOUND, task8.Cancel(loop.dispatcher()), "cancel after shutdown"); 730 EXPECT_EQ(0u, task8.run_count, "run count 8"); 731 732 END_TEST; 733} 734 735bool receiver_test() { 736 const zx_packet_user_t data1{.u64 = {11, 12, 13, 14}}; 737 const zx_packet_user_t data2{.u64 = {21, 22, 23, 24}}; 738 const zx_packet_user_t data3{.u64 = {31, 32, 33, 34}}; 739 const zx_packet_user_t data_default{}; 740 741 BEGIN_TEST; 742 743 async::Loop loop(&kAsyncLoopConfigNoAttachToThread); 744 745 TestReceiver receiver1; 746 TestReceiver receiver2; 747 TestReceiver receiver3; 748 749 EXPECT_EQ(ZX_OK, receiver1.QueuePacket(loop.dispatcher(), &data1), "queue 1"); 750 EXPECT_EQ(ZX_OK, receiver1.QueuePacket(loop.dispatcher(), &data3), "queue 1, again"); 751 EXPECT_EQ(ZX_OK, receiver2.QueuePacket(loop.dispatcher(), &data2), "queue 2"); 752 EXPECT_EQ(ZX_OK, receiver3.QueuePacket(loop.dispatcher(), nullptr), "queue 3"); 753 754 EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop"); 755 EXPECT_EQ(2u, receiver1.run_count, "run count 1"); 756 EXPECT_EQ(ZX_OK, receiver1.last_status, "status 1"); 757 EXPECT_NONNULL(receiver1.last_data); 758 EXPECT_EQ(0, memcmp(&data3, receiver1.last_data, sizeof(zx_packet_user_t)), "data 1"); 759 EXPECT_EQ(1u, receiver2.run_count, "run count 2"); 760 EXPECT_EQ(ZX_OK, receiver2.last_status, "status 2"); 761 EXPECT_NONNULL(receiver2.last_data); 762 EXPECT_EQ(0, memcmp(&data2, receiver2.last_data, sizeof(zx_packet_user_t)), "data 2"); 763 EXPECT_EQ(1u, receiver3.run_count, "run count 3"); 764 EXPECT_EQ(ZX_OK, receiver3.last_status, "status 3"); 765 EXPECT_NONNULL(receiver3.last_data); 766 EXPECT_EQ(0, memcmp(&data_default, receiver3.last_data, sizeof(zx_packet_user_t)), "data 3"); 767 768 END_TEST; 769} 770 771bool receiver_shutdown_test() { 772 BEGIN_TEST; 773 774 async::Loop loop(&kAsyncLoopConfigNoAttachToThread); 775 loop.Shutdown(); 776 777 // Try to add work after shutdown. 778 TestReceiver receiver; 779 EXPECT_EQ(ZX_ERR_BAD_STATE, receiver.QueuePacket(loop.dispatcher(), nullptr), "queue after shutdown"); 780 EXPECT_EQ(0u, receiver.run_count, "run count 1"); 781 782 END_TEST; 783} 784 785uint32_t get_thread_state(zx_handle_t thread) { 786 zx_info_thread_t info; 787 __UNUSED zx_status_t status = 788 zx_object_get_info(thread, ZX_INFO_THREAD, 789 &info, sizeof(info), NULL, NULL); 790 ZX_DEBUG_ASSERT(status == ZX_OK); 791 return info.state; 792} 793 794zx_koid_t get_koid(zx_handle_t handle) { 795 zx_info_handle_basic_t info; 796 __UNUSED zx_status_t status = 797 zx_object_get_info(handle, ZX_INFO_HANDLE_BASIC, 798 &info, sizeof(info), NULL, NULL); 799 ZX_DEBUG_ASSERT(status == ZX_OK); 800 return info.koid; 801} 802 803zx_status_t create_crashing_thread(zx_handle_t* out_thread) { 804 static const char kThreadName[] = "crasher"; 805 // Use zx_thread_create() so that the only cleanup we need to do is 806 // zx_task_kill/zx_handle_close. 807 auto status = zx_thread_create(zx_process_self(), kThreadName, 808 strlen(kThreadName), 0, out_thread); 809 if (status != ZX_OK) 810 return status; 811 // We want the thread to crash so we'll get an exception report. 812 // Easiest to just pass crashing values for pc,sp. 813 return zx_thread_start(*out_thread, 0u, 0u, 0u, 0u); 814} 815 816bool exception_test() { 817 BEGIN_TEST; 818 819 async::Loop loop(&kAsyncLoopConfigNoAttachToThread); 820 821 zx_handle_t self = zx_process_self(); 822 TestException exception(self, 0); 823 824 EXPECT_EQ(ZX_OK, exception.Bind(loop.dispatcher())); 825 826 // Initially nothing is signaled. 827 EXPECT_EQ(ZX_OK, loop.RunUntilIdle()); 828 EXPECT_EQ(0u, exception.run_count); 829 830 zx_handle_t crashing_thread; 831 EXPECT_EQ(ZX_OK, create_crashing_thread(&crashing_thread)); 832 833 // Wait until thread has crashed. 834 uint32_t state; 835 do { 836 zx_nanosleep(zx_deadline_after(ZX_MSEC(1))); 837 state = get_thread_state(crashing_thread); 838 } while (state != ZX_THREAD_STATE_BLOCKED_EXCEPTION); 839 840 // There should now be an exception to read. 841 EXPECT_EQ(ZX_OK, loop.RunUntilIdle()); 842 EXPECT_EQ(1u, exception.run_count); 843 EXPECT_EQ(ZX_OK, exception.last_status); 844 EXPECT_NONNULL(exception.last_report); 845 EXPECT_EQ(ZX_EXCP_FATAL_PAGE_FAULT, exception.last_report->type); 846 EXPECT_EQ(get_koid(self), exception.last_report->exception.pid); 847 EXPECT_EQ(get_koid(crashing_thread), exception.last_report->exception.tid); 848 zx_task_kill(crashing_thread); 849 zx_handle_close(crashing_thread); 850 851 loop.Shutdown(); 852 EXPECT_EQ(ZX_ERR_CANCELED, exception.last_status); 853 854 END_TEST; 855} 856 857bool exception_shutdown_test() { 858 BEGIN_TEST; 859 860 async::Loop loop(&kAsyncLoopConfigNoAttachToThread); 861 loop.Shutdown(); 862 863 // Try to bind a port after shutdown. 864 TestException exception(zx_process_self(), 0); 865 EXPECT_EQ(ZX_ERR_BAD_STATE, exception.Bind(loop.dispatcher())); 866 867 END_TEST; 868} 869 870class GetDefaultDispatcherTask : public QuitTask { 871public: 872 async_dispatcher_t* last_default_dispatcher; 873 874protected: 875 void Handle(async_dispatcher_t* dispatcher, zx_status_t status) override { 876 QuitTask::Handle(dispatcher, status); 877 last_default_dispatcher = async_get_default_dispatcher(); 878 } 879}; 880 881class ConcurrencyMeasure { 882public: 883 ConcurrencyMeasure(uint32_t end) 884 : end_(end) {} 885 886 uint32_t max_threads() const { return fbl::atomic_load(&max_threads_, fbl::memory_order_acquire); } 887 uint32_t count() const { return fbl::atomic_load(&count_, fbl::memory_order_acquire); } 888 889 void Tally(async_dispatcher_t* dispatcher) { 890 // Increment count of concurrently active threads. Update maximum if needed. 891 uint32_t active = 1u + fbl::atomic_fetch_add(&active_threads_, 1u, 892 fbl::memory_order_acq_rel); 893 uint32_t old_max; 894 do { 895 old_max = fbl::atomic_load(&max_threads_, fbl::memory_order_acquire); 896 } while (active > old_max && 897 !fbl::atomic_compare_exchange_weak(&max_threads_, &old_max, active, 898 fbl::memory_order_acq_rel, 899 fbl::memory_order_acquire)); 900 901 // Pretend to do work. 902 zx::nanosleep(zx::deadline_after(zx::msec(1))); 903 904 // Decrement count of active threads. 905 fbl::atomic_fetch_sub(&active_threads_, 1u, fbl::memory_order_acq_rel); 906 907 // Quit when last item processed. 908 if (1u + fbl::atomic_fetch_add(&count_, 1u, fbl::memory_order_acq_rel) == end_) 909 async_loop_quit(async_loop_from_dispatcher(dispatcher)); 910 } 911 912private: 913 const uint32_t end_; 914 fbl::atomic_uint32_t count_{}; 915 fbl::atomic_uint32_t active_threads_{}; 916 fbl::atomic_uint32_t max_threads_{}; 917}; 918 919class ThreadAssertWait : public TestWait { 920public: 921 ThreadAssertWait(zx_handle_t object, zx_signals_t trigger, ConcurrencyMeasure* measure) 922 : TestWait(object, trigger), measure_(measure) {} 923 924protected: 925 ConcurrencyMeasure* measure_; 926 927 void Handle(async_dispatcher_t* dispatcher, zx_status_t status, 928 const zx_packet_signal_t* signal) override { 929 TestWait::Handle(dispatcher, status, signal); 930 measure_->Tally(dispatcher); 931 } 932}; 933 934class ThreadAssertTask : public TestTask { 935public: 936 ThreadAssertTask(ConcurrencyMeasure* measure) 937 : measure_(measure) {} 938 939protected: 940 ConcurrencyMeasure* measure_; 941 942 void Handle(async_dispatcher_t* dispatcher, zx_status_t status) override { 943 TestTask::Handle(dispatcher, status); 944 measure_->Tally(dispatcher); 945 } 946}; 947 948class ThreadAssertReceiver : public TestReceiver { 949public: 950 ThreadAssertReceiver(ConcurrencyMeasure* measure) 951 : measure_(measure) {} 952 953protected: 954 ConcurrencyMeasure* measure_; 955 956 // This receiver's handler will run concurrently on multiple threads 957 // (unlike the Waits and Tasks) so we must guard its state. 958 fbl::Mutex mutex_; 959 960 void Handle(async_dispatcher_t* dispatcher, zx_status_t status, 961 const zx_packet_user_t* data) override { 962 { 963 fbl::AutoLock lock(&mutex_); 964 TestReceiver::Handle(dispatcher, status, data); 965 } 966 measure_->Tally(dispatcher); 967 } 968}; 969 970class ThreadAssertException : public TestException { 971public: 972 ThreadAssertException(zx_handle_t task, uint32_t options, 973 ConcurrencyMeasure* measure) 974 : TestException(task, options), measure_(measure) {} 975 976protected: 977 ConcurrencyMeasure* measure_; 978 979 // This receiver's handler will run concurrently on multiple threads 980 // (unlike the Waits and Tasks) so we must guard its state. 981 fbl::Mutex mutex_; 982 983 void Handle(async_dispatcher_t* dispatcher, zx_status_t status, 984 const zx_port_packet_t* report) override { 985 { 986 fbl::AutoLock lock(&mutex_); 987 TestException::Handle(dispatcher, status, report); 988 } 989 measure_->Tally(dispatcher); 990 } 991}; 992 993bool threads_have_default_dispatcher() { 994 BEGIN_TEST; 995 996 async::Loop loop(&kAsyncLoopConfigNoAttachToThread); 997 EXPECT_EQ(ZX_OK, loop.StartThread(), "start thread"); 998 999 GetDefaultDispatcherTask task; 1000 EXPECT_EQ(ZX_OK, task.Post(loop.dispatcher()), "post task"); 1001 loop.JoinThreads(); 1002 1003 EXPECT_EQ(1u, task.run_count, "run count"); 1004 EXPECT_EQ(ZX_OK, task.last_status, "status"); 1005 EXPECT_EQ(loop.dispatcher(), task.last_default_dispatcher, "default dispatcher"); 1006 1007 END_TEST; 1008} 1009 1010// The goal here is to ensure that threads stop when Quit() is called. 1011bool threads_quit() { 1012 const size_t num_threads = 4; 1013 1014 BEGIN_TEST; 1015 1016 async::Loop loop(&kAsyncLoopConfigNoAttachToThread); 1017 for (size_t i = 0; i < num_threads; i++) { 1018 EXPECT_EQ(ZX_OK, loop.StartThread()); 1019 } 1020 loop.Quit(); 1021 loop.JoinThreads(); 1022 EXPECT_EQ(ASYNC_LOOP_QUIT, loop.GetState()); 1023 1024 END_TEST; 1025} 1026 1027// The goal here is to ensure that threads stop when Shutdown() is called. 1028bool threads_shutdown() { 1029 const size_t num_threads = 4; 1030 1031 BEGIN_TEST; 1032 1033 async::Loop loop(&kAsyncLoopConfigNoAttachToThread); 1034 for (size_t i = 0; i < num_threads; i++) { 1035 EXPECT_EQ(ZX_OK, loop.StartThread()); 1036 } 1037 loop.Shutdown(); 1038 EXPECT_EQ(ASYNC_LOOP_SHUTDOWN, loop.GetState()); 1039 1040 loop.JoinThreads(); // should be a no-op 1041 1042 EXPECT_EQ(ZX_ERR_BAD_STATE, loop.StartThread(), "can't start threads after shutdown"); 1043 1044 END_TEST; 1045} 1046 1047// The goal here is to schedule a lot of work and see whether it runs 1048// on as many threads as we expected it to. 1049bool threads_waits_run_concurrently_test() { 1050 const size_t num_threads = 4; 1051 const size_t num_items = 100; 1052 1053 BEGIN_TEST; 1054 1055 async::Loop loop(&kAsyncLoopConfigNoAttachToThread); 1056 for (size_t i = 0; i < num_threads; i++) { 1057 EXPECT_EQ(ZX_OK, loop.StartThread(), "start thread"); 1058 } 1059 1060 ConcurrencyMeasure measure(num_items); 1061 zx::event event; 1062 EXPECT_EQ(ZX_OK, zx::event::create(0u, &event), "create event"); 1063 EXPECT_EQ(ZX_OK, event.signal(0u, ZX_USER_SIGNAL_0), "signal"); 1064 1065 // Post a number of work items to run all at once. 1066 ThreadAssertWait* items[num_items]; 1067 for (size_t i = 0; i < num_items; i++) { 1068 items[i] = new ThreadAssertWait(event.get(), ZX_USER_SIGNAL_0, &measure); 1069 EXPECT_EQ(ZX_OK, items[i]->Begin(loop.dispatcher()), "begin wait"); 1070 } 1071 1072 // Wait until quitted. 1073 loop.JoinThreads(); 1074 1075 // Ensure all work items completed. 1076 EXPECT_EQ(num_items, measure.count(), "item count"); 1077 for (size_t i = 0; i < num_items; i++) { 1078 EXPECT_EQ(1u, items[i]->run_count, "run count"); 1079 EXPECT_EQ(ZX_OK, items[i]->last_status, "status"); 1080 EXPECT_NONNULL(items[i]->last_signal, "signal"); 1081 EXPECT_EQ(ZX_USER_SIGNAL_0, items[i]->last_signal->observed & ZX_USER_SIGNAL_ALL, "observed"); 1082 delete items[i]; 1083 } 1084 1085 // Ensure that we actually ran many waits concurrently on different threads. 1086 EXPECT_NE(1u, measure.max_threads(), "waits handled concurrently"); 1087 1088 END_TEST; 1089} 1090 1091// The goal here is to schedule a lot of work and see whether it runs 1092// on as many threads as we expected it to. 1093bool threads_tasks_run_sequentially_test() { 1094 const size_t num_threads = 4; 1095 const size_t num_items = 100; 1096 1097 BEGIN_TEST; 1098 1099 async::Loop loop(&kAsyncLoopConfigNoAttachToThread); 1100 for (size_t i = 0; i < num_threads; i++) { 1101 EXPECT_EQ(ZX_OK, loop.StartThread(), "start thread"); 1102 } 1103 1104 ConcurrencyMeasure measure(num_items); 1105 1106 // Post a number of work items to run all at once. 1107 ThreadAssertTask* items[num_items]; 1108 zx::time start_time = async::Now(loop.dispatcher()); 1109 for (size_t i = 0; i < num_items; i++) { 1110 items[i] = new ThreadAssertTask(&measure); 1111 EXPECT_EQ(ZX_OK, items[i]->PostForTime(loop.dispatcher(), start_time + zx::msec(i)), "post task"); 1112 } 1113 1114 // Wait until quitted. 1115 loop.JoinThreads(); 1116 1117 // Ensure all work items completed. 1118 EXPECT_EQ(num_items, measure.count(), "item count"); 1119 for (size_t i = 0; i < num_items; i++) { 1120 EXPECT_EQ(1u, items[i]->run_count, "run count"); 1121 EXPECT_EQ(ZX_OK, items[i]->last_status, "status"); 1122 delete items[i]; 1123 } 1124 1125 // Ensure that we actually ran tasks sequentially despite having many 1126 // threads available. 1127 EXPECT_EQ(1u, measure.max_threads(), "tasks handled sequentially"); 1128 1129 END_TEST; 1130} 1131 1132// The goal here is to schedule a lot of work and see whether it runs 1133// on as many threads as we expected it to. 1134bool threads_receivers_run_concurrently_test() { 1135 const size_t num_threads = 4; 1136 const size_t num_items = 100; 1137 1138 BEGIN_TEST; 1139 1140 async::Loop loop(&kAsyncLoopConfigNoAttachToThread); 1141 for (size_t i = 0; i < num_threads; i++) { 1142 EXPECT_EQ(ZX_OK, loop.StartThread(), "start thread"); 1143 } 1144 1145 ConcurrencyMeasure measure(num_items); 1146 1147 // Post a number of packets all at once. 1148 ThreadAssertReceiver receiver(&measure); 1149 for (size_t i = 0; i < num_items; i++) { 1150 EXPECT_EQ(ZX_OK, receiver.QueuePacket(loop.dispatcher(), nullptr), "queue packet"); 1151 } 1152 1153 // Wait until quitted. 1154 loop.JoinThreads(); 1155 1156 // Ensure all work items completed. 1157 EXPECT_EQ(num_items, measure.count(), "item count"); 1158 EXPECT_EQ(num_items, receiver.run_count, "run count"); 1159 EXPECT_EQ(ZX_OK, receiver.last_status, "status"); 1160 1161 // Ensure that we actually processed many packets concurrently on different threads. 1162 EXPECT_NE(1u, measure.max_threads(), "packets handled concurrently"); 1163 1164 END_TEST; 1165} 1166 1167// The goal here is to schedule a lot of work and see whether it runs 1168// on as many threads as we expected it to. 1169bool threads_exceptions_run_concurrently_test() { 1170 const size_t num_threads = 4; 1171 // We generate this number of exceptions, and therefore this number of 1172 // crashing threads, so this number isn't that large (e.g., not 100). 1173 const size_t num_items = 10; 1174 1175 BEGIN_TEST; 1176 1177 ConcurrencyMeasure measure(num_items); 1178 1179 // The exception receiver object must survive the lifetime of the async 1180 // loop. 1181 ThreadAssertException receiver(zx_process_self(), 0, &measure); 1182 1183 { 1184 zx_handle_t crashing_threads[num_items]; 1185 async::Loop loop(&kAsyncLoopConfigNoAttachToThread); 1186 EXPECT_EQ(ZX_OK, receiver.Bind(loop.dispatcher())); 1187 1188 for (size_t i = 0; i < num_threads; i++) { 1189 EXPECT_EQ(ZX_OK, loop.StartThread()); 1190 } 1191 1192 // Post a number of packets all at once. 1193 for (size_t i = 0; i < num_items; i++) { 1194 EXPECT_EQ(ZX_OK, create_crashing_thread(&crashing_threads[i])); 1195 } 1196 // We don't need to wait for the threads to crash here as the loop 1197 // will continue until |measure| receives |num_items|. 1198 1199 // Wait until quitted. 1200 loop.JoinThreads(); 1201 1202 // Make sure the threads are gone before we unbind the exception port, 1203 // otherwise the global crash-handler will see the exceptions. 1204 for (size_t i = 0; i < num_items; i++) { 1205 zx_task_kill(crashing_threads[i]); 1206 zx_handle_close(crashing_threads[i]); 1207 } 1208 1209 // Ensure all work items completed. 1210 // When |loop| goes out of scope |receiver| will get ZX_ERR_CANCELED, 1211 // which will add one to the packet received count. Do these tests 1212 // here before |loop| goes out of scope. 1213 EXPECT_EQ(num_items, measure.count()); 1214 EXPECT_EQ(num_items, receiver.run_count); 1215 EXPECT_EQ(ZX_OK, receiver.last_status); 1216 } 1217 1218 // Loop shutdown -> ZX_ERR_CANCELED. 1219 EXPECT_EQ(ZX_ERR_CANCELED, receiver.last_status); 1220 1221 // Ensure that we actually processed many packets concurrently on different threads. 1222 EXPECT_NE(1u, measure.max_threads()); 1223 1224 END_TEST; 1225} 1226 1227} // namespace 1228 1229BEGIN_TEST_CASE(loop_tests) 1230RUN_TEST(c_api_basic_test) 1231RUN_TEST(make_default_false_test) 1232RUN_TEST(make_default_true_test) 1233RUN_TEST(quit_test) 1234RUN_TEST(time_test) 1235RUN_TEST(wait_test) 1236RUN_TEST(wait_unwaitable_handle_test) 1237RUN_TEST(wait_shutdown_test) 1238RUN_TEST(task_test) 1239RUN_TEST(task_shutdown_test) 1240RUN_TEST(receiver_test) 1241RUN_TEST(receiver_shutdown_test) 1242RUN_TEST(exception_test) 1243RUN_TEST(exception_shutdown_test) 1244RUN_TEST(threads_have_default_dispatcher) 1245for (int i = 0; i < 3; i++) { 1246 RUN_TEST(threads_quit) 1247 RUN_TEST(threads_shutdown) 1248 RUN_TEST(threads_waits_run_concurrently_test) 1249 RUN_TEST(threads_tasks_run_sequentially_test) 1250 RUN_TEST(threads_receivers_run_concurrently_test) 1251 RUN_TEST(threads_exceptions_run_concurrently_test) 1252} 1253END_TEST_CASE(loop_tests) 1254