diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb
index d8df7a201d..c0c37bb6c8 100644
--- a/lib/fluent/plugin/out_forward.rb
+++ b/lib/fluent/plugin/out_forward.rb
@@ -271,9 +271,15 @@ def configure(conf)
end
raise Fluent::ConfigError, "ack_response_timeout must be a positive integer" if @ack_response_timeout < 1
+
+ if @compress == :zstd
+ log.warn "zstd compression feature is an experimental new feature supported since v1.19.0." +
+ " Please make sure that the destination server also supports this feature before using it." +
+ " in_forward plugin for Fluentd supports it since v1.19.0."
+ end
+
@healthy_nodes_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "healthy_nodes_count", help_text: "Number of count healthy nodes", prefer_gauge: true)
@registered_nodes_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "registered_nodes_count", help_text: "Number of count registered nodes", prefer_gauge: true)
-
end
def multi_workers_ready?
diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb
index 45d665a10e..8784c881f9 100644
--- a/test/plugin/test_out_forward.rb
+++ b/test/plugin/test_out_forward.rb
@@ -178,6 +178,60 @@ def try_write(chunk)
assert{ logs.any?{|log| log.include?(expected_log) && log.include?(expected_detail) } }
end
+ sub_test_case 'configure compress' do
+ data('default', ['', :text])
+ data('gzip', ['compress gzip', :gzip])
+ data('zstd', ['compress zstd', :zstd])
+ test 'should be applied' do |(option, expected)|
+ @d = d = create_driver(config + option)
+ node = d.instance.nodes.first
+
+ assert_equal(
+ [expected, expected],
+ [d.instance.compress, node.instance_variable_get(:@compress)]
+ )
+ end
+
+ data('default' => '')
+ data('gzip' => 'compress gzip')
+ data('zstd' => 'compress zstd')
+ test 'should log as experimental only for zstd' do |option|
+ @d = d = create_driver(config + option)
+
+ log_message = "zstd compression feature is an experimental new feature"
+ assert do
+ if d.instance.compress == :zstd
+ d.logs.any? { |log| log.include?(log_message) }
+ else
+ d.logs.none? { |log| log.include?(log_message) }
+ end
+ end
+ end
+
+ # TODO add tests that we cannot configure the different compress type between owner and buffer except for :text
+ data('gzip', ['compress gzip', :text, :gzip])
+ data('zstd', ['compress zstd', :text, :zstd])
+ test 'can configure buffer compress separately when owner uses :text' do |(buffer_option, expected_owner_compress, expected_buffer_compress)|
+ @d = d = create_driver(config + %[
+
+ type memory
+ #{buffer_option}
+
+ ])
+ node = d.instance.nodes.first
+
+ assert_equal(
+ [expected_owner_compress, expected_owner_compress, expected_buffer_compress],
+ [d.instance.compress, node.instance_variable_get(:@compress), d.instance.buffer.compress],
+ )
+
+ log_message = "buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in "
+ assert do
+ d.logs.any? { |log| log.include?(log_message) }
+ end
+ end
+ end
+
data('CA cert' => 'tls_ca_cert_path',
'non CA cert' => 'tls_cert_path')
test 'configure tls_cert_path/tls_ca_cert_path' do |param|
@@ -326,66 +380,6 @@ def try_write(chunk)
assert_equal 1234, d.instance.discovery_manager.services[0].port
end
- test 'compress_default_value' do
- @d = d = create_driver
- assert_equal :text, d.instance.compress
-
- node = d.instance.nodes.first
- assert_equal :text, node.instance_variable_get(:@compress)
- end
-
- test 'set_compress_is_gzip' do
- @d = d = create_driver(config + %[compress gzip])
- assert_equal :gzip, d.instance.compress
- assert_equal :gzip, d.instance.buffer.compress
-
- node = d.instance.nodes.first
- assert_equal :gzip, node.instance_variable_get(:@compress)
- end
-
- test 'set_compress_is_zstd' do
- @d = d = create_driver(config + %[compress zstd])
- assert_equal :zstd, d.instance.compress
- assert_equal :zstd, d.instance.buffer.compress
-
- node = d.instance.nodes.first
- assert_equal :zstd, node.instance_variable_get(:@compress)
- end
-
- test 'set_compress_is_gzip_in_buffer_section' do
- mock = flexmock($log)
- mock.should_receive(:log).with("buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in ")
-
- @d = d = create_driver(config + %[
-
- type memory
- compress gzip
-
- ])
- assert_equal :text, d.instance.compress
- assert_equal :gzip, d.instance.buffer.compress
-
- node = d.instance.nodes.first
- assert_equal :text, node.instance_variable_get(:@compress)
- end
-
- test 'set_compress_is_zstd_in_buffer_section' do
- mock = flexmock($log)
- mock.should_receive(:log).with("buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in ")
-
- @d = d = create_driver(config + %[
-
- type memory
- compress zstd
-
- ])
- assert_equal :text, d.instance.compress
- assert_equal :zstd, d.instance.buffer.compress
-
- node = d.instance.nodes.first
- assert_equal :text, node.instance_variable_get(:@compress)
- end
-
test 'phi_failure_detector disabled' do
@d = d = create_driver(config + %[phi_failure_detector false \n phi_threshold 0])
node = d.instance.nodes.first