Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions turbovec-python/python/turbovec/_dedup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""Shared in-batch duplicate resolution for the framework integrations.

Each upstream library resolves a repeated id *within a single write* its own
way, and every turbovec wrapper must match its upstream to stay a true
drop-in:

- LangChain's ``InMemoryVectorStore`` overwrites on a repeated key → KEEP_LAST
- LlamaIndex rejects duplicate ``node_id`` in a batch → REJECT
- agno's LanceDb is append-only and keeps every row → KEEP_ALL
- Haystack exposes a runtime ``DuplicatePolicy`` (FAIL/SKIP/OVERWRITE).
Its resolution is *stateful* (it dedups against the existing store as well
as the batch, with deferred issue-#89 removal), so it does not reduce to
the pure in-batch function here and keeps its own logic; this enum still
documents the mapping (OVERWRITE→KEEP_LAST, SKIP→KEEP_FIRST, FAIL→REJECT).

The shared piece is the in-batch resolution only: given one key per item,
return the indices to keep. Each wrapper still owns its key extraction and
its cross-store upsert/removal.
"""
from __future__ import annotations

import enum
from typing import Hashable, List, Sequence


class DuplicatePolicy(enum.Enum):
    """Strategy for items whose keys collide inside one batch."""

    #: Collapse to one item per key, keeping the final occurrence
    #: (the same outcome as repeated dict assignment).
    KEEP_LAST = "keep_last"

    #: Collapse to one item per key, keeping the earliest occurrence.
    KEEP_FIRST = "keep_first"

    #: Keep every item, but raise ``ValueError`` as soon as a key repeats.
    REJECT = "reject"

    #: Perform no deduplication; every item survives, duplicates included.
    KEEP_ALL = "keep_all"


def resolve_duplicates(
    keys: Sequence[Hashable], policy: DuplicatePolicy
) -> List[int]:
    """Return, in ascending order, the batch indices to keep under ``policy``.

    The returned indices index into ``keys`` (and any parallel arrays the
    caller holds). For KEEP_ALL and REJECT the result is every index,
    ``0 .. len(keys) - 1``; for KEEP_LAST/KEEP_FIRST it collapses to one
    index per distinct key.

    Args:
        keys: one hashable key per batch item, in batch order.
        policy: the ``DuplicatePolicy`` member selecting the resolution.

    Raises:
        ValueError: under REJECT, if any key occurs more than once.
        TypeError: if ``policy`` is not a ``DuplicatePolicy`` member.
    """
    if not isinstance(policy, DuplicatePolicy):
        # Without this guard anything unrecognised (the raw string
        # "keep_last", or another library's look-alike enum) would fall
        # through to the final branch and silently be treated as KEEP_FIRST,
        # dropping items the caller meant to keep.
        raise TypeError(
            f"policy must be a DuplicatePolicy member, got {policy!r}"
        )
    if policy is DuplicatePolicy.KEEP_ALL:
        return list(range(len(keys)))
    if policy is DuplicatePolicy.REJECT:
        seen: set[Hashable] = set()
        for key in keys:
            if key in seen:
                raise ValueError(f"duplicate id in batch: {key!r}")
            seen.add(key)
        return list(range(len(keys)))
    # KEEP_LAST / KEEP_FIRST collapse to one index per key: KEEP_LAST
    # overwrites the stored index on every occurrence, KEEP_FIRST only
    # records a key the first time it is seen.
    keep_last = policy is DuplicatePolicy.KEEP_LAST
    chosen: dict[Hashable, int] = {}
    for index, key in enumerate(keys):
        if keep_last or key not in chosen:
            chosen[key] = index
    # Surviving indices are reported in batch order, not key-insertion order.
    return sorted(chosen.values())


__all__ = ["DuplicatePolicy", "resolve_duplicates"]
55 changes: 55 additions & 0 deletions turbovec-python/python/turbovec/_persist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Shared persistence consistency checks for the framework integrations.

Each wrapper persists two artifacts: the binary ``.tvim`` index and a JSON
side-car holding the handle -> document/node/text payload maps. At query
time the wrapper resolves an index-returned u64 handle through that side-car
map. If the two files are out of sync — a partial copy, a stale backup, a
hand-edited or tampered side-car — an index handle won't resolve and the
wrapper would raise an opaque ``KeyError`` deep inside a query.

``check_persisted_handles`` turns that into a clean ``ValueError`` at load
time. ``IdMapIndex`` exposes only ``__len__`` and ``contains``, but that is
sufficient: if the side-car's handles are distinct, their count equals the
index size, and every side-car handle is present in the index, then the two
handle sets are identical (no index handle can be missing from the side-car).
"""
from __future__ import annotations

from typing import Iterable


def check_persisted_handles(index, handles: Iterable[int], *, what: str = "entry") -> None:
    """Raise unless the side-car handles line up exactly with the loaded index.

    Three checks run in order, each failing with ``ValueError``: the handles
    must be distinct, their count must equal ``len(index)``, and every one of
    them must satisfy ``index.contains``.

    Args:
        index: the loaded ``IdMapIndex``; only ``len()`` and ``contains()``
            are called on it.
        handles: the u64 handles the side-car maps can resolve. Each one is
            coerced with ``int`` before checking.
        what: noun used in the error messages (e.g. "document", "node").

    Raises:
        ValueError: if the side-car repeats a handle, holds a different
            number of handles than the index, or lists a handle the index
            does not contain.
    """
    sidecar = list(map(int, handles))
    index_size = len(index)
    sidecar_size = len(sidecar)

    # A repeated handle shrinks the de-duplicated set below the list length.
    if len(frozenset(sidecar)) < sidecar_size:
        raise ValueError(
            f"persisted store is corrupt: duplicate {what} handles in the side-car"
        )

    if sidecar_size != index_size:
        raise ValueError(
            f"persisted store is inconsistent with its index: side-car has "
            f"{sidecar_size} {what} handle(s) but the index holds {index_size}. "
            f"The .tvim index and its JSON side-car are out of sync."
        )

    # Report the first handle (in side-car order) that the index lacks.
    # ``None`` is a safe sentinel because every candidate is an ``int``.
    missing = next((h for h in sidecar if not index.contains(h)), None)
    if missing is not None:
        raise ValueError(
            f"persisted store is inconsistent with its index: {what} handle "
            f"{missing} is not present in the index. The .tvim index and its JSON "
            f"side-car are out of sync."
        )


__all__ = ["check_persisted_handles"]
110 changes: 64 additions & 46 deletions turbovec-python/python/turbovec/agno.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,13 @@ def __init__(
# freshly-constructed store doesn't "exist" until `create()` is
# called, and `drop()` returns it to that state.
self._index: Optional[IdMapIndex] = None
# str doc_id -> u64 handle
self._str_to_u64: Dict[str, int] = {}
# str doc_id -> set of u64 handles. One-to-many: agno's derived
# doc_id is NOT unique (two documents with identical content, or a
# repeated explicit doc.id within a batch, derive the same id), and
# LanceDb keeps every such row. Mapping one doc_id to a single handle
# silently orphaned the earlier vectors — present in search and the
# index count but unreachable by id, so undeletable (issue #104).
self._str_to_u64: Dict[str, Set[int]] = {}
# u64 handle -> stored payload (mirrors LanceDb's "payload" shape)
self._u64_to_doc: Dict[int, Dict[str, Any]] = {}
# u64 handle assignment counter
Expand Down Expand Up @@ -355,7 +360,7 @@ def insert(
cleaned = doc.content.replace("\x00", "�") if doc.content else ""
doc_id = self._derive_doc_id(doc, content_hash, cleaned)
h = int(handle)
self._str_to_u64[doc_id] = h
self._str_to_u64.setdefault(doc_id, set()).add(h)
self._u64_to_doc[h] = {
"id": doc_id,
"name": doc.name,
Expand Down Expand Up @@ -440,14 +445,24 @@ def _remove_handle(self, handle: int) -> None:
return
self._index.remove(handle)
doc_id = data.get("id")
# Only clear the id->handle mapping if it still points at this
# handle; a re-inserted doc may have repointed it to a new handle.
if doc_id is not None and self._str_to_u64.get(doc_id) == handle:
self._str_to_u64.pop(doc_id, None)
# Drop the name->id link only if no surviving handle keeps that id.
# Drop just this handle from the id's handle set; remove the id
# entirely only once no handle remains under it.
if doc_id is not None:
handles = self._str_to_u64.get(doc_id)
if handles is not None:
handles.discard(handle)
if not handles:
del self._str_to_u64[doc_id]
# Drop the name->id link only if no surviving handle keeps that
# (name, id) pair. The derived doc_id excludes `name`, so two docs
# with different names can share an id — matching on id alone would
# leave a stale name entry when the last handle for this name goes.
name = data.get("name")
if name and name in self._name_to_ids:
if not any(d.get("id") == doc_id for d in self._u64_to_doc.values()):
if not any(
d.get("id") == doc_id and d.get("name") == name
for d in self._u64_to_doc.values()
):
self._name_to_ids[name].discard(doc_id)
if not self._name_to_ids[name]:
del self._name_to_ids[name]
Expand Down Expand Up @@ -615,58 +630,58 @@ def get_supported_search_types(self) -> List[SearchType]:
def delete_by_id(self, id: str) -> bool:
if self._index is None:
return False
handle = self._str_to_u64.pop(id, None)
if handle is None:
handles = self._str_to_u64.get(id)
if not handles:
return False
doc_data = self._u64_to_doc.pop(handle, None)
if doc_data is not None:
name = doc_data.get("name")
if name and name in self._name_to_ids:
self._name_to_ids[name].discard(id)
if not self._name_to_ids[name]:
del self._name_to_ids[name]
self._index.remove(handle)
# Lazily drop content_hash from the set if no surviving doc has it.
if doc_data is not None:
ch = doc_data.get("content_hash")
if ch and not any(
d.get("content_hash") == ch for d in self._u64_to_doc.values()
):
self._content_hashes.discard(ch)
# Remove every vector sharing this id — a non-unique derived doc_id
# can map to several handles. _remove_handle maintains the id, name,
# and content_hash side-indexes per handle.
for handle in list(handles):
self._remove_handle(handle)
return True

def delete_by_name(self, name: str) -> bool:
if self._index is None:
return False
ids = list(self._name_to_ids.get(name, set()))
for doc_id in ids:
self.delete_by_id(doc_id)
return bool(ids)
# Remove exactly the handles whose stored name matches. Delegating to
# delete_by_id would key on the derived doc_id, which excludes `name`,
# so it would also delete a differently-named doc that happens to
# share the id. LanceDb deletes rows matching the predicate directly.
handles = [h for h, d in self._u64_to_doc.items() if d.get("name") == name]
for handle in handles:
self._remove_handle(handle)
return bool(handles)

def delete_by_metadata(self, metadata: Dict[str, Any]) -> bool:
if self._index is None:
return False
items = list(metadata.items())
to_delete = [
data["id"]
for data in self._u64_to_doc.values()
# Remove the matching handles directly (see delete_by_name): the
# derived doc_id ignores metadata, so delete_by_id would over-delete
# distinct docs that collide on the id.
handles = [
h
for h, data in self._u64_to_doc.items()
if all((data.get("meta_data") or {}).get(k) == v for k, v in items)
]
for doc_id in to_delete:
self.delete_by_id(doc_id)
return bool(to_delete)
for handle in handles:
self._remove_handle(handle)
return bool(handles)

def delete_by_content_id(self, content_id: str) -> bool:
if self._index is None:
return False
to_delete = [
data["id"]
for data in self._u64_to_doc.values()
# Remove the matching handles directly (see delete_by_name): the
# derived doc_id ignores content_id, so delete_by_id would over-delete
# distinct docs that collide on the id.
handles = [
h
for h, data in self._u64_to_doc.items()
if data.get("content_id") == content_id
]
for doc_id in to_delete:
self.delete_by_id(doc_id)
return bool(to_delete)
for handle in handles:
self._remove_handle(handle)
return bool(handles)

def update_metadata(self, content_id: str, metadata: Dict[str, Any]) -> None:
"""Merge ``metadata`` into both ``meta_data`` and the ``filters``
Expand Down Expand Up @@ -752,10 +767,13 @@ def _load_from(self, folder: Path) -> None:
self._u64_to_doc = {int(h): d for h, d in state["u64_to_doc"]}
self._next_u64 = int(state["next_u64"])

# Rebuild reverse indexes from the loaded payload.
self._str_to_u64 = {
data["id"]: handle for handle, data in self._u64_to_doc.items()
}
# Rebuild reverse indexes from the loaded payload. doc_id is
# non-unique, so accumulate handles into a set per id rather than a
# dict comprehension (which would drop all but the last handle and
# re-orphan the very vectors issue #104 fixed).
self._str_to_u64 = {}
for handle, data in self._u64_to_doc.items():
self._str_to_u64.setdefault(data["id"], set()).add(handle)
self._content_hashes = set()
self._name_to_ids = {}
for data in self._u64_to_doc.values():
Expand Down
2 changes: 2 additions & 0 deletions turbovec-python/python/turbovec/haystack.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import numpy as np

from ._persist import check_persisted_handles
from ._turbovec import IdMapIndex

try:
Expand Down Expand Up @@ -671,6 +672,7 @@ def load_from_disk(
store._str_to_u64 = {
data["id"]: handle for handle, data in store._u64_to_doc.items()
}
check_persisted_handles(store._index, store._u64_to_doc.keys(), what="document")
return store

# ---- Internals ----------------------------------------------------
Expand Down
7 changes: 5 additions & 2 deletions turbovec-python/python/turbovec/langchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import numpy as np

from ._dedup import DuplicatePolicy, resolve_duplicates
from ._persist import check_persisted_handles
from ._turbovec import IdMapIndex

try:
Expand Down Expand Up @@ -184,8 +186,8 @@ def _store_texts_and_vectors(
# earlier vectors. The returned id list still mirrors the input
# (one entry per input text), as the reference does.
result_ids = ids
if len(set(ids)) != len(ids):
keep = sorted({id_: i for i, id_ in enumerate(ids)}.values())
keep = resolve_duplicates(ids, DuplicatePolicy.KEEP_LAST)
if len(keep) != len(ids):
ids = [ids[i] for i in keep]
texts_list = [texts_list[i] for i in keep]
metadatas = [metadatas[i] for i in keep]
Expand Down Expand Up @@ -542,6 +544,7 @@ def load(
# JSON object keys are strings; the str_to_u64 values are already
# ints in the payload, just need to confirm.
str_to_u64 = {sid: int(h) for sid, h in state["str_to_u64"].items()}
check_persisted_handles(index, str_to_u64.values(), what="document")
return cls(
embedding=embedding,
index=index,
Expand Down
21 changes: 13 additions & 8 deletions turbovec-python/python/turbovec/llama_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

import numpy as np

from ._dedup import DuplicatePolicy, resolve_duplicates
from ._persist import check_persisted_handles
from ._turbovec import IdMapIndex

try:
Expand Down Expand Up @@ -143,14 +145,16 @@ def add(self, nodes: list[BaseNode], **_: Any) -> list[str]:
# `query` later resolves through the duplicate node_id, returning
# the second node's payload attached to the first node's vector.
# Caller's job to deduplicate before calling add.
seen: set[str] = set()
for n in nodes:
if n.node_id in seen:
raise ValueError(
f"duplicate node_id {n.node_id!r} appears multiple times "
"in the input batch; deduplicate before calling add()"
)
seen.add(n.node_id)
node_ids = [n.node_id for n in nodes]
try:
resolve_duplicates(node_ids, DuplicatePolicy.REJECT)
except ValueError:
seen: set[str] = set()
dup = next(nid for nid in node_ids if nid in seen or seen.add(nid))
raise ValueError(
f"duplicate node_id {dup!r} appears multiple times "
"in the input batch; deduplicate before calling add()"
) from None

embeddings = [node.get_embedding() for node in nodes]
vectors = np.asarray(embeddings, dtype=np.float32)
Expand Down Expand Up @@ -648,6 +652,7 @@ def from_persist_path(
store._node_id_to_u64 = {nid: int(h) for nid, h in state["node_id_to_u64"]}
store._u64_to_node_id = {h: nid for nid, h in store._node_id_to_u64.items()}
store._next_u64 = int(state["next_u64"])
check_persisted_handles(index, store._u64_to_node_id.keys(), what="node")
return store

@classmethod
Expand Down
Loading
Loading