Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 43 additions & 18 deletions x-pack/lib/config_management/elasticsearch_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,7 @@ def pipeline_configs
es_version = get_es_version
fetcher = get_pipeline_fetcher(es_version)

begin
fetcher.fetch_config(es_version, pipeline_ids, client)
rescue LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
# es-output 12.0.2 throws 404 as error, but we want to handle it as empty config
return [] if e.response_code == 404
raise e
end
fetcher.fetch_config(es_version, pipeline_ids, client)

fetcher.get_pipeline_ids.collect do |pid|
get_pipeline(pid, fetcher)
Expand Down Expand Up @@ -210,22 +204,46 @@ class SystemIndicesFetcher
SYSTEM_INDICES_API_PATH = "_logstash/pipeline"

def fetch_config(es_version, pipeline_ids, client)
es_supports_pipeline_wildcard_search = es_supports_pipeline_wildcard_search?(es_version)
# if we are talking with an Elasticsearch that supports wildcard search, and get
# a successful response, use it. But wildcard search has a weird quirk that it 404's
# when there are no matches, so we need to fall through to traditional client-side
# search to rule out a proxy emitting a 404.
if es_supports_pipeline_wildcard_search?(es_version)
begin
logger.trace("querying for pipelines #{pipeline_ids.join(",")} using server-side wildcard search")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger.trace("querying for pipelines #{pipeline_ids.join(",")} using server-side wildcard search")
logger.trace? && logger.trace("querying for pipelines #{pipeline_ids.join(",")} using server-side wildcard search")

response = get_response(client,"#{SYSTEM_INDICES_API_PATH}?id=#{ERB::Util.url_encode(pipeline_ids.join(","))}")
intercept_error(response, pipeline_ids)
@pipelines = response
return @pipelines
rescue LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
raise unless e.response_code == 404
logger.warn("got 404 requesting pipelines from Elasticsearch using wildcard search; falling back to client-side filtering")
end
end

# client-side filtering
logger.trace("querying for pipelines #{pipeline_ids.join(",")} using client-side wildcard search")
response = get_response(client,"#{SYSTEM_INDICES_API_PATH}/")
intercept_error(response, pipeline_ids)
@pipelines = get_wildcard_pipelines(pipeline_ids, response)

rescue LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
raise ElasticsearchSource::RemoteConfigError, "Cannot load configuration for pipeline_id: #{pipeline_ids}, server returned `#{e}`"
end

def get_response(client, path)
retry_handler = ::LogStash::Helpers::LoggableTry.new(logger, 'fetch pipelines from Central Management')
response = retry_handler.try(10.times, ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError) {
path = es_supports_pipeline_wildcard_search ?
"#{SYSTEM_INDICES_API_PATH}?id=#{ERB::Util.url_encode(pipeline_ids.join(","))}" :
"#{SYSTEM_INDICES_API_PATH}/"
client.get(path)
}

response
end

def intercept_error(response, pipeline_ids)
if response["error"]
raise ElasticsearchSource::RemoteConfigError, "Cannot find find configuration for pipeline_id: #{pipeline_ids}, server returned status: `#{response["status"]}`, message: `#{response["error"]}`"
end

@pipelines = es_supports_pipeline_wildcard_search ?
response :
get_wildcard_pipelines(pipeline_ids, response)
end

def es_supports_pipeline_wildcard_search?(es_version)
Expand Down Expand Up @@ -271,11 +289,18 @@ class LegacyHiddenIndicesFetcher
PIPELINE_INDEX = ".logstash"

def fetch_config(es_version, pipeline_ids, client)
deprecation_logger.deprecated("Fetching pipeline configs from Elasticsearch #{es_version}; Central Management will soon require Elasticsearch 8.x+")
request_body_string = LogStash::Json.dump({ "docs" => pipeline_ids.collect { |pipeline_id| { "_id" => pipeline_id } } })
retry_handler = ::LogStash::Helpers::LoggableTry.new(logger, 'fetch pipelines from Central Management')
response = retry_handler.try(10.times, ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError) {
client.post("#{PIPELINE_INDEX}/_mget", {}, request_body_string)
}
response = retry_handler.try(10.times, ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError) do
begin
client.post("#{PIPELINE_INDEX}/_mget", {}, request_body_string)
rescue LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
raise e unless e.response_code == 404
@pipelines = {}
return @pipelines
end
end

if response["error"]
raise ElasticsearchSource::RemoteConfigError, "Cannot find find configuration for pipeline_id: #{pipeline_ids}, server returned status: `#{response["status"]}`, message: `#{response["error"]}`"
Expand Down
73 changes: 58 additions & 15 deletions x-pack/spec/config_management/elasticsearch_source_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
describe LogStash::ConfigManagement::ElasticsearchSource do
let(:system_indices_api) { LogStash::ConfigManagement::SystemIndicesFetcher::SYSTEM_INDICES_API_PATH }
let(:system_indices_url_regex) { Regexp.new("^#{system_indices_api}") }
let(:system_indices_url_search_regex) { Regexp.new("^#{Regexp.escape(system_indices_api)}[?]id=([^=&]+)$") }
let(:system_indices_url_all_regex) { Regexp.new("^#{Regexp.escape(system_indices_api)}/?$") }
let(:elasticsearch_url) { ["https://localhost:9898"] }
let(:elasticsearch_username) { "elastictest" }
let(:elasticsearch_password) { "testchangeme" }
Expand Down Expand Up @@ -91,6 +93,7 @@

let(:es_version_response) { es_version_8_response }
let(:es_version_8_response) { cluster_info("8.0.0-SNAPSHOT") }
let(:es_version_8_3_response) { cluster_info("8.3.0") }
let(:es_version_7_9_response) { cluster_info("7.9.1") }

let(:elasticsearch_7_9_err_response) {
Expand Down Expand Up @@ -230,7 +233,9 @@
let(:pipeline_id) { "super_generator" }
let(:elasticsearch_response) { {"#{pipeline_id}" => {"pipeline" => "#{config}"}} }
let(:all_pipelines) { JSON.parse(::File.read(::File.join(::File.dirname(__FILE__), "fixtures", "pipelines.json"))) }
let(:mock_logger) { double("fetcher's logger") }
let(:mock_logger) { double("fetcher's logger").as_null_object }

let(:bad_response_code_error) { LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError }

before(:each) {
allow(subject).to receive(:logger).and_return(mock_logger)
Expand All @@ -242,16 +247,36 @@
expect(subject.get_single_pipeline_setting(pipeline_id)).to eq({"pipeline" => "#{config}"})
end

it "#fetch_config from ES v8.3" do
expect(mock_client).to receive(:get).with("#{described_class::SYSTEM_INDICES_API_PATH}?id=#{pipeline_id}").and_return(elasticsearch_response.clone)
expect(subject.fetch_config(es_version_8_3, [pipeline_id], mock_client)).to eq(elasticsearch_response)
expect(subject.get_single_pipeline_setting(pipeline_id)).to eq({"pipeline" => "#{config}"})
end
{
'v8.3' => { major: 8, minor: 3},
'v9.0' => { major: 9, minor: 0}
}.each do |desc, es_version|
context "ES #{desc}" do
it "#fetch_config works" do
expect(mock_client).to receive(:get).with("#{described_class::SYSTEM_INDICES_API_PATH}?id=#{pipeline_id}").and_return(elasticsearch_response.clone)
expect(subject.fetch_config(es_version, [pipeline_id], mock_client)).to eq(elasticsearch_response)
expect(subject.get_single_pipeline_setting(pipeline_id)).to eq({"pipeline" => "#{config}"})
end

it "#fetch_config from ES v9.0" do
expect(mock_client).to receive(:get).with("#{described_class::SYSTEM_INDICES_API_PATH}?id=#{pipeline_id}").and_return(elasticsearch_response.clone)
expect(subject.fetch_config(es_version_9_0, [pipeline_id], mock_client)).to eq(elasticsearch_response)
expect(subject.get_single_pipeline_setting(pipeline_id)).to eq({"pipeline" => "#{config}"})
it "#fetch_config from ES v8.3 with 404->200 is empty list" do
wildcard_search_path = "#{described_class::SYSTEM_INDICES_API_PATH}?id=#{pipeline_id}"
expect(mock_client).to receive(:get).with(wildcard_search_path)
.and_raise(bad_response_code_error.new(404, wildcard_search_path, nil, '{}'))
expect(mock_client).to receive(:get).with("#{described_class::SYSTEM_INDICES_API_PATH}/").and_return({})
expect(subject.fetch_config(es_version, [pipeline_id], mock_client)).to eq({})
end

it "#fetch_config from ES v8.3 with 404->404 is error" do
# 404's on Wildcard search need to be confirmed with a 404 from the get-all endpoint
wildcard_search_path = "#{described_class::SYSTEM_INDICES_API_PATH}?id=#{pipeline_id}"
expect(mock_client).to receive(:get).with(wildcard_search_path)
.and_raise(bad_response_code_error.new(404, wildcard_search_path, nil, '{}'))
get_all_path = "#{described_class::SYSTEM_INDICES_API_PATH}/"
expect(mock_client).to receive(:get).with(get_all_path)
.and_raise(bad_response_code_error.new(404, get_all_path, nil, '{}'))
expect { subject.fetch_config(es_version, [pipeline_id], mock_client) }.to raise_error(LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError)
end
end
end

it "#fetch_config should raise error" do
Expand Down Expand Up @@ -359,7 +384,7 @@
expect { subject.fetch_config(empty_es_version, [pipeline_id, another_pipeline_id], mock_client) }.to raise_error(LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError)
end

it "#fetch_config should raise error when response is empty" do
it "#fetch_config should raise error when response is malformed" do
expect(mock_client).to receive(:post).with("#{described_class::PIPELINE_INDEX}/_mget", {}, "{\"docs\":[{\"_id\":\"#{pipeline_id}\"},{\"_id\":\"#{another_pipeline_id}\"}]}").and_return(LogStash::Json.load("{}"))
expect { subject.fetch_config(empty_es_version, [pipeline_id, another_pipeline_id], mock_client) }.to raise_error(LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError)
end
Expand Down Expand Up @@ -760,10 +785,28 @@
expect { subject.pipeline_configs }.to raise_error /Something bad/
end

it "returns empty pipeline when ES client raise BadResponseCodeError in [8]" do
allow(mock_client).to receive(:get).with("/").and_return(es_version_response)
expect(mock_client).to receive(:get).with(system_indices_url_regex).and_raise(bad_response_404)
expect(subject.pipeline_configs).to be_empty
context "8.0" do
it "returns empty pipeline when ES client raise BadResponseCodeError in [8]" do
allow(mock_client).to receive(:get).with("/").and_return(es_version_response)
expect(mock_client).to receive(:get).with(system_indices_url_regex).and_raise(bad_response_404)
expect { subject.pipeline_configs }.to raise_error /response code '404'/
end
end

context "8.3+" do
it "returns empty pipeline when ES search 404's but client-side 200's and includes no matching pipelines" do
expect(mock_client).to receive(:get).with("/").and_return(es_version_8_3_response)
expect(mock_client).to receive(:get).with(system_indices_url_search_regex).and_raise(bad_response_404)
expect(mock_client).to receive(:get).with(system_indices_url_all_regex).and_return({})
expect(subject.pipeline_configs).to be_empty
end

it "raises the 404 when ES search 404's and client-side query 404's too" do
expect(mock_client).to receive(:get).with("/").and_return(es_version_8_3_response)
expect(mock_client).to receive(:get).with(system_indices_url_search_regex).and_raise(bad_response_404)
expect(mock_client).to receive(:get).with(system_indices_url_all_regex).and_raise(bad_response_404)
expect { subject.pipeline_configs }.to raise_error /response code '404'/
end
end

it "returns empty pipeline when ES client raise BadResponseCodeError in [7.9]" do
Expand Down