1 change: 1 addition & 0 deletions gemfiles/standard.rb
@@ -4,6 +4,7 @@
def standard_dependencies
gem 'yard', '>= 0.9.35'
gem 'ffi'
gem 'opentelemetry-sdk'

group :development, :testing do
gem 'jruby-openssl', platforms: :jruby
2 changes: 2 additions & 0 deletions lib/mongo.rb
@@ -33,6 +33,7 @@
autoload :CGI, 'cgi'

require 'bson'
require 'opentelemetry-api'

require 'mongo/id'
require 'mongo/bson'
@@ -74,6 +75,7 @@
require 'mongo/socket'
require 'mongo/srv'
require 'mongo/timeout'
require 'mongo/tracing'
require 'mongo/uri'
require 'mongo/version'
require 'mongo/write_concern'
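The library itself requires only the opentelemetry-api gem; the opentelemetry-sdk gem added to the standard gemfile above is what actually records and exports spans during development and testing. A minimal sketch of application-side setup (the service name and the reliance on OTEL_* environment variables are illustrative, not part of this change):

```ruby
# Sketch: application-side OpenTelemetry setup. The driver only needs the
# API at require time; an SDK like this is what records and exports spans.
require 'opentelemetry/sdk'

OpenTelemetry::SDK.configure do |c|
  c.service_name = 'my-app' # illustrative name
  # Exporters and processors are picked up from OTEL_* environment variables
  # by default; no driver-specific configuration happens here.
end

require 'mongo'
```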
39 changes: 35 additions & 4 deletions lib/mongo/client.rb
@@ -112,6 +112,7 @@ class Client
:ssl_verify_hostname,
:ssl_verify_ocsp_endpoint,
:timeout_ms,
:tracing,
:truncate_logs,
:user,
:wait_queue_timeout,
@@ -437,6 +438,20 @@ def hash
# See Ruby's Zlib module for valid levels.
# @option options [ Hash ] :resolv_options For internal driver use only.
# Options to pass through to Resolv::DNS constructor for SRV lookups.
# @option options [ Hash ] :tracing OpenTelemetry tracing options.
# - :enabled => Boolean, whether to enable OpenTelemetry tracing. The default
# value is nil, which means the setting is taken from the
# OTEL_RUBY_INSTRUMENTATION_MONGODB_ENABLED environment variable.
# - :tracer => OpenTelemetry::Trace::Tracer, the tracer to use for
# tracing. Must implement the OpenTelemetry::Trace::Tracer interface.
# - :query_text_max_length => Integer, the maximum length of the query text
# included in the span attributes; longer query text is truncated. A value
# of 0 means no query text is included in the span attributes. The default
# value is nil, which means the setting is taken from the
# OTEL_RUBY_INSTRUMENTATION_MONGODB_QUERY_TEXT_MAX_LENGTH environment
# variable.
# @option options [ Hash ] :auto_encryption_options Auto-encryption related
# options.
# - :key_vault_client => Client | nil, a client connected to the MongoDB
@@ -574,8 +589,12 @@ def initialize(addresses_or_uri, options = nil)

@connect_lock = Mutex.new
@connect_lock.synchronize do
@cluster = Cluster.new(addresses, @monitoring,
cluster_options.merge(srv_uri: srv_uri))
@cluster = Cluster.new(
addresses,
@monitoring,
tracer,
cluster_options.merge(srv_uri: srv_uri)
)
end

begin
@@ -893,7 +912,7 @@ def reconnect
@connect_lock.synchronize do
do_close rescue nil

@cluster = Cluster.new(addresses, monitoring, cluster_options)
@cluster = Cluster.new(addresses, monitoring, tracer, cluster_options)

if @options[:auto_encryption_options]
build_encrypter
@@ -965,7 +984,10 @@ def list_databases(filter = {}, name_only = false, opts = {})
cmd[:nameOnly] = !!name_only
cmd[:filter] = filter unless filter.empty?
cmd[:authorizedDatabases] = true if opts[:authorized_databases]
use(Database::ADMIN).database.read_command(cmd, opts).first[Database::DATABASES]
use(Database::ADMIN)
.database
.read_command(cmd, opts.merge(op_name: 'listDatabases'))
.first[Database::DATABASES]
end

# Returns a list of Mongo::Database objects.
@@ -1195,6 +1217,15 @@ def timeout_sec
end
end

def tracer
tracing_opts = @options[:tracing] || {}
@tracer ||= Tracing.create_tracer(
enabled: tracing_opts[:enabled],
query_text_max_length: tracing_opts[:query_text_max_length],
otel_tracer: tracing_opts[:tracer],
)
end

private

# Attempts to parse the given list of addresses, using the provided options.
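Given an OpenTelemetry SDK set up as sketched earlier, a client with explicit tracing options might be constructed as below. This is a usage sketch: the URI, tracer name, and length limit are placeholder values, and the :tracing keys follow the option documentation added above.

```ruby
# Sketch: passing the new :tracing options to Mongo::Client.new.
client = Mongo::Client.new(
  'mongodb://127.0.0.1:27017/test',
  tracing: {
    enabled: true,              # nil defers to OTEL_RUBY_INSTRUMENTATION_MONGODB_ENABLED
    tracer: OpenTelemetry.tracer_provider.tracer('mongo-ruby-driver'),
    query_text_max_length: 1000 # 0 omits query text; nil defers to the environment variable
  }
)
```

Leaving the :tracing hash out entirely keeps the driver on the environment-variable defaults described in the option documentation.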
5 changes: 4 additions & 1 deletion lib/mongo/cluster.rb
@@ -117,7 +117,7 @@ class Cluster
# - *:deprecation_errors* -- boolean
#
# @since 2.0.0
def initialize(seeds, monitoring, options = Options::Redacted.new)
def initialize(seeds, monitoring, tracer = nil, options = Options::Redacted.new)
if seeds.nil?
raise ArgumentError, 'Seeds cannot be nil'
end
@@ -136,6 +136,7 @@ def initialize(seeds, monitoring, options = Options::Redacted.new)
@update_lock = Mutex.new
@servers = []
@monitoring = monitoring
@tracer = tracer
@event_listeners = Event::Listeners.new
@app_metadata = Server::AppMetadata.new(@options.merge(purpose: :application))
@monitor_app_metadata = Server::Monitor::AppMetadata.new(@options.merge(purpose: :monitor))
@@ -309,6 +310,8 @@ def self.create(client, monitoring: nil)
# @return [ Monitoring ] monitoring The monitoring.
attr_reader :monitoring

attr_reader :tracer

# @return [ Object ] The cluster topology.
attr_reader :topology

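Note that tracer is now the third positional parameter of Cluster#initialize, defaulting to nil, with options staying last. A call-site sketch (variable names are illustrative):

```ruby
# Updated call shape, matching Client#initialize and Client#reconnect above.
cluster = Mongo::Cluster.new(addresses, monitoring, tracer, cluster_options)

# A legacy three-argument call such as
#   Mongo::Cluster.new(addresses, monitoring, cluster_options)
# would now bind the options hash to the tracer parameter.
```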
102 changes: 56 additions & 46 deletions lib/mongo/collection.rb
@@ -57,6 +57,8 @@ class Collection
# Delegate to the cluster for the next primary.
def_delegators :cluster, :next_primary

def_delegators :client, :tracer

# Options that can be updated on a new Collection instance via the #with method.
#
# @since 2.1.0
@@ -410,21 +412,24 @@ def create(opts = {})
client: client,
session: session
)
maybe_create_qe_collections(opts[:encrypted_fields], client, session) do |encrypted_fields|
Operation::Create.new(
selector: operation,
db_name: database.name,
write_concern: write_concern,
session: session,
# Note that these are collection options, collation isn't
# taken from options passed to the create method.
collation: options[:collation] || options['collation'],
encrypted_fields: encrypted_fields,
validator: options[:validator],
).execute(
next_primary(nil, session),
context: context
)
operation = Operation::Create.new(
selector: operation,
db_name: database.name,
write_concern: write_concern,
session: session,
# Note that these are collection options, collation isn't
# taken from options passed to the create method.
collation: options[:collation] || options['collation'],
validator: options[:validator],
)
tracer.trace_operation(operation, context, op_name: 'createCollection') do
maybe_create_qe_collections(opts[:encrypted_fields], client, session) do |encrypted_fields|
operation.encrypted_fields = encrypted_fields
operation.execute(
next_primary(nil, session),
context: context
)
end
end
end
end
@@ -453,25 +458,27 @@
# @since 2.0.0
def drop(opts = {})
client.with_session(opts) do |session|
maybe_drop_emm_collections(opts[:encrypted_fields], client, session) do
temp_write_concern = write_concern
write_concern = if opts[:write_concern]
WriteConcern.get(opts[:write_concern])
else
temp_write_concern
context = Operation::Context.new(
client: client,
session: session,
operation_timeouts: operation_timeouts(opts)
)
operation = Operation::Drop.new({
selector: { :drop => name },
db_name: database.name,
write_concern: write_concern,
session: session,
})
tracer.trace_operation(operation, context, op_name: 'dropCollection') do
maybe_drop_emm_collections(opts[:encrypted_fields], client, session) do
temp_write_concern = write_concern
write_concern = if opts[:write_concern]
WriteConcern.get(opts[:write_concern])
else
temp_write_concern
end
do_drop(operation, session, context)
end
context = Operation::Context.new(
client: client,
session: session,
operation_timeouts: operation_timeouts(opts)
)
operation = Operation::Drop.new({
selector: { :drop => name },
db_name: database.name,
write_concern: write_concern,
session: session,
})
do_drop(operation, session, context)
end
end
end
@@ -865,19 +872,22 @@ def insert_one(document, opts = {})
session: session,
operation_timeouts: operation_timeouts(opts)
)
write_with_retry(write_concern, context: context) do |connection, txn_num, context|
Operation::Insert.new(
:documents => [ document ],
:db_name => database.name,
:coll_name => name,
:write_concern => write_concern,
:bypass_document_validation => !!opts[:bypass_document_validation],
:options => opts,
:id_generator => client.options[:id_generator],
:session => session,
:txn_num => txn_num,
:comment => opts[:comment]
).execute_with_connection(connection, context: context)
operation = Operation::Insert.new(
:documents => [ document ],
:db_name => database.name,
:coll_name => name,
:write_concern => write_concern,
:bypass_document_validation => !!opts[:bypass_document_validation],
:options => opts,
:id_generator => client.options[:id_generator],
:session => session,
:comment => opts[:comment]
)
tracer.trace_operation(operation, context) do
write_with_retry(write_concern, context: context) do |connection, txn_num, context|
operation.txn_num = txn_num
operation.execute_with_connection(connection, context: context)
end
end
end
end
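The three methods above share one pattern: the operation is built first and then executed inside tracer.trace_operation, so the span also covers server selection, retries, and the queryable-encryption helpers. The Tracing module itself is not part of this excerpt; a hypothetical sketch of the contract these call sites rely on, written directly against the OpenTelemetry API, could look like this:

```ruby
require 'opentelemetry' # opentelemetry-api

# Hypothetical stand-in for the tracer returned by Client#tracer; the real
# implementation lives in lib/mongo/tracing.rb, which is not shown in this diff.
class SketchTracer
  def initialize(otel_tracer:, enabled: true)
    @otel_tracer = otel_tracer
    @enabled = enabled
  end

  # Open a client span around the block; the operation and op_name only
  # influence the span name in this sketch.
  def trace_operation(operation, _context, op_name: nil)
    return yield unless @enabled

    span_name = op_name || operation.class.name.split('::').last
    @otel_tracer.in_span(span_name, kind: :client) do |span|
      span.set_attribute('db.system', 'mongodb') # illustrative attribute
      yield
    end
  end
end
```

Building the operation before the retry and encryption blocks is what lets a single span wrap the whole execution path rather than just the final command.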
2 changes: 2 additions & 0 deletions lib/mongo/collection/view.rb
@@ -72,6 +72,8 @@ class View
# Delegate to the cluster for the next primary.
def_delegators :cluster, :next_primary

def_delegators :client, :tracer

alias :selector :filter

# @return [ Integer | nil | The timeout_ms value that was passed as an
5 changes: 4 additions & 1 deletion lib/mongo/collection/view/aggregation.rb
@@ -25,11 +25,14 @@ class View
#
# @since 2.0.0
class Aggregation
extend Forwardable
include Behavior

# @return [ Array<Hash> ] pipeline The aggregation pipeline.
attr_reader :pipeline

def_delegators :view, :tracer

# Initialize the aggregation for the provided collection view, pipeline
# and options.
#
@@ -80,7 +83,7 @@ def new(options)
Aggregation.new(view, pipeline, options)
end

def initial_query_op(session, read_preference)
def initial_query_op(session, read_preference = nil)
Operation::Aggregate.new(aggregate_spec(session, read_preference))
end

2 changes: 1 addition & 1 deletion lib/mongo/collection/view/aggregation/behavior.rb
@@ -88,7 +88,7 @@ def server_selector
@view.send(:server_selector)
end

def aggregate_spec(session, read_preference)
def aggregate_spec(session, read_preference = nil)
Builder::Aggregation.new(
pipeline,
view,
26 changes: 14 additions & 12 deletions lib/mongo/collection/view/iterable.rb
@@ -88,19 +88,21 @@ def select_cursor(session)
operation_timeouts: operation_timeouts,
view: self
)

if respond_to?(:write?, true) && write?
server = server_selector.select_server(cluster, nil, session, write_aggregation: true)
result = send_initial_query(server, context)

if use_query_cache?
CachingCursor.new(view, result, server, session: session, context: context)
op = initial_query_op(session)
tracer.trace_operation(op, context) do
if respond_to?(:write?, true) && write?
server = server_selector.select_server(cluster, nil, session, write_aggregation: true)
result = send_initial_query(server, context)

if use_query_cache?
CachingCursor.new(view, result, server, session: session, context: context)
else
Cursor.new(view, result, server, session: session, context: context)
end
else
Cursor.new(view, result, server, session: session, context: context)
end
else
read_with_retry_cursor(session, server_selector, view, context: context) do |server|
send_initial_query(server, context)
read_with_retry_cursor(session, server_selector, view, context: context) do |server|
send_initial_query(server, context)
end
end
end
end
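With select_cursor wrapped this way, an ordinary read is traced around the initial query, including the retry path. A usage sketch (collection name and filter are placeholders; the resulting span name and attributes depend on the Tracing implementation, which is not shown here):

```ruby
client[:artists].find(band: 'Toto').to_a # the initial query runs inside tracer.trace_operation
```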