Skip to content

Add schema_registry_api_key to kafka input #179

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/input-kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,15 @@ The schemas must follow a naming convention with the pattern <topic name>-value.
Use either the Schema Registry config option or the
<<plugins-{type}s-{plugin}-value_deserializer_class>> config option, but not both.

[id="plugins-{type}s-{plugin}-schema_registry_api_key"]
===== `schema_registry_api_key`

* Value type is <<string,string>>
* There is no default value for this setting.

If schema registry is configured to validate client api key, this setting will send
the api key in all requests to the schema registry

[id="plugins-{type}s-{plugin}-schema_registry_validation"]
===== `schema_registry_validation`

Expand Down
3 changes: 3 additions & 0 deletions lib/logstash/inputs/kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,9 @@ def create_consumer(client_id, group_instance_id)
props.put(kafka::VALUE_DESERIALIZER_CLASS_CONFIG, Java::io.confluent.kafka.serializers.KafkaAvroDeserializer.java_class)
serdes_config = Java::io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
props.put(serdes_config::SCHEMA_REGISTRY_URL_CONFIG, schema_registry_url.uri.to_s)
if schema_registry_api_key
props.put("request.header.x-api-key", schema_registry_api_key.to_s)
end
if schema_registry_proxy && !schema_registry_proxy.empty?
props.put(serdes_config::PROXY_HOST, @schema_registry_proxy_host)
props.put(serdes_config::PROXY_PORT, @schema_registry_proxy_port)
Expand Down
10 changes: 9 additions & 1 deletion lib/logstash/plugin_mixins/kafka/avro_schema_registry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ def setup_schema_registry_config
# instance of schema registry. If this option has value `value_deserializer_class` nor `topics_pattern` could be valued
config :schema_registry_url, :validate => :uri

# Option to set the api key of the Schema Registry.
# This option permits to define an api key to be used with all schema registry requests either in header or as query parameter.
config :schema_registry_api_key, :validate => :string

# Option to set the proxy of the Schema Registry.
# This option permits to define a proxy to be used to reach the schema registry service instance.
config :schema_registry_proxy, :validate => :uri
Expand Down Expand Up @@ -101,7 +105,11 @@ def check_for_schema_registry_connectivity_and_subjects

client = Manticore::Client.new(options)
begin
response = client.get(@schema_registry_url.uri.to_s + '/subjects').body
if schema_registry_api_key
response = client.get(@schema_registry_url.uri.to_s + '/subjects' + '?key=' + @schema_registry_apikey.to_s).body
else
response = client.get(@schema_registry_url.uri.to_s + '/subjects').body
end
rescue Manticore::ManticoreException => e
raise LogStash::ConfigurationError.new("Schema registry service doesn't respond, error: #{e.message}")
end
Expand Down