-
Notifications
You must be signed in to change notification settings - Fork 45
refactor osprey to use confluent-kafka instead of kafka-python #220
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
ef703ca
e38532c
434eb4b
bddcdee
647f5bd
9ba3cb7
6fa6274
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,34 +1,127 @@ | ||
| import platform | ||
| from typing import Any | ||
|
|
||
| import sentry_sdk | ||
| from kafka import KafkaProducer | ||
| from confluent_kafka import Producer | ||
| from confluent_kafka.admin import AdminClient, NewTopic | ||
| from osprey.engine.executor.execution_context import ExecutionResult | ||
| from osprey.worker.lib.osprey_shared.logging import get_logger | ||
| from osprey.worker.sinks.sink.output_sink import BaseOutputSink | ||
| from osprey.worker.sinks.utils.kafka import ThreadedKafkaProducer | ||
|
|
||
| logger = get_logger() | ||
|
|
||
|
|
||
| class EmptyBootstrapServersException(Exception): | ||
| """Exception that is raised whenever the server list provided to KafkaOutputSink is empty.""" | ||
|
|
||
|
|
||
| class InvalidOutputTopicException(Exception): | ||
| """Exception that is raised whenever the output topic passed to KafkaOutputSink is empty.""" | ||
|
|
||
|
|
||
| class KafkaOutputSink(BaseOutputSink): | ||
| """An output sink that sends the extracted features to a given kafka topic.""" | ||
|
|
||
| def __init__(self, kafka_topic: str, kafka_producer: KafkaProducer): | ||
| self._kafka_topic = kafka_topic | ||
| self._kafka_producer = kafka_producer | ||
| def __init__( | ||
| self, | ||
| bootstrap_servers: list[str], | ||
| output_topic: str, | ||
| client_id: str | None, | ||
| ) -> None: | ||
| if len(bootstrap_servers) == 0: | ||
| raise EmptyBootstrapServersException() | ||
|
|
||
| if output_topic == '': | ||
| raise InvalidOutputTopicException() | ||
|
|
||
| self.logger = get_logger('KafkaOutputSink') | ||
|
|
||
| self._bootstrap_servers = bootstrap_servers | ||
| self._output_topic = output_topic | ||
|
|
||
| # NOTE(haileyok): this is...not necessary probably | ||
| self.topic_ensured = False | ||
|
|
||
| if client_id is None: | ||
| client_hostname = platform.node() | ||
| if client_hostname != '': | ||
| client_id = f'{client_hostname};host_override={bootstrap_servers[0]}' | ||
| else: | ||
| client_id = f'osprey-output-sink;host_override={bootstrap_servers[0]}' | ||
|
|
||
| self.logger.info(f'Creating Kafka producer with client id {client_id}') | ||
|
|
||
| config = { | ||
| 'bootstrap.servers': ','.join(bootstrap_servers), | ||
| 'client.id': client_id, | ||
| 'queue.buffering.max.messages': 1_000_000, | ||
| 'linger.ms': 10, | ||
| 'retries': 10, | ||
| 'request.timeout.ms': 30_000, | ||
| 'socket.timeout.ms': 30_000, | ||
| 'delivery.timeout.ms': 120_000, | ||
| 'statistics.interval.ms': 10_000, | ||
| 'log.connection.close': False, | ||
| 'enable.idempotence': True, | ||
| 'acks': 'all', | ||
| 'max.in.flight.requests.per.connection': 5, | ||
| 'message.max.bytes': 20_000_000, | ||
| } | ||
|
|
||
| self._producer = Producer(config) | ||
| self._threaded_producer = ThreadedKafkaProducer(self._producer) | ||
| self.ensure_topic() | ||
|
|
||
| super().__init__() | ||
|
|
||
| def ensure_topic(self) -> None: | ||
| """Create the Kafka topic if it does not yet exist.""" | ||
| admin_client = AdminClient({'bootstrap.servers': ','.join(self._bootstrap_servers)}) | ||
|
|
||
| try: | ||
| metadata = admin_client.list_topics(timeout=10) | ||
| except Exception as e: | ||
| self.logger.error(f'Error listing topics, unable to ensure topic: {e}') | ||
| return | ||
|
|
||
| if self._output_topic in metadata.topics: | ||
| self.topic_ensured = True | ||
| return | ||
|
|
||
| self.logger.info(f'Creating topic {self._output_topic}') | ||
| try: | ||
| # TODO: num_partitions and replication_factor should not be hard coded... | ||
| topic = NewTopic(self._output_topic, num_partitions=3, replication_factor=3) | ||
|
Comment on lines
+94
to
+95
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The only other place
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yea this would need to be an environment variable for sure. like emelia's comment though, this is kinda an infra thing that might not belong in here and i might just end up removing it. |
||
| fs = admin_client.create_topics([topic]) | ||
| fs[self._output_topic].result() | ||
| self.topic_ensured = True | ||
| except Exception as e: | ||
| self.logger.error(f'Error creating topic, unable to ensure topic: {e}') | ||
|
|
||
| def will_do_work(self, result: ExecutionResult) -> bool: | ||
| return True | ||
|
|
||
| def push(self, result: ExecutionResult) -> None: | ||
| kafka_future: Any = self._kafka_producer.send( | ||
| topic=self._kafka_topic, value=result.extracted_features_json.encode('utf-8') | ||
| self._threaded_producer.produce( | ||
| self._output_topic, | ||
| value=result.extracted_features_json.encode('utf-8'), | ||
| on_delivery=self._on_delivery, | ||
| ) | ||
| kafka_future.add_errback(self.push_err_to_sentry) | ||
|
|
||
| def _on_delivery(self, err: Any, msg: Any) -> None: | ||
| if err is not None: | ||
| self.push_err_to_sentry(err) | ||
|
|
||
| def flush(self, timeout: float = 30) -> int: | ||
| return self._threaded_producer.flush(timeout) | ||
|
|
||
| @classmethod | ||
| def push_err_to_sentry(cls, e: Exception) -> None: | ||
| logger.error(f'exception raised when pushing event to kafka: {str(e)}') | ||
| sentry_sdk.capture_exception(error=e) | ||
|
|
||
| def stop(self) -> None: | ||
| pass | ||
| remaining = self._threaded_producer.close(timeout=10) | ||
| if remaining > 0: | ||
| logger.warning(f'{remaining} messages were not delivered') | ||
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may be worth making this optional, as in some environments you don't want dynamic topic creation, but instead to manage topics with say, terraform (this was a thing we did with IFTAS CCS, such that anything exploiting our code couldn't gain admin access to kafka)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ya i agree, i might just rip this code out entirely but maybe ill make it some environment variable that you can toggle on...idk yet