Skip to content

Commit a553ffb

Browse files
DmitryLukyanovBorisDog
authored andcommitted
CSHARP-3671: Better wait queue timeout errors for load balanced clusters. (#579)
CSHARP-3671: Better wait queue timeout errors for load balanced clusters.
1 parent a5f8257 commit a553ffb

14 files changed

+298
-113
lines changed

src/MongoDB.Driver.Core/Core/ChannelPinningHelper.cs

+17-29
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
using System.Threading;
1717
using MongoDB.Driver.Core.Bindings;
1818
using MongoDB.Driver.Core.Clusters;
19+
using MongoDB.Driver.Core.ConnectionPools;
1920
using MongoDB.Driver.Core.Connections;
2021
using MongoDB.Driver.Core.Servers;
2122

@@ -89,18 +90,20 @@ public static IReadWriteBindingHandle CreateReadWriteBinding(ICluster cluster, I
8990
return new ReadWriteBindingHandle(readWriteBinding);
9091
}
9192

92-
internal static IChannelSourceHandle CreateGetMoreChannelSource(IChannelSourceHandle channelSource, long cursorId)
93+
internal static IChannelSourceHandle CreateGetMoreChannelSource(IChannelSourceHandle channelSource, IChannelHandle channel, long cursorId)
9394
{
9495
IChannelSource effectiveChannelSource;
9596
if (IsInLoadBalancedMode(channelSource.ServerDescription) && cursorId != 0)
9697
{
97-
var getMoreChannel = channelSource.GetChannel(CancellationToken.None); // no need for cancellation token since we already have channel in the source
98-
var getMoreSession = channelSource.Session.Fork();
98+
if (channel.Connection is ICheckOutReasonTracker checkOutReasonTracker)
99+
{
100+
checkOutReasonTracker.SetCheckOutReasonIfNotAlreadySet(CheckOutReason.Cursor);
101+
}
99102

100103
effectiveChannelSource = new ChannelChannelSource(
101104
channelSource.Server,
102-
getMoreChannel,
103-
getMoreSession);
105+
channel.Fork(),
106+
channelSource.Session.Fork());
104107
}
105108
else
106109
{
@@ -110,37 +113,22 @@ internal static IChannelSourceHandle CreateGetMoreChannelSource(IChannelSourceHa
110113
return new ChannelSourceHandle(effectiveChannelSource);
111114
}
112115

113-
internal static bool PinChannelSourceAndChannelIfRequired(
116+
internal static void PinChannellIfRequired(
114117
IChannelSourceHandle channelSource,
115118
IChannelHandle channel,
116-
ICoreSessionHandle session,
117-
out IChannelSourceHandle pinnedChannelSource,
118-
out IChannelHandle pinnedChannel)
119+
ICoreSessionHandle session)
119120
{
120-
if (IsInLoadBalancedMode(channel.ConnectionDescription))
121+
if (IsInLoadBalancedMode(channel.ConnectionDescription) &&
122+
session.IsInTransaction &&
123+
!IsChannelPinned(session.CurrentTransaction))
121124
{
122-
var server = channelSource.Server;
123-
124-
pinnedChannelSource = new ChannelSourceHandle(
125-
new ChannelChannelSource(
126-
server,
127-
channel.Fork(),
128-
session.Fork()));
129-
130-
if (session.IsInTransaction && !IsChannelPinned(session.CurrentTransaction))
125+
if (channel.Connection is ICheckOutReasonTracker checkOutReasonTracker)
131126
{
132-
session.CurrentTransaction.PinChannel(channel.Fork());
133-
session.CurrentTransaction.PinnedServer = server;
127+
checkOutReasonTracker.SetCheckOutReasonIfNotAlreadySet(CheckOutReason.Transaction);
134128
}
135-
136-
pinnedChannel = channel.Fork();
137-
138-
return true;
129+
session.CurrentTransaction.PinChannel(channel.Fork());
130+
session.CurrentTransaction.PinnedServer = channelSource.Server;
139131
}
140-
141-
pinnedChannelSource = null;
142-
pinnedChannel = null;
143-
return false;
144132
}
145133

146134
// private methods
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/* Copyright 2021-present MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using System;
17+
using System.Threading;
18+
19+
namespace MongoDB.Driver.Core.ConnectionPools
20+
{
21+
internal enum CheckOutReason
22+
{
23+
Cursor,
24+
Transaction
25+
}
26+
27+
internal interface ICheckOutReasonTracker
28+
{
29+
CheckOutReason? CheckOutReason { get; }
30+
void SetCheckOutReasonIfNotAlreadySet(CheckOutReason reason);
31+
}
32+
33+
internal sealed class CheckOutReasonCounter
34+
{
35+
public int _cursorCheckOutsCount = 0;
36+
public int _transactionCheckOutsCount = 0;
37+
38+
public int GetCheckOutsCount(CheckOutReason reason) =>
39+
reason switch
40+
{
41+
CheckOutReason.Cursor => _cursorCheckOutsCount,
42+
CheckOutReason.Transaction => _transactionCheckOutsCount,
43+
_ => throw new InvalidOperationException($"Invalid checkout reason {reason}.")
44+
};
45+
46+
public void Increment(CheckOutReason reason)
47+
{
48+
switch (reason)
49+
{
50+
case CheckOutReason.Cursor:
51+
Interlocked.Increment(ref _cursorCheckOutsCount);
52+
break;
53+
case CheckOutReason.Transaction:
54+
Interlocked.Increment(ref _transactionCheckOutsCount);
55+
break;
56+
default:
57+
throw new InvalidOperationException($"Invalid checkout reason {reason}.");
58+
}
59+
}
60+
61+
public void Decrement(CheckOutReason? reason)
62+
{
63+
switch (reason)
64+
{
65+
case null:
66+
break;
67+
case CheckOutReason.Cursor:
68+
Interlocked.Decrement(ref _cursorCheckOutsCount);
69+
break;
70+
case CheckOutReason.Transaction:
71+
Interlocked.Decrement(ref _transactionCheckOutsCount);
72+
break;
73+
default:
74+
throw new InvalidOperationException($"Invalid checkout reason {reason}.");
75+
}
76+
}
77+
}
78+
}

src/MongoDB.Driver.Core/Core/ConnectionPools/ExclusiveConnectionPool.Helpers.cs

+70-10
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,33 @@
2929

3030
namespace MongoDB.Driver.Core.ConnectionPools
3131
{
32-
internal sealed partial class ExclusiveConnectionPool : IConnectionPool
32+
internal sealed partial class ExclusiveConnectionPool
3333
{
34+
// private methods
35+
private Exception CreateTimeoutException(Stopwatch stopwatch, string message)
36+
{
37+
var checkOutsForCursorCount = _checkOutReasonCounter.GetCheckOutsCount(CheckOutReason.Cursor);
38+
var checkOutsForTransactionCount = _checkOutReasonCounter.GetCheckOutsCount(CheckOutReason.Transaction);
39+
40+
// only use the expanded message format when connected to a load balancer
41+
if (checkOutsForCursorCount != 0 || checkOutsForTransactionCount != 0)
42+
{
43+
var maxPoolSize = _settings.MaxConnections;
44+
var availableConnectionsCount = AvailableCount;
45+
var checkOutsCount = maxPoolSize - availableConnectionsCount;
46+
var checkOutsForOtherCount = checkOutsCount - checkOutsForCursorCount - checkOutsForTransactionCount;
47+
48+
message =
49+
$"Timed out after {stopwatch.ElapsedMilliseconds}ms waiting for a connection from the connection pool. " +
50+
$"maxPoolSize: {maxPoolSize}, " +
51+
$"connections in use by cursors: {checkOutsForCursorCount}, " +
52+
$"connections in use by transactions: {checkOutsForTransactionCount}, " +
53+
$"connections in use by other operations: {checkOutsForOtherCount}.";
54+
}
55+
56+
return new TimeoutException(message);
57+
}
58+
3459
// nested classes
3560
private static class State
3661
{
@@ -125,7 +150,7 @@ private AcquiredConnection FinalizePoolEnterance(PooledConnection pooledConnecti
125150
_stopwatch.Stop();
126151

127152
var message = $"Timed out waiting for a connection after {_stopwatch.ElapsedMilliseconds}ms.";
128-
throw new TimeoutException(message);
153+
throw _pool.CreateTimeoutException(_stopwatch, message);
129154
}
130155
}
131156

@@ -173,8 +198,9 @@ public void HandleException(Exception ex)
173198
}
174199
}
175200

176-
private sealed class PooledConnection : IConnection
201+
private sealed class PooledConnection : IConnection, ICheckOutReasonTracker
177202
{
203+
private CheckOutReason? _checkOutReason;
178204
private readonly IConnection _connection;
179205
private readonly ExclusiveConnectionPool _connectionPool;
180206
private int _generation;
@@ -187,6 +213,14 @@ public PooledConnection(ExclusiveConnectionPool connectionPool, IConnection conn
187213
_generation = connectionPool._generation;
188214
}
189215

216+
public CheckOutReason? CheckOutReason
217+
{
218+
get
219+
{
220+
return _checkOutReason;
221+
}
222+
}
223+
190224
public ConnectionId ConnectionId
191225
{
192226
get { return _connection.ConnectionId; }
@@ -313,6 +347,15 @@ public async Task SendMessagesAsync(IEnumerable<RequestMessage> messages, Messag
313347
}
314348
}
315349

350+
public void SetCheckOutReasonIfNotAlreadySet(CheckOutReason reason)
351+
{
352+
if (_checkOutReason == null)
353+
{
354+
_checkOutReason = reason;
355+
_connectionPool._checkOutReasonCounter.Increment(reason);
356+
}
357+
}
358+
316359
public void SetReadTimeout(TimeSpan timeout)
317360
{
318361
_connection.SetReadTimeout(timeout);
@@ -335,7 +378,7 @@ private void SetEffectiveGenerationIfRequired(ConnectionDescription description)
335378
}
336379
}
337380

338-
private sealed class AcquiredConnection : IConnectionHandle
381+
private sealed class AcquiredConnection : IConnectionHandle, ICheckOutReasonTracker
339382
{
340383
private ExclusiveConnectionPool _connectionPool;
341384
private bool _disposed;
@@ -347,6 +390,14 @@ public AcquiredConnection(ExclusiveConnectionPool connectionPool, ReferenceCount
347390
_reference = reference;
348391
}
349392

393+
public CheckOutReason? CheckOutReason
394+
{
395+
get
396+
{
397+
return _reference.Instance.CheckOutReason;
398+
}
399+
}
400+
350401
public ConnectionId ConnectionId
351402
{
352403
get { return _reference.Instance.ConnectionId; }
@@ -432,6 +483,12 @@ public Task SendMessagesAsync(IEnumerable<RequestMessage> messages, MessageEncod
432483
return _reference.Instance.SendMessagesAsync(messages, messageEncoderSettings, cancellationToken);
433484
}
434485

486+
public void SetCheckOutReasonIfNotAlreadySet(CheckOutReason reason)
487+
{
488+
ThrowIfDisposed();
489+
_reference.Instance.SetCheckOutReasonIfNotAlreadySet(reason);
490+
}
491+
435492
public void SetReadTimeout(TimeSpan timeout)
436493
{
437494
ThrowIfDisposed();
@@ -674,15 +731,15 @@ public PooledConnection CreateOpenedOrReuse(CancellationToken cancellationToken)
674731
{
675732
SemaphoreSlimSignalable.SemaphoreWaitResult.Signaled => _pool._connectionHolder.Acquire(),
676733
SemaphoreSlimSignalable.SemaphoreWaitResult.Entered => CreateOpenedInternal(cancellationToken),
677-
SemaphoreSlimSignalable.SemaphoreWaitResult.TimedOut => throw new TimeoutException($"Timed out waiting in connecting queue after {stopwatch.ElapsedMilliseconds}ms."),
734+
SemaphoreSlimSignalable.SemaphoreWaitResult.TimedOut => throw CreateTimeoutException(stopwatch),
678735
_ => throw new InvalidOperationException($"Invalid wait result {_connectingWaitStatus}")
679736
};
680737

681738
waitTimeout = _connectingTimeout - stopwatch.Elapsed;
682739

683740
if (connection == null && waitTimeout <= TimeSpan.Zero)
684741
{
685-
throw TimoutException(stopwatch);
742+
throw CreateTimeoutException(stopwatch);
686743
}
687744
}
688745

@@ -708,15 +765,15 @@ public async Task<PooledConnection> CreateOpenedOrReuseAsync(CancellationToken c
708765
{
709766
SemaphoreSlimSignalable.SemaphoreWaitResult.Signaled => _pool._connectionHolder.Acquire(),
710767
SemaphoreSlimSignalable.SemaphoreWaitResult.Entered => await CreateOpenedInternalAsync(cancellationToken).ConfigureAwait(false),
711-
SemaphoreSlimSignalable.SemaphoreWaitResult.TimedOut => throw TimoutException(stopwatch),
768+
SemaphoreSlimSignalable.SemaphoreWaitResult.TimedOut => throw CreateTimeoutException(stopwatch),
712769
_ => throw new InvalidOperationException($"Invalid wait result {_connectingWaitStatus}")
713770
};
714771

715772
waitTimeout = _connectingTimeout - stopwatch.Elapsed;
716773

717774
if (connection == null && waitTimeout <= TimeSpan.Zero)
718775
{
719-
throw TimoutException(stopwatch);
776+
throw CreateTimeoutException(stopwatch);
720777
}
721778
}
722779

@@ -783,8 +840,11 @@ private void FinishCreating(ConnectionDescription description)
783840
_pool._serviceStates.IncrementConnectionCount(description?.ServiceId);
784841
}
785842

786-
private Exception TimoutException(Stopwatch stopwatch) =>
787-
new TimeoutException($"Timed out waiting in connecting queue after {stopwatch.ElapsedMilliseconds}ms.");
843+
private Exception CreateTimeoutException(Stopwatch stopwatch)
844+
{
845+
var message = $"Timed out waiting in connecting queue after {stopwatch.ElapsedMilliseconds}ms.";
846+
return _pool.CreateTimeoutException(stopwatch, message);
847+
}
788848
}
789849
}
790850
}

src/MongoDB.Driver.Core/Core/ConnectionPools/ExclusiveConnectionPool.cs

+4
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ namespace MongoDB.Driver.Core.ConnectionPools
2929
internal sealed partial class ExclusiveConnectionPool : IConnectionPool
3030
{
3131
// fields
32+
private readonly CheckOutReasonCounter _checkOutReasonCounter;
3233
private readonly IConnectionFactory _connectionFactory;
3334
private readonly ListConnectionHolder _connectionHolder;
3435
private readonly EndPoint _endPoint;
@@ -71,6 +72,7 @@ public ExclusiveConnectionPool(
7172
_connectionFactory = Ensure.IsNotNull(connectionFactory, nameof(connectionFactory));
7273
Ensure.IsNotNull(eventSubscriber, nameof(eventSubscriber));
7374

75+
_checkOutReasonCounter = new CheckOutReasonCounter();
7476
_connectingQueue = new SemaphoreSlimSignalable(MongoInternalDefaults.ConnectionPool.MaxConnecting);
7577
_connectionHolder = new ListConnectionHolder(eventSubscriber, _connectingQueue);
7678
_serviceStates = new ServiceStates();
@@ -385,6 +387,8 @@ private void ReleaseConnection(PooledConnection connection)
385387
_checkedInConnectionEventHandler(new ConnectionPoolCheckedInConnectionEvent(connection.ConnectionId, TimeSpan.Zero, EventContext.OperationId));
386388
}
387389

390+
_checkOutReasonCounter.Decrement(connection.CheckOutReason);
391+
388392
if (!connection.IsExpired && _state.Value != State.Disposed)
389393
{
390394
_connectionHolder.Return(connection);

0 commit comments

Comments
 (0)