diff --git a/docs/caching-architecture.md b/docs/caching-architecture.md
index b4aa9c1..b74ab72 100644
--- a/docs/caching-architecture.md
+++ b/docs/caching-architecture.md
@@ -235,6 +235,117 @@ For workloads requiring cross-worker cache sharing, consider:
 - Shared filesystem caching
 - Restructuring workloads to minimize cross-worker file access
 
+### Pickling and Serialization
+
+`CachingReadableStore` supports Python's pickle protocol for use with multiprocessing and distributed frameworks. When a `CachingReadableStore` is pickled and unpickled (e.g., sent to a worker process), it is **recreated with an empty cache**.
+
+```python
+import pickle
+from obspec_utils.cache import CachingReadableStore
+
+# Main process: create and populate cache
+cached_store = CachingReadableStore(store, max_size=256 * 1024 * 1024)
+cached_store.get("file1.nc")  # Cached
+cached_store.get("file2.nc")  # Cached
+print(cached_store.cache_size)  # Non-zero
+
+# Simulate sending to worker (pickle roundtrip)
+restored = pickle.loads(pickle.dumps(cached_store))
+
+# Worker receives store with empty cache
+print(restored.cache_size)  # 0
+print(restored._max_size)  # 256 * 1024 * 1024 (preserved)
+```
+
+**Design rationale:**
+
+1. **Cache contents are not serialized**: Serializing the full cache would defeat the purpose of distributed processing—workers would receive potentially huge payloads, and the data may not even be relevant to their partition.
+
+2. **Fresh cache per worker**: Each worker builds its own cache based on its workload. For file-partitioned workloads (common in data processing), this is optimal—each worker caches only the files it processes.
+
+3. **Configuration is preserved**: The `max_size` and underlying store are preserved, so workers use the same caching policy as the main process.
+
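+As a rough sketch of the first point (reusing the `cached_store` from the example above; the file name and sizes are illustrative), the pickled payload contains only the underlying store and `max_size`, so it stays small no matter how much data the cache holds:
+
+```python
+import pickle
+
+cached_store.get("file1.nc")     # cache now holds this file's bytes
+payload = pickle.dumps(cached_store)
+
+# Only the store reference and max_size are serialized, never the cached bytes,
+# so the payload size is unrelated to cache_size.
+print(cached_store.cache_size)   # grows with the cached data
+print(len(payload))              # stays small
+```
+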
+**Requirements for pickling:**
+
+The underlying store (`_store`) must also be picklable. For cloud stores, this typically means using stores that can be reconstructed from configuration:
+
+```python
+# Works: store can be pickled (configuration-based)
+from obstore.store import S3Store
+s3_store = S3Store(bucket="my-bucket", region="us-east-1")
+cached = CachingReadableStore(s3_store)
+pickle.dumps(cached)  # OK
+
+# May not work: some Rust-backed stores aren't picklable
+from obstore.store import MemoryStore
+mem_store = MemoryStore()
+cached = CachingReadableStore(mem_store)
+pickle.dumps(cached)  # TypeError: cannot pickle 'MemoryStore' object
+```
+
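+If the wrapped store is not picklable, one workaround (a sketch, not part of the library API; `make_cached_store` and the bucket name are illustrative) is to send only plain configuration to each worker and build the cached store there:
+
+```python
+from obstore.store import S3Store
+from obspec_utils.cache import CachingReadableStore
+
+def make_cached_store(bucket: str) -> CachingReadableStore:
+    # Construct the store inside the worker process instead of pickling it;
+    # only the bucket name (a plain string) crosses the process boundary.
+    return CachingReadableStore(S3Store(bucket=bucket), max_size=256 * 1024 * 1024)
+```
+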
+### Distributed Usage Patterns
+
+#### Pattern 1: File-Partitioned Workloads (Recommended)
+
+When each worker processes a distinct set of files, per-worker caching works well:
+
+```python
+from concurrent.futures import ProcessPoolExecutor
+from obstore.store import S3Store
+from obspec_utils.cache import CachingReadableStore
+
+def process_files(cached_store, file_paths):
+    """Each worker gets its own cache, processes its own files."""
+    results = []
+    for path in file_paths:
+        # First access: fetch from network, cache locally
+        data = cached_store.get(path)
+        # Subsequent accesses to same file: served from cache
+        result = analyze(data)
+        results.append(result)
+    return results
+
+# Create cached store in main process
+store = S3Store(bucket="my-bucket")
+cached_store = CachingReadableStore(store, max_size=512 * 1024 * 1024)
+
+# Partition files across workers
+all_files = ["file1.nc", "file2.nc", "file3.nc", "file4.nc"]
+partitions = [all_files[:2], all_files[2:]]
+
+with ProcessPoolExecutor(max_workers=2) as executor:
+    futures = [
+        executor.submit(process_files, cached_store, partition)
+        for partition in partitions
+    ]
+    results = [f.result() for f in futures]
+```
+
+#### Pattern 2: Dask Distributed
+
+With Dask, the cached store is serialized to each worker:
+
+```python
+import dask
+from dask.distributed import Client
+from obstore.store import S3Store
+from obspec_utils.cache import CachingReadableStore
+
+client = Client()
+
+store = S3Store(bucket="my-bucket")
+cached_store = CachingReadableStore(store)
+
+@dask.delayed
+def process_file(cached_store, path):
+    # Worker receives cached_store with empty cache
+    # Cache builds up as this worker processes files
+    data = cached_store.get(path)
+    return analyze(data)
+
+tasks = [process_file(cached_store, f) for f in file_list]
+results = dask.compute(*tasks)
+```
+
+
 
 ## Decision Guide
 
 ### Which reader should I use?
diff --git a/src/obspec_utils/cache.py b/src/obspec_utils/cache.py
index 0d8cf7e..368e26c 100644
--- a/src/obspec_utils/cache.py
+++ b/src/obspec_utils/cache.py
@@ -113,6 +113,20 @@ def __getattr__(self, name: str) -> Any:
         """
         return getattr(self._store, name)
 
+    def __reduce__(self):
+        """Support pickling for multiprocessing and distributed frameworks.
+
+        Returns a fresh instance with an empty cache. This is intentional:
+        serializing the full cache contents would be inefficient for distributed
+        workloads where each worker typically processes different files.
+
+        The underlying store and max_size configuration are preserved.
+        """
+        return (
+            self.__class__,
+            (self._store, self._max_size),
+        )
+
     def _add_to_cache(self, path: str, data: bytes) -> None:
         """Add data to cache, evicting LRU entries if needed.
 
diff --git a/tests/test_cache.py b/tests/test_cache.py
index bde855c..c273cfe 100644
--- a/tests/test_cache.py
+++ b/tests/test_cache.py
@@ -1,5 +1,6 @@
 """Tests for CachingReadableStore."""
 
+import pickle
 import threading
 from concurrent.futures import ThreadPoolExecutor
 
@@ -389,3 +390,189 @@ def test_forwards_unknown_attributes(self):
         # This tests that __getattr__ forwards correctly
         assert hasattr(cached, "put")  # MemoryStore has put
         assert hasattr(cached, "delete")  # MemoryStore has delete
+
+
+class PicklableStore:
+    """A simple picklable store for testing pickle support.
+
+    MemoryStore from obstore is Rust-backed and not picklable.
+    This pure-Python store allows testing CachingReadableStore's pickle support.
+    """
+
+    def __init__(self, data: dict[str, bytes] | None = None):
+        self._data = data or {}
+
+    def put(self, path: str, data: bytes) -> None:
+        self._data[path] = data
+
+    def get(self, path: str, *, options=None):
+        return _PicklableGetResult(self._data[path])
+
+    async def get_async(self, path: str, *, options=None):
+        return _PicklableGetResultAsync(self._data[path])
+
+    def get_range(
+        self,
+        path: str,
+        *,
+        start: int,
+        end: int | None = None,
+        length: int | None = None,
+    ):
+        data = self._data[path]
+        if end is not None:
+            return data[start:end]
+        elif length is not None:
+            return data[start : start + length]
+        return data[start:]
+
+    async def get_range_async(
+        self,
+        path: str,
+        *,
+        start: int,
+        end: int | None = None,
+        length: int | None = None,
+    ):
+        return self.get_range(path, start=start, end=end, length=length)
+
+    def get_ranges(self, path: str, *, starts, ends=None, lengths=None):
+        if ends is not None:
+            return [self._data[path][s:e] for s, e in zip(starts, ends)]
+        elif lengths is not None:
+            return [
+                self._data[path][start : start + length]
+                for start, length in zip(starts, lengths)
+            ]
+        raise ValueError("Must provide ends or lengths")
+
+    async def get_ranges_async(self, path: str, *, starts, ends=None, lengths=None):
+        return self.get_ranges(path, starts=starts, ends=ends, lengths=lengths)
+
+
+class _PicklableGetResult:
+    """Mock GetResult for PicklableStore."""
+
+    def __init__(self, data: bytes):
+        self._data = data
+
+    def buffer(self):
+        return self._data
+
+
+class _PicklableGetResultAsync:
+    """Mock async GetResult for PicklableStore."""
+
+    def __init__(self, data: bytes):
+        self._data = data
+
+    async def buffer_async(self):
+        return self._data
+
+
+class TestPickling:
+    """Tests for pickling support (needed for multiprocessing/distributed)."""
+
+    def test_pickle_roundtrip(self):
+        """CachingReadableStore can be pickled and unpickled."""
+        source = PicklableStore()
+        source.put("file.txt", b"hello world")
+
+        cached = CachingReadableStore(source, max_size=128 * 1024 * 1024)
+
+        # Pickle and unpickle
+        pickled = pickle.dumps(cached)
+        restored = pickle.loads(pickled)
+
+        assert isinstance(restored, CachingReadableStore)
+
+    def test_pickle_preserves_store_and_max_size(self):
+        """Unpickled store preserves underlying store and max_size."""
+        source = PicklableStore()
+        source.put("file.txt", b"hello world")
+
+        custom_max_size = 64 * 1024 * 1024
+        cached = CachingReadableStore(source, max_size=custom_max_size)
+
+        restored = pickle.loads(pickle.dumps(cached))
+
+        # max_size should be preserved
+        assert restored._max_size == custom_max_size
+
+        # underlying store should work (can fetch data)
+        result = restored.get("file.txt")
+        assert bytes(result.buffer()) == b"hello world"
+
+    def test_pickle_creates_empty_cache(self):
"""Unpickled store has a fresh empty cache.""" + source = PicklableStore() + source.put("file.txt", b"hello world") + source.put("file2.txt", b"more data") + + cached = CachingReadableStore(source) + + # Populate the cache + cached.get("file.txt") + cached.get("file2.txt") + assert cached.cache_size > 0 + assert len(cached.cached_paths) == 2 + + # Pickle and unpickle + restored = pickle.loads(pickle.dumps(cached)) + + # Restored cache should be empty + assert restored.cache_size == 0 + assert len(restored.cached_paths) == 0 + + def test_pickle_restored_store_is_functional(self): + """Restored store can cache new data normally.""" + source = PicklableStore() + source.put("file.txt", b"hello world") + + cached = CachingReadableStore(source, max_size=100) + cached.get("file.txt") + + restored = pickle.loads(pickle.dumps(cached)) + + # Restored store should be able to fetch and cache + result = restored.get("file.txt") + assert bytes(result.buffer()) == b"hello world" + assert "file.txt" in restored.cached_paths + assert restored.cache_size == len(b"hello world") + + def test_pickle_restored_store_lru_works(self): + """Restored store has working LRU eviction.""" + source = PicklableStore() + source.put("file1.txt", b"a" * 100) + source.put("file2.txt", b"b" * 100) + source.put("file3.txt", b"c" * 100) + + cached = CachingReadableStore(source, max_size=200) + + restored = pickle.loads(pickle.dumps(cached)) + + # Cache two files + restored.get("file1.txt") + restored.get("file2.txt") + assert restored.cached_paths == ["file1.txt", "file2.txt"] + + # Third file should evict first + restored.get("file3.txt") + assert restored.cached_paths == ["file2.txt", "file3.txt"] + + def test_pickle_multiple_protocols(self): + """Pickling works with different pickle protocols.""" + source = PicklableStore() + source.put("file.txt", b"hello world") + + cached = CachingReadableStore(source) + cached.get("file.txt") + + # Test all available protocols + for protocol in range(pickle.HIGHEST_PROTOCOL + 1): + pickled = pickle.dumps(cached, protocol=protocol) + restored = pickle.loads(pickled) + + assert restored.cache_size == 0 # Fresh cache + result = restored.get("file.txt") + assert bytes(result.buffer()) == b"hello world"