1 change: 1 addition & 0 deletions docs/api/obspec.md
@@ -1,2 +1,3 @@
::: obspec_utils.obspec.BufferedStoreReader
::: obspec_utils.obspec.EagerStoreReader
::: obspec_utils.obspec.ParallelStoreReader
6 changes: 5 additions & 1 deletion docs/index.md
@@ -80,7 +80,7 @@ data = await store.get_range_async(path, start=0, end=1000)
The file handlers provide file-like interfaces (read, seek, tell) for reading from object stores. They work with **any** ReadableStore implementation:

```python
from obspec_utils.obspec import BufferedStoreReader, EagerStoreReader
from obspec_utils.obspec import BufferedStoreReader, EagerStoreReader, ParallelStoreReader

# Works with obstore
from obstore.store import S3Store
@@ -99,6 +99,10 @@ reader.seek(0) # Seek back to start
# Eager reader loads entire file into memory
eager_reader = EagerStoreReader(store, "file.bin")
data = eager_reader.readall()

# Parallel reader uses get_ranges() for efficient multi-chunk fetching with an LRU cache
parallel_reader = ParallelStoreReader(store, "file.bin", chunk_size=256*1024)
data = parallel_reader.read(1000)
```
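
Because `ParallelStoreReader` caches chunks, seeking back into a region that was
already fetched is served from memory instead of issuing new requests. A minimal
sketch, reusing the `store` from above and assuming `file.bin` is large enough
for the offsets shown:

```python
# A read() spanning several chunks triggers a single parallel get_ranges() call
parallel_reader = ParallelStoreReader(store, "file.bin", chunk_size=64 * 1024)
parallel_reader.seek(1_000_000)
middle = parallel_reader.read(200_000)  # chunks 15-18 fetched in parallel

parallel_reader.seek(1_000_000)         # seek back into a cached region
again = parallel_reader.read(200_000)   # served from the LRU cache
```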

## Contributing
196 changes: 194 additions & 2 deletions src/obspec_utils/obspec.py
@@ -1,8 +1,7 @@
from __future__ import annotations

import io


from collections import OrderedDict
from typing import Protocol, runtime_checkable

from obspec import (
@@ -266,8 +265,201 @@ def tell(self) -> int:
return self._buffer.tell()


class ParallelStoreReader:
"""
A file-like reader that uses parallel range requests for efficient chunk fetching.

This reader divides the file into fixed-size chunks and uses `get_ranges()` to
fetch multiple chunks in parallel. An LRU cache stores recently accessed chunks
to avoid redundant fetches.

This is particularly efficient for workloads that access multiple non-contiguous
regions of a file, such as reading Zarr/HDF5 datasets.

Works with any ReadableStore protocol implementation.
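
    Examples
    --------
    A sketch (illustrative), assuming ``store`` is any ReadableStore implementation:

    >>> reader = ParallelStoreReader(store, "data.bin", chunk_size=64 * 1024)
    >>> reader.seek(1_000_000)
    1000000
    >>> data = reader.read(200_000)  # chunks 15-18 arrive via one get_ranges() call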
"""

def __init__(
self,
store: ReadableStore,
path: str,
chunk_size: int = 256 * 1024,
max_cached_chunks: int = 64,
) -> None:
"""
Create a parallel reader with chunk-based caching.

Parameters
----------
store
Any object implementing the [ReadableStore][obspec_utils.obspec.ReadableStore] protocol.
path
The path to the file within the store.
chunk_size
Size of each chunk in bytes. Smaller chunks mean more granular caching
but potentially more requests.
max_cached_chunks
Maximum number of chunks to keep in the LRU cache.
"""
self._store = store
self._path = path
self._chunk_size = chunk_size
self._max_cached_chunks = max_cached_chunks
self._position = 0
self._size: int | None = None
# LRU cache: OrderedDict with chunk_index -> bytes
self._cache: OrderedDict[int, bytes] = OrderedDict()

def _get_size(self) -> int:
"""Lazily fetch the file size via a get() call."""
if self._size is None:
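            # NOTE: assumes get() exposes metadata without materializing the
            # body; a dedicated metadata/head call, where the store offers
            # one, would be a cheaper way to learn the size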
result = self._store.get(self._path)
self._size = result.meta["size"]
return self._size

def _get_chunks(self, chunk_indices: list[int]) -> dict[int, bytes]:
"""Fetch multiple chunks in parallel using get_ranges()."""
# Filter out already cached chunks
needed = [i for i in chunk_indices if i not in self._cache]

if needed:
file_size = self._get_size()
starts = []
lengths = []

for chunk_idx in needed:
start = chunk_idx * self._chunk_size
# Handle last chunk which may be smaller
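                # (illustrative: with file_size=1_000_000 and chunk_size=262_144,
                # chunk 3 starts at 786_432 and is clamped to 213_568 bytes)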
end = min(start + self._chunk_size, file_size)
starts.append(start)
lengths.append(end - start)

# Fetch all chunks in parallel
results = self._store.get_ranges(self._path, starts=starts, lengths=lengths)

            # Store in cache (insertion places each chunk at the MRU end)
            for chunk_idx, data in zip(needed, results):
                self._cache[chunk_idx] = bytes(data)
                self._cache.move_to_end(chunk_idx)

        # Refresh recency for cache hits and capture the requested chunks
        # *before* evicting, so one read spanning more than max_cached_chunks
        # chunks cannot evict its own data
        for i in chunk_indices:
            self._cache.move_to_end(i)
        out = {i: self._cache[i] for i in chunk_indices}

        # Evict least recently used chunks if over capacity
        while len(self._cache) > self._max_cached_chunks:
            self._cache.popitem(last=False)

        return out

def read(self, size: int = -1, /) -> bytes:
"""
Read up to `size` bytes from the file.

Parameters
----------
size
            Number of bytes to read. If negative, read from the current
            position to the end of the file.

Returns
-------
bytes
The data read from the file.
"""
        if size < 0:
            start = self._position  # readall() leaves the position at EOF
            return self.readall()[start:]

file_size = self._get_size()

# Clamp to remaining bytes
remaining = file_size - self._position
if size > remaining:
size = remaining
if size <= 0:
return b""

# Determine which chunks we need
start_chunk = self._position // self._chunk_size
end_pos = self._position + size
end_chunk = (end_pos - 1) // self._chunk_size
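        # Worked example (illustrative): position=500_000, size=100_000,
        # chunk_size=262_144 -> start_chunk=1, end_chunk=(600_000 - 1)//262_144 = 2,
        # so chunks 1..2 cover the request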

chunk_indices = list(range(start_chunk, end_chunk + 1))
chunks = self._get_chunks(chunk_indices)

# Assemble the result
result = io.BytesIO()
for chunk_idx in chunk_indices:
chunk_data = chunks[chunk_idx]
chunk_start = chunk_idx * self._chunk_size

# Calculate slice within this chunk
local_start = max(0, self._position - chunk_start)
local_end = min(len(chunk_data), end_pos - chunk_start)

result.write(chunk_data[local_start:local_end])

data = result.getvalue()
self._position += len(data)
return data

def readall(self) -> bytes:
"""
Read the entire file.

Returns
-------
bytes
The complete file contents.
"""
result = self._store.get(self._path)
data = bytes(result.buffer())
self._size = len(data)
self._position = len(data)
return data

def seek(self, offset: int, whence: int = 0, /) -> int:
"""
Move the file position.

Parameters
----------
offset
Position offset.
whence
Reference point: 0=start (SEEK_SET), 1=current (SEEK_CUR), 2=end (SEEK_END).

Returns
-------
int
The new absolute position.
"""
if whence == 0: # SEEK_SET
self._position = offset
elif whence == 1: # SEEK_CUR
self._position += offset
elif whence == 2: # SEEK_END
self._position = self._get_size() + offset
else:
raise ValueError(f"Invalid whence value: {whence}")

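        # Note: a negative resulting position is clamped to 0 below, where
        # stricter file objects would raise instead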
if self._position < 0:
self._position = 0

return self._position

def tell(self) -> int:
"""
Return the current file position.

Returns
-------
int
Current position in bytes from start of file.
"""
return self._position


__all__: list[str] = [
"ReadableStore",
"BufferedStoreReader",
"EagerStoreReader",
"ParallelStoreReader",
]