Skip to content
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
f650d43
Add tracing support using Micrometer
nhachicha Apr 29, 2025
4a16c29
Simplifying bookkeeping logic for Span grouping
nhachicha Aug 3, 2025
33d46e8
Merging back missing changes after rebase
nhachicha Aug 4, 2025
86f1791
Fixing tests
nhachicha Aug 4, 2025
050fb60
Merge remote-tracking branch 'origin/main' into JAVA-5733
nhachicha Aug 5, 2025
da3ac32
Adding method to get namespace for Read and Write Operations
nhachicha Aug 10, 2025
1a75290
Refactoring operations and command spans
nhachicha Aug 10, 2025
26cc57a
Refactor the tracing implementation to use Observation instead of wor…
nhachicha Oct 2, 2025
45f3cb8
Added Prose Tests
nhachicha Oct 6, 2025
39483e4
Fixing test dependencies
nhachicha Oct 6, 2025
0ecfb19
Skipping non-compliant tests
nhachicha Oct 8, 2025
0523d41
Merge remote-tracking branch 'origin/main' into JAVA-5733
nhachicha Oct 8, 2025
fb1f1af
Fixing test deps for Scala
nhachicha Oct 8, 2025
0628386
Refactored some type visibility per PR feedback
nhachicha Oct 8, 2025
d3f2d62
Fixing javadoc & Scala test
nhachicha Oct 8, 2025
c490154
using JVM option behind a flag
nhachicha Oct 9, 2025
edb9c60
Refactoring logic around env variables.
nhachicha Oct 11, 2025
438f541
Merge remote-tracking branch 'origin/main' into JAVA-5733
nhachicha Oct 11, 2025
bf7b086
removed unused method
nhachicha Oct 13, 2025
360af35
update Java ticket reference
nhachicha Oct 13, 2025
471524b
Merge remote-tracking branch 'origin/main' into JAVA-5733
nhachicha Oct 13, 2025
427554b
switching to new spec branch for testing
nhachicha Oct 13, 2025
d4df11b
PR feedback refactoring:
nhachicha Oct 14, 2025
9a3696e
Extracting logic from InternalStreamConnection
nhachicha Oct 14, 2025
9a4889c
update submodule
nhachicha Oct 14, 2025
1f202bb
Update driver-core/src/main/com/mongodb/internal/tracing/MicrometerTr…
nhachicha Oct 15, 2025
7e5f1f0
Update driver-sync/src/test/functional/com/mongodb/client/tracing/Mic…
nhachicha Oct 15, 2025
da6bff1
Update driver-sync/src/test/functional/com/mongodb/client/tracing/Mic…
nhachicha Oct 15, 2025
46e2932
Update driver-core/src/main/com/mongodb/internal/tracing/TracingManag…
nhachicha Oct 15, 2025
d71c5c4
Update driver-core/src/main/com/mongodb/internal/tracing/TracingManag…
nhachicha Oct 15, 2025
e3b1c93
PR feedback
nhachicha Oct 15, 2025
c5a429e
Fixing tests
nhachicha Oct 15, 2025
d7b4449
Update driver-core/src/main/com/mongodb/MongoNamespace.java
nhachicha Oct 15, 2025
9f611b1
Update driver-core/src/main/com/mongodb/internal/tracing/TracingManag…
nhachicha Oct 16, 2025
a1ad428
Update driver-core/src/main/com/mongodb/observability/ObservabilitySe…
nhachicha Oct 16, 2025
1a8c59e
switching to main submodule commit; updating skipped tests; adding mi…
nhachicha Oct 16, 2025
b3e85fd
adding skip tests
nhachicha Oct 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion buildSrc/src/main/kotlin/project/Companion.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ import org.gradle.kotlin.dsl.getByType
internal val Project.libs: LibrariesForLibs
get() = extensions.getByType()

internal const val DEFAULT_JAVA_VERSION = 17
const val DEFAULT_JAVA_VERSION = 17
1 change: 1 addition & 0 deletions config/checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@

<!-- Allow printStackTrace in this file -->
<suppress checks="Regexp" files="CallbackResultHolder"/>
<suppress checks="Regexp" files="MicrometerTracer"/>

<!--Do not check documentation tests classes -->
<suppress checks="Javadoc*" files=".*documentation.*"/>
Expand Down
3 changes: 3 additions & 0 deletions driver-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ dependencies {
optionalImplementation(libs.snappy.java)
optionalImplementation(libs.zstd.jni)

optionalImplementation(platform(libs.micrometer.observation.bom))
optionalImplementation(libs.micrometer.observation)

testImplementation(project(path = ":bson", configuration = "testArtifacts"))
testImplementation(libs.reflections)
testImplementation(libs.netty.tcnative.boringssl.static)
Expand Down
81 changes: 81 additions & 0 deletions driver-core/src/main/com/mongodb/MongoClientSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
import com.mongodb.connection.SslSettings;
import com.mongodb.connection.TransportSettings;
import com.mongodb.event.CommandListener;
import com.mongodb.internal.VisibleForTesting;
import com.mongodb.lang.Nullable;
import com.mongodb.spi.dns.DnsClient;
import com.mongodb.spi.dns.InetAddressResolver;
import io.micrometer.observation.ObservationRegistry;
import org.bson.UuidRepresentation;
import org.bson.codecs.BsonCodecProvider;
import org.bson.codecs.BsonValueCodecProvider;
Expand All @@ -57,6 +59,7 @@
import static com.mongodb.assertions.Assertions.isTrueArgument;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.TimeoutSettings.convertAndValidateTimeout;
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.bson.codecs.configuration.CodecRegistries.fromProviders;
Expand Down Expand Up @@ -119,6 +122,15 @@ public final class MongoClientSettings {
@Nullable
private final Long timeoutMS;

@VisibleForTesting(otherwise = PRIVATE)
// If set, this will enable/disable tracing even when an observationRegistry has been passed.
public static final String ENV_OTEL_ENABLED = "OTEL_JAVA_INSTRUMENTATION_MONGODB_ENABLED";
@VisibleForTesting(otherwise = PRIVATE)
// If set, this will truncate the command payload captured in the tracing span to the specified length.
public static final String ENV_OTEL_QUERY_TEXT_MAX_LENGTH = "OTEL_JAVA_INSTRUMENTATION_MONGODB_QUERY_TEXT_MAX_LENGTH";
private final ObservationRegistry observationRegistry;
private final boolean enableCommandPayloadTracing;

/**
* Gets the default codec registry. It includes the following providers:
*
Expand Down Expand Up @@ -238,6 +250,8 @@ public static final class Builder {
private ContextProvider contextProvider;
private DnsClient dnsClient;
private InetAddressResolver inetAddressResolver;
private ObservationRegistry observationRegistry;
private boolean enableCommandPayloadTracing;

private Builder() {
}
Expand Down Expand Up @@ -275,6 +289,8 @@ private Builder(final MongoClientSettings settings) {
if (settings.heartbeatSocketTimeoutSetExplicitly) {
heartbeatSocketTimeoutMS = settings.heartbeatSocketSettings.getReadTimeout(MILLISECONDS);
}
observationRegistry = settings.observationRegistry;
enableCommandPayloadTracing = settings.enableCommandPayloadTracing;
}

/**
Expand Down Expand Up @@ -723,6 +739,47 @@ Builder heartbeatSocketTimeoutMS(final int heartbeatSocketTimeoutMS) {
return this;
}

/**
* Sets the observation registry to use for creating tracing Spans for operations, commands and transactions.
*
* <p> If set the environment variable {@value ENV_OTEL_ENABLED} is used to enable or disable the creation of tracing spans.
*
* <p> If set the environment variable {@value ENV_OTEL_QUERY_TEXT_MAX_LENGTH} is used to determine the maximum length
* of command payloads captured in tracing spans. If the environment variable is not set, the entire command payloads is
* captured.
*
* @param observationRegistry the observation registry
* @return this
* @since 5.7
*/
@Alpha(Reason.CLIENT)
public Builder observationRegistry(final ObservationRegistry observationRegistry) {
this.observationRegistry = observationRegistry;
return this;
}

/**
* Sets the observation registry to use for creating tracing Spans for operations, commands and transactions.
*
* <p> If set the environment variable {@value ENV_OTEL_ENABLED} is used to enable or disable the creation of tracing spans.
*
* <p> If set the environment variable {@value ENV_OTEL_QUERY_TEXT_MAX_LENGTH} is used to determine the maximum length
* of command payloads captured in tracing spans. If the environment variable is not set, the entire command payloads is
* captured.
*
* @param observationRegistry the observation registry
* @param enableCommandPayload whether command payloads should be captured in tracing spans. This may have performance
* implications so should be used with care.
* @return this
* @since 5.7
*/
@Alpha(Reason.CLIENT)
public Builder observationRegistry(final ObservationRegistry observationRegistry, final boolean enableCommandPayload) {
this.observationRegistry = observationRegistry;
this.enableCommandPayloadTracing = enableCommandPayload;
return this;
}

/**
* Build an instance of {@code MongoClientSettings}.
*
Expand Down Expand Up @@ -1040,6 +1097,27 @@ public ContextProvider getContextProvider() {
return contextProvider;
}

/** Get the ObservationRegistry to create tracing Spans for operations, commands and transactions.
*
* @return the configured ObservationRegistry
* @since 5.7
*/
@Nullable
@Alpha(Reason.CLIENT)
public ObservationRegistry getObservationRegistry() {
return observationRegistry;
}

/** Returns true if command payload tracing is enabled.
*
* @return the enableCommandPayloadTracing value
* @since 5.7
*/
@Alpha(Reason.CLIENT)
public boolean isCommandPayloadTracingEnabled() {
return enableCommandPayloadTracing;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down Expand Up @@ -1156,5 +1234,8 @@ private MongoClientSettings(final Builder builder) {
heartbeatConnectTimeoutSetExplicitly = builder.heartbeatConnectTimeoutMS != 0;
contextProvider = builder.contextProvider;
timeoutMS = builder.timeoutMS;

observationRegistry = builder.observationRegistry;
enableCommandPayloadTracing = builder.enableCommandPayloadTracing;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import com.mongodb.internal.logging.StructuredLogger;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.internal.time.Timeout;
import com.mongodb.internal.tracing.Span;
import com.mongodb.lang.Nullable;
import org.bson.BsonBinaryReader;
import org.bson.BsonDocument;
Expand Down Expand Up @@ -94,6 +95,8 @@
import static com.mongodb.internal.connection.ProtocolHelper.isCommandOk;
import static com.mongodb.internal.logging.LogMessage.Level.DEBUG;
import static com.mongodb.internal.thread.InterruptionUtil.translateInterruptedException;
import static com.mongodb.internal.tracing.MongodbObservation.HighCardinalityKeyNames.QUERY_TEXT;
import static com.mongodb.internal.tracing.MongodbObservation.LowCardinalityKeyNames.RESPONSE_STATUS_CODE;
import static java.util.Arrays.asList;

/**
Expand Down Expand Up @@ -432,22 +435,59 @@ public boolean reauthenticationIsTriggered(@Nullable final Throwable t) {
private <T> T sendAndReceiveInternal(final CommandMessage message, final Decoder<T> decoder,
final OperationContext operationContext) {
CommandEventSender commandEventSender;
Span tracingSpan;
try (ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput(this)) {
message.encode(bsonOutput, operationContext);
commandEventSender = createCommandEventSender(message, bsonOutput, operationContext);
commandEventSender.sendStartedEvent();
tracingSpan = operationContext
.getTracingManager()
.createTracingSpan(message,
operationContext,
() -> message.getCommandDocument(bsonOutput),
cmdName -> SECURITY_SENSITIVE_COMMANDS.contains(cmdName)
|| SECURITY_SENSITIVE_HELLO_COMMANDS.contains(cmdName),
() -> getDescription().getServerAddress(),
() -> getDescription().getConnectionId()
);

boolean isLoggingCommandNeeded = isLoggingCommandNeeded();
boolean isTracingCommandPayloadNeeded = tracingSpan != null && operationContext.getTracingManager().isCommandPayloadEnabled();

// Only hydrate the command document if necessary
BsonDocument commandDocument = null;
if (isLoggingCommandNeeded || isTracingCommandPayloadNeeded) {
commandDocument = message.getCommandDocument(bsonOutput);
}
if (isLoggingCommandNeeded) {
commandEventSender = new LoggingCommandEventSender(
SECURITY_SENSITIVE_COMMANDS, SECURITY_SENSITIVE_HELLO_COMMANDS, description, commandListener,
operationContext, message, commandDocument,
COMMAND_PROTOCOL_LOGGER, loggerSettings);
commandEventSender.sendStartedEvent();
} else {
commandEventSender = new NoOpCommandEventSender();
}
if (isTracingCommandPayloadNeeded) {
tracingSpan.tagHighCardinality(QUERY_TEXT.asString(), commandDocument);
}

try {
sendCommandMessage(message, bsonOutput, operationContext);
} catch (Exception e) {
if (tracingSpan != null) {
tracingSpan.error(e);
}
commandEventSender.sendFailedEvent(e);
throw e;
}
}

if (message.isResponseExpected()) {
return receiveCommandMessageResponse(decoder, commandEventSender, operationContext);
return receiveCommandMessageResponse(decoder, commandEventSender, operationContext, tracingSpan);
} else {
commandEventSender.sendSucceededEventForOneWayCommand();
if (tracingSpan != null) {
tracingSpan.end();
}
return null;
}
}
Expand All @@ -466,7 +506,7 @@ public <T> void send(final CommandMessage message, final Decoder<T> decoder, fin
@Override
public <T> T receive(final Decoder<T> decoder, final OperationContext operationContext) {
isTrue("Response is expected", hasMoreToCome);
return receiveCommandMessageResponse(decoder, new NoOpCommandEventSender(), operationContext);
return receiveCommandMessageResponse(decoder, new NoOpCommandEventSender(), operationContext, null);
}

@Override
Expand Down Expand Up @@ -512,7 +552,7 @@ private void trySendMessage(final CommandMessage message, final ByteBufferBsonOu
}

private <T> T receiveCommandMessageResponse(final Decoder<T> decoder, final CommandEventSender commandEventSender,
final OperationContext operationContext) {
final OperationContext operationContext, @Nullable final Span tracingSpan) {
boolean commandSuccessful = false;
try (ResponseBuffers responseBuffers = receiveResponseBuffers(operationContext)) {
updateSessionContext(operationContext.getSessionContext(), responseBuffers);
Expand All @@ -537,7 +577,17 @@ private <T> T receiveCommandMessageResponse(final Decoder<T> decoder, final Comm
if (!commandSuccessful) {
commandEventSender.sendFailedEvent(e);
}
if (tracingSpan != null) {
if (e instanceof MongoCommandException) {
tracingSpan.tagLowCardinality(RESPONSE_STATUS_CODE.withValue(String.valueOf(((MongoCommandException) e).getErrorCode())));
}
tracingSpan.error(e);
}
throw e;
} finally {
if (tracingSpan != null) {
tracingSpan.end();
}
}
}

Expand All @@ -553,7 +603,18 @@ private <T> void sendAndReceiveAsyncInternal(final CommandMessage message, final

try {
message.encode(bsonOutput, operationContext);
CommandEventSender commandEventSender = createCommandEventSender(message, bsonOutput, operationContext);

CommandEventSender commandEventSender;
if (isLoggingCommandNeeded()) {
BsonDocument commandDocument = message.getCommandDocument(bsonOutput);
commandEventSender = new LoggingCommandEventSender(
SECURITY_SENSITIVE_COMMANDS, SECURITY_SENSITIVE_HELLO_COMMANDS, description, commandListener,
operationContext, message, commandDocument,
COMMAND_PROTOCOL_LOGGER, loggerSettings);
} else {
commandEventSender = new NoOpCommandEventSender();
}

commandEventSender.sendStartedEvent();
Compressor localSendCompressor = sendCompressor;
if (localSendCompressor == null || SECURITY_SENSITIVE_COMMANDS.contains(message.getCommandDocument(bsonOutput).getFirstKey())) {
Expand Down Expand Up @@ -952,19 +1013,13 @@ public void onResult(@Nullable final ByteBuf result, @Nullable final Throwable t

private static final StructuredLogger COMMAND_PROTOCOL_LOGGER = new StructuredLogger("protocol.command");

private CommandEventSender createCommandEventSender(final CommandMessage message, final ByteBufferBsonOutput bsonOutput,
final OperationContext operationContext) {
private boolean isLoggingCommandNeeded() {
boolean listensOrLogs = commandListener != null || COMMAND_PROTOCOL_LOGGER.isRequired(DEBUG, getClusterId());
if (!recordEverything && (isMonitoringConnection || !opened() || !authenticated.get() || !listensOrLogs)) {
return new NoOpCommandEventSender();
}
return new LoggingCommandEventSender(
SECURITY_SENSITIVE_COMMANDS, SECURITY_SENSITIVE_HELLO_COMMANDS, description, commandListener,
operationContext, message, bsonOutput,
COMMAND_PROTOCOL_LOGGER, loggerSettings);
return recordEverything || (!isMonitoringConnection && opened() && authenticated.get() && listensOrLogs);
}

private ClusterId getClusterId() {
return description.getConnectionId().getServerId().getClusterId();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class LoggingCommandEventSender implements CommandEventSender {
@Nullable final CommandListener commandListener,
final OperationContext operationContext,
final CommandMessage message,
final ByteBufferBsonOutput bsonOutput,
final BsonDocument commandDocument,
final StructuredLogger logger,
final LoggerSettings loggerSettings) {
this.description = description;
Expand All @@ -88,7 +88,7 @@ class LoggingCommandEventSender implements CommandEventSender {
this.loggerSettings = loggerSettings;
this.startTimeNanos = System.nanoTime();
this.message = message;
this.commandDocument = message.getCommandDocument(bsonOutput);
this.commandDocument = commandDocument;
this.commandName = commandDocument.getFirstKey();
this.redactionRequired = securitySensitiveCommands.contains(commandName)
|| (securitySensitiveHelloCommands.contains(commandName) && commandDocument.containsKey("speculativeAuthenticate"));
Expand Down
Loading