Skip to content

Commit 53e2d73

Browse files
author
bhois
committed
Introduce statements_directory parameter
1 parent f397487 commit 53e2d73

File tree

2 files changed

+91
-13
lines changed

2 files changed

+91
-13
lines changed

lib/logstash/inputs/jdbc.rb

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,13 @@
8686
#
8787
# ==== Configuring multiple SQL statements
8888
#
89-
# Configuring multiple SQL statements is useful when there is a need to query and ingest data
90-
# from different database tables or views. It is possible to define separate Logstash
91-
# configuration files for each statement or to define multiple statements in a single configuration
92-
# file. When using multiple statements in a single Logstash configuration file, each statement
93-
# has to be defined as a separate jdbc input (including jdbc driver, connection string and other
94-
# required parameters).
89+
# Configuring multiple SQL statements is useful when there is a need to query and ingest data
90+
# from different database tables or views. It is possible to define separate Logstash
91+
# configuration files for each statement, to define multiple statements in a single configuration
92+
# file or to use the `statements_directory` parameter where all statements within the configured
93+
# directory get executed. When using multiple statements in a single Logstash configuration file,
94+
# each statement has to be defined as a separate jdbc input (including jdbc driver, connection
95+
# string and other required parameters).
9596
#
9697
# Please note that if any of the statements use the `sql_last_value` parameter (e.g. for
9798
# ingesting only data changed since last run), each input should define its own
@@ -147,6 +148,9 @@ class LogStash::Inputs::Jdbc < LogStash::Inputs::Base
147148
# Path of file containing statement to execute
148149
config :statement_filepath, :validate => :path
149150

151+
# Directory containing statement files to execute
152+
config :statements_directory, :validate => :path
153+
150154
# Hash of query parameter, for example `{ "target_id" => "321" }`
151155
config :parameters, :validate => :hash, :default => {}
152156

@@ -206,11 +210,15 @@ def register
206210
require "rufus/scheduler"
207211
prepare_jdbc_connection
208212

209-
# Raise an error if @use_column_value is true, but no @tracking_column is set
210213
if @use_column_value
214+
# Raise an error if @use_column_value is true, but no @tracking_column is set
211215
if @tracking_column.nil?
212216
raise(LogStash::ConfigurationError, "Must set :tracking_column if :use_column_value is true.")
213217
end
218+
# Raise an error if @use_column_value is true, and @statements_directory is set
219+
if @statements_directory
220+
raise(LogStash::ConfigurationError, ":statements_directory must not be set if :use_column_value is true.")
221+
end
214222
end
215223

216224
@enable_encoding = !@charset.nil? || !@columns_charset.empty?
@@ -222,12 +230,24 @@ def register
222230
@sql_last_value = YAML.load(File.read(@last_run_metadata_path))
223231
end
224232

225-
unless @statement.nil? ^ @statement_filepath.nil?
233+
if @statement && @statement_filepath
226234
raise(LogStash::ConfigurationError, "Must set either :statement or :statement_filepath. Only one may be set at a time.")
227235
end
228236

229237
@statement = File.read(@statement_filepath) if @statement_filepath
230238

239+
unless @statement.nil? ^ @statements_directory.nil?
240+
raise(LogStash::ConfigurationError, "Must set either :statement, :statement_filepath or :statements_directory. Only one may be set at a time")
241+
end
242+
243+
@statements = []
244+
if @statements_directory
245+
Dir.foreach(@statements_directory) do |file|
246+
next if File.directory? file
247+
@statements.push(File.read(@statements_directory + '/' + file))
248+
end
249+
end
250+
231251
if (@jdbc_password_filepath and @jdbc_password)
232252
raise(LogStash::ConfigurationError, "Only one of :jdbc_password, :jdbc_password_filepath may be set at a time.")
233253
end
@@ -247,13 +267,25 @@ def run(queue)
247267
if @schedule
248268
@scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
249269
@scheduler.cron @schedule do
250-
execute_query(queue)
270+
if @statements.any?
271+
for statement in @statements
272+
execute_query(queue, statement)
273+
end
274+
else
275+
execute_query(queue, @statement)
276+
end
251277
update_state_file
252278
end
253279

254280
@scheduler.join
255281
else
256-
execute_query(queue)
282+
if @statements.any?
283+
for statement in @statements
284+
execute_query(queue, statement)
285+
end
286+
else
287+
execute_query(queue, @statement)
288+
end
257289
update_state_file
258290
end
259291
end # def run
@@ -266,10 +298,10 @@ def stop
266298

267299
private
268300

269-
def execute_query(queue)
301+
def execute_query(queue, statement)
270302
# update default parameters
271303
@parameters['sql_last_value'] = @sql_last_value
272-
execute_statement(@statement, @parameters) do |row|
304+
execute_statement(statement, @parameters) do |row|
273305
if enable_encoding?
274306
## do the necessary conversions to string elements
275307
row = Hash[row.map { |k, v| [k.to_s, convert(k, v)] }]

spec/inputs/jdbc_spec.rb

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@
122122
end
123123

124124

125-
context "when neither statement and statement_filepath arguments are passed" do
125+
context "when neither statement, statement_filepath or statements_directory arguments are passed" do
126126
it "should fail to register" do
127127
expect{ plugin.register }.to raise_error(LogStash::ConfigurationError)
128128
end
@@ -138,6 +138,26 @@
138138
end
139139
end
140140

141+
context "when both statement and statements_directory arguments are passed" do
142+
let(:statement) { "SELECT * from test_table" }
143+
let(:statements_dir) { Stud::Temporary.pathname }
144+
let(:settings) { { "statements_directory" => statements_dir, "statement" => statement } }
145+
146+
it "should fail to register" do
147+
expect{ plugin.register }.to raise_error(LogStash::ConfigurationError)
148+
end
149+
end
150+
151+
context "when both statement_filepath and statements_directory arguments are passed" do
152+
let(:statement_file_path) { Stud::Temporary.pathname }
153+
let(:statements_dir) { Stud::Temporary.pathname }
154+
let(:settings) { { "statement_filepath" => statement_file_path, "statements_directory" => statements_dir } }
155+
156+
it "should fail to register" do
157+
expect{ plugin.register }.to raise_error(LogStash::ConfigurationError)
158+
end
159+
end
160+
141161
context "when statement is passed in from a file" do
142162
let(:statement) { "SELECT * from test_table" }
143163
let(:statement_file_path) { Stud::Temporary.pathname }
@@ -157,6 +177,32 @@
157177
end
158178
end
159179

180+
context "when statements_directory is passed" do
181+
let(:statements) { ["SELECT * from test_table where num = 1", "SELECT * from test_table where num = 2"] }
182+
let(:statements_dir) { Stud::Temporary.directory }
183+
let(:settings) { { "statements_directory" => statements_dir } }
184+
185+
before do
186+
File.write(statements_dir + '/statement1', statements[0])
187+
File.write(statements_dir + '/statement2', statements[1])
188+
plugin.register
189+
end
190+
191+
after do
192+
plugin.stop
193+
end
194+
195+
it "should execute statements from files" do
196+
db[:test_table].insert(:num => 1)
197+
db[:test_table].insert(:num => 2)
198+
199+
plugin.run(queue)
200+
expect(queue.size).to eq(2)
201+
expect(queue.pop.get('num')).to eq(1)
202+
expect(queue.pop.get('num')).to eq(2)
203+
end
204+
end
205+
160206
context "when passing parameters" do
161207
let(:settings) do
162208
{

0 commit comments

Comments
 (0)