CSOT refactoring #1781

Draft
wants to merge 16 commits into
base: main

vbabanin
Member

@vbabanin vbabanin commented Aug 10, 2025

Description

Following a CSOT code walkthrough conducted as part of the Client Bulk Write API work, we identified several improvements that make the Client-Side Operations Timeout (CSOT) code more predictable, consistent, and maintainable. This PR introduces naming refinements, clearer APIs, and safer handling of TimeoutContext to improve readability and reduce potential misuse.

The main change in this PR is that methods whose execution time can be limited by a timeout, or which are executed on a particular ClientSession or RequestContext, must now accept a separate OperationContext parameter.

The guiding principle is to avoid storing OperationContext in fields whenever possible. Instead, the OperationContext is passed explicitly, making it more predictable where a timeout originates and easier to trace through the call chain. Since OperationContext propagates down the tree of method calls, it can also be overridden at any level.

Some exceptions remain in entities such as Cursor and ClientSession, which must hold a reference to a specific OperationContext for their entire lifecycle. However, when any of their methods is called, a new OperationContext is created from the original to override the timeout, and that new context is passed down the call chain. This makes it easier to trace overrides and insert debugging hooks in the methods where contexts are instantiated.

To further improve predictability and reasoning about the code, both OperationContext and TimeoutContext are now immutable.
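
As an illustration of the idea only, the following minimal sketch shows an immutable context that is passed as a parameter and overridden by deriving a new instance. The nested `TimeoutContext` and `OperationContext` classes here are simplified, hypothetical stand-ins and not the driver's internal classes; `withOverride` appears in this PR's diff, but its signature below is an assumption.

```java
import java.time.Duration;
import java.util.function.UnaryOperator;

// Hypothetical, simplified stand-ins; not the driver's real classes.
final class ImmutableContextSketch {

    static final class TimeoutContext {
        private final Duration timeout;

        TimeoutContext(final Duration timeout) {
            this.timeout = timeout;
        }

        Duration getTimeout() {
            return timeout;
        }

        // Returns a new instance; the receiver is never modified.
        TimeoutContext withTimeout(final Duration newTimeout) {
            return new TimeoutContext(newTimeout);
        }
    }

    static final class OperationContext {
        private final TimeoutContext timeoutContext;

        OperationContext(final TimeoutContext timeoutContext) {
            this.timeoutContext = timeoutContext;
        }

        TimeoutContext getTimeoutContext() {
            return timeoutContext;
        }

        // Derives a new context by applying an override to the timeout context.
        OperationContext withOverride(final UnaryOperator<TimeoutContext> override) {
            return new OperationContext(override.apply(timeoutContext));
        }
    }

    // The context is a parameter, not a field, so this level can narrow it for its callees.
    static Duration outer(final OperationContext ctx) {
        OperationContext narrowed = ctx.withOverride(tc -> tc.withTimeout(Duration.ofMillis(100)));
        return inner(narrowed);
    }

    // Sees only the context it was handed.
    static Duration inner(final OperationContext ctx) {
        return ctx.getTimeoutContext().getTimeout();
    }
}
```

Because every override produces a new object, the caller's context is unchanged after `outer` returns, which is the property that makes the origin of a timeout traceable through the call chain.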

Key changes

  • Previously, ClusterBinding, ConnectionSource, and Cursor all shared the same OperationContext (and thus the same TimeoutContext), creating hidden coupling that required ad-hoc workarounds such as overriding the global timeout state and rolling it back to avoid side effects (see TimeoutContext.java#L298-L305). This change decouples OperationContext from ClusterBinding and ConnectionSource, requiring callers to provide it explicitly when invoking APIs such as getConnectionSource(OperationContext) and getConnection(OperationContext). By making the context an explicit parameter rather than relying on scattered internal state, the API becomes more predictable, easier to reason about, less prone to subtle lifecycle bugs, and easier to compare with the CSOT spec.
  • The core cursor logic has been extracted into CoreCursor, allowing both tailable and non-tailable cursor operations, whether in ITERATION or CURSOR_LIFETIME mode (as implemented by CommandBatchCursor), to wrap a CoreCursor and manage timeouts by passing an OperationContext to the next, tryNext, and close methods. This separation also enables ChangeStreamCursor to wrap CoreCursor and implement its distinct timeout semantics without conflicting with CommandBatchCursor’s logic. Previously, CommandBatchCursor carried two unrelated concerns, including the resetTimeoutOnClosing flag, which existed solely to let ChangeStreamCursor override timeout behavior. The new approach enforces a clearer separation of concerns, with each cursor type independently managing its timeout policy.
    Diagrams to present the changes made in cursors: CSOT refactoring #1781 (comment)
  • SyncOperationHelper and AsyncOperationHelper now propagate OperationContext to callbacks passed into withSourceAndConnection. Callbacks receive both the Connection and the OperationContext. Within withSourceAndConnection, the OperationContext passed to the callback includes minRtt, because the method creates a new context after selecting the ConnectionSource. This allows setting minRtt without mutating the original context from Operation.execute(). In a callback, the caller can therefore choose which OperationContext to use further down the call chain: the original one without minRtt, or the one with minRtt set.
  • withSourceAndConnection encapsulates the server selection timeout logic, computing the effective timeout (min(serverSelectionTimeout, timeoutMS)) per the specification and applying it to both server selection and connection checkout. This keeps the logic centralized and leaves the original context unmodified.
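
The context flow described in the last two items can be sketched roughly as follows. This is a simplified model under stated assumptions, not the code in SyncOperationHelper: `Ctx`, `Source`, and `Binding` are hypothetical types, the connection is modelled as a plain `String`, and the timeout is a fixed number of milliseconds rather than a running deadline.

```java
import java.util.function.BiFunction;

// Hypothetical model of the context flow; names and shapes are assumptions.
final class WithSourceAndConnectionSketch {

    // Immutable context: a timeout and a minimum round-trip time, both in milliseconds.
    static final class Ctx {
        final long timeoutMs;
        final long minRttMs;

        Ctx(final long timeoutMs, final long minRttMs) {
            this.timeoutMs = timeoutMs;
            this.minRttMs = minRttMs;
        }

        Ctx withTimeoutMs(final long newTimeoutMs) {
            return new Ctx(newTimeoutMs, minRttMs);
        }

        Ctx withMinRttMs(final long newMinRttMs) {
            return new Ctx(timeoutMs, newMinRttMs);
        }
    }

    interface Source {
        long minRttMs();

        String getConnection(Ctx ctx);
    }

    interface Binding {
        Source getConnectionSource(Ctx ctx);
    }

    static <R> R withSourceAndConnection(final Binding binding, final Ctx original,
            final long serverSelectionTimeoutMs, final BiFunction<String, Ctx, R> callback) {
        // Server selection and checkout are capped by min(serverSelectionTimeout, timeoutMS).
        Ctx selection = original.withTimeoutMs(Math.min(serverSelectionTimeoutMs, original.timeoutMs));
        Source source = binding.getConnectionSource(selection);
        // Once the server is known, its min RTT is attached to derived contexts only.
        String connection = source.getConnection(selection.withMinRttMs(source.minRttMs()));
        // The callback gets the original timeout plus min RTT; 'original' itself is untouched.
        return callback.apply(connection, original.withMinRttMs(source.minRttMs()));
    }
}
```

In this model the callback still holds a reference to whatever context the caller passed in, so it can pick either the original context or the one carrying the minimum round-trip time.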

JAVA-5640 - CSOT clarity enhancement/refactoring.
JAVA-5644 - this issue is also fixed since TimeoutContext is fully immutable now.

katcharov and others added 6 commits July 4, 2025 10:54
Make TimeoutContext immutable.
# Conflicts:
#	driver-core/src/main/com/mongodb/internal/connection/OperationContext.java
#	driver-core/src/main/com/mongodb/internal/operation/AggregateOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java
#	driver-core/src/main/com/mongodb/internal/operation/AggregateToCollectionOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/BaseFindAndModifyOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/CommandReadOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/CountDocumentsOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/CountOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/CreateCollectionOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/CreateIndexesOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/CreateViewOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/DistinctOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/DropCollectionOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/DropDatabaseOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/DropIndexOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/EstimatedDocumentCountOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/FindOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/ListDatabasesOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/ListSearchIndexesOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/MapReduceToCollectionOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/MapReduceWithInlineResultsOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/ReadOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/ReadOperationCursor.java
#	driver-core/src/main/com/mongodb/internal/operation/ReadOperationMapReduceCursor.java
#	driver-core/src/main/com/mongodb/internal/operation/RenameCollectionOperation.java
#	driver-core/src/main/com/mongodb/internal/operation/WriteOperation.java
#	driver-core/src/test/functional/com/mongodb/ClusterFixture.java
#	driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy
#	driver-core/src/test/unit/com/mongodb/internal/operation/OperationUnitSpecification.groovy
#	driver-legacy/src/main/com/mongodb/LegacyMixedBulkWriteOperation.java
#	driver-legacy/src/main/com/mongodb/MongoClient.java
#	driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java
#	driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java
#	driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidReadOperationThenCursorReadOperation.java
#	driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java
#	driver-sync/src/main/com/mongodb/client/internal/MapReduceIterableImpl.java
#	driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java
@vbabanin vbabanin self-assigned this Aug 10, 2025
@@ -114,7 +110,8 @@ private ClusterBindingConnectionSource(final ServerTuple serverTuple, final Read
this.server = serverTuple.getServer();
this.serverDescription = serverTuple.getServerDescription();
this.appliedReadPreference = appliedReadPreference;
operationContext.getTimeoutContext().minRoundTripTimeMS(NANOSECONDS.toMillis(serverDescription.getMinRoundTripTimeNanos()));
Member Author

@vbabanin vbabanin Aug 10, 2025

Previously, TimeoutContext was mutable, and we would set minRoundTripTime on it once a connection source (i.e., server) was selected. Now, this logic has been moved into SyncOperationHelper.

The reasoning is that once a server is selected, we know the round-trip time and can propagate a modified OperationContext to getConnection. This method either retrieves a connection from the pool or establishes a new one, and then passes it to the operation’s callback.

Note: minRoundTripTime is used when calculating the maxTimeMS value appended to a command in CommandMessage.
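
To make the note concrete, here is a small sketch of how a maxTimeMS value could be derived from the remaining timeout and the minimum round-trip time. It follows my reading of the CSOT specification (subtract the minimum RTT, and treat a non-positive result as a timeout); the class and method names are hypothetical and this is not the logic in CommandMessage.

```java
import java.util.OptionalLong;

// Hypothetical helper; an assumption-based sketch, not driver code.
final class MaxTimeMsSketch {

    // Returns the maxTimeMS to attach to a command, or empty if the budget is
    // already exhausted, in which case a caller would fail with a timeout
    // instead of sending the command.
    static OptionalLong maxTimeMs(final long remainingTimeoutMs, final long minRoundTripTimeMs) {
        long value = remainingTimeoutMs - minRoundTripTimeMs;
        return value > 0 ? OptionalLong.of(value) : OptionalLong.empty();
    }
}
```

For example, with 1000 ms remaining and a 15 ms minimum round-trip time, the sketch yields 985; with 10 ms remaining it yields an empty result.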

The same change has been applied to the async counterpart in AsyncOperationHelper.

Comment on lines -230 to -231
OperationContext operationContext = originalOperationContext
.withTimeoutContext(originalOperationContext.getTimeoutContext().withComputedServerSelectionTimeoutContext());
Member Author

@vbabanin vbabanin Aug 12, 2025

Given that we already create an OperationContext with a started server selection timeout and pass it to getConnection(operationContext), this additional step is unnecessary. The entire connection checkout is already capped by the computed server selection timeout.

Relevant spec:

After a server has been selected, drivers MUST use the remaining computedServerSelectionTimeout value as the timeout for connection checkout.

Relevant code:
SyncOperationHelper.java#L141-L144

source -> withSuppliedResource(
                        source::getConnection,
                        wrapConnectionSourceException,
                        serverSelectionOperationContext.withMinRoundTripTime(source.getServerDescription())

Comment on lines +134 to +149
OperationContext serverSelectionOperationContext =
        originalOperationContext.withOverride(TimeoutContext::withComputedServerSelectionTimeout);

return withSuppliedResource(
        sourceFunction,
        wrapConnectionSourceException,
        serverSelectionOperationContext,
        source -> withSuppliedResource(
                source::getConnection,
                wrapConnectionSourceException,
                serverSelectionOperationContext.withMinRoundTripTime(source.getServerDescription()),
                connection -> function.apply(
                        source,
                        connection,
                        originalOperationContext.withMinRoundTripTime(source.getServerDescription())))
);
Member Author

Relevant spec section:

If timeoutMS is set, drivers MUST use min(serverSelectionTimeoutMS, remaining timeoutMS), referred to as computedServerSelectionTimeout as the timeout for server selection and connection checkout.

After a server has been selected, drivers MUST use the remaining computedServerSelectionTimeout value as the timeout for connection checkout.
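
The arithmetic in the two quoted requirements can be worked through with a short sketch. The helper below is hypothetical and uses plain millisecond values; the driver tracks these as running timeouts inside TimeoutContext rather than as explicit numbers.

```java
// Hypothetical helper illustrating the quoted spec rules; not driver code.
final class ServerSelectionTimeoutSketch {

    // computedServerSelectionTimeout = min(serverSelectionTimeoutMS, remaining timeoutMS)
    static long computed(final long serverSelectionTimeoutMs, final long remainingTimeoutMs) {
        return Math.min(serverSelectionTimeoutMs, remainingTimeoutMs);
    }

    // Connection checkout gets whatever server selection left of the computed timeout.
    static long remainingForCheckout(final long computedMs, final long elapsedInSelectionMs) {
        return Math.max(0, computedMs - elapsedInSelectionMs);
    }
}
```

With serverSelectionTimeoutMS at 30000 and 2000 ms of timeoutMS remaining, the computed timeout is 2000 ms; if server selection then takes 500 ms, connection checkout has 1500 ms left.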

Member Author

@vbabanin vbabanin Aug 13, 2025

Current cursor diagram: [image not included]

Refactored cursor diagram: [image not included]

}

private void resumeableOperation(final AsyncBlock asyncBlock, final SingleResultCallback<List<T>> callback, final boolean tryNext) {
timeoutContext.resetTimeoutIfPresent();
Member Author

@vbabanin vbabanin Aug 13, 2025

Calling timeoutContext.resetTimeoutIfPresent() in resumeableOperation violated the spec.

From the spec:

“If set, drivers MUST apply the timeoutMS option to the initial aggregate operation. Drivers MUST also apply the original timeoutMS value to each next call on the change stream.”

Because resume attempts occur within next() and are invisible to the user, they must share the same original timeout. This change moves the reset logic into next().
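
A rough sketch of that behaviour, assuming a hypothetical `Attempt` callback that returns null on a resumable failure and an injected millisecond clock: the timeout is reset once per user-visible next() call, and every resume attempt inside that call draws from the same budget. `IllegalStateException` is a stand-in for the driver's timeout exception.

```java
import java.util.function.LongSupplier;

// Hypothetical model of a change stream's next(); not the driver's cursor code.
final class ChangeStreamTimeoutSketch {

    // One attempt to fetch a batch; returning null signals a resumable failure.
    interface Attempt {
        String fetch(long remainingMs);
    }

    private final long timeoutMs;
    private final LongSupplier clockMs;

    ChangeStreamTimeoutSketch(final long timeoutMs, final LongSupplier clockMs) {
        this.timeoutMs = timeoutMs;
        this.clockMs = clockMs;
    }

    String next(final Attempt attempt) {
        // Reset happens here, once per next() call.
        long deadlineMs = clockMs.getAsLong() + timeoutMs;
        while (true) {
            long remainingMs = deadlineMs - clockMs.getAsLong();
            if (remainingMs <= 0) {
                throw new IllegalStateException("operation timed out");
            }
            String batch = attempt.fetch(remainingMs);
            if (batch != null) {
                return batch;
            }
            // Resumable failure: retry against the same deadline, without resetting it.
        }
    }
}
```

If the reset instead happened inside the resume path, each invisible retry would get a fresh full timeout and a single next() call could run far longer than timeoutMS.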

import static com.mongodb.internal.operation.CommandBatchCursorHelper.logCommandCursorResult;
import static com.mongodb.internal.operation.CommandBatchCursorHelper.translateCommandException;
import static java.util.Collections.emptyList;
public class AsyncCommandBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {
Member Author

Most of the logic has been moved into AsyncCoreCursor so it can be shared between AsyncChangeStreamBatchCursor and AsyncCommandBatchCursor.

As a result, AsyncCommandBatchCursor is now a small timeout-management wrapper whose structure mirrors the specification.
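
The shape of such a wrapper might look like the sketch below. It is synchronous for brevity and entirely hypothetical: `CoreCursor` here is a one-method interface taking a deadline, whereas the PR passes an OperationContext, and the two modes are modelled only by how the deadline is chosen (a fresh one per call for ITERATION, a single one fixed at creation for CURSOR_LIFETIME).

```java
import java.util.function.LongSupplier;

// Hypothetical wrapper illustrating timeout policy kept apart from core cursor logic.
final class CursorWrapperSketch {

    enum Mode { ITERATION, CURSOR_LIFETIME }

    // Stand-in for the shared core cursor; it only executes, it owns no timeout policy.
    interface CoreCursor {
        String next(long deadlineMs);
    }

    static final class TimeoutManagingCursor {
        private final CoreCursor core;
        private final Mode mode;
        private final long timeoutMs;
        private final LongSupplier clockMs;
        private final long lifetimeDeadlineMs;

        TimeoutManagingCursor(final CoreCursor core, final Mode mode, final long timeoutMs,
                final LongSupplier clockMs) {
            this.core = core;
            this.mode = mode;
            this.timeoutMs = timeoutMs;
            this.clockMs = clockMs;
            // Fixed once; only used in CURSOR_LIFETIME mode.
            this.lifetimeDeadlineMs = clockMs.getAsLong() + timeoutMs;
        }

        String next() {
            long deadlineMs = mode == Mode.ITERATION
                    ? clockMs.getAsLong() + timeoutMs   // fresh budget for each call
                    : lifetimeDeadlineMs;               // one budget for the whole cursor
            return core.next(deadlineMs);
        }
    }
}
```

A change stream cursor could wrap the same core with a different policy class, which is the separation the comment above describes.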

@vbabanin vbabanin requested a review from Copilot August 13, 2025 05:14
Copilot

This comment was marked as outdated.

@vbabanin vbabanin requested a review from Copilot August 13, 2025 18:02
Contributor

@Copilot Copilot AI left a comment

Pull Request Overview

This PR refactors the Client Side Operation Timeout (CSOT) implementation by removing the OperationContext from binding classes and instead passing it explicitly to operation execution methods. The changes modernize the API design and improve consistency across sync and async operations.

Key changes:

  • Removed getOperationContext() methods from binding interfaces and implementations
  • Modified operation execution methods to accept OperationContext as an explicit parameter
  • Updated connection source methods to take OperationContext as parameter
  • Refactored ClientSessionBinding to create session contexts at execution time rather than during binding creation

Reviewed Changes

Copilot reviewed 149 out of 150 changed files in this pull request and generated 7 comments.

Show a summary per file

| File | Description |
| --- | --- |
| driver-sync/src/test/unit/com/mongodb/client/internal/ClientSessionBindingSpecification.groovy | Updated test to verify binding delegates to wrapped binding with explicit OperationContext |
| driver-sync/src/test/resources/logback-test.xml | Changed log level from INFO to DEBUG for testing |
| driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java | Added imports and minor test method name changes |
| driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java | Refactored to create OperationContext with session context at execution time |
| driver-sync/src/main/com/mongodb/client/internal/MapReduceIterableImpl.java | Updated operation execution methods to accept OperationContext parameter |
| driver-sync/src/main/com/mongodb/client/internal/CryptConnection.java | Renamed timeout variable for consistency |
| driver-sync/src/main/com/mongodb/client/internal/CryptBinding.java | Removed getOperationContext() method and updated connection source methods |
| driver-sync/src/main/com/mongodb/client/internal/Crypt.java | Parameter renaming and method signature updates |
| driver-sync/src/main/com/mongodb/client/internal/CommandMarker.java | Parameter renaming from operationTimeout to timeout |
| driver-sync/src/main/com/mongodb/client/internal/CollectionInfoRetriever.java | Parameter renaming for consistency |
| driver-sync/src/main/com/mongodb/client/internal/ClientSessionBinding.java | Major refactoring to remove OperationContext from binding and create session context at execution time |
| driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/ClientSessionBindingSpecification.groovy | Similar updates to sync version for async binding tests |

@@ -242,7 +242,10 @@ public ReadConcern getReadConcern() {
} else if (isSnapshot()) {
return ReadConcern.SNAPSHOT;
} else {
return wrapped.getOperationContext().getSessionContext().getReadConcern();
Member Author

@vbabanin vbabanin Aug 13, 2025

Previously, this method returned the read concern specified on the wrapped AsyncClusterAwareBindingContext (inherited from MongoCollection, MongoDatabase, etc.), which was supplied when the binding was created:

AsyncClusterAwareReadWriteBinding readWriteBinding = new AsyncClusterBinding(mongoClient.getCluster(),
        getReadPreferenceForBinding(readPreference, session), readConcern,
        getOperationContext(requestContext, session, readConcern));

With OperationContext removed from Bindings, we can now embed the parent read concern directly and name it as such.

@@ -215,7 +223,10 @@ public ReadConcern getReadConcern() {
} else if (isSnapshot()) {
return ReadConcern.SNAPSHOT;
} else {
return wrapped.getOperationContext().getSessionContext().getReadConcern();
Member Author

Previously, this method returned the read concern specified on the wrapped ClusterAwareBindingContext (inherited from MongoCollection, MongoDatabase, etc.), which was supplied when the binding was created:

mongo-java-driver/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java

Lines 479 to 483 in 355c6ea

ReadWriteBinding getReadWriteBinding(final ReadPreference readPreference,
        final ReadConcern readConcern, final ClientSession session, final boolean ownsSession) {

    ClusterAwareReadWriteBinding readWriteBinding = new ClusterBinding(cluster,
            getReadPreferenceForBinding(readPreference, session), readConcern, getOperationContext(session, readConcern));

With OperationContext removed from Bindings, we can now embed the parent read concern directly and name it as such.

@vbabanin vbabanin requested a review from rozza August 13, 2025 19:20