Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
|-----|------|---------|-------------|
| `max_tool_rounds` | int | `3` | Maximum router-model tool loop iterations before forcing upstream completion |
| `tool_exec_timeout_s` | int | `30` | Timeout for each local shell script or builtin command execution |
| `routing_timeout_s` | float | `10.0` | Timeout in seconds for each routing-model HTTP call (the warm-up request uses the same value). On timeout, the router retries once after a short random delay (0.1–0.5 s); if the retry also times out, the request is forwarded to upstream |
| `tools_base_dir` | string | - | Base directory for Python tools invoked by wrapper scripts. Sets `FR_TOOLS_BASE_DIR` env var |
| `fr_completion_check` | object | `{"enabled": true, "mode": "permissive", "always_true": false}` | Enable router model self-judgment, or force FR-only responses for router-model testing |
| `fr_context_history` | object | `{"enabled": true}` | Preserve router model context across requests |
Expand Down
1 change: 1 addition & 0 deletions examples/config.example.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"scripts_dir": "scripts",
"max_tool_rounds": 6,
"tool_exec_timeout_s": 30,
"routing_timeout_s": 10.0,
"debug_logging": {
"enabled": false
}
Expand Down
38 changes: 30 additions & 8 deletions function_router/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import json
import logging
import os
import random
import re
import sys
import threading
Expand Down Expand Up @@ -153,6 +154,7 @@ class AppConfig:
fr_context_history: bool = True
fr_context_preserve: bool = False
debug_logging: bool = False
routing_timeout_s: float = 10.0

@property
def functions_path(self) -> Path:
Expand Down Expand Up @@ -421,6 +423,7 @@ def load_config(config_path: Path) -> AppConfig:
debug_logging=bool(
data.get("debug_logging", {}).get("enabled", False)
),
routing_timeout_s=float(data.get("routing_timeout_s", 10.0)),
)
except KeyError as exc:
raise RuntimeError(f"missing config key: {exc.args[0]}") from exc
Expand Down Expand Up @@ -822,7 +825,12 @@ async def qwen_health_check() -> bool:


async def call_qwen(messages: list[dict[str, Any]]) -> dict[str, Any]:
"""Send a non-streaming chat completion request to the local Qwen endpoint."""
"""Send a non-streaming chat completion request to the local Qwen endpoint.

Retries once on timeout with a small random jitter to ride out brief
routing-model latency spikes (e.g. KV cache warmup). After the retry is
exhausted the timeout propagates so the caller can fall back to upstream.
"""

if STATE.http_client is None or STATE.config is None or STATE.tools is None:
raise RuntimeError("application state is not initialized")
Expand All @@ -842,12 +850,26 @@ async def call_qwen(messages: list[dict[str, Any]]) -> dict[str, Any]:
"Authorization": f"Bearer {STATE.config.routing.api_key}",
"Content-Type": "application/json",
}
response = await STATE.http_client.post(
f"{STATE.config.routing.base_url.rstrip('/')}/chat/completions",
json=payload,
headers=headers,
timeout=10.0,
)
url = f"{STATE.config.routing.base_url.rstrip('/')}/chat/completions"
timeout_s = STATE.config.routing_timeout_s

try:
response = await STATE.http_client.post(
url, json=payload, headers=headers, timeout=timeout_s,
)
except httpx.TimeoutException as exc:
if STATE.logger is not None:
STATE.logger.warning("routing model timeout (%.1fs), retrying once", timeout_s)
await asyncio.sleep(random.uniform(0.1, 0.5))
try:
response = await STATE.http_client.post(
url, json=payload, headers=headers, timeout=timeout_s,
)
except httpx.TimeoutException as retry_exc:
if STATE.logger is not None:
STATE.logger.warning("routing model timeout on retry, giving up")
raise retry_exc from exc

response.raise_for_status()
return response.json()

Expand Down Expand Up @@ -882,7 +904,7 @@ async def warmup_qwen() -> bool:
f"{STATE.config.routing.base_url.rstrip('/')}/chat/completions",
json=payload,
headers=headers,
timeout=10.0,
timeout=STATE.config.routing_timeout_s,
)
response.raise_for_status()
return True
Expand Down
122 changes: 122 additions & 0 deletions scripts/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,110 @@ prompt_required() {
done
}

# Probe an OpenAI-compatible chat-completions endpoint and report the result.
#
# Arguments:
#   $1  label       Human-readable name used in the report (e.g. "Routing").
#   $2  base_url    API base URL; "/chat/completions" is appended to it.
#   $3  api_key     Literal key, a "${ENV_VAR}" reference, or ""/"any".
#   $4  model       Model name sent in the request body.
#   $5  with_tools  "1" to include a dummy tool definition in the request.
#
# Prints a one-line success message or a three-line failure report to stdout.
# Returns 0 when the endpoint answered with a 2xx status, 1 otherwise.
probe_model_endpoint() {
  local label="$1"
  local base_url="$2"
  local api_key="$3"
  local model="$4"
  local with_tools="$5"
  # Declared separately from the assignment so that $? read below is the exit
  # status of the command substitution rather than that of `local`.
  local output
  output=$(LABEL="$label" BASE_URL="$base_url" API_KEY="$api_key" MODEL="$model" WITH_TOOLS="$with_tools" python3 - <<'PY'
import json
import os
import re
import socket
import sys
import urllib.error
import urllib.request

TIMEOUT_S = 10

base_url = os.environ["BASE_URL"].rstrip("/")
raw_key = os.environ["API_KEY"]
model = os.environ["MODEL"]
with_tools = os.environ["WITH_TOOLS"] == "1"

# A key written as ${NAME} is looked up in the current environment. When the
# variable is unset the placeholder text itself is sent and a warning is
# written to stderr, which the calling shell does not capture.
m = re.fullmatch(r"\$\{([A-Z_][A-Z0-9_]*)\}", raw_key)
if m:
    resolved = os.environ.get(m.group(1))
    if resolved is None:
        sys.stderr.write(f"warning: API key references {raw_key} but it is not set in this shell\n")
        key = raw_key
    else:
        key = resolved
elif raw_key == "" or raw_key.lower() == "any":
    key = "any"
else:
    key = raw_key

url = f"{base_url}/chat/completions"
body = {
    "model": model,
    "messages": [{"role": "user", "content": "ping"}],
    "max_tokens": 1,
}
if with_tools:
    body["tools"] = [{
        "type": "function",
        "function": {
            "name": "ping",
            "description": "connectivity probe",
            "parameters": {"type": "object", "properties": {}},
        },
    }]


def emit(status, detail):
    # The shell reads exactly one tab-separated line, so tabs and newlines
    # inside the detail are collapsed into single spaces.
    flat = " ".join(str(detail).split())
    print(f"{status}\t{url}\t{flat}")


try:
    # Building the request inside the try block lets a malformed URL be
    # reported as a normal FAIL line instead of an uncaught traceback.
    req = urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {key}",
        },
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=TIMEOUT_S) as resp:
        code = resp.getcode()
        raw = resp.read(2048).decode("utf-8", errors="replace")
    if 200 <= code < 300:
        emit("OK", model)
        sys.exit(0)
    emit("FAIL", f"HTTP {code}: {raw[:200]}")
    sys.exit(1)
except urllib.error.HTTPError as e:
    raw = ""
    try:
        raw = e.read(2048).decode("utf-8", errors="replace")
    except Exception:
        # Best effort only: the status code alone is still worth reporting.
        pass
    emit("FAIL", f"HTTP {e.code}: {raw[:200]}")
    sys.exit(1)
except socket.timeout:
    emit("FAIL", f"Timed out after {TIMEOUT_S}s")
    sys.exit(1)
except urllib.error.URLError as e:
    # urllib can wrap a timeout in URLError, so report it the same way as a
    # bare socket timeout.
    if isinstance(e.reason, socket.timeout):
        emit("FAIL", f"Timed out after {TIMEOUT_S}s")
    else:
        emit("FAIL", f"Connection error: {e.reason!r}")
    sys.exit(1)
except Exception as e:
    emit("FAIL", f"Unexpected error: {e!r}")
    sys.exit(1)
PY
)
  local probe_exit=$?
  local status_field url_field detail_field
  IFS=$'\t' read -r status_field url_field detail_field <<< "$output"
  # Literal UTF-8 marks are used instead of printf \u escapes, which bash
  # versions older than 4.2 print verbatim.
  if [ "$probe_exit" -eq 0 ] && [ "$status_field" = "OK" ]; then
    printf ' ✓ %s model reachable: %s\n' "$label" "$detail_field"
    return 0
  fi
  printf ' ✗ %s model check failed:\n' "$label"
  # Fall back to the known inputs when the probe printed nothing at all
  # (for example when python3 could not be started).
  printf ' URL: %s\n' "${url_field:-$base_url}"
  printf ' %s\n' "${detail_field:-probe produced no output (exit status $probe_exit)}"
  return 1
}

prompt_yes_no() {
local prompt_en="$1"
local prompt_zh="$2"
Expand Down Expand Up @@ -267,6 +371,24 @@ echo " 示例值: /home/mt/tools"
# Collect the remaining paths. prompt_default is defined outside this excerpt;
# presumably it shows the English/Chinese label and returns the answer or the
# given default -- confirm against its definition.
TOOLS_BASE_DIR=$(prompt_default " Tools base directory" " 工具根目录" "$HOME/.function-router/scripts")
OPENCLAW_CONFIG=$(prompt_default " OpenClaw config path" " OpenClaw 配置路径" "$DEFAULT_OPENCLAW_CONFIG")

# Connectivity check for both configured models. Each flag starts at 1 and is
# cleared when its probe returns non-zero; both probes always run, so both
# reports are printed even when the first one fails.
echo
echo "── Verifying model endpoints / 验证模型连通性 ──"
ROUTING_OK=1
UPSTREAM_OK=1
# The last argument is with_tools: 1 for the routing model, 0 for upstream.
probe_model_endpoint "Routing" "$ROUTING_BASE_URL" "$ROUTING_API_KEY" "$ROUTING_MODEL" 1 || ROUTING_OK=0
probe_model_endpoint "Upstream" "$UPSTREAM_BASE_URL" "$UPSTREAM_API_KEY" "$UPSTREAM_MODEL" 0 || UPSTREAM_OK=0

# On any failure ask whether to continue (default answer "N"); anything other
# than "yes" aborts with exit status 1 before the mkdir/cp steps below run.
if [ "$ROUTING_OK" -ne 1 ] || [ "$UPSTREAM_OK" -ne 1 ]; then
echo
CONTINUE_ANYWAY=$(prompt_yes_no "One or more model checks failed. Continue install anyway?" "一个或多个模型检测失败,是否仍要继续安装?" "N")
if [ "$CONTINUE_ANYWAY" != "yes" ]; then
echo "Aborted. No files were modified."
echo "已中止,未修改任何文件。"
exit 1
fi
fi
echo

# Create the install directories and seed them with the example files.
mkdir -p "$TARGET_DIR" "$SCRIPTS_DIR" "$LOGS_DIR"
cp "$REPO_ROOT/examples/config.example.json" "$CONFIG_PATH"
cp "$REPO_ROOT/examples/functions.example.jsonl" "$FUNCTIONS_PATH"
Expand Down
20 changes: 20 additions & 0 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,23 @@ def test_load_config_invalid_structure(tmp_path: Path) -> None:

with pytest.raises(RuntimeError, match="invalid config structure"):
load_config(config_path)


def test_load_config_default_routing_timeout(tmp_path: Path) -> None:
    """Loading the baseline payload yields ``routing_timeout_s == 10.0``.

    Assumes ``valid_config_payload()`` does not set ``routing_timeout_s``, so
    the value observed here is the loader default.
    """
    target = tmp_path / "config.json"
    write_json(target, valid_config_payload())

    assert load_config(target).routing_timeout_s == 10.0


def test_load_config_custom_routing_timeout(tmp_path: Path) -> None:
    """An integer ``routing_timeout_s`` in the file is loaded as the float 20.0."""
    payload = valid_config_payload()
    payload["routing_timeout_s"] = 20
    config_path = tmp_path / "config.json"
    write_json(config_path, payload)

    config = load_config(config_path)

    assert config.routing_timeout_s == 20.0
    # ``20 == 20.0`` is true in Python, so the equality check alone would pass
    # even if the loader kept the raw int; pin the coercion explicitly.
    assert isinstance(config.routing_timeout_s, float)
89 changes: 89 additions & 0 deletions tests/test_routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
def restore_state() -> None:
    """Snapshot selected ``server.STATE`` attributes and restore them afterwards.

    The ``config``, ``tools`` and ``http_client`` attributes are captured
    before yielding and written back in a ``finally`` clause, so they are
    restored even when the code running during the yield raises.
    """
    tracked = ("config", "tools", "http_client")
    snapshot = {name: getattr(server.STATE, name) for name in tracked}
    try:
        yield
    finally:
        for name, value in snapshot.items():
            setattr(server.STATE, name, value)


def make_config(max_tool_rounds: int = 3) -> server.AppConfig:
Expand Down Expand Up @@ -377,3 +379,90 @@ async def test_execute_tool_non_builtin_still_requires_script_file(tmp_path: Pat
result = await server.execute_tool("custom_tool", json.dumps({"path": "/tmp"}))

assert result["error"] == "script not found: custom_tool.sh"


class _StubResponse:
def __init__(self, payload: dict) -> None:
self._payload = payload

def raise_for_status(self) -> None:
return None

def json(self) -> dict:
return self._payload


class _StubHttpClient:
"""Records call timeouts and replays a scripted sequence of responses."""

def __init__(self, responses):
self._responses = list(responses)
self.call_count = 0
self.timeouts_seen: list[float] = []

async def post(self, url, json=None, headers=None, timeout=None):
import httpx

self.call_count += 1
self.timeouts_seen.append(timeout)
item = self._responses.pop(0)
if isinstance(item, BaseException):
raise item
return _StubResponse(item)


@pytest.mark.asyncio
async def test_call_qwen_uses_configured_timeout(monkeypatch: pytest.MonkeyPatch) -> None:
    """``call_qwen`` passes ``routing_timeout_s`` from the config to ``post``."""
    stub = _StubHttpClient([{"choices": [{"message": {"content": "ok"}}]}])
    cfg = make_config()
    cfg.routing_timeout_s = 15.0
    server.STATE.config = cfg
    server.STATE.tools = []
    server.STATE.http_client = stub

    await server.call_qwen([{"role": "user", "content": "hi"}])

    # Exactly one request, made with the configured timeout.
    assert (stub.call_count, stub.timeouts_seen) == (1, [15.0])


@pytest.mark.asyncio
async def test_call_qwen_retries_once_on_timeout(monkeypatch: pytest.MonkeyPatch) -> None:
    """A timeout on the first attempt is retried and the second reply is returned."""
    import httpx

    scripted = [
        httpx.ReadTimeout("slow"),
        {"choices": [{"message": {"content": "recovered"}}]},
    ]
    stub = _StubHttpClient(scripted)
    server.STATE.config = make_config()
    server.STATE.tools = []
    server.STATE.http_client = stub
    # Swap the sleep reached through ``server.asyncio`` for a coroutine that
    # finishes immediately, so the retry delay does not slow the test down.
    monkeypatch.setattr(server.asyncio, "sleep", lambda _s: _noop_sleep())

    reply = await server.call_qwen([{"role": "user", "content": "hi"}])

    assert stub.call_count == 2
    assert reply["choices"][0]["message"]["content"] == "recovered"


@pytest.mark.asyncio
async def test_call_qwen_propagates_timeout_after_retry(monkeypatch: pytest.MonkeyPatch) -> None:
    """Two consecutive timeouts surface as ``httpx.TimeoutException`` after two calls."""
    import httpx

    failures = [httpx.ReadTimeout("slow"), httpx.ReadTimeout("still slow")]
    stub = _StubHttpClient(failures)
    server.STATE.config = make_config()
    server.STATE.tools = []
    server.STATE.http_client = stub
    # Swap the sleep reached through ``server.asyncio`` for a coroutine that
    # finishes immediately, so the retry delay does not slow the test down.
    monkeypatch.setattr(server.asyncio, "sleep", lambda _s: _noop_sleep())

    with pytest.raises(httpx.TimeoutException):
        await server.call_qwen([{"role": "user", "content": "hi"}])

    # One initial attempt plus exactly one retry.
    assert stub.call_count == 2


async def _noop_sleep() -> None:
return None