Skip to content

Commit d6f7509

Browse files
With nesting
1 parent 87e8ac3 commit d6f7509

File tree

12 files changed

+156
-78
lines changed

12 files changed

+156
-78
lines changed

lib/mongo/client.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -589,8 +589,12 @@ def initialize(addresses_or_uri, options = nil)
589589

590590
@connect_lock = Mutex.new
591591
@connect_lock.synchronize do
592-
@cluster = Cluster.new(addresses, @monitoring,
593-
cluster_options.merge(srv_uri: srv_uri))
592+
@cluster = Cluster.new(
593+
addresses,
594+
@monitoring,
595+
tracer,
596+
cluster_options.merge(srv_uri: srv_uri)
597+
)
594598
end
595599

596600
begin

lib/mongo/cluster.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ class Cluster
117117
# - *:deprecation_errors* -- boolean
118118
#
119119
# @since 2.0.0
120-
def initialize(seeds, monitoring, options = Options::Redacted.new)
120+
def initialize(seeds, monitoring, tracer = nil, options = Options::Redacted.new)
121121
if seeds.nil?
122122
raise ArgumentError, 'Seeds cannot be nil'
123123
end
@@ -136,6 +136,7 @@ def initialize(seeds, monitoring, options = Options::Redacted.new)
136136
@update_lock = Mutex.new
137137
@servers = []
138138
@monitoring = monitoring
139+
@tracer = tracer
139140
@event_listeners = Event::Listeners.new
140141
@app_metadata = Server::AppMetadata.new(@options.merge(purpose: :application))
141142
@monitor_app_metadata = Server::Monitor::AppMetadata.new(@options.merge(purpose: :monitor))
@@ -309,6 +310,8 @@ def self.create(client, monitoring: nil)
309310
# @return [ Monitoring ] monitoring The monitoring.
310311
attr_reader :monitoring
311312

313+
attr_reader :tracer
314+
312315
# @return [ Object ] The cluster topology.
313316
attr_reader :topology
314317

lib/mongo/collection.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -878,7 +878,7 @@ def insert_one(document, opts = {})
878878
:session => session,
879879
:comment => opts[:comment]
880880
)
881-
tracer.trace_operation('insert_one', operation, context) do
881+
tracer.trace_operation(operation, context) do
882882
write_with_retry(write_concern, context: context) do |connection, txn_num, context|
883883
operation.txn_num = txn_num
884884
operation.execute_with_connection(connection, context: context)

lib/mongo/collection/view/iterable.rb

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -89,15 +89,7 @@ def select_cursor(session)
8989
view: self
9090
)
9191
op = initial_query_op(session)
92-
op_name = case op
93-
when Mongo::Operation::Find
94-
'find'
95-
when Mongo::Operation::Aggregate
96-
'aggregate'
97-
else
98-
op.class.name.split('::').last.downcase
99-
end
100-
tracer.trace_operation(op_name, op, context) do
92+
tracer.trace_operation(op, context) do
10193
if respond_to?(:write?, true) && write?
10294
server = server_selector.select_server(cluster, nil, session, write_aggregation: true)
10395
result = send_initial_query(server, context)

lib/mongo/cursor.rb

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ class Cursor
3939
def_delegators :@view, :collection
4040
def_delegators :collection, :client, :database
4141
def_delegators :@server, :cluster
42-
def_delegators :client, :tracer
4342

4443
# @return [ Collection::View ] view The collection view.
4544
attr_reader :view
@@ -514,19 +513,11 @@ def unregister
514513
end
515514

516515
def execute_operation(op, context: nil)
517-
op_name = case op
518-
when Mongo::Operation::GetMore
519-
'get_more'
520-
when Mongo::Operation::Close
521-
'close'
522-
end
523516
op_context = context || possibly_refreshed_context
524-
tracer.trace_operation('find', op, op_context) do
525-
if @connection.nil?
526-
op.execute(@server, context: op_context)
527-
else
528-
op.execute_with_connection(@connection, context: op_context)
529-
end
517+
if @connection.nil?
518+
op.execute(@server, context: op_context)
519+
else
520+
op.execute_with_connection(@connection, context: op_context)
530521
end
531522
end
532523

lib/mongo/operation/context.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ def initialize(
6969
attr_reader :session
7070
attr_reader :view
7171
attr_reader :options
72-
attr_accessor :tracer
7372

7473
# Returns a new Operation::Context with the deadline refreshed
7574
# and relative to the current moment.

lib/mongo/operation/shared/executable.rb

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,7 @@ def result_class
105105

106106
def get_result(connection, context, options = {})
107107
message = build_message(connection, context)
108-
if (tracer = context.tracer)
109-
tracer.trace_command(message, context, connection) do
110-
result_class.new(*dispatch_message(message, connection, context, options), context: context, connection: connection)
111-
end
112-
else
108+
connection.tracer.trace_command(message, context, connection) do
113109
result_class.new(*dispatch_message(message, connection, context, options), context: context, connection: connection)
114110
end
115111
end

lib/mongo/server.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,8 @@ def compressor
218218
# @api private
219219
def_delegators :cluster,
220220
:monitor_app_metadata,
221-
:push_monitor_app_metadata
221+
:push_monitor_app_metadata,
222+
:tracer
222223

223224
def_delegators :features,
224225
:check_driver_support!

lib/mongo/server/connection.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ def initialize(server, options = {})
139139
# across all connections.
140140
attr_reader :global_id
141141

142+
def_delegators :server, :tracer
143+
142144
# The connection pool from which this connection was created.
143145
# May be nil.
144146
#

lib/mongo/tracing/open_telemetry/command_tracer.rb

Lines changed: 57 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,60 +17,105 @@
1717
module Mongo
1818
module Tracing
1919
module OpenTelemetry
20+
# CommandTracer is responsible for tracing MongoDB server commands using OpenTelemetry.
21+
#
22+
# @api private
2023
class CommandTracer
21-
def initialize(otel_tracer, query_text_max_length: 0)
24+
extend Forwardable
25+
26+
def_delegators :@parent_tracer,
27+
:cursor_context_map,
28+
:parent_context_for,
29+
:transaction_context_map,
30+
:transaction_map_key
31+
32+
def initialize(otel_tracer, parent_tracer, query_text_max_length: 0)
2233
@otel_tracer = otel_tracer
34+
@parent_tracer = parent_tracer
2335
@query_text_max_length = query_text_max_length
2436
end
2537

26-
def trace_command(message, _operation_context, connection)
27-
@otel_tracer.in_span(
38+
def trace_command(message, operation_context, connection)
39+
parent_context = parent_context_for(operation_context, cursor_id(message))
40+
span = @otel_tracer.start_span(
2841
command_span_name(message),
2942
attributes: span_attributes(message, connection),
43+
with_parent: parent_context,
3044
kind: :client
31-
) do |span, _context|
45+
)
46+
::OpenTelemetry::Trace.with_span(span) do |s, c|
47+
# TODO: process cursor context if applicable
3248
yield.tap do |result|
33-
if result.respond_to?(:cursor_id) && result.cursor_id.positive?
34-
span.set_attribute('db.mongodb.cursor_id', result.cursor_id)
35-
end
49+
process_cursor_context(result, cursor_id(message), c, s)
3650
end
3751
end
52+
rescue Exception => e
53+
span&.record_exception(e)
54+
span&.status = ::OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}")
55+
raise e
56+
ensure
57+
span&.finish
3858
end
3959

4060
private
4161

4262
def span_attributes(message, connection)
4363
{
4464
'db.system' => 'mongodb',
45-
'db.namespace' => message.documents.first['$db'],
65+
'db.namespace' => database(message),
4666
'db.collection.name' => collection_name(message),
47-
'db.operation.name' => message.documents.first.keys.first,
67+
'db.command.name' => command_name(message),
4868
'server.port' => connection.address.port,
4969
'server.address' => connection.address.host,
5070
'network.transport' => connection.transport.to_s,
5171
'db.mongodb.server_connection_id' => connection.server.description.server_connection_id,
5272
'db.mongodb.driver_connection_id' => connection.id,
73+
'db.mongodb.cursor_id' => cursor_id(message),
5374
'db.query.text' => query_text(message)
5475
}.compact
5576
end
5677

78+
def process_cursor_context(result, cursor_id, context, span)
79+
if result.respond_to?(:cursor_id) && result.cursor_id.positive?
80+
span.set_attribute('db.mongodb.cursor_id', result.cursor_id)
81+
end
82+
end
83+
5784
def command_span_name(message)
58-
message.documents.first.keys.first
85+
if (coll_name = collection_name(message))
86+
"#{command_name(message)} #{database(message)}.#{coll_name}"
87+
else
88+
"#{command_name(message)} #{database(message)}"
89+
end
5990
end
6091

6192
def collection_name(message)
6293
case message.documents.first.keys.first
6394
when 'getMore'
64-
message.documents.first['collection']
95+
message.documents.first['collection'].to_s
6596
else
66-
message.documents.first.values.first
97+
message.documents.first.values.first.to_s
6798
end
6899
end
69100

101+
def command_name(message)
102+
message.documents.first.keys.first.to_s
103+
end
104+
105+
def database(message)
106+
message.documents.first['$db'].to_s
107+
end
108+
70109
def query_text?
71110
@query_text_max_length.positive?
72111
end
73112

113+
def cursor_id(message)
114+
if command_name(message) == 'getMore'
115+
message.documents.first['getMore'].value
116+
end
117+
end
118+
74119
EXCLUDED_KEYS = %w[lsid $db $clusterTime signature].freeze
75120
ELLIPSES = '...'
76121

0 commit comments

Comments
 (0)