Skip to content

Commit

Permalink
Support breaking out of stream in persistent tailer (stripe-archive#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
den-stripe authored May 24, 2017
1 parent ae548a5 commit cc8bee6
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 40 deletions.
4 changes: 2 additions & 2 deletions lib/mongoriver/abstract_persistent_tailer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ def stream(limit=nil)

# Sketchy logic - yield results from Tailer.stream
# if nothing is found and nothing in cursor, save the current position
entries_left = super(limit) do |entry|
yield entry
entries_left = super(limit) do |entry, state|
yield(entry, state)

found_entry = true
@last_read = state_for(entry)
Expand Down
2 changes: 1 addition & 1 deletion lib/mongoriver/tailer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def connect_upstream
end

def connection_config
@upstream_conn['admin'].command(:ismaster => 1)
@upstream_conn.db('admin').command(:ismaster => 1)
end

def ensure_upstream_replset!
Expand Down
2 changes: 1 addition & 1 deletion lib/mongoriver/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Mongoriver
VERSION = "0.4.4"
VERSION = "0.4.5"
end
33 changes: 33 additions & 0 deletions test/cursor_stub.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
require 'mongo'

class CursorStub
def initialize
@events = []
@index = 0
end

def add_option(opt) end

def generate_ops(max)
@index = 0
(1..max).map do |id|
@events << {
'ts' => BSON::Timestamp.new(Time.now.to_i, 0),
'ns' => 'foo.bar',
'op' => 'i',
'o' => {
'_id' => id.to_s
}
}
end
end

def has_next?
@index < @events.length
end

def next
@index += 1
@events[@index - 1]
end
end
34 changes: 31 additions & 3 deletions test/test_persistent_tailers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
require 'mongo'
require 'minitest/autorun'
require 'mocha/setup'

require_relative './cursor_stub'

def mocked_mongo()
mongo_connection = stub()
Expand All @@ -22,10 +22,10 @@ def mocked_mongo()
before do
@service_name = "_persistenttailer_test"

db, @state_collection = mocked_mongo
@mongo_connection, @state_collection = mocked_mongo

@tailer = Mongoriver::PersistentTailer.new(
[db], :existing, @service_name)
[@mongo_connection], :existing, @service_name)
@state = {
'time' => Time.now,
'position' => 'foobar'
Expand Down Expand Up @@ -71,4 +71,32 @@ def mocked_mongo()

@tailer.tail
end

it 'should stream with state' do
# Uses admin to verify that it is a replicaset
admin_db = stub()
admin_db.expects(:command).returns({'setName' => 'replica'})
@mongo_connection.expects(:db).with('admin').returns(admin_db)
# Updates state collection when finish iteration
@state_collection.expects(:update)

# Oplog collection to return results
local_db = stub()
@mongo_connection.expects(:db).with('local').returns(local_db)
oplog_collection = stub()
local_db.expects(:collection).with('oplog.rs').returns(oplog_collection)
cursor = CursorStub.new
oplog_collection.expects(:find).yields(cursor)

@tailer.tail(:from => BSON::Timestamp.new(Time.now.to_i, 0))

cursor.generate_ops(10)

count = 0
@tailer.stream do |record, state|
count = state.count
state.break
end
assert_equal(1, count)
end
end
35 changes: 2 additions & 33 deletions test/test_tailer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,9 @@
require 'mocha/setup'
require 'mongo'
require 'mongoriver'

require_relative './cursor_stub'

describe 'Mongoriver::Tailer' do
class CursorStub
def initialize
@events = []
@index = 0
end

def add_option(opt) end

def generate_ops(max)
@index = 0
(1..max).map do |id|
@events << {
'ts' => BSON::Timestamp.new(Time.now.to_i, 0),
'ns' => 'foo.bar',
'op' => 'i',
'o' => {
'_id' => id.to_s
}
}
end
end

def has_next?
@index < @events.length
end

def next
@index += 1
@events[@index - 1]
end
end

before do
cursor = CursorStub.new
collection = stub do
Expand Down

0 comments on commit cc8bee6

Please sign in to comment.