Skip to content

Commit 66ce999

Browse files
committed
sql: extend SST writer support to legacy schema changer backfill
Previously, SST writer support during index backfill with distributed merge was only available in the declarative schema changer (added in #158456). This commit extends the same logic to the legacy schema changer. To avoid duplication, the shared logic was extracted into the pkg/sql/backfill package (sst_manifest.go), where both schema changers reuse it. Informs #158378 Epic: CRDB-48845 Release note: none
1 parent 2bf30c4 commit 66ce999

File tree

10 files changed

+372
-120
lines changed

10 files changed

+372
-120
lines changed

pkg/jobs/jobspb/jobs.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -930,6 +930,10 @@ message KeyVisualizerProgress {}
930930

931931
// ResumeSpanList carries the state a backfill needs in order to resume.
//
// NOTE(review): presumably this is the message read and written by
// rowexec.GetResumeSpansAndSSTManifests and
// rowexec.SetResumeSpansAndSSTManifestsInJob - confirm against those helpers.
message ResumeSpanList {
  // resume_spans lists the spans recorded for resumption. Stored by value
  // (non-nullable) in the generated Go code.
  repeated roachpb.Span resume_spans = 1 [(gogoproto.nullable) = false];
  // sst_manifests lists index backfill SST manifests recorded alongside the
  // spans. Stored by value (non-nullable); the generated Go field is named
  // SSTManifests.
  repeated IndexBackfillSSTManifest sst_manifests = 2 [(gogoproto.nullable) = false, (gogoproto.customname) = "SSTManifests"];
}
934938

935939
enum Status {

pkg/sql/backfill.go

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
5151
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
5252
"github.com/cockroachdb/errors"
53+
gogotypes "github.com/gogo/protobuf/types"
5354
)
5455

5556
const (
@@ -943,12 +944,15 @@ func (sc *SchemaChanger) distIndexBackfill(
943944
) error {
944945
// Gather the initial resume spans for the table.
945946
var todoSpans []roachpb.Span
947+
var resumeManifests []jobspb.IndexBackfillSSTManifest
946948
var mutationIdx int
949+
jobDetails := sc.job.Details().(jobspb.SchemaChangeDetails)
950+
useDistributedMerge := jobDetails.DistributedMergeMode == jobspb.IndexBackfillDistributedMergeMode_Enabled
947951

948952
if err := sc.txn(ctx, func(
949953
ctx context.Context, txn descs.Txn,
950954
) (err error) {
951-
todoSpans, _, mutationIdx, err = rowexec.GetResumeSpans(
955+
todoSpans, resumeManifests, _, mutationIdx, err = rowexec.GetResumeSpansAndSSTManifests(
952956
ctx, sc.jobRegistry, txn, sc.execCfg.Codec, txn.Descriptors(), sc.descID,
953957
sc.mutationID, filter,
954958
)
@@ -963,7 +967,9 @@ func (sc *SchemaChanger) distIndexBackfill(
963967
return nil
964968
}
965969

966-
writeAsOf := sc.job.Details().(jobspb.SchemaChangeDetails).WriteTimestamp
970+
manifestBuf := backfill.NewSSTManifestBuffer(resumeManifests)
971+
972+
writeAsOf := jobDetails.WriteTimestamp
967973
if writeAsOf.IsEmpty() {
968974
status := jobs.StatusMessage("scanning target index for in-progress transactions")
969975
if err := sc.job.NoTxn().UpdateStatusMessage(ctx, status); err != nil {
@@ -1044,8 +1050,7 @@ func (sc *SchemaChanger) distIndexBackfill(
10441050
indexBatchSize := indexBackfillBatchSize.Get(&sc.execCfg.Settings.SV)
10451051
chunkSize := sc.getChunkSize(indexBatchSize)
10461052
spec := initIndexBackfillerSpec(*tableDesc.TableDesc(), writeAsOf, writeAtRequestTimestamp, chunkSize, addedIndexes, 0)
1047-
if details, ok := sc.job.Details().(jobspb.SchemaChangeDetails); ok &&
1048-
details.DistributedMergeMode == jobspb.IndexBackfillDistributedMergeMode_Enabled {
1053+
if useDistributedMerge {
10491054
backfill.EnableDistributedMergeIndexBackfillSink(sc.execCfg.NodeInfo.NodeID.SQLInstanceID(), &spec)
10501055
}
10511056
p, err = sc.distSQLPlanner.createBackfillerPhysicalPlan(ctx, planCtx, spec, todoSpans)
@@ -1076,6 +1081,18 @@ func (sc *SchemaChanger) distIndexBackfill(
10761081
copy(mu.updatedTodoSpans, todoSpans)
10771082
}()
10781083

1084+
if useDistributedMerge {
1085+
var mapProgress execinfrapb.IndexBackfillMapProgress
1086+
if gogotypes.Is(&meta.BulkProcessorProgress.ProgressDetails, &mapProgress) {
1087+
if err := gogotypes.UnmarshalAny(&meta.BulkProcessorProgress.ProgressDetails, &mapProgress); err != nil {
1088+
return err
1089+
}
1090+
if len(mapProgress.SSTManifests) > 0 {
1091+
manifestBuf.Append(mapProgress.SSTManifests)
1092+
}
1093+
}
1094+
}
1095+
10791096
if sc.testingKnobs.AlwaysUpdateIndexBackfillDetails {
10801097
if err := updateJobDetails(); err != nil {
10811098
return err
@@ -1156,15 +1173,28 @@ func (sc *SchemaChanger) distIndexBackfill(
11561173
var updateJobMu syncutil.Mutex
11571174
updateJobDetails = func() error {
11581175
updatedTodoSpans := getTodoSpansForUpdate()
1176+
manifestDirty := manifestBuf.Dirty()
11591177
return sc.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
11601178
updateJobMu.Lock()
11611179
defer updateJobMu.Unlock()
1162-
// No processor has returned completed spans yet.
1163-
if updatedTodoSpans == nil {
1180+
// No processor has returned completed spans yet and no new manifests.
1181+
if updatedTodoSpans == nil && !manifestDirty {
11641182
return nil
11651183
}
1166-
log.VEventf(ctx, 2, "writing todo spans to job details: %+v", updatedTodoSpans)
1167-
return rowexec.SetResumeSpansInJob(ctx, updatedTodoSpans, mutationIdx, txn, sc.job)
1184+
spansToPersist := updatedTodoSpans
1185+
if spansToPersist == nil {
1186+
spansToPersist = todoSpans
1187+
}
1188+
var manifestSnapshot []jobspb.IndexBackfillSSTManifest
1189+
if manifestDirty {
1190+
manifestSnapshot = manifestBuf.SnapshotAndMarkClean()
1191+
} else {
1192+
manifestSnapshot = manifestBuf.Snapshot()
1193+
}
1194+
log.VEventf(ctx, 2, "writing todo spans to job details: %+v", spansToPersist)
1195+
return rowexec.SetResumeSpansAndSSTManifestsInJob(
1196+
ctx, &sc.execCfg.Codec, spansToPersist, manifestSnapshot, mutationIdx, txn, sc.job,
1197+
)
11681198
})
11691199
}
11701200

pkg/sql/backfill/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ go_library(
77
"distributed_merge_mode.go",
88
"index_backfiller_cols.go",
99
"mvcc_index_merger.go",
10+
"sst_manifest.go",
1011
],
1112
importpath = "github.com/cockroachdb/cockroach/pkg/sql/backfill",
1213
visibility = ["//visibility:public"],

pkg/sql/backfill/sst_manifest.go

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package backfill
7+
8+
import (
9+
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
10+
"github.com/cockroachdb/cockroach/pkg/keys"
11+
"github.com/cockroachdb/cockroach/pkg/roachpb"
12+
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
13+
)
14+
15+
// SSTManifestBuffer accumulates SST metadata emitted during an index backfill
16+
// map stage. Callers may append new manifests and snapshot the buffer when
17+
// persisting checkpoints so they avoid copying slices manually throughout the
18+
// ingestion pipeline.
19+
type SSTManifestBuffer struct {
20+
syncutil.Mutex
21+
manifests []jobspb.IndexBackfillSSTManifest
22+
dirty bool
23+
}
24+
25+
// NewSSTManifestBuffer constructs a buffer initialized with the provided
26+
// manifests.
27+
func NewSSTManifestBuffer(initial []jobspb.IndexBackfillSSTManifest) *SSTManifestBuffer {
28+
buf := &SSTManifestBuffer{}
29+
buf.manifests = append(buf.manifests, initial...)
30+
if len(initial) > 0 {
31+
buf.dirty = true
32+
}
33+
return buf
34+
}
35+
36+
func (b *SSTManifestBuffer) snapshotLocked() []jobspb.IndexBackfillSSTManifest {
37+
return append([]jobspb.IndexBackfillSSTManifest(nil), b.manifests...)
38+
}
39+
40+
// Snapshot returns a copy of the buffered manifests.
41+
func (b *SSTManifestBuffer) Snapshot() []jobspb.IndexBackfillSSTManifest {
42+
b.Lock()
43+
defer b.Unlock()
44+
return b.snapshotLocked()
45+
}
46+
47+
// Append adds the provided manifests to the buffer and returns the updated
48+
// snapshot.
49+
func (b *SSTManifestBuffer) Append(
50+
newManifests []jobspb.IndexBackfillSSTManifest,
51+
) []jobspb.IndexBackfillSSTManifest {
52+
if len(newManifests) == 0 {
53+
return b.Snapshot()
54+
}
55+
b.Lock()
56+
defer b.Unlock()
57+
b.manifests = append(b.manifests, newManifests...)
58+
b.dirty = true
59+
return b.snapshotLocked()
60+
}
61+
62+
// Dirty reports whether the buffer has accumulated manifests that have not been
63+
// snapshotted since the last call to SnapshotAndMarkClean.
64+
func (b *SSTManifestBuffer) Dirty() bool {
65+
if b == nil {
66+
return false
67+
}
68+
b.Lock()
69+
defer b.Unlock()
70+
return b.dirty
71+
}
72+
73+
// SnapshotAndMarkClean returns a copy of the buffered manifests and clears the
74+
// dirty flag. Callers should invoke this once they have persisted the snapshot.
75+
func (b *SSTManifestBuffer) SnapshotAndMarkClean() []jobspb.IndexBackfillSSTManifest {
76+
if b == nil {
77+
return nil
78+
}
79+
b.Lock()
80+
defer b.Unlock()
81+
b.dirty = false
82+
return b.snapshotLocked()
83+
}
84+
85+
// StripTenantPrefixFromSSTManifests normalizes SST manifest metadata by
86+
// removing tenant prefixes before persisting it in job state. This matches the
87+
// CompletedSpans handling and keeps job progress tenant-agnostic.
88+
func StripTenantPrefixFromSSTManifests(
89+
codec keys.SQLCodec, manifests []jobspb.IndexBackfillSSTManifest,
90+
) ([]jobspb.IndexBackfillSSTManifest, error) {
91+
ret := make([]jobspb.IndexBackfillSSTManifest, len(manifests))
92+
for i, out := range manifests {
93+
ret[i].URI = out.URI
94+
ret[i].FileSize = out.FileSize
95+
if out.WriteTimestamp != nil {
96+
ts := *out.WriteTimestamp
97+
ret[i].WriteTimestamp = &ts
98+
}
99+
if out.Span != nil {
100+
stripped, err := stripTenantPrefixFromSpans(codec, *out.Span)
101+
if err != nil {
102+
return nil, err
103+
}
104+
ret[i].Span = &stripped
105+
}
106+
if len(out.RowSample) > 0 {
107+
key, err := codec.StripTenantPrefix(out.RowSample)
108+
if err != nil {
109+
return nil, err
110+
}
111+
ret[i].RowSample = append(roachpb.Key(nil), key...)
112+
}
113+
}
114+
return ret, nil
115+
}
116+
117+
// AddTenantPrefixToSSTManifests rehydrates manifests with the executing
118+
// processor's tenant prefix so they can be used against the local keyspace.
119+
func AddTenantPrefixToSSTManifests(
120+
codec keys.SQLCodec, manifests []jobspb.IndexBackfillSSTManifest,
121+
) []jobspb.IndexBackfillSSTManifest {
122+
ret := make([]jobspb.IndexBackfillSSTManifest, len(manifests))
123+
for i, out := range manifests {
124+
ret[i].URI = out.URI
125+
ret[i].FileSize = out.FileSize
126+
if out.WriteTimestamp != nil {
127+
ts := *out.WriteTimestamp
128+
ret[i].WriteTimestamp = &ts
129+
}
130+
if out.Span != nil {
131+
sp := addTenantPrefixToSpan(codec, *out.Span)
132+
ret[i].Span = &sp
133+
}
134+
if len(out.RowSample) > 0 {
135+
prefixed := make(roachpb.Key, 0, len(codec.TenantPrefix())+len(out.RowSample))
136+
prefixed = append(prefixed, codec.TenantPrefix()...)
137+
prefixed = append(prefixed, out.RowSample...)
138+
ret[i].RowSample = prefixed
139+
}
140+
}
141+
return ret
142+
}
143+
144+
func stripTenantPrefixFromSpans(codec keys.SQLCodec, span roachpb.Span) (roachpb.Span, error) {
145+
var err error
146+
var out roachpb.Span
147+
if out.Key, err = codec.StripTenantPrefix(span.Key); err != nil {
148+
return roachpb.Span{}, err
149+
}
150+
if out.EndKey, err = codec.StripTenantPrefix(span.EndKey); err != nil {
151+
return roachpb.Span{}, err
152+
}
153+
return out, nil
154+
}
155+
156+
func addTenantPrefixToSpan(codec keys.SQLCodec, span roachpb.Span) roachpb.Span {
157+
prefix := codec.TenantPrefix()
158+
prefix = prefix[:len(prefix):len(prefix)]
159+
out := roachpb.Span{
160+
Key: append(prefix, span.Key...),
161+
EndKey: append(prefix, span.EndKey...),
162+
}
163+
return out
164+
}

pkg/sql/index_backfiller.go

Lines changed: 1 addition & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -36,39 +36,6 @@ type IndexBackfillPlanner struct {
3636
execCfg *ExecutorConfig
3737
}
3838

39-
type sstManifestBuffer struct {
40-
syncutil.Mutex
41-
manifests []jobspb.IndexBackfillSSTManifest
42-
}
43-
44-
func newSSTManifestBuffer(initial []jobspb.IndexBackfillSSTManifest) *sstManifestBuffer {
45-
buf := &sstManifestBuffer{}
46-
buf.manifests = append(buf.manifests, initial...)
47-
return buf
48-
}
49-
50-
func (b *sstManifestBuffer) snapshotLocked() []jobspb.IndexBackfillSSTManifest {
51-
return append([]jobspb.IndexBackfillSSTManifest(nil), b.manifests...)
52-
}
53-
54-
func (b *sstManifestBuffer) Snapshot() []jobspb.IndexBackfillSSTManifest {
55-
b.Lock()
56-
defer b.Unlock()
57-
return b.snapshotLocked()
58-
}
59-
60-
func (b *sstManifestBuffer) Append(
61-
newManifests []jobspb.IndexBackfillSSTManifest,
62-
) []jobspb.IndexBackfillSSTManifest {
63-
if len(newManifests) == 0 {
64-
return b.Snapshot()
65-
}
66-
b.Lock()
67-
defer b.Unlock()
68-
b.manifests = append(b.manifests, newManifests...)
69-
return b.snapshotLocked()
70-
}
71-
7239
// NewIndexBackfiller creates a new IndexBackfillPlanner.
7340
func NewIndexBackfiller(execCfg *ExecutorConfig) *IndexBackfillPlanner {
7441
return &IndexBackfillPlanner{execCfg: execCfg}
@@ -117,7 +84,7 @@ func (ib *IndexBackfillPlanner) BackfillIndexes(
11784
}
11885
// Add spans that were already completed before the job resumed.
11986
addCompleted(progress.CompletedSpans...)
120-
sstManifestBuf := newSSTManifestBuffer(progress.SSTManifests)
87+
sstManifestBuf := backfill.NewSSTManifestBuffer(progress.SSTManifests)
12188
progress.SSTManifests = sstManifestBuf.Snapshot()
12289
updateSSTManifests := func(newManifests []jobspb.IndexBackfillSSTManifest) {
12390
progress.SSTManifests = sstManifestBuf.Append(newManifests)

0 commit comments

Comments
 (0)