Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.zig.zon
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
.version = "0.0.0",
.dependencies = .{
.ssz = .{
.url = "git+https://github.com/blockblaz/ssz.zig#c5394395dd7d0f8eda685c4723ad25ebbf550570",
.hash = "ssz-0.0.9-Lfwd693UAgBn89Sl4ljI4p3jW6ioLgonPWIdhIeIUDMN",
.url = "git+https://github.com/blockblaz/ssz.zig#50ddfa98d3b6485ae4e445a61a3b5392be3e9a62",
.hash = "ssz-0.0.9-Lfwd6zf0AgAYs-N72OTKqQGZEe6tF-YfpHn41ykIlDiH",
},
.zigcli = .{
.url = "git+https://github.com/jiacai2050/zigcli#67a06e34cbe25a39bb86e35796cc5a0ca45a31f7",
Expand Down
39 changes: 28 additions & 11 deletions pkgs/network/src/ethlibp2p.zig
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ fn decodeVarint(bytes: []const u8) uvarint.VarintParseError!struct { value: usiz
};
}

fn validateGossipSnappyHeader(message_bytes: []const u8) (uvarint.VarintParseError || error{PayloadTooLarge})!struct { value: usize, length: usize } {
const decoded = try decodeVarint(message_bytes);
if (decoded.value > MAX_RPC_MESSAGE_SIZE) {
return error.PayloadTooLarge;
}
return .{
.value = decoded.value,
.length = decoded.length,
};
}

/// Build a request frame with varint-encoded uncompressed size followed by snappy-framed payload.
fn buildRequestFrame(allocator: Allocator, uncompressed_size: usize, snappy_payload: []const u8) ![]u8 {
if (uncompressed_size > MAX_RPC_MESSAGE_SIZE) {
Expand Down Expand Up @@ -87,11 +98,7 @@ fn parseRequestFrame(bytes: []const u8) FrameDecodeError!struct {
return error.EmptyFrame;
}

const decoded = try decodeVarint(bytes);

if (decoded.value > MAX_RPC_MESSAGE_SIZE) {
return error.PayloadTooLarge;
}
const decoded = try validateGossipSnappyHeader(bytes);

return .{
.declared_len = decoded.value,
Expand All @@ -111,11 +118,7 @@ fn parseResponseFrame(bytes: []const u8) FrameDecodeError!struct {
return error.Incomplete;
}

const decoded = try decodeVarint(bytes[1..]);

if (decoded.value > MAX_RPC_MESSAGE_SIZE) {
return error.PayloadTooLarge;
}
const decoded = try validateGossipSnappyHeader(bytes[1..]);

return .{
.code = bytes[0],
Expand Down Expand Up @@ -305,7 +308,6 @@ export fn handleMsgFromRustBridge(zigHandler: *EthLibp2p, topic_str: [*:0]const
};

const message_bytes: []const u8 = message_ptr[0..message_len];

const uncompressed_message = snappyz.decode(zigHandler.allocator, message_bytes) catch |e| {
zigHandler.logger.err("Error in snappyz decoding the message for topic={s}: {any}", .{ std.mem.span(topic_str), e });
if (writeFailedBytes(message_bytes, "snappyz_decode", zigHandler.allocator, null, zigHandler.logger)) |filename| {
Expand All @@ -316,6 +318,15 @@ export fn handleMsgFromRustBridge(zigHandler: *EthLibp2p, topic_str: [*:0]const
return;
};
defer zigHandler.allocator.free(uncompressed_message);

if (uncompressed_message.len > MAX_RPC_MESSAGE_SIZE) {
zigHandler.logger.err(
"Gossip message decompressed size {d} exceeds limit {d} for topic={s}",
.{ uncompressed_message.len, MAX_RPC_MESSAGE_SIZE, std.mem.span(topic_str) },
);
return;
}

var message: interface.GossipMessage = switch (topic.gossip_topic.kind) {
.block => .{ .block = deserializeGossipMessage(
types.SignedBlockWithAttestation,
Expand Down Expand Up @@ -1283,3 +1294,9 @@ pub const EthLibp2p = struct {
return result;
}
};

test "validateGossipSnappyHeader rejects oversized declared size" {
var scratch: [MAX_VARINT_BYTES]u8 = undefined;
const encoded = uvarint.encode(usize, MAX_RPC_MESSAGE_SIZE + 1, &scratch);
try std.testing.expectError(error.PayloadTooLarge, validateGossipSnappyHeader(encoded));
}
155 changes: 154 additions & 1 deletion pkgs/network/src/interface.zig
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,23 @@ pub const ReqRespRequest = union(LeanSupportedProtocol) {
}
}

fn validateBytes(method: LeanSupportedProtocol, bytes: []const u8) !void {
switch (method) {
.blocks_by_root => {
// BlockByRootRequest is a struct with a single variable-size field (roots: List[Root, N]).
// SSZ struct encoding: 4 bytes offset (must be 4) + N * 32 bytes of roots.
if (bytes.len < 4) return error.InvalidEncoding;
const offset = std.mem.readInt(u32, bytes[0..4], .little);
if (offset != 4) return error.InvalidEncoding;
const list_data_len = bytes.len - 4;
if (list_data_len % 32 != 0) return error.InvalidEncoding;
},
.status => {},
}
}

pub fn deserialize(allocator: Allocator, method: LeanSupportedProtocol, bytes: []const u8) !Self {
try validateBytes(method, bytes);
return switch (method) {
inline else => |tag| {
const PayloadType = std.meta.TagPayload(Self, tag);
Expand Down Expand Up @@ -500,12 +516,64 @@ pub const ReqRespResponse = union(LeanSupportedProtocol) {
return serialized.toOwnedSlice(allocator);
}

fn initPayload(comptime tag: LeanSupportedProtocol, allocator: Allocator) !std.meta.TagPayload(Self, tag) {
const PayloadType = std.meta.TagPayload(Self, tag);
return switch (tag) {
.blocks_by_root => block_payload: {
var block: types.BeamBlock = undefined;
try block.setToDefault(allocator);
errdefer block.deinit();

var signatures = try types.createBlockSignatures(allocator, 0);
errdefer signatures.deinit();

break :block_payload PayloadType{
.message = .{
.block = block,
.proposer_attestation = undefined,
},
.signature = signatures,
};
},
inline else => @as(PayloadType, undefined),
};
}

fn deinitPayload(comptime tag: LeanSupportedProtocol, payload: *std.meta.TagPayload(Self, tag)) void {
switch (tag) {
.blocks_by_root => payload.deinit(),
inline else => {},
}
}

fn validateBytes(method: LeanSupportedProtocol, bytes: []const u8) !void {
switch (method) {
.blocks_by_root => {
// SignedBlockWithAttestation is variable-size with 2 variable fields (message, signature).
// Validate minimum size and top-level offsets before deserialization.
if (bytes.len < 8) return error.InvalidEncoding;

const message_offset: usize = @intCast(std.mem.readInt(u32, bytes[0..4], .little));
const signature_offset: usize = @intCast(std.mem.readInt(u32, bytes[4..8], .little));

if (message_offset != 8) return error.InvalidEncoding;
if (signature_offset < message_offset) return error.InvalidEncoding;
if (signature_offset > bytes.len) return error.InvalidEncoding;
},
.status => {},
}
}

pub fn deserialize(allocator: Allocator, method: LeanSupportedProtocol, bytes: []const u8) !ReqRespResponse {
try validateBytes(method, bytes);
return switch (method) {
inline else => |tag| {
const PayloadType = std.meta.TagPayload(Self, tag);
var payload: PayloadType = undefined;
var payload = try initPayload(tag, allocator);
var succeeded = false;
defer if (!succeeded) deinitPayload(tag, &payload);
try ssz.deserialize(PayloadType, bytes, &payload, allocator);
succeeded = true;
return @unionInit(Self, @tagName(tag), payload);
},
};
Expand Down Expand Up @@ -993,3 +1061,88 @@ test LeanNetworkTopic {
try std.testing.expectEqual(topic.encoding, decoded_topic.encoding);
try std.testing.expect(std.mem.eql(u8, topic.network, decoded_topic.network));
}

test "blocks_by_root roundtrip serialize/deserialize" {
const allocator = std.testing.allocator;
var roots = try ssz.utils.List(types.Root, consensus_params.MAX_REQUEST_BLOCKS).init(allocator);
try roots.append([_]u8{0x01} ** 32);
try roots.append([_]u8{0x02} ** 32);
var original = ReqRespRequest{ .blocks_by_root = .{ .roots = roots } };
defer original.deinit();

const serialized = try original.serialize(allocator);
defer allocator.free(serialized);

var deserialized = try ReqRespRequest.deserialize(allocator, .blocks_by_root, serialized);
defer deserialized.deinit();

try std.testing.expectEqual(@as(usize, 2), deserialized.blocks_by_root.roots.len());
const root0 = try deserialized.blocks_by_root.roots.get(0);
const root1 = try deserialized.blocks_by_root.roots.get(1);
try std.testing.expect(std.mem.eql(u8, &root0, &([_]u8{0x01} ** 32)));
try std.testing.expect(std.mem.eql(u8, &root1, &([_]u8{0x02} ** 32)));
}

// ReqRespResponse validation tests

test "response status roundtrip serialize/deserialize" {
const allocator = std.testing.allocator;
const original = ReqRespResponse{ .status = .{
.finalized_root = [_]u8{0x01} ** 32,
.finalized_slot = 42,
.head_root = [_]u8{0x02} ** 32,
.head_slot = 100,
} };

const serialized = try original.serialize(allocator);
defer allocator.free(serialized);

try std.testing.expectEqual(@as(usize, 80), serialized.len);

var deserialized = try ReqRespResponse.deserialize(allocator, .status, serialized);
defer deserialized.deinit();

try std.testing.expectEqual(@as(u64, 42), deserialized.status.finalized_slot);
try std.testing.expectEqual(@as(u64, 100), deserialized.status.head_slot);
}

test "response blocks_by_root roundtrip serialize/deserialize" {
const allocator = std.testing.allocator;

var attestations = try types.AggregatedAttestations.init(allocator);
const signatures = try types.createBlockSignatures(allocator, attestations.len());

var original = ReqRespResponse{ .blocks_by_root = .{
.message = .{
.block = .{
.slot = 7,
.proposer_index = 3,
.parent_root = [_]u8{0x11} ** 32,
.state_root = [_]u8{0x22} ** 32,
.body = .{ .attestations = attestations },
},
.proposer_attestation = .{
.validator_id = 3,
.data = .{
.slot = 7,
.head = .{ .root = [_]u8{0x11} ** 32, .slot = 6 },
.target = .{ .root = [_]u8{0x11} ** 32, .slot = 6 },
.source = .{ .root = [_]u8{0x00} ** 32, .slot = 0 },
},
},
},
.signature = signatures,
} };
defer original.deinit();

const serialized = try original.serialize(allocator);
defer allocator.free(serialized);

var decoded = try ReqRespResponse.deserialize(allocator, .blocks_by_root, serialized);
defer decoded.deinit();

try std.testing.expectEqual(@as(types.Slot, 7), decoded.blocks_by_root.message.block.slot);
try std.testing.expectEqual(@as(types.ValidatorIndex, 3), decoded.blocks_by_root.message.proposer_attestation.validator_id);
try std.testing.expect(std.mem.eql(u8, &decoded.blocks_by_root.message.block.parent_root, &([_]u8{0x11} ** 32)));
try std.testing.expectEqual(@as(usize, 0), decoded.blocks_by_root.signature.attestation_signatures.len());
}
1 change: 1 addition & 0 deletions pkgs/utils/src/lib.zig
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub const jsonToString = json_factory.jsonToString;

const ssz_factory = @import("./ssz.zig");
pub const hashTreeRoot = ssz_factory.hashTreeRoot;
pub const fixedSszSize = ssz_factory.fixedSszSize;

const fmt_factory = @import("./fmt.zig");
// Avoid to use `usingnamespace` to make upgrade easier in the future.
Expand Down
5 changes: 5 additions & 0 deletions pkgs/utils/src/ssz.zig
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,8 @@ pub fn hashTreeRoot(
) !void {
try ssz.hashTreeRoot(Hasher, T, value, out, allocator);
}

pub fn fixedSszSize(comptime T: type) usize {
return ssz.serializedFixedSize(T) catch
@compileError(std.fmt.comptimePrint("SSZ type {s} must remain fixed-size", .{@typeName(T)}));
}
Loading