@StarJourneyMingsing StarJourneyMingsing commented Sep 13, 2025

Switch Dapr health check from blocking call to async call to avoid blocking the event loop in async environments

Description

This PR addresses a critical issue in the Dapr Python SDK's async pubsub subscription functionality where the gRPC stream fails to close properly during reconnection, causing the first response after reconnection to be lost. The root cause is the synchronous health check blocking the async event loop, which prevents the _StreamRequestMixin._done_writing() method from executing properly.

Details:

  1. Stream Closing Process: The close() method calls self._stream.cancel(), which triggers the _StreamRequestMixin._consume_request_iterator() to complete and call await self._done_writing().
  2. Critical Issue: The _done_writing() method in _StreamRequestMixin (line 518 in grpc/aio/call.py) calls await self._cython_call.send_receive_close() to properly close the gRPC stream. However, when DaprHealth.wait_until_ready() blocks the event loop synchronously, this crucial cleanup operation cannot complete.
  3. Stream State Corruption: Without proper stream closure via send_receive_close(), the gRPC stream remains in an inconsistent state. When a new stream is created after reconnection, the first response from the outgoing_request_iterator() (line 41 in subscription.py) cannot be sent because the previous stream's cleanup was incomplete.
class _StreamRequestMixin(Call):
    _metadata_sent: asyncio.Event
    _done_writing_flag: bool
    _async_request_poller: Optional[asyncio.Task]
    _request_style: _APIStyle

    def _init_stream_request_mixin(
        self, request_iterator: Optional[RequestIterableType]
    ):
        self._metadata_sent = asyncio.Event()
        self._done_writing_flag = False

        # If user passes in an async iterator, create a consumer Task.
        if request_iterator is not None:
            self._async_request_poller = self._loop.create_task(
                self._consume_request_iterator(request_iterator)
            )
            self._request_style = _APIStyle.ASYNC_GENERATOR
        else:
            self._async_request_poller = None
            self._request_style = _APIStyle.READER_WRITER

    def _raise_for_different_style(self, style: _APIStyle):
        if self._request_style is not style:
            raise cygrpc.UsageError(_API_STYLE_ERROR)

    def cancel(self) -> bool:
        if super().cancel():
            if self._async_request_poller is not None:
                self._async_request_poller.cancel()
            return True
        else:
            return False

    def _metadata_sent_observer(self):
        self._metadata_sent.set()

    async def _consume_request_iterator(
        self, request_iterator: RequestIterableType
    ) -> None:
        try:
            if inspect.isasyncgen(request_iterator) or hasattr(
                request_iterator, "__aiter__"
            ):
                async for request in request_iterator:
                    try:
                        await self._write(request)
                    except AioRpcError as rpc_error:
                        _LOGGER.debug(
                            (
                                "Exception while consuming the"
                                " request_iterator: %s"
                            ),
                            rpc_error,
                        )
                        return
            else:
                for request in request_iterator:
                    try:
                        await self._write(request)
                    except AioRpcError as rpc_error:
                        _LOGGER.debug(
                            (
                                "Exception while consuming the"
                                " request_iterator: %s"
                            ),
                            rpc_error,
                        )
                        return

            await self._done_writing()
        except:  # pylint: disable=bare-except
            # Client iterators can raise exceptions, which we should handle by
            # cancelling the RPC and logging the client's error. No exceptions
            # should escape this function.
            _LOGGER.debug(
                "Client request_iterator raised exception:\n%s",
                traceback.format_exc(),
            )
            self.cancel()

    async def _write(self, request: RequestType) -> None:
        if self.done():
            raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
        if self._done_writing_flag:
            raise asyncio.InvalidStateError(_RPC_HALF_CLOSED_DETAILS)
        if not self._metadata_sent.is_set():
            await self._metadata_sent.wait()
            if self.done():
                await self._raise_for_status()

        serialized_request = _common.serialize(
            request, self._request_serializer
        )
        try:
            await self._cython_call.send_serialized_message(serialized_request)
        except cygrpc.InternalError as err:
            self._cython_call.set_internal_error(str(err))
            await self._raise_for_status()
        except asyncio.CancelledError:
            if not self.cancelled():
                self.cancel()
            raise

    async def _done_writing(self) -> None:
        if self.done():
            # If the RPC is finished, do nothing.
            return
        if not self._done_writing_flag:
            # If the done writing is not sent before, try to send it.
            self._done_writing_flag = True
            try:
                await self._cython_call.send_receive_close()
            except asyncio.CancelledError:
                if not self.cancelled():
                    self.cancel()
                raise

    async def write(self, request: RequestType) -> None:
        self._raise_for_different_style(_APIStyle.READER_WRITER)
        await self._write(request)

    async def done_writing(self) -> None:
        """Signal peer that client is done writing.

        This method is idempotent.
        """
        self._raise_for_different_style(_APIStyle.READER_WRITER)
        await self._done_writing()

    async def wait_for_connection(self) -> None:
        await self._metadata_sent.wait()
        if self.done():
            await self._raise_for_status()
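
To illustrate the starvation described above, here is a minimal sketch that does not use gRPC or Dapr at all. The `cleanup` coroutine is a hypothetical stand-in for the pending `_done_writing()` work, and `time.sleep()` stands in for a synchronous health check; both names and the 0.2 s delay are assumptions made for the demonstration.

```python
import asyncio
import time


async def cleanup(events):
    # Hypothetical stand-in for the pending stream cleanup (not gRPC code).
    await asyncio.sleep(0)
    events.append("cleanup finished")


async def reconnect_blocking(events):
    # Schedule the cleanup, then block the event loop synchronously.
    task = asyncio.create_task(cleanup(events))
    time.sleep(0.2)  # stand-in for a blocking health check
    events.append("health check finished")
    # The cleanup task only gets a chance to run once we yield here.
    await task


def run_blocking_demo():
    events = []
    asyncio.run(reconnect_blocking(events))
    return events


if __name__ == "__main__":
    print(run_blocking_demo())
```

In this sketch the cleanup task is scheduled first but finishes last, because the loop cannot run any task while the synchronous call holds the thread.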

Solution:

The fix runs the blocking health check in a worker thread via loop.run_in_executor() and awaits the result, which:

  1. Prevents blocking the async event loop during health check
  2. Ensures the gRPC stream is properly closed before creating a new one
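
The approach can be sketched as follows. This is not the code from the PR diff: `blocking_wait_until_ready` is a hypothetical stand-in for a synchronous health check such as `DaprHealth.wait_until_ready()`, and `cleanup` is a hypothetical stand-in for the pending stream cleanup. Only `asyncio.get_running_loop()` and `loop.run_in_executor()` are real standard-library calls.

```python
import asyncio
import time


def blocking_wait_until_ready():
    # Hypothetical stand-in for a synchronous health check.
    time.sleep(0.2)


async def cleanup(events):
    # Hypothetical stand-in for the pending stream cleanup (not gRPC code).
    await asyncio.sleep(0)
    events.append("cleanup finished")


async def reconnect_async(events):
    loop = asyncio.get_running_loop()
    task = asyncio.create_task(cleanup(events))
    # Run the blocking call in the default thread pool executor and await it,
    # so the event loop stays free to run other tasks in the meantime.
    await loop.run_in_executor(None, blocking_wait_until_ready)
    events.append("health check finished")
    await task


def run_executor_demo():
    events = []
    asyncio.run(reconnect_async(events))
    return events


if __name__ == "__main__":
    print(run_executor_demo())
```

Here the cleanup completes while the health check is still waiting in the worker thread, which is the ordering the fix relies on.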

Issue reference

We strive to have all PRs opened based on an issue, where the problem or feature has been discussed prior to implementation.

Please reference the issue this PR will close: #[issue number]

Checklist

Please make sure you've completed the relevant tasks for this PR, out of the following list:

  • Code compiles correctly
  • Created/updated tests
  • Extended the documentation

@StarJourneyMingsing StarJourneyMingsing requested review from a team as code owners September 13, 2025 09:06
@StarJourneyMingsing StarJourneyMingsing changed the title from "wait for Dapr health check asynchronously in aio/clients/grpc/subscription.py" to "wait for Dapr health check asynchronously in aio/clients/grpc/subscription.py to avoid blocking the event loop in async environments" Sep 13, 2025
@StarJourneyMingsing StarJourneyMingsing changed the title from "wait for Dapr health check asynchronously in aio/clients/grpc/subscription.py to avoid blocking the event loop in async environments" to "wait for Dapr health check asynchronously in aio/clients/grpc/subscription.py to avoid blocking the event loop" Sep 13, 2025
@StarJourneyMingsing StarJourneyMingsing changed the title from "wait for Dapr health check asynchronously in aio/clients/grpc/subscription.py to avoid blocking the event loop" to "wait for Dapr health check asynchronously in aio/clients/grpc/subscription.py to avoid blocking the aio grpc stream closing properly" Sep 13, 2025
@StarJourneyMingsing StarJourneyMingsing changed the title from "wait for Dapr health check asynchronously in aio/clients/grpc/subscription.py to avoid blocking the aio grpc stream closing properly" to "Wait for the Dapr health check asynchronously in aio/clients/grpc/subscription.py to avoid blocking, ensuring the asyncio gRPC stream can close properly." Sep 13, 2025
mingsing added 4 commits October 14, 2025 19:19
Switch Dapr health check from blocking call to async call to avoid blocking the event loop in async environments

Signed-off-by: mingsing <[email protected]>
Signed-off-by: mingsing <[email protected]>
Signed-off-by: mingsing <[email protected]>
Signed-off-by: mingsing <[email protected]>