Skip to content

Commit f32e43a

Browse files
committed
refactor!: move delete to ObjectStoreExt
The normal delete is really just a bulk delete with a single entry. Part of #385.
1 parent bb9021c commit f32e43a

File tree

14 files changed

+33
-106
lines changed

14 files changed

+33
-106
lines changed

src/aws/mod.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -250,11 +250,6 @@ impl ObjectStore for AmazonS3 {
250250
self.client.get_opts(location, options).await
251251
}
252252

253-
async fn delete(&self, location: &Path) -> Result<()> {
254-
self.client.request(Method::DELETE, location).send().await?;
255-
Ok(())
256-
}
257-
258253
fn delete_stream(
259254
&self,
260255
locations: BoxStream<'static, Result<Path>>,

src/azure/client.rs

Lines changed: 3 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,6 @@ pub(crate) enum Error {
7474
path: String,
7575
},
7676

77-
#[error("Error performing delete request {}: {}", path, source)]
78-
DeleteRequest {
79-
source: crate::client::retry::RetryError,
80-
path: String,
81-
},
82-
8377
#[error("Error performing bulk delete request: {}", source)]
8478
BulkDeleteRequest {
8579
source: crate::client::retry::RetryError,
@@ -150,9 +144,9 @@ pub(crate) enum Error {
150144
impl From<Error> for crate::Error {
151145
fn from(err: Error) -> Self {
152146
match err {
153-
Error::GetRequest { source, path }
154-
| Error::DeleteRequest { source, path }
155-
| Error::PutRequest { source, path } => source.error(STORE, path),
147+
Error::GetRequest { source, path } | Error::PutRequest { source, path } => {
148+
source.error(STORE, path)
149+
}
156150
_ => Self::Generic {
157151
store: STORE,
158152
source: Box::new(err),
@@ -627,36 +621,6 @@ impl AzureClient {
627621
.map_err(|source| Error::Metadata { source })?)
628622
}
629623

630-
/// Make an Azure Delete request <https://docs.microsoft.com/en-us/rest/api/storageservices/delete-blob>
631-
pub(crate) async fn delete_request<T: Serialize + ?Sized + Sync>(
632-
&self,
633-
path: &Path,
634-
query: &T,
635-
) -> Result<()> {
636-
let credential = self.get_credential().await?;
637-
let url = self.config.path_url(path);
638-
639-
let sensitive = credential
640-
.as_deref()
641-
.map(|c| c.sensitive_request())
642-
.unwrap_or_default();
643-
self.client
644-
.delete(url.as_str())
645-
.query(query)
646-
.header(&DELETE_SNAPSHOTS, "include")
647-
.with_azure_authorization(&credential, &self.config.account)
648-
.retryable(&self.config.retry_config)
649-
.sensitive(sensitive)
650-
.send()
651-
.await
652-
.map_err(|source| {
653-
let path = path.as_ref().into();
654-
Error::DeleteRequest { source, path }
655-
})?;
656-
657-
Ok(())
658-
}
659-
660624
fn build_bulk_delete_body(
661625
&self,
662626
boundary: &str,

src/azure/credential.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ use url::Url;
4747
static AZURE_VERSION: HeaderValue = HeaderValue::from_static("2023-11-03");
4848
static VERSION: HeaderName = HeaderName::from_static("x-ms-version");
4949
pub(crate) static BLOB_TYPE: HeaderName = HeaderName::from_static("x-ms-blob-type");
50-
pub(crate) static DELETE_SNAPSHOTS: HeaderName = HeaderName::from_static("x-ms-delete-snapshots");
5150
pub(crate) static COPY_SOURCE: HeaderName = HeaderName::from_static("x-ms-copy-source");
5251
static CONTENT_MD5: HeaderName = HeaderName::from_static("content-md5");
5352
static PARTNER_TOKEN: HeaderName = HeaderName::from_static("x-ms-partner-token");

src/azure/mod.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,13 +116,10 @@ impl ObjectStore for MicrosoftAzure {
116116
self.client.get_opts(location, options).await
117117
}
118118

119-
async fn delete(&self, location: &Path) -> Result<()> {
120-
self.client.delete_request(location, &()).await
121-
}
122-
123119
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
124120
self.client.list(prefix)
125121
}
122+
126123
fn delete_stream(
127124
&self,
128125
locations: BoxStream<'static, Result<Path>>,

src/chunked.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,6 @@ impl ObjectStore for ChunkedStore {
139139
self.inner.get_ranges(location, ranges).await
140140
}
141141

142-
async fn delete(&self, location: &Path) -> Result<()> {
143-
self.inner.delete(location).await
144-
}
145-
146142
fn delete_stream(
147143
&self,
148144
locations: BoxStream<'static, Result<Path>>,

src/gcp/mod.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -180,10 +180,6 @@ impl ObjectStore for GoogleCloudStorage {
180180
self.client.get_opts(location, options).await
181181
}
182182

183-
async fn delete(&self, location: &Path) -> Result<()> {
184-
self.client.delete_request(location).await
185-
}
186-
187183
fn delete_stream(
188184
&self,
189185
locations: BoxStream<'static, Result<Path>>,

src/http/mod.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,6 @@ impl ObjectStore for HttpStore {
132132
self.client.get_opts(location, options).await
133133
}
134134

135-
async fn delete(&self, location: &Path) -> Result<()> {
136-
self.client.delete(location).await
137-
}
138-
139135
fn delete_stream(
140136
&self,
141137
locations: BoxStream<'static, Result<Path>>,

src/lib.rs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -834,9 +834,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
834834
.await
835835
}
836836

837-
/// Delete the object at the specified location.
838-
async fn delete(&self, location: &Path) -> Result<()>;
839-
840837
/// Delete all the objects at the specified locations
841838
///
842839
/// When supported, this method will use bulk operations that delete more
@@ -939,10 +936,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
939936
/// # todo!()
940937
/// # }
941938
/// #
942-
/// # async fn delete(&self, _: &Path) -> Result<()> {
943-
/// # todo!()
944-
/// # }
945-
/// #
946939
/// fn delete_stream(
947940
/// &self,
948941
/// locations: BoxStream<'static, Result<Path>>,
@@ -1100,10 +1093,6 @@ macro_rules! as_ref_impl {
11001093
self.as_ref().get_ranges(location, ranges).await
11011094
}
11021095

1103-
async fn delete(&self, location: &Path) -> Result<()> {
1104-
self.as_ref().delete(location).await
1105-
}
1106-
11071096
fn delete_stream(
11081097
&self,
11091098
locations: BoxStream<'static, Result<Path>>,
@@ -1243,6 +1232,9 @@ pub trait ObjectStoreExt: ObjectStore {
12431232

12441233
/// Return the metadata for the specified location
12451234
fn head(&self, location: &Path) -> impl Future<Output = Result<ObjectMeta>>;
1235+
1236+
/// Delete the object at the specified location.
1237+
fn delete(&self, location: &Path) -> impl Future<Output = Result<()>>;
12461238
}
12471239

12481240
impl<T> ObjectStoreExt for T
@@ -1272,6 +1264,24 @@ where
12721264
let options = GetOptions::new().with_head(true);
12731265
Ok(self.get_opts(location, options).await?.meta)
12741266
}
1267+
1268+
async fn delete(&self, location: &Path) -> Result<()> {
1269+
let location = location.clone();
1270+
let mut stream =
1271+
self.delete_stream(futures::stream::once(async move { Ok(location) }).boxed());
1272+
let _path = stream.try_next().await?.ok_or_else(|| Error::Generic {
1273+
store: "ext",
1274+
source: "`delete_stream` was supposed to yield once but didn't".into(),
1275+
})?;
1276+
if stream.next().await.is_some() {
1277+
Err(Error::Generic {
1278+
store: "ext",
1279+
source: "`delete_stream` yielded more than once".into(),
1280+
})
1281+
} else {
1282+
Ok(())
1283+
}
1284+
}
12751285
}
12761286

12771287
/// Result of a list call that includes objects, prefixes (directories) and a

src/limit.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,16 +105,18 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
105105
self.inner.get_ranges(location, ranges).await
106106
}
107107

108-
async fn delete(&self, location: &Path) -> Result<()> {
109-
let _permit = self.semaphore.acquire().await.unwrap();
110-
self.inner.delete(location).await
111-
}
112-
113108
fn delete_stream(
114109
&self,
115110
locations: BoxStream<'static, Result<Path>>,
116111
) -> BoxStream<'static, Result<Path>> {
117-
self.inner.delete_stream(locations)
112+
let inner = Arc::clone(&self.inner);
113+
let fut = Arc::clone(&self.semaphore)
114+
.acquire_owned()
115+
.map(move |permit| {
116+
let s = inner.delete_stream(locations);
117+
PermitWrapper::new(s, permit.unwrap())
118+
});
119+
fut.into_stream().flatten().boxed()
118120
}
119121

120122
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {

src/local.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -443,14 +443,6 @@ impl ObjectStore for LocalFileSystem {
443443
.await
444444
}
445445

446-
async fn delete(&self, location: &Path) -> Result<()> {
447-
let config = Arc::clone(&self.config);
448-
let automatic_cleanup = self.automatic_cleanup;
449-
let location = location.clone();
450-
maybe_spawn_blocking(move || Self::delete_location(config, automatic_cleanup, &location))
451-
.await
452-
}
453-
454446
fn delete_stream(
455447
&self,
456448
locations: BoxStream<'static, Result<Path>>,

0 commit comments

Comments
 (0)