Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 108 additions & 25 deletions docs/src/distributed-indexing.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@

Lance-Ray provides distributed index building functionality that leverages Ray's distributed computing capabilities to efficiently create text indices for Lance datasets. This is particularly useful for large-scale datasets as it can distribute index building work across multiple Ray worker nodes.

## New Distributed APIs
## Distributed APIs

`create_scalar_index()` - Distributedly create scalar index index using ray. Currently only Inverted/FTS/BTREE are supported. Will add more index type support in the future.
### Scalar Indexing

### How It Works
`create_scalar_index()` - Distributedly create scalar index using ray. Currently only Inverted/FTS/BTREE are supported. Will add more index type support in the future.

#### How It Works
The `create_scalar_index` function allows you to create full-text search indices for Lance datasets using the Ray distributed computing framework. This function distributes the index building process across multiple Ray worker nodes, with each node responsible for building indices for a subset of dataset fragments. These indices are then merged and committed as a single index.

**Backward Compatibility**:
Expand All @@ -19,7 +21,8 @@ The `create_scalar_index` function allows you to create full-text search indices

```python
def create_scalar_index(
dataset: Union[str, "lance.LanceDataset"],
uri: Optional[str] = None,
*,
column: str,
index_type: Union[
Literal["BTREE"],
Expand All @@ -31,47 +34,102 @@ def create_scalar_index(
Literal["ZONEMAP"],
IndexConfig,
],
table_id: Optional[list[str]] = None,
name: Optional[str] = None,
*,
replace: bool = True,
train: bool = True,
fragment_ids: Optional[list[int]] = None,
index_uuid: Optional[str] = None,
num_workers: int = 4,
storage_options: Optional[dict[str, str]] = None,
namespace_impl: Optional[str] = None,
namespace_properties: Optional[dict[str, str]] = None,
ray_remote_args: Optional[dict[str, Any]] = None,
**kwargs: Any,
) -> "lance.LanceDataset":

```

### Parameters
#### Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| `dataset` | `str` or `lance.LanceDataset` | Lance dataset or its URI |
| `uri` | `str`, optional | The URI of the Lance dataset. Either `uri` OR (`namespace_impl` + `table_id`) must be provided. |
| `column` | `str` | Column name to index |
| `index_type` | `str` or `IndexConfig` | Index type, can be `"INVERTED"`, `"FTS"`, `"BTREE"`, `"BITMAP"`, `"LABEL_LIST"`, `"NGRAM"`, `"ZONEMAP"`, or `IndexConfig` object |
| `table_id` | `list[str]`, optional | The table identifier as a list of strings. |
| `name` | `str`, optional | Index name, auto-generated if not provided |
| `replace` | `bool`, optional | Whether to replace existing index with the same name, default is `True` |
| `train` | `bool`, optional | Whether to train the index, default is `True` |
| `fragment_ids` | `list[int]`, optional | Optional list of fragment IDs to build index on |
| `index_uuid` | `str`, optional | Optional fragment UUID for distributed indexing |
| `num_workers` | `int`, optional | Number of Ray worker nodes to use, default is 4 |
| `storage_options` | `Dict[str, str]`, optional | Storage options for the dataset |
| `namespace_impl` | `str`, optional | The namespace implementation type (e.g., `"rest"`, `"dir"`) |
| `namespace_properties` | `Dict[str, str]`, optional | Properties for connecting to the namespace |
| `ray_remote_args` | `Dict[str, Any]`, optional | Ray task options (e.g., `num_cpus`, `resources`) |
| `**kwargs` | `Any` | Additional arguments passed to `create_scalar_index` |

**Note:** For distributed indexing, currently only `"INVERTED"`,`"FTS"` and `"BTREE"` index types are supported.
**Note:** For distributed scalar indexing, currently only `"INVERTED"`, `"FTS"` and `"BTREE"` index types are supported.

### Return Value
#### Return Value

The function returns an updated Lance dataset with the newly created index.

### Vector Indexing

`create_index()` - Distributedly create vector indices using Ray. It leverages Ray to parallelize the index building process across multiple workers.

#### Supported Index Types
The following vector index types are supported for distributed building:
- `IVF_FLAT`
- `IVF_SQ`
- `IVF_PQ`

#### `create_index`

```python
def create_index(
uri: Union[str, "lance.LanceDataset"],
column: str,
index_type: str,
name: Optional[str] = None,
*,
replace: bool = True,
num_workers: int = 4,
storage_options: Optional[dict[str, str]] = None,
ray_remote_args: Optional[dict[str, Any]] = None,
metric: str = "l2",
num_partitions: Optional[int] = None,
num_sub_vectors: Optional[int] = None,
**kwargs: Any,
) -> "lance.LanceDataset":
```

#### Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| `uri` | `str` or `lance.LanceDataset` | Lance dataset or its URI |
| `column` | `str` | Vector column name to index |
| `index_type` | `str` | Vector index type (e.g., `"IVF_PQ"`, `"IVF_SQ"`, `"IVF_FLAT"`) |
| `name` | `str`, optional | Index name, auto-generated if not provided |
| `replace` | `bool`, optional | Whether to replace existing index, default is `True` |
| `num_workers` | `int`, optional | Number of Ray workers to use, default is 4 |
| `storage_options` | `Dict[str, str]`, optional | Storage options for the dataset |
| `ray_remote_args` | `Dict[str, Any]`, optional | Ray task options (e.g., `num_cpus`, `resources`) |
| `metric` | `str`, optional | Distance metric to use (e.g., `"l2"`, `"cosine"`, `"dot"`, `"hamming"`), default is `"l2"` |
| `num_partitions` | `int`, optional | Number of IVF partitions |
| `num_sub_vectors` | `int`, optional | Number of PQ sub-vectors |
| `**kwargs` | `Any` | Additional arguments to pass (e.g., `sample_rate`) |

#### Return Value

The function returns an updated Lance dataset with the newly created vector index.

## Examples

### FTS Index
### FTS Index (Scalar)
```python
import lance
import lance_ray as lr
Expand All @@ -81,7 +139,7 @@ dataset = lance.dataset("path/to/dataset")

# Build distributed index
updated_dataset = lr.create_scalar_index(
dataset=dataset,
uri=dataset.uri,
column="text",
index_type="INVERTED",
num_workers=4
Expand All @@ -98,13 +156,14 @@ results = updated_dataset.scanner(
).to_table()
print(f"Search results: {results}")
```
### BTREE Index

### BTREE Index (Scalar)
```python
# Assume a LanceDataset with a numeric column "id" exists at this path
import lance_ray as lr

updated_dataset = lr.create_scalar_index(
dataset="path/to/dataset",
uri="path/to/dataset",
column="id",
index_type="BTREE",
name="btree_multiple_fragment_idx",
Expand All @@ -117,24 +176,48 @@ updated_dataset.scanner(filter="id = 100", columns=["id", "text"]).to_table()
updated_dataset.scanner(filter="id >= 200 AND id < 800", columns=["id", "text"]).to_table()
```

### Vector Index (IVF_PQ / IVF_SQ / IVF_FLAT)
```python
import lance_ray as lr

# Build a distributed IVF_PQ index
updated_dataset = lr.create_index(
uri="path/to/dataset.lance",
column="vector",
index_type="IVF_PQ",
name="idx_ivf_pq",
num_workers=4,
num_partitions=256,
num_sub_vectors=16,
metric="l2"
)

### Custom Index Name
# Build a distributed IVF_SQ index
updated_dataset = lr.create_index(
uri="path/to/dataset.lance",
column="vector",
index_type="IVF_SQ",
name="idx_ivf_sq",
num_workers=4,
num_partitions=256,
)

```python
updated_dataset = lr.create_scalar_index(
dataset="path/to/dataset",
column="text",
index_type="INVERTED",
name="custom_text_index",
num_workers=4
# Build a distributed IVF_FLAT index
updated_dataset = lr.create_index(
uri="path/to/dataset.lance",
column="vector",
index_type="IVF_FLAT",
name="idx_ivf_flat",
num_workers=4,
num_partitions=256,
)
```

### Custom Ray Options

```python
updated_dataset = lr.create_scalar_index(
dataset="path/to/dataset",
uri="path/to/dataset",
column="text",
index_type="INVERTED",
num_workers=4,
Expand All @@ -147,7 +230,7 @@ updated_dataset = lr.create_scalar_index(
```python
# Create index with custom name
updated_dataset = lr.create_scalar_index(
dataset="path/to/dataset",
uri="path/to/dataset",
column="text",
index_type="INVERTED",
name="my_text_index",
Expand All @@ -156,7 +239,7 @@ updated_dataset = lr.create_scalar_index(

# Try to create another index with the same name (will replace by default)
updated_dataset = lr.create_scalar_index(
dataset="path/to/dataset",
uri="path/to/dataset",
column="text",
index_type="INVERTED",
name="my_text_index", # Same name as before
Expand All @@ -169,7 +252,7 @@ import lance_ray as lr

try:
updated_dataset = lr.create_scalar_index(
dataset="path/to/dataset",
uri="path/to/dataset",
column="text",
index_type="INVERTED",
name="my_text_index", # Same name as existing index
Expand Down
3 changes: 2 additions & 1 deletion lance_ray/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@

# Fragment API imports
from .fragment import LanceFragmentWriter
from .index import create_scalar_index
from .index import create_index, create_scalar_index
from .io import add_columns, read_lance, write_lance

__all__ = [
"read_lance",
"write_lance",
"add_columns",
"create_scalar_index",
"create_index",
"compact_files",
"LanceFragmentWriter",
"LanceFragmentCommitter",
Expand Down
Loading