Skip to content

Commit bedbba9

Browse files
committed
Add HTTP Auth and SSL to the ES output plugin
The typical use case is the placement of a transparent proxy in front of ES. This PR enables basic Auth and HTTPS to be configured independently. This patch makes logstash use only elasticsearch-ruby gem when writing to elasticsearch. Also leverages the manticore http transport that ships with es-ruby for the basic http auth and ssl features. It's possible to configure the CA certificate using either a .pem/.cer file ("cacert" option), or a .jks truststore ("truststore" option).
1 parent 1f4449b commit bedbba9

File tree

4 files changed

+214
-75
lines changed

4 files changed

+214
-75
lines changed

lib/logstash/outputs/elasticsearch.rb

Lines changed: 91 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
require "logstash/json"
66
require "stud/buffer"
77
require "socket" # for Socket.gethostname
8+
require "uri" # for escaping user input
89
require 'logstash-output-elasticsearch_jars.rb'
910

1011
# This output lets you store logs in Elasticsearch and is the most recommended
@@ -185,18 +186,28 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
185186
config :plugins, :validate => :array
186187

187188

189+
# Username and password (HTTP only)
190+
config :user, :validate => :string
191+
config :password, :validate => :password
192+
193+
# SSL Configurations (HTTP only)
194+
#
195+
# Enable SSL
196+
config :ssl, :validate => :boolean, :default => false
197+
198+
# The .cer or .pem file to validate the server's certificate
199+
config :cacert, :validate => :path
200+
201+
# The JKS truststore to validate the server's certificate
202+
# Use either :truststore or :cacert
203+
config :truststore, :validate => :path
204+
205+
# Set the truststore password
206+
config :truststore_password, :validate => :password
207+
188208
public
189209
def register
190210
client_settings = {}
191-
client_settings["cluster.name"] = @cluster if @cluster
192-
client_settings["network.host"] = @bind_host if @bind_host
193-
client_settings["transport.tcp.port"] = @bind_port if @bind_port
194-
195-
if @node_name
196-
client_settings["node.name"] = @node_name
197-
else
198-
client_settings["node.name"] = "logstash-#{Socket.gethostname}-#{$$}-#{object_id}"
199-
end
200211

201212
if @protocol.nil?
202213
@protocol = LogStash::Environment.jruby? ? "node" : "http"
@@ -217,6 +228,15 @@ def register
217228

218229
# setup log4j properties for Elasticsearch
219230
# LogStash::Logger.setup_log4j(@logger)
231+
client_settings["cluster.name"] = @cluster if @cluster
232+
client_settings["network.host"] = @bind_host if @bind_host
233+
client_settings["transport.tcp.port"] = @bind_port if @bind_port
234+
235+
if @node_name
236+
client_settings["node.name"] = @node_name
237+
else
238+
client_settings["node.name"] = "logstash-#{Socket.gethostname}-#{$$}-#{object_id}"
239+
end
220240
end
221241

222242
require "logstash/outputs/elasticsearch/protocol"
@@ -233,12 +253,45 @@ def register
233253
@host = ["localhost"]
234254
end
235255

256+
if @ssl
257+
if @protocol == "http"
258+
@protocol = "https"
259+
if @cacert && @truststore
260+
raise(LogStash::ConfigurationError, "Use either \"cacert\" or \"truststore\" when configuring the CA certificate") if @truststore
261+
end
262+
ssl_options = {}
263+
if @cacert then
264+
@truststore, ssl_options[:truststore_password] = generate_jks @cacert
265+
elsif @truststore
266+
ssl_options[:truststore_password] = @truststore_password.value if @truststore_password
267+
end
268+
ssl_options[:truststore] = @truststore
269+
client_settings[:ssl] = ssl_options
270+
else
271+
raise(LogStash::ConfigurationError, "SSL is not supported for '#{@protocol}'. Change the protocol to 'http' if you need SSL.")
272+
end
273+
end
274+
275+
common_options = {
276+
:protocol => @protocol,
277+
:client_settings => client_settings
278+
}
279+
280+
if @user && @password
281+
if @protocol =~ /http/
282+
common_options[:user] = ::URI.escape(@user, "@:")
283+
common_options[:password] = ::URI.escape(@password.value, "@:")
284+
else
285+
raise(LogStash::ConfigurationError, "User and password parameters are not supported for '#{@protocol}'. Change the protocol to 'http' if you need them.")
286+
end
287+
end
288+
236289
client_class = case @protocol
237290
when "transport"
238291
LogStash::Outputs::Elasticsearch::Protocols::TransportClient
239292
when "node"
240293
LogStash::Outputs::Elasticsearch::Protocols::NodeClient
241-
when "http"
294+
when /http/
242295
LogStash::Outputs::Elasticsearch::Protocols::HTTPClient
243296
end
244297

@@ -261,17 +314,15 @@ def register
261314
options = {
262315
:host => @host,
263316
:port => @port,
264-
:client_settings => client_settings
265-
}
317+
}.merge(common_options)
266318
@client << client_class.new(options)
267319
else # if @protocol in ["transport","http"]
268320
@host.each do |host|
269321
(_host,_port) = host.split ":"
270322
options = {
271323
:host => _host,
272324
:port => _port || @port,
273-
:client_settings => client_settings
274-
}
325+
}.merge(common_options)
275326
@logger.info "Create client to elasticsearch server on #{_host}:#{_port}"
276327
@client << client_class.new(options)
277328
end # @host.each
@@ -338,6 +389,29 @@ def start_local_elasticsearch
338389
@embedded_elasticsearch.start
339390
end # def start_local_elasticsearch
340391

392+
private
393+
def generate_jks cert_path
394+
395+
require 'securerandom'
396+
require 'tempfile'
397+
require 'java'
398+
import java.io.FileInputStream
399+
import java.io.FileOutputStream
400+
import java.security.KeyStore
401+
import java.security.cert.CertificateFactory
402+
403+
jks = java.io.File.createTempFile("cert", ".jks")
404+
405+
ks = KeyStore.getInstance "JKS"
406+
ks.load nil, nil
407+
cf = CertificateFactory.getInstance "X.509"
408+
cert = cf.generateCertificate FileInputStream.new(cert_path)
409+
ks.setCertificateEntry "cacert", cert
410+
pwd = SecureRandom.urlsafe_base64(9)
411+
ks.store FileOutputStream.new(jks), pwd.to_java.toCharArray
412+
[jks.path, pwd]
413+
end
414+
341415
public
342416
def receive(event)
343417
return unless output?(event)
@@ -376,6 +450,9 @@ def flush(actions, teardown=false)
376450
end # def flush
377451

378452
def teardown
453+
if @cacert # remove temporary jks store created from the cacert
454+
File.delete(@truststore)
455+
end
379456
buffer_flush(:final => true)
380457
end
381458

lib/logstash/outputs/elasticsearch/protocol.rb

Lines changed: 18 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -51,82 +51,41 @@ class HTTPClient < Base
5151
}
5252

5353
def initialize(options={})
54-
require "ftw"
5554
super
5655
require "elasticsearch" # gem 'elasticsearch-ruby'
56+
# manticore http transport
57+
require "elasticsearch/transport/transport/http/manticore"
5758
@options = DEFAULT_OPTIONS.merge(options)
5859
@client = client
5960
end
6061

6162
def build_client(options)
62-
client = Elasticsearch::Client.new(
63-
:host => [options[:host], options[:port]].join(":")
64-
)
65-
66-
# Use FTW to do indexing requests, for now, until we
67-
# can identify and resolve performance problems of elasticsearch-ruby
68-
@bulk_url = "http://#{options[:host]}:#{options[:port]}/_bulk"
69-
@agent = FTW::Agent.new
70-
71-
return client
72-
end
73-
74-
if ENV["BULK"] == "esruby"
75-
def bulk(actions)
76-
bulk_esruby(actions)
77-
end
78-
else
79-
def bulk(actions)
80-
bulk_ftw(actions)
63+
uri = "#{options[:protocol]}://#{options[:host]}:#{options[:port]}"
64+
65+
client_options = {
66+
:host => [uri],
67+
:transport_options => options[:client_settings]
68+
}
69+
client_options[:transport_class] = ::Elasticsearch::Transport::Transport::HTTP::Manticore
70+
client_options[:ssl] = client_options[:transport_options].delete(:ssl)
71+
72+
if options[:user] && options[:password] then
73+
token = Base64.strict_encode64(options[:user] + ":" + options[:password])
74+
client_options[:headers] = { "Authorization" => "Basic #{token}" }
8175
end
76+
77+
Elasticsearch::Client.new client_options
8278
end
8379

84-
def bulk_esruby(actions)
80+
def bulk(actions)
8581
@client.bulk(:body => actions.collect do |action, args, source|
8682
if source
8783
next [ { action => args }, source ]
8884
else
8985
next { action => args }
9086
end
9187
end.flatten)
92-
end # def bulk_esruby
93-
94-
# Avoid creating a new string for newline every time
95-
NEWLINE = "\n".freeze
96-
def bulk_ftw(actions)
97-
body = actions.collect do |action, args, source|
98-
header = { action => args }
99-
if source
100-
next [ LogStash::Json.dump(header), NEWLINE, LogStash::Json.dump(source), NEWLINE ]
101-
else
102-
next [ LogStash::Json.dump(header), NEWLINE ]
103-
end
104-
end.flatten.join("")
105-
begin
106-
response = @agent.post!(@bulk_url, :body => body)
107-
rescue EOFError
108-
@logger.warn("EOF while writing request or reading response header from elasticsearch", :host => @host, :port => @port)
109-
raise
110-
end
111-
112-
# Consume the body for error checking
113-
# This will also free up the connection for reuse.
114-
response_body = ""
115-
begin
116-
response.read_body { |chunk| response_body += chunk }
117-
rescue EOFError
118-
@logger.warn("EOF while reading response body from elasticsearch",
119-
:url => @bulk_url)
120-
raise
121-
end
122-
123-
if response.status != 200
124-
@logger.error("Error writing (bulk) to elasticsearch",
125-
:response => response, :response_body => response_body,
126-
:request_body => body)
127-
raise "Non-OK response code from Elasticsearch: #{response.status}"
128-
end
129-
end # def bulk_ftw
88+
end # def bulk
13089

13190
def template_exists?(name)
13291
@client.indices.get_template(:name => name)
@@ -292,4 +251,3 @@ class Index; end
292251
class Delete; end
293252
end
294253
end
295-

logstash-output-elasticsearch.gemspec

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,15 @@ Gem::Specification.new do |s|
2323
s.requirements << "jar 'org.elasticsearch:elasticsearch', '1.3.1'"
2424

2525
# Gem dependencies
26-
s.add_runtime_dependency 'elasticsearch'
26+
s.add_runtime_dependency 'elasticsearch', ['~> 1.0.6']
2727
s.add_runtime_dependency 'stud'
2828
s.add_runtime_dependency 'cabin', ['>=0.6.0']
2929
s.add_runtime_dependency 'ftw', ['~> 0.0.39']
3030
s.add_runtime_dependency 'logstash', '>= 1.4.0', '< 2.0.0'
3131
s.add_runtime_dependency 'jar-dependencies', ['~> 0.0.7']
3232

33+
if RUBY_PLATFORM == 'java'
34+
gem.add_runtime_dependency "manticore"
35+
end
3336
end
3437

spec/outputs/elasticsearch.rb

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,4 +386,105 @@
386386
end
387387
end
388388
end
389+
390+
describe "Authentication option" do
391+
["node", "transport"].each do |protocol|
392+
context "with protocol => #{protocol}" do
393+
subject do
394+
require "logstash/outputs/elasticsearch"
395+
settings = {
396+
"protocol" => protocol,
397+
"node_name" => "logstash",
398+
"cluster" => "elasticsearch",
399+
"host" => "node01",
400+
"user" => "test",
401+
"password" => "test"
402+
}
403+
next LogStash::Outputs::ElasticSearch.new(settings)
404+
end
405+
406+
it "should fail in register" do
407+
expect {subject.register}.to raise_error
408+
end
409+
end
410+
end
411+
end
412+
413+
describe "SSL option" do
414+
["node", "transport"].each do |protocol|
415+
context "with protocol => #{protocol}" do
416+
subject do
417+
require "logstash/outputs/elasticsearch"
418+
settings = {
419+
"protocol" => protocol,
420+
"node_name" => "logstash",
421+
"cluster" => "elasticsearch",
422+
"host" => "node01",
423+
"ssl" => true
424+
}
425+
next LogStash::Outputs::ElasticSearch.new(settings)
426+
end
427+
428+
it "should fail in register" do
429+
expect {subject.register}.to raise_error
430+
end
431+
end
432+
end
433+
end
434+
435+
describe "send messages to ElasticSearch using HTTPS", :elasticsearch_secure => true do
436+
subject do
437+
require "logstash/outputs/elasticsearch"
438+
settings = {
439+
"protocol" => "http",
440+
"node_name" => "logstash",
441+
"cluster" => "elasticsearch",
442+
"host" => "node01",
443+
"user" => "user",
444+
"password" => "changeme",
445+
"ssl" => true,
446+
"cacert" => "/tmp/ca/certs/cacert.pem",
447+
# or
448+
#"truststore" => "/tmp/ca/truststore.jks",
449+
#"truststore_password" => "testeteste"
450+
}
451+
next LogStash::Outputs::ElasticSearch.new(settings)
452+
end
453+
454+
before :each do
455+
subject.register
456+
end
457+
458+
it "sends events to ES" do
459+
expect {
460+
subject.receive(LogStash::Event.new("message" => "sample message here"))
461+
subject.buffer_flush(:final => true)
462+
}.to_not raise_error
463+
end
464+
end
465+
466+
describe "connect using HTTP Authentication", :elasticsearch_secure => true do
467+
subject do
468+
require "logstash/outputs/elasticsearch"
469+
settings = {
470+
"protocol" => "http",
471+
"cluster" => "elasticsearch",
472+
"host" => "node01",
473+
"user" => "user",
474+
"password" => "changeme",
475+
}
476+
next LogStash::Outputs::ElasticSearch.new(settings)
477+
end
478+
479+
before :each do
480+
subject.register
481+
end
482+
483+
it "sends events to ES" do
484+
expect {
485+
subject.receive(LogStash::Event.new("message" => "sample message here"))
486+
subject.buffer_flush(:final => true)
487+
}.to_not raise_error
488+
end
489+
end
389490
end

0 commit comments

Comments
 (0)