Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ wheels/
# Node
node_modules/

# Environment variables
# Environment variables and local config
.env
docker-compose.yml
docker-compose.override.yml

# IDE
.idea/
Expand Down
243 changes: 205 additions & 38 deletions hindsight-api/hindsight_api/api/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@
)
logger = logging.getLogger(__name__)

# Context variable to hold the current bank_id from the URL path
# Default bank_id from environment variable
DEFAULT_BANK_ID = os.environ.get("HINDSIGHT_MCP_BANK_ID", "default")

# Context variable to hold the current bank_id
_current_bank_id: ContextVar[str | None] = ContextVar("current_bank_id", default=None)


def get_current_bank_id() -> str | None:
"""Get the current bank_id from context (set from URL path)."""
"""Get the current bank_id from context."""
return _current_bank_id.get()


Expand All @@ -44,12 +47,13 @@ def create_mcp_server(memory: MemoryEngine) -> FastMCP:
memory: MemoryEngine instance (required)

Returns:
Configured FastMCP server instance
Configured FastMCP server instance with stateless_http enabled
"""
mcp = FastMCP("hindsight-mcp-server")
# Use stateless_http=True for Claude Code compatibility
mcp = FastMCP("hindsight-mcp-server", stateless_http=True)

@mcp.tool()
async def retain(content: str, context: str = "general") -> str:
async def retain(content: str, context: str = "general", bank_id: str | None = None) -> str:
"""
Store important information to long-term memory.

Expand All @@ -65,21 +69,22 @@ async def retain(content: str, context: str = "general") -> str:
Args:
content: The fact/memory to store (be specific and include relevant details)
context: Category for the memory (e.g., 'preferences', 'work', 'hobbies', 'family'). Default: 'general'
bank_id: Optional bank to store in (defaults to session bank). Use for cross-bank operations.
"""
try:
bank_id = get_current_bank_id()
if bank_id is None:
target_bank = bank_id or get_current_bank_id()
if target_bank is None:
return "Error: No bank_id configured"
await memory.retain_batch_async(
bank_id=bank_id, contents=[{"content": content, "context": context}], request_context=RequestContext()
bank_id=target_bank, contents=[{"content": content, "context": context}], request_context=RequestContext()
)
return "Memory stored successfully"
return f"Memory stored successfully in bank '{target_bank}'"
except Exception as e:
logger.error(f"Error storing memory: {e}", exc_info=True)
return f"Error: {str(e)}"

@mcp.tool()
async def recall(query: str, max_results: int = 10) -> str:
async def recall(query: str, max_results: int = 10, bank_id: str | None = None) -> str:
"""
Search memories to provide personalized, context-aware responses.

Expand All @@ -92,15 +97,16 @@ async def recall(query: str, max_results: int = 10) -> str:
Args:
query: Natural language search query (e.g., "user's food preferences", "what projects is user working on")
max_results: Maximum number of results to return (default: 10)
bank_id: Optional bank to search in (defaults to session bank). Use for cross-bank operations.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is clear enough for the agent to understand what do to with it. have you tried it out already?

initalliy this was the only implementation but I had issues letting Claude to understand the bank_id param and what to use exactly. and when it gets confused by a tool usually it just ignores it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right it should include a better tool description. Let me work on that. It works for me locally because I created a skill that uses Hindsight and it describes how to use it.

"""
try:
bank_id = get_current_bank_id()
if bank_id is None:
target_bank = bank_id or get_current_bank_id()
if target_bank is None:
return "Error: No bank_id configured"
from hindsight_api.engine.memory_engine import Budget

search_result = await memory.recall_async(
bank_id=bank_id,
bank_id=target_bank,
query=query,
fact_type=list(VALID_RECALL_FACT_TYPES),
budget=Budget.LOW,
Expand All @@ -118,22 +124,179 @@ async def recall(query: str, max_results: int = 10) -> str:
for fact in search_result.results[:max_results]
]

return json.dumps({"results": results}, indent=2)
return json.dumps({"results": results, "bank_id": target_bank}, indent=2)
except Exception as e:
logger.error(f"Error searching: {e}", exc_info=True)
return json.dumps({"error": str(e), "results": []})

@mcp.tool()
async def reflect(query: str, context: str | None = None, budget: str = "low", bank_id: str | None = None) -> str:
"""
Generate thoughtful analysis by synthesizing stored memories with the bank's personality.

WHEN TO USE THIS TOOL:
Use reflect when you need reasoned analysis, not just fact retrieval. This tool
thinks through the question using everything the bank knows and its personality traits.

EXAMPLES OF GOOD QUERIES:
- "What patterns have emerged in how I approach debugging?"
- "Based on my past decisions, what architectural style do I prefer?"
- "What might be the best approach for this problem given what you know about me?"
- "How should I prioritize these tasks based on my goals?"

HOW IT DIFFERS FROM RECALL:
- recall: Returns raw facts matching your search (fast lookup)
- reflect: Reasons across memories to form a synthesized answer (deeper analysis)

Use recall for "what did I say about X?" and reflect for "what should I do about X?"

Args:
query: The question or topic to reflect on
context: Optional context about why this reflection is needed
budget: Search budget - 'low', 'mid', or 'high' (default: 'low')
bank_id: Optional bank to reflect in (defaults to session bank). Use for cross-bank operations.
"""
try:
target_bank = bank_id or get_current_bank_id()
if target_bank is None:
return "Error: No bank_id configured"
from hindsight_api.engine.memory_engine import Budget

# Map string budget to enum
budget_map = {"low": Budget.LOW, "mid": Budget.MID, "high": Budget.HIGH}
budget_enum = budget_map.get(budget.lower(), Budget.LOW)

reflect_result = await memory.reflect_async(
bank_id=target_bank,
query=query,
budget=budget_enum,
context=context,
request_context=RequestContext(),
)

# Return the reflection text and optionally facts used
result = {
"text": reflect_result.text,
"bank_id": target_bank,
"based_on": {
fact_type: [
{"id": f.id, "text": f.text, "context": f.context}
for f in facts
]
for fact_type, facts in (reflect_result.based_on or {}).items()
if facts
},
}

# Include new opinions if any were formed
if reflect_result.new_opinions:
result["new_opinions"] = [
{"text": op.text, "confidence": op.confidence}
for op in reflect_result.new_opinions
]

return json.dumps(result, indent=2)
except Exception as e:
logger.error(f"Error reflecting: {e}", exc_info=True)
return json.dumps({"error": str(e), "text": ""})

@mcp.tool()
async def list_banks() -> str:
"""
List all available memory banks.

Use this to discover banks for orchestration or to find
the correct bank_id for cross-bank operations.

Returns:
JSON list of banks with id, name, and creation date
"""
try:
banks = await memory.list_banks(request_context=RequestContext())
return json.dumps({
"banks": [
{
"id": b["bank_id"],
"name": b.get("name"),
"created_at": str(b.get("created_at")) if b.get("created_at") else None
}
for b in banks
]
}, indent=2)
except Exception as e:
logger.error(f"Error listing banks: {e}", exc_info=True)
return json.dumps({"error": str(e), "banks": []})

@mcp.tool()
async def create_bank(bank_id: str, name: str | None = None, background: str | None = None) -> str:
"""
Create or update a memory bank.

Use this to create new banks for different agents, sessions, or purposes.
Banks are isolated memory stores - each bank has its own memories and personality.

Args:
bank_id: Unique identifier for the bank (e.g., 'orchestrator-memory', 'agent-1')
name: Human-readable name for the bank
background: Context about what this bank stores or its purpose
"""
try:
# Get or create the bank profile (auto-creates with defaults)
await memory.get_bank_profile(bank_id, request_context=RequestContext())

# Update name and/or background if provided
if name is not None or background is not None:
await memory.update_bank_info(
bank_id,
name=name,
background=background,
request_context=RequestContext()
)

# Get final profile
profile = await memory.get_bank_profile(bank_id, request_context=RequestContext())
return json.dumps({
"success": True,
"bank_id": bank_id,
"name": profile.get("name"),
"background": profile.get("background"),
"disposition": profile.get("disposition").model_dump() if hasattr(profile.get("disposition"), "model_dump") else dict(profile.get("disposition", {}))
}, indent=2)
except Exception as e:
logger.error(f"Error creating bank: {e}", exc_info=True)
return json.dumps({"error": str(e), "success": False})

return mcp


class MCPMiddleware:
"""ASGI middleware that extracts bank_id from path and sets context."""
"""ASGI middleware that extracts bank_id from header or path and sets context.

Bank ID can be provided via:
1. X-Bank-Id header (recommended for Claude Code)
2. URL path: /mcp/{bank_id}/
3. Environment variable HINDSIGHT_MCP_BANK_ID (fallback default)

For Claude Code, configure with:
claude mcp add --transport http hindsight http://localhost:8888/mcp \\
--header "X-Bank-Id: my-bank"
"""

def __init__(self, app, memory: MemoryEngine):
self.app = app
self.memory = memory
self.mcp_server = create_mcp_server(memory)
self.mcp_app = self.mcp_server.http_app()
# Expose the lifespan for the parent app to chain
self.lifespan = self.mcp_app.lifespan_handler if hasattr(self.mcp_app, 'lifespan_handler') else None

def _get_header(self, scope: dict, name: str) -> str | None:
"""Extract a header value from ASGI scope."""
name_lower = name.lower().encode()
for header_name, header_value in scope.get("headers", []):
if header_name.lower() == name_lower:
return header_value.decode()
return None

async def __call__(self, scope, receive, send):
if scope["type"] != "http":
Expand All @@ -145,37 +308,40 @@ async def __call__(self, scope, receive, send):
# Strip any mount prefix (e.g., /mcp) that FastAPI might not have stripped
root_path = scope.get("root_path", "")
if root_path and path.startswith(root_path):
path = path[len(root_path) :] or "/"
path = path[len(root_path):] or "/"

# Also handle case where mount path wasn't stripped (e.g., /mcp/...)
if path.startswith("/mcp/"):
path = path[4:] # Remove /mcp prefix

# Extract bank_id from path: /{bank_id}/ or /{bank_id}
# http_app expects requests at /
if not path.startswith("/") or len(path) <= 1:
# No bank_id in path - return error
await self._send_error(send, 400, "bank_id required in path: /mcp/{bank_id}/")
return

# Extract bank_id from first path segment
parts = path[1:].split("/", 1)
if not parts[0]:
await self._send_error(send, 400, "bank_id required in path: /mcp/{bank_id}/")
return

bank_id = parts[0]
new_path = "/" + parts[1] if len(parts) > 1 else "/"
elif path == "/mcp":
path = "/"

# Try to get bank_id from header first (for Claude Code compatibility)
bank_id = self._get_header(scope, "X-Bank-Id")

# If no header, try to extract from path: /{bank_id}/...
new_path = path
if not bank_id and path.startswith("/") and len(path) > 1:
parts = path[1:].split("/", 1)
if parts[0] and parts[0] != "mcp":
# First segment looks like a bank_id
bank_id = parts[0]
new_path = "/" + parts[1] if len(parts) > 1 else "/"

# Fall back to default bank_id
if not bank_id:
bank_id = DEFAULT_BANK_ID
logger.debug(f"Using default bank_id: {bank_id}")

# Set bank_id context
token = _current_bank_id.set(bank_id)
try:
new_scope = scope.copy()
new_scope["path"] = new_path
# Clear root_path since we're passing directly to the app
new_scope["root_path"] = ""

# Wrap send to rewrite the SSE endpoint URL to include bank_id
# The SSE app sends "event: endpoint\ndata: /messages\n" but we need
# the client to POST to /{bank_id}/messages instead
# Wrap send to rewrite the SSE endpoint URL to include bank_id if using path-based routing
async def send_wrapper(message):
if message["type"] == "http.response.body":
body = message.get("body", b"")
Expand Down Expand Up @@ -211,9 +377,10 @@ def create_mcp_app(memory: MemoryEngine):
"""
Create an ASGI app that handles MCP requests.

URL pattern: /mcp/{bank_id}/

The bank_id is extracted from the URL path and made available to tools.
Bank ID can be provided via:
1. X-Bank-Id header: claude mcp add --transport http hindsight http://localhost:8888/mcp --header "X-Bank-Id: my-bank"
2. URL path: /mcp/{bank_id}/
3. Environment variable HINDSIGHT_MCP_BANK_ID (fallback, default: "default")

Args:
memory: MemoryEngine instance
Expand Down
Loading