// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// Notes on buffering modes
// ------------------------
//
// Threads and strings are cached to improve performance and reduce buffer
// usage. The caching involves emitting separate records that identify
// threads/strings and then referring to them by a numeric id. For performance
// each thread in the application maintains its own cache.
//
// Oneshot:
// The trace buffer is just one large buffer, and records are written until
// the buffer is full, after which all further records are dropped.
//
// Circular:
// The trace buffer is effectively divided into two pieces, and tracing begins
// by writing to the first piece. Once one buffer fills we start writing to the
// other one. This results in half the buffer being dropped at every switch,
// but simplifies things because we don't have to worry about varying record
// lengths.
//
// Streaming:
// The trace buffer is effectively divided into two pieces, and tracing begins
// by writing to the first piece. Once one buffer fills we start writing to the
// other buffer, if it is available, and notify the handler that the buffer is
// full. If the other buffer is not available, then records are dropped until
// it becomes available. The other buffer is unavailable between the point when
// it filled and when the handler reports back that the buffer's contents have
// been saved.
//
// There are two important properties we wish to preserve in circular and
// streaming modes:
// 1) We don't want records describing threads and strings to be dropped:
//    otherwise records referring to them will have nothing to refer to.
// 2) We don't want thread records to be dropped at all: Fidelity of recording
//    of all traced threads is important, even if some of their records are
//    dropped.
// To implement both (1) and (2) we introduce a third buffer that holds
// records we don't want to drop called the "durable buffer". Threads and
// small strings are recorded there. The two buffers holding normal trace
// output are called "rolling buffers", as they fill we roll from one to the
// next. Thread and string records typically aren't very large, so the durable
// buffer can hold a lot of records. To keep things simple, until there's a
// compelling reason to do something more, once the durable buffer fills
// tracing effectively stops, and all further records are dropped.
// Note: The term "rolling buffer" is intended to be internal to the trace
// engine/reader/manager and is not intended to appear in public APIs
// (at least not today).
//
// The protocol between the trace engine and the handler for saving buffers in
// streaming mode is as follows:
// 1) Buffer fills -> handler gets notified via
//    |trace_handler_ops::notify_buffer_full()|. Two arguments are passed
//    along with this request:
//    |wrapped_count| - records how many times tracing has wrapped from one
//    buffer to the next, and also records the current buffer which is the one
//    needing saving. Since there are two rolling buffers, the buffer to save
//    is |wrapped_count & 1|.
//    |durable_data_end| - records how much data has been written to the
//    durable buffer thus far. This data needs to be written before data from
//    the rolling buffers is written so string and thread references work.
// 2) The handler receives the "notify_buffer_full" request.
// 3) The handler saves new durable data since the last time, saves the
//    rolling buffer, and replies back to the engine via
//    |trace_engine_mark_buffer_saved()|.
// 4) The engine receives this notification and marks the buffer as now empty.
//    The next time the engine tries to allocate space from this buffer it will
//    succeed.
70// Note that the handler is free to save buffers at whatever rate it can 71// manage. The protocol allows for records to be dropped if buffers can't be 72// saved fast enough. 73 74#include "context_impl.h" 75 76#include <assert.h> 77#include <inttypes.h> 78 79#include <fbl/atomic.h> 80#include <fbl/auto_lock.h> 81#include <trace-engine/fields.h> 82#include <trace-engine/handler.h> 83 84namespace trace { 85namespace { 86 87// The next context generation number. 88fbl::atomic<uint32_t> g_next_generation{1u}; 89 90} // namespace 91} // namespace trace 92 93trace_context::trace_context(void* buffer, size_t buffer_num_bytes, 94 trace_buffering_mode_t buffering_mode, 95 trace_handler_t* handler) 96 : generation_(trace::g_next_generation.fetch_add(1u, fbl::memory_order_relaxed) + 1u), 97 buffering_mode_(buffering_mode), 98 buffer_start_(reinterpret_cast<uint8_t*>(buffer)), 99 buffer_end_(buffer_start_ + buffer_num_bytes), 100 header_(reinterpret_cast<trace_buffer_header*>(buffer)), 101 handler_(handler) { 102 ZX_DEBUG_ASSERT(buffer_num_bytes >= kMinPhysicalBufferSize); 103 ZX_DEBUG_ASSERT(buffer_num_bytes <= kMaxPhysicalBufferSize); 104 ZX_DEBUG_ASSERT(generation_ != 0u); 105 ComputeBufferSizes(); 106} 107 108trace_context::~trace_context() = default; 109 110uint64_t* trace_context::AllocRecord(size_t num_bytes) { 111 ZX_DEBUG_ASSERT((num_bytes & 7) == 0); 112 if (unlikely(num_bytes > TRACE_ENCODED_RECORD_MAX_LENGTH)) 113 return nullptr; 114 static_assert(TRACE_ENCODED_RECORD_MAX_LENGTH < kMaxRollingBufferSize, ""); 115 116 // For the circular and streaming cases, try at most once for each buffer. 117 // Note: Keep the normal case of one successful pass the fast path. 118 // E.g., We don't do a mode comparison unless we have to. 119 120 for (int iter = 0; iter < 2; ++iter) { 121 // TODO(dje): This can be optimized a bit. Later. 
122 uint64_t offset_plus_counter = 123 rolling_buffer_current_.fetch_add(num_bytes, 124 fbl::memory_order_relaxed); 125 uint32_t wrapped_count = GetWrappedCount(offset_plus_counter); 126 int buffer_number = GetBufferNumber(wrapped_count); 127 uint64_t buffer_offset = GetBufferOffset(offset_plus_counter); 128 // Note: There's no worry of an overflow in the calcs here. 129 if (likely(buffer_offset + num_bytes <= rolling_buffer_size_)) { 130 uint8_t* ptr = rolling_buffer_start_[buffer_number] + buffer_offset; 131 return reinterpret_cast<uint64_t*>(ptr); // success! 132 } 133 134 // Buffer is full! 135 136 switch (buffering_mode_) { 137 case TRACE_BUFFERING_MODE_ONESHOT: 138 ZX_DEBUG_ASSERT(iter == 0); 139 ZX_DEBUG_ASSERT(wrapped_count == 0); 140 ZX_DEBUG_ASSERT(buffer_number == 0); 141 MarkOneshotBufferFull(buffer_offset); 142 return nullptr; 143 case TRACE_BUFFERING_MODE_STREAMING: { 144 MarkRollingBufferFull(wrapped_count, buffer_offset); 145 // If the TraceManager is slow in saving buffers we could get 146 // here a lot. Do a quick check and early exit for this case. 147 if (unlikely(!IsOtherRollingBufferReady(buffer_number))) { 148 MarkRecordDropped(); 149 StreamingBufferFullCheck(wrapped_count, buffer_offset); 150 return nullptr; 151 } 152 break; 153 } 154 case TRACE_BUFFERING_MODE_CIRCULAR: 155 MarkRollingBufferFull(wrapped_count, buffer_offset); 156 break; 157 default: 158 __UNREACHABLE; 159 } 160 161 if (iter == 1) { 162 // Second time through. We tried one buffer, it was full. 163 // We then switched to the other buffer, which was empty at 164 // the time, and now it is full too. This is technically 165 // possible in either circular or streaming modes, but rare. 166 // There are two possibilities here: 167 // 1) Keep trying (gated by some means). 168 // 2) Drop the record. 169 // In order to not introduce excessive latency into the app 170 // we choose (2). 
To assist the developer we at least provide 171 // a record that this happened, but since it's rare we keep 172 // it simple and maintain just a global count and no time 173 // information. 174 num_records_dropped_after_buffer_switch_.fetch_add(1, fbl::memory_order_relaxed); 175 return nullptr; 176 } 177 178 if (!SwitchRollingBuffer(wrapped_count, buffer_offset)) { 179 MarkRecordDropped(); 180 return nullptr; 181 } 182 183 // Loop and try again. 184 } 185 186 __UNREACHABLE; 187} 188 189void trace_context::StreamingBufferFullCheck(uint32_t wrapped_count, 190 uint64_t buffer_offset) { 191 // We allow the current offset to grow and grow as each 192 // new tracing request is made: It's a trade-off to not penalize 193 // performance in this case. The number of counter bits is enough 194 // to not make this a concern: See the comments for 195 // |kUsableBufferOffsetBits|. 196 // 197 // As an absolute paranoia check, if the current buffer offset 198 // approaches overflow, grab the lock and snap the offset back 199 // to the end of the buffer. We grab the lock so that the 200 // buffer can't change while we're doing this. 201 if (unlikely(buffer_offset > MaxUsableBufferOffset())) { 202 fbl::AutoLock lock(&buffer_switch_mutex_); 203 uint32_t current_wrapped_count = CurrentWrappedCount(); 204 if (GetBufferNumber(current_wrapped_count) == 205 GetBufferNumber(wrapped_count)) { 206 SnapToEnd(wrapped_count); 207 } 208 } 209} 210 211// Returns false if there's some reason to not record this record. 212 213bool trace_context::SwitchRollingBuffer(uint32_t wrapped_count, 214 uint64_t buffer_offset) { 215 // While atomic variables are used to track things, we switch 216 // buffers under the lock due to multiple pieces of state being 217 // changed. 218 fbl::AutoLock lock(&buffer_switch_mutex_); 219 220 // If the durable buffer happened to fill while we were waiting for 221 // the lock we're done. 
222 if (unlikely(tracing_artificially_stopped_)) { 223 return false; 224 } 225 226 uint32_t current_wrapped_count = CurrentWrappedCount(); 227 // Anything allocated to the durable buffer after this point 228 // won't be for this buffer. This is racy, but all we need is 229 // some usable value for where the durable pointer is. 230 uint64_t durable_data_end = DurableBytesAllocated(); 231 232 ZX_DEBUG_ASSERT(wrapped_count <= current_wrapped_count); 233 if (likely(wrapped_count == current_wrapped_count)) { 234 // Haven't switched buffers yet. 235 if (buffering_mode_ == TRACE_BUFFERING_MODE_STREAMING) { 236 // Is the other buffer ready? 237 if (!IsOtherRollingBufferReady(GetBufferNumber(wrapped_count))) { 238 // Nope. There are two possibilities here: 239 // 1) Wait for the other buffer to be saved. 240 // 2) Start dropping records until the other buffer is 241 // saved. 242 // In order to not introduce excessive latency into the 243 // app we choose (2). To assist the developer we at 244 // least provide a record that indicates the window 245 // during which we dropped records. 246 // TODO(dje): Maybe have a future mode where we block 247 // until there's space. This is useful during some 248 // kinds of debugging: Something is going wrong and we 249 // care less about performance and more about keeping 250 // data, and the dropped data may be the clue to find 251 // the bug. 252 return false; 253 } 254 255 SwitchRollingBufferLocked(wrapped_count, buffer_offset); 256 257 // Notify the handler so it starts saving the buffer if 258 // we're in streaming mode. 259 // Note: The actual notification must be done *after* 260 // updating the buffer header: we need trace_manager to 261 // see the updates. The handler will get notified on the 262 // engine's async loop (and thus can't call back into us 263 // while we still have the lock). 
264 NotifyRollingBufferFullLocked(wrapped_count, durable_data_end); 265 } else { 266 SwitchRollingBufferLocked(wrapped_count, buffer_offset); 267 } 268 } else { 269 // Someone else switched buffers while we were trying to obtain 270 // the lock. Nothing to do here. 271 } 272 273 return true; 274} 275 276uint64_t* trace_context::AllocDurableRecord(size_t num_bytes) { 277 ZX_DEBUG_ASSERT(UsingDurableBuffer()); 278 ZX_DEBUG_ASSERT((num_bytes & 7) == 0); 279 280 uint64_t buffer_offset = 281 durable_buffer_current_.fetch_add(num_bytes, 282 fbl::memory_order_relaxed); 283 if (likely(buffer_offset + num_bytes <= durable_buffer_size_)) { 284 uint8_t* ptr = durable_buffer_start_ + buffer_offset; 285 return reinterpret_cast<uint64_t*>(ptr); // success! 286 } 287 288 // Buffer is full! 289 MarkDurableBufferFull(buffer_offset); 290 291 return nullptr; 292} 293 294bool trace_context::AllocThreadIndex(trace_thread_index_t* out_index) { 295 trace_thread_index_t index = next_thread_index_.fetch_add(1u, fbl::memory_order_relaxed); 296 if (unlikely(index > TRACE_ENCODED_THREAD_REF_MAX_INDEX)) { 297 // Guard again possible wrapping. 298 next_thread_index_.store(TRACE_ENCODED_THREAD_REF_MAX_INDEX + 1u, 299 fbl::memory_order_relaxed); 300 return false; 301 } 302 *out_index = index; 303 return true; 304} 305 306bool trace_context::AllocStringIndex(trace_string_index_t* out_index) { 307 trace_string_index_t index = next_string_index_.fetch_add(1u, fbl::memory_order_relaxed); 308 if (unlikely(index > TRACE_ENCODED_STRING_REF_MAX_INDEX)) { 309 // Guard again possible wrapping. 
310 next_string_index_.store(TRACE_ENCODED_STRING_REF_MAX_INDEX + 1u, 311 fbl::memory_order_relaxed); 312 return false; 313 } 314 *out_index = index; 315 return true; 316} 317 318void trace_context::ComputeBufferSizes() { 319 size_t full_buffer_size = buffer_end_ - buffer_start_; 320 ZX_DEBUG_ASSERT(full_buffer_size >= kMinPhysicalBufferSize); 321 ZX_DEBUG_ASSERT(full_buffer_size <= kMaxPhysicalBufferSize); 322 size_t header_size = sizeof(trace_buffer_header); 323 switch (buffering_mode_) { 324 case TRACE_BUFFERING_MODE_ONESHOT: 325 // Create one big buffer, where durable and non-durable records share 326 // the same buffer. There is no separate buffer for durable records. 327 durable_buffer_start_ = nullptr; 328 durable_buffer_size_ = 0; 329 rolling_buffer_start_[0] = buffer_start_ + header_size; 330 rolling_buffer_size_ = full_buffer_size - header_size; 331 // The second rolling buffer is not used. 332 rolling_buffer_start_[1] = nullptr; 333 break; 334 case TRACE_BUFFERING_MODE_CIRCULAR: 335 case TRACE_BUFFERING_MODE_STREAMING: { 336 // Rather than make things more complex on the user, at least for now, 337 // we choose the sizes of the durable and rolling buffers. 338 // Note: The durable buffer must have enough space for at least 339 // the initialization record. 340 // TODO(dje): The current choices are wip. 341 uint64_t avail = full_buffer_size - header_size; 342 uint64_t durable_buffer_size = GET_DURABLE_BUFFER_SIZE(avail); 343 if (durable_buffer_size > kMaxDurableBufferSize) 344 durable_buffer_size = kMaxDurableBufferSize; 345 // Further adjust |durable_buffer_size| to ensure all buffers are a 346 // multiple of 8. |full_buffer_size| is guaranteed by 347 // |trace_start_engine()| to be a multiple of 4096. We only assume 348 // header_size is a multiple of 8. In order for rolling_buffer_size 349 // to be a multiple of 8 we need (avail - durable_buffer_size) to be a 350 // multiple of 16. Round durable_buffer_size up as necessary. 
351 uint64_t off_by = (avail - durable_buffer_size) & 15; 352 ZX_DEBUG_ASSERT(off_by == 0 || off_by == 8); 353 durable_buffer_size += off_by; 354 ZX_DEBUG_ASSERT((durable_buffer_size & 7) == 0); 355 // The value of |kMinPhysicalBufferSize| ensures this: 356 ZX_DEBUG_ASSERT(durable_buffer_size >= kMinDurableBufferSize); 357 uint64_t rolling_buffer_size = (avail - durable_buffer_size) / 2; 358 ZX_DEBUG_ASSERT((rolling_buffer_size & 7) == 0); 359 // We need to maintain the invariant that the entire buffer is used. 360 // This works if the buffer size is a multiple of 361 // sizeof(trace_buffer_header), which is true since the buffer is a 362 // vmo (some number of 4K pages). 363 ZX_DEBUG_ASSERT(durable_buffer_size + 2 * rolling_buffer_size == avail); 364 durable_buffer_start_ = buffer_start_ + header_size; 365 durable_buffer_size_ = durable_buffer_size; 366 rolling_buffer_start_[0] = durable_buffer_start_ + durable_buffer_size_; 367 rolling_buffer_start_[1] = rolling_buffer_start_[0] + rolling_buffer_size; 368 rolling_buffer_size_ = rolling_buffer_size; 369 break; 370 } 371 default: 372 __UNREACHABLE; 373 } 374 375 durable_buffer_current_.store(0); 376 durable_buffer_full_mark_.store(0); 377 rolling_buffer_current_.store(0); 378 rolling_buffer_full_mark_[0].store(0); 379 rolling_buffer_full_mark_[1].store(0); 380} 381 382void trace_context::InitBufferHeader() { 383 memset(header_, 0, sizeof(*header_)); 384 385 header_->magic = TRACE_BUFFER_HEADER_MAGIC; 386 header_->version = TRACE_BUFFER_HEADER_V0; 387 header_->buffering_mode = static_cast<uint8_t>(buffering_mode_); 388 header_->total_size = buffer_end_ - buffer_start_; 389 header_->durable_buffer_size = durable_buffer_size_; 390 header_->rolling_buffer_size = rolling_buffer_size_; 391} 392 393void trace_context::UpdateBufferHeaderAfterStopped() { 394 // If the buffer filled, then the current pointer is "snapped" to the end. 395 // Therefore in that case we need to use the buffer_full_mark. 
396 uint64_t durable_last_offset = durable_buffer_current_.load(fbl::memory_order_relaxed); 397 uint64_t durable_buffer_full_mark = durable_buffer_full_mark_.load(fbl::memory_order_relaxed); 398 if (durable_buffer_full_mark != 0) 399 durable_last_offset = durable_buffer_full_mark; 400 header_->durable_data_end = durable_last_offset; 401 402 uint64_t offset_plus_counter = 403 rolling_buffer_current_.load(fbl::memory_order_relaxed); 404 uint64_t last_offset = GetBufferOffset(offset_plus_counter); 405 uint32_t wrapped_count = GetWrappedCount(offset_plus_counter); 406 header_->wrapped_count = wrapped_count; 407 int buffer_number = GetBufferNumber(wrapped_count); 408 uint64_t buffer_full_mark = rolling_buffer_full_mark_[buffer_number].load(fbl::memory_order_relaxed); 409 if (buffer_full_mark != 0) 410 last_offset = buffer_full_mark; 411 header_->rolling_data_end[buffer_number] = last_offset; 412 413 header_->num_records_dropped = num_records_dropped(); 414} 415 416size_t trace_context::RollingBytesAllocated() const { 417 switch (buffering_mode_) { 418 case TRACE_BUFFERING_MODE_ONESHOT: { 419 // There is a window during the processing of buffer-full where 420 // |rolling_buffer_current_| may point beyond the end of the buffer. 421 // This is ok, we don't promise anything better. 422 uint64_t full_bytes = rolling_buffer_full_mark_[0].load(fbl::memory_order_relaxed); 423 if (full_bytes != 0) 424 return full_bytes; 425 return rolling_buffer_current_.load(fbl::memory_order_relaxed); 426 } 427 case TRACE_BUFFERING_MODE_CIRCULAR: 428 case TRACE_BUFFERING_MODE_STREAMING: { 429 // Obtain the lock so that the buffers aren't switched on us while 430 // we're trying to compute the total. 
431 fbl::AutoLock lock(&buffer_switch_mutex_); 432 uint64_t offset_plus_counter = 433 rolling_buffer_current_.load(fbl::memory_order_relaxed); 434 uint32_t wrapped_count = GetWrappedCount(offset_plus_counter); 435 int buffer_number = GetBufferNumber(wrapped_count); 436 // Note: If we catch things at the point where the buffer has 437 // filled, but before we swap buffers, then |buffer_offset| can point 438 // beyond the end. This is ok, we don't promise anything better. 439 uint64_t buffer_offset = GetBufferOffset(offset_plus_counter); 440 if (wrapped_count == 0) 441 return buffer_offset; 442 // We've wrapped at least once, so the other buffer's "full mark" 443 // must be set. However, it may be zero if streaming and we happened 444 // to stop at a point where the buffer was saved, and hasn't 445 // subsequently been written to. 446 uint64_t full_mark_other_buffer = rolling_buffer_full_mark_[!buffer_number].load(fbl::memory_order_relaxed); 447 return full_mark_other_buffer + buffer_offset; 448 } 449 default: 450 __UNREACHABLE; 451 } 452} 453 454size_t trace_context::DurableBytesAllocated() const { 455 // Note: This will return zero in oneshot mode (as it should). 456 uint64_t offset = durable_buffer_full_mark_.load(fbl::memory_order_relaxed); 457 if (offset == 0) 458 offset = durable_buffer_current_.load(fbl::memory_order_relaxed); 459 return offset; 460} 461 462void trace_context::MarkDurableBufferFull(uint64_t last_offset) { 463 // Snap to the endpoint to reduce likelihood of pointer wrap-around. 464 // Otherwise each new attempt fill continually increase the offset. 465 durable_buffer_current_.store(reinterpret_cast<uint64_t>(durable_buffer_size_), 466 fbl::memory_order_relaxed); 467 468 // Mark the end point if not already marked. 
469 uintptr_t expected_mark = 0u; 470 if (durable_buffer_full_mark_.compare_exchange_strong( 471 &expected_mark, last_offset, 472 fbl::memory_order_relaxed, fbl::memory_order_relaxed)) { 473 fprintf(stderr, "TraceEngine: durable buffer full @offset %" PRIu64 "\n", 474 last_offset); 475 header_->durable_data_end = last_offset; 476 477 // A record may be written that relies on this durable record. 478 // To preserve data integrity, we disable all further tracing. 479 // There is a small window where a non-durable record could get 480 // emitted that depends on this durable record. It's rare 481 // enough and inconsequential enough that we ignore it. 482 // TODO(dje): Another possibility is we could let tracing 483 // continue and start allocating future durable records in the 484 // rolling buffers, and accept potentially lost durable 485 // records. Another possibility is to remove the durable buffer, 486 // and, say, have separate caches for each rolling buffer. 487 MarkTracingArtificiallyStopped(); 488 } 489} 490 491void trace_context::MarkOneshotBufferFull(uint64_t last_offset) { 492 SnapToEnd(0); 493 494 // Mark the end point if not already marked. 495 uintptr_t expected_mark = 0u; 496 if (rolling_buffer_full_mark_[0].compare_exchange_strong( 497 &expected_mark, last_offset, 498 fbl::memory_order_relaxed, fbl::memory_order_relaxed)) { 499 header_->rolling_data_end[0] = last_offset; 500 } 501 502 MarkRecordDropped(); 503} 504 505void trace_context::MarkRollingBufferFull(uint32_t wrapped_count, uint64_t last_offset) { 506 // Mark the end point if not already marked. 
507 int buffer_number = GetBufferNumber(wrapped_count); 508 uint64_t expected_mark = 0u; 509 if (rolling_buffer_full_mark_[buffer_number].compare_exchange_strong( 510 &expected_mark, last_offset, 511 fbl::memory_order_relaxed, fbl::memory_order_relaxed)) { 512 header_->rolling_data_end[buffer_number] = last_offset; 513 } 514} 515 516void trace_context::SwitchRollingBufferLocked(uint32_t prev_wrapped_count, 517 uint64_t prev_last_offset) { 518 // This has already done in streaming mode when the buffer was marked as 519 // saved, but hasn't been done yet for circular mode. KISS and just do it 520 // again. It's ok to do again as we don't resume allocating trace records 521 // until we update |rolling_buffer_current_|. 522 uint32_t new_wrapped_count = prev_wrapped_count + 1; 523 int next_buffer = GetBufferNumber(new_wrapped_count); 524 rolling_buffer_full_mark_[next_buffer].store(0, fbl::memory_order_relaxed); 525 header_->rolling_data_end[next_buffer] = 0; 526 527 // Do this last: After this tracing resumes in the new buffer. 528 uint64_t new_offset_plus_counter = MakeOffsetPlusCounter(0, new_wrapped_count); 529 rolling_buffer_current_.store(new_offset_plus_counter, 530 fbl::memory_order_relaxed); 531} 532 533void trace_context::MarkTracingArtificiallyStopped() { 534 // Grab the lock in part so that we don't switch buffers between 535 // |CurrentWrappedCount()| and |SnapToEnd()|. 536 fbl::AutoLock lock(&buffer_switch_mutex_); 537 538 // Disable tracing by making it look like the current rolling 539 // buffer is full. AllocRecord, on seeing the buffer is full, will 540 // then check |tracing_artificially_stopped_|. 
541 tracing_artificially_stopped_ = true; 542 SnapToEnd(CurrentWrappedCount()); 543} 544 545void trace_context::NotifyRollingBufferFullLocked(uint32_t wrapped_count, 546 uint64_t durable_data_end) { 547 // The notification is handled on the engine's event loop as 548 // we need this done outside of the lock: Certain handlers 549 // (e.g., trace-benchmark) just want to immediately call 550 // |trace_engine_mark_buffer_saved()| which wants to reacquire 551 // the lock. Secondly, if we choose to wait until the buffer context is 552 // released before notifying the handler then we can't do so now as we 553 // still have a reference to the buffer context. 554 trace_engine_request_save_buffer(wrapped_count, durable_data_end); 555} 556 557void trace_context::HandleSaveRollingBufferRequest(uint32_t wrapped_count, 558 uint64_t durable_data_end) { 559 // TODO(dje): An open issue is solving the problem of TraceManager 560 // prematurely reading the buffer: We know the buffer is full, but 561 // the only way we know existing writers have completed is when 562 // they release their trace context. Fortunately we know when all 563 // context acquisitions for the purpose of writing to the buffer 564 // have been released. The question is how to use this info. 565 // For now we punt the problem to the handler. Ultimately we could 566 // provide callers with a way to wait, and have trace_release_context() 567 // check for waiters and if any are present send a signal like it does 568 // for SIGNAL_CONTEXT_RELEASED. 569 handler_->ops->notify_buffer_full(handler_, wrapped_count, 570 durable_data_end); 571} 572 573void trace_context::MarkRollingBufferSaved(uint32_t wrapped_count, 574 uint64_t durable_data_end) { 575 fbl::AutoLock lock(&buffer_switch_mutex_); 576 577 int buffer_number = GetBufferNumber(wrapped_count); 578 { 579 // TODO(dje): Manage bad responses from TraceManager. 
580 int current_buffer_number = GetBufferNumber(GetWrappedCount(rolling_buffer_current_.load(fbl::memory_order_relaxed))); 581 ZX_DEBUG_ASSERT(buffer_number != current_buffer_number); 582 } 583 rolling_buffer_full_mark_[buffer_number].store(0, fbl::memory_order_relaxed); 584 header_->rolling_data_end[buffer_number] = 0; 585 // Don't update |rolling_buffer_current_| here, that is done when we 586 // successfully allocate the next record. Until then we want to keep 587 // dropping records. 588} 589