// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// Notes on buffering modes
// ------------------------
//
// Threads and strings are cached to improve performance and reduce buffer
// usage. The caching involves emitting separate records that identify
// threads/strings and then referring to them by a numeric id. For performance,
// each thread in the application maintains its own cache.
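// (Roughly: the first time a thread or string is seen, a record describing it
// is emitted and a small index is assigned via |AllocThreadIndex()| or
// |AllocStringIndex()| below; later records then refer to it by that index
// until the index space, e.g. |TRACE_ENCODED_THREAD_REF_MAX_INDEX|, is
// exhausted.)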
//
// Oneshot: The trace buffer is just one large buffer, and records are written
// until the buffer is full, after which all further records are dropped.
//
// Circular:
// The trace buffer is effectively divided into two pieces, and tracing begins
// by writing to the first piece. Once one buffer fills we start writing to the
// other one. This results in half the buffer being dropped at every switch,
// but simplifies things because we don't have to worry about varying record
// lengths.
//
// Streaming:
// The trace buffer is effectively divided into two pieces, and tracing begins
// by writing to the first piece. Once one buffer fills we start writing to the
// other buffer, if it is available, and notify the handler that the buffer is
// full. If the other buffer is not available, then records are dropped until
// it becomes available. The other buffer is unavailable between the point when
// it filled and when the handler reports back that the buffer's contents have
// been saved.
//
// There are two important properties we wish to preserve in circular and
// streaming modes:
// 1) We don't want records describing threads and strings to be dropped:
// otherwise records referring to them will have nothing to refer to.
// 2) We don't want thread records to be dropped at all: Fidelity of recording
// of all traced threads is important, even if some of their records are
// dropped.
// To implement both (1) and (2) we introduce a third buffer that holds
// records we don't want to drop, called the "durable buffer". Threads and
// small strings are recorded there. The two buffers holding normal trace
// output are called "rolling buffers"; as one fills we roll to the next.
// Thread and string records typically aren't very large, so the durable
// buffer can hold a lot of records. To keep things simple, until there's a
// compelling reason to do something more, once the durable buffer fills
// tracing effectively stops, and all further records are dropped.
// Note: The term "rolling buffer" is intended to be internal to the trace
// engine/reader/manager and is not intended to appear in public APIs
// (at least not today).
//
// The protocol between the trace engine and the handler for saving buffers in
// streaming mode is as follows:
// 1) Buffer fills -> handler gets notified via
//    |trace_handler_ops::notify_buffer_full()|. Two arguments are passed
//    along with this request:
//    |wrapped_count| - records how many times tracing has wrapped from one
//    buffer to the next, and also identifies the current buffer, which is the
//    one needing saving. Since there are two rolling buffers, the buffer to
//    save is |wrapped_count & 1|.
//    |durable_data_end| - records how much data has been written to the
//    durable buffer thus far. This data needs to be written before data from
//    the rolling buffers is written so that string and thread references work.
// 2) The handler receives the "notify_buffer_full" request.
// 3) The handler saves new durable data since the last time, saves the
//    rolling buffer, and replies back to the engine via
//    |trace_engine_mark_buffer_saved()|.
// 4) The engine receives this notification and marks the buffer as now empty.
//    The next time the engine tries to allocate space from this buffer it will
//    succeed.
// Note that the handler is free to save buffers at whatever rate it can
// manage. The protocol allows for records to be dropped if buffers can't be
// saved fast enough.
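//
// As a rough illustration (a sketch, not part of this file or the public API),
// a handler's notify_buffer_full callback might look like the following, where
// SaveDurableData() and SaveRollingBuffer() are hypothetical helpers provided
// by the handler, and |trace_engine_mark_buffer_saved()| is assumed to take
// back the same (wrapped_count, durable_data_end) pair it was notified with:
//
//   void MyHandler::OnNotifyBufferFull(uint32_t wrapped_count,
//                                      uint64_t durable_data_end) {
//       // Save durable data first: records in the rolling buffer refer to it.
//       SaveDurableData(durable_data_end);
//       // Of the two rolling buffers, |wrapped_count & 1| is the full one.
//       SaveRollingBuffer(wrapped_count & 1);
//       // Tell the engine the buffer may be reused.
//       trace_engine_mark_buffer_saved(wrapped_count, durable_data_end);
//   }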

#include "context_impl.h"

#include <assert.h>
#include <inttypes.h>
#include <stdio.h>
#include <string.h>

#include <fbl/atomic.h>
#include <fbl/auto_lock.h>
#include <trace-engine/fields.h>
#include <trace-engine/handler.h>

namespace trace {
namespace {

// The next context generation number.
fbl::atomic<uint32_t> g_next_generation{1u};

} // namespace
} // namespace trace

trace_context::trace_context(void* buffer, size_t buffer_num_bytes,
                             trace_buffering_mode_t buffering_mode,
                             trace_handler_t* handler)
    : generation_(trace::g_next_generation.fetch_add(1u, fbl::memory_order_relaxed) + 1u),
      buffering_mode_(buffering_mode),
      buffer_start_(reinterpret_cast<uint8_t*>(buffer)),
      buffer_end_(buffer_start_ + buffer_num_bytes),
      header_(reinterpret_cast<trace_buffer_header*>(buffer)),
      handler_(handler) {
    ZX_DEBUG_ASSERT(buffer_num_bytes >= kMinPhysicalBufferSize);
    ZX_DEBUG_ASSERT(buffer_num_bytes <= kMaxPhysicalBufferSize);
    ZX_DEBUG_ASSERT(generation_ != 0u);
    ComputeBufferSizes();
}

trace_context::~trace_context() = default;

uint64_t* trace_context::AllocRecord(size_t num_bytes) {
    ZX_DEBUG_ASSERT((num_bytes & 7) == 0);
    if (unlikely(num_bytes > TRACE_ENCODED_RECORD_MAX_LENGTH))
        return nullptr;
    static_assert(TRACE_ENCODED_RECORD_MAX_LENGTH < kMaxRollingBufferSize, "");

    // For the circular and streaming cases, try at most once for each buffer.
    // Note: Keep the normal case, one successful pass, on the fast path;
    // e.g., we don't do a mode comparison unless we have to.

    for (int iter = 0; iter < 2; ++iter) {
        // TODO(dje): This can be optimized a bit. Later.
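        // |rolling_buffer_current_| packs the wrapped count and the current
        // byte offset into a single atomic word (see |MakeOffsetPlusCounter()|),
        // so one fetch_add both reserves space and tells us which rolling
        // buffer we are writing to.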
        uint64_t offset_plus_counter =
            rolling_buffer_current_.fetch_add(num_bytes,
                                              fbl::memory_order_relaxed);
        uint32_t wrapped_count = GetWrappedCount(offset_plus_counter);
        int buffer_number = GetBufferNumber(wrapped_count);
        uint64_t buffer_offset = GetBufferOffset(offset_plus_counter);
        // Note: There's no worry of an overflow in the calcs here.
        if (likely(buffer_offset + num_bytes <= rolling_buffer_size_)) {
            uint8_t* ptr = rolling_buffer_start_[buffer_number] + buffer_offset;
            return reinterpret_cast<uint64_t*>(ptr); // success!
        }

        // Buffer is full!

        switch (buffering_mode_) {
        case TRACE_BUFFERING_MODE_ONESHOT:
            ZX_DEBUG_ASSERT(iter == 0);
            ZX_DEBUG_ASSERT(wrapped_count == 0);
            ZX_DEBUG_ASSERT(buffer_number == 0);
            MarkOneshotBufferFull(buffer_offset);
            return nullptr;
        case TRACE_BUFFERING_MODE_STREAMING: {
            MarkRollingBufferFull(wrapped_count, buffer_offset);
            // If the TraceManager is slow in saving buffers we could get
            // here a lot. Do a quick check and early exit for this case.
            if (unlikely(!IsOtherRollingBufferReady(buffer_number))) {
                MarkRecordDropped();
                StreamingBufferFullCheck(wrapped_count, buffer_offset);
                return nullptr;
            }
            break;
        }
        case TRACE_BUFFERING_MODE_CIRCULAR:
            MarkRollingBufferFull(wrapped_count, buffer_offset);
            break;
        default:
            __UNREACHABLE;
        }

        if (iter == 1) {
            // Second time through. We tried one buffer, it was full.
            // We then switched to the other buffer, which was empty at
            // the time, and now it is full too. This is technically
            // possible in either circular or streaming modes, but rare.
            // There are two possibilities here:
            // 1) Keep trying (gated by some means).
            // 2) Drop the record.
            // In order to not introduce excessive latency into the app
            // we choose (2). To assist the developer we at least provide
            // a record that this happened, but since it's rare we keep
            // it simple and maintain just a global count and no time
            // information.
            num_records_dropped_after_buffer_switch_.fetch_add(1, fbl::memory_order_relaxed);
            return nullptr;
        }

        if (!SwitchRollingBuffer(wrapped_count, buffer_offset)) {
            MarkRecordDropped();
            return nullptr;
        }

        // Loop and try again.
    }

    __UNREACHABLE;
}

void trace_context::StreamingBufferFullCheck(uint32_t wrapped_count,
                                             uint64_t buffer_offset) {
    // We allow the current offset to grow and grow as each
    // new tracing request is made: It's a trade-off to not penalize
    // performance in this case. The number of counter bits is enough
    // to not make this a concern: See the comments for
    // |kUsableBufferOffsetBits|.
    //
    // As an absolute paranoia check, if the current buffer offset
    // approaches overflow, grab the lock and snap the offset back
    // to the end of the buffer. We grab the lock so that the
    // buffer can't change while we're doing this.
    if (unlikely(buffer_offset > MaxUsableBufferOffset())) {
        fbl::AutoLock lock(&buffer_switch_mutex_);
        uint32_t current_wrapped_count = CurrentWrappedCount();
        if (GetBufferNumber(current_wrapped_count) ==
            GetBufferNumber(wrapped_count)) {
            SnapToEnd(wrapped_count);
        }
    }
}

// Returns false if there's some reason not to write this record.
bool trace_context::SwitchRollingBuffer(uint32_t wrapped_count,
                                        uint64_t buffer_offset) {
    // While atomic variables are used to track things, we switch
    // buffers under the lock due to multiple pieces of state being
    // changed.
    fbl::AutoLock lock(&buffer_switch_mutex_);

    // If the durable buffer happened to fill while we were waiting for
    // the lock, we're done.
    if (unlikely(tracing_artificially_stopped_)) {
        return false;
    }

    uint32_t current_wrapped_count = CurrentWrappedCount();
    // Anything allocated to the durable buffer after this point
    // won't be for this buffer. This is racy, but all we need is
    // some usable value for where the durable pointer is.
    uint64_t durable_data_end = DurableBytesAllocated();

    ZX_DEBUG_ASSERT(wrapped_count <= current_wrapped_count);
    if (likely(wrapped_count == current_wrapped_count)) {
        // Haven't switched buffers yet.
        if (buffering_mode_ == TRACE_BUFFERING_MODE_STREAMING) {
            // Is the other buffer ready?
            if (!IsOtherRollingBufferReady(GetBufferNumber(wrapped_count))) {
                // Nope. There are two possibilities here:
                // 1) Wait for the other buffer to be saved.
                // 2) Start dropping records until the other buffer is
                //    saved.
                // In order to not introduce excessive latency into the
                // app we choose (2). To assist the developer we at
                // least provide a record that indicates the window
                // during which we dropped records.
                // TODO(dje): Maybe have a future mode where we block
                // until there's space. This is useful during some
                // kinds of debugging: Something is going wrong and we
                // care less about performance and more about keeping
                // data, and the dropped data may be the clue to find
                // the bug.
                return false;
            }

            SwitchRollingBufferLocked(wrapped_count, buffer_offset);

            // Notify the handler so it starts saving the buffer if
            // we're in streaming mode.
            // Note: The actual notification must be done *after*
            // updating the buffer header: we need trace_manager to
            // see the updates. The handler will get notified on the
            // engine's async loop (and thus can't call back into us
            // while we still have the lock).
            NotifyRollingBufferFullLocked(wrapped_count, durable_data_end);
        } else {
            SwitchRollingBufferLocked(wrapped_count, buffer_offset);
        }
    } else {
        // Someone else switched buffers while we were trying to obtain
        // the lock. Nothing to do here.
    }

    return true;
}

uint64_t* trace_context::AllocDurableRecord(size_t num_bytes) {
    ZX_DEBUG_ASSERT(UsingDurableBuffer());
    ZX_DEBUG_ASSERT((num_bytes & 7) == 0);

    uint64_t buffer_offset =
        durable_buffer_current_.fetch_add(num_bytes,
                                          fbl::memory_order_relaxed);
    if (likely(buffer_offset + num_bytes <= durable_buffer_size_)) {
        uint8_t* ptr = durable_buffer_start_ + buffer_offset;
        return reinterpret_cast<uint64_t*>(ptr); // success!
    }

    // Buffer is full!
    MarkDurableBufferFull(buffer_offset);

    return nullptr;
}

bool trace_context::AllocThreadIndex(trace_thread_index_t* out_index) {
    trace_thread_index_t index = next_thread_index_.fetch_add(1u, fbl::memory_order_relaxed);
    if (unlikely(index > TRACE_ENCODED_THREAD_REF_MAX_INDEX)) {
        // Guard against possible wrapping.
        next_thread_index_.store(TRACE_ENCODED_THREAD_REF_MAX_INDEX + 1u,
                                 fbl::memory_order_relaxed);
        return false;
    }
    *out_index = index;
    return true;
}

bool trace_context::AllocStringIndex(trace_string_index_t* out_index) {
    trace_string_index_t index = next_string_index_.fetch_add(1u, fbl::memory_order_relaxed);
    if (unlikely(index > TRACE_ENCODED_STRING_REF_MAX_INDEX)) {
        // Guard against possible wrapping.
        next_string_index_.store(TRACE_ENCODED_STRING_REF_MAX_INDEX + 1u,
                                 fbl::memory_order_relaxed);
        return false;
    }
    *out_index = index;
    return true;
}

void trace_context::ComputeBufferSizes() {
    size_t full_buffer_size = buffer_end_ - buffer_start_;
    ZX_DEBUG_ASSERT(full_buffer_size >= kMinPhysicalBufferSize);
    ZX_DEBUG_ASSERT(full_buffer_size <= kMaxPhysicalBufferSize);
    size_t header_size = sizeof(trace_buffer_header);
    switch (buffering_mode_) {
    case TRACE_BUFFERING_MODE_ONESHOT:
        // Create one big buffer, where durable and non-durable records share
        // the same buffer. There is no separate buffer for durable records.
        durable_buffer_start_ = nullptr;
        durable_buffer_size_ = 0;
        rolling_buffer_start_[0] = buffer_start_ + header_size;
        rolling_buffer_size_ = full_buffer_size - header_size;
        // The second rolling buffer is not used.
        rolling_buffer_start_[1] = nullptr;
        break;
    case TRACE_BUFFERING_MODE_CIRCULAR:
    case TRACE_BUFFERING_MODE_STREAMING: {
        // Rather than make things more complex for the user, at least for now,
        // we choose the sizes of the durable and rolling buffers.
        // Note: The durable buffer must have enough space for at least
        // the initialization record.
        // TODO(dje): The current choices are wip.
        uint64_t avail = full_buffer_size - header_size;
        uint64_t durable_buffer_size = GET_DURABLE_BUFFER_SIZE(avail);
        if (durable_buffer_size > kMaxDurableBufferSize)
            durable_buffer_size = kMaxDurableBufferSize;
        // Further adjust |durable_buffer_size| to ensure all buffers are a
        // multiple of 8. |full_buffer_size| is guaranteed by
        // |trace_start_engine()| to be a multiple of 4096. We only assume
        // header_size is a multiple of 8. In order for rolling_buffer_size
        // to be a multiple of 8 we need (avail - durable_buffer_size) to be a
        // multiple of 16. Round durable_buffer_size up as necessary.
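        // (Worked example with made-up numbers: if avail were 0x10000 and
        // durable_buffer_size came out as 0x1008, then avail - durable would
        // be 0xeff8, which is 8 mod 16; rounding the durable size up to
        // 0x1010 leaves 0xeff0, so each rolling buffer is 0x77f8 bytes, a
        // multiple of 8.)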
        uint64_t off_by = (avail - durable_buffer_size) & 15;
        ZX_DEBUG_ASSERT(off_by == 0 || off_by == 8);
        durable_buffer_size += off_by;
        ZX_DEBUG_ASSERT((durable_buffer_size & 7) == 0);
        // The value of |kMinPhysicalBufferSize| ensures this:
        ZX_DEBUG_ASSERT(durable_buffer_size >= kMinDurableBufferSize);
        uint64_t rolling_buffer_size = (avail - durable_buffer_size) / 2;
        ZX_DEBUG_ASSERT((rolling_buffer_size & 7) == 0);
        // We need to maintain the invariant that the entire buffer is used.
        // This works if the buffer size is a multiple of
        // sizeof(trace_buffer_header), which is true since the buffer is a
        // vmo (some number of 4K pages).
        ZX_DEBUG_ASSERT(durable_buffer_size + 2 * rolling_buffer_size == avail);
        durable_buffer_start_ = buffer_start_ + header_size;
        durable_buffer_size_ = durable_buffer_size;
        rolling_buffer_start_[0] = durable_buffer_start_ + durable_buffer_size_;
        rolling_buffer_start_[1] = rolling_buffer_start_[0] + rolling_buffer_size;
        rolling_buffer_size_ = rolling_buffer_size;
        break;
    }
    default:
        __UNREACHABLE;
    }

    durable_buffer_current_.store(0);
    durable_buffer_full_mark_.store(0);
    rolling_buffer_current_.store(0);
    rolling_buffer_full_mark_[0].store(0);
    rolling_buffer_full_mark_[1].store(0);
}

void trace_context::InitBufferHeader() {
    memset(header_, 0, sizeof(*header_));

    header_->magic = TRACE_BUFFER_HEADER_MAGIC;
    header_->version = TRACE_BUFFER_HEADER_V0;
    header_->buffering_mode = static_cast<uint8_t>(buffering_mode_);
    header_->total_size = buffer_end_ - buffer_start_;
    header_->durable_buffer_size = durable_buffer_size_;
    header_->rolling_buffer_size = rolling_buffer_size_;
}

void trace_context::UpdateBufferHeaderAfterStopped() {
    // If the buffer filled, then the current pointer is "snapped" to the end.
    // Therefore in that case we need to use the buffer_full_mark.
    uint64_t durable_last_offset = durable_buffer_current_.load(fbl::memory_order_relaxed);
    uint64_t durable_buffer_full_mark = durable_buffer_full_mark_.load(fbl::memory_order_relaxed);
    if (durable_buffer_full_mark != 0)
        durable_last_offset = durable_buffer_full_mark;
    header_->durable_data_end = durable_last_offset;

    uint64_t offset_plus_counter =
        rolling_buffer_current_.load(fbl::memory_order_relaxed);
    uint64_t last_offset = GetBufferOffset(offset_plus_counter);
    uint32_t wrapped_count = GetWrappedCount(offset_plus_counter);
    header_->wrapped_count = wrapped_count;
    int buffer_number = GetBufferNumber(wrapped_count);
    uint64_t buffer_full_mark = rolling_buffer_full_mark_[buffer_number].load(fbl::memory_order_relaxed);
    if (buffer_full_mark != 0)
        last_offset = buffer_full_mark;
    header_->rolling_data_end[buffer_number] = last_offset;

    header_->num_records_dropped = num_records_dropped();
}

size_t trace_context::RollingBytesAllocated() const {
    switch (buffering_mode_) {
    case TRACE_BUFFERING_MODE_ONESHOT: {
        // There is a window during the processing of buffer-full where
        // |rolling_buffer_current_| may point beyond the end of the buffer.
        // This is ok, we don't promise anything better.
        uint64_t full_bytes = rolling_buffer_full_mark_[0].load(fbl::memory_order_relaxed);
        if (full_bytes != 0)
            return full_bytes;
        return rolling_buffer_current_.load(fbl::memory_order_relaxed);
    }
    case TRACE_BUFFERING_MODE_CIRCULAR:
    case TRACE_BUFFERING_MODE_STREAMING: {
        // Obtain the lock so that the buffers aren't switched on us while
        // we're trying to compute the total.
        fbl::AutoLock lock(&buffer_switch_mutex_);
        uint64_t offset_plus_counter =
            rolling_buffer_current_.load(fbl::memory_order_relaxed);
        uint32_t wrapped_count = GetWrappedCount(offset_plus_counter);
        int buffer_number = GetBufferNumber(wrapped_count);
        // Note: If we catch things at the point where the buffer has
        // filled, but before we swap buffers, then |buffer_offset| can point
        // beyond the end. This is ok, we don't promise anything better.
        uint64_t buffer_offset = GetBufferOffset(offset_plus_counter);
        if (wrapped_count == 0)
            return buffer_offset;
        // We've wrapped at least once, so the other buffer's "full mark"
        // must have been set at some point. However, it may currently be zero
        // in streaming mode if we happened to stop right after that buffer was
        // saved and before it was written to again.
        uint64_t full_mark_other_buffer = rolling_buffer_full_mark_[!buffer_number].load(fbl::memory_order_relaxed);
        return full_mark_other_buffer + buffer_offset;
    }
    default:
        __UNREACHABLE;
    }
}

size_t trace_context::DurableBytesAllocated() const {
    // Note: This will return zero in oneshot mode (as it should).
    uint64_t offset = durable_buffer_full_mark_.load(fbl::memory_order_relaxed);
    if (offset == 0)
        offset = durable_buffer_current_.load(fbl::memory_order_relaxed);
    return offset;
}

void trace_context::MarkDurableBufferFull(uint64_t last_offset) {
    // Snap to the end to reduce the likelihood of the offset wrapping around.
    // Otherwise each new attempt will continually increase the offset.
    durable_buffer_current_.store(static_cast<uint64_t>(durable_buffer_size_),
                                  fbl::memory_order_relaxed);

    // Mark the end point if not already marked.
    uint64_t expected_mark = 0u;
    if (durable_buffer_full_mark_.compare_exchange_strong(
            &expected_mark, last_offset,
            fbl::memory_order_relaxed, fbl::memory_order_relaxed)) {
        fprintf(stderr, "TraceEngine: durable buffer full @offset %" PRIu64 "\n",
                last_offset);
        header_->durable_data_end = last_offset;

        // A record may be written that relies on this durable record.
        // To preserve data integrity, we disable all further tracing.
        // There is a small window where a non-durable record could get
        // emitted that depends on this durable record. It's rare
        // enough and inconsequential enough that we ignore it.
        // TODO(dje): Another possibility is we could let tracing
        // continue and start allocating future durable records in the
        // rolling buffers, and accept potentially lost durable
        // records. Another possibility is to remove the durable buffer,
        // and, say, have separate caches for each rolling buffer.
        MarkTracingArtificiallyStopped();
    }
}

void trace_context::MarkOneshotBufferFull(uint64_t last_offset) {
    SnapToEnd(0);

    // Mark the end point if not already marked.
    uint64_t expected_mark = 0u;
    if (rolling_buffer_full_mark_[0].compare_exchange_strong(
            &expected_mark, last_offset,
            fbl::memory_order_relaxed, fbl::memory_order_relaxed)) {
        header_->rolling_data_end[0] = last_offset;
    }

    MarkRecordDropped();
}

void trace_context::MarkRollingBufferFull(uint32_t wrapped_count, uint64_t last_offset) {
    // Mark the end point if not already marked.
    int buffer_number = GetBufferNumber(wrapped_count);
    uint64_t expected_mark = 0u;
    if (rolling_buffer_full_mark_[buffer_number].compare_exchange_strong(
            &expected_mark, last_offset,
            fbl::memory_order_relaxed, fbl::memory_order_relaxed)) {
        header_->rolling_data_end[buffer_number] = last_offset;
    }
}

void trace_context::SwitchRollingBufferLocked(uint32_t prev_wrapped_count,
                                              uint64_t prev_last_offset) {
    // This has already been done in streaming mode when the buffer was marked
    // as saved, but hasn't been done yet for circular mode. KISS and just do
    // it again. It's ok to do it again as we don't resume allocating trace
    // records until we update |rolling_buffer_current_|.
    uint32_t new_wrapped_count = prev_wrapped_count + 1;
    int next_buffer = GetBufferNumber(new_wrapped_count);
    rolling_buffer_full_mark_[next_buffer].store(0, fbl::memory_order_relaxed);
    header_->rolling_data_end[next_buffer] = 0;

    // Do this last: After this, tracing resumes in the new buffer.
    uint64_t new_offset_plus_counter = MakeOffsetPlusCounter(0, new_wrapped_count);
    rolling_buffer_current_.store(new_offset_plus_counter,
                                  fbl::memory_order_relaxed);
}

void trace_context::MarkTracingArtificiallyStopped() {
    // Grab the lock in part so that we don't switch buffers between
    // |CurrentWrappedCount()| and |SnapToEnd()|.
    fbl::AutoLock lock(&buffer_switch_mutex_);

    // Disable tracing by making it look like the current rolling
    // buffer is full. AllocRecord, on seeing the buffer is full, will
    // then check |tracing_artificially_stopped_|.
    tracing_artificially_stopped_ = true;
    SnapToEnd(CurrentWrappedCount());
}

void trace_context::NotifyRollingBufferFullLocked(uint32_t wrapped_count,
                                                  uint64_t durable_data_end) {
    // The notification is handled on the engine's event loop as
    // we need this done outside of the lock: Certain handlers
    // (e.g., trace-benchmark) just want to immediately call
    // |trace_engine_mark_buffer_saved()|, which wants to reacquire
    // the lock. Secondly, if we choose to wait until the buffer context is
    // released before notifying the handler then we can't do so now as we
    // still have a reference to the buffer context.
    trace_engine_request_save_buffer(wrapped_count, durable_data_end);
}

void trace_context::HandleSaveRollingBufferRequest(uint32_t wrapped_count,
                                                   uint64_t durable_data_end) {
    // TODO(dje): An open issue is solving the problem of TraceManager
    // prematurely reading the buffer: We know the buffer is full, but
    // the only way we know existing writers have completed is when
    // they release their trace context. Fortunately we know when all
    // context acquisitions for the purpose of writing to the buffer
    // have been released. The question is how to use this info.
    // For now we punt the problem to the handler. Ultimately we could
    // provide callers with a way to wait, and have trace_release_context()
    // check for waiters and if any are present send a signal like it does
    // for SIGNAL_CONTEXT_RELEASED.
    handler_->ops->notify_buffer_full(handler_, wrapped_count,
                                      durable_data_end);
}

void trace_context::MarkRollingBufferSaved(uint32_t wrapped_count,
                                           uint64_t durable_data_end) {
    fbl::AutoLock lock(&buffer_switch_mutex_);

    int buffer_number = GetBufferNumber(wrapped_count);
    {
        // TODO(dje): Manage bad responses from TraceManager.
        int current_buffer_number = GetBufferNumber(GetWrappedCount(rolling_buffer_current_.load(fbl::memory_order_relaxed)));
        ZX_DEBUG_ASSERT(buffer_number != current_buffer_number);
    }
    rolling_buffer_full_mark_[buffer_number].store(0, fbl::memory_order_relaxed);
    header_->rolling_data_end[buffer_number] = 0;
    // Don't update |rolling_buffer_current_| here; that is done when we
    // successfully allocate the next record. Until then we want to keep
    // dropping records.
}