1// Copyright 2017 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include <threads.h>
6
7#include <zircon/process.h>
8#include <zircon/syscalls.h>
9#include <zircon/syscalls/exception.h>
10
11#include <lib/async-loop/cpp/loop.h>
12#include <lib/async/cpp/time.h>
13#include <lib/async/exception.h>
14#include <lib/async/receiver.h>
15#include <lib/async/task.h>
16#include <lib/async/time.h>
17#include <lib/async/wait.h>
18
19#include <fbl/atomic.h>
20#include <fbl/auto_lock.h>
21#include <fbl/function.h>
22#include <fbl/mutex.h>
23#include <lib/zx/event.h>
24#include <unittest/unittest.h>
25#include <zircon/threads.h>
26
27namespace {
28
29class TestWait : public async_wait_t {
30public:
31    TestWait(zx_handle_t object, zx_signals_t trigger)
32        : async_wait_t{{ASYNC_STATE_INIT}, &TestWait::CallHandler, object, trigger} {
33    }
34
35    virtual ~TestWait() = default;
36
37    uint32_t run_count = 0u;
38    zx_status_t last_status = ZX_ERR_INTERNAL;
39    const zx_packet_signal_t* last_signal = nullptr;
40
41    zx_status_t Begin(async_dispatcher_t* dispatcher) {
42        return async_begin_wait(dispatcher, this);
43    }
44
45    zx_status_t Cancel(async_dispatcher_t* dispatcher) {
46        return async_cancel_wait(dispatcher, this);
47    }
48
49protected:
50    virtual void Handle(async_dispatcher_t* dispatcher, zx_status_t status,
51                        const zx_packet_signal_t* signal) {
52        run_count++;
53        last_status = status;
54        if (signal) {
55            last_signal_storage_ = *signal;
56            last_signal = &last_signal_storage_;
57        } else {
58            last_signal = nullptr;
59        }
60    }
61
62private:
63    static void CallHandler(async_dispatcher_t* dispatcher, async_wait_t* wait,
64                            zx_status_t status, const zx_packet_signal_t* signal) {
65        static_cast<TestWait*>(wait)->Handle(dispatcher, status, signal);
66    }
67
68    zx_packet_signal_t last_signal_storage_;
69};
70
71class CascadeWait : public TestWait {
72public:
73    CascadeWait(zx_handle_t object, zx_signals_t trigger,
74                zx_signals_t signals_to_clear, zx_signals_t signals_to_set,
75                bool repeat)
76        : TestWait(object, trigger),
77          signals_to_clear_(signals_to_clear),
78          signals_to_set_(signals_to_set),
79          repeat_(repeat) {}
80
81protected:
82    zx_signals_t signals_to_clear_;
83    zx_signals_t signals_to_set_;
84    bool repeat_;
85
86    void Handle(async_dispatcher_t* dispatcher, zx_status_t status,
87                const zx_packet_signal_t* signal) override {
88        TestWait::Handle(dispatcher, status, signal);
89        zx_object_signal(object, signals_to_clear_, signals_to_set_);
90        if (repeat_ && status == ZX_OK) {
91            Begin(dispatcher);
92        }
93    }
94};
95
96class SelfCancelingWait : public TestWait {
97public:
98    SelfCancelingWait(zx_handle_t object, zx_signals_t trigger)
99        : TestWait(object, trigger) {}
100
101    zx_status_t cancel_result = ZX_ERR_INTERNAL;
102
103protected:
104    void Handle(async_dispatcher_t* dispatcher, zx_status_t status,
105                const zx_packet_signal_t* signal) override {
106        TestWait::Handle(dispatcher, status, signal);
107        cancel_result = Cancel(dispatcher);
108    }
109};
110
111class TestTask : public async_task_t {
112public:
113    TestTask()
114        : async_task_t{{ASYNC_STATE_INIT}, &TestTask::CallHandler, ZX_TIME_INFINITE} {}
115
116    virtual ~TestTask() = default;
117
118    zx_status_t Post(async_dispatcher_t* dispatcher) {
119        this->deadline = async_now(dispatcher);
120        return async_post_task(dispatcher, this);
121    }
122
123    zx_status_t PostForTime(async_dispatcher_t* dispatcher, zx::time deadline) {
124        this->deadline = deadline.get();
125        return async_post_task(dispatcher, this);
126    }
127
128    zx_status_t Cancel(async_dispatcher_t* dispatcher) {
129        return async_cancel_task(dispatcher, this);
130    }
131
132    uint32_t run_count = 0u;
133    zx_status_t last_status = ZX_ERR_INTERNAL;
134
135protected:
136    virtual void Handle(async_dispatcher_t* dispatcher, zx_status_t status) {
137        run_count++;
138        last_status = status;
139    }
140
141private:
142    static void CallHandler(async_dispatcher_t* dispatcher, async_task_t* task, zx_status_t status) {
143        static_cast<TestTask*>(task)->Handle(dispatcher, status);
144    }
145};
146
147class QuitTask : public TestTask {
148public:
149    QuitTask() = default;
150
151protected:
152    void Handle(async_dispatcher_t* dispatcher, zx_status_t status) override {
153        TestTask::Handle(dispatcher, status);
154        async_loop_quit(async_loop_from_dispatcher(dispatcher));
155    }
156};
157
158class ResetQuitTask : public TestTask {
159public:
160    ResetQuitTask() = default;
161
162    zx_status_t result = ZX_ERR_INTERNAL;
163
164protected:
165    void Handle(async_dispatcher_t* dispatcher, zx_status_t status) override {
166        TestTask::Handle(dispatcher, status);
167        result = async_loop_reset_quit(async_loop_from_dispatcher(dispatcher));
168    }
169};
170
171class RepeatingTask : public TestTask {
172public:
173    RepeatingTask(zx::duration interval, uint32_t repeat_count)
174        : interval_(interval), repeat_count_(repeat_count) {}
175
176    void set_finish_callback(fbl::Closure callback) {
177        finish_callback_ = fbl::move(callback);
178    }
179
180protected:
181    zx::duration interval_;
182    uint32_t repeat_count_;
183    fbl::Closure finish_callback_;
184
185    void Handle(async_dispatcher_t* dispatcher, zx_status_t status) override {
186        TestTask::Handle(dispatcher, status);
187        if (repeat_count_ == 0) {
188            if (finish_callback_)
189                finish_callback_();
190        } else {
191            repeat_count_ -= 1;
192            if (status == ZX_OK) {
193                deadline = zx_time_add_duration(deadline, interval_.get());
194                Post(dispatcher);
195            }
196        }
197    }
198};
199
200class SelfCancelingTask : public TestTask {
201public:
202    SelfCancelingTask() = default;
203
204    zx_status_t cancel_result = ZX_ERR_INTERNAL;
205
206protected:
207    void Handle(async_dispatcher_t* dispatcher, zx_status_t status) override {
208        TestTask::Handle(dispatcher, status);
209        cancel_result = Cancel(dispatcher);
210    }
211};
212
213class TestReceiver : async_receiver_t {
214public:
215    TestReceiver()
216        : async_receiver_t{{ASYNC_STATE_INIT}, &TestReceiver::CallHandler} {
217    }
218
219    virtual ~TestReceiver() = default;
220
221    zx_status_t QueuePacket(async_dispatcher_t* dispatcher, const zx_packet_user_t* data) {
222        return async_queue_packet(dispatcher, this, data);
223    }
224
225    uint32_t run_count = 0u;
226    zx_status_t last_status = ZX_ERR_INTERNAL;
227    const zx_packet_user_t* last_data;
228
229protected:
230    virtual void Handle(async_dispatcher_t* dispatcher, zx_status_t status,
231                        const zx_packet_user_t* data) {
232        run_count++;
233        last_status = status;
234        if (data) {
235            last_data_storage_ = *data;
236            last_data = &last_data_storage_;
237        } else {
238            last_data = nullptr;
239        }
240    }
241
242private:
243    static void CallHandler(async_dispatcher_t* dispatcher, async_receiver_t* receiver,
244                            zx_status_t status, const zx_packet_user_t* data) {
245        static_cast<TestReceiver*>(receiver)->Handle(dispatcher, status, data);
246    }
247
248    zx_packet_user_t last_data_storage_{};
249};
250
251class TestException : public async_exception_t {
252public:
253    TestException(zx_handle_t task, uint32_t options)
254        : async_exception_t{{ASYNC_STATE_INIT}, &TestException::CallHandler, task, options} {
255    }
256
257    ~TestException() = default;
258
259    uint32_t run_count = 0u;
260    zx_status_t last_status = ZX_ERR_INTERNAL;
261    const zx_port_packet_t* last_report = nullptr;
262
263    zx_status_t Bind(async_dispatcher_t* dispatcher) {
264        return async_bind_exception_port(dispatcher, this);
265    }
266
267    zx_status_t Unbind(async_dispatcher_t* dispatcher) {
268        return async_unbind_exception_port(dispatcher, this);
269    }
270
271protected:
272    virtual void Handle(async_dispatcher_t* dispatcher, zx_status_t status,
273                        const zx_port_packet_t* report) {
274        run_count++;
275        last_status = status;
276        if (report) {
277            last_report_storage_ = *report;
278            last_report = &last_report_storage_;
279        } else {
280            last_report = nullptr;
281        }
282    }
283
284private:
285    static void CallHandler(async_dispatcher_t* dispatcher,
286                            async_exception_t* receiver,
287                            zx_status_t status,
288                            const zx_port_packet_t* report) {
289        static_cast<TestException*>(receiver)->Handle(dispatcher, status, report);
290    }
291
292    zx_port_packet_t last_report_storage_;
293};
294
295// The C++ loop wrapper is one-to-one with the underlying C API so for the
296// most part we will test through that interface but here we make sure that
297// the C API actually exists but we don't comprehensively test what it does.
298bool c_api_basic_test() {
299    BEGIN_TEST;
300
301    async_loop_t* loop;
302    ASSERT_EQ(ZX_OK, async_loop_create(&kAsyncLoopConfigNoAttachToThread, &loop), "create");
303    ASSERT_NONNULL(loop, "loop");
304
305    EXPECT_EQ(ASYNC_LOOP_RUNNABLE, async_loop_get_state(loop), "runnable");
306
307    async_loop_quit(loop);
308    EXPECT_EQ(ASYNC_LOOP_QUIT, async_loop_get_state(loop), "quitting");
309    async_loop_run(loop, ZX_TIME_INFINITE, false);
310    EXPECT_EQ(ZX_OK, async_loop_reset_quit(loop));
311
312    thrd_t thread{};
313    EXPECT_EQ(ZX_OK, async_loop_start_thread(loop, "name", &thread), "thread start");
314    EXPECT_NE(thrd_t{}, thread, "thread ws initialized");
315    async_loop_quit(loop);
316    async_loop_join_threads(loop);
317
318    async_loop_shutdown(loop);
319    EXPECT_EQ(ASYNC_LOOP_SHUTDOWN, async_loop_get_state(loop), "shutdown");
320
321    async_loop_destroy(loop);
322
323    END_TEST;
324}
325
326bool make_default_false_test() {
327    BEGIN_TEST;
328
329    {
330        async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
331        EXPECT_NULL(async_get_default_dispatcher(), "not default");
332    }
333    EXPECT_NULL(async_get_default_dispatcher(), "still not default");
334
335    END_TEST;
336}
337
338bool make_default_true_test() {
339    BEGIN_TEST;
340
341    async_loop_config_t config{};
342    config.make_default_for_current_thread = true;
343    {
344        async::Loop loop(&config);
345        EXPECT_EQ(loop.dispatcher(), async_get_default_dispatcher(), "became default");
346    }
347    EXPECT_NULL(async_get_default_dispatcher(), "no longer default");
348
349    END_TEST;
350}
351
352bool create_default_test() {
353    BEGIN_TEST;
354
355    {
356        async::Loop loop(&kAsyncLoopConfigAttachToThread);
357        EXPECT_EQ(loop.dispatcher(), async_get_default_dispatcher(), "became default");
358    }
359    EXPECT_NULL(async_get_default_dispatcher(), "no longer default");
360
361    END_TEST;
362}
363
364bool quit_test() {
365    BEGIN_TEST;
366
367    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
368    EXPECT_EQ(ASYNC_LOOP_RUNNABLE, loop.GetState(), "initially not quitting");
369
370    loop.Quit();
371    EXPECT_EQ(ASYNC_LOOP_QUIT, loop.GetState(), "quitting when quit");
372    EXPECT_EQ(ZX_ERR_CANCELED, loop.Run(), "run returns immediately");
373    EXPECT_EQ(ASYNC_LOOP_QUIT, loop.GetState(), "still quitting");
374
375    ResetQuitTask reset_quit_task;
376    EXPECT_EQ(ZX_OK, reset_quit_task.Post(loop.dispatcher()), "can post tasks even after quit");
377    QuitTask quit_task;
378    EXPECT_EQ(ZX_OK, quit_task.Post(loop.dispatcher()), "can post tasks even after quit");
379
380    EXPECT_EQ(ZX_OK, loop.ResetQuit());
381    EXPECT_EQ(ASYNC_LOOP_RUNNABLE, loop.GetState(), "not quitting after reset");
382
383    EXPECT_EQ(ZX_OK, loop.Run(zx::time::infinite(), true /*once*/), "run tasks");
384
385    EXPECT_EQ(1u, reset_quit_task.run_count, "reset quit task ran");
386    EXPECT_EQ(ZX_ERR_BAD_STATE, reset_quit_task.result, "can't reset quit while loop is running");
387
388    EXPECT_EQ(1u, quit_task.run_count, "quit task ran");
389    EXPECT_EQ(ASYNC_LOOP_QUIT, loop.GetState(), "quitted");
390
391    EXPECT_EQ(ZX_ERR_CANCELED, loop.Run(), "runs returns immediately when quitted");
392
393    loop.Shutdown();
394    EXPECT_EQ(ASYNC_LOOP_SHUTDOWN, loop.GetState(), "shut down");
395    EXPECT_EQ(ZX_ERR_BAD_STATE, loop.Run(), "run returns immediately when shut down");
396    EXPECT_EQ(ZX_ERR_BAD_STATE, loop.ResetQuit());
397
398    END_TEST;
399}
400
401bool time_test() {
402    BEGIN_TEST;
403
404    // Verify that the dispatcher's time-telling is strictly monotonic,
405    // which is constent with ZX_CLOCK_MONOTONIC.
406    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
407    zx::time t0 = zx::clock::get_monotonic();
408    zx::time t1 = async::Now(loop.dispatcher());
409    zx::time t2 = async::Now(loop.dispatcher());
410    zx::time t3 = zx::clock::get_monotonic();
411
412    EXPECT_LE(t0.get(), t1.get());
413    EXPECT_LE(t1.get(), t2.get());
414    EXPECT_LE(t2.get(), t3.get());
415
416    END_TEST;
417}
418
419bool wait_test() {
420    BEGIN_TEST;
421
422    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
423    zx::event event;
424    EXPECT_EQ(ZX_OK, zx::event::create(0u, &event), "create event");
425
426    CascadeWait wait1(event.get(), ZX_USER_SIGNAL_1,
427                      0u, ZX_USER_SIGNAL_2, false);
428    CascadeWait wait2(event.get(), ZX_USER_SIGNAL_2,
429                      ZX_USER_SIGNAL_1 | ZX_USER_SIGNAL_2, 0u, true);
430    CascadeWait wait3(event.get(), ZX_USER_SIGNAL_3,
431                      ZX_USER_SIGNAL_3, 0u, true);
432    EXPECT_EQ(ZX_OK, wait1.Begin(loop.dispatcher()), "wait 1");
433    EXPECT_EQ(ZX_OK, wait2.Begin(loop.dispatcher()), "wait 2");
434    EXPECT_EQ(ZX_OK, wait3.Begin(loop.dispatcher()), "wait 3");
435
436    // Initially nothing is signaled.
437    EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop");
438    EXPECT_EQ(0u, wait1.run_count, "run count 1");
439    EXPECT_EQ(0u, wait2.run_count, "run count 2");
440    EXPECT_EQ(0u, wait3.run_count, "run count 3");
441
442    // Set signal 1: notifies |wait1| which sets signal 2 and notifies |wait2|
443    // which clears signal 1 and 2 again.
444    EXPECT_EQ(ZX_OK, event.signal(0u, ZX_USER_SIGNAL_1), "signal 1");
445    EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop");
446    EXPECT_EQ(1u, wait1.run_count, "run count 1");
447    EXPECT_EQ(ZX_OK, wait1.last_status, "status 1");
448    EXPECT_NONNULL(wait1.last_signal);
449    EXPECT_EQ(ZX_USER_SIGNAL_1, wait1.last_signal->trigger & ZX_USER_SIGNAL_ALL, "trigger 1");
450    EXPECT_EQ(ZX_USER_SIGNAL_1, wait1.last_signal->observed & ZX_USER_SIGNAL_ALL, "observed 1");
451    EXPECT_EQ(1u, wait1.last_signal->count, "count 1");
452    EXPECT_EQ(1u, wait2.run_count, "run count 2");
453    EXPECT_EQ(ZX_OK, wait2.last_status, "status 2");
454    EXPECT_NONNULL(wait2.last_signal);
455    EXPECT_EQ(ZX_USER_SIGNAL_2, wait2.last_signal->trigger & ZX_USER_SIGNAL_ALL, "trigger 2");
456    EXPECT_EQ(ZX_USER_SIGNAL_1 | ZX_USER_SIGNAL_2, wait2.last_signal->observed & ZX_USER_SIGNAL_ALL, "observed 2");
457    EXPECT_EQ(1u, wait2.last_signal->count, "count 2");
458    EXPECT_EQ(0u, wait3.run_count, "run count 3");
459
460    // Set signal 1 again: does nothing because |wait1| was a one-shot.
461    EXPECT_EQ(ZX_OK, event.signal(0u, ZX_USER_SIGNAL_1), "signal 1");
462    EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop");
463    EXPECT_EQ(1u, wait1.run_count, "run count 1");
464    EXPECT_EQ(1u, wait2.run_count, "run count 2");
465    EXPECT_EQ(0u, wait3.run_count, "run count 3");
466
467    // Set signal 2 again: notifies |wait2| which clears signal 1 and 2 again.
468    EXPECT_EQ(ZX_OK, event.signal(0u, ZX_USER_SIGNAL_2), "signal 2");
469    EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop");
470    EXPECT_EQ(1u, wait1.run_count, "run count 1");
471    EXPECT_EQ(2u, wait2.run_count, "run count 2");
472    EXPECT_EQ(ZX_OK, wait2.last_status, "status 2");
473    EXPECT_NONNULL(wait2.last_signal);
474    EXPECT_EQ(ZX_USER_SIGNAL_2, wait2.last_signal->trigger & ZX_USER_SIGNAL_ALL, "trigger 2");
475    EXPECT_EQ(ZX_USER_SIGNAL_1 | ZX_USER_SIGNAL_2, wait2.last_signal->observed & ZX_USER_SIGNAL_ALL, "observed 2");
476    EXPECT_EQ(1u, wait2.last_signal->count, "count 2");
477    EXPECT_EQ(0u, wait3.run_count, "run count 3");
478
479    // Set signal 3: notifies |wait3| which clears signal 3.
480    // Do this a couple of times.
481    for (uint32_t i = 0; i < 3; i++) {
482        EXPECT_EQ(ZX_OK, event.signal(0u, ZX_USER_SIGNAL_3), "signal 3");
483        EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop");
484        EXPECT_EQ(1u, wait1.run_count, "run count 1");
485        EXPECT_EQ(2u, wait2.run_count, "run count 2");
486        EXPECT_EQ(i + 1u, wait3.run_count, "run count 3");
487        EXPECT_EQ(ZX_OK, wait3.last_status, "status 3");
488        EXPECT_NONNULL(wait3.last_signal);
489        EXPECT_EQ(ZX_USER_SIGNAL_3, wait3.last_signal->trigger & ZX_USER_SIGNAL_ALL, "trigger 3");
490        EXPECT_EQ(ZX_USER_SIGNAL_3, wait3.last_signal->observed & ZX_USER_SIGNAL_ALL, "observed 3");
491        EXPECT_EQ(1u, wait3.last_signal->count, "count 3");
492    }
493
494    // Cancel wait 3 then set signal 3 again: nothing happens this time.
495    EXPECT_EQ(ZX_OK, wait3.Cancel(loop.dispatcher()), "cancel");
496    EXPECT_EQ(ZX_OK, event.signal(0u, ZX_USER_SIGNAL_3), "signal 3");
497    EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop");
498    EXPECT_EQ(1u, wait1.run_count, "run count 1");
499    EXPECT_EQ(2u, wait2.run_count, "run count 2");
500    EXPECT_EQ(3u, wait3.run_count, "run count 3");
501
502    // Redundant cancel returns an error.
503    EXPECT_EQ(ZX_ERR_NOT_FOUND, wait3.Cancel(loop.dispatcher()), "cancel again");
504    EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop");
505    EXPECT_EQ(1u, wait1.run_count, "run count 1");
506    EXPECT_EQ(2u, wait2.run_count, "run count 2");
507    EXPECT_EQ(3u, wait3.run_count, "run count 3");
508
509    loop.Shutdown();
510
511    END_TEST;
512}
513
514bool wait_unwaitable_handle_test() {
515    BEGIN_TEST;
516
517    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
518    zx::event event;
519    EXPECT_EQ(ZX_OK, zx::event::create(0u, &event), "create event");
520    event.replace(ZX_RIGHT_NONE, &event);
521
522    TestWait wait(event.get(), ZX_USER_SIGNAL_0);
523    EXPECT_EQ(ZX_ERR_ACCESS_DENIED, wait.Begin(loop.dispatcher()), "begin");
524    EXPECT_EQ(ZX_ERR_NOT_FOUND, wait.Cancel(loop.dispatcher()), "cancel");
525    EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop");
526    EXPECT_EQ(0u, wait.run_count, "run count");
527
528    END_TEST;
529}
530
531bool wait_shutdown_test() {
532    BEGIN_TEST;
533
534    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
535    zx::event event;
536    EXPECT_EQ(ZX_OK, zx::event::create(0u, &event), "create event");
537
538    CascadeWait wait1(event.get(), ZX_USER_SIGNAL_0, 0u, 0u, false);
539    CascadeWait wait2(event.get(), ZX_USER_SIGNAL_0, ZX_USER_SIGNAL_0, 0u, true);
540    TestWait wait3(event.get(), ZX_USER_SIGNAL_1);
541    SelfCancelingWait wait4(event.get(), ZX_USER_SIGNAL_0);
542    SelfCancelingWait wait5(event.get(), ZX_USER_SIGNAL_1);
543
544    EXPECT_EQ(ZX_OK, wait1.Begin(loop.dispatcher()), "begin 1");
545    EXPECT_EQ(ZX_OK, wait2.Begin(loop.dispatcher()), "begin 2");
546    EXPECT_EQ(ZX_OK, wait3.Begin(loop.dispatcher()), "begin 3");
547    EXPECT_EQ(ZX_OK, wait4.Begin(loop.dispatcher()), "begin 4");
548    EXPECT_EQ(ZX_OK, wait5.Begin(loop.dispatcher()), "begin 5");
549
550    // Nothing signaled so nothing happens at first.
551    EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop");
552    EXPECT_EQ(0u, wait1.run_count, "run count 1");
553    EXPECT_EQ(0u, wait2.run_count, "run count 2");
554    EXPECT_EQ(0u, wait3.run_count, "run count 3");
555    EXPECT_EQ(0u, wait4.run_count, "run count 4");
556    EXPECT_EQ(0u, wait5.run_count, "run count 5");
557
558    // Set signal 1: notifies both waiters, |wait2| clears the signal and repeats
559    EXPECT_EQ(ZX_OK, event.signal(0u, ZX_USER_SIGNAL_0), "signal 1");
560    EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop");
561    EXPECT_EQ(1u, wait1.run_count, "run count 1");
562    EXPECT_EQ(ZX_OK, wait1.last_status, "status 1");
563    EXPECT_NONNULL(wait1.last_signal);
564    EXPECT_EQ(ZX_USER_SIGNAL_0, wait1.last_signal->trigger & ZX_USER_SIGNAL_ALL, "trigger 1");
565    EXPECT_EQ(ZX_USER_SIGNAL_0, wait1.last_signal->observed & ZX_USER_SIGNAL_ALL, "observed 1");
566    EXPECT_EQ(1u, wait1.last_signal->count, "count 1");
567    EXPECT_EQ(1u, wait2.run_count, "run count 2");
568    EXPECT_EQ(ZX_OK, wait2.last_status, "status 2");
569    EXPECT_NONNULL(wait2.last_signal);
570    EXPECT_EQ(ZX_USER_SIGNAL_0, wait2.last_signal->trigger & ZX_USER_SIGNAL_ALL, "trigger 2");
571    EXPECT_EQ(ZX_USER_SIGNAL_0, wait2.last_signal->observed & ZX_USER_SIGNAL_ALL, "observed 2");
572    EXPECT_EQ(1u, wait2.last_signal->count, "count 2");
573    EXPECT_EQ(0u, wait3.run_count, "run count 3");
574    EXPECT_EQ(1u, wait4.run_count, "run count 4");
575    EXPECT_EQ(ZX_USER_SIGNAL_0, wait4.last_signal->trigger & ZX_USER_SIGNAL_ALL, "trigger 4");
576    EXPECT_EQ(ZX_USER_SIGNAL_0, wait4.last_signal->observed & ZX_USER_SIGNAL_ALL, "observed 4");
577    EXPECT_EQ(ZX_ERR_NOT_FOUND, wait4.cancel_result, "cancel result 4");
578    EXPECT_EQ(0u, wait5.run_count, "run count 5");
579
580    // When the loop shuts down:
581    //   |wait1| not notified because it was serviced and didn't repeat
582    //   |wait2| notified because it repeated
583    //   |wait3| notified because it was not yet serviced
584    //   |wait4| not notified because it was serviced
585    //   |wait5| notified because it was not yet serviced
586    loop.Shutdown();
587    EXPECT_EQ(1u, wait1.run_count, "run count 1");
588    EXPECT_EQ(2u, wait2.run_count, "run count 2");
589    EXPECT_EQ(ZX_ERR_CANCELED, wait2.last_status, "status 2");
590    EXPECT_NULL(wait2.last_signal, "signal 2");
591    EXPECT_EQ(1u, wait3.run_count, "run count 3");
592    EXPECT_EQ(ZX_ERR_CANCELED, wait3.last_status, "status 3");
593    EXPECT_NULL(wait3.last_signal, "signal 3");
594    EXPECT_EQ(1u, wait4.run_count, "run count 4");
595    EXPECT_EQ(1u, wait5.run_count, "run count 5");
596    EXPECT_EQ(ZX_ERR_CANCELED, wait5.last_status, "status 5");
597    EXPECT_NULL(wait5.last_signal, "signal 5");
598    EXPECT_EQ(ZX_ERR_NOT_FOUND, wait5.cancel_result, "cancel result 5");
599
600    // Try to add or cancel work after shutdown.
601    TestWait wait6(event.get(), ZX_USER_SIGNAL_0);
602    EXPECT_EQ(ZX_ERR_BAD_STATE, wait6.Begin(loop.dispatcher()), "begin after shutdown");
603    EXPECT_EQ(ZX_ERR_NOT_FOUND, wait6.Cancel(loop.dispatcher()), "cancel after shutdown");
604    EXPECT_EQ(0u, wait6.run_count, "run count 6");
605
606    END_TEST;
607}
608
609bool task_test() {
610    BEGIN_TEST;
611
612    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
613
614    zx::time start_time = async::Now(loop.dispatcher());
615    TestTask task1;
616    RepeatingTask task2(zx::msec(1), 3u);
617    TestTask task3;
618    QuitTask task4;
619    TestTask task5; // posted after quit
620
621    EXPECT_EQ(ZX_OK, task1.PostForTime(loop.dispatcher(), start_time + zx::msec(1)), "post 1");
622    EXPECT_EQ(ZX_OK, task2.PostForTime(loop.dispatcher(), start_time + zx::msec(1)), "post 2");
623    EXPECT_EQ(ZX_OK, task3.PostForTime(loop.dispatcher(), start_time), "post 3");
624    task2.set_finish_callback([&loop, &task4, &task5, start_time] {
625        task4.PostForTime(loop.dispatcher(), start_time + zx::msec(10));
626        task5.PostForTime(loop.dispatcher(), start_time + zx::msec(10));
627    });
628
629    // Cancel task 3.
630    EXPECT_EQ(ZX_OK, task3.Cancel(loop.dispatcher()), "cancel 3");
631
632    // Run until quit.
633    EXPECT_EQ(ZX_ERR_CANCELED, loop.Run(), "run loop");
634    EXPECT_EQ(ASYNC_LOOP_QUIT, loop.GetState(), "quitting");
635    EXPECT_EQ(1u, task1.run_count, "run count 1");
636    EXPECT_EQ(ZX_OK, task1.last_status, "status 1");
637    EXPECT_EQ(4u, task2.run_count, "run count 2");
638    EXPECT_EQ(ZX_OK, task2.last_status, "status 2");
639    EXPECT_EQ(0u, task3.run_count, "run count 3");
640    EXPECT_EQ(1u, task4.run_count, "run count 4");
641    EXPECT_EQ(ZX_OK, task4.last_status, "status 4");
642    EXPECT_EQ(0u, task5.run_count, "run count 5");
643
644    // Reset quit and keep running, now task5 should go ahead followed
645    // by any subsequently posted tasks even if they have earlier deadlines.
646    QuitTask task6;
647    TestTask task7;
648    EXPECT_EQ(ZX_OK, task6.PostForTime(loop.dispatcher(), start_time), "post 6");
649    EXPECT_EQ(ZX_OK, task7.PostForTime(loop.dispatcher(), start_time), "post 7");
650    EXPECT_EQ(ZX_OK, loop.ResetQuit());
651    EXPECT_EQ(ZX_ERR_CANCELED, loop.Run(), "run loop");
652    EXPECT_EQ(ASYNC_LOOP_QUIT, loop.GetState(), "quitting");
653
654    EXPECT_EQ(1u, task5.run_count, "run count 5");
655    EXPECT_EQ(ZX_OK, task5.last_status, "status 5");
656    EXPECT_EQ(1u, task6.run_count, "run count 6");
657    EXPECT_EQ(ZX_OK, task6.last_status, "status 6");
658    EXPECT_EQ(0u, task7.run_count, "run count 7");
659
660    loop.Shutdown();
661
662    END_TEST;
663}
664
665bool task_shutdown_test() {
666    BEGIN_TEST;
667
668    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
669
670    zx::time start_time = async::Now(loop.dispatcher());
671    TestTask task1;
672    RepeatingTask task2(zx::msec(1000), 1u);
673    TestTask task3;
674    TestTask task4;
675    QuitTask task5;
676    SelfCancelingTask task6;
677    SelfCancelingTask task7;
678
679    EXPECT_EQ(ZX_OK, task1.PostForTime(loop.dispatcher(), start_time + zx::msec(1)), "post 1");
680    EXPECT_EQ(ZX_OK, task2.PostForTime(loop.dispatcher(), start_time + zx::msec(1)), "post 2");
681    EXPECT_EQ(ZX_OK, task3.PostForTime(loop.dispatcher(), zx::time::infinite()), "post 3");
682    EXPECT_EQ(ZX_OK, task4.PostForTime(loop.dispatcher(), zx::time::infinite()), "post 4");
683    EXPECT_EQ(ZX_OK, task5.PostForTime(loop.dispatcher(), start_time + zx::msec(1)), "post 5");
684    EXPECT_EQ(ZX_OK, task6.PostForTime(loop.dispatcher(), start_time), "post 6");
685    EXPECT_EQ(ZX_OK, task7.PostForTime(loop.dispatcher(), zx::time::infinite()), "post 7");
686
687    // Run tasks which are due up to the time when the quit task runs.
688    EXPECT_EQ(ZX_ERR_CANCELED, loop.Run(), "run loop");
689    EXPECT_EQ(1u, task1.run_count, "run count 1");
690    EXPECT_EQ(ZX_OK, task1.last_status, "status 1");
691    EXPECT_EQ(1u, task2.run_count, "run count 2");
692    EXPECT_EQ(ZX_OK, task2.last_status, "status 2");
693    EXPECT_EQ(0u, task3.run_count, "run count 3");
694    EXPECT_EQ(0u, task4.run_count, "run count 4");
695    EXPECT_EQ(1u, task5.run_count, "run count 5");
696    EXPECT_EQ(ZX_OK, task5.last_status, "status 5");
697    EXPECT_EQ(1u, task6.run_count, "run count 6");
698    EXPECT_EQ(ZX_OK, task6.last_status, "status 6");
699    EXPECT_EQ(ZX_ERR_NOT_FOUND, task6.cancel_result, "cancel result 6");
700    EXPECT_EQ(0u, task7.run_count, "run count 7");
701
702    // Cancel task 4.
703    EXPECT_EQ(ZX_OK, task4.Cancel(loop.dispatcher()), "cancel 4");
704
705    // When the loop shuts down:
706    //   |task1| not notified because it was serviced
707    //   |task2| notified because it requested a repeat
708    //   |task3| notified because it was not yet serviced
709    //   |task4| not notified because it was canceled
710    //   |task5| not notified because it was serviced
711    //   |task6| not notified because it was serviced
712    //   |task7| notified because it was not yet serviced
713    loop.Shutdown();
714    EXPECT_EQ(1u, task1.run_count, "run count 1");
715    EXPECT_EQ(2u, task2.run_count, "run count 2");
716    EXPECT_EQ(ZX_ERR_CANCELED, task2.last_status, "status 2");
717    EXPECT_EQ(1u, task3.run_count, "run count 3");
718    EXPECT_EQ(ZX_ERR_CANCELED, task3.last_status, "status 3");
719    EXPECT_EQ(0u, task4.run_count, "run count 4");
720    EXPECT_EQ(1u, task5.run_count, "run count 5");
721    EXPECT_EQ(1u, task6.run_count, "run count 6");
722    EXPECT_EQ(1u, task7.run_count, "run count 7");
723    EXPECT_EQ(ZX_ERR_CANCELED, task7.last_status, "status 7");
724    EXPECT_EQ(ZX_ERR_NOT_FOUND, task7.cancel_result, "cancel result 7");
725
726    // Try to add or cancel work after shutdown.
727    TestTask task8;
728    EXPECT_EQ(ZX_ERR_BAD_STATE, task8.PostForTime(loop.dispatcher(), zx::time::infinite()), "post after shutdown");
729    EXPECT_EQ(ZX_ERR_NOT_FOUND, task8.Cancel(loop.dispatcher()), "cancel after shutdown");
730    EXPECT_EQ(0u, task8.run_count, "run count 8");
731
732    END_TEST;
733}
734
735bool receiver_test() {
736    const zx_packet_user_t data1{.u64 = {11, 12, 13, 14}};
737    const zx_packet_user_t data2{.u64 = {21, 22, 23, 24}};
738    const zx_packet_user_t data3{.u64 = {31, 32, 33, 34}};
739    const zx_packet_user_t data_default{};
740
741    BEGIN_TEST;
742
743    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
744
745    TestReceiver receiver1;
746    TestReceiver receiver2;
747    TestReceiver receiver3;
748
749    EXPECT_EQ(ZX_OK, receiver1.QueuePacket(loop.dispatcher(), &data1), "queue 1");
750    EXPECT_EQ(ZX_OK, receiver1.QueuePacket(loop.dispatcher(), &data3), "queue 1, again");
751    EXPECT_EQ(ZX_OK, receiver2.QueuePacket(loop.dispatcher(), &data2), "queue 2");
752    EXPECT_EQ(ZX_OK, receiver3.QueuePacket(loop.dispatcher(), nullptr), "queue 3");
753
754    EXPECT_EQ(ZX_OK, loop.RunUntilIdle(), "run loop");
755    EXPECT_EQ(2u, receiver1.run_count, "run count 1");
756    EXPECT_EQ(ZX_OK, receiver1.last_status, "status 1");
757    EXPECT_NONNULL(receiver1.last_data);
758    EXPECT_EQ(0, memcmp(&data3, receiver1.last_data, sizeof(zx_packet_user_t)), "data 1");
759    EXPECT_EQ(1u, receiver2.run_count, "run count 2");
760    EXPECT_EQ(ZX_OK, receiver2.last_status, "status 2");
761    EXPECT_NONNULL(receiver2.last_data);
762    EXPECT_EQ(0, memcmp(&data2, receiver2.last_data, sizeof(zx_packet_user_t)), "data 2");
763    EXPECT_EQ(1u, receiver3.run_count, "run count 3");
764    EXPECT_EQ(ZX_OK, receiver3.last_status, "status 3");
765    EXPECT_NONNULL(receiver3.last_data);
766    EXPECT_EQ(0, memcmp(&data_default, receiver3.last_data, sizeof(zx_packet_user_t)), "data 3");
767
768    END_TEST;
769}
770
771bool receiver_shutdown_test() {
772    BEGIN_TEST;
773
774    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
775    loop.Shutdown();
776
777    // Try to add work after shutdown.
778    TestReceiver receiver;
779    EXPECT_EQ(ZX_ERR_BAD_STATE, receiver.QueuePacket(loop.dispatcher(), nullptr), "queue after shutdown");
780    EXPECT_EQ(0u, receiver.run_count, "run count 1");
781
782    END_TEST;
783}
784
785uint32_t get_thread_state(zx_handle_t thread) {
786    zx_info_thread_t info;
787    __UNUSED zx_status_t status =
788        zx_object_get_info(thread, ZX_INFO_THREAD,
789                           &info, sizeof(info), NULL, NULL);
790    ZX_DEBUG_ASSERT(status == ZX_OK);
791    return info.state;
792}
793
794zx_koid_t get_koid(zx_handle_t handle) {
795    zx_info_handle_basic_t info;
796    __UNUSED zx_status_t status =
797        zx_object_get_info(handle, ZX_INFO_HANDLE_BASIC,
798                           &info, sizeof(info), NULL, NULL);
799    ZX_DEBUG_ASSERT(status == ZX_OK);
800    return info.koid;
801}
802
803zx_status_t create_crashing_thread(zx_handle_t* out_thread) {
804    static const char kThreadName[] = "crasher";
805    // Use zx_thread_create() so that the only cleanup we need to do is
806    // zx_task_kill/zx_handle_close.
807    auto status = zx_thread_create(zx_process_self(), kThreadName,
808                                   strlen(kThreadName), 0, out_thread);
809    if (status != ZX_OK)
810        return status;
811    // We want the thread to crash so we'll get an exception report.
812    // Easiest to just pass crashing values for pc,sp.
813    return zx_thread_start(*out_thread, 0u, 0u, 0u, 0u);
814}
815
816bool exception_test() {
817    BEGIN_TEST;
818
819    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
820
821    zx_handle_t self = zx_process_self();
822    TestException exception(self, 0);
823
824    EXPECT_EQ(ZX_OK, exception.Bind(loop.dispatcher()));
825
826    // Initially nothing is signaled.
827    EXPECT_EQ(ZX_OK, loop.RunUntilIdle());
828    EXPECT_EQ(0u, exception.run_count);
829
830    zx_handle_t crashing_thread;
831    EXPECT_EQ(ZX_OK, create_crashing_thread(&crashing_thread));
832
833    // Wait until thread has crashed.
834    uint32_t state;
835    do {
836        zx_nanosleep(zx_deadline_after(ZX_MSEC(1)));
837        state = get_thread_state(crashing_thread);
838    } while (state != ZX_THREAD_STATE_BLOCKED_EXCEPTION);
839
840    // There should now be an exception to read.
841    EXPECT_EQ(ZX_OK, loop.RunUntilIdle());
842    EXPECT_EQ(1u, exception.run_count);
843    EXPECT_EQ(ZX_OK, exception.last_status);
844    EXPECT_NONNULL(exception.last_report);
845    EXPECT_EQ(ZX_EXCP_FATAL_PAGE_FAULT, exception.last_report->type);
846    EXPECT_EQ(get_koid(self), exception.last_report->exception.pid);
847    EXPECT_EQ(get_koid(crashing_thread), exception.last_report->exception.tid);
848    zx_task_kill(crashing_thread);
849    zx_handle_close(crashing_thread);
850
851    loop.Shutdown();
852    EXPECT_EQ(ZX_ERR_CANCELED, exception.last_status);
853
854    END_TEST;
855}
856
857bool exception_shutdown_test() {
858    BEGIN_TEST;
859
860    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
861    loop.Shutdown();
862
863    // Try to bind a port after shutdown.
864    TestException exception(zx_process_self(), 0);
865    EXPECT_EQ(ZX_ERR_BAD_STATE, exception.Bind(loop.dispatcher()));
866
867    END_TEST;
868}
869
870class GetDefaultDispatcherTask : public QuitTask {
871public:
872    async_dispatcher_t* last_default_dispatcher;
873
874protected:
875    void Handle(async_dispatcher_t* dispatcher, zx_status_t status) override {
876        QuitTask::Handle(dispatcher, status);
877        last_default_dispatcher = async_get_default_dispatcher();
878    }
879};
880
881class ConcurrencyMeasure {
882public:
883    ConcurrencyMeasure(uint32_t end)
884        : end_(end) {}
885
886    uint32_t max_threads() const { return fbl::atomic_load(&max_threads_, fbl::memory_order_acquire); }
887    uint32_t count() const { return fbl::atomic_load(&count_, fbl::memory_order_acquire); }
888
889    void Tally(async_dispatcher_t* dispatcher) {
890        // Increment count of concurrently active threads.  Update maximum if needed.
891        uint32_t active = 1u + fbl::atomic_fetch_add(&active_threads_, 1u,
892                                                     fbl::memory_order_acq_rel);
893        uint32_t old_max;
894        do {
895            old_max = fbl::atomic_load(&max_threads_, fbl::memory_order_acquire);
896        } while (active > old_max &&
897                 !fbl::atomic_compare_exchange_weak(&max_threads_, &old_max, active,
898                                                    fbl::memory_order_acq_rel,
899                                                    fbl::memory_order_acquire));
900
901        // Pretend to do work.
902        zx::nanosleep(zx::deadline_after(zx::msec(1)));
903
904        // Decrement count of active threads.
905        fbl::atomic_fetch_sub(&active_threads_, 1u, fbl::memory_order_acq_rel);
906
907        // Quit when last item processed.
908        if (1u + fbl::atomic_fetch_add(&count_, 1u, fbl::memory_order_acq_rel) == end_)
909            async_loop_quit(async_loop_from_dispatcher(dispatcher));
910    }
911
912private:
913    const uint32_t end_;
914    fbl::atomic_uint32_t count_{};
915    fbl::atomic_uint32_t active_threads_{};
916    fbl::atomic_uint32_t max_threads_{};
917};
918
919class ThreadAssertWait : public TestWait {
920public:
921    ThreadAssertWait(zx_handle_t object, zx_signals_t trigger, ConcurrencyMeasure* measure)
922        : TestWait(object, trigger), measure_(measure) {}
923
924protected:
925    ConcurrencyMeasure* measure_;
926
927    void Handle(async_dispatcher_t* dispatcher, zx_status_t status,
928                const zx_packet_signal_t* signal) override {
929        TestWait::Handle(dispatcher, status, signal);
930        measure_->Tally(dispatcher);
931    }
932};
933
934class ThreadAssertTask : public TestTask {
935public:
936    ThreadAssertTask(ConcurrencyMeasure* measure)
937        : measure_(measure) {}
938
939protected:
940    ConcurrencyMeasure* measure_;
941
942    void Handle(async_dispatcher_t* dispatcher, zx_status_t status) override {
943        TestTask::Handle(dispatcher, status);
944        measure_->Tally(dispatcher);
945    }
946};
947
948class ThreadAssertReceiver : public TestReceiver {
949public:
950    ThreadAssertReceiver(ConcurrencyMeasure* measure)
951        : measure_(measure) {}
952
953protected:
954    ConcurrencyMeasure* measure_;
955
956    // This receiver's handler will run concurrently on multiple threads
957    // (unlike the Waits and Tasks) so we must guard its state.
958    fbl::Mutex mutex_;
959
960    void Handle(async_dispatcher_t* dispatcher, zx_status_t status,
961                const zx_packet_user_t* data) override {
962        {
963            fbl::AutoLock lock(&mutex_);
964            TestReceiver::Handle(dispatcher, status, data);
965        }
966        measure_->Tally(dispatcher);
967    }
968};
969
970class ThreadAssertException : public TestException {
971public:
972    ThreadAssertException(zx_handle_t task, uint32_t options,
973                          ConcurrencyMeasure* measure)
974        : TestException(task, options), measure_(measure) {}
975
976protected:
977    ConcurrencyMeasure* measure_;
978
979    // This receiver's handler will run concurrently on multiple threads
980    // (unlike the Waits and Tasks) so we must guard its state.
981    fbl::Mutex mutex_;
982
983    void Handle(async_dispatcher_t* dispatcher, zx_status_t status,
984                const zx_port_packet_t* report) override {
985        {
986            fbl::AutoLock lock(&mutex_);
987            TestException::Handle(dispatcher, status, report);
988        }
989        measure_->Tally(dispatcher);
990    }
991};
992
993bool threads_have_default_dispatcher() {
994    BEGIN_TEST;
995
996    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
997    EXPECT_EQ(ZX_OK, loop.StartThread(), "start thread");
998
999    GetDefaultDispatcherTask task;
1000    EXPECT_EQ(ZX_OK, task.Post(loop.dispatcher()), "post task");
1001    loop.JoinThreads();
1002
1003    EXPECT_EQ(1u, task.run_count, "run count");
1004    EXPECT_EQ(ZX_OK, task.last_status, "status");
1005    EXPECT_EQ(loop.dispatcher(), task.last_default_dispatcher, "default dispatcher");
1006
1007    END_TEST;
1008}
1009
1010// The goal here is to ensure that threads stop when Quit() is called.
1011bool threads_quit() {
1012    const size_t num_threads = 4;
1013
1014    BEGIN_TEST;
1015
1016    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
1017    for (size_t i = 0; i < num_threads; i++) {
1018        EXPECT_EQ(ZX_OK, loop.StartThread());
1019    }
1020    loop.Quit();
1021    loop.JoinThreads();
1022    EXPECT_EQ(ASYNC_LOOP_QUIT, loop.GetState());
1023
1024    END_TEST;
1025}
1026
1027// The goal here is to ensure that threads stop when Shutdown() is called.
1028bool threads_shutdown() {
1029    const size_t num_threads = 4;
1030
1031    BEGIN_TEST;
1032
1033    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
1034    for (size_t i = 0; i < num_threads; i++) {
1035        EXPECT_EQ(ZX_OK, loop.StartThread());
1036    }
1037    loop.Shutdown();
1038    EXPECT_EQ(ASYNC_LOOP_SHUTDOWN, loop.GetState());
1039
1040    loop.JoinThreads(); // should be a no-op
1041
1042    EXPECT_EQ(ZX_ERR_BAD_STATE, loop.StartThread(), "can't start threads after shutdown");
1043
1044    END_TEST;
1045}
1046
1047// The goal here is to schedule a lot of work and see whether it runs
1048// on as many threads as we expected it to.
1049bool threads_waits_run_concurrently_test() {
1050    const size_t num_threads = 4;
1051    const size_t num_items = 100;
1052
1053    BEGIN_TEST;
1054
1055    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
1056    for (size_t i = 0; i < num_threads; i++) {
1057        EXPECT_EQ(ZX_OK, loop.StartThread(), "start thread");
1058    }
1059
1060    ConcurrencyMeasure measure(num_items);
1061    zx::event event;
1062    EXPECT_EQ(ZX_OK, zx::event::create(0u, &event), "create event");
1063    EXPECT_EQ(ZX_OK, event.signal(0u, ZX_USER_SIGNAL_0), "signal");
1064
1065    // Post a number of work items to run all at once.
1066    ThreadAssertWait* items[num_items];
1067    for (size_t i = 0; i < num_items; i++) {
1068        items[i] = new ThreadAssertWait(event.get(), ZX_USER_SIGNAL_0, &measure);
1069        EXPECT_EQ(ZX_OK, items[i]->Begin(loop.dispatcher()), "begin wait");
1070    }
1071
1072    // Wait until quitted.
1073    loop.JoinThreads();
1074
1075    // Ensure all work items completed.
1076    EXPECT_EQ(num_items, measure.count(), "item count");
1077    for (size_t i = 0; i < num_items; i++) {
1078        EXPECT_EQ(1u, items[i]->run_count, "run count");
1079        EXPECT_EQ(ZX_OK, items[i]->last_status, "status");
1080        EXPECT_NONNULL(items[i]->last_signal, "signal");
1081        EXPECT_EQ(ZX_USER_SIGNAL_0, items[i]->last_signal->observed & ZX_USER_SIGNAL_ALL, "observed");
1082        delete items[i];
1083    }
1084
1085    // Ensure that we actually ran many waits concurrently on different threads.
1086    EXPECT_NE(1u, measure.max_threads(), "waits handled concurrently");
1087
1088    END_TEST;
1089}
1090
1091// The goal here is to schedule a lot of work and see whether it runs
1092// on as many threads as we expected it to.
1093bool threads_tasks_run_sequentially_test() {
1094    const size_t num_threads = 4;
1095    const size_t num_items = 100;
1096
1097    BEGIN_TEST;
1098
1099    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
1100    for (size_t i = 0; i < num_threads; i++) {
1101        EXPECT_EQ(ZX_OK, loop.StartThread(), "start thread");
1102    }
1103
1104    ConcurrencyMeasure measure(num_items);
1105
1106    // Post a number of work items to run all at once.
1107    ThreadAssertTask* items[num_items];
1108    zx::time start_time = async::Now(loop.dispatcher());
1109    for (size_t i = 0; i < num_items; i++) {
1110        items[i] = new ThreadAssertTask(&measure);
1111        EXPECT_EQ(ZX_OK, items[i]->PostForTime(loop.dispatcher(), start_time + zx::msec(i)), "post task");
1112    }
1113
1114    // Wait until quitted.
1115    loop.JoinThreads();
1116
1117    // Ensure all work items completed.
1118    EXPECT_EQ(num_items, measure.count(), "item count");
1119    for (size_t i = 0; i < num_items; i++) {
1120        EXPECT_EQ(1u, items[i]->run_count, "run count");
1121        EXPECT_EQ(ZX_OK, items[i]->last_status, "status");
1122        delete items[i];
1123    }
1124
1125    // Ensure that we actually ran tasks sequentially despite having many
1126    // threads available.
1127    EXPECT_EQ(1u, measure.max_threads(), "tasks handled sequentially");
1128
1129    END_TEST;
1130}
1131
1132// The goal here is to schedule a lot of work and see whether it runs
1133// on as many threads as we expected it to.
1134bool threads_receivers_run_concurrently_test() {
1135    const size_t num_threads = 4;
1136    const size_t num_items = 100;
1137
1138    BEGIN_TEST;
1139
1140    async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
1141    for (size_t i = 0; i < num_threads; i++) {
1142        EXPECT_EQ(ZX_OK, loop.StartThread(), "start thread");
1143    }
1144
1145    ConcurrencyMeasure measure(num_items);
1146
1147    // Post a number of packets all at once.
1148    ThreadAssertReceiver receiver(&measure);
1149    for (size_t i = 0; i < num_items; i++) {
1150        EXPECT_EQ(ZX_OK, receiver.QueuePacket(loop.dispatcher(), nullptr), "queue packet");
1151    }
1152
1153    // Wait until quitted.
1154    loop.JoinThreads();
1155
1156    // Ensure all work items completed.
1157    EXPECT_EQ(num_items, measure.count(), "item count");
1158    EXPECT_EQ(num_items, receiver.run_count, "run count");
1159    EXPECT_EQ(ZX_OK, receiver.last_status, "status");
1160
1161    // Ensure that we actually processed many packets concurrently on different threads.
1162    EXPECT_NE(1u, measure.max_threads(), "packets handled concurrently");
1163
1164    END_TEST;
1165}
1166
1167// The goal here is to schedule a lot of work and see whether it runs
1168// on as many threads as we expected it to.
1169bool threads_exceptions_run_concurrently_test() {
1170    const size_t num_threads = 4;
1171    // We generate this number of exceptions, and therefore this number of
1172    // crashing threads, so this number isn't that large (e.g., not 100).
1173    const size_t num_items = 10;
1174
1175    BEGIN_TEST;
1176
1177    ConcurrencyMeasure measure(num_items);
1178
1179    // The exception receiver object must survive the lifetime of the async
1180    // loop.
1181    ThreadAssertException receiver(zx_process_self(), 0, &measure);
1182
1183    {
1184        zx_handle_t crashing_threads[num_items];
1185        async::Loop loop(&kAsyncLoopConfigNoAttachToThread);
1186        EXPECT_EQ(ZX_OK, receiver.Bind(loop.dispatcher()));
1187
1188        for (size_t i = 0; i < num_threads; i++) {
1189            EXPECT_EQ(ZX_OK, loop.StartThread());
1190        }
1191
1192        // Post a number of packets all at once.
1193        for (size_t i = 0; i < num_items; i++) {
1194            EXPECT_EQ(ZX_OK, create_crashing_thread(&crashing_threads[i]));
1195        }
1196        // We don't need to wait for the threads to crash here as the loop
1197        // will continue until |measure| receives |num_items|.
1198
1199        // Wait until quitted.
1200        loop.JoinThreads();
1201
1202        // Make sure the threads are gone before we unbind the exception port,
1203        // otherwise the global crash-handler will see the exceptions.
1204        for (size_t i = 0; i < num_items; i++) {
1205            zx_task_kill(crashing_threads[i]);
1206            zx_handle_close(crashing_threads[i]);
1207        }
1208
1209        // Ensure all work items completed.
1210        // When |loop| goes out of scope |receiver| will get ZX_ERR_CANCELED,
1211        // which will add one to the packet received count. Do these tests
1212        // here before |loop| goes out of scope.
1213        EXPECT_EQ(num_items, measure.count());
1214        EXPECT_EQ(num_items, receiver.run_count);
1215        EXPECT_EQ(ZX_OK, receiver.last_status);
1216    }
1217
1218    // Loop shutdown -> ZX_ERR_CANCELED.
1219    EXPECT_EQ(ZX_ERR_CANCELED, receiver.last_status);
1220
1221    // Ensure that we actually processed many packets concurrently on different threads.
1222    EXPECT_NE(1u, measure.max_threads());
1223
1224    END_TEST;
1225}
1226
1227} // namespace
1228
1229BEGIN_TEST_CASE(loop_tests)
1230RUN_TEST(c_api_basic_test)
1231RUN_TEST(make_default_false_test)
1232RUN_TEST(make_default_true_test)
1233RUN_TEST(quit_test)
1234RUN_TEST(time_test)
1235RUN_TEST(wait_test)
1236RUN_TEST(wait_unwaitable_handle_test)
1237RUN_TEST(wait_shutdown_test)
1238RUN_TEST(task_test)
1239RUN_TEST(task_shutdown_test)
1240RUN_TEST(receiver_test)
1241RUN_TEST(receiver_shutdown_test)
1242RUN_TEST(exception_test)
1243RUN_TEST(exception_shutdown_test)
1244RUN_TEST(threads_have_default_dispatcher)
1245for (int i = 0; i < 3; i++) {
1246    RUN_TEST(threads_quit)
1247    RUN_TEST(threads_shutdown)
1248    RUN_TEST(threads_waits_run_concurrently_test)
1249    RUN_TEST(threads_tasks_run_sequentially_test)
1250    RUN_TEST(threads_receivers_run_concurrently_test)
1251    RUN_TEST(threads_exceptions_run_concurrently_test)
1252}
1253END_TEST_CASE(loop_tests)
1254