Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions mempalace/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The Python package that powers MemPalace. All modules, all logic.

| Module | What it does |
|--------|-------------|
| `chromadb_utils.py` | Safe batched reads from ChromaDB — prevents silent truncation on large palaces |
| `cli.py` | CLI entry point — routes to mine, search, init, compress, wake-up |
| `config.py` | Configuration loading — `~/.mempalace/config.json`, env vars, defaults |
| `normalize.py` | Converts 5 chat formats (Claude Code JSONL, Claude.ai JSON, ChatGPT JSON, Slack JSON, plain text) to standard transcript format |
Expand Down
55 changes: 55 additions & 0 deletions mempalace/chromadb_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""
Utilities for safe ChromaDB collection reads.

ChromaDB's ``col.get()`` without an explicit ``limit`` applies a small
internal default that silently truncates results on large palaces.
Even with a limit, very large values can exceed SQLite's ~999 variable
cap. The helper below reads in batches so every caller gets complete
results regardless of palace size.
"""

_BATCH_SIZE = 5000


def get_all(col, *, include=None, where=None, batch_size=_BATCH_SIZE):
    """Read **all** records from a ChromaDB collection in safe batches.

    ChromaDB's ``col.get()`` applies a small internal default limit when
    none is given (silently truncating large collections), and very large
    explicit limits can exceed SQLite's ~999 variable cap.  Paging through
    the collection ``batch_size`` records at a time avoids both failure
    modes.

    Args:
        col: A ChromaDB collection object.
        include: List of fields to include (e.g. ``["metadatas"]``).
        where: Optional ChromaDB ``where`` filter dict.
        batch_size: Records per batch (default 5 000).

    Returns:
        A merged result dict with the same shape as ``col.get()``
        (keys: ``ids``, plus whichever extras were requested via *include*).
    """
    fields = list(include or [])
    total = col.count()
    if total == 0:
        # Fast path: nothing stored, skip the query loop entirely.
        return {"ids": [], **{field: [] for field in fields}}

    all_ids = []
    all_fields = {field: [] for field in fields}

    offset = 0
    while offset < total:
        kwargs = {"limit": batch_size, "offset": offset, "include": fields}
        if where:
            kwargs["where"] = where
        batch = col.get(**kwargs)

        batch_ids = batch["ids"]
        # A `where` filter can exhaust its matches before `offset` reaches
        # `total` (which counts the whole collection), so an empty page
        # means we are done.  Checked before extending, not after.
        if not batch_ids:
            break

        all_ids.extend(batch_ids)
        for field in fields:
            # `or []` guards against a backend returning None for a field
            # (plain `.get(field, [])` would crash `extend` on None).
            all_fields[field].extend(batch.get(field) or [])

        if len(batch_ids) < batch_size:
            # Short page: no further records can match; skip the extra
            # round-trip the plain `offset < total` check would make.
            break
        offset += len(batch_ids)

    result = {"ids": all_ids}
    result.update(all_fields)
    return result
4 changes: 2 additions & 2 deletions mempalace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ def palace_path(self):
"""Path to the memory palace data directory."""
env_val = os.environ.get("MEMPALACE_PALACE_PATH") or os.environ.get("MEMPAL_PALACE_PATH")
if env_val:
return env_val
return self._file_config.get("palace_path", DEFAULT_PALACE_PATH)
return os.path.expanduser(env_val)
return os.path.expanduser(self._file_config.get("palace_path", DEFAULT_PALACE_PATH))

@property
def collection_name(self):
Expand Down
6 changes: 3 additions & 3 deletions mempalace/mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -999,10 +999,10 @@ def tool_diary_read(agent_name: str, last_n: int = 10):
return _no_palace()

try:
results = col.get(
where={"$and": [{"wing": wing}, {"room": "diary"}]},
results = get_all(
col,
include=["documents", "metadatas"],
limit=10000,
where={"$and": [{"wing": wing}, {"room": "diary"}]},
)

if not results["ids"]:
Expand Down
4 changes: 2 additions & 2 deletions mempalace/miner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from datetime import datetime
from collections import defaultdict

from .chromadb_utils import get_all
from .palace import (
NORMALIZE_VERSION,
SKIP_DIRS,
Expand Down Expand Up @@ -838,8 +839,7 @@ def status(palace_path: str):
return

# Count by wing and room
total = col.count()
r = col.get(limit=total, include=["metadatas"]) if total else {"metadatas": []}
r = get_all(col, include=["metadatas"])
metas = r["metadatas"]

wing_rooms = defaultdict(lambda: defaultdict(int))
Expand Down
6 changes: 3 additions & 3 deletions tests/benchmarks/test_layers_bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ def test_token_budget(self, n_drawers, tmp_path):
record_metric("wakeup_budget", f"tokens_at_{n_drawers}", token_estimate)
record_metric("wakeup_budget", f"chars_at_{n_drawers}", len(text))

assert (
token_estimate < 1200
), f"Wake-up exceeded budget: ~{token_estimate} tokens at {n_drawers} drawers"
assert token_estimate < 1200, (
f"Wake-up exceeded budget: ~{token_estimate} tokens at {n_drawers} drawers"
)


@pytest.mark.benchmark
Expand Down
132 changes: 132 additions & 0 deletions tests/test_chromadb_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""Tests for chromadb_utils.get_all — batched reads from ChromaDB."""

import shutil
import tempfile

import chromadb

from mempalace.chromadb_utils import get_all


def _create_palace(n_drawers, n_wings=1):
    """Build a throwaway palace holding *n_drawers* across *n_wings* wings.

    Returns (palace_path, collection).
    """
    palace_path = tempfile.mkdtemp()
    client = chromadb.PersistentClient(path=palace_path)
    col = client.get_or_create_collection("mempalace_drawers")

    wings = [f"wing_{i}" for i in range(n_wings)]
    rooms = ["src", "docs", "tests"]

    ids = [f"drawer_{i}" for i in range(n_drawers)]
    docs = [f"content for drawer {i}" for i in range(n_drawers)]
    metas = [
        {
            "wing": wings[i % n_wings],
            "room": rooms[i % len(rooms)],
            "source_file": f"file_{i}.py",
        }
        for i in range(n_drawers)
    ]

    # ChromaDB add has its own batch limits, so insert in chunks
    chunk = 5000
    for start in range(0, n_drawers, chunk):
        stop = start + chunk
        col.add(
            ids=ids[start:stop],
            documents=docs[start:stop],
            metadatas=metas[start:stop],
        )

    return palace_path, col


def test_get_all_returns_all_metadata():
    """Every drawer's metadata must come back — not a default-limited subset."""
    path, col = _create_palace(50)
    try:
        got = get_all(col, include=["metadatas"])
        assert len(got["ids"]) == 50
        assert len(got["metadatas"]) == 50
    finally:
        shutil.rmtree(path)


def test_get_all_returns_documents_and_metadatas():
    """Requesting several include fields must populate each of them."""
    path, col = _create_palace(20)
    try:
        got = get_all(col, include=["documents", "metadatas"])
        for key in ("ids", "documents", "metadatas"):
            assert len(got[key]) == 20
        assert "content for drawer 0" in got["documents"][0]
    finally:
        shutil.rmtree(path)


def test_get_all_with_where_filter():
    """Only drawers matching the where clause should be returned."""
    path, col = _create_palace(30, n_wings=3)
    try:
        got = get_all(col, include=["metadatas"], where={"wing": "wing_0"})
        assert len(got["ids"]) == 10
        assert all(m["wing"] == "wing_0" for m in got["metadatas"])
    finally:
        shutil.rmtree(path)


def test_get_all_on_empty_collection():
    """An empty collection yields empty lists rather than raising."""
    path = tempfile.mkdtemp()
    try:
        client = chromadb.PersistentClient(path=path)
        col = client.get_or_create_collection("mempalace_drawers")
        got = get_all(col, include=["metadatas"])
        assert got["ids"] == []
        assert got["metadatas"] == []
    finally:
        shutil.rmtree(path)


def test_get_all_batches_large_collections():
    """A tiny batch_size must still yield the complete collection."""
    path, col = _create_palace(100)
    try:
        # batch_size=7 forces the pagination loop through many pages
        got = get_all(col, include=["metadatas"], batch_size=7)
        assert len(got["ids"]) == 100
        assert len(got["metadatas"]) == 100
    finally:
        shutil.rmtree(path)


def test_get_all_no_duplicate_ids():
    """Pagination must never yield the same drawer ID twice."""
    path, col = _create_palace(50)
    try:
        got = get_all(col, include=["metadatas"], batch_size=13)
        assert len(set(got["ids"])) == len(got["ids"])
    finally:
        shutil.rmtree(path)


def test_get_all_filtered_pagination():
    """A where filter combined with tiny batches must return every match."""
    path, col = _create_palace(60, n_wings=3)
    try:
        # 60 drawers over 3 wings → 20 per wing; batch_size=7 forces paging
        got = get_all(
            col, include=["metadatas"], where={"wing": "wing_0"}, batch_size=7
        )
        assert len(got["ids"]) == 20
        assert all(m["wing"] == "wing_0" for m in got["metadatas"])
        assert len(set(got["ids"])) == len(got["ids"])
    finally:
        shutil.rmtree(path)
19 changes: 19 additions & 0 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,25 @@ def test_env_override():
del os.environ["MEMPALACE_PALACE_PATH"]


def test_config_file_tilde_is_expanded():
    """A '~' in the config file's palace_path must be expanded to $HOME."""
    cfg_dir = tempfile.mkdtemp()
    cfg_file = os.path.join(cfg_dir, "config.json")
    with open(cfg_file, "w") as f:
        json.dump({"palace_path": "~/.mempalace/palace"}, f)
    cfg = MempalaceConfig(config_dir=cfg_dir)
    expected = os.path.expanduser("~/.mempalace/palace")
    assert not cfg.palace_path.startswith("~")
    assert cfg.palace_path == expected


def test_env_var_tilde_is_expanded():
    """A '~' in MEMPALACE_PALACE_PATH must be expanded to $HOME."""
    # Save any pre-existing value: a bare `del` in finally would clobber
    # the user's real setting when the test runs in a configured shell.
    prev = os.environ.get("MEMPALACE_PALACE_PATH")
    os.environ["MEMPALACE_PALACE_PATH"] = "~/custom/palace"
    try:
        cfg = MempalaceConfig(config_dir=tempfile.mkdtemp())
        assert not cfg.palace_path.startswith("~")
        assert cfg.palace_path == os.path.expanduser("~/custom/palace")
    finally:
        if prev is None:
            del os.environ["MEMPALACE_PALACE_PATH"]
        else:
            os.environ["MEMPALACE_PALACE_PATH"] = prev


def test_init():
tmpdir = tempfile.mkdtemp()
cfg = MempalaceConfig(config_dir=tmpdir)
Expand Down
94 changes: 93 additions & 1 deletion tests/test_layers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
"""Tests for mempalace.layers — Layer0, Layer1, Layer2, Layer3, MemoryStack."""

import os
import shutil
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch

import chromadb
import yaml

from mempalace.layers import Layer0, Layer1, Layer2, Layer3, MemoryStack


Expand Down Expand Up @@ -127,7 +133,7 @@ def test_layer1_empty_palace():
assert "No memories" in result


def test_layer1_with_wing_filter():
def test_layer1_with_wing_filter_mocked():
docs = ["Memory about project X"]
metas = [{"room": "general", "source_file": "x.txt", "importance": 3}]
mock_col = _mock_chromadb_for_layer(docs, metas)
Expand Down Expand Up @@ -655,3 +661,89 @@ def test_memory_stack_status_with_palace(tmp_path):

assert result["total_drawers"] == 42
assert result["L0_identity"]["exists"] is True


# ── Integration tests — real ChromaDB ───────────────────────────────────


def _write_file(path, content):
Path(path).parent.mkdir(parents=True, exist_ok=True)
Path(path).write_text(content, encoding="utf-8")


def test_layer1_returns_content_from_all_rooms():
    """Layer1 should pull drawers from every room in the palace."""
    from mempalace.miner import mine

    tmpdir = tempfile.mkdtemp()
    try:
        project = Path(tmpdir) / "project"
        project.mkdir()

        # Two source directories so mining produces two distinct rooms
        backend_src = "def handle_request():\n return 'ok'\n" * 20
        docs_src = "# User Guide\nThis is the documentation.\n" * 20
        _write_file(project / "backend" / "api.py", backend_src)
        _write_file(project / "docs" / "guide.md", docs_src)

        palace_cfg = {
            "wing": "test_wing",
            "rooms": [
                {"name": "backend", "description": "Backend API code"},
                {"name": "docs", "description": "Documentation"},
            ],
        }
        with open(project / "mempalace.yaml", "w") as f:
            yaml.dump(palace_cfg, f)

        palace_path = str(Path(tmpdir) / "palace")
        mine(str(project), palace_path)

        output = Layer1(palace_path=palace_path).generate()

        assert "L1" in output
        assert output != "## L1 — No drawers found."
        assert output != "## L1 — No memories yet."
        # Both rooms must be represented in the generated text
        low = output.lower()
        assert "backend" in low or "api" in low, "backend room content missing"
        assert "docs" in low or "guide" in low, "docs room content missing"
    finally:
        shutil.rmtree(tmpdir)


def test_layer1_with_wing_filter():
    """Layer1 with a wing filter should only return drawers from that wing."""
    tmpdir = tempfile.mkdtemp()
    try:
        palace_path = str(Path(tmpdir) / "palace")
        client = chromadb.PersistentClient(path=palace_path)
        col = client.get_or_create_collection("mempalace_drawers")

        # 10 drawers in wing "alpha", 10 in wing "beta"
        ids, docs, metas = [], [], []
        for i in range(20):
            wing = "alpha" if i < 10 else "beta"
            ids.append(f"d_{i}")
            docs.append(f"content from {wing} drawer {i}")
            metas.append(
                {"wing": wing, "room": "general", "source_file": f"f{i}.py"}
            )
        col.add(ids=ids, documents=docs, metadatas=metas)

        output = Layer1(palace_path=palace_path, wing="alpha").generate()

        assert "L1" in output
        assert output != "## L1 — No memories yet."
        # The "beta" wing must have been filtered out entirely
        assert "content from beta" not in output
    finally:
        shutil.rmtree(tmpdir)
Loading