Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/MongoDB.Driver/Core/Misc/ThreadStaticRandom.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,16 @@ public static int Next(int maxValue)

return random.Next(maxValue);
}

public static double NextDouble()
{
var random = __threadStaticRandom;
if (random == null)
{
random = __threadStaticRandom = new Random();
}

return random.NextDouble();
}
}
}
15 changes: 15 additions & 0 deletions src/MongoDB.Driver/Core/Operations/RetryabilityHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,21 @@ public static void AddRetryableWriteErrorLabelIfRequired(MongoException exceptio
}
}

public static int GetRetryDelayMs(int attempt, double backoffBase = 2, int backoffInitial = 100, int backoffMax = 10_000)
{
Ensure.IsGreaterThanZero(attempt, nameof(attempt));
Ensure.IsGreaterThanZero(backoffBase, nameof(backoffBase));
Ensure.IsGreaterThanZero(backoffInitial, nameof(backoffInitial));
Ensure.IsGreaterThan(backoffMax, backoffInitial, nameof(backoffMax));

#if NET6_0_OR_GREATER
var j = Random.Shared.NextDouble();
#else
var j = ThreadStaticRandom.NextDouble();
#endif
return (int)(j * Math.Min(backoffMax, backoffInitial * Math.Pow(backoffBase, attempt - 1)));
Copy link

Copilot AI Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The calculation backoffInitial * Math.Pow(backoffBase, attempt - 1) may overflow for large attempt values, which would then be capped by Math.Min. Consider adding overflow protection or documenting the maximum safe attempt value to prevent unexpected behavior.

Suggested change
return (int)(j * Math.Min(backoffMax, backoffInitial * Math.Pow(backoffBase, attempt - 1)));
// compute the largest exponent such that backoffInitial * backoffBase^exponent <= backoffMax
var maxExponent = Math.Log(backoffMax / (double)backoffInitial, backoffBase);
var effectiveExponent = attempt - 1;
double delayWithoutJitter;
if (effectiveExponent >= maxExponent)
{
delayWithoutJitter = backoffMax;
}
else
{
delayWithoutJitter = backoffInitial * Math.Pow(backoffBase, effectiveExponent);
}
return (int)(j * delayWithoutJitter);

Copilot uses AI. Check for mistakes.
}

public static bool IsCommandRetryable(BsonDocument command)
{
return
Expand Down
174 changes: 78 additions & 96 deletions src/MongoDB.Driver/TransactionExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using System.Threading.Tasks;
using MongoDB.Driver.Core.Bindings;
using MongoDB.Driver.Core.Misc;
using MongoDB.Driver.Core.Operations;

namespace MongoDB.Driver
{
Expand All @@ -35,28 +36,41 @@ public static TResult ExecuteWithRetries<TResult>(
IClock clock,
CancellationToken cancellationToken)
{
var attempt = 0;
var transactionTimeout = transactionOptions?.Timeout ?? clientSession.Options.DefaultTransactionOptions?.Timeout;
using var operationContext = new OperationContext(clock, transactionTimeout, cancellationToken);

while (true)
{
attempt++;
clientSession.StartTransaction(transactionOptions);
clientSession.WrappedCoreSession.CurrentTransaction.OperationContext = operationContext;

var callbackOutcome = ExecuteCallback(operationContext, clientSession, callback, cancellationToken);
if (callbackOutcome.ShouldRetryTransaction)
try
{
continue;
var result = ExecuteCallback(operationContext, clientSession, callback, cancellationToken);
// Transaction could be completed by user's code inside the callback, skipping commit in such case.
if (IsTransactionInStartingOrInProgressState(clientSession))
{
CommitWithRetries(operationContext, clientSession, cancellationToken);
}

return result;
}
if (!IsTransactionInStartingOrInProgressState(clientSession))
catch (Exception ex)
{
return callbackOutcome.Result; // assume callback intentionally ended the transaction
}
if (!HasErrorLabel(ex, TransientTransactionErrorLabel))
{
throw;
}

var transactionHasBeenCommitted = CommitWithRetries(operationContext, clientSession, cancellationToken);
if (transactionHasBeenCommitted)
{
return callbackOutcome.Result;
var delay = GetRetryDelay(attempt);
if (HasTimedOut(operationContext, delay))
{
throw;
}

Thread.Sleep(delay);
}
}
}
Expand All @@ -68,97 +82,99 @@ public static async Task<TResult> ExecuteWithRetriesAsync<TResult>(
IClock clock,
CancellationToken cancellationToken)
{
TimeSpan? transactionTimeout = transactionOptions?.Timeout ?? clientSession.Options.DefaultTransactionOptions?.Timeout;
var attempt = 0;
var transactionTimeout = transactionOptions?.Timeout ?? clientSession.Options.DefaultTransactionOptions?.Timeout;
using var operationContext = new OperationContext(clock, transactionTimeout, cancellationToken);

while (true)
{
attempt++;
clientSession.StartTransaction(transactionOptions);
clientSession.WrappedCoreSession.CurrentTransaction.OperationContext = operationContext;

var callbackOutcome = await ExecuteCallbackAsync(operationContext, clientSession, callbackAsync, cancellationToken).ConfigureAwait(false);
if (callbackOutcome.ShouldRetryTransaction)
try
{
continue;
var result = await ExecuteCallbackAsync(operationContext, clientSession, callbackAsync, cancellationToken).ConfigureAwait(false);
// Transaction could be completed by user's code inside the callback, skipping commit in such case.
if (IsTransactionInStartingOrInProgressState(clientSession))
{
await CommitWithRetriesAsync(operationContext, clientSession, cancellationToken).ConfigureAwait(false);
}

return result;
}
if (!IsTransactionInStartingOrInProgressState(clientSession))
catch (Exception ex)
{
return callbackOutcome.Result; // assume callback intentionally ended the transaction
}
if (!HasErrorLabel(ex, TransientTransactionErrorLabel))
{
throw;
}

var transactionHasBeenCommitted = await CommitWithRetriesAsync(operationContext, clientSession, cancellationToken).ConfigureAwait(false);
if (transactionHasBeenCommitted)
{
return callbackOutcome.Result;
var delay = GetRetryDelay(attempt);
if (HasTimedOut(operationContext, delay))
{
throw;
}

await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
}
}
}

private static bool HasTimedOut(OperationContext operationContext)
private static TimeSpan GetRetryDelay(int attempt)
=> TimeSpan.FromMilliseconds(RetryabilityHelper.GetRetryDelayMs(attempt, 1.5, 5, 500));

private static bool HasTimedOut(OperationContext operationContext, TimeSpan delay = default)
{
return operationContext.IsTimedOut() ||
(operationContext.RootContext.Timeout == null && operationContext.RootContext.Elapsed > __transactionTimeout);
if (operationContext.Timeout.HasValue)
{
return operationContext.Elapsed + delay >= operationContext.Timeout;
}

return operationContext.RootContext.Elapsed + delay >= __transactionTimeout;
}

private static CallbackOutcome<TResult> ExecuteCallback<TResult>(OperationContext operationContext, IClientSessionHandle clientSession, Func<IClientSessionHandle, CancellationToken, TResult> callback, CancellationToken cancellationToken)
private static TResult ExecuteCallback<TResult>(OperationContext operationContext, IClientSessionHandle clientSession, Func<IClientSessionHandle, CancellationToken, TResult> callback, CancellationToken cancellationToken)
{
try
{
var result = callback(clientSession, cancellationToken);
return new CallbackOutcome<TResult>.WithResult(result);
return callback(clientSession, cancellationToken);
}
catch (Exception ex)
catch (Exception) when (IsTransactionInStartingOrInProgressState(clientSession))
{
if (IsTransactionInStartingOrInProgressState(clientSession))
AbortTransactionOptions abortOptions = null;
if (operationContext.IsRootContextTimeoutConfigured())
{
AbortTransactionOptions abortOptions = null;
if (operationContext.IsRootContextTimeoutConfigured())
{
abortOptions = new AbortTransactionOptions(operationContext.RootContext.Timeout);
}

clientSession.AbortTransaction(abortOptions, cancellationToken);
abortOptions = new AbortTransactionOptions(operationContext.RootContext.Timeout);
}

if (HasErrorLabel(ex, TransientTransactionErrorLabel) && !HasTimedOut(operationContext))
{
return new CallbackOutcome<TResult>.WithShouldRetryTransaction();
}
clientSession.AbortTransaction(abortOptions, cancellationToken);

throw;
}
}

private static async Task<CallbackOutcome<TResult>> ExecuteCallbackAsync<TResult>(OperationContext operationContext, IClientSessionHandle clientSession, Func<IClientSessionHandle, CancellationToken, Task<TResult>> callbackAsync, CancellationToken cancellationToken)
private static async Task<TResult> ExecuteCallbackAsync<TResult>(OperationContext operationContext, IClientSessionHandle clientSession, Func<IClientSessionHandle, CancellationToken, Task<TResult>> callbackAsync, CancellationToken cancellationToken)
{
try
{
var result = await callbackAsync(clientSession, cancellationToken).ConfigureAwait(false);
return new CallbackOutcome<TResult>.WithResult(result);
return await callbackAsync(clientSession, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
catch (Exception) when (IsTransactionInStartingOrInProgressState(clientSession))
{
if (IsTransactionInStartingOrInProgressState(clientSession))
AbortTransactionOptions abortOptions = null;
if (operationContext.IsRootContextTimeoutConfigured())
{
AbortTransactionOptions abortOptions = null;
if (operationContext.IsRootContextTimeoutConfigured())
{
abortOptions = new AbortTransactionOptions(operationContext.RootContext.Timeout);
}

await clientSession.AbortTransactionAsync(abortOptions, cancellationToken).ConfigureAwait(false);
abortOptions = new AbortTransactionOptions(operationContext.RootContext.Timeout);
}

if (HasErrorLabel(ex, TransientTransactionErrorLabel) && !HasTimedOut(operationContext))
{
return new CallbackOutcome<TResult>.WithShouldRetryTransaction();
}
await clientSession.AbortTransactionAsync(abortOptions, cancellationToken).ConfigureAwait(false);

throw;
}
}

private static bool CommitWithRetries(OperationContext operationContext, IClientSessionHandle clientSession, CancellationToken cancellationToken)
private static void CommitWithRetries(OperationContext operationContext, IClientSessionHandle clientSession, CancellationToken cancellationToken)
{
while (true)
{
Expand All @@ -171,7 +187,7 @@ private static bool CommitWithRetries(OperationContext operationContext, IClient
}

clientSession.CommitTransaction(commitOptions, cancellationToken);
return true;
return;
}
catch (Exception ex)
{
Expand All @@ -180,17 +196,12 @@ private static bool CommitWithRetries(OperationContext operationContext, IClient
continue;
}

if (HasErrorLabel(ex, TransientTransactionErrorLabel) && !HasTimedOut(operationContext))
{
return false; // the transaction will be retried
}

throw;
}
}
}

private static async Task<bool> CommitWithRetriesAsync(OperationContext operationContext, IClientSessionHandle clientSession, CancellationToken cancellationToken)
private static async Task CommitWithRetriesAsync(OperationContext operationContext, IClientSessionHandle clientSession, CancellationToken cancellationToken)
{
while (true)
{
Expand All @@ -203,7 +214,7 @@ private static async Task<bool> CommitWithRetriesAsync(OperationContext operatio
}

await clientSession.CommitTransactionAsync(commitOptions, cancellationToken).ConfigureAwait(false);
return true;
return;
}
catch (Exception ex)
{
Expand All @@ -212,11 +223,6 @@ private static async Task<bool> CommitWithRetriesAsync(OperationContext operatio
continue;
}

if (HasErrorLabel(ex, TransientTransactionErrorLabel) && !HasTimedOut(operationContext))
{
return false; // the transaction will be retried
}

throw;
}
}
Expand All @@ -228,10 +234,8 @@ private static bool HasErrorLabel(Exception ex, string errorLabel)
{
return mongoException.HasErrorLabel(errorLabel);
}
else
{
return false;
}

return false;
}

private static bool IsMaxTimeMSExpiredException(Exception ex)
Expand Down Expand Up @@ -279,27 +283,5 @@ private static bool ShouldRetryCommit(OperationContext operationContext, Excepti
!HasTimedOut(operationContext) &&
!IsMaxTimeMSExpiredException(ex);
}

// nested types
internal abstract class CallbackOutcome<TResult>
{
public virtual TResult Result => throw new InvalidOperationException();
public virtual bool ShouldRetryTransaction => false;

public class WithResult : CallbackOutcome<TResult>
{
public WithResult(TResult result)
{
Result = result;
}

public override TResult Result { get; }
}

public class WithShouldRetryTransaction : CallbackOutcome<TResult>
{
public override bool ShouldRetryTransaction => true;
}
}
}
}
1 change: 1 addition & 0 deletions tests/MongoDB.Driver.Tests/ClientSessionHandleTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ public void WithTransaction_commit_after_callback_processing_should_be_processed
}

var subject = CreateSubject(coreSession: mockCoreSession.Object, clock: mockClock.Object);
SetupTransactionState(subject, true);

if (async)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
using MongoDB.Driver.Core.Misc;
using MongoDB.Driver.Core.TestHelpers;
using Xunit;
using MongoDB.Driver.Core.Connections;

namespace MongoDB.Driver.Core.Operations
{
Expand Down Expand Up @@ -102,6 +101,42 @@ public void AddRetryableWriteErrorLabelIfRequired_should_add_RetryableWriteError
hasRetryableWriteErrorLabel.Should().Be(shouldAddErrorLabel);
}

[Theory]
[InlineData(1, 2, 100, 10000, 0, 100)]
[InlineData(2, 2, 100, 10000, 0, 200)]
[InlineData(3, 2, 100, 10000, 0, 400)]
[InlineData(9999, 2, 100, 10000, 0, 10000)]

[InlineData(1, 1.5, 100, 10000, 0, 100)]
[InlineData(2, 1.5, 100, 10000, 0, 150)]
[InlineData(3, 1.5, 100, 10000, 0, 225)]
[InlineData(9999, 1.5, 100, 10000, 0, 10000)]
public void GetRetryDelayMs_should_return_expected_results(int attempt, double backoffBase, int backoffInitial, int backoffMax, int expectedRangeMin, int expectedRangeMax)
{
var result = RetryabilityHelper.GetRetryDelayMs(attempt, backoffBase, backoffInitial, backoffMax);

result.Should().BeInRange(expectedRangeMin, expectedRangeMax);
}

[Theory]
[InlineData(-1, 2, 100, 1000, "attempt")]
[InlineData(0, 2, 100, 1000, "attempt")]
[InlineData(1, -1, 100, 1000, "backoffBase")]
[InlineData(1, 0, 100, 1000, "backoffBase")]
[InlineData(1, 2, -1, 1000, "backoffInitial")]
[InlineData(1, 2, 0, 1000, "backoffInitial")]
[InlineData(1, 2, 100, -1, "backoffMax")]
[InlineData(1, 2, 100, 0, "backoffMax")]
[InlineData(1, 2, 100, 50, "backoffMax")]

public void GetRetryDelayMs_throws_on_wrong_parameters(int attempt, double backoffBase, int backoffInitial, int backoffMax, string expectedParameterName)
{
var exception = Record.Exception(() => RetryabilityHelper.GetRetryDelayMs(attempt, backoffBase, backoffInitial, backoffMax));

exception.Should().BeOfType<ArgumentOutOfRangeException>().Subject
.ParamName.Should().Be(expectedParameterName);
}

[Theory]
[InlineData("{ txnNumber : 1 }", true)]
[InlineData("{ commitTransaction : 1 }", true)]
Expand Down