1// Copyright 2017 The Fuchsia Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include <zircon/syscalls.h> 6#include <zircon/types.h> 7#include <fbl/auto_call.h> 8 9#include <dispatcher-pool/dispatcher-channel.h> 10#include <dispatcher-pool/dispatcher-event-source.h> 11#include <dispatcher-pool/dispatcher-execution-domain.h> 12#include <dispatcher-pool/dispatcher-thread-pool.h> 13 14namespace dispatcher { 15 16// static 17fbl::RefPtr<dispatcher::Channel> dispatcher::Channel::Create() { 18 fbl::AllocChecker ac; 19 20 auto ptr = new (&ac) dispatcher::Channel(); 21 if (!ac.check()) 22 return nullptr; 23 24 return fbl::AdoptRef(ptr); 25} 26 27zx_status_t Channel::Activate(zx::channel* client_channel_out, 28 fbl::RefPtr<ExecutionDomain> domain, 29 ProcessHandler process_handler, 30 ChannelClosedHandler channel_closed_handler) { 31 // Arg and constant state checks first 32 if ((client_channel_out == nullptr) || client_channel_out->is_valid()) 33 return ZX_ERR_INVALID_ARGS; 34 35 if (domain == nullptr) 36 return ZX_ERR_INVALID_ARGS; 37 38 // Create the channel endpoints. 39 zx::channel channel; 40 zx_status_t res; 41 42 res = zx::channel::create(0u, &channel, client_channel_out); 43 if (res != ZX_OK) 44 return res; 45 46 // Attempt to activate. 47 res = Activate(fbl::move(channel), 48 fbl::move(domain), 49 fbl::move(process_handler), 50 fbl::move(channel_closed_handler)); 51 52 // If something went wrong, make sure we close the channel endpoint we were 53 // going to give back to the caller. 54 if (res != ZX_OK) 55 client_channel_out->reset(); 56 57 return res; 58} 59 60zx_status_t Channel::Activate(zx::channel channel, 61 fbl::RefPtr<ExecutionDomain> domain, 62 ProcessHandler process_handler, 63 ChannelClosedHandler channel_closed_handler) { 64 // In order to activate, the supplied execution domain and channel, and 65 // process handler must all be valid. Only the deactivate handler is 66 // optional. 67 if ((domain == nullptr) || !channel.is_valid() || (process_handler == nullptr)) 68 return ZX_ERR_INVALID_ARGS; 69 70 zx_status_t ret; 71 { 72 fbl::AutoLock obj_lock(&obj_lock_); 73 if ((process_handler_ != nullptr) || (channel_closed_handler_ != nullptr)) 74 return ZX_ERR_BAD_STATE; 75 76 ret = ActivateLocked(fbl::move(channel), fbl::move(domain)); 77 // If we succeeded, take control of the handlers provided by our caller. 78 // Otherwise, wait until we are outside of our lock before we let the 79 // handler state go out of scope and destruct. 80 if (ret == ZX_OK) { 81 ZX_DEBUG_ASSERT(process_handler_ == nullptr); 82 ZX_DEBUG_ASSERT(channel_closed_handler_ == nullptr); 83 process_handler_ = fbl::move(process_handler); 84 channel_closed_handler_ = fbl::move(channel_closed_handler); 85 } 86 } 87 return ret; 88} 89 90void Channel::Deactivate() { 91 ProcessHandler old_process_handler; 92 ChannelClosedHandler old_channel_closed_handler; 93 94 { 95 fbl::AutoLock obj_lock(&obj_lock_); 96 InternalDeactivateLocked(); 97 98 // If we are in the process of actively dispatching, do not discard our 99 // handlers just yet. They are currently being used by the dispatch 100 // thread. Instead, wait until the dispatch thread unwinds and allow it 101 // to clean up the handlers. 102 // 103 // Otherwise, transfer the handler state into local storage and let them 104 // destruct after we have released the object lock. 105 if (dispatch_state() != DispatchState::Dispatching) { 106 ZX_DEBUG_ASSERT((dispatch_state() == DispatchState::Idle) || 107 (dispatch_state() == DispatchState::WaitingOnPort)); 108 old_process_handler = fbl::move(process_handler_); 109 old_channel_closed_handler = fbl::move(channel_closed_handler_); 110 } 111 } 112} 113 114zx_status_t Channel::ActivateLocked(zx::channel channel, fbl::RefPtr<ExecutionDomain> domain) { 115 ZX_DEBUG_ASSERT((domain != nullptr) && channel.is_valid()); 116 117 // Take ownership of the channel resource and execution domain reference. 118 zx_status_t res = EventSource::ActivateLocked(fbl::move(channel), fbl::move(domain)); 119 if (res != ZX_OK) { 120 return res; 121 } 122 123 // Setup our initial async wait operation on our thread pool's port. 124 res = WaitOnPortLocked(); 125 if (res != ZX_OK) { 126 InternalDeactivateLocked(); 127 return res; 128 } 129 130 return res; 131} 132 133void Channel::Dispatch(ExecutionDomain* domain) { 134 // No one should be calling us if we have no messages to read. 135 ZX_DEBUG_ASSERT(domain != nullptr); 136 ZX_DEBUG_ASSERT(process_handler_ != nullptr); 137 ZX_DEBUG_ASSERT(pending_pkt_.signal.observed & process_signal_mask()); 138 bool signal_channel_closed = (pending_pkt_.signal.observed & ZX_CHANNEL_PEER_CLOSED); 139 140 // Do we have messages to dispatch? 141 if (pending_pkt_.signal.observed & ZX_CHANNEL_READABLE) { 142 // Process all of the pending messages in the channel before re-joining 143 // the thread pool. 144 // 145 // TODO(johngro) : Start to establish some sort of fair scheduler-like 146 // behavior. We do not want to dominate the thread pool processing a 147 // single channel for a single client. 148 ZX_DEBUG_ASSERT(pending_pkt_.signal.count); 149 for (uint64_t i = 0; i < pending_pkt_.signal.count; ++i) { 150 if (domain->deactivated()) 151 break; 152 153 if (process_handler_(this) != ZX_OK) { 154 signal_channel_closed = true; 155 break; 156 } 157 } 158 } 159 160 // If the other side has closed our channel, or there was an error during 161 // dispatch, attempt to call our deactivate handler (if it still exists). 162 if (signal_channel_closed && (channel_closed_handler_ != nullptr)) { 163 channel_closed_handler_(this); 164 } 165 166 // Ok, for better or worse, dispatch is now complete. Enter the lock and 167 // deal with state transition. If things are still healthy, attempt to wait 168 // on our thread-pool's port. If things are not healthy, go through the 169 // process of deactivation. 170 ProcessHandler old_process_handler; 171 ChannelClosedHandler old_channel_closed_handler; 172 { 173 fbl::AutoLock obj_lock(&obj_lock_); 174 ZX_DEBUG_ASSERT(dispatch_state() == DispatchState::Dispatching); 175 dispatch_state_ = DispatchState::Idle; 176 177 // If we had an error during processing, or our peer closed their end of 178 // the channel, make sure that we have released our handle and our 179 // domain reference. 180 if (signal_channel_closed) { 181 InternalDeactivateLocked(); 182 } 183 184 // If we are still active, attempt to set up the next wait opertaion. 185 // If this fails (it should never fail) then automatically deactivate 186 // ourselves. 187 if (is_active()) { 188 ZX_DEBUG_ASSERT(handle_.is_valid()); 189 zx_status_t res = WaitOnPortLocked(); 190 if (res != ZX_OK) { 191 // TODO(johngro) : Log something about this. 192 InternalDeactivateLocked(); 193 } else { 194 ZX_DEBUG_ASSERT(dispatch_state() == DispatchState::WaitingOnPort); 195 } 196 } 197 198 // If we have become deactivated for any reason, transfer our handler 199 // state to local storage so that the handlers can destruct from outside 200 // of our main lock. 201 if (!is_active()) { 202 old_process_handler = fbl::move(process_handler_); 203 old_channel_closed_handler = fbl::move(channel_closed_handler_); 204 } 205 } 206} 207 208zx_status_t Channel::Read(void* buf, 209 uint32_t buf_len, 210 uint32_t* bytes_read_out, 211 zx::handle* rxed_handle) { 212 if (!buf || !buf_len || !bytes_read_out || 213 ((rxed_handle != nullptr) && rxed_handle->is_valid())) 214 return ZX_ERR_INVALID_ARGS; 215 216 fbl::AutoLock obj_lock(&obj_lock_); 217 218 if (!handle_.is_valid()) 219 return ZX_ERR_BAD_HANDLE; 220 221 uint32_t rxed_handle_count = 0; 222 return zx_channel_read(handle_.get(), 223 0, 224 buf, 225 rxed_handle ? rxed_handle->reset_and_get_address() : nullptr, 226 buf_len, 227 rxed_handle ? 1 : 0, 228 bytes_read_out, 229 &rxed_handle_count); 230} 231 232zx_status_t Channel::Write(const void* buf, 233 uint32_t buf_len, 234 zx::handle&& tx_handle) { 235 if (!buf || !buf_len) 236 return ZX_ERR_INVALID_ARGS; 237 238 fbl::AutoLock obj_lock(&obj_lock_); 239 if (!handle_.is_valid()) 240 return ZX_ERR_BAD_HANDLE; 241 242 if (!tx_handle.is_valid()) 243 return zx_channel_write(handle_.get(), 0, buf, buf_len, nullptr, 0); 244 245 zx_handle_t h = tx_handle.release(); 246 return zx_channel_write(handle_.get(), 0, buf, buf_len, &h, 1); 247} 248 249} // namespace dispatcher 250