Skip to content

Commit 7ab5bc7

Browse files
committed
rollback InitialSync
Signed-off-by: SungJin1212 <[email protected]>
1 parent c34a224 commit 7ab5bc7

File tree

5 files changed

+6
-251
lines changed

5 files changed

+6
-251
lines changed

integration/parquet_gateway_test.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -325,14 +325,6 @@ func TestParquetGatewayWithBlocksStorageRunningInMicroservicesMode(t *testing.T)
325325
return found
326326
})
327327

328-
// Check how many tenants have been discovered and synced by store-gateways.
329-
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64(1*storeGateways.NumInstances())), "cortex_bucket_stores_tenants_discovered"))
330-
if testCfg.blocksShardingStrategy == "shuffle-sharding" {
331-
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64(1)), "cortex_bucket_stores_tenants_synced"))
332-
} else {
333-
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64(1*storeGateways.NumInstances())), "cortex_bucket_stores_tenants_synced"))
334-
}
335-
336328
// Wait until the parquet-converter convert blocks
337329
require.NoError(t, parquetConverter.WaitSumMetricsWithOptions(e2e.Equals(float64(3)), []string{"cortex_parquet_converter_blocks_converted_total"}, e2e.WaitMissingMetrics))
338330

pkg/storegateway/bucket_store_metrics.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/cortexproject/cortex/pkg/util"
88
)
99

10+
// CortexBucketStoreMetrics common metrics in thanos and parquet block stores (in future)
1011
type CortexBucketStoreMetrics struct {
1112
syncTimes prometheus.Histogram
1213
syncLastSuccess prometheus.Gauge

pkg/storegateway/bucket_stores.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many
103103
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (BucketStores, error) {
104104
switch cfg.BucketStore.BucketStoreType {
105105
case string(tsdb.ParquetBucketStore):
106-
return newParquetBucketStores(cfg, shardingStrategy, bucketClient, limits, logger, reg)
106+
return newParquetBucketStores(cfg, bucketClient, limits, logger, reg)
107107
case string(tsdb.TSDBBucketStore):
108108
return newThanosBucketStores(cfg, shardingStrategy, bucketClient, limits, logLevel, logger, reg)
109109
default:

pkg/storegateway/parquet_bucket_stores.go

Lines changed: 3 additions & 241 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,8 @@ package storegateway
33
import (
44
"context"
55
"fmt"
6-
"os"
7-
"path/filepath"
86
"sort"
97
"sync"
10-
"time"
118

129
"github.com/go-kit/log"
1310
"github.com/go-kit/log/level"
@@ -22,7 +19,6 @@ import (
2219
"github.com/prometheus/prometheus/model/labels"
2320
prom_storage "github.com/prometheus/prometheus/storage"
2421
"github.com/prometheus/prometheus/tsdb/chunkenc"
25-
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
2622
"github.com/thanos-io/objstore"
2723
storecache "github.com/thanos-io/thanos/pkg/store/cache"
2824
"github.com/thanos-io/thanos/pkg/store/storepb"
@@ -36,7 +32,6 @@ import (
3632
"github.com/cortexproject/cortex/pkg/storage/tsdb"
3733
"github.com/cortexproject/cortex/pkg/storage/tsdb/users"
3834
cortex_util "github.com/cortexproject/cortex/pkg/util"
39-
"github.com/cortexproject/cortex/pkg/util/backoff"
4035
cortex_errors "github.com/cortexproject/cortex/pkg/util/errors"
4136
"github.com/cortexproject/cortex/pkg/util/spanlogger"
4237
"github.com/cortexproject/cortex/pkg/util/validation"
@@ -64,14 +59,12 @@ type ParquetBucketStores struct {
6459

6560
cortexBucketStoreMetrics *CortexBucketStoreMetrics
6661
userScanner users.Scanner
67-
shardingStrategy ShardingStrategy
6862

69-
userTokenBucketsMu sync.RWMutex
70-
userTokenBuckets map[string]*cortex_util.TokenBucket
63+
userTokenBuckets map[string]*cortex_util.TokenBucket
7164
}
7265

7366
// newParquetBucketStores creates a new multi-tenant parquet bucket stores
74-
func newParquetBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logger log.Logger, reg prometheus.Registerer) (*ParquetBucketStores, error) {
67+
func newParquetBucketStores(cfg tsdb.BlocksStorageConfig, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logger log.Logger, reg prometheus.Registerer) (*ParquetBucketStores, error) {
7568
// Create caching bucket client for parquet bucket stores
7669
cachingBucket, err := createCachingBucketClientForParquet(cfg, bucketClient, "parquet-storegateway", logger, reg)
7770
if err != nil {
@@ -88,7 +81,6 @@ func newParquetBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy Shard
8881
chunksDecoder: schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool()),
8982
inflightRequests: cortex_util.NewInflightRequestTracker(),
9083
cortexBucketStoreMetrics: NewCortexBucketStoreMetrics(reg),
91-
shardingStrategy: shardingStrategy,
9284
userTokenBuckets: make(map[string]*cortex_util.TokenBucket),
9385
}
9486
u.userScanner, err = users.NewScanner(cfg.UsersScanner, bucketClient, logger, reg)
@@ -209,244 +201,14 @@ func (u *ParquetBucketStores) LabelValues(ctx context.Context, req *storepb.Labe
209201

210202
// SyncBlocks implements BucketStores
211203
func (u *ParquetBucketStores) SyncBlocks(ctx context.Context) error {
212-
return u.syncUsersBlocksWithRetries(ctx, func(ctx context.Context, p *parquetBucketStore) error {
213-
return p.SyncBlocks(ctx)
214-
})
204+
return nil
215205
}
216206

217207
// InitialSync implements BucketStores
218208
func (u *ParquetBucketStores) InitialSync(ctx context.Context) error {
219-
level.Info(u.logger).Log("msg", "synchronizing Parquet blocks for all users")
220-
221-
if err := u.syncUsersBlocksWithRetries(ctx, func(ctx context.Context, p *parquetBucketStore) error {
222-
return p.InitialSync(ctx)
223-
}); err != nil {
224-
level.Warn(u.logger).Log("msg", "failed to synchronize Parquet blocks", "err", err)
225-
return err
226-
}
227-
228-
level.Info(u.logger).Log("msg", "successfully synchronized Parquet blocks for all users")
229209
return nil
230210
}
231211

232-
func (u *ParquetBucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(context.Context, *parquetBucketStore) error) error {
233-
retries := backoff.New(ctx, backoff.Config{
234-
MinBackoff: 1 * time.Second,
235-
MaxBackoff: 10 * time.Second,
236-
MaxRetries: 3,
237-
})
238-
239-
var lastErr error
240-
for retries.Ongoing() {
241-
lastErr = u.syncUsersBlocks(ctx, f)
242-
if lastErr == nil {
243-
return nil
244-
}
245-
246-
retries.Wait()
247-
}
248-
249-
if lastErr == nil {
250-
return retries.Err()
251-
}
252-
253-
return lastErr
254-
}
255-
256-
func (u *ParquetBucketStores) syncUsersBlocks(ctx context.Context, f func(context.Context, *parquetBucketStore) error) (returnErr error) {
257-
defer func(start time.Time) {
258-
u.cortexBucketStoreMetrics.syncTimes.Observe(time.Since(start).Seconds())
259-
if returnErr == nil {
260-
u.cortexBucketStoreMetrics.syncLastSuccess.SetToCurrentTime()
261-
}
262-
}(time.Now())
263-
264-
type job struct {
265-
userID string
266-
store *parquetBucketStore
267-
}
268-
269-
wg := &sync.WaitGroup{}
270-
jobs := make(chan job)
271-
errs := tsdb_errors.NewMulti()
272-
errsMx := sync.Mutex{}
273-
274-
// Scan users in the bucket.
275-
userIDs, err := u.scanUsers(ctx)
276-
if err != nil {
277-
return err
278-
}
279-
280-
includeUserIDs := make(map[string]struct{})
281-
for _, userID := range u.shardingStrategy.FilterUsers(ctx, userIDs) {
282-
includeUserIDs[userID] = struct{}{}
283-
}
284-
285-
u.cortexBucketStoreMetrics.tenantsDiscovered.Set(float64(len(userIDs)))
286-
u.cortexBucketStoreMetrics.tenantsSynced.Set(float64(len(includeUserIDs)))
287-
288-
// Create a pool of workers which will synchronize blocks. The pool size
289-
// is limited in order to avoid to concurrently sync a lot of tenants in
290-
// a large cluster.
291-
for i := 0; i < u.cfg.BucketStore.TenantSyncConcurrency; i++ {
292-
wg.Add(1)
293-
go func() {
294-
defer wg.Done()
295-
296-
for job := range jobs {
297-
if err := f(ctx, job.store); err != nil {
298-
if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
299-
u.storesErrorsMu.Lock()
300-
u.storesErrors[job.userID] = httpgrpc.Errorf(int(codes.PermissionDenied), "store error: %s", err)
301-
u.storesErrorsMu.Unlock()
302-
} else {
303-
errsMx.Lock()
304-
errs.Add(errors.Wrapf(err, "failed to synchronize Parquet blocks for user %s", job.userID))
305-
errsMx.Unlock()
306-
}
307-
} else {
308-
u.storesErrorsMu.Lock()
309-
delete(u.storesErrors, job.userID)
310-
u.storesErrorsMu.Unlock()
311-
}
312-
}
313-
}()
314-
}
315-
316-
// Lazily create a bucket store for each new user found
317-
// and submit a sync job for each user.
318-
for _, userID := range userIDs {
319-
// If we don't have a store for the tenant yet, then we should skip it if it's not
320-
// included in the store-gateway shard. If we already have it, we need to sync it
321-
// anyway to make sure all its blocks are unloaded and metrics updated correctly
322-
// (but bucket API calls are skipped thanks to the objstore client adapter).
323-
if _, included := includeUserIDs[userID]; !included && u.getStore(userID) == nil {
324-
continue
325-
}
326-
327-
bs, err := u.getOrCreateStore(userID)
328-
if err != nil {
329-
errsMx.Lock()
330-
errs.Add(err)
331-
errsMx.Unlock()
332-
333-
continue
334-
}
335-
336-
select {
337-
case jobs <- job{userID: userID, store: bs}:
338-
// Nothing to do. Will loop to push more jobs.
339-
case <-ctx.Done():
340-
return ctx.Err()
341-
}
342-
}
343-
344-
// Wait until all workers completed.
345-
close(jobs)
346-
wg.Wait()
347-
348-
u.deleteLocalFilesForExcludedTenants(includeUserIDs)
349-
350-
return errs.Err()
351-
}
352-
353-
// deleteLocalFilesForExcludedTenants removes local "sync" directories for tenants that are not included in the current
354-
// shard.
355-
func (u *ParquetBucketStores) deleteLocalFilesForExcludedTenants(includeUserIDs map[string]struct{}) {
356-
files, err := os.ReadDir(u.cfg.BucketStore.SyncDir)
357-
if err != nil {
358-
return
359-
}
360-
361-
for _, f := range files {
362-
if !f.IsDir() {
363-
continue
364-
}
365-
366-
userID := f.Name()
367-
if _, included := includeUserIDs[userID]; included {
368-
// Preserve directory for users owned by this shard.
369-
continue
370-
}
371-
372-
err := u.closeEmptyBucketStore(userID)
373-
switch {
374-
case errors.Is(err, errBucketStoreNotEmpty):
375-
continue
376-
case errors.Is(err, errBucketStoreNotFound):
377-
// This is OK, nothing was closed.
378-
case err == nil:
379-
level.Info(u.logger).Log("msg", "closed bucket store for user", "user", userID)
380-
default:
381-
level.Warn(u.logger).Log("msg", "failed to close bucket store for user", "user", userID, "err", err)
382-
}
383-
384-
userSyncDir := u.syncDirForUser(userID)
385-
err = os.RemoveAll(userSyncDir)
386-
if err == nil {
387-
level.Info(u.logger).Log("msg", "deleted user sync directory", "dir", userSyncDir)
388-
} else {
389-
level.Warn(u.logger).Log("msg", "failed to delete user sync directory", "dir", userSyncDir, "err", err)
390-
}
391-
}
392-
}
393-
394-
// closeEmptyBucketStore closes bucket store for given user, if it is empty,
395-
// and removes it from bucket stores map and metrics.
396-
// If bucket store doesn't exist, returns errBucketStoreNotFound.
397-
// Otherwise returns error from closing the bucket store.
398-
func (u *ParquetBucketStores) closeEmptyBucketStore(userID string) error {
399-
u.storesMu.Lock()
400-
unlockInDefer := true
401-
defer func() {
402-
if unlockInDefer {
403-
u.storesMu.Unlock()
404-
}
405-
}()
406-
407-
bs := u.stores[userID]
408-
if bs == nil {
409-
return errBucketStoreNotFound
410-
}
411-
412-
delete(u.stores, userID)
413-
unlockInDefer = false
414-
u.storesMu.Unlock()
415-
416-
if u.cfg.BucketStore.TokenBucketBytesLimiter.Mode != string(tsdb.TokenBucketBytesLimiterDisabled) {
417-
u.userTokenBucketsMu.Lock()
418-
delete(u.userTokenBuckets, userID)
419-
u.userTokenBucketsMu.Unlock()
420-
}
421-
422-
return bs.Close()
423-
}
424-
425-
func (u *ParquetBucketStores) syncDirForUser(userID string) string {
426-
return filepath.Join(u.cfg.BucketStore.SyncDir, userID)
427-
}
428-
429-
// scanUsers in the bucket and return the list of found users. It includes active and deleting users
430-
// but not deleted users.
431-
func (u *ParquetBucketStores) scanUsers(ctx context.Context) ([]string, error) {
432-
activeUsers, deletingUsers, _, err := u.userScanner.ScanUsers(ctx)
433-
if err != nil {
434-
return nil, err
435-
}
436-
users := make([]string, 0, len(activeUsers)+len(deletingUsers))
437-
users = append(users, activeUsers...)
438-
users = append(users, deletingUsers...)
439-
users = deduplicateUsers(users)
440-
441-
return users, err
442-
}
443-
444-
func (u *ParquetBucketStores) getStore(userID string) *parquetBucketStore {
445-
u.storesMu.RLock()
446-
defer u.storesMu.RUnlock()
447-
return u.stores[userID]
448-
}
449-
450212
func (u *ParquetBucketStores) getStoreError(userID string) error {
451213
u.storesErrorsMu.RLock()
452214
defer u.storesErrorsMu.RUnlock()

pkg/storegateway/parquet_bucket_stores_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ func TestParquetBucketStoresWithCaching(t *testing.T) {
224224
limits := validation.NewOverrides(validation.Limits{}, nil)
225225

226226
// Create parquet bucket stores with caching
227-
parquetStores, err := newParquetBucketStores(storageCfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucketClient, limits, log.NewNopLogger(), prometheus.NewRegistry())
227+
parquetStores, err := newParquetBucketStores(storageCfg, bucketClient, limits, log.NewNopLogger(), prometheus.NewRegistry())
228228
require.NoError(t, err)
229229
require.NotNil(t, parquetStores)
230230

0 commit comments

Comments
 (0)