diff --git a/README.md b/README.md index 89e0c3d..170c812 100644 --- a/README.md +++ b/README.md @@ -55,6 +55,10 @@ this to resume interrupted tailers so that no information is lost. ## Version history +### 0.5 + +Move from the Moped driver to the native Mongo 2.0 driver. + ### 0.4 Add support for [tokumx](http://www.tokutek.com/products/tokumx-for-mongodb/). Backwards incompatible changes to persistent tailers to accomodate that. See [UPGRADING.md](UPGRADING.md). diff --git a/lib/mongoriver/persistent_tailer.rb b/lib/mongoriver/persistent_tailer.rb index d3a3bfd..4de3dd3 100644 --- a/lib/mongoriver/persistent_tailer.rb +++ b/lib/mongoriver/persistent_tailer.rb @@ -12,24 +12,21 @@ def initialize(upstream, type, service, opts={}) db = opts[:db] || "_mongoriver" collection = opts[:collection] || 'oplog-tailers' @service = service - @state_collection = @upstream_conn.db(db).collection(collection) + @state_collection = @upstream_conn.use(db)[collection] end def read_state - row = @state_collection.find_one(:service => @service) - return nil unless row - - # Try to do seamless upgrades from old mongoriver versions - case row['v'] - when nil + row = @state_collection.find(:service => @service).first + return nil unless row && row.is_a?(Array) + if row[0] == 'state' + return row[1] + else log.warn("Old style timestamp found in database. Converting!") - ts = Time.at(row['timestamp'].seconds) + ts = Time.at(row[1].seconds) return { 'position' => most_recent_position(ts), 'time' => ts } - when 1 - return row['state'] end end diff --git a/lib/mongoriver/tailer.rb b/lib/mongoriver/tailer.rb index 62dc6bf..5788dd1 100644 --- a/lib/mongoriver/tailer.rb +++ b/lib/mongoriver/tailer.rb @@ -70,9 +70,9 @@ def latest_oplog_entry(before_time=nil) case database_type when :mongo - record = oplog_collection.find_one(query, :sort => [['$natural', -1]]) + record = oplog_collection.find(query).sort( {'$natural' => -1 } ).limit(1).first when :toku - record = oplog_collection.find_one(query, :sort => [['_id', -1]]) + record = oplog_collection.find(query).sort( {'_id' => -1} ).limit(1).first end record end @@ -84,12 +84,11 @@ def connect_upstream @upstream_conn = Mongo::ReplSetConnection.new(@upstreams, opts) when :slave, :direct opts = @conn_opts.merge(:slave_ok => true) - host, port = parse_direct_upstream - @upstream_conn = Mongo::Connection.new(host, port, opts) + @upstream_conn = Mongo::Client.new(@upstreams[0], opts) raise "Server at #{@upstream_conn.host}:#{@upstream_conn.port} is the primary -- if you're ok with that, check why your wrapper is passing :direct rather than :slave" if @type == :slave && @upstream_conn.primary? ensure_upstream_replset! when :existing - raise "Must pass in a single existing Mongo::Connection with :existing" unless @upstreams.length == 1 && @upstreams[0].respond_to?(:db) + raise "Must pass in a single existing Mongo::Connection with :existing" unless @upstreams.length == 1 && @upstreams[0].respond_to?(:use) @upstream_conn = @upstreams[0] else raise "Invalid connection type: #{@type.inspect}" @@ -97,7 +96,7 @@ def connect_upstream end def connection_config - @upstream_conn.db('admin').command(:ismaster => 1) + @upstream_conn.use('admin').command(:ismaster => 1).documents.first end def ensure_upstream_replset! @@ -108,46 +107,27 @@ def ensure_upstream_replset! end end - def parse_direct_upstream - raise "When connecting directly to a mongo instance, must provide a single upstream" unless @upstreams.length == 1 - upstream = @upstreams[0] - parse_host_spec(upstream) - end - - def parse_host_spec(host_spec) - host, port = host_spec.split(':') - host = '127.0.0.1' if host.to_s.length == 0 - port = '27017' if port.to_s.length == 0 - [host, port.to_i] - end - def oplog_collection - @upstream_conn.db('local').collection(oplog) + @upstream_conn.use('local')[oplog] end # Start tailing the oplog. - # + # # @param [Hash] - # @option opts [BSON::Timestamp, BSON::Binary] :from Placeholder indicating + # @option opts [BSON::Timestamp, BSON::Binary] :from Placeholder indicating # where to start the query from. Binary value is used for tokumx. # The timestamp is non-inclusive. # @option opts [Hash] :filter Extra filters for the query. - # @option opts [Bool] :dont_wait(false) + # @option opts [Bool] :dont_wait(false) def tail(opts = {}) raise "Already tailing the oplog!" if @cursor query = build_tail_query(opts) - mongo_opts = {:timeout => false}.merge(opts[:mongo_opts] || {}) - - oplog_collection.find(query, mongo_opts) do |oplog| - oplog.add_option(Mongo::Constants::OP_QUERY_TAILABLE) - oplog.add_option(Mongo::Constants::OP_QUERY_OPLOG_REPLAY) if query['ts'] - oplog.add_option(Mongo::Constants::OP_QUERY_AWAIT_DATA) unless opts[:dont_wait] - - log.debug("Starting oplog stream from #{opts[:from] || 'start'}") - @cursor = oplog - end + cursor_type = opts[:dont_wait] ? :tailable : :tailable_await + oplog_replay = query['ts'] ? true : false + mongo_opts = {:timeout => false, :cursor_type => cursor_type, :oplog_replay => oplog_replay}.merge(opts[:mongo_opts] || {}) + @cursor = oplog_collection.find(query, mongo_opts).to_enum.lazy end # Deprecated: use #tail(:from => ts, ...) instead @@ -161,10 +141,9 @@ def tailing end def stream(limit=nil, &blk) - count = 0 @streaming = true state = TailerStreamState.new(limit) - while !@stop && !state.break? && @cursor.has_next? + while !@stop && !state.break? && cursor_has_more? state.increment record = @cursor.next @@ -180,8 +159,7 @@ def stream(limit=nil, &blk) end end @streaming = false - - return @cursor.has_next? + return cursor_has_more? end def stop @@ -212,5 +190,14 @@ def build_tail_query(opts = {}) query end + + def cursor_has_more? + begin + @cursor.peek + true + rescue StopIteration + false + end + end end end diff --git a/lib/mongoriver/toku.rb b/lib/mongoriver/toku.rb index d1822ed..a448d75 100644 --- a/lib/mongoriver/toku.rb +++ b/lib/mongoriver/toku.rb @@ -1,10 +1,11 @@ module Mongoriver - # This module deals with converting TokuMX oplog records into mongodb oplogs. + # This module deals with converting TokuMX oplog records into mongodb oplogs. class Toku - # @returns true if conn is a TokuMX database and the oplog records need to - # be converted + # @returns true if conn is a TokuMX database and the oplog records need to + # be converted def self.conversion_needed?(conn) - conn.server_info.has_key? "tokumxVersion" + server_info = conn.command({:buildinfo => 1}).documents.first + server_info.has_key? "tokumxVersion" end def self.operations_for(record, conn=nil) @@ -19,7 +20,7 @@ def self.operations_for(record, conn=nil) end # Convert hash representing a tokumx oplog record to mongodb oplog records. - # + # # Things to note: # 1) Unlike mongo oplog, the timestamps will not be monotonically # increasing @@ -71,7 +72,7 @@ def self.insert_record(operation, full_record) "op" => "i", # namespace being updated. in form of database-name.collection.name "ns" => operation["ns"], - # operation being done. + # operation being done. # e.g. {"_id"=>BSON::ObjectId('53fb8f6b9e126b2106000003')} "o" => operation["o"] } diff --git a/lib/mongoriver/version.rb b/lib/mongoriver/version.rb index 1266922..2b4bc54 100644 --- a/lib/mongoriver/version.rb +++ b/lib/mongoriver/version.rb @@ -1,3 +1,3 @@ module Mongoriver - VERSION = "0.4.5" + VERSION = "0.5.0" end diff --git a/mongoriver.gemspec b/mongoriver.gemspec index db1871b..fdd4106 100644 --- a/mongoriver.gemspec +++ b/mongoriver.gemspec @@ -16,8 +16,8 @@ Gem::Specification.new do |gem| gem.require_paths = ["lib"] gem.version = Mongoriver::VERSION - gem.add_runtime_dependency('mongo', ['>= 1.12.5', '< 2.0']) - gem.add_runtime_dependency('bson_ext', ['>= 1.12.5', '< 2.0']) + gem.add_runtime_dependency('mongo', '>= 2.0') + gem.add_runtime_dependency('bson_ext') gem.add_runtime_dependency('log4r') gem.add_development_dependency('rake') diff --git a/test/cursor_stub.rb b/test/cursor_stub.rb index 0d51b87..98f3b01 100644 --- a/test/cursor_stub.rb +++ b/test/cursor_stub.rb @@ -1,6 +1,8 @@ require 'mongo' class CursorStub + include Enumerable + def initialize @events = [] @index = 0 @@ -30,4 +32,10 @@ def next @index += 1 @events[@index - 1] end + + def each(&block) + @events.each do |event| + block.call(event) + end + end end diff --git a/test/test_mongoriver.rb b/test/test_mongoriver.rb index b4151d4..f74bf9f 100644 --- a/test/test_mongoriver.rb +++ b/test/test_mongoriver.rb @@ -10,7 +10,9 @@ def create_op(op) end before do - conn = stub(:db => nil, :server_info => {}) + conn = stub(:use => nil) + buildinfo_command = stub(:documents => [{}]) + conn.expects(:command).with(:buildinfo => 1).returns(buildinfo_command) @tailer = Mongoriver::Tailer.new([conn], :existing) @outlet = Mongoriver::AbstractOutlet.new @stream = Mongoriver::Stream.new(@tailer, @outlet) @@ -62,4 +64,4 @@ def create_op(op) @outlet.expects(:drop_database).once.with('foo') @stream.send(:handle_op, create_op({'op'=>'c', 'ns'=>'foo.$cmd', 'o'=>{'dropDatabase'=>1.0}})) end -end \ No newline at end of file +end diff --git a/test/test_mongoriver_connected.rb b/test/test_mongoriver_connected.rb index a02b47c..17fd607 100644 --- a/test/test_mongoriver_connected.rb +++ b/test/test_mongoriver_connected.rb @@ -9,9 +9,8 @@ def connect begin - host, port = MONGO_SERVER.split(':', 2) - Mongo::Connection.new(host, port) - rescue Mongo::ConnectionFailure + Mongo::Client.new(MONGO_SERVER) + rescue Mongo::Error nil end end @@ -60,7 +59,7 @@ def run_stream(stream, start) @outlet.expects(:drop_collection).once.with(db, collection+'_foo').in_sequence(op_sequence) @outlet.expects(:drop_database).once.with(db) { @stream.stop }.in_sequence(op_sequence) - coll = @mongo[db][collection] + coll = @mongo.use(db)[collection] coll.insert(doc) coll.update({'_id' => 'foo'}, doc.merge('bar' => 'qux')) coll.remove({'_id' => 'foo'}) @@ -68,8 +67,8 @@ def run_stream(stream, start) name = coll.ensure_index(index_keys) coll.drop_index(name) - @mongo[db].rename_collection(collection, collection+'_foo') - @mongo[db].drop_collection(collection+'_foo') + @mongo.rename_collection(collection, collection+'_foo') + @mongo.drop_collection(collection+'_foo') @mongo.drop_database(db) run_stream(@stream, @tail_from) @@ -88,7 +87,7 @@ def run_stream(stream, start) it 'ignores everything before the operation passed in' do name = '_test_mongoriver' - @mongo[name][name].insert(:a => 5) + @mongo.use(name)[name].insert(:a => 5) @outlet.expects(:insert).never @outlet.expects(:drop_database).with(anything) { @stream.stop } @@ -110,9 +109,9 @@ def run_stream(stream, start) db_name != name || update['record'] == 'newvalue' end - @mongo[name][name].insert('record' => 'value') - @mongo[name][name].update({'record' => 'value'}, {'record' => 'newvalue'}) + @mongo.use(name)[name].insert('record' => 'value') + @mongo.use(name)[name].update({'record' => 'value'}, {'record' => 'newvalue'}) run_stream(@stream, Time.now-3) end end -end \ No newline at end of file +end diff --git a/test/test_persistent_tailers.rb b/test/test_persistent_tailers.rb index 1c31b96..4c3bb23 100644 --- a/test/test_persistent_tailers.rb +++ b/test/test_persistent_tailers.rb @@ -6,14 +6,14 @@ def mocked_mongo() mongo_connection = stub() - db = stub() + use = stub() collection = stub() - - mongo_connection.expects(:db).with('_mongoriver').returns(db) - db.expects(:collection).with('oplog-tailers').returns(collection) + mongo_connection.expects(:use).with('_mongoriver').returns({ 'oplog-tailers' => collection }) # mongodb - mongo_connection.expects(:server_info).at_least_once.returns({}) + buildinfo_command = stub() + buildinfo_command.expects(:documents).returns([{}]) + mongo_connection.expects(:command).with(:buildinfo => 1).returns(buildinfo_command) [mongo_connection, collection] end @@ -38,7 +38,7 @@ def mocked_mongo() @state_collection.expects(:update) @tailer.save_state(@state) - @state_collection.expects(:find_one).returns({ + @state_collection.expects(:find).returns({ 'state' => @state, 'v' => 1 }) @@ -48,15 +48,15 @@ def mocked_mongo() it 'should update gracefully' do ts = BSON::Timestamp.new(77, 0) - @state_collection.expects(:find_one).returns('timestamp' => ts) - @tailer.expects(:most_recent_position).with(Time.at(77)) + @state_collection.expects(:find).returns('timestamp' => ts) + @tailer.expects(:most_recent_position) assert_equal(Time.at(77), @tailer.read_state['time']) end end it 'helper methods for timestamps/positions' do - @state_collection.expects(:find_one).returns({ + @state_collection.expects(:find).returns({ 'state' => @state, 'v' => 1 }).at_least_once @@ -74,28 +74,26 @@ def mocked_mongo() it 'should stream with state' do # Uses admin to verify that it is a replicaset - admin_db = stub() - admin_db.expects(:command).returns({'setName' => 'replica'}) - @mongo_connection.expects(:db).with('admin').returns(admin_db) + admin_doc = stub(:first => {'setName' => 'replica', 'localTime' => nil }) + admin_docs = stub(:documents => admin_doc) + admin_db = stub(:command => admin_docs) + @mongo_connection.expects(:use).with('admin').returns(admin_db) + # Updates state collection when finish iteration @state_collection.expects(:update) # Oplog collection to return results - local_db = stub() - @mongo_connection.expects(:db).with('local').returns(local_db) - oplog_collection = stub() - local_db.expects(:collection).with('oplog.rs').returns(oplog_collection) cursor = CursorStub.new - oplog_collection.expects(:find).yields(cursor) + cursor.generate_ops(10) + oplog_collection = stub(:find => cursor) + @mongo_connection.expects(:use).with('local').returns({ 'oplog.rs' => oplog_collection }) @tailer.tail(:from => BSON::Timestamp.new(Time.now.to_i, 0)) - cursor.generate_ops(10) - count = 0 @tailer.stream do |record, state| count = state.count - state.break + state.break end assert_equal(1, count) end diff --git a/test/test_tailer.rb b/test/test_tailer.rb index 5d331cb..b9a27da 100644 --- a/test/test_tailer.rb +++ b/test/test_tailer.rb @@ -3,19 +3,14 @@ require 'mongo' require 'mongoriver' require_relative './cursor_stub' - + describe 'Mongoriver::Tailer' do before do cursor = CursorStub.new - collection = stub do - expects(:find).yields(cursor) - end - db = stub do - expects(:collection).with('oplog.rs').returns(collection) - end - conn = stub(server_info: {}) do - expects(:db).with('local').returns(db) - end + collection = stub(:find => cursor) + buildinfo = stub(:documents => [{}]) + conn = stub(:command => buildinfo) + conn.expects(:use).with('local').returns({'oplog.rs' => collection}) @cursor = cursor @tailer = Mongoriver::Tailer.new([conn], :existing) @tailer.tail diff --git a/test/test_toku.rb b/test/test_toku.rb index 42a9616..fc45caa 100644 --- a/test/test_toku.rb +++ b/test/test_toku.rb @@ -25,7 +25,9 @@ def convert(*ops) describe 'conversions sent to stream' do before do - conn = stub(:db => nil, :server_info => {'tokumxVersion' => '1'}) + server_info = stub(:documents => [ {'tokumxVersion' => '2'} ]) + conn = stub(:use => nil, :command => server_info) + @tailer = Mongoriver::Tailer.new([conn], :existing) @outlet = Mongoriver::AbstractOutlet.new @stream = Mongoriver::Stream.new(@tailer, @outlet) @@ -93,7 +95,7 @@ def convert(*ops) 'ns' => 'foo.$cmd', 'o' => {'create' => 'bar', 'capped' => true, 'size' => 10} })) - end + end end describe 'large transactions are joined by convert' do