
Commit ce36ea1

[GEN-2347]: add Kafka destination (#2428)
This pull request introduces support for Kafka as a new destination type. It includes changes to various configuration files, utility functions, and documentation to integrate Kafka into the existing system.

### Kafka Integration:

* `common/config/kafka.go`: Added a new Kafka configuration file with constants for the various Kafka settings and a `ModifyConfig` method that builds the Kafka-specific exporter configuration.
* `common/config/root.go`: Registered Kafka as an available configuration option.
* `common/dests.go`: Added `KafkaDestinationType` to the list of supported destination types.
* `destinations/data/kafka.yaml`: Created a new YAML configuration file for Kafka, defining its properties and supported signals.

### Utility Functions:

* `common/config/utils.go`: Added `parseBool` and `parseInt` utility functions to handle string-to-boolean and string-to-integer conversions, respectively.

### Documentation:

* `docs/backends-overview.mdx`: Updated the backends overview documentation to include Kafka as a supported self-hosted backend.
1 parent 7e20252 commit ce36ea1
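To make the new configuration surface concrete, here is a hypothetical destination config using the keys introduced in `common/config/kafka.go` below. The key names are real; the values are invented for illustration. Only `KAFKA_PROTOCOL_VERSION` is mandatory; every other key falls back to a default inside `ModifyConfig`:

```go
// Hypothetical example values; only KAFKA_PROTOCOL_VERSION is required.
var destConfig = map[string]string{
	"KAFKA_PROTOCOL_VERSION": "2.6.0",
	"KAFKA_BROKERS":          `["kafka-0.kafka:9092","kafka-1.kafka:9092"]`, // default: ["localhost:9092"]
	"KAFKA_TOPIC":            "otlp_spans",                                  // default: per-signal (otlp_spans/otlp_metrics/otlp_logs)
	"KAFKA_AUTH_METHOD":      "plain_text",                                  // default: "none"
	"KAFKA_USERNAME":         "otel",
	// The password is not stored here; the exporter references it as ${KAFKA_PASSWORD}.
}
```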

File tree

10 files changed (+723, -2 lines)


.golangci.yml

Lines changed: 2 additions & 2 deletions
@@ -22,7 +22,7 @@ linters:
   - gocyclo
   - gofmt
   - goimports
-  # - revive
+  # - revive
   - gosec
   - gosimple
   - govet

@@ -87,4 +87,4 @@ issues:
   - path: _test\.go
     linters:
       - unparam
-      - funlen
+      - funlen

common/config/kafka.go

Lines changed: 261 additions & 0 deletions
@@ -0,0 +1,261 @@

package config

import (
	"errors"

	"github.com/odigos-io/odigos/common"
)

const (
	KAFKA_PROTOCOL_VERSION                         = "KAFKA_PROTOCOL_VERSION"
	KAFKA_BROKERS                                  = "KAFKA_BROKERS"
	KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY = "KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY"
	KAFKA_CLIENT_ID                                = "KAFKA_CLIENT_ID"
	KAFKA_TOPIC                                    = "KAFKA_TOPIC"
	KAFKA_TOPIC_FROM_ATTRIBUTE                     = "KAFKA_TOPIC_FROM_ATTRIBUTE"
	KAFKA_ENCODING                                 = "KAFKA_ENCODING"
	KAFKA_PARTITION_TRACES_BY_ID                   = "KAFKA_PARTITION_TRACES_BY_ID"
	KAFKA_PARTITION_METRICS_BY_RESOURCE_ATTRIBUTES = "KAFKA_PARTITION_METRICS_BY_RESOURCE_ATTRIBUTES"
	KAFKA_PARTITION_LOGS_BY_RESOURCE_ATTRIBUTES    = "KAFKA_PARTITION_LOGS_BY_RESOURCE_ATTRIBUTES"
	KAFKA_AUTH_METHOD                              = "KAFKA_AUTH_METHOD"
	KAFKA_USERNAME                                 = "KAFKA_USERNAME"
	KAFKA_PASSWORD                                 = "KAFKA_PASSWORD"
	KAFKA_METADATA_FULL                            = "KAFKA_METADATA_FULL"
	KAFKA_METADATA_MAX_RETRY                       = "KAFKA_METADATA_MAX_RETRY"
	KAFKA_METADATA_BACKOFF_RETRY                   = "KAFKA_METADATA_BACKOFF_RETRY"
	KAFKA_TIMEOUT                                  = "KAFKA_TIMEOUT"
	KAFKA_RETRY_ON_FAILURE_ENABLED                 = "KAFKA_RETRY_ON_FAILURE_ENABLED"
	KAFKA_RETRY_ON_FAILURE_INITIAL_INTERVAL        = "KAFKA_RETRY_ON_FAILURE_INITIAL_INTERVAL"
	KAFKA_RETRY_ON_FAILURE_MAX_INTERVAL            = "KAFKA_RETRY_ON_FAILURE_MAX_INTERVAL"
	KAFKA_RETRY_ON_FAILURE_MAX_ELAPSED_TIME        = "KAFKA_RETRY_ON_FAILURE_MAX_ELAPSED_TIME"
	KAFKA_SENDING_QUEUE_ENABLED                    = "KAFKA_SENDING_QUEUE_ENABLED"
	KAFKA_SENDING_QUEUE_NUM_CONSUMERS              = "KAFKA_SENDING_QUEUE_NUM_CONSUMERS"
	KAFKA_SENDING_QUEUE_SIZE                       = "KAFKA_SENDING_QUEUE_SIZE"
	KAFKA_PRODUCER_MAX_MESSAGE_BYTES               = "KAFKA_PRODUCER_MAX_MESSAGE_BYTES"
	KAFKA_PRODUCER_REQUIRED_ACKS                   = "KAFKA_PRODUCER_REQUIRED_ACKS"
	KAFKA_PRODUCER_COMPRESSION                     = "KAFKA_PRODUCER_COMPRESSION"
	KAFKA_PRODUCER_FLUSH_MAX_MESSAGES              = "KAFKA_PRODUCER_FLUSH_MAX_MESSAGES"
)

type Kafka struct{}

func (m *Kafka) DestType() common.DestinationType {
	// DestinationType defined in common/dests.go
	return common.KafkaDestinationType
}

//nolint:funlen,gocyclo // This function is inherently complex due to Kafka config validation, refactoring is non-trivial
func (m *Kafka) ModifyConfig(dest ExporterConfigurer, currentConfig *Config) ([]string, error) {
	config := dest.GetConfig()
	// To make sure that the exporter and pipeline names are unique, we'll need to define a unique ID
	uniqueUri := "kafka-" + dest.GetID()

	protocolVersion, exists := config[KAFKA_PROTOCOL_VERSION]
	if !exists {
		return nil, errorMissingKey(KAFKA_PROTOCOL_VERSION)
	}
	brokers, exists := config[KAFKA_BROKERS]
	if !exists {
		brokers = "[\"localhost:9092\"]"
	}
	resolveCanonicalBootstrapServersOnly, exists := config[KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY]
	if !exists {
		resolveCanonicalBootstrapServersOnly = "false"
	}
	clientId, exists := config[KAFKA_CLIENT_ID]
	if !exists {
		clientId = "sarama"
	}
	topic, exists := config[KAFKA_TOPIC]
	if !exists {
		topic = "" // defined at bottom of file (otlp_spans, otlp_metrics, otlp_logs)
	}
	topicFromAttribute, exists := config[KAFKA_TOPIC_FROM_ATTRIBUTE]
	if !exists {
		topicFromAttribute = ""
	}
	encoding, exists := config[KAFKA_ENCODING]
	if !exists {
		encoding = "otlp_proto"
	}
	partitionTracesById, exists := config[KAFKA_PARTITION_TRACES_BY_ID]
	if !exists {
		partitionTracesById = "false"
	}
	partitionMetricsByResourceAttributes, exists := config[KAFKA_PARTITION_METRICS_BY_RESOURCE_ATTRIBUTES]
	if !exists {
		partitionMetricsByResourceAttributes = "false"
	}
	partitionLogsByResourceAttributes, exists := config[KAFKA_PARTITION_LOGS_BY_RESOURCE_ATTRIBUTES]
	if !exists {
		partitionLogsByResourceAttributes = "false"
	}
	authMethod, exists := config[KAFKA_AUTH_METHOD]
	if !exists {
		authMethod = "none"
	}
	username, exists := config[KAFKA_USERNAME]
	if !exists {
		username = ""
	}
	metadataFull, exists := config[KAFKA_METADATA_FULL]
	if !exists {
		metadataFull = "false"
	}
	metadataMaxRetry, exists := config[KAFKA_METADATA_MAX_RETRY]
	if !exists {
		metadataMaxRetry = "3"
	}
	metadataBackoffRetry, exists := config[KAFKA_METADATA_BACKOFF_RETRY]
	if !exists {
		metadataBackoffRetry = "250ms"
	}
	timeout, exists := config[KAFKA_TIMEOUT]
	if !exists {
		timeout = "5s"
	}
	retryOnFailureEnabled, exists := config[KAFKA_RETRY_ON_FAILURE_ENABLED]
	if !exists {
		retryOnFailureEnabled = "true"
	}
	retryOnFailureInitialInterval, exists := config[KAFKA_RETRY_ON_FAILURE_INITIAL_INTERVAL]
	if !exists {
		retryOnFailureInitialInterval = "5s"
	}
	retryOnFailureMaxInterval, exists := config[KAFKA_RETRY_ON_FAILURE_MAX_INTERVAL]
	if !exists {
		retryOnFailureMaxInterval = "30s"
	}
	retryOnFailureMaxTimeElapsed, exists := config[KAFKA_RETRY_ON_FAILURE_MAX_ELAPSED_TIME]
	if !exists {
		retryOnFailureMaxTimeElapsed = "120s"
	}
	sendingQueueEnabled, exists := config[KAFKA_SENDING_QUEUE_ENABLED]
	if !exists {
		sendingQueueEnabled = "true"
	}
	sendingQueueNumConsumers, exists := config[KAFKA_SENDING_QUEUE_NUM_CONSUMERS]
	if !exists {
		sendingQueueNumConsumers = "10"
	}
	sendingQueueSize, exists := config[KAFKA_SENDING_QUEUE_SIZE]
	if !exists {
		sendingQueueSize = "1000"
	}
	producerMaxMessageBytes, exists := config[KAFKA_PRODUCER_MAX_MESSAGE_BYTES]
	if !exists {
		producerMaxMessageBytes = "1000000"
	}
	producerRequiredAcks, exists := config[KAFKA_PRODUCER_REQUIRED_ACKS]
	if !exists {
		producerRequiredAcks = "1"
	}
	producerCompression, exists := config[KAFKA_PRODUCER_COMPRESSION]
	if !exists {
		producerCompression = "none"
	}
	producerFlushMaxMessages, exists := config[KAFKA_PRODUCER_FLUSH_MAX_MESSAGES]
	if !exists {
		producerFlushMaxMessages = "0"
	}

	// Modify the exporter here
	exporterName := "kafka/" + uniqueUri
	exporterConfig := GenericMap{
		"protocol_version": protocolVersion,
		"brokers":          brokers,
		"resolve_canonical_bootstrap_servers_only": parseBool(resolveCanonicalBootstrapServersOnly),
		"client_id":            clientId,
		"topic":                topic,
		"topic_from_attribute": topicFromAttribute,
		"encoding":             encoding,
		"partition_traces_by_id":                   parseBool(partitionTracesById),
		"partition_metrics_by_resource_attributes": parseBool(partitionMetricsByResourceAttributes),
		"partition_logs_by_resource_attributes":    parseBool(partitionLogsByResourceAttributes),
		"metadata": GenericMap{
			"full": parseBool(metadataFull),
			"retry": GenericMap{
				"max":     parseInt(metadataMaxRetry),
				"backoff": metadataBackoffRetry,
			},
		},
		"timeout": timeout,
		"retry_on_failure": GenericMap{
			"enabled":          parseBool(retryOnFailureEnabled),
			"initial_interval": retryOnFailureInitialInterval,
			"max_interval":     retryOnFailureMaxInterval,
			"max_elapsed_time": retryOnFailureMaxTimeElapsed,
		},
		"sending_queue": GenericMap{
			"enabled":       parseBool(sendingQueueEnabled),
			"num_consumers": parseInt(sendingQueueNumConsumers),
			"queue_size":    parseInt(sendingQueueSize),
		},
		"producer": GenericMap{
			"max_message_bytes":  parseInt(producerMaxMessageBytes),
			"required_acks":      parseInt(producerRequiredAcks),
			"compression":        producerCompression,
			"flush_max_messages": parseInt(producerFlushMaxMessages),
		},
		"auth": GenericMap{
			"tls": GenericMap{
				"insecure": true,
			},
		},
	}

	if authMethod == "plain_text" {
		exporterConfigAuth, ok := exporterConfig["auth"].(GenericMap)
		if !ok {
			return nil, errors.New("invalid type assertion for exporterConfig[\"auth\"]")
		}
		exporterConfigAuth["plain_text"] = GenericMap{
			"username": username,
			"password": "${KAFKA_PASSWORD}",
		}
	}

	// Modify the pipelines here
	var pipelineNames []string

	if isTracingEnabled(dest) {
		if topic == "" {
			exporterConfig["topic"] = "otlp_spans"
		}
		currentConfig.Exporters[exporterName] = exporterConfig

		pipeName := "traces/" + uniqueUri
		currentConfig.Service.Pipelines[pipeName] = Pipeline{
			Exporters: []string{exporterName},
		}
		pipelineNames = append(pipelineNames, pipeName)
	}

	if isMetricsEnabled(dest) {
		if topic == "" {
			exporterConfig["topic"] = "otlp_metrics"
		}
		currentConfig.Exporters[exporterName] = exporterConfig

		pipeName := "metrics/" + uniqueUri
		currentConfig.Service.Pipelines[pipeName] = Pipeline{
			Exporters: []string{exporterName},
		}
		pipelineNames = append(pipelineNames, pipeName)
	}

	if isLoggingEnabled(dest) {
		if topic == "" {
			exporterConfig["topic"] = "otlp_logs"
		}
		currentConfig.Exporters[exporterName] = exporterConfig

		pipeName := "logs/" + uniqueUri
		currentConfig.Service.Pipelines[pipeName] = Pipeline{
			Exporters: []string{exporterName},
		}
		pipelineNames = append(pipelineNames, pipeName)
	}

	return pipelineNames, nil
}
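As a minimal usage sketch: this configurer is exercised through the `ExporterConfigurer` interface defined elsewhere in this package. Only `GetID()` and `GetConfig()` are visible in this diff, so the stub below is a hypothetical illustration of the naming scheme, not the real interface.

```go
// Hypothetical stub; the real ExporterConfigurer likely requires more methods
// (e.g. whatever isTracingEnabled/isMetricsEnabled/isLoggingEnabled inspect).
type stubKafkaDest struct{}

func (stubKafkaDest) GetID() string { return "abc123" }

func (stubKafkaDest) GetConfig() map[string]string {
	return map[string]string{
		KAFKA_PROTOCOL_VERSION: "2.6.0", // required; ModifyConfig errors without it
	}
}

// With traces enabled, ModifyConfig registers exporter "kafka/kafka-abc123"
// (topic defaulting to "otlp_spans") plus pipeline "traces/kafka-abc123",
// and returns the names of the pipelines it created.
```

Note that all enabled signals share a single exporter entry, so when `KAFKA_TOPIC` is unset the per-signal topic defaults are applied to the same map in order: traces, then metrics, then logs.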

common/config/root.go

Lines changed: 1 addition & 0 deletions
@@ -42,6 +42,7 @@ var availableConfigers = []Configer{
 	&HyperDX{},
 	&Instana{},
 	&Jaeger{},
+	&Kafka{},
 	&KloudMate{},
 	&Last9{},
 	&Lightstep{},

common/config/utils.go

Lines changed: 17 additions & 0 deletions
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"net"
 	"net/url"
+	"strconv"
 	"strings"
 )

@@ -98,6 +99,22 @@ func getBooleanConfig(currentValue string, deprecatedValue string) bool {
 	return lowerCaseValue == "true" || lowerCaseValue == deprecatedValue
 }

+func parseBool(value string) bool {
+	result, err := strconv.ParseBool(value)
+	if err != nil {
+		return false
+	}
+	return result
+}
+
+func parseInt(value string) int {
+	num, err := strconv.Atoi(value)
+	if err != nil {
+		panic(err.Error())
+	}
+	return num
+}
+
 func errorMissingKey(key string) error {
 	return fmt.Errorf("key (\"%q\") not specified, destination will not be configured", key)
 }
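One behavioral detail worth noting when reading the Kafka configurer above: the two helpers fail differently. `parseBool` swallows parse errors and returns `false`, while `parseInt` panics on malformed input. A small runnable illustration (the helper bodies are copied from the diff above):

```go
package main

import (
	"fmt"
	"strconv"
)

// Copies of the helpers above, so this snippet runs standalone.
func parseBool(value string) bool {
	result, err := strconv.ParseBool(value)
	if err != nil {
		return false
	}
	return result
}

func parseInt(value string) int {
	num, err := strconv.Atoi(value)
	if err != nil {
		panic(err.Error())
	}
	return num
}

func main() {
	fmt.Println(parseBool("true"))     // true
	fmt.Println(parseBool("not-bool")) // false: the parse error is silently dropped
	fmt.Println(parseInt("1000000"))   // 1000000
	// parseInt("oops") would panic: strconv.Atoi: parsing "oops": invalid syntax
}
```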

common/dests.go

Lines changed: 1 addition & 0 deletions
@@ -29,6 +29,7 @@ const (
 	HyperDxDestinationType   DestinationType = "hyperdx"
 	InstanaDestinationType   DestinationType = "instana"
 	JaegerDestinationType    DestinationType = "jaeger"
+	KafkaDestinationType     DestinationType = "kafka"
 	KloudMateDestinationType DestinationType = "kloudmate"
 	Last9DestinationType     DestinationType = "last9"
 	LightstepDestinationType DestinationType = "lightstep"
