Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(loki/src/k8s_events): correctly update health state for the component during operation #6385

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ v0.40.0 (2024-02-27)

- Fix issue where registry was not being properly deleted. (@mattdurham)

- Fix bug where `loki.source.kubernetes_events` was unable to report itself as
unhealthy when its underlying informers fail. (@hainenber)

### Other changes

- Removed support for Windows 2012 in line with Microsoft end of life. (@mattdurham)
Expand Down
17 changes: 16 additions & 1 deletion component/loki/source/kubernetes_events/event_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/cespare/xxhash/v2"
Expand Down Expand Up @@ -58,6 +59,9 @@ type eventController struct {

positionsKey string
initTimestamp time.Time

taskErr error
taskErrMut sync.RWMutex
}

func newEventController(task eventControllerTask) *eventController {
Expand Down Expand Up @@ -85,9 +89,14 @@ func (ctrl *eventController) Run(ctx context.Context) {
level.Info(ctrl.log).Log("msg", "watching events for namespace", "namespace", ctrl.task.Namespace)
defer level.Info(ctrl.log).Log("msg", "stopping watcher for events", "namespace", ctrl.task.Namespace)

if err := ctrl.runError(ctx); err != nil {
err := ctrl.runError(ctx)
if err != nil {
level.Error(ctrl.log).Log("msg", "event watcher exited with error", "err", err)
}

ctrl.taskErrMut.Lock()
ctrl.taskErr = err
ctrl.taskErrMut.Unlock()
}

func (ctrl *eventController) runError(ctx context.Context) error {
Expand Down Expand Up @@ -343,6 +352,12 @@ func (ctrl *eventController) DebugInfo() controllerInfo {
}
}

// GetTaskError returns the error currently stored in the controller's
// taskErr field, or nil when none is stored. The field is read while holding
// the read side of taskErrMut.
func (ctrl *eventController) GetTaskError() error {
	ctrl.taskErrMut.RLock()
	recorded := ctrl.taskErr
	ctrl.taskErrMut.RUnlock()

	return recorded
}

type controllerInfo struct {
Namespace string `river:"namespace,attr"`
LastTimestamp time.Time `river:"last_event_timestamp,attr"`
Expand Down
65 changes: 62 additions & 3 deletions component/loki/source/kubernetes_events/kubernetes_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,15 @@ type Component struct {

receiversMut sync.RWMutex
receivers []loki.LogsReceiver

healthMut sync.RWMutex
health component.Health
}

var (
_ component.Component = (*Component)(nil)
_ component.DebugComponent = (*Component)(nil)
_ component.Component = (*Component)(nil)
_ component.DebugComponent = (*Component)(nil)
_ component.HealthComponent = (*Component)(nil)
)

// New creates a new loki.source.kubernetes_events component.
Expand Down Expand Up @@ -154,6 +158,7 @@ func (c *Component) Run(ctx context.Context) error {
c.tasksMut.RUnlock()

if err := c.runner.ApplyTasks(ctx, tasks); err != nil {
c.setHealth(err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this solves the issue in #6181 entirely.

ApplyTasks will return an error if the runner is closed, or if the context is already cancelled/exceeded via ctx.Err; we don't have a way to propagate errors directly from the underlying tasks as the worker interface is Run(ctx context.Context).

To make this happen we'd need to have a goroutine-safe mechanism of bubbling up the errors and sieving through them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok so my approach is to change the worker interface as there aren't many implementations yet.

The ApplyTasks function will now collect the returned errors and store them within the caller's struct.
 
I've added a new method to the Worker interface, GetWorkerErrors, which returns the bubbled-up errors so that the component's health can be set properly.

PTAL if you have any concern re: goroutine-safe aspect. Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed my approach and chosen one with a smaller blast radius. That is, the eventController retains any error returned while executing its Run method, and a recurring actor in the component sets the health based on the retained results.

Not exactly ideal, since the component's health is not reflected right away, but I think it's a worthwhile tradeoff.

level.Error(c.log).Log("msg", "failed to apply event watchers", "err", err)
}
}
Expand All @@ -162,6 +167,31 @@ func (c *Component) Run(ctx context.Context) error {
cancel()
})

// Actor to set component health through errors from applied tasks.
ticker := time.NewTicker(500 * time.Millisecond)
rg.Add(func() error {
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
appliedTaskErrorString := ""
for _, worker := range c.runner.Workers() {
if taskError := worker.(*eventController).GetTaskError(); taskError != nil {
appliedTaskErrorString += taskError.Error() + "\n"
}
}
if appliedTaskErrorString != "" {
c.setHealth(fmt.Errorf(appliedTaskErrorString))
} else {
c.setHealth(nil)
}
}
}
}, func(_ error) {
cancel()
})

// Runner to forward received logs.
rg.Add(func() error {
for {
Expand All @@ -182,7 +212,10 @@ func (c *Component) Run(ctx context.Context) error {
cancel()
})

return rg.Run()
err := rg.Run()
c.setHealth(err)

return err
}

// Update implements component.Component.
Expand Down Expand Up @@ -257,3 +290,29 @@ func (c *Component) DebugInfo() interface{} {
}
return info
}

// CurrentHealth implements component.HealthComponent. It returns a copy of
// the component's stored health value, read while holding the read side of
// healthMut.
func (c *Component) CurrentHealth() component.Health {
	c.healthMut.RLock()
	snapshot := c.health
	c.healthMut.RUnlock()

	return snapshot
}

// setHealth replaces the component's stored health based on err.
//
// A nil err records a healthy state with the message "component is ready";
// a non-nil err records an unhealthy state whose message embeds the error
// text. In both cases UpdateTime is set to the current time. The write
// happens while holding the write side of healthMut.
func (c *Component) setHealth(err error) {
	c.healthMut.Lock()
	defer c.healthMut.Unlock()

	var next component.Health
	if err != nil {
		next.Health = component.HealthTypeUnhealthy
		next.Message = fmt.Sprintf("component encounters error: %s", err)
	} else {
		next.Health = component.HealthTypeHealthy
		next.Message = "component is ready"
	}
	next.UpdateTime = time.Now()

	c.health = next
}
Loading