diff --git a/CHANGELOG.md b/CHANGELOG.md
index b9988a7fe3f2..e2cf98c623cd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -135,6 +135,9 @@ v0.40.0 (2024-02-27)
 
 - Fix issue where registry was not being properly deleted. (@mattdurham)
 
+- Fix bug where `loki.source.kubernetes_events` was unable to report itself as
+  unhealthy when its underlying informers fail. (@hainenber)
+
 ### Other changes
 
 - Removed support for Windows 2012 in line with Microsoft end of life. (@mattdurham)
diff --git a/component/loki/source/kubernetes_events/event_controller.go b/component/loki/source/kubernetes_events/event_controller.go
index 0f45c3dc62ae..cb1d12fa0e5a 100644
--- a/component/loki/source/kubernetes_events/event_controller.go
+++ b/component/loki/source/kubernetes_events/event_controller.go
@@ -6,6 +6,7 @@ import (
 	"errors"
 	"fmt"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/cespare/xxhash/v2"
@@ -58,6 +59,11 @@ type eventController struct {
 
 	positionsKey  string
 	initTimestamp time.Time
+
+	// taskErr is the result of runError, stored by Run when the watcher exits.
+	// It is guarded by taskErrMut.
+	taskErr    error
+	taskErrMut sync.RWMutex
 }
 
 func newEventController(task eventControllerTask) *eventController {
@@ -85,9 +91,16 @@ func (ctrl *eventController) Run(ctx context.Context) {
 	level.Info(ctrl.log).Log("msg", "watching events for namespace", "namespace", ctrl.task.Namespace)
 	defer level.Info(ctrl.log).Log("msg", "stopping watcher for events", "namespace", ctrl.task.Namespace)
 
-	if err := ctrl.runError(ctx); err != nil {
+	err := ctrl.runError(ctx)
+	if err != nil {
 		level.Error(ctrl.log).Log("msg", "event watcher exited with error", "err", err)
 	}
+
+	// Record the outcome (nil on a clean exit) so that GetTaskError can expose
+	// it to the component's health reporting.
+	ctrl.taskErrMut.Lock()
+	ctrl.taskErr = err
+	ctrl.taskErrMut.Unlock()
 }
 
 func (ctrl *eventController) runError(ctx context.Context) error {
@@ -343,6 +356,14 @@ func (ctrl *eventController) DebugInfo() controllerInfo {
 	}
 }
 
+// GetTaskError returns the error recorded by Run once runError has returned,
+// or nil if Run has not finished or finished without an error.
+func (ctrl *eventController) GetTaskError() error {
+	ctrl.taskErrMut.RLock()
+	defer ctrl.taskErrMut.RUnlock()
+	return ctrl.taskErr
+}
+
 type controllerInfo struct {
 	Namespace     string    `river:"namespace,attr"`
 	LastTimestamp time.Time `river:"last_event_timestamp,attr"`
diff --git a/component/loki/source/kubernetes_events/kubernetes_events.go b/component/loki/source/kubernetes_events/kubernetes_events.go
index e06247f05b27..1bf803371e6e 100644
--- a/component/loki/source/kubernetes_events/kubernetes_events.go
+++ b/component/loki/source/kubernetes_events/kubernetes_events.go
@@ -95,11 +95,15 @@ type Component struct {
 
 	receiversMut sync.RWMutex
 	receivers    []loki.LogsReceiver
+
+	healthMut sync.RWMutex
+	health    component.Health
 }
 
 var (
-	_ component.Component      = (*Component)(nil)
-	_ component.DebugComponent = (*Component)(nil)
+	_ component.Component       = (*Component)(nil)
+	_ component.DebugComponent  = (*Component)(nil)
+	_ component.HealthComponent = (*Component)(nil)
 )
 
 // New creates a new loki.source.kubernetes_events component.
@@ -154,6 +158,9 @@ func (c *Component) Run(ctx context.Context) error {
 				c.tasksMut.RUnlock()
 
 				if err := c.runner.ApplyTasks(ctx, tasks); err != nil {
+					// NOTE(review): the health poller below overwrites this within one
+					// tick, so an ApplyTasks failure is only briefly visible.
+					c.setHealth(err)
 					level.Error(c.log).Log("msg", "failed to apply event watchers", "err", err)
 				}
 			}
@@ -162,6 +169,42 @@ func (c *Component) Run(ctx context.Context) error {
 		cancel()
 	})
 
+	// Actor to set component health through errors from applied tasks.
+	rg.Add(func() error {
+		// Create the ticker inside the actor so that it is stopped on exit.
+		ticker := time.NewTicker(500 * time.Millisecond)
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-ctx.Done():
+				return nil
+			case <-ticker.C:
+				// Fold the errors of all failed workers into one, one message per line.
+				var taskErrs error
+				for _, worker := range c.runner.Workers() {
+					ctrl, ok := worker.(*eventController)
+					if !ok {
+						continue
+					}
+					taskErr := ctrl.GetTaskError()
+					if taskErr == nil {
+						continue
+					}
+					if taskErrs == nil {
+						taskErrs = taskErr
+					} else {
+						// Errors are passed as arguments, never as the format string.
+						taskErrs = fmt.Errorf("%v\n%v", taskErrs, taskErr)
+					}
+				}
+				c.setHealth(taskErrs)
+			}
+		}
+	}, func(_ error) {
+		cancel()
+	})
+
 	// Runner to forward received logs.
 	rg.Add(func() error {
 		for {
@@ -182,7 +225,10 @@ func (c *Component) Run(ctx context.Context) error {
 		cancel()
 	})
 
-	return rg.Run()
+	err := rg.Run()
+	c.setHealth(err)
+
+	return err
 }
 
 // Update implements component.Component.
@@ -257,3 +303,28 @@ func (c *Component) DebugInfo() interface{} {
 	}
 	return info
 }
+
+// CurrentHealth implements component.HealthComponent.
+func (c *Component) CurrentHealth() component.Health {
+	c.healthMut.RLock()
+	defer c.healthMut.RUnlock()
+	return c.health
+}
+
+// setHealth records the component's health: healthy when err is nil, unhealthy
+// (with err in the message) otherwise.
+func (c *Component) setHealth(err error) {
+	health := component.Health{
+		Health:     component.HealthTypeHealthy,
+		Message:    "component is ready",
+		UpdateTime: time.Now(),
+	}
+	if err != nil {
+		health.Health = component.HealthTypeUnhealthy
+		health.Message = fmt.Sprintf("component encounters error: %s", err)
+	}
+
+	c.healthMut.Lock()
+	defer c.healthMut.Unlock()
+	c.health = health
+}