diff --git a/Cargo.lock b/Cargo.lock
index 83f908bb..6e2d095c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1728,6 +1728,7 @@ dependencies = [
  "cb-signer",
  "eyre",
  "reqwest 0.12.23",
+ "serde",
  "serde_json",
  "tempfile",
  "tokio",
diff --git a/config.example.toml b/config.example.toml
index f15b2262..edcddbf8 100644
--- a/config.example.toml
+++ b/config.example.toml
@@ -55,9 +55,13 @@ extra_validation_enabled = false
 # Execution Layer RPC url to use for extra validation
 # OPTIONAL
 # rpc_url = "https://ethereum-holesky-rpc.publicnode.com"
-# URL of the SSV API server to use, if you have a mux that targets an SSV node operator
-# OPTIONAL, DEFAULT: "https://api.ssv.network/api/v4"
-# ssv_api_url = "https://api.ssv.network/api/v4"
+# URL of your local SSV node API endpoint, if you have a mux that targets an SSV node operator
+# OPTIONAL, DEFAULT: "http://localhost:16000/v1/"
+# ssv_node_api_url = "http://localhost:16000/v1/"
+# URL of the public SSV API server, if you have a mux that targets an SSV node operator. This is used as
+# a fallback if your local SSV node is not reachable.
+# OPTIONAL, DEFAULT: "https://api.ssv.network/api/v4/"
+# ssv_public_api_url = "https://api.ssv.network/api/v4/"
 # Timeout for any HTTP requests sent from the PBS module to other services, in seconds
 # OPTIONAL, DEFAULT: 10
 http_timeout_seconds = 10
diff --git a/crates/common/src/config/mux.rs b/crates/common/src/config/mux.rs
index 27950d1c..d67f8487 100644
--- a/crates/common/src/config/mux.rs
+++ b/crates/common/src/config/mux.rs
@@ -62,7 +62,8 @@ impl PbsMuxes {
                 .load(
                     &mux.id,
                     chain,
-                    default_pbs.ssv_api_url.clone(),
+                    default_pbs.ssv_node_api_url.clone(),
+                    default_pbs.ssv_public_api_url.clone(),
                     default_pbs.rpc_url.clone(),
                     http_timeout,
                 )
@@ -212,7 +213,8 @@ impl MuxKeysLoader {
         &self,
         mux_id: &str,
         chain: Chain,
-        ssv_api_url: Url,
+        ssv_node_api_url: Url,
+        ssv_public_api_url: Url,
         rpc_url: Option<Url>,
         http_timeout: Duration,
     ) -> eyre::Result<Vec<BlsPublicKey>> {
@@ -258,7 +260,8 @@ impl MuxKeysLoader {
             }
             NORegistry::SSV => {
                 fetch_ssv_pubkeys(
-                    ssv_api_url,
+                    ssv_node_api_url,
+                    ssv_public_api_url,
                     chain,
                     U256::from(*node_operator_id),
                     http_timeout,
@@ -391,11 +394,62 @@ async fn fetch_lido_registry_keys(
 }
 
 async fn fetch_ssv_pubkeys(
-    mut api_url: Url,
+    node_url: Url,
+    public_url: Url,
     chain: Chain,
     node_operator_id: U256,
     http_timeout: Duration,
 ) -> eyre::Result<Vec<BlsPublicKey>> {
+    // Try the node API first
+    match fetch_ssv_pubkeys_from_ssv_node(node_url.clone(), node_operator_id, http_timeout).await {
+        Ok(pubkeys) => Ok(pubkeys),
+        Err(e) => {
+            // Fall back to public API
+            warn!(
+                "failed to fetch pubkeys from SSV node API at {node_url}: {e}; falling back to public API",
+            );
+            fetch_ssv_pubkeys_from_public_api(public_url, chain, node_operator_id, http_timeout)
+                .await
+        }
+    }
+}
+
+/// Ensures that the SSV API URL has a trailing slash
+fn ensure_ssv_api_url(url: &mut Url) -> eyre::Result<()> {
+    // Validate the URL - this appends a trailing slash if missing as efficiently as
+    // possible
+    if !url.path().ends_with('/') {
+        match url.path_segments_mut() {
+            Ok(mut segments) => segments.push(""), // Analogous to a trailing slash
+            Err(_) => bail!("SSV API URL is not a valid base URL"),
+        };
+    }
+    Ok(())
+}
+
+/// Fetches SSV pubkeys from the user's SSV node
+async fn fetch_ssv_pubkeys_from_ssv_node(
+    mut url: Url,
+    node_operator_id: U256,
+    http_timeout: Duration,
+) -> eyre::Result<Vec<BlsPublicKey>> {
+    ensure_ssv_api_url(&mut url)?;
+    let route = "validators";
+    let url = url.join(route).wrap_err("failed to construct SSV API URL")?;
+
+    let response = request_ssv_pubkeys_from_ssv_node(url, node_operator_id, http_timeout).await?;
+    let pubkeys = response.data.into_iter().map(|v| v.public_key).collect::<Vec<_>>();
+    Ok(pubkeys)
+}
+
+/// Fetches SSV pubkeys from the public SSV network API with pagination
+async fn fetch_ssv_pubkeys_from_public_api(
+    mut url: Url,
+    chain: Chain,
+    node_operator_id: U256,
+    http_timeout: Duration,
+) -> eyre::Result<Vec<BlsPublicKey>> {
+    ensure_ssv_api_url(&mut url)?;
     const MAX_PER_PAGE: usize = 100;
 
     let chain_name = match chain {
@@ -408,22 +462,13 @@ async fn fetch_ssv_pubkeys(
     let mut pubkeys: Vec<BlsPublicKey> = vec![];
     let mut page = 1;
 
-    // Validate the URL - this appends a trailing slash if missing as efficiently as
-    // possible
-    if !api_url.path().ends_with('/') {
-        match api_url.path_segments_mut() {
-            Ok(mut segments) => segments.push(""), // Analogous to a trailing slash
-            Err(_) => bail!("SSV API URL is not a valid base URL"),
-        };
-    }
-
     loop {
         let route = format!(
             "{chain_name}/validators/in_operator/{node_operator_id}?perPage={MAX_PER_PAGE}&page={page}",
         );
-        let url = api_url.join(&route).wrap_err("failed to construct SSV API URL")?;
+        let url = url.join(&route).wrap_err("failed to construct SSV API URL")?;
 
-        let response = fetch_ssv_pubkeys_from_url(url, http_timeout).await?;
+        let response = request_ssv_pubkeys_from_public_api(url, http_timeout).await?;
         let fetched = response.validators.len();
         pubkeys.extend(
             response.validators.into_iter().map(|v| v.pubkey).collect::<Vec<_>>(),
diff --git a/crates/common/src/config/pbs.rs b/crates/common/src/config/pbs.rs
index 7bcf91e3..286b2e4a 100644
--- a/crates/common/src/config/pbs.rs
+++ b/crates/common/src/config/pbs.rs
@@ -123,9 +123,12 @@ pub struct PbsConfig {
     pub extra_validation_enabled: bool,
     /// Execution Layer RPC url to use for extra validation
     pub rpc_url: Option<Url>,
-    /// URL for the SSV network API
-    #[serde(default = "default_ssv_api_url")]
-    pub ssv_api_url: Url,
+    /// URL for the user's own SSV node API endpoint
+    #[serde(default = "default_ssv_node_api_url")]
+    pub ssv_node_api_url: Url,
+    /// URL for the public SSV network API server
+    #[serde(default = "default_public_ssv_api_url")]
+    pub ssv_public_api_url: Url,
     /// Timeout for HTTP requests in seconds
     #[serde(default = "default_u64::<HTTP_TIMEOUT_SECONDS_DEFAULT>")]
     pub http_timeout_seconds: u64,
@@ -402,7 +405,12 @@ pub async fn load_pbs_custom_config() -> Result<(PbsModuleC
     ))
 }
 
-/// Default URL for the SSV network API
-fn default_ssv_api_url() -> Url {
+/// Default URL for the user's SSV node API endpoint (/v1/validators).
+fn default_ssv_node_api_url() -> Url {
+    Url::parse("http://localhost:16000/v1/").expect("default URL is valid")
+}
+
+/// Default URL for the public SSV network API.
+fn default_public_ssv_api_url() -> Url {
     Url::parse("https://api.ssv.network/api/v4/").expect("default URL is valid")
 }
diff --git a/crates/common/src/interop/ssv/types.rs b/crates/common/src/interop/ssv/types.rs
index b8ac2e23..0a133393 100644
--- a/crates/common/src/interop/ssv/types.rs
+++ b/crates/common/src/interop/ssv/types.rs
@@ -2,11 +2,60 @@ use serde::{Deserialize, Deserializer, Serialize};
 
 use crate::types::BlsPublicKey;
 
-/// Response from the SSV API for validators
+/// Response from the SSV API for validators (the new way, relies on using SSV
+/// node API)
 #[derive(Deserialize, Serialize)]
-pub struct SSVResponse {
+pub struct SSVNodeResponse {
     /// List of validators returned by the SSV API
-    pub validators: Vec<SSVValidator>,
+    pub data: Vec<SSVNodeValidator>,
+}
+
+/// Representation of a validator in the SSV API
+#[derive(Clone)]
+pub struct SSVNodeValidator {
+    /// The public key of the validator
+    pub public_key: BlsPublicKey,
+}
+
+impl<'de> Deserialize<'de> for SSVNodeValidator {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        #[derive(Deserialize)]
+        struct SSVValidator {
+            public_key: String,
+        }
+
+        let s = SSVValidator::deserialize(deserializer)?;
+        let bytes = alloy::hex::decode(&s.public_key).map_err(serde::de::Error::custom)?;
+        let pubkey = BlsPublicKey::deserialize(&bytes)
+            .map_err(|e| serde::de::Error::custom(format!("invalid BLS public key: {e:?}")))?;
+
+        Ok(Self { public_key: pubkey })
+    }
+}
+
+impl Serialize for SSVNodeValidator {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        #[derive(Serialize)]
+        struct SSVValidator {
+            public_key: String,
+        }
+
+        let s = SSVValidator { public_key: self.public_key.as_hex_string() };
+        s.serialize(serializer)
+    }
+}
+
+/// Response from the SSV API for validators from the public api.ssv.network URL
+#[derive(Deserialize, Serialize)]
+pub struct SSVPublicResponse {
+    /// List of validators returned by the SSV API
+    pub validators: Vec<SSVPublicValidator>,
 
     /// Pagination information
     pub pagination: SSVPagination,
@@ -14,12 +63,12 @@ pub struct SSVResponse {
 
 /// Representation of a validator in the SSV API
 #[derive(Clone)]
-pub struct SSVValidator {
+pub struct SSVPublicValidator {
     /// The public key of the validator
     pub pubkey: BlsPublicKey,
 }
 
-impl<'de> Deserialize<'de> for SSVValidator {
+impl<'de> Deserialize<'de> for SSVPublicValidator {
     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     where
         D: Deserializer<'de>,
@@ -38,7 +87,7 @@ impl<'de> Deserialize<'de> for SSVValidator {
     }
 }
 
-impl Serialize for SSVValidator {
+impl Serialize for SSVPublicValidator {
     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     where
         S: serde::Serializer,
diff --git a/crates/common/src/interop/ssv/utils.rs b/crates/common/src/interop/ssv/utils.rs
index e443e018..4a262d69 100644
--- a/crates/common/src/interop/ssv/utils.rs
+++ b/crates/common/src/interop/ssv/utils.rs
@@ -1,24 +1,52 @@
 use std::time::Duration;
 
+use alloy::primitives::U256;
 use eyre::Context;
+use serde_json::json;
 use url::Url;
 
-use crate::{config::safe_read_http_response, interop::ssv::types::SSVResponse};
+use crate::{
+    config::safe_read_http_response,
+    interop::ssv::types::{SSVNodeResponse, SSVPublicResponse},
+};
 
-pub async fn fetch_ssv_pubkeys_from_url(
+pub async fn request_ssv_pubkeys_from_ssv_node(
     url: Url,
+    node_operator_id: U256,
     http_timeout: Duration,
-) -> eyre::Result<SSVResponse> {
+) -> eyre::Result<SSVNodeResponse> {
+    let client = reqwest::ClientBuilder::new().timeout(http_timeout).build()?;
+    let body = json!({
+        "operators": [node_operator_id]
+    });
+    let response = client.get(url).json(&body).send().await.map_err(|e| {
+        if e.is_timeout() {
+            eyre::eyre!("Request to SSV node timed out: {e}")
+        } else {
+            eyre::eyre!("Error sending request to SSV node: {e}")
+        }
+    })?;
+
+    // Parse the response as JSON
+    let body_bytes = safe_read_http_response(response).await?;
+    serde_json::from_slice::<SSVNodeResponse>(&body_bytes).wrap_err("failed to parse SSV response")
+}
+
+pub async fn request_ssv_pubkeys_from_public_api(
+    url: Url,
+    http_timeout: Duration,
+) -> eyre::Result<SSVPublicResponse> {
     let client = reqwest::ClientBuilder::new().timeout(http_timeout).build()?;
     let response = client.get(url).send().await.map_err(|e| {
         if e.is_timeout() {
-            eyre::eyre!("Request to SSV network API timed out: {e}")
+            eyre::eyre!("Request to SSV public API timed out: {e}")
         } else {
-            eyre::eyre!("Error sending request to SSV network API: {e}")
+            eyre::eyre!("Error sending request to SSV public API: {e}")
         }
     })?;
 
     // Parse the response as JSON
     let body_bytes = safe_read_http_response(response).await?;
-    serde_json::from_slice::<SSVResponse>(&body_bytes).wrap_err("failed to parse SSV response")
+    serde_json::from_slice::<SSVPublicResponse>(&body_bytes)
+        .wrap_err("failed to parse SSV response")
 }
diff --git a/crates/pbs/src/service.rs b/crates/pbs/src/service.rs
index 6659ae85..3290a476 100644
--- a/crates/pbs/src/service.rs
+++ b/crates/pbs/src/service.rs
@@ -117,7 +117,8 @@ impl PbsService {
             .load(
                 &runtime_config.id,
                 config.chain,
-                default_pbs.ssv_api_url.clone(),
+                default_pbs.ssv_node_api_url.clone(),
+                default_pbs.ssv_public_api_url.clone(),
                 default_pbs.rpc_url.clone(),
                 http_timeout,
             )
diff --git a/tests/Cargo.toml b/tests/Cargo.toml
index 5e8e1596..d7fb1615 100644
--- a/tests/Cargo.toml
+++ b/tests/Cargo.toml
@@ -13,6 +13,7 @@ cb-signer.workspace = true
 eyre.workspace = true
 lh_types.workspace = true
 reqwest.workspace = true
+serde.workspace = true
 serde_json.workspace = true
 tempfile.workspace = true
 tokio.workspace = true
diff --git a/tests/data/ssv_valid_node.json b/tests/data/ssv_valid_node.json
new file mode 100644
index 00000000..59ad205f
--- /dev/null
+++ b/tests/data/ssv_valid_node.json
@@ -0,0 +1,22 @@
+{
+    "data": [
+        {
+            "public_key": "aa370f6250d421d00437b9900407a7ad93b041aeb7259d99b55ab8b163277746680e93e841f87350737bceee46aa104d",
+            "index": "1311498",
+            "status": "active_ongoing",
+            "activation_epoch": "273156",
+            "exit_epoch": "18446744073709551615",
+            "owner": "5e33db0b37622f7e6b2f0654aa7b985d854ea9cb",
+            "committee": [
+                1,
+                2,
+                3,
+                4
+            ],
+            "quorum": 0,
+            "partial_quorum": 0,
+            "graffiti": "",
+            "liquidated": false
+        }
+    ]
+}
\ No newline at end of file
diff --git a/tests/data/ssv_valid.json b/tests/data/ssv_valid_public.json
similarity index 100%
rename from tests/data/ssv_valid.json
rename to tests/data/ssv_valid_public.json
diff --git a/tests/src/lib.rs b/tests/src/lib.rs
index 42e36a8e..d332711f 100644
--- a/tests/src/lib.rs
+++ b/tests/src/lib.rs
@@ -1,4 +1,5 @@
 pub mod mock_relay;
-pub mod mock_ssv;
+pub mod mock_ssv_node;
+pub mod mock_ssv_public;
 pub mod mock_validator;
 pub mod utils;
diff --git a/tests/src/mock_ssv_node.rs b/tests/src/mock_ssv_node.rs
new file mode 100644
index 00000000..7f24569d
--- /dev/null
+++ b/tests/src/mock_ssv_node.rs
@@ -0,0 +1,118 @@
+use std::{net::SocketAddr, sync::Arc};
+
+use alloy::primitives::U256;
+use axum::{
+    extract::{Json, State},
+    response::Response,
+    routing::get,
+};
+use cb_common::{
+    config::MUXER_HTTP_MAX_LENGTH,
+    interop::ssv::types::{SSVNodeResponse, SSVNodeValidator},
+};
+use serde::Deserialize;
+use tokio::{net::TcpListener, sync::RwLock, task::JoinHandle};
task::JoinHandle}; +use tracing::info; + +pub const TEST_HTTP_TIMEOUT: u64 = 2; + +/// State for the mock server +#[derive(Clone)] +pub struct SsvNodeMockState { + /// List of pubkeys for the mock server to return + pub validators: Arc>>, + + /// Whether to force a timeout response to simulate a server error + pub force_timeout: Arc>, +} + +#[derive(Deserialize)] +struct SsvNodeValidatorsRequestBody { + pub operators: Vec, +} + +/// Creates a simple mock server to simulate the SSV API endpoint under +/// various conditions for testing. Note this ignores +pub async fn create_mock_ssv_node_server( + port: u16, + state: Option, +) -> Result, axum::Error> { + let data = include_str!("../../tests/data/ssv_valid_node.json"); + let response = + serde_json::from_str::(data).expect("failed to parse test data"); + let state = state.unwrap_or(SsvNodeMockState { + validators: Arc::new(RwLock::new(response.data)), + force_timeout: Arc::new(RwLock::new(false)), + }); + let router = axum::Router::new() + .route("/v1/validators", get(handle_validators)) + .route("/big_data", get(handle_big_data)) + .with_state(state) + .into_make_service(); + + let address = SocketAddr::from(([127, 0, 0, 1], port)); + let listener = TcpListener::bind(address).await.map_err(axum::Error::new)?; + let server = axum::serve(listener, router).with_graceful_shutdown(async { + tokio::signal::ctrl_c().await.expect("Failed to listen for shutdown signal"); + }); + let result = Ok(tokio::spawn(async move { + if let Err(e) = server.await { + eprintln!("Server error: {e}"); + } + })); + info!("Mock server started on http://localhost:{port}/"); + result +} + +/// Returns a valid SSV validators response, or a timeout if requested in +/// the server state +async fn handle_validators( + State(state): State, + Json(body): Json, +) -> Response { + // Time out if requested + if *state.force_timeout.read().await { + return handle_timeout().await; + } + + // Make sure the request deserialized properly + let _operators = body.operators; + + // Generate the response based on the current validators + let response: SSVNodeResponse; + { + let validators = state.validators.read().await; + response = SSVNodeResponse { data: validators.clone() }; + } + + // Create a valid response + Response::builder() + .status(200) + .header("Content-Type", "application/json") + .body(serde_json::to_string(&response).unwrap().into()) + .unwrap() +} + +/// Sends a response with a large body - larger than the maximum allowed. +/// Note that hyper overwrites the content-length header automatically, so +/// setting it here wouldn't actually change the value that ultimately +/// gets sent to the server. 
+async fn handle_big_data() -> Response {
+    let body = "f".repeat(2 * MUXER_HTTP_MAX_LENGTH);
+    Response::builder()
+        .status(200)
+        .header("Content-Type", "application/text")
+        .body(body.into())
+        .unwrap()
+}
+
+/// Simulates a timeout by sleeping for a long time
+async fn handle_timeout() -> Response {
+    // Sleep for a long time to simulate a timeout
+    tokio::time::sleep(std::time::Duration::from_secs(2 * TEST_HTTP_TIMEOUT)).await;
+    Response::builder()
+        .status(200)
+        .header("Content-Type", "application/text")
+        .body("Timeout response".into())
+        .unwrap()
+}
diff --git a/tests/src/mock_ssv.rs b/tests/src/mock_ssv_public.rs
similarity index 83%
rename from tests/src/mock_ssv.rs
rename to tests/src/mock_ssv_public.rs
index 7ed8eb23..a014db42 100644
--- a/tests/src/mock_ssv.rs
+++ b/tests/src/mock_ssv_public.rs
@@ -7,7 +7,7 @@ use axum::{
 };
 use cb_common::{
     config::MUXER_HTTP_MAX_LENGTH,
-    interop::ssv::types::{SSVPagination, SSVResponse, SSVValidator},
+    interop::ssv::types::{SSVPagination, SSVPublicResponse, SSVPublicValidator},
 };
 use tokio::{net::TcpListener, sync::RwLock, task::JoinHandle};
 use tracing::info;
@@ -16,9 +16,9 @@ pub const TEST_HTTP_TIMEOUT: u64 = 2;
 
 /// State for the mock server
 #[derive(Clone)]
-pub struct SsvMockState {
+pub struct PublicSsvMockState {
     /// List of pubkeys for the mock server to return
-    pub validators: Arc<RwLock<Vec<SSVValidator>>>,
+    pub validators: Arc<RwLock<Vec<SSVPublicValidator>>>,
 
     /// Whether to force a timeout response to simulate a server error
     pub force_timeout: Arc<RwLock<bool>>,
@@ -26,13 +26,14 @@ pub struct SsvMockState {
 
 /// Creates a simple mock server to simulate the SSV API endpoint under
 /// various conditions for testing. Note this ignores
-pub async fn create_mock_ssv_server(
+pub async fn create_mock_public_ssv_server(
     port: u16,
-    state: Option<SsvMockState>,
+    state: Option<PublicSsvMockState>,
 ) -> Result<JoinHandle<()>, axum::Error> {
-    let data = include_str!("../../tests/data/ssv_valid.json");
-    let response = serde_json::from_str::<SSVResponse>(data).expect("failed to parse test data");
-    let state = state.unwrap_or(SsvMockState {
+    let data = include_str!("../../tests/data/ssv_valid_public.json");
+    let response =
+        serde_json::from_str::<SSVPublicResponse>(data).expect("failed to parse test data");
+    let state = state.unwrap_or(PublicSsvMockState {
         validators: Arc::new(RwLock::new(response.validators)),
         force_timeout: Arc::new(RwLock::new(false)),
     });
@@ -62,7 +63,7 @@ pub async fn create_mock_ssv_server(
 /// Returns a valid SSV validators response, or a timeout if requested in
 /// the server state
 async fn handle_validators(
-    State(state): State<SsvMockState>,
+    State(state): State<PublicSsvMockState>,
     Path((_, _)): Path<(String, u64)>,
 ) -> Response {
     // Time out if requested
@@ -71,11 +72,11 @@ async fn handle_validators(
     }
 
     // Generate the response based on the current validators
-    let response: SSVResponse;
+    let response: SSVPublicResponse;
     {
         let validators = state.validators.read().await;
         let pagination = SSVPagination { total: validators.len() };
-        response = SSVResponse { validators: validators.clone(), pagination };
+        response = SSVPublicResponse { validators: validators.clone(), pagination };
     }
 
     // Create a valid response
diff --git a/tests/src/utils.rs b/tests/src/utils.rs
index 58ef42cf..253007c7 100644
--- a/tests/src/utils.rs
+++ b/tests/src/utils.rs
@@ -79,7 +79,8 @@ pub fn get_pbs_static_config(port: u16) -> PbsConfig {
         min_bid_wei: U256::ZERO,
         late_in_slot_time_ms: u64::MAX,
         extra_validation_enabled: false,
-        ssv_api_url: Url::parse("https://example.net").unwrap(),
+        ssv_node_api_url: Url::parse("http://localhost:0").unwrap(),
+        ssv_public_api_url:
Url::parse("http://localhost:0").unwrap(), rpc_url: None, http_timeout_seconds: 10, register_validator_retry_limit: u32::MAX, diff --git a/tests/tests/pbs_mux.rs b/tests/tests/pbs_mux.rs index 3a15b49b..7609bdc3 100644 --- a/tests/tests/pbs_mux.rs +++ b/tests/tests/pbs_mux.rs @@ -1,8 +1,15 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; +use alloy::primitives::U256; use cb_common::{ - config::{HTTP_TIMEOUT_SECONDS_DEFAULT, MUXER_HTTP_MAX_LENGTH, RuntimeMuxConfig}, - interop::ssv::utils::fetch_ssv_pubkeys_from_url, + config::{ + HTTP_TIMEOUT_SECONDS_DEFAULT, MUXER_HTTP_MAX_LENGTH, MuxConfig, MuxKeysLoader, PbsMuxes, + RuntimeMuxConfig, + }, + interop::ssv::{ + types::{SSVNodeValidator, SSVPublicValidator}, + utils::{request_ssv_pubkeys_from_public_api, request_ssv_pubkeys_from_ssv_node}, + }, signer::random_secret, types::Chain, utils::{ResponseReadError, set_ignore_content_length}, @@ -10,7 +17,8 @@ use cb_common::{ use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use cb_tests::{ mock_relay::{MockRelayState, start_mock_relay_service}, - mock_ssv::{SsvMockState, TEST_HTTP_TIMEOUT, create_mock_ssv_server}, + mock_ssv_node::{SsvNodeMockState, create_mock_ssv_node_server}, + mock_ssv_public::{PublicSsvMockState, TEST_HTTP_TIMEOUT, create_mock_public_ssv_server}, mock_validator::MockValidator, utils::{ bls_pubkey_from_hex_unchecked, generate_mock_relay, get_pbs_static_config, setup_test_env, @@ -25,18 +33,20 @@ use url::Url; #[tokio::test] /// Tests that a successful SSV network fetch is handled and parsed properly -async fn test_ssv_network_fetch() -> Result<()> { +/// from the public API +async fn test_ssv_public_network_fetch() -> Result<()> { // Start the mock server let port = 30100; - let _server_handle = create_mock_ssv_server(port, None).await?; + let server_handle = create_mock_public_ssv_server(port, None).await?; let url = Url::parse(&format!("http://localhost:{port}/api/v4/test_chain/validators/in_operator/1")) .unwrap(); let response = - fetch_ssv_pubkeys_from_url(url, Duration::from_secs(HTTP_TIMEOUT_SECONDS_DEFAULT)).await?; + request_ssv_pubkeys_from_public_api(url, Duration::from_secs(HTTP_TIMEOUT_SECONDS_DEFAULT)) + .await?; // Make sure the response is correct - // NOTE: requires that ssv_data.json dpesn't change + // NOTE: requires that ssv_valid_public.json doesn't change assert_eq!(response.validators.len(), 3); let expected_pubkeys = [ bls_pubkey_from_hex_unchecked( @@ -54,7 +64,7 @@ async fn test_ssv_network_fetch() -> Result<()> { } // Clean up the server handle - _server_handle.abort(); + server_handle.abort(); Ok(()) } @@ -65,9 +75,10 @@ async fn test_ssv_network_fetch() -> Result<()> { async fn test_ssv_network_fetch_big_data() -> Result<()> { // Start the mock server let port = 30101; - let server_handle = cb_tests::mock_ssv::create_mock_ssv_server(port, None).await?; + let server_handle = + cb_tests::mock_ssv_public::create_mock_public_ssv_server(port, None).await?; let url = Url::parse(&format!("http://localhost:{port}/big_data")).unwrap(); - let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(120)).await; + let response = request_ssv_pubkeys_from_public_api(url, Duration::from_secs(120)).await; // The response should fail due to content length being too big match response { @@ -96,15 +107,16 @@ async fn test_ssv_network_fetch_big_data() -> Result<()> { async fn test_ssv_network_fetch_timeout() -> Result<()> { // Start the mock server let port = 30102; - let state = SsvMockState { + let state = PublicSsvMockState { validators: 
Arc::new(RwLock::new(vec![])), force_timeout: Arc::new(RwLock::new(true)), }; - let server_handle = create_mock_ssv_server(port, Some(state)).await?; + let server_handle = create_mock_public_ssv_server(port, Some(state)).await?; let url = Url::parse(&format!("http://localhost:{port}/api/v4/test_chain/validators/in_operator/1")) .unwrap(); - let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(TEST_HTTP_TIMEOUT)).await; + let response = + request_ssv_pubkeys_from_public_api(url, Duration::from_secs(TEST_HTTP_TIMEOUT)).await; // The response should fail due to timeout assert!(response.is_err(), "Expected timeout error, but got success"); @@ -125,9 +137,9 @@ async fn test_ssv_network_fetch_big_data_without_content_length() -> Result<()> // Start the mock server let port = 30103; set_ignore_content_length(true); - let server_handle = create_mock_ssv_server(port, None).await?; + let server_handle = create_mock_public_ssv_server(port, None).await?; let url = Url::parse(&format!("http://localhost:{port}/big_data")).unwrap(); - let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(120)).await; + let response = request_ssv_pubkeys_from_public_api(url, Duration::from_secs(120)).await; // The response should fail due to the body being too big match response { @@ -150,6 +162,37 @@ async fn test_ssv_network_fetch_big_data_without_content_length() -> Result<()> Ok(()) } +#[tokio::test] +/// Tests that a successful SSV network fetch is handled and parsed properly +/// from the node API +async fn test_ssv_node_network_fetch() -> Result<()> { + // Start the mock server + let port = 30104; + let _server_handle = create_mock_ssv_node_server(port, None).await?; + let url = Url::parse(&format!("http://localhost:{port}/v1/validators")).unwrap(); + let response = request_ssv_pubkeys_from_ssv_node( + url, + U256::from(1), + Duration::from_secs(HTTP_TIMEOUT_SECONDS_DEFAULT), + ) + .await?; + + // Make sure the response is correct + // NOTE: requires that ssv_valid_node.json doesn't change + assert_eq!(response.data.len(), 1); + let expected_pubkeys = [bls_pubkey_from_hex_unchecked( + "aa370f6250d421d00437b9900407a7ad93b041aeb7259d99b55ab8b163277746680e93e841f87350737bceee46aa104d", + )]; + for (i, validator) in response.data.iter().enumerate() { + assert_eq!(validator.public_key, expected_pubkeys[i]); + } + + // Clean up the server handle + _server_handle.abort(); + + Ok(()) +} + #[tokio::test] async fn test_mux() -> Result<()> { setup_test_env(); @@ -228,3 +271,196 @@ async fn test_mux() -> Result<()> { Ok(()) } + +/// Tests the SSV mux with dynamic registry fetching from an SSV node +#[tokio::test] +async fn test_ssv_multi_with_node() -> Result<()> { + // Generate keys + let signer = random_secret(); + let pubkey = signer.public_key(); + let signer2 = random_secret(); + let pubkey2 = signer2.public_key(); + + let chain = Chain::Hoodi; + let pbs_port = 3711; + + // Start the mock SSV node + let ssv_node_port = pbs_port + 1; + let ssv_node_url = Url::parse(&format!("http://localhost:{ssv_node_port}/v1/"))?; + let mock_ssv_node_state = SsvNodeMockState { + validators: Arc::new(RwLock::new(vec![ + SSVNodeValidator { public_key: pubkey.clone() }, + SSVNodeValidator { public_key: pubkey2.clone() }, + ])), + force_timeout: Arc::new(RwLock::new(false)), + }; + let ssv_node_handle = + create_mock_ssv_node_server(ssv_node_port, Some(mock_ssv_node_state.clone())).await?; + + // Start the mock SSV public API + let ssv_public_port = ssv_node_port + 1; + let ssv_public_url = 
Url::parse(&format!("http://localhost:{ssv_public_port}/api/v4/"))?; + let mock_ssv_public_state = PublicSsvMockState { + validators: Arc::new(RwLock::new(vec![SSVPublicValidator { pubkey: pubkey.clone() }])), + force_timeout: Arc::new(RwLock::new(false)), + }; + let ssv_public_handle = + create_mock_public_ssv_server(ssv_public_port, Some(mock_ssv_public_state.clone())).await?; + + // Start a mock relay to be used by the mux + let relay_port = ssv_public_port + 1; + let relay = generate_mock_relay(relay_port, pubkey.clone())?; + let relay_id = relay.id.clone().to_string(); + let relay_state = Arc::new(MockRelayState::new(chain, signer)); + let relay_task = tokio::spawn(start_mock_relay_service(relay_state.clone(), relay_port)); + + // Create the registry mux + let loader = MuxKeysLoader::Registry { + enable_refreshing: true, + node_operator_id: 1, + lido_module_id: None, + registry: cb_common::config::NORegistry::SSV, + }; + let muxes = PbsMuxes { + muxes: vec![MuxConfig { + id: relay_id.clone(), + loader: Some(loader), + late_in_slot_time_ms: Some(u64::MAX), + relays: vec![(*relay.config).clone()], + timeout_get_header_ms: Some(u64::MAX - 1), + validator_pubkeys: vec![], + }], + }; + + // Set up the PBS config + let mut pbs_config = get_pbs_static_config(pbs_port); + pbs_config.ssv_node_api_url = ssv_node_url.clone(); + pbs_config.ssv_public_api_url = ssv_public_url.clone(); + pbs_config.mux_registry_refresh_interval_seconds = 1; // Refresh the mux every second + let (mux_lookup, registry_muxes) = muxes.validate_and_fill(chain, &pbs_config).await?; + let relays = vec![relay.clone()]; // Default relay only + let mut config = to_pbs_config(chain, pbs_config, relays); + config.all_relays.push(relay.clone()); // Add the mux relay to just this field + config.mux_lookup = Some(mux_lookup); + config.registry_muxes = Some(registry_muxes); + + // Run PBS service + let state = PbsState::new(config); + let pbs_server = tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); + info!("Started PBS server with pubkey {pubkey}"); + + // Wait for the server to start + tokio::time::sleep(Duration::from_millis(100)).await; + + // Try to run a get_header on the new pubkey, which should use the default + // relay only since it hasn't been seen in the mux yet + let mock_validator = MockValidator::new(pbs_port)?; + info!("Sending get header"); + let res = mock_validator.do_get_header(Some(pubkey2.clone())).await?; + assert_eq!(res.status(), StatusCode::OK); + assert_eq!(relay_state.received_get_header(), 1); // pubkey2 was loaded from the SSV node + + // Shut down the server handles + pbs_server.abort(); + ssv_node_handle.abort(); + ssv_public_handle.abort(); + relay_task.abort(); + + Ok(()) +} + +/// Tests the SSV mux with dynamic registry fetching from the public SSV API +/// when the local node is down +#[tokio::test] +async fn test_ssv_multi_with_public() -> Result<()> { + // Generate keys + let signer = random_secret(); + let pubkey = signer.public_key(); + let signer2 = random_secret(); + let pubkey2 = signer2.public_key(); + + let chain = Chain::Hoodi; + let pbs_port = 3720; + + // Start the mock SSV node + let ssv_node_port = pbs_port + 1; + let ssv_node_url = Url::parse(&format!("http://localhost:{ssv_node_port}/v1/"))?; + + // Don't start the SSV node server to simulate it being down + // let ssv_node_handle = create_mock_ssv_node_server(ssv_node_port, + // Some(mock_ssv_node_state.clone())).await?; + + // Start the mock SSV public API + let ssv_public_port = ssv_node_port + 1; + let 
ssv_public_url = Url::parse(&format!("http://localhost:{ssv_public_port}/api/v4/"))?; + let mock_ssv_public_state = PublicSsvMockState { + validators: Arc::new(RwLock::new(vec![ + SSVPublicValidator { pubkey: pubkey.clone() }, + SSVPublicValidator { pubkey: pubkey2.clone() }, + ])), + force_timeout: Arc::new(RwLock::new(false)), + }; + let ssv_public_handle = + create_mock_public_ssv_server(ssv_public_port, Some(mock_ssv_public_state.clone())).await?; + + // Start a mock relay to be used by the mux + let relay_port = ssv_public_port + 1; + let relay = generate_mock_relay(relay_port, pubkey.clone())?; + let relay_id = relay.id.clone().to_string(); + let relay_state = Arc::new(MockRelayState::new(chain, signer)); + let relay_task = tokio::spawn(start_mock_relay_service(relay_state.clone(), relay_port)); + + // Create the registry mux + let loader = MuxKeysLoader::Registry { + enable_refreshing: true, + node_operator_id: 1, + lido_module_id: None, + registry: cb_common::config::NORegistry::SSV, + }; + let muxes = PbsMuxes { + muxes: vec![MuxConfig { + id: relay_id.clone(), + loader: Some(loader), + late_in_slot_time_ms: Some(u64::MAX), + relays: vec![(*relay.config).clone()], + timeout_get_header_ms: Some(u64::MAX - 1), + validator_pubkeys: vec![], + }], + }; + + // Set up the PBS config + let mut pbs_config = get_pbs_static_config(pbs_port); + pbs_config.ssv_node_api_url = ssv_node_url.clone(); + pbs_config.ssv_public_api_url = ssv_public_url.clone(); + pbs_config.mux_registry_refresh_interval_seconds = 1; // Refresh the mux every second + let (mux_lookup, registry_muxes) = muxes.validate_and_fill(chain, &pbs_config).await?; + let relays = vec![relay.clone()]; // Default relay only + let mut config = to_pbs_config(chain, pbs_config, relays); + config.all_relays.push(relay.clone()); // Add the mux relay to just this field + config.mux_lookup = Some(mux_lookup); + config.registry_muxes = Some(registry_muxes); + + // Run PBS service + let state = PbsState::new(config); + let pbs_server = tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); + info!("Started PBS server with pubkey {pubkey}"); + + // Wait for the server to start + tokio::time::sleep(Duration::from_millis(100)).await; + + // Try to run a get_header on the new pubkey, which should use the default + // relay only since it hasn't been seen in the mux yet + let mock_validator = MockValidator::new(pbs_port)?; + info!("Sending get header"); + let res = mock_validator.do_get_header(Some(pubkey2.clone())).await?; + assert_eq!(res.status(), StatusCode::OK); + assert_eq!(relay_state.received_get_header(), 1); // pubkey2 was loaded from the SSV public API + + // Shut down the server handles + pbs_server.abort(); + //ssv_node_handle.abort(); + ssv_public_handle.abort(); + relay_task.abort(); + + Ok(()) +} diff --git a/tests/tests/pbs_mux_refresh.rs b/tests/tests/pbs_mux_refresh.rs index da582ec7..44837b60 100644 --- a/tests/tests/pbs_mux_refresh.rs +++ b/tests/tests/pbs_mux_refresh.rs @@ -2,14 +2,14 @@ use std::{sync::Arc, time::Duration}; use cb_common::{ config::{MuxConfig, MuxKeysLoader, PbsMuxes}, - interop::ssv::types::SSVValidator, + interop::ssv::types::SSVPublicValidator, signer::random_secret, types::Chain, }; use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use cb_tests::{ mock_relay::{MockRelayState, start_mock_relay_service}, - mock_ssv::{SsvMockState, create_mock_ssv_server}, + mock_ssv_public::{PublicSsvMockState, create_mock_public_ssv_server}, mock_validator::MockValidator, utils::{generate_mock_relay, 
get_pbs_static_config, to_pbs_config}, }; @@ -44,14 +44,14 @@ async fn test_auto_refresh() -> Result<()> { let ssv_api_port = pbs_port + 1; // Intentionally missing a trailing slash to ensure this is handled properly let ssv_api_url = Url::parse(&format!("http://localhost:{ssv_api_port}/api/v4"))?; - let mock_ssv_state = SsvMockState { - validators: Arc::new(RwLock::new(vec![SSVValidator { + let mock_ssv_state = PublicSsvMockState { + validators: Arc::new(RwLock::new(vec![SSVPublicValidator { pubkey: existing_mux_pubkey.clone(), }])), force_timeout: Arc::new(RwLock::new(false)), }; let ssv_server_handle = - create_mock_ssv_server(ssv_api_port, Some(mock_ssv_state.clone())).await?; + create_mock_public_ssv_server(ssv_api_port, Some(mock_ssv_state.clone())).await?; // Start a default relay for non-mux keys let default_relay_port = ssv_api_port + 1; @@ -88,7 +88,7 @@ async fn test_auto_refresh() -> Result<()> { // Set up the PBS config let mut pbs_config = get_pbs_static_config(pbs_port); - pbs_config.ssv_api_url = ssv_api_url.clone(); + pbs_config.ssv_public_api_url = ssv_api_url.clone(); pbs_config.mux_registry_refresh_interval_seconds = 1; // Refresh the mux every second let (mux_lookup, registry_muxes) = muxes.validate_and_fill(chain, &pbs_config).await?; let relays = vec![default_relay.clone()]; // Default relay only @@ -126,7 +126,7 @@ async fn test_auto_refresh() -> Result<()> { // Add another validator { let mut validators = mock_ssv_state.validators.write().await; - validators.push(SSVValidator { pubkey: new_mux_pubkey.clone() }); + validators.push(SSVPublicValidator { pubkey: new_mux_pubkey.clone() }); info!("Added new validator {new_mux_pubkey} to the SSV mock server"); }