Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ public async Task DeleteMessages(IReadOnlyList<long> positions)
public Task Initialize() => throw new NotSupportedException();
public Task AppendMessages(IReadOnlyList<StoredIdAndMessage> messages) => throw new NotSupportedException();
public Task<bool> ReplaceMessage(StoredId storedId, long position, StoredMessage storedMessage) => throw new NotSupportedException();
public Task DeleteMessages(StoredId storedId, IEnumerable<long> positions) => throw new NotSupportedException();
public Task Truncate(StoredId storedId) => throw new NotSupportedException();
public Task<IReadOnlyList<StoredMessage>> GetMessages(StoredId storedId) => throw new NotSupportedException();
public Task<IReadOnlyList<StoredMessage>> GetMessages(StoredId storedId, IReadOnlyList<long> skipPositions) => throw new NotSupportedException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ await functionStore.CreateFunction(
messages.Count.ShouldBe(4);

// Delete the first and third messages
await messageStore.DeleteMessages(functionId, new[] { messages[0].Position, messages[2].Position });
await messageStore.DeleteMessages(new[] { messages[0].Position, messages[2].Position });

var remainingMessages = (await messageStore.GetMessages(functionId)).ToList();
remainingMessages.Count.ShouldBe(2);
Expand Down Expand Up @@ -878,7 +878,7 @@ await functionStore.CreateFunction(
var messages = (await messageStore.GetMessages(functionId)).ToList();
messages.Count.ShouldBe(2);

await messageStore.DeleteMessages(functionId, new[] { messages[0].Position });
await messageStore.DeleteMessages(new[] { messages[0].Position });

var remainingMessages = (await messageStore.GetMessages(functionId)).ToList();
remainingMessages.Count.ShouldBe(1);
Expand All @@ -902,17 +902,25 @@ await functionStore.CreateFunction(
var messageStore = functionStore.MessageStore;

const string msg1 = "message1";
const string msg2 = "message2";

await messageStore.AppendMessage(functionId, new StoredMessage(msg1.ToJson().ToUtf8Bytes(), typeof(string).SimpleQualifiedName().ToUtf8Bytes(), Replica: ReplicaId.Empty, Position: 0));
await messageStore.AppendMessage(functionId, new StoredMessage(msg2.ToJson().ToUtf8Bytes(), typeof(string).SimpleQualifiedName().ToUtf8Bytes(), Replica: ReplicaId.Empty, Position: 0));

var messages = (await messageStore.GetMessages(functionId)).ToList();
messages.Count.ShouldBe(1);
messages.Count.ShouldBe(2);

// Try to delete messages at non-existent positions (should not throw)
await messageStore.DeleteMessages(functionId, new[] { 999L, 1000L });
// Delete the first message, then delete its now-vacant position again - deleting a position
// that no longer exists should be a no-op rather than throw. A real (already-deleted) position
// is used rather than an arbitrary number, since positions are globally unique and a hardcoded
// value could collide with another flow's message in a shared store.
var deletedPosition = messages[0].Position;
await messageStore.DeleteMessages(new[] { deletedPosition });
await messageStore.DeleteMessages(new[] { deletedPosition });

var remainingMessages = (await messageStore.GetMessages(functionId)).ToList();
remainingMessages.Count.ShouldBe(1);
remainingMessages[0].DefaultDeserialize().ShouldBe(msg2);
}

public abstract Task DeleteMessagesWithEmptyPositionsDoesNotThrow();
Expand All @@ -932,7 +940,7 @@ await functionStore.CreateFunction(
var messageStore = functionStore.MessageStore;

// Should not throw when deleting with empty positions list
await messageStore.DeleteMessages(functionId, Array.Empty<long>());
await messageStore.DeleteMessages(Array.Empty<long>());
}

public abstract Task DeleteMessagesOnlyAffectsSpecifiedStoredId();
Expand Down Expand Up @@ -974,8 +982,9 @@ await functionStore.CreateFunction(
var messages1 = (await messageStore.GetMessages(id1)).ToList();
messages1.Count.ShouldBe(2);

// Delete first message from id1 only
await messageStore.DeleteMessages(id1, new[] { messages1[0].Position });
// Delete first message from id1 only - the position is globally unique to that message,
// so deleting it must not affect id2's messages.
await messageStore.DeleteMessages(new[] { messages1[0].Position });

// Verify id1 has only one message left
var remainingMessages1 = (await messageStore.GetMessages(id1)).ToList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,17 +174,18 @@ protected async Task PostponingExistingFunctionFromControlPanelSucceeds(Task<IFu
controlPanel.Status.ShouldBe(Status.Failed);
controlPanel.FatalWorkflowException.ShouldNotBeNull();

await controlPanel.Postpone(new DateTime(1_000_000));
var postponeUntil = DateTime.UtcNow.AddDays(1);
await controlPanel.Postpone(postponeUntil);

await controlPanel.Refresh();
controlPanel.Status.ShouldBe(Status.Postponed);
controlPanel.PostponedUntil.ShouldNotBeNull();
controlPanel.PostponedUntil.Value.Ticks.ShouldBe(1_000_000);
controlPanel.PostponedUntil.Value.Ticks.ShouldBe(postponeUntil.Ticks);

var sf = await store.GetFunction(rFunc.MapToStoredId(functionId.Instance));
sf.ShouldNotBeNull();
sf.Status.ShouldBe(Status.Postponed);
sf.Expires.ShouldBe(1_000_000);
sf.Expires.ShouldBe(postponeUntil.Ticks);

var fwe = (FatalWorkflowException) unhandledExceptionCatcher.ThrownExceptions.Single().InnerException!;
fwe.ErrorType.ShouldBe(typeof(InvalidOperationException));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public async Task Replace<T>(int position, T message, string? idempotencyKey = n
/// <param name="position">Message position</param>
public async Task Remove(long position)
{
await _messageStore.DeleteMessages(_storedId, positions: [position]);
await _messageStore.DeleteMessages(positions: [position]);

// Invalidate cache so it will be re-fetched with correct data
_receivedMessages = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ public interface IMessageStore
Task AppendMessages(IReadOnlyList<StoredIdAndMessage> messages);

Task<bool> ReplaceMessage(StoredId storedId, long position, StoredMessage storedMessage);
Task DeleteMessages(StoredId storedId, IEnumerable<long> positions);

/// <summary>
/// Deletes the messages at the given positions regardless of which flow they belong to. Positions are
Expand Down
15 changes: 0 additions & 15 deletions Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -613,21 +613,6 @@ public Task<bool> ReplaceMessage(StoredId storedId, long position, StoredMessage
}
}

public Task DeleteMessages(StoredId storedId, IEnumerable<long> positions)
{
lock (_sync)
{
if (!_messages.ContainsKey(storedId))
return Task.CompletedTask;

var messages = _messages[storedId];
foreach (var position in positions)
messages.Remove(position);

return Task.CompletedTask;
}
}

public Task DeleteMessages(IReadOnlyList<long> positions)
{
lock (_sync)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,6 @@ public async Task<bool> ReplaceMessage(StoredId storedId, long position, StoredM
return affectedRows == 1;
}

public async Task DeleteMessages(StoredId storedId, IEnumerable<long> positions)
{
var positionsList = positions.ToList();
if (positionsList.Count == 0)
return;

await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString);
await using var command = _sqlGenerator.DeleteMessages(storedId, positionsList).ToSqlCommand(conn);
await command.ExecuteNonQueryAsync();
}

public async Task DeleteMessages(IReadOnlyList<long> positions)
{
if (positions.Count == 0)
Expand Down
15 changes: 0 additions & 15 deletions Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -749,19 +749,4 @@ WHERE id IN ({storedIds.Select(id => $"'{id.AsGuid:N}'").StringJoin(", ")})
.ToList());
}

public StoreCommand DeleteMessages(StoredId storedId, IEnumerable<long> positions)
{
var positionsList = positions.ToList();

var sql = @$"
DELETE FROM {tablePrefix}_messages
WHERE id = ? AND position IN ({string.Join(", ", positionsList.Select(_ => "?"))})";

var command = StoreCommand.Create(sql);
command.AddParameter(storedId.AsGuid.ToString("N"));
foreach (var position in positionsList)
command.AddParameter(position);

return command;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,6 @@ public async Task<bool> ReplaceMessage(StoredId storedId, long position, StoredM
return affectedRows == 1;
}

public async Task DeleteMessages(StoredId storedId, IEnumerable<long> positions)
{
var positionsArray = positions.ToArray();
if (positionsArray.Length == 0)
return;

await using var conn = await CreateConnection();
await using var command = sqlGenerator.DeleteMessages(storedId, positionsArray).ToNpgsqlCommand(conn);
await command.ExecuteNonQueryAsync();
}

public async Task DeleteMessages(IReadOnlyList<long> positions)
{
if (positions.Count == 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -694,16 +694,6 @@ public async Task<Dictionary<StoredId, IReadOnlyList<StoredMessage>>> ReadMessag
.ToList());
}

public StoreCommand DeleteMessages(StoredId storedId, IEnumerable<long> positions)
{
var positionsArray = positions.ToArray();
var sql = @$"
DELETE FROM {tablePrefix}_messages
WHERE id = $1 AND position = ANY($2)";

return StoreCommand.Create(sql, values: [ storedId.AsGuid, positionsArray ]);
}

private string? _setParametersSql;
private string? _setParametersSqlWithoutReplica;
public StoreCommand SetParameters(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -740,18 +740,4 @@ public StoreCommand SetReplica(IEnumerable<long> positions, ReplicaId newReplica
.ToList());
}

public StoreCommand DeleteMessages(StoredId storedId, IEnumerable<long> positions)
{
var positionsList = positions.ToList();
var sql = @$"
DELETE FROM {tablePrefix}_Messages
WHERE Id = @Id AND Position IN ({positionsList.Select((_, i) => $"@Position{i}").StringJoin(", ")})";

var command = StoreCommand.Create(sql);
command.AddParameter("@Id", storedId.AsGuid);
for (var i = 0; i < positionsList.Count; i++)
command.AddParameter($"@Position{i}", positionsList[i]);

return command;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,17 +124,6 @@ public async Task<bool> ReplaceMessage(StoredId storedId, long position, StoredM
return affectedRows == 1;
}

public async Task DeleteMessages(StoredId storedId, IEnumerable<long> positions)
{
var positionsList = positions.ToList();
if (positionsList.Count == 0)
return;

await using var conn = await CreateConnection();
await using var command = _sqlGenerator.DeleteMessages(storedId, positionsList).ToSqlCommand(conn);
await command.ExecuteNonQueryAsync();
}

public async Task DeleteMessages(IReadOnlyList<long> positions)
{
if (positions.Count == 0)
Expand Down