Skip to content

feat(flexlb): batch scheduling infrastructure, strategy refactor, and error model hardening#1138

Open
wzy-99 wants to merge 12 commits into
feat/dsv4_on_devfrom
feat/flexlb_base
Open

feat(flexlb): batch scheduling infrastructure, strategy refactor, and error model hardening#1138
wzy-99 wants to merge 12 commits into
feat/dsv4_on_devfrom
feat/flexlb_base

Conversation

@wzy-99

@wzy-99 wzy-99 commented Jun 24, 2026

Copy link
Copy Markdown
Collaborator

Summary

This PR introduces FlexLB batch scheduling infrastructure with SLO-based admission control, strategy refactoring, and error model hardening.

Key Changes

Batch Scheduling Infrastructure

  • Batch scheduling with SLO-based admission control
  • FixedWindowBatcherAlgorithm with configurable batch window and SLO budgets
  • BatcherContext with TOCTOU-safe dropHead() and sortedItems() caching
  • FlexlbBatchScheduler with inflight eviction and cancel race protection

Strategy Refactor

  • Extract BatchItem as standalone class to decouple scheduler
  • Isolate WorkerEndpoint state (volatile WorkerStatus for cross-thread visibility)
  • WorkerEndpoint as sole Worker abstraction

Error Model Hardening (C++ engine)

  • Error reporting (MALLOC_FAILED) in GenerateStream lifecycle methods
  • Request timeout detection (checkTimeoutWithoutLock) in GenerateStream
  • ResponseBuffer queue capacity limit (kMaxQueueSize=1000) to prevent OOM
  • Error-bearing unit cleanup in FIFOScheduler waiting queue
  • Pickle backward compatibility for RuntimeConfig/PDSepConfig (size >= N)
  • Pass engine request_id through to API response aux_info

Review Fixes (Round 3)

  • GrpcWorkerStatusRunner: consecutive failure counter (max 3) before marking worker dead
  • worker_status.py: support str role values from JSON config path
  • ConfigService: try-catch for FLEXLB_CONFIG JSON parse fallback
  • FlexlbConfig: NumberFormatException handling
  • BatcherContext: TOCTOU fix with sortedItems caching
  • FixedWindowBatcherAlgorithm: force-dispatch to avoid busy-wait dead loop
  • WorkerEndpoint: volatile WorkerStatus

Test Coverage

  • Unit tests: PrefillEndpoint, DefaultBatchDispatcher, InflightEvictor, PrefillTimePredictor

Previous Reviews

  • Round 1 (16:48 6/23): P0/3 + P1/10 — addressed in 2640241
  • Round 2 (02:11 6/24): P1/10 — addressed
  • Round 3 (03:51 6/24): P0/2 + P1/15 — remaining issues fixed in this PR

@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1138

Status: BLOCKING

Summary: P0/1 · P1/0 · P2/1 · P3/0

Blocking Issues

P0

  • GrpcWorkerStatusRunner 缺少 AtomicLong import,编译失败 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/sync/runner/GrpcWorkerStatusRunner.java:43
    • 建议:在 import 区域添加 import java.util.concurrent.atomic.AtomicLong;(与 java.util.Map 等 JDK import 放在一起)。

Non-blocking Suggestions

P2

  • getattr(RoleType, role) 大小写敏感且异常类型与契约不一致 @ rtp_llm/server/worker_status.py:81
    • 建议:改为 getattr(RoleType, role.upper(), None) 并对 None 结果抛出明确的 ValueError,或用 try/except AttributeError 包装并 raise ValueError(...) from e。

Checklist Violations (3 fail / 56 total)

General Principles Checklist

  • [6.1] Architecture — 错误语义:fail-fast/retry/fallback/silent 行为显式 → issue getattr(RoleType, role) 大小写敏感且异常类型与契约不一致
    worker_status.py 中 getattr(RoleType, role) 对无效字符串抛出 AttributeError 而非 ValueError,错误类型与 else 分支不一致。虽然 pydantic 会包装为 ValidationError,但错误消息不清晰。
  • [6.1] Tests — 新逻辑有聚焦单测 + 相关集成/smoke 测试 → checklist-only
    连续失败计数器逻辑是关键行为变更(从单次失败判死改为 3 次连续失败),但 GrpcWorkerStatusCheckRunnerTest 仅覆盖成功路径,缺少失败容忍场景。Python 侧 validate_role str 分支在 schedule_meta_test.py 中已有覆盖。不升级为 issue,因改动较小且 GrpcWorkerStatusRunner 通常在集成层面验证。

Python Static-First Checklist

  • [P.A] 静态结构与类型纪律 — 禁止 getattr/setattr literal 访问 → issue getattr(RoleType, role) 大小写敏感且异常类型与契约不一致
    worker_status.py:81 使用 getattr(RoleType, role) 做枚举名称动态查找,属于 getattr 合理使用场景(动态变量 key),但对非法字符串抛出 AttributeError 而非 ValueError 的行为不理想。已在 issues 中报告。

Strengths

  • 连续失败计数器(consecutiveFailures + MAX_CONSECUTIVE_FAILURES=3)是合理的容错设计,避免单次 gRPC 超时或网络抖动误判 worker 死亡
  • include 路径从 model_rpc/ 修正为 cache/ 与 BUILD 依赖 recent_cache_key_window 同步修改,保持头文件位置与构建依赖一致
  • 成功响应后重置 consecutiveFailures.set(0) 的位置正确——放在 handleStatusResponse 开头,确保只有真正收到有效响应才重置

@wzy-99 wzy-99 force-pushed the feat/flexlb_base branch 2 times, most recently from 5dff119 to e5fa216 Compare June 24, 2026 11:21
@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1138

Status: BLOCKING

Summary: P0/1 · P1/0 · P2/1 · P3/1

Blocking Issues

P0

  • consecutiveFailures 计数器因 Runner 实例每次新建而永远无法累积到阈值 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/sync/runner/GrpcWorkerStatusRunner.java:44
    • 建议:将 consecutiveFailures 计数器移到 WorkerStatus 对象上(生命周期跨同步周期),或在 EngineSyncRunner 中维护 Map<String, AtomicLong> 逐 worker 累积失败次数,构造 Runner 时传入。

Non-blocking Suggestions

P2

  • gRPC 异常路径重复上报两次错误指标 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/sync/runner/GrpcWorkerStatusRunner.java:91
    • 建议:在 launchGrpcStatusCheck 返回 null 后跳过 handleStatusResponse 中的 RESPONSE_NULL 上报,或在 handleStatusResponse 中区分「gRPC 异常导致的 null」和「正常返回但 body 为空」两种情况。

P3

  • Enum 查找建议使用 item 访问而非 getattr @ rtp_llm/server/worker_status.py:82
    • 建议:若 RoleType 是标准 Enum,改用 RoleType[role.upper()] 并将 except 改为 KeyError;若是 pybind11 枚举,添加注释说明使用 getattr 是有意为之。

Checklist Violations (7 fail / 69 total)

General Principles Checklist

  • [6.1] Architecture — 状态不变量:创建/更新/失败/重试/回滚路径有效 → issue consecutiveFailures 计数器因 Runner 实例每次新建而永远无法累积到阈值
    GrpcWorkerStatusRunner.consecutiveFailures 是实例字段,但 EngineSyncRunner 每个同步周期新建 Runner 实例(line 139),导致计数器永远从 0 开始,永远达不到阈值 3。Worker liveness 检测失效——持续 gRPC 失败时 worker 永远不会被标记为 dead。
  • [6.1] Architecture — 错误语义:fail-fast/retry/fallback/silent 行为显式 → issue gRPC 异常路径重复上报两次错误指标
    gRPC 异常时 handleException 上报错误指标后返回 null,handleStatusResponse(null) 再次上报 RESPONSE_NULL,同一失败产生两条独立指标,干扰监控准确性。
  • [6.1] Architecture — 兼容性:公开 API/持久数据/配置/环境迁移安全 → checklist-only
    return_request_id 字段从 GenerateConfig/AuxInfo/GenerateOutputs 移除。该功能默认 false、commit 标注为 revert、未正式发布,pydantic BaseModel 默认忽略 extra fields,实际兼容性风险极低,不升级为 issue。
  • [6.1] Tests — 新逻辑有聚焦单测 + 相关集成/smoke 测试 → checklist-only
    GrpcWorkerStatusCheckRunnerTest 覆盖了单次调用场景,但未测试连续失败累积。由于 Runner 是每次新建实例,现有测试框架无法覆盖跨周期累积——这是 P0 bug 的测试缺口,修复对象生命周期后需补充跨周期累积测试。
  • [6.1] Quality — Mega-PR 已拆分为独立变更 → checklist-only
    PR 混合了三个独立主题:return_request_id 移除、FlexLB 重构、C++ API 适配。各主题通过独立 commit 分隔,但理想情况下应拆为独立 PR。当前 commit 粒度清晰,不阻塞合入。
  • [6.1] Quality — PR description 说明动机与设计 → checklist-only
    从 commit 序列可推断 PR 动机,但 return_request_id 移除缺少显式迁移说明。因该功能默认 false 且未正式发布(commit 标注 revert),实际影响有限,不升级为 issue。

Python Static-First Checklist

  • [P.A] 静态结构与类型纪律 — 禁止 getattr/setattr literal 访问 → issue Enum 查找建议使用 item 访问而非 getattr
    worker_status.py:82 使用 getattr(RoleType, role.upper())。若 RoleType 是 pybind11 枚举则不支持 getitem,getattr 可接受;若是标准 Enum 应改用 RoleType[role.upper()]。当前因 except AttributeError 兜底不会出错,属 P3 风格改进。

Strengths

  • BatcherContext 的 dispatch/dropHead 从 queue.remove(item) 改为 remove(item),修复了 sortedCacheDirty 缓存失效标记未更新导致 sortedItems() 返回过期缓存的正确性缺陷
  • return_request_id 透传功能清理彻底:跨 9 个文件移除所有引用链,包括 GenerateConfig、ServerConfig、AuxInfo、GenerateOutputs、frontend_worker、openai_endpoint、custom_renderer、model_rpc_client、server_group_args
  • C++ cuda13 编译修复系统性解决 absl::string_view 隐式转换、include 路径(RecentCacheKeyWindow.h → cache/)和 BUILD 依赖问题
  • FlexLB 测试从旧 API (TaskStateEnum/waitingTaskList/localTaskMap) 完整迁移到新 API (TaskPhase/runningTaskList),新增空列表、null 列表、队列满等边界覆盖

@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1138

Status: BLOCKING

Summary: P0/1 · P1/0 · P2/2 · P3/0

Blocking Issues

P0

  • GrpcWorkerStatusRunner 的 consecutiveFailures 计数器因实例每周期重建而永远无法累积 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/sync/runner/GrpcWorkerStatusRunner.java:44
    • 建议:将 consecutiveFailures 计数器移至 WorkerStatus 对象(已通过 getOrCreateWorkerStatus 跨周期复用),或将 GrpcWorkerStatusRunner 改为每个 worker 缓存复用同一实例。

Non-blocking Suggestions

P2

  • envValueIsFalse/envValueIsTrue 及相关环境变量检查函数在两个文件中重复定义 @ rtp_llm/cpp/model_rpc/PrefillRpcServer.cc:66
    • 建议:将 envValueIsTrue/envValueIsFalse 和 prefillTraceLogEnabled/prefillTheoryHitLogEnabled 提取到公共 utility header(如 rtp_llm/cpp/utils/EnvUtils.h),两处统一引用。
  • GrpcWorkerStatusRunner 连续失败阈值 MAX_CONSECUTIVE_FAILURES=3 硬编码且偏低 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/sync/runner/GrpcWorkerStatusRunner.java:45
    • 建议:通过 FlexlbConfig 使阈值可配置,默认值可适当提高以适应瞬时网络抖动;或至少在常量旁注释选择 3 的依据。

Checklist Violations (9 fail / 88 total)

General Principles Checklist

  • [6.1] Software Engineering — DRY:重复非平凡逻辑被抽取或显式复用 → issue envValueIsFalse/envValueIsTrue 及相关环境变量检查函数在两个文件中重复定义
    envValueIsFalse/envValueIsTrue 及 prefillTraceLogEnabled/prefillTheoryHitLogEnabled 在 PrefillRpcServer.cc:60-103 和 PrefillCacheHitMetricsReporter.cc:22-60 完全重复定义,共 4 个函数。
  • [6.1] Architecture — 状态不变量:创建/更新/失败/重试/回滚路径有效 → issue GrpcWorkerStatusRunner 的 consecutiveFailures 计数器因实例每周期重建而永远无法累积
    GrpcWorkerStatusRunner 的 consecutiveFailures 是实例字段,但 EngineSyncRunner 每周期创建新实例(:139-142),计数器无法累积,setAlive(false) 永远不执行。
  • [6.1] Architecture — 兼容性:公开 API/持久数据/配置/环境迁移安全 → checklist-only
    return_request_id/enable_return_request_id 字段直接删除无 deprecation warning。该特性默认 false 且为内部调试用,移除对大多数部署无影响,风险较低,不升级为 issue。
  • [6.1] Quality — 逻辑变更未混入无关格式化 → checklist-only
    PR 混合多个独立特性方向但各方向内部逻辑自洽,不是格式化噪声混入。在 feature branch 上下文中可接受,不升级为 issue。
  • [6.1] Quality — Mega-PR 已拆分为独立变更 → checklist-only
    28 个文件跨 Python/C++/Java 三语言含 5+ 个独立方向。建议拆分,但在 feature branch 合并场景下各方向存在关联,可接受,不升级为 issue。

RTP-LLM Checklist

  • [I] 代码质量 — 同一功能用统一工具函数 → issue envValueIsFalse/envValueIsTrue 及相关环境变量检查函数在两个文件中重复定义
    envValueIsFalse/envValueIsTrue 及 prefillTraceLogEnabled/prefillTheoryHitLogEnabled 在 PrefillRpcServer.cc 和 PrefillCacheHitMetricsReporter.cc 完全重复定义(共 4 个函数)。
  • [A] 兼容性与配置 — 环境变量/命令行参数/proto 重命名有向后兼容 fallback → checklist-only
    --enable_return_request_id CLI 参数及对应 ServerConfig 字段直接删除。该特性默认 false 且为内部调试用,移除对大多数部署无影响,与兼容性 checklist 同因,不升级为 issue。
  • [B] 正确性与逻辑 — 逻辑错误、off-by-one、null/zero 检查 → issue GrpcWorkerStatusRunner 的 consecutiveFailures 计数器因实例每周期重建而永远无法累积
    GrpcWorkerStatusRunner.consecutiveFailures 是实例字段,EngineSyncRunner 每周期 new 实例(:139-142),计数器永远从 0 开始,setAlive(false) 永远不执行。
  • [B] 正确性与逻辑 — 状态标志有完整 set/reset 生命周期 → issue GrpcWorkerStatusRunner 的 consecutiveFailures 计数器因实例每周期重建而永远无法累积
    consecutiveFailures 的 set(0) 和 incrementAndGet 无法跨周期配合,因 EngineSyncRunner 每周期重建 GrpcWorkerStatusRunner 实例。

Strengths

  • return_request_id 全链路清理彻底:GenerateConfig、ServerConfig、AuxInfo、GenerateOutputs、前端、OpenAI endpoint、custom_renderer、model_rpc_client 所有生产者和消费者均同步移除,无遗留引用
  • BatcherContext.dispatch/dropHead 从 queue.remove() 改为 this.remove() 正确修复了 sortedCacheDirty 标志未更新的缓存失效 bug
  • FIFOScheduler group isolation 测试覆盖全面,含 size2/3/4、group+single 混合、多 group 隔离、交错入队等边界场景

@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1138

Status: LGTM

Summary: P0/0 · P1/0 · P2/1 · P3/1

lgtm ready to ci

Non-blocking Suggestions

P2

  • FlexLB 默认 batch 算法从 slo_budget 改为 fixed_window,影响现有部署 @ rtp_llm/flexlb/flexlb-common/src/main/java/org/flexlb/config/FlexlbConfig.java:364
    • 建议:在 PR description 或 release notes 中明确标注此默认值变更及其影响,确认所有现有部署已评估或显式设置了 flexlbBatchAlgorithm。如果 fixed_window 已经过充分验证且确实更适合默认场景,可保留此变更。

P3

  • envValueIsFalse 在 PrefillRpcServer.cc 和 PrefillCacheHitMetricsReporter.cc 中重复定义 @ rtp_llm/cpp/model_rpc/PrefillRpcServer.cc:63
    • 建议:将 envValueIsTrue/envValueIsFalse 提取到共享 utility header(如 rtp_llm/cpp/utils/EnvUtils.h),两个源文件统一引用。

Checklist Violations (7 fail / 103 total)

General Principles Checklist

  • [6.1] Software Engineering — DRY:重复非平凡逻辑被抽取或显式复用 → issue envValueIsFalse 在 PrefillRpcServer.cc 和 PrefillCacheHitMetricsReporter.cc 中重复定义
    envValueIsFalse/envValueIsTrue 在 PrefillRpcServer.cc(文件作用域)和 PrefillCacheHitMetricsReporter.cc(匿名命名空间)中各有完整独立实现,逻辑完全相同,应提取到共享工具头文件。
  • [6.1] Architecture — 兼容性:公开 API/持久数据/配置/环境迁移安全 → issue FlexLB 默认 batch 算法从 slo_budget 改为 fixed_window,影响现有部署
    FlexlbConfig.flexlbBatchAlgorithm 默认值从 slo_budget 变更为 fixed_window,未显式配置的现有部署将静默切换批处理行为,丧失 SLO deadline tracking 和 EMA 估计。
  • [6.1] Tests — 被删除测试有等价替代覆盖 → checklist-only
    FlexlbBatchSchedulerTest 删除了 processQueue_dispatches_when_batch_size_reached 测试,slo_budget 仍作为 WorkerBatcher 的 fallback 算法存在(第 49 行)。WorkerStatusTest 和 ConfigServiceTest 删除的测试已由 PrefillResourceMeasureTest 等替代,但 slo_budget batch size 触发路径覆盖有所降低。影响有限因 slo_budget 已非默认算法。

RTP-LLM Checklist

  • [A] 兼容性与配置 — 环境变量/命令行参数/proto 重命名有向后兼容 fallback → checklist-only
    --enable_return_request_id CLI 参数和 ENABLE_RETURN_REQUEST_ID 环境变量被完全移除,无 deprecation warning 或 fallback。使用此参数的部署脚本将因 argparse unrecognized argument 启动失败。影响有限(默认 False 且标注为 internal debugging only),但严格来说缺少向后兼容 fallback。
  • [A] 兼容性与配置 — 默认值变更已评估对现有用户影响 → issue FlexLB 默认 batch 算法从 slo_budget 改为 fixed_window,影响现有部署
    FlexlbConfig.flexlbBatchAlgorithm 默认从 slo_budget → fixed_window。环境变量可覆盖,但未在 PR 中标注影响评估。
  • [H] 测试与 CI — 测试覆盖充分:大重构等价覆盖,新功能端到端测试 → checklist-only
    FIFOScheduler group 隔离新增多个测试覆盖全面,Java 测试重写适配新 API。但 slo_budget batcher 的 batch size 触发测试(processQueue_dispatches_when_batch_size_reached)被删除,slo_budget 仍作为 WorkerBatcher 的 fallback 算法存在,该路径覆盖有所降低。影响有限因 slo_budget 已非默认算法。
  • [I] 代码质量 — 同一功能用统一工具函数 → issue envValueIsFalse 在 PrefillRpcServer.cc 和 PrefillCacheHitMetricsReporter.cc 中重复定义
    envValueIsFalse/envValueIsTrue 在 PrefillRpcServer.cc(文件作用域)和 PrefillCacheHitMetricsReporter.cc(匿名命名空间)中各有完整独立实现,逻辑完全相同。应提取到共享工具头文件。

Strengths

  • PrefillRpcServer EnqueueGroup 中 slots_ptr lambda 捕获修复:正确捕获 shared_ptr<vector>,防止 outer lambda 销毁后 inner lambda 中的裸指针悬空(use-after-free 修复)
  • BatcherContext.remove() 修复:将 queue.remove(item) 改为 remove(item),确保移除元素时正确设置 sortedCacheDirty = true,避免排序缓存返回已删除的过时数据
  • GrpcWorkerStatusRunner 引入连续失败计数器(AtomicLong),要求连续 3 次 gRPC 失败才将 worker 标记为 dead,避免单次网络抖动导致节点误下线
  • FIFOScheduler group isolation 新增全面测试用例,覆盖 group size 2/3/4、group 与 single 不混排、多 group 隔离、interleaved 等边界场景
  • GenerateStream.cc 将错误的 result.status().message() 改为正确的 result.message(),并显式构造 std::string 避免 string_view 拼接歧义
  • return_request_id 特性清理彻底:config、server args、frontend、openai endpoint、renderer、datatypes 全链路联动删除,无悬挂引用

@github-actions

Copy link
Copy Markdown

CI dispatcher could not find a native build run for HEAD SHA c8af443e.

This can happen if the PR was opened before the CI architecture change, or if the original run was deleted.

To fix: push any commit (even empty: git commit --allow-empty -m "trigger CI" && git push) to create a native build run, then re-approve or post lgtm ready to ci.

@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1138

Status: BLOCKING

Summary: P0/0 · P1/3 · P2/6 · P3/2

Blocking Issues

P1

  • isNonBatchPath 逻辑遗漏 AUTO 模式下不符合 batch 条件的请求 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/strategy/CostBasedPrefillStrategy.java:274
    • 建议:CostBasedPrefillStrategy 只能被非 batch 路径到达(batch 路径走 FlexlbBatchScheduler),因此 isNonBatchPath 应改为 return ctx.getScheduleMode() != ScheduleModeEnum.BATCH; 或直接去掉条件始终 commit。当前逻辑在默认配置下(batchEnabled=true, AUTO)会导致 maxNewTokens<=1 等不合格 batch 请求无 inflight 追踪,cost-based scoring 失准引起热点。
  • GrpcWorkerStatusRunner.getRole().name() 可能 NPE @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/sync/runner/GrpcWorkerStatusRunner.java:111
    • 建议:改为防空: String roleName = newWorkerStatus.getRole() != null ? newWorkerStatus.getRole().name() : "UNKNOWN"; 或在 RoleTypeProtoConverter 中为 default case 返回一个 fallback 枚举值而非 null。NPE 会导致该 worker 的状态同步线程中断,后续所有健康检查失效。
  • RoleAddr.validate_role 不再接受 str,但 serialize_role 仍输出 str,JSON 往返反序列化会失败 @ rtp_llm/config/generate_config.py:53
    • 建议:validate_role 应同时接受 str,与 ServerStatus.validate_role (worker_status.py:74-86) 保持一致: 增加 elif isinstance(v, str): return getattr(RoleType, v.upper())。或将 serialize_role 改为输出 int (role.value),使序列化格式与验证器匹配。

Non-blocking Suggestions

P2

  • 非批次路径 inflight 无上限,可能在高 QPS 下放大 realWaitTimeMs() 迭代开销 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/strategy/CostBasedPrefillStrategy.java:253
    • 建议:考虑为非批次路径添加类似 maxInflightBatchesPerWorker 的准入控制,或在 doSelect 内缓存 realWaitTimeMs() 结果避免同一轮决策中对同一 endpoint 重复迭代(当前单轮最多调用 5 次)。短期内 TTL=5min 兜底可防止无限增长,但在 engine crash + 高 QPS 场景下 5 分钟内可能积累大量 entry。
  • realWaitTimeMs() 在单次 doSelect 中对同一 endpoint 重复调用多达 5 次 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/strategy/CostBasedPrefillStrategy.java:142
    • 建议:在 doSelect 开头对每个 endpoint 计算一次 waitMs 并缓存到局部 Map<PrefillEndpoint, Long>,后续引用缓存值。这样每个 endpoint 只遍历一次 inflightBatches,减少 N5 → N1 次 map 迭代。注意:此为既有模式,本 PR 通过增加非批次 inflight 条目使其影响略有放大。
  • CostBasedPrefillStrategy 测试未覆盖非 batch 路径 inflight 追踪 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/strategy/CostBasedPrefillStrategy.java:253
    • 建议:添加测试用例:设置 scheduleMode=DIRECT 或 flexlbBatchEnabled=false,select 后验证 ep.realWaitTimeMs() > 0 / inflightBatches 包含 requestId 条目;并验证 rollBack 后条目被清除。
  • calibrate 中非 batch 请求 warn 日志在 isNonBatchPath 缺陷下会频繁误报 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/endpoint/PrefillEndpoint.java:147
    • 建议:修复 isNonBatchPath 后此问题自然消失。如果保持现状,应将 warn 降为 debug 以避免日志刷屏(因为 non-batch 请求未被 commit 到 inflight map 是一个已知的'设计'行为)。
  • is_backend_service_ready 硬编码返回 True,前端健康检查失效 @ rtp_llm/server/backend_rpc_server_visitor.py:549
    • 建议:即便 FlexLB 接管路由,健康检查仍应验证 FlexLB master 可达性,否则所有请求在运行时才发现路由失败。建议至少保留一个轻量级的 FlexLB 连通性检查,或在注释中说明上游 K8s probe 如何覆盖此场景。同时移除注释掉的旧代码,避免 dead code。
  • ConfigService 解析 FLEXLB_CONFIG 失败时静默回退到默认配置 @ rtp_llm/flexlb/flexlb-common/src/main/java/org/flexlb/config/ConfigService.java:37
    • 建议:配置解析失败回退到默认值可能导致生产环境用错误配置静默运行(如 TTL、batch size 等全用默认值)。建议改为 fail-fast 抛出异常,或至少上报一个告警 metric。

P3

  • psutil 从延迟导入改为模块顶层导入,影响 45 个导入者的启动速度 @ rtp_llm/utils/util.py:12
    • 建议:保持 psutil 的延迟导入(在 stop_server 内部 import),或移至仅 stop_server 使用的独立模块,避免所有导入 util.py 的模块无条件加载这个 C 扩展。
  • psutil 顶层 import 无 ImportError 保护 @ rtp_llm/utils/util.py:13
    • 建议:psutil 是必需依赖且已列入 requirements,顶层 import 是合理的模式改进。无需改动,仅注意确保 psutil 在所有测试/生产环境的 requirements 中。

Checklist Violations (1 fail / 56 total)

General Principles Checklist

  • [6.1] Tests — 新逻辑有聚焦单测 + 相关集成/smoke 测试 → issue CostBasedPrefillStrategy 测试未覆盖非 batch 路径 inflight 追踪
    isNonBatchPath 在默认配置下返回 false,导致 CostBasedPrefillStrategyTest 从未触发 commitBatch 路径。非 batch inflight 追踪的核心新逻辑缺乏有效测试覆盖。

Strengths

  • 将 inflight TTL 从 1 小时降至 5 分钟,显著减少 engine crash 后 stale entry 对 realWaitTimeMs 估算的扭曲时间窗口
  • endpointRegistry.evictExpiredAll() 独立调度,解耦了 BatchScheduler 和 EndpointRegistry 的生命周期职责
  • 非批次路径正确实现了 inflight 预留(commitBatch)+ 回滚(releaseBatch),使 load-aware scoring 对所有路由路径生效
  • calibrate() 中对 batchId<0 的非批次请求立即清理 inflight,避免等到 TTL 才释放
  • TTL eviction 从 FlexlbBatchScheduler 解耦到 EndpointRegistry 自身的 @scheduled 方法,职责更清晰
  • 非 batch 请求的 inflight 清理逻辑(calibrate Phase 1)设计思路正确:按 requestId 作为 key 直接移除
  • rollBack 方法从空实现改为实际释放 inflight 预留,修复了路由失败时的 inflight 泄漏
  • TTL 从 1h 降低到 5min 更合理地平衡了安全网延迟与 realWaitTimeMs 准确度

…ng, and PrefillRpcServer metrics

Combined from:
- cc55836 feat: sync flexlb batch async fetch
- b422c6c feat(flexlb): batch scheduling, strategy refactor, error model hardening, and test coverage
- 89adee8 feat(PrefillRpcServer): enhance thread pool management with worker lambda pool and metrics
- 909cfe5 feat(util): add requests library import for HTTP functionality
@wzy-99 wzy-99 force-pushed the feat/flexlb_base branch from 5c9f37a to 63e65ed Compare June 26, 2026 03:32
@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1138

Status: BLOCKING

Summary: P0/0 · P1/11 · P2/16 · P3/2

Blocking Issues

P1

  • setPrefillCoefficients 解析中途失败导致系数半更新 @ rtp_llm/flexlb/flexlb-common/src/main/java/org/flexlb/config/FlexlbConfig.java:314
    • 建议:先解析到临时变量,全部成功后再赋值。parts.length<3 时 log.warn。
  • enqueue() 硬编码 group_id=-1,丢弃 stream 的 group 语义 @ rtp_llm/cpp/engine_base/schedulers/FIFOScheduler.cc:116
    • 建议:enqueue() 应读取 stream->groupId(),或对 isGroup() stream 拒绝入队并要求使用 enqueueGroup()。
  • admitWaitingUnits 清除 errored unit 时未 finish 组内无错 stream,导致永久挂起 @ rtp_llm/cpp/engine_base/schedulers/FIFOScheduler.cc:217
    • 建议:erase unit 前对所有 stream 调用 reportError + finish()。
  • master_client_test.py 使用旧 HTTP 签名,与新 gRPC API 不匹配 @ rtp_llm/server/test/master_client_test.py:46
    • 建议:重写 _CaptureMasterClient 匹配新 gRPC 签名,修复所有断言。
  • RpcCpuTpBroadcaster 使用 DP 过滤的地址列表,dp_size>1 时 CHECK 必崩 @ rtp_llm/cpp/normal_engine/NormalExecutor.cc:115
    • 建议:传入 all_worker_grpc_addrs(本 PR 新增字段,world_size 大小)而非 worker_grpc_addrs。
  • getEngineScheduleInfo 持 shared_lock 却写入 entry.task_info.phase,数据竞争 @ rtp_llm/cpp/model_rpc/RpcServerRuntimeMeta.h:33
    • 建议:拷贝到局部变量再修改:auto copy = entry.task_info; copy.phase = ...; push_back(copy);
  • pushTask 后 use-after-move,fallback 线程拿到空 lambda @ rtp_llm/cpp/model_rpc/PrefillRpcServer.cc:1853
    • 建议:先传拷贝给 pushTask:auto err = pool->pushTask(finish_lambda); if (err) std::thread(std::move(finish_lambda)).detach();
  • activelyNotifyParticipants 通知条件反转,远程节点永远收不到 master 切换通知 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/consistency/ZookeeperMasterElectService.java:371
    • 建议:改为 !localIp.equals(participant.getId()),与注释 'Only notify non-master participants' 一致。
  • waitForLeadershipTransfer while(true) 无退出上限,ZK 分区时 shutdown 永久阻塞 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/consistency/ZookeeperMasterElectService.java:216
    • 建议:增加最大等待次数(如 30 次 = 30 秒),超过后 warn + 强制退出。
  • ResourceMeasure 接口重构导致 RandomStrategy 资源检查失效 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/resource/ResourceMeasure.java:22
    • 建议:DecodeResourceMeasure/PrefillResourceMeasure 需 @OverRide isResourceAvailable(WorkerEndpoint),内部 instanceof 委托到类型化方法。
  • RoleAddr validator 删除 string 支持但 serializer 仍输出 string @ rtp_llm/config/generate_config.py:54
    • 建议:validator 应同时支持 str:if isinstance(v, str): return RoleType[v]。或 serializer 改为输出 int。

Non-blocking Suggestions

P2

  • getParsedSloBuckets 无同步保护、返回可变 List、静默吞异常 @ rtp_llm/flexlb/flexlb-common/src/main/java/org/flexlb/config/FlexlbConfig.java:467
    • 建议:setCostSloBuckets 中 eager init;返回 unmodifiableList;catch 中 log.warn。
  • WorkerStatus.updateFromResponse 混用 AtomicLong 和非 volatile 字段无同步 @ rtp_llm/flexlb/flexlb-common/src/main/java/org/flexlb/dao/master/WorkerStatus.java:53
    • 建议:updateFromResponse 内加 lock,或将高频读取字段改为 volatile。
  • applyTrafficPolicyOverride 缺 try-catch,异常配置导致启动崩溃 @ rtp_llm/flexlb/flexlb-common/src/main/java/org/flexlb/config/ConfigService.java:123
    • 建议:仿 FLEXLB_CONFIG 路径加 try-catch,解析失败时 log.error 并跳过覆盖。
  • canAdmitUnit 对 group unit 完全跳过 token 预算检查 @ rtp_llm/cpp/engine_base/schedulers/FIFOScheduler.cc:200
    • 建议:对 group unit 也应检查 token 预算,或添加注释说明此为有意行为并记录 WARNING。
  • RuntimeConfig/PDSepConfig pickle 不向后兼容 @ rtp_llm/cpp/pybind/ConfigInit.cc:1305
    • 建议:接受旧长度(13/20),新字段可选读取并使用默认值。参考 ProfilingDebugLoggingConfig 的做法。
  • ResponseBufferWriter::Write 队列满时静默丢弃最旧消息 @ rtp_llm/cpp/model_rpc/ResponseBuffer.cc:122
    • 建议:首次丢弃时 log WARNING(限频),或设置 error_status 通知消费端。
  • countStreams() 每次遍历所有 unit,schedule() 热路径调用 8 次 @ rtp_llm/cpp/engine_base/schedulers/FIFOScheduler.cc:169
    • 建议:维护增量计数器(enqueue 时 +=, erase 时 -=),或在 schedule() 开头缓存一次。
  • RpcCpuTpBroadcaster inbox 无大小上限 @ rtp_llm/cpp/distribute/RpcCpuTpBroadcaster.cc:281
    • 建议:添加 inbox_ 大小上限(如 tp_size * 128),超限时 reject 并 LOG_WARNING。
  • stopAsyncResponseWorkers 超时后 force reset count 可能下溢 @ rtp_llm/cpp/model_rpc/PrefillRpcServer.cc:783
    • 建议:用 atomic + stop flag,finishAsyncResponseWorker 中检查 stop flag。
  • ExpirationCleaner 移除 worker 后未清理 EndpointRegistry @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/sync/schedule/ExpirationCleaner.java:49
    • 建议:移除 worker 后通知 EndpointRegistry 清理对应 endpoint(close batcher + 移除 map entry)。
  • ensureEndpoint 异常中断整个 sync 轮次 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/sync/runner/EngineSyncRunner.java:233
    • 建议:改为 Logger.warn + continue,只跳过该 worker 不中断循环。
  • LBStatusConsistencyService 静态线程池未关闭 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/consistency/LBStatusConsistencyService.java:27
    • 建议:在 destroy() 中增加 SCHEDULED_EXECUTOR_SERVICE.shutdownNow()。
  • GenerateConfig 删除 force_batch 字段可能破坏外部 JSON 调用方 @ rtp_llm/config/generate_config.py:175
    • 建议:确认无外部消费者,或添加向后兼容别名 / model_config extra='ignore'。
  • applyEnvironmentOverrides 直接 field.set() 绕过自定义 setter @ rtp_llm/flexlb/flexlb-common/src/main/java/org/flexlb/config/ConfigService.java:89
    • 建议:对存在自定义 setter 的字段改用 setter 调用。
  • EnqueueBatch 热路径同步执行 response_registry_.gc() @ rtp_llm/cpp/model_rpc/PrefillRpcServer.cc:1597
    • 建议:删除此行,后台 GC 线程已覆盖。
  • appendPrefillTheoryHitLogLine 无轮转机制,日志文件无限增长 @ rtp_llm/cpp/model_rpc/PrefillRpcServer.cc:455
    • 建议:添加文件大小检查和轮转,或通过现有日志框架输出。

P3

  • NormalExecutor 和 MtpExecutor 中 readEnvFlagOnce 重复定义 @ rtp_llm/cpp/normal_engine/NormalExecutor.cc:52
    • 建议:提取到共用 utils 头文件。
  • GatherBatchScheduler 重复定义父类已有的方法 @ rtp_llm/cpp/engine_base/schedulers/GatherBatchScheduler.h:107
    • 建议:将父类方法改为 protected,子类直接复用。

Checklist Violations (7 fail / 56 total)

General Principles Checklist

  • [6.1] Software Engineering — LSP:子类/重写保持基类契约 → issue ResourceMeasure 接口重构导致 RandomStrategy 资源检查失效
    ResourceMeasure 默认方法与子类 overload 形成隐式契约断裂,RandomStrategy 走默认实现跳过资源检查
  • [6.1] Software Engineering — DRY:重复非平凡逻辑被抽取或显式复用 → issue NormalExecutor 和 MtpExecutor 中 readEnvFlagOnce 重复定义
    readEnvFlagOnce/readEnvIntOnce 在 NormalExecutor.cc 和 MtpExecutor.cc 中重复定义;GatherBatchScheduler 重复父类方法
  • [6.1] Architecture — 状态不变量:创建/更新/失败/重试/回滚路径有效 → issue admitWaitingUnits 清除 errored unit 时未 finish 组内无错 stream,导致永久挂起
    admitWaitingUnits 清除 errored unit 时不 finish 组内无错 stream;stopAsyncResponseWorkers 超时后 count 下溢
  • [6.1] Architecture — 错误语义:fail-fast/retry/fallback/silent 行为显式 → issue ensureEndpoint 异常中断整个 sync 轮次
    ensureEndpoint 异常中断整个 sync 轮次而非 skip + continue
  • [6.1] Architecture — 兼容性:公开 API/持久数据/配置/环境迁移安全 → issue RuntimeConfig/PDSepConfig pickle 不向后兼容
    RuntimeConfig/PDSepConfig pickle 不兼容旧版本;GenerateConfig 删除 force_batch 字段;RoleAddr serializer/validator 不一致
  • [6.1] Tests — 被删除测试有等价替代覆盖 → issue master_client_test.py 使用旧 HTTP 签名,与新 gRPC API 不匹配
    master_client_test.py 使用旧 HTTP API 签名,与新 gRPC 接口不匹配,测试必定失败

RTP-LLM Checklist

  • [I] 代码质量 — 同一功能用统一工具函数 → issue NormalExecutor 和 MtpExecutor 中 readEnvFlagOnce 重复定义
    readEnvFlagOnce/readEnvIntOnce 在 NormalExecutor.cc 和 MtpExecutor.cc 重复定义

Strengths

  • GenerateStateMachine 重构为 GenerateStream 上清晰的生命周期方法 (prepare/isReady/activate/advance/finish),大幅提升可维护性
  • 新增 ScheduleUnit 抽象统一管理 group 和 single stream 调度,消除旧 evaluateWaitingStreams 的复杂 group-gathering 逻辑
  • WorkerStatus 移除 localTaskMap ConcurrentHashMap 及其状态机迭代,简化为直接吸收 gRPC 响应
  • Logger 从自定义 log level + lambda 分配改为直接委托 SLF4J,消除每次调用的不必要对象分配
  • ConfigService 新增 FLEXLB_CONFIG 解析 try-catch 兜底,异常 JSON 不再导致启动失败

wzy-99 added 6 commits June 26, 2026 14:43
…e + 3 P1 fixes

- P0: move consecutiveFailures from GrpcWorkerStatusRunner instance field to
  WorkerStatus object, so the counter persists across sync cycles instead of
  being reset every time a new Runner is created
- P1: fix isNonBatchPath to use != BATCH instead of == DIRECT, so AUTO-mode
  requests that fail batch qualification also get inflight tracking
- P1: add null guard for newWorkerStatus.getRole().name() to prevent NPE
  crashing the sync thread
- P1: add str branch in generate_config.py validate_role for JSON round-trip
Add per-request INFO-level event logs across engine/scheduler/executor/
dispatcher layers for complete request traceability (prefill → decode →
finish). All events carry trace_id via streamLogTag() formatted as
'trace_id=XXX req_id=YYY'.

Changes:
- GenerateStream.h: implement streamLogTag() inline with trace_id+req_id
- FIFOScheduler.cc: add request_activated (waiting→running + loading→
  running) and request_finished (cleanup) logs
- NormalExecutor.cc: add trace_id to prefill_batch_begin; add new
  decode_step_begin event for decode batch visibility
- NormalOutputDispatcher.cc: add first_token + decode_finished per-request
  events (compatible with dispatchOutputAsync path)
- frontend_server.py: add request_arrival + request_completion logs

Design: all events use RTP_LLM_ACCESS_LOG_INFO (no [RANK][file:line]
prefix); INFO only at batch boundaries/state transitions; trace_id
coverage on all per-request events.
P1 #10: ResourceMeasure 重构导致 RandomStrategy 资源检查失效
  - DecodeResourceMeasure/PrefillResourceMeasure 添加 @OverRide isResourceAvailable(WorkerEndpoint) 桥接方法
  - 通过 instanceof 委托到类型化方法,修复之前仅走接口 default 方法(只查 isAlive())的 bug

P1 #8: activelyNotifyParticipants 通知条件反转
  - 修正 localIp.equals 为 !localIp.equals,确保远程节点能收到 master 切换通知

P1 #9: waitForLeadershipTransfer 无退出上限
  - 新增 MAX_WAIT_COUNT=30 最大等待保护,超时后 warn + 强制退出

P2 #3: applyTrafficPolicyOverride 缺 try-catch
  - JsonUtils.toObject 包裹 try-catch,防止异常 JSON 导致启动崩溃

P2 #12: LBStatusConsistencyService 静态线程池未关闭
  - 添加 @PreDestroy shutdown() 关闭 SCHEDULED_EXECUTOR_SERVICE
…ia PDSepConfig

- FlexlbConfig: flexlbBatchQueueMaxSize 64 → 1024
- PDSepConfig: add prefill_enqueue/worker_lambda/slot_pool_size fields (default 0 = formula)
- PrefillRpcServer: initThreadPools() reads from pd_sep_config instead of hardcoded/env var
- ConfigInit.cc: pybind registration + pickle for 3 new PDSepConfig fields
- pd_separation_group_args.py: CLI args for 3 thread pool sizes
…n in .h)

Commit f1df601 added an inline definition in GenerateStream.h but forgot
to remove the existing out-of-line implementation in GenerateStream.cc,
causing a redefinition error on cuda13_x86 builds.
- Add flexlbBatchFixedMaxInflightBatches config (default 0, disabled)
  to limit in-flight batches per prefill worker in fixed_window mode
- Rename SLO batcher config flexlbBatchMaxInflightBatchesPerWorker
  -> flexlbBatchSloMaxInflightBatches for naming consistency
- FixedWindowBatcherAlgorithm parks instead of dispatching when
  engine inflight batch count >= limit, preventing engine overload
@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1138

Status: BLOCKING

Summary: P0/0 · P1/11 · P2/44 · P3/0

Blocking Issues

P1

  • streamLogTag 格式变更导致 Python/C++ 日志关联断裂 @ rtp_llm/cpp/engine_base/stream/GenerateStream.h:465
    • 建议:统一 C++/Python 的字段名和顺序。建议 C++ 保持 'request_id=' 而非缩写 'req_id=',或同步更新 rtp_llm/dash_sc/inference/servicer.py:108 的 stream_log_tag() 格式。同时保持字段顺序一致(两边都用 trace_id 在前或 request_id 在前)。
  • InflightEvictor 回调异常会中断后续所有 entry 的清理 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/scheduler/InflightEvictor.java:52
    • 建议:在 evictExpired 循环内用 try-catch 包裹 onEvict.accept() 调用,确保单个 entry 的回调异常不影响其余 entry 的清理。例如:try { onEvict.accept(entry.getValue()); } catch (Exception e) { Logger.error(...); }
  • WorkerBatcher.shutdown() 与 waitForNonEmpty() 存在竞态导致请求丢失 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/scheduler/WorkerBatcher.java:130
    • 建议:将 waitForNonEmpty() 改为 peek() + 短暂 sleep 或 LockSupport.parkNanos 的模式,避免从队列中临时移除 item。或者在 shutdown 中等待 workerThread 终止后再 drainTo,确保 take-put 原子窗口已关闭。
  • BatchInflight.lastSeenMs volatile 读-改-写存在竞态条件 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/endpoint/BatchInflight.java:76
    • 建议:将 lastSeenMs 改为 AtomicLong 并使用 updateAndGet(base -> Math.max(base, statusMs)),与 progressBaseMs 保持一致的模式。
  • BatcherContext.sortedItems() 缓存在 offer() 添加新项后不会失效 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/scheduler/WorkerBatcher.java:74
    • 建议:在 WorkerBatcher.offer() 的 queue.add(item) 之后,通过 BatcherContext 暴露的方法将 sortedCacheDirty 设为 true;或者在 BatcherContext 中提供一个 offer() 包装方法,确保添加元素时缓存失效。
  • DefaultRouterTest 向全局静态 LoadBalanceStrategyFactory 注册 mock 但从不清理 @ rtp_llm/flexlb/flexlb-sync/src/test/java/org/flexlb/balance/scheduler/DefaultRouterTest.java:93
    • 建议:在 tearDown() 中恢复 LoadBalanceStrategyFactory 的原始注册(或把注册行删掉,因为 mockStaticLoadBalanceStrategyFactory() 已通过反射覆写 DefaultRouter 内部 map,不需要注册到全局工厂)。另外 line 95 用 vitLoadBalancer 覆盖了 COST_BASED_PREFILL,应改为正确的 strategy enum 或直接移除这些 register 调用。
  • DefaultBatchDispatcherTest 缺少 @AfterEach 关闭 dispatcher,导致线程泄漏 @ rtp_llm/flexlb/flexlb-sync/src/test/java/org/flexlb/balance/scheduler/DefaultBatchDispatcherTest.java:55
    • 建议:添加 @AfterEach void tearDown() { dispatcher.shutdown(); } 确保每个测试后线程池被关闭。
  • EngineWorkerStatusTest 使用内联清理而非 @AfterEach,断言失败时全局状态泄漏 @ rtp_llm/flexlb/flexlb-sync/src/test/java/org/flexlb/sync/status/EngineWorkerStatusTest.java:81
    • 建议:添加 @AfterEach 方法统一清理所有 status map(decode/prefill/pdfusion/vit),移除每个测试末尾的内联 clear()。参考 DefaultRouterTest 的 tearDown() 写法。
  • CostBasedPrefillStrategyTest 缺少 @AfterEach 清理全局 prefill status map @ rtp_llm/flexlb/flexlb-sync/src/test/java/org/flexlb/balance/strategy/CostBasedPrefillStrategyTest.java:46
    • 建议:添加 @AfterEach void tearDown() { EngineWorkerStatus.MODEL_ROLE_WORKER_STATUS.getPrefillStatusMap().clear(); } 确保测试隔离。
  • PDSepConfig unpickle 向后不兼容:旧版序列化数据无法反序列化 @ rtp_llm/cpp/pybind/ConfigInit.cc:1756
    • 建议:将最小 size 检查保留为 23(旧版最小值),对 index 23-25 使用条件赋值:if (t.size() > 23) c.prefill_enqueue_pool_size = t[23].cast<int64_t>(); 等。这样旧版数据可以用默认值 0 正常反序列化,保持滚动升级兼容。
  • setPrefillCoefficients 解析失败时会产生部分更新,导致系数不一致 @ rtp_llm/flexlb/flexlb-common/src/main/java/org/flexlb/config/FlexlbConfig.java:325
    • 建议:先解析到临时变量中,全部成功后再一次性赋值。当前实现中如 parts[1] 解析失败,costAlpha0 已被修改但 costAlpha1-5 保持旧值,违反了注释中 'Keep existing default values' 的约定,产生不一致的系数组合,影响 CostBasedPrefill 负载均衡决策。

Non-blocking Suggestions

P2

  • snprintf 固定 256 字节缓冲区可能截断长 trace_id @ rtp_llm/cpp/engine_base/stream/GenerateStream.h:466
    • 建议:使用 std::string 拼接或 fmt::format 代替固定缓冲区 snprintf,避免截断风险。如果坚持用固定缓冲区(为性能),至少对 trace_id 做长度截断并在截断时追加省略标记如 '...'。
  • request_finished 日志使用 %ld 打印 size_t 类型 @ rtp_llm/cpp/engine_base/schedulers/FIFOScheduler.cc:289
    • 建议:将 %ld 改为 %zu(size_t 的正确格式符),或 static_cast 显式转换。虽然在 LP64 平台上正值不会出错,但不匹配的格式符是 UB。
  • FIFOScheduler 中 request_activated 日志代码块完全重复 @ rtp_llm/cpp/engine_base/schedulers/FIFOScheduler.cc:258
    • 建议:提取为私有辅助函数,如 logStreamActivated(const GenerateStreamPtr& s),减少代码重复和 I-cache 占用。
  • backpressure park 时 predictor-based early dispatch 路径的候选列表分配被跳过,行为一致但注释过时 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/scheduler/FixedWindowBatcherAlgorithm.java:29
    • 建议:删除第 29 行 '
    • No inflight-batch backpressure check.
    • ',因为该 PR 已经为 fixed_window 添加了 backpressure 检查。
  • [未确认-降级] WorkerBatcher 线程死亡后队列请求永久滞留 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/scheduler/WorkerBatcher.java:42
    • 建议:方案一:在 PrefillEndpoint 或 EndpointRegistry 中加 watchdog 定期检查 workerThread.isAlive(),死亡后重建 batcher。方案二:将 waitForNonEmpty 改为 peek+sleep 模式避免 take-put 窗口。至少应在 UncaughtExceptionHandler 中将 stopped 置 true 并清空队列(调用 handler.onOfferFailure 给每个请求回错误)。
  • WorkerBatcher.offer() 队列大小检查存在 TOCTOU 竞态 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/scheduler/WorkerBatcher.java:66
    • 建议:使用 AtomicInteger 计数器(offer 前 incrementAndGet,失败时 decrementAndGet),或使用信号量进行精确容量控制。对于调度场景,当前的近似限制可能可以接受——如果是,请在注释中说明是 best-effort。
  • EndpointRegistry 无删除/GC 机制,worker 轮换时内存无界增长 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/endpoint/EndpointRegistry.java:15
    • 建议:在 ExpirationCleaner 或 EndpointRegistry.scheduledEviction() 中联动检查:当 worker 从 workerStatusMap 移除后,也从 registry 中移除对应 endpoint(先 close 再 remove)。或在 EndpointRegistry 增加 removeStale() 方法定期调用。
  • DecodeEndpoint 过期 eviction 回调为空操作,无日志无指标 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/endpoint/DecodeEndpoint.java:26
    • 建议:在回调中添加 warn 级别日志(记录 requestId 和 kvTokens),并考虑增加 eviction counter 指标,便于生产环境发现 decode 确认丢失问题。
  • BatcherContext.sortedItems() 缓存在多线程下不安全 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/scheduler/BatcherContext.java:97
    • 建议:将 sortedCache 也标记为 volatile,或将 sortedItems() 改为 synchronized。当前 headWaitMs/queueWaitMs 不调用 sortedItems(),所以实际风险受限——但如果未来算法变化引用了它就会触发。
  • LBStatusConsistencyService.SCHEDULED_EXECUTOR_SERVICE 是 static final 但 shutdown 是实例方法 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/consistency/LBStatusConsistencyService.java:29
    • 建议:将 SCHEDULED_EXECUTOR_SERVICE 改为实例字段(非 static),在构造函数中初始化。这样每个 Spring context 有独立的 executor,@PreDestroy 可以安全关闭。
  • MasterEngineSynchronizer 构造函数中 Long.parseLong 对非法环境变量无防护 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/sync/synchronizer/MasterEngineSynchronizer.java:63
    • 建议:添加 NumberFormatException 处理或使用 Optional + 默认值模式。例如:try { Long.parseLong(...) } catch (NumberFormatException e) { Logger.warn(...); defaultValue; }
  • BatchItem.hashCode()/equals() 包含 volatile mutable 字段 sortKey @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/scheduler/BatchItem.java:115
    • 建议:从 equals()/hashCode() 中移除 sortKey 字段。使用 requestId() 或 (ctx, future) 作为身份标识即可,sortKey 仅用于排序。
  • DefaultBatchDispatcher.shutdownNow() 丢弃排队任务导致 future 未完成 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/scheduler/DefaultBatchDispatcher.java:69
    • 建议:在 shutdown() 中获取 shutdownNow() 的返回值,解析其中的 BatchItem 并对每个调用 callback.onFailure(),或改用 graceful shutdown(先 shutdown() 再 awaitTermination)。
  • CostBasedDecodeStrategy.buildServerStatus() 中 reserve() 成功后异常不回滚 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/strategy/CostBasedDecodeStrategy.java:258
    • 建议:在 catch 块中添加 optimalEndpoint.release(requestId) 以回滚资源预留。
  • WorkerBatcher.waitForNonEmpty() take-put 模式短暂移除队头元素 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/scheduler/WorkerBatcher.java:131
    • 建议:考虑使用 queue.peek() 配合短暂 sleep 循环等待非空,避免修改队列状态;或者文档标注 headSortKey()/queueSize() 读取可能短暂不一致。
  • 新增 isResourceAvailable(WorkerEndpoint) 覆盖方法缺少单测 @ rtp_llm/flexlb/flexlb-sync/src/test/java/org/flexlb/balance/resource/DecodeResourceMeasureTest.java:41
    • 建议:增加测试用例:用 WorkerEndpoint 类型参数(实际传入 DecodeEndpoint 实例)调用 isResourceAvailable,验证 instanceof 分发正确;再传入非 DecodeEndpoint 类型的 WorkerEndpoint 验证 fallback 到 super 逻辑。PrefillResourceMeasureTest 同理。
  • FixedWindowBatcherAlgorithm 新增 backpressure gate 缺少测试 @ rtp_llm/flexlb/flexlb-sync/src/test/java/org/flexlb/balance/scheduler/FixedWindowBatcherAlgorithmTest.java:19
    • 建议:在 FixedWindowBatcherAlgorithmTest 中增加 processQueue 测试:构造 mock BatcherContext 使 getFlexlbBatchFixedMaxInflightBatches() 返回 1,prefillEp().getInflightBatchCount() 返回 1,验证 processQueue 不调用 dispatch 即返回。
  • isNonBatchPath 语义变更(DIRECT → !BATCH)缺少测试 @ rtp_llm/flexlb/flexlb-sync/src/test/java/org/flexlb/balance/strategy/CostBasedPrefillStrategyTest.java:280
    • 建议:增加测试用例:分别设置 BalanceContext.scheduleMode 为 AUTO、DIRECT、BATCH,验证 AUTO 和 DIRECT 走 non-batch path(执行 inflight reservation),BATCH 不走 non-batch path。
  • DefaultRouterTest.setUp() line 95 用 vitLoadBalancer 覆盖 COST_BASED_PREFILL 注册 @ rtp_llm/flexlb/flexlb-sync/src/test/java/org/flexlb/balance/scheduler/DefaultRouterTest.java:95
    • 建议:如果 VIT 需要独立 strategy enum,应使用正确的 enum 值注册;否则把 93-96 行的全局 register 调用全部移除(反射覆写已足够)。
  • FixedWindowBatcherAlgorithmTest 时间断言容差仅 5ms,CI 易 flaky @ rtp_llm/flexlb/flexlb-sync/src/test/java/org/flexlb/balance/scheduler/FixedWindowBatcherAlgorithmTest.java:36
    • 建议:放宽容差到 ±20ms(assertTrue(waitMs >= 80 && waitMs <= 110)),或改用受控时钟来消除时间依赖。
  • FlexlbBatchSchedulerTest.batcher_rejects_when_queue_full 缺少错误码断言 @ rtp_llm/flexlb/flexlb-sync/src/test/java/org/flexlb/balance/scheduler/FlexlbBatchSchedulerTest.java:346
    • 建议:添加 assertEquals(StrategyErrorType.QUEUE_FULL.getErrorCode(), response.getCode()) 确保拒绝原因正确。
  • FlexlbBatchSchedulerTest.submit_rejects_when_global_inflight_limit_reached 第一个 future 未被 await 或断言 @ rtp_llm/flexlb/flexlb-sync/src/test/java/org/flexlb/balance/scheduler/FlexlbBatchSchedulerTest.java:324
    • 建议:将返回值赋给变量,在 releaseBlock.countDown() 后 await 并断言 isSuccess(),确保第一个请求正常完成。
  • DefaultRouterTest.should_log_error_when_master_request_is_null 断言过弱 @ rtp_llm/flexlb/flexlb-sync/src/test/java/org/flexlb/balance/scheduler/DefaultRouterTest.java:353
    • 建议:补充 assertFalse(response.isSuccess()) 并验证错误码,确保 null request 被正确拒绝。
  • RouteServiceTest 使用 .block() 无超时,响应式管道挂起将导致测试无限阻塞 @ rtp_llm/flexlb/flexlb-sync/src/test/java/org/flexlb/service/RouteServiceTest.java:61
    • 建议:改用 .block(Duration.ofSeconds(5)) 或 StepVerifier.create(...).expectNext(...).verifyComplete() 加超时保护。
  • DefaultRouterTest 反射覆写私有字段 + fail() 吞异常堆栈 @ rtp_llm/flexlb/flexlb-sync/src/test/java/org/flexlb/balance/scheduler/DefaultRouterTest.java:134
    • 建议:改为 fail('Failed to mock LoadBalanceStrategyFactory', e) 保留异常链,或考虑通过构造函数注入 loadBalancerMap 避免反射。
  • GrpcWorkerStatusRunner consecutiveFailures 迁移到 WorkerStatus 后缺少失败路径测试 @ rtp_llm/flexlb/flexlb-sync/src/test/java/org/flexlb/sync/runner/GrpcWorkerStatusCheckRunnerTest.java:30
    • 建议:新增测试:mock engineGrpcService 抛异常,验证 workerStatus.getConsecutiveFailures() 递增;连续失败 >= 3 次后验证 endpoint 被移除或 worker 被标记不可用。
  • 用户配置的线程池大小无上限校验 @ rtp_llm/cpp/model_rpc/PrefillRpcServer.cc:808
    • 建议:极端大值(如 --prefill_enqueue_pool_size 1000000)会尝试创建百万线程,在 pool->start() 失败时通过 RTP_LLM_CHECK_WITH_INFO 崩溃,错误信息不明确。建议加上限 clamp(如 max 1024)或在 C++ 侧打印 WARNING 并 clamp 到合理范围,提供清晰的运维反馈。
  • RoleType 字符串验证使用 getattr 会抛 AttributeError 而非 ValueError @ rtp_llm/config/generate_config.py:60
    • 建议:用 try/except AttributeError 包裹 getattr 调用,或检查 hasattr 后再取值,保持和其他分支一致抛 ValueError。不过其他地方 (worker_status.py:82) 也用了同样的 getattr 模式,所以这是项目惯例。
  • decode_step_begin 日志在 model forward 之后才打印 @ rtp_llm/cpp/normal_engine/NormalExecutor.cc:339
    • 建议:如果 decode_step_begin 意图记录 decode 步骤开始,应该移到 model_->forward() 调用之前(与 prefill_batch_begin 对称)。目前位置在 forward 之后,日志时间不能反映 decode 开始时间。
  • publishNormalDeviceState 每个 stream 触发 O(B) 个 CUDA kernel launch @ rtp_llm/cpp/normal_engine/NormalExecutor.cc:724
    • 建议:对 B 个 decode streams,可以用一次 batched torch::arange 或 torch::add(seq_lens_batched, 1) 替代 B 个独立标量 CUDA kernel launch。在 batch=256 时,per-stream add+to 意味着 512 次无用的 kernel dispatch overhead(约数 us/次,累计可达百 us 级)。考虑将所有 stream 的 seq_len 组成一个 [B] 形状 tensor 做一次 add。
  • prepareGrpcNormalDeviceState 为每个 pending stream 做 torch::full CUDA 分配 @ rtp_llm/cpp/normal_engine/NormalExecutor.cc:639
    • 建议:每个 pending gRPC stream 首次 decode 时分配 2 个 scalar CUDA tensor。对于 PD decode 角色批量接收大量 stream 时,大量小 tensor 分配会碎片化 CUDA allocator。考虑复用 pre-allocated 的 scalar tensor 或使用 pinned_host+async copy。
  • RpcCpuTpBroadcaster::broadcast 在每次调用时复制 peer_tp_ranks_ 和 group_key_ @ rtp_llm/cpp/distribute/RpcCpuTpBroadcaster.cc:167
    • 建议:broadcast() 在热路径中每次调用都复制 peer_tp_ranks_ vector 和 group_key_ string。这些值在 initialize() 后不变。可以在 initialize() 后使用 shared_ptr<const vector> 或直接用 const 引用(initialized 后不会 reset),避免每次 broadcast 的堆分配。
  • dispatchOutputAsync 中 allStreams() 被多次调用,每次创建新 list @ rtp_llm/cpp/normal_engine/NormalExecutor.cc:748
    • 建议:allStreams() 每次调用都复制 decode_streams_ 并拼接 context_streams_,涉及 shared_ptr 引用计数。在 dispatchOutputAsync 路径中,stream_groups_copy 已经是值拷贝,allStreams() 再多调一次是浪费。建议只调一次并传递引用/const ref,或让 StreamGroups 缓存 allStreams 结果。
  • asyncDebugEnabled() 每次调用都 getenv + 构造 std::string @ rtp_llm/cpp/normal_engine/NormalOutputDispatcher.cc:17
    • 建议:虽然只在 error path 调用,但 getenv + string 构造比 static bool 缓存模式多出不必要的开销。其他 env flag(如 useStreamAsync)已经用了 static const bool + lambda 一次性读取的模式,建议统一。
  • request_arrival 日志 prompt_len 对非 prompt 请求(如 chat/messages)始终为 0 @ rtp_llm/frontend/frontend_server.py:265
    • 建议:对于 OpenAI Chat 格式请求,prompt 字段不存在。建议同时检查 messages 字段长度,或使用更通用的 input_len 计算。
  • flexlbBatchQueueMaxSize 从 64 大幅提高到 1024,无内存/延迟告警 @ rtp_llm/flexlb/flexlb-common/src/main/java/org/flexlb/config/FlexlbConfig.java:359
    • 建议:16 倍增加默认队列深度。在高并发场景下可能导致请求堆积、内存增长和尾延迟恶化。建议添加日志或指标监控队列深度,或在文档中明确说明调整原因和预期影响。
  • FixedWindowBatcherAlgorithm backpressure 使用 1ms sleep 可能导致 CPU 空转 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/scheduler/FixedWindowBatcherAlgorithm.java:80
    • 建议:持续高压时 1ms sleep 循环等同于忙等。建议使用 exponential backoff(如从 1ms 增到 10ms),或使用 CountDownLatch/Condition 通知机制,避免 CPU 浪费。
  • getParsedSloBuckets 存在 TOCTOU 竞态:并发 setCostSloBuckets 可导致写入过期的解析结果 @ rtp_llm/flexlb/flexlb-common/src/main/java/org/flexlb/config/FlexlbConfig.java:478
    • 建议:costSloBuckets 非 volatile,setCostSloBuckets 和 getParsedSloBuckets 之间存在竞态窗口:解析线程读取旧 costSloBuckets 后,setCostSloBuckets 置空 parsedSloBuckets,解析线程再写回过期结果。如需线程安全,可将解析逻辑放在 setCostSloBuckets 中(eager parsing),或对两个字段加 synchronized。
  • parseValue 拼接 JSON 字符串未转义,含特殊字符的枚举值会导致解析异常 @ rtp_llm/flexlb/flexlb-common/src/main/java/org/flexlb/config/ConfigService.java:197
    • 建议:若 value 含引号或反斜杠,拼接出的 JSON 不合法。建议使用 Jackson ObjectMapper.readValue(value, targetType) 或先用 fromName/valueOf 直接匹配枚举名,而非构造 JSON 字符串。当前由外层 try-catch 兜底不会崩溃,但错误信息难以定位。
  • Logger.logbackLogger() 硬转型 logback Logger,非 logback 环境下 ClassCastException @ rtp_llm/flexlb/flexlb-common/src/main/java/org/flexlb/util/Logger.java:83
    • 建议:如果测试或其他环境使用非 logback 的 SLF4J 绑定(如 SimpleLogger),此处会抛 ClassCastException。可加 instanceof 检查并优雅降级,或在文档中明确标注 logback 为必须依赖。
  • setPrefillCoefficients 静默吞掉 NumberFormatException,无日志输出 @ rtp_llm/flexlb/flexlb-common/src/main/java/org/flexlb/config/FlexlbConfig.java:340
    • 建议:catch 块内添加 log.warn("Failed to parse prefill coefficients: {}", csv, e),否则运维无法感知配置解析失败,排查代价模型异常时缺少关键线索。
  • applyEnvironmentOverrides 通过反射直接设置字段,绕过自定义 setter 的副作用 @ rtp_llm/flexlb/flexlb-common/src/main/java/org/flexlb/config/ConfigService.java:86
    • 建议:改用 setter 方法调用(通过 PropertyDescriptor 或显式构造 setter method name),或在 isSupportedType 中排除有自定义 setter 的字段(如 costSloBuckets)。当前通过 COST_SLO_BUCKETS 环境变量设置值时不会触发 parsedSloBuckets 缓存失效。
  • ExpirationCleaner 清理过期 worker 未同步清理 EndpointRegistry 中的 endpoint @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/sync/schedule/ExpirationCleaner.java:36
    • 建议:在 doClean 中删除 worker 后, 通过 EndpointRegistry 回调关闭对应的 endpoint (endpoint.close()), 或让 EndpointRegistry 订阅 worker 状态变更事件.
  • PrefillEndpointTest 缺少 @AfterEach 关闭 batcher 线程 @ rtp_llm/flexlb/flexlb-sync/src/test/java/org/flexlb/balance/endpoint/PrefillEndpointTest.java:35
    • 建议:添加 @AfterEach void tearDown() { if (endpoint != null) endpoint.close(); }

Checklist ✅ (56 items passed)

Strengths

  • 在 FIFOScheduler 的 activated/finished 关键路径添加了结构化 access log,覆盖 prefill/decode 角色区分,对生产排障和全链路追踪有显著价值
  • streamLogTag 从 .cc 实现移到 .h 内联,减少了虚函数调用开销,适合日志热路径
  • PD 分离线程池大小参数化(prefill_enqueue/worker_lambda/slot_pool_size),支持运维动态调优而非硬编码
  • 线程池参数默认值 0 回退到公式计算,负值也安全回退,避免配置错误导致崩溃
  • 修复了 streamLogTag() 的 ODR 违规(先加 inline 再删 .cc 中的重复定义)
  • backpressure gate 默认关闭 (default 0),保持了 fixed_window 原有的 always-dispatch 行为,向后兼容
  • 将原来的 getFlexlbBatchMaxInflightBatchesPerWorker 拆分为 slo/fixed_window 独立配置,避免两种算法互相干扰

@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1138

Status: BLOCKING

Summary: P0/0 · P1/12 · P2/22 · P3/3

Blocking Issues

P1

  • w4a8_int4_per_channel_quant_weight.py 缺少 import logging,NameError 掩盖 ImportError @ rtp_llm/model_loader/w4a8_int4_per_channel_quant_weight.py:31
    • 建议:在文件顶部添加 import logging。
  • cutlass_moe.py get_cutlass_batched_moe_mm_data 为 None 时无守卫直接调用 @ rtp_llm/models_py/modules/factory/fused_moe/impl/cuda/executors/cutlass_moe.py:455
    • 建议:在 check_conditions 中添加 checker.check(get_cutlass_batched_moe_mm_data is not None)。
  • cutlass_w4a8_moe.py compute_reorder_stride/w4a8_group_gemm_ptpc 为 None 时 init 直接崩溃 @ rtp_llm/models_py/modules/factory/fused_moe/impl/cuda/executors/cutlass_w4a8_moe.py:95
    • 建议:在 check_conditions 中检查 compute_reorder_stride is not None and w4a8_group_gemm_ptpc is not None。
  • PDSepConfig pickle 向后不兼容:旧 tuple 无法反序列化 @ rtp_llm/cpp/pybind/ConfigInit.cc:1756
    • 建议:使用 t.size() >= N 渐进解析,缺失字段填默认值(同 KVCacheConfig 做法)。
  • FlexlbGrpcForwarder: channels.remove 后未 shutdown channel,TCP 连接泄漏 @ rtp_llm/flexlb/flexlb-api/src/main/java/org/flexlb/httpserver/FlexlbGrpcForwarder.java:63
    • 建议:remove 前获取引用并 shutdownNow:ManagedChannel ch = channels.remove(key); if (ch != null) ch.shutdownNow();
  • master_client_test _CaptureMasterClient 签名与 gRPC 实际接口不匹配,全部测试必定失败 @ rtp_llm/server/test/master_client_test.py:45
    • 建议:重写签名为 (addr, request_pb, timeout_s, request_id),断言改为 proto 属性访问。
  • backend_rpc_server_visitor_test _FakeMasterClient 签名缺少参数且参数名不匹配 @ rtp_llm/server/test/backend_rpc_server_visitor_test.py:53
    • 建议:签名改为 (block_cache_keys, cache_key_block_size, input, request_id, input_pb=None),断言 key 改为 input_pb。visitor 补设 _page_rr_route_cache_keys=False。
  • PrefillEndpointTest.calibrateHandlesTaskWithNoBatchId 断言错误,测试必定失败 @ rtp_llm/flexlb/flexlb-sync/src/test/java/org/flexlb/balance/endpoint/PrefillEndpointTest.java:176
    • 建议:将 commitBatch batchId 改为与 requestId 不同的值(如 999L),使 remove(1L) 不命中。
  • EngineSyncRunner.ensureEndpoint() 对 VIT/PDFUSION 抛异常导致整个角色同步中断 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/sync/runner/EngineSyncRunner.java:238
    • 建议:VIT/PDFUSION 不需要 endpoint,ensureEndpoint 应跳过而非抛异常。移除 else throw 或在循环内 try-catch 隔离。
  • detached 线程捕获 this 指针,析构时 use-after-free 风险 @ rtp_llm/cpp/model_rpc/PrefillRpcServer.cc:1863
    • 建议:改用 shared_from_this() 捕获,或在析构中通过 atomic flag + 等待确保 detached 线程退出。
  • completeCancelled 绕过 rollbackOnce 导致 decode slot 双重释放 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/scheduler/FlexlbBatchScheduler.java:373
    • 建议:在 entry==null 分支也使用 AtomicBoolean 保护 rollback,或在 BatchItem 上添加 rollbackOnce 标志。
  • cacheVersion long→int 截断导致版本号溢出后缓存更新永久阻塞 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/service/grpc/EngineGrpcService.java:91
    • 建议:溢出前 clamp 到 Integer.MAX_VALUE,或将 proto 字段升级为 int64。

Non-blocking Suggestions

P2

  • setPrefillCoefficients 静默吞掉 NumberFormatException,无日志 @ rtp_llm/flexlb/flexlb-common/src/main/java/org/flexlb/config/FlexlbConfig.java:340
    • 建议:添加 log.warn("Failed to parse prefill coefficients: {}", csv, e)。
  • Logger 硬耦合 logback,非 logback 后端 ClassCastException @ rtp_llm/flexlb/flexlb-common/src/main/java/org/flexlb/util/Logger.java:83
    • 建议:添加 instanceof 检查,非 logback 后端降级为 no-op。
  • GenerateConfig 删除 force_batch 字段可能导致旧请求反序列化失败 @ rtp_llm/config/generate_config.py:179
    • 建议:配置 extra='ignore' 或保留 force_batch 为 deprecated alias。
  • fused_rope_kvcache_op.py prefill/decode 函数 None 时无守卫直接调用 @ rtp_llm/ops/fused_rope_kvcache_op.py:98
    • 建议:在 init 或 prepare 中 assert 非 None,或在模块级 import 失败时直接 raise。
  • cache_key_block_size 参数传入但未使用,成为死参数 @ rtp_llm/server/master_client.py:200
    • 建议:如不需要则移除参数;如需要则在 proto 中添加对应字段。
  • sortedCacheDirty 在 offer 路径不被标脏,新请求可能被 pick 遗漏 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/scheduler/WorkerBatcher.java:166
    • 建议:在 offer() 中 queue.add 后调用 ctx.markDirty(),确保新入队元素可见。
  • onOfferFailure error 可为 null 但实现未做空指针防护 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/scheduler/FlexlbBatchScheduler.java:251
    • 建议:加 error != null ? error.getMessage() : "queue full" 防护,或修正接口文档。
  • CostBasedPrefillStrategy 计算了 cacheHit 但未纳入评分,cache-aware 选择失效 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/strategy/CostBasedPrefillStrategy.java:97
    • 建议:将 cacheHit 纳入 computeScore,使 cache-aware routing 真正生效。
  • WorkerBatcher.waitForNonEmpty take/put 模式在 shutdown 时可丢失请求 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/scheduler/WorkerBatcher.java:131
    • 建议:改用 queue.peek()+LockSupport.park 避免出队,或在 InterruptedException catch 中 drain 剩余。
  • ResponseBufferWriter 队列满时静默丢弃消息,无日志/指标 @ rtp_llm/cpp/model_rpc/ResponseBuffer.cc:124
    • 建议:添加 drop counter 或 log warning,便于排查不完整输出问题。
  • canAdmitUnit 对 group 跳过 token 预算检查 @ rtp_llm/cpp/engine_base/schedulers/FIFOScheduler.cc:200
    • 建议:对 group 也做 max_batch_tokens_size_ 保护,或添加注释说明上游已校验。
  • LBStatusConsistencyService static final 线程池 shutdown 后不可恢复 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/consistency/LBStatusConsistencyService.java:29
    • 建议:改为实例字段,在构造函数中创建、@PreDestroy 中关闭。
  • RpcCpuTpBroadcaster inbox_ 无大小上限,异常情况可能 OOM @ rtp_llm/cpp/distribute/RpcCpuTpBroadcaster.cc:281
    • 建议:增加 inbox_ 大小上限检查(如 1000),超限时 log 警告。
  • applyEnvironmentOverrides 通过 field.set 绕过自定义 setter @ rtp_llm/flexlb/flexlb-common/src/main/java/org/flexlb/config/ConfigService.java:89
    • 建议:对有自定义 setter 的字段调用 setter 而非 field.set。
  • CostBasedPrefillStrategy 每个 endpoint 重复调用 realWaitTimeMs() 3-4 次 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/strategy/CostBasedPrefillStrategy.java:142
    • 建议:在 select() 入口缓存每个 endpoint 的 realWaitTimeMs() 值,后续复用。
  • DecodeEndpoint.inflightKvReserved() 每次调用遍历 ConcurrentHashMap @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/endpoint/DecodeEndpoint.java:97
    • 建议:维护 AtomicLong inflightKvReservedSum,在 reserve()/release()/calibrate() 时增减。
  • InflightEvictor onEvict 回调导致 O(B²) 复杂度 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/endpoint/PrefillEndpoint.java:41
    • 建议:将 onEvict 回调改为空操作,仅在 evictExpiredBatches() 结束后调一次 refresh。
  • EnqueueBatch 热路径同步调用 gc(),持锁遍历所有 entries @ rtp_llm/cpp/model_rpc/PrefillRpcServer.cc:1601
    • 建议:移除热路径 gc() 调用,依赖后台 GC 线程;或仅在 registry size 超阈值时触发。
  • DefaultBatchDispatcherTest PrefillEndpoint WorkerBatcher 线程未关闭 @ rtp_llm/flexlb/flexlb-sync/src/test/java/org/flexlb/balance/scheduler/DefaultBatchDispatcherTest.java:240
    • 建议:添加 @AfterEach 调用 PrefillEndpoint.close() 和 dispatcher.shutdown()。
  • 多个 Strategy 测试类使用静态 MODEL_ROLE_WORKER_STATUS 但部分无 @beforeeach clear @ rtp_llm/flexlb/flexlb-sync/src/test/java/org/flexlb/balance/strategy/CostBasedDecodeStrategyTest.java:88
    • 建议:在所有使用 MODEL_ROLE_WORKER_STATUS 的测试类 @beforeeach 中统一 clear() 所有静态 map。
  • DefaultRouterTest 静态工厂重复注册导致状态污染 @ rtp_llm/flexlb/flexlb-sync/src/test/java/org/flexlb/balance/scheduler/DefaultRouterTest.java:95
    • 建议:为 VIT 使用独立策略枚举值,或在 @AfterEach 中重置 LoadBalanceStrategyFactory。
  • BatchInflight.lastSeenMs 的 read-compare-write 非原子更新 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/endpoint/BatchInflight.java:76
    • 建议:改用 AtomicLong.updateAndGet(old -> Math.max(old, statusMs)),与 progressBaseMs 处理保持一致。

P3

  • RtpLLMOp.cc 重复 include autil/EnvUtil.h @ rtp_llm/cpp/pybind/multi_gpu_gpt/RtpLLMOp.cc:1
    • 建议:删除重复的 #include。
  • PoolMetrics active/queued/completed 成员从未递增 @ rtp_llm/cpp/model_rpc/PrefillRpcServer.h:410
    • 建议:在 pushTask 前后维护计数器,或移除未使用字段。
  • ipPort.split(":") 不支持 IPv6 地址 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/sync/runner/GrpcWorkerStatusRunner.java:54
    • 建议:使用 lastIndexOf(':') 或统一 host:port 解析工具函数。

Checklist ✅ (56 items passed)

Strengths

  • WorkerStatus 重构移除 ConcurrentHashMap localTaskMap 及其 CAS 操作,简化为引擎上报权威值,降低热路径 CPU 开销并减少状态管理 bug
  • ConfigService 新增 try-catch 防止配置解析失败导致服务无法启动,提升鲁棒性
  • RoleType/LoadBalanceStrategyEnum 添加 @JsonCreator/@jsonvalue 支持多种格式反序列化
  • Logger 重构为直接委托 SLF4J/logback,消除自定义 globalLogLevel 双重过滤
  • 对 rtp_kernel 依赖做了 try/except 包裹,使模块在缺少 kernel 时仍能被 import

…ster routing

Add MasterConfig.disable_domain_fallback field with env var
MASTER_DISABLE_DOMAIN_FALLBACK to prevent fallback to VipServer
domain routing when master is unavailable or not configured.
When enabled, requests will fail with ROUTE_ERROR instead of
silently degrading to domain-based service discovery.
@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1138

Status: BLOCKING

Summary: P0/1 · P1/15 · P2/31 · P3/2

Blocking Issues

P0

  • RpcCpuTpBroadcaster 使用 DP 过滤后地址列表,DP>1 时越界崩溃 @ rtp_llm/cpp/normal_engine/NormalExecutor.cc:115
    • 建议:改用 params.runtime_config.all_worker_grpc_addrs,按 world_rank 索引,大小为 world_size。

P1

  • cutlass_moe.py 导入失败设为 None,运行时 TypeError 崩溃 @ rtp_llm/models_py/modules/factory/fused_moe/impl/cuda/executors/cutlass_moe.py:455
    • 建议:在 check_conditions 或 init 中检查 is None 并抛出清晰异常,或保持原始 raise 而非设为 None。
  • setPrefillCoefficients 解析失败时系数部分更新不一致 @ rtp_llm/flexlb/flexlb-common/src/main/java/org/flexlb/config/FlexlbConfig.java:326
    • 建议:先解析到临时变量,全部成功后再赋值给实例字段。catch 块添加 warn 日志。
  • RoleType.fromString 遇未知值抛异常,导致整条 WorkerStatusResponse 反序列化失败 @ rtp_llm/flexlb/flexlb-common/src/main/java/org/flexlb/dao/route/RoleType.java:50
    • 建议:用 try-catch 包裹 valueOf,未知值返回 null(与 value==null 一致)。
  • finish_lambda 双重 std::move 导致使用已移动对象 @ rtp_llm/cpp/model_rpc/PrefillRpcServer.cc:1860
    • 建议:确认 pushTask 返回 ERROR 时 lambda 是否已被消费。若已消费需在 pushTask 前保留副本。
  • waitStreamBeforeRun 使用 static 局部变量捕获成员配置 @ rtp_llm/cpp/model_rpc/PrefillRpcServer.cc:877
    • 建议:改为普通局部变量 int64_t,避免多实例共享和溢出。
  • getEngineScheduleInfo 在 shared_lock 下写入 entry.task_info.phase,数据竞争 @ rtp_llm/cpp/model_rpc/RpcServerRuntimeMeta.h:33
    • 建议:将 derivePhase 结果写入本地 copy:auto task_copy = entry.task_info; task_copy.phase = derivePhase(...);
  • PDSepConfig pickle 反序列化不兼容旧格式 @ rtp_llm/cpp/pybind/ConfigInit.cc:1756
    • 建议:改为 if (t.size() < 20) 并对位置 20-25 用条件解析:t.size() > idx ? t[idx].cast<...>() : default_value。
  • FlexlbGrpcForwarder channel 错误时 remove 未 shutdown,gRPC 资源泄漏 @ rtp_llm/flexlb/flexlb-api/src/main/java/org/flexlb/httpserver/FlexlbGrpcForwarder.java:63
    • 建议:ManagedChannel removed = channels.remove(channelKey); if (removed != null) removed.shutdownNow();
  • w4a8 量化文件缺少 logging import,ImportError 路径 NameError @ rtp_llm/model_loader/w4a8_int4_per_channel_quant_weight.py:31
    • 建议:在文件头部添加 import logging。
  • ExpirationCleaner 会立即驱逐新创建的 WorkerStatus @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/sync/schedule/ExpirationCleaner.java:45
    • 建议:跳过 statusLastUpdateTime < 0 的条目,或初始化为当前时间。
  • getOrCreateWorkerStatus 非原子操作导致 Endpoint 持有孤儿 WorkerStatus @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/sync/runner/EngineSyncRunner.java:204
    • 建议:改为 workerStatuses.computeIfAbsent(key, k -> new WorkerStatus(...));
  • DefaultBatchDispatcher 回调循环无 per-item 错误隔离 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/scheduler/DefaultBatchDispatcher.java:128
    • 建议:对每个 callback 调用包裹 try-catch,异常时 log+continue。
  • FlexlbBatchScheduler.submit() 异常路径未 rollback decode KV 预留 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/scheduler/FlexlbBatchScheduler.java:137
    • 建议:在 catch 块增加 if (routeResponse != null) rollback(routeResponse)。
  • EngineSyncRunner.ensureEndpoint() 对 VIT/PDFUSION 角色抛异常终止整轮同步 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/sync/runner/EngineSyncRunner.java:239
    • 建议:改为 log.warn + return,跳过不支持的角色类型。
  • server_config_setup.py DeepEP 日志两行合并为一行 @ rtp_llm/config/server_config_setup.py:109
    • 建议:在 USE_DEEPEP_LOW_LATENCY 行末添加 \n。

Non-blocking Suggestions

P2

  • fused_rope_kvcache_op ImportError 后函数为 None,调用时 TypeError @ rtp_llm/ops/fused_rope_kvcache_op.py:53
    • 建议:在 init 中检查关键函数 is None 时 raise ImportError。
  • cutlass_w4a8_moe compute_reorder_stride=None 在 init 中无保护直接调用 @ rtp_llm/models_py/modules/factory/fused_moe/impl/cuda/executors/cutlass_w4a8_moe.py:95
    • 建议:在 check_conditions 中检查 compute_reorder_stride is not None。
  • master_group_args --master_disable_domain_fallback 使用 type=bool 解析异常 @ rtp_llm/server/server_args/master_group_args.py:50
    • 建议:改为 action='store_true' 或使用自定义 str2bool 函数。
  • is_backend_service_ready 硬编码返回 True,健康检查失去意义 @ rtp_llm/server/backend_rpc_server_visitor.py:555
    • 建议:考虑检查 FlexLB master 连通性,或在 disable_domain_fallback=True 时才无条件返回 True。
  • MasterClient gRPC channel 无主动回收,地址变更后旧 channel 泄漏 @ rtp_llm/server/master_client.py:133
    • 建议:在 _get_channel 中添加简单 LRU 或检查 target 是否与当前 master_addr 一致。
  • trans_input 在每次请求中被重复调用两次 @ rtp_llm/server/backend_rpc_server_visitor.py:219
    • 建议:缓存 trans_input 结果,或在 enqueued_by_master=True 时跳过完整 trans_input。
  • 测试 mock 签名与实际 gRPC 接口完全不匹配 @ rtp_llm/server/test/master_client_test.py:46
    • 建议:重写测试 mock 匹配新的 gRPC/protobuf 接口签名。
  • DecodeRpcServer::initThreadPool 条件检查逻辑反转 @ rtp_llm/cpp/model_rpc/DecodeRpcServer.cc:74
    • 建议:将条件改为 if (resource_.workers.empty()) { return; }。
  • EnqueueBatch 热路径尾部调用 gc() 持全局锁遍历 map @ rtp_llm/cpp/model_rpc/PrefillRpcServer.cc:1604
    • 建议:移除此处同步 GC,依赖后台 GC 线程。
  • ResponseBufferWriter::Write 队列满时静默丢弃旧数据 @ rtp_llm/cpp/model_rpc/ResponseBuffer.cc:125
    • 建议:至少添加 WARNING 日志记录被丢弃的 response。
  • RpcServerRuntimeMeta::trimFinishedStreams 定义但从未调用 @ rtp_llm/cpp/model_rpc/RpcServerRuntimeMeta.h:132
    • 建议:在适当位置调用 trimFinishedStreams(),或删除死代码。
  • PoolMetrics 的 active/queued/completed 始终为 0 @ rtp_llm/cpp/model_rpc/PrefillRpcServer.h:19
    • 建议:在 pushTask/async 前后递增/递减 active/queued/completed。
  • admitWaitingUnits 移除错误 unit 时未调用 finish() @ rtp_llm/cpp/engine_base/schedulers/FIFOScheduler.cc:218
    • 建议:在 erase 前对 unit 中每个 stream 调用 finish()。
  • canAdmitUnit 对 group unit 跳过 token 预算检查 @ rtp_llm/cpp/engine_base/schedulers/FIFOScheduler.cc:200
    • 建议:若 group 由 FlexLB 保证预算则添加注释说明假设,否则也做 token 预算校验。
  • RpcCpuTpBroadcaster inbox 无界增长风险 @ rtp_llm/cpp/distribute/RpcCpuTpBroadcaster.cc:281
    • 建议:对 inbox_ 添加 TTL 清理或大小限制。
  • get_backend_role_addrs 的 cache_key_block_size 参数为死代码 @ rtp_llm/server/master_client.py:200
    • 建议:若为预留字段则传入 PB;若不需要则从签名移除。
  • RoleAddr.validate_role 对无效字符串抛 AttributeError 而非 ValueError @ rtp_llm/config/generate_config.py:60
    • 建议:改为 getattr(..., None) 并在 None 时 raise ValueError。
  • Logger 硬转型 logback Logger,非 logback 环境 ClassCastException @ rtp_llm/flexlb/flexlb-common/src/main/java/org/flexlb/util/Logger.java:83
    • 建议:用 instanceof 检查,不匹配时降级为 no-op。
  • getParsedSloBuckets 静默忽略格式错误的 SLO bucket 条目 @ rtp_llm/flexlb/flexlb-common/src/main/java/org/flexlb/config/FlexlbConfig.java:488
    • 建议:在 catch 块添加 warn 日志。
  • updateFromResponse 多字段非原子写入无锁保护 @ rtp_llm/flexlb/flexlb-common/src/main/java/org/flexlb/dao/master/WorkerStatus.java:54
    • 建议:在 updateFromResponse 中使用 lock,或将频繁读取字段标为 volatile。
  • FlexlbGrpcForwarder channels computeIfAbsent + remove 存在竞态 @ rtp_llm/flexlb/flexlb-api/src/main/java/org/flexlb/httpserver/FlexlbGrpcForwarder.java:54
    • 建议:使用 channels.compute() 原子操作。
  • CostBasedDecodeStrategy.weightedRandomSelection 浮点精度可能跳过最后候选 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/strategy/CostBasedDecodeStrategy.java:239
    • 建议:循环后直接返回最后一个元素。
  • BatchItem.hashCode() 包含可变 sortKey 字段 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/scheduler/BatchItem.java:115
    • 建议:从 equals/hashCode 中移除 sortKey。
  • LBStatusConsistencyService static final executor shutdown 后无法重建 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/consistency/LBStatusConsistencyService.java:29
    • 建议:改为实例字段而非 static final。
  • 静态线程池在构造函数中赋值,多次实例化泄漏且核心线程数过大 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/sync/synchronizer/MasterEngineSynchronizer.java:170
    • 建议:改为 static initializer 确保单例,并根据 worker 数量动态计算核心线程数。
  • SloBudgetBatcherAlgorithm.pickWithinIncrementalBudget 中 O(n²) 列表分配 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/scheduler/SloBudgetBatcherAlgorithm.java:166
    • 建议:直接在 picked 上 add/remove 原地试探,或让 predictor 支持增量计算。
  • CostBasedDecodeStrategy realKvUsed() 单次路由决策中重复调用 3-4 次 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/strategy/CostBasedDecodeStrategy.java:135
    • 建议:在 select() 入口缓存每个 endpoint 的 realKvUsed 值。
  • CostBasedPrefillStrategy realWaitTimeMs() 单次决策中重复调用约 4N 次 @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/strategy/CostBasedPrefillStrategy.java:142
    • 建议:在 doSelect() 开头缓存每个 endpoint 的值。
  • FlexlbBatchScheduler.flushItems() ArrayList.contains() 导致 O(n²) @ rtp_llm/flexlb/flexlb-sync/src/main/java/org/flexlb/balance/scheduler/FlexlbBatchScheduler.java:276
    • 建议:改用 HashSet 实现 O(1) 查找。
  • FixedWindowBatcherAlgorithmTest 时间容差窗口过窄,flaky 风险 @ rtp_llm/flexlb/flexlb-sync/src/test/java/org/flexlb/balance/scheduler/FixedWindowBatcherAlgorithmTest.java:36
    • 建议:放宽容差或注入 Clock 接口消除壁钟依赖。
  • RtpLLMOp.cc 重复 include autil/EnvUtil.h @ rtp_llm/cpp/pybind/multi_gpu_gpt/RtpLLMOp.cc:9
    • 建议:删除第 9 行的重复 #include。

P3

  • GatherBatchScheduler 重复实现 gatherCountStreams/gatherFlattenRunning @ rtp_llm/cpp/engine_base/schedulers/GatherBatchScheduler.h:144
    • 建议:将 countStreams 改为 protected,删除重复实现。
  • GenerateStream::prepare() 中 wait_time_us_ 未被设置 @ rtp_llm/cpp/engine_base/stream/GenerateStream.cc:619
    • 建议:在 prepare() 的 lock guard 内设置 wait_time_us_。

Checklist Violations (6 fail / 56 total)

General Principles Checklist

  • [6.1] Software Engineering — DRY:重复非平凡逻辑被抽取或显式复用 → issue GatherBatchScheduler 重复实现 gatherCountStreams/gatherFlattenRunning
    GatherBatchScheduler 重复实现了 FIFOScheduler::countStreams。影响较小。
  • [6.1] Architecture — 状态不变量:创建/更新/失败/重试/回滚路径有效 → issue FlexlbBatchScheduler.submit() 异常路径未 rollback decode KV 预留
    FlexlbBatchScheduler.submit() 异常路径未 rollback decode KV 预留。
  • [6.1] Architecture — 错误语义:fail-fast/retry/fallback/silent 行为显式 → issue setPrefillCoefficients 解析失败时系数部分更新不一致
    多处静默吞异常:getParsedSloBuckets、setPrefillCoefficients catch 无日志。
  • [6.1] Architecture — 可观测性:日志/指标/超时可操作、非噪声 → issue PoolMetrics 的 active/queued/completed 始终为 0
    PoolMetrics active/queued/completed 始终为 0,ResponseBuffer 丢弃数据无日志。
  • [6.1] Architecture — 兼容性:公开 API/持久数据/配置/环境迁移安全 → issue PDSepConfig pickle 反序列化不兼容旧格式
    PDSepConfig pickle 新格式(26元素)不兼容旧格式(20元素),滚动升级时中断。
  • [6.1] Tests — 新逻辑有聚焦单测 + 相关集成/smoke 测试 → issue 测试 mock 签名与实际 gRPC 接口完全不匹配
    FlexLB 新增 Java 模块有单测,但 EngineSyncRunnerTest 和 GrpcWorkerStatusCheckRunnerTest 缺少异常路径覆盖。

Strengths

  • HTTP→gRPC 迁移整体设计合理:channel 缓存、error 处理、slave fallback 逻辑完整
  • ServerStatus.validate_role 新增 int/RoleType/str 三种类型支持,兼容 gRPC proto int enum 和旧 JSON str
  • ConfigService 对 FLEXLB_CONFIG 和 TrafficPolicy 解析都加了 try-catch 兜底,解析失败使用默认配置
  • WorkerStatus 简化为 updateFromResponse 一次性更新引擎权威值,大幅降低并发竞态风险
  • MasterClient 从 HTTP/aiohttp 迁移到 gRPC 减少序列化开销,是合理的架构演进
  • health_check_path 参数化设计良好,支持自定义健康检查路径并做了边界处理

wzy-99 added 2 commits June 29, 2026 23:00
… configurable default schedule mode

1. CostBasedPrefillStrategy.isNonBatchPath(): remove scheduleMode check
   - When flexlbBatchEnabled=true, only check the global flag; schedule mode
     is no longer a condition for creating request-ID placeholder entries.
   - This eliminates ~93% redundant inflightBatches entries that were mixing
     batch-ID and request-ID keys in the same ConcurrentHashMap.

2. PrefillEndpoint.calibrate(): log inflightBatches.size() instead of keySet()
   - Avoid dumping 250+ IDs per calibrate log line when placeholder entries exist.

3. FlexlbConfig: add DEFAULT_SCHEDULE_MODE env var support
   - New field defaultScheduleMode (String, default 'AUTO'), overridable via
     DEFAULT_SCHEDULE_MODE env var (AUTO/BATCH/DIRECT).

4. FlexlbServiceImpl: use config-driven default schedule mode
   - Replace hardcoded toScheduleMode() with resolveScheduleMode() that falls
     back to config.getDefaultScheduleModeEnum() when proto sends default value.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants