From 4c78c18d1236a6e300156a0d1295666721ba2c0c Mon Sep 17 00:00:00 2001 From: Guillaume Delahaye Date: Thu, 11 Jan 2024 21:28:48 +0100 Subject: [PATCH] dump kafka sample source from aspire repo --- samples/KafkaBasic/Consumer/Consumer.csproj | 16 +++ samples/KafkaBasic/Consumer/ConsumerWorker.cs | 37 ++++++ samples/KafkaBasic/Consumer/Program.cs | 15 +++ .../Consumer/appsettings.Development.json | 8 ++ samples/KafkaBasic/Consumer/appsettings.json | 21 ++++ .../KafkaBasic.AppHost/Directory.Build.props | 8 ++ .../Directory.Build.targets | 8 ++ .../KafkaBasic.AppHost.csproj | 17 +++ .../KafkaBasic/KafkaBasic.AppHost/Program.cs | 14 +++ .../Properties/launchSettings.json | 16 +++ .../appsettings.Development.json | 8 ++ .../KafkaBasic.AppHost/appsettings.json | 9 ++ .../KafkaBasic.ServiceDefaults/Extensions.cs | 119 ++++++++++++++++++ .../KafkaBasic.ServiceDefaults.csproj | 27 ++++ samples/KafkaBasic/KafkaBasic.sln | 42 +++++++ .../Producer/ContinuousProducerWorker.cs | 22 ++++ .../Producer/IntermittentProducerWorker.cs | 28 +++++ samples/KafkaBasic/Producer/Producer.csproj | 16 +++ samples/KafkaBasic/Producer/Program.cs | 17 +++ .../Producer/appsettings.Development.json | 8 ++ samples/KafkaBasic/Producer/appsettings.json | 20 +++ 21 files changed, 476 insertions(+) create mode 100644 samples/KafkaBasic/Consumer/Consumer.csproj create mode 100644 samples/KafkaBasic/Consumer/ConsumerWorker.cs create mode 100644 samples/KafkaBasic/Consumer/Program.cs create mode 100644 samples/KafkaBasic/Consumer/appsettings.Development.json create mode 100644 samples/KafkaBasic/Consumer/appsettings.json create mode 100644 samples/KafkaBasic/KafkaBasic.AppHost/Directory.Build.props create mode 100644 samples/KafkaBasic/KafkaBasic.AppHost/Directory.Build.targets create mode 100644 samples/KafkaBasic/KafkaBasic.AppHost/KafkaBasic.AppHost.csproj create mode 100644 samples/KafkaBasic/KafkaBasic.AppHost/Program.cs create mode 100644 samples/KafkaBasic/KafkaBasic.AppHost/Properties/launchSettings.json create mode 100644 samples/KafkaBasic/KafkaBasic.AppHost/appsettings.Development.json create mode 100644 samples/KafkaBasic/KafkaBasic.AppHost/appsettings.json create mode 100644 samples/KafkaBasic/KafkaBasic.ServiceDefaults/Extensions.cs create mode 100644 samples/KafkaBasic/KafkaBasic.ServiceDefaults/KafkaBasic.ServiceDefaults.csproj create mode 100644 samples/KafkaBasic/KafkaBasic.sln create mode 100644 samples/KafkaBasic/Producer/ContinuousProducerWorker.cs create mode 100644 samples/KafkaBasic/Producer/IntermittentProducerWorker.cs create mode 100644 samples/KafkaBasic/Producer/Producer.csproj create mode 100644 samples/KafkaBasic/Producer/Program.cs create mode 100644 samples/KafkaBasic/Producer/appsettings.Development.json create mode 100644 samples/KafkaBasic/Producer/appsettings.json diff --git a/samples/KafkaBasic/Consumer/Consumer.csproj b/samples/KafkaBasic/Consumer/Consumer.csproj new file mode 100644 index 00000000..c09f5723 --- /dev/null +++ b/samples/KafkaBasic/Consumer/Consumer.csproj @@ -0,0 +1,16 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + + + diff --git a/samples/KafkaBasic/Consumer/ConsumerWorker.cs b/samples/KafkaBasic/Consumer/ConsumerWorker.cs new file mode 100644 index 00000000..f2f587db --- /dev/null +++ b/samples/KafkaBasic/Consumer/ConsumerWorker.cs @@ -0,0 +1,37 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using Confluent.Kafka; + +namespace Consumer; + +internal sealed class ConsumerWorker(IConsumer consumer, ILogger logger) : BackgroundService +{ + protected override Task ExecuteAsync(CancellationToken stoppingToken) + { + long i = 0; + return Task.Factory.StartNew(async () => + { + consumer.Subscribe("topic"); + while (!stoppingToken.IsCancellationRequested) + { + ConsumeResult? result = default; + try + { + result = consumer.Consume(TimeSpan.FromSeconds(1)); + } + catch (ConsumeException ex) when (ex.Error.Code == ErrorCode.UnknownTopicOrPart) + { + await Task.Delay(100); + continue; + } + + i++; + if (i % 1000 == 0) + { + logger.LogInformation($"Received {i} messages. current offset is '{result!.Offset}'"); + } + } + }, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Current); + } +} diff --git a/samples/KafkaBasic/Consumer/Program.cs b/samples/KafkaBasic/Consumer/Program.cs new file mode 100644 index 00000000..f82c9181 --- /dev/null +++ b/samples/KafkaBasic/Consumer/Program.cs @@ -0,0 +1,15 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using Confluent.Kafka; +using Consumer; + +var builder = Host.CreateApplicationBuilder(args); + +builder.AddServiceDefaults(); + +builder.AddKafkaConsumer("kafka"); + +builder.Services.AddHostedService(); + +builder.Build().Run(); diff --git a/samples/KafkaBasic/Consumer/appsettings.Development.json b/samples/KafkaBasic/Consumer/appsettings.Development.json new file mode 100644 index 00000000..b2dcdb67 --- /dev/null +++ b/samples/KafkaBasic/Consumer/appsettings.Development.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.Hosting.Lifetime": "Information" + } + } +} diff --git a/samples/KafkaBasic/Consumer/appsettings.json b/samples/KafkaBasic/Consumer/appsettings.json new file mode 100644 index 00000000..42aa3d91 --- /dev/null +++ b/samples/KafkaBasic/Consumer/appsettings.json @@ -0,0 +1,21 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.Hosting.Lifetime": "Information", + "Azure": "Warning" + } + }, + "Aspire": { + "Confluent": { + "Kafka": { + "Consumer": { + "Config": { + "AutoOffsetReset": "Earliest", + "GroupId": "aspire" + } + } + } + } + } +} diff --git a/samples/KafkaBasic/KafkaBasic.AppHost/Directory.Build.props b/samples/KafkaBasic/KafkaBasic.AppHost/Directory.Build.props new file mode 100644 index 00000000..b9b39c05 --- /dev/null +++ b/samples/KafkaBasic/KafkaBasic.AppHost/Directory.Build.props @@ -0,0 +1,8 @@ + + + + + + + + diff --git a/samples/KafkaBasic/KafkaBasic.AppHost/Directory.Build.targets b/samples/KafkaBasic/KafkaBasic.AppHost/Directory.Build.targets new file mode 100644 index 00000000..281a6cb2 --- /dev/null +++ b/samples/KafkaBasic/KafkaBasic.AppHost/Directory.Build.targets @@ -0,0 +1,8 @@ + + + + + + + + diff --git a/samples/KafkaBasic/KafkaBasic.AppHost/KafkaBasic.AppHost.csproj b/samples/KafkaBasic/KafkaBasic.AppHost/KafkaBasic.AppHost.csproj new file mode 100644 index 00000000..d96354be --- /dev/null +++ b/samples/KafkaBasic/KafkaBasic.AppHost/KafkaBasic.AppHost.csproj @@ -0,0 +1,17 @@ + + + + Exe + net8.0 + enable + enable + true + + + + + + + + + diff --git a/samples/KafkaBasic/KafkaBasic.AppHost/Program.cs b/samples/KafkaBasic/KafkaBasic.AppHost/Program.cs new file mode 100644 index 00000000..5b33d5d4 --- /dev/null +++ b/samples/KafkaBasic/KafkaBasic.AppHost/Program.cs @@ -0,0 +1,14 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +var builder = DistributedApplication.CreateBuilder(args); + +var containerResource = builder.AddKafkaContainer("kafka"); + +builder.AddProject("producer") + .WithReference(containerResource); + +builder.AddProject("consumer") + .WithReference(containerResource); + +builder.Build().Run(); diff --git a/samples/KafkaBasic/KafkaBasic.AppHost/Properties/launchSettings.json b/samples/KafkaBasic/KafkaBasic.AppHost/Properties/launchSettings.json new file mode 100644 index 00000000..e250d24c --- /dev/null +++ b/samples/KafkaBasic/KafkaBasic.AppHost/Properties/launchSettings.json @@ -0,0 +1,16 @@ +{ + "$schema": "http://json.schemastore.org/launchsettings.json", + "profiles": { + "http": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": true, + "applicationUrl": "http://localhost:15024", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "DOTNET_ENVIRONMENT": "Development", + "DOTNET_DASHBOARD_OTLP_ENDPOINT_URL": "http://localhost:16132" + } + } + } +} diff --git a/samples/KafkaBasic/KafkaBasic.AppHost/appsettings.Development.json b/samples/KafkaBasic/KafkaBasic.AppHost/appsettings.Development.json new file mode 100644 index 00000000..0c208ae9 --- /dev/null +++ b/samples/KafkaBasic/KafkaBasic.AppHost/appsettings.Development.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + } +} diff --git a/samples/KafkaBasic/KafkaBasic.AppHost/appsettings.json b/samples/KafkaBasic/KafkaBasic.AppHost/appsettings.json new file mode 100644 index 00000000..31c092aa --- /dev/null +++ b/samples/KafkaBasic/KafkaBasic.AppHost/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning", + "Aspire.Hosting.Dcp": "Warning" + } + } +} diff --git a/samples/KafkaBasic/KafkaBasic.ServiceDefaults/Extensions.cs b/samples/KafkaBasic/KafkaBasic.ServiceDefaults/Extensions.cs new file mode 100644 index 00000000..c59308d5 --- /dev/null +++ b/samples/KafkaBasic/KafkaBasic.ServiceDefaults/Extensions.cs @@ -0,0 +1,119 @@ +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Diagnostics.HealthChecks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Diagnostics.HealthChecks; +using Microsoft.Extensions.Logging; +using OpenTelemetry.Logs; +using OpenTelemetry.Metrics; +using OpenTelemetry.Trace; + +namespace Microsoft.Extensions.Hosting; + +public static class Extensions +{ + public static IHostApplicationBuilder AddServiceDefaults(this IHostApplicationBuilder builder) + { + builder.ConfigureOpenTelemetry(); + + builder.AddDefaultHealthChecks(); + + builder.Services.AddServiceDiscovery(); + + builder.Services.ConfigureHttpClientDefaults(http => + { + // Turn on resilience by default + http.AddStandardResilienceHandler(); + + // Turn on service discovery by default + http.UseServiceDiscovery(); + }); + + return builder; + } + + public static IHostApplicationBuilder ConfigureOpenTelemetry(this IHostApplicationBuilder builder) + { + builder.Logging.AddOpenTelemetry(logging => + { + logging.IncludeFormattedMessage = true; + logging.IncludeScopes = true; + }); + + builder.Services.AddOpenTelemetry() + .WithMetrics(metrics => + { + metrics.AddRuntimeInstrumentation() + .AddBuiltInMeters(); + }) + .WithTracing(tracing => + { + if (builder.Environment.IsDevelopment()) + { + // We want to view all traces in development + tracing.SetSampler(new AlwaysOnSampler()); + } + + tracing.AddAspNetCoreInstrumentation() + .AddGrpcClientInstrumentation() + .AddHttpClientInstrumentation(); + }); + + builder.AddOpenTelemetryExporters(); + + return builder; + } + + private static IHostApplicationBuilder AddOpenTelemetryExporters(this IHostApplicationBuilder builder) + { + var useOtlpExporter = !string.IsNullOrWhiteSpace(builder.Configuration["OTEL_EXPORTER_OTLP_ENDPOINT"]); + + if (useOtlpExporter) + { + builder.Services.Configure(logging => logging.AddOtlpExporter()); + builder.Services.ConfigureOpenTelemetryMeterProvider(metrics => metrics.AddOtlpExporter()); + builder.Services.ConfigureOpenTelemetryTracerProvider(tracing => tracing.AddOtlpExporter()); + } + + // Uncomment the following lines to enable the Prometheus exporter (requires the OpenTelemetry.Exporter.Prometheus.AspNetCore package) + // builder.Services.AddOpenTelemetry() + // .WithMetrics(metrics => metrics.AddPrometheusExporter()); + + // Uncomment the following lines to enable the Azure Monitor exporter (requires the Azure.Monitor.OpenTelemetry.Exporter package) + // builder.Services.AddOpenTelemetry() + // .UseAzureMonitor(); + + return builder; + } + + public static IHostApplicationBuilder AddDefaultHealthChecks(this IHostApplicationBuilder builder) + { + builder.Services.AddHealthChecks() + // Add a default liveness check to ensure app is responsive + .AddCheck("self", () => HealthCheckResult.Healthy(), ["live"]); + + return builder; + } + + public static WebApplication MapDefaultEndpoints(this WebApplication app) + { + // Uncomment the following line to enable the Prometheus endpoint (requires the OpenTelemetry.Exporter.Prometheus.AspNetCore package) + // app.MapPrometheusScrapingEndpoint(); + + // All health checks must pass for app to be considered ready to accept traffic after starting + app.MapHealthChecks("/health"); + + // Only health checks tagged with the "live" tag must pass for app to be considered alive + app.MapHealthChecks("/alive", new HealthCheckOptions + { + Predicate = r => r.Tags.Contains("live") + }); + + return app; + } + + private static MeterProviderBuilder AddBuiltInMeters(this MeterProviderBuilder meterProviderBuilder) => + meterProviderBuilder.AddMeter( + "Microsoft.AspNetCore.Hosting", + "Microsoft.AspNetCore.Server.Kestrel", + "System.Net.Http"); +} diff --git a/samples/KafkaBasic/KafkaBasic.ServiceDefaults/KafkaBasic.ServiceDefaults.csproj b/samples/KafkaBasic/KafkaBasic.ServiceDefaults/KafkaBasic.ServiceDefaults.csproj new file mode 100644 index 00000000..5a525eeb --- /dev/null +++ b/samples/KafkaBasic/KafkaBasic.ServiceDefaults/KafkaBasic.ServiceDefaults.csproj @@ -0,0 +1,27 @@ + + + + Library + net8.0 + + + + + + + + + + + + + + + + + + + + + + diff --git a/samples/KafkaBasic/KafkaBasic.sln b/samples/KafkaBasic/KafkaBasic.sln new file mode 100644 index 00000000..6d1a44ad --- /dev/null +++ b/samples/KafkaBasic/KafkaBasic.sln @@ -0,0 +1,42 @@ +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 17 +VisualStudioVersion = 17.9.34310.174 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaBasic.AppHost", "KafkaBasic.AppHost\KafkaBasic.AppHost.csproj", "{C0E6A5CB-D61D-4091-9F5E-81562E480C40}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaBasic.ServiceDefaults", "KafkaBasic.ServiceDefaults\KafkaBasic.ServiceDefaults.csproj", "{DE933720-1947-4920-A2E8-CB943D381634}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Producer", "Producer\Producer.csproj", "{45316E78-FF0A-4984-B303-F292BB3340C7}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Consumer", "Consumer\Consumer.csproj", "{6612601B-5912-4858-B23F-A2CC061C2918}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {C0E6A5CB-D61D-4091-9F5E-81562E480C40}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C0E6A5CB-D61D-4091-9F5E-81562E480C40}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C0E6A5CB-D61D-4091-9F5E-81562E480C40}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C0E6A5CB-D61D-4091-9F5E-81562E480C40}.Release|Any CPU.Build.0 = Release|Any CPU + {DE933720-1947-4920-A2E8-CB943D381634}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DE933720-1947-4920-A2E8-CB943D381634}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DE933720-1947-4920-A2E8-CB943D381634}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DE933720-1947-4920-A2E8-CB943D381634}.Release|Any CPU.Build.0 = Release|Any CPU + {45316E78-FF0A-4984-B303-F292BB3340C7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {45316E78-FF0A-4984-B303-F292BB3340C7}.Debug|Any CPU.Build.0 = Debug|Any CPU + {45316E78-FF0A-4984-B303-F292BB3340C7}.Release|Any CPU.ActiveCfg = Release|Any CPU + {45316E78-FF0A-4984-B303-F292BB3340C7}.Release|Any CPU.Build.0 = Release|Any CPU + {6612601B-5912-4858-B23F-A2CC061C2918}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6612601B-5912-4858-B23F-A2CC061C2918}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6612601B-5912-4858-B23F-A2CC061C2918}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6612601B-5912-4858-B23F-A2CC061C2918}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {3683F1C2-032E-43A3-93C0-3F79606377E4} + EndGlobalSection +EndGlobal diff --git a/samples/KafkaBasic/Producer/ContinuousProducerWorker.cs b/samples/KafkaBasic/Producer/ContinuousProducerWorker.cs new file mode 100644 index 00000000..90633e4c --- /dev/null +++ b/samples/KafkaBasic/Producer/ContinuousProducerWorker.cs @@ -0,0 +1,22 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using Confluent.Kafka; + +namespace Producer; + +internal sealed class ContinuousProducerWorker(IProducer producer, ILogger logger) : BackgroundService +{ + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(10)); + long i = 0; + while (await timer.WaitForNextTickAsync(stoppingToken)) + { + var message = new Message { Value = $"Hello, World! {i}" }; + producer.Produce("topic", message); + logger.LogInformation($"{producer.Name} sent message '{message.Value}'"); + i++; + } + } +} diff --git a/samples/KafkaBasic/Producer/IntermittentProducerWorker.cs b/samples/KafkaBasic/Producer/IntermittentProducerWorker.cs new file mode 100644 index 00000000..e3113afe --- /dev/null +++ b/samples/KafkaBasic/Producer/IntermittentProducerWorker.cs @@ -0,0 +1,28 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using Confluent.Kafka; + +namespace Producer; + +internal sealed class IntermittentProducerWorker(IProducer producer, ILogger logger) : BackgroundService +{ + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + long i = 0; + while (!stoppingToken.IsCancellationRequested) + { + for (int j = 0; j < 1000; j++, i++) + { + var message = new Message { Value = $"Hello, World! {i}" }; + producer.Produce("topic", message); + } + + producer.Flush(stoppingToken); + + logger.LogInformation($"{producer.Name} sent 1000 messages, waiting 10 s"); + + await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken); + } + } +} diff --git a/samples/KafkaBasic/Producer/Producer.csproj b/samples/KafkaBasic/Producer/Producer.csproj new file mode 100644 index 00000000..c09f5723 --- /dev/null +++ b/samples/KafkaBasic/Producer/Producer.csproj @@ -0,0 +1,16 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + + + diff --git a/samples/KafkaBasic/Producer/Program.cs b/samples/KafkaBasic/Producer/Program.cs new file mode 100644 index 00000000..41ffc80b --- /dev/null +++ b/samples/KafkaBasic/Producer/Program.cs @@ -0,0 +1,17 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using Confluent.Kafka; +using Producer; + +var builder = Host.CreateApplicationBuilder(args); + +builder.AddServiceDefaults(); + +builder.AddKafkaProducer("kafka"); +builder.AddKafkaProducer("kafka"); + +builder.Services.AddHostedService(); +builder.Services.AddHostedService(); + +builder.Build().Run(); diff --git a/samples/KafkaBasic/Producer/appsettings.Development.json b/samples/KafkaBasic/Producer/appsettings.Development.json new file mode 100644 index 00000000..b2dcdb67 --- /dev/null +++ b/samples/KafkaBasic/Producer/appsettings.Development.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.Hosting.Lifetime": "Information" + } + } +} diff --git a/samples/KafkaBasic/Producer/appsettings.json b/samples/KafkaBasic/Producer/appsettings.json new file mode 100644 index 00000000..22dbdff2 --- /dev/null +++ b/samples/KafkaBasic/Producer/appsettings.json @@ -0,0 +1,20 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.Hosting.Lifetime": "Information", + "Azure": "Warning" + } + }, + "Aspire": { + "Confluent": { + "Kafka": { + "Producer": { + "Config": { + "Acks": "All" + } + } + } + } + } +}