diff --git a/src/ArtemisNetCoreClient/Framing/Incoming/SessionReceiveMessage.cs b/src/ArtemisNetCoreClient/Framing/Incoming/SessionReceiveMessage.cs index a4574c6..224364b 100644 --- a/src/ArtemisNetCoreClient/Framing/Incoming/SessionReceiveMessage.cs +++ b/src/ArtemisNetCoreClient/Framing/Incoming/SessionReceiveMessage.cs @@ -1,3 +1,4 @@ +using System.Collections.Frozen; using System.Collections.ObjectModel; using System.Diagnostics; using System.Runtime.CompilerServices; @@ -14,7 +15,14 @@ public SessionReceiveMessage(ReadOnlySpan buffer) { var readBytes = 0; readBytes += DecodeMessageBody(buffer, out var body); - readBytes += DecodeHeaders(buffer[readBytes..], out var headers); + readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out var messageId); + readBytes += ArtemisBinaryConverter.ReadNullableSimpleString(buffer[readBytes..], out var address); + readBytes += ArtemisBinaryConverter.ReadNullableGuid(buffer[readBytes..], out var userId); + readBytes += ArtemisBinaryConverter.ReadByte(buffer[readBytes..], out var type); + readBytes += ArtemisBinaryConverter.ReadBool(buffer[readBytes..], out var durable); + readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out var expiration); + readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out var timestamp); + readBytes += ArtemisBinaryConverter.ReadByte(buffer[readBytes..], out var priority); readBytes += DecodeProperties(buffer[readBytes..], out var properties); readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out ConsumerId); readBytes += ArtemisBinaryConverter.ReadInt32(buffer[readBytes..], out DeliveryCount); @@ -22,9 +30,16 @@ public SessionReceiveMessage(ReadOnlySpan buffer) Message = new ReceivedMessage { Body = body, - Headers = headers, + MessageId = messageId, + Address = address ?? "", + UserId = userId, + Type = type, + Durable = durable, + Expiration = expiration == 0 ? DateTimeOffset.MinValue : DateTimeOffset.FromUnixTimeMilliseconds(expiration), + Timestamp = timestamp == 0 ? DateTimeOffset.MinValue : DateTimeOffset.FromUnixTimeMilliseconds(timestamp), + Priority = priority, Properties = properties, - MessageDelivery = new MessageDelivery(ConsumerId, headers.MessageId) + MessageDelivery = new MessageDelivery(ConsumerId, messageId), }; Debug.Assert(readBytes == buffer.Length, $"Expected to read {buffer.Length} bytes but got {readBytes}"); @@ -43,34 +58,6 @@ private static int DecodeMessageBody(ReadOnlySpan buffer, out ReadOnlyMemo return offset + bodyLength; } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static int DecodeHeaders(ReadOnlySpan buffer, out ReadOnlyHeaders value) - { - var offset = 0; - offset += ArtemisBinaryConverter.ReadInt64(buffer, out var messageId); - offset += ArtemisBinaryConverter.ReadNullableSimpleString(buffer[offset..], out var address); - offset += ArtemisBinaryConverter.ReadNullableGuid(buffer[offset..], out var userId); - offset += ArtemisBinaryConverter.ReadByte(buffer[offset..], out var type); - offset += ArtemisBinaryConverter.ReadBool(buffer[offset..], out var durable); - offset += ArtemisBinaryConverter.ReadInt64(buffer[offset..], out var expiration); - offset += ArtemisBinaryConverter.ReadInt64(buffer[offset..], out var timestamp); - offset += ArtemisBinaryConverter.ReadByte(buffer[offset..], out var priority); - - value = new ReadOnlyHeaders - { - MessageId = messageId, - Address = address ?? "", - UserId = userId, - Type = type, - Durable = durable, - Expiration = expiration == 0 ? DateTimeOffset.MinValue : DateTimeOffset.FromUnixTimeMilliseconds(expiration), - Timestamp = timestamp == 0 ? DateTimeOffset.MinValue : DateTimeOffset.FromUnixTimeMilliseconds(timestamp), - Priority = priority - }; - - return offset; - } [MethodImpl(MethodImplOptions.AggressiveInlining)] private static int DecodeProperties(ReadOnlySpan buffer, out IReadOnlyDictionary value) @@ -87,7 +74,7 @@ private static int DecodeProperties(ReadOnlySpan buffer, out IReadOnlyDict properties.Add(key, obj); } - value = properties; + value = properties.ToFrozenDictionary(); return readBytes; } diff --git a/src/ArtemisNetCoreClient/ReadOnlyHeaders.cs b/src/ArtemisNetCoreClient/ReadOnlyHeaders.cs deleted file mode 100644 index 22703b8..0000000 --- a/src/ArtemisNetCoreClient/ReadOnlyHeaders.cs +++ /dev/null @@ -1,28 +0,0 @@ -namespace ActiveMQ.Artemis.Core.Client; - -public readonly struct ReadOnlyHeaders -{ - public required long MessageId { get; init; } - - public required string Address { get; init; } - - public required Guid? UserId { get; init; } - - public required byte Type { get; init; } - - public required bool Durable { get; init; } - - /// - /// Gets the date and time when this message expires. If set to , - /// the message is considered to never expire. - /// - public DateTimeOffset Expiration { get; init; } - - /// - /// Gets the date and time when this message was created. - /// - public required DateTimeOffset Timestamp { get; init; } - - // TODO: Enum? - public required byte Priority { get; init; } -} \ No newline at end of file diff --git a/src/ArtemisNetCoreClient/ReceivedMessage.cs b/src/ArtemisNetCoreClient/ReceivedMessage.cs index 3bed7d0..d8e7d95 100644 --- a/src/ArtemisNetCoreClient/ReceivedMessage.cs +++ b/src/ArtemisNetCoreClient/ReceivedMessage.cs @@ -6,10 +6,29 @@ namespace ActiveMQ.Artemis.Core.Client; /// public class ReceivedMessage { + public required long MessageId { get; init; } + + public required string Address { get; init; } + + public required Guid? UserId { get; init; } + + public required byte Type { get; init; } + + public required bool Durable { get; init; } + /// - /// The message headers + /// Gets the date and time when this message expires. If set to , + /// the message is considered to never expire. /// - public required ReadOnlyHeaders Headers { get; init; } + public DateTimeOffset Expiration { get; init; } + + /// + /// Gets the date and time when this message was created. + /// + public required DateTimeOffset Timestamp { get; init; } + + // TODO: Enum? + public required byte Priority { get; init; } /// /// The message properties diff --git a/test/ArtemisNetCoreClient.Tests/MessageExpirationSpec.cs b/test/ArtemisNetCoreClient.Tests/MessageExpirationSpec.cs index f8c10ca..b96c5a0 100644 --- a/test/ArtemisNetCoreClient.Tests/MessageExpirationSpec.cs +++ b/test/ArtemisNetCoreClient.Tests/MessageExpirationSpec.cs @@ -42,7 +42,7 @@ await producer.SendMessageAsync(new Message // Assert Assert.NotNull(receivedMessage); Assert.Equal("expiry_message"u8.ToArray(), receivedMessage.Body.ToArray()); - Assert.Equal(expiration.DropTicsPrecision(), receivedMessage.Headers.Expiration); + Assert.Equal(expiration.DropTicsPrecision(), receivedMessage.Expiration); } [Fact] diff --git a/test/ArtemisNetCoreClient.Tests/MessageTimestampSpec.cs b/test/ArtemisNetCoreClient.Tests/MessageTimestampSpec.cs index 81ac7ff..7670565 100644 --- a/test/ArtemisNetCoreClient.Tests/MessageTimestampSpec.cs +++ b/test/ArtemisNetCoreClient.Tests/MessageTimestampSpec.cs @@ -40,6 +40,6 @@ await producer.SendMessageAsync(new Message // Assert Assert.NotNull(receivedMessage); - Assert.Equal(timestamp.DropTicsPrecision(), receivedMessage.Headers.Timestamp); + Assert.Equal(timestamp.DropTicsPrecision(), receivedMessage.Timestamp); } } \ No newline at end of file