Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 56 additions & 51 deletions java/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,20 +730,20 @@ fn inner_release_native_dataset(env: &mut JNIEnv, obj: JObject) -> Result<()> {
}

#[no_mangle]
pub extern "system" fn Java_org_lance_Dataset_nativeCreateIndex(
mut env: JNIEnv,
java_dataset: JObject,
columns_jobj: JObject, // List<String>
pub extern "system" fn Java_org_lance_Dataset_nativeCreateIndex<'local>(
mut env: JNIEnv<'local>,
java_dataset: JObject<'local>,
columns_jobj: JObject<'local>, // List<String>
index_type_code_jobj: jint,
name_jobj: JObject, // Optional<String>
params_jobj: JObject, // IndexParams
replace_jobj: jboolean, // replace
train_jobj: jboolean, // train
fragments_jobj: JObject, // List<Integer>
index_uuid_jobj: JObject, // String
arrow_stream_addr_jobj: JObject, // Optional<Long>
) {
ok_or_throw_without_return!(
name_jobj: JObject<'local>, // Optional<String>
params_jobj: JObject<'local>, // IndexParams
replace_jobj: jboolean, // replace
train_jobj: jboolean, // train
fragments_jobj: JObject<'local>, // List<Integer>
index_uuid_jobj: JObject<'local>, // String
arrow_stream_addr_jobj: JObject<'local>, // Optional<Long>
) -> JObject<'local> {
ok_or_throw!(
env,
inner_create_index(
&mut env,
Expand All @@ -758,23 +758,23 @@ pub extern "system" fn Java_org_lance_Dataset_nativeCreateIndex(
index_uuid_jobj,
arrow_stream_addr_jobj,
)
);
)
}

#[allow(clippy::too_many_arguments)]
fn inner_create_index(
env: &mut JNIEnv,
java_dataset: JObject,
columns_jobj: JObject, // List<String>
fn inner_create_index<'local>(
env: &mut JNIEnv<'local>,
java_dataset: JObject<'local>,
columns_jobj: JObject<'local>, // List<String>
index_type_code_jobj: jint,
name_jobj: JObject, // Optional<String>
params_jobj: JObject, // IndexParams
replace_jobj: jboolean, // replace
train_jobj: jboolean, // train
fragments_jobj: JObject, // Optional<List<String>>
index_uuid_jobj: JObject, // Optional<String>
arrow_stream_addr_jobj: JObject, // Optional<Long>
) -> Result<()> {
name_jobj: JObject<'local>, // Optional<String>
params_jobj: JObject<'local>, // IndexParams
replace_jobj: jboolean, // replace
train_jobj: jboolean, // train
fragments_jobj: JObject<'local>, // Optional<List<String>>
index_uuid_jobj: JObject<'local>, // Optional<String>
arrow_stream_addr_jobj: JObject<'local>, // Optional<Long>
) -> Result<JObject<'local>> {
let columns = env.get_strings(&columns_jobj)?;
let index_type = IndexType::try_from(index_type_code_jobj)?;
let name = env.get_string_opt(&name_jobj)?;
Expand Down Expand Up @@ -837,38 +837,43 @@ fn inner_create_index(
};

let params = params_result?;
let mut dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;

let mut index_builder = dataset_guard
.inner
.create_index_builder(&columns_slice, index_type, params.as_ref())
.replace(replace)
.train(train);
// Execute index creation in a block to ensure dataset_guard is dropped
// before we call into_java (which needs to borrow env again)
let index_metadata = {
let mut dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;

let mut index_builder = dataset_guard
.inner
.create_index_builder(&columns_slice, index_type, params.as_ref())
.replace(replace)
.train(train);

if let Some(name) = name {
index_builder = index_builder.name(name);
}
if let Some(name) = name {
index_builder = index_builder.name(name);
}

if let Some(fragment_ids) = fragment_ids {
index_builder = index_builder.fragments(fragment_ids);
}
if let Some(fragment_ids) = fragment_ids {
index_builder = index_builder.fragments(fragment_ids);
}

if let Some(index_uuid) = index_uuid {
index_builder = index_builder.index_uuid(index_uuid);
}
if let Some(index_uuid) = index_uuid {
index_builder = index_builder.index_uuid(index_uuid);
}

if let Some(reader) = batch_reader {
index_builder = index_builder.preprocessed_data(Box::new(reader));
}
if let Some(reader) = batch_reader {
index_builder = index_builder.preprocessed_data(Box::new(reader));
}

if skip_commit {
RT.block_on(index_builder.execute_uncommitted())?;
} else {
RT.block_on(index_builder.into_future())?
}
if skip_commit {
RT.block_on(index_builder.execute_uncommitted())?
} else {
RT.block_on(index_builder.into_future())?
}
};

Ok(())
(&index_metadata).into_java(env)
}

fn should_skip_commit(index_type: IndexType, params_opt: &Option<String>) -> Result<bool> {
Expand Down
12 changes: 7 additions & 5 deletions java/src/main/java/org/lance/Dataset.java
Original file line number Diff line number Diff line change
Expand Up @@ -765,16 +765,17 @@ public void restore() {
* @param name the name of the created index
* @param params index params
* @param replace whether to replace the existing index
* @return the metadata of the created index
* @deprecated please use {@link Dataset#createIndex(IndexOptions)} instead.
*/
@Deprecated
public void createIndex(
public Index createIndex(
List<String> columns,
IndexType indexType,
Optional<String> name,
IndexParams params,
boolean replace) {
createIndex(
return createIndex(
IndexOptions.builder(columns, indexType, params)
.replace(replace)
.withIndexName(name.orElse(null))
Expand All @@ -785,11 +786,12 @@ public void createIndex(
* Creates a new index on the dataset.
*
* @param options options for building index
* @return the metadata of the created index
*/
public void createIndex(IndexOptions options) {
public Index createIndex(IndexOptions options) {
try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) {
Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed");
nativeCreateIndex(
return nativeCreateIndex(
options.getColumns(),
options.getIndexType().ordinal(),
options.getIndexName(),
Expand All @@ -802,7 +804,7 @@ public void createIndex(IndexOptions options) {
}
}

private native void nativeCreateIndex(
private native Index nativeCreateIndex(
List<String> columns,
int indexTypeCode,
Optional<String> name,
Expand Down
36 changes: 24 additions & 12 deletions java/src/test/java/org/lance/ScalarIndexTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ScalarIndexTest {
Expand All @@ -79,12 +80,18 @@ public void testCreateBTreeIndex() throws Exception {
IndexParams indexParams = IndexParams.builder().setScalarIndexParams(scalarParams).build();

// Create BTree index on 'id' column
dataset.createIndex(
Collections.singletonList("id"),
IndexType.BTREE,
Optional.of("btree_id_index"),
indexParams,
true);
Index index =
dataset.createIndex(
Collections.singletonList("id"),
IndexType.BTREE,
Optional.of("btree_id_index"),
indexParams,
true);

// Verify the returned Index object
assertEquals("btree_id_index", index.name());
assertNotNull(index.uuid());
assertFalse(index.fields().isEmpty());

// Verify index was created and is in the list
assertTrue(
Expand Down Expand Up @@ -343,12 +350,17 @@ public void testCreateZonemapIndex() throws Exception {
IndexParams indexParams = IndexParams.builder().setScalarIndexParams(scalarParams).build();

// Create Zonemap index on 'value' column
dataset.createIndex(
Collections.singletonList("value"),
IndexType.ZONEMAP,
Optional.of("zonemap_value_index"),
indexParams,
true);
Index index =
dataset.createIndex(
Collections.singletonList("value"),
IndexType.ZONEMAP,
Optional.of("zonemap_value_index"),
indexParams,
true);

// Verify the returned Index object
assertEquals("zonemap_value_index", index.name());
assertNotNull(index.uuid());

// Verify index was created
assertTrue(
Expand Down
20 changes: 20 additions & 0 deletions python/python/tests/test_column_names.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def test_scalar_index_with_mixed_case(self, mixed_case_dataset):
indices = mixed_case_dataset.list_indices()
assert len(indices) == 1
assert indices[0]["fields"] == ["userId"]
assert indices[0]["name"] == "userId_idx"

# Query using the indexed column
result = mixed_case_dataset.to_table(filter="userId = 50")
Expand All @@ -96,6 +97,9 @@ def test_scalar_index_with_mixed_case(self, mixed_case_dataset):
plan = mixed_case_dataset.scanner(filter="userId = 50").explain_plan()
assert "ScalarIndexQuery" in plan

stats = mixed_case_dataset.stats.index_stats("userId_idx")
assert stats["index_type"] == "BTree"

def test_alter_column_with_mixed_case(self, mixed_case_dataset):
"""Altering columns works with mixed-case column names."""
# alter_columns uses direct schema lookup, not SQL parsing
Expand Down Expand Up @@ -347,6 +351,7 @@ def test_scalar_index_with_special_chars(self, special_char_dataset):
assert len(indices) == 1
# Field with special chars is returned in quoted format for SQL compatibility
assert indices[0]["fields"] == ["`user-id`"]
assert indices[0]["name"] == "user-id_idx"

# Query using the indexed column (requires backticks in filter)
result = special_char_dataset.to_table(filter="`user-id` = 50")
Expand All @@ -356,6 +361,9 @@ def test_scalar_index_with_special_chars(self, special_char_dataset):
plan = special_char_dataset.scanner(filter="`user-id` = 50").explain_plan()
assert "ScalarIndexQuery" in plan

stats = special_char_dataset.stats.index_stats("user-id_idx")
assert stats["index_type"] == "BTree"

def test_alter_column_with_special_chars(self, special_char_dataset):
"""Altering columns works with special character column names."""
# alter_columns uses direct schema lookup
Expand Down Expand Up @@ -455,6 +463,7 @@ def test_scalar_index_with_nested_mixed_case(self, nested_mixed_case_dataset):
indices = nested_mixed_case_dataset.list_indices()
assert len(indices) == 1
assert indices[0]["fields"] == ["MetaData.userId"]
assert indices[0]["name"] == "MetaData.userId_idx"

# Query using the indexed column
result = nested_mixed_case_dataset.to_table(filter="MetaData.userId = 50")
Expand All @@ -466,20 +475,27 @@ def test_scalar_index_with_nested_mixed_case(self, nested_mixed_case_dataset):
).explain_plan()
assert "ScalarIndexQuery" in plan

stats = nested_mixed_case_dataset.stats.index_stats("MetaData.userId_idx")
assert stats["index_type"] == "BTree"

def test_scalar_index_on_top_level_mixed_case(self, nested_mixed_case_dataset):
"""Scalar index on top-level mixed-case column works."""
nested_mixed_case_dataset.create_scalar_index("rowId", index_type="BTREE")

indices = nested_mixed_case_dataset.list_indices()
assert len(indices) == 1
assert indices[0]["fields"] == ["rowId"]
assert indices[0]["name"] == "rowId_idx"

result = nested_mixed_case_dataset.to_table(filter="rowId = 50")
assert result.num_rows == 1

plan = nested_mixed_case_dataset.scanner(filter="rowId = 50").explain_plan()
assert "ScalarIndexQuery" in plan

stats = nested_mixed_case_dataset.stats.index_stats("rowId_idx")
assert stats["index_type"] == "BTree"

def test_scalar_index_with_lowercased_nested_path(self, nested_mixed_case_dataset):
"""Scalar index creation should work even when path is lowercased.

Expand Down Expand Up @@ -562,6 +578,7 @@ def test_scalar_index_with_nested_special_chars(self, nested_special_char_datase
assert len(indices) == 1
# Fields with special chars are returned in quoted format for SQL compatibility
assert indices[0]["fields"] == ["`meta-data`.`user-id`"]
assert indices[0]["name"] == "meta-data.user-id_idx"

# Query using the indexed column (backticks required in filter)
result = nested_special_char_dataset.to_table(
Expand All @@ -575,6 +592,9 @@ def test_scalar_index_with_nested_special_chars(self, nested_special_char_datase
).explain_plan()
assert "ScalarIndexQuery" in plan

stats = nested_special_char_dataset.stats.index_stats("meta-data.user-id_idx")
assert stats["index_type"] == "BTree"

def test_scalar_index_on_top_level_special_chars(self, nested_special_char_dataset):
"""Scalar index on top-level special char column works."""
nested_special_char_dataset.create_scalar_index("`row-id`", index_type="BTREE")
Expand Down
16 changes: 9 additions & 7 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ use lance_index::{
};
use lance_io::object_store::ObjectStoreParams;
use lance_linalg::distance::MetricType;
use lance_table::format::{BasePath, Fragment};
use lance_table::format::{BasePath, Fragment, IndexMetadata};
use lance_table::io::commit::CommitHandler;

use crate::error::PythonErrorExt;
Expand Down Expand Up @@ -1809,7 +1809,7 @@ impl Dataset {
train: Option<bool>,
storage_options: Option<HashMap<String, String>>,
kwargs: Option<&Bound<PyDict>>,
) -> PyResult<()> {
) -> PyResult<PyLance<IndexMetadata>> {
let columns: Vec<&str> = columns.iter().map(|s| &**s).collect();
let index_type = index_type.to_uppercase();
let idx_type = match index_type.as_str() {
Expand Down Expand Up @@ -1979,19 +1979,21 @@ impl Dataset {
use std::future::IntoFuture;

// Use execute_uncommitted if fragment_ids is provided, otherwise use execute
if has_fragment_ids {
let index_metadata = if has_fragment_ids {
// For fragment-level indexing, use execute_uncommitted
let _index_metadata = rt()
let index_metadata = rt()
.block_on(None, builder.execute_uncommitted())?
.infer_error()?;
// Note: We don't update self.ds here as the index is not committed
index_metadata
} else {
// For regular indexing, use the standard execute path
rt().block_on(None, builder.into_future())?.infer_error()?;
let index_metadata = rt().block_on(None, builder.into_future())?.infer_error()?;
self.ds = Arc::new(new_self);
}
index_metadata
};

Ok(())
Ok(PyLance(index_metadata))
}

fn drop_index(&mut self, name: &str) -> PyResult<()> {
Expand Down
4 changes: 3 additions & 1 deletion rust/lance-index/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,16 @@ pub trait DatasetIndexExt {
/// if not provided, it will auto-generate one.
/// - `params`: index parameters.
/// - `replace`: replace the existing index if it exists.
///
/// Returns the metadata of the created index.
async fn create_index(
&mut self,
columns: &[&str],
index_type: IndexType,
name: Option<String>,
params: &dyn IndexParams,
replace: bool,
) -> Result<()>;
) -> Result<IndexMetadata>;

/// Drop indices by name.
///
Expand Down
Loading
Loading