From 4fced39eb57997e673724e945f5895a6daec5888 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ludovic=20L=C3=A9au-Mercier?= Date: Tue, 4 Nov 2014 11:53:27 +0100 Subject: [PATCH 1/3] add gzip decrompression in tail --- lib/filewatch/tail.rb | 86 ++++++++++++++++++++++++++++++++----------- 1 file changed, 64 insertions(+), 22 deletions(-) diff --git a/lib/filewatch/tail.rb b/lib/filewatch/tail.rb index 6c27bfe..e171323 100644 --- a/lib/filewatch/tail.rb +++ b/lib/filewatch/tail.rb @@ -1,13 +1,15 @@ require "filewatch/buftok" require "filewatch/watch" +require "zlib" + if RbConfig::CONFIG['host_os'] =~ /mswin|mingw|cygwin/ require "filewatch/winhelper" end require "logger" require "rbconfig" -include Java if defined? JRUBY_VERSION -require "JRubyFileExtension.jar" if defined? JRUBY_VERSION +#include Java if defined? JRUBY_VERSION +#require "JRubyFileExtension.jar" if defined? JRUBY_VERSION module FileWatch class Tail @@ -30,6 +32,7 @@ def initialize(opts={}) @logger.level = Logger::INFO end @files = {} + @gzip = {} @lastwarn = Hash.new { |h, k| h[k] = 0 } @buffers = {} @watch = FileWatch::Watch.new @@ -95,6 +98,7 @@ def subscribe(&block) _read_file(path, &block) @files[path].close @files.delete(path) + @gzip.delete(path) @statcache.delete(path) else @logger.warn("unknown event type #{event} for #{path}") @@ -123,6 +127,7 @@ def _open_file(path, event) @logger.debug("(warn supressed) failed to open #{path}: #{$!}") end @files.delete(path) + @gzip.delete(path) return false end @@ -133,7 +138,7 @@ def _open_file(path, event) inode = [fileId, stat.dev_major, stat.dev_minor] else inode = [stat.ino.to_s, stat.dev_major, stat.dev_minor] - end + end @statcache[path] = inode @@ -148,16 +153,29 @@ def _open_file(path, event) @sincedb[inode] = 0 end elsif event == :create_initial && @files[path] - # TODO(sissel): Allow starting at beginning of the file. - if @opts[:start_new_files_at] == :beginning - @logger.debug("#{path}: initial create, no sincedb, seeking to beginning of file") + @gzip[path] = path.end_with?('.gz') + if @gzip[path] + # RFC1952 two frst byte for gzip file is ID1 ID2 == 31=1f 139=8b + @opts[:start_new_files_at] = :beginning # force start to beginning @files[path].sysseek(0, IO::SEEK_SET) + dataID1 = @files[path].getc.unpack('H*').first + dataID2 = @files[path].getc.unpack('H*').first + @gzip[path] = dataID1 == "1f" and dataID2 == "8b" + @logger.debug("gzip flag #{@gzip[path]} #{dataID1} #{dataID2}") + @files[path].rewind @sincedb[inode] = 0 - else - # seek to end - @logger.debug("#{path}: initial create, no sincedb, seeking to end #{stat.size}") - @files[path].sysseek(stat.size, IO::SEEK_SET) - @sincedb[inode] = stat.size + else + # TODO(sissel): Allow starting at beginning of the file. + if @opts[:start_new_files_at] == :beginning + @logger.debug("#{path}: initial create, no sincedb, seeking to beginning of file") + @files[path].sysseek(0, IO::SEEK_SET) + @sincedb[inode] = 0 + else + # seek to end + @logger.debug("#{path}: initial create, no sincedb, seeking to end #{stat.size}") + @files[path].sysseek(stat.size, IO::SEEK_SET) + @sincedb[inode] = stat.size + end end else @logger.debug("#{path}: staying at position 0, no sincedb") @@ -168,20 +186,44 @@ def _open_file(path, event) private def _read_file(path, &block) - @buffers[path] ||= FileWatch::BufferedTokenizer.new + @logger.debug("_read_file gzip flag is #{@gzip[path]} for path #{path}") + if @gzip[path] + @buffers[path] ||= Zlib::GzipReader.new(@files[path]) + else + @buffers[path] ||= FileWatch::BufferedTokenizer.new + end changed = false - loop do - begin - data = @files[path].sysread(32768) - changed = true - @buffers[path].extract(data).each do |line| - yield(path, line) + if @gzip[path] + loop do + begin + changed = true + @buffers[path].each_line do |line| + line = line.gsub(/\r?\n$/,'') + yield(path, line) + end + @sincedb[@statcache[path]] = @files[path].pos + if @buffers[path].eof? + break + end + rescue Zlib::Error, Zlib::GzipFile::Error => error + @logger.warn("Tail error on gz reading : #{error}") + break + end + end + else + loop do + begin + data = @files[path].sysread(32768) + changed = true + @buffers[path].extract(data).each do |line| + yield(path, line) + end + + @sincedb[@statcache[path]] = @files[path].pos + rescue Errno::EWOULDBLOCK, Errno::EINTR, EOFError + break end - - @sincedb[@statcache[path]] = @files[path].pos - rescue Errno::EWOULDBLOCK, Errno::EINTR, EOFError - break end end From 367c2d5c6bd4acbbfebe590f53c25c4c46c991f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ludovic=20L=C3=A9au-Mercier?= Date: Wed, 5 Nov 2014 11:11:32 +0100 Subject: [PATCH 2/3] add gzip detection if new file is added in path --- lib/filewatch/tail.rb | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/lib/filewatch/tail.rb b/lib/filewatch/tail.rb index e171323..b8f8e5e 100644 --- a/lib/filewatch/tail.rb +++ b/lib/filewatch/tail.rb @@ -141,7 +141,7 @@ def _open_file(path, event) end @statcache[path] = inode - + if @sincedb.member?(inode) last_size = @sincedb[inode] @logger.debug("#{path}: sincedb last value #{@sincedb[inode]}, cur size #{stat.size}") @@ -153,18 +153,8 @@ def _open_file(path, event) @sincedb[inode] = 0 end elsif event == :create_initial && @files[path] - @gzip[path] = path.end_with?('.gz') - if @gzip[path] - # RFC1952 two frst byte for gzip file is ID1 ID2 == 31=1f 139=8b - @opts[:start_new_files_at] = :beginning # force start to beginning - @files[path].sysseek(0, IO::SEEK_SET) - dataID1 = @files[path].getc.unpack('H*').first - dataID2 = @files[path].getc.unpack('H*').first - @gzip[path] = dataID1 == "1f" and dataID2 == "8b" - @logger.debug("gzip flag #{@gzip[path]} #{dataID1} #{dataID2}") - @files[path].rewind - @sincedb[inode] = 0 - else + _check_gzip_file(path, inode) + if !@gzip[path] # TODO(sissel): Allow starting at beginning of the file. if @opts[:start_new_files_at] == :beginning @logger.debug("#{path}: initial create, no sincedb, seeking to beginning of file") @@ -178,12 +168,32 @@ def _open_file(path, event) end end else - @logger.debug("#{path}: staying at position 0, no sincedb") + _check_gzip_file(path, inode) + if @gzip[path] + @logger.debug("#{path}: gzip file") + else + @logger.debug("#{path}: staying at position 0, no sincedb") + end end return true end # def _open_file + private + def _check_gzip_file(path, inode) + if path.end_with?('.gz') + # RFC1952 two first byte for gzip file is ID1 ID2 == 31=1f 139=8b + @opts[:start_new_files_at] = :beginning # force start to beginning + @files[path].sysseek(0, IO::SEEK_SET) + dataID1 = @files[path].getbyte() + dataID2 = @files[path].getbyte() + @gzip[path] = dataID1 == 31 and dataID2 == 139 + @logger.debug("gzip flag #{@gzip[path]} #{dataID1} #{dataID2}") + @files[path].rewind + @sincedb[inode] = 0 + end + end + private def _read_file(path, &block) @logger.debug("_read_file gzip flag is #{@gzip[path]} for path #{path}") From 572924150c940df3e0158639c6a7e78dc02bab81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ludovic=20L=C3=A9au-Mercier?= Date: Wed, 5 Nov 2014 11:19:06 +0100 Subject: [PATCH 3/3] activate java required --- lib/filewatch/tail.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/filewatch/tail.rb b/lib/filewatch/tail.rb index b8f8e5e..f3a7c7b 100644 --- a/lib/filewatch/tail.rb +++ b/lib/filewatch/tail.rb @@ -8,8 +8,8 @@ require "logger" require "rbconfig" -#include Java if defined? JRUBY_VERSION -#require "JRubyFileExtension.jar" if defined? JRUBY_VERSION +include Java if defined? JRUBY_VERSION +require "JRubyFileExtension.jar" if defined? JRUBY_VERSION module FileWatch class Tail