Merged
23 commits
48bdfc8
feat(l1): avoid opening DB if schema version doesn't match
MegaRedHand Dec 10, 2025
33af0c3
fix: don't check metadata if engine type is InMemory
MegaRedHand Dec 10, 2025
eca1b7e
fix: avoid TOC-TOU race condition in metadata file creation
MegaRedHand Dec 10, 2025
6e434ab
refactor: turn else-if into if
MegaRedHand Dec 10, 2025
4ddd39c
refactor: remove matches!
MegaRedHand Dec 10, 2025
c7040a0
fix: create directory if it doesn't exist
MegaRedHand Dec 10, 2025
ff23927
refactor: store the file name in a constant
MegaRedHand Dec 10, 2025
428ca59
Merge branch 'main' into schema-version-independent-of-backend
MegaRedHand Dec 11, 2025
1c4dba8
feat: fail if the datadir exists with no metadata
MegaRedHand Dec 11, 2025
0443c25
fix: avoid creating datadir before Store is initialized
MegaRedHand Dec 11, 2025
5f6805d
refactor: simplify nested ifs
MegaRedHand Dec 11, 2025
09fb05c
fix: don't create datadir before running node
MegaRedHand Dec 11, 2025
23893fd
feat: improve error message
MegaRedHand Dec 11, 2025
7e455a4
chore: remove comment
MegaRedHand Dec 11, 2025
feeddd4
fix: don't treat empty datadirs as old DBs
MegaRedHand Dec 11, 2025
324ac4b
chore: revert utils change
MegaRedHand Dec 11, 2025
d64153d
Revert "fix: don't create datadir before running node"
MegaRedHand Dec 11, 2025
092688c
fix: create datadir in validation function, just in case
MegaRedHand Dec 11, 2025
5a61b26
Merge branch 'main' into schema-version-independent-of-backend
MegaRedHand Dec 11, 2025
4acf291
Merge branch 'main' into schema-version-independent-of-backend
MegaRedHand Dec 12, 2025
1797218
fix: properly wait for processes to finish
MegaRedHand Dec 12, 2025
7970fd5
refactor: remove sleep
MegaRedHand Dec 12, 2025
e829c3c
fix: create metadata file when checkpointing
MegaRedHand Dec 12, 2025
5 changes: 3 additions & 2 deletions cmd/ethrex/initializers.rs
@@ -396,9 +396,10 @@ pub async fn init_l1(

let store = match init_store(datadir, genesis).await {
Ok(store) => store,
Err(StoreError::IncompatibleDBVersion) => {
Err(err @ StoreError::IncompatibleDBVersion { .. })
| Err(err @ StoreError::NotFoundDBVersion { .. }) => {
Comment on lines +399 to +400
Contributor:
We start at 1, so we can treat 0 and missing as equivalent; I don't think the new error kind is necessary.

Collaborator Author:
It makes the distinction clear and also simplifies the formatting of the error here.

return Err(eyre::eyre!(
"Incompatible DB version. Please run `ethrex removedb` and restart node"
"{err}. Please erase your DB by running `ethrex removedb` and restart node to resync. Note that this will take a while."
));
}
Err(error) => return Err(eyre::eyre!("Failed to create Store: {error}")),
10 changes: 8 additions & 2 deletions crates/storage/error.rs
@@ -40,6 +40,12 @@ pub enum StoreError {
UpdateBatchNoBlocks,
#[error("Pivot changed")]
PivotChanged,
#[error("Incompatible DB Version")]
IncompatibleDBVersion,
#[error("Error reading from disk: {0}")]
IoError(#[from] std::io::Error),
#[error("Error serializing metadata: {0}")]
DbMetadataError(#[from] serde_json::Error),
#[error("Incompatible DB Version: not found, expected v{expected}")]
NotFoundDBVersion { expected: u64 },
#[error("Incompatible DB Version: found v{found}, expected v{expected}")]
IncompatibleDBVersion { found: u64, expected: u64 },
}
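
As a quick illustration of the formatting point raised in the review thread above, here is a minimal sketch (hypothetical version numbers, assuming the `StoreError` variants defined in this diff) of what `thiserror` renders for the two new variants:

```rust
// Hypothetical values; thiserror interpolates each variant's named fields
// into its #[error(...)] format string.
let not_found = StoreError::NotFoundDBVersion { expected: 1 };
assert_eq!(
    not_found.to_string(),
    "Incompatible DB Version: not found, expected v1"
);

let mismatch = StoreError::IncompatibleDBVersion { found: 1, expected: 2 };
assert_eq!(
    mismatch.to_string(),
    "Incompatible DB Version: found v1, expected v2"
);
```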
3 changes: 3 additions & 0 deletions crates/storage/lib.rs
@@ -17,3 +17,6 @@ pub use store::{
/// Store Schema Version, must be updated on any breaking change
/// An upgrade to a newer schema version invalidates currently stored data, requiring a re-sync.
pub const STORE_SCHEMA_VERSION: u64 = 1;

/// Name of the file storing the metadata about the database
pub const STORE_METADATA_FILENAME: &str = "metadata.json";
Contributor:
I think the place for the schema versioning is the DB. Decoupling it is asking for trouble. E.g., I don't see the removedb command deleting this properly.
If we don't want it to be data, we may follow a scheme similar to what Postgres does, which is encoding versioning in file names (e.g., the internal directory might be data-{schema_version}, with failure indicating incompatibility).

Collaborator Author:
removedb removes the whole datadir, including the metadata file, so there should be no problem there.

The main reason to move the schema versioning to another file is to make it backend-agnostic. Keeping it inside the DB means we need to open a Store before checking if we support its version. Right now, this means we may have some data loss due to unknown columns being dropped by RocksDB. This can be solved, but it would require the backend itself to check the version.

It also has the benefit that we can include more information, like the engine type we're using, and it's easy to inspect because it's plaintext. Encoding it in filenames is a good alternative, but I don't see the benefit of moving to that.
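
For context, the metadata file is a small plaintext JSON document; with the current `STORE_SCHEMA_VERSION` of 1, the `serde_json::to_string_pretty` call used below would produce roughly:

```json
{
  "schema_version": 1
}
```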

102 changes: 71 additions & 31 deletions crates/storage/store.rs
@@ -1,7 +1,7 @@
#[cfg(feature = "rocksdb")]
use crate::backend::rocksdb::RocksDBBackend;
use crate::{
STORE_SCHEMA_VERSION,
STORE_METADATA_FILENAME, STORE_SCHEMA_VERSION,
api::{
StorageBackend,
tables::{
@@ -39,9 +39,11 @@ use ethrex_trie::{EMPTY_TRIE_HASH, Nibbles, Trie, TrieLogger, TrieNode, TrieWitn
use ethrex_trie::{Node, NodeRLP};
use lru::LruCache;
use rustc_hash::FxBuildHasher;
use serde::{Deserialize, Serialize};
use std::{
collections::{BTreeMap, HashMap, hash_map::Entry},
fmt::Debug,
io::Write,
path::{Path, PathBuf},
sync::{
Arc, Mutex,
@@ -1341,6 +1343,12 @@ impl Store {
pub fn new(path: impl AsRef<Path>, engine_type: EngineType) -> Result<Self, StoreError> {
// Ignore unused variable warning when compiling without DB features
let db_path = path.as_ref().to_path_buf();

if engine_type != EngineType::InMemory {
// Check that the last used DB version matches the current version
validate_store_schema_version(&db_path)?;
}

match engine_type {
#[cfg(feature = "rocksdb")]
EngineType::RocksDB => {
@@ -1359,8 +1367,6 @@
db_path: PathBuf,
commit_threshold: usize,
) -> Result<Self, StoreError> {
check_schema_version(backend.as_ref())?;

debug!("Initializing Store with {commit_threshold} in-memory diff-layers");
let (fkv_tx, fkv_rx) = std::sync::mpsc::sync_channel(0);
let (trie_upd_tx, trie_upd_rx) = std::sync::mpsc::sync_channel(0);
@@ -2380,7 +2386,9 @@ impl Store {
}

pub fn create_checkpoint(&self, path: impl AsRef<Path>) -> Result<(), StoreError> {
self.backend.create_checkpoint(path.as_ref())
self.backend.create_checkpoint(path.as_ref())?;
init_metadata_file(path.as_ref())?;
Ok(())
}

pub fn get_store_directory(&self) -> Result<PathBuf, StoreError> {
@@ -2800,33 +2808,6 @@ fn snap_state_key(index: SnapStateIndex) -> Vec<u8> {
(index as u8).encode_to_vec()
}

fn check_schema_version(backend: &dyn StorageBackend) -> Result<(), StoreError> {
// Check that the last used DB version matches the current version
let latest_store_schema_version = backend
.begin_read()?
.get(MISC_VALUES, b"store_schema_version")?
.map(|bytes| -> Result<u64, StoreError> {
let array: [u8; 8] = bytes.try_into().map_err(|_| {
StoreError::Custom("Invalid store schema version bytes".to_string())
})?;
Ok(u64::from_le_bytes(array))
})
.transpose()?;
if let Some(schema_version) = latest_store_schema_version
&& schema_version != STORE_SCHEMA_VERSION
{
return Err(StoreError::IncompatibleDBVersion);
}
let mut tx = backend.begin_write()?;
tx.put(
MISC_VALUES,
b"store_schema_version",
&STORE_SCHEMA_VERSION.to_le_bytes(),
)?;
tx.commit()?;
Ok(())
}

fn encode_code(code: &Code) -> Vec<u8> {
let mut buf = Vec::with_capacity(
6 + code.bytecode.len() + std::mem::size_of_val(code.jump_targets.as_slice()),
@@ -2852,6 +2833,65 @@ impl LatestBlockHeaderCache {
}
}

#[derive(Debug, Serialize, Deserialize)]
struct StoreMetadata {
schema_version: u64,
}

impl StoreMetadata {
fn new(schema_version: u64) -> Self {
Self { schema_version }
}
}

fn validate_store_schema_version(path: &Path) -> Result<(), StoreError> {
let metadata_path = path.join(STORE_METADATA_FILENAME);
// If metadata file does not exist, try to create it
if !metadata_path.exists() {
// If datadir exists but is not empty, this is probably a DB for an
// old ethrex version and we should return an error
if path.exists() && !dir_is_empty(path)? {
return Err(StoreError::NotFoundDBVersion {
expected: STORE_SCHEMA_VERSION,
});
}
init_metadata_file(path)?;
return Ok(());
}
if !metadata_path.is_file() {
return Err(StoreError::Custom(
"store schema path exists but is not a file".to_string(),
));
}
let file_contents = std::fs::read_to_string(metadata_path)?;
let metadata: StoreMetadata = serde_json::from_str(&file_contents)?;

// Check schema version matches the expected one
if metadata.schema_version != STORE_SCHEMA_VERSION {
return Err(StoreError::IncompatibleDBVersion {
found: metadata.schema_version,
expected: STORE_SCHEMA_VERSION,
});
}
Ok(())
}

fn init_metadata_file(parent_path: &Path) -> Result<(), StoreError> {
std::fs::create_dir_all(parent_path)?;

let metadata_path = parent_path.join(STORE_METADATA_FILENAME);
let metadata = StoreMetadata::new(STORE_SCHEMA_VERSION);
let serialized_metadata = serde_json::to_string_pretty(&metadata)?;
let mut new_file = std::fs::File::create_new(metadata_path)?;
new_file.write_all(serialized_metadata.as_bytes())?;
Ok(())
}

fn dir_is_empty(path: &Path) -> Result<bool, StoreError> {
let is_empty = std::fs::read_dir(path)?.next().is_none();
Ok(is_empty)
}

#[cfg(test)]
mod tests {
use bytes::Bytes;
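
To make the create-then-validate flow above concrete, here is a self-contained sketch. It mirrors the private `init_metadata_file` and `validate_store_schema_version` helpers rather than calling them, and the demo directory is hypothetical:

```rust
use serde::{Deserialize, Serialize};
use std::io::Write;

#[derive(Debug, Serialize, Deserialize)]
struct StoreMetadata {
    schema_version: u64,
}

fn main() -> std::io::Result<()> {
    // Hypothetical demo directory, standing in for the node's datadir.
    let dir = std::env::temp_dir().join("ethrex-metadata-demo");
    std::fs::create_dir_all(&dir)?;
    let path = dir.join("metadata.json");

    // create_new fails with AlreadyExists if the file is already present,
    // which is what closes the TOC-TOU window fixed in commit eca1b7e.
    let mut file = std::fs::File::create_new(&path)?;
    let meta = StoreMetadata { schema_version: 1 };
    file.write_all(serde_json::to_string_pretty(&meta)?.as_bytes())?;

    // On the next startup the file is read back and compared against
    // STORE_SCHEMA_VERSION; a mismatch aborts before any backend is opened.
    let read: StoreMetadata = serde_json::from_str(&std::fs::read_to_string(&path)?)?;
    assert_eq!(read.schema_version, 1);

    std::fs::remove_dir_all(&dir)?;
    Ok(())
}
```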
2 changes: 1 addition & 1 deletion tooling/Cargo.lock

Some generated files are not rendered by default.

3 changes: 1 addition & 2 deletions tooling/reorgs/src/main.rs
@@ -72,8 +72,7 @@ where
// Run in another task to clean up properly on panic
let result = tokio::spawn(test_fn(simulator.clone())).await;

simulator.lock_owned().await.stop();
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
simulator.lock_owned().await.stop().await;

match result {
Ok(_) => info!(test=%test_name, elapsed=?start.elapsed(), "test completed successfully"),
41 changes: 26 additions & 15 deletions tooling/reorgs/src/simulator.rs
@@ -43,7 +43,7 @@ pub struct Simulator {
genesis_path: PathBuf,
configs: Vec<Options>,
enodes: Vec<String>,
cancellation_tokens: Vec<CancellationToken>,
cancellation_tokens: Vec<(CancellationToken, tokio::task::JoinHandle<()>)>,
}

impl Simulator {
@@ -97,7 +97,10 @@ impl Simulator {

opts.syncmode = SyncMode::Full;

let _ = std::fs::remove_dir_all(&opts.datadir);
if opts.datadir.exists() {
std::fs::remove_dir_all(&opts.datadir)
.expect("Failed to remove existing data directory");
}
std::fs::create_dir_all(&opts.datadir).expect("Failed to create data directory");

let now = SystemTime::now()
@@ -110,7 +113,6 @@
let cancel = CancellationToken::new();

self.configs.push(opts.clone());
self.cancellation_tokens.push(cancel.clone());

let mut cmd = Command::new(&self.cmd_path);
cmd.args([
@@ -142,20 +144,26 @@
.expect("node initialization timed out");
self.enodes.push(enode);

tokio::spawn(async move {
let mut child = child;
tokio::select! {
_ = cancel.cancelled() => {
if let Some(pid) = child.id() {
// NOTE: we use SIGTERM instead of child.kill() so sockets are closed
signal::kill(Pid::from_raw(pid as i32), Signal::SIGTERM).unwrap();
let waiter = tokio::spawn({
let cancel = cancel.clone();
async move {
let mut child = child;
tokio::select! {
_ = cancel.cancelled() => {
if let Some(pid) = child.id() {
// NOTE: we use SIGTERM instead of child.kill() so sockets are closed
signal::kill(Pid::from_raw(pid as i32), Signal::SIGTERM).unwrap();
}
}
res = child.wait() => {
assert!(res.unwrap().success());
}
}
res = child.wait() => {
assert!(res.unwrap().success());
}
// Ignore any errors on shutdown
let _ = child.wait().await.unwrap();
}
});
self.cancellation_tokens.push((cancel, waiter));

info!(
"Started node {n} at http://{}:{}",
@@ -165,10 +173,13 @@
self.get_node(n)
}

pub fn stop(&self) {
for token in &self.cancellation_tokens {
pub async fn stop(&mut self) {
for (token, waiter) in self.cancellation_tokens.drain(..) {
token.cancel();
waiter.await.unwrap();
}
self.enodes.clear();
self.configs.clear();
}

fn get_http_url(&self, index: usize) -> Url {
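
The simulator change boils down to pairing each `CancellationToken` with the `JoinHandle` of its waiter task. A minimal sketch (assuming `tokio` and `tokio_util`) of the resulting shutdown sequence, which replaces the fixed one-second sleep previously in `main.rs`:

```rust
use tokio_util::sync::CancellationToken;

// Cancel each node's token, then await its waiter task so the child process
// has fully exited (and released its datadir) before the next test starts.
async fn stop_all(nodes: Vec<(CancellationToken, tokio::task::JoinHandle<()>)>) {
    for (token, waiter) in nodes {
        token.cancel();
        waiter.await.expect("waiter task panicked");
    }
}
```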