diff --git a/java/lance-jni/Cargo.lock b/java/lance-jni/Cargo.lock index 2df1aada479..38894e6b2c6 100644 --- a/java/lance-jni/Cargo.lock +++ b/java/lance-jni/Cargo.lock @@ -4118,6 +4118,7 @@ dependencies = [ "roaring", "serde", "serde_json", + "smallvec", "snafu", "tantivy", "tempfile", diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index 04ca343a98e..937369ee223 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -175,7 +175,7 @@ impl BlockingDataset { }), None, Default::default(), - false, // TODO: support enable_v2_manifest_paths + false, ))?; Ok(Self { inner }) } @@ -284,11 +284,13 @@ impl BlockingDataset { transaction: Transaction, store_params: ObjectStoreParams, detached: bool, + enable_v2_manifest_paths: bool, ) -> Result { let new_dataset = RT.block_on( CommitBuilder::new(Arc::new(self.clone().inner)) .with_store_params(store_params) .with_detached(detached) + .enable_v2_manifest_paths(enable_v2_manifest_paths) .execute(transaction), )?; Ok(BlockingDataset { inner: new_dataset }) @@ -412,6 +414,24 @@ pub extern "system" fn Java_org_lance_Dataset_drop<'local>( JObject::null() } +#[no_mangle] +pub extern "system" fn Java_org_lance_Dataset_nativeMigrateManifestPathsV2( + mut env: JNIEnv, + java_dataset: JObject, +) { + ok_or_throw_without_return!( + env, + inner_native_migrate_manifest_paths_v2(&mut env, java_dataset) + ) +} + +fn inner_native_migrate_manifest_paths_v2(env: &mut JNIEnv, java_dataset: JObject) -> Result<()> { + let mut dataset_guard = + unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?; + RT.block_on(dataset_guard.inner.migrate_manifest_paths_v2())?; + Ok(()) +} + #[no_mangle] pub extern "system" fn Java_org_lance_Dataset_createWithFfiStream<'local>( mut env: JNIEnv<'local>, diff --git a/java/lance-jni/src/transaction.rs b/java/lance-jni/src/transaction.rs index c80ef36bffa..104a0432a3d 100644 --- 
a/java/lance-jni/src/transaction.rs +++ b/java/lance-jni/src/transaction.rs @@ -729,6 +729,7 @@ pub extern "system" fn Java_org_lance_Dataset_nativeCommitTransaction<'local>( java_dataset: JObject, java_transaction: JObject, detached_jbool: jboolean, + enable_v2_manifest_paths: jboolean, ) -> JObject<'local> { ok_or_throw!( env, @@ -737,6 +738,7 @@ pub extern "system" fn Java_org_lance_Dataset_nativeCommitTransaction<'local>( java_dataset, java_transaction, detached_jbool != 0, + enable_v2_manifest_paths != 0, ) ) } @@ -746,6 +748,7 @@ fn inner_commit_transaction<'local>( java_dataset: JObject, java_transaction: JObject, detached: bool, + enable_v2_manifest_paths: bool, ) -> Result> { let write_param_jobj = env .call_method(&java_transaction, "writeParams", "()Ljava/util/Map;", &[])? @@ -779,7 +782,12 @@ fn inner_commit_transaction<'local>( let new_blocking_ds = { let mut dataset_guard = unsafe { env.get_rust_field::<_, _, BlockingDataset>(&java_dataset, NATIVE_DATASET) }?; - dataset_guard.commit_transaction(transaction, store_params, detached)? + dataset_guard.commit_transaction( + transaction, + store_params, + detached, + enable_v2_manifest_paths, + )? }; new_blocking_ds.into_java(env) } diff --git a/java/src/main/java/org/lance/Dataset.java b/java/src/main/java/org/lance/Dataset.java index 55e8a8b0983..18415299c61 100644 --- a/java/src/main/java/org/lance/Dataset.java +++ b/java/src/main/java/org/lance/Dataset.java @@ -455,7 +455,7 @@ public Transaction.Builder newTransactionBuilder() { * @return A new instance of {@link Dataset} linked to committed version. */ public Dataset commitTransaction(Transaction transaction) { - return commitTransaction(transaction, false); + return commitTransaction(transaction, false, true); } /** @@ -464,12 +464,18 @@ public Dataset commitTransaction(Transaction transaction) { * * @param transaction The transaction to commit * @param detached If true, the commit will not be part of the main dataset lineage. 
+ * @param enableV2ManifestPaths If true, and this is a new dataset, uses the new V2 manifest + * paths. These paths provide more efficient opening of datasets with many versions on object + * stores. This parameter has no effect if the dataset already exists. To migrate an existing + * dataset, instead use the `migrateManifestPathsV2` method. Default is true. WARNING: turning + * this on will make the dataset unreadable for older versions of Lance (prior to 0.17.0). * @return A new instance of {@link Dataset} linked to committed version. */ - public Dataset commitTransaction(Transaction transaction, boolean detached) { + public Dataset commitTransaction( + Transaction transaction, boolean detached, boolean enableV2ManifestPaths) { Preconditions.checkNotNull(transaction); try { - Dataset dataset = nativeCommitTransaction(transaction, detached); + Dataset dataset = nativeCommitTransaction(transaction, detached, enableV2ManifestPaths); if (selfManagedAllocator) { dataset.allocator = new RootAllocator(Long.MAX_VALUE); } else { @@ -481,7 +487,8 @@ public Dataset commitTransaction(Transaction transaction, boolean detached) { } } - private native Dataset nativeCommitTransaction(Transaction transaction, boolean detached); + private native Dataset nativeCommitTransaction( + Transaction transaction, boolean detached, boolean enableV2ManifestPaths); /** * Drop a Dataset. @@ -491,6 +498,26 @@ public Dataset commitTransaction(Transaction transaction, boolean detached) { */ public static native void drop(String path, Map storageOptions); + /** + * Migrate the manifest paths to the new format. + * + *

<p>This will update the manifest to use the new v2 format for paths. + * + *

<p>This function is idempotent, and can be run multiple times without changing the state of the + * object store. + * + *

DANGER: this should not be run while other concurrent operations are happening. And it + * should also run until completion before resuming other operations. + */ + public void migrateManifestPathsV2() { + try (LockManager.WriteLock writeLock = lockManager.acquireWriteLock()) { + Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed"); + nativeMigrateManifestPathsV2(); + } + } + + private native void nativeMigrateManifestPathsV2(); + /** * Add columns to the dataset. * diff --git a/java/src/test/java/org/lance/DatasetTest.java b/java/src/test/java/org/lance/DatasetTest.java index 7c1e33518ee..d17c645c454 100644 --- a/java/src/test/java/org/lance/DatasetTest.java +++ b/java/src/test/java/org/lance/DatasetTest.java @@ -1137,7 +1137,7 @@ void testCommitTransactionDetachedTrue(@TempDir Path tempDir) { FragmentMetadata fragment = suite.createNewFragment(5); Append append = Append.builder().fragments(Collections.singletonList(fragment)).build(); Transaction transaction = base.newTransactionBuilder().operation(append).build(); - try (Dataset committed = base.commitTransaction(transaction, true)) { + try (Dataset committed = base.commitTransaction(transaction, true, false)) { // Original dataset is not refreshed to the new version. assertEquals(baseVersion, base.version()); assertEquals(baseRowCount, base.countRows()); @@ -1169,7 +1169,7 @@ void testCommitTransactionDetachedTrueOnV1ManifestThrowsUnsupported(@TempDir Pat UnsupportedOperationException ex = assertThrows( UnsupportedOperationException.class, - () -> dataset.commitTransaction(transaction, true)); + () -> dataset.commitTransaction(transaction, true, false)); // Error should indicate detached commits are not supported on v1 manifests. 
assertNotNull(ex.getMessage()); diff --git a/java/src/test/java/org/lance/ManifestPathsV2Test.java b/java/src/test/java/org/lance/ManifestPathsV2Test.java new file mode 100644 index 00000000000..a724f75a887 --- /dev/null +++ b/java/src/test/java/org/lance/ManifestPathsV2Test.java @@ -0,0 +1,115 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance; + +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ManifestPathsV2Test { + private static final Pattern V2_MANIFEST_PATTERN = Pattern.compile("\\d{20}\\.manifest"); + + @Test + void testMigrateManifestPathsFromV1ToV2(@TempDir Path tempDir) throws IOException { + String datasetPath = tempDir.resolve("testMigrateManifestPathsFromV1ToV2").toString(); + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + TestUtils.SimpleTestDataset testDataset = + 
new TestUtils.SimpleTestDataset(allocator, datasetPath); + // Create v1 test. + try (Dataset dataset = testDataset.createEmptyDataset(false)) { + Path versionsDir = Paths.get(datasetPath).resolve("_versions"); + assertTrue(Files.isDirectory(versionsDir), "_versions directory should exist"); + List manifestsBefore; + try (Stream stream = Files.list(versionsDir)) { + manifestsBefore = + stream + .filter( + p -> + Files.isRegularFile(p) + && p.getFileName().toString().endsWith(".manifest")) + .collect(Collectors.toList()); + } + assertEquals(1, manifestsBefore.size(), "Expected single manifest before migration"); + assertEquals("1.manifest", manifestsBefore.get(0).getFileName().toString()); + + // Migrate to v2. + dataset.migrateManifestPathsV2(); + + List manifestsAfter; + try (Stream stream = Files.list(versionsDir)) { + manifestsAfter = + stream + .filter( + p -> + Files.isRegularFile(p) + && p.getFileName().toString().endsWith(".manifest")) + .collect(Collectors.toList()); + } + assertEquals(1, manifestsAfter.size(), "Expected single manifest after migration"); + String fileName = manifestsAfter.get(0).getFileName().toString(); + assertTrue( + V2_MANIFEST_PATTERN.matcher(fileName).matches(), + "Manifest should use V2 naming scheme"); + } + } + } + + @Test + void testCreateDatasetUsesV2ManifestByDefault(@TempDir Path tempDir) throws IOException { + String datasetPath = tempDir.resolve("testCreateDatasetUsesV2ManifestByDefault").toString(); + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + Schema schema = + new Schema( + Arrays.asList( + Field.nullable("id", new ArrowType.Int(32, true)), + Field.nullable("name", new ArrowType.Utf8()))); + WriteParams params = new WriteParams.Builder().withMode(WriteParams.WriteMode.CREATE).build(); + try (Dataset dataset = Dataset.create(allocator, datasetPath, schema, params)) { + Path versionsDir = Paths.get(datasetPath).resolve("_versions"); + assertTrue(Files.isDirectory(versionsDir), "_versions directory 
should exist"); + List manifests; + try (Stream stream = Files.list(versionsDir)) { + manifests = + stream + .filter( + p -> + Files.isRegularFile(p) + && p.getFileName().toString().endsWith(".manifest")) + .collect(Collectors.toList()); + } + assertEquals(1, manifests.size(), "Expected single manifest file"); + String fileName = manifests.get(0).getFileName().toString(); + assertTrue( + V2_MANIFEST_PATTERN.matcher(fileName).matches(), + "Manifest should use V2 naming scheme"); + } + } + } +} diff --git a/python/Cargo.lock b/python/Cargo.lock index 7b73a36aa6a..555308d942e 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -4557,6 +4557,7 @@ dependencies = [ "roaring", "serde", "serde_json", + "smallvec", "snafu", "tantivy", "tempfile", diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 59b6b40cc61..1e690e67115 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -3298,7 +3298,7 @@ def commit( These paths provide more efficient opening of datasets with many versions on object stores. This parameter has no effect if the dataset already exists. To migrate an existing dataset, instead use the - :meth:`migrate_manifest_paths_v2` method. Default is False. WARNING: + :meth:`migrate_manifest_paths_v2` method. Default is True. WARNING: turning this on will make the dataset unreadable for older versions of Lance (prior to 0.17.0). detached : bool, optional @@ -5480,7 +5480,7 @@ def write_dataset( Literal["stable", "2.0", "2.1", "2.2", "next", "legacy", "0.1"] ] = None, use_legacy_format: Optional[bool] = None, - enable_v2_manifest_paths: bool = False, + enable_v2_manifest_paths: bool = True, enable_stable_row_ids: bool = False, auto_cleanup_options: Optional[AutoCleanupConfig] = None, commit_message: Optional[str] = None, @@ -5544,7 +5544,7 @@ def write_dataset( These paths provide more efficient opening of datasets with many versions on object stores. 
This parameter has no effect if the dataset already exists. To migrate an existing dataset, instead use the - :meth:`LanceDataset.migrate_manifest_paths_v2` method. Default is False. + :meth:`LanceDataset.migrate_manifest_paths_v2` method. Default is True. enable_stable_row_ids : bool, optional Experimental parameter: if set to true, the writer will use stable row ids. These row ids are stable after compaction operations, but not after updates. diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index f319dec8796..0c66f3e143a 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -410,6 +410,13 @@ def test_v2_manifest_paths(tmp_path: Path): assert re.match(r"\d{20}\.manifest", manifest_path[0]) +def test_default_v2_manifest_paths(tmp_path: Path): + lance.write_dataset(pa.table({"a": range(100)}), tmp_path) + manifest_path = os.listdir(tmp_path / "_versions") + assert len(manifest_path) == 1 + assert re.match(r"\d{20}\.manifest", manifest_path[0]) + + def test_v2_manifest_paths_migration(tmp_path: Path): # Create a dataset with v1 manifest paths lance.write_dataset( @@ -4024,7 +4031,7 @@ def test_default_storage_version(tmp_path: Path): def test_no_detached_v1(tmp_path: Path): table = pa.table({"x": [0]}) - dataset = lance.write_dataset(table, tmp_path) + dataset = lance.write_dataset(table, tmp_path, enable_v2_manifest_paths=False) # Make a detached append table = pa.table({"x": [1]}) diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 715b9fa5e3b..5adf9c80a72 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -2234,7 +2234,7 @@ impl Dataset { .transpose()?; let mut builder = CommitBuilder::new(dest.as_dest()) - .enable_v2_manifest_paths(enable_v2_manifest_paths.unwrap_or(false)) + .enable_v2_manifest_paths(enable_v2_manifest_paths.unwrap_or(true)) .with_detached(detached.unwrap_or(false)) .with_max_retries(max_retries.unwrap_or(20)); @@ -2296,7 +2296,7 @@ impl 
Dataset { .transpose()?; let mut builder = CommitBuilder::new(dest.as_dest()) - .enable_v2_manifest_paths(enable_v2_manifest_paths.unwrap_or(false)) + .enable_v2_manifest_paths(enable_v2_manifest_paths.unwrap_or(true)) .with_detached(detached.unwrap_or(false)) .with_max_retries(max_retries.unwrap_or(20)); diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 7d1f4dc8395..36d235b1c2e 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -84,7 +84,7 @@ pub mod transaction; pub mod udtf; pub mod updater; mod utils; -mod write; +pub mod write; use self::builder::DatasetBuilder; use self::cleanup::RemovalStats; @@ -2123,11 +2123,16 @@ impl Dataset { /// # use lance_table::io::commit::ManifestNamingScheme; /// # use lance_datagen::{array, RowCount, BatchCount}; /// # use arrow_array::types::Int32Type; + /// # use lance::dataset::write::WriteParams; /// # let data = lance_datagen::gen_batch() /// # .col("key", array::step::()) /// # .into_reader_rows(RowCount::from(10), BatchCount::from(1)); /// # let fut = async { - /// let mut dataset = Dataset::write(data, "memory://test", None).await.unwrap(); + /// # let params = WriteParams { + /// # enable_v2_manifest_paths: false, + /// # ..Default::default() + /// # }; + /// let mut dataset = Dataset::write(data, "memory://test", Some(params)).await.unwrap(); /// assert_eq!(dataset.manifest_location().naming_scheme, ManifestNamingScheme::V1); /// /// dataset.migrate_manifest_paths_v2().await.unwrap(); diff --git a/rust/lance/src/dataset/tests/dataset_transactions.rs b/rust/lance/src/dataset/tests/dataset_transactions.rs index 5e651a3df8c..cf135a9518e 100644 --- a/rust/lance/src/dataset/tests/dataset_transactions.rs +++ b/rust/lance/src/dataset/tests/dataset_transactions.rs @@ -214,7 +214,16 @@ async fn test_migrate_v2_manifest_paths() { let data = lance_datagen::gen_batch() .col("key", array::step::()) .into_reader_rows(RowCount::from(10), BatchCount::from(1)); - let mut dataset = 
Dataset::write(data, &test_uri, None).await.unwrap(); + let mut dataset = Dataset::write( + data, + &test_uri, + Some(WriteParams { + enable_v2_manifest_paths: false, + ..Default::default() + }), + ) + .await + .unwrap(); assert_eq!( dataset.manifest_location().naming_scheme, ManifestNamingScheme::V1 diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 585596e513e..df7d6946573 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -210,7 +210,7 @@ pub struct WriteParams { /// These allow constant-time lookups for the latest manifest on object storage. /// This parameter has no effect on existing datasets. To migrate an existing /// dataset, use the [`super::Dataset::migrate_manifest_paths_v2`] method. - /// Default is False. + /// Default is True. pub enable_v2_manifest_paths: bool, pub session: Option>, @@ -268,7 +268,7 @@ impl Default for WriteParams { commit_handler: None, data_storage_version: None, enable_stable_row_ids: false, - enable_v2_manifest_paths: false, + enable_v2_manifest_paths: true, session: None, auto_cleanup: Some(AutoCleanupParams::default()), skip_auto_cleanup: false, diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index 5070ee5e65f..904a291cc32 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -55,7 +55,7 @@ impl<'a> CommitBuilder<'a> { Self { dest: dest.into(), use_stable_row_ids: None, - enable_v2_manifest_paths: false, + enable_v2_manifest_paths: true, storage_format: None, commit_handler: None, store_params: None, @@ -128,7 +128,7 @@ impl<'a> CommitBuilder<'a> { /// If set to true, and this is a new dataset, uses the new v2 manifest /// paths. These allow constant-time lookups for the latest manifest on object storage. /// This parameter has no effect on existing datasets. To migrate an existing - /// dataset, use the [`Dataset::migrate_manifest_paths_v2`] method. 
**Default is False.** + /// dataset, use the [`Dataset::migrate_manifest_paths_v2`] method. **Default is True.** /// ///

/// WARNING: turning this on will make the dataset unreadable for older diff --git a/rust/lance/src/io/commit/dynamodb.rs b/rust/lance/src/io/commit/dynamodb.rs index a881c19b369..81275da6d0d 100644 --- a/rust/lance/src/io/commit/dynamodb.rs +++ b/rust/lance/src/io/commit/dynamodb.rs @@ -298,13 +298,14 @@ mod test { let dir = TempStrDir::default(); let ds_uri = &dir; - let mut ds = Dataset::write( - data_gen.batch(10), - ds_uri, - Some(write_params(handler.clone())), - ) - .await - .unwrap(); + let params = WriteParams { + commit_handler: Some(handler.clone()), + enable_v2_manifest_paths: false, + ..Default::default() + }; + let mut ds = Dataset::write(data_gen.batch(10), ds_uri, Some(params)) + .await + .unwrap(); for _ in 0..5 { let data = data_gen.batch(10); diff --git a/rust/lance/src/io/commit/external_manifest.rs b/rust/lance/src/io/commit/external_manifest.rs index 7adc3dc0939..9e8124d1480 100644 --- a/rust/lance/src/io/commit/external_manifest.rs +++ b/rust/lance/src/io/commit/external_manifest.rs @@ -298,13 +298,14 @@ mod test { let dir = TempStrDir::default(); let ds_uri = &dir; - let mut ds = Dataset::write( - data_gen.batch(10), - ds_uri, - Some(write_params(handler.clone())), - ) - .await - .unwrap(); + let params = WriteParams { + commit_handler: Some(handler.clone()), + enable_v2_manifest_paths: false, + ..Default::default() + }; + let mut ds = Dataset::write(data_gen.batch(10), ds_uri, Some(params)) + .await + .unwrap(); for _ in 0..5 { let data = data_gen.batch(10);