diff --git a/backend/migrations/versions/009_add_bot_usage_tracking.py b/backend/migrations/versions/009_add_bot_usage_tracking.py new file mode 100644 index 00000000..882bb7a6 --- /dev/null +++ b/backend/migrations/versions/009_add_bot_usage_tracking.py @@ -0,0 +1,33 @@ +"""新增 bot_usage_tracking 資料表 + +追蹤未綁定用戶的訊息使用量,支援 rate limiting。 + +Revision ID: 009 +""" + +from alembic import op + +revision = "009" +down_revision = "008" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.execute(""" + CREATE TABLE bot_usage_tracking ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + bot_user_id UUID NOT NULL REFERENCES bot_users(id) ON DELETE CASCADE, + period_type VARCHAR(10) NOT NULL, + period_key VARCHAR(20) NOT NULL, + message_count INT NOT NULL DEFAULT 0, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE(bot_user_id, period_type, period_key) + ) + """) + # UNIQUE 約束已隱含建立唯一索引,無需額外 CREATE INDEX + + +def downgrade() -> None: + op.execute("DROP TABLE IF EXISTS bot_usage_tracking") diff --git a/backend/src/ching_tech_os/api/linebot_router.py b/backend/src/ching_tech_os/api/linebot_router.py index c95efb00..4c13aad6 100644 --- a/backend/src/ching_tech_os/api/linebot_router.py +++ b/backend/src/ching_tech_os/api/linebot_router.py @@ -79,6 +79,8 @@ check_line_access, update_group_settings, reply_text, + push_text, + get_line_user_record, ) from ..services.linebot_ai import handle_text_message @@ -259,21 +261,85 @@ async def process_message_event(event: MessageEvent) -> None: if not has_access: if deny_reason == "user_not_bound": - # 個人對話:回覆提示訊息 - if not is_group and event.reply_token: + # 身份分流:根據策略決定拒絕或進入受限模式 + from ..services.bot.identity_router import ( + route_unbound, + handle_restricted_mode, + ) + + route_result = route_unbound( + platform_type="line", is_group=is_group + ) + if route_result.action == "reject": + if event.reply_token: + try: + await reply_text(event.reply_token, route_result.reply_text) + except Exception as e: + logger.warning(f"回覆未綁定訊息失敗: {e}") + elif route_result.action == "restricted": + # 受限模式:先攔截斜線指令 + from ..services.bot.commands import router as cmd_router + + parsed_cmd = cmd_router.parse(content) + if parsed_cmd: + cmd, cmd_args = parsed_cmd + from ..services.bot.commands import CommandContext + + cmd_ctx = CommandContext( + platform_type="line", + platform_user_id=line_user_id, + bot_user_id=str(user_uuid), + ctos_user_id=None, # 未綁定 + is_admin=False, + is_group=is_group, + group_id=str(group_uuid) if group_uuid else None, + reply_token=event.reply_token, + raw_args=cmd_args, + ) + cmd_reply = await cmd_router.dispatch(cmd, cmd_args, cmd_ctx) + if cmd_reply and event.reply_token: + try: + await reply_text(event.reply_token, cmd_reply) + except Exception: + await push_text(line_user_id, cmd_reply) + return + + # 受限模式 AI 處理 try: - await reply_text( - event.reply_token, - "請先在 CTOS 系統綁定您的 Line 帳號才能使用此服務。\n\n" - "步驟:\n" - "1. 登入 CTOS 系統\n" - "2. 進入 Line Bot 管理頁面\n" - "3. 點擊「綁定 Line 帳號」產生驗證碼\n" - "4. 將驗證碼發送給我完成綁定", + # 取得使用者顯示名稱 + display_name = None + user_row = await get_line_user_record( + line_user_id, "display_name" + ) + if user_row: + display_name = user_row["display_name"] + + reply = await handle_restricted_mode( + content=content, + platform_user_id=line_user_id, + bot_user_id=str(user_uuid), + is_group=is_group, + line_group_id=group_uuid, + message_uuid=message_uuid, + user_display_name=display_name, ) + if reply: + # reply_token 可能在長時間 AI 處理後過期, + # 先嘗試 reply,失敗則 fallback 到 push + if event.reply_token: + try: + await reply_text(event.reply_token, reply) + except Exception: + await push_text(line_user_id, reply) + else: + await push_text(line_user_id, reply) except Exception as e: - logger.warning(f"回覆未綁定訊息失敗: {e}") - # 群組對話:靜默不回應 + logger.error(f"受限模式 AI 處理失敗: {e}", exc_info=True) + try: + await push_text(line_user_id, "抱歉,處理訊息時發生錯誤,請稍後再試。") + except Exception as push_e: + logger.warning(f"推送錯誤訊息失敗: {push_e}") + # silent: 群組靜默忽略 elif deny_reason == "group_not_allowed": # 群組未開啟 AI 回應,靜默不回應 pass diff --git a/backend/src/ching_tech_os/config.py b/backend/src/ching_tech_os/config.py index dd08f519..ad40a7e8 100644 --- a/backend/src/ching_tech_os/config.py +++ b/backend/src/ching_tech_os/config.py @@ -142,6 +142,38 @@ class Settings: "ctos", # 系統簡稱 ] + # =================== + # Bot 多模式平台設定 + # =================== + # 未綁定用戶策略:reject(預設,拒絕並提示綁定)/ restricted(走受限模式 Agent) + bot_unbound_user_policy: str = _get_env("BOT_UNBOUND_USER_POLICY", "reject") + # 受限模式使用的 AI 模型(控制成本,預設用較輕量的 haiku) + bot_restricted_model: str = _get_env("BOT_RESTRICTED_MODEL", "haiku") + # Debug 模式使用的 AI 模型 + bot_debug_model: str = _get_env("BOT_DEBUG_MODEL", "sonnet") + # 頻率限制開關(僅在 restricted 模式下生效) + bot_rate_limit_enabled: bool = _get_env_bool("BOT_RATE_LIMIT_ENABLED", True) + # 每小時訊息上限(未綁定用戶) + bot_rate_limit_hourly: int = _get_env_int("BOT_RATE_LIMIT_HOURLY", 20) + # 每日訊息上限(未綁定用戶) + bot_rate_limit_daily: int = _get_env_int("BOT_RATE_LIMIT_DAILY", 50) + # 驗證 hourly <= daily(若設定不合理則修正) + if bot_rate_limit_hourly > bot_rate_limit_daily: + logger.warning( + "BOT_RATE_LIMIT_HOURLY (%d) > BOT_RATE_LIMIT_DAILY (%d)," + "將 hourly 限制調整為與 daily 相同", + bot_rate_limit_hourly, + bot_rate_limit_daily, + ) + bot_rate_limit_hourly = bot_rate_limit_daily + + # 圖書館公開資料夾(逗號分隔,未綁定用戶只能看到這些資料夾) + library_public_folders: list[str] = [ + f.strip() + for f in _get_env("LIBRARY_PUBLIC_FOLDERS", "產品資料,教育訓練").split(",") + if f.strip() + ] + # =================== # Telegram Bot 設定 # =================== diff --git a/backend/src/ching_tech_os/main.py b/backend/src/ching_tech_os/main.py index d8b73ae0..98baab63 100644 --- a/backend/src/ching_tech_os/main.py +++ b/backend/src/ching_tech_os/main.py @@ -117,6 +117,10 @@ async def lifespan(app: FastAPI): app.state.skillhub_client = SkillHubClient() await init_db_pool() + # 註冊 Bot 斜線指令 + from .services.bot.command_handlers import register_builtin_commands + register_builtin_commands() + # 啟動模組初始化(依啟停狀態) for module_id, info in get_module_registry().items(): if not is_module_enabled(module_id): diff --git a/backend/src/ching_tech_os/models/knowledge.py b/backend/src/ching_tech_os/models/knowledge.py index d7152045..fe32ae4a 100644 --- a/backend/src/ching_tech_os/models/knowledge.py +++ b/backend/src/ching_tech_os/models/knowledge.py @@ -66,6 +66,7 @@ class KnowledgeCreate(BaseModel): category: str = "technical" scope: str = "personal" # 預設為個人知識(global、personal 或 project) project_id: str | None = None # 關聯專案 UUID(scope=project 時使用) + is_public: bool = False # 是否允許未綁定用戶查詢 tags: KnowledgeTags = Field(default_factory=KnowledgeTags) source: KnowledgeSource | None = None related: list[str] = Field(default_factory=list) @@ -81,6 +82,7 @@ class KnowledgeUpdate(BaseModel): category: str | None = None scope: str | None = None # global, personal, project owner: str | None = None # 擁有者帳號(設為空字串可清除) + is_public: bool | None = None # 是否允許未綁定用戶查詢 tags: KnowledgeTags | None = None source: KnowledgeSource | None = None related: list[str] | None = None @@ -96,6 +98,7 @@ class KnowledgeResponse(BaseModel): scope: str = "global" # global、personal 或 project owner: str | None = None # 擁有者帳號(個人知識用) project_id: str | None = None # 關聯專案 UUID(專案知識用) + is_public: bool = False # 是否允許未綁定用戶查詢 tags: KnowledgeTags source: KnowledgeSource related: list[str] @@ -116,6 +119,7 @@ class KnowledgeListItem(BaseModel): scope: str = "global" # global、personal 或 project owner: str | None = None # 擁有者帳號(個人知識用) project_id: str | None = None # 關聯專案 UUID(專案知識用) + is_public: bool = False # 是否允許未綁定用戶查詢 tags: KnowledgeTags author: str updated_at: date @@ -176,6 +180,7 @@ class IndexEntry(BaseModel): scope: str = "global" # global、personal 或 project owner: str | None = None # 擁有者帳號(個人知識用) project_id: str | None = None # 關聯專案 UUID(專案知識用) + is_public: bool = False # 是否允許未綁定用戶查詢 tags: KnowledgeTags author: str created_at: str diff --git a/backend/src/ching_tech_os/services/bot/command_handlers.py b/backend/src/ching_tech_os/services/bot/command_handlers.py new file mode 100644 index 00000000..4efb9a09 --- /dev/null +++ b/backend/src/ching_tech_os/services/bot/command_handlers.py @@ -0,0 +1,139 @@ +"""內建指令 handler + +註冊所有內建的斜線指令到 CommandRouter。 +""" + +from __future__ import annotations + +import logging +import re +import time + +from .commands import CommandContext, SlashCommand, router +from ..bot_line.trigger import reset_conversation + +logger = logging.getLogger(__name__) + + +async def _handle_reset(ctx: CommandContext) -> str | None: + """重置對話歷史""" + # 群組檢查已由 CommandRouter.dispatch() 的 private_only 處理 + await reset_conversation(ctx.platform_user_id) + return "已清除對話歷史,開始新對話!有什麼可以幫你的嗎?" + + +async def _handle_debug(ctx: CommandContext) -> str | None: + """管理員系統診斷指令 + + 使用 bot-debug Agent 和 debug-skill 腳本分析系統狀態。 + """ + from .. import ai_manager + from ..claude_agent import call_claude + from ..bot.ai import parse_ai_response + from ...config import settings + + # 取得 bot-debug Agent + agent = await ai_manager.get_agent_by_name("bot-debug") + if not agent: + return "⚠️ bot-debug Agent 不存在,請確認系統已正確初始化。" + + # 取得 system prompt + system_prompt_data = agent.get("system_prompt") + if isinstance(system_prompt_data, dict): + system_prompt = system_prompt_data.get("content", "") + else: + system_prompt = "" + + if not system_prompt: + return "⚠️ bot-debug Agent 缺少 system_prompt 設定。" + + # 取得 Agent 定義的工具 + agent_tools = agent.get("tools") or ["run_skill_script"] + + # 準備 prompt(使用者的問題描述,或預設「分析系統目前狀態」) + user_problem = ctx.raw_args.strip() if ctx.raw_args else "" + if user_problem: + prompt = f"管理員問題:{user_problem}" + else: + prompt = "請執行系統綜合健康檢查(check-system-health),分析目前系統狀態並回報結果。" + + # 呼叫 Claude CLI + model = settings.bot_debug_model + start_time = time.time() + + try: + response = await call_claude( + prompt=prompt, + model=model, + system_prompt=system_prompt, + timeout=180, # 3 分鐘(診斷腳本可能需要時間) + tools=agent_tools, + ctos_user_id=ctx.ctos_user_id, + ) + except Exception: + logger.exception("Debug 指令 AI 呼叫失敗") + return "⚠️ 診斷執行失敗,請查看系統日誌。" + + duration_ms = int((time.time() - start_time) * 1000) + logger.info(f"/debug 診斷完成,耗時 {duration_ms}ms") + + # 記錄 AI Log + try: + from ..linebot_ai import log_linebot_ai_call + + await log_linebot_ai_call( + message_uuid=None, + line_group_id=None, + is_group=False, + input_prompt=prompt, + history=None, + system_prompt=system_prompt, + allowed_tools=agent_tools, + model=model, + response=response, + duration_ms=duration_ms, + context_type_override="bot-debug", + ) + except Exception: + logger.warning("記錄 /debug AI Log 失敗", exc_info=True) + + # 解析回應 + reply_text, _files = parse_ai_response(response.message) + + if not reply_text: + return "診斷完成,但未產生回報內容。" + + # 過濾 FILE_MESSAGE(debug 不需要檔案傳送) + reply_text = re.sub(r"\[FILE_MESSAGE:[^\]]+\]", "", reply_text).strip() + + return reply_text or "診斷完成,但未產生回報內容。" + + +def register_builtin_commands() -> None: + """註冊所有內建指令""" + + # /reset 指令(包含所有別名) + router.register( + SlashCommand( + name="reset", + aliases=["新對話", "新对话", "清除對話", "清除对话", "忘記", "忘记"], + handler=_handle_reset, + require_bound=False, # 受限模式用戶也可以重置 + require_admin=False, + private_only=True, + ) + ) + + # /debug 指令(管理員專用,僅限個人對話) + router.register( + SlashCommand( + name="debug", + aliases=["診斷", "diag"], + handler=_handle_debug, + require_bound=True, + require_admin=True, + private_only=True, + ) + ) + + logger.info(f"已註冊 {len({id(v) for v in router._commands.values()})} 個內建指令") diff --git a/backend/src/ching_tech_os/services/bot/commands.py b/backend/src/ching_tech_os/services/bot/commands.py new file mode 100644 index 00000000..710e27e2 --- /dev/null +++ b/backend/src/ching_tech_os/services/bot/commands.py @@ -0,0 +1,157 @@ +"""Bot 斜線指令路由系統 + +可擴充的指令框架,支援 Line/Telegram 共用。 +指令在 AI 處理流程之前攔截處理。 +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from typing import Any, Callable, Coroutine + +from ...database import get_connection + +logger = logging.getLogger(__name__) + + +async def get_command_user_context( + bot_user_id: str | None, +) -> tuple[int | None, bool]: + """從 bot_user_id 查詢 CTOS 帳號綁定狀態和管理員身份 + + Line 和 Telegram 共用的查詢邏輯。 + + Returns: + (ctos_user_id, is_admin) + """ + if not bot_user_id: + return None, False + try: + async with get_connection() as conn: + row = await conn.fetchrow( + "SELECT user_id FROM bot_users WHERE id = $1", + bot_user_id, + ) + if row and row["user_id"]: + from ..user import get_user_role_and_permissions + + user_info = await get_user_role_and_permissions(row["user_id"]) + return row["user_id"], user_info["role"] == "admin" + except Exception: + logger.exception("查詢用戶綁定狀態失敗") + return None, False + + +@dataclass +class CommandContext: + """指令執行上下文""" + + platform_type: str # "line" | "telegram" + platform_user_id: str # Line user ID 或 Telegram user ID + bot_user_id: str | None # bot_users.id (UUID) + ctos_user_id: int | None # CTOS 帳號 ID(None = 未綁定) + is_admin: bool # 是否為管理員 + is_group: bool # 是否為群組對話 + group_id: str | None # 群組 ID(群組對話時) + reply_token: str | None # Line reply token(Line 平台時) + raw_args: str # 指令後的參數文字 + + +@dataclass +class SlashCommand: + """斜線指令定義""" + + name: str # 指令名稱(如 "debug") + aliases: list[str] = field(default_factory=list) # 別名 + handler: Callable[[CommandContext], Coroutine[Any, Any, str | None]] = field( + default=None # type: ignore — 註冊時必須提供 handler + ) + require_bound: bool = False # 是否要求已綁定 CTOS 帳號 + require_admin: bool = False # 是否要求管理員 + private_only: bool = False # 是否僅限個人對話 + platforms: set[str] = field(default_factory=lambda: {"line", "telegram"}) + + +class CommandRouter: + """斜線指令路由器""" + + def __init__(self) -> None: + # key = 正規化後的指令名稱(不含 /),value = SlashCommand + self._commands: dict[str, SlashCommand] = {} + + def register(self, command: SlashCommand) -> None: + """註冊指令""" + key = command.name.lower() + self._commands[key] = command + for alias in command.aliases: + self._commands[alias.lower()] = command + + def parse(self, content: str) -> tuple[SlashCommand, str] | None: + """解析訊息,回傳 (command, args) 或 None + + 僅匹配以 / 開頭的訊息。 + """ + text = content.strip() + if not text.startswith("/"): + return None + + # 分離指令名稱和參數 + parts = text[1:].split(None, 1) # 去掉 /,以第一個空白分割 + if not parts: + return None + + cmd_name = parts[0].lower() + # 處理 Telegram 的 /command@botname 格式 + if "@" in cmd_name: + cmd_name = cmd_name.split("@")[0] + + args = parts[1] if len(parts) > 1 else "" + + command = self._commands.get(cmd_name) + if command is None: + return None + + return (command, args) + + async def dispatch( + self, command: SlashCommand, args: str, context: CommandContext + ) -> str | None: + """執行指令,包含權限檢查 + + 回傳值: + - str: 回覆文字 + - None: 靜默處理(不回覆) + """ + # 平台檢查 + if context.platform_type not in command.platforms: + return None # 不支援的平台,視為一般訊息 + + # 群組限制 + if command.private_only and context.is_group: + return None # 靜默忽略 + + # 綁定檢查 + if command.require_bound and context.ctos_user_id is None: + return "請先綁定 CTOS 帳號才能使用此指令" + + # 管理員檢查 + if command.require_admin and not context.is_admin: + return "此指令僅限管理員使用" + + # 設定參數 + context.raw_args = args + + if command.handler is None: + logger.error(f"指令 /{command.name} 未設定 handler") + return "指令設定錯誤,請聯繫管理員" + + try: + return await command.handler(context) + except Exception: + logger.exception(f"指令 /{command.name} 執行失敗") + return "指令執行時發生錯誤,請稍後再試" + + +# 全域路由器實例 +router = CommandRouter() diff --git a/backend/src/ching_tech_os/services/bot/identity_router.py b/backend/src/ching_tech_os/services/bot/identity_router.py new file mode 100644 index 00000000..81c4f9ae --- /dev/null +++ b/backend/src/ching_tech_os/services/bot/identity_router.py @@ -0,0 +1,265 @@ +"""Bot 身份分流路由器 + +根據 BOT_UNBOUND_USER_POLICY 設定,決定未綁定用戶的處理路徑: +- reject(預設):回覆綁定提示,不進行 AI 處理 +- restricted:路由到受限模式 AI 流程 +""" + +from __future__ import annotations + +import logging +import re +import time +from uuid import UUID + +from ...config import settings + +logger = logging.getLogger(__name__) + +# 各平台的綁定提示訊息 +BINDING_PROMPT_LINE = ( + "請先在 CTOS 系統綁定您的 Line 帳號才能使用此服務。\n\n" + "步驟:\n" + "1. 登入 CTOS 系統\n" + "2. 進入 Line Bot 管理頁面\n" + "3. 點擊「綁定 Line 帳號」產生驗證碼\n" + "4. 將驗證碼發送給我完成綁定" +) + +BINDING_PROMPT_TELEGRAM = ( + "請先在 CTOS 系統綁定您的 Telegram 帳號才能使用此服務。\n\n" + "步驟:\n" + "1. 登入 CTOS 系統\n" + "2. 進入 Bot 管理頁面\n" + "3. 點擊「綁定帳號」產生驗證碼\n" + "4. 將 6 位數驗證碼發送給我完成綁定" +) + + +def get_unbound_policy() -> str: + """取得未綁定用戶策略 + + Returns: + "reject" 或 "restricted" + """ + policy = settings.bot_unbound_user_policy.lower().strip() + if policy not in ("reject", "restricted"): + logger.warning( + "BOT_UNBOUND_USER_POLICY 值無效: %r,使用預設 'reject'", policy + ) + return "reject" + return policy + + +class UnboundRouteResult: + """身份分流結果""" + + __slots__ = ("action", "reply_text") + + def __init__(self, action: str, reply_text: str | None = None): + """ + Args: + action: "reject"(拒絕並回覆提示)| "restricted"(進入受限模式)| "silent"(靜默忽略) + reply_text: 拒絕時的回覆文字(action="reject" 時有值) + """ + self.action = action + self.reply_text = reply_text + + +def route_unbound( + *, + platform_type: str, + is_group: bool, +) -> UnboundRouteResult: + """根據策略決定未綁定用戶的處理路徑 + + Args: + platform_type: "line" | "telegram" + is_group: 是否為群組對話 + + Returns: + UnboundRouteResult 指示應採取的動作 + """ + # 群組中的未綁定用戶一律靜默忽略(不受策略影響) + if is_group: + return UnboundRouteResult(action="silent") + + policy = get_unbound_policy() + + if policy == "restricted": + return UnboundRouteResult(action="restricted") + + # reject 策略:回覆綁定提示 + if platform_type == "telegram": + return UnboundRouteResult(action="reject", reply_text=BINDING_PROMPT_TELEGRAM) + return UnboundRouteResult(action="reject", reply_text=BINDING_PROMPT_LINE) + + +async def handle_restricted_mode( + *, + content: str, + platform_user_id: str, + bot_user_id: str | None, + is_group: bool, + line_group_id: UUID | None = None, + message_uuid: UUID | None = None, + user_display_name: str | None = None, +) -> str | None: + """執行受限模式 AI 流程 + + 使用 bot-restricted Agent、受限工具白名單、縮短的對話歷史。 + + Args: + content: 使用者訊息內容 + platform_user_id: 平台用戶 ID(Line user ID 或 Telegram user ID) + bot_user_id: bot_users.id (UUID 字串) + is_group: 是否為群組對話 + line_group_id: 群組 UUID(用於取得對話歷史) + message_uuid: 訊息 UUID(用於 AI log) + user_display_name: 使用者顯示名稱 + + Returns: + AI 回應文字,或 None + """ + # 原子性頻率限制檢查+計數(在 AI 處理之前) + if bot_user_id: + from .rate_limiter import check_and_increment + + allowed, deny_msg = await check_and_increment(bot_user_id) + if not allowed: + return deny_msg + + from .. import ai_manager + from ..claude_agent import call_claude + from ..linebot_ai import ( + build_system_prompt, + get_conversation_context, + log_linebot_ai_call, + ) + from ..mcp import get_mcp_tool_names + from ..linebot_agents import get_mcp_servers_for_user + from ..bot.ai import parse_ai_response + + # 1. 取得 bot-restricted Agent + agent = await ai_manager.get_agent_by_name("bot-restricted") + if not agent: + logger.error("bot-restricted Agent 不存在,無法進行受限模式 AI 處理") + return "系統設定錯誤,請聯繫管理員。" + + # 2. 取得 model(優先使用環境變數設定) + model = settings.bot_restricted_model + + # 3. 取得 system prompt + system_prompt_data = agent.get("system_prompt") + if isinstance(system_prompt_data, dict): + base_prompt = system_prompt_data.get("content", "") + else: + base_prompt = "" + + if not base_prompt: + logger.error("bot-restricted Agent 缺少 system_prompt") + return "系統設定錯誤,請聯繫管理員。" + + # 加入對話識別(標記為未綁定用戶) + base_prompt += ( + f"\n\n【對話識別】\n" + f"平台用戶 ID: {platform_user_id}\n" + f"用戶身份: 未綁定用戶(受限模式)\n" + f"ctos_user_id: (未關聯)" + ) + + # 4. 取得 Agent 定義的工具白名單 + agent_tools = agent.get("tools") or [] + + # 受限模式不使用使用者權限,使用空權限(僅 Agent 定義的工具) + app_permissions: dict[str, bool] = {} + + # 組裝 system prompt(加入工具說明) + system_prompt = await build_system_prompt( + line_group_id, + platform_user_id if not is_group else None, + base_prompt, + agent_tools, + app_permissions, + ) + + # 5. 取得對話歷史(限制 10 條,較已綁定用戶的 20 條縮短) + history, images, files = await get_conversation_context( + line_group_id, + platform_user_id if not is_group else None, + limit=10, + exclude_message_id=message_uuid, + ) + + # 6. 準備用戶訊息 + if user_display_name: + user_message = f"user[{user_display_name}]: {content}" + else: + user_message = f"user: {content}" + + # 7. 組裝工具列表(僅 Agent 定義的工具 + 對應的 MCP 工具) + # 受限模式的 MCP 工具也受 Agent 設定限制 + mcp_tools = await get_mcp_tool_names(exclude_group_only=not is_group) + # 只保留 Agent 工具白名單中的 MCP 工具 + if agent_tools: + mcp_tool_set = set(mcp_tools) + allowed_mcp = [t for t in agent_tools if t in mcp_tool_set] + # Agent 工具中不在 MCP 列表中的可能是外部 skill + non_mcp_tools = [t for t in agent_tools if t not in mcp_tool_set] + all_tools = list(dict.fromkeys(allowed_mcp + non_mcp_tools)) + else: + all_tools = [] + + # 取得需要的 MCP server(按需載入) + required_mcp_servers = set() + if all_tools: + # 受限模式使用空權限取得 MCP servers + required_mcp_servers = await get_mcp_servers_for_user(app_permissions) + + # 8. 呼叫 Claude CLI + start_time = time.time() + + try: + response = await call_claude( + prompt=user_message, + model=model, + history=history, + system_prompt=system_prompt, + timeout=120, # 受限模式超時較短(2 分鐘) + tools=all_tools, + required_mcp_servers=required_mcp_servers, + ctos_user_id=None, # 未綁定用戶 + ) + except Exception: + logger.exception("受限模式 AI 呼叫失敗") + return "抱歉,處理您的訊息時發生錯誤,請稍後再試。" + + duration_ms = int((time.time() - start_time) * 1000) + + # 9. 記錄 AI Log + if message_uuid: + await log_linebot_ai_call( + message_uuid=message_uuid, + line_group_id=line_group_id, + is_group=is_group, + input_prompt=user_message, + history=history, + system_prompt=system_prompt, + allowed_tools=all_tools, + model=model, + response=response, + duration_ms=duration_ms, + ) + + # 10. 解析回應(受限模式僅支援文字,不處理 FILE_MESSAGE) + reply_text, _files = parse_ai_response(response.message) + + if not reply_text: + return "抱歉,我目前無法回答您的問題。" + + # 過濾 FILE_MESSAGE 標記(受限模式不支援檔案發送) + reply_text = re.sub(r"\[FILE_MESSAGE:[^\]]+\]", "", reply_text).strip() + + # 使用量已在 check_and_increment 中原子性計數,無需再次記錄 + + return reply_text or "抱歉,我目前無法回答您的問題。" diff --git a/backend/src/ching_tech_os/services/bot/rate_limiter.py b/backend/src/ching_tech_os/services/bot/rate_limiter.py new file mode 100644 index 00000000..542967b7 --- /dev/null +++ b/backend/src/ching_tech_os/services/bot/rate_limiter.py @@ -0,0 +1,181 @@ +"""Bot 頻率限制器 + +使用 PostgreSQL 的 bot_usage_tracking 表追蹤未綁定用戶的使用量。 +僅在 BOT_UNBOUND_USER_POLICY=restricted 且 BOT_RATE_LIMIT_ENABLED=true 時生效。 +""" + +from __future__ import annotations + +import logging +from datetime import datetime, timezone + +from ...config import settings +from ...database import get_connection + +logger = logging.getLogger(__name__) + + +class _RateLimitExceeded(Exception): + """內部例外:頻率超限,用於觸發 transaction rollback""" + + +def _current_hourly_key() -> str: + """取得當前小時的 period_key(如 '2026-02-26-14')""" + now = datetime.now(timezone.utc) + return now.strftime("%Y-%m-%d-%H") + + +def _current_daily_key() -> str: + """取得當天的 period_key(如 '2026-02-26')""" + now = datetime.now(timezone.utc) + return now.strftime("%Y-%m-%d") + + +async def check_and_increment(bot_user_id: str) -> tuple[bool, str | None]: + """原子性地檢查頻率限制並遞增計數器 + + 在同一個交易中執行 SELECT + UPDATE,避免 TOCTOU 競爭條件。 + + Args: + bot_user_id: bot_users.id (UUID 字串) + + Returns: + (是否允許, 拒絕訊息) - 允許時拒絕訊息為 None + """ + if not settings.bot_rate_limit_enabled: + # 即使未啟用頻率限制,仍記錄使用量供統計分析 + await record_usage(bot_user_id) + return True, None + + hourly_key = _current_hourly_key() + daily_key = _current_daily_key() + + try: + async with get_connection() as conn: + # 使用交易確保原子性:先遞增再檢查, + # 若超限則拋出例外觸發 rollback,避免被拒絕的請求虛增計數器 + async with conn.transaction(): + # 先 UPSERT 計數器(+1),再檢查是否超限 + # 這避免了 check-then-act 的 TOCTOU 問題 + hourly_row = await conn.fetchrow( + """ + INSERT INTO bot_usage_tracking (bot_user_id, period_type, period_key, message_count) + VALUES ($1, 'hourly', $2, 1) + ON CONFLICT (bot_user_id, period_type, period_key) + DO UPDATE SET message_count = bot_usage_tracking.message_count + 1, + updated_at = NOW() + RETURNING message_count + """, + bot_user_id, + hourly_key, + ) + daily_row = await conn.fetchrow( + """ + INSERT INTO bot_usage_tracking (bot_user_id, period_type, period_key, message_count) + VALUES ($1, 'daily', $2, 1) + ON CONFLICT (bot_user_id, period_type, period_key) + DO UPDATE SET message_count = bot_usage_tracking.message_count + 1, + updated_at = NOW() + RETURNING message_count + """, + bot_user_id, + daily_key, + ) + + hourly_count = hourly_row["message_count"] if hourly_row else 0 + daily_count = daily_row["message_count"] if daily_row else 0 + + # 檢查每小時限額(已遞增後的值) + if hourly_count > settings.bot_rate_limit_hourly: + raise _RateLimitExceeded( + f"您已達到每小時使用上限({settings.bot_rate_limit_hourly} 則訊息)。\n" + "請稍後再試,或綁定帳號以獲得完整服務。" + ) + + # 檢查每日限額 + if daily_count > settings.bot_rate_limit_daily: + raise _RateLimitExceeded( + f"您已達到每日使用上限({settings.bot_rate_limit_daily} 則訊息)。\n" + "請明天再試,或綁定帳號以獲得完整服務。" + ) + + return True, None + + except _RateLimitExceeded as e: + # 超限:transaction 已 rollback,計數器未遞增 + return False, str(e) + + except Exception: + logger.exception("頻率限制檢查失敗,允許通過(fail-open)") + return True, None + + +async def record_usage(bot_user_id: str) -> None: + """記錄使用量(UPSERT 每小時和每日計數) + + 用於 rate limit 未啟用時仍記錄統計資料。 + + Args: + bot_user_id: bot_users.id (UUID 字串) + """ + hourly_key = _current_hourly_key() + daily_key = _current_daily_key() + + try: + async with get_connection() as conn: + async with conn.transaction(): + # UPSERT 每小時計數 + await conn.execute( + """ + INSERT INTO bot_usage_tracking (bot_user_id, period_type, period_key, message_count) + VALUES ($1, 'hourly', $2, 1) + ON CONFLICT (bot_user_id, period_type, period_key) + DO UPDATE SET message_count = bot_usage_tracking.message_count + 1, + updated_at = NOW() + """, + bot_user_id, + hourly_key, + ) + # UPSERT 每日計數 + await conn.execute( + """ + INSERT INTO bot_usage_tracking (bot_user_id, period_type, period_key, message_count) + VALUES ($1, 'daily', $2, 1) + ON CONFLICT (bot_user_id, period_type, period_key) + DO UPDATE SET message_count = bot_usage_tracking.message_count + 1, + updated_at = NOW() + """, + bot_user_id, + daily_key, + ) + except Exception: + # 記錄使用量失敗不應阻擋訊息處理 + logger.exception("記錄使用量失敗") + + +async def cleanup_old_tracking(days: int = 30) -> int: + """清理過期的使用量追蹤資料 + + Args: + days: 保留天數(預設 30 天) + + Returns: + 刪除的記錄數 + """ + try: + async with get_connection() as conn: + result = await conn.execute( + """ + DELETE FROM bot_usage_tracking + WHERE updated_at < NOW() - INTERVAL '1 day' * $1 + """, + days, + ) + # result 格式如 "DELETE 42" + deleted = int(result.split()[-1]) if result else 0 + if deleted > 0: + logger.info("已清理 %d 筆過期的使用量追蹤資料", deleted) + return deleted + except Exception: + logger.exception("清理使用量追蹤資料失敗") + return 0 diff --git a/backend/src/ching_tech_os/services/bot_line/trigger.py b/backend/src/ching_tech_os/services/bot_line/trigger.py index 96ac464b..24eca492 100644 --- a/backend/src/ching_tech_os/services/bot_line/trigger.py +++ b/backend/src/ching_tech_os/services/bot_line/trigger.py @@ -61,15 +61,17 @@ async def is_bot_message(line_message_id: str) -> bool: async def reset_conversation( - line_user_id: str, + platform_user_id: str, ) -> bool: """重置用戶的對話歷史 設定 conversation_reset_at 為當前時間, 之後查詢對話歷史時會忽略這個時間之前的訊息。 + 支援 Line 和 Telegram 平台(以 platform_user_id 查詢)。 + Args: - line_user_id: Line 用戶 ID + platform_user_id: 平台用戶 ID(Line user ID 或 Telegram user ID) Returns: 是否成功 @@ -81,11 +83,11 @@ async def reset_conversation( SET conversation_reset_at = NOW() WHERE platform_user_id = $1 """, - line_user_id, + platform_user_id, ) success = result == "UPDATE 1" if success: - logger.info(f"已重置對話歷史: {line_user_id}") + logger.info(f"已重置對話歷史: {platform_user_id}") return success diff --git a/backend/src/ching_tech_os/services/bot_telegram/handler.py b/backend/src/ching_tech_os/services/bot_telegram/handler.py index f600ffb9..a44b4a39 100644 --- a/backend/src/ching_tech_os/services/bot_telegram/handler.py +++ b/backend/src/ching_tech_os/services/bot_telegram/handler.py @@ -38,9 +38,6 @@ logger = logging.getLogger("bot_telegram.handler") -# 重置對話指令 -RESET_COMMANDS = {"/新對話", "/reset"} - # /start 歡迎訊息 START_MESSAGE = ( "👋 歡迎使用 CTOS Bot!\n\n" @@ -99,7 +96,7 @@ async def _ensure_bot_user(user, conn) -> str: display_name, row["id"], ) - return row["id"] + return str(row["id"]) # 新建用戶 row = await conn.fetchrow( @@ -113,7 +110,7 @@ async def _ensure_bot_user(user, conn) -> str: display_name, ) logger.info(f"建立 Telegram 用戶: {display_name} ({platform_user_id})") - return row["id"] + return str(row["id"]) async def _ensure_bot_group(chat, conn) -> str: @@ -137,7 +134,7 @@ async def _ensure_bot_group(chat, conn) -> str: group_name, row["id"], ) - return row["id"] + return str(row["id"]) # 新建群組(預設 allow_ai_response = false) row = await conn.fetchrow( @@ -151,7 +148,7 @@ async def _ensure_bot_group(chat, conn) -> str: group_name, ) logger.info(f"建立 Telegram 群組: {group_name} ({platform_group_id})") - return row["id"] + return str(row["id"]) async def _save_message( @@ -425,10 +422,33 @@ async def _handle_text( except Exception as e: logger.error(f"確保用戶/群組失敗: {e}", exc_info=True) - # 私訊才處理指令和綁定驗證碼 + # === 斜線指令攔截(統一使用 CommandRouter) === + from ..bot.commands import CommandContext, get_command_user_context, router as command_router + + parsed = command_router.parse(text) + if parsed is not None: + command, args = parsed + ctos_user_id, is_admin = await get_command_user_context(bot_user_id) + + ctx = CommandContext( + platform_type="telegram", + platform_user_id=str(user.id) if user else "", + bot_user_id=bot_user_id, + ctos_user_id=ctos_user_id, + is_admin=is_admin, + is_group=is_group, + group_id=bot_group_id, + reply_token=None, + raw_args=args, + ) + reply = await command_router.dispatch(command, args, ctx) + if reply is not None: + await adapter.send_text(chat_id, reply) + return + + # Telegram 專屬指令(不在 CommandRouter 中的) if not is_group: - # /start 和 /help 指令(不需綁定即可使用) - cmd = text.strip().split("@")[0] # 處理 /start@botname 格式 + cmd = text.strip().split("@")[0] if cmd == "/start": await adapter.send_text(chat_id, START_MESSAGE) return @@ -436,20 +456,6 @@ async def _handle_text( await adapter.send_text(chat_id, HELP_MESSAGE) return - # 檢查重置指令 - if text.strip() in RESET_COMMANDS: - if bot_user_id: - try: - async with get_connection() as conn: - await conn.execute( - "UPDATE bot_users SET conversation_reset_at = NOW() WHERE id = $1", - bot_user_id, - ) - except Exception as e: - logger.error(f"重置對話失敗: {e}", exc_info=True) - await adapter.send_text(chat_id, "對話已重置 ✨") - return - # 檢查是否為綁定驗證碼(6 位數字) if bot_user_id and await is_binding_code_format(text.strip()): success, msg = await verify_binding_code(bot_user_id, text.strip()) @@ -463,20 +469,68 @@ async def _handle_text( ) if not has_access: if deny_reason == "user_not_bound": - if not is_group: - # 私訊:回覆綁定提示 - await adapter.send_text( - chat_id, - "請先在 CTOS 系統綁定您的 Telegram 帳號才能使用此服務。\n\n" - "步驟:\n" - "1. 登入 CTOS 系統\n" - "2. 進入 Bot 管理頁面\n" - "3. 點擊「綁定帳號」產生驗證碼\n" - "4. 將 6 位數驗證碼發送給我完成綁定\n\n" - f"📋 您的 Telegram ID:{chat_id}\n" - "(設定 Admin Chat ID 時可使用此 ID)", - ) - # 群組:未綁定用戶靜默忽略 + # 身份分流:根據策略決定拒絕或進入受限模式 + from ..bot.identity_router import ( + route_unbound, + handle_restricted_mode, + ) + + route_result = route_unbound( + platform_type="telegram", is_group=is_group + ) + if route_result.action == "reject": + if not is_group and route_result.reply_text: + extra = ( + f"\n\n📋 您的 Telegram ID:{chat_id}\n" + "(設定 Admin Chat ID 時可使用此 ID)" + ) + await adapter.send_text( + chat_id, route_result.reply_text + extra + ) + elif route_result.action == "restricted": + # 斜線指令已在上方 L431 由 CommandRouter 統一處理並 return, + # 到此處的文字一定不是已註冊指令,直接進入 AI 流程。 + + # 受限模式 AI 處理 + try: + display_name = None + if user: + display_name = user.full_name or user.username + + # 儲存訊息以取得 message_uuid(用於 AI log) + restricted_msg_uuid = None + if bot_user_id: + try: + async with get_connection() as conn: + restricted_msg_uuid = await _save_message( + conn, + message_id=f"tg_{message.message_id}", + bot_user_id=bot_user_id, + bot_group_id=bot_group_id, + message_type="text", + content=text, + is_from_bot=False, + ) + except Exception as e: + logger.warning(f"受限模式儲存訊息失敗: {e}") + + reply = await handle_restricted_mode( + content=text, + platform_user_id=str(user.id) if user else chat_id, + bot_user_id=bot_user_id, + is_group=is_group, + line_group_id=None, + message_uuid=restricted_msg_uuid, + user_display_name=display_name, + ) + if reply: + await adapter.send_text(chat_id, reply) + except Exception as e: + logger.error(f"受限模式 AI 處理失敗: {e}", exc_info=True) + await adapter.send_text( + chat_id, "抱歉,處理訊息時發生錯誤,請稍後再試。" + ) + # silent: 群組靜默忽略 # group_not_allowed:靜默忽略 return diff --git a/backend/src/ching_tech_os/services/knowledge.py b/backend/src/ching_tech_os/services/knowledge.py index 5748cbbd..fac5e23a 100644 --- a/backend/src/ching_tech_os/services/knowledge.py +++ b/backend/src/ching_tech_os/services/knowledge.py @@ -212,6 +212,7 @@ def _metadata_to_response( scope=metadata.get("scope", "global"), owner=metadata.get("owner"), project_id=metadata.get("project_id"), + is_public=bool(metadata.get("is_public", False)), tags=tags, source=source, related=metadata.get("related", []), @@ -258,6 +259,7 @@ def search_knowledge( topics: list[str] | None = None, scope: str | None = None, current_username: str | None = None, + public_only: bool = False, ) -> KnowledgeListResponse: """搜尋知識 @@ -271,6 +273,7 @@ def search_knowledge( topics: 主題過濾 scope: 範圍過濾(global、personal、all) current_username: 目前使用者帳號(用於過濾個人知識) + public_only: 僅回傳公開知識(受限模式用) Returns: 符合條件的知識列表 @@ -361,6 +364,13 @@ def search_knowledge( if matching_files is not None and entry.filename not in matching_files: continue + # 公開存取過濾(受限模式:僅回傳 scope=global 且 is_public=true 的項目) + if public_only: + entry_is_public = getattr(entry, "is_public", False) + entry_scope_val = getattr(entry, "scope", "global") + if entry_scope_val != "global" or not entry_is_public: + continue + # Scope 過濾 entry_scope = getattr(entry, "scope", "global") entry_owner = getattr(entry, "owner", None) @@ -410,6 +420,7 @@ def search_knowledge( updated_at = date.fromisoformat(updated_at) entry_project_id = getattr(entry, "project_id", None) + entry_is_public = getattr(entry, "is_public", False) results.append( KnowledgeListItem( id=entry.id, @@ -419,6 +430,7 @@ def search_knowledge( scope=entry_scope, owner=entry_owner, project_id=entry_project_id, + is_public=entry_is_public, tags=entry.tags, author=entry.author, updated_at=updated_at, @@ -487,6 +499,7 @@ def create_knowledge( "scope": knowledge_scope, "owner": knowledge_owner, "project_id": knowledge_project_id, + "is_public": data.is_public, "tags": { "projects": data.tags.projects, "roles": data.tags.roles, @@ -600,6 +613,8 @@ def update_knowledge( } if data.related is not None: metadata["related"] = data.related + if data.is_public is not None: + metadata["is_public"] = data.is_public # 更新內容 if data.content is not None: diff --git a/backend/src/ching_tech_os/services/linebot_agents.py b/backend/src/ching_tech_os/services/linebot_agents.py index 030780c3..e42822b4 100644 --- a/backend/src/ching_tech_os/services/linebot_agents.py +++ b/backend/src/ching_tech_os/services/linebot_agents.py @@ -6,6 +6,7 @@ import logging from . import ai_manager +from ..config import settings from ..models.ai import AiPromptCreate, AiAgentCreate from .permissions import get_effective_app_permissions @@ -32,6 +33,8 @@ # Agent 名稱常數 AGENT_LINEBOT_PERSONAL = "linebot-personal" AGENT_LINEBOT_GROUP = "linebot-group" +AGENT_BOT_RESTRICTED = "bot-restricted" +AGENT_BOT_DEBUG = "bot-debug" # 完整的 linebot-personal prompt LINEBOT_PERSONAL_PROMPT = """你是擎添工業的 AI 助理,透過 Line 或 Telegram 與用戶進行個人對話。 @@ -399,6 +402,76 @@ - 列表用「・」或數字編號 - 分隔用空行,不要用分隔線""" +# 受限模式 prompt(未綁定用戶使用) +BOT_RESTRICTED_PROMPT = """你是 AI 助理,僅能回答特定範圍的問題。請根據可用工具和知識範圍提供協助。 + +你的能力範圍: +- 回答一般性問題 +- 搜尋公開的知識庫內容(使用 search_knowledge 工具) +- 提供基本資訊查詢 + +限制: +- 你無法存取內部系統資料 +- 你無法執行任何修改操作 +- 你只能回覆純文字訊息 + +回應原則: +- 使用繁體中文 +- 語氣親切專業 +- 如果無法回答,請誠實告知並建議綁定帳號以獲得完整功能 + +格式規則(極重要,必須遵守): +- 絕對禁止使用任何 Markdown 格式 +- 禁止:### 標題、**粗體**、*斜體*、`程式碼`、[連結](url)、- 列表 +- 只能使用純文字、emoji、全形標點符號 +- 列表用「・」或數字編號 +- 分隔用空行,不要用分隔線""" + +# Debug 模式 prompt(管理員診斷用) +BOT_DEBUG_PROMPT = """你是 CTOS 系統診斷助理,專門協助管理員分析和診斷系統問題。 + +你可以使用 run_skill_script 工具執行以下診斷腳本(skill: debug-skill): + +1. check-server-logs - 查詢伺服器日誌 + · 參數:lines(行數,預設 50)、keyword(關鍵字過濾) + · 用途:查看 CTOS 後端服務的運行日誌 + +2. check-ai-logs - 查詢 AI 對話記錄 + · 參數:limit(筆數,預設 10)、errors_only(僅顯示錯誤,預設 false) + · 用途:檢查 AI 呼叫記錄、失敗原因 + +3. check-nginx-logs - 查詢 Nginx 日誌 + · 參數:lines(行數,預設 50)、type(access 或 error,預設 error) + · 用途:查看 HTTP 請求日誌和錯誤 + +4. check-db-status - 查詢資料庫狀態 + · 參數:無 + · 用途:查看資料庫連線數、表大小、磁碟使用量 + +5. check-system-health - 綜合健康檢查 + · 參數:無 + · 用途:一次檢查所有項目,產生摘要報告 + +診斷流程建議: +1. 如果用戶描述了具體問題,針對性地選擇相關的診斷腳本 +2. 如果用戶沒有描述具體問題,先執行 check-system-health 取得整體狀態 +3. 根據初步結果,深入調查可疑的項目 + +輸出格式: +- 問題摘要:簡述發現的問題 +- 嚴重程度:正常 / 注意 / 警告 / 嚴重 +- 可能原因:列出最可能的原因 +- 建議處理方式:具體的處理步驟 + +安全限制: +- 僅使用 debug-skill 提供的腳本,不要執行其他操作 +- 所有操作都是唯讀的,不會修改系統狀態 + +格式規則(極重要,必須遵守): +- 絕對禁止使用任何 Markdown 格式 +- 只能使用純文字、emoji、全形標點符號 +- 列表用「・」或數字編號""" + # 預設 Agent 設定 DEFAULT_LINEBOT_AGENTS = [ { @@ -429,6 +502,38 @@ }, ] +# 受限模式 + Debug 模式 Agent 設定 +DEFAULT_BOT_MODE_AGENTS = [ + { + "name": AGENT_BOT_RESTRICTED, + "display_name": "受限模式助理", + "description": "未綁定用戶的受限模式 Agent,prompt 和工具可由部署方自訂", + "model": f"claude-{settings.bot_restricted_model}", + "tools": ["search_knowledge"], + "prompt": { + "name": AGENT_BOT_RESTRICTED, + "display_name": "受限模式助理 Prompt", + "category": "bot", + "content": BOT_RESTRICTED_PROMPT, + "description": "未綁定用戶使用,受限的 AI 回覆功能", + }, + }, + { + "name": AGENT_BOT_DEBUG, + "display_name": "系統診斷助理", + "description": "管理員專用的系統診斷 Agent", + "model": f"claude-{settings.bot_debug_model}", + "tools": ["run_skill_script"], + "prompt": { + "name": AGENT_BOT_DEBUG, + "display_name": "系統診斷助理 Prompt", + "category": "bot", + "content": BOT_DEBUG_PROMPT, + "description": "管理員診斷系統問題使用,搭配 debug-skill", + }, + }, +] + async def _build_seed_prompt(is_group: bool) -> str: """建立預設 Agent 的動態工具 prompt(僅首次建立時使用)。""" @@ -453,14 +558,14 @@ async def _build_seed_prompt(is_group: bool) -> str: return "\n\n".join(sections) -async def ensure_default_linebot_agents() -> None: - """ - 確保預設的 Line Bot Agent 存在。 +async def _ensure_agents(agent_configs: list[dict], *, use_dynamic_prompt: bool = False) -> None: + """確保指定的 Agent 存在(不覆蓋已存在的設定)。 - 如果 Agent 已存在則跳過(保留使用者修改)。 - 如果不存在則建立 Agent 和對應的 Prompt。 + Args: + agent_configs: Agent 設定列表 + use_dynamic_prompt: 是否使用動態生成的工具 prompt(僅 linebot agents 使用) """ - for agent_config in DEFAULT_LINEBOT_AGENTS: + for agent_config in agent_configs: agent_name = agent_config["name"] # 檢查 Agent 是否存在 @@ -477,14 +582,17 @@ async def ensure_default_linebot_agents() -> None: prompt_id = existing_prompt["id"] logger.debug(f"Prompt '{prompt_config['name']}' 已存在,使用現有 Prompt") else: - is_group = agent_name == AGENT_LINEBOT_GROUP - dynamic_content = await _build_seed_prompt(is_group) + content = prompt_config["content"] + if use_dynamic_prompt: + is_group = agent_name == AGENT_LINEBOT_GROUP + dynamic_content = await _build_seed_prompt(is_group) + content = dynamic_content or content # 建立 Prompt prompt_data = AiPromptCreate( name=prompt_config["name"], display_name=prompt_config["display_name"], category=prompt_config["category"], - content=dynamic_content or prompt_config["content"], + content=content, description=prompt_config["description"], ) new_prompt = await ai_manager.create_prompt(prompt_data) @@ -499,11 +607,25 @@ async def ensure_default_linebot_agents() -> None: model=agent_config["model"], system_prompt_id=prompt_id, is_active=True, + tools=agent_config.get("tools"), ) await ai_manager.create_agent(agent_data) logger.info(f"已建立 Agent: {agent_name}") +async def ensure_default_linebot_agents() -> None: + """ + 確保預設的 Line Bot Agent 和模式 Agent 存在。 + + 如果 Agent 已存在則跳過(保留使用者修改)。 + 如果不存在則建立 Agent 和對應的 Prompt。 + """ + # 原有的 linebot agents(使用動態 prompt) + await _ensure_agents(DEFAULT_LINEBOT_AGENTS, use_dynamic_prompt=True) + # 受限模式 + Debug 模式 agents(使用靜態 prompt) + await _ensure_agents(DEFAULT_BOT_MODE_AGENTS, use_dynamic_prompt=False) + + async def get_linebot_agent(is_group: bool) -> dict | None: """ 取得 Line Bot Agent 設定。 diff --git a/backend/src/ching_tech_os/services/linebot_ai.py b/backend/src/ching_tech_os/services/linebot_ai.py index 0183a7aa..22c9cda0 100644 --- a/backend/src/ching_tech_os/services/linebot_ai.py +++ b/backend/src/ching_tech_os/services/linebot_ai.py @@ -34,7 +34,6 @@ save_bot_response, save_file_record, reset_conversation, - is_reset_command, ensure_temp_image, get_image_info_by_line_message_id, get_temp_image_path, @@ -480,40 +479,7 @@ async def process_message_with_ai( """ is_group = line_group_id is not None - # 檢查是否為重置對話指令(僅限個人對話) - if is_reset_command(content): - if is_group: - # 群組不支援重置,靜默忽略 - return None - elif line_user_id: - # 個人對話:執行重置 - await reset_conversation(line_user_id) - reset_msg = "已清除對話歷史,開始新對話!有什麼可以幫你的嗎?" - # 儲存 Bot 回應 - await save_bot_response( - group_uuid=None, - content=reset_msg, - responding_to_line_user_id=line_user_id, - ) - # 回覆訊息(reply token 可能過期,失敗時改用 push message) - reply_success = False - if reply_token: - try: - await reply_text(reply_token, reset_msg) - reply_success = True - except Exception as e: - logger.warning(f"回覆重置訊息失敗(reply token 可能過期): {e}") - - # 如果沒有 reply_token 或回覆失敗,改用 push message - if not reply_success and line_user_id: - try: - await push_text(line_user_id, reset_msg) - logger.info(f"使用 push message 發送重置訊息給 {line_user_id}") - except Exception as e: - logger.error(f"Push 重置訊息也失敗: {e}") - - return reset_msg - return None + # 重置對話指令已由 CommandRouter 在 handle_text_message 中攔截處理 # 檢查是否回覆機器人訊息(群組對話用) is_reply_to_bot = False @@ -942,7 +908,7 @@ async def process_message_with_ai( async def log_linebot_ai_call( - message_uuid: UUID, + message_uuid: UUID | None, line_group_id: UUID | None, is_group: bool, input_prompt: str, @@ -1008,7 +974,7 @@ async def log_linebot_ai_call( agent_id=agent_id, prompt_id=prompt_id, context_type=context_type_override or ("linebot-group" if is_group else "linebot-personal"), - context_id=str(message_uuid), + context_id=str(message_uuid) if message_uuid else None, input_prompt=full_input, system_prompt=system_prompt, allowed_tools=allowed_tools, @@ -1410,12 +1376,55 @@ async def handle_text_message( reply_token: Line 回覆 token quoted_message_id: 被回覆的訊息 ID(用戶回覆舊訊息時) """ - # 取得用戶顯示名稱 + # === 斜線指令攔截(在 AI 處理之前) === + from .bot.commands import CommandContext, router as command_router + + # 查詢用戶資訊(指令和一般訊息都需要,只查一次) + user_row = await get_line_user_record( + line_user_id, "id, user_id, display_name" + ) + ctos_user_id = None + is_admin = False + bot_user_id = None user_display_name = None - user_row = await get_line_user_record(line_user_id, "display_name") if user_row: - user_display_name = user_row["display_name"] + bot_user_id = str(user_row["id"]) if user_row["id"] else None + user_display_name = user_row.get("display_name") + if user_row["user_id"]: + ctos_user_id = user_row["user_id"] + from .user import get_user_role_and_permissions + user_info = await get_user_role_and_permissions(ctos_user_id) + is_admin = user_info["role"] == "admin" + + parsed = command_router.parse(content) + if parsed is not None: + command, args = parsed + ctx = CommandContext( + platform_type="line", + platform_user_id=line_user_id, + bot_user_id=bot_user_id, + ctos_user_id=ctos_user_id, + is_admin=is_admin, + is_group=line_group_id is not None, + group_id=str(line_group_id) if line_group_id else None, + reply_token=reply_token, + raw_args=args, + ) + reply = await command_router.dispatch(command, args, ctx) + if reply is not None: + # reply_token 可能在長時間指令(如 /debug 3 分鐘)後過期, + # 先嘗試 reply,失敗則 fallback 到 push + if reply_token: + try: + await reply_text(reply_token, reply) + except Exception: + await push_text(line_user_id, reply) + else: + await push_text(line_user_id, reply) + # 指令已處理,不進入 AI 流程 + return + # === 一般訊息,進入 AI 處理 === # 處理訊息 await process_message_with_ai( message_uuid=message_uuid, diff --git a/backend/src/ching_tech_os/services/mcp/knowledge_tools.py b/backend/src/ching_tech_os/services/mcp/knowledge_tools.py index 96a1faaa..b1b8cd09 100644 --- a/backend/src/ching_tech_os/services/mcp/knowledge_tools.py +++ b/backend/src/ching_tech_os/services/mcp/knowledge_tools.py @@ -120,12 +120,16 @@ async def search_knowledge( except Exception as e: logger.warning(f"取得使用者名稱失敗: {e}") + # 未綁定用戶(ctos_user_id 為 None)僅能搜尋公開知識 + public_only = ctos_user_id is None + try: result = kb_service.search_knowledge( query=search_query, project=project, category=category, current_username=current_username, + public_only=public_only, ) if not result.items: diff --git a/backend/src/ching_tech_os/services/mcp/nas_tools.py b/backend/src/ching_tech_os/services/mcp/nas_tools.py index 92dd7281..1d6b65ee 100644 --- a/backend/src/ching_tech_os/services/mcp/nas_tools.py +++ b/backend/src/ching_tech_os/services/mcp/nas_tools.py @@ -892,21 +892,38 @@ async def list_library_folders( max_depth: 瀏覽深度,預設 2 ctos_user_id: CTOS 用戶 ID(從對話識別取得,用於權限檢查) """ + from ...config import settings + await ensure_db_connection() - # 權限檢查 - allowed, error_msg = await check_mcp_tool_permission("list_library_folders", ctos_user_id) - if not allowed: - return f"❌ {error_msg}" + # 未綁定用戶:判斷公開資料夾過濾 + is_unbound = ctos_user_id is None - lib_allowed, lib_result = await _check_library_permission(ctos_user_id) - if not lib_allowed: - return f"錯誤:{lib_result}" - library_root = lib_result + # 權限檢查(未綁定用戶走公開資料夾路徑,跳過完整權限檢查) + if is_unbound: + library_root = settings.library_mount_path + public_folders = settings.library_public_folders + else: + allowed, error_msg = await check_mcp_tool_permission("list_library_folders", ctos_user_id) + if not allowed: + return f"❌ {error_msg}" + lib_allowed, lib_result = await _check_library_permission(ctos_user_id) + if not lib_allowed: + return f"錯誤:{lib_result}" + library_root = lib_result + public_folders = [] # 組合目標路徑 if path: - clean_path = _sanitize_path_segment(path) + # 分段清理路徑(支援多層路徑如「產品資料/子目錄」) + segments = [_sanitize_path_segment(s) for s in path.split("/") if s.strip()] + segments = [s for s in segments if s] + if not segments: + return f"路徑無效:{path}" + # 未綁定用戶:檢查第一層目錄是否在公開列表中 + if is_unbound and segments[0] not in public_folders: + return "❌ 此資料夾不對外開放" + clean_path = "/".join(segments) target_dir = FsPath(library_root) / clean_path else: target_dir = FsPath(library_root) @@ -916,9 +933,12 @@ async def list_library_folders( if not target_dir.is_dir(): return f"路徑不是資料夾:{path}" - # 遍歷資料夾結構 + # 遍歷資料夾結構(未綁定用戶只顯示公開資料夾) lines = ["擎添圖書館/" + (f"{path}/" if path else "")] - _walk_tree(target_dir, lines, prefix="", current_depth=0, max_depth=max_depth) + _walk_tree( + target_dir, lines, prefix="", current_depth=0, max_depth=max_depth, + allowed_names=public_folders if is_unbound and not path else None, + ) if len(lines) == 1: lines.append(" (空)") @@ -932,8 +952,13 @@ def _walk_tree( prefix: str, current_depth: int, max_depth: int, + allowed_names: list[str] | None = None, ) -> None: - """遞迴建立樹狀結構文字""" + """遞迴建立樹狀結構文字 + + Args: + allowed_names: 若指定,只顯示名稱在列表中的第一層子目錄(用於公開資料夾過濾) + """ if current_depth >= max_depth: return @@ -944,7 +969,10 @@ def _walk_tree( return dirs = [e for e in entries if e.is_dir()] - files = [e for e in entries if e.is_file()] + # 根目錄層級過濾:只保留公開資料夾 + if allowed_names is not None: + dirs = [d for d in dirs if d.name in allowed_names] + files = [e for e in entries if e.is_file()] if allowed_names is None else [] for i, d in enumerate(dirs): is_last = (i == len(dirs) - 1) and not files diff --git a/backend/src/ching_tech_os/services/scheduler.py b/backend/src/ching_tech_os/services/scheduler.py index befa21a2..c5698c90 100644 --- a/backend/src/ching_tech_os/services/scheduler.py +++ b/backend/src/ching_tech_os/services/scheduler.py @@ -310,6 +310,20 @@ async def cleanup_media_temp_folders(): logger.debug("媒體暫存清理: 無過期資料夾") +async def cleanup_old_bot_tracking(): + """清理過期的 bot 使用量追蹤資料(保留 30 天)""" + from .bot.rate_limiter import cleanup_old_tracking + + try: + deleted = await cleanup_old_tracking(days=30) + if deleted > 0: + logger.info(f"清理 Bot 使用量追蹤: 刪除 {deleted} 筆") + else: + logger.debug("Bot 使用量追蹤清理: 無過期資料") + except Exception as e: + logger.error(f"清理 Bot 使用量追蹤失敗: {e}") + + async def check_telegram_webhook_health(): """ 檢查 Telegram Webhook 健康狀態 @@ -422,6 +436,15 @@ def start_scheduler(): replace_existing=True ) + # 每天凌晨 4:30 清理過期的 bot 使用量追蹤資料 + scheduler.add_job( + cleanup_old_bot_tracking, + CronTrigger(hour=4, minute=30), + id='cleanup_old_bot_tracking', + name='清理過期 Bot 使用量追蹤', + replace_existing=True, + ) + # 依啟用模組註冊排程任務 for module_id, info in get_module_registry().items(): if not is_module_enabled(module_id): diff --git a/backend/src/ching_tech_os/skills/debug-skill/SKILL.md b/backend/src/ching_tech_os/skills/debug-skill/SKILL.md new file mode 100644 index 00000000..ade70d3b --- /dev/null +++ b/backend/src/ching_tech_os/skills/debug-skill/SKILL.md @@ -0,0 +1,34 @@ +--- +name: debug-skill +description: 系統診斷工具(管理員專用),用於查詢伺服器、AI、Nginx 日誌和資料庫狀態 +allowed-tools: mcp__ching-tech-os__run_skill_script +metadata: + ctos: + mcp_servers: ching-tech-os + requires_app: admin +--- + +【系統診斷工具】 +管理員專用的診斷腳本集合,透過 `run_skill_script` 呼叫: + +- `check-server-logs`: 查詢 CTOS 伺服器日誌 + · input: {"lines": 50, "keyword": "error"} + · lines: 查詢行數(預設 50) + · keyword: 關鍵字過濾(可選) + +- `check-ai-logs`: 查詢 AI 對話記錄 + · input: {"limit": 10, "errors_only": false} + · limit: 查詢筆數(預設 10) + · errors_only: 僅顯示失敗記錄(預設 false) + +- `check-nginx-logs`: 查詢 Nginx 日誌 + · input: {"lines": 50, "type": "error"} + · lines: 查詢行數(預設 50) + · type: "access" 或 "error"(預設 "error") + +- `check-db-status`: 查詢資料庫狀態 + · input: {}(無參數) + +- `check-system-health`: 綜合健康檢查 + · input: {}(無參數) + · 一次檢查所有項目,回傳摘要報告 diff --git a/backend/src/ching_tech_os/skills/debug-skill/scripts/check-ai-logs.py b/backend/src/ching_tech_os/skills/debug-skill/scripts/check-ai-logs.py new file mode 100644 index 00000000..27d68c10 --- /dev/null +++ b/backend/src/ching_tech_os/skills/debug-skill/scripts/check-ai-logs.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 +"""查詢 AI 對話記錄(ai_logs 資料表)""" + +import json +import subprocess + +from ching_tech_os.skills.script_utils import parse_stdin_json_object + + +def _safe_int(value, default: int, min_val: int = 1, max_val: int = 50) -> int: + """安全的整數轉換,帶範圍限制""" + try: + return max(min_val, min(int(value), max_val)) + except (ValueError, TypeError): + return default + + +def main() -> int: + payload, error = parse_stdin_json_object() + if error: + print(json.dumps({"success": False, "error": error}, ensure_ascii=False)) + return 1 + payload = payload or {} + + limit = _safe_int(payload.get("limit", 10), default=10, max_val=50) + errors_only = payload.get("errors_only", False) is True # 嚴格布林驗證 + + # 建構 SQL 查詢(LIMIT 值已透過 _safe_int 做範圍限制) + where_clause = "WHERE success = false" if errors_only else "" + sql = ( + f"SELECT id, context_type, model, success, duration_ms, " + f"error_message, created_at " + f"FROM ai_logs {where_clause} " + f"ORDER BY created_at DESC LIMIT {limit}" + ) + + try: + cmd = [ + "docker", "exec", "ching-tech-os-db", + "psql", "-U", "ching_tech", "-d", "ching_tech_os", + "-c", sql, + ] + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=15, + ) + + output = result.stdout or "" + if result.returncode != 0 and result.stderr: + output += f"\n[stderr] {result.stderr}" + + print(json.dumps({ + "success": True, + "limit": limit, + "errors_only": errors_only, + "output": output[:30000], + }, ensure_ascii=False)) + return 0 + + except subprocess.TimeoutExpired: + print(json.dumps({"success": False, "error": "指令執行逾時"}, ensure_ascii=False)) + return 1 + except Exception: + print(json.dumps({"success": False, "error": "查詢 AI 日誌失敗"}, ensure_ascii=False)) + return 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/backend/src/ching_tech_os/skills/debug-skill/scripts/check-db-status.py b/backend/src/ching_tech_os/skills/debug-skill/scripts/check-db-status.py new file mode 100644 index 00000000..a42d9b71 --- /dev/null +++ b/backend/src/ching_tech_os/skills/debug-skill/scripts/check-db-status.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python3 +"""查詢資料庫狀態(連線數、主要資料表行數、資料庫大小)""" + +import json +import subprocess + +from ching_tech_os.skills.script_utils import parse_stdin_json_object + + +def _run_sql(sql: str, timeout: int = 10) -> str: + """執行 SQL 並回傳結果""" + result = subprocess.run( + [ + "docker", "exec", "ching-tech-os-db", + "psql", "-U", "ching_tech", "-d", "ching_tech_os", + "-c", sql, + ], + capture_output=True, + text=True, + timeout=timeout, + ) + return result.stdout or result.stderr or "" + + +def main() -> int: + payload, error = parse_stdin_json_object() + if error: + print(json.dumps({"success": False, "error": error}, ensure_ascii=False)) + return 1 + + try: + sections = [] + + # 1. 資料庫大小 + db_size = _run_sql( + "SELECT pg_size_pretty(pg_database_size('ching_tech_os')) as db_size;" + ) + sections.append(f"=== 資料庫大小 ===\n{db_size}") + + # 2. 連線數 + connections = _run_sql( + "SELECT state, count(*) FROM pg_stat_activity " + "WHERE datname = 'ching_tech_os' GROUP BY state ORDER BY count DESC;" + ) + sections.append(f"=== 連線狀態 ===\n{connections}") + + # 3. 主要資料表行數 + table_counts = _run_sql( + "SELECT relname as table_name, n_live_tup as row_count " + "FROM pg_stat_user_tables " + "WHERE schemaname = 'public' " + "ORDER BY n_live_tup DESC LIMIT 20;" + ) + sections.append(f"=== 資料表行數(前 20)===\n{table_counts}") + + # 4. 資料表大小 + table_sizes = _run_sql( + "SELECT tablename, " + "pg_size_pretty(pg_total_relation_size(schemaname || '.' || tablename)) as size " + "FROM pg_tables WHERE schemaname = 'public' " + "ORDER BY pg_total_relation_size(schemaname || '.' || tablename) DESC LIMIT 10;" + ) + sections.append(f"=== 資料表大小(前 10)===\n{table_sizes}") + + output = "\n".join(sections) + + print(json.dumps({ + "success": True, + "output": output[:30000], + }, ensure_ascii=False)) + return 0 + + except subprocess.TimeoutExpired: + print(json.dumps({"success": False, "error": "指令執行逾時"}, ensure_ascii=False)) + return 1 + except Exception: + print(json.dumps({"success": False, "error": "查詢資料庫狀態失敗"}, ensure_ascii=False)) + return 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/backend/src/ching_tech_os/skills/debug-skill/scripts/check-nginx-logs.py b/backend/src/ching_tech_os/skills/debug-skill/scripts/check-nginx-logs.py new file mode 100644 index 00000000..09ba0dd3 --- /dev/null +++ b/backend/src/ching_tech_os/skills/debug-skill/scripts/check-nginx-logs.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +"""查詢 Nginx 日誌(docker logs ching-tech-os-nginx)""" + +import json +import subprocess + +from ching_tech_os.skills.script_utils import parse_stdin_json_object + +# 允許的 log_type 白名單 +_VALID_LOG_TYPES = {"access", "error"} + + +def _safe_int(value, default: int, min_val: int = 1, max_val: int = 500) -> int: + """安全的整數轉換,帶範圍限制""" + try: + return max(min_val, min(int(value), max_val)) + except (ValueError, TypeError): + return default + + +def main() -> int: + payload, error = parse_stdin_json_object() + if error: + print(json.dumps({"success": False, "error": error}, ensure_ascii=False)) + return 1 + payload = payload or {} + + lines = _safe_int(payload.get("lines", 50), default=50, max_val=500) + log_type = payload.get("type", "error") + # 白名單驗證:非法值預設為 error + if log_type not in _VALID_LOG_TYPES: + log_type = "error" + + try: + # docker logs 的 stdout = access log, stderr = error log + cmd = ["docker", "logs", "--tail", str(lines), "ching-tech-os-nginx"] + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=15, + ) + + if log_type == "access": + output = result.stdout or "(無 access log)" + else: + output = result.stderr or "(無 error log)" + + print(json.dumps({ + "success": True, + "lines_requested": lines, + "type": log_type, + "output": output[:30000], + }, ensure_ascii=False)) + return 0 + + except subprocess.TimeoutExpired: + print(json.dumps({"success": False, "error": "指令執行逾時"}, ensure_ascii=False)) + return 1 + except Exception: + print(json.dumps({"success": False, "error": "查詢 Nginx 日誌失敗"}, ensure_ascii=False)) + return 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/backend/src/ching_tech_os/skills/debug-skill/scripts/check-server-logs.py b/backend/src/ching_tech_os/skills/debug-skill/scripts/check-server-logs.py new file mode 100644 index 00000000..267eac30 --- /dev/null +++ b/backend/src/ching_tech_os/skills/debug-skill/scripts/check-server-logs.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 +"""查詢 CTOS 伺服器日誌(journalctl -u ching-tech-os)""" + +import json +import subprocess + +from ching_tech_os.skills.script_utils import parse_stdin_json_object + + +def _safe_int(value, default: int, min_val: int = 1, max_val: int = 500) -> int: + """安全的整數轉換,帶範圍限制""" + try: + return max(min_val, min(int(value), max_val)) + except (ValueError, TypeError): + return default + + +def main() -> int: + payload, error = parse_stdin_json_object() + if error: + print(json.dumps({"success": False, "error": error}, ensure_ascii=False)) + return 1 + payload = payload or {} + + lines = _safe_int(payload.get("lines", 50), default=50, max_val=500) + keyword = payload.get("keyword", "") + + try: + cmd = ["journalctl", "-u", "ching-tech-os", "-n", str(lines), "--no-pager"] + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=15, + ) + + output = result.stdout or "" + if result.returncode != 0 and result.stderr: + output += f"\n[stderr] {result.stderr}" + + # 關鍵字過濾 + if keyword and output: + filtered = [ + line for line in output.splitlines() + if keyword.lower() in line.lower() + ] + output = "\n".join(filtered) + if not filtered: + output = f"(未找到包含 '{keyword}' 的日誌行)" + + print(json.dumps({ + "success": True, + "lines_requested": lines, + "keyword": keyword or None, + "output": output[:30000], # 限制輸出大小 + }, ensure_ascii=False)) + return 0 + + except subprocess.TimeoutExpired: + print(json.dumps({"success": False, "error": "指令執行逾時"}, ensure_ascii=False)) + return 1 + except Exception: + print(json.dumps({"success": False, "error": "查詢伺服器日誌失敗"}, ensure_ascii=False)) + return 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/backend/src/ching_tech_os/skills/debug-skill/scripts/check-system-health.py b/backend/src/ching_tech_os/skills/debug-skill/scripts/check-system-health.py new file mode 100644 index 00000000..ed4e9785 --- /dev/null +++ b/backend/src/ching_tech_os/skills/debug-skill/scripts/check-system-health.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python3 +"""綜合系統健康檢查 - 一次跑完所有診斷項目,回傳摘要報告""" + +import json +import subprocess + +from ching_tech_os.skills.script_utils import parse_stdin_json_object + + +def _run_cmd(cmd: list[str], timeout: int = 10) -> tuple[str, bool]: + """執行指令,回傳 (輸出, 是否成功)""" + try: + result = subprocess.run( + cmd, capture_output=True, text=True, timeout=timeout + ) + output = (result.stdout or "") + (result.stderr or "") + return output.strip(), result.returncode == 0 + except subprocess.TimeoutExpired: + return "(指令逾時)", False + except Exception: + return "(指令執行失敗)", False + + +def _run_sql(sql: str) -> tuple[str, bool]: + """執行 SQL 查詢""" + return _run_cmd([ + "docker", "exec", "ching-tech-os-db", + "psql", "-U", "ching_tech", "-d", "ching_tech_os", + "-t", "-c", sql, + ]) + + +def main() -> int: + payload, error = parse_stdin_json_object() + if error: + print(json.dumps({"success": False, "error": error}, ensure_ascii=False)) + return 1 + + checks = [] + overall_status = "正常" + + # 1. CTOS 服務狀態 + output, ok = _run_cmd(["systemctl", "is-active", "ching-tech-os"]) + status = "正常" if ok and "active" in output else "異常" + checks.append({"name": "CTOS 服務", "status": status, "detail": output[:200]}) + if status == "異常": + overall_status = "嚴重" + + # 2. CTOS 最近錯誤日誌 + output, ok = _run_cmd([ + "journalctl", "-u", "ching-tech-os", "-n", "20", + "--no-pager", "-p", "err", + ]) + error_count = len(output.strip().splitlines()) if output.strip() else 0 + if error_count > 10: + status = "警告" + if overall_status == "正常": + overall_status = "注意" + elif error_count > 0: + status = "注意" + if overall_status == "正常": + overall_status = "注意" + else: + status = "正常" + checks.append({ + "name": "CTOS 錯誤日誌", + "status": status, + "detail": f"最近 20 行錯誤日誌:{error_count} 行\n{output[:500]}", + }) + + # 3. Docker 容器狀態 + output, ok = _run_cmd(["docker", "ps", "--format", "{{.Names}}: {{.Status}}"]) + container_issues = [] + for line in output.splitlines(): + if line and "Up" not in line: + container_issues.append(line) + if container_issues: + status = "警告" + if overall_status in ("正常", "注意"): + overall_status = "警告" + else: + status = "正常" + checks.append({ + "name": "Docker 容器", + "status": status, + "detail": output[:500] if output else "(無法查詢 Docker 狀態)", + }) + + # 4. 資料庫連線 + output, ok = _run_sql( + "SELECT count(*) FROM pg_stat_activity WHERE datname = 'ching_tech_os'" + ) + db_conn_count = output.strip() if ok else "N/A" + status = "正常" + try: + if ok and int(db_conn_count) > 80: + status = "警告" + if overall_status in ("正常", "注意"): + overall_status = "警告" + except ValueError: + pass + checks.append({ + "name": "資料庫連線數", + "status": status, + "detail": f"目前連線數: {db_conn_count}", + }) + + # 5. 資料庫大小 + output, ok = _run_sql( + "SELECT pg_size_pretty(pg_database_size('ching_tech_os'))" + ) + checks.append({ + "name": "資料庫大小", + "status": "正常", + "detail": f"大小: {output.strip()}" if ok else "無法查詢", + }) + + # 6. AI logs 最近失敗 + output, ok = _run_sql( + "SELECT count(*) FROM ai_logs " + "WHERE success = false AND created_at > NOW() - INTERVAL '1 hour'" + ) + ai_errors = output.strip() if ok else "N/A" + try: + if ok and int(ai_errors) > 5: + status = "注意" + if overall_status == "正常": + overall_status = "注意" + else: + status = "正常" + except ValueError: + status = "正常" + checks.append({ + "name": "AI 失敗記錄(1 小時內)", + "status": status, + "detail": f"失敗次數: {ai_errors}", + }) + + # 7. Nginx 狀態 + output, ok = _run_cmd([ + "docker", "inspect", "--format", "{{.State.Status}}", "ching-tech-os-nginx" + ]) + nginx_status = output.strip() if ok else "未知" + status = "正常" if nginx_status == "running" else "異常" + if status == "異常" and overall_status != "嚴重": + overall_status = "警告" + checks.append({ + "name": "Nginx", + "status": status, + "detail": f"容器狀態: {nginx_status}", + }) + + # 組裝報告 + report_lines = [f"整體狀態: {overall_status}", ""] + for check in checks: + icon = {"正常": "✅", "注意": "⚠️", "警告": "🔶", "異常": "❌"}.get( + check["status"], "❓" + ) + report_lines.append(f"{icon} {check['name']}: {check['status']}") + if check.get("detail"): + # 縮排細節 + for detail_line in check["detail"].splitlines()[:5]: + report_lines.append(f" {detail_line}") + report_lines.append("") + + print(json.dumps({ + "success": True, + "overall_status": overall_status, + "checks_count": len(checks), + "output": "\n".join(report_lines)[:30000], + }, ensure_ascii=False)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/backend/tests/test_api_linebot_router_events.py b/backend/tests/test_api_linebot_router_events.py index 758124dc..9553c9dd 100644 --- a/backend/tests/test_api_linebot_router_events.py +++ b/backend/tests/test_api_linebot_router_events.py @@ -503,6 +503,135 @@ async def test_process_media_audio_and_image_reclassify(monkeypatch: pytest.Monk assert save_file_record.await_args_list[1].kwargs["file_type"] == "image" +@pytest.mark.asyncio +async def test_restricted_mode_routing(monkeypatch: pytest.MonkeyPatch) -> None: + """受限模式路由:斜線指令攔截、AI 處理、reply_token fallback""" + monkeypatch.setattr(linebot_router, "TextMessageContent", _TextMessage) + monkeypatch.setattr(linebot_router, "ImageMessageContent", _ImageMessage) + + get_user_profile = AsyncMock(return_value={"displayName": "U"}) + get_or_create_user = AsyncMock(return_value=uuid4()) + save_message = AsyncMock(return_value=uuid4()) + reply_text_fn = AsyncMock() + push_text_fn = AsyncMock() + get_line_user_record = AsyncMock(return_value={"display_name": "TestUser"}) + + monkeypatch.setattr(linebot_router, "get_user_profile", get_user_profile) + monkeypatch.setattr(linebot_router, "get_or_create_user", get_or_create_user) + monkeypatch.setattr(linebot_router, "save_message", save_message) + monkeypatch.setattr(linebot_router, "reply_text", reply_text_fn) + monkeypatch.setattr(linebot_router, "push_text", push_text_fn) + monkeypatch.setattr(linebot_router, "is_binding_code_format", AsyncMock(return_value=False)) + monkeypatch.setattr(linebot_router, "check_line_access", AsyncMock(return_value=(False, "user_not_bound"))) + monkeypatch.setattr(linebot_router, "get_line_user_record", get_line_user_record) + + # --- 測試 1: 受限模式斜線指令攔截(/reset)--- + from ching_tech_os.services.bot.identity_router import UnboundRouteResult + from ching_tech_os.services.bot.command_handlers import register_builtin_commands + + register_builtin_commands() + + monkeypatch.setattr( + "ching_tech_os.services.bot.identity_router.route_unbound", + lambda **kw: UnboundRouteResult(action="restricted"), + ) + # mock handle_restricted_mode 避免呼叫 + handle_restricted_mock = AsyncMock(return_value="AI 回覆") + monkeypatch.setattr( + "ching_tech_os.services.bot.identity_router.handle_restricted_mode", + handle_restricted_mock, + ) + + # 先測試斜線指令攔截 + from ching_tech_os.services.bot_line.trigger import reset_conversation + monkeypatch.setattr( + "ching_tech_os.services.bot_line.trigger.reset_conversation", + AsyncMock(), + ) + + reply_text_fn.reset_mock() + await linebot_router.process_message_event( + _Event( + message=_TextMessage("m-reset", "/reset"), + source=_Source(user_id="U1"), + ) + ) + # /reset 應該被攔截,回覆文字 + reply_text_fn.assert_awaited() + handle_restricted_mock.assert_not_awaited() + + # --- 測試 2: 受限模式 AI 處理 + reply 成功 --- + reply_text_fn.reset_mock() + handle_restricted_mock.reset_mock() + + await linebot_router.process_message_event( + _Event( + message=_TextMessage("m-ai", "你好"), + source=_Source(user_id="U1"), + ) + ) + handle_restricted_mock.assert_awaited_once() + reply_text_fn.assert_awaited_once() + + # --- 測試 3: reply_token 過期 → fallback 到 push_text --- + reply_text_fn.reset_mock() + push_text_fn.reset_mock() + handle_restricted_mock.reset_mock() + reply_text_fn.side_effect = Exception("reply token expired") + + await linebot_router.process_message_event( + _Event( + message=_TextMessage("m-expired", "你好"), + source=_Source(user_id="U1"), + ) + ) + push_text_fn.assert_awaited_once() + + # --- 測試 4: 無 reply_token → 直接 push --- + reply_text_fn.reset_mock() + push_text_fn.reset_mock() + handle_restricted_mock.reset_mock() + reply_text_fn.side_effect = None + + await linebot_router.process_message_event( + _Event( + message=_TextMessage("m-no-token", "你好"), + source=_Source(user_id="U1"), + reply_token=None, + ) + ) + push_text_fn.assert_awaited_once() + reply_text_fn.assert_not_awaited() + + # --- 測試 5: AI 處理例外 → push 錯誤訊息 --- + push_text_fn.reset_mock() + handle_restricted_mock.side_effect = Exception("AI crashed") + + await linebot_router.process_message_event( + _Event( + message=_TextMessage("m-crash", "你好"), + source=_Source(user_id="U1"), + ) + ) + push_text_fn.assert_awaited_once() + assert "錯誤" in push_text_fn.await_args[0][1] + + # --- 測試 6: AI 回應 None → 不發送 --- + push_text_fn.reset_mock() + reply_text_fn.reset_mock() + handle_restricted_mock.side_effect = None + handle_restricted_mock.return_value = None + + await linebot_router.process_message_event( + _Event( + message=_TextMessage("m-none", "你好"), + source=_Source(user_id="U1"), + ) + ) + reply_text_fn.assert_not_awaited() + push_text_fn.assert_not_awaited() + + @pytest.mark.asyncio async def test_linebot_router_admin_routes(monkeypatch: pytest.MonkeyPatch) -> None: session = _session() diff --git a/backend/tests/test_bot_commands.py b/backend/tests/test_bot_commands.py new file mode 100644 index 00000000..478c4bcb --- /dev/null +++ b/backend/tests/test_bot_commands.py @@ -0,0 +1,199 @@ +"""Bot 斜線指令路由框架測試""" + +import pytest + +from ching_tech_os.services.bot.commands import ( + CommandContext, + CommandRouter, + SlashCommand, +) + + +def _make_ctx(**kwargs) -> CommandContext: + """建立測試用 CommandContext""" + defaults = { + "platform_type": "line", + "platform_user_id": "U123", + "bot_user_id": "bot-user-uuid", + "ctos_user_id": 1, + "is_admin": False, + "is_group": False, + "group_id": None, + "reply_token": "token-123", + "raw_args": "", + } + defaults.update(kwargs) + return CommandContext(**defaults) + + +@pytest.fixture +def router(): + return CommandRouter() + + +@pytest.fixture +def echo_handler(): + """簡單的 echo handler""" + async def handler(ctx: CommandContext) -> str: + return f"echo: {ctx.raw_args}" if ctx.raw_args else "echo" + return handler + + +class TestCommandRouterParse: + """測試指令解析""" + + def test_parse_simple_command(self, router, echo_handler): + router.register(SlashCommand(name="test", handler=echo_handler)) + result = router.parse("/test") + assert result is not None + cmd, args = result + assert cmd.name == "test" + assert args == "" + + def test_parse_command_with_args(self, router, echo_handler): + router.register(SlashCommand(name="debug", handler=echo_handler)) + result = router.parse("/debug 系統有問題") + assert result is not None + cmd, args = result + assert cmd.name == "debug" + assert args == "系統有問題" + + def test_parse_case_insensitive(self, router, echo_handler): + router.register(SlashCommand(name="reset", handler=echo_handler)) + result = router.parse("/Reset") + assert result is not None + assert result[0].name == "reset" + + def test_parse_alias(self, router, echo_handler): + router.register(SlashCommand( + name="reset", + aliases=["新對話", "忘記"], + handler=echo_handler, + )) + result = router.parse("/新對話") + assert result is not None + assert result[0].name == "reset" + + def test_parse_unknown_command_returns_none(self, router): + result = router.parse("/unknown") + assert result is None + + def test_parse_non_command_returns_none(self, router, echo_handler): + router.register(SlashCommand(name="test", handler=echo_handler)) + assert router.parse("hello") is None + assert router.parse("") is None + assert router.parse("test") is None + + def test_parse_telegram_bot_mention_format(self, router, echo_handler): + """Telegram 的 /command@botname 格式""" + router.register(SlashCommand(name="reset", handler=echo_handler)) + result = router.parse("/reset@mybot") + assert result is not None + assert result[0].name == "reset" + + +class TestCommandRouterDispatch: + """測試指令分發和權限檢查""" + + @pytest.mark.asyncio + async def test_dispatch_basic(self, router, echo_handler): + router.register(SlashCommand(name="test", handler=echo_handler)) + ctx = _make_ctx() + result = await router.dispatch( + router._commands["test"], "", ctx + ) + assert result == "echo" + + @pytest.mark.asyncio + async def test_dispatch_with_args(self, router, echo_handler): + router.register(SlashCommand(name="test", handler=echo_handler)) + ctx = _make_ctx() + result = await router.dispatch( + router._commands["test"], "hello world", ctx + ) + assert result == "echo: hello world" + + @pytest.mark.asyncio + async def test_require_bound_rejects_unbound(self, router, echo_handler): + """未綁定用戶執行需綁定的指令""" + router.register(SlashCommand( + name="test", handler=echo_handler, require_bound=True, + )) + ctx = _make_ctx(ctos_user_id=None) + result = await router.dispatch( + router._commands["test"], "", ctx + ) + assert "綁定" in result + + @pytest.mark.asyncio + async def test_require_bound_allows_bound(self, router, echo_handler): + """已綁定用戶可執行需綁定的指令""" + router.register(SlashCommand( + name="test", handler=echo_handler, require_bound=True, + )) + ctx = _make_ctx(ctos_user_id=1) + result = await router.dispatch( + router._commands["test"], "", ctx + ) + assert result == "echo" + + @pytest.mark.asyncio + async def test_require_admin_rejects_non_admin(self, router, echo_handler): + """非管理員執行管理員指令""" + router.register(SlashCommand( + name="test", handler=echo_handler, require_admin=True, + )) + ctx = _make_ctx(is_admin=False) + result = await router.dispatch( + router._commands["test"], "", ctx + ) + assert "管理員" in result + + @pytest.mark.asyncio + async def test_require_admin_allows_admin(self, router, echo_handler): + """管理員可執行管理員指令""" + router.register(SlashCommand( + name="test", handler=echo_handler, require_admin=True, + )) + ctx = _make_ctx(is_admin=True) + result = await router.dispatch( + router._commands["test"], "", ctx + ) + assert result == "echo" + + @pytest.mark.asyncio + async def test_private_only_ignores_group(self, router, echo_handler): + """群組中執行僅限個人的指令""" + router.register(SlashCommand( + name="test", handler=echo_handler, private_only=True, + )) + ctx = _make_ctx(is_group=True) + result = await router.dispatch( + router._commands["test"], "", ctx + ) + assert result is None # 靜默忽略 + + @pytest.mark.asyncio + async def test_platform_mismatch_returns_none(self, router, echo_handler): + """不支援的平台""" + router.register(SlashCommand( + name="test", handler=echo_handler, platforms={"telegram"}, + )) + ctx = _make_ctx(platform_type="line") + result = await router.dispatch( + router._commands["test"], "", ctx + ) + assert result is None + + @pytest.mark.asyncio + async def test_handler_exception_returns_error(self, router): + """handler 拋出例外時回覆錯誤訊息""" + async def bad_handler(ctx): + raise RuntimeError("test error") + + router.register(SlashCommand(name="test", handler=bad_handler)) + ctx = _make_ctx() + result = await router.dispatch( + router._commands["test"], "", ctx + ) + assert "錯誤" in result diff --git a/backend/tests/test_bot_multi_mode_integration.py b/backend/tests/test_bot_multi_mode_integration.py new file mode 100644 index 00000000..06e2aa8b --- /dev/null +++ b/backend/tests/test_bot_multi_mode_integration.py @@ -0,0 +1,590 @@ +"""Bot 多模式平台整合測試 + +涵蓋身份分流、受限模式、知識庫公開存取、/debug 指令、rate limiter 等功能的 +端對端測試。 +""" + +from __future__ import annotations + +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +# ============================================================ +# 9.1 BOT_UNBOUND_USER_POLICY=reject 回歸測試 +# ============================================================ + + +class TestRejectPolicyRegression: + """確認 reject 策略下行為與現有系統一致""" + + def test_reject_policy_line_private(self): + """Line 個人對話 — 回覆綁定提示""" + from ching_tech_os.services.bot.identity_router import route_unbound + + with patch( + "ching_tech_os.services.bot.identity_router.settings" + ) as mock_settings: + mock_settings.bot_unbound_user_policy = "reject" + result = route_unbound(platform_type="line", is_group=False) + assert result.action == "reject" + assert result.reply_text is not None + assert "CTOS" in result.reply_text + assert "綁定" in result.reply_text + + def test_reject_policy_telegram_private(self): + """Telegram 個人對話 — 回覆綁定提示(含 Telegram 字樣)""" + from ching_tech_os.services.bot.identity_router import route_unbound + + with patch( + "ching_tech_os.services.bot.identity_router.settings" + ) as mock_settings: + mock_settings.bot_unbound_user_policy = "reject" + result = route_unbound(platform_type="telegram", is_group=False) + assert result.action == "reject" + assert "Telegram" in result.reply_text + + def test_reject_policy_group_silent(self): + """群組中未綁定用戶 — 靜默忽略(不受策略影響)""" + from ching_tech_os.services.bot.identity_router import route_unbound + + with patch( + "ching_tech_os.services.bot.identity_router.settings" + ) as mock_settings: + mock_settings.bot_unbound_user_policy = "reject" + result = route_unbound(platform_type="line", is_group=True) + assert result.action == "silent" + assert result.reply_text is None + + def test_default_policy_is_reject(self): + """預設策略為 reject""" + from ching_tech_os.services.bot.identity_router import get_unbound_policy + + with patch( + "ching_tech_os.services.bot.identity_router.settings" + ) as mock_settings: + mock_settings.bot_unbound_user_policy = "reject" + assert get_unbound_policy() == "reject" + + def test_invalid_policy_falls_back_to_reject(self): + """無效策略值 → 回退到 reject""" + from ching_tech_os.services.bot.identity_router import get_unbound_policy + + with patch( + "ching_tech_os.services.bot.identity_router.settings" + ) as mock_settings: + mock_settings.bot_unbound_user_policy = "invalid_value" + assert get_unbound_policy() == "reject" + + +# ============================================================ +# 9.2 BOT_UNBOUND_USER_POLICY=restricted 受限模式測試 +# ============================================================ + + +class TestRestrictedPolicy: + """確認 restricted 策略下未綁定用戶可使用受限模式對話""" + + def test_restricted_policy_route(self): + """restricted 策略 — 路由到受限模式""" + from ching_tech_os.services.bot.identity_router import route_unbound + + with patch( + "ching_tech_os.services.bot.identity_router.settings" + ) as mock_settings: + mock_settings.bot_unbound_user_policy = "restricted" + result = route_unbound(platform_type="line", is_group=False) + assert result.action == "restricted" + assert result.reply_text is None # 不回覆拒絕訊息 + + def test_restricted_policy_group_still_silent(self): + """restricted 策略 — 群組中仍然靜默忽略""" + from ching_tech_os.services.bot.identity_router import route_unbound + + with patch( + "ching_tech_os.services.bot.identity_router.settings" + ) as mock_settings: + mock_settings.bot_unbound_user_policy = "restricted" + result = route_unbound(platform_type="line", is_group=True) + assert result.action == "silent" + + @pytest.mark.asyncio + async def test_restricted_mode_full_flow(self): + """受限模式完整流程:取得 Agent → 呼叫 AI → 回覆""" + from ching_tech_os.services.bot.identity_router import handle_restricted_mode + + mock_response = MagicMock() + mock_response.tool_calls = [] + + with ( + patch( + "ching_tech_os.services.bot.rate_limiter.check_and_increment", + new_callable=AsyncMock, + return_value=(True, None), + ), + patch( + "ching_tech_os.services.ai_manager.get_agent_by_name", + new_callable=AsyncMock, + return_value={ + "system_prompt": {"content": "你是擎添工業的 AI 助理"}, + "tools": ["search_knowledge"], + }, + ), + patch( + "ching_tech_os.services.claude_agent.call_claude", + new_callable=AsyncMock, + return_value=mock_response, + ), + patch( + "ching_tech_os.services.linebot_ai.build_system_prompt", + new_callable=AsyncMock, + return_value="system prompt", + ), + patch( + "ching_tech_os.services.linebot_ai.get_conversation_context", + new_callable=AsyncMock, + return_value=([], [], []), + ), + patch( + "ching_tech_os.services.mcp.get_mcp_tool_names", + new_callable=AsyncMock, + return_value=["search_knowledge"], + ), + patch( + "ching_tech_os.services.linebot_agents.get_mcp_servers_for_user", + new_callable=AsyncMock, + return_value=set(), + ), + patch( + "ching_tech_os.services.bot.ai.parse_ai_response", + return_value=("您好!我是擎添工業的 AI 助理。", []), + ), + ): + result = await handle_restricted_mode( + content="你好", + platform_user_id="U123", + bot_user_id="uuid-abc", + is_group=False, + ) + assert result is not None + assert "擎添" in result + + @pytest.mark.asyncio + async def test_restricted_mode_no_bot_user_id(self): + """bot_user_id 為 None → 跳過 rate limit,仍可執行""" + from ching_tech_os.services.bot.identity_router import handle_restricted_mode + + mock_response = MagicMock() + mock_response.tool_calls = [] + + with ( + patch( + "ching_tech_os.services.ai_manager.get_agent_by_name", + new_callable=AsyncMock, + return_value={ + "system_prompt": {"content": "你是 AI 助理"}, + "tools": [], + }, + ), + patch( + "ching_tech_os.services.claude_agent.call_claude", + new_callable=AsyncMock, + return_value=mock_response, + ), + patch( + "ching_tech_os.services.linebot_ai.build_system_prompt", + new_callable=AsyncMock, + return_value="prompt", + ), + patch( + "ching_tech_os.services.linebot_ai.get_conversation_context", + new_callable=AsyncMock, + return_value=([], [], []), + ), + patch( + "ching_tech_os.services.mcp.get_mcp_tool_names", + new_callable=AsyncMock, + return_value=[], + ), + patch( + "ching_tech_os.services.linebot_agents.get_mcp_servers_for_user", + new_callable=AsyncMock, + return_value=set(), + ), + patch( + "ching_tech_os.services.bot.ai.parse_ai_response", + return_value=("回覆內容", []), + ), + ): + # bot_user_id=None → 不檢查 rate limit + result = await handle_restricted_mode( + content="測試", + platform_user_id="U999", + bot_user_id=None, + is_group=False, + ) + assert result == "回覆內容" + + +# ============================================================ +# 9.3 受限模式 search_knowledge 只回傳公開知識 +# ============================================================ + + +class TestPublicKnowledgeFiltering: + """確認受限模式下 search_knowledge 只回傳公開知識""" + + def test_public_only_filter_scope_and_public(self): + """public_only=True 時只回傳 scope=global 且 is_public=true""" + from ching_tech_os.models.knowledge import KnowledgeTags + + # 模擬索引中的知識項目 + entries = [ + SimpleNamespace( + id="K001", title="公開知識", type="knowledge", category="technical", + scope="global", is_public=True, tags=KnowledgeTags(), + author="system", updated_at="2025-01-01", owner=None, + project_id=None, filename="k001.md", + ), + SimpleNamespace( + id="K002", title="內部知識", type="knowledge", category="technical", + scope="global", is_public=False, tags=KnowledgeTags(), + author="system", updated_at="2025-01-01", owner=None, + project_id=None, filename="k002.md", + ), + SimpleNamespace( + id="K003", title="個人知識", type="knowledge", category="technical", + scope="personal", is_public=True, tags=KnowledgeTags(), + author="user1", updated_at="2025-01-01", owner="user1", + project_id=None, filename="k003.md", + ), + ] + + # 測試過濾邏輯 + filtered = [] + for entry in entries: + entry_is_public = getattr(entry, "is_public", False) + entry_scope_val = getattr(entry, "scope", "global") + if entry_scope_val != "global" or not entry_is_public: + continue + filtered.append(entry) + + # 只有 K001 符合(global + is_public) + assert len(filtered) == 1 + assert filtered[0].id == "K001" + + def test_mcp_tool_public_only_for_unbound(self): + """MCP search_knowledge 工具:ctos_user_id=None → public_only=True""" + # 驗證邏輯正確性 + ctos_user_id = None + public_only = ctos_user_id is None + assert public_only is True + + def test_mcp_tool_no_filter_for_bound(self): + """MCP search_knowledge 工具:ctos_user_id 有值 → public_only=False""" + ctos_user_id = 42 + public_only = ctos_user_id is None + assert public_only is False + + +# ============================================================ +# 9.4 /debug 管理員執行/非管理員拒絕 +# ============================================================ + + +class TestDebugCommand: + """確認 /debug 指令的權限控制""" + + @pytest.mark.asyncio + async def test_debug_admin_can_execute(self): + """管理員可以執行 /debug""" + from ching_tech_os.services.bot.commands import CommandContext, router + from ching_tech_os.services.bot.command_handlers import register_builtin_commands + + register_builtin_commands() + + ctx = CommandContext( + platform_type="line", + platform_user_id="U_ADMIN", + bot_user_id="uuid-admin", + ctos_user_id=1, + is_admin=True, + is_group=False, + group_id=None, + reply_token=None, + raw_args="", + ) + + parsed = router.parse("/debug") + assert parsed is not None + command, args = parsed + + mock_response = MagicMock() + mock_response.tool_calls = [] + + with ( + patch( + "ching_tech_os.services.ai_manager.get_agent_by_name", + new_callable=AsyncMock, + return_value={ + "system_prompt": {"content": "你是診斷助理"}, + "tools": ["run_skill_script"], + }, + ), + patch( + "ching_tech_os.services.claude_agent.call_claude", + new_callable=AsyncMock, + return_value=mock_response, + ), + patch( + "ching_tech_os.services.bot.ai.parse_ai_response", + return_value=("系統狀態正常", []), + ), + ): + reply = await router.dispatch(command, args, ctx) + assert reply is not None + assert "系統狀態正常" in reply + + @pytest.mark.asyncio + async def test_debug_non_admin_rejected(self): + """非管理員被拒絕""" + from ching_tech_os.services.bot.commands import CommandContext, router + from ching_tech_os.services.bot.command_handlers import register_builtin_commands + + register_builtin_commands() + + ctx = CommandContext( + platform_type="line", + platform_user_id="U_NORMAL", + bot_user_id="uuid-normal", + ctos_user_id=2, + is_admin=False, + is_group=False, + group_id=None, + reply_token=None, + raw_args="", + ) + + parsed = router.parse("/debug") + assert parsed is not None + command, args = parsed + + reply = await router.dispatch(command, args, ctx) + assert reply is not None + assert "管理員" in reply + + @pytest.mark.asyncio + async def test_debug_group_silent(self): + """群組中 /debug 靜默忽略""" + from ching_tech_os.services.bot.commands import CommandContext, router + from ching_tech_os.services.bot.command_handlers import register_builtin_commands + + register_builtin_commands() + + ctx = CommandContext( + platform_type="line", + platform_user_id="U_ADMIN", + bot_user_id="uuid-admin", + ctos_user_id=1, + is_admin=True, + is_group=True, + group_id="group-1", + reply_token=None, + raw_args="", + ) + + parsed = router.parse("/debug") + assert parsed is not None + command, args = parsed + + reply = await router.dispatch(command, args, ctx) + assert reply is None # 群組靜默忽略 + + @pytest.mark.asyncio + async def test_debug_unbound_rejected(self): + """未綁定用戶被拒絕""" + from ching_tech_os.services.bot.commands import CommandContext, router + from ching_tech_os.services.bot.command_handlers import register_builtin_commands + + register_builtin_commands() + + ctx = CommandContext( + platform_type="line", + platform_user_id="U_UNBOUND", + bot_user_id="uuid-unbound", + ctos_user_id=None, + is_admin=False, + is_group=False, + group_id=None, + reply_token=None, + raw_args="", + ) + + parsed = router.parse("/debug") + assert parsed is not None + command, args = parsed + + reply = await router.dispatch(command, args, ctx) + assert reply is not None + assert "綁定" in reply + + +# ============================================================ +# 9.5 rate limiter 超限測試 +# ============================================================ + + +class TestRateLimiterIntegration: + """確認 rate limiter 超過限額時回覆使用上限提示""" + + @pytest.mark.asyncio + async def test_rate_limit_exceeded_blocks_restricted_mode(self): + """超過頻率限制 → 受限模式回覆上限提示""" + from ching_tech_os.services.bot.identity_router import handle_restricted_mode + + deny_msg = "您今日的使用次數已達上限,請明天再試。" + + with patch( + "ching_tech_os.services.bot.rate_limiter.check_and_increment", + new_callable=AsyncMock, + return_value=(False, deny_msg), + ): + result = await handle_restricted_mode( + content="你好", + platform_user_id="U123", + bot_user_id="uuid-123", + is_group=False, + ) + assert result == deny_msg + + @pytest.mark.asyncio + async def test_rate_limit_not_checked_without_bot_user(self): + """bot_user_id=None → 不檢查 rate limit""" + from ching_tech_os.services.bot.identity_router import handle_restricted_mode + + mock_response = MagicMock() + mock_response.tool_calls = [] + + with ( + # 不 mock rate_limiter — 因為 bot_user_id=None 時不會呼叫 + patch( + "ching_tech_os.services.ai_manager.get_agent_by_name", + new_callable=AsyncMock, + return_value={ + "system_prompt": {"content": "AI"}, + "tools": [], + }, + ), + patch( + "ching_tech_os.services.claude_agent.call_claude", + new_callable=AsyncMock, + return_value=mock_response, + ), + patch( + "ching_tech_os.services.linebot_ai.build_system_prompt", + new_callable=AsyncMock, + return_value="p", + ), + patch( + "ching_tech_os.services.linebot_ai.get_conversation_context", + new_callable=AsyncMock, + return_value=([], [], []), + ), + patch( + "ching_tech_os.services.mcp.get_mcp_tool_names", + new_callable=AsyncMock, + return_value=[], + ), + patch( + "ching_tech_os.services.linebot_agents.get_mcp_servers_for_user", + new_callable=AsyncMock, + return_value=set(), + ), + patch( + "ching_tech_os.services.bot.ai.parse_ai_response", + return_value=("OK", []), + ), + ): + result = await handle_restricted_mode( + content="hi", + platform_user_id="U999", + bot_user_id=None, + is_group=False, + ) + assert result == "OK" + + @pytest.mark.asyncio + async def test_hourly_rate_limit_key(self): + """每小時限額 key 格式正確""" + from ching_tech_os.services.bot.rate_limiter import _current_hourly_key + + key = _current_hourly_key() + # 格式:YYYY-MM-DD-HH + assert len(key.split("-")) == 4 + + @pytest.mark.asyncio + async def test_daily_rate_limit_key(self): + """每日限額 key 格式正確""" + from ching_tech_os.services.bot.rate_limiter import _current_daily_key + + key = _current_daily_key() + # 格式:YYYY-MM-DD + assert len(key.split("-")) == 3 + + +# ============================================================ +# 跨功能整合測試 +# ============================================================ + + +class TestCrossFunctionalIntegration: + """跨功能整合驗證""" + + def test_library_public_folders_config(self): + """LIBRARY_PUBLIC_FOLDERS 環境變數有預設值""" + from ching_tech_os.config import settings + + assert isinstance(settings.library_public_folders, list) + assert len(settings.library_public_folders) > 0 + + def test_knowledge_models_have_is_public(self): + """知識庫模型包含 is_public 欄位""" + from ching_tech_os.models.knowledge import ( + KnowledgeCreate, + KnowledgeUpdate, + KnowledgeResponse, + KnowledgeListItem, + IndexEntry, + ) + + # 驗證 is_public 存在且預設為 False + create = KnowledgeCreate(title="t", content="c") + assert create.is_public is False + + update = KnowledgeUpdate() + assert update.is_public is None # Optional + + # 驗證 Response 和 ListItem 有 is_public + assert "is_public" in KnowledgeResponse.model_fields + assert "is_public" in KnowledgeListItem.model_fields + assert "is_public" in IndexEntry.model_fields + + def test_slash_command_aliases(self): + """斜線指令別名可正確解析""" + from ching_tech_os.services.bot.commands import router + from ching_tech_os.services.bot.command_handlers import register_builtin_commands + + register_builtin_commands() + + # /reset 別名 + for alias in ["/reset", "/新對話", "/忘記"]: + parsed = router.parse(alias) + assert parsed is not None, f"別名 {alias} 無法解析" + assert parsed[0].name == "reset" + + # /debug 別名 + for alias in ["/debug", "/診斷", "/diag"]: + parsed = router.parse(alias) + assert parsed is not None, f"別名 {alias} 無法解析" + assert parsed[0].name == "debug" diff --git a/backend/tests/test_bot_telegram_handler.py b/backend/tests/test_bot_telegram_handler.py index 526d128e..e9dcf7a6 100644 --- a/backend/tests/test_bot_telegram_handler.py +++ b/backend/tests/test_bot_telegram_handler.py @@ -255,10 +255,21 @@ async def test_handle_text_command_and_access_paths(monkeypatch: pytest.MonkeyPa await handler._handle_text(message, "/help", "100", chat, user, False, adapter) adapter.send_text.assert_awaited_with("100", handler.HELP_MESSAGE) - # reset 指令 + # reset 指令(透過 CommandRouter 處理) + from ching_tech_os.services.bot import command_handlers + from ching_tech_os.services.bot.command_handlers import register_builtin_commands + + register_builtin_commands() + monkeypatch.setattr(command_handlers, "reset_conversation", AsyncMock()) + # mock get_user_role_and_permissions 避免 DB 查詢 + monkeypatch.setattr( + handler, "get_user_role_and_permissions", + AsyncMock(return_value={"role": "user"}), + ) adapter.send_text.reset_mock() await handler._handle_text(message, "/reset", "100", chat, user, False, adapter) - adapter.send_text.assert_awaited_with("100", "對話已重置 ✨") + adapter.send_text.assert_awaited_once() + assert "對話" in adapter.send_text.await_args.args[1] or "清除" in adapter.send_text.await_args.args[1] # 綁定碼 monkeypatch.setattr(handler, "is_binding_code_format", AsyncMock(return_value=True)) @@ -286,6 +297,85 @@ async def test_handle_text_command_and_access_paths(monkeypatch: pytest.MonkeyPa adapter.send_text.assert_awaited_with("100", "抱歉,處理訊息時發生錯誤,請稍後再試。") +@pytest.mark.asyncio +async def test_restricted_mode_routing(monkeypatch: pytest.MonkeyPatch) -> None: + """測試 Telegram 受限模式:指令攔截(頂層)+ AI 處理 + 錯誤處理""" + conn = AsyncMock() + monkeypatch.setattr(handler, "get_connection", lambda: _CM(conn)) + monkeypatch.setattr(handler, "_ensure_bot_user", AsyncMock(return_value="u1")) + monkeypatch.setattr(handler, "_save_message", AsyncMock(return_value="msg-r1")) + monkeypatch.setattr(handler, "is_binding_code_format", AsyncMock(return_value=False)) + + # 未綁定用戶,restricted 策略 + monkeypatch.setattr( + handler, "check_line_access", + AsyncMock(return_value=(False, "user_not_bound")), + ) + + adapter = SimpleNamespace(send_text=AsyncMock(), bot=SimpleNamespace()) + message = SimpleNamespace(message_id=1) + chat = SimpleNamespace(id=100, type="private") + user = SimpleNamespace(id=9, full_name="小明", username="xm") + + # 註冊內建指令 + from ching_tech_os.services.bot import command_handlers + from ching_tech_os.services.bot.command_handlers import register_builtin_commands + register_builtin_commands() + monkeypatch.setattr(command_handlers, "reset_conversation", AsyncMock()) + + # --- 1. 斜線指令由頂層 CommandRouter 攔截(不到 restricted 分支)--- + from ching_tech_os.services.bot.identity_router import UnboundRouteResult + monkeypatch.setattr( + "ching_tech_os.services.bot.identity_router.route_unbound", + lambda **kw: UnboundRouteResult(action="restricted"), + ) + adapter.send_text.reset_mock() + await handler._handle_text(message, "/reset", "100", chat, user, False, adapter) + adapter.send_text.assert_awaited_once() + assert "對話" in adapter.send_text.await_args.args[1] + + # --- 2. 受限模式 AI 處理成功(含 message_uuid 傳遞)--- + mock_restricted = AsyncMock(return_value="受限模式回覆") + monkeypatch.setattr( + "ching_tech_os.services.bot.identity_router.handle_restricted_mode", + mock_restricted, + ) + adapter.send_text.reset_mock() + await handler._handle_text(message, "你好", "100", chat, user, False, adapter) + adapter.send_text.assert_awaited_with("100", "受限模式回覆") + # 確認 message_uuid 和 str(bot_user_id) 有傳入 + call_kwargs = mock_restricted.call_args.kwargs + assert call_kwargs["message_uuid"] == "msg-r1" + assert call_kwargs["bot_user_id"] == "u1" + + # --- 3. 受限模式 AI 回傳 None → 不送訊息 --- + monkeypatch.setattr( + "ching_tech_os.services.bot.identity_router.handle_restricted_mode", + AsyncMock(return_value=None), + ) + adapter.send_text.reset_mock() + await handler._handle_text(message, "你好", "100", chat, user, False, adapter) + adapter.send_text.assert_not_awaited() + + # --- 4. 受限模式 AI 拋出例外 → 錯誤訊息 --- + monkeypatch.setattr( + "ching_tech_os.services.bot.identity_router.handle_restricted_mode", + AsyncMock(side_effect=RuntimeError("AI boom")), + ) + adapter.send_text.reset_mock() + await handler._handle_text(message, "你好", "100", chat, user, False, adapter) + assert "發生錯誤" in adapter.send_text.await_args.args[1] + + # --- 5. silent 路由(群組)→ 不送訊息 --- + monkeypatch.setattr( + "ching_tech_os.services.bot.identity_router.route_unbound", + lambda **kw: UnboundRouteResult(action="silent"), + ) + adapter.send_text.reset_mock() + await handler._handle_text(message, "你好", "100", chat, user, True, adapter) + adapter.send_text.assert_not_awaited() + + @pytest.mark.asyncio async def test_handle_media_paths(monkeypatch: pytest.MonkeyPatch) -> None: conn = AsyncMock() diff --git a/backend/tests/test_debug_command.py b/backend/tests/test_debug_command.py new file mode 100644 index 00000000..8b5a1843 --- /dev/null +++ b/backend/tests/test_debug_command.py @@ -0,0 +1,236 @@ +"""Bot /debug 指令測試""" + +import pytest +from unittest.mock import AsyncMock, patch, MagicMock + +from ching_tech_os.services.bot.commands import CommandContext, CommandRouter, SlashCommand +from ching_tech_os.services.bot.command_handlers import _handle_debug + + +def _make_ctx(**kwargs) -> CommandContext: + """建立測試用 CommandContext""" + defaults = { + "platform_type": "line", + "platform_user_id": "U123", + "bot_user_id": "bot-user-uuid", + "ctos_user_id": 1, + "is_admin": True, + "is_group": False, + "group_id": None, + "reply_token": "token-123", + "raw_args": "", + } + defaults.update(kwargs) + return CommandContext(**defaults) + + +class TestDebugCommand: + """測試 /debug 指令""" + + @pytest.mark.asyncio + async def test_debug_agent_not_found(self): + """bot-debug Agent 不存在 → 回傳錯誤""" + with patch( + "ching_tech_os.services.ai_manager.get_agent_by_name", + new_callable=AsyncMock, + return_value=None, + ): + ctx = _make_ctx(is_admin=True, raw_args="") + result = await _handle_debug(ctx) + assert "bot-debug Agent 不存在" in result + + @pytest.mark.asyncio + async def test_debug_no_prompt(self): + """Agent 無 system_prompt → 回傳錯誤""" + with patch( + "ching_tech_os.services.ai_manager.get_agent_by_name", + new_callable=AsyncMock, + return_value={"system_prompt": None, "tools": []}, + ): + ctx = _make_ctx(is_admin=True) + result = await _handle_debug(ctx) + assert "缺少 system_prompt" in result + + @pytest.mark.asyncio + async def test_debug_successful(self): + """成功的 debug 流程""" + mock_response = MagicMock() + mock_response.tool_calls = [] + + with ( + patch( + "ching_tech_os.services.ai_manager.get_agent_by_name", + new_callable=AsyncMock, + return_value={ + "system_prompt": {"content": "你是系統診斷助理"}, + "tools": ["run_skill_script"], + }, + ), + patch( + "ching_tech_os.services.claude_agent.call_claude", + new_callable=AsyncMock, + return_value=mock_response, + ), + patch( + "ching_tech_os.services.bot.ai.parse_ai_response", + return_value=("系統狀態正常,無異常。", []), + ), + ): + ctx = _make_ctx(is_admin=True, raw_args="系統有什麼問題嗎") + result = await _handle_debug(ctx) + assert result == "系統狀態正常,無異常。" + + @pytest.mark.asyncio + async def test_debug_default_prompt(self): + """無問題描述時使用預設 prompt""" + mock_response = MagicMock() + mock_response.tool_calls = [] + + with ( + patch( + "ching_tech_os.services.ai_manager.get_agent_by_name", + new_callable=AsyncMock, + return_value={ + "system_prompt": {"content": "你是系統診斷助理"}, + "tools": ["run_skill_script"], + }, + ), + patch( + "ching_tech_os.services.claude_agent.call_claude", + new_callable=AsyncMock, + return_value=mock_response, + ) as mock_call, + patch( + "ching_tech_os.services.bot.ai.parse_ai_response", + return_value=("健檢結果正常", []), + ), + ): + ctx = _make_ctx(is_admin=True, raw_args="") + result = await _handle_debug(ctx) + assert result == "健檢結果正常" + # 檢查呼叫 call_claude 時的 prompt 包含預設文字 + call_args = mock_call.call_args + assert "check-system-health" in call_args.kwargs.get("prompt", "") + + @pytest.mark.asyncio + async def test_debug_call_failure(self): + """AI 呼叫失敗 → 回傳錯誤""" + with ( + patch( + "ching_tech_os.services.ai_manager.get_agent_by_name", + new_callable=AsyncMock, + return_value={ + "system_prompt": {"content": "你是系統診斷助理"}, + "tools": ["run_skill_script"], + }, + ), + patch( + "ching_tech_os.services.claude_agent.call_claude", + new_callable=AsyncMock, + side_effect=Exception("timeout"), + ), + ): + ctx = _make_ctx(is_admin=True, raw_args="問題?") + result = await _handle_debug(ctx) + assert "診斷執行失敗" in result + + +class TestDebugCommandRouting: + """測試 /debug 指令路由(權限檢查)""" + + def test_debug_registered(self): + """/debug 指令已註冊""" + router = CommandRouter() + router.register( + SlashCommand( + name="debug", + aliases=["診斷"], + handler=_handle_debug, + require_bound=True, + require_admin=True, + private_only=True, + ) + ) + parsed = router.parse("/debug 查詢日誌") + assert parsed is not None + cmd, args = parsed + assert cmd.name == "debug" + assert args == "查詢日誌" + + @pytest.mark.asyncio + async def test_non_admin_rejected(self): + """非管理員 → 拒絕""" + router = CommandRouter() + router.register( + SlashCommand( + name="debug", + handler=_handle_debug, + require_bound=True, + require_admin=True, + private_only=True, + ) + ) + parsed = router.parse("/debug") + assert parsed is not None + cmd, args = parsed + ctx = _make_ctx(is_admin=False) + result = await router.dispatch(cmd, args, ctx) + assert "管理員" in result + + @pytest.mark.asyncio + async def test_unbound_rejected(self): + """未綁定用戶 → 拒絕""" + router = CommandRouter() + router.register( + SlashCommand( + name="debug", + handler=_handle_debug, + require_bound=True, + require_admin=True, + private_only=True, + ) + ) + parsed = router.parse("/debug") + assert parsed is not None + cmd, args = parsed + ctx = _make_ctx(ctos_user_id=None, is_admin=False) + result = await router.dispatch(cmd, args, ctx) + assert "綁定" in result + + @pytest.mark.asyncio + async def test_group_silently_ignored(self): + """群組中 → 靜默忽略""" + router = CommandRouter() + router.register( + SlashCommand( + name="debug", + handler=_handle_debug, + require_bound=True, + require_admin=True, + private_only=True, + ) + ) + parsed = router.parse("/debug") + assert parsed is not None + cmd, args = parsed + ctx = _make_ctx(is_group=True, is_admin=True) + result = await router.dispatch(cmd, args, ctx) + assert result is None # 靜默忽略 + + def test_alias_parse(self): + """別名 /診斷 可解析""" + router = CommandRouter() + router.register( + SlashCommand( + name="debug", + aliases=["診斷", "diag"], + handler=_handle_debug, + require_bound=True, + require_admin=True, + private_only=True, + ) + ) + parsed = router.parse("/診斷") + assert parsed is not None + cmd, args = parsed + assert cmd.name == "debug" diff --git a/backend/tests/test_identity_router.py b/backend/tests/test_identity_router.py new file mode 100644 index 00000000..031b68fa --- /dev/null +++ b/backend/tests/test_identity_router.py @@ -0,0 +1,355 @@ +"""身份分流路由器單元測試""" + +import pytest +from unittest.mock import AsyncMock, patch, MagicMock + +from ching_tech_os.services.bot.identity_router import ( + UnboundRouteResult, + route_unbound, + get_unbound_policy, + handle_restricted_mode, + BINDING_PROMPT_LINE, + BINDING_PROMPT_TELEGRAM, +) + + +# ============================================================ +# route_unbound() 測試 +# ============================================================ + + +class TestRouteUnbound: + """測試 route_unbound() 分流邏輯""" + + def test_reject_policy_line_private(self): + """reject 策略 + Line 個人對話 → 回覆綁定提示""" + with patch( + "ching_tech_os.services.bot.identity_router.settings" + ) as mock_settings: + mock_settings.bot_unbound_user_policy = "reject" + result = route_unbound(platform_type="line", is_group=False) + assert result.action == "reject" + assert result.reply_text == BINDING_PROMPT_LINE + + def test_reject_policy_telegram_private(self): + """reject 策略 + Telegram 個人對話 → 回覆 Telegram 綁定提示""" + with patch( + "ching_tech_os.services.bot.identity_router.settings" + ) as mock_settings: + mock_settings.bot_unbound_user_policy = "reject" + result = route_unbound(platform_type="telegram", is_group=False) + assert result.action == "reject" + assert result.reply_text == BINDING_PROMPT_TELEGRAM + + def test_reject_policy_group_silent(self): + """reject 策略 + 群組 → 靜默忽略""" + with patch( + "ching_tech_os.services.bot.identity_router.settings" + ) as mock_settings: + mock_settings.bot_unbound_user_policy = "reject" + result = route_unbound(platform_type="line", is_group=True) + assert result.action == "silent" + assert result.reply_text is None + + def test_restricted_policy_private(self): + """restricted 策略 + 個人對話 → 走受限模式""" + with patch( + "ching_tech_os.services.bot.identity_router.settings" + ) as mock_settings: + mock_settings.bot_unbound_user_policy = "restricted" + result = route_unbound(platform_type="line", is_group=False) + assert result.action == "restricted" + assert result.reply_text is None + + def test_restricted_policy_group_still_silent(self): + """restricted 策略 + 群組 → 仍靜默忽略(群組不受策略影響)""" + with patch( + "ching_tech_os.services.bot.identity_router.settings" + ) as mock_settings: + mock_settings.bot_unbound_user_policy = "restricted" + result = route_unbound(platform_type="telegram", is_group=True) + assert result.action == "silent" + + def test_default_policy_fallback(self): + """未設定/無效策略 → 預設 reject""" + with patch( + "ching_tech_os.services.bot.identity_router.settings" + ) as mock_settings: + mock_settings.bot_unbound_user_policy = "invalid_value" + result = route_unbound(platform_type="line", is_group=False) + assert result.action == "reject" + + def test_empty_policy_fallback(self): + """空字串策略 → 預設 reject""" + with patch( + "ching_tech_os.services.bot.identity_router.settings" + ) as mock_settings: + mock_settings.bot_unbound_user_policy = "" + result = route_unbound(platform_type="line", is_group=False) + assert result.action == "reject" + + +# ============================================================ +# get_unbound_policy() 測試 +# ============================================================ + + +class TestGetUnboundPolicy: + """測試 get_unbound_policy()""" + + def test_reject(self): + with patch( + "ching_tech_os.services.bot.identity_router.settings" + ) as mock_settings: + mock_settings.bot_unbound_user_policy = "reject" + assert get_unbound_policy() == "reject" + + def test_restricted(self): + with patch( + "ching_tech_os.services.bot.identity_router.settings" + ) as mock_settings: + mock_settings.bot_unbound_user_policy = "restricted" + assert get_unbound_policy() == "restricted" + + def test_case_insensitive(self): + with patch( + "ching_tech_os.services.bot.identity_router.settings" + ) as mock_settings: + mock_settings.bot_unbound_user_policy = "RESTRICTED" + assert get_unbound_policy() == "restricted" + + def test_whitespace_trimmed(self): + with patch( + "ching_tech_os.services.bot.identity_router.settings" + ) as mock_settings: + mock_settings.bot_unbound_user_policy = " reject " + assert get_unbound_policy() == "reject" + + +# ============================================================ +# handle_restricted_mode() 測試 +# ============================================================ + + +def _rate_limit_patches(): + """建立 rate limiter mock patches(避免 DB 連線)""" + return ( + patch( + "ching_tech_os.services.bot.rate_limiter.check_and_increment", + new_callable=AsyncMock, + return_value=(True, None), + ), + ) + + +class TestHandleRestrictedMode: + """測試受限模式 AI 流程""" + + @pytest.mark.asyncio + async def test_agent_not_found(self): + """bot-restricted Agent 不存在 → 回傳錯誤訊息""" + (rl,) = _rate_limit_patches() + with ( + rl, + patch( + "ching_tech_os.services.ai_manager.get_agent_by_name", + new_callable=AsyncMock, + return_value=None, + ), + ): + result = await handle_restricted_mode( + content="你好", + platform_user_id="U123", + bot_user_id="uuid-123", + is_group=False, + ) + assert result == "系統設定錯誤,請聯繫管理員。" + + @pytest.mark.asyncio + async def test_agent_no_prompt(self): + """bot-restricted Agent 無 system_prompt → 回傳錯誤""" + (rl,) = _rate_limit_patches() + with ( + rl, + patch( + "ching_tech_os.services.ai_manager.get_agent_by_name", + new_callable=AsyncMock, + return_value={"system_prompt": None, "tools": []}, + ), + ): + result = await handle_restricted_mode( + content="你好", + platform_user_id="U123", + bot_user_id="uuid-123", + is_group=False, + ) + assert result == "系統設定錯誤,請聯繫管理員。" + + @pytest.mark.asyncio + async def test_successful_restricted_flow(self): + """成功的受限模式 AI 流程""" + mock_response = MagicMock() + mock_response.tool_calls = [] + (rl,) = _rate_limit_patches() + + with ( + rl, + patch( + "ching_tech_os.services.ai_manager.get_agent_by_name", + new_callable=AsyncMock, + return_value={ + "system_prompt": {"content": "你是 AI 助理"}, + "tools": ["search_knowledge"], + }, + ), + patch( + "ching_tech_os.services.claude_agent.call_claude", + new_callable=AsyncMock, + return_value=mock_response, + ), + patch( + "ching_tech_os.services.linebot_ai.build_system_prompt", + new_callable=AsyncMock, + return_value="test prompt", + ), + patch( + "ching_tech_os.services.linebot_ai.get_conversation_context", + new_callable=AsyncMock, + return_value=([], [], []), + ), + patch( + "ching_tech_os.services.mcp.get_mcp_tool_names", + new_callable=AsyncMock, + return_value=["search_knowledge", "get_knowledge_item"], + ), + patch( + "ching_tech_os.services.linebot_agents.get_mcp_servers_for_user", + new_callable=AsyncMock, + return_value=set(), + ), + patch( + "ching_tech_os.services.bot.ai.parse_ai_response", + return_value=("你好!有什麼可以幫你的?", []), + ), + ): + result = await handle_restricted_mode( + content="你好", + platform_user_id="U123", + bot_user_id="uuid-123", + is_group=False, + ) + assert result == "你好!有什麼可以幫你的?" + + @pytest.mark.asyncio + async def test_file_message_filtered(self): + """受限模式過濾 FILE_MESSAGE 標記""" + mock_response = MagicMock() + mock_response.tool_calls = [] + (rl,) = _rate_limit_patches() + + with ( + rl, + patch( + "ching_tech_os.services.ai_manager.get_agent_by_name", + new_callable=AsyncMock, + return_value={ + "system_prompt": {"content": "你是 AI 助理"}, + "tools": ["search_knowledge"], + }, + ), + patch( + "ching_tech_os.services.claude_agent.call_claude", + new_callable=AsyncMock, + return_value=mock_response, + ), + patch( + "ching_tech_os.services.linebot_ai.build_system_prompt", + new_callable=AsyncMock, + return_value="test prompt", + ), + patch( + "ching_tech_os.services.linebot_ai.get_conversation_context", + new_callable=AsyncMock, + return_value=([], [], []), + ), + patch( + "ching_tech_os.services.mcp.get_mcp_tool_names", + new_callable=AsyncMock, + return_value=["search_knowledge"], + ), + patch( + "ching_tech_os.services.linebot_agents.get_mcp_servers_for_user", + new_callable=AsyncMock, + return_value=set(), + ), + patch( + "ching_tech_os.services.bot.ai.parse_ai_response", + return_value=( + "這是結果 [FILE_MESSAGE:path/to/file] 完畢", [] + ), + ), + ): + result = await handle_restricted_mode( + content="查詢", + platform_user_id="U123", + bot_user_id="uuid-123", + is_group=False, + ) + assert "[FILE_MESSAGE:" not in result + assert "這是結果" in result + assert "完畢" in result + + @pytest.mark.asyncio + async def test_empty_response_fallback(self): + """空回應 → 預設訊息""" + mock_response = MagicMock() + mock_response.tool_calls = [] + (rl,) = _rate_limit_patches() + + with ( + rl, + patch( + "ching_tech_os.services.ai_manager.get_agent_by_name", + new_callable=AsyncMock, + return_value={ + "system_prompt": {"content": "你是 AI 助理"}, + "tools": [], + }, + ), + patch( + "ching_tech_os.services.claude_agent.call_claude", + new_callable=AsyncMock, + return_value=mock_response, + ), + patch( + "ching_tech_os.services.linebot_ai.build_system_prompt", + new_callable=AsyncMock, + return_value="test prompt", + ), + patch( + "ching_tech_os.services.linebot_ai.get_conversation_context", + new_callable=AsyncMock, + return_value=([], [], []), + ), + patch( + "ching_tech_os.services.mcp.get_mcp_tool_names", + new_callable=AsyncMock, + return_value=[], + ), + patch( + "ching_tech_os.services.linebot_agents.get_mcp_servers_for_user", + new_callable=AsyncMock, + return_value=set(), + ), + patch( + "ching_tech_os.services.bot.ai.parse_ai_response", + return_value=("", []), + ), + ): + result = await handle_restricted_mode( + content="你好", + platform_user_id="U123", + bot_user_id="uuid-123", + is_group=False, + ) + assert result == "抱歉,我目前無法回答您的問題。" diff --git a/backend/tests/test_linebot_agents_init.py b/backend/tests/test_linebot_agents_init.py index ae34eaae..8fa89d0e 100644 --- a/backend/tests/test_linebot_agents_init.py +++ b/backend/tests/test_linebot_agents_init.py @@ -23,6 +23,7 @@ async def test_get_linebot_agent(monkeypatch: pytest.MonkeyPatch): @pytest.mark.asyncio async def test_ensure_default_linebot_agents_skip_existing(monkeypatch: pytest.MonkeyPatch): + # 所有 agent 都已存在 → 不建立任何東西 monkeypatch.setattr( linebot_agents.ai_manager, "get_agent_by_name", @@ -41,10 +42,12 @@ async def test_ensure_default_linebot_agents_skip_existing(monkeypatch: pytest.M @pytest.mark.asyncio async def test_ensure_default_linebot_agents_create_prompt_and_agent(monkeypatch: pytest.MonkeyPatch): + # linebot agents (2) + bot mode agents (2) = 4 個 agent + # 前 2 個(linebot)不存在,後 2 個(bot mode)已存在 monkeypatch.setattr( linebot_agents.ai_manager, "get_agent_by_name", - AsyncMock(side_effect=[None, None]), + AsyncMock(side_effect=[None, None, {"id": 3}, {"id": 4}]), ) monkeypatch.setattr( linebot_agents.ai_manager, diff --git a/backend/tests/test_linebot_ai_service.py b/backend/tests/test_linebot_ai_service.py index 7e5bfc3b..19cc8bba 100644 --- a/backend/tests/test_linebot_ai_service.py +++ b/backend/tests/test_linebot_ai_service.py @@ -303,7 +303,7 @@ async def test_handle_text_message(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr( linebot_ai, "get_line_user_record", - AsyncMock(side_effect=[{"display_name": "小明"}, None]), + AsyncMock(side_effect=[{"id": "bot-user-1", "user_id": None, "display_name": "小明"}, None]), ) message_uuid = uuid4() @@ -329,82 +329,8 @@ async def test_handle_text_message(monkeypatch: pytest.MonkeyPatch) -> None: assert process.await_args.kwargs["user_display_name"] is None -@pytest.mark.asyncio -async def test_process_message_with_ai_reset_group_ignored(monkeypatch: pytest.MonkeyPatch) -> None: - monkeypatch.setattr(linebot_ai, "is_reset_command", lambda _content: True) - result = await linebot_ai.process_message_with_ai( - message_uuid=uuid4(), - content="/reset", - line_group_id=uuid4(), - line_user_id="U1", - reply_token="r1", - ) - assert result is None - - -@pytest.mark.asyncio -async def test_process_message_with_ai_reset_reply_success(monkeypatch: pytest.MonkeyPatch) -> None: - monkeypatch.setattr(linebot_ai, "is_reset_command", lambda _content: True) - monkeypatch.setattr(linebot_ai, "reset_conversation", AsyncMock(return_value=True)) - monkeypatch.setattr(linebot_ai, "save_bot_response", AsyncMock(return_value=uuid4())) - reply_text = AsyncMock(return_value="m1") - push_text = AsyncMock(return_value=("m2", None)) - monkeypatch.setattr(linebot_ai, "reply_text", reply_text) - monkeypatch.setattr(linebot_ai, "push_text", push_text) - - result = await linebot_ai.process_message_with_ai( - message_uuid=uuid4(), - content="/reset", - line_group_id=None, - line_user_id="U1", - reply_token="r1", - ) - assert "已清除對話歷史" in (result or "") - reply_text.assert_awaited_once() - push_text.assert_not_called() - - -@pytest.mark.asyncio -async def test_process_message_with_ai_reset_push_fallback(monkeypatch: pytest.MonkeyPatch) -> None: - monkeypatch.setattr(linebot_ai, "is_reset_command", lambda _content: True) - monkeypatch.setattr(linebot_ai, "reset_conversation", AsyncMock(return_value=True)) - monkeypatch.setattr(linebot_ai, "save_bot_response", AsyncMock(return_value=uuid4())) - monkeypatch.setattr(linebot_ai, "reply_text", AsyncMock(side_effect=RuntimeError("expired"))) - push_text = AsyncMock(return_value=("m2", None)) - monkeypatch.setattr(linebot_ai, "push_text", push_text) - - result = await linebot_ai.process_message_with_ai( - message_uuid=uuid4(), - content="/新對話", - line_group_id=None, - line_user_id="U1", - reply_token="r1", - ) - assert "已清除對話歷史" in (result or "") - push_text.assert_awaited_once() - - -@pytest.mark.asyncio -async def test_process_message_with_ai_reset_push_also_fail(monkeypatch: pytest.MonkeyPatch) -> None: - monkeypatch.setattr(linebot_ai, "is_reset_command", lambda _content: True) - monkeypatch.setattr(linebot_ai, "reset_conversation", AsyncMock(return_value=True)) - monkeypatch.setattr(linebot_ai, "save_bot_response", AsyncMock(return_value=uuid4())) - monkeypatch.setattr(linebot_ai, "reply_text", AsyncMock(side_effect=RuntimeError("expired"))) - monkeypatch.setattr(linebot_ai, "push_text", AsyncMock(side_effect=RuntimeError("push failed"))) - - result = await linebot_ai.process_message_with_ai( - message_uuid=uuid4(), - content="/新對話", - line_group_id=None, - line_user_id="U1", - reply_token="r1", - ) - assert "已清除對話歷史" in (result or "") - - @pytest.mark.asyncio async def test_process_message_with_ai_not_triggered(monkeypatch: pytest.MonkeyPatch) -> None: - monkeypatch.setattr(linebot_ai, "is_reset_command", lambda _content: False) is_bot_message = AsyncMock(return_value=False) monkeypatch.setattr(linebot_ai, "is_bot_message", is_bot_message) monkeypatch.setattr(linebot_ai, "should_trigger_ai", lambda *_args, **_kwargs: False) @@ -423,7 +349,6 @@ async def test_process_message_with_ai_not_triggered(monkeypatch: pytest.MonkeyP @pytest.mark.asyncio async def test_process_message_with_ai_agent_not_found(monkeypatch: pytest.MonkeyPatch) -> None: - monkeypatch.setattr(linebot_ai, "is_reset_command", lambda _content: False) monkeypatch.setattr(linebot_ai, "should_trigger_ai", lambda *_args, **_kwargs: True) monkeypatch.setattr(linebot_ai, "get_linebot_agent", AsyncMock(return_value=None)) reply_text = AsyncMock() @@ -442,7 +367,6 @@ async def test_process_message_with_ai_agent_not_found(monkeypatch: pytest.Monke @pytest.mark.asyncio async def test_process_message_with_ai_missing_system_prompt(monkeypatch: pytest.MonkeyPatch) -> None: - monkeypatch.setattr(linebot_ai, "is_reset_command", lambda _content: False) monkeypatch.setattr(linebot_ai, "should_trigger_ai", lambda *_args, **_kwargs: True) monkeypatch.setattr( linebot_ai, @@ -490,7 +414,6 @@ def _patch_process_base(monkeypatch: pytest.MonkeyPatch) -> dict: linebot_agents_module = importlib.import_module("ching_tech_os.services.linebot_agents") mcp_module = importlib.import_module("ching_tech_os.services.mcp") - monkeypatch.setattr(linebot_ai, "is_reset_command", lambda _content: False) monkeypatch.setattr(linebot_ai, "is_bot_message", AsyncMock(return_value=True)) monkeypatch.setattr(linebot_ai, "should_trigger_ai", lambda *_args, **_kwargs: True) monkeypatch.setattr( diff --git a/backend/tests/test_rate_limiter.py b/backend/tests/test_rate_limiter.py new file mode 100644 index 00000000..69e65e98 --- /dev/null +++ b/backend/tests/test_rate_limiter.py @@ -0,0 +1,269 @@ +"""Rate Limiter 單元測試""" + +import pytest +from unittest.mock import AsyncMock, patch, MagicMock + +from ching_tech_os.services.bot.rate_limiter import ( + check_and_increment, + record_usage, + cleanup_old_tracking, + _current_hourly_key, + _current_daily_key, +) + + +# ============================================================ +# 輔助函式測試 +# ============================================================ + + +class TestPeriodKeys: + """period_key 格式測試""" + + def test_hourly_key_format(self): + """每小時 key 格式為 YYYY-MM-DD-HH""" + key = _current_hourly_key() + assert len(key) == 13 + parts = key.split("-") + assert len(parts) == 4 + assert len(parts[0]) == 4 + assert len(parts[1]) == 2 + assert len(parts[2]) == 2 + assert len(parts[3]) == 2 + + def test_daily_key_format(self): + """每日 key 格式為 YYYY-MM-DD""" + key = _current_daily_key() + assert len(key) == 10 + parts = key.split("-") + assert len(parts) == 3 + + +# ============================================================ +# 輔助 mock 工廠 +# ============================================================ + + +def _make_conn_mock(hourly_count: int = 0, daily_count: int = 0): + """建立模擬 DB 連線(支援 transaction + fetchrow) + + asyncpg 的 conn.transaction() 回傳一個同步物件,但支援 async with。 + """ + mock_conn = MagicMock() + + # fetchrow 依次回傳 hourly 和 daily 的計數結果 + mock_conn.fetchrow = AsyncMock( + side_effect=[ + {"message_count": hourly_count}, + {"message_count": daily_count}, + ] + ) + mock_conn.execute = AsyncMock() + + # transaction() 是同步呼叫,回傳的物件支援 async with + mock_txn = MagicMock() + mock_txn.__aenter__ = AsyncMock(return_value=mock_txn) + mock_txn.__aexit__ = AsyncMock(return_value=False) + mock_conn.transaction = MagicMock(return_value=mock_txn) + + return mock_conn + + +def _make_get_conn_patch(mock_conn): + """建立 get_connection context manager patch""" + mock_ctx = MagicMock() + mock_ctx.__aenter__ = AsyncMock(return_value=mock_conn) + mock_ctx.__aexit__ = AsyncMock(return_value=False) + return patch( + "ching_tech_os.services.bot.rate_limiter.get_connection", + return_value=mock_ctx, + ) + + +# ============================================================ +# check_and_increment() 測試 +# ============================================================ + + +class TestCheckAndIncrement: + """測試原子性 check_and_increment()""" + + @pytest.mark.asyncio + async def test_rate_limit_disabled_allows_all(self): + """停用 rate limit → 一律通過(但仍記錄用量)""" + mock_conn = _make_conn_mock() + + with ( + patch( + "ching_tech_os.services.bot.rate_limiter.settings" + ) as mock_settings, + _make_get_conn_patch(mock_conn), + ): + mock_settings.bot_rate_limit_enabled = False + allowed, msg = await check_and_increment("uuid-123") + assert allowed is True + assert msg is None + + @pytest.mark.asyncio + async def test_within_limits_allowed(self): + """使用量在限額內(遞增後 6 和 11)→ 通過""" + mock_conn = _make_conn_mock(hourly_count=6, daily_count=11) + + with ( + patch( + "ching_tech_os.services.bot.rate_limiter.settings" + ) as mock_settings, + _make_get_conn_patch(mock_conn), + ): + mock_settings.bot_rate_limit_enabled = True + mock_settings.bot_rate_limit_hourly = 20 + mock_settings.bot_rate_limit_daily = 50 + + allowed, msg = await check_and_increment("uuid-123") + assert allowed is True + assert msg is None + + @pytest.mark.asyncio + async def test_hourly_limit_exceeded(self): + """遞增後超過每小時限額 → 拒絕""" + mock_conn = _make_conn_mock(hourly_count=21, daily_count=30) + + with ( + patch( + "ching_tech_os.services.bot.rate_limiter.settings" + ) as mock_settings, + _make_get_conn_patch(mock_conn), + ): + mock_settings.bot_rate_limit_enabled = True + mock_settings.bot_rate_limit_hourly = 20 + mock_settings.bot_rate_limit_daily = 50 + + allowed, msg = await check_and_increment("uuid-123") + assert allowed is False + assert "每小時" in msg + assert "20" in msg + + @pytest.mark.asyncio + async def test_daily_limit_exceeded(self): + """遞增後超過每日限額 → 拒絕""" + mock_conn = _make_conn_mock(hourly_count=5, daily_count=51) + + with ( + patch( + "ching_tech_os.services.bot.rate_limiter.settings" + ) as mock_settings, + _make_get_conn_patch(mock_conn), + ): + mock_settings.bot_rate_limit_enabled = True + mock_settings.bot_rate_limit_hourly = 20 + mock_settings.bot_rate_limit_daily = 50 + + allowed, msg = await check_and_increment("uuid-123") + assert allowed is False + assert "每日" in msg + assert "50" in msg + + @pytest.mark.asyncio + async def test_first_message_allowed(self): + """第一則訊息(遞增後 hourly=1, daily=1)→ 通過""" + mock_conn = _make_conn_mock(hourly_count=1, daily_count=1) + + with ( + patch( + "ching_tech_os.services.bot.rate_limiter.settings" + ) as mock_settings, + _make_get_conn_patch(mock_conn), + ): + mock_settings.bot_rate_limit_enabled = True + mock_settings.bot_rate_limit_hourly = 20 + mock_settings.bot_rate_limit_daily = 50 + + allowed, msg = await check_and_increment("uuid-123") + assert allowed is True + assert msg is None + + @pytest.mark.asyncio + async def test_db_error_fail_open(self): + """DB 錯誤 → fail-open(允許通過)""" + mock_ctx = MagicMock() + mock_ctx.__aenter__ = AsyncMock(side_effect=Exception("DB error")) + mock_ctx.__aexit__ = AsyncMock(return_value=False) + + with ( + patch( + "ching_tech_os.services.bot.rate_limiter.settings" + ) as mock_settings, + patch( + "ching_tech_os.services.bot.rate_limiter.get_connection", + return_value=mock_ctx, + ), + ): + mock_settings.bot_rate_limit_enabled = True + + allowed, msg = await check_and_increment("uuid-123") + assert allowed is True + assert msg is None + + +# ============================================================ +# record_usage() 測試 +# ============================================================ + + +class TestRecordUsage: + """測試 record_usage()""" + + @pytest.mark.asyncio + async def test_record_executes_upserts(self): + """記錄使用量在交易中執行兩次 UPSERT(hourly + daily)""" + mock_conn = _make_conn_mock() + + with _make_get_conn_patch(mock_conn): + await record_usage("uuid-123") + assert mock_conn.execute.call_count == 2 + + @pytest.mark.asyncio + async def test_record_failure_does_not_raise(self): + """記錄失敗不應拋出例外""" + mock_ctx = MagicMock() + mock_ctx.__aenter__ = AsyncMock(side_effect=Exception("DB error")) + mock_ctx.__aexit__ = AsyncMock(return_value=False) + + with patch( + "ching_tech_os.services.bot.rate_limiter.get_connection", + return_value=mock_ctx, + ): + await record_usage("uuid-123") + + +# ============================================================ +# cleanup_old_tracking() 測試 +# ============================================================ + + +class TestCleanupOldTracking: + """測試 cleanup_old_tracking()""" + + @pytest.mark.asyncio + async def test_cleanup_returns_count(self): + """清理成功回傳刪除筆數""" + mock_conn = MagicMock() + mock_conn.execute = AsyncMock(return_value="DELETE 42") + + with _make_get_conn_patch(mock_conn): + deleted = await cleanup_old_tracking(days=30) + assert deleted == 42 + + @pytest.mark.asyncio + async def test_cleanup_failure_returns_zero(self): + """清理失敗回傳 0""" + mock_ctx = MagicMock() + mock_ctx.__aenter__ = AsyncMock(side_effect=Exception("DB error")) + mock_ctx.__aexit__ = AsyncMock(return_value=False) + + with patch( + "ching_tech_os.services.bot.rate_limiter.get_connection", + return_value=mock_ctx, + ): + deleted = await cleanup_old_tracking() + assert deleted == 0 diff --git a/backend/tests/test_scheduler_service.py b/backend/tests/test_scheduler_service.py index 571d7e55..064718b2 100644 --- a/backend/tests/test_scheduler_service.py +++ b/backend/tests/test_scheduler_service.py @@ -153,11 +153,12 @@ def shutdown(self): scheduler.start_scheduler() assert dummy.running is True - assert len(dummy.jobs) == 7 + assert len(dummy.jobs) == 8 job_ids = {kwargs.get("id") for _, kwargs in dummy.jobs} assert "cleanup_old_messages" in job_ids assert "create_next_month_partitions" in job_ids assert "cleanup_expired_share_links" in job_ids + assert "cleanup_old_bot_tracking" in job_ids assert "file-manager:cleanup_linebot_temp_files" in job_ids assert "file-manager:cleanup_media_temp_folders" in job_ids assert "ai-agent:cleanup_ai_images" in job_ids diff --git a/frontend/js/knowledge-base.js b/frontend/js/knowledge-base.js index ba03047d..5b5a28e7 100644 --- a/frontend/js/knowledge-base.js +++ b/frontend/js/knowledge-base.js @@ -1146,6 +1146,7 @@ const KnowledgeBaseModule = (function() { type: selectedKnowledge.type, category: selectedKnowledge.category, scope: selectedKnowledge.scope || 'global', + is_public: selectedKnowledge.is_public || false, tags: { ...selectedKnowledge.tags }, }; @@ -1221,6 +1222,12 @@ const KnowledgeBaseModule = (function() { ${tags?.projects.map(p => ``).join('')} +