Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 85 additions & 18 deletions service/sharddistributor/store/etcd/executorstore/etcdstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strconv"
"time"

"github.com/golang/snappy"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/fx"

Expand All @@ -24,9 +25,25 @@ import (
)

var (
_executorStatusRunningJSON = fmt.Sprintf(`"%s"`, types.ExecutorStatusACTIVE)
_executorStatusRunningJSON = fmt.Sprintf(`"%s"`, types.ExecutorStatusACTIVE)
_executorStatusRunningJSONCompressed string
)

func init() {
compressed, _ := compressJSON([]byte(_executorStatusRunningJSON))
_executorStatusRunningJSONCompressed = string(compressed)
}

// compressJSON compresses JSON data using snappy compression
func compressJSON(data []byte) ([]byte, error) {
return snappy.Encode(nil, data), nil
}

// decompressJSON decompresses snappy-compressed data
func decompressJSON(data []byte) ([]byte, error) {
return snappy.Decode(nil, data)
}

type executorStoreImpl struct {
client *clientv3.Client
prefix string
Expand Down Expand Up @@ -121,11 +138,22 @@ func (s *executorStoreImpl) RecordHeartbeat(ctx context.Context, namespace, exec
return fmt.Errorf("marshal assinged shards: %w", err)
}

// Compress data before writing to etcd
compressedReportedShards, err := compressJSON(reportedShardsData)
if err != nil {
return fmt.Errorf("compress reported shards: %w", err)
}

compressedState, err := compressJSON(jsonState)
if err != nil {
return fmt.Errorf("compress state: %w", err)
}

// Atomically update both the timestamp and the state.
_, err = s.client.Txn(ctx).Then(
clientv3.OpPut(heartbeatETCDKey, strconv.FormatInt(request.LastHeartbeat, 10)),
clientv3.OpPut(stateETCDKey, string(jsonState)),
clientv3.OpPut(reportedShardsETCDKey, string(reportedShardsData)),
clientv3.OpPut(stateETCDKey, string(compressedState)),
clientv3.OpPut(reportedShardsETCDKey, string(compressedReportedShards)),
).Commit()

if err != nil {
Expand Down Expand Up @@ -171,17 +199,29 @@ func (s *executorStoreImpl) GetHeartbeat(ctx context.Context, namespace string,
}
heartbeatState.LastHeartbeat = timestamp
case etcdkeys.ExecutorStatusKey:
err := json.Unmarshal([]byte(value), &heartbeatState.Status)
decompressed, err := decompressJSON(kv.Value)
if err != nil {
return nil, nil, fmt.Errorf("parse heartbeat state: %w, value %s", err, value)
return nil, nil, fmt.Errorf("decompress heartbeat state: %w", err)
}
err = json.Unmarshal(decompressed, &heartbeatState.Status)
if err != nil {
return nil, nil, fmt.Errorf("parse heartbeat state: %w", err)
}
case etcdkeys.ExecutorReportedShardsKey:
err = json.Unmarshal(kv.Value, &heartbeatState.ReportedShards)
decompressed, err := decompressJSON(kv.Value)
if err != nil {
return nil, nil, fmt.Errorf("decompress reported shards: %w", err)
}
err = json.Unmarshal(decompressed, &heartbeatState.ReportedShards)
if err != nil {
return nil, nil, fmt.Errorf("unmarshal reported shards: %w", err)
}
case etcdkeys.ExecutorAssignedStateKey:
err = json.Unmarshal(kv.Value, &assignedState)
decompressed, err := decompressJSON(kv.Value)
if err != nil {
return nil, nil, fmt.Errorf("decompress assigned state: %w", err)
}
err = json.Unmarshal(decompressed, &assignedState)
if err != nil {
return nil, nil, fmt.Errorf("unmarshal assigned shards: %w", err)
}
Expand Down Expand Up @@ -222,19 +262,31 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st
timestamp, _ := strconv.ParseInt(value, 10, 64)
heartbeat.LastHeartbeat = timestamp
case etcdkeys.ExecutorStatusKey:
err := json.Unmarshal([]byte(value), &heartbeat.Status)
decompressed, err := decompressJSON(kv.Value)
if err != nil {
return nil, fmt.Errorf("decompress heartbeat state: %w", err)
}
err = json.Unmarshal(decompressed, &heartbeat.Status)
if err != nil {
return nil, fmt.Errorf("parse heartbeat state: %w, value %s", err, value)
return nil, fmt.Errorf("parse heartbeat state: %w", err)
}
case etcdkeys.ExecutorReportedShardsKey:
err = json.Unmarshal(kv.Value, &heartbeat.ReportedShards)
decompressed, err := decompressJSON(kv.Value)
if err != nil {
return nil, fmt.Errorf("decompress reported shards: %w", err)
}
err = json.Unmarshal(decompressed, &heartbeat.ReportedShards)
if err != nil {
return nil, fmt.Errorf("unmarshal reported shards: %w", err)
}
case etcdkeys.ExecutorAssignedStateKey:
err = json.Unmarshal(kv.Value, &assigned)
decompressed, err := decompressJSON(kv.Value)
if err != nil {
return nil, fmt.Errorf("decompress assigned state: %w", err)
}
err = json.Unmarshal(decompressed, &assigned)
if err != nil {
return nil, fmt.Errorf("unmarshal assigned shards: %w, %s", err, value)
return nil, fmt.Errorf("unmarshal assigned shards: %w", err)
}
assigned.ModRevision = kv.ModRevision
}
Expand Down Expand Up @@ -306,7 +358,11 @@ func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string,
if err != nil {
return fmt.Errorf("marshal assigned shards for executor %s: %w", executorID, err)
}
ops = append(ops, clientv3.OpPut(executorStateKey, string(value)))
compressedValue, err := compressJSON(value)
if err != nil {
return fmt.Errorf("compress assigned shards for executor %s: %w", executorID, err)
}
ops = append(ops, clientv3.OpPut(executorStateKey, string(compressedValue)))
comparisons = append(comparisons, clientv3.Compare(clientv3.ModRevision(executorStateKey), "=", state.ModRevision))
}

Expand Down Expand Up @@ -385,7 +441,11 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID,
// If the executor already has shards, load its state.
kv := resp.Kvs[0]
modRevision = kv.ModRevision
if err := json.Unmarshal(kv.Value, &state); err != nil {
decompressed, err := decompressJSON(kv.Value)
if err != nil {
return fmt.Errorf("decompress assigned state: %w", err)
}
if err := json.Unmarshal(decompressed, &state); err != nil {
return fmt.Errorf("unmarshal assigned state: %w", err)
}
} else {
Expand All @@ -403,11 +463,16 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID,
return fmt.Errorf("marshal new assigned state: %w", err)
}

compressedStateValue, err := compressJSON(newStateValue)
if err != nil {
return fmt.Errorf("compress new assigned state: %w", err)
}

var comparisons []clientv3.Cmp

// 3. Prepare and commit the transaction with three atomic checks.
// a) Check that the executor's status is ACTIVE.
comparisons = append(comparisons, clientv3.Compare(clientv3.Value(statusKey), "=", _executorStatusRunningJSON))
comparisons = append(comparisons, clientv3.Compare(clientv3.Value(statusKey), "=", _executorStatusRunningJSONCompressed))
// b) Check that the assigned_state key hasn't been changed by another process.
comparisons = append(comparisons, clientv3.Compare(clientv3.ModRevision(assignedState), "=", modRevision))
// c) Check that the cache is up to date.
Expand All @@ -428,7 +493,7 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID,

txnResp, err := s.client.Txn(ctx).
If(comparisons...).
Then(clientv3.OpPut(assignedState, string(newStateValue))).
Then(clientv3.OpPut(assignedState, string(compressedStateValue))).
Commit()

if err != nil {
Expand All @@ -445,8 +510,10 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID,
if err != nil || len(currentStatusResp.Kvs) == 0 {
return store.ErrExecutorNotFound
}
if string(currentStatusResp.Kvs[0].Value) != _executorStatusRunningJSON {
return fmt.Errorf(`%w: executor status is %s"`, store.ErrVersionConflict, currentStatusResp.Kvs[0].Value)
if string(currentStatusResp.Kvs[0].Value) != _executorStatusRunningJSONCompressed {
// Decompress the status for error message
decompressedStatus, _ := decompressJSON(currentStatusResp.Kvs[0].Value)
return fmt.Errorf(`%w: executor status is %s"`, store.ErrVersionConflict, string(decompressedStatus))
}

s.logger.Info("Assign shard transaction failed due to a conflict. Retrying...", tag.ShardNamespace(namespace), tag.ShardKey(shardID), tag.ShardExecutor(executorID))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,18 @@ func TestRecordHeartbeat(t *testing.T) {
resp, err = tc.Client.Get(ctx, stateKey)
require.NoError(t, err)
require.Equal(t, int64(1), resp.Count, "State key should exist")
assert.Equal(t, stringStatus(types.ExecutorStatusACTIVE), string(resp.Kvs[0].Value))
decompressedState, err := decompressJSON(resp.Kvs[0].Value)
require.NoError(t, err)
assert.Equal(t, stringStatus(types.ExecutorStatusACTIVE), string(decompressedState))

resp, err = tc.Client.Get(ctx, reportedShardsKey)
require.NoError(t, err)
require.Equal(t, int64(1), resp.Count, "Reported shards key should exist")

decompressedReportedShards, err := decompressJSON(resp.Kvs[0].Value)
require.NoError(t, err)
var reportedShards map[string]*types.ShardStatusReport
err = json.Unmarshal(resp.Kvs[0].Value, &reportedShards)
err = json.Unmarshal(decompressedReportedShards, &reportedShards)
require.NoError(t, err)
require.Len(t, reportedShards, 1)
assert.Equal(t, types.ShardStatusREADY, reportedShards["shard-TestRecordHeartbeat"].Status)
Expand Down Expand Up @@ -353,7 +357,9 @@ func TestSubscribe(t *testing.T) {
// Now update the reported shards, which IS a significant change
reportedShardsKey, err := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, "reported_shards")
require.NoError(t, err)
_, err = tc.Client.Put(ctx, reportedShardsKey, `{"shard-1":{"status":"running"}}`)
compressedShards, err := compressJSON([]byte(`{"shard-1":{"status":"running"}}`))
require.NoError(t, err)
_, err = tc.Client.Put(ctx, reportedShardsKey, string(compressedShards))
require.NoError(t, err)

select {
Expand Down
4 changes: 2 additions & 2 deletions service/sharddistributor/store/etcd/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ toolchain go1.23.4
replace github.com/uber/cadence => ../../../..

require (
github.com/golang/snappy v1.0.0
github.com/stretchr/testify v1.10.0
github.com/uber/cadence v0.0.0-00010101000000-000000000000
go.etcd.io/etcd/client/v3 v3.5.5
go.uber.org/fx v1.23.0
go.uber.org/goleak v1.2.0
go.uber.org/mock v0.5.0
gopkg.in/yaml.v2 v2.4.0
)

Expand Down Expand Up @@ -69,6 +68,7 @@ require (
go.uber.org/cadence v0.19.0 // indirect
go.uber.org/config v1.4.0 // indirect
go.uber.org/dig v1.18.0 // indirect
go.uber.org/mock v0.5.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
go.uber.org/net/metrics v1.3.0 // indirect
go.uber.org/thriftrw v1.29.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions service/sharddistributor/store/etcd/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs=
github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
Expand Down
Loading