diff --git a/cmd/ethrex/initializers.rs b/cmd/ethrex/initializers.rs index 0c1595586e4..d23344fdfa7 100644 --- a/cmd/ethrex/initializers.rs +++ b/cmd/ethrex/initializers.rs @@ -396,9 +396,10 @@ pub async fn init_l1( let store = match init_store(datadir, genesis).await { Ok(store) => store, - Err(StoreError::IncompatibleDBVersion) => { + Err(err @ StoreError::IncompatibleDBVersion { .. }) + | Err(err @ StoreError::NotFoundDBVersion { .. }) => { return Err(eyre::eyre!( - "Incompatible DB version. Please run `ethrex removedb` and restart node" + "{err}. Please erase your DB by running `ethrex removedb` and restart node to resync. Note that this will take a while." )); } Err(error) => return Err(eyre::eyre!("Failed to create Store: {error}")), diff --git a/crates/storage/error.rs b/crates/storage/error.rs index f3534509bd7..318115fb1b9 100644 --- a/crates/storage/error.rs +++ b/crates/storage/error.rs @@ -40,6 +40,12 @@ pub enum StoreError { UpdateBatchNoBlocks, #[error("Pivot changed")] PivotChanged, - #[error("Incompatible DB Version")] - IncompatibleDBVersion, + #[error("Error reading from disk: {0}")] + IoError(#[from] std::io::Error), + #[error("Error serializing metadata: {0}")] + DbMetadataError(#[from] serde_json::Error), + #[error("Incompatible DB Version: not found, expected v{expected}")] + NotFoundDBVersion { expected: u64 }, + #[error("Incompatible DB Version: found v{found}, expected v{expected}")] + IncompatibleDBVersion { found: u64, expected: u64 }, } diff --git a/crates/storage/lib.rs b/crates/storage/lib.rs index 0246ded8f50..a36ad55eff4 100644 --- a/crates/storage/lib.rs +++ b/crates/storage/lib.rs @@ -17,3 +17,6 @@ pub use store::{ /// Store Schema Version, must be updated on any breaking change /// An upgrade to a newer schema version invalidates currently stored data, requiring a re-sync. pub const STORE_SCHEMA_VERSION: u64 = 1; + +/// Name of the file storing the metadata about the database +pub const STORE_METADATA_FILENAME: &str = "metadata.json"; diff --git a/crates/storage/store.rs b/crates/storage/store.rs index 9bdfdfbcf2a..0301f329d68 100644 --- a/crates/storage/store.rs +++ b/crates/storage/store.rs @@ -1,7 +1,7 @@ #[cfg(feature = "rocksdb")] use crate::backend::rocksdb::RocksDBBackend; use crate::{ - STORE_SCHEMA_VERSION, + STORE_METADATA_FILENAME, STORE_SCHEMA_VERSION, api::{ StorageBackend, tables::{ @@ -39,9 +39,11 @@ use ethrex_trie::{EMPTY_TRIE_HASH, Nibbles, Trie, TrieLogger, TrieNode, TrieWitn use ethrex_trie::{Node, NodeRLP}; use lru::LruCache; use rustc_hash::FxBuildHasher; +use serde::{Deserialize, Serialize}; use std::{ collections::{BTreeMap, HashMap, hash_map::Entry}, fmt::Debug, + io::Write, path::{Path, PathBuf}, sync::{ Arc, Mutex, @@ -1341,6 +1343,12 @@ impl Store { pub fn new(path: impl AsRef, engine_type: EngineType) -> Result { // Ignore unused variable warning when compiling without DB features let db_path = path.as_ref().to_path_buf(); + + if engine_type != EngineType::InMemory { + // Check that the last used DB version matches the current version + validate_store_schema_version(&db_path)?; + } + match engine_type { #[cfg(feature = "rocksdb")] EngineType::RocksDB => { @@ -1359,8 +1367,6 @@ impl Store { db_path: PathBuf, commit_threshold: usize, ) -> Result { - check_schema_version(backend.as_ref())?; - debug!("Initializing Store with {commit_threshold} in-memory diff-layers"); let (fkv_tx, fkv_rx) = std::sync::mpsc::sync_channel(0); let (trie_upd_tx, trie_upd_rx) = std::sync::mpsc::sync_channel(0); @@ -2380,7 +2386,9 @@ impl Store { } pub fn create_checkpoint(&self, path: impl AsRef) -> Result<(), StoreError> { - self.backend.create_checkpoint(path.as_ref()) + self.backend.create_checkpoint(path.as_ref())?; + init_metadata_file(path.as_ref())?; + Ok(()) } pub fn get_store_directory(&self) -> Result { @@ -2800,33 +2808,6 @@ fn snap_state_key(index: SnapStateIndex) -> Vec { (index as u8).encode_to_vec() } -fn check_schema_version(backend: &dyn StorageBackend) -> Result<(), StoreError> { - // Check that the last used DB version matches the current version - let latest_store_schema_version = backend - .begin_read()? - .get(MISC_VALUES, b"store_schema_version")? - .map(|bytes| -> Result { - let array: [u8; 8] = bytes.try_into().map_err(|_| { - StoreError::Custom("Invalid store schema version bytes".to_string()) - })?; - Ok(u64::from_le_bytes(array)) - }) - .transpose()?; - if let Some(schema_version) = latest_store_schema_version - && schema_version != STORE_SCHEMA_VERSION - { - return Err(StoreError::IncompatibleDBVersion); - } - let mut tx = backend.begin_write()?; - tx.put( - MISC_VALUES, - b"store_schema_version", - &STORE_SCHEMA_VERSION.to_le_bytes(), - )?; - tx.commit()?; - Ok(()) -} - fn encode_code(code: &Code) -> Vec { let mut buf = Vec::with_capacity( 6 + code.bytecode.len() + std::mem::size_of_val(code.jump_targets.as_slice()), @@ -2852,6 +2833,65 @@ impl LatestBlockHeaderCache { } } +#[derive(Debug, Serialize, Deserialize)] +struct StoreMetadata { + schema_version: u64, +} + +impl StoreMetadata { + fn new(schema_version: u64) -> Self { + Self { schema_version } + } +} + +fn validate_store_schema_version(path: &Path) -> Result<(), StoreError> { + let metadata_path = path.join(STORE_METADATA_FILENAME); + // If metadata file does not exist, try to create it + if !metadata_path.exists() { + // If datadir exists but is not empty, this is probably a DB for an + // old ethrex version and we should return an error + if path.exists() && !dir_is_empty(path)? { + return Err(StoreError::NotFoundDBVersion { + expected: STORE_SCHEMA_VERSION, + }); + } + init_metadata_file(path)?; + return Ok(()); + } + if !metadata_path.is_file() { + return Err(StoreError::Custom( + "store schema path exists but is not a file".to_string(), + )); + } + let file_contents = std::fs::read_to_string(metadata_path)?; + let metadata: StoreMetadata = serde_json::from_str(&file_contents)?; + + // Check schema version matches the expected one + if metadata.schema_version != STORE_SCHEMA_VERSION { + return Err(StoreError::IncompatibleDBVersion { + found: metadata.schema_version, + expected: STORE_SCHEMA_VERSION, + }); + } + Ok(()) +} + +fn init_metadata_file(parent_path: &Path) -> Result<(), StoreError> { + std::fs::create_dir_all(parent_path)?; + + let metadata_path = parent_path.join(STORE_METADATA_FILENAME); + let metadata = StoreMetadata::new(STORE_SCHEMA_VERSION); + let serialized_metadata = serde_json::to_string_pretty(&metadata)?; + let mut new_file = std::fs::File::create_new(metadata_path)?; + new_file.write_all(serialized_metadata.as_bytes())?; + Ok(()) +} + +fn dir_is_empty(path: &Path) -> Result { + let is_empty = std::fs::read_dir(path)?.next().is_none(); + Ok(is_empty) +} + #[cfg(test)] mod tests { use bytes::Bytes; diff --git a/tooling/Cargo.lock b/tooling/Cargo.lock index 9f159a4e3e8..2587dafab4a 100644 --- a/tooling/Cargo.lock +++ b/tooling/Cargo.lock @@ -3503,6 +3503,7 @@ dependencies = [ "ethrex-blockchain", "ethrex-common 7.0.0", "ethrex-crypto", + "ethrex-metrics", "ethrex-rlp 7.0.0", "ethrex-storage 7.0.0", "ethrex-storage-rollup", @@ -3772,7 +3773,6 @@ dependencies = [ "hex", "lazy_static", "rkyv", - "rocksdb", "rustc-hash 2.1.1", "serde", "serde_json", diff --git a/tooling/reorgs/src/main.rs b/tooling/reorgs/src/main.rs index 582e0380c72..43c5e599f73 100644 --- a/tooling/reorgs/src/main.rs +++ b/tooling/reorgs/src/main.rs @@ -72,8 +72,7 @@ where // Run in another task to clean up properly on panic let result = tokio::spawn(test_fn(simulator.clone())).await; - simulator.lock_owned().await.stop(); - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + simulator.lock_owned().await.stop().await; match result { Ok(_) => info!(test=%test_name, elapsed=?start.elapsed(), "test completed successfully"), diff --git a/tooling/reorgs/src/simulator.rs b/tooling/reorgs/src/simulator.rs index e1db414fda0..a1312aab52f 100644 --- a/tooling/reorgs/src/simulator.rs +++ b/tooling/reorgs/src/simulator.rs @@ -43,7 +43,7 @@ pub struct Simulator { genesis_path: PathBuf, configs: Vec, enodes: Vec, - cancellation_tokens: Vec, + cancellation_tokens: Vec<(CancellationToken, tokio::task::JoinHandle<()>)>, } impl Simulator { @@ -97,7 +97,10 @@ impl Simulator { opts.syncmode = SyncMode::Full; - let _ = std::fs::remove_dir_all(&opts.datadir); + if opts.datadir.exists() { + std::fs::remove_dir_all(&opts.datadir) + .expect("Failed to remove existing data directory"); + } std::fs::create_dir_all(&opts.datadir).expect("Failed to create data directory"); let now = SystemTime::now() @@ -110,7 +113,6 @@ impl Simulator { let cancel = CancellationToken::new(); self.configs.push(opts.clone()); - self.cancellation_tokens.push(cancel.clone()); let mut cmd = Command::new(&self.cmd_path); cmd.args([ @@ -142,20 +144,26 @@ impl Simulator { .expect("node initialization timed out"); self.enodes.push(enode); - tokio::spawn(async move { - let mut child = child; - tokio::select! { - _ = cancel.cancelled() => { - if let Some(pid) = child.id() { - // NOTE: we use SIGTERM instead of child.kill() so sockets are closed - signal::kill(Pid::from_raw(pid as i32), Signal::SIGTERM).unwrap(); + let waiter = tokio::spawn({ + let cancel = cancel.clone(); + async move { + let mut child = child; + tokio::select! { + _ = cancel.cancelled() => { + if let Some(pid) = child.id() { + // NOTE: we use SIGTERM instead of child.kill() so sockets are closed + signal::kill(Pid::from_raw(pid as i32), Signal::SIGTERM).unwrap(); + } + } + res = child.wait() => { + assert!(res.unwrap().success()); } } - res = child.wait() => { - assert!(res.unwrap().success()); - } + // Ignore any errors on shutdown + let _ = child.wait().await.unwrap(); } }); + self.cancellation_tokens.push((cancel, waiter)); info!( "Started node {n} at http://{}:{}", @@ -165,10 +173,13 @@ impl Simulator { self.get_node(n) } - pub fn stop(&self) { - for token in &self.cancellation_tokens { + pub async fn stop(&mut self) { + for (token, waiter) in self.cancellation_tokens.drain(..) { token.cancel(); + waiter.await.unwrap(); } + self.enodes.clear(); + self.configs.clear(); } fn get_http_url(&self, index: usize) -> Url {