From 503054c6decb4672d52a0206b449d91685d42916 Mon Sep 17 00:00:00 2001
From: Jordan Ritter
Date: Mon, 19 Jul 2021 13:44:03 -0700
Subject: [PATCH] Add max size limit to requests for bulk import

This commit adds a new parameter, `max_size` (in bytes), which enforces
an upper limit on the overall HTTP POST size of each bulk request. This
is useful when trying to maximize bulk import speed by reducing
roundtrips to retrieve and send data, but there is no control over
Elasticsearch's maximum HTTP request payload size. For example, AWS's
Elasticsearch offering has either a 10 MiB or 100 MiB HTTP request
payload size limit.

`batch_size` is good for bounding local runtime memory usage, but when
indexing large sets of big objects it is entirely possible to hit a
service provider's underlying request size limit and fail the import
mid-run. This is even worse when `force` is true: the index is then
left in an incomplete state, with no obvious value to lower
`batch_size` to in order to get under the limit.

`max_size` defaults to `10_000_000`, to cover the worst-case scenario
on AWS.
---
 .../lib/elasticsearch/model/importing.rb | 54 ++++++++++++++-----
 1 file changed, 41 insertions(+), 13 deletions(-)

diff --git a/elasticsearch-model/lib/elasticsearch/model/importing.rb b/elasticsearch-model/lib/elasticsearch/model/importing.rb
index 927dd16ee..5e5255379 100644
--- a/elasticsearch-model/lib/elasticsearch/model/importing.rb
+++ b/elasticsearch-model/lib/elasticsearch/model/importing.rb
@@ -145,6 +145,7 @@ def import(options={}, &block)
           transform = options.delete(:transform) || __transform
           pipeline = options.delete(:pipeline)
           return_value = options.delete(:return) || 'count'
+          max_size = options.delete(:max_size) || 10_000_000
 
           unless transform.respond_to?(:call)
             raise ArgumentError,
@@ -159,19 +160,46 @@ def import(options={}, &block)
           end
 
           __find_in_batches(options) do |batch|
-            params = {
-              index: target_index,
-              type: target_type,
-              body: __batch_to_bulk(batch, transform)
-            }
-
-            params[:pipeline] = pipeline if pipeline
-
-            response = client.bulk params
-
-            yield response if block_given?
-
-            errors += response['items'].select { |k, v| k.values.first['error'] }
+            batch = __batch_to_bulk(batch, transform)
+
+            until batch.empty?
+              todo = []
+              size = 0
+
+              # Accumulate until we hit max size
+              until size > max_size or batch.empty?
+                todo.push batch.shift
+                size += todo.last.to_s.size
+              end
+
+              # Put back last one if we went over
+              if size > max_size
+                batch.push todo.pop
+                size -= batch.last.to_s.size
+              end
+
+              # If we got here with nothing to do, we put our only todo back
+              # because it was too big - error.
+              if todo.empty?
+                item = batch.last
+                raise RuntimeError,
+                  "#{target} #{item[:index][:_id]} size #{item.to_s.size} is larger than max_size #{max_size}"
+              end
+
+              params = {
+                index: target_index,
+                type: target_type,
+                body: todo
+              }
+
+              params[:pipeline] = pipeline if pipeline
+
+              response = client.bulk params
+
+              yield response if block_given?
+
+              errors += response['items'].select { |k, v| k.values.first['error'] }
+            end
           end
 
           self.refresh_index! index: target_index if refresh
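
Usage sketch (illustrative only; `Article` is a hypothetical model that
includes `Elasticsearch::Model`, and `9_000_000` is just an example cap):

    # Import in batches of 500 records, keeping each bulk request body
    # under roughly 9 MB so it fits below AWS's 10 MiB payload limit.
    # As in the existing import, the block receives each bulk response.
    Article.import batch_size: 500, max_size: 9_000_000 do |response|
      puts "indexed #{response['items'].size} documents"
    end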