Skip to content

Commit 53751a5

Browse files
authored
[CRE-1041] More robust check if error is retryable for Red Panda's schema registration (#2185)
1 parent c7d1215 commit 53751a5

File tree

2 files changed

+52
-23
lines changed

2 files changed

+52
-23
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
- Even more robust check if error is retryable for `Red Panda`'s schema registration

framework/components/dockercompose/chip_ingress_set/protos.go

Lines changed: 51 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -557,6 +557,10 @@ func checkSchemaExists(registryURL, subject string) (int, bool) {
557557
maxAttempts := uint(10)
558558
var resp *http.Response
559559
existErr := retry.Do(func() error {
560+
if resp != nil && resp.Body != nil {
561+
resp.Body.Close()
562+
}
563+
560564
var err error
561565
resp, err = http.Get(url)
562566
if err != nil {
@@ -630,6 +634,11 @@ func registerSingleProto(
630634

631635
var resp *http.Response
632636
registerErr := retry.Do(func() error {
637+
// Close previous response before next attempt
638+
if resp != nil && resp.Body != nil {
639+
resp.Body.Close()
640+
}
641+
633642
var respErr error
634643
resp, respErr = client.Do(&http.Request{
635644
Method: "POST",
@@ -644,11 +653,17 @@ func registerSingleProto(
644653
}
645654

646655
if resp.StatusCode >= 300 {
647-
data, dataErr := io.ReadAll(resp.Body)
648-
if dataErr != nil {
649-
return errors.Wrap(dataErr, "failed to read response body")
656+
body, bodyErr := io.ReadAll(resp.Body)
657+
resp.Body.Close()
658+
if bodyErr != nil {
659+
return errors.Wrap(bodyErr, "failed to read response body")
660+
}
661+
662+
if resp.StatusCode >= 400 && resp.StatusCode < 500 {
663+
return retry.Unrecoverable(fmt.Errorf("schema registry error (%d): %s", resp.StatusCode, string(body)))
650664
}
651-
return fmt.Errorf("schema registry error (%d): %s", resp.StatusCode, data)
665+
666+
return fmt.Errorf("schema registry error (%d): %s", resp.StatusCode, string(body))
652667
}
653668

654669
return nil
@@ -657,7 +672,13 @@ func registerSingleProto(
657672
}), retry.RetryIf(func(err error) bool {
658673
// we don't want to retry all errors, because some of them are expected (e.g. missing dependencies)
659674
// and will be handled by higher-level code
660-
return isRetryableError(err)
675+
shouldRetry := isRetryableError(err)
676+
677+
if !shouldRetry {
678+
framework.L.Warn().Msgf("Determined to not retry error: %T (error: %s)", err, err.Error())
679+
}
680+
681+
return shouldRetry
661682
}))
662683
if registerErr != nil {
663684
return 0, errors.Wrapf(registerErr, "failed to register schema for subject %s", subject)
@@ -699,16 +720,6 @@ func stripFolderPrefix(path string, prefixes []string) string {
699720
return path
700721
}
701722

702-
// transformSchemaContent removes folder prefixes from import statements in protobuf source
703-
func transformSchemaContent(content string, prefixes []string) string {
704-
modified := content
705-
for _, prefix := range prefixes {
706-
// Transform import statements like "workflows/v1/" to "v1/"
707-
modified = strings.ReplaceAll(modified, `"`+prefix, `"`)
708-
}
709-
return modified
710-
}
711-
712723
// buildDependencyGraph builds a dependency graph from protobuf files
713724
func buildDependencyGraph(protoMap map[string]string) (map[string][]string, error) {
714725
dependencies := make(map[string][]string)
@@ -804,15 +815,32 @@ func isRetryableError(err error) bool {
804815
return false
805816
}
806817

807-
retryableErrorMessages := []string{
808-
"connection reset by peer",
809-
"EOF",
810-
}
811-
812-
for _, msg := range retryableErrorMessages {
813-
if strings.Contains(err.Error(), msg) {
818+
var urlErr *url.Error
819+
if errors.As(err, &urlErr) {
820+
// Retry on timeouts
821+
var ne net.Error
822+
if errors.As(urlErr, &ne) && ne.Timeout() {
814823
return true
815824
}
825+
// Fall through to check the cause too
826+
err = urlErr.Err
816827
}
817-
return false
828+
829+
// network-layer errors worth retrying (dial/read/write problems)
830+
var opErr *net.OpError
831+
if errors.As(err, &opErr) {
832+
return true
833+
}
834+
835+
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
836+
return true
837+
}
838+
839+
msg := err.Error()
840+
return strings.Contains(msg, "connection reset by peer") ||
841+
strings.Contains(msg, "broken pipe") ||
842+
strings.Contains(msg, "connection refused") ||
843+
strings.Contains(msg, "http2: stream error") ||
844+
strings.Contains(msg, "EOF") ||
845+
strings.Contains(strings.ToLower(msg), "timeout")
818846
}

0 commit comments

Comments
 (0)