Skip to content

Commit fec8ce1

Browse files
committed
feat(metrics): add MetricProducer support (fixes #4768)
1 parent daae23b commit fec8ce1

File tree

5 files changed

+968
-6
lines changed

5 files changed

+968
-6
lines changed
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
# MetricProducer Examples
2+
3+
This directory contains examples of how to implement and use the `MetricProducer` interface to bridge third-party metric sources with OpenTelemetry.
4+
5+
## What is MetricProducer?
6+
7+
`MetricProducer` is an interface defined in the OpenTelemetry specification that allows you to plug third-party metric sources into an OpenTelemetry `MetricReader`. This is particularly useful for:
8+
9+
- Bridging existing monitoring systems to OpenTelemetry
10+
- Integrating with systems like Prometheus, StatsD, or custom monitoring solutions
11+
- Collecting pre-processed metrics from external sources
12+
13+
## Key Concepts
14+
15+
- **MetricProducer**: Interface that defines how to produce metrics from third-party sources
16+
- **MetricReader**: Collects metrics from both the OpenTelemetry SDK and registered MetricProducers
17+
- **Pre-processed data**: Unlike OpenTelemetry instruments that collect raw measurements, MetricProducers work with already aggregated metrics
18+
19+
## Examples
20+
21+
### basic_example.py
22+
23+
A comprehensive example showing:
24+
- How to implement `MetricProducer` for different systems (Prometheus simulation, custom system)
25+
- How to convert third-party metric formats to OpenTelemetry `MetricsData`
26+
- How to register producers with a `MetricReader`
27+
- How both SDK metrics and producer metrics are combined
28+
29+
## Running the Examples
30+
31+
```bash
32+
# From the repo root
33+
cd docs/examples/metrics/producer
34+
python basic_example.py
35+
```
36+
37+
## Implementation Pattern
38+
39+
When implementing a `MetricProducer`:
40+
41+
1. **Inherit from MetricProducer**: Create a class that extends the abstract base class
42+
2. **Implement produce()**: This method should fetch and convert metrics to OpenTelemetry format
43+
3. **Handle errors gracefully**: Your producer should not crash the entire collection process
44+
4. **Respect timeout**: The `produce()` method receives a timeout parameter
45+
5. **Return MetricsData**: Convert your metrics to the standard OpenTelemetry format
46+
47+
```python
48+
from opentelemetry.sdk.metrics.export import MetricProducer, MetricsData
49+
50+
class MyMetricProducer(MetricProducer):
51+
def produce(self, timeout_millis: float = 10_000) -> MetricsData:
52+
# Fetch metrics from your source
53+
raw_metrics = self.fetch_from_source()
54+
55+
# Convert to OpenTelemetry format
56+
otel_metrics = self.convert_to_otel_format(raw_metrics)
57+
58+
# Return as MetricsData
59+
return MetricsData(resource_metrics=otel_metrics)
60+
```
61+
62+
## Best Practices
63+
64+
1. **Resource Identification**: Use appropriate resource attributes to identify the source system
65+
2. **Instrumentation Scope**: Create meaningful instrumentation scopes for your producers
66+
3. **Metric Naming**: Use clear, descriptive metric names, optionally with prefixes
67+
4. **Error Handling**: Handle network errors, parsing errors, and timeouts gracefully
68+
5. **Performance**: Consider caching and efficient data fetching to avoid impacting collection performance
69+
6. **Thread Safety**: Ensure your producer is thread-safe as it may be called concurrently
70+
71+
## Integration with MetricReader
72+
73+
```python
74+
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader, ConsoleMetricExporter
75+
76+
# Create your producers
77+
producer1 = MyCustomProducer()
78+
producer2 = PrometheusProducer()
79+
80+
# Create a reader with producers
81+
reader = PeriodicExportingMetricReader(
82+
exporter=ConsoleMetricExporter(),
83+
metric_producers=[producer1, producer2]
84+
)
85+
86+
# The reader will automatically collect from both SDK and producers
87+
```
88+
89+
## Relationship to OpenTelemetry Instruments
90+
91+
MetricProducer is different from OpenTelemetry instruments:
92+
93+
- **Instruments** (Counter, Histogram, etc.): Collect raw measurements and aggregate them in the SDK
94+
- **MetricProducer**: Provides already-aggregated metrics from external sources
95+
96+
Use MetricProducer when you have an existing system that already aggregates metrics and you want to bridge that data into OpenTelemetry.
97+
98+
## Further Reading
99+
100+
- [OpenTelemetry Metrics Specification](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricproducer)
101+
- [OpenTelemetry Python SDK Documentation](https://opentelemetry-python.readthedocs.io/)
Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
This example demonstrates how to implement and use a MetricProducer to
17+
bridge third-party metric sources with OpenTelemetry.
18+
19+
MetricProducer allows you to integrate pre-processed metrics from external
20+
systems (like Prometheus, custom monitoring systems, etc.) into the
21+
OpenTelemetry metrics pipeline.
22+
"""
23+
24+
import time
25+
from typing import Dict
26+
27+
from opentelemetry.sdk.metrics import MeterProvider
28+
from opentelemetry.sdk.metrics.export import (
29+
ConsoleMetricExporter,
30+
Metric,
31+
MetricProducer,
32+
MetricsData,
33+
NumberDataPoint,
34+
PeriodicExportingMetricReader,
35+
ResourceMetrics,
36+
ScopeMetrics,
37+
Sum,
38+
)
39+
from opentelemetry.sdk.resources import Resource
40+
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
41+
42+
43+
class PrometheusMetricProducer(MetricProducer):
44+
"""Example MetricProducer that bridges Prometheus metrics.
45+
46+
This example shows how to fetch metrics from a third-party source
47+
(simulating Prometheus) and convert them to OpenTelemetry format.
48+
"""
49+
50+
def __init__(self, prometheus_url: str = "http://localhost:9090"):
51+
self.prometheus_url = prometheus_url
52+
self.instrumentation_scope = InstrumentationScope(
53+
name="prometheus.bridge",
54+
version="1.0.0"
55+
)
56+
self.resource = Resource.create({
57+
"service.name": "prometheus-bridge",
58+
"bridge.source": "prometheus",
59+
"bridge.url": prometheus_url
60+
})
61+
62+
def produce(self, timeout_millis: float = 10_000) -> MetricsData:
63+
"""Fetch metrics from Prometheus and convert to OpenTelemetry format."""
64+
65+
# In a real implementation, you would:
66+
# 1. Make HTTP request to Prometheus /api/v1/query_range or /metrics
67+
# 2. Parse the response (JSON or Prometheus text format)
68+
# 3. Convert to OpenTelemetry metrics
69+
70+
# For this example, we'll simulate fetching metrics
71+
simulated_prometheus_metrics = self._fetch_prometheus_metrics()
72+
73+
# Convert to OpenTelemetry format
74+
otel_metrics = []
75+
for metric_name, metric_data in simulated_prometheus_metrics.items():
76+
otel_metrics.append(
77+
Metric(
78+
name=f"prometheus.{metric_name}",
79+
description=f"Metric {metric_name} from Prometheus",
80+
unit=metric_data.get("unit", "1"),
81+
data=Sum(
82+
data_points=[
83+
NumberDataPoint(
84+
attributes=metric_data.get("labels", {}),
85+
start_time_unix_nano=int((time.time() - 60) * 1e9), # 1 minute ago
86+
time_unix_nano=int(time.time() * 1e9),
87+
value=metric_data["value"],
88+
)
89+
],
90+
aggregation_temporality=1, # CUMULATIVE
91+
is_monotonic=metric_data.get("monotonic", False),
92+
),
93+
)
94+
)
95+
96+
# Return as MetricsData
97+
return MetricsData(
98+
resource_metrics=[
99+
ResourceMetrics(
100+
resource=self.resource,
101+
scope_metrics=[
102+
ScopeMetrics(
103+
scope=self.instrumentation_scope,
104+
metrics=otel_metrics,
105+
schema_url="",
106+
)
107+
],
108+
schema_url="",
109+
)
110+
]
111+
)
112+
113+
def _fetch_prometheus_metrics(self) -> Dict[str, Dict]:
114+
"""Simulate fetching metrics from Prometheus."""
115+
# In a real implementation, this would make HTTP requests to Prometheus
116+
# and parse the response. For this example, we return simulated data.
117+
118+
return {
119+
"http_requests_total": {
120+
"value": 12345,
121+
"labels": {"method": "GET", "status": "200"},
122+
"unit": "1",
123+
"monotonic": True,
124+
},
125+
"http_request_duration_seconds": {
126+
"value": 0.234,
127+
"labels": {"method": "GET", "quantile": "0.95"},
128+
"unit": "s",
129+
"monotonic": False,
130+
},
131+
"memory_usage_bytes": {
132+
"value": 1024 * 1024 * 512, # 512 MB
133+
"labels": {"instance": "server-1"},
134+
"unit": "bytes",
135+
"monotonic": False,
136+
},
137+
}
138+
139+
140+
class CustomSystemMetricProducer(MetricProducer):
141+
"""Example MetricProducer for a custom monitoring system."""
142+
143+
def __init__(self, system_name: str = "custom-system"):
144+
self.system_name = system_name
145+
self.instrumentation_scope = InstrumentationScope(
146+
name=f"{system_name}.bridge",
147+
version="1.0.0"
148+
)
149+
self.resource = Resource.create({
150+
"service.name": f"{system_name}-bridge",
151+
"bridge.source": system_name,
152+
})
153+
154+
def produce(self, timeout_millis: float = 10_000) -> MetricsData:
155+
"""Fetch metrics from custom system."""
156+
157+
# Simulate fetching from a custom system
158+
custom_metrics = self._fetch_custom_metrics()
159+
160+
# Convert to OpenTelemetry format
161+
otel_metrics = []
162+
for metric in custom_metrics:
163+
otel_metrics.append(
164+
Metric(
165+
name=f"custom.{metric['name']}",
166+
description=metric.get("description", ""),
167+
unit=metric.get("unit", "1"),
168+
data=Sum(
169+
data_points=[
170+
NumberDataPoint(
171+
attributes=metric.get("tags", {}),
172+
start_time_unix_nano=int((time.time() - 30) * 1e9), # 30 seconds ago
173+
time_unix_nano=int(time.time() * 1e9),
174+
value=metric["value"],
175+
)
176+
],
177+
aggregation_temporality=1, # CUMULATIVE
178+
is_monotonic=metric.get("is_counter", False),
179+
),
180+
)
181+
)
182+
183+
return MetricsData(
184+
resource_metrics=[
185+
ResourceMetrics(
186+
resource=self.resource,
187+
scope_metrics=[
188+
ScopeMetrics(
189+
scope=self.instrumentation_scope,
190+
metrics=otel_metrics,
191+
schema_url="",
192+
)
193+
],
194+
schema_url="",
195+
)
196+
]
197+
)
198+
199+
def _fetch_custom_metrics(self) -> list:
200+
"""Simulate fetching from a custom monitoring system."""
201+
return [
202+
{
203+
"name": "database_connections",
204+
"value": 25,
205+
"description": "Active database connections",
206+
"unit": "1",
207+
"tags": {"database": "postgres", "pool": "main"},
208+
"is_counter": False,
209+
},
210+
{
211+
"name": "api_calls_total",
212+
"value": 9876,
213+
"description": "Total API calls processed",
214+
"unit": "1",
215+
"tags": {"endpoint": "/api/v1/users", "method": "GET"},
216+
"is_counter": True,
217+
},
218+
]
219+
220+
221+
def main():
222+
"""Example usage of MetricProducer with OpenTelemetry."""
223+
224+
print("Starting MetricProducer example...")
225+
226+
# Create MetricProducers for different third-party sources
227+
prometheus_producer = PrometheusMetricProducer("http://localhost:9090")
228+
custom_producer = CustomSystemMetricProducer("monitoring-system")
229+
230+
# Create a metric reader that includes the producers
231+
exporter = ConsoleMetricExporter()
232+
reader = PeriodicExportingMetricReader(
233+
exporter=exporter,
234+
export_interval_millis=5000, # Export every 5 seconds
235+
metric_producers=[prometheus_producer, custom_producer]
236+
)
237+
238+
# IMPORTANT: Register the reader with a MeterProvider
239+
# This is required for the reader to be able to collect metrics
240+
meter_provider = MeterProvider(metric_readers=[reader])
241+
242+
print("Configured MetricReader with the following producers:")
243+
print("- PrometheusMetricProducer (simulated)")
244+
print("- CustomSystemMetricProducer (simulated)")
245+
print("\nThe reader is now registered with a MeterProvider and will collect")
246+
print("metrics from these producers every 5 seconds and export them to the console.\n")
247+
248+
# Note: You can also use the meter_provider to create meters and instruments
249+
# meter = meter_provider.get_meter("example.meter")
250+
# counter = meter.create_counter("example.counter")
251+
# counter.add(1)
252+
253+
print("=== Metrics will be collected and exported every 5 seconds ===")
254+
print("Press Ctrl+C to stop...")
255+
256+
try:
257+
# Let it run for a bit to show periodic collection
258+
time.sleep(20)
259+
except KeyboardInterrupt:
260+
print("\nStopping...")
261+
finally:
262+
# Clean shutdown
263+
meter_provider.shutdown()
264+
print("MeterProvider shut down successfully.")
265+
266+
267+
if __name__ == "__main__":
268+
main()

0 commit comments

Comments
 (0)