Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add topic properties to Migrator #3207

Merged
merged 4 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ All notable changes to this project will be documented in this file.

- Output `snowflake_streaming` has additional logging and debug information when errors arise. (@rockwotj)
- Input `postgres_cdc` now does not add a prefix to the replication slot name, if upgrading from a previous version, prefix your current replication slot with `rs_` to continue to use the same replication slot. (@rockwotj)
- The `redpanda_migrator` output now uses the source topic config when creating a topic in the destination cluster. It also attempts to transfer topic ACLs to the destination cluster even if the topics already exist. (@mihaitodor)

## 4.47.1 - 2025-02-11

Expand Down
163 changes: 161 additions & 2 deletions internal/impl/kafka/enterprise/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,66 @@ func startRedpanda(t *testing.T, pool *dockertest.Pool, exposeBroker bool, autoc
}, nil
}

func createTopicWithACLs(t *testing.T, brokerAddr, topic, retentionTime, principal string, operation kadm.ACLOperation) {
client, err := kgo.NewClient(kgo.SeedBrokers([]string{brokerAddr}...))
require.NoError(t, err)
defer client.Close()

adm := kadm.NewClient(client)

configs := map[string]*string{"retention.ms": &retentionTime}
_, err = adm.CreateTopic(context.Background(), 1, -1, configs, topic)
require.NoError(t, err)

updateTopicACL(t, adm, topic, principal, operation)
}

func updateTopicACL(t *testing.T, client *kadm.Client, topic, principal string, operation kadm.ACLOperation) {
builder := kadm.NewACLs().Allow(principal).AllowHosts("*").Topics(topic).ResourcePatternType(kadm.ACLPatternLiteral).Operations(operation)
res, err := client.CreateACLs(context.Background(), builder)
require.NoError(t, err)
require.Len(t, res, 1)
assert.NoError(t, res[0].Err)
}

func checkTopic(t *testing.T, brokerAddr, topic, retentionTime, principal string, operation kadm.ACLOperation) {
client, err := kgo.NewClient(kgo.SeedBrokers([]string{brokerAddr}...))
require.NoError(t, err)
defer client.Close()

adm := kadm.NewClient(client)

topicConfigs, err := adm.DescribeTopicConfigs(context.Background(), topic)
require.NoError(t, err)

rc, err := topicConfigs.On(topic, nil)
require.NoError(t, err)
assert.Condition(t, func() bool {
for _, c := range rc.Configs {
if c.Key == "retention.ms" && *c.Value == retentionTime {
return true
}
}
return false
})

builder := kadm.NewACLs().Topics(topic).
ResourcePatternType(kadm.ACLPatternLiteral).Operations(operation).Allow().Deny().AllowHosts().DenyHosts()

aclResults, err := adm.DescribeACLs(context.Background(), builder)
require.NoError(t, err)
require.Len(t, aclResults[0].Described, 1)
require.NoError(t, aclResults[0].Err)
require.Len(t, aclResults[0].Described, 1)

for _, acl := range aclResults[0].Described {
assert.Equal(t, principal, acl.Principal)
assert.Equal(t, "*", acl.Host)
assert.Equal(t, topic, acl.Name)
assert.Equal(t, operation.String(), acl.Operation.String())
}
}

// produceMessages produces `count` messages to the given `topic` with the given `message` content. The
// `timestampOffset` indicates an offset which gets added to the `counter()` Bloblang function which is used to generate
// the message timestamps sequentially, the first one being `1 + timestampOffset`.
Expand Down Expand Up @@ -776,8 +836,6 @@ input:
topics: [ %s ]
consumer_group: migrator_cg
start_from_oldest: true
replication_factor_override: true
replication_factor: -1
Comment on lines -779 to -780
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The template linter is a bit annoying, since it doesn't check if there are any unrecognised fields... Something to improve in Benthos.

schema_registry:
url: %s
processors:
Expand Down Expand Up @@ -1084,3 +1142,104 @@ output:
})
}
}

func TestRedpandaMigratorTopicConfigAndACLsIntegration(t *testing.T) {
t.Parallel()

pool, err := dockertest.NewPool("")
require.NoError(t, err)
pool.MaxWait = time.Minute

source, err := startRedpanda(t, pool, true, true)
require.NoError(t, err)
destination, err := startRedpanda(t, pool, true, true)
require.NoError(t, err)

dummyTopic := "test"

runMigrator := func() {
streamBuilder := service.NewStreamBuilder()
require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(`
input:
redpanda_migrator:
seed_brokers: [ %s ]
topics: [ %s ]
consumer_group: migrator_cg

output:
redpanda_migrator:
seed_brokers: [ %s ]
topic: ${! @kafka_topic }
key: ${! @kafka_key }
partition: ${! @kafka_partition }
partitioner: manual
timestamp_ms: ${! @kafka_timestamp_ms }
translate_schema_ids: false
replication_factor_override: true
replication_factor: -1
`, source.brokerAddr, dummyTopic, destination.brokerAddr)))
require.NoError(t, streamBuilder.SetLoggerYAML(`level: INFO`))

migratorUpdateWG := sync.WaitGroup{}
migratorUpdateWG.Add(1)
require.NoError(t, streamBuilder.AddConsumerFunc(func(_ context.Context, m *service.Message) error {
defer migratorUpdateWG.Done()
return nil
}))

// Ensure the callback function is called after the output wrote the message.
streamBuilder.SetOutputBrokerPattern(service.OutputBrokerPatternFanOutSequential)

stream, err := streamBuilder.Build()
require.NoError(t, err)

license.InjectTestService(stream.Resources())

// Run stream in the background.
go func() {
err = stream.Run(context.Background())
require.NoError(t, err)

t.Log("redpanda_migrator_offsets pipeline shut down")
}()

migratorUpdateWG.Wait()

require.NoError(t, stream.StopWithin(3*time.Second))
}

// Create a topic with a custom retention time
dummyRetentionTime := strconv.Itoa(int((48 * time.Hour).Milliseconds()))
dummyPrincipal := "User:redpanda"
dummyACLOperation := kmsg.ACLOperationRead
createTopicWithACLs(t, source.brokerAddr, dummyTopic, dummyRetentionTime, dummyPrincipal, dummyACLOperation)

// Produce one message
dummyMessage := `{"test":"foo"}`
produceMessages(t, source, dummyTopic, dummyMessage, 0, 1, true)

// Run the Redpanda Migrator
runMigrator()

// Ensure that the topic and ACL were migrated correctly
checkTopic(t, destination.brokerAddr, dummyTopic, dummyRetentionTime, dummyPrincipal, dummyACLOperation)

client, err := kgo.NewClient(kgo.SeedBrokers([]string{source.brokerAddr}...))
require.NoError(t, err)
defer client.Close()

adm := kadm.NewClient(client)

// Update ACL in the source topic and ensure that it's reflected in the destination
dummyACLOperation = kmsg.ACLOperationDescribe
updateTopicACL(t, adm, dummyTopic, dummyPrincipal, dummyACLOperation)

// Produce one more message so the consumerFunc will get triggered to indicate that Migrator ran successfully
produceMessages(t, source, dummyTopic, dummyMessage, 0, 1, true)

// Run the Redpanda Migrator again
runMigrator()

// Ensure that the ACL was updated correctly
checkTopic(t, destination.brokerAddr, dummyTopic, dummyRetentionTime, dummyPrincipal, dummyACLOperation)
}
10 changes: 4 additions & 6 deletions internal/impl/kafka/enterprise/redpanda_migrator_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,10 @@ func init() {
// message from it.
mgr.Logger().Errorf("Failed to create topic %q and ACLs: %s", topic, err)
}

continue
} else {
mgr.Logger().Infof("Created topic %q", topic)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a debug message which gets printed if the topic already exists...

}

mgr.Logger().Infof("Created topic %q", topic)

if err := createACLs(ctx, topic, inputClient, outputClient); err != nil {
mgr.Logger().Errorf("Failed to create ACLs for topic %q: %s", topic, err)
}
Expand Down Expand Up @@ -294,10 +292,10 @@ func init() {
} else {
return fmt.Errorf("failed to create topic %q and ACLs: %s", record.Topic, err)
}
} else {
mgr.Logger().Infof("Created topic %q", record.Topic)
}

mgr.Logger().Infof("Created topic %q", record.Topic)

if err := createACLs(ctx, record.Topic, details.Client, client); err != nil {
mgr.Logger().Errorf("Failed to create ACLs for topic %q: %s", record.Topic, err)
}
Expand Down
38 changes: 35 additions & 3 deletions internal/impl/kafka/enterprise/topic_crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,39 @@ func createTopic(ctx context.Context, topic string, replicationFactorOverride bo
}
}

if _, err := outputAdminClient.CreateTopic(ctx, partitions, rp, nil, topic); err != nil {
topicConfigs, err := inputAdminClient.DescribeTopicConfigs(ctx, topic)
if err != nil {
return fmt.Errorf("failed to fetch configs for topic %q from source broker: %s", topic, err)
}

rc, err := topicConfigs.On(topic, nil)
if err != nil {
return fmt.Errorf("failed to fetch configs for topic %q from source broker: %s", topic, err)
}

destinationConfigs := make(map[string]*string)
for _, c := range rc.Configs {
// Source: https://docs.redpanda.com/current/reference/properties/topic-properties/
if c.Key == "cleanup.policy" ||
c.Key == "flush.bytes" ||
c.Key == "flush.ms" ||
c.Key == "initial.retention.local.target.ms" ||
c.Key == "retention.bytes" ||
c.Key == "retention.ms" ||
c.Key == "segment.ms" ||
c.Key == "segment.bytes" ||
c.Key == "compression.type" ||
c.Key == "message.timestamp.type" ||
c.Key == "max.message.bytes" ||
c.Key == "replication.factor" ||
c.Key == "write.caching" ||
c.Key == "redpanda.iceberg.mode" {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe simpler as a switch or a slices.Contain?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went for a map since that's what Copilot suggested when I asked it to make the code look nicer 😅


destinationConfigs[c.Key] = c.Value
}
}

if _, err := outputAdminClient.CreateTopic(ctx, partitions, rp, destinationConfigs, topic); err != nil {
if !errors.Is(err, kerr.TopicAlreadyExists) {
return fmt.Errorf("failed to create topic %q: %s", topic, err)
}
Expand All @@ -72,11 +104,11 @@ func createACLs(ctx context.Context, topic string, inputClient *kgo.Client, outp
// Only topic ACLs are migrated, group ACLs are not migrated.
// Users are not migrated because we can't read passwords.

aclBuilder := kadm.NewACLs().Topics(topic).
builder := kadm.NewACLs().Topics(topic).
ResourcePatternType(kadm.ACLPatternLiteral).Operations().Allow().Deny().AllowHosts().DenyHosts()
var inputACLResults kadm.DescribeACLsResults
var err error
if inputACLResults, err = inputAdminClient.DescribeACLs(ctx, aclBuilder); err != nil {
if inputACLResults, err = inputAdminClient.DescribeACLs(ctx, builder); err != nil {
return fmt.Errorf("failed to fetch ACLs for topic %q: %s", topic, err)
}

Expand Down
Loading