Skip to content

Commit

Permalink
Add Aspire.Confluent.Kafka component
Browse files Browse the repository at this point in the history
apply pr suggestions

apply pr suggestions

apply pr suggestions

Sort ConfigurationSchema.json properties

Update ConfigurationSchema.json using ConfigSchemaGenerator

apply pr suggestions

apply pr suggestions

apply pr suggesstions

apply pr suggesstions

apply pr suggestions

apply pr suggestions

drop kafka sample from this repo

apply pr suggestions
  • Loading branch information
g7ed6e committed Jan 11, 2024
1 parent 01e9d2e commit c540de4
Show file tree
Hide file tree
Showing 36 changed files with 3,170 additions and 3 deletions.
14 changes: 14 additions & 0 deletions Aspire.sln
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OrleansServer", "samples\or
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ServiceDefaults", "samples\orleans\ServiceDefaults\ServiceDefaults.csproj", "{F7D9FA54-1F64-4A36-961A-0087F8E88D07}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Confluent.Kafka", "src\Components\Aspire.Confluent.Kafka\Aspire.Confluent.Kafka.csproj", "{174E0507-3BB0-4CDC-829E-9CA75DA66473}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Confluent.Kafka.Tests", "tests\Aspire.Confluent.Kafka.Tests\Aspire.Confluent.Kafka.Tests.csproj", "{A8CB331A-1247-41D9-8118-538E5A2CC9DF}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -496,6 +500,14 @@ Global
{F7D9FA54-1F64-4A36-961A-0087F8E88D07}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F7D9FA54-1F64-4A36-961A-0087F8E88D07}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F7D9FA54-1F64-4A36-961A-0087F8E88D07}.Release|Any CPU.Build.0 = Release|Any CPU
{174E0507-3BB0-4CDC-829E-9CA75DA66473}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{174E0507-3BB0-4CDC-829E-9CA75DA66473}.Debug|Any CPU.Build.0 = Debug|Any CPU
{174E0507-3BB0-4CDC-829E-9CA75DA66473}.Release|Any CPU.ActiveCfg = Release|Any CPU
{174E0507-3BB0-4CDC-829E-9CA75DA66473}.Release|Any CPU.Build.0 = Release|Any CPU
{A8CB331A-1247-41D9-8118-538E5A2CC9DF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A8CB331A-1247-41D9-8118-538E5A2CC9DF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A8CB331A-1247-41D9-8118-538E5A2CC9DF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A8CB331A-1247-41D9-8118-538E5A2CC9DF}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -582,6 +594,8 @@ Global
{04B03D1C-45C5-44D4-AEE5-BC315F3D9D26} = {8BAF2119-8370-4E9E-A887-D92506F8C727}
{20758E81-7316-49AC-8E1B-A5461397530A} = {8BAF2119-8370-4E9E-A887-D92506F8C727}
{F7D9FA54-1F64-4A36-961A-0087F8E88D07} = {8BAF2119-8370-4E9E-A887-D92506F8C727}
{174E0507-3BB0-4CDC-829E-9CA75DA66473} = {27381127-6C45-4B4C-8F18-41FF48DFE4B2}
{A8CB331A-1247-41D9-8118-538E5A2CC9DF} = {4981B3A5-4AFD-4191-BF7D-8692D9783D60}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {6DCEDFEC-988E-4CB3-B45B-191EB5086E0C}
Expand Down
4 changes: 4 additions & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
<PackageVersion Include="AspNetCore.HealthChecks.Azure.Storage.Blobs" Version="8.0.0" />
<PackageVersion Include="AspNetCore.HealthChecks.Azure.Storage.Queues" Version="8.0.0" />
<PackageVersion Include="AspNetCore.HealthChecks.AzureServiceBus" Version="8.0.0" />
<PackageVersion Include="AspNetCore.HealthChecks.Kafka" Version="8.0.0" />
<PackageVersion Include="AspNetCore.HealthChecks.MongoDb" Version="8.0.0" />
<PackageVersion Include="AspNetCore.HealthChecks.MySql" Version="8.0.0" />
<PackageVersion Include="AspNetCore.HealthChecks.NpgSql" Version="8.0.0" />
Expand Down Expand Up @@ -64,6 +65,7 @@
<PackageVersion Include="Microsoft.Extensions.Primitives" Version="$(MicrosoftExtensionsPrimitivesPackageVersion)" />
<PackageVersion Include="Microsoft.Extensions.Http.Resilience" Version="$(MicrosoftExtensionsHttpResiliencePackageVersion)" />
<!-- external dependencies -->
<PackageVersion Include="Confluent.Kafka" Version="2.3.0" />
<PackageVersion Include="Dapr.AspNetCore" Version="1.12.0" />
<PackageVersion Include="DnsClient" Version="1.7.0" />
<PackageVersion Include="Grpc.AspNetCore" Version="2.59.0" />
Expand Down Expand Up @@ -111,5 +113,7 @@
<PackageVersion Include="Microsoft.DotNet.Build.Tasks.Workloads" Version="8.0.0-beta.23564.4" />
<PackageVersion Include="Microsoft.Signed.Wix" Version="1.0.0-v3.14.0.5722" />
<PackageVersion Include="Microsoft.DotNet.Build.Tasks.Installers" Version="8.0.0-beta.23564.4" />
<!-- unit test dependencies -->
<PackageVersion Include="Microsoft.Extensions.Diagnostics.Testing" Version="8.0.0" />
</ItemGroup>
</Project>
80 changes: 80 additions & 0 deletions src/Aspire.Hosting/Kafka/KafkaBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Publishing;

namespace Aspire.Hosting;

public static class KafkaBuilderExtensions
{
private const int KafkaBrokerPort = 9092;
/// <summary>
/// Adds a Kafka broker container to the application.
/// </summary>
/// <param name="builder">The <see cref="IDistributedApplicationBuilder"/>.</param>
/// <param name="name">The name of the resource. This name will be used as the connection string name when referenced in a dependency.</param>
/// <param name="port">The host port of Kafka broker.</param>
/// <returns>A reference to the <see cref="IResourceBuilder{KafkaContainerResource}"/></returns>
public static IResourceBuilder<KafkaContainerResource> AddKafkaContainer(this IDistributedApplicationBuilder builder, string name, int? port = null)
{
var kafka = new KafkaContainerResource(name);
return builder.AddResource(kafka)
.WithEndpoint(hostPort: port, containerPort: KafkaBrokerPort)
.WithAnnotation(new ContainerImageAnnotation { Image = "confluentinc/confluent-local", Tag = "latest" })
.WithManifestPublishingCallback(context => WriteKafkaContainerToManifest(context, kafka))
.WithEnvironment(context => ConfigureKafkaContainer(context, kafka));

static void WriteKafkaContainerToManifest(ManifestPublishingContext context, KafkaContainerResource resource)
{
context.WriteContainer(resource);
context.Writer.WriteString("connectionString", $"{{{resource.Name}.bindings.tcp.host}}:{{{resource.Name}.bindings.tcp.port}}");
}
}

/// <summary>
/// Adds a Kafka resource to the application. A container is used for local development.
/// </summary>
/// <param name="builder">The <see cref="IDistributedApplicationBuilder"/>.</param>
/// <param name="name">The name of the resource. This name will be used as the connection string name when referenced in a dependency</param>
/// <returns>A reference to the <see cref="IResourceBuilder{KafkaServerResource}"/>.</returns>
public static IResourceBuilder<KafkaServerResource> AddKafka(this IDistributedApplicationBuilder builder, string name)
{
var kafka = new KafkaServerResource(name);
return builder.AddResource(kafka)
.WithEndpoint(containerPort: KafkaBrokerPort)
.WithAnnotation(new ContainerImageAnnotation{ Image = "confluentinc/confluent-local", Tag = "latest" })
.WithManifestPublishingCallback(WriteKafkaServerToManifest)
.WithEnvironment(context => ConfigureKafkaContainer(context, kafka));

static void WriteKafkaServerToManifest(ManifestPublishingContext context)
{
context.Writer.WriteString("type", "kafka.server.v0");
}
}

private static void ConfigureKafkaContainer(EnvironmentCallbackContext context, IResource resource)
{
// confluentinc/confluent-local is a docker image that contains a Kafka broker started with KRaft to avoid pulling a separate image for ZooKeeper.
// See https://github.com/confluentinc/kafka-images/blob/master/local/README.md.
// When not explicitly set default configuration is applied.
// See https://github.com/confluentinc/kafka-images/blob/master/local/include/etc/confluent/docker/configureDefaults for more details.

var hostPort = context.PublisherName == "manifest"
? KafkaBrokerPort
: GetResourcePort(resource);
context.EnvironmentVariables.Add("KAFKA_ADVERTISED_LISTENERS",
$"PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:{hostPort}");

static int GetResourcePort(IResource resource)
{
if (!resource.TryGetAllocatedEndPoints(out var allocatedEndpoints))
{
throw new DistributedApplicationException(
$"Kafka resource \"{resource.Name}\" does not have endpoint annotation.");
}

return allocatedEndpoints.Single().Port;
}
}
}
37 changes: 37 additions & 0 deletions src/Aspire.Hosting/Kafka/KafkaContainerResource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Aspire.Hosting.ApplicationModel;

namespace Aspire.Hosting;

/// <summary>
/// A resource that represents a Kafka broker container.
/// </summary>
/// <param name="name"></param>
public class KafkaContainerResource(string name) : ContainerResource(name), IResourceWithConnectionString, IResourceWithEnvironment
{
/// <summary>
/// Gets the connection string for Kafka broker.
/// </summary>
/// <returns>A connection string for the Kafka in the form "host:port" to be passed as <see href="https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.ClientConfig.html#Confluent_Kafka_ClientConfig_BootstrapServers">BootstrapServers</see>.</returns>
public string? GetConnectionString()
{
if (!this.TryGetAllocatedEndPoints(out var allocatedEndpoints))
{
throw new DistributedApplicationException($"Kafka resource \"{Name}\" does not have endpoint annotation.");
}

return allocatedEndpoints.SingleOrDefault()?.EndPointString;
}

internal int GetPort()
{
if (!this.TryGetAllocatedEndPoints(out var allocatedEndpoints))
{
throw new DistributedApplicationException($"Kafka resource \"{Name}\" does not have endpoint annotation.");
}

return allocatedEndpoints.Single().Port;
}
}
35 changes: 35 additions & 0 deletions src/Aspire.Hosting/Kafka/KafkaServerResource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

namespace Aspire.Hosting.ApplicationModel;

/// <summary>
/// A resource that represents a Kafka broker.
/// </summary>
/// <param name="name">The name of the resource.</param>
public class KafkaServerResource(string name) : Resource(name), IResourceWithConnectionString, IResourceWithEnvironment
{
/// <summary>
/// Gets the connection string for Kafka broker.
/// </summary>
/// <returns>A connection string for the Kafka in the form "host:port" to be passed as <see href="https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.ClientConfig.html#Confluent_Kafka_ClientConfig_BootstrapServers">BootstrapServers</see>.</returns>
public string? GetConnectionString()
{
if (!this.TryGetAllocatedEndPoints(out var allocatedEndpoints))
{
throw new DistributedApplicationException($"Kafka resource \"{Name}\" does not have endpoint annotation.");
}

return allocatedEndpoints.SingleOrDefault()?.EndPointString;
}

internal int GetPort()
{
if (!this.TryGetAllocatedEndPoints(out var allocatedEndpoints))
{
throw new DistributedApplicationException($"Kafka resource \"{Name}\" does not have endpoint annotation.");
}

return allocatedEndpoints.Single().Port;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>$(NetCurrent)</TargetFramework>
<IsPackable>true</IsPackable>
<PackageTags>$(ComponentCommonPackageTags) kafka</PackageTags>
<Description>Confluent.Kafka based Kafka generic consumer and producer that integrates with Aspire, including healthchecks and metrics.</Description>
<NoWarn>$(NoWarn);SYSLIB1100</NoWarn>
</PropertyGroup>

<ItemGroup>
<Compile Include="..\Common\HealthChecksExtensions.cs" Link="HealthChecksExtensions.cs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.Kafka" />
<PackageReference Include="Confluent.Kafka" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" />
</ItemGroup>

</Project>
Loading

0 comments on commit c540de4

Please sign in to comment.