111 changes: 111 additions & 0 deletions docs/caching-architecture.md
@@ -235,6 +235,117 @@ For workloads requiring cross-worker cache sharing, consider:
- Shared filesystem caching
- Restructuring workloads to minimize cross-worker file access

### Pickling and Serialization

`CachingReadableStore` supports Python's pickle protocol for use with multiprocessing and distributed frameworks. When a `CachingReadableStore` is pickled and unpickled (e.g., sent to a worker process), it is **recreated with an empty cache**.

```python
import pickle
from obspec_utils.cache import CachingReadableStore

# Main process: create and populate cache
# (`store` is an existing readable store instance, e.g. an obstore S3Store)
cached_store = CachingReadableStore(store, max_size=256 * 1024 * 1024)
cached_store.get("file1.nc") # Cached
cached_store.get("file2.nc") # Cached
print(cached_store.cache_size) # Non-zero

# Simulate sending to worker (pickle roundtrip)
restored = pickle.loads(pickle.dumps(cached_store))

# Worker receives store with empty cache
print(restored.cache_size) # 0
print(restored._max_size) # 256 * 1024 * 1024 (preserved)
```

**Design rationale:**

1. **Cache contents are not serialized**: Serializing the full cache would defeat the purpose of distributed processing—workers would receive potentially huge payloads, and the data may not even be relevant to their partition.

2. **Fresh cache per worker**: Each worker builds its own cache based on its workload. For file-partitioned workloads (common in data processing), this is optimal—each worker caches only the files it processes.

3. **Configuration is preserved**: The `max_size` and underlying store are preserved, so workers use the same caching policy as the main process.
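
Under the hood, this behavior comes from the `__reduce__` hook added to `CachingReadableStore` in this change (`src/obspec_utils/cache.py`). Conceptually it boils down to the following sketch:

```python
class CachingReadableStore:
    ...

    def __reduce__(self):
        # Pickle only the constructor arguments (underlying store and max_size),
        # never the cache contents, so unpickling builds a fresh, empty cache.
        return (self.__class__, (self._store, self._max_size))
```

Because only the store reference and `max_size` cross the pickle boundary, the restored instance behaves like a newly constructed one.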

**Requirements for pickling:**

The underlying store (`_store`) must also be picklable. For cloud stores, this typically means using stores that can be reconstructed from configuration:

```python
# Works: store can be pickled (configuration-based)
from obstore.store import S3Store
s3_store = S3Store(bucket="my-bucket", region="us-east-1")
cached = CachingReadableStore(s3_store)
pickle.dumps(cached) # OK

# May not work: some Rust-backed stores aren't picklable
from obstore.store import MemoryStore
mem_store = MemoryStore()
cached = CachingReadableStore(mem_store)
pickle.dumps(cached) # TypeError: cannot pickle 'MemoryStore' object
```

### Distributed Usage Patterns

#### Pattern 1: File-Partitioned Workloads (Recommended)

When each worker processes a distinct set of files, per-worker caching works well:

```python
from concurrent.futures import ProcessPoolExecutor

from obstore.store import S3Store
from obspec_utils.cache import CachingReadableStore

def process_files(cached_store, file_paths):
    """Each worker gets its own cache, processes its own files."""
    results = []
    for path in file_paths:
        # First access: fetch from network, cache locally
        data = cached_store.get(path)
        # Subsequent accesses to same file: served from cache
        result = analyze(data)
        results.append(result)
    return results

# Create cached store in main process
store = S3Store(bucket="my-bucket")
cached_store = CachingReadableStore(store, max_size=512 * 1024 * 1024)

# Partition files across workers
all_files = ["file1.nc", "file2.nc", "file3.nc", "file4.nc"]
partitions = [all_files[:2], all_files[2:]]

with ProcessPoolExecutor(max_workers=2) as executor:
    futures = [
        executor.submit(process_files, cached_store, partition)
        for partition in partitions
    ]
    results = [f.result() for f in futures]
```

#### Pattern 2: Dask Distributed

With Dask, the cached store is serialized to each worker:

```python
import dask
from dask.distributed import Client

from obstore.store import S3Store
from obspec_utils.cache import CachingReadableStore

client = Client()

store = S3Store(bucket="my-bucket")
cached_store = CachingReadableStore(store)

@dask.delayed
def process_file(cached_store, path):
    # Worker receives cached_store with empty cache
    # Cache builds up as this worker processes files
    data = cached_store.get(path)
    return analyze(data)

tasks = [process_file(cached_store, f) for f in file_list]
results = dask.compute(*tasks)
```


## Decision Guide

### Which reader should I use?
14 changes: 14 additions & 0 deletions src/obspec_utils/cache.py
@@ -113,6 +113,20 @@ def __getattr__(self, name: str) -> Any:
"""
return getattr(self._store, name)

def __reduce__(self):
"""Support pickling for multiprocessing and distributed frameworks.

Returns a fresh instance with an empty cache. This is intentional:
serializing the full cache contents would be inefficient for distributed
workloads where each worker typically processes different files.

The underlying store and max_size configuration are preserved.
"""
return (
self.__class__,
(self._store, self._max_size),
)

def _add_to_cache(self, path: str, data: bytes) -> None:
"""Add data to cache, evicting LRU entries if needed.

187 changes: 187 additions & 0 deletions tests/test_cache.py
@@ -1,5 +1,6 @@
"""Tests for CachingReadableStore."""

import pickle
import threading
from concurrent.futures import ThreadPoolExecutor

@@ -389,3 +390,189 @@ def test_forwards_unknown_attributes(self):
        # This tests that __getattr__ forwards correctly
        assert hasattr(cached, "put")  # MemoryStore has put
        assert hasattr(cached, "delete")  # MemoryStore has delete


class PicklableStore:
    """A simple picklable store for testing pickle support.

    MemoryStore from obstore is Rust-backed and not picklable.
    This pure-Python store allows testing CachingReadableStore's pickle support.
    """

    def __init__(self, data: dict[str, bytes] | None = None):
        self._data = data or {}

    def put(self, path: str, data: bytes) -> None:
        self._data[path] = data

    def get(self, path: str, *, options=None):
        return _PicklableGetResult(self._data[path])

    async def get_async(self, path: str, *, options=None):
        return _PicklableGetResultAsync(self._data[path])

    def get_range(
        self,
        path: str,
        *,
        start: int,
        end: int | None = None,
        length: int | None = None,
    ):
        data = self._data[path]
        if end is not None:
            return data[start:end]
        elif length is not None:
            return data[start : start + length]
        return data[start:]

    async def get_range_async(
        self,
        path: str,
        *,
        start: int,
        end: int | None = None,
        length: int | None = None,
    ):
        return self.get_range(path, start=start, end=end, length=length)

    def get_ranges(self, path: str, *, starts, ends=None, lengths=None):
        if ends is not None:
            return [self._data[path][s:e] for s, e in zip(starts, ends)]
        elif lengths is not None:
            return [
                self._data[path][start : start + length]
                for start, length in zip(starts, lengths)
            ]
        raise ValueError("Must provide ends or lengths")

    async def get_ranges_async(self, path: str, *, starts, ends=None, lengths=None):
        return self.get_ranges(path, starts=starts, ends=ends, lengths=lengths)


class _PicklableGetResult:
    """Mock GetResult for PicklableStore."""

    def __init__(self, data: bytes):
        self._data = data

    def buffer(self):
        return self._data


class _PicklableGetResultAsync:
    """Mock async GetResult for PicklableStore."""

    def __init__(self, data: bytes):
        self._data = data

    async def buffer_async(self):
        return self._data


class TestPickling:
    """Tests for pickling support (needed for multiprocessing/distributed)."""

    def test_pickle_roundtrip(self):
        """CachingReadableStore can be pickled and unpickled."""
        source = PicklableStore()
        source.put("file.txt", b"hello world")

        cached = CachingReadableStore(source, max_size=128 * 1024 * 1024)

        # Pickle and unpickle
        pickled = pickle.dumps(cached)
        restored = pickle.loads(pickled)

        assert isinstance(restored, CachingReadableStore)

    def test_pickle_preserves_store_and_max_size(self):
        """Unpickled store preserves underlying store and max_size."""
        source = PicklableStore()
        source.put("file.txt", b"hello world")

        custom_max_size = 64 * 1024 * 1024
        cached = CachingReadableStore(source, max_size=custom_max_size)

        restored = pickle.loads(pickle.dumps(cached))

        # max_size should be preserved
        assert restored._max_size == custom_max_size

        # underlying store should work (can fetch data)
        result = restored.get("file.txt")
        assert bytes(result.buffer()) == b"hello world"

    def test_pickle_creates_empty_cache(self):
        """Unpickled store has a fresh empty cache."""
        source = PicklableStore()
        source.put("file.txt", b"hello world")
        source.put("file2.txt", b"more data")

        cached = CachingReadableStore(source)

        # Populate the cache
        cached.get("file.txt")
        cached.get("file2.txt")
        assert cached.cache_size > 0
        assert len(cached.cached_paths) == 2

        # Pickle and unpickle
        restored = pickle.loads(pickle.dumps(cached))

        # Restored cache should be empty
        assert restored.cache_size == 0
        assert len(restored.cached_paths) == 0

    def test_pickle_restored_store_is_functional(self):
        """Restored store can cache new data normally."""
        source = PicklableStore()
        source.put("file.txt", b"hello world")

        cached = CachingReadableStore(source, max_size=100)
        cached.get("file.txt")

        restored = pickle.loads(pickle.dumps(cached))

        # Restored store should be able to fetch and cache
        result = restored.get("file.txt")
        assert bytes(result.buffer()) == b"hello world"
        assert "file.txt" in restored.cached_paths
        assert restored.cache_size == len(b"hello world")

    def test_pickle_restored_store_lru_works(self):
        """Restored store has working LRU eviction."""
        source = PicklableStore()
        source.put("file1.txt", b"a" * 100)
        source.put("file2.txt", b"b" * 100)
        source.put("file3.txt", b"c" * 100)

        cached = CachingReadableStore(source, max_size=200)

        restored = pickle.loads(pickle.dumps(cached))

        # Cache two files
        restored.get("file1.txt")
        restored.get("file2.txt")
        assert restored.cached_paths == ["file1.txt", "file2.txt"]

        # Third file should evict first
        restored.get("file3.txt")
        assert restored.cached_paths == ["file2.txt", "file3.txt"]

    def test_pickle_multiple_protocols(self):
        """Pickling works with different pickle protocols."""
        source = PicklableStore()
        source.put("file.txt", b"hello world")

        cached = CachingReadableStore(source)
        cached.get("file.txt")

        # Test all available protocols
        for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
            pickled = pickle.dumps(cached, protocol=protocol)
            restored = pickle.loads(pickled)

            assert restored.cache_size == 0  # Fresh cache
            result = restored.get("file.txt")
            assert bytes(result.buffer()) == b"hello world"