diff --git a/build/Common.props b/build/Common.props
index b0dc596895..cef5a72e28 100644
--- a/build/Common.props
+++ b/build/Common.props
@@ -38,6 +38,7 @@
[1.7.0,2.0)
[1.7.0-rc.1]
[2.1.58,3.0)
+ [2.3.0,3.0)
[3.16.0,4.0)
[1.2.0-beta.507,2.0)
[4.3.4,)
diff --git a/build/Projects/OpenTelemetry.Instrumentation.ConfluentKafka.proj b/build/Projects/OpenTelemetry.Instrumentation.ConfluentKafka.proj
new file mode 100644
index 0000000000..19f908314f
--- /dev/null
+++ b/build/Projects/OpenTelemetry.Instrumentation.ConfluentKafka.proj
@@ -0,0 +1,33 @@
+
+
+
+ $([System.IO.Directory]::GetParent($(MSBuildThisFileDirectory)).Parent.Parent.FullName)
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/opentelemetry-dotnet-contrib.sln b/opentelemetry-dotnet-contrib.sln
index 1c749e3975..3e7af5f715 100644
--- a/opentelemetry-dotnet-contrib.sln
+++ b/opentelemetry-dotnet-contrib.sln
@@ -329,6 +329,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenTelemetry.ResourceDetec
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenTelemetry.ResourceDetectors.ProcessRuntime.Tests", "test\OpenTelemetry.ResourceDetectors.ProcessRuntime.Tests\OpenTelemetry.ResourceDetectors.ProcessRuntime.Tests.csproj", "{B6157646-8EBA-464C-99B9-C386D474CB12}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OpenTelemetry.Instrumentation.ConfluentKafka", "src\OpenTelemetry.Instrumentation.ConfluentKafka\OpenTelemetry.Instrumentation.ConfluentKafka.csproj", "{96341E23-990E-4144-A7E3-9EF0DAFF3232}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OpenTelemetry.Instrumentation.ConfluentKafka.Tests", "test\OpenTelemetry.Instrumentation.ConfluentKafka.Tests\OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj", "{BE40900A-2859-471D-8802-21DFD73DDAA7}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -655,6 +659,14 @@ Global
{B6157646-8EBA-464C-99B9-C386D474CB12}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B6157646-8EBA-464C-99B9-C386D474CB12}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B6157646-8EBA-464C-99B9-C386D474CB12}.Release|Any CPU.Build.0 = Release|Any CPU
+ {96341E23-990E-4144-A7E3-9EF0DAFF3232}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {96341E23-990E-4144-A7E3-9EF0DAFF3232}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {96341E23-990E-4144-A7E3-9EF0DAFF3232}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {96341E23-990E-4144-A7E3-9EF0DAFF3232}.Release|Any CPU.Build.0 = Release|Any CPU
+ {BE40900A-2859-471D-8802-21DFD73DDAA7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {BE40900A-2859-471D-8802-21DFD73DDAA7}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {BE40900A-2859-471D-8802-21DFD73DDAA7}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {BE40900A-2859-471D-8802-21DFD73DDAA7}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -753,6 +765,8 @@ Global
{048509D6-FB49-4B84-832A-90E55520B97B} = {824BD1DE-3FA8-4FE0-823A-FD365EAC78AF}
{95372E82-CA5B-4C61-BD6C-74E6AB1970D4} = {22DF5DC0-1290-4E83-A9D8-6BB7DE3B3E63}
{B6157646-8EBA-464C-99B9-C386D474CB12} = {2097345F-4DD3-477D-BC54-A922F9B2B402}
+ {96341E23-990E-4144-A7E3-9EF0DAFF3232} = {22DF5DC0-1290-4E83-A9D8-6BB7DE3B3E63}
+ {BE40900A-2859-471D-8802-21DFD73DDAA7} = {2097345F-4DD3-477D-BC54-A922F9B2B402}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {B0816796-CDB3-47D7-8C3C-946434DE3B66}
diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/net8.0/PublicAPI.Shipped.txt b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/net8.0/PublicAPI.Shipped.txt
new file mode 100644
index 0000000000..8b13789179
--- /dev/null
+++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/net8.0/PublicAPI.Shipped.txt
@@ -0,0 +1 @@
+
diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/net8.0/PublicAPI.Unshipped.txt b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/net8.0/PublicAPI.Unshipped.txt
new file mode 100644
index 0000000000..75fa03d3aa
--- /dev/null
+++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/net8.0/PublicAPI.Unshipped.txt
@@ -0,0 +1,17 @@
+OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation
+OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.AddProducer(OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder producerBuilder) -> void
+OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.AddProducer(string name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder producerBuilder) -> void
+OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.Dispose() -> void
+OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentationOptions
+OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentationOptions.ConfluentKafkaInstrumentationOptions() -> void
+OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder
+OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder.InstrumentedProducerBuilder(System.Collections.Generic.IEnumerable> config) -> void
+OpenTelemetry.Trace.TracerProviderBuilderExtensions
+override OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder.Build() -> Confluent.Kafka.IProducer
+static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder) -> OpenTelemetry.Trace.TracerProviderBuilder
+static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder producerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder
+static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder producerBuilder, System.Action configure) -> OpenTelemetry.Trace.TracerProviderBuilder
+static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, string name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder producerBuilder, System.Action configure) -> OpenTelemetry.Trace.TracerProviderBuilder
+static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, System.Action configure) -> OpenTelemetry.Trace.TracerProviderBuilder
+static OpenTelemetry.Trace.TracerProviderBuilderExtensions.ConfigureKafkaInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, System.Action configure) -> OpenTelemetry.Trace.TracerProviderBuilder
+static OpenTelemetry.Trace.TracerProviderBuilderExtensions.ConfigureKafkaInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, System.Action configure) -> OpenTelemetry.Trace.TracerProviderBuilder
diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/netstandard2.0/PublicAPI.Shipped.txt b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/netstandard2.0/PublicAPI.Shipped.txt
new file mode 100644
index 0000000000..8b13789179
--- /dev/null
+++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/netstandard2.0/PublicAPI.Shipped.txt
@@ -0,0 +1 @@
+
diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/netstandard2.0/PublicAPI.Unshipped.txt b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/netstandard2.0/PublicAPI.Unshipped.txt
new file mode 100644
index 0000000000..75fa03d3aa
--- /dev/null
+++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/netstandard2.0/PublicAPI.Unshipped.txt
@@ -0,0 +1,17 @@
+OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation
+OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.AddProducer(OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder producerBuilder) -> void
+OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.AddProducer(string name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder producerBuilder) -> void
+OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentation.Dispose() -> void
+OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentationOptions
+OpenTelemetry.Instrumentation.ConfluentKafka.ConfluentKafkaInstrumentationOptions.ConfluentKafkaInstrumentationOptions() -> void
+OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder
+OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder.InstrumentedProducerBuilder(System.Collections.Generic.IEnumerable> config) -> void
+OpenTelemetry.Trace.TracerProviderBuilderExtensions
+override OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder.Build() -> Confluent.Kafka.IProducer
+static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder) -> OpenTelemetry.Trace.TracerProviderBuilder
+static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder producerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder
+static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder producerBuilder, System.Action configure) -> OpenTelemetry.Trace.TracerProviderBuilder
+static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, string name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder producerBuilder, System.Action configure) -> OpenTelemetry.Trace.TracerProviderBuilder
+static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, System.Action configure) -> OpenTelemetry.Trace.TracerProviderBuilder
+static OpenTelemetry.Trace.TracerProviderBuilderExtensions.ConfigureKafkaInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, System.Action configure) -> OpenTelemetry.Trace.TracerProviderBuilder
+static OpenTelemetry.Trace.TracerProviderBuilderExtensions.ConfigureKafkaInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder builder, System.Action configure) -> OpenTelemetry.Trace.TracerProviderBuilder
diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/AssemblyInfo.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/AssemblyInfo.cs
new file mode 100644
index 0000000000..fc47c948a3
--- /dev/null
+++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/AssemblyInfo.cs
@@ -0,0 +1,34 @@
+//
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+using System.Runtime.CompilerServices;
+
+[assembly: InternalsVisibleTo("OpenTelemetry.Instrumentation.ConfluentKafka.Tests" + AssemblyInfo.PublicKey)]
+[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2" + AssemblyInfo.MoqPublicKey)]
+
+#if SIGNED
+internal static class AssemblyInfo
+{
+ public const string PublicKey = ", PublicKey=002400000480000094000000060200000024000052534131000400000100010051C1562A090FB0C9F391012A32198B5E5D9A60E9B80FA2D7B434C9E5CCB7259BD606E66F9660676AFC6692B8CDC6793D190904551D2103B7B22FA636DCBB8208839785BA402EA08FC00C8F1500CCEF28BBF599AA64FFB1E1D5DC1BF3420A3777BADFE697856E9D52070A50C3EA5821C80BEF17CA3ACFFA28F89DD413F096F898";
+ public const string MoqPublicKey = ", PublicKey=0024000004800000940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a3602f3b3606cc9891605d02baa56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0bac1cdcf9f717241550e0a7b191195b7667bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924cceaf74c4861eff62abf69b9291ed0a340e113be11e6a7d3113e92484cf7045cc7";
+}
+#else
+internal static class AssemblyInfo
+{
+ public const string PublicKey = "";
+ public const string MoqPublicKey = "";
+}
+#endif
diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaInstrumentation.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaInstrumentation.cs
new file mode 100644
index 0000000000..4e90d18a54
--- /dev/null
+++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaInstrumentation.cs
@@ -0,0 +1,113 @@
+//
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+using Confluent.Kafka;
+using Microsoft.Extensions.Options;
+using OpenTelemetry.Internal;
+
+namespace OpenTelemetry.Instrumentation.ConfluentKafka;
+
+///
+/// Confluent.Kafka instrumentation.
+///
+public sealed class ConfluentKafkaInstrumentation : IDisposable
+{
+ private readonly IOptionsMonitor options;
+ private readonly MetricsService metricsService;
+ private readonly MetricsChannel metricsChannel;
+ private readonly CancellationTokenSource cts = new();
+
+ internal ConfluentKafkaInstrumentation(
+ IOptionsMonitor options,
+ MetricsService metricsService,
+ MetricsChannel metricsChannel)
+ {
+ this.options = options;
+ this.metricsService = metricsService;
+ this.metricsChannel = metricsChannel;
+
+ Task.Factory.StartNew(() => this.metricsService.ExecuteAsync(this.cts.Token), this.cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
+ }
+
+ internal List InstrumentedProducers { get; } = new();
+
+ ///
+ /// Adds an to the instrumentation.
+ ///
+ /// The type of the key.
+ /// The type of the value.
+ /// .
+ public void AddProducer(InstrumentedProducerBuilder producerBuilder)
+ => this.AddProducer(Options.DefaultName, producerBuilder);
+
+ ///
+ /// Adds an to the instrumentation.
+ ///
+ /// The type of the key.
+ /// The type of the value.
+ /// Name to use when retrieving options.
+ /// .
+ public void AddProducer(string name, InstrumentedProducerBuilder producerBuilder)
+ {
+ Guard.ThrowIfNull(name);
+ Guard.ThrowIfNull(producerBuilder);
+
+ var options = this.options.Get(name);
+
+ producerBuilder.SetStatisticsHandler(this.OnStatistics);
+
+ lock (this.InstrumentedProducers)
+ {
+ var instrumentation = new ConfluentKafkaProducerInstrumentation(producerBuilder, name, options);
+
+ this.InstrumentedProducers.Add(instrumentation);
+
+ lock (this.InstrumentedProducers)
+ {
+ if (this.InstrumentedProducers.Remove(instrumentation))
+ {
+ instrumentation.Dispose();
+ }
+ }
+ }
+ }
+
+ ///
+ public void Dispose()
+ {
+ lock (this.InstrumentedProducers)
+ {
+ foreach (var instrumentation in this.InstrumentedProducers)
+ {
+ instrumentation.Dispose();
+ }
+
+ this.InstrumentedProducers.Clear();
+ }
+
+ this.cts.Dispose();
+ }
+
+ private void OnStatistics(IProducer producer, string json)
+ {
+ if (string.IsNullOrEmpty(json))
+ {
+ return;
+ }
+
+ this.metricsChannel.Writer.TryWrite(json);
+ }
+}
diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaInstrumentationOptions.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaInstrumentationOptions.cs
new file mode 100644
index 0000000000..2ee24cf03f
--- /dev/null
+++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaInstrumentationOptions.cs
@@ -0,0 +1,24 @@
+//
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+namespace OpenTelemetry.Instrumentation.ConfluentKafka;
+
+///
+/// Options for .
+///
+public class ConfluentKafkaInstrumentationOptions
+{
+}
diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaMetrics.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaMetrics.cs
new file mode 100644
index 0000000000..37fe340760
--- /dev/null
+++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaMetrics.cs
@@ -0,0 +1,130 @@
+//
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+using System.Collections.Concurrent;
+using System.Diagnostics.Metrics;
+
+namespace OpenTelemetry.Instrumentation.ConfluentKafka;
+
+internal static class ConfluentKafkaMetrics
+{
+ public const string MeterName = "OpenTelemetry.Instrumentation.ConfluentKafka";
+
+ private static readonly Dictionary Descriptions = new()
+ {
+ { Gauges.ReplyQueue, "Number of ops (callbacks, events, etc) waiting in queue for application to serve with rd_kafka_poll()" },
+ { Gauges.MessageCount, "Current number of messages in producer queues" },
+ { Gauges.MessageSize, "Current total size of messages in producer queues" },
+ { Counters.Tx, "Total number of requests sent to Kafka brokers" },
+ { Counters.TxBytes, "Total number of bytes transmitted to Kafka brokers" },
+ { Counters.Rx, "Total number of responses received from Kafka brokers" },
+ { Counters.RxBytes, "Total number of bytes received from Kafka brokers" },
+ { Counters.TxMessages, "Total number of messages transmitted (produced) to Kafka brokers" },
+ { Counters.TxMessageBytes, "Total number of message bytes (including framing, such as per-Message framing and MessageSet/batch framing) transmitted to Kafka brokers" },
+ { Counters.RxMessages, "Total number of messages consumed, not including ignored messages (due to offset, etc), from Kafka brokers" },
+ { Counters.RxMessageBytes, "Total number of message bytes (including framing) received from Kafka brokers" },
+ };
+
+ private static readonly Meter Meter = new(MeterName);
+
+ static ConfluentKafkaMetrics()
+ {
+ Meter.CreateObservableGauge(Gauges.ReplyQueue, GetReplyQMeasurements, Descriptions[Gauges.ReplyQueue]);
+ Meter.CreateObservableGauge(Gauges.MessageCount, GetMessageCountMeasurements, Descriptions[Gauges.MessageCount]);
+ Meter.CreateObservableGauge(Gauges.MessageSize, GetMessageSizeMeasurements, Descriptions[Gauges.MessageSize]);
+
+ Tx = Meter.CreateCounter(Counters.Tx, Descriptions[Counters.Tx]);
+ TxBytes = Meter.CreateCounter(Counters.TxBytes, Descriptions[Counters.TxBytes]);
+ TxMessages = Meter.CreateCounter(Counters.TxMessages, Descriptions[Counters.TxMessages]);
+ TxMessageBytes = Meter.CreateCounter(Counters.TxMessageBytes, Descriptions[Counters.TxMessageBytes]);
+ Rx = Meter.CreateCounter(Counters.Rx, Descriptions[Counters.Rx]);
+ RxBytes = Meter.CreateCounter(Counters.RxBytes, Descriptions[Counters.RxBytes]);
+ RxMessages = Meter.CreateCounter(Counters.RxMessages, Descriptions[Counters.RxMessages]);
+ RxMessageBytes = Meter.CreateCounter(Counters.RxMessageBytes, Descriptions[Counters.RxMessageBytes]);
+ }
+
+ public static Counter Tx { get; }
+
+ public static Counter TxBytes { get; }
+
+ public static Counter TxMessages { get; }
+
+ public static Counter TxMessageBytes { get; }
+
+ public static Counter Rx { get; }
+
+ public static Counter RxBytes { get; }
+
+ public static Counter RxMessages { get; }
+
+ public static Counter RxMessageBytes { get; }
+
+ public static ConcurrentQueue> ReplyQueueMeasurements { get; } = new ConcurrentQueue>();
+
+ public static ConcurrentQueue> MessageCountMeasurements { get; } = new ConcurrentQueue>();
+
+ public static ConcurrentQueue> MessageSizeMeasurements { get; } = new ConcurrentQueue>();
+
+ private static IEnumerable> GetReplyQMeasurements()
+ {
+ while (ReplyQueueMeasurements.TryDequeue(out var measurement))
+ {
+ yield return measurement;
+ }
+ }
+
+ private static IEnumerable> GetMessageCountMeasurements()
+ {
+ while (MessageCountMeasurements.TryDequeue(out var measurement))
+ {
+ yield return measurement;
+ }
+ }
+
+ private static IEnumerable> GetMessageSizeMeasurements()
+ {
+ while (MessageSizeMeasurements.TryDequeue(out var measurement))
+ {
+ yield return measurement;
+ }
+ }
+
+ public static class Tags
+ {
+ public const string ClientId = "messaging.client_id";
+ public const string Type = "type";
+ public const string Name = "name";
+ }
+
+ private static class Gauges
+ {
+ public const string ReplyQueue = "messaging.kafka.consumer.queue.message_count";
+ public const string MessageCount = "messaging.kafka.producer.queue.message_count";
+ public const string MessageSize = "messaging.kafka.producer.queue.size";
+ }
+
+ private static class Counters
+ {
+ public const string Tx = "messaging.kafka.network.tx";
+ public const string TxBytes = "messaging.kafka.network.transmitted";
+ public const string Rx = "messaging.kafka.network.rx";
+ public const string RxBytes = "messaging.kafka.network.received";
+ public const string TxMessages = "messaging.publish.messages";
+ public const string TxMessageBytes = "messaging.kafka.message.transmitted";
+ public const string RxMessages = "messaging.receive.messages";
+ public const string RxMessageBytes = "messaging.kafka.message.received";
+ }
+}
diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaProducerInstrumentation.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaProducerInstrumentation.cs
new file mode 100644
index 0000000000..533cabfc20
--- /dev/null
+++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaProducerInstrumentation.cs
@@ -0,0 +1,59 @@
+//
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+using Confluent.Kafka;
+
+namespace OpenTelemetry.Instrumentation.ConfluentKafka;
+
+internal class ConfluentKafkaProducerInstrumentation : IDisposable
+{
+ internal static readonly string ActivitySourceName = typeof(ConfluentKafkaInstrumentation).Assembly.GetName().Name!;
+
+ public void Dispose()
+ {
+ this.Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ protected virtual void Dispose(bool disposing)
+ {
+ if (disposing)
+ {
+ // TODO release managed resources here
+ }
+ }
+}
+
+#pragma warning disable SA1402 // File may only contain a single type
+internal sealed class ConfluentKafkaProducerInstrumentation : ConfluentKafkaProducerInstrumentation
+#pragma warning restore SA1402 // File may only contain a single type
+{
+ private readonly ProducerBuilder producerBuilder;
+ private readonly string name;
+ private readonly ConfluentKafkaInstrumentationOptions options;
+
+ public ConfluentKafkaProducerInstrumentation(ProducerBuilder producerBuilder, string name, ConfluentKafkaInstrumentationOptions options)
+ {
+ this.producerBuilder = producerBuilder;
+ this.name = name;
+ this.options = options;
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ base.Dispose(disposing);
+ }
+}
diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs
new file mode 100644
index 0000000000..f6eb29c092
--- /dev/null
+++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs
@@ -0,0 +1,124 @@
+//
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+using Confluent.Kafka;
+
+namespace OpenTelemetry.Instrumentation.ConfluentKafka;
+
+internal sealed class InstrumentedProducer : IProducer
+{
+ private readonly IProducer producerImplementation;
+
+ public InstrumentedProducer(IProducer producerImplementation)
+ {
+ this.producerImplementation = producerImplementation;
+ }
+
+ public Handle Handle => this.producerImplementation.Handle;
+
+ public string Name => this.producerImplementation.Name;
+
+ public int AddBrokers(string brokers)
+ {
+ return this.producerImplementation.AddBrokers(brokers);
+ }
+
+ public void SetSaslCredentials(string username, string password)
+ {
+ this.producerImplementation.SetSaslCredentials(username, password);
+ }
+
+ public Task> ProduceAsync(
+ string topic,
+ Message message,
+ CancellationToken cancellationToken = default)
+ {
+ return this.producerImplementation.ProduceAsync(topic, message, cancellationToken);
+ }
+
+ public Task> ProduceAsync(
+ TopicPartition topicPartition,
+ Message message,
+ CancellationToken cancellationToken = default)
+ {
+ return this.producerImplementation.ProduceAsync(topicPartition, message, cancellationToken);
+ }
+
+ public void Produce(string topic, Message message, Action> deliveryHandler = null)
+ {
+ this.producerImplementation.Produce(topic, message, deliveryHandler);
+ }
+
+ public void Produce(TopicPartition topicPartition, Message message, Action> deliveryHandler = null)
+ {
+ this.producerImplementation.Produce(topicPartition, message, deliveryHandler);
+ }
+
+ public int Poll(TimeSpan timeout)
+ {
+ return this.producerImplementation.Poll(timeout);
+ }
+
+ public int Flush(TimeSpan timeout)
+ {
+ return this.producerImplementation.Flush(timeout);
+ }
+
+ public void Flush(CancellationToken cancellationToken = default)
+ {
+ this.producerImplementation.Flush(cancellationToken);
+ }
+
+ public void InitTransactions(TimeSpan timeout)
+ {
+ this.producerImplementation.InitTransactions(timeout);
+ }
+
+ public void BeginTransaction()
+ {
+ this.producerImplementation.BeginTransaction();
+ }
+
+ public void CommitTransaction(TimeSpan timeout)
+ {
+ this.producerImplementation.CommitTransaction(timeout);
+ }
+
+ public void CommitTransaction()
+ {
+ this.producerImplementation.CommitTransaction();
+ }
+
+ public void AbortTransaction(TimeSpan timeout)
+ {
+ this.producerImplementation.AbortTransaction(timeout);
+ }
+
+ public void AbortTransaction()
+ {
+ this.producerImplementation.AbortTransaction();
+ }
+
+ public void SendOffsetsToTransaction(IEnumerable offsets, IConsumerGroupMetadata groupMetadata, TimeSpan timeout)
+ {
+ this.producerImplementation.SendOffsetsToTransaction(offsets, groupMetadata, timeout);
+ }
+
+ public void Dispose()
+ {
+ this.producerImplementation.Dispose();
+ }
+}
diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducerBuilder.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducerBuilder.cs
new file mode 100644
index 0000000000..007be2414f
--- /dev/null
+++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducerBuilder.cs
@@ -0,0 +1,45 @@
+//
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+using Confluent.Kafka;
+
+namespace OpenTelemetry.Instrumentation.ConfluentKafka;
+
+///
+/// A builder of with support for instrumentation.
+///
+/// Type of the key.
+/// Type of value.
+public sealed class InstrumentedProducerBuilder : ProducerBuilder
+{
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// A collection of librdkafka configuration parameters (refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) and parameters specific to this client (refer to: ). At a minimum, 'bootstrap.servers' must be specified.
+ public InstrumentedProducerBuilder(IEnumerable> config)
+ : base(config)
+ {
+ }
+
+ ///
+ /// Build a new IProducer instance.
+ ///
+ /// an .
+ public override IProducer Build()
+ {
+ return new InstrumentedProducer(base.Build());
+ }
+}
diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/MetricsChannel.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/MetricsChannel.cs
new file mode 100644
index 0000000000..dba8da25c9
--- /dev/null
+++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/MetricsChannel.cs
@@ -0,0 +1,32 @@
+//
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+using System.Threading.Channels;
+
+namespace OpenTelemetry.Instrumentation.ConfluentKafka;
+
+internal sealed class MetricsChannel
+{
+ private readonly Channel channel = Channel.CreateBounded(new BoundedChannelOptions(10_000)
+ {
+ SingleReader = true,
+ SingleWriter = false,
+ });
+
+ public ChannelReader Reader => this.channel.Reader;
+
+ public ChannelWriter Writer => this.channel.Writer;
+}
diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/MetricsService.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/MetricsService.cs
new file mode 100644
index 0000000000..119378d7b4
--- /dev/null
+++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/MetricsService.cs
@@ -0,0 +1,93 @@
+//
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+using System.Diagnostics;
+using System.Diagnostics.Metrics;
+using System.Text.Json;
+
+namespace OpenTelemetry.Instrumentation.ConfluentKafka;
+
+internal sealed class MetricsService
+{
+ private readonly MetricsChannel channel;
+ private readonly Dictionary state = new();
+
+ public MetricsService(MetricsChannel channel)
+ {
+ this.channel = channel;
+ }
+
+ public async Task ExecuteAsync(CancellationToken token)
+ {
+ while (await this.channel.Reader.WaitToReadAsync(token).ConfigureAwait(false))
+ {
+ while (this.channel.Reader.TryRead(out var json))
+ {
+ Statistics statistics;
+ try
+ {
+ statistics = JsonSerializer.Deserialize(json, StatisticsJsonSerializerContext.Default.Statistics);
+ }
+ catch
+ {
+ return;
+ }
+
+ if (statistics == null || statistics.Name == null)
+ {
+ return;
+ }
+
+ TagList tags = new()
+ {
+ { ConfluentKafkaMetrics.Tags.ClientId, statistics.ClientId },
+ { ConfluentKafkaMetrics.Tags.Name, statistics.Name },
+ };
+
+ ConfluentKafkaMetrics.ReplyQueueMeasurements.Enqueue(new Measurement(statistics.ReplyQueue, tags));
+ ConfluentKafkaMetrics.MessageCountMeasurements.Enqueue(new Measurement(statistics.MessageCount, tags));
+ ConfluentKafkaMetrics.MessageSizeMeasurements.Enqueue(new Measurement(statistics.MessageSize, tags));
+
+ tags.Add(new KeyValuePair(ConfluentKafkaMetrics.Tags.Type, statistics.Type));
+
+ if (this.state.TryGetValue(statistics.Name, out var previous))
+ {
+ ConfluentKafkaMetrics.Tx.Add(statistics.Tx - previous.Tx, tags);
+ ConfluentKafkaMetrics.TxBytes.Add(statistics.TxBytes - previous.TxBytes, tags);
+ ConfluentKafkaMetrics.TxMessages.Add(statistics.TxMessages - previous.TxMessages, tags);
+ ConfluentKafkaMetrics.TxMessageBytes.Add(statistics.TxMessageBytes - previous.TxMessageBytes, tags);
+ ConfluentKafkaMetrics.Rx.Add(statistics.Rx - previous.Rx, tags);
+ ConfluentKafkaMetrics.RxBytes.Add(statistics.RxBytes - previous.RxBytes, tags);
+ ConfluentKafkaMetrics.RxMessages.Add(statistics.RxMessages - previous.RxMessages, tags);
+ ConfluentKafkaMetrics.RxMessageBytes.Add(statistics.RxMessageBytes - previous.RxMessageBytes, tags);
+ }
+ else
+ {
+ ConfluentKafkaMetrics.Tx.Add(statistics.Tx, tags);
+ ConfluentKafkaMetrics.TxBytes.Add(statistics.TxBytes, tags);
+ ConfluentKafkaMetrics.TxMessages.Add(statistics.TxMessages, tags);
+ ConfluentKafkaMetrics.TxMessageBytes.Add(statistics.TxMessageBytes, tags);
+ ConfluentKafkaMetrics.Rx.Add(statistics.Rx, tags);
+ ConfluentKafkaMetrics.RxBytes.Add(statistics.RxBytes, tags);
+ ConfluentKafkaMetrics.RxMessages.Add(statistics.RxMessages, tags);
+ ConfluentKafkaMetrics.RxMessageBytes.Add(statistics.RxMessageBytes, tags);
+ }
+
+ this.state[statistics.Name] = statistics;
+ }
+ }
+ }
+}
diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetry.Instrumentation.ConfluentKafka.csproj b/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetry.Instrumentation.ConfluentKafka.csproj
new file mode 100644
index 0000000000..b676fa1b9b
--- /dev/null
+++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetry.Instrumentation.ConfluentKafka.csproj
@@ -0,0 +1,31 @@
+
+
+
+ net8.0;netstandard2.0
+ Confluent.Kafka instrumentation for OpenTelemetry .NET
+ $(PackageTags);distributed-tracing;Kafka;Confluent.Kafka
+ true
+ Instrumentation.ConfluentKafka-
+ true
+ disable
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/Statistics.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/Statistics.cs
new file mode 100644
index 0000000000..1695e4a216
--- /dev/null
+++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/Statistics.cs
@@ -0,0 +1,94 @@
+//
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+using System.Text.Json.Serialization;
+
+namespace OpenTelemetry.Instrumentation.ConfluentKafka;
+
+///
+/// Maps to the JSON output returned by the librdkafka statistics API.
+///
+internal sealed class Statistics
+{
+ [JsonPropertyName("name")]
+ public string Name { get; set; }
+
+ [JsonPropertyName("client_id")]
+ public string ClientId { get; set; }
+
+ [JsonPropertyName("type")]
+ public string Type { get; set; }
+
+ [JsonPropertyName("ts")]
+ public long Timestamp { get; set; }
+
+ [JsonPropertyName("time")]
+ public long Time { get; set; }
+
+ [JsonPropertyName("age")]
+ public long Age { get; set; }
+
+ [JsonPropertyName("replyq")]
+ public long ReplyQueue { get; set; }
+
+ [JsonPropertyName("msg_cnt")]
+ public long MessageCount { get; set; }
+
+ [JsonPropertyName("msg_size")]
+ public long MessageSize { get; set; }
+
+ [JsonPropertyName("msg_max")]
+ public long MessageMax { get; set; }
+
+ [JsonPropertyName("msg_size_max")]
+ public long MessageSizeMax { get; set; }
+
+ [JsonPropertyName("tx")]
+ public long Tx { get; set; }
+
+ [JsonPropertyName("tx_bytes")]
+ public long TxBytes { get; set; }
+
+ [JsonPropertyName("rx")]
+ public long Rx { get; set; }
+
+ [JsonPropertyName("rx_bytes")]
+ public long RxBytes { get; set; }
+
+ [JsonPropertyName("txmsgs")]
+ public long TxMessages { get; set; }
+
+ [JsonPropertyName("txmsg_bytes")]
+ public long TxMessageBytes { get; set; }
+
+ [JsonPropertyName("rxmsgs")]
+ public long RxMessages { get; set; }
+
+ [JsonPropertyName("rxmsg_bytes")]
+ public long RxMessageBytes { get; set; }
+
+ [JsonPropertyName("simple_cnt")]
+ public long SimpleCount { get; set; }
+
+ [JsonPropertyName("metadata_cache_cnt")]
+ public long MetadataCacheCount { get; set; }
+}
+
+[JsonSerializable(typeof(Statistics))]
+[JsonSourceGenerationOptions]
+#pragma warning disable SA1402,SA1601
+internal sealed partial class StatisticsJsonSerializerContext : JsonSerializerContext;
+#pragma warning restore SA1402,SA1601
diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.cs
new file mode 100644
index 0000000000..a0f274051e
--- /dev/null
+++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.cs
@@ -0,0 +1,198 @@
+//
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+using Microsoft.Extensions.Options;
+using OpenTelemetry.Instrumentation.ConfluentKafka;
+using OpenTelemetry.Internal;
+
+namespace OpenTelemetry.Trace;
+
+///
+/// Extension methods to simplify registering of dependency instrumentation.
+///
+public static class TracerProviderBuilderExtensions
+{
+ ///
+ /// Enables automatic data collection of outgoing requests to Kafka.
+ ///
+ /// The type of the key.
+ /// The type of the value.
+ /// being configured.
+ /// The instance of to chain the calls.
+ public static TracerProviderBuilder AddKafkaProducerInstrumentation(
+ this TracerProviderBuilder builder)
+ => AddKafkaProducerInstrumentation(builder, name: null, producerBuilder: null, configure: null);
+
+ ///
+ /// Enables automatic data collection of outgoing requests to Redis.
+ ///
+ /// The type of the key.
+ /// The type of the value.
+ /// being configured.
+ /// to instrument.
+ /// The instance of to chain the calls.
+ public static TracerProviderBuilder AddKafkaProducerInstrumentation(
+ this TracerProviderBuilder builder,
+ InstrumentedProducerBuilder producerBuilder)
+ {
+ Guard.ThrowIfNull(producerBuilder);
+
+ return AddKafkaProducerInstrumentation(builder, name: null, producerBuilder, configure: null);
+ }
+
+ ///
+ /// Enables automatic data collection of outgoing requests to Redis.
+ ///
+ /// The type of the key.
+ /// The type of the value.
+ /// being configured.
+ /// Callback to configure options.
+ /// The instance of to chain the calls.
+ public static TracerProviderBuilder AddKafkaProducerInstrumentation(
+ this TracerProviderBuilder builder,
+ Action configure)
+ {
+ Guard.ThrowIfNull(configure);
+
+ return AddKafkaProducerInstrumentation(builder, name: null, producerBuilder: null, configure);
+ }
+
+ ///
+ /// Enables automatic data collection of outgoing requests to Redis.
+ ///
+ /// The type of the key.
+ /// The type of the value.
+ /// being configured.
+ /// to instrument.
+ /// Callback to configure options.
+ /// The instance of to chain the calls.
+ public static TracerProviderBuilder AddKafkaProducerInstrumentation(
+ this TracerProviderBuilder builder,
+ InstrumentedProducerBuilder producerBuilder,
+ Action configure)
+ {
+ Guard.ThrowIfNull(producerBuilder);
+ Guard.ThrowIfNull(configure);
+
+ return AddKafkaProducerInstrumentation(builder, name: null, producerBuilder, configure);
+ }
+
+ ///
+ /// Enables automatic data collection of outgoing requests to Redis.
+ ///
+ /// The type of the key.
+ /// The type of the value.
+ /// being configured.
+ /// Optional name which is used when retrieving options.
+ /// Optional to instrument.
+ /// Optional callback to configure options.
+ /// The instance of to chain the calls.
+ public static TracerProviderBuilder AddKafkaProducerInstrumentation(
+ this TracerProviderBuilder builder,
+ string name,
+ InstrumentedProducerBuilder producerBuilder,
+ Action configure)
+ {
+ Guard.ThrowIfNull(builder);
+
+ name ??= Options.DefaultName;
+
+ builder.AddKafkaInstrumentationSharedServices();
+
+ if (configure != null)
+ {
+ builder.ConfigureServices(services =>
+ {
+ services.Configure(name, configure);
+ });
+ }
+
+ return builder
+ .AddSource(ConfluentKafkaProducerInstrumentation.ActivitySourceName)
+ .AddInstrumentation(sp =>
+ {
+ var instrumentation = sp.GetRequiredService();
+
+ producerBuilder ??= sp.GetService>();
+
+ if (producerBuilder != null)
+ {
+ instrumentation.AddProducer(name, producerBuilder);
+ }
+
+ return instrumentation;
+ });
+ }
+
+ ///
+ /// Registers a callback for configuring Redis instrumentation.
+ ///
+ /// being configured.
+ /// Callback to configure instrumentation.
+ /// The instance of to chain the calls.
+ public static TracerProviderBuilder ConfigureKafkaInstrumentation(
+ this TracerProviderBuilder builder,
+ Action configure)
+ {
+ Guard.ThrowIfNull(configure);
+
+ return ConfigureKafkaInstrumentation(builder, (sp, instrumentation) => configure(instrumentation));
+ }
+
+ ///
+ /// Registers a callback for configuring Kafka instrumentation.
+ ///
+ /// being configured.
+ /// Callback to configure instrumentation.
+ /// The instance of to chain the calls.
+ public static TracerProviderBuilder ConfigureKafkaInstrumentation(
+ this TracerProviderBuilder builder,
+ Action configure)
+ {
+ Guard.ThrowIfNull(configure);
+
+ if (builder is not IDeferredTracerProviderBuilder deferredTracerProviderBuilder)
+ {
+ throw new NotSupportedException("ConfigureRedisInstrumentation is not supported on the supplied builder type.");
+ }
+
+ builder.AddKafkaInstrumentationSharedServices();
+
+ deferredTracerProviderBuilder.Configure(
+ (sp, builder) => configure(sp, sp.GetRequiredService()));
+
+ return builder;
+ }
+
+ private static TracerProviderBuilder AddKafkaInstrumentationSharedServices(
+ this TracerProviderBuilder builder)
+ {
+ Guard.ThrowIfNull(builder);
+
+ return builder.ConfigureServices(services =>
+ {
+ services.TryAddSingleton(
+ sp => new ConfluentKafkaInstrumentation(
+ sp.GetRequiredService>(),
+ sp.GetRequiredService(),
+ sp.GetRequiredService()));
+ services.TryAddSingleton();
+ services.TryAddSingleton();
+ });
+ }
+}
diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/ConfluentKafkaCallsInstrumentationTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/ConfluentKafkaCallsInstrumentationTests.cs
new file mode 100644
index 0000000000..8313b7566c
--- /dev/null
+++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/ConfluentKafkaCallsInstrumentationTests.cs
@@ -0,0 +1,86 @@
+//
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+using System;
+using System.Diagnostics;
+using System.Threading.Tasks;
+using Confluent.Kafka;
+using Moq;
+using OpenTelemetry.Tests;
+using OpenTelemetry.Trace;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace OpenTelemetry.Instrumentation.ConfluentKafka.Tests;
+
+[Collection("Kafka")]
+public class ConfluentKafkaCallsInstrumentationTests
+{
+ private const string KafkaEndPointEnvVarName = "OTEL_KAFKAENDPOINT";
+
+ private static readonly string? KafkaEndPoint = SkipUnlessEnvVarFoundTheoryAttribute.GetEnvironmentVariable(KafkaEndPointEnvVarName);
+
+ private readonly ITestOutputHelper outputHelper;
+ /*
+ To run the integration tests, set the OTEL_KAFKAENDPOINT machine-level environment variable to a valid Kafka endpoint.
+
+ To use Docker...
+ 1) Run: docker run -d --name kafka -p 9092:9092 confluentinc/confluent-local
+ 2) Set OTEL_KAFKAENDPOINT as: localhost:9092
+ */
+
+ public ConfluentKafkaCallsInstrumentationTests(ITestOutputHelper outputHelper)
+ {
+ this.outputHelper = outputHelper;
+ }
+
+ [Trait("CategoryName", "KafkaIntegrationTests")]
+ [SkipUnlessEnvVarFoundTheory(KafkaEndPointEnvVarName)]
+ [InlineData("any_value")]
+ public async Task BasicProduceMessageTest(string value)
+ {
+ ProducerConfig producerConfig = new ProducerConfig
+ {
+ BootstrapServers = KafkaEndPoint,
+ };
+ InstrumentedProducerBuilder producerBuilder = new(producerConfig);
+
+ // ensure user can use ProducerBuilder handlers too.
+ producerBuilder.SetLogHandler(LogHandler);
+
+ var activityProcessor = new Mock>();
+ var sampler = new TestSampler();
+ using (Sdk.CreateTracerProviderBuilder()
+ .AddProcessor(activityProcessor.Object)
+ .SetSampler(sampler)
+ .AddKafkaProducerInstrumentation(producerBuilder, o =>
+ {
+ })
+ .Build())
+ {
+ IProducer producer = producerBuilder.Build();
+ await producer.ProduceAsync($"otel-topic-{Guid.NewGuid().ToString()}", new Message
+ {
+ Value = value,
+ });
+ }
+
+ void LogHandler(IProducer p, LogMessage logMessage)
+ {
+ this.outputHelper.WriteLine(logMessage.Message);
+ }
+ }
+}
diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/Dockerfile b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/Dockerfile
new file mode 100644
index 0000000000..7632458cd8
--- /dev/null
+++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/Dockerfile
@@ -0,0 +1,19 @@
+# Create a container for running the OpenTelemetry Redis integration tests.
+# This should be run from the root of the repo:
+# docker build --file test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/Dockerfile .
+
+ARG BUILD_SDK_VERSION=8.0
+ARG TEST_SDK_VERSION=8.0
+
+FROM mcr.microsoft.com/dotnet/sdk:${BUILD_SDK_VERSION} AS build
+ARG PUBLISH_CONFIGURATION=Release
+ARG PUBLISH_FRAMEWORK=net8.0
+WORKDIR /repo
+COPY . ./
+WORKDIR "/repo/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests"
+RUN dotnet publish "OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj" -c "${PUBLISH_CONFIGURATION}" -f "${PUBLISH_FRAMEWORK}" -o /drop -p:IntegrationBuild=true -p:TARGET_FRAMEWORK=${PUBLISH_FRAMEWORK}
+
+FROM mcr.microsoft.com/dotnet/sdk:${TEST_SDK_VERSION} AS final
+WORKDIR /test
+COPY --from=build /drop .
+ENTRYPOINT ["dotnet", "vstest", "OpenTelemetry.Instrumentation.ConfluentKafka.Tests.dll"]
diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj
new file mode 100644
index 0000000000..af8cce4db0
--- /dev/null
+++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj
@@ -0,0 +1,25 @@
+
+
+ Unit test project for OpenTelemetry ConfluentKafka instrumentation
+
+ $(SupportedNetTargets)
+ $(TargetFrameworks);net462
+ $(TARGET_FRAMEWORK)
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/docker-compose.yml b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/docker-compose.yml
new file mode 100644
index 0000000000..7f094775fd
--- /dev/null
+++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/docker-compose.yml
@@ -0,0 +1,24 @@
+# Start a kafka container and then run OpenTelemetry redis integration tests.
+# This should be run from the root of the repo:
+# opentelemetry>docker-compose --file=test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/docker-compose.yml --project-directory=. up --exit-code-from=tests --build
+version: '3.7'
+
+services:
+ kafka:
+ image: confluentinc/confluent-local
+ environment:
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
+ KAFKA_LISTENERS: PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092,PLAINTEXT_INTERNAL://kafka:9092
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092,PLAINTEXT_INTERNAL://kafka:9092,PLAINTEXT_HOST://localhost:9092
+ ports:
+ - "9092:9092"
+
+ tests:
+ build:
+ context: .
+ dockerfile: ./test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/Dockerfile
+ command: --TestCaseFilter:CategoryName=KafkaIntegrationTests
+ environment:
+ - OTEL_KAFKAENDPOINT=kafka:9092
+ depends_on:
+ - kafka