diff --git a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/InMemoryTests/MessageClearerTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/InMemoryTests/MessageClearerTests.cs index a0ebf7f6..30e5613c 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/InMemoryTests/MessageClearerTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/InMemoryTests/MessageClearerTests.cs @@ -167,7 +167,6 @@ public async Task DeleteMessages(IReadOnlyList positions) public Task Initialize() => throw new NotSupportedException(); public Task AppendMessages(IReadOnlyList messages) => throw new NotSupportedException(); public Task ReplaceMessage(StoredId storedId, long position, StoredMessage storedMessage) => throw new NotSupportedException(); - public Task DeleteMessages(StoredId storedId, IEnumerable positions) => throw new NotSupportedException(); public Task Truncate(StoredId storedId) => throw new NotSupportedException(); public Task> GetMessages(StoredId storedId) => throw new NotSupportedException(); public Task> GetMessages(StoredId storedId, IReadOnlyList skipPositions) => throw new NotSupportedException(); diff --git a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs index 3af12e38..cedb8ae1 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessageStoreTests.cs @@ -845,7 +845,7 @@ await functionStore.CreateFunction( messages.Count.ShouldBe(4); // Delete the first and third messages - await messageStore.DeleteMessages(functionId, new[] { messages[0].Position, messages[2].Position }); + await messageStore.DeleteMessages(new[] { messages[0].Position, messages[2].Position }); var remainingMessages = (await messageStore.GetMessages(functionId)).ToList(); remainingMessages.Count.ShouldBe(2); @@ -878,7 +878,7 @@ await functionStore.CreateFunction( var messages = (await messageStore.GetMessages(functionId)).ToList(); messages.Count.ShouldBe(2); - await messageStore.DeleteMessages(functionId, new[] { messages[0].Position }); + await messageStore.DeleteMessages(new[] { messages[0].Position }); var remainingMessages = (await messageStore.GetMessages(functionId)).ToList(); remainingMessages.Count.ShouldBe(1); @@ -902,17 +902,25 @@ await functionStore.CreateFunction( var messageStore = functionStore.MessageStore; const string msg1 = "message1"; + const string msg2 = "message2"; await messageStore.AppendMessage(functionId, new StoredMessage(msg1.ToJson().ToUtf8Bytes(), typeof(string).SimpleQualifiedName().ToUtf8Bytes(), Replica: ReplicaId.Empty, Position: 0)); + await messageStore.AppendMessage(functionId, new StoredMessage(msg2.ToJson().ToUtf8Bytes(), typeof(string).SimpleQualifiedName().ToUtf8Bytes(), Replica: ReplicaId.Empty, Position: 0)); var messages = (await messageStore.GetMessages(functionId)).ToList(); - messages.Count.ShouldBe(1); + messages.Count.ShouldBe(2); - // Try to delete messages at non-existent positions (should not throw) - await messageStore.DeleteMessages(functionId, new[] { 999L, 1000L }); + // Delete the first message, then delete its now-vacant position again - deleting a position + // that no longer exists should be a no-op rather than throw. A real (already-deleted) position + // is used rather than an arbitrary number, since positions are globally unique and a hardcoded + // value could collide with another flow's message in a shared store. + var deletedPosition = messages[0].Position; + await messageStore.DeleteMessages(new[] { deletedPosition }); + await messageStore.DeleteMessages(new[] { deletedPosition }); var remainingMessages = (await messageStore.GetMessages(functionId)).ToList(); remainingMessages.Count.ShouldBe(1); + remainingMessages[0].DefaultDeserialize().ShouldBe(msg2); } public abstract Task DeleteMessagesWithEmptyPositionsDoesNotThrow(); @@ -932,7 +940,7 @@ await functionStore.CreateFunction( var messageStore = functionStore.MessageStore; // Should not throw when deleting with empty positions list - await messageStore.DeleteMessages(functionId, Array.Empty()); + await messageStore.DeleteMessages(Array.Empty()); } public abstract Task DeleteMessagesOnlyAffectsSpecifiedStoredId(); @@ -974,8 +982,9 @@ await functionStore.CreateFunction( var messages1 = (await messageStore.GetMessages(id1)).ToList(); messages1.Count.ShouldBe(2); - // Delete first message from id1 only - await messageStore.DeleteMessages(id1, new[] { messages1[0].Position }); + // Delete first message from id1 only - the position is globally unique to that message, + // so deleting it must not affect id2's messages. + await messageStore.DeleteMessages(new[] { messages1[0].Position }); // Verify id1 has only one message left var remainingMessages1 = (await messageStore.GetMessages(id1)).ToList(); diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/ControlPanelTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/ControlPanelTests.cs index c338032a..ebe8be33 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/ControlPanelTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/ControlPanelTests.cs @@ -174,17 +174,18 @@ protected async Task PostponingExistingFunctionFromControlPanelSucceeds(Task(int position, T message, string? idempotencyKey = n /// Message position public async Task Remove(long position) { - await _messageStore.DeleteMessages(_storedId, positions: [position]); + await _messageStore.DeleteMessages(positions: [position]); // Invalidate cache so it will be re-fetched with correct data _receivedMessages = null; diff --git a/Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs b/Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs index 484355d7..00cfa16e 100644 --- a/Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Messaging/IMessageStore.cs @@ -19,7 +19,6 @@ public interface IMessageStore Task AppendMessages(IReadOnlyList messages); Task ReplaceMessage(StoredId storedId, long position, StoredMessage storedMessage); - Task DeleteMessages(StoredId storedId, IEnumerable positions); /// /// Deletes the messages at the given positions regardless of which flow they belong to. Positions are diff --git a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs index abf56b24..af648b51 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs @@ -613,21 +613,6 @@ public Task ReplaceMessage(StoredId storedId, long position, StoredMessage } } - public Task DeleteMessages(StoredId storedId, IEnumerable positions) - { - lock (_sync) - { - if (!_messages.ContainsKey(storedId)) - return Task.CompletedTask; - - var messages = _messages[storedId]; - foreach (var position in positions) - messages.Remove(position); - - return Task.CompletedTask; - } - } - public Task DeleteMessages(IReadOnlyList positions) { lock (_sync) diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs index d9692ea0..f3fb3686 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs @@ -103,17 +103,6 @@ public async Task ReplaceMessage(StoredId storedId, long position, StoredM return affectedRows == 1; } - public async Task DeleteMessages(StoredId storedId, IEnumerable positions) - { - var positionsList = positions.ToList(); - if (positionsList.Count == 0) - return; - - await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString); - await using var command = _sqlGenerator.DeleteMessages(storedId, positionsList).ToSqlCommand(conn); - await command.ExecuteNonQueryAsync(); - } - public async Task DeleteMessages(IReadOnlyList positions) { if (positions.Count == 0) diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs index 12c74632..6e113d67 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs @@ -749,19 +749,4 @@ WHERE id IN ({storedIds.Select(id => $"'{id.AsGuid:N}'").StringJoin(", ")}) .ToList()); } - public StoreCommand DeleteMessages(StoredId storedId, IEnumerable positions) - { - var positionsList = positions.ToList(); - - var sql = @$" - DELETE FROM {tablePrefix}_messages - WHERE id = ? AND position IN ({string.Join(", ", positionsList.Select(_ => "?"))})"; - - var command = StoreCommand.Create(sql); - command.AddParameter(storedId.AsGuid.ToString("N")); - foreach (var position in positionsList) - command.AddParameter(position); - - return command; - } } \ No newline at end of file diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs index 44c89566..5d85b1e3 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs @@ -107,17 +107,6 @@ public async Task ReplaceMessage(StoredId storedId, long position, StoredM return affectedRows == 1; } - public async Task DeleteMessages(StoredId storedId, IEnumerable positions) - { - var positionsArray = positions.ToArray(); - if (positionsArray.Length == 0) - return; - - await using var conn = await CreateConnection(); - await using var command = sqlGenerator.DeleteMessages(storedId, positionsArray).ToNpgsqlCommand(conn); - await command.ExecuteNonQueryAsync(); - } - public async Task DeleteMessages(IReadOnlyList positions) { if (positions.Count == 0) diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs index bf230d4d..1ae54681 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs @@ -694,16 +694,6 @@ public async Task>> ReadMessag .ToList()); } - public StoreCommand DeleteMessages(StoredId storedId, IEnumerable positions) - { - var positionsArray = positions.ToArray(); - var sql = @$" - DELETE FROM {tablePrefix}_messages - WHERE id = $1 AND position = ANY($2)"; - - return StoreCommand.Create(sql, values: [ storedId.AsGuid, positionsArray ]); - } - private string? _setParametersSql; private string? _setParametersSqlWithoutReplica; public StoreCommand SetParameters( diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs index ae58104a..4c51b628 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs @@ -740,18 +740,4 @@ public StoreCommand SetReplica(IEnumerable positions, ReplicaId newReplica .ToList()); } - public StoreCommand DeleteMessages(StoredId storedId, IEnumerable positions) - { - var positionsList = positions.ToList(); - var sql = @$" - DELETE FROM {tablePrefix}_Messages - WHERE Id = @Id AND Position IN ({positionsList.Select((_, i) => $"@Position{i}").StringJoin(", ")})"; - - var command = StoreCommand.Create(sql); - command.AddParameter("@Id", storedId.AsGuid); - for (var i = 0; i < positionsList.Count; i++) - command.AddParameter($"@Position{i}", positionsList[i]); - - return command; - } } \ No newline at end of file diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs index 49f7695b..932796a2 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs @@ -124,17 +124,6 @@ public async Task ReplaceMessage(StoredId storedId, long position, StoredM return affectedRows == 1; } - public async Task DeleteMessages(StoredId storedId, IEnumerable positions) - { - var positionsList = positions.ToList(); - if (positionsList.Count == 0) - return; - - await using var conn = await CreateConnection(); - await using var command = _sqlGenerator.DeleteMessages(storedId, positionsList).ToSqlCommand(conn); - await command.ExecuteNonQueryAsync(); - } - public async Task DeleteMessages(IReadOnlyList positions) { if (positions.Count == 0)