Add topic properties to Migrator (#3207)
* Add topic properties to Migrator

Also make sure ACLs are always migrated even if the destination
topic already exists.

Signed-off-by: Mihai Todor <[email protected]>

* Skip `redpanda.remote.` when migrating Kafka topic properties

Signed-off-by: Mihai Todor <[email protected]>

* Refactor allowed topic properties logic in Migrator

Signed-off-by: Mihai Todor <[email protected]>

* Revamp the logging in the redpanda_migrator_offsets output

Signed-off-by: Mihai Todor <[email protected]>

---------

Signed-off-by: Mihai Todor <[email protected]>
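
For context, the `redpanda.remote.` skip mentioned above can be sketched as follows. This is a minimal illustration with a hypothetical `filterTopicConfigs` helper, not the committed code — the commit itself went on to use the explicit allowlist visible in `topic_crud.go` below:

```go
package main

import (
	"fmt"
	"strings"
)

// filterTopicConfigs drops Tiered Storage properties (the `redpanda.remote.`
// prefix) so they are not blindly copied to the destination cluster.
// Hypothetical helper, for illustration only.
func filterTopicConfigs(src map[string]*string) map[string]*string {
	dst := make(map[string]*string, len(src))
	for key, value := range src {
		if strings.HasPrefix(key, "redpanda.remote.") {
			continue
		}
		dst[key] = value
	}
	return dst
}

func main() {
	retention := "172800000" // 48h in milliseconds, as used by the integration test below
	cfg := map[string]*string{
		"retention.ms":         &retention,
		"redpanda.remote.read": nil,
	}
	for key := range filterTopicConfigs(cfg) {
		fmt.Println(key) // prints only "retention.ms"
	}
}
```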
mihaitodor authored Feb 27, 2025
1 parent 4196ff3 commit e0a528d
Showing 5 changed files with 215 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -28,6 +28,7 @@ All notable changes to this project will be documented in this file.

- Output `snowflake_streaming` has additional logging and debug information when errors arise. (@rockwotj)
- Input `postgres_cdc` now does not add a prefix to the replication slot name; if upgrading from a previous version, prefix your current replication slot with `rs_` to continue to use the same replication slot. (@rockwotj)
- The `redpanda_migrator` output now uses the source topic config when creating a topic in the destination cluster. It also attempts to transfer topic ACLs to the destination cluster even if the topics already exist. (@mihaitodor)

## 4.47.1 - 2025-02-11

163 changes: 161 additions & 2 deletions internal/impl/kafka/enterprise/integration_test.go
@@ -656,6 +656,66 @@ func startRedpanda(t *testing.T, pool *dockertest.Pool, exposeBroker bool, autoc
}, nil
}

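// createTopicWithACLs creates a single-partition topic with the given retention time and grants
// `principal` the given ACL operation on it.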
func createTopicWithACLs(t *testing.T, brokerAddr, topic, retentionTime, principal string, operation kadm.ACLOperation) {
client, err := kgo.NewClient(kgo.SeedBrokers([]string{brokerAddr}...))
require.NoError(t, err)
defer client.Close()

adm := kadm.NewClient(client)

configs := map[string]*string{"retention.ms": &retentionTime}
_, err = adm.CreateTopic(context.Background(), 1, -1, configs, topic)
require.NoError(t, err)

updateTopicACL(t, adm, topic, principal, operation)
}

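// updateTopicACL grants `principal` the given ACL operation on the topic from any host.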
func updateTopicACL(t *testing.T, client *kadm.Client, topic, principal string, operation kadm.ACLOperation) {
builder := kadm.NewACLs().Allow(principal).AllowHosts("*").Topics(topic).ResourcePatternType(kadm.ACLPatternLiteral).Operations(operation)
res, err := client.CreateACLs(context.Background(), builder)
require.NoError(t, err)
require.Len(t, res, 1)
assert.NoError(t, res[0].Err)
}

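// checkTopic asserts that the topic has the expected retention time and that its ACL matches the
// expected principal and operation.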
func checkTopic(t *testing.T, brokerAddr, topic, retentionTime, principal string, operation kadm.ACLOperation) {
client, err := kgo.NewClient(kgo.SeedBrokers([]string{brokerAddr}...))
require.NoError(t, err)
defer client.Close()

adm := kadm.NewClient(client)

topicConfigs, err := adm.DescribeTopicConfigs(context.Background(), topic)
require.NoError(t, err)

rc, err := topicConfigs.On(topic, nil)
require.NoError(t, err)
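// The topic's retention.ms config must match the expected value.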
assert.Condition(t, func() bool {
for _, c := range rc.Configs {
if c.Key == "retention.ms" && *c.Value == retentionTime {
return true
}
}
return false
})

builder := kadm.NewACLs().Topics(topic).
ResourcePatternType(kadm.ACLPatternLiteral).Operations(operation).Allow().Deny().AllowHosts().DenyHosts()

aclResults, err := adm.DescribeACLs(context.Background(), builder)
require.NoError(t, err)
require.Len(t, aclResults, 1)
require.NoError(t, aclResults[0].Err)
require.Len(t, aclResults[0].Described, 1)

for _, acl := range aclResults[0].Described {
assert.Equal(t, principal, acl.Principal)
assert.Equal(t, "*", acl.Host)
assert.Equal(t, topic, acl.Name)
assert.Equal(t, operation.String(), acl.Operation.String())
}
}

// produceMessages produces `count` messages to the given `topic` with the given `message` content. The
// `timestampOffset` indicates an offset which gets added to the `counter()` Bloblang function which is used to generate
// the message timestamps sequentially, the first one being `1 + timestampOffset`.
Expand Down Expand Up @@ -776,8 +836,6 @@ input:
topics: [ %s ]
consumer_group: migrator_cg
start_from_oldest: true
-replication_factor_override: true
-replication_factor: -1
schema_registry:
url: %s
processors:
@@ -1084,3 +1142,104 @@ output:
})
}
}

func TestRedpandaMigratorTopicConfigAndACLsIntegration(t *testing.T) {
t.Parallel()

pool, err := dockertest.NewPool("")
require.NoError(t, err)
pool.MaxWait = time.Minute

source, err := startRedpanda(t, pool, true, true)
require.NoError(t, err)
destination, err := startRedpanda(t, pool, true, true)
require.NoError(t, err)

dummyTopic := "test"

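// runMigrator starts a migrator stream which replicates the topic, its configs and ACLs from the
// source cluster to the destination cluster, waits until one message has been processed, then
// shuts the stream down.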
runMigrator := func() {
streamBuilder := service.NewStreamBuilder()
require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(`
input:
redpanda_migrator:
seed_brokers: [ %s ]
topics: [ %s ]
consumer_group: migrator_cg
output:
redpanda_migrator:
seed_brokers: [ %s ]
topic: ${! @kafka_topic }
key: ${! @kafka_key }
partition: ${! @kafka_partition }
partitioner: manual
timestamp_ms: ${! @kafka_timestamp_ms }
translate_schema_ids: false
replication_factor_override: true
replication_factor: -1
`, source.brokerAddr, dummyTopic, destination.brokerAddr)))
require.NoError(t, streamBuilder.SetLoggerYAML(`level: INFO`))

migratorUpdateWG := sync.WaitGroup{}
migratorUpdateWG.Add(1)
require.NoError(t, streamBuilder.AddConsumerFunc(func(_ context.Context, m *service.Message) error {
defer migratorUpdateWG.Done()
return nil
}))

// Ensure the callback function is called after the output wrote the message.
streamBuilder.SetOutputBrokerPattern(service.OutputBrokerPatternFanOutSequential)

stream, err := streamBuilder.Build()
require.NoError(t, err)

license.InjectTestService(stream.Resources())

// Run stream in the background.
go func() {
// Use assert rather than require here: require.NoError calls t.FailNow, which must not run in a goroutine.
assert.NoError(t, stream.Run(context.Background()))

t.Log("redpanda_migrator pipeline shut down")
}()

migratorUpdateWG.Wait()

require.NoError(t, stream.StopWithin(3*time.Second))
}

// Create a topic with a custom retention time
dummyRetentionTime := strconv.Itoa(int((48 * time.Hour).Milliseconds()))
dummyPrincipal := "User:redpanda"
dummyACLOperation := kmsg.ACLOperationRead
createTopicWithACLs(t, source.brokerAddr, dummyTopic, dummyRetentionTime, dummyPrincipal, dummyACLOperation)

// Produce one message
dummyMessage := `{"test":"foo"}`
produceMessages(t, source, dummyTopic, dummyMessage, 0, 1, true)

// Run the Redpanda Migrator
runMigrator()

// Ensure that the topic and ACL were migrated correctly
checkTopic(t, destination.brokerAddr, dummyTopic, dummyRetentionTime, dummyPrincipal, dummyACLOperation)

client, err := kgo.NewClient(kgo.SeedBrokers([]string{source.brokerAddr}...))
require.NoError(t, err)
defer client.Close()

adm := kadm.NewClient(client)

// Update ACL in the source topic and ensure that it's reflected in the destination
dummyACLOperation = kmsg.ACLOperationDescribe
updateTopicACL(t, adm, dummyTopic, dummyPrincipal, dummyACLOperation)

// Produce one more message so the consumerFunc will get triggered to indicate that Migrator ran successfully
produceMessages(t, source, dummyTopic, dummyMessage, 0, 1, true)

// Run the Redpanda Migrator again
runMigrator()

// Ensure that the ACL was updated correctly
checkTopic(t, destination.brokerAddr, dummyTopic, dummyRetentionTime, dummyPrincipal, dummyACLOperation)
}
20 changes: 11 additions & 9 deletions internal/impl/kafka/enterprise/redpanda_migrator_offsets_output.go
@@ -273,24 +273,26 @@ func (w *redpandaMigratorOffsetsWriter) Write(ctx context.Context, msg *service.
// timestamps of all the records in the topic. It also sets the timestamp of the returned offset to -1 in this case.
listedOffsets, err := w.client.ListOffsetsAfterMilli(ctx, offsetCommitTimestamp, topic)
if err != nil {
return fmt.Errorf("failed to translate consumer offsets: %s", err)
return fmt.Errorf("failed to list offsets for topic %q and timestamp %d: %s", topic, offsetCommitTimestamp, err)
}

w.mgr.Logger().Tracef("Listed offsets for topic %q and timestamp %d: %+v", topic, offsetCommitTimestamp, listedOffsets)

if err := listedOffsets.Error(); err != nil {
return fmt.Errorf("listed offsets error: %s", err)
return fmt.Errorf("failed to read offsets for topic %q and timestamp %d: %s", topic, offsetCommitTimestamp, err)
}

offset, ok := listedOffsets.Lookup(topic, partition)
if !ok {
// This should never happen, but we check just in case.
return fmt.Errorf("committed offset not yet replicated to the destination %q topic: lookup failed", topic)
return fmt.Errorf("record for timestamp %d not yet replicated to the destination topic %q partition %d: lookup failed", offsetCommitTimestamp, topic, partition)
}

if !isHighWatermark && offset.Timestamp == -1 {
// This can happen if we received an offset update, but the record which was read from the source cluster to
// trigger it has not been replicated to the destination cluster yet. In this case, we raise an error so the
// operation is retried.
return fmt.Errorf("committed offset not yet replicated to the destination %q topic", topic)
return fmt.Errorf("record for timestamp %d not yet replicated to the destination topic %q partition %d", offsetCommitTimestamp, topic, partition)
}

// This is an optimisation to try and avoid unnecessary duplicates in the common case when the received offset
@@ -303,12 +305,12 @@ func (w *redpandaMigratorOffsetsWriter) Write(ctx context.Context, msg *service.
if isHighWatermark && offset.Timestamp != -1 {
offsets, err := w.client.ListEndOffsets(ctx, topic)
if err != nil {
return fmt.Errorf("failed to read the high watermark for topic %q and partition %q: %s", topic, partition, err)
return fmt.Errorf("failed to list the high watermark for topic %q and partition %q (timestamp %d): %s", topic, partition, offsetCommitTimestamp, err)
}

highWatermark, ok := offsets.Lookup(topic, partition)
if !ok {
return fmt.Errorf("failed to find the high watermark for topic %q and partition %q: %s", topic, partition, err)
return fmt.Errorf("failed to read the high watermark for topic %q and partition %q (timestamp %d): %s", topic, partition, offsetCommitTimestamp, err)
}
if highWatermark.Offset == offset.Offset+1 {
offset.Offset = highWatermark.Offset
@@ -326,11 +328,11 @@ func (w *redpandaMigratorOffsetsWriter) Write(ctx context.Context, msg *service.

offsetResponses, err := w.client.CommitOffsets(ctx, group, offsets)
if err != nil {
return fmt.Errorf("failed to commit consumer offsets: %s", err)
return fmt.Errorf("failed to commit consumer offsets for topic %q and partition %q (timestamp %d): %s", topic, partition, offsetCommitTimestamp, err)
}

if err := offsetResponses.Error(); err != nil {
return fmt.Errorf("committed consumer offsets returned an error: %s", err)
return fmt.Errorf("committed consumer offsets returned an error for topic %q and partition %q (timestamp %d): %s", topic, partition, offsetCommitTimestamp, err)
}

return nil
@@ -349,7 +351,7 @@ func (w *redpandaMigratorOffsetsWriter) Write(ctx context.Context, msg *service.

wait := backOff.NextBackOff()
if wait == backoff.Stop {
return fmt.Errorf("failed to update consumer offsets for topic %q and partition %d: %s", topic, partition, err)
return fmt.Errorf("failed to update consumer offsets for topic %q and partition %d (timestamp %d): %s", topic, partition, offsetCommitTimestamp, err)
}

time.Sleep(wait)
10 changes: 4 additions & 6 deletions internal/impl/kafka/enterprise/redpanda_migrator_output.go
@@ -226,12 +226,10 @@ func init() {
// message from it.
mgr.Logger().Errorf("Failed to create topic %q and ACLs: %s", topic, err)
}
-
-continue
+} else {
+mgr.Logger().Infof("Created topic %q", topic)
}

-mgr.Logger().Infof("Created topic %q", topic)
-
if err := createACLs(ctx, topic, inputClient, outputClient); err != nil {
mgr.Logger().Errorf("Failed to create ACLs for topic %q: %s", topic, err)
}
@@ -294,10 +292,10 @@ func init() {
} else {
return fmt.Errorf("failed to create topic %q and ACLs: %s", record.Topic, err)
}
-}
-
-mgr.Logger().Infof("Created topic %q", record.Topic)
+} else {
+mgr.Logger().Infof("Created topic %q", record.Topic)
+}

if err := createACLs(ctx, record.Topic, details.Client, client); err != nil {
mgr.Logger().Errorf("Failed to create ACLs for topic %q: %s", record.Topic, err)
}
41 changes: 38 additions & 3 deletions internal/impl/kafka/enterprise/topic_crud.go
@@ -56,7 +56,42 @@ func createTopic(ctx context.Context, topic string, replicationFactorOverride bo
}
}

-if _, err := outputAdminClient.CreateTopic(ctx, partitions, rp, nil, topic); err != nil {
topicConfigs, err := inputAdminClient.DescribeTopicConfigs(ctx, topic)
if err != nil {
return fmt.Errorf("failed to fetch configs for topic %q from source broker: %s", topic, err)
}

rc, err := topicConfigs.On(topic, nil)
if err != nil {
return fmt.Errorf("failed to fetch configs for topic %q from source broker: %s", topic, err)
}

// Source: https://docs.redpanda.com/current/reference/properties/topic-properties/
allowedConfigs := map[string]struct{}{
"cleanup.policy": {},
"flush.bytes": {},
"flush.ms": {},
"initial.retention.local.target.ms": {},
"retention.bytes": {},
"retention.ms": {},
"segment.ms": {},
"segment.bytes": {},
"compression.type": {},
"message.timestamp.type": {},
"max.message.bytes": {},
"replication.factor": {},
"write.caching": {},
"redpanda.iceberg.mode": {},
}

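// Copy only allowlisted properties across; anything else is left to the destination cluster's defaults.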
destinationConfigs := make(map[string]*string)
for _, c := range rc.Configs {
if _, ok := allowedConfigs[c.Key]; ok {
destinationConfigs[c.Key] = c.Value
}
}

+if _, err := outputAdminClient.CreateTopic(ctx, partitions, rp, destinationConfigs, topic); err != nil {
if !errors.Is(err, kerr.TopicAlreadyExists) {
return fmt.Errorf("failed to create topic %q: %s", topic, err)
}
@@ -72,11 +107,11 @@ func createACLs(ctx context.Context, topic string, inputClient *kgo.Client, outp
// Only topic ACLs are migrated, group ACLs are not migrated.
// Users are not migrated because we can't read passwords.

-aclBuilder := kadm.NewACLs().Topics(topic).
+builder := kadm.NewACLs().Topics(topic).
ResourcePatternType(kadm.ACLPatternLiteral).Operations().Allow().Deny().AllowHosts().DenyHosts()
var inputACLResults kadm.DescribeACLsResults
var err error
-if inputACLResults, err = inputAdminClient.DescribeACLs(ctx, aclBuilder); err != nil {
+if inputACLResults, err = inputAdminClient.DescribeACLs(ctx, builder); err != nil {
return fmt.Errorf("failed to fetch ACLs for topic %q: %s", topic, err)
}

