1// Copyright 2018 The Fuchsia Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include <lib/async-testutils/test_loop_dispatcher.h> 6 7#include <zircon/assert.h> 8#include <zircon/status.h> 9#include <zircon/syscalls.h> 10 11#define TO_NODE(type, ptr) ((list_node_t*)&ptr->state) 12#define FROM_NODE(type, ptr) ((type*)((char*)(ptr)-offsetof(type, state))) 13 14namespace async { 15 16namespace { 17 18// The packet key used to signal timer expirations. 19constexpr uint64_t kTimerExpirationKey = 0u; 20 21// Convenience functions for task, wait, and list node management. 22inline list_node_t* WaitToNode(async_wait_t* wait) { 23 return TO_NODE(async_wait_t, wait); 24} 25 26inline async_wait_t* NodeToWait(list_node_t* node) { 27 return FROM_NODE(async_wait_t, node); 28} 29 30inline list_node_t* TaskToNode(async_task_t* task) { 31 return TO_NODE(async_task_t, task); 32} 33 34inline async_task_t* NodeToTask(list_node_t* node) { 35 return FROM_NODE(async_task_t, node); 36} 37 38inline void InsertTask(list_node_t* task_list, async_task_t* task) { 39 list_node_t* node; 40 for (node = task_list->prev; node != task_list; node = node->prev) { 41 if (task->deadline >= NodeToTask(node)->deadline) { 42 break; 43 } 44 } 45 list_add_after(node, TaskToNode(task)); 46} 47} // namespace 48 49TestLoopDispatcher::TestLoopDispatcher(TimeKeeper* time_keeper) 50 : time_keeper_(time_keeper) { 51 ZX_DEBUG_ASSERT(time_keeper_); 52 list_initialize(&wait_list_); 53 list_initialize(&task_list_); 54 list_initialize(&due_list_); 55 zx_status_t status = zx::port::create(0u, &port_); 56 ZX_ASSERT_MSG(status == ZX_OK, 57 "zx_port_create: %s", 58 zx_status_get_string(status)); 59} 60 61TestLoopDispatcher::~TestLoopDispatcher() { 62 Shutdown(); 63 time_keeper_->CancelTimers(this); 64}; 65 66zx::time TestLoopDispatcher::Now() { return time_keeper_->Now(); } 67 68// TODO(ZX-2390): Return ZX_ERR_CANCELED if dispatcher is shutting down. 69zx_status_t TestLoopDispatcher::BeginWait(async_wait_t* wait) { 70 ZX_DEBUG_ASSERT(wait); 71 72 // Along with the above assertion, the following check guarantees that the 73 // packet to be sent to |port_| on completion of this wait will not be 74 // mistaken for a timer expiration. 75 static_assert(0u == kTimerExpirationKey, 76 "Timer expirations must be signaled with a packet key of 0"); 77 78 list_add_head(&wait_list_, WaitToNode(wait)); 79 zx_status_t status = zx_object_wait_async(wait->object, port_.get(), 80 reinterpret_cast<uintptr_t>(wait), 81 wait->trigger, 82 ZX_WAIT_ASYNC_ONCE); 83 84 if (status != ZX_OK) { 85 // In this rare condition, the wait failed. Since a dispatched handler will 86 // never be invoked on the wait object, we remove it ourselves. 87 list_delete(WaitToNode(wait)); 88 } 89 return status; 90} 91 92zx_status_t TestLoopDispatcher::CancelWait(async_wait_t* wait) { 93 ZX_DEBUG_ASSERT(wait); 94 95 list_node_t* node = WaitToNode(wait); 96 if (!list_in_list(node)) { 97 return ZX_ERR_NOT_FOUND; 98 } 99 100 // |wait| already might be encoded in |due_packet_|. 101 if (due_packet_ && due_packet_->key != kTimerExpirationKey) { 102 if (wait == reinterpret_cast<async_wait_t*>(due_packet_->key)) { 103 due_packet_.reset(); 104 list_delete(node); 105 return ZX_OK; 106 } 107 } 108 109 zx_status_t status = port_.cancel(*zx::unowned_handle(wait->object), 110 reinterpret_cast<uintptr_t>(wait)); 111 if (status == ZX_OK) { 112 list_delete(node); 113 } 114 return status; 115} 116 117// TODO(ZX-2390): Return ZX_ERR_CANCELED if dispatcher is shutting down. 118zx_status_t TestLoopDispatcher::PostTask(async_task_t* task) { 119 ZX_DEBUG_ASSERT(task); 120 121 InsertTask(&task_list_, task); 122 if (NodeToTask(list_peek_head(&task_list_)) == task) { 123 time_keeper_->RegisterTimer(GetNextTaskDueTime(), this); 124 } 125 return ZX_OK; 126} 127 128zx_status_t TestLoopDispatcher::CancelTask(async_task_t* task) { 129 ZX_DEBUG_ASSERT(task); 130 list_node_t* node = TaskToNode(task); 131 if (!list_in_list(node)) { 132 return ZX_ERR_NOT_FOUND; 133 } 134 list_delete(node); 135 return ZX_OK; 136} 137 138void TestLoopDispatcher::FireTimer() { 139 zx_port_packet_t timer_packet{}; 140 timer_packet.key = kTimerExpirationKey; 141 timer_packet.type = ZX_PKT_TYPE_USER; 142 zx_status_t status = port_.queue(&timer_packet); 143 ZX_ASSERT_MSG(status == ZX_OK, 144 "zx_port_queue: %s", 145 zx_status_get_string(status)); 146} 147 148zx::time TestLoopDispatcher::GetNextTaskDueTime() { 149 list_node_t* node = list_is_empty(&due_list_) ? 150 list_peek_head(&task_list_) : 151 list_peek_head(&due_list_); 152 if (!node) { 153 return zx::time::infinite(); 154 } 155 return zx::time(NodeToTask(node)->deadline); 156} 157 158 159void TestLoopDispatcher::ExtractNextDuePacket() { 160 ZX_DEBUG_ASSERT(!due_packet_); 161 bool tasks_are_due = GetNextTaskDueTime() <= Now(); 162 163 // If no tasks are due, flush all timer expiration packets until either 164 // there are no more packets to dequeue or a wait packet is reached. 165 do { 166 auto packet = fbl::make_unique<zx_port_packet_t>(); 167 if (ZX_OK != port_.wait(zx::time(0), packet.get())) { return; } 168 due_packet_.swap(packet); 169 } while (!tasks_are_due && due_packet_->key == kTimerExpirationKey); 170} 171 172bool TestLoopDispatcher::HasPendingWork() { 173 if (GetNextTaskDueTime() <= Now()) { return true; } 174 if (!due_packet_) { ExtractNextDuePacket(); } 175 return !!due_packet_; 176} 177 178void TestLoopDispatcher::DispatchNextDueTask() { 179 // if something is already in the due list, dispatch that. 180 list_node_t* node = list_peek_head(&due_list_); 181 if (node) { 182 list_delete(node); 183 async_task_t* task = NodeToTask(node); 184 task->handler(this, task, ZX_OK); 185 186 // If the due list is now empty and there are still pending tasks, 187 // register a timer for the next due time. 188 if (list_is_empty(&due_list_) && !list_is_empty(&task_list_)) { 189 time_keeper_->RegisterTimer(GetNextTaskDueTime(), this); 190 } 191 } 192} 193 194bool TestLoopDispatcher::DispatchNextDueMessage() { 195 if (!list_is_empty(&due_list_)) { 196 DispatchNextDueTask(); 197 return true; 198 } 199 200 if (!due_packet_) { ExtractNextDuePacket(); } 201 202 if (!due_packet_) { 203 return false; 204 } else if (due_packet_->key == kTimerExpirationKey) { 205 ExtractDueTasks(); 206 DispatchNextDueTask(); 207 due_packet_.reset(); 208 } else { // |due_packet_| encodes a finished wait. 209 // Move the next due packet to the stack, as invoking the associated 210 // wait's handler might try to extract another. 211 zx_port_packet_t packet = *due_packet_; 212 due_packet_.reset(); 213 async_wait_t* wait = reinterpret_cast<async_wait_t*>(packet.key); 214 list_delete(WaitToNode(wait)); 215 wait->handler(this, wait, ZX_OK, &packet.signal); 216 } 217 return true; 218} 219 220void TestLoopDispatcher::ExtractDueTasks() { 221 list_node_t* node; 222 list_node_t* tail = nullptr; 223 zx::time current_time = time_keeper_->Now(); 224 list_for_every(&task_list_, node) { 225 if (NodeToTask(node)->deadline > current_time.get()) { break; } 226 tail = node; 227 } 228 if (tail) { 229 list_node_t* head = task_list_.next; 230 task_list_.next = tail->next; 231 tail->next->prev = &task_list_; 232 due_list_.next = head; 233 head->prev = &due_list_; 234 due_list_.prev = tail; 235 tail->next = &due_list_; 236 } 237} 238 239void TestLoopDispatcher::Shutdown() { 240 list_node_t* node; 241 while ((node = list_remove_head(&wait_list_))) { 242 async_wait_t* wait = NodeToWait(node); 243 wait->handler(this, wait, ZX_ERR_CANCELED, nullptr); 244 } 245 while ((node = list_remove_head(&due_list_))) { 246 async_task_t* task = NodeToTask(node); 247 task->handler(this, task, ZX_ERR_CANCELED); 248 } 249 while ((node = list_remove_head(&task_list_))) { 250 async_task_t* task = NodeToTask(node); 251 task->handler(this, task, ZX_ERR_CANCELED); 252 } 253} 254 255} // namespace async 256