diff --git a/libraries/src/AWS.Lambda.Powertools.Tracing/Internal/TracingAspect.cs b/libraries/src/AWS.Lambda.Powertools.Tracing/Internal/TracingAspect.cs index 0be560ff..dfdabb00 100644 --- a/libraries/src/AWS.Lambda.Powertools.Tracing/Internal/TracingAspect.cs +++ b/libraries/src/AWS.Lambda.Powertools.Tracing/Internal/TracingAspect.cs @@ -2,6 +2,7 @@ using System.Linq; using System.Runtime.ExceptionServices; using System.Text; +using System.Threading; using System.Threading.Tasks; using AspectInjector.Broker; using AWS.Lambda.Powertools.Common; @@ -28,14 +29,15 @@ public class TracingAspect private readonly IXRayRecorder _xRayRecorder; /// - /// If true, capture annotations + /// Thread-safe flag for capturing annotations. Uses int for Interlocked operations. + /// 1 = should capture, 0 = already captured /// - private static bool _captureAnnotations = true; + private static int _captureAnnotations = 1; /// - /// If true, annotations have been captured + /// If true, annotations have been captured by this invocation's execution context /// - private bool _isAnnotationsCaptured; + private static readonly AsyncLocal _isAnnotationsCaptured = new(); /// /// Aspect constructor @@ -117,8 +119,12 @@ public object Around( } finally { - if (_isAnnotationsCaptured) - _captureAnnotations = true; + // Reset the capture flag if this execution context captured annotations + if (_isAnnotationsCaptured.Value) + { + Interlocked.Exchange(ref _captureAnnotations, 1); + _isAnnotationsCaptured.Value = false; + } } } @@ -127,12 +133,12 @@ private void BeginSegment(string segmentName, string @namespace) _xRayRecorder.BeginSubsegment(segmentName); _xRayRecorder.SetNamespace(@namespace); - if (_captureAnnotations) + // Use Interlocked.CompareExchange for thread-safe check-and-set + // Only one thread will successfully change from 1 to 0 + if (Interlocked.CompareExchange(ref _captureAnnotations, 0, 1) == 1) { _xRayRecorder.AddAnnotation("ColdStart", LambdaLifecycleTracker.IsColdStart); - - _captureAnnotations = false; - _isAnnotationsCaptured = true; + _isAnnotationsCaptured.Value = true; if (_powertoolsConfigurations.IsServiceDefined) _xRayRecorder.AddAnnotation("Service", _powertoolsConfigurations.Service); @@ -231,6 +237,7 @@ private bool CaptureError(TracingCaptureMode captureMode) internal static void ResetForTest() { LambdaLifecycleTracker.Reset(); - _captureAnnotations = true; + Interlocked.Exchange(ref _captureAnnotations, 1); + _isAnnotationsCaptured.Value = false; } } \ No newline at end of file diff --git a/libraries/src/Directory.Packages.props b/libraries/src/Directory.Packages.props index cf240f82..665d5a3b 100644 --- a/libraries/src/Directory.Packages.props +++ b/libraries/src/Directory.Packages.props @@ -21,7 +21,7 @@ - + diff --git a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/AWS.Lambda.Powertools.ConcurrencyTests.csproj b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/AWS.Lambda.Powertools.ConcurrencyTests.csproj index 907369e1..50d191a8 100644 --- a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/AWS.Lambda.Powertools.ConcurrencyTests.csproj +++ b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/AWS.Lambda.Powertools.ConcurrencyTests.csproj @@ -26,6 +26,7 @@ + diff --git a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Tracing/SubsegmentIsolationTests.cs b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Tracing/SubsegmentIsolationTests.cs new file mode 100644 index 00000000..7dd38ccb --- /dev/null +++ b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Tracing/SubsegmentIsolationTests.cs @@ -0,0 +1,425 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Amazon.XRay.Recorder.Core; +using Amazon.XRay.Recorder.Core.Internal.Entities; +using AWS.Lambda.Powertools.Common; +using Xunit; +using PowertoolsTracing = AWS.Lambda.Powertools.Tracing.Tracing; + +namespace AWS.Lambda.Powertools.ConcurrencyTests.Tracing; + +/// +/// Tests for validating subsegment isolation in Powertools Tracing +/// under concurrent execution scenarios. +/// +/// These tests verify that when multiple Lambda invocations run concurrently, +/// each invocation's tracing subsegments remain isolated from other invocations. +/// +/// Note: X-Ray SDK uses AsyncLocal for trace context, which provides natural +/// isolation between async execution contexts. These tests validate that +/// Powertools Tracing correctly leverages this isolation. +/// +[Collection("Tracing Concurrency Tests")] +public class SubsegmentIsolationTests : IDisposable +{ + public SubsegmentIsolationTests() + { + ResetTracingState(); + Environment.SetEnvironmentVariable("LAMBDA_TASK_ROOT", "/var/task"); + Environment.SetEnvironmentVariable("POWERTOOLS_SERVICE_NAME", "TestService"); + Environment.SetEnvironmentVariable("POWERTOOLS_TRACE_DISABLED", "false"); + } + + public void Dispose() + { + ResetTracingState(); + Environment.SetEnvironmentVariable("LAMBDA_TASK_ROOT", null); + Environment.SetEnvironmentVariable("POWERTOOLS_SERVICE_NAME", null); + Environment.SetEnvironmentVariable("POWERTOOLS_TRACE_DISABLED", null); + } + + private static void ResetTracingState() + { + try + { + AWSXRayRecorder.Instance.TraceContext.ClearEntity(); + } + catch + { + // Ignore if no entity exists + } + } + + + #region Helper Result Classes + + private class SubsegmentResult + { + public string InvocationId { get; set; } = string.Empty; + public int InvocationIndex { get; set; } + public string SubsegmentName { get; set; } = string.Empty; + public bool SubsegmentCreated { get; set; } + public bool ExceptionThrown { get; set; } + public string? ExceptionMessage { get; set; } + public string? ExceptionType { get; set; } + } + + private class AnnotationResult + { + public string InvocationId { get; set; } = string.Empty; + public int InvocationIndex { get; set; } + public List<(string Key, object Value)> AnnotationsAdded { get; set; } = new(); + public bool ExceptionThrown { get; set; } + public string? ExceptionMessage { get; set; } + } + + private class MetadataResult + { + public string InvocationId { get; set; } = string.Empty; + public int InvocationIndex { get; set; } + public List<(string Namespace, string Key, object Value)> MetadataAdded { get; set; } = new(); + public bool ExceptionThrown { get; set; } + public string? ExceptionMessage { get; set; } + } + + private class WithSubsegmentResult + { + public string InvocationId { get; set; } = string.Empty; + public int InvocationIndex { get; set; } + public string SubsegmentName { get; set; } = string.Empty; + public bool CallbackExecuted { get; set; } + public bool ExceptionThrown { get; set; } + public string? ExceptionMessage { get; set; } + } + + #endregion + + #region Property 1: Subsegment Creation Isolation + + /// + /// Verifies that concurrent invocations can create subsegments without interference. + /// Each invocation should be able to create its own subsegment independently. + /// + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(10)] + public async Task SubsegmentCreation_ConcurrentInvocations_ShouldNotInterfere(int concurrencyLevel) + { + var results = new ConcurrentBag(); + + var tasks = Enumerable.Range(0, concurrencyLevel).Select(invocationIndex => Task.Run(() => + { + var invocationId = Guid.NewGuid().ToString("N"); + var result = new SubsegmentResult + { + InvocationId = invocationId, + InvocationIndex = invocationIndex, + SubsegmentName = $"Subsegment_{invocationIndex}_{invocationId}" + }; + + try + { + // Create a root segment for this "invocation" + var rootSegment = new Segment($"TestLambda_{invocationIndex}"); + rootSegment.SetStartTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.SetEntity(rootSegment); + + // Create a subsegment + AWSXRayRecorder.Instance.BeginSubsegment(result.SubsegmentName); + result.SubsegmentCreated = true; + + Thread.Sleep(Random.Shared.Next(1, 10)); + + AWSXRayRecorder.Instance.EndSubsegment(); + + // Clean up + rootSegment.SetEndTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.ClearEntity(); + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionMessage = ex.Message; + result.ExceptionType = ex.GetType().Name; + } + + results.Add(result); + })); + + await Task.WhenAll(tasks); + + // All invocations should have created subsegments without exceptions + Assert.Equal(concurrencyLevel, results.Count); + Assert.All(results, r => Assert.False(r.ExceptionThrown, + $"Invocation {r.InvocationIndex} threw: {r.ExceptionType}: {r.ExceptionMessage}")); + Assert.All(results, r => Assert.True(r.SubsegmentCreated, + $"Invocation {r.InvocationIndex} failed to create subsegment")); + } + + #endregion + + + #region Property 2: Annotation Isolation + + /// + /// Verifies that concurrent invocations adding annotations don't interfere with each other. + /// + [Theory] + [InlineData(2, 1)] + [InlineData(3, 3)] + [InlineData(5, 2)] + public async Task AnnotationIsolation_ConcurrentInvocations_ShouldNotInterfere( + int concurrencyLevel, int annotationsPerInvocation) + { + var results = new ConcurrentBag(); + + var tasks = Enumerable.Range(0, concurrencyLevel).Select(invocationIndex => Task.Run(() => + { + var invocationId = Guid.NewGuid().ToString("N"); + var result = new AnnotationResult + { + InvocationId = invocationId, + InvocationIndex = invocationIndex + }; + + try + { + // Create a root segment for this "invocation" + var rootSegment = new Segment($"TestLambda_{invocationIndex}"); + rootSegment.SetStartTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.SetEntity(rootSegment); + + AWSXRayRecorder.Instance.BeginSubsegment($"Subsegment_{invocationIndex}"); + + // Add annotations + for (int a = 0; a < annotationsPerInvocation; a++) + { + var key = $"annotation_inv{invocationIndex}_a{a}"; + var value = $"value_{invocationId}_{a}"; + PowertoolsTracing.AddAnnotation(key, value); + result.AnnotationsAdded.Add((key, value)); + } + + Thread.Sleep(Random.Shared.Next(1, 10)); + + AWSXRayRecorder.Instance.EndSubsegment(); + rootSegment.SetEndTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.ClearEntity(); + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionMessage = $"{ex.GetType().Name}: {ex.Message}"; + } + + results.Add(result); + })); + + await Task.WhenAll(tasks); + + // All invocations should complete without exceptions + Assert.Equal(concurrencyLevel, results.Count); + Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage)); + Assert.All(results, r => Assert.Equal(annotationsPerInvocation, r.AnnotationsAdded.Count)); + } + + #endregion + + #region Property 3: Metadata Isolation + + /// + /// Verifies that concurrent invocations adding metadata don't interfere with each other. + /// + [Theory] + [InlineData(2, 1)] + [InlineData(3, 3)] + [InlineData(5, 2)] + public async Task MetadataIsolation_ConcurrentInvocations_ShouldNotInterfere( + int concurrencyLevel, int metadataPerInvocation) + { + var results = new ConcurrentBag(); + + var tasks = Enumerable.Range(0, concurrencyLevel).Select(invocationIndex => Task.Run(() => + { + var invocationId = Guid.NewGuid().ToString("N"); + var result = new MetadataResult + { + InvocationId = invocationId, + InvocationIndex = invocationIndex + }; + + try + { + // Create a root segment for this "invocation" + var rootSegment = new Segment($"TestLambda_{invocationIndex}"); + rootSegment.SetStartTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.SetEntity(rootSegment); + + AWSXRayRecorder.Instance.BeginSubsegment($"Subsegment_{invocationIndex}"); + + // Add metadata + for (int m = 0; m < metadataPerInvocation; m++) + { + var ns = $"namespace_{invocationIndex}"; + var key = $"metadata_inv{invocationIndex}_m{m}"; + var value = new { InvocationId = invocationId, Index = m }; + PowertoolsTracing.AddMetadata(ns, key, value); + result.MetadataAdded.Add((ns, key, value)); + } + + Thread.Sleep(Random.Shared.Next(1, 10)); + + AWSXRayRecorder.Instance.EndSubsegment(); + rootSegment.SetEndTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.ClearEntity(); + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionMessage = $"{ex.GetType().Name}: {ex.Message}"; + } + + results.Add(result); + })); + + await Task.WhenAll(tasks); + + // All invocations should complete without exceptions + Assert.Equal(concurrencyLevel, results.Count); + Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage)); + Assert.All(results, r => Assert.Equal(metadataPerInvocation, r.MetadataAdded.Count)); + } + + #endregion + + + #region Property 4: WithSubsegment Isolation + + /// + /// Verifies that concurrent invocations using WithSubsegment don't interfere with each other. + /// + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(10)] + public async Task WithSubsegment_ConcurrentInvocations_ShouldNotInterfere(int concurrencyLevel) + { + var results = new ConcurrentBag(); + + var tasks = Enumerable.Range(0, concurrencyLevel).Select(invocationIndex => Task.Run(() => + { + var invocationId = Guid.NewGuid().ToString("N"); + var result = new WithSubsegmentResult + { + InvocationId = invocationId, + InvocationIndex = invocationIndex, + SubsegmentName = $"WithSubsegment_{invocationIndex}" + }; + + try + { + // Create a root segment for this "invocation" + var rootSegment = new Segment($"TestLambda_{invocationIndex}"); + rootSegment.SetStartTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.SetEntity(rootSegment); + + // Use WithSubsegment + PowertoolsTracing.WithSubsegment(result.SubsegmentName, subsegment => + { + result.CallbackExecuted = true; + subsegment.AddAnnotation($"invocation_{invocationIndex}", invocationId); + subsegment.AddMetadata($"data_{invocationIndex}", new { Id = invocationId }); + Thread.Sleep(Random.Shared.Next(1, 10)); + }); + + rootSegment.SetEndTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.ClearEntity(); + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionMessage = $"{ex.GetType().Name}: {ex.Message}"; + } + + results.Add(result); + })); + + await Task.WhenAll(tasks); + + // All invocations should complete without exceptions + Assert.Equal(concurrencyLevel, results.Count); + Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage)); + Assert.All(results, r => Assert.True(r.CallbackExecuted, + $"Invocation {r.InvocationIndex} callback was not executed")); + } + + #endregion + + #region Property 5: BeginSubsegment/Dispose Pattern Isolation + + /// + /// Verifies that concurrent invocations using BeginSubsegment with using pattern don't interfere. + /// + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(10)] + public async Task BeginSubsegment_ConcurrentInvocations_ShouldNotInterfere(int concurrencyLevel) + { + var results = new ConcurrentBag(); + + var tasks = Enumerable.Range(0, concurrencyLevel).Select(invocationIndex => Task.Run(() => + { + var invocationId = Guid.NewGuid().ToString("N"); + var result = new SubsegmentResult + { + InvocationId = invocationId, + InvocationIndex = invocationIndex, + SubsegmentName = $"BeginSubsegment_{invocationIndex}" + }; + + try + { + // Create a root segment for this "invocation" + var rootSegment = new Segment($"TestLambda_{invocationIndex}"); + rootSegment.SetStartTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.SetEntity(rootSegment); + + // Use BeginSubsegment with using pattern + using (var subsegment = PowertoolsTracing.BeginSubsegment(result.SubsegmentName)) + { + result.SubsegmentCreated = true; + subsegment.AddAnnotation($"invocation_{invocationIndex}", invocationId); + Thread.Sleep(Random.Shared.Next(1, 10)); + } + + rootSegment.SetEndTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.ClearEntity(); + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionMessage = $"{ex.GetType().Name}: {ex.Message}"; + result.ExceptionType = ex.GetType().Name; + } + + results.Add(result); + })); + + await Task.WhenAll(tasks); + + // All invocations should complete without exceptions + Assert.Equal(concurrencyLevel, results.Count); + Assert.All(results, r => Assert.False(r.ExceptionThrown, + $"Invocation {r.InvocationIndex} threw: {r.ExceptionMessage}")); + Assert.All(results, r => Assert.True(r.SubsegmentCreated, + $"Invocation {r.InvocationIndex} failed to create subsegment")); + } + + #endregion +} diff --git a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Tracing/TracingAsyncContextTests.cs b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Tracing/TracingAsyncContextTests.cs new file mode 100644 index 00000000..4cfa1ca0 --- /dev/null +++ b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Tracing/TracingAsyncContextTests.cs @@ -0,0 +1,319 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Amazon.XRay.Recorder.Core; +using Amazon.XRay.Recorder.Core.Internal.Entities; +using AWS.Lambda.Powertools.Common; +using Xunit; +using PowertoolsTracing = AWS.Lambda.Powertools.Tracing.Tracing; + +namespace AWS.Lambda.Powertools.ConcurrencyTests.Tracing; + +/// +/// Tests for validating async context behavior in Powertools Tracing +/// under concurrent execution scenarios. +/// +/// These tests verify that the X-Ray SDK's AsyncLocal-based trace context +/// correctly maintains isolation between async execution contexts, +/// which is essential for Lambda's multi-instance mode. +/// +[Collection("Tracing Concurrency Tests")] +public class TracingAsyncContextTests : IDisposable +{ + public TracingAsyncContextTests() + { + ResetTracingState(); + Environment.SetEnvironmentVariable("LAMBDA_TASK_ROOT", "/var/task"); + Environment.SetEnvironmentVariable("POWERTOOLS_SERVICE_NAME", "TestService"); + Environment.SetEnvironmentVariable("POWERTOOLS_TRACE_DISABLED", "false"); + } + + public void Dispose() + { + ResetTracingState(); + Environment.SetEnvironmentVariable("LAMBDA_TASK_ROOT", null); + Environment.SetEnvironmentVariable("POWERTOOLS_SERVICE_NAME", null); + Environment.SetEnvironmentVariable("POWERTOOLS_TRACE_DISABLED", null); + } + + private static void ResetTracingState() + { + try + { + AWSXRayRecorder.Instance.TraceContext.ClearEntity(); + } + catch + { + // Ignore if no entity exists + } + } + + #region Helper Result Classes + + private class AsyncContextResult + { + public string InvocationId { get; set; } = string.Empty; + public int InvocationIndex { get; set; } + public string ExpectedSegmentName { get; set; } = string.Empty; + public string? ActualSegmentName { get; set; } + public bool ContextPreserved { get; set; } + public bool ExceptionThrown { get; set; } + public string? ExceptionMessage { get; set; } + } + + #endregion + + #region Property 1: Async Context Preservation + + /// + /// Verifies that async context is preserved across await points. + /// + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(10)] + public async Task AsyncContextPreservation_AcrossAwaitPoints_ShouldMaintainContext(int concurrencyLevel) + { + var results = new ConcurrentBag(); + + var tasks = Enumerable.Range(0, concurrencyLevel).Select(async invocationIndex => + { + var invocationId = Guid.NewGuid().ToString("N"); + var result = new AsyncContextResult + { + InvocationId = invocationId, + InvocationIndex = invocationIndex, + ExpectedSegmentName = $"AsyncTest_{invocationIndex}" + }; + + try + { + // Create a root segment for this "invocation" + var rootSegment = new Segment(result.ExpectedSegmentName); + rootSegment.SetStartTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.SetEntity(rootSegment); + + // First await point + await Task.Delay(Random.Shared.Next(1, 10)); + + // Verify context is preserved + var entity1 = PowertoolsTracing.GetEntity(); + if (entity1 == null) + { + result.ExceptionThrown = true; + result.ExceptionMessage = "Context lost after first await"; + results.Add(result); + return; + } + + // Create a subsegment + AWSXRayRecorder.Instance.BeginSubsegment($"async_sub_{invocationIndex}"); + + // Second await point + await Task.Delay(Random.Shared.Next(1, 10)); + + // Verify we're still in the subsegment context + var entity2 = PowertoolsTracing.GetEntity(); + if (entity2 == null) + { + result.ExceptionThrown = true; + result.ExceptionMessage = "Context lost after second await"; + results.Add(result); + return; + } + + AWSXRayRecorder.Instance.EndSubsegment(); + + // Third await point + await Task.Delay(Random.Shared.Next(1, 10)); + + // Verify we're back to root segment context + var entity3 = PowertoolsTracing.GetEntity(); + result.ActualSegmentName = entity3?.Name; + result.ContextPreserved = entity3 != null; + + rootSegment.SetEndTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.ClearEntity(); + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionMessage = $"{ex.GetType().Name}: {ex.Message}"; + } + + results.Add(result); + }).ToList(); + + await Task.WhenAll(tasks); + + // All invocations should preserve context + Assert.Equal(concurrencyLevel, results.Count); + Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage)); + Assert.All(results, r => Assert.True(r.ContextPreserved, + $"Invocation {r.InvocationIndex} lost context")); + } + + #endregion + + #region Property 2: ConfigureAwait(false) Behavior + + /// + /// Verifies that ConfigureAwait(false) doesn't break trace context. + /// X-Ray SDK uses AsyncLocal which flows across ConfigureAwait(false). + /// + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(10)] + public async Task ConfigureAwaitFalse_ShouldPreserveTraceContext(int concurrencyLevel) + { + var results = new ConcurrentBag(); + + var tasks = Enumerable.Range(0, concurrencyLevel).Select(async invocationIndex => + { + var invocationId = Guid.NewGuid().ToString("N"); + var result = new AsyncContextResult + { + InvocationId = invocationId, + InvocationIndex = invocationIndex, + ExpectedSegmentName = $"ConfigureAwaitTest_{invocationIndex}" + }; + + try + { + var rootSegment = new Segment(result.ExpectedSegmentName); + rootSegment.SetStartTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.SetEntity(rootSegment); + + // Use ConfigureAwait(false) + await Task.Delay(Random.Shared.Next(1, 10)).ConfigureAwait(false); + + // Verify context is preserved + var entity1 = PowertoolsTracing.GetEntity(); + if (entity1 == null) + { + result.ExceptionThrown = true; + result.ExceptionMessage = "Context lost after ConfigureAwait(false)"; + results.Add(result); + return; + } + + AWSXRayRecorder.Instance.BeginSubsegment($"after_configureawait_{invocationIndex}"); + + await Task.Delay(Random.Shared.Next(1, 10)).ConfigureAwait(false); + + var entity2 = PowertoolsTracing.GetEntity(); + if (entity2 == null) + { + result.ExceptionThrown = true; + result.ExceptionMessage = "Context lost after second ConfigureAwait(false)"; + results.Add(result); + return; + } + + AWSXRayRecorder.Instance.EndSubsegment(); + + await Task.Delay(Random.Shared.Next(1, 10)).ConfigureAwait(false); + + var entity3 = PowertoolsTracing.GetEntity(); + result.ContextPreserved = entity3 != null; + result.ActualSegmentName = entity3?.Name; + + rootSegment.SetEndTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.ClearEntity(); + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionMessage = $"{ex.GetType().Name}: {ex.Message}"; + } + + results.Add(result); + }).ToList(); + + await Task.WhenAll(tasks); + + Assert.Equal(concurrencyLevel, results.Count); + Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage)); + Assert.All(results, r => Assert.True(r.ContextPreserved, + $"Invocation {r.InvocationIndex} lost context after ConfigureAwait(false)")); + } + + #endregion + + #region Property 3: Task.Run Context Flow + + /// + /// Verifies that trace context flows correctly into Task.Run. + /// + [Theory] + [InlineData(2, 3)] + [InlineData(5, 2)] + [InlineData(10, 2)] + public async Task TaskRun_ShouldFlowTraceContext(int concurrencyLevel, int taskRunsPerInvocation) + { + var results = new ConcurrentBag(); + + var tasks = Enumerable.Range(0, concurrencyLevel).Select(async invocationIndex => + { + var invocationId = Guid.NewGuid().ToString("N"); + var result = new AsyncContextResult + { + InvocationId = invocationId, + InvocationIndex = invocationIndex, + ExpectedSegmentName = $"TaskRunTest_{invocationIndex}" + }; + + try + { + var rootSegment = new Segment(result.ExpectedSegmentName); + rootSegment.SetStartTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.SetEntity(rootSegment); + + var contextPreservedInAllTasks = true; + + // Run multiple Task.Run operations + var taskRunTasks = Enumerable.Range(0, taskRunsPerInvocation) + .Select(taskIndex => Task.Run(() => + { + // Verify context flowed into Task.Run + var entity = PowertoolsTracing.GetEntity(); + if (entity == null) + { + contextPreservedInAllTasks = false; + } + Thread.Sleep(Random.Shared.Next(1, 5)); + })); + + await Task.WhenAll(taskRunTasks); + + result.ContextPreserved = contextPreservedInAllTasks; + + var finalEntity = PowertoolsTracing.GetEntity(); + result.ActualSegmentName = finalEntity?.Name; + + rootSegment.SetEndTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.ClearEntity(); + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionMessage = $"{ex.GetType().Name}: {ex.Message}"; + } + + results.Add(result); + }).ToList(); + + await Task.WhenAll(tasks); + + Assert.Equal(concurrencyLevel, results.Count); + Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage)); + Assert.All(results, r => Assert.True(r.ContextPreserved, + $"Invocation {r.InvocationIndex} lost context in Task.Run")); + } + + #endregion +} diff --git a/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Tracing/TracingThreadSafetyTests.cs b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Tracing/TracingThreadSafetyTests.cs new file mode 100644 index 00000000..106acb2f --- /dev/null +++ b/libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/Tracing/TracingThreadSafetyTests.cs @@ -0,0 +1,500 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Amazon.XRay.Recorder.Core; +using Amazon.XRay.Recorder.Core.Internal.Entities; +using AWS.Lambda.Powertools.Common; +using Xunit; +using PowertoolsTracing = AWS.Lambda.Powertools.Tracing.Tracing; + +namespace AWS.Lambda.Powertools.ConcurrencyTests.Tracing; + +/// +/// Tests for validating thread safety in Powertools Tracing +/// under concurrent execution scenarios. +/// +/// These tests verify that when multiple Lambda invocations run concurrently, +/// the tracing operations are thread-safe and don't cause exceptions or data corruption. +/// +[Collection("Tracing Concurrency Tests")] +public class TracingThreadSafetyTests : IDisposable +{ + public TracingThreadSafetyTests() + { + ResetTracingState(); + Environment.SetEnvironmentVariable("LAMBDA_TASK_ROOT", "/var/task"); + Environment.SetEnvironmentVariable("POWERTOOLS_SERVICE_NAME", "TestService"); + Environment.SetEnvironmentVariable("POWERTOOLS_TRACE_DISABLED", "false"); + } + + public void Dispose() + { + ResetTracingState(); + Environment.SetEnvironmentVariable("LAMBDA_TASK_ROOT", null); + Environment.SetEnvironmentVariable("POWERTOOLS_SERVICE_NAME", null); + Environment.SetEnvironmentVariable("POWERTOOLS_TRACE_DISABLED", null); + } + + private static void ResetTracingState() + { + try + { + AWSXRayRecorder.Instance.TraceContext.ClearEntity(); + } + catch + { + // Ignore if no entity exists + } + } + + + #region Helper Result Classes + + private class ThreadSafetyResult + { + public string InvocationId { get; set; } = string.Empty; + public int InvocationIndex { get; set; } + public int OperationsAttempted { get; set; } + public int OperationsCompleted { get; set; } + public bool ExceptionThrown { get; set; } + public string? ExceptionMessage { get; set; } + public string? ExceptionType { get; set; } + } + + private class StressTestResult + { + public string InvocationId { get; set; } = string.Empty; + public int InvocationIndex { get; set; } + public int SubsegmentsCreated { get; set; } + public int AnnotationsAdded { get; set; } + public int MetadataAdded { get; set; } + public bool ExceptionThrown { get; set; } + public string? ExceptionMessage { get; set; } + } + + #endregion + + #region Property 1: Concurrent Operations Thread Safety + + /// + /// Verifies that concurrent tracing operations don't throw exceptions. + /// + [Theory] + [InlineData(2, 10)] + [InlineData(5, 20)] + [InlineData(10, 10)] + public async Task ConcurrentOperations_SimultaneousTracingCalls_ShouldNotThrowException( + int concurrencyLevel, int operationsPerInvocation) + { + var results = new ConcurrentBag(); + + var tasks = Enumerable.Range(0, concurrencyLevel).Select(invocationIndex => Task.Run(() => + { + var invocationId = Guid.NewGuid().ToString("N"); + var result = new ThreadSafetyResult + { + InvocationId = invocationId, + InvocationIndex = invocationIndex, + OperationsAttempted = operationsPerInvocation + }; + + try + { + // Create a root segment for this "invocation" + var rootSegment = new Segment($"TestLambda_{invocationIndex}"); + rootSegment.SetStartTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.SetEntity(rootSegment); + + for (int op = 0; op < operationsPerInvocation; op++) + { + // Mix of operations + AWSXRayRecorder.Instance.BeginSubsegment($"Op_{invocationIndex}_{op}"); + + PowertoolsTracing.AddAnnotation($"key_{op}", $"value_{invocationId}_{op}"); + PowertoolsTracing.AddMetadata($"meta_{op}", new { Id = invocationId, Op = op }); + + AWSXRayRecorder.Instance.EndSubsegment(); + result.OperationsCompleted++; + } + + rootSegment.SetEndTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.ClearEntity(); + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionMessage = ex.Message; + result.ExceptionType = ex.GetType().Name; + } + + results.Add(result); + })); + + await Task.WhenAll(tasks); + + // All invocations should complete without exceptions + Assert.Equal(concurrencyLevel, results.Count); + Assert.All(results, r => Assert.False(r.ExceptionThrown, + $"Invocation {r.InvocationIndex} threw {r.ExceptionType}: {r.ExceptionMessage}")); + Assert.All(results, r => Assert.Equal(r.OperationsAttempted, r.OperationsCompleted)); + } + + #endregion + + #region Property 2: Nested Subsegments Thread Safety + + /// + /// Verifies that concurrent invocations with nested subsegments don't interfere. + /// + [Theory] + [InlineData(2, 3)] + [InlineData(5, 2)] + [InlineData(10, 2)] + public async Task NestedSubsegments_ConcurrentInvocations_ShouldNotInterfere( + int concurrencyLevel, int nestingDepth) + { + var results = new ConcurrentBag(); + + var tasks = Enumerable.Range(0, concurrencyLevel).Select(invocationIndex => Task.Run(() => + { + var invocationId = Guid.NewGuid().ToString("N"); + var result = new ThreadSafetyResult + { + InvocationId = invocationId, + InvocationIndex = invocationIndex, + OperationsAttempted = nestingDepth + }; + + try + { + // Create a root segment for this "invocation" + var rootSegment = new Segment($"TestLambda_{invocationIndex}"); + rootSegment.SetStartTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.SetEntity(rootSegment); + + // Create nested subsegments + for (int depth = 0; depth < nestingDepth; depth++) + { + AWSXRayRecorder.Instance.BeginSubsegment($"Nested_{invocationIndex}_{depth}"); + PowertoolsTracing.AddAnnotation($"depth_{depth}", depth); + result.OperationsCompleted++; + } + + // End all nested subsegments + for (int depth = nestingDepth - 1; depth >= 0; depth--) + { + AWSXRayRecorder.Instance.EndSubsegment(); + } + + rootSegment.SetEndTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.ClearEntity(); + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionMessage = $"{ex.GetType().Name}: {ex.Message}"; + result.ExceptionType = ex.GetType().Name; + } + + results.Add(result); + })); + + await Task.WhenAll(tasks); + + // All invocations should complete without exceptions + Assert.Equal(concurrencyLevel, results.Count); + Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage)); + Assert.All(results, r => Assert.Equal(r.OperationsAttempted, r.OperationsCompleted)); + } + + #endregion + + + #region Property 3: Stress Test - All Operations + + /// + /// Stress test: Multiple threads simultaneously performing all Tracing operations. + /// This validates that the implementation handles high concurrency. + /// + [Theory] + [InlineData(3, 50)] + [InlineData(5, 100)] + [InlineData(10, 50)] + public async Task StressTest_MultipleThreadsAllOperations_ShouldNotThrowException( + int threadCount, int operationsPerThread) + { + var results = new ConcurrentBag(); + var exceptions = new ConcurrentBag(); + + var tasks = Enumerable.Range(0, threadCount).Select(threadIndex => Task.Run(() => + { + var invocationId = Guid.NewGuid().ToString("N"); + var result = new StressTestResult + { + InvocationId = invocationId, + InvocationIndex = threadIndex + }; + + try + { + // Create a root segment for this "invocation" + var rootSegment = new Segment($"StressTest_{threadIndex}"); + rootSegment.SetStartTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.SetEntity(rootSegment); + + for (int i = 0; i < operationsPerThread; i++) + { + // Mix of all operations + var subsegmentName = $"stress_{threadIndex}_{i}"; + + // BeginSubsegment + AWSXRayRecorder.Instance.BeginSubsegment(subsegmentName); + result.SubsegmentsCreated++; + + // AddAnnotation + PowertoolsTracing.AddAnnotation($"thread_{threadIndex}_op_{i}", i); + result.AnnotationsAdded++; + + // AddMetadata + PowertoolsTracing.AddMetadata($"meta_{i}", new { Thread = threadIndex, Op = i }); + result.MetadataAdded++; + + // EndSubsegment + AWSXRayRecorder.Instance.EndSubsegment(); + + // Occasionally use WithSubsegment + if (i % 5 == 0) + { + PowertoolsTracing.WithSubsegment($"with_{threadIndex}_{i}", subsegment => + { + subsegment.AddAnnotation("nested", true); + }); + } + } + + rootSegment.SetEndTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.ClearEntity(); + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionMessage = $"{ex.GetType().Name}: {ex.Message}\n{ex.StackTrace}"; + exceptions.Add(ex); + } + + results.Add(result); + })).ToList(); + + await Task.WhenAll(tasks); + + // All threads should complete without exceptions + Assert.Equal(threadCount, results.Count); + Assert.Empty(exceptions); + Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage)); + Assert.All(results, r => Assert.Equal(operationsPerThread, r.SubsegmentsCreated)); + } + + #endregion + + #region Property 4: Rapid Subsegment Creation/Destruction + + /// + /// Verifies that rapid creation and destruction of subsegments is thread-safe. + /// + [Theory] + [InlineData(2, 100)] + [InlineData(5, 50)] + [InlineData(10, 30)] + public async Task RapidSubsegmentLifecycle_ConcurrentInvocations_ShouldNotThrowException( + int concurrencyLevel, int cyclesPerInvocation) + { + var results = new ConcurrentBag(); + + var tasks = Enumerable.Range(0, concurrencyLevel).Select(invocationIndex => Task.Run(() => + { + var invocationId = Guid.NewGuid().ToString("N"); + var result = new ThreadSafetyResult + { + InvocationId = invocationId, + InvocationIndex = invocationIndex, + OperationsAttempted = cyclesPerInvocation + }; + + try + { + // Create a root segment for this "invocation" + var rootSegment = new Segment($"RapidTest_{invocationIndex}"); + rootSegment.SetStartTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.SetEntity(rootSegment); + + // Rapid create/destroy cycles + for (int cycle = 0; cycle < cyclesPerInvocation; cycle++) + { + AWSXRayRecorder.Instance.BeginSubsegment($"rapid_{invocationIndex}_{cycle}"); + // Minimal work + PowertoolsTracing.AddAnnotation("cycle", cycle); + AWSXRayRecorder.Instance.EndSubsegment(); + result.OperationsCompleted++; + } + + rootSegment.SetEndTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.ClearEntity(); + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionMessage = $"{ex.GetType().Name}: {ex.Message}"; + result.ExceptionType = ex.GetType().Name; + } + + results.Add(result); + })); + + await Task.WhenAll(tasks); + + // All invocations should complete without exceptions + Assert.Equal(concurrencyLevel, results.Count); + Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage)); + Assert.All(results, r => Assert.Equal(r.OperationsAttempted, r.OperationsCompleted)); + } + + #endregion + + + #region Property 5: Exception Handling Thread Safety + + /// + /// Verifies that concurrent invocations handling exceptions don't interfere. + /// + [Theory] + [InlineData(2)] + [InlineData(5)] + [InlineData(10)] + public async Task ExceptionHandling_ConcurrentInvocations_ShouldNotInterfere(int concurrencyLevel) + { + var results = new ConcurrentBag(); + + var tasks = Enumerable.Range(0, concurrencyLevel).Select(invocationIndex => Task.Run(() => + { + var invocationId = Guid.NewGuid().ToString("N"); + var result = new ThreadSafetyResult + { + InvocationId = invocationId, + InvocationIndex = invocationIndex, + OperationsAttempted = 1 + }; + + try + { + // Create a root segment for this "invocation" + var rootSegment = new Segment($"ExceptionTest_{invocationIndex}"); + rootSegment.SetStartTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.SetEntity(rootSegment); + + AWSXRayRecorder.Instance.BeginSubsegment($"exception_{invocationIndex}"); + + // Add an exception to the trace + var testException = new InvalidOperationException($"Test exception from invocation {invocationIndex}"); + PowertoolsTracing.AddException(testException); + result.OperationsCompleted++; + + AWSXRayRecorder.Instance.EndSubsegment(); + rootSegment.SetEndTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.ClearEntity(); + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionMessage = $"{ex.GetType().Name}: {ex.Message}"; + result.ExceptionType = ex.GetType().Name; + } + + results.Add(result); + })); + + await Task.WhenAll(tasks); + + // All invocations should complete without unexpected exceptions + Assert.Equal(concurrencyLevel, results.Count); + Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage)); + Assert.All(results, r => Assert.Equal(r.OperationsAttempted, r.OperationsCompleted)); + } + + #endregion + + #region Property 6: GetEntity/SetEntity Thread Safety + + /// + /// Verifies that GetEntity and SetEntity operations are thread-safe. + /// + [Theory] + [InlineData(2, 10)] + [InlineData(5, 20)] + [InlineData(10, 10)] + public async Task GetSetEntity_ConcurrentInvocations_ShouldMaintainIsolation( + int concurrencyLevel, int operationsPerInvocation) + { + var results = new ConcurrentBag(); + + var tasks = Enumerable.Range(0, concurrencyLevel).Select(invocationIndex => Task.Run(() => + { + var invocationId = Guid.NewGuid().ToString("N"); + var result = new ThreadSafetyResult + { + InvocationId = invocationId, + InvocationIndex = invocationIndex, + OperationsAttempted = operationsPerInvocation + }; + + try + { + // Create a root segment for this "invocation" + var rootSegment = new Segment($"EntityTest_{invocationIndex}"); + rootSegment.SetStartTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.SetEntity(rootSegment); + + for (int op = 0; op < operationsPerInvocation; op++) + { + // Get current entity + var entity = PowertoolsTracing.GetEntity(); + + // Verify we got an entity (should be our root segment or a subsegment) + Assert.NotNull(entity); + + // Create a subsegment + AWSXRayRecorder.Instance.BeginSubsegment($"entity_op_{invocationIndex}_{op}"); + + // Get entity again - should now be the subsegment + var subsegmentEntity = PowertoolsTracing.GetEntity(); + Assert.NotNull(subsegmentEntity); + + AWSXRayRecorder.Instance.EndSubsegment(); + result.OperationsCompleted++; + } + + rootSegment.SetEndTimeToNow(); + AWSXRayRecorder.Instance.TraceContext.ClearEntity(); + } + catch (Exception ex) + { + result.ExceptionThrown = true; + result.ExceptionMessage = $"{ex.GetType().Name}: {ex.Message}"; + result.ExceptionType = ex.GetType().Name; + } + + results.Add(result); + })); + + await Task.WhenAll(tasks); + + // All invocations should complete without exceptions + Assert.Equal(concurrencyLevel, results.Count); + Assert.All(results, r => Assert.False(r.ExceptionThrown, r.ExceptionMessage)); + Assert.All(results, r => Assert.Equal(r.OperationsAttempted, r.OperationsCompleted)); + } + + #endregion +}