Add topic properties to Migrator #3207

Merged, Feb 27, 2025 (4 commits)
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -28,6 +28,7 @@ All notable changes to this project will be documented in this file.

- Output `snowflake_streaming` has additional logging and debug information when errors arise. (@rockwotj)
- Input `postgres_cdc` no longer adds a prefix to the replication slot name. If upgrading from a previous version, prefix your current replication slot name with `rs_` to continue using the same replication slot. (@rockwotj)
- The `redpanda_migrator` output now uses the source topic config when creating a topic in the destination cluster. It also attempts to transfer topic ACLs to the destination cluster even if the topics already exist. (@mihaitodor)

## 4.47.1 - 2025-02-11

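In practice, the pipeline shape this changelog entry describes pairs a `redpanda_migrator` input with a `redpanda_migrator` output, as exercised by the integration test below. The following is a minimal sketch using the public stream builder API, mirroring the test's YAML. The broker addresses, topic name, and the component-registration import path are assumptions for illustration; note that the test below injects a license service, since these are enterprise components.

// Minimal sketch, not part of this PR: run a migrator pipeline that now
// copies source topic configs and ACLs when creating destination topics.
// Broker addresses and the topic are placeholders; the components import
// path is an assumption based on the Redpanda Connect public packages.
package main

import (
	"context"
	"log"

	"github.com/redpanda-data/benthos/v4/public/service"

	// Registers the redpanda_migrator input/output (assumed path).
	_ "github.com/redpanda-data/connect/v4/public/components/all"
)

func main() {
	sb := service.NewStreamBuilder()
	err := sb.SetYAML(`
input:
  redpanda_migrator:
    seed_brokers: [ "source:9092" ]
    topics: [ "test" ]
    consumer_group: migrator_cg

output:
  redpanda_migrator:
    seed_brokers: [ "destination:9092" ]
    topic: ${! @kafka_topic }
    key: ${! @kafka_key }
    partition: ${! @kafka_partition }
    partitioner: manual
    timestamp_ms: ${! @kafka_timestamp_ms }
    replication_factor_override: true
    replication_factor: -1
`)
	if err != nil {
		log.Fatal(err)
	}

	stream, err := sb.Build()
	if err != nil {
		log.Fatal(err)
	}
	if err := stream.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}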
163 changes: 161 additions & 2 deletions internal/impl/kafka/enterprise/integration_test.go
@@ -656,6 +656,66 @@ func startRedpanda(t *testing.T, pool *dockertest.Pool, exposeBroker bool, autoc
}, nil
}

func createTopicWithACLs(t *testing.T, brokerAddr, topic, retentionTime, principal string, operation kadm.ACLOperation) {
client, err := kgo.NewClient(kgo.SeedBrokers([]string{brokerAddr}...))
require.NoError(t, err)
defer client.Close()

adm := kadm.NewClient(client)

configs := map[string]*string{"retention.ms": &retentionTime}
_, err = adm.CreateTopic(context.Background(), 1, -1, configs, topic)
require.NoError(t, err)

updateTopicACL(t, adm, topic, principal, operation)
}

func updateTopicACL(t *testing.T, client *kadm.Client, topic, principal string, operation kadm.ACLOperation) {
builder := kadm.NewACLs().Allow(principal).AllowHosts("*").Topics(topic).ResourcePatternType(kadm.ACLPatternLiteral).Operations(operation)
res, err := client.CreateACLs(context.Background(), builder)
require.NoError(t, err)
require.Len(t, res, 1)
assert.NoError(t, res[0].Err)
}

func checkTopic(t *testing.T, brokerAddr, topic, retentionTime, principal string, operation kadm.ACLOperation) {
client, err := kgo.NewClient(kgo.SeedBrokers([]string{brokerAddr}...))
require.NoError(t, err)
defer client.Close()

adm := kadm.NewClient(client)

topicConfigs, err := adm.DescribeTopicConfigs(context.Background(), topic)
require.NoError(t, err)

rc, err := topicConfigs.On(topic, nil)
require.NoError(t, err)
assert.Condition(t, func() bool {
for _, c := range rc.Configs {
if c.Key == "retention.ms" && *c.Value == retentionTime {
return true
}
}
return false
})

builder := kadm.NewACLs().Topics(topic).
ResourcePatternType(kadm.ACLPatternLiteral).Operations(operation).Allow().Deny().AllowHosts().DenyHosts()

aclResults, err := adm.DescribeACLs(context.Background(), builder)
require.NoError(t, err)
require.Len(t, aclResults, 1)
require.NoError(t, aclResults[0].Err)
require.Len(t, aclResults[0].Described, 1)

for _, acl := range aclResults[0].Described {
assert.Equal(t, principal, acl.Principal)
assert.Equal(t, "*", acl.Host)
assert.Equal(t, topic, acl.Name)
assert.Equal(t, operation.String(), acl.Operation.String())
}
}

// produceMessages produces `count` messages with the given `message` content to the given `topic`. The
// `timestampOffset` is added to the output of the `counter()` Bloblang function, which generates the message
// timestamps sequentially, the first being `1 + timestampOffset` (e.g. a `timestampOffset` of 5 yields timestamps 6, 7, 8, ...).
@@ -776,8 +836,6 @@ input:
topics: [ %s ]
consumer_group: migrator_cg
start_from_oldest: true
replication_factor_override: true
replication_factor: -1
Comment on lines -779 to -780, from @mihaitodor (Collaborator, Author):

The template linter is a bit annoying, since it doesn't check if there are any unrecognised fields... Something to improve in Benthos.

schema_registry:
url: %s
processors:
@@ -1084,3 +1142,104 @@ output:
})
}
}

func TestRedpandaMigratorTopicConfigAndACLsIntegration(t *testing.T) {
t.Parallel()

pool, err := dockertest.NewPool("")
require.NoError(t, err)
pool.MaxWait = time.Minute

source, err := startRedpanda(t, pool, true, true)
require.NoError(t, err)
destination, err := startRedpanda(t, pool, true, true)
require.NoError(t, err)

dummyTopic := "test"

runMigrator := func() {
streamBuilder := service.NewStreamBuilder()
require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(`
input:
redpanda_migrator:
seed_brokers: [ %s ]
topics: [ %s ]
consumer_group: migrator_cg

output:
redpanda_migrator:
seed_brokers: [ %s ]
topic: ${! @kafka_topic }
key: ${! @kafka_key }
partition: ${! @kafka_partition }
partitioner: manual
timestamp_ms: ${! @kafka_timestamp_ms }
translate_schema_ids: false
replication_factor_override: true
replication_factor: -1
`, source.brokerAddr, dummyTopic, destination.brokerAddr)))
require.NoError(t, streamBuilder.SetLoggerYAML(`level: INFO`))

migratorUpdateWG := sync.WaitGroup{}
migratorUpdateWG.Add(1)
require.NoError(t, streamBuilder.AddConsumerFunc(func(_ context.Context, m *service.Message) error {
defer migratorUpdateWG.Done()
return nil
}))

// Ensure the callback function is called after the output has written the message.
streamBuilder.SetOutputBrokerPattern(service.OutputBrokerPatternFanOutSequential)

stream, err := streamBuilder.Build()
require.NoError(t, err)

license.InjectTestService(stream.Resources())

// Run stream in the background.
go func() {
err = stream.Run(context.Background())
require.NoError(t, err)

t.Log("redpanda_migrator_offsets pipeline shut down")
}()

migratorUpdateWG.Wait()

require.NoError(t, stream.StopWithin(3*time.Second))
}

// Create a topic with a custom retention time
dummyRetentionTime := strconv.Itoa(int((48 * time.Hour).Milliseconds()))
dummyPrincipal := "User:redpanda"
dummyACLOperation := kmsg.ACLOperationRead
createTopicWithACLs(t, source.brokerAddr, dummyTopic, dummyRetentionTime, dummyPrincipal, dummyACLOperation)

// Produce one message
dummyMessage := `{"test":"foo"}`
produceMessages(t, source, dummyTopic, dummyMessage, 0, 1, true)

// Run the Redpanda Migrator
runMigrator()

// Ensure that the topic and ACL were migrated correctly
checkTopic(t, destination.brokerAddr, dummyTopic, dummyRetentionTime, dummyPrincipal, dummyACLOperation)

client, err := kgo.NewClient(kgo.SeedBrokers([]string{source.brokerAddr}...))
require.NoError(t, err)
defer client.Close()

adm := kadm.NewClient(client)

// Update ACL in the source topic and ensure that it's reflected in the destination
dummyACLOperation = kmsg.ACLOperationDescribe
updateTopicACL(t, adm, dummyTopic, dummyPrincipal, dummyACLOperation)

// Produce one more message so the consumerFunc will get triggered to indicate that Migrator ran successfully
produceMessages(t, source, dummyTopic, dummyMessage, 0, 1, true)

// Run the Redpanda Migrator again
runMigrator()

// Ensure that the ACL was updated correctly
checkTopic(t, destination.brokerAddr, dummyTopic, dummyRetentionTime, dummyPrincipal, dummyACLOperation)
}
20 changes: 11 additions & 9 deletions internal/impl/kafka/enterprise/redpanda_migrator_offsets_output.go
@@ -273,24 +273,26 @@ func (w *redpandaMigratorOffsetsWriter) Write(ctx context.Context, msg *service.
// timestamps of all the records in the topic. It also sets the timestamp of the returned offset to -1 in this case.
listedOffsets, err := w.client.ListOffsetsAfterMilli(ctx, offsetCommitTimestamp, topic)
if err != nil {
return fmt.Errorf("failed to translate consumer offsets: %s", err)
return fmt.Errorf("failed to list offsets for topic %q and timestamp %d: %s", topic, offsetCommitTimestamp, err)
}

w.mgr.Logger().Tracef("Listed offsets for topic %q and timestamp %d: %+v", topic, offsetCommitTimestamp, listedOffsets)

if err := listedOffsets.Error(); err != nil {
return fmt.Errorf("listed offsets error: %s", err)
return fmt.Errorf("failed to read offsets for topic %q and timestamp %d: %s", topic, offsetCommitTimestamp, err)
}

offset, ok := listedOffsets.Lookup(topic, partition)
if !ok {
// This should never happen, but we check just in case.
return fmt.Errorf("committed offset not yet replicated to the destination %q topic: lookup failed", topic)
return fmt.Errorf("record for timestamp %d not yet replicated to the destination topic %q partition %d: lookup failed", offsetCommitTimestamp, topic, partition)
}

if !isHighWatermark && offset.Timestamp == -1 {
// This can happen if we received an offset update, but the record which was read from the source cluster to
// trigger it has not been replicated to the destination cluster yet. In this case, we raise an error so the
// operation is retried.
return fmt.Errorf("committed offset not yet replicated to the destination %q topic", topic)
return fmt.Errorf("record for timestamp %d not yet replicated to the destination topic %q partition %d", offsetCommitTimestamp, topic, partition)
}

// This is an optimisation to try and avoid unnecessary duplicates in the common case when the received offset
@@ -303,12 +305,12 @@ func (w *redpandaMigratorOffsetsWriter) Write(ctx context.Context, msg *service.
if isHighWatermark && offset.Timestamp != -1 {
offsets, err := w.client.ListEndOffsets(ctx, topic)
if err != nil {
return fmt.Errorf("failed to read the high watermark for topic %q and partition %q: %s", topic, partition, err)
return fmt.Errorf("failed to list the high watermark for topic %q and partition %q (timestamp %d): %s", topic, partition, offsetCommitTimestamp, err)
}

highWatermark, ok := offsets.Lookup(topic, partition)
if !ok {
return fmt.Errorf("failed to find the high watermark for topic %q and partition %q: %s", topic, partition, err)
return fmt.Errorf("failed to read the high watermark for topic %q and partition %q (timestamp %d): %s", topic, partition, offsetCommitTimestamp, err)
}
if highWatermark.Offset == offset.Offset+1 {
offset.Offset = highWatermark.Offset
@@ -326,11 +328,11 @@ func (w *redpandaMigratorOffsetsWriter) Write(ctx context.Context, msg *service.

offsetResponses, err := w.client.CommitOffsets(ctx, group, offsets)
if err != nil {
return fmt.Errorf("failed to commit consumer offsets: %s", err)
return fmt.Errorf("failed to commit consumer offsets for topic %q and partition %q (timestamp %d): %s", topic, partition, offsetCommitTimestamp, err)
}

if err := offsetResponses.Error(); err != nil {
return fmt.Errorf("committed consumer offsets returned an error: %s", err)
return fmt.Errorf("committed consumer offsets returned an error for topic %q and partition %q (timestamp %d): %s", topic, partition, offsetCommitTimestamp, err)
}

return nil
@@ -349,7 +351,7 @@ func (w *redpandaMigratorOffsetsWriter) Write(ctx context.Context, msg *service.

wait := backOff.NextBackOff()
if wait == backoff.Stop {
return fmt.Errorf("failed to update consumer offsets for topic %q and partition %d: %s", topic, partition, err)
return fmt.Errorf("failed to update consumer offsets for topic %q and partition %d (timestamp %d): %s", topic, partition, offsetCommitTimestamp, err)
}

time.Sleep(wait)
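Taken together, the hunks above refine the error messages of a translate-and-commit loop: list destination offsets at or after the source commit timestamp, treat a missing or `-1`-timestamped lookup as not yet replicated so the surrounding backoff loop retries, then commit the translated offset. Below is a condensed sketch of that flow under stated assumptions: the helper name is hypothetical, `adm` is a destination-cluster `*kadm.Client`, and the high-watermark fast path and backoff wiring from the real code are omitted.

// translateAndCommit is an illustrative condensation of the Write body
// above (hypothetical helper, not the PR's code).
package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kadm"
)

func translateAndCommit(ctx context.Context, adm *kadm.Client, group, topic string, partition int32, commitTsMs int64) error {
	// Find the first destination offset whose record timestamp is at or
	// after the source commit timestamp.
	listed, err := adm.ListOffsetsAfterMilli(ctx, commitTsMs, topic)
	if err != nil {
		return fmt.Errorf("failed to list offsets for topic %q and timestamp %d: %s", topic, commitTsMs, err)
	}
	if err := listed.Error(); err != nil {
		return fmt.Errorf("failed to read offsets for topic %q and timestamp %d: %s", topic, commitTsMs, err)
	}

	offset, ok := listed.Lookup(topic, partition)
	if !ok || offset.Timestamp == -1 {
		// The record which triggered the offset update has not been
		// replicated to the destination yet; returning an error lets the
		// caller's backoff loop retry until it shows up.
		return fmt.Errorf("record for timestamp %d not yet replicated to topic %q partition %d", commitTsMs, topic, partition)
	}

	// Commit the translated offset for the consumer group.
	offsets := make(kadm.Offsets)
	offsets.AddOffset(topic, partition, offset.Offset, -1)
	res, err := adm.CommitOffsets(ctx, group, offsets)
	if err != nil {
		return fmt.Errorf("failed to commit consumer offsets: %s", err)
	}
	return res.Error()
}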
10 changes: 4 additions & 6 deletions internal/impl/kafka/enterprise/redpanda_migrator_output.go
@@ -226,12 +226,10 @@ func init() {
// message from it.
mgr.Logger().Errorf("Failed to create topic %q and ACLs: %s", topic, err)
}

continue
} else {
mgr.Logger().Infof("Created topic %q", topic)
Comment from @mihaitodor (Collaborator, Author):

There's a debug message which gets printed if the topic already exists...

}

mgr.Logger().Infof("Created topic %q", topic)

if err := createACLs(ctx, topic, inputClient, outputClient); err != nil {
mgr.Logger().Errorf("Failed to create ACLs for topic %q: %s", topic, err)
}
@@ -294,10 +292,10 @@ func init() {
} else {
return fmt.Errorf("failed to create topic %q and ACLs: %s", record.Topic, err)
}
} else {
mgr.Logger().Infof("Created topic %q", record.Topic)
}

mgr.Logger().Infof("Created topic %q", record.Topic)

if err := createACLs(ctx, record.Topic, details.Client, client); err != nil {
mgr.Logger().Errorf("Failed to create ACLs for topic %q: %s", record.Topic, err)
}
41 changes: 38 additions & 3 deletions internal/impl/kafka/enterprise/topic_crud.go
@@ -56,7 +56,42 @@ func createTopic(ctx context.Context, topic string, replicationFactorOverride bo
}
}

if _, err := outputAdminClient.CreateTopic(ctx, partitions, rp, nil, topic); err != nil {
topicConfigs, err := inputAdminClient.DescribeTopicConfigs(ctx, topic)
if err != nil {
return fmt.Errorf("failed to fetch configs for topic %q from source broker: %s", topic, err)
}

rc, err := topicConfigs.On(topic, nil)
if err != nil {
return fmt.Errorf("failed to fetch configs for topic %q from source broker: %s", topic, err)
}

// Source: https://docs.redpanda.com/current/reference/properties/topic-properties/
allowedConfigs := map[string]struct{}{
"cleanup.policy": {},
"flush.bytes": {},
"flush.ms": {},
"initial.retention.local.target.ms": {},
"retention.bytes": {},
"retention.ms": {},
"segment.ms": {},
"segment.bytes": {},
"compression.type": {},
"message.timestamp.type": {},
"max.message.bytes": {},
"replication.factor": {},
"write.caching": {},
"redpanda.iceberg.mode": {},
}

destinationConfigs := make(map[string]*string)
for _, c := range rc.Configs {
if _, ok := allowedConfigs[c.Key]; ok {
destinationConfigs[c.Key] = c.Value
}
}

if _, err := outputAdminClient.CreateTopic(ctx, partitions, rp, destinationConfigs, topic); err != nil {
if !errors.Is(err, kerr.TopicAlreadyExists) {
return fmt.Errorf("failed to create topic %q: %s", topic, err)
}
@@ -72,11 +107,11 @@ func createACLs(ctx context.Context, topic string, inputClient *kgo.Client, outp
// Only topic ACLs are migrated; group ACLs are not.
// Users are not migrated because we can't read passwords.

aclBuilder := kadm.NewACLs().Topics(topic).
builder := kadm.NewACLs().Topics(topic).
ResourcePatternType(kadm.ACLPatternLiteral).Operations().Allow().Deny().AllowHosts().DenyHosts()
var inputACLResults kadm.DescribeACLsResults
var err error
if inputACLResults, err = inputAdminClient.DescribeACLs(ctx, aclBuilder); err != nil {
if inputACLResults, err = inputAdminClient.DescribeACLs(ctx, builder); err != nil {
return fmt.Errorf("failed to fetch ACLs for topic %q: %s", topic, err)
}

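The body of `createACLs` is mostly elided in the diff view above, but the describe-builder shown and the test helpers imply a describe-then-create transfer between the two clusters. A sketch of that shape follows; the helper name is hypothetical, only allow rules are mirrored, and the real implementation's filtering and deny-rule handling may differ.

// mirrorTopicACLs is an illustrative sketch of a describe-then-create ACL
// transfer between two clusters (hypothetical helper, not the PR's code).
package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kmsg"
)

func mirrorTopicACLs(ctx context.Context, src, dst *kadm.Client, topic string) error {
	// Fetch every literal-pattern ACL bound to the topic on the source.
	describe := kadm.NewACLs().Topics(topic).
		ResourcePatternType(kadm.ACLPatternLiteral).
		Operations().Allow().Deny().AllowHosts().DenyHosts()
	results, err := src.DescribeACLs(ctx, describe)
	if err != nil {
		return fmt.Errorf("failed to fetch ACLs for topic %q: %s", topic, err)
	}

	for _, res := range results {
		if res.Err != nil {
			return res.Err
		}
		for _, acl := range res.Described {
			// Only allow rules are shown here; deny handling is omitted.
			if acl.Permission != kmsg.ACLPermissionTypeAllow {
				continue
			}
			// Re-create the described rule on the destination cluster.
			create := kadm.NewACLs().
				Allow(acl.Principal).AllowHosts(acl.Host).
				Topics(acl.Name).
				ResourcePatternType(kadm.ACLPatternLiteral).
				Operations(acl.Operation)
			if _, err := dst.CreateACLs(ctx, create); err != nil {
				return fmt.Errorf("failed to create ACLs for topic %q: %s", topic, err)
			}
		}
	}
	return nil
}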