-
Couldn't load subscription status.
- Fork 662
fix: introduce service discovery interface (1/n) #3937
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: mohammedabdulwahhab <[email protected]>
Signed-off-by: mohammedabdulwahhab <[email protected]>
| @@ -0,0 +1,174 @@ | |||
| // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This mock will eventually be replaced with one that uses the memory impl for KeyValueStoreManager
WalkthroughThis PR introduces a pluggable service discovery system to the runtime. It defines core discovery abstractions (DiscoveryKey, DiscoveryInstance, DiscoveryEvent, DiscoveryClient trait) and provides a mock in-memory implementation for testing. The discovery client is integrated into DistributedRuntime as a lazy-initialized field. Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20–25 minutes
Poem
Pre-merge checks❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
lib/runtime/src/distributed.rs (1)
20-24: Name collision confirmed:errorimported twice (compile error E0252)The figment::error import conflicts with the super module's re-export of
anyhow::anyhowaserror. Line 234 uses the anyhow macro (error!("...")), while figment::error is unused. Remove the figment import.-use figment::error;
🧹 Nitpick comments (8)
lib/runtime/src/lib.rs (1)
99-101: Trait-object field is fine; consider a type alias for clarity
Arc<OnceCell<Arc<dyn discovery::DiscoveryClient>>>is correct. For readability and fewer repeated bounds, consider aliasing:+// in lib/runtime/src/discovery/mod.rs (or a shared prelude) +pub type DiscoveryClientRef = Arc<dyn DiscoveryClient + 'static>; + -// here -discovery_client: Arc<OnceCell<Arc<dyn discovery::DiscoveryClient>>>, +discovery_client: Arc<OnceCell<DiscoveryClientRef>>,lib/runtime/src/distributed.rs (1)
228-239: Wire a usable default for early testing instead of hard errorReturning
Err(error!(...))blocks adopters. Prefer initializing a mock whenself.is_staticor under acfg(test)/feature flag.- pub async fn discovery_client(&self) -> Result<Arc<dyn DiscoveryClient>> { - let client = self - .discovery_client - .get_or_try_init(async { - // TODO: Replace when KeyValueDiscoveryClient is implemented - Err(error!("No discovery clients yet implemented.")) - }) - .await?; - Ok(client.clone()) - } + pub async fn discovery_client(&self) -> Result<Arc<dyn DiscoveryClient>> { + let client = self + .discovery_client + .get_or_try_init(async { + #[cfg(feature = "discovery-mock")] + { + // Use in-memory mock for static or test environments + let registry = crate::discovery::SharedMockRegistry::new(); + let mock = crate::discovery::MockDiscoveryClient::new( + format!("drt-{}", self.connection_id()), + registry, + ); + return OK(Arc::new(mock) as Arc<dyn DiscoveryClient>); + } + // TODO: Replace with KeyValueDiscoveryClient (etcd/KV-backed) + Err(error!("No discovery clients yet implemented")) + }) + .await?; + Ok(client.clone()) + }Add imports near the top:
+use crate::discovery::{MockDiscoveryClient, SharedMockRegistry};lib/runtime/src/discovery/mod.rs (3)
36-43: Include key context in Removed events
Removed(String)only carriesinstance_id. Downstream often needs key context for metrics/logs; consider:-pub enum DiscoveryEvent { - Added(DiscoveryInstance), - Removed(String), -} +pub enum DiscoveryEvent { + Added(DiscoveryInstance), + Removed { key: DiscoveryKey, instance_id: String }, +}This avoids coupling on external state to correlate removals.
45-47: Stream alias is good; considerfutures_core::StreamTo reduce dependency surface in trait signatures, you can use
futures_core::Streaminstead offutures::Stream. Optional.-use futures::Stream; +use futures_core::Stream;
49-60: Lifecycle: add an unserve/deregister pathMost backends need explicit deregistration or a lease guard. Consider adding:
async fn unserve(&self, instance: &DiscoveryInstance) -> Result<()>;Alternatively, make
servereturn a drop-guard that deregisters onDrop.lib/runtime/src/discovery/mock.rs (3)
10-20: Avoid std::sync::Mutex in async codeUsing
std::sync::Mutexinside async tasks can block the scheduler. Since this is test-only, it’s acceptable, but preferparking_lot::Mutex(fast) ortokio::sync::Mutexif held across.await.-use std::sync::{Arc, Mutex}; +use std::sync::Arc; +use parking_lot::Mutex;
69-110: Polling loop: useintervaland configurable periodTight
sleep(10ms)loops can cause jitter and unnecessary wakeups. Usetokio::time::intervaland make the period configurable.- let stream = async_stream::stream! { - let mut known_instances = HashSet::new(); - loop { + let stream = async_stream::stream! { + let mut known_instances = HashSet::new(); + let mut ticker = tokio::time::interval(tokio::time::Duration::from_millis(50)); + loop { // ... - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + ticker.tick().await; } };
118-174: Harden test with timeouts and helper removal APITests can hang if events don’t arrive. Wrap
next()withtimeout, and consider exposing aremove()helper instead of mutating internals.- let event = stream.next().await.unwrap().unwrap(); + let event = tokio::time::timeout( + tokio::time::Duration::from_secs(1), + stream.next() + ).await.expect("event timed out").unwrap().unwrap();Optionally add:
impl SharedMockRegistry { pub fn remove(&self, key: &DiscoveryKey, instance_id: &str) { let mut g = self.instances.lock().unwrap(); if let Some(vec) = g.get_mut(key) { vec.retain(|i| matches!(i, DiscoveryInstance::Endpoint { instance_id: id, .. } if id != instance_id)); } } }Then use
registry.remove(&key, "instance-1");.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
lib/runtime/src/discovery/mock.rs(1 hunks)lib/runtime/src/discovery/mod.rs(1 hunks)lib/runtime/src/distributed.rs(3 hunks)lib/runtime/src/lib.rs(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
lib/runtime/src/discovery/mock.rs (1)
lib/runtime/src/discovery/mod.rs (3)
instance_id(53-53)serve(56-56)list_and_watch(59-59)
lib/runtime/src/discovery/mod.rs (2)
lib/bindings/python/src/dynamo/_core.pyi (1)
Endpoint(133-174)lib/runtime/src/discovery/mock.rs (3)
instance_id(40-42)serve(44-67)list_and_watch(69-110)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
- GitHub Check: vllm (amd64)
- GitHub Check: trtllm (arm64)
- GitHub Check: sglang
- GitHub Check: operator (amd64)
- GitHub Check: vllm (arm64)
- GitHub Check: trtllm (amd64)
- GitHub Check: operator (arm64)
- GitHub Check: clippy (.)
- GitHub Check: tests (lib/runtime/examples)
- GitHub Check: tests (lib/bindings/python)
- GitHub Check: clippy (lib/bindings/python)
- GitHub Check: clippy (launch/dynamo-run)
- GitHub Check: tests (.)
- GitHub Check: Build and Test - dynamo
- GitHub Check: tests (launch/dynamo-run)
🔇 Additional comments (3)
lib/runtime/src/lib.rs (1)
25-25: Module exposure looks goodPublicly exposing
pub mod discovery;aligns with the new API surface. No issues.lib/runtime/src/distributed.rs (1)
94-94: OnceCell for discovery client: LGTMLazy shared init via
Arc<OnceCell<...>>matches existing patterns (e.g.,tcp_server).lib/runtime/src/discovery/mod.rs (1)
12-22: Key shape is fine for v1
DiscoveryKey::Endpoint { namespace, component, endpoint }is a clear minimal surface. No issues.
| component: String, | ||
| endpoint: String, | ||
| }, | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What other types to you anticipate having here?
| fn instance_id(&self) -> String; | ||
|
|
||
| /// Registers an object in the discovery plane with the instance id | ||
| async fn serve(&self, key: DiscoveryKey) -> Result<DiscoveryInstance>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this be register? The first word in the comment is "Registers".
serve makes me think of a server, like an HTTP server for example, so I expect a long running thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I was also thinking something along the lines of publish or broadcast could work
| async fn serve(&self, key: DiscoveryKey) -> Result<DiscoveryInstance>; | ||
|
|
||
| /// Returns a stream of discovery events (Added/Removed) for the given discovery key | ||
| async fn list_and_watch(&self, key: DiscoveryKey) -> Result<DiscoveryStream>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To discover new models you watch model_card::ROOT_PATH which is v1/mdc. So not a DiscoveryKey.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To discover new instances you watch component::INSTANCE_ROOT_PATH which is v1/instances.
| // TODO: Replace when KeyValueDiscoveryClient is implemented | ||
| Err(error!("No discovery clients yet implemented.")) | ||
| }) | ||
| .await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you initialize it in new? Then you don't need the OnceCell.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, this makes sense
Summary by CodeRabbit
New Features