diff --git a/src/superlocalmemory/infra/event_bus.py b/src/superlocalmemory/infra/event_bus.py index 0ddb09f..3656978 100644 --- a/src/superlocalmemory/infra/event_bus.py +++ b/src/superlocalmemory/infra/event_bus.py @@ -32,6 +32,10 @@ "memory.updated", # Existing memory modified "memory.deleted", # Memory removed "memory.recalled", # Memory retrieved by an agent + "memory.observed", # /observe accepted content into the debounce buffer + "memory.captured", # AutoCapture matched a buffered observation + "memory.dropped", # AutoCapture rejected a buffered observation + "memory.queued", # /remember accepted content into pending.db (async) "graph.updated", # Knowledge graph rebuilt "pattern.learned", # New pattern detected "agent.connected", # New agent connects diff --git a/src/superlocalmemory/server/unified_daemon.py b/src/superlocalmemory/server/unified_daemon.py index 1f0a751..17b6440 100644 --- a/src/superlocalmemory/server/unified_daemon.py +++ b/src/superlocalmemory/server/unified_daemon.py @@ -171,6 +171,33 @@ def recall(self, query: str, limit: int = 10, session_id: str = "") -> dict: # daemon startup via engine._process_pending_memories(). _engine = None + +def _emit_event( + event_type: str, + payload: dict | None = None, + *, + source_agent: str = "http_client", +) -> None: + """Emit a best-effort EventBus event from an HTTP write path. + + Mirrors mcp.shared.emit_event but tags source_protocol="http" so the + dashboard can distinguish HTTP traffic from MCP tool calls. Never raises + — a bus failure must not affect the caller's response. + """ + try: + from superlocalmemory.infra.event_bus import EventBus + from superlocalmemory.server.routes.helpers import DB_PATH + bus = EventBus.get_instance(DB_PATH) + bus.emit( + event_type, + payload=payload, + source_agent=source_agent, + source_protocol="http", + ) + except Exception as exc: + logger.debug("EventBus emit failed (%s): %s", event_type, exc) + + # v3.4.53: Limit concurrent full (non-fast) recalls. Without this, N parallel # /recall calls spawn N × 6-channel threads → Ollama serialises, reranker # lock queues, and total wall time is N × single-recall-time. 3 concurrent @@ -240,6 +267,14 @@ def enqueue(self, content: str) -> dict: self._timer = threading.Timer(self._debounce_sec, self._flush) self._timer.daemon = True self._timer.start() + _emit_event( + "memory.observed", + payload={ + "content_hash": content_hash, + "content_preview": content[:120], + "buffer_size": buf_size, + }, + ) return {"captured": True, "queued": True, "buffer_size": buf_size} def _flush(self) -> None: @@ -268,6 +303,22 @@ def _flush(self) -> None: # The prior 'processed N' counted skipped (capture=False) # items as successes — a false-positive write count. captured_count += 1 + _emit_event( + "memory.captured", + payload={ + "category": decision.category, + "confidence": getattr(decision, "confidence", None), + "content_preview": content[:120], + }, + ) + else: + _emit_event( + "memory.dropped", + payload={ + "reason": getattr(decision, "reason", "no patterns matched"), + "content_preview": content[:120], + }, + ) except Exception as exc: failed_count += 1 logger.warning( @@ -1784,6 +1835,15 @@ async def remember(req: RememberRequest, wait: bool = False): req.content, metadata=metadata, scope=scope, shared_with=shared_with, ) + _emit_event( + "memory.stored", + payload={ + "fact_ids": list(fact_ids) if fact_ids else [], + "count": len(fact_ids) if fact_ids else 0, + "path": "remember_sync", + "content_preview": req.content[:120], + }, + ) return {"ok": True, "fact_ids": fact_ids, "count": len(fact_ids)} except Exception as exc: raise HTTPException(500, detail=str(exc)) @@ -1823,6 +1883,14 @@ async def remember(req: RememberRequest, wait: bool = False): pending_id = store_pending( req.content, tags=req.tags or "", metadata=meta, ) + _emit_event( + "memory.queued", + payload={ + "pending_id": pending_id, + "tags": req.tags or "", + "content_preview": req.content[:120], + }, + ) return { "ok": True, "fact_ids": fact_ids, @@ -2196,6 +2264,16 @@ def _loop(): ) engine.store_fact_direct(fact) mark_done(item["id"]) + _emit_event( + "memory.stored", + payload={ + "pending_id": item["id"], + "memory_id": mem_id, + "path": "materializer_drain", + "content_preview": content[:120], + }, + source_agent="materializer", + ) except Exception as exc: logger.warning( "Pending %d failed: %s", item["id"], exc, diff --git a/tests/test_server/test_http_eventbus_emissions.py b/tests/test_server/test_http_eventbus_emissions.py new file mode 100644 index 0000000..7d39baa --- /dev/null +++ b/tests/test_server/test_http_eventbus_emissions.py @@ -0,0 +1,133 @@ +# Copyright (c) 2026 Varun Pratap Bhardwaj / Qualixar +# Licensed under AGPL-3.0-or-later - see LICENSE file +"""Observability: HTTP write paths must emit EventBus events. + +Before this change only MCP tool handlers emitted bus events; the HTTP +fast paths (/observe, /remember, AutoCapture pipeline, materializer) were +silent, leaving the dashboard event stream structurally empty. These tests +verify the new emissions without touching SLM core logic or altering any +write behaviour. + +All tests mock the EventBus so no database or daemon is required. +""" + +from __future__ import annotations + +import time +import unittest.mock as mock + +from superlocalmemory.infra.event_bus import VALID_EVENT_TYPES +from superlocalmemory.server.unified_daemon import ObserveBuffer, _emit_event + + +# --------------------------------------------------------------------------- +# VALID_EVENT_TYPES coverage +# --------------------------------------------------------------------------- + +def test_new_event_types_registered(): + assert "memory.observed" in VALID_EVENT_TYPES + assert "memory.captured" in VALID_EVENT_TYPES + assert "memory.dropped" in VALID_EVENT_TYPES + assert "memory.queued" in VALID_EVENT_TYPES + + +# --------------------------------------------------------------------------- +# _emit_event helper — never raises, tags source_protocol="http" +# --------------------------------------------------------------------------- + +def test_emit_event_never_raises_on_bus_error(): + with mock.patch( + "superlocalmemory.infra.event_bus.EventBus.get_instance", + side_effect=RuntimeError("bus down"), + ): + _emit_event("memory.observed") # must not propagate + + +def test_emit_event_uses_http_protocol(tmp_path): + from superlocalmemory.infra.event_bus import EventBus + EventBus.reset_instance(tmp_path / "test.db") + bus = EventBus.get_instance(tmp_path / "test.db") + received = [] + bus.subscribe(lambda e: received.append(e)) + + with mock.patch( + "superlocalmemory.infra.event_bus.EventBus.get_instance", + return_value=bus, + ): + _emit_event("memory.observed", payload={"x": 1}) + + assert received, "no event emitted" + assert received[0]["source_protocol"] == "http" + + +# --------------------------------------------------------------------------- +# ObserveBuffer.enqueue → memory.observed +# --------------------------------------------------------------------------- + +def test_enqueue_emits_memory_observed(): + buf = ObserveBuffer(debounce_sec=60) # long debounce — no flush in test + emitted: list[tuple] = [] + + with mock.patch( + "superlocalmemory.server.unified_daemon._emit_event", + side_effect=lambda et, payload=None, **kw: emitted.append((et, payload)), + ): + buf.enqueue("hello world") + + assert any(et == "memory.observed" for et, _ in emitted) + + +def test_enqueue_duplicate_does_not_emit(): + buf = ObserveBuffer(debounce_sec=60) + emitted: list[str] = [] + + with mock.patch( + "superlocalmemory.server.unified_daemon._emit_event", + side_effect=lambda et, **kw: emitted.append(et), + ): + buf.enqueue("same content") + buf.enqueue("same content") # duplicate — no event + + assert emitted.count("memory.observed") == 1 + + +# --------------------------------------------------------------------------- +# ObserveBuffer._flush → memory.captured / memory.dropped +# --------------------------------------------------------------------------- + +def _flush_with_decision(capture: bool) -> list[tuple[str, dict | None]]: + buf = ObserveBuffer(debounce_sec=0.01) + emitted: list[tuple[str, dict | None]] = [] + + decision = mock.MagicMock() + decision.capture = capture + decision.category = "note" + decision.reason = "test reason" + decision.confidence = 0.9 + + with mock.patch( + "superlocalmemory.hooks.auto_capture.AutoCapture", + ) as MockAC, mock.patch( + "superlocalmemory.server.unified_daemon._emit_event", + side_effect=lambda et, payload=None, **kw: emitted.append((et, payload)), + ): + MockAC.return_value.evaluate.return_value = decision + buf.set_engine(object()) + buf.enqueue("some content") + time.sleep(0.15) + + return emitted + + +def test_flush_captured_emits_memory_captured(): + emitted = _flush_with_decision(capture=True) + event_types = [et for et, _ in emitted] + assert "memory.captured" in event_types + assert "memory.dropped" not in event_types + + +def test_flush_dropped_emits_memory_dropped(): + emitted = _flush_with_decision(capture=False) + event_types = [et for et, _ in emitted] + assert "memory.dropped" in event_types + assert "memory.captured" not in event_types