diff --git a/pkg/api/handlers_test.go b/pkg/api/handlers_test.go index 01f78e55..aecc4121 100644 --- a/pkg/api/handlers_test.go +++ b/pkg/api/handlers_test.go @@ -1020,8 +1020,8 @@ func TestExecutePlanPersistsShardPlans(t *testing.T) { }}, }}, Shards: []*ternv1.ShardPlan{ - {Namespace: "commerce", Shard: "-80", NeedsChange: true}, - {Namespace: "commerce", Shard: "80-", NeedsChange: false}, + {Namespace: "commerce", Shard: "-80"}, + {Namespace: "commerce", Shard: "80-"}, }, }, } @@ -1057,8 +1057,8 @@ func TestExecutePlanPersistsShardPlans(t *testing.T) { require.NotNil(t, resp) require.NotNil(t, plans.created) assert.Equal(t, []storage.ShardPlan{ - {Namespace: "commerce", Shard: "-80", NeedsChange: true}, - {Namespace: "commerce", Shard: "80-", NeedsChange: false}, + {Namespace: "commerce", Shard: "-80"}, + {Namespace: "commerce", Shard: "80-"}, }, plans.created.Shards) } @@ -1898,7 +1898,7 @@ func TestCreateStoredApplyFansOutOperationsForResolvedTargets(t *testing.T) { controls: &memoryControlRequestStore{}, }, cfg, map[string]tern.Client{}, logger) - apply, storedApplyID, err := svc.createStoredApply(t.Context(), executeApplyTestPlan(), ApplyRequest{Environment: "staging"}, nil, "apply-fanout", false) + apply, storedApplyID, err := svc.createStoredApply(t.Context(), executeApplyTestPlan(), ApplyRequest{Environment: "staging"}, nil, "apply-fanout") require.NoError(t, err) assert.Equal(t, int64(123), storedApplyID) @@ -1941,10 +1941,11 @@ func TestCreateStoredApplyFansOutShardedPlanOperations(t *testing.T) { }, }, } + usersChange := storage.TableChange{Namespace: "commerce", Table: "users", DDL: "ALTER TABLE `users` ADD COLUMN `email` varchar(255)", Operation: "alter"} plan.Shards = []storage.ShardPlan{ - {Namespace: "commerce", Shard: "80-", NeedsChange: true}, - {Namespace: "commerce", Shard: "-80", NeedsChange: true}, - {Namespace: "commerce", Shard: "-", NeedsChange: false}, + {Namespace: "commerce", Shard: "80-", Changes: []storage.TableChange{usersChange}}, + 
{Namespace: "commerce", Shard: "-80", Changes: []storage.TableChange{usersChange}}, + {Namespace: "commerce", Shard: "-"}, // unchanged: no changes, stays out of the apply } cfg := testServerConfig() cfg.Databases = map[string]DatabaseConfig{} @@ -1963,7 +1964,7 @@ func TestCreateStoredApplyFansOutShardedPlanOperations(t *testing.T) { controls: &memoryControlRequestStore{}, }, cfg, map[string]tern.Client{}, logger) - apply, storedApplyID, err := svc.createStoredApply(t.Context(), plan, ApplyRequest{Environment: "staging"}, nil, "apply-sharded-fanout", true) + apply, storedApplyID, err := svc.createStoredApply(t.Context(), plan, ApplyRequest{Environment: "staging"}, nil, "apply-sharded-fanout") require.NoError(t, err) assert.Equal(t, int64(123), storedApplyID) @@ -2009,9 +2010,10 @@ func TestCreateStoredApplyFansOutShardedPlanWithFinalizerOperation(t *testing.T) }, }, } + usersChange := storage.TableChange{Namespace: "commerce", Table: "users", DDL: "ALTER TABLE `users` ADD COLUMN `email` varchar(255)", Operation: "alter"} plan.Shards = []storage.ShardPlan{ - {Namespace: "commerce", Shard: "80-", NeedsChange: true}, - {Namespace: "commerce", Shard: "-80", NeedsChange: true}, + {Namespace: "commerce", Shard: "80-", Changes: []storage.TableChange{usersChange}}, + {Namespace: "commerce", Shard: "-80", Changes: []storage.TableChange{usersChange}}, } cfg := testServerConfig() cfg.Databases = map[string]DatabaseConfig{} @@ -2030,7 +2032,7 @@ func TestCreateStoredApplyFansOutShardedPlanWithFinalizerOperation(t *testing.T) controls: &memoryControlRequestStore{}, }, cfg, map[string]tern.Client{}, logger) - _, _, err := svc.createStoredApply(t.Context(), plan, ApplyRequest{Environment: "staging"}, nil, "apply-sharded-finalizer", true) + _, _, err := svc.createStoredApply(t.Context(), plan, ApplyRequest{Environment: "staging"}, nil, "apply-sharded-finalizer") require.NoError(t, err) require.Len(t, applies.operations, 3) @@ -2069,7 +2071,7 @@ func 
TestCreateStoredApplyDoesNotDropFinalizerOnlyNamespace(t *testing.T) { Artifacts: map[string]string{vSchemaArtifactName: "{\"routing\":true}"}, }, } - plan.Shards = []storage.ShardPlan{{Namespace: "commerce", Shard: "-", NeedsChange: true}} + plan.Shards = []storage.ShardPlan{{Namespace: "commerce", Shard: "-", Changes: []storage.TableChange{{Namespace: "commerce", Table: "users", DDL: "ALTER TABLE `users` ADD COLUMN `email` varchar(255)", Operation: "alter"}}}} cfg := testServerConfig() cfg.Databases = map[string]DatabaseConfig{} cfg.Databases["testdb"] = DatabaseConfig{ @@ -2087,7 +2089,7 @@ func TestCreateStoredApplyDoesNotDropFinalizerOnlyNamespace(t *testing.T) { controls: &memoryControlRequestStore{}, }, cfg, map[string]tern.Client{}, logger) - _, _, err := svc.createStoredApply(t.Context(), plan, ApplyRequest{Environment: "staging"}, nil, "apply-finalizer-only-namespace", true) + _, _, err := svc.createStoredApply(t.Context(), plan, ApplyRequest{Environment: "staging"}, nil, "apply-finalizer-only-namespace") require.NoError(t, err) require.Len(t, applies.operations, 2) @@ -2101,41 +2103,6 @@ func TestCreateStoredApplyDoesNotDropFinalizerOnlyNamespace(t *testing.T) { assert.Equal(t, "users", tasks.tasks[0].TableName) } -func TestCreateStoredApplyDoesNotShardWithoutClientOptIn(t *testing.T) { - applies := &capturingApplyStore{} - logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError})) - tasks := &capturingTaskStore{} - applies.taskStore = tasks - plan := executeApplyTestPlan() - plan.Namespaces = map[string]*storage.NamespacePlanData{ - "commerce": { - Tables: []storage.TableChange{ - {Namespace: "commerce", Table: "users", DDL: "ALTER TABLE `users` ADD COLUMN `email` varchar(255)", Operation: "alter"}, - }, - }, - } - plan.Shards = []storage.ShardPlan{ - {Namespace: "commerce", Shard: "-80", NeedsChange: true}, - {Namespace: "commerce", Shard: "80-", NeedsChange: true}, - } - svc := New(&mockStorageWithApplyStores{ - 
plans: &staticPlanStore{plan: plan}, - applies: applies, - tasks: tasks, - locks: &emptyLockStore{}, - applyLogs: &noopApplyLogStore{}, - controls: &memoryControlRequestStore{}, - }, testServerConfig(), map[string]tern.Client{}, logger) - - _, _, err := svc.createStoredApply(t.Context(), plan, ApplyRequest{Environment: "staging"}, nil, "apply-no-sharded-fanout", false) - - require.NoError(t, err) - require.Len(t, applies.operations, 1) - assert.Empty(t, applies.operations[0].OperationKey) - require.Len(t, tasks.tasks, 1) - assert.Empty(t, tasks.tasks[0].Shard) -} - func TestValidateShardOperationKeyPartsRejectsDelimiter(t *testing.T) { err := validateShardOperationKeyParts("commerce", "-80/80-", "users") diff --git a/pkg/api/plan_handlers.go b/pkg/api/plan_handlers.go index 58ece0e6..1ec05e2c 100644 --- a/pkg/api/plan_handlers.go +++ b/pkg/api/plan_handlers.go @@ -26,7 +26,6 @@ import ( "github.com/block/schemabot/pkg/schema" "github.com/block/schemabot/pkg/state" "github.com/block/schemabot/pkg/storage" - "github.com/block/schemabot/pkg/tern" ) const applyOperationKeyMaxLen = 255 @@ -859,7 +858,7 @@ func (s *Service) queueValidatedApply(ctx context.Context, span trace.Span, plan client.SetObserver(storedApplyID, observer) } - applyIdentifier, storedApplyID, err := s.enqueueApply(ctx, plan, req, options, clientSupportsShardedApplyFanout(client), attachObserver) + applyIdentifier, storedApplyID, err := s.enqueueApply(ctx, plan, req, options, attachObserver) if err != nil { recordApplyError("enqueue apply", err) return nil, 0, err @@ -886,11 +885,10 @@ func (s *Service) enqueueApply( plan *storage.Plan, req ApplyRequest, options map[string]string, - shardedFanoutSupported bool, onApplyCreated func(int64), ) (string, int64, error) { applyIdentifier := "apply-" + strings.ReplaceAll(uuid.New().String(), "-", "")[:16] - apply, storedApplyID, err := s.createStoredApply(ctx, plan, req, options, applyIdentifier, shardedFanoutSupported) + apply, storedApplyID, err := 
s.createStoredApply(ctx, plan, req, options, applyIdentifier) if err != nil { return "", 0, err } @@ -906,7 +904,6 @@ func (s *Service) createStoredApply( req ApplyRequest, options map[string]string, applyIdentifier string, - shardedFanoutSupported bool, ) (*storage.Apply, int64, error) { now := time.Now() applyOpts := storage.ApplyOptionsFromMap(options) @@ -961,7 +958,7 @@ func (s *Service) createStoredApply( taskChanges := applyTaskChanges(plan) cutoverPolicy := s.config.CutoverPolicyFor(plan.Database, req.Environment) onFailure := s.config.OnFailure(plan.Database, req.Environment) - groups, shardedFanout, err := buildApplyOperationGroups(plan, taskChanges, targets, req.Environment, applyOpts, cutoverPolicy, onFailure, now, shardedFanoutSupported) + groups, shardedFanout, err := buildApplyOperationGroups(plan, taskChanges, targets, req.Environment, applyOpts, cutoverPolicy, onFailure, now) if err != nil { return nil, 0, err } @@ -998,11 +995,6 @@ func (s *Service) createStoredApply( return apply, storedApplyID, nil } -func clientSupportsShardedApplyFanout(client tern.Client) bool { - capability, ok := client.(tern.ShardedApplyFanoutSupport) - return ok && capability.SupportsShardedApplyFanout() -} - // applyTaskChanges returns the per-table DDL changes that become apply tasks. // VSchema application is no longer modeled as a synthetic task: PlanetScale // surfaces its VSchema status/diff from engine resume metadata, and a sharded @@ -1020,10 +1012,13 @@ func buildApplyOperationGroups( cutoverPolicy string, onFailure string, now time.Time, - shardedFanoutSupported bool, ) ([]*storage.ApplyOperationWithTasks, bool, error) { - if shardedFanoutSupported && canBuildShardedOperationGroups(plan, taskChanges) { - groups, err := buildShardedApplyOperationGroups(plan, taskChanges, targets, environment, applyOpts, cutoverPolicy, onFailure, now) + // Fan a plan out per shard whenever it carries per-shard changes. 
Only an + // instance-local sharded engine (Strata) produces those, so an + // externally-authoritative engine (e.g. PlanetScale) — whose plans never + // carry per-shard changes — is never fanned out, regardless of transport. + if canBuildShardedOperationGroups(plan, taskChanges) { + groups, err := buildShardedApplyOperationGroups(plan, targets, environment, applyOpts, cutoverPolicy, onFailure, now) if err != nil { return nil, false, err } @@ -1066,7 +1061,6 @@ func canBuildShardedOperationGroups(plan *storage.Plan, taskChanges []storage.Ta func buildShardedApplyOperationGroups( plan *storage.Plan, - taskChanges []storage.TableChange, targets []routing.ExecutionTarget, environment string, applyOpts storage.ApplyOptions, @@ -1075,28 +1069,46 @@ func buildShardedApplyOperationGroups( now time.Time, ) ([]*storage.ApplyOperationWithTasks, error) { shardsByNamespace := changingShardsByNamespace(plan.Shards) - groups := make([]*storage.ApplyOperationWithTasks, 0, len(targets)*(len(taskChanges)+1)) + namespaces := make([]string, 0, len(shardsByNamespace)) + for namespace := range shardsByNamespace { + namespaces = append(namespaces, namespace) + } + sort.Strings(namespaces) + + groups := make([]*storage.ApplyOperationWithTasks, 0, len(targets)*(len(plan.Shards)+1)) groupsByTargetAndKey := make(map[string]*storage.ApplyOperationWithTasks) for _, target := range targets { - for _, ddlChange := range taskChanges { - for _, shard := range shardsByNamespace[ddlChange.Namespace] { - if err := validateShardOperationKeyParts(ddlChange.Namespace, shard.Shard, ddlChange.Table); err != nil { - return nil, err - } - operationKey := shardOperationKey(ddlChange.Namespace, shard.Shard, ddlChange.Table) - if len(operationKey) > applyOperationKeyMaxLen { - return nil, fmt.Errorf("operation key for namespace %q shard %q table %q exceeds %d characters", ddlChange.Namespace, shard.Shard, ddlChange.Table, applyOperationKeyMaxLen) - } - groupKey := target.Deployment + "\x00" + operationKey - 
group := groupsByTargetAndKey[groupKey] - if group == nil { - group = &storage.ApplyOperationWithTasks{ - Operation: newPendingApplyOperation(target, operationKey, cutoverPolicy, onFailure, now), + for _, namespace := range namespaces { + for _, shard := range shardsByNamespace[namespace] { + // Each shard is driven from its own changes; it is in + // shardsByNamespace only because those changes are non-empty. + for _, ddlChange := range shard.Changes { + // Fail closed on a malformed per-shard change rather than building + // an operation key or engine request from empty/mismatched fields. + if strings.TrimSpace(ddlChange.Table) == "" || strings.TrimSpace(ddlChange.DDL) == "" || strings.TrimSpace(ddlChange.Operation) == "" { + return nil, fmt.Errorf("namespace %q shard %q has a change with an empty table, DDL, or operation", namespace, shard.Shard) + } + if ddlChange.Namespace != "" && ddlChange.Namespace != namespace { + return nil, fmt.Errorf("namespace %q shard %q change for table %q has mismatched namespace %q", namespace, shard.Shard, ddlChange.Table, ddlChange.Namespace) + } + if err := validateShardOperationKeyParts(namespace, shard.Shard, ddlChange.Table); err != nil { + return nil, err + } + operationKey := shardOperationKey(namespace, shard.Shard, ddlChange.Table) + if len(operationKey) > applyOperationKeyMaxLen { + return nil, fmt.Errorf("operation key for namespace %q shard %q table %q exceeds %d characters", namespace, shard.Shard, ddlChange.Table, applyOperationKeyMaxLen) + } + groupKey := target.Deployment + "\x00" + operationKey + group := groupsByTargetAndKey[groupKey] + if group == nil { + group = &storage.ApplyOperationWithTasks{ + Operation: newPendingApplyOperation(target, operationKey, cutoverPolicy, onFailure, now), + } + groupsByTargetAndKey[groupKey] = group + groups = append(groups, group) } - groupsByTargetAndKey[groupKey] = group - groups = append(groups, group) + group.Tasks = append(group.Tasks, buildApplyTask(plan, ddlChange, 
environment, applyOpts, shard.Shard, now)) } - group.Tasks = append(group.Tasks, buildApplyTask(plan, ddlChange, environment, applyOpts, shard.Shard, now)) } } // Every namespace that changes its VSchema gets a task-less @@ -1163,7 +1175,8 @@ func validateOperationKeyPart(label, value string) error { func changingShardsByNamespace(shards []storage.ShardPlan) map[string][]storage.ShardPlan { shardsByNamespace := make(map[string][]storage.ShardPlan) for _, shard := range shards { - if !shard.NeedsChange { + // A shard is changing iff it carries its own changes. + if len(shard.Changes) == 0 { continue } shardsByNamespace[shard.Namespace] = append(shardsByNamespace[shard.Namespace], shard) diff --git a/pkg/api/proto_helpers.go b/pkg/api/proto_helpers.go index 501e5448..dd5b65da 100644 --- a/pkg/api/proto_helpers.go +++ b/pkg/api/proto_helpers.go @@ -272,11 +272,43 @@ func protoShardPlansToStorage(shards []*ternv1.ShardPlan) ([]storage.ShardPlan, if namespace == "" { namespace = "default" } - out = append(out, storage.ShardPlan{ - Shard: shardName, - Namespace: namespace, - NeedsChange: shard.NeedsChange, - }) + sp := storage.ShardPlan{ + Shard: shardName, + Namespace: namespace, + } + // Carry the shard's own changes so the apply-create fan-out can build + // per-shard tasks from per-shard DDL. This is remote/untrusted data-plane + // input over gRPC, so validate and fail closed rather than persisting + // corrupt plan_data that later builds tasks with empty/mismatched fields. + for j, ch := range shard.Changes { + if ch == nil { + return nil, fmt.Errorf("shard plan %d (namespace %q shard %q) change %d is null", i, namespace, shardName, j) + } + // Trim before validating and persisting: these values cross the gRPC + // boundary and later build operation keys and task rows, so preserved + // surrounding whitespace would yield surprising keys and reconciliation/ + // progress mismatches. 
+ table := strings.TrimSpace(ch.TableName) + changeDDL := strings.TrimSpace(ch.Ddl) + if table == "" || changeDDL == "" { + return nil, fmt.Errorf("shard plan %d (namespace %q shard %q) change %d has an empty table or DDL", i, namespace, shardName, j) + } + switch ch.ChangeType { + case ternv1.ChangeType_CHANGE_TYPE_CREATE, ternv1.ChangeType_CHANGE_TYPE_ALTER, ternv1.ChangeType_CHANGE_TYPE_DROP: + default: + return nil, fmt.Errorf("shard plan %d (namespace %q shard %q) change %d (table %q) has unsupported change type %v", i, namespace, shardName, j, table, ch.ChangeType) + } + if cn := strings.TrimSpace(ch.Namespace); cn != "" && cn != namespace { + return nil, fmt.Errorf("shard plan %d shard %q change %d (table %q) namespace %q disagrees with shard namespace %q", i, shardName, j, table, cn, namespace) + } + sp.Changes = append(sp.Changes, storage.TableChange{ + Namespace: namespace, + Table: table, + DDL: changeDDL, + Operation: protoChangeTypeToOperation(ch.ChangeType), + }) + } + out = append(out, sp) } return out, nil } diff --git a/pkg/api/proto_helpers_test.go b/pkg/api/proto_helpers_test.go index 767c9d9c..976e79e5 100644 --- a/pkg/api/proto_helpers_test.go +++ b/pkg/api/proto_helpers_test.go @@ -160,14 +160,14 @@ func TestProtoChangesToNamespacesRejectsDuplicateDefaultNamespaces(t *testing.T) func TestProtoShardPlansToStorage(t *testing.T) { got, err := protoShardPlansToStorage([]*ternv1.ShardPlan{ - {Namespace: "commerce", Shard: "-80", NeedsChange: true}, - {Shard: "80-", NeedsChange: false}, + {Namespace: "commerce", Shard: "-80"}, + {Shard: "80-"}, }) require.NoError(t, err) assert.Equal(t, []storage.ShardPlan{ - {Namespace: "commerce", Shard: "-80", NeedsChange: true}, - {Namespace: "default", Shard: "80-", NeedsChange: false}, + {Namespace: "commerce", Shard: "-80"}, + {Namespace: "default", Shard: "80-"}, }, got) } diff --git a/pkg/api/sharded_pershard_fanout_test.go b/pkg/api/sharded_pershard_fanout_test.go new file mode 100644 index 
00000000..1b4a5ce7 --- /dev/null +++ b/pkg/api/sharded_pershard_fanout_test.go @@ -0,0 +1,159 @@ +package api + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + ternv1 "github.com/block/schemabot/pkg/proto/ternv1" + "github.com/block/schemabot/pkg/routing" + "github.com/block/schemabot/pkg/storage" +) + +const pershardNamespace = "cdb_resolute_sharded" + +func pershardTargets() []routing.ExecutionTarget { + return []routing.ExecutionTarget{{DatabaseType: storage.DatabaseTypeStrata, Deployment: "cdb-resolute", Target: "cdb-resolute"}} +} + +func pershardTestTime() time.Time { return time.Unix(1700000000, 0).UTC() } + +// operationDDLByKey returns each group's operation key paired with its tasks' DDLs, +// so a test can assert exactly which (shard, table) operations were built and +// what DDL each carries. +func operationDDLByKey(groups []*storage.ApplyOperationWithTasks) map[string][]string { + out := make(map[string][]string, len(groups)) + for _, g := range groups { + ddls := make([]string, 0, len(g.Tasks)) + for _, task := range g.Tasks { + ddls = append(ddls, task.DDL) + } + out[g.Operation.OperationKey] = ddls + } + return out +} + +// Each shard is driven from its own changes, so a keyspace whose shards diverge +// fans out into the per-shard operations that shard actually needs — not a +// uniform cross-product. +func TestBuildShardedApplyOperationGroupsUsesPerShardDDL(t *testing.T) { + mutesDDL := "ALTER TABLE `mutes` ADD INDEX (`created_at`)" + logsDDL := "ALTER TABLE `logs` ADD INDEX (`created_at`)" + + plan := &storage.Plan{ + Database: "cdb-resolute", + Shards: []storage.ShardPlan{ + {Namespace: pershardNamespace, Shard: "-80", Changes: []storage.TableChange{ + {Namespace: pershardNamespace, Table: "mutes", DDL: mutesDDL, Operation: "alter"}, + }}, + // 80- has drifted further: it needs both tables. 
+ {Namespace: pershardNamespace, Shard: "80-", Changes: []storage.TableChange{ + {Namespace: pershardNamespace, Table: "mutes", DDL: mutesDDL, Operation: "alter"}, + {Namespace: pershardNamespace, Table: "logs", DDL: logsDDL, Operation: "alter"}, + }}, + }, + } + + groups, err := buildShardedApplyOperationGroups(plan, pershardTargets(), "production", storage.ApplyOptions{}, "", "", pershardTestTime()) + require.NoError(t, err) + + got := operationDDLByKey(groups) + assert.Equal(t, map[string][]string{ + pershardNamespace + "/-80/mutes": {mutesDDL}, + pershardNamespace + "/80-/mutes": {mutesDDL}, + pershardNamespace + "/80-/logs": {logsDDL}, + }, got, "-80 needs only mutes; 80- needs mutes and logs") + + for _, g := range groups { + require.Len(t, g.Tasks, 1) + assert.NotEmpty(t, g.Tasks[0].Shard, "every task is shard-tagged") + } +} + +// A shard with no changes is not a changing shard, so it produces no operations +// (membership is implied by changes, not a separate flag). +func TestBuildShardedApplyOperationGroupsSkipsShardsWithoutChanges(t *testing.T) { + mutesDDL := "ALTER TABLE `mutes` ADD INDEX (`created_at`)" + plan := &storage.Plan{ + Database: "cdb-resolute", + Shards: []storage.ShardPlan{ + {Namespace: pershardNamespace, Shard: "-80", Changes: []storage.TableChange{ + {Namespace: pershardNamespace, Table: "mutes", DDL: mutesDDL, Operation: "alter"}, + }}, + {Namespace: pershardNamespace, Shard: "80-"}, // unchanged: no changes + }, + } + + groups, err := buildShardedApplyOperationGroups(plan, pershardTargets(), "production", storage.ApplyOptions{}, "", "", pershardTestTime()) + require.NoError(t, err) + + assert.Equal(t, map[string][]string{ + pershardNamespace + "/-80/mutes": {mutesDDL}, + }, operationDDLByKey(groups), "only the shard with changes fans out; the unchanged shard stays out") +} + +// Per-shard change data is remote/untrusted (it crosses the gRPC boundary), so a +// malformed change fails closed rather than persisting corrupt plan_data. 
+func TestProtoShardPlansToStorageFailsClosedOnMalformedChange(t *testing.T) { + shardWith := func(ch *ternv1.TableChange) []*ternv1.ShardPlan { + return []*ternv1.ShardPlan{{Namespace: pershardNamespace, Shard: "-80", Changes: []*ternv1.TableChange{ch}}} + } + cases := []struct { + name string + ch *ternv1.TableChange + want string + }{ + {"nil change", nil, "is null"}, + {"empty table", &ternv1.TableChange{Ddl: "ALTER TABLE `mutes` ADD INDEX (`x`)", ChangeType: ternv1.ChangeType_CHANGE_TYPE_ALTER}, "empty table or DDL"}, + {"empty ddl", &ternv1.TableChange{TableName: "mutes", ChangeType: ternv1.ChangeType_CHANGE_TYPE_ALTER}, "empty table or DDL"}, + {"unsupported type", &ternv1.TableChange{TableName: "mutes", Ddl: "x", ChangeType: ternv1.ChangeType_CHANGE_TYPE_VSCHEMA}, "unsupported change type"}, + {"namespace mismatch", &ternv1.TableChange{TableName: "mutes", Ddl: "x", ChangeType: ternv1.ChangeType_CHANGE_TYPE_ALTER, Namespace: "other"}, "disagrees with shard namespace"}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + _, err := protoShardPlansToStorage(shardWith(tc.ch)) + require.Error(t, err) + assert.Contains(t, err.Error(), tc.want) + }) + } +} + +// A malformed per-shard change in the stored plan fails closed before building an +// operation key or engine request from empty/mismatched fields. 
+func TestBuildShardedApplyOperationGroupsFailsClosedOnMalformedChange(t *testing.T) { + plan := &storage.Plan{ + Database: "cdb-resolute", + Shards: []storage.ShardPlan{{ + Namespace: pershardNamespace, Shard: "-80", + Changes: []storage.TableChange{{Namespace: pershardNamespace, Table: "", DDL: "ALTER TABLE `mutes` ADD INDEX (`x`)", Operation: "alter"}}, + }}, + } + _, err := buildShardedApplyOperationGroups(plan, pershardTargets(), "production", storage.ApplyOptions{}, "", "", pershardTestTime()) + require.Error(t, err) + assert.Contains(t, err.Error(), "empty table") +} + +// Per-shard changes survive the proto boundary so the control plane rebuilds the +// fan-out from each shard's own DDL. +func TestProtoShardPlansToStorageCarriesPerShardChanges(t *testing.T) { + got, err := protoShardPlansToStorage([]*ternv1.ShardPlan{{ + Namespace: pershardNamespace, + Shard: "-80", + Changes: []*ternv1.TableChange{{ + Namespace: pershardNamespace, + TableName: "mutes", + Ddl: "ALTER TABLE `mutes` ADD INDEX (`created_at`)", + ChangeType: ternv1.ChangeType_CHANGE_TYPE_ALTER, + }}, + }}) + + require.NoError(t, err) + require.Len(t, got, 1) + require.Len(t, got[0].Changes, 1) + assert.Equal(t, "mutes", got[0].Changes[0].Table) + assert.Equal(t, pershardNamespace, got[0].Changes[0].Namespace) + assert.Equal(t, "ALTER TABLE `mutes` ADD INDEX (`created_at`)", got[0].Changes[0].DDL) + assert.Equal(t, "alter", got[0].Changes[0].Operation) +} diff --git a/pkg/proto/tern.proto b/pkg/proto/tern.proto index 1fc8e9ee..b5fc2974 100644 --- a/pkg/proto/tern.proto +++ b/pkg/proto/tern.proto @@ -426,13 +426,19 @@ message LintViolation { string severity = 6; // "error", "warning", or "info" } -// ShardPlan reports per-shard membership and drift for a namespace. A shard is -// listed with needs_change=true when its current schema differs from the -// desired schema. +// ShardPlan reports a shard's own changes for a namespace. 
A shard is changing +// when changes is non-empty; carrying the changes (rather than a separate +// needs_change flag) means a keyspace whose shards diverge — drift, or a +// partially-applied canary rollout — is represented per shard, and the exact +// reviewed DDL is what gets applied (no plan-time/apply-time TOCTOU where a +// membership flag was saved but the DDL was not). message ShardPlan { + // needs_change (field 3) was removed pre-release: a shard is changing iff + // changes is non-empty. No reserved marker — this proto has no released + // consumers, so reusing the field number is a safe, deliberate breaking change. string shard = 1; string namespace = 2; - bool needs_change = 3; + repeated TableChange changes = 3; } // PlanResponse contains the generated schema change plan. diff --git a/pkg/proto/ternv1/tern.pb.go b/pkg/proto/ternv1/tern.pb.go index eea66a13..a0769684 100644 --- a/pkg/proto/ternv1/tern.pb.go +++ b/pkg/proto/ternv1/tern.pb.go @@ -1369,14 +1369,20 @@ func (x *LintViolation) GetSeverity() string { return "" } -// ShardPlan reports per-shard membership and drift for a namespace. A shard is -// listed with needs_change=true when its current schema differs from the -// desired schema. +// ShardPlan reports a shard's own changes for a namespace. A shard is changing +// when changes is non-empty; carrying the changes (rather than a separate +// needs_change flag) means a keyspace whose shards diverge — drift, or a +// partially-applied canary rollout — is represented per shard, and the exact +// reviewed DDL is what gets applied (no plan-time/apply-time TOCTOU where a +// membership flag was saved but the DDL was not). 
type ShardPlan struct { - state protoimpl.MessageState `protogen:"open.v1"` - Shard string `protobuf:"bytes,1,opt,name=shard,proto3" json:"shard,omitempty"` - Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"` - NeedsChange bool `protobuf:"varint,3,opt,name=needs_change,json=needsChange,proto3" json:"needs_change,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` + // needs_change (field 3) was removed pre-release: a shard is changing iff + // changes is non-empty. No reserved marker — this proto has no released + // consumers, so reusing the field number is a safe, deliberate breaking change. + Shard string `protobuf:"bytes,1,opt,name=shard,proto3" json:"shard,omitempty"` + Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"` + Changes []*TableChange `protobuf:"bytes,3,rep,name=changes,proto3" json:"changes,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -1425,11 +1431,11 @@ func (x *ShardPlan) GetNamespace() string { return "" } -func (x *ShardPlan) GetNeedsChange() bool { +func (x *ShardPlan) GetChanges() []*TableChange { if x != nil { - return x.NeedsChange + return x.Changes } - return false + return nil } // PlanResponse contains the generated schema change plan. 
@@ -3078,11 +3084,11 @@ const file_tern_proto_rawDesc = "" + "\x06column\x18\x03 \x01(\tR\x06column\x12\x16\n" + "\x06linter\x18\x04 \x01(\tR\x06linter\x12\x19\n" + "\bfix_type\x18\x05 \x01(\tR\afixType\x12\x1a\n" + - "\bseverity\x18\x06 \x01(\tR\bseverity\"b\n" + + "\bseverity\x18\x06 \x01(\tR\bseverity\"o\n" + "\tShardPlan\x12\x14\n" + "\x05shard\x18\x01 \x01(\tR\x05shard\x12\x1c\n" + - "\tnamespace\x18\x02 \x01(\tR\tnamespace\x12!\n" + - "\fneeds_change\x18\x03 \x01(\bR\vneedsChange\"\x86\x02\n" + + "\tnamespace\x18\x02 \x01(\tR\tnamespace\x12.\n" + + "\achanges\x18\x03 \x03(\v2\x14.tern.v1.TableChangeR\achanges\"\x86\x02\n" + "\fPlanResponse\x12\x17\n" + "\aplan_id\x18\x01 \x01(\tR\x06planId\x12'\n" + "\x06engine\x18\x02 \x01(\x0e2\x0f.tern.v1.EngineR\x06engine\x12/\n" + @@ -3350,50 +3356,51 @@ var file_tern_proto_depIdxs = []int32{ 14, // 12: tern.v1.SchemaChange.table_changes:type_name -> tern.v1.TableChange 45, // 13: tern.v1.SchemaChange.metadata:type_name -> tern.v1.SchemaChange.MetadataEntry 46, // 14: tern.v1.SchemaChange.original_files:type_name -> tern.v1.SchemaChange.OriginalFilesEntry - 0, // 15: tern.v1.PlanResponse.engine:type_name -> tern.v1.Engine - 15, // 16: tern.v1.PlanResponse.changes:type_name -> tern.v1.SchemaChange - 16, // 17: tern.v1.PlanResponse.lint_violations:type_name -> tern.v1.LintViolation - 17, // 18: tern.v1.PlanResponse.shards:type_name -> tern.v1.ShardPlan - 47, // 19: tern.v1.ApplyRequest.options:type_name -> tern.v1.ApplyRequest.OptionsEntry - 48, // 20: tern.v1.ApplyRequest.schema_files:type_name -> tern.v1.ApplyRequest.SchemaFilesEntry - 14, // 21: tern.v1.ApplyRequest.ddl_changes:type_name -> tern.v1.TableChange - 22, // 22: tern.v1.TableProgress.shards:type_name -> tern.v1.ShardProgress - 2, // 23: tern.v1.TableProgress.change_type:type_name -> tern.v1.ChangeType - 1, // 24: tern.v1.ProgressResponse.state:type_name -> tern.v1.State - 0, // 25: tern.v1.ProgressResponse.engine:type_name -> tern.v1.Engine - 23, // 26: 
tern.v1.ProgressResponse.tables:type_name -> tern.v1.TableProgress - 49, // 27: tern.v1.ProgressResponse.metadata:type_name -> tern.v1.ProgressResponse.MetadataEntry - 8, // 28: tern.v1.PulledNamespace.TableCatalogEntry.value:type_name -> tern.v1.TableCatalog - 6, // 29: tern.v1.PullSchemaResponse.NamespacesEntry.value:type_name -> tern.v1.PulledNamespace - 4, // 30: tern.v1.PlanRequest.SchemaFilesEntry.value:type_name -> tern.v1.SchemaFiles - 4, // 31: tern.v1.ApplyRequest.SchemaFilesEntry.value:type_name -> tern.v1.SchemaFiles - 5, // 32: tern.v1.Tern.PullSchema:input_type -> tern.v1.PullSchemaRequest - 13, // 33: tern.v1.Tern.Plan:input_type -> tern.v1.PlanRequest - 19, // 34: tern.v1.Tern.Apply:input_type -> tern.v1.ApplyRequest - 21, // 35: tern.v1.Tern.Progress:input_type -> tern.v1.ProgressRequest - 25, // 36: tern.v1.Tern.Cutover:input_type -> tern.v1.CutoverRequest - 27, // 37: tern.v1.Tern.Revert:input_type -> tern.v1.RevertRequest - 29, // 38: tern.v1.Tern.SkipRevert:input_type -> tern.v1.SkipRevertRequest - 31, // 39: tern.v1.Tern.Health:input_type -> tern.v1.HealthRequest - 33, // 40: tern.v1.Tern.Stop:input_type -> tern.v1.StopRequest - 35, // 41: tern.v1.Tern.Start:input_type -> tern.v1.StartRequest - 37, // 42: tern.v1.Tern.Volume:input_type -> tern.v1.VolumeRequest - 12, // 43: tern.v1.Tern.PullSchema:output_type -> tern.v1.PullSchemaResponse - 18, // 44: tern.v1.Tern.Plan:output_type -> tern.v1.PlanResponse - 20, // 45: tern.v1.Tern.Apply:output_type -> tern.v1.ApplyResponse - 24, // 46: tern.v1.Tern.Progress:output_type -> tern.v1.ProgressResponse - 26, // 47: tern.v1.Tern.Cutover:output_type -> tern.v1.CutoverResponse - 28, // 48: tern.v1.Tern.Revert:output_type -> tern.v1.RevertResponse - 30, // 49: tern.v1.Tern.SkipRevert:output_type -> tern.v1.SkipRevertResponse - 32, // 50: tern.v1.Tern.Health:output_type -> tern.v1.HealthResponse - 34, // 51: tern.v1.Tern.Stop:output_type -> tern.v1.StopResponse - 36, // 52: tern.v1.Tern.Start:output_type 
-> tern.v1.StartResponse - 38, // 53: tern.v1.Tern.Volume:output_type -> tern.v1.VolumeResponse - 43, // [43:54] is the sub-list for method output_type - 32, // [32:43] is the sub-list for method input_type - 32, // [32:32] is the sub-list for extension type_name - 32, // [32:32] is the sub-list for extension extendee - 0, // [0:32] is the sub-list for field type_name + 14, // 15: tern.v1.ShardPlan.changes:type_name -> tern.v1.TableChange + 0, // 16: tern.v1.PlanResponse.engine:type_name -> tern.v1.Engine + 15, // 17: tern.v1.PlanResponse.changes:type_name -> tern.v1.SchemaChange + 16, // 18: tern.v1.PlanResponse.lint_violations:type_name -> tern.v1.LintViolation + 17, // 19: tern.v1.PlanResponse.shards:type_name -> tern.v1.ShardPlan + 47, // 20: tern.v1.ApplyRequest.options:type_name -> tern.v1.ApplyRequest.OptionsEntry + 48, // 21: tern.v1.ApplyRequest.schema_files:type_name -> tern.v1.ApplyRequest.SchemaFilesEntry + 14, // 22: tern.v1.ApplyRequest.ddl_changes:type_name -> tern.v1.TableChange + 22, // 23: tern.v1.TableProgress.shards:type_name -> tern.v1.ShardProgress + 2, // 24: tern.v1.TableProgress.change_type:type_name -> tern.v1.ChangeType + 1, // 25: tern.v1.ProgressResponse.state:type_name -> tern.v1.State + 0, // 26: tern.v1.ProgressResponse.engine:type_name -> tern.v1.Engine + 23, // 27: tern.v1.ProgressResponse.tables:type_name -> tern.v1.TableProgress + 49, // 28: tern.v1.ProgressResponse.metadata:type_name -> tern.v1.ProgressResponse.MetadataEntry + 8, // 29: tern.v1.PulledNamespace.TableCatalogEntry.value:type_name -> tern.v1.TableCatalog + 6, // 30: tern.v1.PullSchemaResponse.NamespacesEntry.value:type_name -> tern.v1.PulledNamespace + 4, // 31: tern.v1.PlanRequest.SchemaFilesEntry.value:type_name -> tern.v1.SchemaFiles + 4, // 32: tern.v1.ApplyRequest.SchemaFilesEntry.value:type_name -> tern.v1.SchemaFiles + 5, // 33: tern.v1.Tern.PullSchema:input_type -> tern.v1.PullSchemaRequest + 13, // 34: tern.v1.Tern.Plan:input_type -> tern.v1.PlanRequest + 
19, // 35: tern.v1.Tern.Apply:input_type -> tern.v1.ApplyRequest + 21, // 36: tern.v1.Tern.Progress:input_type -> tern.v1.ProgressRequest + 25, // 37: tern.v1.Tern.Cutover:input_type -> tern.v1.CutoverRequest + 27, // 38: tern.v1.Tern.Revert:input_type -> tern.v1.RevertRequest + 29, // 39: tern.v1.Tern.SkipRevert:input_type -> tern.v1.SkipRevertRequest + 31, // 40: tern.v1.Tern.Health:input_type -> tern.v1.HealthRequest + 33, // 41: tern.v1.Tern.Stop:input_type -> tern.v1.StopRequest + 35, // 42: tern.v1.Tern.Start:input_type -> tern.v1.StartRequest + 37, // 43: tern.v1.Tern.Volume:input_type -> tern.v1.VolumeRequest + 12, // 44: tern.v1.Tern.PullSchema:output_type -> tern.v1.PullSchemaResponse + 18, // 45: tern.v1.Tern.Plan:output_type -> tern.v1.PlanResponse + 20, // 46: tern.v1.Tern.Apply:output_type -> tern.v1.ApplyResponse + 24, // 47: tern.v1.Tern.Progress:output_type -> tern.v1.ProgressResponse + 26, // 48: tern.v1.Tern.Cutover:output_type -> tern.v1.CutoverResponse + 28, // 49: tern.v1.Tern.Revert:output_type -> tern.v1.RevertResponse + 30, // 50: tern.v1.Tern.SkipRevert:output_type -> tern.v1.SkipRevertResponse + 32, // 51: tern.v1.Tern.Health:output_type -> tern.v1.HealthResponse + 34, // 52: tern.v1.Tern.Stop:output_type -> tern.v1.StopResponse + 36, // 53: tern.v1.Tern.Start:output_type -> tern.v1.StartResponse + 38, // 54: tern.v1.Tern.Volume:output_type -> tern.v1.VolumeResponse + 44, // [44:55] is the sub-list for method output_type + 33, // [33:44] is the sub-list for method input_type + 33, // [33:33] is the sub-list for extension type_name + 33, // [33:33] is the sub-list for extension extendee + 0, // [0:33] is the sub-list for field type_name } func init() { file_tern_proto_init() } diff --git a/pkg/storage/mysqlstore/plans_test.go b/pkg/storage/mysqlstore/plans_test.go index 17394b67..b364d269 100644 --- a/pkg/storage/mysqlstore/plans_test.go +++ b/pkg/storage/mysqlstore/plans_test.go @@ -42,8 +42,14 @@ func TestPlanStore_RoundTripsShardPlans(t 
*testing.T) { }, }, Shards: []storage.ShardPlan{ - {Namespace: "commerce", Shard: "80-", NeedsChange: false}, - {Namespace: "commerce", Shard: "-80", NeedsChange: true}, + {Namespace: "commerce", Shard: "80-"}, + // -80 carries its own per-shard DDL; the round-trip must preserve it. + {Namespace: "commerce", Shard: "-80", Changes: []storage.TableChange{{ + Namespace: "commerce", + Table: "users", + DDL: "ALTER TABLE `users` ADD COLUMN `email` varchar(255)", + Operation: "alter", + }}}, }, CreatedAt: time.Now(), }) @@ -54,9 +60,14 @@ func TestPlanStore_RoundTripsShardPlans(t *testing.T) { require.NotNil(t, got) assert.Equal(t, []storage.ShardPlan{ - {Namespace: "commerce", Shard: "-80", NeedsChange: true}, - {Namespace: "commerce", Shard: "80-", NeedsChange: false}, - }, got.Shards) + {Namespace: "commerce", Shard: "-80", Changes: []storage.TableChange{{ + Namespace: "commerce", + Table: "users", + DDL: "ALTER TABLE `users` ADD COLUMN `email` varchar(255)", + Operation: "alter", + }}}, + {Namespace: "commerce", Shard: "80-"}, + }, got.Shards, "per-shard Changes must round-trip through plan_data JSON") require.Contains(t, got.Namespaces, "commerce") assert.Equal(t, got.Shards, got.Namespaces["commerce"].Shards) assert.Equal(t, []storage.TableChange{{ diff --git a/pkg/storage/types.go b/pkg/storage/types.go index 12d55e21..7067cfad 100644 --- a/pkg/storage/types.go +++ b/pkg/storage/types.go @@ -270,9 +270,14 @@ type NamespacePlanData struct { // sharded namespace. It is generic storage metadata used by apply-create to // reconstruct operation groups after the original plan request has returned. type ShardPlan struct { - Shard string `json:"shard"` - Namespace string `json:"namespace,omitempty"` - NeedsChange bool `json:"needs_change,omitempty"` + Shard string `json:"shard"` + Namespace string `json:"namespace,omitempty"` + // Changes are this shard's own table changes; a shard is changing when this + // is non-empty. 
Persisting the changes (rather than a separate membership + // flag) lets a keyspace whose shards diverge — drift, or a partially-applied + // canary rollout — be represented per shard, and makes the reviewed DDL the + // exact DDL that gets applied. + Changes []TableChange `json:"changes,omitempty"` } // Plan represents a schema change plan generated by tern.Client.Plan(). diff --git a/pkg/tern/client.go b/pkg/tern/client.go index b1ace53d..50fe8933 100644 --- a/pkg/tern/client.go +++ b/pkg/tern/client.go @@ -120,9 +120,3 @@ type Client interface { // Close releases any resources held by the client. Close() error } - -// ShardedApplyFanoutSupport is an optional client capability for engines that -// can drive sharded work as independent SchemaBot apply operations. -type ShardedApplyFanoutSupport interface { - SupportsShardedApplyFanout() bool -} diff --git a/pkg/tern/grpc_client.go b/pkg/tern/grpc_client.go index aed71be0..753be0ed 100644 --- a/pkg/tern/grpc_client.go +++ b/pkg/tern/grpc_client.go @@ -1008,6 +1008,18 @@ func (s applyTaskScope) suppressesDirectParentApplyWrites() bool { return s.usesOperationRemoteResume() } +// finalizerOperationScope reports whether this drive owns a task-less +// group_finalizer operation. Such an operation has no task rows, so the +// operator's task-derived operation→parent projection can never move its +// operation row off pending: the terminal remote state (completion or failure) +// would be lost. A finalizer drive must therefore persist its own operation +// row's terminal state, mirroring LocalClient.driveGroupFinalizer. +func (s applyTaskScope) finalizerOperationScope() bool { + return s.usesOperationRemoteResume() && + s.operation != nil && + s.operation.OperationKind == storage.ApplyOperationKindGroupFinalizer +} + // remoteApplyID resolves the remote Tern apply id sent on this drive's // Progress/Stop/Start/Cutover calls. 
Multi-operation drives read the claimed // operation's engine_resume_context (which may be empty before dispatch); @@ -1189,7 +1201,14 @@ func (c *GRPCClient) ResumeApplyOperation(ctx context.Context, apply *storage.Ap if err != nil { return err } - // Fail closed before any dispatch or state mutation: an operation that + // A group_finalizer is task-less by design: it applies the namespace VSchema + // once its sibling shard work completes. Drive it over gRPC as a VSchema-only + // apply rather than failing closed on the empty task set, mirroring + // LocalClient.ResumeApplyOperation's finalizer branch. + if scope.operation != nil && scope.operation.OperationKind == storage.ApplyOperationKindGroupFinalizer { + return c.dispatchRemoteGroupFinalizer(ctx, apply, scope) + } + // Fail closed before any dispatch or state mutation: a work operation that // resolves to no tasks is an invalid or stale claim. The shared resume path // would otherwise mark the whole parent apply failed (dispatchPendingApply // and the remote-failure sites set applies.state regardless of scope), which @@ -1205,6 +1224,86 @@ func (c *GRPCClient) ResumeApplyOperation(ctx context.Context, apply *storage.Ap return c.resumeApply(ctx, apply, scope) } +// dispatchRemoteGroupFinalizer drives a task-less group_finalizer apply_operation +// over gRPC. It is the remote counterpart to LocalClient.driveGroupFinalizer: +// the control plane cannot run the engine, so it dispatches the namespace's +// VSchema as a VSchema-only apply (no DDL, no target shards) to the data plane, +// which applies it via its task-less VSchema-only path +// (isTasklessVSchemaOnlyPlan); records the remote apply id on the operation; and +// polls to completion. Carrying both a VSchema change and the plan's schema +// files lets the remote drive it whether it has the plan locally or materializes +// it from the dispatch. 
+func (c *GRPCClient) dispatchRemoteGroupFinalizer(ctx context.Context, apply *storage.Apply, scope applyTaskScope) error { + op := scope.operation + namespace := namespaceFromFinalizerKey(op.OperationKey) + if namespace == "" { + return fmt.Errorf("group_finalizer apply_operation %d (apply %s): malformed operation key %q", op.ID, apply.ApplyIdentifier, op.OperationKey) + } + plan, err := c.storage.Plans().GetByID(ctx, apply.PlanID) + if err != nil { + return fmt.Errorf("load plan %d for group_finalizer apply_operation %d (apply %s): %w", apply.PlanID, op.ID, apply.ApplyIdentifier, err) + } + if plan == nil { + return fmt.Errorf("plan %d not found for group_finalizer apply_operation %d (apply %s)", apply.PlanID, op.ID, apply.ApplyIdentifier) + } + // Fail closed if the namespace carries no VSchema artifact, mirroring the + // local finalizer drive. + if _, err := finalizerVSchemaChanges(plan, namespace); err != nil { + return fmt.Errorf("group_finalizer apply_operation %d (apply %s): %w", op.ID, apply.ApplyIdentifier, err) + } + + // Dispatch only if this operation has not already been dispatched. On resume + // the recorded remote apply id lets us poll the existing remote apply instead + // of starting a duplicate. 
+ if scope.remoteApplyID(apply) == "" { + options := effectiveCopyDriveOptions(apply, scope.multiOperation, scope.operation).Map() + target := options["target"] + if target == "" { + target = apply.Database + } + if handled, err := c.processPendingStopControlRequest(ctx, apply, scope); handled || err != nil { + return err + } + resp, err := c.client.Apply(ctx, &ternv1.ApplyRequest{ + PlanId: plan.PlanIdentifier, + Options: options, + Database: apply.Database, + Type: apply.DatabaseType, + DdlChanges: []*ternv1.TableChange{{Namespace: namespace, TableName: "VSchema: " + namespace, ChangeType: ternv1.ChangeType_CHANGE_TYPE_VSCHEMA}}, + SchemaFiles: schemaFilesToProto(plan.SchemaFiles), + Environment: apply.Environment, + Target: target, + Caller: apply.Caller, + // No TargetShards: the VSchema is namespace-level, not per shard. + IdempotencyKey: remoteApplyIdempotencyKey(apply, scope), + }) + if err != nil { + if isAmbiguousRemoteApplyDispatchError(err) { + return fmt.Errorf("group_finalizer apply_operation %d (apply %s) has ambiguous remote dispatch outcome: %w", op.ID, apply.ApplyIdentifier, err) + } + if markErr := c.markRemoteApplyFailed(ctx, apply, nil, err.Error(), isRetryableRemoteApplyError(err), scope); markErr != nil { + return fmt.Errorf("mark group_finalizer apply_operation %d failed after remote apply error: %w", op.ID, markErr) + } + return fmt.Errorf("dispatch group_finalizer apply_operation %d (apply %s): %w", op.ID, apply.ApplyIdentifier, err) + } + if resp == nil || !resp.Accepted || resp.ApplyId == "" { + errMsg := "remote group_finalizer apply was not accepted" + if resp != nil && resp.ErrorMessage != "" { + errMsg = resp.ErrorMessage + } + if markErr := c.markRemoteApplyFailed(ctx, apply, nil, errMsg, false, scope); markErr != nil { + return fmt.Errorf("mark group_finalizer apply_operation %d failed: %w", op.ID, markErr) + } + return fmt.Errorf("dispatch group_finalizer apply_operation %d (apply %s): %s", op.ID, apply.ApplyIdentifier, errMsg) + } 
+ if err := c.persistRemoteApplyID(ctx, apply, scope, resp.ApplyId); err != nil { + return fmt.Errorf("store remote apply id for group_finalizer apply_operation %d: %w", op.ID, err) + } + } + + return c.pollForCompletion(ctx, apply, false, scope, false) +} + // ResumeApplyOperationCutover drives a single barrier-parked apply_operation // through its cutover phase over the remote (gRPC) path. It is the // deployment-ordered counterpart to ResumeApplyOperation's copy drive: the @@ -2208,6 +2307,16 @@ func (c *GRPCClient) markRemoteApplyFailedWithOptions(ctx context.Context, remot // the parent applies.state via the projection CAS; the driver must not write // the parent failure or run its parent-level side effects. if scope.suppressesDirectParentApplyWrites() { + // A task-less group_finalizer has no task rows to carry the failure, so the + // operator could never derive its failed operation row. Mark it failed + // directly on a non-retryable failure; a retryable failure is left + // non-terminal so the operator re-drives the operation (and re-polls the + // existing remote apply). + if scope.finalizerOperationScope() && !retryable { + if err := c.storage.ApplyOperations().MarkFailed(ctx, scope.applyOperationID, message); err != nil { + return fmt.Errorf("mark group_finalizer apply_operation %d failed after remote apply failure: %w", scope.applyOperationID, err) + } + } slog.Debug("recorded operation task failures during multi-operation drive; parent failure is owned by the rollout projection", "apply_id", storedApply.ApplyIdentifier, "apply_db_id", storedApply.ID, @@ -2323,6 +2432,13 @@ func (c *GRPCClient) persistTerminalStateFromRemote(ctx context.Context, storedA // projection CAS and completes any parent control requests. The driver must // not write the parent row or run its parent-level side effects here. 
if scope.suppressesDirectParentApplyWrites() { + // A task-less group_finalizer has no task rows for the operator to derive + // the operation row from, so persist its terminal state directly here. + if scope.finalizerOperationScope() { + if err := c.persistFinalizerOperationTerminalState(ctx, scope.applyOperationID, remoteApply.State, remoteApply.ErrorMessage); err != nil { + return err + } + } slog.Debug("skipping parent terminal write during multi-operation drive; operation tasks are resolved and parent state is owned by the rollout projection", "apply_id", storedApply.ApplyIdentifier, "apply_db_id", storedApply.ID, @@ -2359,6 +2475,27 @@ func (c *GRPCClient) persistTerminalStateFromRemote(ctx context.Context, storedA return nil } +// persistFinalizerOperationTerminalState reflects a remote terminal state onto a +// task-less group_finalizer operation row. Because such an operation carries no +// task rows, the operator's task-derived projection can never move it: the drive +// owns its terminal transition, mirroring LocalClient.driveGroupFinalizer +// (MarkCompleted on success, MarkFailed on failure). Non-completed/non-failed +// terminal states (stopped/cancelled) stay owned by the operator's stop/cancel +// handling, so the drive leaves the operation row untouched for those. 
+func (c *GRPCClient) persistFinalizerOperationTerminalState(ctx context.Context, applyOperationID int64, terminalState, errMsg string) error { + switch { + case state.IsState(terminalState, state.Apply.Completed): + if err := c.storage.ApplyOperations().MarkCompleted(ctx, applyOperationID); err != nil { + return fmt.Errorf("mark group_finalizer apply_operation %d completed from remote terminal state: %w", applyOperationID, err) + } + case state.IsState(terminalState, state.Apply.Failed): + if err := c.storage.ApplyOperations().MarkFailed(ctx, applyOperationID, errMsg); err != nil { + return fmt.Errorf("mark group_finalizer apply_operation %d failed from remote terminal state: %w", applyOperationID, err) + } + } + return nil +} + func remoteTerminalApplyLogLevel(apply *storage.Apply) string { if apply != nil && state.IsState(apply.State, state.Apply.Failed) { return storage.LogLevelError diff --git a/pkg/tern/grpc_client_test.go b/pkg/tern/grpc_client_test.go index 14c9e435..e070018a 100644 --- a/pkg/tern/grpc_client_test.go +++ b/pkg/tern/grpc_client_test.go @@ -20,6 +20,7 @@ import ( "google.golang.org/grpc/status" ternv1 "github.com/block/schemabot/pkg/proto/ternv1" + "github.com/block/schemabot/pkg/schema" "github.com/block/schemabot/pkg/state" "github.com/block/schemabot/pkg/storage" ) @@ -734,6 +735,29 @@ func (m *mockApplyOperationStore) MarkTerminal(_ context.Context, id int64, newS return nil } +func (m *mockApplyOperationStore) MarkCompleted(_ context.Context, id int64) error { + op, ok := m.ops[id] + if !ok { + return storage.ErrApplyOperationNotFound + } + now := time.Now() + op.State = state.ApplyOperation.Completed + op.CompletedAt = &now + return nil +} + +func (m *mockApplyOperationStore) MarkFailed(_ context.Context, id int64, errMsg string) error { + op, ok := m.ops[id] + if !ok { + return storage.ErrApplyOperationNotFound + } + now := time.Now() + op.State = state.ApplyOperation.Failed + op.ErrorMessage = errMsg + op.CompletedAt = &now + return 
nil +} + func (m *mockApplyOperationStore) SaveEngineResumeState(_ context.Context, operationID int64, resumeState *storage.EngineResumeState) error { if m.saveErr != nil { return m.saveErr @@ -958,6 +982,154 @@ func TestGRPCClient_ResumeApplyOperationDispatchesScopedTasks(t *testing.T) { assert.Equal(t, []string{"-80"}, req.TargetShards) } +func TestGRPCClient_ResumeApplyOperationDispatchesGroupFinalizerAsVSchemaOnly(t *testing.T) { + // A task-less group_finalizer operation is driven over the remote path by + // dispatching the namespace VSchema as a VSchema-only apply (no DDL, no target + // shards) rather than failing closed on the empty task set. The data plane + // applies it via its task-less VSchema-only path. + server := &capturingTernServer{remoteApplyID: "remote-finalizer-1"} // default Progress = COMPLETED + client, cleanup := testCapturingGRPCClient(t, server) + defer cleanup() + + apply := &storage.Apply{ + ID: 8, + ApplyIdentifier: "apply-finalizer", + PlanID: 99, + Database: "commerce", + DatabaseType: storage.DatabaseTypeStrata, + Environment: "staging", + State: state.Apply.Pending, + } + apply.SetOptions(storage.ApplyOptions{Target: "commerce-target"}) + operationID := int64(51) + client.storage = &mockStorage{ + applies: &mockApplyStore{apply: apply}, + tasks: &mockTaskStore{getByApplyIDErr: errors.New("finalizer drive must not load tasks")}, + plans: &mockPlanStore{plan: &storage.Plan{ + ID: apply.PlanID, + PlanIdentifier: "plan-finalizer", + SchemaFiles: schema.SchemaFiles{ + "commerce": {Files: map[string]string{vSchemaArtifactName: `{"sharded":true}`}}, + }, + Namespaces: map[string]*storage.NamespacePlanData{ + "commerce": {Artifacts: map[string]string{vSchemaArtifactName: `{"sharded":true}`}}, + }, + }}, + operations: &mockApplyOperationStore{ops: map[int64]*storage.ApplyOperation{ + operationID: { + ID: operationID, + ApplyID: apply.ID, + Deployment: "commerce-deployment", + OperationKey: "commerce/group_finalizer", + OperationKind: 
storage.ApplyOperationKindGroupFinalizer, + State: state.ApplyOperation.Pending, + }, + }}, + } + + ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second) + defer cancel() + err := client.ResumeApplyOperation(ctx, apply, operationID) + require.NoError(t, err, "a group_finalizer must dispatch, not fail closed on no tasks") + + req := server.getApplyRequest() + require.NotNil(t, req, "expected the finalizer to dispatch a VSchema apply to remote Tern") + require.Len(t, req.DdlChanges, 1) + assert.Equal(t, ternv1.ChangeType_CHANGE_TYPE_VSCHEMA, req.DdlChanges[0].ChangeType) + assert.Equal(t, "commerce", req.DdlChanges[0].Namespace) + assert.Empty(t, req.TargetShards, "a namespace VSchema apply targets no shard") + assert.Contains(t, req.SchemaFiles, "commerce", "the vschema.json must be carried for a materializing remote") +} + +func TestGRPCClient_ResumeApplyOperationReflectsFinalizerTerminalStateOntoOperationRow(t *testing.T) { + // A task-less group_finalizer in a multi-operation apply carries no task rows, + // so the operator's task-derived projection can never move its operation row. + // The remote drive must therefore reflect the remote terminal state onto the + // operation row itself, or the operation is stranded in pending and the + // operator retries forever (and a terminal failure is silently lost). 
+ cases := []struct { + name string + progress ternv1.State + progressSet bool + wantState string + }{ + {name: "completed", wantState: state.ApplyOperation.Completed}, // default Progress = COMPLETED + {name: "failed", progress: ternv1.State_STATE_FAILED, progressSet: true, wantState: state.ApplyOperation.Failed}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + server := &capturingTernServer{ + remoteApplyID: "remote-finalizer-terminal", + progressState: tc.progress, + progressStateSet: tc.progressSet, + } + client, cleanup := testCapturingGRPCClient(t, server) + defer cleanup() + + apply := &storage.Apply{ + ID: 8, + ApplyIdentifier: "apply-finalizer-terminal", + PlanID: 99, + Database: "commerce", + DatabaseType: storage.DatabaseTypeStrata, + Environment: "staging", + State: state.Apply.Pending, + } + apply.SetOptions(storage.ApplyOptions{Target: "commerce-target"}) + // The drive mutates its in-memory parent apply but, for a multi-operation + // scope, must not persist it (the operator owns parent state). Store an + // independent copy so the terminal reload reflects the persisted pending + // parent, not the drive's in-memory mutation. 
+ storedParent := *apply + finalizerID := int64(51) + siblingID := int64(52) // a second operation makes this a multi-operation apply + operations := &mockApplyOperationStore{ops: map[int64]*storage.ApplyOperation{ + finalizerID: { + ID: finalizerID, + ApplyID: apply.ID, + Deployment: "commerce-deployment", + OperationKey: "commerce/group_finalizer", + OperationKind: storage.ApplyOperationKindGroupFinalizer, + State: state.ApplyOperation.Pending, + }, + siblingID: { + ID: siblingID, + ApplyID: apply.ID, + Deployment: "commerce-deployment", + OperationKey: "commerce/-80/mutes", + State: state.ApplyOperation.Completed, + }, + }} + client.storage = &mockStorage{ + applies: &mockApplyStore{apply: &storedParent}, + tasks: &mockTaskStore{getByApplyIDErr: errors.New("finalizer drive must not load tasks")}, + plans: &mockPlanStore{plan: &storage.Plan{ + ID: apply.PlanID, + PlanIdentifier: "plan-finalizer", + SchemaFiles: schema.SchemaFiles{ + "commerce": {Files: map[string]string{vSchemaArtifactName: `{"sharded":true}`}}, + }, + Namespaces: map[string]*storage.NamespacePlanData{ + "commerce": {Artifacts: map[string]string{vSchemaArtifactName: `{"sharded":true}`}}, + }, + }}, + operations: operations, + } + + ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second) + defer cancel() + // Reaching a terminal remote state — success or failure — is a successful + // drive: the outcome lives in the persisted operation state, not a Go + // error (mirroring the non-suppressed terminal path). 
+ require.NoError(t, client.ResumeApplyOperation(ctx, apply, finalizerID)) + + assert.Equal(t, tc.wantState, operations.ops[finalizerID].State, + "the finalizer operation row must reflect the remote terminal state, not stay pending") + assert.NotNil(t, operations.ops[finalizerID].CompletedAt, "a terminal operation row is stamped completed_at") + }) + } +} + func TestGRPCClient_ResumeApplyOperationDispatchParksBarrierCutoverRemotely(t *testing.T) { // On a multi-deployment apply under the barrier cutover policy, the remote // copy drive must instruct Tern to park at the cutover barrier (defer_cutover) diff --git a/pkg/tern/local_client.go b/pkg/tern/local_client.go index 98297ed8..f989c6b2 100644 --- a/pkg/tern/local_client.go +++ b/pkg/tern/local_client.go @@ -1150,11 +1150,21 @@ func (c *LocalClient) Plan(ctx context.Context, req *ternv1.PlanRequest) (*ternv Operation: ddl.StatementTypeToOp(tc.Operation), }) } - // Record per-shard membership so apply-create can rebuild per-shard - // operation groups. A SchemaChange with an empty shard targets the whole + // Record each changing shard's own changes so apply-create can rebuild + // per-shard operation groups with per-shard DDL (a keyspace whose shards + // diverge is persisted per shard, not collapsed; a shard is changing iff + // it has changes). A SchemaChange with an empty shard targets the whole // namespace (non-sharded engines) and contributes no shard rows. 
if shardName := strings.TrimSpace(sc.Shard.Name); shardName != "" { - sp := storage.ShardPlan{Shard: shardName, Namespace: ns, NeedsChange: true} + sp := storage.ShardPlan{Shard: shardName, Namespace: ns} + for _, tc := range sc.TableChanges { + sp.Changes = append(sp.Changes, storage.TableChange{ + Namespace: ns, + Table: tc.Table, + DDL: tc.DDL, + Operation: ddl.StatementTypeToOp(tc.Operation), + }) + } nsData.Shards = append(nsData.Shards, sp) allShardPlans = append(allShardPlans, sp) } @@ -1285,14 +1295,24 @@ func (c *LocalClient) Plan(ctx context.Context, req *ternv1.PlanRequest) (*ternv } // Surface per-shard plan metadata on the response too, for parity with the - // gRPC path: callers of Plan can display per-shard drift/membership. + // gRPC path: callers of Plan can display per-shard drift/membership, and the + // control plane rebuilds per-shard operation groups from each shard's own + // changes. var protoShards []*ternv1.ShardPlan for _, sp := range allShardPlans { - protoShards = append(protoShards, &ternv1.ShardPlan{ - Shard: sp.Shard, - Namespace: sp.Namespace, - NeedsChange: sp.NeedsChange, - }) + protoSP := &ternv1.ShardPlan{ + Shard: sp.Shard, + Namespace: sp.Namespace, + } + for _, ch := range sp.Changes { + protoSP.Changes = append(protoSP.Changes, &ternv1.TableChange{ + Namespace: ch.Namespace, + TableName: ch.Table, + Ddl: ch.DDL, + ChangeType: ddlActionToProtoChangeType(ch.Operation), + }) + } + protoShards = append(protoShards, protoSP) } return &ternv1.PlanResponse{ @@ -1371,6 +1391,70 @@ func (c *LocalClient) planMySQLNamespacesWithEngine(ctx context.Context, eng eng return result, nil } +// dispatchTargetShard validates a shard-scoped dispatch's target shard set and +// returns the single, trimmed shard name. 
The per-shard fan-out emits exactly +// one shard per operation, so more than one shard — or an empty/whitespace +// shard (which would build tasks with an invalid scope the engine rejects or, +// worse, mis-scopes) — is a malformed dispatch and fails closed. +func dispatchTargetShard(targetShards []string) (string, error) { + if len(targetShards) != 1 { + return "", fmt.Errorf("a sharded dispatch must target exactly one shard, got %d (%v)", len(targetShards), targetShards) + } + shard := strings.TrimSpace(targetShards[0]) + if shard == "" { + return "", fmt.Errorf("sharded dispatch has an empty target shard") + } + return shard, nil +} + +// scopedDispatchDDLChanges converts a shard-scoped dispatch's DDL changes to +// storage table changes, failing closed on a missing or malformed set. A +// shard-scoped dispatch is already scoped by the control-plane operator (one +// table's change for one shard, the per-shard fan-out the control plane owns), +// so it must carry valid, non-empty changes; falling back to the whole stored +// plan would apply unrelated tables on the targeted shard. The proto change type +// round-trips to the DDL action the stored plan would carry (create/alter/drop). +func scopedDispatchDDLChanges(changes []*ternv1.TableChange) ([]storage.TableChange, error) { + if len(changes) == 0 { + return nil, fmt.Errorf("shard-scoped dispatch carried no ddl_changes") + } + out := make([]storage.TableChange, 0, len(changes)) + for i, ch := range changes { + if ch == nil { + return nil, fmt.Errorf("shard-scoped dispatch ddl_change %d is nil", i) + } + // Trim before validating and storing: these values build operation keys and + // task rows, so preserved surrounding whitespace would yield surprising keys + // and reconciliation/progress mismatches. A shard-scoped dispatch is the + // per-(namespace, shard) fan-out the control plane owns, so the namespace is + // authoritative scope and must be present. 
+ namespace := strings.TrimSpace(ch.Namespace) + table := strings.TrimSpace(ch.TableName) + ddl := strings.TrimSpace(ch.Ddl) + if namespace == "" { + return nil, fmt.Errorf("shard-scoped dispatch ddl_change %d has empty namespace", i) + } + if table == "" || ddl == "" { + return nil, fmt.Errorf("shard-scoped dispatch ddl_change %d has empty table or DDL", i) + } + op := protoChangeTypeToDDLAction(ch.ChangeType) + // A shard-scoped dispatch carries only table DDL (create/alter/drop) for one + // shard. A VSchema update is keyspace-wide, never shard-scoped — it is applied + // by the task-less group_finalizer path — so accepting it here would build a + // shard-tagged task with an unexpected operation. Reject it explicitly. + if op == "unknown" || op == "vschema_update" { + return nil, fmt.Errorf("shard-scoped dispatch ddl_change %d (table %q) has unsupported change type %v", i, table, ch.ChangeType) + } + out = append(out, storage.TableChange{ + Namespace: namespace, + Table: table, + DDL: ddl, + Operation: op, + }) + } + return out, nil +} + // planForApplyRequest resolves the plan for an apply. It prefers a plan row in // this deployment's own storage (the single-deployment path, and the primary // deployment of a multi-deployment apply). When no local plan exists, a @@ -1409,9 +1493,11 @@ func (c *LocalClient) materializeApplyRequestPlan(ctx context.Context, req *tern return nil, fmt.Errorf("materialize plan %s: apply request carried no DDL changes or schema files", req.PlanId) } - if err := c.verifyMaterializedPlanMatchesLiveSchema(ctx, req, schemaFiles); err != nil { - return nil, fmt.Errorf("materialize plan %s: %w", req.PlanId, err) - } + // The dispatch carries the reviewed plan's DDL verbatim, so the materialized + // plan is the reviewed plan — no re-plan-and-compare here. 
Drift between when + // the plan was reviewed and when it applies is reconciled the same way on + // every path: replanAndFilterTasks re-plans against live schema at apply/resume + // and drops or updates work, and the engine validates each change at execution. target := req.Target if target == "" { @@ -1530,195 +1616,6 @@ func materializedTableChangeOperation(ch *ternv1.TableChange) (string, error) { return op, nil } -// driftChangeKey identifies a single table DDL change for drift comparison. Two -// changes are the same iff they target the same namespace and table with the -// same operation and canonicalized DDL. -type driftChangeKey struct { - namespace string - table string - operation string - ddl string -} - -// driftChangeMultiset counts table DDL changes by key so duplicate changes are -// compared exactly (set equality would silently tolerate a duplicated change). -type driftChangeMultiset map[driftChangeKey]int - -// verifyMaterializedPlanMatchesLiveSchema fails closed unless the reviewed DDL -// carried by a dispatch request is exactly what this deployment would plan -// against its own live schema. A non-primary deployment never planned locally — -// the reviewed plan was computed against the primary deployment's live schema — -// so blindly materializing it would silently replay the primary's DDL on a -// deployment whose schema may have drifted. Recomputing the local diff and -// requiring an exact match keeps non-primary drift from being applied unreviewed. 
-func (c *LocalClient) verifyMaterializedPlanMatchesLiveSchema(ctx context.Context, req *ternv1.ApplyRequest, schemaFiles schema.SchemaFiles) error { - if len(req.TargetShards) > 0 { - return fmt.Errorf("drift guard does not support shard-scoped applies (target_shards=%v); a whole-deployment replan cannot be compared to a shard subset", req.TargetShards) - } - - result, err := c.planWithEngine(ctx, &ternv1.PlanRequest{ - Database: c.config.Database, - Type: c.config.Type, - Environment: req.Environment, - Target: req.Target, - }, c.config.Database, schemaFiles) - if err != nil { - return fmt.Errorf("recompute local plan: %w", err) - } - - recomputed, err := c.driftMultisetFromPlanResult(result) - if err != nil { - return fmt.Errorf("recomputed plan: %w", err) - } - dispatched, err := c.driftMultisetFromApplyRequest(req.DdlChanges) - if err != nil { - return fmt.Errorf("dispatched plan: %w", err) - } - if err := compareDriftMultisets(recomputed, dispatched); err != nil { - return fmt.Errorf("local schema has drifted from the reviewed plan (database %q, target %q): %w", c.config.Database, req.Target, err) - } - - if err := compareVSchemaParity(vschemaNamespacesFromPlanResult(c, result), vschemaNamespacesFromApplyRequest(c, req.DdlChanges)); err != nil { - return fmt.Errorf("local vschema has drifted from the reviewed plan (database %q, target %q): %w", c.config.Database, req.Target, err) - } - return nil -} - -// driftMultisetFromPlanResult builds the table DDL multiset this deployment -// would plan against its own live schema. VSchema changes carry no table DDL and -// are compared separately, so they are excluded here. 
-func (c *LocalClient) driftMultisetFromPlanResult(result *engine.PlanResult) (driftChangeMultiset, error) { - ms := driftChangeMultiset{} - for _, sc := range result.Changes { - ns := c.planNamespace(sc.Namespace) - for _, tc := range sc.TableChanges { - canon, err := canonicalDDLForDrift(tc.DDL) - if err != nil { - return nil, fmt.Errorf("table %q: %w", tc.Table, err) - } - ms[driftChangeKey{ns, tc.Table, ddl.StatementTypeToOp(tc.Operation), canon}]++ - } - } - return ms, nil -} - -// driftMultisetFromApplyRequest builds the table DDL multiset the dispatch -// request carries as the reviewed, authoritative plan. VSchema changes are -// compared separately and excluded here. Nil entries are corrupt input and fail -// closed. -func (c *LocalClient) driftMultisetFromApplyRequest(changes []*ternv1.TableChange) (driftChangeMultiset, error) { - ms := driftChangeMultiset{} - for _, ch := range changes { - if ch == nil { - return nil, fmt.Errorf("dispatch request carried a nil table change") - } - if ch.ChangeType == ternv1.ChangeType_CHANGE_TYPE_VSCHEMA { - continue - } - op, err := materializedTableChangeOperation(ch) - if err != nil { - return nil, err - } - canon, err := canonicalDDLForDrift(ch.Ddl) - if err != nil { - return nil, fmt.Errorf("table %q: %w", ch.TableName, err) - } - ms[driftChangeKey{c.planNamespace(ch.Namespace), ch.TableName, op, canon}]++ - } - return ms, nil -} - -// canonicalDDLForDrift normalizes a DDL statement for comparison and fails -// closed if it cannot be parsed — ddl.Canonicalize returns the input unchanged -// on a parse failure, so an unparseable statement would otherwise compare by raw -// text and could mask drift. 
-func canonicalDDLForDrift(raw string) (string, error) { - raw = strings.TrimSpace(raw) - if raw == "" { - return "", fmt.Errorf("empty DDL") - } - if _, _, err := ddl.ClassifyStatement(raw); err != nil { - return "", fmt.Errorf("unparseable DDL: %w", err) - } - return ddl.Canonicalize(raw), nil -} - -// compareDriftMultisets reports drift unless the recomputed and dispatched table -// DDL multisets are exactly equal. -func compareDriftMultisets(recomputed, dispatched driftChangeMultiset) error { - var missing, unexpected []string - for key, want := range dispatched { - if recomputed[key] < want { - missing = append(missing, formatDriftKey(key)) - } - } - for key, have := range recomputed { - if have > dispatched[key] { - unexpected = append(unexpected, formatDriftKey(key)) - } - } - if len(missing) == 0 && len(unexpected) == 0 { - return nil - } - sort.Strings(missing) - sort.Strings(unexpected) - return fmt.Errorf("reviewed changes this deployment would not plan: %v; changes this deployment would plan that were not reviewed: %v", missing, unexpected) -} - -func formatDriftKey(k driftChangeKey) string { - // Include the canonicalized DDL: the multiset keys on it, so two changes for - // the same namespace/table/operation that differ only in DDL must render - // differently or the drift message would list identical-looking entries on - // both sides and hide what actually drifted. - return fmt.Sprintf("%s.%s/%s (%s)", k.namespace, k.table, k.operation, k.ddl) -} - -// vschemaNamespacesFromPlanResult returns the namespaces the recomputed plan -// detected a vschema change for. 
-func vschemaNamespacesFromPlanResult(c *LocalClient, result *engine.PlanResult) map[string]bool { - out := map[string]bool{} - for _, sc := range result.Changes { - if sc.Metadata["vschema_changed"] == "true" { - out[c.planNamespace(sc.Namespace)] = true - } - } - return out -} - -// vschemaNamespacesFromApplyRequest returns the namespaces the dispatch request -// carries a vschema change for. -func vschemaNamespacesFromApplyRequest(c *LocalClient, changes []*ternv1.TableChange) map[string]bool { - out := map[string]bool{} - for _, ch := range changes { - if ch != nil && ch.ChangeType == ternv1.ChangeType_CHANGE_TYPE_VSCHEMA { - out[c.planNamespace(ch.Namespace)] = true - } - } - return out -} - -// compareVSchemaParity reports drift unless the recomputed and dispatched -// vschema-changed namespaces are exactly equal. -func compareVSchemaParity(recomputed, dispatched map[string]bool) error { - var missing, unexpected []string - for ns := range dispatched { - if !recomputed[ns] { - missing = append(missing, ns) - } - } - for ns := range recomputed { - if !dispatched[ns] { - unexpected = append(unexpected, ns) - } - } - if len(missing) == 0 && len(unexpected) == 0 { - return nil - } - sort.Strings(missing) - sort.Strings(unexpected) - return fmt.Errorf("reviewed vschema changes this deployment would not plan: %v; vschema changes this deployment would plan that were not reviewed: %v", missing, unexpected) -} - // existingIdempotentApply returns the apply previously created for // req.IdempotencyKey, or nil when the key is empty or unseen. The match is // returned regardless of the existing apply's state: the key encodes the @@ -1794,11 +1691,33 @@ func (c *LocalClient) Apply(ctx context.Context, req *ternv1.ApplyRequest) (*ter ErrorMessage: "plan not found", }, nil } + // Determine the dispatch's shard scope. 
A sharded engine's work is dispatched + // one apply_operation per shard, so a request that carries target shards is + // scoped to that single shard: it drives the operation's own DDL changes + // (req.DdlChanges) and tags its tasks with the shard, so the engine receives + // exactly one target shard. A whole-deployment or non-sharded apply carries no + // target shard and uses the stored plan unchanged — keeping that path + // byte-for-byte as before. More than one target shard is a malformed dispatch + // (the per-shard fan-out emits one shard per operation) and fails closed. ddlChanges := plan.FlatDDLChanges() + dispatchShard := "" + if len(req.TargetShards) > 0 { + shard, err := dispatchTargetShard(req.TargetShards) + if err != nil { + return nil, fmt.Errorf("apply for plan %s: %w", req.PlanId, err) + } + scoped, err := scopedDispatchDDLChanges(req.DdlChanges) + if err != nil { + return nil, fmt.Errorf("apply for plan %s: %w", req.PlanId, err) + } + dispatchShard = shard + ddlChanges = scoped + } c.logger.Info("Apply: retrieved plan", "plan_id", req.PlanId, "plan_identifier", plan.PlanIdentifier, "ddl_change_count", len(ddlChanges), + "target_shards", req.TargetShards, "database", plan.Database, ) @@ -1907,6 +1826,7 @@ func (c *LocalClient) Apply(ctx context.Context, req *ternv1.ApplyRequest) (*ter Options: optionsJSON, TableName: ddlChange.Table, Namespace: ddlChange.Namespace, + Shard: dispatchShard, DDL: ddlChange.DDL, DDLAction: ddlChange.Operation, CreatedAt: now, @@ -1996,34 +1916,6 @@ func (c *LocalClient) getEngine() engine.Engine { } } -// engineProgressIsExternallyAuthoritative reports whether the progress read path -// may query the engine directly for live progress, or must serve progress from -// shared storage instead. -// -// One logical data-plane route can be served by multiple instances that share -// storage. A progress request can be balanced onto any instance. 
An engine -// whose progress comes from instance-local memory only knows the schema change -// that instance is running, so an instance that is not driving the queried -// schema change would observe unrelated or stale state — its progress must come -// from storage, which the driving instance keeps current. An engine whose -// progress comes from authoritative external state returns the same correct -// result on every instance and may be queried directly. -// -// This fails closed: an engine that does not declare its progress authoritative -// is served from storage. -func engineProgressIsExternallyAuthoritative(eng engine.Engine) bool { - source, ok := eng.(engine.ExternallyAuthoritativeProgress) - return ok && source.ProgressIsExternallyAuthoritative() -} - -// SupportsShardedApplyFanout reports whether this local client can drive -// sharded work as independent SchemaBot apply operations. Engines that expose -// externally authoritative progress own their execution ordering outside -// SchemaBot, so apply-create must keep them as one operation. -func (c *LocalClient) SupportsShardedApplyFanout() bool { - return !engineProgressIsExternallyAuthoritative(c.getEngine()) -} - // Progress returns detailed progress for an active schema change. // Returns ALL tasks for the current apply: completed, running, and pending. // req.ApplyId is required so progress is always scoped to a single apply. 
diff --git a/pkg/tern/local_client_shardplan_test.go b/pkg/tern/local_client_shardplan_test.go index cb80204a..5d9ab8e7 100644 --- a/pkg/tern/local_client_shardplan_test.go +++ b/pkg/tern/local_client_shardplan_test.go @@ -73,8 +73,16 @@ func TestPlanRecordsPerShardSchemaChanges(t *testing.T) { ns := store.created.Namespaces["resolute"] require.NotNil(t, ns, "namespace plan data must exist for the changed keyspace") require.Len(t, ns.Shards, 2) - assert.Equal(t, storage.ShardPlan{Shard: "-80", Namespace: "resolute", NeedsChange: true}, ns.Shards[0]) - assert.Equal(t, storage.ShardPlan{Shard: "80-", Namespace: "resolute", NeedsChange: true}, ns.Shards[1]) + // Each shard records its own changes (here identical) so a later divergence is + // representable; the namespace-level Tables stay deduped. + wantChange := storage.TableChange{ + Namespace: "resolute", + Table: "users", + DDL: "ALTER TABLE `users` ADD COLUMN `email` varchar(255)", + Operation: "alter", + } + assert.Equal(t, storage.ShardPlan{Shard: "-80", Namespace: "resolute", Changes: []storage.TableChange{wantChange}}, ns.Shards[0]) + assert.Equal(t, storage.ShardPlan{Shard: "80-", Namespace: "resolute", Changes: []storage.TableChange{wantChange}}, ns.Shards[1]) require.Len(t, ns.Tables, 1, "the table repeated across shards is deduped at the namespace level") assert.Equal(t, "users", ns.Tables[0].Table) } @@ -123,8 +131,13 @@ func TestPlanSurfacesShardPlanOnResponse(t *testing.T) { require.Len(t, resp.Shards, 2) assert.Equal(t, "-80", resp.Shards[0].Shard) assert.Equal(t, "resolute", resp.Shards[0].Namespace) - assert.True(t, resp.Shards[0].NeedsChange) assert.Equal(t, "80-", resp.Shards[1].Shard) + // Each shard surfaces its own changes so the control plane can rebuild the + // fan-out (and present per-shard divergence) from the response. 
+ require.Len(t, resp.Shards[0].Changes, 1) + assert.Equal(t, "users", resp.Shards[0].Changes[0].TableName) + require.Len(t, resp.Shards[1].Changes, 1) + assert.Equal(t, "users", resp.Shards[1].Changes[0].TableName) // The repeated table collapses to a single namespace-level proto change. require.Len(t, resp.Changes, 1) require.Len(t, resp.Changes[0].TableChanges, 1) diff --git a/pkg/tern/local_control_resume.go b/pkg/tern/local_control_resume.go index 2b0d804e..2f2a159d 100644 --- a/pkg/tern/local_control_resume.go +++ b/pkg/tern/local_control_resume.go @@ -324,7 +324,7 @@ func (c *LocalClient) resumeApplySequential(ctx context.Context, apply *storage. // between re-plan (which reads schema) and Spirit's cutover (which renames // the shadow table). If Spirit completed the cutover after the re-plan read // the schema, the table already has the desired changes. - needsChange, err := c.tableStillNeedsChange(ctx, apply, plan, task.TableName) + needsChange, err := c.tableStillNeedsChange(ctx, apply, plan, task) if err != nil { c.logger.Warn("could not verify table schema state, proceeding with apply", "task_id", task.TaskIdentifier, "table", task.TableName, "error", err) @@ -364,21 +364,45 @@ func (c *LocalClient) resumeApplySequential(ctx context.Context, apply *storage. c.logger.Info("sequential resume finished", "apply_id", apply.ApplyIdentifier, "state", apply.State) } -// tableStillNeedsChange does a quick re-plan to check if a specific table -// still needs schema changes. Returns false if the table already has the +// shardTableKey identifies a table change within a specific (namespace, shard). +// A plan is keyed by (Namespace, Shard), so the same table name can appear in +// more than one namespace (multiple Vitess keyspaces) and on more than one shard +// within a namespace; both must be in the key to avoid conflating tasks. For a +// non-sharded engine the shard is empty, so keying degrades to (namespace, +// table) and matches the pre-sharding behavior. 
+type shardTableKey struct { + namespace string + shard string + table string +} + +// replanShardTableDDL indexes a re-plan's table changes by +// (namespace, shard, table) -> DDL so the resume/recovery path reconciles each +// task against its own namespace and shard. A sharded engine emits one +// SchemaChange per (namespace, shard) and the same table repeats across them, so +// keying by table name alone would conflate tasks: another shard's (or another +// keyspace's) remaining diff could keep this task active, or update it with the +// wrong DDL. +func replanShardTableDDL(result *engine.PlanResult) map[shardTableKey]string { + out := make(map[shardTableKey]string) + for _, sc := range result.Changes { + for _, tc := range sc.TableChanges { + out[shardTableKey{namespace: sc.Namespace, shard: sc.Shard.Name, table: tc.Table}] = tc.DDL + } + } + return out +} + +// tableStillNeedsChange does a quick re-plan to check if a task's table on its +// own shard still needs schema changes. Returns false if it already has the // desired schema (e.g., Spirit's cutover completed during the stop sequence). -func (c *LocalClient) tableStillNeedsChange(ctx context.Context, apply *storage.Apply, plan *storage.Plan, tableName string) (bool, error) { +func (c *LocalClient) tableStillNeedsChange(ctx context.Context, apply *storage.Apply, plan *storage.Plan, task *storage.Task) (bool, error) { result, err := c.planWithEngine(ctx, &ternv1.PlanRequest{}, apply.Database, plan.SchemaFiles) if err != nil { return false, fmt.Errorf("re-plan check failed: %w", err) } - - for _, tc := range result.FlatTableChanges() { - if tc.Table == tableName { - return true, nil - } - } - return false, nil + _, stillNeeded := replanShardTableDDL(result)[shardTableKey{namespace: task.Namespace, shard: task.Shard, table: task.TableName}] + return stillNeeded, nil } // replanResult holds the result of replanAndFilterTasks. 
@@ -400,13 +424,10 @@ func (c *LocalClient) replanAndFilterTasks(ctx context.Context, apply *storage.A return nil, fmt.Errorf("re-plan failed: %w", err) } - // Build set of tables that still need changes - needsChange := make(map[string]bool, len(replanOut.FlatTableChanges())) - replanDDL := make(map[string]string, len(replanOut.FlatTableChanges())) - for _, tc := range replanOut.FlatTableChanges() { - needsChange[tc.Table] = true - replanDDL[tc.Table] = tc.DDL - } + // Index the re-plan's remaining changes by (namespace, shard, table) so each + // task is reconciled against its own namespace and shard rather than conflated + // with same-named tables in other keyspaces or on other shards. + replanDDL := replanShardTableDDL(replanOut) // Partition tasks: already-done vs still-needed now := time.Now() @@ -416,17 +437,16 @@ func (c *LocalClient) replanAndFilterTasks(ctx context.Context, apply *storage.A if task.State == state.Task.Completed { continue } - if !needsChange[task.TableName] { - // Table no longer in diff — it already completed + ddl, stillNeeded := replanDDL[shardTableKey{namespace: task.Namespace, shard: task.Shard, table: task.TableName}] + if !stillNeeded { + // This shard's table is no longer in the diff — it already completed. 
task.ProgressPercent = 100 task.CompletedAt = &now c.transitionTaskState(ctx, task, apply.ID, state.Task.Completed, fmt.Sprintf("Task %s already completed (re-plan shows no remaining changes)", task.TaskIdentifier)) completedCount++ } else { - if ddl, ok := replanDDL[task.TableName]; ok { - task.DDL = ddl - } + task.DDL = ddl activeTasks = append(activeTasks, task) } } diff --git a/pkg/tern/local_control_resume_test.go b/pkg/tern/local_control_resume_test.go new file mode 100644 index 00000000..3d5143e3 --- /dev/null +++ b/pkg/tern/local_control_resume_test.go @@ -0,0 +1,51 @@ +package tern + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/block/schemabot/pkg/engine" +) + +// A sharded re-plan repeats the same table across shards (and across keyspaces), +// each with its own DDL. replanShardTableDDL must key by (namespace, shard, +// table) so one shard's — or one keyspace's — remaining diff never reconciles +// another's task. Keying by less than the full tuple would collapse these +// entries and conflate tasks. 
+func TestReplanShardTableDDLKeysPerNamespaceAndShard(t *testing.T) { + ddlA := "ALTER TABLE `mutes` ADD INDEX (`created_at`)" + ddlB := "ALTER TABLE `mutes` ADD INDEX (`updated_at`)" // 80- has drifted differently + ddlC := "ALTER TABLE `mutes` ADD INDEX (`deleted_at`)" // a second keyspace, same shard+table + result := &engine.PlanResult{ + Changes: []engine.SchemaChange{ + {Namespace: "ks1", Shard: engine.Shard{Name: "-80"}, TableChanges: []engine.TableChange{{Table: "mutes", DDL: ddlA}}}, + {Namespace: "ks1", Shard: engine.Shard{Name: "80-"}, TableChanges: []engine.TableChange{{Table: "mutes", DDL: ddlB}}}, + {Namespace: "ks2", Shard: engine.Shard{Name: "-80"}, TableChanges: []engine.TableChange{{Table: "mutes", DDL: ddlC}}}, + }, + } + + got := replanShardTableDDL(result) + + require.Len(t, got, 3, "same table across shards and keyspaces must produce three distinct keys") + assert.Equal(t, ddlA, got[shardTableKey{namespace: "ks1", shard: "-80", table: "mutes"}]) + assert.Equal(t, ddlB, got[shardTableKey{namespace: "ks1", shard: "80-", table: "mutes"}]) + assert.Equal(t, ddlC, got[shardTableKey{namespace: "ks2", shard: "-80", table: "mutes"}], "the same shard+table in another keyspace is not conflated") +} + +// For a non-sharded engine the shard name is empty, so keying degrades to +// (namespace, table) and matches the pre-sharding lookup. 
+func TestReplanShardTableDDLNonShardedDegradesToTable(t *testing.T) { + ddl := "ALTER TABLE `mutes` ADD INDEX (`created_at`)" + result := &engine.PlanResult{ + Changes: []engine.SchemaChange{ + {Namespace: "commerce", TableChanges: []engine.TableChange{{Table: "mutes", DDL: ddl}}}, + }, + } + + got := replanShardTableDDL(result) + + require.Len(t, got, 1) + assert.Equal(t, ddl, got[shardTableKey{namespace: "commerce", table: "mutes"}]) +} diff --git a/pkg/tern/local_plan_drift_guard_test.go b/pkg/tern/local_plan_drift_guard_test.go deleted file mode 100644 index b5404a57..00000000 --- a/pkg/tern/local_plan_drift_guard_test.go +++ /dev/null @@ -1,219 +0,0 @@ -package tern - -import ( - "context" - "errors" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/block/schemabot/pkg/engine" - ternv1 "github.com/block/schemabot/pkg/proto/ternv1" - "github.com/block/schemabot/pkg/storage" - "github.com/block/spirit/pkg/statement" -) - -// A non-primary deployment whose recomputed local plan exactly matches the -// reviewed DDL materializes the plan: there is no drift to block. -func TestDriftGuard_MatchMaterializes(t *testing.T) { - store := &fakePlanStore{getFn: func(string) (*storage.Plan, error) { return nil, nil }, createID: 5} - c := newPlanMaterializeClientWithPlan(store, alterUsersEmailPlan()) - - got, err := c.planForApplyRequest(t.Context(), &ternv1.ApplyRequest{ - PlanId: "plan_ok", - DdlChanges: []*ternv1.TableChange{ - {TableName: "users", Ddl: "ALTER TABLE `users` ADD COLUMN `email` varchar(255)", ChangeType: ternv1.ChangeType_CHANGE_TYPE_ALTER, Namespace: "testapp"}, - }, - }) - - require.NoError(t, err) - require.NotNil(t, got) - assert.Equal(t, int64(5), got.ID) -} - -// Whitespace and quoting differences between the recomputed DDL and the reviewed -// DDL are normalized away by canonicalization, so they are not drift. 
-func TestDriftGuard_CanonicalizationTolerant(t *testing.T) { - recomputed := &engine.PlanResult{Changes: []engine.SchemaChange{{ - Namespace: "testapp", - TableChanges: []engine.TableChange{{ - Table: "users", - Operation: statement.StatementAlterTable, - DDL: "ALTER TABLE users ADD COLUMN email varchar(255)", - }}, - }}} - store := &fakePlanStore{getFn: func(string) (*storage.Plan, error) { return nil, nil }, createID: 6} - c := newPlanMaterializeClientWithPlan(store, recomputed) - - _, err := c.planForApplyRequest(t.Context(), &ternv1.ApplyRequest{ - PlanId: "plan_canon", - DdlChanges: []*ternv1.TableChange{ - {TableName: "users", Ddl: "ALTER TABLE `users` ADD COLUMN `email` varchar(255)", ChangeType: ternv1.ChangeType_CHANGE_TYPE_ALTER, Namespace: "testapp"}, - }, - }) - - require.NoError(t, err) -} - -// A reviewed change this deployment would not plan (local schema already has the -// column) fails closed rather than replaying unreviewed DDL. -func TestDriftGuard_MissingReviewedChangeFailsClosed(t *testing.T) { - store := &fakePlanStore{getFn: func(string) (*storage.Plan, error) { return nil, nil }} - c := newPlanMaterializeClientWithPlan(store, &engine.PlanResult{}) // recomputes no changes - - _, err := c.planForApplyRequest(t.Context(), &ternv1.ApplyRequest{ - PlanId: "plan_drift", - DdlChanges: []*ternv1.TableChange{ - {TableName: "users", Ddl: "ALTER TABLE `users` ADD COLUMN `email` varchar(255)", ChangeType: ternv1.ChangeType_CHANGE_TYPE_ALTER, Namespace: "testapp"}, - }, - }) - - require.Error(t, err) - assert.Contains(t, err.Error(), "drifted") - assert.Nil(t, store.created, "must not materialize a drifted plan") -} - -// A change this deployment would plan that was never reviewed (local schema is -// behind the desired files in a way the primary did not see) fails closed. 
-func TestDriftGuard_UnexpectedLocalChangeFailsClosed(t *testing.T) { - store := &fakePlanStore{getFn: func(string) (*storage.Plan, error) { return nil, nil }} - c := newPlanMaterializeClientWithPlan(store, alterUsersEmailPlan()) - - _, err := c.planForApplyRequest(t.Context(), &ternv1.ApplyRequest{ - PlanId: "plan_extra", - DdlChanges: []*ternv1.TableChange{ - {TableName: "orders", Ddl: "CREATE TABLE `orders` (`id` bigint)", ChangeType: ternv1.ChangeType_CHANGE_TYPE_CREATE, Namespace: "testapp"}, - }, - }) - - require.Error(t, err) - assert.Contains(t, err.Error(), "drifted") -} - -// Different DDL for the same table/operation is drift even though the -// namespace/table/action triple matches. -func TestDriftGuard_DifferentDDLSameTableFailsClosed(t *testing.T) { - store := &fakePlanStore{getFn: func(string) (*storage.Plan, error) { return nil, nil }} - c := newPlanMaterializeClientWithPlan(store, alterUsersEmailPlan()) - - _, err := c.planForApplyRequest(t.Context(), &ternv1.ApplyRequest{ - PlanId: "plan_diff_ddl", - DdlChanges: []*ternv1.TableChange{ - {TableName: "users", Ddl: "ALTER TABLE `users` ADD COLUMN `phone` varchar(255)", ChangeType: ternv1.ChangeType_CHANGE_TYPE_ALTER, Namespace: "testapp"}, - }, - }) - - require.Error(t, err) - assert.Contains(t, err.Error(), "drifted") -} - -// A shard-scoped apply cannot be drift-checked against a whole-deployment -// replan, so it fails closed. 
-func TestDriftGuard_ShardScopedFailsClosed(t *testing.T) { - store := &fakePlanStore{getFn: func(string) (*storage.Plan, error) { return nil, nil }} - c := newPlanMaterializeClientWithPlan(store, alterUsersEmailPlan()) - - _, err := c.planForApplyRequest(t.Context(), &ternv1.ApplyRequest{ - PlanId: "plan_shard", - TargetShards: []string{"-80"}, - DdlChanges: []*ternv1.TableChange{ - {TableName: "users", Ddl: "ALTER TABLE `users` ADD COLUMN `email` varchar(255)", ChangeType: ternv1.ChangeType_CHANGE_TYPE_ALTER, Namespace: "testapp"}, - }, - }) - - require.Error(t, err) - assert.Contains(t, err.Error(), "shard") -} - -// A vschema change the reviewed plan carries but this deployment would not plan -// is drift, even when the table DDL matches exactly. -func TestDriftGuard_VSchemaParityFailsClosed(t *testing.T) { - store := &fakePlanStore{getFn: func(string) (*storage.Plan, error) { return nil, nil }} - // Recomputed plan has the table change but no vschema change. - c := newPlanMaterializeClientWithPlan(store, alterUsersEmailPlan()) - - _, err := c.planForApplyRequest(t.Context(), &ternv1.ApplyRequest{ - PlanId: "plan_vschema", - DdlChanges: []*ternv1.TableChange{ - {TableName: "users", Ddl: "ALTER TABLE `users` ADD COLUMN `email` varchar(255)", ChangeType: ternv1.ChangeType_CHANGE_TYPE_ALTER, Namespace: "testapp"}, - {TableName: "VSchema: testapp", ChangeType: ternv1.ChangeType_CHANGE_TYPE_VSCHEMA, Namespace: "testapp"}, - }, - }) - - require.Error(t, err) - assert.Contains(t, err.Error(), "vschema") -} - -// A matching vschema change on both sides is not drift. 
-func TestDriftGuard_VSchemaParityMatches(t *testing.T) { - recomputed := &engine.PlanResult{Changes: []engine.SchemaChange{{ - Namespace: "testapp", - Metadata: map[string]string{"vschema_changed": "true"}, - }}} - store := &fakePlanStore{getFn: func(string) (*storage.Plan, error) { return nil, nil }, createID: 9} - c := newPlanMaterializeClientWithPlan(store, recomputed) - - _, err := c.planForApplyRequest(t.Context(), &ternv1.ApplyRequest{ - PlanId: "plan_vschema_ok", - DdlChanges: []*ternv1.TableChange{ - {TableName: "VSchema: testapp", ChangeType: ternv1.ChangeType_CHANGE_TYPE_VSCHEMA, Namespace: "testapp"}, - }, - SchemaFiles: map[string]*ternv1.SchemaFiles{ - "testapp": {Files: map[string]string{vSchemaArtifactName: `{"sharded":true}`}}, - }, - }) - - require.NoError(t, err) -} - -// An engine failure during recompute surfaces as an error: the guard never -// fails open when it cannot recompute. -func TestDriftGuard_RecomputeErrorFailsClosed(t *testing.T) { - store := &fakePlanStore{getFn: func(string) (*storage.Plan, error) { return nil, nil }} - c := newPlanMaterializeClient(store) - c.config.TargetDSN = "user:pass@tcp(127.0.0.1:3306)/testapp" - c.spiritEngine = fakePlanEngine{ - planFn: func(ctx context.Context, _ *engine.PlanRequest) (*engine.PlanResult, error) { - return nil, errors.New("engine boom") - }, - } - - _, err := c.planForApplyRequest(t.Context(), &ternv1.ApplyRequest{ - PlanId: "plan_engine_err", - DdlChanges: []*ternv1.TableChange{ - {TableName: "users", Ddl: "ALTER TABLE `users` ADD COLUMN `email` varchar(255)", ChangeType: ternv1.ChangeType_CHANGE_TYPE_ALTER, Namespace: "testapp"}, - }, - }) - - require.Error(t, err) - assert.Contains(t, err.Error(), "recompute local plan") -} - -// canonicalDDLForDrift must fail closed on DDL it cannot parse: ddl.Canonicalize -// returns its input unchanged on a parse failure, so without this guard an -// unparseable statement would silently compare by raw text and could mask drift. 
-func TestCanonicalDDLForDrift_FailsClosed(t *testing.T) { - t.Run("unparseable DDL is rejected", func(t *testing.T) { - _, err := canonicalDDLForDrift("this is not valid sql") - require.Error(t, err) - assert.Contains(t, err.Error(), "unparseable DDL") - }) - - t.Run("empty DDL is rejected", func(t *testing.T) { - _, err := canonicalDDLForDrift(" ") - require.Error(t, err) - assert.Contains(t, err.Error(), "empty DDL") - }) - - t.Run("parseable DDL is canonicalized", func(t *testing.T) { - // Whitespace and unquoted identifiers normalize to the same canonical form - // regardless of incidental formatting, so equivalent DDL compares equal. - spaced, err := canonicalDDLForDrift("ALTER TABLE users ADD COLUMN email varchar(255)") - require.NoError(t, err) - quoted, err := canonicalDDLForDrift("ALTER TABLE `users` ADD COLUMN `email` varchar(255)") - require.NoError(t, err) - assert.Equal(t, quoted, spaced) - assert.NotEmpty(t, spaced) - }) -} diff --git a/pkg/tern/progress_ownership_test.go b/pkg/tern/progress_ownership_test.go deleted file mode 100644 index 931b9003..00000000 --- a/pkg/tern/progress_ownership_test.go +++ /dev/null @@ -1,73 +0,0 @@ -package tern - -import ( - "testing" - - "github.com/block/schemabot/pkg/engine" - "github.com/block/schemabot/pkg/storage" - "github.com/stretchr/testify/assert" -) - -// authoritativeEngine models an engine whose progress is read from authoritative -// external state (e.g. a remote online-DDL service), so it is correct on any -// instance. -type authoritativeEngine struct { - engine.Engine - authoritative bool -} - -func (e *authoritativeEngine) ProgressIsExternallyAuthoritative() bool { - return e.authoritative -} - -// instanceLocalEngine models an engine whose progress comes from instance-local -// memory and does not declare the capability at all. 
-type instanceLocalEngine struct { - engine.Engine -} - -// The progress read path may query an engine directly only when that engine's -// progress is authoritative regardless of which instance answers. Instance-local -// engines must be served from shared storage instead, and an engine that does -// not declare the capability must default to storage (fail closed). -func TestEngineProgressIsExternallyAuthoritative(t *testing.T) { - tests := []struct { - name string - eng engine.Engine - want bool - }{ - { - name: "engine declaring authoritative progress is queried directly", - eng: &authoritativeEngine{authoritative: true}, - want: true, - }, - { - name: "engine declaring non-authoritative progress is served from storage", - eng: &authoritativeEngine{authoritative: false}, - want: false, - }, - { - name: "engine without the capability defaults to storage", - eng: &instanceLocalEngine{}, - want: false, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - assert.Equal(t, tc.want, engineProgressIsExternallyAuthoritative(tc.eng)) - }) - } -} - -func TestLocalClientSupportsShardedApplyFanoutOnlyForInstanceLocalEngines(t *testing.T) { - assert.False(t, (&LocalClient{ - config: LocalConfig{Type: storage.DatabaseTypeMySQL}, - spiritEngine: &authoritativeEngine{authoritative: true}, - }).SupportsShardedApplyFanout()) - - assert.True(t, (&LocalClient{ - config: LocalConfig{Type: storage.DatabaseTypeMySQL}, - spiritEngine: &instanceLocalEngine{}, - }).SupportsShardedApplyFanout()) -} diff --git a/pkg/tern/sharded_apply_dispatch_test.go b/pkg/tern/sharded_apply_dispatch_test.go new file mode 100644 index 00000000..4c69e0c2 --- /dev/null +++ b/pkg/tern/sharded_apply_dispatch_test.go @@ -0,0 +1,105 @@ +package tern + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + ternv1 "github.com/block/schemabot/pkg/proto/ternv1" +) + +// A shard-scoped dispatch carries the control plane's authoritative, 
+// already-scoped DDL changes for one apply_operation; the data plane executes +// exactly those. This is what lets a per-(shard, table) operation drive only its +// own table change. +func TestScopedDispatchDDLChangesHonorsDispatchedScope(t *testing.T) { + got, err := scopedDispatchDDLChanges([]*ternv1.TableChange{{ + Namespace: "cdb_resolute_sharded", + TableName: "mutes", + Ddl: "ALTER TABLE `mutes` ADD INDEX (`created_at`)", + ChangeType: ternv1.ChangeType_CHANGE_TYPE_ALTER, + }}) + require.NoError(t, err) + require.Len(t, got, 1) + assert.Equal(t, "mutes", got[0].Table) + assert.Equal(t, "cdb_resolute_sharded", got[0].Namespace) + assert.Equal(t, "alter", got[0].Operation, "proto change type round-trips to the plan's DDL action") + assert.Contains(t, got[0].DDL, "ADD INDEX") +} + +// A shard-scoped dispatch is already scoped by the control plane, so it must +// carry valid, non-empty changes. Anything malformed fails closed rather than +// falling back to the whole plan (which would apply unrelated tables on the +// targeted shard). 
+func TestDispatchTargetShard(t *testing.T) { + t.Run("single shard is trimmed", func(t *testing.T) { + shard, err := dispatchTargetShard([]string{" -80 "}) + require.NoError(t, err) + assert.Equal(t, "-80", shard) + }) + t.Run("zero shards", func(t *testing.T) { + _, err := dispatchTargetShard(nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "exactly one shard") + }) + t.Run("more than one shard", func(t *testing.T) { + _, err := dispatchTargetShard([]string{"-80", "80-"}) + require.Error(t, err) + assert.Contains(t, err.Error(), "exactly one shard") + }) + t.Run("empty after trim fails closed", func(t *testing.T) { + _, err := dispatchTargetShard([]string{" "}) + require.Error(t, err) + assert.Contains(t, err.Error(), "empty target shard") + }) +} + +func TestScopedDispatchDDLChangesFailsClosed(t *testing.T) { + t.Run("no changes", func(t *testing.T) { + _, err := scopedDispatchDDLChanges(nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "no ddl_changes") + }) + t.Run("nil entry", func(t *testing.T) { + _, err := scopedDispatchDDLChanges([]*ternv1.TableChange{nil}) + require.Error(t, err) + assert.Contains(t, err.Error(), "nil") + }) + t.Run("empty namespace", func(t *testing.T) { + // The namespace is authoritative scope for a shard-scoped dispatch. 
+ _, err := scopedDispatchDDLChanges([]*ternv1.TableChange{{TableName: "mutes", Ddl: "x", ChangeType: ternv1.ChangeType_CHANGE_TYPE_ALTER}}) + require.Error(t, err) + assert.Contains(t, err.Error(), "empty namespace") + }) + t.Run("empty table or DDL", func(t *testing.T) { + _, err := scopedDispatchDDLChanges([]*ternv1.TableChange{{Namespace: "ks", TableName: "mutes", ChangeType: ternv1.ChangeType_CHANGE_TYPE_ALTER}}) + require.Error(t, err) + assert.Contains(t, err.Error(), "empty table or DDL") + }) + t.Run("unsupported change type", func(t *testing.T) { + _, err := scopedDispatchDDLChanges([]*ternv1.TableChange{{Namespace: "ks", TableName: "mutes", Ddl: "ALTER TABLE `mutes` ADD INDEX (`x`)", ChangeType: ternv1.ChangeType_CHANGE_TYPE_OTHER}}) + require.Error(t, err) + assert.Contains(t, err.Error(), "unsupported change type") + }) + t.Run("vschema is not shard-scoped", func(t *testing.T) { + // A VSchema update is keyspace-wide (applied by the task-less + // group_finalizer), never shard-scoped — reject it here. + _, err := scopedDispatchDDLChanges([]*ternv1.TableChange{{Namespace: "ks", TableName: "mutes", Ddl: "x", ChangeType: ternv1.ChangeType_CHANGE_TYPE_VSCHEMA}}) + require.Error(t, err) + assert.Contains(t, err.Error(), "unsupported change type") + }) + t.Run("values are trimmed before storing", func(t *testing.T) { + got, err := scopedDispatchDDLChanges([]*ternv1.TableChange{{ + Namespace: " ks ", + TableName: " mutes ", + Ddl: " ALTER TABLE `mutes` ADD INDEX (`x`) ", + ChangeType: ternv1.ChangeType_CHANGE_TYPE_ALTER, + }}) + require.NoError(t, err) + require.Len(t, got, 1) + assert.Equal(t, "ks", got[0].Namespace) + assert.Equal(t, "mutes", got[0].Table) + assert.Equal(t, "ALTER TABLE `mutes` ADD INDEX (`x`)", got[0].DDL, "surrounding whitespace must not leak into operation keys/tasks") + }) +}