diff --git a/docs/api/aiohttp.md b/docs/api/aiohttp.md index 9631889..f613926 100644 --- a/docs/api/aiohttp.md +++ b/docs/api/aiohttp.md @@ -1 +1,5 @@ -::: obspec_utils.aiohttp.AiohttpStore +::: obspec_utils.aiohttp + options: + members: true + filters: + - "!^_" # Exclude private members diff --git a/docs/api/obspec.md b/docs/api/obspec.md index 5ae5359..387764c 100644 --- a/docs/api/obspec.md +++ b/docs/api/obspec.md @@ -1,2 +1,5 @@ -::: obspec_utils.obspec.StoreReader -::: obspec_utils.obspec.StoreMemCacheReader +::: obspec_utils.obspec + options: + filters: + - "!^_" # Exclude private members + - "!ReadableStore*" diff --git a/docs/api/obstore.md b/docs/api/obstore.md index d31868e..7f3c235 100644 --- a/docs/api/obstore.md +++ b/docs/api/obstore.md @@ -1,2 +1,5 @@ -::: obspec_utils.obstore.ObstoreReader -::: obspec_utils.obstore.ObstoreMemCacheReader +::: obspec_utils.obstore + options: + members: true + filters: + - "!^_" # Exclude private members \ No newline at end of file diff --git a/docs/api/registry.md b/docs/api/registry.md index 4812ac0..6dfea15 100644 --- a/docs/api/registry.md +++ b/docs/api/registry.md @@ -1,2 +1,5 @@ -::: obspec_utils.registry.ObjectStoreRegistry -::: obspec_utils.registry.UrlKey +::: obspec_utils.registry + options: + members: true + filters: + - "!^_" # Exclude private members \ No newline at end of file diff --git a/docs/api/typing.md b/docs/api/typing.md index f6ab911..889bbff 100644 --- a/docs/api/typing.md +++ b/docs/api/typing.md @@ -1,4 +1,7 @@ ::: obspec_utils.obspec.ReadableStore -::: obspec_utils.typing.Url -::: obspec_utils.typing.Path +::: obspec_utils.typing + options: + members: true + filters: + - "!^_" # Exclude private members \ No newline at end of file diff --git a/docs/benchmark.md b/docs/benchmark.md new file mode 100644 index 0000000..92b1fb1 --- /dev/null +++ b/docs/benchmark.md @@ -0,0 +1,180 @@ +# Benchmarking + +`obspec-utils` includes a benchmark script for comparing the performance of different approaches to reading cloud-hosted data. + +## Benchmark Script + +The benchmark script compares fsspec, obstore readers, and VirtualiZarr + Icechunk approaches for reading NetCDF files from S3. + +??? 
note "View full script" + + ```python + --8<-- "scripts/benchmark_readers.py" + ``` + +## Running the Benchmark + +```bash +# Full benchmark with default settings +uv run scripts/benchmark_readers.py + +# Quick test with fewer files +uv run scripts/benchmark_readers.py --n-files 2 + +# Skip specific benchmarks +uv run scripts/benchmark_readers.py --skip fsspec_default obstore_eager + +# Label results for a specific environment +uv run scripts/benchmark_readers.py --environment cloud --description "AWS us-west-2" +``` + +## Benchmark Results + +```python exec="on" +import json +from pathlib import Path + +results_file = Path("scripts/benchmark_timings.json") + +if results_file.exists(): + with open(results_file) as f: + all_results = json.load(f) + + for env_name, env_data in all_results.items(): + print(f"### {env_data.get('description', env_name)}") + print() + print(f"- **Environment**: {env_data.get('environment', 'unknown')}") + print(f"- **Files tested**: {env_data.get('n_files', 'N/A')}") + print(f"- **Timestamp**: {env_data.get('timestamp', 'N/A')}") + print() + + timings = env_data.get("timings", {}) + if timings: + # Sort by total time + sorted_methods = sorted(timings.items(), key=lambda x: x[1].get("total", float("inf"))) + fastest_total = sorted_methods[0][1].get("total", 1) if sorted_methods else 1 + + print("| Method | Open | Spatial | Time Slice | Timeseries | **Total** |") + print("|--------|-----:|--------:|-----------:|-----------:|----------:|") + + for method, times in sorted_methods: + total = times.get("total", 0) + speedup = total / fastest_total if fastest_total > 0 else 1 + speedup_str = " ⚡" if speedup <= 1.01 else f" ({speedup:.1f}x)" + + print( + f"| {method} | " + f"{times.get('open', 0):.2f}s | " + f"{times.get('spatial_subset_load', 0):.2f}s | " + f"{times.get('time_slice_load', 0):.2f}s | " + f"{times.get('timeseries_load', 0):.2f}s | " + f"**{total:.2f}s**{speedup_str} |" + ) + + print() + print("*All times in seconds. Lower is better.*") + print() +else: + print("*No benchmark results available. Run the benchmark script to generate results.*") +``` + + +## Command Line Options + +| Option | Default | Description | +|--------|---------|-------------| +| `--environment` | `local` | Label for this run (`local` or `cloud`) | +| `--description` | auto | Description for this run | +| `--n-files` | 5 | Number of files to test with | +| `--output` | `benchmark_timings.json` | Output JSON file | +| `--skip` | `fsspec_default` | Benchmarks to skip | + +## Benchmarked Methods + +| Method | Description | +|--------|-------------| +| `fsspec_default_cache` | fsspec with default caching strategy | +| `fsspec_block_cache` | fsspec with 8MB block cache | +| `obstore_reader` | Basic `ObstoreReader` with buffered reads | +| `obstore_eager` | `ObstoreEagerReader` - loads entire file into memory | +| `obstore_prefetch` | `ObstorePrefetchReader` - background prefetching | +| `obstore_parallel` | `ObstoreParallelReader` - parallel range fetching | +| `obstore_hybrid` | `ObstoreHybridReader` - exponential readahead + parallel fetching | +| `virtualzarr_icechunk` | VirtualiZarr + Icechunk for virtual Zarr stores | + + +## File Handlers Comparison + +### ObstoreReader + +The basic reader with configurable buffer size. Best for simple sequential reads. + +```python +from obspec_utils import ObstoreReader + +reader = ObstoreReader(store, path, buffer_size=1024*1024) +``` + +### ObstoreEagerReader + +Loads the entire file into memory before reading. 
Best when files will be read multiple times and are small enough to fit in memory. + +```python +from obspec_utils import ObstoreEagerReader + +reader = ObstoreEagerReader(store, path) +``` + +### ObstorePrefetchReader + +Prefetches upcoming byte ranges in background threads. Best for sequential read patterns. + +```python +from obspec_utils import ObstorePrefetchReader + +reader = ObstorePrefetchReader( + store, path, + prefetch_size=4*1024*1024, # 4 MB ahead + chunk_size=1024*1024, # 1 MB chunks + max_workers=2, +) +``` + +### ObstoreParallelReader + +Fetches multiple byte ranges in parallel using `get_ranges`. Best for random access patterns. + +```python +from obspec_utils import ObstoreParallelReader + +reader = ObstoreParallelReader( + store, path, + chunk_size=1024*1024, # 1 MB chunks + batch_size=16, # Up to 16 parallel fetches +) +``` + +### ObstoreHybridReader + +Combines exponential readahead (for metadata) with parallel chunk fetching (for data). Best for HDF5/NetCDF files. + +```python +from obspec_utils import ObstoreHybridReader + +reader = ObstoreHybridReader( + store, path, + initial_readahead=32*1024, # Start with 32 KB + readahead_multiplier=2.0, # Double each time + chunk_size=1024*1024, # 1 MB chunks for data +) +``` + +## Choosing the Right Reader + +| Use Case | Recommended Reader | +|----------|-------------------| +| Small files, repeated access | `ObstoreEagerReader` | +| Sequential reads, streaming | `ObstorePrefetchReader` | +| Random access, array chunks | `ObstoreParallelReader` | +| HDF5/NetCDF files | `ObstoreHybridReader` | +| Simple, one-time reads | `ObstoreReader` | diff --git a/docs/index.md b/docs/index.md index 4fd73bf..1558b0e 100644 --- a/docs/index.md +++ b/docs/index.md @@ -10,7 +10,12 @@ Utilities for interacting with object storage, based on [obspec](https://github. 2. **ReadableStore Protocol**: A minimal protocol defining the read-only interface required for object storage access. This allows alternative backends (like aiohttp) to be used instead of obstore. -3. **File Handlers**: Wrappers around obstore's file reading capabilities that provide a familiar file-like interface. +3. 
**File Handlers**: Wrappers around obstore's file reading capabilities that provide a familiar file-like interface, making it easy to integrate with libraries that expect standard Python file objects: + - `ObstoreReader`: Basic reader with buffered reads + - `ObstoreEagerReader`: Eagerly loads entire file into memory + - `ObstorePrefetchReader`: Background prefetching for sequential reads + - `ObstoreParallelReader`: Parallel range fetching for random access + - `ObstoreHybridReader`: Combines exponential readahead with parallel fetching ## Design Philosophy @@ -110,8 +115,9 @@ data = cached_reader.readall() For maximum performance with obstore, use the obstore-specific readers which leverage obstore's native `ReadableFile`: ```python +import xarray as xr from obstore.store import S3Store -from obspec_utils.obstore import ObstoreReader, ObstoreMemCacheReader +from obspec_utils.obstore import ObstoreReader, ObstoreMemCacheReader, ObstoreEagerReader, ObstoreHybridReader store = S3Store(bucket="my-bucket") @@ -119,11 +125,17 @@ store = S3Store(bucket="my-bucket") reader = ObstoreReader(store, "path/to/file.bin", buffer_size=1024*1024) data = reader.read(100) -# Uses obstore's MemoryStore for caching -cached_reader = ObstoreMemCacheReader(store, "path/to/file.bin") -data = cached_reader.readall() +# Memory-cached reader for repeated access +cached_reader = ObstoreEagerReader(store, "path/to/file.bin") +data = cached_reader.readall() # Read entire file from memory cache + +# Hybrid reader for HDF5/NetCDF files (recommended for xarray) +with ObstoreHybridReader(store, "path/to/file.nc") as reader: + ds = xr.open_dataset(reader, engine="h5netcdf") ``` +See the [Benchmark](benchmark.md) page for performance comparisons between the different readers. + ## Contributing 1. Clone the repository: `git clone https://github.com/virtual-zarr/obspec-utils.git` diff --git a/mkdocs.yml b/mkdocs.yml index c7c60d1..d01cc18 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -14,6 +14,7 @@ extra: nav: - "index.md" + - "benchmark.md" - "API": - Typing: "api/typing.md" - Aiohttp Adapters: "api/aiohttp.md" diff --git a/scripts/benchmark_readers.py b/scripts/benchmark_readers.py new file mode 100644 index 0000000..c0a89df --- /dev/null +++ b/scripts/benchmark_readers.py @@ -0,0 +1,683 @@ +# /// script +# requires-python = ">=3.11" +# dependencies = [ +# "obspec-utils", +# "obstore", +# "fsspec", +# "s3fs", +# "xarray", +# "h5netcdf", +# "icechunk", +# "zarr>=3", +# "numpy", +# "dask", +# ] +# +# [tool.uv.sources] +# obspec-utils = { path = ".." } +# /// +""" +Benchmark script comparing different approaches for reading cloud-hosted data. 
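+
+Example invocations (mirroring the "Running the Benchmark" section of the docs):
+
+    uv run scripts/benchmark_readers.py                   # full benchmark, default settings
+    uv run scripts/benchmark_readers.py --n-files 2       # quick test with fewer files
+    uv run scripts/benchmark_readers.py --skip fsspec_default obstore_eager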
+""" + +from __future__ import annotations + +import argparse +import json +import time +import warnings +from collections.abc import Callable +from dataclasses import dataclass, field +from pathlib import Path +from time import perf_counter +from typing import Any + +import fsspec +import xarray as xr +import zarr + +warnings.filterwarnings( + "ignore", + message="Numcodecs codecs are not in the Zarr version 3 specification*", + category=UserWarning, +) + +# Configure zarr for better async performance +zarr.config.set({"threading.max_workers": 32, "async.concurrency": 128}) + + +# ============================================================================= +# Configuration +# ============================================================================= + +# S3 bucket and path for NLDAS3 data +BUCKET = "nasa-waterinsight" +PREFIX = "NLDAS3/forcing/daily" +REGION = "us-west-2" + +# Test parameters for consistent benchmarking +# Note: time selections use method="nearest" for robustness with varying file counts +SPATIAL_SUBSET_KWARGS = { + "lat": slice(10, 15), + "lon": slice(-60, -55), +} +TIME_SLICE_KWARGS = {"time": "2001-01-05", "method": "nearest"} +SPATIAL_POINT_KWARGS = {"lat": 45, "lon": -150, "method": "nearest"} + + +@dataclass +class TimingResult: + """Container for timing results.""" + + method: str + open_time: float = 0.0 + spatial_subset_time: float = 0.0 + time_slice_time: float = 0.0 + timeseries_time: float = 0.0 + + @property + def total_time(self) -> float: + return ( + self.open_time + + self.spatial_subset_time + + self.time_slice_time + + self.timeseries_time + ) + + def to_dict(self) -> dict[str, float]: + return { + "open": self.open_time, + "spatial_subset_load": self.spatial_subset_time, + "time_slice_load": self.time_slice_time, + "timeseries_load": self.timeseries_time, + "total": self.total_time, + } + + +@dataclass +class BenchmarkResults: + """Container for all benchmark results.""" + + environment: str + description: str + n_files: int + timestamp: str = field(default_factory=lambda: time.strftime("%Y-%m-%d %H:%M:%S")) + results: dict[str, TimingResult] = field(default_factory=dict) + + def add_result(self, result: TimingResult) -> None: + self.results[result.method] = result + + def to_dict(self) -> dict[str, Any]: + return { + "environment": self.environment, + "description": self.description, + "n_files": self.n_files, + "timestamp": self.timestamp, + "timings": { + method: result.to_dict() for method, result in self.results.items() + }, + } + + def save(self, filepath: Path) -> None: + """Save results to JSON file.""" + all_results = {} + if filepath.exists(): + with open(filepath) as f: + all_results = json.load(f) + + all_results[self.environment] = self.to_dict() + + with open(filepath, "w") as f: + json.dump(all_results, f, indent=2) + + print(f"\n{'='*60}") + print(f"Results saved to {filepath}") + print(f"{'='*60}") + + +def print_timing(category: str, method: str, elapsed: float) -> None: + """Print a timing result.""" + print(f" {category}: {elapsed:.2f}s") + + +def run_benchmarks(ds: xr.Dataset, n_files: int) -> tuple[float, float, float]: + """Run standard benchmark operations on a dataset.""" + # Spatial subset (single time step) + start = perf_counter() + _ = ds["Tair"].isel(time=0).sel(**SPATIAL_SUBSET_KWARGS).load() + spatial_time = perf_counter() - start + + # Time slice (all spatial data for one time) + start = perf_counter() + _ = ds["Tair"].sel(**TIME_SLICE_KWARGS).load() + time_slice_time = perf_counter() - start + + # Timeseries at a 
point + start = perf_counter() + _ = ds["Tair"].sel(**SPATIAL_POINT_KWARGS).isel(time=slice(0, n_files)).load() + timeseries_time = perf_counter() - start + + return spatial_time, time_slice_time, timeseries_time + + +def print_result_summary(result: TimingResult) -> None: + """Print a summary box for a timing result.""" + print(f"\n {'─'*50}") + print(f" {result.method}") + print(f" {'─'*50}") + print(f" Open: {result.open_time:>8.2f}s") + print(f" Spatial subset: {result.spatial_subset_time:>8.2f}s") + print(f" Time slice: {result.time_slice_time:>8.2f}s") + print(f" Timeseries: {result.timeseries_time:>8.2f}s") + print(f" {'─'*50}") + print(f" TOTAL: {result.total_time:>8.2f}s") + print(f" {'─'*50}") + + +# ============================================================================= +# Benchmark Methods +# ============================================================================= + + +def benchmark_fsspec_default(files: list[str], n_files: int) -> TimingResult: + """Benchmark fsspec with default cache settings.""" + print("\n[fsspec_default_cache] Testing fsspec + h5netcdf (default cache)...") + + result = TimingResult(method="fsspec_default_cache") + + start = perf_counter() + fs = fsspec.filesystem("s3", anon=True) + file_objs = [fs.open(f) for f in files[:n_files]] + ds = xr.open_mfdataset( + file_objs, engine="h5netcdf", combine="nested", concat_dim="time", parallel=True + ) + result.open_time = perf_counter() - start + + ( + result.spatial_subset_time, + result.time_slice_time, + result.timeseries_time, + ) = run_benchmarks(ds, n_files) + + # Cleanup + for f in file_objs: + f.close() + del fs, file_objs, ds + + print_result_summary(result) + return result + + +def benchmark_fsspec_block_cache(files: list[str], n_files: int) -> TimingResult: + """Benchmark fsspec with block cache settings.""" + print("\n[fsspec_block_cache] Testing fsspec + h5netcdf (block cache)...") + + result = TimingResult(method="fsspec_block_cache") + + fsspec_caching = { + "cache_type": "blockcache", + "block_size": 1024 * 1024 * 8, # 8 MB blocks + } + + start = perf_counter() + fs = fsspec.filesystem("s3", anon=True) + file_objs = [fs.open(f, **fsspec_caching) for f in files[:n_files]] + ds = xr.open_mfdataset( + file_objs, engine="h5netcdf", combine="nested", concat_dim="time", parallel=True + ) + result.open_time = perf_counter() - start + + ( + result.spatial_subset_time, + result.time_slice_time, + result.timeseries_time, + ) = run_benchmarks(ds, n_files) + + # Cleanup + for f in file_objs: + f.close() + del fs, file_objs, ds + + print_result_summary(result) + return result + + +def benchmark_obstore_reader(files: list[str], n_files: int) -> TimingResult: + """Benchmark obstore ObstoreReader.""" + from obstore.store import S3Store + + from obspec_utils import ObstoreReader + + print("\n[obstore_reader] Testing ObstoreReader...") + + result = TimingResult(method="obstore_reader") + + start = perf_counter() + store = S3Store(bucket=BUCKET, region=REGION, config={"skip_signature": "true"}) + readers = [ + ObstoreReader(store=store, path=f.replace(f"s3://{BUCKET}/", "")) + for f in files[:n_files] + ] + ds = xr.open_mfdataset( + readers, engine="h5netcdf", combine="nested", concat_dim="time", parallel=True + ) + result.open_time = perf_counter() - start + + ( + result.spatial_subset_time, + result.time_slice_time, + result.timeseries_time, + ) = run_benchmarks(ds, n_files) + + del store, readers, ds + + print_result_summary(result) + return result + + +def benchmark_obstore_eager(files: list[str], 
n_files: int) -> TimingResult: + """Benchmark obstore ObstoreEagerReader.""" + from obstore.store import S3Store + + from obspec_utils import ObstoreEagerReader + + print("\n[obstore_eager] Testing ObstoreEagerReader...") + + result = TimingResult(method="obstore_eager") + + start = perf_counter() + store = S3Store(bucket=BUCKET, region=REGION, config={"skip_signature": "true"}) + readers = [ + ObstoreEagerReader(store=store, path=f.replace(f"s3://{BUCKET}/", "")) + for f in files[:n_files] + ] + ds = xr.open_mfdataset( + readers, engine="h5netcdf", combine="nested", concat_dim="time", parallel=True + ) + result.open_time = perf_counter() - start + + ( + result.spatial_subset_time, + result.time_slice_time, + result.timeseries_time, + ) = run_benchmarks(ds, n_files) + + del store, readers, ds + + print_result_summary(result) + return result + + +def benchmark_obstore_prefetch(files: list[str], n_files: int) -> TimingResult: + """Benchmark obstore ObstorePrefetchReader.""" + from obstore.store import S3Store + + from obspec_utils import ObstorePrefetchReader + + print("\n[obstore_prefetch] Testing ObstorePrefetchReader...") + + result = TimingResult(method="obstore_prefetch") + + start = perf_counter() + store = S3Store(bucket=BUCKET, region=REGION, config={"skip_signature": "true"}) + readers = [ + ObstorePrefetchReader( + store=store, + path=f.replace(f"s3://{BUCKET}/", ""), + prefetch_size=4 * 1024 * 1024, # 4 MB prefetch + chunk_size=1024 * 1024, # 1 MB chunks + ) + for f in files[:n_files] + ] + ds = xr.open_mfdataset( + readers, engine="h5netcdf", combine="nested", concat_dim="time", parallel=True + ) + result.open_time = perf_counter() - start + + ( + result.spatial_subset_time, + result.time_slice_time, + result.timeseries_time, + ) = run_benchmarks(ds, n_files) + + # Cleanup prefetch readers + for r in readers: + r.close() + del store, readers, ds + + print_result_summary(result) + return result + + +def benchmark_obstore_parallel(files: list[str], n_files: int) -> TimingResult: + """Benchmark obstore ObstoreParallelReader.""" + from obstore.store import S3Store + + from obspec_utils import ObstoreParallelReader + + print("\n[obstore_parallel] Testing ObstoreParallelReader...") + + result = TimingResult(method="obstore_parallel") + + start = perf_counter() + store = S3Store(bucket=BUCKET, region=REGION, config={"skip_signature": "true"}) + readers = [ + ObstoreParallelReader( + store=store, + path=f.replace(f"s3://{BUCKET}/", ""), + chunk_size=1024 * 1024, # 1 MB chunks + batch_size=16, # Fetch up to 16 ranges in parallel + ) + for f in files[:n_files] + ] + ds = xr.open_mfdataset( + readers, engine="h5netcdf", combine="nested", concat_dim="time", parallel=True + ) + result.open_time = perf_counter() - start + + ( + result.spatial_subset_time, + result.time_slice_time, + result.timeseries_time, + ) = run_benchmarks(ds, n_files) + + # Cleanup parallel readers + for r in readers: + r.close() + del store, readers, ds + + print_result_summary(result) + return result + + +def benchmark_obstore_hybrid(files: list[str], n_files: int) -> TimingResult: + """Benchmark obstore ObstoreHybridReader.""" + from obstore.store import S3Store + + from obspec_utils import ObstoreHybridReader + + print("\n[obstore_hybrid] Testing ObstoreHybridReader...") + + result = TimingResult(method="obstore_hybrid") + + start = perf_counter() + store = S3Store(bucket=BUCKET, region=REGION, config={"skip_signature": "true"}) + readers = [ + ObstoreHybridReader( + store=store, + path=f.replace(f"s3://{BUCKET}/", 
""), + initial_readahead=32 * 1024, # 32 KB initial + readahead_multiplier=2.0, # Double each time + chunk_size=1024 * 1024, # 1 MB chunks for random access + batch_size=16, + ) + for f in files[:n_files] + ] + ds = xr.open_mfdataset( + readers, engine="h5netcdf", combine="nested", concat_dim="time", parallel=True + ) + result.open_time = perf_counter() - start + + ( + result.spatial_subset_time, + result.time_slice_time, + result.timeseries_time, + ) = run_benchmarks(ds, n_files) + + # Cleanup hybrid readers + for r in readers: + r.close() + del store, readers, ds + + print_result_summary(result) + return result + + +def benchmark_virtualzarr_icechunk(n_files: int) -> TimingResult: + """Benchmark VirtualiZarr + Icechunk approach.""" + import icechunk + + print("\n[virtualzarr_icechunk] Testing VirtualiZarr + Icechunk...") + + result = TimingResult(method="virtualzarr_icechunk") + + start = perf_counter() + storage = icechunk.s3_storage( + bucket=BUCKET, + prefix="virtual-zarr-store/NLDAS-3-icechunk", + region="us-west-2", + anonymous=True, + ) + + chunk_url = f"s3://{BUCKET}/{PREFIX}/" + virtual_credentials = icechunk.containers_credentials( + {chunk_url: icechunk.s3_anonymous_credentials()} + ) + + repo = icechunk.Repository.open( + storage=storage, + authorize_virtual_chunk_access=virtual_credentials, + ) + + session = repo.readonly_session("main") + ds = xr.open_zarr(session.store, consolidated=False, zarr_format=3, chunks={}) + result.open_time = perf_counter() - start + + ( + result.spatial_subset_time, + result.time_slice_time, + result.timeseries_time, + ) = run_benchmarks(ds, n_files) + + del session, repo, storage, ds + + print_result_summary(result) + return result + + +# ============================================================================= +# Results Display +# ============================================================================= + + +def print_comparison_table(results: BenchmarkResults) -> None: + """Print a comparison table of all results.""" + print("\n") + print("=" * 80) + print(" BENCHMARK COMPARISON") + print("=" * 80) + print(f" Environment: {results.environment}") + print(f" Description: {results.description}") + print(f" Files tested: {results.n_files}") + print(f" Timestamp: {results.timestamp}") + print("=" * 80) + + # Header + print( + f"\n{'Method':<25} {'Open':>10} {'Spatial':>10} {'Time':>10} {'Series':>10} {'TOTAL':>10}" + ) + print("-" * 80) + + # Sort by total time + sorted_results = sorted(results.results.values(), key=lambda r: r.total_time) + + fastest = sorted_results[0].total_time if sorted_results else 1 + + for result in sorted_results: + speedup = result.total_time / fastest if fastest > 0 else 1 + speedup_str = f"({speedup:.1f}x)" if speedup > 1.01 else "(fastest)" + + print( + f"{result.method:<25} " + f"{result.open_time:>10.2f} " + f"{result.spatial_subset_time:>10.2f} " + f"{result.time_slice_time:>10.2f} " + f"{result.timeseries_time:>10.2f} " + f"{result.total_time:>10.2f} {speedup_str}" + ) + + print("-" * 80) + print("\nAll times in seconds. 
Lower is better.") + + +# ============================================================================= +# Main +# ============================================================================= + + +def get_nldas_files() -> list[str]: + """Get list of NLDAS3 files from S3.""" + print("Discovering NLDAS3 files on S3...") + fs = fsspec.filesystem("s3", anon=True) + files = fs.glob(f"s3://{BUCKET}/{PREFIX}/**/*.nc") + files = sorted(["s3://" + f for f in files]) + print(f"Found {len(files)} files") + return files + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Benchmark cloud data reading approaches", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + parser.add_argument( + "--environment", + choices=["local", "cloud"], + default="local", + help="Environment label for this run (default: local)", + ) + parser.add_argument( + "--description", + default=None, + help="Description for this run (e.g., 'MacBook Pro - Durham')", + ) + parser.add_argument( + "--n-files", + type=int, + default=5, + help="Number of files to test with (default: 5)", + ) + parser.add_argument( + "--output", + type=Path, + default=Path("scripts/benchmark_timings.json"), + help="Output file for timing results (default: scripts/benchmark_timings.json)", + ) + parser.add_argument( + "--skip", + nargs="+", + default=[], + choices=[ + "fsspec_default", + "fsspec_block", + "obstore_reader", + "obstore_eager", + "obstore_prefetch", + "obstore_parallel", + "obstore_hybrid", + "virtualzarr", + ], + help="Skip specific benchmarks", + ) + + args = parser.parse_args() + + description = args.description or ( + "Cloud compute (us-west-2)" if args.environment == "cloud" else "Local machine" + ) + + print("=" * 60) + print(" Cloud Data Reading Benchmark") + print("=" * 60) + print(f" Environment: {args.environment}") + print(f" Description: {description}") + print(f" N files: {args.n_files}") + print("=" * 60) + + # Get file list + files = get_nldas_files() + + if len(files) < args.n_files: + print(f"Warning: Only {len(files)} files available, using all of them") + args.n_files = len(files) + + # Initialize results + results = BenchmarkResults( + environment=args.environment, + description=description, + n_files=args.n_files, + ) + + # Run benchmarks + benchmarks: list[tuple[str, Callable[[], TimingResult]]] = [] + + if "fsspec_default" not in args.skip: + benchmarks.append( + ("fsspec_default", lambda: benchmark_fsspec_default(files, args.n_files)) + ) + + if "fsspec_block" not in args.skip: + benchmarks.append( + ( + "fsspec_block", + lambda: benchmark_fsspec_block_cache(files, args.n_files), + ) + ) + + if "obstore_reader" not in args.skip: + benchmarks.append( + ("obstore_reader", lambda: benchmark_obstore_reader(files, args.n_files)) + ) + + if "obstore_eager" not in args.skip: + benchmarks.append( + ( + "obstore_eager", + lambda: benchmark_obstore_eager(files, args.n_files), + ) + ) + + if "obstore_prefetch" not in args.skip: + benchmarks.append( + ( + "obstore_prefetch", + lambda: benchmark_obstore_prefetch(files, args.n_files), + ) + ) + + if "obstore_parallel" not in args.skip: + benchmarks.append( + ( + "obstore_parallel", + lambda: benchmark_obstore_parallel(files, args.n_files), + ) + ) + + if "obstore_hybrid" not in args.skip: + benchmarks.append( + ( + "obstore_hybrid", + lambda: benchmark_obstore_hybrid(files, args.n_files), + ) + ) + + if "virtualzarr" not in args.skip: + benchmarks.append( + ("virtualzarr", lambda: benchmark_virtualzarr_icechunk(args.n_files)) + ) + 
+ for name, benchmark_fn in benchmarks: + try: + result = benchmark_fn() + results.add_result(result) + except Exception as e: + print(f"\n[{name}] FAILED: {e}") + + # Print comparison and save results + print_comparison_table(results) + results.save(args.output) + + +if __name__ == "__main__": + main() diff --git a/scripts/benchmark_timings.json b/scripts/benchmark_timings.json new file mode 100644 index 0000000..5ca0fa1 --- /dev/null +++ b/scripts/benchmark_timings.json @@ -0,0 +1,66 @@ +{ + "local": { + "environment": "local", + "description": "Local machine", + "n_files": 5, + "timestamp": "2025-12-19 17:41:11", + "timings": { + "fsspec_default_cache": { + "open": 278.9289269580004, + "spatial_subset_load": 0.01783616700049606, + "time_slice_load": 120.8567786249987, + "timeseries_load": 0.0270211669994751, + "total": 399.83056291699904 + }, + "fsspec_block_cache": { + "open": 41.593530750000355, + "spatial_subset_load": 0.013234292000561254, + "time_slice_load": 3.684706666001148, + "timeseries_load": 2.232457792000787, + "total": 47.52392950000285 + }, + "obstore_reader": { + "open": 73.77926016699712, + "spatial_subset_load": 0.6722390420000011, + "time_slice_load": 27.978767083001003, + "timeseries_load": 2.128492916999676, + "total": 104.5587592089978 + }, + "obstore_eager": { + "open": 137.93904283400116, + "spatial_subset_load": 0.025063291999686044, + "time_slice_load": 1.0478867909987457, + "timeseries_load": 0.02721483400091529, + "total": 139.0392077510005 + }, + "obstore_prefetch": { + "open": 22.70367158299996, + "spatial_subset_load": 0.2600817500024277, + "time_slice_load": 12.648685750002187, + "timeseries_load": 1.6537993339989043, + "total": 37.26623841700348 + }, + "obstore_parallel": { + "open": 14.851732625000295, + "spatial_subset_load": 0.42988983299801475, + "time_slice_load": 9.5801755829998, + "timeseries_load": 1.1358166659993003, + "total": 25.99761470699741 + }, + "obstore_hybrid": { + "open": 14.648422333997587, + "spatial_subset_load": 0.46534970799984876, + "time_slice_load": 9.89927154200268, + "timeseries_load": 1.2309044579997135, + "total": 26.24394804199983 + }, + "virtualzarr_icechunk": { + "open": 2.2793160830005945, + "spatial_subset_load": 2.765376625000499, + "time_slice_load": 4.176158291997126, + "timeseries_load": 0.6269212090010114, + "total": 9.847772208999231 + } + } + } +} \ No newline at end of file diff --git a/src/obspec_utils/obstore.py b/src/obspec_utils/obstore.py index 3cfda9a..bf983da 100644 --- a/src/obspec_utils/obstore.py +++ b/src/obspec_utils/obstore.py @@ -2,6 +2,20 @@ from typing import TYPE_CHECKING +import threading +from collections import OrderedDict +from concurrent.futures import ThreadPoolExecutor + +import obstore as obs + +from obstore.store import MemoryStore + + +if TYPE_CHECKING: + from obstore import ReadableFile + from obstore.store import ObjectStore + + try: import obstore as obs from obstore.store import MemoryStore @@ -11,38 +25,26 @@ ) from e -if TYPE_CHECKING: - from obstore import ReadableFile - from obstore.store import ObjectStore - - class ObstoreReader: - """ - A file-like reader using obstore's native ReadableFile. - - This class uses obstore's optimized `open_reader()` which provides efficient - buffered reading. It requires an actual [obstore.store.ObjectStore][] instance. - - For a generic reader that works with any ReadableStore, use - [StoreReader][obspec_utils.obspec.StoreReader] instead. 
- """ - _reader: ReadableFile def __init__( self, store: ObjectStore, path: str, buffer_size: int = 1024 * 1024 ) -> None: """ - Create an obstore file reader. + Create an obstore file reader that implements the read, readall, seek, and tell methods, which + can be used in libraries that expect file-like objects. + + This wrapper is necessary in order to return Python bytes types rather than obstore Bytes buffers. Parameters ---------- store - An obstore [ObjectStore][obstore.store.ObjectStore] instance. + [ObjectStore][obstore.store.ObjectStore] for reading the file. path - The path to the file within the store. + The path to the file within the store. This should not include the prefix. buffer_size - The minimum number of bytes to read in a single request. + The minimum number of bytes to read in a single request. Up to buffer_size bytes will be buffered in memory. """ self._reader = obs.open_reader(store, path, buffer_size=buffer_size) @@ -52,22 +54,21 @@ def read(self, size: int, /) -> bytes: def readall(self) -> bytes: return self._reader.read().to_bytes() - def seek(self, offset: int, whence: int = 0, /) -> int: + def seek(self, offset: int, whence: int = 0, /): + # TODO: Check on default for whence return self._reader.seek(offset, whence) def tell(self) -> int: return self._reader.tell() -class ObstoreMemCacheReader(ObstoreReader): +class ObstoreEagerReader(ObstoreReader): """ - A file-like reader using obstore's MemoryStore for caching. - - This class fetches the entire file into obstore's MemoryStore, then uses - obstore's native ReadableFile for efficient cached reads. + A file reader that eagerly loads the entire file into memory. - For a generic cached reader that works with any ReadableStore, use - [StoreMemCacheReader][obspec_utils.obspec.StoreMemCacheReader] instead. + This reader loads the complete file contents into a MemoryStore before + any reads occur. This is beneficial for files that will be read multiple + times or when you want to avoid repeated network requests. """ _reader: ReadableFile @@ -75,19 +76,873 @@ class ObstoreMemCacheReader(ObstoreReader): def __init__(self, store: ObjectStore, path: str) -> None: """ - Create an obstore memory-cached reader. + Create an obstore file reader that eagerly loads the file into memory. Parameters ---------- store - An obstore [ObjectStore][obstore.store.ObjectStore] instance. + [ObjectStore][obstore.store.ObjectStore] for reading the file. path - The path to the file within the store. + The path to the file within the store. This should not include the prefix. """ self._memstore = MemoryStore() buffer = store.get(path).bytes() self._memstore.put(path, buffer) + self._reader = obs.open_reader(self._memstore, path) -__all__ = ["ObstoreReader", "ObstoreMemCacheReader"] +class ObstorePrefetchReader: + """ + A file reader that prefetches upcoming byte ranges in the background. + + This reader anticipates sequential read patterns and fetches data ahead of + the current position using background threads. This can significantly reduce + latency for sequential reads from remote object stores. + + The prefetch buffer uses an LRU cache to manage memory, automatically evicting + older chunks when the cache reaches capacity. + """ + + def __init__( + self, + store: ObjectStore, + path: str, + *, + prefetch_size: int = 4 * 1024 * 1024, + chunk_size: int = 1024 * 1024, + max_workers: int = 2, + max_cached_chunks: int = 8, + ) -> None: + """ + Create an obstore file reader with background prefetching. 
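+
+        Each ``read`` schedules background fetches for the chunks covering up to
+        ``prefetch_size`` bytes past the current position; any chunk that has not
+        arrived by the time it is needed is fetched synchronously instead.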
+ + Parameters + ---------- + store + [ObjectStore][obstore.store.ObjectStore] for reading the file. + path + The path to the file within the store. This should not include the prefix. + prefetch_size + Total number of bytes to prefetch ahead of the current position. + Default is 4 MB. + chunk_size + Size of each prefetch chunk in bytes. Smaller chunks provide finer + granularity but more overhead. Default is 1 MB. + max_workers + Maximum number of concurrent prefetch threads. Default is 2. + max_cached_chunks + Maximum number of chunks to keep in the LRU cache. Oldest chunks + are evicted when this limit is exceeded. Default is 8. + """ + self._store = store + self._path = path + self._prefetch_size = prefetch_size + self._chunk_size = chunk_size + self._max_cached_chunks = max_cached_chunks + + # Get file size + meta = obs.head(store, path) + self._size: int = meta["size"] + + # Current position in the file + self._pos = 0 + + # LRU cache: chunk_index -> bytes + # Using OrderedDict for LRU behavior + self._cache: OrderedDict[int, bytes] = OrderedDict() + self._cache_lock = threading.Lock() + + # Track which chunks are currently being fetched + self._pending_chunks: set[int] = set() + self._pending_lock = threading.Lock() + + # Thread pool for background prefetching + self._executor = ThreadPoolExecutor(max_workers=max_workers) + + # Flag to stop prefetching on close + self._closed = False + + def _chunk_index(self, pos: int) -> int: + """Get the chunk index for a given byte position.""" + return pos // self._chunk_size + + def _chunk_range(self, chunk_idx: int) -> tuple[int, int]: + """Get the (start, end) byte range for a chunk index.""" + start = chunk_idx * self._chunk_size + end = min(start + self._chunk_size, self._size) + return start, end + + def _fetch_chunk(self, chunk_idx: int) -> bytes | None: + """Fetch a single chunk from the store.""" + if self._closed: + return None + + start, end = self._chunk_range(chunk_idx) + if start >= self._size: + return None + + data = obs.get_range(self._store, self._path, start=start, end=end) + return data.to_bytes() + + def _prefetch_chunk_background(self, chunk_idx: int) -> None: + """Background task to prefetch a chunk.""" + if self._closed: + return + + try: + data = self._fetch_chunk(chunk_idx) + if data is not None: + with self._cache_lock: + if chunk_idx not in self._cache: + self._cache[chunk_idx] = data + # Evict oldest if over capacity + while len(self._cache) > self._max_cached_chunks: + self._cache.popitem(last=False) + finally: + with self._pending_lock: + self._pending_chunks.discard(chunk_idx) + + def _get_chunk(self, chunk_idx: int) -> bytes | None: + """Get a chunk, fetching synchronously if not cached.""" + # Check cache first + with self._cache_lock: + if chunk_idx in self._cache: + # Move to end for LRU + self._cache.move_to_end(chunk_idx) + return self._cache[chunk_idx] + + # Fetch synchronously + data = self._fetch_chunk(chunk_idx) + if data is not None: + with self._cache_lock: + self._cache[chunk_idx] = data + self._cache.move_to_end(chunk_idx) + while len(self._cache) > self._max_cached_chunks: + self._cache.popitem(last=False) + + return data + + def _trigger_prefetch(self) -> None: + """Trigger prefetching of upcoming chunks.""" + if self._closed: + return + + current_chunk = self._chunk_index(self._pos) + prefetch_end = self._pos + self._prefetch_size + end_chunk = self._chunk_index(min(prefetch_end, self._size - 1)) + + for chunk_idx in range(current_chunk, end_chunk + 1): + # Skip if already cached or being fetched 
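+            # (a chunk scheduled here may also be fetched synchronously by
+            # _get_chunk before the background task finishes; since
+            # _prefetch_chunk_background only stores a chunk that is not already
+            # cached, the duplicate fetch is wasted work but not a correctness issue)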
+ with self._cache_lock: + if chunk_idx in self._cache: + continue + + with self._pending_lock: + if chunk_idx in self._pending_chunks: + continue + self._pending_chunks.add(chunk_idx) + + # Submit prefetch task + self._executor.submit(self._prefetch_chunk_background, chunk_idx) + + def read(self, size: int = -1, /) -> bytes: + """ + Read up to size bytes from the file. + + Parameters + ---------- + size + Maximum number of bytes to read. If -1, read until end of file. + + Returns + ------- + bytes + The data read from the file. + """ + if self._closed: + raise ValueError("I/O operation on closed file") + + if size == -1: + size = self._size - self._pos + + if size <= 0 or self._pos >= self._size: + return b"" + + # Clamp to remaining bytes + size = min(size, self._size - self._pos) + + # Trigger prefetch for upcoming data + self._trigger_prefetch() + + # Collect data from chunks + result = bytearray() + bytes_remaining = size + + while bytes_remaining > 0 and self._pos < self._size: + chunk_idx = self._chunk_index(self._pos) + chunk_data = self._get_chunk(chunk_idx) + + if chunk_data is None: + break + + # Calculate offset within chunk + chunk_start, _ = self._chunk_range(chunk_idx) + offset_in_chunk = self._pos - chunk_start + + # Calculate how much to read from this chunk + available = len(chunk_data) - offset_in_chunk + to_read = min(bytes_remaining, available) + + result.extend(chunk_data[offset_in_chunk : offset_in_chunk + to_read]) + self._pos += to_read + bytes_remaining -= to_read + + return bytes(result) + + def readall(self) -> bytes: + """Read and return all remaining bytes until end of file.""" + return self.read(-1) + + def seek(self, offset: int, whence: int = 0, /) -> int: + """ + Change the stream position. + + Parameters + ---------- + offset + Position offset. + whence + Reference point for offset: + - 0: Start of file (default) + - 1: Current position + - 2: End of file + + Returns + ------- + int + The new absolute position. + """ + if self._closed: + raise ValueError("I/O operation on closed file") + + if whence == 0: + new_pos = offset + elif whence == 1: + new_pos = self._pos + offset + elif whence == 2: + new_pos = self._size + offset + else: + raise ValueError(f"Invalid whence value: {whence}") + + self._pos = max(0, min(new_pos, self._size)) + return self._pos + + def tell(self) -> int: + """Return the current stream position.""" + if self._closed: + raise ValueError("I/O operation on closed file") + return self._pos + + def close(self) -> None: + """Close the reader and release resources.""" + self._closed = True + self._executor.shutdown(wait=False) + with self._cache_lock: + self._cache.clear() + + def __enter__(self) -> "ObstorePrefetchReader": + return self + + def __exit__(self, *args) -> None: + self.close() + + +class ObstoreParallelReader: + """ + A file reader that fetches multiple byte ranges in parallel. + + This reader batches range requests and fetches them concurrently using + obstore's get_ranges API. This is particularly effective for workloads + that read multiple non-contiguous regions of a file, such as loading + array chunks from HDF5/NetCDF files. + + Unlike the prefetch reader which speculatively fetches ahead, this reader + fetches exactly what's needed but does so in parallel batches. + """ + + def __init__( + self, + store: ObjectStore, + path: str, + *, + chunk_size: int = 1024 * 1024, + max_cached_chunks: int = 32, + batch_size: int = 16, + ) -> None: + """ + Create an obstore file reader with parallel range fetching. 
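+
+        A single ``read`` spanning several chunks fetches the missing chunks with
+        ``obstore.get_ranges``, at most ``batch_size`` ranges per request, and
+        serves the remaining chunks from the LRU cache.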
+ + Parameters + ---------- + store + [ObjectStore][obstore.store.ObjectStore] for reading the file. + path + The path to the file within the store. This should not include the prefix. + chunk_size + Size of each chunk in bytes. Reads are aligned to chunk boundaries + and multiple chunks are fetched in parallel. Default is 1 MB. + max_cached_chunks + Maximum number of chunks to keep in the LRU cache. Default is 32. + batch_size + Maximum number of ranges to fetch in a single parallel request. + Default is 16. + """ + self._store = store + self._path = path + self._chunk_size = chunk_size + self._max_cached_chunks = max_cached_chunks + self._batch_size = batch_size + + # Get file size + meta = obs.head(store, path) + self._size: int = meta["size"] + + # Current position in the file + self._pos = 0 + + # LRU cache: chunk_index -> bytes + self._cache: OrderedDict[int, bytes] = OrderedDict() + self._cache_lock = threading.Lock() + + self._closed = False + + def _chunk_index(self, pos: int) -> int: + """Get the chunk index for a given byte position.""" + return pos // self._chunk_size + + def _chunk_range(self, chunk_idx: int) -> tuple[int, int]: + """Get the (start, end) byte range for a chunk index.""" + start = chunk_idx * self._chunk_size + end = min(start + self._chunk_size, self._size) + return start, end + + def _fetch_chunks_parallel(self, chunk_indices: list[int]) -> dict[int, bytes]: + """Fetch multiple chunks in parallel using get_ranges.""" + if not chunk_indices or self._closed: + return {} + + # Build range requests + starts = [] + ends = [] + valid_indices = [] + + for idx in chunk_indices: + start, end = self._chunk_range(idx) + if start < self._size: + starts.append(start) + ends.append(end) + valid_indices.append(idx) + + if not starts: + return {} + + # Fetch all ranges in parallel + results = obs.get_ranges(self._store, self._path, starts=starts, ends=ends) + + # Map results back to chunk indices + return {idx: data.to_bytes() for idx, data in zip(valid_indices, results)} + + def _get_chunks(self, chunk_indices: list[int]) -> dict[int, bytes]: + """Get multiple chunks, fetching missing ones in parallel.""" + result = {} + missing = [] + + # Check cache for each chunk + with self._cache_lock: + for idx in chunk_indices: + if idx in self._cache: + self._cache.move_to_end(idx) + result[idx] = self._cache[idx] + else: + missing.append(idx) + + # Fetch missing chunks in batches + for i in range(0, len(missing), self._batch_size): + batch = missing[i : i + self._batch_size] + fetched = self._fetch_chunks_parallel(batch) + + # Add to cache and result + with self._cache_lock: + for idx, data in fetched.items(): + self._cache[idx] = data + self._cache.move_to_end(idx) + result[idx] = data + + # Evict oldest chunks if over capacity + while len(self._cache) > self._max_cached_chunks: + self._cache.popitem(last=False) + + return result + + def read(self, size: int = -1, /) -> bytes: + """ + Read up to size bytes from the file. + + Parameters + ---------- + size + Maximum number of bytes to read. If -1, read until end of file. + + Returns + ------- + bytes + The data read from the file. 
+ """ + if self._closed: + raise ValueError("I/O operation on closed file") + + if size == -1: + size = self._size - self._pos + + if size <= 0 or self._pos >= self._size: + return b"" + + # Clamp to remaining bytes + size = min(size, self._size - self._pos) + + # Determine which chunks we need + start_chunk = self._chunk_index(self._pos) + end_chunk = self._chunk_index(self._pos + size - 1) + chunk_indices = list(range(start_chunk, end_chunk + 1)) + + # Fetch all needed chunks in parallel + chunks = self._get_chunks(chunk_indices) + + # Assemble result from chunks + result = bytearray() + bytes_remaining = size + + while bytes_remaining > 0 and self._pos < self._size: + chunk_idx = self._chunk_index(self._pos) + chunk_data = chunks.get(chunk_idx) + + if chunk_data is None: + break + + # Calculate offset within chunk + chunk_start, _ = self._chunk_range(chunk_idx) + offset_in_chunk = self._pos - chunk_start + + # Calculate how much to read from this chunk + available = len(chunk_data) - offset_in_chunk + to_read = min(bytes_remaining, available) + + result.extend(chunk_data[offset_in_chunk : offset_in_chunk + to_read]) + self._pos += to_read + bytes_remaining -= to_read + + return bytes(result) + + def readall(self) -> bytes: + """Read and return all remaining bytes until end of file.""" + return self.read(-1) + + def seek(self, offset: int, whence: int = 0, /) -> int: + """ + Change the stream position. + + Parameters + ---------- + offset + Position offset. + whence + Reference point for offset: + - 0: Start of file (default) + - 1: Current position + - 2: End of file + + Returns + ------- + int + The new absolute position. + """ + if self._closed: + raise ValueError("I/O operation on closed file") + + if whence == 0: + new_pos = offset + elif whence == 1: + new_pos = self._pos + offset + elif whence == 2: + new_pos = self._size + offset + else: + raise ValueError(f"Invalid whence value: {whence}") + + self._pos = max(0, min(new_pos, self._size)) + return self._pos + + def tell(self) -> int: + """Return the current stream position.""" + if self._closed: + raise ValueError("I/O operation on closed file") + return self._pos + + def close(self) -> None: + """Close the reader and release resources.""" + self._closed = True + with self._cache_lock: + self._cache.clear() + + def __enter__(self) -> "ObstoreParallelReader": + return self + + def __exit__(self, *args) -> None: + self.close() + + +class ObstoreHybridReader: + """ + A file reader combining exponential readahead with parallel chunk fetching. + + This reader uses two complementary caching strategies: + + 1. **Exponential readahead cache**: For sequential reads from the file start + (typical for HDF5/NetCDF metadata parsing). Fetches grow exponentially + (e.g., 32KB → 64KB → 128KB) to minimize round-trips while avoiding + over-fetching for small files. + + 2. **Parallel chunk cache**: For random access to data chunks. Uses + `get_ranges` to fetch multiple chunks in parallel with LRU eviction. + + The reader automatically selects the appropriate strategy based on access + patterns, making it effective for both metadata-heavy operations (opening + files) and data-heavy operations (loading array slices). 
+ """ + + def __init__( + self, + store: ObjectStore, + path: str, + *, + initial_readahead: int = 32 * 1024, + readahead_multiplier: float = 2.0, + max_readahead: int = 16 * 1024 * 1024, + chunk_size: int = 1024 * 1024, + max_cached_chunks: int = 32, + batch_size: int = 16, + ) -> None: + """ + Create an obstore file reader with hybrid caching. + + Parameters + ---------- + store + [ObjectStore][obstore.store.ObjectStore] for reading the file. + path + The path to the file within the store. This should not include the prefix. + initial_readahead + Initial readahead size in bytes for sequential reads. Default is 32 KB. + readahead_multiplier + Multiplier for subsequent readahead sizes. Default is 2.0 (doubling). + max_readahead + Maximum readahead size in bytes. Default is 16 MB. + chunk_size + Size of each chunk for random access reads. Default is 1 MB. + max_cached_chunks + Maximum number of chunks in the LRU cache. Default is 32. + batch_size + Maximum number of ranges to fetch in a single parallel request. + Default is 16. + """ + self._store = store + self._path = path + self._initial_readahead = initial_readahead + self._readahead_multiplier = readahead_multiplier + self._max_readahead = max_readahead + self._chunk_size = chunk_size + self._max_cached_chunks = max_cached_chunks + self._batch_size = batch_size + + # Get file size + meta = obs.head(store, path) + self._size: int = meta["size"] + + # Current position in the file + self._pos = 0 + + # Sequential readahead cache (contiguous from offset 0) + self._seq_buffers: list[bytes] = [] + self._seq_len = 0 + self._last_readahead_size = 0 + + # LRU chunk cache for random access + self._chunk_cache: OrderedDict[int, bytes] = OrderedDict() + self._cache_lock = threading.Lock() + + self._closed = False + + def _next_readahead_size(self) -> int: + """Calculate the next readahead size using exponential growth.""" + if self._last_readahead_size == 0: + return self._initial_readahead + next_size = int(self._last_readahead_size * self._readahead_multiplier) + return min(next_size, self._max_readahead) + + def _seq_contains(self, start: int, end: int) -> bool: + """Check if the range is fully contained in the sequential cache.""" + return start >= 0 and end <= self._seq_len + + def _seq_slice(self, start: int, end: int) -> bytes: + """Extract a slice from the sequential cache.""" + if start >= end: + return b"" + + result = bytearray() + remaining_start = start + remaining_end = end + + for buf in self._seq_buffers: + buf_len = len(buf) + + # Skip buffers before our range + if remaining_start >= buf_len: + remaining_start -= buf_len + remaining_end -= buf_len + continue + + # Extract from this buffer + chunk_start = remaining_start + chunk_end = min(remaining_end, buf_len) + result.extend(buf[chunk_start:chunk_end]) + + remaining_start = 0 + remaining_end -= buf_len + + if remaining_end <= 0: + break + + return bytes(result) + + def _extend_sequential_cache(self, needed_end: int) -> None: + """Extend the sequential cache to cover at least needed_end.""" + while self._seq_len < needed_end and self._seq_len < self._size: + fetch_size = self._next_readahead_size() + # Ensure we fetch at least enough to cover the needed range + fetch_size = max(fetch_size, needed_end - self._seq_len) + fetch_end = min(self._seq_len + fetch_size, self._size) + + if fetch_end <= self._seq_len: + break + + data = obs.get_range( + self._store, self._path, start=self._seq_len, end=fetch_end + ) + buf = data.to_bytes() + + self._seq_buffers.append(buf) + self._seq_len 
+= len(buf) + self._last_readahead_size = len(buf) + + def _chunk_index(self, pos: int) -> int: + """Get the chunk index for a given byte position.""" + return pos // self._chunk_size + + def _chunk_range(self, chunk_idx: int) -> tuple[int, int]: + """Get the (start, end) byte range for a chunk index.""" + start = chunk_idx * self._chunk_size + end = min(start + self._chunk_size, self._size) + return start, end + + def _fetch_chunks_parallel(self, chunk_indices: list[int]) -> dict[int, bytes]: + """Fetch multiple chunks in parallel using get_ranges.""" + if not chunk_indices or self._closed: + return {} + + starts = [] + ends = [] + valid_indices = [] + + for idx in chunk_indices: + start, end = self._chunk_range(idx) + if start < self._size: + starts.append(start) + ends.append(end) + valid_indices.append(idx) + + if not starts: + return {} + + results = obs.get_ranges(self._store, self._path, starts=starts, ends=ends) + return {idx: data.to_bytes() for idx, data in zip(valid_indices, results)} + + def _get_chunks(self, chunk_indices: list[int]) -> dict[int, bytes]: + """Get multiple chunks, fetching missing ones in parallel.""" + result = {} + missing = [] + + with self._cache_lock: + for idx in chunk_indices: + if idx in self._chunk_cache: + self._chunk_cache.move_to_end(idx) + result[idx] = self._chunk_cache[idx] + else: + missing.append(idx) + + # Fetch missing chunks in batches + for i in range(0, len(missing), self._batch_size): + batch = missing[i : i + self._batch_size] + fetched = self._fetch_chunks_parallel(batch) + + with self._cache_lock: + for idx, data in fetched.items(): + self._chunk_cache[idx] = data + self._chunk_cache.move_to_end(idx) + result[idx] = data + + while len(self._chunk_cache) > self._max_cached_chunks: + self._chunk_cache.popitem(last=False) + + return result + + def _read_via_chunks(self, start: int, end: int) -> bytes: + """Read a range using the chunk cache with parallel fetching.""" + start_chunk = self._chunk_index(start) + end_chunk = self._chunk_index(end - 1) if end > start else start_chunk + chunk_indices = list(range(start_chunk, end_chunk + 1)) + + chunks = self._get_chunks(chunk_indices) + + result = bytearray() + pos = start + remaining = end - start + + while remaining > 0 and pos < self._size: + chunk_idx = self._chunk_index(pos) + chunk_data = chunks.get(chunk_idx) + + if chunk_data is None: + break + + chunk_start, _ = self._chunk_range(chunk_idx) + offset_in_chunk = pos - chunk_start + available = len(chunk_data) - offset_in_chunk + to_read = min(remaining, available) + + result.extend(chunk_data[offset_in_chunk : offset_in_chunk + to_read]) + pos += to_read + remaining -= to_read + + return bytes(result) + + def read(self, size: int = -1, /) -> bytes: + """ + Read up to size bytes from the file. + + Uses exponential readahead for sequential reads from the file start, + and parallel chunk fetching for random access patterns. + + Parameters + ---------- + size + Maximum number of bytes to read. If -1, read until end of file. + + Returns + ------- + bytes + The data read from the file. 
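+
+        Notes
+        -----
+        A read that starts at or before the end of the sequential cache extends
+        that cache with exponentially growing fetches; a read that seeks past it
+        is served from the parallel chunk cache instead.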
+ """ + if self._closed: + raise ValueError("I/O operation on closed file") + + if size == -1: + size = self._size - self._pos + + if size <= 0 or self._pos >= self._size: + return b"" + + size = min(size, self._size - self._pos) + start = self._pos + end = start + size + + # Decide which cache strategy to use: + # - If reading from within or just past the sequential cache, extend it + # - Otherwise, use chunk-based parallel fetching + use_sequential = start <= self._seq_len + + if use_sequential: + # Extend sequential cache if needed + if end > self._seq_len: + self._extend_sequential_cache(end) + + # Read from sequential cache + data = self._seq_slice(start, min(end, self._seq_len)) + + # If we still need more (sequential cache hit file end), that's all we get + self._pos += len(data) + return data + else: + # Use parallel chunk fetching for random access + data = self._read_via_chunks(start, end) + self._pos += len(data) + return data + + def readall(self) -> bytes: + """Read and return all remaining bytes until end of file.""" + return self.read(-1) + + def seek(self, offset: int, whence: int = 0, /) -> int: + """ + Change the stream position. + + Parameters + ---------- + offset + Position offset. + whence + Reference point for offset: + - 0: Start of file (default) + - 1: Current position + - 2: End of file + + Returns + ------- + int + The new absolute position. + """ + if self._closed: + raise ValueError("I/O operation on closed file") + + if whence == 0: + new_pos = offset + elif whence == 1: + new_pos = self._pos + offset + elif whence == 2: + new_pos = self._size + offset + else: + raise ValueError(f"Invalid whence value: {whence}") + + self._pos = max(0, min(new_pos, self._size)) + return self._pos + + def tell(self) -> int: + """Return the current stream position.""" + if self._closed: + raise ValueError("I/O operation on closed file") + return self._pos + + def close(self) -> None: + """Close the reader and release resources.""" + self._closed = True + self._seq_buffers.clear() + with self._cache_lock: + self._chunk_cache.clear() + + def __enter__(self) -> "ObstoreHybridReader": + return self + + def __exit__(self, *args) -> None: + self.close() + + +__all__ = ["ObstoreReader", "ObstoreEagerReader"] diff --git a/src/obspec_utils/registry.py b/src/obspec_utils/registry.py index e690a92..2b5394e 100644 --- a/src/obspec_utils/registry.py +++ b/src/obspec_utils/registry.py @@ -29,7 +29,7 @@ def get_url_key(url: Url) -> UrlKey: """ Generate the UrlKey containing a url's scheme and authority/netloc that is used a the - primary key's in a [ObjectStoreRegistry.map][obspec_utils.registry.ObjectStoreRegistry.map] + primary key's in the [ObjectStoreRegistry][obspec_utils.registry.ObjectStoreRegistry] Parameters ---------- diff --git a/tests/test_xarray.py b/tests/test_xarray.py index 0014f12..7ba11f6 100644 --- a/tests/test_xarray.py +++ b/tests/test_xarray.py @@ -1,6 +1,12 @@ import xarray as xr -from obspec_utils.obstore import ObstoreMemCacheReader, ObstoreReader -from obstore.store import LocalStore +from obspec_utils.obstore import ( + ObstoreReader, + ObstoreHybridReader, + ObstoreParallelReader, + ObstorePrefetchReader, + ObstoreEagerReader, +) +from obstore.store import LocalStore, MemoryStore def test_local_reader(local_netcdf4_file) -> None: @@ -10,31 +16,31 @@ def test_local_reader(local_netcdf4_file) -> None: xr.testing.assert_allclose(ds_fsspec, ds_obstore) -def test_memcache_reader(local_netcdf4_file) -> None: - """Test that ObstoreMemCacheReader works with xarray.""" +def 
test_eager_reader(local_netcdf4_file) -> None: + """Test that ObstoreEagerReader works with xarray.""" ds_fsspec = xr.open_dataset(local_netcdf4_file, engine="h5netcdf") - reader = ObstoreMemCacheReader(store=LocalStore(), path=local_netcdf4_file) + reader = ObstoreEagerReader(store=LocalStore(), path=local_netcdf4_file) ds_obstore = xr.open_dataset(reader, engine="h5netcdf") xr.testing.assert_allclose(ds_fsspec, ds_obstore) -def test_memcache_reader_interface(local_netcdf4_file) -> None: - """Test that ObstoreMemCacheReader implements the same interface as ObstoreReader.""" +def test_eager_reader_interface(local_netcdf4_file) -> None: + """Test that ObstoreEagerReader implements the same interface as ObstoreReader.""" store = LocalStore() regular_reader = ObstoreReader(store=store, path=local_netcdf4_file) - memcache_reader = ObstoreMemCacheReader(store=store, path=local_netcdf4_file) + eager_reader = ObstoreEagerReader(store=store, path=local_netcdf4_file) # Test readall data_regular = regular_reader.readall() - data_memcache = memcache_reader.readall() + data_memcache = eager_reader.readall() assert data_regular == data_memcache assert isinstance(data_memcache, bytes) -def test_memcache_reader_multiple_reads(local_netcdf4_file) -> None: - """Test that ObstoreMemCacheReader can perform multiple reads.""" +def test_eager_reader_multiple_reads(local_netcdf4_file) -> None: + """Test that ObstoreEagerReader can perform multiple reads.""" store = LocalStore() - reader = ObstoreMemCacheReader(store=store, path=local_netcdf4_file) + reader = ObstoreEagerReader(store=store, path=local_netcdf4_file) # Read the first 100 bytes chunk1 = reader.read(100) @@ -60,3 +66,567 @@ def test_memcache_reader_multiple_reads(local_netcdf4_file) -> None: # Re-reading from the beginning should give us the same data chunk1_again = reader.read(100) assert chunk1 == chunk1_again + + +def test_prefetch_reader(local_netcdf4_file) -> None: + """Test that ObstorePrefetchReader works with xarray.""" + ds_fsspec = xr.open_dataset(local_netcdf4_file, engine="h5netcdf") + with ObstorePrefetchReader(store=LocalStore(), path=local_netcdf4_file) as reader: + ds_obstore = xr.open_dataset(reader, engine="h5netcdf") + xr.testing.assert_allclose(ds_fsspec, ds_obstore) + + +def test_prefetch_reader_interface(local_netcdf4_file) -> None: + """Test that ObstorePrefetchReader implements the same interface as ObstoreReader.""" + store = LocalStore() + regular_reader = ObstoreReader(store=store, path=local_netcdf4_file) + prefetch_reader = ObstorePrefetchReader(store=store, path=local_netcdf4_file) + + # Test readall + data_regular = regular_reader.readall() + data_prefetch = prefetch_reader.readall() + assert data_regular == data_prefetch + assert isinstance(data_prefetch, bytes) + + prefetch_reader.close() + + +def test_prefetch_reader_multiple_reads(local_netcdf4_file) -> None: + """Test that ObstorePrefetchReader can perform multiple reads.""" + store = LocalStore() + reader = ObstorePrefetchReader(store=store, path=local_netcdf4_file) + + # Read the first 100 bytes + chunk1 = reader.read(100) + assert len(chunk1) == 100 + assert isinstance(chunk1, bytes) + + # Read the next 100 bytes + chunk2 = reader.read(100) + assert len(chunk2) == 100 + assert isinstance(chunk2, bytes) + + # The two chunks should be different (different parts of the file) + assert chunk1 != chunk2 + + # Test tell + position = reader.tell() + assert position == 200 + + # Test seek + reader.seek(0) + assert reader.tell() == 0 + + # Re-reading from the beginning 
should give us the same data + chunk1_again = reader.read(100) + assert chunk1 == chunk1_again + + reader.close() + + +def test_prefetch_reader_seek_whence() -> None: + """Test seek with different whence values.""" + store = MemoryStore() + data = b"0123456789" * 100 # 1000 bytes + store.put("test.bin", data) + + reader = ObstorePrefetchReader( + store=store, path="test.bin", chunk_size=100, prefetch_size=200 + ) + + # whence=0 (SEEK_SET): from start + pos = reader.seek(500) + assert pos == 500 + assert reader.tell() == 500 + + # whence=1 (SEEK_CUR): from current position + pos = reader.seek(100, 1) + assert pos == 600 + assert reader.tell() == 600 + + # whence=2 (SEEK_END): from end + pos = reader.seek(-100, 2) + assert pos == 900 + assert reader.tell() == 900 + + # Read from position 900 + chunk = reader.read(50) + assert chunk == data[900:950] + + reader.close() + + +def test_prefetch_reader_chunked_reads() -> None: + """Test reading across chunk boundaries.""" + store = MemoryStore() + data = bytes(range(256)) * 40 # 10240 bytes + store.put("test.bin", data) + + # Use small chunks to test boundary handling + reader = ObstorePrefetchReader( + store=store, path="test.bin", chunk_size=100, prefetch_size=300 + ) + + # Read across chunk boundary (chunk 0 ends at 100) + chunk = reader.read(150) + assert chunk == data[:150] + assert reader.tell() == 150 + + # Read more, crossing another boundary + chunk = reader.read(100) + assert chunk == data[150:250] + assert reader.tell() == 250 + + reader.close() + + +def test_prefetch_reader_context_manager() -> None: + """Test context manager properly closes resources.""" + store = MemoryStore() + store.put("test.bin", b"hello world") + + with ObstorePrefetchReader(store=store, path="test.bin") as reader: + data = reader.readall() + assert data == b"hello world" + + # After closing, operations should raise + try: + reader.read(1) + assert False, "Expected ValueError" + except ValueError: + pass + + +def test_prefetch_reader_read_beyond_eof() -> None: + """Test reading beyond end of file.""" + store = MemoryStore() + store.put("test.bin", b"short") + + reader = ObstorePrefetchReader(store=store, path="test.bin", chunk_size=100) + + # Read more than available + data = reader.read(1000) + assert data == b"short" + assert reader.tell() == 5 + + # Read at EOF returns empty + data = reader.read(100) + assert data == b"" + + reader.close() + + +# ============================================================================= +# ObstoreParallelReader Tests +# ============================================================================= + + +def test_parallel_reader(local_netcdf4_file) -> None: + """Test that ObstoreParallelReader works with xarray.""" + ds_fsspec = xr.open_dataset(local_netcdf4_file, engine="h5netcdf") + with ObstoreParallelReader(store=LocalStore(), path=local_netcdf4_file) as reader: + ds_obstore = xr.open_dataset(reader, engine="h5netcdf") + xr.testing.assert_allclose(ds_fsspec, ds_obstore) + + +def test_parallel_reader_interface(local_netcdf4_file) -> None: + """Test that ObstoreParallelReader implements the same interface as ObstoreReader.""" + store = LocalStore() + regular_reader = ObstoreReader(store=store, path=local_netcdf4_file) + parallel_reader = ObstoreParallelReader(store=store, path=local_netcdf4_file) + + # Test readall + data_regular = regular_reader.readall() + data_parallel = parallel_reader.readall() + assert data_regular == data_parallel + assert isinstance(data_parallel, bytes) + + parallel_reader.close() + + +def 
test_parallel_reader_multiple_reads(local_netcdf4_file) -> None: + """Test that ObstoreParallelReader can perform multiple reads.""" + store = LocalStore() + reader = ObstoreParallelReader(store=store, path=local_netcdf4_file) + + # Read the first 100 bytes + chunk1 = reader.read(100) + assert len(chunk1) == 100 + assert isinstance(chunk1, bytes) + + # Read the next 100 bytes + chunk2 = reader.read(100) + assert len(chunk2) == 100 + assert isinstance(chunk2, bytes) + + # The two chunks should be different (different parts of the file) + assert chunk1 != chunk2 + + # Test tell + position = reader.tell() + assert position == 200 + + # Test seek + reader.seek(0) + assert reader.tell() == 0 + + # Re-reading from the beginning should give us the same data + chunk1_again = reader.read(100) + assert chunk1 == chunk1_again + + reader.close() + + +def test_parallel_reader_seek_whence() -> None: + """Test seek with different whence values.""" + store = MemoryStore() + data = b"0123456789" * 100 # 1000 bytes + store.put("test.bin", data) + + reader = ObstoreParallelReader(store=store, path="test.bin", chunk_size=100) + + # whence=0 (SEEK_SET): from start + pos = reader.seek(500) + assert pos == 500 + assert reader.tell() == 500 + + # whence=1 (SEEK_CUR): from current position + pos = reader.seek(100, 1) + assert pos == 600 + assert reader.tell() == 600 + + # whence=2 (SEEK_END): from end + pos = reader.seek(-100, 2) + assert pos == 900 + assert reader.tell() == 900 + + # Read from position 900 + chunk = reader.read(50) + assert chunk == data[900:950] + + reader.close() + + +def test_parallel_reader_chunked_reads() -> None: + """Test reading across chunk boundaries with parallel fetching.""" + store = MemoryStore() + data = bytes(range(256)) * 40 # 10240 bytes + store.put("test.bin", data) + + # Use small chunks to test boundary handling + reader = ObstoreParallelReader(store=store, path="test.bin", chunk_size=100) + + # Read across chunk boundary (chunk 0 ends at 100) + chunk = reader.read(150) + assert chunk == data[:150] + assert reader.tell() == 150 + + # Read more, crossing another boundary + chunk = reader.read(100) + assert chunk == data[150:250] + assert reader.tell() == 250 + + reader.close() + + +def test_parallel_reader_large_read() -> None: + """Test reading a large range that spans many chunks.""" + store = MemoryStore() + data = bytes(range(256)) * 400 # 102400 bytes + store.put("test.bin", data) + + # Use small chunks to ensure parallel fetching kicks in + reader = ObstoreParallelReader( + store=store, path="test.bin", chunk_size=1000, batch_size=8 + ) + + # Read a large chunk that spans multiple batches + chunk = reader.read(50000) + assert chunk == data[:50000] + assert reader.tell() == 50000 + + reader.close() + + +def test_parallel_reader_context_manager() -> None: + """Test context manager properly closes resources.""" + store = MemoryStore() + store.put("test.bin", b"hello world") + + with ObstoreParallelReader(store=store, path="test.bin") as reader: + data = reader.readall() + assert data == b"hello world" + + # After closing, operations should raise + try: + reader.read(1) + assert False, "Expected ValueError" + except ValueError: + pass + + +def test_parallel_reader_read_beyond_eof() -> None: + """Test reading beyond end of file.""" + store = MemoryStore() + store.put("test.bin", b"short") + + reader = ObstoreParallelReader(store=store, path="test.bin", chunk_size=100) + + # Read more than available + data = reader.read(1000) + assert data == b"short" + assert reader.tell() == 
5 + + # Read at EOF returns empty + data = reader.read(100) + assert data == b"" + + reader.close() + + +def test_parallel_reader_caching() -> None: + """Test that the LRU cache works correctly.""" + store = MemoryStore() + data = bytes(range(256)) * 10 # 2560 bytes + store.put("test.bin", data) + + # Small cache to test eviction + reader = ObstoreParallelReader( + store=store, path="test.bin", chunk_size=100, max_cached_chunks=3 + ) + + # Read first 100 bytes (caches chunk 0) + chunk0 = reader.read(100) + assert chunk0 == data[:100] + + # Seek to chunk 5 and read (should trigger cache eviction) + reader.seek(500) + chunk5 = reader.read(100) + assert chunk5 == data[500:600] + + # Seek back to start and read again (chunk 0 should be re-fetched) + reader.seek(0) + chunk0_again = reader.read(100) + assert chunk0_again == data[:100] + + reader.close() + + +# ============================================================================= +# ObstoreHybridReader Tests +# ============================================================================= + + +def test_hybrid_reader(local_netcdf4_file) -> None: + """Test that ObstoreHybridReader works with xarray.""" + ds_fsspec = xr.open_dataset(local_netcdf4_file, engine="h5netcdf") + with ObstoreHybridReader(store=LocalStore(), path=local_netcdf4_file) as reader: + ds_obstore = xr.open_dataset(reader, engine="h5netcdf") + xr.testing.assert_allclose(ds_fsspec, ds_obstore) + + +def test_hybrid_reader_interface(local_netcdf4_file) -> None: + """Test that ObstoreHybridReader implements the same interface as ObstoreReader.""" + store = LocalStore() + regular_reader = ObstoreReader(store=store, path=local_netcdf4_file) + hybrid_reader = ObstoreHybridReader(store=store, path=local_netcdf4_file) + + # Test readall + data_regular = regular_reader.readall() + data_hybrid = hybrid_reader.readall() + assert data_regular == data_hybrid + assert isinstance(data_hybrid, bytes) + + hybrid_reader.close() + + +def test_hybrid_reader_multiple_reads(local_netcdf4_file) -> None: + """Test that ObstoreHybridReader can perform multiple reads.""" + store = LocalStore() + reader = ObstoreHybridReader(store=store, path=local_netcdf4_file) + + # Read the first 100 bytes + chunk1 = reader.read(100) + assert len(chunk1) == 100 + assert isinstance(chunk1, bytes) + + # Read the next 100 bytes + chunk2 = reader.read(100) + assert len(chunk2) == 100 + assert isinstance(chunk2, bytes) + + # The two chunks should be different (different parts of the file) + assert chunk1 != chunk2 + + # Test tell + position = reader.tell() + assert position == 200 + + # Test seek + reader.seek(0) + assert reader.tell() == 0 + + # Re-reading from the beginning should give us the same data + chunk1_again = reader.read(100) + assert chunk1 == chunk1_again + + reader.close() + + +def test_hybrid_reader_exponential_readahead() -> None: + """Test that the exponential readahead cache grows correctly.""" + store = MemoryStore() + data = bytes(range(256)) * 100 # 25600 bytes + store.put("test.bin", data) + + reader = ObstoreHybridReader( + store=store, + path="test.bin", + initial_readahead=100, + readahead_multiplier=2.0, + ) + + # First read triggers initial readahead (100 bytes) + chunk = reader.read(50) + assert chunk == data[:50] + assert reader._seq_len >= 100 # Should have fetched at least initial size + + # Sequential read should use cached data or extend + chunk = reader.read(100) + assert chunk == data[50:150] + + # After multiple reads, readahead should have grown + reader.read(500) + # Readahead 
should have grown: 100 -> 200 -> 400 -> ... + assert reader._last_readahead_size >= 100 + + reader.close() + + +def test_hybrid_reader_random_access() -> None: + """Test that random access uses chunk-based fetching.""" + store = MemoryStore() + data = bytes(range(256)) * 100 # 25600 bytes + store.put("test.bin", data) + + reader = ObstoreHybridReader( + store=store, + path="test.bin", + initial_readahead=100, + chunk_size=500, + ) + + # First read from start (uses sequential cache) + reader.read(50) + assert reader._seq_len > 0 + + # Seek way past sequential cache + reader.seek(10000) + chunk = reader.read(100) + assert chunk == data[10000:10100] + + # Should have used chunk cache, not extended sequential + # Sequential cache should still be small + assert reader._seq_len < 1000 + + reader.close() + + +def test_hybrid_reader_seek_whence() -> None: + """Test seek with different whence values.""" + store = MemoryStore() + data = b"0123456789" * 100 # 1000 bytes + store.put("test.bin", data) + + reader = ObstoreHybridReader(store=store, path="test.bin") + + # whence=0 (SEEK_SET): from start + pos = reader.seek(500) + assert pos == 500 + assert reader.tell() == 500 + + # whence=1 (SEEK_CUR): from current position + pos = reader.seek(100, 1) + assert pos == 600 + assert reader.tell() == 600 + + # whence=2 (SEEK_END): from end + pos = reader.seek(-100, 2) + assert pos == 900 + assert reader.tell() == 900 + + reader.close() + + +def test_hybrid_reader_context_manager() -> None: + """Test context manager properly closes resources.""" + store = MemoryStore() + store.put("test.bin", b"hello world") + + with ObstoreHybridReader(store=store, path="test.bin") as reader: + data = reader.readall() + assert data == b"hello world" + + # After closing, operations should raise + try: + reader.read(1) + assert False, "Expected ValueError" + except ValueError: + pass + + +def test_hybrid_reader_read_beyond_eof() -> None: + """Test reading beyond end of file.""" + store = MemoryStore() + store.put("test.bin", b"short") + + reader = ObstoreHybridReader(store=store, path="test.bin") + + # Read more than available + data = reader.read(1000) + assert data == b"short" + assert reader.tell() == 5 + + # Read at EOF returns empty + data = reader.read(100) + assert data == b"" + + reader.close() + + +def test_hybrid_reader_mixed_access_pattern() -> None: + """Test mixed sequential and random access patterns.""" + store = MemoryStore() + data = bytes(range(256)) * 200 # 51200 bytes + store.put("test.bin", data) + + reader = ObstoreHybridReader( + store=store, + path="test.bin", + initial_readahead=1000, + chunk_size=2000, + ) + + # Start with sequential reads (uses readahead cache) + chunk1 = reader.read(500) + assert chunk1 == data[:500] + + chunk2 = reader.read(500) + assert chunk2 == data[500:1000] + + # Jump to random location (uses chunk cache) + reader.seek(40000) + chunk3 = reader.read(1000) + assert chunk3 == data[40000:41000] + + # Jump back near start (should still use sequential cache) + reader.seek(100) + chunk4 = reader.read(200) + assert chunk4 == data[100:300] + + # Jump to another random location + reader.seek(25000) + chunk5 = reader.read(500) + assert chunk5 == data[25000:25500] + + reader.close()