Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.CoreRuntime;
using Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
using Cleipnir.ResilientFunctions.Domain;
using Cleipnir.ResilientFunctions.Domain.Exceptions;
using Cleipnir.ResilientFunctions.Messaging;
using Cleipnir.ResilientFunctions.Storage;
using Cleipnir.ResilientFunctions.Tests.Utils;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Shouldly;

namespace Cleipnir.ResilientFunctions.Tests.Messaging.InMemoryTests;

[TestClass]
public class MessageClearerTests
{
private static readonly TimeSpan MaxWait = TimeSpan.FromSeconds(10);

[TestMethod]
public async Task ClearCoalescesCallsArrivingWhileADeleteIsInFlight()
{
var firstDeleteReached = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var releaseFirstDelete = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var callCount = 0;

var store = new ControllableMessageStore(async _ =>
{
if (Interlocked.Increment(ref callCount) == 1)
{
firstDeleteReached.SetResult();
await releaseFirstDelete.Task;
}
});
var clearer = CreateClearer(store);

// First call starts the drain and blocks inside DeleteMessages.
var first = clearer.Clear([1]);
await firstDeleteReached.Task.WaitAsync(MaxWait);

// These arrive mid-flight, so they should be batched into a single follow-up delete.
var second = clearer.Clear([2]);
var third = clearer.Clear([3]);

releaseFirstDelete.SetResult();
await Task.WhenAll(first, second, third).WaitAsync(MaxWait);

store.DeletedBatches.Count.ShouldBe(2);
store.DeletedBatches[0].ShouldBe(new long[] { 1 });
store.DeletedBatches[1].OrderBy(p => p).ShouldBe(new long[] { 2, 3 });
}

[TestMethod]
public async Task ClearRetriesUntilDeleteSucceedsAndNotifiesEachFailure()
{
var unhandledLock = new Lock();
var unhandled = new List<FrameworkException>();
var failuresRemaining = 3;

var store = new ControllableMessageStore(_ =>
Interlocked.Decrement(ref failuresRemaining) >= 0
? throw new InvalidOperationException("boom")
: Task.CompletedTask
);
var clearer = CreateClearer(
store,
onUnhandledException: e => { lock (unhandledLock) unhandled.Add(e); },
retryDelay: TimeSpan.FromMilliseconds(10)
);

// Despite the first three deletes throwing, the caller's task completes (it is never faulted).
await clearer.Clear([1]).WaitAsync(MaxWait);

store.DeletedPositions.ShouldContain(1L);
lock (unhandledLock)
{
unhandled.Count.ShouldBe(3);
unhandled.ShouldAllBe(e => e.InnerException is InvalidOperationException);
}
}

[TestMethod]
public async Task ClearCompletesEveryCallerUnderConcurrentLoad()
{
var store = new ControllableMessageStore(_ => Task.CompletedTask);
var clearer = CreateClearer(store);

var tasks = Enumerable
.Range(0, 200)
.Select(i => clearer.Clear([i]))
.ToArray();

await Task.WhenAll(tasks).WaitAsync(MaxWait);

store.DeletedPositions.OrderBy(p => p).ShouldBe(Enumerable.Range(0, 200).Select(i => (long)i));
}

[TestMethod]
public async Task ClearRemovesPositionsFromIgnoreSetOnceDeleted()
{
var store = new ControllableMessageStore(_ => Task.CompletedTask);
var clearer = CreateClearer(store);

clearer.MarkPushed([1, 2, 3]);
clearer.NonClearedPositions().OrderBy(p => p).ShouldBe(new long[] { 1, 2, 3 });

await clearer.Clear([2]).WaitAsync(MaxWait);

clearer.NonClearedPositions().OrderBy(p => p).ShouldBe(new long[] { 1, 3 });
}

[TestMethod]
public async Task ClearedPositionsAreGoneFromTheStoreWhenTheReturnedTaskCompletes()
{
var functionStore = new InMemoryFunctionStore();
var messageStore = functionStore.MessageStore;
var storedId = TestStoredId.Create();

await messageStore.AppendMessages([
new StoredIdAndMessage(storedId, Message()),
new StoredIdAndMessage(storedId, Message()),
new StoredIdAndMessage(storedId, Message())
]);
var positions = (await messageStore.GetMessages(storedId)).Select(m => m.Position).ToList();
positions.Count.ShouldBe(3);

var clearer = CreateClearer(messageStore);
await clearer.Clear(positions.Take(2).ToList()).WaitAsync(MaxWait);

// The instant Clear's task completes, the cleared messages must already be gone from the store.
var remaining = (await messageStore.GetMessages(storedId)).Select(m => m.Position).ToList();
remaining.ShouldBe(new[] { positions[2] });
}

private static StoredMessage Message()
=> new(MessageContent: new byte[] { 1 }, MessageType: new byte[] { 2 }, Position: 0, Replica: ReplicaId.Empty);

private static MessageClearer CreateClearer(
IMessageStore messageStore,
Action<FrameworkException>? onUnhandledException = null,
TimeSpan? retryDelay = null)
=> new(
messageStore,
new UnhandledExceptionHandler(onUnhandledException ?? (_ => { })),
retryDelay ?? TimeSpan.FromSeconds(1)
);

// Minimal IMessageStore that only implements the positions-only DeleteMessages (the sole method
// MessageClearer touches); every other member is irrelevant to these tests.
private sealed class ControllableMessageStore(Func<IReadOnlyList<long>, Task> onDelete) : IMessageStore
{
private readonly Lock _lock = new();
public List<long[]> DeletedBatches { get; } = new();
public IEnumerable<long> DeletedPositions => DeletedBatches.SelectMany(b => b);

public async Task DeleteMessages(IReadOnlyList<long> positions)
{
var batch = positions.ToArray();
lock (_lock)
DeletedBatches.Add(batch);
await onDelete(batch);
}

public Task Initialize() => throw new NotSupportedException();
public Task AppendMessages(IReadOnlyList<StoredIdAndMessage> messages) => throw new NotSupportedException();
public Task<bool> ReplaceMessage(StoredId storedId, long position, StoredMessage storedMessage) => throw new NotSupportedException();
public Task DeleteMessages(StoredId storedId, IEnumerable<long> positions) => throw new NotSupportedException();
public Task Truncate(StoredId storedId) => throw new NotSupportedException();
public Task<IReadOnlyList<StoredMessage>> GetMessages(StoredId storedId) => throw new NotSupportedException();
public Task<IReadOnlyList<StoredMessage>> GetMessages(StoredId storedId, IReadOnlyList<long> skipPositions) => throw new NotSupportedException();
public Task<Dictionary<StoredId, List<StoredMessage>>> GetMessages(IEnumerable<StoredId> storedIds) => throw new NotSupportedException();
public Task<List<StoredMessages>> GetMessagesForReplica(ReplicaId replicaId, IReadOnlyList<long> ignorePositions) => throw new NotSupportedException();
public Task<List<StoredIdAndPosition>> GetCrashedReplicaMessages(IReadOnlySet<ReplicaId> liveReplicas) => throw new NotSupportedException();
public Task SetReplica(IEnumerable<long> positions, ReplicaId newReplica, ReplicaId expectedReplica) => throw new NotSupportedException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ namespace Cleipnir.ResilientFunctions.Tests.Messaging.TestTemplates;

public abstract class MessagesSubscriptionTests
{
// These tests hand-roll a QueueManager, which delegates message deletion to IMessageWatchdog. They don't
// These tests hand-roll a QueueManager, which delegates message deletion to IMessageClearer. They don't
// assert on store cleanup, so a no-op stub suffices.
private static readonly IMessageWatchdog StubMessageWatchdog = new NoopMessageWatchdog();
private static readonly IMessageClearer StubMessageClearer = new NoopMessageClearer();

private sealed class NoopMessageWatchdog : IMessageWatchdog
private sealed class NoopMessageClearer : IMessageClearer
{
public Task RemoveMessages(StoredId storedId, IReadOnlyList<long> positions) => Task.CompletedTask;
public Task Clear(IReadOnlyList<long> positions) => Task.CompletedTask;
}

public abstract Task EventsSubscriptionSunshineScenario();
Expand Down Expand Up @@ -582,7 +582,7 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task<IFuncti
flowTimeouts,
() => DateTime.UtcNow,
SettingsWithDefaults.Default,
StubMessageWatchdog
StubMessageClearer
);

var queueClient = await queueManager.CreateQueueClient();
Expand Down Expand Up @@ -646,7 +646,7 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task<IFunction
minimumTimeout,
() => DateTime.UtcNow,
SettingsWithDefaults.Default,
StubMessageWatchdog
StubMessageClearer
);


Expand Down Expand Up @@ -708,7 +708,7 @@ protected async Task PullEnvelopeReturnsEnvelopeWithReceiverAndSender(Task<IFunc
flowTimeouts,
() => DateTime.UtcNow,
SettingsWithDefaults.Default,
StubMessageWatchdog
StubMessageClearer
);

var queueClient = await queueManager.CreateQueueClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ internal class InvocationHelper<TParam, TReturn>
private readonly StoredType _storedType;
private readonly ReplicaId _replicaId;
private readonly ResultBusyWaiter<TReturn> _resultBusyWaiter;
private readonly MessageWatchdog _messageWatchdog;
private readonly IMessageClearer _messageClearer;
public UtcNow UtcNow { get; }

private ISerializer Serializer { get; }

public InvocationHelper(FlowType flowType, StoredType storedType, ReplicaId replicaId, bool isParamlessFunction, SettingsWithDefaults settings, IFunctionStore functionStore, ShutdownCoordinator shutdownCoordinator, ISerializer serializer, UtcNow utcNow, bool clearChildren, MessageWatchdog messageWatchdog)
public InvocationHelper(FlowType flowType, StoredType storedType, ReplicaId replicaId, bool isParamlessFunction, SettingsWithDefaults settings, IFunctionStore functionStore, ShutdownCoordinator shutdownCoordinator, ISerializer serializer, UtcNow utcNow, bool clearChildren, IMessageClearer messageClearer)
{
_flowType = flowType;
_isParamlessFunction = isParamlessFunction;
Expand All @@ -44,7 +44,7 @@ public InvocationHelper(FlowType flowType, StoredType storedType, ReplicaId repl
_storedType = storedType;
_replicaId = replicaId;
_functionStore = functionStore;
_messageWatchdog = messageWatchdog;
_messageClearer = messageClearer;
_resultBusyWaiter = new ResultBusyWaiter<TReturn>(_functionStore, Serializer);
}

Expand Down Expand Up @@ -413,7 +413,7 @@ public async Task<ExistingEffects> CreateExistingEffects(FlowId flowId)
public ExistingMessages CreateExistingMessages(FlowId flowId) => new(MapToStoredId(flowId), _functionStore.MessageStore, Serializer);

public QueueManager CreateQueueManager(FlowId flowId, StoredId storedId, Effect effect, FlowState flowState, FlowTimeouts timeouts, UnhandledExceptionHandler unhandledExceptionHandler)
=> new(flowId, storedId, _functionStore.MessageStore, Serializer, effect, flowState, unhandledExceptionHandler, timeouts, UtcNow, _settings, _messageWatchdog);
=> new(flowId, storedId, _functionStore.MessageStore, Serializer, effect, flowState, unhandledExceptionHandler, timeouts, UtcNow, _settings, _messageClearer);

public StoredId MapToStoredId(FlowId flowId) => StoredId.Create(_storedType, flowId.Instance.Value);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;

/// <summary>
/// The slice of <see cref="MessageClearer"/> a QueueManager depends on: deleting handled messages from the store
/// and dropping their positions from the watchdog's ignore-set. Exists so tests that hand-roll a QueueManager can
/// pass a no-op stub instead of a fully wired clearer.
/// </summary>
internal interface IMessageClearer
{
Task Clear(IReadOnlyList<long> positions);
}

This file was deleted.

Loading