Skip to content

Commit 3cfc229

Browse files
crepererumalamb
andcommitted
refactor: introduce ObjectStoreExt trait
See #385. Co-authored-by: Andrew Lamb <[email protected]>
1 parent 1b8ecc7 commit 3cfc229

File tree

12 files changed

+57
-41
lines changed

12 files changed

+57
-41
lines changed

src/aws/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,7 @@ impl PaginatedListStore for AmazonS3 {
498498
mod tests {
499499
use super::*;
500500
use crate::ClientOptions;
501+
use crate::ObjectStoreExt;
501502
use crate::client::SpawnedReqwestConnector;
502503
use crate::client::get::GetClient;
503504
use crate::client::retry::RetryContext;

src/azure/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,7 @@ impl PaginatedListStore for MicrosoftAzure {
311311
#[cfg(test)]
312312
mod tests {
313313
use super::*;
314+
use crate::ObjectStoreExt;
314315
use crate::integration::*;
315316
use crate::tests::*;
316317
use bytes::Bytes;

src/buffered.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -210,12 +210,12 @@ impl AsyncBufRead for BufReader {
210210

211211
/// An async buffered writer compatible with the tokio IO traits
212212
///
213-
/// This writer adaptively uses [`ObjectStore::put`] or
213+
/// This writer adaptively uses [`ObjectStore::put_opts`] or
214214
/// [`ObjectStore::put_multipart`] depending on the amount of data that has
215215
/// been written.
216216
///
217217
/// Up to `capacity` bytes will be buffered in memory, and flushed on shutdown
218-
/// using [`ObjectStore::put`]. If `capacity` is exceeded, data will instead be
218+
/// using [`ObjectStore::put_opts`]. If `capacity` is exceeded, data will instead be
219219
/// streamed using [`ObjectStore::put_multipart`]
220220
pub struct BufWriter {
221221
capacity: usize,
@@ -242,7 +242,7 @@ enum BufWriterState {
242242
Prepare(BoxFuture<'static, crate::Result<WriteMultipart>>),
243243
/// Write to a multipart upload
244244
Write(Option<WriteMultipart>),
245-
/// [`ObjectStore::put`]
245+
/// [`ObjectStore::put_opts`]
246246
Flush(BoxFuture<'static, crate::Result<()>>),
247247
}
248248

@@ -489,7 +489,7 @@ mod tests {
489489
use super::*;
490490
use crate::memory::InMemory;
491491
use crate::path::Path;
492-
use crate::{Attribute, GetOptions};
492+
use crate::{Attribute, GetOptions, ObjectStoreExt};
493493
use itertools::Itertools;
494494
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
495495

src/chunked.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ impl ObjectStore for ChunkedStore {
186186
mod tests {
187187
use futures::StreamExt;
188188

189+
use crate::ObjectStoreExt;
189190
#[cfg(feature = "fs")]
190191
use crate::integration::*;
191192
#[cfg(feature = "fs")]

src/gcp/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,7 @@ impl PaginatedListStore for GoogleCloudStorage {
302302
mod test {
303303
use credential::DEFAULT_GCS_BASE_URL;
304304

305+
use crate::ObjectStoreExt;
305306
use crate::integration::*;
306307
use crate::tests::*;
307308

src/integration.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use crate::multipart::MultipartStore;
2929
use crate::path::Path;
3030
use crate::{
3131
Attribute, Attributes, DynObjectStore, Error, GetOptions, GetRange, MultipartUpload,
32-
ObjectStore, PutMode, PutPayload, UpdateVersion, WriteMultipart,
32+
ObjectStore, ObjectStoreExt, PutMode, PutPayload, UpdateVersion, WriteMultipart,
3333
};
3434
use bytes::Bytes;
3535
use futures::stream::FuturesUnordered;

src/lib.rs

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -252,11 +252,11 @@
252252
//!
253253
//! # Put Object
254254
//!
255-
//! Use the [`ObjectStore::put`] method to atomically write data.
255+
//! Use the [`ObjectStoreExt::put`] method to atomically write data.
256256
//!
257257
//! ```ignore-wasm32
258258
//! # use object_store::local::LocalFileSystem;
259-
//! # use object_store::{ObjectStore, PutPayload};
259+
//! # use object_store::{ObjectStore, ObjectStoreExt, PutPayload};
260260
//! # use std::sync::Arc;
261261
//! # use object_store::path::Path;
262262
//! # fn get_object_store() -> Arc<dyn ObjectStore> {
@@ -364,7 +364,7 @@
364364
//!
365365
//! ```ignore-wasm32
366366
//! # use object_store::local::LocalFileSystem;
367-
//! # use object_store::{ObjectStore, PutPayloadMut};
367+
//! # use object_store::{ObjectStore, ObjectStoreExt, PutPayloadMut};
368368
//! # use std::sync::Arc;
369369
//! # use bytes::Bytes;
370370
//! # use tokio::io::AsyncWriteExt;
@@ -613,19 +613,24 @@ pub type DynObjectStore = dyn ObjectStore;
613613
pub type MultipartId = String;
614614

615615
/// Universal API to multiple object store services.
616+
///
617+
/// For more convience methods, check [`ObjectStoreExt`].
618+
///
619+
/// # Contract
620+
/// This trait is meant as a contract between object store implementations
621+
/// (e.g. providers, wrappers) and the `object_store` crate itself and is
622+
/// intended to be the minimum API required for an object store.
623+
///
624+
/// The [`ObjectStoreExt`] acts as an API/contract between `object_store`
625+
/// and the store users and provides additional methods that may be simpler to use but overlap
626+
/// in functionality with `ObjectStore`
616627
#[async_trait]
617628
pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
618-
/// Save the provided bytes to the specified location
629+
/// Save the provided `payload` to `location` with the given options
619630
///
620631
/// The operation is guaranteed to be atomic, it will either successfully
621632
/// write the entirety of `payload` to `location`, or fail. No clients
622633
/// should be able to observe a partially written object
623-
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
624-
self.put_opts(location, payload, PutOptions::default())
625-
.await
626-
}
627-
628-
/// Save the provided `payload` to `location` with the given options
629634
async fn put_opts(
630635
&self,
631636
location: &Path,
@@ -635,7 +640,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
635640

636641
/// Perform a multipart upload
637642
///
638-
/// Client should prefer [`ObjectStore::put`] for small payloads, as streaming uploads
643+
/// Client should prefer [`ObjectStoreExt::put`] for small payloads, as streaming uploads
639644
/// typically require multiple separate requests. See [`MultipartUpload`] for more information
640645
///
641646
/// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore)
@@ -646,7 +651,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
646651

647652
/// Perform a multipart upload with options
648653
///
649-
/// Client should prefer [`ObjectStore::put`] for small payloads, as streaming uploads
654+
/// Client should prefer [`ObjectStore::put_opts`] for small payloads, as streaming uploads
650655
/// typically require multiple separate requests. See [`MultipartUpload`] for more information
651656
///
652657
/// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore)
@@ -892,7 +897,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
892897
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
893898
/// # let root = tempfile::TempDir::new().unwrap();
894899
/// # let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
895-
/// # use object_store::{ObjectStore, ObjectMeta};
900+
/// # use object_store::{ObjectStore, ObjectStoreExt, ObjectMeta};
896901
/// # use object_store::path::Path;
897902
/// # use futures::{StreamExt, TryStreamExt};
898903
/// #
@@ -1103,10 +1108,6 @@ macro_rules! as_ref_impl {
11031108
($type:ty) => {
11041109
#[async_trait]
11051110
impl ObjectStore for $type {
1106-
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
1107-
self.as_ref().put(location, payload).await
1108-
}
1109-
11101111
async fn put_opts(
11111112
&self,
11121113
location: &Path,
@@ -1201,6 +1202,31 @@ macro_rules! as_ref_impl {
12011202
as_ref_impl!(Arc<dyn ObjectStore>);
12021203
as_ref_impl!(Box<dyn ObjectStore>);
12031204

1205+
/// Extension trait for [`ObjectStore`] with convinience functions.
1206+
///
1207+
/// See "contract" section within the [`ObjectStore`] documentation for more reasoning.
1208+
///
1209+
/// # Implementation
1210+
/// You MUST NOT implement this trait yourself. It is automatically implemented for all [`ObjectStore`] implementations.
1211+
pub trait ObjectStoreExt: ObjectStore {
1212+
/// Save the provided bytes to the specified location
1213+
///
1214+
/// The operation is guaranteed to be atomic, it will either successfully
1215+
/// write the entirety of `payload` to `location`, or fail. No clients
1216+
/// should be able to observe a partially written object
1217+
fn put(&self, location: &Path, payload: PutPayload) -> impl Future<Output = Result<PutResult>>;
1218+
}
1219+
1220+
impl<T> ObjectStoreExt for T
1221+
where
1222+
T: ObjectStore + ?Sized,
1223+
{
1224+
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
1225+
self.put_opts(location, payload, PutOptions::default())
1226+
.await
1227+
}
1228+
}
1229+
12041230
/// Result of a list call that includes objects, prefixes (directories) and a
12051231
/// token for the next set of results. Individual result sets may be limited to
12061232
/// 1,000 objects based on the underlying object storage's limitations.

src/limit.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,6 @@ impl<T: ObjectStore> std::fmt::Display for LimitStore<T> {
7171

7272
#[async_trait]
7373
impl<T: ObjectStore> ObjectStore for LimitStore<T> {
74-
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
75-
let _permit = self.semaphore.acquire().await.unwrap();
76-
self.inner.put(location, payload).await
77-
}
78-
7974
async fn put_opts(
8075
&self,
8176
location: &Path,

src/local.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1124,7 +1124,7 @@ mod tests {
11241124
#[cfg(target_family = "unix")]
11251125
use tempfile::NamedTempFile;
11261126

1127-
use crate::integration::*;
1127+
use crate::{ObjectStoreExt, integration::*};
11281128

11291129
use super::*;
11301130

src/memory.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,7 @@ impl MultipartUpload for InMemoryUpload {
550550

551551
#[cfg(test)]
552552
mod tests {
553-
use crate::integration::*;
553+
use crate::{ObjectStoreExt, integration::*};
554554

555555
use super::*;
556556

0 commit comments

Comments
 (0)