Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 92 additions & 17 deletions packages/leann-core/src/leann/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@

logger = logging.getLogger(__name__)

# Passage ID schemes recorded in <index>.meta.json["passage_id_scheme"].
# - "sequential": today's default; IDs are str(insertion_index) (api.py:add_text).
# - "content-hash": IDs are the first 16 hex chars of sha256(text), with a "-N"
# suffix added for duplicate text; stable across file moves and reorderings (#329).
# Older indexes have no passage_id_scheme field — readers must default to
# "sequential" when the key is absent. See #329 for the rollout plan.
PASSAGE_ID_SCHEME_SEQUENTIAL = "sequential"
PASSAGE_ID_SCHEME_CONTENT_HASH = "content-hash"


def get_registered_backends() -> list[str]:
"""Get list of registered backend names."""
Expand Down Expand Up @@ -376,14 +385,30 @@ def __init__(
embedding_options: Optional[dict[str, Any]] = None,
prebuild_bm25: bool = False,
bm25_backend: str = "fts5",
passage_id_scheme: str = PASSAGE_ID_SCHEME_SEQUENTIAL,
**backend_kwargs,
):
if bm25_backend != "fts5":
logger.warning(f"bm25_backend={bm25_backend!r} is deprecated; using 'fts5'.")
bm25_backend = "fts5"
self.bm25_backend = bm25_backend
self.prebuild_bm25 = prebuild_bm25 or bm25_backend == "fts5"
if passage_id_scheme not in (
PASSAGE_ID_SCHEME_SEQUENTIAL,
PASSAGE_ID_SCHEME_CONTENT_HASH,
):
raise ValueError(
f"Unknown passage_id_scheme: {passage_id_scheme!r}. "
f"Expected one of: {PASSAGE_ID_SCHEME_SEQUENTIAL!r}, "
f"{PASSAGE_ID_SCHEME_CONTENT_HASH!r}."
)
self.passage_id_scheme = passage_id_scheme
self.backend_name = backend_name
if backend_name == "diskann" and passage_id_scheme == PASSAGE_ID_SCHEME_CONTENT_HASH:
raise ValueError(
"passage_id_scheme='content-hash' is not supported by the DiskANN backend yet "
"because DiskANN search returns numeric labels without a persisted passage ID map."
)
# Normalize incompatible combinations early (for consistent metadata)
if backend_name == "hnsw":
is_recompute = backend_kwargs.get("is_recompute", True)
Expand Down Expand Up @@ -477,10 +502,37 @@ def __init__(
self.backend_kwargs = backend_kwargs
self.chunks: list[dict[str, Any]] = []

@staticmethod
def _make_unique_passage_id(base_id: str, reserved_ids: set[str]) -> str:
if base_id not in reserved_ids:
return base_id
suffix = 1
while f"{base_id}-{suffix}" in reserved_ids:
suffix += 1
return f"{base_id}-{suffix}"

def _generate_passage_id(self, text: str, reserved_ids: Optional[set[str]] = None) -> str:
    """Return the ID for a new passage under ``self.passage_id_scheme``.

    For any scheme other than content-hash the ID is ``str(len(self.chunks))``,
    i.e. the insertion index, and ``reserved_ids`` is not consulted.

    For content-hash the ID is the first 16 hex characters of the SHA-256 of
    the UTF-8 encoded text. If that value is already in ``reserved_ids`` it is
    disambiguated by ``_make_unique_passage_id`` so IDs stay one-to-one with
    passages. A missing or empty ``reserved_ids`` means no disambiguation.
    """
    if self.passage_id_scheme != PASSAGE_ID_SCHEME_CONTENT_HASH:
        return str(len(self.chunks))

    import hashlib

    taken = reserved_ids if reserved_ids else set()
    digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
    return self._make_unique_passage_id(digest[:16], taken)

def add_text(self, text: str, metadata: Optional[dict[str, Any]] = None):
    """Queue one passage for indexing.

    Args:
        text: The passage text.
        metadata: Optional metadata dict, stored by reference on the chunk.
            A non-None ``metadata["id"]`` is used (stringified) as the
            passage ID; otherwise the ID comes from ``_generate_passage_id``.

    The chunk is appended to ``self.chunks`` as
    ``{"id": ..., "text": ..., "metadata": ...}``.
    """
    if metadata is None:
        metadata = {}

    explicit_id = metadata.get("id")
    if explicit_id is not None:
        passage_id = str(explicit_id)
    elif self.passage_id_scheme == PASSAGE_ID_SCHEME_CONTENT_HASH:
        # Only content-hash IDs can collide with queued chunks (duplicate
        # text), so only this branch pays for collecting the IDs in use.
        passage_id = self._generate_passage_id(text, {chunk["id"] for chunk in self.chunks})
    else:
        # Sequential IDs depend solely on len(self.chunks); building the
        # reserved-ID set here would make every call O(n) for nothing.
        passage_id = self._generate_passage_id(text)

    self.chunks.append({"id": passage_id, "text": text, "metadata": metadata})

Expand Down Expand Up @@ -570,12 +622,13 @@ def build_index(self, index_path: str):
builder_instance.build(embeddings, string_ids, index_path, **current_backend_kwargs)
leann_meta_path = index_dir / f"{index_name}.meta.json"
meta_data = {
"version": "1.0",
"version": "1.1",
"backend_name": self.backend_name,
"embedding_model": self.embedding_model,
"dimensions": self.dimensions,
"backend_kwargs": self.backend_kwargs,
"embedding_mode": self.embedding_mode,
"passage_id_scheme": self.passage_id_scheme,
"passage_sources": [
{
"type": "jsonl",
Expand Down Expand Up @@ -714,12 +767,13 @@ def build_index_from_arrays(self, index_path: str, ids: list, embeddings: np.nda
# Create metadata file
leann_meta_path = index_dir / f"{index_name}.meta.json"
meta_data = {
"version": "1.0",
"version": "1.1",
"backend_name": self.backend_name,
"embedding_model": self.embedding_model,
"dimensions": self.dimensions,
"backend_kwargs": self.backend_kwargs,
"embedding_mode": self.embedding_mode,
"passage_id_scheme": self.passage_id_scheme,
"passage_sources": [
{
"type": "jsonl",
Expand Down Expand Up @@ -881,14 +935,28 @@ def update_index(self, index_path: str, remove_passage_ids: Optional[list[str]]
)

valid_chunks: list[dict[str, Any]] = []
reserved_ids = set(existing_ids)
next_sequential_id = len(offset_map)
for chunk in self.chunks:
text = chunk.get("text", "")
if not isinstance(text, str) or not text.strip():
continue
metadata = chunk.setdefault("metadata", {})
passage_id = chunk.get("id") or metadata.get("id")
if passage_id and passage_id in existing_ids:
if "id" in metadata and metadata["id"] is not None:
passage_id = str(metadata["id"])
elif self.passage_id_scheme == PASSAGE_ID_SCHEME_CONTENT_HASH:
passage_id = self._generate_passage_id(text, reserved_ids)
else:
while str(next_sequential_id) in reserved_ids:
next_sequential_id += 1
passage_id = str(next_sequential_id)
next_sequential_id += 1

if passage_id in reserved_ids:
raise ValueError(f"Passage ID '{passage_id}' already exists in the index.")
chunk["id"] = passage_id
metadata["id"] = passage_id
reserved_ids.add(passage_id)
valid_chunks.append(chunk)

if not valid_chunks:
Expand Down Expand Up @@ -924,12 +992,6 @@ def update_index(self, index_path: str, remove_passage_ids: Optional[list[str]]

# IVF: add_vectors then append passages/offset (no ZMQ/server)
if backend_name == "ivf":
for i, chunk in enumerate(valid_chunks):
pid = chunk.get("id") or chunk.get("metadata", {}).get("id")
if not pid:
pid = str(len(offset_map) + i)
chunk.setdefault("metadata", {})["id"] = pid
chunk["id"] = pid
passage_ids = [c["id"] for c in valid_chunks]
try:
from leann_backend_ivf import add_vectors as ivf_add_vectors
Expand Down Expand Up @@ -1012,17 +1074,16 @@ def update_index(self, index_path: str, remove_passage_ids: Optional[list[str]]
passage_meta_mode = meta.get("embedding_mode", self.embedding_mode)
passage_provider_options = meta.get("embedding_options", self.embedding_options)

base_id = index.ntotal
for offset, chunk in enumerate(valid_chunks):
new_id = str(base_id + offset)
chunk.setdefault("metadata", {})["id"] = new_id
chunk["id"] = new_id

# Append passages/offsets before we attempt index.add so the ZMQ server
# can resolve newly assigned IDs during recompute. Keep rollback hooks
# so we can restore files if the update fails mid-way.
rollback_passages_size = passages_file.stat().st_size if passages_file.exists() else 0
offset_map_backup = offset_map.copy()
idmap_file = (
path.parent
/ f"{path.name[: -len('.leann')] if path.name.endswith('.leann') else path.name}.ids.txt"
)
rollback_idmap_size = idmap_file.stat().st_size if idmap_file.exists() else None

try:
with open(passages_file, "a", encoding="utf-8") as f:
Expand All @@ -1042,6 +1103,9 @@ def update_index(self, index_path: str, remove_passage_ids: Optional[list[str]]

with open(offset_file, "wb") as f:
pickle.dump(offset_map, f)
with open(idmap_file, "a", encoding="utf-8") as f:
for chunk in valid_chunks:
f.write(str(chunk["id"]) + "\n")

server_manager: Optional[EmbeddingServerManager] = None
server_started = False
Expand Down Expand Up @@ -1097,6 +1161,11 @@ def update_index(self, index_path: str, remove_passage_ids: Optional[list[str]]
offset_map = offset_map_backup
with open(offset_file, "wb") as f:
pickle.dump(offset_map, f)
if rollback_idmap_size is None:
idmap_file.unlink(missing_ok=True)
elif idmap_file.exists():
with open(idmap_file, "rb+") as f:
f.truncate(rollback_idmap_size)
raise

meta["total_passages"] = len(offset_map)
Expand Down Expand Up @@ -1177,6 +1246,12 @@ def __init__(
)
self.bm25_scorer: Optional[BM25Index] = None

# Surface the index's passage ID scheme so callers can introspect.
# Older indexes (pre-#330) don't record this field — they're sequential.
self.passage_id_scheme: str = self.meta_data.get(
"passage_id_scheme", PASSAGE_ID_SCHEME_SEQUENTIAL
)

# Optional one-shot warmup at construction time to hide cold-start latency.
if self._warmup:
self.warmup()
Expand Down
55 changes: 50 additions & 5 deletions packages/leann-core/src/leann/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@
from llama_index.core.node_parser import SentenceSplitter
from tqdm import tqdm

from .api import LeannBuilder, LeannChat, LeannSearcher
from .api import (
PASSAGE_ID_SCHEME_CONTENT_HASH,
PASSAGE_ID_SCHEME_SEQUENTIAL,
LeannBuilder,
LeannChat,
LeannSearcher,
)
from .embedding_server_manager import EmbeddingServerManager
from .interactive_utils import create_cli_session
from .registry import register_project_directory
Expand Down Expand Up @@ -347,6 +353,16 @@ def create_parser(self) -> argparse.ArgumentParser:
default=True,
help="Fall back to traditional chunking if AST chunking fails (default: True)",
)
build_parser.add_argument(
"--id-scheme",
choices=["sequential", "content-hash"],
default="sequential",
help=(
"How passage IDs are assigned. 'sequential' (default) keys by insertion "
"order; 'content-hash' uses sha256(text)[:16], stable across file moves "
"and reorderings. See #329."
),
)

# Watch command
watch_parser = subparsers.add_parser(
Expand Down Expand Up @@ -1880,7 +1896,31 @@ def _chunks_for_paths(self, all_texts: list[dict], paths: set[str]) -> list[dict
in paths
]

def _existing_index_id_scheme(self, index_path: str) -> Optional[str]:
"""Return the passage_id_scheme recorded in an existing index's meta.json.

Returns None when the index doesn't exist yet. Older indexes pre-#330
have no field and are sequential by definition.
"""
meta_path = Path(index_path).with_suffix(".leann.meta.json")
if not meta_path.exists():
return None
try:
with open(meta_path, encoding="utf-8") as f:
return json.load(f).get("passage_id_scheme", PASSAGE_ID_SCHEME_SEQUENTIAL)
except Exception:
return None

def _make_incremental_builder(self, args) -> "LeannBuilder":
# For incremental updates, the existing index's scheme wins. Otherwise
# IDs would mix schemes within one index, which breaks lookups.
existing_scheme = self._existing_index_id_scheme(self.get_index_path(args.index_name))
scheme = existing_scheme or getattr(args, "id_scheme", "sequential")
if existing_scheme and getattr(args, "id_scheme", existing_scheme) != existing_scheme:
print(
f"Note: --id-scheme={args.id_scheme} ignored — index '{args.index_name}' "
f"was built with passage_id_scheme={existing_scheme!r}, keeping that."
)
return LeannBuilder(
backend_name=args.backend_name,
embedding_model=args.embedding_model,
Expand All @@ -1891,6 +1931,7 @@ def _make_incremental_builder(self, args) -> "LeannBuilder":
is_compact=args.compact,
is_recompute=args.recompute,
num_threads=args.num_threads,
passage_id_scheme=scheme,
)

def _incremental_add_only(
Expand All @@ -1904,8 +1945,9 @@ def _incremental_add_only(
new_chunks = self._chunks_for_paths(all_texts, new_paths)
if not new_chunks:
return False
self._assign_chunk_ids(new_chunks)
builder = self._make_incremental_builder(args)
if builder.passage_id_scheme != PASSAGE_ID_SCHEME_CONTENT_HASH:
self._assign_chunk_ids(new_chunks)
for chunk in new_chunks:
builder.add_text(chunk["text"], metadata=chunk["metadata"])
print(
Expand Down Expand Up @@ -1998,13 +2040,15 @@ def _incremental_ivf_update(
for p in changed_paths:
path_set.update(self._path_lookup_keys(p, sync_roots))
new_chunks = self._chunks_for_paths(all_texts, path_set)
# Use unique IDs: passages can have mixed path formats so we may miss some ids_to_remove
self._assign_unique_chunk_ids(new_chunks)
# Use unique IDs for sequential indexes: passages can have mixed path formats so we may
# miss some ids_to_remove. Content-hash indexes let LeannBuilder derive IDs from text.
builder = self._make_incremental_builder(args)
if builder.passage_id_scheme != PASSAGE_ID_SCHEME_CONTENT_HASH:
self._assign_unique_chunk_ids(new_chunks)

if not ids_to_remove and not new_chunks:
return False

builder = self._make_incremental_builder(args)
for chunk in new_chunks:
builder.add_text(chunk["text"], metadata=chunk["metadata"])

Expand Down Expand Up @@ -2384,6 +2428,7 @@ async def build_index(self, args):
is_compact=args.compact,
is_recompute=args.recompute,
num_threads=args.num_threads,
passage_id_scheme=getattr(args, "id_scheme", "sequential"),
)

for chunk in all_texts:
Expand Down
Loading
Loading