Skip to content

Commit f7a6635

Browse files
committed
changefeedccl: fix unwatched column families memory monitoring bug
This patch fixes a bug where the memory monitor did not reclaim the memory allocated to events for unwatched column families when a changefeed targets only a subset of a table's column families.

Release note (bug fix): A bug where a changefeed targeting only a subset of a table's column families could become stuck has been fixed.
1 parent 2d1e26b commit f7a6635

File tree

3 files changed

+93
-0
lines changed

3 files changed

+93
-0
lines changed

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -202,6 +202,7 @@ go_test(
202202
"alter_changefeed_test.go",
203203
"avro_test.go",
204204
"changefeed_dist_test.go",
205+
"changefeed_memory_test.go",
205206
"changefeed_processors_test.go",
206207
"changefeed_stmt_test.go",
207208
"changefeed_test.go",
pkg/ccl/changefeedccl/changefeed_memory_test.go

Lines changed: 86 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,86 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package changefeedccl
7+
8+
import (
9+
"context"
10+
"testing"
11+
12+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
13+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
14+
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
15+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
16+
"github.com/cockroachdb/cockroach/pkg/util/log"
17+
"github.com/cockroachdb/cockroach/pkg/util/randutil"
18+
)
19+
20+
// TestChangefeedUnwatchedFamilyMemoryMonitoring verifies that changefeeds
21+
// correctly release memory allocations for events corresponding to unwatched
22+
// column families after discarding them. This is a regression test for #154776.
23+
func TestChangefeedUnwatchedFamilyMemoryMonitoring(t *testing.T) {
24+
defer leaktest.AfterTest(t)()
25+
defer log.Scope(t).Close(t)
26+
27+
ctx := context.Background()
28+
rnd, _ := randutil.NewTestRand()
29+
30+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
31+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
32+
33+
// Set a low memory limit.
34+
changefeedbase.PerChangefeedMemLimit.Override(
35+
ctx, &s.Server.ClusterSettings().SV, 1<<20 /* 1 MiB */)
36+
37+
// Create a table with two column families.
38+
sqlDB.Exec(t, `CREATE TABLE foo (
39+
id INT PRIMARY KEY,
40+
a STRING,
41+
b STRING,
42+
FAMILY f1 (id, a),
43+
FAMILY f2 (b)
44+
)`)
45+
46+
// Insert initial data.
47+
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a', 'b')`)
48+
49+
// Start changefeed watching only f1 with diff enabled.
50+
// Events from f2 should trigger ErrUnwatchedFamily.
51+
feed := feed(t, f,
52+
`CREATE CHANGEFEED FOR foo FAMILY f1 WITH diff, initial_scan='no', resolved`)
53+
defer closeFeed(t, feed)
54+
55+
// Update a watched column to generate an event.
56+
sqlDB.Exec(t, `UPDATE foo SET a = 'a_1' WHERE id = 1`)
57+
assertPayloads(t, feed, []string{
58+
`foo.f1: [1]->{"after": {"a": "a_1", "id": 1}, "before": {"a": "a", "id": 1}}`,
59+
})
60+
61+
// Generate a lot of events for the unwatched family. If the memory
62+
// allocations are being leaked, this would cause the changefeed to
63+
// exceed the previously configured 1 MiB limit and become stuck
64+
// when it attempts to process more events after the limit is hit.
65+
const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
66+
for range 1000 {
67+
// Each update will create an event of size ~2 KiB
68+
// (~1 KiB for each of before/after).
69+
data := make([]byte, 1<<10 /* 1 KiB */)
70+
for i := range data {
71+
data[i] = charset[rnd.Intn(len(charset))]
72+
}
73+
sqlDB.Exec(t, `UPDATE foo SET b = $1 WHERE id = 1`, data)
74+
}
75+
76+
// Update watched column again to verify the feed is still progressing.
77+
// If the memory allocations leaked, this assertion would time out
78+
// because the changefeed would be stuck.
79+
sqlDB.Exec(t, `UPDATE foo SET a = 'a_2' WHERE id = 1`)
80+
assertPayloads(t, feed, []string{
81+
`foo.f1: [1]->{"after": {"a": "a_2", "id": 1}, "before": {"a": "a_1", "id": 1}}`,
82+
})
83+
}
84+
85+
cdcTest(t, testFn)
86+
}

pkg/ccl/changefeedccl/event_processing.go

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -337,6 +337,9 @@ func (c *kvEventToRowConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Even
337337
// Column families are stored contiguously, so we'll get
338338
// events for each one even if we're not watching them all.
339339
if errors.Is(err, cdcevent.ErrUnwatchedFamily) {
340+
// Release the event's allocation since we're not processing it.
341+
a := ev.DetachAlloc()
342+
a.Release(ctx)
340343
return nil
341344
}
342345
return err
@@ -353,6 +356,9 @@ func (c *kvEventToRowConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Even
353356
// Column families are stored contiguously, so we'll get
354357
// events for each one even if we're not watching them all.
355358
if errors.Is(err, cdcevent.ErrUnwatchedFamily) {
359+
// Release the event's allocation since we're not processing it.
360+
a := ev.DetachAlloc()
361+
a.Release(ctx)
356362
return nil
357363
}
358364
return err

0 commit comments

Comments
 (0)