Skip to content

Commit b2f1180

Browse files
authored
CSHARP-3662: MongoClientSettings.SocketTimeout does not work for values under 500ms on Windows for sync code (#1690)
1 parent dde1ef9 commit b2f1180

File tree

10 files changed

+497
-129
lines changed

10 files changed

+497
-129
lines changed

src/MongoDB.Driver/Core/Compression/SnappyCompressor.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public void Compress(Stream input, Stream output)
3434
{
3535
var uncompressedSize = (int)(input.Length - input.Position);
3636
var uncompressedBytes = new byte[uncompressedSize]; // does not include uncompressed message headers
37-
input.ReadBytes(uncompressedBytes, offset: 0, count: uncompressedSize, CancellationToken.None);
37+
input.ReadBytes(uncompressedBytes, offset: 0, count: uncompressedSize, Timeout.InfiniteTimeSpan, CancellationToken.None);
3838
var maxCompressedSize = Snappy.GetMaxCompressedLength(uncompressedSize);
3939
var compressedBytes = new byte[maxCompressedSize];
4040
var compressedSize = Snappy.Compress(uncompressedBytes, compressedBytes);
@@ -50,7 +50,7 @@ public void Decompress(Stream input, Stream output)
5050
{
5151
var compressedSize = (int)(input.Length - input.Position);
5252
var compressedBytes = new byte[compressedSize];
53-
input.ReadBytes(compressedBytes, offset: 0, count: compressedSize, CancellationToken.None);
53+
input.ReadBytes(compressedBytes, offset: 0, count: compressedSize, Timeout.InfiniteTimeSpan, CancellationToken.None);
5454
var uncompressedSize = Snappy.GetUncompressedLength(compressedBytes);
5555
var decompressedBytes = new byte[uncompressedSize];
5656
var decompressedSize = Snappy.Decompress(compressedBytes, decompressedBytes);

src/MongoDB.Driver/Core/Connections/BinaryConnection.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -333,14 +333,15 @@ private IByteBuffer ReceiveBuffer(CancellationToken cancellationToken)
333333
try
334334
{
335335
var messageSizeBytes = new byte[4];
336-
_stream.ReadBytes(messageSizeBytes, 0, 4, cancellationToken);
336+
var readTimeout = _stream.CanTimeout ? TimeSpan.FromMilliseconds(_stream.ReadTimeout) : Timeout.InfiniteTimeSpan;
337+
_stream.ReadBytes(messageSizeBytes, 0, 4, readTimeout, cancellationToken);
337338
var messageSize = BinaryPrimitives.ReadInt32LittleEndian(messageSizeBytes);
338339
EnsureMessageSizeIsValid(messageSize);
339340
var inputBufferChunkSource = new InputBufferChunkSource(BsonChunkPool.Default);
340341
var buffer = ByteBufferFactory.Create(inputBufferChunkSource, messageSize);
341342
buffer.Length = messageSize;
342343
buffer.SetBytes(0, messageSizeBytes, 0, 4);
343-
_stream.ReadBytes(buffer, 4, messageSize - 4, cancellationToken);
344+
_stream.ReadBytes(buffer, 4, messageSize - 4, readTimeout, cancellationToken);
344345
_lastUsedAtUtc = DateTime.UtcNow;
345346
buffer.MakeReadOnly();
346347
return buffer;
@@ -535,7 +536,8 @@ private void SendBuffer(IByteBuffer buffer, CancellationToken cancellationToken)
535536

536537
try
537538
{
538-
_stream.WriteBytes(buffer, 0, buffer.Length, cancellationToken);
539+
var writeTimeout = _stream.CanTimeout ? TimeSpan.FromMilliseconds(_stream.WriteTimeout) : Timeout.InfiniteTimeSpan;
540+
_stream.WriteBytes(buffer, 0, buffer.Length, writeTimeout, cancellationToken);
539541
_lastUsedAtUtc = DateTime.UtcNow;
540542
}
541543
catch (Exception ex)

src/MongoDB.Driver/Core/Connections/TcpStreamFactory.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,10 @@ private void Connect(Socket socket, EndPoint endPoint, CancellationToken cancell
138138

139139
if (!connectOperation.IsCompleted)
140140
{
141-
try { socket.Dispose(); } catch { }
141+
try
142+
{
143+
socket.Dispose();
144+
} catch { }
142145

143146
cancellationToken.ThrowIfCancellationRequested();
144147
throw new TimeoutException($"Timed out connecting to {endPoint}. Timeout was {_settings.ConnectTimeout}.");
@@ -164,7 +167,12 @@ private async Task ConnectAsync(Socket socket, EndPoint endPoint, CancellationTo
164167

165168
if (!connectTask.IsCompleted)
166169
{
167-
try { socket.Dispose(); } catch { }
170+
try
171+
{
172+
socket.Dispose();
173+
// should await on the read task to avoid UnobservedTaskException
174+
await connectTask.ConfigureAwait(false);
175+
} catch { }
168176

169177
cancellationToken.ThrowIfCancellationRequested();
170178
throw new TimeoutException($"Timed out connecting to {endPoint}. Timeout was {_settings.ConnectTimeout}.");

src/MongoDB.Driver/Core/Misc/StreamExtensionMethods.cs

Lines changed: 127 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -36,46 +36,80 @@ public static void EfficientCopyTo(this Stream input, Stream output)
3636
}
3737
}
3838

39-
public static async Task<int> ReadAsync(this Stream stream, byte[] buffer, int offset, int count, TimeSpan timeout, CancellationToken cancellationToken)
39+
public static int Read(this Stream stream, byte[] buffer, int offset, int count, TimeSpan timeout, CancellationToken cancellationToken)
4040
{
41-
var state = 1; // 1 == reading, 2 == done reading, 3 == timedout, 4 == cancelled
42-
43-
var bytesRead = 0;
44-
using (new Timer(_ => ChangeState(3), null, timeout, Timeout.InfiniteTimeSpan))
45-
using (cancellationToken.Register(() => ChangeState(4)))
41+
try
4642
{
47-
try
48-
{
49-
bytesRead = await stream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
50-
ChangeState(2); // note: might not actually go to state 2 if already in state 3 or 4
51-
}
52-
catch when (state == 1)
53-
{
54-
try { stream.Dispose(); } catch { }
55-
throw;
56-
}
57-
catch when (state >= 3)
43+
using var manualResetEvent = new ManualResetEventSlim();
44+
var readOperation = stream.BeginRead(
45+
buffer,
46+
offset,
47+
count,
48+
state => ((ManualResetEventSlim)state.AsyncState).Set(),
49+
manualResetEvent);
50+
51+
if (readOperation.IsCompleted || manualResetEvent.Wait(timeout, cancellationToken))
5852
{
59-
// a timeout or operation cancelled exception will be thrown instead
53+
return stream.EndRead(readOperation);
6054
}
55+
}
56+
catch (OperationCanceledException)
57+
{
58+
// Have to suppress OperationCanceledException here, it will be thrown after the stream will be disposed.
59+
}
60+
catch (ObjectDisposedException)
61+
{
62+
throw new IOException();
63+
}
6164

62-
if (state == 3) { throw new TimeoutException(); }
63-
if (state == 4) { throw new OperationCanceledException(); }
65+
try
66+
{
67+
stream.Dispose();
68+
}
69+
catch
70+
{
71+
// Ignore any exceptions
6472
}
6573

66-
return bytesRead;
74+
cancellationToken.ThrowIfCancellationRequested();
75+
throw new TimeoutException();
76+
}
6777

68-
void ChangeState(int to)
78+
public static async Task<int> ReadAsync(this Stream stream, byte[] buffer, int offset, int count, TimeSpan timeout, CancellationToken cancellationToken)
79+
{
80+
Task<int> readTask = null;
81+
try
82+
{
83+
readTask = stream.ReadAsync(buffer, offset, count);
84+
return await readTask.WaitAsync(timeout, cancellationToken).ConfigureAwait(false);
85+
}
86+
catch (ObjectDisposedException)
6987
{
70-
var from = Interlocked.CompareExchange(ref state, to, 1);
71-
if (from == 1 && to >= 3)
88+
// It's possible to get ObjectDisposedException when the connection pool was closed with interruptInUseConnections set to true.
89+
throw new IOException();
90+
}
91+
catch (Exception ex) when (ex is OperationCanceledException or TimeoutException)
92+
{
93+
// await Task.WaitAsync() throws OperationCanceledException in case of cancellation and TimeoutException in case of timeout
94+
try
7295
{
73-
try { stream.Dispose(); } catch { } // disposing the stream aborts the read attempt
96+
stream.Dispose();
97+
if (readTask != null)
98+
{
99+
// Should await on the task to avoid UnobservedTaskException
100+
await readTask.ConfigureAwait(false);
101+
}
74102
}
103+
catch
104+
{
105+
// Ignore any exceptions
106+
}
107+
108+
throw;
75109
}
76110
}
77111

78-
public static void ReadBytes(this Stream stream, byte[] buffer, int offset, int count, CancellationToken cancellationToken)
112+
public static void ReadBytes(this Stream stream, byte[] buffer, int offset, int count, TimeSpan timeout, CancellationToken cancellationToken)
79113
{
80114
Ensure.IsNotNull(stream, nameof(stream));
81115
Ensure.IsNotNull(buffer, nameof(buffer));
@@ -84,7 +118,7 @@ public static void ReadBytes(this Stream stream, byte[] buffer, int offset, int
84118

85119
while (count > 0)
86120
{
87-
var bytesRead = stream.Read(buffer, offset, count); // TODO: honor cancellationToken?
121+
var bytesRead = stream.Read(buffer, offset, count, timeout, cancellationToken);
88122
if (bytesRead == 0)
89123
{
90124
throw new EndOfStreamException();
@@ -94,7 +128,7 @@ public static void ReadBytes(this Stream stream, byte[] buffer, int offset, int
94128
}
95129
}
96130

97-
public static void ReadBytes(this Stream stream, IByteBuffer buffer, int offset, int count, CancellationToken cancellationToken)
131+
public static void ReadBytes(this Stream stream, IByteBuffer buffer, int offset, int count, TimeSpan timeout, CancellationToken cancellationToken)
98132
{
99133
Ensure.IsNotNull(stream, nameof(stream));
100134
Ensure.IsNotNull(buffer, nameof(buffer));
@@ -105,7 +139,7 @@ public static void ReadBytes(this Stream stream, IByteBuffer buffer, int offset,
105139
{
106140
var backingBytes = buffer.AccessBackingBytes(offset);
107141
var bytesToRead = Math.Min(count, backingBytes.Count);
108-
var bytesRead = stream.Read(backingBytes.Array, backingBytes.Offset, bytesToRead); // TODO: honor cancellationToken?
142+
var bytesRead = stream.Read(backingBytes.Array, backingBytes.Offset, bytesToRead, timeout, cancellationToken);
109143
if (bytesRead == 0)
110144
{
111145
throw new EndOfStreamException();
@@ -155,44 +189,82 @@ public static async Task ReadBytesAsync(this Stream stream, IByteBuffer buffer,
155189
}
156190
}
157191

158-
159-
public static async Task WriteAsync(this Stream stream, byte[] buffer, int offset, int count, TimeSpan timeout, CancellationToken cancellationToken)
192+
public static void Write(this Stream stream, byte[] buffer, int offset, int count, TimeSpan timeout, CancellationToken cancellationToken)
160193
{
161-
var state = 1; // 1 == writing, 2 == done writing, 3 == timedout, 4 == cancelled
162-
163-
using (new Timer(_ => ChangeState(3), null, timeout, Timeout.InfiniteTimeSpan))
164-
using (cancellationToken.Register(() => ChangeState(4)))
194+
try
165195
{
166-
try
167-
{
168-
await stream.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
169-
ChangeState(2); // note: might not actually go to state 2 if already in state 3 or 4
170-
}
171-
catch when (state == 1)
172-
{
173-
try { stream.Dispose(); } catch { }
174-
throw;
175-
}
176-
catch when (state >= 3)
196+
using var manualResetEvent = new ManualResetEventSlim();
197+
var writeOperation = stream.BeginWrite(
198+
buffer,
199+
offset,
200+
count,
201+
state => ((ManualResetEventSlim)state.AsyncState).Set(),
202+
manualResetEvent);
203+
204+
if (writeOperation.IsCompleted || manualResetEvent.Wait(timeout, cancellationToken))
177205
{
178-
// a timeout or operation cancelled exception will be thrown instead
206+
stream.EndWrite(writeOperation);
207+
return;
179208
}
209+
}
210+
catch (OperationCanceledException)
211+
{
212+
// Have to suppress OperationCanceledException here, it will be thrown after the stream will be disposed.
213+
}
214+
catch (ObjectDisposedException)
215+
{
216+
// It's possible to get ObjectDisposedException when the connection pool was closed with interruptInUseConnections set to true.
217+
throw new IOException();
218+
}
180219

181-
if (state == 3) { throw new TimeoutException(); }
182-
if (state == 4) { throw new OperationCanceledException(); }
220+
try
221+
{
222+
stream.Dispose();
183223
}
224+
catch
225+
{
226+
// Ignore any exceptions
227+
}
228+
229+
cancellationToken.ThrowIfCancellationRequested();
230+
throw new TimeoutException();
231+
}
184232

185-
void ChangeState(int to)
233+
public static async Task WriteAsync(this Stream stream, byte[] buffer, int offset, int count, TimeSpan timeout, CancellationToken cancellationToken)
234+
{
235+
Task writeTask = null;
236+
try
237+
{
238+
writeTask = stream.WriteAsync(buffer, offset, count);
239+
await writeTask.WaitAsync(timeout, cancellationToken).ConfigureAwait(false);
240+
}
241+
catch (ObjectDisposedException)
242+
{
243+
// It's possible to get ObjectDisposedException when the connection pool was closed with interruptInUseConnections set to true.
244+
throw new IOException();
245+
}
246+
catch (Exception ex) when (ex is OperationCanceledException or TimeoutException)
186247
{
187-
var from = Interlocked.CompareExchange(ref state, to, 1);
188-
if (from == 1 && to >= 3)
248+
// await Task.WaitAsync() throws OperationCanceledException in case of cancellation and TimeoutException in case of timeout
249+
try
189250
{
190-
try { stream.Dispose(); } catch { } // disposing the stream aborts the write attempt
251+
stream.Dispose();
252+
// Should await on the task to avoid UnobservedTaskException
253+
if (writeTask != null)
254+
{
255+
await writeTask.ConfigureAwait(false);
256+
}
191257
}
258+
catch
259+
{
260+
// Ignore any exceptions
261+
}
262+
263+
throw;
192264
}
193265
}
194266

195-
public static void WriteBytes(this Stream stream, IByteBuffer buffer, int offset, int count, CancellationToken cancellationToken)
267+
public static void WriteBytes(this Stream stream, IByteBuffer buffer, int offset, int count, TimeSpan timeout, CancellationToken cancellationToken)
196268
{
197269
Ensure.IsNotNull(stream, nameof(stream));
198270
Ensure.IsNotNull(buffer, nameof(buffer));
@@ -204,7 +276,7 @@ public static void WriteBytes(this Stream stream, IByteBuffer buffer, int offset
204276
cancellationToken.ThrowIfCancellationRequested();
205277
var backingBytes = buffer.AccessBackingBytes(offset);
206278
var bytesToWrite = Math.Min(count, backingBytes.Count);
207-
stream.Write(backingBytes.Array, backingBytes.Offset, bytesToWrite); // TODO: honor cancellationToken?
279+
stream.Write(backingBytes.Array, backingBytes.Offset, bytesToWrite, timeout, cancellationToken);
208280
offset += bytesToWrite;
209281
count -= bytesToWrite;
210282
}

0 commit comments

Comments
 (0)