Add ability to customize bulk batch size #941

Open · wants to merge 1 commit into main
124 changes: 67 additions & 57 deletions docs/index.asciidoc

Large diffs are not rendered by default.

23 changes: 19 additions & 4 deletions lib/logstash/outputs/elasticsearch.rb
@@ -75,12 +75,12 @@
#
# ==== HTTP Compression
#
# This plugin supports request and response compression. Response compression is enabled by default; for
# Elasticsearch versions 5.0 and later, the user doesn't have to set any configs in Elasticsearch for
# it to send back compressed responses. For versions before 5.0, `http.compression` must be set to `true` in
# https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-http.html#modules-http[Elasticsearch] to take advantage of response compression when using this plugin
#
# For request compression, regardless of the Elasticsearch version, users have to enable the `http_compression`
# setting in their Logstash config file.
#
class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
@@ -103,6 +103,8 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base

config_name "elasticsearch"

DEFAULT_BATCH_SIZE = 20 * 1024 * 1024 # 20MiB

# The Elasticsearch action to perform. Valid actions are:
#
# - index: indexes a document (an event from Logstash).
@@ -242,6 +244,19 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
# Custom Headers to send on each request to elasticsearch nodes
config :custom_headers, :validate => :hash, :default => {}

# Bulk batch size is used to determine at what point to send the bulk requests.
# The criteria used for the default value are:
# 1. We need a number that's less than 100MiB because ES
# won't accept bulks larger than that.
# 2. It must be large enough to amortize the connection constant
# across multiple requests.
# 3. It must be small enough that even if multiple threads hit this size
# we won't use a lot of heap.
#
# We wound up agreeing that a number greater than 10MiB and less than 100MiB
# made sense. We picked one on the lowish side to not use too much heap.
config :bulk_batch_size, :validate => :number, :default => DEFAULT_BATCH_SIZE

# @override to handle proxy => '' as if none was set
def config_init(params)
proxy = params['proxy']
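
For context, a minimal sketch of how the new option might be set when constructing the output, mirroring the integration specs later in this diff (the host and index values are placeholders):

  require "logstash/outputs/elasticsearch"

  # Hypothetical usage: lower the bulk flush threshold from the 20MiB default to 5MiB.
  output = LogStash::Outputs::ElasticSearch.new(
    "hosts" => ["127.0.0.1:9200"],         # placeholder host
    "index" => "logstash-example",         # placeholder index
    "bulk_batch_size" => 5 * 1024 * 1024   # in bytes, like DEFAULT_BATCH_SIZE
  )
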
7 changes: 3 additions & 4 deletions lib/logstash/outputs/elasticsearch/common_configs.rb
@@ -19,8 +19,8 @@ def self.included(mod)
# Joda formats are defined http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html[here].
mod.config :index, :validate => :string, :default => DEFAULT_INDEX_NAME

mod.config :document_type,
:validate => :string,
:deprecated => "Document types are being deprecated in Elasticsearch 6.0, and removed entirely in 7.0. You should avoid this feature"

# From Logstash 1.3 onwards, a template is applied to Elasticsearch during
@@ -69,7 +69,7 @@ def self.included(mod)
# The version to use for indexing. Use sprintf syntax like `%{my_version}` to use a field value here.
# See https://www.elastic.co/blog/elasticsearch-versioning-support.
mod.config :version, :validate => :string

# The version_type to use for indexing.
# See https://www.elastic.co/blog/elasticsearch-versioning-support.
# See also https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#_version_types
@@ -145,7 +145,6 @@ def self.included(mod)
# here like `pipeline => "%{INGEST_PIPELINE}"`
mod.config :pipeline, :validate => :string, :default => nil


# -----
# ILM configurations (beta)
# -----
30 changes: 8 additions & 22 deletions lib/logstash/outputs/elasticsearch/http_client.rb
@@ -8,21 +8,6 @@
require 'stringio'

module LogStash; module Outputs; class ElasticSearch;

class HttpClient
attr_reader :client, :options, :logger, :pool, :action_count, :recv_count
# This is here in case we use DEFAULT_OPTIONS in the future
@@ -52,6 +37,7 @@ class HttpClient
def initialize(options={})
@logger = options[:logger]
@metric = options[:metric]
@bulk_batch_size = options[:bulk_batch_size]
@bulk_request_metrics = @metric.namespace(:bulk_requests)
@bulk_response_metrics = @bulk_request_metrics.namespace(:responses)

@@ -110,7 +96,7 @@ def bulk(actions)
if http_compression
body_stream.set_encoding "BINARY"
stream_writer = Zlib::GzipWriter.new(body_stream, Zlib::DEFAULT_COMPRESSION, Zlib::DEFAULT_STRATEGY)
else
stream_writer = body_stream
end
bulk_responses = []
@@ -119,7 +105,7 @@ def bulk(actions)
action.map {|line| LogStash::Json.dump(line)}.join("\n") :
LogStash::Json.dump(action)
as_json << "\n"
if (body_stream.size + as_json.bytesize) > @bulk_batch_size
bulk_responses << bulk_send(body_stream) unless body_stream.size == 0
end
stream_writer.write(as_json)
@@ -215,7 +201,7 @@ def scheme
else
nil
end

calculated_scheme = calculate_property(uris, :scheme, explicit_scheme, sniffing)

if calculated_scheme && calculated_scheme !~ /https?/
@@ -235,7 +221,7 @@ def port
# Enter things like foo:123, bar and wind up with foo:123, bar:9200
calculate_property(uris, :port, nil, sniffing) || 9200
end

def uris
@options[:hosts]
end
@@ -254,7 +240,7 @@ def http_compression

def build_adapter(options)
timeout = options[:timeout] || 0

adapter_options = {
:socket_timeout => timeout,
:request_timeout => timeout,
@@ -281,7 +267,7 @@ def build_adapter(options)
adapter_class = ::LogStash::Outputs::ElasticSearch::HttpClient::ManticoreAdapter
adapter = adapter_class.new(@logger, adapter_options)
end

def build_pool(options)
adapter = build_adapter(options)

@@ -331,7 +317,7 @@ def host_to_url(h)
h.query
end
prefixed_raw_query = raw_query && !raw_query.empty? ? "?#{raw_query}" : nil

raw_url = "#{raw_scheme}://#{postfixed_userinfo}#{raw_host}:#{raw_port}#{prefixed_raw_path}#{prefixed_raw_query}"

::LogStash::Util::SafeURI.new(raw_url)
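
To make the flush condition above concrete, here is a simplified, self-contained sketch of the accumulate-and-flush loop in HttpClient#bulk. Compression, metrics, and error handling are elided, stdlib JSON stands in for LogStash::Json, and BulkBatcherSketch is a hypothetical name:

  require 'stringio'
  require 'json'

  # Sketch: serialized actions accumulate in a stream, and a batch is flushed
  # as soon as appending the next action would push the stream past the
  # configured bulk_batch_size.
  class BulkBatcherSketch
    def initialize(bulk_batch_size, &sender)
      @bulk_batch_size = bulk_batch_size # e.g. 20 * 1024 * 1024 by default
      @sender = sender                   # stands in for bulk_send
    end

    def bulk(actions)
      body_stream = StringIO.new
      actions.each do |action|
        as_json = JSON.dump(action) << "\n"
        # Flush first if this action would overflow the current batch. The
        # check runs before the write, so a single oversized action still goes
        # out as one over-limit request rather than being split.
        if (body_stream.size + as_json.bytesize) > @bulk_batch_size
          unless body_stream.size == 0
            @sender.call(body_stream.string)
            body_stream = StringIO.new
          end
        end
        body_stream.write(as_json)
      end
      @sender.call(body_stream.string) unless body_stream.size == 0
    end
  end

  # Usage: with a 40-byte limit, three 19-byte actions go out as two requests.
  batcher = BulkBatcherSketch.new(40) { |payload| puts "send #{payload.bytesize}B" }
  batcher.bulk([{ "m" => "aaaaaaaaaa" }, { "m" => "bbbbbbbbbb" }, { "m" => "cccccccccc" }])
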
11 changes: 6 additions & 5 deletions lib/logstash/outputs/elasticsearch/http_client_builder.rb
@@ -11,13 +11,14 @@ def self.build(logger, hosts, params)
:http_compression => params["http_compression"],
:headers => params["custom_headers"] || {}
}

client_settings[:proxy] = params["proxy"] if params["proxy"]

common_options = {
:client_settings => client_settings,
:metric => params["metric"],
:resurrect_delay => params["resurrect_delay"],
:bulk_batch_size => params["bulk_batch_size"]
}

if params["sniffing"]
@@ -65,7 +66,7 @@ def self.build(logger, hosts, params)
LogStash::ConfigurationError,
"External versioning requires the presence of a version number."
) if external_version_types.include?(params.fetch('version_type', '')) and params.fetch("version", nil) == nil


# Create API setup
raise(
@@ -144,7 +145,7 @@ def self.setup_ssl(logger, params)

def self.setup_basic_auth(logger, params)
user, password = params["user"], params["password"]

return {} unless user && password && password.value

{
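
The builder change above simply threads the new setting through to the HTTP client. A hypothetical walk-through of that flow, with names taken from the diff and a placeholder value:

  # How `bulk_batch_size` travels from plugin params to the client.
  params = { "bulk_batch_size" => 5 * 1024 * 1024 }  # value is a placeholder

  common_options = {
    :resurrect_delay => params["resurrect_delay"],
    :bulk_batch_size => params["bulk_batch_size"]    # added by this PR
  }

  # HttpClient#initialize then picks it up:
  #   @bulk_batch_size = options[:bulk_batch_size]
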
63 changes: 48 additions & 15 deletions spec/integration/outputs/index_spec.rb
@@ -1,8 +1,8 @@
require_relative "../../../spec/es_spec_helper"
require "logstash/outputs/elasticsearch"

describe "TARGET_BULK_BYTES", :integration => true do
let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
describe "BATCH_BULK_SIZE", :integration => true do
let(:batch_bulk_size) { LogStash::Outputs::ElasticSearch::DEFAULT_BATCH_SIZE }
let(:event_count) { 1000 }
let(:events) { event_count.times.map { event }.to_a }
let(:config) {
@@ -23,11 +23,11 @@
end

describe "batches that are too large for one" do
let(:event) { LogStash::Event.new("message" => "a " * (((batch_bulk_size/2) / event_count)+1)) }

it "should send in two batches" do
expect(subject.client).to have_received(:bulk_send).twice do |payload|
expect(payload.size).to be <= batch_bulk_size
end
end

@@ -38,7 +38,40 @@

it "should send in one batch" do
expect(subject.client).to have_received(:bulk_send).once do |payload|
expect(payload.size).to be <= batch_bulk_size
end
end
end
end

describe "custom bulk size set" do
let(:batch_bulk_size) { 5 * 1024 * 1024 }
let(:config) {
{
"hosts" => get_host_port,
"index" => index,
"bulk_batch_size" => batch_bulk_size
}
}

describe "batches that are too large for one" do
let(:event) { LogStash::Event.new("message" => "a " * (((batch_bulk_size/2) / event_count)+1)) }

it "should send in two batches" do
expect(subject.client).to have_received(:bulk_send).twice do |payload|
expect(payload.size).to be <= batch_bulk_size
end
end

describe "batches that fit in one" do
# Normally you'd want to generate a request that's just 1 byte below the limit, but it's
# impossible to know in advance how many bytes an event will serialize to once bulk protocol overhead is added
let(:event) { LogStash::Event.new("message" => "a") }

it "should send in one batch" do
expect(subject.client).to have_received(:bulk_send).once do |payload|
expect(payload.size).to be <= batch_bulk_size
end
end
end
end
@@ -53,7 +86,7 @@
let(:config) { "not implemented" }
let(:events) { event_count.times.map { event }.to_a }
subject { LogStash::Outputs::ElasticSearch.new(config) }

let(:es_url) { "http://#{get_host_port}" }
let(:index_url) {"#{es_url}/#{index}"}
let(:http_client_options) { {} }
@@ -65,7 +98,7 @@
subject.register
subject.multi_receive([])
end

shared_examples "an indexer" do |secure|
it "ships events" do
subject.multi_receive(events)
@@ -85,13 +118,13 @@
expect(doc["_index"]).to eq(index)
end
end

it "sets the correct content-type header" do
expected_manticore_opts = {:headers => {"Content-Type" => "application/json"}, :body => anything}
if secure
expected_manticore_opts = {
:headers => {"Content-Type" => "application/json"},
:body => anything,
:headers => {"Content-Type" => "application/json"},
:body => anything,
:auth => {
:user => user,
:password => password,
@@ -146,22 +179,22 @@
:auth => {
:user => user,
:password => password
},
:ssl => {
:enabled => true,
:ca_file => cacert
}
}
end
it_behaves_like("an indexer", true)

describe "with a password requiring escaping" do
let(:user) { "f@ncyuser" }
let(:password) { "ab%12#" }

include_examples("an indexer", true)
end

describe "with a user/password requiring escaping in the URL" do
let(:config) do
{
@@ -171,7 +204,7 @@
"index" => index
}
end

include_examples("an indexer", true)
end
end
44 changes: 30 additions & 14 deletions spec/unit/outputs/elasticsearch/http_client_spec.rb
@@ -8,6 +8,7 @@
opts = {
:hosts => [::LogStash::Util::SafeURI.new("127.0.0.1")],
:logger => Cabin::Channel.get,
:bulk_batch_size => LogStash::Outputs::ElasticSearch::DEFAULT_BATCH_SIZE,
:metric => ::LogStash::Instrument::NullMetric.new(:dummy).namespace(:alsodummy)
}

@@ -30,7 +31,7 @@
let(:http_hostname_port) { ::LogStash::Util::SafeURI.new("http://#{hostname_port}") }
let(:https_hostname_port) { ::LogStash::Util::SafeURI.new("https://#{hostname_port}") }
let(:http_hostname_port_path) { ::LogStash::Util::SafeURI.new("http://#{hostname_port}/path") }

shared_examples("proper host handling") do
it "should properly transform a host:port string to a URL" do
expect(subject.host_to_url(hostname_port_uri).to_s).to eq(http_hostname_port.to_s + "/")
@@ -59,7 +60,7 @@
context "when SSL is false" do
let(:ssl) { false }
let(:base_options) { super.merge(:hosts => [https_hostname_port]) }

it "should refuse to handle an https url" do
expect {
subject.host_to_url(https_hostname_port)
@@ -73,13 +74,13 @@
subject
expect(subject.host_to_url(https_hostname_port).to_s).to eq(https_hostname_port.to_s + "/")
end
end
end
end

describe "path" do
let(:url) { http_hostname_port_path }
let(:base_options) { super.merge(:hosts => [url]) }

it "should allow paths in a url" do
expect(subject.host_to_url(url)).to eq(url)
end
@@ -93,12 +94,12 @@
}.to raise_error(LogStash::ConfigurationError)
end
end

context "with a path missing a leading /" do
let(:url) { http_hostname_port }
let(:base_options) { super.merge(:client_settings => {:path => "otherpath"}) }


it "should automatically insert a / in front of path overlays" do
expected = url.clone
expected.path = url.path + "/otherpath"
@@ -190,14 +191,29 @@
["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message}],
]}

context "if a message is over TARGET_BULK_BYTES" do
let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
let(:message) { "a" * (target_bulk_bytes + 1) }
context "if a message is over DEFAULT_BATCH_SIZE" do
let(:default_batch_size) { LogStash::Outputs::ElasticSearch::DEFAULT_BATCH_SIZE }
let(:message) { "a" * (default_batch_size + 1) }

it "should be handled properly" do
allow(subject).to receive(:join_bulk_responses)
expect(subject).to receive(:bulk_send).once do |data|
expect(data.size).to be > default_batch_size
end
s = subject.send(:bulk, actions)
end
end

context "if a message is over customized BATCH_SIZE" do
let(:batch_size) { 5 * 1024 * 1024 }
let(:base_options) { super.merge(:bulk_batch_size => batch_size) }

let(:message) { "a" * (batch_size + 1) }

it "should be handled properly" do
allow(subject).to receive(:join_bulk_responses)
expect(subject).to receive(:bulk_send).once do |data|
expect(data.size).to be > batch_size
end
s = subject.send(:bulk, actions)
end
@@ -216,9 +232,9 @@
s = subject.send(:bulk, actions)
end

context "if one exceeds TARGET_BULK_BYTES" do
let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
let(:message1) { "a" * (target_bulk_bytes + 1) }
context "if one exceeds BULK_BATCH_SIZE" do
let(:default_batch_size) { LogStash::Outputs::ElasticSearch::DEFAULT_BATCH_SIZE }
let(:message1) { "a" * (default_batch_size + 1) }
it "executes two bulk_send operations" do
allow(subject).to receive(:join_bulk_responses)
expect(subject).to receive(:bulk_send).twice
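
As a back-of-the-envelope check on the "batches that are too large for one" sizing used in the integration spec above (a sketch; the numbers follow from the spec's own expressions):

  # Why that spec expects exactly two bulk_send calls at the default size:
  batch  = 20 * 1024 * 1024                     # DEFAULT_BATCH_SIZE = 20_971_520
  events = 1000
  # message is "a " repeated ((batch/2)/events)+1 times, i.e. 2 bytes per repeat:
  per_event = 2 * (((batch / 2) / events) + 1)  # => 20_972 bytes of message
  total = per_event * events                    # => 20_972_000, just over batch
  # Message bodies alone already exceed the 20MiB threshold (before JSON and
  # bulk-action overhead), so the stream overflows once mid-run and bulk_send
  # fires twice, each payload staying at or under the configured batch size.
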