diff --git a/packages/leann-core/src/leann/api.py b/packages/leann-core/src/leann/api.py index e22633ad..3d18385a 100644 --- a/packages/leann-core/src/leann/api.py +++ b/packages/leann-core/src/leann/api.py @@ -30,6 +30,15 @@ logger = logging.getLogger(__name__) +# Passage ID schemes recorded in .meta.json["passage_id_scheme"]. +# - "sequential": today's default; IDs are str(insertion_index) (api.py:add_text). +# - "content-hash": planned in #329; IDs are sha256(text)[:16], stable across +# file moves and reorderings. +# Older indexes have no passage_id_scheme field — readers must default to +# "sequential" when the key is absent. See #329 for the rollout plan. +PASSAGE_ID_SCHEME_SEQUENTIAL = "sequential" +PASSAGE_ID_SCHEME_CONTENT_HASH = "content-hash" + def get_registered_backends() -> list[str]: """Get list of registered backend names.""" @@ -376,6 +385,7 @@ def __init__( embedding_options: Optional[dict[str, Any]] = None, prebuild_bm25: bool = False, bm25_backend: str = "fts5", + passage_id_scheme: str = PASSAGE_ID_SCHEME_SEQUENTIAL, **backend_kwargs, ): if bm25_backend != "fts5": @@ -383,7 +393,22 @@ def __init__( bm25_backend = "fts5" self.bm25_backend = bm25_backend self.prebuild_bm25 = prebuild_bm25 or bm25_backend == "fts5" + if passage_id_scheme not in ( + PASSAGE_ID_SCHEME_SEQUENTIAL, + PASSAGE_ID_SCHEME_CONTENT_HASH, + ): + raise ValueError( + f"Unknown passage_id_scheme: {passage_id_scheme!r}. " + f"Expected one of: {PASSAGE_ID_SCHEME_SEQUENTIAL!r}, " + f"{PASSAGE_ID_SCHEME_CONTENT_HASH!r}." + ) + self.passage_id_scheme = passage_id_scheme self.backend_name = backend_name + if backend_name == "diskann" and passage_id_scheme == PASSAGE_ID_SCHEME_CONTENT_HASH: + raise ValueError( + "passage_id_scheme='content-hash' is not supported by the DiskANN backend yet " + "because DiskANN search returns numeric labels without a persisted passage ID map." 
+ ) # Normalize incompatible combinations early (for consistent metadata) if backend_name == "hnsw": is_recompute = backend_kwargs.get("is_recompute", True) @@ -477,10 +502,37 @@ def __init__( self.backend_kwargs = backend_kwargs self.chunks: list[dict[str, Any]] = [] + @staticmethod + def _make_unique_passage_id(base_id: str, reserved_ids: set[str]) -> str: + if base_id not in reserved_ids: + return base_id + suffix = 1 + while f"{base_id}-{suffix}" in reserved_ids: + suffix += 1 + return f"{base_id}-{suffix}" + + def _generate_passage_id(self, text: str, reserved_ids: Optional[set[str]] = None) -> str: + """Generate a passage ID per the configured scheme. + + sequential: str(insertion index) — fast, position-dependent, current default. + content-hash: sha256(text)[:16] — content-stable across file moves and + reorderings when text is unique. Duplicate text receives a numeric suffix + so the passage store, ID maps, and vector labels stay one-to-one. + """ + if self.passage_id_scheme == PASSAGE_ID_SCHEME_CONTENT_HASH: + import hashlib + + base_id = hashlib.sha256(text.encode("utf-8")).hexdigest()[:16] + return self._make_unique_passage_id(base_id, reserved_ids or set()) + return str(len(self.chunks)) + def add_text(self, text: str, metadata: Optional[dict[str, Any]] = None): if metadata is None: metadata = {} - passage_id = metadata.get("id", str(len(self.chunks))) + if "id" in metadata and metadata["id"] is not None: + passage_id = str(metadata["id"]) + else: + passage_id = self._generate_passage_id(text, {chunk["id"] for chunk in self.chunks}) chunk_data = {"id": passage_id, "text": text, "metadata": metadata} self.chunks.append(chunk_data) @@ -570,12 +622,13 @@ def build_index(self, index_path: str): builder_instance.build(embeddings, string_ids, index_path, **current_backend_kwargs) leann_meta_path = index_dir / f"{index_name}.meta.json" meta_data = { - "version": "1.0", + "version": "1.1", "backend_name": self.backend_name, "embedding_model": 
self.embedding_model, "dimensions": self.dimensions, "backend_kwargs": self.backend_kwargs, "embedding_mode": self.embedding_mode, + "passage_id_scheme": self.passage_id_scheme, "passage_sources": [ { "type": "jsonl", @@ -714,12 +767,13 @@ def build_index_from_arrays(self, index_path: str, ids: list, embeddings: np.nda # Create metadata file leann_meta_path = index_dir / f"{index_name}.meta.json" meta_data = { - "version": "1.0", + "version": "1.1", "backend_name": self.backend_name, "embedding_model": self.embedding_model, "dimensions": self.dimensions, "backend_kwargs": self.backend_kwargs, "embedding_mode": self.embedding_mode, + "passage_id_scheme": self.passage_id_scheme, "passage_sources": [ { "type": "jsonl", @@ -881,14 +935,28 @@ def update_index(self, index_path: str, remove_passage_ids: Optional[list[str]] ) valid_chunks: list[dict[str, Any]] = [] + reserved_ids = set(existing_ids) + next_sequential_id = len(offset_map) for chunk in self.chunks: text = chunk.get("text", "") if not isinstance(text, str) or not text.strip(): continue metadata = chunk.setdefault("metadata", {}) - passage_id = chunk.get("id") or metadata.get("id") - if passage_id and passage_id in existing_ids: + if "id" in metadata and metadata["id"] is not None: + passage_id = str(metadata["id"]) + elif self.passage_id_scheme == PASSAGE_ID_SCHEME_CONTENT_HASH: + passage_id = self._generate_passage_id(text, reserved_ids) + else: + while str(next_sequential_id) in reserved_ids: + next_sequential_id += 1 + passage_id = str(next_sequential_id) + next_sequential_id += 1 + + if passage_id in reserved_ids: raise ValueError(f"Passage ID '{passage_id}' already exists in the index.") + chunk["id"] = passage_id + metadata["id"] = passage_id + reserved_ids.add(passage_id) valid_chunks.append(chunk) if not valid_chunks: @@ -924,12 +992,6 @@ def update_index(self, index_path: str, remove_passage_ids: Optional[list[str]] # IVF: add_vectors then append passages/offset (no ZMQ/server) if backend_name == 
"ivf": - for i, chunk in enumerate(valid_chunks): - pid = chunk.get("id") or chunk.get("metadata", {}).get("id") - if not pid: - pid = str(len(offset_map) + i) - chunk.setdefault("metadata", {})["id"] = pid - chunk["id"] = pid passage_ids = [c["id"] for c in valid_chunks] try: from leann_backend_ivf import add_vectors as ivf_add_vectors @@ -1012,17 +1074,16 @@ def update_index(self, index_path: str, remove_passage_ids: Optional[list[str]] passage_meta_mode = meta.get("embedding_mode", self.embedding_mode) passage_provider_options = meta.get("embedding_options", self.embedding_options) - base_id = index.ntotal - for offset, chunk in enumerate(valid_chunks): - new_id = str(base_id + offset) - chunk.setdefault("metadata", {})["id"] = new_id - chunk["id"] = new_id - # Append passages/offsets before we attempt index.add so the ZMQ server # can resolve newly assigned IDs during recompute. Keep rollback hooks # so we can restore files if the update fails mid-way. rollback_passages_size = passages_file.stat().st_size if passages_file.exists() else 0 offset_map_backup = offset_map.copy() + idmap_file = ( + path.parent + / f"{path.name[: -len('.leann')] if path.name.endswith('.leann') else path.name}.ids.txt" + ) + rollback_idmap_size = idmap_file.stat().st_size if idmap_file.exists() else None try: with open(passages_file, "a", encoding="utf-8") as f: @@ -1042,6 +1103,9 @@ def update_index(self, index_path: str, remove_passage_ids: Optional[list[str]] with open(offset_file, "wb") as f: pickle.dump(offset_map, f) + with open(idmap_file, "a", encoding="utf-8") as f: + for chunk in valid_chunks: + f.write(str(chunk["id"]) + "\n") server_manager: Optional[EmbeddingServerManager] = None server_started = False @@ -1097,6 +1161,11 @@ def update_index(self, index_path: str, remove_passage_ids: Optional[list[str]] offset_map = offset_map_backup with open(offset_file, "wb") as f: pickle.dump(offset_map, f) + if rollback_idmap_size is None: + idmap_file.unlink(missing_ok=True) + elif 
idmap_file.exists(): + with open(idmap_file, "rb+") as f: + f.truncate(rollback_idmap_size) raise meta["total_passages"] = len(offset_map) @@ -1177,6 +1246,12 @@ def __init__( ) self.bm25_scorer: Optional[BM25Index] = None + # Surface the index's passage ID scheme so callers can introspect. + # Older indexes (pre-#330) don't record this field — they're sequential. + self.passage_id_scheme: str = self.meta_data.get( + "passage_id_scheme", PASSAGE_ID_SCHEME_SEQUENTIAL + ) + # Optional one-shot warmup at construction time to hide cold-start latency. if self._warmup: self.warmup() diff --git a/packages/leann-core/src/leann/cli.py b/packages/leann-core/src/leann/cli.py index 0d50cf34..ae4803e5 100644 --- a/packages/leann-core/src/leann/cli.py +++ b/packages/leann-core/src/leann/cli.py @@ -16,7 +16,13 @@ from llama_index.core.node_parser import SentenceSplitter from tqdm import tqdm -from .api import LeannBuilder, LeannChat, LeannSearcher +from .api import ( + PASSAGE_ID_SCHEME_CONTENT_HASH, + PASSAGE_ID_SCHEME_SEQUENTIAL, + LeannBuilder, + LeannChat, + LeannSearcher, +) from .embedding_server_manager import EmbeddingServerManager from .interactive_utils import create_cli_session from .registry import register_project_directory @@ -347,6 +353,16 @@ def create_parser(self) -> argparse.ArgumentParser: default=True, help="Fall back to traditional chunking if AST chunking fails (default: True)", ) + build_parser.add_argument( + "--id-scheme", + choices=["sequential", "content-hash"], + default="sequential", + help=( + "How passage IDs are assigned. 'sequential' (default) keys by insertion " + "order; 'content-hash' uses sha256(text)[:16], stable across file moves " + "and reorderings. See #329." 
+ ), + ) # Watch command watch_parser = subparsers.add_parser( @@ -1880,7 +1896,31 @@ def _chunks_for_paths(self, all_texts: list[dict], paths: set[str]) -> list[dict in paths ] + def _existing_index_id_scheme(self, index_path: str) -> Optional[str]: + """Return the passage_id_scheme recorded in an existing index's meta.json. + + Returns None when the index doesn't exist yet or its meta.json is unreadable. + Older indexes pre-#330 have no field and are sequential by definition. + """ + meta_path = Path(index_path).with_suffix(".leann.meta.json") + if not meta_path.exists(): + return None + try: + with open(meta_path, encoding="utf-8") as f: + return json.load(f).get("passage_id_scheme", PASSAGE_ID_SCHEME_SEQUENTIAL) + except Exception: + return None + def _make_incremental_builder(self, args) -> "LeannBuilder": + # For incremental updates, the existing index's scheme wins. Otherwise + # IDs would mix schemes within one index, which breaks lookups. + existing_scheme = self._existing_index_id_scheme(self.get_index_path(args.index_name)) + scheme = existing_scheme or getattr(args, "id_scheme", "sequential") + if existing_scheme and getattr(args, "id_scheme", existing_scheme) != existing_scheme: + print( + f"Note: --id-scheme={args.id_scheme} ignored — index '{args.index_name}' " + f"was built with passage_id_scheme={existing_scheme!r}, keeping that." 
+ ) return LeannBuilder( backend_name=args.backend_name, embedding_model=args.embedding_model, @@ -1891,6 +1931,7 @@ def _make_incremental_builder(self, args) -> "LeannBuilder": is_compact=args.compact, is_recompute=args.recompute, num_threads=args.num_threads, + passage_id_scheme=scheme, ) def _incremental_add_only( @@ -1904,8 +1945,9 @@ def _incremental_add_only( new_chunks = self._chunks_for_paths(all_texts, new_paths) if not new_chunks: return False - self._assign_chunk_ids(new_chunks) builder = self._make_incremental_builder(args) + if builder.passage_id_scheme != PASSAGE_ID_SCHEME_CONTENT_HASH: + self._assign_chunk_ids(new_chunks) for chunk in new_chunks: builder.add_text(chunk["text"], metadata=chunk["metadata"]) print( @@ -1998,13 +2040,15 @@ def _incremental_ivf_update( for p in changed_paths: path_set.update(self._path_lookup_keys(p, sync_roots)) new_chunks = self._chunks_for_paths(all_texts, path_set) - # Use unique IDs: passages can have mixed path formats so we may miss some ids_to_remove - self._assign_unique_chunk_ids(new_chunks) + # Use unique IDs for sequential indexes: passages can have mixed path formats so we may + # miss some ids_to_remove. Content-hash indexes let LeannBuilder derive IDs from text. 
+ builder = self._make_incremental_builder(args) + if builder.passage_id_scheme != PASSAGE_ID_SCHEME_CONTENT_HASH: + self._assign_unique_chunk_ids(new_chunks) if not ids_to_remove and not new_chunks: return False - builder = self._make_incremental_builder(args) for chunk in new_chunks: builder.add_text(chunk["text"], metadata=chunk["metadata"]) @@ -2384,6 +2428,7 @@ async def build_index(self, args): is_compact=args.compact, is_recompute=args.recompute, num_threads=args.num_threads, + passage_id_scheme=getattr(args, "id_scheme", "sequential"), ) for chunk in all_texts: diff --git a/tests/test_passage_id_scheme.py b/tests/test_passage_id_scheme.py new file mode 100644 index 00000000..ec511355 --- /dev/null +++ b/tests/test_passage_id_scheme.py @@ -0,0 +1,181 @@ +import hashlib +import json +import pickle +import sys +from types import ModuleType, SimpleNamespace +from typing import Any, cast + +import leann.api as leann_api +import numpy as np +from leann.api import LeannBuilder +from leann.cli import LeannCLI + + +def _content_id(text: str) -> str: + return hashlib.sha256(text.encode("utf-8")).hexdigest()[:16] + + +def _write_ivf_index(tmp_path, *, passage_id_scheme: str = "content-hash") -> str: + index_path = tmp_path / "documents.leann" + existing_id = _content_id("existing text") + passages_file = tmp_path / "documents.leann.passages.jsonl" + offset_file = tmp_path / "documents.leann.passages.idx" + with passages_file.open("w", encoding="utf-8") as f: + offset = f.tell() + f.write( + json.dumps( + {"id": existing_id, "text": "existing text", "metadata": {"id": existing_id}} + ) + + "\n" + ) + with offset_file.open("wb") as f: + pickle.dump({existing_id: offset}, f) + (tmp_path / "documents.index").write_bytes(b"fake-index") + (tmp_path / "documents.leann.meta.json").write_text( + json.dumps( + { + "version": "1.1", + "backend_name": "ivf", + "embedding_model": "test-model", + "embedding_mode": "sentence-transformers", + "dimensions": 2, + "backend_kwargs": {}, 
+ "passage_id_scheme": passage_id_scheme, + "passage_sources": [ + { + "type": "jsonl", + "path": passages_file.name, + "index_path": offset_file.name, + } + ], + } + ), + encoding="utf-8", + ) + return str(index_path) + + +def _patch_ivf_update(monkeypatch): + calls = [] + fake_ivf = ModuleType("leann_backend_ivf") + + def add_vectors(index_path, embeddings, passage_ids): + calls.append((index_path, embeddings.copy(), list(passage_ids))) + + cast(Any, fake_ivf).add_vectors = add_vectors + monkeypatch.setitem(sys.modules, "leann_backend_ivf", fake_ivf) + monkeypatch.setitem(leann_api.BACKEND_REGISTRY, "ivf", SimpleNamespace()) + monkeypatch.setattr( + leann_api, + "compute_embeddings", + lambda texts, *args, **kwargs: np.ones((len(texts), 2), dtype=np.float32), + ) + return calls + + +def test_builder_content_hash_passage_ids_suffix_duplicate_text(): + builder = LeannBuilder(backend_name="hnsw", passage_id_scheme="content-hash") + + builder.add_text("same text", metadata={"source": "a.txt"}) + builder.add_text("same text", metadata={"source": "b.txt"}) + builder.add_text("different text", metadata={"source": "c.txt"}) + + same_id = _content_id("same text") + assert builder.chunks[0]["id"] == same_id + assert builder.chunks[1]["id"] == f"{same_id}-1" + assert builder.chunks[2]["id"] == _content_id("different text") + + +def test_builder_preserves_falsy_explicit_metadata_id(): + builder = LeannBuilder(backend_name="hnsw", passage_id_scheme="content-hash") + + builder.add_text("zero id", metadata={"id": 0, "source": "a.txt"}) + builder.add_text("empty id", metadata={"id": "", "source": "b.txt"}) + + assert builder.chunks[0]["id"] == "0" + assert builder.chunks[1]["id"] == "" + + +def test_builder_rejects_content_hash_diskann_until_id_map_exists(): + try: + LeannBuilder(backend_name="diskann", passage_id_scheme="content-hash") + except ValueError as exc: + assert "not supported by the DiskANN backend" in str(exc) + else: + raise AssertionError("DiskANN content-hash 
builds must be rejected") + + +def test_legacy_missing_id_scheme_is_sequential(tmp_path): + meta_path = tmp_path / "documents.leann.meta.json" + meta_path.write_text(json.dumps({"version": "1.0"}), encoding="utf-8") + + cli = LeannCLI() + + assert cli._existing_index_id_scheme(str(tmp_path / "documents.leann")) == "sequential" + + +def test_incremental_content_hash_add_only_does_not_preassign_path_ids(tmp_path, monkeypatch): + cli = LeannCLI() + added_chunks = [] + + class FakeBuilder: + passage_id_scheme = "content-hash" + + def add_text(self, text, metadata=None): + added_chunks.append((text, dict(metadata or {}))) + + def update_index(self, index_path): + assert index_path == str(tmp_path / "documents.leann") + + monkeypatch.setattr(cli, "_make_incremental_builder", lambda _args: FakeBuilder()) + all_texts = [ + { + "text": "stable content", + "metadata": {"file_path": str(tmp_path / "doc.txt")}, + } + ] + + assert cli._incremental_add_only( + str(tmp_path / "documents.leann"), + all_texts, + SimpleNamespace(index_name="demo"), + {str(tmp_path / "doc.txt")}, + ) + + assert added_chunks == [("stable content", {"file_path": str(tmp_path / "doc.txt")})] + + +def test_update_content_hash_suffixes_existing_id_collision(tmp_path, monkeypatch): + index_path = _write_ivf_index(tmp_path) + add_vectors_calls = _patch_ivf_update(monkeypatch) + builder = LeannBuilder( + backend_name="ivf", + dimensions=2, + passage_id_scheme="content-hash", + ) + + builder.add_text("existing text", metadata={"source": "duplicate.txt"}) + builder.update_index(index_path) + + assert add_vectors_calls[0][2] == [f"{_content_id('existing text')}-1"] + offset_map = pickle.loads((tmp_path / "documents.leann.passages.idx").read_bytes()) + assert f"{_content_id('existing text')}-1" in offset_map + + +def test_update_preserves_falsy_explicit_metadata_ids(tmp_path, monkeypatch): + index_path = _write_ivf_index(tmp_path) + add_vectors_calls = _patch_ivf_update(monkeypatch) + builder = LeannBuilder( + 
backend_name="ivf", + dimensions=2, + passage_id_scheme="content-hash", + ) + + builder.add_text("zero id", metadata={"id": 0}) + builder.add_text("empty id", metadata={"id": ""}) + builder.update_index(index_path) + + assert add_vectors_calls[0][2] == ["0", ""] + offset_map = pickle.loads((tmp_path / "documents.leann.passages.idx").read_bytes()) + assert "0" in offset_map + assert "" in offset_map