Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions java/lance-jni/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 21 additions & 1 deletion java/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl BlockingDataset {
}),
None,
Default::default(),
false, // TODO: support enable_v2_manifest_paths
false,
))?;
Ok(Self { inner })
}
Expand Down Expand Up @@ -284,11 +284,13 @@ impl BlockingDataset {
transaction: Transaction,
store_params: ObjectStoreParams,
detached: bool,
enable_v2_manifest_paths: bool,
) -> Result<Self> {
let new_dataset = RT.block_on(
CommitBuilder::new(Arc::new(self.clone().inner))
.with_store_params(store_params)
.with_detached(detached)
.enable_v2_manifest_paths(enable_v2_manifest_paths)
.execute(transaction),
)?;
Ok(BlockingDataset { inner: new_dataset })
Expand Down Expand Up @@ -412,6 +414,24 @@ pub extern "system" fn Java_org_lance_Dataset_drop<'local>(
JObject::null()
}

#[no_mangle]
pub extern "system" fn Java_org_lance_Dataset_nativeMigrateManifestPathsV2(
mut env: JNIEnv,
java_dataset: JObject,
) {
ok_or_throw_without_return!(
env,
inner_native_migrate_manifest_paths_v2(&mut env, java_dataset)
)
}

fn inner_native_migrate_manifest_paths_v2(env: &mut JNIEnv, java_dataset: JObject) -> Result<()> {
let mut dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
RT.block_on(dataset_guard.inner.migrate_manifest_paths_v2())?;
Ok(())
}

#[no_mangle]
pub extern "system" fn Java_org_lance_Dataset_createWithFfiStream<'local>(
mut env: JNIEnv<'local>,
Expand Down
10 changes: 9 additions & 1 deletion java/lance-jni/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,7 @@ pub extern "system" fn Java_org_lance_Dataset_nativeCommitTransaction<'local>(
java_dataset: JObject,
java_transaction: JObject,
detached_jbool: jboolean,
enable_v2_manifest_paths: jboolean,
) -> JObject<'local> {
ok_or_throw!(
env,
Expand All @@ -737,6 +738,7 @@ pub extern "system" fn Java_org_lance_Dataset_nativeCommitTransaction<'local>(
java_dataset,
java_transaction,
detached_jbool != 0,
enable_v2_manifest_paths != 0,
)
)
}
Expand All @@ -746,6 +748,7 @@ fn inner_commit_transaction<'local>(
java_dataset: JObject,
java_transaction: JObject,
detached: bool,
enable_v2_manifest_paths: bool,
) -> Result<JObject<'local>> {
let write_param_jobj = env
.call_method(&java_transaction, "writeParams", "()Ljava/util/Map;", &[])?
Expand Down Expand Up @@ -779,7 +782,12 @@ fn inner_commit_transaction<'local>(
let new_blocking_ds = {
let mut dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(&java_dataset, NATIVE_DATASET) }?;
dataset_guard.commit_transaction(transaction, store_params, detached)?
dataset_guard.commit_transaction(
transaction,
store_params,
detached,
enable_v2_manifest_paths,
)?
};
new_blocking_ds.into_java(env)
}
Expand Down
35 changes: 31 additions & 4 deletions java/src/main/java/org/lance/Dataset.java
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ public Transaction.Builder newTransactionBuilder() {
* @return A new instance of {@link Dataset} linked to committed version.
*/
public Dataset commitTransaction(Transaction transaction) {
return commitTransaction(transaction, false);
return commitTransaction(transaction, false, true);
}

/**
Expand All @@ -464,12 +464,18 @@ public Dataset commitTransaction(Transaction transaction) {
*
* @param transaction The transaction to commit
* @param detached If true, the commit will not be part of the main dataset lineage.
* @param enableV2ManifestPaths If true, and this is a new dataset, uses the new V2 manifest
* paths. These paths provide more efficient opening of datasets with many versions on object
* stores. This parameter has no effect if the dataset already exists. To migrate an existing
* dataset, instead use the `migrateManifestPathsV2` method. Default is true. WARNING: turning
* this on will make the dataset unreadable for older versions of Lance (prior to 0.17.0).
* @return A new instance of {@link Dataset} linked to committed version.
*/
public Dataset commitTransaction(Transaction transaction, boolean detached) {
public Dataset commitTransaction(
Transaction transaction, boolean detached, boolean enableV2ManifestPaths) {
Preconditions.checkNotNull(transaction);
try {
Dataset dataset = nativeCommitTransaction(transaction, detached);
Dataset dataset = nativeCommitTransaction(transaction, detached, enableV2ManifestPaths);
if (selfManagedAllocator) {
dataset.allocator = new RootAllocator(Long.MAX_VALUE);
} else {
Expand All @@ -481,7 +487,8 @@ public Dataset commitTransaction(Transaction transaction, boolean detached) {
}
}

private native Dataset nativeCommitTransaction(Transaction transaction, boolean detached);
private native Dataset nativeCommitTransaction(
Transaction transaction, boolean detached, boolean enableV2ManifestPaths);

/**
* Drop a Dataset.
Expand All @@ -491,6 +498,26 @@ public Dataset commitTransaction(Transaction transaction, boolean detached) {
*/
public static native void drop(String path, Map<String, String> storageOptions);

/**
* Migrate the manifest paths to the new format.
*
* <p>This will update the manifest to use the new v2 format for paths.
*
* <p>This function is idempotent, and can be run multiple times without changing the state of the
* object store.
*
* <p>DANGER: this should not be run while other concurrent operations are happening. And it
* should also run until completion before resuming other operations.
*/
public void migrateManifestPathsV2() {
try (LockManager.WriteLock writeLock = lockManager.acquireWriteLock()) {
Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed");
nativeMigrateManifestPathsV2();
}
}

private native void nativeMigrateManifestPathsV2();

/**
* Add columns to the dataset.
*
Expand Down
4 changes: 2 additions & 2 deletions java/src/test/java/org/lance/DatasetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1137,7 +1137,7 @@ void testCommitTransactionDetachedTrue(@TempDir Path tempDir) {
FragmentMetadata fragment = suite.createNewFragment(5);
Append append = Append.builder().fragments(Collections.singletonList(fragment)).build();
Transaction transaction = base.newTransactionBuilder().operation(append).build();
try (Dataset committed = base.commitTransaction(transaction, true)) {
try (Dataset committed = base.commitTransaction(transaction, true, false)) {
// Original dataset is not refreshed to the new version.
assertEquals(baseVersion, base.version());
assertEquals(baseRowCount, base.countRows());
Expand Down Expand Up @@ -1169,7 +1169,7 @@ void testCommitTransactionDetachedTrueOnV1ManifestThrowsUnsupported(@TempDir Pat
UnsupportedOperationException ex =
assertThrows(
UnsupportedOperationException.class,
() -> dataset.commitTransaction(transaction, true));
() -> dataset.commitTransaction(transaction, true, false));

// Error should indicate detached commits are not supported on v1 manifests.
assertNotNull(ex.getMessage());
Expand Down
115 changes: 115 additions & 0 deletions java/src/test/java/org/lance/ManifestPathsV2Test.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lance;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ManifestPathsV2Test {
private static final Pattern V2_MANIFEST_PATTERN = Pattern.compile("\\d{20}\\.manifest");

@Test
void testMigrateManifestPathsFromV1ToV2(@TempDir Path tempDir) throws IOException {
String datasetPath = tempDir.resolve("testMigrateManifestPathsFromV1ToV2").toString();
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
TestUtils.SimpleTestDataset testDataset =
new TestUtils.SimpleTestDataset(allocator, datasetPath);
// Create v1 test.
try (Dataset dataset = testDataset.createEmptyDataset(false)) {
Path versionsDir = Paths.get(datasetPath).resolve("_versions");
assertTrue(Files.isDirectory(versionsDir), "_versions directory should exist");
List<Path> manifestsBefore;
try (Stream<Path> stream = Files.list(versionsDir)) {
manifestsBefore =
stream
.filter(
p ->
Files.isRegularFile(p)
&& p.getFileName().toString().endsWith(".manifest"))
.collect(Collectors.toList());
}
assertEquals(1, manifestsBefore.size(), "Expected single manifest before migration");
assertEquals("1.manifest", manifestsBefore.get(0).getFileName().toString());

// Migrate to v2.
dataset.migrateManifestPathsV2();

List<Path> manifestsAfter;
try (Stream<Path> stream = Files.list(versionsDir)) {
manifestsAfter =
stream
.filter(
p ->
Files.isRegularFile(p)
&& p.getFileName().toString().endsWith(".manifest"))
.collect(Collectors.toList());
}
assertEquals(1, manifestsAfter.size(), "Expected single manifest after migration");
String fileName = manifestsAfter.get(0).getFileName().toString();
assertTrue(
V2_MANIFEST_PATTERN.matcher(fileName).matches(),
"Manifest should use V2 naming scheme");
}
}
}

@Test
void testCreateDatasetUsesV2ManifestByDefault(@TempDir Path tempDir) throws IOException {
String datasetPath = tempDir.resolve("testCreateDatasetUsesV2ManifestByDefault").toString();
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
Schema schema =
new Schema(
Arrays.asList(
Field.nullable("id", new ArrowType.Int(32, true)),
Field.nullable("name", new ArrowType.Utf8())));
WriteParams params = new WriteParams.Builder().withMode(WriteParams.WriteMode.CREATE).build();
try (Dataset dataset = Dataset.create(allocator, datasetPath, schema, params)) {
Path versionsDir = Paths.get(datasetPath).resolve("_versions");
assertTrue(Files.isDirectory(versionsDir), "_versions directory should exist");
List<Path> manifests;
try (Stream<Path> stream = Files.list(versionsDir)) {
manifests =
stream
.filter(
p ->
Files.isRegularFile(p)
&& p.getFileName().toString().endsWith(".manifest"))
.collect(Collectors.toList());
}
assertEquals(1, manifests.size(), "Expected single manifest file");
String fileName = manifests.get(0).getFileName().toString();
assertTrue(
V2_MANIFEST_PATTERN.matcher(fileName).matches(),
"Manifest should use V2 naming scheme");
}
}
}
}
1 change: 1 addition & 0 deletions python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3298,7 +3298,7 @@ def commit(
These paths provide more efficient opening of datasets with many
versions on object stores. This parameter has no effect if the dataset
already exists. To migrate an existing dataset, instead use the
:meth:`migrate_manifest_paths_v2` method. Default is False. WARNING:
:meth:`migrate_manifest_paths_v2` method. Default is True. WARNING:
turning this on will make the dataset unreadable for older versions
of Lance (prior to 0.17.0).
detached : bool, optional
Expand Down Expand Up @@ -5480,7 +5480,7 @@ def write_dataset(
Literal["stable", "2.0", "2.1", "2.2", "next", "legacy", "0.1"]
] = None,
use_legacy_format: Optional[bool] = None,
enable_v2_manifest_paths: bool = False,
enable_v2_manifest_paths: bool = True,
enable_stable_row_ids: bool = False,
auto_cleanup_options: Optional[AutoCleanupConfig] = None,
commit_message: Optional[str] = None,
Expand Down Expand Up @@ -5544,7 +5544,7 @@ def write_dataset(
These paths provide more efficient opening of datasets with many
versions on object stores. This parameter has no effect if the dataset
already exists. To migrate an existing dataset, instead use the
:meth:`LanceDataset.migrate_manifest_paths_v2` method. Default is False.
:meth:`LanceDataset.migrate_manifest_paths_v2` method. Default is True.
enable_stable_row_ids : bool, optional
Experimental parameter: if set to true, the writer will use stable row ids.
These row ids are stable after compaction operations, but not after updates.
Expand Down
9 changes: 8 additions & 1 deletion python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,13 @@ def test_v2_manifest_paths(tmp_path: Path):
assert re.match(r"\d{20}\.manifest", manifest_path[0])


def test_default_v2_manifest_paths(tmp_path: Path):
lance.write_dataset(pa.table({"a": range(100)}), tmp_path)
manifest_path = os.listdir(tmp_path / "_versions")
assert len(manifest_path) == 1
assert re.match(r"\d{20}\.manifest", manifest_path[0])


def test_v2_manifest_paths_migration(tmp_path: Path):
# Create a dataset with v1 manifest paths
lance.write_dataset(
Expand Down Expand Up @@ -4024,7 +4031,7 @@ def test_default_storage_version(tmp_path: Path):

def test_no_detached_v1(tmp_path: Path):
table = pa.table({"x": [0]})
dataset = lance.write_dataset(table, tmp_path)
dataset = lance.write_dataset(table, tmp_path, enable_v2_manifest_paths=False)

# Make a detached append
table = pa.table({"x": [1]})
Expand Down
4 changes: 2 additions & 2 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2234,7 +2234,7 @@ impl Dataset {
.transpose()?;

let mut builder = CommitBuilder::new(dest.as_dest())
.enable_v2_manifest_paths(enable_v2_manifest_paths.unwrap_or(false))
.enable_v2_manifest_paths(enable_v2_manifest_paths.unwrap_or(true))
.with_detached(detached.unwrap_or(false))
.with_max_retries(max_retries.unwrap_or(20));

Expand Down Expand Up @@ -2296,7 +2296,7 @@ impl Dataset {
.transpose()?;

let mut builder = CommitBuilder::new(dest.as_dest())
.enable_v2_manifest_paths(enable_v2_manifest_paths.unwrap_or(false))
.enable_v2_manifest_paths(enable_v2_manifest_paths.unwrap_or(true))
.with_detached(detached.unwrap_or(false))
.with_max_retries(max_retries.unwrap_or(20));

Expand Down
9 changes: 7 additions & 2 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub mod transaction;
pub mod udtf;
pub mod updater;
mod utils;
mod write;
pub mod write;

use self::builder::DatasetBuilder;
use self::cleanup::RemovalStats;
Expand Down Expand Up @@ -2123,11 +2123,16 @@ impl Dataset {
/// # use lance_table::io::commit::ManifestNamingScheme;
/// # use lance_datagen::{array, RowCount, BatchCount};
/// # use arrow_array::types::Int32Type;
/// # use lance::dataset::write::WriteParams;
/// # let data = lance_datagen::gen_batch()
/// # .col("key", array::step::<Int32Type>())
/// # .into_reader_rows(RowCount::from(10), BatchCount::from(1));
/// # let fut = async {
/// let mut dataset = Dataset::write(data, "memory://test", None).await.unwrap();
/// # let params = WriteParams {
/// # enable_v2_manifest_paths: false,
/// # ..Default::default()
/// # };
/// let mut dataset = Dataset::write(data, "memory://test", Some(params)).await.unwrap();
/// assert_eq!(dataset.manifest_location().naming_scheme, ManifestNamingScheme::V1);
///
/// dataset.migrate_manifest_paths_v2().await.unwrap();
Expand Down
Loading
Loading