Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion src/LLL.DurableTask.EFCore/EFCoreOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.History;
using DurableTask.Core.Settings;
using DurableTask.Core.Tracing;
using LLL.DurableTask.Core;
using LLL.DurableTask.EFCore.Entities;
using LLL.DurableTask.EFCore.Mappers;
Expand Down Expand Up @@ -156,14 +158,18 @@ public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAs

await dbContext.SaveChangesAsync();

return new TaskOrchestrationWorkItem
var workItem = new TaskOrchestrationWorkItem
{
InstanceId = instance.InstanceId,
LockedUntilUtc = instance.LockedUntil,
OrchestrationRuntimeState = session.RuntimeState,
NewMessages = messages,
Session = session
};

AttachTraceContext(workItem);

return workItem;
},
r => r is not null,
receiveTimeout,
Expand Down Expand Up @@ -554,6 +560,24 @@ private async Task<ActivityMessage> LockActivityMessage(OrchestrationDbContext d
return null;
}

// When distributed tracing is enabled, DurableTask.Core's TaskOrchestrationDispatcher
// reads CorrelationTraceContext.Current (seeded from workItem.TraceContext) and
// dereferences it unconditionally (e.g. ExecutionStartedEvent.Correlation =
// CorrelationTraceContext.Current.SerializableTraceContext). A backend that leaves
// TraceContext null makes the dispatcher throw a NullReferenceException, which aborts
// the work item and retries it forever. Mirror the reference backends by restoring the
// trace context from the ExecutionStartedEvent's correlation payload (Restore returns a
// valid empty context when there is none). Guarded so the tracing-off path stays
// zero-overhead.
private static void AttachTraceContext(TaskOrchestrationWorkItem workItem)
{
if (!CorrelationSettings.Current.EnableDistributedTracing)
return;

var correlation = workItem.OrchestrationRuntimeState?.ExecutionStartedEvent?.Correlation;
workItem.TraceContext = TraceContextBase.Restore(correlation);
}

private static string CreateTaskActivityWorkItemId(Guid id, string lockId, string replyQueue)
{
return $"{id}|{lockId}|{replyQueue}";
Expand Down
143 changes: 143 additions & 0 deletions test/LLL.DurableTask.Tests/Storages/DistributedTracingTestBase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using AwesomeAssertions;
using DurableTask.Core;
using DurableTask.Core.Settings;
using LLL.DurableTask.Tests.Storage.Activities;
using LLL.DurableTask.Tests.Storage.Orchestrations;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Xunit;
using Xunit.Abstractions;

namespace LLL.DurableTask.Tests.Storages;

// Distributed-tracing acceptance test. Runs single-threaded in its own collection because
// DurableTask.Core.Settings.CorrelationSettings.Current is a process-wide static. A dedicated
// base (rather than StorageTestBase) is required so tracing can be enabled before the host
// starts and only the orchestration/activity under test are registered.
[Collection("DistributedTracing")]
public abstract class DistributedTracingTestBase : IAsyncLifetime
{
private readonly ITestOutputHelper _output;
protected IHost _host;
protected IConfiguration Configuration { get; }
protected TimeSpan FastWaitTimeout { get; set; } = TimeSpan.FromSeconds(60);

protected DistributedTracingTestBase(ITestOutputHelper output)
{
_output = output;

#if NET10_0
var framework = "net10";
#elif NET9_0
var framework = "net9";
#elif NET8_0
var framework = "net8";
#endif

Configuration = new ConfigurationBuilder()
.AddJsonFile("appsettings.json", false)
.AddJsonFile($"appsettings.{framework}.json", false)
.AddEnvironmentVariables()
.Build();
}

protected abstract void ConfigureStorage(IServiceCollection services);

public virtual async Task InitializeAsync()
{
CorrelationSettings.Current.EnableDistributedTracing = true;
CorrelationSettings.Current.Protocol = Protocol.W3CTraceContext;

_host = await Host.CreateDefaultBuilder()
.ConfigureLogging(logging =>
{
logging.AddFilter(l => l >= LogLevel.Warning).AddXUnit(_output);
})
.ConfigureServices(services =>
{
ConfigureStorage(services);
services.AddDurableTaskClient();
services.AddDurableTaskWorker(builder =>
{
builder.AddOrchestration<SingleActivityOrchestration>(SingleActivityOrchestration.Name, SingleActivityOrchestration.Version);
builder.AddActivity<SumActivity>(SumActivity.Name, SumActivity.Version);
});
}).StartAsync();
}

public virtual async Task DisposeAsync()
{
CorrelationSettings.Current.EnableDistributedTracing = false;
await _host.StopAsync();
await _host.WaitForShutdownAsync();
_host.Dispose();
}

[Trait("Category", "Integration")]
[SkippableFact]
public async Task Tracing_ClientOrchestrationActivitySpans_ShouldConnect()
{
var spans = new List<Activity>();
using var listener = new ActivityListener
{
ShouldListenTo = source => source.Name == "DurableTask.Core",
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
ActivityStarted = a => { lock (spans) { spans.Add(a); } },
ActivityStopped = _ => { }
};
ActivitySource.AddActivityListener(listener);

var taskHubClient = _host.Services.GetRequiredService<TaskHubClient>();

string rootTraceId;
OrchestrationInstance instance;
using (var root = new Activity("test-root").Start())
{
rootTraceId = root.TraceId.ToHexString();
instance = await taskHubClient.CreateOrchestrationInstanceAsync(
SingleActivityOrchestration.Name,
SingleActivityOrchestration.Version,
41);
}

// (a) Execution must complete quickly with tracing on.
var state = await taskHubClient.WaitForOrchestrationAsync(instance, FastWaitTimeout);
state.Should().NotBeNull("orchestration must complete when distributed tracing is enabled");
state.OrchestrationStatus.Should().Be(OrchestrationStatus.Completed);
state.Output.Should().Be("42");

// Give any late spans a moment to flush.
await Task.Delay(200);

List<Activity> captured;
lock (spans) { captured = spans.ToList(); }

var names = captured.Select(s => $"{s.DisplayName} trace={s.TraceId.ToHexString()}").ToList();
_output.WriteLine($"root trace id: {rootTraceId}");
_output.WriteLine("captured spans:\n " + string.Join("\n ", names));

var create = captured.FirstOrDefault(s => s.DisplayName.StartsWith("create_orchestration"));
var orch = captured.FirstOrDefault(s => s.DisplayName.StartsWith("orchestration"));
var activity = captured.FirstOrDefault(s => s.DisplayName.StartsWith("activity"));

// (b) All three spans exist and share the caller's root trace id.
create.Should().NotBeNull("client should emit create_orchestration span");
create.TraceId.ToHexString().Should().Be(rootTraceId);

orch.Should().NotBeNull("worker should emit orchestration span");
orch.TraceId.ToHexString().Should().Be(rootTraceId, "orchestration span must connect to caller root");

activity.Should().NotBeNull("worker should emit activity span");
activity.TraceId.ToHexString().Should().Be(rootTraceId, "activity span must connect to caller root");
}
}

[CollectionDefinition("DistributedTracing", DisableParallelization = true)]
public class DistributedTracingCollection { }
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System;
using Microsoft.Extensions.DependencyInjection;
using Xunit.Abstractions;

namespace LLL.DurableTask.Tests.Storages;

public class InMemoryDistributedTracingTests : DistributedTracingTestBase
{
private readonly string _databaseId;

public InMemoryDistributedTracingTests(ITestOutputHelper output) : base(output)
{
_databaseId = Guid.NewGuid().ToString();
}

protected override void ConfigureStorage(IServiceCollection services)
{
services.AddDurableTaskEFCoreStorage()
.UseInMemoryDatabase(_databaseId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#if !NET10_0
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Xunit;
using Xunit.Abstractions;

namespace LLL.DurableTask.Tests.Storages;

public class MySqlDistributedTracingTests : DistributedTracingTestBase
{
public MySqlDistributedTracingTests(ITestOutputHelper output) : base(output)
{
}

protected override void ConfigureStorage(IServiceCollection services)
{
var connectionString = Configuration.GetConnectionString("MySql");

Skip.If(string.IsNullOrWhiteSpace(connectionString), "MySql connection string not configured");

services.AddDurableTaskEFCoreStorage()
.UseMySql(connectionString, MySqlServerVersion.AutoDetect(connectionString));
}
}
#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System.Threading.Tasks;
using DurableTask.Core;
using LLL.DurableTask.Tests.Storage.Activities;

namespace LLL.DurableTask.Tests.Storage.Orchestrations;

// Minimal orchestration that schedules exactly one activity. Used by the
// distributed-tracing tests to assert client -> orchestration -> activity spans.
public class SingleActivityOrchestration : TaskOrchestration<int, int>
{
public const string Name = "SingleActivity";
public const string Version = "v1";

public override async Task<int> RunTask(OrchestrationContext context, int input)
{
return await context.ScheduleTask<int>(SumActivity.Name, SumActivity.Version, new SumActivity.Input
{
LeftValue = input,
RightValue = 1
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Xunit;
using Xunit.Abstractions;

namespace LLL.DurableTask.Tests.Storages;

public class PostgresDistributedTracingTests : DistributedTracingTestBase
{
public PostgresDistributedTracingTests(ITestOutputHelper output) : base(output)
{
}

protected override void ConfigureStorage(IServiceCollection services)
{
var connectionString = Configuration.GetConnectionString("Postgres");

Skip.If(string.IsNullOrWhiteSpace(connectionString), "Postgres connection string not configured");

services.AddDurableTaskEFCoreStorage()
.UseNpgsql(connectionString);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Xunit;
using Xunit.Abstractions;

namespace LLL.DurableTask.Tests.Storages;

public class SqlServerDistributedTracingTests : DistributedTracingTestBase
{
public SqlServerDistributedTracingTests(ITestOutputHelper output) : base(output)
{
}

protected override void ConfigureStorage(IServiceCollection services)
{
var connectionString = Configuration.GetConnectionString("SqlServer");

Skip.If(string.IsNullOrWhiteSpace(connectionString), "SqlServer connection string not configured");

services.AddDurableTaskEFCoreStorage()
.UseSqlServer(connectionString);
}
}