Skip to content

Commit ec911ab

Browse files
committed
begin to pass around generic file type
1 parent 4d4d8b6 commit ec911ab

File tree

4 files changed

+121
-53
lines changed

4 files changed

+121
-53
lines changed

src/store/bao_file.rs

Lines changed: 92 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use iroh_io::AsyncSliceReader;
3232
use super::mutable_mem_storage::{MutableMemStorage, SizeInfo};
3333
use crate::{
3434
store::BaoBatchWriter,
35-
util::{get_limited_slice, MemOrFile, SparseMemFile},
35+
util::{callback_lock::CallbackLock, get_limited_slice, FileAndSize, MemOrFile, SparseMemFile},
3636
Hash, IROH_BLOCK_SIZE,
3737
};
3838

@@ -81,46 +81,63 @@ struct DataPaths {
8181
///
8282
/// For the memory variant, it does reading in a zero copy way, since storage
8383
/// is already a `Bytes`.
84-
#[derive(Default, derive_more::Debug)]
85-
pub struct CompleteStorage {
84+
#[derive(derive_more::Debug)]
85+
#[debug(bound(T: Debug))]
86+
pub struct CompleteStorage<T> {
8687
/// data part, which can be in memory or on disk.
8788
#[debug("{:?}", data.as_ref().map_mem(|x| x.len()))]
88-
pub data: MemOrFile<Bytes, (File, u64)>,
89+
pub data: MemOrFile<Bytes, FileAndSize<T>>,
8990
/// outboard part, which can be in memory or on disk.
9091
#[debug("{:?}", outboard.as_ref().map_mem(|x| x.len()))]
91-
pub outboard: MemOrFile<Bytes, (File, u64)>,
92+
pub outboard: MemOrFile<Bytes, FileAndSize<T>>,
9293
}
9394

94-
impl CompleteStorage {
95+
impl<T> Default for CompleteStorage<T> {
96+
fn default() -> Self {
97+
Self {
98+
data: Default::default(),
99+
outboard: Default::default(),
100+
}
101+
}
102+
}
103+
104+
impl<T> CompleteStorage<T>
105+
where
106+
T: bao_tree::io::sync::ReadAt,
107+
{
95108
/// Read from the data file at the given offset, until end of file or max bytes.
96109
pub fn read_data_at(&self, offset: u64, len: usize) -> Bytes {
97110
match &self.data {
98111
MemOrFile::Mem(mem) => get_limited_slice(mem, offset, len),
99-
MemOrFile::File((file, _size)) => read_to_end(file, offset, len).unwrap(),
112+
MemOrFile::File(FileAndSize { file, size: _ }) => {
113+
read_to_end(file, offset, len).unwrap()
114+
}
100115
}
101116
}
102117

103118
/// Read from the outboard file at the given offset, until end of file or max bytes.
104119
pub fn read_outboard_at(&self, offset: u64, len: usize) -> Bytes {
105120
match &self.outboard {
106121
MemOrFile::Mem(mem) => get_limited_slice(mem, offset, len),
107-
MemOrFile::File((file, _size)) => read_to_end(file, offset, len).unwrap(),
122+
MemOrFile::File(FileAndSize { file, size: _ }) => {
123+
read_to_end(file, offset, len).unwrap()
124+
}
108125
}
109126
}
110127

111128
/// The size of the data file.
112129
pub fn data_size(&self) -> u64 {
113130
match &self.data {
114131
MemOrFile::Mem(mem) => mem.len() as u64,
115-
MemOrFile::File((_file, size)) => *size,
132+
MemOrFile::File(FileAndSize { file: _, size }) => *size,
116133
}
117134
}
118135

119136
/// The size of the outboard file.
120137
pub fn outboard_size(&self) -> u64 {
121138
match &self.outboard {
122139
MemOrFile::Mem(mem) => mem.len() as u64,
123-
MemOrFile::File((_file, size)) => *size,
140+
MemOrFile::File(FileAndSize { file: _, size }) => *size,
124141
}
125142
}
126143
}
@@ -244,7 +261,7 @@ impl FileStorage {
244261

245262
/// The storage for a bao file. This can be either in memory or on disk.
246263
#[derive(Debug)]
247-
pub(crate) enum BaoFileStorage {
264+
pub(crate) enum BaoFileStorage<T> {
248265
/// The entry is incomplete and in memory.
249266
///
250267
/// Since it is incomplete, it must be writeable.
@@ -261,16 +278,16 @@ pub(crate) enum BaoFileStorage {
261278
/// (memory or file).
262279
///
263280
/// Writing to this is a no-op, since it is already complete.
264-
Complete(CompleteStorage),
281+
Complete(CompleteStorage<T>),
265282
}
266283

267-
impl Default for BaoFileStorage {
284+
impl<T> Default for BaoFileStorage<T> {
268285
fn default() -> Self {
269286
BaoFileStorage::Complete(Default::default())
270287
}
271288
}
272289

273-
impl BaoFileStorage {
290+
impl<T> BaoFileStorage<T> {
274291
/// Take the storage out, leaving an empty storage in its place.
275292
///
276293
/// Be careful to put something back in its place, or you will lose data.
@@ -310,11 +327,11 @@ impl BaoFileStorage {
310327

311328
/// A weak reference to a bao file handle.
312329
#[derive(Debug, Clone)]
313-
pub struct BaoFileHandleWeak(Weak<BaoFileHandleInner>);
330+
pub struct BaoFileHandleWeak<T>(Weak<BaoFileHandleInner<T>>);
314331

315-
impl BaoFileHandleWeak {
332+
impl<T> BaoFileHandleWeak<T> {
316333
/// Upgrade to a strong reference if possible.
317-
pub fn upgrade(&self) -> Option<BaoFileHandle> {
334+
pub fn upgrade(&self) -> Option<BaoFileHandle<T>> {
318335
self.0.upgrade().map(BaoFileHandle)
319336
}
320337

@@ -326,15 +343,29 @@ impl BaoFileHandleWeak {
326343

327344
/// The inner part of a bao file handle.
328345
#[derive(Debug)]
329-
pub struct BaoFileHandleInner {
330-
pub(crate) storage: RwLock<BaoFileStorage>,
346+
pub struct BaoFileHandleInner<T> {
347+
pub(crate) storage: RwLock<BaoFileStorage<T>>,
331348
config: Arc<BaoFileConfig>,
332349
hash: Hash,
333350
}
334351

335352
/// A cheaply cloneable handle to a bao file, including the hash and the configuration.
336-
#[derive(Debug, Clone, derive_more::Deref)]
337-
pub struct BaoFileHandle(Arc<BaoFileHandleInner>);
353+
#[derive(Debug)]
354+
pub struct BaoFileHandle<T>(Arc<BaoFileHandleInner<T>>);
355+
356+
impl<T> Deref for BaoFileHandle<T> {
357+
type Target = Arc<BaoFileHandleInner<T>>;
358+
359+
fn deref(&self) -> &Self::Target {
360+
&self.0
361+
}
362+
}
363+
364+
impl<T> Clone for BaoFileHandle<T> {
365+
fn clone(&self) -> Self {
366+
Self(self.0.clone())
367+
}
368+
}
338369

339370
pub(crate) type CreateCb = Arc<dyn Fn(&Hash) -> io::Result<()> + Send + Sync>;
340371

@@ -375,13 +406,18 @@ impl BaoFileConfig {
375406

376407
/// A reader for a bao file, reading just the data.
377408
#[derive(Debug)]
378-
pub struct DataReader(Option<BaoFileHandle>);
409+
pub struct DataReader<T>(Option<BaoFileHandle<T>>);
379410

380-
async fn with_storage<T, P, F>(opt: &mut Option<BaoFileHandle>, no_io: P, f: F) -> io::Result<T>
411+
async fn with_storage<T, P, F, H>(
412+
opt: &mut Option<BaoFileHandle<H>>,
413+
no_io: P,
414+
f: F,
415+
) -> io::Result<T>
381416
where
382-
P: Fn(&BaoFileStorage) -> bool + Send + 'static,
383-
F: FnOnce(&BaoFileStorage) -> io::Result<T> + Send + 'static,
417+
P: Fn(&BaoFileStorage<H>) -> bool + Send + 'static,
418+
F: FnOnce(&BaoFileStorage<H>) -> io::Result<T> + Send + 'static,
384419
T: Send + 'static,
420+
H: Send + Sync + 'static,
385421
{
386422
let handle = opt
387423
.take()
@@ -410,7 +446,10 @@ where
410446
res
411447
}
412448

413-
impl AsyncSliceReader for DataReader {
449+
impl<T> AsyncSliceReader for DataReader<T>
450+
where
451+
T: Send + Sync + bao_tree::io::sync::ReadAt + 'static,
452+
{
414453
async fn read_at(&mut self, offset: u64, len: usize) -> io::Result<Bytes> {
415454
with_storage(
416455
&mut self.0,
@@ -440,9 +479,12 @@ impl AsyncSliceReader for DataReader {
440479

441480
/// A reader for the outboard part of a bao file.
442481
#[derive(Debug)]
443-
pub struct OutboardReader(Option<BaoFileHandle>);
482+
pub struct OutboardReader<T>(Option<BaoFileHandle<T>>);
444483

445-
impl AsyncSliceReader for OutboardReader {
484+
impl<T> AsyncSliceReader for OutboardReader<T>
485+
where
486+
T: Send + Sync + bao_tree::io::sync::ReadAt + 'static,
487+
{
446488
async fn read_at(&mut self, offset: u64, len: usize) -> io::Result<Bytes> {
447489
with_storage(
448490
&mut self.0,
@@ -476,7 +518,10 @@ enum HandleChange {
476518
// later: size verified
477519
}
478520

479-
impl BaoFileHandle {
521+
impl<T> BaoFileHandle<T>
522+
where
523+
T: bao_tree::io::sync::ReadAt,
524+
{
480525
/// Create a new bao file handle.
481526
///
482527
/// This will create a new file handle with an empty memory storage.
@@ -509,8 +554,8 @@ impl BaoFileHandle {
509554
pub fn new_complete(
510555
config: Arc<BaoFileConfig>,
511556
hash: Hash,
512-
data: MemOrFile<Bytes, (File, u64)>,
513-
outboard: MemOrFile<Bytes, (File, u64)>,
557+
data: MemOrFile<Bytes, FileAndSize<T>>,
558+
outboard: MemOrFile<Bytes, FileAndSize<T>>,
514559
) -> Self {
515560
let storage = BaoFileStorage::Complete(CompleteStorage { data, outboard });
516561
Self(Arc::new(BaoFileHandleInner {
@@ -525,7 +570,7 @@ impl BaoFileHandle {
525570
#[cfg(feature = "fs-store")]
526571
pub(crate) fn transform(
527572
&self,
528-
f: impl FnOnce(BaoFileStorage) -> io::Result<BaoFileStorage>,
573+
f: impl FnOnce(BaoFileStorage<T>) -> io::Result<BaoFileStorage<T>>,
529574
) -> io::Result<()> {
530575
let mut lock = self.storage.write().unwrap();
531576
let storage = lock.take();
@@ -545,15 +590,15 @@ impl BaoFileHandle {
545590
///
546591
/// Caution: this is a reader for the unvalidated data file. Reading this
547592
/// can produce data that does not match the hash.
548-
pub fn data_reader(&self) -> DataReader {
593+
pub fn data_reader(&self) -> DataReader<T> {
549594
DataReader(Some(self.clone()))
550595
}
551596

552597
/// An AsyncSliceReader for the outboard file.
553598
///
554599
/// The outboard file is used to validate the data file. It is not guaranteed
555600
/// to be complete.
556-
pub fn outboard_reader(&self) -> OutboardReader {
601+
pub fn outboard_reader(&self) -> OutboardReader<T> {
557602
OutboardReader(Some(self.clone()))
558603
}
559604

@@ -567,7 +612,7 @@ impl BaoFileHandle {
567612
}
568613

569614
/// The outboard for the file.
570-
pub fn outboard(&self) -> io::Result<PreOrderOutboard<OutboardReader>> {
615+
pub fn outboard(&self) -> io::Result<PreOrderOutboard<OutboardReader<T>>> {
571616
let root = self.hash.into();
572617
let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE);
573618
let outboard = self.outboard_reader();
@@ -584,7 +629,7 @@ impl BaoFileHandle {
584629
}
585630

586631
/// Create a new writer from the handle.
587-
pub fn writer(&self) -> BaoFileWriter {
632+
pub fn writer(&self) -> BaoFileWriter<T> {
588633
BaoFileWriter(Some(self.clone()))
589634
}
590635

@@ -625,7 +670,7 @@ impl BaoFileHandle {
625670
}
626671

627672
/// Downgrade to a weak reference.
628-
pub fn downgrade(&self) -> BaoFileHandleWeak {
673+
pub fn downgrade(&self) -> BaoFileHandleWeak<T> {
629674
BaoFileHandleWeak(Arc::downgrade(&self.0))
630675
}
631676
}
@@ -676,9 +721,12 @@ impl MutableMemStorage {
676721
/// It is a BaoFileHandle wrapped in an Option, so that we can take it out
677722
/// in the future.
678723
#[derive(Debug)]
679-
pub struct BaoFileWriter(Option<BaoFileHandle>);
724+
pub struct BaoFileWriter<T>(Option<BaoFileHandle<T>>);
680725

681-
impl BaoBatchWriter for BaoFileWriter {
726+
impl<T> BaoBatchWriter for BaoFileWriter<T>
727+
where
728+
T: Send + Sync + bao_tree::io::sync::ReadAt + 'static,
729+
{
682730
async fn write_batch(&mut self, size: u64, batch: Vec<BaoContentItem>) -> std::io::Result<()> {
683731
let Some(handle) = self.0.take() else {
684732
return Err(io::Error::new(io::ErrorKind::Other, "deferred batch busy"));
@@ -828,7 +876,11 @@ pub mod test_support {
828876
(outboard.root.into(), chunk_ranges, encoded)
829877
}
830878

831-
pub async fn validate(handle: &BaoFileHandle, original: &[u8], ranges: &[Range<u64>]) {
879+
pub async fn validate(
880+
handle: &BaoFileHandle<std::fs::File>,
881+
original: &[u8],
882+
ranges: &[Range<u64>],
883+
) {
832884
let mut r = handle.data_reader();
833885
for range in ranges {
834886
let start = range.start;

0 commit comments

Comments
 (0)