Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
* [ENHANCEMENT] Ingester: Add `cortex_ingester_tsdb_head_stale_series` metric to keep track of number of stale series on head. #7071
* [ENHANCEMENT] Expose more Go runtime metrics. #7070
* [ENHANCEMENT] Distributor: Filter out label with empty value. #7069
* [ENHANCEMENT] Ingester: Add `enable_matcher_optimization` config to apply low-selectivity matchers lazily. #7063
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
Expand Down
6 changes: 6 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3738,6 +3738,12 @@ instance_limits:
# CLI flag: -ingester.skip-metadata-limits
[skip_metadata_limits: <boolean> | default = true]

# Enable optimization of label matchers when querying chunks. When enabled,
# matchers with low selectivity such as =~.+ are applied lazily during series
# scanning instead of being used for postings matching.
# CLI flag: -ingester.enable-matcher-optimization
[enable_matcher_optimization: <boolean> | default = false]

query_protection:
rejection:
threshold:
Expand Down
18 changes: 16 additions & 2 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ type Config struct {
// If enabled, the metadata API returns all metadata regardless of the limits.
SkipMetadataLimits bool `yaml:"skip_metadata_limits"`

// When enabled, matchers with low selectivity are applied lazily during series scanning
// instead of being used for postings selection.
EnableMatcherOptimization bool `yaml:"enable_matcher_optimization"`

QueryProtection configs.QueryProtection `yaml:"query_protection"`
}

Expand All @@ -185,6 +189,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.DisableChunkTrimming, "ingester.disable-chunk-trimming", false, "Disable trimming of matching series chunks based on query Start and End time. When disabled, the result may contain samples outside the queried time range but select performances may be improved. Note that certain query results might change by changing this option.")
f.IntVar(&cfg.MatchersCacheMaxItems, "ingester.matchers-cache-max-items", 0, "Maximum number of entries in the regex matchers cache. 0 to disable.")
f.BoolVar(&cfg.SkipMetadataLimits, "ingester.skip-metadata-limits", true, "If enabled, the metadata API returns all metadata regardless of the limits.")
f.BoolVar(&cfg.EnableMatcherOptimization, "ingester.enable-matcher-optimization", false, "Enable optimization of label matchers when query chunks. When enabled, matchers with low selectivity such as =~.+ are applied lazily during series scanning instead of being used for postings matching.")

cfg.DefaultLimits.RegisterFlagsWithPrefix(f, "ingester.")
cfg.QueryProtection.RegisterFlagsWithPrefix(f, "ingester.")
Expand Down Expand Up @@ -2295,6 +2300,10 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
End: through,
DisableTrimming: i.cfg.DisableChunkTrimming,
}
var lazyMatchers []*labels.Matcher
if i.cfg.EnableMatcherOptimization {
matchers, lazyMatchers = optimizeMatchers(matchers)
}
// It's not required to return sorted series because series are sorted by the Cortex querier.
ss := q.Select(ctx, false, hints, matchers...)
c()
Expand All @@ -2308,14 +2317,19 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
var it chunks.Iterator
for ss.Next() {
series := ss.At()
lbls := series.Labels()

if !labelsMatches(lbls, lazyMatchers) {
continue
}

if sm.IsSharded() && !sm.MatchesLabels(series.Labels()) {
if sm.IsSharded() && !sm.MatchesLabels(lbls) {
continue
}

// convert labels to LabelAdapter
ts := client.TimeSeriesChunk{
Labels: cortexpb.FromLabelsToLabelAdapters(series.Labels()),
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
}

it := series.Iterator(it)
Expand Down
129 changes: 129 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/shipper"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"
Expand Down Expand Up @@ -3941,6 +3942,134 @@ func BenchmarkIngester_QueryStream_Chunks(b *testing.B) {
}
}

// BenchmarkIngester_QueryStreamChunks_MatcherOptimization measures queryStreamChunks
// with the lazy-matcher optimization disabled and enabled, across matcher shapes the
// optimization targets: =~".+" regex matchers, != "" matchers, and a sparse label.
func BenchmarkIngester_QueryStreamChunks_MatcherOptimization(b *testing.B) {
	cases := map[string]struct {
		matchers    []*labels.Matcher
		description string
	}{
		"metric name with regex matchers": {
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_metric"),
				labels.MustNewMatcher(labels.MatchRegexp, "region", ".+"),
				labels.MustNewMatcher(labels.MatchRegexp, "job", ".+"),
			},
			description: "Metric name with .+ regex matchers",
		},
		"metric name with not equal empty": {
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_metric"),
				labels.MustNewMatcher(labels.MatchNotEqual, "env", ""),
				labels.MustNewMatcher(labels.MatchNotEqual, "pod", ""),
			},
			description: "Metric name with != \"\" matchers",
		},
		"metric name with sparse label": {
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_metric"),
				labels.MustNewMatcher(labels.MatchRegexp, "sparse_label", ".+"),
			},
			description: "Metric name with sparse label matcher",
		},
		"complex matchers": {
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_metric"),
				labels.MustNewMatcher(labels.MatchRegexp, "region", ".+"),
				labels.MustNewMatcher(labels.MatchRegexp, "job", ".+"),
				labels.MustNewMatcher(labels.MatchRegexp, "env", ".+"),
				labels.MustNewMatcher(labels.MatchRegexp, "pod", ".+"),
			},
			description: "Complex matchers with .+ regex",
		},
	}

	// Run every case twice: once with the optimization off, once with it on.
	for name, tc := range cases {
		for _, enabled := range []bool{false, true} {
			suffix, descSuffix := "_optimization_disabled", " without optimization"
			if enabled {
				suffix, descSuffix = "_optimization_enabled", " with optimization"
			}
			b.Run(name+suffix, func(b *testing.B) {
				benchmarkQueryStreamChunksWithMatcherOptimization(b, enabled, tc.matchers, tc.description+descSuffix)
			})
		}
	}
}

// benchmarkQueryStreamChunksWithMatcherOptimization seeds an ingester with 1000
// series of "test_metric" (half of which additionally carry "sparse_label") and
// then benchmarks queryStreamChunks with the given matchers, with the lazy
// matcher optimization toggled via cfg.EnableMatcherOptimization.
// NOTE(review): the description parameter is accepted but never used in the body.
func benchmarkQueryStreamChunksWithMatcherOptimization(b *testing.B, enableMatcherOptimization bool, matchers []*labels.Matcher, description string) {
	cfg := defaultIngesterTestConfig(b)
	cfg.EnableMatcherOptimization = enableMatcherOptimization

	i, err := prepareIngesterWithBlocksStorage(b, cfg, prometheus.NewRegistry())
	require.NoError(b, err)
	require.NoError(b, services.StartAndAwaitRunning(context.Background(), i))
	defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

	// Wait until it's ACTIVE
	test.Poll(b, 1*time.Second, ring.ACTIVE, func() any {
		return i.lifecycler.GetState()
	})

	ctx := user.InjectOrgID(context.Background(), userID)

	for s := range 1000 {
		// Create base labels. The modulo factors give each label a different
		// cardinality (region: 10 values, job: 20, env: 5, pod: 1000).
		labelPairs := []string{
			labels.MetricName, "test_metric",
			"region", fmt.Sprintf("region-%d", s%10),
			"job", fmt.Sprintf("job-%d", s%20),
			"env", fmt.Sprintf("env-%d", s%5),
			"pod", fmt.Sprintf("pod-%d", s%1000),
		}

		// Add sparse label only for half of the series
		if s%2 == 0 {
			labelPairs = append(labelPairs, "sparse_label", fmt.Sprintf("sparse-%d", s%50))
		}

		lbls := labels.FromStrings(labelPairs...)

		// 5 samples per series; timestamps are laid out so the whole data set
		// (max ts 999*5+4) falls inside the [0, 5000] query range used below.
		samples := make([]cortexpb.Sample, 0, 5)
		for t := range 5 {
			samples = append(samples, cortexpb.Sample{
				Value:       float64(s + t),
				TimestampMs: int64(s*5 + t),
			})
		}

		// Create labels slice with same length as samples
		labelsSlice := make([]labels.Labels, len(samples))
		for j := range labelsSlice {
			labelsSlice[j] = lbls
		}

		req := cortexpb.ToWriteRequest(labelsSlice, samples, nil, nil, cortexpb.API)
		_, err = i.Push(ctx, req)
		require.NoError(b, err)
	}

	db, err := i.getTSDB(userID)
	require.NoError(b, err)
	require.NotNil(b, db)

	mockStream := &mockQueryStreamServer{ctx: ctx}
	// TotalShards: 0 — presumably yields a non-sharding matcher so every series
	// passes the shard check; verify against storepb.ShardInfo.Matcher semantics.
	sm := (&storepb.ShardInfo{
		TotalShards: 0,
	}).Matcher(nil)

	b.ReportAllocs()
	b.ResetTimer()

	for b.Loop() {
		numSeries, numSamples, _, numChunks, err := i.queryStreamChunks(
			ctx, db, 0, 5000, matchers, sm, mockStream)

		require.NoError(b, err)
		require.Greater(b, numSeries, 0)
		require.Greater(b, numSamples, 0)
		require.Greater(b, numChunks, 0)

		// Reset the mock stream for next iteration
		mockStream.series = mockStream.series[:0]
	}
}

func benchmarkQueryStream(b *testing.B, samplesCount, seriesCount int) {
cfg := defaultIngesterTestConfig(b)

Expand Down
63 changes: 63 additions & 0 deletions pkg/ingester/matchers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package ingester

import (
"slices"

"github.com/prometheus/prometheus/model/labels"
)

// optimizeMatchers categorizes input matchers to matchers used in select and matchers applied lazily
// when scanning series.
// optimizeMatchers splits the input matchers into two groups: matchers to pass to the
// storage Select call, and low-selectivity matchers to be evaluated lazily against each
// series' labels while scanning. Matchers of the form =~".*" match everything and are
// dropped entirely.
func optimizeMatchers(matchers []*labels.Matcher) ([]*labels.Matcher, []*labels.Matcher) {
	// With fewer than two matchers there is nothing worth splitting, and when no
	// matcher qualifies for lazy evaluation the input can be returned unchanged.
	if len(matchers) < 2 || !canOptimizeMatchers(matchers) {
		return matchers, nil
	}

	var (
		selectMatchers = make([]*labels.Matcher, 0, len(matchers))
		lazyMatchers   = make([]*labels.Matcher, 0)
	)
	for _, m := range matchers {
		switch {
		case m.Type == labels.MatchRegexp && m.Value == ".*":
			// =~".*" is a noop as it matches everything; drop it.
		case lazyMatcher(m):
			lazyMatchers = append(lazyMatchers, m)
		default:
			selectMatchers = append(selectMatchers, m)
		}
	}

	// Select needs at least one matcher: if everything was classified as lazy,
	// promote the first lazy matcher back into the select group.
	if len(selectMatchers) == 0 {
		selectMatchers, lazyMatchers = lazyMatchers[:1], lazyMatchers[1:]
	}

	return selectMatchers, lazyMatchers
}

// canOptimizeMatchers reports whether at least one of the given matchers
// qualifies for lazy evaluation (see lazyMatcher).
func canOptimizeMatchers(matchers []*labels.Matcher) bool {
	return slices.IndexFunc(matchers, lazyMatcher) >= 0
}

// labelsMatches reports whether lbls satisfies every matcher in matchers.
// A nil or empty matcher slice matches any label set.
func labelsMatches(lbls labels.Labels, matchers []*labels.Matcher) bool {
	for _, matcher := range matchers {
		value := lbls.Get(matcher.Name)
		if !matcher.Matches(value) {
			return false
		}
	}
	return true
}

// lazyMatcher reports whether the matcher should be applied lazily while scanning
// series instead of being used to fetch postings. Such matchers are known to have
// low selectivity, so resolving their postings up front is expensive relative to
// checking them per series.
func lazyMatcher(matcher *labels.Matcher) bool {
	switch matcher.Type {
	case labels.MatchRegexp:
		// =~".+" selects every series that has the label with a non-empty value.
		return matcher.Value == ".+"
	case labels.MatchNotEqual, labels.MatchNotRegexp:
		// != "" and !~ "" likewise select every series carrying the label.
		return matcher.Value == ""
	default:
		return false
	}
}
Loading
Loading