From 0f075a018afdb19a71c95d7eee734f9b99945165 Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Thu, 8 Jan 2026 14:52:25 -0800
Subject: [PATCH] add an FTS search benchmark for regression

---
 .../benchmarks/test_fts_search.py             |  75 ++++++
 .../python/ci_benchmarks/datagen/gen_all.py   |  36 +++
 .../python/ci_benchmarks/datagen/wikipedia.py | 213 ++++++++++++++++++
 rust/lance/Cargo.toml                         |   4 +
 rust/lance/benches/fts_search.rs              | 104 +++++++++
 5 files changed, 432 insertions(+)
 create mode 100644 python/python/ci_benchmarks/benchmarks/test_fts_search.py
 create mode 100644 python/python/ci_benchmarks/datagen/wikipedia.py
 create mode 100644 rust/lance/benches/fts_search.rs

diff --git a/python/python/ci_benchmarks/benchmarks/test_fts_search.py b/python/python/ci_benchmarks/benchmarks/test_fts_search.py
new file mode 100644
index 00000000000..4a3141e6e0c
--- /dev/null
+++ b/python/python/ci_benchmarks/benchmarks/test_fts_search.py
@@ -0,0 +1,75 @@
+# SPDX-License-Identifier: Apache-2.0
+# SPDX-FileCopyrightText: Copyright The Lance Authors
+
+# Benchmarks for Full Text Search (FTS) queries on the Wikipedia dataset.
+#
+# Runs the supported query types (basic match and phrase) with different
+# parameters (K values, cache settings) to measure FTS latency.
+#
+# This benchmark is loosely modeled after the Quickwit benchmark located
+# at https://github.com/quickwit-oss/search-benchmark-game and uses a
+# similar Wikipedia dataset. However, the dataset used by this benchmark
+# comes from HuggingFace and is smaller, so results can't be compared directly.
+
+import lance
+import pytest
+from ci_benchmarks.datasets import get_dataset_uri
+from ci_benchmarks.utils import wipe_os_cache
+
+# K values for result limits
+K_VALUES = [10, 100, 1000]
+K_LABELS = ["k10", "k100", "k1000"]
+
+# Test queries - common Wikipedia search terms
+BASIC_QUERIES = [
+    "lost episode",
+    "artificial intelligence",
+    "database systems",
+]
+
+BASIC_QUERY_LABELS = [
+    "lost_episode",
+    "artificial_intelligence",
+    "database_systems",
+]
+
+# Phrase queries for exact matching
+PHRASE_QUERIES = [
+    '"machine learning algorithm"',
+    '"artificial intelligence research"',
+]
+
+PHRASE_QUERY_LABELS = [
+    "phrase_machine_learning_algorithm",
+    "phrase_artificial_intelligence_research",
+]
+
+ALL_QUERIES = BASIC_QUERIES + PHRASE_QUERIES
+ALL_QUERY_LABELS = BASIC_QUERY_LABELS + PHRASE_QUERY_LABELS
+
+
+@pytest.mark.parametrize("k", K_VALUES, ids=K_LABELS)
+@pytest.mark.parametrize("query", ALL_QUERIES, ids=ALL_QUERY_LABELS)
+@pytest.mark.parametrize("use_cache", [True, False], ids=["cache", "no_cache"])
+def test_query(benchmark, k, query, use_cache):
+    """Benchmark a single FTS query (basic or phrase)."""
+    dataset_uri = get_dataset_uri("wikipedia")
+    ds = lance.dataset(dataset_uri)
+
+    def clear_cache():
+        wipe_os_cache(dataset_uri)
+
+    def bench():
+        to_search = ds if use_cache else lance.dataset(dataset_uri)
+        to_search.to_table(full_text_query=query, limit=k, columns=["_rowid"])
+
+    setup = None if use_cache else clear_cache
+    warmup_rounds = 1 if use_cache else 0
+
+    benchmark.pedantic(
+        bench,
+        warmup_rounds=warmup_rounds,
+        rounds=100,
+        iterations=1,
+        setup=setup,
+    )
diff --git a/python/python/ci_benchmarks/datagen/gen_all.py b/python/python/ci_benchmarks/datagen/gen_all.py
index 3006a4cd641..1da7c05fd9b 100644
--- a/python/python/ci_benchmarks/datagen/gen_all.py
+++ b/python/python/ci_benchmarks/datagen/gen_all.py
@@ -1,9 +1,45 @@
 # SPDX-License-Identifier: Apache-2.0
 # SPDX-FileCopyrightText: Copyright The Lance Authors
+import logging
+
+from lance.log
import LOGGER + from ci_benchmarks.datagen.basic import gen_basic from ci_benchmarks.datagen.lineitems import gen_tcph +from ci_benchmarks.datagen.wikipedia import gen_wikipedia + + +def setup_logging(): + """Set up logging to display to console with timestamps.""" + # Check if handler already exists (avoid duplicate handlers) + if not LOGGER.handlers: + handler = logging.StreamHandler() + handler.setLevel(logging.INFO) + formatter = logging.Formatter( + "%(asctime)s [%(levelname)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + handler.setFormatter(formatter) + LOGGER.addHandler(handler) + LOGGER.setLevel(logging.INFO) + if __name__ == "__main__": + setup_logging() + LOGGER.info("=" * 80) + LOGGER.info("Starting dataset generation for all benchmarks") + LOGGER.info("=" * 80) + + LOGGER.info("Generating basic dataset...") gen_basic() + + LOGGER.info("Generating TPC-H lineitem dataset...") gen_tcph() + + LOGGER.info("Generating Wikipedia dataset...") + gen_wikipedia() + + LOGGER.info("=" * 80) + LOGGER.info("All datasets generated successfully!") + LOGGER.info("=" * 80) diff --git a/python/python/ci_benchmarks/datagen/wikipedia.py b/python/python/ci_benchmarks/datagen/wikipedia.py new file mode 100644 index 00000000000..b64b60fea47 --- /dev/null +++ b/python/python/ci_benchmarks/datagen/wikipedia.py @@ -0,0 +1,213 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The Lance Authors + +# Creates a Wikipedia dataset for Full Text Search (FTS) benchmarking. +# +# Downloads Wikipedia data from HuggingFace, creates a Lance dataset, and builds +# FTS indices to support various query types. + +import re + +import lance +import pyarrow as pa +from datasets import load_dataset +from lance.log import LOGGER + +from ci_benchmarks.datasets import get_dataset_uri + +# HuggingFace dataset configuration +HF_DATASET = "wikimedia/wikipedia" +HF_SUBSET = "20231101.en" +HF_SPLIT = "train" +NUM_ROWS = 100_000 + +SCHEMA = pa.schema( + { + "id": pa.string(), + "text": pa.large_string(), + } +) + + +def _download_and_process_wikipedia(batch_size: int = 5000): + """Download Wikipedia data from HuggingFace and yield batches. + + Downloads the first NUM_ROWS from the wikimedia/wikipedia dataset + and yields PyArrow RecordBatches. 
+ + Args: + batch_size: Number of rows per batch + + Yields: + PyArrow RecordBatch + """ + LOGGER.info( + "Downloading Wikipedia dataset from HuggingFace: %s (subset: %s, split: %s)", + HF_DATASET, + HF_SUBSET, + HF_SPLIT, + ) + LOGGER.info("Will download first %s rows", f"{NUM_ROWS:,}") + + # Load dataset from HuggingFace with streaming to avoid loading all into memory + LOGGER.info("Loading dataset in streaming mode...") + dataset = load_dataset( + HF_DATASET, + HF_SUBSET, + split=HF_SPLIT, + streaming=True, + ) + + LOGGER.info("Dataset initialized, starting to download and process rows...") + + batch_data = {"id": [], "text": []} + total_rows = 0 + + for idx, row in enumerate(dataset): + if total_rows >= NUM_ROWS: + break + + # Extract fields + # HuggingFace wikipedia dataset has: id, url, title, text + row_id = row.get("url", f"row_{idx}") + text = row.get("text", "") + + # Skip empty text + if not text or text.strip() == "": + continue + + # Transform text (lowercase and keep only letters) + batch_data["id"].append(row_id) + batch_data["text"].append(transform(text)) + + # Yield batch when we reach batch_size + if len(batch_data["id"]) >= batch_size: + batch = pa.record_batch( + [ + pa.array(batch_data["id"], type=pa.string()), + pa.array(batch_data["text"], type=pa.large_string()), + ], + names=["id", "text"], + ) + yield batch + total_rows += len(batch_data["id"]) + progress_pct = (total_rows / NUM_ROWS) * 100 + LOGGER.info( + "Processed %s / %s rows (%.1f%%)", + f"{total_rows:,}", + f"{NUM_ROWS:,}", + progress_pct, + ) + + # Clear batch data + batch_data = {"id": [], "text": []} + + # Yield remaining data + if batch_data["id"]: + batch = pa.record_batch( + [ + pa.array(batch_data["id"], type=pa.string()), + pa.array(batch_data["text"], type=pa.large_string()), + ], + names=["id", "text"], + ) + yield batch + total_rows += len(batch_data["id"]) + + LOGGER.info("Finished processing %s total rows", f"{total_rows:,}") + + +PTN = re.compile("[^a-zA-Z]+") + + +def transform(text): + return PTN.sub(" ", text.lower()) + + +def _create_indices(ds: lance.LanceDataset): + """Create FTS indices on the dataset. + + Creates indices to support different query types: + 1. Inverted index with position for phrase queries + + Args: + ds: Lance dataset to create indices on + """ + existing_indices = [idx.name for idx in ds.list_indices()] + + # Create inverted index with position support for phrase queries + # This index supports both match and phrase queries + if "text_fts_idx" not in existing_indices: + LOGGER.info("Creating FTS index on 'text' column with position support") + ds.create_scalar_index( + "text", + index_type="INVERTED", + with_position=True, + name="text_fts_idx", + ) + LOGGER.info("FTS index 'text_fts_idx' created successfully") + else: + LOGGER.info("FTS index 'text_fts_idx' already exists") + + +def _create(dataset_uri: str): + """Create Wikipedia dataset and indices (idempotent). 
+
+    Args:
+        dataset_uri: URI where the dataset should be created
+    """
+    LOGGER.info("Checking if Wikipedia dataset exists at %s", dataset_uri)
+
+    try:
+        ds = lance.dataset(dataset_uri)
+        row_count = ds.count_rows()
+        LOGGER.info("Dataset exists with %s rows", f"{row_count:,}")
+
+        # Check if indices exist
+        existing_indices = [idx.name for idx in ds.list_indices()]
+        if "text_fts_idx" in existing_indices:
+            LOGGER.info("Dataset and indices already exist, skipping generation")
+            return
+        else:
+            LOGGER.info("Dataset exists but indices are missing, creating indices...")
+            _create_indices(ds)
+            return
+
+    except ValueError:
+        # Dataset doesn't exist, create it
+        LOGGER.info("Dataset does not exist, will create from HuggingFace source")
+
+    # Download and create dataset
+    LOGGER.info("Starting Wikipedia dataset creation at %s", dataset_uri)
+    ds = lance.write_dataset(
+        _download_and_process_wikipedia(),
+        dataset_uri,
+        schema=SCHEMA,
+        mode="create",
+        use_legacy_format=False,
+    )
+
+    row_count = ds.count_rows()
+    LOGGER.info("Dataset created successfully with %s rows", f"{row_count:,}")
+
+    # Create FTS indices
+    LOGGER.info("Creating FTS indices...")
+    _create_indices(ds)
+
+    LOGGER.info("Wikipedia dataset generation complete!")
+
+
+def gen_wikipedia():
+    """Generate Wikipedia dataset for FTS benchmarks.
+
+    This is the main entry point for dataset generation.
+    Downloads the first 100,000 rows (NUM_ROWS) from the wikimedia/wikipedia
+    dataset (20231101.en subset) on HuggingFace, creates a Lance dataset,
+    and builds FTS indices.
+    """
+    dataset_uri = get_dataset_uri("wikipedia")
+    _create(dataset_uri)
+
+
+if __name__ == "__main__":
+    gen_wikipedia()
diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml
index a36cb6c25e0..0ad1bbf26a9 100644
--- a/rust/lance/Cargo.toml
+++ b/rust/lance/Cargo.toml
@@ -166,6 +166,10 @@ harness = false
 name = "random_access"
 harness = false
 
+[[bench]]
+name = "fts_search"
+harness = false
+
 [[bench]]
 name = "vector_throughput"
 harness = false
diff --git a/rust/lance/benches/fts_search.rs b/rust/lance/benches/fts_search.rs
new file mode 100644
index 00000000000..3832d6cd40b
--- /dev/null
+++ b/rust/lance/benches/fts_search.rs
@@ -0,0 +1,104 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+/// This is a Rust end-to-end benchmark for full text search. It is meant to be supplementary to the
+/// Python benchmark located at python/python/ci_benchmarks/benchmarks/test_fts_search.py. You can use
+/// the python/python/ci_benchmarks/datagen/wikipedia.py script to generate the dataset. You will need
+/// to set the LANCE_WIKIPEDIA_DATASET_PATH environment variable to the path of the dataset generated
+/// by that script.
+///
+/// This benchmark is primarily intended for developers to use for profiling and debugging. The Python
+/// benchmark is more comprehensive and will cover regression testing.
+use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
+use futures::TryStreamExt;
+use lance::Dataset;
+use lance_index::scalar::FullTextSearchQuery;
+#[cfg(target_os = "linux")]
+use pprof::criterion::{Output, PProfProfiler};
+use std::env;
+
+const WIKIPEDIA_DATASET_ENV_VAR: &str = "LANCE_WIKIPEDIA_DATASET_PATH";
+
+/// Get the Wikipedia dataset path from the environment variable.
+/// Panics if the environment variable is not set.
+fn get_wikipedia_dataset_path() -> String { + env::var(WIKIPEDIA_DATASET_ENV_VAR).unwrap_or_else(|_| { + panic!( + "Environment variable {} must be set to the path of the indexed Wikipedia dataset", + WIKIPEDIA_DATASET_ENV_VAR + ) + }) +} + +/// Benchmark full text search on Wikipedia dataset with different K values +fn bench_fts_search(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let dataset_path = get_wikipedia_dataset_path(); + + // Open the dataset once + let dataset = rt + .block_on(Dataset::open(&dataset_path)) + .unwrap_or_else(|e| { + panic!( + "Failed to open Wikipedia dataset at '{}': {}", + dataset_path, e + ) + }); + + // Test with different K values + let k_values = [10, 100, 1000]; + + let mut group = c.benchmark_group("fts_search_lost_episode"); + + for k in k_values.iter() { + group.bench_with_input(BenchmarkId::from_parameter(k), k, |b, &k| { + b.iter(|| { + rt.block_on(async { + let mut scanner = dataset.scan(); + let mut stream = scanner + .full_text_search(FullTextSearchQuery::new("lost episode".to_string())) + .unwrap() + .limit(Some(k as i64), None) + .unwrap() + .project(&["_rowid"]) + .unwrap() + .try_into_stream() + .await + .unwrap(); + + let mut num_rows = 0; + while let Some(batch) = stream.try_next().await.unwrap() { + num_rows += batch.num_rows(); + } + + // Verify we got results (should be at most k rows) + assert!( + num_rows <= k, + "Expected at most {} rows, got {}", + k, + num_rows + ); + }) + }); + }); + } + + group.finish(); +} + +#[cfg(target_os = "linux")] +criterion_group!( + name=benches; + config = Criterion::default().significance_level(0.1).sample_size(10) + .with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); + targets = bench_fts_search +); + +#[cfg(not(target_os = "linux"))] +criterion_group!( + name=benches; + config = Criterion::default().significance_level(0.1).sample_size(10); + targets = bench_fts_search +); + +criterion_main!(benches);
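
For reviewers who want to exercise the query path by hand, the following is a minimal sketch of what one cell of the test_query parameter matrix does, outside of pytest-benchmark. It assumes the Wikipedia dataset and its FTS index have already been generated by python/python/ci_benchmarks/datagen/gen_all.py; the calls themselves are the same ones the benchmark above makes.

import lance

from ci_benchmarks.datasets import get_dataset_uri

# Open the pre-generated, pre-indexed Wikipedia dataset
ds = lance.dataset(get_dataset_uri("wikipedia"))

# Basic (match) query, the same call test_query makes for k=10
basic = ds.to_table(full_text_query="lost episode", limit=10, columns=["_rowid"])

# Phrase query; relies on the inverted index having been built with_position=True
phrase = ds.to_table(
    full_text_query='"machine learning algorithm"', limit=10, columns=["_rowid"]
)
print(basic.num_rows, phrase.num_rows)

The Rust benchmark reads the same dataset: point LANCE_WIKIPEDIA_DATASET_PATH at the generated dataset and run cargo bench --bench fts_search.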
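One datagen detail worth keeping in mind when adding new queries: wikipedia.py lowercases the text and collapses every non-letter run to a single space before writing the dataset, so all benchmark queries are effectively evaluated against letters-only, lowercased text. A small worked example using the same transform helper from the datagen script:

import re

# Same pattern and helper as in datagen/wikipedia.py
PTN = re.compile("[^a-zA-Z]+")


def transform(text):
    return PTN.sub(" ", text.lower())


# Digits and punctuation are dropped entirely:
# "U.S. elections, 2024!" -> "u s elections "
print(transform("U.S. elections, 2024!"))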