Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 212 additions & 46 deletions .gen/proto/sharddistributor/v1/executor.pb.go

Large diffs are not rendered by default.

91 changes: 47 additions & 44 deletions .gen/proto/sharddistributor/v1/executor.pb.yarpc.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions common/types/mapper/proto/sharddistributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func FromShardDistributorExecutorHeartbeatRequest(t *types.ExecutorHeartbeatRequ
ExecutorId: t.GetExecutorID(),
Status: status,
ShardStatusReports: shardStatusReports,
Metadata: t.GetMetadata(),
}
}

Expand Down Expand Up @@ -173,6 +174,7 @@ func ToShardDistributorExecutorHeartbeatRequest(t *sharddistributorv1.HeartbeatR
ExecutorID: t.GetExecutorId(),
Status: status,
ShardStatusReports: shardStatusReports,
Metadata: t.GetMetadata(),
}
}

Expand Down
8 changes: 8 additions & 0 deletions common/types/sharddistributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type ExecutorHeartbeatRequest struct {
ExecutorID string
Status ExecutorStatus
ShardStatusReports map[string]*ShardStatusReport
Metadata map[string]string
}

func (v *ExecutorHeartbeatRequest) GetNamespace() (o string) {
Expand Down Expand Up @@ -122,6 +123,13 @@ func (v *ExecutorHeartbeatRequest) GetShardStatusReports() (o map[string]*ShardS
return
}

func (v *ExecutorHeartbeatRequest) GetMetadata() (o map[string]string) {
if v != nil {
return v.Metadata
}
return
}

// ExecutorStatus is persisted to the DB with a string value mapping.
// Beware - if we want to change the name - it should be backward compatible and should be done in two steps.
type ExecutorStatus int32
Expand Down
4 changes: 4 additions & 0 deletions common/types/testdata/service_sharddistributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ var (
ShardLoad: 0.75,
},
},
Metadata: map[string]string{
"key-1": "value-1",
"key-2": "value-2",
},
}
ShardDistributorExecutorHeartbeatResponse = types.ExecutorHeartbeatResponse{
ShardAssignments: map[string]*types.ShardAssignment{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ message HeartbeatRequest {
string executor_id = 2;
ExecutorStatus status = 3;
map<string, ShardStatusReport> shard_status_reports = 4;
map<string, string> metadata = 5;
}

enum ExecutorStatus {
Expand Down
27 changes: 27 additions & 0 deletions service/sharddistributor/handler/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ import (

const (
_heartbeatRefreshRate = 2 * time.Second

_maxMetadataKeys = 32
_maxMetadataKeyLength = 128
_maxMetadataValueSize = 512 * 1024 // 512KB
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should move these values to configuration

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we could even make them a per namespace dynamic config

)

type executor struct {
Expand Down Expand Up @@ -78,6 +82,11 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe
LastHeartbeat: now.Unix(),
Status: request.Status,
ReportedShards: request.ShardStatusReports,
Metadata: request.GetMetadata(),
}

if err := validateMetadata(newHeartbeat.Metadata); err != nil {
return nil, fmt.Errorf("validate metadata: %w", err)
}

err = h.storage.RecordHeartbeat(ctx, request.Namespace, request.ExecutorID, newHeartbeat)
Expand Down Expand Up @@ -130,3 +139,21 @@ func _convertResponse(shards *store.AssignedState, mode types.MigrationMode) *ty
res.MigrationMode = mode
return res
}

func validateMetadata(metadata map[string]string) error {
if len(metadata) > _maxMetadataKeys {
return fmt.Errorf("metadata has %d keys, which exceeds the maximum of %d", len(metadata), _maxMetadataKeys)
}

for key, value := range metadata {
if len(key) > _maxMetadataKeyLength {
return fmt.Errorf("metadata key %q has length %d, which exceeds the maximum of %d", key, len(key), _maxMetadataKeyLength)
}

if len(value) > _maxMetadataValueSize {
return fmt.Errorf("metadata value for key %q has size %d bytes, which exceeds the maximum of %d bytes", key, len(value), _maxMetadataValueSize)
}
}

return nil
}
130 changes: 130 additions & 0 deletions service/sharddistributor/handler/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,136 @@ func TestHeartbeat(t *testing.T) {
require.Error(t, err)
require.Contains(t, err.Error(), expectedErr.Error())
})

// Test Case 11: Heartbeat with metadata validation failure - too many keys
t.Run("MetadataValidationTooManyKeys", func(t *testing.T) {
ctrl := gomock.NewController(t)
mockStore := store.NewMockStore(ctrl)
mockTimeSource := clock.NewMockedTimeSourceAt(now)
shardDistributionCfg := config.ShardDistribution{}
handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg)

// Create metadata with more than max allowed keys
metadata := make(map[string]string)
for i := 0; i < _maxMetadataKeys+1; i++ {
metadata[string(rune('a'+i))] = "value"
}

req := &types.ExecutorHeartbeatRequest{
Namespace: namespace,
ExecutorID: executorID,
Status: types.ExecutorStatusACTIVE,
Metadata: metadata,
}

mockStore.EXPECT().GetHeartbeat(gomock.Any(), namespace, executorID).Return(nil, nil, store.ErrExecutorNotFound)

_, err := handler.Heartbeat(ctx, req)
require.Error(t, err)
require.Contains(t, err.Error(), "validate metadata")
require.Contains(t, err.Error(), "exceeds the maximum")
})
}

func TestValidateMetadata(t *testing.T) {
// Helper function to generate metadata with N keys
makeMetadataWithKeys := func(n int) map[string]string {
metadata := make(map[string]string)
for i := 0; i < n; i++ {
metadata[string(rune('a'+i))] = "value"
}
return metadata
}

testCases := []struct {
name string
metadata map[string]string
expectError bool
errorSubstring string
}{
{
name: "ValidMetadata",
metadata: map[string]string{
"key1": "value1",
"key2": "value2",
},
expectError: false,
},
{
name: "EmptyMetadata",
metadata: map[string]string{},
expectError: false,
},
{
name: "NilMetadata",
metadata: nil,
expectError: false,
},
{
name: "TooManyKeys",
metadata: makeMetadataWithKeys(_maxMetadataKeys + 1),
expectError: true,
errorSubstring: "exceeds the maximum of 32",
},
{
name: "ExactlyMaxKeys",
metadata: makeMetadataWithKeys(_maxMetadataKeys),
expectError: false,
},
{
name: "KeyTooLong",
metadata: map[string]string{
string(make([]byte, _maxMetadataKeyLength+1)): "value",
},
expectError: true,
errorSubstring: "exceeds the maximum of 128",
},
{
name: "KeyExactlyMaxLength",
metadata: map[string]string{
string(make([]byte, _maxMetadataKeyLength)): "value",
},
expectError: false,
},
{
name: "ValueTooLarge",
metadata: map[string]string{
"key": string(make([]byte, _maxMetadataValueSize+1)),
},
expectError: true,
errorSubstring: "exceeds the maximum of 524288 bytes",
},
{
name: "ValueExactlyMaxSize",
metadata: map[string]string{
"key": string(make([]byte, _maxMetadataValueSize)),
},
expectError: false,
},
{
name: "MultipleValidationErrors",
metadata: func() map[string]string {
metadata := makeMetadataWithKeys(_maxMetadataKeys + 1)
longKey := string(make([]byte, _maxMetadataKeyLength+1))
metadata[longKey] = "value"
return metadata
}(),
expectError: true,
errorSubstring: "exceeds the maximum of 32", // First validation error
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := validateMetadata(tc.metadata)
if tc.expectError {
require.Error(t, err)
require.Contains(t, err.Error(), tc.errorSubstring)
} else {
require.NoError(t, err)
}
})
}
}

func TestConvertResponse(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions service/sharddistributor/store/etcd/etcdkeys/etcdkeys.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ const (
ExecutorReportedShardsKey = "reported_shards"
ExecutorAssignedStateKey = "assigned_state"
ShardAssignedKey = "assigned"
ExecutorMetadataKey = "metadata"
)

var validKeyTypes = []string{
ExecutorHeartbeatKey,
ExecutorStatusKey,
ExecutorReportedShardsKey,
ExecutorAssignedStateKey,
ExecutorMetadataKey,
}

func isValidKeyType(key string) bool {
Expand Down Expand Up @@ -57,3 +59,12 @@ func ParseExecutorKey(prefix string, namespace, key string) (executorID, keyType
}
return parts[0], parts[1], nil
}

func BuildMetadataKey(prefix string, namespace, executorID, metadataKey string) string {
metadataKeyPrefix, err := BuildExecutorKey(prefix, namespace, executorID, ExecutorMetadataKey)
if err != nil {
// This should never happen since ExecutorMetadataKey is a valid constant
panic(fmt.Sprintf("BuildMetadataKey: unexpected error building executor key: %v", err))
}
return fmt.Sprintf("%s/%s", metadataKeyPrefix, metadataKey)
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,8 @@ func TestParseExecutorKey(t *testing.T) {
_, _, err = ParseExecutorKey("/cadence", "test-ns", "/cadence/test-ns/executors/exec-1/heartbeat/extra")
assert.ErrorContains(t, err, "unexpected key format: /cadence/test-ns/executors/exec-1/heartbeat/extra")
}

func TestBuildMetadataKey(t *testing.T) {
got := BuildMetadataKey("/cadence", "test-ns", "exec-1", "my-metadata-key")
assert.Equal(t, "/cadence/test-ns/executors/exec-1/metadata/my-metadata-key", got)
}
13 changes: 10 additions & 3 deletions service/sharddistributor/store/etcd/executorstore/etcdstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,19 @@ func (s *executorStoreImpl) RecordHeartbeat(ctx context.Context, namespace, exec
return fmt.Errorf("marshal assinged shards: %w", err)
}

// Atomically update both the timestamp and the state.
_, err = s.client.Txn(ctx).Then(
// Build all operations including metadata
ops := []clientv3.Op{
clientv3.OpPut(heartbeatETCDKey, strconv.FormatInt(request.LastHeartbeat, 10)),
clientv3.OpPut(stateETCDKey, string(jsonState)),
clientv3.OpPut(reportedShardsETCDKey, string(reportedShardsData)),
).Commit()
}
for key, value := range request.Metadata {
metadataKey := etcdkeys.BuildMetadataKey(s.prefix, namespace, executorID, key)
ops = append(ops, clientv3.OpPut(metadataKey, value))
}

// Atomically update both the timestamp and the state.
_, err = s.client.Txn(ctx).Then(ops...).Commit()

if err != nil {
return fmt.Errorf("record heartbeat: %w", err)
Expand Down
Loading
Loading