[AGENTCFG-626] Adding a consumer component for streaming configuration updates from the core agent to remote agents registered via RAR (attempt 2)#50385
…n updates from the core agent to remote agents registered via RAR (#46206)

### What does this PR do?

Adds the **configstreamconsumer component** (`comp/core/configstreamconsumer`), a shared Go library remote agents use to consume configuration streams from the core Datadog Agent over gRPC.

- **Simple API**: a single `WaitReady(ctx)` method; the component otherwise runs autonomously as an FX lifecycle hook.
- **Startup gating**: `OnStart` blocks until the first config snapshot is received from the core agent, so all components initialized after this one see a fully-populated config. Startup is aborted if `Params.ReadyTimeout` (default 60s) is exceeded.
- **Source fidelity**: streamed settings are written into `config.Component` using the same source the core agent assigned (e.g. `SourceDefault`, `SourceFile`, `SourceEnvVar`), preserving the original priority semantics on the remote process. `SourceLocalConfigProcess` is not used.
- **Ordered updates**: sequential application by sequence ID; stale updates are dropped, discontinuities trigger a server-side resync.
- **Reconnection**: the consumer reconnects automatically on stream failure with a 5s backoff; `lastSeqID` is never reset between reconnects so stale snapshots from a restarted core agent are rejected.
- **RAR-gated**: requires a session ID from Remote Agent Registry, supplied either as a fixed value or via a `SessionIDProvider` (e.g. from the remote agent component) that is resolved at connect time.

### Motivation

Remote agents (e.g. system-probe, trace-agent, process-agent) need to receive configuration from the core agent. This component provides a single, reusable consumer so each agent does not reimplement gRPC wiring, sequencing, snapshot gating, or reconnection logic.

### Describe how you validated your changes

Added and successfully ran unit tests. Furthermore, I actually set up system-probe to use this new consumer component ([in this PR](#46281), which is built off these changes). I was successfully able to stream the config to system-probe:

> 2026-03-11 16:25:07 UTC | SYS-PROBE | INFO | (cmd/system-probe/subcommands/run/command.go:250 in run) | Waiting for initial configuration from core agent...
>
> 2026-03-11 16:25:07 UTC | SYS-PROBE | INFO | (comp/core/remoteagent/helper/serverhelper.go:267 in registerWithAgent) | Registered with Remote Agent Registry for config streaming (session_id=d3411ec4-62bb-4d3f-8411-56e00615392d). Recommended refresh interval: 10 seconds.
>
> 2026-03-11 16:25:07 UTC | SYS-PROBE | INFO | (comp/core/configstreamconsumer/impl/consumer.go:293 in connectAndStream) | Config stream established for client system-probe
>
> 2026-03-11 16:25:07 UTC | SYS-PROBE | INFO | (comp/core/configstreamconsumer/impl/consumer.go:333 in applySnapshot) | Applying config snapshot (seq_id: 2, settings: 1563)
>
> 2026-03-11 16:25:07 UTC | SYS-PROBE | INFO | (comp/core/configstreamconsumer/impl/consumer.go:358 in func1) | Received first config snapshot after 140.719085ms
>
> 2026-03-11 16:25:07 UTC | SYS-PROBE | INFO | (cmd/system-probe/subcommands/run/command.go:254 in run) | Initial configuration received from core agent. Starting system-probe.
>
> 2026-03-11 16:25:07 UTC | SYS-PROBE | INFO | (cmd/system-probe/subcommands/run/command.go:318 in startSystemProbe) | starting system-probe v7.77.0-devel+git.1266.7f2d2db

### Additional Notes

Co-authored-by: rahul.kaukuntla <rahul.kaukuntla@datadoghq.com>
🎯 Code Coverage (details) 🔗 Commit SHA: 821efc8

Files inventory check summary
File checks results against ancestor dac78363:
Results for datadog-agent_7.80.0~devel.git.578.821efc8.pipeline.111987790-1_amd64.deb: No change detected

Static quality checks
✅ Please find below the results from static quality gates
Successful checks: 26 successful checks with minimal change (< 2 KiB)

Regression Detector
Regression Detector Results
Baseline: 6d20614
Optimization Goals: ✅ No significant changes detected
| perf | experiment | goal | Δ mean % | Δ mean % CI | trials | links |
|---|---|---|---|---|---|---|
| ➖ | docker_containers_cpu | % cpu utilization | +3.09 | [+0.04, +6.15] | 1 | Logs |
Fine details of change detection per experiment
| perf | experiment | goal | Δ mean % | Δ mean % CI | trials | links |
|---|---|---|---|---|---|---|
| ➖ | docker_containers_cpu | % cpu utilization | +3.09 | [+0.04, +6.15] | 1 | Logs |
| ➖ | quality_gate_logs | % cpu utilization | +1.89 | [+0.90, +2.87] | 1 | Logs bounds checks dashboard |
| ➖ | ddot_metrics_sum_cumulative | memory utilization | +0.59 | [+0.43, +0.75] | 1 | Logs |
| ➖ | tcp_syslog_to_blackhole | ingress throughput | +0.35 | [+0.15, +0.55] | 1 | Logs |
| ➖ | quality_gate_security_mean_fs_load | memory utilization | +0.17 | [+0.13, +0.21] | 1 | Logs bounds checks dashboard |
| ➖ | ddot_metrics_sum_delta | memory utilization | +0.14 | [-0.05, +0.34] | 1 | Logs |
| ➖ | quality_gate_security_idle | memory utilization | +0.10 | [+0.03, +0.17] | 1 | Logs bounds checks dashboard |
| ➖ | otlp_ingest_logs | memory utilization | +0.04 | [-0.06, +0.14] | 1 | Logs |
| ➖ | file_to_blackhole_0ms_latency | egress throughput | +0.01 | [-0.52, +0.54] | 1 | Logs |
| ➖ | uds_dogstatsd_to_api | ingress throughput | +0.00 | [-0.20, +0.20] | 1 | Logs |
| ➖ | file_to_blackhole_1000ms_latency | egress throughput | -0.01 | [-0.45, +0.44] | 1 | Logs |
| ➖ | uds_dogstatsd_to_api_v3 | ingress throughput | -0.01 | [-0.21, +0.19] | 1 | Logs |
| ➖ | tcp_dd_logs_filter_exclude | ingress throughput | -0.01 | [-0.10, +0.08] | 1 | Logs |
| ➖ | file_to_blackhole_100ms_latency | egress throughput | -0.01 | [-0.16, +0.14] | 1 | Logs |
| ➖ | file_to_blackhole_500ms_latency | egress throughput | -0.07 | [-0.47, +0.33] | 1 | Logs |
| ➖ | ddot_metrics_sum_cumulativetodelta_exporter | memory utilization | -0.08 | [-0.32, +0.16] | 1 | Logs |
| ➖ | uds_dogstatsd_20mb_12k_contexts_20_senders | memory utilization | -0.10 | [-0.15, -0.05] | 1 | Logs |
| ➖ | docker_containers_memory | memory utilization | -0.10 | [-0.21, -0.00] | 1 | Logs |
| ➖ | quality_gate_idle_all_features | memory utilization | -0.17 | [-0.21, -0.13] | 1 | Logs bounds checks dashboard |
| ➖ | quality_gate_idle | memory utilization | -0.17 | [-0.22, -0.13] | 1 | Logs bounds checks dashboard |
| ➖ | ddot_logs | memory utilization | -0.18 | [-0.24, -0.12] | 1 | Logs |
| ➖ | otlp_ingest_metrics | memory utilization | -0.29 | [-0.44, -0.13] | 1 | Logs |
| ➖ | ddot_metrics | memory utilization | -0.29 | [-0.49, -0.09] | 1 | Logs |
| ➖ | quality_gate_security_no_fs_load | memory utilization | -0.34 | [-0.44, -0.24] | 1 | Logs bounds checks dashboard |
| ➖ | quality_gate_metrics_logs | memory utilization | -0.84 | [-1.09, -0.59] | 1 | Logs bounds checks dashboard |
Bounds Checks: ✅ Passed
| perf | experiment | bounds_check_name | replicates_passed | observed_value | links |
|---|---|---|---|---|---|
| ✅ | docker_containers_cpu | simple_check_run | 10/10 | 715 ≥ 26 | |
| ✅ | docker_containers_memory | memory_usage | 10/10 | 242.75MiB ≤ 370MiB | |
| ✅ | docker_containers_memory | simple_check_run | 10/10 | 686 ≥ 26 | |
| ✅ | file_to_blackhole_0ms_latency | memory_usage | 10/10 | 0.16GiB ≤ 1.20GiB | |
| ✅ | file_to_blackhole_0ms_latency | missed_bytes | 10/10 | 0B = 0B | |
| ✅ | file_to_blackhole_1000ms_latency | memory_usage | 10/10 | 0.21GiB ≤ 1.20GiB | |
| ✅ | file_to_blackhole_1000ms_latency | missed_bytes | 10/10 | 0B = 0B | |
| ✅ | file_to_blackhole_100ms_latency | memory_usage | 10/10 | 0.17GiB ≤ 1.20GiB | |
| ✅ | file_to_blackhole_100ms_latency | missed_bytes | 10/10 | 0B = 0B | |
| ✅ | file_to_blackhole_500ms_latency | memory_usage | 10/10 | 0.18GiB ≤ 1.20GiB | |
| ✅ | file_to_blackhole_500ms_latency | missed_bytes | 10/10 | 0B = 0B | |
| ✅ | quality_gate_idle | intake_connections | 10/10 | 3 ≤ 4 | bounds checks dashboard |
| ✅ | quality_gate_idle | memory_usage | 10/10 | 141.39MiB ≤ 147MiB | bounds checks dashboard |
| ✅ | quality_gate_idle_all_features | intake_connections | 10/10 | 3 ≤ 4 | bounds checks dashboard |
| ✅ | quality_gate_idle_all_features | memory_usage | 10/10 | 471.11MiB ≤ 495MiB | bounds checks dashboard |
| ✅ | quality_gate_logs | intake_connections | 10/10 | 4 ≤ 6 | bounds checks dashboard |
| ✅ | quality_gate_logs | memory_usage | 10/10 | 180.44MiB ≤ 195MiB | bounds checks dashboard |
| ✅ | quality_gate_logs | missed_bytes | 10/10 | 0B = 0B | bounds checks dashboard |
| ✅ | quality_gate_metrics_logs | cpu_usage | 10/10 | 354.85 ≤ 2000 | bounds checks dashboard |
| ✅ | quality_gate_metrics_logs | intake_connections | 10/10 | 3 ≤ 6 | bounds checks dashboard |
| ✅ | quality_gate_metrics_logs | memory_usage | 10/10 | 374.18MiB ≤ 430MiB | bounds checks dashboard |
| ✅ | quality_gate_metrics_logs | missed_bytes | 10/10 | 0B = 0B | bounds checks dashboard |
| ✅ | quality_gate_security_idle | cpu_usage | 10/10 | 25.23 ≤ 40 | bounds checks dashboard |
| ✅ | quality_gate_security_idle | memory_usage | 10/10 | 283.13MiB ≤ 330MiB | bounds checks dashboard |
| ✅ | quality_gate_security_mean_fs_load | cpu_usage | 10/10 | 52.58 ≤ 70 | bounds checks dashboard |
| ✅ | quality_gate_security_mean_fs_load | memory_usage | 10/10 | 269.52MiB ≤ 320MiB | bounds checks dashboard |
| ✅ | quality_gate_security_no_fs_load | cpu_usage | 10/10 | 19.78 ≤ 40 | bounds checks dashboard |
| ✅ | quality_gate_security_no_fs_load | memory_usage | 10/10 | 284.46MiB ≤ 320MiB | bounds checks dashboard |
Explanation
Confidence level: 90.00%
Effect size tolerance: |Δ mean %| ≥ 5.00%
Performance changes are noted in the perf column of each table:
- ✅ = significantly better comparison variant performance
- ❌ = significantly worse comparison variant performance
- ➖ = no significant change in performance
A regression test is an A/B test of target performance in a repeatable rig, where "performance" is measured as "comparison variant minus baseline variant" for an optimization goal (e.g., ingress throughput). Due to intrinsic variability in measuring that goal, we can only estimate its mean value for each experiment; we report uncertainty in that value as a 90.00% confidence interval denoted "Δ mean % CI".
For each experiment, we decide whether a change in performance is a "regression" -- a change worth investigating further -- if all of the following criteria are true:
- Its estimated |Δ mean %| ≥ 5.00%, indicating the change is big enough to merit a closer look.
- Its 90.00% confidence interval "Δ mean % CI" does not contain zero, indicating that if our statistical model is accurate, there is at least a 90.00% chance there is a difference in performance between baseline and comparison variants.
- Its configuration does not mark it "erratic".
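The three criteria above combine into a single predicate. The sketch below is only an illustration of that rule, not the detector's code: the `experiment` type, its field names, and `isRegression` are hypothetical, while the 5.00% tolerance is taken from the explanation.

```go
package main

import "math"

// experiment holds the fields of one table row that the rule needs.
// The type and field names are hypothetical.
type experiment struct {
	name          string
	deltaMeanPct  float64 // "Δ mean %"
	ciLow, ciHigh float64 // bounds of "Δ mean % CI"
	erratic       bool    // whether the configuration marks it "erratic"
}

// effectSizeTolerance is the 5.00% threshold quoted in the explanation.
const effectSizeTolerance = 5.0

// isRegression reports whether all three criteria hold.
func isRegression(e experiment) bool {
	bigEnough := math.Abs(e.deltaMeanPct) >= effectSizeTolerance
	ciExcludesZero := e.ciLow > 0 || e.ciHigh < 0
	return bigEnough && ciExcludesZero && !e.erratic
}
```

Applied to the `docker_containers_cpu` row (+3.09, CI [+0.04, +6.15]), the interval excludes zero but the effect size is below 5.00%, so the row is not flagged, which matches the ➖ marker in the table.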
CI Pass/Fail Decision
✅ Passed. All Quality Gates passed.
- quality_gate_metrics_logs, bounds check cpu_usage: 10/10 replicas passed. Gate passed.
- quality_gate_metrics_logs, bounds check missed_bytes: 10/10 replicas passed. Gate passed.
- quality_gate_metrics_logs, bounds check intake_connections: 10/10 replicas passed. Gate passed.
- quality_gate_metrics_logs, bounds check memory_usage: 10/10 replicas passed. Gate passed.
- quality_gate_logs, bounds check memory_usage: 10/10 replicas passed. Gate passed.
- quality_gate_logs, bounds check intake_connections: 10/10 replicas passed. Gate passed.
- quality_gate_logs, bounds check missed_bytes: 10/10 replicas passed. Gate passed.
- quality_gate_security_idle, bounds check memory_usage: 10/10 replicas passed. Gate passed.
- quality_gate_security_idle, bounds check cpu_usage: 10/10 replicas passed. Gate passed.
- quality_gate_security_mean_fs_load, bounds check memory_usage: 10/10 replicas passed. Gate passed.
- quality_gate_security_mean_fs_load, bounds check cpu_usage: 10/10 replicas passed. Gate passed.
- quality_gate_idle_all_features, bounds check intake_connections: 10/10 replicas passed. Gate passed.
- quality_gate_idle_all_features, bounds check memory_usage: 10/10 replicas passed. Gate passed.
- quality_gate_security_no_fs_load, bounds check memory_usage: 10/10 replicas passed. Gate passed.
- quality_gate_security_no_fs_load, bounds check cpu_usage: 10/10 replicas passed. Gate passed.
- quality_gate_idle, bounds check intake_connections: 10/10 replicas passed. Gate passed.
- quality_gate_idle, bounds check memory_usage: 10/10 replicas passed. Gate passed.
…d make it the remote agent's responsibility to configure it properly
| remote_agent: | ||
| registry: | ||
| enabled: true # Required for RAR-gated authorization and for use_configstream | ||
| enabled: true # Required for RAR authorization; remote agents must register before subscribing |
Do we have the same problem with remote_agent.registry.enabled? If it's false then ADP will fail?
No, configstream no longer checks whether the remote_agent config options are set. Go-based remote agents are now responsible for checking that these options are set. ADP should be fine as-is.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: d0be095618
| if err := c.handleConfigEvent(event); err != nil { | ||
| c.log.Errorf("Failed to handle config event: %v", err) | ||
| } |
Reconnect when a sequence discontinuity is detected
When the server-side subscriber channel drops an event, the next update reaches applyUpdate with a gap and returns a discontinuity error, but this loop only logs the error and keeps reading the same stream. Because lastSeqID is not advanced, every later update remains > lastSeqID+1 and is rejected too, so the remote agent's config stops updating until the stream happens to break; return the error from connectAndStream here so the loop can reconnect and receive a fresh snapshot as intended.
| @@ -0,0 +1 @@ | |||
| # gazelle:ignore | |||
Add Bazel targets for the consumer implementation
This BUILD file only contains # gazelle:ignore, so it defines no go_library for //comp/core/configstreamconsumer/impl, even though the new fx package imports this implementation. Any Bazel-built binary or test that tries to consume the advertised configstreamconsumer FX module will fail to resolve/build the implementation package; please add the corresponding go_library/test targets as done for sibling components such as comp/core/configstream.
| @@ -0,0 +1 @@ | |||
| # gazelle:ignore | |||
We would normally do this by adding
# gazelle:exclude comp/core/configstreamconsumer/impl
to the top level BUILD.bazel file.
Did gazelle create this, or an agent?
I moved the commands to the top-level bazel file, as suggested. To be honest, the original iteration of this PR has been out for so long, I don't really remember how it was done 😅
…ttps://github.com/DataDog/datadog-agent into rahul/config-stream-consumer-attempt-2
| # gazelle:exclude comp/core/configstreamconsumer/fx | ||
| # gazelle:exclude comp/core/configstreamconsumer/impl | ||
| # gazelle:exclude comp/core/configstreamconsumer/mock | ||
| # gazelle:exclude comp/core/diagnose/def |
You don't need line 104. There is already a build file. Excluding it will update every build file that mentions //comp/core/diagnose/def
The original PR had to be reverted due to #incident-53989 and #incident-53990.

#incident-53989 has been mitigated by changing the nature of the config streaming flag: it is now the remote agent's responsibility to not accept config from the core agent if the `remote_agent.configstream.consumer.enabled` flag is not set (refer to the README instructions in the configstreamconsumer library).

#incident-53990 will be mitigated by asking for a memory exception to pass the failing SQG.
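Since the check now lives in the remote agent, a guard along the following lines is one way a Go-based remote agent might decide whether to start consuming. This is a sketch under assumptions: `boolGetter`, `mapConfig`, and `shouldConsumeConfigStream` are hypothetical, and requiring `remote_agent.registry.enabled` in addition to the consumer flag is an assumption drawn from the review discussion. The README in the configstreamconsumer library remains the authoritative reference.

```go
package main

// boolGetter is a hypothetical minimal view of a config reader.
type boolGetter interface {
	GetBool(key string) bool
}

// mapConfig is an in-memory config used only for this sketch.
type mapConfig map[string]bool

func (m mapConfig) GetBool(key string) bool { return m[key] }

// Both keys appear in the PR discussion; combining them is an assumption.
const (
	registryEnabledKey = "remote_agent.registry.enabled"
	consumerEnabledKey = "remote_agent.configstream.consumer.enabled"
)

// shouldConsumeConfigStream reports whether the remote agent should accept
// streamed config: it must be able to register with RAR and must have
// opted in through the consumer flag.
func shouldConsumeConfigStream(cfg boolGetter) bool {
	return cfg.GetBool(registryEnabledKey) && cfg.GetBool(consumerEnabledKey)
}
```

A remote agent would evaluate this once at startup and skip wiring the consumer when it returns false, which keeps its local configuration authoritative.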