Skip to content

Commit 2ae3cfc

Browse files
committed
Async queue/dequeue
1 parent c8b6d42 commit 2ae3cfc

File tree

3 files changed

+18
-18
lines changed

3 files changed

+18
-18
lines changed

IPBanProSDKExtensionMethods.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ public static IReadOnlyCollection<string> SplitWithNoEmptyEntries(this string te
171171
/// <param name="socket">Web socket</param>
172172
/// <param name="subscription">Subscription type</param>
173173
/// <returns>True if message queued, false if not</returns>
174-
public static bool SubscribeWebSocket(this ClientWebSocket socket, IPBanProAPIWebSocketSubscription subscription)
174+
public static System.Threading.Tasks.Task<bool> SubscribeWebSocket(this ClientWebSocket socket, IPBanProAPIWebSocketSubscription subscription)
175175
{
176176
return socket.QueueMessage(new Message
177177
{
@@ -189,7 +189,7 @@ public static bool SubscribeWebSocket(this ClientWebSocket socket, IPBanProAPIWe
189189
/// <param name="socket">Web socket</param>
190190
/// <param name="subscription">Subscription type</param>
191191
/// <returns>True if message queued, false if not</returns>
192-
public static bool UnsubscribeWebSocket(this ClientWebSocket socket, IPBanProAPIWebSocketSubscription subscription)
192+
public static System.Threading.Tasks.Task<bool> UnsubscribeWebSocket(this ClientWebSocket socket, IPBanProAPIWebSocketSubscription subscription)
193193
{
194194
return socket.QueueMessage(new Message
195195
{

Interfaces/IQueueMessage.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public interface IQueueMessage : IDisposable
3131
/// <param name="message">Message to send, should be a Message or byte[]</param>
3232
/// <param name="groupId">Group id, or 0 for none</param>
3333
/// <returns>True if success, false if error</returns>
34-
bool QueueMessage(object message, int groupId = 0);
34+
System.Threading.Tasks.Task<bool> QueueMessage(object message, int groupId = 0);
3535
}
3636

3737
/// <summary>
@@ -46,7 +46,7 @@ public interface IQueueMessageServer : IQueueMessage
4646
/// <param name="message">Message to queue, should be Message or byte[]</param>
4747
/// <param name="clients">Clients to send to (null for all)</param>
4848
/// <returns>True if queued, false if no group with the specified id or disposed</returns>
49-
bool QueueMessageForClients(int groupId, object message, System.Collections.Generic.IEnumerable<IQueueMessage> clients = null);
49+
System.Threading.Tasks.Task<bool> QueueMessageForClients(int groupId, object message, System.Collections.Generic.IEnumerable<IQueueMessage> clients = null);
5050
}
5151

5252
/// <summary>

WebSocket/ClientWebSocket.cs

+14-14
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ public void Dispose()
330330
/// <param name="message">Message to send</param>
331331
/// <param name="groupId">Group id, ignored for now</param>
332332
/// <returns>True if success, false if error</returns>
333-
public bool QueueMessage(object message, int groupId = IPBanProBaseAPI.WebSocketGroupIdNone)
333+
public async Task<bool> QueueMessage(object message, int groupId = IPBanProBaseAPI.WebSocketGroupIdNone)
334334
{
335335
if (webSocket is null ||
336336
message is null ||
@@ -413,7 +413,7 @@ async Task enqueueCallback(IQueueMessage socket)
413413
}
414414
}
415415

416-
QueueActions(enqueueCallback);
416+
await QueueActions(enqueueCallback);
417417
return true;
418418
}
419419

@@ -444,7 +444,7 @@ public void WaitForAck(string id, int timeoutMilliseconds = 60000)
444444
}
445445
}
446446

447-
private void QueueActions(params Func<IQueueMessage, Task>[] actions)
447+
private async Task QueueActions(params Func<IQueueMessage, Task>[] actions)
448448
{
449449
if (actions != null && actions.Length != 0)
450450
{
@@ -467,16 +467,16 @@ private void QueueActions(params Func<IQueueMessage, Task>[] actions)
467467
}
468468
};
469469

470-
messageQueue.Enqueue(enqueueActionsFunc);
470+
await messageQueue.EnqueueAsync(enqueueActionsFunc);
471471
}
472472
}
473473

474-
private void QueueActionsWithNoExceptions(params Func<IQueueMessage, Task>[] actions)
474+
private async Task QueueActionsWithNoExceptions(params Func<IQueueMessage, Task>[] actions)
475475
{
476476
if (actions != null && actions.Length != 0)
477477
{
478478
Func<IQueueMessage, Task>[] actionsCopy = actions;
479-
messageQueue.Enqueue((Func<Task>)(async () =>
479+
await messageQueue.EnqueueAsync((Func<Task>)(async () =>
480480
{
481481
foreach (var action in actionsCopy.Where(a => a != null))
482482
{
@@ -554,7 +554,7 @@ private async Task ReadTask()
554554

555555
// on connect may make additional calls that must succeed, such as rest calls
556556
// for lists, etc.
557-
QueueActionsWithNoExceptions(InvokeConnected);
557+
await QueueActionsWithNoExceptions(InvokeConnected);
558558

559559
while (webSocket.State == WebSocketState.Open)
560560
{
@@ -566,7 +566,7 @@ private async Task ReadTask()
566566
if (result.MessageType == WebSocketMessageType.Close)
567567
{
568568
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationTokenSource.Token);
569-
QueueActions(InvokeDisconnected);
569+
await QueueActions(InvokeDisconnected);
570570
}
571571
else
572572
{
@@ -586,7 +586,7 @@ private async Task ReadTask()
586586
if (result.MessageType == WebSocketMessageType.Text)
587587
{
588588
string text = Encoding.UTF8.GetString(stream.GetBuffer(), 0, (int)stream.Length);
589-
messageQueue.Enqueue(text);
589+
await messageQueue.EnqueueAsync(text);
590590
}
591591
// otherwise treat message as binary
592592
else
@@ -618,12 +618,12 @@ private async Task ReadTask()
618618
}
619619
else
620620
{
621-
messageQueue.Enqueue(message);
621+
await messageQueue.EnqueueAsync(message);
622622
}
623623
}
624624
else
625625
{
626-
messageQueue.Enqueue(stream.ToArray());
626+
await messageQueue.EnqueueAsync(stream.ToArray());
627627
}
628628
}
629629
}
@@ -648,7 +648,7 @@ private async Task ReadTask()
648648
{
649649
if (wasConnected)
650650
{
651-
QueueActions(InvokeDisconnected);
651+
await QueueActions(InvokeDisconnected);
652652
}
653653

654654
Logger.Info("Client web socket was disconnected from {0}, attempting reconnection...", uri);
@@ -701,7 +701,7 @@ private async Task MessageTask()
701701
{
702702
Logger.Debug("Sending ping from client web socket connection to {0}", Uri);
703703
lastPing = IPBanService.UtcNow;
704-
QueueMessage("ping");
704+
await QueueMessage("ping");
705705
}
706706

707707
if ((result = await messageQueue.TryDequeueAsync(messageQueueTimeout, cancellationTokenSource.Token)).Key)
@@ -755,7 +755,7 @@ private async Task MessageTask()
755755
lastCheck = IPBanService.UtcNow;
756756

757757
// this must succeed, the callback may be requests lists or other resources that must not fail
758-
QueueActionsWithNoExceptions(InvokeConnected);
758+
await QueueActionsWithNoExceptions(InvokeConnected);
759759
}
760760
}
761761
}

0 commit comments

Comments
 (0)