diff --git a/lib/mongoriver/abstract_persistent_tailer.rb b/lib/mongoriver/abstract_persistent_tailer.rb index 8b3a3b9..fd20f21 100644 --- a/lib/mongoriver/abstract_persistent_tailer.rb +++ b/lib/mongoriver/abstract_persistent_tailer.rb @@ -33,8 +33,8 @@ def stream(limit=nil) # Sketchy logic - yield results from Tailer.stream # if nothing is found and nothing in cursor, save the current position - entries_left = super(limit) do |entry| - yield entry + entries_left = super(limit) do |entry, state| + yield(entry, state) found_entry = true @last_read = state_for(entry) diff --git a/lib/mongoriver/tailer.rb b/lib/mongoriver/tailer.rb index 3374a5c..62dc6bf 100644 --- a/lib/mongoriver/tailer.rb +++ b/lib/mongoriver/tailer.rb @@ -97,7 +97,7 @@ def connect_upstream end def connection_config - @upstream_conn['admin'].command(:ismaster => 1) + @upstream_conn.db('admin').command(:ismaster => 1) end def ensure_upstream_replset! diff --git a/lib/mongoriver/version.rb b/lib/mongoriver/version.rb index 4ef3438..1266922 100644 --- a/lib/mongoriver/version.rb +++ b/lib/mongoriver/version.rb @@ -1,3 +1,3 @@ module Mongoriver - VERSION = "0.4.4" + VERSION = "0.4.5" end diff --git a/test/cursor_stub.rb b/test/cursor_stub.rb new file mode 100644 index 0000000..0d51b87 --- /dev/null +++ b/test/cursor_stub.rb @@ -0,0 +1,33 @@ +require 'mongo' + +class CursorStub + def initialize + @events = [] + @index = 0 + end + + def add_option(opt) end + + def generate_ops(max) + @index = 0 + (1..max).map do |id| + @events << { + 'ts' => BSON::Timestamp.new(Time.now.to_i, 0), + 'ns' => 'foo.bar', + 'op' => 'i', + 'o' => { + '_id' => id.to_s + } + } + end + end + + def has_next? + @index < @events.length + end + + def next + @index += 1 + @events[@index - 1] + end +end diff --git a/test/test_persistent_tailers.rb b/test/test_persistent_tailers.rb index d074c9a..1c31b96 100644 --- a/test/test_persistent_tailers.rb +++ b/test/test_persistent_tailers.rb @@ -2,7 +2,7 @@ require 'mongo' require 'minitest/autorun' require 'mocha/setup' - +require_relative './cursor_stub' def mocked_mongo() mongo_connection = stub() @@ -22,10 +22,10 @@ def mocked_mongo() before do @service_name = "_persistenttailer_test" - db, @state_collection = mocked_mongo + @mongo_connection, @state_collection = mocked_mongo @tailer = Mongoriver::PersistentTailer.new( - [db], :existing, @service_name) + [@mongo_connection], :existing, @service_name) @state = { 'time' => Time.now, 'position' => 'foobar' @@ -71,4 +71,32 @@ def mocked_mongo() @tailer.tail end + + it 'should stream with state' do + # Uses admin to verify that it is a replicaset + admin_db = stub() + admin_db.expects(:command).returns({'setName' => 'replica'}) + @mongo_connection.expects(:db).with('admin').returns(admin_db) + # Updates state collection when finish iteration + @state_collection.expects(:update) + + # Oplog collection to return results + local_db = stub() + @mongo_connection.expects(:db).with('local').returns(local_db) + oplog_collection = stub() + local_db.expects(:collection).with('oplog.rs').returns(oplog_collection) + cursor = CursorStub.new + oplog_collection.expects(:find).yields(cursor) + + @tailer.tail(:from => BSON::Timestamp.new(Time.now.to_i, 0)) + + cursor.generate_ops(10) + + count = 0 + @tailer.stream do |record, state| + count = state.count + state.break + end + assert_equal(1, count) + end end diff --git a/test/test_tailer.rb b/test/test_tailer.rb index 1548294..5d331cb 100644 --- a/test/test_tailer.rb +++ b/test/test_tailer.rb @@ -2,40 +2,9 @@ require 'mocha/setup' require 'mongo' require 'mongoriver' - +require_relative './cursor_stub' + describe 'Mongoriver::Tailer' do - class CursorStub - def initialize - @events = [] - @index = 0 - end - - def add_option(opt) end - - def generate_ops(max) - @index = 0 - (1..max).map do |id| - @events << { - 'ts' => BSON::Timestamp.new(Time.now.to_i, 0), - 'ns' => 'foo.bar', - 'op' => 'i', - 'o' => { - '_id' => id.to_s - } - } - end - end - - def has_next? - @index < @events.length - end - - def next - @index += 1 - @events[@index - 1] - end - end - before do cursor = CursorStub.new collection = stub do