Skip to content

Commit

Permalink
Add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
g7ed6e committed Nov 20, 2023
1 parent 2f1354b commit 3b05a47
Show file tree
Hide file tree
Showing 11 changed files with 479 additions and 25 deletions.
14 changes: 14 additions & 0 deletions Aspire.sln
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Consumer", "samples\KafkaBa
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Producer", "samples\KafkaBasic\Producer\Producer.csproj", "{8463BB20-C998-4318-8265-4D9601DA7D1E}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Kafka.Producer.Tests", "tests\Aspire.Kafka.Producer.Tests\Aspire.Kafka.Producer.Tests.csproj", "{7F0BAF43-46D7-42B2-B9E8-3716167FDC78}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Kafka.Consumer.Tests", "tests\Aspire.Kafka.Consumer.Tests\Aspire.Kafka.Consumer.Tests.csproj", "{1452CE38-284F-4845-814F-5BC828F2EE25}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -456,6 +460,14 @@ Global
{8463BB20-C998-4318-8265-4D9601DA7D1E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8463BB20-C998-4318-8265-4D9601DA7D1E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8463BB20-C998-4318-8265-4D9601DA7D1E}.Release|Any CPU.Build.0 = Release|Any CPU
{7F0BAF43-46D7-42B2-B9E8-3716167FDC78}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7F0BAF43-46D7-42B2-B9E8-3716167FDC78}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7F0BAF43-46D7-42B2-B9E8-3716167FDC78}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7F0BAF43-46D7-42B2-B9E8-3716167FDC78}.Release|Any CPU.Build.0 = Release|Any CPU
{1452CE38-284F-4845-814F-5BC828F2EE25}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1452CE38-284F-4845-814F-5BC828F2EE25}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1452CE38-284F-4845-814F-5BC828F2EE25}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1452CE38-284F-4845-814F-5BC828F2EE25}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -536,6 +548,8 @@ Global
{A6DAFDA3-4AD5-4F06-8582-B9D0928DD933} = {C6A650C8-256E-49DF-B7B7-C001255A3688}
{AF581BA0-60F0-4DC0-956A-3C211DF3BC3C} = {C6A650C8-256E-49DF-B7B7-C001255A3688}
{8463BB20-C998-4318-8265-4D9601DA7D1E} = {C6A650C8-256E-49DF-B7B7-C001255A3688}
{7F0BAF43-46D7-42B2-B9E8-3716167FDC78} = {4981B3A5-4AFD-4191-BF7D-8692D9783D60}
{1452CE38-284F-4845-814F-5BC828F2EE25} = {4981B3A5-4AFD-4191-BF7D-8692D9783D60}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {6DCEDFEC-988E-4CB3-B45B-191EB5086E0C}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private static void AddKafkaConsumer<TKey, TValue>(
Action<ConsumerBuilder<TKey, TValue>>? configureConsumerBuilder,
Action<ConsumerConfig>? configureConsumerConfig,
string connectionName,
object? serviceKey)
string? serviceKey)
{
ArgumentNullException.ThrowIfNull(builder);

Expand All @@ -62,26 +62,39 @@ private static void AddKafkaConsumer<TKey, TValue>(

configureConsumerConfig?.Invoke(config);

ConsumerBuilder<TKey, TValue> CreateConsumerBuilder(IServiceProvider sp)
{
// Create and configure the consumer builder
var consumerBuilder = new ConsumerBuilder<TKey, TValue>(config);
configureConsumerBuilder?.Invoke(consumerBuilder);

return consumerBuilder;
}

if (serviceKey is null)
{
builder.Services.AddSingleton<ConsumerBuilder<TKey, TValue>>(CreateConsumerBuilder);
builder.Services.AddSingleton<ConsumerConfig>(config);
if (configureConsumerBuilder is not null)
{
builder.Services.AddSingleton<Action<ConsumerBuilder<TKey, TValue>>>(configureConsumerBuilder);
}
builder.Services.AddSingleton<ConsumerBuilder<TKey, TValue>>(sp => CreateConsumerBuilder<TKey, TValue>(sp, null));
builder.Services.AddSingleton<IConsumer<TKey, TValue>>(sp => CreateConsumer(sp.GetRequiredService<ConsumerBuilder<TKey, TValue>>()));
}
else
{
builder.Services.AddKeyedSingleton<ConsumerBuilder<TKey, TValue>>(serviceKey, (sp, _) => CreateConsumerBuilder(sp));
builder.Services.AddKeyedSingleton<ConsumerConfig>(serviceKey, config);
if (configureConsumerBuilder is not null)
{
builder.Services.AddKeyedSingleton<Action<ConsumerBuilder<TKey, TValue>>>(serviceKey, configureConsumerBuilder);
}
builder.Services.AddKeyedSingleton<ConsumerBuilder<TKey, TValue>>(serviceKey, (sp, key) => CreateConsumerBuilder<TKey, TValue>(sp, key as string));
builder.Services.AddKeyedSingleton<IConsumer<TKey, TValue>>(serviceKey, (sp, key) => CreateConsumer(sp.GetRequiredKeyedService<ConsumerBuilder<TKey, TValue>>(key)));
}
}

private static ConsumerBuilder<TKey, TValue> CreateConsumerBuilder<TKey, TValue>(IServiceProvider sp, string? key)
{
// Create and configure the consumer builder
(ConsumerConfig config, Action<ConsumerBuilder<TKey, TValue>>? configureConsumerBuilder) = key is null
? (sp.GetRequiredService<ConsumerConfig>(), sp.GetService<Action<ConsumerBuilder<TKey, TValue>>>())
: (sp.GetRequiredKeyedService<ConsumerConfig>(key), sp.GetKeyedService<Action<ConsumerBuilder<TKey, TValue>>>(key));
var consumerBuilder = new ConsumerBuilder<TKey, TValue>(config);
configureConsumerBuilder?.Invoke(consumerBuilder);

return consumerBuilder;
}

private static IConsumer<TKey, TValue> CreateConsumer<TKey, TValue>(ConsumerBuilder<TKey, TValue> consumerBuilder) => consumerBuilder.Build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Microsoft.Extensions.Hosting;
/// <summary>
/// Extension methods for connecting to a Kafka broker.
/// </summary>
public static partial class AspireKafkaProducerExtensions
public static class AspireKafkaProducerExtensions
{
private const string DefaultConfigSectionName = "Aspire:Kafka:Producer";

Expand Down Expand Up @@ -46,7 +46,7 @@ private static void AddKafkaProducer<TKey, TValue>(
Action<ProducerBuilder<TKey, TValue>>? configureProducerBuilder,
Action<ProducerConfig>? configureProducerConfig,
string connectionName,
object? serviceKey)
string? serviceKey)
{
ArgumentNullException.ThrowIfNull(builder);

Expand All @@ -62,26 +62,39 @@ private static void AddKafkaProducer<TKey, TValue>(

configureProducerConfig?.Invoke(config);

ProducerBuilder<TKey, TValue> CreateProducerBuilder(IServiceProvider sp)
{
// Create and configure the producer builder
var producerBuilder = new ProducerBuilder<TKey, TValue>(config);
configureProducerBuilder?.Invoke(producerBuilder);

return producerBuilder;
}

if (serviceKey is null)
{
builder.Services.AddSingleton<ProducerBuilder<TKey, TValue>>(CreateProducerBuilder);
builder.Services.AddSingleton<ProducerConfig>(config);
if (configureProducerBuilder is not null)
{
builder.Services.AddSingleton<Action<ProducerBuilder<TKey, TValue>>>(configureProducerBuilder);
}
builder.Services.AddSingleton<ProducerBuilder<TKey, TValue>>(sp => CreateProducerBuilder<TKey, TValue>(sp, null));
builder.Services.AddSingleton<IProducer<TKey, TValue>>(sp => CreateProducer(sp.GetRequiredService<ProducerBuilder<TKey, TValue>>()));
}
else
{
builder.Services.AddKeyedSingleton<ProducerBuilder<TKey, TValue>>(serviceKey, (sp, _) => CreateProducerBuilder(sp));
builder.Services.AddKeyedSingleton<ProducerConfig>(serviceKey, config);
if (configureProducerBuilder is not null)
{
builder.Services.AddKeyedSingleton<Action<ProducerBuilder<TKey, TValue>>>(serviceKey, configureProducerBuilder);
}
builder.Services.AddKeyedSingleton<ProducerBuilder<TKey, TValue>>(serviceKey, (sp, key) => CreateProducerBuilder<TKey, TValue>(sp, key as string));
builder.Services.AddKeyedSingleton<IProducer<TKey, TValue>>(serviceKey, (sp, key) => CreateProducer(sp.GetRequiredKeyedService<ProducerBuilder<TKey, TValue>>(key)));
}
}

private static ProducerBuilder<TKey, TValue> CreateProducerBuilder<TKey, TValue>(IServiceProvider sp, string? key)
{
// Create and configure the producer builder
(ProducerConfig config, Action<ProducerBuilder<TKey, TValue>>? configureProducerBuilder) = key is null
? (sp.GetRequiredService<ProducerConfig>(), sp.GetService<Action<ProducerBuilder<TKey, TValue>>>())
: (sp.GetRequiredKeyedService<ProducerConfig>(key), sp.GetKeyedService<Action<ProducerBuilder<TKey, TValue>>>(key));
var producerBuilder = new ProducerBuilder<TKey, TValue>(config);
configureProducerBuilder?.Invoke(producerBuilder);

return producerBuilder;
}

private static IProducer<TKey, TValue> CreateProducer<TKey, TValue>(ProducerBuilder<TKey, TValue> producerBuilder) => producerBuilder.Build();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>$(NetCurrent)</TargetFramework>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Components\Aspire.Kafka.Consumer\Aspire.Kafka.Consumer.csproj" />
<ProjectReference Include="..\Aspire.Components.Common.Tests\Aspire.Components.Common.Tests.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Text;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Xunit;

namespace Aspire.Kafka.Consumer.Tests;

public class AspireKafkaConsumerExtensionsTests
{
[ConditionalTheory]
[InlineData(true)]
[InlineData(false)]
public void ReadsFromConnectionStringsCorrectly(bool useKeyed)
{
var builder = Host.CreateEmptyApplicationBuilder(null);
builder.Configuration.AddInMemoryCollection([
new KeyValuePair<string, string?>("ConnectionStrings:messaging", AspireKafkaConsumerHelpers.TestingEndpoint)
]);

if (useKeyed)
{
builder.AddKeyedKafkaConsumer<string, string>("messaging");
}
else
{
builder.AddKafkaConsumer<string, string>("messaging");
}

var host = builder.Build();
var ConsumerConfig = useKeyed ?
host.Services.GetRequiredKeyedService<ConsumerConfig>("messaging") :
host.Services.GetRequiredService<ConsumerConfig>();

Assert.Equal(AspireKafkaConsumerHelpers.TestingEndpoint, ConsumerConfig.BootstrapServers);
}

[ConditionalTheory]
[InlineData(true)]
[InlineData(false)]
public void ConnectionStringCanBeSetInCode(bool useKeyed)
{
var builder = Host.CreateEmptyApplicationBuilder(null);
builder.Configuration.AddInMemoryCollection([
new KeyValuePair<string, string?>("ConnectionStrings:messaging", "unused")
]);

static void SetConnectionString(ConsumerConfig config) => config.BootstrapServers = AspireKafkaConsumerHelpers.TestingEndpoint;
if (useKeyed)
{
builder.AddKeyedKafkaConsumer<string, string>("messaging", configureConsumerConfig: SetConnectionString);
}
else
{
builder.AddKafkaConsumer<string, string>("messaging", configureConsumerConfig: SetConnectionString);
}

var host = builder.Build();
var config = useKeyed ?
host.Services.GetRequiredKeyedService<ConsumerConfig>("messaging") :
host.Services.GetRequiredService<ConsumerConfig>();

Assert.Equal(AspireKafkaConsumerHelpers.TestingEndpoint, config.BootstrapServers);
}

[ConditionalTheory]
[InlineData(true)]
[InlineData(false)]
public void ConnectionNameWinsOverConfigSection(bool useKeyed)
{
var builder = Host.CreateEmptyApplicationBuilder(null);

var key = useKeyed ? "redis" : null;
builder.Configuration.AddInMemoryCollection([
new KeyValuePair<string, string?>(ConformanceTests.CreateConfigKey("Aspire:Kafka:Consumer", key, "ConnectionString"), "unused"),
new KeyValuePair<string, string?>("ConnectionStrings:messaging", AspireKafkaConsumerHelpers.TestingEndpoint)
]);

if (useKeyed)
{
builder.AddKeyedKafkaConsumer<string, string>("messaging");
}
else
{
builder.AddKafkaConsumer<string, string>("messaging");
}

var host = builder.Build();
var config = useKeyed ?
host.Services.GetRequiredKeyedService<ConsumerConfig>("messaging") :
host.Services.GetRequiredService<ConsumerConfig>();

Assert.Equal(AspireKafkaConsumerHelpers.TestingEndpoint, config.BootstrapServers);
}

[Fact]
public void ConsumerConfigOptionsFromConfig()
{
static Stream CreateStreamFromString(string data) => new MemoryStream(Encoding.UTF8.GetBytes(data));

using var jsonStream = CreateStreamFromString("""
{
"Aspire": {
"Kafka": {
"Consumer": {
"AutoOffsetReset": "Earliest",
"SaslUsername": "user",
"SaslPassword": "password",
"SaslMechanism": "Plain",
"SecurityProtocol": "Plaintext"
}
}
}
}
""");

var builder = Host.CreateEmptyApplicationBuilder(null);

builder.Configuration.AddJsonStream(jsonStream);

builder.AddKafkaConsumer<string, string>("messaging");

var host = builder.Build();
var config = (ConsumerConfig)host.Services.GetRequiredService<ConsumerConfig>();

Assert.Equal(AutoOffsetReset.Earliest, config.AutoOffsetReset);
Assert.Equal("user", config.SaslUsername);
Assert.Equal("password", config.SaslPassword);
Assert.Equal(SaslMechanism.Plain, config.SaslMechanism);
Assert.Equal(SecurityProtocol.Plaintext, config.SecurityProtocol);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

namespace Aspire.Kafka.Consumer.Tests;

internal sealed class AspireKafkaConsumerHelpers
{
public const string TestingEndpoint = "localhost:9092";
}
50 changes: 50 additions & 0 deletions tests/Aspire.Kafka.Consumer.Tests/ConformanceTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Aspire.Components.ConformanceTests;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace Aspire.Kafka.Consumer.Tests;
internal sealed class ConformanceTests : ConformanceTests<ConsumerConfig, ConsumerConfig>
{
protected override ServiceLifetime ServiceLifetime => throw new NotImplementedException();

protected override string ActivitySourceName => throw new NotImplementedException();

protected override string JsonSchemaPath => throw new NotImplementedException();

protected override string[] RequiredLogCategories => throw new NotImplementedException();

protected override void PopulateConfiguration(ConfigurationManager configuration, string? key = null)
{
throw new NotImplementedException();
}

protected override void RegisterComponent(HostApplicationBuilder builder, Action<ConsumerConfig>? configure = null, string? key = null)
{
throw new NotImplementedException();
}

protected override void SetHealthCheck(ConsumerConfig options, bool enabled)
{
throw new NotImplementedException();
}

protected override void SetMetrics(ConsumerConfig options, bool enabled)
{
throw new NotImplementedException();
}

protected override void SetTracing(ConsumerConfig options, bool enabled)
{
throw new NotImplementedException();
}

protected override void TriggerActivity(ConsumerConfig service)
{
throw new NotImplementedException();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>$(NetCurrent)</TargetFramework>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Components\Aspire.Kafka.Producer\Aspire.Kafka.Producer.csproj" />
<ProjectReference Include="..\Aspire.Components.Common.Tests\Aspire.Components.Common.Tests.csproj" />
</ItemGroup>

</Project>
Loading

0 comments on commit 3b05a47

Please sign in to comment.