diff --git a/internal/collector/containermetricsreceiver/internal/utils.go b/internal/collector/containermetricsreceiver/internal/utils.go index 46dd9adc1..26ab7b977 100644 --- a/internal/collector/containermetricsreceiver/internal/utils.go +++ b/internal/collector/containermetricsreceiver/internal/utils.go @@ -22,12 +22,20 @@ func ReadLines(filename string) ([]string, error) { } // nolint: revive -func ReadLinesOffsetN(filename string, offset uint, n int) ([]string, error) { +func ReadLinesOffsetN(filename string, offset uint, n int) (lines []string, err error) { f, err := os.Open(filename) + defer func() { + closeErr := f.Close() + if closeErr != nil { + if err == nil { + err = closeErr + } + } + }() + if err != nil { return []string{}, err } - defer f.Close() var ret []string diff --git a/internal/config/types.go b/internal/config/types.go index 8017987f1..00a468903 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -412,6 +412,10 @@ func (c *Config) AreReceiversConfigured() bool { } func isAllowedDir(dir string, allowedDirs []string) bool { + if !strings.HasSuffix(dir, "/") && filepath.Ext(dir) == "" { + dir += "/" + } + for _, allowedDirectory := range allowedDirs { if strings.HasPrefix(dir, allowedDirectory) { return true diff --git a/internal/config/types_test.go b/internal/config/types_test.go index 46e2c7f61..f857a76d5 100644 --- a/internal/config/types_test.go +++ b/internal/config/types_test.go @@ -25,10 +25,10 @@ func TestTypes_IsDirectoryAllowed(t *testing.T) { allowed: true, allowedDirs: []string{ AgentDirName, - "/etc/nginx", + "/etc/nginx/", "/var/log/nginx/", }, - fileDir: "/etc/nginx/nginx.conf", + fileDir: "/etc/nginx", }, { name: "Test 2: directory not allowed", diff --git a/internal/datasource/config/nginx_config_parser.go b/internal/datasource/config/nginx_config_parser.go index 6ced26361..949deb781 100644 --- a/internal/datasource/config/nginx_config_parser.go +++ b/internal/datasource/config/nginx_config_parser.go @@ -100,7 +100,7 @@ func (ncp *NginxConfigParser) Parse(ctx context.Context, instance *mpi.Instance) return ncp.createNginxConfigContext(ctx, instance, payload) } -// nolint: cyclop,revive,gocognit +// nolint: cyclop,revive,gocognit,gocyclo func (ncp *NginxConfigParser) createNginxConfigContext( ctx context.Context, instance *mpi.Instance, @@ -138,6 +138,10 @@ func (ncp *NginxConfigParser) createNginxConfigContext( err := ncp.crossplaneConfigTraverse(ctx, &conf, func(ctx context.Context, parent, directive *crossplane.Directive) error { switch directive.Directive { + case "include": + include := ncp.parseIncludeDirective(directive) + + nginxConfigContext.Includes = append(nginxConfigContext.Includes, include) case "log_format": formatMap = ncp.formatMap(directive) case "access_log": @@ -214,6 +218,17 @@ func (ncp *NginxConfigParser) createNginxConfigContext( return nginxConfigContext, nil } +func (ncp *NginxConfigParser) parseIncludeDirective(directive *crossplane.Directive) string { + var include string + if filepath.IsAbs(directive.Args[0]) { + include = directive.Args[0] + } else { + include = filepath.Join(filepath.Dir(directive.File), directive.Args[0]) + } + + return include +} + func (ncp *NginxConfigParser) addAccessLog(accessLog *model.AccessLog, accessLogs []*model.AccessLog, ) []*model.AccessLog { diff --git a/internal/datasource/host/info.go b/internal/datasource/host/info.go index 60e72d040..bb47c6933 100644 --- a/internal/datasource/host/info.go +++ b/internal/datasource/host/info.go @@ -205,9 +205,6 @@ func (i *Info) containerID() string { // mountInfo is the path: "/proc/self/mountinfo" func containerIDFromMountInfo(mountInfo string) (string, error) { mInfoFile, err := os.Open(mountInfo) - if err != nil { - return "", fmt.Errorf("could not read %s: %w", mountInfo, err) - } defer func(f *os.File, fileName string) { closeErr := f.Close() if closeErr != nil { @@ -215,6 +212,10 @@ func containerIDFromMountInfo(mountInfo string) (string, error) { } }(mInfoFile, mountInfo) + if err != nil { + return "", fmt.Errorf("could not read %s: %w", mountInfo, err) + } + fileScanner := bufio.NewScanner(mInfoFile) fileScanner.Split(bufio.ScanLines) @@ -308,15 +309,15 @@ func (i *Info) releaseInfo(ctx context.Context, osReleaseLocation string) (relea func readOsRelease(path string) (map[string]string, error) { f, err := os.Open(path) - if err != nil { - return nil, fmt.Errorf("release file %s is unreadable: %w", path, err) - } defer func(f *os.File, fileName string) { closeErr := f.Close() if closeErr != nil { slog.Error("Unable to close file", "file", fileName, "error", closeErr) } }(f, path) + if err != nil { + return nil, fmt.Errorf("release file %s is unreadable: %w", path, err) + } info, err := parseOsReleaseFile(f) if err != nil { diff --git a/internal/file/file_manager_service.go b/internal/file/file_manager_service.go index acbab1f29..5a9ab325d 100644 --- a/internal/file/file_manager_service.go +++ b/internal/file/file_manager_service.go @@ -854,15 +854,16 @@ func (fms *FileManagerService) writeManifestFile(updatedFiles map[string]*model. // 0600 ensures only root can read/write newFile, err := os.OpenFile(fms.manifestFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, filePerm) - if err != nil { - return fmt.Errorf("failed to read manifest file: %w", err) - } defer func() { if closeErr := newFile.Close(); closeErr != nil { writeError = closeErr } }() + if err != nil { + return fmt.Errorf("failed to read manifest file: %w", err) + } + _, err = newFile.Write(manifestJSON) if err != nil { return fmt.Errorf("failed to write manifest file: %w", err) diff --git a/internal/model/config.go b/internal/model/config.go index a2db0e578..ed1d92487 100644 --- a/internal/model/config.go +++ b/internal/model/config.go @@ -19,6 +19,7 @@ type NginxConfigContext struct { AccessLogs []*AccessLog ErrorLogs []*ErrorLog NAPSysLogServers []string + Includes []string } type APIDetails struct { diff --git a/internal/watcher/file/file_watcher_service.go b/internal/watcher/file/file_watcher_service.go index 0ca19f046..9b95fe7c7 100644 --- a/internal/watcher/file/file_watcher_service.go +++ b/internal/watcher/file/file_watcher_service.go @@ -8,11 +8,11 @@ package file import ( "context" "errors" - "io/fs" "log/slog" "os" "path/filepath" "regexp" + "slices" "strings" "sync" "sync/atomic" @@ -21,14 +21,7 @@ import ( "github.com/fsnotify/fsnotify" "github.com/nginx/agent/v3/internal/config" "github.com/nginx/agent/v3/internal/logger" -) - -const ( - Create = fsnotify.Create - Write = fsnotify.Write - Remove = fsnotify.Remove - Rename = fsnotify.Rename - Chmod = fsnotify.Chmod + "github.com/nginx/agent/v3/internal/model" ) var emptyEvent = fsnotify.Event{ @@ -41,11 +34,12 @@ type FileUpdateMessage struct { } type FileWatcherService struct { - agentConfig *config.Config - watcher *fsnotify.Watcher - directoriesBeingWatched *sync.Map - filesChanged *atomic.Bool - enabled *atomic.Bool + agentConfig *config.Config + watcher *fsnotify.Watcher + filesChanged *atomic.Bool + enabled *atomic.Bool + directoriesToWatch map[string]struct{} + mu sync.Mutex } func NewFileWatcherService(agentConfig *config.Config) *FileWatcherService { @@ -56,10 +50,10 @@ func NewFileWatcherService(agentConfig *config.Config) *FileWatcherService { filesChanged.Store(false) return &FileWatcherService{ - agentConfig: agentConfig, - directoriesBeingWatched: &sync.Map{}, - enabled: enabled, - filesChanged: filesChanged, + agentConfig: agentConfig, + directoriesToWatch: make(map[string]struct{}), + enabled: enabled, + filesChanged: filesChanged, } } @@ -73,12 +67,11 @@ func (fws *FileWatcherService) Watch(ctx context.Context, ch chan<- FileUpdateMe watcher, err := fsnotify.NewWatcher() if err != nil { slog.ErrorContext(ctx, "Failed to create file watcher", "error", err) - return } + fws.mu.Lock() fws.watcher = watcher - - fws.watchDirectories(ctx) + fws.mu.Unlock() for { select { @@ -103,78 +96,70 @@ func (fws *FileWatcherService) SetEnabled(enabled bool) { fws.enabled.Store(enabled) } -func (fws *FileWatcherService) watchDirectories(ctx context.Context) { - for _, dir := range fws.agentConfig.AllowedDirectories { - if _, err := os.Stat(dir); errors.Is(err, os.ErrNotExist) { - slog.DebugContext(ctx, "Unable to watch directory that does not exist", "directory", dir, "error", err) - continue - } +func (fws *FileWatcherService) Update(ctx context.Context, nginxConfigContext *model.NginxConfigContext) { + slog.DebugContext(ctx, "Updating file watcher", "nginx_config_context", nginxConfigContext) - slog.DebugContext(ctx, "Creating file watchers", "directory", dir) + fws.mu.Lock() + defer fws.mu.Unlock() - err := fws.walkDir(ctx, dir) - if err != nil { - slog.ErrorContext(ctx, "Failed to create file watchers", "directory", dir, "error", err) - } - } -} - -func (fws *FileWatcherService) walkDir(ctx context.Context, dir string) error { - return filepath.WalkDir(dir, func(path string, d fs.DirEntry, fileWalkErr error) error { - if fileWalkErr != nil { - return fileWalkErr - } + directoriesToWatch := make(map[string]struct{}) - info, infoErr := d.Info() - if infoErr != nil { - slog.ErrorContext(ctx, "Error getting info for file", "error", infoErr) - return infoErr - } + for _, file := range nginxConfigContext.Files { + directoriesToWatch[filepath.Dir(file.GetFileMeta().GetName())] = struct{}{} + } - if d.IsDir() { - fws.addWatcher(ctx, path, info) - } + for _, file := range nginxConfigContext.Includes { + directoriesToWatch[filepath.Dir(file)] = struct{}{} + } - return nil - }) -} + fws.directoriesToWatch = directoriesToWatch -func (fws *FileWatcherService) addWatcher(ctx context.Context, path string, info os.FileInfo) { - if info.IsDir() && !fws.isWatching(path) { - if err := fws.watcher.Add(path); err != nil { - slog.ErrorContext(ctx, "Failed to add file watcher", "directory_path", path, "error", err) - removeError := fws.watcher.Remove(path) - if removeError != nil { - slog.ErrorContext(ctx, "Failed to remove file watcher", "directory_path", path, "error", removeError) - } + if fws.watcher != nil { + slog.InfoContext(ctx, "Updating file watcher", "allowed", fws.agentConfig.AllowedDirectories) - return - } + // Start watching new directories + fws.addWatchers(ctx) - fws.directoriesBeingWatched.Store(path, true) + // Check if directories no longer need to be watched + fws.removeWatchers(ctx) } } -func (fws *FileWatcherService) removeWatcher(ctx context.Context, path string) { - if _, ok := fws.directoriesBeingWatched.Load(path); ok { - err := fws.watcher.Remove(path) - if err != nil { - slog.ErrorContext(ctx, "Failed to remove file watcher", "directory_path", path, "error", err) - return +func (fws *FileWatcherService) addWatchers(ctx context.Context) { + for directory := range fws.directoriesToWatch { + if !fws.agentConfig.IsDirectoryAllowed(directory) { + slog.WarnContext( + ctx, + "Unable to watch file in a directory that is not in allowed directory list", + "directory", directory, + ) + + continue } - fws.directoriesBeingWatched.Delete(path) + if !slices.Contains(fws.watcher.WatchList(), directory) { + fws.addWatcher(ctx, directory) + fws.filesChanged.Store(true) + } } } -func (fws *FileWatcherService) isWatching(name string) bool { - v, _ := fws.directoriesBeingWatched.LoadOrStore(name, false) - - if value, ok := v.(bool); ok { - return value +func (fws *FileWatcherService) removeWatchers(ctx context.Context) { + for _, directoryBeingWatched := range fws.watcher.WatchList() { + if _, err := os.Stat(directoryBeingWatched); errors.Is(err, os.ErrNotExist) { + slog.DebugContext( + ctx, + "Directory does not exist removing watcher", + "directory", directoryBeingWatched, + ) + + fws.removeWatcher(ctx, directoryBeingWatched) + fws.filesChanged.Store(true) + } else if _, ok := fws.directoriesToWatch[directoryBeingWatched]; !ok { + fws.removeWatcher(ctx, directoryBeingWatched) + fws.filesChanged.Store(true) + } } - - return false } func (fws *FileWatcherService) handleEvent(ctx context.Context, event fsnotify.Event) { @@ -183,28 +168,33 @@ func (fws *FileWatcherService) handleEvent(ctx context.Context, event fsnotify.E return } - switch { - case event.Op&Write == Write: - // We want to send messages on write since that means the contents changed, - // but we already have a watcher on the file so nothing special needs to happen here - case event.Op&Create == Create: - info, err := os.Stat(event.Name) - if err != nil { - slog.DebugContext(ctx, "Unable to add watcher", "path", event.Name, "error", err) - return - } - fws.addWatcher(ctx, event.Name, info) - case event.Op&Remove == Remove, event.Op&Rename == Rename: - fws.removeWatcher(ctx, event.Name) - } - slog.DebugContext(ctx, "Processing FSNotify event", "event", event) - fws.filesChanged.Store(true) } } func (fws *FileWatcherService) checkForUpdates(ctx context.Context, ch chan<- FileUpdateMessage) { + slog.DebugContext(ctx, "Checking for file watcher updates") + + fws.mu.Lock() + defer fws.mu.Unlock() + + if fws.watcher == nil { + watcher, err := fsnotify.NewWatcher() + if err != nil { + slog.ErrorContext(ctx, "Failed to create file watcher", "error", err) + return + } + + fws.watcher = watcher + } + + // Start watching new directories + fws.addWatchers(ctx) + + // Check if directories no longer need to be watched + fws.removeWatchers(ctx) + if fws.filesChanged.Load() { newCtx := context.WithValue( ctx, @@ -218,6 +208,32 @@ func (fws *FileWatcherService) checkForUpdates(ctx context.Context, ch chan<- Fi } } +func (fws *FileWatcherService) addWatcher(ctx context.Context, directory string) { + slog.DebugContext(ctx, "Checking if file watcher needs to be added", "directory", directory) + + if _, err := os.Stat(directory); errors.Is(err, os.ErrNotExist) { + slog.DebugContext( + ctx, "Unable to watch directory that does not exist", + "directory", directory, "error", err, + ) + } + + slog.DebugContext(ctx, "Adding watcher", "directory", directory) + + if err := fws.watcher.Add(directory); err != nil { + slog.WarnContext(ctx, "Failed to add file watcher", "directory", directory, "error", err) + } +} + +func (fws *FileWatcherService) removeWatcher(ctx context.Context, path string) { + slog.DebugContext(ctx, "Removing watcher", "directory", path) + err := fws.watcher.Remove(path) + if err != nil { + slog.WarnContext(ctx, "Failed to remove file watcher", "directory_path", path, "error", err) + return + } +} + func (fws *FileWatcherService) isEventSkippable(event fsnotify.Event) bool { return event == emptyEvent || event.Name == "" || isExcludedFile(event.Name, fws.agentConfig.Watchers.FileWatcher.ExcludeFiles) diff --git a/internal/watcher/file/file_watcher_service_test.go b/internal/watcher/file/file_watcher_service_test.go index d3040fc4f..3177c7226 100644 --- a/internal/watcher/file/file_watcher_service_test.go +++ b/internal/watcher/file/file_watcher_service_test.go @@ -10,13 +10,17 @@ import ( "context" "os" "path" + "path/filepath" "testing" "time" + mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1" + "github.com/nginx/agent/v3/test/helpers" "github.com/nginx/agent/v3/test/stub" "github.com/fsnotify/fsnotify" + "github.com/nginx/agent/v3/internal/model" "github.com/nginx/agent/v3/test/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -29,7 +33,7 @@ const ( func TestFileWatcherService_NewFileWatcherService(t *testing.T) { fileWatcherService := NewFileWatcherService(types.AgentConfig()) - assert.Empty(t, fileWatcherService.directoriesBeingWatched) + assert.Empty(t, fileWatcherService.directoriesToWatch) assert.True(t, fileWatcherService.enabled.Load()) assert.False(t, fileWatcherService.filesChanged.Load()) } @@ -52,22 +56,17 @@ func TestFileWatcherService_addWatcher(t *testing.T) { require.NoError(t, err) fileWatcherService.watcher = watcher - tempDir := os.TempDir() + tempDir := t.TempDir() testDirectory := path.Join(tempDir, "test_dir") err = os.Mkdir(testDirectory, directoryPermissions) require.NoError(t, err) defer os.Remove(testDirectory) - info, err := os.Stat(testDirectory) - require.NoError(t, err) - - fileWatcherService.addWatcher(ctx, testDirectory, info) + fileWatcherService.addWatcher(ctx, testDirectory) - value, ok := fileWatcherService.directoriesBeingWatched.Load(testDirectory) - assert.True(t, ok) - boolValue, ok := value.(bool) - assert.True(t, ok) - assert.True(t, boolValue) + directoriesBeingWatched := fileWatcherService.watcher.WatchList() + assert.Len(t, directoriesBeingWatched, 1) + assert.Equal(t, testDirectory, directoriesBeingWatched[0]) } func TestFileWatcherService_addWatcher_Error(t *testing.T) { @@ -77,25 +76,13 @@ func TestFileWatcherService_addWatcher_Error(t *testing.T) { require.NoError(t, err) fileWatcherService.watcher = watcher - tempDir := os.TempDir() + tempDir := t.TempDir() testDirectory := path.Join(tempDir, "test_dir") - err = os.Mkdir(testDirectory, directoryPermissions) - require.NoError(t, err) - info, err := os.Stat(testDirectory) - require.NoError(t, err) - - // Delete directory to cause the addWatcher function to fail - err = os.Remove(testDirectory) - require.NoError(t, err) - fileWatcherService.addWatcher(ctx, testDirectory, info) + fileWatcherService.addWatcher(ctx, testDirectory) - value, ok := fileWatcherService.directoriesBeingWatched.Load(testDirectory) - assert.True(t, ok) - boolValue, ok := value.(bool) - assert.True(t, ok) - assert.False(t, boolValue) - assert.True(t, ok) + directoriesBeingWatched := fileWatcherService.watcher.WatchList() + assert.Empty(t, directoriesBeingWatched) } func TestFileWatcherService_removeWatcher(t *testing.T) { @@ -105,7 +92,7 @@ func TestFileWatcherService_removeWatcher(t *testing.T) { require.NoError(t, err) fileWatcherService.watcher = watcher - tempDir := os.TempDir() + tempDir := t.TempDir() testDirectory := path.Join(tempDir, "test_dir") err = os.Mkdir(testDirectory, directoryPermissions) require.NoError(t, err) @@ -113,23 +100,16 @@ func TestFileWatcherService_removeWatcher(t *testing.T) { err = fileWatcherService.watcher.Add(testDirectory) require.NoError(t, err) - fileWatcherService.directoriesBeingWatched.Store(testDirectory, true) fileWatcherService.removeWatcher(ctx, testDirectory) - value, ok := fileWatcherService.directoriesBeingWatched.Load(testDirectory) - assert.Nil(t, value) - assert.False(t, ok) - logBuf := &bytes.Buffer{} + defer logBuf.Reset() stub.StubLoggerWith(logBuf) - fileWatcherService.directoriesBeingWatched.Store(testDirectory, true) fileWatcherService.removeWatcher(ctx, testDirectory) helpers.ValidateLog(t, "Failed to remove file watcher", logBuf) - - logBuf.Reset() } func TestFileWatcherService_isEventSkippable(t *testing.T) { @@ -156,16 +136,89 @@ func TestFileWatcherService_isExcludedFile(t *testing.T) { assert.False(t, isExcludedFile("/var/log/accesslog", excludeFiles)) } +func TestFileWatcherService_Update(t *testing.T) { + ctx := context.Background() + + tempDir := t.TempDir() + testDirectory := path.Join(tempDir, "test_dir") + err := os.Mkdir(testDirectory, directoryPermissions) + require.NoError(t, err) + defer os.RemoveAll(testDirectory) + + agentConfig := types.AgentConfig() + agentConfig.Watchers.FileWatcher.MonitoringFrequency = 100 * time.Millisecond + agentConfig.AllowedDirectories = []string{testDirectory, "/unknown/directory"} + + fileWatcherService := NewFileWatcherService(agentConfig) + + t.Run("Test 1: watcher not initialized yet", func(t *testing.T) { + fileWatcherService.Update(ctx, &model.NginxConfigContext{ + Includes: []string{filepath.Join(testDirectory, "*.conf")}, + }) + + _, ok := fileWatcherService.directoriesToWatch[testDirectory] + assert.True(t, ok) + + assert.Nil(t, fileWatcherService.watcher) + }) + + t.Run("Test 2: watcher initialized", func(t *testing.T) { + watcher, newWatcherError := fsnotify.NewWatcher() + require.NoError(t, newWatcherError) + + fileWatcherService.watcher = watcher + + fileWatcherService.Update(ctx, &model.NginxConfigContext{ + Includes: []string{filepath.Join(testDirectory, "*.conf")}, + }) + + _, ok := fileWatcherService.directoriesToWatch[testDirectory] + assert.True(t, ok) + + directoriesBeingWatched := fileWatcherService.watcher.WatchList() + assert.Len(t, directoriesBeingWatched, 1) + assert.Equal(t, testDirectory, directoriesBeingWatched[0]) + }) + + t.Run("Test 3: remove watchers", func(t *testing.T) { + fileWatcherService.Update(ctx, &model.NginxConfigContext{ + Includes: []string{}, + }) + + assert.Empty(t, fileWatcherService.directoriesToWatch) + + directoriesBeingWatched := fileWatcherService.watcher.WatchList() + assert.Empty(t, directoriesBeingWatched) + }) + + t.Run("Test 4: not allowed directory", func(t *testing.T) { + fileWatcherService.Update(ctx, &model.NginxConfigContext{ + Files: []*mpi.File{ + { + FileMeta: &mpi.FileMeta{ + Name: "/unknown/location/test.conf", + }, + }, + }, + }) + + _, ok := fileWatcherService.directoriesToWatch["/unknown/location/test.conf"] + assert.False(t, ok) + + directoriesBeingWatched := fileWatcherService.watcher.WatchList() + assert.Empty(t, directoriesBeingWatched) + }) +} + func TestFileWatcherService_Watch(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tempDir := os.TempDir() + tempDir := t.TempDir() testDirectory := path.Join(tempDir, "test_dir") - os.RemoveAll(testDirectory) err := os.Mkdir(testDirectory, directoryPermissions) require.NoError(t, err) - defer os.RemoveAll(testDirectory) + defer os.Remove(testDirectory) agentConfig := types.AgentConfig() agentConfig.Watchers.FileWatcher.MonitoringFrequency = 100 * time.Millisecond @@ -178,13 +231,32 @@ func TestFileWatcherService_Watch(t *testing.T) { time.Sleep(100 * time.Millisecond) + fileWatcherService.Update(ctx, &model.NginxConfigContext{ + Includes: []string{filepath.Join(testDirectory, "*.conf")}, + }) + file, err := os.CreateTemp(testDirectory, "test.conf") require.NoError(t, err) defer os.Remove(file.Name()) t.Run("Test 1: File updated", func(t *testing.T) { - fileUpdate := <-channel - assert.NotNil(t, fileUpdate.CorrelationID) + // Check that directory is being watched + assert.Eventually(t, func() bool { + _, ok := fileWatcherService.directoriesToWatch[testDirectory] + return ok + }, 1*time.Second, 100*time.Millisecond) + + assert.Eventually(t, func() bool { + directoriesBeingWatched := fileWatcherService.watcher.WatchList() + return len(directoriesBeingWatched) == 1 + }, 1*time.Second, 100*time.Millisecond) + + select { + case fileUpdate := <-channel: + assert.NotNil(t, fileUpdate.CorrelationID) + case <-time.After(150 * time.Millisecond): + t.Fatalf("Expected file update event") + } }) t.Run("Test 2: Skippable file updated", func(t *testing.T) { @@ -199,4 +271,52 @@ func TestFileWatcherService_Watch(t *testing.T) { return } }) + + t.Run("Test 3: Directory deleted", func(t *testing.T) { + dirDeleteError := os.RemoveAll(testDirectory) + require.NoError(t, dirDeleteError) + + // Check that directory is no longer being watched + assert.Eventually(t, func() bool { + _, ok := fileWatcherService.directoriesToWatch[testDirectory] + return ok + }, 1*time.Second, 100*time.Millisecond) + + assert.Eventually(t, func() bool { + directoriesBeingWatched := fileWatcherService.watcher.WatchList() + return len(directoriesBeingWatched) == 0 + }, 1*time.Second, 100*time.Millisecond) + }) +} + +func TestFileWatcherService_checkForUpdates(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tempDir := t.TempDir() + testDirectory := path.Join(tempDir, "test_dir") + err := os.Mkdir(testDirectory, directoryPermissions) + require.NoError(t, err) + defer os.RemoveAll(testDirectory) + + agentConfig := types.AgentConfig() + agentConfig.Watchers.FileWatcher.MonitoringFrequency = 100 * time.Millisecond + agentConfig.AllowedDirectories = []string{testDirectory, "/unknown/directory"} + + channel := make(chan FileUpdateMessage) + + fileWatcherService := NewFileWatcherService(agentConfig) + fileWatcherService.filesChanged.Store(true) + assert.Nil(t, fileWatcherService.watcher) + + go fileWatcherService.checkForUpdates(ctx, channel) + + select { + case fileUpdate := <-channel: + assert.NotNil(t, fileUpdate.CorrelationID) + assert.NotNil(t, fileWatcherService.watcher) + assert.False(t, fileWatcherService.filesChanged.Load()) + case <-time.After(150 * time.Millisecond): + t.Fatalf("Expected file update event") + } } diff --git a/internal/watcher/watcher_plugin.go b/internal/watcher/watcher_plugin.go index a516c0b69..489373c46 100644 --- a/internal/watcher/watcher_plugin.go +++ b/internal/watcher/watcher_plugin.go @@ -282,6 +282,7 @@ func (w *Watcher) monitorWatchers(ctx context.Context) { case message := <-w.nginxConfigContextChannel: newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID) w.watcherMutex.Lock() + if !slices.Contains(w.instancesWithConfigApplyInProgress, message.NginxConfigContext.InstanceID) { slog.DebugContext( newCtx, @@ -300,6 +301,9 @@ func (w *Watcher) monitorWatchers(ctx context.Context) { "nginx_config_context", message.NginxConfigContext, ) } + + w.fileWatcherService.Update(ctx, message.NginxConfigContext) + w.watcherMutex.Unlock() case message := <-w.instanceHealthChannel: newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID) diff --git a/test/config/nginx/empty-nginx.conf b/test/config/nginx/empty-nginx.conf new file mode 100644 index 000000000..8f5d2257c --- /dev/null +++ b/test/config/nginx/empty-nginx.conf @@ -0,0 +1 @@ +# Empty config diff --git a/test/config/nginx/nginx-with-server-block-access-log.conf b/test/config/nginx/nginx-with-server-block-access-log.conf index 33d849f3a..a6a3a9bb0 100644 --- a/test/config/nginx/nginx-with-server-block-access-log.conf +++ b/test/config/nginx/nginx-with-server-block-access-log.conf @@ -51,4 +51,6 @@ http { root /usr/share/nginx/html; } } + + include /etc/nginx/test/*; } diff --git a/test/integration/managementplane/file_watcher_test.go b/test/integration/managementplane/file_watcher_test.go index 40e823997..e7d13004e 100644 --- a/test/integration/managementplane/file_watcher_test.go +++ b/test/integration/managementplane/file_watcher_test.go @@ -25,19 +25,51 @@ func TestGrpc_FileWatcher(t *testing.T) { utils.VerifyConnection(t, 2) assert.False(t, t.Failed()) - err := utils.Container.CopyFileToContainer( - ctx, - "../../config/nginx/nginx-with-server-block-access-log.conf", - "/etc/nginx/nginx.conf", - 0o666, - ) - require.NoError(t, err) - - responses := utils.ManagementPlaneResponses(t, 2) - assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[0].GetCommandResponse().GetStatus()) - assert.Equal(t, "Successfully updated all files", responses[0].GetCommandResponse().GetMessage()) - assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[1].GetCommandResponse().GetStatus()) - assert.Equal(t, "Successfully updated all files", responses[1].GetCommandResponse().GetMessage()) - - utils.VerifyUpdateDataPlaneStatus(t) + t.Run("Test 1: update nginx config file on data plane", func(t *testing.T) { + err := utils.Container.CopyFileToContainer( + ctx, + "../../config/nginx/nginx-with-server-block-access-log.conf", + "/etc/nginx/nginx.conf", + 0o666, + ) + require.NoError(t, err) + + responses := utils.ManagementPlaneResponses(t, 2) + assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[0].GetCommandResponse().GetStatus()) + assert.Equal(t, "Successfully updated all files", responses[0].GetCommandResponse().GetMessage()) + assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[1].GetCommandResponse().GetStatus()) + assert.Equal(t, "Successfully updated all files", responses[1].GetCommandResponse().GetMessage()) + + utils.VerifyUpdateDataPlaneStatus(t) + }) + + t.Run("Test 2: create new nginx config file", func(t *testing.T) { + err := utils.Container.CopyFileToContainer( + ctx, + "../../config/nginx/empty-nginx.conf", + "/etc/nginx/test/test.conf", + 0o666, + ) + require.NoError(t, err) + + responses := utils.ManagementPlaneResponses(t, 3) + assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[2].GetCommandResponse().GetStatus()) + assert.Equal(t, "Successfully updated all files", responses[2].GetCommandResponse().GetMessage()) + + utils.VerifyUpdateDataPlaneStatus(t) + }) + + t.Run("Test 3: delete nginx config file", func(t *testing.T) { + _, _, err := utils.Container.Exec( + ctx, + []string{"rm", "-rf", "/etc/nginx/test"}, + ) + require.NoError(t, err) + + responses := utils.ManagementPlaneResponses(t, 4) + assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[3].GetCommandResponse().GetStatus()) + assert.Equal(t, "Successfully updated all files", responses[3].GetCommandResponse().GetMessage()) + + utils.VerifyUpdateDataPlaneStatus(t) + }) }