1// Copyright 2016 The Fuchsia Authors
2//
3// Use of this source code is governed by a MIT-style
4// license that can be found in the LICENSE file or at
5// https://opensource.org/licenses/MIT
6
7#include <object/channel_dispatcher.h>
8
9#include <string.h>
10
11#include <assert.h>
12#include <err.h>
13#include <trace.h>
14
15#include <lib/counters.h>
16#include <kernel/event.h>
17#include <platform.h>
18#include <object/handle.h>
19#include <object/message_packet.h>
20#include <object/process_dispatcher.h>
21#include <object/thread_dispatcher.h>
22
23#include <fbl/alloc_checker.h>
24#include <fbl/auto_lock.h>
25#include <fbl/type_support.h>
26#include <zircon/rights.h>
27#include <zircon/types.h>
28
29#define LOCAL_TRACE 0
30
31KCOUNTER(channel_packet_depth_1, "kernel.channel.depth.1");
32KCOUNTER(channel_packet_depth_4, "kernel.channel.depth.4");
33KCOUNTER(channel_packet_depth_16, "kernel.channel.depth.16");
34KCOUNTER(channel_packet_depth_64, "kernel.channel.depth.64");
35KCOUNTER(channel_packet_depth_256, "kernel.channel.depth.256");
36KCOUNTER(channel_packet_depth_unbounded, "kernel.channel.depth.unbounded");
37
38// static
39zx_status_t ChannelDispatcher::Create(fbl::RefPtr<Dispatcher>* dispatcher0,
40                                      fbl::RefPtr<Dispatcher>* dispatcher1,
41                                      zx_rights_t* rights) {
42    fbl::AllocChecker ac;
43    auto holder0 = fbl::AdoptRef(new (&ac) PeerHolder<ChannelDispatcher>());
44    if (!ac.check())
45        return ZX_ERR_NO_MEMORY;
46    auto holder1 = holder0;
47
48    auto ch0 = fbl::AdoptRef(new (&ac) ChannelDispatcher(fbl::move(holder0)));
49    if (!ac.check())
50        return ZX_ERR_NO_MEMORY;
51
52    auto ch1 = fbl::AdoptRef(new (&ac) ChannelDispatcher(fbl::move(holder1)));
53    if (!ac.check())
54        return ZX_ERR_NO_MEMORY;
55
56    ch0->Init(ch1);
57    ch1->Init(ch0);
58
59    *rights = ZX_DEFAULT_CHANNEL_RIGHTS;
60    *dispatcher0 = fbl::move(ch0);
61    *dispatcher1 = fbl::move(ch1);
62    return ZX_OK;
63}
64
// A freshly constructed endpoint starts with ZX_CHANNEL_WRITABLE asserted;
// linking to the peer happens afterwards in Init().
ChannelDispatcher::ChannelDispatcher(fbl::RefPtr<PeerHolder<ChannelDispatcher>> holder)
    : PeeredDispatcher(fbl::move(holder), ZX_CHANNEL_WRITABLE) {
}
68
// This is called before either ChannelDispatcher is accessible from threads other than the one
// initializing the channel, so it does not need locking.
void ChannelDispatcher::Init(fbl::RefPtr<ChannelDispatcher> other) TA_NO_THREAD_SAFETY_ANALYSIS {
    peer_ = fbl::move(other);
    // Record the peer's koid at link time.
    peer_koid_ = peer_->get_koid();
}
75
76ChannelDispatcher::~ChannelDispatcher() {
77    // At this point the other endpoint no longer holds
78    // a reference to us, so we can be sure we're discarding
79    // any remaining messages safely.
80
81    // It's not possible to do this safely in on_zero_handles()
82
83    messages_.clear();
84    message_count_ = 0;
85
86    switch (max_message_count_) {
87    case 0 ... 1:
88        kcounter_add(channel_packet_depth_1, 1);
89        break;
90    case 2 ... 4:
91        kcounter_add(channel_packet_depth_4, 1);
92        break;
93    case 5 ... 16:
94        kcounter_add(channel_packet_depth_16, 1);
95        break;
96    case 17 ... 64:
97        kcounter_add(channel_packet_depth_64, 1);
98        break;
99    case 65 ... 256:
100        kcounter_add(channel_packet_depth_256, 1);
101        break;
102    default:
103        kcounter_add(channel_packet_depth_unbounded, 1);
104        break;
105    }
106}
107
// Registers a state observer, seeding it with the current queue depth so a
// channel that already has pending messages reports ZX_CHANNEL_READABLE
// immediately.
zx_status_t ChannelDispatcher::add_observer(StateObserver* observer) {
    canary_.Assert();

    Guard<fbl::Mutex> guard{get_lock()};
    StateObserver::CountInfo cinfo =
        {{{message_count_, ZX_CHANNEL_READABLE}, {0u, 0u}}};
    AddObserverLocked(observer, &cinfo);
    return ZX_OK;
}
117
118void ChannelDispatcher::RemoveWaiter(MessageWaiter* waiter) {
119    Guard<fbl::Mutex> guard{get_lock()};
120    if (!waiter->InContainer()) {
121        return;
122    }
123    waiters_.erase(*waiter);
124}
125
// Called when the last local handle to this endpoint goes away.
// Note: queued messages are deliberately not discarded here; see the
// comment in ~ChannelDispatcher.
void ChannelDispatcher::on_zero_handles_locked() {
    canary_.Assert();

    // (3A) Abort any waiting Call operations
    // because we've been canceled by reason
    // of our local handle going away.
    // Remove waiter from list.
    while (!waiters_.is_empty()) {
        auto waiter = waiters_.pop_front();
        waiter->Cancel(ZX_ERR_CANCELED);
    }
}
138
// This requires holding the shared channel lock. The thread analysis
// can reason about repeated calls to get_lock() on the shared object,
// but cannot reason about the aliasing between left->get_lock() and
// right->get_lock(), which occurs above in on_zero_handles.
void ChannelDispatcher::OnPeerZeroHandlesLocked() {
    canary_.Assert();

    // The peer is gone: clear WRITABLE and assert PEER_CLOSED for observers.
    UpdateStateLocked(ZX_CHANNEL_WRITABLE, ZX_CHANNEL_PEER_CLOSED);
    // (3B) Abort any waiting Call operations
    // because we've been canceled by reason
    // of the opposing endpoint going away.
    // Remove waiter from list.
    while (!waiters_.is_empty()) {
        auto waiter = waiters_.pop_front();
        waiter->Cancel(ZX_ERR_PEER_CLOSED);
    }
}
156
// Dequeues the message at the front of the queue.
//
// |msg_size| / |msg_handle_count| are in-out: on entry they hold the
// caller's buffer capacities, and on return they hold the actual size and
// handle count of the front message (including when
// ZX_ERR_BUFFER_TOO_SMALL is returned).  If |may_discard| is set, an
// oversized message is dequeued (and thus dropped by the caller) instead
// of being left on the queue.
zx_status_t ChannelDispatcher::Read(uint32_t* msg_size,
                                    uint32_t* msg_handle_count,
                                    fbl::unique_ptr<MessagePacket>* msg,
                                    bool may_discard) {
    canary_.Assert();

    auto max_size = *msg_size;
    auto max_handle_count = *msg_handle_count;

    Guard<fbl::Mutex> guard{get_lock()};

    // Empty queue: tell the caller whether waiting could ever help.
    if (messages_.is_empty())
        return peer_ ? ZX_ERR_SHOULD_WAIT : ZX_ERR_PEER_CLOSED;

    *msg_size = messages_.front().data_size();
    *msg_handle_count = messages_.front().num_handles();
    zx_status_t rv = ZX_OK;
    if (*msg_size > max_size || *msg_handle_count > max_handle_count) {
        if (!may_discard)
            return ZX_ERR_BUFFER_TOO_SMALL;
        // Discarding: fall through to pop the message but still report
        // the truncation to the caller.
        rv = ZX_ERR_BUFFER_TOO_SMALL;
    }

    *msg = messages_.pop_front();
    message_count_--;

    // Last message consumed; withdraw the readable signal.
    if (messages_.is_empty())
        UpdateStateLocked(ZX_CHANNEL_READABLE, 0u);

    return rv;
}
188
// Writes |msg| to the opposing endpoint, taking ownership of the packet.
// Returns ZX_ERR_PEER_CLOSED if the other endpoint is gone.
zx_status_t ChannelDispatcher::Write(fbl::unique_ptr<MessagePacket> msg) {
    canary_.Assert();

    AutoReschedDisable resched_disable; // Must come before the lock guard.
    resched_disable.Disable();
    Guard<fbl::Mutex> guard{get_lock()};

    if (!peer_)
        return ZX_ERR_PEER_CLOSED;
    // Enqueue on the peer (or hand directly to a matching Call waiter;
    // see WriteSelf).
    peer_->WriteSelf(fbl::move(msg));

    return ZX_OK;
}
202
203zx_status_t ChannelDispatcher::Call(fbl::unique_ptr<MessagePacket> msg,
204                                    zx_time_t deadline, fbl::unique_ptr<MessagePacket>* reply) {
205
206    canary_.Assert();
207
208    auto waiter = ThreadDispatcher::GetCurrent()->GetMessageWaiter();
209    if (unlikely(waiter->BeginWait(fbl::WrapRefPtr(this)) != ZX_OK)) {
210        // If a thread tries BeginWait'ing twice, the VDSO contract around retrying
211        // channel calls has been violated.  Shoot the misbehaving process.
212        ProcessDispatcher::GetCurrent()->Kill();
213        return ZX_ERR_BAD_STATE;
214    }
215
216    {
217        AutoReschedDisable resched_disable; // Must come before the lock guard.
218        resched_disable.Disable();
219        Guard<fbl::Mutex> guard{get_lock()};
220
221        if (!peer_) {
222            waiter->EndWait(reply);
223            return ZX_ERR_PEER_CLOSED;
224        }
225
226        // Obtain a txid.  txid 0 is not allowed, and 1..0x7FFFFFFF are reserved
227        // for userspace.  So, bump our counter and OR in the high bit.
228alloc_txid:
229        zx_txid_t txid = (++txid_) | 0x80000000;
230
231        // If there are waiting messages, ensure we have not allocated a txid
232        // that's already in use.  This is unlikely.  It's atypical for multiple
233        // threads to be invoking channel_call() on the same channel at once, so
234        // the waiter list is most commonly empty.
235        for (auto& waiter: waiters_) {
236            if (waiter.get_txid() == txid) {
237                goto alloc_txid;
238            }
239        }
240
241        // Install our txid in the waiter and the outbound message
242        waiter->set_txid(txid);
243        msg->set_txid(txid);
244
245        // (0) Before writing the outbound message and waiting, add our
246        // waiter to the list.
247        waiters_.push_back(waiter);
248
249        // (1) Write outbound message to opposing endpoint.
250        peer_->WriteSelf(fbl::move(msg));
251    }
252
253    // Reuse the code from the half-call used for retrying a Call after thread
254    // suspend.
255    return ResumeInterruptedCall(waiter, deadline, reply);
256}
257
// Second half of Call(): blocks on |waiter| until a reply is delivered,
// |deadline| passes, or the wait is canceled.  Also invoked directly to
// retry a Call that previously returned ZX_ERR_INTERNAL_INTR_RETRY.
zx_status_t ChannelDispatcher::ResumeInterruptedCall(MessageWaiter* waiter,
                                                     zx_time_t deadline,
                                                     fbl::unique_ptr<MessagePacket>* reply) {
    canary_.Assert();

    // (2) Wait for notification via waiter's event or for the
    // deadline to hit.
    {
        ThreadDispatcher::AutoBlocked by(ThreadDispatcher::Blocked::CHANNEL);

        zx_status_t status = waiter->Wait(deadline);
        if (status == ZX_ERR_INTERNAL_INTR_RETRY) {
            // If we got interrupted, return out to usermode, but
            // do not clear the waiter.
            return status;
        }
    }

    // (3) see (3A), (3B) above or (3C) below for paths where
    // the waiter could be signaled and removed from the list.
    //
    // If the deadline hits, the waiter is not removed
    // from the list *but* another thread could still
    // cause (3A), (3B), or (3C) before the lock below.
    {
        Guard<fbl::Mutex> guard{get_lock()};

        // (4) If any of (3A), (3B), or (3C) have occurred,
        // we were removed from the waiters list already
        // and EndWait() returns a non-ZX_ERR_TIMED_OUT status.
        // Otherwise, the status is ZX_ERR_TIMED_OUT and it
        // is our job to remove the waiter from the list.
        zx_status_t status = waiter->EndWait(reply);
        if (status == ZX_ERR_TIMED_OUT)
            waiters_.erase(*waiter);
        return status;
    }
}
296
// Channels impose no kernel-side cap on the number of queued messages.
size_t ChannelDispatcher::TxMessageMax() const {
    return SIZE_MAX;
}
300
// Accepts an inbound message on this endpoint: delivers it directly to a
// Call waiter with a matching txid, otherwise appends it to the message
// queue and asserts ZX_CHANNEL_READABLE.  Caller holds the shared lock
// (invoked via peer_->WriteSelf under get_lock()).
void ChannelDispatcher::WriteSelf(fbl::unique_ptr<MessagePacket> msg) {
    canary_.Assert();

    if (!waiters_.is_empty()) {
        // If the far side is waiting for replies to messages
        // sent via "call", see if this message has a matching
        // txid to one of the waiters, and if so, deliver it.
        zx_txid_t txid = msg->get_txid();
        for (auto& waiter: waiters_) {
            // (3C) Deliver message to waiter.
            // Remove waiter from list.
            if (waiter.get_txid() == txid) {
                // Erasing inside the range-for is safe only because we
                // return immediately and never advance the iterator.
                waiters_.erase(waiter);
                waiter.Deliver(fbl::move(msg));
                return;
            }
        }
    }
    messages_.push_back(fbl::move(msg));
    message_count_++;
    // Track the queue's high-water mark for the destructor's depth counters.
    if (message_count_ > max_message_count_) {
        max_message_count_ = message_count_;
    }

    UpdateStateLocked(0u, ZX_CHANNEL_READABLE);
}
327
// Applies a user-requested signal update to this endpoint's state while the
// shared lock is held.
zx_status_t ChannelDispatcher::UserSignalSelf(uint32_t clear_mask, uint32_t set_mask) {
    canary_.Assert();
    UpdateStateLocked(clear_mask, set_mask);
    return ZX_OK;
}
333
ChannelDispatcher::MessageWaiter::~MessageWaiter() {
    // If the waiter is still armed (a Call never ran EndWait), unlink it
    // from the channel so the channel can't touch this freed object.
    if (unlikely(channel_)) {
        channel_->RemoveWaiter(this);
    }
    DEBUG_ASSERT(!InContainer());
}
340
// Arms the waiter for a Call on |channel|.  Returns ZX_ERR_BAD_STATE if the
// waiter is already armed (BeginWait without a matching EndWait).
zx_status_t ChannelDispatcher::MessageWaiter::BeginWait(fbl::RefPtr<ChannelDispatcher> channel) {
    if (unlikely(channel_)) {
        return ZX_ERR_BAD_STATE;
    }
    DEBUG_ASSERT(!InContainer());

    // Default to timed-out; Deliver()/Cancel() overwrite this on completion.
    status_ = ZX_ERR_TIMED_OUT;
    channel_ = fbl::move(channel);
    event_.Unsignal();
    return ZX_OK;
}
352
// Hands the reply packet to the waiter and wakes the blocked Call.
void ChannelDispatcher::MessageWaiter::Deliver(fbl::unique_ptr<MessagePacket> msg) {
    DEBUG_ASSERT(channel_);

    msg_ = fbl::move(msg);
    status_ = ZX_OK;
    event_.Signal(ZX_OK);
}
360
// Aborts the wait with |status| (ZX_ERR_CANCELED or ZX_ERR_PEER_CLOSED).
// Caller must have already removed the waiter from the channel's list.
void ChannelDispatcher::MessageWaiter::Cancel(zx_status_t status) {
    DEBUG_ASSERT(!InContainer());
    DEBUG_ASSERT(channel_);
    status_ = status;
    // Wake the blocked Call with the cancellation status.
    event_.Signal(status);
}
367
368zx_status_t ChannelDispatcher::MessageWaiter::Wait(zx_time_t deadline) {
369    if (unlikely(!channel_)) {
370        return ZX_ERR_BAD_STATE;
371    }
372    return event_.Wait(deadline);
373}
374
// Returns any delivered message via out and the status.
// Also disarms the waiter (clears channel_) so it can be reused; calling
// EndWait on an unarmed waiter returns ZX_ERR_BAD_STATE.
zx_status_t ChannelDispatcher::MessageWaiter::EndWait(fbl::unique_ptr<MessagePacket>* out) {
    if (unlikely(!channel_)) {
        return ZX_ERR_BAD_STATE;
    }
    *out = fbl::move(msg_);
    channel_ = nullptr;
    return status_;
}
384