Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,15 @@ def configure(conf)
end

raise Fluent::ConfigError, "ack_response_timeout must be a positive integer" if @ack_response_timeout < 1

if @compress == :zstd
log.warn "zstd compression feature is an experimental new feature supported since v1.19.0." +
" Please make sure that the destination server also supports this feature before using it." +
" in_forward plugin for Fluentd supports it since v1.19.0."
end

@healthy_nodes_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "healthy_nodes_count", help_text: "Number of count healthy nodes", prefer_gauge: true)
@registered_nodes_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "registered_nodes_count", help_text: "Number of count registered nodes", prefer_gauge: true)

end

def multi_workers_ready?
Expand Down
114 changes: 54 additions & 60 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,60 @@ def try_write(chunk)
assert{ logs.any?{|log| log.include?(expected_log) && log.include?(expected_detail) } }
end

sub_test_case 'configure compress' do
data('default', ['', :text])
data('gzip', ['compress gzip', :gzip])
data('zstd', ['compress zstd', :zstd])
test 'should be applied' do |(option, expected)|
@d = d = create_driver(config + option)
node = d.instance.nodes.first

assert_equal(
[expected, expected],
[d.instance.compress, node.instance_variable_get(:@compress)]
)
end

data('default' => '')
data('gzip' => 'compress gzip')
data('zstd' => 'compress zstd')
test 'should log as experimental only for zstd' do |option|
@d = d = create_driver(config + option)

log_message = "zstd compression feature is an experimental new feature"
assert do
if d.instance.compress == :zstd
d.logs.any? { |log| log.include?(log_message) }
else
d.logs.none? { |log| log.include?(log_message) }
end
end
end

# TODO add tests that we cannot configure the different compress type between owner and buffer except for :text
data('gzip', ['compress gzip', :text, :gzip])
data('zstd', ['compress zstd', :text, :zstd])
test 'can configure buffer compress separately when owner uses :text' do |(buffer_option, expected_owner_compress, expected_buffer_compress)|
@d = d = create_driver(config + %[
<buffer>
type memory
#{buffer_option}
</buffer>
])
node = d.instance.nodes.first

assert_equal(
[expected_owner_compress, expected_owner_compress, expected_buffer_compress],
[d.instance.compress, node.instance_variable_get(:@compress), d.instance.buffer.compress],
)

log_message = "buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in <match>"
assert do
d.logs.any? { |log| log.include?(log_message) }
end
end
end

data('CA cert' => 'tls_ca_cert_path',
'non CA cert' => 'tls_cert_path')
test 'configure tls_cert_path/tls_ca_cert_path' do |param|
Expand Down Expand Up @@ -326,66 +380,6 @@ def try_write(chunk)
assert_equal 1234, d.instance.discovery_manager.services[0].port
end

test 'compress_default_value' do
@d = d = create_driver
assert_equal :text, d.instance.compress

node = d.instance.nodes.first
assert_equal :text, node.instance_variable_get(:@compress)
end

test 'set_compress_is_gzip' do
@d = d = create_driver(config + %[compress gzip])
assert_equal :gzip, d.instance.compress
assert_equal :gzip, d.instance.buffer.compress

node = d.instance.nodes.first
assert_equal :gzip, node.instance_variable_get(:@compress)
end

test 'set_compress_is_zstd' do
@d = d = create_driver(config + %[compress zstd])
assert_equal :zstd, d.instance.compress
assert_equal :zstd, d.instance.buffer.compress

node = d.instance.nodes.first
assert_equal :zstd, node.instance_variable_get(:@compress)
end

test 'set_compress_is_gzip_in_buffer_section' do
mock = flexmock($log)
mock.should_receive(:log).with("buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in <match>")

@d = d = create_driver(config + %[
<buffer>
type memory
compress gzip
</buffer>
])
assert_equal :text, d.instance.compress
assert_equal :gzip, d.instance.buffer.compress

node = d.instance.nodes.first
assert_equal :text, node.instance_variable_get(:@compress)
end

test 'set_compress_is_zstd_in_buffer_section' do
mock = flexmock($log)
mock.should_receive(:log).with("buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in <match>")

@d = d = create_driver(config + %[
<buffer>
type memory
compress zstd
</buffer>
])
assert_equal :text, d.instance.compress
assert_equal :zstd, d.instance.buffer.compress

node = d.instance.nodes.first
assert_equal :text, node.instance_variable_get(:@compress)
end

test 'phi_failure_detector disabled' do
@d = d = create_driver(config + %[phi_failure_detector false \n phi_threshold 0])
node = d.instance.nodes.first
Expand Down