diff --git a/src/hash.rs b/src/hash.rs index ce87a06e8..dda748348 100644 --- a/src/hash.rs +++ b/src/hash.rs @@ -256,6 +256,12 @@ pub struct HashAndFormat { pub format: BlobFormat, } +impl From<Hash> for HashAndFormat { + fn from(hash: Hash) -> Self { + Self::raw(hash) + } +} + #[cfg(feature = "redb")] mod redb_support { use postcard::experimental::max_size::MaxSize; diff --git a/src/rpc.rs b/src/rpc.rs index a17cbb77c..d17364aec 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -30,7 +30,7 @@ use proto::{ }, tags::{ CreateRequest as TagsCreateRequest, DeleteRequest as TagDeleteRequest, - ListRequest as TagListRequest, SetRequest as TagsSetRequest, SyncMode, + ListRequest as TagListRequest, RenameRequest, SetRequest as TagsSetRequest, SyncMode, }, Request, RpcError, RpcResult, RpcService, }; @@ -158,6 +158,7 @@ impl<D: crate::store::Store> Handler<D> { Set(msg) => chan.rpc(msg, self, Self::tags_set).await, DeleteTag(msg) => chan.rpc(msg, self, Self::blob_delete_tag).await, ListTags(msg) => chan.server_streaming(msg, self, Self::blob_list_tags).await, + Rename(msg) => chan.rpc(msg, self, Self::tags_rename).await, } } @@ -295,7 +296,7 @@ impl<D: crate::store::Store> Handler<D> { async fn blob_delete_tag(self, msg: TagDeleteRequest) -> RpcResult<()> { self.store() - .set_tag(msg.name, None) + .delete_tags(msg.from, msg.to) .await .map_err(|e| RpcError::new(&e))?; Ok(()) @@ -313,7 +314,7 @@ impl<D: crate::store::Store> Handler<D> { tracing::info!("blob_list_tags"); let blobs = self; Gen::new(|co| async move { - let tags = blobs.store().tags().await.unwrap(); + let tags = blobs.store().tags(msg.from, msg.to).await.unwrap(); #[allow(clippy::manual_flatten)] for item in tags { if let Ok((name, HashAndFormat { hash, format })) = item { @@ -382,6 +383,16 @@ impl<D: crate::store::Store> Handler<D> { rx.map(AddPathResponse) } + async fn tags_rename(self, msg: RenameRequest) -> RpcResult<()> { + let blobs = self; + blobs + .store() + .rename_tag(msg.from, msg.to) + .await + .map_err(|e| RpcError::new(&e))?; + Ok(()) + } + async fn tags_set(self, msg: TagsSetRequest) -> RpcResult<()> { let blobs = self; blobs @@ -393,13 +404,11 @@ impl<D: crate::store::Store> Handler<D> { blobs.store().sync().await.map_err(|e| RpcError::new(&e))?; } if let Some(batch) = msg.batch { - if let Some(content) = msg.value.as_ref() { - blobs - .batches() - .await - .remove_one(batch, content) - .map_err(|e| RpcError::new(&*e))?; - } + blobs + .batches() + .await + .remove_one(batch, &msg.value) + .map_err(|e| RpcError::new(&*e))?; } Ok(()) } @@ -572,10 +581,7 @@ impl<D: crate::store::Store> Handler<D> { let HashAndFormat { hash, format } = *hash_and_format; let tag = match tag { SetTagOption::Named(tag) => { - blobs - .store() - .set_tag(tag.clone(), Some(*hash_and_format)) - .await?; + blobs.store().set_tag(tag.clone(), *hash_and_format).await?; tag } SetTagOption::Auto => blobs.store().create_tag(*hash_and_format).await?, @@ -764,10 +770,7 @@ impl<D: crate::store::Store> Handler<D> { let HashAndFormat { hash, format } = hash_and_format; let tag = match msg.tag { SetTagOption::Named(tag) => { - blobs - .store() - .set_tag(tag.clone(), Some(hash_and_format)) - .await?; + blobs.store().set_tag(tag.clone(), hash_and_format).await?; tag } SetTagOption::Auto => blobs.store().create_tag(hash_and_format).await?, @@ -907,7 +910,7 @@ impl<D: crate::store::Store> Handler<D> { SetTagOption::Named(tag) => { blobs .store() - .set_tag(tag.clone(), Some(*hash_and_format)) + .set_tag(tag.clone(), *hash_and_format) .await .map_err(|e| RpcError::new(&e))?; tag @@ -922,7 +925,7 @@ impl<D: crate::store::Store> Handler<D> { for tag in tags_to_delete { blobs .store() - .set_tag(tag, None) + .delete_tags(Some(tag.clone()), Some(tag.successor())) .await .map_err(|e| RpcError::new(&e))?; } @@ -959,7 +962,7 @@ impl<D: crate::store::Store> Handler<D> { progress.send(DownloadProgress::AllDone(stats)).await.ok(); match tag { SetTagOption::Named(tag) => { - self.store().set_tag(tag, Some(hash_and_format)).await?; + self.store().set_tag(tag, hash_and_format).await?; } SetTagOption::Auto => { self.store().create_tag(hash_and_format).await?; diff --git a/src/rpc/client/blobs/batch.rs b/src/rpc/client/blobs/batch.rs index b82f17837..5a964441e 100644 --- a/src/rpc/client/blobs/batch.rs +++ b/src/rpc/client/blobs/batch.rs @@ -441,7 +441,7 @@ where .rpc .rpc(tags::SetRequest { name: tag, - value: Some(tt.hash_and_format()), + value: tt.hash_and_format(), batch: Some(self.0.batch), sync: SyncMode::Full, }) diff --git a/src/rpc/client/tags.rs b/src/rpc/client/tags.rs index 103ecc618..595a139d3 100644 --- a/src/rpc/client/tags.rs +++ b/src/rpc/client/tags.rs @@ -2,25 +2,37 @@ //! //! The purpose of tags is to mark information as important to prevent it //! from being garbage-collected (if the garbage collector is turned on). -//! Currently this is used for blobs. //! -//! The main entry point is the [`Client`]. +//! A tag has a name that is an arbitrary byte string. In many cases this will be +//! a valid UTF8 string, but there are also use cases where it is useful to have +//! non string data like integer ids in the tag name. +//! +//! Tags point to a [`HashAndFormat`]. +//! +//! A tag can point to a hash with format [`BlobFormat::Raw`]. In that case it will +//! protect *just this blob* from being garbage-collected. //! -//! [`Client::list`] can be used to list all tags. -//! [`Client::list_hash_seq`] can be used to list all tags with a hash_seq format. +//! It can also point to a hash in format [`BlobFormat::HashSeq`]. In that case it will +//! protect the blob itself and all hashes in the blob (the blob must be just a sequence of hashes). +//! Using this format it is possible to protect a large number of blobs with a single tag. //! -//! [`Client::delete`] can be used to delete a tag. +//! Tags can be created, read, renamed and deleted. Tags *do not* have to correspond to +//! already existing data. It is perfectly valid to create a tag for data you don't have yet. +//! +//! The main entry point is the [`Client`]. +use std::ops::{Bound, RangeBounds}; + use anyhow::Result; -use futures_lite::{Stream, StreamExt}; +use futures_lite::{io, Stream, StreamExt}; use quic_rpc::{client::BoxedConnector, Connector, RpcClient}; use serde::{Deserialize, Serialize}; use crate::{ rpc::proto::{ - tags::{DeleteRequest, ListRequest}, + tags::{DeleteRequest, ListRequest, SetRequest, SyncMode}, RpcService, }, - BlobFormat, Hash, Tag, + BlobFormat, Hash, HashAndFormat, Tag, }; /// Iroh tags client. @@ -30,6 +42,147 @@ pub struct Client<C = BoxedConnector<RpcService>> { pub(super) rpc: RpcClient<RpcService, C>, } +/// Options for a list operation. +#[derive(Debug, Clone)] +pub struct ListOptions { + /// List tags to hash seqs + pub hash_seq: bool, + /// List tags to raw blobs + pub raw: bool, + /// Optional from tag (inclusive) + pub from: Option<Tag>, + /// Optional to tag (exclusive) + pub to: Option<Tag>, +} + +fn tags_from_range<R, E>(range: R) -> (Option<Tag>, Option<Tag>) +where + R: RangeBounds<E>, + E: AsRef<[u8]>, +{ + let from = match range.start_bound() { + Bound::Included(start) => Some(Tag::from(start.as_ref())), + Bound::Excluded(start) => Some(Tag::from(start.as_ref()).successor()), + Bound::Unbounded => None, + }; + let to = match range.end_bound() { + Bound::Included(end) => Some(Tag::from(end.as_ref()).successor()), + Bound::Excluded(end) => Some(Tag::from(end.as_ref())), + Bound::Unbounded => None, + }; + (from, to) +} + +impl ListOptions { + /// List a range of tags + pub fn range<R, E>(range: R) -> Self + where + R: RangeBounds<E>, + E: AsRef<[u8]>, + { + let (from, to) = tags_from_range(range); + Self { + from, + to, + raw: true, + hash_seq: true, + } + } + + /// List tags with a prefix + pub fn prefix(prefix: &[u8]) -> Self { + let from = Tag::from(prefix); + let to = from.next_prefix(); + Self { + raw: true, + hash_seq: true, + from: Some(from), + to, + } + } + + /// List a single tag + pub fn single(name: &[u8]) -> Self { + let from = Tag::from(name); + Self { + to: Some(from.successor()), + from: Some(from), + raw: true, + hash_seq: true, + } + } + + /// List all tags + pub fn all() -> Self { + Self { + raw: true, + hash_seq: true, + from: None, + to: None, + } + } + + /// List raw tags + pub fn raw() -> Self { + Self { + raw: true, + hash_seq: false, + from: None, + to: None, + } + } + + /// List hash seq tags + pub fn hash_seq() -> Self { + Self { + raw: false, + hash_seq: true, + from: None, + to: None, + } + } +} + +/// Options for a delete operation. +#[derive(Debug, Clone)] +pub struct DeleteOptions { + /// Optional from tag (inclusive) + pub from: Option<Tag>, + /// Optional to tag (exclusive) + pub to: Option<Tag>, +} + +impl DeleteOptions { + /// Delete a single tag + pub fn single(name: &[u8]) -> Self { + let name = Tag::from(name); + Self { + to: Some(name.successor()), + from: Some(name), + } + } + + /// Delete a range of tags + pub fn range<R, E>(range: R) -> Self + where + R: RangeBounds<E>, + E: AsRef<[u8]>, + { + let (from, to) = tags_from_range(range); + Self { from, to } + } + + /// Delete tags with a prefix + pub fn prefix(prefix: &[u8]) -> Self { + let from = Tag::from(prefix); + let to = from.next_prefix(); + Self { + from: Some(from), + to, + } + } +} + /// A client that uses the memory connector. pub type MemClient = Client<crate::rpc::MemConnector>; @@ -42,27 +195,123 @@ where Self { rpc } } + /// List all tags with options. + /// + /// This is the most flexible way to list tags. All the other list methods are just convenience + /// methods that call this one with the appropriate options. + pub async fn list_with_opts( + &self, + options: ListOptions, + ) -> Result<impl Stream<Item = Result<TagInfo>>> { + let stream = self + .rpc + .server_streaming(ListRequest::from(options)) + .await?; + Ok(stream.map(|res| res.map_err(anyhow::Error::from))) + } + + /// Set the value for a single tag + pub async fn set(&self, name: impl AsRef<[u8]>, value: impl Into<HashAndFormat>) -> Result<()> { + self.rpc + .rpc(SetRequest { + name: Tag::from(name.as_ref()), + value: value.into(), + batch: None, + sync: SyncMode::Full, + }) + .await??; + Ok(()) + } + + /// Get the value of a single tag + pub async fn get(&self, name: impl AsRef<[u8]>) -> Result<Option<TagInfo>> { + let mut stream = self + .list_with_opts(ListOptions::single(name.as_ref())) + .await?; + stream.next().await.transpose() + } + + /// Rename a tag atomically + /// + /// If the tag does not exist, this will return an error. + pub async fn rename(&self, from: impl AsRef<[u8]>, to: impl AsRef<[u8]>) -> Result<()> { + let from = from.as_ref(); + let to = to.as_ref(); + let Some(old) = self.get(from.as_ref()).await? else { + return Err(io::Error::new(io::ErrorKind::NotFound, "Tag not found").into()); + }; + self.set(to.as_ref(), old.hash_and_format()).await?; + self.delete(from.as_ref()).await?; + Ok(()) + } + + /// List a range of tags + pub async fn list_range<R, E>(&self, range: R) -> Result<impl Stream<Item = Result<TagInfo>>> + where + R: RangeBounds<E>, + E: AsRef<[u8]>, + { + self.list_with_opts(ListOptions::range(range)).await + } + + /// Lists all tags with the given prefix. + pub async fn list_prefix( + &self, + prefix: impl AsRef<[u8]>, + ) -> Result<impl Stream<Item = Result<TagInfo>>> { + self.list_with_opts(ListOptions::prefix(prefix.as_ref())) + .await + } + /// Lists all tags. pub async fn list(&self) -> Result<impl Stream<Item = Result<TagInfo>>> { - let stream = self.rpc.server_streaming(ListRequest::all()).await?; - Ok(stream.map(|res| res.map_err(anyhow::Error::from))) + self.list_with_opts(ListOptions::all()).await } /// Lists all tags with a hash_seq format. pub async fn list_hash_seq(&self) -> Result<impl Stream<Item = Result<TagInfo>>> { - let stream = self.rpc.server_streaming(ListRequest::hash_seq()).await?; - Ok(stream.map(|res| res.map_err(anyhow::Error::from))) + self.list_with_opts(ListOptions::hash_seq()).await } /// Deletes a tag. - pub async fn delete(&self, name: Tag) -> Result<()> { - self.rpc.rpc(DeleteRequest { name }).await??; + pub async fn delete_with_opts(&self, options: DeleteOptions) -> Result<()> { + self.rpc.rpc(DeleteRequest::from(options)).await??; Ok(()) } + + /// Deletes a tag. + pub async fn delete(&self, name: impl AsRef<[u8]>) -> Result<()> { + self.delete_with_opts(DeleteOptions::single(name.as_ref())) + .await + } + + /// Deletes a range of tags. + pub async fn delete_range<R, E>(&self, range: R) -> Result<()> + where + R: RangeBounds<E>, + E: AsRef<[u8]>, + { + self.delete_with_opts(DeleteOptions::range(range)).await + } + + /// Delete all tags with the given prefix. + pub async fn delete_prefix(&self, prefix: impl AsRef<[u8]>) -> Result<()> { + self.delete_with_opts(DeleteOptions::prefix(prefix.as_ref())) + .await + } + + /// Delete all tags. Use with care. After this, all data will be garbage collected. + pub async fn delete_all(&self) -> Result<()> { + self.delete_with_opts(DeleteOptions { + from: None, + to: None, + }) + .await + } } /// Information about a tag. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct TagInfo { /// Name of the tag pub name: Tag, @@ -71,3 +320,24 @@ pub struct TagInfo { /// Hash of the data pub hash: Hash, } + +impl TagInfo { + /// Create a new tag info. + pub fn new(name: impl AsRef<[u8]>, value: impl Into<HashAndFormat>) -> Self { + let name = name.as_ref(); + let value = value.into(); + Self { + name: Tag::from(name), + hash: value.hash, + format: value.format, + } + } + + /// Get the hash and format of the tag. + pub fn hash_and_format(&self) -> HashAndFormat { + HashAndFormat { + hash: self.hash, + format: self.format, + } + } +} diff --git a/src/rpc/proto/tags.rs b/src/rpc/proto/tags.rs index 54d35f625..f30547fa7 100644 --- a/src/rpc/proto/tags.rs +++ b/src/rpc/proto/tags.rs @@ -4,7 +4,11 @@ use quic_rpc_derive::rpc_requests; use serde::{Deserialize, Serialize}; use super::{RpcResult, RpcService}; -use crate::{net_protocol::BatchId, rpc::client::tags::TagInfo, HashAndFormat, Tag}; +use crate::{ + net_protocol::BatchId, + rpc::client::tags::{DeleteOptions, ListOptions, TagInfo}, + HashAndFormat, Tag, +}; #[allow(missing_docs)] #[derive(strum::Display, Debug, Serialize, Deserialize)] @@ -16,6 +20,8 @@ pub enum Request { #[rpc(response = RpcResult<()>)] Set(SetRequest), #[rpc(response = RpcResult<()>)] + Rename(RenameRequest), + #[rpc(response = RpcResult<()>)] DeleteTag(DeleteRequest), #[server_streaming(response = TagInfo)] ListTags(ListRequest), @@ -56,8 +62,8 @@ pub struct CreateRequest { pub struct SetRequest { /// Name of the tag pub name: Tag, - /// Value of the tag, None to delete - pub value: Option<HashAndFormat>, + /// Value of the tag + pub value: HashAndFormat, /// Batch to use, none for global pub batch: Option<BatchId>, /// Sync mode @@ -73,37 +79,46 @@ pub struct ListRequest { pub raw: bool, /// List hash seq tags pub hash_seq: bool, + /// From tag (inclusive) + pub from: Option<Tag>, + /// To tag (exclusive) + pub to: Option<Tag>, } -impl ListRequest { - /// List all tags - pub fn all() -> Self { +impl From<ListOptions> for ListRequest { + fn from(options: ListOptions) -> Self { Self { - raw: true, - hash_seq: true, + raw: options.raw, + hash_seq: options.hash_seq, + from: options.from, + to: options.to, } } +} - /// List raw tags - pub fn raw() -> Self { - Self { - raw: true, - hash_seq: false, - } - } +/// Delete a tag +#[derive(Debug, Serialize, Deserialize)] +pub struct DeleteRequest { + /// From tag (inclusive) + pub from: Option<Tag>, + /// To tag (exclusive) + pub to: Option<Tag>, +} - /// List hash seq tags - pub fn hash_seq() -> Self { +impl From<DeleteOptions> for DeleteRequest { + fn from(options: DeleteOptions) -> Self { Self { - raw: false, - hash_seq: true, + from: options.from, + to: options.to, } } } -/// Delete a tag +/// Rename a tag atomically #[derive(Debug, Serialize, Deserialize)] -pub struct DeleteRequest { - /// Name of the tag - pub name: Tag, +pub struct RenameRequest { + /// Old tag name + pub from: Tag, + /// New tag name + pub to: Tag, } diff --git a/src/store/fs.rs b/src/store/fs.rs index 8f6adb37f..091d674cf 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -67,6 +67,7 @@ use std::{ collections::{BTreeMap, BTreeSet}, future::Future, io, + ops::Bound, path::{Path, PathBuf}, sync::{Arc, RwLock}, time::{Duration, SystemTime}, @@ -85,7 +86,6 @@ use serde::{Deserialize, Serialize}; use smallvec::SmallVec; use tokio::io::AsyncWriteExt; use tracing::trace_span; - mod tables; #[doc(hidden)] pub mod test_support; @@ -601,17 +601,23 @@ pub(crate) enum ActorMessage { }, /// Bulk query method: get the entire tags table Tags { - #[debug(skip)] - filter: FilterPredicate<Tag, HashAndFormat>, + from: Option<Tag>, + to: Option<Tag>, #[allow(clippy::type_complexity)] tx: oneshot::Sender< ActorResult<Vec<std::result::Result<(Tag, HashAndFormat), StorageError>>>, >, }, - /// Modification method: set a tag to a value, or remove it. + /// Modification method: set a tag to a value. SetTag { tag: Tag, - value: Option<HashAndFormat>, + value: HashAndFormat, + tx: oneshot::Sender<ActorResult<()>>, + }, + /// Modification method: set a tag to a value. + DeleteTags { + from: Option<Tag>, + to: Option<Tag>, tx: oneshot::Sender<ActorResult<()>>, }, /// Modification method: create a new unique tag and set it to a value. @@ -619,6 +625,12 @@ pub(crate) enum ActorMessage { hash: HashAndFormat, tx: oneshot::Sender<ActorResult<Tag>>, }, + /// Modification method: rename a tag atomically. + RenameTag { + from: Tag, + to: Tag, + tx: oneshot::Sender<ActorResult<()>>, + }, /// Modification method: unconditional delete the data for a number of hashes Delete { hashes: Vec<Hash>, @@ -673,6 +685,8 @@ impl ActorMessage { | Self::CreateTag { .. } | Self::SetFullEntryState { .. } | Self::Delete { .. } + | Self::DeleteTags { .. } + | Self::RenameTag { .. } | Self::GcDelete { .. } => MessageCategory::ReadWrite, Self::UpdateInlineOptions { .. } | Self::Sync { .. } @@ -856,11 +870,13 @@ impl StoreInner { Ok(res) } - async fn tags(&self) -> OuterResult<Vec<io::Result<(Tag, HashAndFormat)>>> { + async fn tags( + &self, + from: Option<Tag>, + to: Option<Tag>, + ) -> OuterResult<Vec<io::Result<(Tag, HashAndFormat)>>> { let (tx, rx) = oneshot::channel(); - let filter: FilterPredicate<Tag, HashAndFormat> = - Box::new(|_i, k, v| Some((k.value(), v.value()))); - self.tx.send(ActorMessage::Tags { filter, tx }).await?; + self.tx.send(ActorMessage::Tags { from, to, tx }).await?; let tags = rx.await?; // transform the internal error type into io::Error let tags = tags? @@ -870,7 +886,7 @@ impl StoreInner { Ok(tags) } - async fn set_tag(&self, tag: Tag, value: Option<HashAndFormat>) -> OuterResult<()> { + async fn set_tag(&self, tag: Tag, value: HashAndFormat) -> OuterResult<()> { let (tx, rx) = oneshot::channel(); self.tx .send(ActorMessage::SetTag { tag, value, tx }) @@ -878,12 +894,28 @@ impl StoreInner { Ok(rx.await??) } + async fn delete_tags(&self, from: Option<Tag>, to: Option<Tag>) -> OuterResult<()> { + let (tx, rx) = oneshot::channel(); + self.tx + .send(ActorMessage::DeleteTags { from, to, tx }) + .await?; + Ok(rx.await??) + } + async fn create_tag(&self, hash: HashAndFormat) -> OuterResult<Tag> { let (tx, rx) = oneshot::channel(); self.tx.send(ActorMessage::CreateTag { hash, tx }).await?; Ok(rx.await??) } + async fn rename_tag(&self, from: Tag, to: Tag) -> OuterResult<()> { + let (tx, rx) = oneshot::channel(); + self.tx + .send(ActorMessage::RenameTag { from, to, tx }) + .await?; + Ok(rx.await??) + } + async fn delete(&self, hashes: Vec<Hash>) -> OuterResult<()> { let (tx, rx) = oneshot::channel(); self.tx.send(ActorMessage::Delete { hashes, tx }).await?; @@ -1284,8 +1316,12 @@ impl super::ReadableStore for Store { Ok(Box::new(self.0.partial_blobs().await?.into_iter())) } - async fn tags(&self) -> io::Result<super::DbIter<(Tag, HashAndFormat)>> { - Ok(Box::new(self.0.tags().await?.into_iter())) + async fn tags( + &self, + from: Option<Tag>, + to: Option<Tag>, + ) -> io::Result<super::DbIter<(Tag, HashAndFormat)>> { + Ok(Box::new(self.0.tags(from, to).await?.into_iter())) } fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static> { @@ -1371,14 +1407,22 @@ impl super::Store for Store { .await??) } - async fn set_tag(&self, name: Tag, hash: Option<HashAndFormat>) -> io::Result<()> { + async fn set_tag(&self, name: Tag, hash: HashAndFormat) -> io::Result<()> { Ok(self.0.set_tag(name, hash).await?) } + async fn delete_tags(&self, from: Option<Tag>, to: Option<Tag>) -> io::Result<()> { + Ok(self.0.delete_tags(from, to).await?) + } + async fn create_tag(&self, hash: HashAndFormat) -> io::Result<Tag> { Ok(self.0.create_tag(hash).await?) } + async fn rename_tag(&self, from: Tag, to: Tag) -> io::Result<()> { + Ok(self.0.rename_tag(from, to).await?) + } + async fn delete(&self, hashes: Vec<Hash>) -> io::Result<()> { Ok(self.0.delete(hashes).await?) } @@ -1966,23 +2010,21 @@ impl ActorState { fn tags( &mut self, tables: &impl ReadableTables, - filter: FilterPredicate<Tag, HashAndFormat>, + from: Option<Tag>, + to: Option<Tag>, ) -> ActorResult<Vec<std::result::Result<(Tag, HashAndFormat), StorageError>>> { let mut res = Vec::new(); - let mut index = 0u64; - #[allow(clippy::explicit_counter_loop)] - for item in tables.tags().iter()? { + let from = from.map(Bound::Included).unwrap_or(Bound::Unbounded); + let to = to.map(Bound::Excluded).unwrap_or(Bound::Unbounded); + for item in tables.tags().range((from, to))? { match item { Ok((k, v)) => { - if let Some(item) = filter(index, k, v) { - res.push(Ok(item)); - } + res.push(Ok((k.value(), v.value()))); } Err(e) => { res.push(Err(e)); } } - index += 1; } Ok(res) } @@ -1998,19 +2040,35 @@ impl ActorState { Ok(tag) } - fn set_tag( + fn rename_tag(&mut self, tables: &mut Tables, from: Tag, to: Tag) -> ActorResult<()> { + let value = tables + .tags + .get(from)? + .ok_or_else(|| { + ActorError::Io(io::Error::new(io::ErrorKind::NotFound, "tag not found")) + })? + .value(); + tables.tags.insert(to, value)?; + Ok(()) + } + + fn set_tag(&self, tables: &mut Tables, tag: Tag, value: HashAndFormat) -> ActorResult<()> { + tables.tags.insert(tag, value)?; + Ok(()) + } + + fn delete_tags( &self, tables: &mut Tables, - tag: Tag, - value: Option<HashAndFormat>, + from: Option<Tag>, + to: Option<Tag>, ) -> ActorResult<()> { - match value { - Some(value) => { - tables.tags.insert(tag, value)?; - } - None => { - tables.tags.remove(tag)?; - } + let from = from.map(Bound::Included).unwrap_or(Bound::Unbounded); + let to = to.map(Bound::Excluded).unwrap_or(Bound::Unbounded); + let removing = tables.tags.extract_from_if((from, to), |_, _| true)?; + // drain the iterator to actually remove the tags + for res in removing { + res?; } Ok(()) } @@ -2319,8 +2377,8 @@ impl ActorState { let res = self.blobs(tables, filter); tx.send(res).ok(); } - ActorMessage::Tags { filter, tx } => { - let res = self.tags(tables, filter); + ActorMessage::Tags { from, to, tx } => { + let res = self.tags(tables, from, to); tx.send(res).ok(); } ActorMessage::GcStart { tx } => { @@ -2358,10 +2416,18 @@ impl ActorState { let res = self.set_tag(tables, tag, value); tx.send(res).ok(); } + ActorMessage::DeleteTags { from, to, tx } => { + let res = self.delete_tags(tables, from, to); + tx.send(res).ok(); + } ActorMessage::CreateTag { hash, tx } => { let res = self.create_tag(tables, hash); tx.send(res).ok(); } + ActorMessage::RenameTag { from, to, tx } => { + let res = self.rename_tag(tables, from, to); + tx.send(res).ok(); + } ActorMessage::Delete { hashes, tx } => { let res = self.delete(tables, hashes, true); tx.send(res).ok(); diff --git a/src/store/mem.rs b/src/store/mem.rs index 89d8fffd7..105b8ddd5 100644 --- a/src/store/mem.rs +++ b/src/store/mem.rs @@ -17,6 +17,7 @@ use bao_tree::{ use bytes::{Bytes, BytesMut}; use futures_lite::{Stream, StreamExt}; use iroh_io::AsyncSliceReader; +use tracing::info; use super::{ temp_name, BaoBatchWriter, ConsistencyCheckProgress, ExportMode, ExportProgressCb, ImportMode, @@ -207,13 +208,43 @@ impl super::Store for Store { .await? } - async fn set_tag(&self, name: Tag, value: Option<HashAndFormat>) -> io::Result<()> { + async fn rename_tag(&self, from: Tag, to: Tag) -> io::Result<()> { let mut state = self.write_lock(); - if let Some(value) = value { - state.tags.insert(name, value); - } else { - state.tags.remove(&name); - } + let value = state.tags.remove(&from).ok_or_else(|| { + io::Error::new( + io::ErrorKind::NotFound, + format!("tag not found: {:?}", from), + ) + })?; + state.tags.insert(to, value); + Ok(()) + } + + async fn set_tag(&self, name: Tag, value: HashAndFormat) -> io::Result<()> { + let mut state = self.write_lock(); + state.tags.insert(name, value); + Ok(()) + } + + async fn delete_tags(&self, from: Option<Tag>, to: Option<Tag>) -> io::Result<()> { + let mut state = self.write_lock(); + info!("deleting tags from {:?} to {:?}", from, to); + // state.tags.remove(&from.unwrap()); + // todo: more efficient impl + state.tags.retain(|tag, _| { + if let Some(from) = &from { + if tag < from { + return true; + } + } + if let Some(to) = &to { + if tag >= to { + return true; + } + } + info!(" removing {:?}", tag); + false + }); Ok(()) } @@ -427,10 +458,30 @@ impl ReadableStore for Store { )) } - async fn tags(&self) -> io::Result<crate::store::DbIter<(crate::Tag, crate::HashAndFormat)>> { + async fn tags( + &self, + from: Option<Tag>, + to: Option<Tag>, + ) -> io::Result<crate::store::DbIter<(crate::Tag, crate::HashAndFormat)>> { #[allow(clippy::mutable_key_type)] let tags = self.read_lock().tags.clone(); - Ok(Box::new(tags.into_iter().map(Ok))) + let tags = tags + .into_iter() + .filter(move |(tag, _)| { + if let Some(from) = &from { + if tag < from { + return false; + } + } + if let Some(to) = &to { + if tag >= to { + return false; + } + } + true + }) + .map(Ok); + Ok(Box::new(tags)) } fn temp_tags(&self) -> Box<dyn Iterator<Item = crate::HashAndFormat> + Send + Sync + 'static> { diff --git a/src/store/readonly_mem.rs b/src/store/readonly_mem.rs index a04161554..ec8f1b5f3 100644 --- a/src/store/readonly_mem.rs +++ b/src/store/readonly_mem.rs @@ -232,7 +232,11 @@ impl ReadableStore for Store { )) } - async fn tags(&self) -> io::Result<DbIter<(Tag, HashAndFormat)>> { + async fn tags( + &self, + _from: Option<Tag>, + _to: Option<Tag>, + ) -> io::Result<DbIter<(Tag, HashAndFormat)>> { Ok(Box::new(std::iter::empty())) } @@ -303,6 +307,10 @@ impl super::Store for Store { Err(io::Error::new(io::ErrorKind::Other, "not implemented")) } + async fn rename_tag(&self, _from: Tag, _to: Tag) -> io::Result<()> { + Err(io::Error::new(io::ErrorKind::Other, "not implemented")) + } + async fn import_stream( &self, data: impl Stream<Item = io::Result<Bytes>> + Unpin + Send, @@ -313,7 +321,11 @@ impl super::Store for Store { Err(io::Error::new(io::ErrorKind::Other, "not implemented")) } - async fn set_tag(&self, _name: Tag, _hash: Option<HashAndFormat>) -> io::Result<()> { + async fn set_tag(&self, _name: Tag, _hash: HashAndFormat) -> io::Result<()> { + Err(io::Error::new(io::ErrorKind::Other, "not implemented")) + } + + async fn delete_tags(&self, _from: Option<Tag>, _to: Option<Tag>) -> io::Result<()> { Err(io::Error::new(io::ErrorKind::Other, "not implemented")) } diff --git a/src/store/traits.rs b/src/store/traits.rs index 01c48229d..1d0df2325 100644 --- a/src/store/traits.rs +++ b/src/store/traits.rs @@ -262,7 +262,11 @@ pub trait ReadableStore: Map { /// been imported, and hash sequences that have been created internally. fn blobs(&self) -> impl Future<Output = io::Result<DbIter<Hash>>> + Send; /// list all tags (collections or other explicitly added things) in the database - fn tags(&self) -> impl Future<Output = io::Result<DbIter<(Tag, HashAndFormat)>>> + Send; + fn tags( + &self, + from: Option<Tag>, + to: Option<Tag>, + ) -> impl Future<Output = io::Result<DbIter<(Tag, HashAndFormat)>>> + Send; /// Temp tags fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static>; @@ -345,7 +349,22 @@ pub trait Store: ReadableStore + MapMut + std::fmt::Debug { fn set_tag( &self, name: Tag, - hash: Option<HashAndFormat>, + hash: HashAndFormat, + ) -> impl Future<Output = io::Result<()>> + Send; + + /// Rename a tag + fn rename_tag(&self, from: Tag, to: Tag) -> impl Future<Output = io::Result<()>> + Send; + + /// Delete a single tag + fn delete_tag(&self, name: Tag) -> impl Future<Output = io::Result<()>> + Send { + self.delete_tags(Some(name.clone()), Some(name.successor())) + } + + /// Bulk delete tags + fn delete_tags( + &self, + from: Option<Tag>, + to: Option<Tag>, ) -> impl Future<Output = io::Result<()>> + Send; /// Create a new tag @@ -623,7 +642,7 @@ pub(super) async fn gc_mark_task<'a>( } let mut roots = BTreeSet::new(); debug!("traversing tags"); - for item in store.tags().await? { + for item in store.tags(None, None).await? { let (name, haf) = item?; debug!("adding root {:?} {:?}", name, haf); roots.insert(haf); diff --git a/src/util.rs b/src/util.rs index b2c0d76a8..fcf3115bf 100644 --- a/src/util.rs +++ b/src/util.rs @@ -74,6 +74,18 @@ mod redb_support { } } +impl From<&[u8]> for Tag { + fn from(value: &[u8]) -> Self { + Self(Bytes::copy_from_slice(value)) + } +} + +impl AsRef<[u8]> for Tag { + fn as_ref(&self) -> &[u8] { + self.0.as_ref() + } +} + impl Borrow<[u8]> for Tag { fn borrow(&self) -> &[u8] { self.0.as_ref() @@ -132,6 +144,26 @@ impl Tag { i += 1; } } + + /// The successor of this tag in lexicographic order. + pub fn successor(&self) -> Self { + let mut bytes = self.0.to_vec(); + // increment_vec(&mut bytes); + bytes.push(0); + Self(bytes.into()) + } + + /// If this is a prefix, get the next prefix. + /// + /// This is like successor, except that it will return None if the prefix is all 0xFF instead of appending a 0 byte. + pub fn next_prefix(&self) -> Option<Self> { + let mut bytes = self.0.to_vec(); + if next_prefix(&mut bytes) { + Some(Self(bytes.into())) + } else { + None + } + } } /// Option for commands that allow setting a tag @@ -302,6 +334,22 @@ pub(crate) fn raw_outboard_size(size: u64) -> u64 { BaoTree::new(size, IROH_BLOCK_SIZE).outboard_size() } +/// Given a prefix, increment it lexographically. +/// +/// If the prefix is all FF, this will return false because there is no +/// higher prefix than that. +#[allow(dead_code)] +pub(crate) fn next_prefix(bytes: &mut [u8]) -> bool { + for byte in bytes.iter_mut().rev() { + if *byte < 255 { + *byte += 1; + return true; + } + *byte = 0; + } + false +} + /// Synchronously compute the outboard of a file, and return hash and outboard. /// /// It is assumed that the file is not modified while this is running. diff --git a/tests/gc.rs b/tests/gc.rs index ea4526027..dcf76b4ef 100644 --- a/tests/gc.rs +++ b/tests/gc.rs @@ -188,7 +188,7 @@ async fn gc_basics() -> Result<()> { // create an explicit tag for h1 (as raw) and then delete the temp tag. Entry should still be there. let tag = Tag::from("test"); bao_store - .set_tag(tag.clone(), Some(HashAndFormat::raw(h2))) + .set_tag(tag.clone(), HashAndFormat::raw(h2)) .await?; drop(tt2); tracing::info!("dropped tt2"); @@ -196,7 +196,7 @@ async fn gc_basics() -> Result<()> { assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::Complete); // delete the explicit tag, entry should be gone - bao_store.set_tag(tag, None).await?; + bao_store.delete_tag(tag).await?; step(&evs).await; assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::NotFound); @@ -234,7 +234,7 @@ async fn gc_hashseq_impl() -> Result<()> { // make a permanent tag for the link seq, then delete the temp tag. Entries should still be there. let tag = Tag::from("test"); bao_store - .set_tag(tag.clone(), Some(HashAndFormat::hash_seq(hr))) + .set_tag(tag.clone(), HashAndFormat::hash_seq(hr)) .await?; drop(ttr); step(&evs).await; @@ -244,7 +244,7 @@ async fn gc_hashseq_impl() -> Result<()> { // change the permanent tag to be just for the linkseq itself as a blob. Only the linkseq should be there, not the entries. bao_store - .set_tag(tag.clone(), Some(HashAndFormat::raw(hr))) + .set_tag(tag.clone(), HashAndFormat::raw(hr)) .await?; step(&evs).await; assert_eq!(bao_store.entry_status(&h1).await?, EntryStatus::NotFound); @@ -252,7 +252,7 @@ async fn gc_hashseq_impl() -> Result<()> { assert_eq!(bao_store.entry_status(&hr).await?, EntryStatus::Complete); // delete the permanent tag, everything should be gone - bao_store.set_tag(tag, None).await?; + bao_store.delete_tag(tag).await?; step(&evs).await; assert_eq!(bao_store.entry_status(&h1).await?, EntryStatus::NotFound); assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::NotFound); @@ -339,7 +339,7 @@ async fn gc_file_basics() -> Result<()> { drop(tt2); let tag = Tag::from("test"); bao_store - .set_tag(tag.clone(), Some(HashAndFormat::hash_seq(*ttr.hash()))) + .set_tag(tag.clone(), HashAndFormat::hash_seq(*ttr.hash())) .await?; drop(ttr); @@ -359,7 +359,7 @@ async fn gc_file_basics() -> Result<()> { tracing::info!("changing tag from hashseq to raw, this should orphan the children"); bao_store - .set_tag(tag.clone(), Some(HashAndFormat::raw(hr))) + .set_tag(tag.clone(), HashAndFormat::raw(hr)) .await?; // now only hr itself should be protected, but not its children @@ -376,7 +376,7 @@ async fn gc_file_basics() -> Result<()> { assert!(!path(&hr).exists()); assert!(!outboard_path(&hr).exists()); - bao_store.set_tag(tag, None).await?; + bao_store.delete_tag(tag).await?; step(&evs).await; bao_store.sync().await?; assert!(check_consistency(&bao_store).await? <= ReportLevel::Info); @@ -504,7 +504,7 @@ async fn gc_file_stress() -> Result<()> { if i % 100 == 0 { let tag = Tag::from(format!("test{}", i)); bao_store - .set_tag(tag.clone(), Some(HashAndFormat::raw(*tt.hash()))) + .set_tag(tag.clone(), HashAndFormat::raw(*tt.hash())) .await?; live.push(*tt.hash()); } else { diff --git a/tests/tags.rs b/tests/tags.rs new file mode 100644 index 000000000..8a4af2d54 --- /dev/null +++ b/tests/tags.rs @@ -0,0 +1,150 @@ +#![cfg(all(feature = "net_protocol", feature = "rpc"))] +use futures_lite::StreamExt; +use futures_util::Stream; +use iroh::Endpoint; +use iroh_blobs::{ + net_protocol::Blobs, + rpc::{ + client::tags::{self, TagInfo}, + proto::RpcService, + }, + Hash, HashAndFormat, +}; +use testresult::TestResult; + +async fn to_vec<T>(stream: impl Stream<Item = anyhow::Result<T>>) -> anyhow::Result<Vec<T>> { + let res = stream.collect::<Vec<_>>().await; + res.into_iter().collect::<anyhow::Result<Vec<_>>>() +} + +fn expected(tags: impl IntoIterator<Item = &'static str>) -> Vec<TagInfo> { + tags.into_iter() + .map(|tag| TagInfo::new(tag, Hash::new(tag))) + .collect() +} + +async fn set<C: quic_rpc::Connector<RpcService>>( + tags: &tags::Client<C>, + names: impl IntoIterator<Item = &str>, +) -> TestResult<()> { + for name in names { + tags.set(name, Hash::new(name)).await?; + } + Ok(()) +} + +async fn tags_smoke<C: quic_rpc::Connector<RpcService>>(tags: tags::Client<C>) -> TestResult<()> { + set(&tags, ["a", "b", "c", "d", "e"]).await?; + let stream = tags.list().await?; + let res = to_vec(stream).await?; + assert_eq!(res, expected(["a", "b", "c", "d", "e"])); + + let stream = tags.list_range("b".."d").await?; + let res = to_vec(stream).await?; + assert_eq!(res, expected(["b", "c"])); + + let stream = tags.list_range("b"..).await?; + let res = to_vec(stream).await?; + assert_eq!(res, expected(["b", "c", "d", "e"])); + + let stream = tags.list_range(.."d").await?; + let res = to_vec(stream).await?; + assert_eq!(res, expected(["a", "b", "c"])); + + let stream = tags.list_range(..="d").await?; + let res = to_vec(stream).await?; + assert_eq!(res, expected(["a", "b", "c", "d"])); + + tags.delete_range("b"..).await?; + let stream = tags.list().await?; + let res = to_vec(stream).await?; + assert_eq!(res, expected(["a"])); + + tags.delete_range(..="a").await?; + let stream = tags.list().await?; + let res = to_vec(stream).await?; + assert_eq!(res, expected([])); + + set(&tags, ["a", "aa", "aaa", "aab", "b"]).await?; + + let stream = tags.list_prefix("aa").await?; + let res = to_vec(stream).await?; + assert_eq!(res, expected(["aa", "aaa", "aab"])); + + tags.delete_prefix("aa").await?; + let stream = tags.list().await?; + let res = to_vec(stream).await?; + assert_eq!(res, expected(["a", "b"])); + + tags.delete_prefix("").await?; + let stream = tags.list().await?; + let res = to_vec(stream).await?; + assert_eq!(res, expected([])); + + set(&tags, ["a", "b", "c"]).await?; + + assert_eq!( + tags.get("b").await?, + Some(TagInfo::new("b", Hash::new("b"))) + ); + + tags.delete("b").await?; + let stream = tags.list().await?; + let res = to_vec(stream).await?; + assert_eq!(res, expected(["a", "c"])); + + assert_eq!(tags.get("b").await?, None); + + tags.delete_all().await?; + + tags.set("a", HashAndFormat::hash_seq(Hash::new("a"))) + .await?; + tags.set("b", HashAndFormat::raw(Hash::new("b"))).await?; + let stream = tags.list_hash_seq().await?; + let res = to_vec(stream).await?; + assert_eq!( + res, + vec![TagInfo { + name: "a".into(), + hash: Hash::new("a"), + format: iroh_blobs::BlobFormat::HashSeq, + }] + ); + + tags.delete_all().await?; + set(&tags, ["c"]).await?; + tags.rename("c", "f").await?; + let stream = tags.list().await?; + let res = to_vec(stream).await?; + assert_eq!( + res, + vec![TagInfo { + name: "f".into(), + hash: Hash::new("c"), + format: iroh_blobs::BlobFormat::Raw, + }] + ); + + let res = tags.rename("y", "z").await; + assert!(res.is_err()); + Ok(()) +} + +#[tokio::test] +async fn tags_smoke_mem() -> TestResult<()> { + let endpoint = Endpoint::builder().bind().await?; + let blobs = Blobs::memory().build(&endpoint); + let client = blobs.client(); + tags_smoke(client.tags()).await +} + +#[tokio::test] +async fn tags_smoke_fs() -> TestResult<()> { + let td = tempfile::tempdir()?; + let endpoint = Endpoint::builder().bind().await?; + let blobs = Blobs::persistent(td.path().join("blobs.db")) + .await? + .build(&endpoint); + let client = blobs.client(); + tags_smoke(client.tags()).await +}