Fix deadlock in error handling in OTelManager (#6927)
* Fix race in error handling.

* Add changelog.

* Add testing.

* Changelog bug-fix.
blakerouse authored Feb 20, 2025
1 parent 93cfb49 commit 9b572ef
Showing 3 changed files with 75 additions and 28 deletions.
34 changes: 34 additions & 0 deletions changelog/fragments/1739981369-Fix-deadlock-in-OTelManager.yaml
@@ -0,0 +1,34 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; an ~80 character description of the change.
summary: Fix deadlock in OTelManager

# Long description; in case the summary is not enough to describe the change
# this field accommodates a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: |
Fixes a deadlock in the OTelManager where an Update can occur while an error is also being
reported, causing communication with the OTelManager to deadlock.
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present, it is automatically filled by the tooling by finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/6927

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present, it is automatically filled by the tooling with the issue linked to the PR number.
#issue: https://github.com/owner/repo/1234
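The changelog entry above describes the failure mode: the coordinator can block sending a new configuration to the manager at the same moment the manager blocks sending an error back, and with unbuffered channels neither send completes. Below is a minimal, self-contained sketch of that shape; the names (cfgCh, errCh, the string config) are illustrative stand-ins rather than the actual OTelManager types, and it times out instead of hanging so it can be run safely.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

func main() {
	// Illustrative stand-ins for the unbuffered channels used before this fix.
	cfgCh := make(chan string)
	errCh := make(chan error)

	// "Manager" side: reports an error before it gets back to reading configs.
	go func() {
		errCh <- errors.New("collector failed") // blocks: no receiver yet
		<-cfgCh                                 // never reached
	}()

	// "Coordinator" side: pushes a new config before it drains the error.
	done := make(chan struct{})
	go func() {
		cfgCh <- "updated config" // blocks: the manager is stuck on errCh
		<-errCh                   // never reached
		close(done)
	}()

	select {
	case <-done:
		fmt.Println("no deadlock")
	case <-time.After(500 * time.Millisecond):
		fmt.Println("both goroutines are blocked on unbuffered sends: deadlock")
	}
}
```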
50 changes: 25 additions & 25 deletions internal/pkg/otel/manager/manager.go
@@ -45,7 +45,7 @@ type OTelManager struct {
func NewOTelManager(logger *logger.Logger) *OTelManager {
return &OTelManager{
logger: logger,
errCh: make(chan error),
errCh: make(chan error, 1), // holds at most one error
cfgCh: make(chan *confmap.Conf),
statusCh: make(chan *status.AggregateStatus),
doneChan: make(chan struct{}),
@@ -83,16 +83,10 @@ func (m *OTelManager) Run(ctx context.Context) error {
// it's failing to run). we do not retry creation on failure
// as it will always fail; a new configuration is required for
// it not to fail (a new configuration will result in the retry)
select {
case m.errCh <- err:
case <-ctx.Done():
}
m.reportErr(ctx, err)
} else {
// all good at the moment (possible that it will fail)
select {
case m.errCh <- nil:
case <-ctx.Done():
}
m.reportErr(ctx, nil)
}
} else {
// error occurred while running the collector, this occurs in the
@@ -114,10 +108,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
}
// pass the error to the errCh so the coordinator sees it, unless it's a cancel error
if !errors.Is(err, context.Canceled) {
select {
case m.errCh <- err:
case <-ctx.Done():
}
m.reportErr(ctx, err)
}
}
case cfg := <-m.cfgCh:
@@ -141,10 +132,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
}
// ensure that the coordinator knows that there is no error
// as the collector is not running anymore
select {
case m.errCh <- nil:
case <-ctx.Done():
}
m.reportErr(ctx, nil)
} else {
// either a new configuration or the first configuration
// that results in the collector being started
@@ -157,16 +145,10 @@ func (m *OTelManager) Run(ctx context.Context) error {
// it's failing to run). we do not retry creation on failure
// as it will always fail; a new configuration is required for
// it not to fail (a new configuration will result in the retry)
select {
case m.errCh <- err:
case <-ctx.Done():
}
m.reportErr(ctx, err)
} else {
// all good at the moment (possible that it will fail)
select {
case m.errCh <- nil:
case <-ctx.Done():
}
m.reportErr(ctx, nil)
}
} else {
// collector is already running so only the configuration
@@ -226,3 +208,21 @@ func (m *OTelManager) startCollector(cfg *confmap.Conf, errCh chan error) (conte
}()
return cancel, ap, nil
}

// reportErr reports an error to the service that is controlling this manager
//
// the controlling service can be blocked doing other work, like sending this manager a new
// configuration, so we do not want error reporting to be a blocking send over the channel
//
// the controlling service really only needs the most recent error, so if it misses an error it's
// not a big deal; what matters is that it has the current error for the state of this manager
func (m *OTelManager) reportErr(ctx context.Context, err error) {
select {
case <-m.errCh:
default:
}
select {
case m.errCh <- err:
case <-ctx.Done():
}
}
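The new reportErr above is a drain-then-send over a one-slot buffered channel: any stale, unread error is dropped before the latest one is written, so reporting never blocks the Run loop, and the ctx.Done case guards the window between the drain and the send. A standalone sketch of the same "keep only the latest value" mailbox pattern follows; the helper and channel names here are illustrative, not part of the manager's API.

```go
package main

import (
	"errors"
	"fmt"
)

// publishLatest is a hypothetical helper mirroring reportErr's shape: keep only
// the most recent value in a capacity-1 channel.
func publishLatest(ch chan error, err error) {
	select {
	case <-ch: // drop a stale, unread value if one is pending
	default:
	}
	// With a single sender this send cannot block: the slot was just emptied.
	// The real reportErr also selects on ctx.Done() for the concurrent case.
	ch <- err
}

func main() {
	errCh := make(chan error, 1)

	publishLatest(errCh, errors.New("first failure"))
	publishLatest(errCh, errors.New("second failure"))
	publishLatest(errCh, nil) // nil means the collector is healthy again

	// The reader only ever sees the most recent report.
	fmt.Println(<-errCh) // <nil>
}
```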
19 changes: 16 additions & 3 deletions internal/pkg/otel/manager/manager_test.go
@@ -63,7 +63,6 @@ var (
)

func TestOTelManager_Run(t *testing.T) {
t.Skip("Flaky test") // https://github.com/elastic/elastic-agent/issues/6119
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
l, _ := loggertest.New("otel")
@@ -206,8 +205,22 @@ func TestOTelManager_ConfigError(t *testing.T) {
}
}()

cfg := confmap.New() // invalid config
m.Update(cfg)
// The errors channel is non-blocking; we should be able to send an Update that causes an error
// multiple times without blocking on a send over the errCh.
for range 3 {
cfg := confmap.New() // invalid config
m.Update(cfg)

// delay between updates to ensure the collector will have to fail
<-time.After(100 * time.Millisecond)
}

// because of the retry logic and timing we need to keep
// retrying to read the error and only store
// an actual error
//
// a nil error just means that the collector is trying to restart,
// which clears the error on the restart loop
timeoutCh := time.After(time.Second * 5)
var err error
outer:
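The comment in the test above spells out why the assertion has to poll: with the retry loop, nil reports (the collector restarting) interleave with real errors, so the test keeps reading until it sees a non-nil error or times out. A self-contained sketch of that polling shape, using a hypothetical channel and simulated reports rather than the real test harness:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

func main() {
	errCh := make(chan error, 1)

	// Simulate a collector that restarts (nil) a couple of times, then fails for real.
	go func() {
		for _, e := range []error{nil, nil, errors.New("invalid otel config")} {
			time.Sleep(50 * time.Millisecond)
			errCh <- e
		}
	}()

	timeoutCh := time.After(5 * time.Second)
	var err error
outer:
	for {
		select {
		case e := <-errCh:
			if e != nil {
				err = e // only a non-nil report counts; nil just means "restarting"
				break outer
			}
		case <-timeoutCh:
			break outer
		}
	}
	fmt.Println("observed:", err)
}
```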
