3 changes: 2 additions & 1 deletion .github/CODEOWNERS
@@ -362,7 +362,8 @@
/comp/checks/winregistry @DataDog/windows-products
/comp/core/autodiscovery @DataDog/container-platform
/comp/core/config @DataDog/agent-configuration
/comp/core/configstream @DataDog/agent-metric-pipelines @DataDog/agent-configuration
/comp/core/configstream @DataDog/agent-configuration
/comp/core/configstreamconsumer @DataDog/agent-configuration
/comp/core/configsync @DataDog/agent-configuration
/comp/core/flare @DataDog/agent-configuration
/comp/core/gui @DataDog/agent-configuration
3 changes: 3 additions & 0 deletions BUILD.bazel
@@ -98,6 +98,9 @@ exports_files(glob(
# gazelle:exclude comp/core/bundle_mock.go
# gazelle:exclude comp/core/bundle_params.go
# gazelle:exclude comp/core/bundle_test.go
# gazelle:exclude comp/core/configstreamconsumer/fx
# gazelle:exclude comp/core/configstreamconsumer/impl
# gazelle:exclude comp/core/configstreamconsumer/mock
# gazelle:exclude comp/core/diagnose/fx
# gazelle:exclude comp/core/diagnose/impl
# gazelle:exclude comp/core/diagnose/local
6 changes: 3 additions & 3 deletions cmd/agent/subcommands/run/command_test.go
@@ -43,11 +43,11 @@ func newGlobalParamsTest(t *testing.T) *command.GlobalParams {
// which lead to build:
// - config.Component which requires a valid datadog.yaml
// - hostname.Component which requires a valid hostname
config := path.Join(t.TempDir(), "datadog.yaml")
err := os.WriteFile(config, []byte("hostname: test"), 0644)
configPath := path.Join(t.TempDir(), "datadog.yaml")
err := os.WriteFile(configPath, []byte("hostname: test"), 0644)
require.NoError(t, err)

return &command.GlobalParams{
ConfFilePath: config,
ConfFilePath: configPath,
}
}
8 changes: 7 additions & 1 deletion comp/README.md
@@ -118,10 +118,16 @@ component temporarily wraps pkg/config.

### [comp/core/configstream](https://pkg.go.dev/github.com/DataDog/datadog-agent/comp/core/configstream)

*Datadog Team*: agent-metric-pipelines agent-configuration
*Datadog Team*: agent-configuration

Package configstream implements a component to handle streaming configuration events to subscribers.

### [comp/core/configstreamconsumer](https://pkg.go.dev/github.com/DataDog/datadog-agent/comp/core/configstreamconsumer)

*Datadog Team*: agent-configuration

Package configstreamconsumer implements a component that consumes config streams from the core agent.

### [comp/core/configsync](https://pkg.go.dev/github.com/DataDog/datadog-agent/comp/core/configsync)

*Datadog Team*: agent-configuration
13 changes: 7 additions & 6 deletions comp/core/configstream/README.md
@@ -91,17 +91,18 @@ message ConfigSetting {

## Configuration

The config stream is automatically enabled when the component is loaded. No explicit configuration required.
The config stream component always runs. Individual connections are RAR-gated: the caller must be a registered remote agent.
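
The RAR gate on each connection can be illustrated with a small sketch. This is not the server's actual code: the `registry` type, the `authorize` function, and the error strings are hypothetical, and gRPC metadata is modelled as a plain `map[string][]string` so the example needs only the standard library. It covers the rejection cases exercised by `TestRARAuthorization` (missing metadata, missing `session_id`, empty `session_id`) and adds an unregistered-session case as an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// registry is a hypothetical stand-in for the remote agent registry (RAR):
// it maps session IDs to whether they belong to a registered remote agent.
type registry map[string]bool

var errUnauthorized = errors.New("unauthorized")

// authorize checks that the caller supplied a non-empty session_id and that
// the session is known to the registry. Error strings are illustrative only.
func authorize(md map[string][]string, rar registry) error {
	if md == nil {
		return fmt.Errorf("%w: missing metadata", errUnauthorized)
	}
	vals := md["session_id"]
	if len(vals) == 0 {
		return fmt.Errorf("%w: session_id required in metadata", errUnauthorized)
	}
	if vals[0] == "" {
		return fmt.Errorf("%w: session_id is empty", errUnauthorized)
	}
	if !rar[vals[0]] {
		return fmt.Errorf("%w: session %q is not registered", errUnauthorized, vals[0])
	}
	return nil
}

func main() {
	rar := registry{"abc": true}
	fmt.Println(authorize(nil, rar))
	fmt.Println(authorize(map[string][]string{"session_id": {"abc"}}, rar))
}
```

Wrapping a single sentinel error keeps the rejection cases distinguishable in logs while letting callers test for one condition with `errors.Is`.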

**Optional settings:**
**Settings:**
```yaml
# datadog.yaml
remote_agent:
  registry:
    enabled: true # Required for RAR-gated authorization
    enabled: true # Required for RAR authorization; remote agents must register before subscribing

Do we have the same problem with remote_agent.registry.enabled? If it's false then ADP will fail?

Contributor Author

@rahulkaukuntla rahulkaukuntla May 5, 2026

No, configstream doesn't check if the remote_agent config options are set anymore--go-based remote agents are now assigned the responsibility of checking that these options are set. ADP should be fine as-is.

  configstream:
    enabled: true # Required to use the configstreamconsumer
    sleep_interval: 10s # Backoff on non-terminal errors (default: 10s)
    sleep_interval: 10s # Backoff on non-terminal send errors (default: 10s)
    consumer:
      enabled: false # Default: false. Set to true on Go-based remote agents to enable the configstreamconsumer component.
agent_ipc:
  # Maximum size of a single gRPC message accepted/sent by the agent's gRPC
  # server. Configstream snapshots can be large (the entire flattened agent
@@ -288,6 +289,6 @@ log_level: debug

## Contact

- **Teams:** agent-metric-pipelines, agent-configuration
- **Team:** agent-configuration
- **Component:** `comp/core/configstream`
- **Test Client:** `cmd/config-stream-client`
2 changes: 1 addition & 1 deletion comp/core/configstream/def/component.go
@@ -10,7 +10,7 @@ import (
pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core"
)

// team: agent-metric-pipelines agent-configuration
// team: agent-configuration

// Component is the component type.
type Component interface {
6 changes: 3 additions & 3 deletions comp/core/configstream/impl/configstream.go
@@ -19,7 +19,7 @@ import (
"github.com/DataDog/datadog-agent/comp/core/config"
configstream "github.com/DataDog/datadog-agent/comp/core/configstream/def"
log "github.com/DataDog/datadog-agent/comp/core/log/def"
"github.com/DataDog/datadog-agent/comp/core/telemetry/def"
telemetry "github.com/DataDog/datadog-agent/comp/core/telemetry/def"
compdef "github.com/DataDog/datadog-agent/comp/def"
"github.com/DataDog/datadog-agent/pkg/config/model"
pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core"
@@ -67,7 +67,7 @@ type subscription struct {
}

// NewComponent creates a new configstream component.
func NewComponent(reqs Requires) Provides {
func NewComponent(reqs Requires) (Provides, error) {
cs := &configStream{
config: reqs.Config,
log: reqs.Log,
@@ -100,7 +100,7 @@ func NewComponent(reqs Requires) Provides {

return Provides{
Comp: cs,
}
}, nil
}

// Subscribe returns a channel that streams configuration events, starting with a snapshot.
26 changes: 20 additions & 6 deletions comp/core/configstream/impl/configstream_test.go
@@ -281,7 +281,21 @@ done:
}
}

// newConfigStreamForTest creates a config stream for testing without lifecycle
func TestNewComponentNoError(t *testing.T) {
mockLog := logmock.New(t)
telemetryComp := telemetrynoops.GetCompatComponent()
cfg := configmock.New(t)
_, err := NewComponent(Requires{
Lifecycle: compdef.NewTestLifecycle(t),
Config: cfg,
Log: mockLog,
Telemetry: telemetryComp,
})
require.NoError(t, err)
}

// newConfigStreamForTest creates a config stream for testing without lifecycle.
// It manually starts the run loop since the test lifecycle does not execute hooks.
func newConfigStreamForTest(t *testing.T, cfg config.Component, logger log.Component) *configStream {
telemetryComp := telemetrynoops.GetCompatComponent()
reqs := Requires{
@@ -290,10 +304,9 @@ func newConfigStreamForTest(t *testing.T, cfg config.Component, logger log.Compo
Log: logger,
Telemetry: telemetryComp,
}
provides := NewComponent(reqs)
provides, err := NewComponent(reqs)
require.NoError(t, err)

// Extract the underlying configStream
// and start the run loop manually since lifecycle hooks are not executed
cs := provides.Comp.(*configStream)
go cs.run()

@@ -342,10 +355,11 @@ func buildComponent(t *testing.T) (Provides, *configInterceptor) {
Telemetry: telemetrynoops.GetCompatComponent(),
}

provides := NewComponent(reqs)
provides, err := NewComponent(reqs)
require.NoError(t, err)

// Start the component's run loop
err := lc.Start(context.Background())
err = lc.Start(context.Background())
require.NoError(t, err)

t.Cleanup(func() {
3 changes: 3 additions & 0 deletions comp/core/configstream/server/server_test.go
@@ -183,6 +183,7 @@ func TestRARAuthorization(t *testing.T) {

t.Run("rejects request with missing metadata", func(t *testing.T) {
cfg := configmock.New(t)
cfg.Set("remote_agent.configstream.sleep_interval", 10*time.Millisecond, model.SourceAgentRuntime)
comp := &mockComp{}
mockRAR := &mockRemoteAgentRegistry{}
server := NewServer(cfg, comp, mockRAR)
@@ -198,6 +199,7 @@ func TestRARAuthorization(t *testing.T) {

t.Run("rejects request with missing session_id in metadata", func(t *testing.T) {
cfg := configmock.New(t)
cfg.Set("remote_agent.configstream.sleep_interval", 10*time.Millisecond, model.SourceAgentRuntime)
comp := &mockComp{}
mockRAR := &mockRemoteAgentRegistry{}
server := NewServer(cfg, comp, mockRAR)
@@ -215,6 +217,7 @@ func TestRARAuthorization(t *testing.T) {

t.Run("rejects request with empty session_id", func(t *testing.T) {
cfg := configmock.New(t)
cfg.Set("remote_agent.configstream.sleep_interval", 10*time.Millisecond, model.SourceAgentRuntime)
comp := &mockComp{}
mockRAR := &mockRemoteAgentRegistry{}
server := NewServer(cfg, comp, mockRAR)
143 changes: 143 additions & 0 deletions comp/core/configstreamconsumer/README.md
@@ -0,0 +1,143 @@
# Config Stream Consumer Component

A shared Go library for remote agents (system-probe, trace-agent, process-agent, etc.) to consume configuration streams from the core Datadog Agent. It provides gRPC connection management, snapshot gating, and ordered config application, writing received settings directly into the agent's `config.Component`.

## Overview

- **Real-time config**: Receive full snapshot then incremental updates from the core agent over gRPC.
- **RAR-gated**: Only registered remote agents can subscribe; session ID is required (fixed or via `SessionIDProvider`).
- **Readiness gating**: `Start` blocks until the first config snapshot is received, aborting startup if `Params.ReadyTimeout` (default: 60s) is exceeded.
- **Single source of truth**: Streamed config is written into `config.Component` via `model.Writer`. Callers read config through `config.Component` directly — not through this component.
- **Ordered updates**: Sequential application by sequence ID; stale updates dropped, discontinuities trigger resync.
- **Restart safety**: `lastSeqID` is never reset on reconnect. If the core agent restarts and its sequence counter resets, the consumer logs an error and refuses the new snapshot until the sub-process itself restarts.
- **Telemetry**: Metrics for time-to-first-snapshot, reconnects, sequence ID, and dropped updates.

## Architecture

Producer (core agent) and consumer (remote agents) communicate over the same gRPC contract:

```
┌─────────────────────────┐          ┌─────────────────────────┐
│   Core Agent Process    │          │  Remote Agent Process   │
│                         │          │   (e.g. system-probe)   │
│  ┌──────────────────┐   │          │  ┌──────────────────┐   │
│  │  configstream    │   │   gRPC   │  │  configstream-   │   │
│  │  (producer)      │◄──┼──────────┼─►│  consumer        │   │
│  │                  │   │  stream  │  │                  │   │
│  └──────────────────┘   │          │  └──────────────────┘   │
└─────────────────────────┘          └─────────────────────────┘
```

**Flow:**

1. Remote agent registers with RAR and obtains `session_id` (or supplies it via `SessionIDProvider`).
2. Consumer connects to core agent and calls `StreamConfigEvents` with `session_id` in gRPC metadata.
3. Core agent validates the session and sends an initial snapshot, then streams incremental updates.
4. Consumer applies snapshot/updates in order and writes them into `config.Component` via `model.Writer`.

See `../configstream/README.md` for the producer side and the gRPC/protobuf contract.

## Quick Start

Supply **either** a fixed `SessionID` **or** a `SessionIDProvider` (e.g. from the remote agent component). When a provider is set, the consumer calls it at connect time, so the remote agent can finish registering with RAR first.

## Wiring guide

### Only include the module when the feature is enabled

Including `configstreamconsumerfx.Module()` when config streaming is disabled will abort FX startup. Gate on `remote_agent.configstream.consumer.enabled` before building FX options:

```go
if cfg.GetBool("remote_agent.configstream.consumer.enabled") {
	opts = append(opts, configstreamFxOptions())
}
```

### Full example

```go
func configstreamFxOptions() fx.Option {
	return fx.Options(
		// Bridge config.Component to model.Writer so the consumer can write streamed config.
		fx.Provide(func(c config.Component) model.Writer { return c }),

		// Provide the SessionIDProvider from the remote agent (blocks until RAR registration).
		fx.Provide(func(ra remoteagent.Component) configstreamconsumerimpl.SessionIDProvider {
			if ra == nil {
				return nil
			}
			if p, ok := ra.(configstreamconsumerimpl.SessionIDProvider); ok {
				return p
			}
			return nil
		}),

		// Provide Params — only reached when configstream is known to be enabled.
		fx.Provide(func(c config.Component, deps struct {
			fx.In
			SessionProvider configstreamconsumerimpl.SessionIDProvider `optional:"true"`
		}) *configstreamconsumerimpl.Params {
			host := c.GetString("cmd_host")
			port := c.GetInt("cmd_port")
			if port <= 0 {
				port = 5001
			}
			return &configstreamconsumerimpl.Params{
				ClientName:        "my-agent",
				CoreAgentAddress:  net.JoinHostPort(host, strconv.Itoa(port)),
				SessionIDProvider: deps.SessionProvider,
			}
		}),

		configstreamconsumerfx.Module(),
		// Force instantiation so Start runs and blocks until the first snapshot.
		fx.Invoke(func(_ configstreamconsumer.Component) {}),
	)
}
```

## Requirements

- **Core agent**: configstream component (always on) and RAR enabled (`remote_agent.registry.enabled: true`).
- **Consumer opt-in**: Set `remote_agent.configstream.consumer.enabled: true` on the remote agent to enable this component.
- **RAR**: Remote agent must register with RAR before subscribing; pass `session_id` via gRPC metadata (supply fixed `SessionID` or `SessionIDProvider` with `WaitSessionID(ctx) (string, error)`).
- **IPC**: mTLS and auth token for gRPC (same as other core-agent IPC).
- **`model.Writer`**: `config.Component` must be explicitly provided as `model.Writer` in the same FX scope. Streamed settings are written using the same source the core agent assigned (e.g. `SourceDefault`, `SourceFile`, `SourceEnvVar`), preserving the original priority semantics on the remote process.

## Telemetry

| Metric | Type | Description |
|--------|------|-------------|
| `configstream_consumer.time_to_first_snapshot_seconds` | Gauge | Time to receive first snapshot |
| `configstream_consumer.reconnect_count` | Counter | Stream reconnections |
| `configstream_consumer.last_sequence_id` | Gauge | Last received config sequence ID |
| `configstream_consumer.dropped_stale_updates` | Counter | Stale updates dropped |

## Testing

### Manual testing with system-probe

1. Start the core agent with RAR and config stream enabled.
2. Set `cmd_host` / `cmd_port` in the config used by system-probe.
3. Start system-probe. You should see:
- `Waiting for initial configuration from core agent...`
- After snapshot: `Initial configuration received from core agent. Starting system-probe.`
4. If the core agent is down or the stream never sends a snapshot, system-probe exits with: `waiting for initial config snapshot: context deadline exceeded`.

## Troubleshooting

- **session_id required in metadata**
Ensure the remote agent registers with RAR first and that the consumer is given either a fixed `SessionID` or a `SessionIDProvider` that returns the session ID.

- **Startup timeout (no snapshot received within `ReadyTimeout`)**
Core agent must be running, config stream enabled, and RAR returning a valid session. Check core agent logs for config stream and RAR errors.

- **"core agent may have restarted" error in logs**
The consumer received a snapshot with a lower sequence ID than its last-known value, indicating the core agent restarted. Restart the sub-process to accept the new configuration.
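
The sequence-ID rules described in this README (stale updates dropped, discontinuities trigger a resync, a snapshot with a lower sequence ID refused) can be sketched as a small state machine. The `seqTracker` type, the `action` values, and the exact comparisons are assumptions made for illustration; in particular, treating a snapshot whose ID equals the last-known value as acceptable is a guess, not documented behaviour.

```go
package main

import "fmt"

// action describes what the hypothetical consumer does with an event.
type action string

const (
	applied   action = "applied"
	dropStale action = "dropped (stale)"
	resync    action = "resync requested"
	refused   action = "refused (core agent may have restarted)"
)

// seqTracker keeps the last applied sequence ID. It is never reset on
// reconnect, which is what makes a restarted core agent detectable.
type seqTracker struct {
	lastSeqID int64
}

// onSnapshot refuses a snapshot older than the last-known sequence ID.
func (t *seqTracker) onSnapshot(seq int64) action {
	if seq < t.lastSeqID {
		return refused
	}
	t.lastSeqID = seq
	return applied
}

// onUpdate applies only the next update in sequence.
func (t *seqTracker) onUpdate(seq int64) action {
	switch {
	case seq <= t.lastSeqID:
		return dropStale
	case seq == t.lastSeqID+1:
		t.lastSeqID = seq
		return applied
	default:
		// A gap means at least one update was missed.
		return resync
	}
}

func main() {
	tr := &seqTracker{}
	fmt.Println("snapshot 10:", tr.onSnapshot(10))
	fmt.Println("update 11:", tr.onUpdate(11))
	fmt.Println("update 11:", tr.onUpdate(11))
	fmt.Println("update 14:", tr.onUpdate(14))
	fmt.Println("snapshot 3:", tr.onSnapshot(3))
}
```

Note that neither a refused snapshot nor a gap changes `lastSeqID`, so the tracker keeps rejecting lower IDs until the process restarts with a fresh tracker.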

## Related documentation

- **Producer**: `../configstream/README.md` — core agent config streaming service and gRPC contract.
- **Test client**: `cmd/config-stream-client/README.md` — standalone client for end-to-end testing.

**Team**: agent-configuration
8 changes: 8 additions & 0 deletions comp/core/configstreamconsumer/def/BUILD.bazel
@@ -0,0 +1,8 @@
load("@rules_go//go:def.bzl", "go_library")

go_library(
    name = "def",
    srcs = ["component.go"],
    importpath = "github.com/DataDog/datadog-agent/comp/core/configstreamconsumer/def",
    visibility = ["//visibility:public"],
)
43 changes: 43 additions & 0 deletions comp/core/configstreamconsumer/def/component.go
@@ -0,0 +1,43 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2025-present Datadog, Inc.

// Package configstreamconsumer implements a component that consumes config streams from the core agent.
//
// team: agent-configuration
package configstreamconsumer

import (
	"context"
	"time"
)

// SessionIDProvider supplies the RAR session ID, typically after registration completes.
// When set, the consumer will call WaitSessionID at connect time instead of using Params.SessionID.
type SessionIDProvider interface {
	WaitSessionID(ctx context.Context) (string, error)
}

// Params defines the parameters for the configstreamconsumer component
type Params struct {
	// ClientName is the identity of this remote agent (e.g., "system-probe", "trace-agent")
	ClientName string
	// CoreAgentAddress is the address of the core agent IPC endpoint
	CoreAgentAddress string
	// SessionID is the RAR session ID for authorization. Required if SessionIDProvider is nil.
	SessionID string
	// SessionIDProvider supplies the session ID at connect time (e.g. from remote agent component).
	// When set, SessionID may be empty; the consumer will block on WaitSessionID before connecting.
	SessionIDProvider SessionIDProvider
	// ReadyTimeout is how long OnStart blocks waiting for the first config snapshot before
	// returning an error and aborting startup. Defaults to 60s when zero.
	ReadyTimeout time.Duration
}

// Component is the config stream consumer component interface.
// Its sole purpose is to receive configuration from the core agent stream and write it
// into the local config.Component via the model.Writer provided at construction.
// Callers that need to read config or subscribe to changes should use config.Component directly.
// Readiness is guaranteed by the FX lifecycle: start blocks until the first snapshot is received.
type Component interface{}