diff --git a/cmd/server/go.mod b/cmd/server/go.mod index 3f6544affca..34be2659594 100644 --- a/cmd/server/go.mod +++ b/cmd/server/go.mod @@ -120,7 +120,7 @@ require ( github.com/golang-jwt/jwt/v5 v5.2.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect - github.com/golang/snappy v0.0.4 // indirect + github.com/golang/snappy v1.0.0 // indirect github.com/google/s2a-go v0.1.4 // indirect github.com/googleapis/enterprise-certificate-proxy v0.2.4 // indirect github.com/googleapis/gax-go/v2 v2.12.0 // indirect diff --git a/cmd/server/go.sum b/cmd/server/go.sum index e0425cc2ba1..60d32b77b4c 100644 --- a/cmd/server/go.sum +++ b/cmd/server/go.sum @@ -169,8 +169,8 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= -github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= +github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= diff --git a/common/config/config.go b/common/config/config.go index 904a2f7eb8e..71a0b2ce961 100644 --- a/common/config/config.go +++ b/common/config/config.go @@ -631,11 +631,12 @@ type ( // ShardDistribution is a configuration for leader election running. // This configuration should be in sync with sharddistributor. ShardDistribution struct { - LeaderStore Store `yaml:"leaderStore"` - Election Election `yaml:"election"` - Namespaces []Namespace `yaml:"namespaces"` - Process LeaderProcess `yaml:"process"` - Store Store `yaml:"store"` + LeaderStore Store `yaml:"leaderStore"` + Election Election `yaml:"election"` + Namespaces []Namespace `yaml:"namespaces"` + Process LeaderProcess `yaml:"process"` + Store Store `yaml:"store"` + DataCompression bool `yaml:"dataCompression" default:"true"` } // Store is a generic container for any storage configuration that should be parsed by the implementation. diff --git a/config/development.yaml b/config/development.yaml index d485c7ac4ba..c3c2f9ef72e 100644 --- a/config/development.yaml +++ b/config/development.yaml @@ -186,3 +186,4 @@ shardDistribution: process: period: 1s heartbeatTTL: 2s + dataCompression: false diff --git a/service/sharddistributor/config/config.go b/service/sharddistributor/config/config.go index d5337d51ee5..eef08b8b684 100644 --- a/service/sharddistributor/config/config.go +++ b/service/sharddistributor/config/config.go @@ -49,11 +49,12 @@ type ( // ShardDistribution is a configuration for leader election running. ShardDistribution struct { - LeaderStore Store `yaml:"leaderStore"` - Election Election `yaml:"election"` - Namespaces []Namespace `yaml:"namespaces"` - Process LeaderProcess `yaml:"process"` - Store Store `yaml:"store"` + LeaderStore Store `yaml:"leaderStore"` + Election Election `yaml:"election"` + Namespaces []Namespace `yaml:"namespaces"` + Process LeaderProcess `yaml:"process"` + Store Store `yaml:"store"` + DataCompression bool `yaml:"dataCompression" default:"true"` } // Store is a generic container for any storage configuration that should be parsed by the implementation. @@ -132,10 +133,11 @@ func GetShardDistributionFromExternal(in config.ShardDistribution) ShardDistribu } return ShardDistribution{ - LeaderStore: Store(in.LeaderStore), - Store: Store(in.Store), - Election: Election(in.Election), - Namespaces: namespaces, - Process: LeaderProcess(in.Process), + LeaderStore: Store(in.LeaderStore), + Store: Store(in.Store), + Election: Election(in.Election), + Namespaces: namespaces, + Process: LeaderProcess(in.Process), + DataCompression: in.DataCompression, } } diff --git a/service/sharddistributor/store/etcd/executorstore/common/compression.go b/service/sharddistributor/store/etcd/executorstore/common/compression.go new file mode 100644 index 00000000000..74ccd893d66 --- /dev/null +++ b/service/sharddistributor/store/etcd/executorstore/common/compression.go @@ -0,0 +1,79 @@ +package common + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + + "github.com/golang/snappy" + + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/types" +) + +var ( + // snappyMagic is a magic prefix prepended to compressed data to distinguish it from uncompressed data + snappyMagic = []byte{0xff, 0x06, 0x00, 0x00, 's', 'N', 'a', 'P', 'p', 'Y'} +) + +// Compress compresses data using snappy's framed format if compression is enabled +// If compressionEnabled returns false, returns the data as-is without compression +func Compress(data []byte, compressionEnabled bool) ([]byte, error) { + if !compressionEnabled { + return data, nil + } + + var buf bytes.Buffer + w := snappy.NewBufferedWriter(&buf) + + if _, err := w.Write(data); err != nil { + return nil, err + } + if err := w.Close(); err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +// Decompress decodes snappy-compressed data +// If the snappy header is present, it will successfully decompress it or return an error +// If the snappy header is absent, it treats data as uncompressed and returns it as-is +func Decompress(data []byte) ([]byte, error) { + if len(data) == 0 { + return data, nil + } + + if !hasFramedHeader(data) { + return data, nil + } + r := snappy.NewReader(bytes.NewReader(data)) + decompressed, err := io.ReadAll(r) + if err != nil { + return nil, err + } + return decompressed, nil +} + +// CompressedActiveStatus returns the compressed active status string. +func CompressedActiveStatus(compressionEnabled bool) string { + compressed, _ := Compress([]byte(fmt.Sprintf(`"%s"`, types.ExecutorStatusACTIVE)), compressionEnabled) + return string(compressed) +} + +// DecompressAndUnmarshal decompresses data and unmarshals it into the target +// errorContext is used to provide meaningful error messages +func DecompressAndUnmarshal(data []byte, target interface{}, errorContext string, logger log.Logger) error { + decompressed, err := Decompress(data) + if err != nil { + return fmt.Errorf("decompress %s: %w", errorContext, err) + } + if err := json.Unmarshal(decompressed, target); err != nil { + return fmt.Errorf("unmarshal %s: %w", errorContext, err) + } + return nil +} + +func hasFramedHeader(b []byte) bool { + return len(b) >= len(snappyMagic) && bytes.Equal(b[:len(snappyMagic)], snappyMagic) +} diff --git a/service/sharddistributor/store/etcd/executorstore/common/compression_test.go b/service/sharddistributor/store/etcd/executorstore/common/compression_test.go new file mode 100644 index 00000000000..ab77ff16f14 --- /dev/null +++ b/service/sharddistributor/store/etcd/executorstore/common/compression_test.go @@ -0,0 +1,169 @@ +package common + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/uber/cadence/common/log/testlogger" + "github.com/uber/cadence/common/types" +) + +func TestCompressDecompress(t *testing.T) { + original := []byte(`{"status":"ACTIVE","shards":["shard1","shard2"]}`) + + compressed, err := Compress(original, true) + require.NoError(t, err) + require.NotNil(t, compressed) + + assert.NotEqual(t, original, compressed) + + decompressed, err := Decompress(compressed) + require.NoError(t, err) + assert.Equal(t, original, decompressed) +} + +func TestCompressWithDisabledFlag(t *testing.T) { + original := []byte(`{"status":"ACTIVE","shards":["shard1","shard2"]}`) + + result, err := Compress(original, false) + require.NoError(t, err) + assert.Equal(t, original, result, "Should return original data when compression is disabled") + + var status map[string]interface{} + err = json.Unmarshal(result, &status) + require.NoError(t, err) + assert.Equal(t, "ACTIVE", status["status"]) +} + +func TestDecompress(t *testing.T) { + t.Run("Empty data", func(t *testing.T) { + decompressed, err := Decompress([]byte{}) + require.NoError(t, err) + assert.Empty(t, decompressed) + }) + + t.Run("Nil data", func(t *testing.T) { + decompressed, err := Decompress(nil) + require.NoError(t, err) + assert.Nil(t, decompressed) + }) + + t.Run("Uncompressed data", func(t *testing.T) { + uncompressed := []byte(`{"status":"ACTIVE"}`) + + result, err := Decompress(uncompressed) + require.NoError(t, err) + assert.Equal(t, uncompressed, result, "Uncompressed data is returned as-is") + + var status map[string]string + err = json.Unmarshal(result, &status) + require.NoError(t, err) + assert.Equal(t, "ACTIVE", status["status"]) + }) + + t.Run("Compressed data", func(t *testing.T) { + original := []byte(`{"status":"DRAINING"}`) + compressed, err := Compress(original, true) + require.NoError(t, err) + + result, err := Decompress(compressed) + require.NoError(t, err) + assert.Equal(t, original, result) + + var status map[string]string + err = json.Unmarshal(result, &status) + require.NoError(t, err) + assert.Equal(t, "DRAINING", status["status"]) + }) +} + +func TestDecompressAndUnmarshal(t *testing.T) { + type testData struct { + Status string `json:"status"` + Shards []string `json:"shards"` + } + logger := testlogger.New(t) + + t.Run("Uncompressed data", func(t *testing.T) { + data := []byte(`{"status":"ACTIVE","shards":["shard1","shard2"]}`) + + var result testData + err := DecompressAndUnmarshal(data, &result, "test data", logger) + require.NoError(t, err) + assert.Equal(t, "ACTIVE", result.Status) + assert.Equal(t, []string{"shard1", "shard2"}, result.Shards) + }) + + t.Run("Compressed data", func(t *testing.T) { + original := testData{ + Status: "DRAINING", + Shards: []string{"shard3", "shard4"}, + } + originalJSON, _ := json.Marshal(original) + compressed, err := Compress(originalJSON, true) + require.NoError(t, err) + + var result testData + err = DecompressAndUnmarshal(compressed, &result, "test data", logger) + require.NoError(t, err) + assert.Equal(t, original.Status, result.Status) + assert.Equal(t, original.Shards, result.Shards) + }) + + t.Run("Invalid JSON in uncompressed data", func(t *testing.T) { + invalidJSON := []byte(`{invalid json}`) + + var result testData + err := DecompressAndUnmarshal(invalidJSON, &result, "test data", logger) + require.Error(t, err) + assert.Contains(t, err.Error(), "unmarshal test data") + }) +} + +func TestCompressedActiveStatus(t *testing.T) { + t.Run("Compression enabled", func(t *testing.T) { + compressed := CompressedActiveStatus(true) + require.NotEmpty(t, compressed) + + decompressed, err := Decompress([]byte(compressed)) + require.NoError(t, err) + + var status types.ExecutorStatus + err = json.Unmarshal(decompressed, &status) + require.NoError(t, err) + assert.Equal(t, types.ExecutorStatusACTIVE, status) + }) + + t.Run("Compression disabled", func(t *testing.T) { + uncompressed := CompressedActiveStatus(false) + require.NotEmpty(t, uncompressed) + + var status types.ExecutorStatus + err := json.Unmarshal([]byte(uncompressed), &status) + require.NoError(t, err) + assert.Equal(t, types.ExecutorStatusACTIVE, status) + }) +} + +func TestHasFramedHeader(t *testing.T) { + t.Run("Data with header", func(t *testing.T) { + data := append(snappyMagic, []byte("some data")...) + assert.True(t, hasFramedHeader(data)) + }) + + t.Run("Data without header", func(t *testing.T) { + data := []byte(`{"json":"data"}`) + assert.False(t, hasFramedHeader(data)) + }) + + t.Run("Empty data", func(t *testing.T) { + assert.False(t, hasFramedHeader([]byte{})) + }) + + t.Run("Data shorter than header", func(t *testing.T) { + assert.False(t, hasFramedHeader([]byte{0xff, 0x06})) + }) +} diff --git a/service/sharddistributor/store/etcd/executorstore/etcdstore.go b/service/sharddistributor/store/etcd/executorstore/etcdstore.go index bf32a7d54a6..6d62535a670 100644 --- a/service/sharddistributor/store/etcd/executorstore/etcdstore.go +++ b/service/sharddistributor/store/etcd/executorstore/etcdstore.go @@ -20,18 +20,16 @@ import ( "github.com/uber/cadence/service/sharddistributor/config" "github.com/uber/cadence/service/sharddistributor/store" "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdkeys" + "github.com/uber/cadence/service/sharddistributor/store/etcd/executorstore/common" "github.com/uber/cadence/service/sharddistributor/store/etcd/executorstore/shardcache" ) -var ( - _executorStatusRunningJSON = fmt.Sprintf(`"%s"`, types.ExecutorStatusACTIVE) -) - type executorStoreImpl struct { - client *clientv3.Client - prefix string - logger log.Logger - shardCache *shardcache.ShardToExecutorCache + client *clientv3.Client + prefix string + dataCompression bool + logger log.Logger + shardCache *shardcache.ShardToExecutorCache } // ExecutorStoreParams defines the dependencies for the etcd store, for use with fx. @@ -71,10 +69,11 @@ func NewStore(p ExecutorStoreParams) (store.Store, error) { shardCache := shardcache.NewShardToExecutorCache(etcdCfg.Prefix, etcdClient, p.Logger) store := &executorStoreImpl{ - client: etcdClient, - prefix: etcdCfg.Prefix, - logger: p.Logger, - shardCache: shardCache, + client: etcdClient, + prefix: etcdCfg.Prefix, + dataCompression: p.Cfg.DataCompression, + logger: p.Logger, + shardCache: shardCache, } p.Lifecycle.Append(fx.StartStopHook(store.Start, store.Stop)) @@ -117,11 +116,22 @@ func (s *executorStoreImpl) RecordHeartbeat(ctx context.Context, namespace, exec return fmt.Errorf("marshal assinged shards: %w", err) } + // Compress data before writing to etcd + compressedReportedShards, err := common.Compress(reportedShardsData, s.dataCompression) + if err != nil { + return fmt.Errorf("compress reported shards: %w", err) + } + + compressedState, err := common.Compress(jsonState, s.dataCompression) + if err != nil { + return fmt.Errorf("compress state: %w", err) + } + // Build all operations including metadata ops := []clientv3.Op{ clientv3.OpPut(heartbeatETCDKey, strconv.FormatInt(request.LastHeartbeat, 10)), - clientv3.OpPut(stateETCDKey, string(jsonState)), - clientv3.OpPut(reportedShardsETCDKey, string(reportedShardsData)), + clientv3.OpPut(stateETCDKey, string(compressedState)), + clientv3.OpPut(reportedShardsETCDKey, string(compressedReportedShards)), } for key, value := range request.Metadata { metadataKey := etcdkeys.BuildMetadataKey(s.prefix, namespace, executorID, key) @@ -174,19 +184,16 @@ func (s *executorStoreImpl) GetHeartbeat(ctx context.Context, namespace string, } heartbeatState.LastHeartbeat = timestamp case etcdkeys.ExecutorStatusKey: - err := json.Unmarshal([]byte(value), &heartbeatState.Status) - if err != nil { - return nil, nil, fmt.Errorf("parse heartbeat state: %w, value %s", err, value) + if err := common.DecompressAndUnmarshal(kv.Value, &heartbeatState.Status, "heartbeat state", s.logger); err != nil { + return nil, nil, err } case etcdkeys.ExecutorReportedShardsKey: - err = json.Unmarshal(kv.Value, &heartbeatState.ReportedShards) - if err != nil { - return nil, nil, fmt.Errorf("unmarshal reported shards: %w", err) + if err := common.DecompressAndUnmarshal(kv.Value, &heartbeatState.ReportedShards, "reported shards", s.logger); err != nil { + return nil, nil, err } case etcdkeys.ExecutorAssignedStateKey: - err = json.Unmarshal(kv.Value, &assignedState) - if err != nil { - return nil, nil, fmt.Errorf("unmarshal assigned shards: %w", err) + if err := common.DecompressAndUnmarshal(kv.Value, &assignedState, "assigned state", s.logger); err != nil { + return nil, nil, err } } } @@ -225,19 +232,16 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st timestamp, _ := strconv.ParseInt(value, 10, 64) heartbeat.LastHeartbeat = timestamp case etcdkeys.ExecutorStatusKey: - err := json.Unmarshal([]byte(value), &heartbeat.Status) - if err != nil { - return nil, fmt.Errorf("parse heartbeat state: %w, value %s", err, value) + if err := common.DecompressAndUnmarshal(kv.Value, &heartbeat.Status, "heartbeat state", s.logger); err != nil { + return nil, err } case etcdkeys.ExecutorReportedShardsKey: - err = json.Unmarshal(kv.Value, &heartbeat.ReportedShards) - if err != nil { - return nil, fmt.Errorf("unmarshal reported shards: %w", err) + if err := common.DecompressAndUnmarshal(kv.Value, &heartbeat.ReportedShards, "reported shards", s.logger); err != nil { + return nil, err } case etcdkeys.ExecutorAssignedStateKey: - err = json.Unmarshal(kv.Value, &assigned) - if err != nil { - return nil, fmt.Errorf("unmarshal assigned shards: %w, %s", err, value) + if err := common.DecompressAndUnmarshal(kv.Value, &assigned, "assigned state", s.logger); err != nil { + return nil, err } assigned.ModRevision = kv.ModRevision } @@ -309,7 +313,11 @@ func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string, if err != nil { return fmt.Errorf("marshal assigned shards for executor %s: %w", executorID, err) } - ops = append(ops, clientv3.OpPut(executorStateKey, string(value))) + compressedValue, err := common.Compress(value, s.dataCompression) + if err != nil { + return fmt.Errorf("compress assigned shards for executor %s: %w", executorID, err) + } + ops = append(ops, clientv3.OpPut(executorStateKey, string(compressedValue))) comparisons = append(comparisons, clientv3.Compare(clientv3.ModRevision(executorStateKey), "=", state.ModRevision)) } @@ -388,8 +396,8 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID, // If the executor already has shards, load its state. kv := resp.Kvs[0] modRevision = kv.ModRevision - if err := json.Unmarshal(kv.Value, &state); err != nil { - return fmt.Errorf("unmarshal assigned state: %w", err) + if err := common.DecompressAndUnmarshal(kv.Value, &state, "assigned state", s.logger); err != nil { + return err } } else { // If this is the first shard, initialize the state map. @@ -406,11 +414,16 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID, return fmt.Errorf("marshal new assigned state: %w", err) } + compressedStateValue, err := common.Compress(newStateValue, s.dataCompression) + if err != nil { + return fmt.Errorf("compress new assigned state: %w", err) + } + var comparisons []clientv3.Cmp // 3. Prepare and commit the transaction with three atomic checks. // a) Check that the executor's status is ACTIVE. - comparisons = append(comparisons, clientv3.Compare(clientv3.Value(statusKey), "=", _executorStatusRunningJSON)) + comparisons = append(comparisons, clientv3.Compare(clientv3.Value(statusKey), "=", common.CompressedActiveStatus(s.dataCompression))) // b) Check that the assigned_state key hasn't been changed by another process. comparisons = append(comparisons, clientv3.Compare(clientv3.ModRevision(assignedState), "=", modRevision)) // c) Check that the cache is up to date. @@ -431,7 +444,7 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID, txnResp, err := s.client.Txn(ctx). If(comparisons...). - Then(clientv3.OpPut(assignedState, string(newStateValue))). + Then(clientv3.OpPut(assignedState, string(compressedStateValue))). Commit() if err != nil { @@ -448,8 +461,13 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID, if err != nil || len(currentStatusResp.Kvs) == 0 { return store.ErrExecutorNotFound } - if string(currentStatusResp.Kvs[0].Value) != _executorStatusRunningJSON { - return fmt.Errorf(`%w: executor status is %s"`, store.ErrVersionConflict, currentStatusResp.Kvs[0].Value) + if string(currentStatusResp.Kvs[0].Value) != common.CompressedActiveStatus(s.dataCompression) { + // Decompress the status for an error message + decompressedStatus, err := common.Decompress(currentStatusResp.Kvs[0].Value) + if err != nil { + s.logger.Warn(fmt.Sprintf("failed to decompress %s", currentStatusResp.Kvs[0].Value), tag.Error(err)) + } + return fmt.Errorf(`%w: executor status is %s"`, store.ErrVersionConflict, string(decompressedStatus)) } s.logger.Info("Assign shard transaction failed due to a conflict. Retrying...", tag.ShardNamespace(namespace), tag.ShardKey(shardID), tag.ShardExecutor(executorID)) diff --git a/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go b/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go index 903693c00d5..d10e27d3d11 100644 --- a/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go +++ b/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go @@ -15,6 +15,7 @@ import ( "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/sharddistributor/store" "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdkeys" + "github.com/uber/cadence/service/sharddistributor/store/etcd/executorstore/common" "github.com/uber/cadence/service/sharddistributor/store/etcd/leaderstore" "github.com/uber/cadence/service/sharddistributor/store/etcd/testhelper" ) @@ -63,14 +64,18 @@ func TestRecordHeartbeat(t *testing.T) { resp, err = tc.Client.Get(ctx, stateKey) require.NoError(t, err) require.Equal(t, int64(1), resp.Count, "State key should exist") - assert.Equal(t, stringStatus(types.ExecutorStatusACTIVE), string(resp.Kvs[0].Value)) + decompressedState, err := common.Decompress(resp.Kvs[0].Value) + require.NoError(t, err) + assert.Equal(t, stringStatus(types.ExecutorStatusACTIVE), string(decompressedState)) resp, err = tc.Client.Get(ctx, reportedShardsKey) require.NoError(t, err) require.Equal(t, int64(1), resp.Count, "Reported shards key should exist") + decompressedReportedShards, err := common.Decompress(resp.Kvs[0].Value) + require.NoError(t, err) var reportedShards map[string]*types.ShardStatusReport - err = json.Unmarshal(resp.Kvs[0].Value, &reportedShards) + err = json.Unmarshal(decompressedReportedShards, &reportedShards) require.NoError(t, err) require.Len(t, reportedShards, 1) assert.Equal(t, types.ShardStatusREADY, reportedShards["shard-TestRecordHeartbeat"].Status) @@ -369,7 +374,9 @@ func TestSubscribe(t *testing.T) { // Now update the reported shards, which IS a significant change reportedShardsKey, err := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, "reported_shards") require.NoError(t, err) - _, err = tc.Client.Put(ctx, reportedShardsKey, `{"shard-1":{"status":"running"}}`) + compressedShards, err := common.Compress([]byte(`{"shard-1":{"status":"running"}}`), tc.LeaderCfg.DataCompression) + require.NoError(t, err) + _, err = tc.Client.Put(ctx, reportedShardsKey, string(compressedShards)) require.NoError(t, err) select { diff --git a/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go b/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go index 87019bc891d..484f6d3a661 100644 --- a/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go +++ b/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go @@ -2,7 +2,6 @@ package shardcache import ( "context" - "encoding/json" "fmt" "strings" "sync" @@ -13,6 +12,7 @@ import ( "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/service/sharddistributor/store" "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdkeys" + "github.com/uber/cadence/service/sharddistributor/store/etcd/executorstore/common" ) type namespaceShardToExecutor struct { @@ -150,9 +150,9 @@ func (n *namespaceShardToExecutor) refresh(ctx context.Context) error { shardOwner := getOrCreateShardOwner(shardOwners, executorID) var assignedState store.AssignedState - err = json.Unmarshal(kv.Value, &assignedState) + err = common.DecompressAndUnmarshal(kv.Value, &assignedState, "assigned state", n.logger) if err != nil { - return fmt.Errorf("unmarshal assigned state: %w", err) + return err } for shardID := range assignedState.AssignedShards { n.shardToExecutor[shardID] = shardOwner diff --git a/service/sharddistributor/store/etcd/go.mod b/service/sharddistributor/store/etcd/go.mod index 3893a026804..912afd9944f 100644 --- a/service/sharddistributor/store/etcd/go.mod +++ b/service/sharddistributor/store/etcd/go.mod @@ -7,12 +7,11 @@ toolchain go1.23.4 replace github.com/uber/cadence => ../../../.. require ( + github.com/golang/snappy v1.0.0 github.com/stretchr/testify v1.10.0 github.com/uber/cadence v0.0.0-00010101000000-000000000000 go.etcd.io/etcd/client/v3 v3.5.5 go.uber.org/fx v1.23.0 - go.uber.org/goleak v1.2.0 - go.uber.org/mock v0.5.0 gopkg.in/yaml.v2 v2.4.0 ) @@ -69,6 +68,7 @@ require ( go.uber.org/cadence v0.19.0 // indirect go.uber.org/config v1.4.0 // indirect go.uber.org/dig v1.18.0 // indirect + go.uber.org/mock v0.5.0 // indirect go.uber.org/multierr v1.10.0 // indirect go.uber.org/net/metrics v1.3.0 // indirect go.uber.org/thriftrw v1.29.2 // indirect diff --git a/service/sharddistributor/store/etcd/go.sum b/service/sharddistributor/store/etcd/go.sum index 38f11abe75a..ac5a1c94771 100644 --- a/service/sharddistributor/store/etcd/go.sum +++ b/service/sharddistributor/store/etcd/go.sum @@ -113,6 +113,8 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= +github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= diff --git a/service/sharddistributor/store/etcd/testhelper/testhelper.go b/service/sharddistributor/store/etcd/testhelper/testhelper.go index 2334ea453e1..01c2f114ab2 100644 --- a/service/sharddistributor/store/etcd/testhelper/testhelper.go +++ b/service/sharddistributor/store/etcd/testhelper/testhelper.go @@ -54,8 +54,9 @@ func SetupStoreTestCluster(t *testing.T) *StoreTestCluster { require.NoError(t, err) leaderCfg := shardDistributorCfg.ShardDistribution{ - Store: shardDistributorCfg.Store{StorageParams: yamlNode}, - LeaderStore: shardDistributorCfg.Store{StorageParams: yamlNode}, + Store: shardDistributorCfg.Store{StorageParams: yamlNode}, + LeaderStore: shardDistributorCfg.Store{StorageParams: yamlNode}, + DataCompression: true, } client, err := clientv3.New(clientv3.Config{Endpoints: endpoints, DialTimeout: 5 * time.Second})