Skip to content

Commit 5a88490

Browse files
committed
fix: validate inbound payloads
1 parent 9d43e86 commit 5a88490

File tree

6 files changed

+329
-17
lines changed

6 files changed

+329
-17
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ jobs:
128128

129129
build-all-provers:
130130
name: build-all-provers
131-
runs-on: ubuntu-latest
131+
runs-on: ${{ matrix.os }}
132132
needs: build # Only run if the build job succeeds
133133
strategy:
134134
matrix:
@@ -182,7 +182,7 @@ jobs:
182182

183183
test:
184184
name: test
185-
runs-on: ubuntu-latest
185+
runs-on: ${{ matrix.os }}
186186
strategy:
187187
matrix:
188188
os: [ubuntu-latest, macos-latest]

pkgs/cli/test/integration.zig

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -523,9 +523,10 @@ test "SSE events integration test - wait for justification and finalization" {
523523
// Node3 starts after first finalization and syncs via parent block requests (blocks_by_root).
524524
// We verify sync by waiting for finalization to advance beyond the first finalized slot,
525525
// which proves the chain continued progressing after node3 joined.
526-
const timeout_ms: u64 = 240000; // 240 seconds timeout
526+
const timeout_ms: u64 = 240000; // 240 seconds to reach first justification/finalization
527+
const post_finalization_timeout_ms: u64 = 120000; // extra time for node3 sync/finalization advancement
527528
const start_ns = std.time.nanoTimestamp();
528-
const deadline_ns = start_ns + timeout_ms * std.time.ns_per_ms;
529+
var deadline_ns = start_ns + timeout_ms * std.time.ns_per_ms;
529530
var got_justification = false;
530531
var got_finalization = false;
531532
var got_node3_sync = false;
@@ -555,8 +556,13 @@ test "SSE events integration test - wait for justification and finalization" {
555556
got_finalization = true;
556557
first_finalized_slot = slot;
557558
head_count_at_finalization = sse_client.getEventCount("new_head");
559+
const sync_deadline_ns = std.time.nanoTimestamp() + post_finalization_timeout_ms * std.time.ns_per_ms;
560+
if (sync_deadline_ns > deadline_ns) {
561+
deadline_ns = sync_deadline_ns;
562+
}
558563
std.debug.print("INFO: First finalization at slot {} — node 3 will start syncing via parent block requests\n", .{slot});
559564
std.debug.print("INFO: Head events at finalization: {}\n", .{head_count_at_finalization});
565+
std.debug.print("INFO: Extended deadline by {}ms for node3 sync verification\n", .{post_finalization_timeout_ms});
560566
} else if (got_finalization and slot > first_finalized_slot and !got_node3_sync) {
561567
// Finalization advanced beyond the first finalized slot.
562568
// This means the chain continued progressing after node3 joined.

pkgs/network/src/ethlibp2p.zig

Lines changed: 134 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,24 @@ const ServerStreamError = error{
2626
const MAX_RPC_MESSAGE_SIZE: usize = 4 * 1024 * 1024;
2727
const MAX_VARINT_BYTES: usize = uvarint.bufferSize(usize);
2828

29+
// SSZ size constants derived directly from type definitions for payload validation.
30+
const CHECKPOINT_SSZ_SIZE = zeam_utils.fixedSszSize(types.Checkpoint);
31+
const ATTESTATION_DATA_SSZ_SIZE = zeam_utils.fixedSszSize(types.AttestationData);
32+
const SIGNED_ATTESTATION_SSZ_SIZE = zeam_utils.fixedSszSize(types.SignedAttestation);
33+
34+
comptime {
35+
if (ATTESTATION_DATA_SSZ_SIZE != 8 + 3 * CHECKPOINT_SSZ_SIZE) {
36+
@compileError("AttestationData SSZ layout changed; revisit payload size assumptions");
37+
}
38+
if (SIGNED_ATTESTATION_SSZ_SIZE != 8 + ATTESTATION_DATA_SSZ_SIZE + types.SIGSIZE) {
39+
@compileError("SignedAttestation SSZ layout changed; revisit payload size assumptions");
40+
}
41+
}
42+
43+
// SignedBlockWithAttestation is variable-size with 2 variable fields (message, signature).
44+
// SSZ struct encoding: at least 2 offsets (4 bytes each) = 8 bytes minimum.
45+
const MIN_SIGNED_BLOCK_WITH_ATTESTATION_SSZ_SIZE = 8;
46+
2947
const FrameDecodeError = error{
3048
EmptyFrame,
3149
PayloadTooLarge,
@@ -48,6 +66,53 @@ fn decodeVarint(bytes: []const u8) uvarint.VarintParseError!struct { value: usiz
4866
};
4967
}
5068

69+
fn validateGossipSnappyHeader(message_bytes: []const u8) (uvarint.VarintParseError || error{PayloadTooLarge})!struct { value: usize, length: usize } {
70+
const decoded = try decodeVarint(message_bytes);
71+
if (decoded.value > MAX_RPC_MESSAGE_SIZE) {
72+
return error.PayloadTooLarge;
73+
}
74+
return .{
75+
.value = decoded.value,
76+
.length = decoded.length,
77+
};
78+
}
79+
80+
fn validateSignedBlockWithAttestation(bytes: []const u8) !void {
81+
if (bytes.len < MIN_SIGNED_BLOCK_WITH_ATTESTATION_SSZ_SIZE) {
82+
return error.InvalidEncoding;
83+
}
84+
85+
const message_offset: usize = @intCast(std.mem.readInt(u32, bytes[0..4], .little));
86+
const signature_offset: usize = @intCast(std.mem.readInt(u32, bytes[4..8], .little));
87+
88+
if (message_offset != MIN_SIGNED_BLOCK_WITH_ATTESTATION_SSZ_SIZE) {
89+
return error.InvalidEncoding;
90+
}
91+
if (signature_offset < message_offset) {
92+
return error.InvalidEncoding;
93+
}
94+
if (signature_offset > bytes.len) {
95+
return error.InvalidEncoding;
96+
}
97+
}
98+
99+
fn initSignedBlockWithAttestation(allocator: Allocator) !types.SignedBlockWithAttestation {
100+
var block: types.BeamBlock = undefined;
101+
try block.setToDefault(allocator);
102+
errdefer block.deinit();
103+
104+
var signatures = try types.createBlockSignatures(allocator, 0);
105+
errdefer signatures.deinit();
106+
107+
return .{
108+
.message = .{
109+
.block = block,
110+
.proposer_attestation = undefined,
111+
},
112+
.signature = signatures,
113+
};
114+
}
115+
51116
/// Build a request frame with varint-encoded uncompressed size followed by snappy-framed payload.
52117
fn buildRequestFrame(allocator: Allocator, uncompressed_size: usize, snappy_payload: []const u8) ![]u8 {
53118
if (uncompressed_size > MAX_RPC_MESSAGE_SIZE) {
@@ -87,11 +152,7 @@ fn parseRequestFrame(bytes: []const u8) FrameDecodeError!struct {
87152
return error.EmptyFrame;
88153
}
89154

90-
const decoded = try decodeVarint(bytes);
91-
92-
if (decoded.value > MAX_RPC_MESSAGE_SIZE) {
93-
return error.PayloadTooLarge;
94-
}
155+
const decoded = try validateGossipSnappyHeader(bytes);
95156

96157
return .{
97158
.declared_len = decoded.value,
@@ -111,11 +172,7 @@ fn parseResponseFrame(bytes: []const u8) FrameDecodeError!struct {
111172
return error.Incomplete;
112173
}
113174

114-
const decoded = try decodeVarint(bytes[1..]);
115-
116-
if (decoded.value > MAX_RPC_MESSAGE_SIZE) {
117-
return error.PayloadTooLarge;
118-
}
175+
const decoded = try validateGossipSnappyHeader(bytes[1..]);
119176

120177
return .{
121178
.code = bytes[0],
@@ -283,7 +340,6 @@ export fn handleMsgFromRustBridge(zigHandler: *EthLibp2p, topic_str: [*:0]const
283340
};
284341

285342
const message_bytes: []const u8 = message_ptr[0..message_len];
286-
287343
const uncompressed_message = snappyz.decode(zigHandler.allocator, message_bytes) catch |e| {
288344
zigHandler.logger.err("Error in snappyz decoding the message for topic={s}: {any}", .{ std.mem.span(topic_str), e });
289345
if (writeFailedBytes(message_bytes, "snappyz_decode", zigHandler.allocator, null, zigHandler.logger)) |filename| {
@@ -294,9 +350,32 @@ export fn handleMsgFromRustBridge(zigHandler: *EthLibp2p, topic_str: [*:0]const
294350
return;
295351
};
296352
defer zigHandler.allocator.free(uncompressed_message);
353+
354+
if (uncompressed_message.len > MAX_RPC_MESSAGE_SIZE) {
355+
zigHandler.logger.err(
356+
"Gossip message decompressed size {d} exceeds limit {d} for topic={s}",
357+
.{ uncompressed_message.len, MAX_RPC_MESSAGE_SIZE, std.mem.span(topic_str) },
358+
);
359+
return;
360+
}
361+
297362
const message: interface.GossipMessage = switch (topic.gossip_topic) {
298363
.block => blockmessage: {
299-
var message_data: types.SignedBlockWithAttestation = undefined;
364+
validateSignedBlockWithAttestation(uncompressed_message) catch {
365+
const message_offset = if (uncompressed_message.len >= 4) std.mem.readInt(u32, uncompressed_message[0..4], .little) else 0;
366+
const signature_offset = if (uncompressed_message.len >= 8) std.mem.readInt(u32, uncompressed_message[4..8], .little) else 0;
367+
zigHandler.logger.err(
368+
"Invalid gossip block top-level SSZ offsets: len={d} message_offset={d} signature_offset={d}",
369+
.{ uncompressed_message.len, message_offset, signature_offset },
370+
);
371+
return;
372+
};
373+
var message_data = initSignedBlockWithAttestation(zigHandler.allocator) catch |e| {
374+
zigHandler.logger.err("Error initializing signed block payload before deserialization: {any}", .{e});
375+
return;
376+
};
377+
var decode_succeeded = false;
378+
defer if (!decode_succeeded) message_data.deinit();
300379
ssz.deserialize(types.SignedBlockWithAttestation, uncompressed_message, &message_data, zigHandler.allocator) catch |e| {
301380
zigHandler.logger.err("Error in deserializing the signed block message: {any}", .{e});
302381
if (writeFailedBytes(uncompressed_message, "block", zigHandler.allocator, null, zigHandler.logger)) |filename| {
@@ -306,10 +385,18 @@ export fn handleMsgFromRustBridge(zigHandler: *EthLibp2p, topic_str: [*:0]const
306385
}
307386
return;
308387
};
388+
decode_succeeded = true;
309389

310390
break :blockmessage .{ .block = message_data };
311391
},
312392
.attestation => attestationmessage: {
393+
if (uncompressed_message.len != SIGNED_ATTESTATION_SSZ_SIZE) {
394+
zigHandler.logger.err(
395+
"Gossip attestation message size mismatch: got {d} bytes, expected {d}",
396+
.{ uncompressed_message.len, SIGNED_ATTESTATION_SSZ_SIZE },
397+
);
398+
return;
399+
}
313400
var message_data: types.SignedAttestation = undefined;
314401
ssz.deserialize(types.SignedAttestation, uncompressed_message, &message_data, zigHandler.allocator) catch |e| {
315402
zigHandler.logger.err("Error in deserializing the signed attestation message: {any}", .{e});
@@ -1222,3 +1309,38 @@ pub const EthLibp2p = struct {
12221309
return result;
12231310
}
12241311
};
1312+
1313+
test "SIGNED_ATTESTATION_SSZ_SIZE matches actual serialized size" {
1314+
const attestation = types.SignedAttestation{
1315+
.validator_id = 0,
1316+
.message = .{
1317+
.slot = 0,
1318+
.head = .{ .root = [_]u8{0} ** 32, .slot = 0 },
1319+
.target = .{ .root = [_]u8{0} ** 32, .slot = 0 },
1320+
.source = .{ .root = [_]u8{0} ** 32, .slot = 0 },
1321+
},
1322+
.signature = [_]u8{0} ** types.SIGSIZE,
1323+
};
1324+
var serialized: std.ArrayList(u8) = .empty;
1325+
defer serialized.deinit(std.testing.allocator);
1326+
try ssz.serialize(types.SignedAttestation, attestation, &serialized, std.testing.allocator);
1327+
try std.testing.expectEqual(SIGNED_ATTESTATION_SSZ_SIZE, serialized.items.len);
1328+
}
1329+
1330+
test "validateGossipSnappyHeader rejects oversized declared size" {
1331+
var scratch: [MAX_VARINT_BYTES]u8 = undefined;
1332+
const encoded = uvarint.encode(usize, MAX_RPC_MESSAGE_SIZE + 1, &scratch);
1333+
try std.testing.expectError(error.PayloadTooLarge, validateGossipSnappyHeader(encoded));
1334+
}
1335+
1336+
test "validateSignedBlockWithAttestationTopLevelOffsets rejects invalid offsets" {
1337+
var bad_first: [8]u8 = undefined;
1338+
std.mem.writeInt(u32, bad_first[0..4], 4, .little);
1339+
std.mem.writeInt(u32, bad_first[4..8], 8, .little);
1340+
try std.testing.expectError(error.InvalidEncoding, validateSignedBlockWithAttestation(&bad_first));
1341+
1342+
var bad_second: [8]u8 = undefined;
1343+
std.mem.writeInt(u32, bad_second[0..4], 8, .little);
1344+
std.mem.writeInt(u32, bad_second[4..8], 4, .little);
1345+
try std.testing.expectError(error.InvalidEncoding, validateSignedBlockWithAttestation(&bad_second));
1346+
}

0 commit comments

Comments
 (0)