diff --git a/go.mod b/go.mod index 234e2914e..029f57d09 100644 --- a/go.mod +++ b/go.mod @@ -81,6 +81,7 @@ require ( go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 golang.org/x/mod v0.29.0 + golang.org/x/sync v0.17.0 google.golang.org/protobuf v1.36.9 ) @@ -336,7 +337,6 @@ require ( golang.org/x/arch v0.20.0 // indirect golang.org/x/exp v0.0.0-20250808145144-a408d31f581a // indirect golang.org/x/oauth2 v0.30.0 // indirect - golang.org/x/sync v0.17.0 // indirect golang.org/x/time v0.12.0 // indirect golang.org/x/tools v0.37.0 // indirect gonum.org/v1/gonum v0.16.0 // indirect diff --git a/internal/config/config.go b/internal/config/config.go index 3bb6a57cd..bb7c8cf9d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -621,6 +621,12 @@ func registerClientFlags(fs *flag.FlagSet) { DefMaxFileSize, "Max file size in bytes.", ) + + fs.Int( + ClientGRPCMaxParallelFileOperationsKey, + DefMaxParallelFileOperations, + "Max number of files downloaded or uploaded in parallel", + ) } func registerCommandFlags(fs *flag.FlagSet) { @@ -1087,11 +1093,12 @@ func resolveClient() *Client { Time: viperInstance.GetDuration(ClientKeepAliveTimeKey), PermitWithoutStream: viperInstance.GetBool(ClientKeepAlivePermitWithoutStreamKey), }, - MaxMessageSize: viperInstance.GetInt(ClientGRPCMaxMessageSizeKey), - MaxMessageReceiveSize: viperInstance.GetInt(ClientGRPCMaxMessageReceiveSizeKey), - MaxMessageSendSize: viperInstance.GetInt(ClientGRPCMaxMessageSendSizeKey), - MaxFileSize: viperInstance.GetUint32(ClientGRPCMaxFileSizeKey), - FileChunkSize: viperInstance.GetUint32(ClientGRPCFileChunkSizeKey), + MaxMessageSize: viperInstance.GetInt(ClientGRPCMaxMessageSizeKey), + MaxMessageReceiveSize: viperInstance.GetInt(ClientGRPCMaxMessageReceiveSizeKey), + MaxMessageSendSize: viperInstance.GetInt(ClientGRPCMaxMessageSendSizeKey), + MaxFileSize: viperInstance.GetUint32(ClientGRPCMaxFileSizeKey), + FileChunkSize: viperInstance.GetUint32(ClientGRPCFileChunkSizeKey), + MaxParallelFileOperations: viperInstance.GetInt(ClientGRPCMaxParallelFileOperationsKey), }, Backoff: &BackOff{ InitialInterval: viperInstance.GetDuration(ClientBackoffInitialIntervalKey), diff --git a/internal/config/config_test.go b/internal/config/config_test.go index e02c87924..d1ce4d019 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -1175,11 +1175,12 @@ func createConfig() *Config { Time: 10 * time.Second, PermitWithoutStream: false, }, - MaxMessageSize: 1048575, - MaxMessageReceiveSize: 1048575, - MaxMessageSendSize: 1048575, - MaxFileSize: 485753, - FileChunkSize: 48575, + MaxMessageSize: 1048575, + MaxMessageReceiveSize: 1048575, + MaxMessageSendSize: 1048575, + MaxFileSize: 485753, + FileChunkSize: 48575, + MaxParallelFileOperations: 10, }, Backoff: &BackOff{ InitialInterval: 200 * time.Millisecond, diff --git a/internal/config/defaults.go b/internal/config/defaults.go index 615c7bc8b..aa5ab8dc4 100644 --- a/internal/config/defaults.go +++ b/internal/config/defaults.go @@ -58,11 +58,12 @@ const ( DefAuxiliaryCommandTLServerNameKey = "" // Client GRPC Settings - DefMaxMessageSize = 0 // 0 = unset - DefMaxMessageRecieveSize = 4194304 // default 4 MB - DefMaxMessageSendSize = 4194304 // default 4 MB - DefMaxFileSize uint32 = 1048576 // 1MB - DefFileChunkSize uint32 = 524288 // 0.5MB + DefMaxMessageSize = 0 // 0 = unset + DefMaxMessageRecieveSize = 4194304 // default 4 MB + DefMaxMessageSendSize = 4194304 // default 4 MB + DefMaxFileSize uint32 = 1048576 // 1MB + DefFileChunkSize uint32 = 524288 // 0.5MB + DefMaxParallelFileOperations = 5 // Client HTTP Settings DefHTTPTimeout = 10 * time.Second diff --git a/internal/config/flags.go b/internal/config/flags.go index 697c2906e..a2af4791c 100644 --- a/internal/config/flags.go +++ b/internal/config/flags.go @@ -34,12 +34,13 @@ var ( ClientKeepAliveTimeKey = pre(GrpcKeepAlive) + "time" ClientKeepAliveTimeoutKey = pre(GrpcKeepAlive) + "timeout" - ClientHTTPTimeoutKey = pre(ClientRootKey) + "http_timeout" - ClientGRPCMaxMessageSendSizeKey = pre(ClientRootKey) + "grpc_max_message_send_size" - ClientGRPCMaxMessageReceiveSizeKey = pre(ClientRootKey) + "grpc_max_message_receive_size" - ClientGRPCMaxMessageSizeKey = pre(ClientRootKey) + "grpc_max_message_size" - ClientGRPCMaxFileSizeKey = pre(ClientRootKey) + "grpc_max_file_size" - ClientGRPCFileChunkSizeKey = pre(ClientRootKey) + "grpc_file_chunk_size" + ClientHTTPTimeoutKey = pre(ClientRootKey) + "http_timeout" + ClientGRPCMaxMessageSendSizeKey = pre(ClientRootKey) + "grpc_max_message_send_size" + ClientGRPCMaxMessageReceiveSizeKey = pre(ClientRootKey) + "grpc_max_message_receive_size" + ClientGRPCMaxMessageSizeKey = pre(ClientRootKey) + "grpc_max_message_size" + ClientGRPCMaxFileSizeKey = pre(ClientRootKey) + "grpc_max_file_size" + ClientGRPCFileChunkSizeKey = pre(ClientRootKey) + "grpc_file_chunk_size" + ClientGRPCMaxParallelFileOperationsKey = pre(ClientRootKey) + "grpc_max_parallel_file_operations" ClientBackoffInitialIntervalKey = pre(ClientRootKey) + "backoff_initial_interval" ClientBackoffMaxIntervalKey = pre(ClientRootKey) + "backoff_max_interval" diff --git a/internal/config/testdata/nginx-agent.conf b/internal/config/testdata/nginx-agent.conf index 78f30a28d..21be3e558 100644 --- a/internal/config/testdata/nginx-agent.conf +++ b/internal/config/testdata/nginx-agent.conf @@ -50,6 +50,7 @@ client: max_message_send_size: 1048575 max_file_size: 485753 file_chunk_size: 48575 + max_parallel_file_operations: 10 backoff: initial_interval: 200ms max_interval: 10s diff --git a/internal/config/types.go b/internal/config/types.go index fe3bc1773..e9168f33d 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -87,15 +87,17 @@ type ( Multiplier float64 `yaml:"multiplier" mapstructure:"multiplier"` } + //nolint:lll // max line limit exceeded GRPC struct { KeepAlive *KeepAlive `yaml:"keepalive" mapstructure:"keepalive"` // if MaxMessageSize is size set then we use that value, // otherwise MaxMessageRecieveSize and MaxMessageSendSize for individual settings - MaxMessageSize int `yaml:"max_message_size" mapstructure:"max_message_size"` - MaxMessageReceiveSize int `yaml:"max_message_receive_size" mapstructure:"max_message_receive_size"` - MaxMessageSendSize int `yaml:"max_message_send_size" mapstructure:"max_message_send_size"` - MaxFileSize uint32 `yaml:"max_file_size" mapstructure:"max_file_size"` - FileChunkSize uint32 `yaml:"file_chunk_size" mapstructure:"file_chunk_size"` + MaxMessageSize int `yaml:"max_message_size" mapstructure:"max_message_size"` + MaxMessageReceiveSize int `yaml:"max_message_receive_size" mapstructure:"max_message_receive_size"` + MaxMessageSendSize int `yaml:"max_message_send_size" mapstructure:"max_message_send_size"` + MaxFileSize uint32 `yaml:"max_file_size" mapstructure:"max_file_size"` + FileChunkSize uint32 `yaml:"file_chunk_size" mapstructure:"file_chunk_size"` + MaxParallelFileOperations int `yaml:"max_parallel_file_operations" mapstructure:"max_parallel_file_operations"` } KeepAlive struct { diff --git a/internal/file/file_manager_service.go b/internal/file/file_manager_service.go index 6c7ed4afb..af0e67a91 100644 --- a/internal/file/file_manager_service.go +++ b/internal/file/file_manager_service.go @@ -17,6 +17,7 @@ import ( "strconv" "sync" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "github.com/nginx/agent/v3/internal/model" @@ -288,28 +289,36 @@ func (fms *FileManagerService) ConfigUpdate(ctx context.Context, } func (fms *FileManagerService) ConfigUpload(ctx context.Context, configUploadRequest *mpi.ConfigUploadRequest) error { - var updatingFilesError error + uploadFiles := configUploadRequest.GetOverview().GetFiles() + if len(uploadFiles) == 0 { + return nil + } - for _, file := range configUploadRequest.GetOverview().GetFiles() { - err := fms.fileServiceOperator.UpdateFile( - ctx, - configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(), - file, - ) - if err != nil { - slog.ErrorContext( - ctx, - "Failed to update file", - "instance_id", configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(), - "file_name", file.GetFileMeta().GetName(), - "error", err, + errGroup, errGroupCtx := errgroup.WithContext(ctx) + errGroup.SetLimit(fms.agentConfig.Client.Grpc.MaxParallelFileOperations) + + for _, file := range uploadFiles { + errGroup.Go(func() error { + err := fms.fileServiceOperator.UpdateFile( + errGroupCtx, + configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(), + file, ) + if err != nil { + slog.ErrorContext( + errGroupCtx, + "Failed to update file", + "instance_id", configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(), + "file_name", file.GetFileMeta().GetName(), + "error", err, + ) + } - updatingFilesError = errors.Join(updatingFilesError, err) - } + return err + }) } - return updatingFilesError + return errGroup.Wait() } // DetermineFileActions compares two sets of files to determine the file action for each file. Returns a map of files @@ -570,25 +579,36 @@ func (fms *FileManagerService) executeFileActions(ctx context.Context) (actionEr } func (fms *FileManagerService) downloadUpdatedFilesToTempLocation(ctx context.Context) (updateError error) { + var downloadFiles []*model.FileCache for _, fileAction := range fms.fileActions { if fileAction.Action == model.Add || fileAction.Action == model.Update { + downloadFiles = append(downloadFiles, fileAction) + } + } + + if len(downloadFiles) == 0 { + slog.DebugContext(ctx, "No updated files to download") + return nil + } + + errGroup, errGroupCtx := errgroup.WithContext(ctx) + errGroup.SetLimit(fms.agentConfig.Client.Grpc.MaxParallelFileOperations) + + for _, fileAction := range downloadFiles { + errGroup.Go(func() error { tempFilePath := tempFilePath(fileAction.File.GetFileMeta().GetName()) slog.DebugContext( - ctx, + errGroupCtx, "Downloading file to temp location", "file", tempFilePath, ) - updateErr := fms.fileUpdate(ctx, fileAction.File, tempFilePath) - if updateErr != nil { - updateError = updateErr - break - } - } + return fms.fileUpdate(errGroupCtx, fileAction.File, tempFilePath) + }) } - return updateError + return errGroup.Wait() } func (fms *FileManagerService) moveOrDeleteFiles(ctx context.Context, actionError error) error { diff --git a/test/types/config.go b/test/types/config.go index f544b8926..378ddaf85 100644 --- a/test/types/config.go +++ b/test/types/config.go @@ -29,7 +29,8 @@ const ( commonRandomizationFactor = 0.1 commonMultiplier = 0.2 - reloadMonitoringPeriod = 400 * time.Millisecond + maxParallelFileOperations = 5 + reloadMonitoringPeriod = 400 * time.Millisecond ) // Produces a populated Agent Config for testing usage. @@ -49,10 +50,11 @@ func AgentConfig() *config.Config { Time: clientTime, PermitWithoutStream: clientPermitWithoutStream, }, - MaxMessageReceiveSize: 1, - MaxMessageSendSize: 1, - MaxFileSize: 1, - FileChunkSize: 1, + MaxMessageReceiveSize: 1, + MaxMessageSendSize: 1, + MaxFileSize: 1, + FileChunkSize: 1, + MaxParallelFileOperations: maxParallelFileOperations, }, Backoff: &config.BackOff{ InitialInterval: commonInitialInterval,