1// Copyright 2017 The Fuchsia Authors
2//
3// Use of this source code is governed by a MIT-style
4// license that can be found in the LICENSE file or at
5// https://opensource.org/licenses/MIT
6
7#include <object/port_dispatcher.h>
8
9#include <assert.h>
10#include <err.h>
11#include <platform.h>
12#include <pow2.h>
13
14#include <fbl/alloc_checker.h>
15#include <fbl/arena.h>
16#include <fbl/auto_lock.h>
17#include <lib/counters.h>
18#include <object/excp_port.h>
19#include <object/handle.h>
20#include <object/thread_dispatcher.h>
21#include <zircon/compiler.h>
22#include <zircon/rights.h>
23#include <zircon/syscalls/port.h>
24#include <zircon/types.h>
25
26static_assert(sizeof(zx_packet_signal_t) == sizeof(zx_packet_user_t),
27              "size of zx_packet_signal_t must match zx_packet_user_t");
28static_assert(sizeof(zx_packet_exception_t) == sizeof(zx_packet_user_t),
29              "size of zx_packet_exception_t must match zx_packet_user_t");
30static_assert(sizeof(zx_packet_guest_mem_t) == sizeof(zx_packet_user_t),
31              "size of zx_packet_guest_mem_t must match zx_packet_user_t");
32static_assert(sizeof(zx_packet_guest_io_t) == sizeof(zx_packet_user_t),
33              "size of zx_packet_guest_io_t must match zx_packet_user_t");
34static_assert(sizeof(zx_packet_guest_vcpu_t) == sizeof(zx_packet_user_t),
35              "size of zx_packet_guest_vcpu_t must match zx_packet_user_t");
36
37KCOUNTER(port_arena_count, "kernel.port.arena.count");
38KCOUNTER(port_full_count, "kernel.port.full.count");
39
40class ArenaPortAllocator final : public PortAllocator {
41public:
42    zx_status_t Init();
43    virtual ~ArenaPortAllocator() = default;
44
45    virtual PortPacket* Alloc();
46    virtual void Free(PortPacket* port_packet);
47
48private:
49    fbl::TypedArena<PortPacket, fbl::Mutex> arena_;
50};
51
namespace {
// System-wide cap on arena-backed packets; sizes the arena itself.
constexpr size_t kMaxPendingPacketCount = 16 * 1024u;

// TODO(maniscalco): Enforce this limit per process via the job policy.
constexpr size_t kMaxPendingPacketCountPerPort = kMaxPendingPacketCount / 8;
// Shared allocator for ephemeral packets; set up by PortDispatcher::Init().
ArenaPortAllocator port_allocator;
} // namespace.
59
60zx_status_t ArenaPortAllocator::Init() {
61    return arena_.Init("packets", kMaxPendingPacketCount);
62}
63
64PortPacket* ArenaPortAllocator::Alloc() {
65    PortPacket* packet = arena_.New(nullptr, this);
66    if (packet == nullptr) {
67        printf("WARNING: Could not allocate new port packet\n");
68        return nullptr;
69    }
70    kcounter_add(port_arena_count, 1);
71    return packet;
72}
73
// Returns |port_packet| to the arena and updates the usage counter.
void ArenaPortAllocator::Free(PortPacket* port_packet) {
    arena_.Delete(port_packet);
    kcounter_add(port_arena_count, -1);
}
78
// Constructs a zero-initialized packet. |allocator| is non-null only for
// ephemeral (arena-backed) packets; packets embedded in a PortObserver pass
// a non-null |handle| and a null |allocator|.
PortPacket::PortPacket(const void* handle, PortAllocator* allocator)
    : packet{}, handle(handle), observer(nullptr), allocator(allocator) {
    // Note that packet is initialized to zeros.
    if (handle) {
        // Currently |handle| is only valid if the packets are not ephemeral
        // which means that PortObserver always uses the kernel heap.
        DEBUG_ASSERT(allocator == nullptr);
    }
}
88
89PortObserver::PortObserver(uint32_t type, const Handle* handle, fbl::RefPtr<PortDispatcher> port,
90                           uint64_t key, zx_signals_t signals)
91    : type_(type),
92      trigger_(signals),
93      packet_(handle, nullptr),
94      port_(fbl::move(port)) {
95
96    DEBUG_ASSERT(handle != nullptr);
97
98    auto& packet = packet_.packet;
99    packet.status = ZX_OK;
100    packet.key = key;
101    packet.type = type_;
102    packet.signal.trigger = trigger_;
103}
104
105StateObserver::Flags PortObserver::OnInitialize(zx_signals_t initial_state,
106                                                const StateObserver::CountInfo* cinfo) {
107    uint64_t count = 1u;
108
109    if (cinfo) {
110        for (const auto& entry : cinfo->entry) {
111            if ((entry.signal & trigger_) && (entry.count > 0u)) {
112                count = entry.count;
113                break;
114            }
115        }
116    }
117    return MaybeQueue(initial_state, count);
118}
119
120StateObserver::Flags PortObserver::OnStateChange(zx_signals_t new_state) {
121    return MaybeQueue(new_state, 1u);
122}
123
124StateObserver::Flags PortObserver::OnCancel(const Handle* handle) {
125    if (packet_.handle == handle) {
126        return kHandled | kNeedRemoval;
127    } else {
128        return 0;
129    }
130}
131
132StateObserver::Flags PortObserver::OnCancelByKey(const Handle* handle, const void* port, uint64_t key) {
133    if ((packet_.handle != handle) || (packet_.key() != key) || (port_.get() != port))
134        return 0;
135    return kHandled | kNeedRemoval;
136}
137
138void PortObserver::OnRemoved() {
139    if (port_->CanReap(this, &packet_))
140        delete this;
141}
142
// Queues the embedded packet if |new_state| overlaps the trigger mask.
// Returns kNeedRemoval when this is a one-shot observer or when queueing
// failed (e.g. port full), so the caller detaches us; 0 keeps us installed.
StateObserver::Flags PortObserver::MaybeQueue(zx_signals_t new_state, uint64_t count) {
    // Always called with the object state lock being held.
    if ((trigger_ & new_state) == 0u)
        return 0;

    // TODO(cpu): Queue() can fail and we don't propagate this information
    // here properly. Now, this failure is self inflicted because we constrain
    // the packet arena size artificially.  See ZX-2166 for details.
    auto status = port_->Queue(&packet_, new_state, count);

    if ((type_ == ZX_PKT_TYPE_SIGNAL_ONE) || (status != ZX_OK))
        return kNeedRemoval;

    return 0;
}
158
159/////////////////////////////////////////////////////////////////////////////////////////
160
161void PortDispatcher::Init() {
162    port_allocator.Init();
163}
164
165PortAllocator* PortDispatcher::DefaultPortAllocator() {
166    return &port_allocator;
167}
168
169zx_status_t PortDispatcher::Create(uint32_t options, fbl::RefPtr<Dispatcher>* dispatcher,
170                                   zx_rights_t* rights) {
171    if (options && options != ZX_PORT_BIND_TO_INTERRUPT) {
172        return ZX_ERR_INVALID_ARGS;
173    }
174    fbl::AllocChecker ac;
175    auto disp = new (&ac) PortDispatcher(options);
176    if (!ac.check())
177        return ZX_ERR_NO_MEMORY;
178
179    *rights = ZX_DEFAULT_PORT_RIGHTS;
180    *dispatcher = fbl::AdoptRef<Dispatcher>(disp);
181    return ZX_OK;
182}
183
// Ports start with an empty queue and with user handles outstanding.
PortDispatcher::PortDispatcher(uint32_t options)
    : options_(options), zero_handles_(false), num_packets_(0u) {
}
187
// By destruction time on_zero_handles() must have run and drained the queue.
PortDispatcher::~PortDispatcher() {
    DEBUG_ASSERT(zero_handles_);
    DEBUG_ASSERT(num_packets_ == 0u);
}
192
// Runs when the last user handle to this port closes: unbinds any attached
// exception ports and frees every still-queued packet.
void PortDispatcher::on_zero_handles() {
    canary_.Assert();

    Guard<fbl::Mutex> guard{get_lock()};
    // From this point Queue() rejects new packets with ZX_ERR_BAD_STATE.
    zero_handles_ = true;

    // Unlink and unbind exception ports.
    while (!eports_.is_empty()) {
        auto eport = eports_.pop_back();

        // Tell the eport to unbind itself, then drop our ref to it. Called
        // unlocked because the eport may call our ::UnlinkExceptionPort.
        guard.CallUnlocked([&eport]() { eport->OnPortZeroHandles(); });
    }

    // Free any queued packets.
    while (!packets_.is_empty()) {
        FreePacket(packets_.pop_front());
        --num_packets_;
    }
}
214
215zx_status_t PortDispatcher::QueueUser(const zx_port_packet_t& packet) {
216    canary_.Assert();
217
218    auto port_packet = port_allocator.Alloc();
219    if (!port_packet)
220        return ZX_ERR_NO_MEMORY;
221
222    port_packet->packet = packet;
223    port_packet->packet.type = ZX_PKT_TYPE_USER;
224
225    auto status = Queue(port_packet, 0u, 0u);
226    if (status != ZX_OK)
227        port_packet->Free();
228    return status;
229}
230
231bool PortDispatcher::RemoveInterruptPacket(PortInterruptPacket* port_packet) {
232    Guard<SpinLock, IrqSave> guard{&spinlock_};
233    if (port_packet->InContainer()) {
234        interrupt_packets_.erase(*port_packet);
235        return true;
236    }
237    return false;
238}
239
240bool PortDispatcher::QueueInterruptPacket(PortInterruptPacket* port_packet, zx_time_t timestamp) {
241    Guard<SpinLock, IrqSave> guard{&spinlock_};
242    if (port_packet->InContainer()) {
243        return false;
244    } else {
245        port_packet->timestamp = timestamp;
246        interrupt_packets_.push_back(port_packet);
247        sema_.Post();
248        return true;
249    }
250}
251
// Queues |port_packet| for delivery. For signal packets (|observed| != 0)
// that are already queued, the new signals are OR-ed into the pending packet
// instead of queueing a second one. Returns ZX_ERR_BAD_STATE once the last
// handle is gone, ZX_ERR_SHOULD_WAIT once the per-port limit is exceeded.
zx_status_t PortDispatcher::Queue(PortPacket* port_packet, zx_signals_t observed, uint64_t count) {
    canary_.Assert();

    AutoReschedDisable resched_disable; // Must come before the lock guard.
    Guard<fbl::Mutex> guard{get_lock()};
    if (zero_handles_)
        return ZX_ERR_BAD_STATE;

    if (num_packets_ > kMaxPendingPacketCountPerPort) {
        kcounter_add(port_full_count, 1);
        return ZX_ERR_SHOULD_WAIT;
    }

    if (observed) {
        if (port_packet->InContainer()) {
            // Coalesce with the packet that is already pending.
            port_packet->packet.signal.observed |= observed;
            // |count| is deliberately left as is.
            return ZX_OK;
        }
        port_packet->packet.signal.observed = observed;
        port_packet->packet.signal.count = count;
    }
    packets_.push_back(port_packet);
    ++num_packets_;
    // This Disable() call must come before Post() to be useful, but doing
    // it earlier would also be OK.
    resched_disable.Disable();
    sema_.Post();

    return ZX_OK;
}
283
// Blocks until a packet can be copied to |out_packet| or |deadline| passes.
// On interrupt-bound ports, interrupt packets are checked first and thus
// take priority over regular packets.
zx_status_t PortDispatcher::Dequeue(zx_time_t deadline, zx_port_packet_t* out_packet) {
    canary_.Assert();

    while (true) {
        if (options_ == ZX_PORT_BIND_TO_INTERRUPT) {
            Guard<SpinLock, IrqSave> guard{&spinlock_};
            PortInterruptPacket* port_interrupt_packet = interrupt_packets_.pop_front();
            if (port_interrupt_packet != nullptr) {
                // Synthesize a zx_port_packet_t from the interrupt packet.
                *out_packet = {};
                out_packet->key = port_interrupt_packet->key;
                out_packet->type = ZX_PKT_TYPE_INTERRUPT;
                out_packet->status = ZX_OK;
                out_packet->interrupt.timestamp = port_interrupt_packet->timestamp;
                return ZX_OK;
            }
        }
        {
            Guard<fbl::Mutex> guard{get_lock()};
            PortPacket* port_packet = packets_.pop_front();
            if (port_packet != nullptr) {
                --num_packets_;
                // Copy out before freeing; the packet memory may be reused.
                *out_packet = port_packet->packet;
                FreePacket(port_packet);
                return ZX_OK;
            }
        }

        {
            // Nothing available; sleep until a queueing thread Post()s.
            ThreadDispatcher::AutoBlocked by(ThreadDispatcher::Blocked::PORT);
            zx_status_t st = sema_.Wait(deadline);
            if (st != ZX_OK)
                return st;
        }
    }
}
319
// Releases a packet that has just been removed from |packets_|. Both call
// sites (Dequeue, on_zero_handles) hold the port lock, which the comment
// below relies on.
void PortDispatcher::FreePacket(PortPacket* port_packet) {
    PortObserver* observer = port_packet->observer;

    if (observer) {
        // Deleting the observer under the lock is fine because the
        // reference that holds to this PortDispatcher is by construction
        // not the last one. We need to do this under the lock because
        // another thread can call CanReap().
        delete observer;
    } else if (port_packet->is_ephemeral()) {
        port_packet->Free();
    }
}
333
// Called by an observer being removed. Returns true when the observer may be
// deleted immediately (its packet is not queued). Otherwise ownership of
// |observer| transfers to the queued packet and deletion is deferred to
// FreePacket() or CancelQueued().
bool PortDispatcher::CanReap(PortObserver* observer, PortPacket* port_packet) {
    canary_.Assert();

    Guard<fbl::Mutex> guard{get_lock()};
    if (!port_packet->InContainer())
        return true;
    // The destruction will happen when the packet is dequeued or in CancelQueued()
    DEBUG_ASSERT(port_packet->observer == nullptr);
    port_packet->observer = observer;
    return false;
}
345
346zx_status_t PortDispatcher::MakeObserver(uint32_t options, Handle* handle, uint64_t key,
347                                         zx_signals_t signals) {
348    canary_.Assert();
349
350    // Called under the handle table lock.
351
352    auto dispatcher = handle->dispatcher();
353    if (!dispatcher->has_state_tracker())
354        return ZX_ERR_NOT_SUPPORTED;
355
356    uint32_t type;
357    switch (options) {
358        case ZX_WAIT_ASYNC_ONCE:
359            type = ZX_PKT_TYPE_SIGNAL_ONE;
360            break;
361        case ZX_WAIT_ASYNC_REPEATING:
362            type = ZX_PKT_TYPE_SIGNAL_REP;
363            break;
364        default:
365            return ZX_ERR_INVALID_ARGS;
366    }
367
368    fbl::AllocChecker ac;
369    auto observer = new (&ac) PortObserver(type, handle, fbl::RefPtr<PortDispatcher>(this), key,
370                                           signals);
371    if (!ac.check())
372        return ZX_ERR_NO_MEMORY;
373
374    dispatcher->add_observer(observer);
375    return ZX_OK;
376}
377
// Removes (and destroys) every queued packet matching |handle| and |key|.
// Returns true if at least one packet was removed.
bool PortDispatcher::CancelQueued(const void* handle, uint64_t key) {
    canary_.Assert();

    Guard<fbl::Mutex> guard{get_lock()};

    // This loop can take a while if there are many items.
    // In practice, the number of pending signal packets is
    // approximately the number of signaled _and_ watched
    // objects plus the number of pending user-queued
    // packets.
    //
    // There are two strategies to deal with too much
    // looping here if that is seen in practice.
    //
    // 1. Swap the |packets_| list for an empty list and
    //    release the lock. New arriving packets are
    //    added to the empty list while the loop happens.
    //    Readers will be blocked but the watched objects
    //    will be fully operational. Once processing
    //    is done the lists are appended.
    //
    // 2. Segregate user packets from signal packets
    //    and deliver them in order via timestamps or
    //    a side structure.

    bool packet_removed = false;

    for (auto it = packets_.begin(); it != packets_.end();) {
        if ((it->handle == handle) && (it->key() == key)) {
            auto to_remove = it++;
            // Deleting the observer (which owns the embedded packet) frees
            // the matched entry; `delete nullptr` is a no-op if ownership
            // was not transferred yet — see CanReap().
            delete packets_.erase(to_remove)->observer;
            --num_packets_;
            packet_removed = true;
        } else {
            ++it;
        }
    }

    return packet_removed;
}
418
419void PortDispatcher::LinkExceptionPort(ExceptionPort* eport) {
420    canary_.Assert();
421
422    Guard<fbl::Mutex> guard{get_lock()};
423    DEBUG_ASSERT_COND(eport->PortMatches(this, /* allow_null */ false));
424    DEBUG_ASSERT(!eport->InContainer());
425    eports_.push_back(fbl::move(AdoptRef(eport)));
426}
427
428void PortDispatcher::UnlinkExceptionPort(ExceptionPort* eport) {
429    canary_.Assert();
430
431    Guard<fbl::Mutex> guard{get_lock()};
432    DEBUG_ASSERT_COND(eport->PortMatches(this, /* allow_null */ true));
433    if (eport->InContainer()) {
434        eports_.erase(*eport);
435    }
436}
437