Skip to content

Commit 9f2fbe5

Browse files
authored
Deps: allow any scheduler version with plugin (#132)
1 parent cd98d27 commit 9f2fbe5

File tree

6 files changed

+84
-74
lines changed

6 files changed

+84
-74
lines changed

.travis.yml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,7 @@
11
import:
2-
- logstash-plugins/.ci:travis/[email protected]
2+
- logstash-plugins/.ci:travis/[email protected]
3+
4+
env:
5+
jobs: # test with old scheduler version (3.0 was locked in LS 7.x)
6+
- ELASTIC_STACK_VERSION=7.x RUFUS_SCHEDULER_VERSION=3.0.9 LOG_LEVEL=info
7+
- ELASTIC_STACK_VERSION=6.x RUFUS_SCHEDULER_VERSION=3.0.9

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 5.2.1
2+
- Deps: unpin rufus-scheduler dependency [#130](https://github.com/logstash-plugins/logstash-input-http_poller/pull/130)
3+
14
## 5.2.0
25
- Feat: support ssl_verification_mode option [#131](https://github.com/logstash-plugins/logstash-input-http_poller/pull/131)
36

Gemfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ if Dir.exist?(logstash_path) && use_logstash_source
99
gem 'logstash-core', :path => "#{logstash_path}/logstash-core"
1010
gem 'logstash-core-plugin-api', :path => "#{logstash_path}/logstash-core-plugin-api"
1111
end
12+
13+
gem 'rufus-scheduler', ENV['RUFUS_SCHEDULER_VERSION'] if ENV['RUFUS_SCHEDULER_VERSION']

lib/logstash/inputs/http_poller.rb

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,12 @@ def register
5555
setup_requests!
5656
end
5757

58+
# @overload
5859
def stop
59-
Stud.stop!(@interval_thread) if @interval_thread
60-
@scheduler.stop if @scheduler
60+
if @scheduler
61+
@scheduler.shutdown # on newer Rufus (3.8) this joins on the scheduler thread
62+
end
63+
# TODO implement client.close as we as releasing it's pooled resources!
6164
end
6265

6366
private
@@ -163,16 +166,15 @@ def setup_schedule(queue)
163166
#schedule hash must contain exactly one of the allowed keys
164167
msg_invalid_schedule = "Invalid config. schedule hash must contain " +
165168
"exactly one of the following keys - cron, at, every or in"
166-
raise Logstash::ConfigurationError, msg_invalid_schedule if @schedule.keys.length !=1
169+
raise Logstash::ConfigurationError, msg_invalid_schedule if @schedule.keys.length != 1
167170
schedule_type = @schedule.keys.first
168171
schedule_value = @schedule[schedule_type]
169172
raise LogStash::ConfigurationError, msg_invalid_schedule unless Schedule_types.include?(schedule_type)
170173

171174
@scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
172-
#as of v3.0.9, :first_in => :now doesn't work. Use the following workaround instead
173175
opts = schedule_type == "every" ? { :first_in => 0.01 } : {}
174176
@scheduler.send(schedule_type, schedule_value, opts) { run_once(queue) }
175-
@scheduler.join
177+
@scheduler.thread.join # due newer rufus (3.8) doing a blocking operation on scheduler.join
176178
end
177179

178180
def run_once(queue)

logstash-input-http_poller.gemspec

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-input-http_poller'
3-
s.version = '5.2.0'
3+
s.version = '5.2.1'
44
s.licenses = ['Apache License (2.0)']
55
s.summary = "Decodes the output of an HTTP API into events"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
@@ -21,8 +21,7 @@ Gem::Specification.new do |s|
2121
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
2222
s.add_runtime_dependency 'logstash-codec-plain'
2323
s.add_runtime_dependency "logstash-mixin-http_client", ">= 7.1.0"
24-
s.add_runtime_dependency 'stud', "~> 0.0.22"
25-
s.add_runtime_dependency 'rufus-scheduler', "~>3.0.9"
24+
s.add_runtime_dependency 'rufus-scheduler', ">= 3.0.9"
2625
s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~>1.3'
2726
s.add_runtime_dependency 'logstash-mixin-event_support', '~> 1.0', '>= 1.0.1'
2827
s.add_runtime_dependency 'logstash-mixin-validator_support', '~> 1.0'

spec/inputs/http_poller_spec.rb

Lines changed: 64 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,19 @@
2525
"schedule" => default_schedule,
2626
"urls" => default_urls,
2727
"codec" => "json",
28-
"metadata_target" => metadata_target
28+
"metadata_target" => metadata_target,
29+
"pool_max" => 3, "pool_max_per_route" => 1, 'keepalive' => false
2930
}
3031
}
31-
let(:klass) { LogStash::Inputs::HTTP_Poller }
32+
let(:opts) { default_opts }
3233

34+
subject(:plugin) { described_class.new(opts) }
3335

3436
describe "instances" do
35-
subject { klass.new(default_opts) }
37+
subject { described_class.new(default_opts) }
3638

37-
before do
38-
subject.register
39-
end
39+
before { subject.register }
40+
after { subject.stop }
4041

4142
describe "#run" do
4243
it "should setup a scheduler" do
@@ -189,22 +190,21 @@
189190
"metadata_target" => metadata_target
190191
}
191192
}
192-
it "should run at the schedule" do
193-
instance = klass.new(opts)
194-
instance.register
193+
194+
before do
195195
Timecop.travel(Time.new(2000,1,1,0,0,0,'+00:00'))
196196
Timecop.scale(60)
197-
queue = Queue.new
198-
runner = Thread.new do
199-
instance.run(queue)
200-
end
201-
sleep 3
202-
instance.stop
203-
runner.kill
204-
runner.join
205-
expect(queue.size).to eq(2)
197+
end
198+
199+
after do
206200
Timecop.return
207201
end
202+
203+
it "should run at the schedule" do
204+
run_plugin_and_yield_queue(plugin, sleep: 3) do |queue|
205+
try(5) { expect(queue.size).to be >= 2 }
206+
end
207+
end
208208
end
209209

210210
context "given 'at' expression" do
@@ -216,22 +216,21 @@
216216
"metadata_target" => metadata_target
217217
}
218218
}
219-
it "should run at the schedule" do
220-
instance = klass.new(opts)
221-
instance.register
219+
220+
before do
222221
Timecop.travel(Time.new(2000,1,1,0,0,0,'+00:00'))
223-
Timecop.scale(60 * 5)
224-
queue = Queue.new
225-
runner = Thread.new do
226-
instance.run(queue)
227-
end
228-
sleep 2
229-
instance.stop
230-
runner.kill
231-
runner.join
232-
expect(queue.size).to eq(1)
222+
Timecop.scale (60 * 5) / 2
223+
end
224+
225+
after do
233226
Timecop.return
234227
end
228+
229+
it "should run at the schedule" do
230+
run_plugin_and_yield_queue(plugin, sleep: 2) do |queue|
231+
try(5) { expect(queue.size).to eq(1) }
232+
end
233+
end
235234
end
236235

237236
context "given 'every' expression" do
@@ -244,20 +243,12 @@
244243
}
245244
}
246245
it "should run at the schedule" do
247-
instance = klass.new(opts)
248-
instance.register
249-
queue = Queue.new
250-
runner = Thread.new do
251-
instance.run(queue)
246+
run_plugin_and_yield_queue(plugin, sleep: 5) do |queue|
247+
#T 0123456
248+
#events x x x x
249+
#expects 3 events at T=5
250+
try(5) { expect(queue.size).to be_between(2, 3) }
252251
end
253-
#T 0123456
254-
#events x x x x
255-
#expects 3 events at T=5
256-
sleep 5
257-
instance.stop
258-
runner.kill
259-
runner.join
260-
expect(queue.size).to be_between(2, 3)
261252
end
262253
end
263254

@@ -271,21 +262,28 @@
271262
}
272263
}
273264
it "should run at the schedule" do
274-
instance = klass.new(opts)
275-
instance.register
276-
queue = Queue.new
277-
runner = Thread.new do
278-
instance.run(queue)
265+
run_plugin_and_yield_queue(plugin, sleep: 2.5) do |queue|
266+
try(5) { expect(queue.size).to eq(1) }
279267
end
280-
sleep 3
281-
instance.stop
282-
runner.kill
283-
runner.join
284-
expect(queue.size).to eq(1)
285268
end
286269
end
287270
end
288271

272+
def run_plugin_and_yield_queue(plugin, sleep: nil)
273+
plugin.register
274+
queue = Queue.new
275+
begin
276+
runner = Thread.new do
277+
plugin.run(queue)
278+
end
279+
sleep(sleep) if sleep
280+
yield(queue)
281+
ensure
282+
plugin.stop
283+
runner.join if runner
284+
end
285+
end
286+
289287
describe "events", :ecs_compatibility_support, :aggregate_failures do
290288
ecs_compatibility_matrix(:disabled, :v1, :v8 => :v1) do |ecs_select|
291289
before do
@@ -339,6 +337,8 @@
339337
event # materialize the subject
340338
end
341339

340+
after { poller.stop }
341+
342342
it "should enqueue a message" do
343343
expect(event).to be_a(LogStash::Event)
344344
end
@@ -399,9 +399,6 @@
399399
let(:payload) { {"a" => 2, "hello" => ["a", "b", "c"]} }
400400
let(:response_body) { LogStash::Json.dump(payload) }
401401
let(:opts) { default_opts }
402-
let(:instance) {
403-
klass.new(opts)
404-
}
405402
let(:name) { default_name }
406403
let(:url) { default_url }
407404
let(:code) { 202 }
@@ -411,30 +408,30 @@
411408
}
412409

413410
before do
414-
instance.register
411+
plugin.register
415412
u = url.is_a?(Hash) ? url["url"] : url # handle both complex specs and simple string URLs
416-
instance.client.stub(u,
413+
plugin.client.stub(u,
417414
:body => response_body,
418415
:code => code
419416
)
420-
allow(instance).to receive(:decorate)
421-
instance.send(:run_once, queue)
417+
allow(plugin).to receive(:decorate)
418+
plugin.send(:run_once, queue)
422419
end
423420

424421
it "should have a matching message" do
425422
expect(event.to_hash).to include(payload)
426423
end
427424

428425
it "should decorate the event" do
429-
expect(instance).to have_received(:decorate).once
426+
expect(plugin).to have_received(:decorate).once
430427
end
431428

432429
include_examples("matching metadata")
433430

434431
context "with an empty body" do
435432
let(:response_body) { "" }
436433
it "should return an empty event" do
437-
instance.send(:run_once, queue)
434+
plugin.send(:run_once, queue)
438435
headers_field = ecs_select[disabled: "[#{metadata_target}][response_headers]",
439436
v1: "[#{metadata_target}][input][http_poller][response][headers]"]
440437
expect(event.get("#{headers_field}[content-length]")).to eql("0")
@@ -449,7 +446,7 @@
449446
}
450447

451448
it "should not have any metadata on the event" do
452-
instance.send(:run_once, queue)
449+
plugin.send(:run_once, queue)
453450
expect(event.get(metadata_target)).to be_nil
454451
end
455452
end
@@ -555,6 +552,8 @@
555552

556553
describe "stopping" do
557554
let(:config) { default_opts }
558-
it_behaves_like "an interruptible input plugin"
555+
it_behaves_like "an interruptible input plugin" do
556+
let(:allowed_lag) { 10 } # CI: wait till scheduler shuts down
557+
end
559558
end
560559
end

0 commit comments

Comments
 (0)