Skip to content

Commit 0886c60

Browse files
out_file+out_forward - zstd addition
Signed-off-by: Athish Pranav D <[email protected]>
1 parent a631c21 commit 0886c60

File tree

5 files changed

+64
-27
lines changed

5 files changed

+64
-27
lines changed

lib/fluent/plugin/buffer.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than
6464
config_param :queued_chunks_limit_size, :integer, default: nil
6565

6666
desc 'Compress buffered data.'
67-
config_param :compress, :enum, list: [:text, :gzip], default: :text
67+
config_param :compress, :enum, list: [:text, :gzip, :zstd], default: :text
6868

6969
desc 'If true, chunks are thrown away when unrecoverable error happens'
7070
config_param :disable_chunk_backup, :bool, default: false

lib/fluent/plugin/out_file.rb

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,12 @@ class FileOutput < Output
2929

3030
helpers :formatter, :inject, :compat_parameters
3131

32-
SUPPORTED_COMPRESS = [:text, :gz, :gzip]
32+
SUPPORTED_COMPRESS = [:text, :gz, :gzip, :zstd]
3333
SUPPORTED_COMPRESS_MAP = {
3434
text: nil,
3535
gz: :gzip,
3636
gzip: :gzip,
37+
zstd: :zstd,
3738
}
3839

3940
DEFAULT_TIMEKEY = 60 * 60 * 24
@@ -212,17 +213,27 @@ def write(chunk)
212213
FileUtils.mkdir_p File.dirname(path), mode: @dir_perm
213214

214215
writer = case
215-
when @compress_method.nil?
216-
method(:write_without_compression)
217-
when @compress_method == :gzip
218-
if @buffer.compress != :gzip || @recompress
219-
method(:write_gzip_with_compression)
220-
else
221-
method(:write_gzip_from_gzipped_chunk)
222-
end
223-
else
224-
raise "BUG: unknown compression method #{@compress_method}"
225-
end
216+
when @compress_method.nil?
217+
method(:write_without_compression)
218+
when @compress_method == :gzip
219+
if @buffer.compress == :text || @recompress
220+
method(:write_with_compression).curry.call(@compress_method)
221+
elsif @buffer.compress == :gzip
222+
method(:write_from_compressed_chunk).curry.call(@compress_method)
223+
else
224+
raise Fluent::ConfigError, "You cannot specify different compression formats for Buffer (Buffer: #{@buffer.compress}, Self: #{@compress})"
225+
end
226+
when @compress_method == :zstd
227+
if @buffer.compress == :text || @recompress
228+
method(:write_with_compression).curry.call(@compress_method)
229+
elsif @buffer.compress == :zstd
230+
method(:write_from_compressed_chunk).curry.call(@compress_method)
231+
else
232+
raise Fluent::ConfigError, "You cannot specify different compression formats for Buffer (Buffer: #{@buffer.compress}, Self: #{@compress})"
233+
end
234+
else
235+
raise "BUG: unknown compression method #{@compress_method}"
236+
end
226237

227238
if @append
228239
if @need_lock
@@ -253,17 +264,22 @@ def write_without_compression(path, chunk)
253264
end
254265
end
255266

256-
def write_gzip_with_compression(path, chunk)
267+
def write_with_compression(type, path, chunk)
257268
File.open(path, "ab", @file_perm) do |f|
258-
gz = Zlib::GzipWriter.new(f)
269+
gz = nil
270+
if type == :gzip
271+
gz = Zlib::GzipWriter.new(f)
272+
elsif type == :zstd
273+
gz = Zstd::StreamWriter.new(f)
274+
end
259275
chunk.write_to(gz, compressed: :text)
260276
gz.close
261277
end
262278
end
263279

264-
def write_gzip_from_gzipped_chunk(path, chunk)
280+
def write_from_compressed_chunk(type, path, chunk)
265281
File.open(path, "ab", @file_perm) do |f|
266-
chunk.write_to(f, compressed: :gzip)
282+
chunk.write_to(f, compressed: type)
267283
end
268284
end
269285

@@ -280,6 +296,7 @@ def timekey_to_timeformat(timekey)
280296
def compression_suffix(compress)
281297
case compress
282298
when :gzip then '.gz'
299+
when :zstd then '.zstd'
283300
when nil then ''
284301
else
285302
raise ArgumentError, "unknown compression type #{compress}"

lib/fluent/plugin/out_forward.rb

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ class ForwardOutput < Output
8787
config_param :verify_connection_at_startup, :bool, default: false
8888

8989
desc 'Compress buffered data.'
90-
config_param :compress, :enum, list: [:text, :gzip], default: :text
90+
config_param :compress, :enum, list: [:text, :gzip, :zstd], default: :text
9191

9292
desc 'The default version of TLS transport.'
9393
config_param :tls_version, :enum, list: Fluent::TLS::SUPPORTED_VERSIONS, default: Fluent::TLS::DEFAULT_VERSION
@@ -251,10 +251,14 @@ def configure(conf)
251251
end
252252

253253
unless @as_secondary
254-
if @compress == :gzip && @buffer.compress == :text
255-
@buffer.compress = :gzip
256-
elsif @compress == :text && @buffer.compress == :gzip
257-
log.info "buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in <match>"
254+
if @buffer.compress == :text
255+
@buffer.compress = @compress unless @compress == :text
256+
else
257+
if @compress == :text
258+
log.info "buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in <match>"
259+
elsif @compress != @buffer.compress
260+
raise Fluent::ConfigError, "You cannot specify different compression formats for Buffer (Buffer: #{@buffer.compress}, Self: #{@compress})"
261+
end
258262
end
259263
end
260264

lib/fluent/plugin/output.rb

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1014,13 +1014,17 @@ def write_guard(&block)
10141014
end
10151015

10161016
FORMAT_MSGPACK_STREAM = ->(e){ e.to_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
1017-
FORMAT_COMPRESSED_MSGPACK_STREAM = ->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
1017+
FORMAT_COMPRESSED_MSGPACK_STREAM_GZIP = ->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
1018+
FORMAT_COMPRESSED_MSGPACK_STREAM_ZSTD = ->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer, type: :zstd) }
10181019
FORMAT_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
1019-
FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
1020+
FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_GZIP = ->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
1021+
FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_ZSTD = ->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer, type: :zstd) }
10201022

10211023
def generate_format_proc
10221024
if @buffer && @buffer.compress == :gzip
1023-
@time_as_integer ? FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT : FORMAT_COMPRESSED_MSGPACK_STREAM
1025+
@time_as_integer ? FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_GZIP : FORMAT_COMPRESSED_MSGPACK_STREAM_GZIP
1026+
elsif @buffer && @buffer.compress == :zstd
1027+
@time_as_integer ? FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_ZSTD : FORMAT_COMPRESSED_MSGPACK_STREAM_ZSTD
10241028
else
10251029
@time_as_integer ? FORMAT_MSGPACK_STREAM_TIME_INT : FORMAT_MSGPACK_STREAM
10261030
end

test/plugin/test_output.rb

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1010,13 +1010,25 @@ def waiting(seconds)
10101010
test 'when output has <buffer> and compress is gzip' do
10111011
i = create_output(:buffered)
10121012
i.configure(config_element('ROOT', '', {}, [config_element('buffer', '', {'compress' => 'gzip'})]))
1013-
assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM, i.generate_format_proc
1013+
assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_GZIP, i.generate_format_proc
10141014
end
10151015

10161016
test 'when output has <buffer> and compress is gzip and time_as_integer is true' do
10171017
i = create_output(:buffered)
10181018
i.configure(config_element('ROOT', '', {'time_as_integer' => true}, [config_element('buffer', '', {'compress' => 'gzip'})]))
1019-
assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, i.generate_format_proc
1019+
assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_GZIP, i.generate_format_proc
1020+
end
1021+
1022+
test 'when output has <buffer> and compress is zstd' do
1023+
i = create_output(:buffered)
1024+
i.configure(config_element('ROOT', '', {}, [config_element('buffer', '', {'compress' => 'zstd'})]))
1025+
assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_ZSTD, i.generate_format_proc
1026+
end
1027+
1028+
test 'when output has <buffer> and compress is zstd and time_as_integer is true' do
1029+
i = create_output(:buffered)
1030+
i.configure(config_element('ROOT', '', {'time_as_integer' => true}, [config_element('buffer', '', {'compress' => 'zstd'})]))
1031+
assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_ZSTD, i.generate_format_proc
10201032
end
10211033

10221034
test 'when output has <buffer> and compress is text' do

0 commit comments

Comments
 (0)