From 3b7eb103a8360ce7b78432aa9c358f5d9920e4c1 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 9 Feb 2026 15:09:00 -0500 Subject: [PATCH 1/3] remove obstore dep --- pyproject.toml | 71 ++++++++++++++++++++------------------------------ 1 file changed, 28 insertions(+), 43 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index be99e07..62a8fdd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,10 +3,7 @@ requires = ["hatchling", "hatch-vcs"] build-backend = "hatchling.build" [tool.hatch.build.targets.sdist] -exclude = [ - "/.github", - "/docs", -] +exclude = ["/.github", "/docs"] [project] name = "obspec-utils" @@ -17,22 +14,19 @@ requires-python = ">=3.11" license = "Apache-2.0" keywords = [] authors = [ - { name = "Max Jones", email = "14077947+maxrjones@users.noreply.github.com" }, + { name = "Max Jones", email = "14077947+maxrjones@users.noreply.github.com" }, ] classifiers = [ - "Development Status :: 4 - Beta", - "Programming Language :: Python", - "Programming Language :: Python :: 3.11", - "Programming Language :: Python :: 3.12", - "Programming Language :: Python :: 3.13", - "Programming Language :: Python :: 3.14", - "Programming Language :: Python :: Implementation :: CPython", - "Programming Language :: Python :: Implementation :: PyPy", -] -dependencies = [ - "obspec", - "obstore", + "Development Status :: 4 - Beta", + "Programming Language :: Python", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", + "Programming Language :: Python :: Implementation :: CPython", + "Programming Language :: Python :: Implementation :: PyPy", ] +dependencies = ["obspec"] [project.urls] Documentation = "https://obspec-utils.readthedocs.io" @@ -55,24 +49,17 @@ test = [ "s3fs", ] xarray = [ - "xarray", - "h5netcdf", - "h5py", - "cftime", - "scipy", - "rasterio", - "rioxarray", -] -dask = [ - "dask", -] -fsspec = [ - "s3fs", - "fsspec", -] -dev = [ - "ipykernel>=6.29.5", + "xarray", + "h5netcdf", + "h5py", + "cftime", + "scipy", + "rasterio", + "rioxarray", ] +dask = ["dask"] +fsspec = ["s3fs", "fsspec"] +dev = ["ipykernel>=6.29.5"] docs = [ "mkdocs-material[imaging]>=9.6.14", "mkdocs>=1.6.1", @@ -94,21 +81,19 @@ hooks.vcs.version-file = "src/obspec_utils/_version.py" source_pkgs = ["obspec_utils"] branch = true parallel = true -omit = [ - "src/obspec_utils/__about__.py", -] +omit = ["src/obspec_utils/__about__.py"] [tool.coverage.paths] obspec_utils = ["src/obspec_utils", "*/obspec-utils/src/obspec_utils"] [tool.coverage.report] -exclude_lines = [ - "no cov", - "if __name__ == .__main__.:", - "if TYPE_CHECKING:", -] +exclude_lines = ["no cov", "if __name__ == .__main__.:", "if TYPE_CHECKING:"] [tool.pytest.ini_options] markers = [ - "network: marks tests as requiring network access (run with --network)", + "network: marks tests as requiring network access (run with --network)", ] + +[tool.ruff] +select = ["ALL"] +ignore = ["EM", "TD"] From 511be86a6f29b8076f673ca00a6ee829b02ce0d8 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 9 Feb 2026 16:11:35 -0500 Subject: [PATCH 2/3] Overhaul AiohttpStore --- pyproject.toml | 4 +- src/obspec_utils/stores/_aiohttp.py | 618 ++++------------------------ 2 files changed, 93 insertions(+), 529 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 62a8fdd..b38e38a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,7 +59,7 @@ xarray = [ ] dask = ["dask"] fsspec = ["s3fs", "fsspec"] -dev = ["ipykernel>=6.29.5"] +dev = ["aiohttp>=3.13.3", "ipykernel>=7"] docs = [ "mkdocs-material[imaging]>=9.6.14", "mkdocs>=1.6.1", @@ -96,4 +96,4 @@ markers = [ [tool.ruff] select = ["ALL"] -ignore = ["EM", "TD"] +ignore = ["EM", "TD", "TRY003", "FIX002"] diff --git a/src/obspec_utils/stores/_aiohttp.py b/src/obspec_utils/stores/_aiohttp.py index fcb9986..be60bab 100644 --- a/src/obspec_utils/stores/_aiohttp.py +++ b/src/obspec_utils/stores/_aiohttp.py @@ -1,13 +1,12 @@ -""" -Aiohttp-based implementation of the ReadableStore protocol. +"""Aiohttp-based implementation of the ReadableStore protocol. -This module provides an alternative HTTP backend using aiohttp instead of obstore's HTTPStore. +This module provides an alternative HTTP backend using aiohttp instead of obstore's +HTTPStore. It's useful for generic HTTPS access (e.g., THREDDS, NASA data from outside AWS region) where obstore's HTTPStore (designed for WebDAV/S3-like semantics) may not be ideal. -Example +Example: ------- - ```python from obspec_utils import ObjectStoreRegistry from obspec_utils.stores import AiohttpStore @@ -21,126 +20,43 @@ resolved_store, path = registry.resolve("https://example.com/data/file.nc") data = await resolved_store.get_range_async(path, start=0, end=1000) ``` + """ from __future__ import annotations import asyncio -import threading -from collections.abc import AsyncIterator, Coroutine, Iterator, Sequence -from dataclasses import dataclass, field -from datetime import datetime, timezone -from typing import TYPE_CHECKING, TypeVar - -from obspec import GetResult, GetResultAsync - -from obspec_utils.protocols import ReadableStore - -if TYPE_CHECKING: - from obspec import Attributes, GetOptions, ObjectMeta +from typing import TYPE_CHECKING, Self, overload -T = TypeVar("T") +from obspec import GetRangeAsync, GetRangesAsync try: import aiohttp except ImportError as e: - raise ImportError( - "aiohttp is required for AiohttpStore. Install it with: pip install aiohttp" - ) from e - - -@dataclass -class AiohttpGetResult(GetResult): - """ - Result from a get request using aiohttp. - - Implements the obspec GetResult protocol for synchronous iteration. - """ - - _data: bytes - _meta: ObjectMeta - _attributes: Attributes = field(default_factory=dict) - _range: tuple[int, int] = (0, 0) - - def __post_init__(self): - if self._range == (0, 0): - self._range = (0, len(self._data)) + msg = "aiohttp is required for AiohttpStore. Install it with: pip install aiohttp" + raise ImportError(msg) from e - @property - def attributes(self) -> Attributes: - """Additional object attributes.""" - return self._attributes - - def buffer(self) -> bytes: - """Return the data as a buffer.""" - return self._data - - @property - def meta(self) -> ObjectMeta: - """The ObjectMeta for this object.""" - return self._meta - - @property - def range(self) -> tuple[int, int]: - """The range of bytes returned by this request.""" - return self._range - - def __iter__(self) -> Iterator[bytes]: - """Iterate over chunks of the data.""" - yield self._data - - -@dataclass -class AiohttpGetResultAsync(GetResultAsync): - """ - Result from an async get request using aiohttp. - - Implements the obspec GetResultAsync protocol for asynchronous iteration. - """ - - _data: bytes - _meta: ObjectMeta - _attributes: Attributes = field(default_factory=dict) - _range: tuple[int, int] = (0, 0) - - def __post_init__(self): - if self._range == (0, 0): - self._range = (0, len(self._data)) - - @property - def attributes(self) -> Attributes: - """Additional object attributes.""" - return self._attributes +if TYPE_CHECKING: + from collections.abc import Sequence - async def buffer_async(self) -> bytes: - """Return the data as a buffer.""" - return self._data + from aiohttp import ClientSession - @property - def meta(self) -> ObjectMeta: - """The ObjectMeta for this object.""" - return self._meta - @property - def range(self) -> tuple[int, int]: - """The range of bytes returned by this request.""" - return self._range +__all__ = ["AiohttpStore"] - async def __aiter__(self) -> AsyncIterator[bytes]: - """Async iterate over chunks of the data.""" - yield self._data +class AiohttpStore(GetRangeAsync, GetRangesAsync): + """An [aiohttp]-based object store implementation. -class AiohttpStore(ReadableStore): - """ - An [aiohttp](https://docs.aiohttp.org/en/stable/)-based object store implementation. + [aiohttp]: https://docs.aiohttp.org/en/stable/ - This provides a lightweight alternative to obstore's [HTTPStore][obstore.store.HTTPStore] for generic - HTTP/HTTPS access. It's particularly useful for: + This provides a lightweight alternative to obstore's + [HTTPStore][obstore.store.HTTPStore] for generic HTTP/HTTPS access. It's + particularly useful for: - THREDDS data servers - NASA data access from outside AWS regions - - Any generic HTTP endpoint that doesn't need S3-like semantics + - Any generic HTTP endpoint that doesn't provide S3-like semantics The store should be used as an async context manager to efficiently reuse a single HTTP session across multiple requests. @@ -156,7 +72,6 @@ class AiohttpStore(ReadableStore): Examples -------- - Recommended usage with async context manager: ```python @@ -169,14 +84,6 @@ class AiohttpStore(ReadableStore): chunk = await store.get_range_async("file.nc", start=0, end=1000) ``` - Synchronous usage (creates a session per request): - - ```python - store = AiohttpStore("https://example.com/data") - result = store.get("file.nc") - data = result.buffer() - ``` - With authentication: ```python @@ -186,251 +93,79 @@ class AiohttpStore(ReadableStore): ) as store: result = await store.get_async("protected/file.nc") ``` + """ + _session: ClientSession | None + + @overload def __init__( self, - base_url: str, + base_url_or_client: str, + *, + headers: dict[str, str] | None = None, + timeout: float = 30.0, + ) -> None: ... + @overload + def __init__( + self, + base_url_or_client: ClientSession, + *, + headers: None = None, + timeout: float = 30.0, + ) -> None: ... + def __init__( + self, + base_url_or_client: str | ClientSession, *, headers: dict[str, str] | None = None, timeout: float = 30.0, ) -> None: - self.base_url = base_url.rstrip("/") - self.headers = headers or {} - self.timeout = aiohttp.ClientTimeout(total=timeout) - self._session: aiohttp.ClientSession | None = None - # Event loop for sync methods when called from async context (e.g., Jupyter) - self._sync_loop: asyncio.AbstractEventLoop | None = None - self._sync_thread: threading.Thread | None = None - self._sync_lock = threading.Lock() - - async def __aenter__(self) -> "AiohttpStore": + if isinstance(base_url_or_client, aiohttp.ClientSession): + self._session = base_url_or_client + self.base_url = "" + self._user_provided_session = True + else: + self._session = None + self.base_url = base_url_or_client.rstrip("/") + self.headers = headers or {} + self.timeout = aiohttp.ClientTimeout(total=timeout) + self._user_provided_session = False + + async def __aenter__(self) -> Self: """Enter the async context manager, creating a reusable session.""" - self._session = aiohttp.ClientSession( - timeout=self.timeout, - headers=self.headers, - ) + if not self._user_provided_session: + self._session = aiohttp.ClientSession( + timeout=self.timeout, + headers=self.headers, + ) + return self - async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: # noqa: ANN001 """Exit the async context manager, closing the session.""" + if self._user_provided_session: + # Don't close a user-provided session + return + if self._session is not None: await self._session.close() - self._session = None - - def _get_sync_loop(self) -> asyncio.AbstractEventLoop: - """Get or create event loop for sync operations when inside a running loop.""" - if self._sync_loop is None: - with self._sync_lock: - if self._sync_loop is None: - loop = asyncio.new_event_loop() - thread = threading.Thread( - target=loop.run_forever, - name=f"aiohttp_store_{id(self)}", - daemon=True, - ) - thread.start() - self._sync_loop = loop - self._sync_thread = thread - return self._sync_loop - - def _run_sync(self, coro: Coroutine[None, None, T]) -> T: - """Run coroutine synchronously, handling nested event loops (e.g., Jupyter).""" - try: - asyncio.get_running_loop() - except RuntimeError: - # No running loop - use asyncio.run() directly - return asyncio.run(coro) - - # Inside running loop - use store's dedicated loop - loop = self._get_sync_loop() - future = asyncio.run_coroutine_threadsafe(coro, loop) - return future.result() - - def _cleanup_sync_loop(self) -> None: - """Stop the sync loop and thread.""" - if self._sync_loop is not None: - self._sync_loop.call_soon_threadsafe(self._sync_loop.stop) - if self._sync_thread is not None: - self._sync_thread.join(timeout=1.0) - self._sync_loop = None - self._sync_thread = None - - def close(self) -> None: - """Close the store and release resources.""" - self._cleanup_sync_loop() - - def __del__(self) -> None: - """Clean up on garbage collection.""" - self._cleanup_sync_loop() def _build_url(self, path: str) -> str: """Build the full URL from base URL and path.""" path = path.removeprefix("/") return f"{self.base_url}/{path}" if path else self.base_url - def _get_header_case_insensitive( - self, headers: dict, name: str, default: str | None = None - ) -> str | None: - """Get a header value with case-insensitive name lookup.""" - # Try exact match first - if name in headers: - return headers[name] - # Try case-insensitive lookup - name_lower = name.lower() - for key, value in headers.items(): - if key.lower() == name_lower: - return value - return default - - def _parse_meta_from_headers( - self, path: str, headers: dict, content_length: int | None = None - ) -> ObjectMeta: - """Extract ObjectMeta from HTTP response headers.""" - # Parse last-modified header - last_modified_str = self._get_header_case_insensitive(headers, "Last-Modified") - if last_modified_str: - # Parse HTTP date format - try: - from email.utils import parsedate_to_datetime - - last_modified = parsedate_to_datetime(last_modified_str) - except (ValueError, TypeError): - last_modified = datetime.now(timezone.utc) - else: - last_modified = datetime.now(timezone.utc) - - # Get size from Content-Length or Content-Range - size = content_length or 0 - content_range = self._get_header_case_insensitive(headers, "Content-Range") - if content_range and "/" in content_range: - # Format: bytes 0-999/1234 - total_str = content_range.split("/")[-1] - if total_str != "*": - size = int(total_str) - else: - content_length_str = self._get_header_case_insensitive( - headers, "Content-Length" + def _get_valid_session(self) -> ClientSession: + """Assert that the session is valid for making requests.""" + if self._session is None: + msg = ( + "Aiohttp session not initialized.\n" + "Either provide a session or use as an async context manager." ) - if content_length_str: - size = int(content_length_str) - - return { - "path": path, - "last_modified": last_modified, - "size": size, - "e_tag": self._get_header_case_insensitive(headers, "ETag"), - "version": None, - } - - def _parse_attributes_from_headers(self, headers: dict) -> Attributes: - """Extract Attributes from HTTP response headers.""" - attrs: Attributes = {} - header_names = [ - "Content-Disposition", - "Content-Encoding", - "Content-Language", - "Content-Type", - "Cache-Control", - ] - for header_name in header_names: - value = self._get_header_case_insensitive(headers, header_name) - if value is not None: - attrs[header_name] = value - return attrs - - # --- Async methods (primary implementation) --- - - async def _do_get_async( - self, - session: aiohttp.ClientSession, - path: str, - *, - options: GetOptions | None = None, - ) -> AiohttpGetResultAsync: - """Internal method that performs the actual GET request.""" - url = self._build_url(path) - request_headers = {} if self._session else dict(self.headers) + raise RuntimeError(msg) - # Handle range option if specified - byte_range = (0, 0) - if options and "range" in options: - range_opt = options["range"] - if isinstance(range_opt, tuple): - start, end = range_opt[0], range_opt[1] - request_headers["Range"] = f"bytes={start}-{end - 1}" - byte_range = (start, end) - elif isinstance(range_opt, dict): - if (offset := range_opt.get("offset")) is not None: - request_headers["Range"] = f"bytes={offset}-" - elif (suffix := range_opt.get("suffix")) is not None: - request_headers["Range"] = f"bytes=-{suffix}" - - async with session.get(url, headers=request_headers) as response: - response.raise_for_status() - data = await response.read() - meta = self._parse_meta_from_headers( - path, dict(response.headers), len(data) - ) - attrs = self._parse_attributes_from_headers(dict(response.headers)) - - if byte_range == (0, 0): - byte_range = (0, len(data)) - - return AiohttpGetResultAsync( - _data=data, - _meta=meta, - _attributes=attrs, - _range=byte_range, - ) - - async def get_async( - self, - path: str, - *, - options: GetOptions | None = None, - ) -> AiohttpGetResultAsync: - """ - Download a file asynchronously. - - Parameters - ---------- - path - Path to the file relative to base_url. - options - Optional get options (range, conditionals, etc.). - - Returns - ------- - AiohttpGetResultAsync - Result object with buffer_async() method and metadata. - """ - if self._session is not None: - return await self._do_get_async(self._session, path, options=options) - - # Fallback: create a temporary session for this request - async with aiohttp.ClientSession( - timeout=self.timeout, headers=self.headers - ) as session: - return await self._do_get_async(session, path, options=options) - - async def _do_get_range_async( - self, - session: aiohttp.ClientSession, - path: str, - *, - start: int, - end: int, - ) -> bytes: - """Internal method that performs the actual range GET request.""" - url = self._build_url(path) - request_headers = {} if self._session else dict(self.headers) - # HTTP Range is inclusive on both ends, obspec end is exclusive - request_headers["Range"] = f"bytes={start}-{end - 1}" - - async with session.get(url, headers=request_headers) as response: - response.raise_for_status() - return await response.read() + return self._session async def get_range_async( self, @@ -440,8 +175,7 @@ async def get_range_async( end: int | None = None, length: int | None = None, ) -> bytes: - """ - Download a byte range asynchronously. + """Download a byte range asynchronously. Parameters ---------- @@ -458,22 +192,23 @@ async def get_range_async( ------- bytes The requested byte range. + """ if end is None and length is None: raise ValueError("Either 'end' or 'length' must be provided") if end is None: end = start + length # type: ignore[operator] - if self._session is not None: - return await self._do_get_range_async( - self._session, path, start=start, end=end - ) + session = self._get_valid_session() + + url = self._build_url(path) + request_headers = {} if self._session else dict(self.headers) + # HTTP Range is inclusive on both ends, obspec end is exclusive + request_headers["Range"] = f"bytes={start}-{end - 1}" - # Fallback: create a temporary session for this request - async with aiohttp.ClientSession( - timeout=self.timeout, headers=self.headers - ) as session: - return await self._do_get_range_async(session, path, start=start, end=end) + async with session.get(url, headers=request_headers) as response: + response.raise_for_status() + return await response.read() async def get_ranges_async( self, @@ -483,8 +218,7 @@ async def get_ranges_async( ends: Sequence[int] | None = None, lengths: Sequence[int] | None = None, ) -> Sequence[bytes]: - """ - Download multiple byte ranges asynchronously. + """Download multiple byte ranges asynchronously. Parameters ---------- @@ -501,186 +235,16 @@ async def get_ranges_async( ------- Sequence[bytes] The requested byte ranges. + """ if ends is None and lengths is None: raise ValueError("Either 'ends' or 'lengths' must be provided") if ends is None: - ends = [s + ln for s, ln in zip(starts, lengths)] # type: ignore[arg-type] - - if self._session is not None: - # Use managed session for all concurrent requests - tasks = [ - self._do_get_range_async(self._session, path, start=s, end=e) - for s, e in zip(starts, ends) - ] - return await asyncio.gather(*tasks) - - # Fallback: create a single temporary session for all requests - async with aiohttp.ClientSession( - timeout=self.timeout, headers=self.headers - ) as session: - tasks = [ - self._do_get_range_async(session, path, start=s, end=e) - for s, e in zip(starts, ends) - ] - return await asyncio.gather(*tasks) - - # --- Sync methods (wrap async) --- - - def get( - self, - path: str, - *, - options: GetOptions | None = None, - ) -> AiohttpGetResult: - """ - Download a file synchronously. - - This wraps the async implementation for convenience. - - Parameters - ---------- - path - Path to the file relative to base_url. - options - Optional get options. - - Returns - ------- - AiohttpGetResult - Result object with buffer() method and metadata. - """ - result = self._run_sync(self.get_async(path, options=options)) - return AiohttpGetResult( - _data=result._data, - _meta=result._meta, - _attributes=result._attributes, - _range=result._range, - ) - - def get_range( - self, - path: str, - *, - start: int, - end: int | None = None, - length: int | None = None, - ) -> bytes: - """ - Download a byte range synchronously. - - This wraps the async implementation for convenience. - - Parameters - ---------- - path - Path to the file relative to base_url. - start - Start byte offset. - end - End byte offset (exclusive). - length - Number of bytes to read. + ends = [s + ln for s, ln in zip(starts, lengths, strict=False)] # type: ignore[arg-type] - Returns - ------- - bytes - The requested byte range. - """ - return self._run_sync( - self.get_range_async(path, start=start, end=end, length=length) - ) - - def get_ranges( - self, - path: str, - *, - starts: Sequence[int], - ends: Sequence[int] | None = None, - lengths: Sequence[int] | None = None, - ) -> Sequence[bytes]: - """ - Download multiple byte ranges synchronously. - - This wraps the async implementation for convenience. - - Parameters - ---------- - path - Path to the file relative to base_url. - starts - Sequence of start byte offsets. - ends - Sequence of end byte offsets (exclusive). - lengths - Sequence of lengths. - - Returns - ------- - Sequence[bytes] - The requested byte ranges. - """ - return self._run_sync( - self.get_ranges_async(path, starts=starts, ends=ends, lengths=lengths) - ) - - # --- Head methods --- - - async def _do_head_async( - self, - session: aiohttp.ClientSession, - path: str, - ) -> ObjectMeta: - """Internal method that performs the actual HEAD request.""" - url = self._build_url(path) - request_headers = {} if self._session else dict(self.headers) - - async with session.head( - url, allow_redirects=True, headers=request_headers - ) as response: - response.raise_for_status() - return self._parse_meta_from_headers(path, dict(response.headers)) - - async def head_async(self, path: str) -> ObjectMeta: - """ - Get file metadata asynchronously via HEAD request. - - Parameters - ---------- - path - Path to the file relative to base_url. - - Returns - ------- - ObjectMeta - File metadata including size, last_modified, e_tag, etc. - """ - if self._session is not None: - return await self._do_head_async(self._session, path) - - # Fallback: create a temporary session for this request - async with aiohttp.ClientSession( - timeout=self.timeout, headers=self.headers - ) as session: - return await self._do_head_async(session, path) - - def head(self, path: str) -> ObjectMeta: - """ - Get file metadata synchronously via HEAD request. - - This wraps the async implementation for convenience. - - Parameters - ---------- - path - Path to the file relative to base_url. - - Returns - ------- - ObjectMeta - File metadata including size, last_modified, e_tag, etc. - """ - return self._run_sync(self.head_async(path)) - - -__all__ = ["AiohttpStore", "AiohttpGetResult", "AiohttpGetResultAsync"] + # TODO: coalesce ranges + tasks = [ + self.get_range_async(path, start=s, end=e) + for s, e in zip(starts, ends, strict=False) + ] + return await asyncio.gather(*tasks) From cb7bbec24cb51b53a9464610693eb8394773c653 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 9 Feb 2026 16:23:33 -0500 Subject: [PATCH 3/3] clean up get_ranges --- src/obspec_utils/stores/_aiohttp.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/obspec_utils/stores/_aiohttp.py b/src/obspec_utils/stores/_aiohttp.py index be60bab..669dfe0 100644 --- a/src/obspec_utils/stores/_aiohttp.py +++ b/src/obspec_utils/stores/_aiohttp.py @@ -237,14 +237,20 @@ async def get_ranges_async( The requested byte ranges. """ - if ends is None and lengths is None: + if ends is not None and lengths is not None: + raise ValueError("Only one of 'ends' or 'lengths' should be provided") + + if ends is not None: + tasks = [ + self.get_range_async(path, start=start, end=end) + for start, end in zip(starts, ends, strict=True) + ] + elif lengths is not None: + tasks = [ + self.get_range_async(path, start=start, length=length) + for start, length in zip(starts, lengths, strict=True) + ] + else: raise ValueError("Either 'ends' or 'lengths' must be provided") - if ends is None: - ends = [s + ln for s, ln in zip(starts, lengths, strict=False)] # type: ignore[arg-type] - - # TODO: coalesce ranges - tasks = [ - self.get_range_async(path, start=s, end=e) - for s, e in zip(starts, ends, strict=False) - ] + return await asyncio.gather(*tasks)