[Feature] Add Full Async Colocate Example #1557
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 64a500f32f
    while len(data) < target_size:
        completed = worker.drain_completed()
        for group in completed:
            # Skip aborted groups
            try:
                if any(s.status == Sample.Status.ABORTED for s in group):
                    try:
                        data_buffer.add_samples([group])
                    except Exception:
                        pass
                    continue
            except Exception:
                pass

            data.append(group)
            if len(data) >= target_size:
                break
Drop or return extra groups after target_size reached
When drain_completed() returns more groups than needed, the loop breaks as soon as target_size is reached and the remaining groups are left in the worker’s output_queue. On the next rollout, those leftover groups are consumed before any fresh generation, meaning they were generated with the previous policy weights (before the train/offload cycle) and can be used as if they belong to the new rollout_id. This introduces off‑policy data leakage between steps. Consider explicitly clearing or returning any extra groups in the queue after reaching target_size (or draining the queue during pause()), similar to how the standard rollout path aborts/clears pending work per step.
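One possible shape for the fix, as a minimal sketch rather than a drop-in patch: collect groups until `target_size` is reached, then set aside everything else that was already completed so it cannot leak into the next rollout. Only `drain_completed()`, `output_queue`, `Sample.Status.ABORTED`, and `target_size` come from the PR; `FakeWorker`, `collect_groups`, `max_polls`, and the assumption that `drain_completed()` empties the queue and returns its contents are hypothetical stand-ins for illustration.

```python
from collections import deque
from enum import Enum


class Sample:
    """Hypothetical stand-in for the PR's Sample type."""

    class Status(Enum):
        COMPLETED = "completed"
        ABORTED = "aborted"

    def __init__(self, status):
        self.status = status


class FakeWorker:
    """Hypothetical worker; assumes drain_completed() empties output_queue."""

    def __init__(self, groups):
        self.output_queue = deque(groups)

    def drain_completed(self):
        drained = list(self.output_queue)
        self.output_queue.clear()
        return drained


def collect_groups(worker, target_size, max_polls=1000):
    """Return (data, surplus, aborted) without leaving groups in the queue.

    data:    at most target_size groups for the current rollout
    surplus: completed groups beyond target_size (stale for the next step)
    aborted: groups containing at least one aborted sample
    """
    data, surplus, aborted = [], [], []
    polls = 0
    # max_polls is an illustrative guard so the sketch cannot spin forever.
    while len(data) < target_size and polls < max_polls:
        polls += 1
        for group in worker.drain_completed():
            if any(s.status is Sample.Status.ABORTED for s in group):
                aborted.append(group)
            elif len(data) < target_size:
                data.append(group)
            else:
                # Already drained from the queue: keep it out of `data`
                # but hand it back to the caller instead of losing it.
                surplus.append(group)
    # Final drain so nothing generated with the old weights stays queued.
    surplus.extend(worker.drain_completed())
    return data, surplus, aborted
```

The caller can then decide what to do with `surplus`: discard it, or route it wherever aborted groups already go (the diff uses `data_buffer.add_samples` for those), as long as it is not consumed under the next `rollout_id` as if it were on-policy.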
No description provided.