VALUE_DESERIALIZATION - Connection reset by peer #1334

Open
@mugx-fc

Hello 👋
I'm responsible for two app containers consuming from two different Kafka topics: one with an Avro schema, the other schema-less.
The schema-less consumer has never crashed, but the one with the schema crashes (and auto-restarts) roughly every two weeks. This is a bit noisy, and the resulting 30 seconds of unwanted downtime is not ideal.

Now, let's assume the crashing container has a temporary networking issue when connecting to its schema registry. How would the library (in particular SchemaRegistryClient / DeserializingConsumer) behave in this case? Would it retry for a little while, or just crash outright?

I have the following setup:

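from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer
schema_registry_client = SchemaRegistryClient(...)
avro_deserializer = AvroDeserializer(schema_registry_client)
string_deserializer = StringDeserializer("utf_8")
consumer = DeserializingConsumer(...)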

The exception being raised is:

Traceback (most recent call last):
  File "/urllib3/connectionpool.py", line 703, in urlopen
    httplib_response = self._make_request(
  File "/urllib3/connectionpool.py", line 449, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/urllib3/connectionpool.py", line 444, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/local/lib/python3.9/http/client.py", line 1377, in getresponse
    response.begin()
  File "/usr/local/lib/python3.9/http/client.py", line 320, in begin
    version, status, reason = self._read_status()
  File "/usr/local/lib/python3.9/http/client.py", line 281, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/local/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
  File "/usr/local/lib/python3.9/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/usr/local/lib/python3.9/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
ConnectionResetError: [Errno 104] Connection reset by peer
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/requests/adapters.py", line 440, in send
    resp = conn.urlopen(
  File "/urllib3/connectionpool.py", line 785, in urlopen
    retries = retries.increment(
  File "/urllib3/util/retry.py", line 550, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/urllib3/packages/six.py", line 769, in reraise
    raise value.with_traceback(tb)
  File "/urllib3/connectionpool.py", line 703, in urlopen
    httplib_response = self._make_request(
  File "/urllib3/connectionpool.py", line 449, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/urllib3/connectionpool.py", line 444, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/local/lib/python3.9/http/client.py", line 1377, in getresponse
    response.begin()
  File "/usr/local/lib/python3.9/http/client.py", line 320, in begin
    version, status, reason = self._read_status()
  File "/usr/local/lib/python3.9/http/client.py", line 281, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/local/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
  File "/usr/local/lib/python3.9/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/usr/local/lib/python3.9/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
urllib3.exceptions.ProtocolError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/confluent_kafka/deserializing_consumer.py", line 137, in poll
    value = self._value_deserializer(value, ctx)
  File "/confluent_kafka/schema_registry/avro.py", line 348, in __call__
    schema = self._registry.get_schema(schema_id)
  File "/confluent_kafka/schema_registry/schema_registry_client.py", line 368, in get_schema
    response = self._rest_client.get('schemas/ids/{}'.format(schema_id))
  File "/confluent_kafka/schema_registry/schema_registry_client.py", line 124, in get
    return self.send_request(url, method='GET', query=query)
  File "/confluent_kafka/schema_registry/schema_registry_client.py", line 167, in send_request
    response = self.session.request(
  File "/requests/sessions.py", line 529, in request
    resp = self.send(prep, **send_kwargs)
  File "/requests/sessions.py", line 645, in send
    r = adapter.send(request, **kwargs)
  File "/requests/adapters.py", line 501, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/src/kafka_consumer.py", line 88, in <module>
    kafka_consumer()
  File "/src/kafka_consumer.py", line 62, in kafka_consumer
    raise e
  File "/src/kafka_consumer.py", line 48, in kafka_consumer
    message_raw = consumer.poll(1.0)
  File "/confluent_kafka/deserializing_consumer.py", line 139, in poll
    raise ValueDeserializationError(exception=se, kafka_message=msg)
confluent_kafka.error.ValueDeserializationError: KafkaError{code=_VALUE_DESERIALIZATION,val=-159,str="('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))"}
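
In the traceback above, the connection reset surfaces as a requests.exceptions.ConnectionError, which is a subclass of OSError, and it reaches the application wrapped in ValueDeserializationError without a successful retry. As a possible application-side workaround, here is a minimal sketch of a retry helper with exponential backoff. The name call_with_retry and its parameters (retries, base_delay, retry_on, sleep) are hypothetical and not part of confluent_kafka; the helper only uses the standard library.

```python
import time


def call_with_retry(fn, *args, retries=3, base_delay=0.5,
                    retry_on=(OSError,), sleep=time.sleep):
    """Call fn(*args), retrying on transient errors with exponential backoff.

    Hypothetical helper, not part of confluent_kafka.
    retries is the number of extra attempts made after the first failure.
    retry_on defaults to OSError, which covers ConnectionResetError and
    requests.exceptions.ConnectionError (both are OSError subclasses).
    """
    attempt = 0
    while True:
        try:
            return fn(*args)
        except retry_on:
            if attempt >= retries:
                # Out of attempts: re-raise so the caller can decide what to do.
                raise
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            sleep(base_delay * (2 ** attempt))
            attempt += 1
```

One way this might be wired in, assuming the raw message is still available on the error's kafka_message attribute (it is passed as kafka_message=msg in the traceback): catch ValueDeserializationError around consumer.poll(1.0), take the raw message, and re-run the deserializer through the helper, for example call_with_retry(avro_deserializer, msg.value(), SerializationContext(msg.topic(), MessageField.VALUE)). Retrying the deserializer on the already-fetched message, rather than calling poll() again, avoids silently moving on to the next message.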

Thank you for any assistance or suggestions 🙏

Labels: enhancement (Requesting a feature change); priority:low (Maintainer triage tag for indicating low impact or criticality issues)
