Skip to content

Commit 58f61a2

Browse files
authored
Override WriteAsync in ShellStream (#1711)
ShellStream does not currently override the Read/Write async variants. They fall back to the base class implementations which run the sync variants on a thread pool thread, only allowing one call of either at a time in order to protect implementations that would break if Read/Write were called simultaneously. In ShellStream, reads and writes are independent so mutually excluding their use is unnecessary and can lead to effective deadlocks. We therefore override WriteAsync to get around this restriction. We do not override ReadAsync because the sync implementation does not lend itself well to async given the use of Monitor.Wait/Pulse. Note that while reading and writing simultaneously is allowed, it is not intended that ShellStream is used with multiple simultaneous reads or multiple simultaneous writes, so it is fine to keep the base one-at-a-time implementation on ReadAsync. Another note is that the new WriteAsync will be simple (synchronous) buffer copying in most cases, with a call to FlushAsync in others. We also do not override FlushAsync, so that will go onto a thread pool thread and potentially acquire some locks. But given that the current base implementation of WriteAsync does that unconditionally, it makes the new WriteAsync slightly better and certainly no worse than the current version.
1 parent cb1f26d commit 58f61a2

File tree

2 files changed

+66
-0
lines changed

2 files changed

+66
-0
lines changed

src/Renci.SshNet/ShellStream.cs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -891,6 +891,57 @@ public override void WriteByte(byte value)
891891
Write([value]);
892892
}
893893

894+
/// <inheritdoc/>
895+
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
896+
{
897+
#if !NET
898+
ThrowHelper.
899+
#endif
900+
ValidateBufferArguments(buffer, offset, count);
901+
902+
return WriteAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
903+
}
904+
905+
#if NET
906+
/// <inheritdoc/>
907+
public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
908+
#else
909+
private async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
910+
#endif
911+
{
912+
ThrowHelper.ThrowObjectDisposedIf(_disposed, this);
913+
914+
while (!buffer.IsEmpty)
915+
{
916+
if (_writeBuffer.AvailableLength == 0)
917+
{
918+
await FlushAsync(cancellationToken).ConfigureAwait(false);
919+
}
920+
921+
var bytesToCopy = Math.Min(buffer.Length, _writeBuffer.AvailableLength);
922+
923+
Debug.Assert(bytesToCopy > 0);
924+
925+
buffer.Slice(0, bytesToCopy).CopyTo(_writeBuffer.AvailableMemory);
926+
927+
_writeBuffer.Commit(bytesToCopy);
928+
929+
buffer = buffer.Slice(bytesToCopy);
930+
}
931+
}
932+
933+
/// <inheritdoc/>
934+
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
935+
{
936+
return TaskToAsyncResult.Begin(WriteAsync(buffer, offset, count), callback, state);
937+
}
938+
939+
/// <inheritdoc/>
940+
public override void EndWrite(IAsyncResult asyncResult)
941+
{
942+
TaskToAsyncResult.End(asyncResult);
943+
}
944+
894945
/// <summary>
895946
/// Writes the line to the shell.
896947
/// </summary>

test/Renci.SshNet.Tests/Classes/ShellStreamTest_ReadExpect.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,21 @@ public void Read_AfterDispose_StillWorks()
236236
Assert.IsNull(_shellStream.ReadLine());
237237
}
238238

239+
[TestMethod]
240+
public async Task ReadAsyncDoesNotBlockWriteAsync()
241+
{
242+
byte[] buffer = new byte[16];
243+
Task<int> readTask = _shellStream.ReadAsync(buffer, 0, buffer.Length);
244+
245+
await _shellStream.WriteAsync("ls\n"u8.ToArray(), 0, 3);
246+
247+
Assert.IsFalse(readTask.IsCompleted);
248+
249+
_channelSessionStub.Receive("Directory.Build.props"u8.ToArray());
250+
251+
await readTask;
252+
}
253+
239254
[TestMethod]
240255
public void Read_MultiByte()
241256
{

0 commit comments

Comments
 (0)