diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index cdb5b0fc36ec..071bf9bbe96a 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -362,7 +362,8 @@ /comp/checks/winregistry @DataDog/windows-products /comp/core/autodiscovery @DataDog/container-platform /comp/core/config @DataDog/agent-configuration -/comp/core/configstream @DataDog/agent-metric-pipelines @DataDog/agent-configuration +/comp/core/configstream @DataDog/agent-configuration +/comp/core/configstreamconsumer @DataDog/agent-configuration /comp/core/configsync @DataDog/agent-configuration /comp/core/flare @DataDog/agent-configuration /comp/core/gui @DataDog/agent-configuration diff --git a/BUILD.bazel b/BUILD.bazel index f96a5629952a..0ccff1c4b0b8 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -98,6 +98,9 @@ exports_files(glob( # gazelle:exclude comp/core/bundle_mock.go # gazelle:exclude comp/core/bundle_params.go # gazelle:exclude comp/core/bundle_test.go +# gazelle:exclude comp/core/configstreamconsumer/fx +# gazelle:exclude comp/core/configstreamconsumer/impl +# gazelle:exclude comp/core/configstreamconsumer/mock # gazelle:exclude comp/core/diagnose/fx # gazelle:exclude comp/core/diagnose/impl # gazelle:exclude comp/core/diagnose/local diff --git a/cmd/agent/subcommands/run/command_test.go b/cmd/agent/subcommands/run/command_test.go index 5e3012b42191..704903f6ef12 100644 --- a/cmd/agent/subcommands/run/command_test.go +++ b/cmd/agent/subcommands/run/command_test.go @@ -43,11 +43,11 @@ func newGlobalParamsTest(t *testing.T) *command.GlobalParams { // which lead to build: // - config.Component which requires a valid datadog.yaml // - hostname.Component which requires a valid hostname - config := path.Join(t.TempDir(), "datadog.yaml") - err := os.WriteFile(config, []byte("hostname: test"), 0644) + configPath := path.Join(t.TempDir(), "datadog.yaml") + err := os.WriteFile(configPath, []byte("hostname: test"), 0644) require.NoError(t, err) return &command.GlobalParams{ - ConfFilePath: config, + 
ConfFilePath: configPath, } } diff --git a/comp/README.md b/comp/README.md index 1d73b73de2e1..3cee2a46926d 100644 --- a/comp/README.md +++ b/comp/README.md @@ -118,10 +118,16 @@ component temporarily wraps pkg/config. ### [comp/core/configstream](https://pkg.go.dev/github.com/DataDog/datadog-agent/comp/core/configstream) -*Datadog Team*: agent-metric-pipelines agent-configuration +*Datadog Team*: agent-configuration Package configstream implements a component to handle streaming configuration events to subscribers. +### [comp/core/configstreamconsumer](https://pkg.go.dev/github.com/DataDog/datadog-agent/comp/core/configstreamconsumer) + +*Datadog Team*: agent-configuration + +Package configstreamconsumer implements a component that consumes config streams from the core agent. + ### [comp/core/configsync](https://pkg.go.dev/github.com/DataDog/datadog-agent/comp/core/configsync) *Datadog Team*: agent-configuration diff --git a/comp/core/configstream/README.md b/comp/core/configstream/README.md index 224dc0eb3280..ae6e013575e5 100644 --- a/comp/core/configstream/README.md +++ b/comp/core/configstream/README.md @@ -91,17 +91,18 @@ message ConfigSetting { ## Configuration -The config stream is automatically enabled when the component is loaded. No explicit configuration required. +The config stream component always runs. Individual connections are RAR-gated: the caller must be a registered remote agent. -**Optional settings:** +**Settings:** ```yaml # datadog.yaml remote_agent: registry: - enabled: true # Required for RAR-gated authorization + enabled: true # Required for RAR authorization; remote agents must register before subscribing configstream: - enabled: true # Required to use the configstreamconsumer - sleep_interval: 10s # Backoff on non-terminal errors (default: 10s) + sleep_interval: 10s # Backoff on non-terminal send errors (default: 10s) + consumer: + enabled: false # Default: false. 
Set to true on Go-based remote agents to enable the configstreamconsumer component. agent_ipc: # Maximum size of a single gRPC message accepted/sent by the agent's gRPC # server. Configstream snapshots can be large (the entire flattened agent @@ -288,6 +289,6 @@ log_level: debug ## Contact -- **Teams:** agent-metric-pipelines, agent-configuration +- **Team:** agent-configuration - **Component:** `comp/core/configstream` - **Test Client:** `cmd/config-stream-client` diff --git a/comp/core/configstream/def/component.go b/comp/core/configstream/def/component.go index 8b29806b3a2b..360ea947ef9b 100644 --- a/comp/core/configstream/def/component.go +++ b/comp/core/configstream/def/component.go @@ -10,7 +10,7 @@ import ( pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core" ) -// team: agent-metric-pipelines agent-configuration +// team: agent-configuration // Component is the component type. type Component interface { diff --git a/comp/core/configstream/impl/configstream.go b/comp/core/configstream/impl/configstream.go index 5a6a61269231..d2738d4aeff4 100644 --- a/comp/core/configstream/impl/configstream.go +++ b/comp/core/configstream/impl/configstream.go @@ -19,7 +19,7 @@ import ( "github.com/DataDog/datadog-agent/comp/core/config" configstream "github.com/DataDog/datadog-agent/comp/core/configstream/def" log "github.com/DataDog/datadog-agent/comp/core/log/def" - "github.com/DataDog/datadog-agent/comp/core/telemetry/def" + telemetry "github.com/DataDog/datadog-agent/comp/core/telemetry/def" compdef "github.com/DataDog/datadog-agent/comp/def" "github.com/DataDog/datadog-agent/pkg/config/model" pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core" @@ -67,7 +67,7 @@ type subscription struct { } // NewComponent creates a new configstream component. 
-func NewComponent(reqs Requires) Provides { +func NewComponent(reqs Requires) (Provides, error) { cs := &configStream{ config: reqs.Config, log: reqs.Log, @@ -100,7 +100,7 @@ func NewComponent(reqs Requires) Provides { return Provides{ Comp: cs, - } + }, nil } // Subscribe returns a channel that streams configuration events, starting with a snapshot. diff --git a/comp/core/configstream/impl/configstream_test.go b/comp/core/configstream/impl/configstream_test.go index 84cb64d493ff..90c55a38d83e 100644 --- a/comp/core/configstream/impl/configstream_test.go +++ b/comp/core/configstream/impl/configstream_test.go @@ -281,7 +281,21 @@ done: } } -// newConfigStreamForTest creates a config stream for testing without lifecycle +func TestNewComponentNoError(t *testing.T) { + mockLog := logmock.New(t) + telemetryComp := telemetrynoops.GetCompatComponent() + cfg := configmock.New(t) + _, err := NewComponent(Requires{ + Lifecycle: compdef.NewTestLifecycle(t), + Config: cfg, + Log: mockLog, + Telemetry: telemetryComp, + }) + require.NoError(t, err) +} + +// newConfigStreamForTest creates a config stream for testing without lifecycle. +// It manually starts the run loop since the test lifecycle does not execute hooks. 
func newConfigStreamForTest(t *testing.T, cfg config.Component, logger log.Component) *configStream { telemetryComp := telemetrynoops.GetCompatComponent() reqs := Requires{ @@ -290,10 +304,9 @@ func newConfigStreamForTest(t *testing.T, cfg config.Component, logger log.Compo Log: logger, Telemetry: telemetryComp, } - provides := NewComponent(reqs) + provides, err := NewComponent(reqs) + require.NoError(t, err) - // Extract the underlying configStream - // and start the run loop manually since lifecycle hooks are not executed cs := provides.Comp.(*configStream) go cs.run() @@ -342,10 +355,11 @@ func buildComponent(t *testing.T) (Provides, *configInterceptor) { Telemetry: telemetrynoops.GetCompatComponent(), } - provides := NewComponent(reqs) + provides, err := NewComponent(reqs) + require.NoError(t, err) // Start the component's run loop - err := lc.Start(context.Background()) + err = lc.Start(context.Background()) require.NoError(t, err) t.Cleanup(func() { diff --git a/comp/core/configstream/server/server_test.go b/comp/core/configstream/server/server_test.go index 4a0411ee1ce5..f634f120eb96 100644 --- a/comp/core/configstream/server/server_test.go +++ b/comp/core/configstream/server/server_test.go @@ -183,6 +183,7 @@ func TestRARAuthorization(t *testing.T) { t.Run("rejects request with missing metadata", func(t *testing.T) { cfg := configmock.New(t) + cfg.Set("remote_agent.configstream.sleep_interval", 10*time.Millisecond, model.SourceAgentRuntime) comp := &mockComp{} mockRAR := &mockRemoteAgentRegistry{} server := NewServer(cfg, comp, mockRAR) @@ -198,6 +199,7 @@ func TestRARAuthorization(t *testing.T) { t.Run("rejects request with missing session_id in metadata", func(t *testing.T) { cfg := configmock.New(t) + cfg.Set("remote_agent.configstream.sleep_interval", 10*time.Millisecond, model.SourceAgentRuntime) comp := &mockComp{} mockRAR := &mockRemoteAgentRegistry{} server := NewServer(cfg, comp, mockRAR) @@ -215,6 +217,7 @@ func TestRARAuthorization(t *testing.T) 
{ t.Run("rejects request with empty session_id", func(t *testing.T) { cfg := configmock.New(t) + cfg.Set("remote_agent.configstream.sleep_interval", 10*time.Millisecond, model.SourceAgentRuntime) comp := &mockComp{} mockRAR := &mockRemoteAgentRegistry{} server := NewServer(cfg, comp, mockRAR) diff --git a/comp/core/configstreamconsumer/README.md b/comp/core/configstreamconsumer/README.md new file mode 100644 index 000000000000..2606e243e735 --- /dev/null +++ b/comp/core/configstreamconsumer/README.md @@ -0,0 +1,143 @@ +# Config Stream Consumer Component + +A shared Go library for remote agents (system-probe, trace-agent, process-agent, etc.) to consume configuration streams from the core Datadog Agent. It provides gRPC connection management, snapshot gating, and ordered config application, writing received settings directly into the agent's `config.Component`. + +## Overview + +- **Real-time config**: Receive full snapshot then incremental updates from the core agent over gRPC. +- **RAR-gated**: Only registered remote agents can subscribe; session ID is required (fixed or via `SessionIDProvider`). +- **Readiness gating**: `Start` blocks until the first config snapshot is received, aborting startup if `Params.ReadyTimeout` (default: 60s) is exceeded. +- **Single source of truth**: Streamed config is written into `config.Component` via `model.Writer`. Callers read config through `config.Component` directly — not through this component. +- **Ordered updates**: Sequential application by sequence ID; stale updates dropped, discontinuities trigger resync. +- **Restart safety**: `lastSeqID` is never reset on reconnect. If the core agent restarts and its sequence counter resets, the consumer logs an error and refuses the new snapshot until the sub-process itself restarts. +- **Telemetry**: Metrics for time-to-first-snapshot, reconnects, sequence ID, and dropped updates. 
+ +## Architecture + +Producer (core agent) and consumer (remote agents) communicate over the same gRPC contract: + +``` +┌─────────────────────────┐ ┌─────────────────────────┐ +│ Core Agent Process │ │ Remote Agent Process │ +│ │ │ (e.g. system-probe) │ +│ ┌──────────────────┐ │ │ ┌──────────────────┐ │ +│ │ configstream │ │ gRPC │ │ configstream- │ │ +│ │ (producer) │◄──┼──────────┼─►│ consumer │ │ +│ │ │ │ stream │ │ │ │ +│ └──────────────────┘ │ │ └──────────────────┘ │ +└─────────────────────────┘ └─────────────────────────┘ +``` + +**Flow:** + +1. Remote agent registers with RAR and obtains `session_id` (or supplies it via `SessionIDProvider`). +2. Consumer connects to core agent and calls `StreamConfigEvents` with `session_id` in gRPC metadata. +3. Core agent validates the session and sends an initial snapshot, then streams incremental updates. +4. Consumer applies snapshot/updates in order and writes them into `config.Component` via `model.Writer`. + +See `../configstream/README.md` for the producer side and the gRPC/protobuf contract. + +## Quick Start + +Supply **either** a fixed `SessionID` **or** a `SessionIDProvider` (e.g. from the remote agent component). The consumer uses the provider at connect time so RAR can register first. + +## Wiring guide + +### Only include the module when the feature is enabled + +Including `configstreamconsumerfx.Module()` when config streaming is disabled will abort FX startup. Gate on `remote_agent.configstream.consumer.enabled` before building FX options: + +```go +if cfg.GetBool("remote_agent.configstream.consumer.enabled") { + opts = append(opts, configstreamFxOptions()) +} +``` + +### Full example + +```go +func configstreamFxOptions() fx.Option { + return fx.Options( + // Bridge config.Component to model.Writer so the consumer can write streamed config. + fx.Provide(func(c config.Component) model.Writer { return c }), + + // Provide the SessionIDProvider from the remote agent (blocks until RAR registration). 
+ fx.Provide(func(ra remoteagent.Component) configstreamconsumerimpl.SessionIDProvider { + if ra == nil { + return nil + } + if p, ok := ra.(configstreamconsumerimpl.SessionIDProvider); ok { + return p + } + return nil + }), + + // Provide Params — only reached when configstream is known to be enabled. + fx.Provide(func(c config.Component, deps struct { + fx.In + SessionProvider configstreamconsumerimpl.SessionIDProvider `optional:"true"` + }) *configstreamconsumerimpl.Params { + host := c.GetString("cmd_host") + port := c.GetInt("cmd_port") + if port <= 0 { + port = 5001 + } + return &configstreamconsumerimpl.Params{ + ClientName: "my-agent", + CoreAgentAddress: net.JoinHostPort(host, strconv.Itoa(port)), + SessionIDProvider: deps.SessionProvider, + } + }), + + configstreamconsumerfx.Module(), + // Force instantiation so Start runs and blocks until the first snapshot. + fx.Invoke(func(_ configstreamconsumer.Component) {}), + ) +} +``` + +## Requirements + +- **Core agent**: configstream component (always on by default) and RAR enabled (`remote_agent.registry.enabled: true`). +- **Consumer opt-in**: Set `remote_agent.configstream.consumer.enabled: true` on the remote agent to enable this component. +- **RAR**: Remote agent must register with RAR before subscribing; pass `session_id` via gRPC metadata (supply fixed `SessionID` or `SessionIDProvider` with `WaitSessionID(ctx) (string, error)`). +- **IPC**: mTLS and auth token for gRPC (same as other core-agent IPC). +- **`model.Writer`**: `config.Component` must be explicitly provided as `model.Writer` in the same FX scope. Streamed settings are written using the same source the core agent assigned (e.g. `SourceDefault`, `SourceFile`, `SourceEnvVar`), preserving the original priority semantics on the remote process. 
+ +## Telemetry + +| Metric | Type | Description | +|--------|------|-------------| +| `configstream_consumer.time_to_first_snapshot_seconds` | Gauge | Time to receive first snapshot | +| `configstream_consumer.reconnect_count` | Counter | Stream reconnections | +| `configstream_consumer.last_sequence_id` | Gauge | Last received config sequence ID | +| `configstream_consumer.dropped_stale_updates` | Counter | Stale updates dropped | + +## Testing + +### Manual testing with system-probe + +1. Start the core agent with RAR and config stream enabled. +2. Set `cmd_host` / `cmd_port` in the config used by system-probe. +3. Start system-probe. You should see: + - `Waiting for initial configuration from core agent...` + - After snapshot: `Initial configuration received from core agent. Starting system-probe.` +4. If the core agent is down or the stream never sends a snapshot, system-probe exits with: `waiting for initial config snapshot: context deadline exceeded`. + +## Troubleshooting + +- **session_id required in metadata** + Ensure the remote agent registers with RAR first and that the consumer is given either a fixed `SessionID` or a `SessionIDProvider` that returns the session ID. + +- **Startup timeout (no snapshot received within `ReadyTimeout`)** + Core agent must be running, config stream enabled, and RAR returning a valid session. Check core agent logs for config stream and RAR errors. + +- **"core agent may have restarted" error in logs** + The consumer received a snapshot with a lower sequence ID than its last-known value, indicating the core agent restarted. Restart the sub-process to accept the new configuration. + +## Related documentation + +- **Producer**: `../configstream/README.md` — core agent config streaming service and gRPC contract. +- **Test client**: `cmd/config-stream-client/README.md` — standalone client for end-to-end testing. 
+ +**Team**: agent-configuration diff --git a/comp/core/configstreamconsumer/def/BUILD.bazel b/comp/core/configstreamconsumer/def/BUILD.bazel new file mode 100644 index 000000000000..8430fe781a88 --- /dev/null +++ b/comp/core/configstreamconsumer/def/BUILD.bazel @@ -0,0 +1,8 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "def", + srcs = ["component.go"], + importpath = "github.com/DataDog/datadog-agent/comp/core/configstreamconsumer/def", + visibility = ["//visibility:public"], +) diff --git a/comp/core/configstreamconsumer/def/component.go b/comp/core/configstreamconsumer/def/component.go new file mode 100644 index 000000000000..aba7e492bc63 --- /dev/null +++ b/comp/core/configstreamconsumer/def/component.go @@ -0,0 +1,43 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. + +// Package configstreamconsumer implements a component that consumes config streams from the core agent. +// +// team: agent-configuration +package configstreamconsumer + +import ( + "context" + "time" +) + +// SessionIDProvider supplies the RAR session ID, typically after registration completes. +// When set, the consumer will call WaitSessionID at connect time instead of using Params.SessionID. +type SessionIDProvider interface { + WaitSessionID(ctx context.Context) (string, error) +} + +// Params defines the parameters for the configstreamconsumer component +type Params struct { + // ClientName is the identity of this remote agent (e.g., "system-probe", "trace-agent") + ClientName string + // CoreAgentAddress is the address of the core agent IPC endpoint + CoreAgentAddress string + // SessionID is the RAR session ID for authorization. Required if SessionIDProvider is nil. + SessionID string + // SessionIDProvider supplies the session ID at connect time (e.g. 
from remote agent component). + // When set, SessionID may be empty; the consumer will block on WaitSessionID before connecting. + SessionIDProvider SessionIDProvider + // ReadyTimeout is how long OnStart blocks waiting for the first config snapshot before + // returning an error and aborting startup. Defaults to 60s when zero. + ReadyTimeout time.Duration +} + +// Component is the config stream consumer component interface. +// Its sole purpose is to receive configuration from the core agent stream and write it +// into the local config.Component via the model.Writer provided at construction. +// Callers that need to read config or subscribe to changes should use config.Component directly. +// Readiness is guaranteed by the FX lifecycle: start blocks until the first snapshot is received. +type Component interface{} diff --git a/comp/core/configstreamconsumer/fx/fx.go b/comp/core/configstreamconsumer/fx/fx.go new file mode 100644 index 000000000000..618afb61f90a --- /dev/null +++ b/comp/core/configstreamconsumer/fx/fx.go @@ -0,0 +1,21 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. 
+ +// Package fx provides the fx module for the configstreamconsumer component +package fx + +import ( + configstreamconsumer "github.com/DataDog/datadog-agent/comp/core/configstreamconsumer/def" + configstreamconsumerimpl "github.com/DataDog/datadog-agent/comp/core/configstreamconsumer/impl" + "github.com/DataDog/datadog-agent/pkg/util/fxutil" +) + +// Module defines the fx options for this component +func Module() fxutil.Module { + return fxutil.Component( + fxutil.ProvideComponentConstructor(configstreamconsumerimpl.NewComponent), + fxutil.ProvideOptional[configstreamconsumer.Component](), + ) +} diff --git a/comp/core/configstreamconsumer/impl/consumer.go b/comp/core/configstreamconsumer/impl/consumer.go new file mode 100644 index 000000000000..c27dee8d8270 --- /dev/null +++ b/comp/core/configstreamconsumer/impl/consumer.go @@ -0,0 +1,403 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. 
+ +// Package configstreamconsumerimpl implements the configstreamconsumer component +package configstreamconsumerimpl + +import ( + "context" + "errors" + "fmt" + "io" + "math" + "sync" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/types/known/structpb" + + configstreamconsumer "github.com/DataDog/datadog-agent/comp/core/configstreamconsumer/def" + ipc "github.com/DataDog/datadog-agent/comp/core/ipc/def" + log "github.com/DataDog/datadog-agent/comp/core/log/def" + "github.com/DataDog/datadog-agent/comp/core/telemetry/def" + compdef "github.com/DataDog/datadog-agent/comp/def" + "github.com/DataDog/datadog-agent/pkg/config/model" + pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core" + grpcutil "github.com/DataDog/datadog-agent/pkg/util/grpc" +) + +// Requires defines the dependencies for the configstreamconsumer component +type Requires struct { + compdef.In + + Lifecycle compdef.Lifecycle + Log log.Component + IPC ipc.Component + Telemetry telemetry.Component + ConfigWriter model.Writer + Params configstreamconsumer.Params +} + +// Provides defines the output of the configstreamconsumer component +type Provides struct { + compdef.Out + + Comp configstreamconsumer.Component +} + +// consumer implements the configstreamconsumer.Component interface +type consumer struct { + log log.Component + ipc ipc.Component + telemetry telemetry.Component + params configstreamconsumer.Params + configWriter model.Writer // writes streamed config into the local config.Component + + conn *grpc.ClientConn + client pb.AgentSecureClient + stream pb.AgentSecure_StreamConfigEventsClient + streamLock sync.Mutex + + effectiveConfig map[string]interface{} + configLock sync.RWMutex + lastSeqID int32 + + ready bool + readyCh chan struct{} + readyOnce sync.Once + + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + startTime time.Time + + timeToFirstSnapshot telemetry.Gauge 
+ streamReconnectCount telemetry.Counter + lastSeqIDMetric telemetry.Gauge + droppedStaleUpdates telemetry.Counter +} + +// NewComponent creates a new configstreamconsumer component +func NewComponent(reqs Requires) (Provides, error) { + p := reqs.Params + if p.ClientName == "" { + return Provides{}, errors.New("ClientName is required") + } + if p.CoreAgentAddress == "" { + return Provides{}, errors.New("CoreAgentAddress is required") + } + if p.SessionID == "" && p.SessionIDProvider == nil { + return Provides{}, fmt.Errorf("configstreamconsumer: neither SessionID nor SessionIDProvider set for client %s", p.ClientName) + } + if p.SessionID != "" && p.SessionIDProvider != nil { + return Provides{}, errors.New("exactly one of SessionID or SessionIDProvider must be set") + } + + c := &consumer{ + log: reqs.Log, + ipc: reqs.IPC, + telemetry: reqs.Telemetry, + params: p, + configWriter: reqs.ConfigWriter, + effectiveConfig: make(map[string]interface{}), + readyCh: make(chan struct{}), + } + + c.initMetrics() + + // Register lifecycle hooks + reqs.Lifecycle.Append(compdef.Hook{ + OnStart: c.start, + OnStop: c.stop, + }) + + return Provides{Comp: c}, nil +} + +// start initiates the config stream connection and blocks until the first config snapshot is +// received. Blocking here ensures all components initialized after this one (and the binary's +// run function) see a fully-populated config. Returns an error if the snapshot is not received +// within ReadyTimeout (default 60s), which aborts FX startup. +func (c *consumer) start(_ context.Context) error { + // Use context.Background() so the stream lifetime is not bounded by the + // Fx startup context, which expires after app.StartTimeout (~5 minutes). 
+ c.ctx, c.cancel = context.WithCancel(context.Background()) + c.startTime = time.Now() + + c.wg.Add(1) + go c.streamLoop() + + timeout := c.params.ReadyTimeout + if timeout == 0 { + timeout = 60 * time.Second + } + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + c.log.Infof("Waiting for initial configuration from core agent (timeout: %v)...", timeout) + if err := c.waitReady(ctx); err != nil { + c.cancel() + c.wg.Wait() + return fmt.Errorf("waiting for initial config snapshot: %w", err) + } + c.log.Infof("Initial configuration received from core agent.") + return nil +} + +// stop gracefully shuts down the consumer +func (c *consumer) stop(_ context.Context) error { + c.cancel() + c.streamLock.Lock() + if c.stream != nil { + _ = c.stream.CloseSend() + } + if c.conn != nil { + _ = c.conn.Close() + } + c.streamLock.Unlock() + c.wg.Wait() + return nil +} + +// waitReady blocks until the first config snapshot has been received and applied +func (c *consumer) waitReady(ctx context.Context) error { + select { + case <-c.readyCh: + return nil + case <-ctx.Done(): + return fmt.Errorf("timed out waiting for config snapshot: %w", ctx.Err()) + } +} + +// streamLoop manages the lifecycle of the config stream connection +func (c *consumer) streamLoop() { + defer c.wg.Done() + + for { + select { + case <-c.ctx.Done(): + return + default: + } + + // Establish connection and stream + if err := c.connectAndStream(); err != nil { + if err == context.Canceled || c.ctx.Err() != nil { + return + } + c.log.Warnf("Config stream error: %v, reconnecting...", err) + c.streamReconnectCount.Inc() + + select { + case <-c.ctx.Done(): + return + case <-time.After(5 * time.Second): + continue + } + } + } +} + +// connectAndStream establishes a gRPC connection and processes the config stream +func (c *consumer) connectAndStream() error { + conn, err := grpc.NewClient(c.params.CoreAgentAddress, + 
grpc.WithTransportCredentials(credentials.NewTLS(c.ipc.GetTLSClientConfig())), + grpc.WithPerRPCCredentials(grpcutil.NewBearerTokenAuth(c.ipc.GetAuthToken())), + ) + if err != nil { + return fmt.Errorf("failed to connect to core agent: %w", err) + } + // Ensure conn is closed when this invocation exits (EOF, error, or context cancel). + defer conn.Close() + + c.streamLock.Lock() + c.conn = conn + c.client = pb.NewAgentSecureClient(conn) + c.streamLock.Unlock() + + sessionID := c.params.SessionID + if c.params.SessionIDProvider != nil { + var err error + sessionID, err = c.params.SessionIDProvider.WaitSessionID(c.ctx) + if err != nil { + return fmt.Errorf("waiting for session ID: %w", err) + } + } + // Add session_id to gRPC metadata + md := metadata.New(map[string]string{"session_id": sessionID}) + ctxWithMetadata := metadata.NewOutgoingContext(c.ctx, md) + + // Start streaming + stream, err := c.client.StreamConfigEvents(ctxWithMetadata, &pb.ConfigStreamRequest{ + Name: c.params.ClientName, + }) + if err != nil { + return fmt.Errorf("failed to start config stream: %w", err) + } + + c.streamLock.Lock() + c.stream = stream + c.streamLock.Unlock() + + c.log.Infof("Config stream established for client %s", c.params.ClientName) + + // Process stream events + for { + event, err := stream.Recv() + if err != nil { + if err == io.EOF { + c.log.Info("Config stream closed by server") + return nil + } + return fmt.Errorf("stream receive error: %w", err) + } + + if err := c.handleConfigEvent(event); err != nil { + c.log.Errorf("Failed to handle config event: %v", err) + } + } +} + +// handleConfigEvent processes a single config event from the stream +func (c *consumer) handleConfigEvent(event *pb.ConfigEvent) error { + switch e := event.Event.(type) { + case *pb.ConfigEvent_Snapshot: + return c.applySnapshot(e.Snapshot) + case *pb.ConfigEvent_Update: + return c.applyUpdate(e.Update) + default: + return fmt.Errorf("unknown event type: %T", event.Event) + } +} + +// 
applySnapshot applies a complete config snapshot +func (c *consumer) applySnapshot(snapshot *pb.ConfigSnapshot) error { + // Reject out-of-order or server-restart snapshots. lastSeqID is never reset between + // reconnects: if the server restarts and its sequence counter resets to a lower value, + // we refuse the new snapshot and log an error. Sub-processes are expected to restart + // when the core agent restarts. + if snapshot.SequenceId <= c.lastSeqID { + c.log.Errorf("Received snapshot with seq_id %d <= current %d; the core agent may have restarted. "+ + "This sub-process must be restarted to accept a new configuration.", snapshot.SequenceId, c.lastSeqID) + c.droppedStaleUpdates.Inc() + return nil + } + + c.log.Infof("Applying config snapshot (seq_id: %d, settings: %d)", snapshot.SequenceId, len(snapshot.Settings)) + + // Convert protobuf settings to Go map + newConfig := make(map[string]interface{}, len(snapshot.Settings)) + for _, setting := range snapshot.Settings { + newConfig[setting.Key] = pbValueToGo(setting.Value) + } + + // Update effective config atomically + c.configLock.Lock() + c.effectiveConfig = newConfig + c.lastSeqID = snapshot.SequenceId + c.configLock.Unlock() + + c.lastSeqIDMetric.Set(float64(snapshot.SequenceId)) + + if c.configWriter != nil { + for _, setting := range snapshot.Settings { + c.configWriter.Set(setting.Key, newConfig[setting.Key], model.Source(setting.Source)) + } + } + + // Signal readiness once the snapshot is fully applied and config mirrored. 
+ c.readyOnce.Do(func() { + close(c.readyCh) + c.ready = true + duration := time.Since(c.startTime) + c.timeToFirstSnapshot.Set(duration.Seconds()) + c.log.Infof("Received first config snapshot after %v", duration) + }) + + return nil +} + +// applyUpdate applies a single config update +func (c *consumer) applyUpdate(update *pb.ConfigUpdate) error { + // Check for stale update + if update.SequenceId <= c.lastSeqID { + c.log.Warnf("Ignoring stale update (seq_id: %d <= %d)", update.SequenceId, c.lastSeqID) + c.droppedStaleUpdates.Inc() + return nil + } + + // Detect discontinuity: trigger reconnect so the server sends a fresh snapshot. + if update.SequenceId != c.lastSeqID+1 { + return fmt.Errorf("seq_id discontinuity: expected %d, got %d", c.lastSeqID+1, update.SequenceId) + } + + c.log.Debugf("Applying config update (seq_id: %d, key: %s)", update.SequenceId, update.Setting.Key) + + newValue := pbValueToGo(update.Setting.Value) + + c.configLock.Lock() + c.effectiveConfig[update.Setting.Key] = newValue + c.lastSeqID = update.SequenceId + c.configLock.Unlock() + + c.lastSeqIDMetric.Set(float64(update.SequenceId)) + + if c.configWriter != nil { + c.configWriter.Set(update.Setting.Key, newValue, model.Source(update.Setting.Source)) + } + + return nil +} + +// initMetrics initializes telemetry metrics +func (c *consumer) initMetrics() { + c.timeToFirstSnapshot = c.telemetry.NewGauge( + "configstream_consumer", + "time_to_first_snapshot_seconds", + []string{}, + "Time taken to receive the first config snapshot", + ) + c.streamReconnectCount = c.telemetry.NewCounter( + "configstream_consumer", + "reconnect_count", + []string{}, + "Number of times the config stream has reconnected", + ) + c.lastSeqIDMetric = c.telemetry.NewGauge( + "configstream_consumer", + "last_sequence_id", + []string{}, + "Last received config sequence ID", + ) + c.droppedStaleUpdates = c.telemetry.NewCounter( + "configstream_consumer", + "dropped_stale_updates", + []string{}, + "Number of stale config 
updates dropped", + ) +} + +// pbValueToGo converts a protobuf Value to a Go value +func pbValueToGo(pbValue *structpb.Value) interface{} { + if pbValue == nil { + return nil + } + + // AsInterface() converts all numbers to float64 (JSON semantics). + // Try to preserve integer types when the float has no fractional part. + result := pbValue.AsInterface() + + if f, ok := result.(float64); ok { + // Only convert integers within float64's exact range (2^53); beyond that, + // float64 can't represent consecutive integers, so int64 conversion loses precision. + const maxExact float64 = 1 << 53 + if f >= -maxExact && f <= maxExact && f == math.Trunc(f) { + return int64(f) + } + } + + return result +} diff --git a/comp/core/configstreamconsumer/impl/consumer_test.go b/comp/core/configstreamconsumer/impl/consumer_test.go new file mode 100644 index 000000000000..45078b8dc2ad --- /dev/null +++ b/comp/core/configstreamconsumer/impl/consumer_test.go @@ -0,0 +1,444 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. 
+ +//go:build test + +package configstreamconsumerimpl + +import ( + "context" + "fmt" + "io" + "net" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/structpb" + + configstreamconsumer "github.com/DataDog/datadog-agent/comp/core/configstreamconsumer/def" + ipcmock "github.com/DataDog/datadog-agent/comp/core/ipc/mock" + logmock "github.com/DataDog/datadog-agent/comp/core/log/mock" + "github.com/DataDog/datadog-agent/comp/core/telemetry/impl" + pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core" +) + +// mockConfigStreamServer is a mock gRPC server for testing +type mockConfigStreamServer struct { + pb.UnimplementedAgentSecureServer + events chan *pb.ConfigEvent + closed bool +} + +func (m *mockConfigStreamServer) StreamConfigEvents(_ *pb.ConfigStreamRequest, stream pb.AgentSecure_StreamConfigEventsServer) error { + // Extract session_id from gRPC metadata + md, ok := metadata.FromIncomingContext(stream.Context()) + if !ok { + return status.Error(codes.Unauthenticated, "missing gRPC metadata") + } + + sessionIDs := md.Get("session_id") + if len(sessionIDs) == 0 || sessionIDs[0] == "" { + return status.Error(codes.Unauthenticated, "session_id required in metadata") + } + + for event := range m.events { + if m.closed { + return io.EOF + } + if err := stream.Send(event); err != nil { + return err + } + } + return nil +} + +// setupTestServer creates a test gRPC server and returns the server, address, and event channel +func setupTestServer(t *testing.T, ipcComp *ipcmock.IPCMock) (*grpc.Server, string, chan *pb.ConfigEvent, func()) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + opts := []grpc.ServerOption{ + grpc.Creds(credentials.NewTLS(ipcComp.GetTLSServerConfig())), 
+ } + grpcServer := grpc.NewServer(opts...) + mockServer := &mockConfigStreamServer{ + events: make(chan *pb.ConfigEvent, 100), + } + pb.RegisterAgentSecureServer(grpcServer, mockServer) + + go func() { + _ = grpcServer.Serve(listener) + }() + + cleanup := func() { + mockServer.closed = true + close(mockServer.events) + grpcServer.Stop() + listener.Close() + } + + return grpcServer, listener.Addr().String(), mockServer.events, cleanup +} + +// createTestConsumer creates a consumer for testing +func createTestConsumer(t *testing.T, serverAddr string, ipcComp *ipcmock.IPCMock) (*consumer, func()) { + log := logmock.New(t) + telemetryComp := telemetryimpl.NewMock(t) + + c := &consumer{ + log: log, + ipc: ipcComp, + telemetry: telemetryComp, + params: configstreamconsumer.Params{ + ClientName: "test-client", + CoreAgentAddress: serverAddr, + SessionID: "test-session-123", + }, + effectiveConfig: make(map[string]interface{}), + readyCh: make(chan struct{}), + startTime: time.Now(), + } + c.initMetrics() + + cleanup := func() { + if c.cancel != nil { + c.cancel() + } + if c.stream != nil { + _ = c.stream.CloseSend() + } + if c.conn != nil { + _ = c.conn.Close() + } + } + + return c, cleanup +} + +func TestConsumerSnapshot(t *testing.T) { + ipcComp := ipcmock.New(t) + _, serverAddr, events, cleanup := setupTestServer(t, ipcComp) + defer cleanup() + + consumer, cleanupConsumer := createTestConsumer(t, serverAddr, ipcComp) + defer cleanupConsumer() + + // Send a snapshot + settings := []*pb.ConfigSetting{ + {Key: "test_string", Value: mustNewValue(t, "hello")}, + {Key: "test_int", Value: mustNewValue(t, int64(42))}, + {Key: "test_bool", Value: mustNewValue(t, true)}, + } + + events <- &pb.ConfigEvent{ + Event: &pb.ConfigEvent_Snapshot{ + Snapshot: &pb.ConfigSnapshot{ + SequenceId: 1, + Settings: settings, + }, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Start streaming in a goroutine + consumer.ctx, 
consumer.cancel = context.WithCancel(context.Background()) + go func() { + _ = consumer.connectAndStream() + }() + + err := consumer.waitReady(ctx) + require.NoError(t, err) + + // Verify config was applied to the effective config map + consumer.configLock.RLock() + defer consumer.configLock.RUnlock() + assert.Equal(t, "hello", consumer.effectiveConfig["test_string"]) + assert.Equal(t, int64(42), consumer.effectiveConfig["test_int"]) + assert.Equal(t, true, consumer.effectiveConfig["test_bool"]) + assert.Equal(t, int32(1), consumer.lastSeqID) +} + +func TestConsumerUpdates(t *testing.T) { + ipcComp := ipcmock.New(t) + _, serverAddr, events, cleanup := setupTestServer(t, ipcComp) + defer cleanup() + + consumer, cleanupConsumer := createTestConsumer(t, serverAddr, ipcComp) + defer cleanupConsumer() + + // Send initial snapshot + events <- &pb.ConfigEvent{ + Event: &pb.ConfigEvent_Snapshot{ + Snapshot: &pb.ConfigSnapshot{ + SequenceId: 1, + Settings: []*pb.ConfigSetting{ + {Key: "test_key", Value: mustNewValue(t, "initial")}, + }, + }, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Start streaming + consumer.ctx, consumer.cancel = context.WithCancel(context.Background()) + go func() { + _ = consumer.connectAndStream() + }() + + err := consumer.waitReady(ctx) + require.NoError(t, err) + + // Send an update + events <- &pb.ConfigEvent{ + Event: &pb.ConfigEvent_Update{ + Update: &pb.ConfigUpdate{ + SequenceId: 2, + Setting: &pb.ConfigSetting{ + Key: "test_key", + Value: mustNewValue(t, "updated"), + }, + }, + }, + } + + // Wait a bit for the update to be processed + time.Sleep(100 * time.Millisecond) + + consumer.configLock.RLock() + defer consumer.configLock.RUnlock() + assert.Equal(t, "updated", consumer.effectiveConfig["test_key"]) + assert.Equal(t, int32(2), consumer.lastSeqID) +} + +func TestConsumerStaleUpdates(t *testing.T) { + ipcComp := ipcmock.New(t) + _, serverAddr, events, cleanup := 
setupTestServer(t, ipcComp) + defer cleanup() + + consumer, cleanupConsumer := createTestConsumer(t, serverAddr, ipcComp) + defer cleanupConsumer() + + // Send initial snapshot + events <- &pb.ConfigEvent{ + Event: &pb.ConfigEvent_Snapshot{ + Snapshot: &pb.ConfigSnapshot{ + SequenceId: 5, + Settings: []*pb.ConfigSetting{ + {Key: "test_key", Value: mustNewValue(t, "current")}, + }, + }, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Start streaming + consumer.ctx, consumer.cancel = context.WithCancel(context.Background()) + go func() { + _ = consumer.connectAndStream() + }() + + err := consumer.waitReady(ctx) + require.NoError(t, err) + + // Send a stale update (seq_id <= current) + events <- &pb.ConfigEvent{ + Event: &pb.ConfigEvent_Update{ + Update: &pb.ConfigUpdate{ + SequenceId: 3, + Setting: &pb.ConfigSetting{ + Key: "test_key", + Value: mustNewValue(t, "stale"), + }, + }, + }, + } + + // Wait a bit + time.Sleep(100 * time.Millisecond) + + // Verify stale update was NOT applied + consumer.configLock.RLock() + defer consumer.configLock.RUnlock() + assert.Equal(t, "current", consumer.effectiveConfig["test_key"]) + assert.Equal(t, int32(5), consumer.lastSeqID) +} + +// mustNewValue creates a structpb.Value or fails the test +func mustNewValue(t *testing.T, v interface{}) *structpb.Value { + val, err := structpb.NewValue(v) + require.NoError(t, err, fmt.Sprintf("failed to create Value from %v", v)) + return val +} + +func TestConsumerAppliesUpdatesInOrder(t *testing.T) { + t.Run("Consumer can start and block until snapshot", func(t *testing.T) { + ipcComp := ipcmock.New(t) + _, serverAddr, events, cleanup := setupTestServer(t, ipcComp) + defer cleanup() + + consumer, cleanupConsumer := createTestConsumer(t, serverAddr, ipcComp) + defer cleanupConsumer() + + // Start streaming in background + consumer.ctx, consumer.cancel = context.WithCancel(context.Background()) + go func() { + _ = 
consumer.connectAndStream() + }() + + // WaitReady should block + readyCtx, readyCancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + err := consumer.waitReady(readyCtx) + readyCancel() + assert.Error(t, err, "should timeout before snapshot") + + // Send snapshot + events <- &pb.ConfigEvent{ + Event: &pb.ConfigEvent_Snapshot{ + Snapshot: &pb.ConfigSnapshot{ + SequenceId: 1, + Settings: []*pb.ConfigSetting{ + {Key: "ready", Value: mustNewValue(t, true)}, + }, + }, + }, + } + + // Now WaitReady should succeed + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + err = consumer.waitReady(ctx) + assert.NoError(t, err, "should unblock after snapshot") + }) + + t.Run("Consumer applies updates in order", func(t *testing.T) { + ipcComp := ipcmock.New(t) + _, serverAddr, events, cleanup := setupTestServer(t, ipcComp) + defer cleanup() + + consumer, cleanupConsumer := createTestConsumer(t, serverAddr, ipcComp) + defer cleanupConsumer() + + // Start streaming + consumer.ctx, consumer.cancel = context.WithCancel(context.Background()) + go func() { + _ = consumer.connectAndStream() + }() + + // Send snapshot and ordered updates + events <- &pb.ConfigEvent{ + Event: &pb.ConfigEvent_Snapshot{ + Snapshot: &pb.ConfigSnapshot{ + SequenceId: 1, + Settings: []*pb.ConfigSetting{}, + }, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err := consumer.waitReady(ctx) + require.NoError(t, err) + + // Send ordered updates + for i := 2; i <= 5; i++ { + events <- &pb.ConfigEvent{ + Event: &pb.ConfigEvent_Update{ + Update: &pb.ConfigUpdate{ + SequenceId: int32(i), + Setting: &pb.ConfigSetting{ + Key: "counter", + Value: mustNewValue(t, int64(i)), + }, + }, + }, + } + time.Sleep(50 * time.Millisecond) + } + + // Verify final state + consumer.configLock.RLock() + defer consumer.configLock.RUnlock() + assert.Equal(t, int64(5), consumer.effectiveConfig["counter"]) + assert.Equal(t, 
int32(5), consumer.lastSeqID)
+	})
+}
+
+// TestStartBlocksUntilSnapshot verifies that start blocks until the first snapshot is received,
+// so the binary's run function sees a fully-populated config without calling WaitReady.
+func TestStartBlocksUntilSnapshot(t *testing.T) {
+	ipcComp := ipcmock.New(t)
+	_, serverAddr, events, cleanup := setupTestServer(t, ipcComp)
+	defer cleanup()
+
+	consumer, cleanupConsumer := createTestConsumer(t, serverAddr, ipcComp)
+	defer cleanupConsumer()
+
+	// Send snapshot after a short delay to verify Start blocks.
+	go func() {
+		time.Sleep(50 * time.Millisecond)
+		events <- &pb.ConfigEvent{
+			Event: &pb.ConfigEvent_Snapshot{
+				Snapshot: &pb.ConfigSnapshot{
+					SequenceId: 1,
+					Settings: []*pb.ConfigSetting{
+						{Key: "server.port", Value: mustNewValue(t, int64(8080))},
+						{Key: "feature.enabled", Value: mustNewValue(t, true)},
+					},
+				},
+			},
+		}
+	}()
+
+	startTime := time.Now()
+	err := consumer.start(context.Background())
+	startDuration := time.Since(startTime)
+
+	require.NoError(t, err, "Start should succeed once snapshot is received")
+	assert.GreaterOrEqual(t, startDuration, 50*time.Millisecond, "Start should have blocked until snapshot arrived")
+
+	// Config is guaranteed to be fully populated when Start returns.
+	consumer.configLock.RLock()
+	assert.Equal(t, int64(8080), consumer.effectiveConfig["server.port"])
+	assert.Equal(t, true, consumer.effectiveConfig["feature.enabled"])
+	consumer.configLock.RUnlock() // release before stop: holding RLock across stop can deadlock if shutdown takes the write lock
+
+	consumer.stop(context.Background())
+}
+
+// TestStartTimeoutFailsStartup verifies that start returns an error when the first snapshot
+// is not received within ReadyTimeout, aborting FX startup.
+func TestStartTimeoutFailsStartup(t *testing.T) {
+	ipcComp := ipcmock.New(t)
+	_, serverAddr, _, cleanup := setupTestServer(t, ipcComp)
+	defer cleanup()
+
+	consumer, cleanupConsumer := createTestConsumer(t, serverAddr, ipcComp)
+	defer cleanupConsumer()
+
+	// Short timeout so the test doesn't take 60s.
+	consumer.params.ReadyTimeout = 200 * time.Millisecond
+
+	startTime := time.Now()
+	err := consumer.start(context.Background())
+	startDuration := time.Since(startTime)
+
+	require.Error(t, err, "Start should fail when no snapshot received within timeout")
+	assert.Contains(t, err.Error(), "waiting for initial config snapshot")
+	assert.GreaterOrEqual(t, startDuration, 200*time.Millisecond, "should respect ReadyTimeout")
+	assert.Less(t, startDuration, 5*time.Second, "should fail promptly rather than hang; generous bound to avoid CI flakes")
+}
diff --git a/comp/core/configstreamconsumer/mock/mock.go b/comp/core/configstreamconsumer/mock/mock.go
new file mode 100644
index 000000000000..069ad3e8adfb
--- /dev/null
+++ b/comp/core/configstreamconsumer/mock/mock.go
@@ -0,0 +1,25 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2025-present Datadog, Inc.
+ +//go:build test + +// Package mock provides mock for configstreamconsumer component +package mock + +import ( + "testing" + + configstreamconsumer "github.com/DataDog/datadog-agent/comp/core/configstreamconsumer/def" +) + +// Mock is a mock implementation of configstreamconsumer.Component +type Mock struct { + t *testing.T +} + +// New creates a new mock configstreamconsumer component +func New(t *testing.T) configstreamconsumer.Component { + return &Mock{t: t} +} diff --git a/comp/core/remoteagent/helper/serverhelper.go b/comp/core/remoteagent/helper/serverhelper.go index cb184792afa2..5cb3ddf2f216 100644 --- a/comp/core/remoteagent/helper/serverhelper.go +++ b/comp/core/remoteagent/helper/serverhelper.go @@ -329,3 +329,23 @@ func (s *UnimplementedRemoteAgentServer) refreshRegistration() error { func (s *UnimplementedRemoteAgentServer) GetGRPCServer() *grpc.Server { return s.grpcServer } + +// WaitSessionID blocks until the remote agent is registered and a session ID is available, or ctx is done. +// It returns the session ID or an error if the context is cancelled before registration completes. +func (s *UnimplementedRemoteAgentServer) WaitSessionID(ctx context.Context) (string, error) { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for { + s.sessionIDMutex.RLock() + sid := s.sessionID + s.sessionIDMutex.RUnlock() + if sid != "" { + return sid, nil + } + select { + case <-ctx.Done(): + return "", ctx.Err() + case <-ticker.C: + } + } +} diff --git a/comp/core/remoteagent/impl-systemprobe/remoteagent.go b/comp/core/remoteagent/impl-systemprobe/remoteagent.go index 373916377012..20edba0e680b 100644 --- a/comp/core/remoteagent/impl-systemprobe/remoteagent.go +++ b/comp/core/remoteagent/impl-systemprobe/remoteagent.go @@ -104,6 +104,12 @@ func (r *remoteagentImpl) GetStatusDetails(_ context.Context, _ *pbcore.GetStatu }, nil } +// WaitSessionID blocks until the remote agent is registered and a session ID is available. 
+// This allows components that need the session ID (e.g. config stream consumer) to wait for RAR registration. +func (r *remoteagentImpl) WaitSessionID(ctx context.Context) (string, error) { + return r.remoteAgentServer.WaitSessionID(ctx) +} + func (r *remoteagentImpl) GetTelemetry(_ context.Context, _ *pbcore.GetTelemetryRequest) (*pbcore.GetTelemetryResponse, error) { prometheusText, err := r.telemetry.GatherText(false, telemetry.StaticMetricFilter( // Metrics to forward from system-probe to core agent. diff --git a/internal/remote-agent/main.go b/internal/remote-agent/main.go index 16163bb2f97f..a46a1f4f40c6 100644 --- a/internal/remote-agent/main.go +++ b/internal/remote-agent/main.go @@ -12,6 +12,7 @@ import ( "encoding/pem" "flag" "fmt" + "io" "log" "net" "os" @@ -166,6 +167,53 @@ func refreshRegistration(agentClient pbcore.AgentSecureClient, sessionID string) return nil } +// streamConfigEvents subscribes to the Core Agent config stream and logs received events. +// It runs until ctx is cancelled or the stream closes/errors. +func streamConfigEvents(ctx context.Context, agentIpcAddress, agentAuthToken, agentFlavor, sessionID string, cert tls.Certificate) { + tlsCreds := credentials.NewTLS(&tls.Config{ + Certificates: []tls.Certificate{cert}, + // Test client: no server name verification needed. 
+ InsecureSkipVerify: true, //nolint:gosec + }) + conn, err := grpc.NewClient(agentIpcAddress, + grpc.WithTransportCredentials(tlsCreds), + grpc.WithPerRPCCredentials(grpcutil.NewBearerTokenAuth(agentAuthToken)), + ) + if err != nil { + log.Printf("config stream: failed to connect: %v", err) + return + } + defer conn.Close() + + client := pbcore.NewAgentSecureClient(conn) + md := metadata.New(map[string]string{"session_id": sessionID}) + ctxWithMD := metadata.NewOutgoingContext(ctx, md) + + stream, err := client.StreamConfigEvents(ctxWithMD, &pbcore.ConfigStreamRequest{Name: agentFlavor}) + if err != nil { + log.Printf("config stream: failed to start: %v", err) + return + } + log.Println("Config stream established.") + for { + event, err := stream.Recv() + if err != nil { + if err == io.EOF || ctx.Err() != nil { + log.Println("Config stream closed.") + return + } + log.Printf("Config stream receive error: %v", err) + return + } + switch e := event.Event.(type) { + case *pbcore.ConfigEvent_Snapshot: + log.Printf("Config snapshot received: %d settings (seq=%d)", len(e.Snapshot.GetSettings()), e.Snapshot.GetSequenceId()) + case *pbcore.ConfigEvent_Update: + log.Printf("Config update received: seq=%d", e.Update.GetSequenceId()) + } + } +} + func main() { // Read in all of the necessary configuration for this remote agent. var agentFlavor string @@ -213,14 +261,22 @@ func main() { log.Printf("Spawned remote agent gRPC server on %s.", listenAddr) // Wait forever, periodically refreshing our registration. 
+ var streamCancel context.CancelFunc refreshTicker := time.NewTicker(1 * time.Second) for range refreshTicker.C { if sessionID == "" { + if streamCancel != nil { + streamCancel() + streamCancel = nil + } var err error sessionID, agentClient, err = registerWithAgent(agentIpcAddress, agentAuthToken, agentFlavor, displayName, listenAddr, refreshTicker, tlsCert) if err != nil { continue } + var streamCtx context.Context + streamCtx, streamCancel = context.WithCancel(context.Background()) + go streamConfigEvents(streamCtx, agentIpcAddress, agentAuthToken, agentFlavor, sessionID, tlsCert) } else { err := refreshRegistration(agentClient, sessionID) if err != nil { diff --git a/pkg/config/model/types.go b/pkg/config/model/types.go index 1e24375b8eff..2d6b318ec069 100644 --- a/pkg/config/model/types.go +++ b/pkg/config/model/types.go @@ -44,9 +44,8 @@ const ( SourceConfigPostInit Source = "config-post-init" // SourceSecret are values resolved from secrets (ENC[...] placeholders). SourceSecret Source = "secret" - // SourceLocalConfigProcess are the values mirrored from the config process. The config process is the - // core-agent. This is used when side process like security-agent or trace-agent pull their configuration from - // the core-agent. + // SourceLocalConfigProcess are the values mirrored from the config process via the configsync HTTP + // polling mechanism. SourceLocalConfigProcess Source = "local-config-process" // SourceAgentRuntime are the values configured by the agent itself. The agent can dynamically compute the best // value for some settings when not set by the user. 
diff --git a/pkg/config/schema/compressed/core_schema.yaml.zstd b/pkg/config/schema/compressed/core_schema.yaml.zstd index 3fa24767049e..07bed2b03afe 100644 Binary files a/pkg/config/schema/compressed/core_schema.yaml.zstd and b/pkg/config/schema/compressed/core_schema.yaml.zstd differ diff --git a/pkg/config/schema/core_schema.yaml b/pkg/config/schema/core_schema.yaml index a0a2e0f31571..211f8d533231 100644 --- a/pkg/config/schema/core_schema.yaml +++ b/pkg/config/schema/core_schema.yaml @@ -13616,10 +13616,14 @@ properties: node_type: section type: object properties: - enabled: - node_type: setting - type: boolean - default: false + consumer: + node_type: section + type: object + properties: + enabled: + node_type: setting + type: boolean + default: false sleep_interval: node_type: setting type: number diff --git a/pkg/config/setup/common_settings.go b/pkg/config/setup/common_settings.go index cea91c39a910..25944862292f 100644 --- a/pkg/config/setup/common_settings.go +++ b/pkg/config/setup/common_settings.go @@ -1035,8 +1035,8 @@ func initCoreAgentFull(config pkgconfigmodel.Setup) { config.BindEnvAndSetDefault("remote_agent.registry.idle_timeout", time.Duration(30*time.Second)) config.BindEnvAndSetDefault("remote_agent.registry.query_timeout", time.Duration(3*time.Second)) config.BindEnvAndSetDefault("remote_agent.registry.recommended_refresh_interval", time.Duration(10*time.Second)) - config.BindEnvAndSetDefault("remote_agent.configstream.enabled", false) config.BindEnvAndSetDefault("remote_agent.configstream.sleep_interval", 10*time.Second) + config.BindEnvAndSetDefault("remote_agent.configstream.consumer.enabled", false) // Data Plane config.BindEnvAndSetDefault("data_plane.enabled", false)