-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
224 lines (182 loc) · 7.51 KB
/
main.py
File metadata and controls
224 lines (182 loc) · 7.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# 文件路径: main.py
# 文件名: main.py
# 创建时间: 2025-12-18 23:30:00
# 修改时间: 2025-12-25 22:00:00
# 作者: xiaobai-dev
# 文件版本: v4.0 (优化: 统一使用 AgentOS + PostgresDb 共享数据库,支持生产级内存/会话/监控)
"""
项目主入口文件
为什么: 作为 FastAPI 应用的启动入口,需要集成 Agno AgentOS、路由、中间件、监控和生命周期管理
做什么: 创建统一的 AgentOS 实例,注册所有 Agent/Team,挂载业务路由,提供健康检查和监控端点
如何做:
- 执行结果: 启动一个完整的生产级 AgentOS 应用,支持多 Agent 协作、会话追踪、Tracing 监控
- 修改内容: 本次优化为官方推荐架构:统一 PostgresDb 实例传入 AgentOS,移除手动 lifespan 覆盖,增强稳定性
"""
import sys
import asyncio
from typing import Optional
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
from fastapi import FastAPI, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.exceptions import RequestValidationError
from fastapi.responses import Response
from redis.asyncio import Redis
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
# Windows 系统下需要特殊的事件循环策略
# 为什么: Windows 默认 Selector 事件循环不支持某些异步特性
# 做什么: 设置 WindowsSelectorEventLoopPolicy
# 如何做:
# - 执行结果: 避免 asyncio 运行时警告
# - 修改内容: 无
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
# ==================== 核心依赖导入 ====================
from core.lifespan import lifespan
from core.state import app_state
from config import settings
from config.redis_db import get_redis
from config.database import get_db
from middleware import (
RequestTrackerMiddleware,
GlobalErrorMiddleware,
validation_exception_handler
)
from logger import system_logger
# 为什么: 时间工具模块已重构,需要从新路径导入常用函数
# 做什么: 导入时间格式化与日志前缀工具
# 如何做:
# - 执行结果: 可直接使用 format_duration、get_log_header
# - 修改内容: 更新导入路径
from utils.time import (
CoreTime,
human_readable_duration as format_duration,
build_log_prefix as get_log_header,
)
# 为什么: 静态资源目录可能动态变化
# 做什么: 导入目录工具函数
from utils.directory_utils import get_static_dir
# 兼容性别名
now = CoreTime.now
# ==================== 路由导入 ====================
# 为什么: 业务功能分散在不同路由模块中,需要统一注册
# 做什么: 导入登录路由和聊天路由
# 如何做:
# - 执行结果: /login 和 /chat 相关接口可用
# - 修改内容: 无
from routers import login_router, chat_router
# ==================== Agno v2.x 核心集成 ====================
# 为什么: 需要统一数据库实例供所有 Agent 和 AgentOS 使用,实现会话/记忆共享
# 做什么: 创建 PostgresDb 实例(生产级,支持 PgVector)
# 如何做:
# - 执行结果: db 对象可直接传入 AgentOS 和 Agent
# - 修改内容: 移除旧的 agent_db 变量,统一使用 db
from agno.db.postgres import PostgresDb
db = PostgresDb(db_url=settings.DATABASE_URL)
# 为什么: 破冰团队是核心业务,需要注册到 AgentOS
# 做什么: 从 agents 包导入 ice_breaker_team(包含 Supervisor + FlirtMaster)
# 如何做:
# - 执行结果: ice_breaker_team 可通过 AgentOS 调用
# - 修改内容: 无
from agents import ice_breaker_team
# 其他教学/研究 Agent(示例,可按需扩展)
from agno.models.openai import OpenAIChat # 或 DeepSeek 等
tutor_agent = Agent(
name="毒舌教学助手",
model=OpenAIChat(id="gpt-4o"), # 根据实际配置调整
instructions="你是'x iaobai'的毒舌教学助手。说话毒舌但有分寸,嘲讽里带着关心。",
db=db, # 统一使用共享 db
add_history_to_context=True,
markdown=True,
)
research_agent = Agent(
name="研究助手",
model=OpenAIChat(id="gpt-4o"),
instructions="你是一个专业的研究助手。",
db=db,
add_history_to_context=True,
markdown=True,
)
# ==================== AgentOS 统一运行时 ====================
# 为什么: AgentOS 是 Agno v2.x 官方推荐的生产入口,提供内置监控、Tracing、Control Plane 支持
# 做什么: 创建 AgentOS 实例,注册所有 Agent 和 Team
# 如何做:
# - 执行结果: 返回完整的 FastAPI app,包含 AgentOS 内置路由(/docs、/chat 等)
# - 修改内容: 传入共享 db,支持生产级功能
from agno.os import AgentOS
agent_os = AgentOS(
agents=[tutor_agent, research_agent],
teams=[ice_breaker_team], # 核心破冰团队
db=db, # 关键:共享数据库,实现记忆/会话统一管理
)
# 获取最终 FastAPI 应用
app = agent_os.get_app()
# ==================== 中间件与异常处理 ====================
# 为什么: 需要全局异常捕获、请求追踪、CORS 支持
# 做什么: 添加中间件和自定义异常处理器
# 如何做:
# - 执行结果: 所有请求统一处理,提升可观测性
# - 修改内容: 无
app.add_middleware(GlobalErrorMiddleware)
app.add_middleware(RequestTrackerMiddleware)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.server.cors_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.exception_handler(RequestValidationError)(validation_exception_handler)
# ==================== 静态文件与业务路由挂载 ====================
# 为什么: 前端静态资源需要服务
# 做什么: 挂载 static 目录
# 如何做:
# - 执行结果: /static 路径可用
# - 修改内容: 无
app.mount("/static", StaticFiles(directory=get_static_dir()), name="static")
# 业务路由注册
app.include_router(login_router)
app.include_router(chat_router)
# ==================== 监控端点 ====================
@app.get("/prometheus_metrics")
async def prometheus_metrics():
"""Prometheus 指标导出端点"""
return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
# ==================== 系统端点 ====================
@app.get("/", tags=["系统"])
async def root():
"""根路由 - 系统状态"""
runtime = format_duration(int((now() - app_state.startup_time).total_seconds())) if app_state.startup_time else "0秒"
return {
"service": settings.project_name,
"version": settings.project_version,
"status": "running",
"runtime": runtime
}
@app.get("/health", tags=["系统"])
async def health_check(
redis: Redis = Depends(get_redis),
db_session: AsyncSession = Depends(get_db)
):
"""健康检查(Redis + PostgreSQL)"""
try:
await redis.ping()
await db_session.execute(text("SELECT 1"))
return {"status": "healthy", "components": {"redis": "ok", "postgres": "ok"}}
except Exception as e:
system_logger.error(f"Health check failed: {e}")
return {"status": "unhealthy", "error": str(e)}
# ==================== 启动入口 ====================
if __name__ == "__main__":
import uvicorn
app_target = "main:app"
uvicorn.run(
app_target,
host=settings.server.host,
port=settings.server.port,
reload=settings.server.reload,
workers=settings.server.workers if not settings.server.reload else 1,
log_level="info",
# AgentOS 已内置 lifespan,无需手动覆盖
)