Changes from all commits (28 commits)
df514b4 Subscribe StatsAggregator to settings changes (andrewlock, Oct 22, 2025)
df8256e Update CI Vis use of mutable settings (andrewlock, Oct 22, 2025)
59bee7f Update DSM usages (andrewlock, Oct 22, 2025)
caffd6f Fix manual instrumentation (andrewlock, Oct 22, 2025)
161ecde Update direct log submission to use settings manager (andrewlock, Oct 23, 2025)
ab0cbd7 Fix OTLP usages (andrewlock, Oct 23, 2025)
fb743af "Fix" debugger - this only uses the initial settings though, and does… (andrewlock, Oct 23, 2025)
14029de Fix remote config to subscribe to setting changes (andrewlock, Oct 23, 2025)
311b4b8 Fix telemetry usages (andrewlock, Oct 23, 2025)
33d6a05 Fix Tracer and TracerManager (andrewlock, Oct 23, 2025)
c3f196a Update statsd handling in runtime metrics (andrewlock, Oct 24, 2025)
f8dfc66 Update ITraceSampler to respond to setting changes (andrewlock, Oct 24, 2025)
980a1a3 Update Api and AgentWriter to handle changes in exporter and settings (andrewlock, Oct 24, 2025)
b85cf19 Fix remaining usages of direct access to Mutable Settings (andrewlock, Oct 24, 2025)
48f127d minor fixes (andrewlock, Oct 24, 2025)
a5efe03 Update CI Vis usages - these don't need to respond to changes because… (andrewlock, Oct 27, 2025)
e2a3249 Update Telemetry to handle changes to agent configuration (andrewlock, Oct 27, 2025)
c8fc228 Update dynamic config usages (andrewlock, Oct 27, 2025)
f636502 Update debugger ExporterSettings - this only uses the initial setting… (andrewlock, Oct 27, 2025)
8720b66 Update DiscoveryService (andrewlock, Oct 27, 2025)
90a298c Update TracerFlareApi (andrewlock, Oct 28, 2025)
5be7d75 Remove Exporter property from TracerSettings (andrewlock, Oct 28, 2025)
64d2dd5 Remove EnableSending and DisableSending from TelemetryController (andrewlock, Oct 29, 2025)
c5108f7 Fix OTel metrics test - I don't know why this changed, but it looks b… (andrewlock, Oct 30, 2025)
087524f Fix manual instrumentation not reporting the correct settings (andrewlock, Oct 30, 2025)
df8700b Fix telemetry recording and log configuration when settings change (andrewlock, Oct 30, 2025)
7ed9c21 Rework the StatsDManager (andrewlock, Oct 31, 2025)
3238b3e Add fix for not re-reporting telemetry when there are "empty" dynamic… (andrewlock, Nov 7, 2025)
2 changes: 0 additions & 2 deletions tracer/missing-nullability-files.csv
@@ -34,10 +34,8 @@ src/Datadog.Trace/Tracer.cs
src/Datadog.Trace/TracerConstants.cs
src/Datadog.Trace/TracerManager.cs
src/Datadog.Trace/TracerManagerFactory.cs
src/Datadog.Trace/Agent/AgentWriter.cs
src/Datadog.Trace/Agent/Api.cs
src/Datadog.Trace/Agent/ClientStatsPayload.cs
src/Datadog.Trace/Agent/IAgentWriter.cs
src/Datadog.Trace/Agent/IApi.cs
src/Datadog.Trace/Agent/IApiRequest.cs
src/Datadog.Trace/Agent/IApiRequestFactory.cs
8 changes: 4 additions & 4 deletions tracer/src/Datadog.Trace.Tools.Runner/Utils.cs
@@ -417,9 +417,9 @@ public static async Task<AgentConfiguration> CheckAgentConnectionAsync(string ag

var settings = new TracerSettings(configurationSource, new ConfigurationTelemetry(), new OverrideErrorLog());

Log.Debug("Creating DiscoveryService for: {AgentUri}", settings.Exporter.AgentUri);
var discoveryService = DiscoveryService.Create(
settings.Exporter,
Log.Debug("Creating DiscoveryService for: {AgentUri}", settings.Manager.InitialExporterSettings.AgentUri);
var discoveryService = DiscoveryService.CreateUnmanaged(
settings.Manager.InitialExporterSettings,
tcpTimeout: TimeSpan.FromSeconds(5),
initialRetryDelayMs: 200,
maxRetryDelayMs: 1000,
@@ -433,7 +433,7 @@ public static async Task<AgentConfiguration> CheckAgentConnectionAsync(string ag
using (cts.Token.Register(
() =>
{
WriteError($"Error connecting to the Datadog Agent at {settings.Exporter.AgentUri}.");
WriteError($"Error connecting to the Datadog Agent at {settings.Manager.InitialExporterSettings.AgentUri}.");
tcs.TrySetResult(null);
}))
{
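The Utils.cs change shows the new access pattern at its simplest: one-shot consumers stop reading settings.Exporter directly and instead take a snapshot from settings.Manager.InitialExporterSettings. Below is a minimal sketch of the idea, using illustrative stand-in types rather than the PR's exact API:

#nullable enable
using System;

// Stand-in types, assumed for illustration; the real tracer's shapes differ.
internal sealed record ExporterSnapshot(Uri AgentUri);

internal interface ISettingsManager
{
    // Captured once at startup. Holders of this snapshot never observe later
    // dynamic-config updates, which suits a short-lived CLI check: it gets a
    // stable, race-free view of the agent endpoint.
    ExporterSnapshot InitialExporterSettings { get; }
}

internal static class AgentConnectionCheck
{
    public static Uri ResolveAgentUri(ISettingsManager manager)
        => manager.InitialExporterSettings.AgentUri;
}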
77 changes: 51 additions & 26 deletions tracer/src/Datadog.Trace/Agent/AgentWriter.cs
@@ -3,6 +3,8 @@
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
@@ -12,10 +12,8 @@
using Datadog.Trace.Configuration;
using Datadog.Trace.DogStatsd;
using Datadog.Trace.Logging;
using Datadog.Trace.Tagging;
using Datadog.Trace.Telemetry;
using Datadog.Trace.Telemetry.Metrics;
using Datadog.Trace.Vendors.StatsdClient;

namespace Datadog.Trace.Agent
{
@@ -25,10 +25,10 @@ internal class AgentWriter : IAgentWriter

private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor<AgentWriter>();

private static readonly ArraySegment<byte> EmptyPayload = new(new byte[] { 0x90 });
private static readonly ArraySegment<byte> EmptyPayload = new([0x90]);

private readonly ConcurrentQueue<WorkItem> _pendingTraces = new ConcurrentQueue<WorkItem>();
private readonly IDogStatsd _statsd;
private readonly IStatsdManager _statsd;
private readonly Task _flushTask;
private readonly Task _serializationTask;
private readonly TaskCompletionSource<bool> _processExit = new TaskCompletionSource<bool>();
@@ -43,7 +43,7 @@ internal class AgentWriter : IAgentWriter
private readonly int _batchInterval;
private readonly IKeepRateCalculator _traceKeepRateCalculator;

private readonly IStatsAggregator _statsAggregator;
private readonly IStatsAggregator? _statsAggregator;

private readonly bool _apmTracingEnabled;

@@ -65,17 +65,28 @@

private long _droppedTraces;

public AgentWriter(IApi api, IStatsAggregator statsAggregator, IDogStatsd statsd, TracerSettings settings)
: this(api, statsAggregator, statsd, maxBufferSize: settings.TraceBufferSize, batchInterval: settings.TraceBatchInterval, apmTracingEnabled: settings.ApmTracingEnabled)
private bool _traceMetricsEnabled;

public AgentWriter(IApi api, IStatsAggregator? statsAggregator, IStatsdManager statsd, TracerSettings settings)
: this(api, statsAggregator, statsd, maxBufferSize: settings.TraceBufferSize, batchInterval: settings.TraceBatchInterval, apmTracingEnabled: settings.ApmTracingEnabled, initialTracerMetricsEnabled: settings.Manager.InitialMutableSettings.TracerMetricsEnabled)
{
settings.Manager.SubscribeToChanges(changes =>
{
if (changes.UpdatedMutable is { } mutable
&& mutable.TracerMetricsEnabled != changes.PreviousMutable.TracerMetricsEnabled)
{
Volatile.Write(ref _traceMetricsEnabled, mutable.TracerMetricsEnabled);
_statsd.SetRequired(StatsdConsumer.AgentWriter, mutable.TracerMetricsEnabled);
}
});
}

public AgentWriter(IApi api, IStatsAggregator statsAggregator, IDogStatsd statsd, bool automaticFlush = true, int maxBufferSize = 1024 * 1024 * 10, int batchInterval = 100, bool apmTracingEnabled = true)
: this(api, statsAggregator, statsd, MovingAverageKeepRateCalculator.CreateDefaultKeepRateCalculator(), automaticFlush, maxBufferSize, batchInterval, apmTracingEnabled)
public AgentWriter(IApi api, IStatsAggregator? statsAggregator, IStatsdManager statsd, bool automaticFlush = true, int maxBufferSize = 1024 * 1024 * 10, int batchInterval = 100, bool apmTracingEnabled = true, bool initialTracerMetricsEnabled = false)
: this(api, statsAggregator, statsd, MovingAverageKeepRateCalculator.CreateDefaultKeepRateCalculator(), automaticFlush, maxBufferSize, batchInterval, apmTracingEnabled, initialTracerMetricsEnabled)
{
}

internal AgentWriter(IApi api, IStatsAggregator statsAggregator, IDogStatsd statsd, IKeepRateCalculator traceKeepRateCalculator, bool automaticFlush, int maxBufferSize, int batchInterval, bool apmTracingEnabled)
internal AgentWriter(IApi api, IStatsAggregator? statsAggregator, IStatsdManager statsd, IKeepRateCalculator traceKeepRateCalculator, bool automaticFlush, int maxBufferSize, int batchInterval, bool apmTracingEnabled, bool initialTracerMetricsEnabled)
{
_statsAggregator = statsAggregator;

@@ -92,18 +103,20 @@ internal AgentWriter(IApi api, IStatsAggregator statsAggregator, IDogStatsd stat
_backBuffer = new SpanBuffer(maxBufferSize, formatterResolver);
_activeBuffer = _frontBuffer;

_apmTracingEnabled = apmTracingEnabled;
_traceMetricsEnabled = initialTracerMetricsEnabled;
_statsd.SetRequired(StatsdConsumer.AgentWriter, initialTracerMetricsEnabled);

_serializationTask = automaticFlush ? Task.Factory.StartNew(SerializeTracesLoop, TaskCreationOptions.LongRunning) : Task.CompletedTask;
_serializationTask.ContinueWith(t => Log.Error(t.Exception, "Error in serialization task"), TaskContinuationOptions.OnlyOnFaulted);

_flushTask = automaticFlush ? Task.Run(FlushBuffersTaskLoopAsync) : Task.CompletedTask;
_flushTask.ContinueWith(t => Log.Error(t.Exception, "Error in flush task"), TaskContinuationOptions.OnlyOnFaulted);

_backBufferFlushTask = _frontBufferFlushTask = Task.CompletedTask;

_apmTracingEnabled = apmTracingEnabled;
}

internal event Action Flushed;
internal event Action? Flushed;

internal SpanBuffer ActiveBuffer => _activeBuffer;

@@ -138,10 +151,14 @@ public void WriteTrace(ArraySegment<Span> trace)
}
}

if (_statsd != null)
if (Volatile.Read(ref _traceMetricsEnabled))
{
_statsd.Increment(TracerMetricNames.Queue.EnqueuedTraces);
_statsd.Increment(TracerMetricNames.Queue.EnqueuedSpans, trace.Count);
using var lease = _statsd.TryGetClientLease();
if (lease.Client is { } statsd)
{
statsd.Increment(TracerMetricNames.Queue.EnqueuedTraces);
statsd.Increment(TracerMetricNames.Queue.EnqueuedSpans, trace.Count);
}
}
}

@@ -244,7 +261,7 @@ private async Task FlushBuffersTaskLoopAsync()
{
tasks[2] = Task.Delay(TimeSpan.FromSeconds(1));
await Task.WhenAny(tasks).ConfigureAwait(false);
tasks[2] = null;
tasks[2] = null!;

if (_forceFlush.Task.IsCompleted)
{
@@ -314,10 +331,14 @@ async Task InternalBufferFlush()

try
{
if (_statsd != null)
if (Volatile.Read(ref _traceMetricsEnabled))
{
_statsd.Increment(TracerMetricNames.Queue.DequeuedTraces, buffer.TraceCount);
_statsd.Increment(TracerMetricNames.Queue.DequeuedSpans, buffer.SpanCount);
using var lease = _statsd.TryGetClientLease();
if (lease.Client is { } statsd)
{
statsd.Increment(TracerMetricNames.Queue.DequeuedTraces, buffer.TraceCount);
statsd.Increment(TracerMetricNames.Queue.DequeuedSpans, buffer.SpanCount);
}
}

var droppedTraces = Interlocked.Exchange(ref _droppedTraces, 0);
@@ -336,7 +357,7 @@
{
droppedP0Traces = Interlocked.Exchange(ref _droppedP0Traces, 0);
droppedP0Spans = Interlocked.Exchange(ref _droppedP0Spans, 0);
Log.Debug<int, int, long, long>("Flushing {Spans} spans across {Traces} traces. CanComputeStats is enabled with {DroppedP0Traces} droppedP0Traces and {DroppedP0Spans} droppedP0Spans", buffer.SpanCount, buffer.TraceCount, droppedP0Traces, droppedP0Spans);
Log.Debug("Flushing {Spans} spans across {Traces} traces. CanComputeStats is enabled with {DroppedP0Traces} droppedP0Traces and {DroppedP0Spans} droppedP0Spans", buffer.SpanCount, buffer.TraceCount, droppedP0Traces, droppedP0Spans);
// Metrics for unsampled traces/spans already recorded
}
else
@@ -377,7 +398,7 @@ async Task InternalBufferFlush()
private void SerializeTrace(ArraySegment<Span> spans)
{
// Declaring as inline method because only safe to invoke in the context of SerializeTrace
SpanBuffer SwapBuffers()
SpanBuffer? SwapBuffers()
{
if (_activeBuffer == _frontBuffer)
{
@@ -512,10 +533,14 @@ private void DropTrace(ArraySegment<Span> spans)
TelemetryFactory.Metrics.RecordCountSpanDropped(MetricTags.DropReason.OverfullBuffer, spans.Count);
TelemetryFactory.Metrics.RecordCountTraceChunkDropped(MetricTags.DropReason.OverfullBuffer);

if (_statsd != null)
if (Volatile.Read(ref _traceMetricsEnabled))
{
_statsd.Increment(TracerMetricNames.Queue.DroppedTraces);
_statsd.Increment(TracerMetricNames.Queue.DroppedSpans, spans.Count);
using var lease = _statsd.TryGetClientLease();
if (lease.Client is { } statsd)
{
statsd.Increment(TracerMetricNames.Queue.DroppedTraces);
statsd.Increment(TracerMetricNames.Queue.DroppedSpans, spans.Count);
}
}
}

@@ -578,7 +603,7 @@ private void SerializeTracesLoop()
private readonly struct WorkItem
{
public readonly ArraySegment<Span> Trace;
public readonly Action Callback;
public readonly Action? Callback;

public WorkItem(ArraySegment<Span> trace)
{
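The pattern repeated throughout AgentWriter is the heart of this PR: cache the mutable flag in a field that the settings subscription updates with Volatile.Write, read it with Volatile.Read on the hot path, and only then take a short-lived statsd client lease instead of null-checking a fixed IDogStatsd. Here is a condensed sketch of that pattern, with interface shapes abbreviated from the diff and illustrative metric names:

#nullable enable
using System;
using System.Threading;

internal interface IStatsdClient
{
    void Increment(string metric, int value = 1);
}

// Sketch of the lease: Client is null when no consumer currently requires
// statsd; Dispose returns the shared client in the real implementation.
internal readonly struct StatsdLease(IStatsdClient? client) : IDisposable
{
    public IStatsdClient? Client { get; } = client;

    public void Dispose()
    {
    }
}

internal interface IStatsdManager
{
    StatsdLease TryGetClientLease();
}

internal sealed class MetricsGate(IStatsdManager statsd, bool initiallyEnabled)
{
    private bool _enabled = initiallyEnabled;

    // Invoked from the SubscribeToChanges callback when TracerMetricsEnabled
    // flips at runtime via dynamic configuration.
    public void OnTracerMetricsEnabledChanged(bool enabled)
        => Volatile.Write(ref _enabled, enabled);

    public void CountEnqueued(int spanCount)
    {
        // The volatile read keeps the disabled path cheap: no lease is taken
        // and no client is touched unless metrics are on.
        if (!Volatile.Read(ref _enabled))
        {
            return;
        }

        using var lease = statsd.TryGetClientLease();
        if (lease.Client is { } client)
        {
            client.Increment("tracer.queue.enqueued_traces");
            client.Increment("tracer.queue.enqueued_spans", spanCount);
        }
    }
}

Because the lease is a struct, a default lease is also valid: Client is simply null and Dispose is a no-op, which is presumably why the Api.cs diff below can write `_statsd.TryGetClientLease() : default` when health metrics are disabled.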
28 changes: 22 additions & 6 deletions tracer/src/Datadog.Trace/Agent/Api.cs
@@ -5,8 +5,10 @@

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Datadog.Trace.Agent.Transports;
using Datadog.Trace.DogStatsd;
@@ -31,7 +33,7 @@ internal class Api : IApi

private readonly IDatadogLogger _log;
private readonly IApiRequestFactory _apiRequestFactory;
private readonly IDogStatsd _statsd;
private readonly IStatsdManager _statsd;
private readonly string _containerId;
private readonly string _entityId;
private readonly Uri _tracesEndpoint;
@@ -42,12 +44,14 @@
private readonly SendCallback<SendTracesState> _sendTraces;
private string _cachedResponse;
private string _agentVersion;
private bool _healthMetricsEnabled;

public Api(
IApiRequestFactory apiRequestFactory,
IDogStatsd statsd,
IStatsdManager statsd,
Action<Dictionary<string, float>> updateSampleRates,
bool partialFlushEnabled,
bool healthMetricsEnabled,
IDatadogLogger log = null)
{
// optionally injecting a log instance in here for testing purposes
@@ -57,10 +61,12 @@ public Api(
_sendTraces = SendTracesAsyncImpl;
_updateSampleRates = updateSampleRates;
_statsd = statsd;
ToggleTracerHealthMetrics(healthMetricsEnabled);
_containerId = ContainerMetadata.GetContainerId();
_entityId = ContainerMetadata.GetEntityId();
_apiRequestFactory = apiRequestFactory;
_partialFlushEnabled = partialFlushEnabled;
_healthMetricsEnabled = healthMetricsEnabled;
_tracesEndpoint = _apiRequestFactory.GetEndpoint(TracesPath);
_log.Debug("Using traces endpoint {TracesEndpoint}", _tracesEndpoint.ToString());
_statsEndpoint = _apiRequestFactory.GetEndpoint(StatsPath);
@@ -76,6 +82,13 @@ private enum SendResult
Failed_DontRetry,
}

[MemberNotNull(nameof(_statsd))]
public void ToggleTracerHealthMetrics(bool enabled)
{
Volatile.Write(ref _healthMetricsEnabled, enabled);
_statsd.SetRequired(StatsdConsumer.TraceApi, enabled);
}

public Task<bool> SendStatsAsync(StatsBuffer stats, long bucketDuration)
{
_log.Debug("Sending stats to the Datadog Agent.");
@@ -298,10 +311,13 @@ private async Task<SendResult> SendTracesAsyncImpl(IApiRequest request, bool fin

try
{
var healthMetricsEnabled = Volatile.Read(ref _healthMetricsEnabled);
using var lease = healthMetricsEnabled ? _statsd.TryGetClientLease() : default;
var healthStats = healthMetricsEnabled ? lease.Client : null;
try
{
TelemetryFactory.Metrics.RecordCountTraceApiRequests();
_statsd?.Increment(TracerMetricNames.Api.Requests);
healthStats?.Increment(TracerMetricNames.Api.Requests);
response = await request.PostAsync(traces, MimeTypes.MsgPack).ConfigureAwait(false);
}
catch (Exception ex)
@@ -310,17 +326,17 @@
// (which are handled below)
var tag = ex is TimeoutException ? MetricTags.ApiError.Timeout : MetricTags.ApiError.NetworkError;
TelemetryFactory.Metrics.RecordCountTraceApiErrors(tag);
_statsd?.Increment(TracerMetricNames.Api.Errors);
healthStats?.Increment(TracerMetricNames.Api.Errors);
throw;
}

if (_statsd != null)
if (healthStats != null)
{
// don't bother creating the tags array if trace metrics are disabled
string[] tags = { $"status:{response.StatusCode}" };

// count every response, grouped by status code
_statsd?.Increment(TracerMetricNames.Api.Responses, tags: tags);
healthStats.Increment(TracerMetricNames.Api.Responses, tags: tags);
}

TelemetryFactory.Metrics.RecordCountTraceApiResponses(response.GetTelemetryStatusCodeMetricTag());
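Api.cs also registers and deregisters itself with _statsd.SetRequired(StatsdConsumer.TraceApi, enabled), which implies the manager keeps the underlying client alive only while at least one consumer needs it. The following is a guess at that mechanism, not the PR's actual implementation (AgentWriter and TraceApi appear in the diff; RuntimeMetrics is assumed):

#nullable enable
using System.Collections.Generic;

internal enum StatsdConsumer
{
    AgentWriter,
    TraceApi,
    RuntimeMetrics,
}

internal sealed class StatsdManagerSketch
{
    private readonly object _lock = new();
    private readonly HashSet<StatsdConsumer> _required = new();

    public void SetRequired(StatsdConsumer consumer, bool required)
    {
        lock (_lock)
        {
            var changed = required ? _required.Add(consumer) : _required.Remove(consumer);
            if (changed)
            {
                // Create the shared client on first demand; tear it down when
                // the last consumer releases it.
                UpdateClientLifetime(anyRequired: _required.Count > 0);
            }
        }
    }

    private void UpdateClientLifetime(bool anyRequired)
    {
        // Elided: (re)create or dispose the underlying client. The real
        // StatsdManager also reacts to exporter-settings changes here.
    }
}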
18 changes: 13 additions & 5 deletions tracer/src/Datadog.Trace/Agent/ClientStatsPayload.cs
@@ -4,19 +4,27 @@
// </copyright>

using System.Threading;
using Datadog.Trace.Configuration;

namespace Datadog.Trace.Agent
{
internal class ClientStatsPayload
internal class ClientStatsPayload(MutableSettings settings)
{
private AppSettings _settings = CreateSettings(settings);
private long _sequence;

public string HostName { get; set; }
public string HostName { get; init; }

public string Environment { get; set; }

public string Version { get; set; }
public AppSettings Details => _settings;

public long GetSequenceNumber() => Interlocked.Increment(ref _sequence);

public void UpdateDetails(MutableSettings settings)
=> Interlocked.Exchange(ref _settings, CreateSettings(settings));

private static AppSettings CreateSettings(MutableSettings settings)
=> new(settings.Environment, settings.ServiceVersion);

internal record AppSettings(string Environment, string Version);
}
}
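ClientStatsPayload now publishes the environment and version as one immutable record and swaps the whole reference on update, so the stats writer can never observe a torn pair. The same idiom in isolation, simplified from the diff:

#nullable enable
using System.Threading;

internal sealed record AppSettings(string? Environment, string? Version);

internal sealed class SettingsHolder(AppSettings initial)
{
    private AppSettings _current = initial;

    // The reference swap is atomic, so readers always see a complete
    // (Environment, Version) pair without taking a lock.
    public AppSettings Current => Volatile.Read(ref _current);

    public void Update(AppSettings next)
        => Interlocked.Exchange(ref _current, next);
}

Swapping an immutable record is the standard lock-free way to publish a multi-field snapshot; the old set accessors allowed Environment and Version to change independently, briefly exposing a mismatched combination.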