1// Copyright 2018 The Fuchsia Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include <stdint.h> 6#include <string.h> 7#include <threads.h> 8#include <unistd.h> 9 10#include <cobalt-client/cpp/collector-internal.h> 11#include <cobalt-client/cpp/collector.h> 12#include <fbl/unique_ptr.h> 13#include <fbl/vector.h> 14#include <lib/sync/completion.h> 15#include <unittest/unittest.h> 16 17namespace cobalt_client { 18namespace internal { 19namespace { 20 21// Number of threads to spawn for multi threaded tests. 22constexpr size_t kThreads = 20; 23static_assert(kThreads % 2 == 0, "Use even number of threads for simplcity"); 24 25// Number of times to perform an operation in a given thread. 26constexpr size_t kOperations = 50; 27 28// Fake storage used by our FakeLogger. 29template <typename T> 30class FakeStorage { 31public: 32 T* GetOrNull(uint64_t metric_id, uint32_t event_type, uint32_t event_type_index) { 33 size_t index = 0; 34 if (!Find(metric_id, event_type, event_type_index, &index)) { 35 return nullptr; 36 } 37 return entries_[index].data.get(); 38 }; 39 40 void InsertOrUpdateEntry(uint64_t metric_id, uint32_t event_type, uint32_t event_type_index, 41 const fbl::Function<void(fbl::unique_ptr<T>*)>& update) { 42 size_t index = 0; 43 if (!Find(metric_id, event_type, event_type_index, &index)) { 44 entries_.push_back({.metric_id = metric_id, 45 .event_type = event_type, 46 .event_type_index = event_type_index, 47 .data = nullptr}); 48 index = entries_.size() - 1; 49 } 50 update(&entries_[index].data); 51 } 52 53private: 54 bool Find(uint64_t metric_id, uint32_t event_type, uint32_t event_type_index, 55 size_t* index) const { 56 *index = 0; 57 for (auto& entry : entries_) { 58 if (entry.metric_id == metric_id && entry.event_type == event_type && 59 entry.event_type_index == event_type_index) { 60 return true; 61 } 62 ++(*index); 63 } 64 return false; 65 } 66 67 // Help to identify event data logged. 68 struct Entry { 69 uint64_t metric_id; 70 uint32_t event_type; 71 uint32_t event_type_index; 72 fbl::unique_ptr<T> data; 73 }; 74 fbl::Vector<Entry> entries_; 75}; 76 77// Logger for to verify that the Collector behavior is correct. 78class TestLogger : public Logger { 79public: 80 TestLogger(FakeStorage<BaseHistogram>* histograms, FakeStorage<BaseCounter>* counters) 81 : histograms_(histograms), counters_(counters), fail_(false) {} 82 TestLogger(const TestLogger&) = delete; 83 TestLogger(TestLogger&&) = delete; 84 TestLogger& operator=(const TestLogger&) = delete; 85 TestLogger& operator=(TestLogger&&) = delete; 86 ~TestLogger() override = default; 87 88 // Returns true if the histogram was persisted. 89 bool Log(uint64_t metric_id, const RemoteHistogram::EventBuffer& histogram) override { 90 if (!fail_.load()) { 91 histograms_->InsertOrUpdateEntry( 92 metric_id, histogram.metadata()[0].event_type, 93 histogram.metadata()[0].event_type_index, 94 [&histogram](fbl::unique_ptr<BaseHistogram>* persisted) { 95 if (*persisted == nullptr) { 96 persisted->reset(new BaseHistogram( 97 static_cast<uint32_t>(histogram.event_data().count()))); 98 } 99 for (auto& bucket : histogram.event_data()) { 100 (*persisted)->IncrementCount(bucket.index, bucket.count); 101 } 102 }); 103 } 104 return !fail_.load(); 105 } 106 107 // Returns true if the counter was persisted. 108 bool Log(uint64_t metric_id, const RemoteCounter::EventBuffer& counter) override { 109 if (!fail_.load()) { 110 counters_->InsertOrUpdateEntry(metric_id, counter.metadata()[0].event_type, 111 counter.metadata()[0].event_type_index, 112 [&counter](fbl::unique_ptr<BaseCounter>* persisted) { 113 if (*persisted == nullptr) { 114 persisted->reset(new BaseCounter()); 115 } 116 (*persisted)->Increment(counter.event_data()); 117 }); 118 } 119 return !fail_.load(); 120 } 121 122 void set_fail(bool should_fail) { fail_.store(should_fail); } 123 124private: 125 FakeStorage<BaseHistogram>* histograms_; 126 FakeStorage<BaseCounter>* counters_; 127 fbl::atomic<bool> fail_; 128}; 129 130Collector MakeCollector(size_t max_histograms, size_t max_counters, 131 FakeStorage<BaseHistogram>* histograms, FakeStorage<BaseCounter>* counters, 132 TestLogger** test_logger = nullptr) { 133 fbl::unique_ptr<TestLogger> logger = fbl::make_unique<TestLogger>(histograms, counters); 134 CollectorOptions options; 135 options.max_counters = max_counters; 136 options.max_histograms = max_histograms; 137 138 if (test_logger != nullptr) { 139 *test_logger = logger.get(); 140 } 141 142 return fbl::move(Collector(options, fbl::move(logger))); 143} 144 145HistogramOptions MakeOptions() { 146 // | .....| ....| ...| .... | 147 // -inf -2 0 2 +inf 148 HistogramOptions options = 149 HistogramOptions::Linear(/*bucket_count=*/2, /*scalar=*/2, /*offset=*/-2); 150 return fbl::move(options); 151} 152 153bool AddCounterTest() { 154 BEGIN_TEST; 155 FakeStorage<BaseHistogram> histograms; 156 FakeStorage<BaseCounter> counters; 157 Collector collector = 158 MakeCollector(/*max_histograms=*/0, /*max_counters=*/1, &histograms, &counters); 159 auto counter = collector.AddCounter(/*metric_id=*/1, /*event_type_index=*/1); 160 counter.Increment(5); 161 ASSERT_EQ(counter.GetRemoteCount(), 5); 162 END_TEST; 163} 164 165// Sanity check that different counters do not interfere with each other. 166bool AddCounterMultipleTest() { 167 BEGIN_TEST; 168 FakeStorage<BaseHistogram> histograms; 169 FakeStorage<BaseCounter> counters; 170 Collector collector = 171 MakeCollector(/*max_histograms=*/0, /*max_counters=*/3, &histograms, &counters); 172 auto counter = collector.AddCounter(/*metric_id=*/1, /*event_type_index=*/1); 173 auto counter_2 = collector.AddCounter(/*metric_id=*/1, /*event_type_index=*/2); 174 auto counter_3 = collector.AddCounter(/*metric_id=*/1, /*event_type_index=*/3); 175 counter.Increment(5); 176 counter_2.Increment(3); 177 counter_3.Increment(2); 178 ASSERT_EQ(counter.GetRemoteCount(), 5); 179 ASSERT_EQ(counter_2.GetRemoteCount(), 3); 180 ASSERT_EQ(counter_3.GetRemoteCount(), 2); 181 END_TEST; 182} 183 184bool AddHistogramTest() { 185 BEGIN_TEST; 186 FakeStorage<BaseHistogram> histograms; 187 FakeStorage<BaseCounter> counters; 188 Collector collector = 189 MakeCollector(/*max_histograms=*/1, /*max_counters=*/0, &histograms, &counters); 190 auto histogram = collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 1, MakeOptions()); 191 histogram.Add(-4, 2); 192 ASSERT_EQ(histogram.GetRemoteCount(-4), 2); 193 END_TEST; 194} 195 196// Sanity check that different histograms do not interfere with each other. 197bool AddHistogramMultipleTest() { 198 BEGIN_TEST; 199 FakeStorage<BaseHistogram> histograms; 200 FakeStorage<BaseCounter> counters; 201 Collector collector = 202 MakeCollector(/*max_histograms=*/3, /*max_counters=*/0, &histograms, &counters); 203 auto histogram = collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 1, MakeOptions()); 204 auto histogram_2 = 205 collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 2, MakeOptions()); 206 auto histogram_3 = 207 collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 3, MakeOptions()); 208 histogram.Add(-4, 2); 209 histogram_2.Add(-1, 3); 210 histogram_3.Add(1, 4); 211 EXPECT_EQ(histogram.GetRemoteCount(-4), 2); 212 EXPECT_EQ(histogram_2.GetRemoteCount(-1), 3); 213 EXPECT_EQ(histogram_3.GetRemoteCount(1), 4); 214 END_TEST; 215} 216 217// Verify that flushed data matches the logged data. This means that the FakeStorage has the right 218// values for the right metric and event_type_index. 219bool FlushTest() { 220 BEGIN_TEST; 221 FakeStorage<BaseHistogram> histograms; 222 FakeStorage<BaseCounter> counters; 223 HistogramOptions options = MakeOptions(); 224 Collector collector = 225 MakeCollector(/*max_histograms=*/2, /*max_counters=*/2, &histograms, &counters); 226 auto histogram = collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 1, options); 227 auto histogram_2 = collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 2, options); 228 auto counter = collector.AddCounter(/*metric_id=*/2, /*event_type_index=*/1); 229 auto counter_2 = collector.AddCounter(/*metric_id=*/2, /*event_type_index=*/2); 230 231 histogram.Add(-4, 2); 232 histogram_2.Add(-1, 3); 233 counter.Increment(5); 234 counter_2.Increment(3); 235 236 collector.Flush(); 237 238 // Verify reset of local data. 239 EXPECT_EQ(histogram.GetRemoteCount(-4), 0); 240 EXPECT_EQ(histogram_2.GetRemoteCount(-1), 0); 241 EXPECT_EQ(counter.GetRemoteCount(), 0); 242 EXPECT_EQ(counter_2.GetRemoteCount(), 0); 243 244 // Verify 'persisted' data matches what the local data used to be. 245 // Note: for now event_type is 0 for all metrics. 246 247 // -4 goes to underflow bucket(0) 248 EXPECT_EQ(histograms.GetOrNull(/*metric_id=*/1, /*event_type=*/0, /*event_type_index=*/1) 249 ->GetCount(options.map_fn(-4, options)), 250 2); 251 252 // -1 goes to first non underflow bucket(1) 253 EXPECT_EQ(histograms.GetOrNull(1, 0, 2)->GetCount(options.map_fn(-1, options)), 3); 254 255 EXPECT_EQ(counters.GetOrNull(2, 0, 1)->Load(), 5); 256 EXPECT_EQ(counters.GetOrNull(2, 0, 2)->Load(), 3); 257 END_TEST; 258} 259 260// Verify that when the logger fails to persist data, the flushed values are restored. 261bool FlushFailTest() { 262 BEGIN_TEST; 263 FakeStorage<BaseHistogram> histograms; 264 FakeStorage<BaseCounter> counters; 265 TestLogger* logger; 266 HistogramOptions options = MakeOptions(); 267 Collector collector = 268 MakeCollector(/*max_histograms=*/2, /*max_counters=*/2, &histograms, &counters, &logger); 269 auto histogram = collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 1, options); 270 auto histogram_2 = collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 2, options); 271 auto counter = collector.AddCounter(/*metric_id=*/2, /*event_type_index=*/1); 272 auto counter_2 = collector.AddCounter(/*metric_id=*/2, /*event_type_index=*/2); 273 274 histogram.Add(-4, 2); 275 counter.Increment(5); 276 collector.Flush(); 277 logger->set_fail(/*should_fail=*/true); 278 279 histogram_2.Add(-1, 3); 280 counter_2.Increment(3); 281 282 collector.Flush(); 283 284 // Verify reset of local data. 285 EXPECT_EQ(histogram.GetRemoteCount(-4), 0); 286 EXPECT_EQ(histogram_2.GetRemoteCount(-1), 3); 287 EXPECT_EQ(counter.GetRemoteCount(), 0); 288 EXPECT_EQ(counter_2.GetRemoteCount(), 3); 289 290 // Verify 'persisted' data matches what the local data used to be. 291 // Note: for now event_type is 0 for all metrics. 292 293 // -4 goes to underflow bucket(0) 294 EXPECT_EQ(histograms.GetOrNull(/*metric_id=*/1, /*event_type=*/0, /*event_type_index=*/1) 295 ->GetCount(options.map_fn(-4, options)), 296 2); 297 298 // -1 goes to first non underflow bucket(1), and its expected to be 0 because the logger failed. 299 EXPECT_EQ(histograms.GetOrNull(1, 0, 2)->GetCount(options.map_fn(-1, options)), 0); 300 301 EXPECT_EQ(counters.GetOrNull(2, 0, 1)->Load(), 5); 302 303 // Expected to be 0, because the logger failed. 304 EXPECT_EQ(counters.GetOrNull(2, 0, 2)->Load(), 0); 305 END_TEST; 306} 307 308// All histograms have the same shape bucket for simplicity, 309// and we either operate on even or odd buckets. 310struct ObserveFnArgs { 311 // List of histograms to operate on. 312 fbl::Vector<Histogram> histograms; 313 314 // List of counters to operate on. 315 fbl::Vector<Counter> counters; 316 317 // Number of observations to register. 318 size_t count; 319 320 // Notify the thread when to start executing. 321 sync_completion_t* start; 322}; 323 324int ObserveFn(void* vargs) { 325 ObserveFnArgs* args = reinterpret_cast<ObserveFnArgs*>(vargs); 326 static HistogramOptions options = MakeOptions(); 327 sync_completion_wait(args->start, zx::sec(20).get()); 328 size_t curr = 0; 329 for (auto& hist : args->histograms) { 330 for (size_t bucket_index = 0; bucket_index < options.bucket_count + 2; ++bucket_index) { 331 for (size_t i = 0; i < args->count; ++i) { 332 hist.Add(options.reverse_map_fn(static_cast<uint32_t>(bucket_index), options), 333 curr + bucket_index); 334 } 335 } 336 ++curr; 337 } 338 339 curr = 0; 340 for (auto& counter : args->counters) { 341 for (size_t i = 0; i < args->count; ++i) { 342 counter.Increment(curr); 343 } 344 ++curr; 345 } 346 return thrd_success; 347} 348 349struct FlushFnArgs { 350 // Target collector to be flushed. 351 Collector* collector; 352 353 // Number of times to flush. 354 size_t count; 355 356 // Notify thread start. 357 sync_completion_t* start; 358}; 359 360int FlushFn(void* vargs) { 361 FlushFnArgs* args = reinterpret_cast<FlushFnArgs*>(vargs); 362 363 sync_completion_wait(args->start, zx::sec(20).get()); 364 for (size_t i = 0; i < args->count; ++i) { 365 args->collector->Flush(); 366 } 367 368 return thrd_success; 369} 370 371// Verify that if we flush while the histograms and counters are being updated, 372// no data is lost, meaning that the sum of the persisted data and the local data 373// is equal to the expected value. 374template <bool should_fail> 375bool FlushMultithreadTest() { 376 BEGIN_TEST; 377 FakeStorage<BaseHistogram> histograms; 378 FakeStorage<BaseCounter> counters; 379 HistogramOptions options = MakeOptions(); 380 sync_completion_t start; 381 382 ObserveFnArgs observe_args; 383 observe_args.start = &start; 384 observe_args.count = kOperations; 385 TestLogger* logger; 386 387 Collector collector = 388 MakeCollector(/*max_histograms=*/9, /*max_counters=*/9, &histograms, &counters, &logger); 389 390 for (uint64_t metric_id = 0; metric_id < 3; ++metric_id) { 391 for (uint32_t event_type_index = 0; event_type_index < 3; ++event_type_index) { 392 observe_args.histograms.push_back( 393 collector.AddHistogram(2 * metric_id, event_type_index, options)); 394 observe_args.counters.push_back( 395 collector.AddCounter(2 * metric_id + 1, event_type_index)); 396 } 397 } 398 // Add empty entries to the fake storage. 399 collector.Flush(); 400 // Set the logger to either fail to persist or succeed. 401 logger->set_fail(should_fail); 402 403 FlushFnArgs flush_args; 404 flush_args.collector = &collector; 405 flush_args.count = kOperations; 406 flush_args.start = &start; 407 408 fbl::Vector<thrd_t> thread_ids; 409 410 thread_ids.reserve(kThreads); 411 for (size_t i = 0; i < kThreads; ++i) { 412 thrd_t thread_id; 413 if (i % 2 == 0) { 414 thrd_create(&thread_id, &ObserveFn, &observe_args); 415 } else { 416 thrd_create(&thread_id, &FlushFn, &flush_args); 417 } 418 thread_ids.push_back(thread_id); 419 } 420 421 // Start all threads. 422 sync_completion_signal(&start); 423 424 for (auto thread_id : thread_ids) { 425 ASSERT_EQ(thrd_join(thread_id, nullptr), thrd_success); 426 } 427 428 // Verify that all histograms buckets and counters have exactly |kOperations| * |kThreads| / 429 // 2 count. 430 constexpr size_t target_count = kThreads * kOperations / 2; 431 for (uint64_t metric_id = 0; metric_id < 3; ++metric_id) { 432 for (uint32_t event_type_index = 0; event_type_index < 3; ++event_type_index) { 433 size_t index = 3 * metric_id + event_type_index; 434 auto* tmp_hist = histograms.GetOrNull(2 * metric_id, 0, event_type_index); 435 // Each bucket is increased |index| + |i| for each thread recording observations. 436 for (uint32_t i = 0; i < 4; ++i) { 437 ASSERT_TRUE(tmp_hist != nullptr); 438 EXPECT_EQ(tmp_hist->GetCount(i) + observe_args.histograms[index].GetRemoteCount( 439 options.reverse_map_fn(i, options)), 440 target_count * (i + index)); 441 } 442 443 auto* tmp_counter = counters.GetOrNull(2 * metric_id + 1, 0, event_type_index); 444 ASSERT_TRUE(tmp_counter != nullptr); 445 // Each counter is increased by |index| for each thread recording observations. 446 EXPECT_EQ(tmp_counter->Load() + observe_args.counters[index].GetRemoteCount(), 447 target_count * index); 448 } 449 } 450 END_TEST; 451} 452 453BEGIN_TEST_CASE(CollectorTest) 454RUN_TEST(AddCounterTest) 455RUN_TEST(AddCounterMultipleTest) 456RUN_TEST(AddHistogramTest) 457RUN_TEST(AddHistogramMultipleTest) 458RUN_TEST(FlushTest) 459RUN_TEST(FlushFailTest) 460RUN_TEST_LARGE(FlushMultithreadTest<false>) 461RUN_TEST_LARGE(FlushMultithreadTest<true>) 462END_TEST_CASE(CollectorTest) 463 464} // namespace 465} // namespace internal 466} // namespace cobalt_client 467