Skip to content

Commit

Permalink
Add ability to run components in the Otel manager (#6697)
Browse files Browse the repository at this point in the history
* Add ability to run components in the Otel manager

# Conflicts:
#	NOTICE.txt
#	go.mod
#	go.sum

* Add coordinator test

* Set metricbeat receiver signal type to logs

* Drop unnecessary transform processor

The conversion now happens in the otel consumer in beats.

* Determine default datastream type from beat name

* Fix diagnostics tests

* Promote output queue settings to receivers

* Move otel config translation to the otel package

* Emit the otel component diagnostic conditionally

* Add more otel config translation tests

* Code review fixes

* Fix diagnostics tests

* Code Review fixes

* Correctly set input types if not present

* More code review fixes
  • Loading branch information
swiatekm authored Feb 24, 2025
1 parent d68ade9 commit eeffc0e
Show file tree
Hide file tree
Showing 9 changed files with 1,281 additions and 278 deletions.
498 changes: 249 additions & 249 deletions NOTICE.txt

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,12 @@ require (
go.elastic.co/ecszap v1.0.2
go.elastic.co/go-licence-detector v0.7.0
go.opentelemetry.io/collector/component/componentstatus v0.119.0
go.opentelemetry.io/collector/pipeline v0.119.0
go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.119.0
go.opentelemetry.io/collector/receiver/nopreceiver v0.119.0
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.32.0
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
golang.org/x/net v0.34.0
golang.org/x/sync v0.10.0
golang.org/x/sys v0.29.0
Expand Down Expand Up @@ -578,7 +580,6 @@ require (
go.opentelemetry.io/collector/pdata v1.25.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.119.0 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.119.0 // indirect
go.opentelemetry.io/collector/pipeline v0.119.0 // indirect
go.opentelemetry.io/collector/pipeline/xpipeline v0.119.0 // indirect
go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper v0.119.0 // indirect
go.opentelemetry.io/collector/processor/processortest v0.119.0 // indirect
Expand Down Expand Up @@ -617,7 +618,6 @@ require (
go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/oauth2 v0.24.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
Expand Down
138 changes: 119 additions & 19 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"sync/atomic"
"time"

"github.com/elastic/elastic-agent/internal/pkg/otel/configtranslate"

"go.opentelemetry.io/collector/component/componentstatus"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
Expand Down Expand Up @@ -217,6 +219,12 @@ type Coordinator struct {

otelMgr OTelManager
otelCfg *confmap.Conf
// the final config sent to the manager, contains both config from hybrid mode and from components
finalOtelCfg *confmap.Conf

// This variable controls whether we run supported components in the Otel manager instead of the runtime manager.
// It's a temporary measure until we decide exactly how we want to control where specific components run.
runComponentsInOtelManager bool

caps capabilities.Capabilities
modifiers []ComponentsModifier
Expand Down Expand Up @@ -384,21 +392,22 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.
LogLevel: logLevel,
}
c := &Coordinator{
logger: logger,
cfg: cfg,
agentInfo: agentInfo,
isManaged: isManaged,
specs: specs,
reexecMgr: reexecMgr,
upgradeMgr: upgradeMgr,
monitorMgr: monitorMgr,
runtimeMgr: runtimeMgr,
configMgr: configMgr,
varsMgr: varsMgr,
otelMgr: otelMgr,
caps: caps,
modifiers: modifiers,
state: state,
logger: logger,
cfg: cfg,
agentInfo: agentInfo,
isManaged: isManaged,
specs: specs,
reexecMgr: reexecMgr,
upgradeMgr: upgradeMgr,
monitorMgr: monitorMgr,
runtimeMgr: runtimeMgr,
configMgr: configMgr,
varsMgr: varsMgr,
otelMgr: otelMgr,
runComponentsInOtelManager: false, // change this to run supported components in the Otel manager
caps: caps,
modifiers: modifiers,
state: state,
// Note: the uses of a buffered input channel in our broadcaster (the
// third parameter to broadcaster.New) means that it is possible for
// immediately adjacent writes/reads not to match, e.g.:
Expand Down Expand Up @@ -775,7 +784,7 @@ func (c *Coordinator) Run(ctx context.Context) error {
// information about the state of the Elastic Agent.
// Called by external goroutines.
func (c *Coordinator) DiagnosticHooks() diagnostics.Hooks {
return diagnostics.Hooks{
hooks := diagnostics.Hooks{
{
Name: "agent-info",
Filename: "agent-info.yaml",
Expand Down Expand Up @@ -1016,6 +1025,26 @@ func (c *Coordinator) DiagnosticHooks() diagnostics.Hooks {
},
},
}
if c.runComponentsInOtelManager {
otelComponentHook := diagnostics.Hook{
Name: "otel-final",
Filename: "otel-final.yaml",
Description: "Final otel configuration used by the Elastic Agent. Includes hybrid mode config and component config.",
ContentType: "application/yaml",
Hook: func(_ context.Context) []byte {
if c.finalOtelCfg == nil {
return []byte("no active OTel configuration")
}
o, err := yaml.Marshal(c.finalOtelCfg.ToStringMap())
if err != nil {
return []byte(fmt.Sprintf("error: failed to convert to yaml: %v", err))
}
return o
},
}
hooks = append(hooks, otelComponentHook)
}
return hooks
}

// runner performs the actual work of running all the managers.
Expand Down Expand Up @@ -1227,7 +1256,6 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) {
func (c *Coordinator) processConfig(ctx context.Context, cfg *config.Config) (err error) {
if c.otelMgr != nil {
c.otelCfg = cfg.OTel
c.otelMgr.Update(cfg.OTel)
}
return c.processConfigAgent(ctx, cfg)
}
Expand Down Expand Up @@ -1413,17 +1441,89 @@ func (c *Coordinator) refreshComponentModel(ctx context.Context) (err error) {
c.logger.Debugf("Continue with missing \"signed\" properties: %v", err)
}

model := component.Model{
model := &component.Model{
Components: c.componentModel,
Signed: signed,
}

c.logger.Info("Updating running component model")
c.logger.With("components", model.Components).Debug("Updating running component model")
c.runtimeMgr.Update(model)
return c.updateManagersWithConfig(model)
}

// updateManagersWithConfig updates runtime managers with the component model and config.
// Components may be sent to different runtimes depending on various criteria.
func (c *Coordinator) updateManagersWithConfig(model *component.Model) error {
runtimeModel, otelModel := c.splitModelBetweenManagers(model)
c.logger.With("components", runtimeModel.Components).Debug("Updating runtime manager model")
c.runtimeMgr.Update(*runtimeModel)
return c.updateOtelManagerConfig(otelModel)
}

// updateOtelManagerConfig updates the otel collector configuration for the otel manager. It assembles this configuration
// from the component model passed in and from the hybrid-mode otel config set on the Coordinator.
func (c *Coordinator) updateOtelManagerConfig(model *component.Model) error {
finalOtelCfg := confmap.New()
var componentOtelCfg *confmap.Conf
if len(model.Components) > 0 {
var err error
c.logger.With("components", model.Components).Debug("Updating otel manager model")
componentOtelCfg, err = configtranslate.GetOtelConfig(model, c.agentInfo)
if err != nil {
c.logger.Errorf("failed to generate otel config: %v", err)
}
}
if componentOtelCfg != nil {
err := finalOtelCfg.Merge(componentOtelCfg)
if err != nil {
c.logger.Error("failed to merge otel config: %v", err)
}
}

if c.otelCfg != nil {
err := finalOtelCfg.Merge(c.otelCfg)
if err != nil {
c.logger.Error("failed to merge otel config: %v", err)
}
}

if len(finalOtelCfg.AllKeys()) == 0 {
// if the config is empty, we want to send nil to the manager, so it knows to stop the collector
finalOtelCfg = nil
}

c.otelMgr.Update(finalOtelCfg)
c.finalOtelCfg = finalOtelCfg
return nil
}

// splitModelBetweenManager splits the model components between the runtime manager and the otel manager.
func (c *Coordinator) splitModelBetweenManagers(model *component.Model) (runtimeModel *component.Model, otelModel *component.Model) {
if !c.runComponentsInOtelManager {
// Runtime manager gets all the components, this is the default
otelModel = &component.Model{}
runtimeModel = model
return
}
var otelComponents, runtimeComponents []component.Component
for _, comp := range model.Components {
if configtranslate.IsComponentOtelSupported(&comp) {
otelComponents = append(otelComponents, comp)
} else {
runtimeComponents = append(runtimeComponents, comp)
}
}
otelModel = &component.Model{
Components: otelComponents,
// the signed portion of the policy is only used by Defend, so otel doesn't need it for anything
}
runtimeModel = &component.Model{
Components: runtimeComponents,
Signed: model.Signed,
}
return
}

// generateComponentModel regenerates the configuration tree and
// components from the current AST and vars and returns the result.
// Called from both the main Coordinator goroutine and from external
Expand Down
Loading

0 comments on commit eeffc0e

Please sign in to comment.