// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <float.h>
#include <math.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <threads.h>
#include <unistd.h>

#include <cobalt-client/cpp/histogram-internal.h>
#include <cobalt-client/cpp/histogram-options.h>
#include <cobalt-client/cpp/histogram.h>
#include <fbl/auto_call.h>
#include <fbl/string.h>
#include <fbl/string_printf.h>
#include <fuchsia/cobalt/c/fidl.h>
#include <lib/sync/completion.h>
#include <lib/zx/time.h>
#include <unittest/unittest.h>

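// This file exercises three layers of the cobalt-client histogram implementation:
// BaseHistogram (bucket counters), RemoteHistogram (a BaseHistogram whose contents can be
// flushed for logging), and Histogram (the public API that maps values to buckets through
// HistogramOptions).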
namespace cobalt_client {
namespace internal {
namespace {

// Number of threads for running multi-threaded tests.
constexpr uint64_t kThreads = 20;

// Number of buckets used for the histogram under test.
constexpr uint32_t kBuckets = 40;

// Default metric id for the histogram.
constexpr uint64_t kMetricId = 1;

// Returns the vector of metadata used by the tests; treated as immutable.
fbl::Vector<Metadata>& GetMetadata() {
    static fbl::Vector<Metadata> metadata = {{/*event_type =*/1, /*event_type_index =*/2},
                                             {/*event_type =*/2, /*event_type_index =*/4}};
    return metadata;
}

// Returns true if both vectors contain the same metadata entries in the same order.
bool MetadataEq(const fbl::Vector<Metadata>& lhs, const fbl::Vector<Metadata>& rhs) {
    if (lhs.size() != rhs.size()) {
        return false;
    }

    for (size_t i = 0; i < rhs.size(); ++i) {
        if (memcmp(&lhs[i], &rhs[i], sizeof(Metadata)) != 0) {
            return false;
        }
    }

    return true;
}

RemoteHistogram MakeRemoteHistogram() {
    return RemoteHistogram(kBuckets, kMetricId, GetMetadata());
}

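// Returns true if |actual| and |expected| contain the same (index, count) pairs,
// regardless of the order in which the buckets appear.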
bool HistEventValuesEq(fidl::VectorView<HistogramBucket> actual,
                       fidl::VectorView<HistogramBucket> expected) {
    BEGIN_HELPER;
    ASSERT_EQ(actual.count(), expected.count());
    for (size_t i = 0; i < actual.count(); ++i) {
        HistogramBucket& actual_bucket = actual[i];
        bool found = false;
        for (size_t j = 0; j < expected.count(); ++j) {
            HistogramBucket& expected_bucket = expected[j];
            if (actual_bucket.index != expected_bucket.index) {
                continue;
            }
            EXPECT_EQ(actual_bucket.count, expected_bucket.count);
            found = true;
            break;
        }
        ASSERT_TRUE(found);
    }
    END_HELPER;
}

// Verify the count of the appropriate bucket is updated on increment.
bool TestIncrement() {
    BEGIN_TEST;
    BaseHistogram histogram(kBuckets);

    // Increase the count of each bucket bucket_index times.
    for (uint32_t bucket_index = 0; bucket_index < kBuckets; ++bucket_index) {
        ASSERT_EQ(histogram.GetCount(bucket_index), 0);
        for (uint32_t times = 0; times < bucket_index; ++times) {
            histogram.IncrementCount(bucket_index);
        }
        ASSERT_EQ(histogram.GetCount(bucket_index), bucket_index);
    }

    // Verify that the operations are isolated; each bucket should have bucket_index counts.
    for (uint32_t bucket_index = 0; bucket_index < kBuckets; ++bucket_index) {
        ASSERT_EQ(histogram.GetCount(bucket_index), bucket_index);
    }
    END_TEST;
}

// Verify the count of the appropriate bucket is updated on increment with a specified value. This
// verifies the behavior of weighted histograms, where the weight is limited to an integer.
bool TestIncrementByVal() {
    BEGIN_TEST;
    BaseHistogram histogram(kBuckets);

    // Increase the count of each bucket by bucket_index.
    for (uint32_t bucket_index = 0; bucket_index < kBuckets; ++bucket_index) {
        ASSERT_EQ(histogram.GetCount(bucket_index), 0);
        histogram.IncrementCount(bucket_index, bucket_index);
        ASSERT_EQ(histogram.GetCount(bucket_index), bucket_index);
    }

    // Verify that the operations are isolated; each bucket should have bucket_index counts.
    for (uint32_t bucket_index = 0; bucket_index < kBuckets; ++bucket_index) {
        ASSERT_EQ(histogram.GetCount(bucket_index), bucket_index);
    }
    END_TEST;
}

struct IncrementArgs {
    // Target histogram.
    BaseHistogram* histogram;

    // Used for signaling the worker thread to start incrementing.
    sync_completion_t* start;

    // Number of times to call Increment.
    size_t operations = 0;
};

// Waits for the start signal, then increments each bucket by 2 * operations * bucket_index.
int IncrementFn(void* args) {
    IncrementArgs* increment_args = static_cast<IncrementArgs*>(args);
    sync_completion_wait(increment_args->start, zx::sec(20).get());

    for (uint32_t bucket = 0; bucket < kBuckets; ++bucket) {
        for (size_t i = 0; i < increment_args->operations; ++i) {
            increment_args->histogram->IncrementCount(bucket, bucket);
        }
        increment_args->histogram->IncrementCount(bucket, bucket * increment_args->operations);
    }

    return thrd_success;
}

// Verifies that calling Increment from multiple threads yields consistent results.
// Multiple threads call Increment a known number of times; the total count per bucket
// should then be the sum of the increments every thread applied to that bucket.
bool TestIncrementMultiThread() {
    BEGIN_TEST;
    sync_completion_t start;
    BaseHistogram histogram(kBuckets);
    fbl::Vector<thrd_t> thread_ids;
    IncrementArgs args[kThreads];

    thread_ids.reserve(kThreads);
    for (uint64_t i = 0; i < kThreads; ++i) {
        thread_ids.push_back({});
    }

    for (uint64_t i = 0; i < kThreads; ++i) {
        auto& thread_id = thread_ids[i];
        args[i].histogram = &histogram;
        args[i].operations = i;
        args[i].start = &start;
        ASSERT_EQ(thrd_create(&thread_id, IncrementFn, &args[i]), thrd_success);
    }

    // Notify threads to start incrementing the count.
    sync_completion_signal(&start);

    // Wait for all threads to finish.
    for (const auto& thread_id : thread_ids) {
        thrd_join(thread_id, nullptr);
    }

    // Thread i increases each bucket by 2 * i * bucket_index, so the expected total for each
    // bucket is: 2 * bucket_index * Sum(i=0, kThreads - 1) i
    //          = bucket_index * kThreads * (kThreads - 1).
    constexpr size_t amount = (kThreads - 1) * (kThreads);
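    // Worked out with kThreads = 20 as defined above: amount is 19 * 20 = 380, so, for example,
    // bucket 3 is expected to end with a count of 3 * 380 = 1140.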
    for (uint32_t i = 0; i < kBuckets; ++i) {
        // No increments may be lost; every bucket must hold exactly the expected total.
        EXPECT_EQ(histogram.GetCount(i), i * amount);
    }
    END_TEST;
}

// Verifies that when flushing a histogram, the flushed data matches the counts
// stored in the histogram.
bool TestFlush() {
    BEGIN_TEST;
    RemoteHistogram histogram = MakeRemoteHistogram();
    fidl::VectorView<HistogramBucket> flushed_event_data;
    uint64_t flushed_metric_id;
    RemoteHistogram::FlushCompleteFn complete_fn;
    const fbl::Vector<Metadata>* flushed_metadata;

    // Increase the count of each bucket by bucket_index.
    for (uint32_t bucket_index = 0; bucket_index < kBuckets; ++bucket_index) {
        ASSERT_EQ(histogram.GetCount(bucket_index), 0);
        histogram.IncrementCount(bucket_index, bucket_index);
        ASSERT_EQ(histogram.GetCount(bucket_index), bucket_index);
    }

    ASSERT_TRUE(histogram.Flush(
        [&flushed_event_data, &flushed_metadata, &flushed_metric_id, &complete_fn](
            uint64_t metric_id, const EventBuffer<fidl::VectorView<HistogramBucket>>& buffer,
            RemoteHistogram::FlushCompleteFn comp_fn) {
            flushed_event_data = buffer.event_data();
            flushed_metadata = &buffer.metadata();
            flushed_metric_id = metric_id;
            complete_fn = fbl::move(comp_fn);
        }));

    // Check that the flushed data is what we expect:
    // the metadata is the same, and each bucket contains bucket_index counts.
    ASSERT_EQ(flushed_metric_id, kMetricId);
    EXPECT_TRUE(MetadataEq(*flushed_metadata, GetMetadata()));

    fbl::Vector<HistogramBucket> buckets;
    buckets.reserve(kBuckets);
    for (size_t i = 0; i < kBuckets; ++i) {
        buckets.push_back({.index = static_cast<uint32_t>(i), .count = i});
    }
    fidl::VectorView<HistogramBucket> expected_buckets;
    expected_buckets.set_data(buckets.get());
    expected_buckets.set_count(buckets.size());

    // Verify the flushed buckets match the expected buckets.
    EXPECT_TRUE(HistEventValuesEq(flushed_event_data, expected_buckets));

    // Until complete_fn is called, further flushes should fail.
    ASSERT_FALSE(histogram.Flush(RemoteHistogram::FlushFn()));

    complete_fn();

    // Verify all buckets were reset to 0.
    for (uint32_t bucket_index = 0; bucket_index < kBuckets; ++bucket_index) {
        ASSERT_EQ(histogram.GetCount(bucket_index), 0);
    }

    // Check that after calling complete_fn we can flush again.
    ASSERT_TRUE(histogram.Flush([](uint64_t metric_id,
                                   const EventBuffer<fidl::VectorView<HistogramBucket>>& values,
                                   RemoteHistogram::FlushCompleteFn comp_fn) {}));

    END_TEST;
}

struct FlushArgs {
    // Pointer to the histogram which is flushing incremental snapshots to a 'remote' histogram.
    RemoteHistogram* histogram;

    // Pointer to the 'remote' histogram that accumulates the data of each flush.
    BaseHistogram* accumulated_histogram;

    // Used to enforce that the threads start together. The main thread signals after
    // all threads have been started.
    sync_completion_t* start;

    // Number of times to perform the given operation.
    size_t operations = 0;

    // Whether the thread will flush; if false it will be incrementing the buckets.
    bool flush = false;
};

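// Worker routine shared by both kinds of threads: flushing threads fold each flushed snapshot
// into |accumulated_histogram|, while incrementing threads bump every bucket by its own index
// on each iteration.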
int FlushFn(void* args) {
    FlushArgs* flush_args = static_cast<FlushArgs*>(args);

    sync_completion_wait(flush_args->start, zx::sec(20).get());

    for (size_t i = 0; i < flush_args->operations; ++i) {
        if (flush_args->flush) {
            flush_args->histogram->Flush(
                [&flush_args](uint64_t metric_id,
                              const EventBuffer<fidl::VectorView<HistogramBucket>>& buffer,
                              RemoteHistogram::FlushCompleteFn complete_fn) {
                    uint64_t count = buffer.event_data().count();
                    for (uint32_t i = 0; i < count; ++i) {
                        flush_args->accumulated_histogram->IncrementCount(
                            buffer.event_data()[i].index, buffer.event_data()[i].count);
                    }
                    // Mark the flush as complete so later flushes are not rejected.
                    complete_fn();
                });
        } else {
            for (uint32_t j = 0; j < kBuckets; ++j) {
                flush_args->histogram->IncrementCount(j, j);
            }
        }
    }
    return thrd_success;
}

// Verify that in a concurrent environment the final results are consistent. This test
// has |kThreads|/2 threads incrementing bucket counts, and |kThreads|/2 threads flushing them
// a certain number of times into a BaseHistogram that collects the results. At the end,
// each bucket of the BaseHistogram, plus whatever has not yet been flushed, should add up
// to the expected value.
bool TestFlushMultithread() {
    BEGIN_TEST;
    sync_completion_t start;
    BaseHistogram accumulated(kBuckets);
    RemoteHistogram histogram = MakeRemoteHistogram();
    fbl::Vector<thrd_t> thread_ids;
    FlushArgs args[kThreads];

    thread_ids.reserve(kThreads);
    for (uint64_t i = 0; i < kThreads; ++i) {
        thread_ids.push_back({});
    }

    for (uint64_t i = 0; i < kThreads; ++i) {
        auto& thread_id = thread_ids[i];
        args[i].histogram = &histogram;
        args[i].accumulated_histogram = &accumulated;
        args[i].operations = i;
        args[i].flush = i % 2;
        args[i].start = &start;
        ASSERT_EQ(thrd_create(&thread_id, FlushFn, &args[i]), thrd_success);
    }

    // Notify threads to start working.
    sync_completion_signal(&start);

    // Wait for all threads to finish.
    for (const auto& thread_id : thread_ids) {
        thrd_join(thread_id, nullptr);
    }

    // Each thread at an even position i increments every bucket by bucket_index, i times, so
    // the expected total per bucket is bucket_index * Sum(even i < kThreads) i.
    constexpr size_t ceil_threads = ((kThreads - 1) / 2 * ((kThreads - 1) / 2 + 1));
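    // Worked out with kThreads = 20 as defined above: the incrementing (even-indexed) threads run
    // 0 + 2 + ... + 18 = 90 rounds in total, so ceil_threads is 9 * 10 = 90 and, for example,
    // bucket 3 should add up to 3 * 90 = 270.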
    for (uint32_t i = 0; i < kBuckets; ++i) {
        // We take the sum of the accumulated and what is left, because the last increment may have
        // been scheduled after the last flush.
        EXPECT_EQ(accumulated.GetCount(i) + histogram.GetCount(i), i * ceil_threads);
    }
    END_TEST;
}

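// Verify that Add updates the count of the bucket that the value maps to, and that repeated
// additions accumulate.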
bool TestAdd() {
    BEGIN_TEST;
    // Buckets 2^i + offset.
    HistogramOptions options = HistogramOptions::Exponential(/*bucket_count=*/kBuckets, /*base=*/2,
                                                             /*scalar=*/1, /*offset=*/-10);
    RemoteHistogram remote_histogram(kBuckets + 2, kMetricId, {});
    ASSERT_TRUE(options.IsValid());
    Histogram histogram(&options, &remote_histogram);

    histogram.Add(25);
    ASSERT_EQ(histogram.GetRemoteCount(25), 1);
    histogram.Add(25, 4);
    histogram.Add(1500, 2);

    ASSERT_EQ(histogram.GetRemoteCount(25), 5);
    ASSERT_EQ(histogram.GetRemoteCount(1500), 2);

    END_TEST;
}

// Verify that from the public point of view, changes are reflected accurately, while internally
// the buckets are accessed correctly.
// Note: The two extra buckets are the underflow and overflow buckets.
bool TestAddMultiple() {
    BEGIN_TEST;
    // Buckets 2^i + offset.
    HistogramOptions options = HistogramOptions::Exponential(/*bucket_count=*/kBuckets, /*base=*/2,
                                                             /*scalar=*/1, /*offset=*/-10);
    RemoteHistogram remote_histogram(kBuckets + 2, kMetricId, {});
    BaseHistogram expected_hist(kBuckets + 2);
    ASSERT_TRUE(options.IsValid());
    Histogram histogram(&options, &remote_histogram);

    struct ValueBucket {
        double value;
        uint32_t bucket;
    };
    fbl::Vector<ValueBucket> data;
    unsigned int seed = static_cast<unsigned int>(zx::ticks::now().get());

    // 500 random observations.
    for (int i = 0; i < 500; ++i) {
        ValueBucket curr;
        curr.bucket = rand_r(&seed) % (kBuckets + 2);
        double min;
        double max;
        if (curr.bucket == 0) {
            min = -DBL_MAX;
        } else {
            min = options.reverse_map_fn(curr.bucket, options);
        }
        max = nextafter(options.reverse_map_fn(curr.bucket + 1, options), min);
        curr.value = min + (max - min) * (static_cast<double>(rand_r(&seed)) / RAND_MAX);
        ASSERT_EQ(options.map_fn(curr.value, options), curr.bucket);

        Histogram::Count count = 1 + rand_r(&seed) % 20;
        expected_hist.IncrementCount(curr.bucket, count);
        histogram.Add(curr.value, count);
        data.push_back(curr);
    }

    // Verify that the data stored through the public API matches the expected values.
    for (auto& val_bucket : data) {
        EXPECT_EQ(histogram.GetRemoteCount(val_bucket.value),
                  expected_hist.GetCount(val_bucket.bucket));
    }

    // Sanity-check that the internal representation also matches the expected values.
    for (uint32_t bucket = 0; bucket < kBuckets + 2; ++bucket) {
        EXPECT_EQ(remote_histogram.GetCount(bucket), expected_hist.GetCount(bucket));
    }

    END_TEST;
}

// Verify we always expose the delta since the last flush.
bool TestAddAfterFlush() {
    BEGIN_TEST;
    // Buckets 2^i + offset.
    HistogramOptions options = HistogramOptions::Exponential(/*bucket_count=*/kBuckets, /*base=*/2,
                                                             /*scalar=*/1, /*offset=*/-10);
    RemoteHistogram remote_histogram(kBuckets + 2, kMetricId, {});
    ASSERT_TRUE(options.IsValid());
    Histogram histogram(&options, &remote_histogram);

    histogram.Add(25, 4);
    ASSERT_EQ(histogram.GetRemoteCount(25), 4);
    remote_histogram.Flush([](uint64_t metric_id,
                              const EventBuffer<fidl::VectorView<HistogramBucket>>&,
                              RemoteHistogram::FlushCompleteFn complete) { complete(); });
    histogram.Add(25, 4);
    histogram.Add(1500, 2);

    ASSERT_EQ(histogram.GetRemoteCount(25), 4);
    ASSERT_EQ(histogram.GetRemoteCount(1500), 2);

    END_TEST;
}

struct Observation {
    double value;
    Histogram::Count count;
};

struct HistogramFnArgs {
    // Public histogram which is used to add observations.
    Histogram histogram = Histogram(nullptr, nullptr);

    // When we are flushing we act as the collector, so we need
    // a pointer to the underlying histogram.
    RemoteHistogram* remote_histogram = nullptr;

    // We flush the contents at each step into this histogram.
    BaseHistogram* flushed_histogram = nullptr;

    // Synchronize thread start.
    sync_completion_t* start = nullptr;

    // Observations each thread will add.
    fbl::Vector<Observation>* observed_values = nullptr;

    // The thread will flush contents if set to true.
    bool flush = false;
};

// Wait until all threads are started, then start adding observations or flushing, depending
// on the thread parameters.
int HistogramFn(void* v_args) {
    HistogramFnArgs* args = reinterpret_cast<HistogramFnArgs*>(v_args);
    sync_completion_wait(args->start, zx::sec(20).get());
    for (auto& obs : *args->observed_values) {
        if (!args->flush) {
            args->histogram.Add(obs.value, obs.count);
        } else {
            args->remote_histogram->Flush(
                [args](uint64_t, const EventBuffer<fidl::VectorView<HistogramBucket>>& buffer,
                       RemoteHistogram::FlushCompleteFn complete_fn) {
                    for (auto& hist_bucket : buffer.event_data()) {
                        args->flushed_histogram->IncrementCount(hist_bucket.index,
                                                                hist_bucket.count);
                    }
                    // Mark the flush as complete so later flushes are not rejected.
                    complete_fn();
                });
        }
    }
    return thrd_success;
}

// Verify that when multiple threads call Add, the result is eventually consistent,
// meaning that the total count in each bucket should match the count in a histogram
// that is kept manually (BaseHistogram).
bool TestAddMultiThread() {
    BEGIN_TEST;
    // Buckets 2 * i + offset.
    HistogramOptions options = HistogramOptions::Linear(/*bucket_count=*/kBuckets,
                                                        /*scalar=*/2, /*offset=*/0);
    RemoteHistogram remote_histogram(kBuckets + 2, kMetricId, {});
    BaseHistogram expected_hist(kBuckets + 2);
    ASSERT_TRUE(options.IsValid());
    Histogram histogram(&options, &remote_histogram);
    fbl::Vector<Observation> observations;

    // 1500 random observations.
    unsigned int seed = static_cast<unsigned int>(zx::ticks::now().get());
    for (int i = 0; i < 1500; ++i) {
        Observation obs;
        uint32_t bucket = rand_r(&seed) % (kBuckets + 2);
        double min;
        double max;
        if (bucket == 0) {
            min = -DBL_MAX;
        } else {
            min = options.reverse_map_fn(bucket, options);
        }
        max = nextafter(options.reverse_map_fn(bucket + 1, options), min);
        obs.value = min + (max - min) * (static_cast<double>(rand_r(&seed)) / RAND_MAX);
        ASSERT_EQ(options.map_fn(obs.value, options), bucket);
        obs.count = 1 + rand_r(&seed) % 20;
        expected_hist.IncrementCount(bucket, kThreads * obs.count);
        observations.push_back(obs);
    }

    // Thread data.
    HistogramFnArgs args;
    sync_completion_t start;
    fbl::Vector<thrd_t> thread_ids;
    args.histogram = histogram;
    args.start = &start;
    args.observed_values = &observations;

    for (size_t thread = 0; thread < kThreads; ++thread) {
        thread_ids.push_back({});
        auto& thread_id = thread_ids[thread];
        ASSERT_EQ(thrd_create(&thread_id, HistogramFn, &args), thrd_success);
    }
    sync_completion_signal(&start);

    for (auto& thread_id : thread_ids) {
        thrd_join(thread_id, nullptr);
    }

    // Verify each bucket holds exactly the value of the expected histogram.
    for (uint32_t bucket = 0; bucket < kBuckets + 2; ++bucket) {
        double value = options.reverse_map_fn(bucket, options);
        EXPECT_EQ(histogram.GetRemoteCount(value), expected_hist.GetCount(bucket));
    }
    END_TEST;
}

// Verify that when multiple threads call Add and Flush concurrently, the result
// is eventually consistent, meaning that for each bucket, the amount in expected_hist
// equals what remains in the remote histogram plus the amount in the flushed histogram.
// Essentially a sanity check that no data is lost.
bool TestAddAndFlushMultiThread() {
    BEGIN_TEST;
    // Buckets 2 * i + offset.
    HistogramOptions options = HistogramOptions::Linear(/*bucket_count=*/kBuckets,
                                                        /*scalar=*/2, /*offset=*/0);
    RemoteHistogram remote_histogram(kBuckets + 2, kMetricId, {});
    BaseHistogram expected_hist(kBuckets + 2);
    BaseHistogram flushed_hist(kBuckets + 2);
    ASSERT_TRUE(options.IsValid());
    Histogram histogram(&options, &remote_histogram);
    fbl::Vector<Observation> observations;

    // 1500 random observations.
    unsigned int seed = static_cast<unsigned int>(zx::ticks::now().get());
    for (int i = 0; i < 1500; ++i) {
        Observation obs;
        uint32_t bucket = rand_r(&seed) % (kBuckets + 2);
        double min;
        double max;
        if (bucket == 0) {
            min = -DBL_MAX;
        } else {
            min = options.reverse_map_fn(bucket, options);
        }
        max = nextafter(options.reverse_map_fn(bucket + 1, options), min);
        obs.value = min + (max - min) * (static_cast<double>(rand_r(&seed)) / RAND_MAX);
        ASSERT_EQ(options.map_fn(obs.value, options), bucket);
        obs.count = 1 + rand_r(&seed) % 20;
        expected_hist.IncrementCount(bucket, (kThreads / 2 + kThreads % 2) * obs.count);
        observations.push_back(obs);
    }

    sync_completion_t start;
    HistogramFnArgs add_args;
    fbl::Vector<thrd_t> thread_ids;
    add_args.start = &start;
    add_args.histogram = histogram;
    add_args.observed_values = &observations;

    HistogramFnArgs flush_args = add_args;
    flush_args.flush = true;
    flush_args.flushed_histogram = &flushed_hist;
    flush_args.remote_histogram = &remote_histogram;

    for (size_t thread = 0; thread < kThreads; ++thread) {
        thread_ids.push_back({});
        auto& thread_id = thread_ids[thread];
        if (thread % 2 != 0) {
            ASSERT_EQ(thrd_create(&thread_id, HistogramFn, &add_args), thrd_success);
        } else {
            ASSERT_EQ(thrd_create(&thread_id, HistogramFn, &flush_args), thrd_success);
        }
    }

    sync_completion_signal(&start);

    for (auto& thread_id : thread_ids) {
        thrd_join(thread_id, nullptr);
    }

    // Verify each bucket holds exactly the value of the expected histogram.
    // The addition here is because there is no guarantee that the last flush happened after
    // the last add: what has not yet been flushed, plus what was flushed, should equal the
    // expected value as if we had not flushed at all.
    for (uint32_t bucket = 0; bucket < kBuckets + 2; ++bucket) {
        double value = options.reverse_map_fn(bucket, options);
        EXPECT_EQ(histogram.GetRemoteCount(value) + flushed_hist.GetCount(bucket),
                  expected_hist.GetCount(bucket));
    }
    END_TEST;
}

BEGIN_TEST_CASE(BaseHistogramTest)
RUN_TEST(TestIncrement)
RUN_TEST(TestIncrementByVal)
RUN_TEST(TestIncrementMultiThread)
END_TEST_CASE(BaseHistogramTest)

BEGIN_TEST_CASE(RemoteHistogramTest)
RUN_TEST(TestFlush)
RUN_TEST(TestFlushMultithread)
END_TEST_CASE(RemoteHistogramTest)

BEGIN_TEST_CASE(HistogramTest)
RUN_TEST(TestAdd)
RUN_TEST(TestAddAfterFlush)
RUN_TEST(TestAddMultiple)
RUN_TEST(TestAddMultiThread)
RUN_TEST(TestAddAndFlushMultiThread)
END_TEST_CASE(HistogramTest)

} // namespace
} // namespace internal
} // namespace cobalt_client