Skip to content

Introduce statements_directory parameter #176

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 44 additions & 12 deletions lib/logstash/inputs/jdbc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,13 @@
#
# ==== Configuring multiple SQL statements
#
# Configuring multiple SQL statements is useful when there is a need to query and ingest data
# from different database tables or views. It is possible to define separate Logstash
# configuration files for each statement or to define multiple statements in a single configuration
# file. When using multiple statements in a single Logstash configuration file, each statement
# has to be defined as a separate jdbc input (including jdbc driver, connection string and other
# required parameters).
# Configuring multiple SQL statements is useful when there is a need to query and ingest data
# from different database tables or views. It is possible to define separate Logstash
# configuration files for each statement, to define multiple statements in a single configuration
# file, or to use the `statements_directory` parameter, which executes every statement file found
# in the configured directory. When using multiple statements in a single Logstash configuration file,
# each statement has to be defined as a separate jdbc input (including jdbc driver, connection
# string and other required parameters).
#
# Please note that if any of the statements use the `sql_last_value` parameter (e.g. for
# ingesting only data changed since last run), each input should define its own
Expand Down Expand Up @@ -147,6 +148,9 @@ class LogStash::Inputs::Jdbc < LogStash::Inputs::Base
# Path of file containing statement to execute
config :statement_filepath, :validate => :path

# Directory containing statement files to execute
config :statements_directory, :validate => :path

# Hash of query parameter, for example `{ "target_id" => "321" }`
config :parameters, :validate => :hash, :default => {}

Expand Down Expand Up @@ -206,11 +210,15 @@ def register
require "rufus/scheduler"
prepare_jdbc_connection

# Raise an error if @use_column_value is true, but no @tracking_column is set
if @use_column_value
# Raise an error if @use_column_value is true, but no @tracking_column is set
if @tracking_column.nil?
raise(LogStash::ConfigurationError, "Must set :tracking_column if :use_column_value is true.")
end
# Raise an error if @use_column_value is true, and @statements_directory is set
if @statements_directory
raise(LogStash::ConfigurationError, ":statements_directory must not be set if :use_column_value is true.")
end
end

@enable_encoding = !@charset.nil? || !@columns_charset.empty?
Expand All @@ -222,12 +230,24 @@ def register
@sql_last_value = YAML.load(File.read(@last_run_metadata_path))
end

unless @statement.nil? ^ @statement_filepath.nil?
if @statement && @statement_filepath
raise(LogStash::ConfigurationError, "Must set either :statement or :statement_filepath. Only one may be set at a time.")
end

@statement = File.read(@statement_filepath) if @statement_filepath

unless @statement.nil? ^ @statements_directory.nil?
raise(LogStash::ConfigurationError, "Must set either :statement, :statement_filepath or :statements_directory. Only one may be set at a time")
end

# Collect the SQL text of every file in :statements_directory.
# @statements stays empty when no directory is configured.
@statements = []
if @statements_directory
  # Sort the entries so statements run in a stable, filename-defined order;
  # the raw directory listing order depends on the filesystem.
  Dir.entries(@statements_directory).sort.each do |file|
    path = File.join(@statements_directory, file)
    # Test the full path, not the bare entry name: a bare name is resolved
    # against the process working directory, which lets real sub-directories
    # slip through to File.read and raise.
    next if File.directory?(path)
    @statements.push(File.read(path))
  end
end

if (@jdbc_password_filepath and @jdbc_password)
raise(LogStash::ConfigurationError, "Only one of :jdbc_password, :jdbc_password_filepath may be set at a time.")
end
Expand All @@ -247,13 +267,25 @@ def run(queue)
if @schedule
@scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
@scheduler.cron @schedule do
execute_query(queue)
if @statements.any?
for statement in @statements
execute_query(queue, statement)
end
else
execute_query(queue, @statement)
end
update_state_file
end

@scheduler.join
else
execute_query(queue)
if @statements.any?
for statement in @statements
execute_query(queue, statement)
end
else
execute_query(queue, @statement)
end
update_state_file
end
end # def run
Expand All @@ -266,10 +298,10 @@ def stop

private

def execute_query(queue)
def execute_query(queue, statement)
# update default parameters
@parameters['sql_last_value'] = @sql_last_value
execute_statement(@statement, @parameters) do |row|
execute_statement(statement, @parameters) do |row|
if enable_encoding?
## do the necessary conversions to string elements
row = Hash[row.map { |k, v| [k.to_s, convert(k, v)] }]
Expand Down
48 changes: 47 additions & 1 deletion spec/inputs/jdbc_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@
end


context "when neither statement and statement_filepath arguments are passed" do
context "when neither statement, statement_filepath or statements_directory arguments are passed" do
it "should fail to register" do
expect{ plugin.register }.to raise_error(LogStash::ConfigurationError)
end
Expand All @@ -163,6 +163,26 @@
end
end

# Conflicting configuration: an inline "statement" together with "statements_directory".
context "when both statement and statements_directory arguments are passed" do
let(:statement) { "SELECT * from test_table" }
let(:statements_dir) { Stud::Temporary.pathname }
let(:settings) { { "statements_directory" => statements_dir, "statement" => statement } }

# register is expected to reject the combination with a ConfigurationError.
it "should fail to register" do
expect{ plugin.register }.to raise_error(LogStash::ConfigurationError)
end
end

# Conflicting configuration: "statement_filepath" together with "statements_directory".
# NOTE(review): nothing is written to statement_file_path here, so the failure may come
# from reading the missing file rather than from the mutual-exclusion check — confirm.
context "when both statement_filepath and statements_directory arguments are passed" do
let(:statement_file_path) { Stud::Temporary.pathname }
let(:statements_dir) { Stud::Temporary.pathname }
let(:settings) { { "statement_filepath" => statement_file_path, "statements_directory" => statements_dir } }

# register is expected to raise a ConfigurationError for this combination.
it "should fail to register" do
expect{ plugin.register }.to raise_error(LogStash::ConfigurationError)
end
end

context "when statement is passed in from a file" do
let(:statement) { "SELECT * from test_table" }
let(:statement_file_path) { Stud::Temporary.pathname }
Expand All @@ -182,6 +202,32 @@
end
end

# Happy path for "statements_directory": two statement files are written into a
# temporary directory and the plugin is registered with only that setting.
context "when statements_directory is passed" do
let(:statements) { ["SELECT * from test_table where num = 1", "SELECT * from test_table where num = 2"] }
let(:statements_dir) { Stud::Temporary.directory }
let(:settings) { { "statements_directory" => statements_dir } }

# Write one file per statement before registering, so the files exist when
# the plugin is registered.
before do
File.write(statements_dir + '/statement1', statements[0])
File.write(statements_dir + '/statement2', statements[1])
plugin.register
end

after do
plugin.stop
end

# Each statement selects exactly one of the two inserted rows, so two events are expected.
# NOTE(review): the pop order below assumes statement1 runs before statement2, i.e. that
# the directory is read in filename order — confirm the plugin guarantees this.
it "should execute statements from files" do
db[:test_table].insert(:num => 1)
db[:test_table].insert(:num => 2)

plugin.run(queue)
expect(queue.size).to eq(2)
expect(queue.pop.get('num')).to eq(1)
expect(queue.pop.get('num')).to eq(2)
end
end

context "when passing parameters" do
let(:settings) do
{
Expand Down