
bug: async reader sometimes stops reading messages without any errors. #538

Open
@vgvoleg

Bug Report

YDB Python SDK version:

latest

Environment

linux

Current behavior:

Examples of errors in logs:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/grpc/aio/_call.py", line 452, in _consume_request_iterator
    await self._write(request)
  File "/usr/local/lib/python3.10/dist-packages/grpc/aio/_call.py", line 489, in _write
    raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
asyncio.exceptions.InvalidStateError: RPC already finished.
Exception while consuming the request_iterator: <AioRpcError of RPC that terminated with:
	status = StatusCode.INTERNAL
	details = "Internal error from Core"
	debug_error_string = "Failed "execute_batch": (<grpc._cython.cygrpc.SendMessageOperation object at 0x7f35b0e77a30>,)"
>
Exception while consuming the request_iterator: <AioRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "connections to all backends failing; last error: UNKNOWN: ipv6:%5B2a02:6b8:c41:21:0:1517:fdbf:63a5%5D:2135: Failed to connect to remote host: connect: Connection refused (111)"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"connections to all backends failing; last error: UNKNOWN: ipv6:%5B2a02:6b8:c41:21:0:1517:fdbf:63a5%5D:2135: Failed to connect to remote host: connect: Connection refused (111)", grpc_status:14, created_time:"2024-09-20T18:58:56.948845895+03:00"}"
>

Expected behavior:

Steps to reproduce:

Related code:

A workaround with a periodic reconnect works fine:

while True:
    its_time_to_die = (time.perf_counter() - start_time) > topic_client.worker_ttl
    if its_time_to_die:
        # Periodically restart the worker as a workaround for the "RPC already finished" error.
        raise Exception(
            client=topic_client, message=f"ttl ({topic_client.worker_ttl} sec) has expired"
        )

    try:
        try:
            batch: PublicBatch | None = await asyncio.wait_for(
                reader.receive_batch(), timeout=RECEIVE_MESSAGE_TIMEOUT
            )
        except asyncio.TimeoutError as exc:
            if isinstance(exc.__cause__, asyncio.CancelledError):
                # During long outages (e.g. DC maintenance) the future may be
                # cancelled immediately on every call. Sleep here so that we do
                # not call receive_batch() in a tight loop and flood the server.
                # TODO: backoff
                await asyncio.sleep(RECEIVE_MESSAGE_TIMEOUT)
            continue

        if batch:
            log_context["batch"] = str(batch)  # FIXME: potentially heavyweight
            if batch.alive:
                await self._process_batch(batch, log_context=log_context)
            if batch.alive:
                await asyncio.wait_for(reader.commit_with_ack(batch), timeout=RECEIVE_MESSAGE_TIMEOUT)

    except Exception as e:
        raise Exception(
            client=topic_client,
            message="YDB Internal Error. Failed to process batch",
            log_context=log_context,
        ) from e
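
The `TODO: backoff` in the workaround above could be filled in roughly like this. It is only a sketch, not part of the SDK: `backoff_delay` and `receive_with_backoff` are hypothetical helper names, the defaults (0.5 s base, 30 s cap, 5 attempts) are arbitrary, and `receive` stands for any zero-argument coroutine function such as `reader.receive_batch`.

```python
import asyncio
import random
from typing import Awaitable, Callable, Optional


def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    """Upper bound of the delay for a 0-based attempt: base * 2**attempt, capped."""
    return min(cap, base * (2 ** attempt))


async def receive_with_backoff(
    receive: Callable[[], Awaitable[object]],
    timeout: float,
    max_attempts: int = 5,
    base: float = 0.5,
    cap: float = 30.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> Optional[object]:
    """Call receive() with a timeout; on timeout, sleep with exponential
    backoff and full jitter, then retry. Returns None if every attempt
    timed out, so the caller can decide to recreate the reader."""
    for attempt in range(max_attempts):
        try:
            return await asyncio.wait_for(receive(), timeout=timeout)
        except asyncio.TimeoutError:
            # Full jitter: a random delay in [0, upper bound] spreads retries out.
            await sleep(random.uniform(0, backoff_delay(attempt, base, cap)))
    return None
```

In the loop above this would replace the inner `try`/`except asyncio.TimeoutError` block, e.g. `batch = await receive_with_backoff(reader.receive_batch, timeout=RECEIVE_MESSAGE_TIMEOUT)`. A `None` result after all attempts is a natural point to close and recreate the reader instead of waiting for the TTL to expire.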

Other information:

Labels: bug
