Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flatten ReceivedMessage structure #102

Merged
merged 1 commit into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 19 additions & 32 deletions src/ArtemisNetCoreClient/Framing/Incoming/SessionReceiveMessage.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Collections.Frozen;
using System.Collections.ObjectModel;
using System.Diagnostics;
using System.Runtime.CompilerServices;
Expand All @@ -14,17 +15,31 @@ public SessionReceiveMessage(ReadOnlySpan<byte> buffer)
{
var readBytes = 0;
readBytes += DecodeMessageBody(buffer, out var body);
readBytes += DecodeHeaders(buffer[readBytes..], out var headers);
readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out var messageId);
readBytes += ArtemisBinaryConverter.ReadNullableSimpleString(buffer[readBytes..], out var address);
readBytes += ArtemisBinaryConverter.ReadNullableGuid(buffer[readBytes..], out var userId);
readBytes += ArtemisBinaryConverter.ReadByte(buffer[readBytes..], out var type);
readBytes += ArtemisBinaryConverter.ReadBool(buffer[readBytes..], out var durable);
readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out var expiration);
readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out var timestamp);
readBytes += ArtemisBinaryConverter.ReadByte(buffer[readBytes..], out var priority);
readBytes += DecodeProperties(buffer[readBytes..], out var properties);
readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out ConsumerId);
readBytes += ArtemisBinaryConverter.ReadInt32(buffer[readBytes..], out DeliveryCount);

Message = new ReceivedMessage
{
Body = body,
Headers = headers,
MessageId = messageId,
Address = address ?? "",
UserId = userId,
Type = type,
Durable = durable,
Expiration = expiration == 0 ? DateTimeOffset.MinValue : DateTimeOffset.FromUnixTimeMilliseconds(expiration),
Timestamp = timestamp == 0 ? DateTimeOffset.MinValue : DateTimeOffset.FromUnixTimeMilliseconds(timestamp),
Priority = priority,
Properties = properties,
MessageDelivery = new MessageDelivery(ConsumerId, headers.MessageId)
MessageDelivery = new MessageDelivery(ConsumerId, messageId),
};

Debug.Assert(readBytes == buffer.Length, $"Expected to read {buffer.Length} bytes but got {readBytes}");
Expand All @@ -43,34 +58,6 @@ private static int DecodeMessageBody(ReadOnlySpan<byte> buffer, out ReadOnlyMemo

return offset + bodyLength;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static int DecodeHeaders(ReadOnlySpan<byte> buffer, out ReadOnlyHeaders value)
{
var offset = 0;
offset += ArtemisBinaryConverter.ReadInt64(buffer, out var messageId);
offset += ArtemisBinaryConverter.ReadNullableSimpleString(buffer[offset..], out var address);
offset += ArtemisBinaryConverter.ReadNullableGuid(buffer[offset..], out var userId);
offset += ArtemisBinaryConverter.ReadByte(buffer[offset..], out var type);
offset += ArtemisBinaryConverter.ReadBool(buffer[offset..], out var durable);
offset += ArtemisBinaryConverter.ReadInt64(buffer[offset..], out var expiration);
offset += ArtemisBinaryConverter.ReadInt64(buffer[offset..], out var timestamp);
offset += ArtemisBinaryConverter.ReadByte(buffer[offset..], out var priority);

value = new ReadOnlyHeaders
{
MessageId = messageId,
Address = address ?? "",
UserId = userId,
Type = type,
Durable = durable,
Expiration = expiration == 0 ? DateTimeOffset.MinValue : DateTimeOffset.FromUnixTimeMilliseconds(expiration),
Timestamp = timestamp == 0 ? DateTimeOffset.MinValue : DateTimeOffset.FromUnixTimeMilliseconds(timestamp),
Priority = priority
};

return offset;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static int DecodeProperties(ReadOnlySpan<byte> buffer, out IReadOnlyDictionary<string, object?> value)
Expand All @@ -87,7 +74,7 @@ private static int DecodeProperties(ReadOnlySpan<byte> buffer, out IReadOnlyDict
properties.Add(key, obj);
}

value = properties;
value = properties.ToFrozenDictionary();

return readBytes;
}
Expand Down
28 changes: 0 additions & 28 deletions src/ArtemisNetCoreClient/ReadOnlyHeaders.cs

This file was deleted.

23 changes: 21 additions & 2 deletions src/ArtemisNetCoreClient/ReceivedMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,29 @@ namespace ActiveMQ.Artemis.Core.Client;
/// </summary>
public class ReceivedMessage
{
public required long MessageId { get; init; }

public required string Address { get; init; }

public required Guid? UserId { get; init; }

public required byte Type { get; init; }

public required bool Durable { get; init; }

/// <summary>
/// The message headers
/// Gets the date and time when this message expires. If set to <see cref="DateTimeOffset.MinValue"/>,
/// the message is considered to never expire.
/// </summary>
public required ReadOnlyHeaders Headers { get; init; }
public DateTimeOffset Expiration { get; init; }

/// <summary>
/// Gets the date and time when this message was created.
/// </summary>
public required DateTimeOffset Timestamp { get; init; }

// TODO: Enum?
public required byte Priority { get; init; }

/// <summary>
/// The message properties
Expand Down
2 changes: 1 addition & 1 deletion test/ArtemisNetCoreClient.Tests/MessageExpirationSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ await producer.SendMessageAsync(new Message
// Assert
Assert.NotNull(receivedMessage);
Assert.Equal("expiry_message"u8.ToArray(), receivedMessage.Body.ToArray());
Assert.Equal(expiration.DropTicsPrecision(), receivedMessage.Headers.Expiration);
Assert.Equal(expiration.DropTicsPrecision(), receivedMessage.Expiration);
}

[Fact]
Expand Down
2 changes: 1 addition & 1 deletion test/ArtemisNetCoreClient.Tests/MessageTimestampSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ await producer.SendMessageAsync(new Message

// Assert
Assert.NotNull(receivedMessage);
Assert.Equal(timestamp.DropTicsPrecision(), receivedMessage.Headers.Timestamp);
Assert.Equal(timestamp.DropTicsPrecision(), receivedMessage.Timestamp);
}
}
Loading