Skip to content

Commit 989f9e7

Browse files
authored
Deps: un-pin (and avoid) rufus-scheduler (#14260)
+ Refactor: specific require + scope java_import + Refactor: redundant requires + Refactor: avoid rufus - hook up a timer task
1 parent ce27e08 commit 989f9e7

File tree

3 files changed

+45
-21
lines changed

3 files changed

+45
-21
lines changed

logstash-core/logstash-core.gemspec

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,13 @@ Gem::Specification.new do |gem|
7777
gem.add_development_dependency 'logstash-filter-geoip', '>= 7.2.1' # breaking change of DatabaseManager
7878
gem.add_dependency 'down', '~> 5.2.0' #(MIT license)
7979
gem.add_dependency 'tzinfo-data' #(MIT license)
80-
# TEMPORARY: Modern Rufus Scheduler 3.x subtly breaks thread joining, which
81-
# is done in several plugins to handle shutdowns.
82-
# Pin pending migration to shared Scheduler Mixin that can mitigate this issue.
83-
# https://github.com/logstash-plugins/logstash-mixin-scheduler/pull/1
84-
gem.add_runtime_dependency 'rufus-scheduler', '~> 3.0.9' #(MIT license)
80+
81+
# NOTE: plugins now avoid using **rufus-scheduler** directly, if logstash-core would find itself in a need
82+
# to use rufus than preferably the **logstash-mixin-scheduler** should be changed to work with non-plugins.
83+
#
84+
# Using the scheduler directly might lead to issues e.g. when join-ing, see:
85+
# https://github.com/logstash-plugins/logstash-mixin-scheduler/blob/v1.0.1/lib/logstash/plugin_mixins/scheduler/rufus_impl.rb#L85=
86+
# and https://github.com/elastic/logstash/issues/13773
8587

8688
# TEMPORARY: racc-1.6.0 doesn't have JAVA counterpart (yet)
8789
# SEE: https://github.com/ruby/racc/issues/172

x-pack/lib/filters/geoip/database_manager.rb

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,12 @@
77
require_relative "database_metadata"
88
require_relative "download_manager"
99
require_relative "database_metric"
10-
require "faraday"
1110
require "json"
12-
require "zlib"
1311
require "stud/try"
14-
require "down"
15-
require "rufus/scheduler"
1612
require "singleton"
17-
require "concurrent"
13+
require "concurrent/array"
14+
require "concurrent/timer_task"
1815
require "thread"
19-
java_import org.apache.logging.log4j.ThreadContext
2016

2117
# The mission of DatabaseManager is to ensure the plugin running an up-to-date MaxMind database and
2218
# thus users are compliant with EULA.
@@ -35,10 +31,13 @@ module LogStash module Filters module Geoip class DatabaseManager
3531
include LogStash::Filters::Geoip::Util
3632
include Singleton
3733

34+
java_import org.apache.logging.log4j.ThreadContext
35+
3836
private
3937
def initialize
4038
@triggered = false
4139
@trigger_lock = Mutex.new
40+
@download_interval = 24 * 60 * 60 # 24h
4241
end
4342

4443
def setup
@@ -205,21 +204,25 @@ def trigger_download
205204
return if @triggered
206205
setup
207206
execute_download_job
208-
# check database update periodically. trigger `call` method
209-
@scheduler = Rufus::Scheduler.new({:max_work_threads => 1})
210-
@scheduler.every('24h', self)
207+
# check database update periodically:
208+
209+
@download_task = Concurrent::TimerTask.execute(execution_interval: @download_interval) do
210+
LogStash::Util.set_thread_name 'geoip database download task'
211+
database_update_check # every 24h
212+
end
211213
@triggered = true
212214
end
213215
end
214216

215217
public
216218

217-
# scheduler callback
218-
def call(job, time)
219-
logger.debug "scheduler runs database update check"
219+
# @note this method is expected to execute on a separate thread
220+
def database_update_check
221+
logger.debug "running database update check"
220222
ThreadContext.put("pipeline.id", nil)
221223
execute_download_job
222224
end
225+
private :database_update_check
223226

224227
def subscribe_database_path(database_type, database_path, geoip_plugin)
225228
if database_path.nil?

x-pack/spec/filters/geoip/database_manager_spec.rb

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
let(:mock_geoip_plugin) { double("geoip_plugin") }
1313
let(:mock_metadata) { double("database_metadata") }
1414
let(:mock_download_manager) { double("download_manager") }
15-
let(:mock_scheduler) { double("scheduler") }
1615
let(:agent_metric) { LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new) }
1716
let(:database_metric) { LogStash::Filters::Geoip::DatabaseMetric.new(agent_metric) }
1817
let(:db_manager) do
@@ -21,7 +20,6 @@
2120
manager.send(:setup)
2221
manager.instance_variable_set(:@metadata, mock_metadata)
2322
manager.instance_variable_set(:@download_manager, mock_download_manager)
24-
manager.instance_variable_set(:@scheduler, mock_scheduler)
2523
manager
2624
end
2725
let(:logger) { double("Logger") }
@@ -215,6 +213,27 @@ def expect_download_metric_fail(c)
215213
end
216214
end
217215

216+
context "periodic database update" do
217+
218+
it 'sets up periodic task when download triggered' do
219+
db_manager.send :trigger_download
220+
download_task = db_manager.instance_variable_get(:@download_task)
221+
expect( download_task ).to_not be nil
222+
expect( download_task.running? ).to be true
223+
expect( download_task.execution_interval ).to eq 86_400
224+
end
225+
226+
it 'executes download job after interval passes' do
227+
db_manager.instance_variable_set(:@download_interval, 1.5)
228+
db_manager.send :trigger_download
229+
download_task = db_manager.instance_variable_get(:@download_task)
230+
expect( download_task.running? ).to be true
231+
expect( db_manager ).to receive :execute_download_job
232+
sleep 2.0 # wait for task execution
233+
end
234+
235+
end
236+
218237
context "check age" do
219238
context "eula database" do
220239
let(:db_manager) do
@@ -362,7 +381,7 @@ def expect_healthy_database_metric(c)
362381
end
363382

364383
context "shutdown" do
365-
let(:db_manager) { manager = Class.new(LogStash::Filters::Geoip::DatabaseManager).instance }
384+
let(:db_manager) { Class.new(LogStash::Filters::Geoip::DatabaseManager).instance }
366385

367386
it "should unsubscribe gracefully" do
368387
db_manager.subscribe_database_path(CITY, default_city_db_path, mock_geoip_plugin)
@@ -371,7 +390,7 @@ def expect_healthy_database_metric(c)
371390
end
372391

373392
context "database metric is not assigned" do
374-
let(:db_manager) { manager = Class.new(LogStash::Filters::Geoip::DatabaseManager).instance }
393+
let(:db_manager) { Class.new(LogStash::Filters::Geoip::DatabaseManager).instance }
375394

376395
it "does not throw error" do
377396
allow(LogStash::Filters::Geoip::DatabaseManager).to receive(:logger).and_return(logger)

0 commit comments

Comments
 (0)