Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 16 additions & 18 deletions p2pool/src/sharechain/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,31 +89,29 @@ impl InMemoryShareChain {

let mut p2chain = None;
if fs::exists(&data_path).map_err(|e| anyhow!("block cache file errored when checking exists: {}", e))? {
let bkp_file = config
.block_cache_file
.as_path()
.parent()
.ok_or_else(|| anyhow!("Block cache file has no parent"))?
.join("block_cache_backup")
.join(pow_algo.to_string());
info!(target: LOG_TARGET, "Found old block cache file, renaming from {:?} to {:?}", data_path.as_path(), &bkp_file);
// let bkp_file = config
// .block_cache_file
// .as_path()
// .parent()
// .ok_or_else(|| anyhow!("Block cache file has no parent"))?
// .join("block_cache_backup")
// .join(pow_algo.to_string());
// info!(target: LOG_TARGET, "Found old block cache file, renaming from {:?} to {:?}", data_path.as_path(), &bkp_file);

// First remove the old backup file
let _unused = fs::remove_dir_all(bkp_file.as_path())
.inspect_err(|e| error!(target: LOG_TARGET, "Could not remove old block cache file:{:?}", e));
fs::create_dir_all(bkp_file.parent().unwrap())
.map_err(|e| anyhow::anyhow!("Could not create block cache backup directory:{:?}", e))?;
fs::rename(data_path.as_path(), bkp_file.as_path())
.map_err(|e| anyhow::anyhow!("Could not rename file to old file:{:?}", e))?;
let old = LmdbBlockStorage::new_from_path(bkp_file.as_path());
let new = LmdbBlockStorage::new_from_path(&data_path);
// let _unused = fs::remove_dir_all(bkp_file.as_path())
// .inspect_err(|e| error!(target: LOG_TARGET, "Could not remove old block cache file:{:?}", e));
// fs::create_dir_all(bkp_file.parent().unwrap())
// .map_err(|e| anyhow::anyhow!("Could not create block cache backup directory:{:?}", e))?;
// fs::rename(data_path.as_path(), bkp_file.as_path())
// .map_err(|e| anyhow::anyhow!("Could not rename file to old file:{:?}", e))?;
let block_cache = LmdbBlockStorage::new_from_path(&data_path);
match P2Chain::try_load(
pow_algo,
config.share_window * 2,
config.share_window,
config.block_time,
old,
new,
block_cache,
&squad,
config
.minimum_randomx_target_difficulty
Expand Down
25 changes: 14 additions & 11 deletions p2pool/src/sharechain/lmdb_block_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,11 @@ impl LmdbBlockStorage {
info!(target: LOG_TARGET, "Using block storage at {:?}", path);
if !fs::exists(path).expect("could not get file") {
fs::create_dir_all(path).unwrap();
// fs::File::create(path).unwrap();
}
let mut manager = Manager::<LmdbEnvironment>::singleton().write().unwrap();
let file_handle = manager.get_or_create(path, Rkv::new::<Lmdb>).unwrap();

let env = file_handle.read().expect("reader");
// let dbs = env.get_dbs().expect("No dbs");
// if !dbs.contains(&Some("migrations".to_string())) {
// let store = env.open_integer("migrations", StoreOptions::create()).unwrap();
// let writer = env.write().expect("writer");
// store.put(&writer, 0, &rkv::Value::Str("init")).unwrap();
// writer.commit();
// }
let mut migrations = HashMap::new();
{
let store = env.open_single("migrations", StoreOptions::create()).unwrap();
Expand Down Expand Up @@ -152,7 +144,18 @@ impl BlockCache for LmdbBlockStorage {
}
}

fn insert(&self, hash: BlockHash, block: Arc<P2Block>) {
fn insert(&self, hash: BlockHash, block: Arc<P2Block>, force: bool) {
//First we check if the block already exists, if it does we don't insert it, if force is set, we overwrite it
if force {
let env = self.file_handle.read().expect("reader");
let store = env.open_single("block_cache_v2", StoreOptions::create()).unwrap();
let reader = env.read().expect("reader");
let block = store.get(&reader, hash.as_bytes()).unwrap();
// we dont want to deserialise this block, so we just check if it exists
if block.is_some() {
return;
}
}
Comment on lines +147 to +158
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improved block insertion logic with force parameter

The updated insert method now includes a force parameter that determines whether to check for existing blocks before insertion. This is the key change that prevents recreating the database on start.

However, the implementation logic appears inverted:

  • When force is true, the method checks whether the block already exists and returns early if it does
  • In other words, setting force prevents an existing block from being overwritten, rather than forcing the overwrite that the in-code comment promises

The parameter should be renamed for clarity or the logic should be inverted:

-    fn insert(&self, hash: BlockHash, block: Arc<P2Block>, force: bool) {
-        //First we check if the block already exists, if it does we don't insert it, if force is set, we overwrite it
-        if force {
+    fn insert(&self, hash: BlockHash, block: Arc<P2Block>, skip_if_exists: bool) {
+        //First we check if the block already exists, if it does we don't insert it
+        if skip_if_exists {
             let env = self.file_handle.read().expect("reader");
             let store = env.open_single("block_cache_v2", StoreOptions::create()).unwrap();
             let reader = env.read().expect("reader");
             let block = store.get(&reader, hash.as_bytes()).unwrap();
             // we dont want to deserialise this block, so we just check if it exists
             if block.is_some() {
                 return;
             }
         }

Alternatively, invert the logic if "force" should mean "overwrite existing":

-    fn insert(&self, hash: BlockHash, block: Arc<P2Block>, force: bool) {
-        //First we check if the block already exists, if it does we don't insert it, if force is set, we overwrite it
-        if force {
+    fn insert(&self, hash: BlockHash, block: Arc<P2Block>, force: bool) {
+        //Check if block exists and skip insertion unless force is set
+        if !force {
             let env = self.file_handle.read().expect("reader");
             let store = env.open_single("block_cache_v2", StoreOptions::create()).unwrap();
             let reader = env.read().expect("reader");
             let block = store.get(&reader, hash.as_bytes()).unwrap();
             // we dont want to deserialise this block, so we just check if it exists
             if block.is_some() {
                 return;
             }
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fn insert(&self, hash: BlockHash, block: Arc<P2Block>, force: bool) {
//First we check if the block already exists, if it does we don't insert it, if force is set, we overwrite it
if force {
let env = self.file_handle.read().expect("reader");
let store = env.open_single("block_cache_v2", StoreOptions::create()).unwrap();
let reader = env.read().expect("reader");
let block = store.get(&reader, hash.as_bytes()).unwrap();
// we dont want to deserialise this block, so we just check if it exists
if block.is_some() {
return;
}
}
fn insert(&self, hash: BlockHash, block: Arc<P2Block>, skip_if_exists: bool) {
//First we check if the block already exists, if it does we don't insert it
if skip_if_exists {
let env = self.file_handle.read().expect("reader");
let store = env.open_single("block_cache_v2", StoreOptions::create()).unwrap();
let reader = env.read().expect("reader");
let block = store.get(&reader, hash.as_bytes()).unwrap();
// we dont want to deserialise this block, so we just check if it exists
if block.is_some() {
return;
}
}

// Retry if the map is full
// This weird pattern of setting a bool is so that the env is closed before resizing, otherwise
// you can't resize with active transactions.
Expand Down Expand Up @@ -225,7 +228,7 @@ fn resize_db(env: &Rkv<LmdbEnvironment>) {
pub trait BlockCache {
fn get(&self, hash: &BlockHash) -> Option<Arc<P2Block>>;
fn delete(&self, hash: &BlockHash);
fn insert(&self, hash: BlockHash, block: Arc<P2Block>);
fn insert(&self, hash: BlockHash, block: Arc<P2Block>, force: bool);
fn all_blocks(&self) -> Result<Vec<Arc<P2Block>>, Error>;
}

Expand Down Expand Up @@ -256,7 +259,7 @@ pub mod test {
self.blocks.write().unwrap().remove(hash);
}

fn insert(&self, hash: BlockHash, block: Arc<P2Block>) {
fn insert(&self, hash: BlockHash, block: Arc<P2Block>, force: bool) {
self.blocks.write().unwrap().insert(hash, block);
}
Comment on lines +262 to 264
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

The force parameter still needs an implementation here

The InMemoryBlockCache implementation of the insert method accepts the new force parameter but silently ignores it, unlike the conditional behavior in the LmdbBlockStorage implementation. This inconsistency could lead to different behavior between test and production code.

-        fn insert(&self, hash: BlockHash, block: Arc<P2Block>, force: bool) {
-            self.blocks.write().unwrap().insert(hash, block);
+        fn insert(&self, hash: BlockHash, block: Arc<P2Block>, force: bool) {
+            if !force {
+                if self.blocks.read().unwrap().contains_key(&hash) {
+                    return;
+                }
+            }
+            self.blocks.write().unwrap().insert(hash, block);
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fn insert(&self, hash: BlockHash, block: Arc<P2Block>, force: bool) {
self.blocks.write().unwrap().insert(hash, block);
}
fn insert(&self, hash: BlockHash, block: Arc<P2Block>, force: bool) {
if !force {
if self.blocks.read().unwrap().contains_key(&hash) {
return;
}
}
self.blocks.write().unwrap().insert(hash, block);
}


Expand Down
13 changes: 8 additions & 5 deletions p2pool/src/sharechain/p2chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::{
ops::{Deref, Sub},
sync::Arc,
};

use std::time::Instant;
use chrono::{Duration, Utc};
use itertools::Itertools;
use log::*;
Expand Down Expand Up @@ -204,7 +204,6 @@ impl<T: BlockCache> P2Chain<T> {
total_size: u64,
share_window: u64,
block_time: u64,
from_block_cache: T,
new_block_cache: T,
squad: &str,
minimum_randomx_target_difficulty: u64,
Expand Down Expand Up @@ -233,7 +232,8 @@ impl<T: BlockCache> P2Chain<T> {
))
.timestamp() as u64)
.into();
for (i, block) in from_block_cache.all_blocks()?.into_iter().enumerate() {
let start = Instant::now();
for (i, block) in new_chain.block_cache.all_blocks()?.into_iter().enumerate() {
if block.version != PROTOCOL_VERSION {
warn!(target: LOG_TARGET, "Block version mismatch, skipping block");
continue;
Expand All @@ -258,6 +258,9 @@ impl<T: BlockCache> P2Chain<T> {
error!(target: LOG_TARGET, "Failed to load block into chain: {}", e);
});
}
let time = start.elapsed();
info!(target: LOG_TARGET, "Loaded chain in {:?}", time.as_secs());
panic!("close");
Ok(new_chain)
}

Expand Down Expand Up @@ -706,7 +709,7 @@ impl<T: BlockCache> P2Chain<T> {
let level = self
.level_at_height(height)
.ok_or(ShareChainError::BlockLevelNotFound)?;
level.add_block(Arc::new(actual_block))?;
level.add_block(Arc::new(actual_block), true)?;

Ok(())
}
Expand Down Expand Up @@ -1083,7 +1086,7 @@ impl<T: BlockCache> P2Chain<T> {
}
match self.level_at_height(new_block_height) {
Some(level) => {
level.add_block(block)?;
level.add_block(block, false)?;
self.verify_chain(new_block_height, block_hash)
},
None => {
Expand Down
6 changes: 3 additions & 3 deletions p2pool/src/sharechain/p2chain_level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl<T: BlockCache> P2ChainLevel<T> {
let mut block_headers = HashMap::new();
block_headers.insert(block.hash, header);

block_cache.insert(block.hash, block);
block_cache.insert(block.hash, block, false);

Self {
block_cache,
Expand Down Expand Up @@ -136,7 +136,7 @@ impl<T: BlockCache> P2ChainLevel<T> {
*lock = hash;
}

pub fn add_block(&self, block: Arc<P2Block>) -> Result<(), ShareChainError> {
pub fn add_block(&self, block: Arc<P2Block>, force: bool) -> Result<(), ShareChainError> {
if self.height != block.height {
return Err(ShareChainError::InvalidBlock {
reason: "Block height does not match the chain level height".to_string(),
Expand All @@ -158,7 +158,7 @@ impl<T: BlockCache> P2ChainLevel<T> {
.write()
.expect("could not lock")
.insert(block.hash, header);
self.block_cache.insert(block.hash, block);
self.block_cache.insert(block.hash, block, force);
Ok(())
}

Expand Down
Loading