From f5e9415f869b6fed1a7c23126d4cf5df2c579d6a Mon Sep 17 00:00:00 2001 From: Oleh Date: Sun, 14 Apr 2024 14:10:29 +0300 Subject: [PATCH] Updated in_opensearch.rb and added retry logic for both client creation and search operations in case of connection failures. Signed-off-by: Oleh Palanskyi Signed-off-by: Oleh Signed-off-by: OlehPalanskyi --- README.OpenSearchInput.md | 10 +++ lib/fluent/plugin/in_opensearch.rb | 125 +++++++++++++++++++---------- 2 files changed, 91 insertions(+), 44 deletions(-) diff --git a/README.OpenSearchInput.md b/README.OpenSearchInput.md index a89535f..a8de566 100644 --- a/README.OpenSearchInput.md +++ b/README.OpenSearchInput.md @@ -24,6 +24,7 @@ + [docinfo_fields](#docinfo_fields) + [docinfo_target](#docinfo_target) + [docinfo](#docinfo) + + [infinite_check_connection](#infinite_check_connection) * [Advanced Usage](#advanced-usage) ## Usage @@ -274,6 +275,15 @@ This parameter specifies whether docinfo information including or not. The defau docinfo false ``` +### infinite_check_connection + +The parameter infinite checking on connection availability with Elasticsearch or opensearch hosts, every request_timeout (default 5) seconds. The default value is `true,`. But if value is `false` then checking of connection will be only 3 times + +``` +infinite_check_connection true +``` + + ## Advanced Usage OpenSearch Input plugin and OpenSearch output plugin can combine to transfer records into another cluster. diff --git a/lib/fluent/plugin/in_opensearch.rb b/lib/fluent/plugin/in_opensearch.rb index 380c0d7..7e33872 100644 --- a/lib/fluent/plugin/in_opensearch.rb +++ b/lib/fluent/plugin/in_opensearch.rb @@ -80,6 +80,7 @@ class UnrecoverableRequestFailure < Fluent::UnrecoverableError; end config_param :docinfo_fields, :array, :default => ['_index', '_type', '_id'] config_param :docinfo_target, :string, :default => METADATA config_param :docinfo, :bool, :default => false + config_param :infinite_check_connection, :bool, :default => true include Fluent::Plugin::OpenSearchConstants @@ -240,40 +241,55 @@ def parse_time(value, event_time, tag) end def client(host = nil) - # check here to see if we already have a client connection for the given host - connection_options = get_connection_options(host) + retry_count = 0 + max_retry = @infinite_check_connection ? Float::INFINITY : 3 # Adjust the maximum number of retries as needed - @_os = nil unless is_existing_connection(connection_options[:hosts]) + begin + # check here to see if we already have a client connection for the given host + connection_options = get_connection_options(host) - @_os ||= begin - @current_config = connection_options[:hosts].clone - adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options } - local_reload_connections = @reload_connections - if local_reload_connections && @reload_after > DEFAULT_RELOAD_AFTER - local_reload_connections = @reload_after - end + @_os = nil unless is_existing_connection(connection_options[:hosts]) + + @_os ||= begin + @current_config = connection_options[:hosts].clone + adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options } + local_reload_connections = @reload_connections + if local_reload_connections && @reload_after > DEFAULT_RELOAD_AFTER + local_reload_connections = @reload_after + end - headers = { 'Content-Type' => "application/json" }.merge(@custom_headers) - - transport = OpenSearch::Transport::Transport::HTTP::Faraday.new( - connection_options.merge( - options: { - reload_connections: local_reload_connections, - reload_on_failure: @reload_on_failure, - resurrect_after: @resurrect_after, - logger: @transport_logger, - transport_options: { - headers: headers, - request: { timeout: @request_timeout }, - ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version } - }, - http: { - user: @user, - password: @password - }, - sniffer_class: @sniffer_class, - }), &adapter_conf) - OpenSearch::Client.new transport: transport + headers = { 'Content-Type' => "application/json" }.merge(@custom_headers) + + transport = OpenSearch::Transport::Transport::HTTP::Faraday.new( + connection_options.merge( + options: { + reload_connections: local_reload_connections, + reload_on_failure: @reload_on_failure, + resurrect_after: @resurrect_after, + logger: @transport_logger, + transport_options: { + headers: headers, + request: { timeout: @request_timeout }, + ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version } + }, + http: { + user: @user, + password: @password + }, + sniffer_class: @sniffer_class, + }), &adapter_conf) + OpenSearch::Client.new transport: transport + end + rescue Faraday::ConnectionFailed => e + # Retry logic for connection failures during client creation + if retry_count < max_retry + log.warn "Connection to OpenSearch failed during client creation: #{e.message}. Retrying (Attempt #{retry_count + 1})..." + retry_count += 1 + sleep(@request_timeout) + retry + else + raise UnrecoverableRequestFailure, "Maximum retry attempts reached. Failed to execute OpenSearch search operation." + end end end @@ -305,23 +321,44 @@ def run end def run_slice(slice_id=nil) - slice_query = @base_query - slice_query = slice_query.merge('slice' => { 'id' => slice_id, 'max' => @num_slices}) unless slice_id.nil? - result = client.search(@options.merge(:body => Yajl.dump(slice_query) )) - es = Fluent::MultiEventStream.new - - result["hits"]["hits"].each {|hit| process_events(hit, es)} - has_hits = result['hits']['hits'].any? - scroll_id = result['_scroll_id'] - - while has_hits && scroll_id - result = process_next_scroll_request(es, scroll_id) - has_hits = result['has_hits'] + retry_count = 0 + max_retry = @infinite_check_connection ? Float::INFINITY : 3 # Adjust the maximum number of retries as needed + begin + slice_query = @base_query + slice_query = slice_query.merge('slice' => { 'id' => slice_id, 'max' => @num_slices}) unless slice_id.nil? + result = client.search(@options.merge(:body => Yajl.dump(slice_query) )) + es = Fluent::MultiEventStream.new + + result["hits"]["hits"].each {|hit| process_events(hit, es)} + has_hits = result['hits']['hits'].any? scroll_id = result['_scroll_id'] + + while has_hits && scroll_id + result = process_next_scroll_request(es, scroll_id) + has_hits = result['has_hits'] + scroll_id = result['_scroll_id'] + end + + router.emit_stream(@tag, es) + clear_scroll(scroll_id) + rescue Faraday::ConnectionFailed => e + # Retry logic for connection failures during search + if retry_count < max_retry + log.warn "Connection to OpenSearch failed during search: #{e.message}. Retrying (Attempt #{retry_count + 1})..." + retry_count += 1 + sleep(@request_timeout) + retry + else + raise UnrecoverableRequestFailure, "Maximum retry attempts reached. Failed to execute OpenSearch search operation." + end end + end - router.emit_stream(@tag, es) + def clear_scroll(scroll_id) client.clear_scroll(scroll_id: scroll_id) if scroll_id + rescue => e + # ignore & log any clear_scroll errors + log.warn("Ignoring clear_scroll exception", message: e.message, exception: e.class) end def process_scroll_request(scroll_id)