[Logs Stateful Encoding] transport layer initial implementation#48879
[Logs Stateful Encoding] transport layer initial implementation#48879TheSafo wants to merge 16 commits intologs-stateful-mainfrom
Conversation
Hooks gRPC stream to Logs agent pipeline
During flow-control/back-pressure, a slow/blocking Send won't block the supervisor loop
…nsport Note the current implementation delta-encode at batch-level, which doesn't require protobuf change. We still make the protobuf change in case we want to extend the delta-encoding at stream-level in the future
Go Package Import DifferencesBaseline: a903409
|
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 136b0e3399
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| statefulInputChan := make(chan *message.StatefulMessage, pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size")) | ||
| grpcsender.StartMessageTranslator(inputChan, statefulInputChan) | ||
|
|
||
| return grpcsender.NewBatchStrategy(statefulInputChan, outputChan, flushChan, endpoints.BatchWait, endpoints.BatchMaxSize, endpoints.BatchMaxContentSize, "logs", encoder, pipelineMonitor, instanceID) |
There was a problem hiding this comment.
Keep translator and strategy channels in one shutdown domain
In gRPC mode this function starts a long-lived translator from inputChan to statefulInputChan, but the returned strategy only owns (and closes) statefulInputChan on Stop(). Pipeline.Stop() stops the processor first, yet that does not close inputChan, so the translator can remain alive indefinitely; if it is still forwarding buffered messages when statefulInputChan is closed, it will panic on send to a closed channel. This makes shutdown unsafe and leaks goroutines whenever logs_config.use_grpc is enabled.
Useful? React with 👍 / 👎.
| s.backoffTimer.Stop() | ||
| s.drainTimer.Stop() | ||
| if s.batchToSendCh != nil { | ||
| close(s.batchToSendCh) |
There was a problem hiding this comment.
Prevent double-close of stream send channel during shutdown
tryBeginStreamRotation closes s.batchToSendCh when a stream fails, but the field is left non-nil; if shutdown happens while the worker is in connecting/disconnected/backoff before a new channel is installed, handleShutdown closes the same channel again and panics (close of closed channel). This is a crash path during stop under transient stream failures.
Useful? React with 👍 / 👎.
Files inventory check summaryFile checks results against ancestor a9034095: Results for datadog-agent_7.79.0~devel.git.448.136b0e3.pipeline.106013071-1_amd64.deb:No change detected |
Static quality checks❌ Please find below the results from static quality gates Error
Gate failure full details
Static quality gates prevent the PR to merge! Successful checksInfo
2 successful checks with minimal change (< 2 KiB)
On-wire sizes (compressed)
|
Regression DetectorRegression Detector ResultsMetrics dashboard Baseline: a903409 Optimization Goals: ✅ No significant changes detected
|
| perf | experiment | goal | Δ mean % | Δ mean % CI | trials | links |
|---|---|---|---|---|---|---|
| ➖ | docker_containers_cpu | % cpu utilization | +1.03 | [-2.04, +4.10] | 1 | Logs |
Fine details of change detection per experiment
| perf | experiment | goal | Δ mean % | Δ mean % CI | trials | links |
|---|---|---|---|---|---|---|
| ➖ | quality_gate_logs | % cpu utilization | +3.95 | [+2.29, +5.60] | 1 | Logs bounds checks dashboard |
| ➖ | docker_containers_cpu | % cpu utilization | +1.03 | [-2.04, +4.10] | 1 | Logs |
| ➖ | otlp_ingest_logs | memory utilization | +0.79 | [+0.69, +0.88] | 1 | Logs |
| ➖ | otlp_ingest_metrics | memory utilization | +0.63 | [+0.47, +0.78] | 1 | Logs |
| ➖ | docker_containers_memory | memory utilization | +0.53 | [+0.44, +0.61] | 1 | Logs |
| ➖ | ddot_metrics_sum_cumulative | memory utilization | +0.46 | [+0.32, +0.61] | 1 | Logs |
| ➖ | ddot_metrics_sum_cumulativetodelta_exporter | memory utilization | +0.43 | [+0.20, +0.65] | 1 | Logs |
| ➖ | uds_dogstatsd_20mb_12k_contexts_20_senders | memory utilization | +0.35 | [+0.29, +0.42] | 1 | Logs |
| ➖ | ddot_metrics | memory utilization | +0.33 | [+0.15, +0.51] | 1 | Logs |
| ➖ | ddot_logs | memory utilization | +0.29 | [+0.23, +0.34] | 1 | Logs |
| ➖ | quality_gate_idle_all_features | memory utilization | +0.26 | [+0.23, +0.30] | 1 | Logs bounds checks dashboard |
| ➖ | quality_gate_idle | memory utilization | +0.14 | [+0.09, +0.19] | 1 | Logs bounds checks dashboard |
| ➖ | ddot_metrics_sum_delta | memory utilization | +0.11 | [-0.06, +0.28] | 1 | Logs |
| ➖ | uds_dogstatsd_to_api | ingress throughput | +0.05 | [-0.17, +0.26] | 1 | Logs |
| ➖ | uds_dogstatsd_to_api_v3 | ingress throughput | +0.02 | [-0.19, +0.23] | 1 | Logs |
| ➖ | file_to_blackhole_1000ms_latency | egress throughput | +0.01 | [-0.43, +0.44] | 1 | Logs |
| ➖ | tcp_dd_logs_filter_exclude | ingress throughput | -0.00 | [-0.11, +0.11] | 1 | Logs |
| ➖ | file_to_blackhole_500ms_latency | egress throughput | -0.02 | [-0.41, +0.38] | 1 | Logs |
| ➖ | file_tree | memory utilization | -0.02 | [-0.08, +0.04] | 1 | Logs |
| ➖ | file_to_blackhole_0ms_latency | egress throughput | -0.02 | [-0.56, +0.52] | 1 | Logs |
| ➖ | file_to_blackhole_100ms_latency | egress throughput | -0.03 | [-0.16, +0.10] | 1 | Logs |
| ➖ | tcp_syslog_to_blackhole | ingress throughput | -0.08 | [-0.26, +0.10] | 1 | Logs |
| ➖ | quality_gate_metrics_logs | memory utilization | -0.69 | [-0.92, -0.46] | 1 | Logs bounds checks dashboard |
Bounds Checks: ✅ Passed
| perf | experiment | bounds_check_name | replicates_passed | observed_value | links |
|---|---|---|---|---|---|
| ✅ | docker_containers_cpu | simple_check_run | 10/10 | 704 ≥ 26 | |
| ✅ | docker_containers_memory | memory_usage | 10/10 | 274.13MiB ≤ 370MiB | |
| ✅ | docker_containers_memory | simple_check_run | 10/10 | 692 ≥ 26 | |
| ✅ | file_to_blackhole_0ms_latency | memory_usage | 10/10 | 0.19GiB ≤ 1.20GiB | |
| ✅ | file_to_blackhole_0ms_latency | missed_bytes | 10/10 | 0B = 0B | |
| ✅ | file_to_blackhole_1000ms_latency | memory_usage | 10/10 | 0.23GiB ≤ 1.20GiB | |
| ✅ | file_to_blackhole_1000ms_latency | missed_bytes | 10/10 | 0B = 0B | |
| ✅ | file_to_blackhole_100ms_latency | memory_usage | 10/10 | 0.19GiB ≤ 1.20GiB | |
| ✅ | file_to_blackhole_100ms_latency | missed_bytes | 10/10 | 0B = 0B | |
| ✅ | file_to_blackhole_500ms_latency | memory_usage | 10/10 | 0.21GiB ≤ 1.20GiB | |
| ✅ | file_to_blackhole_500ms_latency | missed_bytes | 10/10 | 0B = 0B | |
| ✅ | quality_gate_idle | intake_connections | 10/10 | 3 = 3 | bounds checks dashboard |
| ✅ | quality_gate_idle | memory_usage | 10/10 | 172.86MiB ≤ 181MiB | bounds checks dashboard |
| ✅ | quality_gate_idle_all_features | intake_connections | 10/10 | 3 = 3 | bounds checks dashboard |
| ✅ | quality_gate_idle_all_features | memory_usage | 10/10 | 491.58MiB ≤ 550MiB | bounds checks dashboard |
| ✅ | quality_gate_logs | intake_connections | 10/10 | 4 ≤ 6 | bounds checks dashboard |
| ✅ | quality_gate_logs | memory_usage | 10/10 | 201.63MiB ≤ 220MiB | bounds checks dashboard |
| ✅ | quality_gate_logs | missed_bytes | 10/10 | 0B = 0B | bounds checks dashboard |
| ✅ | quality_gate_metrics_logs | cpu_usage | 10/10 | 357.58 ≤ 2000 | bounds checks dashboard |
| ✅ | quality_gate_metrics_logs | intake_connections | 10/10 | 4 ≤ 6 | bounds checks dashboard |
| ✅ | quality_gate_metrics_logs | memory_usage | 10/10 | 431.65MiB ≤ 475MiB | bounds checks dashboard |
| ✅ | quality_gate_metrics_logs | missed_bytes | 10/10 | 0B = 0B | bounds checks dashboard |
Explanation
Confidence level: 90.00%
Effect size tolerance: |Δ mean %| ≥ 5.00%
Performance changes are noted in the perf column of each table:
- ✅ = significantly better comparison variant performance
- ❌ = significantly worse comparison variant performance
- ➖ = no significant change in performance
A regression test is an A/B test of target performance in a repeatable rig, where "performance" is measured as "comparison variant minus baseline variant" for an optimization goal (e.g., ingress throughput). Due to intrinsic variability in measuring that goal, we can only estimate its mean value for each experiment; we report uncertainty in that value as a 90.00% confidence interval denoted "Δ mean % CI".
For each experiment, we decide whether a change in performance is a "regression" -- a change worth investigating further -- if all of the following criteria are true:
-
Its estimated |Δ mean %| ≥ 5.00%, indicating the change is big enough to merit a closer look.
-
Its 90.00% confidence interval "Δ mean % CI" does not contain zero, indicating that if our statistical model is accurate, there is at least a 90.00% chance there is a difference in performance between baseline and comparison variants.
-
Its configuration does not mark it "erratic".
CI Pass/Fail Decision
✅ Passed. All Quality Gates passed.
- quality_gate_idle_all_features, bounds check memory_usage: 10/10 replicas passed. Gate passed.
- quality_gate_idle_all_features, bounds check intake_connections: 10/10 replicas passed. Gate passed.
- quality_gate_metrics_logs, bounds check missed_bytes: 10/10 replicas passed. Gate passed.
- quality_gate_metrics_logs, bounds check memory_usage: 10/10 replicas passed. Gate passed.
- quality_gate_metrics_logs, bounds check cpu_usage: 10/10 replicas passed. Gate passed.
- quality_gate_metrics_logs, bounds check intake_connections: 10/10 replicas passed. Gate passed.
- quality_gate_idle, bounds check memory_usage: 10/10 replicas passed. Gate passed.
- quality_gate_idle, bounds check intake_connections: 10/10 replicas passed. Gate passed.
- quality_gate_logs, bounds check missed_bytes: 10/10 replicas passed. Gate passed.
- quality_gate_logs, bounds check intake_connections: 10/10 replicas passed. Gate passed.
- quality_gate_logs, bounds check memory_usage: 10/10 replicas passed. Gate passed.
| // Currently this is treated as stream error, which will trigger a stream rotation | ||
| // and retry of the same payload, which loops on. this IS NOT the desired behavior. | ||
| // TODO: Implement proper handling of irrecoverable errors, by blocking the ingestion | ||
| log.Infof("Worker %s: irrecoverable error detected: %s", s.workerID, reason) |
There was a problem hiding this comment.
We should eventually make this either a metric or a log metric to track why some errors happen
There was a problem hiding this comment.
Ack. I do have improved metrics in a follow up PR. An open question i have though is what metrics should be shared w/ HTTP vs isolated between the two
There was a problem hiding this comment.
Top of my head i think the following should be shared:
- Bytes sent/ bytes dropped
- If possible Compression Ratio
- Logs component utilization (But i think this might be impossible since 1 is a long lived connection while the other one is per instance)
And the following i think is beneficial to track stateful encoding behavior:
- Patterns added/removed
- Inflight bytes (backpressure health)
- Total state size bytes? (track total state footprints (patterns+tokens+tags*number of pipeline used), but im not sure if we should just count this in a hypothetical sense instead of actually tracking it)
@ddrthall what do you think?
There was a problem hiding this comment.
e63bacb#diff-f6b32aa8d2763ef6abcf429dd989e58459cd22700852f21645501fa44f3f736c is what I have so far to followup with.
What does this PR do?
Implements the "transport" layer of the new gRPC based "stateful encoding" logs protocol (DataDog/agent-payload#443). This explicitly doesn't connect the change to existing pipeline/configuration or pattern extraction code to keep the PR a more reviewable size - follow up PRs will implement that.
Motivation
Partial implementation of experimental new logs protocol.
Describe how you validated your changes
We have tested the full feature end to end against a real Intake as well as in the Lading regression test setup.
How to Review this PR
This change is extracted from #43321 which has the full stateful encoding setup. This PR specifically extracts pkg/logs/sender/grpc (without hooking it up to config or integrating pattern extraction) to turn the transport layer into a more reviewable chunk.
./pkg/logs/sender/grpc/DESIGN.md is included to document how this code works.
Github now includes file filters on the "Files changed" tab to ignore all the go.mod / go.sum changes link
Additional Notes
Same as #46583 but pointed at the new feature branch
https://datadoghq.atlassian.net/browse/EPIN-2660