Skip to content

CSHARP-5560: CSOT: Refactor IOperationExecutor to use operation context #1676

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jun 5, 2025

Conversation

sanych-sun
Copy link
Member

No description provided.

@sanych-sun sanych-sun requested a review from a team as a code owner April 24, 2025 00:08
@sanych-sun sanych-sun requested review from BorisDog and adelinowona and removed request for a team April 24, 2025 00:08
Copy link
Contributor

@adelinowona adelinowona left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good! left some minor comments

Copy link
Contributor

@BorisDog BorisDog left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partial review: minor comments so far.

@@ -58,6 +58,8 @@ private MongoCollectionImpl(IMongoDatabase database, CollectionNamespace collect
_documentSerializer = Ensure.IsNotNull(documentSerializer, nameof(documentSerializer));

_messageEncoderSettings = GetMessageEncoderSettings();
_readOperationOptions = new ReadOperationOptions(DefaultReadPreference: _settings.ReadPreference);
_writeOperationOptions = new WriteOperationOptions();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor:

_readOperationOptions = new()
_writeOperationOptions = new(...)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Ensure.IsNotNullOrEmpty(name, nameof(name));
if (name == "*")
{
throw new ArgumentException("Cannot specify '*' for the index name. Use DropAllAsync to drop all indexes.", "name");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: mistake in the original code, should be DropAll and DropAllAsync.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -50,6 +51,9 @@ public MongoDatabase(IMongoClient client, DatabaseNamespace databaseNamespace, M
_settings = Ensure.IsNotNull(settings, nameof(settings)).Freeze();
_cluster = Ensure.IsNotNull(cluster, nameof(cluster));
_operationExecutor = Ensure.IsNotNull(operationExecutor, nameof(operationExecutor));

_readOperationOptions = new ReadOperationOptions(DefaultReadPreference: _settings.ReadPreference);
_writeOperationOptions = new WriteOperationOptions();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: new(...) ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


private IReadWriteBindingHandle CreateReadWriteBinding(IClientSessionHandle session)
{
// TODO: CreateReadWriteBinding from MongoClient did not used ChannelPinningHelper, double-check if it's OK to start using it
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to address this TODO.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add parameter to disable channel pinning + ticket to investigate why do we need that: https://jira.mongodb.org/browse/CSHARP-5600

@@ -100,7 +104,7 @@ public MongoClient(string connectionString)
internal MongoClient(IOperationExecutor operationExecutor, MongoClientSettings settings)
: this(settings)
{
_operationExecutor = operationExecutor;
_operationExecutor = new OperationExecutorWrapper(operationExecutor);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using the Fork approach.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BorisDog what is the Fork approach?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem here is: MongoClient implements IDisposable, and disposed mongo client throws ObjectDisposedException. MongoDatabase and MongoCollection are not disposable on other hand. However most of work regarding commands execution is implemented in OperationExecutor, that is passed into Database and Collection. OperationExecutor holds reference to mongo client, but not used to implement IDisposable - which means that operation executor passed to database would be half-operational after the mongo client would be disposed. By half-operational I mean it will throw on any code path which require accessing mongo client: for example any implicit session creation will require that.

This is what I've tried to address here - I've made OperationExecutor disposable and move all disposed checks there. So now we will have consistent behaviour: once mongo client will be disposed - all databases and collections created from the client will throw. So this is a good part.

The bed part: mongo client has a number of methods to clone itself, by adjusting some settings (like WithReadConcern). Such method should create a new copy of mongo client with separate operation executor - so disposal of the original mongo client will not affect the created one. Plus we have a tests that wants to substitute implementation of the OperationExcecutor with the mock.

The only option to achieve all of that is to introduce the OperationExecutorFactory parameter - the factory method that accepts IMongoClient and returns OperationExecutor. So tests could substitute it, and clonning methods could ask for the fresh OperationExecutor.

{
[Theory]
[ParameterAttributeData]
public async Task StartImplicitSession_should_call_cluster([Values(true, false)]bool isAsync)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: _cluster_StartSession

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


namespace MongoDB.Driver
{
internal record class WriteOperationOptions();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove class here too

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor

@BorisDog BorisDog left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

Copy link
Contributor

@adelinowona adelinowona left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

return UsingImplicitSession(session => ListCollectionNames(session, options, cancellationToken), cancellationToken);
var cursor = _operationExecutor.ExecuteReadOperation(
CreateListCollectionNamesOperation(options),
_readOperationOptions with { DefaultReadPreference = ReadPreference.Primary },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why DefaultReadPreference instead of ExplicitReadPreference?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This came from ListCollectionNames method, see line 358 before the changes.

{
Ensure.IsNotNull((object)requests, nameof(requests));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually arguments are validated by public methods only, to alert the user that they passed an invalid value.

When WE call private methods WE should never pass an invalid argument, so it is odd to be calling Ensure.IsNotNull here in a private method.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea was to convert most of the public methods to expression-body methods (see other methods, for example FindOneAndDelete). Therefore I've decided to move some parameters validation inside of CreateXXXOperation methods.

Oleksandr Poliakov added 3 commits June 2, 2025 18:38
@sanych-sun sanych-sun requested a review from rstam June 3, 2025 16:14
_readOperationOptions,
session: null,
allowChannelPinning: true,
cancellationToken: cancellationToken);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you suddenly switch from using var session ... to this?

Wouldn't it be better to do:

public override long Count(FilterDefinition<TDocument> filter, CountOptions options, CancellationToken cancellationToken = default)
{                                                                                                                                  
    using var session = _operationExecutor.StartImplicitSession(cancellationToken: cancellationToken);                             
    return CountDocuments(session, filter, options, cancellationToken);                                                            
}                                                                                                                                  

_writeOperationOptions,
session: null,
allowChannelPinning: false,
cancellationToken: cancellationToken);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use the using var session ... approach you used in parts of MongoClientImpl.cs?

public ClientBulkWriteResult BulkWrite(IReadOnlyList<BulkWriteModel> models, ClientBulkWriteOptions options = null, CancellationToken cancellationToken = default)
{                                                                                                                                                                 
    using var session = _operationExecutor.StartImplicitSession(cancellationToken);                                                                               
    return BulkWrite(session, models, options, cancellationToken);                                                                                                
}                                                                                                                                                                 

It's less code duplication (specifically not duplicating the tricky part of whether to pass null or not for session and what value to pass for allowChannelPinning.

It very clearly emphasizes that the only difference between this method and the next is that an implicit session is being used.

return await operation.ExecuteAsync(binding, cancellationToken).ConfigureAwait(false);
ThrowIfDisposed();
var isOwnSession = session == null;
session ??= StartImplicitSession(cancellationToken);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change implementation to NOT allow passing null for session.

Let callers create implicit session instead (see other comments).

@sanych-sun sanych-sun requested a review from rstam June 4, 2025 15:58
Copy link
Contributor

@rstam rstam left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to stop my review here to see what you think of the suggestions so far. If you agree I will wait for a further commit.

_writeOperationOptions,
Ensure.IsNotNull(session, nameof(session)),
allowChannelPinning: false,
cancellationToken: cancellationToken);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think converting this to an expression valued body is a mistake. The result is a complicated expression with lots of things happening in one single expression.

I propose the following:

public ClientBulkWriteResult BulkWrite(IClientSessionHandle session, IReadOnlyList<BulkWriteModel> models, ClientBulkWriteOptions options = null, CancellationToken cancellationToken = default)
{                                                                                                                                                                                               
    Ensure.IsNotNull(session, nameof(session));                                                                                                                                                 
    var operation = CreateClientBulkWriteOperation(models, options);                                                                                                                            
    return ExecuteWriteOperation<ClientBulkWriteResult>(session, operation, cancellationToken);                                                                                                 
}                                                                                                                                                                                               

and keeping the ExecuteWriteOperation helper method but refactoring to:

private TResult ExecuteWriteOperation<TResult>(IClientSessionHandle session, IWriteOperation<TResult> operation, CancellationToken cancellationToken = default(CancellationToken))
{                                                                                                                                                                                 
    return _operationExecutor.ExecuteWriteOperation(session, operation, _writeOperationOptions, allowChannelPinning: false, cancellationToken);                                   
}                                                                                                                                                                                 

This allows the decomposing the various things going on into separate statements.

It also simplifies the code because the options and the allowChannelPinning values are the same for the entire file, and can be supplied once in the ExecuteWrite helper method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for many other methods.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@sanych-sun sanych-sun requested a review from rstam June 4, 2025 18:47
var operation = CreateClientBulkWriteOperation(models, options);
return ExecuteWriteOperation<ClientBulkWriteResult>(session, operation, cancellationToken);
return ExecuteWriteOperation<ClientBulkWriteResult>(session, operation, _writeOperationOptions, cancellationToken);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_writeOperationOptions argument is unnecessary. It's always the same.


return UsingImplicitSession(session => ListDatabaseNames(session, options, cancellationToken), cancellationToken);
var listDatabasesOptions = CreateListDatabasesOptionsFromListDatabaseNamesOptions(options);
var databases = ListDatabases(listDatabasesOptions, cancellationToken);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where did the implicit session go?

For consistency you should create the implicit session and call the other ListDatabaseNames overload.

I think you are trying to short circuit something here but don't think you should.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll double-check the difference.


return UsingImplicitSessionAsync(session => ListDatabaseNamesAsync(session, options, cancellationToken), cancellationToken);
var listDatabasesOptions = CreateListDatabasesOptionsFromListDatabaseNamesOptions(options);
var databases = await ListDatabasesAsync(listDatabasesOptions, cancellationToken).ConfigureAwait(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happened to the implicit session here?

I think you are trying to short circuit something but it would be better to create the implicit session and call the other overlaod of ListDatabaseNamesAsync.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll double-check the difference.

return ListDatabases(session, null, cancellationToken);
Ensure.IsNotNull(session, nameof(session));
var operation = CreateListDatabaseOperation(null);
return ExecuteReadOperation(session, operation, _readOperationOptions, cancellationToken);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just call the other overload passing null for options.

It's not a good idea to copy/paste the body of the other method here as an "optimization". If we ever change something in the other overload we could forget to make the same change here.

return ListDatabasesAsync(session, null, cancellationToken);
Ensure.IsNotNull(session, nameof(session));
var operation = CreateListDatabaseOperation(null);
return ExecuteReadOperationAsync(session, operation, _readOperationOptions, cancellationToken);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just call the other overload of ListDatabasesAsync like before. No need to copy/paste the implementation from that method to this one.

@sanych-sun sanych-sun requested a review from rstam June 4, 2025 21:32
private Task<TResult> ExecuteReadOperationAsync<TResult>(IClientSessionHandle session, IReadOperation<TResult> operation, ReadOperationOptions options, CancellationToken cancellationToken)
=> _operationExecutor.ExecuteReadOperationAsync(session, operation, options, true, cancellationToken);

private TResult ExecuteWriteOperation<TResult>(IClientSessionHandle session, IWriteOperation<TResult> operation, WriteOperationOptions options, CancellationToken cancellationToken)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove options parameter?

Every single caller passes _writeOperationOptions.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@sanych-sun sanych-sun requested a review from rstam June 4, 2025 22:47
Copy link
Contributor

@rstam rstam left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's VERY close! Looking good.

if (update is PipelineUpdateDefinition<TDocument> && (options.ArrayFilters != null && options.ArrayFilters.Any()))
{
throw new NotSupportedException("An arrayfilter is not supported in the pipeline-style update.");
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this argument validation need to be moved back to the public method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

{
Ensure.IsNotNull(session, nameof(session));
Ensure.IsNotNullOrEmpty(name, nameof(name));
if (name == "*")
{
throw new ArgumentException("Cannot specify '*' for the index name. Use DropAllAsync to drop all indexes.", "name");
throw new ArgumentException("Cannot specify '*' for the index name. Use DropAll and DropAllAsync to drop all indexes.", "name");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really want to mention DropAllAsync here?

They called DropOne, so the correct alternative method is DropAll.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BorisDog as far as I remember you've asked to change the error message. Could you please confirm?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As it was discussed with Boris, I'll update wording to mention DropAll in DropOne, and DropAllAsync in DropOneAsync.

{
Ensure.IsNotNull(session, nameof(session));
Ensure.IsNotNullOrEmpty(name, nameof(name));
if (name == "*")
{
throw new ArgumentException("Cannot specify '*' for the index name. Use DropAllAsync to drop all indexes.", "name");
throw new ArgumentException("Cannot specify '*' for the index name. Use DropAll and DropAllAsync to drop all indexes.", "name");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really want to mention DropAll here?

They called DropOneAsync, so the correct alternative method is DropAllAsync.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BorisDog as far as I remember you've asked to change the error message. Could you please confirm?

{
Ensure.IsNotNull(session, nameof(session));
Ensure.IsNotNull(models, nameof(models));
Ensure.IsNotNull((object)models, nameof(models));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took me awhile to figure out why this cast to (object) was added.

At first glance it makes no sense!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As it was discussed, this prevents warning about possible multiple enumeration. Will revert.

}

private T ExecuteWriteOperation<T>(IClientSessionHandle session, IWriteOperation<T> operation, CancellationToken cancellationToken)
private BsonDocument GetEncryptedFields(IClientSessionHandle session, CollectionNamespace collectionNamespace, DropCollectionOptions options, CancellationToken cancellationToken)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know the difference between "encrypted fields" and "effective encrypted fields", but

return effectiveEncryptedFields;

makes me think this method should be called GetEffectiveEncryptedField.

Same for async version.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return collections.Select(collection => collection["name"].AsString);
}
private TResult ExecuteReadOperation<TResult>(IClientSessionHandle session, IReadOperation<TResult> operation, CancellationToken cancellationToken)
=> _operationExecutor.ExecuteReadOperation(session, operation, _readOperationOptions, true, cancellationToken);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe call the other overload instead of _operationExecutor directly?

=> ExecuteReadOperation(session, operation, _readOperationOptions, cancellationToken);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return ExecuteReadOperation(session, operation, readPreference, cancellationToken);
}
private Task<TResult> ExecuteReadOperationAsync<TResult>(IClientSessionHandle session, IReadOperation<TResult> operation, CancellationToken cancellationToken)
=> _operationExecutor.ExecuteReadOperationAsync(session, operation, _readOperationOptions, true, cancellationToken);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe call the other overload instead of _operationExecutor directly?

=> ExecuteReadOperationAsync(session, operation, _readOperationOptions, cancellationToken);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@sanych-sun sanych-sun requested a review from rstam June 5, 2025 01:09
Copy link
Contributor

@rstam rstam left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@sanych-sun sanych-sun merged commit 27fd9e5 into mongodb:main Jun 5, 2025
30 of 34 checks passed
@sanych-sun sanych-sun deleted the csharp5560 branch June 5, 2025 21:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants