diff --git a/docs/api/obspec.md b/docs/api/obspec.md
index cb37981..410706d 100644
--- a/docs/api/obspec.md
+++ b/docs/api/obspec.md
@@ -1,2 +1,3 @@
 ::: obspec_utils.obspec.BufferedStoreReader
 ::: obspec_utils.obspec.EagerStoreReader
+::: obspec_utils.obspec.ParallelStoreReader
diff --git a/docs/index.md b/docs/index.md
index 63e1f7a..619dad1 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -80,7 +80,7 @@ data = await store.get_range_async(path, start=0, end=1000)
 The file handlers provide file-like interfaces (read, seek, tell) for reading from object stores. They work with **any** ReadableStore implementation:
 
 ```python
-from obspec_utils.obspec import BufferedStoreReader, EagerStoreReader
+from obspec_utils.obspec import BufferedStoreReader, EagerStoreReader, ParallelStoreReader
 
 # Works with obstore
 from obstore.store import S3Store
@@ -99,6 +99,10 @@ reader.seek(0)  # Seek back to start
 # Eager reader loads entire file into memory
 eager_reader = EagerStoreReader(store, "file.bin")
 data = eager_reader.readall()
+
+# Parallel reader uses get_ranges() for efficient multi-chunk fetching with an LRU cache
+parallel_reader = ParallelStoreReader(store, "file.bin", chunk_size=256*1024)
+data = parallel_reader.read(1000)
 ```
 
 ## Contributing
diff --git a/src/obspec_utils/obspec.py b/src/obspec_utils/obspec.py
index 5b24377..4009f1d 100644
--- a/src/obspec_utils/obspec.py
+++ b/src/obspec_utils/obspec.py
@@ -1,8 +1,7 @@
 from __future__ import annotations
 
 import io
-
-
+from collections import OrderedDict
 from typing import Protocol, runtime_checkable
 
 from obspec import (
@@ -266,8 +265,201 @@ def tell(self) -> int:
         return self._buffer.tell()
 
 
+class ParallelStoreReader:
+    """
+    A file-like reader that uses parallel range requests for efficient chunk fetching.
+
+    This reader divides the file into fixed-size chunks and uses `get_ranges()` to
+    fetch multiple chunks in parallel. An LRU cache stores recently accessed chunks
+    to avoid redundant fetches.
+
+    This is particularly efficient for workloads that access multiple non-contiguous
+    regions of a file, such as reading Zarr/HDF5 datasets.
+
+    Works with any ReadableStore protocol implementation.
+    """
+
+    def __init__(
+        self,
+        store: ReadableStore,
+        path: str,
+        chunk_size: int = 256 * 1024,
+        max_cached_chunks: int = 64,
+    ) -> None:
+        """
+        Create a parallel reader with chunk-based caching.
+
+        Parameters
+        ----------
+        store
+            Any object implementing the [ReadableStore][obspec_utils.obspec.ReadableStore] protocol.
+        path
+            The path to the file within the store.
+        chunk_size
+            Size of each chunk in bytes. Smaller chunks mean more granular caching
+            but potentially more requests.
+        max_cached_chunks
+            Maximum number of chunks to keep in the LRU cache.
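+
+        Examples
+        --------
+        A minimal sketch using obstore's in-memory store (any
+        ReadableStore implementation works the same way):
+
+        >>> from obstore.store import MemoryStore
+        >>> store = MemoryStore()
+        >>> _ = store.put("file.bin", b"0123456789")
+        >>> reader = ParallelStoreReader(store, "file.bin", chunk_size=4)
+        >>> reader.read(6)  # spans chunks 0 and 1, fetched via get_ranges()
+        b'012345'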
+ """ + self._store = store + self._path = path + self._chunk_size = chunk_size + self._max_cached_chunks = max_cached_chunks + self._position = 0 + self._size: int | None = None + # LRU cache: OrderedDict with chunk_index -> bytes + self._cache: OrderedDict[int, bytes] = OrderedDict() + + def _get_size(self) -> int: + """Lazily fetch the file size via a get() call.""" + if self._size is None: + result = self._store.get(self._path) + self._size = result.meta["size"] + return self._size + + def _get_chunks(self, chunk_indices: list[int]) -> dict[int, bytes]: + """Fetch multiple chunks in parallel using get_ranges().""" + # Filter out already cached chunks + needed = [i for i in chunk_indices if i not in self._cache] + + if needed: + file_size = self._get_size() + starts = [] + lengths = [] + + for chunk_idx in needed: + start = chunk_idx * self._chunk_size + # Handle last chunk which may be smaller + end = min(start + self._chunk_size, file_size) + starts.append(start) + lengths.append(end - start) + + # Fetch all chunks in parallel + results = self._store.get_ranges(self._path, starts=starts, lengths=lengths) + + # Store in cache + for chunk_idx, data in zip(needed, results): + self._cache[chunk_idx] = bytes(data) + # Move to end (most recently used) + self._cache.move_to_end(chunk_idx) + + # Evict oldest if over capacity + while len(self._cache) > self._max_cached_chunks: + self._cache.popitem(last=False) + + # Return requested chunks from cache + return {i: self._cache[i] for i in chunk_indices} + + def read(self, size: int = -1, /) -> bytes: + """ + Read up to `size` bytes from the file. + + Parameters + ---------- + size + Number of bytes to read. If -1, read the entire file. + + Returns + ------- + bytes + The data read from the file. + """ + if size == -1: + return self.readall() + + file_size = self._get_size() + + # Clamp to remaining bytes + remaining = file_size - self._position + if size > remaining: + size = remaining + if size <= 0: + return b"" + + # Determine which chunks we need + start_chunk = self._position // self._chunk_size + end_pos = self._position + size + end_chunk = (end_pos - 1) // self._chunk_size + + chunk_indices = list(range(start_chunk, end_chunk + 1)) + chunks = self._get_chunks(chunk_indices) + + # Assemble the result + result = io.BytesIO() + for chunk_idx in chunk_indices: + chunk_data = chunks[chunk_idx] + chunk_start = chunk_idx * self._chunk_size + + # Calculate slice within this chunk + local_start = max(0, self._position - chunk_start) + local_end = min(len(chunk_data), end_pos - chunk_start) + + result.write(chunk_data[local_start:local_end]) + + data = result.getvalue() + self._position += len(data) + return data + + def readall(self) -> bytes: + """ + Read the entire file. + + Returns + ------- + bytes + The complete file contents. + """ + result = self._store.get(self._path) + data = bytes(result.buffer()) + self._size = len(data) + self._position = len(data) + return data + + def seek(self, offset: int, whence: int = 0, /) -> int: + """ + Move the file position. + + Parameters + ---------- + offset + Position offset. + whence + Reference point: 0=start (SEEK_SET), 1=current (SEEK_CUR), 2=end (SEEK_END). + + Returns + ------- + int + The new absolute position. 
+ """ + if whence == 0: # SEEK_SET + self._position = offset + elif whence == 1: # SEEK_CUR + self._position += offset + elif whence == 2: # SEEK_END + self._position = self._get_size() + offset + else: + raise ValueError(f"Invalid whence value: {whence}") + + if self._position < 0: + self._position = 0 + + return self._position + + def tell(self) -> int: + """ + Return the current file position. + + Returns + ------- + int + Current position in bytes from start of file. + """ + return self._position + + __all__: list[str] = [ "ReadableStore", "BufferedStoreReader", "EagerStoreReader", + "ParallelStoreReader", ] diff --git a/tests/test_registry.py b/tests/test_registry.py index f4ff5ca..bf7cf9b 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -2,7 +2,14 @@ from obstore.store import MemoryStore from obspec_utils.registry import ObjectStoreRegistry -from obspec_utils.obspec import ReadableStore +from obspec_utils.obspec import ( + ReadableStore, + BufferedStoreReader, + EagerStoreReader, + ParallelStoreReader, +) + +ALL_READERS = [BufferedStoreReader, EagerStoreReader, ParallelStoreReader] def test_registry(): @@ -168,88 +175,133 @@ async def test_registry_with_async_operations(): assert bytes(result) == b"async" -class TestBufferedStoreReader: - """Tests for the BufferedStoreReader class.""" +@pytest.mark.parametrize("ReaderClass", ALL_READERS) +def test_reader_basic_operations(ReaderClass): + """Test basic read, seek, tell operations for all readers.""" + memstore = MemoryStore() + memstore.put("test.txt", b"hello world from store reader") + + reader = ReaderClass(memstore, "test.txt") + + # Test read + assert reader.read(5) == b"hello" + assert reader.tell() == 5 + + # Test seek and read + reader.seek(6) + assert reader.read(5) == b"world" - def test_buffered_store_reader_with_obstore(self): - """Test BufferedStoreReader with an obstore MemoryStore.""" - from obspec_utils.obspec import BufferedStoreReader + # Test seek from current (SEEK_CUR) + reader.seek(-5, 1) + assert reader.read(5) == b"world" - memstore = MemoryStore() - memstore.put("test.txt", b"hello world from store reader") + # Test readall + reader.seek(0) + assert reader.readall() == b"hello world from store reader" - reader = BufferedStoreReader(memstore, "test.txt", buffer_size=10) - # Test read - assert reader.read(5) == b"hello" - assert reader.tell() == 5 +@pytest.mark.parametrize("ReaderClass", ALL_READERS) +def test_reader_seek_end(ReaderClass): + """Test SEEK_END functionality for all readers.""" + memstore = MemoryStore() + memstore.put("test.txt", b"0123456789") - # Test seek and read - reader.seek(6) - assert reader.read(5) == b"world" + reader = ReaderClass(memstore, "test.txt") - # Test seek from current - reader.seek(-5, 1) # SEEK_CUR - assert reader.read(5) == b"world" + # Seek to 2 bytes before end + reader.seek(-2, 2) # SEEK_END + assert reader.read(2) == b"89" - # Test readall - reader.seek(0) - assert reader.readall() == b"hello world from store reader" - def test_buffered_store_reader_seek_end(self): - """Test SEEK_END functionality.""" - from obspec_utils.obspec import BufferedStoreReader +@pytest.mark.parametrize("ReaderClass", ALL_READERS) +def test_reader_all_seek_modes(ReaderClass): + """Test all seek modes for all readers.""" + memstore = MemoryStore() + memstore.put("test.txt", b"0123456789ABCDEF") - memstore = MemoryStore() - memstore.put("test.txt", b"0123456789") + reader = ReaderClass(memstore, "test.txt") - reader = BufferedStoreReader(memstore, "test.txt") + # SEEK_SET + 
+    reader.seek(5)
+    assert reader.tell() == 5
+    assert reader.read(3) == b"567"
 
-        # Seek to 2 bytes before end
-        reader.seek(-2, 2)  # SEEK_END
-        assert reader.read(2) == b"89"
+    # SEEK_CUR
+    reader.seek(-3, 1)
+    assert reader.tell() == 5
+    assert reader.read(3) == b"567"
 
-    def test_buffered_store_reader_buffering(self):
-        """Test that buffering works correctly."""
-        from obspec_utils.obspec import BufferedStoreReader
+    # SEEK_END
+    reader.seek(-4, 2)
+    assert reader.read(4) == b"CDEF"
 
-        memstore = MemoryStore()
-        memstore.put("test.txt", b"0123456789ABCDEF")
 
-        # Small buffer size to test buffering behavior
-        reader = BufferedStoreReader(memstore, "test.txt", buffer_size=8)
+@pytest.mark.parametrize("ReaderClass", ALL_READERS)
+def test_reader_read_past_end(ReaderClass):
+    """Test reading past end of file for all readers."""
+    memstore = MemoryStore()
+    memstore.put("test.txt", b"short")
 
-        # First read should fetch buffer_size bytes
-        assert reader.read(2) == b"01"
-        # Second read should come from buffer
-        assert reader.read(2) == b"23"
+    reader = ReaderClass(memstore, "test.txt")
 
+    # All readers clamp the request to the file size, so reading past
+    # EOF returns only the bytes that are available
+    data = reader.read(100)
+    assert data == b"short"
 
-class TestEagerStoreReader:
-    """Tests for the EagerStoreReader class."""
 
-    def test_eager_store_reader(self):
-        """Test EagerStoreReader with an obstore MemoryStore."""
-        from obspec_utils.obspec import EagerStoreReader
+def test_buffered_reader_buffering():
+    """Test that BufferedStoreReader buffering works correctly."""
+    memstore = MemoryStore()
+    memstore.put("test.txt", b"0123456789ABCDEF")
+
+    # Small buffer size to test buffering behavior
+    reader = BufferedStoreReader(memstore, "test.txt", buffer_size=8)
 
-        memstore = MemoryStore()
-        memstore.put("test.txt", b"cached content here")
+    # First read should fetch buffer_size bytes
+    assert reader.read(2) == b"01"
+    # Second read should come from buffer
+    assert reader.read(2) == b"23"
+
+
+def test_parallel_reader_cross_chunk_read():
+    """Test ParallelStoreReader reading across chunk boundaries."""
+    memstore = MemoryStore()
+    memstore.put("test.txt", b"0123456789ABCDEF")
+
+    # Small chunk size to test cross-chunk reads
+    reader = ParallelStoreReader(memstore, "test.txt", chunk_size=4)
+
+    # Read across chunk boundary (chunks are 0-3, 4-7, 8-11, 12-15)
+    reader.seek(2)
+    assert reader.read(6) == b"234567"  # Spans chunks 0 and 1
+
+    # Read spanning multiple chunks
+    reader.seek(0)
+    assert reader.read(10) == b"0123456789"  # Spans chunks 0, 1, and 2
+
+
+def test_parallel_reader_caching():
+    """Test that ParallelStoreReader chunks are cached correctly."""
+    memstore = MemoryStore()
+    memstore.put("test.txt", b"0123456789ABCDEF")
 
-        reader = EagerStoreReader(memstore, "test.txt")
+    reader = ParallelStoreReader(
+        memstore, "test.txt", chunk_size=4, max_cached_chunks=2
+    )
 
-        # Test read
-        assert reader.read(6) == b"cached"
+    # Read first chunk
+    reader.seek(0)
+    assert reader.read(4) == b"0123"
 
-        # Test seek and read
-        reader.seek(7)
-        assert reader.read(7) == b"content"
+    # Read second chunk
+    reader.seek(4)
+    assert reader.read(4) == b"4567"
 
-        # Test readall preserves position
-        pos_before = reader.tell()
-        data = reader.readall()
-        assert data == b"cached content here"
-        assert reader.tell() == pos_before
+    # Read third chunk - should evict first chunk from cache
+    reader.seek(8)
+    assert reader.read(4) == b"89AB"
 
-        # Test seek to start
-        reader.seek(0)
-        assert reader.read() == b"cached content here"
+    # Reading first chunk again should still work (refetched)
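+    # (with max_cached_chunks=2, loading chunk 2 evicted chunk 0, so this is a refetch)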
+    reader.seek(0)
+    assert reader.read(4) == b"0123"
diff --git a/tests/test_xarray.py b/tests/test_xarray.py
index 45e5b47..6ef7e09 100644
--- a/tests/test_xarray.py
+++ b/tests/test_xarray.py
@@ -1,40 +1,41 @@
+import pytest
 import xarray as xr
-from obspec_utils.obspec import BufferedStoreReader, EagerStoreReader
+from obspec_utils.obspec import (
+    BufferedStoreReader,
+    EagerStoreReader,
+    ParallelStoreReader,
+)
 from obstore.store import LocalStore
 
 
-def test_local_reader(local_netcdf4_file) -> None:
-    ds_fsspec = xr.open_dataset(local_netcdf4_file, engine="h5netcdf")
-    reader = BufferedStoreReader(store=LocalStore(), path=local_netcdf4_file)
-    ds_obstore = xr.open_dataset(reader, engine="h5netcdf")
-    xr.testing.assert_allclose(ds_fsspec, ds_obstore)
+ALL_READERS = [BufferedStoreReader, EagerStoreReader, ParallelStoreReader]
 
 
-def test_eager_reader(local_netcdf4_file) -> None:
-    """Test that EagerStoreReader works with xarray."""
+@pytest.mark.parametrize("ReaderClass", ALL_READERS)
+def test_reader_with_xarray(local_netcdf4_file, ReaderClass) -> None:
+    """Test that all readers work with xarray."""
     ds_fsspec = xr.open_dataset(local_netcdf4_file, engine="h5netcdf")
-    reader = EagerStoreReader(store=LocalStore(), path=local_netcdf4_file)
+    reader = ReaderClass(store=LocalStore(), path=local_netcdf4_file)
     ds_obstore = xr.open_dataset(reader, engine="h5netcdf")
     xr.testing.assert_allclose(ds_fsspec, ds_obstore)
 
 
-def test_eager_reader_interface(local_netcdf4_file) -> None:
-    """Test that EagerStoreReader implements the same interface as BufferedStoreReader."""
+@pytest.mark.parametrize("ReaderClass", [EagerStoreReader, ParallelStoreReader])
+def test_reader_interface_matches_buffered(local_netcdf4_file, ReaderClass) -> None:
+    """Test that readers implement the same interface as BufferedStoreReader."""
     store = LocalStore()
     buffered_reader = BufferedStoreReader(store=store, path=local_netcdf4_file)
-    eager_reader = EagerStoreReader(store=store, path=local_netcdf4_file)
+    other_reader = ReaderClass(store=store, path=local_netcdf4_file)
 
-    # Test readall
-    data_buffered = buffered_reader.readall()
-    data_eager = eager_reader.readall()
-    assert data_buffered == data_eager
-    assert isinstance(data_eager, bytes)
+    data = other_reader.readall()
+    assert buffered_reader.readall() == data
+    assert isinstance(data, bytes)
 
 
-def test_eager_reader_multiple_reads(local_netcdf4_file) -> None:
-    """Test that EagerStoreReader can perform multiple reads."""
+@pytest.mark.parametrize("ReaderClass", ALL_READERS)
+def test_reader_multiple_reads(local_netcdf4_file, ReaderClass) -> None:
+    """Test that readers can perform multiple reads with seek/tell."""
     store = LocalStore()
-    reader = EagerStoreReader(store=store, path=local_netcdf4_file)
+    reader = ReaderClass(store=store, path=local_netcdf4_file)
 
     # Read the first 100 bytes
     chunk1 = reader.read(100)