|
50 | 50 | PrimitiveType, |
51 | 51 | StringType, |
52 | 52 | StructType, |
| 53 | + strtobool, |
53 | 54 | ) |
| 55 | +from pyiceberg.utils.config import Config |
54 | 56 |
|
55 | 57 | UNASSIGNED_SEQ = -1 |
56 | 58 | DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024 |
@@ -891,13 +893,79 @@ def __hash__(self) -> int: |
891 | 893 | return hash(self.manifest_path) |
892 | 894 |
|
893 | 895 |
|
894 | | -# Global cache for ManifestFile objects, keyed by manifest_path. |
895 | | -# This deduplicates ManifestFile objects across manifest lists, which commonly |
896 | | -# share manifests after append operations. |
897 | | -_manifest_cache: LRUCache[str, ManifestFile] = LRUCache(maxsize=128) |
| 896 | +class _ManifestCacheManager: |
| 897 | + """Manages the manifest cache with lazy initialization from config.""" |
898 | 898 |
|
899 | | -# Lock for thread-safe cache access |
900 | | -_manifest_cache_lock = threading.RLock() |
| 899 | + _DEFAULT_SIZE = 128 |
| 900 | + |
| 901 | + def __init__(self) -> None: |
| 902 | + self._cache: LRUCache[str, ManifestFile] | None = None |
| 903 | + self._initialized = False |
| 904 | + self._lock = threading.RLock() |
| 905 | + |
| 906 | + def get_cache(self) -> LRUCache[str, ManifestFile] | None: |
| 907 | + """Return the cache if enabled, else None. Initializes from config on first call.""" |
| 908 | + with self._lock: |
| 909 | + if self._initialized: |
| 910 | + return self._cache |
| 911 | + |
| 912 | + config = Config().config |
| 913 | + |
| 914 | + # Extract nested config |
| 915 | + manifest_val = config.get("manifest") |
| 916 | + manifest_config: dict[str, Any] = manifest_val if isinstance(manifest_val, dict) else {} |
| 917 | + cache_val = manifest_config.get("cache") |
| 918 | + cache_config: dict[str, Any] = cache_val if isinstance(cache_val, dict) else {} |
| 919 | + |
| 920 | + # Parse and validate enabled flag |
| 921 | + enabled_raw = cache_config.get("enabled") |
| 922 | + enabled = True |
| 923 | + if enabled_raw is not None: |
| 924 | + try: |
| 925 | + enabled = bool(strtobool(str(enabled_raw))) |
| 926 | + except (ValueError, AttributeError) as err: |
| 927 | + raise ValueError( |
| 928 | + f"manifest.cache.enabled should be a boolean or left unset. Current value: {enabled_raw}" |
| 929 | + ) from err |
| 930 | + |
| 931 | + # Parse and validate cache size |
| 932 | + size_raw = cache_config.get("size") |
| 933 | + size = self._DEFAULT_SIZE |
| 934 | + if size_raw is not None: |
| 935 | + try: |
| 936 | + size = int(str(size_raw)) |
| 937 | + except (ValueError, TypeError) as err: |
| 938 | + raise ValueError( |
| 939 | + f"manifest.cache.size should be a positive integer or left unset. Current value: {size_raw}" |
| 940 | + ) from err |
| 941 | + if size < 1: |
| 942 | + raise ValueError(f"manifest.cache.size must be >= 1. Current value: {size}") |
| 943 | + |
| 944 | + if enabled: |
| 945 | + self._cache = LRUCache(maxsize=size) |
| 946 | + self._initialized = True |
| 947 | + return self._cache |
| 948 | + |
| 949 | + def clear(self) -> None: |
| 950 | + """Clear the cache contents. No-op if cache is disabled.""" |
| 951 | + cache = self.get_cache() |
| 952 | + if cache is not None: |
| 953 | + with self._lock: |
| 954 | + cache.clear() |
| 955 | + |
| 956 | + |
| 957 | +# Module-level cache manager instance |
| 958 | +_manifest_cache_manager = _ManifestCacheManager() |
| 959 | + |
| 960 | + |
| 961 | +def _get_manifest_cache() -> LRUCache[str, ManifestFile] | None: |
| 962 | + """Return the manifest cache if enabled, else None. Initializes from config on first call.""" |
| 963 | + return _manifest_cache_manager.get_cache() |
| 964 | + |
| 965 | + |
| 966 | +def clear_manifest_cache() -> None: |
| 967 | + """Clear the manifest cache. No-op if cache is disabled.""" |
| 968 | + _manifest_cache_manager.clear() |
901 | 969 |
|
902 | 970 |
|
903 | 971 | def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]: |
@@ -927,14 +995,18 @@ def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]: |
927 | 995 | file = io.new_input(manifest_list) |
928 | 996 | manifest_files = list(read_manifest_list(file)) |
929 | 997 |
|
| 998 | + cache = _get_manifest_cache() |
| 999 | + if cache is None: |
| 1000 | + return tuple(manifest_files) |
| 1001 | + |
930 | 1002 | result = [] |
931 | | - with _manifest_cache_lock: |
| 1003 | + with _manifest_cache_manager._lock: |
932 | 1004 | for manifest_file in manifest_files: |
933 | 1005 | manifest_path = manifest_file.manifest_path |
934 | | - if manifest_path in _manifest_cache: |
935 | | - result.append(_manifest_cache[manifest_path]) |
| 1006 | + if manifest_path in cache: |
| 1007 | + result.append(cache[manifest_path]) |
936 | 1008 | else: |
937 | | - _manifest_cache[manifest_path] = manifest_file |
| 1009 | + cache[manifest_path] = manifest_file |
938 | 1010 | result.append(manifest_file) |
939 | 1011 |
|
940 | 1012 | return tuple(result) |
|
0 commit comments