Skip to content

Commit 6d594e4

Browse files
author
bhois
committed
Introduce statements_directory parameter
1 parent 2c2767e commit 6d594e4

File tree

2 files changed

+91
-13
lines changed

2 files changed

+91
-13
lines changed

lib/logstash/inputs/jdbc.rb

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,13 @@
8686
#
8787
# ==== Configuring multiple SQL statements
8888
#
89-
# Configuring multiple SQL statements is useful when there is a need to query and ingest data
90-
# from different database tables or views. It is possible to define separate Logstash
91-
# configuration files for each statement or to define multiple statements in a single configuration
92-
# file. When using multiple statements in a single Logstash configuration file, each statement
93-
# has to be defined as a separate jdbc input (including jdbc driver, connection string and other
94-
# required parameters).
89+
# Configuring multiple SQL statements is useful when there is a need to query and ingest data
90+
# from different database tables or views. It is possible to define separate Logstash
91+
# configuration files for each statement, to define multiple statements in a single configuration
92+
# file or to use the `statements_directory` parameter where all statements within the configured
93+
# directory get executed. When using multiple statements in a single Logstash configuration file,
94+
# each statement has to be defined as a separate jdbc input (including jdbc driver, connection
95+
# string and other required parameters).
9596
#
9697
# Please note that if any of the statements use the `sql_last_value` parameter (e.g. for
9798
# ingesting only data changed since last run), each input should define its own
@@ -147,6 +148,9 @@ class LogStash::Inputs::Jdbc < LogStash::Inputs::Base
147148
# Path of file containing statement to execute
148149
config :statement_filepath, :validate => :path
149150

151+
# Directory containing statement files to execute
152+
config :statements_directory, :validate => :path
153+
150154
# Hash of query parameter, for example `{ "target_id" => "321" }`
151155
config :parameters, :validate => :hash, :default => {}
152156

@@ -205,11 +209,15 @@ def register
205209
require "rufus/scheduler"
206210
prepare_jdbc_connection
207211

208-
# Raise an error if @use_column_value is true, but no @tracking_column is set
209212
if @use_column_value
213+
# Raise an error if @use_column_value is true, but no @tracking_column is set
210214
if @tracking_column.nil?
211215
raise(LogStash::ConfigurationError, "Must set :tracking_column if :use_column_value is true.")
212216
end
217+
# Raise an error if @use_column_value is true, and @statements_directory is set
218+
if @statements_directory
219+
raise(LogStash::ConfigurationError, ":statements_directory must not be set if :use_column_value is true.")
220+
end
213221
end
214222

215223
@enable_encoding = !@charset.nil? || !@columns_charset.empty?
@@ -221,12 +229,24 @@ def register
221229
@sql_last_value = YAML.load(File.read(@last_run_metadata_path))
222230
end
223231

224-
unless @statement.nil? ^ @statement_filepath.nil?
232+
if @statement && @statement_filepath
225233
raise(LogStash::ConfigurationError, "Must set either :statement or :statement_filepath. Only one may be set at a time.")
226234
end
227235

228236
@statement = File.read(@statement_filepath) if @statement_filepath
229237

238+
unless @statement.nil? ^ @statements_directory.nil?
239+
raise(LogStash::ConfigurationError, "Must set either :statement, :statement_filepath or :statements_directory. Only one may be set at a time")
240+
end
241+
242+
@statements = []
243+
if @statements_directory
244+
Dir.foreach(@statements_directory) do |file|
245+
next if File.directory? file
246+
@statements.push(File.read(@statements_directory + '/' + file))
247+
end
248+
end
249+
230250
if (@jdbc_password_filepath and @jdbc_password)
231251
raise(LogStash::ConfigurationError, "Only one of :jdbc_password, :jdbc_password_filepath may be set at a time.")
232252
end
@@ -246,13 +266,25 @@ def run(queue)
246266
if @schedule
247267
@scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
248268
@scheduler.cron @schedule do
249-
execute_query(queue)
269+
if @statements.any?
270+
for statement in @statements
271+
execute_query(queue, statement)
272+
end
273+
else
274+
execute_query(queue, @statement)
275+
end
250276
update_state_file
251277
end
252278

253279
@scheduler.join
254280
else
255-
execute_query(queue)
281+
if @statements.any?
282+
for statement in @statements
283+
execute_query(queue, statement)
284+
end
285+
else
286+
execute_query(queue, @statement)
287+
end
256288
update_state_file
257289
end
258290
end # def run
@@ -265,10 +297,10 @@ def stop
265297

266298
private
267299

268-
def execute_query(queue)
300+
def execute_query(queue, statement)
269301
# update default parameters
270302
@parameters['sql_last_value'] = @sql_last_value
271-
execute_statement(@statement, @parameters) do |row|
303+
execute_statement(statement, @parameters) do |row|
272304
if enable_encoding?
273305
## do the necessary conversions to string elements
274306
row = Hash[row.map { |k, v| [k.to_s, convert(k, v)] }]

spec/inputs/jdbc_spec.rb

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@
120120
end
121121

122122

123-
context "when neither statement and statement_filepath arguments are passed" do
123+
context "when neither statement, statement_filepath or statements_directory arguments are passed" do
124124
it "should fail to register" do
125125
expect{ plugin.register }.to raise_error(LogStash::ConfigurationError)
126126
end
@@ -136,6 +136,26 @@
136136
end
137137
end
138138

139+
context "when both statement and statements_directory arguments are passed" do
140+
let(:statement) { "SELECT * from test_table" }
141+
let(:statements_dir) { Stud::Temporary.pathname }
142+
let(:settings) { { "statements_directory" => statements_dir, "statement" => statement } }
143+
144+
it "should fail to register" do
145+
expect{ plugin.register }.to raise_error(LogStash::ConfigurationError)
146+
end
147+
end
148+
149+
context "when both statement_filepath and statements_directory arguments are passed" do
150+
let(:statement_file_path) { Stud::Temporary.pathname }
151+
let(:statements_dir) { Stud::Temporary.pathname }
152+
let(:settings) { { "statement_filepath" => statement_file_path, "statements_directory" => statements_dir } }
153+
154+
it "should fail to register" do
155+
expect{ plugin.register }.to raise_error(LogStash::ConfigurationError)
156+
end
157+
end
158+
139159
context "when statement is passed in from a file" do
140160
let(:statement) { "SELECT * from test_table" }
141161
let(:statement_file_path) { Stud::Temporary.pathname }
@@ -155,6 +175,32 @@
155175
end
156176
end
157177

178+
context "when statements_directory is passed" do
179+
let(:statements) { ["SELECT * from test_table where num = 1", "SELECT * from test_table where num = 2"] }
180+
let(:statements_dir) { Stud::Temporary.directory }
181+
let(:settings) { { "statements_directory" => statements_dir } }
182+
183+
before do
184+
File.write(statements_dir + '/statement1', statements[0])
185+
File.write(statements_dir + '/statement2', statements[1])
186+
plugin.register
187+
end
188+
189+
after do
190+
plugin.stop
191+
end
192+
193+
it "should execute statements from files" do
194+
db[:test_table].insert(:num => 1)
195+
db[:test_table].insert(:num => 2)
196+
197+
plugin.run(queue)
198+
expect(queue.size).to eq(2)
199+
expect(queue.pop.get('num')).to eq(1)
200+
expect(queue.pop.get('num')).to eq(2)
201+
end
202+
end
203+
158204
context "when passing parameters" do
159205
let(:settings) do
160206
{

0 commit comments

Comments
 (0)