Skip to content

Commit

Permalink
Add topic properties to Migrator
Browse files Browse the repository at this point in the history
Also make sure ACLs are always migrated even if the destination
topic already exists.

Signed-off-by: Mihai Todor <[email protected]>
  • Loading branch information
mihaitodor committed Feb 25, 2025
1 parent 36a54a7 commit e3efdbd
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ All notable changes to this project will be documented in this file.

- Output `snowflake_streaming` has additional logging and debug information when errors arise. (@rockwotj)
- Input `postgres_cdc` now does not add a prefix to the replication slot name, if upgrading from a previous version, prefix your current replication slot with `rs_` to continue to use the same replication slot. (@rockwotj)
- The `redpanda_migrator` output now uses the source topic config when creating a topic in the destination cluster. It also attempts to transfer topic ACLs to the destination cluster even if the topics already exist. (@mihaitodor)

## 4.47.1 - 2025-02-11

Expand Down
163 changes: 161 additions & 2 deletions internal/impl/kafka/enterprise/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,66 @@ func startRedpanda(t *testing.T, pool *dockertest.Pool, exposeBroker bool, autoc
}, nil
}

// createTopicWithACLs connects to the broker at `brokerAddr`, creates `topic` with a single partition and the given
// `retention.ms` value, and then grants `principal` the ACL `operation` on that topic via `updateTopicACL`. Any failure
// aborts the test.
func createTopicWithACLs(t *testing.T, brokerAddr, topic, retentionTime, principal string, operation kadm.ACLOperation) {
	kafkaClient, err := kgo.NewClient(kgo.SeedBrokers(brokerAddr))
	require.NoError(t, err)
	defer kafkaClient.Close()

	adminClient := kadm.NewClient(kafkaClient)

	// One partition; the replication factor is passed as -1, which kadm documents as "use the broker default".
	topicConfigs := map[string]*string{
		"retention.ms": &retentionTime,
	}
	_, err = adminClient.CreateTopic(context.Background(), 1, -1, topicConfigs, topic)
	require.NoError(t, err)

	updateTopicACL(t, adminClient, topic, principal, operation)
}

// updateTopicACL creates a literal-pattern "allow" ACL on `topic` which lets `principal` perform `operation` from any
// host ("*"). It requires the request itself to succeed and exactly one result to come back, and asserts that this
// result carries no error.
func updateTopicACL(t *testing.T, client *kadm.Client, topic, principal string, operation kadm.ACLOperation) {
	aclBuilder := kadm.NewACLs().
		Allow(principal).
		AllowHosts("*").
		Topics(topic).
		ResourcePatternType(kadm.ACLPatternLiteral).
		Operations(operation)

	results, err := client.CreateACLs(context.Background(), aclBuilder)
	require.NoError(t, err)
	require.Len(t, results, 1)
	assert.NoError(t, results[0].Err)
}

// checkTopic verifies that `topic` on the broker at `brokerAddr` has `retention.ms` set to `retentionTime` and that
// exactly one literal-pattern ACL matching `operation` exists for it, granted to `principal` from any host ("*").
//
// The ACL lookup filters on `operation`, so ACLs for other operations on the same topic are not taken into account.
func checkTopic(t *testing.T, brokerAddr, topic, retentionTime, principal string, operation kadm.ACLOperation) {
	client, err := kgo.NewClient(kgo.SeedBrokers([]string{brokerAddr}...))
	require.NoError(t, err)
	defer client.Close()

	adm := kadm.NewClient(client)

	topicConfigs, err := adm.DescribeTopicConfigs(context.Background(), topic)
	require.NoError(t, err)

	rc, err := topicConfigs.On(topic, nil)
	require.NoError(t, err)
	assert.Condition(t, func() bool {
		for _, c := range rc.Configs {
			// Config values are pointers; guard against nil so a missing value fails the assertion instead of
			// panicking.
			if c.Key == "retention.ms" && c.Value != nil && *c.Value == retentionTime {
				return true
			}
		}
		return false
	}, "topic %q does not have retention.ms set to %s", topic, retentionTime)

	builder := kadm.NewACLs().Topics(topic).
		ResourcePatternType(kadm.ACLPatternLiteral).Operations(operation).Allow().Deny().AllowHosts().DenyHosts()

	aclResults, err := adm.DescribeACLs(context.Background(), builder)
	require.NoError(t, err)
	// Make sure there is a result to inspect before indexing into the slice.
	require.NotEmpty(t, aclResults)
	// Check the per-filter error first so a broker-side failure isn't reported as an unexpected ACL count.
	require.NoError(t, aclResults[0].Err)
	require.Len(t, aclResults[0].Described, 1)

	acl := aclResults[0].Described[0]
	assert.Equal(t, principal, acl.Principal)
	assert.Equal(t, "*", acl.Host)
	assert.Equal(t, topic, acl.Name)
	assert.Equal(t, operation.String(), acl.Operation.String())
}

// produceMessages produces `count` messages to the given `topic` with the given `message` content. The
// `timestampOffset` indicates an offset which gets added to the `counter()` Bloblang function which is used to generate
// the message timestamps sequentially, the first one being `1 + timestampOffset`.
Expand Down Expand Up @@ -776,8 +836,6 @@ input:
topics: [ %s ]
consumer_group: migrator_cg
start_from_oldest: true
replication_factor_override: true
replication_factor: -1
schema_registry:
url: %s
processors:
Expand Down Expand Up @@ -1084,3 +1142,104 @@ output:
})
}
}

// TestRedpandaMigratorTopicConfigAndACLsIntegration starts a source and a destination Redpanda container and checks
// that the `redpanda_migrator` output:
//   - creates the destination topic with the source topic's `retention.ms` config and its ACL;
//   - transfers an ACL added to the source topic afterwards, even though the destination topic already exists.
func TestRedpandaMigratorTopicConfigAndACLsIntegration(t *testing.T) {
	t.Parallel()

	pool, err := dockertest.NewPool("")
	require.NoError(t, err)
	pool.MaxWait = time.Minute

	source, err := startRedpanda(t, pool, true, true)
	require.NoError(t, err)
	destination, err := startRedpanda(t, pool, true, true)
	require.NoError(t, err)

	dummyTopic := "test"

	// runMigrator runs a migrator stream from source to destination until the first message has been written to the
	// destination, then stops the stream and waits for it to shut down.
	runMigrator := func() {
		streamBuilder := service.NewStreamBuilder()
		require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(`
input:
  redpanda_migrator:
    seed_brokers: [ %s ]
    topics: [ %s ]
    consumer_group: migrator_cg

output:
  redpanda_migrator:
    seed_brokers: [ %s ]
    topic: ${! @kafka_topic }
    key: ${! @kafka_key }
    partition: ${! @kafka_partition }
    partitioner: manual
    timestamp_ms: ${! @kafka_timestamp_ms }
    translate_schema_ids: false
    replication_factor_override: true
    replication_factor: -1
`, source.brokerAddr, dummyTopic, destination.brokerAddr)))
		require.NoError(t, streamBuilder.SetLoggerYAML(`level: INFO`))

		// Signal the first delivered message exactly once. Closing a channel under sync.Once (instead of calling
		// WaitGroup.Done) avoids a "negative WaitGroup counter" panic if more than one message reaches the consumer.
		var messageSeenOnce sync.Once
		messageSeen := make(chan struct{})
		require.NoError(t, streamBuilder.AddConsumerFunc(func(_ context.Context, _ *service.Message) error {
			messageSeenOnce.Do(func() { close(messageSeen) })
			return nil
		}))

		// Ensure the callback function is called after the output wrote the message.
		streamBuilder.SetOutputBrokerPattern(service.OutputBrokerPatternFanOutSequential)

		stream, err := streamBuilder.Build()
		require.NoError(t, err)

		license.InjectTestService(stream.Resources())

		// Run the stream in the background and hand its result back to the test goroutine. `require` calls FailNow,
		// which must only be called from the goroutine running the test, so no assertions are made in here.
		runErrCh := make(chan error, 1)
		go func() {
			runErrCh <- stream.Run(context.Background())
		}()

		// Wait for the first message, but bail out instead of hanging if the stream exits prematurely.
		select {
		case <-messageSeen:
		case runErr := <-runErrCh:
			require.NoError(t, runErr)
			require.FailNow(t, "redpanda_migrator pipeline exited before any message was delivered")
		}

		require.NoError(t, stream.StopWithin(3*time.Second))

		// Collect the result of Run before returning so nothing touches `t` after the test has completed.
		select {
		case runErr := <-runErrCh:
			require.NoError(t, runErr)
			t.Log("redpanda_migrator pipeline shut down")
		case <-time.After(3 * time.Second):
			require.FailNow(t, "redpanda_migrator pipeline did not shut down after being stopped")
		}
	}

	// Create a topic with a custom retention time
	dummyRetentionTime := strconv.FormatInt((48 * time.Hour).Milliseconds(), 10)
	dummyPrincipal := "User:redpanda"
	dummyACLOperation := kmsg.ACLOperationRead
	createTopicWithACLs(t, source.brokerAddr, dummyTopic, dummyRetentionTime, dummyPrincipal, dummyACLOperation)

	// Produce one message
	dummyMessage := `{"test":"foo"}`
	produceMessages(t, source, dummyTopic, dummyMessage, 0, 1, true)

	// Run the Redpanda Migrator
	runMigrator()

	// Ensure that the topic and ACL were migrated correctly
	checkTopic(t, destination.brokerAddr, dummyTopic, dummyRetentionTime, dummyPrincipal, dummyACLOperation)

	client, err := kgo.NewClient(kgo.SeedBrokers([]string{source.brokerAddr}...))
	require.NoError(t, err)
	defer client.Close()

	adm := kadm.NewClient(client)

	// Update ACL in the source topic and ensure that it's reflected in the destination
	dummyACLOperation = kmsg.ACLOperationDescribe
	updateTopicACL(t, adm, dummyTopic, dummyPrincipal, dummyACLOperation)

	// Produce one more message so the consumerFunc will get triggered to indicate that Migrator ran successfully
	produceMessages(t, source, dummyTopic, dummyMessage, 0, 1, true)

	// Run the Redpanda Migrator again
	runMigrator()

	// Ensure that the ACL was updated correctly
	checkTopic(t, destination.brokerAddr, dummyTopic, dummyRetentionTime, dummyPrincipal, dummyACLOperation)
}
10 changes: 4 additions & 6 deletions internal/impl/kafka/enterprise/redpanda_migrator_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,10 @@ func init() {
// message from it.
mgr.Logger().Errorf("Failed to create topic %q and ACLs: %s", topic, err)
}

continue
} else {
mgr.Logger().Infof("Created topic %q", topic)
}

mgr.Logger().Infof("Created topic %q", topic)

if err := createACLs(ctx, topic, inputClient, outputClient); err != nil {
mgr.Logger().Errorf("Failed to create ACLs for topic %q: %s", topic, err)
}
Expand Down Expand Up @@ -294,10 +292,10 @@ func init() {
} else {
return fmt.Errorf("failed to create topic %q and ACLs: %s", record.Topic, err)
}
} else {
mgr.Logger().Infof("Created topic %q", record.Topic)
}

mgr.Logger().Infof("Created topic %q", record.Topic)

if err := createACLs(ctx, record.Topic, details.Client, client); err != nil {
mgr.Logger().Errorf("Failed to create ACLs for topic %q: %s", record.Topic, err)
}
Expand Down
40 changes: 37 additions & 3 deletions internal/impl/kafka/enterprise/topic_crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"
"errors"
"fmt"
"strings"

"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
Expand Down Expand Up @@ -56,7 +57,40 @@ func createTopic(ctx context.Context, topic string, replicationFactorOverride bo
}
}

if _, err := outputAdminClient.CreateTopic(ctx, partitions, rp, nil, topic); err != nil {
topicConfigs, err := inputAdminClient.DescribeTopicConfigs(ctx, topic)
if err != nil {
return fmt.Errorf("failed to fetch configs for topic %q from source broker: %s", topic, err)
}

rc, err := topicConfigs.On(topic, nil)
if err != nil {
return fmt.Errorf("failed to fetch configs for topic %q from source broker: %s", topic, err)
}

destinationConfigs := make(map[string]*string)
for _, c := range rc.Configs {
// Source: https://docs.redpanda.com/current/reference/properties/topic-properties/
if c.Key == "cleanup.policy" ||
c.Key == "flush.bytes" ||
c.Key == "flush.ms" ||
c.Key == "initial.retention.local.target.ms" ||
c.Key == "retention.bytes" ||
c.Key == "retention.ms" ||
c.Key == "segment.ms" ||
c.Key == "segment.bytes" ||
c.Key == "compression.type" ||
c.Key == "message.timestamp.type" ||
c.Key == "max.message.bytes" ||
c.Key == "replication.factor" ||
c.Key == "write.caching" ||
c.Key == "redpanda.iceberg.mode" ||
strings.HasPrefix(c.Key, "redpanda.remote.") {

destinationConfigs[c.Key] = c.Value
}
}

if _, err := outputAdminClient.CreateTopic(ctx, partitions, rp, destinationConfigs, topic); err != nil {
if !errors.Is(err, kerr.TopicAlreadyExists) {
return fmt.Errorf("failed to create topic %q: %s", topic, err)
}
Expand All @@ -72,11 +106,11 @@ func createACLs(ctx context.Context, topic string, inputClient *kgo.Client, outp
// Only topic ACLs are migrated, group ACLs are not migrated.
// Users are not migrated because we can't read passwords.

aclBuilder := kadm.NewACLs().Topics(topic).
builder := kadm.NewACLs().Topics(topic).
ResourcePatternType(kadm.ACLPatternLiteral).Operations().Allow().Deny().AllowHosts().DenyHosts()
var inputACLResults kadm.DescribeACLsResults
var err error
if inputACLResults, err = inputAdminClient.DescribeACLs(ctx, aclBuilder); err != nil {
if inputACLResults, err = inputAdminClient.DescribeACLs(ctx, builder); err != nil {
return fmt.Errorf("failed to fetch ACLs for topic %q: %s", topic, err)
}

Expand Down

0 comments on commit e3efdbd

Please sign in to comment.