Skip to content

Commit 8eb5733

Browse files
authored
feat: Read and seek (#119)
## Description Add an api for reading and seeking a blob using tokio::io::AsyncRead and tokio::io::AsyncSeek ## Breaking Changes Changes api::blobs::ExportRangesProgress::stream to be non-async - it does not have to be! ## Notes & open questions Note: the entire machinery of seek and read is horribly undocumented regarding the non happy case. So I went with the following approach: if you ever drop a future of either seek or read before completing, the reader becomes poisioned and will never work again, you have to grab a new one, which is cheap. This bypasses the whole question about what to do if somebody calls x without having completed y, and in normal usage you should never notice this. ## Change checklist - [ ] Self-review. - [ ] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [ ] Tests if relevant. - [ ] All breaking changes documented.
1 parent 719cdb4 commit 8eb5733

File tree

5 files changed

+382
-9
lines changed

5 files changed

+382
-9
lines changed

.github/workflows/ci.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ jobs:
143143
- uses: taiki-e/install-action@cross
144144

145145
- name: test
146-
run: cross test --all --target ${{ matrix.target }} -- --test-threads=12
146+
run: cross test --all --target ${{ matrix.target }} -- --test-threads=4
147147
env:
148148
RUST_LOG: ${{ runner.debug && 'TRACE' || 'DEBUG' }}
149149

src/api.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
//! with a remote store via rpc calls.
55
//!
66
//! The entry point for the api is the [`Store`] struct. There are several ways
7-
//! to obtain a `Store` instance: it is available via [`Deref`](std::ops::Deref)
7+
//! to obtain a `Store` instance: it is available via [`Deref`]
88
//! from the different store implementations
99
//! (e.g. [`MemStore`](crate::store::mem::MemStore)
1010
//! and [`FsStore`](crate::store::fs::FsStore)) as well as on the

src/api/blobs.rs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,11 @@ use n0_future::{future, stream, Stream, StreamExt};
2929
use quinn::SendStream;
3030
use range_collections::{range_set::RangeSetRange, RangeSet2};
3131
use ref_cast::RefCast;
32+
use serde::{Deserialize, Serialize};
3233
use tokio::io::AsyncWriteExt;
3334
use tracing::trace;
35+
mod reader;
36+
pub use reader::BlobReader;
3437

3538
// Public reexports from the proto module.
3639
//
@@ -102,6 +105,38 @@ impl Blobs {
102105
})
103106
}
104107

108+
/// Create a reader for the given hash. The reader implements [`tokio::io::AsyncRead`] and [`tokio::io::AsyncSeek`]
109+
/// and therefore can be used to read the blob's content.
110+
///
111+
/// Any access to parts of the blob that are not present will result in an error.
112+
///
113+
/// Example:
114+
/// ```rust
115+
/// use iroh_blobs::{store::mem::MemStore, api::blobs::Blobs};
116+
/// use tokio::io::AsyncReadExt;
117+
///
118+
/// # async fn example() -> anyhow::Result<()> {
119+
/// let store = MemStore::new();
120+
/// let tag = store.add_slice(b"Hello, world!").await?;
121+
/// let mut reader = store.reader(tag.hash);
122+
/// let mut buf = String::new();
123+
/// reader.read_to_string(&mut buf).await?;
124+
/// assert_eq!(buf, "Hello, world!");
125+
/// # Ok(())
126+
/// }
127+
/// ```
128+
pub fn reader(&self, hash: impl Into<Hash>) -> BlobReader {
129+
self.reader_with_opts(ReaderOptions { hash: hash.into() })
130+
}
131+
132+
/// Create a reader for the given options. The reader implements [`tokio::io::AsyncRead`] and [`tokio::io::AsyncSeek`]
133+
/// and therefore can be used to read the blob's content.
134+
///
135+
/// Any access to parts of the blob that are not present will result in an error.
136+
pub fn reader_with_opts(&self, options: ReaderOptions) -> BlobReader {
137+
BlobReader::new(self.clone(), options)
138+
}
139+
105140
/// Delete a blob.
106141
///
107142
/// This function is not public, because it does not work as expected when called manually,
@@ -647,6 +682,12 @@ impl<'a> AddProgress<'a> {
647682
}
648683
}
649684

685+
/// Options for an async reader for blobs that supports AsyncRead and AsyncSeek.
686+
#[derive(Debug, Clone, Serialize, Deserialize)]
687+
pub struct ReaderOptions {
688+
pub hash: Hash,
689+
}
690+
650691
/// An observe result. Awaiting this will return the current state.
651692
///
652693
/// Calling [`ObserveProgress::stream`] will return a stream of updates, where
@@ -856,7 +897,7 @@ impl ExportRangesProgress {
856897
/// range of 0..100, you will get the entire first chunk, 0..1024.
857898
///
858899
/// It is up to the caller to clip the ranges to the requested ranges.
859-
pub async fn stream(self) -> impl Stream<Item = ExportRangesItem> {
900+
pub fn stream(self) -> impl Stream<Item = ExportRangesItem> {
860901
Gen::new(|co| async move {
861902
let mut rx = match self.inner.await {
862903
Ok(rx) => rx,

0 commit comments

Comments
 (0)