Skip to content

Commit

Permalink
Allow to break out of tailer when some conditions are met inside of t…
Browse files Browse the repository at this point in the history
…he block (stripe-archive#17)

* Allow to break out of tailer when some conditions are met inside of the block

* Switch to ruby 2.2 in travis and set fixed version for mongo

Mongo 2.0 has breaking changes

* Address code review feedback
  • Loading branch information
den-stripe authored Apr 28, 2017
1 parent 70b4dad commit ae548a5
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 14 deletions.
6 changes: 2 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
language: ruby
rvm:
- "1.8.7"
- "1.9.3"
- "2.1.2"
- "2.2.4"
# command to run tests
script: "bundle exec rake test"
script: "bundle exec rake test"
5 changes: 3 additions & 2 deletions Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ Rake::TestTask.new(:'test-unit') do |t|
t.test_files = FileList[
'test/test_mongoriver.rb',
'test/test_toku.rb',
'test/test_persistent_tailers.rb']
'test/test_persistent_tailers.rb',
'test/test_tailer.rb']
end

Rake::TestTask.new(:'test-connected') do |t|
t.test_files = FileList['test/test_*_connected.rb']
end
end
1 change: 1 addition & 0 deletions lib/mongoriver.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ module Mongoriver; end
require 'mongoriver/assertions'

require 'mongoriver/tailer'
require 'mongoriver/tailer_stream_state'
require 'mongoriver/toku'
require 'mongoriver/abstract_persistent_tailer'
require 'mongoriver/persistent_tailer'
Expand Down
12 changes: 7 additions & 5 deletions lib/mongoriver/tailer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -163,18 +163,20 @@ def tailing
def stream(limit=nil, &blk)
count = 0
@streaming = true
while !@stop && @cursor.has_next?
count += 1
break if limit && count >= limit
state = TailerStreamState.new(limit)
while !@stop && !state.break? && @cursor.has_next?
state.increment

record = @cursor.next

case database_type
when :mongo
blk.call(record)
blk.call(record, state)
when :toku
converted = Toku.convert(record, @upstream_conn)
converted.each(&blk)
converted.each do |converted_record|
blk.call(converted_record, state)
end
end
end
@streaming = false
Expand Down
26 changes: 26 additions & 0 deletions lib/mongoriver/tailer_stream_state.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
module Mongoriver
class TailerStreamState
attr_reader :count

def initialize(limit=nil)
@break = false
@limit = limit
@count = 0
end

def break?
return @break
end

def break
@break = true
end

def increment()
@count += 1
if !@limit.nil? && @count >= @limit
self.break
end
end
end
end
2 changes: 1 addition & 1 deletion lib/mongoriver/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Mongoriver
VERSION = "0.4.3"
VERSION = "0.4.4"
end
4 changes: 2 additions & 2 deletions mongoriver.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ Gem::Specification.new do |gem|
gem.require_paths = ["lib"]
gem.version = Mongoriver::VERSION

gem.add_runtime_dependency('mongo', '>= 1.7')
gem.add_runtime_dependency('bson_ext')
gem.add_runtime_dependency('mongo', ['>= 1.12.5', '< 2.0'])
gem.add_runtime_dependency('bson_ext', ['>= 1.12.5', '< 2.0'])
gem.add_runtime_dependency('log4r')

gem.add_development_dependency('rake')
Expand Down
98 changes: 98 additions & 0 deletions test/test_tailer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
require 'minitest/autorun'
require 'mocha/setup'
require 'mongo'
require 'mongoriver'

describe 'Mongoriver::Tailer' do
class CursorStub
def initialize
@events = []
@index = 0
end

def add_option(opt) end

def generate_ops(max)
@index = 0
(1..max).map do |id|
@events << {
'ts' => BSON::Timestamp.new(Time.now.to_i, 0),
'ns' => 'foo.bar',
'op' => 'i',
'o' => {
'_id' => id.to_s
}
}
end
end

def has_next?
@index < @events.length
end

def next
@index += 1
@events[@index - 1]
end
end

before do
cursor = CursorStub.new
collection = stub do
expects(:find).yields(cursor)
end
db = stub do
expects(:collection).with('oplog.rs').returns(collection)
end
conn = stub(server_info: {}) do
expects(:db).with('local').returns(db)
end
@cursor = cursor
@tailer = Mongoriver::Tailer.new([conn], :existing)
@tailer.tail
end

it 'tailer stream with limit' do
@cursor.generate_ops(10)
count = 0
has_more = @tailer.stream(5) do |_|
count += 1
end
assert(has_more)
assert_equal(5, count)
end

it 'tailer stream without limit' do
@cursor.generate_ops(10)
count = 0
has_more = @tailer.stream do |_|
count += 1
end
assert(!has_more)
assert_equal(10, count)
end

it 'tailer stream allow to break out' do
@cursor.generate_ops(10)
count = 0
has_more = @tailer.stream do |_, state|
count += 1
state.break if count == 5
assert_equal(count, state.count)
end
assert(has_more)
assert_equal(5, count)
end

it 'tailer stream allow to break out before limit' do
@cursor.generate_ops(10)
count = 0
has_more = @tailer.stream(7) do |_, state|
count += 1
state.break if count == 5
assert_equal(count, state.count)
end
assert(has_more)
assert_equal(5, count)
end
end

0 comments on commit ae548a5

Please sign in to comment.