Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ obj/
*.received.*
nugets/
/TestResults
.claude/settings.local.json
40 changes: 23 additions & 17 deletions src/SqlServer.Deduplication.SampleEndpoint/Program.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
using Microsoft.Data.SqlClient;
using NServiceBus.Logging;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using NServiceBus.Transport.SqlServerDeduplication;
using SampleNamespace;

class Program
{
const string connection = @"Server=.\SQLExpress;Database=DedupeSample; Integrated Security=True;Max Pool Size=100;TrustServerCertificate=True";
static async Task Main()
static async Task Main(string[] args)
{
var defaultFactory = LogManager.Use<DefaultFactory>();
defaultFactory.Level(LogLevel.Info);
Console.Title = "SampleEndpoint Press Ctrl-C to Exit.";

var configuration = new EndpointConfiguration("SampleEndpoint");
configuration.EnableInstallers();
Expand All @@ -20,13 +21,18 @@ static async Task Main()
var transport = configuration.UseTransport<SqlServerTransport>();
transport.ConnectionString(connection);
transport.Transactions(TransportTransactionMode.SendsAtomicWithReceive);
configuration.EnableInstallers();
Console.Title = "SampleEndpoint Press Ctrl-C to Exit.";
Console.TreatControlCAsInput = true;
var endpoint = await Endpoint.Start(configuration);
await SendMessages(endpoint);

var builder = Host.CreateApplicationBuilder(args);
builder.Logging.SetMinimumLevel(LogLevel.Information);
builder.Services.AddNServiceBusEndpoint(configuration);
var host = builder.Build();
await host.StartAsync();

var session = host.Services.GetRequiredService<IMessageSession>();
await SendMessages(session);

Console.ReadKey(true);
await endpoint.Stop();
await host.StopAsync();
}

static async Task<SqlConnection> ConnectionBuilder(Cancel cancel)
Expand All @@ -44,20 +50,20 @@ static async Task<SqlConnection> ConnectionBuilder(Cancel cancel)
}
}

static async Task SendMessages(IEndpointInstance endpoint)
static async Task SendMessages(IMessageSession session)
{
var guid = Guid.NewGuid();
var dedupeOutcome1 = await SendMessage(endpoint, guid);
var dedupeOutcome1 = await SendMessage(session, guid);
Console.WriteLine($"DedupeOutcome:{dedupeOutcome1.DedupeOutcome}. Context:{dedupeOutcome1.Context}");
var dedupeOutcome2 = await SendMessage(endpoint, guid);
var dedupeOutcome2 = await SendMessage(session, guid);
Console.WriteLine($"DedupeOutcome:{dedupeOutcome2.DedupeOutcome}. Context:{dedupeOutcome2.Context}");
}

static Task<DedupeResult> SendMessage(IEndpointInstance endpoint, Guid guid)
static Task<DedupeResult> SendMessage(IMessageSession session, Guid guid)
{
var message = new SampleMessage();
var options = new SendOptions();
options.RouteToThisEndpoint();
return endpoint.SendWithDedupe(guid, message, options);
return session.SendWithDedupe(guid, message, options);
}
}
}
40 changes: 19 additions & 21 deletions src/SqlServer.HttpPassthrough.SampleEndpoint/Program.cs
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
using NServiceBus.Attachments.Sql;
using Microsoft.Extensions.Hosting;
using NServiceBus.Attachments.Sql;

class Program
{
static async Task Main()
{
var configuration = new EndpointConfiguration("SampleEndpoint");
configuration.UsePersistence<LearningPersistence>();
var attachments = configuration.EnableAttachments(Connection.ConnectionString, TimeToKeep.Default);
attachments.UseTransportConnectivity();
configuration.UseSerialization<NewtonsoftJsonSerializer>();
configuration.PurgeOnStartup(true);
var transport = configuration.UseTransport<SqlServerTransport>();
transport.ConnectionString(Connection.ConnectionString);
configuration.EnableInstallers();
Console.Title = "SampleEndpoint Press Ctrl-C to Exit.";
Console.TreatControlCAsInput = true;
var endpoint = await Endpoint.Start(configuration);
Console.ReadKey(true);
await endpoint.Stop();
}
}
Console.Title = "SampleEndpoint Press Ctrl-C to Exit.";

var builder = Host.CreateApplicationBuilder(args);

var configuration = new EndpointConfiguration("SampleEndpoint");
configuration.UsePersistence<LearningPersistence>();
var attachments = configuration.EnableAttachments(Connection.ConnectionString, TimeToKeep.Default);
attachments.UseTransportConnectivity();
configuration.UseSerialization<NewtonsoftJsonSerializer>();
configuration.PurgeOnStartup(true);
var transport = configuration.UseTransport<SqlServerTransport>();
transport.ConnectionString(Connection.ConnectionString);
configuration.EnableInstallers();

builder.Services.AddNServiceBusEndpoint(configuration);

await builder.Build().RunAsync();
34 changes: 20 additions & 14 deletions src/SqlServer.Native.SubscriptionSampleEndpoint/Program.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
using Microsoft.Data.SqlClient;
using NServiceBus.Logging;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using NServiceBus.Transport.SqlServerNative;
using SampleNamespace;

class Program
{
const string connection = @"Server=.\SQLExpress;Database=SubscriptionSample; Integrated Security=True;Max Pool Size=100;TrustServerCertificate=True";
static async Task Main()
static async Task Main(string[] args)
{
await CreateTables();
var defaultFactory = LogManager.Use<DefaultFactory>();
defaultFactory.Level(LogLevel.Info);
Console.Title = "SampleEndpoint Press Ctrl-C to Exit.";

var configuration = new EndpointConfiguration("SampleEndpoint");
configuration.UsePersistence<LearningPersistence>();
Expand All @@ -19,14 +20,19 @@ static async Task Main()
var transport = configuration.UseTransport<SqlServerTransport>();
transport.ConnectionString(connection);
transport.Transactions(TransportTransactionMode.SendsAtomicWithReceive);

configuration.EnableInstallers();
Console.Title = "SampleEndpoint Press Ctrl-C to Exit.";
Console.TreatControlCAsInput = true;
var endpoint = await Endpoint.Start(configuration);
await Publish(endpoint);

var builder = Host.CreateApplicationBuilder(args);
builder.Logging.SetMinimumLevel(LogLevel.Information);
builder.Services.AddNServiceBusEndpoint(configuration);
var host = builder.Build();
await host.StartAsync();

var session = host.Services.GetRequiredService<IMessageSession>();
await Publish(session);

Console.ReadKey(true);
await endpoint.Stop();
await host.StopAsync();
}

static async Task CreateTables()
Expand Down Expand Up @@ -55,9 +61,9 @@ static async Task<SqlConnection> ConnectionBuilder()
}
}

static Task Publish(IEndpointInstance endpoint)
static Task Publish(IMessageSession session)
{
var message = new SampleMessage();
return endpoint.Publish(message);
return session.Publish(message);
}
}
}
23 changes: 14 additions & 9 deletions src/SqlServer.Native.Tests/DedupeIntegrationTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using NServiceBus.Attachments.Sql;
using NServiceBus.Attachments.Sql;
using DedupeOutcome = NServiceBus.Transport.SqlServerDeduplication.DedupeOutcome;
using DedupeResult = NServiceBus.Transport.SqlServerDeduplication.DedupeResult;

Expand All @@ -10,35 +10,35 @@ public class DedupeIntegrationTests :
[Test]
public async Task Integration()
{
var endpoint = await StartEndpoint();
var (host, session) = await StartEndpoint();
var messageId = Guid.NewGuid();
var result = await SendMessage(messageId, endpoint, "context1");
var result = await SendMessage(messageId, session, "context1");
await Assert.That(result.Context).IsEqualTo("context1");
await Assert.That(result.DedupeOutcome).IsEqualTo(DedupeOutcome.Sent);
result = await SendMessage(messageId, endpoint, "context2");
result = await SendMessage(messageId, session, "context2");
await Assert.That(result.Context).IsEqualTo("context1");
await Assert.That(result.DedupeOutcome).IsEqualTo(DedupeOutcome.Deduplicated);
if (!countdown.Wait(TimeSpan.FromSeconds(20)))
{
throw new("Expected dedup");
}

await endpoint.Stop();
await host.StopAsync();
}

static async Task<DedupeResult> SendMessage(Guid messageId, IEndpointInstance endpoint, string context)
static async Task<DedupeResult> SendMessage(Guid messageId, IMessageSession session, string context)
{
var sendOptions = new SendOptions();
sendOptions.RouteToThisEndpoint();
var sendWithDedupe = await endpoint.SendWithDedupe(messageId, new MyMessage(), sendOptions,context);
var sendWithDedupe = await session.SendWithDedupe(messageId, new MyMessage(), sendOptions, context);
if (sendWithDedupe.DedupeOutcome == DedupeOutcome.Deduplicated)
{
countdown.Signal();
}
return sendWithDedupe;
}

static Task<IEndpointInstance> StartEndpoint()
static async Task<(IHost, IMessageSession)> StartEndpoint()
{
var configuration = new EndpointConfiguration(nameof(DedupeIntegrationTests));
configuration.UsePersistence<LearningPersistence>();
Expand All @@ -53,7 +53,12 @@ static Task<IEndpointInstance> StartEndpoint()
var transport = configuration.UseTransport<SqlServerTransport>();
transport.ConnectionString(Connection.ConnectionString);
transport.NativeDelayedDelivery();
return Endpoint.Start(configuration);

var builder = Host.CreateApplicationBuilder();
builder.Services.AddNServiceBusEndpoint(configuration);
var host = builder.Build();
await host.StartAsync();
return (host, host.Services.GetRequiredService<IMessageSession>());
}

class Handler :
Expand Down
1 change: 1 addition & 0 deletions src/SqlServer.Native.Tests/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
global using System.Security.Claims;
global using Microsoft.Data.SqlClient;
global using Microsoft.Extensions.DependencyInjection;
global using Microsoft.Extensions.Hosting;
global using NServiceBus.SqlServer.HttpPassthrough;
global using NServiceBus.Transport.SqlServerNative;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.TestHost;
using HttpContext = Microsoft.AspNetCore.Http.HttpContext;
Expand All @@ -18,7 +18,7 @@ public async Task Integration()
await manager.Create();
}

var endpoint = await StartEndpoint();
var host = await StartEndpoint();

var hostBuilder = new WebHostBuilder()
.ConfigureServices(services =>
Expand Down Expand Up @@ -58,7 +58,7 @@ public async Task Integration()

Thread.Sleep(3000);

await endpoint.Stop();
await host.StopAsync();
await Assert.That(count).IsEqualTo(1);
}

Expand All @@ -74,10 +74,14 @@ static async Task<int> SendAsync(ClientFormSender clientFormSender, Guid guid)
return send.httpStatus;
}

static async Task<IEndpointInstance> StartEndpoint()
static async Task<IHost> StartEndpoint()
{
var configuration = await EndpointCreator.Create(nameof(HttpPassthroughDedupTests));
return await Endpoint.Start(configuration);
var builder = Host.CreateApplicationBuilder();
builder.Services.AddNServiceBusEndpoint(configuration);
var host = builder.Build();
await host.StartAsync();
return host;
}

static Task<Table> AmendMessage(HttpContext context, PassthroughMessage message) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#if DEBUG
#if DEBUG

using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
Expand All @@ -23,7 +23,7 @@ public async Task Integration()
}

var resetEvent = new ManualResetEvent(false);
var endpoint = await StartEndpoint(resetEvent);
var host = await StartEndpoint(resetEvent);

await SubmitMultipartForm();

Expand All @@ -32,7 +32,7 @@ public async Task Integration()
throw new("OutgoingMessage not received");
}

await endpoint.Stop();
await host.StopAsync();
}

static async Task SubmitMultipartForm()
Expand Down Expand Up @@ -79,13 +79,17 @@ await clientFormSender.Send(
});
}

static async Task<IEndpointInstance> StartEndpoint(ManualResetEvent resetEvent)
static async Task<IHost> StartEndpoint(ManualResetEvent resetEvent)
{
var configuration = await EndpointCreator.Create(nameof(HttpPassthroughIntegrationTests));
var attachments = configuration.EnableAttachments(Connection.ConnectionString, TimeToKeep.Default);
configuration.RegisterComponents(_ => _.AddSingleton(resetEvent));
attachments.UseTransportConnectivity();
return await Endpoint.Start(configuration);
var builder = Host.CreateApplicationBuilder();
builder.Services.AddSingleton(resetEvent);
builder.Services.AddNServiceBusEndpoint(configuration);
var host = builder.Build();
await host.StartAsync();
return host;
}

static Task<Table> AmendMessage(HttpContext context, PassthroughMessage message) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
public class ConsumerIntegrationTests :
public class ConsumerIntegrationTests :
TestBase
{
static string table = "IntegrationConsumer_Consumer";
Expand All @@ -11,18 +11,23 @@ public async Task Run()
await manager.Create();
var configuration = await EndpointCreator.Create("IntegrationConsumer");
configuration.SendOnly();
var endpoint = await Endpoint.Start(configuration);
await SendStartMessage(endpoint);
var builder = Host.CreateApplicationBuilder();
builder.Services.AddNServiceBusEndpoint(configuration);
var host = builder.Build();
await host.StartAsync();
var session = host.Services.GetRequiredService<IMessageSession>();
await SendStartMessage(session);
var consumer = new QueueManager(table, SqlConnection);
await using var message = await consumer.Consume();
await Assert.That(message).IsNotNull();
await host.StopAsync();
}

static Task SendStartMessage(IEndpointInstance endpoint)
static Task SendStartMessage(IMessageSession session)
{
var sendOptions = new SendOptions();
sendOptions.SetDestination(table);
return endpoint.Send(new SendMessage(), sendOptions);
return session.Send(new SendMessage(), sendOptions);
}

class SendMessage :
Expand Down
Loading
Loading