
Conversation

@mohammedabdulwahhab
Contributor

@mohammedabdulwahhab mohammedabdulwahhab commented Oct 26, 2025

  • Adds a ServiceDiscovery interface to be used in place of direct ETCD calls
  • Introduces a Kubernetes implementation of the interface using EndpointSlices
  • Use of the ServiceDiscovery interface is feature-flagged via the USE_SERVICE_DISCOVERY env var; a rough sketch of the gating follows below
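
A rough sketch of how the flag gates the registration path (the names are the ones introduced in this PR, but this is a simplified illustration, not the PR's exact code):

    // Simplified sketch; error handling and the legacy branch body are omitted.
    if crate::config::is_service_discovery_enabled() {
        // New path: register via the ServiceDiscovery trait (Kubernetes EndpointSlices impl)
        let discovery = drt.service_discovery()?;
        let handle = discovery.register_instance(&namespace, &component).await?;
        let instance_id = crate::discovery::instance_id_to_u64(handle.instance_id());
    } else {
        // Legacy path: ETCD lease-based registration keyed by lease_id
    }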

Summary by CodeRabbit

Release Notes

  • New Features

    • Added Kubernetes-based service discovery integration, enabling automatic instance registration and discovery via Kubernetes endpoints.
    • Introduced configurable service discovery via USE_SERVICE_DISCOVERY environment variable.
    • System now watches for instance changes and updates in real-time.
  • Chores

    • Added Kubernetes and API client dependencies to support service discovery capabilities.

@copy-pr-bot

copy-pr-bot bot commented Oct 26, 2025

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@grahamking
Contributor

@mohammedabdulwahhab When you get closer, could you split this into several smaller units? 3k lines is a lot!

@mohammedabdulwahhab mohammedabdulwahhab changed the title from Mabdulwahhab/etcdless to fix: use service discovery interface (1/n) on Oct 27, 2025
@github-actions github-actions bot added the fix label Oct 27, 2025
@mohammedabdulwahhab mohammedabdulwahhab marked this pull request as ready for review October 27, 2025 18:14
@mohammedabdulwahhab mohammedabdulwahhab requested review from a team as code owners October 27, 2025 18:14
@coderabbitai
Contributor

coderabbitai bot commented Oct 27, 2025

Walkthrough

This PR introduces Kubernetes-based service discovery to the runtime, replacing ETCD-based instance registration. It adds trait-based discovery abstractions, a Kubernetes implementation using EndpointSlices, and integrates service discovery throughout the component lifecycle with backward compatibility via feature gates and environment configuration.

Changes

Cohort / File(s) Summary
Example namespace updates
examples/custom_backend/hello_world/client.py, examples/custom_backend/hello_world/hello_world.py
Changed namespace context from "hello_world" to "test" in endpoint lookup and runtime initialization.
Kubernetes dependencies
lib/runtime/Cargo.toml
Added three new dependencies: kube (2.0.1), k8s-openapi (0.26.0), and schemars (1) to support Kubernetes API interactions.
Discovery module foundation
lib/runtime/src/discovery/mod.rs
New public service discovery API defining traits (ServiceDiscovery, InstanceHandle), types (Instance, InstanceStatus, InstanceEvent), error handling (DiscoveryError), and a helper function (instance_id_to_u64).
Kubernetes discovery implementation
lib/runtime/src/discovery/kubernetes.rs
New Kubernetes-based ServiceDiscovery implementation using EndpointSlices. Includes KubernetesServiceDiscovery struct with watcher spawning per namespace/component, instance extraction from EndpointSlices, and KubernetesInstanceHandle with pod name reading. Includes unit and integration tests.
Service discovery configuration
lib/runtime/src/config.rs
Added is_service_discovery_enabled() helper function that reads the USE_SERVICE_DISCOVERY environment variable.
Component discovery integration
lib/runtime/src/component.rs
Added an instance_handle field (Arc<OnceCell<Box<dyn InstanceHandle>>>) to store the discovery registration result, a new register_instance() method for registration, an instance_handle() accessor, and changed the add_stats_service signature from &mut self to &self.
Component endpoint registration
lib/runtime/src/component/endpoint.rs
Introduced feature-gated registration paths: uses ServiceDiscovery with instance_id when enabled, falls back to ETCD lease-based registration when disabled. Updated to compute instance_id from discovery handle or lease_id, and mark instance as Ready.
Component service visibility
lib/runtime/src/component/service.rs
Made SERVICE_VERSION constant public.
Client discovery routing
lib/runtime/src/component/client.rs
Major refactoring: new_dynamic now branches to new_dynamic_v2 (ServiceDiscovery) or new_dynamic_etcd (ETCD) based on is_service_discovery_enabled(). Added get_or_create_dynamic_instance_source_v2 for ServiceDiscovery watcher setup and convert_discovery_instance_to_runtime helper for instance translation.
Distributed runtime integration
lib/runtime/src/distributed.rs
Added service_discovery field to hold optional ServiceDiscovery instance, new create_service_discovery() async helper that initializes Kubernetes discovery when enabled, and public service_discovery() accessor.
Public API exposure
lib/runtime/src/lib.rs
Exposed config module as public, added private service_discovery field to DistributedRuntime.
Python bindings
lib/bindings/python/rust/lib.rs
Modified Component::create_service to use immutable binding for inner variable and added conditional service discovery registration via register_instance() with error propagation.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

  • Kubernetes watcher implementation (lib/runtime/src/discovery/kubernetes.rs): Complex state management with broadcast channels, EndpointSlice watching, and event streaming logic requires careful verification of correctness.
  • Feature-gated registration paths (lib/runtime/src/component/endpoint.rs): Multiple conditional branches for ServiceDiscovery vs. ETCD require tracing execution paths and verifying both legacy and new flows work correctly.
  • Instance discovery lifecycle (lib/runtime/src/component.rs, lib/runtime/src/component/client.rs): New OnceCell-based instance tracking and the new_dynamic_v2 conversion logic involving discovery instances to runtime instances need careful scrutiny.
  • Cross-module coordination: Changes span 11+ files with interdependencies; verify that service_discovery initialization, instance registration, and client discovery logic integrate correctly.

Poem

🐰 ServiceDiscovery hops arrive,
Kubernetes keeps instances alive,
EndpointSlices dance and sway,
Watching pods the modern way!
No more ETCD leases today!

Pre-merge checks

❌ Failed checks (2 warnings)
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 77.08%, below the required 80.00% threshold. Resolution: run @coderabbitai generate docstrings to improve docstring coverage.
  • Description Check (⚠️ Warning): The pull request description is significantly incomplete compared to the required template. The author provided only a brief three-bullet summary; it lacks the "Overview" section with detailed context, the "Where should the reviewer start?" section identifying key files for review, and the "Related Issues" section with action keywords and issue references, and the details that are present are terse. What is there is on-topic and relevant, but the structure and completeness fall well short of the template. Resolution: revise the description to follow the template: add an "Overview" giving context for why these changes are being made, expand the "Details" section, add a "Where should the reviewer start?" section highlighting key files (such as lib/runtime/src/discovery/mod.rs, lib/runtime/src/discovery/kubernetes.rs, and lib/runtime/src/component/client.rs), and include a "Related Issues" section with appropriate action keywords and any relevant issue numbers. The large size of this PR (~3,000 lines) makes a clear, well-structured description especially important for reviewers.
✅ Passed checks (1 passed)
  • Title Check (✅ Passed): The title "fix: use service discovery interface (1/n)" directly and clearly describes the main change: introducing and using a service discovery interface in place of direct ETCD calls, as evidenced by the new ServiceDiscovery trait, the Kubernetes implementation, and the related refactoring throughout the codebase. The "(1/n)" suffix appropriately signals that this is part of a series, consistent with the plan to split the large changeset. The title is concise, specific, and avoids vague terminology.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 8

♻️ Duplicate comments (1)
examples/custom_backend/hello_world/client.py (1)

27-27: Clarify the namespace change and fix formatting.

Same question as in hello_world.py: Is the namespace change from "hello_world" to "test" intentional?

Additionally, please address the Black formatting failure reported in the pipeline.

🧹 Nitpick comments (16)
lib/runtime/src/config.rs (1)

424-431: Consider using env_is_truthy() for consistency.

The current implementation uses parse::<bool>() which only accepts "true" or "false" strings. For consistency with other environment variable checks in this codebase (lines 409-414), consider using env_is_truthy("USE_SERVICE_DISCOVERY") which accepts "1", "true", "on", "yes" as truthy values.

Apply this diff for consistency:

-/// Check if service discovery is enabled via USE_SERVICE_DISCOVERY environment variable
-/// Returns true if USE_SERVICE_DISCOVERY is set to a truthy value, false otherwise
-pub fn is_service_discovery_enabled() -> bool {
-    std::env::var("USE_SERVICE_DISCOVERY")
-        .ok()
-        .and_then(|v| v.parse::<bool>().ok())
-        .unwrap_or(false)
-}
+/// Check if service discovery is enabled via USE_SERVICE_DISCOVERY environment variable
+/// Returns true if USE_SERVICE_DISCOVERY is set to a truthy value, false otherwise
+pub fn is_service_discovery_enabled() -> bool {
+    env_is_truthy("USE_SERVICE_DISCOVERY")
+}
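
For reference, a possible shape of such a helper; the actual env_is_truthy in config.rs may differ, so treat this as a sketch only:

    /// Sketch: treat "1", "true", "on", "yes" (case-insensitive) as truthy.
    fn env_is_truthy(name: &str) -> bool {
        match std::env::var(name) {
            Ok(v) => matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "on" | "yes"),
            Err(_) => false,
        }
    }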
lib/runtime/src/distributed.rs (1)

211-224: Add error logging for service discovery initialization failures.

The implementation is sound, but consider adding error logging when Kubernetes service discovery initialization fails. Currently, errors are propagated silently.

Apply this diff to improve observability:

 async fn create_service_discovery() -> Result<Option<Arc<Box<dyn ServiceDiscovery>>>> {
     if !crate::config::is_service_discovery_enabled() {
         tracing::debug!("Service discovery disabled (USE_SERVICE_DISCOVERY not set)");
         return Ok(None);
     }

     // Currently only Kubernetes discovery is supported
     // In the future, this can be extended to support other backends
     tracing::info!("Initializing Kubernetes-based service discovery");
-    let discovery: Box<dyn ServiceDiscovery> = Box::new(KubernetesServiceDiscovery::new().await?);
+    let discovery: Box<dyn ServiceDiscovery> = Box::new(
+        KubernetesServiceDiscovery::new()
+            .await
+            .map_err(|e| {
+                tracing::error!("Failed to initialize Kubernetes service discovery: {}", e);
+                e
+            })?
+    );
     Ok(Some(Arc::new(discovery)))
 }
lib/runtime/src/component.rs (3)

174-179: Prefer Arc<dyn InstanceHandle> in the OnceCell to avoid borrowing trait objects across awaits

Holding an &Box<dyn InstanceHandle> across awaits is brittle. Store Arc<dyn InstanceHandle> and return clones for ergonomic, Send+Sync-safe usage.

Apply:

-    #[builder(default = "Arc::new(async_once_cell::OnceCell::new())")]
-    #[educe(Debug(ignore))]
-    instance_handle: Arc<async_once_cell::OnceCell<Box<dyn crate::discovery::InstanceHandle>>>,
+    #[builder(default = "Arc::new(async_once_cell::OnceCell::new())")]
+    #[educe(Debug(ignore))]
+    instance_handle: Arc<async_once_cell::OnceCell<Arc<dyn crate::discovery::InstanceHandle>>>,

And adjust the accessor accordingly (see below).


277-282: Return a dyn reference (or Arc<dyn InstanceHandle>) instead of &Box<dyn InstanceHandle> to simplify call sites

The current signature leaks the Box and complicates borrowing across awaits. Minimal change: return &dyn InstanceHandle. Ideally, adopt the Arc approach above.

Minimal change:

-    pub fn instance_handle(&self) -> Result<&Box<dyn crate::discovery::InstanceHandle>> {
-        let handle = self.instance_handle.get();
-        handle.ok_or_else(|| error!("Component not registered with service discovery"))
-    }
+    pub fn instance_handle(&self) -> Result<&(dyn crate::discovery::InstanceHandle)> {
+        self.instance_handle
+            .get()
+            .map(|b| b.as_ref())
+            .ok_or_else(|| error!("Component not registered with service discovery"))
+    }

If you switch the cell to store Arc<dyn ...>:

-    pub fn instance_handle(&self) -> Result<&(dyn crate::discovery::InstanceHandle)> { ... }
+    pub fn instance_handle(&self) -> Result<Arc<dyn crate::discovery::InstanceHandle>> {
+        self.instance_handle
+            .get()
+            .cloned()
+            .ok_or_else(|| error!("Component not registered with service discovery"))
+    }

390-435: NATS service creation race handled; minor nit: remove pre-check to reduce TOCTOU

You already resolve the race by re-checking and stopping the created service. The initial contains_key pre-check adds needless cost. Consider removing it and relying on the single guarded insert path.
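
A minimal sketch of the single guarded insert, using hypothetical names (the actual map and service types in component.rs are not shown in this thread):

    use std::collections::HashMap;
    use std::sync::Arc;
    use tokio::sync::Mutex;

    struct NatsService; // placeholder for the real service type

    struct ServiceRegistry {
        services: Mutex<HashMap<String, Arc<NatsService>>>,
    }

    impl ServiceRegistry {
        // Single guarded path: no contains_key pre-check, so there is no
        // check-then-act window between the lookup and the insert.
        async fn get_or_insert(&self, name: &str) -> Arc<NatsService> {
            let mut services = self.services.lock().await;
            services
                .entry(name.to_string())
                .or_insert_with(|| Arc::new(NatsService))
                .clone()
        }
    }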

lib/runtime/src/component/endpoint.rs (2)

88-93: Debug message mixes legacy path details; make neutral

When discovery is enabled, printing etcd_path_with_lease_id is confusing.

Example:

-        tracing::debug!(
-            "Starting endpoint: {} (instance_id: {})",
-            endpoint.etcd_path_with_lease_id(lease_id),
-            instance_id
-        );
+        tracing::debug!("Starting endpoint {}/{}:{} (instance_id: {})",
+            endpoint.component.namespace().name(),
+            endpoint.component.name(),
+            endpoint.name(),
+            instance_id
+        );

142-147: subject uses instance_id — OK; guard etcd-only vars

Minor: gate etcd_path/etcd_client construction under legacy path to avoid unused values when discovery is enabled.

lib/runtime/src/component/client.rs (2)

186-210: Resetting both available and free sets each update may mask “busy” state

Overwriting instance_free with the full set discards “busy” inhibition signals. Consider updating instance_free only from busy_instance_ids to preserve intent.


206-210: Run cargo fmt to remove trailing whitespace

Trailing whitespace detected at line 208 within the reviewed code section. Running cargo fmt will resolve this formatting issue.

lib/runtime/src/discovery/kubernetes.rs (4)

36-44: Avoid duplicate watchers; add per-key guard and cancellation

Receiver-count heuristic races. Maintain a per-key OnceCell/flag and a CancellationToken to stop the task when last subscriber drops.

I can provide a small watcher-registry if needed.

Also applies to: 283-301


241-256: Namespace selection: use watch/list namespace consistently and allow override

You use POD_NAMESPACE for both watch and list; good. Consider allowing an explicit override via config to support cross-namespace watches when needed.


186-219: InstanceHandle no-ops: document semantics

set_ready/set_metadata are no-ops; ensure callers don't rely on side effects. The current callers handle this correctly. Add docs to the trait impl explaining that readiness is Kubernetes-driven.

Also applies to: 221-239


55-83: extract_instances filters by Ready — OK; consider carrying addresses/ports

For future routing across transports, consider capturing port info from EndpointSlice ports.
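
A hedged sketch of capturing port information alongside the pod name; the field accesses follow k8s-openapi's EndpointSlice/EndpointPort types, but returning (pod, port) pairs is an assumption, not something this PR does:

    use k8s_openapi::api::discovery::v1::EndpointSlice;

    /// Sketch: collect (pod_name, port) pairs from ready endpoints.
    fn extract_ready_pods_with_ports(slices: &[EndpointSlice]) -> Vec<(String, Option<i32>)> {
        let mut out = Vec::new();
        for slice in slices {
            // An EndpointSlice carries one port list shared by all endpoints in the slice.
            let first_port = slice
                .ports
                .as_ref()
                .and_then(|ports| ports.first())
                .and_then(|p| p.port);
            for endpoint in &slice.endpoints {
                let ready = endpoint
                    .conditions
                    .as_ref()
                    .and_then(|c| c.ready)
                    .unwrap_or(false);
                if !ready {
                    continue;
                }
                if let Some(target_ref) = &endpoint.target_ref {
                    if target_ref.kind.as_deref() == Some("Pod") {
                        if let Some(name) = &target_ref.name {
                            out.push((name.clone(), first_port));
                        }
                    }
                }
            }
        }
        out
    }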

lib/runtime/src/discovery/mod.rs (3)

41-53: Trait shapes look good; minor doc clarifications

Document that InstanceHandle methods are best-effort and may be no-ops depending on backend (e.g., Kubernetes).


71-79: instance_id_to_u64 collision risk acknowledged — OK

The function is fine; consider adding a namespace prefix to the hash input in case future backends reuse IDs across namespaces (see the sketch below).
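
A minimal sketch of the namespace-prefixed variant (the helper name namespaced_instance_id_to_u64 is hypothetical; the existing instance_id_to_u64 presumably hashes only the ID string):

    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    /// Sketch: include the namespace in the hash input so identical IDs in
    /// different namespaces map to different u64 values.
    fn namespaced_instance_id_to_u64(namespace: &str, instance_id: &str) -> u64 {
        let mut hasher = DefaultHasher::new();
        namespace.hash(&mut hasher);
        instance_id.hash(&mut hasher);
        hasher.finish()
    }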


7-12: Module layout

Re-exports (legacy, kubernetes) are clear. Add cfg gating if non-Kubernetes targets build this crate.
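
For example, the Kubernetes backend could sit behind a cargo feature; the feature name below is hypothetical, since the crate defines no such feature in this PR:

    // lib/runtime/src/discovery/mod.rs (sketch)
    pub mod legacy;

    #[cfg(feature = "kubernetes-discovery")]
    pub mod kubernetes;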

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 07cf074 and b26a8ee.

⛔ Files ignored due to path filters (2)
  • Cargo.lock is excluded by !**/*.lock
  • lib/bindings/python/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (13)
  • examples/custom_backend/hello_world/client.py (1 hunks)
  • examples/custom_backend/hello_world/hello_world.py (1 hunks)
  • lib/bindings/python/rust/lib.rs (1 hunks)
  • lib/runtime/Cargo.toml (1 hunks)
  • lib/runtime/src/component.rs (5 hunks)
  • lib/runtime/src/component/client.rs (3 hunks)
  • lib/runtime/src/component/endpoint.rs (6 hunks)
  • lib/runtime/src/component/service.rs (1 hunks)
  • lib/runtime/src/config.rs (1 hunks)
  • lib/runtime/src/discovery/kubernetes.rs (1 hunks)
  • lib/runtime/src/discovery/mod.rs (1 hunks)
  • lib/runtime/src/distributed.rs (5 hunks)
  • lib/runtime/src/lib.rs (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (8)
lib/runtime/src/component/endpoint.rs (3)
lib/runtime/src/config.rs (1)
  • is_service_discovery_enabled (426-431)
lib/runtime/src/discovery/mod.rs (2)
  • instance_id (45-45)
  • instance_id_to_u64 (72-79)
lib/runtime/src/component.rs (6)
  • instance_handle (278-281)
  • etcd_path_with_lease_id (572-574)
  • subject (607-609)
  • etcd_path (260-263)
  • etcd_path (562-569)
  • etcd_path (712-714)
lib/runtime/src/discovery/mod.rs (4)
lib/runtime/src/component.rs (10)
  • serde_json (301-301)
  • new (685-691)
  • id (106-108)
  • id (528-534)
  • register_instance (438-449)
  • namespace (265-267)
  • namespace (703-710)
  • component (540-542)
  • component (694-700)
  • list_instances (293-311)
lib/runtime/src/component/client.rs (1)
  • serde_json (272-272)
lib/runtime/src/discovery/kubernetes.rs (7)
  • instance_id (203-205)
  • set_metadata (207-211)
  • set_ready (213-218)
  • new (25-34)
  • register_instance (223-239)
  • list_instances (241-270)
  • watch (272-302)
lib/runtime/src/discovery/legacy.rs (1)
  • new (22-27)
examples/custom_backend/hello_world/client.py (1)
lib/runtime/src/distributed.rs (2)
  • runtime (226-228)
  • namespace (245-247)
lib/runtime/src/distributed.rs (2)
lib/runtime/src/config.rs (1)
  • is_service_discovery_enabled (426-431)
lib/bindings/python/rust/lib.rs (2)
  • new (428-464)
  • new (1096-1100)
lib/runtime/src/component.rs (3)
lib/runtime/src/distributed.rs (1)
  • nats_client (286-288)
lib/runtime/src/component/service.rs (1)
  • build_nats_service (25-60)
lib/runtime/src/discovery/mod.rs (1)
  • register_instance (85-89)
lib/runtime/src/component/client.rs (4)
lib/runtime/src/component.rs (8)
  • endpoint (283-291)
  • client (620-626)
  • new (685-691)
  • namespace (265-267)
  • namespace (703-710)
  • component (540-542)
  • component (694-700)
  • service_name (251-254)
lib/runtime/src/config.rs (1)
  • is_service_discovery_enabled (426-431)
lib/runtime/src/distributed.rs (3)
  • new (51-198)
  • instance_sources (329-331)
  • namespace (245-247)
lib/runtime/src/discovery/mod.rs (2)
  • instance_id (45-45)
  • instance_id_to_u64 (72-79)
lib/bindings/python/rust/lib.rs (2)
lib/bindings/python/rust/llm/entrypoint.rs (1)
  • to_pyerr (312-317)
lib/runtime/src/config.rs (1)
  • is_service_discovery_enabled (426-431)
lib/runtime/src/discovery/kubernetes.rs (1)
lib/runtime/src/discovery/mod.rs (7)
  • new (63-68)
  • instance_id (45-45)
  • set_metadata (48-48)
  • set_ready (51-51)
  • register_instance (85-89)
  • list_instances (92-96)
  • watch (99-103)
🪛 GitHub Actions: Copyright Checks
lib/runtime/src/discovery/mod.rs

[error] 1-1: Copyright header check failed. Invalid/Missing header in lib/runtime/src/discovery/mod.rs. Step: pwsh /workspace/.github/workflows/copyright-check.ps1. Exit code: 255.

lib/runtime/src/discovery/kubernetes.rs

[error] 1-1: Copyright header check failed. Invalid/Missing header in lib/runtime/src/discovery/kubernetes.rs. Step: pwsh /workspace/.github/workflows/copyright-check.ps1. Exit code: 255.

🪛 GitHub Actions: Pre Merge Validation of (ai-dynamo/dynamo/refs/pull/3898/merge) by mohammedabdulwahhab.
lib/runtime/src/component/endpoint.rs

[error] 1-1: Trailing whitespace hook modified files. Please re-run pre-commit to ensure clean formatting.


[error] 1-1: Trailing whitespace hook modified files. Please re-run pre-commit to ensure clean formatting.

examples/custom_backend/hello_world/client.py

[error] 1-1: Black formatting check failed. Reformatted 1 file(s) during the hook.

lib/runtime/src/component/client.rs

[error] 1-1: Trailing whitespace hook modified files. Please re-run pre-commit to ensure clean formatting.

lib/bindings/python/rust/lib.rs

[error] 1-1: Trailing whitespace hook modified files. Please re-run pre-commit to ensure clean formatting.

lib/runtime/src/discovery/kubernetes.rs

[error] 1-1: Trailing whitespace hook modified files. Please re-run pre-commit to ensure clean formatting.

🪛 GitHub Actions: Rust pre-merge checks
lib/runtime/src/component/client.rs

[error] 205-210: rustfmt formatting issue detected. Run 'cargo fmt' to fix the formatting. The diff shows the change from a single-line debug! macro to a multi-line formatted macro.

lib/bindings/python/rust/lib.rs

[error] 689-689: cargo fmt -- --check failed. Code formatting changes detected; please run 'cargo fmt' to format the file.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Build and Test - dynamo
🔇 Additional comments (15)
examples/custom_backend/hello_world/hello_world.py (1)

26-26: Clarify the namespace change from "hello_world" to "test".

The namespace has been changed from "hello_world" to "test". Is this intentional for testing purposes, or should it remain as "hello_world" for the example?

lib/bindings/python/rust/lib.rs (1)

688-688: LGTM! Correct mutability change.

Changing inner from mutable to immutable is appropriate since it's not being mutated in the async block.

lib/runtime/src/component/service.rs (1)

23-23: LGTM! Appropriate visibility change.

Making SERVICE_VERSION public is reasonable for service discovery integration and external version reporting.

lib/runtime/src/lib.rs (2)

20-20: LGTM! Config module exposure is necessary.

Making the config module public allows external access to is_service_discovery_enabled() and other configuration helpers.


115-116: LGTM! Appropriate field addition for service discovery.

The service_discovery field correctly uses Option to represent the feature-gated nature of service discovery. The Arc<Box<dyn ServiceDiscovery>> pattern is appropriate for shared ownership of a trait object.

lib/runtime/src/distributed.rs (3)

8-8: LGTM! Appropriate imports for service discovery.

The imports for ServiceDiscovery trait and KubernetesServiceDiscovery implementation are correctly added.


88-89: LGTM! Service discovery initialization properly integrated.

The initialization and storage of the service discovery handle is correctly implemented with proper error propagation.

Also applies to: 103-103


314-319: LGTM! Service discovery accessor is well-designed.

The accessor correctly returns an error with a helpful message when service discovery is not enabled, guiding users to set the USE_SERVICE_DISCOVERY flag.

lib/runtime/Cargo.toml (1)

59-61: Kubernetes dependencies verified—all versions are current.

Verification confirms:

  • kube 2.0.1 is the latest released version
  • k8s-openapi 0.26.0 is the latest version
  • schemars 1.0.4 is the latest version (the Cargo.toml constraint "1" will resolve to this)

The dependency selections and feature flags are sound for the ServiceDiscovery implementation.

lib/runtime/src/component/endpoint.rs (2)

121-131: Stats subject updated to instance_id — good

Using endpoint.subject_to(instance_id) aligns subjects across server/client.


244-275: Trailing whitespace and double-cancel concerns are not applicable; error message improvement is optional

The code in lines 244–275 has no trailing whitespace. Additionally, cancel_token.cancel() is not called twice—each error path (service discovery vs. ETCD) is in a separate if/else branch, so cancellation occurs only once per execution.

The optional suggestion to add instance_id context to the service discovery error message (line 251) remains valid for debugging, though the current approach is functionally correct.

Likely an incorrect or invalid review comment.

lib/runtime/src/component/client.rs (3)

68-76: Feature-gated path selection — OK

Branching on USE_SERVICE_DISCOVERY is straightforward and keeps legacy intact.


77-90: new_dynamic_v2 looks fine; start monitoring immediately

Construction + monitor is consistent with etcd path.


404-427: Mapping discovery instances to runtime — OK; stable subject

Slug + “name-{:x}” matches server-side convention.

lib/runtime/src/component.rs (1)

437-448: Remove this review comment; proposed fix introduces a time-of-check-time-of-use race condition

The review comment misrepresents the concurrency guarantees of async_once_cell::OnceCell::get_or_init(). According to the library's design, at-most-one successful initializer is guaranteed, with the future polled to completion ensuring only one runs. Two concurrent calls cannot both execute discovery.register_instance() unless the initializing future is cancelled mid-flight—which has no evidence in the codebase.

The proposed fix is demonstrably worse. The check-then-set pattern (if get().is_some() followed by set(handle)) introduces a genuine time-of-check-time-of-use race: between the check and the set() call, another task can initialize the cell, causing set() to fail with AlreadyInitialized. The current code using get_or_init() is atomic and correct.

If cancellation is a concern in a production scenario, that requires architectural changes (e.g., atomic initialize-and-register patterns), not the proposed check-then-set workaround.

Likely an incorrect or invalid review comment.

Comment on lines +690 to +696
// Create the NATS service
inner.add_stats_service().await.map_err(to_pyerr)?;

// Feature flag: register with service discovery if enabled
if rs::config::is_service_discovery_enabled() {
    inner.register_instance().await.map_err(to_pyerr)?;
}
Contributor


⚠️ Potential issue | 🟡 Minor

Service discovery integration looks correct, but fix formatting.

The conditional service discovery registration logic is sound and properly gated behind the feature flag. However, please address the pipeline failures:

  1. Run cargo fmt to fix formatting
  2. Remove trailing whitespace (likely on line 692)
🤖 Prompt for AI Agents
In lib/bindings/python/rust/lib.rs around lines 690 to 696, the service
discovery conditional is correct but formatting is off; run cargo fmt to apply
standard Rust formatting and remove any trailing whitespace (notably at line
692) so the file passes CI formatting checks.

Comment on lines +309 to +403
/// V2: Create instance source using ServiceDiscovery interface
async fn get_or_create_dynamic_instance_source_v2(
    endpoint: &Endpoint,
) -> Result<Arc<InstanceSource>> {
    let drt = endpoint.drt();
    let instance_sources = drt.instance_sources();
    let mut instance_sources = instance_sources.lock().await;

    // Check if we already have a watcher for this endpoint
    if let Some(instance_source) = instance_sources.get(endpoint) {
        if let Some(instance_source) = instance_source.upgrade() {
            return Ok(instance_source);
        } else {
            instance_sources.remove(endpoint);
        }
    }

    let namespace = endpoint.component.namespace.name();
    let component = endpoint.component.name();

    // Get service discovery interface and set up watch for instance changes
    let discovery = drt.service_discovery()?;
    let mut instance_watch = discovery.watch(&namespace, component).await?;

    // Watch automatically streams existing instances as ADDED events, so no need to call list_instances()
    let (watch_tx, watch_rx) = tokio::sync::watch::channel(vec![]);

    let secondary = endpoint.component.drt.runtime.secondary().clone();
    let endpoint_name = endpoint.name.clone();
    let namespace_clone = namespace.clone();
    let component_clone = component.to_string();

    // Spawn background task to process instance events
    secondary.spawn(async move {
        tracing::debug!("Starting ServiceDiscovery watcher for {}/{}", namespace_clone, component_clone);

        // Map instances by their discovery instance_id (string) for proper removal tracking
        let mut map: HashMap<String, Instance> = HashMap::new();

        loop {
            let event = tokio::select! {
                _ = watch_tx.closed() => {
                    tracing::debug!("All watchers have closed; shutting down ServiceDiscovery watcher for {}/{}", namespace_clone, component_clone);
                    break;
                }
                event = instance_watch.recv() => {
                    match event {
                        Ok(event) => event,
                        Err(e) => {
                            tracing::debug!("Watch stream error: {}; shutting down ServiceDiscovery watcher for {}/{}", e, namespace_clone, component_clone);
                            break;
                        }
                    }
                }
            };

            match event {
                crate::discovery::InstanceEvent::Added(disc_instance) => {
                    if let Some(runtime_instance) = Self::convert_discovery_instance_to_runtime(
                        disc_instance.clone(),
                        &namespace_clone,
                        &component_clone,
                        &endpoint_name,
                    ) {
                        // Use discovery instance_id as key for proper removal tracking
                        map.insert(disc_instance.instance_id.clone(), runtime_instance);
                        tracing::debug!("Added instance {}, total instances: {}", disc_instance.instance_id, map.len());
                    }
                }
                crate::discovery::InstanceEvent::Removed(instance_id) => {
                    if map.remove(&instance_id).is_some() {
                        tracing::debug!("Removed instance {}, total instances: {}", instance_id, map.len());
                    } else {
                        tracing::warn!("Attempted to remove non-existent instance: {}", instance_id);
                    }
                }
            }

            let instances: Vec<Instance> = map.values().cloned().collect();

            if watch_tx.send(instances).is_err() {
                tracing::debug!("Unable to send watch updates; shutting down ServiceDiscovery watcher for {}/{}", namespace_clone, component_clone);
                break;
            }
        }

        tracing::debug!("Completed ServiceDiscovery watcher for {}/{}", namespace_clone, component_clone);
        let _ = watch_tx.send(vec![]);
    });

    let instance_source = Arc::new(InstanceSource::Dynamic(watch_rx));
    instance_sources.insert(endpoint.clone(), Arc::downgrade(&instance_source));
    Ok(instance_source)
}

Contributor


⚠️ Potential issue | 🟠 Major

Watcher lifecycle and duplication risks

  • needs_watch uses receiver_count() == 0, which races with subscribe() and can cause duplicate watchers.
  • Nothing stops the watcher when all subscribers drop, so watcher tasks may leak.

Suggested adjustments:

  • Track a “watcher started” flag per key (e.g., separate HashMap<String, bool> or store JoinHandle) under the same Mutex.
  • Start watcher before returning the first receiver, or compute needs_watch before cloning/subscribing.
  • Stop watcher when receiver_count drops to 0 (maintain a cancellation token per key).
    I can draft a small watcher registry if helpful.
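
A minimal sketch of such a watcher registry, assuming tokio_util::sync::CancellationToken is available; the type and method names are illustrative, not the PR's actual ones:

    use std::collections::HashMap;
    use tokio::sync::Mutex;
    use tokio_util::sync::CancellationToken;

    /// Sketch: one entry per endpoint key, created under the same mutex that
    /// guards the instance-source map, so at most one watcher is spawned per key.
    #[derive(Default)]
    struct WatcherRegistry {
        watchers: Mutex<HashMap<String, CancellationToken>>,
    }

    impl WatcherRegistry {
        /// Returns a token if this caller should spawn the watcher,
        /// or None if a watcher is already running for the key.
        async fn claim(&self, key: &str) -> Option<CancellationToken> {
            let mut watchers = self.watchers.lock().await;
            if watchers.contains_key(key) {
                return None;
            }
            let token = CancellationToken::new();
            watchers.insert(key.to_string(), token.clone());
            Some(token)
        }

        /// Called by the watcher task when it exits (all receivers dropped or the
        /// token fired), so a later subscriber can start a fresh watcher.
        async fn release(&self, key: &str) {
            self.watchers.lock().await.remove(key);
        }
    }

The spawned task would then select over the cancellation token and watch_tx.closed(), call release(key) on exit, and the caller would trigger the token once the receiver count drops to zero.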
🤖 Prompt for AI Agents
In lib/runtime/src/component/client.rs around lines 309-403, the
ServiceDiscovery watcher can be started multiple times due to racing on
receiver_count and never stops when all subscribers drop; fix by adding a
watcher registry entry (stored in the same Mutex-protected map alongside the
Weak InstanceSource) that records either a "watcher started" flag or the
JoinHandle and a cancellation token; compute whether a watcher is needed before
cloning/subscribing and only spawn the background task if there is no registry
entry, store the cancellation token/handle in the registry, and have the spawned
task listen on that token and on watch_tx.closed() so it will exit and clean up
the registry entry when receiver_count reaches 0 (or when the token is
triggered), ensuring you remove the registry entry under the same mutex when the
watcher stops to avoid leaks and duplicate watchers.

Comment on lines 74 to 87
// Feature flag determines registration path: ServiceDiscovery (new) or ETCD lease (legacy)
let use_service_discovery = crate::config::is_service_discovery_enabled();

// Get instance_id: from ServiceDiscovery handle or ETCD lease_id
let instance_id = if use_service_discovery {
    if let Ok(instance_handle) = endpoint.component.instance_handle() {
        crate::discovery::instance_id_to_u64(instance_handle.instance_id())
    } else {
        panic!("Service discovery enabled but component not registered. Call component.register_instance() first.");
    }
} else {
    lease_id
};

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Don’t panic when service discovery isn’t registered; try to register or return an error

Panic in library code is a reliability bug. Attempt lazy registration, else return a descriptive error.

Apply:

-        let instance_id = if use_service_discovery {
-            if let Ok(instance_handle) = endpoint.component.instance_handle() {
-                crate::discovery::instance_id_to_u64(instance_handle.instance_id())
-            } else {
-                panic!("Service discovery enabled but component not registered. Call component.register_instance() first.");
-            }
-        } else {
-            lease_id
-        };
+        let instance_id = if use_service_discovery {
+            let handle = match endpoint.component.instance_handle() {
+                Ok(h) => h,
+                Err(_) => {
+                    // try lazy registration
+                    endpoint.component.register_instance().await?;
+                    endpoint.component.instance_handle()?
+                }
+            };
+            crate::discovery::instance_id_to_u64(handle.instance_id())
+        } else {
+            lease_id
+        };

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In lib/runtime/src/component/endpoint.rs around lines 74 to 87, the code
currently panics if service discovery is enabled but the component has no
registered instance; instead, attempt lazy registration by calling
component.register_instance() (or the appropriate registration API) when
instance_handle() returns Err, capture and propagate any errors, and if
registration fails return a Result::Err with a descriptive error (do not panic).
Ensure the function signature and callers handle the Result, convert lease_id
path to the same Result flow, and include clear error context like "service
discovery enabled but failed to register instance: <error>" when returning the
error.

Comment on lines 1 to 447
use super::*;
use async_trait::async_trait;
use futures::StreamExt;
use k8s_openapi::api::discovery::v1::EndpointSlice;
use kube::{
api::{Api, ListParams},
runtime::{watcher, WatchStreamExt},
Client, ResourceExt,
};
use parking_lot::Mutex;
use serde_json::Value;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::broadcast;

/// Kubernetes-based implementation of ServiceDiscovery using EndpointSlices
#[derive(Clone)]
pub struct KubernetesServiceDiscovery {
client: Client,
event_senders: Arc<Mutex<HashMap<String, broadcast::Sender<InstanceEvent>>>>,
}

impl KubernetesServiceDiscovery {
/// Create a new KubernetesServiceDiscovery
pub async fn new() -> Result<Self> {
let client = Client::try_default()
.await
.map_err(|e| DiscoveryError::RegistrationError(format!("Failed to create k8s client: {}", e)))?;

Ok(Self {
client,
event_senders: Arc::new(Mutex::new(HashMap::new())),
})
}

/// Get or create an event sender for a specific namespace/component pair
fn get_or_create_sender(&self, key: String) -> broadcast::Sender<InstanceEvent> {
let mut senders = self.event_senders.lock();
senders.entry(key).or_insert_with(|| {
let (tx, _) = broadcast::channel(100);
tx
}).clone()
}

/// Build label selector for namespace and component
fn build_label_selector(namespace: &str, component: &str) -> String {
format!("dynamo.namespace={},dynamo.component={}", namespace, component)
}

/// Watch key for a namespace/component pair
fn watch_key(namespace: &str, component: &str) -> String {
format!("{}:{}", namespace, component)
}

/// Extract pod names and their ready status from EndpointSlices
fn extract_instances(endpoint_slices: &[EndpointSlice]) -> Vec<Instance> {
let mut instances = Vec::new();

for slice in endpoint_slices {
for endpoint in &slice.endpoints {
// Only include ready endpoints
if let Some(conditions) = &endpoint.conditions {
if conditions.ready != Some(true) {
continue;
}
}

// Extract pod name from targetRef
if let Some(target_ref) = &endpoint.target_ref {
if target_ref.kind.as_deref() == Some("Pod") {
if let Some(pod_name) = &target_ref.name {
instances.push(Instance::new(
pod_name.clone(),
Value::Null,
));
}
}
}
}
}

instances
}

/// Start watching EndpointSlices for a specific namespace/component
fn start_watch(&self, namespace: &str, component: &str, watch_namespace: &str) {
let client = self.client.clone();
let label_selector = Self::build_label_selector(namespace, component);
let event_tx = self.get_or_create_sender(Self::watch_key(namespace, component));
let watch_namespace = watch_namespace.to_string();
let namespace_copy = namespace.to_string();
let component_copy = component.to_string();

tokio::spawn(async move {
println!(
"[K8s Discovery] Starting EndpointSlice watch: namespace={}, component={}, k8s_namespace={}, labels={}",
namespace_copy, component_copy, watch_namespace, label_selector
);
let api: Api<EndpointSlice> = Api::namespaced(client, &watch_namespace);
let watch_config = watcher::Config::default()
.labels(&label_selector);

let mut stream = watcher(api, watch_config)
.applied_objects()
.boxed();

// Track known ready instances across all slices
// Key: pod name, Value: slice name (for tracking which slice it came from)
let mut known_ready: HashMap<String, String> = HashMap::new();
// Track current state of all slices
let mut slice_instances: HashMap<String, HashSet<String>> = HashMap::new();

while let Some(result) = stream.next().await {
match result {
Ok(endpoint_slice) => {
let slice_name = endpoint_slice.name_any();

// Extract ready instances from this slice
let mut slice_ready = HashSet::new();

for endpoint in &endpoint_slice.endpoints {
// Check if endpoint is ready
let is_ready = endpoint.conditions
.as_ref()
.and_then(|c| c.ready)
.unwrap_or(false);

if is_ready {
if let Some(target_ref) = &endpoint.target_ref {
if target_ref.kind.as_deref() == Some("Pod") {
if let Some(pod_name) = &target_ref.name {
slice_ready.insert(pod_name.clone());
}
}
}
}
}

// Update slice_instances map
slice_instances.insert(slice_name.clone(), slice_ready);

// Rebuild the complete set of ready instances across all slices
// TODO: First pass, entire set of instances is rebuilt across Dynamo.
let mut current_ready: HashMap<String, String> = HashMap::new();
for (slice, pods) in &slice_instances {
for pod in pods {
current_ready.insert(pod.clone(), slice.clone());
}
}

// Find newly ready instances (Added events)
for pod_name in current_ready.keys() {
if !known_ready.contains_key(pod_name) {
println!(
"[K8s Discovery] ✅ Instance ADDED: pod_name={}, slice={}",
pod_name, slice_name
);
let instance = Instance::new(pod_name.clone(), Value::Null);
let _ = event_tx.send(InstanceEvent::Added(instance));
}
}

// Find no-longer-ready instances (Removed events)
for pod_name in known_ready.keys() {
if !current_ready.contains_key(pod_name) {
println!(
"[K8s Discovery] ❌ Instance REMOVED: pod_name={}",
pod_name
);
let _ = event_tx.send(InstanceEvent::Removed(pod_name.clone()));
}
}

known_ready = current_ready;
}
Err(e) => {
eprintln!("[K8s Discovery] ⚠️ Error watching EndpointSlices: {}", e);
// Continue watching despite errors
}
}
}
});
}
}

/// Handle for a Kubernetes-registered instance
pub struct KubernetesInstanceHandle {
instance_id: String,
}

impl KubernetesInstanceHandle {
/// Read pod name from environment variable
fn read_pod_name() -> Result<String> {
std::env::var("POD_NAME")
.map_err(|_| DiscoveryError::RegistrationError(
"POD_NAME environment variable not set".to_string()
))
}
}

#[async_trait]
impl InstanceHandle for KubernetesInstanceHandle {
fn instance_id(&self) -> &str {
&self.instance_id
}

async fn set_metadata(&self, _metadata: Value) -> Result<()> {
// Metadata changes are not supported in this implementation
// The Kubernetes operator manages the pod metadata
Ok(())
}

async fn set_ready(&self, _status: InstanceStatus) -> Result<()> {
// Readiness is controlled by Kubernetes readiness probes
// The operator and pod's readiness probe determine the actual status
// This is a no-op as the pod's readiness is reflected in EndpointSlices
Ok(())
}
}

#[async_trait]
impl ServiceDiscovery for KubernetesServiceDiscovery {
async fn register_instance(
&self,
namespace: &str,
component: &str,
) -> Result<Box<dyn InstanceHandle>> {
// Read pod name from environment
let instance_id = KubernetesInstanceHandle::read_pod_name()?;

println!(
"[K8s Discovery] 📝 Registered instance: namespace={}, component={}, pod_name={}",
namespace, component, instance_id
);

Ok(Box::new(KubernetesInstanceHandle {
instance_id,
}))
}

async fn list_instances(
&self,
namespace: &str,
component: &str,
) -> Result<Vec<Instance>> {
// Query all EndpointSlices with matching labels
let label_selector = Self::build_label_selector(namespace, component);

// Get the current namespace from env var, or use "default"
let current_namespace = std::env::var("POD_NAMESPACE")
.unwrap_or_else(|_| "default".to_string());

let api: Api<EndpointSlice> = Api::namespaced(self.client.clone(), &current_namespace);
let lp = ListParams::default().labels(&label_selector);

let slices = api.list(&lp).await
.map_err(|e| DiscoveryError::MetadataError(format!("Failed to list EndpointSlices: {}", e)))?;

let instances = Self::extract_instances(&slices.items);

println!(
"[K8s Discovery] 📋 Listed {} instances: namespace={}, component={}, pods={:?}",
instances.len(),
namespace,
component,
instances.iter().map(|i| &i.instance_id).collect::<Vec<_>>()
);

Ok(instances)
}

async fn watch(
&self,
namespace: &str,
component: &str,
) -> Result<broadcast::Receiver<InstanceEvent>> {
let key = Self::watch_key(namespace, component);

// Get or create event sender for this namespace/component
let event_tx = self.get_or_create_sender(key.clone());

// Check if we need to start a watcher
let needs_watch = {
let senders = self.event_senders.lock();
senders.get(&key).map(|tx| tx.receiver_count()).unwrap_or(0) == 0
};

if needs_watch {
// Get the current namespace from env var, or use "default"
let watch_namespace = std::env::var("POD_NAMESPACE")
.unwrap_or_else(|_| "default".to_string());

println!(
"[K8s Discovery] 👀 Starting new EndpointSlice watcher: namespace={}, component={}",
namespace, component
);

self.start_watch(namespace, component, &watch_namespace);
}

Ok(event_tx.subscribe())
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;

#[test]
fn test_build_label_selector() {
let selector = KubernetesServiceDiscovery::build_label_selector("my-ns", "my-comp");
assert_eq!(selector, "dynamo.namespace=my-ns,dynamo.component=my-comp");
}

#[test]
fn test_watch_key() {
let key = KubernetesServiceDiscovery::watch_key("ns1", "comp1");
assert_eq!(key, "ns1:comp1");
}

/// Integration test for Kubernetes service discovery
///
/// Prerequisites:
/// 1. Have a Kubernetes cluster accessible via kubectl
/// 2. Set KUBECONFIG environment variable to point to your kubeconfig file:
/// export KUBECONFIG=/path/to/your/kubeconfig
/// 3. Set POD_NAMESPACE environment variable (defaults to "default"):
/// export POD_NAMESPACE=default
/// 4. Create EndpointSlices in your cluster with the following labels:
/// dynamo.namespace=test
/// dynamo.component=worker
///
/// Example EndpointSlice creation (see kubernetes/endpoint-slice-test.yaml):
/// kubectl apply -f kubernetes/endpoint-slice-test.yaml
///
/// Run this test with:
/// cargo test --package dynamo-runtime test_kubernetes_discovery -- --ignored --nocapture
#[tokio::test]
#[ignore] // Ignore by default since it requires a running cluster
async fn test_kubernetes_discovery_list_and_watch() {
// Initialize tracing for debugging
let _ = tracing_subscriber::fmt()
.with_env_filter("debug")
.try_init();

// Create discovery client
let discovery = KubernetesServiceDiscovery::new()
.await
.expect("Failed to create Kubernetes discovery client. Make sure KUBECONFIG is set.");

println!("✓ Successfully connected to Kubernetes cluster");

// Test list_instances
println!("\n--- Testing list_instances ---");
let instances = discovery
.list_instances("test", "worker")
.await
.expect("Failed to list instances");

println!("Found {} instances:", instances.len());
for instance in &instances {
println!(" - {}", instance.instance_id);
}

// Test watch
println!("\n--- Testing watch ---");
let mut watch = discovery
.watch("test", "worker")
.await
.expect("Failed to create watch");

println!("Watching for changes (will wait 30 seconds)...");
println!("Now you can:");
println!(" 1. Scale up/down your deployment: kubectl scale deployment <name> --replicas=N");
println!(" 2. Delete pods: kubectl delete pod <pod-name>");
println!(" 3. Create new EndpointSlices with matching labels");

// Wait for events for 30 seconds
let timeout = Duration::from_secs(30);
let start = tokio::time::Instant::now();

let mut event_count = 0;
while start.elapsed() < timeout {
match tokio::time::timeout(Duration::from_secs(1), watch.recv()).await {
Ok(Ok(event)) => {
event_count += 1;
match event {
InstanceEvent::Added(instance) => {
println!(" [ADDED] Instance: {}", instance.instance_id);
}
InstanceEvent::Removed(instance_id) => {
println!(" [REMOVED] Instance: {}", instance_id);
}
}
}
Ok(Err(e)) => {
println!(" Watch error: {:?}", e);
break;
}
Err(_) => {
// Timeout - no event received, continue waiting
}
}
}

println!("\n--- Test Summary ---");
println!("Total events received: {}", event_count);

// Re-list to see final state
let final_instances = discovery
.list_instances("test", "worker")
.await
.expect("Failed to list instances");

println!("Final instance count: {}", final_instances.len());
for instance in &final_instances {
println!(" - {}", instance.instance_id);
}
}

/// Quick smoke test to verify connection to cluster
/// Run with: cargo test --package dynamo-runtime test_kubernetes_connection -- --ignored
#[tokio::test]
#[ignore]
async fn test_kubernetes_connection() {
let discovery = KubernetesServiceDiscovery::new()
.await
.expect("Failed to create Kubernetes discovery client");

// Just try to list - even if there are no results, connection works
let result = discovery.list_instances("test", "worker").await;

match result {
Ok(instances) => {
println!("✓ Connected successfully! Found {} instances", instances.len());
for instance in &instances {
println!(" - {}", instance.instance_id);
}
}
Err(e) => {
panic!("Failed to list instances: {:?}", e);
}
}
}
}

Contributor


🛠️ Refactor suggestion | 🟠 Major

🧩 Analysis chain

Trailing whitespace check failing

Run pre-commit hooks to fix trailing whitespace across this file.


🏁 Script executed:

#!/bin/bash
rg -nP '\s+$' lib/runtime/src/discovery/kubernetes.rs

Length of output: 635


Remove trailing whitespace across the file

Trailing whitespace is detected at 40+ locations (lines 29, 58, 67, 81, 93, 102, 106, 112, 117, 120, 127, 138, 141, 150, 162, 173, 230, 235, 248, 252, 255, 258, 260, 268, 278, 281, 287, 292, 297, 300, 323, 333, 336, 409, 415, 430, 433). Run pre-commit hooks or your linter to clean this up before merging.

🧰 Tools
🪛 GitHub Actions: Copyright Checks

[error] 1-1: Copyright header check failed. Invalid/Missing header in lib/runtime/src/discovery/kubernetes.rs. Step: pwsh /workspace/.github/workflows/copyright-check.ps1. Exit code: 255.

🪛 GitHub Actions: Pre Merge Validation of (ai-dynamo/dynamo/refs/pull/3898/merge) by mohammedabdulwahhab.

[error] 1-1: Trailing whitespace hook modified files. Please re-run pre-commit to ensure clean formatting.

🤖 Prompt for AI Agents
lib/runtime/src/discovery/kubernetes.rs lines 1-447: the file contains trailing
whitespace on many lines (e.g. 29, 58, 67, 81, … 433); remove all trailing
spaces at the end of lines, re-run your pre-commit hooks/linter (or run a bulk
cleanup like strip trailing whitespace in your editor or `git ls-files -z |
xargs -0 sed -i 's/[ \t]*$//'`), run `cargo fmt` (or your repo's formatter),
verify no trailing whitespace remains with `git diff --check` and commit the
cleaned file.

@@ -0,0 +1,447 @@
use super::*;
Contributor


⚠️ Potential issue | 🔴 Critical

Add SPDX header to satisfy copyright check

Required by pipeline.

Apply:

+// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
🧰 Tools
🪛 GitHub Actions: Copyright Checks

[error] 1-1: Copyright header check failed. Invalid/Missing header in lib/runtime/src/discovery/kubernetes.rs. Step: pwsh /workspace/.github/workflows/copyright-check.ps1. Exit code: 255.

🪛 GitHub Actions: Pre Merge Validation of (ai-dynamo/dynamo/refs/pull/3898/merge) by mohammedabdulwahhab.

[error] 1-1: Trailing whitespace hook modified files. Please re-run pre-commit to ensure clean formatting.

🤖 Prompt for AI Agents
In lib/runtime/src/discovery/kubernetes.rs at line 1, add the SPDX license
header as a Rust line comment at the very top of the file (e.g. //
SPDX-License-Identifier: <project-license>) using the project's canonical SPDX
identifier (replace <project-license> with the repo's license such as Apache-2.0
or MIT) so the pipeline copyright check passes.

Comment on lines 95 to 101
println!(
"[K8s Discovery] Starting EndpointSlice watch: namespace={}, component={}, k8s_namespace={}, labels={}",
namespace_copy, component_copy, watch_namespace, label_selector
);
let api: Api<EndpointSlice> = Api::namespaced(client, &watch_namespace);
let watch_config = watcher::Config::default()
.labels(&label_selector);
Contributor


🛠️ Refactor suggestion | 🟠 Major

Use tracing macros instead of println!/eprintln!

Adopt tracing::{debug,info,warn,error} for consistency and structured logs.

Example:

- println!( "...", ... );
+ tracing::info!(namespace=%namespace_copy, component=%component_copy, k8s_namespace=%watch_namespace, labels=%label_selector, "[K8s Discovery] Starting EndpointSlice watch");

Apply similarly to all println!/eprintln!.

Also applies to: 231-239, 261-269, 293-301

🤖 Prompt for AI Agents
In lib/runtime/src/discovery/kubernetes.rs around lines 95 to 101 (and similarly
at 231-239, 261-269, 293-301), replace println!/eprintln! calls with the
appropriate tracing macros (tracing::info, tracing::debug, tracing::warn,
tracing::error) to produce structured logs; import tracing prelude if necessary,
convert the message and arguments to tracing macro usage (e.g.,
tracing::info!("... namespace={}, component={}, ...", namespace_copy,
component_copy, ...)), and ensure log levels match the original intent of each
print call.

Comment on lines 103 to 181
let mut stream = watcher(api, watch_config)
.applied_objects()
.boxed();

// Track known ready instances across all slices
// Key: pod name, Value: slice name (for tracking which slice it came from)
let mut known_ready: HashMap<String, String> = HashMap::new();
// Track current state of all slices
let mut slice_instances: HashMap<String, HashSet<String>> = HashMap::new();

while let Some(result) = stream.next().await {
match result {
Ok(endpoint_slice) => {
let slice_name = endpoint_slice.name_any();

// Extract ready instances from this slice
let mut slice_ready = HashSet::new();

for endpoint in &endpoint_slice.endpoints {
// Check if endpoint is ready
let is_ready = endpoint.conditions
.as_ref()
.and_then(|c| c.ready)
.unwrap_or(false);

if is_ready {
if let Some(target_ref) = &endpoint.target_ref {
if target_ref.kind.as_deref() == Some("Pod") {
if let Some(pod_name) = &target_ref.name {
slice_ready.insert(pod_name.clone());
}
}
}
}
}

// Update slice_instances map
slice_instances.insert(slice_name.clone(), slice_ready);

// Rebuild the complete set of ready instances across all slices
// TODO: First pass, entire set of instances is rebuilt across Dynamo.
let mut current_ready: HashMap<String, String> = HashMap::new();
for (slice, pods) in &slice_instances {
for pod in pods {
current_ready.insert(pod.clone(), slice.clone());
}
}

// Find newly ready instances (Added events)
for pod_name in current_ready.keys() {
if !known_ready.contains_key(pod_name) {
println!(
"[K8s Discovery] ✅ Instance ADDED: pod_name={}, slice={}",
pod_name, slice_name
);
let instance = Instance::new(pod_name.clone(), Value::Null);
let _ = event_tx.send(InstanceEvent::Added(instance));
}
}

// Find no-longer-ready instances (Removed events)
for pod_name in known_ready.keys() {
if !current_ready.contains_key(pod_name) {
println!(
"[K8s Discovery] ❌ Instance REMOVED: pod_name={}",
pod_name
);
let _ = event_tx.send(InstanceEvent::Removed(pod_name.clone()));
}
}

known_ready = current_ready;
}
Err(e) => {
eprintln!("[K8s Discovery] ⚠️ Error watching EndpointSlices: {}", e);
// Continue watching despite errors
}
}
}
Contributor


⚠️ Potential issue | 🔴 Critical

Watcher ignores Deleted events; removals from slice_instances aren’t handled

Using applied_objects() misses deletions, so REMOVED events won’t fire when slices are deleted. Track watcher::Event::{Applied, Deleted, Restarted}.

Apply (core loop sketch):

- let mut stream = watcher(api, watch_config).applied_objects().boxed();
+ let mut stream = watcher(api, watch_config).boxed();

- while let Some(result) = stream.next().await {
-     match result {
-         Ok(endpoint_slice) => {
+ while let Some(event) = stream.next().await {
+     match event {
+         Ok(watcher::Event::Applied(endpoint_slice)) => {
              // build slice_ready, update slice_instances[slice_name] = set
           ...
-         }
-         Err(e) => { ... }
+         }
+         Ok(watcher::Event::Deleted(endpoint_slice)) => {
+             let slice_name = endpoint_slice.name_any();
+             slice_instances.remove(&slice_name);
+             // recompute current_ready and emit Removed for now-missing pods
+             ...
+         }
+         Ok(watcher::Event::Restarted(slices)) => {
+             slice_instances.clear();
+             for s in slices { /* recompute per-slice sets */ }
+         }
+         Err(e) => { tracing::warn!(error=%e, "EndpointSlice watch error"); }
      }
   }

Also replace known_ready/current_ready diffing accordingly.

Committable suggestion skipped: line range outside the PR's diff.

@@ -0,0 +1,104 @@
use async_trait::async_trait;
Contributor


⚠️ Potential issue | 🔴 Critical

Add SPDX header

Pipeline fails due to missing header.

Apply:

+// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0
🧰 Tools
🪛 GitHub Actions: Copyright Checks

[error] 1-1: Copyright header check failed. Invalid/Missing header in lib/runtime/src/discovery/mod.rs. Step: pwsh /workspace/.github/workflows/copyright-check.ps1. Exit code: 255.

🤖 Prompt for AI Agents
In lib/runtime/src/discovery/mod.rs around line 1, the file is missing an SPDX
license header which breaks the pipeline; add a single-line SPDX header as the
very first line (for example: // SPDX-License-Identifier: MIT) and ensure a
newline follows it so the header is the top-most line of the file.

inner.add_stats_service().await.map_err(to_pyerr)?;

// Feature flag: register with service discovery if enabled
if rs::config::is_service_discovery_enabled() {
Contributor


Isn't service discovery always enabled? Registration currently happens in register_llm binding.


/// Create the appropriate ServiceDiscovery implementation based on environment.
/// Only initializes if USE_SERVICE_DISCOVERY feature flag is enabled.
async fn create_service_discovery() -> Result<Option<Arc<Box<dyn ServiceDiscovery>>>> {
Contributor

@grahamking grahamking Oct 27, 2025


You never need both Arc and Box. They are both pointers, Arc is a shared pointer and Box a unique pointer.
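
For example, the construction could return a shared trait object directly (a sketch of the Arc-only version):

    let discovery: Arc<dyn ServiceDiscovery> = Arc::new(KubernetesServiceDiscovery::new().await?);
    Ok(Some(discovery))

with the field and accessor typed as Option<Arc<dyn ServiceDiscovery>> instead of Option<Arc<Box<dyn ServiceDiscovery>>>.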

    let discovery: Box<dyn ServiceDiscovery> =
        Box::new(KubernetesServiceDiscovery::new().await?);
    Ok(Some(Arc::new(discovery)))
}
Contributor


I'm not sure I understand.

Service discovery happens right now via etcd. The idea here is to optionally allow using Kubernetes. Why does it say only Kubernetes is supported?

/// Wrapped in Arc<OnceCell> so it can be set once and shared across all clones
#[builder(default = "Arc::new(async_once_cell::OnceCell::new())")]
#[educe(Debug(ignore))]
instance_handle: Arc<async_once_cell::OnceCell<Box<dyn crate::discovery::InstanceHandle>>>,
Contributor


An Instance in Dynamo-speak is a specific worker. You have many instances handling an endpoint. Can you make this name more specific?

@grahamking
Contributor

I think this is an interesting start. There's too much going on for a single PR though.

Could you split it into three PRs like this?

  1. Just the new service discovery interface, with maybe a mock implementation and a test. I'm picturing the interface as a property of DistributedRuntime, but maybe there's somewhere better.
  2. KeyValueStoreManager behind the new service discovery interface; the code then uses that interface instead of direct etcd. It goes DistributedRuntime -> KeyValueStoreManager (drt has a store() to get it) -> etcd.
  3. Kubernetes using the service discovery interface.

@mohammedabdulwahhab
Contributor Author

Makes sense @grahamking. Let me break it down as you mentioned.

@mohammedabdulwahhab
Contributor Author

Closing in favor of modular, more-focused PRs
