Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class MetricCollector : public MetricProducer, public CollectorHandle
public:
MetricCollector(MeterContext *context,
std::shared_ptr<MetricReader> metric_reader,
std::unique_ptr<MetricFilter> metric_filter = nullptr);
std::unique_ptr<MetricFilter> metric_filter = nullptr) noexcept;

~MetricCollector() override = default;

Expand Down
136 changes: 89 additions & 47 deletions sdk/src/metrics/state/metric_collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,31 @@ namespace metrics

MetricCollector::MetricCollector(opentelemetry::sdk::metrics::MeterContext *context,
std::shared_ptr<MetricReader> metric_reader,
std::unique_ptr<MetricFilter> metric_filter)
std::unique_ptr<MetricFilter> metric_filter) noexcept
: meter_context_{context},
metric_reader_{std::move(metric_reader)},
metric_filter_(std::move(metric_filter))
{
metric_reader_->SetMetricProducer(this);
if (!context)
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::MetricCollector] - Error constructing MetricCollector."
<< "The context is invalid");
if (metric_reader_)
metric_reader_->SetMetricProducer(this);
else
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::MetricCollector] - Error constructing MetricCollector."
Comment on lines +39 to +44
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing space between the error message and the stream insertion. Should be: \"Error constructing MetricCollector. \" (with trailing space) for proper formatting.

Suggested change
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::MetricCollector] - Error constructing MetricCollector."
<< "The context is invalid");
if (metric_reader_)
metric_reader_->SetMetricProducer(this);
else
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::MetricCollector] - Error constructing MetricCollector."
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::MetricCollector] - Error constructing MetricCollector. "
<< "The context is invalid");
if (metric_reader_)
metric_reader_->SetMetricProducer(this);
else
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::MetricCollector] - Error constructing MetricCollector. "

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing space between the error message and the stream insertion. Should be: \"Error constructing MetricCollector. \" (with trailing space) for proper formatting.

Suggested change
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::MetricCollector] - Error constructing MetricCollector."
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::MetricCollector] - Error constructing MetricCollector. "

Copilot uses AI. Check for mistakes.
<< "The metric reader is invalid");
}

AggregationTemporality MetricCollector::GetAggregationTemporality(
InstrumentType instrument_type) noexcept
{
if (!metric_reader_)
{
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::GetAggregationTemporality] - Error getting aggregation temporality."
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing space between the error message and the stream insertion. Should be: \"Error getting aggregation temporality. \" (with trailing space) for proper formatting.

Suggested change
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::GetAggregationTemporality] - Error getting aggregation temporality."
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::GetAggregationTemporality] - Error getting aggregation temporality. "

Copilot uses AI. Check for mistakes.
<< "The metric reader is invalid");
return AggregationTemporality::kCumulative;
}

auto aggregation_temporality = metric_reader_->GetAggregationTemporality(instrument_type);
if (aggregation_temporality == AggregationTemporality::kDelta &&
instrument_type == InstrumentType::kGauge)
Expand All @@ -64,78 +78,106 @@ MetricProducer::Result MetricCollector::Produce() noexcept
return {{}, MetricProducer::Status::kFailure};
}
ResourceMetrics resource_metrics;
meter_context_->ForEachMeter([&](const std::shared_ptr<Meter> &meter) noexcept {
auto collection_ts = std::chrono::system_clock::now();
auto metric_data = meter->Collect(this, collection_ts);
if (metric_data.empty())
{
return true;
}
ScopeMetrics scope_metrics;
scope_metrics.metric_data_ = std::move(metric_data);
scope_metrics.scope_ = meter->GetInstrumentationScope();
if (!metric_filter_)
bool success = meter_context_->ForEachMeter([&](const std::shared_ptr<Meter> &meter) noexcept {
try
{
resource_metrics.scope_metric_data_.emplace_back(std::move(scope_metrics));
return true;
}

ScopeMetrics filtered_scope_metrics;
filtered_scope_metrics.scope_ = meter->GetInstrumentationScope();
for (MetricData &metric : scope_metrics.metric_data_)
{
const opentelemetry::sdk::instrumentationscope::InstrumentationScope &scope =
*scope_metrics.scope_;
opentelemetry::nostd::string_view name = metric.instrument_descriptor.name_;
const InstrumentType &type = metric.instrument_descriptor.type_;
opentelemetry::nostd::string_view unit = metric.instrument_descriptor.unit_;

MetricFilter::MetricFilterResult metric_filter_result =
metric_filter_->TestMetric(scope, name, type, unit);
if (metric_filter_result == MetricFilter::MetricFilterResult::kAccept)
auto collection_ts = std::chrono::system_clock::now();
auto metric_data = meter->Collect(this, collection_ts);
if (metric_data.empty())
{
filtered_scope_metrics.metric_data_.emplace_back(std::move(metric));
continue;
return true;
}
else if (metric_filter_result == MetricFilter::MetricFilterResult::kDrop)
ScopeMetrics scope_metrics;
scope_metrics.metric_data_ = std::move(metric_data);
scope_metrics.scope_ = meter->GetInstrumentationScope();
if (!metric_filter_)
{
continue;
resource_metrics.scope_metric_data_.emplace_back(std::move(scope_metrics));
return true;
}

std::vector<PointDataAttributes> filtered_point_data_attrs;
for (PointDataAttributes &point_data_attr : metric.point_data_attr_)
ScopeMetrics filtered_scope_metrics;
filtered_scope_metrics.scope_ = scope_metrics.scope_;
for (MetricData &metric : scope_metrics.metric_data_)
{
const PointAttributes &attributes = point_data_attr.attributes;
MetricFilter::AttributesFilterResult attributes_filter_result =
metric_filter_->TestAttributes(scope, name, type, unit, attributes);
if (attributes_filter_result == MetricFilter::AttributesFilterResult::kAccept)
const opentelemetry::sdk::instrumentationscope::InstrumentationScope &scope =
*scope_metrics.scope_;
opentelemetry::nostd::string_view name = metric.instrument_descriptor.name_;
const InstrumentType &type = metric.instrument_descriptor.type_;
opentelemetry::nostd::string_view unit = metric.instrument_descriptor.unit_;

MetricFilter::MetricFilterResult metric_filter_result =
metric_filter_->TestMetric(scope, name, type, unit);
if (metric_filter_result == MetricFilter::MetricFilterResult::kAccept)
{
filtered_scope_metrics.metric_data_.emplace_back(std::move(metric));
continue;
}
else if (metric_filter_result == MetricFilter::MetricFilterResult::kDrop)
{
continue;
}

std::vector<PointDataAttributes> filtered_point_data_attrs;
for (PointDataAttributes &point_data_attr : metric.point_data_attr_)
{
const PointAttributes &attributes = point_data_attr.attributes;
MetricFilter::AttributesFilterResult attributes_filter_result =
metric_filter_->TestAttributes(scope, name, type, unit, attributes);
if (attributes_filter_result == MetricFilter::AttributesFilterResult::kAccept)
{
filtered_point_data_attrs.emplace_back(std::move(point_data_attr));
}
}
if (!filtered_point_data_attrs.empty())
{
filtered_point_data_attrs.emplace_back(std::move(point_data_attr));
metric.point_data_attr_ = std::move(filtered_point_data_attrs);
filtered_scope_metrics.metric_data_.emplace_back(std::move(metric));
}
}
if (!filtered_point_data_attrs.empty())
if (!filtered_scope_metrics.metric_data_.empty())
{
metric.point_data_attr_ = std::move(filtered_point_data_attrs);
filtered_scope_metrics.metric_data_.emplace_back(std::move(metric));
resource_metrics.scope_metric_data_.emplace_back(std::move(filtered_scope_metrics));
}
}
if (!filtered_scope_metrics.metric_data_.empty())
catch (const std::exception &e)
{
resource_metrics.scope_metric_data_.emplace_back(std::move(filtered_scope_metrics));
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::Collect] - Error during collecting."
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing space between the error message and the stream insertion. Should be: \"Error during collecting. \" (with trailing space) for proper formatting.

Suggested change
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::Collect] - Error during collecting."
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::Collect] - Error during collecting. "

Copilot uses AI. Check for mistakes.
<< "Exception caught: " << e.what());
return false;
}
catch (...)
{
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::Collect] - Error during collecting."
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing space between the error message and the stream insertion. Should be: \"Error during collecting. \" (with trailing space) for proper formatting.

Suggested change
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::Collect] - Error during collecting."
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::Collect] - Error during collecting. "

Copilot uses AI. Check for mistakes.
<< "Unknown exception caught.");
return false;
}
return true;
});
resource_metrics.resource_ = &meter_context_->GetResource();
return {resource_metrics, MetricProducer::Status::kSuccess};
return {resource_metrics,
success ? MetricProducer::Status::kSuccess : MetricProducer::Status::kFailure};
}

bool MetricCollector::ForceFlush(std::chrono::microseconds timeout) noexcept
{
if (!metric_reader_)
{
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::ForceFlush] - Error accessing meter_reader_."
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing space between the error message and the stream insertion. Should be: \"Error accessing meter_reader_. \" (with trailing space) for proper formatting.

Suggested change
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::ForceFlush] - Error accessing meter_reader_."
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::ForceFlush] - Error accessing meter_reader_. "

Copilot uses AI. Check for mistakes.
<< "The metric reader is invalid");
return false;
}
return metric_reader_->ForceFlush(timeout);
}

bool MetricCollector::Shutdown(std::chrono::microseconds timeout) noexcept
{
if (!metric_reader_)
{
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::Shutdown] - Error accessing meter_reader_."
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing space between the error message and the stream insertion. Should be: \"Error accessing meter_reader_. \" (with trailing space) for proper formatting.

Suggested change
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::Shutdown] - Error accessing meter_reader_."
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::Shutdown] - Error accessing meter_reader_. "

Copilot uses AI. Check for mistakes.
<< "The metric reader is invalid");
return false;
}
return metric_reader_->Shutdown(timeout);
}

Expand Down
24 changes: 16 additions & 8 deletions sdk/test/metrics/metric_collector_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,26 @@ MetricFilter::TestAttributesFn DropAllTestAttributesFn()
class testing::MetricCollectorTest : public Test
{
public:
std::weak_ptr<MetricCollector> AddMetricReaderToMeterContext(
std::shared_ptr<MetricCollector> AddMetricReaderToMeterContext(
const std::shared_ptr<MeterContext> &context,
std::shared_ptr<MetricReader> reader,
std::unique_ptr<MetricFilter> metric_filter = nullptr) noexcept
std::unique_ptr<MetricFilter> metric_filter = nullptr)
{
auto collector = std::shared_ptr<MetricCollector>{
new MetricCollector(context.get(), std::move(reader), std::move(metric_filter))};
context->collectors_.push_back(collector);
return std::weak_ptr<MetricCollector>(collector);
return collector;
}
};

TEST_F(MetricCollectorTest, ConstructMetricCollectorThrowInvalidArgs)
{
auto context = std::shared_ptr<MeterContext>(new MeterContext(ViewRegistryFactory::Create()));
auto reader = std::shared_ptr<MetricReader>(new MockMetricReader());
EXPECT_THROW(AddMetricReaderToMeterContext({}, reader, {}), std::invalid_argument);
EXPECT_THROW(AddMetricReaderToMeterContext(context, {}, {}), std::invalid_argument);
}

TEST_F(MetricCollectorTest, CollectWithMetricFilterTestMetricTest1)
{
auto context = std::shared_ptr<MeterContext>(new MeterContext(ViewRegistryFactory::Create()));
Expand All @@ -112,7 +120,7 @@ TEST_F(MetricCollectorTest, CollectWithMetricFilterTestMetricTest1)

auto filter = MetricFilter::Create(AcceptAllTestMetricFn(), DropAllTestAttributesFn());
auto reader = std::shared_ptr<MetricReader>(new MockMetricReader());
auto collector = AddMetricReaderToMeterContext(context, reader, std::move(filter)).lock();
auto collector = AddMetricReaderToMeterContext(context, reader, std::move(filter));

auto instrument_1_name = "instrument_1";
auto instrument_1 = meter->CreateUInt64Counter(instrument_1_name);
Expand Down Expand Up @@ -163,7 +171,7 @@ TEST_F(MetricCollectorTest, CollectWithMetricFilterTestMetricTest2)

auto filter = MetricFilter::Create(DropAllTestMetricFn(), AcceptAllTestAttributesFn());
auto reader = std::shared_ptr<MetricReader>(new MockMetricReader());
auto collector = AddMetricReaderToMeterContext(context, reader, std::move(filter)).lock();
auto collector = AddMetricReaderToMeterContext(context, reader, std::move(filter));

auto instrument_1_name = "instrument_1";
auto instrument_1 = meter->CreateUInt64Counter(instrument_1_name);
Expand Down Expand Up @@ -209,7 +217,7 @@ TEST_F(MetricCollectorTest, CollectWithMetricFilterTestMetricTest3)
};
auto filter = MetricFilter::Create(test_metric_fn, DropAllTestAttributesFn());
auto reader = std::shared_ptr<MetricReader>(new MockMetricReader());
auto collector = AddMetricReaderToMeterContext(context, reader, std::move(filter)).lock();
auto collector = AddMetricReaderToMeterContext(context, reader, std::move(filter));

auto instrument_1_name = "instrument_1";
auto instrument_1 = meter->CreateUInt64Counter(instrument_1_name);
Expand Down Expand Up @@ -263,7 +271,7 @@ TEST_F(MetricCollectorTest, CollectWithMetricFilterTestAttributesTest1)
};
auto filter = MetricFilter::Create(AcceptPartialAllTestMetricFn(), test_attributes_fn);
auto reader = std::shared_ptr<MetricReader>(new MockMetricReader());
auto collector = AddMetricReaderToMeterContext(context, reader, std::move(filter)).lock();
auto collector = AddMetricReaderToMeterContext(context, reader, std::move(filter));

auto instrument_1_name = "instrument_1";
auto instrument_1 = meter->CreateUInt64Counter(instrument_1_name);
Expand Down Expand Up @@ -350,7 +358,7 @@ TEST_F(MetricCollectorTest, CollectWithMetricFilterTestAttributesTest2)
};
auto filter = MetricFilter::Create(AcceptPartialAllTestMetricFn(), test_attributes_fn);
auto reader = std::shared_ptr<MetricReader>(new MockMetricReader());
auto collector = AddMetricReaderToMeterContext(context, reader, std::move(filter)).lock();
auto collector = AddMetricReaderToMeterContext(context, reader, std::move(filter));

auto instrument_1_name = "instrument_1";
auto instrument_1 = meter->CreateUInt64Counter(instrument_1_name);
Expand Down
Loading