Commit 3c15561

make hash calculation parallel

1 parent 5e80f89 commit 3c15561

5 files changed

Lines changed: 150 additions & 171 deletions


Cargo.toml

Lines changed: 3 additions & 0 deletions
@@ -8,6 +8,9 @@ edition = "2021"
 
 [dependencies]
 chunkfs = "0.1"
+futures = "0.3"
+rayon = "1.10"
+
 
 [dev-dependencies]
 rand = "0.8.5"
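
The new rayon dependency supplies the thread pool and parallel iterators used in src/chunkfs_sbc.rs below; futures is added here but does not appear in the hunks shown. As a repo-independent sketch of what rayon provides, a mutable collection can be processed on its global thread pool like this:

use rayon::prelude::*;

fn main() {
    let mut values = vec![1u64, 2, 3, 4, 5];
    // for_each runs once per element, distributed across rayon's worker threads
    values.par_iter_mut().for_each(|v| *v *= 2);
    assert_eq!(values, vec![2, 4, 6, 8, 10]);
}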

runner/src/main.rs

Lines changed: 21 additions & 74 deletions
@@ -6,15 +6,13 @@ use chunkfs::chunkers::{
     FSChunker, LeapChunker, RabinChunker, SeqChunker, SizeParams, SuperChunker, UltraChunker,
 };
 use chunkfs::hashers::Sha256Hasher;
-use chunkfs::{ChunkerRef, FileSystem};
-use chunkfs::chunkers::seq::OperationMode;
-use sbc_algorithm::decoders;
-use sbc_algorithm::encoders;
+use chunkfs::FileSystem;
+use sbc_algorithm::{decoders, encoders};
 use sbc_algorithm::{SBCMap, SBCScrubber};
 use std::collections::HashMap;
-use std::fs::OpenOptions;
-use std::io::{self, Write};
-use std::time::Instant;
+use std::fs;
+use std::io;
+
 #[allow(dead_code)]
 const MB: usize = 1024 * 1024;
 
@@ -25,72 +23,21 @@ fn generate_data(mb_size: usize) -> Vec<u8> {
 }
 
 fn main() -> io::Result<()> {
-    let chunk_size = SizeParams::new(2 * 1024, 8 * 1024, 12 * 1024);
-    let chunkers: Vec<(&str, ChunkerRef)> = vec![
-        ("super", SuperChunker::new(chunk_size.clone()).into()),
-        ("fs", FSChunker::default().into()),
-        ("leap", LeapChunker::new(chunk_size.clone()).into()),
-        ("rabin", RabinChunker::new(chunk_size.clone()).into()),
-        (
-            "seq",
-            SeqChunker::new(OperationMode::Increasing, chunk_size.clone(), Default::default()).into(),
-        ),
-        ("ultra", UltraChunker::new(chunk_size.clone()).into()),
-    ];
-    let dataset_path = "runner/files/kernels.tar";
-    let mut out_file = OpenOptions::new()
-        .write(true)
-        .create(true)
-        .truncate(false)
-        .open("out-sbc-kernels-gdelta1.csv")?;
-    writeln!(
-        out_file,
-        "Chunker\tCDCReadTime(s)\tCDCWriteTime(s)\tCDCDedupRatio\tScrubTime\tSBCReadTime\tSBCDedupRatio"
-    )?;
-
-    for (chunker_name, chunker_ref) in chunkers {
-        let data = std::fs::read(dataset_path)?;
-        let mut fs = FileSystem::new_with_scrubber(
-            HashMap::default(),
-            SBCMap::new(decoders::GdeltaDecoder),
-            Box::new(SBCScrubber::new(encoders::GdeltaEncoder)),
-            Sha256Hasher::default(),
-        );
-        let mut handle = fs.create_file("file".to_string(), chunker_ref)?;
-        let now = Instant::now();
-        fs.write_to_file(&mut handle, &data)?;
-        let cdc_write_time = now.elapsed();
-        fs.close_file(handle)?;
-        let cdc_dedup_ratio = fs.cdc_dedup_ratio();
-
-        let handle = fs.open_file_readonly("file".to_string())?;
-        let now = Instant::now();
-        fs.read_file_complete(&handle)?;
-        let cdc_read_time = now.elapsed();
-
-        let res = fs.scrub().unwrap();
-        let sbc_dedup_ratio = fs.total_dedup_ratio();
-
-        let now = Instant::now();
-        fs.read_file_complete(&handle)?;
-        let sbc_read_time = now.elapsed();
-
-        writeln!(
-            out_file,
-            "{}\t{:.10}\t{:.10}\t{:.10}\t{:.10}\t{:.10}\t{:.10}",
-            chunker_name,
-            cdc_read_time.as_secs_f64(),
-            cdc_write_time.as_secs_f64(),
-            cdc_dedup_ratio,
-            res.running_time.as_secs_f64(),
-            sbc_read_time.as_secs_f64(),
-            sbc_dedup_ratio
-        )?;
-        fs.clear_file_system()?;
-        println!("Scrubber results: {res:?}");
-        println!("CDChunking complete, dedup_ratio: {}", cdc_dedup_ratio);
-        println!("SBC dedup ratio: {}", sbc_dedup_ratio);
-        println!("delta: {}", sbc_dedup_ratio - cdc_dedup_ratio);
-    }
+    let data = fs::read("runner/files/my_data")?;
+    let chunk_size = SizeParams::new(2 * 1024, 8 * 1024, 16 * 1024);
+    let mut fs = FileSystem::new_with_scrubber(
+        HashMap::default(),
+        SBCMap::new(decoders::GdeltaDecoder),
+        Box::new(SBCScrubber::new(encoders::GdeltaEncoder)),
+        Sha256Hasher::default(),
+    );
+    let mut handle = fs.create_file("file".to_string(), SuperChunker::new(chunk_size))?;
+
+    fs.write_to_file(&mut handle, &data)?;
+    fs.close_file(handle)?;
+    let cdc_dedup_ratio = fs.cdc_dedup_ratio();
+    let res = fs.scrub().unwrap();
+    let sbc_dedup_ratio = fs.total_dedup_ratio();
+    println!("{}, {:?}, {}", cdc_dedup_ratio, res, sbc_dedup_ratio);
     Ok(())
 }

src/chunkfs_sbc.rs

Lines changed: 43 additions & 24 deletions
@@ -1,15 +1,22 @@
 use crate::decoders::Decoder;
-use crate::encoders::{Encoder};
+use crate::encoders::Encoder;
 use crate::graph::Graph;
 use crate::{hash_functions, ChunkType, SBCHash, SBCMap};
 use chunkfs::{
     ChunkHash, Data, DataContainer, Database, IterableDatabase, Scrub, ScrubMeasurements,
 };
+use rayon::prelude::*;
+use rayon::ThreadPoolBuilder;
 use std::collections::HashMap;
 use std::io;
 use std::io::{Error, ErrorKind};
+use std::sync::{Arc, Mutex};
 use std::time::Instant;
 
+const NUM_THREADS_FOR_HASHING: usize = 4;
+
+type ClusterType<'a> = HashMap<u32, Vec<(u32, &'a mut DataContainer<SBCHash>)>>;
+
 impl<D: Decoder> Database<SBCHash, Vec<u8>> for SBCMap<D> {
     fn insert(&mut self, sbc_hash: SBCHash, chunk: Vec<u8>) -> io::Result<()> {
         self.sbc_hashmap.insert(sbc_hash, chunk);
@@ -69,29 +76,24 @@ pub struct SBCScrubber<E>
 where
     E: Encoder,
 {
-    graph: Graph,
+    graph: Arc<Mutex<Graph>>,
     encoder: E,
 }
 
 impl<E: Encoder> SBCScrubber<E> {
     pub fn new(_encoder: E) -> SBCScrubber<E> {
         SBCScrubber {
-            graph: Graph::new(),
+            graph: Arc::new(Mutex::new(Graph::new())),
            encoder: _encoder,
         }
     }
 }
 
-// impl<E: Encoder> Default for SBCScrubber<E> {
-//     fn default() -> SBCScrubber<LevenshteinEncoder> {
-//         Self::new(LevenshteinEncoder)
-//     }
-// }
-
 impl<Hash: ChunkHash, B, D: Decoder, E: Encoder> Scrub<Hash, B, SBCHash, SBCMap<D>>
     for SBCScrubber<E>
 where
-    B: IterableDatabase<Hash, DataContainer<SBCHash>>,
+    for<'data> B:
+        IterableDatabase<Hash, DataContainer<SBCHash>> + IntoParallelRefMutIterator<'data> + 'data,
 {
     fn scrub<'a>(
         &mut self,
@@ -101,28 +103,45 @@ where
     where
         Hash: 'a,
     {
-        let time_start = Instant::now();
         let mut processed_data = 0;
         let mut data_left = 0;
-        let mut clusters: HashMap<u32, Vec<(u32, &mut DataContainer<SBCHash>)>> = HashMap::new();
-        for (_, data_container) in database.iterator_mut() {
-            match data_container.extract() {
-                Data::Chunk(data) => {
-                    let sbc_hash = hash_functions::sbc_hashing(data.as_slice());
-                    let parent_hash = self.graph.add_vertex(sbc_hash);
-                    let cluster = clusters.entry(parent_hash).or_default();
-                    cluster.push((sbc_hash, data_container));
+        let pool = ThreadPoolBuilder::new()
+            .num_threads(NUM_THREADS_FOR_HASHING)
+            .build()
+            .unwrap();
+        let clusters: Arc<Mutex<ClusterType>> = Arc::new(Mutex::new(HashMap::new()));
+
+        let mut mut_refs_database: Vec<&mut DataContainer<SBCHash>> =
+            database.iterator_mut().map(|(_, b)| b).collect();
+
+        let time_start = Instant::now();
+        pool.install(|| {
+            mut_refs_database.par_iter_mut().for_each(|data_container| {
+                match data_container.extract() {
+                    Data::Chunk(data) => {
+                        let sbc_hash = hash_functions::sbc_hashing(data.as_slice());
+                        let parent_hash = self.graph.lock().unwrap().add_vertex(sbc_hash);
+                        let mut clusters_lock = clusters.lock().unwrap();
+                        let cluster = clusters_lock.entry(parent_hash).or_default();
+                        cluster.push((sbc_hash, data_container));
+                    }
+                    Data::TargetChunk(_) => {
+                        panic!()
+                    }
                 }
-                Data::TargetChunk(_) => {}
-            }
-        }
+            });
+        });
         let time_hashing = time_start.elapsed();
         println!("time for hashing: {time_hashing:?}");
-        let (clusters_data_left, clusters_processed_data) =
-            self.encoder.encode_clusters(&mut clusters, target_map);
+
+        let (clusters_data_left, clusters_processed_data) = self
+            .encoder
+            .encode_clusters(&mut clusters.lock().unwrap(), target_map);
+
         data_left += clusters_data_left;
         processed_data += clusters_processed_data;
         let running_time = time_start.elapsed();
+
         Ok(ScrubMeasurements {
             processed_data,
             running_time,
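Taken together, the src/chunkfs_sbc.rs changes follow one pattern: collect mutable references to every data container, hash them in parallel on a fixed-size rayon pool, and merge results into shared state behind Arc<Mutex<...>>. A minimal self-contained sketch of that pattern follows; Chunk and toy_hash are hypothetical stand-ins for DataContainer<SBCHash> and hash_functions::sbc_hashing, and the modulo step stands in for Graph::add_vertex clustering.

use rayon::prelude::*;
use rayon::ThreadPoolBuilder;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for DataContainer<SBCHash>.
struct Chunk(Vec<u8>);

// Hypothetical stand-in for hash_functions::sbc_hashing.
fn toy_hash(data: &[u8]) -> u32 {
    data.iter()
        .fold(0u32, |acc, b| acc.wrapping_mul(31).wrapping_add(*b as u32))
}

fn main() {
    let pool = ThreadPoolBuilder::new().num_threads(4).build().unwrap();
    let mut chunks: Vec<Chunk> = (0..64).map(|i| Chunk(vec![i as u8; 128])).collect();

    // Shared cluster map guarded by a mutex, as in the commit:
    // parent hash -> list of (chunk hash, mutable container reference).
    let clusters: Arc<Mutex<HashMap<u32, Vec<(u32, &mut Chunk)>>>> =
        Arc::new(Mutex::new(HashMap::new()));

    pool.install(|| {
        chunks.par_iter_mut().for_each(|chunk| {
            // Hashing runs in parallel; only the map update takes the lock.
            let hash = toy_hash(&chunk.0);
            let parent = hash % 8; // stand-in for Graph::add_vertex clustering
            clusters
                .lock()
                .unwrap()
                .entry(parent)
                .or_default()
                .push((hash, chunk));
        });
    });

    println!("clusters: {}", clusters.lock().unwrap().len());
}

Note that in the commit both the graph and the cluster map are locked inside the hot loop, so only sbc_hashing itself runs in parallel; the add_vertex and cluster updates still execute serially under their mutexes.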