|
16 | 16 | # under the License. |
17 | 17 | # pylint: disable=redefined-outer-name,arguments-renamed,fixme |
18 | 18 | from tempfile import TemporaryDirectory |
| 19 | +from unittest import mock |
19 | 20 |
|
20 | 21 | import fastavro |
21 | 22 | import pytest |
22 | 23 |
|
| 24 | +import pyiceberg.manifest as manifest_module |
23 | 25 | from pyiceberg.avro.codecs import AvroCompressionCodec |
24 | 26 | from pyiceberg.io import load_file_io |
25 | 27 | from pyiceberg.io.pyarrow import PyArrowFileIO |
|
32 | 34 | ManifestEntryStatus, |
33 | 35 | ManifestFile, |
34 | 36 | PartitionFieldSummary, |
35 | | - _manifest_cache, |
| 37 | + _get_manifest_cache, |
36 | 38 | _manifests, |
| 39 | + clear_manifest_cache, |
37 | 40 | read_manifest_list, |
38 | 41 | write_manifest, |
39 | 42 | write_manifest_list, |
|
46 | 49 |
|
47 | 50 |
|
48 | 51 | @pytest.fixture(autouse=True) |
49 | | -def clear_global_manifests_cache() -> None: |
50 | | - # Clear the global cache before each test |
51 | | - _manifest_cache.clear() |
| 52 | +def reset_global_manifests_cache() -> None: |
| 53 | + # Reset cache state before each test so config is re-read |
| 54 | + manifest_module._manifest_cache_manager._cache = None |
| 55 | + manifest_module._manifest_cache_manager._initialized = False |
52 | 56 |
|
53 | 57 |
|
54 | 58 | def _verify_metadata_with_fastavro(avro_file: str, expected_metadata: dict[str, str]) -> None: |
@@ -804,9 +808,9 @@ def test_manifest_cache_deduplicates_manifest_files() -> None: |
804 | 808 |
|
805 | 809 | # Verify cache size - should only have 3 unique ManifestFile objects |
806 | 810 | # instead of 1 + 2 + 3 = 6 objects as with the old approach |
807 | | - assert len(_manifest_cache) == 3, ( |
808 | | - f"Cache should contain exactly 3 unique ManifestFile objects, but has {len(_manifest_cache)}" |
809 | | - ) |
| 811 | + cache = _get_manifest_cache() |
| 812 | + assert cache is not None, "Manifest cache should be enabled for this test" |
| 813 | + assert len(cache) == 3, f"Cache should contain exactly 3 unique ManifestFile objects, but has {len(cache)}" |
810 | 814 |
|
811 | 815 |
|
812 | 816 | def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None: |
@@ -879,9 +883,11 @@ def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None: |
879 | 883 | # With the new approach, we should have exactly N objects |
880 | 884 |
|
881 | 885 | # Verify cache has exactly N unique entries |
882 | | - assert len(_manifest_cache) == num_manifests, ( |
| 886 | + cache = _get_manifest_cache() |
| 887 | + assert cache is not None, "Manifest cache should be enabled for this test" |
| 888 | + assert len(cache) == num_manifests, ( |
883 | 889 | f"Cache should contain exactly {num_manifests} ManifestFile objects, " |
884 | | - f"but has {len(_manifest_cache)}. " |
| 890 | + f"but has {len(cache)}. " |
885 | 891 | f"Old approach would have {num_manifests * (num_manifests + 1) // 2} objects." |
886 | 892 | ) |
887 | 893 |
|
@@ -932,3 +938,117 @@ def test_manifest_writer_tell(format_version: TableVersion) -> None: |
932 | 938 | after_entry_bytes = writer.tell() |
933 | 939 |
|
934 | 940 | assert after_entry_bytes > initial_bytes, "Bytes should increase after adding entry" |
| 941 | + |
| 942 | + |
| 943 | +def test_clear_manifest_cache() -> None: |
| 944 | + """Test that clear_manifest_cache() clears cache entries while keeping cache enabled.""" |
| 945 | + io = PyArrowFileIO() |
| 946 | + |
| 947 | + with TemporaryDirectory() as tmp_dir: |
| 948 | + schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True)) |
| 949 | + spec = UNPARTITIONED_PARTITION_SPEC |
| 950 | + |
| 951 | + # Create a manifest file |
| 952 | + manifest_path = f"{tmp_dir}/manifest.avro" |
| 953 | + with write_manifest( |
| 954 | + format_version=2, |
| 955 | + spec=spec, |
| 956 | + schema=schema, |
| 957 | + output_file=io.new_output(manifest_path), |
| 958 | + snapshot_id=1, |
| 959 | + avro_compression="zstandard", |
| 960 | + ) as writer: |
| 961 | + data_file = DataFile.from_args( |
| 962 | + content=DataFileContent.DATA, |
| 963 | + file_path=f"{tmp_dir}/data.parquet", |
| 964 | + file_format=FileFormat.PARQUET, |
| 965 | + partition=Record(), |
| 966 | + record_count=100, |
| 967 | + file_size_in_bytes=1000, |
| 968 | + ) |
| 969 | + writer.add_entry( |
| 970 | + ManifestEntry.from_args( |
| 971 | + status=ManifestEntryStatus.ADDED, |
| 972 | + snapshot_id=1, |
| 973 | + data_file=data_file, |
| 974 | + ) |
| 975 | + ) |
| 976 | + manifest_file = writer.to_manifest_file() |
| 977 | + |
| 978 | + # Create a manifest list |
| 979 | + list_path = f"{tmp_dir}/manifest-list.avro" |
| 980 | + with write_manifest_list( |
| 981 | + format_version=2, |
| 982 | + output_file=io.new_output(list_path), |
| 983 | + snapshot_id=1, |
| 984 | + parent_snapshot_id=None, |
| 985 | + sequence_number=1, |
| 986 | + avro_compression="zstandard", |
| 987 | + ) as list_writer: |
| 988 | + list_writer.add_manifests([manifest_file]) |
| 989 | + |
| 990 | + # Populate the cache |
| 991 | + _manifests(io, list_path) |
| 992 | + |
| 993 | + # Verify cache has entries |
| 994 | + cache = _get_manifest_cache() |
| 995 | + assert cache is not None, "Cache should be enabled" |
| 996 | + assert len(cache) > 0, "Cache should have entries after reading manifests" |
| 997 | + |
| 998 | + # Clear the cache |
| 999 | + clear_manifest_cache() |
| 1000 | + |
| 1001 | + # Verify cache is empty but still enabled |
| 1002 | + cache_after = _get_manifest_cache() |
| 1003 | + assert cache_after is not None, "Cache should still be enabled after clear" |
| 1004 | + assert len(cache_after) == 0, "Cache should be empty after clear" |
| 1005 | + |
| 1006 | + |
| 1007 | +@pytest.mark.parametrize( |
| 1008 | + "env_vars,expected_enabled,expected_size", |
| 1009 | + [ |
| 1010 | + ({}, True, 128), # defaults |
| 1011 | + ({"PYICEBERG_MANIFEST__CACHE__ENABLED": "true"}, True, 128), |
| 1012 | + ({"PYICEBERG_MANIFEST__CACHE__ENABLED": "false"}, False, 128), |
| 1013 | + ({"PYICEBERG_MANIFEST__CACHE__SIZE": "64"}, True, 64), |
| 1014 | + ({"PYICEBERG_MANIFEST__CACHE__SIZE": "256"}, True, 256), |
| 1015 | + ({"PYICEBERG_MANIFEST__CACHE__ENABLED": "false", "PYICEBERG_MANIFEST__CACHE__SIZE": "64"}, False, 64), |
| 1016 | + ], |
| 1017 | +) |
| 1018 | +def test_manifest_cache_config_valid_values(env_vars: dict[str, str], expected_enabled: bool, expected_size: int) -> None: |
| 1019 | + """Test that valid config values are applied correctly.""" |
| 1020 | + import os |
| 1021 | + |
| 1022 | + with mock.patch.dict(os.environ, env_vars, clear=False): |
| 1023 | + # Reset cache state so config is re-read |
| 1024 | + manifest_module._manifest_cache_manager._cache = None |
| 1025 | + manifest_module._manifest_cache_manager._initialized = False |
| 1026 | + cache = _get_manifest_cache() |
| 1027 | + |
| 1028 | + if expected_enabled: |
| 1029 | + assert cache is not None, "Cache should be enabled" |
| 1030 | + assert cache.maxsize == expected_size, f"Cache size should be {expected_size}" |
| 1031 | + else: |
| 1032 | + assert cache is None, "Cache should be disabled" |
| 1033 | + |
| 1034 | + |
| 1035 | +@pytest.mark.parametrize( |
| 1036 | + "env_vars,expected_error_substring", |
| 1037 | + [ |
| 1038 | + ({"PYICEBERG_MANIFEST__CACHE__ENABLED": "maybe"}, "manifest.cache.enabled should be a boolean"), |
| 1039 | + ({"PYICEBERG_MANIFEST__CACHE__ENABLED": "invalid"}, "manifest.cache.enabled should be a boolean"), |
| 1040 | + ({"PYICEBERG_MANIFEST__CACHE__SIZE": "abc"}, "manifest.cache.size should be a positive integer"), |
| 1041 | + ({"PYICEBERG_MANIFEST__CACHE__SIZE": "0"}, "manifest.cache.size must be >= 1"), |
| 1042 | + ({"PYICEBERG_MANIFEST__CACHE__SIZE": "-5"}, "manifest.cache.size must be >= 1"), |
| 1043 | + ], |
| 1044 | +) |
| 1045 | +def test_manifest_cache_config_invalid_values(env_vars: dict[str, str], expected_error_substring: str) -> None: |
| 1046 | + """Test that invalid config values raise ValueError with appropriate message.""" |
| 1047 | + import os |
| 1048 | + |
| 1049 | + with mock.patch.dict(os.environ, env_vars, clear=False): |
| 1050 | + # Reset cache state so config is re-read |
| 1051 | + manifest_module._manifest_cache_manager._cache = None |
| 1052 | + manifest_module._manifest_cache_manager._initialized = False |
| 1053 | + with pytest.raises(ValueError, match=expected_error_substring): |
| 1054 | + _get_manifest_cache() |
0 commit comments