Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/superlocalmemory/infra/event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
"memory.updated", # Existing memory modified
"memory.deleted", # Memory removed
"memory.recalled", # Memory retrieved by an agent
"memory.observed", # /observe accepted content into the debounce buffer
"memory.captured", # AutoCapture matched a buffered observation
"memory.dropped", # AutoCapture rejected a buffered observation
"memory.queued", # /remember accepted content into pending.db (async)
"graph.updated", # Knowledge graph rebuilt
"pattern.learned", # New pattern detected
"agent.connected", # New agent connects
Expand Down
78 changes: 78 additions & 0 deletions src/superlocalmemory/server/unified_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,33 @@ def recall(self, query: str, limit: int = 10, session_id: str = "") -> dict:
# daemon startup via engine._process_pending_memories().
_engine = None


def _emit_event(
event_type: str,
payload: dict | None = None,
*,
source_agent: str = "http_client",
) -> None:
"""Emit a best-effort EventBus event from an HTTP write path.

Mirrors mcp.shared.emit_event but tags source_protocol="http" so the
dashboard can distinguish HTTP traffic from MCP tool calls. Never raises
— a bus failure must not affect the caller's response.
"""
try:
from superlocalmemory.infra.event_bus import EventBus
from superlocalmemory.server.routes.helpers import DB_PATH
bus = EventBus.get_instance(DB_PATH)
bus.emit(
event_type,
payload=payload,
source_agent=source_agent,
source_protocol="http",
)
except Exception as exc:
logger.debug("EventBus emit failed (%s): %s", event_type, exc)


# v3.4.53: Limit concurrent full (non-fast) recalls. Without this, N parallel
# /recall calls spawn N × 6-channel threads → Ollama serialises, reranker
# lock queues, and total wall time is N × single-recall-time. 3 concurrent
Expand Down Expand Up @@ -240,6 +267,14 @@ def enqueue(self, content: str) -> dict:
self._timer = threading.Timer(self._debounce_sec, self._flush)
self._timer.daemon = True
self._timer.start()
_emit_event(
"memory.observed",
payload={
"content_hash": content_hash,
"content_preview": content[:120],
"buffer_size": buf_size,
},
)
return {"captured": True, "queued": True, "buffer_size": buf_size}

def _flush(self) -> None:
Expand Down Expand Up @@ -268,6 +303,22 @@ def _flush(self) -> None:
# The prior 'processed N' counted skipped (capture=False)
# items as successes — a false-positive write count.
captured_count += 1
_emit_event(
"memory.captured",
payload={
"category": decision.category,
"confidence": getattr(decision, "confidence", None),
"content_preview": content[:120],
},
)
else:
_emit_event(
"memory.dropped",
payload={
"reason": getattr(decision, "reason", "no patterns matched"),
"content_preview": content[:120],
},
)
except Exception as exc:
failed_count += 1
logger.warning(
Expand Down Expand Up @@ -1784,6 +1835,15 @@ async def remember(req: RememberRequest, wait: bool = False):
req.content, metadata=metadata,
scope=scope, shared_with=shared_with,
)
_emit_event(
"memory.stored",
payload={
"fact_ids": list(fact_ids) if fact_ids else [],
"count": len(fact_ids) if fact_ids else 0,
"path": "remember_sync",
"content_preview": req.content[:120],
},
)
return {"ok": True, "fact_ids": fact_ids, "count": len(fact_ids)}
except Exception as exc:
raise HTTPException(500, detail=str(exc))
Expand Down Expand Up @@ -1823,6 +1883,14 @@ async def remember(req: RememberRequest, wait: bool = False):
pending_id = store_pending(
req.content, tags=req.tags or "", metadata=meta,
)
_emit_event(
"memory.queued",
payload={
"pending_id": pending_id,
"tags": req.tags or "",
"content_preview": req.content[:120],
},
)
return {
"ok": True,
"fact_ids": fact_ids,
Expand Down Expand Up @@ -2196,6 +2264,16 @@ def _loop():
)
engine.store_fact_direct(fact)
mark_done(item["id"])
_emit_event(
"memory.stored",
payload={
"pending_id": item["id"],
"memory_id": mem_id,
"path": "materializer_drain",
"content_preview": content[:120],
},
source_agent="materializer",
)
except Exception as exc:
logger.warning(
"Pending %d failed: %s", item["id"], exc,
Expand Down
133 changes: 133 additions & 0 deletions tests/test_server/test_http_eventbus_emissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# Copyright (c) 2026 Varun Pratap Bhardwaj / Qualixar
# Licensed under AGPL-3.0-or-later - see LICENSE file
"""Observability: HTTP write paths must emit EventBus events.

Before this change only MCP tool handlers emitted bus events; the HTTP
fast paths (/observe, /remember, AutoCapture pipeline, materializer) were
silent, leaving the dashboard event stream structurally empty. These tests
verify the new emissions without touching SLM core logic or altering any
write behaviour.

All tests mock the EventBus so no database or daemon is required.
"""

from __future__ import annotations

import time
import unittest.mock as mock

from superlocalmemory.infra.event_bus import VALID_EVENT_TYPES
from superlocalmemory.server.unified_daemon import ObserveBuffer, _emit_event


# ---------------------------------------------------------------------------
# VALID_EVENT_TYPES coverage
# ---------------------------------------------------------------------------

def test_new_event_types_registered():
assert "memory.observed" in VALID_EVENT_TYPES
assert "memory.captured" in VALID_EVENT_TYPES
assert "memory.dropped" in VALID_EVENT_TYPES
assert "memory.queued" in VALID_EVENT_TYPES


# ---------------------------------------------------------------------------
# _emit_event helper — never raises, tags source_protocol="http"
# ---------------------------------------------------------------------------

def test_emit_event_never_raises_on_bus_error():
with mock.patch(
"superlocalmemory.infra.event_bus.EventBus.get_instance",
side_effect=RuntimeError("bus down"),
):
_emit_event("memory.observed") # must not propagate


def test_emit_event_uses_http_protocol(tmp_path):
from superlocalmemory.infra.event_bus import EventBus
EventBus.reset_instance(tmp_path / "test.db")
bus = EventBus.get_instance(tmp_path / "test.db")
received = []
bus.subscribe(lambda e: received.append(e))

with mock.patch(
"superlocalmemory.infra.event_bus.EventBus.get_instance",
return_value=bus,
):
_emit_event("memory.observed", payload={"x": 1})

assert received, "no event emitted"
assert received[0]["source_protocol"] == "http"


# ---------------------------------------------------------------------------
# ObserveBuffer.enqueue → memory.observed
# ---------------------------------------------------------------------------

def test_enqueue_emits_memory_observed():
buf = ObserveBuffer(debounce_sec=60) # long debounce — no flush in test
emitted: list[tuple] = []

with mock.patch(
"superlocalmemory.server.unified_daemon._emit_event",
side_effect=lambda et, payload=None, **kw: emitted.append((et, payload)),
):
buf.enqueue("hello world")

assert any(et == "memory.observed" for et, _ in emitted)


def test_enqueue_duplicate_does_not_emit():
buf = ObserveBuffer(debounce_sec=60)
emitted: list[str] = []

with mock.patch(
"superlocalmemory.server.unified_daemon._emit_event",
side_effect=lambda et, **kw: emitted.append(et),
):
buf.enqueue("same content")
buf.enqueue("same content") # duplicate — no event

assert emitted.count("memory.observed") == 1


# ---------------------------------------------------------------------------
# ObserveBuffer._flush → memory.captured / memory.dropped
# ---------------------------------------------------------------------------

def _flush_with_decision(capture: bool) -> list[tuple[str, dict | None]]:
buf = ObserveBuffer(debounce_sec=0.01)
emitted: list[tuple[str, dict | None]] = []

decision = mock.MagicMock()
decision.capture = capture
decision.category = "note"
decision.reason = "test reason"
decision.confidence = 0.9

with mock.patch(
"superlocalmemory.hooks.auto_capture.AutoCapture",
) as MockAC, mock.patch(
"superlocalmemory.server.unified_daemon._emit_event",
side_effect=lambda et, payload=None, **kw: emitted.append((et, payload)),
):
MockAC.return_value.evaluate.return_value = decision
buf.set_engine(object())
buf.enqueue("some content")
time.sleep(0.15)

return emitted


def test_flush_captured_emits_memory_captured():
emitted = _flush_with_decision(capture=True)
event_types = [et for et, _ in emitted]
assert "memory.captured" in event_types
assert "memory.dropped" not in event_types


def test_flush_dropped_emits_memory_dropped():
emitted = _flush_with_decision(capture=False)
event_types = [et for et, _ in emitted]
assert "memory.dropped" in event_types
assert "memory.captured" not in event_types
Loading