suspicious memory leak in producer (rdkafka) #1361

@tigerinus

Description

I have a microservice that consumes messages from Kafka, does some work with them, and publishes the result back to Kafka.

However, it quickly gets OOMKilled after it starts.

With the help of a memory profile, I managed to figure out that rdk:broker0 contributes the biggest memory usage (in my example it's a 384MiB pod in Kubernetes).

[screenshot: memray memory report]

As seen in this report, no Python object visible to the GC holds anything larger than 2MB; it's rdk:broker0 that is holding 4460 allocations and 165MiB of unreleased memory.

Here is the KafkaProducerService code that calls Producer:

# irrelevant code omitted

import logging
from datetime import datetime

from confluent_kafka import KafkaException, Producer

log = logging.getLogger(__name__)


class KafkaProducerService:
    '''
    main class
    '''
    def __init__(self, bootstrap_servers: str, topic: str) -> None:
        self.__producer = Producer({
            'bootstrap.servers': bootstrap_servers
        })

        self.__topic = topic
        self.__count = 0
        self.__previous_count_timestamp = datetime.now().timestamp()

    def publish(self, key: str, value: bytes, headers: dict) -> None:
        '''
        publish message
        '''
        try:
            self.__producer.produce(
                self.__topic,
                key=key,
                value=value,
                headers=headers
            )
        except BufferError as error:
            raise RuntimeError(
                "internal producer message queue is full"
            ) from error
        except KafkaException as error:
            raise RuntimeError(
                "error adding to producer message queue"
            ) from error

        num_messages_to_be_delivered = len(self.__producer)
        if num_messages_to_be_delivered > 1000:
            log.debug("wait for %d messages to be delivered to Kafka...",
                      num_messages_to_be_delivered)
            try:
                num_messages = self.__producer.flush()
            except KafkaException as error:
                raise RuntimeError(
                    "error when flushing producer message queue to Kafka"
                ) from error

            log.debug("%d messages still in queue after flush", num_messages)

        self.__count += 1
        self.__count_published()

    def __count_published(self) -> None:
        current_count_timestamp = datetime.now().timestamp()
        if current_count_timestamp - self.__previous_count_timestamp >= 1:
            self.__previous_count_timestamp = current_count_timestamp

            if self.__count == 0:
                return

            log.info("%d messages published (%d messages pending for delivery)",
                     self.__count, len(self.__producer))
            self.__count = 0
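
For context, and only as a guess rather than a confirmed root cause (the service above does flush periodically): librdkafka queues a delivery report for every message passed to produce(), and that per-message state is only released once the application serves the reports via poll() or flush(). A minimal sketch of a produce loop that serves delivery callbacks on every iteration, with placeholder broker address and topic name (not my actual setup), looks like this:

# Sketch only: broker address and topic are placeholders.
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

def on_delivery(err, msg):
    # Served from poll()/flush(); err is None when the broker acked the message.
    if err is not None:
        print(f"delivery failed: {err}")

for _ in range(100_000):
    producer.produce('test-topic', value=b'x' * 50_000, on_delivery=on_delivery)
    producer.poll(0)  # serve queued delivery callbacks without blocking

producer.flush()  # wait for outstanding messages before exiting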

How to reproduce

  1. Install https://bloomberg.github.io/memray
  2. Using the code snippet above, build a Python script that keeps re-publishing Kafka messages back to the same topic (each message should be larger than 50 KB).
  3. Run the script under memray for about 5 minutes (see the sketch below).
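
A rough sketch of such a reproduction script; the broker address, topic name and consumer group are placeholders, and error handling is omitted:

# repro_sketch.py -- rough sketch only
from confluent_kafka import Consumer, Producer

TOPIC = 'test-topic'
CONF = {'bootstrap.servers': 'localhost:9092'}

consumer = Consumer({**CONF, 'group.id': 'leak-repro', 'auto.offset.reset': 'earliest'})
consumer.subscribe([TOPIC])
producer = Producer(CONF)

# Seed the topic with one message larger than 50 KB, then keep consuming
# and re-publishing everything back to the same topic.
producer.produce(TOPIC, value=b'x' * 60_000)
producer.flush()

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    producer.produce(TOPIC, key=msg.key(), value=msg.value())
    if len(producer) > 1000:
        producer.flush()

Run it under memray for roughly five minutes and render the report, for example:

    memray run -o repro.bin repro_sketch.py
    memray flamegraph repro.bin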

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
>>> confluent_kafka.version()
('1.8.2', 17302016)
>>> confluent_kafka.libversion()
('1.6.0', 17170687)
  • Apache Kafka broker version:
3.0.0
  • Client configuration: {...}

default, except bootstrap.servers

  • Operating system:

Reproduced on both Alpine and Debian (Bullseye)

Labels

    investigate further (it's unclear what the issue is at this time, but there is enough interest to look into it)
    priority:low (maintainer triage tag indicating low impact or criticality)
