1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include <stdint.h>
6#include <string.h>
7#include <threads.h>
8#include <unistd.h>
9
10#include <cobalt-client/cpp/collector-internal.h>
11#include <cobalt-client/cpp/collector.h>
12#include <fbl/unique_ptr.h>
13#include <fbl/vector.h>
14#include <lib/sync/completion.h>
15#include <unittest/unittest.h>
16
17namespace cobalt_client {
18namespace internal {
19namespace {
20
// Number of threads to spawn for multi threaded tests.
constexpr size_t kThreads = 20;
// Half the threads record observations and the other half flush, so the
// count must split evenly.
static_assert(kThreads % 2 == 0, "Use even number of threads for simplicity");

// Number of times to perform an operation in a given thread.
constexpr size_t kOperations = 50;
27
28// Fake storage used by our FakeLogger.
29template <typename T>
30class FakeStorage {
31public:
32    T* GetOrNull(uint64_t metric_id, uint32_t event_type, uint32_t event_type_index) {
33        size_t index = 0;
34        if (!Find(metric_id, event_type, event_type_index, &index)) {
35            return nullptr;
36        }
37        return entries_[index].data.get();
38    };
39
40    void InsertOrUpdateEntry(uint64_t metric_id, uint32_t event_type, uint32_t event_type_index,
41                             const fbl::Function<void(fbl::unique_ptr<T>*)>& update) {
42        size_t index = 0;
43        if (!Find(metric_id, event_type, event_type_index, &index)) {
44            entries_.push_back({.metric_id = metric_id,
45                                .event_type = event_type,
46                                .event_type_index = event_type_index,
47                                .data = nullptr});
48            index = entries_.size() - 1;
49        }
50        update(&entries_[index].data);
51    }
52
53private:
54    bool Find(uint64_t metric_id, uint32_t event_type, uint32_t event_type_index,
55              size_t* index) const {
56        *index = 0;
57        for (auto& entry : entries_) {
58            if (entry.metric_id == metric_id && entry.event_type == event_type &&
59                entry.event_type_index == event_type_index) {
60                return true;
61            }
62            ++(*index);
63        }
64        return false;
65    }
66
67    // Help to identify event data logged.
68    struct Entry {
69        uint64_t metric_id;
70        uint32_t event_type;
71        uint32_t event_type_index;
72        fbl::unique_ptr<T> data;
73    };
74    fbl::Vector<Entry> entries_;
75};
76
// Logger used to verify that the Collector behavior is correct.
78class TestLogger : public Logger {
79public:
80    TestLogger(FakeStorage<BaseHistogram>* histograms, FakeStorage<BaseCounter>* counters)
81        : histograms_(histograms), counters_(counters), fail_(false) {}
82    TestLogger(const TestLogger&) = delete;
83    TestLogger(TestLogger&&) = delete;
84    TestLogger& operator=(const TestLogger&) = delete;
85    TestLogger& operator=(TestLogger&&) = delete;
86    ~TestLogger() override = default;
87
88    // Returns true if the histogram was persisted.
89    bool Log(uint64_t metric_id, const RemoteHistogram::EventBuffer& histogram) override {
90        if (!fail_.load()) {
91            histograms_->InsertOrUpdateEntry(
92                metric_id, histogram.metadata()[0].event_type,
93                histogram.metadata()[0].event_type_index,
94                [&histogram](fbl::unique_ptr<BaseHistogram>* persisted) {
95                    if (*persisted == nullptr) {
96                        persisted->reset(new BaseHistogram(
97                            static_cast<uint32_t>(histogram.event_data().count())));
98                    }
99                    for (auto& bucket : histogram.event_data()) {
100                        (*persisted)->IncrementCount(bucket.index, bucket.count);
101                    }
102                });
103        }
104        return !fail_.load();
105    }
106
107    // Returns true if the counter was persisted.
108    bool Log(uint64_t metric_id, const RemoteCounter::EventBuffer& counter) override {
109        if (!fail_.load()) {
110            counters_->InsertOrUpdateEntry(metric_id, counter.metadata()[0].event_type,
111                                           counter.metadata()[0].event_type_index,
112                                           [&counter](fbl::unique_ptr<BaseCounter>* persisted) {
113                                               if (*persisted == nullptr) {
114                                                   persisted->reset(new BaseCounter());
115                                               }
116                                               (*persisted)->Increment(counter.event_data());
117                                           });
118        }
119        return !fail_.load();
120    }
121
122    void set_fail(bool should_fail) { fail_.store(should_fail); }
123
124private:
125    FakeStorage<BaseHistogram>* histograms_;
126    FakeStorage<BaseCounter>* counters_;
127    fbl::atomic<bool> fail_;
128};
129
130Collector MakeCollector(size_t max_histograms, size_t max_counters,
131                        FakeStorage<BaseHistogram>* histograms, FakeStorage<BaseCounter>* counters,
132                        TestLogger** test_logger = nullptr) {
133    fbl::unique_ptr<TestLogger> logger = fbl::make_unique<TestLogger>(histograms, counters);
134    CollectorOptions options;
135    options.max_counters = max_counters;
136    options.max_histograms = max_histograms;
137
138    if (test_logger != nullptr) {
139        *test_logger = logger.get();
140    }
141
142    return fbl::move(Collector(options, fbl::move(logger)));
143}
144
145HistogramOptions MakeOptions() {
146    // | .....| ....| ...| .... |
147    // -inf  -2     0    2    +inf
148    HistogramOptions options =
149        HistogramOptions::Linear(/*bucket_count=*/2, /*scalar=*/2, /*offset=*/-2);
150    return fbl::move(options);
151}
152
153bool AddCounterTest() {
154    BEGIN_TEST;
155    FakeStorage<BaseHistogram> histograms;
156    FakeStorage<BaseCounter> counters;
157    Collector collector =
158        MakeCollector(/*max_histograms=*/0, /*max_counters=*/1, &histograms, &counters);
159    auto counter = collector.AddCounter(/*metric_id=*/1, /*event_type_index=*/1);
160    counter.Increment(5);
161    ASSERT_EQ(counter.GetRemoteCount(), 5);
162    END_TEST;
163}
164
165// Sanity check that different counters do not interfere with each other.
166bool AddCounterMultipleTest() {
167    BEGIN_TEST;
168    FakeStorage<BaseHistogram> histograms;
169    FakeStorage<BaseCounter> counters;
170    Collector collector =
171        MakeCollector(/*max_histograms=*/0, /*max_counters=*/3, &histograms, &counters);
172    auto counter = collector.AddCounter(/*metric_id=*/1, /*event_type_index=*/1);
173    auto counter_2 = collector.AddCounter(/*metric_id=*/1, /*event_type_index=*/2);
174    auto counter_3 = collector.AddCounter(/*metric_id=*/1, /*event_type_index=*/3);
175    counter.Increment(5);
176    counter_2.Increment(3);
177    counter_3.Increment(2);
178    ASSERT_EQ(counter.GetRemoteCount(), 5);
179    ASSERT_EQ(counter_2.GetRemoteCount(), 3);
180    ASSERT_EQ(counter_3.GetRemoteCount(), 2);
181    END_TEST;
182}
183
184bool AddHistogramTest() {
185    BEGIN_TEST;
186    FakeStorage<BaseHistogram> histograms;
187    FakeStorage<BaseCounter> counters;
188    Collector collector =
189        MakeCollector(/*max_histograms=*/1, /*max_counters=*/0, &histograms, &counters);
190    auto histogram = collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 1, MakeOptions());
191    histogram.Add(-4, 2);
192    ASSERT_EQ(histogram.GetRemoteCount(-4), 2);
193    END_TEST;
194}
195
196// Sanity check that different histograms do not interfere with each other.
197bool AddHistogramMultipleTest() {
198    BEGIN_TEST;
199    FakeStorage<BaseHistogram> histograms;
200    FakeStorage<BaseCounter> counters;
201    Collector collector =
202        MakeCollector(/*max_histograms=*/3, /*max_counters=*/0, &histograms, &counters);
203    auto histogram = collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 1, MakeOptions());
204    auto histogram_2 =
205        collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 2, MakeOptions());
206    auto histogram_3 =
207        collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 3, MakeOptions());
208    histogram.Add(-4, 2);
209    histogram_2.Add(-1, 3);
210    histogram_3.Add(1, 4);
211    EXPECT_EQ(histogram.GetRemoteCount(-4), 2);
212    EXPECT_EQ(histogram_2.GetRemoteCount(-1), 3);
213    EXPECT_EQ(histogram_3.GetRemoteCount(1), 4);
214    END_TEST;
215}
216
217// Verify that flushed data matches the logged data. This means that the FakeStorage has the right
218// values for the right metric and event_type_index.
219bool FlushTest() {
220    BEGIN_TEST;
221    FakeStorage<BaseHistogram> histograms;
222    FakeStorage<BaseCounter> counters;
223    HistogramOptions options = MakeOptions();
224    Collector collector =
225        MakeCollector(/*max_histograms=*/2, /*max_counters=*/2, &histograms, &counters);
226    auto histogram = collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 1, options);
227    auto histogram_2 = collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 2, options);
228    auto counter = collector.AddCounter(/*metric_id=*/2, /*event_type_index=*/1);
229    auto counter_2 = collector.AddCounter(/*metric_id=*/2, /*event_type_index=*/2);
230
231    histogram.Add(-4, 2);
232    histogram_2.Add(-1, 3);
233    counter.Increment(5);
234    counter_2.Increment(3);
235
236    collector.Flush();
237
238    // Verify reset of local data.
239    EXPECT_EQ(histogram.GetRemoteCount(-4), 0);
240    EXPECT_EQ(histogram_2.GetRemoteCount(-1), 0);
241    EXPECT_EQ(counter.GetRemoteCount(), 0);
242    EXPECT_EQ(counter_2.GetRemoteCount(), 0);
243
244    // Verify 'persisted' data matches what the local data used to be.
245    // Note: for now event_type is 0 for all metrics.
246
247    // -4 goes to underflow bucket(0)
248    EXPECT_EQ(histograms.GetOrNull(/*metric_id=*/1, /*event_type=*/0, /*event_type_index=*/1)
249                  ->GetCount(options.map_fn(-4, options)),
250              2);
251
252    // -1 goes to first non underflow bucket(1)
253    EXPECT_EQ(histograms.GetOrNull(1, 0, 2)->GetCount(options.map_fn(-1, options)), 3);
254
255    EXPECT_EQ(counters.GetOrNull(2, 0, 1)->Load(), 5);
256    EXPECT_EQ(counters.GetOrNull(2, 0, 2)->Load(), 3);
257    END_TEST;
258}
259
260// Verify that when the logger fails to persist data, the flushed values are restored.
261bool FlushFailTest() {
262    BEGIN_TEST;
263    FakeStorage<BaseHistogram> histograms;
264    FakeStorage<BaseCounter> counters;
265    TestLogger* logger;
266    HistogramOptions options = MakeOptions();
267    Collector collector =
268        MakeCollector(/*max_histograms=*/2, /*max_counters=*/2, &histograms, &counters, &logger);
269    auto histogram = collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 1, options);
270    auto histogram_2 = collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 2, options);
271    auto counter = collector.AddCounter(/*metric_id=*/2, /*event_type_index=*/1);
272    auto counter_2 = collector.AddCounter(/*metric_id=*/2, /*event_type_index=*/2);
273
274    histogram.Add(-4, 2);
275    counter.Increment(5);
276    collector.Flush();
277    logger->set_fail(/*should_fail=*/true);
278
279    histogram_2.Add(-1, 3);
280    counter_2.Increment(3);
281
282    collector.Flush();
283
284    // Verify reset of local data.
285    EXPECT_EQ(histogram.GetRemoteCount(-4), 0);
286    EXPECT_EQ(histogram_2.GetRemoteCount(-1), 3);
287    EXPECT_EQ(counter.GetRemoteCount(), 0);
288    EXPECT_EQ(counter_2.GetRemoteCount(), 3);
289
290    // Verify 'persisted' data matches what the local data used to be.
291    // Note: for now event_type is 0 for all metrics.
292
293    // -4 goes to underflow bucket(0)
294    EXPECT_EQ(histograms.GetOrNull(/*metric_id=*/1, /*event_type=*/0, /*event_type_index=*/1)
295                  ->GetCount(options.map_fn(-4, options)),
296              2);
297
298    // -1 goes to first non underflow bucket(1), and its expected to be 0 because the logger failed.
299    EXPECT_EQ(histograms.GetOrNull(1, 0, 2)->GetCount(options.map_fn(-1, options)), 0);
300
301    EXPECT_EQ(counters.GetOrNull(2, 0, 1)->Load(), 5);
302
303    // Expected to be 0, because the logger failed.
304    EXPECT_EQ(counters.GetOrNull(2, 0, 2)->Load(), 0);
305    END_TEST;
306}
307
// Arguments for |ObserveFn|, the worker thread body that records
// observations. All histograms share the same bucket shape for simplicity.
// In the multithreaded test, even-numbered threads run ObserveFn and
// odd-numbered threads run FlushFn.
struct ObserveFnArgs {
    // List of histograms to operate on.
    fbl::Vector<Histogram> histograms;

    // List of counters to operate on.
    fbl::Vector<Counter> counters;

    // Number of observations to register (per bucket / per counter).
    size_t count;

    // Notify the thread when to start executing; signaled once all worker
    // threads have been spawned.
    sync_completion_t* start;
};
323
324int ObserveFn(void* vargs) {
325    ObserveFnArgs* args = reinterpret_cast<ObserveFnArgs*>(vargs);
326    static HistogramOptions options = MakeOptions();
327    sync_completion_wait(args->start, zx::sec(20).get());
328    size_t curr = 0;
329    for (auto& hist : args->histograms) {
330        for (size_t bucket_index = 0; bucket_index < options.bucket_count + 2; ++bucket_index) {
331            for (size_t i = 0; i < args->count; ++i) {
332                hist.Add(options.reverse_map_fn(static_cast<uint32_t>(bucket_index), options),
333                         curr + bucket_index);
334            }
335        }
336        ++curr;
337    }
338
339    curr = 0;
340    for (auto& counter : args->counters) {
341        for (size_t i = 0; i < args->count; ++i) {
342            counter.Increment(curr);
343        }
344        ++curr;
345    }
346    return thrd_success;
347}
348
// Arguments for |FlushFn|, the worker thread body that repeatedly flushes
// the collector while ObserveFn threads keep recording.
struct FlushFnArgs {
    // Target collector to be flushed.
    Collector* collector;

    // Number of times to flush.
    size_t count;

    // Notify thread start; signaled once all worker threads have been
    // spawned.
    sync_completion_t* start;
};
359
360int FlushFn(void* vargs) {
361    FlushFnArgs* args = reinterpret_cast<FlushFnArgs*>(vargs);
362
363    sync_completion_wait(args->start, zx::sec(20).get());
364    for (size_t i = 0; i < args->count; ++i) {
365        args->collector->Flush();
366    }
367
368    return thrd_success;
369}
370
// Verify that if we flush while the histograms and counters are being updated,
// no data is lost, meaning that the sum of the persisted data and the local data
// is equal to the expected value.
//
// When |should_fail| is true the logger rejects every flush after setup, so
// all counts must remain local; either way the combined total must match.
template <bool should_fail>
bool FlushMultithreadTest() {
    BEGIN_TEST;
    FakeStorage<BaseHistogram> histograms;
    FakeStorage<BaseCounter> counters;
    HistogramOptions options = MakeOptions();
    sync_completion_t start;

    ObserveFnArgs observe_args;
    observe_args.start = &start;
    observe_args.count = kOperations;
    TestLogger* logger;

    Collector collector =
        MakeCollector(/*max_histograms=*/9, /*max_counters=*/9, &histograms, &counters, &logger);

    // 3 metrics x 3 event_type_indexes each. Histograms use even metric ids
    // and counters odd ones, so they never collide in the fake storage.
    for (uint64_t metric_id = 0; metric_id < 3; ++metric_id) {
        for (uint32_t event_type_index = 0; event_type_index < 3; ++event_type_index) {
            observe_args.histograms.push_back(
                collector.AddHistogram(2 * metric_id, event_type_index, options));
            observe_args.counters.push_back(
                collector.AddCounter(2 * metric_id + 1, event_type_index));
        }
    }
    // Add empty entries to the fake storage.
    collector.Flush();
    // Set the logger to either fail to persist or succeed.
    logger->set_fail(should_fail);

    FlushFnArgs flush_args;
    flush_args.collector = &collector;
    flush_args.count = kOperations;
    flush_args.start = &start;

    fbl::Vector<thrd_t> thread_ids;

    // Even-numbered threads record observations; odd-numbered threads flush.
    thread_ids.reserve(kThreads);
    for (size_t i = 0; i < kThreads; ++i) {
        thrd_t thread_id;
        if (i % 2 == 0) {
            thrd_create(&thread_id, &ObserveFn, &observe_args);
        } else {
            thrd_create(&thread_id, &FlushFn, &flush_args);
        }
        thread_ids.push_back(thread_id);
    }

    // Start all threads.
    sync_completion_signal(&start);

    for (auto thread_id : thread_ids) {
        ASSERT_EQ(thrd_join(thread_id, nullptr), thrd_success);
    }

    // Verify that all histograms buckets and counters have exactly |kOperations| * |kThreads| /
    // 2 count.
    constexpr size_t target_count = kThreads * kOperations / 2;
    for (uint64_t metric_id = 0; metric_id < 3; ++metric_id) {
        for (uint32_t event_type_index = 0; event_type_index < 3; ++event_type_index) {
            // Flat position of this (metric, index) pair in observe_args'
            // vectors; matches the push_back order above.
            size_t index = 3 * metric_id + event_type_index;
            auto* tmp_hist = histograms.GetOrNull(2 * metric_id, 0, event_type_index);
            // Each bucket is increased |index| + |i| for each thread recording observations.
            // 4 buckets total: bucket_count (2) plus underflow and overflow.
            for (uint32_t i = 0; i < 4; ++i) {
                ASSERT_TRUE(tmp_hist != nullptr);
                // Persisted + still-local must equal the total ever recorded.
                EXPECT_EQ(tmp_hist->GetCount(i) + observe_args.histograms[index].GetRemoteCount(
                                                      options.reverse_map_fn(i, options)),
                          target_count * (i + index));
            }

            auto* tmp_counter = counters.GetOrNull(2 * metric_id + 1, 0, event_type_index);
            ASSERT_TRUE(tmp_counter != nullptr);
            // Each counter is increased by |index| for each thread recording observations.
            EXPECT_EQ(tmp_counter->Load() + observe_args.counters[index].GetRemoteCount(),
                      target_count * index);
        }
    }
    END_TEST;
}
452
// Registration with the unittest runner. The multithreaded variants are
// registered as large tests since they spawn |kThreads| threads.
BEGIN_TEST_CASE(CollectorTest)
RUN_TEST(AddCounterTest)
RUN_TEST(AddCounterMultipleTest)
RUN_TEST(AddHistogramTest)
RUN_TEST(AddHistogramMultipleTest)
RUN_TEST(FlushTest)
RUN_TEST(FlushFailTest)
RUN_TEST_LARGE(FlushMultithreadTest<false>)
RUN_TEST_LARGE(FlushMultithreadTest<true>)
END_TEST_CASE(CollectorTest)
463
464} // namespace
465} // namespace internal
466} // namespace cobalt_client
467