From f52dd0c4ccd873891d688d8b5def565080571482 Mon Sep 17 00:00:00 2001 From: Barry Gausden Date: Fri, 19 Jun 2026 17:33:48 +0100 Subject: [PATCH] feat(observability): emit EventBus events from HTTP write paths MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Before this change only MCP tool handlers emitted EventBus events. The HTTP fast paths — /observe, /remember (sync + async), the AutoCapture pipeline, and the pending materializer drain — were completely silent. This left the dashboard health-pane event stream and events_last_24h counter structurally empty regardless of how much traffic the daemon served. This change is purely observational. It does not alter any write behaviour, recall semantics, data model, or API contract. Changes: - infra/event_bus.py: register four new event types (memory.observed, memory.captured, memory.dropped, memory.queued). memory.stored already existed. - server/unified_daemon.py: add _emit_event() helper (source_protocol= "http", best-effort, never raises) and wire it into six callsites: ObserveBuffer.enqueue → memory.observed ObserveBuffer._flush match → memory.captured ObserveBuffer._flush miss → memory.dropped POST /remember (async) → memory.queued POST /remember?wait=true → memory.stored materializer drain → memory.stored (source_agent=materializer) - tests/test_server/test_http_eventbus_emissions.py: 10 new tests covering event type registration, never-raises guarantee, http protocol tagging, enqueue emission, duplicate suppression, and flush captured/dropped paths. --- src/superlocalmemory/infra/event_bus.py | 4 + src/superlocalmemory/server/unified_daemon.py | 78 ++++++++++ .../test_http_eventbus_emissions.py | 133 ++++++++++++++++++ 3 files changed, 215 insertions(+) create mode 100644 tests/test_server/test_http_eventbus_emissions.py diff --git a/src/superlocalmemory/infra/event_bus.py b/src/superlocalmemory/infra/event_bus.py index 0ddb09f2..36569781 100644 --- a/src/superlocalmemory/infra/event_bus.py +++ b/src/superlocalmemory/infra/event_bus.py @@ -32,6 +32,10 @@ "memory.updated", # Existing memory modified "memory.deleted", # Memory removed "memory.recalled", # Memory retrieved by an agent + "memory.observed", # /observe accepted content into the debounce buffer + "memory.captured", # AutoCapture matched a buffered observation + "memory.dropped", # AutoCapture rejected a buffered observation + "memory.queued", # /remember accepted content into pending.db (async) "graph.updated", # Knowledge graph rebuilt "pattern.learned", # New pattern detected "agent.connected", # New agent connects diff --git a/src/superlocalmemory/server/unified_daemon.py b/src/superlocalmemory/server/unified_daemon.py index 1f0a7512..17b64402 100644 --- a/src/superlocalmemory/server/unified_daemon.py +++ b/src/superlocalmemory/server/unified_daemon.py @@ -171,6 +171,33 @@ def recall(self, query: str, limit: int = 10, session_id: str = "") -> dict: # daemon startup via engine._process_pending_memories(). _engine = None + +def _emit_event( + event_type: str, + payload: dict | None = None, + *, + source_agent: str = "http_client", +) -> None: + """Emit a best-effort EventBus event from an HTTP write path. + + Mirrors mcp.shared.emit_event but tags source_protocol="http" so the + dashboard can distinguish HTTP traffic from MCP tool calls. Never raises + — a bus failure must not affect the caller's response. + """ + try: + from superlocalmemory.infra.event_bus import EventBus + from superlocalmemory.server.routes.helpers import DB_PATH + bus = EventBus.get_instance(DB_PATH) + bus.emit( + event_type, + payload=payload, + source_agent=source_agent, + source_protocol="http", + ) + except Exception as exc: + logger.debug("EventBus emit failed (%s): %s", event_type, exc) + + # v3.4.53: Limit concurrent full (non-fast) recalls. Without this, N parallel # /recall calls spawn N × 6-channel threads → Ollama serialises, reranker # lock queues, and total wall time is N × single-recall-time. 3 concurrent @@ -240,6 +267,14 @@ def enqueue(self, content: str) -> dict: self._timer = threading.Timer(self._debounce_sec, self._flush) self._timer.daemon = True self._timer.start() + _emit_event( + "memory.observed", + payload={ + "content_hash": content_hash, + "content_preview": content[:120], + "buffer_size": buf_size, + }, + ) return {"captured": True, "queued": True, "buffer_size": buf_size} def _flush(self) -> None: @@ -268,6 +303,22 @@ def _flush(self) -> None: # The prior 'processed N' counted skipped (capture=False) # items as successes — a false-positive write count. captured_count += 1 + _emit_event( + "memory.captured", + payload={ + "category": decision.category, + "confidence": getattr(decision, "confidence", None), + "content_preview": content[:120], + }, + ) + else: + _emit_event( + "memory.dropped", + payload={ + "reason": getattr(decision, "reason", "no patterns matched"), + "content_preview": content[:120], + }, + ) except Exception as exc: failed_count += 1 logger.warning( @@ -1784,6 +1835,15 @@ async def remember(req: RememberRequest, wait: bool = False): req.content, metadata=metadata, scope=scope, shared_with=shared_with, ) + _emit_event( + "memory.stored", + payload={ + "fact_ids": list(fact_ids) if fact_ids else [], + "count": len(fact_ids) if fact_ids else 0, + "path": "remember_sync", + "content_preview": req.content[:120], + }, + ) return {"ok": True, "fact_ids": fact_ids, "count": len(fact_ids)} except Exception as exc: raise HTTPException(500, detail=str(exc)) @@ -1823,6 +1883,14 @@ async def remember(req: RememberRequest, wait: bool = False): pending_id = store_pending( req.content, tags=req.tags or "", metadata=meta, ) + _emit_event( + "memory.queued", + payload={ + "pending_id": pending_id, + "tags": req.tags or "", + "content_preview": req.content[:120], + }, + ) return { "ok": True, "fact_ids": fact_ids, @@ -2196,6 +2264,16 @@ def _loop(): ) engine.store_fact_direct(fact) mark_done(item["id"]) + _emit_event( + "memory.stored", + payload={ + "pending_id": item["id"], + "memory_id": mem_id, + "path": "materializer_drain", + "content_preview": content[:120], + }, + source_agent="materializer", + ) except Exception as exc: logger.warning( "Pending %d failed: %s", item["id"], exc, diff --git a/tests/test_server/test_http_eventbus_emissions.py b/tests/test_server/test_http_eventbus_emissions.py new file mode 100644 index 00000000..7d39baa5 --- /dev/null +++ b/tests/test_server/test_http_eventbus_emissions.py @@ -0,0 +1,133 @@ +# Copyright (c) 2026 Varun Pratap Bhardwaj / Qualixar +# Licensed under AGPL-3.0-or-later - see LICENSE file +"""Observability: HTTP write paths must emit EventBus events. + +Before this change only MCP tool handlers emitted bus events; the HTTP +fast paths (/observe, /remember, AutoCapture pipeline, materializer) were +silent, leaving the dashboard event stream structurally empty. These tests +verify the new emissions without touching SLM core logic or altering any +write behaviour. + +All tests mock the EventBus so no database or daemon is required. +""" + +from __future__ import annotations + +import time +import unittest.mock as mock + +from superlocalmemory.infra.event_bus import VALID_EVENT_TYPES +from superlocalmemory.server.unified_daemon import ObserveBuffer, _emit_event + + +# --------------------------------------------------------------------------- +# VALID_EVENT_TYPES coverage +# --------------------------------------------------------------------------- + +def test_new_event_types_registered(): + assert "memory.observed" in VALID_EVENT_TYPES + assert "memory.captured" in VALID_EVENT_TYPES + assert "memory.dropped" in VALID_EVENT_TYPES + assert "memory.queued" in VALID_EVENT_TYPES + + +# --------------------------------------------------------------------------- +# _emit_event helper — never raises, tags source_protocol="http" +# --------------------------------------------------------------------------- + +def test_emit_event_never_raises_on_bus_error(): + with mock.patch( + "superlocalmemory.infra.event_bus.EventBus.get_instance", + side_effect=RuntimeError("bus down"), + ): + _emit_event("memory.observed") # must not propagate + + +def test_emit_event_uses_http_protocol(tmp_path): + from superlocalmemory.infra.event_bus import EventBus + EventBus.reset_instance(tmp_path / "test.db") + bus = EventBus.get_instance(tmp_path / "test.db") + received = [] + bus.subscribe(lambda e: received.append(e)) + + with mock.patch( + "superlocalmemory.infra.event_bus.EventBus.get_instance", + return_value=bus, + ): + _emit_event("memory.observed", payload={"x": 1}) + + assert received, "no event emitted" + assert received[0]["source_protocol"] == "http" + + +# --------------------------------------------------------------------------- +# ObserveBuffer.enqueue → memory.observed +# --------------------------------------------------------------------------- + +def test_enqueue_emits_memory_observed(): + buf = ObserveBuffer(debounce_sec=60) # long debounce — no flush in test + emitted: list[tuple] = [] + + with mock.patch( + "superlocalmemory.server.unified_daemon._emit_event", + side_effect=lambda et, payload=None, **kw: emitted.append((et, payload)), + ): + buf.enqueue("hello world") + + assert any(et == "memory.observed" for et, _ in emitted) + + +def test_enqueue_duplicate_does_not_emit(): + buf = ObserveBuffer(debounce_sec=60) + emitted: list[str] = [] + + with mock.patch( + "superlocalmemory.server.unified_daemon._emit_event", + side_effect=lambda et, **kw: emitted.append(et), + ): + buf.enqueue("same content") + buf.enqueue("same content") # duplicate — no event + + assert emitted.count("memory.observed") == 1 + + +# --------------------------------------------------------------------------- +# ObserveBuffer._flush → memory.captured / memory.dropped +# --------------------------------------------------------------------------- + +def _flush_with_decision(capture: bool) -> list[tuple[str, dict | None]]: + buf = ObserveBuffer(debounce_sec=0.01) + emitted: list[tuple[str, dict | None]] = [] + + decision = mock.MagicMock() + decision.capture = capture + decision.category = "note" + decision.reason = "test reason" + decision.confidence = 0.9 + + with mock.patch( + "superlocalmemory.hooks.auto_capture.AutoCapture", + ) as MockAC, mock.patch( + "superlocalmemory.server.unified_daemon._emit_event", + side_effect=lambda et, payload=None, **kw: emitted.append((et, payload)), + ): + MockAC.return_value.evaluate.return_value = decision + buf.set_engine(object()) + buf.enqueue("some content") + time.sleep(0.15) + + return emitted + + +def test_flush_captured_emits_memory_captured(): + emitted = _flush_with_decision(capture=True) + event_types = [et for et, _ in emitted] + assert "memory.captured" in event_types + assert "memory.dropped" not in event_types + + +def test_flush_dropped_emits_memory_dropped(): + emitted = _flush_with_decision(capture=False) + event_types = [et for et, _ in emitted] + assert "memory.dropped" in event_types + assert "memory.captured" not in event_types