Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 16 additions & 49 deletions pkg/api/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1020,8 +1020,8 @@ func TestExecutePlanPersistsShardPlans(t *testing.T) {
}},
}},
Shards: []*ternv1.ShardPlan{
{Namespace: "commerce", Shard: "-80", NeedsChange: true},
{Namespace: "commerce", Shard: "80-", NeedsChange: false},
{Namespace: "commerce", Shard: "-80"},
{Namespace: "commerce", Shard: "80-"},
},
},
}
Expand Down Expand Up @@ -1057,8 +1057,8 @@ func TestExecutePlanPersistsShardPlans(t *testing.T) {
require.NotNil(t, resp)
require.NotNil(t, plans.created)
assert.Equal(t, []storage.ShardPlan{
{Namespace: "commerce", Shard: "-80", NeedsChange: true},
{Namespace: "commerce", Shard: "80-", NeedsChange: false},
{Namespace: "commerce", Shard: "-80"},
{Namespace: "commerce", Shard: "80-"},
}, plans.created.Shards)
}

Expand Down Expand Up @@ -1898,7 +1898,7 @@ func TestCreateStoredApplyFansOutOperationsForResolvedTargets(t *testing.T) {
controls: &memoryControlRequestStore{},
}, cfg, map[string]tern.Client{}, logger)

apply, storedApplyID, err := svc.createStoredApply(t.Context(), executeApplyTestPlan(), ApplyRequest{Environment: "staging"}, nil, "apply-fanout", false)
apply, storedApplyID, err := svc.createStoredApply(t.Context(), executeApplyTestPlan(), ApplyRequest{Environment: "staging"}, nil, "apply-fanout")

require.NoError(t, err)
assert.Equal(t, int64(123), storedApplyID)
Expand Down Expand Up @@ -1941,10 +1941,11 @@ func TestCreateStoredApplyFansOutShardedPlanOperations(t *testing.T) {
},
},
}
usersChange := storage.TableChange{Namespace: "commerce", Table: "users", DDL: "ALTER TABLE `users` ADD COLUMN `email` varchar(255)", Operation: "alter"}
plan.Shards = []storage.ShardPlan{
{Namespace: "commerce", Shard: "80-", NeedsChange: true},
{Namespace: "commerce", Shard: "-80", NeedsChange: true},
{Namespace: "commerce", Shard: "-", NeedsChange: false},
{Namespace: "commerce", Shard: "80-", Changes: []storage.TableChange{usersChange}},
{Namespace: "commerce", Shard: "-80", Changes: []storage.TableChange{usersChange}},
{Namespace: "commerce", Shard: "-"}, // unchanged: no changes, stays out of the apply
}
cfg := testServerConfig()
cfg.Databases = map[string]DatabaseConfig{}
Expand All @@ -1963,7 +1964,7 @@ func TestCreateStoredApplyFansOutShardedPlanOperations(t *testing.T) {
controls: &memoryControlRequestStore{},
}, cfg, map[string]tern.Client{}, logger)

apply, storedApplyID, err := svc.createStoredApply(t.Context(), plan, ApplyRequest{Environment: "staging"}, nil, "apply-sharded-fanout", true)
apply, storedApplyID, err := svc.createStoredApply(t.Context(), plan, ApplyRequest{Environment: "staging"}, nil, "apply-sharded-fanout")

require.NoError(t, err)
assert.Equal(t, int64(123), storedApplyID)
Expand Down Expand Up @@ -2009,9 +2010,10 @@ func TestCreateStoredApplyFansOutShardedPlanWithFinalizerOperation(t *testing.T)
},
},
}
usersChange := storage.TableChange{Namespace: "commerce", Table: "users", DDL: "ALTER TABLE `users` ADD COLUMN `email` varchar(255)", Operation: "alter"}
plan.Shards = []storage.ShardPlan{
{Namespace: "commerce", Shard: "80-", NeedsChange: true},
{Namespace: "commerce", Shard: "-80", NeedsChange: true},
{Namespace: "commerce", Shard: "80-", Changes: []storage.TableChange{usersChange}},
{Namespace: "commerce", Shard: "-80", Changes: []storage.TableChange{usersChange}},
}
cfg := testServerConfig()
cfg.Databases = map[string]DatabaseConfig{}
Expand All @@ -2030,7 +2032,7 @@ func TestCreateStoredApplyFansOutShardedPlanWithFinalizerOperation(t *testing.T)
controls: &memoryControlRequestStore{},
}, cfg, map[string]tern.Client{}, logger)

_, _, err := svc.createStoredApply(t.Context(), plan, ApplyRequest{Environment: "staging"}, nil, "apply-sharded-finalizer", true)
_, _, err := svc.createStoredApply(t.Context(), plan, ApplyRequest{Environment: "staging"}, nil, "apply-sharded-finalizer")

require.NoError(t, err)
require.Len(t, applies.operations, 3)
Expand Down Expand Up @@ -2069,7 +2071,7 @@ func TestCreateStoredApplyDoesNotDropFinalizerOnlyNamespace(t *testing.T) {
Artifacts: map[string]string{vSchemaArtifactName: "{\"routing\":true}"},
},
}
plan.Shards = []storage.ShardPlan{{Namespace: "commerce", Shard: "-", NeedsChange: true}}
plan.Shards = []storage.ShardPlan{{Namespace: "commerce", Shard: "-", Changes: []storage.TableChange{{Namespace: "commerce", Table: "users", DDL: "ALTER TABLE `users` ADD COLUMN `email` varchar(255)", Operation: "alter"}}}}
cfg := testServerConfig()
cfg.Databases = map[string]DatabaseConfig{}
cfg.Databases["testdb"] = DatabaseConfig{
Expand All @@ -2087,7 +2089,7 @@ func TestCreateStoredApplyDoesNotDropFinalizerOnlyNamespace(t *testing.T) {
controls: &memoryControlRequestStore{},
}, cfg, map[string]tern.Client{}, logger)

_, _, err := svc.createStoredApply(t.Context(), plan, ApplyRequest{Environment: "staging"}, nil, "apply-finalizer-only-namespace", true)
_, _, err := svc.createStoredApply(t.Context(), plan, ApplyRequest{Environment: "staging"}, nil, "apply-finalizer-only-namespace")

require.NoError(t, err)
require.Len(t, applies.operations, 2)
Expand All @@ -2101,41 +2103,6 @@ func TestCreateStoredApplyDoesNotDropFinalizerOnlyNamespace(t *testing.T) {
assert.Equal(t, "users", tasks.tasks[0].TableName)
}

func TestCreateStoredApplyDoesNotShardWithoutClientOptIn(t *testing.T) {
applies := &capturingApplyStore{}
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError}))
tasks := &capturingTaskStore{}
applies.taskStore = tasks
plan := executeApplyTestPlan()
plan.Namespaces = map[string]*storage.NamespacePlanData{
"commerce": {
Tables: []storage.TableChange{
{Namespace: "commerce", Table: "users", DDL: "ALTER TABLE `users` ADD COLUMN `email` varchar(255)", Operation: "alter"},
},
},
}
plan.Shards = []storage.ShardPlan{
{Namespace: "commerce", Shard: "-80", NeedsChange: true},
{Namespace: "commerce", Shard: "80-", NeedsChange: true},
}
svc := New(&mockStorageWithApplyStores{
plans: &staticPlanStore{plan: plan},
applies: applies,
tasks: tasks,
locks: &emptyLockStore{},
applyLogs: &noopApplyLogStore{},
controls: &memoryControlRequestStore{},
}, testServerConfig(), map[string]tern.Client{}, logger)

_, _, err := svc.createStoredApply(t.Context(), plan, ApplyRequest{Environment: "staging"}, nil, "apply-no-sharded-fanout", false)

require.NoError(t, err)
require.Len(t, applies.operations, 1)
assert.Empty(t, applies.operations[0].OperationKey)
require.Len(t, tasks.tasks, 1)
assert.Empty(t, tasks.tasks[0].Shard)
}

func TestValidateShardOperationKeyPartsRejectsDelimiter(t *testing.T) {
err := validateShardOperationKeyParts("commerce", "-80/80-", "users")

Expand Down
81 changes: 47 additions & 34 deletions pkg/api/plan_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/block/schemabot/pkg/schema"
"github.com/block/schemabot/pkg/state"
"github.com/block/schemabot/pkg/storage"
"github.com/block/schemabot/pkg/tern"
)

const applyOperationKeyMaxLen = 255
Expand Down Expand Up @@ -859,7 +858,7 @@ func (s *Service) queueValidatedApply(ctx context.Context, span trace.Span, plan
client.SetObserver(storedApplyID, observer)
}

applyIdentifier, storedApplyID, err := s.enqueueApply(ctx, plan, req, options, clientSupportsShardedApplyFanout(client), attachObserver)
applyIdentifier, storedApplyID, err := s.enqueueApply(ctx, plan, req, options, attachObserver)
if err != nil {
recordApplyError("enqueue apply", err)
return nil, 0, err
Expand All @@ -886,11 +885,10 @@ func (s *Service) enqueueApply(
plan *storage.Plan,
req ApplyRequest,
options map[string]string,
shardedFanoutSupported bool,
onApplyCreated func(int64),
) (string, int64, error) {
applyIdentifier := "apply-" + strings.ReplaceAll(uuid.New().String(), "-", "")[:16]
apply, storedApplyID, err := s.createStoredApply(ctx, plan, req, options, applyIdentifier, shardedFanoutSupported)
apply, storedApplyID, err := s.createStoredApply(ctx, plan, req, options, applyIdentifier)
if err != nil {
return "", 0, err
}
Expand All @@ -906,7 +904,6 @@ func (s *Service) createStoredApply(
req ApplyRequest,
options map[string]string,
applyIdentifier string,
shardedFanoutSupported bool,
) (*storage.Apply, int64, error) {
now := time.Now()
applyOpts := storage.ApplyOptionsFromMap(options)
Expand Down Expand Up @@ -961,7 +958,7 @@ func (s *Service) createStoredApply(
taskChanges := applyTaskChanges(plan)
cutoverPolicy := s.config.CutoverPolicyFor(plan.Database, req.Environment)
onFailure := s.config.OnFailure(plan.Database, req.Environment)
groups, shardedFanout, err := buildApplyOperationGroups(plan, taskChanges, targets, req.Environment, applyOpts, cutoverPolicy, onFailure, now, shardedFanoutSupported)
groups, shardedFanout, err := buildApplyOperationGroups(plan, taskChanges, targets, req.Environment, applyOpts, cutoverPolicy, onFailure, now)
if err != nil {
return nil, 0, err
}
Expand Down Expand Up @@ -998,11 +995,6 @@ func (s *Service) createStoredApply(
return apply, storedApplyID, nil
}

func clientSupportsShardedApplyFanout(client tern.Client) bool {
capability, ok := client.(tern.ShardedApplyFanoutSupport)
return ok && capability.SupportsShardedApplyFanout()
}

// applyTaskChanges returns the per-table DDL changes that become apply tasks.
// VSchema application is no longer modeled as a synthetic task: PlanetScale
// surfaces its VSchema status/diff from engine resume metadata, and a sharded
Expand All @@ -1020,10 +1012,13 @@ func buildApplyOperationGroups(
cutoverPolicy string,
onFailure string,
now time.Time,
shardedFanoutSupported bool,
) ([]*storage.ApplyOperationWithTasks, bool, error) {
if shardedFanoutSupported && canBuildShardedOperationGroups(plan, taskChanges) {
groups, err := buildShardedApplyOperationGroups(plan, taskChanges, targets, environment, applyOpts, cutoverPolicy, onFailure, now)
// Fan a plan out per shard whenever it carries per-shard changes. Only an
// instance-local sharded engine (Strata) produces those, so an
// externally-authoritative engine (e.g. PlanetScale) — whose plans never
// carry per-shard changes — is never fanned out, regardless of transport.
if canBuildShardedOperationGroups(plan, taskChanges) {
groups, err := buildShardedApplyOperationGroups(plan, targets, environment, applyOpts, cutoverPolicy, onFailure, now)
if err != nil {
return nil, false, err
}
Expand Down Expand Up @@ -1066,7 +1061,6 @@ func canBuildShardedOperationGroups(plan *storage.Plan, taskChanges []storage.Ta

func buildShardedApplyOperationGroups(
plan *storage.Plan,
taskChanges []storage.TableChange,
targets []routing.ExecutionTarget,
environment string,
applyOpts storage.ApplyOptions,
Expand All @@ -1075,28 +1069,46 @@ func buildShardedApplyOperationGroups(
now time.Time,
) ([]*storage.ApplyOperationWithTasks, error) {
shardsByNamespace := changingShardsByNamespace(plan.Shards)
groups := make([]*storage.ApplyOperationWithTasks, 0, len(targets)*(len(taskChanges)+1))
namespaces := make([]string, 0, len(shardsByNamespace))
for namespace := range shardsByNamespace {
namespaces = append(namespaces, namespace)
}
sort.Strings(namespaces)

groups := make([]*storage.ApplyOperationWithTasks, 0, len(targets)*(len(plan.Shards)+1))
groupsByTargetAndKey := make(map[string]*storage.ApplyOperationWithTasks)
for _, target := range targets {
for _, ddlChange := range taskChanges {
for _, shard := range shardsByNamespace[ddlChange.Namespace] {
if err := validateShardOperationKeyParts(ddlChange.Namespace, shard.Shard, ddlChange.Table); err != nil {
return nil, err
}
operationKey := shardOperationKey(ddlChange.Namespace, shard.Shard, ddlChange.Table)
if len(operationKey) > applyOperationKeyMaxLen {
return nil, fmt.Errorf("operation key for namespace %q shard %q table %q exceeds %d characters", ddlChange.Namespace, shard.Shard, ddlChange.Table, applyOperationKeyMaxLen)
}
groupKey := target.Deployment + "\x00" + operationKey
group := groupsByTargetAndKey[groupKey]
if group == nil {
group = &storage.ApplyOperationWithTasks{
Operation: newPendingApplyOperation(target, operationKey, cutoverPolicy, onFailure, now),
for _, namespace := range namespaces {
for _, shard := range shardsByNamespace[namespace] {
// Each shard is driven from its own changes; it is in
// shardsByNamespace only because those changes are non-empty.
for _, ddlChange := range shard.Changes {
Comment thread
morgo marked this conversation as resolved.
// Fail closed on a malformed per-shard change rather than building
// an operation key or engine request from empty/mismatched fields.
if strings.TrimSpace(ddlChange.Table) == "" || strings.TrimSpace(ddlChange.DDL) == "" || strings.TrimSpace(ddlChange.Operation) == "" {
return nil, fmt.Errorf("namespace %q shard %q has a change with an empty table, DDL, or operation", namespace, shard.Shard)
}
if ddlChange.Namespace != "" && ddlChange.Namespace != namespace {
return nil, fmt.Errorf("namespace %q shard %q change for table %q has mismatched namespace %q", namespace, shard.Shard, ddlChange.Table, ddlChange.Namespace)
}
if err := validateShardOperationKeyParts(namespace, shard.Shard, ddlChange.Table); err != nil {
return nil, err
}
operationKey := shardOperationKey(namespace, shard.Shard, ddlChange.Table)
if len(operationKey) > applyOperationKeyMaxLen {
return nil, fmt.Errorf("operation key for namespace %q shard %q table %q exceeds %d characters", namespace, shard.Shard, ddlChange.Table, applyOperationKeyMaxLen)
}
Comment thread
morgo marked this conversation as resolved.
groupKey := target.Deployment + "\x00" + operationKey
group := groupsByTargetAndKey[groupKey]
if group == nil {
group = &storage.ApplyOperationWithTasks{
Operation: newPendingApplyOperation(target, operationKey, cutoverPolicy, onFailure, now),
}
groupsByTargetAndKey[groupKey] = group
groups = append(groups, group)
}
groupsByTargetAndKey[groupKey] = group
groups = append(groups, group)
group.Tasks = append(group.Tasks, buildApplyTask(plan, ddlChange, environment, applyOpts, shard.Shard, now))
}
group.Tasks = append(group.Tasks, buildApplyTask(plan, ddlChange, environment, applyOpts, shard.Shard, now))
}
}
// Every namespace that changes its VSchema gets a task-less
Expand Down Expand Up @@ -1163,7 +1175,8 @@ func validateOperationKeyPart(label, value string) error {
func changingShardsByNamespace(shards []storage.ShardPlan) map[string][]storage.ShardPlan {
shardsByNamespace := make(map[string][]storage.ShardPlan)
for _, shard := range shards {
if !shard.NeedsChange {
// A shard is changing iff it carries its own changes.
if len(shard.Changes) == 0 {
continue
}
Comment thread
morgo marked this conversation as resolved.
shardsByNamespace[shard.Namespace] = append(shardsByNamespace[shard.Namespace], shard)
Expand Down
42 changes: 37 additions & 5 deletions pkg/api/proto_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,11 +272,43 @@ func protoShardPlansToStorage(shards []*ternv1.ShardPlan) ([]storage.ShardPlan,
if namespace == "" {
namespace = "default"
}
out = append(out, storage.ShardPlan{
Shard: shardName,
Namespace: namespace,
NeedsChange: shard.NeedsChange,
})
sp := storage.ShardPlan{
Shard: shardName,
Namespace: namespace,
}
// Carry the shard's own changes so the apply-create fan-out can build
// per-shard tasks from per-shard DDL. This is remote/untrusted data-plane
// input over gRPC, so validate and fail closed rather than persisting
// corrupt plan_data that later builds tasks with empty/mismatched fields.
for j, ch := range shard.Changes {
if ch == nil {
return nil, fmt.Errorf("shard plan %d (namespace %q shard %q) change %d is null", i, namespace, shardName, j)
}
// Trim before validating and persisting: these values cross the gRPC
// boundary and later build operation keys and task rows, so preserved
// surrounding whitespace would yield surprising keys and reconciliation/
// progress mismatches.
table := strings.TrimSpace(ch.TableName)
changeDDL := strings.TrimSpace(ch.Ddl)
if table == "" || changeDDL == "" {
return nil, fmt.Errorf("shard plan %d (namespace %q shard %q) change %d has an empty table or DDL", i, namespace, shardName, j)
}
switch ch.ChangeType {
case ternv1.ChangeType_CHANGE_TYPE_CREATE, ternv1.ChangeType_CHANGE_TYPE_ALTER, ternv1.ChangeType_CHANGE_TYPE_DROP:
default:
return nil, fmt.Errorf("shard plan %d (namespace %q shard %q) change %d (table %q) has unsupported change type %v", i, namespace, shardName, j, table, ch.ChangeType)
}
if cn := strings.TrimSpace(ch.Namespace); cn != "" && cn != namespace {
return nil, fmt.Errorf("shard plan %d shard %q change %d (table %q) namespace %q disagrees with shard namespace %q", i, shardName, j, table, cn, namespace)
}
sp.Changes = append(sp.Changes, storage.TableChange{
Namespace: namespace,
Table: table,
DDL: changeDDL,
Operation: protoChangeTypeToOperation(ch.ChangeType),
})
}
out = append(out, sp)
}
return out, nil
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/api/proto_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,14 @@ func TestProtoChangesToNamespacesRejectsDuplicateDefaultNamespaces(t *testing.T)

func TestProtoShardPlansToStorage(t *testing.T) {
got, err := protoShardPlansToStorage([]*ternv1.ShardPlan{
{Namespace: "commerce", Shard: "-80", NeedsChange: true},
{Shard: "80-", NeedsChange: false},
{Namespace: "commerce", Shard: "-80"},
{Shard: "80-"},
})

require.NoError(t, err)
assert.Equal(t, []storage.ShardPlan{
{Namespace: "commerce", Shard: "-80", NeedsChange: true},
{Namespace: "default", Shard: "80-", NeedsChange: false},
{Namespace: "commerce", Shard: "-80"},
{Namespace: "default", Shard: "80-"},
}, got)
}

Expand Down
Loading
Loading