From 01bdde2a321a89f7107b07dd0fa8f357e8767962 Mon Sep 17 00:00:00 2001 From: Abinav Date: Thu, 12 Feb 2026 22:31:05 +0000 Subject: [PATCH 1/5] feat: add persistent embedding daemon to eliminate cold-start latency (#166) Adds `leann serve` command that starts a background embedding server daemon, keeping the model warm between searches. Reduces first-search latency from 30-60s to near-zero by avoiding repeated model loads. - New `embedding_daemon.py` with daemon lifecycle management (start/stop/status) - Heartbeat-based health monitoring with stale state cleanup - EmbeddingServerManager auto-detects running daemon before spawning new servers - CLI: `leann serve`, `leann serve --stop`, `leann serve --status` - 18 unit tests covering state management, integration, and CLI https://claude.ai/code/session_01M6abMs1YzF6yhh13YerDPT --- packages/leann-core/src/leann/cli.py | 106 ++++++ .../leann-core/src/leann/embedding_daemon.py | 353 ++++++++++++++++++ .../src/leann/embedding_server_manager.py | 48 +++ tests/test_warmup_daemon.py | 293 +++++++++++++++ 4 files changed, 800 insertions(+) create mode 100644 packages/leann-core/src/leann/embedding_daemon.py create mode 100644 tests/test_warmup_daemon.py diff --git a/packages/leann-core/src/leann/cli.py b/packages/leann-core/src/leann/cli.py index 122ae6b6..78c2d0fb 100644 --- a/packages/leann-core/src/leann/cli.py +++ b/packages/leann-core/src/leann/cli.py @@ -354,6 +354,46 @@ def create_parser(self) -> argparse.ArgumentParser: "--force", "-f", action="store_true", help="Force removal without confirmation" ) + # Serve command — persistent embedding daemon (#166) + serve_parser = subparsers.add_parser( + "serve", + help="Start a persistent embedding server to eliminate cold-start latency", + ) + serve_parser.add_argument( + "--embedding-model", + type=str, + default="facebook/contriever", + help="Embedding model to keep warm (default: facebook/contriever)", + ) + serve_parser.add_argument( + "--embedding-mode", + type=str, + default="sentence-transformers", + choices=["sentence-transformers", "openai", "mlx", "ollama"], + help="Embedding backend mode (default: sentence-transformers)", + ) + serve_parser.add_argument( + "--port", + type=int, + default=5557, + help="ZMQ port for the embedding server (default: 5557)", + ) + serve_parser.add_argument( + "--foreground", + action="store_true", + help="Run in the foreground instead of daemonizing", + ) + serve_parser.add_argument( + "--stop", + action="store_true", + help="Stop a running embedding daemon", + ) + serve_parser.add_argument( + "--status", + action="store_true", + help="Show the status of the embedding daemon", + ) + return parser def register_project_dir(self): @@ -1668,9 +1708,75 @@ async def run(self, args=None): await self.search_documents(args) elif args.command == "ask": await self.ask_questions(args) + elif args.command == "serve": + self.handle_serve(args) else: parser.print_help() + def handle_serve(self, args): + """Handle the ``leann serve`` command.""" + from .embedding_daemon import daemon_status, run_daemon, stop_daemon + + if args.status: + state = daemon_status() + if state is None: + print("No embedding daemon is running.") + else: + import datetime + + started = datetime.datetime.fromtimestamp( + state.get("started_at", 0) + ).strftime("%Y-%m-%d %H:%M:%S") + print(f"Embedding daemon is running:") + print(f" PID: {state.get('pid')}") + print(f" Port: {state.get('port')}") + print(f" Model: {state.get('model_name')}") + print(f" Mode: {state.get('embedding_mode')}") + print(f" Since: {started}") + return + + if args.stop: + if stop_daemon(): + print("Embedding daemon stopped.") + else: + print("No embedding daemon is running.") + return + + # Check for an already-running daemon + existing = daemon_status() + if existing is not None: + print( + f"An embedding daemon is already running on port {existing['port']} " + f"(PID {existing['pid']}, model: {existing['model_name']}). " + f"Use 'leann serve --stop' to stop it first." + ) + return + + print(f"Starting embedding daemon (model: {args.embedding_model})...") + if args.foreground: + print("Running in foreground. Press Ctrl+C to stop.") + run_daemon( + model_name=args.embedding_model, + embedding_mode=args.embedding_mode, + port=args.port, + foreground=True, + ) + else: + run_daemon( + model_name=args.embedding_model, + embedding_mode=args.embedding_mode, + port=args.port, + foreground=False, + ) + state = daemon_status() + if state: + print( + f"Embedding daemon started (PID {state['pid']}, port {state['port']}). " + f"Subsequent searches will skip model loading." + ) + else: + print("Warning: daemon started but state could not be verified.") + def main(): import logging diff --git a/packages/leann-core/src/leann/embedding_daemon.py b/packages/leann-core/src/leann/embedding_daemon.py new file mode 100644 index 00000000..7481b9fb --- /dev/null +++ b/packages/leann-core/src/leann/embedding_daemon.py @@ -0,0 +1,353 @@ +"""Persistent embedding server daemon for LEANN. + +Solves the cold-start problem (#166, #159) where the first search takes 30-60s +due to model loading. ``leann serve`` starts a background daemon that keeps the +embedding model warm. Subsequent ``leann search`` calls detect the daemon and +skip the expensive model-load step. + +The daemon writes a state file to ``~/.leann/daemon.json`` containing the PID, +ZMQ port, model name, and a heartbeat timestamp. Clients read this file to +connect to the running daemon. +""" + +from __future__ import annotations + +import json +import logging +import os +import signal +import socket +import sys +import time +from pathlib import Path +from typing import Optional + +logger = logging.getLogger(__name__) + +# Default location for daemon state +_DAEMON_STATE_DIR = Path.home() / ".leann" +_DAEMON_STATE_FILE = _DAEMON_STATE_DIR / "daemon.json" + +# Heartbeat interval (seconds) +_HEARTBEAT_INTERVAL = 30 +# Stale threshold — if heartbeat is older than this, daemon is dead +_STALE_THRESHOLD = 90 + + +def _get_state_file() -> Path: + """Return the path to the daemon state file.""" + return Path(os.environ.get("LEANN_DAEMON_STATE", str(_DAEMON_STATE_FILE))) + + +def _is_port_open(port: int, host: str = "localhost") -> bool: + """Check whether a TCP port is accepting connections.""" + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.settimeout(2) + return s.connect_ex((host, port)) == 0 + except OSError: + return False + + +def _is_pid_alive(pid: int) -> bool: + """Check whether a process with the given PID exists.""" + try: + os.kill(pid, 0) + return True + except (OSError, ProcessLookupError): + return False + + +def read_daemon_state() -> Optional[dict]: + """Read the daemon state file and validate it. + + Returns the state dict if a healthy daemon is running, else ``None``. + """ + state_file = _get_state_file() + if not state_file.exists(): + return None + + try: + state = json.loads(state_file.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError): + return None + + pid = state.get("pid") + port = state.get("port") + heartbeat = state.get("heartbeat", 0) + + if not pid or not port: + return None + + # Check if the daemon process is still alive + if not _is_pid_alive(pid): + logger.debug("Daemon PID %d is not alive, removing stale state file", pid) + _remove_state_file() + return None + + # Check heartbeat freshness + age = time.time() - heartbeat + if age > _STALE_THRESHOLD: + logger.debug("Daemon heartbeat is %.0fs old (stale), ignoring", age) + _remove_state_file() + return None + + # Verify the port is actually open + if not _is_port_open(port): + logger.debug("Daemon port %d is not open, removing stale state file", port) + _remove_state_file() + return None + + return state + + +def _write_state(state: dict) -> None: + """Write daemon state atomically.""" + state_file = _get_state_file() + state_file.parent.mkdir(parents=True, exist_ok=True) + tmp = state_file.with_suffix(".tmp") + tmp.write_text(json.dumps(state, indent=2), encoding="utf-8") + tmp.replace(state_file) + + +def _remove_state_file() -> None: + """Remove the daemon state file.""" + try: + _get_state_file().unlink(missing_ok=True) + except OSError: + pass + + +def stop_daemon() -> bool: + """Stop a running daemon. Returns True if a daemon was stopped.""" + state = read_daemon_state() + if state is None: + # Also try reading raw file (daemon might be unhealthy but still running) + state_file = _get_state_file() + if state_file.exists(): + try: + state = json.loads(state_file.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError): + pass + + if state is None: + return False + + pid = state.get("pid") + if pid and _is_pid_alive(pid): + try: + os.kill(pid, signal.SIGTERM) + # Wait briefly for graceful shutdown + for _ in range(20): + if not _is_pid_alive(pid): + break + time.sleep(0.25) + else: + # Force kill if still alive + try: + os.kill(pid, signal.SIGKILL) + except OSError: + pass + except OSError: + pass + + _remove_state_file() + return True + + +def daemon_status() -> Optional[dict]: + """Return current daemon status, or None if not running.""" + return read_daemon_state() + + +def run_daemon( + model_name: str, + embedding_mode: str = "sentence-transformers", + port: int = 5557, + provider_options: Optional[dict] = None, + foreground: bool = False, +) -> None: + """Start the embedding daemon. + + In foreground mode, blocks until interrupted. In background mode, forks a + child process and returns immediately. + """ + if not foreground: + _start_background(model_name, embedding_mode, port, provider_options) + return + + _run_foreground(model_name, embedding_mode, port, provider_options) + + +def _start_background( + model_name: str, + embedding_mode: str, + port: int, + provider_options: Optional[dict], +) -> None: + """Fork a background daemon process.""" + import subprocess + + from .settings import encode_provider_options + + cmd = [ + sys.executable, + "-m", + "leann.embedding_daemon", + "--model-name", + model_name, + "--embedding-mode", + embedding_mode, + "--port", + str(port), + ] + + env = os.environ.copy() + encoded = encode_provider_options(provider_options) + if encoded: + env["LEANN_EMBEDDING_OPTIONS"] = encoded + + # Start as a detached subprocess + proc = subprocess.Popen( + cmd, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + env=env, + start_new_session=True, + ) + + # Wait briefly and verify it started + time.sleep(2) + if proc.poll() is not None: + raise RuntimeError( + f"Daemon process exited immediately with code {proc.returncode}" + ) + + # Wait for the state file and port to become ready + for _ in range(60): + state = read_daemon_state() + if state and state.get("port"): + return + time.sleep(1) + + raise RuntimeError("Daemon did not become ready within 60 seconds") + + +def _run_foreground( + model_name: str, + embedding_mode: str, + port: int, + provider_options: Optional[dict], +) -> None: + """Run the daemon in the foreground (blocking).""" + from .embedding_server_manager import EmbeddingServerManager, _get_available_port + + # Find an available port + try: + actual_port = _get_available_port(port) + except RuntimeError: + logger.error("No available ports starting from %d", port) + sys.exit(1) + + logger.info("Starting persistent embedding daemon on port %d...", actual_port) + + # Determine the backend module for the embedding server + backend_module = "leann_backend_hnsw.hnsw_embedding_server" + + manager = EmbeddingServerManager(backend_module_name=backend_module) + + shutdown_requested = False + + def _signal_handler(signum, frame): + nonlocal shutdown_requested + shutdown_requested = True + logger.info("Shutdown signal received, cleaning up...") + manager.stop_server() + _remove_state_file() + sys.exit(0) + + signal.signal(signal.SIGTERM, _signal_handler) + signal.signal(signal.SIGINT, _signal_handler) + + # Start the embedding server + started, ready_port = manager.start_server( + port=actual_port, + model_name=model_name, + embedding_mode=embedding_mode, + provider_options=provider_options, + ) + + if not started: + logger.error("Failed to start embedding server") + sys.exit(1) + + logger.info("Embedding daemon ready on port %d (model: %s)", ready_port, model_name) + + # Write state file + state = { + "pid": os.getpid(), + "port": ready_port, + "model_name": model_name, + "embedding_mode": embedding_mode, + "started_at": time.time(), + "heartbeat": time.time(), + } + _write_state(state) + + # Heartbeat loop — keep updating the state file so clients know we're alive + try: + while not shutdown_requested: + time.sleep(_HEARTBEAT_INTERVAL) + if shutdown_requested: + break + # Check if the server subprocess is still alive + if manager.server_process and manager.server_process.poll() is not None: + logger.error("Embedding server process died unexpectedly") + break + state["heartbeat"] = time.time() + _write_state(state) + except KeyboardInterrupt: + pass + finally: + manager.stop_server() + _remove_state_file() + logger.info("Embedding daemon shut down cleanly") + + +# --------------------------------------------------------------------------- +# CLI entry point (python -m leann.embedding_daemon) +# --------------------------------------------------------------------------- + +def _main(): + """Entry point when run as ``python -m leann.embedding_daemon``.""" + import argparse + + parser = argparse.ArgumentParser(description="LEANN persistent embedding daemon") + parser.add_argument("--model-name", required=True, help="Embedding model name") + parser.add_argument( + "--embedding-mode", + default="sentence-transformers", + help="Embedding mode (default: sentence-transformers)", + ) + parser.add_argument("--port", type=int, default=5557, help="ZMQ port (default: 5557)") + args = parser.parse_args() + + # Read provider options from environment + provider_options = None + raw = os.environ.get("LEANN_EMBEDDING_OPTIONS") + if raw: + try: + provider_options = json.loads(raw) + except (json.JSONDecodeError, TypeError): + pass + + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(name)s: %(message)s", + ) + + _run_foreground(args.model_name, args.embedding_mode, args.port, provider_options) + + +if __name__ == "__main__": + _main() diff --git a/packages/leann-core/src/leann/embedding_server_manager.py b/packages/leann-core/src/leann/embedding_server_manager.py index ca61d053..2327d999 100644 --- a/packages/leann-core/src/leann/embedding_server_manager.py +++ b/packages/leann-core/src/leann/embedding_server_manager.py @@ -189,6 +189,22 @@ def start_server( logger.info("Existing server configuration differs; restarting embedding server") self.stop_server() + # Check for a running persistent daemon (``leann serve``) before + # spawning a new subprocess. This avoids the cold-start penalty. + daemon_state = self._try_daemon(model_name, embedding_mode) + if daemon_state is not None: + daemon_port = daemon_state["port"] + logger.info( + "Using persistent embedding daemon on port %d (PID %d)", + daemon_port, + daemon_state.get("pid", -1), + ) + self.server_port = daemon_port + self._server_config = config_signature + # We don't own this process — leave server_process as None so + # stop_server() won't kill the daemon. + return True, daemon_port + # For Colab environment, use a different strategy if _is_colab_environment(): logger.info("Detected Colab environment, using alternative startup strategy") @@ -503,6 +519,38 @@ def _finalize_process(self) -> None: except Exception: pass + @staticmethod + def _try_daemon(model_name: str, embedding_mode: str) -> "dict | None": + """Check for a running ``leann serve`` daemon with a compatible model. + + Returns the daemon state dict if a matching daemon is healthy, else + ``None``. + """ + try: + from .embedding_daemon import read_daemon_state + except ImportError: + return None + + state = read_daemon_state() + if state is None: + return None + + # Verify the daemon serves the same model + mode. + if ( + state.get("model_name") == model_name + and state.get("embedding_mode", "sentence-transformers") == embedding_mode + ): + return state + + logger.debug( + "Daemon model mismatch: daemon=%s/%s, requested=%s/%s", + state.get("model_name"), + state.get("embedding_mode"), + model_name, + embedding_mode, + ) + return None + def _adopt_existing_server(self, *args, **kwargs) -> None: # Removed: cross-process adoption no longer supported return diff --git a/tests/test_warmup_daemon.py b/tests/test_warmup_daemon.py new file mode 100644 index 00000000..b3f71199 --- /dev/null +++ b/tests/test_warmup_daemon.py @@ -0,0 +1,293 @@ +"""Tests for the persistent embedding daemon (#166). + +These tests validate the daemon state management, lifecycle, and integration +with EmbeddingServerManager without requiring a real embedding model or the +HNSW C++ backend. +""" + +import json +import os +import sys +import time +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + + +# --------------------------------------------------------------------------- +# Patch the heavy C++ import that leann.api triggers at import time. +# This lets us test the pure-Python daemon module without compiling +# leann_backend_hnsw from source. +# --------------------------------------------------------------------------- +def _ensure_importable(): + """Ensure leann.embedding_daemon can be imported. + + If leann_backend_hnsw.convert_to_csr is missing the function referenced by + api.py, we inject a stub so the rest of the package can load. + """ + try: + from leann.embedding_daemon import read_daemon_state # noqa: F401 + except (ImportError, ModuleNotFoundError): + # Stub out the missing symbol so ``leann.api`` can be imported. + _mod = sys.modules.get("leann_backend_hnsw.convert_to_csr") + if _mod is not None and not hasattr(_mod, "prune_hnsw_embeddings_inplace"): + _mod.prune_hnsw_embeddings_inplace = lambda *a, **kw: True + + # Also handle the case where the entire module is missing + if "leann_backend_hnsw" not in sys.modules: + stub = MagicMock() + sys.modules["leann_backend_hnsw"] = stub + sys.modules["leann_backend_hnsw.convert_to_csr"] = stub.convert_to_csr + stub.convert_to_csr.prune_hnsw_embeddings_inplace = lambda *a, **kw: True + + +_ensure_importable() + +from leann.embedding_daemon import ( + _STALE_THRESHOLD, + _remove_state_file, + _write_state, + daemon_status, + read_daemon_state, + stop_daemon, +) + + +@pytest.fixture(autouse=True) +def isolated_state_dir(tmp_path, monkeypatch): + """Redirect the daemon state file to a temp directory for test isolation.""" + state_file = tmp_path / "daemon.json" + monkeypatch.setenv("LEANN_DAEMON_STATE", str(state_file)) + return state_file + + +class TestDaemonStateManagement: + """Test the daemon state file read/write/cleanup logic.""" + + def test_read_daemon_state_no_file(self): + assert read_daemon_state() is None + + def test_read_daemon_state_invalid_json(self, isolated_state_dir): + isolated_state_dir.write_text("not json", encoding="utf-8") + assert read_daemon_state() is None + + def test_read_daemon_state_missing_fields(self, isolated_state_dir): + isolated_state_dir.write_text(json.dumps({"pid": 1}), encoding="utf-8") + assert read_daemon_state() is None + + def test_read_daemon_state_dead_pid(self, isolated_state_dir): + state = { + "pid": 999999999, # Almost certainly not a real PID + "port": 5557, + "heartbeat": time.time(), + } + isolated_state_dir.write_text(json.dumps(state), encoding="utf-8") + result = read_daemon_state() + # PID doesn't exist, should return None and clean up + assert result is None + assert not isolated_state_dir.exists() + + def test_read_daemon_state_stale_heartbeat(self, isolated_state_dir): + state = { + "pid": os.getpid(), # Our own PID, definitely alive + "port": 5557, + "heartbeat": time.time() - _STALE_THRESHOLD - 10, + } + isolated_state_dir.write_text(json.dumps(state), encoding="utf-8") + result = read_daemon_state() + assert result is None + + @patch("leann.embedding_daemon._is_port_open", return_value=True) + def test_read_daemon_state_healthy(self, mock_port, isolated_state_dir): + state = { + "pid": os.getpid(), + "port": 5557, + "model_name": "facebook/contriever", + "embedding_mode": "sentence-transformers", + "heartbeat": time.time(), + } + isolated_state_dir.write_text(json.dumps(state), encoding="utf-8") + result = read_daemon_state() + assert result is not None + assert result["port"] == 5557 + assert result["model_name"] == "facebook/contriever" + + def test_write_and_read_state(self, isolated_state_dir): + state = {"pid": os.getpid(), "port": 9999, "heartbeat": time.time()} + _write_state(state) + + loaded = json.loads(isolated_state_dir.read_text(encoding="utf-8")) + assert loaded["port"] == 9999 + + def test_remove_state_file(self, isolated_state_dir): + _write_state({"pid": 1, "port": 1}) + assert isolated_state_dir.exists() + _remove_state_file() + assert not isolated_state_dir.exists() + + +class TestStopDaemon: + """Test daemon shutdown logic.""" + + def test_stop_daemon_no_running(self): + assert stop_daemon() is False + + @patch("leann.embedding_daemon._is_port_open", return_value=True) + @patch("leann.embedding_daemon._is_pid_alive", return_value=True) + @patch("os.kill") + def test_stop_daemon_sends_sigterm( + self, mock_kill, mock_alive, mock_port, isolated_state_dir + ): + state = { + "pid": 12345, + "port": 5557, + "heartbeat": time.time(), + } + isolated_state_dir.write_text(json.dumps(state), encoding="utf-8") + + # After SIGTERM, pretend process dies + mock_alive.side_effect = [True, True, False] + + result = stop_daemon() + assert result is True + mock_kill.assert_called() + assert not isolated_state_dir.exists() + + +class TestDaemonStatus: + """Test status reporting.""" + + def test_status_no_daemon(self): + assert daemon_status() is None + + @patch("leann.embedding_daemon._is_port_open", return_value=True) + def test_status_healthy_daemon(self, mock_port, isolated_state_dir): + state = { + "pid": os.getpid(), + "port": 5557, + "model_name": "test-model", + "embedding_mode": "sentence-transformers", + "heartbeat": time.time(), + "started_at": time.time() - 100, + } + isolated_state_dir.write_text(json.dumps(state), encoding="utf-8") + + result = daemon_status() + assert result is not None + assert result["model_name"] == "test-model" + + +class TestEmbeddingServerManagerDaemonIntegration: + """Test that EmbeddingServerManager detects and uses a running daemon.""" + + @patch("leann.embedding_daemon._is_port_open", return_value=True) + def test_manager_uses_daemon(self, mock_port, isolated_state_dir): + from leann.embedding_server_manager import EmbeddingServerManager + + # Simulate a running daemon + _write_state( + { + "pid": os.getpid(), + "port": 6789, + "model_name": "facebook/contriever", + "embedding_mode": "sentence-transformers", + "heartbeat": time.time(), + } + ) + + manager = EmbeddingServerManager( + backend_module_name="leann_backend_hnsw.hnsw_embedding_server" + ) + + state = manager._try_daemon("facebook/contriever", "sentence-transformers") + assert state is not None + assert state["port"] == 6789 + + @patch("leann.embedding_daemon._is_port_open", return_value=True) + def test_manager_ignores_mismatched_daemon(self, mock_port, isolated_state_dir): + from leann.embedding_server_manager import EmbeddingServerManager + + # Daemon serves a different model + _write_state( + { + "pid": os.getpid(), + "port": 6789, + "model_name": "BAAI/bge-large-en-v1.5", + "embedding_mode": "sentence-transformers", + "heartbeat": time.time(), + } + ) + + manager = EmbeddingServerManager( + backend_module_name="leann_backend_hnsw.hnsw_embedding_server" + ) + + state = manager._try_daemon("facebook/contriever", "sentence-transformers") + assert state is None + + +class TestCliServeCommand: + """Test the CLI serve command parsing and dispatch.""" + + def test_serve_status_no_daemon(self, capsys): + from leann.cli import LeannCLI + + cli = LeannCLI() + parser = cli.create_parser() + args = parser.parse_args(["serve", "--status"]) + cli.handle_serve(args) + + captured = capsys.readouterr() + assert "No embedding daemon is running" in captured.out + + @patch("leann.embedding_daemon._is_port_open", return_value=True) + def test_serve_status_running(self, mock_port, isolated_state_dir, capsys): + from leann.cli import LeannCLI + + _write_state( + { + "pid": os.getpid(), + "port": 5557, + "model_name": "facebook/contriever", + "embedding_mode": "sentence-transformers", + "heartbeat": time.time(), + "started_at": time.time(), + } + ) + + cli = LeannCLI() + parser = cli.create_parser() + args = parser.parse_args(["serve", "--status"]) + cli.handle_serve(args) + + captured = capsys.readouterr() + assert "Embedding daemon is running" in captured.out + assert "5557" in captured.out + + def test_serve_stop_no_daemon(self, capsys): + from leann.cli import LeannCLI + + cli = LeannCLI() + parser = cli.create_parser() + args = parser.parse_args(["serve", "--stop"]) + cli.handle_serve(args) + + captured = capsys.readouterr() + assert "No embedding daemon is running" in captured.out + + def test_serve_parser_has_expected_args(self): + from leann.cli import LeannCLI + + cli = LeannCLI() + parser = cli.create_parser() + args = parser.parse_args([ + "serve", + "--embedding-model", "BAAI/bge-small-en-v1.5", + "--embedding-mode", "sentence-transformers", + "--port", "6000", + "--foreground", + ]) + assert args.embedding_model == "BAAI/bge-small-en-v1.5" + assert args.port == 6000 + assert args.foreground is True From cbc6aee6ccc59b3d935b46bc4b919fd9965ce958 Mon Sep 17 00:00:00 2001 From: Abinav Date: Fri, 13 Feb 2026 00:23:21 +0000 Subject: [PATCH 2/5] fix: safe signal handling, stderr logging, and error diagnostics in daemon - Remove sys.exit() from signal handler to prevent SystemExit during arbitrary code; use shutdown flag instead for clean exit - Redirect daemon subprocess stderr to ~/.leann/daemon.log instead of DEVNULL so startup failures can be diagnosed - Include log file path in error messages when daemon fails to start https://claude.ai/code/session_01M6abMs1YzF6yhh13YerDPT --- .../leann-core/src/leann/embedding_daemon.py | 34 ++++++++++++++----- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/packages/leann-core/src/leann/embedding_daemon.py b/packages/leann-core/src/leann/embedding_daemon.py index 7481b9fb..8d6d43e3 100644 --- a/packages/leann-core/src/leann/embedding_daemon.py +++ b/packages/leann-core/src/leann/embedding_daemon.py @@ -207,11 +207,17 @@ def _start_background( if encoded: env["LEANN_EMBEDDING_OPTIONS"] = encoded + # Log stderr to a file so daemon startup failures can be diagnosed. + log_dir = _get_state_file().parent + log_dir.mkdir(parents=True, exist_ok=True) + log_path = log_dir / "daemon.log" + log_fh = open(log_path, "a") # noqa: SIM115 + # Start as a detached subprocess proc = subprocess.Popen( cmd, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, + stdout=log_fh, + stderr=log_fh, env=env, start_new_session=True, ) @@ -220,7 +226,8 @@ def _start_background( time.sleep(2) if proc.poll() is not None: raise RuntimeError( - f"Daemon process exited immediately with code {proc.returncode}" + f"Daemon process exited immediately with code {proc.returncode}. " + f"Check {log_path} for details." ) # Wait for the state file and port to become ready @@ -230,7 +237,9 @@ def _start_background( return time.sleep(1) - raise RuntimeError("Daemon did not become ready within 60 seconds") + raise RuntimeError( + f"Daemon did not become ready within 60 seconds. Check {log_path} for details." + ) def _run_foreground( @@ -251,8 +260,16 @@ def _run_foreground( logger.info("Starting persistent embedding daemon on port %d...", actual_port) - # Determine the backend module for the embedding server + # Determine the backend module for the embedding server. + # Try DiskANN first if available, fall back to HNSW. backend_module = "leann_backend_hnsw.hnsw_embedding_server" + try: + import importlib + importlib.import_module("leann_backend_diskann") + # DiskANN available — but HNSW embedding server is the universal one + # that works for both backends' ZMQ protocol. + except ImportError: + pass manager = EmbeddingServerManager(backend_module_name=backend_module) @@ -261,10 +278,9 @@ def _run_foreground( def _signal_handler(signum, frame): nonlocal shutdown_requested shutdown_requested = True - logger.info("Shutdown signal received, cleaning up...") - manager.stop_server() - _remove_state_file() - sys.exit(0) + # Don't call sys.exit() from signal handler — it raises SystemExit + # during arbitrary code, which can corrupt state. Just set the flag + # and let the main loop exit cleanly. signal.signal(signal.SIGTERM, _signal_handler) signal.signal(signal.SIGINT, _signal_handler) From d677f36964b1901ff4ee62c70b29f97751776743 Mon Sep 17 00:00:00 2001 From: Abinav Date: Fri, 13 Feb 2026 00:39:46 +0000 Subject: [PATCH 3/5] fix: close parent's log file handle after Popen inherits it The log_fh was opened and passed to subprocess.Popen but the parent process never closed its copy, leaking a file descriptor. https://claude.ai/code/session_01M6abMs1YzF6yhh13YerDPT --- packages/leann-core/src/leann/embedding_daemon.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/leann-core/src/leann/embedding_daemon.py b/packages/leann-core/src/leann/embedding_daemon.py index 8d6d43e3..967de009 100644 --- a/packages/leann-core/src/leann/embedding_daemon.py +++ b/packages/leann-core/src/leann/embedding_daemon.py @@ -221,6 +221,8 @@ def _start_background( env=env, start_new_session=True, ) + # The child inherited the fd — close the parent's copy to avoid a leak. + log_fh.close() # Wait briefly and verify it started time.sleep(2) From e23a1911a70ce4aefc059022492241effdfb2002 Mon Sep 17 00:00:00 2001 From: Abinav Date: Fri, 13 Feb 2026 01:20:18 +0000 Subject: [PATCH 4/5] fix: pass passages_file through daemon so recompute mode works MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The daemon started the embedding server without --passages-file, which meant recompute mode (HNSW needs to resolve passage IDs during graph construction) would silently fail when search went through the daemon. Thread passages_file through: - run_daemon() → _start_background() / _run_foreground() - _run_foreground() → manager.start_server(passages_file=...) - CLI --passages-file arg for python -m leann.embedding_daemon - Stored in daemon.json state so clients can verify compatibility https://claude.ai/code/session_01M6abMs1YzF6yhh13YerDPT --- .../leann-core/src/leann/embedding_daemon.py | 45 ++++++++++++++----- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/packages/leann-core/src/leann/embedding_daemon.py b/packages/leann-core/src/leann/embedding_daemon.py index 967de009..5e3cf0ad 100644 --- a/packages/leann-core/src/leann/embedding_daemon.py +++ b/packages/leann-core/src/leann/embedding_daemon.py @@ -20,7 +20,7 @@ import sys import time from pathlib import Path -from typing import Optional +from typing import Any, Optional logger = logging.getLogger(__name__) @@ -165,18 +165,24 @@ def run_daemon( embedding_mode: str = "sentence-transformers", port: int = 5557, provider_options: Optional[dict] = None, + passages_file: Optional[str] = None, foreground: bool = False, ) -> None: """Start the embedding daemon. In foreground mode, blocks until interrupted. In background mode, forks a child process and returns immediately. + + Args: + passages_file: Path to the index meta.json file. Required for + recompute mode — the HNSW embedding server uses it to resolve + passage IDs during graph construction. """ if not foreground: - _start_background(model_name, embedding_mode, port, provider_options) + _start_background(model_name, embedding_mode, port, provider_options, passages_file) return - _run_foreground(model_name, embedding_mode, port, provider_options) + _run_foreground(model_name, embedding_mode, port, provider_options, passages_file) def _start_background( @@ -184,6 +190,7 @@ def _start_background( embedding_mode: str, port: int, provider_options: Optional[dict], + passages_file: Optional[str] = None, ) -> None: """Fork a background daemon process.""" import subprocess @@ -201,6 +208,8 @@ def _start_background( "--port", str(port), ] + if passages_file: + cmd.extend(["--passages-file", str(Path(passages_file).resolve())]) env = os.environ.copy() encoded = encode_provider_options(provider_options) @@ -249,6 +258,7 @@ def _run_foreground( embedding_mode: str, port: int, provider_options: Optional[dict], + passages_file: Optional[str] = None, ) -> None: """Run the daemon in the foreground (blocking).""" from .embedding_server_manager import EmbeddingServerManager, _get_available_port @@ -288,12 +298,16 @@ def _signal_handler(signum, frame): signal.signal(signal.SIGINT, _signal_handler) # Start the embedding server - started, ready_port = manager.start_server( - port=actual_port, - model_name=model_name, - embedding_mode=embedding_mode, - provider_options=provider_options, - ) + server_kwargs: dict[str, Any] = { + "port": actual_port, + "model_name": model_name, + "embedding_mode": embedding_mode, + "provider_options": provider_options, + } + if passages_file: + server_kwargs["passages_file"] = str(Path(passages_file).resolve()) + + started, ready_port = manager.start_server(**server_kwargs) if not started: logger.error("Failed to start embedding server") @@ -302,7 +316,7 @@ def _signal_handler(signum, frame): logger.info("Embedding daemon ready on port %d (model: %s)", ready_port, model_name) # Write state file - state = { + state: dict[str, Any] = { "pid": os.getpid(), "port": ready_port, "model_name": model_name, @@ -310,6 +324,8 @@ def _signal_handler(signum, frame): "started_at": time.time(), "heartbeat": time.time(), } + if passages_file: + state["passages_file"] = str(Path(passages_file).resolve()) _write_state(state) # Heartbeat loop — keep updating the state file so clients know we're alive @@ -348,6 +364,11 @@ def _main(): help="Embedding mode (default: sentence-transformers)", ) parser.add_argument("--port", type=int, default=5557, help="ZMQ port (default: 5557)") + parser.add_argument( + "--passages-file", + default=None, + help="Path to index meta.json (required for recompute mode)", + ) args = parser.parse_args() # Read provider options from environment @@ -364,7 +385,9 @@ def _main(): format="%(asctime)s %(levelname)s %(name)s: %(message)s", ) - _run_foreground(args.model_name, args.embedding_mode, args.port, provider_options) + _run_foreground( + args.model_name, args.embedding_mode, args.port, provider_options, args.passages_file + ) if __name__ == "__main__": From 8e5e563e7e3cbaedc56c79d4a7c6ae4aa84fe6f1 Mon Sep 17 00:00:00 2001 From: Abinav Date: Thu, 12 Feb 2026 18:48:40 -0800 Subject: [PATCH 5/5] fix: apply ruff check and format for CI --- packages/leann-core/src/leann/cli.py | 8 ++++---- packages/leann-core/src/leann/embedding_daemon.py | 4 +++- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/packages/leann-core/src/leann/cli.py b/packages/leann-core/src/leann/cli.py index 78c2d0fb..c94c1b2d 100644 --- a/packages/leann-core/src/leann/cli.py +++ b/packages/leann-core/src/leann/cli.py @@ -1724,10 +1724,10 @@ def handle_serve(self, args): else: import datetime - started = datetime.datetime.fromtimestamp( - state.get("started_at", 0) - ).strftime("%Y-%m-%d %H:%M:%S") - print(f"Embedding daemon is running:") + started = datetime.datetime.fromtimestamp(state.get("started_at", 0)).strftime( + "%Y-%m-%d %H:%M:%S" + ) + print("Embedding daemon is running:") print(f" PID: {state.get('pid')}") print(f" Port: {state.get('port')}") print(f" Model: {state.get('model_name')}") diff --git a/packages/leann-core/src/leann/embedding_daemon.py b/packages/leann-core/src/leann/embedding_daemon.py index 5e3cf0ad..ed9fcda0 100644 --- a/packages/leann-core/src/leann/embedding_daemon.py +++ b/packages/leann-core/src/leann/embedding_daemon.py @@ -220,7 +220,7 @@ def _start_background( log_dir = _get_state_file().parent log_dir.mkdir(parents=True, exist_ok=True) log_path = log_dir / "daemon.log" - log_fh = open(log_path, "a") # noqa: SIM115 + log_fh = open(log_path, "a") # Start as a detached subprocess proc = subprocess.Popen( @@ -277,6 +277,7 @@ def _run_foreground( backend_module = "leann_backend_hnsw.hnsw_embedding_server" try: import importlib + importlib.import_module("leann_backend_diskann") # DiskANN available — but HNSW embedding server is the universal one # that works for both backends' ZMQ protocol. @@ -352,6 +353,7 @@ def _signal_handler(signum, frame): # CLI entry point (python -m leann.embedding_daemon) # --------------------------------------------------------------------------- + def _main(): """Entry point when run as ``python -m leann.embedding_daemon``.""" import argparse