1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@
* [ENHANCEMENT] Ingester: Add `enable_matcher_optimization` config to apply low selectivity matchers lazily. #7063
* [ENHANCEMENT] Distributor: Add a label references validation for remote write v2 request. #7074
* [ENHANCEMENT] Distributor: Add count, spans, and buckets validations for native histogram. #7072
* [ENHANCEMENT] Alertmanager/Ruler: Introduce a user scanner to reduce the number of list calls to object storage. #6999
* [ENHANCEMENT] Ruler: Add DecodingConcurrency config flag for Thanos Engine. #7118
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
* [BUGFIX] Ruler: Add XFunctions validation support. #7111
5 changes: 5 additions & 0 deletions docs/blocks-storage/querier.md
@@ -1940,6 +1940,11 @@ blocks_storage:
# CLI flag: -blocks-storage.users-scanner.user-index.max-stale-period
[max_stale_period: <duration> | default = 1h]

# How frequently the user index file is updated. It only takes effect when the
# user scan strategy is user_index.
# CLI flag: -blocks-storage.users-scanner.user-index.cleanup-interval
[clean_up_interval: <duration> | default = 15m]

# TTL of the cached users. 0 disables caching and relies on caching at
# bucket client level.
# CLI flag: -blocks-storage.users-scanner.cache-ttl
5 changes: 5 additions & 0 deletions docs/blocks-storage/store-gateway.md
@@ -2017,6 +2017,11 @@ blocks_storage:
# CLI flag: -blocks-storage.users-scanner.user-index.max-stale-period
[max_stale_period: <duration> | default = 1h]

# How frequently the user index file is updated. It only takes effect when the
# user scan strategy is user_index.
# CLI flag: -blocks-storage.users-scanner.user-index.cleanup-interval
[clean_up_interval: <duration> | default = 15m]

# TTL of the cached users. 0 disables caching and relies on caching at
# bucket client level.
# CLI flag: -blocks-storage.users-scanner.cache-ttl
45 changes: 45 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -986,6 +986,26 @@ local:
# Path at which alertmanager configurations are stored.
# CLI flag: -alertmanager-storage.local.path
[path: <string> | default = ""]

users_scanner:
# Strategy to use to scan users. Supported values are: list, user_index.
# CLI flag: -alertmanager-storage.users-scanner.strategy
[strategy: <string> | default = "list"]

# Maximum period of time to consider the user index as stale. Fall back to the
# base scanner if stale. Only valid when strategy is user_index.
# CLI flag: -alertmanager-storage.users-scanner.user-index.max-stale-period
[max_stale_period: <duration> | default = 1h]

# How frequently the user index file is updated. It only takes effect when the
# user scan strategy is user_index.
# CLI flag: -alertmanager-storage.users-scanner.user-index.cleanup-interval
[clean_up_interval: <duration> | default = 15m]

# TTL of the cached users. 0 disables caching and relies on caching at bucket
# client level.
# CLI flag: -alertmanager-storage.users-scanner.cache-ttl
[cache_ttl: <duration> | default = 0s]
```
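
For illustration only — a minimal sketch, assuming the `alertmanager_storage` block and `s3` backend documented elsewhere in this reference, with example values rather than recommendations — the new scanner options fit together like this:

```yaml
alertmanager_storage:
  backend: s3
  users_scanner:
    # Scan via the user index file instead of listing the bucket each time.
    strategy: user_index
    # Fall back to a full list scan if the index is older than this.
    max_stale_period: 1h
    # Rewrite the user index file on this cadence.
    clean_up_interval: 15m
    # 0 keeps relying on caching at the bucket client level.
    cache_ttl: 0s
```

The same `users_scanner` block is available under `ruler_storage` and `blocks_storage`, with the matching `-ruler-storage.*` and `-blocks-storage.*` CLI flags.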

### `blocks_storage_config`
@@ -2602,6 +2622,11 @@ users_scanner:
# CLI flag: -blocks-storage.users-scanner.user-index.max-stale-period
[max_stale_period: <duration> | default = 1h]

# How frequently the user index file is updated. It only takes effect when the
# user scan strategy is user_index.
# CLI flag: -blocks-storage.users-scanner.user-index.cleanup-interval
[clean_up_interval: <duration> | default = 15m]

# TTL of the cached users. 0 disables caching and relies on caching at bucket
# client level.
# CLI flag: -blocks-storage.users-scanner.cache-ttl
@@ -5832,6 +5857,26 @@ local:
# Directory to scan for rules
# CLI flag: -ruler-storage.local.directory
[directory: <string> | default = ""]

users_scanner:
# Strategy to use to scan users. Supported values are: list, user_index.
# CLI flag: -ruler-storage.users-scanner.strategy
[strategy: <string> | default = "list"]

# Maximum period of time to consider the user index as stale. Fall back to the
# base scanner if stale. Only valid when strategy is user_index.
# CLI flag: -ruler-storage.users-scanner.user-index.max-stale-period
[max_stale_period: <duration> | default = 1h]

# How frequently the user index file is updated. It only takes effect when the
# user scan strategy is user_index.
# CLI flag: -ruler-storage.users-scanner.user-index.cleanup-interval
[clean_up_interval: <duration> | default = 15m]

# TTL of the cached users. 0 disables caching and relies on caching at bucket
# client level.
# CLI flag: -ruler-storage.users-scanner.cache-ttl
[cache_ttl: <duration> | default = 0s]
```

### `runtime_configuration_storage_config`
35 changes: 35 additions & 0 deletions integration/alertmanager_test.go
@@ -68,6 +68,41 @@ func TestAlertmanager(t *testing.T) {
assertServiceMetricsPrefixes(t, AlertManager, alertmanager)
}

func TestAlertmanagerWithUserIndexUpdater(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs/user-1.yaml", []byte(cortexAlertmanagerUserConfigYaml)))

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, alertsBucketName)
require.NoError(t, s.StartAndWaitReady(consul, minio))

baseFlags := mergeFlags(AlertmanagerFlags(), AlertmanagerS3Flags())
flags := mergeFlags(baseFlags, AlertmanagerShardingFlags(consul.NetworkHTTPEndpoint(), 1), map[string]string{
"-alertmanager-storage.users-scanner.strategy": "user_index",
"-alertmanager-storage.users-scanner.user-index.cleanup-interval": "15s",
"-alertmanager.configs.poll-interval": "5s",
})

am := e2ecortex.NewAlertmanager(
"alertmanager",
flags,
"",
)

require.NoError(t, s.StartAndWaitReady(am))
// Make sure the user index file has been updated and scanned.
require.NoError(t, am.WaitSumMetricsWithOptions(e2e.Greater(float64(0)), []string{"cortex_user_index_last_successful_update_timestamp_seconds"},
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "alertmanager")),
))
require.NoError(t, am.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(float64(1)), []string{"cortex_user_index_scan_succeeded_total"},
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "alertmanager")),
))
}

func TestAlertmanagerStoreAPI(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
59 changes: 59 additions & 0 deletions integration/ruler_test.go
@@ -142,6 +142,65 @@ func TestRulerAPI(t *testing.T) {
}
}

func TestRulerWithUserIndexUpdater(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, rulestoreBucketName)
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Configure the ruler.
rulerFlags := mergeFlags(
BlocksStorageFlags(),
RulerFlags(),
RulerShardingFlags(consul.NetworkHTTPEndpoint()),
map[string]string{
"-ruler.sharding-strategy": "shuffle-sharding",
"-ruler-storage.users-scanner.strategy": "user_index",
"-ruler-storage.users-scanner.user-index.cleanup-interval": "15s",
"-ruler.tenant-shard-size": "1",
// Since we're not going to run any rule, we don't need the
// store-gateway to be configured to a valid address.
"-querier.store-gateway-addresses": "localhost:12345",
// Enable the bucket index so we can skip the initial bucket scan.
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
"-ruler.poll-interval": "2s",
"-log.level": "info",
},
)

ruler := e2ecortex.NewRuler(
"ruler",
consul.NetworkHTTPEndpoint(),
rulerFlags,
"",
)

require.NoError(t, s.StartAndWaitReady(ruler))

// Create a client with the ruler address configured
c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), "user-1")
require.NoError(t, err)

ruleGroup := createTestRuleGroup(t)
ns := "ns"

// Set the rule group into the ruler
require.NoError(t, c.SetRuleGroup(ruleGroup, ns))

// Make sure the user index file has been updated and scanned.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Greater(float64(0)), []string{"cortex_user_index_last_successful_update_timestamp_seconds"},
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "ruler")),
))

require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(float64(1)), []string{"cortex_user_index_scan_succeeded_total"},
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "ruler")),
))
}

func TestRulerAPISingleBinary(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
63 changes: 46 additions & 17 deletions pkg/alertmanager/alertstore/bucketclient/bucket_client.go
@@ -3,21 +3,23 @@ package bucketclient
import (
"bytes"
"context"
"fmt"
"io"
"strings"
"sync"

"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"

"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/thanos-io/thanos/pkg/extprom"

"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/util/concurrency"
"github.com/cortexproject/cortex/pkg/util/runutil"
"github.com/cortexproject/cortex/pkg/util/users"
)

const (
@@ -45,27 +47,54 @@ type BucketAlertStore struct {
amBucket objstore.Bucket
cfgProvider bucket.TenantConfigProvider
logger log.Logger

usersScanner users.Scanner
userIndexUpdater *users.UserIndexUpdater
}

func NewBucketAlertStore(bkt objstore.Bucket, cfgProvider bucket.TenantConfigProvider, logger log.Logger) *BucketAlertStore {
return &BucketAlertStore{
alertsBucket: bucket.NewPrefixedBucketClient(bkt, alertsPrefix),
amBucket: bucket.NewPrefixedBucketClient(bkt, alertmanagerPrefix),
cfgProvider: cfgProvider,
logger: logger,
func NewBucketAlertStore(bkt objstore.InstrumentedBucket, userScannerCfg users.UsersScannerConfig, cfgProvider bucket.TenantConfigProvider, logger log.Logger, reg prometheus.Registerer) (*BucketAlertStore, error) {
alertBucket := bucket.NewPrefixedBucketClient(bkt, alertsPrefix)

regWithComponent := extprom.WrapRegistererWith(prometheus.Labels{"component": "alertmanager"}, reg)
usersScanner, err := users.NewScanner(userScannerCfg, alertBucket, logger, regWithComponent)
if err != nil {
return nil, errors.Wrap(err, "unable to initialize alertmanager users scanner")
}

var userIndexUpdater *users.UserIndexUpdater
if userScannerCfg.Strategy == users.UserScanStrategyUserIndex {
// We hardcode the strategy to list, so we can safely ignore the error.
baseScanner, _ := users.NewScanner(users.UsersScannerConfig{
Strategy: users.UserScanStrategyList,
}, alertBucket, logger, regWithComponent)
userIndexUpdater = users.NewUserIndexUpdater(alertBucket, userScannerCfg.CleanUpInterval, baseScanner, regWithComponent)
}

return &BucketAlertStore{
alertsBucket: alertBucket,
amBucket: bucket.NewPrefixedBucketClient(bkt, alertmanagerPrefix),
cfgProvider: cfgProvider,
logger: logger,
usersScanner: usersScanner,
userIndexUpdater: userIndexUpdater,
}, nil
}

// GetUserIndexUpdater implements alertstore.AlertStore.
func (s *BucketAlertStore) GetUserIndexUpdater() *users.UserIndexUpdater {
return s.userIndexUpdater
}

// ListAllUsers implements alertstore.AlertStore.
func (s *BucketAlertStore) ListAllUsers(ctx context.Context) ([]string, error) {
var userIDs []string

err := s.alertsBucket.Iter(ctx, "", func(key string) error {
userIDs = append(userIDs, key)
return nil
})

return userIDs, err
active, deleting, _, err := s.usersScanner.ScanUsers(ctx)
if err != nil {
return nil, fmt.Errorf("unable to list users in alertmanager store bucket: %w", err)
}
userIDs := make([]string, 0, len(active)+len(deleting))
userIDs = append(userIDs, active...)
userIDs = append(userIDs, deleting...)
return userIDs, nil
}

// GetAlertConfigs implements alertstore.AlertStore.
@@ -217,5 +246,5 @@ func (s *BucketAlertStore) getUserBucket(userID string) objstore.Bucket {

func (s *BucketAlertStore) getAlertmanagerUserBucket(userID string) objstore.Bucket {
uBucket := bucket.NewUserBucketClient(userID, s.amBucket, s.cfgProvider)
return uBucket.WithExpectedErrs(tsdb.IsOneOfTheExpectedErrors(uBucket.IsAccessDeniedErr, uBucket.IsObjNotFoundErr))
return uBucket.WithExpectedErrs(bucket.IsOneOfTheExpectedErrors(uBucket.IsAccessDeniedErr, uBucket.IsObjNotFoundErr))
}
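
The constructor change above alters the wiring contract: callers now pass a scanner config and a registerer, and may get back a `UserIndexUpdater` to drive. Below is a minimal sketch of that wiring, under stated assumptions — `UpdateUserIndex(ctx)` is an assumed method name and is not shown in this diff; only `NewBucketAlertStore` and `GetUserIndexUpdater` are.

```go
package alertstorewiring

import (
	"context"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/thanos-io/objstore"

	"github.com/cortexproject/cortex/pkg/alertmanager/alertstore/bucketclient"
	"github.com/cortexproject/cortex/pkg/storage/bucket"
	"github.com/cortexproject/cortex/pkg/util/users"
)

// runAlertStore builds the store with the user_index strategy and, if an
// updater was created, refreshes the index on the configured cadence.
func runAlertStore(ctx context.Context, bkt objstore.InstrumentedBucket, cfgProvider bucket.TenantConfigProvider, logger log.Logger, reg prometheus.Registerer) error {
	cfg := users.UsersScannerConfig{
		Strategy:        users.UserScanStrategyUserIndex,
		CleanUpInterval: 15 * time.Minute,
	}
	store, err := bucketclient.NewBucketAlertStore(bkt, cfg, cfgProvider, logger, reg)
	if err != nil {
		return err
	}
	updater := store.GetUserIndexUpdater()
	if updater == nil {
		// Strategy is not user_index (or the store doesn't support it);
		// there is nothing to drive.
		return nil
	}
	ticker := time.NewTicker(cfg.CleanUpInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if err := updater.UpdateUserIndex(ctx); err != nil { // assumed signature
				level.Warn(logger).Log("msg", "failed to update user index", "err", err)
			}
		}
	}
}
```

Note the design choice visible in the diff: configdb and local stores return a nil updater from `GetUserIndexUpdater`, so callers can use the nil check above to decide whether an update loop is needed at all.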
9 changes: 6 additions & 3 deletions pkg/alertmanager/alertstore/config.go
@@ -8,13 +8,15 @@ import (
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore/local"
"github.com/cortexproject/cortex/pkg/configs/client"
"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/util/users"
)

// Config configures a the alertmanager storage backend.
// Config configures the alertmanager storage backend.
type Config struct {
bucket.Config `yaml:",inline"`
ConfigDB client.Config `yaml:"configdb"`
Local local.StoreConfig `yaml:"local"`
ConfigDB client.Config `yaml:"configdb"`
Local local.StoreConfig `yaml:"local"`
UsersScanner users.UsersScannerConfig `yaml:"users_scanner"`
}

// RegisterFlags registers the backend storage config.
@@ -25,6 +27,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.ConfigDB.RegisterFlagsWithPrefix(prefix, f)
cfg.Local.RegisterFlagsWithPrefix(prefix, f)
cfg.RegisterFlagsWithPrefix(prefix, f)
cfg.UsersScanner.RegisterFlagsWithPrefix(prefix, f)
}

// IsFullStateSupported returns if the given configuration supports access to FullState objects.
6 changes: 6 additions & 0 deletions pkg/alertmanager/alertstore/configdb/store.go
@@ -7,6 +7,7 @@ import (
"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
"github.com/cortexproject/cortex/pkg/configs/client"
"github.com/cortexproject/cortex/pkg/configs/userconfig"
"github.com/cortexproject/cortex/pkg/util/users"
)

const (
@@ -34,6 +35,11 @@ func NewStore(c client.Client) *Store {
}
}

// GetUserIndexUpdater implements alertstore.AlertStore.
func (c *Store) GetUserIndexUpdater() *users.UserIndexUpdater {
return nil
}

// ListAllUsers implements alertstore.AlertStore.
func (c *Store) ListAllUsers(ctx context.Context) ([]string, error) {
configs, err := c.reloadConfigs(ctx)
6 changes: 6 additions & 0 deletions pkg/alertmanager/alertstore/local/store.go
@@ -11,6 +11,7 @@ import (
"github.com/prometheus/alertmanager/config"

"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
"github.com/cortexproject/cortex/pkg/util/users"
)

const (
@@ -43,6 +44,11 @@ func NewStore(cfg StoreConfig) (*Store, error) {
return &Store{cfg}, nil
}

// GetUserIndexUpdater implements alertstore.AlertStore.
func (f *Store) GetUserIndexUpdater() *users.UserIndexUpdater {
return nil
}

// ListAllUsers implements alertstore.AlertStore.
func (f *Store) ListAllUsers(_ context.Context) ([]string, error) {
configs, err := f.reloadConfigs()