Skip to content

Conversation

@jaschrep-msft
Copy link
Member

Implements two foundation components to implement partitioned upload and copy.

  • PartitionedStream: Consumes a Box<dyn SeekableStream> and converts it to a Stream<Item = Result<Bytes, Error>> where each Ok(Bytes) returned is a contiguously buffered partition to be used for a put block or equivalent request.
  • run_all_with_concurrency_limit(): Takes a sequence of async jobs (impl FnOnce() -> impl Future<Output = Result<(), Error>>). These will be sequences of put block operations or equivalent requests.

@github-actions github-actions bot added the Storage Storage Service (Queues, Blobs, Files) label Nov 12, 2025
@jaschrep-msft jaschrep-msft marked this pull request as ready for review November 19, 2025 22:01
Copilot AI review requested due to automatic review settings November 19, 2025 22:01
Copilot finished reviewing on behalf of jaschrep-msft November 19, 2025 22:04
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements foundational components for partitioned upload and copy operations in Azure Storage Blob SDK, introducing stream partitioning and concurrent operation execution capabilities.

  • Adds PartitionedStream that converts a SeekableStream into partitioned Bytes chunks for block operations
  • Implements run_all_with_concurrency_limit() for executing async operations with configurable concurrency
  • Includes comprehensive test coverage for both components

Reviewed Changes

Copilot reviewed 5 out of 6 changed files in this pull request and generated 16 comments.

Show a summary per file
File Description
sdk/storage/azure_storage_blob/src/streams/partitioned_stream.rs New implementation of PartitionedStream with Stream and FusedStream traits, plus test suite
sdk/storage/azure_storage_blob/src/streams/mod.rs Module declaration for streams
sdk/storage/azure_storage_blob/src/partitioned_transfer/mod.rs New implementation of run_all_with_concurrency_limit() with concurrency control logic and tests
sdk/storage/azure_storage_blob/src/lib.rs Module declarations for new partitioned_transfer and streams modules
sdk/storage/azure_storage_blob/Cargo.toml Added bytes and futures dependencies (and rand for tests)
Cargo.lock Lock file updates reflecting new dependencies

inner_complete: bool,
}

impl PartitionedStream {
Copy link

Copilot AI Nov 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing documentation for the new() constructor. Consider adding a doc comment that explains the parameters and the purpose of this constructor, especially the partition_len parameter which defines the size of each partition.

Suggested change
impl PartitionedStream {
impl PartitionedStream {
/// Creates a new `PartitionedStream` from a seekable stream.
///
/// # Parameters
/// - `inner`: The underlying seekable stream to partition.
/// - `partition_len`: The size (in bytes) of each partition. Must be greater than zero.
///
/// The stream will read data in chunks of up to `partition_len` bytes.

Copilot uses AI. Check for mistakes.

impl FusedStream for PartitionedStream {
fn is_terminated(&self) -> bool {
self.inner_complete && self.buf.is_empty()
Copy link

Copilot AI Nov 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The FusedStream implementation checks self.buf.is_empty() but should check self.buf_offset == 0 instead. A stream is terminated when the inner stream is complete AND the buffer has been fully consumed (offset is 0). The current implementation could incorrectly report termination when there's still buffered data to return.

Suggested change
self.inner_complete && self.buf.is_empty()
self.inner_complete && self.buf_offset == 0

Copilot uses AI. Check for mistakes.
};

type AzureResult<T> = azure_core::Result<T>;

Copy link

Copilot AI Nov 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing documentation for this function. Since it's a key foundation component for partitioned transfers, consider adding a doc comment that explains:

  • What the function does (executes async operations with concurrency limit)
  • The parameters (ops_queue and parallel)
  • The behavior (how operations are scheduled and errors handled)
  • Example usage if appropriate
Suggested change
/// Executes async operations from a queue with a concurrency limit.
///
/// This function consumes a stream (`ops_queue`) of async operation factories (closures returning futures),
/// and runs up to `parallel` operations concurrently. As operations complete, new ones are started from the queue,
/// maintaining the concurrency limit. If any operation or queue item returns an error, the function returns early
/// with that error. When all operations and queue items are complete, returns `Ok(())`.
///
/// # Parameters
/// - `ops_queue`: A stream yielding `Result<FnOnce() -> TFut, TErr>`. Each item is either a closure producing a future,
/// or an error. The stream must be `Unpin`.
/// - `parallel`: The maximum number of operations to run concurrently. Must be non-zero.
///
/// # Behavior
/// - Operations are scheduled as soon as possible, up to the concurrency limit.
/// - If an error is encountered in the queue or in any operation, the function returns that error immediately.
/// - When the queue is exhausted, waits for all running operations to complete before returning.
///
/// # Example
/// ```rust
/// use futures::{stream, StreamExt};
/// use std::num::NonZeroUsize;
///
/// async fn example() {
/// let ops = vec![
/// Ok(|| async { Ok(()) }),
/// Ok(|| async { Ok(()) }),
/// ];
/// let ops_stream = stream::iter(ops);
/// run_all_with_concurrency_limit(ops_stream, NonZeroUsize::new(2).unwrap()).await.unwrap();
/// }
/// ```
///
/// # Errors
/// Returns the first error encountered from the queue or any operation.
///
/// # Type Parameters
/// - `TFut`: Future type returned by each operation.
/// - `TErr`: Error type for queue or operation failures.

Copilot uses AI. Check for mistakes.
}

#[tokio::test]
async fn slow_stream() -> AzureResult<()> {
Copy link

Copilot AI Nov 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function name slow_stream is unclear. Consider renaming to something more descriptive that indicates what is being tested, such as handles_slow_stream or handles_delayed_stream_items.

Copilot generated this review using guidance from repository custom instructions.
}

#[tokio::test]
async fn partitions_exact_multiple() -> AzureResult<()> {
Copy link

Copilot AI Nov 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function name partitions_exact_multiple is unclear. Consider renaming to something more descriptive that indicates what is being tested, such as partitions_data_evenly_divisible_by_partition_size or handles_exact_multiple_partitions.

Copilot generated this review using guidance from repository custom instructions.
}

#[tokio::test]
async fn partitions_none() -> AzureResult<()> {
Copy link

Copilot AI Nov 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function name partitions_none is unclear. Consider renaming to something more descriptive that indicates what is being tested, such as handles_empty_stream or returns_none_for_empty_data.

Copilot generated this review using guidance from repository custom instructions.
use futures::{ready, stream::FusedStream, AsyncRead, Stream};

type AzureResult<T> = azure_core::Result<T>;

Copy link

Copilot AI Nov 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing documentation for the public-facing (within crate) PartitionedStream struct. Consider adding a doc comment that explains its purpose: converting a SeekableStream into a stream of partitioned Bytes chunks.

Suggested change
/// Converts a `SeekableStream` into a stream of partitioned `Bytes` chunks.
///
/// This struct reads from a seekable stream and yields fixed-size byte chunks,
/// making it easier to process large streams in manageable partitions.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Storage Storage Service (Queues, Blobs, Files)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants