diff --git a/lib/logstash/inputs/jdbc.rb b/lib/logstash/inputs/jdbc.rb index 20d0edc..d19d1fa 100755 --- a/lib/logstash/inputs/jdbc.rb +++ b/lib/logstash/inputs/jdbc.rb @@ -86,12 +86,13 @@ # # ==== Configuring multiple SQL statements # -# Configuring multiple SQL statements is useful when there is a need to query and ingest data -# from different database tables or views. It is possible to define separate Logstash -# configuration files for each statement or to define multiple statements in a single configuration -# file. When using multiple statements in a single Logstash configuration file, each statement -# has to be defined as a separate jdbc input (including jdbc driver, connection string and other -# required parameters). +# Configuring multiple SQL statements is useful when there is a need to query and ingest data +# from different database tables or views. It is possible to define separate Logstash +# configuration files for each statement, to define multiple statements in a single configuration +# file or to use the `statements_directory` parameter where all statements within the configured +# directory get executed. When using multiple statements in a single Logstash configuration file, +# each statement has to be defined as a separate jdbc input (including jdbc driver, connection +# string and other required parameters). # # Please note that if any of the statements use the `sql_last_value` parameter (e.g. for # ingesting only data changed since last run), each input should define its own @@ -147,6 +148,9 @@ class LogStash::Inputs::Jdbc < LogStash::Inputs::Base # Path of file containing statement to execute config :statement_filepath, :validate => :path + # Directory containing statement files to execute + config :statements_directory, :validate => :path + # Hash of query parameter, for example `{ "target_id" => "321" }` config :parameters, :validate => :hash, :default => {} @@ -206,11 +210,15 @@ def register require "rufus/scheduler" prepare_jdbc_connection - # Raise an error if @use_column_value is true, but no @tracking_column is set if @use_column_value + # Raise an error if @use_column_value is true, but no @tracking_column is set if @tracking_column.nil? raise(LogStash::ConfigurationError, "Must set :tracking_column if :use_column_value is true.") end + # Raise an error if @use_column_value is true, and @statements_directory is set + if @statements_directory + raise(LogStash::ConfigurationError, ":statements_directory must not be set if :use_column_value is true.") + end end @enable_encoding = !@charset.nil? || !@columns_charset.empty? @@ -222,12 +230,24 @@ def register @sql_last_value = YAML.load(File.read(@last_run_metadata_path)) end - unless @statement.nil? ^ @statement_filepath.nil? + if @statement && @statement_filepath raise(LogStash::ConfigurationError, "Must set either :statement or :statement_filepath. Only one may be set at a time.") end @statement = File.read(@statement_filepath) if @statement_filepath + unless @statement.nil? ^ @statements_directory.nil? + raise(LogStash::ConfigurationError, "Must set either :statement, :statement_filepath or :statements_directory. Only one may be set at a time") + end + + @statements = [] + if @statements_directory + Dir.foreach(@statements_directory) do |file| + next if File.directory? file + @statements.push(File.read(@statements_directory + '/' + file)) + end + end + if (@jdbc_password_filepath and @jdbc_password) raise(LogStash::ConfigurationError, "Only one of :jdbc_password, :jdbc_password_filepath may be set at a time.") end @@ -247,13 +267,25 @@ def run(queue) if @schedule @scheduler = Rufus::Scheduler.new(:max_work_threads => 1) @scheduler.cron @schedule do - execute_query(queue) + if @statements.any? + for statement in @statements + execute_query(queue, statement) + end + else + execute_query(queue, @statement) + end update_state_file end @scheduler.join else - execute_query(queue) + if @statements.any? + for statement in @statements + execute_query(queue, statement) + end + else + execute_query(queue, @statement) + end update_state_file end end # def run @@ -266,10 +298,10 @@ def stop private - def execute_query(queue) + def execute_query(queue, statement) # update default parameters @parameters['sql_last_value'] = @sql_last_value - execute_statement(@statement, @parameters) do |row| + execute_statement(statement, @parameters) do |row| if enable_encoding? ## do the necessary conversions to string elements row = Hash[row.map { |k, v| [k.to_s, convert(k, v)] }] diff --git a/spec/inputs/jdbc_spec.rb b/spec/inputs/jdbc_spec.rb index deca9a5..a3ef934 100755 --- a/spec/inputs/jdbc_spec.rb +++ b/spec/inputs/jdbc_spec.rb @@ -147,7 +147,7 @@ end - context "when neither statement and statement_filepath arguments are passed" do + context "when neither statement, statement_filepath or statements_directory arguments are passed" do it "should fail to register" do expect{ plugin.register }.to raise_error(LogStash::ConfigurationError) end @@ -163,6 +163,26 @@ end end + context "when both statement and statements_directory arguments are passed" do + let(:statement) { "SELECT * from test_table" } + let(:statements_dir) { Stud::Temporary.pathname } + let(:settings) { { "statements_directory" => statements_dir, "statement" => statement } } + + it "should fail to register" do + expect{ plugin.register }.to raise_error(LogStash::ConfigurationError) + end + end + + context "when both statement_filepath and statements_directory arguments are passed" do + let(:statement_file_path) { Stud::Temporary.pathname } + let(:statements_dir) { Stud::Temporary.pathname } + let(:settings) { { "statement_filepath" => statement_file_path, "statements_directory" => statements_dir } } + + it "should fail to register" do + expect{ plugin.register }.to raise_error(LogStash::ConfigurationError) + end + end + context "when statement is passed in from a file" do let(:statement) { "SELECT * from test_table" } let(:statement_file_path) { Stud::Temporary.pathname } @@ -182,6 +202,32 @@ end end + context "when statements_directory is passed" do + let(:statements) { ["SELECT * from test_table where num = 1", "SELECT * from test_table where num = 2"] } + let(:statements_dir) { Stud::Temporary.directory } + let(:settings) { { "statements_directory" => statements_dir } } + + before do + File.write(statements_dir + '/statement1', statements[0]) + File.write(statements_dir + '/statement2', statements[1]) + plugin.register + end + + after do + plugin.stop + end + + it "should execute statements from files" do + db[:test_table].insert(:num => 1) + db[:test_table].insert(:num => 2) + + plugin.run(queue) + expect(queue.size).to eq(2) + expect(queue.pop.get('num')).to eq(1) + expect(queue.pop.get('num')).to eq(2) + end + end + context "when passing parameters" do let(:settings) do {