Skip to content

Commit

Permalink
fix: add retry operation to avoid the topic not found (#50)
Browse files Browse the repository at this point in the history
Fix #48 

### Motivation

When we created many topics at one time, then update these topic policies, the Pulsar always returns the topic not found, so add a retry operation.
  • Loading branch information
nodece authored Apr 15, 2022
1 parent 09ecc90 commit 0ecf4db
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 20 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.16

require (
github.com/aws/aws-sdk-go v1.25.48 // indirect
github.com/cenkalti/backoff/v4 v4.1.2
github.com/hashicorp/go-multierror v1.0.0
github.com/hashicorp/hcl/v2 v2.1.0 // indirect
github.com/hashicorp/terraform-plugin-sdk v1.4.0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d h1:xDfNPAt8lFiC1U
github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d/go.mod h1:6QX/PXZ00z/TKoufEY6K/a0k6AhaJrQKdFe6OfVXsa4=
github.com/bgentry/speakeasy v0.1.0 h1:ByYyxL9InA1OWqxJqqp2A5pYHUrCiAL6K3J+LKSsQkY=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo=
github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cheggaaa/pb v1.0.27/go.mod h1:pQciLPpbU0oxA0h+VJYYLxO+XeDQb5pZijXscXHm81s=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
Expand Down
42 changes: 22 additions & 20 deletions pulsar/resource_pulsar_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package pulsar
import (
"fmt"

"github.com/cenkalti/backoff/v4"
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"github.com/pkg/errors"
"github.com/streamnative/pulsarctl/pkg/pulsar"
Expand Down Expand Up @@ -149,17 +150,11 @@ func resourcePulsarTopicCreate(d *schema.ResourceData, meta interface{}) error {
return fmt.Errorf("ERROR_CREATE_TOPIC_PERMISSION_GRANT: %w", err)
}

if topicName.IsPersistent() {
err = updateRetentionPolicies(d, meta, topicName)
if err != nil {
return fmt.Errorf("ERROR_CREATE_TOPIC_RETENTION_POLICIES: %w", err)
}
} else {
retentionPoliciesConfig := d.Get("retention_policies").(*schema.Set)
if retentionPoliciesConfig.Len() != 0 {
return fmt.Errorf("ERROR_CREATE_TOPIC_RETENTION_POLICIES: " +
"unsupported set retention policies for non-persistent topic")
}
err = retry(func() error {
return updateRetentionPolicies(d, meta, topicName)
})
if err != nil {
return fmt.Errorf("ERROR_CREATE_TOPIC_RETENTION_POLICIES: %w", err)
}

return resourcePulsarTopicRead(d, meta)
Expand Down Expand Up @@ -197,7 +192,6 @@ func resourcePulsarTopicRead(d *schema.ResourceData, meta interface{}) error {

if retPoliciesCfg, ok := d.GetOk("retention_policies"); ok && retPoliciesCfg.(*schema.Set).Len() > 0 {
if topicName.IsPersistent() {

ret, err := client.GetRetention(*topicName, true)
if err != nil {
return fmt.Errorf("ERROR_READ_TOPIC: GetRetention: %w", err)
Expand All @@ -209,6 +203,8 @@ func resourcePulsarTopicRead(d *schema.ResourceData, meta interface{}) error {
"retention_size_mb": int(ret.RetentionSizeInMB),
},
})
} else {
return errors.New("ERROR_READ_TOPIC: unsupported get retention policies for non-persistent topic")
}
}

Expand Down Expand Up @@ -374,20 +370,22 @@ func updateRetentionPolicies(d *schema.ResourceData, meta interface{}, topicName
client := meta.(pulsar.Client).Topics()

retentionPoliciesConfig := d.Get("retention_policies").(*schema.Set)
if retentionPoliciesConfig.Len() == 0 {
return nil
}

if !topicName.IsPersistent() {
return errors.New("ERROR_UPDATE_RETENTION_POLICIES: SetRetention: " +
"unsupported set retention policies for non-persistent topic")
}

if retentionPoliciesConfig.Len() > 0 {
var policies utils.RetentionPolicies
data := retentionPoliciesConfig.List()[0].(map[string]interface{})
policies.RetentionTimeInMinutes = data["retention_time_minutes"].(int)
policies.RetentionSizeInMB = int64(data["retention_size_mb"].(int))
var policies utils.RetentionPolicies
data := retentionPoliciesConfig.List()[0].(map[string]interface{})
policies.RetentionTimeInMinutes = data["retention_time_minutes"].(int)
policies.RetentionSizeInMB = int64(data["retention_size_mb"].(int))

if err := client.SetRetention(*topicName, policies); err != nil {
return fmt.Errorf("ERROR_UPDATE_RETENTION_POLICIES: SetRetention: %w", err)
}
if err := client.SetRetention(*topicName, policies); err != nil {
return fmt.Errorf("ERROR_UPDATE_RETENTION_POLICIES: SetRetention: %w", err)
}

return nil
Expand All @@ -412,3 +410,7 @@ func updatePartitions(d *schema.ResourceData, meta interface{}, topicName *utils

return nil
}

func retry(operation func() error) error {
return backoff.Retry(operation, backoff.NewExponentialBackOff())
}

0 comments on commit 0ecf4db

Please sign in to comment.