Port embeddings and LLM routers from ipfs_datasets_py with peer multiplexing#220
Conversation
Co-authored-by: endomorphosis <3405202+endomorphosis@users.noreply.github.com>
Pull request overview
This PR ports the llm_router and embeddings_router functionality (plus CLI/API/docs/tests) from ipfs_datasets_py into ipfs_kit_py, adding multi-provider routing with caching and IPFS peer endpoint multiplexing.
Changes:
- Added core router modules for LLM generation and embeddings generation, with provider registries and response/provider caching.
- Added FastAPI endpoints and CLIs for invoking the routers and enumerating providers.
- Added docs, examples, and tests to demonstrate and validate the new router behavior.
Reviewed changes
Copilot reviewed 21 out of 21 changed files in this pull request and generated 14 comments.
| File | Description |
|---|---|
| `ipfs_kit_py/llm_router.py` | New multi-provider LLM routing layer with caching and optional peer/accelerate/local fallbacks. |
| `ipfs_kit_py/embeddings_router.py` | New multi-provider embeddings routing layer with caching and adapter/peer/accelerate support. |
| `ipfs_kit_py/router_deps.py` | Dependency container for shared provider instances and caching (local + optional remote). |
| `ipfs_kit_py/utils/embedding_adapter.py` | Local embeddings fallback (Gemini CLI → HF transformers) used by embeddings router. |
| `ipfs_kit_py/utils/cid_utils.py` | Content-addressed key helper used by routers’ CID cache-key strategy. |
| `ipfs_kit_py/utils/gemini_cli.py` | Gemini CLI wrapper used by LLM router optional provider. |
| `ipfs_kit_py/utils/claude_cli.py` | Claude CLI wrapper used by LLM router optional provider. |
| `ipfs_kit_py/utils/__init__.py` | Declares utils package for router utilities. |
| `ipfs_kit_py/mcp/ai/llm_router_api.py` | Adds FastAPI endpoints for LLM router (/generate, /providers, /health, /cache/clear). |
| `ipfs_kit_py/mcp/ai/embeddings_router_api.py` | Adds FastAPI endpoints for embeddings router (/embed, /embed-single, /providers, /health, /cache/clear). |
| `ipfs_kit_py/mcp/ai/api_router.py` | Integrates the new routers into the main AI API router. |
| `ipfs_kit_py/cli/llm_cli.py` | CLI for LLM router operations (generate/providers/test/clear-cache). |
| `ipfs_kit_py/cli/embeddings_cli.py` | CLI for embeddings router operations (embed/embed-single/providers/test/clear-cache). |
| `examples/llm_router_example.py` | Example usage of the LLM router, including peer routing. |
| `examples/embeddings_router_example.py` | Example usage of the embeddings router, including peer routing and semantic search. |
| `docs/LLM_ROUTER.md` | Documentation for router architecture, providers, env vars, CLI, and HTTP API. |
| `docs/EMBEDDINGS_ROUTER.md` | Documentation for embeddings providers, env vars, CLI, and HTTP API. |
| `tests/test_llm_router.py` | Unit tests for LLM router provider registration, deps, and basic behavior. |
| `tests/test_embeddings_router.py` | Unit tests for embeddings router provider registration, deps, and peer-provider shape. |
| `LLM_ROUTER_INTEGRATION.md` | High-level integration summary for the LLM router. |
| `EMBEDDINGS_ROUTER_INTEGRATION.md` | High-level integration summary for the embeddings router. |
```python
try:
    # Generate embeddings using the router
    result = embed_texts(
        texts=request.texts,
        model_name=request.model_name,
        device=request.device,
        provider=request.provider,
        deps=router_deps,
        timeout=request.timeout,
    )

    return EmbeddingResponse(
        embeddings=result,
        provider=request.provider or "auto",
        model=request.model_name,
        device=request.device,
        cached=False  # TODO: Track if results were cached
    )
except Exception as e:
    logger.error(f"Embeddings generation failed: {e}")
    raise HTTPException(
        status_code=500,
        detail=f"Embeddings generation failed: {str(e)}"
    )


@router.post("/embed-single", response_model=SingleEmbeddingResponse)
async def generate_single_embedding(request: SingleEmbeddingRequest) -> SingleEmbeddingResponse:
    """Generate embedding for a single text."""
    try:
        # Generate embedding using the router
        result = embed_text(
            text=request.text,
            model_name=request.model_name,
            device=request.device,
            provider=request.provider,
            deps=router_deps,
            timeout=request.timeout,
        )
```
These endpoints are declared async but call embed_texts(...) / embed_text(...) directly, which can perform blocking I/O (urllib requests, subprocess.run, and local transformer inference). This will block the FastAPI event loop. Please run router calls in a threadpool (e.g., anyio.to_thread.run_sync / run_in_threadpool) or add async router functions.
Fixed in 7d6791e. API endpoints now use ThreadPoolExecutor with loop.run_in_executor() to run blocking router calls in a thread pool, preventing event loop blocking.
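The fix described above can be sketched as follows. `blocking_embed` is a hypothetical stand-in for the blocking `embed_texts` call, and the endpoint is simplified to a plain coroutine (no FastAPI request model):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def blocking_embed(texts):
    # Stand-in for a blocking call (urllib, subprocess, local inference).
    return [[float(len(t))] for t in texts]

# Shared executor, as in the described fix.
_executor = ThreadPoolExecutor(max_workers=4)

async def embed_endpoint(texts):
    # Offload the blocking call so the event loop stays responsive.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_executor, blocking_embed, texts)

result = asyncio.run(embed_endpoint(["hello", "hi"]))
print(result)  # -> [[5.0], [2.0]]
```

Starlette's `run_in_threadpool` would achieve the same effect without managing an executor explicitly.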
ipfs_kit_py/utils/cid_utils.py
Outdated
```python
def cid_for_obj(obj: Any, base: str = "base32") -> str:
    """Generate a CID (Content Identifier) for an object.

    Args:
        obj: The object to generate a CID for
        base: The base encoding to use (default: base32)

    Returns:
        A CID string
    """
    # Serialize object to JSON
    try:
        payload = json.dumps(obj, sort_keys=True, default=repr, ensure_ascii=False)
    except Exception:
        payload = repr(obj)

    # Generate SHA-256 hash
    hash_bytes = hashlib.sha256(payload.encode("utf-8")).digest()

    # Convert to base32 or base58 encoding
    if base == "base32":
        # Simple base32 encoding
        import base64
        encoded = base64.b32encode(hash_bytes).decode("ascii").rstrip("=").lower()
        return f"b{encoded}"
    elif base == "base58":
```
cid_for_obj is documented/used as a CID generator, but it’s currently just a SHA-256 digest with a multibase-like prefix (not a real CIDv1 with codec + multihash). This conflicts with router docs that describe “CIDv1” cache keys, and the repo already has CID utilities in ipfs_kit_py/ipfs_multiformats.py (create_cid_from_bytes). Consider either switching to the existing CID implementation (encode JSON bytes -> CIDv1) or renaming/documenting this as a non-CID “content hash” to avoid confusing callers.
Fixed in 7d6791e. Now using ipfs_kit_py.ipfs_multiformats.create_cid_from_bytes() to generate proper CIDv1 with sha2-256 multihash and base32 encoding. Falls back to simple hash only if ipfs_multiformats is unavailable.
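For reference, the fallback path (used only when `ipfs_multiformats` is unavailable) amounts to a multibase-prefixed SHA-256 content hash, not a real CIDv1 (there is no codec or multihash header). A self-contained sketch of that fallback:

```python
import base64
import hashlib
import json

def content_hash_fallback(obj, base="base32"):
    # Deterministic JSON serialization; repr() covers non-JSON types.
    try:
        payload = json.dumps(obj, sort_keys=True, default=repr, ensure_ascii=False)
    except Exception:
        payload = repr(obj)
    digest = hashlib.sha256(payload.encode("utf-8")).digest()
    if base == "base32":
        # Multibase-style "b" prefix over raw digest bytes -- NOT a CIDv1.
        encoded = base64.b32encode(digest).decode("ascii").rstrip("=").lower()
        return f"b{encoded}"
    raise ValueError(f"unsupported base: {base}")

key = content_hash_fallback({"model": "demo", "texts": ["a", "b"]})
print(key)
```

Because the digest is computed over a sorted-keys JSON dump, the same logical object always yields the same key, which is what matters for cache lookups.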
```python
        # Route request to available peers
        result = peer_manager.route_llm_request(
            prompt=prompt,
            model=model_name,
            **kwargs
        )

        if isinstance(result, dict) and "text" in result:
            return result["text"]
        elif isinstance(result, str):
            return result

        raise RuntimeError("IPFS peer provider returned invalid response")
    except Exception as exc:
        raise RuntimeError(f"IPFS peer provider failed: {exc}") from exc
```
ipfs_peer provider calls deps.ipfs_backend.peer_manager.route_llm_request(...), but there is no implementation of route_llm_request anywhere in this repo (the unified peer manager exposes methods like get_all_peers/connect_to_peer, etc.). As written, selecting provider="ipfs_peer" will reliably fail with AttributeError at runtime. Please either implement the routing method on the actual peer manager / backend multiplexer, or update this provider to call the existing peer endpoint/multiplexer API that already exists in ipfs_kit_py.
Suggested change:

```diff
-# Route request to available peers
-result = peer_manager.route_llm_request(
-    prompt=prompt,
-    model=model_name,
-    **kwargs
-)
+# Resolve an LLM routing function on the peer manager, if available
+route_fn = getattr(peer_manager, "route_llm_request", None)
+if not callable(route_fn):
+    raise RuntimeError(
+        "IPFS peer manager does not support LLM routing "
+        "(missing 'route_llm_request' method)"
+    )
+# Route request to available peers
+result = route_fn(
+    prompt=prompt,
+    model=model_name,
+    **kwargs
+)
 if isinstance(result, dict) and "text" in result:
     return result["text"]
 elif isinstance(result, str):
     return result
 raise RuntimeError("IPFS peer provider returned invalid response")
 except Exception as exc:
     raise RuntimeError(f"IPFS peer provider failed: {exc}") from exc
```
Fixed in 7d6791e. Added proper method existence check with getattr(peer_manager, "route_llm_request", None) and clear error message when the method is missing, preventing AttributeError at runtime.
```python
def _cache_enabled() -> bool:
    return os.environ.get("IPFS_DATASETS_PY_ROUTER_CACHE", "1").strip() != "0"


def _response_cache_enabled() -> bool:
    # Default to enabled in benchmark contexts (determinism + speed), off otherwise.
    value = os.environ.get("IPFS_DATASETS_PY_ROUTER_RESPONSE_CACHE")
    if value is None:
        return _truthy(os.environ.get("IPFS_DATASETS_PY_BENCHMARK"))
    return str(value).strip() != "0"
```
Caching toggles in this router read only IPFS_DATASETS_PY_ROUTER_CACHE / IPFS_DATASETS_PY_ROUTER_RESPONSE_CACHE / etc., but docs/CLI advertise IPFS_KIT_ROUTER_* (with IPFS_KIT taking precedence). As-is, IPFS_KIT_ROUTER_CACHE=0 won’t disable caching. Please unify env var resolution (IPFS_KIT first, then IPFS_DATASETS_PY) for all router env knobs.
Fixed in 7d6791e. Added _get_env() helper that checks IPFS_KIT_* first, then IPFS_DATASETS_PY_* as fallback. All environment variable reads now use this unified approach for consistent dual naming support.
ipfs_kit_py/embeddings_router.py
Outdated
```python
for out_idx, vec in enumerate(generated):
    input_idx = missing_indices[out_idx]
```
In the response-cache fast path, this code assumes the provider returns exactly one embedding per missing_texts entry and indexes missing_indices[out_idx]. However, _get_openrouter_provider() can legally return a shorter list (it explicitly returns partial results when lengths mismatch), which would make this loop raise IndexError or leave cached_vectors entries as None. Please validate lengths and either (a) raise a clear error on mismatch, or (b) zip results to indices defensively and handle missing outputs.
Suggested change:

```diff
-for out_idx, vec in enumerate(generated):
-    input_idx = missing_indices[out_idx]
+# Guard against providers returning more embeddings than requested.
+if len(generated) > len(missing_indices):
+    raise ValueError(
+        f"Provider returned {len(generated)} embeddings for "
+        f"{len(missing_indices)} missing texts"
+    )
+# Map generated embeddings back to their original input indices defensively.
+for input_idx, vec in zip(missing_indices, generated):
```
Fixed in 7d6791e. Added length validation before indexing: if len(generated) > len(missing_indices): raise ValueError(...) and switched to defensive zip(missing_indices, generated) to prevent IndexError with partial results.
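The defensive mapping can be illustrated in isolation; `merge_generated` below is a hypothetical helper, not the router's actual function:

```python
def merge_generated(cached_vectors, missing_indices, generated):
    # Reject providers that return more vectors than were requested.
    if len(generated) > len(missing_indices):
        raise ValueError(
            f"Provider returned {len(generated)} embeddings for "
            f"{len(missing_indices)} missing texts"
        )
    # zip() stops at the shorter sequence, so a partial result leaves the
    # remaining slots as None instead of raising IndexError.
    for input_idx, vec in zip(missing_indices, generated):
        cached_vectors[input_idx] = vec
    return cached_vectors

vectors = [None, [0.1], None]           # index 1 was a cache hit
out = merge_generated(vectors, [0, 2], [[0.5]])  # provider returned 1 of 2
print(out)  # -> [[0.5], [0.1], None]
```

Callers still need to decide how to treat the remaining `None` entries (error out or retry), but the fast path no longer crashes on partial provider output.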
ipfs_kit_py/llm_router.py
Outdated
```python
forced = os.getenv("IPFS_DATASETS_PY_LLM_PROVIDER", "").strip()
if forced:
    info = _PROVIDER_REGISTRY.get(forced)
    if info is not None:
        return info.factory()
    builtin = _builtin_provider_by_name(forced, deps)
    if builtin is not None:
        return builtin
    raise ValueError(f"Unknown LLM provider: {forced}")
```
The module docstring claims IPFS_KIT_* env vars are supported (and take precedence), but provider selection actually reads only IPFS_DATASETS_PY_LLM_PROVIDER here. This makes IPFS_KIT_LLM_PROVIDER a no-op for the router’s core behavior. Consider adding a small _get_env() helper (like embeddings_router/embedding_adapter) and using it consistently for provider/model/cache/accelerate envs.
Fixed in 7d6791e. Added _get_env() helper that checks IPFS_KIT_* first, then IPFS_DATASETS_PY_* as fallback. Provider selection now uses _get_env("LLM_PROVIDER") for consistent dual naming support.
ipfs_kit_py/embeddings_router.py
Outdated
```python
def _truthy(value: Optional[str]) -> bool:
    return str(value or "").strip().lower() in {"1", "true", "yes", "on"}


def _cache_enabled() -> bool:
    return os.environ.get("IPFS_DATASETS_PY_ROUTER_CACHE", "1").strip() != "0"


def _response_cache_enabled() -> bool:
    value = os.environ.get("IPFS_DATASETS_PY_ROUTER_RESPONSE_CACHE")
    if value is None:
        return _truthy(os.environ.get("IPFS_DATASETS_PY_BENCHMARK"))
    return str(value).strip() != "0"


def _response_cache_key_strategy() -> str:
    return os.environ.get("IPFS_DATASETS_PY_ROUTER_CACHE_KEY", "sha256").strip().lower() or "sha256"


def _response_cache_cid_base() -> str:
    return os.environ.get("IPFS_DATASETS_PY_ROUTER_CACHE_CID_BASE", "base32").strip() or "base32"
```
_cache_enabled() (and other env reads throughout this file) uses IPFS_DATASETS_PY_* variables directly, even though the PR description/docs say IPFS_KIT_* should take precedence. As a result, IPFS_KIT_EMBEDDINGS_PROVIDER, IPFS_KIT_ROUTER_CACHE, etc. won’t affect router behavior. Please route all env reads through _get_env() (already defined at the top) or an equivalent coalesce helper.
Suggested change:

```diff
-def _truthy(value: Optional[str]) -> bool:
-    return str(value or "").strip().lower() in {"1", "true", "yes", "on"}
-
-
-def _cache_enabled() -> bool:
-    return os.environ.get("IPFS_DATASETS_PY_ROUTER_CACHE", "1").strip() != "0"
-
-
-def _response_cache_enabled() -> bool:
-    value = os.environ.get("IPFS_DATASETS_PY_ROUTER_RESPONSE_CACHE")
-    if value is None:
-        return _truthy(os.environ.get("IPFS_DATASETS_PY_BENCHMARK"))
-    return str(value).strip() != "0"
-
-
-def _response_cache_key_strategy() -> str:
-    return os.environ.get("IPFS_DATASETS_PY_ROUTER_CACHE_KEY", "sha256").strip().lower() or "sha256"
-
-
-def _response_cache_cid_base() -> str:
-    return os.environ.get("IPFS_DATASETS_PY_ROUTER_CACHE_CID_BASE", "base32").strip() or "base32"
+def _get_env_optional(key: str) -> Optional[str]:
+    """Get environment variable with IPFS_KIT_* taking precedence, or None if unset.
+
+    Unlike `_get_env`, this preserves the distinction between "unset" and
+    "set (even to an empty string)" by checking membership in `os.environ`
+    instead of using `or` short-circuiting.
+    """
+    kit_key = f"IPFS_KIT_{key}"
+    if kit_key in os.environ:
+        return os.environ[kit_key]
+    datasets_key = f"IPFS_DATASETS_PY_{key}"
+    if datasets_key in os.environ:
+        return os.environ[datasets_key]
+    return None
+
+
+def _truthy(value: Optional[str]) -> bool:
+    return str(value or "").strip().lower() in {"1", "true", "yes", "on"}
+
+
+def _cache_enabled() -> bool:
+    value = _get_env("ROUTER_CACHE", "1")
+    return str(value).strip() != "0"
+
+
+def _response_cache_enabled() -> bool:
+    # Prefer explicit router response cache setting; if unset, fall back to BENCHMARK.
+    value = _get_env_optional("ROUTER_RESPONSE_CACHE")
+    if value is None:
+        benchmark = _get_env("BENCHMARK")
+        return _truthy(benchmark)
+    return str(value).strip() != "0"
+
+
+def _response_cache_key_strategy() -> str:
+    return _get_env("ROUTER_CACHE_KEY", "sha256").strip().lower() or "sha256"
+
+
+def _response_cache_cid_base() -> str:
+    return _get_env("ROUTER_CACHE_CID_BASE", "base32").strip() or "base32"
```
Fixed in 7d6791e. All environment variable reads in embeddings_router.py now use _get_env() helper that checks IPFS_KIT_* first, then IPFS_DATASETS_PY_* as fallback, making all advertised env vars work correctly.
ipfs_kit_py/embeddings_router.py
Outdated
```python
# 2) Optional accelerate provider.
accelerate_provider = _get_accelerate_provider(deps)
if accelerate_provider is not None:
    return accelerate_provider

# Try IPFS peer provider first if backend is available
ipfs_peer_provider = _get_ipfs_peer_provider(deps)
if ipfs_peer_provider is not None:
    return ipfs_peer_provider
```
Provider resolution order here checks the accelerate provider before the ipfs_peer provider, but the PR description/docs say peer multiplexing should be attempted before accelerate. If the ordering matters (it usually does for cost/latency), please swap these checks or update the documented resolution order to match the implementation.
Fixed in 7d6791e. Swapped provider resolution order - IPFS peer provider is now checked before accelerate provider, matching the documented resolution order in PR description.
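The documented resolution order (peer multiplexing before accelerate, local fallback last) amounts to a first-non-None chain; the provider names and factories below are illustrative stand-ins:

```python
def resolve_provider(candidates):
    # Try each factory in priority order; the first non-None provider wins.
    for name, factory in candidates:
        provider = factory()
        if provider is not None:
            return name, provider
    return None, None

order = [
    ("ipfs_peer", lambda: None),      # peer multiplexing first (unavailable here)
    ("accelerate", lambda: "accel"),  # then accelerate
    ("local", lambda: "local"),       # local fallback last
]
resolved = resolve_provider(order)
print(resolved)  # -> ('accelerate', 'accel')
```

Because the chain short-circuits, reordering the two entries is all it takes to change which backend serves a request, which is why the implementation must match the documented order.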
```python
# Register a failing provider
register_llm_provider("failing_provider", lambda: FailingProvider())

# Without a fallback, should raise
with pytest.raises(RuntimeError):
    get_llm_provider("failing_provider", use_cache=False)
```
This test expects get_llm_provider("failing_provider") to raise, but get_llm_provider only instantiates/returns the provider and does not call generate(). With the current router implementation, this should not raise and the test will fail. To test fallback-on-error, invoke generate_text(..., provider="failing_provider") (or call provider.generate(...)) and assert the expected exception/fallback behavior.
Fixed in 7d6791e. Changed test to call generate_text("test prompt", provider="failing_provider") instead of get_llm_provider(), properly testing fallback behavior when generation fails.
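The corrected test shape can be illustrated with a self-contained mock of the router's fallback-on-error behavior; this mirrors, rather than reproduces, the real `generate_text`:

```python
class FailingProvider:
    def generate(self, prompt, **kwargs):
        raise RuntimeError("provider unavailable")

class EchoProvider:
    def generate(self, prompt, **kwargs):
        return f"echo: {prompt}"

def generate_text(prompt, provider, fallback=None):
    # Sketch of router behavior: only generate() can fail, so that's
    # where the fallback (and any test of it) has to hook in.
    try:
        return provider.generate(prompt)
    except Exception:
        if fallback is not None:
            return fallback.generate(prompt)
        raise

out = generate_text("hi", FailingProvider(), fallback=EchoProvider())
print(out)  # -> echo: hi
```

The key point is that merely constructing `FailingProvider()` never raises; only invoking generation exercises the failure path.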
```python
try:
    # Generate text using the router
    result = generate_text(
        prompt=request.prompt,
        model_name=request.model_name,
        provider=request.provider,
        deps=router_deps,
        max_tokens=request.max_tokens,
        temperature=request.temperature,
        timeout=request.timeout,
    )
```
These endpoints are declared async but call generate_text(...) directly, which can perform blocking I/O (HTTP via urllib, subprocess.run for CLIs, and potentially heavy local HF inference). This will block the event loop under load. Please run the router call in a threadpool (e.g., anyio.to_thread.run_sync / starlette.concurrency.run_in_threadpool) or provide async variants of the router APIs.
Fixed in 7d6791e. API endpoints now use ThreadPoolExecutor with loop.run_in_executor() to run blocking router calls in a thread pool, preventing event loop blocking under load.
@copilot apply changes based on the comments in this thread and the llm_router.py shims below
@copilot apply changes based on the comments in this thread and the llm_router.py shims below |
…tion, and more

Co-authored-by: endomorphosis <3405202+endomorphosis@users.noreply.github.com>
Addressing PR Review Feedback

Changes made:
- `llm_router.py`: added a `_get_env()` helper to check IPFS_KIT_* then IPFS_DATASETS_PY_*.
- `embeddings_router.py`: added the `_get_env()` helper for dual environment variable support.
- `embedding_adapter.py`
- `router_deps.py`
- `llm_router_api.py` & `embeddings_router_api.py`: run blocking router calls in a thread pool via `loop.run_in_executor()`.
- `cid_utils.py`: generate a proper CIDv1 via `create_cid_from_bytes()`, with a simple-hash fallback.
- `tests/test_llm_router.py`: test fallback behavior via `generate_text(..., provider="failing_provider")`.