75 changes: 75 additions & 0 deletions python/python/ci_benchmarks/benchmarks/test_fts_search.py
@@ -0,0 +1,75 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

# Benchmarks for Full Text Search (FTS) queries on the Wikipedia dataset.
#
# Tests various query types (basic, match, phrase) with different
# parameters (K values, cache settings) to measure FTS latency.
#
# This benchmark is loosely modeled after the Quickwit benchmark located
# at https://github.com/quickwit-oss/search-benchmark-game and uses a
# similar Wikipedia dataset. However, the dataset used by this benchmark
# comes from HuggingFace and is smaller, so results are not directly comparable.

import lance
import pytest
from ci_benchmarks.datasets import get_dataset_uri
from ci_benchmarks.utils import wipe_os_cache

# K values for result limits
K_VALUES = [10, 100, 1000]
K_LABELS = ["k10", "k100", "k1000"]

# Test queries - common Wikipedia search terms
BASIC_QUERIES = [
"lost episode",
"artificial intelligence",
"database systems",
]

BASIC_QUERY_LABELS = [
"lost_episode",
"artificial_intelligence",
"database_systems",
]

# Phrase queries for exact matching
PHRASE_QUERIES = [
'"machine learning algorithm"',
'"artificial intelligence research"',
]

PHRASE_QUERY_LABELS = [
"phrase_machine_learning_algorithm",
"phrase_artificial_intelligence_research",
]

ALL_QUERIES = BASIC_QUERIES + PHRASE_QUERIES
ALL_QUERY_LABELS = BASIC_QUERY_LABELS + PHRASE_QUERY_LABELS


@pytest.mark.parametrize("k", K_VALUES, ids=K_LABELS)
@pytest.mark.parametrize("query", ALL_QUERIES, ids=ALL_QUERY_LABELS)
@pytest.mark.parametrize("use_cache", [True, False], ids=["cache", "no_cache"])
def test_query(benchmark, k, query, use_cache):
"""Benchmark basic FTS string query."""
dataset_uri = get_dataset_uri("wikipedia")
ds = lance.dataset(dataset_uri)

def clear_cache():
wipe_os_cache(dataset_uri)

def bench():
to_search = ds if use_cache else lance.dataset(dataset_uri)
to_search.to_table(full_text_query=query, limit=k, columns=["_rowid"])

setup = None if use_cache else clear_cache
warmup_rounds = 1 if use_cache else 0

benchmark.pedantic(
bench,
warmup_rounds=warmup_rounds,
rounds=100,
iterations=1,
setup=setup,
)
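
These tests run under pytest-benchmark (the benchmark fixture and benchmark.pedantic come from that plugin). For reference, a minimal sketch of issuing one of the benchmark's queries directly; it assumes the Wikipedia dataset has already been generated by the datagen script added below, and it only reuses calls that appear elsewhere in this PR:

# Sketch (not part of this PR): run a single FTS query the same way bench() does.
import lance

from ci_benchmarks.datasets import get_dataset_uri

ds = lance.dataset(get_dataset_uri("wikipedia"))

# A plain match query; quoting the string (e.g. '"machine learning algorithm"')
# turns it into an exact phrase query, as in PHRASE_QUERIES above.
table = ds.to_table(
    full_text_query="artificial intelligence",
    limit=10,
    columns=["_rowid"],
)
print(f"{table.num_rows} results returned")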
36 changes: 36 additions & 0 deletions python/python/ci_benchmarks/datagen/gen_all.py
@@ -1,9 +1,45 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

import logging

from lance.log import LOGGER

from ci_benchmarks.datagen.basic import gen_basic
from ci_benchmarks.datagen.lineitems import gen_tcph
from ci_benchmarks.datagen.wikipedia import gen_wikipedia


def setup_logging():
"""Set up logging to display to console with timestamps."""
# Check if handler already exists (avoid duplicate handlers)
if not LOGGER.handlers:
handler = logging.StreamHandler()
handler.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
handler.setFormatter(formatter)
LOGGER.addHandler(handler)
LOGGER.setLevel(logging.INFO)


if __name__ == "__main__":
setup_logging()
LOGGER.info("=" * 80)
LOGGER.info("Starting dataset generation for all benchmarks")
LOGGER.info("=" * 80)

LOGGER.info("Generating basic dataset...")
gen_basic()

LOGGER.info("Generating TPC-H lineitem dataset...")
gen_tcph()

LOGGER.info("Generating Wikipedia dataset...")
gen_wikipedia()

LOGGER.info("=" * 80)
LOGGER.info("All datasets generated successfully!")
LOGGER.info("=" * 80)
213 changes: 213 additions & 0 deletions python/python/ci_benchmarks/datagen/wikipedia.py
@@ -0,0 +1,213 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

# Creates a Wikipedia dataset for Full Text Search (FTS) benchmarking.
#
# Downloads Wikipedia data from HuggingFace, creates a Lance dataset, and builds
# FTS indices to support various query types.

import re

import lance
import pyarrow as pa
from datasets import load_dataset
from lance.log import LOGGER

from ci_benchmarks.datasets import get_dataset_uri

# HuggingFace dataset configuration
HF_DATASET = "wikimedia/wikipedia"
HF_SUBSET = "20231101.en"
HF_SPLIT = "train"
NUM_ROWS = 100_000

SCHEMA = pa.schema(
{
"id": pa.string(),
"text": pa.large_string(),
}
)


def _download_and_process_wikipedia(batch_size: int = 5000):
"""Download Wikipedia data from HuggingFace and yield batches.

Downloads the first NUM_ROWS from the wikimedia/wikipedia dataset
and yields PyArrow RecordBatches.

Args:
batch_size: Number of rows per batch

Yields:
PyArrow RecordBatch
"""
LOGGER.info(
"Downloading Wikipedia dataset from HuggingFace: %s (subset: %s, split: %s)",
HF_DATASET,
HF_SUBSET,
HF_SPLIT,
)
LOGGER.info("Will download first %s rows", f"{NUM_ROWS:,}")

# Load dataset from HuggingFace with streaming to avoid loading all into memory
LOGGER.info("Loading dataset in streaming mode...")
dataset = load_dataset(
HF_DATASET,
HF_SUBSET,
split=HF_SPLIT,
streaming=True,
)

LOGGER.info("Dataset initialized, starting to download and process rows...")

batch_data = {"id": [], "text": []}
total_rows = 0

for idx, row in enumerate(dataset):
if total_rows >= NUM_ROWS:
break

# Extract fields
# HuggingFace wikipedia dataset has: id, url, title, text
row_id = row.get("url", f"row_{idx}")
text = row.get("text", "")

# Skip empty text
if not text or text.strip() == "":
continue

# Transform text (lowercase and keep only letters)
batch_data["id"].append(row_id)
batch_data["text"].append(transform(text))

# Yield batch when we reach batch_size
if len(batch_data["id"]) >= batch_size:
batch = pa.record_batch(
[
pa.array(batch_data["id"], type=pa.string()),
pa.array(batch_data["text"], type=pa.large_string()),
],
names=["id", "text"],
)
yield batch
total_rows += len(batch_data["id"])
progress_pct = (total_rows / NUM_ROWS) * 100
LOGGER.info(
"Processed %s / %s rows (%.1f%%)",
f"{total_rows:,}",
f"{NUM_ROWS:,}",
progress_pct,
)

# Clear batch data
batch_data = {"id": [], "text": []}

# Yield remaining data
if batch_data["id"]:
batch = pa.record_batch(
[
pa.array(batch_data["id"], type=pa.string()),
pa.array(batch_data["text"], type=pa.large_string()),
],
names=["id", "text"],
)
yield batch
total_rows += len(batch_data["id"])

LOGGER.info("Finished processing %s total rows", f"{total_rows:,}")


PTN = re.compile("[^a-zA-Z]+")


def transform(text):
return PTN.sub(" ", text.lower())
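# e.g. transform("Lost Episode (2023)!") == "lost episode ": lowercase the text
# and collapse every run of non-letter characters into a single space.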


def _create_indices(ds: lance.LanceDataset):
"""Create FTS indices on the dataset.

Creates indices to support different query types:
1. Inverted index with position for phrase queries

Args:
ds: Lance dataset to create indices on
"""
existing_indices = [idx.name for idx in ds.list_indices()]

# Create inverted index with position support for phrase queries
# This index supports both match and phrase queries
if "text_fts_idx" not in existing_indices:
LOGGER.info("Creating FTS index on 'text' column with position support")
ds.create_scalar_index(
"text",
index_type="INVERTED",
with_position=True,
name="text_fts_idx",
)
LOGGER.info("FTS index 'text_fts_idx' created successfully")
else:
LOGGER.info("FTS index 'text_fts_idx' already exists")


def _create(dataset_uri: str):
"""Create Wikipedia dataset and indices (idempotent).

Args:
dataset_uri: URI where the dataset should be created
"""
LOGGER.info("Checking if Wikipedia dataset exists at %s", dataset_uri)

try:
ds = lance.dataset(dataset_uri)
row_count = ds.count_rows()
LOGGER.info("Dataset exists with %s rows", f"{row_count:,}")

# Check if indices exist
existing_indices = [idx.name for idx in ds.list_indices()]
if "text_fts_idx" in existing_indices:
LOGGER.info("Dataset and indices already exist, skipping generation")
return
else:
LOGGER.info("Dataset exists but indices are missing, creating indices...")
_create_indices(ds)
return

except ValueError:
# Dataset doesn't exist, create it
LOGGER.info("Dataset does not exist, will create from HuggingFace source")

# Download and create dataset
LOGGER.info("Starting Wikipedia dataset creation at %s", dataset_uri)
ds = lance.write_dataset(
_download_and_process_wikipedia(),
dataset_uri,
schema=SCHEMA,
mode="create",
use_legacy_format=False,
)

row_count = ds.count_rows()
LOGGER.info("Dataset created successfully with %s rows", f"{row_count:,}")

# Create FTS indices
LOGGER.info("Creating FTS indices...")
_create_indices(ds)

LOGGER.info("Wikipedia dataset generation complete!")


def gen_wikipedia():
"""Generate Wikipedia dataset for FTS benchmarks.

This is the main entry point for dataset generation.
Downloads the first 100,000 rows (NUM_ROWS) from the wikimedia/wikipedia dataset
(20231101.en subset) from HuggingFace, creates a Lance dataset,
and builds FTS indices.
"""
dataset_uri = get_dataset_uri("wikipedia")
_create(dataset_uri)


if __name__ == "__main__":
gen_wikipedia()
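
Because the index is created with with_position=True, the dataset supports exact phrase queries in addition to plain match queries. A small verification sketch one could run after generation (not part of this PR; it only reuses calls shown above, and the quoted-phrase syntax mirrors PHRASE_QUERIES in the benchmark file):

# Sketch: confirm the position-aware index exists and answers a phrase query.
import lance

from ci_benchmarks.datasets import get_dataset_uri

ds = lance.dataset(get_dataset_uri("wikipedia"))
assert "text_fts_idx" in [idx.name for idx in ds.list_indices()]

# A quoted string is treated as an exact phrase, matching the benchmark's
# PHRASE_QUERIES; an unquoted string would be a plain match query.
hits = ds.to_table(
    full_text_query='"machine learning algorithm"',
    limit=10,
    columns=["id"],
)
print(f"{hits.num_rows} phrase matches")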
4 changes: 4 additions & 0 deletions rust/lance/Cargo.toml
@@ -166,6 +166,10 @@ harness = false
name = "random_access"
harness = false

[[bench]]
name = "fts_search"
harness = false

[[bench]]
name = "vector_throughput"
harness = false