Skip to content

wakeup output queue when stream is finished or errored#1106

Open
yyhclimacool wants to merge 2 commits into
alibaba:mainfrom
yyhclimacool:bugfix/wakeup_when_finished
Open

wakeup output queue when stream is finished or errored#1106
yyhclimacool wants to merge 2 commits into
alibaba:mainfrom
yyhclimacool:bugfix/wakeup_when_finished

Conversation

@yyhclimacool

Copy link
Copy Markdown
Contributor

No description provided.

@yyhclimacool yyhclimacool requested a review from LLLLKKKK as a code owner June 16, 2026 07:18
@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1106

Status: BLOCKING

Summary: P0/0 · P1/1 · P2/0 · P3/0

Blocking Issues

P1

  • Error 事件路径没有唤醒阻塞的输出消费者 @ rtp_llm/cpp/engine_base/stream/GenerateStream.cc:513
    • 建议:把 Error 事件的唤醒统一放进 reportEvent()/reportEventWithoutLock() 的 Error 分支,或让所有错误入口走 reportError(),并补充阻塞 nextOutput 被 Error 唤醒的测试。

Checklist Violations (5 fail / 55 total)

RTP-LLM Checklist

  • [B] 正确性与逻辑 — 逻辑错误、off-by-one、null/zero 检查 → issue Error 事件路径没有唤醒阻塞的输出消费者
    Error 唤醒逻辑漏掉 reportEvent(Error) 入口,错误状态已设置但输出等待方可能仍阻塞。
  • [B] 正确性与逻辑 — 边界 case(空输入、单元素、最大值) → issue Error 事件路径没有唤醒阻塞的输出消费者
    输出队列为空且 checkTimeout() 当场置错时,代码仍会执行 waitNotEmpty()。
  • [B] 正确性与逻辑 — 状态标志有完整 set/reset 生命周期 → issue Error 事件路径没有唤醒阻塞的输出消费者
    Error event 写入 error_info 后缺少对应输出队列唤醒动作,错误生命周期没有完整传播到消费者。
  • [C] 线程安全与并发 — TOCTOU 竞态 → issue Error 事件路径没有唤醒阻塞的输出消费者
    nextOutput() 在循环条件判断后调用 checkTimeout() 改变错误状态,但没有在 waitNotEmpty() 前重新检查,形成检查与等待之间的状态窗口。
  • [I] 代码质量 — 同一功能用统一工具函数 → issue Error 事件路径没有唤醒阻塞的输出消费者
    同样表示错误事件的 reportError() 与 reportEvent(Error) 唤醒行为不一致。

Strengths

  • 通过虚接口封装 NormalGenerateStream 的输出队列唤醒,基类不直接依赖具体队列类型。
  • FINISHED 状态转移后新增输出队列唤醒,覆盖普通完成路径的等待退出场景。

@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1106

Status: LGTM

Summary: P0/0 · P1/0 · P2/1 · P3/0

lgtm ready to ci

Non-blocking Suggestions

P2

  • 回归测试失败时可能遗留 joinable 线程 @ rtp_llm/cpp/engine_base/stream/test/GenerateStreamTest.cc:136
    • 建议:使用 join guard/std::jthread,或改成 EXPECT 后统一 join;同时用同步原语确认 consumer 已进入等待。

Checklist Violations (2 fail / 59 total)

General Principles Checklist

  • [6.1] Tests — 新逻辑有聚焦单测 + 相关集成/smoke 测试 → issue 回归测试失败时可能遗留 joinable 线程
    GenerateStreamTest.cc:126-140 的新增并发回归测试在 join 前使用 ASSERT;断言失败时会遗留 joinable 线程并触发 std::terminate。

RTP-LLM Checklist

  • [H] 测试与 CI — 测试覆盖充分:大重构等价覆盖,新功能端到端测试 → issue 回归测试失败时可能遗留 joinable 线程
    GenerateStreamTest.cc:126-140 的关键回归测试在 join 前使用 ASSERT,失败时会触发 std::terminate,需先修正测试收尾。

Strengths

  • 错误事件入口统一收敛到 reportEventWithoutLock,并为阻塞的 nextOutput() 增加了回归测试。

@yyhclimacool yyhclimacool force-pushed the bugfix/wakeup_when_finished branch from 0191b22 to 8209a9e Compare June 16, 2026 09:22
@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1106

Status: BLOCKING

Summary: P0/0 · P1/1 · P2/1 · P3/0

Blocking Issues

P1

  • 报错唤醒路径仍无同步读取错误状态 @ rtp_llm/cpp/normal_engine/NormalGenerateStream.cc:6
    • 建议:让错误状态读写使用同一同步边界:nextOutput/hasError 加锁读取,或将错误码改为 atomic 并明确 wakeup 的内存序,同时补并发/TSan 覆盖。

Non-blocking Suggestions

P2

  • 超时路径仍可能等待输出队列 1 秒 @ rtp_llm/cpp/normal_engine/NormalGenerateStream.cc:7
    • 建议:在 checkTimeout() 后立即重新检查 hasError()/isFinished(),或把等待改为带 stream 状态谓词的 wait,并补充 timeout 路径回归测试。

Checklist Violations (7 fail / 59 total)

General Principles Checklist

  • [6.1] Architecture — 状态不变量:创建/更新/失败/重试/回滚路径有效 → issue 报错唤醒路径仍无同步读取错误状态
    NormalGenerateStream.cc:6/10 无锁读取 error_info,而 reportEventWithoutLock 在锁内写入后唤醒队列,失败状态跨线程可见性不完整。
  • [6.1] Architecture — 错误语义:fail-fast/retry/fallback/silent 行为显式 → issue 超时路径仍可能等待输出队列 1 秒
    NormalGenerateStream.cc:7 的 checkTimeout() 可设置 Error,但随后仍直接 waitNotEmpty,当前消费者不会立即 fail-fast 返回错误。
  • [6.1] Tests — 新逻辑有聚焦单测 + 相关集成/smoke 测试 → issue 超时路径仍可能等待输出队列 1 秒
    GenerateStreamTest 覆盖外部 reportError 唤醒,但未覆盖 nextOutput 内部 checkTimeout 设置 Error 后不应继续 wait 的路径。

RTP-LLM Checklist

  • [B] 正确性与逻辑 — 逻辑错误、off-by-one、null/zero 检查 → issue 超时路径仍可能等待输出队列 1 秒
    NormalGenerateStream.cc:7 的 checkTimeout() 后缺少状态复查,导致 Error 已设置时仍进入队列等待。
  • [B] 正确性与逻辑 — 状态标志有完整 set/reset 生命周期 → issue 超时路径仍可能等待输出队列 1 秒
    Error 状态可由当前 nextOutput() 的 checkTimeout() 设置,但循环没有在 wait 前消费该状态。
  • [C] 线程安全与并发 — TOCTOU 竞态 → issue 报错唤醒路径仍无同步读取错误状态
    reportEventWithoutLock 写 error_info 后 wakeup,nextOutput 被唤醒后无锁读取 hasError,检查与读取没有同一同步边界。
  • [H] 测试与 CI — 测试覆盖充分:大重构等价覆盖,新功能端到端测试 → issue 超时路径仍可能等待输出队列 1 秒
    回归测试只覆盖外部线程 reportError,未覆盖 checkTimeout 在 nextOutput 内部设置 Error 的遗漏路径。

Strengths

  • Error 事件入口收敛到 reportEventWithoutLock,避免 reportError/reportEvent 分散维护唤醒逻辑。
  • 新增回归测试覆盖阻塞 nextOutput 被外部 reportError 唤醒的主路径。

@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1106

Status: BLOCKING

Summary: P0/0 · P1/1 · P2/1 · P3/0

Blocking Issues

P1

  • waitForRemoteGenerate 在错误/结束唤醒后仍会继续阻塞 @ rtp_llm/cpp/engine_base/stream/GenerateStream.cc:659
    • 建议:将等待谓词扩展为 NeedRemoteGenerate || hasError() || getStatus()==FINISHED,并补充 Error/FINISHED 唤醒回归测试。

Non-blocking Suggestions

P2

  • 新增阻塞测试依赖固定耗时阈值 @ rtp_llm/cpp/engine_base/stream/test/GenerateStreamTest.cc:156
    • 建议:改为受控同步点或轮询 future.wait_for 到明确状态,避免固定 sleep 和耗时阈值驱动断言。

Checklist Violations (9 fail / 59 total)

General Principles Checklist

  • [6.1] Architecture — 状态不变量:创建/更新/失败/重试/回滚路径有效 → issue waitForRemoteGenerate 在错误/结束唤醒后仍会继续阻塞
    Error/FINISHED 会 notify_all,但 waitForRemoteGenerate 的谓词未把错误或结束视为可返回条件,PD-sep 失败路径状态不变量不完整。
  • [6.1] Architecture — 错误语义:fail-fast/retry/fallback/silent 行为显式 → issue waitForRemoteGenerate 在错误/结束唤醒后仍会继续阻塞
    nextOutput 错误路径已 fail-fast,但 waitForRemoteGenerate 收到错误通知后仍继续等待,错误语义未贯穿所有等待消费者。
  • [6.1] Tests — 新逻辑有聚焦单测 + 相关集成/smoke 测试 → issue waitForRemoteGenerate 在错误/结束唤醒后仍会继续阻塞
    新增测试覆盖 nextOutput 的 Error/timeout,但未覆盖同一 cv 上的 waitForRemoteGenerate Error/FINISHED 唤醒路径。_
  • [6.1] Tests — 边界 case 覆盖(空、单元素、最大值) → checklist-only
    单个草稿指出客户端取消、timeout_ms<=0/长超时边界未覆盖;多数未将其确认为合并 issue,按悲观 checklist 合并保留为提示项。
  • [6.1] Tests — 分布式/跨平台变更有对应覆盖 → issue waitForRemoteGenerate 在错误/结束唤醒后仍会继续阻塞
    PD-sep 的 waitForRemoteGenerate 属于远程生成等待路径,但新增测试只覆盖本地 nextOutput 消费者。

RTP-LLM Checklist

  • [B] 正确性与逻辑 — 逻辑错误、off-by-one、null/zero 检查 → issue waitForRemoteGenerate 在错误/结束唤醒后仍会继续阻塞
    waitForRemoteGenerate 的等待谓词遗漏错误/结束状态,导致条件已经无法满足时仍继续等待。
  • [B] 正确性与逻辑 — Busy-wait 循环有终止条件 → checklist-only
    单个草稿指出 nextOutput 的周期 wait_for 对外部客户端取消缺少直接终止条件;该点未作为合并 issue,按悲观 checklist 合并保留。
  • [C] 线程安全与并发 — TOCTOU 竞态 → issue waitForRemoteGenerate 在错误/结束唤醒后仍会继续阻塞
    通知发生在 Error/FINISHED 状态变化后,但 waitForRemoteGenerate 的等待谓词只看 NeedRemoteGenerate,状态变化与等待条件不一致。
  • [H] 测试与 CI — 测试覆盖充分:大重构等价覆盖,新功能端到端测试 → issue waitForRemoteGenerate 在错误/结束唤醒后仍会继续阻塞
    新增回归测试未覆盖 waitForRemoteGenerate 的 Error/FINISHED 唤醒,导致本次 notify_all 变更的一条消费者路径未被验证。

Strengths

  • nextOutput 改为基于 stream 状态谓词的 condition_variable 等待,主输出路径比原先依赖 1s 队列等待更明确。
  • 新增了 reportError 与 timeout 的回归测试,覆盖了本次修复的主要 nextOutput 场景。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants