Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
9fe3965
feat: distribute IVF assignment phase
chenghao-guo Nov 20, 2025
dfe9726
fix: enforce global IVF/PQ training reuse in storage
chenghao-guo Dec 2, 2025
35a2e29
tests(vector): remove fallback; recall-only consistency across IVF/HN…
chenghao-guo Dec 4, 2025
bb08bc7
refactor: remove useless methods in index_merger
yanghua Dec 4, 2025
cb619c7
refactor: remove useless methods in index_merger
yanghua Dec 8, 2025
070faf2
refactor: remove useless methods in index_merger
yanghua Dec 8, 2025
e272924
refactor: remove useless codes
yanghua Dec 8, 2025
27f25ac
fix: fix incorrect validation and fix style
chenghao-guo Dec 9, 2025
4d477fa
fix test issue
yanghua Dec 10, 2025
9fd4a7a
fix clippy issue
yanghua Dec 11, 2025
82b9bf7
add test for index merger
yanghua Dec 11, 2025
d72bda7
add python e2e test
yanghua Dec 12, 2025
d2f86b9
add python e2e test
yanghua Dec 12, 2025
0a6818c
add python e2e test
yanghua Dec 12, 2025
9444ac8
add python e2e test
yanghua Dec 12, 2025
8ec651a
add python e2e test: test_distributed_pq_order_invariance
yanghua Dec 12, 2025
8f9f21b
add py test test_distributed_ivf_pq_order_invariance
yanghua Dec 15, 2025
106b202
try to refactor build_distributed_vector_index
yanghua Dec 15, 2025
9cba890
refactor code
yanghua Dec 15, 2025
e8a8fe4
refactor code: remove useless code
yanghua Dec 15, 2025
e3e5c1d
refactor: comments of merge_index_metadata
yanghua Dec 16, 2025
e68b0eb
refactor: remove duplicated code for create_index method
yanghua Dec 16, 2025
2f1b8b0
refactor: remove useless variable
yanghua Dec 16, 2025
28590e8
refactor: test_vector_index.py
yanghua Dec 16, 2025
389edc9
add test: test_empty_hnsw_fallback_matches_flat_search
yanghua Dec 16, 2025
a69bc69
add test: test_find_partitions_fallback_centroids_none
yanghua Dec 16, 2025
8a29652
add test for ivf_sq, IVF_HNSW_SQ
yanghua Dec 16, 2025
543515c
add more tests
yanghua Dec 16, 2025
feefdb2
refactor import and use statement
yanghua Dec 16, 2025
57f8d60
add test : test_merge_ivf_pq_success
yanghua Dec 16, 2025
28a12c5
add more tests
yanghua Dec 16, 2025
4b95bc0
refactor builder and merger
yanghua Dec 18, 2025
7edbc97
refactor: make prepare_global_ivfpq arg optional
yanghua Dec 19, 2025
28a5e03
refactor merge_partial_vector_auxiliary_files method name
yanghua Dec 19, 2025
dd80274
refactor: introduce finalize_distributed_merge in rust
yanghua Dec 19, 2025
ab600a1
fix review suggestions
yanghua Dec 20, 2025
409844b
fix review suggestions
yanghua Dec 20, 2025
6e7c3ed
revert test case
yanghua Dec 22, 2025
5b1cc88
fix review suggestions
yanghua Dec 22, 2025
7d26e4b
reduce and remove some duplicated test cases
yanghua Dec 22, 2025
05df9a5
reduce and remove some duplicated test cases: test_distributed_ivf_sq…
yanghua Dec 22, 2025
c7a3485
fix code style issue
yanghua Dec 22, 2025
b82d0d2
reduce and remove generate centroids and pq_code_book
yanghua Dec 22, 2025
904f8af
fix test issue
yanghua Dec 22, 2025
d991394
fix test issue and removed some validation logic
yanghua Dec 23, 2025
902e73e
fix test issue
yanghua Dec 23, 2025
c710175
refactor code
yanghua Dec 23, 2025
8362d62
refactor code
yanghua Dec 23, 2025
75962ad
fix test issue
yanghua Dec 24, 2025
18116d4
revert code
yanghua Dec 24, 2025
139f04f
remove useless code
yanghua Dec 24, 2025
900030d
remove useless code
yanghua Dec 24, 2025
be5be1a
refactor partial dir naming pattern
yanghua Dec 24, 2025
e50f30b
try to fix merge order stable
yanghua Dec 26, 2025
e1c3368
try to fix merge order stable
yanghua Dec 26, 2025
7eccebc
refactor code
yanghua Dec 26, 2025
a842a50
fix refactor code
yanghua Dec 26, 2025
661db53
remove hnsw related indices SupportedDistributedIndices
yanghua Dec 27, 2025
72a565f
remove get_partial_pq_codebooks
yanghua Dec 27, 2025
e4cc58e
remove get_pq_codebook
yanghua Dec 27, 2025
0ee81f0
recover reader.rs
yanghua Dec 27, 2025
00b9dbc
minor refactor
yanghua Dec 27, 2025
6504c4b
recover storage.rs
yanghua Dec 27, 2025
9dd8202
recover ivf/v2.rs
yanghua Dec 28, 2025
4d3eb49
recover index.rs
yanghua Dec 29, 2025
cecd5bb
recover index.rs and test_vector_index.py
yanghua Dec 29, 2025
cdd6362
refactor test code
yanghua Dec 29, 2025
931a395
refactor test code and distributed merger
yanghua Dec 29, 2025
f6165be
refactor merger and builder
yanghua Dec 29, 2025
af8249b
refactor vector.rs
yanghua Dec 29, 2025
13dc144
address review comments
yanghua Jan 4, 2026
996e1f8
address review comments
yanghua Jan 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 86 additions & 21 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,13 @@
from .blob import BlobFile
from .dependencies import (
_check_for_numpy,
_check_for_torch,
torch,
)
from .dependencies import numpy as np
from .dependencies import pandas as pd
from .fragment import DataFile, FragmentMetadata, LanceFragment
from .indices import IndexConfig
from .indices import IndexConfig, SupportedDistributedIndices
from .lance import (
CleanupStats,
Compaction,
Expand Down Expand Up @@ -2637,6 +2638,9 @@ def create_index(
storage_options: Optional[Dict[str, str]] = None,
filter_nan: bool = True,
train: bool = True,
# distributed indexing parameters
fragment_ids: Optional[List[int]] = None,
index_uuid: Optional[str] = None,
*,
target_partition_size: Optional[int] = None,
**kwargs,
Expand Down Expand Up @@ -2708,6 +2712,16 @@ def create_index(
If True, the index will be trained on the data (e.g., compute IVF
centroids, PQ codebooks). If False, an empty index structure will be
created without training, which can be populated later.
fragment_ids : List[int], optional
If provided, the index will be created only on the specified fragments.
This enables distributed/fragment-level indexing. When provided, the
method creates temporary index metadata but does not commit the index
to the dataset. The index can be committed later using
merge_index_metadata(index_uuid, "VECTOR", column=..., index_name=...).
index_uuid : str, optional
A UUID to use for fragment-level distributed indexing. Multiple
fragment-level indices need to share UUID for later merging.
If not provided, a new UUID will be generated.
target_partition_size: int, optional
The target partition size. If set, the number of partitions will be computed
based on the target partition size.
Expand Down Expand Up @@ -2886,6 +2900,39 @@ def create_index(
)
accelerator = None

# IMPORTANT: Distributed indexing is CPU-only. Enforce single-node when
# accelerator or torch-related paths are detected.
torch_detected = False
try:
if accelerator is not None:
torch_detected = True
else:
impl = kwargs.get("implementation")
use_torch_flag = kwargs.get("use_torch") is True
one_pass_flag = kwargs.get("one_pass_ivfpq") is True
torch_centroids = _check_for_torch(ivf_centroids)
torch_codebook = _check_for_torch(pq_codebook)
if (
(isinstance(impl, str) and impl.lower() == "torch")
or use_torch_flag
or one_pass_flag
or torch_centroids
or torch_codebook
):
torch_detected = True
except Exception:
# Be conservative: if detection fails, do not modify behavior
pass

if torch_detected:
if fragment_ids is not None or index_uuid is not None:
LOGGER.info(
"Torch detected; "
"enforce single-node indexing (distributed is CPU-only)."
)
fragment_ids = None
index_uuid = None

if accelerator is not None:
from .vector import (
one_pass_assign_ivf_pq_on_accelerator,
Expand Down Expand Up @@ -3021,11 +3068,9 @@ def create_index(
dim = ivf_centroids.shape[1]
values = pa.array(ivf_centroids.reshape(-1))
ivf_centroids = pa.FixedSizeListArray.from_arrays(values, dim)
# Convert it to RecordBatch because Rust side only accepts RecordBatch.
ivf_centroids_batch = pa.RecordBatch.from_arrays(
kwargs["ivf_centroids"] = pa.RecordBatch.from_arrays(
[ivf_centroids], ["_ivf_centroids"]
)
kwargs["ivf_centroids"] = ivf_centroids_batch

if "PQ" in index_type:
if num_sub_vectors is None:
Expand All @@ -3034,8 +3079,9 @@ def create_index(
)
kwargs["num_sub_vectors"] = num_sub_vectors

# Always attach PQ codebook if provided (global training invariant)
if pq_codebook is not None:
# User provided IVF centroids
# User provided PQ codebook
if _check_for_numpy(pq_codebook) and isinstance(
pq_codebook, np.ndarray
):
Expand Down Expand Up @@ -3067,6 +3113,13 @@ def create_index(
if shuffle_partition_concurrency is not None:
kwargs["shuffle_partition_concurrency"] = shuffle_partition_concurrency

# Add fragment_ids and index_uuid to kwargs if provided for
# distributed indexing
if fragment_ids is not None:
kwargs["fragment_ids"] = fragment_ids
if index_uuid is not None:
kwargs["index_uuid"] = index_uuid

timers["final_create_index:start"] = time.time()
self._ds.create_index(
column, index_type, name, replace, train, storage_options, kwargs
Expand Down Expand Up @@ -3119,31 +3172,43 @@ def merge_index_metadata(
batch_readhead: Optional[int] = None,
):
"""
Merge an index which is not commit at present.
Merge distributed index metadata for supported scalar
and vector index types.

This method supports all index types defined in
:class:`lance.indices.SupportedDistributedIndices`,
including scalar indices and precise vector index types.

This method does NOT commit changes.

This API merges temporary index files (e.g., per-fragment partials).
After this method returns, callers MUST explicitly commit
the index manifest using lance.LanceDataset.commit(...)
with a LanceOperation.CreateIndex.

Parameters
----------
index_uuid: str
The uuid of the index which want to merge.
The shared UUID used when building fragment-level indices.
index_type: str
The type of the index.
Only "BTREE" and "INVERTED" are supported now.
Index type name. Must be one of the enum values in
:class:`lance.indices.SupportedDistributedIndices`
(for example ``"IVF_PQ"``).
batch_readhead: int, optional
The number of prefetch batches of sub-page files for merging.
Default 1.
Prefetch concurrency used by BTREE merge reader. Default: 1.
"""
index_type = index_type.upper()
if index_type not in [
"BTREE",
"INVERTED",
]:
# Normalize type
t = index_type.upper()

valid = {member.name for member in SupportedDistributedIndices}
if t not in valid:
raise NotImplementedError(
(
'Only "BTREE" or "INVERTED" are supported for '
f"merge index metadata. Received {index_type}",
)
f"Only {', '.join(sorted(valid))} are supported, received {index_type}"
)
return self._ds.merge_index_metadata(index_uuid, index_type, batch_readhead)

# Merge physical index files at the index directory
self._ds.merge_index_metadata(index_uuid, t, batch_readhead)
return None

def session(self) -> Session:
"""
Expand Down
12 changes: 12 additions & 0 deletions python/python/lance/indices/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,15 @@
class IndexFileVersion(str, Enum):
LEGACY = "Legacy"
V3 = "V3"


class SupportedDistributedIndices(str, Enum):
# Scalar index types
BTREE = "BTREE"
INVERTED = "INVERTED"
# Precise vector index types supported by distributed merge
IVF_FLAT = "IVF_FLAT"
IVF_PQ = "IVF_PQ"
IVF_SQ = "IVF_SQ"
# Deprecated generic placeholder (kept for backward compatibility)
VECTOR = "VECTOR"
47 changes: 47 additions & 0 deletions python/python/lance/indices/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,53 @@ def train_pq(
)
return PqModel(num_subvectors, pq_codebook)

def prepare_global_ivf_pq(
self,
num_partitions: Optional[int],
num_subvectors: Optional[int],
*,
distance_type: str = "l2",
accelerator: Optional[Union[str, "torch.Device"]] = None,
sample_rate: int = 256,
max_iters: int = 50,
) -> dict:
"""
Perform global training for IVF+PQ using existing CPU training paths and
return preprocessed artifacts for distributed builds.

Returns
-------
dict
A dictionary with two entries:
- "ivf_centroids": pyarrow.FixedSizeListArray of centroids
- "pq_codebook": pyarrow.FixedSizeListArray of PQ codebook

Notes
-----
This method uses the existing CPU training path by delegating to
`IndicesBuilder.train_ivf` (indices.train_ivf_model) and
`IndicesBuilder.train_pq` (indices.train_pq_model). No public method
names elsewhere are changed.
"""
# Global IVF training
ivf_model = self.train_ivf(
num_partitions,
distance_type=distance_type,
accelerator=accelerator, # None by default (CPU path)
sample_rate=sample_rate,
max_iters=max_iters,
)

# Global PQ training using IVF residuals
pq_model = self.train_pq(
ivf_model,
num_subvectors,
sample_rate=sample_rate,
max_iters=max_iters,
)

return {"ivf_centroids": ivf_model.centroids, "pq_codebook": pq_model.codebook}

def assign_ivf_partitions(
self,
ivf_model: IvfModel,
Expand Down
Loading
Loading