diff --git a/docs/input-kafka.asciidoc b/docs/input-kafka.asciidoc index 6ad10691..27ca0ca3 100644 --- a/docs/input-kafka.asciidoc +++ b/docs/input-kafka.asciidoc @@ -678,6 +678,15 @@ The schemas must follow a naming convention with the pattern -value. Use either the Schema Registry config option or the <> config option, but not both. +[id="plugins-{type}s-{plugin}-schema_registry_api_key"] +===== `schema_registry_api_key` + +* Value type is <> +* There is no default value for this setting. + +If schema registry is configured to validate client api key, this setting will send +the api key in all requests to the schema registry + [id="plugins-{type}s-{plugin}-schema_registry_validation"] ===== `schema_registry_validation` diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index d730e567..f23f7fe6 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -451,6 +451,9 @@ def create_consumer(client_id, group_instance_id) props.put(kafka::VALUE_DESERIALIZER_CLASS_CONFIG, Java::io.confluent.kafka.serializers.KafkaAvroDeserializer.java_class) serdes_config = Java::io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig props.put(serdes_config::SCHEMA_REGISTRY_URL_CONFIG, schema_registry_url.uri.to_s) + if schema_registry_api_key + props.put("request.header.x-api-key", schema_registry_api_key.to_s) + end if schema_registry_proxy && !schema_registry_proxy.empty? props.put(serdes_config::PROXY_HOST, @schema_registry_proxy_host) props.put(serdes_config::PROXY_PORT, @schema_registry_proxy_port) diff --git a/lib/logstash/plugin_mixins/kafka/avro_schema_registry.rb b/lib/logstash/plugin_mixins/kafka/avro_schema_registry.rb index 670231f6..b9d0459d 100644 --- a/lib/logstash/plugin_mixins/kafka/avro_schema_registry.rb +++ b/lib/logstash/plugin_mixins/kafka/avro_schema_registry.rb @@ -20,6 +20,10 @@ def setup_schema_registry_config # instance of schema registry. If this option has value `value_deserializer_class` nor `topics_pattern` could be valued config :schema_registry_url, :validate => :uri + # Option to set the api key of the Schema Registry. + # This option permits to define an api key to be used with all schema registry requests either in header or as query parameter. + config :schema_registry_api_key, :validate => :string + # Option to set the proxy of the Schema Registry. # This option permits to define a proxy to be used to reach the schema registry service instance. config :schema_registry_proxy, :validate => :uri @@ -101,7 +105,11 @@ def check_for_schema_registry_connectivity_and_subjects client = Manticore::Client.new(options) begin - response = client.get(@schema_registry_url.uri.to_s + '/subjects').body + if schema_registry_api_key + response = client.get(@schema_registry_url.uri.to_s + '/subjects' + '?key=' + @schema_registry_apikey.to_s).body + else + response = client.get(@schema_registry_url.uri.to_s + '/subjects').body + end rescue Manticore::ManticoreException => e raise LogStash::ConfigurationError.new("Schema registry service doesn't respond, error: #{e.message}") end