-
Notifications
You must be signed in to change notification settings - Fork 716
feat: worker-local KvIndexer in KvEventPublisher #4519
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
WalkthroughThis change adds a new workspace member Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes
Poem
Pre-merge checks❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
Warning Review ran into problems🔥 ProblemsGit: Failed to clone repository. Please run the Tip 📝 Customizable high-level summaries are now available in beta!You can now customize how CodeRabbit generates the high-level summary in your pull requests — including its content, structure, tone, and formatting.
Example instruction:
Note: This feature is currently in beta for Pro-tier users, and pricing will be announced later. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (4)
lib/llm/src/kv_router/indexer.rs (1)
1031-1123: LocalKvIndexer buffering logic and wiring look correct overallThe wrapper correctly:
- Keeps a bounded circular buffer of
(WorkerId, KvCacheEvent)withVecDequeand trims from the front.- Records events before forwarding them to the underlying
KvIndexerviaevent_sender().send(...).- Exposes helpers for recent/all events, clearing, and buffer length, which will be useful for debugging and tests.
Two follow‑ups worth considering:
- Error handling in trait impls
apply_event_with_buffersurfacesIndexerOfflineviaResult, andstart_event_processorlogs this, which is good.- However, the
KvIndexerInterfaceimpl swallows errors:
async fn apply_event(&mut self, event: RouterEvent)ignores theResultfromapply_event_with_buffer.async fn remove_worker(&mut self, worker: WorkerId)ignores theResultfromremove_worker_sender().send(worker).await.This diverges from
KvIndexer’s behavior (whichunwrap()s and fails fast if the channels are broken). If the trait paths are ever used in production, you probably want at least atracing::warn!on these error paths, or to mirror the “panic on impossible state” behavior.
- Minor ergonomics
Deref<Target = Arc<KvIndexer>>is usable (double‑deref toKvIndexerworks), butTarget = KvIndexerwould be more natural and avoid exposing theArcat the API surface.- Using
tokio::sync::Mutexinstead ofstd::sync::Mutexforevent_bufferwould avoid blocking the async scheduler, though this is likely low impact since the critical section is tiny.lib/llm/src/kv_router/publisher.rs (3)
134-145: Local indexer construction is reasonable but has a couple of tunablesThe local indexer creation is sound:
- Uses the same
CancellationTokenas the publisher, so shutdown behavior is unified.- Uses
KvIndexerMetrics::new_unregistered()to avoid polluting global metrics, which is defensible for a per‑worker helper.Two knobs you might eventually want to expose:
max_buffer_sizeis hard‑coded as100with a TODO; consider threading this as a parameter onnew_with_local_indexer(or via config) so it can be tuned per deployment.- If you ever want visibility into local indexer behavior in production, allowing injection of
KvIndexerMetrics::from_componenthere would help.
194-197: local_indexer accessor matches the internal representation
pub fn local_indexer(&self) -> Option<&Arc<LocalKvIndexer>>correctly exposes the optional local indexer for tests or higher‑level consumers.If you later change
Deref<Target>onLocalKvIndexer, you might also consider returningOption<&LocalKvIndexer>instead of theArc, but that’s cosmetic.
1218-1279: BlockRemoved + local indexer test looks correct but could log indexer failuresThe
test_event_processor_block_removed_with_local_indexertest:
- Sends a
Storedevent followed by aRemovedevent through the same channel.- Ensures:
- The global publish path sees both events.
- The local indexer’s
find_matchesreturns no hits after the removal.That correctly validates that
KvCacheEventData::Removedis applied to the local indexer.Given that
LocalKvIndexer::remove_workerandapply_eventtrait methods currently ignore send errors, consider adding minimal logging there as well, to mirror the “log and continue” behavior used instart_event_processor. This isn’t test‑blocking but would help diagnose broken channels in non‑publisher call sites.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (3)
Cargo.toml(1 hunks)lib/llm/src/kv_router/indexer.rs(1 hunks)lib/llm/src/kv_router/publisher.rs(10 hunks)
🧰 Additional context used
🧠 Learnings (8)
📚 Learning: 2025-10-14T00:58:05.744Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3597
File: lib/llm/src/kv_router/indexer.rs:437-441
Timestamp: 2025-10-14T00:58:05.744Z
Learning: In lib/llm/src/kv_router/indexer.rs, when a KvCacheEventData::Cleared event is received, the system intentionally clears all dp_ranks for the given worker_id by calling clear_all_blocks(worker.worker_id). This is the desired behavior and should not be scoped to individual dp_ranks.
Applied to files:
lib/llm/src/kv_router/indexer.rslib/llm/src/kv_router/publisher.rs
📚 Learning: 2025-09-17T01:00:50.937Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3077
File: lib/llm/src/kv_router/subscriber.rs:334-336
Timestamp: 2025-09-17T01:00:50.937Z
Learning: PeaBrane identified that reordering tokio::select! arms in the indexer (moving dump_rx.recv() to be after event_rx.recv()) creates a natural barrier that ensures RouterEvents are always processed before dump requests, solving the ack-before-commit race condition. This leverages the existing biased directive and requires minimal code changes, aligning with their preference for contained solutions.
Applied to files:
lib/llm/src/kv_router/indexer.rslib/llm/src/kv_router/publisher.rs
📚 Learning: 2025-09-17T01:00:50.937Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3077
File: lib/llm/src/kv_router/subscriber.rs:334-336
Timestamp: 2025-09-17T01:00:50.937Z
Learning: PeaBrane suggested using tokio::select! arm ordering with the existing biased directive in the indexer to create a natural barrier for dump requests, ensuring KV events are drained before snapshotting. This approach leverages existing architecture (biased select) to solve race conditions with minimal code changes, which aligns with their preference for contained solutions.
Applied to files:
lib/llm/src/kv_router/indexer.rslib/llm/src/kv_router/publisher.rs
📚 Learning: 2025-05-29T00:02:35.018Z
Learnt from: alec-flowers
Repo: ai-dynamo/dynamo PR: 1181
File: lib/llm/src/kv_router/publisher.rs:379-425
Timestamp: 2025-05-29T00:02:35.018Z
Learning: In lib/llm/src/kv_router/publisher.rs, the functions `create_stored_blocks` and `create_stored_block_from_parts` are correctly implemented and not problematic duplications of existing functionality elsewhere in the codebase.
Applied to files:
lib/llm/src/kv_router/indexer.rslib/llm/src/kv_router/publisher.rs
📚 Learning: 2025-09-17T20:55:06.333Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3095
File: lib/llm/src/kv_router/indexer.rs:0-0
Timestamp: 2025-09-17T20:55:06.333Z
Learning: When PeaBrane encounters a complex implementation issue that would significantly expand PR scope (like the remove_worker_sender method in lib/llm/src/kv_router/indexer.rs that required thread-safe map updates and proper shard targeting), they prefer to remove the problematic implementation entirely rather than rush a partial fix, deferring the proper solution to a future PR.
Applied to files:
lib/llm/src/kv_router/publisher.rs
📚 Learning: 2025-05-30T06:38:09.630Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 1285
File: lib/llm/src/kv_router/scoring.rs:58-63
Timestamp: 2025-05-30T06:38:09.630Z
Learning: In lib/llm/src/kv_router/scoring.rs, the user prefers to keep the panic behavior when calculating load_avg and variance with empty endpoints rather than adding guards for division by zero. They want the code to fail fast on this error condition.
Applied to files:
lib/llm/src/kv_router/publisher.rs
📚 Learning: 2025-06-05T01:02:15.318Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 1392
File: lib/llm/src/kv_router/scoring.rs:35-46
Timestamp: 2025-06-05T01:02:15.318Z
Learning: In lib/llm/src/kv_router/scoring.rs, PeaBrane prefers panic-based early failure over Result-based error handling for the worker_id() method to catch invalid data early during development.
Applied to files:
lib/llm/src/kv_router/publisher.rs
📚 Learning: 2025-06-02T19:37:27.666Z
Learnt from: oandreeva-nv
Repo: ai-dynamo/dynamo PR: 1195
File: lib/llm/tests/block_manager.rs:150-152
Timestamp: 2025-06-02T19:37:27.666Z
Learning: In Rust/Tokio applications, when background tasks use channels for communication, dropping the sender automatically signals task termination when the receiver gets `None`. The `start_batching_publisher` function in `lib/llm/tests/block_manager.rs` demonstrates this pattern: when the `KVBMDynamoRuntimeComponent` is dropped, its `batch_tx` sender is dropped, causing `rx.recv()` to return `None`, which triggers cleanup and task termination.
Applied to files:
lib/llm/src/kv_router/publisher.rs
🔇 Additional comments (9)
Cargo.toml (1)
18-20: Workspace member addition looks fineAdding
"lib/kvbm-hub"to the workspace members is consistent with the existing layout; no issues from this file’s perspective. Just ensure the new crate exists and has its ownCargo.tomlbefore merging.Run
cargo metadataorcargo check -p kvbm-hublocally to verify the new member is wired correctly.lib/llm/src/kv_router/publisher.rs (8)
4-9: Importing LocalKvIndexer and compute_block_hash_for_seq is appropriateBringing
LocalKvIndexerandcompute_block_hash_for_seqinto the publisher module matches their usage below (local indexer wiring and block hash computation). No issues here.
95-107: Constructor split/new_with_local_indexer maintains backward compatibility
KvEventPublisher::new(...)now delegates tonew_with_local_indexer(..., enable_local_indexer = false), so existing callers see identical behavior.new_with_local_indexerprovides an explicit opt‑in for worker‑local indexing, which is a clean extension point.Looks good from an API‑evolution standpoint.
159-175: Threading local_indexer into the event processor is correctPassing a cloned
Option<Arc<LocalKvIndexer>>into the spawnedstart_event_processortask is the right way to:
- Share the local indexer across all events for this worker.
- Keep its lifetime tied to the publisher/event loop without extra shutdown plumbing.
No functional issues spotted here.
216-255: Event processor correctly applies local indexer then publishes, with good failure isolationThe updated
start_event_processor:
- Wraps incoming
KvCacheEventinto aRouterEventwith the worker’s ID (unchanged behavior).- If
local_indexeris present, callsapply_event_with_buffer(router_event.clone()):
- This ensures the worker‑local indexer stays in sync and records the event before global distribution.
- On failure (
IndexerOffline), logs a warning but continues.- Always attempts to publish the same
RouterEventto NATS, regardless of local indexer outcome.This is the right failure‑isolation boundary: local indexer issues don’t take down or stall global event publishing. Logging at
warn!is appropriate, since a permanently offline local indexer indicates misconfiguration or a shutdown condition.No changes requested here.
1047-1107: MockComponent and updated test_start_event_processor stay valid after the signature changeThe
MockComponentstand‑in forEventPublisherremains minimal and correct, andtest_start_event_processorhas been updated to passNonefor the newlocal_indexerparameter, still asserting a single publish toQUEUE_NAME.This keeps the original behavior covered while leaving room for local‑indexer tests.
1142-1213: test_start_event_processor_with_local_indexer exercises the happy path wellThis test validates the critical behavior when a local indexer is enabled:
- It verifies the NATS publish path still receives exactly one event on
QUEUE_NAME.- It uses
get_workers_senderon the underlyingKvIndexer(throughLocalKvIndexer) to confirm that worker1is now tracked, proving the local indexer applied the event.The use of
KvIndexerMetrics::new_unregistered()and a dedicatedCancellationTokenis consistent with how the production code constructs a local indexer.No issues here.
1284-1343: AllBlocksCleared behavior with local indexer is covered and matches router semanticsThis test:
- Stores a block, then sends a
KvCacheEventData::Clearedevent.- Verifies:
- Local indexer
find_matchesyields no matches, i.e., the worker’s blocks are cleared.- Two events were still published globally.
This aligns with the intended behavior in
RadixTree::apply_eventwhereClearedclears all blocks for the worker (all dp_ranks), as per prior design notes. Based on learnings.
1348-1389: Local indexer failure test correctly ensures NATS publishing proceeds
test_event_processor_local_indexer_failure_continuesintentionally cancels the local indexer’s token before sending an event, then:
- Runs
start_event_processorwith that dead indexer.- Asserts exactly one published event to NATS.
This proves the “log‑warn but keep publishing” behavior on local indexer failure works as intended.
Implementation and test are aligned; no change requested.
|
@karen-sy Thanks for putting this up! I like the general shape. When the CI is green, I’ll go through it in detail. |
| /// A thin wrapper around KvIndexer that buffers recent events | ||
| /// (e.g. which may be queued by router upon startup) | ||
| /// | ||
| pub struct LocalKvIndexer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We would probably also need the following features
- Some check that the event ids are queued in consecutive integers.
- Some sort of binary search mechanism (can use a crate), to pinpoint the event in the buffer with
event_id = ...and get the events from that point on
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For sure, I'm going to prototype the router<->localindexer comm route first (initially assuming that the router only ever asks for a full event dump) and then implement those features
Overview:
Details:
Where should the reviewer start?
Related Issues: (use one of the action keywords Closes / Fixes / Resolves / Relates to)
Summary by CodeRabbit
New Features
Chores
✏️ Tip: You can customize this high-level summary in your review settings.