diff --git a/.gitmodules b/.gitmodules index fbc272db1..ffe103f70 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,6 +1,7 @@ [submodule "lean-quickstart"] path = lean-quickstart url = git@github.com:blockblaz/lean-quickstart.git + [submodule "leanSpec"] path = leanSpec url = https://github.com/leanEthereum/leanSpec diff --git a/build.zig b/build.zig index 1e18d00b5..d2b3dab9e 100644 --- a/build.zig +++ b/build.zig @@ -63,6 +63,7 @@ pub fn build(b: *Builder) !void { const prover = std.meta.stringToEnum(ProverChoice, prover_option) orelse .dummy; const build_rust_lib_steps = build_rust_project(b, "rust", prover); + const zkvm_host_cmd = build_rust_project(b, "rust", prover); // LTO option (disabled by default for faster builds) const enable_lto = b.option(bool, "lto", "Enable Link Time Optimization (slower builds, smaller binaries)") orelse false; @@ -285,6 +286,31 @@ pub fn build(b: *Builder) !void { zeam_beam_node.addImport("@zeam/api", zeam_api); zeam_beam_node.addImport("@zeam/key-manager", zeam_key_manager); + // add zeam-cli library module (for testing and external use) + const zeam_cli = b.addModule("@zeam/cli", .{ + .target = target, + .optimize = optimize, + .root_source_file = b.path("pkgs/cli/src/lib.zig"), + }); + zeam_cli.addImport("ssz", ssz); + zeam_cli.addImport("build_options", build_options_module); + zeam_cli.addImport("simargs", simargs); + zeam_cli.addImport("xev", xev); + zeam_cli.addImport("@zeam/database", zeam_database); + zeam_cli.addImport("@zeam/utils", zeam_utils); + zeam_cli.addImport("@zeam/params", zeam_params); + zeam_cli.addImport("@zeam/types", zeam_types); + zeam_cli.addImport("@zeam/configs", zeam_configs); + zeam_cli.addImport("@zeam/state-transition", zeam_state_transition); + zeam_cli.addImport("@zeam/state-proving-manager", zeam_state_proving_manager); + zeam_cli.addImport("@zeam/network", zeam_network); + zeam_cli.addImport("@zeam/node", zeam_beam_node); + zeam_cli.addImport("@zeam/api", zeam_api); + 
zeam_cli.addImport("metrics", metrics); + zeam_cli.addImport("multiformats", multiformats); + zeam_cli.addImport("enr", enr); + zeam_cli.addImport("yaml", yaml); + const zeam_spectests = b.addModule("zeam_spectests", .{ .target = target, .optimize = optimize, @@ -392,13 +418,22 @@ pub fn build(b: *Builder) !void { const integration_build_options_module = integration_build_options.createModule(); cli_integration_tests.root_module.addImport("build_options", integration_build_options_module); - // Add CLI constants module to integration tests - const cli_constants = b.addModule("cli_constants", .{ - .root_source_file = b.path("pkgs/cli/src/constants.zig"), - .target = target, - .optimize = optimize, - }); - cli_integration_tests.root_module.addImport("cli_constants", cli_constants); + // Add all dependencies needed by integration tests + cli_integration_tests.root_module.addImport("@zeam/cli", zeam_cli); + cli_integration_tests.root_module.addImport("@zeam/node", zeam_beam_node); + cli_integration_tests.root_module.addImport("@zeam/utils", zeam_utils); + cli_integration_tests.root_module.addImport("@zeam/configs", zeam_configs); + cli_integration_tests.root_module.addImport("@zeam/network", zeam_network); + cli_integration_tests.root_module.addImport("enr", enr); + cli_integration_tests.root_module.addImport("@zeam/state-transition", zeam_state_transition); + cli_integration_tests.root_module.addImport("@zeam/api", zeam_api); + cli_integration_tests.root_module.addImport("xev", xev); + cli_integration_tests.root_module.addImport("multiformats", multiformats); + cli_integration_tests.root_module.addImport("yaml", yaml); + cli_integration_tests.root_module.addImport("ssz", ssz); + cli_integration_tests.root_module.addImport("@zeam/types", zeam_types); + cli_integration_tests.root_module.addImport("@zeam/database", zeam_database); + addRustGlueLib(b, cli_integration_tests, target, prover); // Add error handler module to integration tests const error_handler_module = 
b.addModule("error_handler", .{ @@ -533,10 +568,11 @@ pub fn build(b: *Builder) !void { spectests.root_module.addImport("@zeam/state-transition", zeam_state_transition); spectests.root_module.addImport("ssz", ssz); - manager_tests.step.dependOn(&build_rust_lib_steps.step); - - network_tests.step.dependOn(&build_rust_lib_steps.step); - node_tests.step.dependOn(&build_rust_lib_steps.step); + manager_tests.step.dependOn(&zkvm_host_cmd.step); + cli_tests.step.dependOn(&zkvm_host_cmd.step); + network_tests.step.dependOn(&zkvm_host_cmd.step); + node_tests.step.dependOn(&zkvm_host_cmd.step); + cli_integration_tests.step.dependOn(&zkvm_host_cmd.step); transition_tests.step.dependOn(&build_rust_lib_steps.step); addRustGlueLib(b, transition_tests, target, prover); diff --git a/lean-quickstart b/lean-quickstart index 9f493fe4b..7f40f08a6 160000 --- a/lean-quickstart +++ b/lean-quickstart @@ -1 +1 @@ -Subproject commit 9f493fe4b0b46cac7a32415de0f2f11b5ba057c1 +Subproject commit 7f40f08a670ff90a76ee3e10d8576c222e5e0035 diff --git a/pkgs/cli/src/lib.zig b/pkgs/cli/src/lib.zig new file mode 100644 index 000000000..5b3fa0218 --- /dev/null +++ b/pkgs/cli/src/lib.zig @@ -0,0 +1,20 @@ +// CLI package library - exposes types and functions for testing and external use +// This module provides access to CLI internals without relative imports + +// Re-export types from main.zig +pub const NodeCommand = @import("main.zig").NodeCommand; + +// Re-export types and functions from node.zig +const node_module = @import("node.zig"); +pub const Node = node_module.Node; +pub const NodeOptions = node_module.NodeOptions; +pub const buildStartOptions = node_module.buildStartOptions; + +// Re-export api_server module +pub const api_server = @import("api_server.zig"); + +// Re-export constants module +pub const constants = @import("constants.zig"); + +// Re-export error handler module +pub const error_handler = @import("error_handler.zig"); diff --git a/pkgs/cli/src/main.zig b/pkgs/cli/src/main.zig 
index 16d81791a..a7cabcf90 100644 --- a/pkgs/cli/src/main.zig +++ b/pkgs/cli/src/main.zig @@ -488,17 +488,9 @@ fn mainInner() !void { .database_path = leancmd.@"data-dir", }; + try node.buildStartOptions(allocator, leancmd, &start_options); defer start_options.deinit(allocator); - node.buildStartOptions(allocator, leancmd, &start_options) catch |err| { - ErrorHandler.logErrorWithDetails(err, "build node start options", .{ - .node_id = leancmd.@"node-id", - .validator_config = leancmd.validator_config, - .custom_genesis = leancmd.custom_genesis, - }); - return err; - }; - var lean_node: node.Node = undefined; lean_node.init(allocator, &start_options) catch |err| { ErrorHandler.logErrorWithOperation(err, "initialize lean node"); diff --git a/pkgs/cli/src/node.zig b/pkgs/cli/src/node.zig index d7536889c..0b3edc22d 100644 --- a/pkgs/cli/src/node.zig +++ b/pkgs/cli/src/node.zig @@ -72,6 +72,7 @@ pub const NodeOptions = struct { allocator.free(self.bootnodes); allocator.free(self.validator_indices); allocator.free(self.local_priv_key); + allocator.free(self.genesis_spec.validator_pubkeys); } }; @@ -344,7 +345,24 @@ pub fn buildStartOptions(allocator: std.mem.Allocator, node_cmd: NodeCommand, op if (bootnodes.len == 0) { return error.InvalidNodesConfig; } - const genesis_spec = try configs.genesisConfigFromYAML(parsed_config, node_cmd.override_genesis_time); + + // Parse genesis configuration (time and validator count) + const genesis_config = try configs.genesisConfigFromYAML(parsed_config, node_cmd.override_genesis_time); + + // Generate validator keys using key manager + const key_manager_lib = @import("@zeam/key-manager"); + var key_manager = try key_manager_lib.getTestKeyManager(allocator, genesis_config.validator_count, 10000); + defer key_manager.deinit(); + + // Extract all validator public keys + const validator_pubkeys = try key_manager.getAllPubkeys(allocator, genesis_config.validator_count); + errdefer allocator.free(validator_pubkeys); + + // Create the full 
GenesisSpec + const genesis_spec = types.GenesisSpec{ + .genesis_time = genesis_config.genesis_time, + .validator_pubkeys = validator_pubkeys, + }; const validator_indices = try validatorIndicesFromYAML(allocator, opts.node_key, parsed_validators); errdefer allocator.free(validator_indices); @@ -352,12 +370,16 @@ pub fn buildStartOptions(allocator: std.mem.Allocator, node_cmd: NodeCommand, op return error.InvalidValidatorConfig; } const local_priv_key = try getPrivateKeyFromValidatorConfig(allocator, opts.node_key, parsed_validator_config); + errdefer allocator.free(local_priv_key); + + const node_key_index = try nodeKeyIndexFromYaml(opts.node_key, parsed_validator_config); + // All operations succeeded, now transfer ownership (no try statements after this point) opts.bootnodes = bootnodes; opts.validator_indices = validator_indices; opts.local_priv_key = local_priv_key; opts.genesis_spec = genesis_spec; - opts.node_key_index = try nodeKeyIndexFromYaml(opts.node_key, parsed_validator_config); + opts.node_key_index = node_key_index; } /// Parses the nodes from a YAML configuration. diff --git a/pkgs/cli/test/README.md b/pkgs/cli/test/README.md new file mode 100644 index 000000000..bb4ec04c1 --- /dev/null +++ b/pkgs/cli/test/README.md @@ -0,0 +1,175 @@ +# Zeam Integration Tests + +This directory contains integration tests for the Zeam lean consensus client. + +## Tests + +### 1. `beam_integration_test.zig` +Tests the beam command with mock network and SSE event streaming. + +### 2. `genesis_to_finalization_test.zig` +Two-node test that spawns zeam nodes directly and monitors them to finalization. + +### 3. `lean_quickstart_integration_test.zig` +**Full lean-quickstart integration test (Option B)**. Uses the lean-quickstart scripts for genesis generation and node spawning. 
+ +--- + +## lean-quickstart Integration Test + +### Overview + +The `lean_quickstart_integration_test.zig` implements a full end-to-end test using the [lean-quickstart](https://github.com/blockblaz/lean-quickstart) tooling. This test: + +1. Uses lean-quickstart's `generate-genesis.sh` (PK's eth-beacon-genesis Docker tool) +2. Uses lean-quickstart's `spin-node.sh` to spawn zeam nodes +3. Monitors nodes for finalization via SSE events +4. Cleans up using SIGTERM signals (mimics Ctrl+C) + +### Required Changes to lean-quickstart + +For the test to work properly, the following changes are needed in the `lean-quickstart` submodule: + +#### **File: `client-cmds/zeam-cmd.sh`** + +The zeam command needs these additions: + +```bash +# Extract genesis time from config.yaml to ensure both nodes use same genesis time +genesisTime=$(yq eval '.GENESIS_TIME' "$configDir/config.yaml") + +node_binary="$scriptDir/../zig-out/bin/zeam node \ + --custom_genesis $configDir \ + --validator_config $validatorConfig \ + --override_genesis_time $genesisTime \ # ← ADD THIS + --network-dir $dataDir/$item/network \ # ← ADD THIS + --data-dir $dataDir/$item \ + --node-id $item \ + --node-key $configDir/$item.key \ + --metrics_enable \ # ← ADD THIS + --metrics_port $metricsPort" # ← FIX: Use underscore not hyphen +``` + +**Required changes:** +1. **Add `--override_genesis_time`**: Extract from config.yaml and pass to zeam +2. **Add `--network-dir`**: Each node needs isolated network directory +3. **Add `--metrics_enable`**: Required flag to enable metrics endpoint +4. **Fix `--metrics_port`**: Must use underscore `_` not hyphen `-` + +#### **File: `client-cmds/zeam-cmd.sh` - Set Binary Mode** + +```bash +# Use binary mode by default since lean-quickstart is a submodule in zeam repo +node_setup="binary" # ← Change from "docker" to "binary" +``` + +#### **File: `spin-node.sh` - Optional Improvements** + +These are optional but recommended: + +1. 
**Make sudo optional** (line 102-110): +```bash +# Only use sudo if explicitly requested +if [ -n "$useSudo" ]; then + cmd="sudo rm -rf $itemDataDir/*" +else + cmd="rm -rf $itemDataDir/*" +fi +``` + +2. **Use $scriptDir for relative paths** (line 113, 120): +```bash +source "$scriptDir/parse-vc.sh" # ← Add $scriptDir/ +sourceCmd="source $scriptDir/client-cmds/$client-cmd.sh" # ← Add $scriptDir/ +``` + +### Requirements + +1. **Docker**: For PK's `eth-beacon-genesis` tool (genesis generation) +2. **yq**: YAML processor for parsing configuration +3. **zeam binary**: Must be built (`zig build`) before running test + - Binary should be at `zig-out/bin/zeam` +4. **bash**: The lean-quickstart scripts use bash + +### Platform Compatibility + +#### **Linux (GitPod, CI)** +✅ **Fully working** - All tests pass including finalization + +#### **macOS** +⚠️ **Partial support** - Tests run but validator activation issue: +- Genesis generation: ✅ Works +- Node spawning: ✅ Works +- Node connectivity: ✅ Works +- Validator activation: ❌ **Only validator 0 activates, validator 1 doesn't** +- Finalization: ❌ Fails (due to only 50% stake active) + +**macOS Issue:** +This appears to be a platform-specific bug in zeam's validator initialization or libp2p networking layer. Both `genesis_to_finalization_test.zig` and `lean_quickstart_integration_test.zig` exhibit the same behavior on macOS. + +**Known symptoms on macOS:** +- Only validator index 0 produces blocks and votes +- Validator index 1 never appears in logs +- No justification beyond genesis +- No finalization occurs +- "Address already in use" panics may occur + +**Workaround:** Run tests on Linux for full functionality. 
+ +### Running the Test + +```bash +# Build zeam first +zig build + +# Run all integration tests +zig build simtest + +# On Linux, expect: All tests pass including finalization +# On macOS, expect: Tests run but timeout (validator issue) +``` + +### Test Configuration + +The test creates a network with: +- 2 validators (1 per node) +- Metrics ports: 9669 (zeam_0), 9670 (zeam_1) +- QUIC ports: 9100 (zeam_0), 9101 (zeam_1) +- 600 second timeout for finalization +- Test directory: `test_lean_quickstart_network/` + +### Architecture + +``` +Test Process + ├─ Generate genesis via generate-genesis.sh (Docker) + ├─ Spawn zeam_0 via spin-node.sh + │ └─ spin-node.sh spawns zeam binary in background + │ └─ zeam node runs with validator 0 + ├─ Spawn zeam_1 via spin-node.sh + │ └─ spin-node.sh spawns zeam binary in background + │ └─ zeam node runs with validator 1 + ├─ Monitor finalization via SSE events + └─ Cleanup via SIGTERM (triggers script trap) +``` + +### Key Implementation Details + +1. **Working Directory**: Process.Child.cwd set to `lean-quickstart/` so relative paths work +2. **Environment**: `NETWORK_DIR` passed as relative path (`../test_network`) +3. **Signal Handling**: Uses SIGTERM (not SIGKILL) to trigger lean-quickstart's cleanup trap +4. **Process Management**: Custom `NodeProcess` struct manages child process lifecycle + +### Known Issues + +1. **macOS validator activation**: Only validator 0 active (platform-specific zeam bug) +2. **bash compatibility**: `wait -n` on line 173 of spin-node.sh fails on macOS bash 3.2 (but non-fatal) +3. 
**SSE connection crashes**: Race condition in api_server.zig when connection closes + +### Future Work + +- [ ] Debug macOS validator activation issue +- [ ] Fix SSE connection close race condition +- [ ] Consider adding conditional test execution based on platform +- [ ] Add metrics to track validator participation percentage + diff --git a/pkgs/cli/test/beam_integration_test.zig b/pkgs/cli/test/beam_integration_test.zig new file mode 100644 index 000000000..e6e1b8ace --- /dev/null +++ b/pkgs/cli/test/beam_integration_test.zig @@ -0,0 +1,552 @@ +const std = @import("std"); +const process = std.process; +const net = std.net; +const build_options = @import("build_options"); +const cli = @import("@zeam/cli"); +const constants = cli.constants; + +/// Verify that the Zeam executable exists and return its path +/// Includes detailed debugging output if the executable is not found +fn getZeamExecutable() ![]const u8 { + const exe_file = std.fs.openFileAbsolute(build_options.cli_exe_path, .{}) catch |err| { + std.debug.print("ERROR: Cannot find executable at {s}: {}\n", .{ build_options.cli_exe_path, err }); + + // Try to list the directory to see what's actually there + std.debug.print("INFO: Attempting to list {s} directory...\n", .{build_options.cli_exe_path}); + const dir_path = std.fs.path.dirname(build_options.cli_exe_path); + if (dir_path) |path| { + var dir = std.fs.openDirAbsolute(path, .{ .iterate = true }) catch |dir_err| { + std.debug.print("ERROR: Cannot open directory {s}: {}\n", .{ path, dir_err }); + return err; + }; + defer dir.close(); + + var iterator = dir.iterate(); + std.debug.print("INFO: Contents of {s}:\n", .{path}); + while (try iterator.next()) |entry| { + std.debug.print(" - {s} (type: {})\n", .{ entry.name, entry.kind }); + } + } + + return err; + }; + exe_file.close(); + std.debug.print("INFO: Found executable at {s}\n", .{build_options.cli_exe_path}); + return build_options.cli_exe_path; +} + +/// Helper function to start a beam simulation 
node and wait for it to be ready +/// Handles the complete process lifecycle: creation, spawning, and waiting for readiness +/// Returns the process handle for cleanup, or error if startup fails +fn spinBeamSimNode(allocator: std.mem.Allocator, exe_path: []const u8) !*process.Child { + // Set up process with beam command and mock network + const args = [_][]const u8{ exe_path, "beam", "--mockNetwork", "true" }; + const cli_process = try allocator.create(process.Child); + cli_process.* = process.Child.init(&args, allocator); + + // Capture stdout and stderr for debugging + // However, this leads to the test being cut short, probably because the child process gets killed, + // so we keep the pipes commented out and let the output flow to the console + // TODO: figure out and fix the behavior, then uncomment the following + // + // cli_process.stdout_behavior = .Pipe; + // cli_process.stderr_behavior = .Pipe; + + // Start the process + cli_process.spawn() catch |err| { + std.debug.print("ERROR: Failed to spawn process: {}\n", .{err}); + allocator.destroy(cli_process); + return err; + }; + + std.debug.print("INFO: Process spawned successfully with PID\n", .{}); + + // Wait for server to be ready + const start_time = std.time.milliTimestamp(); + var server_ready = false; + var retry_count: u32 = 0; + + while (std.time.milliTimestamp() - start_time < constants.DEFAULT_SERVER_STARTUP_TIMEOUT_MS) { + retry_count += 1; + + // Print progress every 10 retries + if (retry_count % 10 == 0) { + const elapsed = @divTrunc(std.time.milliTimestamp() - start_time, 1000); + std.debug.print("INFO: Still waiting for server... 
({} seconds, {} retries)\n", .{ elapsed, retry_count }); + } + + // Try to connect to the metrics server + const address = net.Address.parseIp4(constants.DEFAULT_SERVER_IP, constants.DEFAULT_METRICS_PORT) catch { + std.time.sleep(constants.DEFAULT_RETRY_INTERVAL_MS * std.time.ns_per_ms); + continue; + }; + + var connection = net.tcpConnectToAddress(address) catch |err| { + // Only print error details on certain intervals to avoid spam + if (retry_count % 20 == 0) { + std.debug.print("DEBUG: Connection attempt {} failed: {}\n", .{ retry_count, err }); + } + std.time.sleep(constants.DEFAULT_RETRY_INTERVAL_MS * std.time.ns_per_ms); + continue; + }; + + // Test if we can actually send/receive data + connection.close(); + server_ready = true; + std.debug.print("SUCCESS: Server ready after {} seconds ({} retries)\n", .{ @divTrunc(std.time.milliTimestamp() - start_time, 1000), retry_count }); + break; + } + + // If server didn't start, try to get process output for debugging + if (!server_ready) { + std.debug.print("ERROR: Metrics server not ready after {} seconds ({} retries)\n", .{ @divTrunc(constants.DEFAULT_SERVER_STARTUP_TIMEOUT_MS, 1000), retry_count }); + + // Try to read any output from the process + if (cli_process.stdout) |stdout| { + var stdout_buffer: [4096]u8 = undefined; + const stdout_bytes = stdout.readAll(&stdout_buffer) catch 0; + if (stdout_bytes > 0) { + std.debug.print("STDOUT: {s}\n", .{stdout_buffer[0..stdout_bytes]}); + } + } + + if (cli_process.stderr) |stderr| { + var stderr_buffer: [4096]u8 = undefined; + const stderr_bytes = stderr.readAll(&stderr_buffer) catch 0; + if (stderr_bytes > 0) { + std.debug.print("STDERR: {s}\n", .{stderr_buffer[0..stderr_bytes]}); + } + } + + // Check if process is still running + if (cli_process.wait() catch null) |term| { + switch (term) { + .Exited => |code| std.debug.print("ERROR: Process exited with code {}\n", .{code}), + .Signal => |sig| std.debug.print("ERROR: Process killed by signal {}\n", .{sig}), + 
.Stopped => |sig| std.debug.print("ERROR: Process stopped by signal {}\n", .{sig}), + .Unknown => |code| std.debug.print("ERROR: Process terminated with unknown code {}\n", .{code}), + } + } else { + std.debug.print("INFO: Process is still running\n", .{}); + } + + // Server not ready, cleanup and return error + allocator.destroy(cli_process); + return error.ServerStartupTimeout; + } + + return cli_process; +} + +/// Wait for node to start and be ready for activity +/// TODO: Over time, this can be abstracted to listen for some event +/// that the node can output when being active, rather than using a fixed sleep +fn waitForNodeStart() void { + std.time.sleep(2000 * std.time.ns_per_ms); +} + +/// Helper struct for making HTTP requests to Zeam endpoints +const ZeamRequest = struct { + allocator: std.mem.Allocator, + + fn init(allocator: std.mem.Allocator) ZeamRequest { + return ZeamRequest{ .allocator = allocator }; + } + + /// Make a request to the /metrics endpoint and return the response + fn getMetrics(self: ZeamRequest) ![]u8 { + return self.makeRequest("/metrics"); + } + + /// Make a request to the /health endpoint and return the response + fn getHealth(self: ZeamRequest) ![]u8 { + return self.makeRequest("/health"); + } + + /// Internal helper to make HTTP requests to any endpoint + fn makeRequest(self: ZeamRequest, endpoint: []const u8) ![]u8 { + // Create connection to the server + const address = try net.Address.parseIp4(constants.DEFAULT_SERVER_IP, constants.DEFAULT_METRICS_PORT); + var connection = try net.tcpConnectToAddress(address); + defer connection.close(); + + // Create HTTP request + var request_buffer: [4096]u8 = undefined; + const request = try std.fmt.bufPrint(&request_buffer, "GET {s} HTTP/1.1\r\n" ++ + "Host: {s}:{d}\r\n" ++ + "Connection: close\r\n" ++ + "\r\n", .{ endpoint, constants.DEFAULT_SERVER_IP, constants.DEFAULT_METRICS_PORT }); + + try connection.writeAll(request); + + // Read response + var response_buffer: [8192]u8 = undefined; 
+ const bytes_read = try connection.readAll(&response_buffer); + + // Allocate and return a copy of the response + const response = try self.allocator.dupe(u8, response_buffer[0..bytes_read]); + return response; + } + + /// Free a response returned by getMetrics() or getHealth() + fn freeResponse(self: ZeamRequest, response: []u8) void { + self.allocator.free(response); + } +}; + +/// Parsed SSE Event structure +pub const ChainEvent = struct { + event_type: []const u8, + justified_slot: ?u64, + finalized_slot: ?u64, + + /// Free the memory allocated for this event + pub fn deinit(self: ChainEvent, allocator: std.mem.Allocator) void { + allocator.free(self.event_type); + } +}; + +/// SSE Client for testing event streaming - FIXED VERSION +pub const SSEClient = struct { + allocator: std.mem.Allocator, + connection: std.net.Stream, + received_events: std.ArrayList([]u8), + // NEW: Add proper buffering for handling partial events and multiple events per read + read_buffer: std.ArrayList(u8), + parsed_events_queue: std.ArrayList(ChainEvent), + + pub fn init(allocator: std.mem.Allocator, metrics_port: u16) !SSEClient { + const address = try net.Address.parseIp4(constants.DEFAULT_SERVER_IP, metrics_port); + const connection = try net.tcpConnectToAddress(address); + + return SSEClient{ + .allocator = allocator, + .connection = connection, + .received_events = std.ArrayList([]u8).init(allocator), + .read_buffer = std.ArrayList(u8).init(allocator), + .parsed_events_queue = std.ArrayList(ChainEvent).init(allocator), + }; + } + + pub fn deinit(self: *SSEClient) void { + self.connection.close(); + for (self.received_events.items) |event| { + self.allocator.free(event); + } + self.received_events.deinit(); + self.read_buffer.deinit(); + + // Clean up parsed events queue + for (self.parsed_events_queue.items) |event| { + self.allocator.free(event.event_type); + } + self.parsed_events_queue.deinit(); + } + + pub fn connect(self: *SSEClient) !void { + // Send SSE request + const 
request = "GET /events HTTP/1.1\r\n" ++ + "Host: 127.0.0.1:9667\r\n" ++ + "Accept: text/event-stream\r\n" ++ + "Cache-Control: no-cache\r\n" ++ + "Connection: keep-alive\r\n" ++ + "\r\n"; + + try self.connection.writeAll(request); + } + + /// NEW: Parse all complete events from the current buffer + fn parseAllEventsFromBuffer(self: *SSEClient) !void { + var buffer_pos: usize = 0; + + while (buffer_pos < self.read_buffer.items.len) { + // Look for complete SSE event (ends with \n\n or \r\n\r\n) + const remaining_buffer = self.read_buffer.items[buffer_pos..]; + + const event_end_lf = std.mem.indexOf(u8, remaining_buffer, "\n\n"); + const event_end_crlf = std.mem.indexOf(u8, remaining_buffer, "\r\n\r\n"); + + var event_end: ?usize = null; + var separator_len: usize = 2; + + if (event_end_lf != null and event_end_crlf != null) { + // Both found, use the earlier one + if (event_end_lf.? < event_end_crlf.?) { + event_end = event_end_lf; + separator_len = 2; + } else { + event_end = event_end_crlf; + separator_len = 4; + } + } else if (event_end_lf != null) { + event_end = event_end_lf; + separator_len = 2; + } else if (event_end_crlf != null) { + event_end = event_end_crlf; + separator_len = 4; + } + + if (event_end == null) { + // No complete event found, break and wait for more data + break; + } + + // Extract the complete event block + const event_block = remaining_buffer[0..event_end.?]; + + // Parse this event and add to queue if valid + if (self.parseEventBlock(event_block)) |parsed_event| { + try self.parsed_events_queue.append(parsed_event); + + // Store raw event for debugging + const raw_event = try self.allocator.dupe(u8, event_block); + try self.received_events.append(raw_event); + } + + // Move past this event + buffer_pos += event_end.? 
+ separator_len; + } + + // Remove processed events from buffer + if (buffer_pos > 0) { + if (buffer_pos < self.read_buffer.items.len) { + std.mem.copyForwards(u8, self.read_buffer.items[0..], self.read_buffer.items[buffer_pos..]); + try self.read_buffer.resize(self.read_buffer.items.len - buffer_pos); + } else { + self.read_buffer.clearAndFree(); + } + } + } + + /// NEW: Parse a single event block and return parsed event + fn parseEventBlock(self: *SSEClient, event_block: []const u8) ?ChainEvent { + // Find event type line + const event_line_start = std.mem.indexOf(u8, event_block, "event:") orelse return null; + const data_line_start = std.mem.indexOf(u8, event_block, "data:") orelse return null; + + // Extract event type + const event_line_slice = blk: { + const nl = std.mem.indexOfScalarPos(u8, event_block, event_line_start, '\n') orelse event_block.len; + const cr = std.mem.indexOfScalarPos(u8, event_block, event_line_start, '\r') orelse nl; + const line_end = @min(nl, cr); + break :blk std.mem.trim(u8, event_block[event_line_start + "event:".len .. line_end], " \t"); + }; + + // Extract data payload + const data_line_slice = blk2: { + const nl = std.mem.indexOfScalarPos(u8, event_block, data_line_start, '\n') orelse event_block.len; + const cr = std.mem.indexOfScalarPos(u8, event_block, data_line_start, '\r') orelse nl; + const line_end = @min(nl, cr); + break :blk2 std.mem.trim(u8, event_block[data_line_start + "data:".len .. 
line_end], " \t"); + }; + + // Clone event type string so it persists + const event_type_owned = self.allocator.dupe(u8, event_line_slice) catch return null; + + // Parse JSON data + const parsed = std.json.parseFromSlice(std.json.Value, self.allocator, data_line_slice, .{ .ignore_unknown_fields = true }) catch return null; + defer parsed.deinit(); + + var justified_slot: ?u64 = null; + var finalized_slot: ?u64 = null; + + if (parsed.value.object.get("justified_slot")) |js| { + switch (js) { + .integer => |ival| justified_slot = @intCast(ival), + else => {}, + } + } + + if (parsed.value.object.get("finalized_slot")) |fs| { + switch (fs) { + .integer => |ival| finalized_slot = @intCast(ival), + else => {}, + } + } + + return ChainEvent{ + .event_type = event_type_owned, + .justified_slot = justified_slot, + .finalized_slot = finalized_slot, + }; + } + + /// FIXED: Main function that reads network data, buffers it, and returns one parsed event + /// This addresses the reviewer's concern by properly handling multiple events and buffering + pub fn readEvent(self: *SSEClient) !?ChainEvent { + // First, check if we have any parsed events in queue + if (self.parsed_events_queue.items.len > 0) { + return self.parsed_events_queue.orderedRemove(0); + } + + // Read new data from network + var temp_buffer: [4096]u8 = undefined; + const bytes_read = self.connection.read(&temp_buffer) catch |err| switch (err) { + error.WouldBlock => { + std.time.sleep(50 * std.time.ns_per_ms); + return null; // No data available + }, + else => return err, + }; + + if (bytes_read == 0) { + std.time.sleep(50 * std.time.ns_per_ms); + return null; // No data available + } + + // Append new data to our persistent buffer + try self.read_buffer.appendSlice(temp_buffer[0..bytes_read]); + + // Parse all complete events from the buffer + try self.parseAllEventsFromBuffer(); + + // Return first parsed event if available + if (self.parsed_events_queue.items.len > 0) { + return 
self.parsed_events_queue.orderedRemove(0); + } + + return null; // No complete events available yet + } + + fn hasEvent(self: *SSEClient, event_type: []const u8) bool { + for (self.received_events.items) |event_data| { + if (std.mem.indexOf(u8, event_data, event_type) != null) { + return true; + } + } + return false; + } + + fn getEventCount(self: *SSEClient, event_type: []const u8) usize { + var count: usize = 0; + for (self.received_events.items) |event_data| { + if (std.mem.indexOf(u8, event_data, event_type) != null) { + count += 1; + } + } + return count; + } +}; + +/// Clean up a process created by spinBeamSimNode +fn cleanupProcess(allocator: std.mem.Allocator, cli_process: *process.Child) void { + _ = cli_process.kill() catch {}; + _ = cli_process.wait() catch {}; + allocator.destroy(cli_process); +} + +test "CLI beam command with mock network - complete integration test" { + const allocator = std.testing.allocator; + + // Get executable path + const exe_path = try getZeamExecutable(); + + // Start node and wait for readiness + const cli_process = try spinBeamSimNode(allocator, exe_path); + defer cleanupProcess(allocator, cli_process); + + // Wait for node to be fully active + waitForNodeStart(); + + // Test metrics endpoint + var zeam_request = ZeamRequest.init(allocator); + const response = try zeam_request.getMetrics(); + defer zeam_request.freeResponse(response); + + // Verify we got a valid HTTP response + try std.testing.expect(std.mem.indexOf(u8, response, "HTTP/1.1 200") != null or std.mem.indexOf(u8, response, "HTTP/1.0 200") != null); + + // Verify response contains actual metric names from the metrics system + try std.testing.expect(std.mem.indexOf(u8, response, "chain_onblock_duration_seconds") != null or + std.mem.indexOf(u8, response, "block_processing_duration_seconds") != null); + + // Verify response is not empty + try std.testing.expect(response.len > 100); + + std.debug.print("SUCCESS: All integration test checks passed\n", .{}); +} + 
+test "SSE events integration test - wait for justification and finalization" { + const allocator = std.testing.allocator; + + // Get executable path + const exe_path = try getZeamExecutable(); + + // Start node and wait for readiness + const cli_process = try spinBeamSimNode(allocator, exe_path); + defer cleanupProcess(allocator, cli_process); + + // Wait for node to be fully active + waitForNodeStart(); + + // Create SSE client + var sse_client = try SSEClient.init(allocator, constants.DEFAULT_METRICS_PORT); + defer sse_client.deinit(); + + // Connect to SSE endpoint + try sse_client.connect(); + + std.debug.print("INFO: Connected to SSE endpoint, waiting for events...\n", .{}); + + // Read events until both justification and finalization are seen, or timeout + const timeout_ms: u64 = 180000; // 180 seconds timeout + const start_ns = std.time.nanoTimestamp(); + const deadline_ns = start_ns + timeout_ms * std.time.ns_per_ms; + var got_justification = false; + var got_finalization = false; + + // FIXED: This loop now works correctly with the improved readEvent() function + while (std.time.nanoTimestamp() < deadline_ns and !(got_justification and got_finalization)) { + const event = try sse_client.readEvent(); + if (event) |e| { + // Check for justification with slot > 0 + if (!got_justification and std.mem.eql(u8, e.event_type, "new_justification")) { + if (e.justified_slot) |slot| { + if (slot > 0) { + got_justification = true; + std.debug.print("INFO: Found justification with slot {}\n", .{slot}); + } + } + } + + // Check for finalization with slot > 0 + if (!got_finalization and std.mem.eql(u8, e.event_type, "new_finalization")) { + if (e.finalized_slot) |slot| { + std.debug.print("DEBUG: Found finalization event with slot {}\n", .{slot}); + if (slot > 0) { + got_finalization = true; + std.debug.print("INFO: Found finalization with slot {}\n", .{slot}); + } + } else { + std.debug.print("DEBUG: Found finalization event with null slot\n", .{}); + } + } + + // 
IMPORTANT: Free the event memory after processing + e.deinit(allocator); + } + } + + // Check if we received connection event + try std.testing.expect(sse_client.hasEvent("connection")); + + // Check for chain events + const head_events = sse_client.getEventCount("new_head"); + const justification_events = sse_client.getEventCount("new_justification"); + const finalization_events = sse_client.getEventCount("new_finalization"); + + std.debug.print("INFO: Received events - Head: {}, Justification: {}, Finalization: {}\n", .{ head_events, justification_events, finalization_events }); + + // Require both justification and finalization (> 0) to have been observed + try std.testing.expect(got_justification); + try std.testing.expect(got_finalization); + + // Print some sample events for debugging + for (sse_client.received_events.items, 0..) |event_data, i| { + if (i < 5) { // Print first 5 events + std.debug.print("Event {}: {s}\n", .{ i, event_data }); + } + } + + std.debug.print("SUCCESS: SSE events integration test completed\n", .{}); +} diff --git a/pkgs/cli/test/genesis_to_finalization_test.zig b/pkgs/cli/test/genesis_to_finalization_test.zig new file mode 100644 index 000000000..7c6d3aaaa --- /dev/null +++ b/pkgs/cli/test/genesis_to_finalization_test.zig @@ -0,0 +1,437 @@ +const std = @import("std"); +const Allocator = std.mem.Allocator; +const process = std.process; +const net = std.net; +const build_options = @import("build_options"); +const enr_lib = @import("enr"); +const enr = enr_lib; +const beam_test = @import("beam_integration_test.zig"); +const SSEClient = beam_test.SSEClient; +const ChainEvent = beam_test.ChainEvent; + +/// Generates a test ENR with the specified IP and QUIC port +fn generateTestENR(allocator: Allocator, ip: []const u8, quic_port: u16) ![]const u8 { + const test_secret_key = "b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291"; + + var buffer = std.ArrayList(u8).init(allocator); + defer buffer.deinit(); + + var signable_enr 
= enr.SignableENR.fromSecretKeyString(test_secret_key) catch { + return error.ENRCreationFailed; + }; + + const ip_addr = std.net.Ip4Address.parse(ip, 0) catch { + return error.InvalidIPAddress; + }; + const ip_addr_bytes = std.mem.asBytes(&ip_addr.sa.addr); + signable_enr.set("ip", ip_addr_bytes) catch { + return error.ENRSetIPFailed; + }; + + var quic_bytes: [2]u8 = undefined; + std.mem.writeInt(u16, &quic_bytes, quic_port, .big); + signable_enr.set("quic", &quic_bytes) catch { + return error.ENRSetQUICFailed; + }; + + try enr.writeSignableENR(buffer.writer(), &signable_enr); + return buffer.toOwnedSlice(); +} + +const TestConfig = struct { + genesis_time: u64, + num_validators: u32, + test_dir: []const u8, + timeout_seconds: u64 = 300, +}; + +const FinalizationResult = struct { + finalized: bool, + finalization_slot: u64, + finalization_root: [32]u8, + timeout_reached: bool = false, +}; + +pub fn generateGenesisDirectory(allocator: Allocator, config: TestConfig) !void { + const cwd = std.fs.cwd(); + + cwd.makeDir(config.test_dir) catch |err| switch (err) { + error.PathAlreadyExists => {}, + else => return err, + }; + + const node0_dir = try std.fmt.allocPrint(allocator, "{s}/node0", .{config.test_dir}); + defer allocator.free(node0_dir); + const node1_dir = try std.fmt.allocPrint(allocator, "{s}/node1", .{config.test_dir}); + defer allocator.free(node1_dir); + + cwd.makeDir(node0_dir) catch |err| switch (err) { + error.PathAlreadyExists => {}, + else => return err, + }; + cwd.makeDir(node1_dir) catch |err| switch (err) { + error.PathAlreadyExists => {}, + else => return err, + }; + + const config_yaml = try std.fmt.allocPrint(allocator, + \\GENESIS_TIME: {d} + \\VALIDATOR_COUNT: {d} + , .{ config.genesis_time, config.num_validators }); + defer allocator.free(config_yaml); + + const config_path = try std.fmt.allocPrint(allocator, "{s}/config.yaml", .{config.test_dir}); + defer allocator.free(config_path); + try cwd.writeFile(.{ .sub_path = config_path, .data = 
config_yaml }); + + const enr_0 = try generateTestENR(allocator, "127.0.0.1", 9100); + defer allocator.free(enr_0); + const enr_1 = try generateTestENR(allocator, "127.0.0.1", 9101); + defer allocator.free(enr_1); + + const nodes_yaml = try std.fmt.allocPrint(allocator, + \\- "{s}" + \\- "{s}" + , .{ enr_0, enr_1 }); + defer allocator.free(nodes_yaml); + + const nodes_path = try std.fmt.allocPrint(allocator, "{s}/nodes.yaml", .{config.test_dir}); + defer allocator.free(nodes_path); + try cwd.writeFile(.{ .sub_path = nodes_path, .data = nodes_yaml }); + + const validators_yaml = + \\zeam_0: + \\ - 0 + \\zeam_1: + \\ - 1 + ; + + const validators_path = try std.fmt.allocPrint(allocator, "{s}/validators.yaml", .{config.test_dir}); + defer allocator.free(validators_path); + try cwd.writeFile(.{ .sub_path = validators_path, .data = validators_yaml }); + + const validator_config_yaml = + \\shuffle: roundrobin + \\validators: + \\ - name: "zeam_0" + \\ privkey: "a000000000000000000000000000000000000000000000000000000000000001" + \\ enrFields: + \\ ip: "127.0.0.1" + \\ quic: 9100 + \\ count: 1 + \\ - name: "zeam_1" + \\ privkey: "b000000000000000000000000000000000000000000000000000000000000002" + \\ enrFields: + \\ ip: "127.0.0.1" + \\ quic: 9101 + \\ count: 1 + ; + + const validator_config_path = try std.fmt.allocPrint(allocator, "{s}/validator-config.yaml", .{config.test_dir}); + defer allocator.free(validator_config_path); + try cwd.writeFile(.{ .sub_path = validator_config_path, .data = validator_config_yaml }); + + const key0_content = "a000000000000000000000000000000000000000000000000000000000000001"; + const key1_content = "b000000000000000000000000000000000000000000000000000000000000002"; + + const key0_path = try std.fmt.allocPrint(allocator, "{s}/key", .{node0_dir}); + defer allocator.free(key0_path); + try cwd.writeFile(.{ .sub_path = key0_path, .data = key0_content }); + + const key1_path = try std.fmt.allocPrint(allocator, "{s}/key", .{node1_dir}); + defer 
allocator.free(key1_path); + try cwd.writeFile(.{ .sub_path = key1_path, .data = key1_content }); +} + +fn spawnZeamNodeProcess( + allocator: Allocator, + node_id: u32, + config: TestConfig, + metrics_port: u16, +) !*process.Child { + std.debug.print("🔧 Preparing to spawn node {d}...\n", .{node_id}); + + const exe_path = build_options.cli_exe_path; + std.debug.print("📦 Executable path: {s}\n", .{exe_path}); + + const network_dir = try std.fmt.allocPrint(allocator, "{s}/node{d}", .{ config.test_dir, node_id }); + defer allocator.free(network_dir); + + const db_path = try std.fmt.allocPrint(allocator, "{s}/node{d}/data", .{ config.test_dir, node_id }); + defer allocator.free(db_path); + + const node_key = try std.fmt.allocPrint(allocator, "zeam_{d}", .{node_id}); + defer allocator.free(node_key); + + const metrics_port_str = try std.fmt.allocPrint(allocator, "{d}", .{metrics_port}); + defer allocator.free(metrics_port_str); + + const genesis_time_str = try std.fmt.allocPrint(allocator, "{d}", .{config.genesis_time}); + defer allocator.free(genesis_time_str); + + const cwd = std.fs.cwd(); + cwd.makeDir(db_path) catch |err| switch (err) { + error.PathAlreadyExists => {}, + else => return err, + }; + + const args = &[_][]const u8{ + exe_path, + "node", + "--custom_genesis", + config.test_dir, + "--node-id", + node_key, + "--validator_config", + "genesis_bootnode", + "--override_genesis_time", + genesis_time_str, + "--metrics_enable", + "--metrics_port", + metrics_port_str, + "--network-dir", + network_dir, + "--data-dir", + db_path, + }; + + std.debug.print("📋 Command for node {d}: {s} {s}", .{ node_id, exe_path, args[1] }); + for (args[2..]) |arg| { + std.debug.print(" {s}", .{arg}); + } + std.debug.print("\n", .{}); + + const cli_process = try allocator.create(process.Child); + cli_process.* = process.Child.init(args, allocator); + + // Capture stdout and stderr to see what's happening + cli_process.stdout_behavior = .Ignore; + cli_process.stderr_behavior = .Inherit; 
+ + std.debug.print("🚀 Spawning node {d} process...\n", .{node_id}); + + cli_process.spawn() catch |err| { + std.debug.print("❌ ERROR: Failed to spawn node {d} process: {}\n", .{ node_id, err }); + allocator.destroy(cli_process); + return err; + }; + + std.debug.print("✅ Spawned node {d} process successfully\n", .{node_id}); + return cli_process; +} + +fn waitForNodeStartup(metrics_port: u16, timeout_seconds: u64, cli_process: *process.Child) !void { + _ = cli_process; // Process handle kept for future use + std.debug.print("⏳ Waiting for node on port {d} to start (timeout: {d}s)...\n", .{ metrics_port, timeout_seconds }); + + const start_time = std.time.milliTimestamp(); + const timeout_ms = timeout_seconds * 1000; + var attempt: usize = 0; + + while (std.time.milliTimestamp() - start_time < timeout_ms) { + attempt += 1; + + if (attempt % 10 == 0) { + const elapsed = @divTrunc(std.time.milliTimestamp() - start_time, 1000); + std.debug.print("⏱️ Still waiting for port {d}... ({d}s elapsed, attempt {d})\n", .{ metrics_port, elapsed, attempt }); + } + + const address = net.Address.parseIp4("127.0.0.1", metrics_port) catch { + std.time.sleep(1000 * std.time.ns_per_ms); + continue; + }; + + var connection = net.tcpConnectToAddress(address) catch { + std.time.sleep(1000 * std.time.ns_per_ms); + continue; + }; + connection.close(); + + std.debug.print("✅ Node on port {d} is ready\n", .{metrics_port}); + return; + } + + std.debug.print("❌ Timeout waiting for node on port {d}\n", .{metrics_port}); + return error.NodeStartupTimeout; +} + +/// FIXED VERSION: Monitor SSE events for finalization with proper null handling +fn monitorForFinalization(allocator: Allocator, metrics_port: u16, timeout_seconds: u64) !FinalizationResult { + std.debug.print("📡 Creating SSE client for port {d}...\n", .{metrics_port}); + + var sse_client = try SSEClient.init(allocator, metrics_port); + defer sse_client.deinit(); + + try sse_client.connect(); + std.debug.print("✅ Connected to SSE 
endpoint, waiting for finalization events...\n", .{}); + + const deadline_ns = std.time.nanoTimestamp() + (@as(i64, @intCast(timeout_seconds)) * std.time.ns_per_s); + var event_count: usize = 0; + var null_count: usize = 0; + var last_progress_time = std.time.nanoTimestamp(); + + while (std.time.nanoTimestamp() < deadline_ns) { + // CRITICAL FIX: readEvent() returns !?ChainEvent + // It can return: null (no data), error (connection issue), or ChainEvent + const event_result = sse_client.readEvent() catch |err| { + std.debug.print("❌ Error reading SSE event: {}\n", .{err}); + return error.SSEReadError; + }; + + // IMPORTANT: Handle the null case - this means no data available YET + if (event_result == null) { + null_count += 1; + + // Print progress every 20 null reads (roughly every second given the 50ms sleep in SSEClient) + if (null_count % 20 == 0) { + const now = std.time.nanoTimestamp(); + if (now - last_progress_time > 5 * std.time.ns_per_s) { + const elapsed = @divTrunc(now - (deadline_ns - @as(i64, @intCast(timeout_seconds)) * std.time.ns_per_s), std.time.ns_per_s); + const remaining = @divTrunc(deadline_ns - now, std.time.ns_per_s); + std.debug.print("⏱️ Still waiting for events... 
({d} events received, {d}s elapsed, {d}s remaining)\n", .{ event_count, elapsed, remaining }); + last_progress_time = now; + } + } + + continue; // Continue to next iteration + } + + // We have a valid event + const e = event_result.?; // Safe to unwrap since we checked for null + event_count += 1; + std.debug.print("📨 Event #{d}: {s}\n", .{ event_count, e.event_type }); + + // Check for finalization with slot > 0 + if (std.mem.eql(u8, e.event_type, "new_finalization")) { + if (e.finalized_slot) |slot| { + std.debug.print("🔍 Found finalization event with slot {d}\n", .{slot}); + if (slot > 0) { + std.debug.print("🎉 Finalization detected at slot {d}!\n", .{slot}); + e.deinit(allocator); + return FinalizationResult{ + .finalized = true, + .finalization_slot = slot, + .finalization_root = [_]u8{0} ** 32, + .timeout_reached = false, + }; + } + } + } + + e.deinit(allocator); + } + + std.debug.print("❌ Timeout reached after {d} seconds\n", .{timeout_seconds}); + std.debug.print("📊 Total events received: {d}\n", .{event_count}); + std.debug.print("📊 Total null reads: {d}\n", .{null_count}); + return FinalizationResult{ + .finalized = false, + .finalization_slot = 0, + .finalization_root = [_]u8{0} ** 32, + .timeout_reached = true, + }; +} + +fn runTwoNodesAsProcessesToFinalization(allocator: Allocator, config: TestConfig) !FinalizationResult { + std.debug.print("\n", .{}); + std.debug.print("═══════════════════════════════════════════════════════════\n", .{}); + std.debug.print("🚀 STARTING TWO-NODE PROCESS TEST\n", .{}); + std.debug.print("═══════════════════════════════════════════════════════════\n", .{}); + std.debug.print("Test config: genesis_time={d}, num_validators={d}, timeout={d}s\n", .{ config.genesis_time, config.num_validators, config.timeout_seconds }); + std.debug.print("\n", .{}); + + const node_0_port: u16 = 9669; + const node_1_port: u16 = 9670; + + std.debug.print("▶️ STEP 1: Spawning Node 0 (port {d})\n", .{node_0_port}); + const node_0_process = try 
spawnZeamNodeProcess(allocator, 0, config, node_0_port); + defer { + _ = node_0_process.kill() catch {}; + _ = node_0_process.wait() catch {}; + allocator.destroy(node_0_process); + } + + std.debug.print("\n▶️ STEP 2: Spawning Node 1 (port {d})\n", .{node_1_port}); + const node_1_process = try spawnZeamNodeProcess(allocator, 1, config, node_1_port); + defer { + std.debug.print("🧹 Cleaning up node 1 process...\n", .{}); + _ = node_1_process.kill() catch {}; + _ = node_1_process.wait() catch {}; + allocator.destroy(node_1_process); + } + + std.debug.print("\n✅ Both node processes spawned\n", .{}); + + std.debug.print("\n▶️ STEP 3: Waiting for nodes to start (30s timeout each)...\n", .{}); + try waitForNodeStartup(node_0_port, 30, node_0_process); + try waitForNodeStartup(node_1_port, 30, node_1_process); + + std.debug.print("\n✅ Both nodes are ready!\n", .{}); + + std.debug.print("\n▶️ STEP 4: Monitoring for finalization via SSE (timeout: {d}s)...\n", .{config.timeout_seconds}); + const result = try monitorForFinalization(allocator, node_0_port, config.timeout_seconds); + + std.debug.print("\n", .{}); + std.debug.print("═══════════════════════════════════════════════════════════\n", .{}); + std.debug.print("🏁 TEST COMPLETE - Finalized: {}\n", .{result.finalized}); + std.debug.print("═══════════════════════════════════════════════════════════\n", .{}); + + return result; +} + +fn cleanupGenesisDirectory(allocator: Allocator, test_dir: []const u8) !void { + _ = allocator; + const cwd = std.fs.cwd(); + cwd.deleteTree(test_dir) catch |err| switch (err) { + error.AccessDenied, error.FileBusy, error.FileSystem, error.SymLinkLoop, error.NameTooLong, error.NotDir, error.SystemResources, error.ReadOnlyFileSystem, error.InvalidUtf8, error.BadPathName, error.NetworkNotFound, error.DeviceBusy, error.NoDevice, error.ProcessFdQuotaExceeded, error.SystemFdQuotaExceeded, error.Unexpected, error.FileTooBig, error.InvalidWtf8 => return err, + }; +} + +test 
"genesis_generator_two_node_finalization_sim" { + var arena_allocator = std.heap.ArenaAllocator.init(std.testing.allocator); + defer arena_allocator.deinit(); + const allocator = arena_allocator.allocator(); + + // Set genesis time in the near future to allow nodes to sync before genesis + const genesis_time = @as(u64, @intCast(std.time.timestamp())) + 10; + + const config = TestConfig{ + .genesis_time = genesis_time, + .num_validators = 2, + .test_dir = "test_genesis_two_nodes", + .timeout_seconds = 120, // Reduced timeout for faster failure detection + }; + + std.debug.print("🚀 Starting Genesis Generator Two-Node Finalization Test \n", .{}); + std.debug.print("📁 Test directory: {s}\n", .{config.test_dir}); + std.debug.print("⏰ Genesis time: {d} (in ~10 seconds)\n", .{config.genesis_time}); + std.debug.print("⏰ Current time: {d}\n", .{std.time.timestamp()}); + std.debug.print("👥 Number of validators: {d}\n", .{config.num_validators}); + std.debug.print("⏱️ Timeout: {d} seconds\n", .{config.timeout_seconds}); + + const cwd = std.fs.cwd(); + cwd.makeDir("log") catch |err| switch (err) { + error.PathAlreadyExists => {}, + else => return err, + }; + + try generateGenesisDirectory(allocator, config); + std.debug.print("✅ Generated proper genesis directory structure\n", .{}); + + const result = try runTwoNodesAsProcessesToFinalization(allocator, config); + + try cleanupGenesisDirectory(allocator, config.test_dir); + + if (result.timeout_reached) { + std.debug.print("❌ Test failed: Timeout reached after {d} seconds\n", .{config.timeout_seconds}); + return error.TestTimeout; + } + + if (!result.finalized) { + std.debug.print("❌ Test failed: No finalization detected\n", .{}); + return error.NoFinalization; + } + + std.debug.print("✅ Test passed: Finalization detected at slot {d}\n", .{result.finalization_slot}); + std.debug.print("🎉 Genesis Generator Two-Node Finalization Test completed successfully!\n", .{}); +} diff --git a/pkgs/cli/test/integration.zig 
b/pkgs/cli/test/integration.zig index 498db44ee..7fd71be43 100644 --- a/pkgs/cli/test/integration.zig +++ b/pkgs/cli/test/integration.zig @@ -3,553 +3,28 @@ const process = std.process; const net = std.net; const build_options = @import("build_options"); const constants = @import("cli_constants"); -const error_handler = @import("error_handler"); -const ErrorHandler = error_handler.ErrorHandler; - -/// Verify that the Zeam executable exists and return its path -/// Includes detailed debugging output if the executable is not found -fn getZeamExecutable() ![]const u8 { - const exe_file = std.fs.openFileAbsolute(build_options.cli_exe_path, .{}) catch |err| { - std.debug.print("ERROR: Cannot find executable at {s}: {}\n", .{ build_options.cli_exe_path, err }); - - // Try to list the directory to see what's actually there - std.debug.print("INFO: Attempting to list {s} directory...\n", .{build_options.cli_exe_path}); - const dir_path = std.fs.path.dirname(build_options.cli_exe_path); - if (dir_path) |path| { - var dir = std.fs.openDirAbsolute(path, .{ .iterate = true }) catch |dir_err| { - std.debug.print("ERROR: Cannot open directory {s}: {}\n", .{ path, dir_err }); - return err; - }; - defer dir.close(); - - var iterator = dir.iterate(); - std.debug.print("INFO: Contents of {s}:\n", .{path}); - while (try iterator.next()) |entry| { - std.debug.print(" - {s} (type: {})\n", .{ entry.name, entry.kind }); - } - } - - return err; - }; - exe_file.close(); - std.debug.print("INFO: Found executable at {s}\n", .{build_options.cli_exe_path}); - return build_options.cli_exe_path; -} - -/// Helper function to start a beam simulation node and wait for it to be ready -/// Handles the complete process lifecycle: creation, spawning, and waiting for readiness -/// Returns the process handle for cleanup, or error if startup fails -fn spinBeamSimNode(allocator: std.mem.Allocator, exe_path: []const u8) !*process.Child { - // Set up process with beam command and mock network - const args = 
[_][]const u8{ exe_path, "beam", "--mockNetwork", "true" }; - const cli_process = try allocator.create(process.Child); - cli_process.* = process.Child.init(&args, allocator); - - // Capture stdout and stderr for debugging - // However this leads to test being cut short probably because of child process getting killed - // so commenting the pipe and letting the output to flow to console - // TODO: figureout and fix the behavior and uncomment the following - // - // cli_process.stdout_behavior = .Pipe; - // cli_process.stderr_behavior = .Pipe; - - // Start the process - cli_process.spawn() catch |err| { - std.debug.print("ERROR: Failed to spawn process: {}\n", .{err}); - allocator.destroy(cli_process); - return err; - }; - - std.debug.print("INFO: Process spawned successfully with PID\n", .{}); - - // Wait for server to be ready - const start_time = std.time.milliTimestamp(); - var server_ready = false; - var retry_count: u32 = 0; - - while (std.time.milliTimestamp() - start_time < constants.DEFAULT_SERVER_STARTUP_TIMEOUT_MS) { - retry_count += 1; - - // Print progress every 10 retries - if (retry_count % 10 == 0) { - const elapsed = @divTrunc(std.time.milliTimestamp() - start_time, 1000); - std.debug.print("INFO: Still waiting for server... 
({} seconds, {} retries)\n", .{ elapsed, retry_count }); - } - - // Try to connect to the metrics server - const address = net.Address.parseIp4(constants.DEFAULT_SERVER_IP, constants.DEFAULT_METRICS_PORT) catch { - std.time.sleep(constants.DEFAULT_RETRY_INTERVAL_MS * std.time.ns_per_ms); - continue; - }; - - var connection = net.tcpConnectToAddress(address) catch |err| { - // Only print error details on certain intervals to avoid spam - if (retry_count % 20 == 0) { - std.debug.print("DEBUG: Connection attempt {} failed: {}\n", .{ retry_count, err }); - } - std.time.sleep(constants.DEFAULT_RETRY_INTERVAL_MS * std.time.ns_per_ms); - continue; - }; - - // Test if we can actually send/receive data - connection.close(); - server_ready = true; - std.debug.print("SUCCESS: Server ready after {} seconds ({} retries)\n", .{ @divTrunc(std.time.milliTimestamp() - start_time, 1000), retry_count }); - break; - } - - // If server didn't start, try to get process output for debugging - if (!server_ready) { - std.debug.print("ERROR: Metrics server not ready after {} seconds ({} retries)\n", .{ @divTrunc(constants.DEFAULT_SERVER_STARTUP_TIMEOUT_MS, 1000), retry_count }); - - // Try to read any output from the process - if (cli_process.stdout) |stdout| { - var stdout_buffer: [4096]u8 = undefined; - const stdout_bytes = stdout.readAll(&stdout_buffer) catch 0; - if (stdout_bytes > 0) { - std.debug.print("STDOUT: {s}\n", .{stdout_buffer[0..stdout_bytes]}); - } - } - - if (cli_process.stderr) |stderr| { - var stderr_buffer: [4096]u8 = undefined; - const stderr_bytes = stderr.readAll(&stderr_buffer) catch 0; - if (stderr_bytes > 0) { - std.debug.print("STDERR: {s}\n", .{stderr_buffer[0..stderr_bytes]}); - } - } - - // Check if process is still running - if (cli_process.wait() catch null) |term| { - switch (term) { - .Exited => |code| std.debug.print("ERROR: Process exited with code {}\n", .{code}), - .Signal => |sig| std.debug.print("ERROR: Process killed by signal {}\n", .{sig}), - 
.Stopped => |sig| std.debug.print("ERROR: Process stopped by signal {}\n", .{sig}), - .Unknown => |code| std.debug.print("ERROR: Process terminated with unknown code {}\n", .{code}), - } - } else { - std.debug.print("INFO: Process is still running\n", .{}); - } - - // Server not ready, cleanup and return error - allocator.destroy(cli_process); - return error.ServerStartupTimeout; - } - - return cli_process; -} - -/// Wait for node to start and be ready for activity -/// TODO: Over time, this can be abstracted to listen for some event -/// that the node can output when being active, rather than using a fixed sleep -fn waitForNodeStart() void { - std.time.sleep(2000 * std.time.ns_per_ms); -} - -/// Helper struct for making HTTP requests to Zeam endpoints -const ZeamRequest = struct { - allocator: std.mem.Allocator, - - fn init(allocator: std.mem.Allocator) ZeamRequest { - return ZeamRequest{ .allocator = allocator }; - } - - /// Make a request to the /metrics endpoint and return the response - fn getMetrics(self: ZeamRequest) ![]u8 { - return self.makeRequest("/metrics"); - } - - /// Make a request to the /health endpoint and return the response - fn getHealth(self: ZeamRequest) ![]u8 { - return self.makeRequest("/health"); - } - - /// Internal helper to make HTTP requests to any endpoint - fn makeRequest(self: ZeamRequest, endpoint: []const u8) ![]u8 { - // Create connection to the server - const address = try net.Address.parseIp4(constants.DEFAULT_SERVER_IP, constants.DEFAULT_METRICS_PORT); - var connection = try net.tcpConnectToAddress(address); - defer connection.close(); - - // Create HTTP request - var request_buffer: [4096]u8 = undefined; - const request = try std.fmt.bufPrint(&request_buffer, "GET {s} HTTP/1.1\r\n" ++ - "Host: {s}:{d}\r\n" ++ - "Connection: close\r\n" ++ - "\r\n", .{ endpoint, constants.DEFAULT_SERVER_IP, constants.DEFAULT_METRICS_PORT }); - - try connection.writeAll(request); - - // Read response - var response_buffer: [8192]u8 = undefined; 
- const bytes_read = try connection.readAll(&response_buffer); - - // Allocate and return a copy of the response - const response = try self.allocator.dupe(u8, response_buffer[0..bytes_read]); - return response; - } - - /// Free a response returned by getMetrics() or getHealth() - fn freeResponse(self: ZeamRequest, response: []u8) void { - self.allocator.free(response); - } -}; - -/// Parsed SSE Event structure -const ChainEvent = struct { - event_type: []const u8, - justified_slot: ?u64, - finalized_slot: ?u64, - - /// Free the memory allocated for this event - fn deinit(self: ChainEvent, allocator: std.mem.Allocator) void { - allocator.free(self.event_type); - } -}; - -/// SSE Client for testing event streaming - FIXED VERSION -const SSEClient = struct { - allocator: std.mem.Allocator, - connection: std.net.Stream, - received_events: std.ArrayList([]u8), - // NEW: Add proper buffering for handling partial events and multiple events per read - read_buffer: std.ArrayList(u8), - parsed_events_queue: std.ArrayList(ChainEvent), - - fn init(allocator: std.mem.Allocator) !SSEClient { - const address = try net.Address.parseIp4(constants.DEFAULT_SERVER_IP, constants.DEFAULT_METRICS_PORT); - const connection = try net.tcpConnectToAddress(address); - - return SSEClient{ - .allocator = allocator, - .connection = connection, - .received_events = std.ArrayList([]u8).init(allocator), - .read_buffer = std.ArrayList(u8).init(allocator), - .parsed_events_queue = std.ArrayList(ChainEvent).init(allocator), - }; - } - - fn deinit(self: *SSEClient) void { - self.connection.close(); - for (self.received_events.items) |event| { - self.allocator.free(event); - } - self.received_events.deinit(); - self.read_buffer.deinit(); - - // Clean up parsed events queue - for (self.parsed_events_queue.items) |event| { - self.allocator.free(event.event_type); - } - self.parsed_events_queue.deinit(); - } - - fn connect(self: *SSEClient) !void { - // Send SSE request - const request = "GET /events 
HTTP/1.1\r\n" ++ - "Host: 127.0.0.1:9667\r\n" ++ - "Accept: text/event-stream\r\n" ++ - "Cache-Control: no-cache\r\n" ++ - "Connection: keep-alive\r\n" ++ - "\r\n"; - - try self.connection.writeAll(request); - } - - /// NEW: Parse all complete events from the current buffer - fn parseAllEventsFromBuffer(self: *SSEClient) !void { - var buffer_pos: usize = 0; - - while (buffer_pos < self.read_buffer.items.len) { - // Look for complete SSE event (ends with \n\n or \r\n\r\n) - const remaining_buffer = self.read_buffer.items[buffer_pos..]; - - const event_end_lf = std.mem.indexOf(u8, remaining_buffer, "\n\n"); - const event_end_crlf = std.mem.indexOf(u8, remaining_buffer, "\r\n\r\n"); - - var event_end: ?usize = null; - var separator_len: usize = 2; - - if (event_end_lf != null and event_end_crlf != null) { - // Both found, use the earlier one - if (event_end_lf.? < event_end_crlf.?) { - event_end = event_end_lf; - separator_len = 2; - } else { - event_end = event_end_crlf; - separator_len = 4; - } - } else if (event_end_lf != null) { - event_end = event_end_lf; - separator_len = 2; - } else if (event_end_crlf != null) { - event_end = event_end_crlf; - separator_len = 4; - } - - if (event_end == null) { - // No complete event found, break and wait for more data - break; - } - - // Extract the complete event block - const event_block = remaining_buffer[0..event_end.?]; - - // Parse this event and add to queue if valid - if (self.parseEventBlock(event_block)) |parsed_event| { - try self.parsed_events_queue.append(parsed_event); - - // Store raw event for debugging - const raw_event = try self.allocator.dupe(u8, event_block); - try self.received_events.append(raw_event); - } - - // Move past this event - buffer_pos += event_end.? 
+ separator_len; - } - - // Remove processed events from buffer - if (buffer_pos > 0) { - if (buffer_pos < self.read_buffer.items.len) { - std.mem.copyForwards(u8, self.read_buffer.items[0..], self.read_buffer.items[buffer_pos..]); - try self.read_buffer.resize(self.read_buffer.items.len - buffer_pos); - } else { - self.read_buffer.clearAndFree(); - } - } - } - - /// NEW: Parse a single event block and return parsed event - fn parseEventBlock(self: *SSEClient, event_block: []const u8) ?ChainEvent { - // Find event type line - const event_line_start = std.mem.indexOf(u8, event_block, "event:") orelse return null; - const data_line_start = std.mem.indexOf(u8, event_block, "data:") orelse return null; - - // Extract event type - const event_line_slice = blk: { - const nl = std.mem.indexOfScalarPos(u8, event_block, event_line_start, '\n') orelse event_block.len; - const cr = std.mem.indexOfScalarPos(u8, event_block, event_line_start, '\r') orelse nl; - const line_end = @min(nl, cr); - break :blk std.mem.trim(u8, event_block[event_line_start + "event:".len .. line_end], " \t"); - }; - - // Extract data payload - const data_line_slice = blk2: { - const nl = std.mem.indexOfScalarPos(u8, event_block, data_line_start, '\n') orelse event_block.len; - const cr = std.mem.indexOfScalarPos(u8, event_block, data_line_start, '\r') orelse nl; - const line_end = @min(nl, cr); - break :blk2 std.mem.trim(u8, event_block[data_line_start + "data:".len .. 
line_end], " \t"); - }; - - // Clone event type string so it persists - const event_type_owned = self.allocator.dupe(u8, event_line_slice) catch return null; - - // Parse JSON data - const parsed = std.json.parseFromSlice(std.json.Value, self.allocator, data_line_slice, .{ .ignore_unknown_fields = true }) catch return null; - defer parsed.deinit(); - - var justified_slot: ?u64 = null; - var finalized_slot: ?u64 = null; - - if (parsed.value.object.get("justified_slot")) |js| { - switch (js) { - .integer => |ival| justified_slot = @intCast(ival), - else => {}, - } - } - - if (parsed.value.object.get("finalized_slot")) |fs| { - switch (fs) { - .integer => |ival| finalized_slot = @intCast(ival), - else => {}, - } - } - - return ChainEvent{ - .event_type = event_type_owned, - .justified_slot = justified_slot, - .finalized_slot = finalized_slot, - }; - } - - /// FIXED: Main function that reads network data, buffers it, and returns one parsed event - /// This addresses the reviewer's concern by properly handling multiple events and buffering - fn readEvent(self: *SSEClient) !?ChainEvent { - // First, check if we have any parsed events in queue - if (self.parsed_events_queue.items.len > 0) { - return self.parsed_events_queue.orderedRemove(0); - } - - // Read new data from network - var temp_buffer: [4096]u8 = undefined; - const bytes_read = self.connection.read(&temp_buffer) catch |err| switch (err) { - error.WouldBlock => { - std.time.sleep(50 * std.time.ns_per_ms); - return null; // No data available - }, - else => return err, - }; - - if (bytes_read == 0) { - std.time.sleep(50 * std.time.ns_per_ms); - return null; // No data available - } - - // Append new data to our persistent buffer - try self.read_buffer.appendSlice(temp_buffer[0..bytes_read]); - - // Parse all complete events from the buffer - try self.parseAllEventsFromBuffer(); - - // Return first parsed event if available - if (self.parsed_events_queue.items.len > 0) { - return 
self.parsed_events_queue.orderedRemove(0); - } - - return null; // No complete events available yet - } - - fn hasEvent(self: *SSEClient, event_type: []const u8) bool { - for (self.received_events.items) |event_data| { - if (std.mem.indexOf(u8, event_data, event_type) != null) { - return true; - } - } - return false; - } - - fn getEventCount(self: *SSEClient, event_type: []const u8) usize { - var count: usize = 0; - for (self.received_events.items) |event_data| { - if (std.mem.indexOf(u8, event_data, event_type) != null) { - count += 1; - } - } - return count; - } -}; - -/// Clean up a process created by spinBeamSimNode -fn cleanupProcess(allocator: std.mem.Allocator, cli_process: *process.Child) void { - _ = cli_process.kill() catch {}; - _ = cli_process.wait() catch {}; - allocator.destroy(cli_process); -} - -test "CLI beam command with mock network - complete integration test" { - const allocator = std.testing.allocator; - - // Get executable path - const exe_path = try getZeamExecutable(); - - // Start node and wait for readiness - const cli_process = try spinBeamSimNode(allocator, exe_path); - defer cleanupProcess(allocator, cli_process); - - // Wait for node to be fully active - waitForNodeStart(); - - // Test metrics endpoint - var zeam_request = ZeamRequest.init(allocator); - const response = try zeam_request.getMetrics(); - defer zeam_request.freeResponse(response); - - // Verify we got a valid HTTP response - try std.testing.expect(std.mem.indexOf(u8, response, "HTTP/1.1 200") != null or std.mem.indexOf(u8, response, "HTTP/1.0 200") != null); - - // Verify response contains actual metric names from the metrics system - try std.testing.expect(std.mem.indexOf(u8, response, "chain_onblock_duration_seconds") != null or - std.mem.indexOf(u8, response, "block_processing_duration_seconds") != null); - - // Verify response is not empty - try std.testing.expect(response.len > 100); - - std.debug.print("SUCCESS: All integration test checks passed\n", .{}); -} - 
-test "SSE events integration test - wait for justification and finalization" { - const allocator = std.testing.allocator; - - // Get executable path - const exe_path = try getZeamExecutable(); - - // Start node and wait for readiness - const cli_process = try spinBeamSimNode(allocator, exe_path); - defer cleanupProcess(allocator, cli_process); - - // Wait for node to be fully active - waitForNodeStart(); - - // Create SSE client - var sse_client = try SSEClient.init(allocator); - defer sse_client.deinit(); - - // Connect to SSE endpoint - try sse_client.connect(); - - std.debug.print("INFO: Connected to SSE endpoint, waiting for events...\n", .{}); - - // Read events until both justification and finalization are seen, or timeout - const timeout_ms: u64 = 180000; // 180 seconds timeout - const start_ns = std.time.nanoTimestamp(); - const deadline_ns = start_ns + timeout_ms * std.time.ns_per_ms; - var got_justification = false; - var got_finalization = false; - - // FIXED: This loop now works correctly with the improved readEvent() function - while (std.time.nanoTimestamp() < deadline_ns and !(got_justification and got_finalization)) { - const event = try sse_client.readEvent(); - if (event) |e| { - // Check for justification with slot > 0 - if (!got_justification and std.mem.eql(u8, e.event_type, "new_justification")) { - if (e.justified_slot) |slot| { - if (slot > 0) { - got_justification = true; - std.debug.print("INFO: Found justification with slot {}\n", .{slot}); - } - } - } - - // Check for finalization with slot > 0 - if (!got_finalization and std.mem.eql(u8, e.event_type, "new_finalization")) { - if (e.finalized_slot) |slot| { - std.debug.print("DEBUG: Found finalization event with slot {}\n", .{slot}); - if (slot > 0) { - got_finalization = true; - std.debug.print("INFO: Found finalization with slot {}\n", .{slot}); - } - } else { - std.debug.print("DEBUG: Found finalization event with null slot\n", .{}); - } - } - - // IMPORTANT: Free the event memory 
after processing - e.deinit(allocator); - } - } - - // Check if we received connection event - try std.testing.expect(sse_client.hasEvent("connection")); - - // Check for chain events - const head_events = sse_client.getEventCount("new_head"); - const justification_events = sse_client.getEventCount("new_justification"); - const finalization_events = sse_client.getEventCount("new_finalization"); - - std.debug.print("INFO: Received events - Head: {}, Justification: {}, Finalization: {}\n", .{ head_events, justification_events, finalization_events }); - - // Require both justification and finalization (> 0) to have been observed - try std.testing.expect(got_justification); - try std.testing.expect(got_finalization); - - // Print some sample events for debugging - for (sse_client.received_events.items, 0..) |event_data, i| { - if (i < 5) { // Print first 5 events - std.debug.print("Event {}: {s}\n", .{ i, event_data }); - } - } - - std.debug.print("SUCCESS: SSE events integration test completed\n", .{}); +const cli = @import("@zeam/cli"); +const ErrorHandler = cli.error_handler.ErrorHandler; + +// Central integration tests file that imports all integration test modules. +// This file serves as the entry point for the "simtest" build step. 
+// +// Integration tests imported here: +// - beam_integration_test.zig: Beam command and SSE events integration tests +// - genesis_to_finalization_test.zig: Two-node genesis to finalization simulation test +// - lean_quickstart_integration_test.zig: Full lean-quickstart integration test (Option B) +// +// Run with: zig build simtest + +test { + // Import beam command integration tests (CLI with mock network, SSE events) + _ = @import("beam_integration_test.zig"); + + // Import genesis to finalization integration test (two nodes in-process) + _ = @import("genesis_to_finalization_test.zig"); + + // Import lean-quickstart full integration test (Option B implementation) + _ = @import("lean_quickstart_integration_test.zig"); } // Test suite for ErrorHandler diff --git a/pkgs/cli/test/lean_quickstart_integration_test.zig b/pkgs/cli/test/lean_quickstart_integration_test.zig new file mode 100644 index 000000000..2edffa27d --- /dev/null +++ b/pkgs/cli/test/lean_quickstart_integration_test.zig @@ -0,0 +1,494 @@ +const std = @import("std"); +const builtin = @import("builtin"); +const Allocator = std.mem.Allocator; +const process = std.process; +const net = std.net; +const build_options = @import("build_options"); +const beam_test = @import("beam_integration_test.zig"); +const SSEClient = beam_test.SSEClient; +const ChainEvent = beam_test.ChainEvent; + +const TestConfig = struct { + genesis_time: u64, + num_validators: u32, + network_dir: []const u8, + timeout_seconds: u64 = 600, +}; + +const FinalizationResult = struct { + finalized: bool, + finalization_slot: u64, + finalization_root: [32]u8, + timeout_reached: bool = false, +}; + +/// Generate the network directory structure and validator-config.yaml for lean-quickstart +fn generateLeanQuickstartConfig(allocator: Allocator, config: TestConfig) !void { + const cwd = std.fs.cwd(); + + // Create network directory structure + cwd.makeDir(config.network_dir) catch |err| switch (err) { + error.PathAlreadyExists => {}, + else 
=> return err, + }; + + const genesis_dir = try std.fmt.allocPrint(allocator, "{s}/genesis", .{config.network_dir}); + defer allocator.free(genesis_dir); + + cwd.makeDir(genesis_dir) catch |err| switch (err) { + error.PathAlreadyExists => {}, + else => return err, + }; + + const data_dir = try std.fmt.allocPrint(allocator, "{s}/data", .{config.network_dir}); + defer allocator.free(data_dir); + + cwd.makeDir(data_dir) catch |err| switch (err) { + error.PathAlreadyExists => {}, + else => return err, + }; + + // Generate config.yaml (genesis time will be updated by generate-genesis.sh) + const config_yaml = try std.fmt.allocPrint(allocator, + \\# Genesis Settings + \\GENESIS_TIME: {d} + \\# Validator Settings + \\VALIDATOR_COUNT: {d} + \\ + , .{ config.genesis_time, config.num_validators }); + defer allocator.free(config_yaml); + + const config_path = try std.fmt.allocPrint(allocator, "{s}/config.yaml", .{genesis_dir}); + defer allocator.free(config_path); + try cwd.writeFile(.{ .sub_path = config_path, .data = config_yaml }); + + // Generate validator-config.yaml with TWO zeam nodes + // CRITICAL: Must include metricsPort field for each validator! 
+ const validator_config_yaml = + \\shuffle: roundrobin + \\config: + \\ activeEpoch: 18 + \\ keyType: "hash-sig" + \\validators: + \\ - name: "zeam_0" + \\ privkey: "a000000000000000000000000000000000000000000000000000000000000001" + \\ enrFields: + \\ ip: "127.0.0.1" + \\ quic: 9100 + \\ metricsPort: 9669 + \\ count: 1 + \\ - name: "zeam_1" + \\ privkey: "b000000000000000000000000000000000000000000000000000000000000002" + \\ enrFields: + \\ ip: "127.0.0.1" + \\ quic: 9101 + \\ metricsPort: 9670 + \\ count: 1 + \\ + ; + + const validator_config_path = try std.fmt.allocPrint(allocator, "{s}/validator-config.yaml", .{genesis_dir}); + defer allocator.free(validator_config_path); + try cwd.writeFile(.{ .sub_path = validator_config_path, .data = validator_config_yaml }); + + std.debug.print("✅ Generated lean-quickstart configuration structure\n", .{}); + std.debug.print(" Network dir: {s}\n", .{config.network_dir}); + std.debug.print(" Genesis dir: {s}\n", .{genesis_dir}); +} + +/// Run lean-quickstart's generate-genesis.sh script +fn runGenesisGenerator(allocator: Allocator, network_dir: []const u8) !void { + std.debug.print("🔧 Running lean-quickstart genesis generator...\n", .{}); + + const cwd_path = try std.fs.cwd().realpathAlloc(allocator, "."); + defer allocator.free(cwd_path); + + const generate_script = try std.fmt.allocPrint(allocator, "{s}/lean-quickstart/generate-genesis.sh", .{cwd_path}); + defer allocator.free(generate_script); + + const genesis_dir = try std.fmt.allocPrint(allocator, "{s}/genesis", .{network_dir}); + defer allocator.free(genesis_dir); + + const genesis_dir_abs = try std.fs.cwd().realpathAlloc(allocator, genesis_dir); + defer allocator.free(genesis_dir_abs); + + const args = &[_][]const u8{ + "/bin/bash", + generate_script, + genesis_dir_abs, + }; + + std.debug.print(" Command: {s} {s} {s}\n", .{ args[0], args[1], args[2] }); + + const result = try std.process.Child.run(.{ + .allocator = allocator, + .argv = args, + .max_output_bytes = 
1024 * 1024, // 1MB + }); + defer allocator.free(result.stdout); + defer allocator.free(result.stderr); + + if (result.term != .Exited or result.term.Exited != 0) { + std.debug.print("❌ Genesis generation failed!\n", .{}); + std.debug.print("STDOUT:\n{s}\n", .{result.stdout}); + std.debug.print("STDERR:\n{s}\n", .{result.stderr}); + return error.GenesisGenerationFailed; + } + + std.debug.print("✅ Genesis generation completed successfully\n", .{}); + std.debug.print("{s}\n", .{result.stdout}); +} + +const NodeProcess = struct { + child: *process.Child, + env_map: *process.EnvMap, + allocator: Allocator, + + /// Cleanup the node process using graceful SIGTERM + /// + /// CLEANUP FLOW: + /// 1. Send SIGTERM to spin-node.sh process + /// 2. The script's trap (line 169) catches SIGTERM + /// 3. The cleanup() function (lines 149-167) executes + /// 4. cleanup() kills the zeam child processes with kill -9 + /// 5. The script exits cleanly + /// + /// This mimics the user pressing Ctrl+C in an interactive session. 
+ fn deinit(self: *NodeProcess) void { + std.debug.print("🧹 Initiating graceful shutdown via SIGTERM...\n", .{}); + + // Send SIGTERM to trigger the cleanup trap in spin-node.sh + // This mimics pressing Ctrl+C in an interactive session + const pid = self.child.id; + + const sigterm_result = std.posix.kill(pid, std.posix.SIG.TERM) catch |err| { + std.debug.print("⚠️ Failed to send SIGTERM to PID {d}: {}\n", .{ pid, err }); + // Continue to force kill + _ = self.child.kill() catch {}; + _ = self.child.wait() catch {}; + self.env_map.deinit(); + self.allocator.destroy(self.env_map); + self.allocator.destroy(self.child); + return; + }; + _ = sigterm_result; + + std.debug.print("✅ SIGTERM sent to PID {d}, waiting for cleanup...\n", .{pid}); + + // Give the script time to run its cleanup() function + // The trap will catch SIGTERM and execute cleanup which kills child processes + std.time.sleep(3 * std.time.ns_per_s); + + // Try to wait for the process to exit gracefully + const wait_result = self.child.wait() catch |err| { + std.debug.print("⚠️ Process didn't exit cleanly: {}, force killing...\n", .{err}); + _ = self.child.kill() catch {}; + self.env_map.deinit(); + self.allocator.destroy(self.env_map); + self.allocator.destroy(self.child); + return; + }; + + std.debug.print("✅ Process exited: {}\n", .{wait_result}); + + self.env_map.deinit(); + self.allocator.destroy(self.env_map); + self.allocator.destroy(self.child); + } +}; + +/// Spawn a single node using lean-quickstart spin-node.sh +/// +/// IMPORTANT BEHAVIOR: +/// - Sets working directory to lean-quickstart/ so relative paths work (source parse-vc.sh, etc.) 
+/// - spin-node.sh spawns the zeam node as a background process (line 141: eval "$execCmd" &) +/// - Then it waits indefinitely (line 173: wait -n $process_ids) +/// - The wait -n fails on macOS (bash 3.2 doesn't support -n flag) but this is OK +/// - The zeam node IS already running in the background when wait fails +/// - We will send SIGTERM during cleanup to trigger the script's cleanup trap +/// +/// This function returns immediately with the script's process handle. +/// The script will error at wait, but the node is already running. +fn spawnLeanQuickstartCluster( + allocator: Allocator, + network_dir: []const u8, +) !*NodeProcess { + std.debug.print("🚀 Spawning lean-quickstart spin-node.sh for zeam nodes\n", .{}); + + // Get absolute path to lean-quickstart + const cwd_path = try std.fs.cwd().realpathAlloc(allocator, "."); + defer allocator.free(cwd_path); + + const spin_script = try std.fmt.allocPrint(allocator, "{s}/lean-quickstart/spin-node.sh", .{cwd_path}); + defer allocator.free(spin_script); + + // IMPORTANT: lean-quickstart scripts expect NETWORK_DIR to be RELATIVE to the script directory + // Since spin-node.sh is in lean-quickstart/, and our network_dir is in zeam/, we use ../network_dir + const relative_network_dir = try std.fmt.allocPrint(allocator, "../{s}", .{network_dir}); + defer allocator.free(relative_network_dir); + + // Build command: NETWORK_DIR=... 
bash spin-node.sh --node zeam_0,zeam_1 --validatorConfig genesis_bootnode + const args = &[_][]const u8{ + "/bin/bash", + spin_script, + "--node", + "zeam_0,zeam_1", + "--validatorConfig", + "genesis_bootnode", + }; + + std.debug.print(" Command: {s} {s} --node zeam_0,zeam_1\n", .{ args[0], args[1] }); + std.debug.print(" NETWORK_DIR: {s} (relative to script)\n", .{relative_network_dir}); + + const node_child = try allocator.create(process.Child); + node_child.* = process.Child.init(args, allocator); + + // CRITICAL: Set working directory to lean-quickstart/ + // This makes relative paths in spin-node.sh work correctly: + // - source parse-vc.sh → finds it in current directory + // - source client-cmds/zeam-cmd.sh → finds it relative to current directory + const lean_quickstart_dir = try std.fmt.allocPrint(allocator, "{s}/lean-quickstart", .{cwd_path}); + defer allocator.free(lean_quickstart_dir); + + node_child.cwd = lean_quickstart_dir; + + // Set NETWORK_DIR environment variable + // IMPORTANT: env_map must persist for the lifetime of the process + const env_map = try allocator.create(process.EnvMap); + env_map.* = try process.getEnvMap(allocator); + + // Use relative path that the scripts expect + try env_map.put("NETWORK_DIR", relative_network_dir); + node_child.env_map = env_map; + + // Capture output for debugging + node_child.stdout_behavior = .Ignore; + node_child.stderr_behavior = .Inherit; + + // Spawn the process + node_child.spawn() catch |err| { + std.debug.print("❌ ERROR: Failed to spawn node process: {}\n", .{err}); + env_map.deinit(); + allocator.destroy(env_map); + allocator.destroy(node_child); + return err; + }; + + std.debug.print("✅ spin-node.sh spawned (PID: {d})\n", .{node_child.id}); + + const node_process = try allocator.create(NodeProcess); + node_process.* = NodeProcess{ + .child = node_child, + .env_map = env_map, + .allocator = allocator, + }; + + return node_process; +} + +/// Wait for node to be ready by polling its metrics port +fn 
waitForNodeStartup(metrics_port: u16, timeout_seconds: u64) !void { + std.debug.print("⏳ Waiting for node on port {d} to start (timeout: {d}s)...\n", .{ metrics_port, timeout_seconds }); + + const start_time = std.time.milliTimestamp(); + const timeout_ms = timeout_seconds * 1000; + var attempt: usize = 0; + + while (std.time.milliTimestamp() - start_time < timeout_ms) { + attempt += 1; + + if (attempt % 10 == 0) { + const elapsed = @divTrunc(std.time.milliTimestamp() - start_time, 1000); + std.debug.print("⏱️ Still waiting for port {d}... ({d}s elapsed, attempt {d})\n", .{ metrics_port, elapsed, attempt }); + } + + const address = net.Address.parseIp4("127.0.0.1", metrics_port) catch { + std.time.sleep(1000 * std.time.ns_per_ms); + continue; + }; + + var connection = net.tcpConnectToAddress(address) catch { + std.time.sleep(1000 * std.time.ns_per_ms); + continue; + }; + connection.close(); + + std.debug.print("✅ Node on port {d} is ready\n", .{metrics_port}); + return; + } + + std.debug.print("❌ Timeout waiting for node on port {d}\n", .{metrics_port}); + return error.NodeStartupTimeout; +} + +/// Monitor SSE events for finalization +fn monitorForFinalization(allocator: Allocator, metrics_port: u16, timeout_seconds: u64) !FinalizationResult { + std.debug.print("📡 Creating SSE client for port {d}...\n", .{metrics_port}); + + var sse_client = try SSEClient.init(allocator, metrics_port); + defer sse_client.deinit(); + + try sse_client.connect(); + std.debug.print("✅ Connected to SSE endpoint, waiting for finalization events...\n", .{}); + + const deadline_ns = std.time.nanoTimestamp() + (@as(i64, @intCast(timeout_seconds)) * std.time.ns_per_s); + var event_count: usize = 0; + var null_count: usize = 0; + var last_progress_time = std.time.nanoTimestamp(); + + while (std.time.nanoTimestamp() < deadline_ns) { + const event_result = sse_client.readEvent() catch |err| { + std.debug.print("❌ Error reading SSE event: {}\n", .{err}); + return error.SSEReadError; + }; + + if 
(event_result == null) { + null_count += 1; + + if (null_count % 20 == 0) { + const now = std.time.nanoTimestamp(); + if (now - last_progress_time > 5 * std.time.ns_per_s) { + const elapsed = @divTrunc(now - (deadline_ns - @as(i64, @intCast(timeout_seconds)) * std.time.ns_per_s), std.time.ns_per_s); + const remaining = @divTrunc(deadline_ns - now, std.time.ns_per_s); + std.debug.print("⏱️ Still waiting for events... ({d} events received, {d}s elapsed, {d}s remaining)\n", .{ event_count, elapsed, remaining }); + last_progress_time = now; + } + } + + continue; + } + + const e = event_result.?; + event_count += 1; + std.debug.print("📨 Event #{d}: {s}\n", .{ event_count, e.event_type }); + + if (std.mem.eql(u8, e.event_type, "new_finalization")) { + if (e.finalized_slot) |slot| { + std.debug.print("🔍 Found finalization event with slot {d}\n", .{slot}); + if (slot > 0) { + std.debug.print("🎉 Finalization detected at slot {d}!\n", .{slot}); + e.deinit(allocator); + return FinalizationResult{ + .finalized = true, + .finalization_slot = slot, + .finalization_root = [_]u8{0} ** 32, + .timeout_reached = false, + }; + } + } + } + + e.deinit(allocator); + } + + std.debug.print("❌ Timeout reached after {d} seconds\n", .{timeout_seconds}); + std.debug.print("📊 Total events received: {d}\n", .{event_count}); + return FinalizationResult{ + .finalized = false, + .finalization_slot = 0, + .finalization_root = [_]u8{0} ** 32, + .timeout_reached = true, + }; +} + +/// Main test function using lean-quickstart +fn runTwoNodesViaLeanQuickstart(allocator: Allocator, config: TestConfig) !FinalizationResult { + std.debug.print("\n", .{}); + std.debug.print("═══════════════════════════════════════════════════════════\n", .{}); + std.debug.print("🚀 STARTING LEAN-QUICKSTART TWO-NODE INTEGRATION TEST\n", .{}); + std.debug.print("═══════════════════════════════════════════════════════════\n", .{}); + std.debug.print("Test config: genesis_time={d}, num_validators={d}, timeout={d}s\n", .{ 
config.genesis_time, config.num_validators, config.timeout_seconds }); + std.debug.print("\n", .{}); + + // Hardcoded metrics ports from validator-config.yaml + const node_0_port: u16 = 9669; + const node_1_port: u16 = 9670; + + std.debug.print("▶️ STEP 1: Generating genesis via lean-quickstart\n", .{}); + try runGenesisGenerator(allocator, config.network_dir); + + std.debug.print("\n▶️ STEP 2: Spawning lean-quickstart spin-node.sh (--node zeam_0,zeam_1)\n", .{}); + const cluster_process = try spawnLeanQuickstartCluster(allocator, config.network_dir); + defer { + std.debug.print("\n🧹 Cleaning up lean-quickstart process...\n", .{}); + cluster_process.deinit(); + allocator.destroy(cluster_process); + } + + std.debug.print("\n✅ spin-node.sh is running, waiting for nodes to come online\n", .{}); + + std.debug.print("\n▶️ STEP 3: Waiting for nodes to start (60s timeout each)...\n", .{}); + try waitForNodeStartup(node_0_port, 60); + try waitForNodeStartup(node_1_port, 60); + + std.debug.print("\n✅ Both nodes are ready!\n", .{}); + + std.debug.print("\n▶️ STEP 4: Monitoring for finalization via SSE (timeout: {d}s)...\n", .{config.timeout_seconds}); + const result = try monitorForFinalization(allocator, node_0_port, config.timeout_seconds); + + std.debug.print("\n", .{}); + std.debug.print("═══════════════════════════════════════════════════════════\n", .{}); + std.debug.print("🏁 TEST COMPLETE - Finalized: {}\n", .{result.finalized}); + std.debug.print("═══════════════════════════════════════════════════════════\n", .{}); + + return result; +} + +fn cleanupNetworkDirectory(allocator: Allocator, network_dir: []const u8) !void { + _ = allocator; + const cwd = std.fs.cwd(); + cwd.deleteTree(network_dir) catch { + // Ignore cleanup errors (directory may not exist) + }; +} + +test "lean_quickstart_two_node_finalization_integration" { + if (builtin.os.tag == .macos) { + std.debug.print("⏭️ Skipping lean-quickstart integration test on macOS (Docker host networking 
unsupported)\n", .{}); + return error.SkipZigTest; + } + + var arena_allocator = std.heap.ArenaAllocator.init(std.testing.allocator); + defer arena_allocator.deinit(); + const allocator = arena_allocator.allocator(); + + // Set genesis time in the future to allow nodes to sync + const genesis_time = @as(u64, @intCast(std.time.timestamp())) + 30; + + const config = TestConfig{ + .genesis_time = genesis_time, + .num_validators = 2, + .network_dir = "test_lean_quickstart_network", + .timeout_seconds = 600, + }; + + std.debug.print("🚀 Starting Lean-Quickstart Integration Test\n", .{}); + std.debug.print("📁 Network directory: {s}\n", .{config.network_dir}); + std.debug.print("⏰ Genesis time: {d} (in ~30 seconds)\n", .{config.genesis_time}); + std.debug.print("⏰ Current time: {d}\n", .{std.time.timestamp()}); + std.debug.print("👥 Number of validators: {d}\n", .{config.num_validators}); + std.debug.print("⏱️ Timeout: {d} seconds\n", .{config.timeout_seconds}); + + // Generate lean-quickstart configuration + try generateLeanQuickstartConfig(allocator, config); + + // Run the test + const result = try runTwoNodesViaLeanQuickstart(allocator, config); + + // Cleanup + try cleanupNetworkDirectory(allocator, config.network_dir); + + // Verify results + if (result.timeout_reached) { + std.debug.print("❌ Test failed: Timeout reached after {d} seconds\n", .{config.timeout_seconds}); + return error.TestTimeout; + } + + if (!result.finalized) { + std.debug.print("❌ Test failed: No finalization detected\n", .{}); + return error.NoFinalization; + } + + std.debug.print("✅ Test passed: Finalization detected at slot {d}\n", .{result.finalization_slot}); + std.debug.print("🎉 Lean-Quickstart Integration Test completed successfully!\n", .{}); +} diff --git a/pkgs/configs/src/lib.zig b/pkgs/configs/src/lib.zig index 398a4f8e2..3d6757704 100644 --- a/pkgs/configs/src/lib.zig +++ b/pkgs/configs/src/lib.zig @@ -50,42 +50,55 @@ const ChainConfigError = error{ InvalidChainSpec, }; -pub fn 
genesisConfigFromYAML(config: Yaml, override_genesis_time: ?u64) !types.GenesisSpec { - _ = config; - _ = override_genesis_time; - // TODO: Implement YAML parsing for validator pubkeys - // This function needs to: - // 1. Parse VALIDATOR_COUNT or VALIDATOR_PUBKEYS from config.yaml - // 2. If VALIDATOR_COUNT: create keymanager and extract pubkeys - // 3. If VALIDATOR_PUBKEYS: read them directly from YAML - // 4. Return GenesisSpec with genesis_time and validator_pubkeys populated - // Until this is implemented, use beam command for testing or provide genesis_spec programmatically - return error.NotImplemented; +/// Intermediate structure for genesis config before keys are generated +pub const GenesisConfig = struct { + genesis_time: u64, + validator_count: usize, +}; + +/// Parse genesis configuration from YAML (without generating keys) +/// Returns genesis time and validator count. The caller is responsible for +/// generating validator keys and creating the full GenesisSpec. +pub fn genesisConfigFromYAML(config: Yaml, override_genesis_time: ?u64) !GenesisConfig { + // Parse GENESIS_TIME from YAML + const genesis_time_value = config.docs.items[0].map.get("GENESIS_TIME") orelse return error.MissingGenesisTime; + if (genesis_time_value != .int) return error.InvalidGenesisTime; + const genesis_time = if (override_genesis_time) |override| override else @as(u64, @intCast(genesis_time_value.int)); + + // Parse VALIDATOR_COUNT from YAML + const validator_count_value = config.docs.items[0].map.get("VALIDATOR_COUNT") orelse return error.MissingValidatorCount; + if (validator_count_value != .int) return error.InvalidValidatorCount; + const validator_count: usize = @intCast(validator_count_value.int); + + if (validator_count == 0) return error.InvalidValidatorCount; + + return GenesisConfig{ + .genesis_time = genesis_time, + .validator_count = validator_count, + }; } -// TODO: Enable and update the this test once the YAML parsing for public keys PR is added -// test "load 
genesis config from yaml" { -// const yaml_content = -// \\# Genesis Settings -// \\GENESIS_TIME: 1704085200 -// \\ -// \\# Validator Settings -// \\VALIDATOR_COUNT: 9 -// ; -// -// var yaml: Yaml = .{ .source = yaml_content }; -// defer yaml.deinit(std.testing.allocator); -// try yaml.load(std.testing.allocator); -// -// const genesis_config = try genesisConfigFromYAML(yaml, null); -// -// try std.testing.expect(genesis_config.genesis_time == 1704085200); -// try std.testing.expect(genesis_config.num_validators() == 9); -// -// const genesis_config_override = try genesisConfigFromYAML(yaml, 1234); -// try std.testing.expect(genesis_config_override.genesis_time == 1234); -// try std.testing.expect(genesis_config_override.num_validators() == 9); -// } +test "load genesis config from yaml" { + const yaml_content = + \\# Genesis Settings + \\GENESIS_TIME: 1704085200 + \\ + \\# Validator Settings + \\VALIDATOR_COUNT: 9 + ; + + var yaml: Yaml = .{ .source = yaml_content }; + defer yaml.deinit(std.testing.allocator); + try yaml.load(std.testing.allocator); + + const genesis_config = try genesisConfigFromYAML(yaml, null); + try std.testing.expectEqual(@as(u64, 1704085200), genesis_config.genesis_time); + try std.testing.expectEqual(@as(usize, 9), genesis_config.validator_count); + + const genesis_config_override = try genesisConfigFromYAML(yaml, 1234); + try std.testing.expectEqual(@as(u64, 1234), genesis_config_override.genesis_time); + try std.testing.expectEqual(@as(usize, 9), genesis_config_override.validator_count); +} // TODO: Enable and update this test once the keymanager file-reading PR is added (followup PR) // JSON parsing for genesis config needs to support validator_pubkeys instead of num_validators