Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parse routing type for the ReceivedMessage #139

Merged
merged 1 commit into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/ArtemisNetCoreClient/Framing/ArtemisBinaryConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -560,8 +560,8 @@ public static int GetNullableObjectByteCount(object? value)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static int ReadNullableObject(in ReadOnlySpan<byte> source, out object? value)
{
var readBytes = ReadByte(source, out var isNotNull);
switch (isNotNull)
var readBytes = ReadByte(source, out var type);
switch (type)
{
case DataConstants.Null:
value = null;
Expand Down Expand Up @@ -607,7 +607,7 @@ public static int ReadNullableObject(in ReadOnlySpan<byte> source, out object? v
value = longValue;
break;
default:
throw new NotSupportedException($"Unsupported object type: {isNotNull}");
throw new NotSupportedException($"Unsupported object type: {type}");
}

return readBytes;
Expand Down
38 changes: 25 additions & 13 deletions src/ArtemisNetCoreClient/Framing/Incoming/SessionReceiveMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public SessionReceiveMessage(ReadOnlySpan<byte> buffer)
readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out var expiration);
readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out var timestamp);
readBytes += ArtemisBinaryConverter.ReadByte(buffer[readBytes..], out var priority);
readBytes += DecodeProperties(buffer[readBytes..], out var properties);
readBytes += DecodeProperties(buffer[readBytes..], out var properties, out var routingType);
readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out ConsumerId);
readBytes += ArtemisBinaryConverter.ReadInt32(buffer[readBytes..], out DeliveryCount);

Expand All @@ -40,6 +40,7 @@ public SessionReceiveMessage(ReadOnlySpan<byte> buffer)
Priority = priority,
Properties = properties,
MessageDelivery = new MessageDelivery(ConsumerId, messageId),
RoutingType = routingType
};

Debug.Assert(readBytes == buffer.Length, $"Expected to read {buffer.Length} bytes but got {readBytes}");
Expand All @@ -60,28 +61,39 @@ private static int DecodeMessageBody(ReadOnlySpan<byte> buffer, out ReadOnlyMemo
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static int DecodeProperties(ReadOnlySpan<byte> buffer, out IReadOnlyDictionary<string, object?> value)
private static int DecodeProperties(ReadOnlySpan<byte> buffer, out IReadOnlyDictionary<string, object?> properties, out RoutingType? routingType)
{
routingType = null;
properties = ReadOnlyDictionary<string, object?>.Empty;

var readBytes = ArtemisBinaryConverter.ReadByte(buffer, out var isNotNull);
if (isNotNull == DataConstants.NotNull)
{
readBytes += ArtemisBinaryConverter.ReadInt32(buffer[readBytes..], out var count);
var properties = new Dictionary<string, object?>(count);
var mutableProperties = new Dictionary<string, object?>(count);
for (var i = 0; i < count; i++)
{
readBytes += ArtemisBinaryConverter.ReadSimpleString(buffer[readBytes..], out var key);
readBytes += ArtemisBinaryConverter.ReadNullableObject(buffer[readBytes..], out var obj);
properties.Add(key, obj);

if (key == MessageHeaders.RoutingType)
{
readBytes += ArtemisBinaryConverter.ReadByte(buffer[readBytes..], out var type);
if (type == DataConstants.Byte)
{
readBytes += ArtemisBinaryConverter.ReadByte(buffer[readBytes..], out var routingTypeByte);
routingType = (RoutingType) routingTypeByte;
}
}
else
{
readBytes += ArtemisBinaryConverter.ReadNullableObject(buffer[readBytes..], out var obj);
mutableProperties.Add(key, obj);
}
}

value = properties.ToFrozenDictionary();

return readBytes;
}
else
{
value = ReadOnlyDictionary<string, object?>.Empty;
return readBytes;
properties = mutableProperties.ToFrozenDictionary();
}

return readBytes;
}
}
5 changes: 5 additions & 0 deletions src/ArtemisNetCoreClient/ReceivedMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,9 @@ public class ReceivedMessage
/// even if the message has been disposed or discarded.
/// </summary>
public required MessageDelivery MessageDelivery { get; init; }

/// <summary>
/// The routing type used when sending the message.
/// </summary>
public required RoutingType? RoutingType { get; init; }
}
3 changes: 2 additions & 1 deletion test/ArtemisNetCoreClient.Tests/AnonymousProducerSpec.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using ActiveMQ.Artemis.Core.Client.InternalUtilities;
using ActiveMQ.Artemis.Core.Client.Tests.Utils;
using NScenario;
using Xunit;
Expand Down Expand Up @@ -185,6 +184,7 @@ await consumer.ReceiveMessageAsync(testFixture.CancellationToken)
Assert.All(messages, message =>
{
Assert.NotNull(message);
Assert.Equal(RoutingType.Anycast, message.RoutingType);
Assert.Equal("anycast_msg"u8.ToArray(), message.Body.ToArray());
});
});
Expand Down Expand Up @@ -222,6 +222,7 @@ await consumer.ReceiveMessageAsync(testFixture.CancellationToken)
Assert.All(messages, message =>
{
Assert.NotNull(message);
Assert.Equal(RoutingType.Multicast, message.RoutingType);
Assert.Equal("multicast_msg"u8.ToArray(), message.Body.ToArray());
});
});
Expand Down
Loading