Skip to content

Commit 2bf30c4

Browse files
craig[bot]spilchen
andcommitted
Merge #158339
158339: sql/bulkingest: add IngestFileProcessor for SST ingestion r=spilchen a=spilchen This introduces the IngestFileProcessor, a new DistSQL porcessor that ingests SST files directly into the KV layer. This is part of the new distributed merge pipeline. The SSTs will be provided as input from an earlier stage in the pipeline (merge processor). The processor is registered but not yet used by any SQL operations. Future work will integrate it into the distribute merge pipeline. Epic: CRDB-48845 Fixes: #156659 Release note: none Co-authored by: `@jeffswenson` Co-authored-by: Matt Spilchen <[email protected]>
2 parents 758c22d + 0883feb commit 2bf30c4

File tree

15 files changed

+887
-18
lines changed

15 files changed

+887
-18
lines changed

pkg/cloud/nodelocal/nodelocal_storage.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,6 @@ func makeLocalFileStorage(
110110
return nil, errors.New("nodelocal storage is not available")
111111
}
112112
cfg := dest.LocalFileConfig
113-
if cfg.Path == "" {
114-
return nil, errors.Errorf("local storage requested but path not provided")
115-
}
116113
client, err := args.BlobClientFactory(ctx, cfg.NodeID)
117114
if err != nil {
118115
return nil, errors.Wrap(err, "failed to create blob client")

pkg/sql/bulkingest/BUILD.bazel

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,47 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
33
go_library(
44
name = "bulkingest",
55
srcs = [
6+
"ingest.go",
7+
"ingest_file_processor.go",
8+
"plan_ingest.go",
69
"split.go",
710
"split_picker.go",
811
],
912
importpath = "github.com/cockroachdb/cockroach/pkg/sql/bulkingest",
1013
visibility = ["//visibility:public"],
1114
deps = [
15+
"//pkg/cloud",
16+
"//pkg/clusterversion",
1217
"//pkg/keys",
1318
"//pkg/kv",
1419
"//pkg/roachpb",
20+
"//pkg/settings",
21+
"//pkg/sql",
22+
"//pkg/sql/bulkutil",
23+
"//pkg/sql/execinfra",
1524
"//pkg/sql/execinfrapb",
25+
"//pkg/sql/physicalplan",
26+
"//pkg/sql/rowexec",
27+
"//pkg/sql/sem/tree",
28+
"//pkg/sql/types",
29+
"//pkg/util/ctxgroup",
30+
"//pkg/util/errorutil/unimplemented",
31+
"//pkg/util/hlc",
32+
"//pkg/util/ioctx",
1633
"//pkg/util/log",
34+
"//pkg/util/mon",
35+
"//pkg/util/tracing",
1736
"@com_github_cockroachdb_errors//:errors",
1837
],
1938
)
2039

2140
go_test(
2241
name = "bulkingest_test",
2342
srcs = [
43+
"ingest_file_processor_test.go",
44+
"ingest_test.go",
2445
"main_test.go",
46+
"plan_ingest_test.go",
2547
"split_picker_test.go",
2648
"split_test.go",
2749
],
@@ -32,14 +54,26 @@ go_test(
3254
}),
3355
deps = [
3456
"//pkg/base",
57+
"//pkg/cloud/externalconn/providers",
3558
"//pkg/keys",
3659
"//pkg/kv/kvclient/kvtenant",
60+
"//pkg/kv/kvpb",
61+
"//pkg/kv/kvserver",
3762
"//pkg/roachpb",
3863
"//pkg/security/securityassets",
3964
"//pkg/security/securitytest",
65+
"//pkg/security/username",
4066
"//pkg/server",
67+
"//pkg/settings/cluster",
68+
"//pkg/sql",
69+
"//pkg/sql/bulksst",
70+
"//pkg/sql/catalog",
4171
"//pkg/sql/catalog/descpb",
72+
"//pkg/sql/catalog/desctestutils",
4273
"//pkg/sql/execinfrapb",
74+
"//pkg/sql/rowenc",
75+
"//pkg/storage",
76+
"//pkg/testutils",
4377
"//pkg/testutils/serverutils",
4478
"//pkg/testutils/sqlutils",
4579
"//pkg/testutils/testcluster",

pkg/sql/bulkingest/ingest.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package bulkingest
7+
8+
import (
9+
"context"
10+
11+
"github.com/cockroachdb/cockroach/pkg/roachpb"
12+
"github.com/cockroachdb/cockroach/pkg/sql"
13+
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
14+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
15+
)
16+
17+
// IngestFiles runs the distributed ingest phase for a set of pre-built SSTs.
18+
// It:
19+
// - picks range splits that align with SST boundaries,
20+
// - pre-splits and scatters those ranges, and
21+
// - executes an ingest DistSQL flow that AddSSTables each file.
22+
func IngestFiles(
23+
ctx context.Context,
24+
execCtx sql.JobExecContext,
25+
spans []roachpb.Span,
26+
ssts []execinfrapb.BulkMergeSpec_SST,
27+
) error {
28+
splits, err := pickSplits(spans, ssts)
29+
if err != nil {
30+
return err
31+
}
32+
33+
db := execCtx.ExecCfg().InternalDB.KV()
34+
if err := splitAndScatterSpans(ctx, db, splits); err != nil {
35+
return err
36+
}
37+
38+
plan, planCtx, err := planIngest(ctx, execCtx, ssts)
39+
if err != nil {
40+
return err
41+
}
42+
43+
res := sql.NewMetadataOnlyMetadataCallbackWriter(func(context.Context, *execinfrapb.ProducerMetadata) error { return nil })
44+
recv := sql.MakeDistSQLReceiver(
45+
ctx,
46+
res,
47+
tree.Ack,
48+
execCtx.ExecCfg().RangeDescriptorCache, /* rangeCache */
49+
nil, /* txn - the flow does not read or write the database */
50+
nil, /* clockUpdater */
51+
execCtx.ExtendedEvalContext().Tracing,
52+
)
53+
defer recv.Release()
54+
55+
evalCtxCopy := execCtx.ExtendedEvalContext().Context.Copy()
56+
execCtx.DistSQLPlanner().Run(
57+
ctx, planCtx, nil, plan, recv, evalCtxCopy, nil, /* finishedSetupFn */
58+
)
59+
60+
return res.Err()
61+
}

0 commit comments

Comments
 (0)