diff --git a/example/otel-metrics/README.md b/example/otel-metrics/README.md new file mode 100644 index 000000000..25ae8d7b2 --- /dev/null +++ b/example/otel-metrics/README.md @@ -0,0 +1,67 @@ +# OpenTelemetry Metrics Example + +This example demonstrates how to enable OpenTelemetry metrics for Redis operations using the `extra/redisotel-native` package. + +## Features + +- ✅ OTLP exporter configuration +- ✅ Periodic metric export (every 10 seconds) +- ✅ Concurrent Redis operations +- ✅ Automatic metric collection for: + - Operation duration + - Connection metrics + - Error tracking + +## Prerequisites + +- Go 1.23.0 or later +- Redis server running on `localhost:6379` +- OTLP collector running on `localhost:4317` (optional) + +## Running the Example + +```bash +# Start Redis (if not already running) +redis-server + +# Optional: Start OTLP collector +# See: https://opentelemetry.io/docs/collector/ + +# Run the example +go run main.go +``` + +## What It Does + +1. Creates an OTLP exporter that sends metrics to a collector +2. Sets up a meter provider with periodic export (every 10 seconds) +3. Initializes Redis client with OTel instrumentation +4. Executes concurrent Redis operations (SET commands) +5. 
Waits for metrics to be exported + +## Metrics Collected + +The example automatically collects: + +- **db.client.operation.duration** - Operation latency histogram +- **db.client.connection.create_time** - Connection creation time +- **db.client.connection.count** - Active connection count +- **db.client.errors** - Error counter with error type classification + +## Configuration + +To use with a production OTLP collector: + +```go +exporter, err := otlpmetricgrpc.New(ctx, + otlpmetricgrpc.WithEndpoint("your-collector:4317"), + otlpmetricgrpc.WithTLSCredentials(credentials.NewClientTLSFromCert(certPool, "")), +) +``` + +## See Also + +- [OpenTelemetry Go SDK](https://opentelemetry.io/docs/languages/go/) +- [OTLP Exporter Documentation](https://opentelemetry.io/docs/specs/otlp/) +- [Redis OTel Native Package](../../extra/redisotel-native/) + diff --git a/example/otel-metrics/go.mod b/example/otel-metrics/go.mod new file mode 100644 index 000000000..8cee8554a --- /dev/null +++ b/example/otel-metrics/go.mod @@ -0,0 +1,39 @@ +module github.com/redis/go-redis/example/otel-metrics + +go 1.23.0 + +toolchain go1.24.2 + +replace github.com/redis/go-redis/v9 => ../.. 
+ +replace github.com/redis/go-redis/extra/redisotel-native/v9 => ../../extra/redisotel-native + +require ( + github.com/redis/go-redis/extra/redisotel-native/v9 v9.0.0-00010101000000-000000000000 + github.com/redis/go-redis/v9 v9.7.0 + go.opentelemetry.io/otel v1.38.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.38.0 + go.opentelemetry.io/otel/sdk/metric v1.38.0 +) + +require ( + github.com/cenkalti/backoff/v5 v5.0.3 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel/metric v1.38.0 // indirect + go.opentelemetry.io/otel/sdk v1.38.0 // indirect + go.opentelemetry.io/otel/trace v1.38.0 // indirect + go.opentelemetry.io/proto/otlp v1.7.1 // indirect + golang.org/x/net v0.43.0 // indirect + golang.org/x/sys v0.35.0 // indirect + golang.org/x/text v0.28.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 // indirect + google.golang.org/grpc v1.75.0 // indirect + google.golang.org/protobuf v1.36.8 // indirect +) diff --git a/example/otel-metrics/go.sum b/example/otel-metrics/go.sum new file mode 100644 index 000000000..f7db960cf --- /dev/null +++ b/example/otel-metrics/go.sum @@ -0,0 +1,63 @@ +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= 
+github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= +github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 h1:8Tjv8EJ+pM1xP8mK6egEbD1OgnVTyacbefKhmbLhIhU= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2/go.mod h1:pkJQ2tZHJ0aFOVEEot6oZmaVEZcRme73eIFmhiVuRWs= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= 
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= +go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.38.0 h1:vl9obrcoWVKp/lwl8tRE33853I8Xru9HFbw/skNeLs8= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.38.0/go.mod h1:GAXRxmLJcVM3u22IjTg74zWBrRCKq8BnOqUVLodpcpw= +go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA= +go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI= +go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E= +go.opentelemetry.io/otel/sdk v1.38.0/go.mod h1:ghmNdGlVemJI3+ZB5iDEuk4bWA3GkTpW+DOoZMYBVVg= +go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM= +go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA= +go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= +go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs= +go.opentelemetry.io/proto/otlp v1.7.1 h1:gTOMpGDb0WTBOP8JaO72iL3auEZhVmAQg4ipjOVAtj4= +go.opentelemetry.io/proto/otlp v1.7.1/go.mod h1:b2rVh6rfI/s2pHWNlB7ILJcRALpcNDzKhACevjI+ZnE= +golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= +golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= +golang.org/x/sys v0.35.0 
h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= +golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= +gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= +gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5 h1:BIRfGDEjiHRrk0QKZe3Xv2ieMhtgRGeLcZQ0mIVn4EY= +google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5/go.mod h1:j3QtIyytwqGr1JUDtYXwtMXWPKsEa5LtzIFN1Wn5WvE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 h1:eaY8u2EuxbRv7c3NiGK0/NedzVsCcV6hDuU5qPX5EGE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5/go.mod h1:M4/wBTSeyLxupu3W3tJtOgB14jILAS/XWPSSa3TAlJc= +google.golang.org/grpc v1.75.0 h1:+TW+dqTd2Biwe6KKfhE5JpiYIBWq865PhKGSXiivqt4= +google.golang.org/grpc v1.75.0/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/example/otel-metrics/main.go b/example/otel-metrics/main.go new file mode 100644 index 000000000..a1f46d6f3 --- /dev/null +++ b/example/otel-metrics/main.go @@ -0,0 +1,121 @@ +// EXAMPLE: otel_metrics +// HIDE_START +package main + +import ( + "context" + "log" + "math/rand" + "strconv" + "sync" + "time" + + redisotel "github.com/redis/go-redis/extra/redisotel-native/v9" + "github.com/redis/go-redis/v9" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + 
"go.opentelemetry.io/otel/sdk/metric" +) + +// ExampleClient_otel_metrics demonstrates how to enable OpenTelemetry metrics +// for Redis operations and export them to an OTLP collector. +func main() { + ctx := context.Background() + + // HIDE_END + + // STEP_START otel_exporter_setup + // Create OTLP exporter that sends metrics to the collector + // Default endpoint is localhost:4317 (gRPC) + exporter, err := otlpmetricgrpc.New(ctx, + otlpmetricgrpc.WithInsecure(), // Use insecure for local development + // For production, configure TLS and authentication: + // otlpmetricgrpc.WithEndpoint("your-collector:4317"), + // otlpmetricgrpc.WithTLSCredentials(...), + ) + if err != nil { + log.Fatalf("Failed to create OTLP exporter: %v", err) + } + // STEP_END + + // STEP_START otel_meter_provider + // Create meter provider with periodic reader + // Metrics are exported every 10 seconds + meterProvider := metric.NewMeterProvider( + metric.WithReader( + metric.NewPeriodicReader(exporter, + metric.WithInterval(10*time.Second), + ), + ), + ) + defer func() { + if err := meterProvider.Shutdown(ctx); err != nil { + log.Printf("Error shutting down meter provider: %v", err) + } + }() + + // Set the global meter provider + otel.SetMeterProvider(meterProvider) + // STEP_END + + // STEP_START redis_client_setup + // Create Redis client + rdb := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer rdb.Close() + + // Initialize OTel instrumentation (uses global meter provider) + if err := redisotel.Init(rdb); err != nil { + log.Fatalf("Failed to initialize OTel: %v", err) + } + defer redisotel.Shutdown() + // STEP_END + + // STEP_START redis_operations + // Execute Redis operations - metrics are automatically collected + log.Println("Executing Redis operations...") + var wg sync.WaitGroup + wg.Add(50) + for i := range 50 { + go func(i int) { + defer wg.Done() + + for j := range 10 { + if err := rdb.Set(ctx, "key"+strconv.Itoa(i*10+j), "value", 0).Err(); err != nil { + 
// MetricGroup names a set of metrics that is registered as a unit.
// Groups are selected with WithEnabledMetricGroups and checked through
// config.isMetricGroupEnabled.
type MetricGroup string

// Metric group identifiers accepted by WithEnabledMetricGroups.
// NOTE(review): which instruments belong to which group is decided where the
// instruments are registered, outside this file section — confirm there.
const (
	MetricGroupCommand            MetricGroup = "command"
	MetricGroupConnectionBasic    MetricGroup = "connection-basic"
	MetricGroupResiliency         MetricGroup = "resiliency"
	MetricGroupConnectionAdvanced MetricGroup = "connection-advanced"
	MetricGroupPubSub             MetricGroup = "pubsub"
	MetricGroupStream             MetricGroup = "stream"
)

// HistogramAggregation selects the histogram aggregation mode stored in
// config.histAggregation (see WithHistogramAggregation).
type HistogramAggregation string

// Supported histogram aggregation modes.
const (
	HistogramAggregationExplicitBucket   HistogramAggregation = "explicit_bucket_histogram"
	HistogramAggregationBase2Exponential HistogramAggregation = "base2_exponential_bucket_histogram"
)

// config holds the configuration for the instrumentation.
// It is populated by defaultConfig and then modified by Option values.
type config struct {
	// Core settings
	meterProvider metric.MeterProvider // nil means "use the global provider" per defaultConfig
	enabled       bool

	// Metric group settings
	enabledMetricGroups map[MetricGroup]bool

	// Command filtering
	includeCommands map[string]bool // nil means include all
	excludeCommands map[string]bool // nil means exclude none

	// Cardinality reduction
	hidePubSubChannelNames bool
	hideStreamNames        bool

	// Histogram settings
	histAggregation HistogramAggregation

	// Bucket configurations for different histogram metrics
	// (boundaries are expressed in seconds, see defaultHistogramBuckets)
	bucketsOperationDuration        []float64
	bucketsStreamProcessingDuration []float64
	bucketsConnectionCreateTime     []float64
	bucketsConnectionWaitTime       []float64
	bucketsConnectionUseTime        []float64
}
include all + excludeCommands map[string]bool // nil means exclude none + + // Cardinality reduction + hidePubSubChannelNames bool + hideStreamNames bool + + // Histogram settings + histAggregation HistogramAggregation + + // Bucket configurations for different histogram metrics + bucketsOperationDuration []float64 + bucketsStreamProcessingDuration []float64 + bucketsConnectionCreateTime []float64 + bucketsConnectionWaitTime []float64 + bucketsConnectionUseTime []float64 +} + +func defaultConfig() config { + return config{ + meterProvider: nil, // Will use global otel.GetMeterProvider() if nil + enabled: false, + + // Default metric groups: connection-basic, resiliency + enabledMetricGroups: map[MetricGroup]bool{ + MetricGroupConnectionBasic: true, + MetricGroupResiliency: true, + }, + + // No command filtering by default + includeCommands: nil, + excludeCommands: nil, + + // Don't hide labels by default + hidePubSubChannelNames: false, + hideStreamNames: false, + + // Use explicit bucket histogram by default + histAggregation: HistogramAggregationExplicitBucket, + + // Default buckets for all duration metrics + bucketsOperationDuration: defaultHistogramBuckets(), + bucketsStreamProcessingDuration: defaultHistogramBuckets(), + bucketsConnectionCreateTime: defaultHistogramBuckets(), + bucketsConnectionWaitTime: defaultHistogramBuckets(), + bucketsConnectionUseTime: defaultHistogramBuckets(), + } +} + +// isMetricGroupEnabled checks if a metric group is enabled +func (c *config) isMetricGroupEnabled(group MetricGroup) bool { + return c.enabledMetricGroups[group] +} + +// isCommandIncluded checks if a command should be included in metrics +func (c *config) isCommandIncluded(command string) bool { + if c.excludeCommands != nil && c.excludeCommands[command] { + return false + } + + if c.includeCommands != nil { + return c.includeCommands[command] + } + + return true +} + +// defaultHistogramBuckets returns the default histogram buckets for all duration metrics. 
+// These buckets are designed to capture typical Redis operation and connection latencies: +// - Sub-millisecond: 0.0001s (0.1ms), 0.0005s (0.5ms) +// - Milliseconds: 0.001s (1ms), 0.005s (5ms), 0.01s (10ms), 0.05s (50ms), 0.1s (100ms) +// - Sub-second: 0.5s (500ms) +// - Seconds: 1s, 5s, 10s +// +// This covers the range from 0.1ms to 10s, which is suitable for: +// - db.client.operation.duration (command execution time) +// - db.client.connection.create_time (connection establishment) +// - db.client.connection.wait_time (waiting for connection from pool) +// - db.client.connection.use_time (time connection is checked out) +// - redis.client.stream.processing_duration (stream message processing) +func defaultHistogramBuckets() []float64 { + return []float64{ + 0.0001, // 0.1ms + 0.0005, // 0.5ms + 0.001, // 1ms + 0.005, // 5ms + 0.01, // 10ms + 0.05, // 50ms + 0.1, // 100ms + 0.5, // 500ms + 1.0, // 1s + 5.0, // 5s + 10.0, // 10s + } +} + +// Option is a functional option for configuring the instrumentation +type Option interface { + apply(*config) +} + +// optionFunc wraps a function to implement the Option interface +type optionFunc func(*config) + +func (f optionFunc) apply(c *config) { + f(c) +} + +// WithMeterProvider sets the meter provider to use for creating metrics. +// If not provided, the global meter provider from otel.GetMeterProvider() will be used. 
+func WithMeterProvider(provider metric.MeterProvider) Option { + return optionFunc(func(c *config) { + c.meterProvider = provider + }) +} + +// WithEnabled enables or disables metrics emission +func WithEnabled(enabled bool) Option { + return optionFunc(func(c *config) { + c.enabled = enabled + }) +} + +// WithEnabledMetricGroups sets which metric groups to register +// Default: ["connection-basic", "resiliency"] +func WithEnabledMetricGroups(groups []MetricGroup) Option { + return optionFunc(func(c *config) { + c.enabledMetricGroups = make(map[MetricGroup]bool) + for _, group := range groups { + c.enabledMetricGroups[group] = true + } + }) +} + +// WithIncludeCommands sets a command allow-list for metrics +// Only commands in this list will have metrics recorded +// If not set, all commands are included (unless excluded) +func WithIncludeCommands(commands []string) Option { + return optionFunc(func(c *config) { + c.includeCommands = make(map[string]bool) + for _, cmd := range commands { + c.includeCommands[cmd] = true + } + }) +} + +// WithExcludeCommands sets a command deny-list for metrics +// Commands in this list will not have metrics recorded +func WithExcludeCommands(commands []string) Option { + return optionFunc(func(c *config) { + c.excludeCommands = make(map[string]bool) + for _, cmd := range commands { + c.excludeCommands[cmd] = true + } + }) +} + +// WithHidePubSubChannelNames omits channel label from Pub/Sub metrics to reduce cardinality +func WithHidePubSubChannelNames(hide bool) Option { + return optionFunc(func(c *config) { + c.hidePubSubChannelNames = hide + }) +} + +// WithHideStreamNames omits stream label from stream metrics to reduce cardinality +func WithHideStreamNames(hide bool) Option { + return optionFunc(func(c *config) { + c.hideStreamNames = hide + }) +} + +// WithHistogramAggregation sets the histogram aggregation mode +// Controls whether bucket overrides apply +func WithHistogramAggregation(agg HistogramAggregation) Option { + 
return optionFunc(func(c *config) { + c.histAggregation = agg + }) +} + +// WithHistogramBuckets sets custom histogram buckets for ALL duration metrics. +// If not set, uses defaultHistogramBuckets() which covers 0.1ms to 10s. +// Buckets should be in seconds (e.g., 0.001 = 1ms, 0.1 = 100ms, 1.0 = 1s). +// +// This applies to all duration histograms: +// - db.client.operation.duration +// - db.client.connection.create_time +// - db.client.connection.wait_time +// - db.client.connection.use_time +// - redis.client.stream.processing_duration +// +// Example: +// +// redisotel.Init(rdb, +// redisotel.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 1.0}), +// ) +func WithHistogramBuckets(buckets []float64) Option { + return optionFunc(func(c *config) { + c.bucketsOperationDuration = buckets + c.bucketsStreamProcessingDuration = buckets + c.bucketsConnectionCreateTime = buckets + c.bucketsConnectionWaitTime = buckets + c.bucketsConnectionUseTime = buckets + }) +} diff --git a/extra/redisotel-native/go.mod b/extra/redisotel-native/go.mod new file mode 100644 index 000000000..f7c1a4a23 --- /dev/null +++ b/extra/redisotel-native/go.mod @@ -0,0 +1,37 @@ +module github.com/redis/go-redis/extra/redisotel-native/v9 + +go 1.23.0 + +toolchain go1.24.2 + +replace github.com/redis/go-redis/v9 => ../.. 
+ +require ( + github.com/redis/go-redis/v9 v9.7.0 + go.opentelemetry.io/otel v1.38.0 + go.opentelemetry.io/otel/metric v1.38.0 +) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/prometheus/client_golang v1.23.0 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.65.0 // indirect + github.com/prometheus/otlptranslator v0.0.2 // indirect + github.com/prometheus/procfs v0.17.0 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel/exporters/prometheus v0.60.0 // indirect + go.opentelemetry.io/otel/sdk v1.38.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.38.0 // indirect + go.opentelemetry.io/otel/trace v1.38.0 // indirect + go.uber.org/atomic v1.11.0 // indirect + golang.org/x/sys v0.35.0 // indirect + google.golang.org/protobuf v1.36.8 // indirect +) diff --git a/extra/redisotel-native/go.sum b/extra/redisotel-native/go.sum new file mode 100644 index 000000000..b8d091a4b --- /dev/null +++ b/extra/redisotel-native/go.sum @@ -0,0 +1,61 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= 
+github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc h1:GN2Lv3MGO7AS6PrRoT6yV5+wkrOpcszoIsO4+4ds248= +github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc/go.mod h1:+JKpmjMGhpgPL+rXZ5nsZieVzvarn86asRlBg4uNGnk= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 
+github.com/prometheus/client_golang v1.23.0 h1:ust4zpdl9r4trLY/gSjlm07PuiBq2ynaXXlptpfy8Uc= +github.com/prometheus/client_golang v1.23.0/go.mod h1:i/o0R9ByOnHX0McrTMTyhYvKE4haaf2mW08I+jGAjEE= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.65.0 h1:QDwzd+G1twt//Kwj/Ww6E9FQq1iVMmODnILtW1t2VzE= +github.com/prometheus/common v0.65.0/go.mod h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8= +github.com/prometheus/otlptranslator v0.0.2 h1:+1CdeLVrRQ6Psmhnobldo0kTp96Rj80DRXRd5OSnMEQ= +github.com/prometheus/otlptranslator v0.0.2/go.mod h1:P8AwMgdD7XEr6QRUJ2QWLpiAZTgTE2UYgjlu3svompI= +github.com/prometheus/procfs v0.17.0 h1:FuLQ+05u4ZI+SS/w9+BWEM2TXiHKsUQ9TADiRH7DuK0= +github.com/prometheus/procfs v0.17.0/go.mod h1:oPQLaDAMRbA+u8H5Pbfq+dl3VDAvHxMUOVhe0wYB2zw= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= +go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= +go.opentelemetry.io/otel/exporters/prometheus v0.60.0 h1:cGtQxGvZbnrWdC2GyjZi0PDKVSLWP/Jocix3QWfXtbo= +go.opentelemetry.io/otel/exporters/prometheus v0.60.0/go.mod h1:hkd1EekxNo69PTV4OWFGZcKQiIqg0RfuWExcPKFvepk= +go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA= +go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI= +go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E= +go.opentelemetry.io/otel/sdk v1.38.0/go.mod 
h1:ghmNdGlVemJI3+ZB5iDEuk4bWA3GkTpW+DOoZMYBVVg= +go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM= +go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA= +go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= +go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/extra/redisotel-native/metrics.go b/extra/redisotel-native/metrics.go new file mode 100644 index 000000000..1fddfe6a7 --- /dev/null +++ b/extra/redisotel-native/metrics.go @@ -0,0 +1,800 @@ +package redisotel + +import ( + "context" + "fmt" + "net" + "strconv" + "strings" + "time" + + "github.com/redis/go-redis/v9" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +const ( + // Library name for redis.client.library attribute + libraryName = "go-redis" +) + +// getLibraryVersionAttr returns the redis.client.library attribute +// This is computed once and reused to avoid repeated string formatting +func getLibraryVersionAttr() attribute.KeyValue { + return attribute.String("redis.client.library", fmt.Sprintf("%s:%s", libraryName, redis.Version())) +} + +// addServerPortIfNonDefault adds server.port attribute if port is not the default (6379) +func addServerPortIfNonDefault(attrs 
[]attribute.KeyValue, serverPort string) []attribute.KeyValue { + if serverPort != "" && serverPort != "6379" { + return append(attrs, attribute.String("server.port", serverPort)) + } + return attrs +} + +// formatPoolName formats the pool name from server address and port +func formatPoolName(serverAddr, serverPort string) string { + if serverPort != "" && serverPort != "6379" { + return fmt.Sprintf("%s:%s", serverAddr, serverPort) + } + return serverAddr +} + +// metricsRecorder implements the otel.Recorder interface +type metricsRecorder struct { + operationDuration metric.Float64Histogram + connectionCount metric.Int64UpDownCounter + connectionCreateTime metric.Float64Histogram + connectionRelaxedTimeout metric.Int64UpDownCounter + connectionHandoff metric.Int64Counter + clientErrors metric.Int64Counter + maintenanceNotifications metric.Int64Counter + + connectionWaitTime metric.Float64Histogram + connectionUseTime metric.Float64Histogram + connectionTimeouts metric.Int64Counter + connectionClosed metric.Int64Counter + connectionPendingReqs metric.Int64UpDownCounter + + pubsubMessages metric.Int64Counter + + streamLag metric.Float64Histogram + + // Configuration + cfg *config + + // Client configuration for attributes (used for operation metrics only) + serverAddr string + serverPort string + dbIndex string +} + +// RecordOperationDuration records db.client.operation.duration metric +func (r *metricsRecorder) RecordOperationDuration( + ctx context.Context, + duration time.Duration, + cmd redis.Cmder, + attempts int, + cn redis.ConnInfo, +) { + if r.operationDuration == nil { + return + } + + // Check if command should be included + if r.cfg != nil && !r.cfg.isCommandIncluded(cmd.Name()) { + return + } + + // Convert duration to seconds (OTel convention for duration metrics) + durationSeconds := duration.Seconds() + + // Build attributes + attrs := []attribute.KeyValue{ + // Required attributes + attribute.String("db.operation.name", cmd.FullName()), + 
getLibraryVersionAttr(), + attribute.Int("redis.client.operation.retry_attempts", attempts-1), // attempts-1 = retry count + attribute.Bool("redis.client.operation.blocking", isBlockingCommand(cmd)), + + // Recommended attributes + attribute.String("db.system.name", "redis"), + attribute.String("server.address", r.serverAddr), + } + + // Add server.port if not default + attrs = addServerPortIfNonDefault(attrs, r.serverPort) + + // Add db.namespace (database index) if available + if r.dbIndex != "" { + attrs = append(attrs, attribute.String("db.namespace", r.dbIndex)) + } + + // Add network.peer.address and network.peer.port from connection + if cn != nil { + remoteAddr := cn.RemoteAddr() + if remoteAddr != nil { + peerAddr, peerPort := splitHostPort(remoteAddr.String()) + if peerAddr != "" { + attrs = append(attrs, attribute.String("network.peer.address", peerAddr)) + } + if peerPort != "" { + attrs = append(attrs, attribute.String("network.peer.port", peerPort)) + } + } + } + + // Add error.type if command failed + if err := cmd.Err(); err != nil { + attrs = append(attrs, attribute.String("error.type", classifyError(err))) + } + + // Add db.response.status_code if error is a Redis error + if err := cmd.Err(); err != nil { + if statusCode := extractRedisErrorPrefix(err); statusCode != "" { + attrs = append(attrs, attribute.String("db.response.status_code", statusCode)) + } + } + + // Record the histogram + r.operationDuration.Record(ctx, durationSeconds, metric.WithAttributes(attrs...)) +} + +// isBlockingCommand checks if a command is a blocking operation +// Blocking commands have a timeout parameter and include: BLPOP, BRPOP, BRPOPLPUSH, BLMOVE, +// BZPOPMIN, BZPOPMAX, BZMPOP, BLMPOP, XREAD with BLOCK, XREADGROUP with BLOCK +func isBlockingCommand(cmd redis.Cmder) bool { + name := strings.ToLower(cmd.Name()) + + // Commands that start with 'b' and are blocking + if strings.HasPrefix(name, "b") { + switch name { + case "blpop", "brpop", "brpoplpush", "blmove", 
"bzpopmin", "bzpopmax", "bzmpop", "blmpop": + return true + } + } + + // XREAD and XREADGROUP with BLOCK option + if name == "xread" || name == "xreadgroup" { + args := cmd.Args() + for i, arg := range args { + if argStr, ok := arg.(string); ok { + if strings.ToLower(argStr) == "block" && i+1 < len(args) { + return true + } + } + } + } + + return false +} + +// classifyError returns the error.type attribute value +// Format: "category:detail" (e.g. "network:" followed by the message, "redis:" followed by the prefix); any other error yields its raw message +func classifyError(err error) string { + if err == nil { + return "" + } + + errStr := err.Error() + + // Network errors + if isNetworkError(err) { + return fmt.Sprintf("network:%s", errStr) + } + + // Timeout errors + if isTimeoutError(err) { + return "timeout" + } + + // Redis errors (start with error prefix like ERR, WRONGTYPE, etc.) + if prefix := extractRedisErrorPrefix(err); prefix != "" { + return fmt.Sprintf("redis:%s", prefix) + } + + // Generic error + return errStr +} + +// extractRedisErrorPrefix extracts the Redis error prefix (e.g., "ERR", "WRONGTYPE") +// Redis errors typically start with an uppercase prefix followed by a space +func extractRedisErrorPrefix(err error) string { + if err == nil { + return "" + } + + errStr := err.Error() + + // Redis errors typically start with an uppercase prefix + // Examples: "ERR ...", "WRONGTYPE ...", "CLUSTERDOWN ..." 
+ parts := strings.SplitN(errStr, " ", 2) + if len(parts) > 0 { + prefix := parts[0] + // Check if it's all uppercase (Redis error convention) + if prefix == strings.ToUpper(prefix) && len(prefix) > 0 { + return prefix + } + } + + return "" +} + +// isNetworkError checks if an error is a network-related error +func isNetworkError(err error) bool { + if err == nil { + return false + } + + // Check for net.Error interface (standard way to detect network errors) + _, ok := err.(net.Error) + return ok +} + +// isTimeoutError checks if an error is a timeout error +func isTimeoutError(err error) bool { + if err == nil { + return false + } + + // Check for net.Error with Timeout() method (standard way) + if netErr, ok := err.(net.Error); ok { + return netErr.Timeout() + } + + return false +} + +// splitHostPort splits a host:port string into host and port +// This is a simplified version that handles the common cases +func splitHostPort(addr string) (host, port string) { + // Handle Unix sockets + if strings.HasPrefix(addr, "/") || strings.HasPrefix(addr, "@") { + return addr, "" + } + + host, port, err := net.SplitHostPort(addr) + if err != nil { + // If split fails, return the whole address as host + return addr, "" + } + + return host, port +} + +// parseAddr parses a Redis address into host and port +func parseAddr(addr string) (host, port string) { + // Handle Unix sockets + if strings.HasPrefix(addr, "/") || strings.HasPrefix(addr, "unix://") { + return addr, "" + } + + // Remove protocol prefix if present + addr = strings.TrimPrefix(addr, "redis://") + addr = strings.TrimPrefix(addr, "rediss://") + + host, port, err := net.SplitHostPort(addr) + if err != nil { + // No port specified, use default + return addr, "6379" + } + + return host, port +} + +// formatDBIndex formats the database index as a string +func formatDBIndex(db int) string { + if db < 0 { + return "" + } + return strconv.Itoa(db) +} + +// RecordConnectionStateChange records a change in connection 
state +// This is called from the pool when connections transition between states +func (r *metricsRecorder) RecordConnectionStateChange( + ctx context.Context, + cn redis.ConnInfo, + fromState, toState string, +) { + if r.connectionCount == nil { + return + } + + // Extract server address from connection + serverAddr, serverPort := extractServerInfo(cn) + + // Build base attributes + attrs := []attribute.KeyValue{ + attribute.String("db.system", "redis"), + attribute.String("server.address", serverAddr), + } + + // Add server.port if not default + if serverPort != "" && serverPort != "6379" { + attrs = append(attrs, attribute.String("server.port", serverPort)) + } + + // Decrement old state (if not empty) + if fromState != "" { + fromAttrs := append([]attribute.KeyValue{}, attrs...) + fromAttrs = append(fromAttrs, attribute.String("state", fromState)) + r.connectionCount.Add(ctx, -1, metric.WithAttributes(fromAttrs...)) + } + + // Increment new state + if toState != "" { + toAttrs := append([]attribute.KeyValue{}, attrs...) 
+ toAttrs = append(toAttrs, attribute.String("state", toState)) + r.connectionCount.Add(ctx, 1, metric.WithAttributes(toAttrs...)) + } +} + +// extractServerInfo extracts server address and port from connection info +// For client connections, this is the remote endpoint (server address) +func extractServerInfo(cn redis.ConnInfo) (addr, port string) { + if cn == nil { + return "", "" + } + + remoteAddr := cn.RemoteAddr() + if remoteAddr == nil { + return "", "" + } + + addrStr := remoteAddr.String() + host, portStr := parseAddr(addrStr) + return host, portStr +} + +// RecordConnectionCreateTime records the time it took to create a new connection +func (r *metricsRecorder) RecordConnectionCreateTime( + ctx context.Context, + duration time.Duration, + cn redis.ConnInfo, +) { + if r.connectionCreateTime == nil { + return + } + + // Convert duration to seconds (OTel convention) + durationSeconds := duration.Seconds() + + // Extract server address from connection + serverAddr, serverPort := extractServerInfo(cn) + + // Build attributes + attrs := []attribute.KeyValue{ + attribute.String("db.system", "redis"), + attribute.String("server.address", serverAddr), + getLibraryVersionAttr(), + } + + // Add server.port if not default + attrs = addServerPortIfNonDefault(attrs, serverPort) + + // Add pool name (using server.address:server.port format) + poolName := formatPoolName(serverAddr, serverPort) + attrs = append(attrs, attribute.String("db.client.connection.pool.name", poolName)) + + // Record the histogram + r.connectionCreateTime.Record(ctx, durationSeconds, metric.WithAttributes(attrs...)) +} + +// RecordConnectionRelaxedTimeout records when connection timeout is relaxed/unrelaxed +func (r *metricsRecorder) RecordConnectionRelaxedTimeout( + ctx context.Context, + delta int, + cn redis.ConnInfo, + poolName, notificationType string, +) { + if r.connectionRelaxedTimeout == nil { + return + } + + // Extract server address from connection + serverAddr, serverPort := 
extractServerInfo(cn) + + // Build attributes + attrs := []attribute.KeyValue{ + attribute.String("db.system.name", "redis"), + attribute.String("server.address", serverAddr), + getLibraryVersionAttr(), + attribute.String("db.client.connection.pool.name", poolName), + attribute.String("redis.client.connection.notification", notificationType), + } + + // Add server.port if not default + attrs = addServerPortIfNonDefault(attrs, serverPort) + + // Record the counter (delta can be +1 or -1) + r.connectionRelaxedTimeout.Add(ctx, int64(delta), metric.WithAttributes(attrs...)) +} + +// RecordConnectionHandoff records when a connection is handed off to another node +func (r *metricsRecorder) RecordConnectionHandoff( + ctx context.Context, + cn redis.ConnInfo, + poolName string, +) { + if r.connectionHandoff == nil { + return + } + + // Extract server address from connection + serverAddr, serverPort := extractServerInfo(cn) + + // Build attributes + attrs := []attribute.KeyValue{ + attribute.String("db.system", "redis"), + attribute.String("server.address", serverAddr), + getLibraryVersionAttr(), + attribute.String("db.client.connection.pool.name", poolName), + } + + // Add server.port if not default + attrs = addServerPortIfNonDefault(attrs, serverPort) + + // Record the counter + r.connectionHandoff.Add(ctx, 1, metric.WithAttributes(attrs...)) +} + +// RecordError records client errors (ASK, MOVED, handshake failures, etc.) 
+func (r *metricsRecorder) RecordError( + ctx context.Context, + errorType string, + cn redis.ConnInfo, + statusCode string, + isInternal bool, + retryAttempts int, +) { + if r.clientErrors == nil { + return + } + + // Extract server address and peer address from connection (may be nil for some errors) + // For client connections, peer address is the same as server address (remote endpoint) + var serverAddr, serverPort, peerAddr, peerPort string + if cn != nil { + serverAddr, serverPort = extractServerInfo(cn) + peerAddr, peerPort = serverAddr, serverPort // Peer is same as server for client connections + } + + // Build attributes + attrs := []attribute.KeyValue{ + attribute.String("db.system.name", "redis"), + attribute.String("error.type", errorType), + attribute.String("db.response.status_code", statusCode), + attribute.Bool("redis.client.errors.internal", isInternal), + attribute.Int("redis.client.operation.retry_attempts", retryAttempts), + getLibraryVersionAttr(), + } + + // Add server info if available + if serverAddr != "" { + attrs = append(attrs, attribute.String("server.address", serverAddr)) + attrs = addServerPortIfNonDefault(attrs, serverPort) + } + + // Add peer info if available + if peerAddr != "" { + attrs = append(attrs, attribute.String("network.peer.address", peerAddr)) + if peerPort != "" { + attrs = append(attrs, attribute.String("network.peer.port", peerPort)) + } + } + + // Record the counter + r.clientErrors.Add(ctx, 1, metric.WithAttributes(attrs...)) +} + +// RecordMaintenanceNotification records when a maintenance notification is received +func (r *metricsRecorder) RecordMaintenanceNotification( + ctx context.Context, + cn redis.ConnInfo, + notificationType string, +) { + if r.maintenanceNotifications == nil { + return + } + + // Extract server address and peer address from connection + // For client connections, peer address is the same as server address (remote endpoint) + serverAddr, serverPort := extractServerInfo(cn) + peerAddr, 
peerPort := serverAddr, serverPort // Peer is same as server for client connections + + // Build attributes + attrs := []attribute.KeyValue{ + attribute.String("db.system.name", "redis"), + attribute.String("server.address", serverAddr), + getLibraryVersionAttr(), + attribute.String("redis.client.connection.notification", notificationType), + } + + // Add server.port if not default + attrs = addServerPortIfNonDefault(attrs, serverPort) + + // Add peer info if available + if peerAddr != "" { + attrs = append(attrs, attribute.String("network.peer.address", peerAddr)) + if peerPort != "" { + attrs = append(attrs, attribute.String("network.peer.port", peerPort)) + } + } + + // Record the counter + r.maintenanceNotifications.Add(ctx, 1, metric.WithAttributes(attrs...)) +} + +// RecordConnectionWaitTime records db.client.connection.wait_time metric +func (r *metricsRecorder) RecordConnectionWaitTime( + ctx context.Context, + duration time.Duration, + cn redis.ConnInfo, +) { + if r.connectionWaitTime == nil { + return + } + + // Extract server address and peer address from connection + serverAddr, serverPort := extractServerInfo(cn) + peerAddr, peerPort := serverAddr, serverPort + + // Build attributes + attrs := []attribute.KeyValue{ + attribute.String("db.system.name", "redis"), + attribute.String("server.address", serverAddr), + getLibraryVersionAttr(), + } + + // Add server.port if not default + attrs = addServerPortIfNonDefault(attrs, serverPort) + + // Add peer info + if peerAddr != "" { + attrs = append(attrs, attribute.String("network.peer.address", peerAddr)) + if peerPort != "" { + attrs = append(attrs, attribute.String("network.peer.port", peerPort)) + } + } + + // Record the histogram (duration in seconds) + r.connectionWaitTime.Record(ctx, duration.Seconds(), metric.WithAttributes(attrs...)) +} + +// RecordConnectionUseTime records db.client.connection.use_time metric +func (r *metricsRecorder) RecordConnectionUseTime( + ctx context.Context, + duration 
time.Duration, + cn redis.ConnInfo, +) { + if r.connectionUseTime == nil { + return + } + + // Extract server address and peer address from connection + serverAddr, serverPort := extractServerInfo(cn) + peerAddr, peerPort := serverAddr, serverPort + + // Build attributes + attrs := []attribute.KeyValue{ + attribute.String("db.system.name", "redis"), + attribute.String("server.address", serverAddr), + getLibraryVersionAttr(), + } + + // Add server.port if not default + attrs = addServerPortIfNonDefault(attrs, serverPort) + + // Add peer info + if peerAddr != "" { + attrs = append(attrs, attribute.String("network.peer.address", peerAddr)) + if peerPort != "" { + attrs = append(attrs, attribute.String("network.peer.port", peerPort)) + } + } + + // Record the histogram (duration in seconds) + r.connectionUseTime.Record(ctx, duration.Seconds(), metric.WithAttributes(attrs...)) +} + +// RecordConnectionTimeout records db.client.connection.timeouts metric +func (r *metricsRecorder) RecordConnectionTimeout( + ctx context.Context, + cn redis.ConnInfo, + timeoutType string, +) { + if r.connectionTimeouts == nil { + return + } + + // Extract server address and peer address from connection + serverAddr, serverPort := extractServerInfo(cn) + peerAddr, peerPort := serverAddr, serverPort + + // Build attributes + attrs := []attribute.KeyValue{ + attribute.String("db.system.name", "redis"), + attribute.String("server.address", serverAddr), + attribute.String("redis.client.connection.timeout_type", timeoutType), + getLibraryVersionAttr(), + } + + // Add server.port if not default + attrs = addServerPortIfNonDefault(attrs, serverPort) + + // Add peer info + if peerAddr != "" { + attrs = append(attrs, attribute.String("network.peer.address", peerAddr)) + if peerPort != "" { + attrs = append(attrs, attribute.String("network.peer.port", peerPort)) + } + } + + // Record the counter + r.connectionTimeouts.Add(ctx, 1, metric.WithAttributes(attrs...)) +} + +// RecordConnectionClosed 
records redis.client.connection.closed metric +func (r *metricsRecorder) RecordConnectionClosed( + ctx context.Context, + cn redis.ConnInfo, + reason string, +) { + if r.connectionClosed == nil { + return + } + + // Extract server address and peer address from connection + serverAddr, serverPort := extractServerInfo(cn) + peerAddr, peerPort := serverAddr, serverPort + + // Build attributes + attrs := []attribute.KeyValue{ + attribute.String("db.system.name", "redis"), + attribute.String("server.address", serverAddr), + attribute.String("redis.client.connection.close_reason", reason), + getLibraryVersionAttr(), + } + + // Add server.port if not default + attrs = addServerPortIfNonDefault(attrs, serverPort) + + // Add peer info + if peerAddr != "" { + attrs = append(attrs, attribute.String("network.peer.address", peerAddr)) + if peerPort != "" { + attrs = append(attrs, attribute.String("network.peer.port", peerPort)) + } + } + + // Record the counter + r.connectionClosed.Add(ctx, 1, metric.WithAttributes(attrs...)) +} + +// RecordConnectionPendingRequests records db.client.connection.pending_requests metric +func (r *metricsRecorder) RecordConnectionPendingRequests( + ctx context.Context, + delta int, + cn redis.ConnInfo, +) { + if r.connectionPendingReqs == nil { + return + } + + // Extract server address and peer address from connection + serverAddr, serverPort := extractServerInfo(cn) + peerAddr, peerPort := serverAddr, serverPort + + // Build attributes + attrs := []attribute.KeyValue{ + attribute.String("db.system.name", "redis"), + attribute.String("server.address", serverAddr), + getLibraryVersionAttr(), + } + + // Add server.port if not default + attrs = addServerPortIfNonDefault(attrs, serverPort) + + // Add peer info + if peerAddr != "" { + attrs = append(attrs, attribute.String("network.peer.address", peerAddr)) + if peerPort != "" { + attrs = append(attrs, attribute.String("network.peer.port", peerPort)) + } + } + + // Record the up/down counter + 
r.connectionPendingReqs.Add(ctx, int64(delta), metric.WithAttributes(attrs...)) +} + +// RecordPubSubMessage records redis.client.pubsub.messages metric +func (r *metricsRecorder) RecordPubSubMessage( + ctx context.Context, + cn redis.ConnInfo, + direction string, + channel string, + sharded bool, +) { + if r.pubsubMessages == nil { + return + } + + // Extract server address and peer address from connection + serverAddr, serverPort := extractServerInfo(cn) + peerAddr, peerPort := serverAddr, serverPort + + // Build attributes + attrs := []attribute.KeyValue{ + attribute.String("db.system.name", "redis"), + attribute.String("server.address", serverAddr), + attribute.String("redis.client.pubsub.direction", direction), // "sent" or "received" + attribute.Bool("redis.client.pubsub.sharded", sharded), + getLibraryVersionAttr(), + } + + // Add channel name if not hidden for cardinality reduction + if !r.cfg.hidePubSubChannelNames && channel != "" { + attrs = append(attrs, attribute.String("redis.client.pubsub.channel", channel)) + } + + // Add server.port if not default + attrs = addServerPortIfNonDefault(attrs, serverPort) + + // Add peer info + if peerAddr != "" { + attrs = append(attrs, attribute.String("network.peer.address", peerAddr)) + if peerPort != "" { + attrs = append(attrs, attribute.String("network.peer.port", peerPort)) + } + } + + // Record the counter + r.pubsubMessages.Add(ctx, 1, metric.WithAttributes(attrs...)) +} + +// RecordStreamLag records redis.client.stream.lag metric +func (r *metricsRecorder) RecordStreamLag( + ctx context.Context, + lag time.Duration, + cn redis.ConnInfo, + streamName string, + consumerGroup string, + consumerName string, +) { + if r.streamLag == nil { + return + } + + // Extract server address and peer address from connection + serverAddr, serverPort := extractServerInfo(cn) + peerAddr, peerPort := serverAddr, serverPort + + // Build attributes + attrs := []attribute.KeyValue{ + attribute.String("db.system.name", "redis"), + 
attribute.String("server.address", serverAddr), + attribute.String("redis.client.stream.consumer_group", consumerGroup), + attribute.String("redis.client.stream.consumer_name", consumerName), + getLibraryVersionAttr(), + } + + // Add stream name if not hidden for cardinality reduction + if !r.cfg.hideStreamNames && streamName != "" { + attrs = append(attrs, attribute.String("redis.client.stream.name", streamName)) + } + + // Add server.port if not default + attrs = addServerPortIfNonDefault(attrs, serverPort) + + // Add peer info + if peerAddr != "" { + attrs = append(attrs, attribute.String("network.peer.address", peerAddr)) + if peerPort != "" { + attrs = append(attrs, attribute.String("network.peer.port", peerPort)) + } + } + + // Record the histogram (lag in seconds) + r.streamLag.Record(ctx, lag.Seconds(), metric.WithAttributes(attrs...)) +} diff --git a/extra/redisotel-native/redisotel.go b/extra/redisotel-native/redisotel.go new file mode 100644 index 000000000..95de1e002 --- /dev/null +++ b/extra/redisotel-native/redisotel.go @@ -0,0 +1,407 @@ +// Package redisotel provides native OpenTelemetry instrumentation for go-redis. +// +// This package implements the OpenTelemetry Semantic Conventions for database clients, +// providing metrics, traces, and logs for Redis operations. +// +// Basic Usage (with global MeterProvider): +// +// import ( +// "github.com/redis/go-redis/v9" +// redisotel "github.com/redis/go-redis/extra/redisotel-native/v9" +// "go.opentelemetry.io/otel" +// ) +// +// func main() { +// // Initialize OpenTelemetry globally (meter provider, etc.) 
+// otel.SetMeterProvider(myMeterProvider) +// +// // Create Redis client +// rdb := redis.NewClient(&redis.Options{ +// Addr: "localhost:6379", +// DB: 0, +// }) +// +// // Initialize native OTel instrumentation (uses global MeterProvider) +// if err := redisotel.Init(rdb); err != nil { +// panic(err) +// } +// +// // Use the client normally - metrics are automatically recorded +// rdb.Set(ctx, "key", "value", 0) +// } +// +// Advanced Usage (with custom MeterProvider): +// +// // Pass a custom MeterProvider +// if err := redisotel.Init(rdb, redisotel.WithMeterProvider(customProvider)); err != nil { +// panic(err) +// } +package redisotel + +import ( + "fmt" + "sync" + + "github.com/redis/go-redis/v9" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" +) + +var ( + // Global singleton instance + globalInstance *metricsRecorder + globalInstanceOnce sync.Once + initErr error +) + +// Init initializes native OpenTelemetry instrumentation for the given Redis client. +// This function should be called once per application, typically during startup. +// Subsequent calls are no-ops and return the result of the first call (nil on success, otherwise the original error). +// +// The function extracts configuration from the client (server address, port, database index) +// and registers a global metrics recorder. +// +// If no MeterProvider is provided via WithMeterProvider option, the global MeterProvider +// from otel.GetMeterProvider() will be used. Make sure to call otel.SetMeterProvider() +// before calling Init() if you want to use a custom provider. 
+// +// Example (using global MeterProvider): +// +// otel.SetMeterProvider(myMeterProvider) +// rdb := redis.NewClient(&redis.Options{ +// Addr: "localhost:6379", +// DB: 0, +// }) +// if err := redisotel.Init(rdb); err != nil { +// log.Fatal(err) +// } +// +// Example (using custom MeterProvider): +// +// if err := redisotel.Init(rdb, redisotel.WithMeterProvider(customProvider)); err != nil { +// log.Fatal(err) +// } +func Init(client redis.UniversalClient, opts ...Option) error { + globalInstanceOnce.Do(func() { + initErr = initOnce(client, opts...) + }) + return initErr +} + +// initOnce performs the actual initialization (called once by sync.Once) +func initOnce(client redis.UniversalClient, opts ...Option) error { + // Apply options + cfg := defaultConfig() + for _, opt := range opts { + opt.apply(&cfg) + } + + // Extract client configuration + serverAddr, serverPort, dbIndex, err := extractClientConfig(client) + if err != nil { + return fmt.Errorf("failed to extract client config: %w", err) + } + + // Get meter provider (use global if not provided) + meterProvider := cfg.meterProvider + if meterProvider == nil { + meterProvider = otel.GetMeterProvider() + } + + // Create meter + meter := meterProvider.Meter( + "github.com/redis/go-redis", + metric.WithInstrumentationVersion(redis.Version()), + ) + + var operationDuration metric.Float64Histogram + if cfg.isMetricGroupEnabled(MetricGroupCommand) { + var operationDurationOpts []metric.Float64HistogramOption + operationDurationOpts = append(operationDurationOpts, + metric.WithDescription("Duration of database client operations"), + metric.WithUnit("s"), + ) + if cfg.histAggregation == HistogramAggregationExplicitBucket { + operationDurationOpts = append(operationDurationOpts, + metric.WithExplicitBucketBoundaries(cfg.bucketsOperationDuration...), + ) + } + operationDuration, err = meter.Float64Histogram( + "db.client.operation.duration", + operationDurationOpts..., + ) + if err != nil { + return 
fmt.Errorf("failed to create operation duration histogram: %w", err) + } + } + + var connectionCount metric.Int64UpDownCounter + var connectionCreateTime metric.Float64Histogram + + if cfg.isMetricGroupEnabled(MetricGroupConnectionBasic) { + // Create synchronous UpDownCounter for connection count + connectionCount, err = meter.Int64UpDownCounter( + "db.client.connection.count", + metric.WithDescription("The number of connections that are currently in state described by the state attribute"), + metric.WithUnit("{connection}"), + ) + if err != nil { + return fmt.Errorf("failed to create connection count metric: %w", err) + } + + // Create histogram for connection creation time + var connectionCreateTimeOpts []metric.Float64HistogramOption + connectionCreateTimeOpts = append(connectionCreateTimeOpts, + metric.WithDescription("The time it took to create a new connection"), + metric.WithUnit("s"), + ) + if cfg.histAggregation == HistogramAggregationExplicitBucket { + connectionCreateTimeOpts = append(connectionCreateTimeOpts, + metric.WithExplicitBucketBoundaries(cfg.bucketsConnectionCreateTime...), + ) + } + connectionCreateTime, err = meter.Float64Histogram( + "db.client.connection.create_time", + connectionCreateTimeOpts..., + ) + if err != nil { + return fmt.Errorf("failed to create connection create time histogram: %w", err) + } + } + + var connectionRelaxedTimeout metric.Int64UpDownCounter + var connectionHandoff metric.Int64Counter + var clientErrors metric.Int64Counter + var maintenanceNotifications metric.Int64Counter + + if cfg.isMetricGroupEnabled(MetricGroupResiliency) { + // Create UpDownCounter for relaxed timeout tracking + connectionRelaxedTimeout, err = meter.Int64UpDownCounter( + "redis.client.connection.relaxed_timeout", + metric.WithDescription("How many times the connection timeout has been increased/decreased (after a server maintenance notification)"), + metric.WithUnit("{relaxation}"), + ) + if err != nil { + return fmt.Errorf("failed to create 
connection relaxed timeout metric: %w", err) + } + + // Create Counter for connection handoffs + connectionHandoff, err = meter.Int64Counter( + "redis.client.connection.handoff", + metric.WithDescription("Connections that have been handed off to another node (e.g after a MOVING notification)"), + ) + if err != nil { + return fmt.Errorf("failed to create connection handoff metric: %w", err) + } + + // Create Counter for client errors + clientErrors, err = meter.Int64Counter( + "redis.client.errors", + metric.WithDescription("Number of errors handled by the Redis client"), + metric.WithUnit("{error}"), + ) + if err != nil { + return fmt.Errorf("failed to create client errors metric: %w", err) + } + + // Create Counter for maintenance notifications + maintenanceNotifications, err = meter.Int64Counter( + "redis.client.maintenance.notifications", + metric.WithDescription("Number of maintenance notifications received"), + metric.WithUnit("{notification}"), + ) + if err != nil { + return fmt.Errorf("failed to create maintenance notifications metric: %w", err) + } + } + + var connectionWaitTime metric.Float64Histogram + var connectionUseTime metric.Float64Histogram + var connectionTimeouts metric.Int64Counter + var connectionClosed metric.Int64Counter + var connectionPendingReqs metric.Int64UpDownCounter + + if cfg.isMetricGroupEnabled(MetricGroupConnectionAdvanced) { + // Create histogram for connection wait time + var connectionWaitTimeOpts []metric.Float64HistogramOption + connectionWaitTimeOpts = append(connectionWaitTimeOpts, + metric.WithDescription("The time it took to obtain a connection from the pool"), + metric.WithUnit("s"), + ) + if cfg.histAggregation == HistogramAggregationExplicitBucket { + connectionWaitTimeOpts = append(connectionWaitTimeOpts, + metric.WithExplicitBucketBoundaries(cfg.bucketsConnectionWaitTime...), + ) + } + connectionWaitTime, err = meter.Float64Histogram( + "db.client.connection.wait_time", + connectionWaitTimeOpts..., + ) + if err != 
nil { + return fmt.Errorf("failed to create connection wait time histogram: %w", err) + } + + // Create histogram for connection use time + var connectionUseTimeOpts []metric.Float64HistogramOption + connectionUseTimeOpts = append(connectionUseTimeOpts, + metric.WithDescription("The time between borrowing a connection and returning it to the pool"), + metric.WithUnit("s"), + ) + if cfg.histAggregation == HistogramAggregationExplicitBucket { + connectionUseTimeOpts = append(connectionUseTimeOpts, + metric.WithExplicitBucketBoundaries(cfg.bucketsConnectionUseTime...), + ) + } + connectionUseTime, err = meter.Float64Histogram( + "db.client.connection.use_time", + connectionUseTimeOpts..., + ) + if err != nil { + return fmt.Errorf("failed to create connection use time histogram: %w", err) + } + + // Create counter for connection timeouts + connectionTimeouts, err = meter.Int64Counter( + "db.client.connection.timeouts", + metric.WithDescription("The number of connection timeouts that have occurred"), + metric.WithUnit("{timeout}"), + ) + if err != nil { + return fmt.Errorf("failed to create connection timeouts metric: %w", err) + } + + // Create counter for closed connections + connectionClosed, err = meter.Int64Counter( + "redis.client.connection.closed", + metric.WithDescription("The number of connections that have been closed"), + metric.WithUnit("{connection}"), + ) + if err != nil { + return fmt.Errorf("failed to create connection closed metric: %w", err) + } + + // Create up/down counter for pending requests + connectionPendingReqs, err = meter.Int64UpDownCounter( + "db.client.connection.pending_requests", + metric.WithDescription("The number of pending requests waiting for a connection"), + metric.WithUnit("{request}"), + ) + if err != nil { + return fmt.Errorf("failed to create connection pending requests metric: %w", err) + } + } + + var pubsubMessages metric.Int64Counter + + if cfg.isMetricGroupEnabled(MetricGroupPubSub) { + // Create counter for Pub/Sub 
messages + pubsubMessages, err = meter.Int64Counter( + "redis.client.pubsub.messages", + metric.WithDescription("The number of Pub/Sub messages sent or received"), + metric.WithUnit("{message}"), + ) + if err != nil { + return fmt.Errorf("failed to create Pub/Sub messages metric: %w", err) + } + } + + var streamLag metric.Float64Histogram + + if cfg.isMetricGroupEnabled(MetricGroupStream) { + // Create histogram for stream lag + var streamLagOpts []metric.Float64HistogramOption + streamLagOpts = append(streamLagOpts, + metric.WithDescription("The lag between message creation and consumption in a stream consumer group"), + metric.WithUnit("s"), + ) + if cfg.histAggregation == HistogramAggregationExplicitBucket { + streamLagOpts = append(streamLagOpts, + metric.WithExplicitBucketBoundaries(cfg.bucketsStreamProcessingDuration...), + ) + } + streamLag, err = meter.Float64Histogram( + "redis.client.stream.lag", + streamLagOpts..., + ) + if err != nil { + return fmt.Errorf("failed to create stream lag histogram: %w", err) + } + } + + // Create recorder + recorder := &metricsRecorder{ + operationDuration: operationDuration, + connectionCount: connectionCount, + connectionCreateTime: connectionCreateTime, + connectionRelaxedTimeout: connectionRelaxedTimeout, + connectionHandoff: connectionHandoff, + clientErrors: clientErrors, + maintenanceNotifications: maintenanceNotifications, + + connectionWaitTime: connectionWaitTime, + connectionUseTime: connectionUseTime, + connectionTimeouts: connectionTimeouts, + connectionClosed: connectionClosed, + connectionPendingReqs: connectionPendingReqs, + + pubsubMessages: pubsubMessages, + + streamLag: streamLag, + + // Configuration and client info + cfg: &cfg, + serverAddr: serverAddr, + serverPort: serverPort, + dbIndex: dbIndex, + } + + // Register global recorder + redis.SetOTelRecorder(recorder) + globalInstance = recorder + + return nil +} + +// extractClientConfig extracts server address, port, and database index from a Redis 
client +func extractClientConfig(client redis.UniversalClient) (serverAddr, serverPort, dbIndex string, err error) { + switch c := client.(type) { + case *redis.Client: + opts := c.Options() + host, port := parseAddr(opts.Addr) + return host, port, formatDBIndex(opts.DB), nil + + case *redis.ClusterClient: + opts := c.Options() + if len(opts.Addrs) > 0 { + // Use first address for server.address attribute + host, port := parseAddr(opts.Addrs[0]) + return host, port, "", nil + } + return "", "", "", fmt.Errorf("cluster client has no addresses") + + case *redis.Ring: + opts := c.Options() + if len(opts.Addrs) > 0 { + // Use first address for server.address attribute + for _, addr := range opts.Addrs { + host, port := parseAddr(addr) + return host, port, formatDBIndex(opts.DB), nil + } + } + return "", "", "", fmt.Errorf("ring client has no addresses") + + default: + return "", "", "", fmt.Errorf("unsupported client type: %T", client) + } +} + +// Shutdown cleans up resources (for testing purposes) +func Shutdown() { + if globalInstance != nil { + redis.SetOTelRecorder(nil) + globalInstance = nil + } + // Reset the sync.Once so Init can be called again (useful for tests) + globalInstanceOnce = sync.Once{} + initErr = nil +} diff --git a/internal/otel/metrics.go b/internal/otel/metrics.go new file mode 100644 index 000000000..33a22b859 --- /dev/null +++ b/internal/otel/metrics.go @@ -0,0 +1,214 @@ +package otel + +import ( + "context" + "time" + + "github.com/redis/go-redis/v9/internal/pool" +) + +// Cmder is a minimal interface for command information needed for metrics. +// This avoids circular dependencies with the main redis package. +type Cmder interface { + Name() string + FullName() string + Args() []interface{} + Err() error +} + +// Recorder is the interface for recording metrics. +// Implementations are provided by extra/redisotel-native package. 
+type Recorder interface { + // RecordOperationDuration records the total operation duration (including all retries) + RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, cn *pool.Conn) + + // RecordConnectionStateChange records when a connection changes state + RecordConnectionStateChange(ctx context.Context, cn *pool.Conn, fromState, toState string) + + // RecordConnectionCreateTime records the time it took to create a new connection + RecordConnectionCreateTime(ctx context.Context, duration time.Duration, cn *pool.Conn) + + // RecordConnectionRelaxedTimeout records when connection timeout is relaxed/unrelaxed + // delta: +1 for relaxed, -1 for unrelaxed + // poolName: name of the connection pool (e.g., "main", "pubsub") + // notificationType: the notification type that triggered the timeout relaxation (e.g., "MOVING") + RecordConnectionRelaxedTimeout(ctx context.Context, delta int, cn *pool.Conn, poolName, notificationType string) + + // RecordConnectionHandoff records when a connection is handed off to another node + // poolName: name of the connection pool (e.g., "main", "pubsub") + RecordConnectionHandoff(ctx context.Context, cn *pool.Conn, poolName string) + + // RecordError records client errors (ASK, MOVED, handshake failures, etc.) + // errorType: type of error (e.g., "ASK", "MOVED", "HANDSHAKE_FAILED") + // statusCode: Redis response status code if available (e.g., "MOVED", "ASK") + // isInternal: whether this is an internal error + // retryAttempts: number of retry attempts made + RecordError(ctx context.Context, errorType string, cn *pool.Conn, statusCode string, isInternal bool, retryAttempts int) + + // RecordMaintenanceNotification records when a maintenance notification is received + // notificationType: the type of notification (e.g., "MOVING", "MIGRATING", etc.) 
+ RecordMaintenanceNotification(ctx context.Context, cn *pool.Conn, notificationType string) + + // RecordConnectionWaitTime records the time spent waiting for a connection from the pool + RecordConnectionWaitTime(ctx context.Context, duration time.Duration, cn *pool.Conn) + + // RecordConnectionUseTime records the time a connection was checked out from the pool + RecordConnectionUseTime(ctx context.Context, duration time.Duration, cn *pool.Conn) + + // RecordConnectionTimeout records when a connection timeout occurs + // timeoutType: "pool" for pool timeout, "read" for read timeout, "write" for write timeout + RecordConnectionTimeout(ctx context.Context, cn *pool.Conn, timeoutType string) + + // RecordConnectionClosed records when a connection is closed + // reason: reason for closing (e.g., "idle", "max_lifetime", "error", "pool_closed") + RecordConnectionClosed(ctx context.Context, cn *pool.Conn, reason string) + + // RecordConnectionPendingRequests records changes in pending requests count + // delta: +1 when request starts, -1 when request completes + RecordConnectionPendingRequests(ctx context.Context, delta int, cn *pool.Conn) + + // RecordPubSubMessage records a Pub/Sub message + // direction: "sent" or "received" + // channel: channel name (may be hidden for cardinality reduction) + // sharded: true for sharded pub/sub (SPUBLISH/SSUBSCRIBE) + RecordPubSubMessage(ctx context.Context, cn *pool.Conn, direction, channel string, sharded bool) + + // RecordStreamLag records the lag for stream consumer group processing + // lag: time difference between message creation and consumption + // streamName: name of the stream (may be hidden for cardinality reduction) + // consumerGroup: name of the consumer group + // consumerName: name of the consumer + RecordStreamLag(ctx context.Context, lag time.Duration, cn *pool.Conn, streamName, consumerGroup, consumerName string) +} + +// Global recorder instance (initialized by extra/redisotel-native) +var globalRecorder 
Recorder = noopRecorder{} + +// SetGlobalRecorder sets the global recorder (called by Init() in extra/redisotel-native) +func SetGlobalRecorder(r Recorder) { + if r == nil { + globalRecorder = noopRecorder{} + // Unregister pool callbacks + pool.SetConnectionStateChangeCallback(nil) + pool.SetConnectionCreateTimeCallback(nil) + pool.SetConnectionRelaxedTimeoutCallback(nil) + pool.SetConnectionHandoffCallback(nil) + pool.SetErrorCallback(nil) + pool.SetMaintenanceNotificationCallback(nil) + pool.SetConnectionWaitTimeCallback(nil) + pool.SetConnectionUseTimeCallback(nil) + pool.SetConnectionTimeoutCallback(nil) + pool.SetConnectionClosedCallback(nil) + pool.SetConnectionPendingRequestsCallback(nil) + return + } + globalRecorder = r + + // Register pool callback to forward state changes to recorder + pool.SetConnectionStateChangeCallback(func(ctx context.Context, cn *pool.Conn, fromState, toState string) { + globalRecorder.RecordConnectionStateChange(ctx, cn, fromState, toState) + }) + + // Register pool callback to forward connection creation time to recorder + pool.SetConnectionCreateTimeCallback(func(ctx context.Context, duration time.Duration, cn *pool.Conn) { + globalRecorder.RecordConnectionCreateTime(ctx, duration, cn) + }) + + // Register pool callback to forward connection relaxed timeout changes to recorder + pool.SetConnectionRelaxedTimeoutCallback(func(ctx context.Context, delta int, cn *pool.Conn, poolName, notificationType string) { + globalRecorder.RecordConnectionRelaxedTimeout(ctx, delta, cn, poolName, notificationType) + }) + + // Register pool callback to forward connection handoffs to recorder + pool.SetConnectionHandoffCallback(func(ctx context.Context, cn *pool.Conn, poolName string) { + globalRecorder.RecordConnectionHandoff(ctx, cn, poolName) + }) + + // Register pool callback to forward errors to recorder + pool.SetErrorCallback(func(ctx context.Context, errorType string, cn *pool.Conn, statusCode string, isInternal bool, retryAttempts int) { 
+ globalRecorder.RecordError(ctx, errorType, cn, statusCode, isInternal, retryAttempts) + }) + + // Register pool callback to forward maintenance notifications to recorder + pool.SetMaintenanceNotificationCallback(func(ctx context.Context, cn *pool.Conn, notificationType string) { + globalRecorder.RecordMaintenanceNotification(ctx, cn, notificationType) + }) + + // Register pool callback to forward connection wait time to recorder + pool.SetConnectionWaitTimeCallback(func(ctx context.Context, duration time.Duration, cn *pool.Conn) { + globalRecorder.RecordConnectionWaitTime(ctx, duration, cn) + }) + + // Register pool callback to forward connection use time to recorder + pool.SetConnectionUseTimeCallback(func(ctx context.Context, duration time.Duration, cn *pool.Conn) { + globalRecorder.RecordConnectionUseTime(ctx, duration, cn) + }) + + // Register pool callback to forward connection timeouts to recorder + pool.SetConnectionTimeoutCallback(func(ctx context.Context, cn *pool.Conn, timeoutType string) { + globalRecorder.RecordConnectionTimeout(ctx, cn, timeoutType) + }) + + // Register pool callback to forward connection closed to recorder + pool.SetConnectionClosedCallback(func(ctx context.Context, cn *pool.Conn, reason string) { + globalRecorder.RecordConnectionClosed(ctx, cn, reason) + }) + + // Register pool callback to forward connection pending requests to recorder + pool.SetConnectionPendingRequestsCallback(func(ctx context.Context, delta int, cn *pool.Conn) { + globalRecorder.RecordConnectionPendingRequests(ctx, delta, cn) + }) +} + +// RecordOperationDuration records the total operation duration. +// This is called from redis.go after command execution completes. +func RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, cn *pool.Conn) { + globalRecorder.RecordOperationDuration(ctx, duration, cmd, attempts, cn) +} + +// RecordConnectionStateChange records when a connection changes state. 
// RecordConnectionStateChange forwards a connection state transition
// (fromState -> toState) to the global recorder.
//
// NOTE(review): the pool hunks in this patch report state changes through the
// callback registered with pool.SetConnectionStateChangeCallback rather than
// by calling this function; confirm which callers use it directly.
func RecordConnectionStateChange(ctx context.Context, cn *pool.Conn, fromState, toState string) {
	globalRecorder.RecordConnectionStateChange(ctx, cn, fromState, toState)
}

// RecordConnectionCreateTime forwards the time it took to create a new
// connection to the global recorder.
//
// NOTE(review): the pool hunks in this patch report creation time through the
// callback registered with pool.SetConnectionCreateTimeCallback; confirm
// which callers use this function directly.
func RecordConnectionCreateTime(ctx context.Context, duration time.Duration, cn *pool.Conn) {
	globalRecorder.RecordConnectionCreateTime(ctx, duration, cn)
}

// RecordPubSubMessage forwards a sent or received Pub/Sub message to the
// global recorder. All arguments are passed through unchanged.
// Per the original note, it is called from pubsub.go when messages are sent
// or received (caller not visible here).
func RecordPubSubMessage(ctx context.Context, cn *pool.Conn, direction, channel string, sharded bool) {
	globalRecorder.RecordPubSubMessage(ctx, cn, direction, channel, sharded)
}
+func RecordStreamLag(ctx context.Context, lag time.Duration, cn *pool.Conn, streamName, consumerGroup, consumerName string) { + globalRecorder.RecordStreamLag(ctx, lag, cn, streamName, consumerGroup, consumerName) +} + +// noopRecorder is a no-op implementation (zero overhead when metrics disabled) +type noopRecorder struct{} + +func (noopRecorder) RecordOperationDuration(context.Context, time.Duration, Cmder, int, *pool.Conn) {} +func (noopRecorder) RecordConnectionStateChange(context.Context, *pool.Conn, string, string) {} +func (noopRecorder) RecordConnectionCreateTime(context.Context, time.Duration, *pool.Conn) {} +func (noopRecorder) RecordConnectionRelaxedTimeout(context.Context, int, *pool.Conn, string, string) { +} +func (noopRecorder) RecordConnectionHandoff(context.Context, *pool.Conn, string) {} +func (noopRecorder) RecordError(context.Context, string, *pool.Conn, string, bool, int) {} +func (noopRecorder) RecordMaintenanceNotification(context.Context, *pool.Conn, string) {} + +func (noopRecorder) RecordConnectionWaitTime(context.Context, time.Duration, *pool.Conn) {} +func (noopRecorder) RecordConnectionUseTime(context.Context, time.Duration, *pool.Conn) {} +func (noopRecorder) RecordConnectionTimeout(context.Context, *pool.Conn, string) {} +func (noopRecorder) RecordConnectionClosed(context.Context, *pool.Conn, string) {} +func (noopRecorder) RecordConnectionPendingRequests(context.Context, int, *pool.Conn) {} + +func (noopRecorder) RecordPubSubMessage(context.Context, *pool.Conn, string, string, bool) {} + +func (noopRecorder) RecordStreamLag(context.Context, time.Duration, *pool.Conn, string, string, string) { +} diff --git a/internal/pool/conn.go b/internal/pool/conn.go index 95d83bfde..e9c0c1c0d 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -69,8 +69,9 @@ type Conn struct { // Connection identifier for unique tracking id uint64 - usedAt atomic.Int64 - lastPutAt atomic.Int64 + usedAt atomic.Int64 + lastPutAt atomic.Int64 + 
checkoutAt atomic.Int64 // Time when connection was checked out from pool (for use_time metric) // Lock-free netConn access using atomic.Value // Contains *atomicNetConn wrapper, accessed atomically for better performance @@ -418,6 +419,8 @@ func (cn *Conn) IsPubSub() bool { // SetRelaxedTimeout sets relaxed timeouts for this connection during maintenanceNotifications upgrades. // These timeouts will be used for all subsequent commands until the deadline expires. // Uses atomic operations for lock-free access. +// Note: Metrics should be recorded by the caller (notification handler) which has context about +// the notification type and pool name. func (cn *Conn) SetRelaxedTimeout(readTimeout, writeTimeout time.Duration) { cn.relaxedCounter.Add(1) cn.relaxedReadTimeoutNs.Store(int64(readTimeout)) @@ -452,6 +455,11 @@ func (cn *Conn) clearRelaxedTimeout() { cn.relaxedWriteTimeoutNs.Store(0) cn.relaxedDeadlineNs.Store(0) cn.relaxedCounter.Store(0) + + // Note: Metrics for timeout unrelaxing are not recorded here because we don't have + // context about which notification type or pool triggered the relaxation. + // In practice, relaxed timeouts expire automatically via deadline, so explicit + // unrelaxing metrics are less critical than the initial relaxation metrics. } // HasRelaxedTimeout returns true if relaxed timeouts are currently active on this connection. diff --git a/internal/pool/pool.go b/internal/pool/pool.go index d757d1f4f..271baa646 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -32,6 +32,47 @@ var ( // errConnNotPooled is returned when trying to return a non-pooled connection to the pool. 
errConnNotPooled = errors.New("connection not pooled") + // Global callback for connection state changes (set by otel package) + connectionStateChangeCallback func(ctx context.Context, cn *Conn, fromState, toState string) + + // Global callback for connection creation time (set by otel package) + connectionCreateTimeCallback func(ctx context.Context, duration time.Duration, cn *Conn) + + // Global callback for connection relaxed timeout changes (set by otel package) + // Parameters: ctx, delta (+1/-1), cn, poolName, notificationType + connectionRelaxedTimeoutCallback func(ctx context.Context, delta int, cn *Conn, poolName, notificationType string) + + // Global callback for connection handoff (set by otel package) + // Parameters: ctx, cn, poolName + connectionHandoffCallback func(ctx context.Context, cn *Conn, poolName string) + + // Global callback for error tracking (set by otel package) + // Parameters: ctx, errorType, cn, statusCode, isInternal, retryAttempts + errorCallback func(ctx context.Context, errorType string, cn *Conn, statusCode string, isInternal bool, retryAttempts int) + + // Global callback for maintenance notifications (set by otel package) + // Parameters: ctx, cn, notificationType + maintenanceNotificationCallback func(ctx context.Context, cn *Conn, notificationType string) + + // Global callback for connection wait time (set by otel package) + // Parameters: ctx, duration, cn + connectionWaitTimeCallback func(ctx context.Context, duration time.Duration, cn *Conn) + + // Global callback for connection use time (set by otel package) + // Parameters: ctx, duration, cn + connectionUseTimeCallback func(ctx context.Context, duration time.Duration, cn *Conn) + + // Global callback for connection timeouts (set by otel package) + // Parameters: ctx, cn, timeoutType + connectionTimeoutCallback func(ctx context.Context, cn *Conn, timeoutType string) + + // Global callback for connection closed (set by otel package) + // Parameters: ctx, cn, reason + 
connectionClosedCallback func(ctx context.Context, cn *Conn, reason string) + + // Global callback for connection pending requests (set by otel package) + // Parameters: ctx, delta, cn + connectionPendingRequestsCallback func(ctx context.Context, delta int, cn *Conn) // popAttempts is the maximum number of attempts to find a usable connection // when popping from the idle connection pool. This handles cases where connections @@ -51,6 +92,96 @@ var ( noExpiration = maxTime ) +// SetConnectionStateChangeCallback sets the global callback for connection state changes. +// This is called by the otel package to register metrics recording. +func SetConnectionStateChangeCallback(fn func(ctx context.Context, cn *Conn, fromState, toState string)) { + connectionStateChangeCallback = fn +} + +// SetConnectionCreateTimeCallback sets the global callback for connection creation time. +// This is called by the otel package to register metrics recording. +func SetConnectionCreateTimeCallback(fn func(ctx context.Context, duration time.Duration, cn *Conn)) { + connectionCreateTimeCallback = fn +} + +// SetConnectionRelaxedTimeoutCallback sets the global callback for connection relaxed timeout changes. +// This is called by the otel package to register metrics recording. +func SetConnectionRelaxedTimeoutCallback(fn func(ctx context.Context, delta int, cn *Conn, poolName, notificationType string)) { + connectionRelaxedTimeoutCallback = fn +} + +// GetConnectionRelaxedTimeoutCallback returns the global callback for connection relaxed timeout changes. +// This is used by maintnotifications to record relaxed timeout metrics. +func GetConnectionRelaxedTimeoutCallback() func(ctx context.Context, delta int, cn *Conn, poolName, notificationType string) { + return connectionRelaxedTimeoutCallback +} + +// SetConnectionHandoffCallback sets the global callback for connection handoffs. +// This is called by the otel package to register metrics recording. 
// SetConnectionHandoffCallback sets the global callback for connection
// handoffs. This is called by the otel package to register metrics recording;
// passing nil unregisters the callback.
//
// NOTE(review): this is a plain assignment to a package-level variable with
// no synchronization visible here — presumably it must be called before pools
// are in use; confirm. The same applies to every Set*Callback below.
func SetConnectionHandoffCallback(fn func(ctx context.Context, cn *Conn, poolName string)) {
	connectionHandoffCallback = fn
}

// GetConnectionHandoffCallback returns the global callback for connection
// handoffs, or nil when none is registered. Callers must nil-check the result
// before invoking it.
// This is used by maintnotifications to record handoff metrics.
func GetConnectionHandoffCallback() func(ctx context.Context, cn *Conn, poolName string) {
	return connectionHandoffCallback
}

// SetErrorCallback sets the global callback for error tracking.
// This is called by the otel package to register metrics recording; passing
// nil unregisters the callback.
func SetErrorCallback(fn func(ctx context.Context, errorType string, cn *Conn, statusCode string, isInternal bool, retryAttempts int)) {
	errorCallback = fn
}

// GetErrorCallback returns the global callback for error tracking, or nil
// when none is registered. Callers must nil-check the result before invoking
// it.
// This is used by cluster and client code to record error metrics.
func GetErrorCallback() func(ctx context.Context, errorType string, cn *Conn, statusCode string, isInternal bool, retryAttempts int) {
	return errorCallback
}

// SetMaintenanceNotificationCallback sets the global callback for maintenance
// notifications. This is called by the otel package to register metrics
// recording; passing nil unregisters the callback.
func SetMaintenanceNotificationCallback(fn func(ctx context.Context, cn *Conn, notificationType string)) {
	maintenanceNotificationCallback = fn
}

// GetMaintenanceNotificationCallback returns the global callback for
// maintenance notifications, or nil when none is registered. Callers must
// nil-check the result before invoking it.
// This is used by maintnotifications to record notification metrics.
func GetMaintenanceNotificationCallback() func(ctx context.Context, cn *Conn, notificationType string) {
	return maintenanceNotificationCallback
}

// SetConnectionWaitTimeCallback sets the global callback for connection wait
// time (the time spent waiting for a connection from the pool).
// This is called by the otel package to register metrics recording; passing
// nil unregisters the callback.
func SetConnectionWaitTimeCallback(fn func(ctx context.Context, duration time.Duration, cn *Conn)) {
	connectionWaitTimeCallback = fn
}

// SetConnectionUseTimeCallback sets the global callback for connection use
// time (the time between checkout and return of a connection).
// This is called by the otel package to register metrics recording; passing
// nil unregisters the callback.
func SetConnectionUseTimeCallback(fn func(ctx context.Context, duration time.Duration, cn *Conn)) {
	connectionUseTimeCallback = fn
}

// SetConnectionTimeoutCallback sets the global callback for connection
// timeouts. This is called by the otel package to register metrics recording;
// passing nil unregisters the callback.
func SetConnectionTimeoutCallback(fn func(ctx context.Context, cn *Conn, timeoutType string)) {
	connectionTimeoutCallback = fn
}

// SetConnectionClosedCallback sets the global callback for connection closed
// events. This is called by the otel package to register metrics recording;
// passing nil unregisters the callback.
func SetConnectionClosedCallback(fn func(ctx context.Context, cn *Conn, reason string)) {
	connectionClosedCallback = fn
}

// SetConnectionPendingRequestsCallback sets the global callback for changes
// in the number of pending connection requests.
// This is called by the otel package to register metrics recording; passing
// nil unregisters the callback.
func SetConnectionPendingRequestsCallback(fn func(ctx context.Context, delta int, cn *Conn)) {
	connectionPendingRequestsCallback = fn
}
type Stats struct { Hits uint32 // number of times free connection was found in the pool @@ -359,10 +490,18 @@ func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) { } } + // Notify metrics: new connection created and idle + if connectionStateChangeCallback != nil { + connectionStateChangeCallback(ctx, cn, "", "idle") + } + return cn, nil } func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) { + // Start measuring connection creation time + startTime := time.Now() + if p.closed() { return nil, ErrClosed } @@ -413,6 +552,12 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) { cn.expiresAt = noExpiration } + // Record connection creation time + if connectionCreateTimeCallback != nil { + duration := time.Since(startTime) + connectionCreateTimeCallback(ctx, duration, cn) + } + return cn, nil } @@ -474,9 +619,27 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) { return nil, ErrClosed } + // Track pending requests + if connectionPendingRequestsCallback != nil { + connectionPendingRequestsCallback(ctx, 1, nil) + defer func() { + if err != nil { + // Failed to get connection, decrement pending requests + connectionPendingRequestsCallback(ctx, -1, nil) + } + }() + } + + // Track wait time + waitStart := time.Now() if err := p.waitTurn(ctx); err != nil { + // Record timeout if applicable + if err == ErrPoolTimeout && connectionTimeoutCallback != nil { + connectionTimeoutCallback(ctx, nil, "pool") + } return nil, err } + waitDuration := time.Since(waitStart) // Use cached time for health checks (max 50ms staleness is acceptable) nowNs := getCachedTimeNs() @@ -524,6 +687,23 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) { } atomic.AddUint32(&p.stats.Hits, 1) + + // Notify metrics: connection moved from idle to used + if connectionStateChangeCallback != nil { + connectionStateChangeCallback(ctx, cn, "idle", "used") + } + + // Record wait time and checkout time + if 
connectionWaitTimeCallback != nil { + connectionWaitTimeCallback(ctx, waitDuration, cn) + } + cn.checkoutAt.Store(time.Now().UnixNano()) + + // Decrement pending requests (connection acquired successfully) + if connectionPendingRequestsCallback != nil { + connectionPendingRequestsCallback(ctx, -1, cn) + } + return cn, nil } @@ -546,6 +726,23 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) { return nil, err } } + + // Notify metrics: new connection is created and used + if connectionStateChangeCallback != nil { + connectionStateChangeCallback(ctx, newcn, "", "used") + } + + // Record wait time and checkout time + if connectionWaitTimeCallback != nil { + connectionWaitTimeCallback(ctx, waitDuration, newcn) + } + newcn.checkoutAt.Store(time.Now().UnixNano()) + + // Decrement pending requests (connection acquired successfully) + if connectionPendingRequestsCallback != nil { + connectionPendingRequestsCallback(ctx, -1, newcn) + } + return newcn, nil } @@ -756,6 +953,14 @@ func (p *ConnPool) putConnWithoutTurn(ctx context.Context, cn *Conn) { // putConn is the internal implementation of Put that optionally frees a turn. 
func (p *ConnPool) putConn(ctx context.Context, cn *Conn, freeTurn bool) { + // Record use time (time between checkout and return) + checkoutAtNs := cn.checkoutAt.Load() + if checkoutAtNs > 0 && connectionUseTimeCallback != nil { + useTime := time.Duration(time.Now().UnixNano() - checkoutAtNs) + connectionUseTimeCallback(ctx, useTime, cn) + cn.checkoutAt.Store(0) // Reset checkout time + } + // Process connection using the hooks system shouldPool := true shouldRemove := false @@ -840,9 +1045,19 @@ func (p *ConnPool) putConn(ctx context.Context, cn *Conn, freeTurn bool) { p.connsMu.Unlock() p.idleConnsLen.Add(1) } + + // Notify metrics: connection moved from used to idle + if connectionStateChangeCallback != nil { + connectionStateChangeCallback(ctx, cn, "used", "idle") + } } else { shouldCloseConn = true p.removeConnWithLock(cn) + + // Notify metrics: connection removed (used -> nothing) + if connectionStateChangeCallback != nil { + connectionStateChangeCallback(ctx, cn, "used", "") + } } if freeTurn { @@ -870,6 +1085,14 @@ func (p *ConnPool) RemoveWithoutTurn(ctx context.Context, cn *Conn, reason error // removeConnInternal is the internal implementation of Remove that optionally frees a turn. func (p *ConnPool) removeConnInternal(ctx context.Context, cn *Conn, reason error, freeTurn bool) { + // Record use time if connection was checked out + checkoutAtNs := cn.checkoutAt.Load() + if checkoutAtNs > 0 && connectionUseTimeCallback != nil { + useTime := time.Duration(time.Now().UnixNano() - checkoutAtNs) + connectionUseTimeCallback(ctx, useTime, cn) + cn.checkoutAt.Store(0) // Reset checkout time + } + // Lock-free atomic read - no mutex overhead! 
hookManager := p.hookManager.Load() @@ -883,6 +1106,20 @@ func (p *ConnPool) removeConnInternal(ctx context.Context, cn *Conn, reason erro p.freeTurn() } + // Notify metrics: connection removed (assume from used state) + if connectionStateChangeCallback != nil { + connectionStateChangeCallback(ctx, cn, "used", "") + } + + // Record connection closed + if connectionClosedCallback != nil { + reasonStr := "unknown" + if reason != nil { + reasonStr = reason.Error() + } + connectionClosedCallback(ctx, cn, reasonStr) + } + _ = p.closeConn(cn) // Check if we need to create new idle connections to maintain MinIdleConns diff --git a/maintnotifications/handoff_worker.go b/maintnotifications/handoff_worker.go index 53f28f49c..632a3d842 100644 --- a/maintnotifications/handoff_worker.go +++ b/maintnotifications/handoff_worker.go @@ -434,6 +434,11 @@ func (hwm *handoffWorkerManager) performHandoffInternal( deadline := time.Now().Add(hwm.config.PostHandoffRelaxedDuration) conn.SetRelaxedTimeoutWithDeadline(relaxedTimeout, relaxedTimeout, deadline) + // Record relaxed timeout metric (post-handoff) + if relaxedTimeoutCallback := pool.GetConnectionRelaxedTimeoutCallback(); relaxedTimeoutCallback != nil { + relaxedTimeoutCallback(ctx, 1, conn, "main", "HANDOFF") + } + if internal.LogLevel.InfoOrAbove() { internal.Logger.Printf(context.Background(), logs.ApplyingRelaxedTimeoutDueToPostHandoff(connID, relaxedTimeout, deadline.Format("15:04:05.000"))) } @@ -462,6 +467,11 @@ func (hwm *handoffWorkerManager) performHandoffInternal( internal.Logger.Printf(ctx, logs.HandoffSucceeded(connID, newEndpoint)) // successfully completed the handoff, no retry needed and no error + // Notify metrics: connection handoff succeeded + if handoffCallback := pool.GetConnectionHandoffCallback(); handoffCallback != nil { + handoffCallback(ctx, conn, "main") + } + return false, nil } diff --git a/maintnotifications/push_notification_handler.go b/maintnotifications/push_notification_handler.go index 
937b4ae82..1db035037 100644 --- a/maintnotifications/push_notification_handler.go +++ b/maintnotifications/push_notification_handler.go @@ -40,14 +40,44 @@ func (snh *NotificationHandler) HandlePushNotification(ctx context.Context, hand var err error switch notificationType { case NotificationMoving: + // Record maintenance notification metric + if maintenanceCallback := pool.GetMaintenanceNotificationCallback(); maintenanceCallback != nil { + if conn, ok := handlerCtx.Conn.(*pool.Conn); ok { + maintenanceCallback(ctx, conn, notificationType) + } + } err = snh.handleMoving(ctx, handlerCtx, modifiedNotification) case NotificationMigrating: + // Record maintenance notification metric + if maintenanceCallback := pool.GetMaintenanceNotificationCallback(); maintenanceCallback != nil { + if conn, ok := handlerCtx.Conn.(*pool.Conn); ok { + maintenanceCallback(ctx, conn, notificationType) + } + } err = snh.handleMigrating(ctx, handlerCtx, modifiedNotification) case NotificationMigrated: + // Record maintenance notification metric + if maintenanceCallback := pool.GetMaintenanceNotificationCallback(); maintenanceCallback != nil { + if conn, ok := handlerCtx.Conn.(*pool.Conn); ok { + maintenanceCallback(ctx, conn, notificationType) + } + } err = snh.handleMigrated(ctx, handlerCtx, modifiedNotification) case NotificationFailingOver: + // Record maintenance notification metric + if maintenanceCallback := pool.GetMaintenanceNotificationCallback(); maintenanceCallback != nil { + if conn, ok := handlerCtx.Conn.(*pool.Conn); ok { + maintenanceCallback(ctx, conn, notificationType) + } + } err = snh.handleFailingOver(ctx, handlerCtx, modifiedNotification) case NotificationFailedOver: + // Record maintenance notification metric + if maintenanceCallback := pool.GetMaintenanceNotificationCallback(); maintenanceCallback != nil { + if conn, ok := handlerCtx.Conn.(*pool.Conn); ok { + maintenanceCallback(ctx, conn, notificationType) + } + } err = snh.handleFailedOver(ctx, handlerCtx, 
modifiedNotification) default: // Ignore other notification types (e.g., pub/sub messages) @@ -191,6 +221,12 @@ func (snh *NotificationHandler) handleMigrating(ctx context.Context, handlerCtx internal.Logger.Printf(ctx, logs.RelaxedTimeoutDueToNotification(conn.GetID(), "MIGRATING", snh.manager.config.RelaxedTimeout)) } conn.SetRelaxedTimeout(snh.manager.config.RelaxedTimeout, snh.manager.config.RelaxedTimeout) + + // Record relaxed timeout metric + if relaxedTimeoutCallback := pool.GetConnectionRelaxedTimeoutCallback(); relaxedTimeoutCallback != nil { + relaxedTimeoutCallback(ctx, 1, conn, "main", "MIGRATING") + } + return nil } @@ -249,6 +285,12 @@ func (snh *NotificationHandler) handleFailingOver(ctx context.Context, handlerCt internal.Logger.Printf(ctx, logs.RelaxedTimeoutDueToNotification(connID, "FAILING_OVER", snh.manager.config.RelaxedTimeout)) } conn.SetRelaxedTimeout(snh.manager.config.RelaxedTimeout, snh.manager.config.RelaxedTimeout) + + // Record relaxed timeout metric + if relaxedTimeoutCallback := pool.GetConnectionRelaxedTimeoutCallback(); relaxedTimeoutCallback != nil { + relaxedTimeoutCallback(ctx, 1, conn, "main", "FAILING_OVER") + } + return nil } diff --git a/osscluster.go b/osscluster.go index 6994ae83f..7e63c5e97 100644 --- a/osscluster.go +++ b/osscluster.go @@ -1172,6 +1172,18 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error { if moved || ask { c.state.LazyReload() + // Record error metrics + if errorCallback := pool.GetErrorCallback(); errorCallback != nil { + errorType := "MOVED" + statusCode := "MOVED" + if ask { + errorType = "ASK" + statusCode = "ASK" + } + // MOVED/ASK are not internal errors, and this is the first attempt (retry count = 0) + errorCallback(ctx, errorType, nil, statusCode, false, 0) + } + var err error node, err = c.nodes.GetOrCreate(addr) if err != nil { diff --git a/otel.go b/otel.go new file mode 100644 index 000000000..77a3964b4 --- /dev/null +++ b/otel.go @@ -0,0 +1,170 @@ +package redis + 
+import ( + "context" + "net" + "time" + + "github.com/redis/go-redis/v9/internal/otel" + "github.com/redis/go-redis/v9/internal/pool" +) + +// ConnInfo provides information about a Redis connection for metrics. +// This is a public interface to avoid exposing internal types. +type ConnInfo interface { + // RemoteAddr returns the remote network address + RemoteAddr() net.Addr +} + +// OTelRecorder is the interface for recording OpenTelemetry metrics. +// Implementations are provided by extra/redisotel-native package. +// +// This interface is exported to allow external packages to implement +// custom recorders without depending on internal packages. +type OTelRecorder interface { + // RecordOperationDuration records the total operation duration (including all retries) + RecordOperationDuration(ctx context.Context, duration time.Duration, cmd Cmder, attempts int, cn ConnInfo) + + // RecordConnectionStateChange records when a connection changes state (e.g., idle -> used) + RecordConnectionStateChange(ctx context.Context, cn ConnInfo, fromState, toState string) + + // RecordConnectionCreateTime records the time it took to create a new connection + RecordConnectionCreateTime(ctx context.Context, duration time.Duration, cn ConnInfo) + + // RecordConnectionRelaxedTimeout records when connection timeout is relaxed/unrelaxed + // delta: +1 for relaxed, -1 for unrelaxed + // poolName: name of the connection pool (e.g., "main", "pubsub") + // notificationType: the notification type that triggered the timeout relaxation (e.g., "MOVING", "HANDOFF") + RecordConnectionRelaxedTimeout(ctx context.Context, delta int, cn ConnInfo, poolName, notificationType string) + + // RecordConnectionHandoff records when a connection is handed off to another node + // poolName: name of the connection pool (e.g., "main", "pubsub") + RecordConnectionHandoff(ctx context.Context, cn ConnInfo, poolName string) + + // RecordError records client errors (ASK, MOVED, handshake failures, etc.) 
+ // errorType: type of error (e.g., "ASK", "MOVED", "HANDSHAKE_FAILED") + // statusCode: Redis response status code if available (e.g., "MOVED", "ASK") + // isInternal: whether this is an internal error + // retryAttempts: number of retry attempts made + RecordError(ctx context.Context, errorType string, cn ConnInfo, statusCode string, isInternal bool, retryAttempts int) + + // RecordMaintenanceNotification records when a maintenance notification is received + // notificationType: the type of notification (e.g., "MOVING", "MIGRATING", etc.) + RecordMaintenanceNotification(ctx context.Context, cn ConnInfo, notificationType string) + + // RecordConnectionWaitTime records the time spent waiting for a connection from the pool + RecordConnectionWaitTime(ctx context.Context, duration time.Duration, cn ConnInfo) + + // RecordConnectionUseTime records the time a connection was checked out from the pool + RecordConnectionUseTime(ctx context.Context, duration time.Duration, cn ConnInfo) + + // RecordConnectionTimeout records when a connection timeout occurs + // timeoutType: "pool" for pool timeout, "read" for read timeout, "write" for write timeout + RecordConnectionTimeout(ctx context.Context, cn ConnInfo, timeoutType string) + + // RecordConnectionClosed records when a connection is closed + // reason: reason for closing (e.g., "idle", "max_lifetime", "error", "pool_closed") + RecordConnectionClosed(ctx context.Context, cn ConnInfo, reason string) + + // RecordConnectionPendingRequests records changes in pending requests count + // delta: +1 when request starts, -1 when request completes + RecordConnectionPendingRequests(ctx context.Context, delta int, cn ConnInfo) + + // RecordPubSubMessage records a Pub/Sub message + // direction: "sent" or "received" + // channel: channel name (may be hidden for cardinality reduction) + // sharded: true for sharded pub/sub (SPUBLISH/SSUBSCRIBE) + RecordPubSubMessage(ctx context.Context, cn ConnInfo, direction, channel string, sharded 
bool) + + // RecordStreamLag records the lag for stream consumer group processing + // lag: time difference between message creation and consumption + // streamName: name of the stream (may be hidden for cardinality reduction) + // consumerGroup: name of the consumer group + // consumerName: name of the consumer + RecordStreamLag(ctx context.Context, lag time.Duration, cn ConnInfo, streamName, consumerGroup, consumerName string) +} + +// SetOTelRecorder sets the global OpenTelemetry recorder. +// This is typically called by Init() in extra/redisotel-native package. +// +// Setting a nil recorder disables metrics collection. +func SetOTelRecorder(r OTelRecorder) { + if r == nil { + otel.SetGlobalRecorder(nil) + return + } + otel.SetGlobalRecorder(&otelRecorderAdapter{r}) +} + +// otelRecorderAdapter adapts the public OTelRecorder interface to the internal otel.Recorder interface +type otelRecorderAdapter struct { + recorder OTelRecorder +} + +// toConnInfo converts internal pool.Conn to public ConnInfo interface +// Returns nil if cn is nil, otherwise returns cn (which implements ConnInfo) +func toConnInfo(cn *pool.Conn) ConnInfo { + if cn != nil { + return cn + } + return nil +} + +func (a *otelRecorderAdapter) RecordOperationDuration(ctx context.Context, duration time.Duration, cmd otel.Cmder, attempts int, cn *pool.Conn) { + // Convert internal Cmder to public Cmder + if publicCmd, ok := cmd.(Cmder); ok { + a.recorder.RecordOperationDuration(ctx, duration, publicCmd, attempts, toConnInfo(cn)) + } +} + +func (a *otelRecorderAdapter) RecordConnectionStateChange(ctx context.Context, cn *pool.Conn, fromState, toState string) { + a.recorder.RecordConnectionStateChange(ctx, toConnInfo(cn), fromState, toState) +} + +func (a *otelRecorderAdapter) RecordConnectionCreateTime(ctx context.Context, duration time.Duration, cn *pool.Conn) { + a.recorder.RecordConnectionCreateTime(ctx, duration, toConnInfo(cn)) +} + +func (a *otelRecorderAdapter) 
RecordConnectionRelaxedTimeout(ctx context.Context, delta int, cn *pool.Conn, poolName, notificationType string) { + a.recorder.RecordConnectionRelaxedTimeout(ctx, delta, toConnInfo(cn), poolName, notificationType) +} + +func (a *otelRecorderAdapter) RecordConnectionHandoff(ctx context.Context, cn *pool.Conn, poolName string) { + a.recorder.RecordConnectionHandoff(ctx, toConnInfo(cn), poolName) +} + +func (a *otelRecorderAdapter) RecordError(ctx context.Context, errorType string, cn *pool.Conn, statusCode string, isInternal bool, retryAttempts int) { + a.recorder.RecordError(ctx, errorType, toConnInfo(cn), statusCode, isInternal, retryAttempts) +} + +func (a *otelRecorderAdapter) RecordMaintenanceNotification(ctx context.Context, cn *pool.Conn, notificationType string) { + a.recorder.RecordMaintenanceNotification(ctx, toConnInfo(cn), notificationType) +} + +func (a *otelRecorderAdapter) RecordConnectionWaitTime(ctx context.Context, duration time.Duration, cn *pool.Conn) { + a.recorder.RecordConnectionWaitTime(ctx, duration, toConnInfo(cn)) +} + +func (a *otelRecorderAdapter) RecordConnectionUseTime(ctx context.Context, duration time.Duration, cn *pool.Conn) { + a.recorder.RecordConnectionUseTime(ctx, duration, toConnInfo(cn)) +} + +func (a *otelRecorderAdapter) RecordConnectionTimeout(ctx context.Context, cn *pool.Conn, timeoutType string) { + a.recorder.RecordConnectionTimeout(ctx, toConnInfo(cn), timeoutType) +} + +func (a *otelRecorderAdapter) RecordConnectionClosed(ctx context.Context, cn *pool.Conn, reason string) { + a.recorder.RecordConnectionClosed(ctx, toConnInfo(cn), reason) +} + +func (a *otelRecorderAdapter) RecordConnectionPendingRequests(ctx context.Context, delta int, cn *pool.Conn) { + a.recorder.RecordConnectionPendingRequests(ctx, delta, toConnInfo(cn)) +} + +func (a *otelRecorderAdapter) RecordPubSubMessage(ctx context.Context, cn *pool.Conn, direction, channel string, sharded bool) { + a.recorder.RecordPubSubMessage(ctx, toConnInfo(cn), 
direction, channel, sharded) +} + +func (a *otelRecorderAdapter) RecordStreamLag(ctx context.Context, lag time.Duration, cn *pool.Conn, streamName, consumerGroup, consumerName string) { + a.recorder.RecordStreamLag(ctx, lag, toConnInfo(cn), streamName, consumerGroup, consumerName) +} diff --git a/pubsub.go b/pubsub.go index 1b9d4e7fe..49eec9358 100644 --- a/pubsub.go +++ b/pubsub.go @@ -8,6 +8,7 @@ import ( "time" "github.com/redis/go-redis/v9/internal" + "github.com/redis/go-redis/v9/internal/otel" "github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/proto" "github.com/redis/go-redis/v9/push" @@ -403,7 +404,7 @@ func (p *Pong) String() string { return "Pong" } -func (c *PubSub) newMessage(reply interface{}) (interface{}, error) { +func (c *PubSub) newMessage(ctx context.Context, cn *pool.Conn, reply interface{}) (interface{}, error) { switch reply := reply.(type) { case string: return &Pong{ @@ -420,30 +421,42 @@ func (c *PubSub) newMessage(reply interface{}) (interface{}, error) { Count: int(reply[2].(int64)), }, nil case "message", "smessage": + channel := reply[1].(string) + sharded := kind == "smessage" switch payload := reply[2].(type) { case string: - return &Message{ - Channel: reply[1].(string), + msg := &Message{ + Channel: channel, Payload: payload, - }, nil + } + // Record PubSub message received + otel.RecordPubSubMessage(ctx, cn, "received", channel, sharded) + return msg, nil case []interface{}: ss := make([]string, len(payload)) for i, s := range payload { ss[i] = s.(string) } - return &Message{ - Channel: reply[1].(string), + msg := &Message{ + Channel: channel, PayloadSlice: ss, - }, nil + } + // Record PubSub message received + otel.RecordPubSubMessage(ctx, cn, "received", channel, sharded) + return msg, nil default: return nil, fmt.Errorf("redis: unsupported pubsub message payload: %T", payload) } case "pmessage": - return &Message{ + channel := reply[2].(string) + msg := &Message{ Pattern: reply[1].(string), - 
Channel: reply[2].(string), + Channel: channel, Payload: reply[3].(string), - }, nil + } + // Record PubSub message received (pattern message, not sharded) + otel.RecordPubSubMessage(ctx, cn, "received", channel, false) + return msg, nil case "pong": return &Pong{ Payload: reply[1].(string), @@ -485,7 +498,7 @@ func (c *PubSub) ReceiveTimeout(ctx context.Context, timeout time.Duration) (int return nil, err } - return c.newMessage(c.cmd.Val()) + return c.newMessage(ctx, cn, c.cmd.Val()) } // Receive returns a message as a Subscription, Message, Pong or error. diff --git a/pubsub_commands.go b/pubsub_commands.go index 28622aa6b..ccc0ed524 100644 --- a/pubsub_commands.go +++ b/pubsub_commands.go @@ -1,6 +1,10 @@ package redis -import "context" +import ( + "context" + + "github.com/redis/go-redis/v9/internal/otel" +) type PubSubCmdable interface { Publish(ctx context.Context, channel string, message interface{}) *IntCmd @@ -16,12 +20,20 @@ type PubSubCmdable interface { func (c cmdable) Publish(ctx context.Context, channel string, message interface{}) *IntCmd { cmd := NewIntCmd(ctx, "publish", channel, message) _ = c(ctx, cmd) + // Record PubSub message sent (if command succeeded) + if cmd.Err() == nil { + otel.RecordPubSubMessage(ctx, nil, "sent", channel, false) + } return cmd } func (c cmdable) SPublish(ctx context.Context, channel string, message interface{}) *IntCmd { cmd := NewIntCmd(ctx, "spublish", channel, message) _ = c(ctx, cmd) + // Record PubSub message sent (if command succeeded) + if cmd.Err() == nil { + otel.RecordPubSubMessage(ctx, nil, "sent", channel, true) + } return cmd } diff --git a/redis.go b/redis.go index a6a710677..7c963ceb3 100644 --- a/redis.go +++ b/redis.go @@ -13,6 +13,7 @@ import ( "github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal/auth/streaming" "github.com/redis/go-redis/v9/internal/hscan" + "github.com/redis/go-redis/v9/internal/otel" "github.com/redis/go-redis/v9/internal/pool" 
"github.com/redis/go-redis/v9/internal/proto" "github.com/redis/go-redis/v9/maintnotifications" @@ -559,6 +560,13 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error { // enabled mode, fail the connection c.optLock.Unlock() cn.GetStateMachine().Transition(pool.StateClosed) + + // Record handshake failure metric + if errorCallback := pool.GetErrorCallback(); errorCallback != nil { + // Handshake failures are internal errors with no retry attempts + errorCallback(ctx, "HANDSHAKE_FAILED", cn, "HANDSHAKE_FAILED", true, 0) + } + return fmt.Errorf("failed to enable maintnotifications: %w", maintNotifHandshakeErr) default: // will handle auto and any other // Disabling logging here as it's too noisy. @@ -662,17 +670,34 @@ func (c *baseClient) dial(ctx context.Context, network, addr string) (net.Conn, } func (c *baseClient) process(ctx context.Context, cmd Cmder) error { + // Start measuring total operation duration (includes all retries) + operationStart := time.Now() + var lastConn *pool.Conn + var lastErr error + totalAttempts := 0 for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { + totalAttempts++ attempt := attempt - retry, err := c._process(ctx, cmd, attempt) + retry, cn, err := c._process(ctx, cmd, attempt) + if cn != nil { + lastConn = cn + } if err == nil || !retry { + // Record total operation duration + operationDuration := time.Since(operationStart) + otel.RecordOperationDuration(ctx, operationDuration, cmd, totalAttempts, lastConn) return err } lastErr = err } + + // Record failed operation after all retries + operationDuration := time.Since(operationStart) + otel.RecordOperationDuration(ctx, operationDuration, cmd, totalAttempts, lastConn) + return lastErr } @@ -689,15 +714,17 @@ func (c *baseClient) assertUnstableCommand(cmd Cmder) (bool, error) { } } -func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error) { +func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, 
*pool.Conn, error) { if attempt > 0 { if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { - return false, err + return false, nil, err } } + var usedConn *pool.Conn retryTimeout := uint32(0) if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { + usedConn = cn // Process any pending push notifications before executing the command if err := c.processPushNotifications(ctx, cn); err != nil { internal.Logger.Printf(ctx, "push: error processing pending notifications before command: %v", err) @@ -738,10 +765,10 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool return nil }); err != nil { retry := shouldRetry(err, atomic.LoadUint32(&retryTimeout) == 1) - return retry, err + return retry, usedConn, err } - return false, nil + return false, usedConn, nil } func (c *baseClient) retryBackoff(attempt int) time.Duration { diff --git a/stream_commands.go b/stream_commands.go index 5573e48b9..73c01de5c 100644 --- a/stream_commands.go +++ b/stream_commands.go @@ -2,7 +2,11 @@ package redis import ( "context" + "strconv" + "strings" "time" + + "github.com/redis/go-redis/v9/internal/otel" ) type StreamCmdable interface { @@ -299,6 +303,26 @@ func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSlic } cmd.SetFirstKeyPos(keyPos) _ = c(ctx, cmd) + + // Record stream lag for each message (if command succeeded) + if cmd.Err() == nil { + streams := cmd.Val() + for _, stream := range streams { + for _, msg := range stream.Messages { + // Parse message ID to extract timestamp (format: "millisecondsTime-sequenceNumber") + if parts := strings.SplitN(msg.ID, "-", 2); len(parts) == 2 { + if timestampMs, err := strconv.ParseInt(parts[0], 10, 64); err == nil { + // Calculate lag (time since message was created) + messageTime := time.Unix(0, timestampMs*int64(time.Millisecond)) + lag := time.Since(messageTime) + // Record lag metric + otel.RecordStreamLag(ctx, lag, nil, stream.Stream, a.Group, 
a.Consumer) + } + } + } + } + } + return cmd }