Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions java/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use lance::io::{ObjectStore, ObjectStoreParams};
use lance::table::format::IndexMetadata;
use lance::table::format::{BasePath, Fragment};
use lance_core::datatypes::Schema as LanceSchema;
use lance_index::optimize::OptimizeOptions;
use lance_index::scalar::btree::BTreeParameters;
use lance_index::scalar::lance_format::LanceIndexStore;
use lance_index::DatasetIndexExt;
Expand Down Expand Up @@ -977,6 +978,54 @@ fn inner_merge_index_metadata(
})
}

#[no_mangle]
pub extern "system" fn Java_org_lance_Dataset_nativeOptimizeIndices(
mut env: JNIEnv,
java_dataset: JObject,
options_obj: JObject, // OptimizeOptions
) {
ok_or_throw_without_return!(
env,
inner_optimize_indices(&mut env, java_dataset, options_obj)
);
}

fn inner_optimize_indices(
env: &mut JNIEnv,
java_dataset: JObject,
java_options: JObject, // OptimizeOptions
) -> Result<()> {
let mut options = OptimizeOptions::default();

if !java_options.is_null() {
options.num_indices_to_merge =
env.get_optional_usize_from_method(&java_options, "getNumIndicesToMerge")?;

// getIndexNames(): Optional<List<String>>
let index_names_obj = env
.call_method(
&java_options,
"getIndexNames",
"()Ljava/util/Optional;",
&[],
)?
.l()?;
let index_names = env.get_strings_opt(&index_names_obj)?;
options.index_names = index_names;

// isRetrain(): boolean
let retrain = env
.call_method(&java_options, "isRetrain", "()Z", &[])?
.z()?;
options.retrain = retrain;
}

let mut dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
RT.block_on(dataset_guard.inner.optimize_indices(&options))?;
Ok(())
}

//////////////////
// Read Methods //
//////////////////
Expand Down
16 changes: 16 additions & 0 deletions java/src/main/java/org/lance/Dataset.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.lance.index.IndexOptions;
import org.lance.index.IndexParams;
import org.lance.index.IndexType;
import org.lance.index.OptimizeOptions;
import org.lance.io.StorageOptionsProvider;
import org.lance.ipc.DataStatistics;
import org.lance.ipc.LanceScanner;
Expand Down Expand Up @@ -976,6 +977,21 @@ public Optional<Transaction> readTransaction() {

private native Transaction nativeReadTransaction();

/**
* Optimize index metadata and segments for this dataset.
*
* @param options options controlling index optimization behavior
*/
public void optimizeIndices(OptimizeOptions options) {
Preconditions.checkNotNull(options);
try (LockManager.WriteLock writeLock = lockManager.acquireWriteLock()) {
Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed");
nativeOptimizeIndices(options);
}
}

private native void nativeOptimizeIndices(OptimizeOptions options);

/**
* @return all the created indexes names
*/
Expand Down
105 changes: 105 additions & 0 deletions java/src/main/java/org/lance/index/OptimizeOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lance.index;

import java.util.List;
import java.util.Optional;

/**
* Options for optimizing indices on a dataset.
*
* <p>This mirrors the behavior of {@code lance_index::optimize::OptimizeOptions} in Rust.
*
* <p>All fields are optional on the Java side except {@code retrain}. Defaults are delegated to the
* Rust implementation.
*/
public class OptimizeOptions {

private final Optional<Integer> numIndicesToMerge;
private final Optional<List<String>> indexNames;
private final boolean retrain;

private OptimizeOptions(
Optional<Integer> numIndicesToMerge, Optional<List<String>> indexNames, boolean retrain) {
this.numIndicesToMerge = numIndicesToMerge;
this.indexNames = indexNames;
this.retrain = retrain;
}

/** Number of indices to merge per index name. */
public Optional<Integer> getNumIndicesToMerge() {
return numIndicesToMerge;
}

/**
* Names of indices to optimize. If empty, all user indices will be considered (system indices are
* always excluded).
*/
public Optional<List<String>> getIndexNames() {
return indexNames;
}

/** Whether to retrain the index instead of performing an incremental merge. */
public boolean isRetrain() {
return retrain;
}

/** Create a new builder for {@link OptimizeOptions}. */
public static Builder builder() {
return new Builder();
}

/** Builder for {@link OptimizeOptions}. */
public static class Builder {
private Optional<Integer> numIndicesToMerge = Optional.empty();
private Optional<List<String>> indexNames = Optional.empty();
private boolean retrain = false;

private Builder() {}

/**
* Set the number of indices to merge.
*
* @param numIndicesToMerge number of indices to merge per index name
*/
public Builder numIndicesToMerge(int numIndicesToMerge) {
this.numIndicesToMerge = Optional.of(numIndicesToMerge);
return this;
}

/**
* Restrict optimization to a subset of index names.
*
* @param indexNames index names to optimize
*/
public Builder indexNames(List<String> indexNames) {
this.indexNames = Optional.ofNullable(indexNames);
return this;
}

/**
* Whether to retrain the index.
*
* @param retrain if true, retrain instead of incremental merge
*/
public Builder retrain(boolean retrain) {
this.retrain = retrain;
return this;
}

public OptimizeOptions build() {
return new OptimizeOptions(numIndicesToMerge, indexNames, retrain);
}
}
}
62 changes: 62 additions & 0 deletions java/src/test/java/org/lance/DatasetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
package org.lance;

import org.lance.compaction.CompactionOptions;
import org.lance.index.Index;
import org.lance.index.IndexParams;
import org.lance.index.IndexType;
import org.lance.index.OptimizeOptions;
import org.lance.index.scalar.ScalarIndexParams;
import org.lance.ipc.LanceScanner;
import org.lance.ipc.ScanOptions;
import org.lance.operation.Append;
Expand Down Expand Up @@ -1705,6 +1710,63 @@ void testBranches(@TempDir Path tempDir) {
}
}

@Test
void testOptimizingIndices(@TempDir Path tempDir) throws Exception {
String datasetPath = tempDir.resolve("optimize_scalar").toString();
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
TestUtils.SimpleTestDataset testDataset =
new TestUtils.SimpleTestDataset(allocator, datasetPath);

// version 1, empty dataset
try (Dataset ignored = testDataset.createEmptyDataset()) {
// write first fragment at version 1 -> dataset version 2
try (Dataset dsWithData = testDataset.write(1, 10)) {
ScalarIndexParams scalarParams =
ScalarIndexParams.create("btree", "{\"zone_size\": 2048}");
IndexParams indexParams =
IndexParams.builder().setScalarIndexParams(scalarParams).build();

dsWithData.createIndex(
Collections.singletonList("id"),
IndexType.BTREE,
Optional.of("id_idx"),
indexParams,
true);

List<Index> beforeIndexes = dsWithData.getIndexes();
Index idIndexBefore =
beforeIndexes.stream()
.filter(idx -> "id_idx".equals(idx.name()))
.findFirst()
.orElse(null);
assertNotNull(idIndexBefore);
List<Integer> beforeFragments = idIndexBefore.fragments().orElse(Collections.emptyList());
assertTrue(beforeFragments.contains(0));
assertEquals(1, beforeFragments.size());
}

// append new fragment using readVersion 2 -> dataset version 3
try (Dataset dsAppended = testDataset.write(2, 10)) {
OptimizeOptions options = OptimizeOptions.builder().numIndicesToMerge(0).build();
dsAppended.optimizeIndices(options);

List<Index> afterIndexes = dsAppended.getIndexes();
Index idIndexAfter =
afterIndexes.stream()
.filter(idx -> "id_idx".equals(idx.name()))
.findFirst()
.orElse(null);
assertNotNull(idIndexAfter);
List<Integer> afterFragments = idIndexAfter.fragments().orElse(Collections.emptyList());

assertTrue(afterFragments.contains(0));
assertTrue(afterFragments.contains(1));
assertEquals(2, afterFragments.size());
}
}
}
}

// ===== Blob API tests =====
@Test
void testReadZeroLengthBlob(@TempDir Path tempDir) throws Exception {
Expand Down