1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include <threads.h>
6
7#include <cobalt-client/cpp/collector-internal.h>
8#include <cobalt-client/cpp/collector.h>
9#include <cobalt-client/cpp/counter-internal.h>
10#include <cobalt-client/cpp/histogram-internal.h>
11#include <cobalt-client/cpp/types-internal.h>
12#include <fuchsia/cobalt/c/fidl.h>
13#include <lib/fidl/cpp/vector_view.h>
14#include <lib/zx/channel.h>
15
16namespace cobalt_client {
17namespace {
18
19using internal::Logger;
20using internal::Metadata;
21using internal::RemoteCounter;
22using internal::RemoteHistogram;
23
24Metadata MakeMetadata(uint32_t event_type_index) {
25    Metadata metadata;
26    metadata.event_type = 0;
27    metadata.event_type_index = event_type_index;
28
29    return metadata;
30}
31
32} // namespace
33
34Collector::Collector(const CollectorOptions& options, fbl::unique_ptr<internal::Logger> logger)
35    : logger_(fbl::move(logger)) {
36    flushing_.store(false);
37    remote_counters_.reserve(options.max_counters);
38    remote_histograms_.reserve(options.max_histograms);
39    histogram_options_.reserve(options.max_histograms);
40}
41
42Collector::Collector(Collector&& other)
43    : histogram_options_(fbl::move(other.histogram_options_)),
44      remote_histograms_(fbl::move(other.remote_histograms_)),
45      remote_counters_(fbl::move(other.remote_counters_)), logger_(fbl::move(other.logger_)),
46      flushing_(other.flushing_.load()) {}
47
48Collector::~Collector() {
49    if (logger_ != nullptr) {
50        Flush();
51    }
52};
53
54Histogram Collector::AddHistogram(uint64_t metric_id, uint32_t event_type_index,
55                                  const HistogramOptions& options) {
56    ZX_DEBUG_ASSERT_MSG(remote_histograms_.size() < remote_histograms_.capacity(),
57                        "Exceeded pre-allocated histogram capacity.");
58    remote_histograms_.push_back(
59        RemoteHistogram(options.bucket_count + 2, metric_id, {MakeMetadata(event_type_index)}));
60    histogram_options_.push_back(options);
61    size_t index = remote_histograms_.size() - 1;
62    return Histogram(&histogram_options_[index], &remote_histograms_[index]);
63}
64
65Counter Collector::AddCounter(uint64_t metric_id, uint32_t event_type_index) {
66    ZX_DEBUG_ASSERT_MSG(remote_counters_.size() < remote_counters_.capacity(),
67                        "Exceeded pre-allocated counter capacity.");
68    remote_counters_.push_back(RemoteCounter(metric_id, {MakeMetadata(event_type_index)}));
69    size_t index = remote_counters_.size() - 1;
70    return Counter(&remote_counters_[index]);
71}
72
73void Collector::Flush() {
74    // If we are already flushing we just return and do nothing.
75    // First come first serve.
76    if (flushing_.exchange(true)) {
77        return;
78    }
79
80    for (auto& histogram : remote_histograms_) {
81        LogHistogram(&histogram);
82    }
83
84    for (auto& counter : remote_counters_) {
85        LogCounter(&counter);
86    }
87
88    // Once we are finished we allow flushing again.
89    flushing_.store(false);
90}
91
92void Collector::LogHistogram(RemoteHistogram* histogram) {
93    histogram->Flush([this, histogram](uint64_t metric_id,
94                                       const RemoteHistogram::EventBuffer& buffer,
95                                       RemoteHistogram::FlushCompleteFn complete_fn) {
96        if (!logger_->Log(metric_id, buffer)) {
97            // If we failed to log the data, then add the values again to the histogram, so they may
98            // be flushed in the future, and we dont need to keep a buffer around for retrying or
99            // anything.
100            for (auto& bucket : buffer.event_data()) {
101                if (bucket.count > 0) {
102                    histogram->IncrementCount(bucket.index, bucket.count);
103                }
104            }
105        }
106
107        // Make the buffer writeable again.
108        complete_fn();
109    });
110}
111
112void Collector::LogCounter(RemoteCounter* counter) {
113    counter->Flush([this, counter](uint64_t metric_id, const RemoteCounter::EventBuffer& buffer,
114                                   RemoteCounter::FlushCompleteFn complete_fn) {
115        // Attempt to log data, if we fail, we increase the in process counter by the amount
116        // flushed.
117        if (!logger_->Log(metric_id, buffer)) {
118            if (buffer.event_data() > 0) {
119                counter->Increment(buffer.event_data());
120            }
121        }
122        // Make the buffer writeable again.
123        complete_fn();
124    });
125}
126
127} // namespace cobalt_client
128