From d2b7bc14da04e9525da01f1eb5cd5de1be296547 Mon Sep 17 00:00:00 2001
From: rahulkaukuntla <144174402+rahulkaukuntla@users.noreply.github.com>
Date: Tue, 5 May 2026 15:01:11 +0200
Subject: [PATCH 1/8] [AGENTCFG-626] Adding a consumer component for streaming configuration updates from the core agent to remote agents registered via RAR (#46206)

### What does this PR do?

Adds the **configstreamconsumer component** (`comp/core/configstreamconsumer`), a shared Go library that remote agents use to consume configuration streams from the core Datadog Agent over gRPC.

- **Simple API**: the `Component` interface exposes no methods; the component runs autonomously as an FX lifecycle hook.
- **Startup gating**: `OnStart` blocks until the first config snapshot is received from the core agent, so all components initialized after this one see a fully populated config. Startup is aborted if `Params.ReadyTimeout` (default 60s) is exceeded.
- **Source fidelity**: streamed settings are written into `config.Component` using the same source the core agent assigned (e.g. `SourceDefault`, `SourceFile`, `SourceEnvVar`), preserving the original priority semantics on the remote process. `SourceLocalConfigProcess` is not used.
- **Ordered updates**: updates are applied sequentially by sequence ID; stale updates are dropped, and discontinuities trigger a server-side resync.
- **Reconnection**: the consumer reconnects automatically on stream failure with a 5s backoff; `lastSeqID` is never reset between reconnects, so stale snapshots from a restarted core agent are rejected.
- **RAR-gated**: requires a session ID from the Remote Agent Registry, supplied either as a fixed value or via a `SessionIDProvider` (e.g. from the remote agent component) that is resolved at connect time.

### Motivation

Remote agents (e.g. system-probe, trace-agent, process-agent) need to receive configuration from the core agent.
This component provides a single, reusable consumer so each agent does not reimplement gRPC wiring, sequencing, snapshot gating, or reconnection logic.

### Describe how you validated your changes

Added unit tests, all of which pass. I also set up system-probe to use the new consumer component ([in this PR](https://github.com/DataDog/datadog-agent/pull/46281), which builds on these changes) and successfully streamed the config to it:

> 2026-03-11 16:25:07 UTC | SYS-PROBE | INFO | (cmd/system-probe/subcommands/run/command.go:250 in run) | Waiting for initial configuration from core agent...
2026-03-11 16:25:07 UTC | SYS-PROBE | INFO | (comp/core/remoteagent/helper/serverhelper.go:267 in registerWithAgent) | Registered with Remote Agent Registry for config streaming (session_id=d3411ec4-62bb-4d3f-8411-56e00615392d). Recommended refresh interval: 10 seconds.
2026-03-11 16:25:07 UTC | SYS-PROBE | INFO | (comp/core/configstreamconsumer/impl/consumer.go:293 in connectAndStream) | Config stream established for client system-probe
2026-03-11 16:25:07 UTC | SYS-PROBE | INFO | (comp/core/configstreamconsumer/impl/consumer.go:333 in applySnapshot) | Applying config snapshot (seq_id: 2, settings: 1563)
2026-03-11 16:25:07 UTC | SYS-PROBE | INFO | (comp/core/configstreamconsumer/impl/consumer.go:358 in func1) | Received first config snapshot after 140.719085ms
2026-03-11 16:25:07 UTC | SYS-PROBE | INFO | (cmd/system-probe/subcommands/run/command.go:254 in run) | Initial configuration received from core agent. Starting system-probe.
2026-03-11 16:25:07 UTC | SYS-PROBE | INFO | (cmd/system-probe/subcommands/run/command.go:318 in startSystemProbe) | starting system-probe v7.77.0-devel+git.1266.7f2d2db

### Additional Notes

Co-authored-by: rahul.kaukuntla
---
 .github/CODEOWNERS | 3 +-
 cmd/agent/subcommands/run/command_test.go | 6 +-
 comp/README.md | 8 +-
 comp/core/configstream/README.md | 10 +-
 comp/core/configstream/def/component.go | 2 +-
 comp/core/configstream/impl/configstream.go | 9 +-
 .../configstream/impl/configstream_test.go | 53 ++-
 comp/core/configstream/server/server.go | 4 +
 comp/core/configstream/server/server_test.go | 20 +
 comp/core/configstreamconsumer/README.md | 142 ++++++
 .../core/configstreamconsumer/def/BUILD.bazel | 8 +
 .../configstreamconsumer/def/component.go | 43 ++
 comp/core/configstreamconsumer/fx/BUILD.bazel | 1 +
 comp/core/configstreamconsumer/fx/fx.go | 21 +
 .../configstreamconsumer/impl/BUILD.bazel | 1 +
 .../configstreamconsumer/impl/consumer.go | 403 ++++++++++++++++
 .../impl/consumer_test.go | 444 ++++++++++++++++++
 .../configstreamconsumer/mock/BUILD.bazel | 1 +
 comp/core/configstreamconsumer/mock/mock.go | 25 +
 comp/core/remoteagent/helper/serverhelper.go | 20 +
 .../impl-systemprobe/remoteagent.go | 6 +
 internal/remote-agent/main.go | 56 +++
 pkg/config/model/types.go | 5 +-
 23 files changed, 1273 insertions(+), 18 deletions(-)
 create mode 100644 comp/core/configstreamconsumer/README.md
 create mode 100644 comp/core/configstreamconsumer/def/BUILD.bazel
 create mode 100644 comp/core/configstreamconsumer/def/component.go
 create mode 100644 comp/core/configstreamconsumer/fx/BUILD.bazel
 create mode 100644 comp/core/configstreamconsumer/fx/fx.go
 create mode 100644 comp/core/configstreamconsumer/impl/BUILD.bazel
 create mode 100644 comp/core/configstreamconsumer/impl/consumer.go
 create mode 100644 comp/core/configstreamconsumer/impl/consumer_test.go
 create mode 100644 comp/core/configstreamconsumer/mock/BUILD.bazel
 create mode 100644
comp/core/configstreamconsumer/mock/mock.go diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 1efaf23a0a3d..9a348a769b08 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -362,7 +362,8 @@ /comp/checks/winregistry @DataDog/windows-products /comp/core/autodiscovery @DataDog/container-platform /comp/core/config @DataDog/agent-configuration -/comp/core/configstream @DataDog/agent-metric-pipelines @DataDog/agent-configuration +/comp/core/configstream @DataDog/agent-configuration +/comp/core/configstreamconsumer @DataDog/agent-configuration /comp/core/configsync @DataDog/agent-configuration /comp/core/flare @DataDog/agent-configuration /comp/core/gui @DataDog/agent-configuration diff --git a/cmd/agent/subcommands/run/command_test.go b/cmd/agent/subcommands/run/command_test.go index 5e3012b42191..704903f6ef12 100644 --- a/cmd/agent/subcommands/run/command_test.go +++ b/cmd/agent/subcommands/run/command_test.go @@ -43,11 +43,11 @@ func newGlobalParamsTest(t *testing.T) *command.GlobalParams { // which lead to build: // - config.Component which requires a valid datadog.yaml // - hostname.Component which requires a valid hostname - config := path.Join(t.TempDir(), "datadog.yaml") - err := os.WriteFile(config, []byte("hostname: test"), 0644) + configPath := path.Join(t.TempDir(), "datadog.yaml") + err := os.WriteFile(configPath, []byte("hostname: test"), 0644) require.NoError(t, err) return &command.GlobalParams{ - ConfFilePath: config, + ConfFilePath: configPath, } } diff --git a/comp/README.md b/comp/README.md index d272a8719c04..6b4e38cfb096 100644 --- a/comp/README.md +++ b/comp/README.md @@ -118,10 +118,16 @@ component temporarily wraps pkg/config. ### [comp/core/configstream](https://pkg.go.dev/github.com/DataDog/datadog-agent/comp/core/configstream) -*Datadog Team*: agent-metric-pipelines agent-configuration +*Datadog Team*: agent-configuration Package configstream implements a component to handle streaming configuration events to subscribers. 
+### [comp/core/configstreamconsumer](https://pkg.go.dev/github.com/DataDog/datadog-agent/comp/core/configstreamconsumer) + +*Datadog Team*: agent-configuration + +Package configstreamconsumer implements a component that consumes config streams from the core agent. + ### [comp/core/configsync](https://pkg.go.dev/github.com/DataDog/datadog-agent/comp/core/configsync) *Datadog Team*: agent-configuration diff --git a/comp/core/configstream/README.md b/comp/core/configstream/README.md index 224dc0eb3280..79cf8b74e3f7 100644 --- a/comp/core/configstream/README.md +++ b/comp/core/configstream/README.md @@ -91,16 +91,18 @@ message ConfigSetting { ## Configuration -The config stream is automatically enabled when the component is loaded. No explicit configuration required. +The config stream is **only available** when both of the following are true on the core agent: +- `remote_agent.registry.enabled: true` +- `remote_agent.configstream.enabled: true` **Optional settings:** ```yaml # datadog.yaml remote_agent: registry: - enabled: true # Required for RAR-gated authorization + enabled: true # Required for RAR-gated authorization and for use_configstream configstream: - enabled: true # Required to use the configstreamconsumer + enabled: true # Enables config stream for remote agents (requires remote_agent.registry.enabled) sleep_interval: 10s # Backoff on non-terminal errors (default: 10s) agent_ipc: # Maximum size of a single gRPC message accepted/sent by the agent's gRPC @@ -288,6 +290,6 @@ log_level: debug ## Contact -- **Teams:** agent-metric-pipelines, agent-configuration +- **Team:** agent-configuration - **Component:** `comp/core/configstream` - **Test Client:** `cmd/config-stream-client` diff --git a/comp/core/configstream/def/component.go b/comp/core/configstream/def/component.go index 8b29806b3a2b..360ea947ef9b 100644 --- a/comp/core/configstream/def/component.go +++ b/comp/core/configstream/def/component.go @@ -10,7 +10,7 @@ import ( pb 
"github.com/DataDog/datadog-agent/pkg/proto/pbgo/core" ) -// team: agent-metric-pipelines agent-configuration +// team: agent-configuration // Component is the component type. type Component interface { diff --git a/comp/core/configstream/impl/configstream.go b/comp/core/configstream/impl/configstream.go index 5a6a61269231..aeeaf3de7cf6 100644 --- a/comp/core/configstream/impl/configstream.go +++ b/comp/core/configstream/impl/configstream.go @@ -9,6 +9,7 @@ package configstreamimpl import ( "context" "encoding/json" + "errors" "fmt" "path/filepath" "sync" @@ -67,7 +68,11 @@ type subscription struct { } // NewComponent creates a new configstream component. -func NewComponent(reqs Requires) Provides { +func NewComponent(reqs Requires) (Provides, error) { + if reqs.Config.GetBool("remote_agent.configstream.enabled") && !reqs.Config.GetBool("remote_agent.registry.enabled") { + return Provides{}, errors.New("remote_agent.configstream.enabled is true but remote_agent.registry.enabled is not; set remote_agent.registry.enabled: true to use config stream for remote agents") + } + cs := &configStream{ config: reqs.Config, log: reqs.Log, @@ -100,7 +105,7 @@ func NewComponent(reqs Requires) Provides { return Provides{ Comp: cs, - } + }, nil } // Subscribe returns a channel that streams configuration events, starting with a snapshot. 
diff --git a/comp/core/configstream/impl/configstream_test.go b/comp/core/configstream/impl/configstream_test.go index 84cb64d493ff..1b87c74ed750 100644 --- a/comp/core/configstream/impl/configstream_test.go +++ b/comp/core/configstream/impl/configstream_test.go @@ -281,6 +281,51 @@ done: } } +func TestNewComponentValidation(t *testing.T) { + mockLog := logmock.New(t) + telemetryComp := telemetrynoops.GetCompatComponent() + + t.Run("error when configstream enabled but registry disabled", func(t *testing.T) { + cfg := configmock.New(t) + cfg.SetWithoutSource("remote_agent.configstream.enabled", true) + cfg.SetWithoutSource("remote_agent.registry.enabled", false) + _, err := NewComponent(Requires{ + Lifecycle: compdef.NewTestLifecycle(t), + Config: cfg, + Log: mockLog, + Telemetry: telemetryComp, + }) + require.Error(t, err) + require.Contains(t, err.Error(), "remote_agent.configstream.enabled is true but remote_agent.registry.enabled is not") + }) + + t.Run("no error when configstream disabled", func(t *testing.T) { + cfg := configmock.New(t) + cfg.SetWithoutSource("remote_agent.configstream.enabled", false) + cfg.SetWithoutSource("remote_agent.registry.enabled", false) + _, err := NewComponent(Requires{ + Lifecycle: compdef.NewTestLifecycle(t), + Config: cfg, + Log: mockLog, + Telemetry: telemetryComp, + }) + require.NoError(t, err) + }) + + t.Run("no error when both configstream and registry enabled", func(t *testing.T) { + cfg := configmock.New(t) + cfg.SetWithoutSource("remote_agent.configstream.enabled", true) + cfg.SetWithoutSource("remote_agent.registry.enabled", true) + _, err := NewComponent(Requires{ + Lifecycle: compdef.NewTestLifecycle(t), + Config: cfg, + Log: mockLog, + Telemetry: telemetryComp, + }) + require.NoError(t, err) + }) +} + // newConfigStreamForTest creates a config stream for testing without lifecycle func newConfigStreamForTest(t *testing.T, cfg config.Component, logger log.Component) *configStream { telemetryComp := 
telemetrynoops.GetCompatComponent() @@ -290,7 +335,8 @@ func newConfigStreamForTest(t *testing.T, cfg config.Component, logger log.Compo Log: logger, Telemetry: telemetryComp, } - provides := NewComponent(reqs) + provides, err := NewComponent(reqs) + require.NoError(t, err) // Extract the underlying configStream // and start the run loop manually since lifecycle hooks are not executed @@ -342,10 +388,11 @@ func buildComponent(t *testing.T) (Provides, *configInterceptor) { Telemetry: telemetrynoops.GetCompatComponent(), } - provides := NewComponent(reqs) + provides, err := NewComponent(reqs) + require.NoError(t, err) // Start the component's run loop - err := lc.Start(context.Background()) + err = lc.Start(context.Background()) require.NoError(t, err) t.Cleanup(func() { diff --git a/comp/core/configstream/server/server.go b/comp/core/configstream/server/server.go index 2d5324c1d16d..0ecbf436048f 100644 --- a/comp/core/configstream/server/server.go +++ b/comp/core/configstream/server/server.go @@ -40,7 +40,11 @@ func NewServer(cfg config.Component, comp configstream.Component, registry remot // StreamConfigEvents handles the gRPC streaming logic. // It requires the caller to be a registered remote agent (RAR-gated). +// The config stream is only available when remote_agent.configstream.enabled is true on the core agent. 
func (s *Server) StreamConfigEvents(req *pb.ConfigStreamRequest, stream pb.AgentSecure_StreamConfigEventsServer) error { + if !s.cfg.GetBool("remote_agent.configstream.enabled") { + return status.Error(codes.Unimplemented, "config stream is not enabled for remote agents; set remote_agent.configstream.enabled: true and remote_agent.registry.enabled: true on the core agent") + } if s.registry == nil { return status.Error(codes.Unimplemented, "remote agent registry not enabled") } diff --git a/comp/core/configstream/server/server_test.go b/comp/core/configstream/server/server_test.go index 4a0411ee1ce5..b40e03e8719a 100644 --- a/comp/core/configstream/server/server_test.go +++ b/comp/core/configstream/server/server_test.go @@ -80,6 +80,7 @@ func (m *mockRemoteAgentRegistry) GetRegisteredAgentStatuses() []remoteagentregi func setupTest(ctx context.Context, t *testing.T, sessionID string) (*Server, *mockComp, *mockStream, chan *pb.ConfigEvent) { cfg := configmock.New(t) + cfg.Set("remote_agent.configstream.enabled", true, model.SourceAgentRuntime) cfg.Set("remote_agent.configstream.sleep_interval", 10*time.Millisecond, model.SourceAgentRuntime) comp := &mockComp{} @@ -181,8 +182,25 @@ func TestRARAuthorization(t *testing.T) { Name: "test-client", } + t.Run("rejects request when remote_agent.configstream.enabled is false", func(t *testing.T) { + cfg := configmock.New(t) + cfg.Set("remote_agent.configstream.enabled", false, model.SourceAgentRuntime) + comp := &mockComp{} + mockRAR := &mockRemoteAgentRegistry{} + server := NewServer(cfg, comp, mockRAR) + md := metadata.New(map[string]string{"session_id": "test-session-id"}) + ctx := metadata.NewIncomingContext(context.Background(), md) + stream := &mockStream{ctx: ctx} + + err := server.StreamConfigEvents(testReq, stream) + assert.Error(t, err) + assert.Equal(t, codes.Unimplemented, status.Code(err)) + require.ErrorContains(t, err, "config stream is not enabled") + }) + t.Run("rejects request with missing metadata", func(t 
*testing.T) { cfg := configmock.New(t) + cfg.Set("remote_agent.configstream.enabled", true, model.SourceAgentRuntime) comp := &mockComp{} mockRAR := &mockRemoteAgentRegistry{} server := NewServer(cfg, comp, mockRAR) @@ -198,6 +216,7 @@ func TestRARAuthorization(t *testing.T) { t.Run("rejects request with missing session_id in metadata", func(t *testing.T) { cfg := configmock.New(t) + cfg.Set("remote_agent.configstream.enabled", true, model.SourceAgentRuntime) comp := &mockComp{} mockRAR := &mockRemoteAgentRegistry{} server := NewServer(cfg, comp, mockRAR) @@ -215,6 +234,7 @@ func TestRARAuthorization(t *testing.T) { t.Run("rejects request with empty session_id", func(t *testing.T) { cfg := configmock.New(t) + cfg.Set("remote_agent.configstream.enabled", true, model.SourceAgentRuntime) comp := &mockComp{} mockRAR := &mockRemoteAgentRegistry{} server := NewServer(cfg, comp, mockRAR) diff --git a/comp/core/configstreamconsumer/README.md b/comp/core/configstreamconsumer/README.md new file mode 100644 index 000000000000..f4fc7af223ee --- /dev/null +++ b/comp/core/configstreamconsumer/README.md @@ -0,0 +1,142 @@ +# Config Stream Consumer Component + +A shared Go library for remote agents (system-probe, trace-agent, process-agent, etc.) to consume configuration streams from the core Datadog Agent. It provides gRPC connection management, snapshot gating, and ordered config application, writing received settings directly into the agent's `config.Component`. + +## Overview + +- **Real-time config**: Receive full snapshot then incremental updates from the core agent over gRPC. +- **RAR-gated**: Only registered remote agents can subscribe; session ID is required (fixed or via `SessionIDProvider`). +- **Readiness gating**: `Start` blocks until the first config snapshot is received, aborting startup if `Params.ReadyTimeout` (default: 60s) is exceeded. +- **Single source of truth**: Streamed config is written into `config.Component` via `model.Writer`. 
Callers read config through `config.Component` directly — not through this component. +- **Ordered updates**: Sequential application by sequence ID; stale updates dropped, discontinuities trigger resync. +- **Restart safety**: `lastSeqID` is never reset on reconnect. If the core agent restarts and its sequence counter resets, the consumer logs an error and refuses the new snapshot until the sub-process itself restarts. +- **Telemetry**: Metrics for time-to-first-snapshot, reconnects, sequence ID, and dropped updates. + +## Architecture + +Producer (core agent) and consumer (remote agents) communicate over the same gRPC contract: + +``` +┌─────────────────────────┐ ┌─────────────────────────┐ +│ Core Agent Process │ │ Remote Agent Process │ +│ │ │ (e.g. system-probe) │ +│ ┌──────────────────┐ │ │ ┌──────────────────┐ │ +│ │ configstream │ │ gRPC │ │ configstream- │ │ +│ │ (producer) │◄──┼──────────┼─►│ consumer │ │ +│ │ │ │ stream │ │ │ │ +│ └──────────────────┘ │ │ └──────────────────┘ │ +└─────────────────────────┘ └─────────────────────────┘ +``` + +**Flow:** + +1. Remote agent registers with RAR and obtains `session_id` (or supplies it via `SessionIDProvider`). +2. Consumer connects to core agent and calls `StreamConfigEvents` with `session_id` in gRPC metadata. +3. Core agent validates the session and sends an initial snapshot, then streams incremental updates. +4. Consumer applies snapshot/updates in order and writes them into `config.Component` via `model.Writer`. + +See `../configstream/README.md` for the producer side and the gRPC/protobuf contract. + +## Quick Start + +Supply **either** a fixed `SessionID` **or** a `SessionIDProvider` (e.g. from the remote agent component). The consumer uses the provider at connect time so RAR can register first. + +## Wiring guide + +### Only include the module when the feature is enabled + +Including `configstreamconsumerfx.Module()` when config streaming is disabled will abort FX startup. 
Check the feature flag before building FX options and include the module conditionally: + +```go +if configstreamEnabled { + opts = append(opts, configstreamFxOptions()) +} +``` + +### Full example + +```go +func configstreamFxOptions() fx.Option { + return fx.Options( + // Bridge config.Component to model.Writer so the consumer can write streamed config. + fx.Provide(func(c config.Component) model.Writer { return c }), + + // Provide the SessionIDProvider from the remote agent (blocks until RAR registration). + fx.Provide(func(ra remoteagent.Component) configstreamconsumerimpl.SessionIDProvider { + if ra == nil { + return nil + } + if p, ok := ra.(configstreamconsumerimpl.SessionIDProvider); ok { + return p + } + return nil + }), + + // Provide Params — only reached when configstream is known to be enabled. + fx.Provide(func(c config.Component, deps struct { + fx.In + SessionProvider configstreamconsumerimpl.SessionIDProvider `optional:"true"` + }) *configstreamconsumerimpl.Params { + host := c.GetString("cmd_host") + port := c.GetInt("cmd_port") + if port <= 0 { + port = 5001 + } + return &configstreamconsumerimpl.Params{ + ClientName: "my-agent", + CoreAgentAddress: net.JoinHostPort(host, strconv.Itoa(port)), + SessionIDProvider: deps.SessionProvider, + } + }), + + configstreamconsumerfx.Module(), + // Force instantiation so Start runs and blocks until the first snapshot. + fx.Invoke(func(_ configstreamconsumer.Component) {}), + ) +} +``` + +## Requirements + +- **Core agent**: `configstream` component (`remote_agent.configstream.enabled: true`) and RAR enabled (`remote_agent.registry.enabled: true`). +- **RAR**: Remote agent must register with RAR before subscribing; pass `session_id` via gRPC metadata (supply fixed `SessionID` or `SessionIDProvider` with `WaitSessionID(ctx) (string, error)`). +- **IPC**: mTLS and auth token for gRPC (same as other core-agent IPC). 
+- **`model.Writer`**: `config.Component` must be explicitly provided as `model.Writer` in the same FX scope. Streamed settings are written using the same source the core agent assigned (e.g. `SourceDefault`, `SourceFile`, `SourceEnvVar`), preserving the original priority semantics on the remote process. + +## Telemetry + +| Metric | Type | Description | +|--------|------|-------------| +| `configstream_consumer.time_to_first_snapshot_seconds` | Gauge | Time to receive first snapshot | +| `configstream_consumer.reconnect_count` | Counter | Stream reconnections | +| `configstream_consumer.last_sequence_id` | Gauge | Last received config sequence ID | +| `configstream_consumer.dropped_stale_updates` | Counter | Stale updates dropped | + +## Testing + +### Manual testing with system-probe + +1. Start the core agent with RAR and config stream enabled. +2. Set `cmd_host` / `cmd_port` in the config used by system-probe. +3. Start system-probe. You should see: + - `Waiting for initial configuration from core agent...` + - After snapshot: `Initial configuration received from core agent. Starting system-probe.` +4. If the core agent is down or the stream never sends a snapshot, system-probe exits with: `waiting for initial config snapshot: context deadline exceeded`. + +## Troubleshooting + +- **session_id required in metadata** + Ensure the remote agent registers with RAR first and that the consumer is given either a fixed `SessionID` or a `SessionIDProvider` that returns the session ID. + +- **Startup timeout (no snapshot received within `ReadyTimeout`)** + Core agent must be running, config stream enabled, and RAR returning a valid session. Check core agent logs for config stream and RAR errors. + +- **"core agent may have restarted" error in logs** + The consumer received a snapshot with a lower sequence ID than its last-known value, indicating the core agent restarted. Restart the sub-process to accept the new configuration. 
+ +## Related documentation + +- **Producer**: `../configstream/README.md` — core agent config streaming service and gRPC contract. +- **Test client**: `cmd/config-stream-client/README.md` — standalone client for end-to-end testing. + +**Team**: agent-configuration diff --git a/comp/core/configstreamconsumer/def/BUILD.bazel b/comp/core/configstreamconsumer/def/BUILD.bazel new file mode 100644 index 000000000000..8430fe781a88 --- /dev/null +++ b/comp/core/configstreamconsumer/def/BUILD.bazel @@ -0,0 +1,8 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "def", + srcs = ["component.go"], + importpath = "github.com/DataDog/datadog-agent/comp/core/configstreamconsumer/def", + visibility = ["//visibility:public"], +) diff --git a/comp/core/configstreamconsumer/def/component.go b/comp/core/configstreamconsumer/def/component.go new file mode 100644 index 000000000000..aba7e492bc63 --- /dev/null +++ b/comp/core/configstreamconsumer/def/component.go @@ -0,0 +1,43 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. + +// Package configstreamconsumer implements a component that consumes config streams from the core agent. +// +// team: agent-configuration +package configstreamconsumer + +import ( + "context" + "time" +) + +// SessionIDProvider supplies the RAR session ID, typically after registration completes. +// When set, the consumer will call WaitSessionID at connect time instead of using Params.SessionID. 
+type SessionIDProvider interface { + WaitSessionID(ctx context.Context) (string, error) +} + +// Params defines the parameters for the configstreamconsumer component +type Params struct { + // ClientName is the identity of this remote agent (e.g., "system-probe", "trace-agent") + ClientName string + // CoreAgentAddress is the address of the core agent IPC endpoint + CoreAgentAddress string + // SessionID is the RAR session ID for authorization. Required if SessionIDProvider is nil. + SessionID string + // SessionIDProvider supplies the session ID at connect time (e.g. from remote agent component). + // When set, SessionID may be empty; the consumer will block on WaitSessionID before connecting. + SessionIDProvider SessionIDProvider + // ReadyTimeout is how long OnStart blocks waiting for the first config snapshot before + // returning an error and aborting startup. Defaults to 60s when zero. + ReadyTimeout time.Duration +} + +// Component is the config stream consumer component interface. +// Its sole purpose is to receive configuration from the core agent stream and write it +// into the local config.Component via the model.Writer provided at construction. +// Callers that need to read config or subscribe to changes should use config.Component directly. +// Readiness is guaranteed by the FX lifecycle: start blocks until the first snapshot is received. 
+type Component interface{} diff --git a/comp/core/configstreamconsumer/fx/BUILD.bazel b/comp/core/configstreamconsumer/fx/BUILD.bazel new file mode 100644 index 000000000000..9968c4af55f8 --- /dev/null +++ b/comp/core/configstreamconsumer/fx/BUILD.bazel @@ -0,0 +1 @@ +# gazelle:ignore diff --git a/comp/core/configstreamconsumer/fx/fx.go b/comp/core/configstreamconsumer/fx/fx.go new file mode 100644 index 000000000000..618afb61f90a --- /dev/null +++ b/comp/core/configstreamconsumer/fx/fx.go @@ -0,0 +1,21 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. + +// Package fx provides the fx module for the configstreamconsumer component +package fx + +import ( + configstreamconsumer "github.com/DataDog/datadog-agent/comp/core/configstreamconsumer/def" + configstreamconsumerimpl "github.com/DataDog/datadog-agent/comp/core/configstreamconsumer/impl" + "github.com/DataDog/datadog-agent/pkg/util/fxutil" +) + +// Module defines the fx options for this component +func Module() fxutil.Module { + return fxutil.Component( + fxutil.ProvideComponentConstructor(configstreamconsumerimpl.NewComponent), + fxutil.ProvideOptional[configstreamconsumer.Component](), + ) +} diff --git a/comp/core/configstreamconsumer/impl/BUILD.bazel b/comp/core/configstreamconsumer/impl/BUILD.bazel new file mode 100644 index 000000000000..9968c4af55f8 --- /dev/null +++ b/comp/core/configstreamconsumer/impl/BUILD.bazel @@ -0,0 +1 @@ +# gazelle:ignore diff --git a/comp/core/configstreamconsumer/impl/consumer.go b/comp/core/configstreamconsumer/impl/consumer.go new file mode 100644 index 000000000000..c27dee8d8270 --- /dev/null +++ b/comp/core/configstreamconsumer/impl/consumer.go @@ -0,0 +1,403 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License 
Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. + +// Package configstreamconsumerimpl implements the configstreamconsumer component +package configstreamconsumerimpl + +import ( + "context" + "errors" + "fmt" + "io" + "math" + "sync" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/types/known/structpb" + + configstreamconsumer "github.com/DataDog/datadog-agent/comp/core/configstreamconsumer/def" + ipc "github.com/DataDog/datadog-agent/comp/core/ipc/def" + log "github.com/DataDog/datadog-agent/comp/core/log/def" + "github.com/DataDog/datadog-agent/comp/core/telemetry/def" + compdef "github.com/DataDog/datadog-agent/comp/def" + "github.com/DataDog/datadog-agent/pkg/config/model" + pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core" + grpcutil "github.com/DataDog/datadog-agent/pkg/util/grpc" +) + +// Requires defines the dependencies for the configstreamconsumer component +type Requires struct { + compdef.In + + Lifecycle compdef.Lifecycle + Log log.Component + IPC ipc.Component + Telemetry telemetry.Component + ConfigWriter model.Writer + Params configstreamconsumer.Params +} + +// Provides defines the output of the configstreamconsumer component +type Provides struct { + compdef.Out + + Comp configstreamconsumer.Component +} + +// consumer implements the configstreamconsumer.Component interface +type consumer struct { + log log.Component + ipc ipc.Component + telemetry telemetry.Component + params configstreamconsumer.Params + configWriter model.Writer // writes streamed config into the local config.Component + + conn *grpc.ClientConn + client pb.AgentSecureClient + stream pb.AgentSecure_StreamConfigEventsClient + streamLock sync.Mutex + + effectiveConfig map[string]interface{} + configLock sync.RWMutex + lastSeqID int32 + + ready bool + readyCh chan struct{} + readyOnce 
sync.Once + + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + startTime time.Time + + timeToFirstSnapshot telemetry.Gauge + streamReconnectCount telemetry.Counter + lastSeqIDMetric telemetry.Gauge + droppedStaleUpdates telemetry.Counter +} + +// NewComponent creates a new configstreamconsumer component +func NewComponent(reqs Requires) (Provides, error) { + p := reqs.Params + if p.ClientName == "" { + return Provides{}, errors.New("ClientName is required") + } + if p.CoreAgentAddress == "" { + return Provides{}, errors.New("CoreAgentAddress is required") + } + if p.SessionID == "" && p.SessionIDProvider == nil { + return Provides{}, fmt.Errorf("configstreamconsumer: neither SessionID nor SessionIDProvider set for client %s", p.ClientName) + } + if p.SessionID != "" && p.SessionIDProvider != nil { + return Provides{}, errors.New("exactly one of SessionID or SessionIDProvider must be set") + } + + c := &consumer{ + log: reqs.Log, + ipc: reqs.IPC, + telemetry: reqs.Telemetry, + params: p, + configWriter: reqs.ConfigWriter, + effectiveConfig: make(map[string]interface{}), + readyCh: make(chan struct{}), + } + + c.initMetrics() + + // Register lifecycle hooks + reqs.Lifecycle.Append(compdef.Hook{ + OnStart: c.start, + OnStop: c.stop, + }) + + return Provides{Comp: c}, nil +} + +// start initiates the config stream connection and blocks until the first config snapshot is +// received. Blocking here ensures all components initialized after this one (and the binary's +// run function) see a fully-populated config. Returns an error if the snapshot is not received +// within ReadyTimeout (default 60s), which aborts FX startup. +func (c *consumer) start(_ context.Context) error { + // Use context.Background() so the stream lifetime is not bounded by the + // Fx startup context, which expires after app.StartTimeout (~5 minutes). 
+ c.ctx, c.cancel = context.WithCancel(context.Background()) + c.startTime = time.Now() + + c.wg.Add(1) + go c.streamLoop() + + timeout := c.params.ReadyTimeout + if timeout == 0 { + timeout = 60 * time.Second + } + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + c.log.Infof("Waiting for initial configuration from core agent (timeout: %v)...", timeout) + if err := c.waitReady(ctx); err != nil { + c.cancel() + c.wg.Wait() + return fmt.Errorf("waiting for initial config snapshot: %w", err) + } + c.log.Info("Initial configuration received from core agent.") + return nil +} + +// stop gracefully shuts down the consumer +func (c *consumer) stop(_ context.Context) error { + c.cancel() + c.streamLock.Lock() + if c.stream != nil { + _ = c.stream.CloseSend() + } + if c.conn != nil { + _ = c.conn.Close() + } + c.streamLock.Unlock() + c.wg.Wait() + return nil +} + +// waitReady blocks until the first config snapshot has been received and applied +func (c *consumer) waitReady(ctx context.Context) error { + select { + case <-c.readyCh: + return nil + case <-ctx.Done(): + return fmt.Errorf("timed out waiting for config snapshot: %w", ctx.Err()) + } +} + +// streamLoop manages the lifecycle of the config stream connection +func (c *consumer) streamLoop() { + defer c.wg.Done() + + for { + select { + case <-c.ctx.Done(): + return + default: + } + + // Establish connection and stream + if err := c.connectAndStream(); err != nil { + if errors.Is(err, context.Canceled) || c.ctx.Err() != nil { + return + } + c.log.Warnf("Config stream error: %v, reconnecting...", err) + c.streamReconnectCount.Inc() + + select { + case <-c.ctx.Done(): + return + case <-time.After(5 * time.Second): + continue + } + } + } +} + +// connectAndStream establishes a gRPC connection and processes the config stream +func (c *consumer) connectAndStream() error { + conn, err := grpc.NewClient(c.params.CoreAgentAddress,
grpc.WithTransportCredentials(credentials.NewTLS(c.ipc.GetTLSClientConfig())), + grpc.WithPerRPCCredentials(grpcutil.NewBearerTokenAuth(c.ipc.GetAuthToken())), + ) + if err != nil { + return fmt.Errorf("failed to connect to core agent: %w", err) + } + // Ensure conn is closed when this invocation exits (EOF, error, or context cancel). + defer conn.Close() + + c.streamLock.Lock() + c.conn = conn + c.client = pb.NewAgentSecureClient(conn) + c.streamLock.Unlock() + + sessionID := c.params.SessionID + if c.params.SessionIDProvider != nil { + var err error + sessionID, err = c.params.SessionIDProvider.WaitSessionID(c.ctx) + if err != nil { + return fmt.Errorf("waiting for session ID: %w", err) + } + } + // Add session_id to gRPC metadata + md := metadata.New(map[string]string{"session_id": sessionID}) + ctxWithMetadata := metadata.NewOutgoingContext(c.ctx, md) + + // Start streaming + stream, err := c.client.StreamConfigEvents(ctxWithMetadata, &pb.ConfigStreamRequest{ + Name: c.params.ClientName, + }) + if err != nil { + return fmt.Errorf("failed to start config stream: %w", err) + } + + c.streamLock.Lock() + c.stream = stream + c.streamLock.Unlock() + + c.log.Infof("Config stream established for client %s", c.params.ClientName) + + // Process stream events + for { + event, err := stream.Recv() + if err != nil { + if err == io.EOF { + c.log.Info("Config stream closed by server") + return nil + } + return fmt.Errorf("stream receive error: %w", err) + } + + if err := c.handleConfigEvent(event); err != nil { + c.log.Errorf("Failed to handle config event: %v", err) + } + } +} + +// handleConfigEvent processes a single config event from the stream +func (c *consumer) handleConfigEvent(event *pb.ConfigEvent) error { + switch e := event.Event.(type) { + case *pb.ConfigEvent_Snapshot: + return c.applySnapshot(e.Snapshot) + case *pb.ConfigEvent_Update: + return c.applyUpdate(e.Update) + default: + return fmt.Errorf("unknown event type: %T", event.Event) + } +} + +// 
applySnapshot applies a complete config snapshot +func (c *consumer) applySnapshot(snapshot *pb.ConfigSnapshot) error { + // Reject out-of-order or server-restart snapshots. lastSeqID is never reset between + // reconnects: if the server restarts and its sequence counter resets to a lower value, + // we refuse the new snapshot and log an error. Sub-processes are expected to restart + // when the core agent restarts. + if snapshot.SequenceId <= c.lastSeqID { + c.log.Errorf("Received snapshot with seq_id %d <= current %d; the core agent may have restarted. "+ + "This sub-process must be restarted to accept a new configuration.", snapshot.SequenceId, c.lastSeqID) + c.droppedStaleUpdates.Inc() + return nil + } + + c.log.Infof("Applying config snapshot (seq_id: %d, settings: %d)", snapshot.SequenceId, len(snapshot.Settings)) + + // Convert protobuf settings to Go map + newConfig := make(map[string]interface{}, len(snapshot.Settings)) + for _, setting := range snapshot.Settings { + newConfig[setting.Key] = pbValueToGo(setting.Value) + } + + // Update effective config atomically + c.configLock.Lock() + c.effectiveConfig = newConfig + c.lastSeqID = snapshot.SequenceId + c.configLock.Unlock() + + c.lastSeqIDMetric.Set(float64(snapshot.SequenceId)) + + if c.configWriter != nil { + for _, setting := range snapshot.Settings { + c.configWriter.Set(setting.Key, newConfig[setting.Key], model.Source(setting.Source)) + } + } + + // Signal readiness once the snapshot is fully applied and config mirrored. 
+ c.readyOnce.Do(func() { + close(c.readyCh) + c.ready = true + duration := time.Since(c.startTime) + c.timeToFirstSnapshot.Set(duration.Seconds()) + c.log.Infof("Received first config snapshot after %v", duration) + }) + + return nil +} + +// applyUpdate applies a single config update +func (c *consumer) applyUpdate(update *pb.ConfigUpdate) error { + // Check for stale update + if update.SequenceId <= c.lastSeqID { + c.log.Warnf("Ignoring stale update (seq_id: %d <= %d)", update.SequenceId, c.lastSeqID) + c.droppedStaleUpdates.Inc() + return nil + } + + // Detect discontinuity: trigger reconnect so the server sends a fresh snapshot. + if update.SequenceId != c.lastSeqID+1 { + return fmt.Errorf("seq_id discontinuity: expected %d, got %d", c.lastSeqID+1, update.SequenceId) + } + + c.log.Debugf("Applying config update (seq_id: %d, key: %s)", update.SequenceId, update.Setting.Key) + + newValue := pbValueToGo(update.Setting.Value) + + c.configLock.Lock() + c.effectiveConfig[update.Setting.Key] = newValue + c.lastSeqID = update.SequenceId + c.configLock.Unlock() + + c.lastSeqIDMetric.Set(float64(update.SequenceId)) + + if c.configWriter != nil { + c.configWriter.Set(update.Setting.Key, newValue, model.Source(update.Setting.Source)) + } + + return nil +} + +// initMetrics initializes telemetry metrics +func (c *consumer) initMetrics() { + c.timeToFirstSnapshot = c.telemetry.NewGauge( + "configstream_consumer", + "time_to_first_snapshot_seconds", + []string{}, + "Time taken to receive the first config snapshot", + ) + c.streamReconnectCount = c.telemetry.NewCounter( + "configstream_consumer", + "reconnect_count", + []string{}, + "Number of times the config stream has reconnected", + ) + c.lastSeqIDMetric = c.telemetry.NewGauge( + "configstream_consumer", + "last_sequence_id", + []string{}, + "Last received config sequence ID", + ) + c.droppedStaleUpdates = c.telemetry.NewCounter( + "configstream_consumer", + "dropped_stale_updates", + []string{}, + "Number of stale config 
updates dropped", + ) +} + +// pbValueToGo converts a protobuf Value to a Go value +func pbValueToGo(pbValue *structpb.Value) interface{} { + if pbValue == nil { + return nil + } + + // AsInterface() converts all numbers to float64 (JSON semantics). + // Try to preserve integer types when the float has no fractional part. + result := pbValue.AsInterface() + + if f, ok := result.(float64); ok { + // Only convert integers within float64's exact range (2^53); beyond that, + // float64 can't represent consecutive integers, so int64 conversion loses precision. + const maxExact float64 = 1 << 53 + if f >= -maxExact && f <= maxExact && f == math.Trunc(f) { + return int64(f) + } + } + + return result +} diff --git a/comp/core/configstreamconsumer/impl/consumer_test.go b/comp/core/configstreamconsumer/impl/consumer_test.go new file mode 100644 index 000000000000..45078b8dc2ad --- /dev/null +++ b/comp/core/configstreamconsumer/impl/consumer_test.go @@ -0,0 +1,444 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. 
+ +//go:build test + +package configstreamconsumerimpl + +import ( + "context" + "fmt" + "io" + "net" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/structpb" + + configstreamconsumer "github.com/DataDog/datadog-agent/comp/core/configstreamconsumer/def" + ipcmock "github.com/DataDog/datadog-agent/comp/core/ipc/mock" + logmock "github.com/DataDog/datadog-agent/comp/core/log/mock" + "github.com/DataDog/datadog-agent/comp/core/telemetry/impl" + pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core" +) + +// mockConfigStreamServer is a mock gRPC server for testing +type mockConfigStreamServer struct { + pb.UnimplementedAgentSecureServer + events chan *pb.ConfigEvent + closed bool +} + +func (m *mockConfigStreamServer) StreamConfigEvents(_ *pb.ConfigStreamRequest, stream pb.AgentSecure_StreamConfigEventsServer) error { + // Extract session_id from gRPC metadata + md, ok := metadata.FromIncomingContext(stream.Context()) + if !ok { + return status.Error(codes.Unauthenticated, "missing gRPC metadata") + } + + sessionIDs := md.Get("session_id") + if len(sessionIDs) == 0 || sessionIDs[0] == "" { + return status.Error(codes.Unauthenticated, "session_id required in metadata") + } + + for event := range m.events { + if m.closed { + return io.EOF + } + if err := stream.Send(event); err != nil { + return err + } + } + return nil +} + +// setupTestServer creates a test gRPC server and returns the server, address, and event channel +func setupTestServer(t *testing.T, ipcComp *ipcmock.IPCMock) (*grpc.Server, string, chan *pb.ConfigEvent, func()) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + opts := []grpc.ServerOption{ + grpc.Creds(credentials.NewTLS(ipcComp.GetTLSServerConfig())), 
+ } + grpcServer := grpc.NewServer(opts...) + mockServer := &mockConfigStreamServer{ + events: make(chan *pb.ConfigEvent, 100), + } + pb.RegisterAgentSecureServer(grpcServer, mockServer) + + go func() { + _ = grpcServer.Serve(listener) + }() + + cleanup := func() { + mockServer.closed = true + close(mockServer.events) + grpcServer.Stop() + listener.Close() + } + + return grpcServer, listener.Addr().String(), mockServer.events, cleanup +} + +// createTestConsumer creates a consumer for testing +func createTestConsumer(t *testing.T, serverAddr string, ipcComp *ipcmock.IPCMock) (*consumer, func()) { + log := logmock.New(t) + telemetryComp := telemetryimpl.NewMock(t) + + c := &consumer{ + log: log, + ipc: ipcComp, + telemetry: telemetryComp, + params: configstreamconsumer.Params{ + ClientName: "test-client", + CoreAgentAddress: serverAddr, + SessionID: "test-session-123", + }, + effectiveConfig: make(map[string]interface{}), + readyCh: make(chan struct{}), + startTime: time.Now(), + } + c.initMetrics() + + cleanup := func() { + if c.cancel != nil { + c.cancel() + } + if c.stream != nil { + _ = c.stream.CloseSend() + } + if c.conn != nil { + _ = c.conn.Close() + } + } + + return c, cleanup +} + +func TestConsumerSnapshot(t *testing.T) { + ipcComp := ipcmock.New(t) + _, serverAddr, events, cleanup := setupTestServer(t, ipcComp) + defer cleanup() + + consumer, cleanupConsumer := createTestConsumer(t, serverAddr, ipcComp) + defer cleanupConsumer() + + // Send a snapshot + settings := []*pb.ConfigSetting{ + {Key: "test_string", Value: mustNewValue(t, "hello")}, + {Key: "test_int", Value: mustNewValue(t, int64(42))}, + {Key: "test_bool", Value: mustNewValue(t, true)}, + } + + events <- &pb.ConfigEvent{ + Event: &pb.ConfigEvent_Snapshot{ + Snapshot: &pb.ConfigSnapshot{ + SequenceId: 1, + Settings: settings, + }, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Start streaming in a goroutine + consumer.ctx, 
consumer.cancel = context.WithCancel(context.Background()) + go func() { + _ = consumer.connectAndStream() + }() + + err := consumer.waitReady(ctx) + require.NoError(t, err) + + // Verify config was applied to the effective config map + consumer.configLock.RLock() + defer consumer.configLock.RUnlock() + assert.Equal(t, "hello", consumer.effectiveConfig["test_string"]) + assert.Equal(t, int64(42), consumer.effectiveConfig["test_int"]) + assert.Equal(t, true, consumer.effectiveConfig["test_bool"]) + assert.Equal(t, int32(1), consumer.lastSeqID) +} + +func TestConsumerUpdates(t *testing.T) { + ipcComp := ipcmock.New(t) + _, serverAddr, events, cleanup := setupTestServer(t, ipcComp) + defer cleanup() + + consumer, cleanupConsumer := createTestConsumer(t, serverAddr, ipcComp) + defer cleanupConsumer() + + // Send initial snapshot + events <- &pb.ConfigEvent{ + Event: &pb.ConfigEvent_Snapshot{ + Snapshot: &pb.ConfigSnapshot{ + SequenceId: 1, + Settings: []*pb.ConfigSetting{ + {Key: "test_key", Value: mustNewValue(t, "initial")}, + }, + }, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Start streaming + consumer.ctx, consumer.cancel = context.WithCancel(context.Background()) + go func() { + _ = consumer.connectAndStream() + }() + + err := consumer.waitReady(ctx) + require.NoError(t, err) + + // Send an update + events <- &pb.ConfigEvent{ + Event: &pb.ConfigEvent_Update{ + Update: &pb.ConfigUpdate{ + SequenceId: 2, + Setting: &pb.ConfigSetting{ + Key: "test_key", + Value: mustNewValue(t, "updated"), + }, + }, + }, + } + + // Wait a bit for the update to be processed + time.Sleep(100 * time.Millisecond) + + consumer.configLock.RLock() + defer consumer.configLock.RUnlock() + assert.Equal(t, "updated", consumer.effectiveConfig["test_key"]) + assert.Equal(t, int32(2), consumer.lastSeqID) +} + +func TestConsumerStaleUpdates(t *testing.T) { + ipcComp := ipcmock.New(t) + _, serverAddr, events, cleanup := 
setupTestServer(t, ipcComp) + defer cleanup() + + consumer, cleanupConsumer := createTestConsumer(t, serverAddr, ipcComp) + defer cleanupConsumer() + + // Send initial snapshot + events <- &pb.ConfigEvent{ + Event: &pb.ConfigEvent_Snapshot{ + Snapshot: &pb.ConfigSnapshot{ + SequenceId: 5, + Settings: []*pb.ConfigSetting{ + {Key: "test_key", Value: mustNewValue(t, "current")}, + }, + }, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Start streaming + consumer.ctx, consumer.cancel = context.WithCancel(context.Background()) + go func() { + _ = consumer.connectAndStream() + }() + + err := consumer.waitReady(ctx) + require.NoError(t, err) + + // Send a stale update (seq_id <= current) + events <- &pb.ConfigEvent{ + Event: &pb.ConfigEvent_Update{ + Update: &pb.ConfigUpdate{ + SequenceId: 3, + Setting: &pb.ConfigSetting{ + Key: "test_key", + Value: mustNewValue(t, "stale"), + }, + }, + }, + } + + // Wait a bit + time.Sleep(100 * time.Millisecond) + + // Verify stale update was NOT applied + consumer.configLock.RLock() + defer consumer.configLock.RUnlock() + assert.Equal(t, "current", consumer.effectiveConfig["test_key"]) + assert.Equal(t, int32(5), consumer.lastSeqID) +} + +// mustNewValue creates a structpb.Value or fails the test +func mustNewValue(t *testing.T, v interface{}) *structpb.Value { + val, err := structpb.NewValue(v) + require.NoError(t, err, fmt.Sprintf("failed to create Value from %v", v)) + return val +} + +func TestConsumerAppliesUpdatesInOrder(t *testing.T) { + t.Run("Consumer can start and block until snapshot", func(t *testing.T) { + ipcComp := ipcmock.New(t) + _, serverAddr, events, cleanup := setupTestServer(t, ipcComp) + defer cleanup() + + consumer, cleanupConsumer := createTestConsumer(t, serverAddr, ipcComp) + defer cleanupConsumer() + + // Start streaming in background + consumer.ctx, consumer.cancel = context.WithCancel(context.Background()) + go func() { + _ = 
consumer.connectAndStream() + }() + + // WaitReady should block + readyCtx, readyCancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + err := consumer.waitReady(readyCtx) + readyCancel() + assert.Error(t, err, "should timeout before snapshot") + + // Send snapshot + events <- &pb.ConfigEvent{ + Event: &pb.ConfigEvent_Snapshot{ + Snapshot: &pb.ConfigSnapshot{ + SequenceId: 1, + Settings: []*pb.ConfigSetting{ + {Key: "ready", Value: mustNewValue(t, true)}, + }, + }, + }, + } + + // Now WaitReady should succeed + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + err = consumer.waitReady(ctx) + assert.NoError(t, err, "should unblock after snapshot") + }) + + t.Run("Consumer applies updates in order", func(t *testing.T) { + ipcComp := ipcmock.New(t) + _, serverAddr, events, cleanup := setupTestServer(t, ipcComp) + defer cleanup() + + consumer, cleanupConsumer := createTestConsumer(t, serverAddr, ipcComp) + defer cleanupConsumer() + + // Start streaming + consumer.ctx, consumer.cancel = context.WithCancel(context.Background()) + go func() { + _ = consumer.connectAndStream() + }() + + // Send snapshot and ordered updates + events <- &pb.ConfigEvent{ + Event: &pb.ConfigEvent_Snapshot{ + Snapshot: &pb.ConfigSnapshot{ + SequenceId: 1, + Settings: []*pb.ConfigSetting{}, + }, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err := consumer.waitReady(ctx) + require.NoError(t, err) + + // Send ordered updates + for i := 2; i <= 5; i++ { + events <- &pb.ConfigEvent{ + Event: &pb.ConfigEvent_Update{ + Update: &pb.ConfigUpdate{ + SequenceId: int32(i), + Setting: &pb.ConfigSetting{ + Key: "counter", + Value: mustNewValue(t, int64(i)), + }, + }, + }, + } + time.Sleep(50 * time.Millisecond) + } + + // Verify final state + consumer.configLock.RLock() + defer consumer.configLock.RUnlock() + assert.Equal(t, int64(5), consumer.effectiveConfig["counter"]) + assert.Equal(t, 
int32(5), consumer.lastSeqID) + }) +} + +// TestStartBlocksUntilSnapshot verifies that start blocks until the first snapshot is received, +// so the binary's run function sees a fully-populated config without calling WaitReady. +func TestStartBlocksUntilSnapshot(t *testing.T) { + ipcComp := ipcmock.New(t) + _, serverAddr, events, cleanup := setupTestServer(t, ipcComp) + defer cleanup() + + consumer, cleanupConsumer := createTestConsumer(t, serverAddr, ipcComp) + defer cleanupConsumer() + + // Send snapshot after a short delay to verify Start blocks. + go func() { + time.Sleep(50 * time.Millisecond) + events <- &pb.ConfigEvent{ + Event: &pb.ConfigEvent_Snapshot{ + Snapshot: &pb.ConfigSnapshot{ + SequenceId: 1, + Settings: []*pb.ConfigSetting{ + {Key: "server.port", Value: mustNewValue(t, int64(8080))}, + {Key: "feature.enabled", Value: mustNewValue(t, true)}, + }, + }, + }, + } + }() + + startTime := time.Now() + err := consumer.start(context.Background()) + startDuration := time.Since(startTime) + + require.NoError(t, err, "Start should succeed once snapshot is received") + assert.GreaterOrEqual(t, startDuration, 50*time.Millisecond, "Start should have blocked until snapshot arrived") + + // Config is guaranteed to be fully populated when Start returns. + consumer.configLock.RLock() + defer consumer.configLock.RUnlock() + assert.Equal(t, int64(8080), consumer.effectiveConfig["server.port"]) + assert.Equal(t, true, consumer.effectiveConfig["feature.enabled"]) + + consumer.stop(context.Background()) +} + +// TestStartTimeoutFailsStartup verifies that start returns an error when the first snapshot +// is not received within ReadyTimeout, aborting FX startup. +func TestStartTimeoutFailsStartup(t *testing.T) { + ipcComp := ipcmock.New(t) + _, serverAddr, _, cleanup := setupTestServer(t, ipcComp) + defer cleanup() + + consumer, cleanupConsumer := createTestConsumer(t, serverAddr, ipcComp) + defer cleanupConsumer() + + // Short timeout so the test doesn't take 60s. 
+ consumer.params.ReadyTimeout = 200 * time.Millisecond + + startTime := time.Now() + err := consumer.start(context.Background()) + startDuration := time.Since(startTime) + + require.Error(t, err, "Start should fail when no snapshot received within timeout") + assert.Contains(t, err.Error(), "waiting for initial config snapshot") + assert.GreaterOrEqual(t, startDuration, 200*time.Millisecond, "should respect ReadyTimeout") + assert.Less(t, startDuration, 500*time.Millisecond, "should not wait longer than needed") +} diff --git a/comp/core/configstreamconsumer/mock/BUILD.bazel b/comp/core/configstreamconsumer/mock/BUILD.bazel new file mode 100644 index 000000000000..9968c4af55f8 --- /dev/null +++ b/comp/core/configstreamconsumer/mock/BUILD.bazel @@ -0,0 +1 @@ +# gazelle:ignore diff --git a/comp/core/configstreamconsumer/mock/mock.go b/comp/core/configstreamconsumer/mock/mock.go new file mode 100644 index 000000000000..069ad3e8adfb --- /dev/null +++ b/comp/core/configstreamconsumer/mock/mock.go @@ -0,0 +1,25 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. 
+ +//go:build test + +// Package mock provides a mock for the configstreamconsumer component +package mock + +import ( + "testing" + + configstreamconsumer "github.com/DataDog/datadog-agent/comp/core/configstreamconsumer/def" +) + +// Mock is a mock implementation of configstreamconsumer.Component +type Mock struct { + t *testing.T +} + +// New creates a new mock configstreamconsumer component +func New(t *testing.T) configstreamconsumer.Component { + return &Mock{t: t} +} diff --git a/comp/core/remoteagent/helper/serverhelper.go b/comp/core/remoteagent/helper/serverhelper.go index cb184792afa2..5cb3ddf2f216 100644 --- a/comp/core/remoteagent/helper/serverhelper.go +++ b/comp/core/remoteagent/helper/serverhelper.go @@ -329,3 +329,23 @@ func (s *UnimplementedRemoteAgentServer) refreshRegistration() error { func (s *UnimplementedRemoteAgentServer) GetGRPCServer() *grpc.Server { return s.grpcServer } + +// WaitSessionID blocks until the remote agent is registered and a session ID is available, or ctx is done. +// It returns the session ID or an error if the context is cancelled before registration completes. +func (s *UnimplementedRemoteAgentServer) WaitSessionID(ctx context.Context) (string, error) { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for { + s.sessionIDMutex.RLock() + sid := s.sessionID + s.sessionIDMutex.RUnlock() + if sid != "" { + return sid, nil + } + select { + case <-ctx.Done(): + return "", ctx.Err() + case <-ticker.C: + } + } +} diff --git a/comp/core/remoteagent/impl-systemprobe/remoteagent.go b/comp/core/remoteagent/impl-systemprobe/remoteagent.go index 373916377012..20edba0e680b 100644 --- a/comp/core/remoteagent/impl-systemprobe/remoteagent.go +++ b/comp/core/remoteagent/impl-systemprobe/remoteagent.go @@ -104,6 +104,12 @@ func (r *remoteagentImpl) GetStatusDetails(_ context.Context, _ *pbcore.GetStatu }, nil } +// WaitSessionID blocks until the remote agent is registered and a session ID is available.
+// This allows components that need the session ID (e.g. config stream consumer) to wait for RAR registration. +func (r *remoteagentImpl) WaitSessionID(ctx context.Context) (string, error) { + return r.remoteAgentServer.WaitSessionID(ctx) +} + func (r *remoteagentImpl) GetTelemetry(_ context.Context, _ *pbcore.GetTelemetryRequest) (*pbcore.GetTelemetryResponse, error) { prometheusText, err := r.telemetry.GatherText(false, telemetry.StaticMetricFilter( // Metrics to forward from system-probe to core agent. diff --git a/internal/remote-agent/main.go b/internal/remote-agent/main.go index 16163bb2f97f..a46a1f4f40c6 100644 --- a/internal/remote-agent/main.go +++ b/internal/remote-agent/main.go @@ -12,6 +12,7 @@ import ( "encoding/pem" "flag" "fmt" + "io" "log" "net" "os" @@ -166,6 +167,53 @@ func refreshRegistration(agentClient pbcore.AgentSecureClient, sessionID string) return nil } +// streamConfigEvents subscribes to the Core Agent config stream and logs received events. +// It runs until ctx is cancelled or the stream closes/errors. +func streamConfigEvents(ctx context.Context, agentIpcAddress, agentAuthToken, agentFlavor, sessionID string, cert tls.Certificate) { + tlsCreds := credentials.NewTLS(&tls.Config{ + Certificates: []tls.Certificate{cert}, + // Test client: no server name verification needed. 
+ InsecureSkipVerify: true, //nolint:gosec + }) + conn, err := grpc.NewClient(agentIpcAddress, + grpc.WithTransportCredentials(tlsCreds), + grpc.WithPerRPCCredentials(grpcutil.NewBearerTokenAuth(agentAuthToken)), + ) + if err != nil { + log.Printf("config stream: failed to connect: %v", err) + return + } + defer conn.Close() + + client := pbcore.NewAgentSecureClient(conn) + md := metadata.New(map[string]string{"session_id": sessionID}) + ctxWithMD := metadata.NewOutgoingContext(ctx, md) + + stream, err := client.StreamConfigEvents(ctxWithMD, &pbcore.ConfigStreamRequest{Name: agentFlavor}) + if err != nil { + log.Printf("config stream: failed to start: %v", err) + return + } + log.Println("Config stream established.") + for { + event, err := stream.Recv() + if err != nil { + if err == io.EOF || ctx.Err() != nil { + log.Println("Config stream closed.") + return + } + log.Printf("Config stream receive error: %v", err) + return + } + switch e := event.Event.(type) { + case *pbcore.ConfigEvent_Snapshot: + log.Printf("Config snapshot received: %d settings (seq=%d)", len(e.Snapshot.GetSettings()), e.Snapshot.GetSequenceId()) + case *pbcore.ConfigEvent_Update: + log.Printf("Config update received: seq=%d", e.Update.GetSequenceId()) + } + } +} + func main() { // Read in all of the necessary configuration for this remote agent. var agentFlavor string @@ -213,14 +261,22 @@ func main() { log.Printf("Spawned remote agent gRPC server on %s.", listenAddr) // Wait forever, periodically refreshing our registration. 
+ var streamCancel context.CancelFunc refreshTicker := time.NewTicker(1 * time.Second) for range refreshTicker.C { if sessionID == "" { + if streamCancel != nil { + streamCancel() + streamCancel = nil + } var err error sessionID, agentClient, err = registerWithAgent(agentIpcAddress, agentAuthToken, agentFlavor, displayName, listenAddr, refreshTicker, tlsCert) if err != nil { continue } + var streamCtx context.Context + streamCtx, streamCancel = context.WithCancel(context.Background()) + go streamConfigEvents(streamCtx, agentIpcAddress, agentAuthToken, agentFlavor, sessionID, tlsCert) } else { err := refreshRegistration(agentClient, sessionID) if err != nil { diff --git a/pkg/config/model/types.go b/pkg/config/model/types.go index 5eb5f01a2e80..59f3646d35ed 100644 --- a/pkg/config/model/types.go +++ b/pkg/config/model/types.go @@ -44,9 +44,8 @@ const ( SourceConfigPostInit Source = "config-post-init" // SourceSecret are values resolved from secrets (ENC[...] placeholders). SourceSecret Source = "secret" - // SourceLocalConfigProcess are the values mirrored from the config process. The config process is the - // core-agent. This is used when side process like security-agent or trace-agent pull their configuration from - // the core-agent. + // SourceLocalConfigProcess are the values mirrored from the config process via the configsync HTTP + // polling mechanism. SourceLocalConfigProcess Source = "local-config-process" // SourceAgentRuntime are the values configured by the agent itself. The agent can dynamically compute the best // value for some settings when not set by the user. 
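Reviewer note on the sequence-ID rules in PATCH 1/8 (a reading aid, not part of the patch): the checks in `applySnapshot` and `applyUpdate` can be restated as a small standalone Go sketch. The names `action`, `classifySnapshot`, and `classifyUpdate` are hypothetical and do not exist in the repository; the sketch only mirrors the comparisons visible in the diff, assuming `lastSeqID` starts at its zero value and is advanced only when an event is applied.

```go
package main

import "fmt"

// action is a hypothetical label for what the consumer does with an event.
type action string

const (
	actionApply action = "apply" // event accepted, lastSeqID advances
	actionDrop  action = "drop"  // stale event, ignored and counted as dropped
	actionGap   action = "gap"   // discontinuity, applyUpdate returns an error
)

// classifySnapshot mirrors the check in applySnapshot: a snapshot is accepted
// only when its sequence ID is strictly greater than the last applied one.
func classifySnapshot(lastSeqID, seqID int32) action {
	if seqID <= lastSeqID {
		return actionDrop
	}
	return actionApply
}

// classifyUpdate mirrors the two checks in applyUpdate: stale updates are
// dropped, and anything other than lastSeqID+1 is treated as a discontinuity.
func classifyUpdate(lastSeqID, seqID int32) action {
	switch {
	case seqID <= lastSeqID:
		return actionDrop
	case seqID != lastSeqID+1:
		return actionGap
	default:
		return actionApply
	}
}

func main() {
	var last int32 // zero value, as in a freshly constructed consumer
	steps := []struct {
		snapshot bool
		seqID    int32
	}{
		{true, 2},  // first snapshot: 2 > 0
		{false, 3}, // in-order update: 3 == 2+1
		{false, 3}, // duplicate of an already applied update
		{false, 6}, // skips 4 and 5
		{true, 1},  // snapshot from a server whose counter restarted
	}
	for _, s := range steps {
		var a action
		if s.snapshot {
			a = classifySnapshot(last, s.seqID)
		} else {
			a = classifyUpdate(last, s.seqID)
		}
		if a == actionApply {
			last = s.seqID // only applied events move the cursor
		}
		fmt.Printf("snapshot=%t seq=%d -> %s (last=%d)\n", s.snapshot, s.seqID, a, last)
	}
}
```

Because `lastSeqID` is never reset, the final step shows why a snapshot from a restarted core agent is refused until the sub-process itself restarts, which matches the error message logged in `applySnapshot`.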
From 62c6e7fc2a02d6fb1552e93e49a3876c835d3eb8 Mon Sep 17 00:00:00 2001 From: rahulkaukuntla Date: Tue, 5 May 2026 11:49:36 -0400 Subject: [PATCH 2/8] avoiding a repeat of incident-53989 and incident-53990 --- comp/core/configstream/impl/configstream.go | 41 ++++++++++++++----- .../configstream/impl/configstream_test.go | 37 +++++++++++++++-- .../agent-platform/common/agent_behaviour.go | 4 ++ 3 files changed, 69 insertions(+), 13 deletions(-) diff --git a/comp/core/configstream/impl/configstream.go b/comp/core/configstream/impl/configstream.go index aeeaf3de7cf6..804bff6e54dd 100644 --- a/comp/core/configstream/impl/configstream.go +++ b/comp/core/configstream/impl/configstream.go @@ -44,6 +44,11 @@ type configStream struct { log log.Component telemetry telemetry.Component + // enabled is set once at construction; when false the run() goroutine is + // never started and Subscribe() returns a closed channel immediately so + // callers don't block on unserviced channels. + enabled bool + m sync.Mutex subscribers map[string]*subscription @@ -73,10 +78,13 @@ func NewComponent(reqs Requires) (Provides, error) { return Provides{}, errors.New("remote_agent.configstream.enabled is true but remote_agent.registry.enabled is not; set remote_agent.registry.enabled: true to use config stream for remote agents") } + enabled := reqs.Config.GetBool("remote_agent.configstream.enabled") + cs := &configStream{ config: reqs.Config, log: reqs.Log, telemetry: reqs.Telemetry, + enabled: enabled, subscribers: make(map[string]*subscription), subscribeChan: make(chan *subscription), unsubscribeChan: make(chan string), @@ -92,16 +100,21 @@ func NewComponent(reqs Requires) (Provides, error) { // Cache origin once at initialization to avoid lock contention in OnUpdate callback cs.origin = cs.getConfigOrigin() - reqs.Lifecycle.Append(compdef.Hook{ - OnStart: func(_ context.Context) error { - go cs.run() - return nil - }, - OnStop: func(_ context.Context) error { - close(cs.stopChan) - return 
nil - }, - }) + // Only start the background goroutine and OnUpdate listener when the feature + // is enabled. Skipping this at idle eliminates the per-update protobuf + // allocation overhead for users who haven't opted in to config streaming. + if enabled { + reqs.Lifecycle.Append(compdef.Hook{ + OnStart: func(_ context.Context) error { + go cs.run() + return nil + }, + OnStop: func(_ context.Context) error { + close(cs.stopChan) + return nil + }, + }) + } return Provides{ Comp: cs, @@ -110,7 +123,15 @@ func NewComponent(reqs Requires) (Provides, error) { // Subscribe returns a channel that streams configuration events, starting with a snapshot. // It also returns an unsubscribe function that must be called to clean up. +// If the configstream is disabled (remote_agent.configstream.enabled is false), it returns +// a pre-closed channel and a no-op unsubscribe so callers don't block on an unserviced channel. func (cs *configStream) Subscribe(req *pb.ConfigStreamRequest) (<-chan *pb.ConfigEvent, func()) { + if !cs.enabled { + ch := make(chan *pb.ConfigEvent) + close(ch) + return ch, func() {} + } + subID := fmt.Sprintf("%s-%s", req.Name, uuid.New().String()) subChan := make(chan *pb.ConfigEvent, 100) // Buffered channel to avoid blocking diff --git a/comp/core/configstream/impl/configstream_test.go b/comp/core/configstream/impl/configstream_test.go index 1b87c74ed750..13162a4ed741 100644 --- a/comp/core/configstream/impl/configstream_test.go +++ b/comp/core/configstream/impl/configstream_test.go @@ -312,6 +312,30 @@ func TestNewComponentValidation(t *testing.T) { require.NoError(t, err) }) + t.Run("Subscribe returns closed channel when disabled", func(t *testing.T) { + cfg := configmock.New(t) + cfg.SetWithoutSource("remote_agent.configstream.enabled", false) + cfg.SetWithoutSource("remote_agent.registry.enabled", false) + provides, err := NewComponent(Requires{ + Lifecycle: compdef.NewTestLifecycle(t), + Config: cfg, + Log: mockLog, + Telemetry: telemetryComp, + 
}) + require.NoError(t, err) + + ch, unsubscribe := provides.Comp.Subscribe(&pb.ConfigStreamRequest{Name: "test"}) + defer unsubscribe() + + // Channel must be immediately readable (closed), not block forever. + select { + case _, ok := <-ch: + assert.False(t, ok, "channel should be closed") + default: + t.Fatal("Subscribe returned a channel that is neither closed nor has a value; would block") + } + }) + t.Run("no error when both configstream and registry enabled", func(t *testing.T) { cfg := configmock.New(t) cfg.SetWithoutSource("remote_agent.configstream.enabled", true) @@ -326,7 +350,8 @@ func TestNewComponentValidation(t *testing.T) { }) } -// newConfigStreamForTest creates a config stream for testing without lifecycle +// newConfigStreamForTest creates a config stream for testing without lifecycle. +// It sets cs.enabled directly (same package) to avoid inflating the config seqID. func newConfigStreamForTest(t *testing.T, cfg config.Component, logger log.Component) *configStream { telemetryComp := telemetrynoops.GetCompatComponent() reqs := Requires{ @@ -338,9 +363,10 @@ func newConfigStreamForTest(t *testing.T, cfg config.Component, logger log.Compo provides, err := NewComponent(reqs) require.NoError(t, err) - // Extract the underlying configStream - // and start the run loop manually since lifecycle hooks are not executed + // Extract the underlying configStream, enable it, and start the run loop + // manually. cs := provides.Comp.(*configStream) + cs.enabled = true go cs.run() return cs @@ -372,6 +398,11 @@ func buildComponent(t *testing.T) (Provides, *configInterceptor) { log := logmock.New(t) cfg := configmock.New(t) + // Enable configstream so NewComponent registers the lifecycle hook and + // run() starts when lc.Start() is called. 
+ cfg.SetWithoutSource("remote_agent.configstream.enabled", true) + cfg.SetWithoutSource("remote_agent.registry.enabled", true) + // Register keys used in tests cfg.BindEnvAndSetDefault("my.new.setting", "") cfg.BindEnvAndSetDefault("dropped.setting", "") diff --git a/test/new-e2e/tests/agent-platform/common/agent_behaviour.go b/test/new-e2e/tests/agent-platform/common/agent_behaviour.go index 7eb6d90907ad..dcb09c18ff0d 100644 --- a/test/new-e2e/tests/agent-platform/common/agent_behaviour.go +++ b/test/new-e2e/tests/agent-platform/common/agent_behaviour.go @@ -418,6 +418,10 @@ func CheckADPEnabled(t *testing.T, client *TestClient) { require.NoError(tt, err) err = client.SetConfig(configFilePath, "data_plane.dogstatsd.enabled", "true") require.NoError(tt, err) + err = client.SetConfig(configFilePath, "remote_agent.registry.enabled", "true") + require.NoError(tt, err) + err = client.SetConfig(configFilePath, "remote_agent.configstream.enabled", "true") + require.NoError(tt, err) _, err = client.SvcManager.Restart(client.Helper.GetServiceName()) require.NoError(tt, err) From 5862fb51d2277fe604d5289c3145877baafc8e62 Mon Sep 17 00:00:00 2001 From: rahulkaukuntla Date: Tue, 5 May 2026 13:39:11 -0400 Subject: [PATCH 3/8] change config option to remote_agent.configstream.consumer.enabled and make it the remote agent's responsibility to configure it properly --- comp/core/configstream/README.md | 13 ++- comp/core/configstream/impl/configstream.go | 48 +++-------- .../configstream/impl/configstream_test.go | 82 ++----------------- comp/core/configstream/server/server.go | 4 - comp/core/configstream/server/server_test.go | 23 +----- comp/core/configstreamconsumer/README.md | 7 +- pkg/config/setup/common_settings.go | 2 +- .../agent-platform/common/agent_behaviour.go | 5 +- 8 files changed, 35 insertions(+), 149 deletions(-) diff --git a/comp/core/configstream/README.md b/comp/core/configstream/README.md index 79cf8b74e3f7..ae6e013575e5 100644 --- 
a/comp/core/configstream/README.md +++ b/comp/core/configstream/README.md @@ -91,19 +91,18 @@ message ConfigSetting { ## Configuration -The config stream is **only available** when both of the following are true on the core agent: -- `remote_agent.registry.enabled: true` -- `remote_agent.configstream.enabled: true` +The config stream component always runs. Individual connections are RAR-gated: the caller must be a registered remote agent. -**Optional settings:** +**Settings:** ```yaml # datadog.yaml remote_agent: registry: - enabled: true # Required for RAR-gated authorization and for use_configstream + enabled: true # Required for RAR authorization; remote agents must register before subscribing configstream: - enabled: true # Enables config stream for remote agents (requires remote_agent.registry.enabled) - sleep_interval: 10s # Backoff on non-terminal errors (default: 10s) + sleep_interval: 10s # Backoff on non-terminal send errors (default: 10s) + consumer: + enabled: false # Default: false. Set to true on Go-based remote agents to enable the configstreamconsumer component. agent_ipc: # Maximum size of a single gRPC message accepted/sent by the agent's gRPC # server. 
Configstream snapshots can be large (the entire flattened agent diff --git a/comp/core/configstream/impl/configstream.go b/comp/core/configstream/impl/configstream.go index 804bff6e54dd..d2738d4aeff4 100644 --- a/comp/core/configstream/impl/configstream.go +++ b/comp/core/configstream/impl/configstream.go @@ -9,7 +9,6 @@ package configstreamimpl import ( "context" "encoding/json" - "errors" "fmt" "path/filepath" "sync" @@ -20,7 +19,7 @@ import ( "github.com/DataDog/datadog-agent/comp/core/config" configstream "github.com/DataDog/datadog-agent/comp/core/configstream/def" log "github.com/DataDog/datadog-agent/comp/core/log/def" - "github.com/DataDog/datadog-agent/comp/core/telemetry/def" + telemetry "github.com/DataDog/datadog-agent/comp/core/telemetry/def" compdef "github.com/DataDog/datadog-agent/comp/def" "github.com/DataDog/datadog-agent/pkg/config/model" pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core" @@ -44,11 +43,6 @@ type configStream struct { log log.Component telemetry telemetry.Component - // enabled is set once at construction; when false the run() goroutine is - // never started and Subscribe() returns a closed channel immediately so - // callers don't block on unserviced channels. - enabled bool - m sync.Mutex subscribers map[string]*subscription @@ -74,17 +68,10 @@ type subscription struct { // NewComponent creates a new configstream component. 
func NewComponent(reqs Requires) (Provides, error) { - if reqs.Config.GetBool("remote_agent.configstream.enabled") && !reqs.Config.GetBool("remote_agent.registry.enabled") { - return Provides{}, errors.New("remote_agent.configstream.enabled is true but remote_agent.registry.enabled is not; set remote_agent.registry.enabled: true to use config stream for remote agents") - } - - enabled := reqs.Config.GetBool("remote_agent.configstream.enabled") - cs := &configStream{ config: reqs.Config, log: reqs.Log, telemetry: reqs.Telemetry, - enabled: enabled, subscribers: make(map[string]*subscription), subscribeChan: make(chan *subscription), unsubscribeChan: make(chan string), @@ -100,21 +87,16 @@ func NewComponent(reqs Requires) (Provides, error) { // Cache origin once at initialization to avoid lock contention in OnUpdate callback cs.origin = cs.getConfigOrigin() - // Only start the background goroutine and OnUpdate listener when the feature - // is enabled. Skipping this at idle eliminates the per-update protobuf - // allocation overhead for users who haven't opted in to config streaming. - if enabled { - reqs.Lifecycle.Append(compdef.Hook{ - OnStart: func(_ context.Context) error { - go cs.run() - return nil - }, - OnStop: func(_ context.Context) error { - close(cs.stopChan) - return nil - }, - }) - } + reqs.Lifecycle.Append(compdef.Hook{ + OnStart: func(_ context.Context) error { + go cs.run() + return nil + }, + OnStop: func(_ context.Context) error { + close(cs.stopChan) + return nil + }, + }) return Provides{ Comp: cs, @@ -123,15 +105,7 @@ func NewComponent(reqs Requires) (Provides, error) { // Subscribe returns a channel that streams configuration events, starting with a snapshot. // It also returns an unsubscribe function that must be called to clean up. -// If the configstream is disabled (remote_agent.configstream.enabled is false), it returns -// a pre-closed channel and a no-op unsubscribe so callers don't block on an unserviced channel. 
func (cs *configStream) Subscribe(req *pb.ConfigStreamRequest) (<-chan *pb.ConfigEvent, func()) { - if !cs.enabled { - ch := make(chan *pb.ConfigEvent) - close(ch) - return ch, func() {} - } - subID := fmt.Sprintf("%s-%s", req.Name, uuid.New().String()) subChan := make(chan *pb.ConfigEvent, 100) // Buffered channel to avoid blocking diff --git a/comp/core/configstream/impl/configstream_test.go b/comp/core/configstream/impl/configstream_test.go index 13162a4ed741..90c55a38d83e 100644 --- a/comp/core/configstream/impl/configstream_test.go +++ b/comp/core/configstream/impl/configstream_test.go @@ -281,77 +281,21 @@ done: } } -func TestNewComponentValidation(t *testing.T) { +func TestNewComponentNoError(t *testing.T) { mockLog := logmock.New(t) telemetryComp := telemetrynoops.GetCompatComponent() - - t.Run("error when configstream enabled but registry disabled", func(t *testing.T) { - cfg := configmock.New(t) - cfg.SetWithoutSource("remote_agent.configstream.enabled", true) - cfg.SetWithoutSource("remote_agent.registry.enabled", false) - _, err := NewComponent(Requires{ - Lifecycle: compdef.NewTestLifecycle(t), - Config: cfg, - Log: mockLog, - Telemetry: telemetryComp, - }) - require.Error(t, err) - require.Contains(t, err.Error(), "remote_agent.configstream.enabled is true but remote_agent.registry.enabled is not") - }) - - t.Run("no error when configstream disabled", func(t *testing.T) { - cfg := configmock.New(t) - cfg.SetWithoutSource("remote_agent.configstream.enabled", false) - cfg.SetWithoutSource("remote_agent.registry.enabled", false) - _, err := NewComponent(Requires{ - Lifecycle: compdef.NewTestLifecycle(t), - Config: cfg, - Log: mockLog, - Telemetry: telemetryComp, - }) - require.NoError(t, err) - }) - - t.Run("Subscribe returns closed channel when disabled", func(t *testing.T) { - cfg := configmock.New(t) - cfg.SetWithoutSource("remote_agent.configstream.enabled", false) - cfg.SetWithoutSource("remote_agent.registry.enabled", false) - provides, err := 
NewComponent(Requires{ - Lifecycle: compdef.NewTestLifecycle(t), - Config: cfg, - Log: mockLog, - Telemetry: telemetryComp, - }) - require.NoError(t, err) - - ch, unsubscribe := provides.Comp.Subscribe(&pb.ConfigStreamRequest{Name: "test"}) - defer unsubscribe() - - // Channel must be immediately readable (closed), not block forever. - select { - case _, ok := <-ch: - assert.False(t, ok, "channel should be closed") - default: - t.Fatal("Subscribe returned a channel that is neither closed nor has a value; would block") - } - }) - - t.Run("no error when both configstream and registry enabled", func(t *testing.T) { - cfg := configmock.New(t) - cfg.SetWithoutSource("remote_agent.configstream.enabled", true) - cfg.SetWithoutSource("remote_agent.registry.enabled", true) - _, err := NewComponent(Requires{ - Lifecycle: compdef.NewTestLifecycle(t), - Config: cfg, - Log: mockLog, - Telemetry: telemetryComp, - }) - require.NoError(t, err) + cfg := configmock.New(t) + _, err := NewComponent(Requires{ + Lifecycle: compdef.NewTestLifecycle(t), + Config: cfg, + Log: mockLog, + Telemetry: telemetryComp, }) + require.NoError(t, err) } // newConfigStreamForTest creates a config stream for testing without lifecycle. -// It sets cs.enabled directly (same package) to avoid inflating the config seqID. +// It manually starts the run loop since the test lifecycle does not execute hooks. func newConfigStreamForTest(t *testing.T, cfg config.Component, logger log.Component) *configStream { telemetryComp := telemetrynoops.GetCompatComponent() reqs := Requires{ @@ -363,10 +307,7 @@ func newConfigStreamForTest(t *testing.T, cfg config.Component, logger log.Compo provides, err := NewComponent(reqs) require.NoError(t, err) - // Extract the underlying configStream, enable it, and start the run loop - // manually. 
cs := provides.Comp.(*configStream) - cs.enabled = true go cs.run() return cs @@ -398,11 +339,6 @@ func buildComponent(t *testing.T) (Provides, *configInterceptor) { log := logmock.New(t) cfg := configmock.New(t) - // Enable configstream so NewComponent registers the lifecycle hook and - // run() starts when lc.Start() is called. - cfg.SetWithoutSource("remote_agent.configstream.enabled", true) - cfg.SetWithoutSource("remote_agent.registry.enabled", true) - // Register keys used in tests cfg.BindEnvAndSetDefault("my.new.setting", "") cfg.BindEnvAndSetDefault("dropped.setting", "") diff --git a/comp/core/configstream/server/server.go b/comp/core/configstream/server/server.go index 0ecbf436048f..2d5324c1d16d 100644 --- a/comp/core/configstream/server/server.go +++ b/comp/core/configstream/server/server.go @@ -40,11 +40,7 @@ func NewServer(cfg config.Component, comp configstream.Component, registry remot // StreamConfigEvents handles the gRPC streaming logic. // It requires the caller to be a registered remote agent (RAR-gated). -// The config stream is only available when remote_agent.configstream.enabled is true on the core agent. 
func (s *Server) StreamConfigEvents(req *pb.ConfigStreamRequest, stream pb.AgentSecure_StreamConfigEventsServer) error { - if !s.cfg.GetBool("remote_agent.configstream.enabled") { - return status.Error(codes.Unimplemented, "config stream is not enabled for remote agents; set remote_agent.configstream.enabled: true and remote_agent.registry.enabled: true on the core agent") - } if s.registry == nil { return status.Error(codes.Unimplemented, "remote agent registry not enabled") } diff --git a/comp/core/configstream/server/server_test.go b/comp/core/configstream/server/server_test.go index b40e03e8719a..f634f120eb96 100644 --- a/comp/core/configstream/server/server_test.go +++ b/comp/core/configstream/server/server_test.go @@ -80,7 +80,6 @@ func (m *mockRemoteAgentRegistry) GetRegisteredAgentStatuses() []remoteagentregi func setupTest(ctx context.Context, t *testing.T, sessionID string) (*Server, *mockComp, *mockStream, chan *pb.ConfigEvent) { cfg := configmock.New(t) - cfg.Set("remote_agent.configstream.enabled", true, model.SourceAgentRuntime) cfg.Set("remote_agent.configstream.sleep_interval", 10*time.Millisecond, model.SourceAgentRuntime) comp := &mockComp{} @@ -182,25 +181,9 @@ func TestRARAuthorization(t *testing.T) { Name: "test-client", } - t.Run("rejects request when remote_agent.configstream.enabled is false", func(t *testing.T) { - cfg := configmock.New(t) - cfg.Set("remote_agent.configstream.enabled", false, model.SourceAgentRuntime) - comp := &mockComp{} - mockRAR := &mockRemoteAgentRegistry{} - server := NewServer(cfg, comp, mockRAR) - md := metadata.New(map[string]string{"session_id": "test-session-id"}) - ctx := metadata.NewIncomingContext(context.Background(), md) - stream := &mockStream{ctx: ctx} - - err := server.StreamConfigEvents(testReq, stream) - assert.Error(t, err) - assert.Equal(t, codes.Unimplemented, status.Code(err)) - require.ErrorContains(t, err, "config stream is not enabled") - }) - t.Run("rejects request with missing metadata", func(t 
*testing.T) { cfg := configmock.New(t) - cfg.Set("remote_agent.configstream.enabled", true, model.SourceAgentRuntime) + cfg.Set("remote_agent.configstream.sleep_interval", 10*time.Millisecond, model.SourceAgentRuntime) comp := &mockComp{} mockRAR := &mockRemoteAgentRegistry{} server := NewServer(cfg, comp, mockRAR) @@ -216,7 +199,7 @@ func TestRARAuthorization(t *testing.T) { t.Run("rejects request with missing session_id in metadata", func(t *testing.T) { cfg := configmock.New(t) - cfg.Set("remote_agent.configstream.enabled", true, model.SourceAgentRuntime) + cfg.Set("remote_agent.configstream.sleep_interval", 10*time.Millisecond, model.SourceAgentRuntime) comp := &mockComp{} mockRAR := &mockRemoteAgentRegistry{} server := NewServer(cfg, comp, mockRAR) @@ -234,7 +217,7 @@ func TestRARAuthorization(t *testing.T) { t.Run("rejects request with empty session_id", func(t *testing.T) { cfg := configmock.New(t) - cfg.Set("remote_agent.configstream.enabled", true, model.SourceAgentRuntime) + cfg.Set("remote_agent.configstream.sleep_interval", 10*time.Millisecond, model.SourceAgentRuntime) comp := &mockComp{} mockRAR := &mockRemoteAgentRegistry{} server := NewServer(cfg, comp, mockRAR) diff --git a/comp/core/configstreamconsumer/README.md b/comp/core/configstreamconsumer/README.md index f4fc7af223ee..2606e243e735 100644 --- a/comp/core/configstreamconsumer/README.md +++ b/comp/core/configstreamconsumer/README.md @@ -45,10 +45,10 @@ Supply **either** a fixed `SessionID` **or** a `SessionIDProvider` (e.g. from th ### Only include the module when the feature is enabled -Including `configstreamconsumerfx.Module()` when config streaming is disabled will abort FX startup. Check the feature flag before building FX options and include the module conditionally: +Including `configstreamconsumerfx.Module()` when config streaming is disabled will abort FX startup. 
Gate on `remote_agent.configstream.consumer.enabled` before building FX options: ```go -if configstreamEnabled { +if cfg.GetBool("remote_agent.configstream.consumer.enabled") { opts = append(opts, configstreamFxOptions()) } ``` @@ -98,7 +98,8 @@ func configstreamFxOptions() fx.Option { ## Requirements -- **Core agent**: `configstream` component (`remote_agent.configstream.enabled: true`) and RAR enabled (`remote_agent.registry.enabled: true`). +- **Core agent**: configstream component (always on by default) and RAR enabled (`remote_agent.registry.enabled: true`). +- **Consumer opt-in**: Set `remote_agent.configstream.consumer.enabled: true` on the remote agent to enable this component. - **RAR**: Remote agent must register with RAR before subscribing; pass `session_id` via gRPC metadata (supply fixed `SessionID` or `SessionIDProvider` with `WaitSessionID(ctx) (string, error)`). - **IPC**: mTLS and auth token for gRPC (same as other core-agent IPC). - **`model.Writer`**: `config.Component` must be explicitly provided as `model.Writer` in the same FX scope. Streamed settings are written using the same source the core agent assigned (e.g. `SourceDefault`, `SourceFile`, `SourceEnvVar`), preserving the original priority semantics on the remote process. 
diff --git a/pkg/config/setup/common_settings.go b/pkg/config/setup/common_settings.go index cf4f4854c733..a7aa447885fe 100644 --- a/pkg/config/setup/common_settings.go +++ b/pkg/config/setup/common_settings.go @@ -1049,8 +1049,8 @@ func initCoreAgentFull(config pkgconfigmodel.Setup) { config.BindEnvAndSetDefault("remote_agent.registry.idle_timeout", time.Duration(30*time.Second)) config.BindEnvAndSetDefault("remote_agent.registry.query_timeout", time.Duration(3*time.Second)) config.BindEnvAndSetDefault("remote_agent.registry.recommended_refresh_interval", time.Duration(10*time.Second)) - config.BindEnvAndSetDefault("remote_agent.configstream.enabled", false) config.BindEnvAndSetDefault("remote_agent.configstream.sleep_interval", 10*time.Second) + config.BindEnvAndSetDefault("remote_agent.configstream.consumer.enabled", false) // Data Plane config.BindEnvAndSetDefault("data_plane.enabled", false) diff --git a/test/new-e2e/tests/agent-platform/common/agent_behaviour.go b/test/new-e2e/tests/agent-platform/common/agent_behaviour.go index dcb09c18ff0d..d45a9a5ae11f 100644 --- a/test/new-e2e/tests/agent-platform/common/agent_behaviour.go +++ b/test/new-e2e/tests/agent-platform/common/agent_behaviour.go @@ -418,10 +418,7 @@ func CheckADPEnabled(t *testing.T, client *TestClient) { require.NoError(tt, err) err = client.SetConfig(configFilePath, "data_plane.dogstatsd.enabled", "true") require.NoError(tt, err) - err = client.SetConfig(configFilePath, "remote_agent.registry.enabled", "true") - require.NoError(tt, err) - err = client.SetConfig(configFilePath, "remote_agent.configstream.enabled", "true") - require.NoError(tt, err) + _, err = client.SvcManager.Restart(client.Helper.GetServiceName()) require.NoError(tt, err) From 2af3ff778f20f11fae0e4e4452b39a0cfe812c95 Mon Sep 17 00:00:00 2001 From: rahulkaukuntla Date: Tue, 5 May 2026 13:58:49 -0400 Subject: [PATCH 4/8] update schema --- .../schema/compressed/core_schema.yaml.zstd | Bin 62143 -> 62167 bytes 
pkg/config/schema/core_schema.yaml | 12 ++++++++---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/config/schema/compressed/core_schema.yaml.zstd b/pkg/config/schema/compressed/core_schema.yaml.zstd index ec5bcb921a221cbc31c184e82e20138fd88a5606..e47a39a6d556eab74b80fdd9b0c1e59fb9c9b88e 100644 Binary files a/pkg/config/schema/compressed/core_schema.yaml.zstd and b/pkg/config/schema/compressed/core_schema.yaml.zstd differ
Date: Tue, 5 May 2026 14:16:56 -0400 Subject: [PATCH 5/8] lint --- test/new-e2e/tests/agent-platform/common/agent_behaviour.go | 1 - 1 file changed, 1 deletion(-) diff --git a/test/new-e2e/tests/agent-platform/common/agent_behaviour.go b/test/new-e2e/tests/agent-platform/common/agent_behaviour.go index d45a9a5ae11f..7eb6d90907ad 100644 --- a/test/new-e2e/tests/agent-platform/common/agent_behaviour.go +++ b/test/new-e2e/tests/agent-platform/common/agent_behaviour.go @@ -419,7 +419,6 @@ func CheckADPEnabled(t *testing.T, client *TestClient) { err = client.SetConfig(configFilePath, "data_plane.dogstatsd.enabled", "true") require.NoError(tt, err) - _, err = 
client.SvcManager.Restart(client.Helper.GetServiceName()) require.NoError(tt, err) From 262106f18a3ab82a95ebdd5a052618f1ae2711f0 Mon Sep 17 00:00:00 2001 From: rahulkaukuntla Date: Thu, 7 May 2026 09:25:13 -0400 Subject: [PATCH 6/8] move gazelle exclude commands to top-level bazel file --- BUILD.bazel | 3 +++ comp/core/configstreamconsumer/fx/BUILD.bazel | 1 - comp/core/configstreamconsumer/impl/BUILD.bazel | 1 - comp/core/configstreamconsumer/mock/BUILD.bazel | 1 - 4 files changed, 3 insertions(+), 3 deletions(-) delete mode 100644 comp/core/configstreamconsumer/fx/BUILD.bazel delete mode 100644 comp/core/configstreamconsumer/impl/BUILD.bazel delete mode 100644 comp/core/configstreamconsumer/mock/BUILD.bazel diff --git a/BUILD.bazel b/BUILD.bazel index 68af5a82632d..ea20fd748a8c 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -99,6 +99,9 @@ exports_files(glob( # gazelle:exclude comp/core/bundle_mock.go # gazelle:exclude comp/core/bundle_params.go # gazelle:exclude comp/core/bundle_test.go +# gazelle:exclude comp/core/configstreamconsumer/fx +# gazelle:exclude comp/core/configstreamconsumer/impl +# gazelle:exclude comp/core/configstreamconsumer/mock # gazelle:exclude comp/core/diagnose/def # gazelle:exclude comp/core/diagnose/fx # gazelle:exclude comp/core/diagnose/impl diff --git a/comp/core/configstreamconsumer/fx/BUILD.bazel b/comp/core/configstreamconsumer/fx/BUILD.bazel deleted file mode 100644 index 9968c4af55f8..000000000000 --- a/comp/core/configstreamconsumer/fx/BUILD.bazel +++ /dev/null @@ -1 +0,0 @@ -# gazelle:ignore diff --git a/comp/core/configstreamconsumer/impl/BUILD.bazel b/comp/core/configstreamconsumer/impl/BUILD.bazel deleted file mode 100644 index 9968c4af55f8..000000000000 --- a/comp/core/configstreamconsumer/impl/BUILD.bazel +++ /dev/null @@ -1 +0,0 @@ -# gazelle:ignore diff --git a/comp/core/configstreamconsumer/mock/BUILD.bazel b/comp/core/configstreamconsumer/mock/BUILD.bazel deleted file mode 100644 index 9968c4af55f8..000000000000 --- 
a/comp/core/configstreamconsumer/mock/BUILD.bazel +++ /dev/null @@ -1 +0,0 @@ -# gazelle:ignore From 825e66687438f0e11d79bbad93a16b7b191504bb Mon Sep 17 00:00:00 2001 From: Tony Aiuto Date: Thu, 7 May 2026 23:53:25 -0400 Subject: [PATCH 7/8] Update BUILD.bazel Removed unneeded gazelle:exclude comp/core/diagnose/def --- BUILD.bazel | 1 - 1 file changed, 1 deletion(-) diff --git a/BUILD.bazel b/BUILD.bazel index 4dd0344150ea..0ccff1c4b0b8 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -101,7 +101,6 @@ exports_files(glob( # gazelle:exclude comp/core/configstreamconsumer/fx # gazelle:exclude comp/core/configstreamconsumer/impl # gazelle:exclude comp/core/configstreamconsumer/mock -# gazelle:exclude comp/core/diagnose/def # gazelle:exclude comp/core/diagnose/fx # gazelle:exclude comp/core/diagnose/impl # gazelle:exclude comp/core/diagnose/local From d28bb2ead05cf0699d881d3763b2d02bc6a38f65 Mon Sep 17 00:00:00 2001 From: rahulkaukuntla Date: Fri, 8 May 2026 08:00:13 -0400 Subject: [PATCH 8/8] trigger ci
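
For reference, PATCH 2/8 in this series briefly had `Subscribe` return a pre-closed channel and a no-op unsubscribe when the stream was disabled, and PATCH 3/8 removed that path again. The sketch below shows why a closed channel keeps callers from blocking. It relies only on standard Go channel semantics; `event` and `subscribeDisabled` are placeholder names for this example and do not appear in the patches.

```go
package main

import "fmt"

// event stands in for *pb.ConfigEvent; it is a placeholder type for this sketch.
type event struct{ seqID int32 }

// subscribeDisabled mimics the disabled path from PATCH 2/8: a closed channel
// plus a no-op unsubscribe function.
func subscribeDisabled() (<-chan *event, func()) {
	ch := make(chan *event)
	close(ch)
	return ch, func() {}
}

func main() {
	ch, unsubscribe := subscribeDisabled()
	defer unsubscribe()

	// A receive on a closed channel returns at once with the zero value
	// and ok == false, so the caller never blocks.
	ev, ok := <-ch
	fmt.Println(ev == nil, ok)

	// A range loop over a closed, empty channel runs zero iterations.
	n := 0
	for range ch {
		n++
	}
	fmt.Println(n)
}
```

A consumer written as a plain `for ev := range ch` loop therefore exits immediately on the disabled path, which is the property the removed test in PATCH 2/8 checked with a `select` and a `default` branch.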