Skip to content

Commit

Permalink
Add support for native Mongo drivers, 2.0+
Browse files Browse the repository at this point in the history
Connects to stripe-archive#15
  • Loading branch information
John Nason committed Jan 5, 2018
1 parent cc8bee6 commit 0764dd4
Show file tree
Hide file tree
Showing 12 changed files with 93 additions and 100 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ this to resume interrupted tailers so that no information is lost.

## Version history

### 0.5

Move from the Moped driver to the native Mongo 2.0 driver.

### 0.4

Add support for [tokumx](http://www.tokutek.com/products/tokumx-for-mongodb/). Backwards incompatible changes to persistent tailers to accommodate that. See [UPGRADING.md](UPGRADING.md).
17 changes: 7 additions & 10 deletions lib/mongoriver/persistent_tailer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,21 @@ def initialize(upstream, type, service, opts={})
db = opts[:db] || "_mongoriver"
collection = opts[:collection] || 'oplog-tailers'
@service = service
@state_collection = @upstream_conn.db(db).collection(collection)
@state_collection = @upstream_conn.use(db)[collection]
end

def read_state
row = @state_collection.find_one(:service => @service)
return nil unless row

# Try to do seamless upgrades from old mongoriver versions
case row['v']
when nil
row = @state_collection.find(:service => @service).first
return nil unless row && row.is_a?(Array)
if row[0] == 'state'
return row[1]
else
log.warn("Old style timestamp found in database. Converting!")
ts = Time.at(row['timestamp'].seconds)
ts = Time.at(row[1].seconds)
return {
'position' => most_recent_position(ts),
'time' => ts
}
when 1
return row['state']
end
end

Expand Down
61 changes: 24 additions & 37 deletions lib/mongoriver/tailer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ def latest_oplog_entry(before_time=nil)

case database_type
when :mongo
record = oplog_collection.find_one(query, :sort => [['$natural', -1]])
record = oplog_collection.find(query).sort( {'$natural' => -1 } ).limit(1).first
when :toku
record = oplog_collection.find_one(query, :sort => [['_id', -1]])
record = oplog_collection.find(query).sort( {'_id' => -1} ).limit(1).first
end
record
end
Expand All @@ -84,20 +84,19 @@ def connect_upstream
@upstream_conn = Mongo::ReplSetConnection.new(@upstreams, opts)
when :slave, :direct
opts = @conn_opts.merge(:slave_ok => true)
host, port = parse_direct_upstream
@upstream_conn = Mongo::Connection.new(host, port, opts)
@upstream_conn = Mongo::Client.new(@upstreams[0], opts)
raise "Server at #{@upstream_conn.host}:#{@upstream_conn.port} is the primary -- if you're ok with that, check why your wrapper is passing :direct rather than :slave" if @type == :slave && @upstream_conn.primary?
ensure_upstream_replset!
when :existing
raise "Must pass in a single existing Mongo::Connection with :existing" unless @upstreams.length == 1 && @upstreams[0].respond_to?(:db)
raise "Must pass in a single existing Mongo::Connection with :existing" unless @upstreams.length == 1 && @upstreams[0].respond_to?(:use)
@upstream_conn = @upstreams[0]
else
raise "Invalid connection type: #{@type.inspect}"
end
end

def connection_config
@upstream_conn.db('admin').command(:ismaster => 1)
@upstream_conn.use('admin').command(:ismaster => 1).documents.first
end

def ensure_upstream_replset!
Expand All @@ -108,46 +107,27 @@ def ensure_upstream_replset!
end
end

def parse_direct_upstream
raise "When connecting directly to a mongo instance, must provide a single upstream" unless @upstreams.length == 1
upstream = @upstreams[0]
parse_host_spec(upstream)
end

def parse_host_spec(host_spec)
host, port = host_spec.split(':')
host = '127.0.0.1' if host.to_s.length == 0
port = '27017' if port.to_s.length == 0
[host, port.to_i]
end

def oplog_collection
@upstream_conn.db('local').collection(oplog)
@upstream_conn.use('local')[oplog]
end

# Start tailing the oplog.
#
#
# @param [Hash]
# @option opts [BSON::Timestamp, BSON::Binary] :from Placeholder indicating
# @option opts [BSON::Timestamp, BSON::Binary] :from Placeholder indicating
# where to start the query from. Binary value is used for tokumx.
# The timestamp is non-inclusive.
# @option opts [Hash] :filter Extra filters for the query.
# @option opts [Bool] :dont_wait(false)
# @option opts [Bool] :dont_wait(false)
def tail(opts = {})
raise "Already tailing the oplog!" if @cursor

query = build_tail_query(opts)

mongo_opts = {:timeout => false}.merge(opts[:mongo_opts] || {})

oplog_collection.find(query, mongo_opts) do |oplog|
oplog.add_option(Mongo::Constants::OP_QUERY_TAILABLE)
oplog.add_option(Mongo::Constants::OP_QUERY_OPLOG_REPLAY) if query['ts']
oplog.add_option(Mongo::Constants::OP_QUERY_AWAIT_DATA) unless opts[:dont_wait]

log.debug("Starting oplog stream from #{opts[:from] || 'start'}")
@cursor = oplog
end
cursor_type = opts[:dont_wait] ? :tailable : :tailable_await
oplog_replay = query['ts'] ? true : false
mongo_opts = {:timeout => false, :cursor_type => cursor_type, :oplog_replay => oplog_replay}.merge(opts[:mongo_opts] || {})
@cursor = oplog_collection.find(query, mongo_opts).to_enum.lazy
end

# Deprecated: use #tail(:from => ts, ...) instead
Expand All @@ -161,10 +141,9 @@ def tailing
end

def stream(limit=nil, &blk)
count = 0
@streaming = true
state = TailerStreamState.new(limit)
while !@stop && !state.break? && @cursor.has_next?
while !@stop && !state.break? && cursor_has_more?
state.increment

record = @cursor.next
Expand All @@ -180,8 +159,7 @@ def stream(limit=nil, &blk)
end
end
@streaming = false

return @cursor.has_next?
return cursor_has_more?
end

def stop
Expand Down Expand Up @@ -212,5 +190,14 @@ def build_tail_query(opts = {})

query
end

# Reports whether @cursor can yield another record, without consuming it.
#
# Uses Enumerator#peek, which leaves the next element in place, and treats
# StopIteration as "nothing left".
#
# @return [Boolean] true when @cursor.peek succeeds, false when it raises
#   StopIteration
def cursor_has_more?
  @cursor.peek
  true
rescue StopIteration
  false
end
end
end
13 changes: 7 additions & 6 deletions lib/mongoriver/toku.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
module Mongoriver
# This module deals with converting TokuMX oplog records into mongodb oplogs.
# This module deals with converting TokuMX oplog records into mongodb oplogs.
class Toku
# @returns true if conn is a TokuMX database and the oplog records need to
# be converted
# @returns true if conn is a TokuMX database and the oplog records need to
# be converted
def self.conversion_needed?(conn)
conn.server_info.has_key? "tokumxVersion"
server_info = conn.command({:buildinfo => 1}).documents.first
server_info.has_key? "tokumxVersion"
end

def self.operations_for(record, conn=nil)
Expand All @@ -19,7 +20,7 @@ def self.operations_for(record, conn=nil)
end

# Convert hash representing a tokumx oplog record to mongodb oplog records.
#
#
# Things to note:
# 1) Unlike mongo oplog, the timestamps will not be monotonically
# increasing
Expand Down Expand Up @@ -71,7 +72,7 @@ def self.insert_record(operation, full_record)
"op" => "i",
# namespace being updated. in form of database-name.collection.name
"ns" => operation["ns"],
# operation being done.
# operation being done.
# e.g. {"_id"=>BSON::ObjectId('53fb8f6b9e126b2106000003')}
"o" => operation["o"]
}
Expand Down
2 changes: 1 addition & 1 deletion lib/mongoriver/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Mongoriver
VERSION = "0.4.5"
VERSION = "0.5.0"
end
4 changes: 2 additions & 2 deletions mongoriver.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ Gem::Specification.new do |gem|
gem.require_paths = ["lib"]
gem.version = Mongoriver::VERSION

gem.add_runtime_dependency('mongo', ['>= 1.12.5', '< 2.0'])
gem.add_runtime_dependency('bson_ext', ['>= 1.12.5', '< 2.0'])
gem.add_runtime_dependency('mongo', '>= 2.0')
gem.add_runtime_dependency('bson_ext')
gem.add_runtime_dependency('log4r')

gem.add_development_dependency('rake')
Expand Down
8 changes: 8 additions & 0 deletions test/cursor_stub.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
require 'mongo'

class CursorStub
include Enumerable

def initialize
@events = []
@index = 0
Expand Down Expand Up @@ -30,4 +32,10 @@ def next
@index += 1
@events[@index - 1]
end

# Passes every element of @events to the given block, in array order.
# Defining #each is what lets the including class use Enumerable.
#
# @yield [event] one element of @events at a time
# @return [Array] the @events array (the result of Array#each)
def each(&block)
  @events.each { |event| block.call(event) }
end
end
6 changes: 4 additions & 2 deletions test/test_mongoriver.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ def create_op(op)
end

before do
conn = stub(:db => nil, :server_info => {})
conn = stub(:use => nil)
buildinfo_command = stub(:documents => [{}])
conn.expects(:command).with(:buildinfo => 1).returns(buildinfo_command)
@tailer = Mongoriver::Tailer.new([conn], :existing)
@outlet = Mongoriver::AbstractOutlet.new
@stream = Mongoriver::Stream.new(@tailer, @outlet)
Expand Down Expand Up @@ -62,4 +64,4 @@ def create_op(op)
@outlet.expects(:drop_database).once.with('foo')
@stream.send(:handle_op, create_op({'op'=>'c', 'ns'=>'foo.$cmd', 'o'=>{'dropDatabase'=>1.0}}))
end
end
end
19 changes: 9 additions & 10 deletions test/test_mongoriver_connected.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@

def connect
begin
host, port = MONGO_SERVER.split(':', 2)
Mongo::Connection.new(host, port)
rescue Mongo::ConnectionFailure
Mongo::Client.new(MONGO_SERVER)
rescue Mongo::Error
nil
end
end
Expand Down Expand Up @@ -60,16 +59,16 @@ def run_stream(stream, start)
@outlet.expects(:drop_collection).once.with(db, collection+'_foo').in_sequence(op_sequence)
@outlet.expects(:drop_database).once.with(db) { @stream.stop }.in_sequence(op_sequence)

coll = @mongo[db][collection]
coll = @mongo.use(db)[collection]
coll.insert(doc)
coll.update({'_id' => 'foo'}, doc.merge('bar' => 'qux'))
coll.remove({'_id' => 'foo'})

name = coll.ensure_index(index_keys)
coll.drop_index(name)

@mongo[db].rename_collection(collection, collection+'_foo')
@mongo[db].drop_collection(collection+'_foo')
@mongo.rename_collection(collection, collection+'_foo')
@mongo.drop_collection(collection+'_foo')
@mongo.drop_database(db)

run_stream(@stream, @tail_from)
Expand All @@ -88,7 +87,7 @@ def run_stream(stream, start)
it 'ignores everything before the operation passed in' do
name = '_test_mongoriver'

@mongo[name][name].insert(:a => 5)
@mongo.use(name)[name].insert(:a => 5)

@outlet.expects(:insert).never
@outlet.expects(:drop_database).with(anything) { @stream.stop }
Expand All @@ -110,9 +109,9 @@ def run_stream(stream, start)
db_name != name || update['record'] == 'newvalue'
end

@mongo[name][name].insert('record' => 'value')
@mongo[name][name].update({'record' => 'value'}, {'record' => 'newvalue'})
@mongo.use(name)[name].insert('record' => 'value')
@mongo.use(name)[name].update({'record' => 'value'}, {'record' => 'newvalue'})
run_stream(@stream, Time.now-3)
end
end
end
end
38 changes: 18 additions & 20 deletions test/test_persistent_tailers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

def mocked_mongo()
mongo_connection = stub()
db = stub()
use = stub()
collection = stub()

mongo_connection.expects(:db).with('_mongoriver').returns(db)
db.expects(:collection).with('oplog-tailers').returns(collection)
mongo_connection.expects(:use).with('_mongoriver').returns({ 'oplog-tailers' => collection })

# mongodb
mongo_connection.expects(:server_info).at_least_once.returns({})
buildinfo_command = stub()
buildinfo_command.expects(:documents).returns([{}])
mongo_connection.expects(:command).with(:buildinfo => 1).returns(buildinfo_command)

[mongo_connection, collection]
end
Expand All @@ -38,7 +38,7 @@ def mocked_mongo()
@state_collection.expects(:update)
@tailer.save_state(@state)

@state_collection.expects(:find_one).returns({
@state_collection.expects(:find).returns({
'state' => @state,
'v' => 1
})
Expand All @@ -48,15 +48,15 @@ def mocked_mongo()
it 'should update gracefully' do
ts = BSON::Timestamp.new(77, 0)

@state_collection.expects(:find_one).returns('timestamp' => ts)
@tailer.expects(:most_recent_position).with(Time.at(77))
@state_collection.expects(:find).returns('timestamp' => ts)
@tailer.expects(:most_recent_position)

assert_equal(Time.at(77), @tailer.read_state['time'])
end
end

it 'helper methods for timestamps/positions' do
@state_collection.expects(:find_one).returns({
@state_collection.expects(:find).returns({
'state' => @state,
'v' => 1
}).at_least_once
Expand All @@ -74,28 +74,26 @@ def mocked_mongo()

it 'should stream with state' do
# Uses admin to verify that it is a replicaset
admin_db = stub()
admin_db.expects(:command).returns({'setName' => 'replica'})
@mongo_connection.expects(:db).with('admin').returns(admin_db)
admin_doc = stub(:first => {'setName' => 'replica', 'localTime' => nil })
admin_docs = stub(:documents => admin_doc)
admin_db = stub(:command => admin_docs)
@mongo_connection.expects(:use).with('admin').returns(admin_db)

# Updates state collection when finish iteration
@state_collection.expects(:update)

# Oplog collection to return results
local_db = stub()
@mongo_connection.expects(:db).with('local').returns(local_db)
oplog_collection = stub()
local_db.expects(:collection).with('oplog.rs').returns(oplog_collection)
cursor = CursorStub.new
oplog_collection.expects(:find).yields(cursor)
cursor.generate_ops(10)
oplog_collection = stub(:find => cursor)
@mongo_connection.expects(:use).with('local').returns({ 'oplog.rs' => oplog_collection })

@tailer.tail(:from => BSON::Timestamp.new(Time.now.to_i, 0))

cursor.generate_ops(10)

count = 0
@tailer.stream do |record, state|
count = state.count
state.break
state.break
end
assert_equal(1, count)
end
Expand Down
Loading

0 comments on commit 0764dd4

Please sign in to comment.