diff --git a/TEMPLATES.md b/TEMPLATES.md index 76c434ce..dc5b108b 100644 --- a/TEMPLATES.md +++ b/TEMPLATES.md @@ -5710,6 +5710,204 @@ _No details available yet._ +## Sharded Apply + +### PR Comments + +
+Plan: Divergent Shards + + +## Schema Change Plan โ€” Production + +**Database**: `cdb_resolute` | **Type**: `Strata` + +*Requested by @jackjackbits at 2026-01-01 00:00:00 UTC ยท planned from [`abcdef1`](https://github.com/block/schemabot/commit/abcdef1234567890abcdef1234567890abcdef12)* + +#### Keyspace: `cdb_resolute_sharded` +Shards diverge โ€” what applies where: + +**shards `-40`, `80-c0`, `c0-`** + +```sql +ALTER TABLE `mutes` ADD INDEX `created_at`(`created_at`); +``` + +**shard `40-80`** + +```sql +ALTER TABLE `mutes` + ADD INDEX `created_at`(`created_at`), + ADD COLUMN `reason` varchar(255); +``` + +๐Ÿ“‹ **Plan**: **1** table to alter + + +--- + +๐Ÿ’ก **To apply** all schema changes from this PR, comment: +``` +schemabot apply -e production +``` + +
+ +
+Plan: Unsafe Change On One Shard + + +## Schema Change Plan โ€” Production + +**Database**: `cdb_resolute` | **Type**: `Strata` + +*Requested by @jackjackbits at 2026-01-01 00:00:00 UTC ยท planned from [`abcdef1`](https://github.com/block/schemabot/commit/abcdef1234567890abcdef1234567890abcdef12)* + +#### Keyspace: `cdb_resolute_sharded` +Shards diverge โ€” what applies where: + +**shards `-40`, `80-c0`, `c0-`** + +```sql +ALTER TABLE `mutes` ADD INDEX `created_at`(`created_at`); +``` + +**shard `40-80`** + +```sql +ALTER TABLE `mutes` + ADD INDEX `created_at`(`created_at`), + DROP COLUMN `legacy_reason`; +``` + +**โ›” Unsafe Changes Detected:** +- `mutes` (shard `40-80`): DROP COLUMN removes data and is irreversible + +**Destructive drop guidance:** + +Before allowing a destructive drop, first deploy application code that no longer reads from or writes to the dropped column. + +๐Ÿ“‹ **Plan**: 2 DDL statements + + +--- + +๐Ÿ’ก **To apply** all schema changes from this PR, comment: +``` +schemabot apply -e production +``` + +
+ +
+Apply In Progress + + +## Schema Change In Progress โ€” Production + +**Database**: `cdb_resolute` | **Type**: `Strata` | **Apply ID**: `apply-a1b2c3d4e5f6` + +*Applied by @jackjackbits at 2026-01-01 00:00:00 UTC* + +**Shards**: 1 running table copy, 3 waiting for -40 + +#### Keyspace `cdb_resolute_sharded` + +| Shard | Status | +| --- | --- | +| `-40` | ๐Ÿ”„ running table copy | +| `40-80` | โณ waiting for -40 | +| `80-c0` | โณ waiting for -40 | +| `c0-` | โณ waiting for -40 | + +`mutes` +```sql +ALTER TABLE `mutes` ADD INDEX `created_at`(`created_at`); +``` + +_Last updated: 2026-01-01 00:00:00 UTC (2026-01-01 00:00:00 UTC)_ + +
+ +
+Apply Failed (One Shard Failed) + + +## โŒ Schema Change Failed โ€” Production + +**Database**: `cdb_resolute` | **Type**: `Strata` | **Apply ID**: `apply-a1b2c3d4e5f6` + +*Applied by @jackjackbits at 2026-01-01 00:00:00 UTC* + +**Shards**: 1 failed, 3 halted + +> โš ๏ธ **First failure:** shard -40 โ€” resolve shard primary for `-40`: context deadline exceeded + +#### Keyspace `cdb_resolute_sharded` + +| Shard | Status | +| --- | --- | +| `-40` | โŒ failed โ€” resolve shard primary for `-40`: context deadline exceeded | +| `40-80` | โธ halted โ€” -40 failed | +| `80-c0` | โธ halted โ€” -40 failed | +| `c0-` | โธ halted โ€” -40 failed | + +`mutes` +```sql +ALTER TABLE `mutes` ADD INDEX `created_at`(`created_at`); +``` + +--- + +To retry: +``` +schemabot apply -e production +``` + +
+ +
+Apply With Divergent Shards + + +## Schema Change In Progress โ€” Production + +**Database**: `cdb_resolute` | **Type**: `Strata` | **Apply ID**: `apply-a1b2c3d4e5f6` + +*Applied by @jackjackbits at 2026-01-01 00:00:00 UTC* + +**Shards**: 1 running table copy, 2 waiting for -40 + +#### Keyspace `cdb_resolute_sharded` + +Shards diverge โ€” grouped by change: + +**shards `-40`, `80-c0`** + +| Shard | Status | +| --- | --- | +| `-40` | ๐Ÿ”„ running table copy | +| `80-c0` | โณ waiting for -40 | + +`mutes` +```sql +ALTER TABLE `mutes` ADD INDEX `created_at`(`created_at`); +``` + +**shard `40-80`** + +| Shard | Status | +| --- | --- | +| `40-80` | โณ waiting for -40 | + +`mutes` +```sql +ALTER TABLE `mutes` ADD INDEX `created_at`(`created_at`), ADD COLUMN `reason` varchar(255); +``` + +_Last updated: 2026-01-01 00:00:00 UTC (2026-01-01 00:00:00 UTC)_ +
+ ## Multi-Deployment Apply (CLI)
diff --git a/pkg/api/proto_helpers.go b/pkg/api/proto_helpers.go index dd5b65da..1642d31e 100644 --- a/pkg/api/proto_helpers.go +++ b/pkg/api/proto_helpers.go @@ -207,6 +207,35 @@ func planResponseFromProto(resp *ternv1.PlanResponse) *apitypes.PlanResponse { }) } + // Carry per-shard changes so the plan comment can show what applies to which + // shard. The namespace-level Changes above collapse a divergent keyspace to a + // single entry; the per-shard detail is preserved only here. + for _, sp := range resp.Shards { + if sp == nil { + continue + } + apiSP := &apitypes.ShardPlanResponse{Namespace: sp.Namespace, Shard: sp.Shard} + for _, t := range sp.Changes { + if t == nil { + continue + } + apiSP.Changes = append(apiSP.Changes, &apitypes.TableChangeResponse{ + TableName: t.TableName, + Namespace: t.Namespace, + DDL: t.Ddl, + ChangeType: protoChangeTypeToOperation(t.ChangeType), + IsUnsafe: t.IsUnsafe, + UnsafeReason: t.UnsafeReason, + }) + } + // A shard is changing iff it carries changes (the proto contract); drop an + // empty shard plan so it never renders a blank shard section downstream. + if len(apiSP.Changes) == 0 { + continue + } + httpResp.Shards = append(httpResp.Shards, apiSP) + } + return httpResp } diff --git a/pkg/apitypes/apitypes.go b/pkg/apitypes/apitypes.go index 2c56ad55..82885d5c 100644 --- a/pkg/apitypes/apitypes.go +++ b/pkg/apitypes/apitypes.go @@ -205,6 +205,19 @@ type PlanResponse struct { Changes []*SchemaChangeResponse `json:"changes"` LintResults []*LintViolationResponse `json:"lint_violations"` Errors []string `json:"errors"` + // Shards carries the per-shard plan for a sharded engine: each changing shard + // and the changes it needs. The namespace-level Changes above collapse a + // keyspace to one entry, so a keyspace whose shards diverge is represented + // faithfully only here. Empty for non-sharded plans. + Shards []*ShardPlanResponse `json:"shards,omitempty"` +} + +// ShardPlanResponse is one changing shard's plan: the keyspace it belongs to and +// the table changes that shard needs. +type ShardPlanResponse struct { + Namespace string `json:"namespace,omitempty"` + Shard string `json:"shard"` + Changes []*TableChangeResponse `json:"changes,omitempty"` } // HasErrors returns true if any lint result has error severity. diff --git a/pkg/cmd/commands/preview.go b/pkg/cmd/commands/preview.go index 43b16679..617c3d3b 100644 --- a/pkg/cmd/commands/preview.go +++ b/pkg/cmd/commands/preview.go @@ -92,6 +92,7 @@ func (cmd *PreviewCmd) Run(g *Globals) error { templates.PreviewCommentMultiDeployCompleted, templates.PreviewCommentMultiDeployAll, templates.PreviewCLIMultiDeployInProgress, templates.PreviewCLIMultiDeployFailed, templates.PreviewCLIMultiDeployCompleted, templates.PreviewCLIMultiDeployAll, + templates.PreviewCommentShardedAll, templates.PreviewCommentSingleProgress, templates.PreviewCommentSingleComplete, templates.PreviewCommentSingleFailed, templates.PreviewCommentSingleStopped, templates.PreviewCommentSummaryCompleted, templates.PreviewCommentSummaryFailed, diff --git a/pkg/cmd/internal/templates/preview.go b/pkg/cmd/internal/templates/preview.go index c69df4b4..b857b1d3 100644 --- a/pkg/cmd/internal/templates/preview.go +++ b/pkg/cmd/internal/templates/preview.go @@ -146,6 +146,7 @@ const ( PreviewCLIMultiDeployFailed PreviewType = "cli_multi_deploy_failed" // Halt-on-failure: one deployment failed PreviewCLIMultiDeployCompleted PreviewType = "cli_multi_deploy_completed" // All deployments completed PreviewCLIMultiDeployAll PreviewType = "cli_multi_deploy_all" // Show all CLI multi-deployment apply previews + PreviewCommentShardedAll PreviewType = "comment_sharded_all" // Show all sharded apply + plan previews // Single-table apply comment previews (most common case) PreviewCommentSingleProgress PreviewType = "comment_single_progress" // Single table running diff --git a/pkg/cmd/internal/templates/preview_comment.go b/pkg/cmd/internal/templates/preview_comment.go index 93b13cd3..27cde3d6 100644 --- a/pkg/cmd/internal/templates/preview_comment.go +++ b/pkg/cmd/internal/templates/preview_comment.go @@ -267,6 +267,20 @@ func previewCommentMultiDeployAllOutput() { printSections(sections) } +func previewCommentShardedAllOutput() { + sections := []struct { + name string + fn func() + }{ + {"PLAN: DIVERGENT SHARDS", func() { fmt.Print(webhooktemplates.PreviewCommentShardedPlanDivergent()) }}, + {"PLAN: UNSAFE CHANGE ON ONE SHARD", func() { fmt.Print(webhooktemplates.PreviewCommentShardedPlanUnsafe()) }}, + {"APPLY IN PROGRESS", func() { fmt.Print(webhooktemplates.PreviewCommentShardedApplyInProgress()) }}, + {"APPLY FAILED (ONE SHARD FAILED)", func() { fmt.Print(webhooktemplates.PreviewCommentShardedApplyFailed()) }}, + {"APPLY WITH DIVERGENT SHARDS", func() { fmt.Print(webhooktemplates.PreviewCommentShardedApplyDivergent()) }}, + } + printSections(sections) +} + func previewCLIPlanAllOutput() { sections := []struct { name string diff --git a/pkg/cmd/internal/templates/preview_dispatch.go b/pkg/cmd/internal/templates/preview_dispatch.go index 67272580..d3397f0c 100644 --- a/pkg/cmd/internal/templates/preview_dispatch.go +++ b/pkg/cmd/internal/templates/preview_dispatch.go @@ -173,6 +173,8 @@ func PreviewCLIOutput(previewType PreviewType) { fmt.Print(webhooktemplates.PreviewCommentMultiDeploymentApplyCompleted()) case PreviewCommentMultiDeployAll: previewCommentMultiDeployAllOutput() + case PreviewCommentShardedAll: + previewCommentShardedAllOutput() case PreviewCLIMultiDeployInProgress: previewCLIMultiDeploymentApplyInProgress() case PreviewCLIMultiDeployFailed: diff --git a/pkg/tern/grpc_client.go b/pkg/tern/grpc_client.go index 753be0ed..0cd67691 100644 --- a/pkg/tern/grpc_client.go +++ b/pkg/tern/grpc_client.go @@ -1901,6 +1901,22 @@ func (c *GRPCClient) dispatchPendingApply(ctx context.Context, apply *storage.Ap return err } + // Fail closed before dispatch when a shard-scoped operation resolves no target + // shard. A shard work operation (key "namespace/shard/table") must dispatch + // exactly one shard; if its tasks carry no shard the dispatch would send an + // empty TargetShards and the data plane would reject it opaquely with + // "expected exactly one target shard, got 0". Surfacing it here โ€” as a clear + // control-plane error โ€” turns a version/data skew into an actionable message + // instead of a confusing data-plane failure. + targetShards := taskTargetShards(tasks) + if scope.operation != nil && isShardWorkOperationKey(scope.operation.OperationKey) && len(targetShards) != 1 { + errMsg := fmt.Sprintf("queued gRPC apply failed: shard operation %q resolved %d target shards, expected exactly 1 โ€” its tasks carry no shard, so refusing to dispatch (the data plane would reject with \"expected exactly one target shard, got 0\"); this indicates a version or data skew", scope.operation.OperationKey, len(targetShards)) + if markErr := c.markRemoteApplyFailed(ctx, apply, nil, errMsg, false, scope); markErr != nil { + return fmt.Errorf("mark queued gRPC apply %s failed after shard-scope guard: %w", apply.ApplyIdentifier, markErr) + } + return fmt.Errorf("queued gRPC apply %s: %s", apply.ApplyIdentifier, errMsg) + } + // Use the per-operation copy-drive options so a multi-operation barrier // deployment parks the remote engine at the cutover barrier instead of // running straight through the swap. effectiveCopyDriveOptions OR's @@ -1927,7 +1943,7 @@ func (c *GRPCClient) dispatchPendingApply(ctx context.Context, apply *storage.Ap Environment: apply.Environment, Target: target, Caller: apply.Caller, - TargetShards: taskTargetShards(tasks), + TargetShards: targetShards, IdempotencyKey: remoteApplyIdempotencyKey(apply, scope), }) if err != nil { @@ -2112,6 +2128,15 @@ func tasksToProtoTableChanges(tasks []*storage.Task) []*ternv1.TableChange { return changes } +// isShardWorkOperationKey reports whether an operation key is a sharded work +// key ("namespace/shard/table") โ€” the per-shard fan-out's unit. A whole-apply +// key (empty) and a finalizer key ("namespace/group_finalizer") are not, so the +// shard-scope guard applies only to per-shard work. +func isShardWorkOperationKey(key string) bool { + parts := strings.Split(key, "/") + return len(parts) == 3 && parts[0] != "" && parts[1] != "" && parts[2] != "" +} + func taskTargetShards(tasks []*storage.Task) []string { seen := make(map[string]struct{}) var shards []string diff --git a/pkg/tern/grpc_client_test.go b/pkg/tern/grpc_client_test.go index e070018a..f9252095 100644 --- a/pkg/tern/grpc_client_test.go +++ b/pkg/tern/grpc_client_test.go @@ -909,6 +909,45 @@ func TestGRPCClient_ResumeApplyDispatchesQueuedRemoteApply(t *testing.T) { assert.Equal(t, "staging", progressReq.Environment) } +// A shard work operation (key "namespace/shard/table") whose tasks carry no +// shard must fail closed at the control plane with a clear error, rather than +// dispatching an empty TargetShards that the data plane rejects opaquely as +// "expected exactly one target shard, got 0". This makes a version/data skew +// self-diagnosing instead of surfacing as a confusing data-plane failure. +func TestGRPCClient_ResumeApplyOperationFailsClosedOnShardOpMissingShard(t *testing.T) { + server := &capturingTernServer{remoteApplyID: "remote-skew"} + client, cleanup := testCapturingGRPCClient(t, server) + defer cleanup() + + apply := &storage.Apply{ + ID: 7, ApplyIdentifier: "apply-skew", PlanID: 99, Database: "cdb_resolute", + DatabaseType: storage.DatabaseTypeStrata, Environment: "staging", State: state.Apply.Pending, + } + apply.SetOptions(storage.ApplyOptions{Target: "cdb-resolute-target"}) + operationID := int64(42) + // The task is missing its shard โ€” the skew this guard catches. + task := &storage.Task{ + ID: 11, TaskIdentifier: "task-mutes", ApplyID: apply.ID, ApplyOperationID: &operationID, + TableName: "mutes", Shard: "", Namespace: "cdb_resolute_sharded", + DDL: "ALTER TABLE mutes ADD INDEX created_at (created_at)", DDLAction: "alter", State: state.Task.Pending, + } + client.storage = &mockStorage{ + applies: &mockApplyStore{apply: apply}, + tasks: &mockTaskStore{tasks: []*storage.Task{task}}, + plans: &mockPlanStore{plan: &storage.Plan{ID: apply.PlanID, PlanIdentifier: "plan-skew"}}, + operations: &mockApplyOperationStore{ops: map[int64]*storage.ApplyOperation{ + operationID: {ID: operationID, ApplyID: apply.ID, Deployment: "cdb-resolute-deployment", OperationKey: "cdb_resolute_sharded/-40/mutes", State: state.ApplyOperation.Pending}, + }}, + } + + ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second) + defer cancel() + err := client.ResumeApplyOperation(ctx, apply, operationID) + require.Error(t, err) + assert.Contains(t, err.Error(), "expected exactly one target shard") + assert.Nil(t, server.getApplyRequest(), "must not dispatch to the data plane when the shard scope is missing") +} + func TestGRPCClient_ResumeApplyOperationDispatchesScopedTasks(t *testing.T) { // An operator driver resumes a single apply_operation over the remote path. // The drive loads tasks scoped to that operation (GetByApplyOperationID) and diff --git a/pkg/webhook/multi_apply.go b/pkg/webhook/multi_apply.go index c015a35b..34fd8334 100644 --- a/pkg/webhook/multi_apply.go +++ b/pkg/webhook/multi_apply.go @@ -19,6 +19,12 @@ import ( // ApplyOperations().ListByApply); tasks are the apply's tasks across all // deployments, regrouped per operation for the multi-deployment layout. func formatApplyStatusComment(apply *storage.Apply, ops []*storage.ApplyOperation, tasks []*storage.Task, displayByOp map[int64]operationDisplay, shardsByTable map[string][]*storage.Task) string { + // A sharded apply fans out across the shards of one keyspace within a single + // deployment, so it gets the shard-unit layout rather than the deployment-unit + // one โ€” its operations differ by shard, not deployment. + if isShardedApply(ops) { + return templates.RenderShardedApplyComment(buildShardedApplyData(apply, ops, tasks)) + } if len(ops) <= 1 { return templates.RenderApplyStatusComment(buildApplyCommentData(apply, tasks, singleOpDisplay(ops, displayByOp), shardsByTable)) } @@ -36,6 +42,12 @@ func formatApplyStatusComment(apply *storage.Apply, ops []*storage.ApplyOperatio // ApplyOperations().ListByApply); tasks are the apply's tasks across all // deployments, regrouped per operation for the multi-deployment layout. func formatApplySummaryComment(apply *storage.Apply, ops []*storage.ApplyOperation, tasks []*storage.Task, displayByOp map[int64]operationDisplay, shardsByTable map[string][]*storage.Task) string { + // The sharded layout is terminal-aware (header, footer, no last-updated line), + // so the same renderer serves the terminal summary; only the deployment-unit + // path has a distinct summary renderer. + if isShardedApply(ops) { + return templates.RenderShardedApplyComment(buildShardedApplyData(apply, ops, tasks)) + } if len(ops) <= 1 { return templates.RenderApplySummaryComment(buildApplyCommentData(apply, tasks, singleOpDisplay(ops, displayByOp), shardsByTable)) } diff --git a/pkg/webhook/plan.go b/pkg/webhook/plan.go index ffb9a211..080e0508 100644 --- a/pkg/webhook/plan.go +++ b/pkg/webhook/plan.go @@ -433,6 +433,42 @@ func (h *Handler) handleSchemaRequestError(repo string, pr int, installationID i h.postComment(repo, pr, installationID, templates.RenderGenericError(data)) } +// shardedUnsafeChanges collects unsafe per-shard changes, grouped by (table, +// reason) so a change present on several shards lists them together rather than +// repeating. Returns nil when the plan carries no per-shard changes (the +// non-sharded path uses the namespace-level unsafe view instead). +func shardedUnsafeChanges(shards []*apitypes.ShardPlanResponse) []templates.UnsafeChangeData { + if len(shards) == 0 { + return nil + } + type key struct{ table, reason string } + var order []key + byKey := make(map[key]*templates.UnsafeChangeData) + for _, sp := range shards { + if sp == nil { + continue + } + for _, t := range sp.Changes { + if t == nil || !t.IsUnsafe { + continue + } + k := key{table: t.TableName, reason: t.UnsafeReason} + uc := byKey[k] + if uc == nil { + uc = &templates.UnsafeChangeData{Table: t.TableName, Reason: t.UnsafeReason} + byKey[k] = uc + order = append(order, k) + } + uc.Shards = append(uc.Shards, sp.Shard) + } + } + out := make([]templates.UnsafeChangeData, 0, len(order)) + for _, k := range order { + out = append(out, *byKey[k]) + } + return out +} + // buildPlanCommentData converts plan results into template data. func buildPlanCommentData(schema *ghclient.SchemaRequestResult, planResp *apitypes.PlanResponse, environment, tenant, requestedBy string) templates.PlanCommentData { data := templates.PlanCommentData{ @@ -446,10 +482,33 @@ func buildPlanCommentData(schema *ghclient.SchemaRequestResult, planResp *apityp IsMySQL: schema.Type == "mysql", } + // Per-shard changes, grouped by keyspace, so a sharded keyspace can show what + // applies to which shard rather than the collapsed namespace-level view. + shardsByKeyspace := make(map[string][]templates.KeyspaceShardChange) + for _, sp := range planResp.Shards { + if sp == nil { + continue + } + shard := templates.KeyspaceShardChange{Shard: sp.Shard} + for _, t := range sp.Changes { + if t == nil || t.DDL == "" { + continue + } + shard.Statements = append(shard.Statements, t.DDL) + } + // A shard with no DDL is not changing; skipping it keeps it out of the + // shard-grouped rendering and the change count. + if len(shard.Statements) == 0 { + continue + } + shardsByKeyspace[sp.Namespace] = append(shardsByKeyspace[sp.Namespace], shard) + } + // Build keyspace changes from namespace-grouped plan response for _, sc := range planResp.Changes { ksData := templates.KeyspaceChangeData{ Keyspace: sc.Namespace, + Shards: shardsByKeyspace[sc.Namespace], } for _, t := range sc.TableChanges { ksData.Statements = append(ksData.Statements, t.DDL) @@ -462,8 +521,15 @@ func buildPlanCommentData(schema *ghclient.SchemaRequestResult, planResp *apityp data.Changes = append(data.Changes, ksData) } - unsafeChanges := planResp.UnsafeChanges() - if len(unsafeChanges) > 0 { + // Unsafe changes. For a sharded plan, derive them from the per-shard changes + // so an unsafe change confined to one shard (e.g. a column drop on a single + // drifted shard) is still flagged with the shard it applies to โ€” the + // collapsed namespace-level Changes can omit it. Otherwise use the + // namespace-level view. + if unsafe := shardedUnsafeChanges(planResp.Shards); len(unsafe) > 0 { + data.HasUnsafeChanges = true + data.UnsafeChanges = unsafe + } else if unsafeChanges := planResp.UnsafeChanges(); len(unsafeChanges) > 0 { data.HasUnsafeChanges = true for _, uc := range unsafeChanges { data.UnsafeChanges = append(data.UnsafeChanges, templates.UnsafeChangeData{ diff --git a/pkg/webhook/plan_test.go b/pkg/webhook/plan_test.go index 2201df5f..23742345 100644 --- a/pkg/webhook/plan_test.go +++ b/pkg/webhook/plan_test.go @@ -17,6 +17,59 @@ import ( "github.com/block/schemabot/pkg/webhook/templates" ) +// A sharded plan response's per-shard changes are threaded into the keyspace's +// Shards, so the plan comment can render "what applies where" โ€” not just the +// collapsed namespace-level Changes. +func TestBuildPlanCommentData_CarriesPerShardChanges(t *testing.T) { + schema := &ghclient.SchemaRequestResult{Database: "cdb_resolute", Type: "strata"} + const mutes = "ALTER TABLE `mutes` ADD INDEX `created_at`(`created_at`)" + const mutesDrift = "ALTER TABLE `mutes` ADD INDEX `created_at`(`created_at`), ADD COLUMN `reason` varchar(255)" + planResp := &apitypes.PlanResponse{ + Changes: []*apitypes.SchemaChangeResponse{{ + Namespace: "cdb_resolute_sharded", + TableChanges: []*apitypes.TableChangeResponse{{TableName: "mutes", DDL: mutes, ChangeType: "alter"}}, + }}, + Shards: []*apitypes.ShardPlanResponse{ + {Namespace: "cdb_resolute_sharded", Shard: "-40", Changes: []*apitypes.TableChangeResponse{{TableName: "mutes", DDL: mutes, ChangeType: "alter"}}}, + {Namespace: "cdb_resolute_sharded", Shard: "40-80", Changes: []*apitypes.TableChangeResponse{{TableName: "mutes", DDL: mutesDrift, ChangeType: "alter"}}}, + }, + } + + data := buildPlanCommentData(schema, planResp, "staging", "", "testuser") + + require.Len(t, data.Changes, 1) + require.Len(t, data.Changes[0].Shards, 2, "per-shard changes are threaded into the keyspace") + assert.Equal(t, "-40", data.Changes[0].Shards[0].Shard) + assert.Equal(t, []string{mutesDrift}, data.Changes[0].Shards[1].Statements, "the drifted shard keeps its own DDL") +} + +// An unsafe change on a single shard (per-shard plan) is surfaced with its shard, +// even when the collapsed namespace-level Changes don't carry it. +func TestBuildPlanCommentData_PerShardUnsafe(t *testing.T) { + schema := &ghclient.SchemaRequestResult{Database: "cdb_resolute", Type: "strata"} + planResp := &apitypes.PlanResponse{ + Changes: []*apitypes.SchemaChangeResponse{{ + Namespace: "cdb_resolute_sharded", + TableChanges: []*apitypes.TableChangeResponse{{TableName: "mutes", DDL: "ALTER TABLE `mutes` ADD INDEX a", ChangeType: "alter"}}, + }}, + Shards: []*apitypes.ShardPlanResponse{ + {Namespace: "cdb_resolute_sharded", Shard: "-40", Changes: []*apitypes.TableChangeResponse{{TableName: "mutes", DDL: "ALTER TABLE `mutes` ADD INDEX a", ChangeType: "alter"}}}, + // One combined ALTER per table; the drifted shard's single mutes change + // also drops a column and is flagged unsafe. + {Namespace: "cdb_resolute_sharded", Shard: "40-80", Changes: []*apitypes.TableChangeResponse{ + {TableName: "mutes", DDL: "ALTER TABLE `mutes` ADD INDEX a, DROP COLUMN x", ChangeType: "alter", IsUnsafe: true, UnsafeReason: "DROP COLUMN removes data"}, + }}, + }, + } + + data := buildPlanCommentData(schema, planResp, "staging", "", "testuser") + + assert.True(t, data.HasUnsafeChanges) + require.Len(t, data.UnsafeChanges, 1) + assert.Equal(t, "mutes", data.UnsafeChanges[0].Table) + assert.Equal(t, []string{"40-80"}, data.UnsafeChanges[0].Shards, "the unsafe change is scoped to the drifted shard") +} + func TestBuildPlanCommentData_UnsafeChangesPopulated(t *testing.T) { schema := &ghclient.SchemaRequestResult{ Database: "testdb", diff --git a/pkg/webhook/sharded_apply.go b/pkg/webhook/sharded_apply.go new file mode 100644 index 00000000..498d8422 --- /dev/null +++ b/pkg/webhook/sharded_apply.go @@ -0,0 +1,241 @@ +package webhook + +import ( + "sort" + "strings" + "time" + + "github.com/block/schemabot/pkg/presentation" + "github.com/block/schemabot/pkg/state" + "github.com/block/schemabot/pkg/storage" + "github.com/block/schemabot/pkg/webhook/templates" +) + +const finalizerKeySegment = "group_finalizer" + +// parseShardOperationKey splits a sharded work operation key +// "namespace/shard/table" into its parts. ok is false for any other shape โ€” an +// empty key (a non-sharded apply) or a "namespace/group_finalizer" finalizer +// key โ€” so callers can tell shard work apart from the rest. +func parseShardOperationKey(key string) (namespace, shard, table string, ok bool) { + // Split without a limit so a key with extra segments (e.g. + // "ns/-40/table/extra") fails the exact-three-parts check rather than folding + // the remainder into the table and being misclassified as shard work. + parts := strings.Split(key, "/") + if len(parts) != 3 || parts[0] == "" || parts[1] == "" || parts[2] == "" { + return "", "", "", false + } + return parts[0], parts[1], parts[2], true +} + +// isFinalizerOperationKey reports whether the key is a "namespace/group_finalizer" +// finalizer operation key. +func isFinalizerOperationKey(key string) bool { + ns, ok := strings.CutSuffix(key, "/"+finalizerKeySegment) + return ok && ns != "" && !strings.Contains(ns, "/") +} + +// isShardedApply reports whether the apply's operations are the per-shard +// fan-out of a single keyspace within one deployment: at least one work +// operation carries a "namespace/shard/table" key, every operation is a shard +// or finalizer operation, they all share one deployment, and every shard work +// operation is in the same namespace. A non-sharded multi-deployment apply +// (empty operation keys), an apply spanning more than one deployment, and a +// multi-keyspace apply all return false, so they keep the existing layout rather +// than mislabelling โ€” the sharded layout shows a single keyspace. +func isShardedApply(ops []*storage.ApplyOperation) bool { + deployment := "" + namespace := "" + hasShard := false + for _, op := range ops { + ns, _, _, isShard := parseShardOperationKey(op.OperationKey) + if !isShard && !isFinalizerOperationKey(op.OperationKey) { + return false + } + if deployment == "" { + deployment = op.Deployment + } else if op.Deployment != deployment { + return false + } + if isShard { + if namespace == "" { + namespace = ns + } else if ns != namespace { + return false + } + hasShard = true + } + } + return hasShard +} + +// buildShardedApplyData projects the per-shard operation rows into the +// sharded-apply comment input. Each shard work operation is one (shard, table) +// cell carrying its DDL; per-shard status is derived through pkg/presentation +// with the shard name as the operation identity, so the ordering labels +// ("waiting for `-40`", "halted โ€” `-40` failed") reference shards. Finalizer +// (VSchema) operations are not shard work and are omitted from the shard view; +// their outcome is still reflected in the aggregate headline state. +func buildShardedApplyData(apply *storage.Apply, ops []*storage.ApplyOperation, tasks []*storage.Task) templates.ShardedApplyData { + tasksByOp := groupTasksByOperation(tasks) + // Tasks arrive in created_at DESC order with no id tiebreaker. Sort each + // operation's tasks by id so the joined DDL (and the change signature derived + // from it) is deterministic. In practice a (shard, table) operation has a + // single task โ€” multiple statements for one table are combined into one ALTER + // upstream โ€” but this keeps the rendering stable regardless. + for _, ts := range tasksByOp { + sort.Slice(ts, func(i, j int) bool { return ts[i].ID < ts[j].ID }) + } + + keyspace := "" + cells := make([]templates.ShardCell, 0, len(ops)) + // Group work operations by shard in resolved order so a shard with more than + // one table change (a divergent shard) collapses to one status row. + var shardOrder []string + opsByShard := make(map[string][]*storage.ApplyOperation) + for _, op := range ops { + ns, shard, table, ok := parseShardOperationKey(op.OperationKey) + if !ok { + continue + } + if keyspace == "" { + keyspace = ns + } + // An operation can carry more than one task for its (namespace, shard, + // table) โ€” a shard plan may yield multiple statements for the same table โ€” + // so join every non-empty task DDL in task order. Taking only the first + // would drop statements and corrupt the change signature used to group + // shards. + var ddls []string + for _, t := range tasksByOp[op.ID] { + if strings.TrimSpace(t.DDL) != "" { + ddls = append(ddls, t.DDL) + } + } + cells = append(cells, templates.ShardCell{Shard: shard, Table: table, DDL: strings.Join(ddls, "\n")}) + if _, seen := opsByShard[shard]; !seen { + shardOrder = append(shardOrder, shard) + } + opsByShard[shard] = append(opsByShard[shard], op) + } + + data := templates.ShardedApplyData{ + State: apply.State, + Environment: apply.Environment, + Database: apply.Database, + Keyspace: keyspace, + ApplyID: apply.ApplyIdentifier, + RequestedBy: apply.Caller, + Shards: shardStatuses(shardOrder, opsByShard, tasksByOp), + Cells: cells, + } + if apply.StartedAt != nil { + data.StartedAt = apply.StartedAt.Format(time.RFC3339) + } + if apply.CompletedAt != nil { + data.CompletedAt = apply.CompletedAt.Format(time.RFC3339) + } + return data +} + +// shardStatuses derives one status per shard. Each shard's operations are +// aggregated to a single representative state, then the shards are projected +// together through pkg/presentation (shard name as identity) so ordering labels +// reference sibling shards. +func shardStatuses(shardOrder []string, opsByShard map[string][]*storage.ApplyOperation, tasksByOp map[int64][]*storage.Task) []templates.ShardStatus { + inputs := make([]presentation.Operation, 0, len(shardOrder)) + for _, shard := range shardOrder { + shardOps := opsByShard[shard] + st, errMsg := aggregateShardState(shardOps, tasksByOp) + first := shardOps[0] + inputs = append(inputs, presentation.Operation{ + Deployment: shard, + State: st, + Barrier: first.CutoverPolicy == storage.CutoverPolicyBarrier, + Parallel: first.CutoverPolicy == storage.CutoverPolicyParallel, + HaltOnFailure: first.OnFailure != storage.OnFailureContinue, + ContinueOnFailure: first.OnFailure == storage.OnFailureContinue, + Error: errMsg, + }) + } + derived := presentation.Derive(inputs).Deployments + out := make([]templates.ShardStatus, 0, len(derived)) + for _, d := range derived { + out = append(out, templates.ShardStatus{ + Shard: d.Deployment, + Emoji: d.Emoji, + Label: d.Label, + State: d.State, + Error: d.Error, + }) + } + return out +} + +// aggregateShardState reduces a shard's operations to its most significant +// state (and that operation's error), so a shard whose tables are in different +// states shows the state an operator should act on first. A shard with a single +// operation โ€” the common case โ€” returns that operation's state unchanged. When +// the chosen operation row carries no error message (a remote failure records +// the error on the operation's tasks, and the operator may not have stamped the +// row), it falls back to the first task error so a failed shard always shows why +// โ€” otherwise the comment is silent and the operator has to dig through logs. +func aggregateShardState(ops []*storage.ApplyOperation, tasksByOp map[int64][]*storage.Task) (string, string) { + best := ops[0] + for _, op := range ops[1:] { + if shardStateRank(op.State) > shardStateRank(best.State) { + best = op + } + } + errMsg := best.ErrorMessage + if errMsg == "" { + errMsg = firstTaskError(tasksByOp[best.ID]) + } + return best.State, errMsg +} + +// firstTaskError returns the first non-empty task error for an operation. +func firstTaskError(tasks []*storage.Task) string { + for _, t := range tasks { + if t.ErrorMessage != "" { + return t.ErrorMessage + } + } + return "" +} + +// shardStateRank orders operation states by how much they demand attention, so +// aggregateShardState surfaces the most actionable one. Failure ranks highest; +// completed lowest. +func shardStateRank(s string) int { + switch s { + case state.ApplyOperation.Failed: + return 12 + case state.ApplyOperation.FailedRetryable: + return 11 + case state.ApplyOperation.Running: + return 10 + case state.ApplyOperation.CuttingOver: + return 9 + case state.ApplyOperation.WaitingForCutover: + return 8 + case state.ApplyOperation.Recovering: + return 7 + case state.ApplyOperation.Resuming: + return 6 + case state.ApplyOperation.Stopped: + return 5 + case state.ApplyOperation.RevertWindow: + return 4 + case state.ApplyOperation.Pending: + return 3 + case state.ApplyOperation.Cancelled: + return 2 + case state.ApplyOperation.Reverted: + return 1 + case state.ApplyOperation.Completed: + return 0 + default: + return 3 + } +} diff --git a/pkg/webhook/sharded_apply_test.go b/pkg/webhook/sharded_apply_test.go new file mode 100644 index 00000000..b425ba51 --- /dev/null +++ b/pkg/webhook/sharded_apply_test.go @@ -0,0 +1,164 @@ +package webhook + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/block/schemabot/pkg/state" + "github.com/block/schemabot/pkg/storage" +) + +func TestParseShardOperationKey(t *testing.T) { + ns, shard, table, ok := parseShardOperationKey("cdb_resolute_sharded/-40/mutes") + require.True(t, ok) + assert.Equal(t, "cdb_resolute_sharded", ns) + assert.Equal(t, "-40", shard) + assert.Equal(t, "mutes", table) + + for _, key := range []string{"", "cdb_resolute_sharded/group_finalizer", "deployment-only", "ns//table"} { + _, _, _, ok := parseShardOperationKey(key) + assert.False(t, ok, "key %q must not parse as a shard work key", key) + } + + assert.True(t, isFinalizerOperationKey("cdb_resolute_sharded/group_finalizer")) + assert.False(t, isFinalizerOperationKey("cdb_resolute_sharded/-40/mutes")) + assert.False(t, isFinalizerOperationKey("")) +} + +func TestIsShardedApply(t *testing.T) { + shardOp := func(shard string) *storage.ApplyOperation { + return &storage.ApplyOperation{Deployment: "cake", OperationKey: "ks/" + shard + "/mutes"} + } + finalizer := &storage.ApplyOperation{Deployment: "cake", OperationKey: "ks/group_finalizer"} + + assert.True(t, isShardedApply([]*storage.ApplyOperation{shardOp("-40"), shardOp("80-"), finalizer}), + "shard work + finalizer in one deployment is sharded") + assert.False(t, isShardedApply([]*storage.ApplyOperation{finalizer}), "a finalizer alone has no shard work") + assert.False(t, isShardedApply([]*storage.ApplyOperation{ + {Deployment: "cake", OperationKey: ""}, {Deployment: "eu", OperationKey: ""}, + }), "empty keys are a non-sharded multi-deployment apply") + assert.False(t, isShardedApply([]*storage.ApplyOperation{ + {Deployment: "cake", OperationKey: "ks/-40/mutes"}, {Deployment: "eu", OperationKey: "ks/80-/mutes"}, + }), "shards spanning deployments fall back to the deployment layout") + assert.False(t, isShardedApply([]*storage.ApplyOperation{ + {Deployment: "cake", OperationKey: "ks1/-40/mutes"}, {Deployment: "cake", OperationKey: "ks2/-40/mutes"}, + }), "shard work across multiple keyspaces falls back rather than mislabelling one keyspace") +} + +// The failed sharded apply must render the shard-unit layout AND surface the +// failed shard's error โ€” the bug was that the deployment-keyed layout collided +// per-shard details and dropped the error. +func TestFormatApplyStatusComment_ShardedFailedSurfacesError(t *testing.T) { + const failErr = "resolve shard primary for `-40`: context deadline exceeded" + started := time.Unix(1700000000, 0).UTC() + apply := &storage.Apply{ + ApplyIdentifier: "apply-f5701ad9", Database: "cdb_resolute", Environment: "staging", + State: state.Apply.Failed, Caller: "morgo", StartedAt: &started, + } + op := func(id int64, shard, opState, errMsg string) *storage.ApplyOperation { + return &storage.ApplyOperation{ + ID: id, ApplyID: 1, Deployment: "cake", + OperationKey: "cdb_resolute_sharded/" + shard + "/mutes", + State: opState, ErrorMessage: errMsg, + CutoverPolicy: storage.CutoverPolicyRolling, OnFailure: storage.OnFailureHalt, + } + } + // Resolved order: the failed shard first, so the rest derive as halted by it. + ops := []*storage.ApplyOperation{ + op(1, "-40", state.ApplyOperation.Failed, failErr), + op(2, "40-80", state.ApplyOperation.Pending, ""), + op(3, "80-c0", state.ApplyOperation.Pending, ""), + op(4, "c0-", state.ApplyOperation.Pending, ""), + } + task := func(id int64, opID int64, shard string) *storage.Task { + oid := opID + return &storage.Task{ + ID: id, ApplyID: 1, ApplyOperationID: &oid, Shard: shard, + Namespace: "cdb_resolute_sharded", TableName: "mutes", + DDL: "ALTER TABLE `mutes` ADD INDEX `created_at`(`created_at`);", + } + } + tasks := []*storage.Task{task(1, 1, "-40"), task(2, 2, "40-80"), task(3, 3, "80-c0"), task(4, 4, "c0-")} + + out := formatApplyStatusComment(apply, ops, tasks, nil, nil) + + assert.Contains(t, out, "โŒ Schema Change Failed", "uses the shard-unit failed headline") + assert.Contains(t, out, "**Shards**:", "counts shards, not deployments") + assert.NotContains(t, out, "**Deployments**:", "must not use the deployment-unit layout") + assert.Contains(t, out, failErr, "the failed shard's error is surfaced (the bug fix)") + assert.Contains(t, out, "First failure:", "the failure is lifted to the top") + for _, shard := range []string{"-40", "40-80", "80-c0", "c0-"} { + assert.Contains(t, out, "`"+shard+"`", "shard %s is shown", shard) + } +} + +// A remote failure records the error on the operation's task, and the operator +// may not stamp it onto the operation row. The apply comment must still surface +// it (falling back to the task error) rather than going silent โ€” the gap that +// forced digging through Datadog. +func TestFormatApplyStatusComment_ShardedFailureFallsBackToTaskError(t *testing.T) { + const gotZero = "strata work operation expected exactly one target shard, got 0" + apply := &storage.Apply{ApplyIdentifier: "apply-x", Database: "cdb_resolute", Environment: "staging", State: state.Apply.Failed} + op := &storage.ApplyOperation{ + ID: 1, ApplyID: 1, Deployment: "cake", OperationKey: "cdb_resolute_sharded/-40/mutes", + State: state.ApplyOperation.Failed, ErrorMessage: "", // operation row carries no error + CutoverPolicy: storage.CutoverPolicyRolling, OnFailure: storage.OnFailureHalt, + } + oid := int64(1) + tasks := []*storage.Task{{ID: 1, ApplyID: 1, ApplyOperationID: &oid, Namespace: "cdb_resolute_sharded", TableName: "mutes", Shard: "-40", DDL: "ALTER ...", ErrorMessage: gotZero}} + + out := formatApplyStatusComment(apply, []*storage.ApplyOperation{op}, tasks, nil, nil) + + assert.Contains(t, out, gotZero, "the task error is surfaced when the operation row has none") +} + +// A divergent sharded apply groups shards by change signature and keeps each +// table's DDL once; the keyspace and cells come from the operation keys/tasks. +func TestBuildShardedApplyData_DivergentGroupsByTable(t *testing.T) { + apply := &storage.Apply{ApplyIdentifier: "apply-x", Database: "cdb_resolute", Environment: "staging", State: state.Apply.Running} + mk := func(id int64, key string) *storage.ApplyOperation { + return &storage.ApplyOperation{ID: id, ApplyID: 1, Deployment: "cake", OperationKey: key, State: state.ApplyOperation.Pending, CutoverPolicy: storage.CutoverPolicyRolling, OnFailure: storage.OnFailureHalt} + } + ops := []*storage.ApplyOperation{ + mk(1, "ks/-40/mutes"), + mk(2, "ks/40-80/mutes"), + mk(3, "ks/40-80/blocks"), // 40-80 diverges: it also changes blocks + } + tk := func(id, opID int64, table string) *storage.Task { + oid := opID + return &storage.Task{ID: id, ApplyID: 1, ApplyOperationID: &oid, Namespace: "ks", TableName: table, DDL: "ALTER `" + table + "`"} + } + tasks := []*storage.Task{tk(1, 1, "mutes"), tk(2, 2, "mutes"), tk(3, 3, "blocks")} + + data := buildShardedApplyData(apply, ops, tasks) + + assert.Equal(t, "ks", data.Keyspace) + require.Len(t, data.Cells, 3) + require.Len(t, data.Shards, 2, "two distinct shards, each shown once") + assert.Equal(t, "-40", data.Shards[0].Shard) + assert.Equal(t, "40-80", data.Shards[1].Shard) +} + +// Defensive: in practice a (shard, table) operation has a single task โ€” multiple +// statements for one table are combined into one ALTER upstream โ€” but if more +// than one task ever shows up, every non-empty DDL is joined in deterministic id +// order rather than dropping all but the first. +func TestBuildShardedApplyData_JoinsMultiTaskDDL(t *testing.T) { + apply := &storage.Apply{ApplyIdentifier: "apply-x", Database: "cdb_resolute", Environment: "staging", State: state.Apply.Running} + op := &storage.ApplyOperation{ID: 1, ApplyID: 1, Deployment: "cake", OperationKey: "ks/-40/mutes", State: state.ApplyOperation.Pending, CutoverPolicy: storage.CutoverPolicyRolling, OnFailure: storage.OnFailureHalt} + oid := int64(1) + tasks := []*storage.Task{ + {ID: 1, ApplyID: 1, ApplyOperationID: &oid, Namespace: "ks", TableName: "mutes", DDL: "ALTER TABLE `mutes` ADD INDEX a"}, + {ID: 2, ApplyID: 1, ApplyOperationID: &oid, Namespace: "ks", TableName: "mutes", DDL: ""}, // empty is skipped + {ID: 3, ApplyID: 1, ApplyOperationID: &oid, Namespace: "ks", TableName: "mutes", DDL: "ALTER TABLE `mutes` ADD INDEX b"}, + } + + data := buildShardedApplyData(apply, []*storage.ApplyOperation{op}, tasks) + + require.Len(t, data.Cells, 1) + assert.Equal(t, "ALTER TABLE `mutes` ADD INDEX a\nALTER TABLE `mutes` ADD INDEX b", data.Cells[0].DDL, + "all non-empty task DDLs are joined in order") +} diff --git a/pkg/webhook/templates/plan.go b/pkg/webhook/templates/plan.go index 2989d0e9..f10f8b2a 100644 --- a/pkg/webhook/templates/plan.go +++ b/pkg/webhook/templates/plan.go @@ -23,6 +23,10 @@ type LintViolationData struct { type UnsafeChangeData struct { Table string Reason string + // Shards names the shards this unsafe change applies to, for a sharded plan + // where only some shards carry it. Empty for a non-sharded change (applies to + // the whole table). + Shards []string } // PlanCommentData contains all data needed to render a plan comment. @@ -69,6 +73,18 @@ type KeyspaceChangeData struct { Statements []string VSchemaChanged bool VSchemaDiff string + + // Shards carries this keyspace's per-shard changes for a sharded plan. When + // set, the DDL is rendered per shard-group ("what applies where") instead of + // the single Statements block โ€” so a keyspace whose shards diverge is shown + // faithfully. Empty for a non-sharded keyspace. + Shards []KeyspaceShardChange +} + +// KeyspaceShardChange is one shard's planned statements within a keyspace. +type KeyspaceShardChange struct { + Shard string + Statements []string } // RenderPlanComment renders the plan comment markdown. @@ -237,9 +253,7 @@ func writeOptions(sb *strings.Builder, data PlanCommentData) { func countChanges(changes []KeyspaceChangeData) (totalStatements, keyspacesWithVSchema int) { for _, ks := range changes { - if len(ks.Statements) > 0 { - totalStatements += len(ks.Statements) - } + totalStatements += keyspaceStatementCount(ks) if ks.VSchemaChanged { keyspacesWithVSchema++ } @@ -247,6 +261,24 @@ func countChanges(changes []KeyspaceChangeData) (totalStatements, keyspacesWithV return } +// keyspaceStatementCount counts a keyspace's DDL statements for the summary and +// the no-changes short-circuit. It prefers the collapsed namespace-level +// Statements; when those are absent but the keyspace carries per-shard changes, +// it counts the distinct statements across shards, so a sharded plan whose only +// DDL is per-shard is never miscounted as "no changes". +func keyspaceStatementCount(ks KeyspaceChangeData) int { + if len(ks.Statements) > 0 { + return len(ks.Statements) + } + seen := make(map[string]struct{}) + for _, sh := range ks.Shards { + for _, stmt := range sh.Statements { + seen[stmt] = struct{}{} + } + } + return len(seen) +} + func writePlanSummary(sb *strings.Builder, data PlanCommentData, totalStatements, keyspacesWithVSchema int) { totalChanges := totalStatements + keyspacesWithVSchema if totalChanges == 0 { @@ -315,7 +347,7 @@ func writeKeyspaceChanges(sb *strings.Builder, data PlanCommentData) { for _, ks := range data.Changes { hasVSchemaChanges := ks.VSchemaChanged && !data.IsMySQL - hasDDLChanges := len(ks.Statements) > 0 + hasDDLChanges := len(ks.Statements) > 0 || len(ks.Shards) > 0 if !hasDDLChanges && !hasVSchemaChanges { continue } @@ -340,20 +372,87 @@ func writeKeyspaceChanges(sb *strings.Builder, data PlanCommentData) { } if hasDDLChanges { - sb.WriteString("```sql\n") - for i, stmt := range ks.Statements { - sb.WriteString(ddl.FormatDDL(stmt)) - if i < len(ks.Statements)-1 { - sb.WriteString("\n\n") - } else { - sb.WriteString("\n") - } + if len(ks.Shards) > 0 { + writeShardedPlanDDL(sb, ks.Shards) + } else { + writePlanDDLBlock(sb, ks.Statements) } - sb.WriteString("```\n\n") } } } +// writePlanDDLBlock writes a single fenced SQL block of statements. +func writePlanDDLBlock(sb *strings.Builder, statements []string) { + sb.WriteString("```sql\n") + for i, stmt := range statements { + sb.WriteString(ddl.FormatDDL(stmt)) + if i < len(statements)-1 { + sb.WriteString("\n\n") + } else { + sb.WriteString("\n") + } + } + sb.WriteString("```\n\n") +} + +// writeShardedPlanDDL renders a sharded keyspace's DDL grouped by change: shards +// that need the same statements share one block, so a uniform keyspace shows the +// DDL once and a divergent one shows "what applies where" โ€” each distinct change +// set with the shards it applies to. +func writeShardedPlanDDL(sb *strings.Builder, shards []KeyspaceShardChange) { + groups := groupKeyspaceShardsByStatements(shards) + if len(groups) <= 1 { + if len(groups) == 1 { + writePlanDDLBlock(sb, groups[0].Statements) + } + return + } + sb.WriteString("Shards diverge โ€” what applies where:\n\n") + for _, g := range groups { + fmt.Fprintf(sb, "**%s**\n\n", planShardList(g.Shards)) + writePlanDDLBlock(sb, g.Statements) + } +} + +type keyspaceShardGroup struct { + Shards []string + Statements []string +} + +// groupKeyspaceShardsByStatements buckets shards whose statement set is +// identical, preserving resolved order, so a uniform keyspace yields one group. +func groupKeyspaceShardsByStatements(shards []KeyspaceShardChange) []keyspaceShardGroup { + var order []string + bySig := make(map[string]*keyspaceShardGroup) + for _, s := range shards { + sig := strings.Join(s.Statements, "\x01") + g := bySig[sig] + if g == nil { + g = &keyspaceShardGroup{Statements: s.Statements} + bySig[sig] = g + order = append(order, sig) + } + g.Shards = append(g.Shards, s.Shard) + } + groups := make([]keyspaceShardGroup, 0, len(order)) + for _, sig := range order { + groups = append(groups, *bySig[sig]) + } + return groups +} + +// planShardList renders a group's shards as "shard `x`" or "shards `x`, `y`". +func planShardList(shards []string) string { + quoted := make([]string, len(shards)) + for i, s := range shards { + quoted[i] = fmt.Sprintf("`%s`", s) + } + if len(quoted) == 1 { + return "shard " + quoted[0] + } + return "shards " + strings.Join(quoted, ", ") +} + func writeUnsafeWarning(sb *strings.Builder, changes []UnsafeChangeData, allowUnsafe bool, isMySQL bool) { if allowUnsafe { sb.WriteString("**๐Ÿšจ Unsafe Changes** (`--allow-unsafe` enabled):\n") @@ -361,11 +460,15 @@ func writeUnsafeWarning(sb *strings.Builder, changes []UnsafeChangeData, allowUn sb.WriteString("**โ›” Unsafe Changes Detected:**\n") } for _, c := range changes { + table := "`" + c.Table + "`" + if len(c.Shards) > 0 { + table = fmt.Sprintf("%s (%s)", table, planShardList(c.Shards)) + } reason := ui.CleanLintReason(c.Reason) if reason != "" { - fmt.Fprintf(sb, "- `%s`: %s\n", c.Table, reason) + fmt.Fprintf(sb, "- %s: %s\n", table, reason) } else { - fmt.Fprintf(sb, "- `%s`\n", c.Table) + fmt.Fprintf(sb, "- %s\n", table) } } sb.WriteString("\n") diff --git a/pkg/webhook/templates/preview_sharded.go b/pkg/webhook/templates/preview_sharded.go new file mode 100644 index 00000000..dcb83a61 --- /dev/null +++ b/pkg/webhook/templates/preview_sharded.go @@ -0,0 +1,121 @@ +package templates + +import ( + "github.com/block/schemabot/pkg/presentation" + "github.com/block/schemabot/pkg/state" +) + +// previewShardStatuses derives per-shard statuses from sample operations using +// the shard name as the operation identity โ€” the same projection the webhook +// uses โ€” so the preview's emoji/labels match production rendering. +func previewShardStatuses(ops []presentation.Operation) []ShardStatus { + derived := presentation.Derive(ops).Deployments + out := make([]ShardStatus, 0, len(derived)) + for _, d := range derived { + out = append(out, ShardStatus{Shard: d.Deployment, Emoji: d.Emoji, Label: d.Label, State: d.State, Error: d.Error}) + } + return out +} + +const ( + previewMutesIndex = "ALTER TABLE `mutes` ADD INDEX `created_at`(`created_at`);" + previewMutesIndexDrift = "ALTER TABLE `mutes` ADD INDEX `created_at`(`created_at`), ADD COLUMN `reason` varchar(255);" +) + +func previewMutesCell(shard string) ShardCell { + return ShardCell{Shard: shard, Table: "mutes", DDL: previewMutesIndex} +} + +// PreviewCommentShardedApplyInProgress renders a sharded apply mid-rollout: the +// first shard copying, the rest gated behind it. +func PreviewCommentShardedApplyInProgress() string { + return RenderShardedApplyComment(ShardedApplyData{ + State: state.Apply.Running, Environment: "production", Database: "cdb_resolute", + Keyspace: "cdb_resolute_sharded", ApplyID: "apply-a1b2c3d4e5f6", RequestedBy: previewRequestedBy, + Shards: previewShardStatuses([]presentation.Operation{ + {Deployment: "-40", State: state.ApplyOperation.Running, HaltOnFailure: true}, + {Deployment: "40-80", State: state.ApplyOperation.Pending, HaltOnFailure: true}, + {Deployment: "80-c0", State: state.ApplyOperation.Pending, HaltOnFailure: true}, + {Deployment: "c0-", State: state.ApplyOperation.Pending, HaltOnFailure: true}, + }), + Cells: []ShardCell{previewMutesCell("-40"), previewMutesCell("40-80"), previewMutesCell("80-c0"), previewMutesCell("c0-")}, + }) +} + +// PreviewCommentShardedApplyFailed renders a sharded apply where one shard failed +// and the rest halted behind it, with the failed shard's error surfaced. +func PreviewCommentShardedApplyFailed() string { + return RenderShardedApplyComment(ShardedApplyData{ + State: state.Apply.Failed, Environment: "production", Database: "cdb_resolute", + Keyspace: "cdb_resolute_sharded", ApplyID: "apply-a1b2c3d4e5f6", RequestedBy: previewRequestedBy, + Shards: previewShardStatuses([]presentation.Operation{ + {Deployment: "-40", State: state.ApplyOperation.Failed, HaltOnFailure: true, Error: "resolve shard primary for `-40`: context deadline exceeded"}, + {Deployment: "40-80", State: state.ApplyOperation.Pending, HaltOnFailure: true}, + {Deployment: "80-c0", State: state.ApplyOperation.Pending, HaltOnFailure: true}, + {Deployment: "c0-", State: state.ApplyOperation.Pending, HaltOnFailure: true}, + }), + Cells: []ShardCell{previewMutesCell("-40"), previewMutesCell("40-80"), previewMutesCell("80-c0"), previewMutesCell("c0-")}, + }) +} + +// PreviewCommentShardedApplyDivergent renders a sharded apply whose shards +// diverged (one shard's combined ALTER also adds a column), grouped by change. +func PreviewCommentShardedApplyDivergent() string { + return RenderShardedApplyComment(ShardedApplyData{ + State: state.Apply.Running, Environment: "production", Database: "cdb_resolute", + Keyspace: "cdb_resolute_sharded", ApplyID: "apply-a1b2c3d4e5f6", RequestedBy: previewRequestedBy, + Shards: previewShardStatuses([]presentation.Operation{ + {Deployment: "-40", State: state.ApplyOperation.Running, HaltOnFailure: true}, + {Deployment: "40-80", State: state.ApplyOperation.Pending, HaltOnFailure: true}, + {Deployment: "80-c0", State: state.ApplyOperation.Pending, HaltOnFailure: true}, + }), + Cells: []ShardCell{ + previewMutesCell("-40"), + {Shard: "40-80", Table: "mutes", DDL: previewMutesIndexDrift}, + previewMutesCell("80-c0"), + }, + }) +} + +// PreviewCommentShardedPlanDivergent renders a sharded plan whose shards diverge, +// showing "what applies where". +func PreviewCommentShardedPlanDivergent() string { + idx := "ALTER TABLE `mutes` ADD INDEX `created_at`(`created_at`)" + drift := "ALTER TABLE `mutes` ADD INDEX `created_at`(`created_at`), ADD COLUMN `reason` varchar(255)" + return RenderPlanComment(PlanCommentData{ + Database: "cdb_resolute", Environment: "production", DatabaseType: "strata", + HeadSHA: previewHeadSHA, Repository: previewRepository, RequestedBy: previewRequestedBy, + Changes: []KeyspaceChangeData{{ + Keyspace: "cdb_resolute_sharded", + Statements: []string{idx}, + Shards: []KeyspaceShardChange{ + {Shard: "-40", Statements: []string{idx}}, + {Shard: "80-c0", Statements: []string{idx}}, + {Shard: "c0-", Statements: []string{idx}}, + {Shard: "40-80", Statements: []string{drift}}, + }, + }}, + }) +} + +// PreviewCommentShardedPlanUnsafe renders a sharded plan where one shard's +// combined ALTER drops a column (unsafe), flagged with the shard. +func PreviewCommentShardedPlanUnsafe() string { + idx := "ALTER TABLE `mutes` ADD INDEX `created_at`(`created_at`)" + drop := "ALTER TABLE `mutes` ADD INDEX `created_at`(`created_at`), DROP COLUMN `legacy_reason`" + return RenderPlanComment(PlanCommentData{ + Database: "cdb_resolute", Environment: "production", DatabaseType: "strata", + HeadSHA: previewHeadSHA, Repository: previewRepository, RequestedBy: previewRequestedBy, + HasUnsafeChanges: true, + UnsafeChanges: []UnsafeChangeData{{Table: "mutes", Reason: "DROP COLUMN removes data and is irreversible", Shards: []string{"40-80"}}}, + Changes: []KeyspaceChangeData{{ + Keyspace: "cdb_resolute_sharded", + Shards: []KeyspaceShardChange{ + {Shard: "-40", Statements: []string{idx}}, + {Shard: "80-c0", Statements: []string{idx}}, + {Shard: "c0-", Statements: []string{idx}}, + {Shard: "40-80", Statements: []string{drop}}, + }, + }}, + }) +} diff --git a/pkg/webhook/templates/sharded_apply.go b/pkg/webhook/templates/sharded_apply.go new file mode 100644 index 00000000..836ac2f4 --- /dev/null +++ b/pkg/webhook/templates/sharded_apply.go @@ -0,0 +1,282 @@ +package templates + +import ( + "fmt" + "html" + "strings" + + "github.com/block/schemabot/pkg/state" +) + +// ShardedApplyData is the input to the sharded-apply comment: an apply that fans +// out across the shards of a single keyspace within one deployment. Its unit of +// work is one operation per (shard, table). Shards are grouped by their change +// signature: an apply whose shards all need the same change renders one status +// table with the DDL once, and an apply whose shards diverge โ€” different tables, +// or the same table computing to different DDL because shards drifted โ€” renders +// one group per distinct change set, each tying its shards' statuses to the DDL +// it runs. This is distinct from the multi-deployment comment, whose unit is the +// deployment. +type ShardedApplyData struct { + // State is the aggregate apply state (state.Apply.*), driving the headline. + State string + + Environment string + Database string + Keyspace string + ApplyID string + RequestedBy string + StartedAt string + CompletedAt string + + // Shards is the per-shard rollup in resolved order: one entry per shard with + // its aggregate state. It drives the count histogram, each group's status + // rows, and the first-failure callout. + Shards []ShardStatus + + // Cells is one entry per (shard, table) operation โ€” the unit that carries the + // DDL and defines a shard's change signature for grouping. + Cells []ShardCell +} + +// ShardStatus is one shard's aggregate status. Emoji/Label come from the same +// per-operation projection the multi-deployment comment uses, so the vocabulary +// is identical; only the unit (shard vs deployment) differs. +type ShardStatus struct { + Shard string + Emoji string + Label string + State string + Error string +} + +// ShardCell is one (shard, table) operation: the DDL for that table on that +// shard. Cells with the same (table, DDL) set across shards group those shards +// together. +type ShardCell struct { + Shard string + Table string + DDL string +} + +// ShardChange is one table's DDL within a group, shown once for the whole group. +type ShardChange struct { + Table string + DDL string +} + +// shardGroup is a set of shards that share an identical change signature, with +// the changes they all apply. +type shardGroup struct { + Shards []ShardStatus + Changes []ShardChange +} + +// RenderShardedApplyComment renders the PR comment for a sharded apply: the +// shared apply header and metadata, a per-shard count histogram, the first +// failed shard's error lifted to the top, then the shards grouped by change +// signature โ€” a single group renders one status table with the DDL once; more +// than one renders a labelled group per distinct change set. +func RenderShardedApplyComment(data ShardedApplyData) string { + var sb strings.Builder + renderedAt := currentTimestamp() + + writeApplyHeader(&sb, ApplyStatusCommentData{State: data.State, Environment: data.Environment}) + writeShardedMetadata(&sb, data, renderedAt) + + writeShardCounts(&sb, data.Shards) + writeShardFirstFailure(&sb, data.Shards) + + fmt.Fprintf(&sb, "\n#### Keyspace `%s`\n", data.Keyspace) + groups := groupShardsBySignature(data.Shards, data.Cells) + if len(groups) <= 1 { + writeShardStatusTable(&sb, data.Shards) + if len(groups) == 1 { + writeGroupDDL(&sb, groups[0].Changes) + } + } else { + sb.WriteString("\nShards diverge โ€” grouped by change:\n") + for _, g := range groups { + fmt.Fprintf(&sb, "\n**%s**\n", shardList(g.Shards)) + writeShardStatusTable(&sb, g.Shards) + writeGroupDDL(&sb, g.Changes) + } + } + + writeShardedFooter(&sb, data) + if !state.IsTerminalApplyState(data.State) { + writeLastUpdatedFooter(&sb, renderedAt) + } + return sb.String() +} + +func writeShardedMetadata(sb *strings.Builder, data ShardedApplyData, renderedAt string) { + parts := []string{ + fmt.Sprintf("**Database**: `%s`", data.Database), + "**Type**: `Strata`", + fmt.Sprintf("**Apply ID**: `%s`", data.ApplyID), + } + fmt.Fprintf(sb, "%s\n", strings.Join(parts, " | ")) + attributionAt := renderedAt + if data.RequestedBy == "" { + attributionAt = startedAtDisplay(data.StartedAt, renderedAt) + } + writeAppliedByOrTimestampAt(sb, data.RequestedBy, attributionAt) +} + +// groupShardsBySignature buckets shards whose change set is identical. The +// signature is the ordered (table, DDL) pairs the shard applies, so shards +// needing different tables โ€” or the same table with different DDL โ€” fall into +// different groups. Groups and the shards within them keep resolved order; a +// uniform apply yields exactly one group. +func groupShardsBySignature(shards []ShardStatus, cells []ShardCell) []shardGroup { + changesByShard := make(map[string][]ShardChange, len(shards)) + for _, c := range cells { + changesByShard[c.Shard] = append(changesByShard[c.Shard], ShardChange{Table: c.Table, DDL: c.DDL}) + } + + var order []string + bySig := make(map[string]*shardGroup) + for _, s := range shards { + changes := changesByShard[s.Shard] + sig := signatureOf(changes) + g := bySig[sig] + if g == nil { + g = &shardGroup{Changes: changes} + bySig[sig] = g + order = append(order, sig) + } + g.Shards = append(g.Shards, s) + } + + groups := make([]shardGroup, 0, len(order)) + for _, sig := range order { + groups = append(groups, *bySig[sig]) + } + return groups +} + +// signatureOf builds the change-set key for a shard from its ordered changes. +func signatureOf(changes []ShardChange) string { + parts := make([]string, len(changes)) + for i, c := range changes { + parts[i] = c.Table + "\x00" + c.DDL + } + return strings.Join(parts, "\x01") +} + +// shardList renders a group's shards as "shard `x`" or "shards `x`, `y`". +func shardList(shards []ShardStatus) string { + names := make([]string, len(shards)) + for i, s := range shards { + names[i] = fmt.Sprintf("`%s`", s.Shard) + } + if len(names) == 1 { + return "shard " + names[0] + } + return "shards " + strings.Join(names, ", ") +} + +// writeShardCounts writes the per-status histogram across shards so rollout +// health is visible at a glance โ€” the shard-unit analogue of the +// multi-deployment "Deployments:" line. +func writeShardCounts(sb *strings.Builder, shards []ShardStatus) { + if len(shards) == 0 { + return + } + order := make([]string, 0, len(shards)) + counts := make(map[string]int, len(shards)) + for _, s := range shards { + label := shardCountLabel(s) + if _, seen := counts[label]; !seen { + order = append(order, label) + } + counts[label]++ + } + parts := make([]string, 0, len(order)) + for _, label := range order { + parts = append(parts, fmt.Sprintf("%d %s", counts[label], label)) + } + fmt.Fprintf(sb, "\n**Shards**: %s\n", strings.Join(parts, ", ")) +} + +// shardCountLabel collapses a shard's full label to its leading state word +// ("halted โ€” โ€ฆ" โ†’ "halted") for the histogram. +func shardCountLabel(s ShardStatus) string { + if i := strings.Index(s.Label, " โ€” "); i >= 0 { + return s.Label[:i] + } + return s.Label +} + +// isShardFailureState reports whether a shard's state carries an operator-facing +// error to surface โ€” a terminal failure or an automatic retry after one. The +// retry case matters because SchemaBot holds the apply in failed_retryable while +// it retries, and the operator still needs to see what went wrong. +func isShardFailureState(opState string) bool { + return opState == state.ApplyOperation.Failed || opState == state.ApplyOperation.FailedRetryable +} + +// writeShardFirstFailure lifts the first failed shard's error to the top so an +// operator sees the cause without scanning the table. Renders nothing when no +// shard has failed or is retrying after a failure. +func writeShardFirstFailure(sb *strings.Builder, shards []ShardStatus) { + for _, s := range shards { + if !isShardFailureState(s.State) { + continue + } + shard := html.EscapeString(s.Shard) + if s.Error == "" { + fmt.Fprintf(sb, "\n> โš ๏ธ **First failure:** shard %s\n", shard) + } else { + fmt.Fprintf(sb, "\n> โš ๏ธ **First failure:** shard %s โ€” %s\n", shard, html.EscapeString(s.Error)) + } + return + } +} + +// writeShardStatusTable renders the per-shard status table for a set of shards. +func writeShardStatusTable(sb *strings.Builder, shards []ShardStatus) { + if len(shards) == 0 { + return + } + sb.WriteString("\n| Shard | Status |\n| --- | --- |\n") + for _, s := range shards { + fmt.Fprintf(sb, "| `%s` | %s |\n", s.Shard, shardStatusCell(s)) + } +} + +// writeGroupDDL writes a group's table changes, each DDL once. +func writeGroupDDL(sb *strings.Builder, changes []ShardChange) { + for _, c := range changes { + fmt.Fprintf(sb, "\n`%s`\n```sql\n%s\n```\n", c.Table, c.DDL) + } +} + +// shardStatusCell renders one shard's "