feat(grpc): implement continuous Watch streaming for health servicers #917
V2arK wants to merge 9 commits into lightseekorg:main
Conversation
Signed-off-by: Honglin Cao <Caohonglin317@hotmail.com>
TDD red phase: 7 tests for SGLangHealthServicer.Watch() continuous streaming. 5 fail against current single-yield implementation. Adds sglang MagicMock stubs to conftest to allow collection without a full SGLang installation. Signed-off-by: Honglin Zhu <honglin@nvidia.com> Signed-off-by: Honglin Cao <Caohonglin317@hotmail.com>
TDD red phase: 7 tests for VllmHealthServicer.Watch() continuous streaming. 3 fail (exits_on_shutdown, engine_failure, no_duplicate) as expected; 4 pass against current single-yield stub. Also adds vllm module stubs to conftest so tests collect without vLLM installed. Signed-off-by: Honglin <honglin@nvidia.com> Signed-off-by: Honglin Cao <Caohonglin317@hotmail.com>
_notify_shutdown() now also sets self._watch_notified_shutdown = True so subclasses can detect explicit shutdown (via set_not_serving()) in _is_shutting_down() independently of their engine-specific flags. Signed-off-by: Honglin Cao <Caohonglin317@hotmail.com>
📝 Walkthrough

This PR updates the grpc-servicer package to version 0.6.0, introducing a new `HealthWatchMixin`.

Changes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (warning)
Code Review
This pull request introduces a shared HealthWatchMixin to implement the gRPC Health Checking Protocol's Watch RPC for both SGLang and vLLM inference engines. The mixin provides a continuous streaming response that updates clients on health status changes or server shutdown. The PR also includes a version bump to 0.6.0, the addition of test-specific dependencies in pyproject.toml, and a comprehensive suite of unit tests. Feedback was provided regarding the use of inspect.isawaitable() for more robust detection of asynchronous results in the mixin's status resolution logic.
```python
    async def _resolve_watch_status(self, service_name: str) -> int:
        """Call _compute_watch_status, handling both sync and async impls."""
        result = self._compute_watch_status(service_name)
        if asyncio.iscoroutine(result):
```
For more robust detection of awaitable results from _compute_watch_status, it's better to use inspect.isawaitable() instead of asyncio.iscoroutine(). isawaitable() is more general and correctly handles not just coroutines from async def functions, but also other awaitable objects like asyncio.Future or custom objects with an __await__ method. This makes the mixin more resilient to different implementation patterns in subclasses.
You'll need to add import inspect at the top of the file.
```diff
-        if asyncio.iscoroutine(result):
+        if inspect.isawaitable(result):
```
@coderabbitai review

✅ Actions performed: review triggered.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 012e5f6e16
```python
                    self._watch_shutdown_event.wait(),
                    timeout=self.WATCH_POLL_INTERVAL_S,
                )
            except TimeoutError:
```
There was a problem hiding this comment.
Catch asyncio.TimeoutError in Watch poll loop
Watch() currently catches built-in TimeoutError, but on Python 3.10 (which is supported via requires-python >=3.10) asyncio.wait_for() raises asyncio.TimeoutError instead. When a stream is healthy and no shutdown event occurs for one poll interval, that timeout escapes the loop and aborts the RPC rather than continuing to poll, so long-lived watch streams terminate unexpectedly in normal operation.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@grpc_servicer/smg_grpc_servicer/sglang/health_servicer.py`:
- Around line 149-176: Extract the hard-coded 30s timeout into a class-level
constant (e.g., SCHEDULER_RESPONSIVENESS_TIMEOUT_S) and replace the literal 30
in both _compute_watch_status and Check with
self.SCHEDULER_RESPONSIVENESS_TIMEOUT_S; add the constant to the class
definition, update the time_since comparison in _compute_watch_status and the
corresponding check in Check() to use that constant, and ensure any tests or
other methods referencing the 30s behavior use the new constant name.
📒 Files selected for processing (8)
- grpc_servicer/pyproject.toml
- grpc_servicer/smg_grpc_servicer/health_watch.py
- grpc_servicer/smg_grpc_servicer/sglang/health_servicer.py
- grpc_servicer/smg_grpc_servicer/vllm/health_servicer.py
- grpc_servicer/tests/__init__.py
- grpc_servicer/tests/conftest.py
- grpc_servicer/tests/test_sglang_health_watch.py
- grpc_servicer/tests/test_vllm_health_watch.py
```diff
+    def _is_shutting_down(self) -> bool:
+        # _watch_notified_shutdown is set by _notify_shutdown() in set_not_serving();
+        # gracefully_exit covers external shutdown from the request manager.
+        return self.request_manager.gracefully_exit or self._watch_notified_shutdown
+
-        Yields:
-            HealthCheckResponse messages when status changes
-        """
-        service_name = request.service
-        logger.debug(f"Health watch request for service: '{service_name}'")
-
-        # Send current status
-        response = await self.Check(request, context)
-        yield response
-
-        # Note: Full Watch implementation would monitor status changes
-        # and stream updates. For K8s probes, Check is sufficient.
+    def _compute_watch_status(self, service_name: str) -> int:
+        """Sync status computation -- no I/O needed."""
+        if self.request_manager.gracefully_exit:
+            return NOT_SERVING
+
+        if service_name == self.OVERALL_SERVER:
+            return self._serving_status.get(self.OVERALL_SERVER, NOT_SERVING)
+
+        if service_name == self.SGLANG_SERVICE:
+            base_status = self._serving_status.get(self.SGLANG_SERVICE, NOT_SERVING)
+            if base_status != SERVING:
+                return base_status
+            time_since = time.time() - self.request_manager.last_receive_tstamp
+            if time_since > 30 and len(self.request_manager.rid_to_state) > 0:
+                logger.warning(
+                    "Scheduler not responsive (%.1fs, %d pending)",
+                    time_since,
+                    len(self.request_manager.rid_to_state),
+                )
+                return NOT_SERVING
+            return SERVING
+
+        return SERVICE_UNKNOWN
```
🧹 Nitpick | 🔵 Trivial

LGTM with minor note.

The implementations are correct:
- `_is_shutting_down()` appropriately checks both `gracefully_exit` and `_watch_notified_shutdown` to handle external shutdown signals
- `_compute_watch_status()` correctly mirrors `Check()` logic as a sync method
💡 Consider extracting the magic number 30 as a constant

The 30-second scheduler responsiveness timeout is duplicated between `Check()` (line 127) and `_compute_watch_status()` (line 167). Consider extracting this as a class constant for maintainability:

```diff
+    SCHEDULER_RESPONSIVENESS_TIMEOUT_S = 30
+
     # Service names we support
     OVERALL_SERVER = ""
```

Then use `self.SCHEDULER_RESPONSIVENESS_TIMEOUT_S` in both methods.
Description
Problem
`SGLangHealthServicer.Watch()` and `VllmHealthServicer.Watch()` yield a single response then close the stream. This violates the gRPC Health Checking Protocol, which requires Watch to be a long-lived server-streaming RPC that sends updates whenever the service's health status changes.

Additionally, `SGLangHealthServicer.Watch()` delegates to `self.Check()`, which calls `context.set_code(NOT_FOUND)` and `context.set_details()` for unknown services, polluting the streaming response context.

Follow-up from #885. Ref: vllm-project/vllm#38016.
Solution
Add `HealthWatchMixin` providing the Watch loop skeleton (poll + `asyncio.Event` for immediate shutdown wakeup, yield-on-change, cancel handling). Both servicers integrate the mixin and implement `_compute_watch_status()` and `_is_shutting_down()` (vLLM's async implementation calls `await async_llm.check_health()`).

The mixin's `_resolve_watch_status()` bridge method auto-detects sync vs async implementations via `asyncio.iscoroutine()`, so each servicer uses its natural calling convention.

Spec deviation: for unknown services, the stream sends `SERVICE_UNKNOWN` once then exits (the spec says to keep the stream open for dynamic registration, but smg services are statically defined).

Test Plan
- Unit tests: 14/14 passed (macOS + x86 Linux)
- vLLM E2E Watch deferred -- requires vllm-project/vllm#38016 to register `grpc.health.v1.Health` in the gRPC server.

Checklist
- `cargo +nightly fmt` passes (no Rust changes)
- `cargo clippy --all-targets --all-features -- -D warnings` passes (no Rust changes)

Summary by CodeRabbit
Chores
New Features
Tests