
feat: supports dp rank scheduling and scheduling with the minimun number of…#1007

Open
jiashaokun-1 wants to merge 1 commit into lightseekorg:main from jiashaokun-1:main

Conversation


@jiashaokun-1 jiashaokun-1 commented Apr 1, 2026

Motivation

sglang#19268 introduced routed_dp_rank and disagg_prefill_dp_rank to support external DP dispatch with PD-disaggregation mode. This change supports that function in the gateway and schedules requests to the dp_rank with the minimum number of tokens.

In the current PD-disaggregation mode, the prefill instances only support the round_robin scheduling policy. Under this policy, the DP group a request is routed to is determined by bootstrap_room mod dp_size. This can lead to load imbalance among DP groups, and the imbalance becomes more pronounced when the input requests are variable-length sequences.
Based on the above, I added a new feature that selects the DP group with the lightest load to process each request, thereby achieving DP load balance for PD-disaggregation mode. The current load is measured by the number of tokens, which can be adjusted as needed in the future. The feature is enabled or disabled via the dp_minimum_tokens_scheduler parameter.
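To illustrate the imbalance, here is a standalone sketch (not the gateway's code; the request sizes are made up) of how bootstrap_room mod dp_size ignores request length:

```rust
// Sketch: with round_robin, the target DP group is simply
// bootstrap_room % dp_size, independent of request length.
fn round_robin_rank(bootstrap_room: u64, dp_size: u64) -> u64 {
    bootstrap_room % dp_size
}

fn main() {
    let dp_size = 2;
    // Alternating long/short prompts (token counts) with consecutive rooms.
    let requests: [(u64, u64); 4] = [(0, 4000), (1, 100), (2, 3800), (3, 120)];
    let mut tokens_per_rank = [0u64; 2];
    for (room, tokens) in requests {
        tokens_per_rank[round_robin_rank(room, dp_size) as usize] += tokens;
    }
    // Rank 0 ends up with 7800 tokens while rank 1 holds only 220.
    println!("{:?}", tokens_per_rank);
}
```

When long and short prompts happen to alternate, one DP group absorbs nearly all of the prefill work while the other sits almost idle, which is exactly the case a token-aware scheduler avoids.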

Key Changes

1. Load Management

A new WorkerLoadManager struct has been added to manage loads, providing three methods: update_dp_loads, get_lowest_dp_load, and load_increment.

#[derive(Debug, Default)]
pub struct WorkerLoadManager {
    // <worker, <dp_rank, loads>>
    dp_cached_loads: RwLock<HashMap<String, HashMap<isize, isize>>>,
}
  1. The update_dp_loads method updates the load. After periodically collecting load data, the LoadMonitor calls this method to perform the update.
  2. The get_lowest_dp_load method takes a worker as input and returns the dp_rank with the lowest load among that worker's DP ranks.
  3. The load_increment method is used to add an increment to the load of a specific dp_rank for a worker. This is done to prevent all requests from being scheduled to the same DP group during the interval between two load reports.
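The three methods can be sketched as follows. This is a simplified, standalone version: workers are keyed by a URL string here rather than the gateway's &dyn Worker, lock poisoning is ignored via unwrap(), and ties are broken on the lower rank for determinism.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

#[derive(Debug, Default)]
pub struct WorkerLoadManager {
    // <worker_url, <dp_rank, load in tokens>>
    dp_cached_loads: RwLock<HashMap<String, HashMap<isize, isize>>>,
}

impl WorkerLoadManager {
    /// Called by the LoadMonitor after each scrape to refresh cached loads.
    pub fn update_dp_loads(&self, loads: &HashMap<String, HashMap<isize, isize>>) {
        let mut cache = self.dp_cached_loads.write().unwrap();
        for (worker, ranks) in loads {
            cache.insert(worker.clone(), ranks.clone());
        }
    }

    /// Returns the dp_rank with the smallest cached load for this worker.
    pub fn get_lowest_dp_load(&self, worker: &str) -> Option<isize> {
        let cache = self.dp_cached_loads.read().unwrap();
        cache
            .get(worker)?
            .iter()
            .min_by_key(|&(rank, load)| (*load, *rank))
            .map(|(rank, _)| *rank)
    }

    /// Reserves capacity between two load reports by bumping a rank's load.
    pub fn load_increment(&self, worker: &str, dp_rank: isize, delta: isize) {
        let mut cache = self.dp_cached_loads.write().unwrap();
        if let Some(ranks) = cache.get_mut(worker) {
            *ranks.entry(dp_rank).or_insert(0) += delta;
        }
    }
}

fn main() {
    let mgr = WorkerLoadManager::default();
    let mut loads = HashMap::new();
    loads.insert(
        "http://prefill:6699".to_string(),
        HashMap::from([(0, 120), (1, 40)]),
    );
    mgr.update_dp_loads(&loads);
    assert_eq!(mgr.get_lowest_dp_load("http://prefill:6699"), Some(1));
    // Reserving 100 tokens on rank 1 moves the next pick to rank 0.
    mgr.load_increment("http://prefill:6699", 1, 100);
    assert_eq!(mgr.get_lowest_dp_load("http://prefill:6699"), Some(0));
}
```

The local increment is what keeps a burst of requests from all landing on the rank that was lightest at the last scrape; the next update_dp_loads overwrites the cache with ground-truth values.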

2. New Interfaces

A DPRankLoadPolicy trait and a MinimumTokensPolicy implementation are added. Given a worker, the dp_rank with the minimum number of tokens is selected.

#[async_trait]
pub trait DPRankLoadPolicy: Send + Sync + Debug {
    async fn select_dp_rank(&self, worker: &dyn Worker, text_str: isize) -> Option<isize>;
}

impl DPRankLoadPolicy for MinimumTokensPolicy {
    async fn select_dp_rank(&self, worker: &dyn Worker, text_str: isize) -> Option<isize> {
        if let Some(worker_load) = self.worker_load_manager.as_ref() {
            let lowest_tokens_dp_rank = worker_load.get_lowest_dp_load(worker);
            if let Some(dp_rank) = lowest_tokens_dp_rank {
                worker_load.load_increment(worker, dp_rank, text_str);
            }
            return lowest_tokens_dp_rank;
        }
        None
    }
}
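Note that select_dp_rank above takes the manager's lock twice (once to read the minimum, once to increment); CodeRabbit's review below suggests folding the pair into a single lock-protected reservation. A minimal sketch of that shape follows — the method name take_lowest_dp_load_and_increment and the URL-string key are hypothetical simplifications:

```rust
use std::collections::HashMap;
use std::sync::RwLock;

#[derive(Debug, Default)]
pub struct WorkerLoadManager {
    dp_cached_loads: RwLock<HashMap<String, HashMap<isize, isize>>>,
}

impl WorkerLoadManager {
    /// Finds the minimum-load dp_rank and reserves `delta` tokens on it under
    /// a single write lock, so two concurrent requests cannot both observe
    /// the same minimum before either increment lands.
    pub fn take_lowest_dp_load_and_increment(
        &self,
        worker: &str,
        delta: isize,
    ) -> Option<isize> {
        let mut cache = self.dp_cached_loads.write().unwrap();
        let ranks = cache.get_mut(worker)?;
        let rank = ranks
            .iter()
            .min_by_key(|&(rank, load)| (*load, *rank))
            .map(|(rank, _)| *rank)?;
        *ranks.get_mut(&rank).unwrap() += delta;
        Some(rank)
    }
}

fn main() {
    let mgr = WorkerLoadManager::default();
    mgr.dp_cached_loads.write().unwrap().insert(
        "http://prefill:6699".to_string(),
        HashMap::from([(0, 10), (1, 10)]),
    );
    // Two equal-cost requests spread across both ranks instead of piling up.
    let first = mgr.take_lowest_dp_load_and_increment("http://prefill:6699", 5);
    let second = mgr.take_lowest_dp_load_and_increment("http://prefill:6699", 5);
    assert_eq!((first, second), (Some(0), Some(1)));
}
```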

3. Configuration Parameters

  1. A new configuration parameter dp-minimum-tokens-scheduler has been added; when set, requests are scheduled to the minimum-load DP group.
  2. A new configuration parameter worker-load-check-interval has been added to specify the load-collection interval. Previously, load collection reused the worker-startup-check-interval configuration; it is now separate from the startup check.

Benchmarking and Profiling

Comparing several scheduling strategies on variable-length datasets before and after enabling dp-minimum-tokens-scheduler: Mean TPOT improved by approximately 9%, and Mean TTFT (with max_out_len=1 set to eliminate the impact of decoding on prefill) improved by about 15%. The specific data is as follows:

| Policy | Mean TTFT baseline (ms) | Mean TTFT w/ dp minimum tokens (ms) | TTFT gain | Mean TPOT baseline (ms) | Mean TPOT w/ dp minimum tokens (ms) | TPOT gain |
|---|---|---|---|---|---|---|
| Random | 2631 | 2249 | +14.52% | 53.41 | 48.15 | +9.85% |
| Round_robin | 2646 | 2164 | +18.22% | 55.42 | 49.02 | +11.55% |
| Cache_aware | 2601 | 2216 | +14.80% | 52.67 | 48.48 | +7.96% |

(TTFT measured with max_out_len=1.)

router

python -m sglang_router.launch_router   --pd-disaggregation   --prefill-policy cache_aware --dp-minimum-tokens-scheduler --worker-load-check-interval 1   --prefill grpc://141.61.29.204:6699   --decode grpc://127.0.0.1:7699   --model-path /home/weights/DeepSeek-R1_w8a8   --tokenizer-path /home/weights/DeepSeek-R1_w8a8   --host 0.0.0.0 --port 4567

prefill

python3 -m sglang.launch_server --model-path ${MODEL_PATH} --tp-size 1 --dp-size 2 --base-gpu-id 8 --disaggregation-mode prefill --trust-remote-code --attention-backend ascend --disaggregation-transfer-backend ascend --device npu --quantization modelslim --watchdog-timeout 9000 --host 141.61.29.204 --grpc-mode --metrics-port 10000 --port 6699 --cuda-graph-bs 8 16 24 28 32 36 --mem-fraction-static 0.71 --max-running-requests 144 --chunked-prefill-size -1  --dtype bfloat16 --load-balance-method round_robin --disable-overlap-schedule --enable-metrics

decode

python3 -m sglang.launch_server --model-path ${MODEL_PATH} --tp-size 1 --dp-size 2 --base-gpu-id 12 --disaggregation-mode decode --trust-remote-code --attention-backend ascend --disaggregation-transfer-backend ascend --device npu --quantization modelslim --watchdog-timeout 9000 --host 141.61.29.204 --grpc-mode --metrics-port 10001 --port 7699 --cuda-graph-bs 8 16 24 28 32 36 --mem-fraction-static 0.71 --max-running-requests 144 --chunked-prefill-size -1  --dtype bfloat16 --load-balance-method round_robin --disable-overlap-schedule --load-balance-method round_robin --prefill-round-robin-balance --enable-metrics

benchmark
The model evaluation tool I used is AISBench. Variable-length datasets are prone to load-imbalance scenarios, so I constructed a variable-length dataset for testing. When using the benchmark's --dataset-name random option to specify the dataset, the input prompts were being split, so I used AISBench to replace the original dataset with a variable-length one during the test runs.

ais_bench --models vllm_api_stream_chat --datasets gsm8k_gen_0_shot_cot_str_perf  --debug --summarizer default_perf --mode perf

╒══════════════════════════╤═════════╤═════════════════╤═════════════════╤═════════════════╤═════════════════╤═════════════════╤═════════════════╤═════════════════╤══════╕
│ Performance Parameters   │ Stage   │ Average         │ Min             │ Max             │ Median          │ P75             │ P90             │ P99             │  N   │
╞══════════════════════════╪═════════╪═════════════════╪═════════════════╪═════════════════╪═════════════════╪═════════════════╪═════════════════╪═════════════════╪══════╡
│ E2EL                     │ total   │ 55044.4573 ms   │ 11143.2514 ms   │ 195472.4726 ms  │ 35057.9889 ms   │ 69464.8019 ms   │ 132295.1279 ms  │ 185509.4833 ms  │ 1000 │
├──────────────────────────┼─────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼──────┤
│ TTFT                     │ total   │ 893.5725 ms     │ 348.2403 ms     │ 1549.0416 ms    │ 901.9457 ms     │ 1084.3782 ms    │ 1215.7292 ms    │ 1469.2704 ms    │ 1000 │
├──────────────────────────┼─────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼──────┤
│ TPOT                     │ total   │ 48.4854 ms      │ 33.7401 ms      │ 63.8094 ms      │ 48.7747 ms      │ 52.4151 ms      │ 55.4097 ms      │ 59.9007 ms      │ 1000 │
├──────────────────────────┼─────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼──────┤
│ ITL                      │ total   │ 114.1014 ms     │ 0.0068 ms       │ 594.2452 ms     │ 115.7806 ms     │ 119.678 ms      │ 123.1585 ms     │ 251.8668 ms     │ 1000 │
├──────────────────────────┼─────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼──────┤
│ InputTokens              │ total   │ 2560.5243       │ 1481.0          │ 4120.0          │ 2216.0          │ 3402.25         │ 3943.3          │ 4107.49         │ 1000 │
├──────────────────────────┼─────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼──────┤
│ OutputTokens             │ total   │ 1159.8058       │ 187.0           │ 4068.0          │ 715.5           │ 1358.0          │ 2769.4          │ 4068.0          │ 1000 │
├──────────────────────────┼─────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼──────┤
│ OutputTokenThroughput    │ total   │ 20.3425 token/s │ 14.7171 token/s │ 28.1505 token/s │ 19.9678 token/s │ 22.0362 token/s │ 23.6102 token/s │ 26.6782 token/s │ 1000 │
╘══════════════════════════╧═════════╧═════════════════╧═════════════════╧═════════════════╧═════════════════╧═════════════════╧═════════════════╧═════════════════╧══════╛
╒══════════════════════════╤═════════╤═══════════════════╕
│ Common Metric            │ Stage   │ Value             │
╞══════════════════════════╪═════════╪═══════════════════╡
│ Benchmark Duration       │ total   │ 392829.9277 ms    │
├──────────────────────────┼─────────┼───────────────────┤
│ Total Requests           │ total   │ 1000              │
├──────────────────────────┼─────────┼───────────────────┤
│ Failed Requests          │ total   │ 0                 │
├──────────────────────────┼─────────┼───────────────────┤
│ Success Requests         │ total   │ 1000              │
├──────────────────────────┼─────────┼───────────────────┤
│ Concurrency              │ total   │ 86.5959           │
├──────────────────────────┼─────────┼───────────────────┤
│ Max Concurrency          │ total   │ 270               │
├──────────────────────────┼─────────┼───────────────────┤
│ Request Throughput       │ total   │ 2.5456 req/s      │
├──────────────────────────┼─────────┼───────────────────┤
│ Total Input Tokens       │ total   │ 1582404           │
├──────────────────────────┼─────────┼───────────────────┤
│ Prefill Token Throughput │ total   │ 2865.4913 token/s │
├──────────────────────────┼─────────┼───────────────────┤
│ Total generated tokens   │ total   │ 716760            │
├──────────────────────────┼─────────┼───────────────────┤
│ Input Token Throughput   │ total   │ 4028.2165 token/s │
├──────────────────────────┼─────────┼───────────────────┤
│ Output Token Throughput  │ total   │ 1824.6064 token/s │
├──────────────────────────┼─────────┼───────────────────┤
│ Total Token Throughput   │ total   │ 5852.8229 token/s │
╘══════════════════════════╧═════════╧═══════════════════╛
Checklist
  • cargo +nightly fmt passes
  • cargo clippy --all-targets --all-features -- -D warnings passes
  • (Optional) Documentation updated
  • (Optional) Please join us on Slack #sig-smg to discuss, review, and merge PRs

Summary by CodeRabbit

  • New Features

    • Opt-in "minimum tokens" scheduler to better balance token usage across data-parallel groups.
    • New CLI flag and Python constructor/config option to enable the scheduler.
    • Per-worker DP-group load tracking and DP-rank-aware routing; routers can select and annotate DP ranks on requests.
  • Documentation

    • Constructor and CLI docs updated to describe the new scheduler option.

@chatgpt-codex-connector

Codex usage limits have been reached for code reviews. Please check with the admins of this repo to increase the limits by adding credits.
Repo admins can enable using credits for code reviews in their settings.

@github-actions github-actions bot added python-bindings Python bindings changes model-gateway Model gateway crate changes labels Apr 1, 2026
@mergify
Contributor

mergify bot commented Apr 1, 2026

Hi @jiashaokun-1, the DCO sign-off check has failed. All commits must include a Signed-off-by line.

To fix existing commits:

# Sign off the last N commits (replace N with the number of unsigned commits)
git rebase HEAD~N --signoff
git push --force-with-lease

To sign off future commits automatically:

  • Use git commit -s every time, or
  • VSCode: enable Git: Always Sign Off in Settings
  • PyCharm: enable Sign-off commit in the Commit tool window

@coderabbitai

coderabbitai bot commented Apr 1, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Adds an optional DP "minimum tokens" scheduler: new Router config/CLI flag, WorkerLoadManager for per-worker DP-rank loads, DPRankLoadPolicy abstraction and MinimumTokensPolicy, PolicyRegistry wiring, LoadMonitor integration, and router changes to select and inject DP ranks into dual-dispatch requests.

Changes

| Cohort / File(s) | Summary |
|---|---|
| **Configuration & CLI**: `bindings/python/src/smg/router.py`, `bindings/python/src/smg/router_args.py`, `model_gateway/src/config/types.rs`, `model_gateway/src/config/builder.rs`, `model_gateway/src/main.rs` | Add dp_minimum_tokens_scheduler option/flag across Python bindings, RouterConfig types, builder API, and CLI. |
| **Python FFI**: `bindings/python/src/lib.rs` | Expose dp_minimum_tokens_scheduler: bool on Python Router and forward into Rust RouterConfig on construction. |
| **Load Tracking Core**: `model_gateway/src/core/worker_load.rs`, `model_gateway/src/core/mod.rs` | New WorkerLoadManager with RwLock-backed cache of worker_url -> (dp_rank -> load), update and select+increment semantics; exported from core. |
| **Load Monitor Integration**: `model_gateway/src/core/worker_manager.rs`, `crates/protocols/src/worker.rs` | LoadMonitor now holds worker_load_manager; polling collects dp-rank maps (WorkerLoadResponse::dp_rank_loads) and updates the manager; WorkerLoadResponse gains a dp_rank_loads() accessor. |
| **Policy Framework**: `model_gateway/src/policies/mod.rs`, `model_gateway/src/policies/registry.rs`, `model_gateway/src/policies/dp_min_token.rs` | Introduce DPRankLoadPolicy trait, registry support for DP-rank policies, and MinimumTokensPolicy that uses WorkerLoadManager to pick/increment the lowest-load DP rank. |
| **Router & Dispatch**: `model_gateway/src/routers/factory.rs`, `model_gateway/src/routers/http/pd_router.rs` | Factory registers MinimumTokensPolicy when enabled. PD router creates separate prefill/decode JSON payloads, queries the DP-rank policy for selections, injects routed DP-rank fields, and dispatches distinct payloads for prefill and decode. |
| **Protocols**: `crates/protocols/src/worker.rs` | Add dp_rank_loads(&self) -> HashMap<isize,isize> to expose the per-scheduler dp_rank -> num_used_tokens mapping from WorkerLoadResponse. |

Sequence Diagram(s)

sequenceDiagram
    participant Worker
    participant LoadMonitor
    participant WorkerLoadManager
    participant PolicyRegistry

    Worker->>LoadMonitor: report WorkerLoadResponse (incl. dp_rank_loads)
    LoadMonitor->>WorkerLoadManager: update_dp_loads(group_dp_loads)
    WorkerLoadManager->>WorkerLoadManager: merge per-worker dp-rank maps
    LoadMonitor->>PolicyRegistry: get_dp_rank_policy()
    PolicyRegistry-->>LoadMonitor: MinimumTokensPolicy (if set)
sequenceDiagram
    participant Client
    participant PDRouter
    participant DPRankPolicy
    participant WorkerLoadManager
    participant WorkerPrefill as Worker(Prefill)
    participant WorkerDecode as Worker(Decode)

    Client->>PDRouter: execute_dual_dispatch(context, request)
    PDRouter->>DPRankPolicy: select_dp_rank(prefill_worker, estimated_cost)
    DPRankPolicy->>WorkerLoadManager: select_and_increment_lowest_dp_load(prefill_worker, cost)
    WorkerLoadManager-->>DPRankPolicy: prefill_rank
    DPRankPolicy-->>PDRouter: prefill_rank
    PDRouter->>DPRankPolicy: select_dp_rank(decode_worker, estimated_cost)
    DPRankPolicy->>WorkerLoadManager: select_and_increment_lowest_dp_load(decode_worker, cost)
    WorkerLoadManager-->>DPRankPolicy: decode_rank
    DPRankPolicy-->>PDRouter: decode_rank
    PDRouter->>WorkerPrefill: POST prefill payload (with routed_dp_rank)
    PDRouter->>WorkerDecode: POST decode payload (with routed_dp_rank + disagg_prefill_dp_rank)
    WorkerPrefill-->>Client: prefill response
    WorkerDecode-->>Client: decode response

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~40 minutes

Possibly related PRs

Suggested labels

protocols, benchmarks, tests

Suggested reviewers

  • CatherineSue
  • key4ng
  • slin1237

Poem

🐇 I count the tokens, small and bright,
I nudge the lightest rank to flight,
I bump a load, then watch it swoon,
Prefill, decode—balanced tune,
Hoppy routing into night!

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (2 warnings)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Title check | ⚠️ Warning | The title is incomplete and cuts off mid-sentence ('minimun number of…'), making it unclear and grammatically incorrect; it also misspells 'minimum' as 'minimun'. | Complete the title and correct the spelling: 'feat: support DP rank scheduling with minimum token distribution' or similar; ensure it is a complete, grammatically correct sentence. |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 62.50%, which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
✅ Passed checks (1 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |



@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a minimum tokens scheduler for data-parallel (DP) groups, enabling the gateway to route requests to the DP rank with the lowest load. Key additions include a WorkerLoadManager for tracking per-rank token usage, a MinimumTokensPolicy for rank selection, and logic within the PDRouter to inject routed DP ranks into outgoing requests. Review feedback identifies a critical need to update request text capture logic to ensure cost estimation works correctly, points out a potential memory leak in the load manager's cache, and suggests several naming and documentation improvements to enhance code clarity and consistency.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@model_gateway/src/config/types.rs`:
- Line 29: The new required config field dp_minimum_tokens_scheduler can break
deserialization for existing configs; add serde default behavior so missing
values deserialize safely by annotating the field dp_minimum_tokens_scheduler in
the struct in types.rs with #[serde(default)] (or #[serde(default =
"default_dp_minimum_tokens_scheduler")] plus a small fn returning false if you
prefer an explicit default), ensuring the struct still derives Deserialize so
older config files without dp_minimum_tokens_scheduler load with the expected
default (false).

In `@model_gateway/src/policies/dp_min_token.rs`:
- Around line 24-31: The selection and reservation must be made atomic: modify
the worker load manager to add a new method (e.g.,
take_lowest_dp_load_and_increment or take_and_increment_lowest) that acquires
the write lock, finds the minimum DP rank, increments its cached load, and
returns the rank in one operation; then update
MinimumTokensPolicy::select_dp_rank to call this new method instead of calling
worker_load.get_lowest_dp_load() followed by worker_load.load_increment(),
removing the two separate lock acquisitions and ensuring a single lock-protected
reservation.

In `@model_gateway/src/routers/http/pd_router.rs`:
- Around line 354-363: The current logic derives text_length from
context.request_text bytes, causing no-op bumps when that field is empty and
miscounting tokens; instead compute token_count in the same unit as the DP
minimum-token gauges and pass that to dp_rank_policy.select_dp_rank. Concretely,
in the block that uses policy_registry.get_dp_rank_policy() and calls
select_dp_rank(prefill.as_ref(), text_length) / select_dp_rank(decode.as_ref(),
text_length), replace the bytes-based context.request_text.len() with a
token-aware value: prefer any existing token/count fields from prefill or decode
if present (e.g., prefill.request_tokens or decode.request_tokens), otherwise
derive tokens from the request text using the same tokenizer/tokenizer helper
used elsewhere in the scheduler (so units match dp_minimum_tokens_scheduler),
and default to 0; then use that token_count when calling select_dp_rank.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: c478e36c-6c65-43ec-b087-905f310e6640

📥 Commits

Reviewing files that changed from the base of the PR and between 709e1f6 and c266ec0.

📒 Files selected for processing (15)
  • bindings/python/src/lib.rs
  • bindings/python/src/smg/router.py
  • bindings/python/src/smg/router_args.py
  • crates/protocols/src/worker.rs
  • model_gateway/src/config/builder.rs
  • model_gateway/src/config/types.rs
  • model_gateway/src/core/mod.rs
  • model_gateway/src/core/worker_load.rs
  • model_gateway/src/core/worker_manager.rs
  • model_gateway/src/main.rs
  • model_gateway/src/policies/dp_min_token.rs
  • model_gateway/src/policies/mod.rs
  • model_gateway/src/policies/registry.rs
  • model_gateway/src/routers/factory.rs
  • model_gateway/src/routers/http/pd_router.rs


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
model_gateway/src/core/worker_manager.rs (1)

494-519: ⚠️ Potential issue | 🟠 Major

Prune the DP-rank cache when a worker group is removed.

At Line 518, the new DP-rank cache only ever gets refreshed; on_group_removed() still clears only the watch-map entries, so a worker that disappears and later comes back under the same URL can inherit the previous instance's cached DP-rank loads until the next successful scrape. Add a matching removal path on the load manager when groups are torn down.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model_gateway/src/core/worker_manager.rs` around lines 494 - 519, The DP-rank
cache is only refreshed in the scrape path (via
worker_load_manager.update_dp_loads(&group_dp_loads)) but not cleared when a
group is torn down; add a removal path in the group removal flow: implement a
method on WorkerLoadManager (e.g., remove_group_dp_loads or
clear_dp_loads_for_urls) that deletes DP-rank entries for the URLs being removed
and call it from on_group_removed() when the watch-map entries for the group are
cleared so a returning worker at the same URL won't inherit stale DP-rank loads.
♻️ Duplicate comments (3)
model_gateway/src/routers/http/pd_router.rs (1)

361-370: ⚠️ Potential issue | 🟠 Major

Don't base the DP-rank reservation on optional byte length.

At Line 363, context.request_text is only populated when the prefill/decode worker policies asked for text. If dp_minimum_tokens_scheduler is enabled with a policy like round-robin, this path feeds 0 into the reservation on every request, so the cached DP load never bumps between scrapes and repeated requests keep landing on the same rank. Even when present, String::len() measures bytes, not the token unit used by the worker gauges.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model_gateway/src/routers/http/pd_router.rs` around lines 361 - 370, The
DP-rank reservation is incorrectly using context.request_text.byte length
(context.request_text and String::len()) which is optional and measures bytes,
causing zero reservations and sticky ranks; change the logic in the block around
dp_rank_policy_opt/select_dp_rank to derive the length from the actual
prefill/decode token counts (or any provided token-length field) instead of
context.request_text, and convert to token units (not bytes) before calling
dp_rank_policy.select_dp_rank(prefill.as_ref(), ...) and
select_dp_rank(decode.as_ref(), ...); if no token info exists, fall back to a
safe default token count rather than 0 so dp_minimum_tokens_scheduler and
round-robin policies observe load.
model_gateway/src/policies/dp_min_token.rs (1)

25-31: ⚠️ Potential issue | 🟠 Major

Make selection and reservation atomic.

At Lines 27-29, the minimum-rank lookup and the cached-load bump happen in two separate calls. Concurrent requests can observe the same minimum before either increment lands, which recreates the burst hot-spotting this policy is supposed to prevent.

Suggested shape
 impl DPRankLoadPolicy for MinimumTokensPolicy {
-    fn select_dp_rank(&self, worker: &dyn Worker, text_str: isize) -> Option<isize> {
-        if let Some(worker_load) = self.worker_load_manager.as_ref() {
-            let lowest_tokens_dp_rank = worker_load.get_lowest_dp_load(worker);
-            if let Some(dp_rank) = lowest_tokens_dp_rank {
-                worker_load.load_increment(worker, dp_rank, text_str);
-            }
-            return lowest_tokens_dp_rank;
-        }
-        None
+    fn select_dp_rank(&self, worker: &dyn Worker, token_delta: isize) -> Option<isize> {
+        self.worker_load_manager
+            .as_ref()
+            .and_then(|manager| manager.take_lowest_dp_load_and_increment(worker, token_delta))
     }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model_gateway/src/policies/dp_min_token.rs` around lines 25 - 31, The
selection in select_dp_rank is non-atomic: it calls
worker_load.get_lowest_dp_load(...) and then worker_load.load_increment(...),
allowing races; change this to a single atomic reservation call on the
worker_load_manager/worker_load (e.g., add and call a method like
reserve_lowest_dp_load(worker, text_str) or
get_and_increment_lowest_dp_load(worker, text_str)) that finds the lowest
dp_rank and increments its cached load in one synchronized operation, and
replace the existing pair of calls to get_lowest_dp_load and load_increment
inside select_dp_rank with that new atomic method.
model_gateway/src/config/types.rs (1)

29-29: ⚠️ Potential issue | 🟠 Major

Add serde default for backward-compatible deserialization.

RouterConfig derives Deserialize, but this new field is required unless present in input. Older config files missing this key can fail to load.

🔧 Proposed fix
-    pub dp_minimum_tokens_scheduler: bool,
+    #[serde(default)]
+    pub dp_minimum_tokens_scheduler: bool,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model_gateway/src/config/types.rs` at line 29, RouterConfig's new field
dp_minimum_tokens_scheduler is not optional for Deserialize and breaks loading
older configs; mark the field with serde's default so missing keys deserialize
safely (e.g., add #[serde(default)] to the dp_minimum_tokens_scheduler field or
supply a named default fn and use #[serde(default =
"default_dp_minimum_tokens_scheduler")]) and ensure the default value (false) is
returned by that fn or the field's Rust default behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/protocols/src/worker.rs`:
- Around line 942-949: The dp_rank_loads function currently uses HashMap::insert
which overwrites previous entries for the same dp_rank and loses earlier
snapshots; change it to aggregate token counts per dp_rank by using the HashMap
entry API (or equivalent) to sum snapshot.num_used_tokens into the existing
value keyed by snapshot.dp_rank (cast to isize) instead of replacing it, so the
resulting HashMap<isize,isize> contains the total tokens per dp_rank.

In `@model_gateway/src/core/worker_load.rs`:
- Around line 25-27: The current read/write calls on self.dp_cached_loads
silently ignore PoisonError (e.g., the write() in the shown block and the read()
in get_lowest_dp_load and write() in load_increment), which leaves DP load state
unchanged when a lock is poisoned; change those to handle poisoned locks by
unwrapping the poisoned guard (use PoisonError::into_inner()) or otherwise
recovering the inner data, and log the poison event via the existing logger
before proceeding so the update/read still occurs; specifically update the
occurrences of self.dp_cached_loads.read() in get_lowest_dp_load and
self.dp_cached_loads.write() in load_increment and the shown block to recover
from PoisonError, log the error, and then continue with the inner guard to
perform the extend/reads.
- Around line 34-36: The selection currently uses .min_by_key(|&(_, load)| load)
which yields nondeterministic ties; change the key to a deterministic tuple that
includes the identifier so ties break stably (e.g. .min_by_key(|&(rand_id,
load)| (load, rand_id))). Update the closure that produces rand_id accordingly
so the call to min_by_key uses (load, rand_id) (or (load, dp_rank) if that is
the identifier) to ensure deterministic tie-breaking.

In `@model_gateway/src/routers/factory.rs`:
- Around line 121-122: Remove the unnecessary clone of router_config: instead of
creating `let config = ctx.router_config.clone()` just read the flag directly
from `ctx.router_config.dp_minimum_tokens_scheduler`; update the `if`
conditional to use `ctx.router_config.dp_minimum_tokens_scheduler` and remove
the unused `config` binding so we don't clone the config struct just to read a
boolean.

---

Outside diff comments:
In `@model_gateway/src/core/worker_manager.rs`:
- Around line 494-519: The DP-rank cache is only refreshed in the scrape path
(via worker_load_manager.update_dp_loads(&group_dp_loads)) but not cleared when
a group is torn down; add a removal path in the group removal flow: implement a
method on WorkerLoadManager (e.g., remove_group_dp_loads or
clear_dp_loads_for_urls) that deletes DP-rank entries for the URLs being removed
and call it from on_group_removed() when the watch-map entries for the group are
cleared so a returning worker at the same URL won't inherit stale DP-rank loads.

---

Duplicate comments:
In `@model_gateway/src/config/types.rs`:
- Line 29: RouterConfig's new field dp_minimum_tokens_scheduler is not optional
for Deserialize and breaks loading older configs; mark the field with serde's
default so missing keys deserialize safely (e.g., add #[serde(default)] to the
dp_minimum_tokens_scheduler field or supply a named default fn and use
#[serde(default = "default_dp_minimum_tokens_scheduler")]) and ensure the
default value (false) is returned by that fn or the field's Rust default
behavior.

In `@model_gateway/src/policies/dp_min_token.rs`:
- Around line 25-31: The selection in select_dp_rank is non-atomic: it calls
worker_load.get_lowest_dp_load(...) and then worker_load.load_increment(...),
allowing races; change this to a single atomic reservation call on the
worker_load_manager/worker_load (e.g., add and call a method like
reserve_lowest_dp_load(worker, text_str) or
get_and_increment_lowest_dp_load(worker, text_str)) that finds the lowest
dp_rank and increments its cached load in one synchronized operation, and
replace the existing pair of calls to get_lowest_dp_load and load_increment
inside select_dp_rank with that new atomic method.
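One way to sketch the combined operation; the method name follows the prompt's suggestion and the struct is a reduced stand-in, not the merged code:

```rust
use std::collections::HashMap;
use std::sync::RwLock;

#[derive(Debug, Default)]
pub struct WorkerLoadManager {
    dp_cached_loads: RwLock<HashMap<String, HashMap<isize, isize>>>,
}

impl WorkerLoadManager {
    pub fn update_dp_loads(&self, worker: &str, loads: HashMap<isize, isize>) {
        self.dp_cached_loads
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .insert(worker.to_string(), loads);
    }

    // Pick-and-bump under one write lock: no other request can observe the
    // same minimum between the selection and the increment.
    pub fn reserve_lowest_dp_load(&self, worker_url: &str, tokens: isize) -> Option<isize> {
        let mut cached = self
            .dp_cached_loads
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let loads = cached.get_mut(worker_url)?;
        let dp_rank = *loads
            .iter()
            .min_by_key(|&(dp_rank, load)| (*load, *dp_rank))
            .map(|(dp_rank, _)| dp_rank)?;
        *loads.get_mut(&dp_rank)? += tokens;
        Some(dp_rank)
    }
}
```

`select_dp_rank` would then make this single call instead of the `get_lowest_dp_load` / `load_increment` pair.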

In `@model_gateway/src/routers/http/pd_router.rs`:
- Around line 361-370: The DP-rank reservation is incorrectly using
context.request_text.byte length (context.request_text and String::len()) which
is optional and measures bytes, causing zero reservations and sticky ranks;
change the logic in the block around dp_rank_policy_opt/select_dp_rank to derive
the length from the actual prefill/decode token counts (or any provided
token-length field) instead of context.request_text, and convert to token units
(not bytes) before calling dp_rank_policy.select_dp_rank(prefill.as_ref(), ...)
and select_dp_rank(decode.as_ref(), ...); if no token info exists, fall back to
a safe default token count rather than 0 so dp_minimum_tokens_scheduler and
round-robin policies observe load.
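A hedged sketch of the fallback chain the comment asks for; the parameter names are illustrative and the real request context may expose token counts differently:

```rust
// Prefer an explicit token count; otherwise approximate from the text
// (~4 chars per token is a rough heuristic for English); never reserve zero.
pub fn estimate_reservation_tokens(
    token_count: Option<usize>,
    request_text: Option<&str>,
) -> isize {
    const DEFAULT_TOKENS: isize = 64; // assumed safe non-zero default
    match (token_count, request_text) {
        (Some(n), _) => n as isize,
        (None, Some(text)) => ((text.chars().count() as isize) / 4).max(1),
        (None, None) => DEFAULT_TOKENS,
    }
}
```

The key property is that the reservation is denominated in token units (matching `dp_rank_loads()`) and is never zero, so the minimum-tokens and round-robin policies always observe some load per dispatched request.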

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 96d57302-3692-4df3-8297-b332f0dcf3f7

📥 Commits

Reviewing files that changed from the base of the PR and between c266ec0 and 243f593.

📒 Files selected for processing (15)
  • bindings/python/src/lib.rs
  • bindings/python/src/smg/router.py
  • bindings/python/src/smg/router_args.py
  • crates/protocols/src/worker.rs
  • model_gateway/src/config/builder.rs
  • model_gateway/src/config/types.rs
  • model_gateway/src/core/mod.rs
  • model_gateway/src/core/worker_load.rs
  • model_gateway/src/core/worker_manager.rs
  • model_gateway/src/main.rs
  • model_gateway/src/policies/dp_min_token.rs
  • model_gateway/src/policies/mod.rs
  • model_gateway/src/policies/registry.rs
  • model_gateway/src/routers/factory.rs
  • model_gateway/src/routers/http/pd_router.rs

Comment on lines +942 to +949
pub fn dp_rank_loads(&self) -> HashMap<isize, isize> {
    let mut map = HashMap::new();
    for snapshot in &self.loads {
        map.insert(
            snapshot.dp_rank as isize,
            snapshot.num_used_tokens as isize,
        );
    }

@coderabbitai coderabbitai bot Apr 1, 2026


⚠️ Potential issue | 🟠 Major

Avoid overwriting duplicate DP-rank snapshots.

At Line 945, HashMap::insert drops earlier entries for the same dp_rank. If multiple snapshots carry the same rank (including default 0), load is undercounted and DP-rank selection becomes incorrect. Aggregate instead of overwrite.

🔧 Proposed fix
 pub fn dp_rank_loads(&self) -> HashMap<isize, isize> {
     let mut map = HashMap::new();
     for snapshot in &self.loads {
-        map.insert(
-            snapshot.dp_rank as isize,
-            snapshot.num_used_tokens as isize,
-        );
+        *map.entry(snapshot.dp_rank as isize).or_insert(0) +=
+            snapshot.num_used_tokens as isize;
     }
     map
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/protocols/src/worker.rs` around lines 942 - 949, The dp_rank_loads
function currently uses HashMap::insert which overwrites previous entries for
the same dp_rank and loses earlier snapshots; change it to aggregate token
counts per dp_rank by using the HashMap entry API (or equivalent) to sum
snapshot.num_used_tokens into the existing value keyed by snapshot.dp_rank (cast
to isize) instead of replacing it, so the resulting HashMap<isize,isize>
contains the total tokens per dp_rank.

Collaborator


make sense



@ping1jing2, glad it makes sense! Here's the fix to apply in crates/protocols/src/worker.rs:

pub fn dp_rank_loads(&self) -> HashMap<isize, isize> {
    let mut map = HashMap::new();
    for snapshot in &self.loads {
        *map.entry(snapshot.dp_rank as isize).or_insert(0) +=
            snapshot.num_used_tokens as isize;
    }
    map
}

This ensures that if multiple SchedulerLoadSnapshot entries share the same dp_rank (e.g., the default 0), their num_used_tokens values are summed rather than the earlier snapshot being silently dropped. Without this, WorkerLoadManager::select_and_increment_lowest_dp_load could select a DP rank based on an undercount, leading to incorrect scheduling decisions.


🧠 Learnings used
Learnt from: CatherineSue
Repo: lightseekorg/smg PR: 836
File: model_gateway/src/core/worker_registry.rs:500-511
Timestamp: 2026-03-24T00:24:23.448Z
Learning: In repo lightseekorg/smg, file model_gateway/src/core/worker_registry.rs: In `register_or_replace()`, when `replace()` returns `false` (concurrent removal raced ahead of the lock), the method intentionally still returns the now-stale `existing_id` rather than retrying registration. This is by design: the mutation lock (`worker_mutation_locks`) ensures this window only opens when `remove()` fully completes before `replace()` acquires the lock, making it a brief transient state that the next K8s discovery cycle or registration call will self-heal. Do not flag returning `existing_id` after a failed `replace()` as a bug in this file.

Learnt from: key4ng
Repo: lightseekorg/smg PR: 867
File: tui/src/state.rs:412-448
Timestamp: 2026-03-23T20:27:21.425Z
Learning: In lightseekorg/smg, `tui/src/state.rs` (`poll_once`): the rate calculations for rps, tok/s, and worker_rps (using `s.prev_request_count`, `s.prev_input_tokens`, `s.prev_output_tokens`, `s.prev_worker_requests`) intentionally divide counter deltas by `interval_secs` rather than actual elapsed time. When `fetch_metrics()` fails for a tick, the previous counters are kept, which can produce a slightly inflated rate on the next successful scrape. This is an accepted trade-off: for a 20-point rolling sparkline dashboard, the visual impact is negligible and the added complexity of wall-clock tracking is not warranted. Do not flag this as an issue.

Learnt from: pallasathena92
Repo: lightseekorg/smg PR: 687
File: model_gateway/src/routers/openai/router.rs:1161-1164
Timestamp: 2026-03-11T01:21:39.783Z
Learning: In repo lightseekorg/smg, file model_gateway/src/routers/openai/router.rs: `select_worker_for_model` (which uses `find_best_worker_for_model` → `min_by_key(|w| w.load())`, i.e. least-load) is the single, uniform worker selection path used by all route handlers in `OpenAIRouter` (chat, responses, realtime session, client secret, transcription session, WebSocket, and WebRTC). This is an intentional design choice. Do not flag least-load selection as a policy bypass bug for any individual route — it applies equally to all 8+ call sites. Policy-based routing (round-robin, session affinity, etc.) is deferred to a separate PR that would change `find_best_worker_for_model` for all endpoints.

Learnt from: XinyueZhang369
Repo: lightseekorg/smg PR: 399
File: protocols/src/interactions.rs:505-509
Timestamp: 2026-02-19T03:08:50.192Z
Learning: In code reviews for Rust projects using the validator crate (v0.20.0), ensure that custom validation functions for numeric primitive types (e.g., f32, i32, u32, i16, etc.) accept the value by value, not by reference. Example: fn validate(value: f32) { ... }. The validator derive macro has a hardcoded list of numeric types that are passed by value, while all other types are passed by reference. Apply this guideline whenever validating numeric fields to align with the derive macro behavior.

Learnt from: slin1237
Repo: lightseekorg/smg PR: 489
File: model_gateway/src/core/token_bucket.rs:58-63
Timestamp: 2026-02-21T02:30:51.443Z
Learning: For lint-only/Clippy enforcement PRs in this repository, avoid introducing behavioral changes (e.g., new input validation or logic changes). Treat such PRs as non-functional changes and plan a separate follow-up issue/PR for hardening or behavior changes. This applies broadly to Rust files across the repo; during review, focus on lint/style corrections and clearly note any intentional exceptions. 

Learnt from: slin1237
Repo: lightseekorg/smg PR: 489
File: protocols/src/responses.rs:928-931
Timestamp: 2026-02-21T02:36:00.882Z
Learning: In Rust code across the repository, use the marker INVARIANT: to document assumptions in safe code. Reserve SAFETY: for explaining why unsafe blocks are sound. This improves clarity of invariants and safety reasoning. Example reference: protocols/src/responses.rs near validate_tool_choice_with_tools().

Learnt from: slin1237
Repo: lightseekorg/smg PR: 489
File: mesh/src/sync.rs:83-83
Timestamp: 2026-02-21T02:37:01.416Z
Learning: General Rust formatting rule: format! with implicit captures only supports simple identifiers, not full expressions like {state.model_id}. For cases where you want to interpolate a field or expression, bind the value first and interpolate the binding, e.g., let model_id = &state.model_id; and then use format!("policy:{}", model_id). In the specific file mesh/src/sync.rs, prefer format!("policy:{}", state.model_id) or bind to a local variable if you need named interpolation, to keep clarity and avoid unintended captures.

Learnt from: zhaowenzi
Repo: lightseekorg/smg PR: 807
File: model_gateway/src/middleware.rs:61-81
Timestamp: 2026-03-18T21:32:00.041Z
Learning: In Rust code using the http crate, HeaderMap::get() is effectively case-insensitive because HeaderName normalizes keys to lowercase on insertion and lookup. Do not require or perform explicit .to_lowercase() before HeaderMap::get() calls. Mark as not a concern for case-sensitivity in lookups; only consider normalization when inserting or comparing via HeaderName, not in lookups.

Learnt from: key4ng
Repo: lightseekorg/smg PR: 867
File: tui/src/app.rs:798-813
Timestamp: 2026-03-22T20:13:55.778Z
Learning: In this repo (lightseekorg/smg), treat the workspace `Cargo.toml`’s `package.rust-version` (MSRV) as the source of truth (e.g., `rust-version = "1.85"`). When reviewing Rust changes, do not flag usage of Rust language/library features that were stabilized on or before the MSRV (e.g., `Option::is_none_or`, stabilized in 1.82, is compatible with an MSRV of 1.85). Always verify the MSRV from the workspace `Cargo.toml` rather than relying on issue templates.

Comment on lines +121 to +122
let config = ctx.router_config.clone();
if config.dp_minimum_tokens_scheduler {


🧹 Nitpick | 🔵 Trivial

Avoid cloning config just to read a bool flag.

Read ctx.router_config.dp_minimum_tokens_scheduler directly to remove unnecessary clone.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model_gateway/src/routers/factory.rs` around lines 121 - 122, Remove the
unnecessary clone of router_config: instead of creating `let config =
ctx.router_config.clone()` just read the flag directly from
`ctx.router_config.dp_minimum_tokens_scheduler`; update the `if` conditional to
use `ctx.router_config.dp_minimum_tokens_scheduler` and remove the unused
`config` binding so we don't clone the config struct just to read a boolean.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (5)
model_gateway/src/core/worker_load.rs (2)

33-37: 🧹 Nitpick | 🔵 Trivial

Use deterministic tie-breaking for equal DP-rank loads.

At Line 35, equal loads currently depend on hash-map iteration order. Tie-break with (load, dp_rank) for stable selection.

♻️ Proposed tweak
-                    .min_by_key(|&(_, load)| load)
-                    .map(|(&rand_id, _)| rand_id);
+                    .min_by_key(|&(dp_rank, load)| (*load, *dp_rank))
+                    .map(|(&dp_rank, _)| dp_rank);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model_gateway/src/core/worker_load.rs` around lines 33 - 37, The current
selection uses loads.iter().min_by_key(|&(_, load)| ...) which produces
non-deterministic ties; change the key to include dp_rank so ties break
deterministically (e.g., use min_by_key with a tuple like (load, dp_rank)) and
ensure you extract the rand_id the same way (referencing loads, min_by_key,
rand_id and dp_rank in worker_load.rs) so equal loads are ordered by dp_rank for
stable selection.
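As a standalone illustration of the tuple key, with the selection pulled out into a hypothetical helper over one worker's per-rank map:

```rust
use std::collections::HashMap;

// Deterministic selection: ties on load are broken by the smaller dp_rank,
// so the result no longer depends on HashMap iteration order.
pub fn lowest_dp_rank(loads: &HashMap<isize, isize>) -> Option<isize> {
    loads
        .iter()
        .min_by_key(|&(dp_rank, load)| (*load, *dp_rank))
        .map(|(&dp_rank, _)| dp_rank)
}
```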

25-27: ⚠️ Potential issue | 🟠 Major

Do not silently drop updates/reads on poisoned lock state.

At Line 25/31/45, if let Ok(...) skips all work when the lock is poisoned, which can silently leave the cached DP-rank load state stale with no signal that updates or reads were dropped.

🔧 Proposed fix
-        if let Ok(mut cached) = self.dp_cached_loads.write() {
-            cached.extend(loads.iter().map(|(k, v)| (k.clone(), v.clone())));
-        }
+        let mut cached = self
+            .dp_cached_loads
+            .write()
+            .unwrap_or_else(|poisoned| poisoned.into_inner());
+        cached.extend(loads.iter().map(|(k, v)| (k.clone(), v.clone())));
-        if let Ok(cached_loads) = self.dp_cached_loads.read() {
+        let cached_loads = self
+            .dp_cached_loads
+            .read()
+            .unwrap_or_else(|poisoned| poisoned.into_inner());
             if let Some(loads) = cached_loads.get(worker.url()) {
                 return loads
                     .iter()
                     .min_by_key(|&(_, load)| load)
                     .map(|(&rand_id, _)| rand_id);
             }
-        }
         None
-        if let Ok(mut cached_loads) = self.dp_cached_loads.write() {
+        let mut cached_loads = self
+            .dp_cached_loads
+            .write()
+            .unwrap_or_else(|poisoned| poisoned.into_inner());
             if let Some(loads) = cached_loads.get_mut(worker.url()) {
                 if let Some(dp_load) = loads.get_mut(&dp_rank) {
                     *dp_load += increment;
                 }
             }
-        }

Also applies to: 31-39, 45-55

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model_gateway/src/core/worker_load.rs` around lines 25 - 27, The current use
of `if let Ok(...) = self.dp_cached_loads.write()` silently ignores poisoned
lock errors and drops updates; replace the `if let Ok` with a `match` on
`self.dp_cached_loads.write()` (and similarly for reads) so you either (a)
handle the PoisonError by calling `err.into_inner()` to recover the guard and
proceed to `cached.extend(loads.iter().map(|(k, v)| (k.clone(), v.clone())))`
while logging a warning, or (b) propagate the error out (return a Result) so
callers are aware; ensure you update every occurrence (the
`dp_cached_loads.write()`, `.read()` sites and the `cached.extend(...)` usages)
to avoid silently dropping updates.
crates/protocols/src/worker.rs (1)

942-949: ⚠️ Potential issue | 🟠 Major

Aggregate duplicate DP-rank snapshots instead of overwriting.

If self.loads contains repeated dp_rank entries, insert at Line 945 replaces prior values and loses token counts. Sum by key to preserve total per-rank load.

🔧 Proposed fix
 pub fn dp_rank_loads(&self) -> HashMap<isize, isize> {
     let mut map = HashMap::new();
     for snapshot in &self.loads {
-        map.insert(
-            snapshot.dp_rank as isize,
-            snapshot.num_used_tokens as isize,
-        );
+        *map.entry(snapshot.dp_rank as isize).or_insert(0) +=
+            snapshot.num_used_tokens as isize;
     }
     map
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/protocols/src/worker.rs` around lines 942 - 949, The dp_rank_loads
function currently overwrites counts because it uses map.insert for each
snapshot; change it to aggregate by summing snapshot.num_used_tokens per
snapshot.dp_rank instead — e.g., use HashMap::entry on the same map in
dp_rank_loads and add snapshot.num_used_tokens to the existing value (or
initialize to snapshot.num_used_tokens when absent) so duplicate
snapshot.dp_rank keys accumulate totals rather than replace prior values.
model_gateway/src/policies/dp_min_token.rs (1)

24-31: ⚠️ Potential issue | 🟠 Major

Make the DP-rank reservation atomic.

get_lowest_dp_load() and load_increment() are separate lock acquisitions. Two concurrent requests can both observe the same minimum rank before either bump lands, so bursty traffic can still pile onto one DP rank. Move the pick-and-bump into a single WorkerLoadManager write-locked operation and call that here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model_gateway/src/policies/dp_min_token.rs` around lines 24 - 31, The
selection and increment must be atomic: replace the two-step call sequence in
MinimumTokensPolicy::select_dp_rank (which currently calls
worker_load.get_lowest_dp_load(...) and then worker_load.load_increment(...))
with a single write-locked operation on WorkerLoadManager; add a new method on
WorkerLoadManager such as reserve_lowest_dp_load(worker: &dyn Worker, tokens:
isize) -> Option<isize> (or similar name like pick_and_increment_lowest) that
performs the pick-and-bump under one write lock, and call that method from
select_dp_rank instead of separately calling get_lowest_dp_load and
load_increment so concurrent requests cannot observe the same minimum rank.
model_gateway/src/routers/http/pd_router.rs (1)

361-370: ⚠️ Potential issue | 🟠 Major

Feed select_dp_rank() a token estimate, not request_text.len().

context.request_text is only populated when the prefill/decode LB policies ask for it, so with dp_minimum_tokens_scheduler enabled under policies like round-robin or random this becomes 0 and the temporary reservation never bumps. Even when it is present, String::len() is UTF-8 bytes, not the token units stored in dp_rank_loads(). Use actual request token counts/the tokenizer path here, or at minimum make request-text extraction depend on the DP-rank policy too.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model_gateway/src/routers/http/pd_router.rs` around lines 361 - 370, The code
is passing UTF-8 byte length from context.request_text into
dp_rank_policy.select_dp_rank (via text_length), which is wrong; change
select_dp_rank calls in this block
(dp_rank_policy.select_dp_rank(prefill.as_ref(), text_length) and
select_dp_rank(decode.as_ref(), text_length)) to receive a token count instead:
compute token estimates using the same tokenizer/path used by dp_rank_loads or
use any request token count already available on the request object, and only
extract request_text when the dp_rank_policy actually needs it (or defer to
dp_rank_policy to request text). Ensure you reference select_dp_rank,
dp_rank_policy, context.request_text, prefill, decode, and dp_rank_loads when
updating the logic so token units match the policy expectations and
dp_minimum_tokens_scheduler works correctly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@model_gateway/src/core/worker_load.rs`:
- Around line 30-55: The selection and increment must be done atomically: add a
new method (e.g., reserve_lowest_dp_load or get_and_increment_lowest_dp_load)
that takes &self, worker: &dyn Worker, and increment: isize, acquires the write
lock on dp_cached_loads once, finds the minimum (using the same logic in
get_lowest_dp_load), increments that dp_rank's load by increment, and returns
Option<isize> of the chosen rank; replace callers that currently call
get_lowest_dp_load followed by load_increment (notably in
model_gateway/src/policies/dp_min_token.rs) to call this new method so selection
and increment happen inside the same critical section on dp_cached_loads.

In `@model_gateway/src/routers/factory.rs`:
- Around line 121-130: The code registers a global DP rank policy
(MinimumTokensPolicy) when router_config.dp_minimum_tokens_scheduler is true but
the gRPC PD path (create_grpc_pd_router()) doesn't get this policy, making the
flag a silent no-op for gRPC PD routing; either wire the same policy into the
gRPC path or fail fast when gRPC PD mode is selected. Fix by detecting gRPC PD
mode used by the router (the code path that calls create_grpc_pd_router()) and:
a) when gRPC PD mode is active, create the same MinimumTokensPolicy (using
ctx.load_monitor.as_ref().map(...)) and call
ctx.policy_registry.set_dp_rank_policy(...) inside create_grpc_pd_router(), or
b) if you prefer failing fast, check router_config.dp_minimum_tokens_scheduler
in the code path that selects create_grpc_pd_router() and return an error /
panic indicating the flag is unsupported for gRPC PD; ensure you update only the
relevant locations (the block that constructs MinimumTokensPolicy and the
create_grpc_pd_router() entry point) so both routing paths are consistent.

---

Duplicate comments:
In `@crates/protocols/src/worker.rs`:
- Around line 942-949: The dp_rank_loads function currently overwrites counts
because it uses map.insert for each snapshot; change it to aggregate by summing
snapshot.num_used_tokens per snapshot.dp_rank instead — e.g., use HashMap::entry
on the same map in dp_rank_loads and add snapshot.num_used_tokens to the
existing value (or initialize to snapshot.num_used_tokens when absent) so
duplicate snapshot.dp_rank keys accumulate totals rather than replace prior
values.

In `@model_gateway/src/core/worker_load.rs`:
- Around line 33-37: The current selection uses loads.iter().min_by_key(|&(_,
load)| ...) which produces non-deterministic ties; change the key to include
dp_rank so ties break deterministically (e.g., use min_by_key with a tuple like
(load, dp_rank)) and ensure you extract the rand_id the same way (referencing
loads, min_by_key, rand_id and dp_rank in worker_load.rs) so equal loads are
ordered by dp_rank for stable selection.
- Around line 25-27: The current use of `if let Ok(...) =
self.dp_cached_loads.write()` silently ignores poisoned lock errors and drops
updates; replace the `if let Ok` with a `match` on
`self.dp_cached_loads.write()` (and similarly for reads) so you either (a)
handle the PoisonError by calling `err.into_inner()` to recover the guard and
proceed to `cached.extend(loads.iter().map(|(k, v)| (k.clone(), v.clone())))`
while logging a warning, or (b) propagate the error out (return a Result) so
callers are aware; ensure you update every occurrence (the
`dp_cached_loads.write()`, `.read()` sites and the `cached.extend(...)` usages)
to avoid silently dropping updates.

In `@model_gateway/src/policies/dp_min_token.rs`:
- Around line 24-31: The selection and increment must be atomic: replace the
two-step call sequence in MinimumTokensPolicy::select_dp_rank (which currently
calls worker_load.get_lowest_dp_load(...) and then
worker_load.load_increment(...)) with a single write-locked operation on
WorkerLoadManager; add a new method on WorkerLoadManager such as
reserve_lowest_dp_load(worker: &dyn Worker, tokens: isize) -> Option<isize> (or
similar name like pick_and_increment_lowest) that performs the pick-and-bump
under one write lock, and call that method from select_dp_rank instead of
separately calling get_lowest_dp_load and load_increment so concurrent requests
cannot observe the same minimum rank.

In `@model_gateway/src/routers/http/pd_router.rs`:
- Around line 361-370: The code is passing UTF-8 byte length from
context.request_text into dp_rank_policy.select_dp_rank (via text_length), which
is wrong; change select_dp_rank calls in this block
(dp_rank_policy.select_dp_rank(prefill.as_ref(), text_length) and
select_dp_rank(decode.as_ref(), text_length)) to receive a token count instead:
compute token estimates using the same tokenizer/path used by dp_rank_loads or
use any request token count already available on the request object, and only
extract request_text when the dp_rank_policy actually needs it (or defer to
dp_rank_policy to request text). Ensure you reference select_dp_rank,
dp_rank_policy, context.request_text, prefill, decode, and dp_rank_loads when
updating the logic so token units match the policy expectations and
dp_minimum_tokens_scheduler works correctly.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 7152ed46-8df0-4af6-9fea-71439d4a3a4f

📥 Commits

Reviewing files that changed from the base of the PR and between 243f593 and f0090cd.

📒 Files selected for processing (15)
  • bindings/python/src/lib.rs
  • bindings/python/src/smg/router.py
  • bindings/python/src/smg/router_args.py
  • crates/protocols/src/worker.rs
  • model_gateway/src/config/builder.rs
  • model_gateway/src/config/types.rs
  • model_gateway/src/core/mod.rs
  • model_gateway/src/core/worker_load.rs
  • model_gateway/src/core/worker_manager.rs
  • model_gateway/src/main.rs
  • model_gateway/src/policies/dp_min_token.rs
  • model_gateway/src/policies/mod.rs
  • model_gateway/src/policies/registry.rs
  • model_gateway/src/routers/factory.rs
  • model_gateway/src/routers/http/pd_router.rs


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (4)
model_gateway/src/routers/factory.rs (2)

121-122: 🧹 Nitpick | 🔵 Trivial

Avoid cloning router_config just to read one boolean.

At Line 121-Line 122, read ctx.router_config.dp_minimum_tokens_scheduler directly to remove an unnecessary clone.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model_gateway/src/routers/factory.rs` around lines 121 - 122, The code
unnecessarily clones ctx.router_config into let config just to check the
dp_minimum_tokens_scheduler flag; change the check to read the boolean directly
(e.g., use ctx.router_config.dp_minimum_tokens_scheduler) and remove the
intermediate let config = ctx.router_config.clone() binding to avoid the clone
and simplify the code around the conditional that currently references config.

121-130: ⚠️ Potential issue | 🟠 Major

dp_minimum_tokens_scheduler is only wired for HTTP PD, not gRPC PD.

At Line 121-Line 130, DP-rank policy registration happens in create_pd_router only. create_grpc_pd_router does not perform equivalent registration, so the same flag can be a no-op in gRPC PD mode.

🔧 Proposed fix (shared helper used by both PD constructors)
 impl RouterFactory {
+    fn maybe_set_dp_rank_policy(ctx: &Arc<AppContext>) {
+        if ctx.router_config.dp_minimum_tokens_scheduler {
+            let policy = MinimumTokensPolicy::new(
+                ctx.load_monitor
+                    .as_ref()
+                    .map(|m| m.worker_load_manager.clone()),
+            );
+            let dp_rank_policy: Arc<dyn DPRankLoadPolicy> = Arc::new(policy);
+            ctx.policy_registry.set_dp_rank_policy(dp_rank_policy);
+        }
+    }
+
     pub async fn create_pd_router(
         prefill_policy_config: Option<&PolicyConfig>,
         decode_policy_config: Option<&PolicyConfig>,
         main_policy_config: &PolicyConfig,
         ctx: &Arc<AppContext>,
     ) -> Result<Box<dyn RouterTrait>, String> {
@@
-        let config = ctx.router_config.clone();
-        if config.dp_minimum_tokens_scheduler {
-            let mini_tokens_policy = MinimumTokensPolicy::new(
-                ctx.load_monitor
-                    .as_ref()
-                    .map(|load_monitor_arc| load_monitor_arc.worker_load_manager.clone()),
-            );
-            let dp_rank_policy: Arc<dyn DPRankLoadPolicy> = Arc::new(mini_tokens_policy);
-            ctx.policy_registry.set_dp_rank_policy(dp_rank_policy);
-        }
+        Self::maybe_set_dp_rank_policy(ctx);
         let router = PDRouter::new(ctx).await?;
@@
     pub fn create_grpc_pd_router(
         prefill_policy_config: Option<&PolicyConfig>,
         decode_policy_config: Option<&PolicyConfig>,
         main_policy_config: &PolicyConfig,
         ctx: &Arc<AppContext>,
     ) -> Result<Box<dyn RouterTrait>, String> {
@@
+        Self::maybe_set_dp_rank_policy(ctx);
         let router = GrpcPDRouter::new(ctx)?;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model_gateway/src/routers/factory.rs` around lines 121 - 130, The DP
minimum-tokens policy is only registered in create_pd_router so
dp_minimum_tokens_scheduler is a no-op for gRPC PD; extract the registration
logic into a shared helper (e.g., a function like
register_dp_minimum_tokens_policy) that accepts &RouterContext or the ctx fields
used, constructs MinimumTokensPolicy (using ctx.load_monitor.as_ref().map(|lm|
lm.worker_load_manager.clone())), wraps it as Arc<dyn DPRankLoadPolicy>, and
calls ctx.policy_registry.set_dp_rank_policy(...); then call this helper from
both create_pd_router and create_grpc_pd_router so the flag is honored for both
HTTP and gRPC PD paths.
crates/protocols/src/worker.rs (1)

942-946: ⚠️ Potential issue | 🟠 Major

Aggregate duplicate dp_rank snapshots instead of overwriting.

At Line 945, insert replaces prior entries for the same rank. If duplicate snapshots arrive for one rank, token totals are undercounted and DP-rank routing can skew.

🔧 Proposed fix
 pub fn dp_rank_loads(&self) -> HashMap<isize, isize> {
     let mut map = HashMap::new();
     for snapshot in &self.loads {
-        map.insert(snapshot.dp_rank as isize, snapshot.num_used_tokens as isize);
+        *map.entry(snapshot.dp_rank as isize).or_insert(0) +=
+            snapshot.num_used_tokens as isize;
     }
     map
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/protocols/src/worker.rs` around lines 942 - 946, The dp_rank_loads
function currently uses map.insert(...) which overwrites existing entries for
the same dp_rank, causing undercounting when multiple snapshots share a rank;
update dp_rank_loads to aggregate duplicate dp_rank snapshots by summing
snapshot.num_used_tokens into the existing map value (use
HashMap::entry(...).and_modify(...).or_insert(...) or get_mut to add to the
existing value) when iterating self.loads so map[dp_rank] contains the total
tokens across all snapshots.
model_gateway/src/routers/http/pd_router.rs (1)

361-370: ⚠️ Potential issue | 🟠 Major

Use a token-count reservation here, not request_text.len().

PDRequestContext.request_text is only filled when the prefill/decode load-balancing policies ask for text, so enabling dp_minimum_tokens_scheduler alongside a non-text policy makes estimated_cost 0 here and the cached DP-rank bump becomes a no-op. Even when populated, String::len() is bytes rather than tokens, so the reservation diverges from the token gauges the scheduler is comparing against. Please compute this from a token count that is available independently of needs_request_text().

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model_gateway/src/routers/http/pd_router.rs` around lines 361 - 370, The
estimated_cost uses request_text.len() (bytes) which is wrong; instead derive
estimated_cost from the request token count that exists independently of
needs_request_text() (e.g. a PDRequestContext token-count field such as
request_token_count/request_tokens/request_size_tokens), convert that token
count to isize and fall back to 0 if absent, then pass that value into
dp_rank_policy.select_dp_rank for prefill and decode; update the code around
PDRequestContext.request_text and the estimated_cost binding used by
select_dp_rank (and keep the isize conversion/unwrap_or(0) behavior).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@model_gateway/src/main.rs`:
- Around line 203-204: The CLI flag dp_minimum_tokens_scheduler lacks a
user-facing help string; add a concise help description for it (either via a doc
comment above the field or by adding help = "..." inside the #[arg(...)]
attribute) so the --help output explains the flag's purpose; update the
#[arg(long, default_value_t = false, help_heading = "Routing Policy")] for
dp_minimum_tokens_scheduler to include a clear one-line help message describing
what enabling the minimum-tokens scheduler does.

---

Duplicate comments:
In `@crates/protocols/src/worker.rs`:
- Around line 942-946: The dp_rank_loads function currently uses map.insert(...)
which overwrites existing entries for the same dp_rank, causing undercounting
when multiple snapshots share a rank; update dp_rank_loads to aggregate
duplicate dp_rank snapshots by summing snapshot.num_used_tokens into the
existing map value (use HashMap::entry(...).and_modify(...).or_insert(...) or
get_mut to add to the existing value) when iterating self.loads so map[dp_rank]
contains the total tokens across all snapshots.

In `@model_gateway/src/routers/factory.rs`:
- Around line 121-122: The code unnecessarily clones ctx.router_config into let
config just to check the dp_minimum_tokens_scheduler flag; change the check to
read the boolean directly (e.g., use
ctx.router_config.dp_minimum_tokens_scheduler) and remove the intermediate let
config = ctx.router_config.clone() binding to avoid the clone and simplify the
code around the conditional that currently references config.
- Around line 121-130: The DP minimum-tokens policy is only registered in
create_pd_router so dp_minimum_tokens_scheduler is a no-op for gRPC PD; extract
the registration logic into a shared helper (e.g., a function like
register_dp_minimum_tokens_policy) that accepts &RouterContext or the ctx fields
used, constructs MinimumTokensPolicy (using ctx.load_monitor.as_ref().map(|lm|
lm.worker_load_manager.clone())), wraps it as Arc<dyn DPRankLoadPolicy>, and
calls ctx.policy_registry.set_dp_rank_policy(...); then call this helper from
both create_pd_router and create_grpc_pd_router so the flag is honored for both
HTTP and gRPC PD paths.

In `@model_gateway/src/routers/http/pd_router.rs`:
- Around line 361-370: The estimated_cost uses request_text.len() (bytes) which
is wrong; instead derive estimated_cost from the request token count that exists
independently of needs_request_text() (e.g. a PDRequestContext token-count field
such as request_token_count/request_tokens/request_size_tokens), convert that
token count to isize and fall back to 0 if absent, then pass that value into
dp_rank_policy.select_dp_rank for prefill and decode; update the code around
PDRequestContext.request_text and the estimated_cost binding used by
select_dp_rank (and keep the isize conversion/unwrap_or(0) behavior).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 86e878e7-78a8-4e1c-8d26-f4062e2ed3c5

📥 Commits

Reviewing files that changed from the base of the PR and between f0090cd and a25c255.

📒 Files selected for processing (15)
  • bindings/python/src/lib.rs
  • bindings/python/src/smg/router.py
  • bindings/python/src/smg/router_args.py
  • crates/protocols/src/worker.rs
  • model_gateway/src/config/builder.rs
  • model_gateway/src/config/types.rs
  • model_gateway/src/core/mod.rs
  • model_gateway/src/core/worker_load.rs
  • model_gateway/src/core/worker_manager.rs
  • model_gateway/src/main.rs
  • model_gateway/src/policies/dp_min_token.rs
  • model_gateway/src/policies/mod.rs
  • model_gateway/src/policies/registry.rs
  • model_gateway/src/routers/factory.rs
  • model_gateway/src/routers/http/pd_router.rs

Comment on lines +203 to +204
#[arg(long, default_value_t = false, help_heading = "Routing Policy")]
dp_minimum_tokens_scheduler: bool,

⚠️ Potential issue | 🟡 Minor

Add a help string for the new CLI flag.

Unlike the surrounding options, this field has no doc comment or explicit help, so --help will expose --dp-minimum-tokens-scheduler without explaining what it does.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model_gateway/src/main.rs` around lines 203 - 204, The CLI flag
dp_minimum_tokens_scheduler lacks a user-facing help string; add a concise help
description for it (either via a doc comment above the field or by adding help =
"..." inside the #[arg(...)] attribute) so the --help output explains the flag's
purpose; update the #[arg(long, default_value_t = false, help_heading = "Routing
Policy")] for dp_minimum_tokens_scheduler to include a clear one-line help
message describing what enabling the minimum-tokens scheduler does.

@ping1jing2
Collaborator

please update your PR title or commit msg refer to contributing

Comment on lines +386 to +390
routing_group.add_argument(
f"--{prefix}dp-minimum-tokens-scheduler",
action="store_true",
help="Enable minimum tokens scheduler for data parallel group",
)
Collaborator

please update documentation as there is a user-facing change here

Author

fixed

Comment on lines +942 to +949
pub fn dp_rank_loads(&self) -> HashMap<isize, isize> {
let mut map = HashMap::new();
for snapshot in &self.loads {
map.insert(
snapshot.dp_rank as isize,
snapshot.num_used_tokens as isize,
);
}
Collaborator

make sense

@ping1jing2
Copy link
Copy Markdown
Collaborator

please pay attention to this

@jiashaokun-1 jiashaokun-1 changed the title supports dp rank scheduling and scheduling with the minimun number of… feat: supports dp rank scheduling and scheduling with the minimun number of… Apr 2, 2026
@github-actions github-actions bot added the protocols Protocols crate changes label Apr 2, 2026
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (4)
crates/protocols/src/worker.rs (1)

942-947: ⚠️ Potential issue | 🟠 Major

Aggregate duplicate DP-rank snapshots instead of overwriting them.

HashMap::insert drops earlier samples for the same dp_rank, so the router can undercount a rank's token load and pick the wrong target when the backend reports duplicate/defaulted ranks. Sum into the existing entry instead of replacing it.

🔧 Proposed fix
 pub fn dp_rank_loads(&self) -> HashMap<isize, isize> {
     let mut map = HashMap::new();
     for snapshot in &self.loads {
-        map.insert(snapshot.dp_rank as isize, snapshot.num_used_tokens as isize);
+        *map.entry(snapshot.dp_rank as isize).or_insert(0) +=
+            snapshot.num_used_tokens as isize;
     }
     map
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/protocols/src/worker.rs` around lines 942 - 947, The dp_rank_loads
function currently uses HashMap::insert for each snapshot (iterating over
self.loads with snapshot.dp_rank and snapshot.num_used_tokens), which overwrites
previous entries for the same dp_rank; change this to aggregate duplicate
dp_rank entries by summing snapshot.num_used_tokens into the existing map value
instead of replacing it—use the HashMap entry API (or check for an existing
value and add to it) so each map.insert call becomes map.entry(snapshot.dp_rank
as isize).and_modify(|v| *v += snapshot.num_used_tokens as
isize).or_insert(snapshot.num_used_tokens as isize), ensuring the returned
HashMap contains summed token counts per dp_rank.
model_gateway/src/main.rs (1)

203-204: ⚠️ Potential issue | 🟡 Minor

Add a help string for the new CLI flag.

Unlike the surrounding options, this field has no doc comment or explicit help, so --help exposes --dp-minimum-tokens-scheduler without explaining it.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model_gateway/src/main.rs` around lines 203 - 204, The new CLI flag
dp_minimum_tokens_scheduler lacks a help string; add a brief help description
either as a doc comment above the field or by adding help = "..." inside the
#[arg(...)] attribute for the dp_minimum_tokens_scheduler field so that --help
displays a clear explanation of what the flag does (reference the
dp_minimum_tokens_scheduler field and its #[arg(...)] declaration).
model_gateway/src/routers/factory.rs (1)

121-130: ⚠️ Potential issue | 🟠 Major

dp_minimum_tokens_scheduler is still a silent no-op on gRPC PD.

create_router() sends ConnectionMode::Grpc + RoutingMode::PrefillDecode through create_grpc_pd_router(), but the DP-rank policy is only registered here in create_pd_router(). Users can enable the flag and still get default gRPC PD routing. Wire the same policy into create_grpc_pd_router() or reject that config combination early.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model_gateway/src/routers/factory.rs` around lines 121 - 130, The
dp_minimum_tokens_scheduler flag is registered only in create_pd_router() so
create_grpc_pd_router() (called via create_router() with ConnectionMode::Grpc +
RoutingMode::PrefillDecode) silently ignores it; update create_grpc_pd_router()
to construct the same MinimumTokensPolicy (using MinimumTokensPolicy::new with
ctx.load_monitor.as_ref().map(|lm| lm.worker_load_manager.clone())) and call
ctx.policy_registry.set_dp_rank_policy(Arc::new(mini_tokens_policy)) when
ctx.router_config.dp_minimum_tokens_scheduler is true, or alternatively validate
in create_router() and early-return an error if that config combination is
unsupported; touch the functions create_grpc_pd_router, create_pd_router,
create_router, dp_minimum_tokens_scheduler, MinimumTokensPolicy and
ctx.policy_registry.set_dp_rank_policy when implementing the change.
model_gateway/src/routers/http/pd_router.rs (1)

361-366: ⚠️ Potential issue | 🟠 Major

DP-rank estimated cost can collapse to zero and uses the wrong unit.

On Line 363–366, estimated_cost comes from request_text.len() (bytes) with None => 0. Because request_text may be absent when only the DP-rank scheduler needs it, this can turn load increment into a no-op; and byte length is not token count for a token-based scheduler.

Please compute/passthrough a token-count estimate in the same unit as DP load metrics, and ensure it is available whenever DP-rank policy is active.

Also applies to: 367-371

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model_gateway/src/routers/http/pd_router.rs` around lines 361 - 366, The
DP-rank estimated_cost currently uses byte length of context.request_text (and
zero when absent), which can produce zero or use the wrong unit; change the
logic in the block that checks self.policy_registry.get_dp_rank_policy()
(dp_rank_policy_opt/dp_rank_policy) to always produce a token-count estimate in
the same unit as your DP load metrics: first check for an existing token count
on the request context (e.g., a field like context.token_count or similar) and
use it if present; otherwise, compute a token estimate from context.request_text
using the project's tokenizer utility (or a lightweight heuristic like
words-to-tokens fallback) instead of request_text.len(); ensure you never return
0 (use a min of 1) so load increment is not a no-op and pass that token-count
into estimated_cost where dp-rank uses it.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@model_gateway/src/core/worker_load.rs`:
- Around line 23-47: The cached per-worker DP-rank maps in dp_cached_loads are
never evicted when a worker/group is explicitly removed, so update_dp_loads and
select_and_increment_lowest_dp_load can operate on stale entries; add a public
remove_workers(&self, urls: &[String]) method on WorkerLoadManager that takes a
list of worker URLs and removes their entries from dp_cached_loads (using the
same RwLock write path as update_dp_loads), then call this helper from
LoadMonitor::on_group_removed with the URLs of removed workers (derived via
Worker::url() or the group metadata) to ensure removed worker URLs are purged
and won't be reused by select_and_increment_lowest_dp_load.

---

Duplicate comments:
In `@crates/protocols/src/worker.rs`:
- Around line 942-947: The dp_rank_loads function currently uses HashMap::insert
for each snapshot (iterating over self.loads with snapshot.dp_rank and
snapshot.num_used_tokens), which overwrites previous entries for the same
dp_rank; change this to aggregate duplicate dp_rank entries by summing
snapshot.num_used_tokens into the existing map value instead of replacing it—use
the HashMap entry API (or check for an existing value and add to it) so each
map.insert call becomes map.entry(snapshot.dp_rank as isize).and_modify(|v| *v
+= snapshot.num_used_tokens as isize).or_insert(snapshot.num_used_tokens as
isize), ensuring the returned HashMap contains summed token counts per dp_rank.

In `@model_gateway/src/main.rs`:
- Around line 203-204: The new CLI flag dp_minimum_tokens_scheduler lacks a help
string; add a brief help description either as a doc comment above the field or
by adding help = "..." inside the #[arg(...)] attribute for the
dp_minimum_tokens_scheduler field so that --help displays a clear explanation of
what the flag does (reference the dp_minimum_tokens_scheduler field and its
#[arg(...)] declaration).

In `@model_gateway/src/routers/factory.rs`:
- Around line 121-130: The dp_minimum_tokens_scheduler flag is registered only
in create_pd_router() so create_grpc_pd_router() (called via create_router()
with ConnectionMode::Grpc + RoutingMode::PrefillDecode) silently ignores it;
update create_grpc_pd_router() to construct the same MinimumTokensPolicy (using
MinimumTokensPolicy::new with ctx.load_monitor.as_ref().map(|lm|
lm.worker_load_manager.clone())) and call
ctx.policy_registry.set_dp_rank_policy(Arc::new(mini_tokens_policy)) when
ctx.router_config.dp_minimum_tokens_scheduler is true, or alternatively validate
in create_router() and early-return an error if that config combination is
unsupported; touch the functions create_grpc_pd_router, create_pd_router,
create_router, dp_minimum_tokens_scheduler, MinimumTokensPolicy and
ctx.policy_registry.set_dp_rank_policy when implementing the change.

In `@model_gateway/src/routers/http/pd_router.rs`:
- Around line 361-366: The DP-rank estimated_cost currently uses byte length of
context.request_text (and zero when absent), which can produce zero or use the
wrong unit; change the logic in the block that checks
self.policy_registry.get_dp_rank_policy() (dp_rank_policy_opt/dp_rank_policy) to
always produce a token-count estimate in the same unit as your DP load metrics:
first check for an existing token count on the request context (e.g., a field
like context.token_count or similar) and use it if present; otherwise, compute a
token estimate from context.request_text using the project's tokenizer utility
(or a lightweight heuristic like words-to-tokens fallback) instead of
request_text.len(); ensure you never return 0 (use a min of 1) so load increment
is not a no-op and pass that token-count into estimated_cost where dp-rank uses
it.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 34b00d07-fea7-4c4e-8fd9-d7be4f5725d1

📥 Commits

Reviewing files that changed from the base of the PR and between a25c255 and f50665b.

📒 Files selected for processing (15)
  • bindings/python/src/lib.rs
  • bindings/python/src/smg/router.py
  • bindings/python/src/smg/router_args.py
  • crates/protocols/src/worker.rs
  • model_gateway/src/config/builder.rs
  • model_gateway/src/config/types.rs
  • model_gateway/src/core/mod.rs
  • model_gateway/src/core/worker_load.rs
  • model_gateway/src/core/worker_manager.rs
  • model_gateway/src/main.rs
  • model_gateway/src/policies/dp_min_token.rs
  • model_gateway/src/policies/mod.rs
  • model_gateway/src/policies/registry.rs
  • model_gateway/src/routers/factory.rs
  • model_gateway/src/routers/http/pd_router.rs

Comment on lines +23 to +47
pub fn update_dp_loads(&self, loads: &HashMap<String, HashMap<isize, isize>>) {
debug!("WorkerLoadManager update_dp_loads map:{:?}", loads);
let mut cached = self
.dp_cached_loads
.write()
.unwrap_or_else(|poisoned| poisoned.into_inner());
cached.extend(loads.iter().map(|(k, v)| (k.clone(), v.clone())));
}

pub fn select_and_increment_lowest_dp_load(
&self,
worker: &dyn Worker,
increment: isize,
) -> Option<isize> {
let mut cached = self
.dp_cached_loads
.write()
.unwrap_or_else(|poisoned| poisoned.into_inner());
let loads = cached.get_mut(worker.url())?;
let (&dp_rank, _) = loads.iter().min_by_key(|&(rank, load)| (*load, *rank))?;
if let Some(v) = loads.get_mut(&dp_rank) {
*v += increment;
}
Some(dp_rank)
}
@coderabbitai coderabbitai bot Apr 2, 2026

⚠️ Potential issue | 🟠 Major

Add an eviction path for removed worker URLs.

update_dp_loads() only refreshes URLs that appear in the latest scrape, and select_and_increment_lowest_dp_load() trusts whatever is still cached. After an explicit worker/group removal, that stale per-rank map stays resident indefinitely, so a restarted worker that comes back on the same URL before the next poll can inherit the previous instance's DP-rank map and route the first requests to stale or invalid ranks. Please add a remove_workers() helper here and call it from LoadMonitor::on_group_removed().

🔧 One way to fix it
 pub struct WorkerLoadManager {
     // <worker, <dp_rank, loads>>
     dp_cached_loads: RwLock<HashMap<String, HashMap<isize, isize>>>,
 }
@@
     pub fn update_dp_loads(&self, loads: &HashMap<String, HashMap<isize, isize>>) {
         debug!("WorkerLoadManager update_dp_loads map:{:?}", loads);
         let mut cached = self
             .dp_cached_loads
             .write()
             .unwrap_or_else(|poisoned| poisoned.into_inner());
         cached.extend(loads.iter().map(|(k, v)| (k.clone(), v.clone())));
     }
+
+    pub fn remove_workers(&self, worker_urls: &[String]) {
+        let mut cached = self
+            .dp_cached_loads
+            .write()
+            .unwrap_or_else(|poisoned| poisoned.into_inner());
+        for url in worker_urls {
+            cached.remove(url);
+        }
+    }

And in model_gateway/src/core/worker_manager.rs:

         if !worker_urls.is_empty() {
+            self.worker_load_manager.remove_workers(worker_urls);
             self.tx.send_modify(|map| {
                 for url in worker_urls {
                     map.remove(url);
                 }
             });
         }
Based on learnings: `PowerOfTwoPolicy` intentionally keeps last-known-good load data during transient worker failures; the gap here is that explicit worker removal never purges the cached URL.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model_gateway/src/core/worker_load.rs` around lines 23 - 47, The cached
per-worker DP-rank maps in dp_cached_loads are never evicted when a worker/group
is explicitly removed, so update_dp_loads and
select_and_increment_lowest_dp_load can operate on stale entries; add a public
remove_workers(&self, urls: &[String]) method on WorkerLoadManager that takes a
list of worker URLs and removes their entries from dp_cached_loads (using the
same RwLock write path as update_dp_loads), then call this helper from
LoadMonitor::on_group_removed with the URLs of removed workers (derived via
Worker::url() or the group metadata) to ensure removed worker URLs are purged
and won't be reused by select_and_increment_lowest_dp_load.
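The eviction path can be exercised standalone with a minimal version of the manager (field and method names follow the review suggestion; the real struct carries more state):

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Minimal stand-in for WorkerLoadManager showing the suggested
// remove_workers eviction next to the existing update path.
#[derive(Default)]
struct WorkerLoadManager {
    dp_cached_loads: RwLock<HashMap<String, HashMap<isize, isize>>>,
}

impl WorkerLoadManager {
    fn update_dp_loads(&self, loads: &HashMap<String, HashMap<isize, isize>>) {
        let mut cached = self
            .dp_cached_loads
            .write()
            .unwrap_or_else(|p| p.into_inner());
        cached.extend(loads.iter().map(|(k, v)| (k.clone(), v.clone())));
    }

    // Purge cached DP-rank maps for explicitly removed workers, so a
    // restarted worker on the same URL cannot inherit stale loads.
    fn remove_workers(&self, worker_urls: &[String]) {
        let mut cached = self
            .dp_cached_loads
            .write()
            .unwrap_or_else(|p| p.into_inner());
        for url in worker_urls {
            cached.remove(url);
        }
    }
}

fn main() {
    let mgr = WorkerLoadManager::default();
    let mut loads = HashMap::new();
    loads.insert("http://w1:8000".to_string(), HashMap::from([(0, 10)]));
    mgr.update_dp_loads(&loads);
    mgr.remove_workers(&["http://w1:8000".to_string()]);
    assert!(mgr.dp_cached_loads.read().unwrap().is_empty());
}
```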

Collaborator

please review this comment @jiashaokun-1


Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

@github-actions github-actions bot added the documentation Improvements or additions to documentation label Apr 2, 2026
@jiashaokun-1
Author

> please pay attention to this

fixed

… tokens

Signed-off-by: jiashaokun <1114621279@qq.com>

Labels

  • documentation: Improvements or additions to documentation
  • model-gateway: Model gateway crate changes
  • protocols: Protocols crate changes
  • python-bindings: Python bindings changes
