1// Copyright 2017 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include <zircon/syscalls.h>
6#include <zircon/types.h>
7#include <fbl/auto_call.h>
8
9#include <dispatcher-pool/dispatcher-channel.h>
10#include <dispatcher-pool/dispatcher-event-source.h>
11#include <dispatcher-pool/dispatcher-execution-domain.h>
12#include <dispatcher-pool/dispatcher-thread-pool.h>
13
14namespace dispatcher {
15
16// static
17fbl::RefPtr<dispatcher::Channel> dispatcher::Channel::Create() {
18    fbl::AllocChecker ac;
19
20    auto ptr = new (&ac) dispatcher::Channel();
21    if (!ac.check())
22        return nullptr;
23
24    return fbl::AdoptRef(ptr);
25}
26
27zx_status_t Channel::Activate(zx::channel* client_channel_out,
28                              fbl::RefPtr<ExecutionDomain> domain,
29                              ProcessHandler process_handler,
30                              ChannelClosedHandler channel_closed_handler) {
31    // Arg and constant state checks first
32    if ((client_channel_out == nullptr) || client_channel_out->is_valid())
33        return ZX_ERR_INVALID_ARGS;
34
35    if (domain == nullptr)
36        return ZX_ERR_INVALID_ARGS;
37
38    // Create the channel endpoints.
39    zx::channel channel;
40    zx_status_t res;
41
42    res = zx::channel::create(0u, &channel, client_channel_out);
43    if (res != ZX_OK)
44        return res;
45
46    // Attempt to activate.
47    res = Activate(fbl::move(channel),
48                   fbl::move(domain),
49                   fbl::move(process_handler),
50                   fbl::move(channel_closed_handler));
51
52    // If something went wrong, make sure we close the channel endpoint we were
53    // going to give back to the caller.
54   if (res != ZX_OK)
55       client_channel_out->reset();
56
57   return res;
58}
59
60zx_status_t Channel::Activate(zx::channel channel,
61                              fbl::RefPtr<ExecutionDomain> domain,
62                              ProcessHandler process_handler,
63                              ChannelClosedHandler channel_closed_handler) {
64    // In order to activate, the supplied execution domain and channel, and
65    // process handler must all be valid.  Only the deactivate handler is
66    // optional.
67    if ((domain == nullptr) || !channel.is_valid() || (process_handler == nullptr))
68        return ZX_ERR_INVALID_ARGS;
69
70    zx_status_t ret;
71    {
72        fbl::AutoLock obj_lock(&obj_lock_);
73        if ((process_handler_ != nullptr) || (channel_closed_handler_ != nullptr))
74            return ZX_ERR_BAD_STATE;
75
76        ret = ActivateLocked(fbl::move(channel), fbl::move(domain));
77        // If we succeeded, take control of the handlers provided by our caller.
78        // Otherwise, wait until we are outside of our lock before we let the
79        // handler state go out of scope and destruct.
80        if (ret == ZX_OK) {
81            ZX_DEBUG_ASSERT(process_handler_ == nullptr);
82            ZX_DEBUG_ASSERT(channel_closed_handler_ == nullptr);
83            process_handler_ = fbl::move(process_handler);
84            channel_closed_handler_ = fbl::move(channel_closed_handler);
85        }
86    }
87    return ret;
88}
89
90void Channel::Deactivate() {
91    ProcessHandler       old_process_handler;
92    ChannelClosedHandler old_channel_closed_handler;
93
94    {
95        fbl::AutoLock obj_lock(&obj_lock_);
96        InternalDeactivateLocked();
97
98        // If we are in the process of actively dispatching, do not discard our
99        // handlers just yet.  They are currently being used by the dispatch
100        // thread.  Instead, wait until the dispatch thread unwinds and allow it
101        // to clean up the handlers.
102        //
103        // Otherwise, transfer the handler state into local storage and let them
104        // destruct after we have released the object lock.
105        if (dispatch_state() != DispatchState::Dispatching) {
106            ZX_DEBUG_ASSERT((dispatch_state() == DispatchState::Idle) ||
107                            (dispatch_state() == DispatchState::WaitingOnPort));
108            old_process_handler = fbl::move(process_handler_);
109            old_channel_closed_handler = fbl::move(channel_closed_handler_);
110        }
111    }
112}
113
114zx_status_t Channel::ActivateLocked(zx::channel channel, fbl::RefPtr<ExecutionDomain> domain) {
115    ZX_DEBUG_ASSERT((domain != nullptr) && channel.is_valid());
116
117    // Take ownership of the channel resource and execution domain reference.
118    zx_status_t res = EventSource::ActivateLocked(fbl::move(channel), fbl::move(domain));
119    if (res != ZX_OK) {
120        return res;
121    }
122
123    // Setup our initial async wait operation on our thread pool's port.
124    res = WaitOnPortLocked();
125    if (res != ZX_OK) {
126        InternalDeactivateLocked();
127        return res;
128    }
129
130    return res;
131}
132
133void Channel::Dispatch(ExecutionDomain* domain) {
134    // No one should be calling us if we have no messages to read.
135    ZX_DEBUG_ASSERT(domain != nullptr);
136    ZX_DEBUG_ASSERT(process_handler_ != nullptr);
137    ZX_DEBUG_ASSERT(pending_pkt_.signal.observed & process_signal_mask());
138    bool signal_channel_closed = (pending_pkt_.signal.observed & ZX_CHANNEL_PEER_CLOSED);
139
140    // Do we have messages to dispatch?
141    if (pending_pkt_.signal.observed & ZX_CHANNEL_READABLE) {
142        // Process all of the pending messages in the channel before re-joining
143        // the thread pool.
144        //
145        // TODO(johngro) : Start to establish some sort of fair scheduler-like
146        // behavior.  We do not want to dominate the thread pool processing a
147        // single channel for a single client.
148        ZX_DEBUG_ASSERT(pending_pkt_.signal.count);
149        for (uint64_t i = 0; i < pending_pkt_.signal.count; ++i) {
150            if (domain->deactivated())
151                break;
152
153            if (process_handler_(this) != ZX_OK) {
154                signal_channel_closed = true;
155                break;
156            }
157        }
158    }
159
160    // If the other side has closed our channel, or there was an error during
161    // dispatch, attempt to call our deactivate handler (if it still exists).
162    if (signal_channel_closed && (channel_closed_handler_ != nullptr)) {
163        channel_closed_handler_(this);
164    }
165
166    // Ok, for better or worse, dispatch is now complete.  Enter the lock and
167    // deal with state transition.  If things are still healthy, attempt to wait
168    // on our thread-pool's port.  If things are not healthy, go through the
169    // process of deactivation.
170    ProcessHandler       old_process_handler;
171    ChannelClosedHandler old_channel_closed_handler;
172    {
173        fbl::AutoLock obj_lock(&obj_lock_);
174        ZX_DEBUG_ASSERT(dispatch_state() == DispatchState::Dispatching);
175        dispatch_state_ = DispatchState::Idle;
176
177        // If we had an error during processing, or our peer closed their end of
178        // the channel, make sure that we have released our handle and our
179        // domain reference.
180        if (signal_channel_closed) {
181            InternalDeactivateLocked();
182        }
183
184        // If we are still active, attempt to set up the next wait opertaion.
185        // If this fails (it should never fail) then automatically deactivate
186        // ourselves.
187        if (is_active()) {
188            ZX_DEBUG_ASSERT(handle_.is_valid());
189            zx_status_t res = WaitOnPortLocked();
190            if (res != ZX_OK) {
191                // TODO(johngro) : Log something about this.
192                InternalDeactivateLocked();
193            } else {
194                ZX_DEBUG_ASSERT(dispatch_state() == DispatchState::WaitingOnPort);
195            }
196        }
197
198        // If we have become deactivated for any reason, transfer our handler
199        // state to local storage so that the handlers can destruct from outside
200        // of our main lock.
201        if (!is_active()) {
202            old_process_handler = fbl::move(process_handler_);
203            old_channel_closed_handler = fbl::move(channel_closed_handler_);
204        }
205    }
206}
207
208zx_status_t Channel::Read(void*       buf,
209                          uint32_t    buf_len,
210                          uint32_t*   bytes_read_out,
211                          zx::handle* rxed_handle) {
212    if (!buf || !buf_len || !bytes_read_out ||
213       ((rxed_handle != nullptr) && rxed_handle->is_valid()))
214        return ZX_ERR_INVALID_ARGS;
215
216    fbl::AutoLock obj_lock(&obj_lock_);
217
218    if (!handle_.is_valid())
219        return ZX_ERR_BAD_HANDLE;
220
221    uint32_t rxed_handle_count = 0;
222    return zx_channel_read(handle_.get(),
223                           0,
224                           buf,
225                           rxed_handle ? rxed_handle->reset_and_get_address() : nullptr,
226                           buf_len,
227                           rxed_handle ? 1 : 0,
228                           bytes_read_out,
229                           &rxed_handle_count);
230}
231
232zx_status_t Channel::Write(const void*  buf,
233                           uint32_t     buf_len,
234                           zx::handle&& tx_handle) {
235    if (!buf || !buf_len)
236        return ZX_ERR_INVALID_ARGS;
237
238    fbl::AutoLock obj_lock(&obj_lock_);
239    if (!handle_.is_valid())
240        return ZX_ERR_BAD_HANDLE;
241
242    if (!tx_handle.is_valid())
243        return zx_channel_write(handle_.get(), 0, buf, buf_len, nullptr, 0);
244
245    zx_handle_t h = tx_handle.release();
246    return zx_channel_write(handle_.get(), 0, buf, buf_len, &h, 1);
247}
248
249}  // namespace dispatcher
250