Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleanup & optimise api #3

Merged
merged 1 commit into from
Sep 18, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 52 additions & 113 deletions internal/argocd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"math"
"strings"
"sync"
"time"

argocdclient "github.com/argoproj/argo-cd/v2/pkg/apiclient"
Expand All @@ -18,20 +19,12 @@ import (

const (
maxRetries = 5
baseDelay = 2 // Base delay in seconds for exponential backoff
maxConcurrentOps = 10 // Maximum number of concurrent sync operations
initialBackoff = 100 // Initial backoff interval in milliseconds
postSyncDelay = 15 * time.Second // Delay for 15 seconds after sync
appQueryMaxRetries = 5
appQueryRetryDelay = 5 * time.Second
)

// Interface is an interface for API.
type Interface interface {
Sync(appName string) error
SyncWithLabels(labels string) ([]*v1alpha1.Application, error)
}

// API is struct for ArgoCD api.
type API struct {
client applicationpkg.ApplicationServiceClient
Expand Down Expand Up @@ -88,9 +81,9 @@ func (a API) GetApplicationWithRetry(appName string, refreshType string) (*v1alp

// Sync syncs given application.
func (a API) Sync(appName string) error {
maxRetries := 5
refreshPollInterval := 5 * time.Second // Interval to check sync status after refresh
maxRefreshPolls := 10 // Maximum number of times to poll sync status after refresh, 12 polls at 5-second intervals = 60 seconds
maxRetries := 3
refreshPollInterval := 3 * time.Second // Interval to check sync status after refresh
maxRefreshPolls := 7 // Maximum number of times to poll sync status after refresh, 7 polls at 3-second intervals = 21 seconds

for i := 0; i < maxRetries; i++ {
// Step 1: Refresh the application to detect latest changes
Expand Down Expand Up @@ -141,20 +134,6 @@ func (a API) Sync(appName string) error {
return fmt.Errorf("Failed to sync app %s after %d attempts", appName, maxRetries)
}

// Introduce a retry mechanism with exponential backoff
func (a API) SyncWithRetry(appName string) error {
var err error
for i := 0; i < maxRetries; i++ {
err = a.Sync(appName)
if err == nil {
return nil
}
// Exponential backoff: 2^i * 100ms. For i=0,1,2,... this results in 100ms, 200ms, 400ms, ...
time.Sleep(time.Duration(math.Pow(2, float64(i))) * 100 * time.Millisecond)
}
return err
}

// SyncWithLabels syncs applications based on provided labels.
func (a API) SyncWithLabels(labels string) ([]*v1alpha1.Application, error) {
// 1. Fetch applications based on labels
Expand All @@ -170,42 +149,60 @@ func (a API) SyncWithLabels(labels string) ([]*v1alpha1.Application, error) {
var syncedApps []*v1alpha1.Application
var syncErrors []string

// 2. Sync each application
// Use a channel to collect sync errors
errorsCh := make(chan string, len(listResponse.Items))
// Use a wait group to wait for all goroutines to finish
var wg sync.WaitGroup

// 2. Sync each application in parallel
for _, app := range listResponse.Items {
retries := 0
for retries < maxRetries {
err := a.Sync(app.Name)
if err != nil {
if isTransientError(err) {
// Handle transient errors with exponential backoff
time.Sleep(time.Duration(math.Pow(2, float64(retries))) * time.Millisecond * initialBackoff)
retries++
continue
wg.Add(1)
go func(app v1alpha1.Application) {
defer wg.Done()
retries := 0
for retries < maxRetries {
err := a.Sync(app.Name)
if err != nil {
if isTransientError(err) {
// Handle transient errors with exponential backoff
time.Sleep(time.Duration(math.Pow(2, float64(retries))) * time.Millisecond * initialBackoff)
retries++
continue
}
errorsCh <- fmt.Sprintf("Error syncing %s: %v", app.Name, err)
return
}
syncErrors = append(syncErrors, fmt.Sprintf("Error syncing %s: %v", app.Name, err))
break
}

// Introduce a post-sync delay
time.Sleep(postSyncDelay)
// Introduce a post-sync delay
time.Sleep(postSyncDelay)

// Check for differences after sync
hasDiff, err := a.HasDifferences(app.Name)
if err != nil {
syncErrors = append(syncErrors, fmt.Sprintf("Error checking differences for %s: %v", app.Name, err))
break
}
// Check for differences after sync
hasDiff, err := a.HasDifferences(app.Name)
if err != nil {
errorsCh <- fmt.Sprintf("Error checking differences for %s: %v", app.Name, err)
return
}

if !hasDiff {
log.Infof("Synced app %s based on labels", app.Name)
syncedApps = append(syncedApps, &app)
break
} else {
log.Warnf("Differences still found after sync for app %s. Retrying...", app.Name)
retries++
time.Sleep(10 * time.Second) // Wait before retrying
if !hasDiff {
log.Infof("Synced app %s based on labels", app.Name)
syncedApps = append(syncedApps, &app)
return
} else {
log.Warnf("Differences still found after sync for app %s. Retrying...", app.Name)
retries++
time.Sleep(5 * time.Second) // Reduced from 10 seconds
}
}
}
}(app)
}

// Wait for all goroutines to finish
wg.Wait()
close(errorsCh)

// Collect errors from the channel
for err := range errorsCh {
syncErrors = append(syncErrors, err)
}

// Close the gRPC connection after all sync operations are complete
Expand All @@ -230,64 +227,6 @@ func isTransientError(err error) bool {
return strings.Contains(errMsg, "ENHANCE_YOUR_CALM") || strings.Contains(errMsg, "too_many_pings")
}

func matchesLabels(app *v1alpha1.Application, labelsStr string) bool {
pairs := strings.Split(labelsStr, ",")
appLabels := app.ObjectMeta.Labels

for _, pair := range pairs {
// Handle negative matches
if strings.Contains(pair, "!=") {
keyValue := strings.Split(pair, "!=")
if len(keyValue) != 2 {
// Malformed label string
continue
}
key, value := keyValue[0], keyValue[1]
if appLabels[key] == value {
return false
}
} else if strings.Contains(pair, "=") {
keyValue := strings.Split(pair, "=")
if len(keyValue) != 2 {
// Malformed label string
continue
}
key, value := keyValue[0], keyValue[1]
if appLabels[key] != value {
return false
}
} else if strings.Contains(pair, "notin") {
parts := strings.Split(pair, "notin")
if len(parts) != 2 {
continue // or handle error
}

key := strings.TrimSpace(parts[0])
valueStr := strings.TrimSpace(parts[1])

// Trim brackets and split by comma
values := strings.Split(strings.Trim(valueStr, "()"), ",")

for _, v := range values {
if appLabels[key] == strings.TrimSpace(v) {
return false
}
}
} else if strings.HasPrefix(pair, "!") {
key := strings.TrimPrefix(pair, "!")
if _, exists := appLabels[key]; exists {
return false
}
} else {
// Existence checks
if _, exists := appLabels[pair]; !exists {
return false
}
}
}
return true
}

// HasDifferences checks if the given application has differences between the desired and live state.
func (a API) HasDifferences(appName string) (bool, error) {
defaultRefreshType := "normal" // or "hard" based on your preference
Expand Down
Loading