1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include <lib/async-testutils/test_loop_dispatcher.h>
6
7#include <zircon/assert.h>
8#include <zircon/status.h>
9#include <zircon/syscalls.h>
10
// Views the |state| member embedded in the object pointed to by |ptr| as a
// list node. |ptr| is parenthesized so that arbitrary pointer expressions
// (e.g. |p + 1|) expand correctly.
#define TO_NODE(type, ptr) ((list_node_t*)&(ptr)->state)
// Recovers the enclosing |type| object from a pointer to its embedded |state|
// member by subtracting the member's offset within |type|.
#define FROM_NODE(type, ptr) ((type*)((char*)(ptr)-offsetof(type, state)))
13
14namespace async {
15
16namespace {
17
// Key carried by the port packets that announce a timer expiration. Packets
// for completed waits use the wait's address as their key instead.
constexpr uint64_t kTimerExpirationKey{0};
20
21// Convenience functions for task, wait, and list node management.
22inline list_node_t* WaitToNode(async_wait_t* wait) {
23    return TO_NODE(async_wait_t, wait);
24}
25
26inline async_wait_t* NodeToWait(list_node_t* node) {
27    return FROM_NODE(async_wait_t, node);
28}
29
30inline list_node_t* TaskToNode(async_task_t* task) {
31    return TO_NODE(async_task_t, task);
32}
33
34inline async_task_t* NodeToTask(list_node_t* node) {
35    return FROM_NODE(async_task_t, node);
36}
37
38inline void InsertTask(list_node_t* task_list, async_task_t* task) {
39    list_node_t* node;
40    for (node = task_list->prev; node != task_list; node = node->prev) {
41        if (task->deadline >= NodeToTask(node)->deadline) {
42            break;
43        }
44    }
45    list_add_after(node, TaskToNode(task));
46}
47} // namespace
48
49TestLoopDispatcher::TestLoopDispatcher(TimeKeeper* time_keeper)
50    : time_keeper_(time_keeper) {
51    ZX_DEBUG_ASSERT(time_keeper_);
52    list_initialize(&wait_list_);
53    list_initialize(&task_list_);
54    list_initialize(&due_list_);
55    zx_status_t status = zx::port::create(0u, &port_);
56    ZX_ASSERT_MSG(status == ZX_OK,
57                  "zx_port_create: %s",
58                  zx_status_get_string(status));
59}
60
61TestLoopDispatcher::~TestLoopDispatcher() {
62    Shutdown();
63    time_keeper_->CancelTimers(this);
64};
65
66zx::time TestLoopDispatcher::Now() { return time_keeper_->Now(); }
67
68// TODO(ZX-2390): Return ZX_ERR_CANCELED if dispatcher is shutting down.
69zx_status_t TestLoopDispatcher::BeginWait(async_wait_t* wait) {
70    ZX_DEBUG_ASSERT(wait);
71
72    // Along with the above assertion, the following check guarantees that the
73    // packet to be sent to |port_| on completion of this wait will not be
74    // mistaken for a timer expiration.
75    static_assert(0u == kTimerExpirationKey,
76                  "Timer expirations must be signaled with a packet key of 0");
77
78    list_add_head(&wait_list_, WaitToNode(wait));
79    zx_status_t status = zx_object_wait_async(wait->object, port_.get(),
80                                              reinterpret_cast<uintptr_t>(wait),
81                                              wait->trigger,
82                                              ZX_WAIT_ASYNC_ONCE);
83
84    if (status != ZX_OK) {
85        // In this rare condition, the wait failed. Since a dispatched handler will
86        // never be invoked on the wait object, we remove it ourselves.
87        list_delete(WaitToNode(wait));
88    }
89    return status;
90}
91
92zx_status_t TestLoopDispatcher::CancelWait(async_wait_t* wait) {
93    ZX_DEBUG_ASSERT(wait);
94
95    list_node_t* node = WaitToNode(wait);
96    if (!list_in_list(node)) {
97        return ZX_ERR_NOT_FOUND;
98    }
99
100    // |wait| already might be encoded in |due_packet_|.
101    if (due_packet_ && due_packet_->key != kTimerExpirationKey) {
102        if (wait == reinterpret_cast<async_wait_t*>(due_packet_->key)) {
103            due_packet_.reset();
104            list_delete(node);
105            return ZX_OK;
106        }
107    }
108
109    zx_status_t status = port_.cancel(*zx::unowned_handle(wait->object),
110                                      reinterpret_cast<uintptr_t>(wait));
111    if (status == ZX_OK) {
112        list_delete(node);
113    }
114    return status;
115}
116
117// TODO(ZX-2390): Return ZX_ERR_CANCELED if dispatcher is shutting down.
118zx_status_t TestLoopDispatcher::PostTask(async_task_t* task) {
119    ZX_DEBUG_ASSERT(task);
120
121    InsertTask(&task_list_, task);
122    if (NodeToTask(list_peek_head(&task_list_)) == task) {
123        time_keeper_->RegisterTimer(GetNextTaskDueTime(), this);
124    }
125    return ZX_OK;
126}
127
128zx_status_t TestLoopDispatcher::CancelTask(async_task_t* task) {
129    ZX_DEBUG_ASSERT(task);
130    list_node_t* node = TaskToNode(task);
131    if (!list_in_list(node)) {
132        return ZX_ERR_NOT_FOUND;
133    }
134    list_delete(node);
135    return ZX_OK;
136}
137
138void TestLoopDispatcher::FireTimer() {
139    zx_port_packet_t timer_packet{};
140    timer_packet.key = kTimerExpirationKey;
141    timer_packet.type = ZX_PKT_TYPE_USER;
142    zx_status_t status = port_.queue(&timer_packet);
143    ZX_ASSERT_MSG(status == ZX_OK,
144                  "zx_port_queue: %s",
145                  zx_status_get_string(status));
146}
147
148zx::time TestLoopDispatcher::GetNextTaskDueTime() {
149    list_node_t* node = list_is_empty(&due_list_) ?
150                        list_peek_head(&task_list_) :
151                        list_peek_head(&due_list_);
152    if (!node) {
153        return zx::time::infinite();
154    }
155    return zx::time(NodeToTask(node)->deadline);
156}
157
158
159void TestLoopDispatcher::ExtractNextDuePacket() {
160    ZX_DEBUG_ASSERT(!due_packet_);
161    bool tasks_are_due = GetNextTaskDueTime() <= Now();
162
163    // If no tasks are due, flush all timer expiration packets until either
164    // there are no more packets to dequeue or a wait packet is reached.
165    do {
166        auto packet = fbl::make_unique<zx_port_packet_t>();
167        if (ZX_OK != port_.wait(zx::time(0), packet.get())) { return; }
168        due_packet_.swap(packet);
169    } while (!tasks_are_due && due_packet_->key == kTimerExpirationKey);
170}
171
172bool TestLoopDispatcher::HasPendingWork() {
173    if (GetNextTaskDueTime() <= Now()) { return true; }
174    if (!due_packet_) { ExtractNextDuePacket(); }
175    return !!due_packet_;
176}
177
178void TestLoopDispatcher::DispatchNextDueTask() {
179    // if something is already in the due list, dispatch that.
180    list_node_t* node = list_peek_head(&due_list_);
181    if (node) {
182        list_delete(node);
183        async_task_t* task = NodeToTask(node);
184        task->handler(this, task, ZX_OK);
185
186        // If the due list is now empty and there are still pending tasks,
187        // register a timer for the next due time.
188        if (list_is_empty(&due_list_) && !list_is_empty(&task_list_)) {
189            time_keeper_->RegisterTimer(GetNextTaskDueTime(), this);
190        }
191    }
192}
193
194bool TestLoopDispatcher::DispatchNextDueMessage() {
195    if (!list_is_empty(&due_list_)) {
196        DispatchNextDueTask();
197        return true;
198    }
199
200    if (!due_packet_) { ExtractNextDuePacket(); }
201
202    if (!due_packet_) {
203        return false;
204    } else if (due_packet_->key == kTimerExpirationKey) {
205        ExtractDueTasks();
206        DispatchNextDueTask();
207        due_packet_.reset();
208    } else {  // |due_packet_| encodes a finished wait.
209        // Move the next due packet to the stack, as invoking the associated
210        // wait's handler might try to extract another.
211        zx_port_packet_t packet = *due_packet_;
212        due_packet_.reset();
213        async_wait_t* wait = reinterpret_cast<async_wait_t*>(packet.key);
214        list_delete(WaitToNode(wait));
215        wait->handler(this, wait, ZX_OK, &packet.signal);
216    }
217    return true;
218}
219
220void TestLoopDispatcher::ExtractDueTasks() {
221    list_node_t* node;
222    list_node_t* tail = nullptr;
223    zx::time current_time = time_keeper_->Now();
224    list_for_every(&task_list_, node) {
225        if (NodeToTask(node)->deadline > current_time.get()) { break; }
226        tail = node;
227    }
228    if (tail) {
229        list_node_t* head = task_list_.next;
230        task_list_.next = tail->next;
231        tail->next->prev = &task_list_;
232        due_list_.next = head;
233        head->prev = &due_list_;
234        due_list_.prev = tail;
235        tail->next = &due_list_;
236    }
237}
238
239void TestLoopDispatcher::Shutdown() {
240    list_node_t* node;
241    while ((node = list_remove_head(&wait_list_))) {
242        async_wait_t* wait = NodeToWait(node);
243        wait->handler(this, wait, ZX_ERR_CANCELED, nullptr);
244    }
245    while ((node = list_remove_head(&due_list_))) {
246        async_task_t* task = NodeToTask(node);
247        task->handler(this, task, ZX_ERR_CANCELED);
248    }
249    while ((node = list_remove_head(&task_list_))) {
250        async_task_t* task = NodeToTask(node);
251        task->handler(this, task, ZX_ERR_CANCELED);
252    }
253}
254
255} // namespace async
256