Skip to content

Commit ea427f2

Browse files
danielmarbachlukebakken
authored andcommitted
Pass main loop token forward
1 parent b4b520b commit ea427f2

File tree

6 files changed

+66
-6
lines changed

6 files changed

+66
-6
lines changed

projects/RabbitMQ.Client/IChannel.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,17 @@ Task CloseAsync(ushort replyCode, string replyText, bool abort,
265265
/// <returns></returns>
266266
Task CloseAsync(ShutdownEventArgs reason, bool abort);
267267

268+
/// <summary>
269+
/// Asynchronously close this session.
270+
/// </summary>
271+
/// <param name="reason">The <see cref="ShutdownEventArgs"/> instance containing the close data.</param>
272+
/// <param name="abort">Whether or not the close is an abort (ignoring certain exceptions).</param>
273+
/// <param name="cancellationToken">CancellationToken for this operation.</param>
274+
/// <returns></returns>
275+
[Obsolete("7.2.0 - cancellationToken is ignored")]
276+
Task CloseAsync(ShutdownEventArgs reason, bool abort,
277+
CancellationToken cancellationToken = default);
278+
268279
/// <summary>Asynchronously declare an exchange.</summary>
269280
/// <param name="exchange">The name of the exchange.</param>
270281
/// <param name="type">The type of the exchange.</param>

projects/RabbitMQ.Client/IConnectionExtensions.cs

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,21 @@ public static Task CloseAsync(this IConnection connection, ushort reasonCode, st
8888
CancellationToken.None);
8989
}
9090

91+
/// <summary>
92+
/// Asynchronously abort this connection and all its channels.
93+
/// </summary>
94+
/// <remarks>
95+
/// Note that all active channels and sessions will be closed if this method is called.
96+
/// In comparison to normal <see cref="CloseAsync(IConnection, CancellationToken)"/> method, <see cref="AbortAsync(IConnection, CancellationToken)"/> will not throw
97+
/// <see cref="IOException"/> during closing connection.
98+
///This method waits infinitely for the in-progress close operation to complete.
99+
/// </remarks>
100+
public static Task AbortAsync(this IConnection connection)
101+
{
102+
return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced",
103+
InternalConstants.DefaultConnectionAbortTimeout, true, default);
104+
}
105+
91106
/// <summary>
92107
/// Asynchronously abort this connection and all its channels.
93108
/// </summary>
@@ -99,8 +114,27 @@ public static Task CloseAsync(this IConnection connection, ushort reasonCode, st
99114
/// </remarks>
100115
public static Task AbortAsync(this IConnection connection, CancellationToken cancellationToken = default)
101116
{
102-
return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced", InternalConstants.DefaultConnectionAbortTimeout, true,
103-
cancellationToken);
117+
return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced",
118+
InternalConstants.DefaultConnectionAbortTimeout, true, cancellationToken);
119+
}
120+
121+
/// <summary>
122+
/// Asynchronously abort this connection and all its channels.
123+
/// </summary>
124+
/// <remarks>
125+
/// The method behaves in the same way as <see cref="AbortAsync(IConnection, CancellationToken)"/>, with the only
126+
/// difference that the connection is closed with the given connection close code and message.
127+
/// <para>
128+
/// The close code (See under "Reply Codes" in the AMQP 0-9-1 specification)
129+
/// </para>
130+
/// <para>
131+
/// A message indicating the reason for closing the connection
132+
/// </para>
133+
/// </remarks>
134+
public static Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText)
135+
{
136+
return connection.CloseAsync(reasonCode, reasonText,
137+
InternalConstants.DefaultConnectionAbortTimeout, true, default);
104138
}
105139

106140
/// <summary>
@@ -118,8 +152,8 @@ public static Task AbortAsync(this IConnection connection, CancellationToken can
118152
/// </remarks>
119153
public static Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText, CancellationToken cancellationToken = default)
120154
{
121-
return connection.CloseAsync(reasonCode, reasonText, InternalConstants.DefaultConnectionAbortTimeout, true,
122-
cancellationToken);
155+
return connection.CloseAsync(reasonCode, reasonText,
156+
InternalConstants.DefaultConnectionAbortTimeout, true, cancellationToken);
123157
}
124158

125159
/// <summary>

projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,12 @@ await _connection.DeleteRecordedChannelAsync(this,
234234
}
235235
}
236236

237+
public Task CloseAsync(ShutdownEventArgs args, bool abort,
238+
CancellationToken cancellationToken)
239+
{
240+
return CloseAsync(args, abort);
241+
}
242+
237243
public async Task CloseAsync(ShutdownEventArgs args, bool abort)
238244
{
239245
ThrowIfDisposed();

projects/RabbitMQ.Client/Impl/Channel.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,12 +205,18 @@ public Task CloseAsync(ushort replyCode, string replyText, bool abort,
205205
return CloseAsync(args, abort);
206206
}
207207

208+
public Task CloseAsync(ShutdownEventArgs args, bool abort,
209+
CancellationToken cancellationToken)
210+
{
211+
return CloseAsync(args, abort);
212+
}
213+
208214
public async Task CloseAsync(ShutdownEventArgs args, bool abort)
209215
{
210216
CancellationToken cancellationToken = args.CancellationToken;
211217

212218
bool enqueued = false;
213-
// We should really try to clsoe the connection and therefore we don't allow this to be canceled by the user
219+
// We should really try to close the channel and therefore we don't allow this to be canceled by the user
214220
var k = new ChannelCloseAsyncRpcContinuation(ContinuationTimeout, IsOpen ? CancellationToken.None : cancellationToken);
215221

216222
await _rpcSemaphore.WaitAsync(k.CancellationToken)

projects/RabbitMQ.Client/Impl/Connection.Heartbeat.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ private async void HeartbeatReadTimerCallback(object? state)
109109
{
110110
var eose = new EndOfStreamException($"Heartbeat missing with heartbeat == {_heartbeat} seconds");
111111
LogCloseError(eose.Message, eose);
112-
await HandleMainLoopExceptionAsync(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", eose))
112+
await HandleMainLoopExceptionAsync(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", eose, _mainLoopCts.Token))
113113
.ConfigureAwait(false);
114114
shouldTerminate = true;
115115
}

projects/RabbitMQ.Client/PublicAPI.Unshipped.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@ RabbitMQ.Client.Exceptions.PublishReturnException.PublishReturnException(ulong p
55
RabbitMQ.Client.Exceptions.PublishReturnException.ReplyCode.get -> ushort
66
RabbitMQ.Client.Exceptions.PublishReturnException.ReplyText.get -> string!
77
RabbitMQ.Client.Exceptions.PublishReturnException.RoutingKey.get -> string!
8+
RabbitMQ.Client.IChannel.CloseAsync(RabbitMQ.Client.Events.ShutdownEventArgs! reason, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
89
RabbitMQ.Client.RabbitMQTracingOptions
910
RabbitMQ.Client.RabbitMQTracingOptions.RabbitMQTracingOptions() -> void
1011
RabbitMQ.Client.RabbitMQTracingOptions.UsePublisherAsParent.get -> bool
1112
RabbitMQ.Client.RabbitMQTracingOptions.UsePublisherAsParent.set -> void
1213
RabbitMQ.Client.RabbitMQTracingOptions.UseRoutingKeyAsOperationName.get -> bool
1314
RabbitMQ.Client.RabbitMQTracingOptions.UseRoutingKeyAsOperationName.set -> void
15+
static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection) -> System.Threading.Tasks.Task!
16+
static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection, ushort reasonCode, string! reasonText) -> System.Threading.Tasks.Task!
1417
static RabbitMQ.Client.RabbitMQActivitySource.TracingOptions.get -> RabbitMQ.Client.RabbitMQTracingOptions!
1518
static RabbitMQ.Client.RabbitMQActivitySource.TracingOptions.set -> void

0 commit comments

Comments
 (0)