Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support assuming an intermediate role when using the AWS-MSK-IAM SASL mechanism #67

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ jobs:
KAFKA_ADVERTISED_HOST_NAME: kafka1
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
command:
- '/bin/sh'
- '-c'
- 'sleep 10 && /usr/bin/start-kafka.sh'

- image: wurstmeister/kafka:2.11-0.10.2.2
name: kafka2
Expand All @@ -36,6 +40,10 @@ jobs:
KAFKA_ADVERTISED_HOST_NAME: kafka2
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
command:
- '/bin/sh'
- '-c'
- 'sleep 10 && /usr/bin/start-kafka.sh'

- image: wurstmeister/kafka:2.11-0.10.2.2
name: kafka3
Expand All @@ -47,6 +55,10 @@ jobs:
KAFKA_ADVERTISED_HOST_NAME: kafka3
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
command:
- '/bin/sh'
- '-c'
- 'sleep 10 && /usr/bin/start-kafka.sh'

- image: wurstmeister/kafka:2.11-0.10.2.2
name: kafka4
Expand All @@ -58,6 +70,10 @@ jobs:
KAFKA_ADVERTISED_HOST_NAME: kafka4
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
command:
- '/bin/sh'
- '-c'
- 'sleep 10 && /usr/bin/start-kafka.sh'

- image: wurstmeister/kafka:2.11-0.10.2.2
name: kafka5
Expand All @@ -69,6 +85,10 @@ jobs:
KAFKA_ADVERTISED_HOST_NAME: kafka5
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
command:
- '/bin/sh'
- '-c'
- 'sleep 10 && /usr/bin/start-kafka.sh'

- image: wurstmeister/kafka:2.11-0.10.2.2
name: kafka6
Expand All @@ -80,6 +100,10 @@ jobs:
KAFKA_ADVERTISED_HOST_NAME: kafka6
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
command:
- '/bin/sh'
- '-c'
- 'sleep 10 && /usr/bin/start-kafka.sh'

steps:
- checkout
Expand Down Expand Up @@ -127,6 +151,10 @@ jobs:
KAFKA_ADVERTISED_HOST_NAME: kafka1
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
command:
- '/bin/sh'
- '-c'
- 'sleep 10 && /usr/bin/start-kafka.sh'

- image: wurstmeister/kafka:2.12-2.4.1
name: kafka2
Expand All @@ -138,6 +166,10 @@ jobs:
KAFKA_ADVERTISED_HOST_NAME: kafka2
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
command:
- '/bin/sh'
- '-c'
- 'sleep 10 && /usr/bin/start-kafka.sh'

- image: wurstmeister/kafka:2.12-2.4.1
name: kafka3
Expand All @@ -149,6 +181,10 @@ jobs:
KAFKA_ADVERTISED_HOST_NAME: kafka3
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
command:
- '/bin/sh'
- '-c'
- 'sleep 10 && /usr/bin/start-kafka.sh'

- image: wurstmeister/kafka:2.12-2.4.1
name: kafka4
Expand All @@ -160,6 +196,10 @@ jobs:
KAFKA_ADVERTISED_HOST_NAME: kafka4
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
command:
- '/bin/sh'
- '-c'
- 'sleep 10 && /usr/bin/start-kafka.sh'

- image: wurstmeister/kafka:2.12-2.4.1
name: kafka5
Expand All @@ -171,6 +211,10 @@ jobs:
KAFKA_ADVERTISED_HOST_NAME: kafka5
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
command:
- '/bin/sh'
- '-c'
- 'sleep 10 && /usr/bin/start-kafka.sh'

- image: wurstmeister/kafka:2.12-2.4.1
name: kafka6
Expand All @@ -182,6 +226,10 @@ jobs:
KAFKA_ADVERTISED_HOST_NAME: kafka6
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
command:
- '/bin/sh'
- '-c'
- 'sleep 10 && /usr/bin/start-kafka.sh'

steps:
- checkout
Expand Down
1 change: 1 addition & 0 deletions cmd/topicctl/subcmd/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func applyTopic(
applyConfig.dryRun,
applyConfig.shared.saslUsername,
applyConfig.shared.saslPassword,
applyConfig.shared.saslAssumeRole,
)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions cmd/topicctl/subcmd/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func bootstrapRun(cmd *cobra.Command, args []string) error {
true,
bootstrapConfig.shared.saslUsername,
bootstrapConfig.shared.saslPassword,
bootstrapConfig.shared.saslAssumeRole,
)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions cmd/topicctl/subcmd/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func checkTopicFile(
true,
checkConfig.shared.saslUsername,
checkConfig.shared.saslPassword,
checkConfig.shared.saslAssumeRole,
)
if err != nil {
return false, err
Expand Down
2 changes: 1 addition & 1 deletion cmd/topicctl/subcmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func getPreRun(cmd *cobra.Command, args []string) error {

func getRun(cmd *cobra.Command, args []string) error {
ctx := context.Background()
sess := session.Must(session.NewSession())
sess, _ := session.NewSession()

adminClient, err := getConfig.shared.getAdminClient(ctx, sess, true)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/topicctl/subcmd/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func replPreRun(cmd *cobra.Command, args []string) error {

func replRun(cmd *cobra.Command, args []string) error {
ctx := context.Background()
sess := session.Must(session.NewSession())
sess, _ := session.NewSession()

adminClient, err := replConfig.shared.getAdminClient(ctx, sess, true)
if err != nil {
Expand Down
55 changes: 37 additions & 18 deletions cmd/topicctl/subcmd/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,21 @@ import (
)

type sharedOptions struct {
brokerAddr string
clusterConfig string
expandEnv bool
saslMechanism string
saslPassword string
saslUsername string
tlsCACert string
tlsCert string
tlsEnabled bool
tlsKey string
tlsSkipVerify bool
tlsServerName string
zkAddr string
zkPrefix string
brokerAddr string
clusterConfig string
expandEnv bool
saslMechanism string
saslPassword string
saslUsername string
saslAssumeRole string
tlsCACert string
tlsCert string
tlsEnabled bool
tlsKey string
tlsSkipVerify bool
tlsServerName string
zkAddr string
zkPrefix string
}

func (s sharedOptions) validate() error {
Expand Down Expand Up @@ -95,6 +96,10 @@ func (s sharedOptions) validate() error {
(s.saslUsername != "" || s.saslPassword != "") {
log.Warn("Username and password are ignored if using SASL AWS-MSK-IAM")
}

if saslMechanism != admin.SASLMechanismAWSMSKIAM && s.saslAssumeRole != "" {
log.Warn("AssumeRole is ignored unless using SASL AWS-MSK-IAM")
}
}

return err
Expand All @@ -116,6 +121,7 @@ func (s sharedOptions) getAdminClient(
readOnly,
s.saslUsername,
s.saslPassword,
s.saslAssumeRole,
)
} else if s.brokerAddr != "" {
tlsEnabled := (s.tlsEnabled ||
Expand Down Expand Up @@ -150,10 +156,11 @@ func (s sharedOptions) getAdminClient(
SkipVerify: s.tlsSkipVerify,
},
SASL: admin.SASLConfig{
Enabled: saslEnabled,
Mechanism: saslMechanism,
Password: s.saslPassword,
Username: s.saslUsername,
Enabled: saslEnabled,
Mechanism: saslMechanism,
Password: s.saslPassword,
Username: s.saslUsername,
AssumeRole: s.saslAssumeRole,
},
},
ReadOnly: readOnly,
Expand Down Expand Up @@ -211,6 +218,12 @@ func addSharedFlags(cmd *cobra.Command, options *sharedOptions) {
os.Getenv("TOPICCTL_SASL_USERNAME"),
"SASL username if using SASL; will override value set in cluster config",
)
cmd.Flags().StringVar(
&options.saslAssumeRole,
"sasl-assume-role",
"",
"Intermediate role to assume if using SASL AWS-MSK-IAM",
)
cmd.Flags().StringVar(
&options.tlsCACert,
"tls-ca-cert",
Expand Down Expand Up @@ -288,4 +301,10 @@ func addSharedConfigOnlyFlags(cmd *cobra.Command, options *sharedOptions) {
os.Getenv("TOPICCTL_SASL_USERNAME"),
"SASL username if using SASL; will override value set in cluster config",
)
cmd.Flags().StringVar(
&options.saslAssumeRole,
"sasl-assume-role",
os.Getenv("TOPICCTL_SASL_ASSUME_ROLE"),
"SASL assume role if using SASL AWS-MSK-IAM; will override value set in cluster config",
)
}
20 changes: 15 additions & 5 deletions pkg/admin/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
sigv4 "github.com/aws/aws-sdk-go/aws/signer/v4"
"github.com/segmentio/kafka-go"
Expand Down Expand Up @@ -48,10 +50,11 @@ type TLSConfig struct {

// SASLConfig stores the SASL-related configuration for a connection.
type SASLConfig struct {
Enabled bool
Mechanism SASLMechanism
Username string
Password string
Enabled bool
Mechanism SASLMechanism
Username string
Password string
AssumeRole string
}

// Connector is a wrapper around the low-level, kafka-go dialer and client.
Expand All @@ -75,7 +78,14 @@ func NewConnector(config ConnectorConfig) (*Connector, error) {
switch config.SASL.Mechanism {
case SASLMechanismAWSMSKIAM:
sess := session.Must(session.NewSession())
signer := sigv4.NewSigner(sess.Config.Credentials)
var creds *credentials.Credentials
if config.SASL.AssumeRole != "" {
creds = stscreds.NewCredentials(sess, config.SASL.AssumeRole)
} else {
creds = sess.Config.Credentials
}

signer := sigv4.NewSigner(creds)
region := aws.StringValue(sess.Config.Region)

mechanismClient = &aws_msk_iam.Mechanism{
Expand Down
4 changes: 2 additions & 2 deletions pkg/apply/apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ func TestApplyOverrides(t *testing.T) {
},
}

adminClient, err := clusterConfig.NewAdminClient(ctx, nil, false, "", "")
adminClient, err := clusterConfig.NewAdminClient(ctx, nil, false, "", "", "")
require.NoError(t, err)

applier, err := NewTopicApplier(
Expand Down Expand Up @@ -922,7 +922,7 @@ func testApplier(
},
}

adminClient, err := clusterConfig.NewAdminClient(ctx, nil, false, "", "")
adminClient, err := clusterConfig.NewAdminClient(ctx, nil, false, "", "", "")
require.NoError(t, err)

applier, err := NewTopicApplier(
Expand Down
2 changes: 1 addition & 1 deletion pkg/check/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestCheck(t *testing.T) {
},
}

adminClient, err := clusterConfig.NewAdminClient(ctx, nil, false, "", "")
adminClient, err := clusterConfig.NewAdminClient(ctx, nil, false, "", "", "")
require.NoError(t, err)

topicName := util.RandomString("check-topic-", 6)
Expand Down
Loading