Skip to content

Commit

Permalink
Add kafka acl option to user create (#1508)
Browse files Browse the repository at this point in the history
Co-authored-by: Rahul Bhardwaj <[email protected]>
  • Loading branch information
bhardwajRahul and Rahul Bhardwaj authored Feb 27, 2024
1 parent 80974e9 commit 4a1686e
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 1 deletion.
2 changes: 2 additions & 0 deletions args.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,8 @@ const (
ArgDatabaseUserMySQLAuthPlugin = "mysql-auth-plugin"
// ArgDatabasePrivateConnectionBool determine if the private connection details should be shown
ArgDatabasePrivateConnectionBool = "private"
// ArgDatabaseUserKafkaACLs will specify permissions on topics in kafka clsuter
ArgDatabaseUserKafkaACLs = "acl"

// ArgDatabaseTopicReplicationFactor is the replication factor of a kafka topic
ArgDatabaseTopicReplicationFactor = "replication-factor"
Expand Down
34 changes: 33 additions & 1 deletion commands/databases.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ func databaseUser() *Command {
Database user accounts are scoped to one database cluster, to which they have full admin access, and are given an automatically-generated password.`,
},
}

databaseKafkaACLsTxt := `A comma-separated list of kafka ACL rules, in ` + "`" + `topic:permission` + "`" + ` format.`
userDetailsDesc := `
- The username for the user
Expand Down Expand Up @@ -692,6 +692,7 @@ To retrieve a list of your databases and their IDs, call `+"`"+`doctl databases

AddStringFlag(cmdDatabaseUserCreate, doctl.ArgDatabaseUserMySQLAuthPlugin, "", "",
"Sets authorization plugin for a MySQL user. Possible values: `caching_sha2_password` or `mysql_native_password`")
AddStringSliceFlag(cmdDatabaseUserCreate, doctl.ArgDatabaseUserKafkaACLs, "", []string{}, databaseKafkaACLsTxt)
cmdDatabaseUserCreate.Example = `The following example creates a new user with the username ` + "`" + `example-user` + "`" + ` for a database cluster with the ID ` + "`" + `ca9f591d-f38h-5555-a0ef-1c02d1d1e35` + "`" + `: doctl databases user create ca9f591d-f38h-5555-a0ef-1c02d1d1e35 example-user`

cmdDatabaseUserResetAuth := CmdBuilder(cmd, RunDatabaseUserResetAuth, "reset <database-cluster-id> <user-name> <new-auth-mode>",
Expand Down Expand Up @@ -767,6 +768,17 @@ func RunDatabaseUserCreate(c *CmdConfig) error {
}
}

kafkaAcls, err := buildDatabaseCreateKafkaUserACls(c)
if err != nil {
return err
}

if len(kafkaAcls) != 0 {
req.Settings = &godo.DatabaseUserSettings{
ACL: kafkaAcls,
}
}

user, err := c.Databases().CreateUser(databaseID, req)
if err != nil {
return err
Expand All @@ -775,6 +787,26 @@ func RunDatabaseUserCreate(c *CmdConfig) error {
return displayDatabaseUsers(c, *user)
}

func buildDatabaseCreateKafkaUserACls(c *CmdConfig) (kafkaACls []*godo.KafkaACL, err error) {
acls, err := c.Doit.GetStringSlice(c.NS, doctl.ArgDatabaseUserKafkaACLs)
if err != nil {
return nil, err
}
for _, acl := range acls {
pair := strings.SplitN(acl, ":", 2)
if len(pair) != 2 {
return nil, fmt.Errorf("Unexpected input value [%v], must be a topic:permission pair", pair)
}

kafkaACl := new(godo.KafkaACL)
kafkaACl.Topic = pair[0]
kafkaACl.Permission = pair[1]

kafkaACls = append(kafkaACls, kafkaACl)
}
return kafkaACls, nil
}

func RunDatabaseUserResetAuth(c *CmdConfig) error {
if len(c.Args) < 2 {
return doctl.NewMissingArgsErr(c.NS)
Expand Down
23 changes: 23 additions & 0 deletions commands/databases_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,29 @@ func TestDatabaseUserCreate(t *testing.T) {
assert.NoError(t, err)
})

// Successful call with kafka acl set
withTestClient(t, func(config *CmdConfig, tm *tcMocks) {
r := &godo.DatabaseCreateUserRequest{
Name: testDBUser.Name,
Settings: &godo.DatabaseUserSettings{
ACL: []*godo.KafkaACL{
{
Permission: "admin",
Topic: "test",
},
},
},
}

tm.databases.EXPECT().CreateUser(testDBCluster.ID, r).Return(&testDBUser, nil)

config.Args = append(config.Args, testDBCluster.ID, testDBUser.Name)
config.Doit.Set(config.NS, doctl.ArgDatabaseUserKafkaACLs, "test:admin")

err := RunDatabaseUserCreate(config)
assert.NoError(t, err)
})

// Error
withTestClient(t, func(config *CmdConfig, tm *tcMocks) {
tm.databases.EXPECT().CreateUser(
Expand Down

0 comments on commit 4a1686e

Please sign in to comment.