Skip to content

Commit 2791f79

Browse files
committed
implement delete range
1 parent 26790ab commit 2791f79

File tree

9 files changed

+105
-53
lines changed

9 files changed

+105
-53
lines changed

src/rpc.rs

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ impl<D: crate::store::Store> Handler<D> {
295295

296296
async fn blob_delete_tag(self, msg: TagDeleteRequest) -> RpcResult<()> {
297297
self.store()
298-
.set_tag(msg.name, None)
298+
.delete_tags(msg.from, msg.to)
299299
.await
300300
.map_err(|e| RpcError::new(&e))?;
301301
Ok(())
@@ -403,13 +403,11 @@ impl<D: crate::store::Store> Handler<D> {
403403
blobs.store().sync().await.map_err(|e| RpcError::new(&e))?;
404404
}
405405
if let Some(batch) = msg.batch {
406-
if let Some(content) = msg.value.as_ref() {
407-
blobs
408-
.batches()
409-
.await
410-
.remove_one(batch, content)
411-
.map_err(|e| RpcError::new(&*e))?;
412-
}
406+
blobs
407+
.batches()
408+
.await
409+
.remove_one(batch, &msg.value)
410+
.map_err(|e| RpcError::new(&*e))?;
413411
}
414412
Ok(())
415413
}
@@ -582,10 +580,7 @@ impl<D: crate::store::Store> Handler<D> {
582580
let HashAndFormat { hash, format } = *hash_and_format;
583581
let tag = match tag {
584582
SetTagOption::Named(tag) => {
585-
blobs
586-
.store()
587-
.set_tag(tag.clone(), Some(*hash_and_format))
588-
.await?;
583+
blobs.store().set_tag(tag.clone(), *hash_and_format).await?;
589584
tag
590585
}
591586
SetTagOption::Auto => blobs.store().create_tag(*hash_and_format).await?,
@@ -774,10 +769,7 @@ impl<D: crate::store::Store> Handler<D> {
774769
let HashAndFormat { hash, format } = hash_and_format;
775770
let tag = match msg.tag {
776771
SetTagOption::Named(tag) => {
777-
blobs
778-
.store()
779-
.set_tag(tag.clone(), Some(hash_and_format))
780-
.await?;
772+
blobs.store().set_tag(tag.clone(), hash_and_format).await?;
781773
tag
782774
}
783775
SetTagOption::Auto => blobs.store().create_tag(hash_and_format).await?,
@@ -917,7 +909,7 @@ impl<D: crate::store::Store> Handler<D> {
917909
SetTagOption::Named(tag) => {
918910
blobs
919911
.store()
920-
.set_tag(tag.clone(), Some(*hash_and_format))
912+
.set_tag(tag.clone(), *hash_and_format)
921913
.await
922914
.map_err(|e| RpcError::new(&e))?;
923915
tag
@@ -932,7 +924,7 @@ impl<D: crate::store::Store> Handler<D> {
932924
for tag in tags_to_delete {
933925
blobs
934926
.store()
935-
.set_tag(tag, None)
927+
.delete_tags(Some(tag.clone()), Some(tag.successor()))
936928
.await
937929
.map_err(|e| RpcError::new(&e))?;
938930
}
@@ -969,7 +961,7 @@ impl<D: crate::store::Store> Handler<D> {
969961
progress.send(DownloadProgress::AllDone(stats)).await.ok();
970962
match tag {
971963
SetTagOption::Named(tag) => {
972-
self.store().set_tag(tag, Some(hash_and_format)).await?;
964+
self.store().set_tag(tag, hash_and_format).await?;
973965
}
974966
SetTagOption::Auto => {
975967
self.store().create_tag(hash_and_format).await?;

src/rpc/client/blobs/batch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ where
441441
.rpc
442442
.rpc(tags::SetRequest {
443443
name: tag,
444-
value: Some(tt.hash_and_format()),
444+
value: tt.hash_and_format(),
445445
batch: Some(self.0.batch),
446446
sync: SyncMode::Full,
447447
})

src/rpc/client/tags.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ pub struct DeleteOptions {
143143
}
144144

145145
impl DeleteOptions {
146+
/// Delete a single tag
146147
pub fn single(name: Tag) -> Self {
147148
Self {
148149
to: Some(name.successor()),

src/rpc/proto/tags.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ pub struct CreateRequest {
6060
pub struct SetRequest {
6161
/// Name of the tag
6262
pub name: Tag,
63-
/// Value of the tag, None to delete
64-
pub value: Option<HashAndFormat>,
63+
/// Value of the tag
64+
pub value: HashAndFormat,
6565
/// Batch to use, none for global
6666
pub batch: Option<BatchId>,
6767
/// Sync mode

src/store/fs.rs

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ use std::{
6767
collections::{BTreeMap, BTreeSet},
6868
future::Future,
6969
io,
70+
ops::Bound,
7071
path::{Path, PathBuf},
7172
sync::{Arc, RwLock},
7273
time::{Duration, SystemTime},
@@ -608,10 +609,16 @@ pub(crate) enum ActorMessage {
608609
ActorResult<Vec<std::result::Result<(Tag, HashAndFormat), StorageError>>>,
609610
>,
610611
},
611-
/// Modification method: set a tag to a value, or remove it.
612+
/// Modification method: set a tag to a value.
612613
SetTag {
613614
tag: Tag,
614-
value: Option<HashAndFormat>,
615+
value: HashAndFormat,
616+
tx: oneshot::Sender<ActorResult<()>>,
617+
},
618+
/// Modification method: set a tag to a value.
619+
DeleteTags {
620+
from: Option<Tag>,
621+
to: Option<Tag>,
615622
tx: oneshot::Sender<ActorResult<()>>,
616623
},
617624
/// Modification method: create a new unique tag and set it to a value.
@@ -673,6 +680,7 @@ impl ActorMessage {
673680
| Self::CreateTag { .. }
674681
| Self::SetFullEntryState { .. }
675682
| Self::Delete { .. }
683+
| Self::DeleteTags { .. }
676684
| Self::GcDelete { .. } => MessageCategory::ReadWrite,
677685
Self::UpdateInlineOptions { .. }
678686
| Self::Sync { .. }
@@ -870,14 +878,22 @@ impl StoreInner {
870878
Ok(tags)
871879
}
872880

873-
async fn set_tag(&self, tag: Tag, value: Option<HashAndFormat>) -> OuterResult<()> {
881+
async fn set_tag(&self, tag: Tag, value: HashAndFormat) -> OuterResult<()> {
874882
let (tx, rx) = oneshot::channel();
875883
self.tx
876884
.send(ActorMessage::SetTag { tag, value, tx })
877885
.await?;
878886
Ok(rx.await??)
879887
}
880888

889+
async fn delete_tags(&self, from: Option<Tag>, to: Option<Tag>) -> OuterResult<()> {
890+
let (tx, rx) = oneshot::channel();
891+
self.tx
892+
.send(ActorMessage::DeleteTags { from, to, tx })
893+
.await?;
894+
Ok(rx.await??)
895+
}
896+
881897
async fn create_tag(&self, hash: HashAndFormat) -> OuterResult<Tag> {
882898
let (tx, rx) = oneshot::channel();
883899
self.tx.send(ActorMessage::CreateTag { hash, tx }).await?;
@@ -1371,10 +1387,14 @@ impl super::Store for Store {
13711387
.await??)
13721388
}
13731389

1374-
async fn set_tag(&self, name: Tag, hash: Option<HashAndFormat>) -> io::Result<()> {
1390+
async fn set_tag(&self, name: Tag, hash: HashAndFormat) -> io::Result<()> {
13751391
Ok(self.0.set_tag(name, hash).await?)
13761392
}
13771393

1394+
async fn delete_tags(&self, from: Option<Tag>, to: Option<Tag>) -> io::Result<()> {
1395+
Ok(self.0.delete_tags(from, to).await?)
1396+
}
1397+
13781398
async fn create_tag(&self, hash: HashAndFormat) -> io::Result<Tag> {
13791399
Ok(self.0.create_tag(hash).await?)
13801400
}
@@ -1998,19 +2018,23 @@ impl ActorState {
19982018
Ok(tag)
19992019
}
20002020

2001-
fn set_tag(
2021+
fn set_tag(&self, tables: &mut Tables, tag: Tag, value: HashAndFormat) -> ActorResult<()> {
2022+
tables.tags.insert(tag, value)?;
2023+
Ok(())
2024+
}
2025+
2026+
fn delete_tags(
20022027
&self,
20032028
tables: &mut Tables,
2004-
tag: Tag,
2005-
value: Option<HashAndFormat>,
2029+
from: Option<Tag>,
2030+
to: Option<Tag>,
20062031
) -> ActorResult<()> {
2007-
match value {
2008-
Some(value) => {
2009-
tables.tags.insert(tag, value)?;
2010-
}
2011-
None => {
2012-
tables.tags.remove(tag)?;
2013-
}
2032+
let from = from.map(Bound::Included).unwrap_or(Bound::Unbounded);
2033+
let to = to.map(Bound::Excluded).unwrap_or(Bound::Unbounded);
2034+
let removing = tables.tags.extract_from_if((from, to), |_, _| true)?;
2035+
// drain the iterator to actually remove the tags
2036+
for res in removing {
2037+
res?;
20142038
}
20152039
Ok(())
20162040
}
@@ -2358,6 +2382,10 @@ impl ActorState {
23582382
let res = self.set_tag(tables, tag, value);
23592383
tx.send(res).ok();
23602384
}
2385+
ActorMessage::DeleteTags { from, to, tx } => {
2386+
let res = self.delete_tags(tables, from, to);
2387+
tx.send(res).ok();
2388+
}
23612389
ActorMessage::CreateTag { hash, tx } => {
23622390
let res = self.create_tag(tables, hash);
23632391
tx.send(res).ok();

src/store/mem.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -207,13 +207,28 @@ impl super::Store for Store {
207207
.await?
208208
}
209209

210-
async fn set_tag(&self, name: Tag, value: Option<HashAndFormat>) -> io::Result<()> {
210+
async fn set_tag(&self, name: Tag, value: HashAndFormat) -> io::Result<()> {
211211
let mut state = self.write_lock();
212-
if let Some(value) = value {
213-
state.tags.insert(name, value);
214-
} else {
215-
state.tags.remove(&name);
216-
}
212+
state.tags.insert(name, value);
213+
Ok(())
214+
}
215+
216+
async fn delete_tags(&self, from: Option<Tag>, to: Option<Tag>) -> io::Result<()> {
217+
let mut state = self.write_lock();
218+
// todo: more efficient impl
219+
state.tags.retain(|tag, _| {
220+
if let Some(from) = &from {
221+
if tag < from {
222+
return true;
223+
}
224+
}
225+
if let Some(to) = &to {
226+
if tag >= to {
227+
return true;
228+
}
229+
}
230+
false
231+
});
217232
Ok(())
218233
}
219234

src/store/readonly_mem.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,11 @@ impl super::Store for Store {
313313
Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
314314
}
315315

316-
async fn set_tag(&self, _name: Tag, _hash: Option<HashAndFormat>) -> io::Result<()> {
316+
async fn set_tag(&self, _name: Tag, _hash: HashAndFormat) -> io::Result<()> {
317+
Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
318+
}
319+
320+
async fn delete_tags(&self, _from: Option<Tag>, _to: Option<Tag>) -> io::Result<()> {
317321
Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
318322
}
319323

src/store/traits.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,19 @@ pub trait Store: ReadableStore + MapMut + std::fmt::Debug {
345345
fn set_tag(
346346
&self,
347347
name: Tag,
348-
hash: Option<HashAndFormat>,
348+
hash: HashAndFormat,
349+
) -> impl Future<Output = io::Result<()>> + Send;
350+
351+
/// Delete a single tag
352+
fn delete_tag(&self, name: Tag) -> impl Future<Output = io::Result<()>> + Send {
353+
self.delete_tags(Some(name.clone()), Some(name.successor()))
354+
}
355+
356+
/// Bulk delete tags
357+
fn delete_tags(
358+
&self,
359+
from: Option<Tag>,
360+
to: Option<Tag>,
349361
) -> impl Future<Output = io::Result<()>> + Send;
350362

351363
/// Create a new tag

tests/gc.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -188,15 +188,15 @@ async fn gc_basics() -> Result<()> {
188188
// create an explicit tag for h1 (as raw) and then delete the temp tag. Entry should still be there.
189189
let tag = Tag::from("test");
190190
bao_store
191-
.set_tag(tag.clone(), Some(HashAndFormat::raw(h2)))
191+
.set_tag(tag.clone(), HashAndFormat::raw(h2))
192192
.await?;
193193
drop(tt2);
194194
tracing::info!("dropped tt2");
195195
step(&evs).await;
196196
assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::Complete);
197197

198198
// delete the explicit tag, entry should be gone
199-
bao_store.set_tag(tag, None).await?;
199+
bao_store.delete_tag(tag).await?;
200200
step(&evs).await;
201201
assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::NotFound);
202202

@@ -234,7 +234,7 @@ async fn gc_hashseq_impl() -> Result<()> {
234234
// make a permanent tag for the link seq, then delete the temp tag. Entries should still be there.
235235
let tag = Tag::from("test");
236236
bao_store
237-
.set_tag(tag.clone(), Some(HashAndFormat::hash_seq(hr)))
237+
.set_tag(tag.clone(), HashAndFormat::hash_seq(hr))
238238
.await?;
239239
drop(ttr);
240240
step(&evs).await;
@@ -244,15 +244,15 @@ async fn gc_hashseq_impl() -> Result<()> {
244244

245245
// change the permanent tag to be just for the linkseq itself as a blob. Only the linkseq should be there, not the entries.
246246
bao_store
247-
.set_tag(tag.clone(), Some(HashAndFormat::raw(hr)))
247+
.set_tag(tag.clone(), HashAndFormat::raw(hr))
248248
.await?;
249249
step(&evs).await;
250250
assert_eq!(bao_store.entry_status(&h1).await?, EntryStatus::NotFound);
251251
assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::NotFound);
252252
assert_eq!(bao_store.entry_status(&hr).await?, EntryStatus::Complete);
253253

254254
// delete the permanent tag, everything should be gone
255-
bao_store.set_tag(tag, None).await?;
255+
bao_store.delete_tag(tag).await?;
256256
step(&evs).await;
257257
assert_eq!(bao_store.entry_status(&h1).await?, EntryStatus::NotFound);
258258
assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::NotFound);
@@ -339,7 +339,7 @@ async fn gc_file_basics() -> Result<()> {
339339
drop(tt2);
340340
let tag = Tag::from("test");
341341
bao_store
342-
.set_tag(tag.clone(), Some(HashAndFormat::hash_seq(*ttr.hash())))
342+
.set_tag(tag.clone(), HashAndFormat::hash_seq(*ttr.hash()))
343343
.await?;
344344
drop(ttr);
345345

@@ -359,7 +359,7 @@ async fn gc_file_basics() -> Result<()> {
359359

360360
tracing::info!("changing tag from hashseq to raw, this should orphan the children");
361361
bao_store
362-
.set_tag(tag.clone(), Some(HashAndFormat::raw(hr)))
362+
.set_tag(tag.clone(), HashAndFormat::raw(hr))
363363
.await?;
364364

365365
// now only hr itself should be protected, but not its children
@@ -376,7 +376,7 @@ async fn gc_file_basics() -> Result<()> {
376376
assert!(!path(&hr).exists());
377377
assert!(!outboard_path(&hr).exists());
378378

379-
bao_store.set_tag(tag, None).await?;
379+
bao_store.delete_tag(tag).await?;
380380
step(&evs).await;
381381
bao_store.sync().await?;
382382
assert!(check_consistency(&bao_store).await? <= ReportLevel::Info);
@@ -504,7 +504,7 @@ async fn gc_file_stress() -> Result<()> {
504504
if i % 100 == 0 {
505505
let tag = Tag::from(format!("test{}", i));
506506
bao_store
507-
.set_tag(tag.clone(), Some(HashAndFormat::raw(*tt.hash())))
507+
.set_tag(tag.clone(), HashAndFormat::raw(*tt.hash()))
508508
.await?;
509509
live.push(*tt.hash());
510510
} else {

0 commit comments

Comments
 (0)