Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ require (
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/mod v0.29.0
golang.org/x/sync v0.17.0
google.golang.org/protobuf v1.36.9
)

Expand Down Expand Up @@ -336,7 +337,6 @@ require (
golang.org/x/arch v0.20.0 // indirect
golang.org/x/exp v0.0.0-20250808145144-a408d31f581a // indirect
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/sync v0.17.0 // indirect
golang.org/x/time v0.12.0 // indirect
golang.org/x/tools v0.37.0 // indirect
gonum.org/v1/gonum v0.16.0 // indirect
Expand Down
17 changes: 12 additions & 5 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,12 @@ func registerClientFlags(fs *flag.FlagSet) {
DefMaxFileSize,
"Max file size in bytes.",
)

fs.Int(
ClientGRPCMaxParallelFileOperationsKey,
DefMaxParallelFileOperations,
"Max number of files downloaded or uploaded in parallel",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Max number of files downloaded or uploaded in parallel",
"Maximum number of file downloads or uploads performed in parallel",

)
}

func registerCommandFlags(fs *flag.FlagSet) {
Expand Down Expand Up @@ -1087,11 +1093,12 @@ func resolveClient() *Client {
Time: viperInstance.GetDuration(ClientKeepAliveTimeKey),
PermitWithoutStream: viperInstance.GetBool(ClientKeepAlivePermitWithoutStreamKey),
},
MaxMessageSize: viperInstance.GetInt(ClientGRPCMaxMessageSizeKey),
MaxMessageReceiveSize: viperInstance.GetInt(ClientGRPCMaxMessageReceiveSizeKey),
MaxMessageSendSize: viperInstance.GetInt(ClientGRPCMaxMessageSendSizeKey),
MaxFileSize: viperInstance.GetUint32(ClientGRPCMaxFileSizeKey),
FileChunkSize: viperInstance.GetUint32(ClientGRPCFileChunkSizeKey),
MaxMessageSize: viperInstance.GetInt(ClientGRPCMaxMessageSizeKey),
MaxMessageReceiveSize: viperInstance.GetInt(ClientGRPCMaxMessageReceiveSizeKey),
MaxMessageSendSize: viperInstance.GetInt(ClientGRPCMaxMessageSendSizeKey),
MaxFileSize: viperInstance.GetUint32(ClientGRPCMaxFileSizeKey),
FileChunkSize: viperInstance.GetUint32(ClientGRPCFileChunkSizeKey),
MaxParallelFileOperations: viperInstance.GetInt(ClientGRPCMaxParallelFileOperationsKey),
},
Backoff: &BackOff{
InitialInterval: viperInstance.GetDuration(ClientBackoffInitialIntervalKey),
Expand Down
11 changes: 6 additions & 5 deletions internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1175,11 +1175,12 @@ func createConfig() *Config {
Time: 10 * time.Second,
PermitWithoutStream: false,
},
MaxMessageSize: 1048575,
MaxMessageReceiveSize: 1048575,
MaxMessageSendSize: 1048575,
MaxFileSize: 485753,
FileChunkSize: 48575,
MaxMessageSize: 1048575,
MaxMessageReceiveSize: 1048575,
MaxMessageSendSize: 1048575,
MaxFileSize: 485753,
FileChunkSize: 48575,
MaxParallelFileOperations: 10,
},
Backoff: &BackOff{
InitialInterval: 200 * time.Millisecond,
Expand Down
11 changes: 6 additions & 5 deletions internal/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ const (
DefAuxiliaryCommandTLServerNameKey = ""

// Client GRPC Settings
DefMaxMessageSize = 0 // 0 = unset
DefMaxMessageRecieveSize = 4194304 // default 4 MB
DefMaxMessageSendSize = 4194304 // default 4 MB
DefMaxFileSize uint32 = 1048576 // 1MB
DefFileChunkSize uint32 = 524288 // 0.5MB
DefMaxMessageSize = 0 // 0 = unset
DefMaxMessageRecieveSize = 4194304 // default 4 MB
DefMaxMessageSendSize = 4194304 // default 4 MB
DefMaxFileSize uint32 = 1048576 // 1MB
DefFileChunkSize uint32 = 524288 // 0.5MB
DefMaxParallelFileOperations = 5

// Client HTTP Settings
DefHTTPTimeout = 10 * time.Second
Expand Down
13 changes: 7 additions & 6 deletions internal/config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ var (
ClientKeepAliveTimeKey = pre(GrpcKeepAlive) + "time"
ClientKeepAliveTimeoutKey = pre(GrpcKeepAlive) + "timeout"

ClientHTTPTimeoutKey = pre(ClientRootKey) + "http_timeout"
ClientGRPCMaxMessageSendSizeKey = pre(ClientRootKey) + "grpc_max_message_send_size"
ClientGRPCMaxMessageReceiveSizeKey = pre(ClientRootKey) + "grpc_max_message_receive_size"
ClientGRPCMaxMessageSizeKey = pre(ClientRootKey) + "grpc_max_message_size"
ClientGRPCMaxFileSizeKey = pre(ClientRootKey) + "grpc_max_file_size"
ClientGRPCFileChunkSizeKey = pre(ClientRootKey) + "grpc_file_chunk_size"
ClientHTTPTimeoutKey = pre(ClientRootKey) + "http_timeout"
ClientGRPCMaxMessageSendSizeKey = pre(ClientRootKey) + "grpc_max_message_send_size"
ClientGRPCMaxMessageReceiveSizeKey = pre(ClientRootKey) + "grpc_max_message_receive_size"
ClientGRPCMaxMessageSizeKey = pre(ClientRootKey) + "grpc_max_message_size"
ClientGRPCMaxFileSizeKey = pre(ClientRootKey) + "grpc_max_file_size"
ClientGRPCFileChunkSizeKey = pre(ClientRootKey) + "grpc_file_chunk_size"
ClientGRPCMaxParallelFileOperationsKey = pre(ClientRootKey) + "grpc_max_parallel_file_operations"

ClientBackoffInitialIntervalKey = pre(ClientRootKey) + "backoff_initial_interval"
ClientBackoffMaxIntervalKey = pre(ClientRootKey) + "backoff_max_interval"
Expand Down
1 change: 1 addition & 0 deletions internal/config/testdata/nginx-agent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ client:
max_message_send_size: 1048575
max_file_size: 485753
file_chunk_size: 48575
max_parallel_file_operations: 10
backoff:
initial_interval: 200ms
max_interval: 10s
Expand Down
12 changes: 7 additions & 5 deletions internal/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,17 @@ type (
Multiplier float64 `yaml:"multiplier" mapstructure:"multiplier"`
}

//nolint:lll // max line limit exceeded
GRPC struct {
KeepAlive *KeepAlive `yaml:"keepalive" mapstructure:"keepalive"`
// if MaxMessageSize is size set then we use that value,
// otherwise MaxMessageRecieveSize and MaxMessageSendSize for individual settings
MaxMessageSize int `yaml:"max_message_size" mapstructure:"max_message_size"`
MaxMessageReceiveSize int `yaml:"max_message_receive_size" mapstructure:"max_message_receive_size"`
MaxMessageSendSize int `yaml:"max_message_send_size" mapstructure:"max_message_send_size"`
MaxFileSize uint32 `yaml:"max_file_size" mapstructure:"max_file_size"`
FileChunkSize uint32 `yaml:"file_chunk_size" mapstructure:"file_chunk_size"`
MaxMessageSize int `yaml:"max_message_size" mapstructure:"max_message_size"`
MaxMessageReceiveSize int `yaml:"max_message_receive_size" mapstructure:"max_message_receive_size"`
MaxMessageSendSize int `yaml:"max_message_send_size" mapstructure:"max_message_send_size"`
MaxFileSize uint32 `yaml:"max_file_size" mapstructure:"max_file_size"`
FileChunkSize uint32 `yaml:"file_chunk_size" mapstructure:"file_chunk_size"`
MaxParallelFileOperations int `yaml:"max_parallel_file_operations" mapstructure:"max_parallel_file_operations"`
}

KeepAlive struct {
Expand Down
70 changes: 45 additions & 25 deletions internal/file/file_manager_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"strconv"
"sync"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc"

"github.com/nginx/agent/v3/internal/model"
Expand Down Expand Up @@ -288,28 +289,36 @@ func (fms *FileManagerService) ConfigUpdate(ctx context.Context,
}

func (fms *FileManagerService) ConfigUpload(ctx context.Context, configUploadRequest *mpi.ConfigUploadRequest) error {
var updatingFilesError error
uploadFiles := configUploadRequest.GetOverview().GetFiles()
if len(uploadFiles) == 0 {
return nil
}

for _, file := range configUploadRequest.GetOverview().GetFiles() {
err := fms.fileServiceOperator.UpdateFile(
ctx,
configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
file,
)
if err != nil {
slog.ErrorContext(
ctx,
"Failed to update file",
"instance_id", configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
"file_name", file.GetFileMeta().GetName(),
"error", err,
errGroup, errGroupCtx := errgroup.WithContext(ctx)
errGroup.SetLimit(fms.agentConfig.Client.Grpc.MaxParallelFileOperations)

for _, file := range uploadFiles {
errGroup.Go(func() error {
err := fms.fileServiceOperator.UpdateFile(
errGroupCtx,
configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
file,
)
if err != nil {
slog.ErrorContext(
errGroupCtx,
"Failed to update file",
"instance_id", configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
"file_name", file.GetFileMeta().GetName(),
"error", err,
)
}

updatingFilesError = errors.Join(updatingFilesError, err)
}
return err
})
}

return updatingFilesError
return errGroup.Wait()
}

// DetermineFileActions compares two sets of files to determine the file action for each file. Returns a map of files
Expand Down Expand Up @@ -570,25 +579,36 @@ func (fms *FileManagerService) executeFileActions(ctx context.Context) (actionEr
}

func (fms *FileManagerService) downloadUpdatedFilesToTempLocation(ctx context.Context) (updateError error) {
var downloadFiles []*model.FileCache
for _, fileAction := range fms.fileActions {
if fileAction.Action == model.Add || fileAction.Action == model.Update {
downloadFiles = append(downloadFiles, fileAction)
}
}

if len(downloadFiles) == 0 {
slog.DebugContext(ctx, "No updated files to download")
return nil
}

errGroup, errGroupCtx := errgroup.WithContext(ctx)
errGroup.SetLimit(fms.agentConfig.Client.Grpc.MaxParallelFileOperations)

for _, fileAction := range downloadFiles {
errGroup.Go(func() error {
tempFilePath := tempFilePath(fileAction.File.GetFileMeta().GetName())

slog.DebugContext(
ctx,
errGroupCtx,
"Downloading file to temp location",
"file", tempFilePath,
)

updateErr := fms.fileUpdate(ctx, fileAction.File, tempFilePath)
if updateErr != nil {
updateError = updateErr
break
}
}
return fms.fileUpdate(errGroupCtx, fileAction.File, tempFilePath)
})
}

return updateError
return errGroup.Wait()
}

func (fms *FileManagerService) moveOrDeleteFiles(ctx context.Context, actionError error) error {
Expand Down
12 changes: 7 additions & 5 deletions test/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ const (
commonRandomizationFactor = 0.1
commonMultiplier = 0.2

reloadMonitoringPeriod = 400 * time.Millisecond
maxParallelFileOperations = 5
reloadMonitoringPeriod = 400 * time.Millisecond
)

// Produces a populated Agent Config for testing usage.
Expand All @@ -49,10 +50,11 @@ func AgentConfig() *config.Config {
Time: clientTime,
PermitWithoutStream: clientPermitWithoutStream,
},
MaxMessageReceiveSize: 1,
MaxMessageSendSize: 1,
MaxFileSize: 1,
FileChunkSize: 1,
MaxMessageReceiveSize: 1,
MaxMessageSendSize: 1,
MaxFileSize: 1,
FileChunkSize: 1,
MaxParallelFileOperations: maxParallelFileOperations,
},
Backoff: &config.BackOff{
InitialInterval: commonInitialInterval,
Expand Down
Loading